Mosquitto

Install

sudo apt update && sudo apt upgrade
sudo apt install -y mosquitto mosquitto-clients
sudo systemctl enable mosquitto.service
sudo systemctl status mosquitto

https://randomnerdtutorials.com/how-to-install-mosquitto-broker-on-raspberry-pi/

Set configuration

sudo nano /etc/mosquitto/mosquitto.conf

listener 1883
allow_anonymous false
password_file /etc/mosquitto/pwfile

Create user and password, such as jmli/123456789

if needed, change file permission with chmod +X

sudo mosquitto_passwd -c /etc/mosquitto/pwfile jmli

Use docker

docker pull eclipse-mosquitto
mkdir -p mosquitto/config
mkdir -p mosquitto/data
mkdir -p mosquitto/log
nano mosquitto/config/mosquitto.conf
persistence true
persistence_location /mosquitto/data
log_dest file /mosquitto/log/mosquitto.log
docker run -it -d --name=mosquitto --privileged \
-p 1883:1883 -p 9001:9001 \
-v $HOME/mosquitto/config/mosquitto.conf:/mosquitto/config/mosquitto.conf \
-v $HOME/mosquitto/data:/mosquitto/data \
-v $HOME/mosquitto/log:/mosquitto/log \
eclipse-mosquitto:latest

附:配置权限

(1)有时为了安全我们希望连接 mosquitto 服务时需要用户名密码,首先修改配置文件(/mosquitto/config/mosquitto.conf),添加以下配置:

# 关闭匿名模式
allow_anonymous false
# 指定密码文件
password_file /mosquitto/config/pwfile.conf

(2)接着执行如下命令进入容器:

docker exec -it mosquitto sh

(3)执行如下命令建立 pwfile.conf 文件,并设置权限:

touch /mosquitto/config/pwfile.conf
chmod -R 755 /mosquitto/config/pwfile.conf

(4)然后使用 mosquitto_passwd 命令创建用户(比如下面我们创建了一个名为 jmli 的用户,密码为 123),添加完毕后执行 exit 退出容器。

mosquitto_passwd -b /mosquitto/config/pwfile.conf jmli 123
docker restart mosquitto

(5)最后执行如下命令启动容器,这样就为 mosquitto 服务增加了权限验证功能,需要使用我们前面创建的用户密码才能连接。

Examples

安全证书

import paho.mqtt.client as paho
import ssl

broker = "mybrokerurl.com"
port = 8883
ca_file = "certs/file.pem"
cert_file = "certs/file.crt"
key_file = "certs/file.key"
client_id = 'client1234'

class MQTTConnector():
    def __init__(self):
        self.connected = False

    def on_connect(self, client, userdata, flags, rc):
        self.connected = True
        if rc == 0:
            print("Connected to MQTT Broker!") # Not being printed in output
        else:
            print("Failed to connect, return code:", rc)

    def bootstrap_mqtt(self):
        self.client = paho.Client(client_id=client_id)

        self.client.tls_set(
            ca_file,
            certfile=cert_file,
            keyfile=key_file,
            cert_reqs=ssl.CERT_REQUIRED,
            tls_version=ssl.PROTOCOL_TLSv1_2,
            ciphers=None
            )

        self.client.on_connect = self.on_connect

        self.client.connect(host=broker, port=port, keepalive=120)

        print(self.connected) # Gives False
        print(self.client.connect(host=broker, port=port, keepalive=120)) # Gives 0

        return self

if __name__ == '__main__':
    conn = MQTTConnector()
    conn.bootstrap_mqtt()
    conn.client.loop_forever()

消息内容转码

import re
import struct
import threading
import time
import uuid

import numpy as np
import paho.mqtt.client as mqtt
import pandas as pd

# from confluent_kafka import Producer
from kafka import KafkaProducer


class Messages(threading.Thread):
    def __init__(self, clientname, broker, topic):
        super().__init__()
        self.broker = broker
        self.topic = topic
        self.clientname = clientname
        self.client = mqtt.Client(self.clientname)
        self.client.username_pw_set(username, password)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_subscibe = self.on_subscribe
        self.received = ""
        self.published = ""

    def on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            print("Drone Connection Established")
        else:
            print("bad connection Returned code=", rc)
        self.client.subscribe(self.topic)

    def on_subscribe(self, client, userdata, mid, granted_qos):
        print("Subscription complete")

    def on_message(self, client, userdata, msg):
        print("got a message")
        self.received = eval(msg.payload.decode())
        print(self.received)

        pc = PayloadConverter(self.received)
        key_uuid = str(uuid.uuid4()).encode("utf-8")
        self.published = pc.normalize_data()
        print(self.published)

        # producer.send("xxx", self.published)
        # producer.send('xxx', key=key_uuid, value=self.published)

    def begin(self):
        print("Setting up connection")
        self.client.connect(self.broker)
        self.client.loop_forever()

    def end(self):
        time.sleep(1)
        print("Ending Connection")
        self.client.loop_stop()
        self.client.disconnect()

class PayloadConverter(object):
    def __init__(self, raw_msg):
        self.raw_msg_dict = raw_msg[0]
        print(self.raw_msg_dict)
        self.norm_msg_dict = {}

    def normalize_data(self):
        value_char = self.raw_msg_dict["value"]
        start_reg = self.raw_msg_dict["start_reg"]
        end_reg = self.raw_msg_dict["end_reg"]
        deviceId = self.raw_msg_dict["deviceId"]
        # print("start_reg: ", start_reg, "\n", "end_reg: ", end_reg)
        reg_list = np.arange(start_reg, end_reg + 1)
        reg_list = [int(x) for x in reg_list]

        value_list = re.findall(r".{8}", value_char)

        if len(reg_list) != len(value_list):
            print("error: lengths of data point / value are different")

        value_list_decimal = []
        for value_hex_rev in value_list:
            value_hex = self.adjust_value_hex_order(value_hex_rev)
            value_decimal = struct.unpack(">f", bytes.fromhex(value_hex))[0]
            value_decimal = round(value_decimal, 2)
            value_list_decimal.append(value_decimal)

        value_dict = dict(zip(reg_list, value_list_decimal))

        # Reformat message as requested
        new_struct = pd.DataFrame(
            list(value_dict.items()), columns=["tagCode", "value"]
        )
        new_struct["tagCode"] = new_struct["tagCode"].apply(lambda v: f"{v+1:04d}")
        if deviceId == "XXX":
            new_struct["tagCode"] = "XXX" + new_struct["tagCode"].astype(str)
        else:
            new_struct["tagCode"] = "XXX" + new_struct["tagCode"].astype(str)
        new_struct["mqtt_topic"] = "data_value"
        new_struct["ts"] = int(round(time.time() * 1000000000))
        new_struct["quality"] = 197
        new_struct["type"] = 7
        new_struct["deviceId"] = "XXX"
        new_struct["value"] = new_struct["value"].astype(str)

        return new_struct.to_json(orient="records").encode("utf-8")

    @staticmethod
    def adjust_value_hex_order(value_hex_rev):
        value_hex = ""
        if len(value_hex_rev):
            byte_list = re.findall(r".{2}", value_hex_rev)
            byte_list.reverse()
            value_hex = "".join(byte_list)
        return value_hex

broker = "127.0.0.1"
username = "admin"
password = "123456"
port = 1883
client_id = "m2k"
topic = "topic"
producer = KafkaProducer(
    bootstrap_servers=[
        "127.1.1.3:9092",
        "127.1.1.4:9092",
        "127.1.1.2:9092",
    ],
)

remote = Messages(clientname="m2k", broker=broker, topic=topic)
remote.begin()

限定订阅消息数量

import paho.mqtt.subscribe as subscribe

msg = subscribe.simple(
    "/edge/MIDEAHQ/F26/rtg",
    hostname="192.168.1.1",
    port=1883,
    msg_count=100,
    auth={'username': "name", 'password': "password"},
)

messages = []
for item in msg:
    messages.append(item.payload)
Previous