异步串口通信与文件压缩,Python的无缝结合

阿琳的代码小屋 2025-03-16 08:00:30

轻松实现数据存储与传输的完美融合

在如今的编程世界里,Python 是一个备受欢迎的编程语言,因其简洁易懂的语法和强大的库支持而深受开发者喜爱。今天要聊的是两个 Python 库,pyserial-asyncio 和 zipfile。pyserial-asyncio 用于处理非阻塞串口通信,特别适合需要实时数据处理的应用。而 zipfile 则是一个创建、读取、写入 ZIP 文件的库,方便进行文件的压缩和解压。将这两个库结合起来,可以实现数据的异步传输与高效存储,接下来我将具体介绍这个组合的功能和实现方式。

当我们把 pyserial-asyncio 和 zipfile 这两个库放在一起使用时,能搞定很多具有挑战性的项目。比如,我们可以实现异步读取串口数据并将其压缩存储;构建实时数据监测系统,实时采集数据并打包保存;以及创建数据备份工具,周期性将实时数据压缩存储。这里,我将给出几个示例代码,帮助大家更好地理解。

首先,我们看一个异步读取串口数据并将其压缩存储的例子。这个例子读取传感器的数据,并将这些数据实时压缩成一个 ZIP 文件。

import asyncioimport serial_asyncioimport zipfileimport osclass SerialReader(asyncio.Protocol):    def __init__(self):        self.buffer = []        def connection_made(self, transport):        self.transport = transport    def data_received(self, data):        self.buffer.append(data.decode())        print(f"Received: {data.decode()}")    def connection_lost(self, exc):        passasync def read_serial_data(port, baudrate):    loop = asyncio.get_running_loop()    reader = SerialReader()    await serial_asyncio.create_serial_connection(loop, lambda: reader, port, baudrate)async def compress_data(zip_filename, data):    with zipfile.ZipFile(zip_filename, 'w') as zip_file:        zip_file.writestr('sensor_data.txt', '\n'.join(data).encode())async def main():    port = '/dev/ttyUSB0'    baudrate = 9600    await read_serial_data(port, baudrate)    # 假设读取了一段数据    data = ['Sensor reading 1', 'Sensor reading 2', 'Sensor reading 3']        # 压缩数据    await compress_data('sensor_data.zip', data)if __name__ == '__main__':    asyncio.run(main())

这里,SerialReader 负责连接和读取串口数据,然后也会将读取到的数据存入一个列表。compress_data 函数则将这些数据压缩成一个 ZIP 文件。在实际应用中,可以通过不断读取数据并定期压缩存档,从而保持占用空间的可控。

接下来,我们来看一个实时数据监测系统的例子,这个系统会在每次获取数据时发送通知,并将数据打包保存。

import asyncioimport serial_asyncioimport zipfileimport smtplibfrom email.mime.text import MIMETextclass SerialReader(asyncio.Protocol):    def __init__(self):        self.buffer = []    def connection_made(self, transport):        self.transport = transport    def data_received(self, data):        self.buffer.append(data.decode())        print(f"Received: {data.decode()}")        asyncio.create_task(send_notification(data.decode()))    def connection_lost(self, exc):        passasync def send_notification(message):    sender = 'your_email@example.com'    receiver = 'receiver_email@example.com'    msg = MIMEText(f"New data received: {message}")    msg['Subject'] = 'Sensor Data Notification'    msg['From'] = sender    msg['To'] = receiver    with smtplib.SMTP('smtp.example.com') as server:        server.login('username', 'password')        server.sendmail(sender, [receiver], msg.as_string())async def compress_data(zip_filename, data):    with zipfile.ZipFile(zip_filename, 'w') as zip_file:        zip_file.writestr('sensor_data.txt', '\n'.join(data).encode())async def main():    port = '/dev/ttyUSB0'    baudrate = 9600    await read_serial_data(port, baudrate)        # 假设读取了一段数据    data = ['Sensor reading 1', 'Sensor reading 2']        # 压缩数据到 ZIP    await compress_data('sensor_data.zip', data)if __name__ == '__main__':    asyncio.run(main())

在这个例子中,SerialReader 读取数据后,会立即调用 send_notification 函数,通过邮件通知用户新数据已收到。在应用中,用户能够实时了解传感器的数据变化。同时,这个程序在列表中保存读取的数据,并进行压缩,方便后续使用。

最后,我们看一个数据备份工具的示例,这个工具会定期将读取的数据进行压缩存储,防止数据丢失。

import asyncioimport serial_asyncioimport zipfileimport osclass SerialReader(asyncio.Protocol):    def __init__(self):        self.buffer = []    def connection_made(self, transport):        self.transport = transport    def data_received(self, data):        self.buffer.append(data.decode())        print(f"Received: {data.decode()}")    def connection_lost(self, exc):        passasync def compress_data(zip_filename, data):    with zipfile.ZipFile(zip_filename, 'a') as zip_file:        zip_file.writestr('sensor_data.txt', '\n'.join(data).encode())async def backup_data_periodically(interval, port, baudrate):    while True:        await asyncio.sleep(interval)        await compress_data('sensor_data_backup.zip', reader.buffer)        reader.buffer.clear()  # 清空缓冲区async def read_serial_data(port, baudrate):    loop = asyncio.get_running_loop()    reader = SerialReader()    await serial_asyncio.create_serial_connection(loop, lambda: reader, port, baudrate)async def main():    port = '/dev/ttyUSB0'    baudrate = 9600    await read_serial_data(port, baudrate)    await backup_data_periodically(60, port, baudrate) if __name__ == '__main__':    asyncio.run(main())

在这个例子中,backup_data_periodically 每隔 60 秒就会将读取到的数据压缩存储到一个 ZIP 文件中,这样在处理大量数据时就不会丢失。程序中的 reader.buffer.clear() 可以将缓冲区清空,确保下次备份时不会重复存储。

使用这两个库时,有几个可能遇到的问题。可能会遇到串口连接失败,解决方法是检查串口设备是否连接正确;也可能在压缩文件时遇到权限问题,确保程序有写入文件的权限;还需关注数据的丢失问题,确保数据在压缩前都已正确读取。还有,读取速度与压缩频率需要合理安排,以防止程序阻塞或过多开销。

这些示例展示了如何结合 pyserial-asyncio 和 zipfile 来实现高效的项目功能。在使用中,也欢迎大家留言提问或分享经验,期待大家能在这条路上一起进步!你能创造出更多有趣的应用,相信整个过程会充满乐趣。学习编程的过程中,思考和动手实践最重要,祝你在 Python 的旅程中不断发现新乐趣!

0 阅读:0