Contiki-NG
mqtt.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015, Texas Instruments Incorporated - http://www.ti.com/
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  * notice, this list of conditions and the following disclaimer in the
12  * documentation and/or other materials provided with the distribution.
13  * 3. Neither the name of the copyright holder nor the names of its
14  * contributors may be used to endorse or promote products derived
15  * from this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
20  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
21  * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
22  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
26  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
28  * OF THE POSSIBILITY OF SUCH DAMAGE.
29  */
30 /*---------------------------------------------------------------------------*/
31 /**
32  * \addtogroup mqtt-engine
33  * @{
34  */
35 /**
36  * \file
37  * Implementation of the Contiki MQTT engine
38  *
39  * \author
40  * Texas Instruments
41  */
42 /*---------------------------------------------------------------------------*/
43 #include "mqtt.h"
44 #include "contiki.h"
45 #include "contiki-net.h"
46 #include "contiki-lib.h"
47 #include "lib/random.h"
48 #include "sys/ctimer.h"
49 #include "sys/etimer.h"
50 #include "sys/pt.h"
51 #include "net/ipv6/uip.h"
52 #include "net/ipv6/uip-ds6.h"
53 #include "dev/leds.h"
54 
55 #include "tcp-socket.h"
56 
57 #include "lib/assert.h"
58 #include "lib/list.h"
59 #include "sys/cc.h"
60 
61 #include <stdlib.h>
62 #include <stdio.h>
63 #include <string.h>
64 /*---------------------------------------------------------------------------*/
/*
 * File-local debug switch. With DEBUG set to a non-zero value PRINTF()
 * forwards to printf(); otherwise every PRINTF() call expands to nothing.
 */
#define DEBUG 0
#if !DEBUG
#define PRINTF(...)
#else
#define PRINTF(...) printf(__VA_ARGS__)
#endif
71 /*---------------------------------------------------------------------------*/
/*
 * Values that are OR-ed together to build the first byte of an MQTT fixed
 * header: the message type occupies the upper nibble, the DUP, QoS and
 * RETAIN flags occupy the lower nibble.
 */
typedef enum {
  /* Message types (upper nibble) */
  MQTT_FHDR_MSG_TYPE_CONNECT     = 0x10,
  MQTT_FHDR_MSG_TYPE_CONNACK     = 0x20,
  MQTT_FHDR_MSG_TYPE_PUBLISH     = 0x30,
  MQTT_FHDR_MSG_TYPE_PUBACK      = 0x40,
  MQTT_FHDR_MSG_TYPE_PUBREC      = 0x50,
  MQTT_FHDR_MSG_TYPE_PUBREL      = 0x60,
  MQTT_FHDR_MSG_TYPE_PUBCOMP     = 0x70,
  MQTT_FHDR_MSG_TYPE_SUBSCRIBE   = 0x80,
  MQTT_FHDR_MSG_TYPE_SUBACK      = 0x90,
  MQTT_FHDR_MSG_TYPE_UNSUBSCRIBE = 0xA0,
  MQTT_FHDR_MSG_TYPE_UNSUBACK    = 0xB0,
  MQTT_FHDR_MSG_TYPE_PINGREQ     = 0xC0,
  MQTT_FHDR_MSG_TYPE_PINGRESP    = 0xD0,
  MQTT_FHDR_MSG_TYPE_DISCONNECT  = 0xE0,

  /* Duplicate delivery flag */
  MQTT_FHDR_DUP_FLAG             = 0x08,

  /* QoS level, already shifted into bits 2..1 */
  MQTT_FHDR_QOS_LEVEL_0          = 0x00,
  MQTT_FHDR_QOS_LEVEL_1          = 0x02,
  MQTT_FHDR_QOS_LEVEL_2          = 0x04,

  /* Retain flag */
  MQTT_FHDR_RETAIN_FLAG          = 0x01,
} mqtt_fhdr_fields_t;
96 /*---------------------------------------------------------------------------*/
/*
 * Bit masks for the Connect Flags byte of the CONNECT variable header.
 */
typedef enum {
  /* Credentials present in the payload */
  MQTT_VHDR_USERNAME_FLAG      = 0x80,
  MQTT_VHDR_PASSWORD_FLAG      = 0x40,

  /* Last-will attributes */
  MQTT_VHDR_WILL_RETAIN_FLAG   = 0x20,
  MQTT_VHDR_WILL_QOS_LEVEL_0   = 0x00,
  MQTT_VHDR_WILL_QOS_LEVEL_1   = 0x08,
  MQTT_VHDR_WILL_QOS_LEVEL_2   = 0x10,

  /* Will present / clean session */
  MQTT_VHDR_WILL_FLAG          = 0x04,
  MQTT_VHDR_CLEAN_SESSION_FLAG = 0x02,
} mqtt_vhdr_conn_fields_t;
109 /*---------------------------------------------------------------------------*/
/*
 * CONNACK return codes. The numeric values are spelled out here; they are
 * the same consecutive values (starting at zero) the enumeration had
 * implicitly.
 */
typedef enum {
  MQTT_VHDR_CONN_ACCEPTED               = 0,
  MQTT_VHDR_CONN_REJECTED_PROTOCOL      = 1,
  MQTT_VHDR_CONN_REJECTED_IDENTIFIER    = 2,
  MQTT_VHDR_CONN_REJECTED_UNAVAILABLE   = 3,
  MQTT_VHDR_CONN_REJECTED_BAD_USER_PASS = 4,
  MQTT_VHDR_CONN_REJECTED_UNAUTHORIZED  = 5,
} mqtt_vhdr_connack_ret_code_t;
118 
/* Bit mask applied to the first CONNACK payload byte (acknowledge flags). */
typedef enum {
  MQTT_VHDR_CONNACK_SESSION_PRESENT = 0x01
} mqtt_vhdr_connack_flags_t;
122 
123 /*---------------------------------------------------------------------------*/
#if MQTT_311
/*
 * SUBACK return codes (MQTT 3.1.1). The three success codes carry the
 * granted QoS; the failure code is 0x80 as defined by the specification
 * (it was previously mistyped as 0x08, so a broker's failure reply was
 * reported as an unrecognised return code).
 */
typedef enum {
  MQTT_SUBACK_RET_QOS_0 = 0x00,
  MQTT_SUBACK_RET_QOS_1 = 0x01,
  MQTT_SUBACK_RET_QOS_2 = 0x02,
  MQTT_SUBACK_RET_FAIL  = 0x80,
} mqtt_suback_ret_code_t;
#endif
132 /*---------------------------------------------------------------------------*/
/*
 * Size in bytes of the CONNECT variable header.
 *
 * MQTT 3.1 (MQTT_31 set), 12 bytes:
 *   Len MSB(0), Len LSB(6), 'M', 'Q', 'I', 's', 'd', 'p',
 *   Protocol Level (3), Connect Flags, Keep Alive MSB, Keep Alive LSB
 *
 * Otherwise, 10 bytes:
 *   Len MSB(0), Len LSB(4), 'M', 'Q', 'T', 'T',
 *   Protocol Level (4), Connect Flags, Keep Alive MSB, Keep Alive LSB
 */
#if !MQTT_31
#define MQTT_CONNECT_VHDR_SIZE 10
#else
#define MQTT_CONNECT_VHDR_SIZE 12
#endif
162 
/* Sizes, in bytes, of fixed-width fields used when computing packet lengths */
#define MQTT_STRING_LEN_SIZE 2 /* Length prefix of an MQTT string */
#define MQTT_MID_SIZE 2        /* Message identifier */
#define MQTT_QOS_SIZE 1        /* Requested/granted QoS byte */
/*---------------------------------------------------------------------------*/
/* How long the send protothreads wait for the broker's reply */
#define RESPONSE_WAIT_TIMEOUT (CLOCK_SECOND * 10)
/*---------------------------------------------------------------------------*/
/*
 * Advance the connection's message id counter by two. The expansion is fully
 * parenthesised so that it behaves as a single expression wherever it is
 * used.
 */
#define INCREMENT_MID(conn) ((conn)->mid_counter += 2)
/*
 * On-the-wire size of an MQTT string: zero when the string is empty,
 * otherwise the two byte length prefix plus the string bytes.
 */
#define MQTT_STRING_LENGTH(s) (((s)->length) == 0 ? 0 : (MQTT_STRING_LEN_SIZE + (s)->length))
171 /*---------------------------------------------------------------------------*/
/*
 * Protothread send macros.
 *
 * Both macros must be used inside a protothread whose `struct pt *` is named
 * `pt`. They keep calling write_bytes()/write_byte() and, for as long as
 * that call returns non-zero, block the protothread until
 * conn->out_buffer_sent becomes true before trying again.
 *
 * Each macro is wrapped in do { } while(0) so that it expands to exactly one
 * statement, and every use of an argument is parenthesised.
 */
#define PT_MQTT_WRITE_BYTES(conn, data, len) \
  do { \
    (conn)->out_write_pos = 0; \
    while(write_bytes((conn), (data), (len))) { \
      PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \
    } \
  } while(0)

#define PT_MQTT_WRITE_BYTE(conn, data) \
  do { \
    while(write_byte((conn), (data))) { \
      PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \
    } \
  } while(0)
183 /*---------------------------------------------------------------------------*/
/*
 * Posts the continue-send event to the current process and then waits until
 * that event is delivered back.
 *
 * PROCESS_PAUSE() cannot be used here because we would risk losing any
 * events posted while the sending process is under way. Instead, every MQTT
 * event (mqtt_event_min..mqtt_event_max) received while waiting is re-posted
 * to the current process. An abort event additionally moves the connection
 * to MQTT_CONN_STATE_ABORT_IMMEDIATE and re-initialises the outgoing
 * protothread before being re-posted.
 *
 * The expansion relies on `conn`, `ev` and `data` being in scope at the
 * point of use.
 */
#define PT_MQTT_WAIT_SEND() \
  do { \
    if(process_post(PROCESS_CURRENT(), mqtt_continue_send_event, NULL) == \
       PROCESS_ERR_OK) { \
      do { \
        PROCESS_WAIT_EVENT(); \
        if(ev == mqtt_abort_now_event) { \
          conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE; \
          PT_INIT(&conn->out_proto_thread); \
          process_post(PROCESS_CURRENT(), ev, data); \
        } else if(ev >= mqtt_event_min && ev <= mqtt_event_max) { \
          process_post(PROCESS_CURRENT(), ev, data); \
        } \
      } while(ev != mqtt_continue_send_event); \
    } \
  } while(0)
206 /*---------------------------------------------------------------------------*/
207 static process_event_t mqtt_do_connect_tcp_event;
208 static process_event_t mqtt_do_connect_mqtt_event;
209 static process_event_t mqtt_do_disconnect_mqtt_event;
210 static process_event_t mqtt_do_subscribe_event;
211 static process_event_t mqtt_do_unsubscribe_event;
212 static process_event_t mqtt_do_publish_event;
213 static process_event_t mqtt_do_pingreq_event;
214 static process_event_t mqtt_continue_send_event;
215 static process_event_t mqtt_abort_now_event;
216 process_event_t mqtt_update_event;
217 
218 /*
219  * Min and Max event numbers we want to acknowledge while we're in the process
220  * of doing something else. continue_send does not count, therefore must be
221  * allocated last
222  */
223 static process_event_t mqtt_event_min;
224 static process_event_t mqtt_event_max;
225 /*---------------------------------------------------------------------------*/
226 /* Prototypes */
227 static int
228 tcp_input(struct tcp_socket *s, void *ptr, const uint8_t *input_data_ptr,
229  int input_data_len);
230 
231 static void tcp_event(struct tcp_socket *s, void *ptr,
232  tcp_socket_event_t event);
233 
234 static void reset_packet(struct mqtt_in_packet *packet);
235 /*---------------------------------------------------------------------------*/
236 LIST(mqtt_conn_list);
237 /*---------------------------------------------------------------------------*/
238 PROCESS(mqtt_process, "MQTT process");
239 /*---------------------------------------------------------------------------*/
240 static void
241 call_event(struct mqtt_connection *conn,
242  mqtt_event_t event,
243  void *data)
244 {
245  conn->event_callback(conn, event, data);
246  process_post(conn->app_process, mqtt_update_event, NULL);
247 }
248 /*---------------------------------------------------------------------------*/
249 static void
250 reset_defaults(struct mqtt_connection *conn)
251 {
252  conn->mid_counter = 1;
253  PT_INIT(&conn->out_proto_thread);
254  conn->waiting_for_pingresp = 0;
255 
256  reset_packet(&conn->in_packet);
257  conn->out_buffer_sent = 0;
258 }
259 /*---------------------------------------------------------------------------*/
260 static void
261 abort_connection(struct mqtt_connection *conn)
262 {
263  conn->out_buffer_ptr = conn->out_buffer;
264  conn->out_queue_full = 0;
265 
266  /* Reset outgoing packet */
267  memset(&conn->out_packet, 0, sizeof(conn->out_packet));
268 
269  tcp_socket_close(&conn->socket);
270  tcp_socket_unregister(&conn->socket);
271 
272  memset(&conn->socket, 0, sizeof(conn->socket));
273 
274  conn->state = MQTT_CONN_STATE_NOT_CONNECTED;
275 }
276 /*---------------------------------------------------------------------------*/
277 static void
278 connect_tcp(struct mqtt_connection *conn)
279 {
280  conn->state = MQTT_CONN_STATE_TCP_CONNECTING;
281 
282  reset_defaults(conn);
283  tcp_socket_register(&(conn->socket),
284  conn,
285  conn->in_buffer,
286  MQTT_TCP_INPUT_BUFF_SIZE,
287  conn->out_buffer,
288  MQTT_TCP_OUTPUT_BUFF_SIZE,
289  tcp_input,
290  tcp_event);
291  tcp_socket_connect(&(conn->socket), &(conn->server_ip), conn->server_port);
292 }
293 /*---------------------------------------------------------------------------*/
294 static void
295 disconnect_tcp(struct mqtt_connection *conn)
296 {
297  conn->state = MQTT_CONN_STATE_DISCONNECTING;
298  tcp_socket_close(&(conn->socket));
299  tcp_socket_unregister(&conn->socket);
300 
301  memset(&conn->socket, 0, sizeof(conn->socket));
302 }
303 /*---------------------------------------------------------------------------*/
304 static void
305 send_out_buffer(struct mqtt_connection *conn)
306 {
307  if(conn->out_buffer_ptr - conn->out_buffer == 0) {
308  conn->out_buffer_sent = 1;
309  return;
310  }
311  conn->out_buffer_sent = 0;
312 
313  DBG("MQTT - (send_out_buffer) Space used in buffer: %i\n",
314  conn->out_buffer_ptr - conn->out_buffer);
315 
316  tcp_socket_send(&conn->socket, conn->out_buffer,
317  conn->out_buffer_ptr - conn->out_buffer);
318 }
319 /*---------------------------------------------------------------------------*/
320 static void
321 string_to_mqtt_string(struct mqtt_string *mqtt_string, char *string)
322 {
323  if(mqtt_string == NULL) {
324  return;
325  }
326  mqtt_string->string = string;
327 
328  if(string != NULL) {
329  mqtt_string->length = strlen(string);
330  } else {
331  mqtt_string->length = 0;
332  }
333 }
334 /*---------------------------------------------------------------------------*/
335 static int
336 write_byte(struct mqtt_connection *conn, uint8_t data)
337 {
338  DBG("MQTT - (write_byte) buff_size: %i write: '%02X'\n",
339  &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr,
340  data);
341 
342  if(&conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr == 0) {
343  send_out_buffer(conn);
344  return 1;
345  }
346 
347  *conn->out_buffer_ptr = data;
348  conn->out_buffer_ptr++;
349  return 0;
350 }
351 /*---------------------------------------------------------------------------*/
352 static int
353 write_bytes(struct mqtt_connection *conn, uint8_t *data, uint16_t len)
354 {
355  uint16_t write_bytes;
356  write_bytes =
357  MIN(&conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr,
358  len - conn->out_write_pos);
359 
360  memcpy(conn->out_buffer_ptr, &data[conn->out_write_pos], write_bytes);
361  conn->out_write_pos += write_bytes;
362  conn->out_buffer_ptr += write_bytes;
363 
364  DBG("MQTT - (write_bytes) len: %u write_pos: %lu\n", len,
365  conn->out_write_pos);
366 
367  if(len - conn->out_write_pos == 0) {
368  conn->out_write_pos = 0;
369  return 0;
370  } else {
371  send_out_buffer(conn);
372  return len - conn->out_write_pos;
373  }
374 }
375 /*---------------------------------------------------------------------------*/
/*
 * Encode `length` using the MQTT variable-length scheme: 7 bits of value per
 * byte, least significant group first, with bit 7 set on every byte that is
 * followed by another one.
 *
 * remaining_length        receives the encoded bytes (at most 4 are stored)
 * remaining_length_bytes  receives the number of bytes the encoding needs,
 *                         capped at 5
 * length                  the value to encode
 *
 * A count of 5 means the value does not fit in the four bytes MQTT allows.
 * The count is still reported so that callers can detect the overflow with
 * their `> 4` check, but the fifth digit is not stored, so the function
 * never writes beyond remaining_length[3].
 * NOTE(review): presumably the destination array holds exactly 4 bytes -
 * confirm against the struct definition in mqtt.h.
 */
static void
encode_remaining_length(uint8_t *remaining_length,
                        uint8_t *remaining_length_bytes,
                        uint32_t length)
{
  uint8_t digit;

  DBG("MQTT - Encoding length %lu\n", length);

  *remaining_length_bytes = 0;
  do {
    digit = length % 128;
    length = length / 128;
    if(length > 0) {
      /* More digits follow: set the continuation bit */
      digit = digit | 0x80;
    }

    if(*remaining_length_bytes < 4) {
      remaining_length[*remaining_length_bytes] = digit;
    }
    (*remaining_length_bytes)++;
    DBG("MQTT - Encode len digit '%u' length '%lu'\n", digit, length);
  } while(length > 0 && *remaining_length_bytes < 5);
  DBG("MQTT - remaining_length_bytes %u\n", *remaining_length_bytes);
}
399 /*---------------------------------------------------------------------------*/
400 static void
401 keep_alive_callback(void *ptr)
402 {
403  struct mqtt_connection *conn = ptr;
404 
405  DBG("MQTT - (keep_alive_callback) Called!\n");
406 
407  /* The flag is set when the PINGREQ has been sent */
408  if(conn->waiting_for_pingresp) {
409  PRINTF("MQTT - Disconnect due to no PINGRESP from broker.\n");
410  disconnect_tcp(conn);
411  return;
412  }
413 
414  process_post(&mqtt_process, mqtt_do_pingreq_event, conn);
415 }
416 /*---------------------------------------------------------------------------*/
417 static void
418 reset_packet(struct mqtt_in_packet *packet)
419 {
420  memset(packet, 0, sizeof(struct mqtt_in_packet));
421  packet->remaining_multiplier = 1;
422 }
423 /*---------------------------------------------------------------------------*/
424 static
425 PT_THREAD(connect_pt(struct pt *pt, struct mqtt_connection *conn))
426 {
427  PT_BEGIN(pt);
428 
429  DBG("MQTT - Sending CONNECT message...\n");
430 
431  /* Set up FHDR */
432  conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_CONNECT;
433  conn->out_packet.remaining_length = 0;
434  conn->out_packet.remaining_length += MQTT_CONNECT_VHDR_SIZE;
435  conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->client_id);
436  conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->credentials.username);
437  conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->credentials.password);
438  conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->will.topic);
439  conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->will.message);
440  encode_remaining_length(conn->out_packet.remaining_length_enc,
441  &conn->out_packet.remaining_length_enc_bytes,
442  conn->out_packet.remaining_length);
443  if(conn->out_packet.remaining_length_enc_bytes > 4) {
444  call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
445  PRINTF("MQTT - Error, remaining length > 4 bytes\n");
446  PT_EXIT(pt);
447  }
448 
449  /* Write Fixed Header */
450  PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
451  PT_MQTT_WRITE_BYTES(conn,
452  conn->out_packet.remaining_length_enc,
453  conn->out_packet.remaining_length_enc_bytes);
454  PT_MQTT_WRITE_BYTE(conn, 0);
455  PT_MQTT_WRITE_BYTE(conn, strlen(MQTT_PROTOCOL_NAME));
456  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)MQTT_PROTOCOL_NAME, strlen(MQTT_PROTOCOL_NAME));
457  PT_MQTT_WRITE_BYTE(conn, MQTT_PROTOCOL_VERSION);
458  PT_MQTT_WRITE_BYTE(conn, conn->connect_vhdr_flags);
459  PT_MQTT_WRITE_BYTE(conn, (conn->keep_alive >> 8));
460  PT_MQTT_WRITE_BYTE(conn, (conn->keep_alive & 0x00FF));
461  PT_MQTT_WRITE_BYTE(conn, conn->client_id.length >> 8);
462  PT_MQTT_WRITE_BYTE(conn, conn->client_id.length & 0x00FF);
463  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->client_id.string,
464  conn->client_id.length);
465  if(conn->connect_vhdr_flags & MQTT_VHDR_WILL_FLAG) {
466  PT_MQTT_WRITE_BYTE(conn, conn->will.topic.length >> 8);
467  PT_MQTT_WRITE_BYTE(conn, conn->will.topic.length & 0x00FF);
468  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->will.topic.string,
469  conn->will.topic.length);
470  PT_MQTT_WRITE_BYTE(conn, conn->will.message.length >> 8);
471  PT_MQTT_WRITE_BYTE(conn, conn->will.message.length & 0x00FF);
472  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->will.message.string,
473  conn->will.message.length);
474  DBG("MQTT - Setting will topic to '%s' %u bytes and message to '%s' %u bytes\n",
475  conn->will.topic.string,
476  conn->will.topic.length,
477  conn->will.message.string,
478  conn->will.message.length);
479  }
480  if(conn->connect_vhdr_flags & MQTT_VHDR_USERNAME_FLAG) {
481  PT_MQTT_WRITE_BYTE(conn, conn->credentials.username.length >> 8);
482  PT_MQTT_WRITE_BYTE(conn, conn->credentials.username.length & 0x00FF);
483  PT_MQTT_WRITE_BYTES(conn,
484  (uint8_t *)conn->credentials.username.string,
485  conn->credentials.username.length);
486  }
487  if(conn->connect_vhdr_flags & MQTT_VHDR_PASSWORD_FLAG) {
488  PT_MQTT_WRITE_BYTE(conn, conn->credentials.password.length >> 8);
489  PT_MQTT_WRITE_BYTE(conn, conn->credentials.password.length & 0x00FF);
490  PT_MQTT_WRITE_BYTES(conn,
491  (uint8_t *)conn->credentials.password.string,
492  conn->credentials.password.length);
493  }
494 
495  /* Send out buffer */
496  send_out_buffer(conn);
497  conn->state = MQTT_CONN_STATE_CONNECTING_TO_BROKER;
498 
499  timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
500 
501  /* Wait for CONNACK */
502  reset_packet(&conn->in_packet);
503  PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
504  timer_expired(&conn->t));
505  if(timer_expired(&conn->t)) {
506  DBG("Timeout waiting for CONNACK\n");
507  /* We stick to the letter of the spec here: Tear the connection down */
508  mqtt_disconnect(conn);
509  }
510  reset_packet(&conn->in_packet);
511 
512  DBG("MQTT - Done sending CONNECT\n");
513 
514 #if DEBUG_MQTT == 1
515  DBG("MQTT - CONNECT message sent: \n");
516  uint16_t i;
517  for(i = 0; i < (conn->out_buffer_ptr - conn->out_buffer); i++) {
518  DBG("%02X ", conn->out_buffer[i]);
519  }
520  DBG("\n");
521 #endif
522 
523  PT_END(pt);
524 }
525 /*---------------------------------------------------------------------------*/
526 static
527 PT_THREAD(disconnect_pt(struct pt *pt, struct mqtt_connection *conn))
528 {
529  PT_BEGIN(pt);
530 
531  PT_MQTT_WRITE_BYTE(conn, MQTT_FHDR_MSG_TYPE_DISCONNECT);
532  PT_MQTT_WRITE_BYTE(conn, 0);
533 
534  send_out_buffer(conn);
535 
536  /*
537  * Wait a couple of seconds for a TCP ACK. We don't really need the ACK,
538  * we do want the TCP/IP stack to actually send this disconnect before we
539  * tear down the session.
540  */
541  timer_set(&conn->t, (CLOCK_SECOND * 2));
542  PT_WAIT_UNTIL(pt, conn->out_buffer_sent || timer_expired(&conn->t));
543 
544  PT_END(pt);
545 }
546 /*---------------------------------------------------------------------------*/
547 static
548 PT_THREAD(subscribe_pt(struct pt *pt, struct mqtt_connection *conn))
549 {
550  PT_BEGIN(pt);
551 
552  DBG("MQTT - Sending subscribe message! topic %s topic_length %i\n",
553  conn->out_packet.topic,
554  conn->out_packet.topic_length);
555  DBG("MQTT - Buffer space is %i \n",
556  &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
557 
558  /* Set up FHDR */
559  conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_SUBSCRIBE | MQTT_FHDR_QOS_LEVEL_1;
560  conn->out_packet.remaining_length = MQTT_MID_SIZE +
561  MQTT_STRING_LEN_SIZE +
562  conn->out_packet.topic_length +
563  MQTT_QOS_SIZE;
564  encode_remaining_length(conn->out_packet.remaining_length_enc,
565  &conn->out_packet.remaining_length_enc_bytes,
566  conn->out_packet.remaining_length);
567  if(conn->out_packet.remaining_length_enc_bytes > 4) {
568  call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
569  PRINTF("MQTT - Error, remaining length > 4 bytes\n");
570  PT_EXIT(pt);
571  }
572 
573  /* Write Fixed Header */
574  PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
575  PT_MQTT_WRITE_BYTES(conn,
576  conn->out_packet.remaining_length_enc,
577  conn->out_packet.remaining_length_enc_bytes);
578  /* Write Variable Header */
579  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid >> 8));
580  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
581  /* Write Payload */
582  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
583  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
584  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
585  conn->out_packet.topic_length);
586  PT_MQTT_WRITE_BYTE(conn, conn->out_packet.qos);
587 
588  /* Send out buffer */
589  send_out_buffer(conn);
590  timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
591 
592  /* Wait for SUBACK. */
593  reset_packet(&conn->in_packet);
594  PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
595  timer_expired(&conn->t));
596 
597  if(timer_expired(&conn->t)) {
598  DBG("Timeout waiting for SUBACK\n");
599  }
600  reset_packet(&conn->in_packet);
601 
602  /* This is clear after the entire transaction is complete */
603  conn->out_queue_full = 0;
604 
605  DBG("MQTT - Done in send_subscribe!\n");
606 
607  PT_END(pt);
608 }
609 /*---------------------------------------------------------------------------*/
610 static
611 PT_THREAD(unsubscribe_pt(struct pt *pt, struct mqtt_connection *conn))
612 {
613  PT_BEGIN(pt);
614 
615  DBG("MQTT - Sending unsubscribe message on topic %s topic_length %i\n",
616  conn->out_packet.topic,
617  conn->out_packet.topic_length);
618  DBG("MQTT - Buffer space is %i \n",
619  &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
620 
621  /* Set up FHDR */
622  conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_UNSUBSCRIBE |
623  MQTT_FHDR_QOS_LEVEL_1;
624  conn->out_packet.remaining_length = MQTT_MID_SIZE +
625  MQTT_STRING_LEN_SIZE +
626  conn->out_packet.topic_length;
627  encode_remaining_length(conn->out_packet.remaining_length_enc,
628  &conn->out_packet.remaining_length_enc_bytes,
629  conn->out_packet.remaining_length);
630  if(conn->out_packet.remaining_length_enc_bytes > 4) {
631  call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
632  PRINTF("MQTT - Error, remaining length > 4 bytes\n");
633  PT_EXIT(pt);
634  }
635 
636  /* Write Fixed Header */
637  PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
638  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc,
639  conn->out_packet.remaining_length_enc_bytes);
640  /* Write Variable Header */
641  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid >> 8));
642  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
643  /* Write Payload */
644  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
645  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
646  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
647  conn->out_packet.topic_length);
648 
649  /* Send out buffer */
650  send_out_buffer(conn);
651  timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
652 
653  /* Wait for UNSUBACK */
654  reset_packet(&conn->in_packet);
655  PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
656  timer_expired(&conn->t));
657 
658  if(timer_expired(&conn->t)) {
659  DBG("Timeout waiting for UNSUBACK\n");
660  }
661 
662  reset_packet(&conn->in_packet);
663 
664  /* This is clear after the entire transaction is complete */
665  conn->out_queue_full = 0;
666 
667  DBG("MQTT - Done writing subscribe message to out buffer!\n");
668 
669  PT_END(pt);
670 }
671 /*---------------------------------------------------------------------------*/
672 static
673 PT_THREAD(publish_pt(struct pt *pt, struct mqtt_connection *conn))
674 {
675  PT_BEGIN(pt);
676 
677  DBG("MQTT - Sending publish message! topic %s topic_length %i\n",
678  conn->out_packet.topic,
679  conn->out_packet.topic_length);
680  DBG("MQTT - Buffer space is %i \n",
681  &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
682 
683  /* Set up FHDR */
684  conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_PUBLISH |
685  conn->out_packet.qos << 1;
686  if(conn->out_packet.retain == MQTT_RETAIN_ON) {
687  conn->out_packet.fhdr |= MQTT_FHDR_RETAIN_FLAG;
688  }
689  conn->out_packet.remaining_length = MQTT_STRING_LEN_SIZE +
690  conn->out_packet.topic_length +
691  conn->out_packet.payload_size;
692  if(conn->out_packet.qos > MQTT_QOS_LEVEL_0) {
693  conn->out_packet.remaining_length += MQTT_MID_SIZE;
694  }
695  encode_remaining_length(conn->out_packet.remaining_length_enc,
696  &conn->out_packet.remaining_length_enc_bytes,
697  conn->out_packet.remaining_length);
698  if(conn->out_packet.remaining_length_enc_bytes > 4) {
699  call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
700  PRINTF("MQTT - Error, remaining length > 4 bytes\n");
701  PT_EXIT(pt);
702  }
703 
704  /* The DUP flag MUST be set to 0 for all QoS 0 messages */
705  if(conn->out_packet.qos == MQTT_QOS_LEVEL_0) {
706  conn->out_packet.fhdr &= ~MQTT_FHDR_DUP_FLAG;
707  }
708 
709  /* Write Fixed Header */
710  PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
711  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc,
712  conn->out_packet.remaining_length_enc_bytes);
713  /* Write Variable Header */
714  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
715  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
716  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
717  conn->out_packet.topic_length);
718  if(conn->out_packet.qos > MQTT_QOS_LEVEL_0) {
719  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid >> 8));
720  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
721  }
722 
723  /* Write Payload */
724  PT_MQTT_WRITE_BYTES(conn,
725  conn->out_packet.payload,
726  conn->out_packet.payload_size);
727 
728  send_out_buffer(conn);
729  timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
730 
731  /*
732  * If QoS is zero then wait until the message has been sent, since there is
733  * no ACK to wait for.
734  *
735  * Also notify the app will not be notified via PUBACK or PUBCOMP
736  */
737  if(conn->out_packet.qos == 0) {
738  process_post(conn->app_process, mqtt_update_event, NULL);
739  } else if(conn->out_packet.qos == 1) {
740  /* Wait for PUBACK */
741  reset_packet(&conn->in_packet);
742  PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
743  timer_expired(&conn->t));
744  if(timer_expired(&conn->t)) {
745  DBG("Timeout waiting for PUBACK\n");
746  }
747  if(conn->in_packet.mid != conn->out_packet.mid) {
748  DBG("MQTT - Warning, got PUBACK with none matching MID. Currently there "
749  "is no support for several concurrent PUBLISH messages.\n");
750  }
751  } else if(conn->out_packet.qos == 2) {
752  DBG("MQTT - QoS not implemented yet.\n");
753  /* Should wait for PUBREC, send PUBREL and then wait for PUBCOMP */
754  }
755 
756  reset_packet(&conn->in_packet);
757 
758  /* This is clear after the entire transaction is complete */
759  conn->out_queue_full = 0;
760 
761  DBG("MQTT - Publish Enqueued\n");
762 
763  PT_END(pt);
764 }
765 /*---------------------------------------------------------------------------*/
766 static
767 PT_THREAD(pingreq_pt(struct pt *pt, struct mqtt_connection *conn))
768 {
769  PT_BEGIN(pt);
770 
771  DBG("MQTT - Sending PINGREQ\n");
772 
773  /* Write Fixed Header */
774  PT_MQTT_WRITE_BYTE(conn, MQTT_FHDR_MSG_TYPE_PINGREQ);
775  PT_MQTT_WRITE_BYTE(conn, 0);
776 
777  send_out_buffer(conn);
778 
779  /* Start timeout for reply. */
780  conn->waiting_for_pingresp = 1;
781 
782  /* Wait for PINGRESP or timeout */
783  reset_packet(&conn->in_packet);
784  timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
785 
786  PT_WAIT_UNTIL(pt, conn->in_packet.packet_received || timer_expired(&conn->t));
787 
788  reset_packet(&conn->in_packet);
789 
790  conn->waiting_for_pingresp = 0;
791 
792  PT_END(pt);
793 }
794 /*---------------------------------------------------------------------------*/
795 static void
796 handle_connack(struct mqtt_connection *conn)
797 {
798  mqtt_connack_event_t connack_event;
799 
800  DBG("MQTT - Got CONNACK\n");
801 
802  if(conn->in_packet.payload[1] != 0) {
803  PRINTF("MQTT - Connection refused with Return Code %i\n",
804  conn->in_packet.payload[1]);
805  call_event(conn,
806  MQTT_EVENT_CONNECTION_REFUSED_ERROR,
807  &conn->in_packet.payload[1]);
808  abort_connection(conn);
809  return;
810  }
811 
812  conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
813 
814 #if MQTT_PROTOCOL_VERSION >= MQTT_PROTOCOL_VERSION_3_1_1
815  connack_event.session_present = conn->in_packet.payload[0] & MQTT_VHDR_CONNACK_SESSION_PRESENT;
816 #endif
817 
818  ctimer_set(&conn->keep_alive_timer, conn->keep_alive * CLOCK_SECOND,
819  keep_alive_callback, conn);
820 
821  /* Always reset packet before callback since it might be used directly */
822  conn->state = MQTT_CONN_STATE_CONNECTED_TO_BROKER;
823  call_event(conn, MQTT_EVENT_CONNECTED, &connack_event);
824 }
825 /*---------------------------------------------------------------------------*/
/*
 * Process a received PINGRESP. Nothing needs to be done here beyond logging
 * the reception.
 */
static void
handle_pingresp(struct mqtt_connection *conn)
{
  (void)conn;

  DBG("MQTT - Got PINGRESP\n");
}
831 /*---------------------------------------------------------------------------*/
832 static void
833 handle_suback(struct mqtt_connection *conn)
834 {
835  mqtt_suback_event_t suback_event;
836 
837  DBG("MQTT - Got SUBACK\n");
838 
839  /* Only accept SUBACKS with X topic QoS response, assume 1 */
840  if(conn->in_packet.remaining_length > MQTT_MID_SIZE +
841  MQTT_MAX_TOPICS_PER_SUBSCRIBE * MQTT_QOS_SIZE) {
842  DBG("MQTT - Error, SUBACK with > 1 topic, not supported.\n");
843  }
844 
845  conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
846 
847  suback_event.mid = (conn->in_packet.payload[0] << 8) |
848  (conn->in_packet.payload[1]);
849  conn->in_packet.mid = suback_event.mid;
850 
851 #if MQTT_311
852  suback_event.success = 0;
853 
854  switch(conn->in_packet.payload[2]) {
855  case MQTT_SUBACK_RET_FAIL:
856  PRINTF("MQTT - Error, SUBSCRIBE failed with SUBACK return code '%x'", conn->in_packet.payload[2]);
857  break;
858 
859  case MQTT_SUBACK_RET_QOS_0:
860  case MQTT_SUBACK_RET_QOS_1:
861  case MQTT_SUBACK_RET_QOS_2:
862  suback_event.qos_level = conn->in_packet.payload[2] & 0x03;
863  suback_event.success = 1;
864  break;
865 
866  default:
867  PRINTF("MQTT - Error, Unrecognised SUBACK return code '%x'", conn->in_packet.payload[2]);
868  break;
869  }
870 
871  suback_event.return_code = conn->in_packet.payload[2];
872 #else
873  suback_event.qos_level = conn->in_packet.payload[2];
874 #endif
875 
876  if(conn->in_packet.mid != conn->out_packet.mid) {
877  DBG("MQTT - Warning, got SUBACK with none matching MID. Currently there is"
878  "no support for several concurrent SUBSCRIBE messages.\n");
879  }
880 
881  /* Always reset packet before callback since it might be used directly */
882  call_event(conn, MQTT_EVENT_SUBACK, &suback_event);
883 }
884 /*---------------------------------------------------------------------------*/
885 static void
886 handle_unsuback(struct mqtt_connection *conn)
887 {
888  DBG("MQTT - Got UNSUBACK\n");
889 
890  conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
891  conn->in_packet.mid = (conn->in_packet.payload[0] << 8) |
892  (conn->in_packet.payload[1]);
893 
894  if(conn->in_packet.mid != conn->out_packet.mid) {
895  DBG("MQTT - Warning, got UNSUBACK with none matching MID. Currently there is"
896  "no support for several concurrent UNSUBSCRIBE messages.\n");
897  }
898 
899  call_event(conn, MQTT_EVENT_UNSUBACK, &conn->in_packet.mid);
900 }
901 /*---------------------------------------------------------------------------*/
902 static void
903 handle_puback(struct mqtt_connection *conn)
904 {
905  DBG("MQTT - Got PUBACK\n");
906 
907  conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
908  conn->in_packet.mid = (conn->in_packet.payload[0] << 8) |
909  (conn->in_packet.payload[1]);
910 
911  call_event(conn, MQTT_EVENT_PUBACK, &conn->in_packet.mid);
912 }
913 /*---------------------------------------------------------------------------*/
914 static mqtt_pub_status_t
915 handle_publish(struct mqtt_connection *conn)
916 {
917  DBG("MQTT - Got PUBLISH, called once per manageable chunk of message.\n");
918  DBG("MQTT - Handling publish on topic '%s'\n", conn->in_publish_msg.topic);
919 
920 #if MQTT_PROTOCOL_VERSION >= MQTT_PROTOCOL_VERSION_3_1_1
921  if(strlen(conn->in_publish_msg.topic) < conn->in_packet.topic_len) {
922  DBG("NULL detected in received PUBLISH topic\n");
923  mqtt_disconnect(conn);
924  return MQTT_PUBLISH_ERR;
925  }
926 #endif
927 
928  DBG("MQTT - This chunk is %i bytes\n", conn->in_packet.payload_pos);
929 
930  if(((conn->in_packet.fhdr & 0x09) >> 1) > 0) {
931  PRINTF("MQTT - Error, got incoming PUBLISH with QoS > 0, not supported atm!\n");
932  }
933 
934  call_event(conn, MQTT_EVENT_PUBLISH, &conn->in_publish_msg);
935 
936  if(conn->in_publish_msg.first_chunk == 1) {
937  conn->in_publish_msg.first_chunk = 0;
938  }
939 
940  /* If this is the last time handle_publish will be called, reset packet. */
941  if(conn->in_publish_msg.payload_left == 0) {
942 
943  /* Check for QoS and initiate the reply, do not rely on the data in the
944  * in_packet being untouched. */
945 
946  DBG("MQTT - (handle_publish) resetting packet.\n");
947  reset_packet(&conn->in_packet);
948  }
949 
950  return MQTT_PUBLISH_OK;
951 }
952 /*---------------------------------------------------------------------------*/
953 static void
954 parse_publish_vhdr(struct mqtt_connection *conn,
955  uint32_t *pos,
956  const uint8_t *input_data_ptr,
957  int input_data_len)
958 {
959  uint16_t copy_bytes;
960 
961  /* Read out topic length */
962  if(conn->in_packet.topic_len_received == 0) {
963  conn->in_packet.topic_pos = 0;
964  conn->in_packet.topic_len = (input_data_ptr[(*pos)++] << 8);
965  conn->in_packet.byte_counter++;
966  if(*pos >= input_data_len) {
967  return;
968  }
969  conn->in_packet.topic_len |= input_data_ptr[(*pos)++];
970  conn->in_packet.byte_counter++;
971  conn->in_packet.topic_len_received = 1;
972  /* Abort if topic is longer than our topic buffer */
973  if(conn->in_packet.topic_len > MQTT_MAX_TOPIC_LENGTH) {
974  DBG("MQTT - topic too long %u/%u\n", conn->in_packet.topic_len, MQTT_MAX_TOPIC_LENGTH);
975  return;
976  }
977  DBG("MQTT - Read PUBLISH topic len %i\n", conn->in_packet.topic_len);
978  /* WARNING: Check here if TOPIC fits in payload area, otherwise error */
979  }
980 
981  /* Read out topic */
982  if(conn->in_packet.topic_len_received == 1 &&
983  conn->in_packet.topic_received == 0) {
984  copy_bytes = MIN(conn->in_packet.topic_len - conn->in_packet.topic_pos,
985  input_data_len - *pos);
986  DBG("MQTT - topic_pos: %i copy_bytes: %i", conn->in_packet.topic_pos,
987  copy_bytes);
988  memcpy(&conn->in_publish_msg.topic[conn->in_packet.topic_pos],
989  &input_data_ptr[*pos],
990  copy_bytes);
991  (*pos) += copy_bytes;
992  conn->in_packet.byte_counter += copy_bytes;
993  conn->in_packet.topic_pos += copy_bytes;
994 
995  if(conn->in_packet.topic_len - conn->in_packet.topic_pos == 0) {
996  DBG("MQTT - Got topic '%s'", conn->in_publish_msg.topic);
997  conn->in_packet.topic_received = 1;
998  conn->in_publish_msg.topic[conn->in_packet.topic_pos] = '\0';
999  conn->in_publish_msg.payload_length =
1000  conn->in_packet.remaining_length - conn->in_packet.topic_len - 2;
1001  conn->in_publish_msg.payload_left = conn->in_publish_msg.payload_length;
1002  }
1003 
1004  /* Set this once per incomming publish message */
1005  conn->in_publish_msg.first_chunk = 1;
1006  }
1007 }
1008 /*---------------------------------------------------------------------------*/
1009 static int
1010 tcp_input(struct tcp_socket *s,
1011  void *ptr,
1012  const uint8_t *input_data_ptr,
1013  int input_data_len)
1014 {
1015  struct mqtt_connection *conn = ptr;
1016  uint32_t pos = 0;
1017  uint32_t copy_bytes = 0;
1018  uint8_t byte;
1019  mqtt_pub_status_t pub_status;
1020 
1021  if(input_data_len == 0) {
1022  return 0;
1023  }
1024 
1025  if(conn->in_packet.packet_received) {
1026  reset_packet(&conn->in_packet);
1027  }
1028 
1029  DBG("tcp_input with %i bytes of data:\n", input_data_len);
1030 
1031  /* Read the fixed header field, if we do not have it */
1032  if(!conn->in_packet.fhdr) {
1033  conn->in_packet.fhdr = input_data_ptr[pos++];
1034  conn->in_packet.byte_counter++;
1035 
1036  DBG("MQTT - Read VHDR '%02X'\n", conn->in_packet.fhdr);
1037 
1038  if(pos >= input_data_len) {
1039  return 0;
1040  }
1041  }
1042 
1043  /* Read the Remaining Length field, if we do not have it */
1044  if(!conn->in_packet.has_remaining_length) {
1045  do {
1046  if(pos >= input_data_len) {
1047  return 0;
1048  }
1049 
1050  byte = input_data_ptr[pos++];
1051  conn->in_packet.byte_counter++;
1052  conn->in_packet.remaining_length_bytes++;
1053  DBG("MQTT - Read Remaining Length byte\n");
1054 
1055  if(conn->in_packet.byte_counter > 5) {
1056  call_event(conn, MQTT_EVENT_ERROR, NULL);
1057  DBG("Received more then 4 byte 'remaining lenght'.");
1058  return 0;
1059  }
1060 
1061  conn->in_packet.remaining_length +=
1062  (byte & 127) * conn->in_packet.remaining_multiplier;
1063  conn->in_packet.remaining_multiplier *= 128;
1064  } while((byte & 128) != 0);
1065 
1066  DBG("MQTT - Finished reading remaining length byte\n");
1067  conn->in_packet.has_remaining_length = 1;
1068  }
1069 
1070  /*
1071  * Check for unsupported payload length. Will read all incoming data from the
1072  * server in any case and then reset the packet.
1073  *
1074  * TODO: Decide if we, for example, want to disconnect instead.
1075  */
1076  if((conn->in_packet.remaining_length > MQTT_INPUT_BUFF_SIZE) &&
1077  (conn->in_packet.fhdr & 0xF0) != MQTT_FHDR_MSG_TYPE_PUBLISH) {
1078 
1079  PRINTF("MQTT - Error, unsupported payload size for non-PUBLISH message\n");
1080 
1081  conn->in_packet.byte_counter += input_data_len;
1082  if(conn->in_packet.byte_counter >=
1083  (MQTT_FHDR_SIZE + conn->in_packet.remaining_length)) {
1084  conn->in_packet.packet_received = 1;
1085  }
1086  return 0;
1087  }
1088 
1089  /*
1090  * Supported payload, reads out both VHDR and Payload of all packets.
1091  *
1092  * Note: There will always be at least one byte left to read when we enter
1093  * this loop.
1094  */
1095  while(conn->in_packet.byte_counter <
1096  (MQTT_FHDR_SIZE + conn->in_packet.remaining_length)) {
1097 
1098  if((conn->in_packet.fhdr & 0xF0) == MQTT_FHDR_MSG_TYPE_PUBLISH &&
1099  conn->in_packet.topic_received == 0) {
1100  parse_publish_vhdr(conn, &pos, input_data_ptr, input_data_len);
1101  }
1102 
1103  /* Read in as much as we can into the packet payload */
1104  copy_bytes = MIN(input_data_len - pos,
1105  MQTT_INPUT_BUFF_SIZE - conn->in_packet.payload_pos);
1106  DBG("- Copied %lu payload bytes\n", copy_bytes);
1107  memcpy(&conn->in_packet.payload[conn->in_packet.payload_pos],
1108  &input_data_ptr[pos],
1109  copy_bytes);
1110  conn->in_packet.byte_counter += copy_bytes;
1111  conn->in_packet.payload_pos += copy_bytes;
1112  pos += copy_bytes;
1113 
1114 #if DEBUG_MQTT == 1
1115  uint32_t i;
1116  DBG("MQTT - Copied bytes: \n");
1117  for(i = 0; i < copy_bytes; i++) {
1118  DBG("%02X ", conn->in_packet.payload[i]);
1119  }
1120  DBG("\n");
1121 #endif
1122 
1123  /* Full buffer, shall only happen to PUBLISH messages. */
1124  if(MQTT_INPUT_BUFF_SIZE - conn->in_packet.payload_pos == 0) {
1125  conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1126  conn->in_publish_msg.payload_chunk_length = MQTT_INPUT_BUFF_SIZE;
1127  conn->in_publish_msg.payload_left -= MQTT_INPUT_BUFF_SIZE;
1128 
1129  pub_status = handle_publish(conn);
1130 
1131  conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1132  conn->in_packet.payload_pos = 0;
1133 
1134  if(pub_status != MQTT_PUBLISH_OK) {
1135  return 0;
1136  }
1137  }
1138 
1139  if(pos >= input_data_len &&
1140  (conn->in_packet.byte_counter < (MQTT_FHDR_SIZE + conn->in_packet.remaining_length))) {
1141  return 0;
1142  }
1143  }
1144 
1145  /* Debug information */
1146  DBG("\n");
1147  /* Take care of input */
1148  DBG("MQTT - Finished reading packet!\n");
1149  /* What to return? */
1150  DBG("MQTT - total data was %i bytes of data. \n",
1151  (MQTT_FHDR_SIZE + conn->in_packet.remaining_length));
1152 
1153  /* Handle packet here. */
1154  switch(conn->in_packet.fhdr & 0xF0) {
1155  case MQTT_FHDR_MSG_TYPE_CONNACK:
1156  handle_connack(conn);
1157  break;
1158  case MQTT_FHDR_MSG_TYPE_PUBLISH:
1159  /* This is the only or the last chunk of publish payload */
1160  conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1161  conn->in_publish_msg.payload_chunk_length = conn->in_packet.payload_pos;
1162  conn->in_publish_msg.payload_left = 0;
1163  (void) handle_publish(conn);
1164  break;
1165  case MQTT_FHDR_MSG_TYPE_PUBACK:
1166  handle_puback(conn);
1167  break;
1168  case MQTT_FHDR_MSG_TYPE_SUBACK:
1169  handle_suback(conn);
1170  break;
1171  case MQTT_FHDR_MSG_TYPE_UNSUBACK:
1172  handle_unsuback(conn);
1173  break;
1174  case MQTT_FHDR_MSG_TYPE_PINGRESP:
1175  handle_pingresp(conn);
1176  break;
1177 
1178  /* QoS 2 not implemented yet */
1179  case MQTT_FHDR_MSG_TYPE_PUBREC:
1180  case MQTT_FHDR_MSG_TYPE_PUBREL:
1181  case MQTT_FHDR_MSG_TYPE_PUBCOMP:
1182  call_event(conn, MQTT_EVENT_NOT_IMPLEMENTED_ERROR, NULL);
1183  PRINTF("MQTT - Got unhandled MQTT Message Type '%i'",
1184  (conn->in_packet.fhdr & 0xF0));
1185  break;
1186 
1187  default:
1188  /* All server-only message */
1189  PRINTF("MQTT - Got MQTT Message Type '%i'", (conn->in_packet.fhdr & 0xF0));
1190  break;
1191  }
1192 
1193  conn->in_packet.packet_received = 1;
1194 
1195  return 0;
1196 }
1197 /*---------------------------------------------------------------------------*/
1198 /*
1199  * Handles TCP events from Simple TCP
1200  */
1201 static void
1202 tcp_event(struct tcp_socket *s, void *ptr, tcp_socket_event_t event)
1203 {
1204  struct mqtt_connection *conn = ptr;
1205 
1206  /* Take care of event */
1207  switch(event) {
1208 
1209  /* Fall through to manage different disconnect event the same way. */
1210  case TCP_SOCKET_CLOSED:
1211  case TCP_SOCKET_TIMEDOUT:
1212  case TCP_SOCKET_ABORTED: {
1213 
1214  DBG("MQTT - Disconnected by tcp event %d\n", event);
1215  process_post(&mqtt_process, mqtt_abort_now_event, conn);
1216  conn->state = MQTT_CONN_STATE_NOT_CONNECTED;
1217  ctimer_stop(&conn->keep_alive_timer);
1218  call_event(conn, MQTT_EVENT_DISCONNECTED, &event);
1219  abort_connection(conn);
1220 
1221  /* If connecting retry */
1222  if(conn->auto_reconnect == 1) {
1223  connect_tcp(conn);
1224  }
1225  break;
1226  }
1227  case TCP_SOCKET_CONNECTED: {
1228  conn->state = MQTT_CONN_STATE_TCP_CONNECTED;
1229  conn->out_buffer_sent = 1;
1230 
1231  process_post(&mqtt_process, mqtt_do_connect_mqtt_event, conn);
1232  break;
1233  }
1234  case TCP_SOCKET_DATA_SENT: {
1235  DBG("MQTT - Got TCP_DATA_SENT\n");
1236 
1237  if(conn->socket.output_data_len == 0) {
1238  conn->out_buffer_sent = 1;
1239  conn->out_buffer_ptr = conn->out_buffer;
1240  }
1241 
1242  ctimer_restart(&conn->keep_alive_timer);
1243  break;
1244  }
1245 
1246  default: {
1247  DBG("MQTT - TCP Event %d is currently not managed by the tcp event callback\n",
1248  event);
1249  }
1250  }
1251 }
1252 /*---------------------------------------------------------------------------*/
1253 PROCESS_THREAD(mqtt_process, ev, data)
1254 {
1255  static struct mqtt_connection *conn;
1256 
1257  PROCESS_BEGIN();
1258 
1259  while(1) {
1261 
1262  if(ev == mqtt_abort_now_event) {
1263  DBG("MQTT - Abort\n");
1264  conn = data;
1265  conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE;
1266 
1267  abort_connection(conn);
1268  }
1269  if(ev == mqtt_do_connect_tcp_event) {
1270  conn = data;
1271  DBG("MQTT - Got mqtt_do_connect_tcp_event!\n");
1272  connect_tcp(conn);
1273  }
1274  if(ev == mqtt_do_connect_mqtt_event) {
1275  conn = data;
1276  conn->socket.output_data_max_seg = conn->max_segment_size;
1277  DBG("MQTT - Got mqtt_do_connect_mqtt_event!\n");
1278 
1279  if(conn->out_buffer_sent == 1) {
1280  PT_INIT(&conn->out_proto_thread);
1281  while(connect_pt(&conn->out_proto_thread, conn) < PT_EXITED &&
1282  conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE) {
1283  PT_MQTT_WAIT_SEND();
1284  }
1285  }
1286  }
1287  if(ev == mqtt_do_disconnect_mqtt_event) {
1288  conn = data;
1289  DBG("MQTT - Got mqtt_do_disconnect_mqtt_event!\n");
1290 
1291  /* Send MQTT Disconnect if we are connected */
1292  if(conn->state == MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT) {
1293  if(conn->out_buffer_sent == 1) {
1294  PT_INIT(&conn->out_proto_thread);
1295  while(conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE &&
1296  disconnect_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1297  PT_MQTT_WAIT_SEND();
1298  }
1299  abort_connection(conn);
1300  call_event(conn, MQTT_EVENT_DISCONNECTED, &ev);
1301  } else {
1302  process_post(&mqtt_process, mqtt_do_disconnect_mqtt_event, conn);
1303  }
1304  }
1305  }
1306  if(ev == mqtt_do_pingreq_event) {
1307  conn = data;
1308  DBG("MQTT - Got mqtt_do_pingreq_event!\n");
1309 
1310  if(conn->out_buffer_sent == 1 &&
1311  conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1312  PT_INIT(&conn->out_proto_thread);
1313  while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1314  pingreq_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1315  PT_MQTT_WAIT_SEND();
1316  }
1317  }
1318  }
1319  if(ev == mqtt_do_subscribe_event) {
1320  conn = data;
1321  DBG("MQTT - Got mqtt_do_subscribe_mqtt_event!\n");
1322 
1323  if(conn->out_buffer_sent == 1 &&
1324  conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1325  PT_INIT(&conn->out_proto_thread);
1326  while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1327  subscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1328  PT_MQTT_WAIT_SEND();
1329  }
1330  }
1331  }
1332  if(ev == mqtt_do_unsubscribe_event) {
1333  conn = data;
1334  DBG("MQTT - Got mqtt_do_unsubscribe_mqtt_event!\n");
1335 
1336  if(conn->out_buffer_sent == 1 &&
1337  conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1338  PT_INIT(&conn->out_proto_thread);
1339  while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1340  unsubscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1341  PT_MQTT_WAIT_SEND();
1342  }
1343  }
1344  }
1345  if(ev == mqtt_do_publish_event) {
1346  conn = data;
1347  DBG("MQTT - Got mqtt_do_publish_mqtt_event!\n");
1348 
1349  if(conn->out_buffer_sent == 1 &&
1350  conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1351  PT_INIT(&conn->out_proto_thread);
1352  while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1353  publish_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1354  PT_MQTT_WAIT_SEND();
1355  }
1356  }
1357  }
1358  }
1359  PROCESS_END();
1360 }
1361 /*---------------------------------------------------------------------------*/
1362 void
1363 mqtt_init(void)
1364 {
1365  static uint8_t inited = 0;
1366  if(!inited) {
1367  mqtt_do_connect_tcp_event = process_alloc_event();
1368  mqtt_event_min = mqtt_do_connect_tcp_event;
1369 
1370  mqtt_do_connect_mqtt_event = process_alloc_event();
1371  mqtt_do_disconnect_mqtt_event = process_alloc_event();
1372  mqtt_do_subscribe_event = process_alloc_event();
1373  mqtt_do_unsubscribe_event = process_alloc_event();
1374  mqtt_do_publish_event = process_alloc_event();
1375  mqtt_do_pingreq_event = process_alloc_event();
1376  mqtt_update_event = process_alloc_event();
1377  mqtt_abort_now_event = process_alloc_event();
1378  mqtt_event_max = mqtt_abort_now_event;
1379 
1380  mqtt_continue_send_event = process_alloc_event();
1381 
1382  list_init(mqtt_conn_list);
1383  process_start(&mqtt_process, NULL);
1384  inited = 1;
1385  }
1386 }
1387 /*---------------------------------------------------------------------------*/
1388 mqtt_status_t
1389 mqtt_register(struct mqtt_connection *conn, struct process *app_process,
1390  char *client_id, mqtt_event_callback_t event_callback,
1391  uint16_t max_segment_size)
1392 {
1393 #if MQTT_31 || !MQTT_SRV_SUPPORTS_EMPTY_CLIENT_ID
1394  if(strlen(client_id) < 1) {
1395  return MQTT_STATUS_INVALID_ARGS_ERROR;
1396  }
1397 #endif
1398 
1399  /* Set defaults - Set all to zero to begin with */
1400  memset(conn, 0, sizeof(struct mqtt_connection));
1401  string_to_mqtt_string(&conn->client_id, client_id);
1402  conn->event_callback = event_callback;
1403  conn->app_process = app_process;
1404  conn->auto_reconnect = 1;
1405  conn->max_segment_size = max_segment_size;
1406  reset_defaults(conn);
1407 
1408  mqtt_init();
1409  list_add(mqtt_conn_list, conn);
1410 
1411  DBG("MQTT - Registered successfully\n");
1412 
1413  return MQTT_STATUS_OK;
1414 }
1415 /*---------------------------------------------------------------------------*/
1416 /*
1417  * Connect to MQTT broker.
1418  *
1419  * N.B. Non-blocking call.
1420  */
1421 mqtt_status_t
1422 mqtt_connect(struct mqtt_connection *conn, char *host, uint16_t port,
1423  uint16_t keep_alive, uint8_t clean_session)
1424 {
1425  uip_ip6addr_t ip6addr;
1426  uip_ipaddr_t *ipaddr;
1427  ipaddr = &ip6addr;
1428 
1429  /* Check if we are already trying to connect */
1430  if(conn->state > MQTT_CONN_STATE_NOT_CONNECTED) {
1431  return MQTT_STATUS_OK;
1432  }
1433 
1434  conn->server_host = host;
1435  conn->keep_alive = keep_alive;
1436  conn->server_port = port;
1437  conn->out_buffer_ptr = conn->out_buffer;
1438  conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1439 
1440  /* If the Client supplies a zero-byte ClientId, the Client MUST also set CleanSession to 1 */
1441  if(clean_session || (conn->client_id.length == 0)) {
1442  conn->connect_vhdr_flags |= MQTT_VHDR_CLEAN_SESSION_FLAG;
1443  }
1444 
1445  /* convert the string IPv6 address to a numeric IPv6 address */
1446  if(uiplib_ip6addrconv(host, &ip6addr) == 0) {
1447  return MQTT_STATUS_ERROR;
1448  }
1449 
1450  uip_ipaddr_copy(&(conn->server_ip), ipaddr);
1451 
1452  /*
1453  * Initiate the connection if the IP could be resolved. Otherwise the
1454  * connection will be initiated when the DNS lookup is finished, in the main
1455  * event loop.
1456  */
1457  process_post(&mqtt_process, mqtt_do_connect_tcp_event, conn);
1458 
1459  return MQTT_STATUS_OK;
1460 }
1461 /*----------------------------------------------------------------------------*/
1462 void
1463 mqtt_disconnect(struct mqtt_connection *conn)
1464 {
1465  if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1466  return;
1467  }
1468 
1469  conn->state = MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT;
1470 
1471  process_post(&mqtt_process, mqtt_do_disconnect_mqtt_event, conn);
1472 }
1473 /*----------------------------------------------------------------------------*/
1474 mqtt_status_t
1475 mqtt_subscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic,
1476  mqtt_qos_level_t qos_level)
1477 {
1478  if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1479  return MQTT_STATUS_NOT_CONNECTED_ERROR;
1480  }
1481 
1482  DBG("MQTT - Call to mqtt_subscribe...\n");
1483 
1484  /* Currently don't have a queue, so only one item at a time */
1485  if(conn->out_queue_full) {
1486  DBG("MQTT - Not accepted!\n");
1487  return MQTT_STATUS_OUT_QUEUE_FULL;
1488  }
1489  conn->out_queue_full = 1;
1490  DBG("MQTT - Accepted!\n");
1491 
1492  conn->out_packet.mid = INCREMENT_MID(conn);
1493  conn->out_packet.topic = topic;
1494  conn->out_packet.topic_length = strlen(topic);
1495  conn->out_packet.qos = qos_level;
1496  conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1497 
1498  process_post(&mqtt_process, mqtt_do_subscribe_event, conn);
1499  return MQTT_STATUS_OK;
1500 }
1501 /*----------------------------------------------------------------------------*/
1502 mqtt_status_t
1503 mqtt_unsubscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic)
1504 {
1505  if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1506  return MQTT_STATUS_NOT_CONNECTED_ERROR;
1507  }
1508 
1509  DBG("MQTT - Call to mqtt_unsubscribe...\n");
1510  /* Currently don't have a queue, so only one item at a time */
1511  if(conn->out_queue_full) {
1512  DBG("MQTT - Not accepted!\n");
1513  return MQTT_STATUS_OUT_QUEUE_FULL;
1514  }
1515  conn->out_queue_full = 1;
1516  DBG("MQTT - Accepted!\n");
1517 
1518  conn->out_packet.mid = INCREMENT_MID(conn);
1519  conn->out_packet.topic = topic;
1520  conn->out_packet.topic_length = strlen(topic);
1521  conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1522 
1523  process_post(&mqtt_process, mqtt_do_unsubscribe_event, conn);
1524  return MQTT_STATUS_OK;
1525 }
1526 /*----------------------------------------------------------------------------*/
1527 mqtt_status_t
1528 mqtt_publish(struct mqtt_connection *conn, uint16_t *mid, char *topic,
1529  uint8_t *payload, uint32_t payload_size,
1530  mqtt_qos_level_t qos_level, mqtt_retain_t retain)
1531 {
1532  if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1533  return MQTT_STATUS_NOT_CONNECTED_ERROR;
1534  }
1535 
1536  DBG("MQTT - Call to mqtt_publish...\n");
1537 
1538  /* Currently don't have a queue, so only one item at a time */
1539  if(conn->out_queue_full) {
1540  DBG("MQTT - Not accepted!\n");
1541  return MQTT_STATUS_OUT_QUEUE_FULL;
1542  }
1543  conn->out_queue_full = 1;
1544  DBG("MQTT - Accepted!\n");
1545 
1546  conn->out_packet.mid = INCREMENT_MID(conn);
1547  conn->out_packet.retain = retain;
1548  conn->out_packet.topic = topic;
1549  conn->out_packet.topic_length = strlen(topic);
1550  conn->out_packet.payload = payload;
1551  conn->out_packet.payload_size = payload_size;
1552  conn->out_packet.qos = qos_level;
1553  conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1554 
1555  process_post(&mqtt_process, mqtt_do_publish_event, conn);
1556  return MQTT_STATUS_OK;
1557 }
1558 /*----------------------------------------------------------------------------*/
1559 void
1560 mqtt_set_username_password(struct mqtt_connection *conn, char *username,
1561  char *password)
1562 {
1563  /* Set strings, NULL string will simply set length to zero */
1564  string_to_mqtt_string(&conn->credentials.username, username);
1565  string_to_mqtt_string(&conn->credentials.password, password);
1566 
1567  /* Set CONNECT VHDR flags */
1568  if(username != NULL) {
1569  conn->connect_vhdr_flags |= MQTT_VHDR_USERNAME_FLAG;
1570  } else {
1571  conn->connect_vhdr_flags &= ~MQTT_VHDR_USERNAME_FLAG;
1572  }
1573  if(password != NULL) {
1574  conn->connect_vhdr_flags |= MQTT_VHDR_PASSWORD_FLAG;
1575  } else {
1576  conn->connect_vhdr_flags &= ~MQTT_VHDR_PASSWORD_FLAG;
1577  }
1578 }
1579 /*----------------------------------------------------------------------------*/
1580 void
1581 mqtt_set_last_will(struct mqtt_connection *conn, char *topic, char *message,
1582  mqtt_qos_level_t qos)
1583 {
1584  /* Set strings, NULL string will simply set length to zero */
1585  string_to_mqtt_string(&conn->will.topic, topic);
1586  string_to_mqtt_string(&conn->will.message, message);
1587 
1588  /* Currently not used! */
1589  conn->will.qos = qos;
1590 
1591  if(topic != NULL) {
1592  conn->connect_vhdr_flags |= MQTT_VHDR_WILL_FLAG |
1593  MQTT_VHDR_WILL_RETAIN_FLAG;
1594  }
1595 }
1596 /*----------------------------------------------------------------------------*/
1597 /** @} */
mqtt_status_t mqtt_subscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic, mqtt_qos_level_t qos_level)
Subscribes to a MQTT topic.
Definition: mqtt.c:1475
static uip_ipaddr_t ipaddr
Pointer to prefix information option in uip_buf.
Definition: uip-nd6.c:116
mqtt_event_t
MQTT engine events.
Definition: mqtt.h:182
void timer_set(struct timer *t, clock_time_t interval)
Set a timer.
Definition: timer.c:64
#define PROCESS(name, strname)
Declare a process.
Definition: process.h:307
Protothreads implementation.
void ctimer_stop(struct ctimer *c)
Stop a pending callback timer.
Definition: ctimer.c:149
#define PROCESS_WAIT_EVENT()
Wait for an event to be posted to the process.
Definition: process.h:141
#define PROCESS_BEGIN()
Define the beginning of a process.
Definition: process.h:120
mqtt_status_t mqtt_publish(struct mqtt_connection *conn, uint16_t *mid, char *topic, uint8_t *payload, uint32_t payload_size, mqtt_qos_level_t qos_level, mqtt_retain_t retain)
Publish to a MQTT topic.
Definition: mqtt.c:1528
#define PROCESS_END()
Define the end of a process.
Definition: process.h:131
#define PT_BEGIN(pt)
Declare the start of a protothread inside the C function implementing the protothread.
Definition: pt.h:114
#define PT_WAIT_UNTIL(pt, condition)
Block and wait until condition is true.
Definition: pt.h:147
void(* mqtt_event_callback_t)(struct mqtt_connection *m, mqtt_event_t event, void *data)
MQTT event callback function.
Definition: mqtt.h:346
Header file for IPv6-related data structures.
#define PT_INIT(pt)
Initialize a protothread.
Definition: pt.h:79
#define CLOCK_SECOND
A second, measured in system clock time.
Definition: clock.h:82
Header file for the callback timer
#define PT_END(pt)
Declare the end of a protothread.
Definition: pt.h:126
Event timer header file.
mqtt_status_t mqtt_unsubscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic)
Unsubscribes from a MQTT topic.
Definition: mqtt.c:1503
Linked list manipulation routines.
void mqtt_set_username_password(struct mqtt_connection *conn, char *username, char *password)
Set the user name and password for a MQTT client.
Definition: mqtt.c:1560
Header file for the Contiki MQTT engine.
void ctimer_set(struct ctimer *c, clock_time_t t, void(*f)(void *), void *ptr)
Set a callback timer.
Definition: ctimer.c:99
int timer_expired(struct timer *t)
Check if a timer has expired.
Definition: timer.c:123
#define uip_ipaddr_copy(dest, src)
Copy an IP address from one place to another.
Definition: uip.h:1015
#define PT_EXIT(pt)
Exit the protothread.
Definition: pt.h:245
void ctimer_restart(struct ctimer *c)
Restart a callback timer from the current point in time.
Definition: ctimer.c:137
#define PT_THREAD(name_args)
Declaration of a protothread.
Definition: pt.h:99
process_event_t process_alloc_event(void)
Allocate a global event number.
Definition: process.c:93
void list_add(list_t list, void *item)
Add an item at the end of a list.
Definition: list.c:142
void list_init(list_t list)
Initialize a list.
Definition: list.c:65
Header file for the uIP TCP/IP stack.
#define LIST(name)
Declare a linked list.
Definition: list.h:89
mqtt_status_t mqtt_register(struct mqtt_connection *conn, struct process *app_process, char *client_id, mqtt_event_callback_t event_callback, uint16_t max_segment_size)
Initializes the MQTT engine.
Definition: mqtt.c:1389
int process_post(struct process *p, process_event_t ev, process_data_t data)
Post an asynchronous event.
Definition: process.c:322
void mqtt_disconnect(struct mqtt_connection *conn)
Disconnects from a MQTT broker.
Definition: mqtt.c:1463
Default definitions of C compiler quirk work-arounds.
PROCESS_THREAD(cc2538_rf_process, ev, data)
Implementation of the cc2538 RF driver process.
Definition: cc2538-rf.c:1107
void mqtt_set_last_will(struct mqtt_connection *conn, char *topic, char *message, mqtt_qos_level_t qos)
Set the last will topic and message for a MQTT client.
Definition: mqtt.c:1581
Header file for the LED HAL.
mqtt_status_t mqtt_connect(struct mqtt_connection *conn, char *host, uint16_t port, uint16_t keep_alive, uint8_t clean_session)
Connects to a MQTT broker.
Definition: mqtt.c:1422
void process_start(struct process *p, process_data_t data)
Start a process.
Definition: process.c:99