Source code for src.mavlite

"""
This file is an attempt to produce a bare-minimum MAVLink UART communication system.

The table below maps MAVLink C field types to ``struct`` format characters;
see https://docs.python.org/3/library/struct.html

TypesList = {
    "float":"f",
    "double":"d",
    "char":"s",
    "int8_t":"b",
    "int16_t":"h",
    "int32_t":"l",
    "int64_t":"q",
    "uint8_t":"B",
    "uint16_t":"H",
    "uint32_t":"I",
    "uint64_t":"Q"
    }
"""
import gc
import array
import struct
from time import monotonic
try:
    import asyncio
except ImportError:
    import uasyncio as asyncio  # noqa

try:
    from MSGFormats import formats
    from uart import (
        uart_read,
        uart_write,
        write_buffer,
        UART
    )
except ImportError:
    from .MSGFormats import formats
    from .uart import (
        uart_read,
        uart_write,
        write_buffer,
        UART
    )

# Shutdown flag: the wait loops in this module keep polling only while this is False.
TERM = False
# Received messages (dicts) that the wait loops scan and consume.
read_buffer = []

# Default values keyed by short names.
# NOTE(review): presumably system id, component id, compat flags and
# incompat flags -- confirm against the code that reads this dict.
defs = dict(sid=1, cid=158, cfl=0, ifl=0)


async def decode_payload(message_id: int, payload: list, format_override: [None, str] = None, debug: bool = False):
    """
    Uses the indexed format to decode the contents of an incoming payload.

    :param message_id: Message ID used to look up the struct format (and, for
        IDs <= 255, the field ordering) in ``formats``.
    :param payload: Raw payload byte values. A payload shorter than the format
        is zero-padded on a private copy; the caller's sequence is not modified.
    :param format_override: Optional struct format (without the byte-order
        prefix) used instead of ``formats[message_id][0]``.
    :param debug: Print the format and the padded payload when True.
    :return: The decoded field values. For message IDs <= 255 this is a list
        re-ordered through ``formats[message_id][2]``; otherwise it is the
        tuple produced by ``struct.unpack``.
    """
    if not format_override:
        _format = "<" + formats[message_id][0]
    else:
        _format = "<" + format_override

    # Work on a copy so the padding never leaks back into the caller's buffer.
    raw = list(payload)
    missing = struct.calcsize(_format) - len(raw)
    if missing > 0:
        # Pad to fit: trailing zero bytes may have been stripped by the sender.
        raw.extend([0] * missing)

    if debug:
        print('using format:', _format)
        print('payload length:', len(raw), 'payload', *raw)

    fields = struct.unpack(_format, bytes(raw))

    # Decode Order
    if message_id <= 255:
        # struct.unpack returns an immutable tuple, so the re-ordered values
        # have to be written into a separate list.
        decoded = list(fields)
        for i in range(len(fields)):
            decoded[i] = fields[formats[message_id][2][i]]
            await asyncio.sleep(0.00021)  # Yield to other tasks between fields.
        return decoded
    return fields
class X25crc:
    """
    Improved CRC code.

    Keeps a rolling 16-bit checksum in ``self.crc``, starting from 0xffff.
    Every method is a coroutine and returns ``self`` so the result can be read
    from ``.crc`` afterwards.
    """

    def __init__(self):
        self.crc = 0xffff

    async def create(self, buf=None):
        """
        Perform CRC creation actions independent of class init.

        Resets the checksum to 0xffff and, when *buf* is given, accumulates it
        (``str`` input goes through :meth:`accumulate_str`, anything else
        through :meth:`accumulate`).
        """
        self.crc = 0xffff
        if buf is not None:
            if isinstance(buf, str):
                await self.accumulate_str(buf)
            else:
                await self.accumulate(buf)
        return self

    async def accumulate(self, buf):
        """
        Add in rolling byte-chunks.

        :param buf: Iterable of integer byte values (0-255).
        """
        accum = self.crc
        for b in buf:
            tmp = b ^ (accum & 0xff)
            tmp = (tmp ^ (tmp << 4)) & 0xFF
            accum = (accum >> 8) ^ (tmp << 8) ^ (tmp << 3) ^ (tmp >> 4)
            await asyncio.sleep(0.0001)  # Yield to other tasks between bytes.
        self.crc = accum
        return self

    async def accumulate_str(self, buf: [str, bytes]):
        """
        Add in rolling byte-chunks.

        Accepts ``bytes`` (or any iterable of byte values) directly and encodes
        ``str`` input before accumulating it.

        :raises ValueError: if *buf* is neither a sequence of byte values nor
            something with an ``encode()`` method.
        """
        try:
            bytes_array = array.array('B', list(buf))
        except TypeError:
            # Not a sequence of ints: treat it as text. The fallback needs its
            # own handler because an exception raised inside this ``except``
            # clause is not seen by sibling ``except`` clauses.
            try:
                bytes_array = array.array('B', list(buf.encode()))
            except AttributeError:
                raise ValueError('buf must be str, bytes or an iterable of byte values')
        await self.accumulate(bytes_array)
        return self
# Module-level CRC accumulator; crc_check() resets and reuses it on every call.
x25 = X25crc()
async def crc_check(pack: dict, debug: bool = False) -> bool:
    """
    CRC Checking callback.

    Recomputes the checksum over ``pack['chk']`` followed by the per-message
    extra byte taken from ``formats`` and compares it with ``pack['crc']``.

    :param pack: Parsed packet; the keys ``'chk'``, ``'crc'`` and
        ``'message_id'`` are read.
    :param debug: Print both checksums and the verdict when True.
    :return: True when the checksums match; False on a mismatch or when a
        lookup raises KeyError (for example an unlisted message ID).
    """
    covered = pack['chk']
    expected = pack['crc']
    try:
        await x25.create(covered)
        extra_byte = struct.pack("B", formats[pack['message_id']][1])  # noqa
        await x25.accumulate_str(extra_byte)
    except KeyError:
        if debug:
            print('Skipping CRC for unincluded message_id', pack['message_id'])
        return False

    computed = x25.crc
    matched = bool(expected == computed)
    if debug:
        print(expected, computed)
        print('CRC check passed' if matched else 'CRC check failed')
    return matched
class Packet:
    """
    Construct/deconstruct Mavlink packets.

    ``create_packet()`` builds the wire bytes for one message and ``send()``
    queues them on ``write_buffer``.
    """
    packet = None
    magic = 0xfd  # Start.
    p_len = 0x00  # 0-255 Payload length.
    incompat = 0x00  # 0-255 Incompatibility Flags: https://mavlink.io/en/guide/serialization.html#incompat_flags
    compat = 0x00  # 0-255 Compatibility Flags: https://mavlink.io/en/guide/serialization.html#compat_flags
    psn = 0  # 0-255 Packet sequence number.
    sid = 0x01  # 1-255 System ID (sender).
    cid = 0x01  # 1-255 Component ID (sender): https://mavlink.io/en/messages/common.html#MAV_COMPONENT
    mid = 0x00  # 0-16777215 Message ID (low, middle, high bytes)
    payload: bytes = bytes()  # Payload Max 255 bytes: https://mavlink.io/en/guide/serialization.html#payload
    next_payload: bytes = bytes()  # A place to store payload information that is larger than the allowed size.
    crc = 0xffff  # Checksum (low byte, high byte): https://mavlink.io/en/guide/serialization.html#checksum
    sig = 0x00  # Optional signature: https://mavlink.io/en/guide/message_signing.html
    crc_extra = 0x00
    header = bytes()
    x25 = X25crc()

    def __init__(self):
        self.dummy = None

    async def reset(self):
        """
        Clear all variables to save memory. Also increment the PSN.

        The sequence number wraps back to 0 after 255. ``packet`` and
        ``next_payload`` are left untouched.
        """
        self.p_len = 0x00
        self.incompat = 0x00
        self.compat = 0x00
        self.sid = 0x01
        self.cid = 0x01
        self.mid = 0x00
        self.crc = 0xffff
        self.sig = 0x00
        self.payload = bytes()
        self.crc_extra = 0x00
        self.x25 = X25crc()
        self.header = bytes()
        self.psn += 1
        if self.psn > 255:
            self.psn = 0
        return self

    async def create_header(self):
        """
        Constructs a mavlink 2 header.

        The 24-bit message ID is packed as a little-endian 16-bit low part
        followed by the high byte.
        """
        self.header = struct.pack(
            "<BBBBBBBHB",  # noqa
            self.magic,
            self.p_len,
            self.incompat,
            self.compat,
            self.psn,
            self.sid,
            self.cid,
            self.mid & 0xFFFF,
            self.mid >> 16,
        )
        return self

    async def truncate(self):
        """
        Strip nullbytes from payload.

        Trailing zero bytes are removed, but the first byte is always kept.
        ``p_len`` is updated to the new length.

        :raises TypeError: if ``self.payload`` is not ``bytes``.
        """
        if not isinstance(self.payload, bytes):
            raise TypeError
        self.p_len = len(self.payload)
        while self.p_len > 1 and self.payload[self.p_len - 1] == 0:
            self.p_len -= 1
            await asyncio.sleep(0.0001)  # Yield to other tasks between bytes.
        self.payload = self.payload[:self.p_len]
        return self

    async def assemble(self):
        """
        Assembles a packet for transmission.

        Packs ``self.payload``, builds the header, then appends the checksum
        computed over everything after the magic byte plus the message's extra
        CRC byte from ``formats``.
        """
        await self.create_payload(self.mid, self.payload)
        await self.create_header()
        self.packet = bytes(self.header) + bytes(self.payload)
        await self.x25.create(self.packet[1:])  # The magic byte is not checksummed.
        self.crc_extra = formats[self.mid][1]
        await self.x25.accumulate_str(struct.pack("B", self.crc_extra))  # noqa
        self.crc = self.x25.crc
        self.packet += struct.pack("<H", self.crc)
        return self

    async def create_payload(self, message_id, payload):
        """
        Pack the user payload according to the specified format.

        :param message_id: Key into ``formats`` for the struct format and, for
            IDs <= 255, the field ordering.
        :param payload: Field values as a list or tuple; it is never modified.
        :raises ValueError: if the number of values does not match the format.
        :raises OverflowError: re-raised from ``struct.pack`` after printing
            the offending format and values.
        """
        fields = formats[message_id][0]
        if len(payload) != len(fields):
            print('payload malformed: expected', len(fields), 'arguments, received:', len(payload))
            raise ValueError('payload length does not match message format')
        _format = "<" + fields

        # Re-order into a private list: tuples cannot be assigned into, and the
        # caller's own sequence must not be shuffled behind its back.
        values = list(payload)
        # Decode Order
        if message_id <= 255:
            for i in range(len(payload)):
                values[formats[message_id][2][i]] = payload[i]
                await asyncio.sleep(0.00017)  # Yield to other tasks between fields.

        try:
            self.payload = struct.pack(_format, *values)
        except OverflowError:
            print('Unable to pack payload\n', _format, values)
            raise
        await self.truncate()
        self.p_len = len(self.payload)
        return self

    async def create_packet(
            self,
            m_id: int,
            payload: [list, tuple],
            c_flags: int = 0x00,
            i_flags: int = 0x00,
            s_id: int = 0x01,
            c_id: int = 0x01,
    ) -> bytes:
        """
        Creates a finished mavlink packet that is ready for transmission.

        :param m_id: Message ID.
        :param payload: Field values for the message.
        :param c_flags: Compatibility flags.
        :param i_flags: Incompatibility flags.
        :param s_id: Sender system ID.
        :param c_id: Sender component ID.
        :return: The assembled packet bytes (also stored in ``self.packet``).
        """
        await self.reset()
        self.mid = m_id
        self.cid = c_id
        self.sid = s_id
        self.payload = payload
        self.compat = c_flags
        self.incompat = i_flags
        await self.assemble()
        return self.packet

    async def send(self):
        """
        This will transmit our shiny new packet.

        Appends ``self.packet`` to ``write_buffer`` when one exists, then
        resets the per-packet fields.
        """
        if self.packet:
            write_buffer.append(self.packet)
        await self.reset()
        return self

    @staticmethod
    async def receive():
        """
        Just returns the read buffer.
        """
        return read_buffer
class Heartbeat:
    """
    Send a heartbeat packet.
    """
    packet = Packet()

    def __init__(self):
        self.type = None

    @staticmethod
    async def wait(debug: bool = False) -> True:
        """
        Waits for a heartbeat.

        Polls ``read_buffer`` for messages with ``message_id`` 0, removing every
        one it finds, until one is seen, ``TERM`` is set, or one second has
        elapsed.

        :param debug: Print a banner when a heartbeat is found.
        :return: The ``'increment'`` value of the last heartbeat consumed, or
            False on timeout/termination.

        NOTE(review): a heartbeat whose ``'increment'`` is falsy (e.g. 0) is
        consumed but does not end the wait -- confirm this is intended.
        """
        beat = False
        hold = False
        deadline = monotonic() + 1
        while not beat and not TERM:
            # Index-based scan: entries are deleted while scanning, and other
            # tasks may change the buffer each time this coroutine yields.
            idx = 0
            while idx < len(read_buffer):
                message = read_buffer[idx]
                if message['message_id'] == 0:
                    hold = message['increment']
                    del read_buffer[idx]  # The next entry slides into idx.
                    if debug:
                        print('\n\n\n\nfound heartbeat **************************************************************************\n\n\n\n')
                else:
                    idx += 1
                await asyncio.sleep(0.00013)
            if hold:
                beat = hold
            elif monotonic() > deadline:
                # Read the clock on every pass so the timeout can actually fire.
                print('\n\n\n\nwarning heartbeat timed out **************************************************************************\n\n\n\n')
                break
            await asyncio.sleep(0.0013)
        return beat
class Command:
    """
    This will allow us to send commands and read the ACK.
    """
    packet = Packet()

    def __init__(self, s_id: int, c_id: int):
        self.s_id = s_id
        self.c_id = c_id

    @staticmethod
    async def wait(cmd_id: int, debug: bool = False):
        """
        This will wait for our command ACK response.

        Polls ``read_buffer`` for a message with ``message_id`` 77 whose payload
        equals ``[cmd_id]`` and removes it, until it is found, ``TERM`` is set,
        or one second has elapsed.

        :param cmd_id: Command ID the acknowledgement must carry.
        :param debug: Print banners when the ACK is found or the wait times out.
        :return: True when the ACK was found, otherwise False.
        """
        ack = False
        deadline = monotonic() + 1
        while not ack and not TERM:
            for idx, message in enumerate(read_buffer):
                if message['message_id'] == 77 and message['payload'] == [cmd_id]:
                    del read_buffer[idx]
                    ack = True  # Without this the outer loop could never finish.
                    if debug:
                        print('\n\n\n\nfound ACK **************************************************************************\n\n\n\n')
                    break
                await asyncio.sleep(0.00016)
            if ack:
                break
            # Read the clock on every pass so the timeout can actually fire.
            if monotonic() > deadline:
                if debug:
                    print('\n\n\n\nwarning ACK timed out **************************************************************************\n\n\n\n')
                break
            await asyncio.sleep(0.0011)
        return ack
async def test(_uart):
    """
    This is a simple test method that sends and receives restart commands.

    :param _uart: UART object handed to ``MavLink.io_buffers``.
    """
    async def reboot(*args):  # noqa
        """
        This is a simple example of an incoming command callback function.
        """
        # A real command should evaluate the 7 bytes of param data that will be passed as args.
        import microcontroller  # noqa
        microcontroller.reset()  # noqa
        # Any other command would need to return status information.
        # return 0, 0, 0

    m_id = 246
    m = MavLink([m_id, 253], callbacks={m_id: reboot})

    async def send():
        """Send test command"""
        await m.heartbeat_wait(True)
        await m.send_command(
            command_id=m_id,
            target_system=0,
            target_component=0,
            params=[1, 0, 0, 0, 0, 0, 0],
            debug=True
        )
        print('\n\n\n\n----------------------------- IF YOU SEE THIS THERE HAS BEEN AN ERROR -----------------------------------\n\n\n\n')

    async def main(uart_):
        """
        Test mainloop.
        """
        tasks = await m.io_buffers(uart_, debug=True)
        snd_cmd = asyncio.create_task(send())
        await asyncio.gather(snd_cmd, *tasks)

    # This coroutine already runs inside an event loop, so await the main loop
    # directly instead of starting a second loop with asyncio.run().
    await main(_uart)