45 #include "contiki-net.h" 46 #include "contiki-lib.h" 47 #include "lib/random.h" 55 #include "tcp-socket.h" 57 #include "lib/assert.h" 67 #define PRINTF(...) printf(__VA_ARGS__) 73 MQTT_FHDR_MSG_TYPE_CONNECT = 0x10,
74 MQTT_FHDR_MSG_TYPE_CONNACK = 0x20,
75 MQTT_FHDR_MSG_TYPE_PUBLISH = 0x30,
76 MQTT_FHDR_MSG_TYPE_PUBACK = 0x40,
77 MQTT_FHDR_MSG_TYPE_PUBREC = 0x50,
78 MQTT_FHDR_MSG_TYPE_PUBREL = 0x60,
79 MQTT_FHDR_MSG_TYPE_PUBCOMP = 0x70,
80 MQTT_FHDR_MSG_TYPE_SUBSCRIBE = 0x80,
81 MQTT_FHDR_MSG_TYPE_SUBACK = 0x90,
82 MQTT_FHDR_MSG_TYPE_UNSUBSCRIBE = 0xA0,
83 MQTT_FHDR_MSG_TYPE_UNSUBACK = 0xB0,
84 MQTT_FHDR_MSG_TYPE_PINGREQ = 0xC0,
85 MQTT_FHDR_MSG_TYPE_PINGRESP = 0xD0,
86 MQTT_FHDR_MSG_TYPE_DISCONNECT = 0xE0,
88 MQTT_FHDR_DUP_FLAG = 0x08,
90 MQTT_FHDR_QOS_LEVEL_0 = 0x00,
91 MQTT_FHDR_QOS_LEVEL_1 = 0x02,
92 MQTT_FHDR_QOS_LEVEL_2 = 0x04,
94 MQTT_FHDR_RETAIN_FLAG = 0x01,
98 MQTT_VHDR_USERNAME_FLAG = 0x80,
99 MQTT_VHDR_PASSWORD_FLAG = 0x40,
101 MQTT_VHDR_WILL_RETAIN_FLAG = 0x20,
102 MQTT_VHDR_WILL_QOS_LEVEL_0 = 0x00,
103 MQTT_VHDR_WILL_QOS_LEVEL_1 = 0x08,
104 MQTT_VHDR_WILL_QOS_LEVEL_2 = 0x10,
106 MQTT_VHDR_WILL_FLAG = 0x04,
107 MQTT_VHDR_CLEAN_SESSION_FLAG = 0x02,
108 } mqtt_vhdr_conn_fields_t;
111 MQTT_VHDR_CONN_ACCEPTED,
112 MQTT_VHDR_CONN_REJECTED_PROTOCOL,
113 MQTT_VHDR_CONN_REJECTED_IDENTIFIER,
114 MQTT_VHDR_CONN_REJECTED_UNAVAILABLE,
115 MQTT_VHDR_CONN_REJECTED_BAD_USER_PASS,
116 MQTT_VHDR_CONN_REJECTED_UNAUTHORIZED,
117 } mqtt_vhdr_connack_fields_t;
119 #define MQTT_CONNECT_VHDR_FLAGS_SIZE 12 121 #define MQTT_STRING_LEN_SIZE 2 122 #define MQTT_MID_SIZE 2 123 #define MQTT_QOS_SIZE 1 125 #define RESPONSE_WAIT_TIMEOUT (CLOCK_SECOND * 10) 127 #define INCREMENT_MID(conn) (conn)->mid_counter += 2 128 #define MQTT_STRING_LENGTH(s) (((s)->length) == 0 ? 0 : (MQTT_STRING_LEN_SIZE + (s)->length)) 131 #define PT_MQTT_WRITE_BYTES(conn, data, len) \ 132 conn->out_write_pos = 0; \ 133 while(write_bytes(conn, data, len)) { \ 134 PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \ 137 #define PT_MQTT_WRITE_BYTE(conn, data) \ 138 while(write_byte(conn, data)) { \ 139 PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \ 148 #define PT_MQTT_WAIT_SEND() \ 150 if (PROCESS_ERR_OK == \ 151 process_post(PROCESS_CURRENT(), mqtt_continue_send_event, NULL)) { \ 153 PROCESS_WAIT_EVENT(); \ 154 if(ev == mqtt_abort_now_event) { \ 155 conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE; \ 156 PT_INIT(&conn->out_proto_thread); \ 157 process_post(PROCESS_CURRENT(), ev, data); \ 158 } else if(ev >= mqtt_event_min && ev <= mqtt_event_max) { \ 159 process_post(PROCESS_CURRENT(), ev, data); \ 161 } while (ev != mqtt_continue_send_event); \ 165 static process_event_t mqtt_do_connect_tcp_event;
/* Request events; each one is compared against `ev` in the event dispatch
 * code further down in this file. */
166 static process_event_t mqtt_do_connect_mqtt_event;
167 static process_event_t mqtt_do_disconnect_mqtt_event;
168 static process_event_t mqtt_do_subscribe_event;
169 static process_event_t mqtt_do_unsubscribe_event;
170 static process_event_t mqtt_do_publish_event;
171 static process_event_t mqtt_do_pingreq_event;
/* Posted by PT_MQTT_WAIT_SEND() to the current process; that macro keeps
 * waiting until it receives this event back. */
172 static process_event_t mqtt_continue_send_event;
/* Posted to mqtt_process by tcp_event() when the socket reports closed,
 * timed out or aborted. */
173 static process_event_t mqtt_abort_now_event;
/* Not static: posted to the application's process (conn->app_process). */
174 process_event_t mqtt_update_event;
/* Bounds of the event range that PT_MQTT_WAIT_SEND() re-posts while it is
 * waiting; assigned from mqtt_do_connect_tcp_event and mqtt_abort_now_event
 * during initialisation. */
181 static process_event_t mqtt_event_min;
182 static process_event_t mqtt_event_max;
186 tcp_input(
struct tcp_socket *s,
void *ptr,
const uint8_t *input_data_ptr,
189 static void tcp_event(
struct tcp_socket *s,
void *ptr,
190 tcp_socket_event_t event);
192 static void reset_packet(
struct mqtt_in_packet *packet);
194 LIST(mqtt_conn_list);
196 PROCESS(mqtt_process,
"MQTT process");
199 call_event(
struct mqtt_connection *conn,
203 conn->event_callback(conn, event, data);
204 process_post(conn->app_process, mqtt_update_event, NULL);
208 reset_defaults(
struct mqtt_connection *conn)
210 conn->mid_counter = 1;
211 PT_INIT(&conn->out_proto_thread);
212 conn->waiting_for_pingresp = 0;
214 reset_packet(&conn->in_packet);
215 conn->out_buffer_sent = 0;
219 abort_connection(
struct mqtt_connection *conn)
221 conn->out_buffer_ptr = conn->out_buffer;
222 conn->out_queue_full = 0;
225 memset(&conn->out_packet, 0,
sizeof(conn->out_packet));
227 tcp_socket_close(&conn->socket);
228 tcp_socket_unregister(&conn->socket);
230 memset(&conn->socket, 0,
sizeof(conn->socket));
232 conn->state = MQTT_CONN_STATE_NOT_CONNECTED;
236 connect_tcp(
struct mqtt_connection *conn)
238 conn->state = MQTT_CONN_STATE_TCP_CONNECTING;
240 reset_defaults(conn);
241 tcp_socket_register(&(conn->socket),
244 MQTT_TCP_INPUT_BUFF_SIZE,
246 MQTT_TCP_OUTPUT_BUFF_SIZE,
249 tcp_socket_connect(&(conn->socket), &(conn->server_ip), conn->server_port);
253 disconnect_tcp(
struct mqtt_connection *conn)
255 conn->state = MQTT_CONN_STATE_DISCONNECTING;
256 tcp_socket_close(&(conn->socket));
257 tcp_socket_unregister(&conn->socket);
259 memset(&conn->socket, 0,
sizeof(conn->socket));
263 send_out_buffer(
struct mqtt_connection *conn)
265 if(conn->out_buffer_ptr - conn->out_buffer == 0) {
266 conn->out_buffer_sent = 1;
269 conn->out_buffer_sent = 0;
271 DBG(
"MQTT - (send_out_buffer) Space used in buffer: %i\n",
272 conn->out_buffer_ptr - conn->out_buffer);
274 tcp_socket_send(&conn->socket, conn->out_buffer,
275 conn->out_buffer_ptr - conn->out_buffer);
279 string_to_mqtt_string(
struct mqtt_string *mqtt_string,
char *
string)
281 if(mqtt_string == NULL) {
284 mqtt_string->string = string;
287 mqtt_string->length = strlen(
string);
289 mqtt_string->length = 0;
294 write_byte(
struct mqtt_connection *conn, uint8_t data)
296 DBG(
"MQTT - (write_byte) buff_size: %i write: '%02X'\n",
297 &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr,
300 if(&conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr == 0) {
301 send_out_buffer(conn);
305 *conn->out_buffer_ptr = data;
306 conn->out_buffer_ptr++;
311 write_bytes(
struct mqtt_connection *conn, uint8_t *data, uint16_t len)
313 uint16_t write_bytes;
315 MIN(&conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr,
316 len - conn->out_write_pos);
318 memcpy(conn->out_buffer_ptr, &data[conn->out_write_pos], write_bytes);
319 conn->out_write_pos += write_bytes;
320 conn->out_buffer_ptr += write_bytes;
322 DBG(
"MQTT - (write_bytes) len: %u write_pos: %lu\n", len,
323 conn->out_write_pos);
325 if(len - conn->out_write_pos == 0) {
326 conn->out_write_pos = 0;
329 send_out_buffer(conn);
330 return len - conn->out_write_pos;
/*
 * Encode `length` as an MQTT "remaining length" field: groups of 7 bits,
 * least significant group first, with bit 7 of a byte set when another byte
 * follows.
 *
 * remaining_length        output buffer; up to 5 bytes may be written
 * remaining_length_bytes  out: number of bytes written to the buffer
 * length                  value to encode
 *
 * The loop stops after 5 bytes at the latest. Callers in this file treat a
 * byte count greater than 4 as a protocol error.
 */
static void
encode_remaining_length(uint8_t *remaining_length,
                        uint8_t *remaining_length_bytes,
                        uint32_t length)
{
  uint8_t digit;

  /* uint32_t is not necessarily unsigned long; cast to match %lu. */
  DBG("MQTT - Encoding length %lu\n", (unsigned long)length);

  *remaining_length_bytes = 0;
  do {
    digit = length % 128;
    length = length / 128;
    if(length > 0) {
      /* More groups follow: set the continuation bit. */
      digit = digit | 0x80;
    }

    remaining_length[*remaining_length_bytes] = digit;
    (*remaining_length_bytes)++;
    DBG("MQTT - Encode len digit '%u' length '%lu'\n", digit,
        (unsigned long)length);
  } while(length > 0 && *remaining_length_bytes < 5);
  DBG("MQTT - remaining_length_bytes %u\n", *remaining_length_bytes);
}
359 keep_alive_callback(
void *ptr)
361 struct mqtt_connection *conn = ptr;
363 DBG(
"MQTT - (keep_alive_callback) Called!\n");
366 if(conn->waiting_for_pingresp) {
367 PRINTF(
"MQTT - Disconnect due to no PINGRESP from broker.\n");
368 disconnect_tcp(conn);
372 process_post(&mqtt_process, mqtt_do_pingreq_event, conn);
376 reset_packet(
struct mqtt_in_packet *packet)
378 memset(packet, 0,
sizeof(
struct mqtt_in_packet));
379 packet->remaining_multiplier = 1;
383 PT_THREAD(connect_pt(
struct pt *pt,
struct mqtt_connection *conn))
387 DBG(
"MQTT - Sending CONNECT message...\n");
390 conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_CONNECT;
391 conn->out_packet.remaining_length = 0;
392 conn->out_packet.remaining_length += MQTT_CONNECT_VHDR_FLAGS_SIZE;
393 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->client_id);
394 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->credentials.username);
395 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->credentials.password);
396 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->will.topic);
397 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->will.message);
398 encode_remaining_length(conn->out_packet.remaining_length_enc,
399 &conn->out_packet.remaining_length_enc_bytes,
400 conn->out_packet.remaining_length);
401 if(conn->out_packet.remaining_length_enc_bytes > 4) {
402 call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
403 PRINTF(
"MQTT - Error, remaining length > 4 bytes\n");
408 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
409 PT_MQTT_WRITE_BYTES(conn,
410 conn->out_packet.remaining_length_enc,
411 conn->out_packet.remaining_length_enc_bytes);
412 PT_MQTT_WRITE_BYTE(conn, 0);
413 PT_MQTT_WRITE_BYTE(conn, 6);
414 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)MQTT_PROTOCOL_NAME, 6);
415 PT_MQTT_WRITE_BYTE(conn, MQTT_PROTOCOL_VERSION);
416 PT_MQTT_WRITE_BYTE(conn, conn->connect_vhdr_flags);
417 PT_MQTT_WRITE_BYTE(conn, (conn->keep_alive >> 8));
418 PT_MQTT_WRITE_BYTE(conn, (conn->keep_alive & 0x00FF));
419 PT_MQTT_WRITE_BYTE(conn, conn->client_id.length << 8);
420 PT_MQTT_WRITE_BYTE(conn, conn->client_id.length & 0x00FF);
421 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->client_id.string,
422 conn->client_id.length);
423 if(conn->connect_vhdr_flags & MQTT_VHDR_WILL_FLAG) {
424 PT_MQTT_WRITE_BYTE(conn, conn->will.topic.length << 8);
425 PT_MQTT_WRITE_BYTE(conn, conn->will.topic.length & 0x00FF);
426 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->will.topic.string,
427 conn->will.topic.length);
428 PT_MQTT_WRITE_BYTE(conn, conn->will.message.length << 8);
429 PT_MQTT_WRITE_BYTE(conn, conn->will.message.length & 0x00FF);
430 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->will.message.string,
431 conn->will.message.length);
432 DBG(
"MQTT - Setting will topic to '%s' %u bytes and message to '%s' %u bytes\n",
433 conn->will.topic.string,
434 conn->will.topic.length,
435 conn->will.message.string,
436 conn->will.message.length);
438 if(conn->connect_vhdr_flags & MQTT_VHDR_USERNAME_FLAG) {
439 PT_MQTT_WRITE_BYTE(conn, conn->credentials.username.length << 8);
440 PT_MQTT_WRITE_BYTE(conn, conn->credentials.username.length & 0x00FF);
441 PT_MQTT_WRITE_BYTES(conn,
442 (uint8_t *)conn->credentials.username.string,
443 conn->credentials.username.length);
445 if(conn->connect_vhdr_flags & MQTT_VHDR_PASSWORD_FLAG) {
446 PT_MQTT_WRITE_BYTE(conn, conn->credentials.password.length << 8);
447 PT_MQTT_WRITE_BYTE(conn, conn->credentials.password.length & 0x00FF);
448 PT_MQTT_WRITE_BYTES(conn,
449 (uint8_t *)conn->credentials.password.string,
450 conn->credentials.password.length);
454 send_out_buffer(conn);
455 conn->state = MQTT_CONN_STATE_CONNECTING_TO_BROKER;
457 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
460 reset_packet(&conn->in_packet);
461 PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
464 DBG(
"Timeout waiting for CONNACK\n");
468 reset_packet(&conn->in_packet);
470 DBG(
"MQTT - Done sending CONNECT\n");
473 DBG(
"MQTT - CONNECT message sent: \n");
475 for(i = 0; i < (conn->out_buffer_ptr - conn->out_buffer); i++) {
476 DBG(
"%02X ", conn->out_buffer[i]);
485 PT_THREAD(disconnect_pt(
struct pt *pt,
struct mqtt_connection *conn))
489 PT_MQTT_WRITE_BYTE(conn, MQTT_FHDR_MSG_TYPE_DISCONNECT);
490 PT_MQTT_WRITE_BYTE(conn, 0);
492 send_out_buffer(conn);
506 PT_THREAD(subscribe_pt(
struct pt *pt,
struct mqtt_connection *conn))
510 DBG(
"MQTT - Sending subscribe message! topic %s topic_length %i\n",
511 conn->out_packet.topic,
512 conn->out_packet.topic_length);
513 DBG(
"MQTT - Buffer space is %i \n",
514 &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
517 conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_SUBSCRIBE | MQTT_FHDR_QOS_LEVEL_1;
518 conn->out_packet.remaining_length = MQTT_MID_SIZE +
519 MQTT_STRING_LEN_SIZE +
520 conn->out_packet.topic_length +
522 encode_remaining_length(conn->out_packet.remaining_length_enc,
523 &conn->out_packet.remaining_length_enc_bytes,
524 conn->out_packet.remaining_length);
525 if(conn->out_packet.remaining_length_enc_bytes > 4) {
526 call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
527 PRINTF(
"MQTT - Error, remaining length > 4 bytes\n");
532 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
533 PT_MQTT_WRITE_BYTES(conn,
534 conn->out_packet.remaining_length_enc,
535 conn->out_packet.remaining_length_enc_bytes);
537 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid << 8));
538 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
540 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
541 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
542 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
543 conn->out_packet.topic_length);
544 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.qos);
547 send_out_buffer(conn);
548 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
551 reset_packet(&conn->in_packet);
552 PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
556 DBG(
"Timeout waiting for SUBACK\n");
558 reset_packet(&conn->in_packet);
561 conn->out_queue_full = 0;
563 DBG(
"MQTT - Done in send_subscribe!\n");
569 PT_THREAD(unsubscribe_pt(
struct pt *pt,
struct mqtt_connection *conn))
573 DBG(
"MQTT - Sending unsubscribe message on topic %s topic_length %i\n",
574 conn->out_packet.topic,
575 conn->out_packet.topic_length);
576 DBG(
"MQTT - Buffer space is %i \n",
577 &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
580 conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_UNSUBSCRIBE |
581 MQTT_FHDR_QOS_LEVEL_1;
582 conn->out_packet.remaining_length = MQTT_MID_SIZE +
583 MQTT_STRING_LEN_SIZE +
584 conn->out_packet.topic_length;
585 encode_remaining_length(conn->out_packet.remaining_length_enc,
586 &conn->out_packet.remaining_length_enc_bytes,
587 conn->out_packet.remaining_length);
588 if(conn->out_packet.remaining_length_enc_bytes > 4) {
589 call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
590 PRINTF(
"MQTT - Error, remaining length > 4 bytes\n");
595 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
596 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc,
597 conn->out_packet.remaining_length_enc_bytes);
599 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid << 8));
600 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
602 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
603 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
604 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
605 conn->out_packet.topic_length);
608 send_out_buffer(conn);
609 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
612 reset_packet(&conn->in_packet);
613 PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
617 DBG(
"Timeout waiting for UNSUBACK\n");
620 reset_packet(&conn->in_packet);
623 conn->out_queue_full = 0;
625 DBG(
"MQTT - Done writing subscribe message to out buffer!\n");
631 PT_THREAD(publish_pt(
struct pt *pt,
struct mqtt_connection *conn))
635 DBG(
"MQTT - Sending publish message! topic %s topic_length %i\n",
636 conn->out_packet.topic,
637 conn->out_packet.topic_length);
638 DBG(
"MQTT - Buffer space is %i \n",
639 &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
642 conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_PUBLISH |
643 conn->out_packet.qos << 1;
644 if(conn->out_packet.retain == MQTT_RETAIN_ON) {
645 conn->out_packet.fhdr |= MQTT_FHDR_RETAIN_FLAG;
647 conn->out_packet.remaining_length = MQTT_STRING_LEN_SIZE +
648 conn->out_packet.topic_length +
649 conn->out_packet.payload_size;
650 if(conn->out_packet.qos > MQTT_QOS_LEVEL_0) {
651 conn->out_packet.remaining_length += MQTT_MID_SIZE;
653 encode_remaining_length(conn->out_packet.remaining_length_enc,
654 &conn->out_packet.remaining_length_enc_bytes,
655 conn->out_packet.remaining_length);
656 if(conn->out_packet.remaining_length_enc_bytes > 4) {
657 call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
658 PRINTF(
"MQTT - Error, remaining length > 4 bytes\n");
663 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
664 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc,
665 conn->out_packet.remaining_length_enc_bytes);
667 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
668 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
669 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
670 conn->out_packet.topic_length);
671 if(conn->out_packet.qos > MQTT_QOS_LEVEL_0) {
672 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid << 8));
673 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
676 PT_MQTT_WRITE_BYTES(conn,
677 conn->out_packet.payload,
678 conn->out_packet.payload_size);
680 send_out_buffer(conn);
681 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
689 if(conn->out_packet.qos == 0) {
690 process_post(conn->app_process, mqtt_update_event, NULL);
691 }
else if(conn->out_packet.qos == 1) {
693 reset_packet(&conn->in_packet);
694 PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
697 DBG(
"Timeout waiting for PUBACK\n");
699 if(conn->in_packet.mid != conn->out_packet.mid) {
700 DBG(
"MQTT - Warning, got PUBACK with none matching MID. Currently there " 701 "is no support for several concurrent PUBLISH messages.\n");
703 }
else if(conn->out_packet.qos == 2) {
704 DBG(
"MQTT - QoS not implemented yet.\n");
708 reset_packet(&conn->in_packet);
711 conn->out_queue_full = 0;
713 DBG(
"MQTT - Publish Enqueued\n");
719 PT_THREAD(pingreq_pt(
struct pt *pt,
struct mqtt_connection *conn))
723 DBG(
"MQTT - Sending PINGREQ\n");
726 PT_MQTT_WRITE_BYTE(conn, MQTT_FHDR_MSG_TYPE_PINGREQ);
727 PT_MQTT_WRITE_BYTE(conn, 0);
729 send_out_buffer(conn);
732 conn->waiting_for_pingresp = 1;
735 reset_packet(&conn->in_packet);
736 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
740 reset_packet(&conn->in_packet);
742 conn->waiting_for_pingresp = 0;
748 handle_connack(
struct mqtt_connection *conn)
750 DBG(
"MQTT - Got CONNACK\n");
752 if(conn->in_packet.payload[1] != 0) {
753 PRINTF(
"MQTT - Connection refused with Return Code %i\n",
754 conn->in_packet.payload[1]);
756 MQTT_EVENT_CONNECTION_REFUSED_ERROR,
757 &conn->in_packet.payload[1]);
758 abort_connection(conn);
762 conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
765 keep_alive_callback, conn);
768 conn->state = MQTT_CONN_STATE_CONNECTED_TO_BROKER;
769 call_event(conn, MQTT_EVENT_CONNECTED, NULL);
/*
 * Called when a complete PINGRESP packet has been received. Only logs the
 * event.
 */
static void
handle_pingresp(struct mqtt_connection *conn)
{
  DBG("MQTT - Got PINGRESP\n");
}
779 handle_suback(
struct mqtt_connection *conn)
781 struct mqtt_suback_event suback_event;
783 DBG(
"MQTT - Got SUBACK\n");
786 if(conn->in_packet.remaining_length > MQTT_MID_SIZE +
787 MQTT_MAX_TOPICS_PER_SUBSCRIBE * MQTT_QOS_SIZE) {
788 DBG(
"MQTT - Error, SUBACK with > 1 topic, not supported.\n");
791 conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
793 suback_event.mid = (conn->in_packet.payload[0] << 8) |
794 (conn->in_packet.payload[1]);
795 suback_event.qos_level = conn->in_packet.payload[2];
796 conn->in_packet.mid = suback_event.mid;
798 if(conn->in_packet.mid != conn->out_packet.mid) {
799 DBG(
"MQTT - Warning, got SUBACK with none matching MID. Currently there is" 800 "no support for several concurrent SUBSCRIBE messages.\n");
804 call_event(conn, MQTT_EVENT_SUBACK, &suback_event);
808 handle_unsuback(
struct mqtt_connection *conn)
810 DBG(
"MQTT - Got UNSUBACK\n");
812 conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
813 conn->in_packet.mid = (conn->in_packet.payload[0] << 8) |
814 (conn->in_packet.payload[1]);
816 if(conn->in_packet.mid != conn->out_packet.mid) {
817 DBG(
"MQTT - Warning, got UNSUBACK with none matching MID. Currently there is" 818 "no support for several concurrent UNSUBSCRIBE messages.\n");
821 call_event(conn, MQTT_EVENT_UNSUBACK, &conn->in_packet.mid);
825 handle_puback(
struct mqtt_connection *conn)
827 DBG(
"MQTT - Got PUBACK\n");
829 conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
830 conn->in_packet.mid = (conn->in_packet.payload[0] << 8) |
831 (conn->in_packet.payload[1]);
833 call_event(conn, MQTT_EVENT_PUBACK, &conn->in_packet.mid);
837 handle_publish(
struct mqtt_connection *conn)
839 DBG(
"MQTT - Got PUBLISH, called once per manageable chunk of message.\n");
840 DBG(
"MQTT - Handling publish on topic '%s'\n", conn->in_publish_msg.topic);
842 DBG(
"MQTT - This chunk is %i bytes\n", conn->in_packet.payload_pos);
844 if(((conn->in_packet.fhdr & 0x09) >> 1) > 0) {
845 PRINTF(
"MQTT - Error, got incoming PUBLISH with QoS > 0, not supported atm!\n");
848 call_event(conn, MQTT_EVENT_PUBLISH, &conn->in_publish_msg);
850 if(conn->in_publish_msg.first_chunk == 1) {
851 conn->in_publish_msg.first_chunk = 0;
855 if(conn->in_publish_msg.payload_left == 0) {
860 DBG(
"MQTT - (handle_publish) resetting packet.\n");
861 reset_packet(&conn->in_packet);
866 parse_publish_vhdr(
struct mqtt_connection *conn,
868 const uint8_t *input_data_ptr,
874 if(conn->in_packet.topic_len_received == 0) {
875 conn->in_packet.topic_pos = 0;
876 conn->in_packet.topic_len = (input_data_ptr[(*pos)++] << 8);
877 conn->in_packet.byte_counter++;
878 if(*pos >= input_data_len) {
881 conn->in_packet.topic_len |= input_data_ptr[(*pos)++];
882 conn->in_packet.byte_counter++;
883 conn->in_packet.topic_len_received = 1;
885 if(conn->in_packet.topic_len > MQTT_MAX_TOPIC_LENGTH) {
886 DBG(
"MQTT - topic too long %u/%u\n", conn->in_packet.topic_len, MQTT_MAX_TOPIC_LENGTH);
889 DBG(
"MQTT - Read PUBLISH topic len %i\n", conn->in_packet.topic_len);
894 if(conn->in_packet.topic_len_received == 1 &&
895 conn->in_packet.topic_received == 0) {
896 copy_bytes = MIN(conn->in_packet.topic_len - conn->in_packet.topic_pos,
897 input_data_len - *pos);
898 DBG(
"MQTT - topic_pos: %i copy_bytes: %i", conn->in_packet.topic_pos,
900 memcpy(&conn->in_publish_msg.topic[conn->in_packet.topic_pos],
901 &input_data_ptr[*pos],
903 (*pos) += copy_bytes;
904 conn->in_packet.byte_counter += copy_bytes;
905 conn->in_packet.topic_pos += copy_bytes;
907 if(conn->in_packet.topic_len - conn->in_packet.topic_pos == 0) {
908 DBG(
"MQTT - Got topic '%s'", conn->in_publish_msg.topic);
909 conn->in_packet.topic_received = 1;
910 conn->in_publish_msg.topic[conn->in_packet.topic_pos] =
'\0';
911 conn->in_publish_msg.payload_length =
912 conn->in_packet.remaining_length - conn->in_packet.topic_len - 2;
913 conn->in_publish_msg.payload_left = conn->in_publish_msg.payload_length;
917 conn->in_publish_msg.first_chunk = 1;
922 tcp_input(
struct tcp_socket *s,
924 const uint8_t *input_data_ptr,
927 struct mqtt_connection *conn = ptr;
929 uint32_t copy_bytes = 0;
932 if(input_data_len == 0) {
936 if(conn->in_packet.packet_received) {
937 reset_packet(&conn->in_packet);
940 DBG(
"tcp_input with %i bytes of data:\n", input_data_len);
943 if(!conn->in_packet.fhdr) {
944 conn->in_packet.fhdr = input_data_ptr[pos++];
945 conn->in_packet.byte_counter++;
947 DBG(
"MQTT - Read VHDR '%02X'\n", conn->in_packet.fhdr);
949 if(pos >= input_data_len) {
955 if(!conn->in_packet.has_remaining_length) {
957 if(pos >= input_data_len) {
961 byte = input_data_ptr[pos++];
962 conn->in_packet.byte_counter++;
963 conn->in_packet.remaining_length_bytes++;
964 DBG(
"MQTT - Read Remaining Length byte\n");
966 if(conn->in_packet.byte_counter > 5) {
967 call_event(conn, MQTT_EVENT_ERROR, NULL);
968 DBG(
"Received more then 4 byte 'remaining lenght'.");
972 conn->in_packet.remaining_length +=
973 (byte & 127) * conn->in_packet.remaining_multiplier;
974 conn->in_packet.remaining_multiplier *= 128;
975 }
while((byte & 128) != 0);
977 DBG(
"MQTT - Finished reading remaining length byte\n");
978 conn->in_packet.has_remaining_length = 1;
987 if((conn->in_packet.remaining_length > MQTT_INPUT_BUFF_SIZE) &&
988 (conn->in_packet.fhdr & 0xF0) != MQTT_FHDR_MSG_TYPE_PUBLISH) {
990 PRINTF(
"MQTT - Error, unsupported payload size for non-PUBLISH message\n");
992 conn->in_packet.byte_counter += input_data_len;
993 if(conn->in_packet.byte_counter >=
994 (MQTT_FHDR_SIZE + conn->in_packet.remaining_length)) {
995 conn->in_packet.packet_received = 1;
1006 while(conn->in_packet.byte_counter <
1007 (MQTT_FHDR_SIZE + conn->in_packet.remaining_length)) {
1009 if((conn->in_packet.fhdr & 0xF0) == MQTT_FHDR_MSG_TYPE_PUBLISH &&
1010 conn->in_packet.topic_received == 0) {
1011 parse_publish_vhdr(conn, &pos, input_data_ptr, input_data_len);
1015 copy_bytes = MIN(input_data_len - pos,
1016 MQTT_INPUT_BUFF_SIZE - conn->in_packet.payload_pos);
1017 DBG(
"- Copied %lu payload bytes\n", copy_bytes);
1018 memcpy(&conn->in_packet.payload[conn->in_packet.payload_pos],
1019 &input_data_ptr[pos],
1021 conn->in_packet.byte_counter += copy_bytes;
1022 conn->in_packet.payload_pos += copy_bytes;
1026 DBG(
"MQTT - Copied bytes: \n");
1027 for(i = 0; i < copy_bytes; i++) {
1028 DBG(
"%02X ", conn->in_packet.payload[i]);
1033 if(MQTT_INPUT_BUFF_SIZE - conn->in_packet.payload_pos == 0) {
1034 conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1035 conn->in_publish_msg.payload_chunk_length = MQTT_INPUT_BUFF_SIZE;
1036 conn->in_publish_msg.payload_left -= MQTT_INPUT_BUFF_SIZE;
1038 handle_publish(conn);
1040 conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1041 conn->in_packet.payload_pos = 0;
1044 if(pos >= input_data_len &&
1045 (conn->in_packet.byte_counter < (MQTT_FHDR_SIZE + conn->in_packet.remaining_length))) {
1053 DBG(
"MQTT - Finished reading packet!\n");
1055 DBG(
"MQTT - total data was %i bytes of data. \n",
1056 (MQTT_FHDR_SIZE + conn->in_packet.remaining_length));
1059 switch(conn->in_packet.fhdr & 0xF0) {
1060 case MQTT_FHDR_MSG_TYPE_CONNACK:
1061 handle_connack(conn);
1063 case MQTT_FHDR_MSG_TYPE_PUBLISH:
1065 conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1066 conn->in_publish_msg.payload_chunk_length = conn->in_packet.payload_pos;
1067 conn->in_publish_msg.payload_left = 0;
1068 handle_publish(conn);
1070 case MQTT_FHDR_MSG_TYPE_PUBACK:
1071 handle_puback(conn);
1073 case MQTT_FHDR_MSG_TYPE_SUBACK:
1074 handle_suback(conn);
1076 case MQTT_FHDR_MSG_TYPE_UNSUBACK:
1077 handle_unsuback(conn);
1079 case MQTT_FHDR_MSG_TYPE_PINGRESP:
1080 handle_pingresp(conn);
1084 case MQTT_FHDR_MSG_TYPE_PUBREC:
1085 case MQTT_FHDR_MSG_TYPE_PUBREL:
1086 case MQTT_FHDR_MSG_TYPE_PUBCOMP:
1087 call_event(conn, MQTT_EVENT_NOT_IMPLEMENTED_ERROR, NULL);
1088 PRINTF(
"MQTT - Got unhandled MQTT Message Type '%i'",
1089 (conn->in_packet.fhdr & 0xF0));
1094 PRINTF(
"MQTT - Got MQTT Message Type '%i'", (conn->in_packet.fhdr & 0xF0));
1098 conn->in_packet.packet_received = 1;
1107 tcp_event(
struct tcp_socket *s,
void *ptr, tcp_socket_event_t event)
1109 struct mqtt_connection *conn = ptr;
1115 case TCP_SOCKET_CLOSED:
1116 case TCP_SOCKET_TIMEDOUT:
1117 case TCP_SOCKET_ABORTED: {
1119 DBG(
"MQTT - Disconnected by tcp event %d\n", event);
1120 process_post(&mqtt_process, mqtt_abort_now_event, conn);
1121 conn->state = MQTT_CONN_STATE_NOT_CONNECTED;
1123 call_event(conn, MQTT_EVENT_DISCONNECTED, &event);
1124 abort_connection(conn);
1127 if(conn->auto_reconnect == 1) {
1132 case TCP_SOCKET_CONNECTED: {
1133 conn->state = MQTT_CONN_STATE_TCP_CONNECTED;
1134 conn->out_buffer_sent = 1;
1136 process_post(&mqtt_process, mqtt_do_connect_mqtt_event, conn);
1139 case TCP_SOCKET_DATA_SENT: {
1140 DBG(
"MQTT - Got TCP_DATA_SENT\n");
1142 if(conn->socket.output_data_len == 0) {
1143 conn->out_buffer_sent = 1;
1144 conn->out_buffer_ptr = conn->out_buffer;
1152 DBG(
"MQTT - TCP Event %d is currently not managed by the tcp event callback\n",
1160 static struct mqtt_connection *conn;
1167 if(ev == mqtt_abort_now_event) {
1168 DBG(
"MQTT - Abort\n");
1170 conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE;
1172 abort_connection(conn);
1174 if(ev == mqtt_do_connect_tcp_event) {
1176 DBG(
"MQTT - Got mqtt_do_connect_tcp_event!\n");
1179 if(ev == mqtt_do_connect_mqtt_event) {
1181 conn->socket.output_data_max_seg = conn->max_segment_size;
1182 DBG(
"MQTT - Got mqtt_do_connect_mqtt_event!\n");
1184 if(conn->out_buffer_sent == 1) {
1185 PT_INIT(&conn->out_proto_thread);
1186 while(connect_pt(&conn->out_proto_thread, conn) < PT_EXITED &&
1187 conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE) {
1188 PT_MQTT_WAIT_SEND();
1192 if(ev == mqtt_do_disconnect_mqtt_event) {
1194 DBG(
"MQTT - Got mqtt_do_disconnect_mqtt_event!\n");
1197 if(conn->state == MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT) {
1198 if(conn->out_buffer_sent == 1) {
1199 PT_INIT(&conn->out_proto_thread);
1200 while(conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE &&
1201 disconnect_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1202 PT_MQTT_WAIT_SEND();
1204 abort_connection(conn);
1205 call_event(conn, MQTT_EVENT_DISCONNECTED, &ev);
1207 process_post(&mqtt_process, mqtt_do_disconnect_mqtt_event, conn);
1211 if(ev == mqtt_do_pingreq_event) {
1213 DBG(
"MQTT - Got mqtt_do_pingreq_event!\n");
1215 if(conn->out_buffer_sent == 1 &&
1216 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1217 PT_INIT(&conn->out_proto_thread);
1218 while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1219 pingreq_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1220 PT_MQTT_WAIT_SEND();
1224 if(ev == mqtt_do_subscribe_event) {
1226 DBG(
"MQTT - Got mqtt_do_subscribe_mqtt_event!\n");
1228 if(conn->out_buffer_sent == 1 &&
1229 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1230 PT_INIT(&conn->out_proto_thread);
1231 while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1232 subscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1233 PT_MQTT_WAIT_SEND();
1237 if(ev == mqtt_do_unsubscribe_event) {
1239 DBG(
"MQTT - Got mqtt_do_unsubscribe_mqtt_event!\n");
1241 if(conn->out_buffer_sent == 1 &&
1242 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1243 PT_INIT(&conn->out_proto_thread);
1244 while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1245 unsubscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1246 PT_MQTT_WAIT_SEND();
1250 if(ev == mqtt_do_publish_event) {
1252 DBG(
"MQTT - Got mqtt_do_publish_mqtt_event!\n");
1254 if(conn->out_buffer_sent == 1 &&
1255 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1256 PT_INIT(&conn->out_proto_thread);
1257 while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1258 publish_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1259 PT_MQTT_WAIT_SEND();
1270 static uint8_t inited = 0;
1273 mqtt_event_min = mqtt_do_connect_tcp_event;
1283 mqtt_event_max = mqtt_abort_now_event;
/*
 * mqtt_register (only the last parameter line of the signature is shown).
 *
 * Prepares a connection object: rejects an empty client id, clears the
 * structure, stores the caller-supplied settings and applies defaults.
 *
 * Returns MQTT_STATUS_INVALID_ARGS_ERROR for an empty client id, otherwise
 * MQTT_STATUS_OK.
 */
1296 uint16_t max_segment_size)
/* NOTE(review): client_id is handed to strlen() with no NULL check in the
 * visible lines, so a NULL client_id is not covered by this guard. */
1298 if(strlen(client_id) < 1) {
1299 return MQTT_STATUS_INVALID_ARGS_ERROR;
/* Wipe the whole structure first so every field not assigned below is zero. */
1303 memset(conn, 0,
sizeof(
struct mqtt_connection));
1304 string_to_mqtt_string(&conn->client_id, client_id);
1305 conn->event_callback = event_callback;
1306 conn->app_process = app_process;
/* auto_reconnect is switched on unconditionally at registration time. */
1307 conn->auto_reconnect = 1;
1308 conn->max_segment_size = max_segment_size;
/* Called after the explicit assignments above.
 * NOTE(review): which fields reset_defaults() touches is not visible here. */
1309 reset_defaults(conn);
1314 DBG(
"MQTT - Registered successfully\n");
1316 return MQTT_STATUS_OK;
/*
 * mqtt_connect (only the last parameter line of the signature is shown).
 *
 * Records the broker host, port and keep-alive value in the connection and
 * posts mqtt_do_connect_tcp_event to mqtt_process; the connection attempt
 * itself is handled by that process, not by this call.
 *
 * Returns MQTT_STATUS_OK when the request was posted or the connection is
 * already past the NOT_CONNECTED state, and MQTT_STATUS_ERROR when
 * uiplib_ip6addrconv() returns 0 for host.
 */
1326 uint16_t keep_alive)
1328 uip_ip6addr_t ip6addr;
/* Any state beyond NOT_CONNECTED is left untouched and reported as success. */
1333 if(conn->state > MQTT_CONN_STATE_NOT_CONNECTED) {
1334 return MQTT_STATUS_OK;
/* Only the host pointer is stored here, not a copy of the string.
 * NOTE(review): the caller presumably has to keep the string alive while the
 * connection is in use - confirm. */
1337 conn->server_host = host;
1338 conn->keep_alive = keep_alive;
1339 conn->server_port = port;
/* Rewind the output pointer to the start of the output buffer. */
1340 conn->out_buffer_ptr = conn->out_buffer;
1341 conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
/* The clean-session flag is always requested in the CONNECT flags. */
1342 conn->connect_vhdr_flags |= MQTT_VHDR_CLEAN_SESSION_FLAG;
/* The fields above have already been modified when this check fails, so an
 * MQTT_STATUS_ERROR return still leaves the new settings in conn. */
1345 if(uiplib_ip6addrconv(host, &ip6addr) == 0) {
1346 return MQTT_STATUS_ERROR;
/* NOTE(review): the return value of process_post() is not checked, so
 * MQTT_STATUS_OK is returned even if the event could not be queued. */
1356 process_post(&mqtt_process, mqtt_do_connect_tcp_event, conn);
1358 return MQTT_STATUS_OK;
/*
 * mqtt_disconnect (signature and braces are not shown in this listing).
 *
 * Moves a connected client into the SENDING_MQTT_DISCONNECT state and posts
 * mqtt_do_disconnect_mqtt_event to mqtt_process.
 */
/* NOTE(review): the body of this guard is missing from the listing;
 * presumably an early return when the client is not connected - confirm. */
1364 if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1368 conn->state = MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT;
1370 process_post(&mqtt_process, mqtt_do_disconnect_mqtt_event, conn);
/*
 * mqtt_subscribe (only the last parameter line of the signature is shown).
 *
 * Fills the connection's single outgoing packet slot with a subscribe request
 * for topic at qos_level and posts mqtt_do_subscribe_event to mqtt_process.
 *
 * Returns MQTT_STATUS_NOT_CONNECTED_ERROR unless the connection is in the
 * CONNECTED_TO_BROKER state, MQTT_STATUS_OUT_QUEUE_FULL while a previous
 * request still occupies the slot, otherwise MQTT_STATUS_OK.
 *
 * NOTE(review): the declared signature has a uint16_t *mid parameter, but no
 * visible line writes through it - confirm whether the caller is meant to
 * receive the message id.
 */
1375 mqtt_qos_level_t qos_level)
1377 if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1378 return MQTT_STATUS_NOT_CONNECTED_ERROR;
1381 DBG(
"MQTT - Call to mqtt_subscribe...\n");
/* out_queue_full is a busy flag for the single outgoing packet slot. */
1384 if(conn->out_queue_full) {
1385 DBG(
"MQTT - Not accepted!\n");
1386 return MQTT_STATUS_OUT_QUEUE_FULL;
1388 conn->out_queue_full = 1;
1389 DBG(
"MQTT - Accepted!\n");
/* INCREMENT_MID adds 2 to conn->mid_counter; the updated counter value becomes
 * this packet's message id. */
1391 conn->out_packet.mid = INCREMENT_MID(conn);
/* Only the topic pointer is stored, and strlen() is applied without a NULL
 * check in the visible lines. */
1392 conn->out_packet.topic = topic;
1393 conn->out_packet.topic_length = strlen(topic);
1394 conn->out_packet.qos = qos_level;
1395 conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
/* NOTE(review): the process_post() result is ignored; the slot stays marked
 * full even if the event could not be queued. */
1397 process_post(&mqtt_process, mqtt_do_subscribe_event, conn);
1398 return MQTT_STATUS_OK;
/*
 * mqtt_unsubscribe (signature and braces are not shown in this listing).
 *
 * Fills the connection's single outgoing packet slot with an unsubscribe
 * request for topic and posts mqtt_do_unsubscribe_event to mqtt_process.
 *
 * Returns MQTT_STATUS_NOT_CONNECTED_ERROR unless the connection is in the
 * CONNECTED_TO_BROKER state, MQTT_STATUS_OUT_QUEUE_FULL while a previous
 * request still occupies the slot, otherwise MQTT_STATUS_OK.
 *
 * NOTE(review): the declared signature has a uint16_t *mid parameter, but no
 * visible line writes through it - confirm.
 */
1404 if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1405 return MQTT_STATUS_NOT_CONNECTED_ERROR;
1408 DBG(
"MQTT - Call to mqtt_unsubscribe...\n");
/* out_queue_full is a busy flag for the single outgoing packet slot. */
1410 if(conn->out_queue_full) {
1411 DBG(
"MQTT - Not accepted!\n");
1412 return MQTT_STATUS_OUT_QUEUE_FULL;
1414 conn->out_queue_full = 1;
1415 DBG(
"MQTT - Accepted!\n");
/* INCREMENT_MID adds 2 to conn->mid_counter; the updated counter value becomes
 * this packet's message id. */
1417 conn->out_packet.mid = INCREMENT_MID(conn);
/* Only the topic pointer is stored, and strlen() is applied without a NULL
 * check in the visible lines. */
1418 conn->out_packet.topic = topic;
1419 conn->out_packet.topic_length = strlen(topic);
/* Unlike the subscribe path, out_packet.qos is not assigned here. */
1420 conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1422 process_post(&mqtt_process, mqtt_do_unsubscribe_event, conn);
1423 return MQTT_STATUS_OK;
/*
 * mqtt_publish (only the last two parameter lines of the signature are shown).
 *
 * Fills the connection's single outgoing packet slot with a publish request
 * (topic, payload, QoS level, retain setting) and posts mqtt_do_publish_event
 * to mqtt_process.
 *
 * Returns MQTT_STATUS_NOT_CONNECTED_ERROR unless the connection is in the
 * CONNECTED_TO_BROKER state, MQTT_STATUS_OUT_QUEUE_FULL while a previous
 * request still occupies the slot, otherwise MQTT_STATUS_OK.
 *
 * NOTE(review): the declared signature has a uint16_t *mid parameter, but no
 * visible line writes through it - confirm.
 */
1428 uint8_t *payload, uint32_t payload_size,
1429 mqtt_qos_level_t qos_level, mqtt_retain_t retain)
1431 if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1432 return MQTT_STATUS_NOT_CONNECTED_ERROR;
1435 DBG(
"MQTT - Call to mqtt_publish...\n");
/* out_queue_full is a busy flag for the single outgoing packet slot. */
1438 if(conn->out_queue_full) {
1439 DBG(
"MQTT - Not accepted!\n");
1440 return MQTT_STATUS_OUT_QUEUE_FULL;
1442 conn->out_queue_full = 1;
1443 DBG(
"MQTT - Accepted!\n");
/* INCREMENT_MID adds 2 to conn->mid_counter; the updated counter value becomes
 * this packet's message id. */
1445 conn->out_packet.mid = INCREMENT_MID(conn);
1446 conn->out_packet.retain = retain;
/* Topic and payload are stored as pointers only; nothing is copied here.
 * NOTE(review): the caller presumably must keep both buffers valid until the
 * packet has been sent - confirm against the send path. */
1447 conn->out_packet.topic = topic;
1448 conn->out_packet.topic_length = strlen(topic);
1449 conn->out_packet.payload = payload;
1450 conn->out_packet.payload_size = payload_size;
1451 conn->out_packet.qos = qos_level;
1452 conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1454 process_post(&mqtt_process, mqtt_do_publish_event, conn);
1455 return MQTT_STATUS_OK;
/*
 * mqtt_set_username_password (signature and braces are not shown).
 *
 * Stores the credentials in the connection and keeps the user-name and
 * password bits of connect_vhdr_flags in step with whether each argument
 * is NULL.
 */
/* Both strings are converted before the NULL checks below.
 * NOTE(review): this relies on string_to_mqtt_string() accepting NULL;
 * its body is not visible here - confirm. */
1463 string_to_mqtt_string(&conn->credentials.username, username);
1464 string_to_mqtt_string(&conn->credentials.password, password);
/* NOTE(review): the listing omits the line between the set and clear
 * statements of each pair (presumably "} else {"), so the clear statements
 * would run only when the argument is NULL. */
1467 if(username != NULL) {
1468 conn->connect_vhdr_flags |= MQTT_VHDR_USERNAME_FLAG;
1470 conn->connect_vhdr_flags &= ~MQTT_VHDR_USERNAME_FLAG;
1472 if(password != NULL) {
1473 conn->connect_vhdr_flags |= MQTT_VHDR_PASSWORD_FLAG;
1475 conn->connect_vhdr_flags &= ~MQTT_VHDR_PASSWORD_FLAG;
/*
 * mqtt_set_last_will (only the last parameter line of the signature is shown).
 *
 * Stores the will topic, will message and requested QoS in the connection and
 * turns on the will and will-retain bits of the CONNECT flags.
 */
1481 mqtt_qos_level_t qos)
1484 string_to_mqtt_string(&conn->will.topic, topic);
1485 string_to_mqtt_string(&conn->will.message, message);
/* NOTE(review): qos is only recorded in will.qos. No visible line ORs a
 * MQTT_VHDR_WILL_QOS_LEVEL_* value into connect_vhdr_flags, so the requested
 * level may not reach the CONNECT flags - confirm against the connect path. */
1488 conn->will.qos = qos;
/* The retain bit is always set together with the will bit.
 * NOTE(review): lines are missing just before this statement, so it may be
 * conditional in the full source. */
1491 conn->connect_vhdr_flags |= MQTT_VHDR_WILL_FLAG |
1492 MQTT_VHDR_WILL_RETAIN_FLAG;
mqtt_status_t mqtt_subscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic, mqtt_qos_level_t qos_level)
Subscribes to an MQTT topic.
static uip_ipaddr_t ipaddr
Pointer to prefix information option in uip_buf.
mqtt_event_t
MQTT engine events.
void timer_set(struct timer *t, clock_time_t interval)
Set a timer.
#define PROCESS(name, strname)
Declare a process.
Protothreads implementation.
void ctimer_stop(struct ctimer *c)
Stop a pending callback timer.
#define PROCESS_WAIT_EVENT()
Wait for an event to be posted to the process.
mqtt_status_t mqtt_connect(struct mqtt_connection *conn, char *host, uint16_t port, uint16_t keep_alive)
Connects to an MQTT broker.
#define PROCESS_BEGIN()
Define the beginning of a process.
mqtt_status_t mqtt_publish(struct mqtt_connection *conn, uint16_t *mid, char *topic, uint8_t *payload, uint32_t payload_size, mqtt_qos_level_t qos_level, mqtt_retain_t retain)
Publishes to an MQTT topic.
#define PROCESS_END()
Define the end of a process.
#define PT_BEGIN(pt)
Declare the start of a protothread inside the C function implementing the protothread.
#define PT_WAIT_UNTIL(pt, condition)
Block and wait until condition is true.
void(* mqtt_event_callback_t)(struct mqtt_connection *m, mqtt_event_t event, void *data)
MQTT event callback function.
Header file for IPv6-related data structures.
#define PT_INIT(pt)
Initialize a protothread.
#define CLOCK_SECOND
A second, measured in system clock time.
Header file for the callback timer
#define PT_END(pt)
Declare the end of a protothread.
mqtt_status_t mqtt_unsubscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic)
Unsubscribes from an MQTT topic.
Linked list manipulation routines.
void mqtt_set_username_password(struct mqtt_connection *conn, char *username, char *password)
Set the user name and password for an MQTT client.
Header file for the Contiki MQTT engine.
void ctimer_set(struct ctimer *c, clock_time_t t, void(*f)(void *), void *ptr)
Set a callback timer.
int timer_expired(struct timer *t)
Check if a timer has expired.
#define uip_ipaddr_copy(dest, src)
Copy an IP address from one place to another.
#define PT_EXIT(pt)
Exit the protothread.
void ctimer_restart(struct ctimer *c)
Restart a callback timer from the current point in time.
#define PT_THREAD(name_args)
Declaration of a protothread.
process_event_t process_alloc_event(void)
Allocate a global event number.
void list_add(list_t list, void *item)
Add an item at the end of a list.
void list_init(list_t list)
Initialize a list.
Header file for the uIP TCP/IP stack.
#define LIST(name)
Declare a linked list.
mqtt_status_t mqtt_register(struct mqtt_connection *conn, struct process *app_process, char *client_id, mqtt_event_callback_t event_callback, uint16_t max_segment_size)
Initializes the MQTT engine.
int process_post(struct process *p, process_event_t ev, process_data_t data)
Post an asynchronous event.
void mqtt_disconnect(struct mqtt_connection *conn)
Disconnects from an MQTT broker.
Default definitions of C compiler quirk work-arounds.
PROCESS_THREAD(cc2538_rf_process, ev, data)
Implementation of the cc2538 RF driver process.
void mqtt_set_last_will(struct mqtt_connection *conn, char *topic, char *message, mqtt_qos_level_t qos)
Set the last will topic and message for an MQTT client.
Header file for the LED HAL.
void process_start(struct process *p, process_data_t data)
Start a process.