src.mavlite module

This module is an attempt at a bare-minimum MAVLink UART communication system. The type codes in TypesList are format characters from Python's struct module: https://docs.python.org/3/library/struct.html

TypesList = {
    "float": "f", "double": "d", "char": "s",
    "int8_t": "b", "int16_t": "h", "int32_t": "l", "int64_t": "q",
    "uint8_t": "B", "uint16_t": "H", "uint32_t": "I", "uint64_t": "Q",
}
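A minimal sketch of how a table like TypesList could be used to build a struct format string. The helper name `build_format` and the `TYPES` copy are hypothetical and not part of the module; the little-endian `<` prefix is assumed because MAVLink payloads are little-endian.

```python
import struct

# Local copy of the TypesList table documented above.
TYPES = {
    "float": "f", "double": "d", "char": "s",
    "int8_t": "b", "int16_t": "h", "int32_t": "l", "int64_t": "q",
    "uint8_t": "B", "uint16_t": "H", "uint32_t": "I", "uint64_t": "Q",
}


def build_format(field_types):
    """Hypothetical helper: join C type names into a little-endian struct format."""
    return "<" + "".join(TYPES[name] for name in field_types)


# Pack one uint32_t followed by two uint8_t values.
fmt = build_format(["uint32_t", "uint8_t", "uint8_t"])
packed = struct.pack(fmt, 7, 1, 2)
```

With the `<` prefix, struct uses standard sizes, so both "l" and "I" occupy 4 bytes regardless of platform.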

class src.mavlite.Command(s_id: int, c_id: int)[source]

Bases: object

This will allow us to send commands and read the ACK.

packet = <src.mavlite.Packet object>
async static wait(cmd_id: int, debug: bool = False)[source]

This will wait for our command ACK response.

class src.mavlite.Heartbeat[source]

Bases: object

Send a heartbeat packet.

packet = <src.mavlite.Packet object>
async static wait(debug: bool = False) -> True[source]

Waits for a heartbeat.

Bases: object

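This is where it all comes together: sending messages and commands, waiting for heartbeats and ACKs, and dispatching incoming commands to callbacks.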

NOTE: When registering callbacks for incoming commands, ensure each callback accepts the 7 command parameters:

callbacks = { 246: shutdown_command_function, <id>: <command> }

On reception, the command parser will run:

ack_payload = shutdown_command_function(command_parameters_seven_long)

send_ack(ack_payload)

The ack_payload body has the form:

[<result>, <progress>, <result_param2>]
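The callback contract above can be sketched as follows. This is an illustration only: it assumes the seven parameters arrive as separate positional arguments, and the `dispatch` helper is hypothetical rather than the module's command parser. The result codes 0 and 3 are MAV_RESULT_ACCEPTED and MAV_RESULT_UNSUPPORTED from the MAVLink common message set.

```python
# Hypothetical callback for command 246; the body is a placeholder.
def shutdown_command_function(p1, p2, p3, p4, p5, p6, p7):
    # ... act on the parameters here ...
    # Return [result, progress, result_param2]; 0 is MAV_RESULT_ACCEPTED.
    return [0, 100, 0]


callbacks = {246: shutdown_command_function}


def dispatch(command_id, params):
    """Hypothetical dispatcher: look up the callback and build the ACK body."""
    handler = callbacks.get(command_id)
    if handler is None:
        return [3, 0, 0]  # 3 is MAV_RESULT_UNSUPPORTED
    return handler(*params)


ack_payload = dispatch(246, (1, 0, 0, 0, 0, 0, 0))
unknown_payload = dispatch(9999, (0, 0, 0, 0, 0, 0, 0))
```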

ack = None
async ack_wait(command_id: int, debug: bool = False)[source]

This will wait for a command ACK up to a specific timeout.
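One way to wait for an ACK with a timeout is sketched below using the standard asyncio library. The queue of decoded ACK tuples and the function name `wait_for_ack` are assumptions made for illustration; the module's own buffering and runtime may differ.

```python
import asyncio


async def wait_for_ack(queue, command_id, timeout=1.0):
    """Hypothetical: return the first ACK tuple for command_id, or None on timeout."""

    async def _match():
        while True:
            ack = await queue.get()
            if ack[0] == command_id:  # ack[0] is the acknowledged command id
                return ack

    try:
        return await asyncio.wait_for(_match(), timeout)
    except asyncio.TimeoutError:
        return None


async def _demo():
    queue = asyncio.Queue()
    await queue.put((246, 0, 100, 0))  # (command, result, progress, result_param2)
    hit = await wait_for_ack(queue, 246, timeout=0.1)
    miss = await wait_for_ack(queue, 511, timeout=0.05)
    return hit, miss


hit, miss = asyncio.run(_demo())
```

ACKs for other command ids are discarded in this sketch; a real implementation would probably want to keep them for other waiters.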

async command_parser(debug: bool = False)[source]

This will collect incoming commands and fire off their respective callbacks.

confirmation = 0
async heartbeat_wait(debug: bool = False)[source]

Wait for heartbeat packet.

async io_buffers(uart: any, debug: bool = False)[source]

This will return the tasks required to handle UART I/O via our read and write buffers.

async receive()[source]

Read from the incoming buffer… We need to break this out into receive_message and receive_command.

Decode COMMAND_ACK: struct.unpack("<HBBiBB", bytes(payload))

Fields, in order: command | result | progress | result_param2 | target_system | target_component

https://mavlink.io/en/messages/common.html#MAV_RESULT
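The COMMAND_ACK decode step can be tried in isolation. The sketch below builds a sample payload with the same format string and unpacks it again; the sample values are made up for illustration.

```python
import struct

COMMAND_ACK_FORMAT = "<HBBiBB"

# Sample payload: command 246, result 0, progress 100,
# result_param2 0, target_system 1, target_component 1.
payload = struct.pack(COMMAND_ACK_FORMAT, 246, 0, 100, 0, 1, 1)

(command, result, progress,
 result_param2, target_system, target_component) = struct.unpack(
    COMMAND_ACK_FORMAT, bytes(payload)
)
```

The format covers 10 bytes: a uint16, two uint8 values, an int32, and two more uint8 values.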

async send_command(command_id: int, target_system: int = 1, target_component: int = 1, params: (int, int, int, int, int, int, int) = (0, 0, 0, 0, 0, 0, 0), s_id: int = 1, c_id: int = 158, c_flags: int = 0, i_flags: int = 0, debug: bool = False) -> Packet[source]

Send a command with a 7-parameter payload and wait for the ACK.
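A sketch of how a command payload might be packed, assuming the command is carried as a MAVLink COMMAND_LONG message (seven float parameters, then command, target_system, target_component, and confirmation on the wire). The helper `pack_command_long` is hypothetical and this page does not confirm that the module uses this exact layout.

```python
import struct

# Assumed COMMAND_LONG wire layout: 7 floats, uint16 command, 3 uint8 fields.
COMMAND_LONG_FORMAT = "<7fHBBB"


def pack_command_long(command_id, params=(0, 0, 0, 0, 0, 0, 0),
                      target_system=1, target_component=1, confirmation=0):
    """Hypothetical helper: pack a COMMAND_LONG payload."""
    if len(params) != 7:
        raise ValueError("params must contain exactly 7 values")
    return struct.pack(COMMAND_LONG_FORMAT, *params, command_id,
                       target_system, target_component, confirmation)


payload = pack_command_long(246, (1, 0, 0, 0, 0, 0, 0))
fields = struct.unpack(COMMAND_LONG_FORMAT, payload)
```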

async send_message(message_id: int, payload: [list, tuple], s_id: int = 1, c_id: int = 158, c_flags: int = 0, i_flags: int = 0, debug: bool = False) -> Packet[source]

Formats an outgoing message into a MAVLink packet.

term = False
class src.mavlite.Packet[source]

Bases: object

Construct/deconstruct MAVLink packets.

async assemble()[source]

Assembles a packet for transmission.

cid = 1
compat = 0
crc = 65535
crc_extra = 0
async create_header()[source]

Constructs a MAVLink 2 header.
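For reference, a MAVLink 2 header is 10 bytes: magic (0xFD), payload length, incompatibility flags, compatibility flags, packet sequence number, system id, component id, and a 3-byte little-endian message id. The sketch below builds one with struct; the function name `build_header` is hypothetical, and the argument names mirror the Packet attributes on this page only for readability.

```python
import struct


def build_header(p_len, psn, sid, cid, mid, incompat=0, compat=0, magic=0xFD):
    """Hypothetical helper: build a 10-byte MAVLink 2 header."""
    fixed = struct.pack("<BBBBBBB", magic, p_len, incompat, compat,
                        psn & 0xFF, sid, cid)  # sequence number wraps at 255
    return fixed + mid.to_bytes(3, "little")   # message id is 24 bits


header = build_header(p_len=9, psn=0, sid=1, cid=158, mid=0)
```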

async create_packet(m_id: int, payload: [list, tuple], c_flags: int = 0, i_flags: int = 0, s_id: int = 1, c_id: int = 1) -> bytes[source]

Creates a finished MAVLink packet that is ready for transmission.

async create_payload(message_id, payload)[source]

Pack the user payload according to the specified format.

header = b''
incompat = 0
magic = 253
mid = 0
next_payload: bytes = b''
p_len = 0
packet = None
payload: bytes = b''
psn = 0
async static receive()[source]

Just returns the read buffer.

async reset()[source]

Clear all variables to save memory. Also increment the PSN.

async send()[source]

This will transmit our shiny new packet.

sid = 1
sig = 0
async truncate()[source]

Strip null bytes from the payload.
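MAVLink 2 payload truncation removes trailing zero bytes before transmission but never removes the first payload byte. A minimal sketch of that rule follows; `truncate_payload` is a hypothetical stand-in, and the module's own method may behave differently.

```python
def truncate_payload(payload: bytes) -> bytes:
    """Hypothetical helper: drop trailing zero bytes, keeping at least one byte."""
    stripped = payload.rstrip(b"\x00")
    # An all-zero payload keeps its first byte.
    return stripped if stripped else payload[:1]


short = truncate_payload(b"\x01\x00\x02\x00\x00")
all_zero = truncate_payload(b"\x00\x00\x00")
```

Only trailing zeros are removed; zero bytes in the middle of the payload carry field data and must stay.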

x25 = <src.mavlite.X25crc object>
class src.mavlite.X25crc[source]

Bases: object

Improved CRC code.

async accumulate(buf)[source]

Add in rolling byte-chunks.

async accumulate_str(buf: [str, bytes])[source]

Add in rolling byte-chunks.

async create(buf=None)[source]

Perform CRC creation actions independent of class init.
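The checksum MAVLink uses is commonly called X.25 or CRC-16/MCRF4XX: it starts at 0xFFFF (matching the crc = 65535 default on Packet) and is updated one byte at a time. The following is a plain synchronous sketch of that accumulation, not the class's actual code; the function name `x25_accumulate` is hypothetical.

```python
def x25_accumulate(data: bytes, crc: int = 0xFFFF) -> int:
    """Hypothetical helper: fold data into a running CRC-16/MCRF4XX value."""
    for byte in data:
        tmp = byte ^ (crc & 0xFF)
        tmp = (tmp ^ (tmp << 4)) & 0xFF
        crc = ((crc >> 8) ^ (tmp << 8) ^ (tmp << 3) ^ (tmp >> 4)) & 0xFFFF
    return crc


# The CRC can be built up in chunks by passing the running value back in.
whole = x25_accumulate(b"123456789")
rolling = x25_accumulate(b"6789", x25_accumulate(b"12345"))
```

Passing the previous result as the starting value is what makes the "rolling byte-chunks" style of accumulation possible.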

async src.mavlite.crc_check(pack: dict, debug: bool = False) -> bool[source]

CRC Checking callback.

async src.mavlite.decode_payload(message_id: int, payload: list, format_override: [None, str] = None, debug: bool = False)[source]

Uses the indexed format to decode the contents of an incoming payload.
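A sketch of format-indexed decoding, under stated assumptions: the `FORMATS` table and the `decode` function are hypothetical, and only HEARTBEAT (message id 0) is listed, using its wire layout of one uint32 followed by five uint8 fields. Because MAVLink 2 may truncate trailing zero bytes, the sketch pads the payload back to the full struct size before unpacking.

```python
import struct

# Hypothetical index of message id -> struct format.
FORMATS = {0: "<IBBBBB"}  # HEARTBEAT: custom_mode, type, autopilot,
                          # base_mode, system_status, mavlink_version


def decode(message_id, payload, format_override=None):
    """Hypothetical helper: unpack a payload using its indexed format."""
    fmt = format_override or FORMATS[message_id]
    # Restore any trailing zero bytes removed by payload truncation.
    raw = bytes(payload).ljust(struct.calcsize(fmt), b"\x00")
    return struct.unpack(fmt, raw)


full = decode(0, [0, 0, 0, 0, 2, 3, 81, 4, 3])
truncated = decode(0, b"\x05")
override = decode(0, b"\x01\x02", format_override="<BB")
```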

async src.mavlite.test(_uart)[source]

This is a simple test coroutine that sends and receives restart commands.