46#include "contiki-net.h"
47#include "contiki-lib.h"
48#include "lib/random.h"
56#include "tcp-socket.h"
58#include "lib/assert.h"
/* Logging helper: expands to a printf() call with the same arguments. */
69#define PRINTF(...) printf(__VA_ARGS__)
/* Fixed-header flag bits. Only these enumerators are visible in this
 * excerpt; the enum's opening and closing lines are not shown.
 * Going by the values: RETAIN is bit 0, the QoS level occupies bits 1-2
 * and DUP is bit 3. */
75 MQTT_FHDR_DUP_FLAG = 0x08,
77 MQTT_FHDR_QOS_LEVEL_0 = 0x00,
78 MQTT_FHDR_QOS_LEVEL_1 = 0x02,
79 MQTT_FHDR_QOS_LEVEL_2 = 0x04,
81 MQTT_FHDR_RETAIN_FLAG = 0x01,
/* Bit values for the CONNECT flags byte (conn->connect_vhdr_flags is tested
 * against these in connect_pt). The opening line of the enum is not visible
 * in this excerpt. */
85 MQTT_VHDR_USERNAME_FLAG = 0x80,
86 MQTT_VHDR_PASSWORD_FLAG = 0x40,
88 MQTT_VHDR_WILL_RETAIN_FLAG = 0x20,
/* The will QoS level occupies bits 3-4. */
89 MQTT_VHDR_WILL_QOS_LEVEL_0 = 0x00,
90 MQTT_VHDR_WILL_QOS_LEVEL_1 = 0x08,
91 MQTT_VHDR_WILL_QOS_LEVEL_2 = 0x10,
93 MQTT_VHDR_WILL_FLAG = 0x04,
94 MQTT_VHDR_CLEAN_SESSION_FLAG = 0x02,
95} mqtt_vhdr_conn_fields_t;
/* CONNACK return codes. No explicit values are given, so the enumerators
 * are numbered consecutively in declaration order.
 * NOTE(review): the enum's opening line is not visible here; presumably
 * MQTT_VHDR_CONN_ACCEPTED is the first enumerator and therefore 0 - confirm. */
98 MQTT_VHDR_CONN_ACCEPTED,
99 MQTT_VHDR_CONN_REJECTED_PROTOCOL,
100 MQTT_VHDR_CONN_REJECTED_IDENTIFIER,
101 MQTT_VHDR_CONN_REJECTED_UNAVAILABLE,
102 MQTT_VHDR_CONN_REJECTED_BAD_USER_PASS,
103 MQTT_VHDR_CONN_REJECTED_UNAUTHORIZED,
104} mqtt_vhdr_connack_ret_code_t;
/* CONNACK flag bits; handle_connack masks payload[0] with
 * MQTT_VHDR_CONNACK_SESSION_PRESENT to fill connack_event.session_present.
 * The enum's opening line is not visible in this excerpt. */
107 MQTT_VHDR_CONNACK_SESSION_PRESENT = 0x1
108} mqtt_vhdr_connack_flags_t;
/* SUBACK return codes, compared against the first payload byte in
 * handle_suback. The enum's opening line is not visible in this excerpt.
 * The MQTT specification defines the SUBACK failure code as 0x80 (high bit
 * set), not 0x08; with the old value a broker-reported failure fell through
 * to the "unrecognised return code" branch. */
112 MQTT_SUBACK_RET_QOS_0 = 0x00,
113 MQTT_SUBACK_RET_QOS_1 = 0x01,
114 MQTT_SUBACK_RET_QOS_2 = 0x02,
115 MQTT_SUBACK_RET_FAIL = 0x80,
116} mqtt_suback_ret_code_t;
/* Reason-code values (handle_auth compares conn->in_packet.reason_code with
 * MQTT_VHDR_RC_CONTINUE_AUTH). Values below 0x80 are non-error outcomes in
 * this list; values with the high bit set are the error codes.
 * The enum's opening and closing lines are not visible in this excerpt.
 * NOTE(review): "SUCCES" and "UNUSUPPORTED" are misspelled identifiers; they
 * are left untouched because other code may reference them. */
120 MQTT_VHDR_RC_SUCCES_OR_NORMAL = 0x00,
121 MQTT_VHDR_RC_QOS_0 = 0x01,
122 MQTT_VHDR_RC_QOS_1 = 0x02,
123 MQTT_VHDR_RC_QOS_2 = 0x03,
124 MQTT_VHDR_RC_DISC_WITH_WILL = 0x04,
125 MQTT_VHDR_RC_NO_MATCH_SUB = 0x10,
126 MQTT_VHDR_RC_NO_SUB_EXISTED = 0x11,
127 MQTT_VHDR_RC_CONTINUE_AUTH = 0x18,
128 MQTT_VHDR_RC_REAUTH = 0x19,
129 MQTT_VHDR_RC_UNSPEC_ERR = 0x80,
130 MQTT_VHDR_RC_MALFORMED_PKT = 0x81,
131 MQTT_VHDR_RC_PROTOCOL_ERR = 0x82,
132 MQTT_VHDR_RC_IMPL_SPEC_ERR = 0x83,
133 MQTT_VHDR_RC_PROT_VER_UNUSUPPORTED = 0x84,
134 MQTT_VHDR_RC_CLIENT_ID_INVALID = 0x85,
135 MQTT_VHDR_RC_BAD_USER_PASS = 0x86,
136 MQTT_VHDR_RC_NOT_AUTH = 0x87,
137 MQTT_VHDR_RC_SRV_UNAVAIL = 0x88,
138 MQTT_VHDR_RC_SRV_BUSY = 0x89,
139 MQTT_VHDR_RC_BANNED = 0x8A,
140 MQTT_VHDR_RC_SRV_SHUTDOWN = 0x8B,
141 MQTT_VHDR_RC_BAD_AUTH_METHOD = 0x8C,
142 MQTT_VHDR_RC_KEEP_ALIVE_TIMEOUT = 0x8D,
143 MQTT_VHDR_RC_SESS_TAKEN_OVER = 0x8E,
144 MQTT_VHDR_RC_TOPIC_FILT_INVAL = 0x8F,
145 MQTT_VHDR_RC_TOPIC_NAME_INVAL = 0x90,
146 MQTT_VHDR_RC_PKT_ID_IN_USE = 0x91,
147 MQTT_VHDR_RC_PKT_ID_NOT_FOUND = 0x92,
148 MQTT_VHDR_RC_RECV_MAX_EXCEEDED = 0x93,
149 MQTT_VHDR_RC_TOPIC_ALIAS_INVAL = 0x94,
150 MQTT_VHDR_RC_PKT_TOO_LARGE = 0x95,
151 MQTT_VHDR_RC_MSG_RATE_TOO_HIGH = 0x96,
152 MQTT_VHDR_RC_QUOTA_EXCEEDED = 0x97,
153 MQTT_VHDR_RC_ADMIN_ACTION = 0x98,
154 MQTT_VHDR_RC_PAYLD_FMT_INVAL = 0x99,
155 MQTT_VHDR_RC_RETAIN_UNSUPPORTED = 0x9A,
156 MQTT_VHDR_RC_QOS_UNSUPPORTED = 0x9B,
157 MQTT_VHDR_RC_USE_ANOTHER_SRV = 0x9C,
158 MQTT_VHDR_RC_SRV_MOVED = 0x9D,
159 MQTT_VHDR_RC_SHARED_SUB_UNSUPPORTED = 0x9E,
160 MQTT_VHDR_RC_CONN_RATE_EXCEEDED = 0x9F,
161 MQTT_VHDR_RC_MAX_CONN_TIME = 0xA0,
162 MQTT_VHDR_RC_SUB_ID_UNSUPPORTED = 0xA1,
163 MQTT_VHDR_RC_WILD_SUB_UNSUPPORTED = 0xA2,
/* Timer interval (10 * CLOCK_SECOND) armed with timer_set() by the
 * protothreads in this file after a request has been sent. */
166#define RESPONSE_WAIT_TIMEOUT (CLOCK_SECOND * 10)
/* Advances the connection's message-id counter by two.
 * NOTE(review): the expansion is not wrapped in parentheses, so it is only
 * safe when used as a stand-alone statement. */
168#define INCREMENT_MID(conn) (conn)->mid_counter += 2
/* Encoded size of an MQTT string field: 0 when the string is empty,
 * otherwise MQTT_STRING_LEN_SIZE plus the string length. */
169#define MQTT_STRING_LENGTH(s) (((s)->length) == 0 ? 0 : (MQTT_STRING_LEN_SIZE + (s)->length))
/* Protothread output helpers. Parts of each macro (closing braces, the
 * do-while head of PT_MQTT_WAIT_SEND) are not visible in this excerpt.
 *
 * PT_MQTT_WRITE_BYTES(conn, data, len): resets conn->out_write_pos, then
 *   calls write_bytes() repeatedly; while it returns non-zero (bytes still
 *   pending) the protothread waits until conn->out_buffer_sent is set.
 * PT_MQTT_WRITE_BYTE(conn, data): same pattern for a single byte via
 *   write_byte().
 * PT_MQTT_WAIT_SEND(): posts mqtt_continue_send_event to the current
 *   process and, if the post succeeded, waits for events until that event
 *   arrives. An mqtt_abort_now_event switches the connection to
 *   MQTT_CONN_STATE_ABORT_IMMEDIATE, re-initialises out_proto_thread and
 *   re-posts the event; any other event inside
 *   [mqtt_event_min, mqtt_event_max] is re-posted as well.
 *
 * All three rely on names from the expansion site: `pt` for the write
 * macros, and `conn`, `ev` and `data` for PT_MQTT_WAIT_SEND. The first
 * statement of PT_MQTT_WRITE_BYTES uses `conn` without parentheses. */
172#define PT_MQTT_WRITE_BYTES(conn, data, len) \
173 conn->out_write_pos = 0; \
174 while(write_bytes(conn, data, len)) { \
175 PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \
178#define PT_MQTT_WRITE_BYTE(conn, data) \
179 while(write_byte(conn, data)) { \
180 PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \
189#define PT_MQTT_WAIT_SEND() \
191 if (PROCESS_ERR_OK == \
192 process_post(PROCESS_CURRENT(), mqtt_continue_send_event, NULL)) { \
194 PROCESS_WAIT_EVENT(); \
195 if(ev == mqtt_abort_now_event) { \
196 conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE; \
197 PT_INIT(&conn->out_proto_thread); \
198 process_post(PROCESS_CURRENT(), ev, data); \
199 } else if(ev >= mqtt_event_min && ev <= mqtt_event_max) { \
200 process_post(PROCESS_CURRENT(), ev, data); \
202 } while (ev != mqtt_continue_send_event); \
206static process_event_t mqtt_do_connect_tcp_event;
/* Process event identifiers used by this file (the declaration above and
 * those below). All are file-local except mqtt_update_event, which has
 * external linkage and is posted to the application process by
 * call_event() and publish_pt(). */
207static process_event_t mqtt_do_connect_mqtt_event;
208static process_event_t mqtt_do_disconnect_mqtt_event;
209static process_event_t mqtt_do_subscribe_event;
210static process_event_t mqtt_do_unsubscribe_event;
211static process_event_t mqtt_do_publish_event;
212static process_event_t mqtt_do_pingreq_event;
213static process_event_t mqtt_continue_send_event;
214static process_event_t mqtt_abort_now_event;
215static process_event_t mqtt_do_auth_event;
216process_event_t mqtt_update_event;
/* Inclusive bounds of the event range that PT_MQTT_WAIT_SEND re-posts. */
223static process_event_t mqtt_event_min;
224static process_event_t mqtt_event_max;
/* Forward declarations for the TCP socket callbacks and reset_packet(),
 * followed by the declaration of the MQTT process. Parts of the tcp_input
 * prototype are not visible in this excerpt. */
228tcp_input(
struct tcp_socket *s,
void *ptr,
const uint8_t *input_data_ptr,
231static void tcp_event(
struct tcp_socket *s,
void *ptr,
232 tcp_socket_event_t event);
234static void reset_packet(
struct mqtt_in_packet *packet);
238PROCESS(mqtt_process,
"MQTT process");
/* Reports an event for `conn`: invokes the connection's event_callback with
 * the event and its data, then posts mqtt_update_event to the connection's
 * application process. (Remaining parameters and braces are not visible in
 * this excerpt.) */
241call_event(
struct mqtt_connection *conn,
245 conn->event_callback(conn, event, data);
246 process_post(conn->app_process, mqtt_update_event, NULL);
/* Puts the per-connection protocol state back to its starting values:
 * message-id counter at 1, output protothread re-initialised, no PINGRESP
 * pending, incoming packet cleared and out_buffer_sent cleared. */
250reset_defaults(
struct mqtt_connection *conn)
252 conn->mid_counter = 1;
253 PT_INIT(&conn->out_proto_thread);
254 conn->waiting_for_pingresp = 0;
256 reset_packet(&conn->in_packet);
257 conn->out_buffer_sent = 0;
/* Tears the connection down immediately: rewinds the output buffer, clears
 * the queue-full flag and the outgoing packet, closes and unregisters the
 * TCP socket, wipes the socket structure and marks the connection
 * MQTT_CONN_STATE_NOT_CONNECTED. */
261abort_connection(
struct mqtt_connection *conn)
263 conn->out_buffer_ptr = conn->out_buffer;
264 conn->out_queue_full = 0;
267 memset(&conn->out_packet, 0,
sizeof(conn->out_packet));
269 tcp_socket_close(&conn->socket);
270 tcp_socket_unregister(&conn->socket);
272 memset(&conn->socket, 0,
sizeof(conn->socket));
274 conn->state = MQTT_CONN_STATE_NOT_CONNECTED;
/* Starts the TCP connection to the broker: sets the state to
 * MQTT_CONN_STATE_TCP_CONNECTING, resets the protocol defaults, registers
 * the socket (buffer sizes MQTT_TCP_INPUT_BUFF_SIZE /
 * MQTT_TCP_OUTPUT_BUFF_SIZE; the other arguments are not visible in this
 * excerpt) and connects to conn->server_ip : conn->server_port. */
278connect_tcp(
struct mqtt_connection *conn)
280 conn->state = MQTT_CONN_STATE_TCP_CONNECTING;
282 reset_defaults(conn);
283 tcp_socket_register(&(conn->socket),
286 MQTT_TCP_INPUT_BUFF_SIZE,
288 MQTT_TCP_OUTPUT_BUFF_SIZE,
291 tcp_socket_connect(&(conn->socket), &(conn->server_ip), conn->server_port);
/* Closes the TCP side of the connection: sets the state to
 * MQTT_CONN_STATE_DISCONNECTING, closes and unregisters the socket and then
 * zeroes the socket structure. */
295disconnect_tcp(
struct mqtt_connection *conn)
297 conn->state = MQTT_CONN_STATE_DISCONNECTING;
298 tcp_socket_close(&(conn->socket));
299 tcp_socket_unregister(&conn->socket);
301 memset(&conn->socket, 0,
sizeof(conn->socket));
/* Hands the bytes accumulated in conn->out_buffer (from the buffer start up
 * to out_buffer_ptr) to tcp_socket_send().
 * An empty buffer sets out_buffer_sent to 1; the non-empty path clears the
 * flag before sending. The control flow between those two assignments is
 * not visible in this excerpt. */
305send_out_buffer(
struct mqtt_connection *conn)
307 if(conn->out_buffer_ptr - conn->out_buffer == 0) {
308 conn->out_buffer_sent = 1;
311 conn->out_buffer_sent = 0;
313 DBG(
"MQTT - (send_out_buffer) Space used in buffer: %i\n",
314 conn->out_buffer_ptr - conn->out_buffer);
316 tcp_socket_send(&conn->socket, conn->out_buffer,
317 conn->out_buffer_ptr - conn->out_buffer);
/* Fills `mqtt_string` from a C string: stores the pointer (no copy is made)
 * and sets length either to strlen(string) or to 0. The condition choosing
 * between the two length assignments, and what happens when `mqtt_string`
 * is NULL, are not visible in this excerpt. */
321string_to_mqtt_string(
struct mqtt_string *mqtt_string,
char *
string)
323 if(mqtt_string == NULL) {
326 mqtt_string->string = string;
329 mqtt_string->length = strlen(
string);
331 mqtt_string->length = 0;
/* Appends one byte to the connection's output buffer. When no space is left
 * (out_buffer_ptr has reached the end of out_buffer) the buffer is flushed
 * with send_out_buffer() instead. PT_MQTT_WRITE_BYTE retries while this
 * function returns non-zero; the return statements themselves are not
 * visible in this excerpt. */
336write_byte(
struct mqtt_connection *conn, uint8_t data)
338 DBG(
"MQTT - (write_byte) buff_size: %i write: '%02X'\n",
339 &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr,
342 if(&conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr == 0) {
343 send_out_buffer(conn);
347 *conn->out_buffer_ptr = data;
348 conn->out_buffer_ptr++;
/* Copies as much of data[out_write_pos .. len) as fits into the remaining
 * output-buffer space and advances out_write_pos and out_buffer_ptr.
 * Once everything has been copied out_write_pos is reset to 0; otherwise
 * the buffer is flushed and the number of bytes still to be written
 * (len - out_write_pos) is returned so the caller can retry.
 * The return on the "all copied" path is not visible in this excerpt. */
353write_bytes(
struct mqtt_connection *conn, uint8_t *data, uint16_t len)
355 uint16_t write_bytes;
357 MIN(&conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr,
358 len - conn->out_write_pos);
360 memcpy(conn->out_buffer_ptr, &data[conn->out_write_pos], write_bytes);
361 conn->out_write_pos += write_bytes;
362 conn->out_buffer_ptr += write_bytes;
364 DBG(
"MQTT - (write_bytes) len: %u write_pos: %i\n", len,
365 conn->out_write_pos);
367 if(len - conn->out_write_pos == 0) {
368 conn->out_write_pos = 0;
371 send_out_buffer(conn);
372 return len - conn->out_write_pos;
/* Decodes an MQTT "Variable Byte Integer" from input_data_ptr, starting at
 * *input_pos (a local position of 0 is used when input_pos is NULL).
 * Each byte contributes its low 7 bits, weighted by `multiplier`, to *dest;
 * decoding continues while the high bit (128) of the byte is set. Running
 * out of input or seeing more than 4 bytes is an error; tcp_input treats a
 * return value of 0 as failure. Several lines (remaining parameters,
 * position/counter updates, returns) are not visible in this excerpt. */
377mqtt_decode_var_byte_int(
const uint8_t *input_data_ptr,
380 uint32_t *pkt_byte_count,
383 uint8_t read_bytes = 0;
/* Must be wider than 8 bits: the per-byte weights of a 4-byte Variable Byte
 * Integer are 1, 128, 128^2 and 128^3, and 128^2 already overflows a
 * uint8_t (it wrapped to 0, dropping the 3rd and 4th length bytes). */
385 uint32_t multiplier = 1;
386 uint32_t input_pos_0 = 0;
388 if(input_pos == NULL) {
389 input_pos = &input_pos_0;
393 if(*input_pos >= input_data_len) {
397 byte_in = input_data_ptr[*input_pos];
403 DBG(
"MQTT - Read Variable Byte Integer byte %i\n", byte_in);
406 DBG(
"Received more than 4 byte 'Variable Byte Integer'.");
410 *dest += (byte_in & 127) * multiplier;
412 }
while((byte_in & 128) != 0);
/* Encodes `val` as an MQTT "Variable Byte Integer" into vbi_out and reports
 * the number of bytes produced through *vbi_bytes. A digit gets its high
 * bit (0x80) set when more digits follow. The loop stops when val reaches 0
 * or five bytes have been produced; callers in this file treat a count
 * above 4 as a protocol error.
 * NOTE(review): the loop bound allows a fifth byte to be stored in vbi_out;
 * confirm the destination buffers have room for it. */
418mqtt_encode_var_byte_int(uint8_t *vbi_out,
424 DBG(
"MQTT - Encoding Variable Byte Integer %u\n", val);
431 digit = digit | 0x80;
434 vbi_out[*vbi_bytes] = digit;
436 DBG(
"MQTT - Encode VBI digit '%u' length '%i'\n", digit, val);
437 }
while(val > 0 && *vbi_bytes < 5);
438 DBG(
"MQTT - var_byte_int bytes %u\n", *vbi_bytes);
/* Keep-alive timer callback; `ptr` is the struct mqtt_connection.
 * If a PINGRESP is still outstanding the TCP connection is dropped;
 * a PINGREQ is requested by posting mqtt_do_pingreq_event to mqtt_process.
 * (The control flow between the two paths is not visible in this excerpt.) */
442keep_alive_callback(
void *ptr)
444 struct mqtt_connection *conn = ptr;
446 DBG(
"MQTT - (keep_alive_callback) Called!\n");
449 if(conn->waiting_for_pingresp) {
450 PRINTF(
"MQTT - Disconnect due to no PINGRESP from broker.\n");
451 disconnect_tcp(conn);
455 process_post(&mqtt_process, mqtt_do_pingreq_event, conn);
/* Clears the whole incoming-packet structure by zero-filling it. */
459reset_packet(
struct mqtt_in_packet *packet)
461 memset(packet, 0,
sizeof(
struct mqtt_in_packet));
/* Protothread that serialises a property list into the output buffer:
 * first the pre-encoded total length (properties_len_enc), then each
 * property starting from list_head(prop_list->props), beginning with its
 * id byte. When there is nothing to write a single 0 byte is emitted.
 * The condition selecting between the two paths, the per-property payload
 * write and the list advance are not visible in this excerpt. */
466PT_THREAD(write_out_props(
struct pt *pt,
struct mqtt_connection *conn,
467 struct mqtt_prop_list *prop_list))
/* static: the value must survive across protothread yields. */
471 static struct mqtt_prop_out_property *prop;
474 DBG(
"MQTT - Writing %i property bytes\n", prop_list->properties_len + prop_list->properties_len_enc_bytes);
476 PT_MQTT_WRITE_BYTES(conn,
477 prop_list->properties_len_enc,
478 prop_list->properties_len_enc_bytes);
480 prop = (
struct mqtt_prop_out_property *)
list_head(prop_list->props);
483 DBG(
"MQTT - Property ID %i len %i\n", prop->id, prop->property_len);
484 PT_MQTT_WRITE_BYTE(conn, prop->id);
485 PT_MQTT_WRITE_BYTES(conn,
490 }
while(prop != NULL);
493 DBG(
"MQTT - No properties to write\n");
494 PT_MQTT_WRITE_BYTE(conn, 0);
/* Protothread that builds and sends the CONNECT packet for `conn` and then
 * waits for the broker's acknowledgement.
 * Visible steps: compute remaining_length from the variable header, client
 * id, credentials, will topic/message and the property lists; encode it
 * (more than 4 length bytes raises MQTT_EVENT_PROTOCOL_ERROR); write the
 * fixed header, protocol name and version, connect flags and keep-alive;
 * write the client id and, depending on connect_vhdr_flags, the will,
 * username and password; flush the buffer; switch to
 * MQTT_CONN_STATE_CONNECTING_TO_BROKER and wait for qos_state to become
 * MQTT_QOS_STATE_GOT_ACK with a RESPONSE_WAIT_TIMEOUT timer armed.
 * Many lines (preprocessor conditionals, braces, the rest of the wait
 * condition) are not visible in this excerpt. */
502PT_THREAD(connect_pt(
struct pt *pt,
struct mqtt_connection *conn))
/* NOTE(review): will_props is a function-level static that is only
 * reassigned when conn->will.properties is set, so a value from an earlier
 * call may persist into a later one - confirm this is intended. */
507 static struct mqtt_prop_list *will_props = MQTT_PROP_LIST_NONE;
508 if(conn->will.properties) {
509 will_props = (
struct mqtt_prop_list *)
list_head(conn->will.properties);
513 DBG(
"MQTT - Sending CONNECT message...\n");
/* Fixed header and remaining-length computation. */
516 conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_CONNECT;
517 conn->out_packet.remaining_length = 0;
518 conn->out_packet.remaining_length += MQTT_CONNECT_VHDR_SIZE;
519 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->client_id);
520#if (MQTT_PROTOCOL_VERSION > MQTT_PROTOCOL_VERSION_3_1) && MQTT_SRV_SUPPORTS_EMPTY_CLIENT_ID
/* MQTT_STRING_LENGTH yields 0 for an empty client id; the two bytes added
 * here account for the length prefix that is still written further down. */
522 if(MQTT_STRING_LENGTH(&conn->client_id) == 0) {
523 conn->out_packet.remaining_length += 2;
526 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->credentials.username);
527 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->credentials.password);
528 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->will.topic);
529 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->will.message);
533 conn->out_packet.remaining_length +=
534 conn->out_props ? (conn->out_props->properties_len + conn->out_props->properties_len_enc_bytes)
538 if(conn->connect_vhdr_flags & MQTT_VHDR_WILL_FLAG) {
539 conn->out_packet.remaining_length +=
540 will_props ? will_props->properties_len + will_props->properties_len_enc_bytes
545 mqtt_encode_var_byte_int(conn->out_packet.remaining_length_enc,
546 &conn->out_packet.remaining_length_enc_bytes,
547 conn->out_packet.remaining_length);
548 if(conn->out_packet.remaining_length_enc_bytes > 4) {
549 call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
550 PRINTF(
"MQTT - Error, remaining length > 4 bytes\n");
/* Fixed header, then the variable header: protocol name as a
 * length-prefixed string, protocol version, connect flags, keep-alive. */
555 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
556 PT_MQTT_WRITE_BYTES(conn,
557 conn->out_packet.remaining_length_enc,
558 conn->out_packet.remaining_length_enc_bytes);
559 PT_MQTT_WRITE_BYTE(conn, 0);
560 PT_MQTT_WRITE_BYTE(conn, strlen(MQTT_PROTOCOL_NAME));
561 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)MQTT_PROTOCOL_NAME, strlen(MQTT_PROTOCOL_NAME));
562 PT_MQTT_WRITE_BYTE(conn, MQTT_PROTOCOL_VERSION);
563 PT_MQTT_WRITE_BYTE(conn, conn->connect_vhdr_flags);
564 PT_MQTT_WRITE_BYTE(conn, (conn->keep_alive >> 8));
565 PT_MQTT_WRITE_BYTE(conn, (conn->keep_alive & 0x00FF));
569 write_out_props(pt, conn, conn->out_props);
/* Payload: client id, then optional will, username and password, each as a
 * big-endian 16-bit length followed by the bytes. */
573 PT_MQTT_WRITE_BYTE(conn, conn->client_id.length >> 8);
574 PT_MQTT_WRITE_BYTE(conn, conn->client_id.length & 0x00FF);
575 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->client_id.string,
576 conn->client_id.length);
578 if(conn->connect_vhdr_flags & MQTT_VHDR_WILL_FLAG) {
581 DBG(
"MQTT - Writing will properties\n");
582 write_out_props(pt, conn, will_props);
584 PT_MQTT_WRITE_BYTE(conn, conn->will.topic.length >> 8);
585 PT_MQTT_WRITE_BYTE(conn, conn->will.topic.length & 0x00FF);
586 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->will.topic.string,
587 conn->will.topic.length);
588 PT_MQTT_WRITE_BYTE(conn, conn->will.message.length >> 8);
589 PT_MQTT_WRITE_BYTE(conn, conn->will.message.length & 0x00FF);
590 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->will.message.string,
591 conn->will.message.length);
592 DBG(
"MQTT - Setting will topic to '%s' %u bytes and message to '%s' %u bytes\n",
593 conn->will.topic.string,
594 conn->will.topic.length,
595 conn->will.message.string,
596 conn->will.message.length);
598 if(conn->connect_vhdr_flags & MQTT_VHDR_USERNAME_FLAG) {
599 PT_MQTT_WRITE_BYTE(conn, conn->credentials.username.length >> 8);
600 PT_MQTT_WRITE_BYTE(conn, conn->credentials.username.length & 0x00FF);
601 PT_MQTT_WRITE_BYTES(conn,
602 (uint8_t *)conn->credentials.username.string,
603 conn->credentials.username.length);
605 if(conn->connect_vhdr_flags & MQTT_VHDR_PASSWORD_FLAG) {
606 PT_MQTT_WRITE_BYTE(conn, conn->credentials.password.length >> 8);
607 PT_MQTT_WRITE_BYTE(conn, conn->credentials.password.length & 0x00FF);
608 PT_MQTT_WRITE_BYTES(conn,
609 (uint8_t *)conn->credentials.password.string,
610 conn->credentials.password.length);
/* Flush, then wait for the acknowledgement or the response timer. */
614 send_out_buffer(conn);
615 conn->state = MQTT_CONN_STATE_CONNECTING_TO_BROKER;
617 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
620 reset_packet(&conn->in_packet);
621 PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
624 DBG(
"Timeout waiting for CONNACK\n");
632 reset_packet(&conn->in_packet);
634 DBG(
"MQTT - Done sending CONNECT\n");
637 DBG(
"MQTT - CONNECT message sent: \n");
639 for(i = 0; i < (conn->out_buffer_ptr - conn->out_buffer); i++) {
640 DBG(
"%02X ", conn->out_buffer[i]);
/* Protothread that sends a DISCONNECT packet: the message-type byte, a 0
 * byte, a write_out_props() call for conn->out_props, then a flush of the
 * output buffer. Any preprocessor conditionals around these writes are not
 * visible in this excerpt. */
649PT_THREAD(disconnect_pt(
struct pt *pt,
struct mqtt_connection *conn))
653 PT_MQTT_WRITE_BYTE(conn, MQTT_FHDR_MSG_TYPE_DISCONNECT);
654 PT_MQTT_WRITE_BYTE(conn, 0);
658 write_out_props(pt, conn, conn->out_props);
661 send_out_buffer(conn);
/* Protothread that sends a SUBSCRIBE for conn->out_packet.topic and waits
 * for the SUBACK.
 * Visible steps: build the fixed header (SUBSCRIBE | QoS 1), compute and
 * encode remaining_length (more than 4 length bytes raises
 * MQTT_EVENT_PROTOCOL_ERROR), write the header, message id, properties and
 * the length-prefixed topic followed by a sub_options or qos byte, flush,
 * then wait for MQTT_QOS_STATE_GOT_ACK with a RESPONSE_WAIT_TIMEOUT timer
 * armed. out_queue_full is cleared at the end.
 * Preprocessor conditionals and parts of expressions are not visible in
 * this excerpt. */
675PT_THREAD(subscribe_pt(
struct pt *pt,
struct mqtt_connection *conn))
679 DBG(
"MQTT - Sending subscribe message! topic %s topic_length %i\n",
680 conn->out_packet.topic,
681 conn->out_packet.topic_length);
682 DBG(
"MQTT - Buffer space is %i \n",
683 &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
686 conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_SUBSCRIBE | MQTT_FHDR_QOS_LEVEL_1;
687 conn->out_packet.remaining_length = MQTT_MID_SIZE +
688 MQTT_STRING_LEN_SIZE +
689 conn->out_packet.topic_length +
693 conn->out_packet.remaining_length +=
694 conn->out_props ? (conn->out_props->properties_len + conn->out_props->properties_len_enc_bytes)
698 mqtt_encode_var_byte_int(conn->out_packet.remaining_length_enc,
699 &conn->out_packet.remaining_length_enc_bytes,
700 conn->out_packet.remaining_length);
701 if(conn->out_packet.remaining_length_enc_bytes > 4) {
702 call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
703 PRINTF(
"MQTT - Error, remaining length > 4 bytes\n");
/* Fixed header, then the message id in big-endian order. */
708 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
709 PT_MQTT_WRITE_BYTES(conn,
710 conn->out_packet.remaining_length_enc,
711 conn->out_packet.remaining_length_enc_bytes);
713 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid >> 8));
714 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
718 write_out_props(pt, conn, conn->out_props);
/* Payload: length-prefixed topic filter and its options/QoS byte. */
722 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
723 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
724 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
725 conn->out_packet.topic_length);
728 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.sub_options);
730 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.qos);
734 send_out_buffer(conn);
735 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
738 reset_packet(&conn->in_packet);
739 PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
743 DBG(
"Timeout waiting for SUBACK\n");
745 reset_packet(&conn->in_packet);
748 conn->out_queue_full = 0;
750 DBG(
"MQTT - Done in send_subscribe!\n");
/* Protothread that sends an UNSUBSCRIBE for conn->out_packet.topic and
 * waits for the UNSUBACK.
 * Visible steps: build the fixed header (UNSUBSCRIBE | QoS 1), compute and
 * encode remaining_length (more than 4 length bytes raises
 * MQTT_EVENT_PROTOCOL_ERROR), write the header, message id, properties and
 * the length-prefixed topic, flush, then wait for MQTT_QOS_STATE_GOT_ACK
 * with a RESPONSE_WAIT_TIMEOUT timer armed. out_queue_full is cleared at
 * the end. Preprocessor conditionals and parts of expressions are not
 * visible in this excerpt.
 * The final debug message used to say "subscribe" (copied from
 * subscribe_pt); it now names the packet actually written. */
756PT_THREAD(unsubscribe_pt(
struct pt *pt,
struct mqtt_connection *conn))
760 DBG(
"MQTT - Sending unsubscribe message on topic %s topic_length %i\n",
761 conn->out_packet.topic,
762 conn->out_packet.topic_length);
763 DBG(
"MQTT - Buffer space is %i \n",
764 &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
767 conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_UNSUBSCRIBE |
768 MQTT_FHDR_QOS_LEVEL_1;
769 conn->out_packet.remaining_length = MQTT_MID_SIZE +
770 MQTT_STRING_LEN_SIZE +
771 conn->out_packet.topic_length;
774 conn->out_packet.remaining_length +=
775 conn->out_props ? (conn->out_props->properties_len + conn->out_props->properties_len_enc_bytes)
779 mqtt_encode_var_byte_int(conn->out_packet.remaining_length_enc,
780 &conn->out_packet.remaining_length_enc_bytes,
781 conn->out_packet.remaining_length);
782 if(conn->out_packet.remaining_length_enc_bytes > 4) {
783 call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
784 PRINTF(
"MQTT - Error, remaining length > 4 bytes\n");
/* Fixed header, then the message id in big-endian order. */
789 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
790 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc,
791 conn->out_packet.remaining_length_enc_bytes);
794 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid >> 8));
795 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
798 write_out_props(pt, conn, conn->out_props);
/* Payload: length-prefixed topic filter. */
802 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
803 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
804 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
805 conn->out_packet.topic_length);
808 send_out_buffer(conn);
809 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
812 reset_packet(&conn->in_packet);
813 PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
817 DBG(
"Timeout waiting for UNSUBACK\n");
820 reset_packet(&conn->in_packet);
823 conn->out_queue_full = 0;
825 DBG(
"MQTT - Done writing unsubscribe message to out buffer!\n");
/* Protothread that sends a PUBLISH for conn->out_packet and, for QoS 1,
 * waits for the PUBACK.
 * Visible steps: build the fixed header from the message type, the QoS
 * (shifted into bits 1-2) and the retain flag; compute and encode
 * remaining_length (more than 4 length bytes raises
 * MQTT_EVENT_PROTOCOL_ERROR); write header, length-prefixed topic, message
 * id (QoS > 0 only), properties and payload; flush and arm the response
 * timer. QoS 0 posts mqtt_update_event to the application right away;
 * QoS 1 waits for MQTT_QOS_STATE_GOT_ACK; QoS 2 only logs that it is not
 * implemented. out_queue_full is cleared at the end.
 * Preprocessor conditionals and parts of expressions are not visible in
 * this excerpt. */
831PT_THREAD(publish_pt(
struct pt *pt,
struct mqtt_connection *conn))
835 DBG(
"MQTT - Sending publish message! topic %s topic_length %i\n",
836 conn->out_packet.topic,
837 conn->out_packet.topic_length);
838 DBG(
"MQTT - Buffer space is %i \n",
839 &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
842 conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_PUBLISH |
843 conn->out_packet.qos << 1;
844 if(conn->out_packet.retain == MQTT_RETAIN_ON) {
845 conn->out_packet.fhdr |= MQTT_FHDR_RETAIN_FLAG;
847 conn->out_packet.remaining_length = MQTT_STRING_LEN_SIZE +
848 conn->out_packet.topic_length +
849 conn->out_packet.payload_size;
/* A message id is only present for QoS levels above 0. */
850 if(conn->out_packet.qos > MQTT_QOS_LEVEL_0) {
851 conn->out_packet.remaining_length += MQTT_MID_SIZE;
855 conn->out_packet.remaining_length +=
856 conn->out_props ? (conn->out_props->properties_len + conn->out_props->properties_len_enc_bytes)
860 mqtt_encode_var_byte_int(conn->out_packet.remaining_length_enc,
861 &conn->out_packet.remaining_length_enc_bytes,
862 conn->out_packet.remaining_length);
863 if(conn->out_packet.remaining_length_enc_bytes > 4) {
864 call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
865 PRINTF(
"MQTT - Error, remaining length > 4 bytes\n");
/* The DUP flag is cleared for QoS 0 messages. */
870 if(conn->out_packet.qos == MQTT_QOS_LEVEL_0) {
871 conn->out_packet.fhdr &= ~MQTT_FHDR_DUP_FLAG;
875 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
876 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc,
877 conn->out_packet.remaining_length_enc_bytes);
879 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
880 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
881 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
882 conn->out_packet.topic_length);
883 if(conn->out_packet.qos > MQTT_QOS_LEVEL_0) {
884 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid >> 8));
885 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
890 write_out_props(pt, conn, conn->out_props);
894 PT_MQTT_WRITE_BYTES(conn,
895 conn->out_packet.payload,
896 conn->out_packet.payload_size);
898 send_out_buffer(conn);
899 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
/* Completion handling depends on the QoS level of the outgoing message. */
907 if(conn->out_packet.qos == 0) {
908 process_post(conn->app_process, mqtt_update_event, NULL);
909 }
else if(conn->out_packet.qos == 1) {
911 reset_packet(&conn->in_packet);
912 PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
915 DBG(
"Timeout waiting for PUBACK\n");
917 if(conn->in_packet.mid != conn->out_packet.mid) {
918 DBG(
"MQTT - Warning, got PUBACK with none matching MID. Currently there "
919 "is no support for several concurrent PUBLISH messages.\n");
921 }
else if(conn->out_packet.qos == 2) {
922 DBG(
"MQTT - QoS not implemented yet.\n");
926 reset_packet(&conn->in_packet);
929 conn->out_queue_full = 0;
931 DBG(
"MQTT - Publish Enqueued\n");
/* Protothread that sends a PINGREQ (message-type byte followed by a 0
 * remaining-length byte), flushes the buffer and sets
 * waiting_for_pingresp. A RESPONSE_WAIT_TIMEOUT timer is armed and the
 * flag is cleared again at the end; the wait condition in between is not
 * visible in this excerpt. */
937PT_THREAD(pingreq_pt(
struct pt *pt,
struct mqtt_connection *conn))
941 DBG(
"MQTT - Sending PINGREQ\n");
944 PT_MQTT_WRITE_BYTE(conn, MQTT_FHDR_MSG_TYPE_PINGREQ);
945 PT_MQTT_WRITE_BYTE(conn, 0);
947 send_out_buffer(conn);
950 conn->waiting_for_pingresp = 1;
953 reset_packet(&conn->in_packet);
954 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
958 reset_packet(&conn->in_packet);
960 conn->waiting_for_pingresp = 0;
/* Protothread that sends an AUTH packet: adds the property bytes to
 * remaining_length, encodes it (more than 4 length bytes raises
 * MQTT_EVENT_PROTOCOL_ERROR), then writes the fixed header, the encoded
 * length, the auth reason code and the properties, and flushes.
 * fhdr and the initial remaining_length are taken from conn->out_packet as
 * prepared before this point; where they are set is not visible in this
 * excerpt. */
967PT_THREAD(auth_pt(
struct pt *pt,
struct mqtt_connection *conn))
971 conn->out_packet.remaining_length +=
972 conn->out_props ? (conn->out_props->properties_len + conn->out_props->properties_len_enc_bytes)
975 mqtt_encode_var_byte_int(conn->out_packet.remaining_length_enc,
976 &conn->out_packet.remaining_length_enc_bytes,
977 conn->out_packet.remaining_length);
979 if(conn->out_packet.remaining_length_enc_bytes > 4) {
980 call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
981 PRINTF(
"MQTT - Error, remaining length > 4 bytes\n");
986 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
987 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc,
988 conn->out_packet.remaining_length_enc_bytes);
991 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.auth_reason_code);
994 write_out_props(pt, conn, conn->out_props);
997 send_out_buffer(conn);
/* Processes a received CONNACK.
 * For protocol versions up to 3.1.1 the remaining length must be exactly 2,
 * otherwise the connection is aborted. A non-zero payload[1] is reported
 * as MQTT_EVENT_CONNECTION_REFUSED_ERROR and the connection is aborted.
 * On acceptance qos_state becomes MQTT_QOS_STATE_GOT_ACK, session_present
 * is taken from payload[0] (3.1.1 and later), the CONNACK properties are
 * parsed (version 5, which requires a remaining length of at least 3), the
 * state becomes MQTT_CONN_STATE_CONNECTED_TO_BROKER and
 * MQTT_EVENT_CONNECTED is raised.
 * Several lines (#else/#endif, returns, the head of the call that takes
 * keep_alive_callback) are not visible in this excerpt. */
1006handle_connack(
struct mqtt_connection *conn)
1008 struct mqtt_connack_event connack_event;
1010 DBG(
"MQTT - Got CONNACK\n");
1012#if MQTT_PROTOCOL_VERSION <= MQTT_PROTOCOL_VERSION_3_1_1
1013 if(conn->in_packet.remaining_length != 2) {
1014 PRINTF(
"MQTT - CONNACK VHDR remaining length %i incorrect\n",
1015 conn->in_packet.remaining_length);
1019 abort_connection(conn);
/* payload[1] carries the return code; non-zero means refused. */
1023 if(conn->in_packet.payload[1] != 0) {
1024 PRINTF(
"MQTT - Connection refused with Return Code %i\n",
1025 conn->in_packet.payload[1]);
1027 MQTT_EVENT_CONNECTION_REFUSED_ERROR,
1028 &conn->in_packet.payload[1]);
1029 abort_connection(conn);
1034 conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
1036#if MQTT_PROTOCOL_VERSION >= MQTT_PROTOCOL_VERSION_3_1_1
1037 connack_event.session_present = conn->in_packet.payload[0] & MQTT_VHDR_CONNACK_SESSION_PRESENT;
1040#if MQTT_PROTOCOL_VERSION >= MQTT_PROTOCOL_VERSION_5
1046 if(conn->in_packet.remaining_length < 3) {
1047 PRINTF(
"MQTT - CONNACK VHDR remaining length %i incorrect\n",
1048 conn->in_packet.remaining_length);
1052 abort_connection(conn);
1055 mqtt_prop_parse_connack_props(conn);
1059 keep_alive_callback, conn);
1062 conn->state = MQTT_CONN_STATE_CONNECTED_TO_BROKER;
1063 call_event(conn, MQTT_EVENT_CONNECTED, &connack_event);
/* Handles a received PINGRESP; the only visible action is a debug log. */
1067handle_pingresp(
struct mqtt_connection *conn)
1069 DBG(
"MQTT - Got PINGRESP\n");
/* Processes a received SUBACK.
 * A packet longer than one message id plus MQTT_MAX_TOPICS_PER_SUBSCRIBE
 * return codes (plus properties in the first variant of the check) is
 * logged as unsupported. Otherwise qos_state becomes
 * MQTT_QOS_STATE_GOT_ACK, a struct mqtt_suback_event is filled from the
 * message id and the first payload byte, a mismatching message id is
 * logged, and MQTT_EVENT_SUBACK is raised with the event structure.
 * Preprocessor conditionals, breaks/returns and the default label are not
 * visible in this excerpt.
 * The mismatching-MID warning previously concatenated to "there isno
 * support"; a separating space was added. */
1073handle_suback(
struct mqtt_connection *conn)
1075 struct mqtt_suback_event suback_event;
1077 DBG(
"MQTT - Got SUBACK\n");
1081 if(conn->in_packet.remaining_length > MQTT_MID_SIZE +
1082 MQTT_MAX_TOPICS_PER_SUBSCRIBE * MQTT_QOS_SIZE +
1083 conn->in_packet.properties_len + conn->in_packet.properties_enc_len) {
1085 if(conn->in_packet.remaining_length > MQTT_MID_SIZE +
1086 MQTT_MAX_TOPICS_PER_SUBSCRIBE * MQTT_QOS_SIZE) {
1088 DBG(
"MQTT - Error, SUBACK with > 1 topic, not supported.\n");
1091 conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
1093 suback_event.mid = conn->in_packet.mid;
/* Assume failure until a granted-QoS return code is seen. */
1096 suback_event.success = 0;
1098 switch(conn->in_packet.payload_start[0]) {
1099 case MQTT_SUBACK_RET_FAIL:
1100 PRINTF(
"MQTT - Error, SUBSCRIBE failed with SUBACK return code '%x'", conn->in_packet.payload_start[0]);
1103 case MQTT_SUBACK_RET_QOS_0:
1104 case MQTT_SUBACK_RET_QOS_1:
1105 case MQTT_SUBACK_RET_QOS_2:
1106 suback_event.qos_level = conn->in_packet.payload_start[0] & 0x03;
1107 suback_event.success = 1;
1111 PRINTF(
"MQTT - Error, Unrecognised SUBACK return code '%x'", conn->in_packet.payload_start[0]);
1115 suback_event.return_code = conn->in_packet.payload_start[0];
1117 suback_event.qos_level = conn->in_packet.payload_start[0];
1120 if(conn->in_packet.mid != conn->out_packet.mid) {
1121 DBG(
"MQTT - Warning, got SUBACK with none matching MID. Currently there is "
1122 "no support for several concurrent SUBSCRIBE messages.\n");
1126 call_event(conn, MQTT_EVENT_SUBACK, &suback_event);
/* Processes a received UNSUBACK: marks the outgoing request as
 * acknowledged, logs a warning when the message id does not match the one
 * that was sent, and raises MQTT_EVENT_UNSUBACK with a pointer to the
 * received message id.
 * The warning previously concatenated to "there isno support"; a
 * separating space was added. */
1130handle_unsuback(
struct mqtt_connection *conn)
1132 DBG(
"MQTT - Got UNSUBACK\n");
1134 conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
1136 if(conn->in_packet.mid != conn->out_packet.mid) {
1137 DBG(
"MQTT - Warning, got UNSUBACK with none matching MID. Currently there is "
1138 "no support for several concurrent UNSUBSCRIBE messages.\n");
1141 call_event(conn, MQTT_EVENT_UNSUBACK, &conn->in_packet.mid);
/* Processes a received PUBACK: marks the outgoing PUBLISH as acknowledged
 * and raises MQTT_EVENT_PUBACK with a pointer to the received message id. */
1145handle_puback(
struct mqtt_connection *conn)
1147 DBG(
"MQTT - Got PUBACK\n");
1149 conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
1151 call_event(conn, MQTT_EVENT_PUBACK, &conn->in_packet.mid);
/* Handles one chunk of an incoming PUBLISH and forwards it to the
 * application through MQTT_EVENT_PUBLISH.
 * For protocol 3.1.1 and later a topic whose C-string length is shorter
 * than the received topic length (i.e. it contains an embedded NUL) is
 * rejected with MQTT_PUBLISH_ERR. After delivering the chunk the
 * first_chunk flag is cleared, and once no payload is left the incoming
 * packet is reset. Returns MQTT_PUBLISH_OK otherwise.
 * Several lines (#endif, braces, the error-path actions) are not visible
 * in this excerpt. */
1154static mqtt_pub_status_t
1155handle_publish(
struct mqtt_connection *conn)
1157 DBG(
"MQTT - Got PUBLISH, called once per manageable chunk of message.\n");
1158 DBG(
"MQTT - Handling publish on topic '%s'\n", conn->in_publish_msg.topic);
1160#if MQTT_PROTOCOL_VERSION >= MQTT_PROTOCOL_VERSION_3_1_1
1161 if(strlen(conn->in_publish_msg.topic) < conn->in_packet.topic_len) {
1162 DBG(
"NULL detected in received PUBLISH topic\n");
1168 return MQTT_PUBLISH_ERR;
1172 DBG(
"MQTT - This chunk is %i bytes\n", conn->in_publish_msg.payload_chunk_length);
/* The QoS level lives in bits 1-2 of the fixed header (mask 0x06). The
 * previous mask 0x09 selected the DUP and RETAIN bits instead, so QoS > 0
 * went unreported while a DUP flag triggered the warning. */
1174 if(((conn->in_packet.fhdr & 0x06) >> 1) != 0) {
1175 PRINTF(
"MQTT - Error, got incoming PUBLISH with QoS > 0, not supported atm!\n");
1178 call_event(conn, MQTT_EVENT_PUBLISH, &conn->in_publish_msg);
1180 if(conn->in_publish_msg.first_chunk == 1) {
1181 conn->in_publish_msg.first_chunk = 0;
1185 if(conn->in_publish_msg.payload_left == 0) {
1190 DBG(
"MQTT - (handle_publish) resetting packet.\n");
1191 reset_packet(&conn->in_packet);
1194 return MQTT_PUBLISH_OK;
/* Incrementally parses the variable header of an incoming PUBLISH from the
 * current input segment, advancing *pos and in_packet.byte_counter.
 * Phase 1 reads the big-endian 16-bit topic length and checks it against
 * MQTT_MAX_TOPIC_LENGTH. Phase 2 copies topic bytes into
 * in_publish_msg.topic, as many as the segment provides; when the topic is
 * complete it is NUL-terminated, payload_length/payload_left are derived
 * from remaining_length minus the topic and its 2-byte length prefix, and
 * first_chunk is set.
 * Several lines (remaining parameters, returns, braces) are not visible in
 * this excerpt.
 * NOTE(review): the first length byte is consumed before the end-of-input
 * check while topic_len_received is still 0; confirm that a length field
 * split across two segments is handled correctly. */
1198parse_publish_vhdr(
struct mqtt_connection *conn,
1200 const uint8_t *input_data_ptr,
1203 uint16_t copy_bytes;
/* Phase 1: topic length (two bytes, most significant first). */
1206 if(conn->in_packet.topic_len_received == 0) {
1207 conn->in_packet.topic_pos = 0;
1208 conn->in_packet.topic_len = (input_data_ptr[(*pos)++] << 8);
1209 conn->in_packet.byte_counter++;
1210 if(*pos >= input_data_len) {
1213 conn->in_packet.topic_len |= input_data_ptr[(*pos)++];
1214 conn->in_packet.byte_counter++;
1215 conn->in_packet.topic_len_received = 1;
1217 if(conn->in_packet.topic_len > MQTT_MAX_TOPIC_LENGTH) {
1218 DBG(
"MQTT - topic too long %u/%u\n", conn->in_packet.topic_len, MQTT_MAX_TOPIC_LENGTH);
1221 DBG(
"MQTT - Read PUBLISH topic len %i\n", conn->in_packet.topic_len);
/* Phase 2: topic bytes, limited to what this segment still holds. */
1226 if(conn->in_packet.topic_len_received == 1 &&
1227 conn->in_packet.topic_received == 0) {
1228 copy_bytes = MIN(conn->in_packet.topic_len - conn->in_packet.topic_pos,
1229 input_data_len - *pos);
1230 DBG(
"MQTT - topic_pos: %i copy_bytes: %i\n", conn->in_packet.topic_pos,
1232 memcpy(&conn->in_publish_msg.topic[conn->in_packet.topic_pos],
1233 &input_data_ptr[*pos],
1235 (*pos) += copy_bytes;
1236 conn->in_packet.byte_counter += copy_bytes;
1237 conn->in_packet.topic_pos += copy_bytes;
1239 if(conn->in_packet.topic_len - conn->in_packet.topic_pos == 0) {
1240 DBG(
"MQTT - Got topic '%s'", conn->in_publish_msg.topic);
1241 conn->in_packet.topic_received = 1;
1242 conn->in_publish_msg.topic[conn->in_packet.topic_pos] =
'\0';
1243 conn->in_publish_msg.payload_length =
1244 conn->in_packet.remaining_length - conn->in_packet.topic_len - 2;
1245 conn->in_publish_msg.payload_left = conn->in_publish_msg.payload_length;
1249 conn->in_publish_msg.first_chunk = 1;
/* Handles a DISCONNECT received from the peer: raises
 * MQTT_EVENT_DISCONNECTED and then aborts the connection. */
1256handle_disconnect(
struct mqtt_connection *conn)
1258 DBG(
"MQTT - (handle_disconnect) Got DISCONNECT.\n");
1259 call_event(conn, MQTT_EVENT_DISCONNECTED, NULL);
1260 abort_connection(conn);
/* Handles a received AUTH packet.
 * A fixed header whose low four (flag) bits are not all zero aborts the
 * connection. While still connecting to the broker, a missing reason code
 * or one other than MQTT_VHDR_RC_CONTINUE_AUTH is logged as unexpected.
 * Otherwise the AUTH properties are parsed into an event structure and
 * MQTT_EVENT_AUTH is raised. Returns and error-path actions between the
 * visible lines are not shown in this excerpt. */
1264handle_auth(
struct mqtt_connection *conn)
1266 struct mqtt_prop_auth_event event;
1268 DBG(
"MQTT - (handle_auth) Got AUTH.\n");
1270 if((conn->in_packet.fhdr & 0x0F) != 0x0) {
1274 abort_connection(conn);
1279 if(conn->state == MQTT_CONN_STATE_CONNECTING_TO_BROKER &&
1280 (!conn->in_packet.has_reason_code ||
1281 conn->in_packet.reason_code != MQTT_VHDR_RC_CONTINUE_AUTH)) {
1282 DBG(
"MQTT - (handle_auth) Not reauth - Reason Code 0x18 expected!\n");
1285 mqtt_prop_parse_auth_props(conn, &event);
1286 call_event(conn, MQTT_EVENT_AUTH, &event);
/* Parses the variable-header fields at the start of a fully received
 * packet, advancing in_packet.payload_start past each consumed field.
 * PUBACK/SUBACK/UNSUBACK: two-byte big-endian message id.
 * CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, DISCONNECT, AUTH: one-byte
 * reason code (has_reason_code = 1); has_reason_code is set to 0 on a path
 * whose label is not visible here. Finally the input properties are
 * decoded when has_props is not yet set.
 * Breaks, default labels and any preprocessor conditionals are not visible
 * in this excerpt. */
1291parse_vhdr(
struct mqtt_connection *conn)
1293 conn->in_packet.payload_start = conn->in_packet.payload;
/* The message type is the upper nibble of the fixed header. */
1296 switch(conn->in_packet.fhdr & 0xF0) {
1297 case MQTT_FHDR_MSG_TYPE_PUBACK:
1298 case MQTT_FHDR_MSG_TYPE_SUBACK:
1299 case MQTT_FHDR_MSG_TYPE_UNSUBACK:
1300 conn->in_packet.mid = (conn->in_packet.payload[0] << 8) |
1301 (conn->in_packet.payload[1]);
1302 conn->in_packet.payload_start += 2;
1316 switch(conn->in_packet.fhdr & 0xF0) {
1317 case MQTT_FHDR_MSG_TYPE_CONNACK:
1318 case MQTT_FHDR_MSG_TYPE_PUBACK:
1319 case MQTT_FHDR_MSG_TYPE_PUBREC:
1320 case MQTT_FHDR_MSG_TYPE_PUBREL:
1321 case MQTT_FHDR_MSG_TYPE_PUBCOMP:
1322 case MQTT_FHDR_MSG_TYPE_DISCONNECT:
1323 case MQTT_FHDR_MSG_TYPE_AUTH:
1324 conn->in_packet.reason_code = conn->in_packet.payload_start[0];
1325 conn->in_packet.has_reason_code = 1;
1326 conn->in_packet.payload_start += 1;
1330 conn->in_packet.has_reason_code = 0;
1334 if(!conn->in_packet.has_props) {
1335 mqtt_prop_decode_input_props(conn);
/*
 * TCP receive callback: consumes input_data_len bytes from input_data_ptr
 * and advances the incoming-packet state kept in the mqtt_connection.
 *
 * Visible stages, in order:
 *  1. If the previous packet was completely received, reset the packet
 *     state.
 *  2. If no fixed header byte has been stored yet, read it from the input.
 *  3. If the "remaining length" field is not decoded yet, decode it with
 *     mqtt_decode_var_byte_int(); a result of 0 raises MQTT_EVENT_ERROR.
 *  4. A non-PUBLISH packet whose remaining length exceeds
 *     MQTT_INPUT_BUFF_SIZE is reported as unsupported; its bytes are only
 *     counted until the whole packet has gone by.
 *  5. Otherwise payload bytes are copied into in_packet.payload. When the
 *     buffer fills up, the content is handed to handle_publish() as one
 *     chunk and the buffer is reused.
 *  6. Once all bytes of the packet are in, an error reason code
 *     (>= MQTT_VHDR_RC_UNSPEC_ERR) aborts the connection; otherwise the
 *     packet is dispatched on its message type.
 *
 * NOTE(review): several lines (some parameters and locals, early returns,
 * closing braces, break statements, the return value) are not visible in
 * this listing. The comments below describe only the visible statements.
 */
1341tcp_input(
struct tcp_socket *s,
1343 const uint8_t *input_data_ptr,
1346 struct mqtt_connection *conn = ptr;
1348 uint32_t copy_bytes = 0;
1349 mqtt_pub_status_t pub_status;
1350 uint8_t remaining_length_bytes;
/* Empty input is special-cased; the body of this branch is not visible. */
1352 if(input_data_len == 0) {
/* A new packet starts here: drop the state of the finished one. */
1356 if(conn->in_packet.packet_received) {
1357 reset_packet(&conn->in_packet);
1360 DBG(
"tcp_input with %i bytes of data:\n", input_data_len);
/*
 * Read the fixed header byte. The log text below says "VHDR", but the value
 * stored and printed is in_packet.fhdr.
 */
1363 if(!conn->in_packet.fhdr) {
1364 conn->in_packet.fhdr = input_data_ptr[pos++];
1365 conn->in_packet.byte_counter++;
1367 DBG(
"MQTT - Read VHDR '%02X'\n", conn->in_packet.fhdr);
/* Input exhausted after the header byte; the branch body is not visible. */
1369 if(pos >= input_data_len) {
/* Decode the variable-length "remaining length" field. */
1375 if(!conn->in_packet.has_remaining_length) {
1376 remaining_length_bytes =
1377 mqtt_decode_var_byte_int(input_data_ptr, input_data_len, &pos,
1378 &conn->in_packet.byte_counter,
1379 &conn->in_packet.remaining_length);
/* A return value of 0 is treated as a decoding error. */
1381 if(remaining_length_bytes == 0) {
1382 call_event(conn, MQTT_EVENT_ERROR, NULL);
1386 DBG(
"MQTT - Finished reading remaining length byte\n");
1387 conn->in_packet.has_remaining_length = 1;
/*
 * Only PUBLISH may exceed the input buffer (it is delivered in chunks).
 * For any other oversized packet the bytes are just counted so that the
 * end of the packet can still be detected.
 */
1396 if((conn->in_packet.remaining_length > MQTT_INPUT_BUFF_SIZE) &&
1397 (conn->in_packet.fhdr & 0xF0) != MQTT_FHDR_MSG_TYPE_PUBLISH) {
1399 PRINTF(
"MQTT - Error, unsupported payload size for non-PUBLISH message\n");
1401 conn->in_packet.byte_counter += input_data_len;
1402 if(conn->in_packet.byte_counter >=
1403 (MQTT_FHDR_SIZE + conn->in_packet.remaining_length)) {
1404 conn->in_packet.packet_received = 1;
/* Main copy loop: runs until every byte of the packet has been counted. */
1415 while(conn->in_packet.byte_counter <
1416 (MQTT_FHDR_SIZE + conn->in_packet.remaining_length)) {
/* For PUBLISH, the topic part of the variable header is parsed first. */
1418 if((conn->in_packet.fhdr & 0xF0) == MQTT_FHDR_MSG_TYPE_PUBLISH &&
1419 conn->in_packet.topic_received == 0) {
1420 parse_publish_vhdr(conn, &pos, input_data_ptr, input_data_len);
/* Copy as much as is available, limited by the free space in the buffer. */
1424 copy_bytes = MIN(input_data_len - pos,
1425 MQTT_INPUT_BUFF_SIZE - conn->in_packet.payload_pos);
1426 DBG(
"- Copied %i payload bytes\n", copy_bytes);
1427 memcpy(&conn->in_packet.payload[conn->in_packet.payload_pos],
1428 &input_data_ptr[pos],
1430 conn->in_packet.byte_counter += copy_bytes;
1431 conn->in_packet.payload_pos += copy_bytes;
/*
 * NOTE(review): this dump prints payload[0..copy_bytes), while the bytes
 * just copied were written starting at the previous payload_pos. The two
 * ranges coincide only when the buffer was empty before the copy -- confirm
 * whether that is intended.
 */
1436 DBG(
"MQTT - Copied bytes: \n");
1437 for(i = 0; i < copy_bytes; i++) {
1438 DBG(
"%02X ", conn->in_packet.payload[i]);
/* Buffer full: deliver its content as one PUBLISH chunk. */
1444 if(MQTT_INPUT_BUFF_SIZE - conn->in_packet.payload_pos == 0) {
1445 conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1446 conn->in_publish_msg.payload_chunk_length = MQTT_INPUT_BUFF_SIZE;
1447 conn->in_publish_msg.payload_left -= MQTT_INPUT_BUFF_SIZE;
1450 if(!conn->in_packet.has_props) {
1451 mqtt_prop_decode_input_props(conn);
/*
 * On the first chunk the chunk is shortened, and its start moved forward,
 * by properties_len + properties_enc_len, so the properties are not handed
 * to the application as payload.
 */
1454 if(conn->in_publish_msg.first_chunk) {
1455 conn->in_publish_msg.payload_chunk_length -= conn->in_packet.properties_len +
1456 conn->in_packet.properties_enc_len;
1459 conn->in_publish_msg.payload_chunk += conn->in_packet.properties_len +
1460 conn->in_packet.properties_enc_len;
1464 pub_status = handle_publish(conn);
/* Reuse the buffer from the start for the next chunk. */
1466 conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1467 conn->in_packet.payload_pos = 0;
/* Handling of a failed publish is not visible in this listing. */
1469 if(pub_status != MQTT_PUBLISH_OK) {
/* Input used up but packet incomplete; the branch body is not visible. */
1474 if(pos >= input_data_len &&
1475 (conn->in_packet.byte_counter < (MQTT_FHDR_SIZE + conn->in_packet.remaining_length))) {
1485 DBG(
"MQTT - Finished reading packet!\n");
1487 DBG(
"MQTT - total data was %i bytes of data. \n",
1488 (MQTT_FHDR_SIZE + conn->in_packet.remaining_length));
/* Reason codes from MQTT_VHDR_RC_UNSPEC_ERR (0x80) upwards are errors. */
1491 if(conn->in_packet.has_reason_code &&
1492 conn->in_packet.reason_code >= MQTT_VHDR_RC_UNSPEC_ERR) {
1493 PRINTF(
"MQTT - Reason Code indicated error %i\n",
1494 conn->in_packet.reason_code);
1498 abort_connection(conn);
/* Dispatch the complete packet on its message type. */
1504 switch(conn->in_packet.fhdr & 0xF0) {
1505 case MQTT_FHDR_MSG_TYPE_CONNACK:
1506 handle_connack(conn);
1508 case MQTT_FHDR_MSG_TYPE_PUBLISH:
/* Final (or only) chunk: whatever is in the buffer, with nothing left. */
1510 conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1511 conn->in_publish_msg.payload_chunk_length = conn->in_packet.payload_pos;
1512 conn->in_publish_msg.payload_left = 0;
1514 DBG(
"MQTT - First chunk? %i\n", conn->in_publish_msg.first_chunk);
/* Same properties adjustment as for a full buffer above. */
1516 if(conn->in_publish_msg.first_chunk) {
1517 conn->in_publish_msg.payload_chunk_length -= conn->in_packet.properties_len +
1518 conn->in_packet.properties_enc_len;
1520 conn->in_publish_msg.payload_chunk += conn->in_packet.properties_len +
1521 conn->in_packet.properties_enc_len;
/* The publish status is deliberately ignored for the last chunk. */
1524 (void)handle_publish(conn);
1526 case MQTT_FHDR_MSG_TYPE_PUBACK:
1527 handle_puback(conn);
1529 case MQTT_FHDR_MSG_TYPE_SUBACK:
1530 handle_suback(conn);
1532 case MQTT_FHDR_MSG_TYPE_UNSUBACK:
1533 handle_unsuback(conn);
1535 case MQTT_FHDR_MSG_TYPE_PINGRESP:
1536 handle_pingresp(conn);
/* PUBREC / PUBREL / PUBCOMP are reported as not implemented. */
1540 case MQTT_FHDR_MSG_TYPE_PUBREC:
1541 case MQTT_FHDR_MSG_TYPE_PUBREL:
1542 case MQTT_FHDR_MSG_TYPE_PUBCOMP:
1543 call_event(conn, MQTT_EVENT_NOT_IMPLEMENTED_ERROR, NULL);
1544 PRINTF(
"MQTT - Got unhandled MQTT Message Type '%i'",
1545 (conn->in_packet.fhdr & 0xF0));
/* DISCONNECT and AUTH are only compiled in for protocol version 5+. */
1548#if MQTT_PROTOCOL_VERSION >= MQTT_PROTOCOL_VERSION_5
1549 case MQTT_FHDR_MSG_TYPE_DISCONNECT:
1550 handle_disconnect(conn);
1553 case MQTT_FHDR_MSG_TYPE_AUTH:
1560 PRINTF(
"MQTT - Got MQTT Message Type '%i'", (conn->in_packet.fhdr & 0xF0));
/* Mark the packet as done so that the next call starts a fresh one. */
1564 conn->in_packet.packet_received = 1;
/*
 * TCP socket event callback for the mqtt_connection passed as ptr.
 *
 *  - CLOSED / TIMEDOUT / ABORTED: post mqtt_abort_now_event to
 *    mqtt_process, set the state to MQTT_CONN_STATE_NOT_CONNECTED, report
 *    MQTT_EVENT_DISCONNECTED to the application (event data points at the
 *    TCP event) and abort the connection. auto_reconnect is then tested;
 *    the reconnect action itself is not visible in this listing.
 *  - CONNECTED: record the TCP-connected state, mark the output buffer as
 *    sent and post mqtt_do_connect_mqtt_event so that the MQTT-level
 *    connect starts.
 *  - DATA_SENT: once the socket reports no pending output
 *    (output_data_len == 0), mark the output buffer as sent and rewind
 *    out_buffer_ptr to the start of out_buffer.
 *  - Any other event is only logged.
 *
 * NOTE(review): the switch header, break statements and closing braces are
 * not visible in this listing.
 */
1573tcp_event(
struct tcp_socket *s,
void *ptr, tcp_socket_event_t event)
1575 struct mqtt_connection *conn = ptr;
1581 case TCP_SOCKET_CLOSED:
1582 case TCP_SOCKET_TIMEDOUT:
1583 case TCP_SOCKET_ABORTED: {
1585 DBG(
"MQTT - Disconnected by tcp event %d\n", event);
1586 process_post(&mqtt_process, mqtt_abort_now_event, conn);
1587 conn->state = MQTT_CONN_STATE_NOT_CONNECTED;
1589 call_event(conn, MQTT_EVENT_DISCONNECTED, &event);
1590 abort_connection(conn);
/* Reconnect handling; the body of this branch is not visible. */
1593 if(conn->auto_reconnect == 1) {
1598 case TCP_SOCKET_CONNECTED: {
1599 conn->state = MQTT_CONN_STATE_TCP_CONNECTED;
1600 conn->out_buffer_sent = 1;
1602 process_post(&mqtt_process, mqtt_do_connect_mqtt_event, conn);
1605 case TCP_SOCKET_DATA_SENT: {
1606 DBG(
"MQTT - Got TCP_DATA_SENT\n");
/* Everything queued on the socket has gone out: free the output buffer. */
1608 if(conn->socket.output_data_len == 0) {
1609 conn->out_buffer_sent = 1;
1610 conn->out_buffer_ptr = conn->out_buffer;
1618 DBG(
"MQTT - TCP Event %d is currently not managed by the tcp event callback\n",
/*
 * Body of the MQTT engine process. The process header, the event wait and
 * the place where `conn` is assigned are not visible in this listing.
 *
 * Each visible branch reacts to one process event. The branches that send a
 * packet share one pattern: if the output buffer is free
 * (out_buffer_sent == 1), and for ping/subscribe/unsubscribe/publish only
 * while connected to the broker, the matching protothread is initialised
 * with PT_INIT() and re-run until it returns PT_EXITED or higher, or the
 * loop's state condition fails, invoking PT_MQTT_WAIT_SEND() between runs.
 * PT_MQTT_WAIT_SEND is defined elsewhere; presumably it waits for the TCP
 * layer to flush -- confirm.
 */
1626 static struct mqtt_connection *conn;
/* Abort request: flag the state, then tear the connection down. */
1633 if(ev == mqtt_abort_now_event) {
1634 DBG(
"MQTT - Abort\n");
1636 conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE;
1638 abort_connection(conn);
/* TCP-level connect request; the body of this branch is not visible. */
1640 if(ev == mqtt_do_connect_tcp_event) {
1642 DBG(
"MQTT - Got mqtt_do_connect_tcp_event!\n");
/* MQTT-level connect: runs connect_pt once TCP is up. */
1645 if(ev == mqtt_do_connect_mqtt_event) {
1647 conn->socket.output_data_max_seg = conn->max_segment_size;
1648 DBG(
"MQTT - Got mqtt_do_connect_mqtt_event!\n");
1650 if(conn->out_buffer_sent == 1) {
1651 PT_INIT(&conn->out_proto_thread);
1652 while(connect_pt(&conn->out_proto_thread, conn) < PT_EXITED &&
1653 conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE) {
1654 PT_MQTT_WAIT_SEND();
/*
 * Disconnect: only acted on in MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT.
 * After disconnect_pt finishes, the connection is aborted and the
 * application gets MQTT_EVENT_DISCONNECTED. The event is also re-posted;
 * presumably that is the path taken while the output buffer is still busy
 * (the else line is not visible) -- confirm.
 */
1658 if(ev == mqtt_do_disconnect_mqtt_event) {
1660 DBG(
"MQTT - Got mqtt_do_disconnect_mqtt_event!\n");
1663 if(conn->state == MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT) {
1664 if(conn->out_buffer_sent == 1) {
1665 PT_INIT(&conn->out_proto_thread);
1666 while(conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE &&
1667 disconnect_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1668 PT_MQTT_WAIT_SEND();
1670 abort_connection(conn);
1671 call_event(conn, MQTT_EVENT_DISCONNECTED, &ev);
1673 process_post(&mqtt_process, mqtt_do_disconnect_mqtt_event, conn);
/* PINGREQ. */
1677 if(ev == mqtt_do_pingreq_event) {
1679 DBG(
"MQTT - Got mqtt_do_pingreq_event!\n");
1681 if(conn->out_buffer_sent == 1 &&
1682 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1683 PT_INIT(&conn->out_proto_thread);
1684 while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1685 pingreq_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1686 PT_MQTT_WAIT_SEND();
/* SUBSCRIBE. */
1690 if(ev == mqtt_do_subscribe_event) {
1692 DBG(
"MQTT - Got mqtt_do_subscribe_mqtt_event!\n");
1694 if(conn->out_buffer_sent == 1 &&
1695 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1696 PT_INIT(&conn->out_proto_thread);
1697 while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1698 subscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1699 PT_MQTT_WAIT_SEND();
/* UNSUBSCRIBE. */
1703 if(ev == mqtt_do_unsubscribe_event) {
1705 DBG(
"MQTT - Got mqtt_do_unsubscribe_mqtt_event!\n");
1707 if(conn->out_buffer_sent == 1 &&
1708 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1709 PT_INIT(&conn->out_proto_thread);
1710 while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1711 unsubscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1712 PT_MQTT_WAIT_SEND();
/* PUBLISH. */
1716 if(ev == mqtt_do_publish_event) {
1718 DBG(
"MQTT - Got mqtt_do_publish_mqtt_event!\n");
1720 if(conn->out_buffer_sent == 1 &&
1721 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1722 PT_INIT(&conn->out_proto_thread);
1723 while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1724 publish_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1725 PT_MQTT_WAIT_SEND();
/* AUTH: unlike the branches above, the loop has no connection-state check. */
1730 if(ev == mqtt_do_auth_event) {
1732 DBG(
"MQTT - Got mqtt_do_auth_event!\n");
1734 if(conn->out_buffer_sent == 1) {
1735 PT_INIT(&conn->out_proto_thread);
1736 while(auth_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1737 PT_MQTT_WAIT_SEND();
/*
 * Drop the reference to the caller's outgoing property list.
 * NOTE(review): the nesting is not visible here -- confirm whether this
 * runs after every event or only after the AUTH branch.
 */
1742 conn->out_props = NULL;
/*
 * NOTE(review): the function header is not visible in this listing;
 * presumably this is the engine's one-time initialisation routine --
 * confirm. The visible lines declare a static `inited` flag (initially 0)
 * and store mqtt_do_connect_tcp_event in mqtt_event_min and
 * mqtt_abort_now_event in mqtt_event_max.
 */
1751 static uint8_t inited = 0;
1754 mqtt_event_min = mqtt_do_connect_tcp_event;
1764 mqtt_event_max = mqtt_abort_now_event;
/*
 * mqtt_register: initialise a connection object for use by the engine.
 * Only the last parameter line is visible in this listing.
 *
 * Visible behaviour:
 *  - When MQTT_31 is set or the server does not support empty client ids,
 *    an empty client_id is rejected with MQTT_STATUS_INVALID_ARGS_ERROR.
 *  - The whole mqtt_connection is zeroed, then srv_feature_en, client_id,
 *    event_callback, app_process, auto_reconnect (enabled) and
 *    max_segment_size are set, followed by reset_defaults().
 *  - Returns MQTT_STATUS_OK.
 *
 * NOTE(review): strlen(client_id) is called without a NULL check, so
 * client_id must not be NULL when the guard below is compiled in.
 */
1779 uint16_t max_segment_size)
1781#if MQTT_31 || !MQTT_SRV_SUPPORTS_EMPTY_CLIENT_ID
1782 if(strlen(client_id) < 1) {
1783 return MQTT_STATUS_INVALID_ARGS_ERROR;
/* Start from a clean slate; every field not set below stays zero. */
1788 memset(conn, 0,
sizeof(
struct mqtt_connection));
1791 conn->srv_feature_en = -1;
1793 string_to_mqtt_string(&conn->client_id, client_id);
1794 conn->event_callback = event_callback;
1795 conn->app_process = app_process;
/* Auto-reconnect is on by default after registration. */
1796 conn->auto_reconnect = 1;
1797 conn->max_segment_size = max_segment_size;
1799 reset_defaults(conn);
1805 DBG(
"MQTT - Registered successfully\n");
1807 return MQTT_STATUS_OK;
/*
 * mqtt_connect: record the broker parameters and ask the engine process to
 * open the TCP connection. Only part of the parameter list is visible.
 *
 * Two alternative endings of the parameter list appear below (with and
 * without prop_list); presumably preprocessor conditionals that are not
 * visible here select one of them -- confirm.
 *
 * Visible behaviour:
 *  - If the state is already beyond MQTT_CONN_STATE_NOT_CONNECTED, nothing
 *    is done and MQTT_STATUS_OK is returned.
 *  - host is stored as a pointer, not copied.
 *  - The clean-session flag is set if requested, or whenever the client id
 *    is empty.
 *  - If uiplib_ip6addrconv() returns 0 for host, MQTT_STATUS_ERROR is
 *    returned (presumably host must be a literal IPv6 address -- confirm).
 *  - Otherwise mqtt_do_connect_tcp_event is posted and MQTT_STATUS_OK
 *    returned.
 */
1817 uint16_t keep_alive,
1819 uint8_t clean_session,
1820 struct mqtt_prop_list *prop_list)
1822 uint8_t clean_session)
1825 uip_ip6addr_t ip6addr;
/* Already connecting or connected: report success without doing anything. */
1830 if(conn->state > MQTT_CONN_STATE_NOT_CONNECTED) {
1831 return MQTT_STATUS_OK;
1834 conn->server_host = host;
1835 conn->keep_alive = keep_alive;
1836 conn->server_port = port;
1837 conn->out_buffer_ptr = conn->out_buffer;
1838 conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
/* An empty client id forces a clean session. */
1841 if(clean_session || (conn->client_id.length == 0)) {
1842 conn->connect_vhdr_flags |= MQTT_VHDR_CLEAN_SESSION_FLAG;
1846 if(uiplib_ip6addrconv(host, &ip6addr) == 0) {
1847 return MQTT_STATUS_ERROR;
/* The property list is referenced, not copied. */
1858 conn->out_props = prop_list;
1861 process_post(&mqtt_process, mqtt_do_connect_tcp_event, conn);
1863 return MQTT_STATUS_OK;
/*
 * mqtt_disconnect: start an orderly MQTT disconnect. Only the last
 * parameter line is visible in this listing.
 *
 * The state is tested against MQTT_CONN_STATE_CONNECTED_TO_BROKER first;
 * the body of that branch is not visible (presumably an early return --
 * confirm). The state then becomes MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT,
 * the property list pointer is stored, and mqtt_do_disconnect_mqtt_event is
 * posted to the engine process.
 */
1869 struct mqtt_prop_list *prop_list)
1874 if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1878 conn->state = MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT;
1881 conn->out_props = prop_list;
1884 process_post(&mqtt_process, mqtt_do_disconnect_mqtt_event, conn);
/*
 * mqtt_subscribe: queue a SUBSCRIBE for one topic. Only part of the
 * parameter list is visible; two alternative endings appear below (with and
 * without the subscription options and prop_list), presumably selected by
 * preprocessor conditionals that are not visible here -- confirm.
 *
 * Returns MQTT_STATUS_NOT_CONNECTED_ERROR unless connected to the broker,
 * MQTT_STATUS_OUT_QUEUE_FULL if a request is already queued, otherwise
 * MQTT_STATUS_OK after posting mqtt_do_subscribe_event.
 *
 * topic is stored as a pointer, not copied.
 */
1890 mqtt_qos_level_t qos_level,
1891 mqtt_nl_en_t nl, mqtt_rap_en_t rap,
1892 mqtt_retain_handling_t ret_handling,
1893 struct mqtt_prop_list *prop_list)
1895 mqtt_qos_level_t qos_level)
1898 if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1899 return MQTT_STATUS_NOT_CONNECTED_ERROR;
1902 DBG(
"MQTT - Call to mqtt_subscribe...\n");
/* Only one outgoing request can be queued at a time. */
1905 if(conn->out_queue_full) {
1906 DBG(
"MQTT - Not accepted!\n");
1907 return MQTT_STATUS_OUT_QUEUE_FULL;
1909 conn->out_queue_full = 1;
1910 DBG(
"MQTT - Accepted!\n");
1912 conn->out_packet.mid = INCREMENT_MID(conn);
1913 conn->out_packet.topic = topic;
1914 conn->out_packet.topic_length = strlen(topic);
1915 conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
/* Report the message id back to the caller (any NULL guard is not visible). */
1918 *mid = conn->out_packet.mid;
/*
 * Assemble the subscription options byte.
 * NOTE(review): each argument is masked but not shifted, so this is correct
 * only if the enum values are already defined at their bit positions --
 * confirm against the type definitions.
 */
1922 conn->out_packet.sub_options = 0x00;
1923 conn->out_packet.sub_options |= qos_level & MQTT_SUB_OPTION_QOS;
1924 conn->out_packet.sub_options |= nl & MQTT_SUB_OPTION_NL;
1925 conn->out_packet.sub_options |= rap & MQTT_SUB_OPTION_RAP;
1926 conn->out_packet.sub_options |= ret_handling & MQTT_SUB_OPTION_RETAIN_HANDLING;
1928 conn->out_packet.qos = qos_level;
1932 conn->out_props = prop_list;
1935 process_post(&mqtt_process, mqtt_do_subscribe_event, conn);
1936 return MQTT_STATUS_OK;
/*
 * mqtt_unsubscribe: queue an UNSUBSCRIBE for one topic. Only the last
 * parameter line is visible in this listing.
 *
 * Returns MQTT_STATUS_NOT_CONNECTED_ERROR unless connected to the broker,
 * MQTT_STATUS_OUT_QUEUE_FULL if a request is already queued, otherwise
 * MQTT_STATUS_OK after posting mqtt_do_unsubscribe_event.
 *
 * topic is stored as a pointer, not copied.
 */
1943 struct mqtt_prop_list *prop_list)
1948 if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1949 return MQTT_STATUS_NOT_CONNECTED_ERROR;
1952 DBG(
"MQTT - Call to mqtt_unsubscribe...\n");
/* Only one outgoing request can be queued at a time. */
1954 if(conn->out_queue_full) {
1955 DBG(
"MQTT - Not accepted!\n");
1956 return MQTT_STATUS_OUT_QUEUE_FULL;
1958 conn->out_queue_full = 1;
1959 DBG(
"MQTT - Accepted!\n");
1961 conn->out_packet.mid = INCREMENT_MID(conn);
1962 conn->out_packet.topic = topic;
1963 conn->out_packet.topic_length = strlen(topic);
1964 conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
/* Report the message id back to the caller (any NULL guard is not visible). */
1967 *mid = conn->out_packet.mid;
1971 conn->out_props = prop_list;
1974 process_post(&mqtt_process, mqtt_do_unsubscribe_event, conn);
1975 return MQTT_STATUS_OK;
/*
 * mqtt_publish: queue a PUBLISH. Only part of the parameter list is
 * visible; two alternative endings appear below (with and without topic
 * alias and prop_list), presumably selected by preprocessor conditionals
 * that are not visible here -- confirm.
 *
 * Returns MQTT_STATUS_NOT_CONNECTED_ERROR unless connected to the broker,
 * MQTT_STATUS_OUT_QUEUE_FULL if a request is already queued, otherwise
 * MQTT_STATUS_OK after posting mqtt_do_publish_event.
 *
 * topic and payload are stored as pointers, not copied.
 */
1980 uint8_t *payload, uint32_t payload_size,
1981 mqtt_qos_level_t qos_level,
1983 mqtt_retain_t retain,
1984 uint8_t topic_alias, mqtt_topic_alias_en_t topic_alias_en,
1985 struct mqtt_prop_list *prop_list)
1987 mqtt_retain_t retain)
1990 if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1991 return MQTT_STATUS_NOT_CONNECTED_ERROR;
1994 DBG(
"MQTT - Call to mqtt_publish...\n");
/* Only one outgoing request can be queued at a time. */
1997 if(conn->out_queue_full) {
1998 DBG(
"MQTT - Not accepted!\n");
1999 return MQTT_STATUS_OUT_QUEUE_FULL;
2001 conn->out_queue_full = 1;
2002 DBG(
"MQTT - Accepted!\n");
2004 conn->out_packet.mid = INCREMENT_MID(conn);
2005 conn->out_packet.retain = retain;
/*
 * With a topic alias enabled, the topic is sent empty and the alias is
 * stored instead. An alias of 0 is only logged as an error here; nothing
 * visible stops the publish from being queued.
 */
2007 if(topic_alias_en == MQTT_TOPIC_ALIAS_ON) {
2008 conn->out_packet.topic =
"";
2009 conn->out_packet.topic_length = 0;
2010 conn->out_packet.topic_alias = topic_alias;
2011 if(topic_alias == 0) {
2012 DBG(
"MQTT - Error, a topic alias of 0 is not permitted! It won't be sent.\n");
/* No alias: use the full topic string (presumably the else branch). */
2015 conn->out_packet.topic = topic;
2016 conn->out_packet.topic_length = strlen(topic);
2017 conn->out_packet.topic_alias = 0;
/* Same topic assignment again; presumably the variant without aliases. */
2020 conn->out_packet.topic = topic;
2021 conn->out_packet.topic_length = strlen(topic);
2023 conn->out_packet.payload = payload;
2024 conn->out_packet.payload_size = payload_size;
2025 conn->out_packet.qos = qos_level;
2026 conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
/* Report the message id back to the caller (any NULL guard is not visible). */
2029 *mid = conn->out_packet.mid;
2033 conn->out_props = prop_list;
2036 process_post(&mqtt_process, mqtt_do_publish_event, conn);
2037 return MQTT_STATUS_OK;
/*
 * mqtt_set_username_password: store the credentials used in CONNECT. The
 * function header is not visible in this listing.
 *
 * Both strings are converted with string_to_mqtt_string(). The username
 * flag in connect_vhdr_flags is set when username is non-NULL; the line
 * that clears it follows (presumably in the else branch, which is not
 * visible). The password flag is handled the same way.
 *
 * NOTE(review): string_to_mqtt_string() is called before the NULL checks;
 * confirm that it accepts a NULL string.
 */
2045 string_to_mqtt_string(&conn->credentials.username, username);
2046 string_to_mqtt_string(&conn->credentials.password, password);
2049 if(username != NULL) {
2050 conn->connect_vhdr_flags |= MQTT_VHDR_USERNAME_FLAG;
2052 conn->connect_vhdr_flags &= ~MQTT_VHDR_USERNAME_FLAG;
2054 if(password != NULL) {
2055 conn->connect_vhdr_flags |= MQTT_VHDR_PASSWORD_FLAG;
2057 conn->connect_vhdr_flags &= ~MQTT_VHDR_PASSWORD_FLAG;
/*
 * mqtt_set_last_will: store the will topic, message and QoS used in
 * CONNECT. Only the end of the parameter list is visible; two alternative
 * endings appear below (with and without will_props), presumably selected
 * by preprocessor conditionals that are not visible here -- confirm.
 *
 * The visible lines always set both MQTT_VHDR_WILL_FLAG and
 * MQTT_VHDR_WILL_RETAIN_FLAG in connect_vhdr_flags; any surrounding
 * condition is not visible.
 */
2064 mqtt_qos_level_t qos,
struct mqtt_prop_list *will_props)
2066 mqtt_qos_level_t qos)
2070 string_to_mqtt_string(&conn->will.topic, topic);
2071 string_to_mqtt_string(&conn->will.message, message);
2074 conn->will.qos = qos;
2077 conn->connect_vhdr_flags |= MQTT_VHDR_WILL_FLAG |
2078 MQTT_VHDR_WILL_RETAIN_FLAG;
/* The will property list is stored by reference, cast to list_t. */
2081 conn->will.properties = (
list_t)will_props;
/*
 * mqtt_auth: prepare an outgoing AUTH packet. Only the end of the parameter
 * list is visible in this listing.
 *
 * The fixed header is set to MQTT_FHDR_MSG_TYPE_AUTH with a remaining
 * length of 1. The reason code is MQTT_VHDR_RC_CONTINUE_AUTH (0x18) plus
 * auth_type, so an auth_type of 1 yields MQTT_VHDR_RC_REAUTH (0x19). The
 * property list pointer is stored and MQTT_STATUS_OK returned.
 *
 * NOTE(review): the line that hands the packet to the engine process is not
 * visible here -- confirm against the full file.
 */
2096 mqtt_auth_type_t auth_type,
2097 struct mqtt_prop_list *prop_list)
2099 DBG(
"MQTT - Call to mqtt_auth...\n");
2101 conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_AUTH;
2102 conn->out_packet.remaining_length = 1;
2103 conn->out_packet.auth_reason_code = MQTT_VHDR_RC_CONTINUE_AUTH + auth_type;
2105 conn->out_props = prop_list;
2108 return MQTT_STATUS_OK;
Default definitions of C compiler quirk work-arounds.
Header file for the callback timer.
#define CLOCK_SECOND
A second, measured in system clock time.
void ctimer_stop(struct ctimer *c)
Stop a pending callback timer.
static void ctimer_set(struct ctimer *c, clock_time_t t, void(*f)(void *), void *ptr)
Set a callback timer.
void ctimer_restart(struct ctimer *c)
Restart a callback timer from the current point in time.
static void list_init(list_t list)
Initialize a list.
#define LIST(name)
Declare a linked list.
static void * list_item_next(const void *item)
Get the next item following this item.
void list_add(list_t list, void *item)
Add an item at the end of a list.
void ** list_t
The linked list type.
static void * list_head(const_list_t list)
Get a pointer to the first element of a list.
mqtt_status_t mqtt_auth(struct mqtt_connection *conn, mqtt_auth_type_t auth_type, struct mqtt_prop_list *prop_list)
Send authentication message (MQTTv5-only).
mqtt_status_t mqtt_connect(struct mqtt_connection *conn, char *host, uint16_t port, uint16_t keep_alive, uint8_t clean_session, struct mqtt_prop_list *prop_list)
Connects to an MQTT broker.
mqtt_status_t mqtt_register(struct mqtt_connection *conn, struct process *app_process, char *client_id, mqtt_event_callback_t event_callback, uint16_t max_segment_size)
Initializes the MQTT engine.
mqtt_status_t mqtt_unsubscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic, struct mqtt_prop_list *prop_list)
Unsubscribes from an MQTT topic.
void(* mqtt_event_callback_t)(struct mqtt_connection *m, mqtt_event_t event, void *data)
MQTT event callback function.
mqtt_status_t mqtt_subscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic, mqtt_qos_level_t qos_level, mqtt_nl_en_t nl, mqtt_rap_en_t rap, mqtt_retain_handling_t ret_handling, struct mqtt_prop_list *prop_list)
Subscribes to an MQTT topic.
void mqtt_disconnect(struct mqtt_connection *conn, struct mqtt_prop_list *prop_list)
Disconnects from an MQTT broker.
mqtt_status_t mqtt_publish(struct mqtt_connection *conn, uint16_t *mid, char *topic, uint8_t *payload, uint32_t payload_size, mqtt_qos_level_t qos_level, mqtt_retain_t retain, uint8_t topic_alias, mqtt_topic_alias_en_t topic_alias_en, struct mqtt_prop_list *prop_list)
Publish to an MQTT topic.
mqtt_event_t
MQTT engine events.
void mqtt_set_username_password(struct mqtt_connection *conn, char *username, char *password)
Set the user name and password for an MQTT client.
void mqtt_set_last_will(struct mqtt_connection *conn, char *topic, char *message, mqtt_qos_level_t qos, struct mqtt_prop_list *will_props)
Set the last will topic and message for an MQTT client.
#define PROCESS(name, strname)
Declare a process.
#define PROCESS_WAIT_EVENT()
Wait for an event to be posted to the process.
int process_post(struct process *p, process_event_t ev, process_data_t data)
Post an asynchronous event.
process_event_t process_alloc_event(void)
Allocate a global event number.
#define PROCESS_BEGIN()
Define the beginning of a process.
#define PROCESS_END()
Define the end of a process.
void process_start(struct process *p, process_data_t data)
Start a process.
#define PROCESS_THREAD(name, ev, data)
Define the body of a process.
#define PT_BEGIN(pt)
Declare the start of a protothread inside the C function implementing the protothread.
#define PT_THREAD(name_args)
Declaration of a protothread.
#define PT_END(pt)
Declare the end of a protothread.
#define PT_EXIT(pt)
Exit the protothread.
#define PT_WAIT_UNTIL(pt, condition)
Block and wait until condition is true.
#define PT_INIT(pt)
Initialize a protothread.
void timer_set(struct timer *t, clock_time_t interval)
Set a timer.
bool timer_expired(struct timer *t)
Check if a timer has expired.
#define uip_ipaddr_copy(dest, src)
Copy an IP address from one place to another.
Header file for the LED HAL.
Linked list manipulation routines.
Header file for the Contiki MQTT engine.
Protothreads implementation.
Header file for IPv6-related data structures.
static uip_ipaddr_t ipaddr
Pointer to prefix information option in uip_buf.
Header file for the uIP TCP/IP stack.