MQTT 在孪生物联网系统中的应用实践

MQTT 在孪生物联网系统中的应用实践

本文将详细介绍 MQTT 协议在孪生物联网系统中的具体应用实现,涵盖基于 Node.js (Express) 的后端 MQTT 服务端开发、Cocos Creator 前端 MQTT 客户端开发,以及实际开发过程中遇到的兼容性问题和解决方案。

一、MQTT 协议简介

MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布 / 订阅消息传输协议,特别适用于低带宽、高延迟或不可靠的网络环境,是物联网通信的首选协议之一。在孪生物联网系统中,MQTT 主要用于:

  • 设备状态实时上报
  • 设备指令下发控制
  • 设备故障信息推送
  • 孪生体与物理设备的数据同步

二、技术架构设计

本次实现采用前后端分离架构:

  • 后端:Node.js + Express + mqtt-server,实现 MQTT 消息代理服务器
  • 前端:Cocos Creator,实现 MQTT 客户端,对接孪生可视化界面
  • 通信方式:WebSocket(ws://)协议,端口 1884

三、后端 MQTT 服务端实现(Node.js)

3.1 核心依赖安装

1
npm install mqtt-server events --save

3.2 MQTT 服务类实现

核心类MQTTService继承自EventEmitter,实现 MQTT 服务器的完整功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
const { EventEmitter } = require('events');
const mqttServer = require('mqtt-server'); // 使用正确的包名
/**
* MQTT服务类 - 负责MQTT消息代理和设备状态管理
* @class MQTTService
* @extends EventEmitter
*/
class MQTTService extends EventEmitter {
constructor() {
super();
this.isConnected = false;
this.subscribedTopics = new Set();
this.deviceStatusCache = new Map(); // 设备状态缓存
this.messageHandlers = new Map(); // 消息处理器
this.mqttServer = null; // MQTT服务器实例
this.clientConnections = new Map(); // 客户端连接管理
}

/**
* 初始化MQTT服务(MQTT服务器模式)
* @returns {Promise<boolean>} 初始化是否成功
*/
async connect() {
try {
this.mqttServer = mqttServer({
mqtt: 'ws://localhost:1884' // MQTT协议端口
}, {
emitEvents: true // 启用事件发射
}, (client) => {
const clientId = `client_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
client.id = clientId;
console.log('🔗 MQTT客户端连接成功:', clientId);
//存储客户端连接
this.clientConnections.set(clientId, client);
//立即发送连接确认
client.connack({
returnCode: 0 // 连接成功
});

client.on('connect', (packet) => {
console.log(`✅ MQTT客户端连接确认 [${clientId}]`);
});

client.on('publish', (packet) => {
this._handleMqttPublish(clientId, packet);
});

client.on('subscribe', (packet) => {
this._handleMqttSubscribe(clientId, packet);
});

client.on('unsubscribe', (packet) => {
this._handleMqttUnsubscribe(clientId, packet);
});

client.on('pingreq', () => {
client.pingresp();
});

client.on('disconnect', () => {
console.log(`🔌 MQTT客户端断开连接 [${clientId}]`);
this.clientConnections.delete(clientId);
});

client.on('error', (error) => {
console.error(`❌ MQTT客户端错误 [${clientId}]:`, error);
this.clientConnections.delete(clientId);
});
});
//启动MQTT服务器 - 使用Promise包装异步操作
return new Promise((resolve, reject) => {
this.mqttServer.listen((err) => {
if (err) {
console.error('❌ MQTT服务器启动失败:', err);
reject(err);
return;
}
console.log('✅ MQTT服务器启动成功,端口: 1884');
this.isConnected = true;
this.emit('connected');
resolve(true);
});
});
} catch (error) {
console.error('❌ MQTT服务器启动失败:', error);
throw error;
}
}

/**
* 处理MQTT客户端发布消息
* @private
* @param {string} clientId - 客户端ID
* @param {Object} packet - MQTT发布包
*/
_handleMqttPublish(clientId, packet) {
try {
const topic = packet.topic;
const message = packet.payload.toString();
const data = JSON.parse(message);
console.log(`📥 收到MQTT消息 [${clientId}]: ${topic}`, data);
// 处理消息
this._handleMessage(topic, packet.payload);
//立即回应客户端消息
//this._sendResponseToClient(clientId, topic, data);
//广播消息给所有订阅了该主题的客户端
this._broadcastToSubscribers(topic, data, clientId);
} catch (error) {
console.error('处理MQTT发布消息错误:', error);
}
}

/**
* 处理MQTT客户端订阅
* @private
* @param {string} clientId - 客户端ID
* @param {Object} packet - MQTT订阅包
*/
_handleMqttSubscribe(clientId, packet) {
packet.subscriptions.forEach((subscription) => {
const topic = subscription.topic;
this.subscribedTopics.add(topic);
console.log(`✅ MQTT客户端订阅成功 [${clientId}]: ${topic}`);
});
// 发送订阅确认
const client = this.clientConnections.get(clientId);
if (client) {
client.suback({
messageId: packet.messageId,
granted: packet.subscriptions.map(() => 0) // QoS 0
});
}
}

/**
* 处理MQTT客户端取消订阅
* @private
* @param {string} clientId - 客户端ID
* @param {Object} packet - MQTT取消订阅包
*/
_handleMqttUnsubscribe(clientId, packet) {
packet.unsubscriptions.forEach((topic) => {
this.subscribedTopics.delete(topic);
console.log(`✅ MQTT客户端取消订阅成功 [${clientId}]: ${topic}`);
});
//发送取消订阅确认
const client = this.clientConnections.get(clientId);
if (client) {
client.unsuback({
messageId: packet.messageId
});
}
}

/**
* 广播消息给订阅了特定主题的客户端
* @private
* @param {string} topic - 主题
* @param {Object} payload - 消息内容
* @param {string} excludeClientId - 排除的客户端ID(不发送给消息发布者)
*/
_broadcastToSubscribers(topic, payload, excludeClientId = null) {
for (const [clientId, client] of this.clientConnections) {
if (clientId !== excludeClientId && this._isTopicSubscribed(topic)) {
this._publishToClient(client, topic, payload);
}
}
}

/**
* 发布消息给特定客户端
* @private
* @param {Object} client - MQTT客户端
* @param {string} topic - 主题
* @param {Object} payload - 消息内容
*/
_publishToClient(client, topic, payload) {
try {
client.publish({
topic: topic,
payload: JSON.stringify(payload),
qos: 0,
retain: false
});
} catch (error) {
console.error('发布消息给客户端错误:', error);
}
}

/**
* 发送回应消息给客户端
* @private
* @param {string} clientId - 客户端ID
* @param {string} topic - 原始消息主题
* @param {Object} data - 原始消息数据
*/
_sendResponseToClient(clientId, topic, data) {
try {
const client = this.clientConnections.get(clientId);
if (!client) {
console.warn(`客户端 ${clientId} 不存在,无法发送回应`);
return;
}
//构建回应主题(在原始主题后添加 /response)
const responseTopic = `${topic}/response`;
//构建回应消息
const responseMessage = {
originalTopic: topic,
originalData: data,
response: '消息已收到',
serverTime: new Date().toISOString(),
status: 'success'
};
//发送回应消息
this._publishToClient(client, responseTopic, responseMessage);
console.log(`📤 发送回应消息给客户端 [${clientId}]: ${responseTopic}`, responseMessage);
} catch (error) {
console.error('发送回应消息错误:', error);
}
}

/**
* 检查主题是否被订阅
* @private
* @param {string} topic - 主题
* @returns {boolean} 是否被订阅
*/
_isTopicSubscribed(topic) {
// 简单的通配符匹配逻辑
for (const subscribedTopic of this.subscribedTopics) {
if (this._matchTopic(subscribedTopic, topic)) {
return true;
}
}
return false;
}

/**
* 主题匹配(支持简单通配符)
* @private
* @param {string} pattern - 模式(支持+通配符)
* @param {string} topic - 实际主题
* @returns {boolean} 是否匹配
*/
_matchTopic(pattern, topic) {
const patternParts = pattern.split('/');
const topicParts = topic.split('/');
if (patternParts.length !== topicParts.length) {
return false;
}
for (let i = 0; i < patternParts.length; i++) {
if (patternParts[i] === '+') {
continue; // 单级通配符匹配任何内容
}
if (patternParts[i] !== topicParts[i]) {
return false;
}
}

return true;
}

/**
* 订阅主题(服务端内部模式)
* @param {string} topic - 要订阅的主题
* @returns {Promise<boolean>} 订阅是否成功
*/
async subscribe(topic) {
if (!this.isConnected) {
throw new Error('MQTT服务未初始化');
}
console.log(`✅ 服务端订阅主题成功 [${topic}]`);
this.subscribedTopics.add(topic);
return true;
}

/**
* 发布消息(服务端内部模式)
* @param {string} topic - 目标主题
* @param {Object} message - 消息内容
* @param {Object} options - 发布选项
* @returns {Promise<boolean>} 发布是否成功
*/
async publish(topic, message, options = {}) {
if (!this.isConnected) {
throw new Error('MQTT服务未初始化');
}

console.log(`📤 服务端消息发布成功 [${topic}]:`, message);

// 广播给所有订阅了该主题的客户端
this._broadcastToSubscribers(topic, message);

return true;
}

/**
* 模拟设备状态更新(用于测试和演示)
* @param {string} deviceId - 设备ID
* @param {Object} status - 设备状态
*/
async simulateDeviceStatus(deviceId, status) {
const topic = `device/status/${deviceId}`;
const message = {
deviceId,
status: {
battery: status.battery || 100,
signal: status.signal || 90,
channel: status.channel || 1,
timestamp: Date.now()
}
};

await this.publish(topic, message);
}

/**
* 模拟设备故障(用于测试和演示)
* @param {string} deviceId - 设备ID
* @param {string} faultType - 故障类型
*/
async simulateDeviceFault(deviceId, faultType) {
const topic = `device/fault/${deviceId}`;
const message = {
deviceId,
fault: faultType,
timestamp: Date.now()
};

await this.publish(topic, message);
}

/**
* 处理接收到的消息
* @private
* @param {string} topic - 消息主题
* @param {Buffer} message - 消息内容
*/
_handleMessage(topic, message) {
try {
const messageStr = message.toString();
const data = JSON.parse(messageStr);
// 根据主题类型分发消息
if (topic.startsWith('device/status/')) {
this._handleDeviceStatus(topic, data);
} else if (topic.startsWith('device/fault/')) {
this._handleDeviceFault(topic, data);
} else if (topic.startsWith('device/operation/')) {
this._handleDeviceOperation(topic, data);
}
// 触发消息事件
this.emit('message', { topic, data });
} catch (error) {
console.error('消息处理错误:', error);
}
}

_handleDeviceStatus(topic, data) {
const topicParts = topic.split('/').pop();
if (topicParts && data) {
// 更新设备状态缓存
this.deviceStatusCache.set(topicParts, {
...data.status,
lastUpdate: Date.now()
});
// 广播设备状态更新
this.emit('deviceStatus', data);
}
}

_handleDeviceFault(topic, data) {
if ( data) {
this.emit('deviceFault', data);
}
}

_handleDeviceOperation(topic, data) {
if (data) {
this.emit('deviceOperation', data);
}
}

getDeviceStatus(deviceId) {
return this.deviceStatusCache.get(deviceId) || null;
}

getAllDeviceStatus() {
const devices = [];
for (const [deviceId, status] of this.deviceStatusCache) {
devices.push({
deviceId,
...status
});
}
return devices;
}

/**
* 断开MQTT服务
*/
disconnect() {
this.isConnected = false;
this.subscribedTopics.clear();
this.deviceStatusCache.clear();
//关闭所有客户端连接
for (const [clientId, client] of this.clientConnections) {
client.destroy();
}
this.clientConnections.clear();

// 关闭MQTT服务器
if (this.mqttServer) {
this.mqttServer.close();
}

console.log('🔌 MQTT服务已断开');
}
}

// 创建单例实例
const mqttService = new MQTTService();

module.exports = mqttService;

3.3 服务端使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 初始化并启动MQTT服务
const mqttService = require('./MQTTService');

// 启动MQTT服务器
mqttService.connect()
.then(() => {
console.log('MQTT服务启动成功');
// 订阅设备状态主题
return mqttService.subscribe('device/status/+');
})
.then(() => {
// 模拟设备状态更新
setInterval(() => {
mqttService.simulateDeviceStatus('device_001', {
battery: Math.floor(Math.random() * 100),
signal: Math.floor(Math.random() * 100),
channel: 1
});
}, 5000);
})
.catch((error) => {
console.error('MQTT服务启动失败:', error);
});

四、前端 MQTT 客户端实现(Cocos Creator)

4.1 依赖安装

1
2
3
4
# 先卸载现有版本
npm uninstall mqtt
# 安装兼容版本
npm install mqtt@4.0.1 --save

4.2 MQTT 管理器类实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
/**
* npm uninstall mqtt
npm install mqtt@4.0.1 --save
*/
import { Component } from 'cc';
import { eventEmitter } from '../../core/event/EventEmitter';
import * as mqttModule from 'mqtt/dist/mqtt.min.js';
const mqtt = mqttModule.default || mqttModule;

// 类型定义
interface MQTTOptions {
host?: string;
port?: number;
username?: string;
password?: string;
}

interface DeviceStatus {
[key: string]: any;
lastUpdate: number;
}

interface DeviceFault {
[key: string]: any;
}

interface DeviceOperation {
[key: string]: any;
}

interface MessageEvent {
topic: string;
data: any;
}

interface DeviceStatusEvent {
deviceId: string;
status: DeviceStatus;
}

interface DeviceFaultEvent {
deviceId: string;
fault: DeviceFault;
}

interface DeviceOperationEvent {
deviceId: string;
operation: DeviceOperation;
}

// MQTT事件接口
interface MQTTManagerEvents {
connected: () => void;
disconnected: () => void;
error: (error: Error) => void;
message: (event: MessageEvent) => void;
deviceStatus: (event: DeviceStatusEvent) => void;
deviceFault: (event: DeviceFaultEvent) => void;
deviceOperation: (event: DeviceOperationEvent) => void;
}

/**
* MQTT管理器类 - 负责MQTT连接管理和消息处理
* @class MQTTManager
* @extends EventEmitter
*/
export class MQTTMgr extends Component {
private static instance: MQTTMgr;
public client: mqttModule.MqttClient | null = null;
private isConnected: boolean = false;
private subscribedTopics: Set<string> = new Set();
private deviceStatusCache: Map<string, DeviceStatus> = new Map();

constructor() {
super();
}

/**
* 获取MQTT管理器单例实例
* @returns {MQTTMgr} 单例实例
*/
public static getInstance(): MQTTMgr {
if (!MQTTMgr.instance) {
MQTTMgr.instance = new MQTTMgr();
}
return MQTTMgr.instance;
}

/**
* 连接MQTT服务器
* @param {MQTTOptions} options - MQTT连接选项
* @returns {Promise<boolean>} 连接是否成功
*/
async connect(options: MQTTOptions = {}): Promise<boolean> {
try {
const {
host = 'localhost',
port = 1884, // 修改为1884端口
username = 'admin',
password = 'public'
} = options;

// 修复URL格式 - 移除多余的"http//"
const mqttUrl = `ws://localhost:1884`; // 正确的URL格式
const clientOptions: mqttModule.IClientOptions = {
username: username,
password: password,
clientId: `walkie_client_${Date.now()}`,
clean: true,
reconnectPeriod: 1000,
connectTimeout: 6000,
protocol: 'ws'
};

this.client = mqtt.connect(mqttUrl, clientOptions);
return new Promise<boolean>((resolve, reject) => {
this.client!.on('connect', () => {
console.log('✅ MQTT连接成功:', mqttUrl);
this.isConnected = true;
eventEmitter.emit('connected');
resolve(true);
});

this.client!.on('error', (error: Error) => {
console.error('❌ MQTT连接错误:', error);
this.isConnected = false;
eventEmitter.emit('error', error);
reject(error);
});

this.client!.on('message', (topic: string, message: any, packet: mqttModule.IPublishPacket) => {
this._handleMessage(topic, message);
});

this.client!.on('close', () => {
console.log('🔌 MQTT连接关闭');
this.isConnected = false;
eventEmitter.emit('disconnected');
});
});
} catch (error) {
console.error('❌ MQTT连接异常:', error);
throw error;
}
}

/**
* 订阅主题
* @param {string} topic - 要订阅的主题
* @param {IClientSubscribeOptions} options - 订阅选项
* @returns {Promise<boolean>} 订阅是否成功
*/
async subscribe(topic: string, options: mqttModule.IClientSubscribeOptions = { qos: 0 }): Promise<boolean> {
if (!this.isConnected || !this.client) {
throw new Error('MQTT客户端未连接');
}
return new Promise<boolean>((resolve, reject) => {
this.client!.subscribe(topic, options, (error: Error | null, granted?: mqttModule.ISubscriptionGrant[]) => {
if (error) {
console.error(`订阅主题失败 [${topic}]:`, error);
reject(error);
} else {
console.log(`订阅主题成功 [${topic}]`);
this.subscribedTopics.add(topic);
resolve(true);
}
});
});
}

/**
* 发布消息
* @param {string} topic - 目标主题
* @param {any} message - 消息内容
* @param {Object} options - 发布选项
* @returns {Promise<boolean>} 发布是否成功
*/
async publish(topic: string, message: any, options: any = {}): Promise<boolean> {
if (!this.isConnected || !this.client) {
throw new Error('MQTT客户端未连接');
}
const messageStr = JSON.stringify(message);
return new Promise<boolean>((resolve, reject) => {
this.client!.publish(topic, messageStr, options, (error?: Error) => {
if (error) {
console.error(`发布消息失败 [${topic}]:`, error);
reject(error);
} else {
console.log(`消息发布成功 [${topic}]:`, message);
resolve(true);
}
});
});
}

/**
* 处理接收到的消息
* @private
* @param {string} topic - 消息主题
* @param {Buffer} message - 消息内容
*/
private _handleMessage(topic: string, message: any): void {
try {
const messageStr = message.toString();
const data = JSON.parse(messageStr);
console.log(`收到消息 [${topic}]:`, data);
// 根据主题类型分发消息
if (topic.startsWith('device/status/')) {
this._handleDeviceStatus(topic, data);
} else if (topic.startsWith('device/fault/')) {
this._handleDeviceFault(topic, data);
} else if (topic.startsWith('device/operation/')) {
this._handleDeviceOperation(topic, data);
}
// 触发消息事件
eventEmitter.emit('message', { topic, data });
} catch (error) {
console.error('消息处理错误:', error);
}
}

/**
* 处理设备状态消息
* @private
* @param {string} topic - 主题
* @param {DeviceStatus} data - 设备状态数据
*/
private _handleDeviceStatus(topic: string, data: DeviceStatus): void {
const deviceId = topic.split('/').pop();
if (deviceId && data) {
//更新设备状态缓存
this.deviceStatusCache.set(deviceId, {
...data,
lastUpdate: Date.now()
});
//广播设备状态更新
eventEmitter.emit('deviceStatus', { deviceId, status: data });
}
}

/**
* 处理设备故障消息
* @private
* @param {string} topic - 主题
* @param {DeviceFault} data - 设备故障数据
*/
private _handleDeviceFault(topic: string, data: DeviceFault): void {
const deviceId = topic.split('/').pop();
if (deviceId && data) {
eventEmitter.emit('deviceFault', { deviceId, fault: data });
}
}

/**
* 处理设备操作消息
* @private
* @param {string} topic - 主题
* @param {DeviceOperation} data - 设备操作数据
*/
private _handleDeviceOperation(topic: string, data: DeviceOperation): void {
const deviceId = topic.split('/').pop();
if (deviceId && data) {
eventEmitter.emit('deviceOperation', { deviceId, operation: data });
}
}

/**
* 获取设备状态
* @param {string} deviceId - 设备ID
* @returns {DeviceStatus | null} 设备状态信息
*/
getDeviceStatus(deviceId: string): DeviceStatus | null {
return this.deviceStatusCache.get(deviceId) || null;
}

/**
* 获取所有设备状态
* @returns {Array<{deviceId: string} & DeviceStatus>} 所有设备状态列表
*/
getAllDeviceStatus(): Array<{deviceId: string} & DeviceStatus> {
const devices: Array<{deviceId: string} & DeviceStatus> = [];
for (const [deviceId, status] of this.deviceStatusCache) {
devices.push({
deviceId,
...status
});
}
return devices;
}

/**
* 断开MQTT连接
*/
disconnect(): void {
if (this.client) {
this.client.end();
this.isConnected = false;
this.subscribedTopics.clear();
this.deviceStatusCache.clear();
console.log('🔌 MQTT连接已断开');
}
}

/**
* 检查是否已连接
* @returns {boolean} 连接状态
*/
getIsConnected(): boolean {
return this.isConnected;
}

/**
* 获取已订阅的主题列表
* @returns {string[]} 已订阅的主题数组
*/
getSubscribedTopics(): string[] {
return Array.from(this.subscribedTopics);
}
}

// 类型声明扩展 EventEmitter
declare interface MQTTManager {
on<U extends keyof MQTTManagerEvents>(
event: U, listener: MQTTManagerEvents[U]
): this;

emit<U extends keyof MQTTManagerEvents>(
event: U, ...args: Parameters<MQTTManagerEvents[U]>
): boolean;
}
export const mqttMgr = MQTTMgr.getInstance();

4.3 MQTT 客户端调用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
import { _decorator, Label, Button, EditBox, ScrollView, Color } from 'cc';
import { BaseExp } from '../core/base/BaseExp';
import { autoBind } from '../extend/AutoBind';
import { mqttMgr } from '../network/mqtt/MQTTMgr';
import { eventEmitter } from '../core/event/EventEmitter';
const { ccclass, property } = _decorator;

// 设备状态接口
interface DeviceStatus {
deviceId: string;
battery: number;
signal: number;
channel: number;
online: boolean;
lastUpdate: number;
}

// 设备操作接口
interface DeviceOperation {
deviceId: string;
command: string;
value: any;
timestamp: number;
}

@ccclass('MQTTDemo')
export class MQTTDemo extends BaseExp {
@autoBind(Label)
private statusLabel: Label = null!;
@autoBind(Label)
private logLabel: Label = null!;
@autoBind(EditBox)
private hostInput: EditBox = null!;
@autoBind(EditBox)
private portInput: EditBox = null!;
@autoBind(EditBox)
private usernameInput: EditBox = null!;
@autoBind(EditBox)
private passwordInput: EditBox = null!;
@autoBind(EditBox)
private topicInput: EditBox = null!;
@autoBind(EditBox)
private messageInput: EditBox = null!;
@autoBind(Button)
private connectBtn: Button = null!;
@autoBind(Button)
private disconnectBtn: Button = null!;
@autoBind(Button)
private subscribeBtn: Button = null!;
@autoBind(Button)
private publishBtn: Button = null!;
@autoBind(ScrollView)
private logScrollView: ScrollView = null!;

private isConnected: boolean = false;
private subscribedTopics: Set<string> = new Set();
private deviceStatusMap: Map<string, DeviceStatus> = new Map();
private logMessages: string[] = [];

async onLoad(): Promise<void> {
super.onLoad();
//设置默认连接参数
this.hostInput.string = 'localhost';//http://192.168.0.107
this.portInput.string = '1884';
this.usernameInput.string = 'admin';
this.passwordInput.string = 'public';
this.topicInput.string = 'device/status/#';
this.messageInput.string = '{deviceId: "pid:123456","operation": "test", "value": 1}';
//绑定按钮事件
this.connectBtn.node.on('click', this.onConnectClick, this);
this.disconnectBtn.node.on('click', this.onDisconnectClick, this);
this.subscribeBtn.node.on('click', this.onSubscribeClick, this);
this.publishBtn.node.on('click', this.onPublishClick, this);
//初始化MQTT事件监听
this.initMQTTEvents();
this.updateStatus();
}

public onDestroy(): void {
//清理事件监听
mqttMgr.client.removeAllListeners();
if (this.isConnected) {
mqttMgr.disconnect();
}
}

/**
* 初始化MQTT事件监听
*/
private initMQTTEvents(){
this.register('connected', () => {
this.isConnected = true;
this.addLog('✅ MQTT连接成功');
this.updateStatus();
});
this.register('disconnected', () => {
this.isConnected = false;
this.subscribedTopics.clear();
this.addLog('🔌 MQTT连接断开');
this.updateStatus();
});
this.register('error', (error: Error) => {
this.addLog(`❌ MQTT错误: ${error.message}`);
this.updateStatus();
});
this.register('message', (event: any) => {
this.addLog(`📥 收到消息 [${event.topic}]: ${JSON.stringify(event.data)}`);
this.handleMessage(event.topic, event.data);
});
this.register('deviceStatus', (event: any) => {
this.handleDeviceStatus(event.deviceId, event.status);
});
this.register('deviceFault', (event: any) => {
this.addLog(`⚠️ 设备故障 [${event.deviceId}]: ${JSON.stringify(event.fault)}`);
});
this.register('deviceOperation', (event: any) => {
this.addLog(`🔧 设备操作 [${event.deviceId}]: ${JSON.stringify(event.operation)}`);
});
}

/**
* 处理接收到的消息
*/
private handleMessage(topic: string, data: any): void {
// 根据主题类型处理不同消息
if (topic.startsWith('device/status/')) {
const deviceId = topic.split('/').pop();
if (deviceId) {
this.handleDeviceStatus(deviceId, data);
}
}
}

/**
* 处理设备状态更新
*/
private handleDeviceStatus(deviceId: string, status: any): void {
const deviceStatus: DeviceStatus = {
deviceId,
battery: status.battery || 0,
signal: status.signal || 0,
channel: status.channel || 0,
online: status.online || false,
lastUpdate: Date.now()
};
this.deviceStatusMap.set(deviceId, deviceStatus);
this.addLog(`📊 设备状态更新 [${deviceId}]: 电量${deviceStatus.battery}%, 信号${deviceStatus.signal}%`);
}

/**
* 连接按钮点击事件
*/
private async onConnectClick(): Promise<void> {
if (this.isConnected) {
this.addLog('⚠️ 已经连接到MQTT服务器');
return;
}
try {
const options = {
host: this.hostInput.string || 'localhost',
port: parseInt(this.portInput.string) || 1884,
username: this.usernameInput.string || undefined,
password: this.passwordInput.string || undefined
};
this.addLog(`🔗 正在连接MQTT服务器: ${options.host}:${options.port}`);
await mqttMgr.connect(options);
} catch (error) {
this.addLog(`❌ 连接失败: ${error}`);
}
}

/**
* 断开连接按钮点击事件
*/
private onDisconnectClick(): void {
if (!this.isConnected) {
this.addLog('⚠️ 当前未连接到MQTT服务器');
return;
}
mqttMgr.disconnect();
this.subscribedTopics.clear();
this.addLog('🔌 已断开MQTT连接');
}

/**
* 订阅按钮点击事件
*/
private async onSubscribeClick(): Promise<void> {
if (!this.isConnected) {
this.addLog('❌ 请先连接到MQTT服务器');
return;
}
const topic = this.topicInput.string.trim();
if (!topic) {
this.addLog('❌ 请输入要订阅的主题');
return;
}
if (this.subscribedTopics.has(topic)) {
this.addLog(`⚠️ 已经订阅了主题: ${topic}`);
return;
}
try {
await mqttMgr.subscribe(topic);
this.subscribedTopics.add(topic);
this.addLog(`✅ 订阅成功: ${topic}`);
} catch (error) {
this.addLog(`❌ 订阅失败: ${error}`);
}
}

/**
* 发布按钮点击事件
*/
private async onPublishClick(): Promise<void> {
if (!this.isConnected) {
this.addLog('❌ 请先连接到MQTT服务器');
return;
}
const topic = this.topicInput.string;//'device/operation/test'; // 默认发布到操作主题
let message: any;
try {
message = JSON.parse(this.messageInput.string);
} catch {
message = { message: this.messageInput.string };
}
try {
await mqttMgr.publish(topic, message);
this.addLog(`📤 发布成功 [${topic}]: ${JSON.stringify(message)}`);
} catch (error) {
this.addLog(`❌ 发布失败: ${error}`);
}
}

/**
* 添加日志消息
*/
private addLog(message: string): void {
const timestamp = new Date().toLocaleTimeString();
const logMessage = `[${timestamp}] ${message}`;
this.logMessages.push(logMessage);
if (this.logMessages.length > 50) {
this.logMessages.shift(); // 限制日志数量
}
this.updateLogDisplay();
}

/**
* 更新日志显示
*/
private updateLogDisplay(): void {
if (this.logLabel) {
this.logLabel.string = this.logMessages.join('\n');
// 自动滚动到底部
setTimeout(() => {
if (this.logScrollView) {
this.logScrollView.scrollToBottom(0.1);
}
}, 100);
}
}

/**
* 更新状态显示
*/
private updateStatus(): void {
if (this.statusLabel) {
const statusText = this.isConnected ?
`🟢 已连接 | 订阅主题: ${this.subscribedTopics.size} | 在线设备: ${this.deviceStatusMap.size}` :
'🔴 未连接';
this.statusLabel.string = statusText;
this.statusLabel.color = this.isConnected ? Color.GREEN : Color.RED;
}
// 更新按钮状态
this.connectBtn.interactable = !this.isConnected;
this.disconnectBtn.interactable = this.isConnected;
this.subscribeBtn.interactable = this.isConnected;
this.publishBtn.interactable = this.isConnected;
}

/**
* 获取设备状态信息
*/
public getDeviceStatus(deviceId: string): DeviceStatus | null {
return this.deviceStatusMap.get(deviceId) || null;
}

/**
* 获取所有设备状态
*/
public getAllDeviceStatus(): DeviceStatus[] {
return Array.from(this.deviceStatusMap.values());
}

/**
* 发送设备操作指令
*/
public async sendDeviceCommand(deviceId: string, command: string, value: any): Promise<boolean> {
if (!this.isConnected) {
this.addLog('❌ 请先连接到MQTT服务器');
return false;
}
const operation: DeviceOperation = {
deviceId,
command,
value,
timestamp: Date.now()
};
try {
const topic = `device/operation/${deviceId}`;
await mqttMgr.publish(topic, operation);
this.addLog(`🔧 发送指令成功 [${deviceId}]: ${command} = ${value}`);
return true;
} catch (error) {
this.addLog(`❌ 发送指令失败: ${error}`);
return false;
}
}

/**
* 订阅设备状态
*/
public async subscribeDeviceStatus(deviceId: string): Promise<boolean> {
if (!this.isConnected) {
this.addLog('❌ 请先连接到MQTT服务器');
return false;
}
const topic = `device/status/${deviceId}`;
try {
await mqttMgr.subscribe(topic);
this.subscribedTopics.add(topic);
this.addLog(`✅ 订阅设备状态成功: ${deviceId}`);
return true;
} catch (error) {
this.addLog(`❌ 订阅设备状态失败: ${error}`);
return false;
}
}

/**
* 取消订阅
*/
public async unsubscribe(topic: string): Promise<boolean> {
if (!this.subscribedTopics.has(topic)) {
this.addLog(`⚠️ 未订阅该主题: ${topic}`);
return false;
}
// 注意:实际的取消订阅功能需要在MQTTMgr中实现
this.subscribedTopics.delete(topic);
this.addLog(`✅ 取消订阅: ${topic}`);
return true;
}
}

五、开发过程中遇到的问题及解决方案

5.1 问题 1:Cocos Creator 中导入 MQTT 报错

错误信息

1
Error: Error: Unexpected export statement in CJS module. at :7456/@:8:16534 at Object.execute (data:text/javascript…%20%20%20%20%20:3:7)

解决方案

使用浏览器兼容的导入方式,替换原来的导入代码:

1
2
3
4
5
// 错误导入方式
// import mqtt from 'mqtt';
// 正确导入方式
import * as mqttModule from 'mqtt/dist/mqtt.min.js';
const mqtt = mqttModule.default || mqttModule;

5.2 问题 2:MQTT 版本兼容性问题

错误信息

1
n.createConnection is not a function

解决方案

降级 MQTT 版本到 4.0.1,该版本在 Cocos Creator 环境中兼容性最佳:

1
2
3
4
# 先卸载现有版本
npm uninstall mqtt
# 安装兼容版本
npm install mqtt@4.0.1 --save

5.3 其他注意事项

  1. 通信协议选择:使用 WebSocket(ws://)而非原生 MQTT 协议(mqtt://),确保在浏览器环境中正常运行
  2. 端口配置:服务端和客户端需使用相同端口(本文使用 1884),避免端口冲突
  3. 主题设计:采用分层主题命名规范(如device/status/{deviceId}),便于消息分类处理
  4. 错误处理:完善的错误处理和重连机制,确保网络不稳定时系统的鲁棒性

六、系统功能总结

本实现完成了孪生物联网系统中 MQTT 通信的核心功能:

  1. 服务端:MQTT 消息代理服务器,支持客户端连接管理、消息发布订阅、设备状态缓存
  2. 客户端:Cocos Creator 中实现 MQTT 客户端,支持连接、订阅、发布、消息处理
  3. 数据同步:设备状态、故障信息、操作指令的实时同步
  4. 可视化:客户端提供 MQTT 通信的可视化调试界面

总结

  1. 核心实现:基于 Node.js 的mqtt-server构建 MQTT 服务端,基于 Cocos Creator 和 mqtt@4.0.1 构建客户端,实现了孪生物联网系统的 MQTT 通信层。
  2. 关键问题:解决了 Cocos Creator 中 MQTT 导入报错和版本兼容性问题,确保了前端客户端的正常运行。
  3. 最佳实践:采用 WebSocket 协议通信、分层主题设计、完善的错误处理机制,保证了系统的稳定性和可扩展性。