RPM Package Manager, CVS Repository http://rpm5.org/cvs/ ____________________________________________________________________________
Server: rpm5.org Name: Jeff Johnson Root: /v/rpm/cvs Email: j...@rpm5.org Module: rpm Date: 11-Jul-2016 22:26:53 Branch: rpm-5_4 Handle: 2016071120265300 Modified files: (Branch: rpm-5_4) rpm CHANGES configure.ac rpm/rpmio rpmmqtt.c rpmmqtt.h tmqtt.c Log: - mqtt: add a WITH_MOSQUITTO implementation. Summary: Revision Changes Path 1.3501.2.516+1 -0 rpm/CHANGES 2.472.2.153 +20 -2 rpm/configure.ac 1.1.2.20 +1790 -347 rpm/rpmio/rpmmqtt.c 1.1.2.18 +134 -22 rpm/rpmio/rpmmqtt.h 1.1.2.13 +5 -6 rpm/rpmio/tmqtt.c ____________________________________________________________________________ patch -p0 <<'@@ .' Index: rpm/CHANGES ============================================================================ $ cvs diff -u -r1.3501.2.515 -r1.3501.2.516 CHANGES --- rpm/CHANGES 7 Jul 2016 14:34:56 -0000 1.3501.2.515 +++ rpm/CHANGES 11 Jul 2016 20:26:53 -0000 1.3501.2.516 @@ -1,4 +1,5 @@ 5.4.17 -> 5.4.18: + - jbj: mqtt: add a WITH_MOSQUITTO implementation. - jbj: blake2: upgrade to 20160619 release. - jbj: macros: stub-in rpmmc/rpmme pools for MacroContext/MacroEntry. - jbj: mqtt: prepare for MacroContext sub-classing. @@ . patch -p0 <<'@@ .' Index: rpm/configure.ac ============================================================================ $ cvs diff -u -r2.472.2.152 -r2.472.2.153 configure.ac --- rpm/configure.ac 29 Jun 2016 12:17:57 -0000 2.472.2.152 +++ rpm/configure.ac 11 Jul 2016 20:26:53 -0000 2.472.2.153 @@ -2246,10 +2246,28 @@ # MQTT RPM_CHECK_LIB( - [MQTT], [mqtt], + [Paho], [paho], [paho-mqtt3as], [MQTTAsync_create], [MQTTAsync.h], [no,external:none], [], - [ AC_DEFINE(WITH_MQTT, 1, [Define if building with MQTT]) + [ AC_DEFINE(WITH_PAHO, 1, [Define if building with MQTT+PAHO]) + ], []) +RPM_CHECK_LIB( + [Mosquitto], [mosquitto], + [mosquitto], [mosquitto_lib_init], [mosquitto.h], + [no,external:none], [], + [ AC_DEFINE(WITH_MOSQUITTO, 1, [Define if building with MQTT+MOSQUITTO]) + ], []) +RPM_CHECK_LIB( + [RabbitMQ], [rabbitmq], + [rabbitmq], [amqp_new_connection], [amqp.h], + [no,external:none], [], + [ AC_DEFINE(WITH_RABBITMQ, 1, [Define if building with MQTT+RABBITMQ]) + ], []) +RPM_CHECK_LIB( + [ZeroMQ], [zeromq], + [zmq], [zmq_ctx_new], [zmq.h], + [no,external:none], [], + [ AC_DEFINE(WITH_ZEROMQ, 1, [Define if building with MQTT+ZEROMQ]) ], []) # Libgit2 @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmqtt.c ============================================================================ $ cvs diff -u -r1.1.2.19 -r1.1.2.20 rpmmqtt.c --- rpm/rpmio/rpmmqtt.c 10 Jul 2016 16:16:04 -0000 1.1.2.19 +++ rpm/rpmio/rpmmqtt.c 11 Jul 2016 20:26:53 -0000 1.1.2.20 @@ -3,6 +3,15 @@ */ #include "system.h" + +#undef WITH_PAHO +#undef WITH_RABBITMQ +#undef WITH_ZEROMQ + +#if defined(WITH_PAHO) || defined(WITH_MOSQUITTO) || defined(WITH_RABBITMQ) || defined(WITH_ZEROMQ) +#define WITH_MQTT +#endif + #include <stdio.h> #include <stdlib.h> #include <string.h> @@ -19,17 +28,25 @@ #include <poptIO.h> #include <argv.h> -#ifdef WITH_MQTT +#ifdef WITH_PAHO #include <MQTTAsync.h> #endif +#ifdef WITH_MOSQUITTO +#include <mosquitto.h> +#endif +#ifdef WITH_RABBITMQ +#include <amqp.h> +#endif +#ifdef WITH_ZEROMQ +#include <zmq.h> +#endif #define _RPMMQTT_INTERNAL #include <rpmmqtt.h> #include "debug.h" -int _rpmmqtt_debug = 1; -#define SPEW(_list) if (mqtt->debug || _rpmmqtt_debug < 0) fprintf _list +int _rpmmqtt_debug; #define F_ISSET(_f, _FLAG) (((_f) & ((MQTT_FLAGS_##_FLAG) & ~0x40000000)) != MQTT_FLAGS_NONE) #define MF_ISSET(_FLAG) F_ISSET(mqtt->flags, _FLAG) @@ -121,8 +138,8 @@ return t; } -#define _ENTRY(_v) { MQTT_FLAGS_##_v, #_v, } static KEY MqttFlags[] = { +#define _ENTRY(_v) { MQTT_FLAGS_##_v, #_v, } _ENTRY(CLEAN), _ENTRY(EOL), _ENTRY(NOSTALE), @@ -133,8 +150,8 @@ _ENTRY(RETAIN), _ENTRY(WILL_RETAIN), _ENTRY(BUFFER), -}; #undef _ENTRY +}; static size_t nMqttFlags = sizeof(MqttFlags) / sizeof(MqttFlags[0]); static const char * fmtMqttFlags(uint32_t flags) @@ -146,33 +163,10 @@ } #define _MQTTFLAGS(_flags) fmtMqttFlags(_flags) -#define _ENTRY(_v) { MQTTASYNC_##_v, #_v, } -static KEY rpmmqtt_errs[] = { -#ifdef WITH_MQTT - _ENTRY(SUCCESS), - _ENTRY(FAILURE), - _ENTRY(PERSISTENCE_ERROR), - _ENTRY(DISCONNECTED), - _ENTRY(MAX_MESSAGES_INFLIGHT), - _ENTRY(BAD_UTF8_STRING), - _ENTRY(NULL_PARAMETER), - _ENTRY(TOPICNAME_TRUNCATED), - _ENTRY(BAD_STRUCTURE), - _ENTRY(BAD_QOS), - _ENTRY(NO_MORE_MSGIDS), - _ENTRY(OPERATION_INCOMPLETE), - _ENTRY(MAX_BUFFERED_MESSAGES), -#else - { 0, NULL }, -#endif -}; -#undef _ENTRY -static size_t rpmmqtt_nerrs = sizeof(rpmmqtt_errs) / sizeof(rpmmqtt_errs[0]); - -static const char * rpmmqttStrerror(int v) +static const char * rpmmqttStrerror(rpmmqtt mqtt, int v) { - KEY * tbl = rpmmqtt_errs; - size_t ntbl = rpmmqtt_nerrs; + KEY * tbl = (KEY *) (mqtt && mqtt->vec ? mqtt->vec->errs : NULL); + size_t ntbl = (mqtt && mqtt->vec && mqtt->vec->errs ? mqtt->vec->nerrs : 0); const char * n = NULL; static char buf[64]; @@ -192,11 +186,14 @@ static rpmRC Xcheck(rpmmqtt mqtt, const char * msg, int rc, int printit, const char * func, const char * fn, unsigned ln) { + int all = 0; - if (rc != 0) { /* MQTTCLIENT_SUCCESS */ + if (all || rc != 0) { int _lvl = RPMLOG_WARNING; - rpmlog(_lvl, "%s:%s:%u: MQTTAsync_%s: %s(%d)\n", - func, fn, ln, msg, rpmmqttStrerror(rc), rc); + const char * api = (mqtt && mqtt->vec && mqtt->vec->prefix + ? mqtt->vec->prefix : ""); + rpmlog(_lvl, "%s:%s:%u: %s%s: %s(%d)\n", + func, fn, ln, api, msg, rpmmqttStrerror(mqtt, rc), rc); } return (rpmRC) rc; } @@ -390,8 +387,8 @@ } /*==============================================================*/ -#ifdef WITH_MQTT -static int onMessageArrived(void * _mqtt, char * topic, int topicLen, +#ifdef WITH_PAHO +static int pahoOnMessageArrived(void * _mqtt, char * topic, int topicLen, MQTTAsync_message * message) { rpmmqtt mqtt = (rpmmqtt) _mqtt; @@ -401,8 +398,9 @@ const char * s = (const char *) message->payload; size_t ns = message->payloadlen; - if (_rpmmqtt_debug < 0) - rpmlog(RPMLOG_DEBUG, "MQTT rcvd topic(%s) \"%.*s\"\n", topic, (int)ns, s); + + rpmlog(RPMLOG_DEBUG, "%s rcvd topic(%s) \"%.*s\"\n", mqtt->vec->name, + topic, (int)ns, s); if (mqtt->iob) { mqtt->iob = rpmiobAppend(mqtt->iob, topic, 0); @@ -442,73 +440,67 @@ return rc; } -static void onDeliveryComplete(void * _mqtt, int token) +static void pahoOnDeliveryComplete(void * _mqtt, int token) { rpmmqtt mqtt = (rpmmqtt) _mqtt; - if (_rpmmqtt_debug < 0) - rpmlog(RPMLOG_DEBUG, "--- MQTT done(%d)\n", token); mqtt->token = token; } -static void onConnectionLost(void * _mqtt, char *cause) +static void pahoOnConnectionLost(void * _mqtt, char *cause) { rpmmqtt mqtt = (rpmmqtt) _mqtt; rpmlog(RPMLOG_DEBUG, - "--- MQTT disconnect(%s) version(%d) present(%d)\n", + "%s disconnect(%s) version(%d) present(%d)\n", mqtt->vec->name, mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent); if (cause) rpmlog(RPMLOG_DEBUG, "\tcause: %s\n", cause); - rpmlog(RPMLOG_WARNING, "MQTT reconnecting(%s) ...\n", mqtt->serverURI); + rpmlog(RPMLOG_WARNING, "%s reconnecting(%s) ...\n", mqtt->vec->name, + mqtt->serverURI); (void) rpmmqttConnect(mqtt); } -static void onFailure(void * _mqtt, MQTTAsync_failureData * response) +static void pahoOnFailure(void * _mqtt, MQTTAsync_failureData * response) { rpmmqtt mqtt = (rpmmqtt) _mqtt; - rpmlog(RPMLOG_WARNING, "MQTT failed\n"); -SPEW((stderr, "<-- %s(%p,%p)\n", __FUNCTION__, _mqtt, response)); + rpmlog(RPMLOG_WARNING, "%s cmd failed.\n", mqtt->vec->name); mqtt->finished = 1; } -static void onSuccess(void * _mqtt, MQTTAsync_successData * response) +static void pahoOnSuccess(void * _mqtt, MQTTAsync_successData * response) { rpmmqtt mqtt = (rpmmqtt) _mqtt; -SPEW((stderr, "<-- %s(%p,%p)\n", __FUNCTION__, _mqtt, response)); mqtt->finished = 1; } -static void onDisconnectFailure(void * _mqtt, MQTTAsync_failureData * response) +static void pahoOnDisconnectFailure(void * _mqtt, MQTTAsync_failureData * response) { rpmmqtt mqtt = (rpmmqtt) _mqtt; -fprintf(stderr, "<-- %s(%p,%p) MQTT disconnect failed\n", __FUNCTION__, _mqtt, response); - rpmlog(RPMLOG_WARNING, "MQTT disconnect failed\n"); + rpmlog(RPMLOG_WARNING, "%s disconnect failed\n", mqtt->vec->name); mqtt->finished = 1; } -static void onDisconnect(void * _mqtt, MQTTAsync_successData * response) +static void pahoOnDisconnect(void * _mqtt, MQTTAsync_successData * response) { rpmmqtt mqtt = (rpmmqtt) _mqtt; - if (mqtt->debug || _rpmmqtt_debug) - rpmlog(RPMLOG_DEBUG, - "MQTT disconnect(%s) version(%d) present(%d)\n", + rpmlog(RPMLOG_DEBUG, + "%s disconnect(%s) version(%d) present(%d)\n", mqtt->vec->name, mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent); mqtt->serverURI = _free(mqtt->serverURI); mqtt->finished = 1; } -static void onConnectFailure(void * _mqtt, MQTTAsync_failureData * response) +static void pahoOnConnectFailure(void * _mqtt, MQTTAsync_failureData * response) { rpmmqtt mqtt = (rpmmqtt) _mqtt; -fprintf(stderr, "<-- %s(%p,%p) MQTT connect failed\n", __FUNCTION__, _mqtt, response); - rpmlog(RPMLOG_WARNING, "MQTT connect failed\n"); + rpmlog(RPMLOG_WARNING, "%s connect failed\n", mqtt->vec->name); mqtt->finished = 1; } -static void onConnect(void * _mqtt, MQTTAsync_successData * response) +static void pahoOnConnect(void * _mqtt, MQTTAsync_successData * response) { rpmmqtt mqtt = (rpmmqtt) _mqtt; @@ -519,77 +511,73 @@ mqtt->sessionPresent = response->alt.connect.sessionPresent; } - if (mqtt->debug || _rpmmqtt_debug) - rpmlog(RPMLOG_DEBUG, - "MQTT connect(%s) version(%d) present(%d)\n", + rpmlog(RPMLOG_DEBUG, + "%s connect(%s) version(%d) present(%d)\n", mqtt->vec->name, mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent); mqtt->finished = 1; } -static void onSubscribeFailure(void * _mqtt, MQTTAsync_failureData * response) +static void pahoOnSubscribeFailure(void * _mqtt, MQTTAsync_failureData * response) { rpmmqtt mqtt = (rpmmqtt) _mqtt; -fprintf(stderr, "<-- %s(%p,%p) MQTT connect failed\n", __FUNCTION__, _mqtt, response); if (response) { const char *s = response->message; int token = response->token; int code = response->code; - rpmlog(RPMLOG_WARNING, "MQTT subscribe(%d) failed: code(%d) msg %s\n", - token, code, s); + rpmlog(RPMLOG_WARNING, "%s subscribe(%d) failed: code(%d) msg %s\n", + mqtt->vec->name, token, code, s); } else - rpmlog(RPMLOG_WARNING, "MQTT subscribe failed\n"); + rpmlog(RPMLOG_WARNING, "%s subscribe failed\n", mqtt->vec->name); mqtt->finished = 1; } -static void onSubscribe(void * _mqtt, MQTTAsync_successData * response) +static void pahoOnSubscribe(void * _mqtt, MQTTAsync_successData * response) { rpmmqtt mqtt = (rpmmqtt) _mqtt; int qos = response->alt.qos; - if (mqtt->debug || _rpmmqtt_debug) - rpmlog(RPMLOG_DEBUG, "MQTT subscribe qos(%d)\n", qos); + rpmlog(RPMLOG_DEBUG, "%s subscribe qos(%d)\n", mqtt->vec->name, qos); mqtt->finished = 1; } -static void onSubscribeManyFailure(void * _mqtt, MQTTAsync_failureData * response) +static void pahoOnSubscribeManyFailure(void * _mqtt, MQTTAsync_failureData * response) { rpmmqtt mqtt = (rpmmqtt) _mqtt; - rpmlog(RPMLOG_WARNING, "MQTT subscribeMany failed\n"); + + rpmlog(RPMLOG_WARNING, "%s subscribeMany failed\n", mqtt->vec->name); mqtt->finished = 1; } -static void onSubscribeMany(void * _mqtt, MQTTAsync_successData * response) +static void pahoOnSubscribeMany(void * _mqtt, MQTTAsync_successData * response) { rpmmqtt mqtt = (rpmmqtt) _mqtt; + #ifdef DYING int *subqos = response->alt.qosList; - -SPEW((stderr, "--> %s(%p,%p) subqos %p[%u]\n", __FUNCTION__, _mqtt, response, subqos, mqtt->ac)); for (int i = 0; i < mqtt->ac; i++) { - if (mqtt->debug || _rpmmqtt_debug) - rpmlog(RPMLOG_DEBUG, "MQTT subscribe qos(%d)\n", subqos[i]); + rpmlog(RPMLOG_DEBUG, "%s subscribe qos(%d)\n", mqtt->vec->name, subqos[i]); } #endif + mqtt->finished = 1; } -static void onUnsubscribeManyFailure(void * _mqtt, MQTTAsync_failureData * response) +static void pahoOnUnsubscribeManyFailure(void * _mqtt, MQTTAsync_failureData * response) { rpmmqtt mqtt = (rpmmqtt) _mqtt; - rpmlog(RPMLOG_WARNING, "MQTT unsubscribeMany failed\n"); + + rpmlog(RPMLOG_WARNING, "%s unsubscribeMany failed\n", mqtt->vec->name); mqtt->finished = 1; } -static void onUnsubscribeMany(void * _mqtt, MQTTAsync_successData * response) +static void pahoOnUnsubscribeMany(void * _mqtt, MQTTAsync_successData * response) { rpmmqtt mqtt = (rpmmqtt) _mqtt; - rpmlog(RPMLOG_DEBUG, "MQTT unsubscribeMany\n"); -SPEW((stderr, "<-- %s(%p,%p)\n", __FUNCTION__, _mqtt, response)); mqtt->finished = 1; } -static void onSendFailure(void * _mqtt, MQTTAsync_failureData * response) +static void pahoOnSendFailure(void * _mqtt, MQTTAsync_failureData * response) { rpmmqtt mqtt = (rpmmqtt) _mqtt; @@ -597,22 +585,22 @@ const char *s = response->message; int token = response->token; int code = response->code; - rpmlog(RPMLOG_WARNING, "MQTT send(%d) failed: code(%d) msg %s\n", - token, code, s); + rpmlog(RPMLOG_WARNING, "%s send(%d) failed: code(%d) msg %s\n", + mqtt->vec->name, token, code, s); } mqtt->finished = 1; } -static void onSend(void * _mqtt, MQTTAsync_successData * response) +static void pahoOnSend(void * _mqtt, MQTTAsync_successData * response) { rpmmqtt mqtt = (rpmmqtt) _mqtt; - if (mqtt->debug || _rpmmqtt_debug) { + { const char * s = (const char *) response->alt.pub.message.payload; size_t ns = response->alt.pub.message.payloadlen; int token = response->token; - rpmlog(RPMLOG_DEBUG, "MQTT sent(%d) topic(%s) \"%.*s\"\n", - token, mqtt->topic, (int)ns, s); + rpmlog(RPMLOG_DEBUG, "%s sent(%d) topic(%s) \"%.*s\"\n", + mqtt->vec->name, token, mqtt->topic, (int)ns, s); } mqtt->finished = 1; } @@ -621,11 +609,11 @@ { rpmlog(RPMLOG_DEBUG, "%s\n", message); } -#endif /* WITH_MQTT */ +#endif /* WITH_PAHO */ /*==============================================================*/ -#ifdef WITH_MQTT -static int rpmmqttOpen(void **_mqttp, const char *clientID, const char *serverURI, +#ifdef WITH_PAHO +static int pahoOpen(void **_mqttp, const char *clientID, const char *serverURI, void *_mqtt) { rpmmqtt mqtt = (rpmmqtt) _mqtt; @@ -649,12 +637,12 @@ rc = 0; exit: -SPEW((stderr, "<-- %s(%p,\"%s\",\"%s\",%p) rc %d dn %s\n", __FUNCTION__, _mqttp, clientID, serverURI, _mqtt, rc, mqtt->cachedn)); +SPEW((stderr, "<-- %s(%p,\"%s\",\"%s\",%p) rc %d dn %s\n", __FUNCTION__, _mqttp, clientID, serverURI, _mqtt, rc, mqtt->cachedn)); return rc; } -static int rpmmqttClose(void *_mqtt) +static int pahoClose(void *_mqtt) { rpmmqtt mqtt = (rpmmqtt) _mqtt; int rc = MQTTCLIENT_PERSISTENCE_ERROR; @@ -671,11 +659,11 @@ rc = 0; exit: -SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, _mqtt, rc)); +SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, _mqtt, rc)); return rc; } -static int rpmmqttPut(void *_mqtt, char *key, +static int pahoPut(void *_mqtt, char *key, int bufcount, char *buffers[], int buflens[]) { rpmmqtt mqtt = (rpmmqtt) _mqtt; @@ -697,10 +685,31 @@ for (int i = 0; i < bufcount; i++) { const char * s = buffers[i]; int ns = buflens[i]; + int j; + for (j = 0; j < ns; j++) { + if (xisprint(s[j])) + continue; + break; + } + if (i > 0 && (j == ns || (j == ns-1 && s[j] == '\0'))) { SPEW((stderr, "%5d\t%p[%d]\t\"%.*s\"\n", i, s, ns, ns, s)); + } else { + if (ns <= 8) { + unsigned long val = 0; + j = ns; + while (--j >= 0) { + val <<= 8; + val |= s[j]; + } +SPEW((stderr, "%5d\t%p[%d]\t0x%lx\n", i, s, ns, val)); + } else { +SPEW((stderr, "%5d\t%p[%d]\n", i, s, ns)); + } + } nb += buflens[i]; nw += Fwrite(buffers[i], sizeof(*buffers[i]), buflens[i], fd); } + (void) fdatasync(Fileno(fd)); (void) Fclose(fd); fd = NULL; @@ -714,13 +723,12 @@ exit: if (fd) Fclose(fd); -SPEW((stderr, "<-- %s(%p,\"%s\",%d,%p,%p) rc %d fn %s\n", __FUNCTION__, _mqtt, key, bufcount, buffers, buflens, rc, fn)); +SPEW((stderr, "<-- %s(%p,\"%s\",%d,%p,%p) rc %d fn %s\n", __FUNCTION__, _mqtt, key, bufcount, buffers, buflens, rc, fn)); fn = _free(fn); return rc; } -static int rpmmqttGet(void *_mqtt, char *key, - char *buffer[], int *buflen) +static int pahoGet(void *_mqtt, char *key, char *buffer[], int *buflen) { rpmmqtt mqtt = (rpmmqtt) _mqtt; char *fn = NULL; @@ -759,14 +767,14 @@ exit: if (fd) Fclose(fd); -SPEW((stderr, "<-- %s(%p,\"%s\",%p,%p) rc %d fn %s\n", __FUNCTION__, _mqtt, key, buffer, buflen, rc, fn)); +SPEW((stderr, "<-- %s(%p,\"%s\",%p,%p) rc %d fn %s\n", __FUNCTION__, _mqtt, key, buffer, buflen, rc, fn)); fn = _free(fn); *buffer = b; *buflen = nb; return rc; } -static int rpmmqttRemove(void *_mqtt, char *key) +static int pahoRemove(void *_mqtt, char *key) { rpmmqtt mqtt = (rpmmqtt) _mqtt; char *fn = NULL; @@ -783,12 +791,12 @@ rc = 0; exit: -SPEW((stderr, "<-- %s(%p,\"%s\") rc %d fn %s\n", __FUNCTION__, _mqtt, key, rc, fn)); +SPEW((stderr, "<-- %s(%p,\"%s\") rc %d fn %s\n", __FUNCTION__, _mqtt, key, rc, fn)); fn = _free(fn); return rc; } -static int rpmmqttKeys(void *_mqtt, char ***keys, int *nkeys) +static int pahoKeys(void *_mqtt, char ***keys, int *nkeys) { rpmmqtt mqtt = (rpmmqtt) _mqtt; ARGV_t av = NULL; @@ -820,13 +828,13 @@ exit: if (dir) (void) Closedir(dir); -SPEW((stderr, "<-- %s(%p,%p,%p) rc %d keys %p[%u]\n", __FUNCTION__, _mqtt, keys, nkeys, rc, av, ac)); +SPEW((stderr, "<-- %s(%p,%p,%p) rc %d keys %p[%u]\n", __FUNCTION__, _mqtt, keys, nkeys, rc, av, ac)); *keys = (char **) av; *nkeys = ac; return rc; } -static int rpmmqttClear(void *_mqtt) +static int pahoClear(void *_mqtt) { rpmmqtt mqtt = (rpmmqtt) _mqtt; DIR * dir = NULL; @@ -857,11 +865,11 @@ exit: if (dir) (void) Closedir(dir); -SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, _mqtt, rc)); +SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, _mqtt, rc)); return rc; } -static int rpmmqttContainsKey(void *_mqtt, char *key) +static int pahoContainsKey(void *_mqtt, char *key) { rpmmqtt mqtt = (rpmmqtt) _mqtt; DIR * dir = NULL; @@ -892,28 +900,28 @@ exit: if (dir) (void) Closedir(dir); -SPEW((stderr, "<-- %s(%p,\"%s\") rc %d\n", __FUNCTION__, _mqtt, key, rc)); +SPEW((stderr, "<-- %s(%p,\"%s\") rc %d\n", __FUNCTION__, _mqtt, key, rc)); return rc; } -static MQTTClient_persistence _rpmmqtt_persistence = { +static MQTTClient_persistence _mqtt_persistence = { NULL, - rpmmqttOpen, - rpmmqttClose, - rpmmqttPut, - rpmmqttGet, - rpmmqttRemove, - rpmmqttKeys, - rpmmqttClear, - rpmmqttContainsKey, + pahoOpen, + pahoClose, + pahoPut, + pahoGet, + pahoRemove, + pahoKeys, + pahoClear, + pahoContainsKey, }; -#endif /* WITH_MQTT */ +#endif /* WITH_PAHO */ /*==============================================================*/ -#ifdef WITH_MQTT -struct mqttState_s { +#ifdef WITH_PAHO +struct pahoState_s { MQTTAsync_connectOptions C; MQTTAsync_willOptions W; MQTTAsync_SSLOptions S; @@ -923,7 +931,7 @@ MQTTAsync_message M; }; -static struct mqttState_s mqttStateInitial = { +static struct pahoState_s pahoStateInitial = { .C = MQTTAsync_connectOptions_initializer, .W = MQTTAsync_willOptions_initializer, .S = MQTTAsync_SSLOptions_initializer, @@ -932,14 +940,13 @@ .O = MQTTAsync_createOptions_initializer, .M = MQTTAsync_message_initializer, }; -#endif /* WITH_MQTT */ -static void * AOBJ(rpmmqtt mqtt, char otype) +static +void * AOBJ(rpmmqtt mqtt, char otype) { void * ptr = NULL; -#ifdef WITH_MQTT - mqttState p = NULL; - mqttState q = &mqttStateInitial; + pahoState_t p = NULL; + pahoState_t q = &pahoStateInitial; MQTTAsync_connectOptions *C; MQTTAsync_disconnectOptions *D; MQTTAsync_message *M; @@ -950,8 +957,8 @@ urlinfo u; if (mqtt->state == NULL) - mqtt->state = (mqttState) xcalloc(1, sizeof(*p)); - p = (mqttState) mqtt->state; + mqtt->state = (pahoState_t) xcalloc(1, sizeof(*p)); + p = (pahoState_t) mqtt->state; switch (otype) { case 'C': @@ -965,7 +972,7 @@ W = (MQTTAsync_willOptions *) AOBJ(mqtt, 'W'); /* XXX LWT */ C->will = (W && W->topicName ? W : NULL); - C->username = (u && u->user ? u->user : mqtt->user);; + C->username = (u && u->user ? u->user : mqtt->user); C->password = (u && u->password ? u->password : mqtt->password); C->connectTimeout = 30; /* XXX secs. configure?*/ @@ -974,8 +981,8 @@ S = (MQTTAsync_SSLOptions *) AOBJ(mqtt, 'S'); /* XXX SSL */ C->ssl = (S && S->keyStore) ? S : NULL; /* XXX */ - C->onSuccess = onConnect; - C->onFailure = onConnectFailure; + C->onSuccess = pahoOnConnect; + C->onFailure = pahoOnConnectFailure; C->context = mqtt; C->serverURIcount = 0; @@ -999,8 +1006,8 @@ D = &p->D; memcpy(D, &q->D, sizeof(*D)); D->timeout = mqtt->timeout; - D->onSuccess = onDisconnect; - D->onFailure = onDisconnectFailure; + D->onSuccess = pahoOnDisconnect; + D->onFailure = pahoOnDisconnectFailure; D->context = mqtt; ptr = (void *) D; break; @@ -1054,55 +1061,170 @@ assert(0); break; } -#endif /* WITH_MQTT */ return ptr; } -rpmRC rpmmqttConnect(rpmmqtt mqtt) +static +rpmRC pahoDestroy(rpmmqtt mqtt) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + rc = check(mqtt, "destroy", + (MQTTAsync_destroy(&mqtt->I), 0)); + mqtt->state = _free(mqtt->state); + mqtt->I = NULL; + return rc; +} + +static +rpmRC pahoCreate(rpmmqtt mqtt) { + static int oneshot; + int _lvl = RPMLOG_DEBUG; rpmRC rc = RPMRC_FAIL; /* assume failure */ + int xx; + +#ifdef DYING +mqtt->trace = 4; /* XXX */ +#endif + if (mqtt->trace && rpmIsDebug()) { + xx = check(mqtt, "setTraceCallback", + (MQTTAsync_setTraceCallback(onTrace), 0)); + xx = check(mqtt, "setTraceLevel", + (MQTTAsync_setTraceLevel((enum MQTTASYNC_TRACE_LEVELS)mqtt->trace), 0)); + } + + rpmlog(_lvl, "==================== %s\n", mqtt->vec->name); + + if (!oneshot) { + if (mqtt->trace == 0) { + MQTTAsync_nameValue *NV = MQTTAsync_getVersionInfo(); + while (NV->name) { + rpmlog(_lvl, "%19s: %s\n", NV->name, NV->value); + NV++; + } + } + oneshot++; + } + + /* XXX improve integration */ +static const char _mqtt_persist[] = + "%{?_mqtt_persist}%{!?_mqtt_persist:2}"; + mqtt->persist_type = (rpmmqttExpandNumeric(mqtt, _mqtt_persist) % 3); +static const char _mqtt_cachedir[] = + "%{?_mqtt_cachedir}%{!?_mqtt_cachedir:/var/cache/mqtt}"; +char *persist_path = rpmGetPath(_mqtt_cachedir, NULL); + + rpmlog(_lvl, "%19s: %s\n", "clientid", mqtt->clientid); + rpmlog(_lvl, "%19s: %s qos(%d) timeout(%u msecs)\n", + "topic", mqtt->topic, mqtt->qos, mqtt->timeout); + rpmlog(_lvl, "%19s: type(%u) %s\n", + "persist", mqtt->persist_type, + (mqtt->persist_type ? persist_path : "")); + rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags)); + if (mqtt->trace) + rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace); + +mqtt->persist_path = _free(mqtt->persist_path); +mqtt->persist_ctx = _free(mqtt->persist_ctx); + switch (mqtt->persist_type) { + default: + case MQTTCLIENT_PERSISTENCE_NONE: + mqtt->persist_ctx = NULL; + break; + case MQTTCLIENT_PERSISTENCE_DEFAULT: + { + mqtt->persist_path = xstrdup(persist_path); + /* XXX rpmmqttFini double free */ + mqtt->persist_ctx = (void *)xstrdup(mqtt->persist_path); + } break; + case MQTTCLIENT_PERSISTENCE_USER: + { + mqtt->persist_path = xstrdup(persist_path); + MQTTClient_persistence * ctx = + (MQTTClient_persistence *) xmalloc(sizeof(*ctx)); + *ctx = _mqtt_persistence; /* structure assignment */ + ctx->context = mqtt; + mqtt->persist_ctx = ctx; + } break; + } +persist_path = _free(persist_path); + +mqtt->u = NULL; +dumpMQTT(__FUNCTION__, mqtt); + + if (mqtt->I == NULL) { + xx = check(mqtt, "createWithOptions", + MQTTAsync_createWithOptions(&mqtt->I, + mqtt->uri, mqtt->clientid, + mqtt->persist_type, mqtt->persist_ctx, + (MQTTAsync_createOptions *)AOBJ(mqtt, 'O'))); + + xx = check(mqtt, "setCallbacks", + MQTTAsync_setCallbacks(mqtt->I, mqtt, + pahoOnConnectionLost, + pahoOnMessageArrived, + pahoOnDeliveryComplete)); + } + + rc = RPMRC_OK; -SPEW((stderr, "--> %s(%p)\n", __FUNCTION__, mqtt)); -#ifdef WITH_MQTT - if (!MQTTAsync_isConnected(mqtt->I)) { + return rc; +} + +static +rpmRC pahoDisconnect(rpmmqtt mqtt) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + if (rpmmqttIsConnected(mqtt) == RPMRC_OK) { mqtt->finished = 0; - rc = check(mqtt, "connect", - MQTTAsync_connect(mqtt->I, - (const MQTTAsync_connectOptions*)AOBJ(mqtt, 'C'))); + rc = check(mqtt, "disconnect", + MQTTAsync_disconnect(mqtt->I, + (const MQTTAsync_disconnectOptions*)AOBJ(mqtt, 'D'))); while (!mqtt->finished) - usleep(1000); + usleep(100); if (rc) goto exit; } rc = RPMRC_OK; + exit: -#endif /* WITH_MQTT */ -SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, mqtt, rc)); return rc; } -rpmRC rpmmqttDisconnect(rpmmqtt mqtt) +static +rpmRC pahoConnect(rpmmqtt mqtt) { rpmRC rc = RPMRC_FAIL; /* assume failure */ -#ifdef WITH_MQTT - if (MQTTAsync_isConnected(mqtt->I)) { + + if (rpmmqttIsConnected(mqtt) == RPMRC_NOTFOUND) { mqtt->finished = 0; - rc = check(mqtt, "disconnect", - MQTTAsync_disconnect(mqtt->I, - (const MQTTAsync_disconnectOptions*)AOBJ(mqtt, 'D'))); + rc = check(mqtt, "connect", + MQTTAsync_connect(mqtt->I, + (const MQTTAsync_connectOptions*)AOBJ(mqtt, 'C'))); while (!mqtt->finished) - usleep(100); + usleep(1000); if (rc) goto exit; } rc = RPMRC_OK; + exit: -#endif -SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, mqtt, rc)); return rc; } -rpmRC rpmmqttSendMessage(rpmmqtt mqtt, const char * topic, +static +rpmRC pahoIsConnected(rpmmqtt mqtt) +{ + rpmRC rc = RPMRC_NOTFOUND; + + if (check(mqtt, "isconnected", + MQTTAsync_isConnected(mqtt->I))) + rc = RPMRC_OK; + return rc; +} + +static +rpmRC pahoSendMessage(rpmmqtt mqtt, const char * topic, const char * s, size_t ns) { rpmRC rc = RPMRC_FAIL; /* assume failure */ @@ -1114,8 +1236,7 @@ if (ns == 0) ns = strlen(s); -#ifdef WITH_MQTT - if (!rpmmqttConnect(mqtt)) { + if (rpmmqttIsConnected(mqtt) == RPMRC_OK) { MQTTAsync_message *M = (MQTTAsync_message *) AOBJ(mqtt, 'M'); M->payloadlen = ns; @@ -1123,8 +1244,8 @@ MQTTAsync_responseOptions *R = (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R'); - R->onSuccess = onSend; - R->onFailure = onSendFailure; + R->onSuccess = pahoOnSend; + R->onFailure = pahoOnSendFailure; mqtt->finished = 0; rc = check(mqtt, "sendMessage", @@ -1135,21 +1256,18 @@ goto exit; rc = RPMRC_OK; } -exit: -#endif /* WITH_MQTT */ -SPEW((stderr, "<-- %s(%p,%p[%u]) rc %d\n", __FUNCTION__, mqtt, s, (unsigned)ns, rc)); +exit: return rc; } -rpmRC rpmmqttSubscribeMany(rpmmqtt mqtt, int ac, char ** av) +static +rpmRC pahoSubscribeMany(rpmmqtt mqtt, int ac, char ** av) { rpmRC rc = RPMRC_FAIL; /* assume failure */ -SPEW((stderr, "--> %s(%p,%p[%d])\n", __FUNCTION__, mqtt, av, ac)); if (ac <= 0) goto exit; -#ifdef WITH_MQTT if (!rpmmqttConnect(mqtt)) { int _lvl = RPMLOG_DEBUG; int *subqos = (int *) xcalloc(ac, sizeof(*subqos)); @@ -1171,8 +1289,8 @@ MQTTAsync_responseOptions *R = (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R'); - R->onSuccess = onSubscribeMany; - R->onFailure = onSubscribeManyFailure; + R->onSuccess = pahoOnSubscribeMany; + R->onFailure = pahoOnSubscribeManyFailure; mqtt->finished = 0; rc = check(mqtt, "subscribeMany", @@ -1185,19 +1303,16 @@ goto exit; rc = RPMRC_OK; } -#endif /* WITH_MQTT */ exit: -SPEW((stderr, "<-- %s(%p,%p[%d]) rc %d\n", __FUNCTION__, mqtt, av, ac, rc)); return rc; } -rpmRC rpmmqttSubscribe(rpmmqtt mqtt, const char * topic, int qos) +static +rpmRC pahoSubscribe(rpmmqtt mqtt, const char * topic, int qos) { rpmRC rc = RPMRC_FAIL; /* assume failure */ -SPEW((stderr, "--> %s(%p,%p,%d)\n", __FUNCTION__, mqtt, topic, qos)); -#ifdef WITH_MQTT if (!rpmmqttConnect(mqtt)) { int _lvl = RPMLOG_DEBUG; @@ -1205,8 +1320,8 @@ MQTTAsync_responseOptions *R = (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R'); - R->onSuccess = onSubscribe; - R->onFailure = onSubscribeFailure; + R->onSuccess = pahoOnSubscribe; + R->onFailure = pahoOnSubscribeFailure; mqtt->finished = 0; rc = check(mqtt, "subscribe", @@ -1217,19 +1332,16 @@ goto exit; rc = RPMRC_OK; } -#endif /* WITH_MQTT */ exit: -SPEW((stderr, "<-- %s(%p,%p[%d]) rc %d\n", __FUNCTION__, mqtt, topic, qos, rc)); return rc; } -rpmRC rpmmqttUnsubscribe(rpmmqtt mqtt, const char * topic) +static +rpmRC pahoUnsubscribe(rpmmqtt mqtt, const char * topic) { rpmRC rc = RPMRC_FAIL; /* assume failure */ -SPEW((stderr, "--> %s(%p,\"%s\")\n", __FUNCTION__, mqtt, topic)); -#ifdef WITH_MQTT if (!rpmmqttConnect(mqtt)) { int _lvl = RPMLOG_DEBUG; @@ -1237,8 +1349,8 @@ MQTTAsync_responseOptions *R = (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R'); - R->onSuccess = onUnsubscribeMany; - R->onFailure = onUnsubscribeManyFailure; + R->onSuccess = pahoOnUnsubscribeMany; + R->onFailure = pahoOnUnsubscribeManyFailure; mqtt->finished = 0; rc = check(mqtt, "unsubscribe", @@ -1249,21 +1361,18 @@ goto exit; rc = RPMRC_OK; } -#endif /* WITH_MQTT */ exit: -SPEW((stderr, "<-- %s(%p,\"%s\") rc %d\n", __FUNCTION__, mqtt, topic, rc)); return rc; } -rpmRC rpmmqttUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av) +static +rpmRC pahoUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av) { rpmRC rc = RPMRC_FAIL; /* assume failure */ -SPEW((stderr, "--> %s(%p,%p[%d])\n", __FUNCTION__, mqtt, av, ac)); if (ac <= 0) goto exit; -#ifdef WITH_MQTT if (!rpmmqttConnect(mqtt)) { int _lvl = RPMLOG_DEBUG; for (int i = 0; i < ac; i++) { @@ -1273,8 +1382,8 @@ MQTTAsync_responseOptions *R = (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R'); - R->onSuccess = onUnsubscribeMany; - R->onFailure = onUnsubscribeManyFailure; + R->onSuccess = pahoOnUnsubscribeMany; + R->onFailure = pahoOnUnsubscribeManyFailure; mqtt->finished = 0; rc = check(mqtt, "unsubscribeMany", @@ -1285,67 +1394,1492 @@ goto exit; rc = RPMRC_OK; } -#endif /* WITH_MQTT */ exit: -SPEW((stderr, "<-- %s(%p,%p[%d]) rc %d\n", __FUNCTION__, mqtt, av, ac, rc)); return rc; } +static KEY pahoErrs[] = { +#define _ENTRY(_v) { MQTTASYNC_##_v, #_v, } + _ENTRY(SUCCESS), + _ENTRY(FAILURE), + _ENTRY(PERSISTENCE_ERROR), + _ENTRY(DISCONNECTED), + _ENTRY(MAX_MESSAGES_INFLIGHT), + _ENTRY(BAD_UTF8_STRING), + _ENTRY(NULL_PARAMETER), + _ENTRY(TOPICNAME_TRUNCATED), + _ENTRY(BAD_STRUCTURE), + _ENTRY(BAD_QOS), + _ENTRY(NO_MORE_MSGIDS), + _ENTRY(OPERATION_INCOMPLETE), + _ENTRY(MAX_BUFFERED_MESSAGES), +#undef _ENTRY + { 0, NULL }, +}; + +static +struct mqttVec_s pahoVec = { + .name = "paho", + .port = 1883, + .sport = 8883, + .prefix = "MQTTAsync_", + .errs = pahoErrs, + .nerrs = sizeof(pahoErrs) / sizeof(pahoErrs[0]), + .destroy = pahoDestroy, + .create = pahoCreate, + .disconnect = pahoDisconnect, + .connect = pahoConnect, + .isconnected = pahoIsConnected, + .unsubscribe = pahoUnsubscribe, + .subscribe = pahoSubscribe, + .unsubscribeMany = pahoUnsubscribeMany, + .subscribeMany = pahoSubscribeMany, + .sendMessage = pahoSendMessage, +}; +#endif /* WITH_PAHO */ + /*==============================================================*/ -rpmRC rpmmqttPub(rpmmqtt mqtt, const char * topic, const char *s, size_t ns) +#ifdef WITH_MOSQUITTO +static int mosqGetPassword(char *b, int size, int rwflag, void * _mqtt) { - rpmRC rc = RPMRC_FAIL; /* assume failure */ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + (void)mqtt; + int nb = 0; + /* XXX returns password in b[nb] */ + return nb; +} - if (topic == NULL) - topic = mqtt->topic; - if (s == NULL) - s = ""; - if (ns == 0) - ns = strlen(s); +static void mosqOnConnect(struct mosquitto * I, void * _mqtt, int rc) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + rpmlog(RPMLOG_DEBUG, "%s: connected rc(%d)\n", mqtt->vec->name, rc); + mqtt->finished = 1; +} -#ifdef WITH_MQTT - if (rpmmqttConnect(mqtt)) - goto exit; - { static char _mqtt_prefix[] = "%{?_mqtt_prefix}"; - /* XXX extra space: prepend in *sendMessage or *send */ - char * t = rpmmqttExpand(mqtt, NULL, - _mqtt_prefix, " ", s, NULL); - size_t nt = strlen(t); +static void mosqOnDisconnect(struct mosquitto * I, void * _mqtt, int rc) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + rpmlog(RPMLOG_DEBUG, "%s: disconnected rc(%d)\n", mqtt->vec->name, rc); +mqtt->connected = 0; + mqtt->finished = 1; +} - rc = rpmmqttSendMessage(mqtt, topic, t, nt); - t = _free(t); - if (rc) - goto exit; - rc = RPMRC_OK; +static void mosqOnPublish(struct mosquitto * I, void * _mqtt, int mid) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + rpmlog(RPMLOG_DEBUG, "%s: sent(%d)\n", mqtt->vec->name, mid); + mqtt->token = mid; + mqtt->finished = 1; +} + +static void mosqOnMessage(struct mosquitto * I, void * _mqtt, + const struct mosquitto_message *msg) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + + mqtt->msg_count++; + mqtt->token = msg->mid; + + const char * s = (const char *) msg->payload; + size_t ns = msg->payloadlen; + + rpmlog(RPMLOG_DEBUG, "%s: rcvd topic(%s) qos(%d) retain(%d) \"%.*s\"\n", + mqtt->vec->name, msg->topic, msg->qos, (msg->retain ? 1 : 0), + (int)ns, s); + + if (mqtt->iob) { + mqtt->iob = rpmiobAppend(mqtt->iob, msg->topic, 0); + mqtt->iob = rpmiobAppend(mqtt->iob, ":\t", 0); + { char * t = (char *) memcpy(alloca(ns+1), s, ns); + t[ns] = '\0'; + mqtt->iob = rpmiobAppend(mqtt->iob, t, 1); + } + } + switch (mqtt->msg_output) { + case MQTT_OUTPUT_UNKNOWN: + case MQTT_OUTPUT_CALLBACK: + default: + break; + case MQTT_OUTPUT_STDOUT: + case MQTT_OUTPUT_FILE: + if (mqtt->ofd) { + size_t nw; + nw = Fwrite(msg->topic, sizeof(*msg->topic), strlen(msg->topic), + mqtt->ofd); + nw = Fwrite(":\t", 1, sizeof(":\t")-1, mqtt->ofd); + nw = Fwrite(s, sizeof(*s), ns, mqtt->ofd); + nw = Fwrite("\n", 1, sizeof("\n")-1, mqtt->ofd); + (void)nw; + } + break; } +#ifdef NEW + message->qos + message->retained + message->dup + message->msgid +#endif -exit: -#endif /* WITH_MQTT */ + mqtt->finished = 1; +} -SPEW((stderr, "<-- %s(%p,%p[%u]) rc %d\n", __FUNCTION__, mqtt, s, (unsigned)ns, (int)rc)); - return rc; +static void mosqOnSubscribe(struct mosquitto * I, void * _mqtt, int mid, + int qos_counted, const int *granted_qos) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + rpmlog(RPMLOG_DEBUG, "%s: subscribed(%d) qos %p[%d]\n", + mqtt->vec->name, mid, granted_qos, qos_counted); + mqtt->token = mid; + mqtt->finished = 1; } -rpmRC rpmmqttSub(rpmmqtt mqtt, const char *s, size_t ns) +static void mosqOnUnsubscribe(struct mosquitto * I, void * _mqtt, int mid) { - rpmRC rc = RPMRC_FAIL; /* assume failure */ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + rpmlog(RPMLOG_DEBUG, "%s: unsubscribed(%d)\n", mqtt->vec->name, mid); + mqtt->token = mid; + mqtt->finished = 1; +} - if (ns == 0) ns = strlen(s); +static void mosqOnLog(struct mosquitto * I, void * _mqtt, + int logtype, const char *str) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + const char * subsys = ""; + int lvl = RPMLOG_DEBUG; -#ifdef WITH_MQTT - if (rpmmqttConnect(mqtt)) - goto exit; + if (logtype & MOSQ_LOG_INFO) + lvl = RPMLOG_INFO; + if (logtype & MOSQ_LOG_NOTICE) + lvl = RPMLOG_NOTICE; + if (logtype & MOSQ_LOG_WARNING) + lvl = RPMLOG_WARNING; + if (logtype & MOSQ_LOG_ERR) + lvl = RPMLOG_ERR; + if (logtype & MOSQ_LOG_DEBUG) + lvl = RPMLOG_DEBUG; + if (logtype & MOSQ_LOG_SUBSCRIBE) + subsys = "subscribe: "; + if (logtype & MOSQ_LOG_SUBSCRIBE) + subsys = "subscribe: "; + if (logtype & MOSQ_LOG_UNSUBSCRIBE) + subsys = "unsubscribe: "; + if (logtype & MOSQ_LOG_WEBSOCKETS) + subsys = "websockets: "; - { char * subtopic = rpmmqttExpand(mqtt, NULL, - (s ? s : mqtt->topic), NULL); - unsigned subqos = mqtt->qos; - char *t, *te; - int _lvl = RPMLOG_DEBUG; + rpmlog(RPMLOG_DEBUG, "%s: %s%s\n", mqtt->vec->name, subsys, str); +} - if ((t = strchr(subtopic, '?')) != NULL) { - ARGV_t qav = NULL; - int qac; +static +rpmRC mosqDestroy(rpmmqtt mqtt) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + int xx; +#ifdef NOTYET + mqtt->state = _free(mqtt->state); +#endif + if (mqtt->I) { + + xx = check(mqtt, "loop_stop", + mosquitto_loop_stop(mqtt->I, true)); + + xx = check(mqtt, "destroy", + (mosquitto_destroy(mqtt->I), 0)); + } + mqtt->I = NULL; + mosquitto_lib_cleanup(); + return rc; +} + +static +rpmRC mosqCreate(rpmmqtt mqtt) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + static int oneshot; + int _lvl = RPMLOG_DEBUG; + int xx; + +#ifdef DYING +mqtt->trace = 4; /* XXX */ +#endif + + rpmlog(_lvl, "==================== %s\n", mqtt->vec->name); + + if (!oneshot) { + int major = 0; + int minor = 0; + int revision = 0; + int version = mosquitto_lib_version(&major, &minor, &revision); + rpmlog(_lvl, "%19s: %d.%d.%d (%d)\n", "version", + major, minor, revision, version); + oneshot++; + } + + rpmlog(_lvl, "%19s: %s\n", "clientid", mqtt->clientid); + rpmlog(_lvl, "%19s: %s qos(%d) timeout(%u msecs)\n", + "topic", mqtt->topic, mqtt->qos, mqtt->timeout); + rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags)); + if (mqtt->trace) + rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace); + +mqtt->u = NULL; +dumpMQTT(__FUNCTION__, mqtt); + + xx = check(mqtt, "lib_init", + mosquitto_lib_init()); + + xx = check(mqtt, "new", + (mqtt->I = mosquitto_new(mqtt->clientid, + (MF_ISSET(CLEAN) ? true : false), + mqtt)) == NULL); + + mosquitto_connect_callback_set(mqtt->I, mosqOnConnect); + mosquitto_disconnect_callback_set(mqtt->I, mosqOnDisconnect); + mosquitto_publish_callback_set(mqtt->I, mosqOnPublish); + mosquitto_message_callback_set(mqtt->I, mosqOnMessage); + mosquitto_subscribe_callback_set(mqtt->I, mosqOnSubscribe); + mosquitto_unsubscribe_callback_set(mqtt->I, mosqOnUnsubscribe); + +#ifdef NOTYET + if (mqtt->trace && rpmIsDebug()) +#else + if (mqtt->trace || mqtt->debug) +#endif + mosquitto_log_callback_set(mqtt->I, mosqOnLog); + +/* + * Example 1: + * delay=2, delay_max=10, exponential_backoff=False + * Delays would be: 2, 4, 6, 8, 10, 10, ... + * + * Example 2: + * delay=3, delay_max=30, exponential_backoff=True + * Delays would be: 3, 6, 12, 24, 30, 30, ... + */ + xx = check(mqtt, "reconnect_delay_set", + mosquitto_reconnect_delay_set(mqtt->I, 2, 10, false)); + + xx = check(mqtt, "user_data_set", + (mosquitto_user_data_set(mqtt->I, mqtt), 0)); + + xx = check(mqtt, "loop_start", + mosquitto_loop_start(mqtt->I)); + + rc = RPMRC_OK; + + return rc; +} + +static +rpmRC mosqDisconnect(rpmmqtt mqtt) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + int xx; + + if (rpmmqttIsConnected(mqtt) == RPMRC_OK) { + + xx = check(mqtt, "loop", + mosquitto_loop(mqtt->I, 20000, 1)); /* XXX */ + + mqtt->finished = 0; + rc = check(mqtt, "disconnect", + mosquitto_disconnect(mqtt->I)); + + xx = check(mqtt, "loop", + mosquitto_loop(mqtt->I, -1, 1)); + +mqtt->connected = 0; + if (rc) + goto exit; + } + rc = RPMRC_OK; + +exit: + return rc; +} + +static +rpmRC mosqConnect(rpmmqtt mqtt) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + int xx; + + if (rpmmqttIsConnected(mqtt) == RPMRC_NOTFOUND) { + urlinfo u = mqtt->u; + + if (mqtt->will_topic) { + const char *payload = + (mqtt->will_message ? mqtt->will_message : ""); + int payloadlen = strlen(payload); + xx = check(mqtt, "will_set", + mosquitto_will_set(mqtt->I, + (mqtt->will_topic ? mqtt->will_topic : mqtt->topic), + payloadlen, payload, + mqtt->will_qos, /* XXX mqtt->qos? */ + (MF_ISSET(WILL_RETAIN) ? true : false))); + } else + xx = check(mqtt, "will_clear", + mosquitto_will_clear(mqtt->I)); + + xx = check(mqtt, "username_pw_set", + mosquitto_username_pw_set(mqtt->I, + (u && u->user ? u->user : mqtt->user), + (u && u->password ? u->password : mqtt->password))); + + if (mqtt->cafile || mqtt->_capath) { + xx = check(mqtt, "tls_set", + mosquitto_tls_set(mqtt->I, + mqtt->cafile, mqtt->_capath, + mqtt->cert, mqtt->privkey, mosqGetPassword)); + } + if (MF_ISSET(INSECURE)) { + xx = check(mqtt, "tls_insecure_set", + mosquitto_tls_insecure_set(mqtt->I, true)); + } + if (mqtt->_psk_key) { + xx = check(mqtt, "tls_psk_set", + mosquitto_tls_psk_set(mqtt->I, + mqtt->_psk_key, mqtt->_psk_identity, NULL)); + } + if (mqtt->ciphers && mqtt->_tls_version) { + static int _cert_reqs = 1; /* XXX 0/1: SSL_VERIFY_{NONE,PEER} */ + char * _tls_version = rpmExpand("tlsv", mqtt->_tls_version, NULL); + xx = check(mqtt, "tls_opts_set", + mosquitto_tls_opts_set(mqtt->I, + _cert_reqs, _tls_version, mqtt->ciphers)); + _tls_version = _free(_tls_version); + } + + xx = check(mqtt, "max_inflight_messages_set", + mosquitto_max_inflight_messages_set(mqtt->I, + (mqtt->max_inflight ? mqtt->max_inflight : 20))); + + if (mqtt->_proxy) { + urlinfo u; + int ut = (urltype) urlSplit(mqtt->_proxy, &u); + (void) ut; + xx = check(mqtt, "socks5_set", + mosquitto_socks5_set(mqtt->I, + u->host, + u->port, + u->user, + u->password)); + } + + { int MQTTVersion = MQTT_PROTOCOL_V31; + if (!strcmp(mqtt->protocol_version, "31")) + MQTTVersion = MQTT_PROTOCOL_V31; + if (!strcmp(mqtt->protocol_version, "311")) + MQTTVersion = MQTT_PROTOCOL_V311; + xx = check(mqtt, "opts_set", + mosquitto_opts_set(mqtt->I, + MOSQ_OPT_PROTOCOL_VERSION, &MQTTVersion)); + } + +#ifndef NOTYET + mqtt->finished = 0; + /* XXX mosquitto_connect_srv(..., mqtt->_address? */ + if (mqtt->_address) { + rc = check(mqtt, "connect_bind_async", + mosquitto_connect_bind_async(mqtt->I, + (u && u->host ? u->host : mqtt->host), + (u && u->host ? u->port : mqtt->port), + mqtt->keepalive, mqtt->_address)); + } else + rc = check(mqtt, "connect_async", + mosquitto_connect_async(mqtt->I, + (u && u->host ? u->host : mqtt->host), + (u && u->host ? u->port : mqtt->port), + mqtt->keepalive)); + +#else + if (mqtt->_address) { + if (MF_ISSET(DNSSRV)) { + rc = check(mqtt, "connect_srv", + mosquitto_connect_srv(mqtt->I, + (u && u->host ? u->host : mqtt->host), + mqtt->keepalive, mqtt->_address)); + } else { + rc = check(mqtt, "connect_bind", + mosquitto_connect_bind(mqtt->I, + (u && u->host ? u->host : mqtt->host), + (u && u->host ? u->port : mqtt->port), + mqtt->keepalive, mqtt->_address)); + } + } else + rc = check(mqtt, "connect", + mosquitto_connect(mqtt->I, + (u && u->host ? u->host : mqtt->host), + (u && u->host ? u->port : mqtt->port), + mqtt->keepalive)); +#endif + + xx = check(mqtt, "loop", + mosquitto_loop(mqtt->I, -1, 1)); + + if (rc) + goto exit; +mqtt->connected = 1; + } + rc = RPMRC_OK; + +exit: + return rc; +} + +static +rpmRC mosqIsConnected(rpmmqtt mqtt) +{ + rpmRC rc = (mqtt->connected ? RPMRC_OK : RPMRC_NOTFOUND); + return rc; +} + +static +rpmRC mosqSendMessage(rpmmqtt mqtt, const char * topic, + const char * s, size_t ns) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + int xx; + + if (topic == NULL) + topic = mqtt->topic; + if (s == NULL) + s = ""; + if (ns == 0) + ns = strlen(s); + + if (!rpmmqttConnect(mqtt)) { + + mqtt->finished = 0; + rc = check(mqtt, "publish", + mosquitto_publish(mqtt->I, + (int *)&mqtt->token, + (topic ? topic : mqtt->topic), + ns, s, + mqtt->qos, + (MF_ISSET(RETAIN) ? true : false))); + + rpmlog(RPMLOG_DEBUG, "%s: send(%d) topic(%s) \"%.*s\"\n", + mqtt->vec->name, mqtt->token, mqtt->topic, (int)ns, s); + + xx = check(mqtt, "loop", + mosquitto_loop(mqtt->I, -1, 1)); + + if (rc) + goto exit; + rc = RPMRC_OK; + } + +exit: + return rc; +} + +static +rpmRC mosqSubscribeMany(rpmmqtt mqtt, int ac, char ** av) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + + if (ac <= 0) + goto exit; + if (!rpmmqttConnect(mqtt)) { + int *subqos = (int *) xcalloc(ac, sizeof(*subqos)); + for (int i = 0; i < ac; i++) { + char * t = av[i]; +#ifdef NOTYET /* XXX qos as subtopic ?qos=N? */ + char * te = strchr(t, '?'); + if (te) { + *te++ = '\0'; + if ((te = strchr(te, '='))) { + if (!strncmp(t, "qos", (te - t))) + subqos[i] = strtoul(te+1, NULL, 0); + } + } else +#endif + subqos[i] = mqtt->qos; /* XXX */ + rc = rpmmqttSubscribe(mqtt, t, subqos[i]); + if (rc) + break; + } + + if (subqos) + free(subqos); + if (rc) + goto exit; + rc = RPMRC_OK; + } + +exit: + return rc; +} + +static +rpmRC mosqSubscribe(rpmmqtt mqtt, const char * topic, int qos) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + int xx; + + if (!rpmmqttConnect(mqtt)) { + + rpmlog(RPMLOG_DEBUG, "%19s: %s qos(%d)\n", "subscribe", topic, qos); + + mqtt->finished = 0; + rc = check(mqtt, "subscribe", + mosquitto_subscribe(mqtt->I, + (int *)&mqtt->token, + (topic ? topic : mqtt->topic), + qos)); + + xx = check(mqtt, "loop", + mosquitto_loop(mqtt->I, -1, 1)); + + if (rc) + goto exit; + rc = RPMRC_OK; + } + +exit: + return rc; +} + +static +rpmRC mosqUnsubscribe(rpmmqtt mqtt, const char * topic) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + int xx; + + if (!rpmmqttConnect(mqtt)) { + + rpmlog(RPMLOG_DEBUG, "%19s: %s\n", "unsubscribe", topic); + + mqtt->finished = 0; + rc = check(mqtt, "unsubscribe", + mosquitto_unsubscribe(mqtt->I, + (int *)&mqtt->token, + (topic ? topic : mqtt->topic))); + + xx = check(mqtt, "loop", + mosquitto_loop(mqtt->I, -1, 1)); + + if (rc) + goto exit; + rc = RPMRC_OK; + } + +exit: + return rc; +} + +static +rpmRC mosqUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + + if (ac <= 0) + goto exit; + if (!rpmmqttConnect(mqtt)) { + for (int i = 0; i < ac; i++) { + rc = rpmmqttUnsubscribe(mqtt, av[i]); + if (rc) + goto exit; + } + } + +exit: + return rc; +} + +static KEY mosqErrs[] = { +#define _ENTRY(_v) { MOSQ_ERR_##_v, #_v, } + _ENTRY(CONN_PENDING), + _ENTRY(SUCCESS), + _ENTRY(NOMEM), + _ENTRY(PROTOCOL), + _ENTRY(INVAL), + _ENTRY(NO_CONN), + _ENTRY(CONN_REFUSED), + _ENTRY(NOT_FOUND), + _ENTRY(CONN_LOST), + _ENTRY(TLS), + _ENTRY(PAYLOAD_SIZE), + _ENTRY(NOT_SUPPORTED), + _ENTRY(AUTH), + _ENTRY(ACL_DENIED), + _ENTRY(UNKNOWN), + _ENTRY(ERRNO), + _ENTRY(EAI), + _ENTRY(PROXY), +#undef _ENTRY + { 0, NULL }, +}; + +static +struct mqttVec_s mosqVec = { + .name = "mosquitto", + .port = 1883, + .sport = 8883, + .prefix = "mosquitto_", + .errs = mosqErrs, + .nerrs = sizeof(mosqErrs) / sizeof(mosqErrs[0]), + .destroy = mosqDestroy, + .create = mosqCreate, + .disconnect = mosqDisconnect, + .connect = mosqConnect, + .isconnected = mosqIsConnected, + .unsubscribe = mosqUnsubscribe, + .subscribe = mosqSubscribe, + .unsubscribeMany = mosqUnsubscribeMany, + .subscribeMany = mosqSubscribeMany, + .sendMessage = mosqSendMessage, +}; +#endif /* WITH_MOSQUITTO */ + +/*==============================================================*/ +#ifdef WITH_RABBITMQ +static +rpmRC amqpDestroy(rpmmqtt mqtt) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ +#ifdef NOTYET + rc = check(mqtt, "destroy", + (MQTTAsync_destroy(&mqtt->I), 0)); + mqtt->state = _free(mqtt->state); + mqtt->I = NULL; +#endif + return rc; +} + +static +rpmRC amqpCreate(rpmmqtt mqtt) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ +#ifdef NOTYET + static int oneshot; + int _lvl = RPMLOG_DEBUG; + int xx; + +#ifdef DYING +mqtt->trace = 4; /* XXX */ +#endif + if (mqtt->trace && rpmIsDebug()) { + xx = check(mqtt, "setTraceCallback", + (MQTTAsync_setTraceCallback(onTrace), 0)); + xx = check(mqtt, "setTraceLevel", + (MQTTAsync_setTraceLevel((enum MQTTASYNC_TRACE_LEVELS)mqtt->trace), 0)); + } + + rpmlog(_lvl, "==================== %s\n", mqtt->vec->name); + + if (!oneshot) { + if (mqtt->trace == 0) { + MQTTAsync_nameValue *NV = MQTTAsync_getVersionInfo(); + while (NV->name) { + rpmlog(_lvl, "%19s: %s\n", NV->name, NV->value); + NV++; + } + } + oneshot++; + } + + /* XXX improve integration */ +static const char _mqtt_persist[] = + "%{?_mqtt_persist}%{!?_mqtt_persist:2}"; + mqtt->persist_type = (rpmmqttExpandNumeric(mqtt, _mqtt_persist) % 3); +static const char _mqtt_cachedir[] = + "%{?_mqtt_cachedir}%{!?_mqtt_cachedir:/var/cache/mqtt}"; +char *persist_path = rpmGetPath(_mqtt_cachedir, NULL); + + rpmlog(_lvl, "%19s: %s\n", "clientid", mqtt->clientid); + rpmlog(_lvl, "%19s: %s qos(%d) timeout(%u msecs)\n", + "topic", mqtt->topic, mqtt->qos, mqtt->timeout); + rpmlog(_lvl, "%19s: type(%u) %s\n", + "persist", mqtt->persist_type, + (mqtt->persist_type ? persist_path : "")); + rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags)); + if (mqtt->trace) + rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace); + +mqtt->persist_path = _free(mqtt->persist_path); +mqtt->persist_ctx = _free(mqtt->persist_ctx); + switch (mqtt->persist_type) { + default: + case MQTTCLIENT_PERSISTENCE_NONE: + mqtt->persist_ctx = NULL; + break; + case MQTTCLIENT_PERSISTENCE_DEFAULT: + { + mqtt->persist_path = xstrdup(persist_path); + /* XXX rpmmqttFini double free */ + mqtt->persist_ctx = (void *)xstrdup(mqtt->persist_path); + } break; + case MQTTCLIENT_PERSISTENCE_USER: + { + mqtt->persist_path = xstrdup(persist_path); + MQTTClient_persistence * ctx = + (MQTTClient_persistence *) xmalloc(sizeof(*ctx)); + *ctx = _mqtt_persistence; /* structure assignment */ + ctx->context = mqtt; + mqtt->persist_ctx = ctx; + } break; + } +persist_path = _free(persist_path); + +mqtt->u = NULL; +dumpMQTT(__FUNCTION__, mqtt); + + if (mqtt->I == NULL) { + xx = check(mqtt, "createWithOptions", + MQTTAsync_createWithOptions(&mqtt->I, + mqtt->uri, mqtt->clientid, + mqtt->persist_type, mqtt->persist_ctx, + (MQTTAsync_createOptions *)AOBJ(mqtt, 'O'))); + + xx = check(mqtt, "setCallbacks", + MQTTAsync_setCallbacks(mqtt->I, mqtt, + amqpOnConnectionLost, + amqpOnMessageArrived, + amqpOnDeliveryComplete)); + } + + rc = RPMRC_OK; +#endif /* NOTYET */ + + return rc; +} + +static +rpmRC amqpDisconnect(rpmmqtt mqtt) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + if (rpmmqttIsConnected(mqtt) == RPMRC_OK) { +#ifdef NOTYET + mqtt->finished = 0; + rc = check(mqtt, "disconnect", + MQTTAsync_disconnect(mqtt->I, + (const MQTTAsync_disconnectOptions*)AOBJ(mqtt, 'D'))); + while (!mqtt->finished) + usleep(100); +#endif + if (rc) + goto exit; + } + rc = RPMRC_OK; + +exit: + return rc; +} + +static +rpmRC amqpConnect(rpmmqtt mqtt) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + + if (rpmmqttIsConnected(mqtt) == RPMRC_NOTFOUND) { +#ifdef NOTYET + mqtt->finished = 0; + rc = check(mqtt, "connect", + MQTTAsync_connect(mqtt->I, + (const MQTTAsync_connectOptions*)AOBJ(mqtt, 'C'))); + while (!mqtt->finished) + usleep(1000); +#endif + if (rc) + goto exit; + } + rc = RPMRC_OK; + +exit: + return rc; +} + +static +rpmRC amqpIsConnected(rpmmqtt mqtt) +{ + rpmRC rc = RPMRC_NOTFOUND; + +#ifdef NOTYET + if (check(mqtt, "isconnected", + MQTTAsync_isConnected(mqtt->I))) + rc = RPMRC_OK; +#endif + return rc; +} + +static +rpmRC amqpSendMessage(rpmmqtt mqtt, const char * topic, + const char * s, size_t ns) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + + if (topic == NULL) + topic = mqtt->topic; + if (s == NULL) + s = ""; + if (ns == 0) + ns = strlen(s); + + if (!rpmmqttConnect(mqtt)) { +#ifdef NOTYET + MQTTAsync_message *M = + (MQTTAsync_message *) AOBJ(mqtt, 'M'); + M->payloadlen = ns; + M->payload = (char *) s; + + MQTTAsync_responseOptions *R = + (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R'); + R->onSuccess = amqpOnSend; + R->onFailure = amqpOnSendFailure; + + mqtt->finished = 0; + rc = check(mqtt, "sendMessage", + MQTTAsync_sendMessage(mqtt->I, topic, M, R)); + while (!mqtt->finished) + usleep(100); +#endif + if (rc) + goto exit; + rc = RPMRC_OK; + } + +exit: + return rc; +} + +static +rpmRC amqpSubscribeMany(rpmmqtt mqtt, int ac, char ** av) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + + if (ac <= 0) + goto exit; + if (!rpmmqttConnect(mqtt)) { + int _lvl = RPMLOG_DEBUG; + int *subqos = (int *) xcalloc(ac, sizeof(*subqos)); + for (int i = 0; i < ac; i++) { + char * t = av[i]; +#ifdef NOTYET + char * te = strchr(t, '?'); + if (te) { + *te++ = '\0'; + if ((te = strchr(te, '='))) { + if (!strncmp(t, "qos", (te - t))) + subqos[i] = strtoul(te+1, NULL, 0); + } + } else +#endif + subqos[i] = mqtt->qos; /* XXX */ + rpmlog(_lvl, "%19s: %s qos(%u)\n", "subscribe", t, subqos[i]); + } + +#ifdef NOTYET + MQTTAsync_responseOptions *R = + (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R'); + R->onSuccess = amqpOnSubscribeMany; + R->onFailure = amqpOnSubscribeManyFailure; + + mqtt->finished = 0; + rc = check(mqtt, "subscribeMany", + MQTTAsync_subscribeMany(mqtt->I, ac, av, subqos, R)); + while (!mqtt->finished) + usleep(100); +#endif + if (subqos) + free(subqos); + if (rc) + goto exit; + rc = RPMRC_OK; + } + +exit: + return rc; +} + +static +rpmRC amqpSubscribe(rpmmqtt mqtt, const char * topic, int qos) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + + if (!rpmmqttConnect(mqtt)) { + int _lvl = RPMLOG_DEBUG; + + rpmlog(_lvl, "%19s: %s qos(%d)\n", "subscribe", topic, qos); + +#ifdef NOTYET + MQTTAsync_responseOptions *R = + (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R'); + R->onSuccess = amqpOnSubscribe; + R->onFailure = amqpOnSubscribeFailure; + + mqtt->finished = 0; + rc = check(mqtt, "subscribe", + MQTTAsync_subscribe(mqtt->I, topic, qos, R)); + while (!mqtt->finished) + usleep(100); +#endif + if (rc) + goto exit; + rc = RPMRC_OK; + } + +exit: + return rc; +} + +static +rpmRC amqpUnsubscribe(rpmmqtt mqtt, const char * topic) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + + if (!rpmmqttConnect(mqtt)) { + int _lvl = RPMLOG_DEBUG; + + rpmlog(_lvl, "%19s: %s\n", "unsubscribe", topic); + +#ifdef NOTYET + MQTTAsync_responseOptions *R = + (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R'); + R->onSuccess = amqpOnUnsubscribeMany; + R->onFailure = amqpOnUnsubscribeManyFailure; + + mqtt->finished = 0; + rc = check(mqtt, "unsubscribe", + MQTTAsync_unsubscribe(mqtt->I, topic, R)); + while (!mqtt->finished) + usleep(100); +#endif + if (rc) + goto exit; + rc = RPMRC_OK; + } + +exit: + return rc; +} + +static +rpmRC amqpUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + + if (ac <= 0) + goto exit; + if (!rpmmqttConnect(mqtt)) { + int _lvl = RPMLOG_DEBUG; + for (int i = 0; i < ac; i++) { + char * t = av[i]; + rpmlog(_lvl, "%19s: %s\n", "unsubscribe", t); + } + +#ifdef NOTYET + MQTTAsync_responseOptions *R = + (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R'); + R->onSuccess = amqpOnUnsubscribeMany; + R->onFailure = amqpOnUnsubscribeManyFailure; + + mqtt->finished = 0; + rc = check(mqtt, "unsubscribeMany", + MQTTAsync_unsubscribeMany(mqtt->I, ac, av, R)); + while (!mqtt->finished) + usleep(100); +#endif + + if (rc) + goto exit; + rc = RPMRC_OK; + } + +exit: + return rc; +} + +static KEY amqpErrs[] = { +#define _ENTRY(_v) { AMQP_STATUS_##_v, #_v, } + _ENTRY(OK), + _ENTRY(NO_MEMORY), + _ENTRY(BAD_AMQP_DATA), + _ENTRY(UNKNOWN_CLASS), + _ENTRY(UNKNOWN_METHOD), + _ENTRY(HOSTNAME_RESOLUTION_FAILED), + _ENTRY(INCOMPATIBLE_AMQP_VERSION), + _ENTRY(CONNECTION_CLOSED), + _ENTRY(BAD_URL), + _ENTRY(SOCKET_ERROR), + _ENTRY(INVALID_PARAMETER), + _ENTRY(TABLE_TOO_BIG), + _ENTRY(WRONG_METHOD), + _ENTRY(TIMEOUT), + _ENTRY(TIMER_FAILURE), + _ENTRY(HEARTBEAT_TIMEOUT), + _ENTRY(UNEXPECTED_STATE), + _ENTRY(SOCKET_CLOSED), + _ENTRY(SOCKET_INUSE), + _ENTRY(BROKER_UNSUPPORTED_SASL_METHOD), + _ENTRY(UNSUPPORTED), + _ENTRY(TCP_ERROR), + _ENTRY(TCP_SOCKETLIB_INIT_ERROR), + _ENTRY(SSL_ERROR), + _ENTRY(SSL_HOSTNAME_VERIFY_FAILED), + _ENTRY(SSL_PEER_VERIFY_FAILED), + _ENTRY(SSL_CONNECTION_FAILED), +#undef _ENTRY + { 0, NULL }, +}; + +static +struct mqttVec_s amqpVec = { + .name = "amqp", + .port = 5672, + .sport = 5671, + .prefix = "amqp_", + .errs = amqpErrs, + .nerrs = sizeof(amqpErrs) / sizeof(amqpErrs[0]), + .destroy = amqpDestroy, + .create = amqpCreate, + .disconnect = amqpDisconnect, + .connect = amqpConnect, + .isconnected = amqpIsConnected, /* XXX */ + .unsubscribe = amqpUnsubscribe, + .subscribe = amqpSubscribe, + .unsubscribeMany = amqpUnsubscribeMany, + .subscribeMany = amqpSubscribeMany, + .sendMessage = amqpSendMessage, +}; +#endif /* WITH_RABBITMQ */ + +/*==============================================================*/ +#ifdef WITH_ZEROMQ +static +rpmRC zmqDestroy(rpmmqtt mqtt) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ +#ifdef NOTYET + rc = check(mqtt, "destroy", + (MQTTAsync_destroy(&mqtt->I), 0)); + mqtt->state = _free(mqtt->state); + mqtt->I = NULL; +#endif + return rc; +} + +static +rpmRC zmqCreate(rpmmqtt mqtt) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ +#ifdef NOTYET + static int oneshot; + int _lvl = RPMLOG_DEBUG; + int xx; + +#ifdef DYING +mqtt->trace = 4; /* XXX */ +#endif + if (mqtt->trace && rpmIsDebug()) { + xx = check(mqtt, "setTraceCallback", + (MQTTAsync_setTraceCallback(onTrace), 0)); + xx = check(mqtt, "setTraceLevel", + (MQTTAsync_setTraceLevel((enum MQTTASYNC_TRACE_LEVELS)mqtt->trace), 0)); + } + + rpmlog(_lvl, "==================== %s\n", mqtt->vec->name); + + if (!oneshot) { + if (mqtt->trace == 0) { + MQTTAsync_nameValue *NV = MQTTAsync_getVersionInfo(); + while (NV->name) { + rpmlog(_lvl, "%19s: %s\n", NV->name, NV->value); + NV++; + } + } + oneshot++; + } + + /* XXX improve integration */ +static const char _mqtt_persist[] = + "%{?_mqtt_persist}%{!?_mqtt_persist:2}"; + mqtt->persist_type = (rpmmqttExpandNumeric(mqtt, _mqtt_persist) % 3); +static const char _mqtt_cachedir[] = + "%{?_mqtt_cachedir}%{!?_mqtt_cachedir:/var/cache/mqtt}"; +char *persist_path = rpmGetPath(_mqtt_cachedir, NULL); + + rpmlog(_lvl, "%19s: %s\n", "clientid", mqtt->clientid); + rpmlog(_lvl, "%19s: %s qos(%d) timeout(%u msecs)\n", + "topic", mqtt->topic, mqtt->qos, mqtt->timeout); + rpmlog(_lvl, "%19s: type(%u) %s\n", + "persist", mqtt->persist_type, + (mqtt->persist_type ? persist_path : "")); + rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags)); + if (mqtt->trace) + rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace); + +mqtt->persist_path = _free(mqtt->persist_path); +mqtt->persist_ctx = _free(mqtt->persist_ctx); + switch (mqtt->persist_type) { + default: + case MQTTCLIENT_PERSISTENCE_NONE: + mqtt->persist_ctx = NULL; + break; + case MQTTCLIENT_PERSISTENCE_DEFAULT: + { + mqtt->persist_path = xstrdup(persist_path); + /* XXX rpmmqttFini double free */ + mqtt->persist_ctx = (void *)xstrdup(mqtt->persist_path); + } break; + case MQTTCLIENT_PERSISTENCE_USER: + { + mqtt->persist_path = xstrdup(persist_path); + MQTTClient_persistence * ctx = + (MQTTClient_persistence *) xmalloc(sizeof(*ctx)); + *ctx = _mqtt_persistence; /* structure assignment */ + ctx->context = mqtt; + mqtt->persist_ctx = ctx; + } break; + } +persist_path = _free(persist_path); + +mqtt->u = NULL; +dumpMQTT(__FUNCTION__, mqtt); + + if (mqtt->I == NULL) { + xx = check(mqtt, "createWithOptions", + MQTTAsync_createWithOptions(&mqtt->I, + mqtt->uri, mqtt->clientid, + mqtt->persist_type, mqtt->persist_ctx, + (MQTTAsync_createOptions *)AOBJ(mqtt, 'O'))); + + xx = check(mqtt, "setCallbacks", + MQTTAsync_setCallbacks(mqtt->I, mqtt, + zmqOnConnectionLost, + zmqOnMessageArrived, + zmqOnDeliveryComplete)); + } + + rc = RPMRC_OK; +#endif /* NOTYET */ + + return rc; +} + +static +rpmRC zmqDisconnect(rpmmqtt mqtt) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + if (rpmmqttIsConnected(mqtt) == RPMRC_OK) { +#ifdef NOTYET + mqtt->finished = 0; + rc = check(mqtt, "disconnect", + MQTTAsync_disconnect(mqtt->I, + (const MQTTAsync_disconnectOptions*)AOBJ(mqtt, 'D'))); + while (!mqtt->finished) + usleep(100); +#endif + if (rc) + goto exit; + } + rc = RPMRC_OK; + +exit: + return rc; +} + +static +rpmRC zmqConnect(rpmmqtt mqtt) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + + if (rpmmqttIsConnected(mqtt) == RPMRC_NOTFOUND) { +#ifdef NOTYET + mqtt->finished = 0; + rc = check(mqtt, "connect", + MQTTAsync_connect(mqtt->I, + (const MQTTAsync_connectOptions*)AOBJ(mqtt, 'C'))); + while (!mqtt->finished) + usleep(1000); +#endif + if (rc) + goto exit; + } + rc = RPMRC_OK; + +exit: + return rc; +} + +static +rpmRC zmqIsConnected(rpmmqtt mqtt) +{ + rpmRC rc = RPMRC_NOTFOUND; + +#ifdef NOTYET + if (check(mqtt, "isconnected", + MQTTAsync_isConnected(mqtt->I))) + rc = RPMRC_OK; +#endif + return rc; +} + +static +rpmRC zmqSendMessage(rpmmqtt mqtt, const char * topic, + const char * s, size_t ns) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + + if (topic == NULL) + topic = mqtt->topic; + if (s == NULL) + s = ""; + if (ns == 0) + ns = strlen(s); + + if (!rpmmqttConnect(mqtt)) { +#ifdef NOTYET + MQTTAsync_message *M = + (MQTTAsync_message *) AOBJ(mqtt, 'M'); + M->payloadlen = ns; + M->payload = (char *) s; + + MQTTAsync_responseOptions *R = + (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R'); + R->onSuccess = zmqOnSend; + R->onFailure = zmqOnSendFailure; + + mqtt->finished = 0; + rc = check(mqtt, "sendMessage", + MQTTAsync_sendMessage(mqtt->I, topic, M, R)); + while (!mqtt->finished) + usleep(100); +#endif + if (rc) + goto exit; + rc = RPMRC_OK; + } + +exit: + return rc; +} + +static +rpmRC zmqSubscribeMany(rpmmqtt mqtt, int ac, char ** av) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + + if (ac <= 0) + goto exit; + if (!rpmmqttConnect(mqtt)) { + int _lvl = RPMLOG_DEBUG; + int *subqos = (int *) xcalloc(ac, sizeof(*subqos)); + for (int i = 0; i < ac; i++) { + char * t = av[i]; +#ifdef NOTYET + char * te = strchr(t, '?'); + if (te) { + *te++ = '\0'; + if ((te = strchr(te, '='))) { + if (!strncmp(t, "qos", (te - t))) + subqos[i] = strtoul(te+1, NULL, 0); + } + } else +#endif + subqos[i] = mqtt->qos; /* XXX */ + rpmlog(_lvl, "%19s: %s qos(%u)\n", "subscribe", t, subqos[i]); + } + +#ifdef NOTYET + MQTTAsync_responseOptions *R = + (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R'); + R->onSuccess = zmqOnSubscribeMany; + R->onFailure = zmqOnSubscribeManyFailure; + + mqtt->finished = 0; + rc = check(mqtt, "subscribeMany", + MQTTAsync_subscribeMany(mqtt->I, ac, av, subqos, R)); + while (!mqtt->finished) + usleep(100); +#endif + if (subqos) + free(subqos); + if (rc) + goto exit; + rc = RPMRC_OK; + } + +exit: + return rc; +} + +static +rpmRC zmqSubscribe(rpmmqtt mqtt, const char * topic, int qos) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + + if (!rpmmqttConnect(mqtt)) { + int _lvl = RPMLOG_DEBUG; + + rpmlog(_lvl, "%19s: %s qos(%d)\n", "subscribe", topic, qos); + +#ifdef NOTYET + MQTTAsync_responseOptions *R = + (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R'); + R->onSuccess = zmqOnSubscribe; + R->onFailure = zmqOnSubscribeFailure; + + mqtt->finished = 0; + rc = check(mqtt, "subscribe", + MQTTAsync_subscribe(mqtt->I, topic, qos, R)); + while (!mqtt->finished) + usleep(100); +#endif + if (rc) + goto exit; + rc = RPMRC_OK; + } + +exit: + return rc; +} + +static +rpmRC zmqUnsubscribe(rpmmqtt mqtt, const char * topic) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + + if (!rpmmqttConnect(mqtt)) { + int _lvl = RPMLOG_DEBUG; + + rpmlog(_lvl, "%19s: %s\n", "unsubscribe", topic); + +#ifdef NOTYET + MQTTAsync_responseOptions *R = + (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R'); + R->onSuccess = zmqOnUnsubscribeMany; + R->onFailure = zmqOnUnsubscribeManyFailure; + + mqtt->finished = 0; + rc = check(mqtt, "unsubscribe", + MQTTAsync_unsubscribe(mqtt->I, topic, R)); + while (!mqtt->finished) + usleep(100); +#endif + if (rc) + goto exit; + rc = RPMRC_OK; + } + +exit: + return rc; +} + +static +rpmRC zmqUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + + if (ac <= 0) + goto exit; + if (!rpmmqttConnect(mqtt)) { + int _lvl = RPMLOG_DEBUG; + for (int i = 0; i < ac; i++) { + char * t = av[i]; + rpmlog(_lvl, "%19s: %s\n", "unsubscribe", t); + } + +#ifdef NOTYET + MQTTAsync_responseOptions *R = + (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R'); + R->onSuccess = zmqOnUnsubscribeMany; + R->onFailure = zmqOnUnsubscribeManyFailure; + + mqtt->finished = 0; + rc = check(mqtt, "unsubscribeMany", + MQTTAsync_unsubscribeMany(mqtt->I, ac, av, R)); + while (!mqtt->finished) + usleep(100); +#endif + + if (rc) + goto exit; + rc = RPMRC_OK; + } + +exit: + return rc; +} + +static KEY zmqErrs[] = { + { 0, NULL }, +}; + +static +struct mqttVec_s zmqVec = { + .name = "zmq", + .port = 0xfffe, /* XXX W2DO? */ + .sport = 0xfffe, /* XXX W2DO? */ + .prefix = "zmq_", + .errs = zmqErrs, /* XXX W2DO? */ + .nerrs = sizeof(zmqErrs) / sizeof(zmqErrs[0]), + .destroy = zmqDestroy, + .create = zmqCreate, + .disconnect = zmqDisconnect, + .connect = zmqConnect, + .isconnected = zmqIsConnected, /* XXX */ + .unsubscribe = zmqUnsubscribe, + .subscribe = zmqSubscribe, + .unsubscribeMany = zmqUnsubscribeMany, + .subscribeMany = zmqSubscribeMany, + .sendMessage = zmqSendMessage, +}; +#endif /* WITH_ZEROMQ */ + +/*==============================================================*/ +rpmRC rpmmqttPub(rpmmqtt mqtt, const char * topic, const char *s, size_t ns) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + + if (topic == NULL) + topic = mqtt->topic; + if (s == NULL) + s = ""; + if (ns == 0) + ns = strlen(s); + + if (rpmmqttConnect(mqtt)) + goto exit; + { static char _mqtt_prefix[] = "%{?_mqtt_prefix}"; + /* XXX extra space: prepend in *sendMessage or *send */ + char * t = rpmmqttExpand(mqtt, NULL, + _mqtt_prefix, " ", s, NULL); + size_t nt = strlen(t); + + rc = rpmmqttSendMessage(mqtt, topic, t, nt); + t = _free(t); + if (rc) + goto exit; + rc = RPMRC_OK; + } + +exit: +SPEW((stderr, "<-- %s(%p,%p[%u]) rc %d\n", __FUNCTION__, mqtt, s, (unsigned)ns, (int)rc)); + return rc; +} + +rpmRC rpmmqttSub(rpmmqtt mqtt, const char *s, size_t ns) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + + if (ns == 0) ns = strlen(s); + + if (rpmmqttConnect(mqtt)) + goto exit; + + { char * subtopic = rpmmqttExpand(mqtt, NULL, + (s ? s : mqtt->topic), NULL); + unsigned subqos = mqtt->qos; + char *t, *te; +int _lvl = RPMLOG_DEBUG; + + /* XXX qos appended to subscription topic? */ + if ((t = strchr(subtopic, '?')) != NULL) { + ARGV_t qav = NULL; + int qac; *t++ = '\0'; (void) argvSplit(&qav, t, ","); @@ -1365,20 +2899,8 @@ rpmlog(_lvl, "%19s: %s qos(%u)\n", "subtopic", subtopic, subqos); - MQTTAsync_responseOptions *R = - (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R'); - R->onSuccess = onSubscribe; - R->onFailure = onSubscribeFailure; - -#ifdef DYING - mqtt->finished = 0; - rc = check(mqtt, "subscribe", - MQTTAsync_subscribe(mqtt->I, subtopic, subqos, R)); - while (rc == 0 && !mqtt->finished) - usleep(100); -#else rc = rpmmqttSubscribe(mqtt, subtopic, subqos); -#endif + subtopic = _free(subtopic); if (rc) goto exit; @@ -1386,8 +2908,6 @@ } exit: -#endif /* WITH_MQTT */ - SPEW((stderr, "<-- %s(%p,%p[%u]) ret %d\n", __FUNCTION__, mqtt, s, (unsigned)ns, (int)rc)); return rc; } @@ -1462,7 +2982,7 @@ N_("Retain the client Will."), NULL }, POPT_TABLEEND }; - struct poptOption rpmmqttSubscribePoptTable[] = { + struct poptOption mqttSubscribePoptTable[] = { { "clean", 'c', POPT_ARG_VAL|POPT_ARGFLAG_XOR,&mqtt->flags, MQTT_FLAGS_CLEAN, N_("(sub) Do not clean session."), NULL }, { NULL, 'C', POPT_ARG_INT, &mqtt->max_msg_count, 0, @@ -1561,7 +3081,7 @@ N_("MQTT Protocol:"), NULL }, - { NULL, '\0', POPT_ARG_INCLUDE_TABLE, &rpmmqttSubscribePoptTable, 0, + { NULL, '\0', POPT_ARG_INCLUDE_TABLE, &mqttSubscribePoptTable, 0, N_("MQTT Subscribe:"), NULL }, @@ -1917,20 +3437,16 @@ static rpmRC rpmmqttInitSubscribe(rpmmqtt mqtt, const char ** topics) { rpmRC rc = RPMRC_FAIL; /* assume failure */ + int nsubs = 0; int xx; - if (topics) { - int nsubs = argvCount((ARGV_t)topics); -#ifndef NOTYET /* XXX MQTT segfault if done here. */ argvPrint(__FUNCTION__, (ARGV_t)topics, NULL); + if (topics) { + nsubs = argvCount((ARGV_t)topics); xx = rpmmqttSubscribeMany(mqtt, nsubs, (char **)topics); -#else - for (int i = 0; i < nsubs; i++) { - xx = rpmmqttSub(mqtt, topics[i], 0); - } -#endif } rc = RPMRC_OK; +SPEW((stderr, "<-- %s(%p,%p[%d]) rc %d\n", __FUNCTION__, mqtt, topics, nsubs, rc)); return rc; } @@ -2021,11 +3537,8 @@ { rpmmqtt mqtt = (rpmmqtt) _mqtt; -#ifdef WITH_MQTT (void) rpmmqttDisconnect(mqtt); - (void) check(mqtt, "destroy", - (MQTTAsync_destroy(&mqtt->I), 0)); -#endif /* WITH_MQTT */ + (void) rpmmqttDestroy(mqtt); /* ========== */ mqtt->_progname = _free(mqtt->_progname); @@ -2104,75 +3617,28 @@ rpmRC rc = rpmmqttInit(mqtt, ac, (const char **)av, (mqttFlags)flags); (void)rc; -#ifdef WITH_MQTT - { static int oneshot; - int _lvl = RPMLOG_DEBUG; + { int xx; -mqtt->trace = 4; - if (mqtt->trace && rpmIsDebug()) { - xx = check(mqtt, "setTraceCallback", - (MQTTAsync_setTraceCallback(onTrace), 0)); - xx = check(mqtt, "setTraceLevel", - (MQTTAsync_setTraceLevel((enum MQTTASYNC_TRACE_LEVELS)mqtt->trace), 0)); - } - - rpmlog(_lvl, "==================== MQTT\n"); - - if (!oneshot) { - if (mqtt->trace == 0) { - MQTTAsync_nameValue *I = MQTTAsync_getVersionInfo(); - while (I->name) { - rpmlog(_lvl, "%19s: %s\n", I->name, I->value); - I++; - } - } - oneshot++; - } - - /* XXX improve integration */ -static const char _mqtt_persist[] = - "%{?_mqtt_persist}%{!?_mqtt_persist:2}"; - mqtt->persist_type = (rpmmqttExpandNumeric(mqtt, _mqtt_persist) % 3); -static const char _mqtt_cachedir[] = - "%{?_mqtt_cachedir}%{!?_mqtt_cachedir:/var/cache/mqtt}"; -char *persist_path = rpmGetPath(_mqtt_cachedir, NULL); - - rpmlog(_lvl, "%19s: %s\n", "clientid", mqtt->clientid); - rpmlog(_lvl, "%19s: %s qos(%d) timeout(%u msecs)\n", - "topic", mqtt->topic, mqtt->qos, mqtt->timeout); - rpmlog(_lvl, "%19s: type(%u) %s\n", - "persist", mqtt->persist_type, - (mqtt->persist_type ? persist_path : "")); - rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags)); - if (mqtt->trace) - rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace); - + /* XXX these need to be initialized based on URI scheme */ +#ifdef WITH_PAHO + if (mqtt->vec == NULL) + mqtt->vec = &pahoVec; +#endif +#ifdef WITH_MOSQUITTO + if (mqtt->vec == NULL) + mqtt->vec = &mosqVec; +#endif +#ifdef WITH_RABBITMQ + if (mqtt->vec == NULL) + mqtt->vec = &amqpVec; +#endif +#ifdef WITH_ZEROMQ + if (mqtt->vec == NULL) + mqtt->vec = &zmqVec; +#endif -mqtt->persist_path = _free(mqtt->persist_path); -mqtt->persist_ctx = _free(mqtt->persist_ctx); - switch (mqtt->persist_type) { - default: - case MQTTCLIENT_PERSISTENCE_NONE: - mqtt->persist_ctx = NULL; - break; - case MQTTCLIENT_PERSISTENCE_DEFAULT: - { - mqtt->persist_path = xstrdup(persist_path); - /* XXX rpmmqttFini double free */ - mqtt->persist_ctx = (void *)xstrdup(mqtt->persist_path); - } break; - case MQTTCLIENT_PERSISTENCE_USER: - { - mqtt->persist_path = xstrdup(persist_path); - MQTTClient_persistence * ctx = - (MQTTClient_persistence *) xmalloc(sizeof(*ctx)); - *ctx = _rpmmqtt_persistence; /* structure assignment */ - ctx->context = mqtt; - mqtt->persist_ctx = ctx; - } break; - } -persist_path = _free(persist_path); + xx = rpmmqttCreate(mqtt); /* Prepare for subscription delivery. */ if (MF_ISSET(BUFFER) && mqtt->iob == NULL) @@ -2196,23 +3662,6 @@ break; } -mqtt->u = NULL; -dumpMQTT(__FUNCTION__, mqtt); - - if (mqtt->I == NULL) { - xx = check(mqtt, "createWithOptions", - MQTTAsync_createWithOptions(&mqtt->I, - mqtt->uri, mqtt->clientid, - mqtt->persist_type, mqtt->persist_ctx, - (MQTTAsync_createOptions *)AOBJ(mqtt, 'O'))); - - xx = check(mqtt, "setCallbacks", - MQTTAsync_setCallbacks(mqtt->I, mqtt, - onConnectionLost, - onMessageArrived, - onDeliveryComplete)); - } - /* XXX If any topic has a wild card, then switch to "sub" mode. */ { const char * topic; @@ -2258,10 +3707,7 @@ /* Publish any initial messages (if any). */ xx = rpmmqttInitPublish(mqtt, mqtt->topics); } - - } -#endif /* WITH_MQTT */ return rpmmqttLink(mqtt); } @@ -2277,7 +3723,6 @@ msg = rpmmqttExpand(mqtt, NULL, "%now", " ", str, NULL); -#if defined(WITH_MQTT) if (rpmmqttConnect(mqtt)) goto exit; if (msg != NULL && !rpmmqttSendMessage(mqtt, NULL, msg, strlen(msg))) { @@ -2288,8 +3733,6 @@ } exit: -#endif - msg = _free(msg); SPEW((stderr, "<== %s(%p,\"%s\",%p) rc %d\n", __FUNCTION__, mqtt, str, resultp, rc)); return rc; @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmqtt.h ============================================================================ $ cvs diff -u -r1.1.2.17 -r1.1.2.18 rpmmqtt.h --- rpm/rpmio/rpmmqtt.h 10 Jul 2016 16:16:04 -0000 1.1.2.17 +++ rpm/rpmio/rpmmqtt.h 11 Jul 2016 20:26:53 -0000 1.1.2.18 @@ -7,8 +7,9 @@ extern int _rpmmqtt_debug; -typedef struct rpmmqtt_s * rpmmqtt; -typedef struct mqttState_s * mqttState; +typedef struct rpmmqtt_s * rpmmqtt; +typedef struct pahoState_s * pahoState_t; +typedef struct mqttVec_s * mqttVec_t; #define _KFB(n) (1U << (n)) #define _MFB(n) (_KFB(n) | 0x40000000) @@ -47,7 +48,14 @@ MQTT_OUTPUT_CALLBACK = (1 << 2), } mqttOutput; +#ifdef __cplusplus +extern "C" { +#endif + #if defined(_RPMMQTT_INTERNAL) + +#define SPEW(_list) if (mqtt->debug || _rpmmqtt_debug < 0) fprintf _list + struct rpmmqtt_s { struct rpmioItem_s _item; /*!< usage mutex and pool identifier. */ MacroContext mc; @@ -116,19 +124,138 @@ /* -- MQTT internals */ void * I; /* MQTTClient */ + volatile int connected; /* XXX mosquitto */ volatile int finished; volatile unsigned token; char * serverURI; int MQTTVersion; int sessionPresent; - mqttState state; + pahoState_t state; + mqttVec_t vec; }; -#endif /* _RPMMQTT_INTERNAL */ +struct mqttVec_s { + const char * name; + uint16_t port; + uint16_t sport; + const char * prefix; + void * errs; + size_t nerrs; + rpmRC (*destroy) (rpmmqtt mqtt); + rpmRC (*create) (rpmmqtt mqtt); + rpmRC (*disconnect) (rpmmqtt mqtt); + rpmRC (*connect) (rpmmqtt mqtt); + rpmRC (*isconnected) (rpmmqtt mqtt); + rpmRC (*unsubscribe) (rpmmqtt mqtt, const char *topic); + rpmRC (*subscribe) (rpmmqtt mqtt, const char *topic, int qos); + rpmRC (*unsubscribeMany) (rpmmqtt mqtt, int ac, char ** av); + rpmRC (*subscribeMany) (rpmmqtt mqtt, int ac, char ** av); + rpmRC (*sendMessage) (rpmmqtt mqtt, const char * topic, + const char *s, size_t ns); +}; -#ifdef __cplusplus -extern "C" { -#endif +static inline +rpmRC rpmmqttDestroy(rpmmqtt mqtt) +{ + rpmRC rc = mqtt && mqtt->vec && mqtt->vec->destroy + ? (mqtt->vec->destroy) (mqtt) + : RPMRC_FAIL; +SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, mqtt, rc)); + return rc; +} + +static inline +rpmRC rpmmqttCreate(rpmmqtt mqtt) +{ + rpmRC rc = mqtt && mqtt->vec && mqtt->vec->create + ? (mqtt->vec->create) (mqtt) + : RPMRC_FAIL; +SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, mqtt, rc)); + return rc; +} + +static inline +rpmRC rpmmqttDisconnect(rpmmqtt mqtt) +{ + rpmRC rc = mqtt && mqtt->vec && mqtt->vec->disconnect + ? (mqtt->vec->disconnect) (mqtt) + : RPMRC_FAIL; +SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, mqtt, rc)); + return rc; +} + +static inline +rpmRC rpmmqttConnect(rpmmqtt mqtt) +{ + rpmRC rc = mqtt && mqtt->vec && mqtt->vec->connect + ? (mqtt->vec->connect) (mqtt) + : RPMRC_FAIL; +SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, mqtt, rc)); + return rc; +} + +static inline +rpmRC rpmmqttIsConnected(rpmmqtt mqtt) +{ + rpmRC rc = mqtt && mqtt->vec && mqtt->vec->isconnected + ? (mqtt->vec->isconnected) (mqtt) + : RPMRC_NOTFOUND; /* XXX */ +SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, mqtt, rc)); + return rc; +} + +static inline +rpmRC rpmmqttUnsubscribe(rpmmqtt mqtt, const char *topic) +{ + rpmRC rc = mqtt && mqtt->vec && mqtt->vec->unsubscribe + ? (mqtt->vec->unsubscribe) (mqtt, topic) + : RPMRC_FAIL; +SPEW((stderr, "<-- %s(%p,\"%s\") rc %d\n", __FUNCTION__, mqtt, topic, rc)); + return rc; +} + +static inline +rpmRC rpmmqttSubscribe(rpmmqtt mqtt, const char *topic, int qos) +{ + rpmRC rc = mqtt && mqtt->vec && mqtt->vec->subscribe + ? (mqtt->vec->subscribe) (mqtt, topic, qos) + : RPMRC_FAIL; +SPEW((stderr, "<-- %s(%p,%p[%d]) rc %d\n", __FUNCTION__, mqtt, topic, qos, rc)); + return rc; +} + +static inline +rpmRC rpmmqttUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av) +{ + rpmRC rc = mqtt && mqtt->vec && mqtt->vec->unsubscribeMany + ? (mqtt->vec->unsubscribeMany) (mqtt, ac, av) + : RPMRC_FAIL; +SPEW((stderr, "<-- %s(%p,%p[%d]) rc %d\n", __FUNCTION__, mqtt, av, ac, rc)); + return rc; +} + +static inline +rpmRC rpmmqttSubscribeMany(rpmmqtt mqtt, int ac, char ** av) +{ + rpmRC rc = mqtt && mqtt->vec && mqtt->vec->subscribeMany + ? (mqtt->vec->subscribeMany) (mqtt, ac, av) + : RPMRC_FAIL; +SPEW((stderr, "<-- %s(%p,%p[%d]) rc %d\n", __FUNCTION__, mqtt, av, ac, rc)); + return rc; +} + +static inline +rpmRC rpmmqttSendMessage(rpmmqtt mqtt, const char * topic, + const char *s, size_t ns) +{ + rpmRC rc = mqtt && mqtt->vec && mqtt->vec->sendMessage + ? (mqtt->vec->sendMessage) (mqtt, topic, s, ns) + : RPMRC_FAIL; +SPEW((stderr, "<-- %s(%p,%p[%u]) rc %d\n", __FUNCTION__, mqtt, s, (unsigned)ns, rc)); + return rc; +} + +#endif /* _RPMMQTT_INTERNAL */ /** * Unreference a mqtt wrapper instance. @@ -171,21 +298,6 @@ RPM_GNUC_NULL_TERMINATED; int rpmmqttExpandNumeric(rpmmqtt mqtt, const char *arg); -rpmRC rpmmqttConnect(rpmmqtt mqtt); - -rpmRC rpmmqttDisconnect(rpmmqtt mqtt); - -rpmRC rpmmqttSendMessage(rpmmqtt mqtt, const char * topic, - const char *s, size_t ns); - -rpmRC rpmmqttSubscribeMany(rpmmqtt mqtt, int ac, char ** av); - -rpmRC rpmmqttUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av); - -rpmRC rpmmqttSubscribe(rpmmqtt mqtt, const char *topic, int qos); - -rpmRC rpmmqttUnsubscribe(rpmmqtt mqtt, const char *topic); - rpmRC rpmmqttPub(rpmmqtt mqtt, const char * topic, const char *s, size_t ns); rpmRC rpmmqttSub(rpmmqtt mqtt, const char *s, size_t ns); @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/tmqtt.c ============================================================================ $ cvs diff -u -r1.1.2.12 -r1.1.2.13 tmqtt.c --- rpm/rpmio/tmqtt.c 6 Jul 2016 13:26:36 -0000 1.1.2.12 +++ rpm/rpmio/tmqtt.c 11 Jul 2016 20:26:53 -0000 1.1.2.13 @@ -32,17 +32,12 @@ int qos = 2; xx = rpmmqttConnect(mqtt); + xx = rpmmqttSubscribe(mqtt, av[0], qos); xx = rpmmqttUnsubscribe(mqtt, av[0]); xx = rpmmqttSubscribeMany(mqtt, ac, (char **)av); xx = rpmmqttUnsubscribeMany(mqtt, ac, (char **)av); - xx = rpmmqttDisconnect(mqtt); -#ifdef DYING - (void) rpmmqttSub(mqtt, "rpm/#?qos=0", 0); - (void) rpmmqttSub(mqtt, "$SYS/broker/version?qos=0", 0); -#endif - xx = rpmmqttPub(mqtt, NULL, "bzzt ...", 0); xx = rpmmqttPub(mqtt, NULL, "bzzT ...", 0); xx = rpmmqttPub(mqtt, NULL, "bzZT ...", 0); @@ -50,6 +45,8 @@ xx = rpmmqttPub(mqtt, NULL, "BZZT ...", 0); xx = rpmmqttPub(mqtt, NULL, "SWAT !!!", 0); + xx = rpmmqttDisconnect(mqtt); + return rc; } @@ -115,5 +112,7 @@ } mqtt = rpmmqttFree(mqtt); + (void) rpmioClean(); + return rc; } @@ . ______________________________________________________________________ RPM Package Manager http://rpm5.org CVS Sources Repository rpm-cvs@rpm5.org