IoT 设备开发指南
概述
py-xiaozhi 项目采用基于 Thing Pattern 的 IoT 设备架构,提供统一的设备抽象和管理接口。所有设备都继承自 Thing 基类,通过 ThingManager 进行集中管理。该架构支持异步操作、类型安全的参数处理和状态管理。
重要说明: 当前项目正在从 IoT 设备模式向 MCP (Model Context Protocol) 工具模式迁移,部分设备功能已迁移到 MCP 工具系统。
核心架构
目录结构
src/iot/
├── thing.py # 核心基类和工具类
│ ├── Thing # 设备抽象基类
│ ├── Property # 设备属性类
│ ├── Method # 设备方法类
│ ├── Parameter # 方法参数类
│ └── ValueType # 参数类型枚举
├── thing_manager.py # 设备管理器
│ └── ThingManager # 单例设备管理器
└── things/ # 设备实现
├── lamp.py # 灯光设备
├── speaker.py # 音响设备
├── music_player.py # 音乐播放器
├── countdown_timer.py # 倒计时器
└── CameraVL/ # 摄像头设备
├── Camera.py
└── VL.py
核心组件
1. Thing 基类
Thing 是所有 IoT 设备的抽象基类,提供统一的接口规范:
python
from src.iot.thing import Thing, Parameter, ValueType
class Thing:
def __init__(self, name: str, description: str):
self.name = name
self.description = description
self.properties = {}
self.methods = {}
def add_property(self, name: str, description: str, getter: Callable)
def add_method(self, name: str, description: str, parameters: List[Parameter], callback: Callable)
async def get_descriptor_json(self) -> str
async def get_state_json(self) -> str
async def invoke(self, command: dict) -> dict
关键要求:
- 所有属性 getter 函数必须是异步的 (
async def
) - 所有方法 callback 函数必须是异步的
- 设备名称必须全局唯一
2. Property 属性系统
Property 用于定义设备的可读状态,支持自动类型推断:
python
class Property:
def __init__(self, name: str, description: str, getter: Callable):
# 强制要求getter必须是异步函数
if not inspect.iscoroutinefunction(getter):
raise TypeError(f"Property getter for '{name}' must be an async function.")
支持的属性类型:
boolean
: 布尔值number
: 整数string
: 字符串float
: 浮点数array
: 数组object
: 对象
3. Method 方法系统
Method 用于定义设备的可执行操作:
python
class Method:
def __init__(self, name: str, description: str, parameters: List[Parameter], callback: Callable):
# 强制要求回调函数必须是异步函数
if not inspect.iscoroutinefunction(callback):
raise TypeError(f"Method callback for '{name}' must be an async function.")
4. Parameter 参数系统
Parameter 定义方法的参数规范:
python
class Parameter:
def __init__(self, name: str, description: str, type_: str, required: bool = True):
self.name = name
self.description = description
self.type = type_
self.required = required
def get_value(self):
return self.value
支持的参数类型:
ValueType.BOOLEAN
: 布尔值ValueType.NUMBER
: 整数ValueType.STRING
: 字符串ValueType.FLOAT
: 浮点数ValueType.ARRAY
: 数组ValueType.OBJECT
: 对象
5. ThingManager 管理器
ThingManager 采用单例模式,负责设备的注册、状态管理和方法调用:
python
from src.iot.thing_manager import ThingManager
class ThingManager:
@classmethod
def get_instance(cls):
if cls._instance is None:
cls._instance = ThingManager()
return cls._instance
def add_thing(self, thing: Thing)
async def get_states_json(self, delta: bool = False) -> Tuple[bool, str]
async def get_descriptors_json(self) -> str
async def invoke(self, command: dict) -> dict
核心功能:
- 设备注册和管理
- 状态缓存和增量更新
- 方法调用分发
- 设备描述信息查询
设备实现模式
1. 基础设备 - Lamp
最简单的设备实现模式:
python
import asyncio
from src.iot.thing import Thing
class Lamp(Thing):
def __init__(self):
super().__init__("Lamp", "一个测试用的灯")
self.power = False
# 注册属性 - getter必须是异步函数
self.add_property("power", "灯是否打开", self.get_power)
# 注册方法 - callback必须是异步函数
self.add_method("TurnOn", "打开灯", [], self._turn_on)
self.add_method("TurnOff", "关闭灯", [], self._turn_off)
async def get_power(self):
return self.power
async def _turn_on(self, params):
self.power = True
return {"status": "success", "message": "灯已打开"}
async def _turn_off(self, params):
self.power = False
return {"status": "success", "message": "灯已关闭"}
2. 带参数设备 - Speaker
带参数验证的设备实现:
python
import asyncio
from src.iot.thing import Thing, Parameter, ValueType
from src.utils.volume_controller import VolumeController
class Speaker(Thing):
def __init__(self):
super().__init__("Speaker", "当前 AI 机器人的扬声器")
# 初始化音量控制器
self.volume_controller = None
try:
if VolumeController.check_dependencies():
self.volume_controller = VolumeController()
self.volume = self.volume_controller.get_volume()
else:
self.volume = 70
except Exception:
self.volume = 70
# 注册属性
self.add_property("volume", "当前音量值", self.get_volume)
# 注册带参数的方法
self.add_method(
"SetVolume",
"设置音量",
[Parameter("volume", "0到100之间的整数", ValueType.NUMBER, True)],
self._set_volume,
)
async def get_volume(self):
if self.volume_controller:
try:
self.volume = self.volume_controller.get_volume()
except Exception:
pass
return self.volume
async def _set_volume(self, params):
# 从Parameter对象获取值
volume = params["volume"].get_value()
# 参数验证
if not (0 <= volume <= 100):
raise ValueError("音量必须在0-100之间")
self.volume = volume
try:
if self.volume_controller:
# 异步调用系统API
await asyncio.to_thread(self.volume_controller.set_volume, volume)
return {"success": True, "message": f"音量已设置为: {volume}"}
except Exception as e:
return {"success": False, "message": f"设置音量失败: {e}"}
3. 复杂设备 - CountdownTimer
异步任务管理的设备实现:
python
import asyncio
import json
from typing import Dict
from asyncio import Task
from src.iot.thing import Thing, Parameter
from src.iot.thing_manager import ThingManager
class CountdownTimer(Thing):
def __init__(self):
super().__init__("CountdownTimer", "一个用于延迟执行命令的倒计时器")
# 任务管理
self._timers: Dict[int, Task] = {}
self._next_timer_id = 0
self._lock = asyncio.Lock()
# 注册方法
self.add_method(
"StartCountdown",
"启动一个倒计时,结束后执行指定命令",
[
Parameter("command", "要执行的IoT命令 (JSON格式字符串)", "string", True),
Parameter("delay", "延迟时间(秒),默认为5秒", "integer", False)
],
self._start_countdown,
)
self.add_method(
"CancelCountdown",
"取消指定的倒计时",
[Parameter("timer_id", "要取消的计时器ID", "integer", True)],
self._cancel_countdown,
)
async def _start_countdown(self, params_dict):
# 处理必需参数
command_param = params_dict.get("command")
command_str = command_param.get_value() if command_param else None
if not command_str:
return {"status": "error", "message": "缺少 'command' 参数值"}
# 处理可选参数
delay_param = params_dict.get("delay")
delay = (
delay_param.get_value()
if delay_param and delay_param.get_value() is not None
else 5
)
# 验证命令格式
try:
json.loads(command_str)
except json.JSONDecodeError:
return {"status": "error", "message": "命令格式错误,无法解析JSON"}
# 创建异步任务
async with self._lock:
timer_id = self._next_timer_id
self._next_timer_id += 1
task = asyncio.create_task(
self._delayed_execution(delay, timer_id, command_str)
)
self._timers[timer_id] = task
return {
"status": "success",
"message": f"倒计时 {timer_id} 已启动,将在 {delay} 秒后执行",
"timer_id": timer_id
}
async def _delayed_execution(self, delay: int, timer_id: int, command_str: str):
try:
await asyncio.sleep(delay)
# 执行命令
command_dict = json.loads(command_str)
thing_manager = ThingManager.get_instance()
result = await thing_manager.invoke(command_dict)
print(f"倒计时 {timer_id} 执行结果: {result}")
except asyncio.CancelledError:
print(f"倒计时 {timer_id} 被取消")
finally:
async with self._lock:
self._timers.pop(timer_id, None)
async def _cancel_countdown(self, params_dict):
timer_id_param = params_dict.get("timer_id")
timer_id = timer_id_param.get_value() if timer_id_param else None
if timer_id is None:
return {"status": "error", "message": "缺少 'timer_id' 参数值"}
async with self._lock:
if timer_id in self._timers:
task = self._timers.pop(timer_id)
task.cancel()
return {"status": "success", "message": f"倒计时 {timer_id} 已取消"}
else:
return {"status": "error", "message": "计时器不存在"}
4. 多线程设备 - Camera
集成多线程和外部服务的设备实现:
python
import threading
import base64
import cv2
from src.iot.thing import Thing
from src.iot.things.CameraVL.VL import ImageAnalyzer
class Camera(Thing):
def __init__(self):
super().__init__("Camera", "摄像头管理")
self.cap = None
self.is_running = False
self.camera_thread = None
self.result = ""
# 初始化VL分析器
self.VL = ImageAnalyzer.get_instance()
# 注册属性
self.add_property("power", "摄像头是否打开", self.get_power)
self.add_property("result", "识别画面的内容", self.get_result)
# 注册方法
self.add_method("start_camera", "打开摄像头", [], self.start_camera)
self.add_method("stop_camera", "关闭摄像头", [], self.stop_camera)
self.add_method("capture_frame_to_base64", "识别画面", [], self.capture_frame_to_base64)
async def get_power(self):
return self.is_running
async def get_result(self):
return self.result
async def start_camera(self, params):
if self.camera_thread and self.camera_thread.is_alive():
return {"status": "info", "message": "摄像头已在运行"}
self.camera_thread = threading.Thread(target=self._camera_loop, daemon=True)
self.camera_thread.start()
return {"status": "success", "message": "摄像头已启动"}
async def stop_camera(self, params):
self.is_running = False
if self.camera_thread:
self.camera_thread.join()
self.camera_thread = None
return {"status": "success", "message": "摄像头已停止"}
async def capture_frame_to_base64(self, params):
if not self.cap or not self.cap.isOpened():
return {"status": "error", "message": "摄像头未打开"}
ret, frame = self.cap.read()
if not ret:
return {"status": "error", "message": "无法读取画面"}
# 转换为base64
_, buffer = cv2.imencode('.jpg', frame)
frame_base64 = base64.b64encode(buffer).decode('utf-8')
# 使用VL分析器识别画面
self.result = str(self.VL.analyze_image(frame_base64))
return {"status": "success", "result": self.result}
def _camera_loop(self):
self.cap = cv2.VideoCapture(0)
if not self.cap.isOpened():
return
self.is_running = True
while self.is_running:
ret, frame = self.cap.read()
if not ret:
break
cv2.imshow("Camera", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
self.is_running = False
self.cap.release()
cv2.destroyAllWindows()
设备注册和管理
1. 设备注册
在应用程序启动时注册设备:
python
from src.iot.thing_manager import ThingManager
from src.iot.things.lamp import Lamp
from src.iot.things.speaker import Speaker
def initialize_iot_devices():
# 获取设备管理器实例
manager = ThingManager.get_instance()
# 注册设备
manager.add_thing(Lamp())
manager.add_thing(Speaker())
print(f"已注册 {len(manager.things)} 个设备")
2. 设备状态查询
python
# 获取所有设备状态
changed, states = await manager.get_states_json(delta=False)
print(f"设备状态: {states}")
# 获取变化的状态(增量更新)
changed, delta_states = await manager.get_states_json(delta=True)
if changed:
print(f"状态变化: {delta_states}")
3. 设备方法调用
python
# 调用设备方法
command = {
"name": "Lamp",
"method": "TurnOn",
"parameters": {}
}
result = await manager.invoke(command)
print(f"执行结果: {result}")
# 带参数的方法调用
command = {
"name": "Speaker",
"method": "SetVolume",
"parameters": {"volume": 80}
}
result = await manager.invoke(command)
开发最佳实践
1. 异步编程
所有属性 getter 和方法 callback 必须是异步函数:
python
# 正确的异步属性
async def get_power(self):
return self.power
# 正确的异步方法
async def turn_on(self, params):
self.power = True
return {"status": "success"}
# 错误:同步函数会抛出异常
def get_power(self): # TypeError!
return self.power
2. 参数处理
正确处理必需和可选参数:
python
async def my_method(self, params):
# 处理必需参数
required_value = params["required_param"].get_value()
# 处理可选参数
optional_value = None
if "optional_param" in params:
optional_value = params["optional_param"].get_value()
# 参数验证
if not isinstance(required_value, str):
return {"status": "error", "message": "参数类型错误"}
return {"status": "success", "result": required_value}
3. 错误处理
实现适当的错误处理:
python
async def risky_operation(self, params):
try:
# 执行可能失败的操作
result = await self.perform_operation()
return {"status": "success", "result": result}
except ValueError as e:
return {"status": "error", "message": f"参数错误: {e}"}
except Exception as e:
logger.error(f"操作失败: {e}", exc_info=True)
return {"status": "error", "message": "操作失败"}
4. 资源管理
正确管理设备资源:
python
class MyDevice(Thing):
def __init__(self):
super().__init__("MyDevice", "我的设备")
self.resource = None
self._lock = asyncio.Lock()
async def acquire_resource(self, params):
async with self._lock:
if self.resource is None:
self.resource = await self.create_resource()
return {"status": "success"}
async def cleanup(self):
"""设备清理方法"""
if self.resource:
await self.resource.close()
self.resource = None
5. 日志记录
使用统一的日志系统:
python
from src.utils.logging_config import get_logger
class MyDevice(Thing):
def __init__(self):
super().__init__("MyDevice", "我的设备")
self.logger = get_logger(self.__class__.__name__)
async def my_method(self, params):
self.logger.info("方法被调用")
try:
result = await self.perform_operation()
self.logger.info(f"操作成功: {result}")
return {"status": "success", "result": result}
except Exception as e:
self.logger.error(f"操作失败: {e}", exc_info=True)
return {"status": "error", "message": str(e)}
迁移说明
重要提示: 项目正在从 IoT 设备模式向 MCP (Model Context Protocol) 工具模式迁移:
- 倒计时器: 已迁移到 MCP 工具,提供更好的 AI 集成
- 其他设备: 根据复杂度可能会陆续迁移
- 新功能: 建议考虑直接使用 MCP 工具框架
当前 IoT 架构仍然稳定可用,适合:
- 简单的设备控制
- 学习和演示
- 快速原型开发
注意事项
- 异步要求: 所有属性 getter 和方法 callback 必须是异步函数
- 参数处理: 方法参数通过 Parameter 对象传递,需要调用
get_value()
获取值 - 错误处理: 实现适当的错误处理和反馈机制
- 资源管理: 正确管理设备资源,避免资源泄漏
- 设备名称: 确保设备名称全局唯一
- 返回格式: 方法返回应包含
status
和message
字段
总结
py-xiaozhi 的 IoT 架构提供了完整的设备抽象和管理框架,支持异步操作、类型安全和状态管理。通过遵循本指南的最佳实践,可以快速开发出稳定可靠的 IoT 设备。随着项目向 MCP 工具模式迁移,建议新功能考虑使用 MCP 工具框架。