The MQTT Client Library is a Python library that provides MQTT functionality with support for multiple payload formats including JSON, compressed JSON with ZSTD, and SparkplugB protocol.
from rdp_mqtt.mqtt_client import MqttClient, MqttSettings
# Configure client
settings = MqttSettings(
host="localhost",
port=1883,
ssl=False,
topic="sensors/+",
payload_parser="json"
)
# Create and setup client
client = MqttClient(settings)
await client.setup()
# Subscribe to messages
async for message_dict in client.subscribe():
print(f"Received: {message_dict}")
# Publish a message
await client.publish({
"sensor": "temperature",
"value": 23.5,
"unit": "celsius"
}, topic="sensors/temperature")
settings = MqttSettings(
host="localhost",
port=1883,
ssl=False,
payload_parser="sparkplug",
sparkplug_group_id="MyGroup",
sparkplug_node_id="MyNode",
sparkplug_device_id="MyDevice",
batch_size=10,
batch_timeout=5.0
)
client = MqttClient(settings)
await client.setup()
# Publish metric (will be encoded as SparkplugB dataset)
# For SparkplugB, include a "timestamp" field; if it is missing, the current time is used
await client.publish({
"timestamp": 1737800200000, # Unix timestamp in milliseconds
"voltage": 230.0,
"current": 10.5,
"status": "online"
})
The MqttSettings class combines multiple parameter groups:
- host: str - MQTT broker hostname (default: "localhost")
- port: int - MQTT broker port (default: 8883)
- username: Optional[str] - Username for authentication (default: None)
- password: Optional[SecretStr] - Password for authentication (default: None)
- ssl: bool - Enable SSL/TLS (default: True)
- validate_certificate: bool - Validate SSL certificate (default: True)
- identifier: str - MQTT client ID (default: "RDP_" + uuid)
- topic: str - Topic to subscribe to, supports wildcards (default: "#")
- qos: int - QoS level 0-2 (default: 0)
- subscribe: bool - Whether to subscribe to topic (default: True)
- payload_parser: Literal["sparkplug", "json", "json_zstd"] - Payload format (default: "json")
- field_precision: Optional[int] - Round numeric fields to N decimal places (default: None)
- batch_size: int - Number of metrics to batch (0 = no batching) (default: 0)
- batch_timeout: float - Max seconds to wait for partial batch (default: 1.0)
- sparkplug_group_id: str - Group ID for SparkplugB topics (default: "RDP_group_" + uuid)
- sparkplug_node_id: str - Node ID for SparkplugB topics (default: "RDP_node_" + uuid)
- sparkplug_device_id: str - Device ID for SparkplugB topics (default: "RDP_device_" + uuid)

The library works with plain Python dictionaries. No special structure is required.
# Simple message
{
"sensor_id": "temp_001",
"value": 23.5,
"unit": "celsius"
}
# Complex message
{
"device": "weather_station",
"measurements": {
"temperature": 23.5,
"humidity": 65.2,
"pressure": 1013.25
},
"location": "Vienna",
"timestamp": "2025-01-15T10:30:00Z"
}
# Array of messages
[
{"sensor": "temp_1", "value": 23.5},
{"sensor": "temp_2", "value": 24.1}
]
When field_precision is set, all float values in the dictionary are rounded:
settings = MqttSettings(field_precision=2)
# Input
{"value": 23.456789, "precise": 1.23456}
# Output (automatically rounded)
{"value": 23.46, "precise": 1.23}
For SparkplugB payload parsing, dictionaries should contain a timestamp field; if it is missing, the current time is used.
# Valid SparkplugB message
{
"timestamp": 1737800200000, # Unix timestamp in milliseconds, or an ISO 8601 time string
"temperature": 25.5,
"humidity": 60.0,
"status": "active"
}
The library can decode different types of SparkplugB messages into dictionaries:
Simple SparkplugB Data:
# Received dictionary from simple SparkplugB message
{
"temperature": 25.5,
"pressure": 1013.25,
# Plus any timestamp information extracted by the decoder
}
Generic DataSet:
# Received dictionary from dataset message
{
"Time": 1737800200000,
"Value": 42.7,
"Quality": 1.0,
# Each dataset column becomes a dictionary key
}
Dewesoft DataSet:
# Received dictionary from Dewesoft dataset
{
"voltage_reading": 230.5, # Field name extracted from metric name
# Plus timestamp and other extracted data
}
All outgoing SparkplugB messages are encoded as datasets:
# Input dictionary
{
"timestamp": 1737800200000, # Recommended; the current time is used if omitted
"voltage": 230.0,
"current": 15.2,
"power": 3450.0
}
# Gets encoded as SparkplugB dataset message with:
# - Columns: ["voltage", "current", "power"]
# - Row data: [230.0, 15.2, 3450.0]
# - Timestamp: 1737800200000
When batch_size > 0, dictionaries are collected and sent together:
settings = MqttSettings(
batch_size=5, # Send when 5 messages collected
batch_timeout=2.0, # Or after 2 seconds
payload_parser="json"
)
# Multiple publish calls get batched
await client.publish({"sensor": "temp1", "value": 23.5})
await client.publish({"sensor": "temp2", "value": 24.1})
await client.publish({"sensor": "temp3", "value": 22.8})
# ... batched and sent together when size/timeout reached
# Publisher
settings = MqttSettings(
host="broker.example.com",
payload_parser="json",
subscribe=False # Publisher only
)
client = MqttClient(settings)
await client.setup()
await client.publish({
"device_id": "sensor_001",
"temperature": 25.3,
"timestamp": "2025-01-15T10:30:00Z"
}, topic="sensors/temperature")
# Subscriber
settings = MqttSettings(
host="broker.example.com",
topic="sensors/+",
payload_parser="json"
)
client = MqttClient(settings)
await client.setup()
async for message in client.subscribe():
print(f"Device: {message.get('device_id')}")
print(f"Temperature: {message.get('temperature')}")
settings = MqttSettings(
payload_parser="json_zstd", # Automatic compression
batch_size=10,
field_precision=2
)
client = MqttClient(settings)
await client.setup()
# Large messages automatically compressed
await client.publish({
"device": "data_logger",
"large_dataset": list(range(1000)), # Will be compressed
"metadata": "compressed_transmission"
})
settings = MqttSettings(
payload_parser="sparkplug",
sparkplug_group_id="FactoryFloor",
sparkplug_node_id="Line1",
sparkplug_device_id="Sensor01",
batch_size=20,
batch_timeout=5.0
)
client = MqttClient(settings)
await client.setup()
# Send multiple readings - will be batched into single SparkplugB message
for i in range(25):
await client.publish({
"timestamp": int(time.time() * 1000), # Current time in ms
"reading_id": i,
"value": 100 + i * 0.5,
"quality": 1.0 if i % 10 != 0 else 0.8
})
try:
client = MqttClient(settings)
await client.setup()
async for message in client.subscribe():
# Process raw dictionary
process_message(message)
except ConnectionError:
print("MQTT broker connection failed")
except ValueError as e:
print(f"Invalid configuration: {e}")
finally:
await client.shutdown()
__init__(settings: MqttSettings) - Create an MQTT client with the given configuration.
async setup() -> None - Initialize the connection and prepare the client.
async publish(data: Dict[str, Any], topic: Optional[str] = None) -> None - Publish a dictionary to MQTT. If data does not contain a timestamp field, the current time is used.
async subscribe() -> AsyncGenerator[Dict[str, Any], None] - Subscribe and yield received dictionaries.
async shutdown() -> None - Gracefully shut down the client and flush any pending batches.
get_sparkplug_topic(group_id: str, node_id: str, message_type: str) -> str - Generate a SparkplugB topic string.
encode_data_message(metrics: List[Dict], alias_map: Dict = None, use_aliases_only: bool = False) -> bytes - Encode dictionaries as a SparkplugB DDATA message, using aliases from alias_map where required.
encode_birth_message(metrics: List[Dict], alias_map: Dict = None) -> bytes - Encode dictionaries as a SparkplugB DBIRTH message and store the assigned aliases in alias_map.
decode_mqtt_message(message: MQTTMessage) -> List[Dict[str, Any]] - Decode a SparkplugB MQTT message into a list of dictionaries.