RPM Package Manager, CVS Repository
  http://rpm5.org/cvs/
  ____________________________________________________________________________

  Server: rpm5.org                         Name:   Jeff Johnson
  Root:   /v/rpm/cvs                       Email:  j...@rpm5.org
  Module: rpm                              Date:   03-Jul-2016 10:41:57
  Branch: rpm-5_4                          Handle: 2016070308415700

  Modified files:           (Branch: rpm-5_4)
    rpm/rpmio               rpmmqtt.c rpmmqtt.h tmqtt.c

  Log:
    - mqtt: attach input/output modes.

  Summary:
    Revision    Changes     Path
    1.1.2.12    +351 -107   rpm/rpmio/rpmmqtt.c
    1.1.2.11    +33 -7      rpm/rpmio/rpmmqtt.h
    1.1.2.8     +4  -0      rpm/rpmio/tmqtt.c
  ____________________________________________________________________________

  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmqtt.c
  ============================================================================
  $ cvs diff -u -r1.1.2.11 -r1.1.2.12 rpmmqtt.c
  --- rpm/rpmio/rpmmqtt.c       2 Jul 2016 14:32:59 -0000       1.1.2.11
  +++ rpm/rpmio/rpmmqtt.c       3 Jul 2016 08:41:57 -0000       1.1.2.12
  @@ -7,6 +7,8 @@
   #include <stdlib.h>
   #include <string.h>
   
  +#define      _RPMIOB_INTERNAL
  +#include <rpmiotypes.h>      /* for rpmiobSlurp */
   #include <rpmio.h>   /* for *Pool methods */
   #include <rpmlog.h>
   #include <rpmcb.h>
  @@ -74,17 +76,21 @@
                msg, mqtt);
       if (mqtt) {
   
  -     argvPrint("mqtt->av", (ARGV_t)mqtt->av, fp);
   #define      PRINT(_fmt, _foo) fprintf(fp, "%19s: %"#_fmt"\n", #_foo, 
mqtt->_foo)
  +     PRINT(x, flags);
  +     PRINT(d, msg_input);
  +     PRINT(d, msg_output);
  +     argvPrint("mqtt->av", (ARGV_t)mqtt->av, fp);
  +
        PRINT(s, _address);
  -     PRINT(s, _infile);
  -     PRINT(s, _message);
  +     argvPrint("mqtt->msgs", (ARGV_t)mqtt->msgs, fp);
   
        PRINT(s, cafile);
        PRINT(s, _capath);
        PRINT(s, cert);
        PRINT(s, privkey);
        PRINT(s, ciphers);
  +
        PRINT(s, _tls_version);
        PRINT(s, _psk_key);
        PRINT(s, _psk_identity);
  @@ -94,6 +100,7 @@
        PRINT(d, keepalive);
        PRINT(d, max_inflight);
        PRINT(s, protocol_version);
  +
        PRINT(s, will_message);
        PRINT(d, will_qos);
        PRINT(s, will_topic);
  @@ -136,6 +143,14 @@
        PRINT(d, sessionPresent);
   
        PRINT(s, dn);
  +
  +     if (mqtt->ifn)
  +         argvPrint("mqtt->ifn", (ARGV_t)mqtt->ifn, fp);
  +     PRINT(p, ifd);
  +     PRINT(s, ofn);
  +     PRINT(p, ofd);
  +     PRINT(p, iob);
  +
   #undef       PRINT
       }
   }
  @@ -215,10 +230,31 @@
       if (_rpmmqtt_debug < 0)
        rpmlog(RPMLOG_DEBUG, "MQTT rcvd topic(%s) \"%.*s\"\n", topic, ns, s);
   
  -    printf("%.*s:\t", topicLen, topic);
  -    for (size_t i = 0; i < ns; i++)
  -        putchar(s[i]); 
  -    putchar('\n');              
  +    if (mqtt->iob) {
  +     mqtt->iob = rpmiobAppend(mqtt->iob, topic, 0);
  +     mqtt->iob = rpmiobAppend(mqtt->iob, ":\t", 0);
  +     {   char * t = memcpy(alloca(ns+1), s, ns);
  +         t[ns] = '\0';
  +         mqtt->iob = rpmiobAppend(mqtt->iob, t, 1);
  +     }
  +    }
  +    switch (mqtt->msg_output) {
  +    case MQTT_OUTPUT_UNKNOWN:
  +    case MQTT_OUTPUT_CALLBACK:
  +    default:
  +     break;
  +    case MQTT_OUTPUT_STDOUT:
  +    case MQTT_OUTPUT_FILE:
  +     if (mqtt->ofd) {
  +         size_t nw;
  +         nw = Fwrite(topic, sizeof(*topic), topicLen, mqtt->ofd);
  +         nw = Fwrite(":\t", 1, sizeof(":\t")-1, mqtt->ofd);
  +         nw = Fwrite(s, sizeof(*s), ns, mqtt->ofd);
  +         nw = Fwrite("\n", 1, sizeof("\n")-1, mqtt->ofd);
  +         (void)nw;
  +     }
  +     break;
  +    }
   
       MQTTAsync_freeMessage(&message);
       MQTTAsync_free(topic);
  @@ -976,9 +1012,11 @@
   
      { NULL, 'A', POPT_ARG_STRING,             &mqtt->_address, 0,
        N_("Connect from local <ADDR>."), N_("<ADDR>") },
  +   { "buffer", 'b', POPT_BIT_SET,    &mqtt->flags, MQTT_FLAGS_BUFFER,
  +     N_("Collect input messages in buffer."), NULL },
      { "debug", 'd', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmmqtt_debug, -1,
        N_("Debug spewage."), NULL },
  -   { "file", 'f', POPT_ARG_STRING,   &mqtt->_infile, 0,
  +   { "file", 'f', POPT_ARG_ARGV,     &mqtt->ifn, 0,
        N_("Send <FILE> as message."), N_("<FILE>") },
      { "host", 'h', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&mqtt->host, 0,
        N_("Connect to <HOST>."), N_("<HOST>") },
  @@ -988,14 +1026,16 @@
        N_("Use <IDPREFIX>-%{pid} for clientid."), N_("<IDPREFIX>") },
      { "keepalive", 'k', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT, 
&mqtt->keepalive, 0,
        N_("Keep alive in <SECS>."), N_("<SECS>") },
  -   { NULL, 'l', POPT_BIT_SET,                &mqtt->flags, 
MQTT_FLAGS_STDIN_EACH,
  -     N_("Send messages line-by-line from stdin."), NULL },
  -   { "message", 'm', POPT_ARG_STRING,        &mqtt->_message, 0,
  +   { "lines", 'l', POPT_BIT_SET,     &mqtt->flags, MQTT_FLAGS_LINES,
  +     N_("Send messages line-by-line."), NULL },
  +   { "message", 'm', POPT_ARG_ARGV,  &mqtt->msgs, 0,
        N_("MQTT payload <MESSAGE> to send."), N_("<MESSAGE>") },
      { NULL, 'M', POPT_ARG_INT,                &mqtt->max_inflight, 0,
        N_("Permit <MAX> inflight messages."), N_("<MAX>") },
      { "null", 'n', POPT_BIT_SET,              &mqtt->flags, MQTT_FLAGS_EMPTY,
        N_("Send a null (zero length) message."), NULL },
  +   { "output", 'o', POPT_ARG_STRING, &mqtt->ofn, 0,
  +     N_("Write input mssages to <FILE>."), N_("<FILE>") },
      { "port", 'p', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT,    &mqtt->port, 0,
        N_("Connect to network <PORT>."), N_("<PORT>") },
      { "pass", 'P', POPT_ARG_STRING,   &mqtt->password, 0,
  @@ -1004,7 +1044,7 @@
        N_("MQTT <QOS> level."), N_("<QOS>") },
      { "retain", 'r', POPT_BIT_SET,            &mqtt->flags, MQTT_FLAGS_RETAIN,
        N_("Retain the message on the host."), NULL },
  -   { NULL, 's', POPT_BIT_SET,                &mqtt->flags, 
MQTT_FLAGS_STDIN_ALL,
  +   { NULL, 's', POPT_BIT_SET,                &mqtt->flags, MQTT_FLAGS_STDIN,
        N_("Send stdin lines as a single message."), NULL },
      { NULL, 'S', POPT_BIT_SET,                &mqtt->flags, MQTT_FLAGS_DNSSRV,
        N_("Use SRV record to find remote host."), NULL },
  @@ -1087,38 +1127,239 @@
       return rc;
   }
   
  +/* Set qos, timeout, max_inflight, keepalive and trace in <mqtt> from the
  + * %{_mqtt_*} macros; each expansion falls back to a literal default. */
  +static rpmRC rpmmqttInitMacros(rpmmqtt mqtt)
  +{
  +    rpmRC rc = RPMRC_OK;     /* never changed below: always RPMRC_OK */
  +     /* XXX MQTT_FLAGS_CLEAN */
  +     /* XXX MQTT_FLAGS_EOL */
  +     /* XXX MQTT_FLAGS_NOSTALE */
  +     /* XXX MQTT_FLAGS_LINES */
  +     /* XXX MQTT_FLAGS_STDIN */
  +     /* XXX MQTT_FLAGS_EMPTY */
  +     /* XXX MQTT_FLAGS_DNSSRV */
  +     /* XXX MQTT_FLAGS_INSECURE */
  +     /* XXX MQTT_FLAGS_RETAIN */
  +     /* XXX MQTT_FLAGS_WILL_RETAIN */
  +static const char _mqtt_qos[] = "%{?_mqtt_qos}%{!?_mqtt_qos:2}";
  +    mqtt->qos = (rpmExpandNumeric(_mqtt_qos) % 3);
  +static const char _mqtt_timeout[] = "%{?_mqtt_timeout}%{!?_mqtt_timeout:10000}";
  +    mqtt->timeout = rpmExpandNumeric(_mqtt_timeout);
  +static const char _mqtt_max_inflight[] = "%{?_mqtt_max_inflight}%{!?_mqtt_max_inflight:20}";
  +    mqtt->max_inflight = rpmExpandNumeric(_mqtt_max_inflight);
  +static const char _mqtt_keepalive[] = "%{?_mqtt_keepalive}%{!?_mqtt_keepalive:60}";
  +    mqtt->keepalive = rpmExpandNumeric(_mqtt_keepalive);
  +static const char _mqtt_trace[] = "%{?_mqtt_trace}%{!?_mqtt_trace:0}";
  +    mqtt->trace = rpmExpandNumeric(_mqtt_trace);
  +
  +#ifdef       NOTYET
  +static const char _mqtt_topic[] =
  +     "%{?_mqtt_topic}%{!?_mqtt_topic:rpm/%{pid}/mqtt}";
  +mqtt->topic = _free(mqtt->topic);
  +    mqtt->topic = rpmExpand(_mqtt_topic, NULL);
  +static const char _mqtt_clientid[] =
  +     "%{?_mqtt_clientid}%{!?_mqtt_clientid:rpm}";
  +mqtt->clientid = _free(mqtt->clientid);
  +    mqtt->clientid = rpmExpand(_mqtt_clientid, NULL);
  +#endif
  +    return rc;
  +}
  +
  +/* Apply the comma separated "key[=number]" items of a URI <query> to <mqtt>:
  + * clean, retain, qos, timeout, max_inflight, keepalive, trace. */
  +static rpmRC rpmmqttInitURIQuery(rpmmqtt mqtt, const char *query)
  +{
  +    ARGV_t av = NULL;
  +    int ac;
  +    const char *t, *te;      /* t: start of item, te: its '=' or its NUL */
  +    rpmRC rc = RPMRC_OK;
  +
  +    (void) argvSplit(&av, query, ",");
  +    ac = argvCount(av);
  +    for (int i = 0; i < ac; i++) {
  +     t = av[i];
  +     if ((te = strchr(t, '=')) == NULL)
  +         te = t + strlen(t);     /* bare key: end at the NUL, not NULL+n */
  +     if (!strncmp(t, "clean", (te - t))) {   /* NB: a key prefix matches too */
  +         mqtt->flags |= MQTT_FLAGS_CLEAN;
  +         continue;
  +     }
  +     /* XXX MQTT_FLAGS_EOL */
  +     /* XXX MQTT_FLAGS_NOSTALE */
  +     /* XXX MQTT_FLAGS_LINES */
  +     /* XXX MQTT_FLAGS_STDIN */
  +     /* XXX MQTT_FLAGS_EMPTY */
  +     /* XXX MQTT_FLAGS_DNSSRV */
  +     /* XXX MQTT_FLAGS_INSECURE */
  +     if (!strncmp(t, "retain", (te - t))) {
  +         mqtt->flags |= MQTT_FLAGS_RETAIN;
  +         continue;
  +     }
  +     /* XXX MQTT_FLAGS_WILL_RETAIN */
  +     if (!strncmp(t, "qos", (te - t))) {
  +         mqtt->qos = (te[0] == '='  && xisdigit(te[1]))
  +             ? strtoul(te+1, NULL, 0)
  +             : 2;            /* XXX default qos=2? */
  +         mqtt->qos %= 3;
  +         continue;
  +     }
  +     if (!strncmp(t, "timeout", (te - t))) {
  +         mqtt->timeout = (te[0] == '='  && xisdigit(te[1]))
  +             ? strtoul(te+1, NULL, 0)
  +             : 10000;        /* XXX default timeout=10000? */
  +         continue;
  +     }
  +     if (!strncmp(t, "max_inflight", (te - t))) {
  +         mqtt->max_inflight = (te[0] == '='  && xisdigit(te[1]))
  +             ? strtoul(te+1, NULL, 0)
  +             : 20;   /* XXX default max_inflight=20? */
  +         continue;
  +     }
  +     if (!strncmp(t, "keepalive", (te - t))) {
  +         mqtt->keepalive = (te[0] == '='  && xisdigit(te[1]))
  +             ? strtoul(te+1, NULL, 0)
  +             : 60;   /* XXX default keepalive=60? */
  +         continue;
  +     }
  +     /* XXX set global mqtt->trace from URI query string? */
  +     if (!strncmp(t, "trace", (te - t))) {
  +         mqtt->trace = (te[0] == '='  && xisdigit(te[1]))
  +             ? strtoul(te+1, NULL, 0)
  +             : 4;            /* XXX default trace=4? */
  +         continue;
  +     }
  +    }
  +    av = argvFree(av);
  +    return rc;
  +}
  +
  +/* Split <url> and copy its user, password, host, port and path (as topic)
  + * into <mqtt>, set mqtt->uri to "tcp://host:port", then apply any query. */
  +static rpmRC rpmmqttInitURI(rpmmqtt mqtt, const char *url)
  +{
  +    urlinfo u = NULL;
  +    rpmRC rc = RPMRC_OK;
  +
  +    /* -- Set MQTT options from the URI parameters (replacing old values). */
  +    mqtt->ut = urlSplit(url, &u);
  +    mqtt->u = u;
  +
  +    if (u->scheme == NULL
  +     || !strcmp(u->scheme, "mqtt")
  +     || !strcmp(u->scheme, "mqtts"))
  +    {
  +     if (u->portstr == NULL) {
  +         u->portstr = (u->scheme && !strcmp(u->scheme, "mqtts"))
  +             ? xstrdup("8883") : xstrdup("1883");  /* scheme may be NULL */
  +     }
  +    }
  +
  +    if (u->user != NULL) {
  +mqtt->user = _free(mqtt->user);
  +     mqtt->user = xstrdup(u->user);
  +    }
  +    if (u->password != NULL) {
  +mqtt->password = _free(mqtt->password);
  +     mqtt->password = xstrdup(u->password);
  +    }
  +    if (u->host != NULL) {
  +mqtt->host = _free(mqtt->host);
  +     mqtt->host = xstrdup(u->host);
  +    }
  +    if (u->portstr != NULL)
  +     mqtt->port = strtoul(u->portstr, NULL, 0);
  +
  +    mqtt->uri = rpmExpand("tcp://", u->host, ":", u->portstr, NULL);
  +
  +    {        const char * topic;
  +     (void) urlPath(u->url, &topic);
  +     if (topic && *topic == '/')
  +         topic++;
  +     if (topic && *topic) {
  +mqtt->topic = _free(mqtt->topic);
  +         mqtt->topic = rpmExpand(topic, NULL);
  +     }
  +    }
  +
  +    if (u->query) {
  +     rc = rpmmqttInitURIQuery(mqtt, u->query);
  +    }
  +
  +    return rc;
  +}
  +
   static rpmRC rpmmqttInit(rpmmqtt mqtt, int ac, const char ** av,
                mqttFlags flags)
   {
  -    static mqttFlags _flags = (MQTT_FLAGS_CLEAN|MQTT_FLAGS_EOL);
  -    urlinfo u = NULL;
  -    const char *s = NULL;
       rpmRC rc;
   
   SPEW((stderr, "--> %s(%p,%p[%d],0x%x)\n", __FUNCTION__, mqtt, av, ac, 
flags));
   
  -    /* -- Initialize the default values before popt processing. */
  +    static mqttFlags _flags = MQTT_FLAGS_DEFAULT;    /* CLEAN|EOL */
       mqtt->flags = (flags ? flags : _flags);
  -    mqtt->port = 1883;
  -    mqtt->max_inflight = 20;
  -    mqtt->keepalive = 60;
  -    mqtt->qos = 0;
  +
  +    /* -- Initialize oddball values. */
   #ifdef       DYING
       mqtt->protocol_version = rpmExpand("31", NULL);
   #else
       mqtt->protocol_version = rpmExpand("auto", NULL);
       mqtt->will_message = rpmExpand("", NULL);
   #endif
  -    mqtt->host = rpmExpand("localhost", NULL);
  +    mqtt->_tls_version = rpmExpand("1.2", NULL);
       mqtt->idprefix = rpmExpand("rpm", NULL);
       mqtt->_clientid = rpmExpand(mqtt->idprefix, "-%%{pid}", NULL);
  -    mqtt->_tls_version = rpmExpand("1.2", NULL);
   
  -    /* -- Process the options/arguments. */
  +    /* -- Initialize values from default URI. */
  +static const char _mqtt_default_uri[] =
  +     
"mqtt://localhost:1883/?qos=0,timeout=10000,max_inflight=20,keepalive=60";
  +    rc = rpmmqttInitURI(mqtt, _mqtt_default_uri);
  +    if (rc)
  +        goto exit;
  +
  +    /* -- Override with values from macros (if any). */
  +    rc = rpmmqttInitMacros(mqtt);
  +    if (rc)
  +        goto exit;
  +
  +    /* -- Process the explicit options/arguments. */
       rc = rpmmqttInitPopt(mqtt, ac, (char *const *)av);
       if (rc)
           goto exit;
   
  +    /* -- Set message input/output modes from flags/options. */
  +    if (MF_ISSET(STDIN))
  +     (void) argvAdd(&mqtt->ifn, "-");
  +
  +    if (mqtt->msgs)
  +     mqtt->msg_input = MQTT_INPUT_OPTION;
  +    else if (mqtt->ifn)
  +     mqtt->msg_input = MF_ISSET(LINES)
  +             ? MQTT_INPUT_FILES_LINES : MQTT_INPUT_FILES;
  +    else
  +     mqtt->msg_input = MQTT_INPUT_UNKNOWN;
  +
  +    if (mqtt->ofn) {
  +     mqtt->msg_output = strcmp(mqtt->ofn, "-")
  +             ? MQTT_OUTPUT_FILE : MQTT_OUTPUT_STDOUT;
  +    } else {
  +     mqtt->msg_output = MQTT_OUTPUT_STDOUT;
  +     mqtt->ofn = xstrdup("-");
  +    }
  +
  +    /* -- XXX Ensure topic/clientid are set. */
  +    if (mqtt->clientid == NULL) {
  +assert(mqtt->_clientid);
  +     mqtt->clientid = rpmExpand(mqtt->_clientid, NULL);
  +fprintf(stderr, "*** %s: clientid %s\n", __FUNCTION__, mqtt->clientid);
  +    }
  +    if (mqtt->topic == NULL) {
  +assert(mqtt->idprefix);
  +     mqtt->topic = rpmExpand(mqtt->idprefix, "/mqtt", NULL);
  +fprintf(stderr, "*** %s: topic %s\n", __FUNCTION__, mqtt->topic);
  +    }
  +     
  +    /* -- XXX Add default argv (if not specified) */
       if (mqtt->ac == 0) {
   static const char *_av[] = {
       "mqtt://luser:jasnl@localhost:1883/rpm/mqtt?trace=4",
  @@ -1129,85 +1370,6 @@
        mqtt->ac = argvCount((ARGV_t)mqtt->av);
       }
   
  -    /* -- Set unspecified options from the URI parameters. */
  -    /* XXX W2DO: which takes precedence, options or URI??? */
  -    mqtt->ut = urlSplit(mqtt->av[0], &u);
  -    mqtt->u = u;
  -
  -    if (u->scheme == NULL
  -     || !strcmp(u->scheme, "mqtt")
  -     || !strcmp(u->scheme, "mqtts"))
  -    {
  -     /* XXX propagate mqtt->port ??? */
  -     if (u->portstr == NULL) {
  -         u->portstr = !strcmp(u->scheme, "mqtts")
  -             ? xstrdup("8883") : xstrdup("1883");
  -     
  -     }
  -     u->scheme = _free(u->scheme);
  -     u->scheme = xstrdup("tcp");
  -    }
  -#ifdef       DYING
  -dumpU(__FUNCTION__, u);
  -#endif
  -
  -    if (mqtt->user == NULL && u->user != NULL)
  -     mqtt->user = xstrdup(u->user);
  -    if (mqtt->password == NULL && u->password != NULL)
  -     mqtt->password = xstrdup(u->password);
  -     /* XXX mqtt->host? */
  -    if (mqtt->port == 0 && u->port != 0)
  -     mqtt->port = u->port;
  -    mqtt->uri = rpmExpand(u->scheme, "://", u->host, ":", u->portstr, NULL);
  -
  -    (void) urlPath(u->url, &s);
  -static const char _mqtt_topic[] =
  -     "%{?_mqtt_topic}%{!?_mqtt_topic:rpm/%{pid}/mqtt}";
  -    if (s && *s == '/')
  -     s++;
  -    if (s == NULL || *s == '\0')
  -     s = _mqtt_topic;
  -    mqtt->topic = rpmExpand(s, NULL);
  -
  -static const char _mqtt_clientid[] =
  -     "%{?_mqtt_clientid}%{!?_mqtt_clientid:rpm}";
  -    mqtt->clientid = rpmExpand(_mqtt_clientid, NULL);
  -
  -    /* XXX CLI options clobbered */
  -static const char _mqtt_qos[] = "%{?_mqtt_qos}%{!?_mqtt_qos:1}";
  -    mqtt->qos = (rpmExpandNumeric(_mqtt_qos) % 3);
  -static const char _mqtt_timeout[] = 
"%{?_mqtt_timeout}%{!?_mqtt_timeout:10000}";
  -    mqtt->timeout = rpmExpandNumeric(_mqtt_timeout);
  -
  -    if (u->query) {
  -     ARGV_t qav = NULL;
  -     int qac;
  -     const char *t;
  -     const char *te;
  -
  -     (void) argvSplit(&qav, u->query, ",");
  -     qac = argvCount(qav);
  -     for (int i = 0; i < qac; i++) {
  -         t = qav[i];
  -         if ((te = strchr(t, '=')) == NULL)
  -             continue;
  -         if (!strncmp(t, "qos", (te - t)) && xisdigit(te[1])) {
  -             mqtt->qos = strtol(te+1, NULL, 0);
  -             mqtt->qos %= 3;
  -             continue;
  -         }
  -         if (!strncmp(t, "timeout", (te - t)) && xisdigit(te[1])) {
  -             mqtt->timeout = strtol(te+1, NULL, 0);
  -             continue;
  -         }
  -         if (!strncmp(t, "trace", (te - t)) && xisdigit(te[1])) {
  -             mqtt->trace = strtol(te+1, NULL, 0);
  -             continue;
  -         }
  -     }
  -     qav = argvFree(qav);
  -    }
  -
       rc = RPMRC_OK;
   
   exit:
  @@ -1228,11 +1390,10 @@
   
   /* ========== */
       mqtt->_address = _free(mqtt->_address);
  -    mqtt->_infile = _free(mqtt->_infile);
       mqtt->host = _free(mqtt->host);
       mqtt->_clientid = _free(mqtt->_clientid);
       mqtt->idprefix = _free(mqtt->idprefix);
  -    mqtt->_message = _free(mqtt->_message);
  +    mqtt->msgs = argvFree((ARGV_t)mqtt->msgs);
       mqtt->password = _free(mqtt->password);
       mqtt->_topics = argvFree((ARGV_t)mqtt->_topics);
       mqtt->user = _free(mqtt->user);
  @@ -1259,11 +1420,19 @@
       mqtt->persist_ctx = _free(mqtt->persist_ctx);
       mqtt->dn = _free(mqtt->dn);
   
  +    mqtt->ifn = argvFree(mqtt->ifn);
  +    if (mqtt->ifd)
  +     (void) Fclose(mqtt->ifd);
  +    mqtt->ifd = NULL;
  +    mqtt->ofn = _free(mqtt->ofn);
  +    if (mqtt->ofd)
  +     (void) Fclose(mqtt->ofd);
  +    mqtt->ofd = NULL;
  +    mqtt->iob = rpmiobFree(mqtt->iob);
  +
       mqtt->serverURI = _free(mqtt->serverURI);
   
  -    if (mqtt->av)
  -     (void) argvFree((ARGV_t)mqtt->av);
  -    mqtt->av = NULL;
  +    mqtt->av = (char **) argvFree((ARGV_t)mqtt->av);
       mqtt->flags = 0;
   }
   
  @@ -1340,6 +1509,26 @@
        }
   persist_path = _free(persist_path);
   
  +     /* Prepare for subscription delivery. */
  +     if (MF_ISSET(BUFFER))
  +         mqtt->iob = rpmiobNew(0);
  +     switch (mqtt->msg_output) {
  +     default:
  +     case MQTT_OUTPUT_UNKNOWN:
  +         break;
  +     case MQTT_OUTPUT_STDOUT:
  +     case MQTT_OUTPUT_FILE:
  +         mqtt->ofd = Fopen(mqtt->ofn, "a");  /* XXX always append? */
  +         if (mqtt->ofd == NULL || Ferror(mqtt->ofd)) {
  +             if (mqtt->ofd)
  +                 (void) Fclose(mqtt->ofd);
  +             mqtt->ofd = NULL;
  +         }
  +         break;
  +     case MQTT_OUTPUT_CALLBACK:
  +         break;
  +     }
  +
   dumpMQTT(__FUNCTION__, mqtt);
   
        xx = check(mqtt, "create",
  @@ -1352,6 +1541,7 @@
                        onMessageArrived,
                        onDeliveryComplete));
   
  +     /* Subscribe to channels (if any). */
        if (mqtt->ac > 1) {
   #ifdef       NOTYET  /* XXX segfault here. */
            xx = rpmmqttSubscribeMany(mqtt, mqtt->ac-1, mqtt->av+1);
  @@ -1360,6 +1550,60 @@
                xx = rpmmqttRead(mqtt, mqtt->av[i], 0);
   #endif
        }
  +
  +     /* Publish any initial messages. */
  +     switch (mqtt->msg_input) {
  +     case MQTT_INPUT_UNKNOWN:
  +     default:
  +         break;
  +     case MQTT_INPUT_OPTION:
  +         if (mqtt->msgs) {
  +             int nmsgs = argvCount((ARGV_t)mqtt->msgs);
  +             for (int i = 0; i < nmsgs; i++) {
  +                 xx = rpmmqttWrite(mqtt, mqtt->msgs[i], 0);
  +             }
  +         }
  +         break;
  +     case MQTT_INPUT_STDIN:
  +     case MQTT_INPUT_FILES:
  +         if (mqtt->ifn) {
  +             int nifn = argvCount(mqtt->ifn);
  +             for (int i = 0; i < nifn; i++) {
  +                 const char * ifn = mqtt->ifn[i];
  +                 rpmiob iob = NULL;
  +                 if (rpmiobSlurp(ifn, &iob) == 0) {
  +                     xx = rpmmqttWrite(mqtt,
  +                             rpmiobStr(iob), rpmiobLen(iob));
  +                 }
  +                 iob = rpmiobFree(iob);
  +             }
  +         }
  +         break;
  +     case MQTT_INPUT_STDIN_LINES:
  +     case MQTT_INPUT_FILES_LINES:
  +         if (mqtt->ifn) {
  +             int nifn = argvCount(mqtt->ifn);
  +             for (int i = 0; i < nifn; i++) {
  +                 const char * ifn = mqtt->ifn[i];
  +                 rpmiob iob = NULL;
  +                 if (rpmiobSlurp(ifn, &iob) == 0) {
  +                     ARGV_t lav = NULL;
  +                     int lac;
  +                     xx = argvSplit(&lav, rpmiobStr(iob), "\n\r");
  +                     lac = argvCount(lav);
  +                     for (int j = 0; j < lac; j++) {
  +                         const char * s = lav[j];
  +                         size_t ns = strlen(s);
  +                         if (ns > 0) /* XXX skip empty lines? */
  +                             xx = rpmmqttWrite(mqtt, s, ns);
  +                     }
  +                     lav = argvFree(lav);
  +                 }
  +                 iob = rpmiobFree(iob);
  +             }
  +         }
  +         break;
  +     }
       }
   #endif       /* WITH_MQTT */
   
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmqtt.h
  ============================================================================
  $ cvs diff -u -r1.1.2.10 -r1.1.2.11 rpmmqtt.h
  --- rpm/rpmio/rpmmqtt.h       2 Jul 2016 14:32:59 -0000       1.1.2.10
  +++ rpm/rpmio/rpmmqtt.h       3 Jul 2016 08:41:57 -0000       1.1.2.11
  @@ -18,19 +18,37 @@
       MQTT_FLAGS_EOL           = _MFB( 1),     /*!< (sub) -N */
       MQTT_FLAGS_NOSTALE               = _MFB( 2),     /*!< (sub) -R */
   
  -    MQTT_FLAGS_STDIN_EACH    = _MFB( 3),     /*!< -l */
  -    MQTT_FLAGS_EMPTY         = _MFB( 4),     /*!< -n */
  -    MQTT_FLAGS_STDIN_ALL     = _MFB( 5),     /*!< -s */
  +    MQTT_FLAGS_LINES         = _MFB( 3),     /*!< -l */
  +    MQTT_FLAGS_STDIN         = _MFB( 4),     /*!< -s */
  +    MQTT_FLAGS_EMPTY         = _MFB( 5),     /*!< -n */
       MQTT_FLAGS_DNSSRV                = _MFB( 6),     /*!< -S */
       MQTT_FLAGS_INSECURE              = _MFB( 7),     /*!< --insecure */
   
       MQTT_FLAGS_RETAIN                = _MFB( 8),     /*!< -r */
       MQTT_FLAGS_WILL_RETAIN   = _MFB( 9),     /*!< --will-retain */
  +    MQTT_FLAGS_BUFFER                = _MFB(10),     /*!< -b */
   
   } mqttFlags;
  +#define      MQTT_FLAGS_DEFAULT      (MQTT_FLAGS_CLEAN|MQTT_FLAGS_EOL)
   #undef       _MFB
   #undef       _KFB
   
  +typedef enum mqttInput_e {     /*!< where messages to publish come from */
  +    MQTT_INPUT_UNKNOWN               = 0,            /*!< no -m, -f or -s given */
  +    MQTT_INPUT_OPTION                = 1,            /*!< -m str: each -m value is one message */
  +    MQTT_INPUT_FILES_LINES   = 2,            /*!< -l -f fn: each non-empty line is one message */
  +    MQTT_INPUT_FILES         = 3,            /*!< -f fn: each file is one message */
  +    MQTT_INPUT_STDIN_LINES   = 4,            /*!< -l -f '-' (handled like FILES_LINES) */
  +    MQTT_INPUT_STDIN         = 5,            /*!< -s (handled like FILES) */
  +} mqttInput;
  +
  +typedef enum mqttOutput_e {    /*!< where received messages are written */
  +    MQTT_OUTPUT_UNKNOWN              = 0,            /*!< nothing opened or written */
  +    MQTT_OUTPUT_STDOUT               = (1 << 0),     /*!< no -o given, or -o '-' */
  +    MQTT_OUTPUT_FILE         = (1 << 1),     /*!< -o fn, opened in append mode */
  +    MQTT_OUTPUT_CALLBACK     = (1 << 2),     /*!< no file opened or written */
  +} mqttOutput;
  +
   #if defined(_RPMMQTT_INTERNAL)
   struct rpmmqtt_s {
       struct rpmioItem_s _item;        /*!< usage mutex and pool identifier. */
  @@ -38,13 +56,14 @@
       int ac;
   
       mqttFlags flags;
  +    mqttInput msg_input;
  +    mqttOutput msg_output;
   /* ========== */
       const char *_address;                    /*!< -A */
  -    const char *_infile;                     /*!< -f */
       const char *host;                                /*!< -h */
       const char *idprefix;                    /*!< -I */
       int keepalive;                           /*!< -k */
  -    const char *_message;                    /*!< -m */
  +    const char **msgs;                               /*!< -m */
       int max_inflight;                                /*!< -M */
       int port;                                        /*!< -p */
       const char *password;                    /*!< -P */
  @@ -55,19 +74,20 @@
       const char * will_message;                       /*!< --will-payload */
       int will_qos;                            /*!< --will-qos */
       const char * will_topic;                 /*!< --will-topic */
  +
       const char *cafile;                              /*!< --cafile */
       const char *_capath;                     /*!< --capath */
       const char *cert;                                /*!< --cert */
       const char *privkey;                     /*!< --key */
       const char *ciphers;                     /*!< --ciphers */
  +
       const char *_tls_version;                        /*!< --tls-version */
       const char *_psk_key;                    /*!< --psk */
       const char *_psk_identity;                       /*!< --psk-identity */
       const char *_proxy;                              /*!< --proxy */
  -
   /* ========== */
   
  -    urltype ut;
  +    urltype ut;                      /* "tcp://localhost:1883" */
       urlinfo u;
   
       void * C;                        /* MQTTClient */
  @@ -98,6 +118,12 @@
   
       const char *dn;
   
  +    FD_t ifd;
  +    const char **ifn;                                /*!< -f */
  +    FD_t ofd;
  +    const char *ofn;                         /*!< -o */
  +    rpmiob iob;
  +
       MQTTAsync_connectOptions Copts;
       MQTTAsync_willOptions Wopts;
       MQTTAsync_SSLOptions Sopts;
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/tmqtt.c
  ============================================================================
  $ cvs diff -u -r1.1.2.7 -r1.1.2.8 tmqtt.c
  --- rpm/rpmio/tmqtt.c 2 Jul 2016 14:32:59 -0000       1.1.2.7
  +++ rpm/rpmio/tmqtt.c 3 Jul 2016 08:41:57 -0000       1.1.2.8
  @@ -89,6 +89,10 @@
   
       mqtt = rpmmqttNew(argv, 0);
       rc = _DoMQTT(mqtt);
  +    if (mqtt->iob && rpmiobLen(mqtt->iob) > 0) {
  +     fprintf(stderr, "========== input msgs\n");
  +     fprintf(stderr, "%s", rpmiobStr(mqtt->iob));
  +    }
       mqtt = rpmmqttFree(mqtt);
   
       return rc;
  @@ .
______________________________________________________________________
RPM Package Manager                                    http://rpm5.org
CVS Sources Repository                                rpm-cvs@rpm5.org

Reply via email to