"""# Write your code here :-)"""
import struct
try:
import asyncio
except ImportError:
import uasyncio as asyncio # noqa
magic = [253]
[docs]class UART:
# noinspection GrazieInspection
"""
Wrapper class for machine.UART and busio.UART
"""
try:
mp = True
from machine import UART as uart # noqa
except ImportError:
mp = False
from busio import UART as uart # noqa
def __init__(
self,
s_id: int = 1,
baudrate: int = 57600,
bits: int = 8,
parity: [None, int] = None,
stop: int = 1,
timeout: int = 0,
rxbuf: int = 64,
tx: any = None,
rx: any = None
):
if self.mp:
self.uart = self.uart(s_id, baudrate)
self.uart.init(baudrate, bits=bits, parity=parity, stop=stop, timeout=timeout, tx=tx, rx=rx)
else:
self.uart = self.uart(
tx, rx,
baudrate=baudrate,
bits=bits,
parity=parity,
stop=stop,
timeout=timeout,
receiver_buffer_size=rxbuf
)
[docs] async def read(self, n_bytes: int = 8) -> uart.read:
"""
Read wrapper.
"""
if self.mp:
result = None
Timeout = 3000
while result is None and Timeout:
result = self.uart.read(n_bytes)
Timeout += 1
await asyncio.sleep(0.0002)
return result
else:
return self.uart.read(n_bytes)
[docs] async def readinto(self, buf: bytes) -> uart.readinto:
"""
Readinto wrapper.
"""
return self.readinto(buf)
[docs] async def readline(self) -> uart.readline:
"""
Readline wrapper.
"""
return self.uart.readline()
[docs] async def write(self, buf: [str, bytes]) -> uart.write:
"""
Write wrapper
"""
if not isinstance(buf, bytes):
buf = buf.encode()
return self.uart.write(buf)
[docs] async def deinit(self) -> uart.deinit:
"""
Deinit wrapper.
"""
return self.uart.deinit()
[docs] async def setbaudrate(self, baudrate: int):
"""
Set baudrate wrapper.
"""
if self.mp:
return self.uart.setbaudrate(baudrate)
else:
self.uart.baudrate = baudrate
return self.uart
[docs] async def any(self):
"""
Any wrapper.
"""
if self.mp:
result = self.uart.any()
else:
result = self.uart.in_waiting
if result:
print('bytes waiting', result)
return result
TERM = False
stream = list()
packets = list()
read_buffer = list()
write_buffer = list()
buffer_size = 8
[docs]async def uart_read(_uart: any = None, callback: any = None, debug: bool = False) -> list:
"""
Uart packet listener.
TODO: This needs to take into account message includes so we don't spend all this processing time decoding every
packet.
"""
global stream
global packets
global read_buffer
partial_buff = list()
raw = await _uart.read(64)
if raw:
data = list(raw) # read up to 64 bytes
skip = 0
if data:
for idx, byte in enumerate(range(len(data))):
if data[byte] in magic and not skip: # Check for start condition.
try: # Don't check for another magic byte until after the end of this packet.
p_len = data[idx + 1]
min_length = 12 + p_len
# TODO: We will need to add 13 bytes here if the communication is signed.
skip = min_length
except IndexError:
pass
if stream: # If we have a large partial stored from the last read iteration, finish it.
stream.extend(partial_buff)
packets.append(stream)
elif partial_buff: # If the packets are smaller than the read buffer, handle them independently.
packets.append(partial_buff)
partial_buff = list()
stream = list()
partial_buff.append(data[byte])
if skip:
skip -= 1
await asyncio.sleep(0.0015)
stream.extend(partial_buff)
if len(packets):
for idx, p in enumerate(packets):
try:
if 12 <= len(p) <= 280: # 12 is the minimum MavLink packet length.
if debug:
print('--------------------')
try:
message_id = struct.unpack("H", bytes(p[7:9]))[0]
except (RuntimeError, ValueError) as err:
print('Unable to unpack message_id', err, '\n', p[7:9])
del packets[idx] # Clear memory.
break
pay_end = 10 + p[1]
payload = p[10:pay_end]
try: # Prevent a failure from snowballing into logic down the line.
crc = struct.unpack("H", bytes(p[pay_end:pay_end + 2]))[0]
except RuntimeError as err:
print(err, 'unable to decode crc\n', p, '\n', bytes(p))
del packets[idx] # Clear memory.
break # Prevent a failure from snowballing into logic down the line.
chk = p[1:pay_end]
msg = f'start {p[0]}, length {p[1]}, incompat {p[2]}, compat {p[3]}, seq {p[4]}, sys_id {p[5]}, '
msg += f'comp_id {p[6]}, mes_id {message_id}, '
msg += f'payload {payload}, crc {crc}, \nraw {bytes(p)}'
pack = {
'message_id': message_id,
'system_id': p[5],
'component_id': p[6],
'payload': payload,
'increment': p[4],
'crc': crc,
'chk': chk,
'raw': bytes(p)
}
if callback: # For CRC checking.
if await callback(pack, debug):
read_buffer.append(pack)
read_buffer = read_buffer[-buffer_size:]
else:
msg = f'dropping packet: {p}'
del packets[idx] # Clear memory.
except IndexError:
msg = f'bad_data {p}'
del packets[idx] # Clear memory.
if debug:
print('--------------------\n', 'receiving', msg)
await asyncio.sleep(0.002)
packets = list()
else:
await asyncio.sleep(0.001)
return read_buffer
[docs]async def uart_write(_uart: any, debug: bool = False):
"""
Writes our buffer to the device
"""
global write_buffer
if len(write_buffer):
for idx, packet in enumerate(write_buffer):
if debug:
p = list(packet)
pay_end = 10 + p[1]
msg = f'start {p[0]}, length {p[1]}, incompat {p[2]}, compat {p[3]}, seq {p[4]}, sys_id {p[5]}, '
msg += f'comp_id {p[6]}, mes_id {struct.unpack("H", bytes(p[7:9]))[0]}, '
msg += f'payload {p[10:pay_end]}, crc {struct.unpack("H", bytes(p[pay_end:pay_end + 2]))[0]}'
msg += f'\nraw {bytes(p)}'
print('--------------------\n', 'sending', msg)
await _uart.write(bytes(packet))
del write_buffer[idx]
await asyncio.sleep(0.0017)
else:
await asyncio.sleep(0.023)
return write_buffer
[docs]async def uart_io(_uart: any, callback: any = None, debug: bool = False):
"""
Mainloop for this operation.
"""
await uart_write(_uart, debug)
await uart_read(_uart, callback, debug)
[docs]def test():
"""
Run a test loop
"""
import board # noqa
import asyncio
async def loop():
"""
A while clause...
"""
_uart = UART(tx=board.TX, rx=board.RX, baudrate=115200)
while True:
try:
await uart_io(_uart, True)
except KeyboardInterrupt:
break
await asyncio.sleep(0.00014)
asyncio.run(loop())