Contiki-NG
Loading...
Searching...
No Matches
mqtt.c
Go to the documentation of this file.
1/*
2 * Copyright (c) 2015, Texas Instruments Incorporated - http://www.ti.com/
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * 3. Neither the name of the copyright holder nor the names of its
14 * contributors may be used to endorse or promote products derived
15 * from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
20 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
21 * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
22 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
26 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
28 * OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30/*---------------------------------------------------------------------------*/
31/**
32 * \addtogroup mqtt-engine
33 * @{
34 */
35/**
36 * \file
37 * Implementation of the Contiki MQTT engine
38 *
39 * \author
40 * Texas Instruments
41 */
42/*---------------------------------------------------------------------------*/
43#include "mqtt.h"
44#include "mqtt-prop.h"
45#include "contiki.h"
46#include "contiki-net.h"
47#include "contiki-lib.h"
48#include "lib/random.h"
49#include "sys/ctimer.h"
50#include "sys/etimer.h"
51#include "sys/pt.h"
52#include "net/ipv6/uip.h"
53#include "net/ipv6/uip-ds6.h"
54#include "dev/leds.h"
55
56#include "tcp-socket.h"
57
58#include "lib/assert.h"
59#include "lib/list.h"
60#include "sys/cc.h"
61
62#include <stdlib.h>
63#include <stdio.h>
64#include <string.h>
65#include <stdarg.h>
66/*---------------------------------------------------------------------------*/
67#define DEBUG 0
68#if DEBUG
69#define PRINTF(...) printf(__VA_ARGS__)
70#else
71#define PRINTF(...)
72#endif
73/*---------------------------------------------------------------------------*/
74typedef enum {
75 MQTT_FHDR_DUP_FLAG = 0x08,
76
77 MQTT_FHDR_QOS_LEVEL_0 = 0x00,
78 MQTT_FHDR_QOS_LEVEL_1 = 0x02,
79 MQTT_FHDR_QOS_LEVEL_2 = 0x04,
80
81 MQTT_FHDR_RETAIN_FLAG = 0x01,
82} mqtt_fhdr_fields_t;
83/*---------------------------------------------------------------------------*/
84typedef enum {
85 MQTT_VHDR_USERNAME_FLAG = 0x80,
86 MQTT_VHDR_PASSWORD_FLAG = 0x40,
87
88 MQTT_VHDR_WILL_RETAIN_FLAG = 0x20,
89 MQTT_VHDR_WILL_QOS_LEVEL_0 = 0x00,
90 MQTT_VHDR_WILL_QOS_LEVEL_1 = 0x08,
91 MQTT_VHDR_WILL_QOS_LEVEL_2 = 0x10,
92
93 MQTT_VHDR_WILL_FLAG = 0x04,
94 MQTT_VHDR_CLEAN_SESSION_FLAG = 0x02, /* called Clean Start in MQTTv5.0 */
95} mqtt_vhdr_conn_fields_t;
96/*---------------------------------------------------------------------------*/
97typedef enum {
98 MQTT_VHDR_CONN_ACCEPTED,
99 MQTT_VHDR_CONN_REJECTED_PROTOCOL,
100 MQTT_VHDR_CONN_REJECTED_IDENTIFIER,
101 MQTT_VHDR_CONN_REJECTED_UNAVAILABLE,
102 MQTT_VHDR_CONN_REJECTED_BAD_USER_PASS,
103 MQTT_VHDR_CONN_REJECTED_UNAUTHORIZED,
104} mqtt_vhdr_connack_ret_code_t;
105
106typedef enum {
107 MQTT_VHDR_CONNACK_SESSION_PRESENT = 0x1
108} mqtt_vhdr_connack_flags_t;
109
110/*---------------------------------------------------------------------------*/
111typedef enum {
112 MQTT_SUBACK_RET_QOS_0 = 0x00,
113 MQTT_SUBACK_RET_QOS_1 = 0x01,
114 MQTT_SUBACK_RET_QOS_2 = 0x02,
115 MQTT_SUBACK_RET_FAIL = 0x08,
116} mqtt_suback_ret_code_t;
117/*---------------------------------------------------------------------------*/
118/* MQTTv5.0 Reason Codes */
119typedef enum {
120 MQTT_VHDR_RC_SUCCES_OR_NORMAL = 0x00,
121 MQTT_VHDR_RC_QOS_0 = 0x01,
122 MQTT_VHDR_RC_QOS_1 = 0x02,
123 MQTT_VHDR_RC_QOS_2 = 0x03,
124 MQTT_VHDR_RC_DISC_WITH_WILL = 0x04,
125 MQTT_VHDR_RC_NO_MATCH_SUB = 0x10,
126 MQTT_VHDR_RC_NO_SUB_EXISTED = 0x11,
127 MQTT_VHDR_RC_CONTINUE_AUTH = 0x18,
128 MQTT_VHDR_RC_REAUTH = 0x19,
129 MQTT_VHDR_RC_UNSPEC_ERR = 0x80,
130 MQTT_VHDR_RC_MALFORMED_PKT = 0x81,
131 MQTT_VHDR_RC_PROTOCOL_ERR = 0x82,
132 MQTT_VHDR_RC_IMPL_SPEC_ERR = 0x83,
133 MQTT_VHDR_RC_PROT_VER_UNUSUPPORTED = 0x84,
134 MQTT_VHDR_RC_CLIENT_ID_INVALID = 0x85,
135 MQTT_VHDR_RC_BAD_USER_PASS = 0x86,
136 MQTT_VHDR_RC_NOT_AUTH = 0x87,
137 MQTT_VHDR_RC_SRV_UNAVAIL = 0x88,
138 MQTT_VHDR_RC_SRV_BUSY = 0x89,
139 MQTT_VHDR_RC_BANNED = 0x8A,
140 MQTT_VHDR_RC_SRV_SHUTDOWN = 0x8B,
141 MQTT_VHDR_RC_BAD_AUTH_METHOD = 0x8C,
142 MQTT_VHDR_RC_KEEP_ALIVE_TIMEOUT = 0x8D,
143 MQTT_VHDR_RC_SESS_TAKEN_OVER = 0x8E,
144 MQTT_VHDR_RC_TOPIC_FILT_INVAL = 0x8F,
145 MQTT_VHDR_RC_TOPIC_NAME_INVAL = 0x90,
146 MQTT_VHDR_RC_PKT_ID_IN_USE = 0x91,
147 MQTT_VHDR_RC_PKT_ID_NOT_FOUND = 0x92,
148 MQTT_VHDR_RC_RECV_MAX_EXCEEDED = 0x93,
149 MQTT_VHDR_RC_TOPIC_ALIAS_INVAL = 0x94,
150 MQTT_VHDR_RC_PKT_TOO_LARGE = 0x95,
151 MQTT_VHDR_RC_MSG_RATE_TOO_HIGH = 0x96,
152 MQTT_VHDR_RC_QUOTA_EXCEEDED = 0x97,
153 MQTT_VHDR_RC_ADMIN_ACTION = 0x98,
154 MQTT_VHDR_RC_PAYLD_FMT_INVAL = 0x99,
155 MQTT_VHDR_RC_RETAIN_UNSUPPORTED = 0x9A,
156 MQTT_VHDR_RC_QOS_UNSUPPORTED = 0x9B,
157 MQTT_VHDR_RC_USE_ANOTHER_SRV = 0x9C,
158 MQTT_VHDR_RC_SRV_MOVED = 0x9D,
159 MQTT_VHDR_RC_SHARED_SUB_UNSUPPORTED = 0x9E,
160 MQTT_VHDR_RC_CONN_RATE_EXCEEDED = 0x9F,
161 MQTT_VHDR_RC_MAX_CONN_TIME = 0xA0,
162 MQTT_VHDR_RC_SUB_ID_UNSUPPORTED = 0xA1,
163 MQTT_VHDR_RC_WILD_SUB_UNSUPPORTED = 0xA2,
164} mqtt_reason_code_t;
165/*---------------------------------------------------------------------------*/
166#define RESPONSE_WAIT_TIMEOUT (CLOCK_SECOND * 10)
167/*---------------------------------------------------------------------------*/
168#define INCREMENT_MID(conn) (conn)->mid_counter += 2
169#define MQTT_STRING_LENGTH(s) (((s)->length) == 0 ? 0 : (MQTT_STRING_LEN_SIZE + (s)->length))
170/*---------------------------------------------------------------------------*/
171/* Protothread send macros */
172#define PT_MQTT_WRITE_BYTES(conn, data, len) \
173 conn->out_write_pos = 0; \
174 while(write_bytes(conn, data, len)) { \
175 PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \
176 }
177
178#define PT_MQTT_WRITE_BYTE(conn, data) \
179 while(write_byte(conn, data)) { \
180 PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \
181 }
182/*---------------------------------------------------------------------------*/
183/*
184 * Sends the continue send event and wait for that event.
185 *
186 * The reason we cannot use PROCESS_PAUSE() is since we would risk loosing any
187 * events posted during the sending process.
188 */
189#define PT_MQTT_WAIT_SEND() \
190 do { \
191 if (PROCESS_ERR_OK == \
192 process_post(PROCESS_CURRENT(), mqtt_continue_send_event, NULL)) { \
193 do { \
194 PROCESS_WAIT_EVENT(); \
195 if(ev == mqtt_abort_now_event) { \
196 conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE; \
197 PT_INIT(&conn->out_proto_thread); \
198 process_post(PROCESS_CURRENT(), ev, data); \
199 } else if(ev >= mqtt_event_min && ev <= mqtt_event_max) { \
200 process_post(PROCESS_CURRENT(), ev, data); \
201 } \
202 } while (ev != mqtt_continue_send_event); \
203 } \
204 } while(0)
205/*---------------------------------------------------------------------------*/
206static process_event_t mqtt_do_connect_tcp_event;
207static process_event_t mqtt_do_connect_mqtt_event;
208static process_event_t mqtt_do_disconnect_mqtt_event;
209static process_event_t mqtt_do_subscribe_event;
210static process_event_t mqtt_do_unsubscribe_event;
211static process_event_t mqtt_do_publish_event;
212static process_event_t mqtt_do_pingreq_event;
213static process_event_t mqtt_continue_send_event;
214static process_event_t mqtt_abort_now_event;
215static process_event_t mqtt_do_auth_event;
216process_event_t mqtt_update_event;
217
218/*
219 * Min and Max event numbers we want to acknowledge while we're in the process
220 * of doing something else. continue_send does not count, therefore must be
221 * allocated last
222 */
223static process_event_t mqtt_event_min;
224static process_event_t mqtt_event_max;
225/*---------------------------------------------------------------------------*/
226/* Prototypes */
227static int
228tcp_input(struct tcp_socket *s, void *ptr, const uint8_t *input_data_ptr,
229 int input_data_len);
230
231static void tcp_event(struct tcp_socket *s, void *ptr,
232 tcp_socket_event_t event);
233
234static void reset_packet(struct mqtt_in_packet *packet);
235/*---------------------------------------------------------------------------*/
236LIST(mqtt_conn_list);
237/*---------------------------------------------------------------------------*/
238PROCESS(mqtt_process, "MQTT process");
239/*---------------------------------------------------------------------------*/
240static void
241call_event(struct mqtt_connection *conn,
242 mqtt_event_t event,
243 void *data)
244{
245 conn->event_callback(conn, event, data);
246 process_post(conn->app_process, mqtt_update_event, NULL);
247}
248/*---------------------------------------------------------------------------*/
249static void
250reset_defaults(struct mqtt_connection *conn)
251{
252 conn->mid_counter = 1;
253 PT_INIT(&conn->out_proto_thread);
254 conn->waiting_for_pingresp = 0;
255
256 reset_packet(&conn->in_packet);
257 conn->out_buffer_sent = 0;
258}
259/*---------------------------------------------------------------------------*/
260static void
261abort_connection(struct mqtt_connection *conn)
262{
263 conn->out_buffer_ptr = conn->out_buffer;
264 conn->out_queue_full = 0;
265
266 /* Reset outgoing packet */
267 memset(&conn->out_packet, 0, sizeof(conn->out_packet));
268
269 tcp_socket_close(&conn->socket);
270 tcp_socket_unregister(&conn->socket);
271
272 memset(&conn->socket, 0, sizeof(conn->socket));
273
274 conn->state = MQTT_CONN_STATE_NOT_CONNECTED;
275}
276/*---------------------------------------------------------------------------*/
277static void
278connect_tcp(struct mqtt_connection *conn)
279{
280 conn->state = MQTT_CONN_STATE_TCP_CONNECTING;
281
282 reset_defaults(conn);
283 tcp_socket_register(&(conn->socket),
284 conn,
285 conn->in_buffer,
286 MQTT_TCP_INPUT_BUFF_SIZE,
287 conn->out_buffer,
288 MQTT_TCP_OUTPUT_BUFF_SIZE,
289 tcp_input,
290 tcp_event);
291 tcp_socket_connect(&(conn->socket), &(conn->server_ip), conn->server_port);
292}
293/*---------------------------------------------------------------------------*/
294static void
295disconnect_tcp(struct mqtt_connection *conn)
296{
297 conn->state = MQTT_CONN_STATE_DISCONNECTING;
298 tcp_socket_close(&(conn->socket));
299 tcp_socket_unregister(&conn->socket);
300
301 memset(&conn->socket, 0, sizeof(conn->socket));
302}
303/*---------------------------------------------------------------------------*/
304static void
305send_out_buffer(struct mqtt_connection *conn)
306{
307 if(conn->out_buffer_ptr - conn->out_buffer == 0) {
308 conn->out_buffer_sent = 1;
309 return;
310 }
311 conn->out_buffer_sent = 0;
312
313 DBG("MQTT - (send_out_buffer) Space used in buffer: %i\n",
314 conn->out_buffer_ptr - conn->out_buffer);
315
316 tcp_socket_send(&conn->socket, conn->out_buffer,
317 conn->out_buffer_ptr - conn->out_buffer);
318}
319/*---------------------------------------------------------------------------*/
320static void
321string_to_mqtt_string(struct mqtt_string *mqtt_string, char *string)
322{
323 if(mqtt_string == NULL) {
324 return;
325 }
326 mqtt_string->string = string;
327
328 if(string != NULL) {
329 mqtt_string->length = strlen(string);
330 } else {
331 mqtt_string->length = 0;
332 }
333}
334/*---------------------------------------------------------------------------*/
335static int
336write_byte(struct mqtt_connection *conn, uint8_t data)
337{
338 DBG("MQTT - (write_byte) buff_size: %i write: '%02X'\n",
339 &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr,
340 data);
341
342 if(&conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr == 0) {
343 send_out_buffer(conn);
344 return 1;
345 }
346
347 *conn->out_buffer_ptr = data;
348 conn->out_buffer_ptr++;
349 return 0;
350}
351/*---------------------------------------------------------------------------*/
352static int
353write_bytes(struct mqtt_connection *conn, uint8_t *data, uint16_t len)
354{
355 uint16_t write_bytes;
356 write_bytes =
357 MIN(&conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr,
358 len - conn->out_write_pos);
359
360 memcpy(conn->out_buffer_ptr, &data[conn->out_write_pos], write_bytes);
361 conn->out_write_pos += write_bytes;
362 conn->out_buffer_ptr += write_bytes;
363
364 DBG("MQTT - (write_bytes) len: %u write_pos: %i\n", len,
365 conn->out_write_pos);
366
367 if(len - conn->out_write_pos == 0) {
368 conn->out_write_pos = 0;
369 return 0;
370 } else {
371 send_out_buffer(conn);
372 return len - conn->out_write_pos;
373 }
374}
375/*---------------------------------------------------------------------------*/
376uint8_t
377mqtt_decode_var_byte_int(const uint8_t *input_data_ptr,
378 int input_data_len,
379 uint32_t *input_pos,
380 uint32_t *pkt_byte_count,
381 uint16_t *dest)
382{
383 uint8_t read_bytes = 0;
384 uint8_t byte_in;
385 uint8_t multiplier = 1;
386 uint32_t input_pos_0 = 0;
387
388 if(input_pos == NULL) {
389 input_pos = &input_pos_0;
390 }
391
392 do {
393 if(*input_pos >= input_data_len) {
394 return 0;
395 }
396
397 byte_in = input_data_ptr[*input_pos];
398 (*input_pos)++;
399 if(pkt_byte_count) {
400 (*pkt_byte_count)++;
401 }
402 read_bytes++;
403 DBG("MQTT - Read Variable Byte Integer byte %i\n", byte_in);
404
405 if(read_bytes > 4) {
406 DBG("Received more than 4 byte 'Variable Byte Integer'.");
407 return 0;
408 }
409
410 *dest += (byte_in & 127) * multiplier;
411 multiplier *= 128;
412 } while((byte_in & 128) != 0);
413
414 return read_bytes;
415}
416/*---------------------------------------------------------------------------*/
417void
418mqtt_encode_var_byte_int(uint8_t *vbi_out,
419 uint8_t *vbi_bytes,
420 uint32_t val)
421{
422 uint8_t digit;
423
424 DBG("MQTT - Encoding Variable Byte Integer %u\n", val);
425
426 *vbi_bytes = 0;
427 do {
428 digit = val % 128;
429 val = val / 128;
430 if(val > 0) {
431 digit = digit | 0x80;
432 }
433
434 vbi_out[*vbi_bytes] = digit;
435 (*vbi_bytes)++;
436 DBG("MQTT - Encode VBI digit '%u' length '%i'\n", digit, val);
437 } while(val > 0 && *vbi_bytes < 5);
438 DBG("MQTT - var_byte_int bytes %u\n", *vbi_bytes);
439}
440/*---------------------------------------------------------------------------*/
441static void
442keep_alive_callback(void *ptr)
443{
444 struct mqtt_connection *conn = ptr;
445
446 DBG("MQTT - (keep_alive_callback) Called!\n");
447
448 /* The flag is set when the PINGREQ has been sent */
449 if(conn->waiting_for_pingresp) {
450 PRINTF("MQTT - Disconnect due to no PINGRESP from broker.\n");
451 disconnect_tcp(conn);
452 return;
453 }
454
455 process_post(&mqtt_process, mqtt_do_pingreq_event, conn);
456}
457/*---------------------------------------------------------------------------*/
458static void
459reset_packet(struct mqtt_in_packet *packet)
460{
461 memset(packet, 0, sizeof(struct mqtt_in_packet));
462}
463/*---------------------------------------------------------------------------*/
464#if MQTT_5
465static
466PT_THREAD(write_out_props(struct pt *pt, struct mqtt_connection *conn,
467 struct mqtt_prop_list *prop_list))
468{
469 PT_BEGIN(pt);
470
471 static struct mqtt_prop_out_property *prop;
472
473 if(prop_list) {
474 DBG("MQTT - Writing %i property bytes\n", prop_list->properties_len + prop_list->properties_len_enc_bytes);
475 /* Write total length of properties */
476 PT_MQTT_WRITE_BYTES(conn,
477 prop_list->properties_len_enc,
478 prop_list->properties_len_enc_bytes);
479
480 prop = (struct mqtt_prop_out_property *)list_head(prop_list->props);
481 do {
482 if(prop != NULL) {
483 DBG("MQTT - Property ID %i len %i\n", prop->id, prop->property_len);
484 PT_MQTT_WRITE_BYTE(conn, prop->id);
485 PT_MQTT_WRITE_BYTES(conn,
486 prop->val,
487 prop->property_len);
488 }
489 prop = (struct mqtt_prop_out_property *)list_item_next(prop);
490 } while(prop != NULL);
491 } else {
492 /* Write Property Length */
493 DBG("MQTT - No properties to write\n");
494 PT_MQTT_WRITE_BYTE(conn, 0);
495 }
496
497 PT_END(pt);
498}
499#endif
500/*---------------------------------------------------------------------------*/
501static
502PT_THREAD(connect_pt(struct pt *pt, struct mqtt_connection *conn))
503{
504 PT_BEGIN(pt);
505
506#if MQTT_5
507 static struct mqtt_prop_list *will_props = MQTT_PROP_LIST_NONE;
508 if(conn->will.properties) {
509 will_props = (struct mqtt_prop_list *)list_head(conn->will.properties);
510 }
511#endif
512
513 DBG("MQTT - Sending CONNECT message...\n");
514
515 /* Set up FHDR */
516 conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_CONNECT;
517 conn->out_packet.remaining_length = 0;
518 conn->out_packet.remaining_length += MQTT_CONNECT_VHDR_SIZE;
519 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->client_id);
520#if (MQTT_PROTOCOL_VERSION > MQTT_PROTOCOL_VERSION_3_1) && MQTT_SRV_SUPPORTS_EMPTY_CLIENT_ID
521 /* Ensure we leave space for the 2 length bytes (which will encode 0) */
522 if(MQTT_STRING_LENGTH(&conn->client_id) == 0) {
523 conn->out_packet.remaining_length += 2;
524 }
525#endif
526 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->credentials.username);
527 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->credentials.password);
528 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->will.topic);
529 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->will.message);
530
531#if MQTT_5
532 /* For connect properties */
533 conn->out_packet.remaining_length +=
534 conn->out_props ? (conn->out_props->properties_len + conn->out_props->properties_len_enc_bytes)
535 : 1;
536
537 /* For will properties */
538 if(conn->connect_vhdr_flags & MQTT_VHDR_WILL_FLAG) {
539 conn->out_packet.remaining_length +=
540 will_props ? will_props->properties_len + will_props->properties_len_enc_bytes
541 : 1;
542 }
543#endif
544
545 mqtt_encode_var_byte_int(conn->out_packet.remaining_length_enc,
546 &conn->out_packet.remaining_length_enc_bytes,
547 conn->out_packet.remaining_length);
548 if(conn->out_packet.remaining_length_enc_bytes > 4) {
549 call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
550 PRINTF("MQTT - Error, remaining length > 4 bytes\n");
551 PT_EXIT(pt);
552 }
553
554 /* Write Fixed Header */
555 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
556 PT_MQTT_WRITE_BYTES(conn,
557 conn->out_packet.remaining_length_enc,
558 conn->out_packet.remaining_length_enc_bytes);
559 PT_MQTT_WRITE_BYTE(conn, 0);
560 PT_MQTT_WRITE_BYTE(conn, strlen(MQTT_PROTOCOL_NAME));
561 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)MQTT_PROTOCOL_NAME, strlen(MQTT_PROTOCOL_NAME));
562 PT_MQTT_WRITE_BYTE(conn, MQTT_PROTOCOL_VERSION);
563 PT_MQTT_WRITE_BYTE(conn, conn->connect_vhdr_flags);
564 PT_MQTT_WRITE_BYTE(conn, (conn->keep_alive >> 8));
565 PT_MQTT_WRITE_BYTE(conn, (conn->keep_alive & 0x00FF));
566
567#if MQTT_5
568 /* Write Properties */
569 write_out_props(pt, conn, conn->out_props);
570#endif
571
572 /* Write Payload */
573 PT_MQTT_WRITE_BYTE(conn, conn->client_id.length >> 8);
574 PT_MQTT_WRITE_BYTE(conn, conn->client_id.length & 0x00FF);
575 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->client_id.string,
576 conn->client_id.length);
577
578 if(conn->connect_vhdr_flags & MQTT_VHDR_WILL_FLAG) {
579#if MQTT_5
580 /* Write Will Properties */
581 DBG("MQTT - Writing will properties\n");
582 write_out_props(pt, conn, will_props);
583#endif
584 PT_MQTT_WRITE_BYTE(conn, conn->will.topic.length >> 8);
585 PT_MQTT_WRITE_BYTE(conn, conn->will.topic.length & 0x00FF);
586 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->will.topic.string,
587 conn->will.topic.length);
588 PT_MQTT_WRITE_BYTE(conn, conn->will.message.length >> 8);
589 PT_MQTT_WRITE_BYTE(conn, conn->will.message.length & 0x00FF);
590 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->will.message.string,
591 conn->will.message.length);
592 DBG("MQTT - Setting will topic to '%s' %u bytes and message to '%s' %u bytes\n",
593 conn->will.topic.string,
594 conn->will.topic.length,
595 conn->will.message.string,
596 conn->will.message.length);
597 }
598 if(conn->connect_vhdr_flags & MQTT_VHDR_USERNAME_FLAG) {
599 PT_MQTT_WRITE_BYTE(conn, conn->credentials.username.length >> 8);
600 PT_MQTT_WRITE_BYTE(conn, conn->credentials.username.length & 0x00FF);
601 PT_MQTT_WRITE_BYTES(conn,
602 (uint8_t *)conn->credentials.username.string,
603 conn->credentials.username.length);
604 }
605 if(conn->connect_vhdr_flags & MQTT_VHDR_PASSWORD_FLAG) {
606 PT_MQTT_WRITE_BYTE(conn, conn->credentials.password.length >> 8);
607 PT_MQTT_WRITE_BYTE(conn, conn->credentials.password.length & 0x00FF);
608 PT_MQTT_WRITE_BYTES(conn,
609 (uint8_t *)conn->credentials.password.string,
610 conn->credentials.password.length);
611 }
612
613 /* Send out buffer */
614 send_out_buffer(conn);
615 conn->state = MQTT_CONN_STATE_CONNECTING_TO_BROKER;
616
617 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
618
619 /* Wait for CONNACK */
620 reset_packet(&conn->in_packet);
621 PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
622 timer_expired(&conn->t));
623 if(timer_expired(&conn->t)) {
624 DBG("Timeout waiting for CONNACK\n");
625 /* We stick to the letter of the spec here: Tear the connection down */
626#if MQTT_5
627 mqtt_disconnect(conn, MQTT_PROP_LIST_NONE);
628#else
629 mqtt_disconnect(conn);
630#endif
631 }
632 reset_packet(&conn->in_packet);
633
634 DBG("MQTT - Done sending CONNECT\n");
635
636#if DEBUG_MQTT == 1
637 DBG("MQTT - CONNECT message sent: \n");
638 uint16_t i;
639 for(i = 0; i < (conn->out_buffer_ptr - conn->out_buffer); i++) {
640 DBG("%02X ", conn->out_buffer[i]);
641 }
642 DBG("\n");
643#endif
644
645 PT_END(pt);
646}
647/*---------------------------------------------------------------------------*/
648static
649PT_THREAD(disconnect_pt(struct pt *pt, struct mqtt_connection *conn))
650{
651 PT_BEGIN(pt);
652
653 PT_MQTT_WRITE_BYTE(conn, MQTT_FHDR_MSG_TYPE_DISCONNECT);
654 PT_MQTT_WRITE_BYTE(conn, 0);
655
656#if MQTT_5
657/* Write Properties */
658 write_out_props(pt, conn, conn->out_props);
659#endif
660
661 send_out_buffer(conn);
662
663 /*
664 * Wait a couple of seconds for a TCP ACK. We don't really need the ACK,
665 * we do want the TCP/IP stack to actually send this disconnect before we
666 * tear down the session.
667 */
668 timer_set(&conn->t, (CLOCK_SECOND * 2));
669 PT_WAIT_UNTIL(pt, conn->out_buffer_sent || timer_expired(&conn->t));
670
671 PT_END(pt);
672}
673/*---------------------------------------------------------------------------*/
674static
675PT_THREAD(subscribe_pt(struct pt *pt, struct mqtt_connection *conn))
676{
677 PT_BEGIN(pt);
678
679 DBG("MQTT - Sending subscribe message! topic %s topic_length %i\n",
680 conn->out_packet.topic,
681 conn->out_packet.topic_length);
682 DBG("MQTT - Buffer space is %i \n",
683 &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
684
685 /* Set up FHDR */
686 conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_SUBSCRIBE | MQTT_FHDR_QOS_LEVEL_1;
687 conn->out_packet.remaining_length = MQTT_MID_SIZE +
688 MQTT_STRING_LEN_SIZE +
689 conn->out_packet.topic_length +
690 MQTT_QOS_SIZE;
691
692#if MQTT_5
693 conn->out_packet.remaining_length +=
694 conn->out_props ? (conn->out_props->properties_len + conn->out_props->properties_len_enc_bytes)
695 : 1;
696#endif
697
698 mqtt_encode_var_byte_int(conn->out_packet.remaining_length_enc,
699 &conn->out_packet.remaining_length_enc_bytes,
700 conn->out_packet.remaining_length);
701 if(conn->out_packet.remaining_length_enc_bytes > 4) {
702 call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
703 PRINTF("MQTT - Error, remaining length > 4 bytes\n");
704 PT_EXIT(pt);
705 }
706
707 /* Write Fixed Header */
708 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
709 PT_MQTT_WRITE_BYTES(conn,
710 conn->out_packet.remaining_length_enc,
711 conn->out_packet.remaining_length_enc_bytes);
712 /* Write Variable Header */
713 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid >> 8));
714 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
715
716#if MQTT_5
717 /* Write Properties */
718 write_out_props(pt, conn, conn->out_props);
719#endif
720
721 /* Write Payload */
722 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
723 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
724 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
725 conn->out_packet.topic_length);
726
727#if MQTT_5
728 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.sub_options);
729#else
730 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.qos);
731#endif
732
733 /* Send out buffer */
734 send_out_buffer(conn);
735 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
736
737 /* Wait for SUBACK. */
738 reset_packet(&conn->in_packet);
739 PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
740 timer_expired(&conn->t));
741
742 if(timer_expired(&conn->t)) {
743 DBG("Timeout waiting for SUBACK\n");
744 }
745 reset_packet(&conn->in_packet);
746
747 /* This is clear after the entire transaction is complete */
748 conn->out_queue_full = 0;
749
750 DBG("MQTT - Done in send_subscribe!\n");
751
752 PT_END(pt);
753}
754/*---------------------------------------------------------------------------*/
755static
756PT_THREAD(unsubscribe_pt(struct pt *pt, struct mqtt_connection *conn))
757{
758 PT_BEGIN(pt);
759
760 DBG("MQTT - Sending unsubscribe message on topic %s topic_length %i\n",
761 conn->out_packet.topic,
762 conn->out_packet.topic_length);
763 DBG("MQTT - Buffer space is %i \n",
764 &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
765
766 /* Set up FHDR */
767 conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_UNSUBSCRIBE |
768 MQTT_FHDR_QOS_LEVEL_1;
769 conn->out_packet.remaining_length = MQTT_MID_SIZE +
770 MQTT_STRING_LEN_SIZE +
771 conn->out_packet.topic_length;
772
773#if MQTT_5
774 conn->out_packet.remaining_length +=
775 conn->out_props ? (conn->out_props->properties_len + conn->out_props->properties_len_enc_bytes)
776 : 1;
777#endif
778
779 mqtt_encode_var_byte_int(conn->out_packet.remaining_length_enc,
780 &conn->out_packet.remaining_length_enc_bytes,
781 conn->out_packet.remaining_length);
782 if(conn->out_packet.remaining_length_enc_bytes > 4) {
783 call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
784 PRINTF("MQTT - Error, remaining length > 4 bytes\n");
785 PT_EXIT(pt);
786 }
787
788 /* Write Fixed Header */
789 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
790 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc,
791 conn->out_packet.remaining_length_enc_bytes);
792
793 /* Write Variable Header */
794 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid >> 8));
795 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
796#if MQTT_5
797 /* Write Properties */
798 write_out_props(pt, conn, conn->out_props);
799#endif
800
801 /* Write Payload */
802 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
803 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
804 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
805 conn->out_packet.topic_length);
806
807 /* Send out buffer */
808 send_out_buffer(conn);
809 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
810
811 /* Wait for UNSUBACK */
812 reset_packet(&conn->in_packet);
813 PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
814 timer_expired(&conn->t));
815
816 if(timer_expired(&conn->t)) {
817 DBG("Timeout waiting for UNSUBACK\n");
818 }
819
820 reset_packet(&conn->in_packet);
821
822 /* This is clear after the entire transaction is complete */
823 conn->out_queue_full = 0;
824
825 DBG("MQTT - Done writing subscribe message to out buffer!\n");
826
827 PT_END(pt);
828}
829/*---------------------------------------------------------------------------*/
830static
831PT_THREAD(publish_pt(struct pt *pt, struct mqtt_connection *conn))
832{
833 PT_BEGIN(pt);
834
835 DBG("MQTT - Sending publish message! topic %s topic_length %i\n",
836 conn->out_packet.topic,
837 conn->out_packet.topic_length);
838 DBG("MQTT - Buffer space is %i \n",
839 &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
840
841 /* Set up FHDR */
842 conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_PUBLISH |
843 conn->out_packet.qos << 1;
844 if(conn->out_packet.retain == MQTT_RETAIN_ON) {
845 conn->out_packet.fhdr |= MQTT_FHDR_RETAIN_FLAG;
846 }
847 conn->out_packet.remaining_length = MQTT_STRING_LEN_SIZE +
848 conn->out_packet.topic_length +
849 conn->out_packet.payload_size;
850 if(conn->out_packet.qos > MQTT_QOS_LEVEL_0) {
851 conn->out_packet.remaining_length += MQTT_MID_SIZE;
852 }
853
854#if MQTT_5
855 conn->out_packet.remaining_length +=
856 conn->out_props ? (conn->out_props->properties_len + conn->out_props->properties_len_enc_bytes)
857 : 1;
858#endif
859
860 mqtt_encode_var_byte_int(conn->out_packet.remaining_length_enc,
861 &conn->out_packet.remaining_length_enc_bytes,
862 conn->out_packet.remaining_length);
863 if(conn->out_packet.remaining_length_enc_bytes > 4) {
864 call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
865 PRINTF("MQTT - Error, remaining length > 4 bytes\n");
866 PT_EXIT(pt);
867 }
868
869 /* The DUP flag MUST be set to 0 for all QoS 0 messages */
870 if(conn->out_packet.qos == MQTT_QOS_LEVEL_0) {
871 conn->out_packet.fhdr &= ~MQTT_FHDR_DUP_FLAG;
872 }
873
874 /* Write Fixed Header */
875 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
876 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc,
877 conn->out_packet.remaining_length_enc_bytes);
878 /* Write Variable Header */
879 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
880 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
881 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
882 conn->out_packet.topic_length);
883 if(conn->out_packet.qos > MQTT_QOS_LEVEL_0) {
884 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid >> 8));
885 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
886 }
887
888#if MQTT_5
889 /* Write Properties */
890 write_out_props(pt, conn, conn->out_props);
891#endif
892
893 /* Write Payload */
894 PT_MQTT_WRITE_BYTES(conn,
895 conn->out_packet.payload,
896 conn->out_packet.payload_size);
897
898 send_out_buffer(conn);
899 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
900
901 /*
902 * If QoS is zero then wait until the message has been sent, since there is
903 * no ACK to wait for.
904 *
905 * Also notify the app will not be notified via PUBACK or PUBCOMP
906 */
907 if(conn->out_packet.qos == 0) {
908 process_post(conn->app_process, mqtt_update_event, NULL);
909 } else if(conn->out_packet.qos == 1) {
910 /* Wait for PUBACK */
911 reset_packet(&conn->in_packet);
912 PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
913 timer_expired(&conn->t));
914 if(timer_expired(&conn->t)) {
915 DBG("Timeout waiting for PUBACK\n");
916 }
917 if(conn->in_packet.mid != conn->out_packet.mid) {
918 DBG("MQTT - Warning, got PUBACK with none matching MID. Currently there "
919 "is no support for several concurrent PUBLISH messages.\n");
920 }
921 } else if(conn->out_packet.qos == 2) {
922 DBG("MQTT - QoS not implemented yet.\n");
923 /* Should wait for PUBREC, send PUBREL and then wait for PUBCOMP */
924 }
925
926 reset_packet(&conn->in_packet);
927
928 /* This is clear after the entire transaction is complete */
929 conn->out_queue_full = 0;
930
931 DBG("MQTT - Publish Enqueued\n");
932
933 PT_END(pt);
934}
935/*---------------------------------------------------------------------------*/
936static
937PT_THREAD(pingreq_pt(struct pt *pt, struct mqtt_connection *conn))
938{
939 PT_BEGIN(pt);
940
941 DBG("MQTT - Sending PINGREQ\n");
942
943 /* Write Fixed Header */
944 PT_MQTT_WRITE_BYTE(conn, MQTT_FHDR_MSG_TYPE_PINGREQ);
945 PT_MQTT_WRITE_BYTE(conn, 0);
946
947 send_out_buffer(conn);
948
949 /* Start timeout for reply. */
950 conn->waiting_for_pingresp = 1;
951
952 /* Wait for PINGRESP or timeout */
953 reset_packet(&conn->in_packet);
954 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
955
956 PT_WAIT_UNTIL(pt, conn->in_packet.packet_received || timer_expired(&conn->t));
957
958 reset_packet(&conn->in_packet);
959
960 conn->waiting_for_pingresp = 0;
961
962 PT_END(pt);
963}
964/*---------------------------------------------------------------------------*/
965#if MQTT_5
966static
967PT_THREAD(auth_pt(struct pt *pt, struct mqtt_connection *conn))
968{
969 PT_BEGIN(pt);
970
971 conn->out_packet.remaining_length +=
972 conn->out_props ? (conn->out_props->properties_len + conn->out_props->properties_len_enc_bytes)
973 : 1;
974
975 mqtt_encode_var_byte_int(conn->out_packet.remaining_length_enc,
976 &conn->out_packet.remaining_length_enc_bytes,
977 conn->out_packet.remaining_length);
978
979 if(conn->out_packet.remaining_length_enc_bytes > 4) {
980 call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
981 PRINTF("MQTT - Error, remaining length > 4 bytes\n");
982 PT_EXIT(pt);
983 }
984
985 /* Write Fixed Header */
986 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
987 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc,
988 conn->out_packet.remaining_length_enc_bytes);
989
990 /* Write Variable Header */
991 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.auth_reason_code);
992
993 /* Write Properties */
994 write_out_props(pt, conn, conn->out_props);
995
996 /* No Payload */
997 send_out_buffer(conn);
998
999 PT_WAIT_UNTIL(pt, conn->out_buffer_sent);
1000
1001 PT_END(pt);
1002}
1003#endif
1004/*---------------------------------------------------------------------------*/
1005static void
1006handle_connack(struct mqtt_connection *conn)
1007{
1008 struct mqtt_connack_event connack_event;
1009
1010 DBG("MQTT - Got CONNACK\n");
1011
1012#if MQTT_PROTOCOL_VERSION <= MQTT_PROTOCOL_VERSION_3_1_1
1013 if(conn->in_packet.remaining_length != 2) {
1014 PRINTF("MQTT - CONNACK VHDR remaining length %i incorrect\n",
1015 conn->in_packet.remaining_length);
1016 call_event(conn,
1017 MQTT_EVENT_ERROR,
1018 NULL);
1019 abort_connection(conn);
1020 return;
1021 }
1022
1023 if(conn->in_packet.payload[1] != 0) {
1024 PRINTF("MQTT - Connection refused with Return Code %i\n",
1025 conn->in_packet.payload[1]);
1026 call_event(conn,
1027 MQTT_EVENT_CONNECTION_REFUSED_ERROR,
1028 &conn->in_packet.payload[1]);
1029 abort_connection(conn);
1030 return;
1031 }
1032#endif
1033
1034 conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
1035
1036#if MQTT_PROTOCOL_VERSION >= MQTT_PROTOCOL_VERSION_3_1_1
1037 connack_event.session_present = conn->in_packet.payload[0] & MQTT_VHDR_CONNACK_SESSION_PRESENT;
1038#endif
1039
1040#if MQTT_PROTOCOL_VERSION >= MQTT_PROTOCOL_VERSION_5
1041 /* The CONNACK VHDR must contain:
1042 * 0: Connect Acknowledge Flags
1043 * 1: Connect Reason Code
1044 * 2: Properties (whose Length field must be set even if no properties are present)
1045 */
1046 if(conn->in_packet.remaining_length < 3) {
1047 PRINTF("MQTT - CONNACK VHDR remaining length %i incorrect\n",
1048 conn->in_packet.remaining_length);
1049 call_event(conn,
1050 MQTT_EVENT_ERROR,
1051 NULL);
1052 abort_connection(conn);
1053 return;
1054 }
1055 mqtt_prop_parse_connack_props(conn);
1056#endif
1057
1058 ctimer_set(&conn->keep_alive_timer, conn->keep_alive * CLOCK_SECOND,
1059 keep_alive_callback, conn);
1060
1061 /* Always reset packet before callback since it might be used directly */
1062 conn->state = MQTT_CONN_STATE_CONNECTED_TO_BROKER;
1063 call_event(conn, MQTT_EVENT_CONNECTED, &connack_event);
1064}
1065/*---------------------------------------------------------------------------*/
1066static void
1067handle_pingresp(struct mqtt_connection *conn)
1068{
1069 DBG("MQTT - Got PINGRESP\n");
1070}
1071/*---------------------------------------------------------------------------*/
1072static void
1073handle_suback(struct mqtt_connection *conn)
1074{
1075 struct mqtt_suback_event suback_event;
1076
1077 DBG("MQTT - Got SUBACK\n");
1078
1079 /* Only accept SUBACKS with X topic QoS response, assume 1 */
1080#if MQTT_5
1081 if(conn->in_packet.remaining_length > MQTT_MID_SIZE +
1082 MQTT_MAX_TOPICS_PER_SUBSCRIBE * MQTT_QOS_SIZE +
1083 conn->in_packet.properties_len + conn->in_packet.properties_enc_len) {
1084#else
1085 if(conn->in_packet.remaining_length > MQTT_MID_SIZE +
1086 MQTT_MAX_TOPICS_PER_SUBSCRIBE * MQTT_QOS_SIZE) {
1087#endif
1088 DBG("MQTT - Error, SUBACK with > 1 topic, not supported.\n");
1089 }
1090
1091 conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
1092
1093 suback_event.mid = conn->in_packet.mid;
1094
1095#if !MQTT_31
1096 suback_event.success = 0;
1097
1098 switch(conn->in_packet.payload_start[0]) {
1099 case MQTT_SUBACK_RET_FAIL:
1100 PRINTF("MQTT - Error, SUBSCRIBE failed with SUBACK return code '%x'", conn->in_packet.payload_start[0]);
1101 break;
1102
1103 case MQTT_SUBACK_RET_QOS_0:
1104 case MQTT_SUBACK_RET_QOS_1:
1105 case MQTT_SUBACK_RET_QOS_2:
1106 suback_event.qos_level = conn->in_packet.payload_start[0] & 0x03;
1107 suback_event.success = 1;
1108 break;
1109
1110 default:
1111 PRINTF("MQTT - Error, Unrecognised SUBACK return code '%x'", conn->in_packet.payload_start[0]);
1112 break;
1113 }
1114
1115 suback_event.return_code = conn->in_packet.payload_start[0];
1116#else
1117 suback_event.qos_level = conn->in_packet.payload_start[0];
1118#endif
1119
1120 if(conn->in_packet.mid != conn->out_packet.mid) {
1121 DBG("MQTT - Warning, got SUBACK with none matching MID. Currently there is"
1122 "no support for several concurrent SUBSCRIBE messages.\n");
1123 }
1124
1125 /* Always reset packet before callback since it might be used directly */
1126 call_event(conn, MQTT_EVENT_SUBACK, &suback_event);
1127}
1128/*---------------------------------------------------------------------------*/
1129static void
1130handle_unsuback(struct mqtt_connection *conn)
1131{
1132 DBG("MQTT - Got UNSUBACK\n");
1133
1134 conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
1135
1136 if(conn->in_packet.mid != conn->out_packet.mid) {
1137 DBG("MQTT - Warning, got UNSUBACK with none matching MID. Currently there is"
1138 "no support for several concurrent UNSUBSCRIBE messages.\n");
1139 }
1140
1141 call_event(conn, MQTT_EVENT_UNSUBACK, &conn->in_packet.mid);
1142}
1143/*---------------------------------------------------------------------------*/
1144static void
1145handle_puback(struct mqtt_connection *conn)
1146{
1147 DBG("MQTT - Got PUBACK\n");
1148
1149 conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
1150
1151 call_event(conn, MQTT_EVENT_PUBACK, &conn->in_packet.mid);
1152}
1153/*---------------------------------------------------------------------------*/
1154static mqtt_pub_status_t
1155handle_publish(struct mqtt_connection *conn)
1156{
1157 DBG("MQTT - Got PUBLISH, called once per manageable chunk of message.\n");
1158 DBG("MQTT - Handling publish on topic '%s'\n", conn->in_publish_msg.topic);
1159
1160#if MQTT_PROTOCOL_VERSION >= MQTT_PROTOCOL_VERSION_3_1_1
1161 if(strlen(conn->in_publish_msg.topic) < conn->in_packet.topic_len) {
1162 DBG("NULL detected in received PUBLISH topic\n");
1163#if MQTT_5
1164 mqtt_disconnect(conn, MQTT_PROP_LIST_NONE);
1165#else
1166 mqtt_disconnect(conn);
1167#endif
1168 return MQTT_PUBLISH_ERR;
1169 }
1170#endif
1171
1172 DBG("MQTT - This chunk is %i bytes\n", conn->in_publish_msg.payload_chunk_length);
1173
1174 if(((conn->in_packet.fhdr & 0x09) >> 1) != 0) {
1175 PRINTF("MQTT - Error, got incoming PUBLISH with QoS > 0, not supported atm!\n");
1176 }
1177
1178 call_event(conn, MQTT_EVENT_PUBLISH, &conn->in_publish_msg);
1179
1180 if(conn->in_publish_msg.first_chunk == 1) {
1181 conn->in_publish_msg.first_chunk = 0;
1182 }
1183
1184 /* If this is the last time handle_publish will be called, reset packet. */
1185 if(conn->in_publish_msg.payload_left == 0) {
1186
1187 /* Check for QoS and initiate the reply, do not rely on the data in the
1188 * in_packet being untouched. */
1189
1190 DBG("MQTT - (handle_publish) resetting packet.\n");
1191 reset_packet(&conn->in_packet);
1192 }
1193
1194 return MQTT_PUBLISH_OK;
1195}
1196/*---------------------------------------------------------------------------*/
1197static void
1198parse_publish_vhdr(struct mqtt_connection *conn,
1199 uint32_t *pos,
1200 const uint8_t *input_data_ptr,
1201 int input_data_len)
1202{
1203 uint16_t copy_bytes;
1204
1205 /* Read out topic length */
1206 if(conn->in_packet.topic_len_received == 0) {
1207 conn->in_packet.topic_pos = 0;
1208 conn->in_packet.topic_len = (input_data_ptr[(*pos)++] << 8);
1209 conn->in_packet.byte_counter++;
1210 if(*pos >= input_data_len) {
1211 return;
1212 }
1213 conn->in_packet.topic_len |= input_data_ptr[(*pos)++];
1214 conn->in_packet.byte_counter++;
1215 conn->in_packet.topic_len_received = 1;
1216 /* Abort if topic is longer than our topic buffer */
1217 if(conn->in_packet.topic_len > MQTT_MAX_TOPIC_LENGTH) {
1218 DBG("MQTT - topic too long %u/%u\n", conn->in_packet.topic_len, MQTT_MAX_TOPIC_LENGTH);
1219 return;
1220 }
1221 DBG("MQTT - Read PUBLISH topic len %i\n", conn->in_packet.topic_len);
1222 /* WARNING: Check here if TOPIC fits in payload area, otherwise error */
1223 }
1224
1225 /* Read out topic */
1226 if(conn->in_packet.topic_len_received == 1 &&
1227 conn->in_packet.topic_received == 0) {
1228 copy_bytes = MIN(conn->in_packet.topic_len - conn->in_packet.topic_pos,
1229 input_data_len - *pos);
1230 DBG("MQTT - topic_pos: %i copy_bytes: %i\n", conn->in_packet.topic_pos,
1231 copy_bytes);
1232 memcpy(&conn->in_publish_msg.topic[conn->in_packet.topic_pos],
1233 &input_data_ptr[*pos],
1234 copy_bytes);
1235 (*pos) += copy_bytes;
1236 conn->in_packet.byte_counter += copy_bytes;
1237 conn->in_packet.topic_pos += copy_bytes;
1238
1239 if(conn->in_packet.topic_len - conn->in_packet.topic_pos == 0) {
1240 DBG("MQTT - Got topic '%s'", conn->in_publish_msg.topic);
1241 conn->in_packet.topic_received = 1;
1242 conn->in_publish_msg.topic[conn->in_packet.topic_pos] = '\0';
1243 conn->in_publish_msg.payload_length =
1244 conn->in_packet.remaining_length - conn->in_packet.topic_len - 2;
1245 conn->in_publish_msg.payload_left = conn->in_publish_msg.payload_length;
1246 }
1247
1248 /* Set this once per incomming publish message */
1249 conn->in_publish_msg.first_chunk = 1;
1250 }
1251}
1252/*---------------------------------------------------------------------------*/
1253/* MQTTv5 only */
1254#if MQTT_5
1255static void
1256handle_disconnect(struct mqtt_connection *conn)
1257{
1258 DBG("MQTT - (handle_disconnect) Got DISCONNECT.\n");
1259 call_event(conn, MQTT_EVENT_DISCONNECTED, NULL);
1260 abort_connection(conn);
1261}
1262/*---------------------------------------------------------------------------*/
1263static void
1264handle_auth(struct mqtt_connection *conn)
1265{
1266 struct mqtt_prop_auth_event event;
1267
1268 DBG("MQTT - (handle_auth) Got AUTH.\n");
1269
1270 if((conn->in_packet.fhdr & 0x0F) != 0x0) {
1271 call_event(conn,
1272 MQTT_EVENT_ERROR,
1273 NULL);
1274 abort_connection(conn);
1275 return;
1276 }
1277
1278 /* AUTH messages from the server */
1279 if(conn->state == MQTT_CONN_STATE_CONNECTING_TO_BROKER &&
1280 (!conn->in_packet.has_reason_code ||
1281 conn->in_packet.reason_code != MQTT_VHDR_RC_CONTINUE_AUTH)) {
1282 DBG("MQTT - (handle_auth) Not reauth - Reason Code 0x18 expected!\n");
1283 }
1284
1285 mqtt_prop_parse_auth_props(conn, &event);
1286 call_event(conn, MQTT_EVENT_AUTH, &event);
1287}
1288#endif
1289/*---------------------------------------------------------------------------*/
1290static void
1291parse_vhdr(struct mqtt_connection *conn)
1292{
1293 conn->in_packet.payload_start = conn->in_packet.payload;
1294
1295 /* Some message types include a packet identifier */
1296 switch(conn->in_packet.fhdr & 0xF0) {
1297 case MQTT_FHDR_MSG_TYPE_PUBACK:
1298 case MQTT_FHDR_MSG_TYPE_SUBACK:
1299 case MQTT_FHDR_MSG_TYPE_UNSUBACK:
1300 conn->in_packet.mid = (conn->in_packet.payload[0] << 8) |
1301 (conn->in_packet.payload[1]);
1302 conn->in_packet.payload_start += 2;
1303 break;
1304
1305 /* Other message types have a 0-length VHDR */
1306 /* PUBLISH has a VHDR for QoS > 0, which is currently unsupported */
1307 default:
1308 break;
1309 }
1310
1311#if MQTT_5
1312 /* CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, DISCONNECT and AUTH have a single
1313 * Reason Code as part of the Variable Header.
1314 * SUBACK and UNSUBACK contain a list of one or more Reason Codes in the Payload.
1315 */
1316 switch(conn->in_packet.fhdr & 0xF0) {
1317 case MQTT_FHDR_MSG_TYPE_CONNACK:
1318 case MQTT_FHDR_MSG_TYPE_PUBACK:
1319 case MQTT_FHDR_MSG_TYPE_PUBREC:
1320 case MQTT_FHDR_MSG_TYPE_PUBREL:
1321 case MQTT_FHDR_MSG_TYPE_PUBCOMP:
1322 case MQTT_FHDR_MSG_TYPE_DISCONNECT:
1323 case MQTT_FHDR_MSG_TYPE_AUTH:
1324 conn->in_packet.reason_code = conn->in_packet.payload_start[0];
1325 conn->in_packet.has_reason_code = 1;
1326 conn->in_packet.payload_start += 1;
1327 break;
1328
1329 default:
1330 conn->in_packet.has_reason_code = 0;
1331 break;
1332 }
1333
1334 if(!conn->in_packet.has_props) {
1335 mqtt_prop_decode_input_props(conn);
1336 }
1337#endif
1338}
1339/*---------------------------------------------------------------------------*/
1340static int
1341tcp_input(struct tcp_socket *s,
1342 void *ptr,
1343 const uint8_t *input_data_ptr,
1344 int input_data_len)
1345{
1346 struct mqtt_connection *conn = ptr;
1347 uint32_t pos = 0;
1348 uint32_t copy_bytes = 0;
1349 mqtt_pub_status_t pub_status;
1350 uint8_t remaining_length_bytes;
1351
1352 if(input_data_len == 0) {
1353 return 0;
1354 }
1355
1356 if(conn->in_packet.packet_received) {
1357 reset_packet(&conn->in_packet);
1358 }
1359
1360 DBG("tcp_input with %i bytes of data:\n", input_data_len);
1361
1362 /* Read the fixed header field, if we do not have it */
1363 if(!conn->in_packet.fhdr) {
1364 conn->in_packet.fhdr = input_data_ptr[pos++];
1365 conn->in_packet.byte_counter++;
1366
1367 DBG("MQTT - Read VHDR '%02X'\n", conn->in_packet.fhdr);
1368
1369 if(pos >= input_data_len) {
1370 return 0;
1371 }
1372 }
1373
1374 /* Read the Remaining Length field, if we do not have it */
1375 if(!conn->in_packet.has_remaining_length) {
1376 remaining_length_bytes =
1377 mqtt_decode_var_byte_int(input_data_ptr, input_data_len, &pos,
1378 &conn->in_packet.byte_counter,
1379 &conn->in_packet.remaining_length);
1380
1381 if(remaining_length_bytes == 0) {
1382 call_event(conn, MQTT_EVENT_ERROR, NULL);
1383 return 0;
1384 }
1385
1386 DBG("MQTT - Finished reading remaining length byte\n");
1387 conn->in_packet.has_remaining_length = 1;
1388 }
1389
1390 /*
1391 * Check for unsupported payload length. Will read all incoming data from the
1392 * server in any case and then reset the packet.
1393 *
1394 * TODO: Decide if we, for example, want to disconnect instead.
1395 */
1396 if((conn->in_packet.remaining_length > MQTT_INPUT_BUFF_SIZE) &&
1397 (conn->in_packet.fhdr & 0xF0) != MQTT_FHDR_MSG_TYPE_PUBLISH) {
1398
1399 PRINTF("MQTT - Error, unsupported payload size for non-PUBLISH message\n");
1400
1401 conn->in_packet.byte_counter += input_data_len;
1402 if(conn->in_packet.byte_counter >=
1403 (MQTT_FHDR_SIZE + conn->in_packet.remaining_length)) {
1404 conn->in_packet.packet_received = 1;
1405 }
1406 return 0;
1407 }
1408
1409 /*
1410 * Supported payload, reads out both VHDR and Payload of all packets.
1411 *
1412 * Note: There will always be at least one byte left to read when we enter
1413 * this loop.
1414 */
1415 while(conn->in_packet.byte_counter <
1416 (MQTT_FHDR_SIZE + conn->in_packet.remaining_length)) {
1417
1418 if((conn->in_packet.fhdr & 0xF0) == MQTT_FHDR_MSG_TYPE_PUBLISH &&
1419 conn->in_packet.topic_received == 0) {
1420 parse_publish_vhdr(conn, &pos, input_data_ptr, input_data_len);
1421 }
1422
1423 /* Read in as much as we can into the packet payload */
1424 copy_bytes = MIN(input_data_len - pos,
1425 MQTT_INPUT_BUFF_SIZE - conn->in_packet.payload_pos);
1426 DBG("- Copied %i payload bytes\n", copy_bytes);
1427 memcpy(&conn->in_packet.payload[conn->in_packet.payload_pos],
1428 &input_data_ptr[pos],
1429 copy_bytes);
1430 conn->in_packet.byte_counter += copy_bytes;
1431 conn->in_packet.payload_pos += copy_bytes;
1432 pos += copy_bytes;
1433
1434#if DEBUG_MQTT == 1
1435 uint32_t i;
1436 DBG("MQTT - Copied bytes: \n");
1437 for(i = 0; i < copy_bytes; i++) {
1438 DBG("%02X ", conn->in_packet.payload[i]);
1439 }
1440 DBG("\n");
1441#endif
1442
1443 /* Full buffer, shall only happen to PUBLISH messages. */
1444 if(MQTT_INPUT_BUFF_SIZE - conn->in_packet.payload_pos == 0) {
1445 conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1446 conn->in_publish_msg.payload_chunk_length = MQTT_INPUT_BUFF_SIZE;
1447 conn->in_publish_msg.payload_left -= MQTT_INPUT_BUFF_SIZE;
1448
1449#if MQTT_5
1450 if(!conn->in_packet.has_props) {
1451 mqtt_prop_decode_input_props(conn);
1452 }
1453
1454 if(conn->in_publish_msg.first_chunk) {
1455 conn->in_publish_msg.payload_chunk_length -= conn->in_packet.properties_len +
1456 conn->in_packet.properties_enc_len;
1457
1458 /* Payload chunk should point past the MQTT properties and to the payload itself */
1459 conn->in_publish_msg.payload_chunk += conn->in_packet.properties_len +
1460 conn->in_packet.properties_enc_len;
1461 }
1462#endif
1463
1464 pub_status = handle_publish(conn);
1465
1466 conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1467 conn->in_packet.payload_pos = 0;
1468
1469 if(pub_status != MQTT_PUBLISH_OK) {
1470 return 0;
1471 }
1472 }
1473
1474 if(pos >= input_data_len &&
1475 (conn->in_packet.byte_counter < (MQTT_FHDR_SIZE + conn->in_packet.remaining_length))) {
1476 return 0;
1477 }
1478 }
1479
1480 parse_vhdr(conn);
1481
1482 /* Debug information */
1483 DBG("\n");
1484 /* Take care of input */
1485 DBG("MQTT - Finished reading packet!\n");
1486 /* What to return? */
1487 DBG("MQTT - total data was %i bytes of data. \n",
1488 (MQTT_FHDR_SIZE + conn->in_packet.remaining_length));
1489
1490#if MQTT_5
1491 if(conn->in_packet.has_reason_code &&
1492 conn->in_packet.reason_code >= MQTT_VHDR_RC_UNSPEC_ERR) {
1493 PRINTF("MQTT - Reason Code indicated error %i\n",
1494 conn->in_packet.reason_code);
1495 call_event(conn,
1496 MQTT_EVENT_ERROR,
1497 NULL);
1498 abort_connection(conn);
1499 return 0;
1500 }
1501#endif
1502
1503 /* Handle packet here. */
1504 switch(conn->in_packet.fhdr & 0xF0) {
1505 case MQTT_FHDR_MSG_TYPE_CONNACK:
1506 handle_connack(conn);
1507 break;
1508 case MQTT_FHDR_MSG_TYPE_PUBLISH:
1509 /* This is the only or the last chunk of publish payload */
1510 conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1511 conn->in_publish_msg.payload_chunk_length = conn->in_packet.payload_pos;
1512 conn->in_publish_msg.payload_left = 0;
1513
1514 DBG("MQTT - First chunk? %i\n", conn->in_publish_msg.first_chunk);
1515#if MQTT_5
1516 if(conn->in_publish_msg.first_chunk) {
1517 conn->in_publish_msg.payload_chunk_length -= conn->in_packet.properties_len +
1518 conn->in_packet.properties_enc_len;
1519 /* Payload chunk should point past the MQTT properties and to the payload itself */
1520 conn->in_publish_msg.payload_chunk += conn->in_packet.properties_len +
1521 conn->in_packet.properties_enc_len;
1522 }
1523#endif
1524 (void)handle_publish(conn);
1525 break;
1526 case MQTT_FHDR_MSG_TYPE_PUBACK:
1527 handle_puback(conn);
1528 break;
1529 case MQTT_FHDR_MSG_TYPE_SUBACK:
1530 handle_suback(conn);
1531 break;
1532 case MQTT_FHDR_MSG_TYPE_UNSUBACK:
1533 handle_unsuback(conn);
1534 break;
1535 case MQTT_FHDR_MSG_TYPE_PINGRESP:
1536 handle_pingresp(conn);
1537 break;
1538
1539 /* QoS 2 not implemented yet */
1540 case MQTT_FHDR_MSG_TYPE_PUBREC:
1541 case MQTT_FHDR_MSG_TYPE_PUBREL:
1542 case MQTT_FHDR_MSG_TYPE_PUBCOMP:
1543 call_event(conn, MQTT_EVENT_NOT_IMPLEMENTED_ERROR, NULL);
1544 PRINTF("MQTT - Got unhandled MQTT Message Type '%i'",
1545 (conn->in_packet.fhdr & 0xF0));
1546 break;
1547
1548#if MQTT_PROTOCOL_VERSION >= MQTT_PROTOCOL_VERSION_5
1549 case MQTT_FHDR_MSG_TYPE_DISCONNECT:
1550 handle_disconnect(conn);
1551 break;
1552
1553 case MQTT_FHDR_MSG_TYPE_AUTH:
1554 handle_auth(conn);
1555 break;
1556#endif
1557
1558 default:
1559 /* All server-only message */
1560 PRINTF("MQTT - Got MQTT Message Type '%i'", (conn->in_packet.fhdr & 0xF0));
1561 break;
1562 }
1563
1564 conn->in_packet.packet_received = 1;
1565
1566 return 0;
1567}
1568/*---------------------------------------------------------------------------*/
1569/*
1570 * Handles TCP events from Simple TCP
1571 */
1572static void
1573tcp_event(struct tcp_socket *s, void *ptr, tcp_socket_event_t event)
1574{
1575 struct mqtt_connection *conn = ptr;
1576
1577 /* Take care of event */
1578 switch(event) {
1579
1580 /* Fall through to manage different disconnect event the same way. */
1581 case TCP_SOCKET_CLOSED:
1582 case TCP_SOCKET_TIMEDOUT:
1583 case TCP_SOCKET_ABORTED: {
1584
1585 DBG("MQTT - Disconnected by tcp event %d\n", event);
1586 process_post(&mqtt_process, mqtt_abort_now_event, conn);
1587 conn->state = MQTT_CONN_STATE_NOT_CONNECTED;
1588 ctimer_stop(&conn->keep_alive_timer);
1589 call_event(conn, MQTT_EVENT_DISCONNECTED, &event);
1590 abort_connection(conn);
1591
1592 /* If connecting retry */
1593 if(conn->auto_reconnect == 1) {
1594 connect_tcp(conn);
1595 }
1596 break;
1597 }
1598 case TCP_SOCKET_CONNECTED: {
1599 conn->state = MQTT_CONN_STATE_TCP_CONNECTED;
1600 conn->out_buffer_sent = 1;
1601
1602 process_post(&mqtt_process, mqtt_do_connect_mqtt_event, conn);
1603 break;
1604 }
1605 case TCP_SOCKET_DATA_SENT: {
1606 DBG("MQTT - Got TCP_DATA_SENT\n");
1607
1608 if(conn->socket.output_data_len == 0) {
1609 conn->out_buffer_sent = 1;
1610 conn->out_buffer_ptr = conn->out_buffer;
1611 }
1612
1613 ctimer_restart(&conn->keep_alive_timer);
1614 break;
1615 }
1616
1617 default: {
1618 DBG("MQTT - TCP Event %d is currently not managed by the tcp event callback\n",
1619 event);
1620 }
1621 }
1622}
1623/*---------------------------------------------------------------------------*/
1624PROCESS_THREAD(mqtt_process, ev, data)
1625{
1626 static struct mqtt_connection *conn;
1627
1628 PROCESS_BEGIN();
1629
1630 while(1) {
1632
1633 if(ev == mqtt_abort_now_event) {
1634 DBG("MQTT - Abort\n");
1635 conn = data;
1636 conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE;
1637
1638 abort_connection(conn);
1639 }
1640 if(ev == mqtt_do_connect_tcp_event) {
1641 conn = data;
1642 DBG("MQTT - Got mqtt_do_connect_tcp_event!\n");
1643 connect_tcp(conn);
1644 }
1645 if(ev == mqtt_do_connect_mqtt_event) {
1646 conn = data;
1647 conn->socket.output_data_max_seg = conn->max_segment_size;
1648 DBG("MQTT - Got mqtt_do_connect_mqtt_event!\n");
1649
1650 if(conn->out_buffer_sent == 1) {
1651 PT_INIT(&conn->out_proto_thread);
1652 while(connect_pt(&conn->out_proto_thread, conn) < PT_EXITED &&
1653 conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE) {
1654 PT_MQTT_WAIT_SEND();
1655 }
1656 }
1657 }
1658 if(ev == mqtt_do_disconnect_mqtt_event) {
1659 conn = data;
1660 DBG("MQTT - Got mqtt_do_disconnect_mqtt_event!\n");
1661
1662 /* Send MQTT Disconnect if we are connected */
1663 if(conn->state == MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT) {
1664 if(conn->out_buffer_sent == 1) {
1665 PT_INIT(&conn->out_proto_thread);
1666 while(conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE &&
1667 disconnect_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1668 PT_MQTT_WAIT_SEND();
1669 }
1670 abort_connection(conn);
1671 call_event(conn, MQTT_EVENT_DISCONNECTED, &ev);
1672 } else {
1673 process_post(&mqtt_process, mqtt_do_disconnect_mqtt_event, conn);
1674 }
1675 }
1676 }
1677 if(ev == mqtt_do_pingreq_event) {
1678 conn = data;
1679 DBG("MQTT - Got mqtt_do_pingreq_event!\n");
1680
1681 if(conn->out_buffer_sent == 1 &&
1682 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1683 PT_INIT(&conn->out_proto_thread);
1684 while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1685 pingreq_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1686 PT_MQTT_WAIT_SEND();
1687 }
1688 }
1689 }
1690 if(ev == mqtt_do_subscribe_event) {
1691 conn = data;
1692 DBG("MQTT - Got mqtt_do_subscribe_mqtt_event!\n");
1693
1694 if(conn->out_buffer_sent == 1 &&
1695 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1696 PT_INIT(&conn->out_proto_thread);
1697 while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1698 subscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1699 PT_MQTT_WAIT_SEND();
1700 }
1701 }
1702 }
1703 if(ev == mqtt_do_unsubscribe_event) {
1704 conn = data;
1705 DBG("MQTT - Got mqtt_do_unsubscribe_mqtt_event!\n");
1706
1707 if(conn->out_buffer_sent == 1 &&
1708 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1709 PT_INIT(&conn->out_proto_thread);
1710 while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1711 unsubscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1712 PT_MQTT_WAIT_SEND();
1713 }
1714 }
1715 }
1716 if(ev == mqtt_do_publish_event) {
1717 conn = data;
1718 DBG("MQTT - Got mqtt_do_publish_mqtt_event!\n");
1719
1720 if(conn->out_buffer_sent == 1 &&
1721 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1722 PT_INIT(&conn->out_proto_thread);
1723 while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1724 publish_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1725 PT_MQTT_WAIT_SEND();
1726 }
1727 }
1728 }
1729#if MQTT_5
1730 if(ev == mqtt_do_auth_event) {
1731 conn = data;
1732 DBG("MQTT - Got mqtt_do_auth_event!\n");
1733
1734 if(conn->out_buffer_sent == 1) {
1735 PT_INIT(&conn->out_proto_thread);
1736 while(auth_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1737 PT_MQTT_WAIT_SEND();
1738 }
1739 }
1740 }
1741 /* clear output properties; the next message sent should overwrite them */
1742 conn->out_props = NULL;
1743#endif
1744 }
1745 PROCESS_END();
1746}
1747/*---------------------------------------------------------------------------*/
1748void
1749mqtt_init(void)
1750{
1751 static uint8_t inited = 0;
1752 if(!inited) {
1753 mqtt_do_connect_tcp_event = process_alloc_event();
1754 mqtt_event_min = mqtt_do_connect_tcp_event;
1755
1756 mqtt_do_connect_mqtt_event = process_alloc_event();
1757 mqtt_do_disconnect_mqtt_event = process_alloc_event();
1758 mqtt_do_subscribe_event = process_alloc_event();
1759 mqtt_do_unsubscribe_event = process_alloc_event();
1760 mqtt_do_publish_event = process_alloc_event();
1761 mqtt_do_pingreq_event = process_alloc_event();
1762 mqtt_update_event = process_alloc_event();
1763 mqtt_abort_now_event = process_alloc_event();
1764 mqtt_event_max = mqtt_abort_now_event;
1765
1766 mqtt_continue_send_event = process_alloc_event();
1767 mqtt_do_auth_event = process_alloc_event();
1768
1769 list_init(mqtt_conn_list);
1770
1771 process_start(&mqtt_process, NULL);
1772 inited = 1;
1773 }
1774}
1775/*---------------------------------------------------------------------------*/
1776mqtt_status_t
1777mqtt_register(struct mqtt_connection *conn, struct process *app_process,
1778 char *client_id, mqtt_event_callback_t event_callback,
1779 uint16_t max_segment_size)
1780{
1781#if MQTT_31 || !MQTT_SRV_SUPPORTS_EMPTY_CLIENT_ID
1782 if(strlen(client_id) < 1) {
1783 return MQTT_STATUS_INVALID_ARGS_ERROR;
1784 }
1785#endif
1786
1787 /* Set defaults - Set all to zero to begin with */
1788 memset(conn, 0, sizeof(struct mqtt_connection));
1789#if MQTT_5
1790 /* Server capabilities have non-zero defaults */
1791 conn->srv_feature_en = -1;
1792#endif
1793 string_to_mqtt_string(&conn->client_id, client_id);
1794 conn->event_callback = event_callback;
1795 conn->app_process = app_process;
1796 conn->auto_reconnect = 1;
1797 conn->max_segment_size = max_segment_size;
1798
1799 reset_defaults(conn);
1800
1801 mqtt_init();
1802
1803 list_add(mqtt_conn_list, conn);
1804
1805 DBG("MQTT - Registered successfully\n");
1806
1807 return MQTT_STATUS_OK;
1808}
1809/*---------------------------------------------------------------------------*/
1810/*
1811 * Connect to MQTT broker.
1812 *
1813 * N.B. Non-blocking call.
1814 */
1815mqtt_status_t
1816mqtt_connect(struct mqtt_connection *conn, char *host, uint16_t port,
1817 uint16_t keep_alive,
1818#if MQTT_5
1819 uint8_t clean_session,
1820 struct mqtt_prop_list *prop_list)
1821#else
1822 uint8_t clean_session)
1823#endif
1824{
1825 uip_ip6addr_t ip6addr;
1826 uip_ipaddr_t *ipaddr;
1827 ipaddr = &ip6addr;
1828
1829 /* Check if we are already trying to connect */
1830 if(conn->state > MQTT_CONN_STATE_NOT_CONNECTED) {
1831 return MQTT_STATUS_OK;
1832 }
1833
1834 conn->server_host = host;
1835 conn->keep_alive = keep_alive;
1836 conn->server_port = port;
1837 conn->out_buffer_ptr = conn->out_buffer;
1838 conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1839
1840 /* If the Client supplies a zero-byte ClientId, the Client MUST also set CleanSession to 1 */
1841 if(clean_session || (conn->client_id.length == 0)) {
1842 conn->connect_vhdr_flags |= MQTT_VHDR_CLEAN_SESSION_FLAG;
1843 }
1844
1845 /* convert the string IPv6 address to a numeric IPv6 address */
1846 if(uiplib_ip6addrconv(host, &ip6addr) == 0) {
1847 return MQTT_STATUS_ERROR;
1848 }
1849
1850 uip_ipaddr_copy(&(conn->server_ip), ipaddr);
1851
1852 /*
1853 * Initiate the connection if the IP could be resolved. Otherwise the
1854 * connection will be initiated when the DNS lookup is finished, in the main
1855 * event loop.
1856 */
1857#if MQTT_5
1858 conn->out_props = prop_list;
1859#endif
1860
1861 process_post(&mqtt_process, mqtt_do_connect_tcp_event, conn);
1862
1863 return MQTT_STATUS_OK;
1864}
1865/*----------------------------------------------------------------------------*/
1866void
1867#if MQTT_5
1868mqtt_disconnect(struct mqtt_connection *conn,
1869 struct mqtt_prop_list *prop_list)
1870#else
1871mqtt_disconnect(struct mqtt_connection *conn)
1872#endif
1873{
1874 if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1875 return;
1876 }
1877
1878 conn->state = MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT;
1879
1880#if MQTT_5
1881 conn->out_props = prop_list;
1882#endif
1883
1884 process_post(&mqtt_process, mqtt_do_disconnect_mqtt_event, conn);
1885}
1886/*----------------------------------------------------------------------------*/
1887mqtt_status_t
1888mqtt_subscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic,
1889#if MQTT_5
1890 mqtt_qos_level_t qos_level,
1891 mqtt_nl_en_t nl, mqtt_rap_en_t rap,
1892 mqtt_retain_handling_t ret_handling,
1893 struct mqtt_prop_list *prop_list)
1894#else
1895 mqtt_qos_level_t qos_level)
1896#endif
1897{
1898 if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1899 return MQTT_STATUS_NOT_CONNECTED_ERROR;
1900 }
1901
1902 DBG("MQTT - Call to mqtt_subscribe...\n");
1903
1904 /* Currently don't have a queue, so only one item at a time */
1905 if(conn->out_queue_full) {
1906 DBG("MQTT - Not accepted!\n");
1907 return MQTT_STATUS_OUT_QUEUE_FULL;
1908 }
1909 conn->out_queue_full = 1;
1910 DBG("MQTT - Accepted!\n");
1911
1912 conn->out_packet.mid = INCREMENT_MID(conn);
1913 conn->out_packet.topic = topic;
1914 conn->out_packet.topic_length = strlen(topic);
1915 conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1916
1917 if(mid) {
1918 *mid = conn->out_packet.mid;
1919 }
1920
1921#if MQTT_5
1922 conn->out_packet.sub_options = 0x00;
1923 conn->out_packet.sub_options |= qos_level & MQTT_SUB_OPTION_QOS;
1924 conn->out_packet.sub_options |= nl & MQTT_SUB_OPTION_NL;
1925 conn->out_packet.sub_options |= rap & MQTT_SUB_OPTION_RAP;
1926 conn->out_packet.sub_options |= ret_handling & MQTT_SUB_OPTION_RETAIN_HANDLING;
1927#else
1928 conn->out_packet.qos = qos_level;
1929#endif
1930
1931#if MQTT_5
1932 conn->out_props = prop_list;
1933#endif
1934
1935 process_post(&mqtt_process, mqtt_do_subscribe_event, conn);
1936 return MQTT_STATUS_OK;
1937}
1938/*----------------------------------------------------------------------------*/
1939mqtt_status_t
1940mqtt_unsubscribe(struct mqtt_connection *conn, uint16_t *mid,
1941#if MQTT_5
1942 char *topic,
1943 struct mqtt_prop_list *prop_list)
1944#else
1945 char *topic)
1946#endif
1947{
1948 if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1949 return MQTT_STATUS_NOT_CONNECTED_ERROR;
1950 }
1951
1952 DBG("MQTT - Call to mqtt_unsubscribe...\n");
1953 /* Currently don't have a queue, so only one item at a time */
1954 if(conn->out_queue_full) {
1955 DBG("MQTT - Not accepted!\n");
1956 return MQTT_STATUS_OUT_QUEUE_FULL;
1957 }
1958 conn->out_queue_full = 1;
1959 DBG("MQTT - Accepted!\n");
1960
1961 conn->out_packet.mid = INCREMENT_MID(conn);
1962 conn->out_packet.topic = topic;
1963 conn->out_packet.topic_length = strlen(topic);
1964 conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1965
1966 if(mid) {
1967 *mid = conn->out_packet.mid;
1968 }
1969
1970#if MQTT_5
1971 conn->out_props = prop_list;
1972#endif
1973
1974 process_post(&mqtt_process, mqtt_do_unsubscribe_event, conn);
1975 return MQTT_STATUS_OK;
1976}
1977/*----------------------------------------------------------------------------*/
1978mqtt_status_t
1979mqtt_publish(struct mqtt_connection *conn, uint16_t *mid, char *topic,
1980 uint8_t *payload, uint32_t payload_size,
1981 mqtt_qos_level_t qos_level,
1982#if MQTT_5
1983 mqtt_retain_t retain,
1984 uint8_t topic_alias, mqtt_topic_alias_en_t topic_alias_en,
1985 struct mqtt_prop_list *prop_list)
1986#else
1987 mqtt_retain_t retain)
1988#endif
1989{
1990 if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1991 return MQTT_STATUS_NOT_CONNECTED_ERROR;
1992 }
1993
1994 DBG("MQTT - Call to mqtt_publish...\n");
1995
1996 /* Currently don't have a queue, so only one item at a time */
1997 if(conn->out_queue_full) {
1998 DBG("MQTT - Not accepted!\n");
1999 return MQTT_STATUS_OUT_QUEUE_FULL;
2000 }
2001 conn->out_queue_full = 1;
2002 DBG("MQTT - Accepted!\n");
2003
2004 conn->out_packet.mid = INCREMENT_MID(conn);
2005 conn->out_packet.retain = retain;
2006#if MQTT_5
2007 if(topic_alias_en == MQTT_TOPIC_ALIAS_ON) {
2008 conn->out_packet.topic = "";
2009 conn->out_packet.topic_length = 0;
2010 conn->out_packet.topic_alias = topic_alias;
2011 if(topic_alias == 0) {
2012 DBG("MQTT - Error, a topic alias of 0 is not permitted! It won't be sent.\n");
2013 }
2014 } else {
2015 conn->out_packet.topic = topic;
2016 conn->out_packet.topic_length = strlen(topic);
2017 conn->out_packet.topic_alias = 0;
2018 }
2019#else
2020 conn->out_packet.topic = topic;
2021 conn->out_packet.topic_length = strlen(topic);
2022#endif
2023 conn->out_packet.payload = payload;
2024 conn->out_packet.payload_size = payload_size;
2025 conn->out_packet.qos = qos_level;
2026 conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
2027
2028 if(mid) {
2029 *mid = conn->out_packet.mid;
2030 }
2031
2032#if MQTT_5
2033 conn->out_props = prop_list;
2034#endif
2035
2036 process_post(&mqtt_process, mqtt_do_publish_event, conn);
2037 return MQTT_STATUS_OK;
2038}
2039/*----------------------------------------------------------------------------*/
2040void
2041mqtt_set_username_password(struct mqtt_connection *conn, char *username,
2042 char *password)
2043{
2044 /* Set strings, NULL string will simply set length to zero */
2045 string_to_mqtt_string(&conn->credentials.username, username);
2046 string_to_mqtt_string(&conn->credentials.password, password);
2047
2048 /* Set CONNECT VHDR flags */
2049 if(username != NULL) {
2050 conn->connect_vhdr_flags |= MQTT_VHDR_USERNAME_FLAG;
2051 } else {
2052 conn->connect_vhdr_flags &= ~MQTT_VHDR_USERNAME_FLAG;
2053 }
2054 if(password != NULL) {
2055 conn->connect_vhdr_flags |= MQTT_VHDR_PASSWORD_FLAG;
2056 } else {
2057 conn->connect_vhdr_flags &= ~MQTT_VHDR_PASSWORD_FLAG;
2058 }
2059}
2060/*----------------------------------------------------------------------------*/
2061void
2062mqtt_set_last_will(struct mqtt_connection *conn, char *topic, char *message,
2063#if MQTT_5
2064 mqtt_qos_level_t qos, struct mqtt_prop_list *will_props)
2065#else
2066 mqtt_qos_level_t qos)
2067#endif
2068{
2069 /* Set strings, NULL string will simply set length to zero */
2070 string_to_mqtt_string(&conn->will.topic, topic);
2071 string_to_mqtt_string(&conn->will.message, message);
2072
2073 /* Currently not used! */
2074 conn->will.qos = qos;
2075
2076 if(topic != NULL) {
2077 conn->connect_vhdr_flags |= MQTT_VHDR_WILL_FLAG |
2078 MQTT_VHDR_WILL_RETAIN_FLAG;
2079 }
2080#if MQTT_5
2081 conn->will.properties = (list_t)will_props;
2082#endif
2083}
2084/*---------------------------------------------------------------------------*/
2085#if MQTT_5
2086/*----------------------------------------------------------------------------*/
2087/* MQTTv5-specific functions */
2088/*----------------------------------------------------------------------------*/
2089/*
2090 * Send authentication data to broker.
2091 *
2092 * N.B. Non-blocking call.
2093 */
2094mqtt_status_t
2095mqtt_auth(struct mqtt_connection *conn,
2096 mqtt_auth_type_t auth_type,
2097 struct mqtt_prop_list *prop_list)
2098{
2099 DBG("MQTT - Call to mqtt_auth...\n");
2100
2101 conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_AUTH;
2102 conn->out_packet.remaining_length = 1; /* for the auth reason code */
2103 conn->out_packet.auth_reason_code = MQTT_VHDR_RC_CONTINUE_AUTH + auth_type;
2104
2105 conn->out_props = prop_list;
2106
2107 process_post(&mqtt_process, mqtt_do_auth_event, conn);
2108 return MQTT_STATUS_OK;
2109}
2110#endif
2111/*----------------------------------------------------------------------------*/
2112/** @} */
Default definitions of C compiler quirk work-arounds.
Header file for the callback timer.
Event timer header file.
#define CLOCK_SECOND
A second, measured in system clock time.
Definition clock.h:103
void ctimer_stop(struct ctimer *c)
Stop a pending callback timer.
Definition ctimer.c:138
static void ctimer_set(struct ctimer *c, clock_time_t t, void(*f)(void *), void *ptr)
Set a callback timer.
Definition ctimer.h:137
void ctimer_restart(struct ctimer *c)
Restart a callback timer from the current point in time.
Definition ctimer.c:126
static void list_init(list_t list)
Initialize a list.
Definition list.h:152
#define LIST(name)
Declare a linked list.
Definition list.h:90
static void * list_item_next(const void *item)
Get the next item following this item.
Definition list.h:294
void list_add(list_t list, void *item)
Add an item at the end of a list.
Definition list.c:71
void ** list_t
The linked list type.
Definition list.h:136
static void * list_head(const_list_t list)
Get a pointer to the first element of a list.
Definition list.h:169
mqtt_status_t mqtt_auth(struct mqtt_connection *conn, mqtt_auth_type_t auth_type, struct mqtt_prop_list *prop_list)
Send authentication message (MQTTv5-only).
Definition mqtt.c:2095
mqtt_status_t mqtt_connect(struct mqtt_connection *conn, char *host, uint16_t port, uint16_t keep_alive, uint8_t clean_session, struct mqtt_prop_list *prop_list)
Connects to a MQTT broker.
Definition mqtt.c:1816
mqtt_status_t mqtt_register(struct mqtt_connection *conn, struct process *app_process, char *client_id, mqtt_event_callback_t event_callback, uint16_t max_segment_size)
Initializes the MQTT engine.
Definition mqtt.c:1777
mqtt_status_t mqtt_unsubscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic, struct mqtt_prop_list *prop_list)
Unsubscribes from a MQTT topic.
Definition mqtt.c:1940
void(* mqtt_event_callback_t)(struct mqtt_connection *m, mqtt_event_t event, void *data)
MQTT event callback function.
Definition mqtt.h:499
mqtt_status_t mqtt_subscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic, mqtt_qos_level_t qos_level, mqtt_nl_en_t nl, mqtt_rap_en_t rap, mqtt_retain_handling_t ret_handling, struct mqtt_prop_list *prop_list)
Subscribes to a MQTT topic.
Definition mqtt.c:1888
void mqtt_disconnect(struct mqtt_connection *conn, struct mqtt_prop_list *prop_list)
Disconnects from a MQTT broker.
Definition mqtt.c:1868
mqtt_status_t mqtt_publish(struct mqtt_connection *conn, uint16_t *mid, char *topic, uint8_t *payload, uint32_t payload_size, mqtt_qos_level_t qos_level, mqtt_retain_t retain, uint8_t topic_alias, mqtt_topic_alias_en_t topic_alias_en, struct mqtt_prop_list *prop_list)
Publish to a MQTT topic.
Definition mqtt.c:1979
mqtt_event_t
MQTT engine events.
Definition mqtt.h:218
void mqtt_set_username_password(struct mqtt_connection *conn, char *username, char *password)
Set the user name and password for a MQTT client.
Definition mqtt.c:2041
void mqtt_set_last_will(struct mqtt_connection *conn, char *topic, char *message, mqtt_qos_level_t qos, struct mqtt_prop_list *will_props)
Set the last will topic and message for a MQTT client.
Definition mqtt.c:2062
#define PROCESS(name, strname)
Declare a process.
Definition process.h:308
#define PROCESS_WAIT_EVENT()
Wait for an event to be posted to the process.
Definition process.h:142
int process_post(struct process *p, process_event_t ev, process_data_t data)
Post an asynchronous event.
Definition process.c:325
process_event_t process_alloc_event(void)
Allocate a global event number.
Definition process.c:111
#define PROCESS_BEGIN()
Define the beginning of a process.
Definition process.h:121
#define PROCESS_END()
Define the end of a process.
Definition process.h:132
void process_start(struct process *p, process_data_t data)
Start a process.
Definition process.c:121
#define PROCESS_THREAD(name, ev, data)
Define the body of a process.
Definition process.h:274
#define PT_BEGIN(pt)
Declare the start of a protothread inside the C function implementing the protothread.
Definition pt.h:280
#define PT_THREAD(name_args)
Declaration of a protothread.
Definition pt.h:265
#define PT_END(pt)
Declare the end of a protothread.
Definition pt.h:292
#define PT_EXIT(pt)
Exit the protothread.
Definition pt.h:411
#define PT_WAIT_UNTIL(pt, condition)
Block and wait until condition is true.
Definition pt.h:313
#define PT_INIT(pt)
Initialize a protothread.
Definition pt.h:245
void timer_set(struct timer *t, clock_time_t interval)
Set a timer.
Definition timer.c:64
bool timer_expired(struct timer *t)
Check if a timer has expired.
Definition timer.c:123
#define uip_ipaddr_copy(dest, src)
Copy an IP address from one place to another.
Definition uip.h:969
Header file for the LED HAL.
Linked list manipulation routines.
Header file for the Contiki MQTT engine.
Protothreads implementation.
Header file for IPv6-related data structures.
static uip_ipaddr_t ipaddr
Pointer to prefix information option in uip_buf.
Definition uip-nd6.c:116
Header file for the uIP TCP/IP stack.