Install
sudo apt update && sudo apt upgrade
sudo apt install -y mosquitto mosquitto-clients
sudo systemctl enable mosquitto.service
sudo systemctl status mosquitto
https://randomnerdtutorials.com/how-to-install-mosquitto-broker-on-raspberry-pi/
Set configuration
sudo nano /etc/mosquitto/mosquitto.conf
listener 1883
allow_anonymous false
password_file /etc/mosquitto/pwfile
Create user and password, such as jmli/123456789
if needed, change file permission with chmod +X
sudo mosquitto_passwd -c /etc/mosquitto/pwfile jmli
Use docker
docker pull eclipse-mosquitto
mkdir -p mosquitto/config
mkdir -p mosquitto/data
mkdir -p mosquitto/log
nano mosquitto/config/mosquitto.conf
persistence true
persistence_location /mosquitto/data
log_dest file /mosquitto/log/mosquitto.log
docker run -it -d --name=mosquitto --privileged \
-p 1883:1883 -p 9001:9001 \
-v $HOME/mosquitto/config/mosquitto.conf:/mosquitto/config/mosquitto.conf \
-v $HOME/mosquitto/data:/mosquitto/data \
-v $HOME/mosquitto/log:/mosquitto/log \
eclipse-mosquitto:latest
附:配置权限
(1)有时为了安全我们希望连接 mosquitto 服务时需要用户名密码,首先修改配置文件(/mosquitto/config/mosquitto.conf),添加以下配置:
# 关闭匿名模式
allow_anonymous false
# 指定密码文件
password_file /mosquitto/config/pwfile.conf
(2)接着执行如下命令进入容器:
docker exec -it mosquitto sh
(3)执行如下命令建立 pwfile.conf 文件,并设置权限:
touch /mosquitto/config/pwfile.conf
chmod -R 755 /mosquitto/config/pwfile.conf
(4)然后使用 mosquitto_passwd 命令创建用户(比如下面我们创建了一个名为 jmli 的用户,密码为 123),添加完毕后执行 exit 退出容器。
mosquitto_passwd -b /mosquitto/config/pwfile.conf jmli 123
docker restart mosquitto
(5)最后执行如下命令启动容器,这样就为 mosquitto 服务增加了权限验证功能,需要使用我们前面创建的用户密码才能连接。
Examples
安全证书
import paho.mqtt.client as paho
import ssl
broker = "mybrokerurl.com"
port = 8883
ca_file = "certs/file.pem"
cert_file = "certs/file.crt"
key_file = "certs/file.key"
client_id = 'client1234'
class MQTTConnector():
def __init__(self):
self.connected = False
def on_connect(self, client, userdata, flags, rc):
self.connected = True
if rc == 0:
print("Connected to MQTT Broker!") # Not being printed in output
else:
print("Failed to connect, return code:", rc)
def bootstrap_mqtt(self):
self.client = paho.Client(client_id=client_id)
self.client.tls_set(
ca_file,
certfile=cert_file,
keyfile=key_file,
cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLSv1_2,
ciphers=None
)
self.client.on_connect = self.on_connect
self.client.connect(host=broker, port=port, keepalive=120)
print(self.connected) # Gives False
print(self.client.connect(host=broker, port=port, keepalive=120)) # Gives 0
return self
if __name__ == '__main__':
conn = MQTTConnector()
conn.bootstrap_mqtt()
conn.client.loop_forever()
消息内容转码
import re
import struct
import threading
import time
import uuid
import numpy as np
import paho.mqtt.client as mqtt
import pandas as pd
# from confluent_kafka import Producer
from kafka import KafkaProducer
class Messages(threading.Thread):
def __init__(self, clientname, broker, topic):
super().__init__()
self.broker = broker
self.topic = topic
self.clientname = clientname
self.client = mqtt.Client(self.clientname)
self.client.username_pw_set(username, password)
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.on_subscibe = self.on_subscribe
self.received = ""
self.published = ""
def on_connect(self, client, userdata, flags, rc):
if rc == 0:
print("Drone Connection Established")
else:
print("bad connection Returned code=", rc)
self.client.subscribe(self.topic)
def on_subscribe(self, client, userdata, mid, granted_qos):
print("Subscription complete")
def on_message(self, client, userdata, msg):
print("got a message")
self.received = eval(msg.payload.decode())
print(self.received)
pc = PayloadConverter(self.received)
key_uuid = str(uuid.uuid4()).encode("utf-8")
self.published = pc.normalize_data()
print(self.published)
# producer.send("xxx", self.published)
# producer.send('xxx', key=key_uuid, value=self.published)
def begin(self):
print("Setting up connection")
self.client.connect(self.broker)
self.client.loop_forever()
def end(self):
time.sleep(1)
print("Ending Connection")
self.client.loop_stop()
self.client.disconnect()
class PayloadConverter(object):
def __init__(self, raw_msg):
self.raw_msg_dict = raw_msg[0]
print(self.raw_msg_dict)
self.norm_msg_dict = {}
def normalize_data(self):
value_char = self.raw_msg_dict["value"]
start_reg = self.raw_msg_dict["start_reg"]
end_reg = self.raw_msg_dict["end_reg"]
deviceId = self.raw_msg_dict["deviceId"]
# print("start_reg: ", start_reg, "\n", "end_reg: ", end_reg)
reg_list = np.arange(start_reg, end_reg + 1)
reg_list = [int(x) for x in reg_list]
value_list = re.findall(r".{8}", value_char)
if len(reg_list) != len(value_list):
print("error: lengths of data point / value are different")
value_list_decimal = []
for value_hex_rev in value_list:
value_hex = self.adjust_value_hex_order(value_hex_rev)
value_decimal = struct.unpack(">f", bytes.fromhex(value_hex))[0]
value_decimal = round(value_decimal, 2)
value_list_decimal.append(value_decimal)
value_dict = dict(zip(reg_list, value_list_decimal))
# Reformat message as requested
new_struct = pd.DataFrame(
list(value_dict.items()), columns=["tagCode", "value"]
)
new_struct["tagCode"] = new_struct["tagCode"].apply(lambda v: f"{v+1:04d}")
if deviceId == "XXX":
new_struct["tagCode"] = "XXX" + new_struct["tagCode"].astype(str)
else:
new_struct["tagCode"] = "XXX" + new_struct["tagCode"].astype(str)
new_struct["mqtt_topic"] = "data_value"
new_struct["ts"] = int(round(time.time() * 1000000000))
new_struct["quality"] = 197
new_struct["type"] = 7
new_struct["deviceId"] = "XXX"
new_struct["value"] = new_struct["value"].astype(str)
return new_struct.to_json(orient="records").encode("utf-8")
@staticmethod
def adjust_value_hex_order(value_hex_rev):
value_hex = ""
if len(value_hex_rev):
byte_list = re.findall(r".{2}", value_hex_rev)
byte_list.reverse()
value_hex = "".join(byte_list)
return value_hex
broker = "127.0.0.1"
username = "admin"
password = "123456"
port = 1883
client_id = "m2k"
topic = "topic"
producer = KafkaProducer(
bootstrap_servers=[
"127.1.1.3:9092",
"127.1.1.4:9092",
"127.1.1.2:9092",
],
)
remote = Messages(clientname="m2k", broker=broker, topic=topic)
remote.begin()
限定订阅消息数量
import paho.mqtt.subscribe as subscribe
msg = subscribe.simple(
"/edge/MIDEAHQ/F26/rtg",
hostname="192.168.1.1",
port=1883,
msg_count=100,
auth={'username': "name", 'password': "password"},
)
messages = []
for item in msg:
messages.append(item.payload)