RPM Package Manager, CVS Repository
  http://rpm5.org/cvs/
  ____________________________________________________________________________

  Server: rpm5.org                         Name:   Jeff Johnson
  Root:   /v/rpm/cvs                       Email:  j...@rpm5.org
  Module: rpm                              Date:   04-Jul-2016 09:45:24
  Branch: rpm-5_4                          Handle: 2016070407452301

  Modified files:           (Branch: rpm-5_4)
    rpm                     CHANGES
    rpm/macros              mqtt.in
    rpm/rpmio               librpmio.vers macro.c poptIO.c rpmio.c rpmmqtt.c
                            rpmmqtt.h tmqtt.c

  Log:
    - mqtt: stub-in a %{mqtt:...} macro embedding.

  Summary:
    Revision    Changes     Path
    1.3501.2.510+1  -0      rpm/CHANGES
    1.1.2.3     +28 -11     rpm/macros/mqtt.in
    2.199.2.59  +5  -2      rpm/rpmio/librpmio.vers
    2.249.2.36  +16 -8      rpm/rpmio/macro.c
    1.94.2.22   +3  -0      rpm/rpmio/poptIO.c
    1.230.2.34  +2  -1      rpm/rpmio/rpmio.c
    1.1.2.13    +409 -277   rpm/rpmio/rpmmqtt.c
    1.1.2.12    +17 -9      rpm/rpmio/rpmmqtt.h
    1.1.2.9     +15 -10     rpm/rpmio/tmqtt.c
  ____________________________________________________________________________

  patch -p0 <<'@@ .'
  Index: rpm/CHANGES
  ============================================================================
  $ cvs diff -u -r1.3501.2.509 -r1.3501.2.510 CHANGES
  --- rpm/CHANGES       29 Jun 2016 12:17:57 -0000      1.3501.2.509
  +++ rpm/CHANGES       4 Jul 2016 07:45:23 -0000       1.3501.2.510
  @@ -1,4 +1,5 @@
   5.4.17 -> 5.4.18:
  +    - jbj: mqtt: stub-in a %{mqtt:...} mcro embedding.
       - jbj: mqtt: add self-subscriptions and macro configgery.
       - jbj: macro: add primitives useful for log spewage.
       - jbj: mqtt: stub-in a paho-mqtt client.
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/macros/mqtt.in
  ============================================================================
  $ cvs diff -u -r1.1.2.2 -r1.1.2.3 mqtt.in
  --- rpm/macros/mqtt.in        30 Jun 2016 16:59:39 -0000      1.1.2.2
  +++ rpm/macros/mqtt.in        4 Jul 2016 07:45:24 -0000       1.1.2.3
  @@ -1,14 +1,31 @@
   
#==============================================================================
   # ---- MQTT configuration.
  -#
  -%_mqtt_cachedir      /var/cache/mqtt
  -%_mqtt_user  luser
  -%_mqtt_pass  jasnl
  -%_mqtt_host  localhost
  -%_mqtt_port  1883
  -%_mqtt_clientid      rpm
  -%_mqtt_topic rpm/#
  -%_mqtt_qos   1
  +#%_mqtt_scheme       mqtt
  +#%_mqtt_user luser
  +#%_mqtt_pass jasnl
  +#%_mqtt_host localhost
  +#%_mqtt_port 1883
  +#%_mqtt_clientid     rpm
  +#%_mqtt_topic        rpm/#
  +#%_mqtt_qos  1
  +
  +# Default publish/subscribe topics
  +#%_mqtt_u    %{?_mqtt_user:%{_mqtt_user}%{?_mqtt_pass::%{_mqtt_pass}}}
  +#%_mqtt_h    %{?_mqtt_host:%{_mqtt_host}%{?_mqtt_port::%{_mqtt_port}}}
  +#%_mqtt_argv \
  +#    mqtt://luser:jasnl@localhost:1883/rpm/mqtt?qos=1,timeout=10000 \
  +#%{nil}
  +
  +# Timeout in msecs
  +#%_mqtt_timeout      10000
  +
  +# Default connection template(s)
  +%_mqtt_uri
  +     mqtt://luser:jasnl@localhost:1883/rpm/mqtt?qos=2,timeout=10000 \
  +
  +# Client persistence store
   %_mqtt_persist       2
  -%_mqtt_timeout       10000
  -%_mqtt_prefix        %{now} rpm pid %{pid} on cpu%{cpu} %{user}:%{group} 
%{nil}
  +%_mqtt_cachedir      /var/cache/mqtt
  +
  +# Prepended to msgs
  +%_mqtt_prefix        %{now} %{nil}
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/librpmio.vers
  ============================================================================
  $ cvs diff -u -r2.199.2.58 -r2.199.2.59 librpmio.vers
  --- rpm/rpmio/librpmio.vers   27 Jun 2016 22:00:14 -0000      2.199.2.58
  +++ rpm/rpmio/librpmio.vers   4 Jul 2016 07:45:23 -0000       2.199.2.59
  @@ -635,11 +635,14 @@
       rpmmgFile;
       rpmmgBuffer;
       _rpmmqtt_debug;
  +    _rpmmqttI;
  +    _rpmmqttPool;
       rpmmqttConnect;
       rpmmqttDisconnect;
       rpmmqttNew;
  -    rpmmqttWrite;
  -    rpmmqttRead;
  +    rpmmqttPub;
  +    rpmmqttRun;
  +    rpmmqttSub;
       _rpmmrb_debug;
       _rpmmrbI;
       _rpmmrbPool;
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/macro.c
  ============================================================================
  $ cvs diff -u -r2.249.2.35 -r2.249.2.36 macro.c
  --- rpm/rpmio/macro.c 29 Jun 2016 14:03:27 -0000      2.249.2.35
  +++ rpm/rpmio/macro.c 4 Jul 2016 07:45:23 -0000       2.249.2.36
  @@ -89,6 +89,7 @@
   
   #include <rpmjni.h>
   #include <rpmjs.h>
  +#include <rpmmqtt.h>
   #include <rpmmrb.h>
   #include <rpmperl.h>
   #include <rpmpython.h>
  @@ -303,7 +304,7 @@
   
       if (mc == NULL) mc = rpmGlobalMacroContext;
       if (fp == NULL) fp = stderr;
  -    
  +
       fprintf(fp, "========================\n");
       if (mc->macroTable != NULL) {
        int i;
  @@ -363,7 +364,7 @@
       }
       av[ac] = NULL;
       *avp = av = (const char **) xrealloc(av, (ac+1) * sizeof(*av));
  -    
  +
       return ac;
   }
   #endif
  @@ -393,7 +394,7 @@
        t[namelen] = '\0';
        name = t;
       }
  -    
  +
       key = (MacroEntry) memset(alloca(sizeof(*key)), 0, sizeof(*key));
       /*@-temptrans -assignexpose@*/
       key->name = (char *)name;
  @@ -1070,7 +1071,7 @@
       b = be = stpcpy(buf, me->name);
   
       addMacro(mb->mc, "0", NULL, buf, mb->depth);
  -    
  +
       argc = 1;        /* XXX count argv[0] */
   
       /* Copy args into buf until lastc */
  @@ -1189,7 +1190,7 @@
       if (argv != NULL)
       for (c = 0; argv[c] != NULL; c++)
        argc++;
  -    
  +
       /* Add arg count as macro. */
       sprintf(aname, "%d", argc);
       addMacro(mb->mc, "#", NULL, aname, mb->depth);
  @@ -2292,6 +2293,13 @@
        }
   #endif
   
  +#ifdef       WITH_MQTT
  +     if (STREQ("mqtt", f, fn)) {
  +         RPMIOPOOL_INTERP_EXPAND(mqtt)
  +         continue;
  +     }
  +#endif
  +
   #ifdef       WITH_MRBEMBED
        if (STREQ("mrb", f, fn) || STREQ("mruby", f, fn)) {
            RPMIOPOOL_INTERP_EXPAND(mrb)
  @@ -3224,7 +3232,7 @@
   void
   rpmFreeMacros(MacroContext mc)
   {
  -    
  +
       if (mc == NULL) mc = rpmGlobalMacroContext;
   
       if (mc->macroTable != NULL) {
  @@ -3405,7 +3413,7 @@
       (void) expandMacros(NULL, mc, t, tn + bufn + 1);
       t[tn + bufn] = '\0';
       t = (char *) xrealloc(t, strlen(t) + 1);
  -    
  +
       return t;
   }
   
  @@ -3441,7 +3449,7 @@
       (void) expandMacros(NULL, mc, t, tn + bufn + 1);
       t[tn + bufn] = '\0';
       t = (char *) xrealloc(t, strlen(t) + 1);
  -    
  +
       return t;
   }
   /*@=modfilesys@*/
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/poptIO.c
  ============================================================================
  $ cvs diff -u -r1.94.2.21 -r1.94.2.22 poptIO.c
  --- rpm/rpmio/poptIO.c        25 Jun 2016 22:36:54 -0000      1.94.2.21
  +++ rpm/rpmio/poptIO.c        4 Jul 2016 07:45:23 -0000       1.94.2.22
  @@ -94,6 +94,7 @@
   extern int _rpmio_debug;
   extern int _rpmiob_debug;
   extern int _rpmlua_debug;
  +extern int _rpmmqtt_debug;
   extern int _rpmsq_debug;
   extern int _rpmzq_debug;
   extern int _tar_debug;
  @@ -650,6 +651,8 @@
   #endif
    { "rpmmgdebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmmg_debug, 
-1,
        N_("Debug rpmmg magic"), NULL},
  + { "rpmmqttdebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, 
&_rpmmqtt_debug, -1,
  +     N_("Debug rpmmqtt magic"), NULL},
    { "rpmmrbdebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, 
&_rpmmrb_debug, -1,
        N_("Debug embedded MRuby interpreter"), NULL},
    { "rpmgfsdebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, 
&_rpmgfs_debug, -1,
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmio.c
  ============================================================================
  $ cvs diff -u -r1.230.2.33 -r1.230.2.34 rpmio.c
  --- rpm/rpmio/rpmio.c 28 Jun 2016 07:18:12 -0000      1.230.2.33
  +++ rpm/rpmio/rpmio.c 4 Jul 2016 07:45:23 -0000       1.230.2.34
  @@ -75,6 +75,7 @@
   #include <rpmjni.h>
   #include <rpmjs.h>
   #include <rpmlua.h>          /* XXX rpmioClean() calls rpmluaFree() */
  +#include <rpmmqtt.h>
   #include <rpmmrb.h>
   #include <rpmnix.h>
   #include <rpmodbc.h>
  @@ -3317,6 +3318,7 @@
   
       RPMIOPOOL_INTERP_FREE(aug)
       RPMIOPOOL_INTERP_FREE(gfs)
  +    RPMIOPOOL_INTERP_FREE(mqtt)
   
   #ifdef       NOTYET
       RPMIOPOOL_FREE(mgo)
  @@ -3328,7 +3330,6 @@
       RPMIOPOOL_FREE(cudf)
       RPMIOPOOL_FREE(cvs)
       RPMIOPOOL_FREE(date)
  -    RPMIOPOOL_FREE(mqtt)
   
       _odbcPool = rpmioFreePool(_odbcPool);
       RPMIOPOOL_FREE(sed)
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmqtt.c
  ============================================================================
  $ cvs diff -u -r1.1.2.12 -r1.1.2.13 rpmmqtt.c
  --- rpm/rpmio/rpmmqtt.c       3 Jul 2016 08:41:57 -0000       1.1.2.12
  +++ rpm/rpmio/rpmmqtt.c       4 Jul 2016 07:45:23 -0000       1.1.2.13
  @@ -58,8 +58,8 @@
        fprintf(fp, "fragment: %s\n", u->fragment);
        if (u->query) {
            ARGV_t av = NULL;
  -         int xx;
  -         xx = argvSplit(&av, u->query, ",");
  +         int xx = argvSplit(&av, u->query, ",");
  +         (void)xx;
            argvPrint(u->query, av, fp);
            av = argvFree(av);
        }
  @@ -77,79 +77,131 @@
       if (mqtt) {
   
   #define      PRINT(_fmt, _foo) fprintf(fp, "%19s: %"#_fmt"\n", #_foo, 
mqtt->_foo)
  -     PRINT(x, flags);
  -     PRINT(d, msg_input);
  -     PRINT(d, msg_output);
  -     argvPrint("mqtt->av", (ARGV_t)mqtt->av, fp);
  -
  -     PRINT(s, _address);
  -     argvPrint("mqtt->msgs", (ARGV_t)mqtt->msgs, fp);
  -
  -     PRINT(s, cafile);
  -     PRINT(s, _capath);
  -     PRINT(s, cert);
  -     PRINT(s, privkey);
  -     PRINT(s, ciphers);
  -
  -     PRINT(s, _tls_version);
  -     PRINT(s, _psk_key);
  -     PRINT(s, _psk_identity);
  -     PRINT(s, _proxy);
  -
  -fprintf(stderr, "====================\n");
  -     PRINT(d, keepalive);
  -     PRINT(d, max_inflight);
  -     PRINT(s, protocol_version);
  -
  -     PRINT(s, will_message);
  -     PRINT(d, will_qos);
  -     PRINT(s, will_topic);
  -
  -     PRINT(s, user);
  -     PRINT(s, password);
  -     PRINT(s, host);
  -     PRINT(d, port);
  -     PRINT(s, uri);
  -
  -     PRINT(s, idprefix);
  -     PRINT(s, _clientid);
  -     PRINT(s, clientid);
  -
  -     PRINT(u, qos);
  -     PRINT(u, timeout);
  +     if (mqtt->flags != 0x40000003)
  +         PRINT(x, flags);
  +     if (mqtt->msg_input != 0)
  +         PRINT(d, msg_input);
  +     if (mqtt->msg_output != 1)
  +         PRINT(d, msg_output);
  +     if (mqtt->av)
  +         argvPrint("mqtt->av", (ARGV_t)mqtt->av, fp);
  +
  +     if (mqtt->_address)
  +         PRINT(s, _address);
  +     if (mqtt->msgs)
  +         argvPrint("mqtt->msgs", (ARGV_t)mqtt->msgs, fp);
  +
  +     if (mqtt->cafile)
  +         PRINT(s, cafile);
  +     if (mqtt->_capath)
  +         PRINT(s, _capath);
  +     if (mqtt->cert)
  +         PRINT(s, cert);
  +     if (mqtt->privkey)
  +         PRINT(s, privkey);
  +     if (mqtt->ciphers)
  +         PRINT(s, ciphers);
  +
  +     if (mqtt->_tls_version == NULL || strcmp(mqtt->_tls_version, "1.2"))
  +         PRINT(s, _tls_version);
  +     if (mqtt->_psk_key)
  +         PRINT(s, _psk_key);
  +     if (mqtt->_psk_identity)
  +         PRINT(s, _psk_identity);
  +     if (mqtt->_proxy)
  +         PRINT(s, _proxy);
  +
  +     if (mqtt->keepalive != 60)
  +         PRINT(d, keepalive);
  +     if (mqtt->max_inflight != 20)
  +         PRINT(d, max_inflight);
  +     if (mqtt->protocol_version == NULL
  +      || strcmp(mqtt->protocol_version, "auto"))
  +         PRINT(s, protocol_version);
  +
  +     if (mqtt->will_topic) {
  +         PRINT(s, will_message);
  +         PRINT(d, will_qos);
  +         PRINT(s, will_topic);
  +     }
  +
  +     if (mqtt->user)
  +         PRINT(s, user);
  +     if (mqtt->password)
  +         PRINT(s, password);
  +     if (mqtt->host == NULL || strcmp(mqtt->host, "localhost"))
  +         PRINT(s, host);
  +     if (mqtt->port != 1883)
  +         PRINT(d, port);
  +     if (mqtt->uri == NULL || strcmp(mqtt->uri, "tcp://localhost:1883"))
  +         PRINT(s, uri);
  +
  +     if (mqtt->idprefix == NULL || strcmp(mqtt->idprefix, "rpm"))
  +         PRINT(s, idprefix);
  +     if (mqtt->_clientid == NULL || strcmp(mqtt->_clientid, "rpm-%{pid}"))
  +         PRINT(s, _clientid);
  +     if (mqtt->clientid == NULL || strncmp(mqtt->_clientid, "rpm-", 4))
  +         PRINT(s, clientid);
  +
  +     if (mqtt->qos != 2)
  +         PRINT(u, qos);
  +     if (mqtt->timeout != 10000)
  +         PRINT(u, timeout);
   
        if (mqtt->_topics)
            argvPrint("mqtt->_topics", (ARGV_t)mqtt->_topics, fp);
  -     PRINT(s, topic);
  -
  -fprintf(stderr, "====================\n");
  -     PRINT(d, ut);
  -     dumpU("mqtt->u", mqtt->u);
  -     PRINT(p, C);
  -     PRINT(u, persist_type);
  -     PRINT(p, persist_ctx);
  -     PRINT(s, persist_path);
  -
  -     PRINT(d, debug);
  -     PRINT(d, trace);
  -     PRINT(d, finished);
  -     PRINT(u, token);
  -
  -     PRINT(d, msg_count);
  -     PRINT(d, max_msg_count);
  -
  -     PRINT(s, serverURI);
  -     PRINT(d, MQTTVersion);
  -     PRINT(d, sessionPresent);
  +     if (mqtt->_filter_out)
  +         argvPrint("mqtt->_filter_out", (ARGV_t)mqtt->_filter_out, fp);
  +     if (mqtt->topic == NULL || strcmp(mqtt->topic, "rpm/mqtt"))
  +         PRINT(s, topic);
  +
  +     if (mqtt->u) {
  +         PRINT(d, ut);
  +         dumpU("mqtt->u", mqtt->u);
  +     }
  +
  +     if (mqtt->I)
  +         PRINT(p, I);
  +     if (mqtt->persist_type != 2) {
  +         PRINT(u, persist_type);
  +         PRINT(p, persist_ctx);
  +         PRINT(s, persist_path);
  +     }
  +
  +     if (mqtt->debug)
  +             PRINT(d, debug);
  +     if (mqtt->trace)
  +             PRINT(d, trace);
  +     if (mqtt->finished)
  +             PRINT(d, finished);
  +     if (mqtt->token)
  +             PRINT(u, token);
  +
  +     if (mqtt->msg_count) {
  +         PRINT(d, msg_count);
  +         PRINT(d, max_msg_count);
  +     }
  +
  +     if (mqtt->serverURI) {
  +         PRINT(s, serverURI);
  +         PRINT(d, MQTTVersion);
  +         PRINT(d, sessionPresent);
  +     }
   
  -     PRINT(s, dn);
  +     if (mqtt->cachedn)
  +         PRINT(s, cachedn);
   
        if (mqtt->ifn)
            argvPrint("mqtt->ifn", (ARGV_t)mqtt->ifn, fp);
  -     PRINT(p, ifd);
  -     PRINT(s, ofn);
  -     PRINT(p, ofd);
  -     PRINT(p, iob);
  +     if (mqtt->ifd)
  +         PRINT(p, ifd);
  +     if (mqtt->ofn == NULL || strcmp(mqtt->ofn, "-")) {
  +         PRINT(s, ofn);
  +         if (mqtt->ofd)
  +             PRINT(p, ofd);
  +     }
  +     if (mqtt->iob)
  +         PRINT(p, iob);
   
   #undef       PRINT
       }
  @@ -175,11 +227,14 @@
       _ENTRY(BAD_STRUCTURE),
       _ENTRY(BAD_QOS),
       _ENTRY(NO_MORE_MSGIDS),
  +    _ENTRY(OPERATION_INCOMPLETE),
  +    _ENTRY(MAX_BUFFERED_MESSAGES),
   #else
       { 0, NULL },
   #endif
   };
   static size_t rpmmqtt_nerrs = sizeof(rpmmqtt_errs) / sizeof(rpmmqtt_errs[0]);
  +#undef       _ENTRY
   
   static const char * rpmmqttStrerror(int v)
   {
  @@ -255,6 +310,12 @@
        }
        break;
       }
  +#ifdef       NEW
  +    message->qos
  +    message->retained
  +    message->dup
  +    message->msgid
  +#endif
   
       MQTTAsync_freeMessage(&message);
       MQTTAsync_free(topic);
  @@ -289,15 +350,7 @@
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
   fprintf(stderr, "<-- %s(%p,%p) MQTT disconnect failed\n", __FUNCTION__, 
_mqtt, response);
  -    if (response) {
  -     const char *s = response->message;
  -     int token = response->token;
  -     int code = response->code;
  -     rpmlog(RPMLOG_WARNING,
  -             "MQTT disconnect failed: code(%d) msg %s\n",
  -             token, code, s);
  -    } else
  -     rpmlog(RPMLOG_WARNING, "MQTT disconnect failed\n");
  +    rpmlog(RPMLOG_WARNING, "MQTT disconnect failed\n");
       mqtt->finished = 1;
   }
   
  @@ -317,14 +370,7 @@
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
   fprintf(stderr, "<-- %s(%p,%p) MQTT connect failed\n", __FUNCTION__, _mqtt, 
response);
  -    if (response) {
  -     const char *s = response->message;
  -     int token = response->token;
  -     int code = response->code;
  -     rpmlog(RPMLOG_WARNING, "MQTT    connect failed: token(%d) code(%d) msg 
%s\n",
  -                     token, code, s);
  -    } else
  -     rpmlog(RPMLOG_WARNING, "MQTT    connect failed\n");
  +    rpmlog(RPMLOG_WARNING, "MQTT    connect failed\n");
       mqtt->finished = 1;
   }
   
  @@ -333,6 +379,7 @@
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
   
       if (response) {
  +     mqtt->token = response->token;
        mqtt->serverURI = xstrdup(response->alt.connect.serverURI);
        mqtt->MQTTVersion = response->alt.connect.MQTTVersion;
        mqtt->sessionPresent = response->alt.connect.sessionPresent;
  @@ -429,21 +476,21 @@
   
       *_mqttp = _mqtt;
   
  -    mqtt->dn = _free(mqtt->dn);
  +    mqtt->cachedn = _free(mqtt->cachedn);
       dn = rpmGetPath(mqtt->persist_path, "/", clientID, "-", serverURI, NULL);
       for (te = dn; (te = strchr(te, ':')) != NULL; te++)
        *te = '-';
   
  -    if (rpmioMkpath(dn, (mode_t)0775, (uid_t)-1, (gid_t)-1)) {
  +    if (rpmioMkpath(dn, (mode_t)0740, (uid_t)-1, (gid_t)-1)) {
        dn = _free(dn);
        goto exit;
       }
  -    mqtt->dn = dn;
  +    mqtt->cachedn = dn;
   
       rc = 0;
   
   exit:
  -SPEW((stderr, "<-- %s(%p,\"%s\",\"%s\",%p) rc %d dn %s\n", __FUNCTION__, 
_mqttp, clientID, serverURI, _mqtt, rc, mqtt->dn));
  +SPEW((stderr, "<-- %s(%p,\"%s\",\"%s\",%p) rc %d dn %s\n", __FUNCTION__, 
_mqttp, clientID, serverURI, _mqtt, rc, mqtt->cachedn));
   
       return rc;
   }
  @@ -454,13 +501,13 @@
       int rc = MQTTCLIENT_PERSISTENCE_ERROR;
   
   
  -    if (mqtt->dn == NULL)
  +    if (mqtt->cachedn == NULL)
        goto exit;
   
  -    if (Rmdir(mqtt->dn) && (errno != ENOENT && errno != ENOTEMPTY))
  +    if (Rmdir(mqtt->cachedn) && (errno != ENOENT && errno != ENOTEMPTY))
        goto exit;
   
  -    rpmlog(RPMLOG_DEBUG, D_("removed directory %s\n"), mqtt->dn);
  +    rpmlog(RPMLOG_DEBUG, D_("removed directory %s\n"), mqtt->cachedn);
   
       rc = 0;
   
  @@ -479,11 +526,11 @@
       size_t nw = 0;
       int rc = MQTTCLIENT_PERSISTENCE_ERROR;
   
  -    if (mqtt->dn == NULL)
  +    if (mqtt->cachedn == NULL)
        goto exit;
   
       /* XXX add .msg extension? */
  -    fn = rpmGetPath(mqtt->dn, "/", key, NULL);
  +    fn = rpmGetPath(mqtt->cachedn, "/", key, NULL);
       fd = Fopen(fn, "wb");
       if (fd == NULL || Ferror(fd))
        goto exit;
  @@ -525,11 +572,11 @@
       struct stat sb;
       int rc = MQTTCLIENT_PERSISTENCE_ERROR;
   
  -    if (mqtt->dn == NULL)
  +    if (mqtt->cachedn == NULL)
        goto exit;
   
       /* XXX add .msg extension? */
  -    fn = rpmGetPath(mqtt->dn, "/", key, NULL);
  +    fn = rpmGetPath(mqtt->cachedn, "/", key, NULL);
       fd = Fopen(fn, "rb");
       if (fd == NULL || Ferror(fd))
        goto exit;
  @@ -566,11 +613,11 @@
       char *fn = NULL;
       int rc = MQTTCLIENT_PERSISTENCE_ERROR;
   
  -    if (mqtt->dn == NULL)
  +    if (mqtt->cachedn == NULL)
        goto exit;
   
       /* XXX add .msg extension? */
  -    fn = rpmGetPath(mqtt->dn, "/", key, NULL);
  +    fn = rpmGetPath(mqtt->cachedn, "/", key, NULL);
   
       if (Unlink(fn) && errno != ENOENT)
        goto exit;
  @@ -591,14 +638,14 @@
       struct dirent *dp;
       int rc = MQTTCLIENT_PERSISTENCE_ERROR;
   
  -    if (mqtt->dn == NULL)
  +    if (mqtt->cachedn == NULL)
        goto exit;
   
  -    if ((dir = Opendir(mqtt->dn)) == NULL)
  +    if ((dir = Opendir(mqtt->cachedn)) == NULL)
        goto exit;
   
       while ((dp = Readdir(dir)) != NULL) {
  -     char * fn = rpmGetPath(mqtt->dn, "/", dp->d_name, NULL);
  +     char * fn = rpmGetPath(mqtt->cachedn, "/", dp->d_name, NULL);
        struct stat sb;
   
        if (Lstat(fn, &sb) == 0 && S_ISREG(sb.st_mode))
  @@ -628,14 +675,14 @@
       int nerrs = 0;
       int rc = MQTTCLIENT_PERSISTENCE_ERROR;
   
  -    if (mqtt->dn == NULL)
  +    if (mqtt->cachedn == NULL)
        goto exit;
   
  -    if ((dir = Opendir(mqtt->dn)) == NULL)
  +    if ((dir = Opendir(mqtt->cachedn)) == NULL)
        goto exit;
   
       while ((dp = Readdir(dir)) != NULL) {
  -     char * fn = rpmGetPath(mqtt->dn, "/", dp->d_name, NULL);
  +     char * fn = rpmGetPath(mqtt->cachedn, "/", dp->d_name, NULL);
        struct stat sb;
   
        if (Lstat(fn, &sb) == 0 && S_ISREG(sb.st_mode)) {
  @@ -662,14 +709,14 @@
       struct dirent *dp;
       int rc = MQTTCLIENT_PERSISTENCE_ERROR;
   
  -    if (mqtt->dn == NULL)
  +    if (mqtt->cachedn == NULL)
        goto exit;
   
  -    if ((dir = Opendir(mqtt->dn)) == NULL)
  +    if ((dir = Opendir(mqtt->cachedn)) == NULL)
        goto exit;
   
       while ((dp = Readdir(dir)) != NULL) {
  -     char * fn = rpmGetPath(mqtt->dn, "/", dp->d_name, NULL);
  +     char * fn = rpmGetPath(mqtt->cachedn, "/", dp->d_name, NULL);
        struct stat sb;
   
        if (Lstat(fn, &sb) == 0 && S_ISREG(sb.st_mode)) {
  @@ -705,93 +752,151 @@
   #endif       /* WITH_MQTT */
   
   /*==============================================================*/
  -static const MQTTAsync_willOptions _Wopts
  -             = MQTTAsync_willOptions_initializer;
  -static const MQTTAsync_connectOptions _Copts
  -             = MQTTAsync_connectOptions_initializer;
  -static const MQTTAsync_SSLOptions _Sopts
  -             = MQTTAsync_SSLOptions_initializer;
  -static const MQTTAsync_disconnectOptions _Dopts =
  -             MQTTAsync_disconnectOptions_initializer;
  -static const MQTTAsync_responseOptions _Ropts =
  -             MQTTAsync_responseOptions_initializer;
   
  -int rpmmqttConnect(rpmmqtt mqtt)
  +static void * AOBJ(rpmmqtt mqtt, char otype)
   {
  -    int rc = -1;
  +    void * ptr = NULL;
   #ifdef       WITH_MQTT
  -    if (MQTTAsync_isConnected(mqtt->C)) {
  -     rc = 0;
  -    } else {
  -     urlinfo u = mqtt->u;
  -
  -     MQTTAsync_willOptions *Wopts = &mqtt->Wopts;
  -     memcpy(Wopts, &_Wopts, sizeof(*Wopts));
  -#ifdef       REF
  -     memcpy(Wopts->struct_id, "MQTW", 4);
  -     Wopts->struct_version = 0;
  -#endif
  -     Wopts->topicName = (mqtt->will_topic ? mqtt->will_topic : mqtt->topic);
  -     Wopts->message = (mqtt->will_message ? mqtt->will_message : "");
  -     Wopts->retained = (MF_ISSET(WILL_RETAIN) ? 1 : 0);
  -     Wopts->qos = mqtt->will_qos;    /* XXX mqtt->qos? */
  -
  -     MQTTAsync_connectOptions *Copts = &mqtt->Copts;
  -     memcpy(Copts, &_Copts, sizeof(*Copts));
  -#ifdef       REF
  -     memcpy(Copts->struct_id, "MQTC", 4);
  -     Copts->struct_version = 3;      /* 0-4 enables fields below */
  -#endif
  -     Copts->keepAliveInterval = mqtt->keepalive;
  -     Copts->cleansession = (MF_ISSET(CLEAN) ? 1 : 0);
  -     Copts->maxInflight = mqtt->max_inflight;
  -
  -     Copts->will = Wopts;            /* last will */
  -     Copts->username = (u && u->user ? u->user : mqtt->user);;
  -     Copts->password = (u && u->password ? u->password : mqtt->password);
  -
  -     Copts->connectTimeout = 30;     /* XXX secs. configure?*/
  -     Copts->retryInterval = 0;       /* XXX secs. configure? */
  -
  -#ifdef       NOTYET
  -     MQTTAsync_SSLOptions *Sopts = &mqtt->Sopts;
  -     memcpy(Sopts, &_Sopts, sizeof(*Sopts));
  -     Sopts->trustStore = mqtt->cafile;
  -     Sopts->keyStore = mqtt->cert;
  -     Sopts->privateKey = mqtt->privkey;
  -     Sopts->enabledCipherSuites = mqtt->ciphers;
  -     Sopts->enableServerCertAuth = (MF_ISSET(INSECURE) ? 0 : 1);
  -     Copts->ssl = Sopts;
  -#else
  -     (void)_Sopts;
  -     Copts->ssl = NULL;
  +    static const MQTTAsync_connectOptions _C =
  +             MQTTAsync_connectOptions_initializer;
  +    MQTTAsync_connectOptions *C = &mqtt->C;
  +    static const MQTTAsync_disconnectOptions _D =
  +             MQTTAsync_disconnectOptions_initializer;
  +    MQTTAsync_disconnectOptions *D = &mqtt->D;
  +    static const MQTTAsync_message _M =
  +             MQTTAsync_message_initializer;
  +    MQTTAsync_message *M = &mqtt->M;
  +    static const MQTTAsync_createOptions _O =
  +             MQTTAsync_createOptions_initializer;
  +    MQTTAsync_createOptions *O = &mqtt->O;
  +    static const MQTTAsync_responseOptions _R =
  +             MQTTAsync_responseOptions_initializer;
  +    MQTTAsync_responseOptions *R = &mqtt->R;
  +    static const MQTTAsync_SSLOptions _S =
  +             MQTTAsync_SSLOptions_initializer;
  +    MQTTAsync_SSLOptions *S = &mqtt->S;
  +    static const MQTTAsync_willOptions _W =
  +             MQTTAsync_willOptions_initializer;
  +    MQTTAsync_willOptions *W = &mqtt->W;
  +    urlinfo u;
  +
  +    switch (otype) {
  +    case 'C':
  +     u = mqtt->u;
  +#ifdef       DYING
  +assert(u);
   #endif
  +     memcpy(C, &_C, sizeof(*C));
  +     C->keepAliveInterval = mqtt->keepalive;
  +     C->cleansession = (MF_ISSET(CLEAN) ? 1 : 0);
  +     C->maxInflight = mqtt->max_inflight;
  +
  +     W = AOBJ(mqtt, 'W');            /* XXX LWT */
  +     C->will = (W && W->topicName ? W : NULL);
  +
  +     C->username = (u && u->user ? u->user : mqtt->user);;
  +     C->password = (u && u->password ? u->password : mqtt->password);
  +
  +     C->connectTimeout = 30; /* XXX secs. configure?*/
  +     C->retryInterval = 0;   /* XXX secs. configure? */
  +
  +     S = AOBJ(mqtt, 'S');            /* XXX SSL */
  +     C->ssl = (S && S->keyStore) ? S : NULL; /* XXX */
   
  -     Copts->onSuccess = onConnect;
  -     Copts->onFailure = onConnectFailure;
  -     Copts->context = mqtt;
  +     C->onSuccess = onConnect;
  +     C->onFailure = onConnectFailure;
  +     C->context = mqtt;
   
  -     Copts->serverURIcount = 0;
  -     Copts->serverURIs = NULL;
  +     C->serverURIcount = 0;
  +     C->serverURIs = NULL;
   
  -     Copts->MQTTVersion = 0;
  +     C->MQTTVersion = 0;
        if (mqtt->protocol_version) {
            if (!strcmp(mqtt->protocol_version, "auto"))
  -             Copts->MQTTVersion = 0;
  +             C->MQTTVersion = 0;
            if (!strcmp(mqtt->protocol_version, "31"))
  -             Copts->MQTTVersion = 3;
  +             C->MQTTVersion = 3;
            if (!strcmp(mqtt->protocol_version, "311"))
  -             Copts->MQTTVersion = 4;
  +             C->MQTTVersion = 4;
        }
  +     C->automaticReconnect = 1;
  +     C->minRetryInterval = 1;        /* secs */
  +     C->maxRetryInterval = 60;       /* secs */
  +mqtt->ut = 0;
  +mqtt->u = NULL;
  +     ptr = (void *) C;
  +     break;
  +    case 'D':
  +     memcpy(D, &_D, sizeof(*D));
  +     D->timeout = mqtt->timeout;
  +     D->onSuccess = onDisconnect;
  +     D->onFailure = onDisconnectFailure;
  +     D->context = mqtt;
  +     ptr = (void *) D;
  +     break;
  +    case 'M':
  +     memcpy(M, &_M, sizeof(*M));
  +     M->payloadlen = 0;
  +     M->payload = "";
  +     M->qos = mqtt->qos;
  +     M->retained = (MF_ISSET(RETAIN) ? 1 : 0);
  +     M->dup = 0;
  +     M->msgid = 0;
  +     ptr = (void *) M;
  +     break;
  +    case 'O':
  +     memcpy(O, &_O, sizeof(*O));
  +     O->sendWhileDisconnected = 1;           /* XXX = 0 */
  +     O->maxBufferedMessages = 100;
  +     ptr = (void *) O;
  +     break;
  +    case 'R':
  +     memcpy(R, &_R, sizeof(*R));
  +     R->onSuccess = NULL;                    /* XXX multiple uses */
  +     R->onFailure = NULL;                    /* XXX multiple uses */
  +     R->context = mqtt;
  +     R->token = 0;
  +     ptr = (void *) R;
  +     break;
  +    case 'S':
  +     memcpy(S, &_S, sizeof(*S));
  +     S->trustStore = mqtt->cafile;
  +     S->keyStore = mqtt->cert;
  +     S->privateKey = mqtt->privkey;
  +     S->enabledCipherSuites = mqtt->ciphers;
  +     S->enableServerCertAuth = (MF_ISSET(INSECURE) ? 0 : 1);
  +     ptr = (void *) S;
  +     break;
  +    case 'W':
  +     memcpy(W, &_W, sizeof(*W));
  +     W->topicName = (mqtt->will_topic ? mqtt->will_topic : mqtt->topic);
  +     W->message = (mqtt->will_message ? mqtt->will_message : "");
  +     W->retained = (MF_ISSET(WILL_RETAIN) ? 1 : 0);
  +     W->qos = mqtt->will_qos;        /* XXX mqtt->qos? */
  +     ptr = (void *) W;
  +     break;
  +    default:
  +assert(0);
  +     break;
  +    }
  +#endif       /* WITH_MQTT */
  +    return ptr;
  +}
   
  +int rpmmqttConnect(rpmmqtt mqtt)
  +{
  +    int rc = -1;
  +
  +SPEW((stderr, "--> %s(%p)\n", __FUNCTION__, mqtt));
  +#ifdef       WITH_MQTT
  +    if (!MQTTAsync_isConnected(mqtt->I)) {
        mqtt->finished = 0;
        rc = check(mqtt, "connect",
  -             MQTTAsync_connect(mqtt->C, Copts));
  -
  +             MQTTAsync_connect(mqtt->I, AOBJ(mqtt, 'C')));
        while (!mqtt->finished)
            usleep(1000);
  -
  -    }
  +    } else
  +     rc = 0;
   #endif       /* WITH_MQTT */
   SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, mqtt, rc));
       return rc;
  @@ -801,64 +906,37 @@
   {
       int rc = -1;
   #ifdef       WITH_MQTT
  -    if (MQTTAsync_isConnected(mqtt->C)) {
  -     MQTTAsync_disconnectOptions *Dopts = &mqtt->Dopts;
  -     memcpy(Dopts, &_Dopts, sizeof(*Dopts));
  -#ifdef       REF
  -     memcpy(Dopts->struct_id, "MQTD", 4);
  -     Dopts->struct_version = 0;
  -#endif
  -     Dopts->timeout = mqtt->timeout;
  -     Dopts->onSuccess = onDisconnect;
  -     Dopts->onFailure = onDisconnectFailure;
  -     Dopts->context = mqtt;
  -
  +    if (MQTTAsync_isConnected(mqtt->I)) {
        mqtt->finished = 0;
        rc = check(mqtt, "disconnect",
  -             MQTTAsync_disconnect(mqtt->C, Dopts));
  +             MQTTAsync_disconnect(mqtt->I, AOBJ(mqtt, 'D')));
        while (!mqtt->finished)
            usleep(100);
  -    }
  +    } else
  +     rc = 0;
   #endif
   SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, mqtt, rc));
       return rc;
   }
   
  -int rpmmqttSendmsg(rpmmqtt mqtt, const char * s, size_t ns)
  +int rpmmqttSendMessage(rpmmqtt mqtt, const char * s, size_t ns)
   {
       int rc = -1;
   
       if (ns == 0) ns = strlen(s);
   
   #ifdef       WITH_MQTT
  -    MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  -#ifdef       REF
  -    memcpy(pubmsg.struct_id, "MQTM", 4);
  -    pubmsg.struct_version = 0;
  -#endif
  -    pubmsg.payloadlen = ns;
  -    pubmsg.payload = (char *) s;
  -    pubmsg.qos = mqtt->qos;
  -    pubmsg.retained = (MF_ISSET(RETAIN) ? 1 : 0);
  -#ifdef       REF
  -    pubmsg.dup = 0;
  -    pubmsg.msgid = 0;
  -#endif
  -
  -    MQTTAsync_responseOptions *Ropts = &mqtt->Ropts;
  -    memcpy(Ropts, &_Ropts, sizeof(*Ropts));
  -#ifdef       REF
  -    memcpy(Ropts->struct_id, "MQTR", 4);
  -    Ropts->struct_version = 0;
  -#endif
  -    Ropts->onSuccess = onSend;
  -    Ropts->onFailure = onSendFailure;
  -    Ropts->context = mqtt;
  -    Ropts->token = 0;
  +    MQTTAsync_message *M = AOBJ(mqtt, 'M');
  +    M->payloadlen = ns;
  +    M->payload = (char *) s;
  +
  +    MQTTAsync_responseOptions *R = AOBJ(mqtt, 'R');
  +    R->onSuccess = onSend;
  +    R->onFailure = onSendFailure;
   
       mqtt->finished = 0;
       rc = check(mqtt, "sendMessage",
  -             MQTTAsync_sendMessage(mqtt->C, mqtt->topic, &pubmsg, Ropts));
  +             MQTTAsync_sendMessage(mqtt->I, mqtt->topic, M, R));
       while (!mqtt->finished)
        usleep(100);
   #endif       /* WITH_MQTT */
  @@ -891,19 +969,13 @@
        rpmlog(_lvl, "%19s: %s qos(%u)\n", "subtopic", t, subqos[i]);
       }
   
  -    MQTTAsync_responseOptions *Ropts = &mqtt->Ropts;
  -    memcpy(Ropts, &_Ropts, sizeof(*Ropts));
  -#ifdef  REF
  -    memcpy(Ropts->struct_id, "MQTR", 4);
  -    Ropts->struct_version = 0;
  -#endif
  -    Ropts->onSuccess = onSubscribeMany;
  -    Ropts->onFailure = onSubscribeFailure;
  -    Ropts->context = mqtt;
  +    MQTTAsync_responseOptions *R = AOBJ(mqtt, 'R');
  +    R->onSuccess = onSubscribeMany;
  +    R->onFailure = onSubscribeFailure;       /* XXX */
   
       rc = check(mqtt, "subscribeMany",
  -             MQTTAsync_subscribeMany(&mqtt->C,
  -                     ac, av, subqos, Ropts));
  +             MQTTAsync_subscribeMany(&mqtt->I,
  +                     ac, av, subqos, R));
       subqos = _free(subqos);
   #endif       /* WITH_MQTT */
   
  @@ -913,35 +985,44 @@
   }
   
   /*==============================================================*/
  -ssize_t rpmmqttWrite(rpmmqtt mqtt, const char *s, size_t ns)
  +int rpmmqttPub(rpmmqtt mqtt, const char *s, size_t ns)
   {
  -    ssize_t ret = -1;        /* assume failure */
  +    int ret = -1;    /* assume failure */
  +
  +    if (ns == 0) ns = strlen(s);
   
   #ifdef       WITH_MQTT
  -    if (rpmmqttConnect(mqtt) == 0) {
  -     static char _mqtt_prefix[] = "%{?_mqtt_prefix}";
  -     /* XXX extra space */
  +    if (rpmmqttConnect(mqtt))
  +     goto exit;
  +    {        static char _mqtt_prefix[] = "%{?_mqtt_prefix}";
  +     /* XXX extra space: prepend in *sendMessage or *send  */
        char * t = rpmExpand(_mqtt_prefix, " ", s, NULL);
        size_t nt = strlen(t);
   
  -     if (!rpmmqttSendmsg(mqtt, t, nt))
  +     if (!rpmmqttSendMessage(mqtt, t, nt))
            ret = nt;
   
        t = _free(t);
       }
  +
  +exit:
   #endif       /* WITH_MQTT */
   
   SPEW((stderr, "<-- %s(%p,%p[%u]) ret %d\n", __FUNCTION__, mqtt, s, (unsigned)ns, (int)ret));
       return ret;
   }
   
  -ssize_t rpmmqttRead(rpmmqtt mqtt, const char *s, size_t ns)
  +int rpmmqttSub(rpmmqtt mqtt, const char *s, size_t ns)
   {
  -    ssize_t ret = -1;        /* assume failure */
  +    int ret = -1;    /* assume failure */
  +
  +    if (ns == 0) ns = strlen(s);
   
   #ifdef       WITH_MQTT
  -    if (rpmmqttConnect(mqtt) == 0) {
  -     char * subtopic = rpmExpand((s ? s : mqtt->topic), NULL);
  +    if (rpmmqttConnect(mqtt))
  +     goto exit;
  +
  +    {        char * subtopic = rpmExpand((s ? s : mqtt->topic), NULL);
        unsigned subqos = mqtt->qos;
        char *t, *te;
        int _lvl = RPMLOG_DEBUG;
  @@ -969,19 +1050,13 @@
   
        rpmlog(_lvl, "%19s: %s qos(%u)\n", "subtopic", subtopic, subqos);
   
  -     MQTTAsync_responseOptions *Ropts = &mqtt->Ropts;
  -     memcpy(Ropts, &_Ropts, sizeof(*Ropts));
  -#ifdef       REF
  -     memcpy(Ropts->struct_id, "MQTR", 4);
  -     Ropts->struct_version = 0;
  -#endif
  -     Ropts->onSuccess = onSubscribe;
  -     Ropts->onFailure = onSubscribeFailure;
  -     Ropts->context = mqtt;
  +     MQTTAsync_responseOptions *R = AOBJ(mqtt, 'R');
  +     R->onSuccess = onSubscribe;
  +     R->onFailure = onSubscribeFailure;
   
        mqtt->finished = 0;
        rc = check(mqtt, "subscribe",
  -             MQTTAsync_subscribe(mqtt->C, subtopic, subqos, Ropts));
  +             MQTTAsync_subscribe(mqtt->I, subtopic, subqos, R));
        while (rc == 0 && !mqtt->finished)
            usleep(100);
   
  @@ -990,6 +1065,8 @@
        subtopic = _free(subtopic);
   
       }
  +
  +exit:
   #endif       /* WITH_MQTT */
   
   SPEW((stderr, "<-- %s(%p,%p[%u]) ret %d\n", __FUNCTION__, mqtt, s, (unsigned)ns, (int)ret));
  @@ -1009,6 +1086,8 @@
        N_("(sub) Do not print NL after messages."), NULL },
      { NULL, 'R', POPT_BIT_SET,                &mqtt->flags, MQTT_FLAGS_NOSTALE,
        N_("(sub) Do not print stale messages."), NULL },
  +   { "filter-out", 'T', POPT_ARG_ARGV,       &mqtt->_filter_out, 0,
  +     N_("(sub) Filter out topics."), NULL },
   
      { NULL, 'A', POPT_ARG_STRING,             &mqtt->_address, 0,
        N_("Connect from local <ADDR>."), N_("<ADDR>") },
  @@ -1022,7 +1101,7 @@
        N_("Connect to <HOST>."), N_("<HOST>") },
      { "id", 'i', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&mqtt->_clientid, 0,
        N_("MQTT client <ID>."), N_("<ID>") },
  -   { "prefix", 'I', POPT_ARG_STRING, &mqtt->idprefix, 0,
  +   { "id-prefix", 'I', POPT_ARG_STRING,      &mqtt->idprefix, 0,
        N_("Use <IDPREFIX>-%{pid} for clientid."), N_("<IDPREFIX>") },
      { "keepalive", 'k', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT, &mqtt->keepalive, 0,
        N_("Keep alive in <SECS>."), N_("<SECS>") },
  @@ -1038,7 +1117,7 @@
        N_("Write input mssages to <FILE>."), N_("<FILE>") },
      { "port", 'p', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT,    &mqtt->port, 0,
        N_("Connect to network <PORT>."), N_("<PORT>") },
  -   { "pass", 'P', POPT_ARG_STRING,   &mqtt->password, 0,
  +   { "pw", 'P', POPT_ARG_STRING,     &mqtt->password, 0,
        N_("Remote user <PASSWORD>."), N_("<PASSWORD>") },
      { "qos", 'q', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT,     &mqtt->qos, 0,
        N_("MQTT <QOS> level."), N_("<QOS>") },
  @@ -1152,7 +1231,7 @@
   static const char _mqtt_trace[] = "%{?_mqtt_trace}%{!?_mqtt_trace:0}";
       mqtt->trace = rpmExpandNumeric(_mqtt_trace);
   
  -#ifdef       NOTYET
  +#ifdef       DYING
   static const char _mqtt_topic[] =
        "%{?_mqtt_topic}%{!?_mqtt_topic:rpm/%{pid}/mqtt}";
   mqtt->topic = _free(mqtt->topic);
  @@ -1163,6 +1242,8 @@
       mqtt->clientid = rpmExpand(_mqtt_clientid, NULL);
   #endif
   
  +     /* XXX load mqtt->_topics from _mqtt_subtopics */
  +
       return rc;
   }
   
  @@ -1270,6 +1351,7 @@
        mqtt->port = strtoul(u->portstr, NULL, 0);
        }
   
  +mqtt->uri = _free(mqtt->uri);
       mqtt->uri = rpmExpand("tcp://", u->host, ":", u->portstr, NULL);
   
       {        const char * topic;
  @@ -1300,14 +1382,17 @@
       mqtt->flags = (flags ? flags : _flags);
   
       /* -- Initialize oddball values. */
  -#ifdef       DYING
  -    mqtt->protocol_version = rpmExpand("31", NULL);
  -#else
  +     /* XXX FIXME: needed for --help POT_ARGFLAG_DEFAULT spewage only */
  +mqtt->protocol_version = _free(mqtt->protocol_version);
       mqtt->protocol_version = rpmExpand("auto", NULL);
  +mqtt->will_message = _free(mqtt->will_message);
       mqtt->will_message = rpmExpand("", NULL);
  -#endif
  +mqtt->_tls_version = _free(mqtt->_tls_version);
       mqtt->_tls_version = rpmExpand("1.2", NULL);
  +     /* XXX FIXME:  __progname? */
  +mqtt->idprefix = _free(mqtt->idprefix);
       mqtt->idprefix = rpmExpand("rpm", NULL);
  +mqtt->_clientid = _free(mqtt->_clientid);
       mqtt->_clientid = rpmExpand(mqtt->idprefix, "-%%{pid}", NULL);
   
       /* -- Initialize values frpm default URI. */
  @@ -1351,24 +1436,24 @@
       if (mqtt->clientid == NULL) {
   assert(mqtt->_clientid);
        mqtt->clientid = rpmExpand(mqtt->_clientid, NULL);
  -fprintf(stderr, "*** %s: clientid %s\n", __FUNCTION__, mqtt->clientid);
       }
       if (mqtt->topic == NULL) {
   assert(mqtt->idprefix);
        mqtt->topic = rpmExpand(mqtt->idprefix, "/mqtt", NULL);
  -fprintf(stderr, "*** %s: topic %s\n", __FUNCTION__, mqtt->topic);
       }
        
  +#ifdef       DYING
       /* -- XXX Add default argv (if not specified) */
       if (mqtt->ac == 0) {
   static const char *_av[] = {
  -    "mqtt://luser:jasnl@localhost:1883/rpm/mqtt?trace=4",
  +    "mqtt://luser:jasnl@localhost:1883/rpm/mqtt",
       "rpm/#",
       NULL,
   };
        (void) argvAppend((ARGV_t *)&mqtt->av, _av);
        mqtt->ac = argvCount((ARGV_t)mqtt->av);
       }
  +#endif
   
       rc = RPMRC_OK;
   
  @@ -1385,7 +1470,7 @@
   #ifdef       WITH_MQTT
       (void) rpmmqttDisconnect(mqtt);
       (void) check(mqtt, "destroy",
  -             (MQTTAsync_destroy(&mqtt->C), 0));
  +             (MQTTAsync_destroy(&mqtt->I), 0));
   #endif       /* WITH_MQTT */
   
   /* ========== */
  @@ -1396,6 +1481,7 @@
       mqtt->msgs = argvFree((ARGV_t)mqtt->msgs);
       mqtt->password = _free(mqtt->password);
       mqtt->_topics = argvFree((ARGV_t)mqtt->_topics);
  +    mqtt->_filter_out = argvFree((ARGV_t)mqtt->_filter_out);
       mqtt->user = _free(mqtt->user);
       mqtt->protocol_version = _free(mqtt->protocol_version); /* XXX malloc? */
   
  @@ -1412,13 +1498,13 @@
       mqtt->_proxy = _free(mqtt->_proxy);
   /* ========== */
   
  -    mqtt->C = NULL;
  +    mqtt->I = NULL;
       mqtt->uri = _free(mqtt->uri);
       mqtt->topic = _free(mqtt->topic);
       mqtt->clientid = _free(mqtt->clientid);
       mqtt->persist_path = _free(mqtt->persist_path);
       mqtt->persist_ctx = _free(mqtt->persist_ctx);
  -    mqtt->dn = _free(mqtt->dn);
  +    mqtt->cachedn = _free(mqtt->cachedn);
   
       mqtt->ifn = argvFree(mqtt->ifn);
       if (mqtt->ifd)
  @@ -1436,15 +1522,21 @@
       mqtt->flags = 0;
   }
   
  -RPMIOPOOL_MODULE(mqtt)
  +RPMIOPOOL_INTERP_MODULE(mqtt)
   
   rpmmqtt rpmmqttNew(char ** av, uint32_t flags)
   {
  -    rpmmqtt mqtt = rpmmqttGetPool(_rpmmqttPool);
  -    int ac = argvCount((ARGV_t)av);
  +    static const char * _av[] = { (char *) "rpmmqtt", NULL };
  +    rpmmqtt mqtt = (flags & 0x80000000)
  +     ? rpmmqttI() : rpmmqttGetPool(_rpmmqttPool);
   
   SPEW((stderr, "--> %s(%p,0x%x)\n", __FUNCTION__, av, flags));
   
  +    /* XXX quick-n-dirty recursion avoidance. */
  +    if (av == NULL) av = (char **) _av;
  +
  +    int ac = argvCount((ARGV_t)av);
  +    flags &= ~0x80000000;
       rpmRC rc = rpmmqttInit(mqtt, ac, (const char **)av, flags);
       (void)rc;
   
  @@ -1473,15 +1565,16 @@
            oneshot++;
        }
   
  +     /* XXX improve integration */
   static const char _mqtt_persist[] =
  -     "%{?_mqtt_persist}%{!?_mqtt_persist:0}";
  +     "%{?_mqtt_persist}%{!?_mqtt_persist:2}";
        mqtt->persist_type = (rpmExpandNumeric(_mqtt_persist) % 3);
   static const char _mqtt_cachedir[] =
        "%{?_mqtt_cachedir}%{!?_mqtt_cachedir:/var/cache/mqtt}";
   char *persist_path = rpmGetPath(_mqtt_cachedir, NULL);
   
        rpmlog(_lvl, "%19s: %s\n", "clientid", mqtt->clientid);
  -     rpmlog(_lvl, "%19s: %s qos(%d) timeout(%umsecs)\n",
  +     rpmlog(_lvl, "%19s: %s qos(%d) timeout(%u msecs)\n",
                "topic", mqtt->topic, mqtt->qos, mqtt->timeout);
        rpmlog(_lvl, "%19s: type(%u) %s\n",
                "persist", mqtt->persist_type,
  @@ -1489,18 +1582,22 @@
        rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace);
   
   
  +mqtt->persist_path = _free(mqtt->persist_path);
  +mqtt->persist_ctx = _free(mqtt->persist_ctx);
        switch (mqtt->persist_type) {
        default:
        case MQTTCLIENT_PERSISTENCE_NONE:
            mqtt->persist_ctx = NULL;
            break;
        case MQTTCLIENT_PERSISTENCE_DEFAULT:
  -       { mqtt->persist_path = xstrdup(persist_path);
  +       {
  +         mqtt->persist_path = xstrdup(persist_path);
            /* XXX rpmmqttFini double free */
            mqtt->persist_ctx = (void *)xstrdup(mqtt->persist_path);
          } break;
        case MQTTCLIENT_PERSISTENCE_USER:
  -       { mqtt->persist_path = xstrdup(persist_path);
  +       {
  +         mqtt->persist_path = xstrdup(persist_path);
            MQTTClient_persistence * ctx = xmalloc(sizeof(*ctx));
            *ctx = _rpmmqtt_persistence;        /* structure assignment */
            ctx->context = mqtt;
  @@ -1510,8 +1607,10 @@
   persist_path = _free(persist_path);
   
        /* Prepare for subscription delivery. */
  -     if (MF_ISSET(BUFFER))
  +     if (MF_ISSET(BUFFER) && mqtt->iob == NULL)
            mqtt->iob = rpmiobNew(0);
  +     if (mqtt->ofd)                  /* XXX memleaks on recursion */
  +         (void) Fclose(mqtt->ofd);
        switch (mqtt->msg_output) {
        default:
        case MQTT_OUTPUT_UNKNOWN:
  @@ -1529,25 +1628,29 @@
            break;
        }
   
  +mqtt->u = NULL;
   dumpMQTT(__FUNCTION__, mqtt);
   
  -     xx = check(mqtt, "create",
  -             MQTTAsync_create(&mqtt->C, mqtt->uri, mqtt->clientid,
  -             mqtt->persist_type, mqtt->persist_ctx));
  +      if (mqtt->I == NULL) {
  +     xx = check(mqtt, "createWithOptions",
  +             MQTTAsync_createWithOptions(&mqtt->I,
  +                     mqtt->uri, mqtt->clientid,
  +                     mqtt->persist_type, mqtt->persist_ctx, AOBJ(mqtt, 'O')));
   
        xx = check(mqtt, "setCallbacks",
  -             MQTTAsync_setCallbacks(mqtt->C, mqtt,
  +             MQTTAsync_setCallbacks(mqtt->I, mqtt,
                        onConnectionLost,
                        onMessageArrived,
                        onDeliveryComplete));
   
        /* Subscribe to channels (if any). */
  +     /* XXX rework using mqtt->_topics instead */
        if (mqtt->ac > 1) {
   #ifdef       NOTYET  /* XXX segfault here. */
            xx = rpmmqttSubscribeMany(mqtt, mqtt->ac-1, mqtt->av+1);
   #else
            for (int i = 1; i < mqtt->ac; i++)
  -             xx = rpmmqttRead(mqtt, mqtt->av[i], 0);
  +             xx = rpmmqttSub(mqtt, mqtt->av[i], 0);
   #endif
        }
   
  @@ -1560,7 +1663,7 @@
            if (mqtt->msgs) {
                int nmsgs = argvCount((ARGV_t)mqtt->msgs);
                for (int i = 0; i < nmsgs; i++) {
  -                 xx = rpmmqttWrite(mqtt, mqtt->msgs[i], 0);
  +                 xx = rpmmqttPub(mqtt, mqtt->msgs[i], 0);
                }
            }
            break;
  @@ -1572,7 +1675,7 @@
                    const char * ifn = mqtt->ifn[i];
                    rpmiob iob = NULL;
                    if (rpmiobSlurp(ifn, &iob) == 0) {
  -                     xx = rpmmqttWrite(mqtt,
  +                     xx = rpmmqttPub(mqtt,
                                rpmiobStr(iob), rpmiobLen(iob));
                    }
                    iob = rpmiobFree(iob);
  @@ -1595,7 +1698,7 @@
                            const char * s = lav[j];
                            size_t ns = strlen(s);
                            if (ns > 0) /* XXX skip empty lines? */
  -                             xx = rpmmqttWrite(mqtt, s, ns);
  +                             xx = rpmmqttPub(mqtt, s, ns);
                        }
                        lav = argvFree(lav);
                    }
  @@ -1604,8 +1707,37 @@
            }
            break;
        }
  +      }
       }
   #endif       /* WITH_MQTT */
   
       return rpmmqttLink(mqtt);
   }
  +
  +rpmRC rpmmqttRun(rpmmqtt mqtt, const char * str, const char ** resultp)
  +{
  +    const char *msg = NULL;
  +    rpmRC rc = RPMRC_FAIL;
  +
  +SPEW((stderr, "==> %s(%p,\"%s\",%p)\n", __FUNCTION__, mqtt, str, resultp));
  +
  +    if (mqtt == NULL) mqtt = rpmmqttI();
  +
  +    msg = rpmExpand("%now", " ", str, NULL);
  +#if defined(WITH_MQTT)
  +    if (rpmmqttConnect(mqtt))
  +     goto exit;
  +    if (msg != NULL && !rpmmqttSendMessage(mqtt, msg, strlen(msg))) {
  +     rc = RPMRC_OK;
  +     if (resultp) {
  +         *resultp = (mqtt->iob ? rpmiobStr(mqtt->iob) : "");
  +     }
  +    }
  +
  +exit:
  +#endif
  +
  +    msg = _free(msg);
  +SPEW((stderr, "<== %s(%p,\"%s\",%p) rc %d\n", __FUNCTION__, mqtt, str, resultp, rc));
  +    return rc;
  +}
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmqtt.h
  ============================================================================
  $ cvs diff -u -r1.1.2.11 -r1.1.2.12 rpmmqtt.h
  --- rpm/rpmio/rpmmqtt.h       3 Jul 2016 08:41:57 -0000       1.1.2.11
  +++ rpm/rpmio/rpmmqtt.h       4 Jul 2016 07:45:23 -0000       1.1.2.12
  @@ -68,6 +68,7 @@
       int port;                                        /*!< -p */
       const char *password;                    /*!< -P */
       const char **_topics;                    /*!< -t */
  +    const char **_filter_out;                        /*!< -T */
       const char *user;                                /*!< -u */
       const char *protocol_version;            /*!< -V */
   
  @@ -85,12 +86,13 @@
       const char *_psk_key;                    /*!< --psk */
       const char *_psk_identity;                       /*!< --psk-identity */
       const char *_proxy;                              /*!< --proxy */
  +
   /* ========== */
   
       urltype ut;                      /* "tcp://localhost:1833" */
       urlinfo u;
   
  -    void * C;                        /* MQTTClient */
  +    void * I;                        /* MQTTClient */
   
       const char * uri;
       const char * topic;
  @@ -116,7 +118,7 @@
       int MQTTVersion;
       int sessionPresent;
   
  -    const char *dn;
  +    const char *cachedn;
   
       FD_t ifd;
       const char **ifn;                                /*!< -f */
  @@ -124,11 +126,13 @@
       const char *ofn;                         /*!< -o */
       rpmiob iob;
   
  -    MQTTAsync_connectOptions Copts;
  -    MQTTAsync_willOptions Wopts;
  -    MQTTAsync_SSLOptions Sopts;
  -    MQTTAsync_disconnectOptions Dopts;
  -    MQTTAsync_responseOptions Ropts;
  +    MQTTAsync_connectOptions C;
  +    MQTTAsync_willOptions    W;
  +    MQTTAsync_SSLOptions     S;
  +    MQTTAsync_disconnectOptions      D;
  +    MQTTAsync_responseOptions        R;
  +    MQTTAsync_createOptions  O;
  +    MQTTAsync_message                M;
   };
   
   #endif       /* _RPMMQTT_INTERNAL */
  @@ -176,9 +180,13 @@
   
   int rpmmqttDisconnect(rpmmqtt mqtt);
   
  -ssize_t rpmmqttWrite(rpmmqtt mqtt, const char *s, size_t ns);
  +int rpmmqttSendMessage(rpmmqtt mqtt, const char *s, size_t ns);
  +
  +int rpmmqttPub(rpmmqtt mqtt, const char *s, size_t ns);
  +
  +int rpmmqttSub(rpmmqtt mqtt, const char *s, size_t ns);
   
  -ssize_t rpmmqttRead(rpmmqtt mqtt, const char *s, size_t ns);
  +rpmRC rpmmqttRun(rpmmqtt mqtt, const char * str, const char ** resultp);
   
   #ifdef __cplusplus
   }
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/tmqtt.c
  ============================================================================
  $ cvs diff -u -r1.1.2.8 -r1.1.2.9 tmqtt.c
  --- rpm/rpmio/tmqtt.c 3 Jul 2016 08:41:57 -0000       1.1.2.8
  +++ rpm/rpmio/tmqtt.c 4 Jul 2016 07:45:23 -0000       1.1.2.9
  @@ -29,17 +29,17 @@
       int rc = 0;
   
   #ifdef       DYING
  -    (void) rpmmqttRead(mqtt, "rpm/#?qos=0", 0);
  -    (void) rpmmqttRead(mqtt, "$SYS/broker/version?qos=0", 0);
  +    (void) rpmmqttSub(mqtt, "rpm/#?qos=0", 0);
  +    (void) rpmmqttSub(mqtt, "$SYS/broker/version?qos=0", 0);
   #endif
   
  -    nw = rpmmqttWrite(mqtt, "bzzt ...", 0);
  -    nw = rpmmqttWrite(mqtt, "bzzT ...", 0);
  -    nw = rpmmqttWrite(mqtt, "bzZT ...", 0);
  -    nw = rpmmqttWrite(mqtt, "bZZT ...", 0);
  -    nw = rpmmqttWrite(mqtt, "BZZT ...", 0);
  +    nw = rpmmqttPub(mqtt, "bzzt ...", 0);
  +    nw = rpmmqttPub(mqtt, "bzzT ...", 0);
  +    nw = rpmmqttPub(mqtt, "bzZT ...", 0);
  +    nw = rpmmqttPub(mqtt, "bZZT ...", 0);
  +    nw = rpmmqttPub(mqtt, "BZZT ...", 0);
       (void) rpmmqttDisconnect(mqtt);
  -    nw = rpmmqttWrite(mqtt, "SWAT !!!", 0);
  +    nw = rpmmqttPub(mqtt, "SWAT !!!", 0);
       (void) rpmmqttDisconnect(mqtt);
       (void) rpmmqttConnect(mqtt);
   
  @@ -73,9 +73,14 @@
       int rc = -1;
   
       /* Initialize the _mqtt_ macro context */
  -(void) rpmDefineMacro(NULL, "_mqtt_trace 4", 0);
   
  +(void) rpmDefineMacro(NULL, "_mqtt_persist 2", 0);
   (void) rpmDefineMacro(NULL, "_mqtt_cachedir /var/cache/mqtt", 0);
  +
  +(void) rpmDefineMacro(NULL, "_mqtt_prefix %{now} %{nil}", 0);
  +
  +#ifdef       DYING
  +(void) rpmDefineMacro(NULL, "_mqtt_trace 4", 0);
   (void) rpmDefineMacro(NULL, "_mqtt_user luser", 0);
   (void) rpmDefineMacro(NULL, "_mqtt_pass jasnl", 0);
   (void) rpmDefineMacro(NULL, "_mqtt_host localhost", 0);
  @@ -83,9 +88,9 @@
   (void) rpmDefineMacro(NULL, "_mqtt_clientid rpm-%{pid}", 0);
   (void) rpmDefineMacro(NULL, "_mqtt_topic rpm/#", 0);
   (void) rpmDefineMacro(NULL, "_mqtt_qos 1", 0);
  -(void) rpmDefineMacro(NULL, "_mqtt_persist 2", 0);
   (void) rpmDefineMacro(NULL, "_mqtt_timeout 10000", 0);
   (void) rpmDefineMacro(NULL, "_mqtt_prefix %{now} rpm pid %{pid} on cpu%{cpu} %{user}:%{group} ", 0);
  +#endif
   
       mqtt = rpmmqttNew(argv, 0);
       rc = _DoMQTT(mqtt);
  @@ .
______________________________________________________________________
RPM Package Manager                                    http://rpm5.org
CVS Sources Repository                                rpm-cvs@rpm5.org

Reply via email to