RPM Package Manager, CVS Repository http://rpm5.org/cvs/ ____________________________________________________________________________
Server: rpm5.org Name: Jeff Johnson Root: /v/rpm/cvs Email: j...@rpm5.org Module: rpm Date: 02-Jul-2016 11:33:44 Branch: rpm-5_4 Handle: 2016070209334400 Modified files: (Branch: rpm-5_4) rpm/rpmio rpmmqtt.c rpmmqtt.h tmqtt.c Log: - mqtt: wire up the cli options. Summary: Revision Changes Path 1.1.2.10 +316 -185 rpm/rpmio/rpmmqtt.c 1.1.2.9 +34 -30 rpm/rpmio/rpmmqtt.h 1.1.2.6 +65 -46 rpm/rpmio/tmqtt.c ____________________________________________________________________________ patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmqtt.c ============================================================================ $ cvs diff -u -r1.1.2.9 -r1.1.2.10 rpmmqtt.c --- rpm/rpmio/rpmmqtt.c 1 Jul 2016 14:43:47 -0000 1.1.2.9 +++ rpm/rpmio/rpmmqtt.c 2 Jul 2016 09:33:44 -0000 1.1.2.10 @@ -30,6 +30,118 @@ #define MF_ISSET(_FLAG) F_ISSET(mqtt->flags, _FLAG) /*==============================================================*/ +struct rpmmqtt_s _mqtt; + +#ifndef DYING +static void dumpU(const char * msg, urlinfo u) +{ + FILE * fp = stderr; + const char * url = (u ? u->url : NULL); + const char *path = NULL; + urltype ut = urlPath(url, &path);; + + if (msg) + fprintf(fp, "===================================== %s u %p ut %d\n", + msg, u, ut); + if (u) { + + fprintf(fp, " url: %s\n", u->url); + fprintf(fp, " scheme: %s\n", u->scheme); + fprintf(fp, " user: %s\n", u->user); + fprintf(fp, "password: %s\n", u->password); + fprintf(fp, " host: %s\n", u->host); + fprintf(fp, " portstr: %s\n", u->portstr); + fprintf(fp, " path: %s\n", path); + fprintf(fp, " query: %s\n", u->query); + fprintf(fp, "fragment: %s\n", u->fragment); + if (u->query) { + ARGV_t av = NULL; + int xx; + xx = argvSplit(&av, u->query, ","); + argvPrint(u->query, av, fp); + av = argvFree(av); + } + } +} +#endif + +static void dumpMQTT(const char * msg, rpmmqtt mqtt) +{ + FILE * fp = stderr; + + if (msg) + fprintf(fp, "===================================== %s(%p)\n", + msg, mqtt); + if (mqtt) { + +#define PRINT(_fmt, _foo) fprintf(fp, "%19s: %"#_fmt"\n", #_foo, mqtt->_foo) + PRINT(s, _address); + PRINT(s, _infile); + PRINT(s, idprefix); + PRINT(s, _message); + + PRINT(s, cafile); + PRINT(s, _capath); + PRINT(s, cert); + PRINT(s, privkey); + PRINT(s, ciphers); + PRINT(s, _tls_version); + PRINT(s, _psk_key); + PRINT(s, _psk_identity); + PRINT(s, _proxy); + + PRINT(d, _max_msg_count); + +fprintf(stderr, "====================\n"); + PRINT(d, keepalive); + PRINT(d, max_inflight); + PRINT(s, protocol_version); + PRINT(s, will_message); + PRINT(d, will_qos); + PRINT(s, will_topic); + + PRINT(s, user); + PRINT(s, password); + PRINT(s, host); + PRINT(d, port); + PRINT(s, uri); + + PRINT(s, _clientid); + PRINT(s, clientid); + + PRINT(u, qos); + + if (mqtt->_topics) + argvPrint("mqtt->_topics", (ARGV_t)mqtt->_topics, fp); + PRINT(s, topic); + +fprintf(stderr, "====================\n"); + PRINT(d, ut); + dumpU("mqtt->u", mqtt->u); + PRINT(p, C); + PRINT(u, persist_type); + PRINT(p, persist_ctx); + PRINT(s, persist_path); + + PRINT(u, token); + PRINT(u, timeout); + + PRINT(d, debug); + PRINT(d, trace); + PRINT(d, finished); + + PRINT(s, serverURI); + PRINT(d, MQTTVersion); + PRINT(d, sessionPresent); + + PRINT(s, dn); +#undef PRINT + + + } +} + +/*==============================================================*/ typedef struct key_s { int v; const char *n; @@ -91,10 +203,10 @@ /*==============================================================*/ #ifdef WITH_MQTT -static int onMessageArrived(void * _mqtt, char * topic, int topicLen, +static int onMessageArrived(void * _ctx, char * topic, int topicLen, MQTTAsync_message * message) { - rpmmqtt mqtt = (rpmmqtt) _mqtt; + rpmmqtt mqtt = (rpmmqtt) _ctx; int rc = 1; (void)mqtt; @@ -115,17 +227,17 @@ return rc; } -static void onDeliveryComplete(void * _mqtt, int token) +static void onDeliveryComplete(void * _ctx, int token) { - rpmmqtt mqtt = (rpmmqtt) _mqtt; + rpmmqtt mqtt = (rpmmqtt) _ctx; if (_rpmmqtt_debug < 0) rpmlog(RPMLOG_DEBUG, "--- MQTT done(%d)\n", token); mqtt->token = token; } -static void onConnectionLost(void * _mqtt, char *cause) +static void onConnectionLost(void * _ctx, char *cause) { - rpmmqtt mqtt = (rpmmqtt) _mqtt; + rpmmqtt mqtt = (rpmmqtt) _ctx; rpmlog(RPMLOG_DEBUG, "--- MQTT disconnect(%s) version(%d) present(%d)\n", @@ -135,14 +247,13 @@ rpmlog(RPMLOG_WARNING, "MQTT reconnecting(%s) ...\n", mqtt->serverURI); - mqtt->connected = 0; (void) rpmmqttConnect(mqtt); } -static void onDisconnectFailure(void * _mqtt, MQTTAsync_failureData * response) +static void onDisconnectFailure(void * _ctx, MQTTAsync_failureData * response) { - rpmmqtt mqtt = (rpmmqtt) _mqtt; -fprintf(stderr, "<-- %s(%p,%p) MQTT disconnect failed\n", __FUNCTION__, _mqtt, response); + rpmmqtt mqtt = (rpmmqtt) _ctx; +fprintf(stderr, "<-- %s(%p,%p) MQTT disconnect failed\n", __FUNCTION__, _ctx, response); if (response) { const char *s = response->message; int token = response->token; @@ -152,42 +263,39 @@ token, code, s); } else rpmlog(RPMLOG_WARNING, "MQTT disconnect failed\n"); - mqtt->connected = 0; mqtt->finished = 1; } -static void onDisconnect(void * _mqtt, MQTTAsync_successData * response) +static void onDisconnect(void * _ctx, MQTTAsync_successData * response) { - rpmmqtt mqtt = (rpmmqtt) _mqtt; + rpmmqtt mqtt = (rpmmqtt) _ctx; if (mqtt->debug || _rpmmqtt_debug) rpmlog(RPMLOG_DEBUG, "MQTT disconnect(%s) version(%d) present(%d)\n", mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent); mqtt->serverURI = _free(mqtt->serverURI); - mqtt->connected = 0; mqtt->finished = 1; } -static void onConnectFailure(void * _mqtt, MQTTAsync_failureData * response) +static void onConnectFailure(void * _ctx, MQTTAsync_failureData * response) { - rpmmqtt mqtt = (rpmmqtt) _mqtt; -fprintf(stderr, "<-- %s(%p,%p) MQTT connect failed\n", __FUNCTION__, _mqtt, response); + rpmmqtt mqtt = (rpmmqtt) _ctx; +fprintf(stderr, "<-- %s(%p,%p) MQTT connect failed\n", __FUNCTION__, _ctx, response); if (response) { const char *s = response->message; int token = response->token; int code = response->code; - rpmlog(RPMLOG_WARNING, "MQTT connect failed: code(%d) msg %s\n", + rpmlog(RPMLOG_WARNING, "MQTT connect failed: token(%d) code(%d) msg %s\n", token, code, s); } else rpmlog(RPMLOG_WARNING, "MQTT connect failed\n"); - mqtt->connected = 0; mqtt->finished = 1; } -static void onConnect(void * _mqtt, MQTTAsync_successData * response) +static void onConnect(void * _ctx, MQTTAsync_successData * response) { - rpmmqtt mqtt = (rpmmqtt) _mqtt; + rpmmqtt mqtt = (rpmmqtt) _ctx; if (response) { mqtt->serverURI = xstrdup(response->alt.connect.serverURI); @@ -199,14 +307,13 @@ rpmlog(RPMLOG_DEBUG, "MQTT connect(%s) version(%d) present(%d)\n", mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent); - mqtt->connected = 1; mqtt->finished = 1; } -static void onSubscribeFailure(void * _mqtt, MQTTAsync_failureData * response) +static void onSubscribeFailure(void * _ctx, MQTTAsync_failureData * response) { - rpmmqtt mqtt = (rpmmqtt) _mqtt; -fprintf(stderr, "<-- %s(%p,%p) MQTT connect failed\n", __FUNCTION__, _mqtt, response); + rpmmqtt mqtt = (rpmmqtt) _ctx; +fprintf(stderr, "<-- %s(%p,%p) MQTT connect failed\n", __FUNCTION__, _ctx, response); if (response) { const char *s = response->message; int token = response->token; @@ -215,24 +322,22 @@ token, code, s); } else rpmlog(RPMLOG_WARNING, "MQTT subscribe failed\n"); - mqtt->subscribed = 0; mqtt->finished = 1; } -static void onSubscribe(void * _mqtt, MQTTAsync_successData * response) +static void onSubscribe(void * _ctx, MQTTAsync_successData * response) { - rpmmqtt mqtt = (rpmmqtt) _mqtt; + rpmmqtt mqtt = (rpmmqtt) _ctx; int qos = response->alt.qos; if (mqtt->debug || _rpmmqtt_debug) rpmlog(RPMLOG_DEBUG, "MQTT subscribe qos(%d)\n", qos); - mqtt->subscribed = 1; mqtt->finished = 1; } -static void onSendFailure(void * _mqtt, MQTTAsync_failureData * response) +static void onSendFailure(void * _ctx, MQTTAsync_failureData * response) { - rpmmqtt mqtt = (rpmmqtt) _mqtt; + rpmmqtt mqtt = (rpmmqtt) _ctx; { const char *s = response->message; @@ -244,9 +349,9 @@ mqtt->finished = 1; } -static void onSend(void * _mqtt, MQTTAsync_successData * response) +static void onSend(void * _ctx, MQTTAsync_successData * response) { - rpmmqtt mqtt = (rpmmqtt) _mqtt; + rpmmqtt mqtt = (rpmmqtt) _ctx; if (mqtt->debug || _rpmmqtt_debug) { const char * s = response->alt.pub.message.payload; @@ -266,15 +371,15 @@ /*==============================================================*/ #ifdef WITH_MQTT -static int rpmmqttOpen(void **_mqttp, const char *clientID, const char *serverURI, - void *_mqtt) +static int rpmmqttOpen(void **_ctxp, const char *clientID, const char *serverURI, + void *_ctx) { - rpmmqtt mqtt = (rpmmqtt) _mqtt; + rpmmqtt mqtt = (rpmmqtt) _ctx; char * dn; char * te; int rc = MQTTCLIENT_PERSISTENCE_ERROR; - *_mqttp = _mqtt; + *_ctxp = _ctx; mqtt->dn = _free(mqtt->dn); dn = rpmGetPath(mqtt->persist_path, "/", clientID, "-", serverURI, NULL); @@ -291,14 +396,14 @@ exit: if (mqtt->debug || _rpmmqtt_debug < 0) -fprintf(stderr, "<-- %s(%p,\"%s\",\"%s\",%p) rc %d dn %s\n", __FUNCTION__, _mqttp, clientID, serverURI, _mqtt, rc, mqtt->dn); +fprintf(stderr, "<-- %s(%p,\"%s\",\"%s\",%p) rc %d dn %s\n", __FUNCTION__, _ctxp, clientID, serverURI, _ctx, rc, mqtt->dn); return rc; } -static int rpmmqttClose(void *_mqtt) +static int rpmmqttClose(void *_ctx) { - rpmmqtt mqtt = (rpmmqtt) _mqtt; + rpmmqtt mqtt = (rpmmqtt) _ctx; int rc = MQTTCLIENT_PERSISTENCE_ERROR; @@ -314,14 +419,14 @@ exit: if (mqtt->debug || _rpmmqtt_debug < 0) -fprintf(stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, _mqtt, rc); +fprintf(stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, _ctx, rc); return rc; } -static int rpmmqttPut(void *_mqtt, char *key, +static int rpmmqttPut(void *_ctx, char *key, int bufcount, char *buffers[], int buflens[]) { - rpmmqtt mqtt = (rpmmqtt) _mqtt; + rpmmqtt mqtt = (rpmmqtt) _ctx; char *fn = NULL; FD_t fd = NULL; size_t nb = 0; @@ -359,15 +464,15 @@ if (fd) Fclose(fd); if (mqtt->debug || _rpmmqtt_debug < 0) -fprintf(stderr, "<-- %s(%p,\"%s\",%d,%p,%p) rc %d fn %s\n", __FUNCTION__, _mqtt, key, bufcount, buffers, buflens, rc, fn); +fprintf(stderr, "<-- %s(%p,\"%s\",%d,%p,%p) rc %d fn %s\n", __FUNCTION__, _ctx, key, bufcount, buffers, buflens, rc, fn); fn = _free(fn); return rc; } -static int rpmmqttGet(void *_mqtt, char *key, +static int rpmmqttGet(void *_ctx, char *key, char *buffer[], int *buflen) { - rpmmqtt mqtt = (rpmmqtt) _mqtt; + rpmmqtt mqtt = (rpmmqtt) _ctx; char *fn = NULL; FD_t fd = NULL; size_t nr = 0; @@ -405,16 +510,16 @@ if (fd) Fclose(fd); if (mqtt->debug || _rpmmqtt_debug < 0) -fprintf(stderr, "<-- %s(%p,\"%s\",%p,%p) rc %d fn %s\n", __FUNCTION__, _mqtt, key, buffer, buflen, rc, fn); +fprintf(stderr, "<-- %s(%p,\"%s\",%p,%p) rc %d fn %s\n", __FUNCTION__, _ctx, key, buffer, buflen, rc, fn); fn = _free(fn); *buffer = b; *buflen = nb; return rc; } -static int rpmmqttRemove(void *_mqtt, char *key) +static int rpmmqttRemove(void *_ctx, char *key) { - rpmmqtt mqtt = (rpmmqtt) _mqtt; + rpmmqtt mqtt = (rpmmqtt) _ctx; char *fn = NULL; int rc = MQTTCLIENT_PERSISTENCE_ERROR; @@ -430,14 +535,14 @@ exit: if (mqtt->debug || _rpmmqtt_debug < 0) -fprintf(stderr, "<-- %s(%p,\"%s\") rc %d fn %s\n", __FUNCTION__, _mqtt, key, rc, fn); +fprintf(stderr, "<-- %s(%p,\"%s\") rc %d fn %s\n", __FUNCTION__, _ctx, key, rc, fn); fn = _free(fn); return rc; } -static int rpmmqttKeys(void *_mqtt, char ***keys, int *nkeys) +static int rpmmqttKeys(void *_ctx, char ***keys, int *nkeys) { - rpmmqtt mqtt = (rpmmqtt) _mqtt; + rpmmqtt mqtt = (rpmmqtt) _ctx; ARGV_t av = NULL; int ac = 0; DIR * dir = NULL; @@ -468,15 +573,15 @@ if (dir) (void) Closedir(dir); if (mqtt->debug || _rpmmqtt_debug < 0) -fprintf(stderr, "<-- %s(%p,%p,%p) rc %d keys %p[%u]\n", __FUNCTION__, _mqtt, keys, nkeys, rc, av, ac); +fprintf(stderr, "<-- %s(%p,%p,%p) rc %d keys %p[%u]\n", __FUNCTION__, _ctx, keys, nkeys, rc, av, ac); *keys = (char **) av; *nkeys = ac; return rc; } -static int rpmmqttClear(void *_mqtt) +static int rpmmqttClear(void *_ctx) { - rpmmqtt mqtt = (rpmmqtt) _mqtt; + rpmmqtt mqtt = (rpmmqtt) _ctx; DIR * dir = NULL; struct dirent *dp; int nerrs = 0; @@ -506,13 +611,13 @@ if (dir) (void) Closedir(dir); if (mqtt->debug || _rpmmqtt_debug < 0) -fprintf(stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, _mqtt, rc); +fprintf(stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, _ctx, rc); return rc; } -static int rpmmqttContainsKey(void *_mqtt, char *key) +static int rpmmqttContainsKey(void *_ctx, char *key) { - rpmmqtt mqtt = (rpmmqtt) _mqtt; + rpmmqtt mqtt = (rpmmqtt) _ctx; DIR * dir = NULL; struct dirent *dp; int rc = MQTTCLIENT_PERSISTENCE_ERROR; @@ -542,7 +647,7 @@ if (dir) (void) Closedir(dir); if (mqtt->debug || _rpmmqtt_debug < 0) -fprintf(stderr, "<-- %s(%p,\"%s\") rc %d\n", __FUNCTION__, _mqtt, key, rc); +fprintf(stderr, "<-- %s(%p,\"%s\") rc %d\n", __FUNCTION__, _ctx, key, rc); return rc; } @@ -561,6 +666,17 @@ #endif /* WITH_MQTT */ /*==============================================================*/ +static const MQTTAsync_willOptions _Wopts + = MQTTAsync_willOptions_initializer; +static const MQTTAsync_connectOptions _Copts + = MQTTAsync_connectOptions_initializer; +static const MQTTAsync_SSLOptions _Sopts + = MQTTAsync_SSLOptions_initializer; +static const MQTTAsync_disconnectOptions _Dopts = + MQTTAsync_disconnectOptions_initializer; +static const MQTTAsync_responseOptions _Ropts = + MQTTAsync_responseOptions_initializer; + int rpmmqttConnect(rpmmqtt mqtt) { int rc = -1; @@ -570,46 +686,56 @@ } else { urlinfo u = mqtt->u; - MQTTAsync_willOptions Wopts = MQTTAsync_willOptions_initializer; + MQTTAsync_willOptions *Wopts = &mqtt->Wopts; + memcpy(Wopts, &_Wopts, sizeof(*Wopts)); #ifdef REF - memcpy(Wopts.struct_id, "MQTW", 4); - Wopts.struct_version = 0; + memcpy(Wopts->struct_id, "MQTW", 4); + Wopts->struct_version = 0; #endif - Wopts.topicName = mqtt->topic; - Wopts.message = (mqtt->_will_message ? mqtt->_will_message : ""); - Wopts.retained = (MF_ISSET(WILL_RETAIN) ? 1 : 0); - Wopts.qos = mqtt->_will_qos; + Wopts->topicName = (mqtt->will_topic ? mqtt->will_topic : mqtt->topic); + Wopts->message = (mqtt->will_message ? mqtt->will_message : ""); + Wopts->retained = (MF_ISSET(WILL_RETAIN) ? 1 : 0); + Wopts->qos = mqtt->will_qos; /* XXX mqtt->qos? */ - MQTTAsync_connectOptions Copts = MQTTAsync_connectOptions_initializer; + MQTTAsync_connectOptions *Copts = &mqtt->Copts; + memcpy(Copts, &_Copts, sizeof(*Copts)); #ifdef REF - memcpy(Copts.struct_id, "MQTC", 4); - Copts.struct_version = 3; /* 0-4 enables fields below */ + memcpy(Copts->struct_id, "MQTC", 4); + Copts->struct_version = 3; /* 0-4 enables fields below */ #endif - Copts.keepAliveInterval = 10; /* 60 */ - Copts.cleansession = 0; /* 1 discards session state */ - Copts.maxInflight = 10; /* 10 */ - - Copts.will = &Wopts; /* last will */ - Copts.username = (u->user ? u->user : mqtt->_user);; - Copts.password = (u->password ? u->password : mqtt->_password); - Copts.connectTimeout = 30; /* secs */ - Copts.retryInterval = 0; /* secs */ -#ifdef REF - MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer; - ssl_opts.enableServerCertAuth = 0; - Copts.ssl = &ssl_opts; + Copts->keepAliveInterval = mqtt->keepalive; + Copts->cleansession = (MF_ISSET(CLEAN) ? 1 : 0); + Copts->maxInflight = mqtt->max_inflight; + + Copts->will = Wopts; /* last will */ + Copts->username = (u && u->user ? u->user : mqtt->user);; + Copts->password = (u && u->password ? u->password : mqtt->password); + + Copts->connectTimeout = 30; /* XXX secs. configure?*/ + Copts->retryInterval = 0; /* XXX secs. configure? */ + +#ifdef NOTYET + MQTTAsync_SSLOptions *Sopts = &mqtt->Sopts; + memcpy(Sopts, &_Sopts, sizeof(*Sopts)); + Sopts->trustStore = mqtt->cafile; + Sopts->keyStore = mqtt->cert; + Sopts->privateKey = mqtt->privkey; + Sopts->enabledCipherSuites = mqtt->ciphers; + Sopts->enableServerCertAuth = (MF_ISSET(INSECURE) ? 0 : 1); + Copts->ssl = Sopts; #else - Copts.ssl = NULL; + (void)_Sopts; + Copts->ssl = NULL; #endif - Copts.onSuccess = onConnect; - Copts.onFailure = onConnectFailure; - Copts.context = mqtt; - Copts.serverURIcount = 0; - Copts.serverURIs = NULL; + Copts->onSuccess = onConnect; + Copts->onFailure = onConnectFailure; + Copts->context = mqtt; + Copts->serverURIcount = 0; + Copts->serverURIs = NULL; mqtt->finished = 0; rc = check(mqtt, "connect", - MQTTAsync_connect(mqtt->C, &Copts)); + MQTTAsync_connect(mqtt->C, Copts)); while (!mqtt->finished) usleep(1000); @@ -624,20 +750,20 @@ int rc = -1; #ifdef WITH_MQTT if (MQTTAsync_isConnected(mqtt->C)) { - MQTTAsync_disconnectOptions Dopts = - MQTTAsync_disconnectOptions_initializer; + MQTTAsync_disconnectOptions *Dopts = &mqtt->Dopts; + memcpy(Dopts, &_Dopts, sizeof(*Dopts)); #ifdef REF - memcpy(Dopts.struct_id, "MQTD", 4); - Dopts.struct_version = 0; + memcpy(Dopts->struct_id, "MQTD", 4); + Dopts->struct_version = 0; #endif - Dopts.timeout = mqtt->msecs; - Dopts.onSuccess = onDisconnect; - Dopts.onFailure = onDisconnectFailure; - Dopts.context = mqtt; + Dopts->timeout = mqtt->timeout; + Dopts->onSuccess = onDisconnect; + Dopts->onFailure = onDisconnectFailure; + Dopts->context = mqtt; mqtt->finished = 0; rc = check(mqtt, "disconnect", - MQTTAsync_disconnect(mqtt->C, &Dopts)); + MQTTAsync_disconnect(mqtt->C, Dopts)); while (!mqtt->finished) usleep(100); } @@ -666,21 +792,20 @@ pubmsg.msgid = 0; #endif - MQTTAsync_responseOptions Ropts = - MQTTAsync_responseOptions_initializer; + MQTTAsync_responseOptions *Ropts = &mqtt->Ropts; + memcpy(Ropts, &_Ropts, sizeof(*Ropts)); #ifdef REF - memcpy(pubmsg.struct_id, "MQTR", 4); - pubmsg.struct_version = 0; + memcpy(Ropts->struct_id, "MQTR", 4); + Ropts->struct_version = 0; #endif - Ropts.onSuccess = onSend; - Ropts.onFailure = onSendFailure; - Ropts.context = mqtt; - Ropts.token = 0; + Ropts->onSuccess = onSend; + Ropts->onFailure = onSendFailure; + Ropts->context = mqtt; + Ropts->token = 0; mqtt->finished = 0; rc = check(mqtt, "sendMessage", - MQTTAsync_sendMessage(mqtt->C, mqtt->topic, &pubmsg, - &Ropts)); + MQTTAsync_sendMessage(mqtt->C, mqtt->topic, &pubmsg, Ropts)); while (!mqtt->finished) usleep(100); #endif /* WITH_MQTT */ @@ -748,23 +873,24 @@ rpmlog(_lvl, "%19s: %s qos(%u)\n", "subtopic", subtopic, subqos); - MQTTAsync_responseOptions Ropts = - MQTTAsync_responseOptions_initializer; + MQTTAsync_responseOptions *Ropts = &mqtt->Ropts; + memcpy(Ropts, &_Ropts, sizeof(*Ropts)); #ifdef REF - memcpy(pubmsg.struct_id, "MQTR", 4); - pubmsg.struct_version = 0; + memcpy(Ropts->struct_id, "MQTR", 4); + Ropts->struct_version = 0; #endif - Ropts.onSuccess = onSubscribe; - Ropts.onFailure = onSubscribeFailure; - Ropts.context = mqtt; + Ropts->onSuccess = onSubscribe; + Ropts->onFailure = onSubscribeFailure; + Ropts->context = mqtt; mqtt->finished = 0; rc = check(mqtt, "subscribe", - MQTTAsync_subscribe(mqtt->C, subtopic, subqos, &Ropts)); - while (!mqtt->finished) + MQTTAsync_subscribe(mqtt->C, subtopic, subqos, Ropts)); +fprintf(stderr, "*** finished(%d)\n", mqtt->finished); + while (rc == 0 && !mqtt->finished) usleep(100); - ret = 0; /* XXX */ + ret = (rc ? -1 : 0); subtopic = _free(subtopic); @@ -775,9 +901,9 @@ } /*==============================================================*/ -static void rpmmqttFini(void * _mqtt) +static void rpmmqttFini(void * _ctx) { - rpmmqtt mqtt = (rpmmqtt) _mqtt; + rpmmqtt mqtt = (rpmmqtt) _ctx; #ifdef WITH_MQTT (void) rpmmqttDisconnect(mqtt); @@ -788,23 +914,23 @@ /* ========== */ mqtt->_address = _free(mqtt->_address); mqtt->_infile = _free(mqtt->_infile); - mqtt->_host = _free(mqtt->_host); - mqtt->_id = _free(mqtt->_id); - mqtt->_idprefix = _free(mqtt->_idprefix); + mqtt->host = _free(mqtt->host); + mqtt->_clientid = _free(mqtt->_clientid); + mqtt->idprefix = _free(mqtt->idprefix); mqtt->_message = _free(mqtt->_message); - mqtt->_password = _free(mqtt->_password); - mqtt->_topic = argvFree((ARGV_t)mqtt->_topic); - mqtt->_user = _free(mqtt->_user); - mqtt->_version = _free(mqtt->_version); - - mqtt->_will_message = _free(mqtt->_will_message); - mqtt->_will_topic = _free(mqtt->_will_topic); - mqtt->_cafile = _free(mqtt->_cafile); + mqtt->password = _free(mqtt->password); + mqtt->_topics = argvFree((ARGV_t)mqtt->_topics); + mqtt->user = _free(mqtt->user); + mqtt->protocol_version = _free(mqtt->protocol_version); /* XXX malloc? */ + + mqtt->will_message = _free(mqtt->will_message); + mqtt->will_topic = _free(mqtt->will_topic); + mqtt->cafile = _free(mqtt->cafile); mqtt->_capath = _free(mqtt->_capath); - mqtt->_cert = _free(mqtt->_cert); - mqtt->_privkey = _free(mqtt->_privkey); - mqtt->_ciphers = _free(mqtt->_ciphers); - mqtt->_tls_version = _free(mqtt->_tls_version); + mqtt->cert = _free(mqtt->cert); + mqtt->privkey = _free(mqtt->privkey); + mqtt->ciphers = _free(mqtt->ciphers); + mqtt->_tls_version = _free(mqtt->_tls_version); /* XXX malloc? */ mqtt->_psk_key = _free(mqtt->_psk_key); mqtt->_psk_identity = _free(mqtt->_psk_identity); mqtt->_proxy = _free(mqtt->_proxy); @@ -828,39 +954,6 @@ RPMIOPOOL_MODULE(mqtt) -#ifdef DYING -static void dumpURL(const char * msg, urlinfo u) -{ - FILE * fp = stderr; - const char * url = (u ? u->url : NULL); - const char *path = NULL; - urltype ut = urlPath(url, &path);; - - if (msg) - fprintf(fp, "===================================== %s u %p ut %d\n", - msg, u, ut); - if (u) { - - fprintf(fp, " url: %s\n", u->url); - fprintf(fp, " scheme: %s\n", u->scheme); - fprintf(fp, " user: %s\n", u->user); - fprintf(fp, "password: %s\n", u->password); - fprintf(fp, " host: %s\n", u->host); - fprintf(fp, " portstr: %s\n", u->portstr); - fprintf(fp, " query: %s\n", u->query); - if (u->query) { - ARGV_t av = NULL; - int xx; - xx = argvSplit(&av, u->query, ","); - argvPrint(u->query, av, fp); - av = argvFree(av); - } - fprintf(fp, "fragment: %s\n", u->fragment); - fprintf(fp, " path: %s\n", path); - } -} -#endif - rpmmqtt rpmmqttNew(char ** av, uint32_t flags) { static const char *_av[] = { "mqtt://localhost:1883/rpm/%{pid}/mqtt", NULL }; @@ -868,6 +961,29 @@ urlinfo u = NULL; const char *s = NULL; + /* -- Copy (and initialize the defaults) from the CLI options. */ + { yarnLock use = mqtt->_item.use; + void *pool = mqtt->_item.pool; +#ifdef WTF + *mqtt = _mqtt; /* structure assignment */ +#else + memcpy(mqtt, &_mqtt, sizeof(*mqtt)); +#endif + mqtt->_item.use = use; + mqtt->_item.pool = pool; + } + memset(&_mqtt, 0, sizeof(_mqtt)); + _mqtt.flags |= (MQTT_FLAGS_CLEAN|MQTT_FLAGS_EOL); + _mqtt.port = 1883; + _mqtt.max_inflight = 20; + _mqtt.keepalive = 60; + _mqtt.qos = 0; + _mqtt.protocol_version = xstrdup("31"); /* XXX malloc? */ + _mqtt.host = rpmExpand("localhost", NULL); + _mqtt.idprefix = rpmExpand("rpm", NULL); + _mqtt._clientid = rpmExpand(_mqtt.idprefix, "-%%{pid}", NULL); + _mqtt._tls_version = xstrdup("1.2"); /* XXX malloc? */ + mqtt->flags = flags; mqtt->av = NULL; @@ -892,20 +1008,31 @@ mqtt->ut = urlSplit(mqtt->av[0], &u); mqtt->u = u; + if (u->scheme == NULL || !strcmp(u->scheme, "mqtt") || !strcmp(u->scheme, "mqtts")) { - if (u->portstr == NULL) + /* XXX propagate mqtt->port ??? */ + if (u->portstr == NULL) { u->portstr = !strcmp(u->scheme, "mqtts") ? xstrdup("8883") : xstrdup("1883"); + + } u->scheme = _free(u->scheme); u->scheme = xstrdup("tcp"); } #ifdef DYING -dumpURL(__FUNCTION__, u); +dumpU(__FUNCTION__, u); #endif + if (mqtt->user == NULL && u->user != NULL) + mqtt->user = xstrdup(u->user); + if (mqtt->password == NULL && u->password != NULL) + mqtt->password = xstrdup(u->password); + if (mqtt->port == 0 && u->port != 0) + mqtt->port = u->port; + mqtt->uri = rpmExpand(u->scheme, "://", u->host, ":", u->portstr, NULL); (void) urlPath(u->url, &s); @@ -921,10 +1048,11 @@ "%{?_mqtt_clientid}%{!?_mqtt_clientid:rpm}"; mqtt->clientid = rpmExpand(_mqtt_clientid, NULL); + /* XXX CLI options clobbered */ static const char _mqtt_qos[] = "%{?_mqtt_qos}%{!?_mqtt_qos:1}"; mqtt->qos = (rpmExpandNumeric(_mqtt_qos) % 3); static const char _mqtt_timeout[] = "%{?_mqtt_timeout}%{!?_mqtt_timeout:10000}"; - mqtt->msecs = rpmExpandNumeric(_mqtt_timeout); + mqtt->timeout = rpmExpandNumeric(_mqtt_timeout); if (u->query) { ARGV_t qav = NULL; @@ -944,7 +1072,7 @@ continue; } if (!strncmp(t, "timeout", (te - t)) && xisdigit(te[1])) { - mqtt->msecs = strtol(te+1, NULL, 0); + mqtt->timeout = strtol(te+1, NULL, 0); continue; } if (!strncmp(t, "trace", (te - t)) && xisdigit(te[1])) { @@ -988,10 +1116,11 @@ char *persist_path = rpmGetPath(_mqtt_cachedir, NULL); rpmlog(_lvl, "%19s: %s\n", "clientid", mqtt->clientid); - rpmlog(_lvl, "%19s: %s qos(%d) timeout(%u)\n", - "topic", mqtt->topic, mqtt->qos, mqtt->msecs); - rpmlog(_lvl, "%19s: %s type(%u)\n", - "persist", persist_path, mqtt->persist_type); + rpmlog(_lvl, "%19s: %s qos(%d) timeout(%umsecs)\n", + "topic", mqtt->topic, mqtt->qos, mqtt->timeout); + rpmlog(_lvl, "%19s: type(%u) %s\n", + "persist", mqtt->persist_type, + (mqtt->persist_type ? persist_path : "")); rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace); @@ -1014,7 +1143,9 @@ } break; } persist_path = _free(persist_path); - + +dumpMQTT(__FUNCTION__, mqtt); + xx = check(mqtt, "create", MQTTAsync_create(&mqtt->C, mqtt->uri, mqtt->clientid, mqtt->persist_type, mqtt->persist_ctx)); @@ -1042,19 +1173,19 @@ rpmlog(_lvl, "%19s: %s qos(%u)\n", "subtopic", t, subqos[i]); } - MQTTAsync_responseOptions Ropts = - MQTTAsync_responseOptions_initializer; + MQTTAsync_responseOptions *Ropts = &mqtt->Ropts; + memcpy(Ropts, &_Ropts, sizeof(*Ropts)); #ifdef REF - memcpy(pubmsg.struct_id, "MQTR", 4); - pubmsg.struct_version = 0; + memcpy(Ropts->struct_id, "MQTR", 4); + Ropts->struct_version = 0; #endif - Ropts.onSuccess = onSubscribe; - Ropts.onFailure = onSubscribeFailure; - Ropts.context = mqtt; + Ropts->onSuccess = onSubscribe; + Ropts->onFailure = onSubscribeFailure; + Ropts->context = mqtt; xx = check(mqtt, "subscribeMany", MQTTAsync_subscribeMany(&mqtt->C, - mqtt->ac-1, mqtt->av+1, subqos+1, &Ropts)); + mqtt->ac-1, mqtt->av+1, subqos+1, Ropts)); subqos = _free(subqos); #else for (int i = 1; i < mqtt->ac; i++) { @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmqtt.h ============================================================================ $ cvs diff -u -r1.1.2.8 -r1.1.2.9 rpmmqtt.h --- rpm/rpmio/rpmmqtt.h 1 Jul 2016 14:43:47 -0000 1.1.2.8 +++ rpm/rpmio/rpmmqtt.h 2 Jul 2016 09:33:44 -0000 1.1.2.9 @@ -14,8 +14,8 @@ typedef enum mqttFlags_e { MQTT_FLAGS_NONE = 0, - MQTT_FLAGS_NOCLEAN = _MFB( 0), /*!< (sub) -c */ - MQTT_FLAGS_NOMSGEOL = _MFB( 1), /*!< (sub) -N */ + MQTT_FLAGS_CLEAN = _MFB( 0), /*!< (sub) -c */ + MQTT_FLAGS_EOL = _MFB( 1), /*!< (sub) -N */ MQTT_FLAGS_NOSTALE = _MFB( 2), /*!< (sub) -R */ MQTT_FLAGS_STDIN_EACH = _MFB( 3), /*!< -l */ @@ -37,33 +37,29 @@ char ** av; int ac; -/* ========== */ mqttFlags flags; +/* ========== */ const char *_address; /*!< -A */ const char *_infile; /*!< -f */ - const char *_host; /*!< -h */ - const char *_id; /*!< -i */ - const char *_idprefix; /*!< -I */ - int _keepalive; /*!< -k */ + const char *host; /*!< -h */ + const char *idprefix; /*!< -I */ + int keepalive; /*!< -k */ const char *_message; /*!< -m */ - int _max_inflight; /*!< -M */ - int _port; /*!< -p */ - const char *_password; /*!< -P */ - int _qos; /*!< -q */ - int _retain; /*!< -r */ - const char **_topic; /*!< -t */ - const char *_user; /*!< -u */ - const char *_version; /*!< -V */ - - const char * _will_message; /*!< --will-payload */ - int _will_qos; /*!< --will-qos */ - int _will_retain; /*!< --will-retain */ - const char * _will_topic; /*!< --will-topic */ - const char *_cafile; /*!< --cafile */ + int max_inflight; /*!< -M */ + int port; /*!< -p */ + const char *password; /*!< -P */ + const char **_topics; /*!< -t */ + const char *user; /*!< -u */ + const char *protocol_version; /*!< -V */ + + const char * will_message; /*!< --will-payload */ + int will_qos; /*!< --will-qos */ + const char * will_topic; /*!< --will-topic */ + const char *cafile; /*!< --cafile */ const char *_capath; /*!< --capath */ - const char *_cert; /*!< --cert */ - const char *_privkey; /*!< --key */ - const char *_ciphers; /*!< --ciphers */ + const char *cert; /*!< --cert */ + const char *privkey; /*!< --key */ + const char *ciphers; /*!< --ciphers */ const char *_tls_version; /*!< --tls-version */ const char *_psk_key; /*!< --psk */ const char *_psk_identity; /*!< --psk-identity */ @@ -79,28 +75,36 @@ const char * uri; const char * topic; + const char *_clientid; /*!< -i */ const char * clientid; unsigned persist_type; void * persist_ctx; /* MQTTClient_persistence */ const char * persist_path; - const char *dn; - unsigned qos; - unsigned token; - unsigned msecs; + unsigned qos; /*<! -q */ + unsigned timeout; /* msecs */ int debug; int trace; volatile int finished; - volatile int connected; - volatile int subscribed; + volatile unsigned token; /* XXX MQTTClient_subscriptve.c */ char * serverURI; int MQTTVersion; int sessionPresent; + const char *dn; + + MQTTAsync_connectOptions Copts; + MQTTAsync_willOptions Wopts; + MQTTAsync_SSLOptions Sopts; + MQTTAsync_disconnectOptions Dopts; + MQTTAsync_responseOptions Ropts; }; + +extern struct rpmmqtt_s _mqtt; + #endif /* _RPMMQTT_INTERNAL */ #ifdef __cplusplus @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/tmqtt.c ============================================================================ $ cvs diff -u -r1.1.2.5 -r1.1.2.6 tmqtt.c --- rpm/rpmio/tmqtt.c 1 Jul 2016 14:43:47 -0000 1.1.2.5 +++ rpm/rpmio/tmqtt.c 2 Jul 2016 09:33:44 -0000 1.1.2.6 @@ -11,6 +11,10 @@ #include <poptIO.h> #include <rpmdefs.h> + +#ifdef WITH_MQTT +#include <MQTTAsync.h> +#endif #define _RPMMQTT_INTERNAL #include "rpmmqtt.h" @@ -25,8 +29,8 @@ int rc = 0; #ifdef DYING - (void) rpmmqttRead(mqtt, "rpm/#?qos=2", 0); - (void) rpmmqttRead(mqtt, "$SYS/broker/version?qos=2", 0); + (void) rpmmqttRead(mqtt, "rpm/#?qos=0", 0); + (void) rpmmqttRead(mqtt, "$SYS/broker/version?qos=0", 0); #endif nw = rpmmqttWrite(mqtt, "bzzt ...", 0); @@ -47,23 +51,12 @@ #define F_ISSET(_f, _FLAG) (((_f) & ((MQTT_FLAGS_##_FLAG) & ~0x40000000)) != MQTT_FLAGS_NONE) #define MF_ISSET(_FLAG) F_ISSET(mqtt->flags, _FLAG) -static struct rpmmqtt_s _mqtt = { - ._host = "localhost", /* "localhost" */ - ._id = "rpm-%{pid}", /* "mosquito_pub-%{pid}" */ - ._idprefix = "rpm", /* "mosquito_pub" */ - ._keepalive = 60, /* 60 */ - ._port = 1883, /* 1883 */ - ._qos = 0, /* 0 */ - ._version = "31", /* "mqttv31" */ - ._tls_version="1.2", /* "tlsv1.2" */ -}; - struct poptOption mqttOptionsTable[] = { - { NULL, 'c', POPT_BIT_SET, &_mqtt.flags, MQTT_FLAGS_NOCLEAN, + { "clean", 'c', POPT_ARG_VAL|POPT_ARGFLAG_XOR, &_mqtt.flags, MQTT_FLAGS_CLEAN, N_("(sub) Do not clean session."), NULL }, { NULL, 'C', POPT_ARG_INT, &_mqtt._max_msg_count, 0, N_("(sub) Disconnect/exit after <MSGCNT>."), N_("<MSGCNT>") }, - { NULL, 'N', POPT_BIT_SET, &_mqtt.flags, MQTT_FLAGS_NOMSGEOL, + { "eol", 'N', POPT_ARG_VAL|POPT_ARGFLAG_XOR, &_mqtt.flags, MQTT_FLAGS_EOL, N_("(sub) Do not print NL after messages."), NULL }, { NULL, 'R', POPT_BIT_SET, &_mqtt.flags, MQTT_FLAGS_NOSTALE, N_("(sub) Do not print stale messages."), NULL }, @@ -72,27 +65,27 @@ N_("Connect from local <ADDR>."), N_("<ADDR>") }, { "file", 'f', POPT_ARG_STRING, &_mqtt._infile, 0, N_("Send <FILE> as message."), N_("<FILE>") }, - { "host", 'h', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt._host, 0, + { "host", 'h', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt.host, 0, N_("Connect to <HOST>."), N_("<HOST>") }, - { "id", 'i', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt._id, 0, + { "id", 'i', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt._clientid, 0, N_("MQTT client <ID>."), N_("<ID>") }, - { "prefix", 'I', POPT_ARG_STRING, &_mqtt._idprefix, 0, + { "prefix", 'I', POPT_ARG_STRING, &_mqtt.idprefix, 0, N_("Use <IDPREFIX>-%{pid} for clientid."), N_("<IDPREFIX>") }, - { "keepalive", 'k', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT, &_mqtt._keepalive, 0, + { "keepalive", 'k', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT, &_mqtt.keepalive, 0, N_("Keep alive in <SECS>."), N_("<SECS>") }, { NULL, 'l', POPT_BIT_SET, &_mqtt.flags, MQTT_FLAGS_STDIN_EACH, N_("Send messages line-by-line from stdin."), NULL }, { "message", 'm', POPT_ARG_STRING, &_mqtt._message, 0, N_("MQTT payload <MESSAGE> to send."), N_("<MESSAGE>") }, - { NULL, 'M', POPT_ARG_INT, &_mqtt._max_inflight, 0, + { NULL, 'M', POPT_ARG_INT, &_mqtt.max_inflight, 0, N_("Permit <MAX> inflight messages."), N_("<MAX>") }, { "null", 'n', POPT_BIT_SET, &_mqtt.flags, MQTT_FLAGS_EMPTY, N_("Send a null (zero length) message."), NULL }, - { "port", 'p', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT, &_mqtt._port, 0, + { "port", 'p', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT, &_mqtt.port, 0, N_("Connect to network <PORT>."), N_("<PORT>") }, - { "pass", 'P', POPT_ARG_STRING, &_mqtt._password, 0, + { "pass", 'P', POPT_ARG_STRING, &_mqtt.password, 0, N_("Remote user <PASSWORD>."), N_("<PASSWORD>") }, - { "qos", 'q', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT, &_mqtt._qos, 0, + { "qos", 'q', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT, &_mqtt.qos, 0, N_("MQTT <QOS> level."), N_("<QOS>") }, { "retain", 'r', POPT_BIT_SET, &_mqtt.flags, MQTT_FLAGS_RETAIN, N_("Retain the message on the host."), NULL }, @@ -100,31 +93,31 @@ N_("Send stdin lines as a single message."), NULL }, { NULL, 'S', POPT_BIT_SET, &_mqtt.flags, MQTT_FLAGS_DNSSRV, N_("Use SRV record to find remote host."), NULL }, - { "topic", 't', POPT_ARG_ARGV, &_mqtt._topic, 0, + { "topic", 't', POPT_ARG_ARGV, &_mqtt._topics, 0, N_("MQTT pub/sub <TOPIC>."), N_("<TOPIC>") }, - { "user", 'u', POPT_ARG_STRING, &_mqtt._user, 0, + { "user", 'u', POPT_ARG_STRING, &_mqtt.user, 0, N_("Remote <USER>."), N_("<USER>") }, - { NULL, 'V', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt._version, 0, + { NULL, 'V', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt.protocol_version, 0, N_("MQTT protocol <VERSION>"), N_("{31|311}") }, - { "will-payload", '\0', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt._will_message, 0, + { "will-payload", '\0', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt.will_message, 0, N_("Will payload <MESSAGE> ."), N_("<MESSAGE>") }, - { "will-qos", '\0', POPT_ARG_INT, &_mqtt._will_qos, 0, + { "will-qos", '\0', POPT_ARG_INT, &_mqtt.will_qos, 0, N_("Will <QOS> level."), N_("<QOS>") }, - { "will-topic", '\0', POPT_ARG_STRING, &_mqtt._will_topic, 0, + { "will-topic", '\0', POPT_ARG_STRING, &_mqtt.will_topic, 0, N_("Will <TOPIC> to publish to."), N_("<TOPIC>") }, { "will-retain", '\0', POPT_BIT_SET, &_mqtt.flags, MQTT_FLAGS_WILL_RETAIN, N_("Retain the client Will."), NULL }, - { "cafile", '\0', POPT_ARG_STRING, &_mqtt._cafile, 0, + { "cafile", '\0', POPT_ARG_STRING, &_mqtt.cafile, 0, N_("CA certificate(s) from <FILE>."), N_("<FILE>") }, { "capath", '\0', POPT_ARG_STRING, &_mqtt._capath, 0, N_("CA certificate(s) in <DIR>."), N_("<DIR>") }, - { "cert", '\0', POPT_ARG_STRING, &_mqtt._cert, 0, + { "cert", '\0', POPT_ARG_STRING, &_mqtt.cert, 0, N_("Client authentication <CERT>."), N_("<CERT>") }, - { "key", '\0', POPT_ARG_STRING, &_mqtt._privkey, 0, + { "key", '\0', POPT_ARG_STRING, &_mqtt.privkey, 0, N_("Client private <KEY>."), N_("<KEY>") }, - { "ciphers", '\0', POPT_ARG_STRING, &_mqtt._ciphers, 0, + { "ciphers", '\0', POPT_ARG_STRING, &_mqtt.ciphers, 0, N_("TLS ciphers <LIST> (openssl)."), N_("<LIST>") }, { "tls-version", '\0', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&_mqtt._tls_version, 0, N_("TLS protocol <VERSION>."), N_("{1.2|1.1|1}") }, @@ -153,17 +146,19 @@ static char _mqtt_argv[] = "\ mqtt://luser:jasnl@localhost:1883/rpm/mqtt?qos=1,timeout=10000,trace=4 \n\ rpm/#?qos=1 \n\ - $SYS/broker/version?qos=1 \n\ - $SYS/broker/timestamp?qos=1 \n\ - $SYS/broker/uptime?qos=1 \n\ - $SYS/broker/clients/#?qos=1 \n\ - $SYS/broker/messages/#?qos=1 \n\ - $SYS/broker/subscriptions/#?qos=1 \n\ - $SYS/broker/heap/#?qos=1 \n\ - $SYS/broker/publish/#?qos=1 \n\ - $SYS/broker/bytes/#?qos=1 \n\ "; - poptContext optCon = rpmioInit(argc, argv, mqttOptionsTable); +#ifdef REF + $SYS/broker/version?qos=0 \n\ + $SYS/broker/timestamp?qos=0 \n\ + $SYS/broker/uptime?qos=0 \n\ + $SYS/broker/clients/#?qos=0 \n\ + $SYS/broker/messages/#?qos=0 \n\ + $SYS/broker/subscriptions/#?qos=0 \n\ + $SYS/broker/heap/#?qos=0 \n\ + $SYS/broker/publish/#?qos=0 \n\ + $SYS/broker/bytes/#?qos=0 \n +#endif + poptContext optCon; #ifdef UNUSED ARGV_t av = poptGetArgs(optCon); int ac = argvCount(av); @@ -172,7 +167,22 @@ rpmmqtt mqtt; int rc = -1; -(void) rpmDefineMacro(NULL, "_mqtt_trace 2", 0); + /* Initialize the (allocated) CLI defaults. */ + memset(&_mqtt, 0, sizeof(_mqtt)); + _mqtt.flags |= (MQTT_FLAGS_CLEAN|MQTT_FLAGS_EOL); + _mqtt.port = 1883; + _mqtt.max_inflight = 20; + _mqtt.keepalive = 60; + _mqtt.qos = 0; + _mqtt.protocol_version = rpmExpand("31", NULL); + _mqtt.host = rpmExpand("localhost", NULL); + _mqtt.idprefix = rpmExpand("rpm", NULL); + _mqtt._clientid = rpmExpand(_mqtt.idprefix, "-%%{pid}", NULL); + _mqtt._tls_version = rpmExpand("1.2", NULL); + + + /* Initialize the _mqtt_ macro context */ +(void) rpmDefineMacro(NULL, "_mqtt_trace 4", 0); (void) rpmDefineMacro(NULL, "_mqtt_cachedir /var/cache/mqtt", 0); (void) rpmDefineMacro(NULL, "_mqtt_user luser", 0); @@ -181,15 +191,24 @@ (void) rpmDefineMacro(NULL, "_mqtt_port 1883", 0); (void) rpmDefineMacro(NULL, "_mqtt_clientid rpm-%{pid}", 0); (void) rpmDefineMacro(NULL, "_mqtt_topic rpm/#", 0); -(void) rpmDefineMacro(NULL, "_mqtt_qos 1", 0); -(void) rpmDefineMacro(NULL, "_mqtt_persist 2", 0); +(void) rpmDefineMacro(NULL, "_mqtt_qos 0", 0); +(void) rpmDefineMacro(NULL, "_mqtt_persist 0", 0); (void) rpmDefineMacro(NULL, "_mqtt_timeout 10000", 0); (void) rpmDefineMacro(NULL, "_mqtt_prefix %{now} rpm pid %{pid} on cpu%{cpu} %{user}:%{group} ", 0); + optCon = rpmioInit(argc, argv, mqttOptionsTable); + mqtt = rpmmqttNew(_av, _mqtt.flags); rc = _DoMQTT(mqtt); +sleep(1); mqtt = rpmmqttFree(mqtt); + _mqtt.protocol_version = _free(_mqtt.protocol_version); + _mqtt.host = _free(_mqtt.host); + _mqtt.idprefix = _free(_mqtt.idprefix); + _mqtt._clientid = _free(_mqtt._clientid); + _mqtt._tls_version = _free(_mqtt._tls_version); + optCon = rpmioFini(optCon); return rc; } @@ . ______________________________________________________________________ RPM Package Manager http://rpm5.org CVS Sources Repository rpm-cvs@rpm5.org