RPM Package Manager, CVS Repository http://rpm5.org/cvs/ ____________________________________________________________________________
Server: rpm5.org Name: Jeff Johnson Root: /v/rpm/cvs Email: j...@rpm5.org Module: rpm Date: 04-Jul-2016 09:45:24 Branch: rpm-5_4 Handle: 2016070407452301 Modified files: (Branch: rpm-5_4) rpm CHANGES rpm/macros mqtt.in rpm/rpmio librpmio.vers macro.c poptIO.c rpmio.c rpmmqtt.c rpmmqtt.h tmqtt.c Log: - mqtt: stub-in a %{mqtt:...} macro embedding. Summary: Revision Changes Path 1.3501.2.510+1 -0 rpm/CHANGES 1.1.2.3 +28 -11 rpm/macros/mqtt.in 2.199.2.59 +5 -2 rpm/rpmio/librpmio.vers 2.249.2.36 +16 -8 rpm/rpmio/macro.c 1.94.2.22 +3 -0 rpm/rpmio/poptIO.c 1.230.2.34 +2 -1 rpm/rpmio/rpmio.c 1.1.2.13 +409 -277 rpm/rpmio/rpmmqtt.c 1.1.2.12 +17 -9 rpm/rpmio/rpmmqtt.h 1.1.2.9 +15 -10 rpm/rpmio/tmqtt.c ____________________________________________________________________________ patch -p0 <<'@@ .' Index: rpm/CHANGES ============================================================================ $ cvs diff -u -r1.3501.2.509 -r1.3501.2.510 CHANGES --- rpm/CHANGES 29 Jun 2016 12:17:57 -0000 1.3501.2.509 +++ rpm/CHANGES 4 Jul 2016 07:45:23 -0000 1.3501.2.510 @@ -1,4 +1,5 @@ 5.4.17 -> 5.4.18: + - jbj: mqtt: stub-in a %{mqtt:...} mcro embedding. - jbj: mqtt: add self-subscriptions and macro configgery. - jbj: macro: add primitives useful for log spewage. - jbj: mqtt: stub-in a paho-mqtt client. @@ . patch -p0 <<'@@ .' Index: rpm/macros/mqtt.in ============================================================================ $ cvs diff -u -r1.1.2.2 -r1.1.2.3 mqtt.in --- rpm/macros/mqtt.in 30 Jun 2016 16:59:39 -0000 1.1.2.2 +++ rpm/macros/mqtt.in 4 Jul 2016 07:45:24 -0000 1.1.2.3 @@ -1,14 +1,31 @@ #============================================================================== # ---- MQTT configuration. 
-# -%_mqtt_cachedir /var/cache/mqtt -%_mqtt_user luser -%_mqtt_pass jasnl -%_mqtt_host localhost -%_mqtt_port 1883 -%_mqtt_clientid rpm -%_mqtt_topic rpm/# -%_mqtt_qos 1 +#%_mqtt_scheme mqtt +#%_mqtt_user luser +#%_mqtt_pass jasnl +#%_mqtt_host localhost +#%_mqtt_port 1883 +#%_mqtt_clientid rpm +#%_mqtt_topic rpm/# +#%_mqtt_qos 1 + +# Default publish/subscribe topics +#%_mqtt_u %{?_mqtt_user:%{_mqtt_user}%{?_mqtt_pass::%{_mqtt_pass}}} +#%_mqtt_h %{?_mqtt_host:%{_mqtt_host}%{?_mqtt_port::%{_mqtt_port}}} +#%_mqtt_argv \ +# mqtt://luser:jasnl@localhost:1883/rpm/mqtt?qos=1,timeout=10000 \ +#%{nil} + +# Timeout in msecs +#%_mqtt_timeout 10000 + +# Default connection template(s) +%_mqtt_uri + mqtt://luser:jasnl@localhost:1883/rpm/mqtt?qos=2,timeout=10000 \ + +# Client persistence store %_mqtt_persist 2 -%_mqtt_timeout 10000 -%_mqtt_prefix %{now} rpm pid %{pid} on cpu%{cpu} %{user}:%{group} %{nil} +%_mqtt_cachedir /var/cache/mqtt + +# Prepended to msgs +%_mqtt_prefix %{now} %{nil} @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/librpmio.vers ============================================================================ $ cvs diff -u -r2.199.2.58 -r2.199.2.59 librpmio.vers --- rpm/rpmio/librpmio.vers 27 Jun 2016 22:00:14 -0000 2.199.2.58 +++ rpm/rpmio/librpmio.vers 4 Jul 2016 07:45:23 -0000 2.199.2.59 @@ -635,11 +635,14 @@ rpmmgFile; rpmmgBuffer; _rpmmqtt_debug; + _rpmmqttI; + _rpmmqttPool; rpmmqttConnect; rpmmqttDisconnect; rpmmqttNew; - rpmmqttWrite; - rpmmqttRead; + rpmmqttPub; + rpmmqttRun; + rpmmqttSub; _rpmmrb_debug; _rpmmrbI; _rpmmrbPool; @@ . patch -p0 <<'@@ .' 
Index: rpm/rpmio/macro.c ============================================================================ $ cvs diff -u -r2.249.2.35 -r2.249.2.36 macro.c --- rpm/rpmio/macro.c 29 Jun 2016 14:03:27 -0000 2.249.2.35 +++ rpm/rpmio/macro.c 4 Jul 2016 07:45:23 -0000 2.249.2.36 @@ -89,6 +89,7 @@ #include <rpmjni.h> #include <rpmjs.h> +#include <rpmmqtt.h> #include <rpmmrb.h> #include <rpmperl.h> #include <rpmpython.h> @@ -303,7 +304,7 @@ if (mc == NULL) mc = rpmGlobalMacroContext; if (fp == NULL) fp = stderr; - + fprintf(fp, "========================\n"); if (mc->macroTable != NULL) { int i; @@ -363,7 +364,7 @@ } av[ac] = NULL; *avp = av = (const char **) xrealloc(av, (ac+1) * sizeof(*av)); - + return ac; } #endif @@ -393,7 +394,7 @@ t[namelen] = '\0'; name = t; } - + key = (MacroEntry) memset(alloca(sizeof(*key)), 0, sizeof(*key)); /*@-temptrans -assignexpose@*/ key->name = (char *)name; @@ -1070,7 +1071,7 @@ b = be = stpcpy(buf, me->name); addMacro(mb->mc, "0", NULL, buf, mb->depth); - + argc = 1; /* XXX count argv[0] */ /* Copy args into buf until lastc */ @@ -1189,7 +1190,7 @@ if (argv != NULL) for (c = 0; argv[c] != NULL; c++) argc++; - + /* Add arg count as macro. */ sprintf(aname, "%d", argc); addMacro(mb->mc, "#", NULL, aname, mb->depth); @@ -2292,6 +2293,13 @@ } #endif +#ifdef WITH_MQTT + if (STREQ("mqtt", f, fn)) { + RPMIOPOOL_INTERP_EXPAND(mqtt) + continue; + } +#endif + #ifdef WITH_MRBEMBED if (STREQ("mrb", f, fn) || STREQ("mruby", f, fn)) { RPMIOPOOL_INTERP_EXPAND(mrb) @@ -3224,7 +3232,7 @@ void rpmFreeMacros(MacroContext mc) { - + if (mc == NULL) mc = rpmGlobalMacroContext; if (mc->macroTable != NULL) { @@ -3405,7 +3413,7 @@ (void) expandMacros(NULL, mc, t, tn + bufn + 1); t[tn + bufn] = '\0'; t = (char *) xrealloc(t, strlen(t) + 1); - + return t; } @@ -3441,7 +3449,7 @@ (void) expandMacros(NULL, mc, t, tn + bufn + 1); t[tn + bufn] = '\0'; t = (char *) xrealloc(t, strlen(t) + 1); - + return t; } /*@=modfilesys@*/ @@ . patch -p0 <<'@@ .' 
Index: rpm/rpmio/poptIO.c ============================================================================ $ cvs diff -u -r1.94.2.21 -r1.94.2.22 poptIO.c --- rpm/rpmio/poptIO.c 25 Jun 2016 22:36:54 -0000 1.94.2.21 +++ rpm/rpmio/poptIO.c 4 Jul 2016 07:45:23 -0000 1.94.2.22 @@ -94,6 +94,7 @@ extern int _rpmio_debug; extern int _rpmiob_debug; extern int _rpmlua_debug; +extern int _rpmmqtt_debug; extern int _rpmsq_debug; extern int _rpmzq_debug; extern int _tar_debug; @@ -650,6 +651,8 @@ #endif { "rpmmgdebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmmg_debug, -1, N_("Debug rpmmg magic"), NULL}, + { "rpmmqttdebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmmqtt_debug, -1, + N_("Debug rpmmqtt magic"), NULL}, { "rpmmrbdebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmmrb_debug, -1, N_("Debug embedded MRuby interpreter"), NULL}, { "rpmgfsdebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmgfs_debug, -1, @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmio.c ============================================================================ $ cvs diff -u -r1.230.2.33 -r1.230.2.34 rpmio.c --- rpm/rpmio/rpmio.c 28 Jun 2016 07:18:12 -0000 1.230.2.33 +++ rpm/rpmio/rpmio.c 4 Jul 2016 07:45:23 -0000 1.230.2.34 @@ -75,6 +75,7 @@ #include <rpmjni.h> #include <rpmjs.h> #include <rpmlua.h> /* XXX rpmioClean() calls rpmluaFree() */ +#include <rpmmqtt.h> #include <rpmmrb.h> #include <rpmnix.h> #include <rpmodbc.h> @@ -3317,6 +3318,7 @@ RPMIOPOOL_INTERP_FREE(aug) RPMIOPOOL_INTERP_FREE(gfs) + RPMIOPOOL_INTERP_FREE(mqtt) #ifdef NOTYET RPMIOPOOL_FREE(mgo) @@ -3328,7 +3330,6 @@ RPMIOPOOL_FREE(cudf) RPMIOPOOL_FREE(cvs) RPMIOPOOL_FREE(date) - RPMIOPOOL_FREE(mqtt) _odbcPool = rpmioFreePool(_odbcPool); RPMIOPOOL_FREE(sed) @@ . patch -p0 <<'@@ .' 
Index: rpm/rpmio/rpmmqtt.c ============================================================================ $ cvs diff -u -r1.1.2.12 -r1.1.2.13 rpmmqtt.c --- rpm/rpmio/rpmmqtt.c 3 Jul 2016 08:41:57 -0000 1.1.2.12 +++ rpm/rpmio/rpmmqtt.c 4 Jul 2016 07:45:23 -0000 1.1.2.13 @@ -58,8 +58,8 @@ fprintf(fp, "fragment: %s\n", u->fragment); if (u->query) { ARGV_t av = NULL; - int xx; - xx = argvSplit(&av, u->query, ","); + int xx = argvSplit(&av, u->query, ","); + (void)xx; argvPrint(u->query, av, fp); av = argvFree(av); } @@ -77,79 +77,131 @@ if (mqtt) { #define PRINT(_fmt, _foo) fprintf(fp, "%19s: %"#_fmt"\n", #_foo, mqtt->_foo) - PRINT(x, flags); - PRINT(d, msg_input); - PRINT(d, msg_output); - argvPrint("mqtt->av", (ARGV_t)mqtt->av, fp); - - PRINT(s, _address); - argvPrint("mqtt->msgs", (ARGV_t)mqtt->msgs, fp); - - PRINT(s, cafile); - PRINT(s, _capath); - PRINT(s, cert); - PRINT(s, privkey); - PRINT(s, ciphers); - - PRINT(s, _tls_version); - PRINT(s, _psk_key); - PRINT(s, _psk_identity); - PRINT(s, _proxy); - -fprintf(stderr, "====================\n"); - PRINT(d, keepalive); - PRINT(d, max_inflight); - PRINT(s, protocol_version); - - PRINT(s, will_message); - PRINT(d, will_qos); - PRINT(s, will_topic); - - PRINT(s, user); - PRINT(s, password); - PRINT(s, host); - PRINT(d, port); - PRINT(s, uri); - - PRINT(s, idprefix); - PRINT(s, _clientid); - PRINT(s, clientid); - - PRINT(u, qos); - PRINT(u, timeout); + if (mqtt->flags != 0x40000003) + PRINT(x, flags); + if (mqtt->msg_input != 0) + PRINT(d, msg_input); + if (mqtt->msg_output != 1) + PRINT(d, msg_output); + if (mqtt->av) + argvPrint("mqtt->av", (ARGV_t)mqtt->av, fp); + + if (mqtt->_address) + PRINT(s, _address); + if (mqtt->msgs) + argvPrint("mqtt->msgs", (ARGV_t)mqtt->msgs, fp); + + if (mqtt->cafile) + PRINT(s, cafile); + if (mqtt->_capath) + PRINT(s, _capath); + if (mqtt->cert) + PRINT(s, cert); + if (mqtt->privkey) + PRINT(s, privkey); + if (mqtt->ciphers) + PRINT(s, ciphers); + + if (mqtt->_tls_version == NULL || 
strcmp(mqtt->_tls_version, "1.2")) + PRINT(s, _tls_version); + if (mqtt->_psk_key) + PRINT(s, _psk_key); + if (mqtt->_psk_identity) + PRINT(s, _psk_identity); + if (mqtt->_proxy) + PRINT(s, _proxy); + + if (mqtt->keepalive != 60) + PRINT(d, keepalive); + if (mqtt->max_inflight != 20) + PRINT(d, max_inflight); + if (mqtt->protocol_version == NULL + || strcmp(mqtt->protocol_version, "auto")) + PRINT(s, protocol_version); + + if (mqtt->will_topic) { + PRINT(s, will_message); + PRINT(d, will_qos); + PRINT(s, will_topic); + } + + if (mqtt->user) + PRINT(s, user); + if (mqtt->password) + PRINT(s, password); + if (mqtt->host == NULL || strcmp(mqtt->host, "localhost")) + PRINT(s, host); + if (mqtt->port != 1883) + PRINT(d, port); + if (mqtt->uri == NULL || strcmp(mqtt->uri, "tcp://localhost:1883")) + PRINT(s, uri); + + if (mqtt->idprefix == NULL || strcmp(mqtt->idprefix, "rpm")) + PRINT(s, idprefix); + if (mqtt->_clientid == NULL || strcmp(mqtt->_clientid, "rpm-%{pid}")) + PRINT(s, _clientid); + if (mqtt->clientid == NULL || strncmp(mqtt->_clientid, "rpm-", 4)) + PRINT(s, clientid); + + if (mqtt->qos != 2) + PRINT(u, qos); + if (mqtt->timeout != 10000) + PRINT(u, timeout); if (mqtt->_topics) argvPrint("mqtt->_topics", (ARGV_t)mqtt->_topics, fp); - PRINT(s, topic); - -fprintf(stderr, "====================\n"); - PRINT(d, ut); - dumpU("mqtt->u", mqtt->u); - PRINT(p, C); - PRINT(u, persist_type); - PRINT(p, persist_ctx); - PRINT(s, persist_path); - - PRINT(d, debug); - PRINT(d, trace); - PRINT(d, finished); - PRINT(u, token); - - PRINT(d, msg_count); - PRINT(d, max_msg_count); - - PRINT(s, serverURI); - PRINT(d, MQTTVersion); - PRINT(d, sessionPresent); + if (mqtt->_filter_out) + argvPrint("mqtt->_filter_out", (ARGV_t)mqtt->_filter_out, fp); + if (mqtt->topic == NULL || strcmp(mqtt->topic, "rpm/mqtt")) + PRINT(s, topic); + + if (mqtt->u) { + PRINT(d, ut); + dumpU("mqtt->u", mqtt->u); + } + + if (mqtt->I) + PRINT(p, I); + if (mqtt->persist_type != 2) { + PRINT(u, 
persist_type); + PRINT(p, persist_ctx); + PRINT(s, persist_path); + } + + if (mqtt->debug) + PRINT(d, debug); + if (mqtt->trace) + PRINT(d, trace); + if (mqtt->finished) + PRINT(d, finished); + if (mqtt->token) + PRINT(u, token); + + if (mqtt->msg_count) { + PRINT(d, msg_count); + PRINT(d, max_msg_count); + } + + if (mqtt->serverURI) { + PRINT(s, serverURI); + PRINT(d, MQTTVersion); + PRINT(d, sessionPresent); + } - PRINT(s, dn); + if (mqtt->cachedn) + PRINT(s, cachedn); if (mqtt->ifn) argvPrint("mqtt->ifn", (ARGV_t)mqtt->ifn, fp); - PRINT(p, ifd); - PRINT(s, ofn); - PRINT(p, ofd); - PRINT(p, iob); + if (mqtt->ifd) + PRINT(p, ifd); + if (mqtt->ofn == NULL || strcmp(mqtt->ofn, "-")) { + PRINT(s, ofn); + if (mqtt->ofd) + PRINT(p, ofd); + } + if (mqtt->iob) + PRINT(p, iob); #undef PRINT } @@ -175,11 +227,14 @@ _ENTRY(BAD_STRUCTURE), _ENTRY(BAD_QOS), _ENTRY(NO_MORE_MSGIDS), + _ENTRY(OPERATION_INCOMPLETE), + _ENTRY(MAX_BUFFERED_MESSAGES), #else { 0, NULL }, #endif }; static size_t rpmmqtt_nerrs = sizeof(rpmmqtt_errs) / sizeof(rpmmqtt_errs[0]); +#undef _ENTRY static const char * rpmmqttStrerror(int v) { @@ -255,6 +310,12 @@ } break; } +#ifdef NEW + message->qos + message->retained + message->dup + message->msgid +#endif MQTTAsync_freeMessage(&message); MQTTAsync_free(topic); @@ -289,15 +350,7 @@ { rpmmqtt mqtt = (rpmmqtt) _mqtt; fprintf(stderr, "<-- %s(%p,%p) MQTT disconnect failed\n", __FUNCTION__, _mqtt, response); - if (response) { - const char *s = response->message; - int token = response->token; - int code = response->code; - rpmlog(RPMLOG_WARNING, - "MQTT disconnect failed: code(%d) msg %s\n", - token, code, s); - } else - rpmlog(RPMLOG_WARNING, "MQTT disconnect failed\n"); + rpmlog(RPMLOG_WARNING, "MQTT disconnect failed\n"); mqtt->finished = 1; } @@ -317,14 +370,7 @@ { rpmmqtt mqtt = (rpmmqtt) _mqtt; fprintf(stderr, "<-- %s(%p,%p) MQTT connect failed\n", __FUNCTION__, _mqtt, response); - if (response) { - const char *s = response->message; - int token = 
response->token; - int code = response->code; - rpmlog(RPMLOG_WARNING, "MQTT connect failed: token(%d) code(%d) msg %s\n", - token, code, s); - } else - rpmlog(RPMLOG_WARNING, "MQTT connect failed\n"); + rpmlog(RPMLOG_WARNING, "MQTT connect failed\n"); mqtt->finished = 1; } @@ -333,6 +379,7 @@ rpmmqtt mqtt = (rpmmqtt) _mqtt; if (response) { + mqtt->token = response->token; mqtt->serverURI = xstrdup(response->alt.connect.serverURI); mqtt->MQTTVersion = response->alt.connect.MQTTVersion; mqtt->sessionPresent = response->alt.connect.sessionPresent; @@ -429,21 +476,21 @@ *_mqttp = _mqtt; - mqtt->dn = _free(mqtt->dn); + mqtt->cachedn = _free(mqtt->cachedn); dn = rpmGetPath(mqtt->persist_path, "/", clientID, "-", serverURI, NULL); for (te = dn; (te = strchr(te, ':')) != NULL; te++) *te = '-'; - if (rpmioMkpath(dn, (mode_t)0775, (uid_t)-1, (gid_t)-1)) { + if (rpmioMkpath(dn, (mode_t)0740, (uid_t)-1, (gid_t)-1)) { dn = _free(dn); goto exit; } - mqtt->dn = dn; + mqtt->cachedn = dn; rc = 0; exit: -SPEW((stderr, "<-- %s(%p,\"%s\",\"%s\",%p) rc %d dn %s\n", __FUNCTION__, _mqttp, clientID, serverURI, _mqtt, rc, mqtt->dn)); +SPEW((stderr, "<-- %s(%p,\"%s\",\"%s\",%p) rc %d dn %s\n", __FUNCTION__, _mqttp, clientID, serverURI, _mqtt, rc, mqtt->cachedn)); return rc; } @@ -454,13 +501,13 @@ int rc = MQTTCLIENT_PERSISTENCE_ERROR; - if (mqtt->dn == NULL) + if (mqtt->cachedn == NULL) goto exit; - if (Rmdir(mqtt->dn) && (errno != ENOENT && errno != ENOTEMPTY)) + if (Rmdir(mqtt->cachedn) && (errno != ENOENT && errno != ENOTEMPTY)) goto exit; - rpmlog(RPMLOG_DEBUG, D_("removed directory %s\n"), mqtt->dn); + rpmlog(RPMLOG_DEBUG, D_("removed directory %s\n"), mqtt->cachedn); rc = 0; @@ -479,11 +526,11 @@ size_t nw = 0; int rc = MQTTCLIENT_PERSISTENCE_ERROR; - if (mqtt->dn == NULL) + if (mqtt->cachedn == NULL) goto exit; /* XXX add .msg extension? 
*/ - fn = rpmGetPath(mqtt->dn, "/", key, NULL); + fn = rpmGetPath(mqtt->cachedn, "/", key, NULL); fd = Fopen(fn, "wb"); if (fd == NULL || Ferror(fd)) goto exit; @@ -525,11 +572,11 @@ struct stat sb; int rc = MQTTCLIENT_PERSISTENCE_ERROR; - if (mqtt->dn == NULL) + if (mqtt->cachedn == NULL) goto exit; /* XXX add .msg extension? */ - fn = rpmGetPath(mqtt->dn, "/", key, NULL); + fn = rpmGetPath(mqtt->cachedn, "/", key, NULL); fd = Fopen(fn, "rb"); if (fd == NULL || Ferror(fd)) goto exit; @@ -566,11 +613,11 @@ char *fn = NULL; int rc = MQTTCLIENT_PERSISTENCE_ERROR; - if (mqtt->dn == NULL) + if (mqtt->cachedn == NULL) goto exit; /* XXX add .msg extension? */ - fn = rpmGetPath(mqtt->dn, "/", key, NULL); + fn = rpmGetPath(mqtt->cachedn, "/", key, NULL); if (Unlink(fn) && errno != ENOENT) goto exit; @@ -591,14 +638,14 @@ struct dirent *dp; int rc = MQTTCLIENT_PERSISTENCE_ERROR; - if (mqtt->dn == NULL) + if (mqtt->cachedn == NULL) goto exit; - if ((dir = Opendir(mqtt->dn)) == NULL) + if ((dir = Opendir(mqtt->cachedn)) == NULL) goto exit; while ((dp = Readdir(dir)) != NULL) { - char * fn = rpmGetPath(mqtt->dn, "/", dp->d_name, NULL); + char * fn = rpmGetPath(mqtt->cachedn, "/", dp->d_name, NULL); struct stat sb; if (Lstat(fn, &sb) == 0 && S_ISREG(sb.st_mode)) @@ -628,14 +675,14 @@ int nerrs = 0; int rc = MQTTCLIENT_PERSISTENCE_ERROR; - if (mqtt->dn == NULL) + if (mqtt->cachedn == NULL) goto exit; - if ((dir = Opendir(mqtt->dn)) == NULL) + if ((dir = Opendir(mqtt->cachedn)) == NULL) goto exit; while ((dp = Readdir(dir)) != NULL) { - char * fn = rpmGetPath(mqtt->dn, "/", dp->d_name, NULL); + char * fn = rpmGetPath(mqtt->cachedn, "/", dp->d_name, NULL); struct stat sb; if (Lstat(fn, &sb) == 0 && S_ISREG(sb.st_mode)) { @@ -662,14 +709,14 @@ struct dirent *dp; int rc = MQTTCLIENT_PERSISTENCE_ERROR; - if (mqtt->dn == NULL) + if (mqtt->cachedn == NULL) goto exit; - if ((dir = Opendir(mqtt->dn)) == NULL) + if ((dir = Opendir(mqtt->cachedn)) == NULL) goto exit; while ((dp = 
Readdir(dir)) != NULL) { - char * fn = rpmGetPath(mqtt->dn, "/", dp->d_name, NULL); + char * fn = rpmGetPath(mqtt->cachedn, "/", dp->d_name, NULL); struct stat sb; if (Lstat(fn, &sb) == 0 && S_ISREG(sb.st_mode)) { @@ -705,93 +752,151 @@ #endif /* WITH_MQTT */ /*==============================================================*/ -static const MQTTAsync_willOptions _Wopts - = MQTTAsync_willOptions_initializer; -static const MQTTAsync_connectOptions _Copts - = MQTTAsync_connectOptions_initializer; -static const MQTTAsync_SSLOptions _Sopts - = MQTTAsync_SSLOptions_initializer; -static const MQTTAsync_disconnectOptions _Dopts = - MQTTAsync_disconnectOptions_initializer; -static const MQTTAsync_responseOptions _Ropts = - MQTTAsync_responseOptions_initializer; -int rpmmqttConnect(rpmmqtt mqtt) +static void * AOBJ(rpmmqtt mqtt, char otype) { - int rc = -1; + void * ptr = NULL; #ifdef WITH_MQTT - if (MQTTAsync_isConnected(mqtt->C)) { - rc = 0; - } else { - urlinfo u = mqtt->u; - - MQTTAsync_willOptions *Wopts = &mqtt->Wopts; - memcpy(Wopts, &_Wopts, sizeof(*Wopts)); -#ifdef REF - memcpy(Wopts->struct_id, "MQTW", 4); - Wopts->struct_version = 0; -#endif - Wopts->topicName = (mqtt->will_topic ? mqtt->will_topic : mqtt->topic); - Wopts->message = (mqtt->will_message ? mqtt->will_message : ""); - Wopts->retained = (MF_ISSET(WILL_RETAIN) ? 1 : 0); - Wopts->qos = mqtt->will_qos; /* XXX mqtt->qos? */ - - MQTTAsync_connectOptions *Copts = &mqtt->Copts; - memcpy(Copts, &_Copts, sizeof(*Copts)); -#ifdef REF - memcpy(Copts->struct_id, "MQTC", 4); - Copts->struct_version = 3; /* 0-4 enables fields below */ -#endif - Copts->keepAliveInterval = mqtt->keepalive; - Copts->cleansession = (MF_ISSET(CLEAN) ? 1 : 0); - Copts->maxInflight = mqtt->max_inflight; - - Copts->will = Wopts; /* last will */ - Copts->username = (u && u->user ? u->user : mqtt->user);; - Copts->password = (u && u->password ? u->password : mqtt->password); - - Copts->connectTimeout = 30; /* XXX secs. 
configure?*/ - Copts->retryInterval = 0; /* XXX secs. configure? */ - -#ifdef NOTYET - MQTTAsync_SSLOptions *Sopts = &mqtt->Sopts; - memcpy(Sopts, &_Sopts, sizeof(*Sopts)); - Sopts->trustStore = mqtt->cafile; - Sopts->keyStore = mqtt->cert; - Sopts->privateKey = mqtt->privkey; - Sopts->enabledCipherSuites = mqtt->ciphers; - Sopts->enableServerCertAuth = (MF_ISSET(INSECURE) ? 0 : 1); - Copts->ssl = Sopts; -#else - (void)_Sopts; - Copts->ssl = NULL; + static const MQTTAsync_connectOptions _C = + MQTTAsync_connectOptions_initializer; + MQTTAsync_connectOptions *C = &mqtt->C; + static const MQTTAsync_disconnectOptions _D = + MQTTAsync_disconnectOptions_initializer; + MQTTAsync_disconnectOptions *D = &mqtt->D; + static const MQTTAsync_message _M = + MQTTAsync_message_initializer; + MQTTAsync_message *M = &mqtt->M; + static const MQTTAsync_createOptions _O = + MQTTAsync_createOptions_initializer; + MQTTAsync_createOptions *O = &mqtt->O; + static const MQTTAsync_responseOptions _R = + MQTTAsync_responseOptions_initializer; + MQTTAsync_responseOptions *R = &mqtt->R; + static const MQTTAsync_SSLOptions _S = + MQTTAsync_SSLOptions_initializer; + MQTTAsync_SSLOptions *S = &mqtt->S; + static const MQTTAsync_willOptions _W = + MQTTAsync_willOptions_initializer; + MQTTAsync_willOptions *W = &mqtt->W; + urlinfo u; + + switch (otype) { + case 'C': + u = mqtt->u; +#ifdef DYING +assert(u); #endif + memcpy(C, &_C, sizeof(*C)); + C->keepAliveInterval = mqtt->keepalive; + C->cleansession = (MF_ISSET(CLEAN) ? 1 : 0); + C->maxInflight = mqtt->max_inflight; + + W = AOBJ(mqtt, 'W'); /* XXX LWT */ + C->will = (W && W->topicName ? W : NULL); + + C->username = (u && u->user ? u->user : mqtt->user);; + C->password = (u && u->password ? u->password : mqtt->password); + + C->connectTimeout = 30; /* XXX secs. configure?*/ + C->retryInterval = 0; /* XXX secs. configure? */ + + S = AOBJ(mqtt, 'S'); /* XXX SSL */ + C->ssl = (S && S->keyStore) ? 
S : NULL; /* XXX */ - Copts->onSuccess = onConnect; - Copts->onFailure = onConnectFailure; - Copts->context = mqtt; + C->onSuccess = onConnect; + C->onFailure = onConnectFailure; + C->context = mqtt; - Copts->serverURIcount = 0; - Copts->serverURIs = NULL; + C->serverURIcount = 0; + C->serverURIs = NULL; - Copts->MQTTVersion = 0; + C->MQTTVersion = 0; if (mqtt->protocol_version) { if (!strcmp(mqtt->protocol_version, "auto")) - Copts->MQTTVersion = 0; + C->MQTTVersion = 0; if (!strcmp(mqtt->protocol_version, "31")) - Copts->MQTTVersion = 3; + C->MQTTVersion = 3; if (!strcmp(mqtt->protocol_version, "311")) - Copts->MQTTVersion = 4; + C->MQTTVersion = 4; } + C->automaticReconnect = 1; + C->minRetryInterval = 1; /* secs */ + C->maxRetryInterval = 60; /* secs */ +mqtt->ut = 0; +mqtt->u = NULL; + ptr = (void *) C; + break; + case 'D': + memcpy(D, &_D, sizeof(*D)); + D->timeout = mqtt->timeout; + D->onSuccess = onDisconnect; + D->onFailure = onDisconnectFailure; + D->context = mqtt; + ptr = (void *) D; + break; + case 'M': + memcpy(M, &_M, sizeof(*M)); + M->payloadlen = 0; + M->payload = ""; + M->qos = mqtt->qos; + M->retained = (MF_ISSET(RETAIN) ? 1 : 0); + M->dup = 0; + M->msgid = 0; + ptr = (void *) M; + break; + case 'O': + memcpy(O, &_O, sizeof(*O)); + O->sendWhileDisconnected = 1; /* XXX = 0 */ + O->maxBufferedMessages = 100; + ptr = (void *) O; + break; + case 'R': + memcpy(R, &_R, sizeof(*R)); + R->onSuccess = NULL; /* XXX multiple uses */ + R->onFailure = NULL; /* XXX multiple uses */ + R->context = mqtt; + R->token = 0; + ptr = (void *) R; + break; + case 'S': + memcpy(S, &_S, sizeof(*S)); + S->trustStore = mqtt->cafile; + S->keyStore = mqtt->cert; + S->privateKey = mqtt->privkey; + S->enabledCipherSuites = mqtt->ciphers; + S->enableServerCertAuth = (MF_ISSET(INSECURE) ? 0 : 1); + ptr = (void *) S; + break; + case 'W': + memcpy(W, &_W, sizeof(*W)); + W->topicName = (mqtt->will_topic ? mqtt->will_topic : mqtt->topic); + W->message = (mqtt->will_message ? 
mqtt->will_message : ""); + W->retained = (MF_ISSET(WILL_RETAIN) ? 1 : 0); + W->qos = mqtt->will_qos; /* XXX mqtt->qos? */ + ptr = (void *) W; + break; + default: +assert(0); + break; + } +#endif /* WITH_MQTT */ + return ptr; +} +int rpmmqttConnect(rpmmqtt mqtt) +{ + int rc = -1; + +SPEW((stderr, "--> %s(%p)\n", __FUNCTION__, mqtt)); +#ifdef WITH_MQTT + if (!MQTTAsync_isConnected(mqtt->I)) { mqtt->finished = 0; rc = check(mqtt, "connect", - MQTTAsync_connect(mqtt->C, Copts)); - + MQTTAsync_connect(mqtt->I, AOBJ(mqtt, 'C'))); while (!mqtt->finished) usleep(1000); - - } + } else + rc = 0; #endif /* WITH_MQTT */ SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, mqtt, rc)); return rc; @@ -801,64 +906,37 @@ { int rc = -1; #ifdef WITH_MQTT - if (MQTTAsync_isConnected(mqtt->C)) { - MQTTAsync_disconnectOptions *Dopts = &mqtt->Dopts; - memcpy(Dopts, &_Dopts, sizeof(*Dopts)); -#ifdef REF - memcpy(Dopts->struct_id, "MQTD", 4); - Dopts->struct_version = 0; -#endif - Dopts->timeout = mqtt->timeout; - Dopts->onSuccess = onDisconnect; - Dopts->onFailure = onDisconnectFailure; - Dopts->context = mqtt; - + if (MQTTAsync_isConnected(mqtt->I)) { mqtt->finished = 0; rc = check(mqtt, "disconnect", - MQTTAsync_disconnect(mqtt->C, Dopts)); + MQTTAsync_disconnect(mqtt->I, AOBJ(mqtt, 'D'))); while (!mqtt->finished) usleep(100); - } + } else + rc = 0; #endif SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, mqtt, rc)); return rc; } -int rpmmqttSendmsg(rpmmqtt mqtt, const char * s, size_t ns) +int rpmmqttSendMessage(rpmmqtt mqtt, const char * s, size_t ns) { int rc = -1; if (ns == 0) ns = strlen(s); #ifdef WITH_MQTT - MQTTAsync_message pubmsg = MQTTAsync_message_initializer; -#ifdef REF - memcpy(pubmsg.struct_id, "MQTM", 4); - pubmsg.struct_version = 0; -#endif - pubmsg.payloadlen = ns; - pubmsg.payload = (char *) s; - pubmsg.qos = mqtt->qos; - pubmsg.retained = (MF_ISSET(RETAIN) ? 
1 : 0); -#ifdef REF - pubmsg.dup = 0; - pubmsg.msgid = 0; -#endif - - MQTTAsync_responseOptions *Ropts = &mqtt->Ropts; - memcpy(Ropts, &_Ropts, sizeof(*Ropts)); -#ifdef REF - memcpy(Ropts->struct_id, "MQTR", 4); - Ropts->struct_version = 0; -#endif - Ropts->onSuccess = onSend; - Ropts->onFailure = onSendFailure; - Ropts->context = mqtt; - Ropts->token = 0; + MQTTAsync_message *M = AOBJ(mqtt, 'M'); + M->payloadlen = ns; + M->payload = (char *) s; + + MQTTAsync_responseOptions *R = AOBJ(mqtt, 'R'); + R->onSuccess = onSend; + R->onFailure = onSendFailure; mqtt->finished = 0; rc = check(mqtt, "sendMessage", - MQTTAsync_sendMessage(mqtt->C, mqtt->topic, &pubmsg, Ropts)); + MQTTAsync_sendMessage(mqtt->I, mqtt->topic, M, R)); while (!mqtt->finished) usleep(100); #endif /* WITH_MQTT */ @@ -891,19 +969,13 @@ rpmlog(_lvl, "%19s: %s qos(%u)\n", "subtopic", t, subqos[i]); } - MQTTAsync_responseOptions *Ropts = &mqtt->Ropts; - memcpy(Ropts, &_Ropts, sizeof(*Ropts)); -#ifdef REF - memcpy(Ropts->struct_id, "MQTR", 4); - Ropts->struct_version = 0; -#endif - Ropts->onSuccess = onSubscribeMany; - Ropts->onFailure = onSubscribeFailure; - Ropts->context = mqtt; + MQTTAsync_responseOptions *R = AOBJ(mqtt, 'R'); + R->onSuccess = onSubscribeMany; + R->onFailure = onSubscribeFailure; /* XXX */ rc = check(mqtt, "subscribeMany", - MQTTAsync_subscribeMany(&mqtt->C, - ac, av, subqos, Ropts)); + MQTTAsync_subscribeMany(&mqtt->I, + ac, av, subqos, R)); subqos = _free(subqos); #endif /* WITH_MQTT */ @@ -913,35 +985,44 @@ } /*==============================================================*/ -ssize_t rpmmqttWrite(rpmmqtt mqtt, const char *s, size_t ns) +int rpmmqttPub(rpmmqtt mqtt, const char *s, size_t ns) { - ssize_t ret = -1; /* assume failure */ + int ret = -1; /* assume failure */ + + if (ns == 0) ns = strlen(s); #ifdef WITH_MQTT - if (rpmmqttConnect(mqtt) == 0) { - static char _mqtt_prefix[] = "%{?_mqtt_prefix}"; - /* XXX extra space */ + if (rpmmqttConnect(mqtt)) + goto exit; + { static char 
_mqtt_prefix[] = "%{?_mqtt_prefix}"; + /* XXX extra space: prepend in *sendMessage or *send */ char * t = rpmExpand(_mqtt_prefix, " ", s, NULL); size_t nt = strlen(t); - if (!rpmmqttSendmsg(mqtt, t, nt)) + if (!rpmmqttSendMessage(mqtt, t, nt)) ret = nt; t = _free(t); } + +exit: #endif /* WITH_MQTT */ SPEW((stderr, "<-- %s(%p,%p[%u]) ret %d\n", __FUNCTION__, mqtt, s, (unsigned)ns, (int)ret)); return ret; } -ssize_t rpmmqttRead(rpmmqtt mqtt, const char *s, size_t ns) +int rpmmqttSub(rpmmqtt mqtt, const char *s, size_t ns) { - ssize_t ret = -1; /* assume failure */ + int ret = -1; /* assume failure */ + + if (ns == 0) ns = strlen(s); #ifdef WITH_MQTT - if (rpmmqttConnect(mqtt) == 0) { - char * subtopic = rpmExpand((s ? s : mqtt->topic), NULL); + if (rpmmqttConnect(mqtt)) + goto exit; + + { char * subtopic = rpmExpand((s ? s : mqtt->topic), NULL); unsigned subqos = mqtt->qos; char *t, *te; int _lvl = RPMLOG_DEBUG; @@ -969,19 +1050,13 @@ rpmlog(_lvl, "%19s: %s qos(%u)\n", "subtopic", subtopic, subqos); - MQTTAsync_responseOptions *Ropts = &mqtt->Ropts; - memcpy(Ropts, &_Ropts, sizeof(*Ropts)); -#ifdef REF - memcpy(Ropts->struct_id, "MQTR", 4); - Ropts->struct_version = 0; -#endif - Ropts->onSuccess = onSubscribe; - Ropts->onFailure = onSubscribeFailure; - Ropts->context = mqtt; + MQTTAsync_responseOptions *R = AOBJ(mqtt, 'R'); + R->onSuccess = onSubscribe; + R->onFailure = onSubscribeFailure; mqtt->finished = 0; rc = check(mqtt, "subscribe", - MQTTAsync_subscribe(mqtt->C, subtopic, subqos, Ropts)); + MQTTAsync_subscribe(mqtt->I, subtopic, subqos, R)); while (rc == 0 && !mqtt->finished) usleep(100); @@ -990,6 +1065,8 @@ subtopic = _free(subtopic); } + +exit: #endif /* WITH_MQTT */ SPEW((stderr, "<-- %s(%p,%p[%u]) ret %d\n", __FUNCTION__, mqtt, s, (unsigned)ns, (int)ret)); @@ -1009,6 +1086,8 @@ N_("(sub) Do not print NL after messages."), NULL }, { NULL, 'R', POPT_BIT_SET, &mqtt->flags, MQTT_FLAGS_NOSTALE, N_("(sub) Do not print stale messages."), NULL }, + { 
"filter-out", 'T', POPT_ARG_ARGV, &mqtt->_filter_out, 0, + N_("(sub) Filter out topics."), NULL }, { NULL, 'A', POPT_ARG_STRING, &mqtt->_address, 0, N_("Connect from local <ADDR>."), N_("<ADDR>") }, @@ -1022,7 +1101,7 @@ N_("Connect to <HOST>."), N_("<HOST>") }, { "id", 'i', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&mqtt->_clientid, 0, N_("MQTT client <ID>."), N_("<ID>") }, - { "prefix", 'I', POPT_ARG_STRING, &mqtt->idprefix, 0, + { "id-prefix", 'I', POPT_ARG_STRING, &mqtt->idprefix, 0, N_("Use <IDPREFIX>-%{pid} for clientid."), N_("<IDPREFIX>") }, { "keepalive", 'k', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT, &mqtt->keepalive, 0, N_("Keep alive in <SECS>."), N_("<SECS>") }, @@ -1038,7 +1117,7 @@ N_("Write input mssages to <FILE>."), N_("<FILE>") }, { "port", 'p', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT, &mqtt->port, 0, N_("Connect to network <PORT>."), N_("<PORT>") }, - { "pass", 'P', POPT_ARG_STRING, &mqtt->password, 0, + { "pw", 'P', POPT_ARG_STRING, &mqtt->password, 0, N_("Remote user <PASSWORD>."), N_("<PASSWORD>") }, { "qos", 'q', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT, &mqtt->qos, 0, N_("MQTT <QOS> level."), N_("<QOS>") }, @@ -1152,7 +1231,7 @@ static const char _mqtt_trace[] = "%{?_mqtt_trace}%{!?_mqtt_trace:0}"; mqtt->trace = rpmExpandNumeric(_mqtt_trace); -#ifdef NOTYET +#ifdef DYING static const char _mqtt_topic[] = "%{?_mqtt_topic}%{!?_mqtt_topic:rpm/%{pid}/mqtt}"; mqtt->topic = _free(mqtt->topic); @@ -1163,6 +1242,8 @@ mqtt->clientid = rpmExpand(_mqtt_clientid, NULL); #endif + /* XXX load mqtt->_topics from _mqtt_subtopics */ + return rc; } @@ -1270,6 +1351,7 @@ mqtt->port = strtoul(u->portstr, NULL, 0); } +mqtt->uri = _free(mqtt->uri); mqtt->uri = rpmExpand("tcp://", u->host, ":", u->portstr, NULL); { const char * topic; @@ -1300,14 +1382,17 @@ mqtt->flags = (flags ? flags : _flags); /* -- Initialize oddball values. 
*/ -#ifdef DYING - mqtt->protocol_version = rpmExpand("31", NULL); -#else + /* XXX FIXME: needed for --help POT_ARGFLAG_DEFAULT spewage only */ +mqtt->protocol_version = _free(mqtt->protocol_version); mqtt->protocol_version = rpmExpand("auto", NULL); +mqtt->will_message = _free(mqtt->will_message); mqtt->will_message = rpmExpand("", NULL); -#endif +mqtt->_tls_version = _free(mqtt->_tls_version); mqtt->_tls_version = rpmExpand("1.2", NULL); + /* XXX FIXME: __progname? */ +mqtt->idprefix = _free(mqtt->idprefix); mqtt->idprefix = rpmExpand("rpm", NULL); +mqtt->_clientid = _free(mqtt->_clientid); mqtt->_clientid = rpmExpand(mqtt->idprefix, "-%%{pid}", NULL); /* -- Initialize values frpm default URI. */ @@ -1351,24 +1436,24 @@ if (mqtt->clientid == NULL) { assert(mqtt->_clientid); mqtt->clientid = rpmExpand(mqtt->_clientid, NULL); -fprintf(stderr, "*** %s: clientid %s\n", __FUNCTION__, mqtt->clientid); } if (mqtt->topic == NULL) { assert(mqtt->idprefix); mqtt->topic = rpmExpand(mqtt->idprefix, "/mqtt", NULL); -fprintf(stderr, "*** %s: topic %s\n", __FUNCTION__, mqtt->topic); } +#ifdef DYING /* -- XXX Add default argv (if not specified) */ if (mqtt->ac == 0) { static const char *_av[] = { - "mqtt://luser:jasnl@localhost:1883/rpm/mqtt?trace=4", + "mqtt://luser:jasnl@localhost:1883/rpm/mqtt", "rpm/#", NULL, }; (void) argvAppend((ARGV_t *)&mqtt->av, _av); mqtt->ac = argvCount((ARGV_t)mqtt->av); } +#endif rc = RPMRC_OK; @@ -1385,7 +1470,7 @@ #ifdef WITH_MQTT (void) rpmmqttDisconnect(mqtt); (void) check(mqtt, "destroy", - (MQTTAsync_destroy(&mqtt->C), 0)); + (MQTTAsync_destroy(&mqtt->I), 0)); #endif /* WITH_MQTT */ /* ========== */ @@ -1396,6 +1481,7 @@ mqtt->msgs = argvFree((ARGV_t)mqtt->msgs); mqtt->password = _free(mqtt->password); mqtt->_topics = argvFree((ARGV_t)mqtt->_topics); + mqtt->_filter_out = argvFree((ARGV_t)mqtt->_filter_out); mqtt->user = _free(mqtt->user); mqtt->protocol_version = _free(mqtt->protocol_version); /* XXX malloc? 
*/ @@ -1412,13 +1498,13 @@ mqtt->_proxy = _free(mqtt->_proxy); /* ========== */ - mqtt->C = NULL; + mqtt->I = NULL; mqtt->uri = _free(mqtt->uri); mqtt->topic = _free(mqtt->topic); mqtt->clientid = _free(mqtt->clientid); mqtt->persist_path = _free(mqtt->persist_path); mqtt->persist_ctx = _free(mqtt->persist_ctx); - mqtt->dn = _free(mqtt->dn); + mqtt->cachedn = _free(mqtt->cachedn); mqtt->ifn = argvFree(mqtt->ifn); if (mqtt->ifd) @@ -1436,15 +1522,21 @@ mqtt->flags = 0; } -RPMIOPOOL_MODULE(mqtt) +RPMIOPOOL_INTERP_MODULE(mqtt) rpmmqtt rpmmqttNew(char ** av, uint32_t flags) { - rpmmqtt mqtt = rpmmqttGetPool(_rpmmqttPool); - int ac = argvCount((ARGV_t)av); + static const char * _av[] = { (char *) "rpmmqtt", NULL }; + rpmmqtt mqtt = (flags & 0x80000000) + ? rpmmqttI() : rpmmqttGetPool(_rpmmqttPool); SPEW((stderr, "--> %s(%p,0x%x)\n", __FUNCTION__, av, flags)); + /* XXX quick-n-dirty recursion avoidance. */ + if (av == NULL) av = (char **) _av; + + int ac = argvCount((ARGV_t)av); + flags &= ~0x80000000; rpmRC rc = rpmmqttInit(mqtt, ac, (const char **)av, flags); (void)rc; @@ -1473,15 +1565,16 @@ oneshot++; } + /* XXX improve integration */ static const char _mqtt_persist[] = - "%{?_mqtt_persist}%{!?_mqtt_persist:0}"; + "%{?_mqtt_persist}%{!?_mqtt_persist:2}"; mqtt->persist_type = (rpmExpandNumeric(_mqtt_persist) % 3); static const char _mqtt_cachedir[] = "%{?_mqtt_cachedir}%{!?_mqtt_cachedir:/var/cache/mqtt}"; char *persist_path = rpmGetPath(_mqtt_cachedir, NULL); rpmlog(_lvl, "%19s: %s\n", "clientid", mqtt->clientid); - rpmlog(_lvl, "%19s: %s qos(%d) timeout(%umsecs)\n", + rpmlog(_lvl, "%19s: %s qos(%d) timeout(%u msecs)\n", "topic", mqtt->topic, mqtt->qos, mqtt->timeout); rpmlog(_lvl, "%19s: type(%u) %s\n", "persist", mqtt->persist_type, @@ -1489,18 +1582,22 @@ rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace); +mqtt->persist_path = _free(mqtt->persist_path); +mqtt->persist_ctx = _free(mqtt->persist_ctx); switch (mqtt->persist_type) { default: case 
MQTTCLIENT_PERSISTENCE_NONE: mqtt->persist_ctx = NULL; break; case MQTTCLIENT_PERSISTENCE_DEFAULT: - { mqtt->persist_path = xstrdup(persist_path); + { + mqtt->persist_path = xstrdup(persist_path); /* XXX rpmmqttFini double free */ mqtt->persist_ctx = (void *)xstrdup(mqtt->persist_path); } break; case MQTTCLIENT_PERSISTENCE_USER: - { mqtt->persist_path = xstrdup(persist_path); + { + mqtt->persist_path = xstrdup(persist_path); MQTTClient_persistence * ctx = xmalloc(sizeof(*ctx)); *ctx = _rpmmqtt_persistence; /* structure assignment */ ctx->context = mqtt; @@ -1510,8 +1607,10 @@ persist_path = _free(persist_path); /* Prepare for subscription delivery. */ - if (MF_ISSET(BUFFER)) + if (MF_ISSET(BUFFER) && mqtt->iob == NULL) mqtt->iob = rpmiobNew(0); + if (mqtt->ofd) /* XXX memleaks on recursion */ + (void) Fclose(mqtt->ofd); switch (mqtt->msg_output) { default: case MQTT_OUTPUT_UNKNOWN: @@ -1529,25 +1628,29 @@ break; } +mqtt->u = NULL; dumpMQTT(__FUNCTION__, mqtt); - xx = check(mqtt, "create", - MQTTAsync_create(&mqtt->C, mqtt->uri, mqtt->clientid, - mqtt->persist_type, mqtt->persist_ctx)); + if (mqtt->I == NULL) { + xx = check(mqtt, "createWithOptions", + MQTTAsync_createWithOptions(&mqtt->I, + mqtt->uri, mqtt->clientid, + mqtt->persist_type, mqtt->persist_ctx, AOBJ(mqtt, 'O'))); xx = check(mqtt, "setCallbacks", - MQTTAsync_setCallbacks(mqtt->C, mqtt, + MQTTAsync_setCallbacks(mqtt->I, mqtt, onConnectionLost, onMessageArrived, onDeliveryComplete)); /* Subscribe to channels (if any). */ + /* XXX rework using mqtt->_topics instead */ if (mqtt->ac > 1) { #ifdef NOTYET /* XXX segfault here. 
*/ xx = rpmmqttSubscribeMany(mqtt, mqtt->ac-1, mqtt->av+1); #else for (int i = 1; i < mqtt->ac; i++) - xx = rpmmqttRead(mqtt, mqtt->av[i], 0); + xx = rpmmqttSub(mqtt, mqtt->av[i], 0); #endif } @@ -1560,7 +1663,7 @@ if (mqtt->msgs) { int nmsgs = argvCount((ARGV_t)mqtt->msgs); for (int i = 0; i < nmsgs; i++) { - xx = rpmmqttWrite(mqtt, mqtt->msgs[i], 0); + xx = rpmmqttPub(mqtt, mqtt->msgs[i], 0); } } break; @@ -1572,7 +1675,7 @@ const char * ifn = mqtt->ifn[i]; rpmiob iob = NULL; if (rpmiobSlurp(ifn, &iob) == 0) { - xx = rpmmqttWrite(mqtt, + xx = rpmmqttPub(mqtt, rpmiobStr(iob), rpmiobLen(iob)); } iob = rpmiobFree(iob); @@ -1595,7 +1698,7 @@ const char * s = lav[j]; size_t ns = strlen(s); if (ns > 0) /* XXX skip empty lines? */ - xx = rpmmqttWrite(mqtt, s, ns); + xx = rpmmqttPub(mqtt, s, ns); } lav = argvFree(lav); } @@ -1604,8 +1707,37 @@ } break; } + } } #endif /* WITH_MQTT */ return rpmmqttLink(mqtt); } + +rpmRC rpmmqttRun(rpmmqtt mqtt, const char * str, const char ** resultp) +{ + const char *msg = NULL; + rpmRC rc = RPMRC_FAIL; + +SPEW((stderr, "==> %s(%p,\"%s\",%p)\n", __FUNCTION__, mqtt, str, resultp)); + + if (mqtt == NULL) mqtt = rpmmqttI(); + + msg = rpmExpand("%now", " ", str, NULL); +#if defined(WITH_MQTT) + if (rpmmqttConnect(mqtt)) + goto exit; + if (msg != NULL && !rpmmqttSendMessage(mqtt, msg, strlen(msg))) { + rc = RPMRC_OK; + if (resultp) { + *resultp = (mqtt->iob ? rpmiobStr(mqtt->iob) : ""); + } + } + +exit: +#endif + + msg = _free(msg); +SPEW((stderr, "<== %s(%p,\"%s\",%p) rc %d\n", __FUNCTION__, mqtt, str, resultp, rc)); + return rc; +} @@ . patch -p0 <<'@@ .' 
Index: rpm/rpmio/rpmmqtt.h ============================================================================ $ cvs diff -u -r1.1.2.11 -r1.1.2.12 rpmmqtt.h --- rpm/rpmio/rpmmqtt.h 3 Jul 2016 08:41:57 -0000 1.1.2.11 +++ rpm/rpmio/rpmmqtt.h 4 Jul 2016 07:45:23 -0000 1.1.2.12 @@ -68,6 +68,7 @@ int port; /*!< -p */ const char *password; /*!< -P */ const char **_topics; /*!< -t */ + const char **_filter_out; /*!< -T */ const char *user; /*!< -u */ const char *protocol_version; /*!< -V */ @@ -85,12 +86,13 @@ const char *_psk_key; /*!< --psk */ const char *_psk_identity; /*!< --psk-identity */ const char *_proxy; /*!< --proxy */ + /* ========== */ urltype ut; /* "tcp://localhost:1833" */ urlinfo u; - void * C; /* MQTTClient */ + void * I; /* MQTTClient */ const char * uri; const char * topic; @@ -116,7 +118,7 @@ int MQTTVersion; int sessionPresent; - const char *dn; + const char *cachedn; FD_t ifd; const char **ifn; /*!< -f */ @@ -124,11 +126,13 @@ const char *ofn; /*!< -o */ rpmiob iob; - MQTTAsync_connectOptions Copts; - MQTTAsync_willOptions Wopts; - MQTTAsync_SSLOptions Sopts; - MQTTAsync_disconnectOptions Dopts; - MQTTAsync_responseOptions Ropts; + MQTTAsync_connectOptions C; + MQTTAsync_willOptions W; + MQTTAsync_SSLOptions S; + MQTTAsync_disconnectOptions D; + MQTTAsync_responseOptions R; + MQTTAsync_createOptions O; + MQTTAsync_message M; }; #endif /* _RPMMQTT_INTERNAL */ @@ -176,9 +180,13 @@ int rpmmqttDisconnect(rpmmqtt mqtt); -ssize_t rpmmqttWrite(rpmmqtt mqtt, const char *s, size_t ns); +int rpmmqttSendMessage(rpmmqtt mqtt, const char *s, size_t ns); + +int rpmmqttPub(rpmmqtt mqtt, const char *s, size_t ns); + +int rpmmqttSub(rpmmqtt mqtt, const char *s, size_t ns); -ssize_t rpmmqttRead(rpmmqtt mqtt, const char *s, size_t ns); +rpmRC rpmmqttRun(rpmmqtt mqtt, const char * str, const char ** resultp); #ifdef __cplusplus } @@ . patch -p0 <<'@@ .' 
Index: rpm/rpmio/tmqtt.c ============================================================================ $ cvs diff -u -r1.1.2.8 -r1.1.2.9 tmqtt.c --- rpm/rpmio/tmqtt.c 3 Jul 2016 08:41:57 -0000 1.1.2.8 +++ rpm/rpmio/tmqtt.c 4 Jul 2016 07:45:23 -0000 1.1.2.9 @@ -29,17 +29,17 @@ int rc = 0; #ifdef DYING - (void) rpmmqttRead(mqtt, "rpm/#?qos=0", 0); - (void) rpmmqttRead(mqtt, "$SYS/broker/version?qos=0", 0); + (void) rpmmqttSub(mqtt, "rpm/#?qos=0", 0); + (void) rpmmqttSub(mqtt, "$SYS/broker/version?qos=0", 0); #endif - nw = rpmmqttWrite(mqtt, "bzzt ...", 0); - nw = rpmmqttWrite(mqtt, "bzzT ...", 0); - nw = rpmmqttWrite(mqtt, "bzZT ...", 0); - nw = rpmmqttWrite(mqtt, "bZZT ...", 0); - nw = rpmmqttWrite(mqtt, "BZZT ...", 0); + nw = rpmmqttPub(mqtt, "bzzt ...", 0); + nw = rpmmqttPub(mqtt, "bzzT ...", 0); + nw = rpmmqttPub(mqtt, "bzZT ...", 0); + nw = rpmmqttPub(mqtt, "bZZT ...", 0); + nw = rpmmqttPub(mqtt, "BZZT ...", 0); (void) rpmmqttDisconnect(mqtt); - nw = rpmmqttWrite(mqtt, "SWAT !!!", 0); + nw = rpmmqttPub(mqtt, "SWAT !!!", 0); (void) rpmmqttDisconnect(mqtt); (void) rpmmqttConnect(mqtt); @@ -73,9 +73,14 @@ int rc = -1; /* Initialize the _mqtt_ macro context */ -(void) rpmDefineMacro(NULL, "_mqtt_trace 4", 0); +(void) rpmDefineMacro(NULL, "_mqtt_persist 2", 0); (void) rpmDefineMacro(NULL, "_mqtt_cachedir /var/cache/mqtt", 0); + +(void) rpmDefineMacro(NULL, "_mqtt_prefix %{now} %{nil}", 0); + +#ifdef DYING +(void) rpmDefineMacro(NULL, "_mqtt_trace 4", 0); (void) rpmDefineMacro(NULL, "_mqtt_user luser", 0); (void) rpmDefineMacro(NULL, "_mqtt_pass jasnl", 0); (void) rpmDefineMacro(NULL, "_mqtt_host localhost", 0); @@ -83,9 +88,9 @@ (void) rpmDefineMacro(NULL, "_mqtt_clientid rpm-%{pid}", 0); (void) rpmDefineMacro(NULL, "_mqtt_topic rpm/#", 0); (void) rpmDefineMacro(NULL, "_mqtt_qos 1", 0); -(void) rpmDefineMacro(NULL, "_mqtt_persist 2", 0); (void) rpmDefineMacro(NULL, "_mqtt_timeout 10000", 0); (void) rpmDefineMacro(NULL, "_mqtt_prefix %{now} rpm pid %{pid} on cpu%{cpu} 
%{user}:%{group} ", 0); +#endif mqtt = rpmmqttNew(argv, 0); rc = _DoMQTT(mqtt); @@ . ______________________________________________________________________ RPM Package Manager http://rpm5.org CVS Sources Repository rpm-cvs@rpm5.org