RPM Package Manager, CVS Repository http://rpm5.org/cvs/ ____________________________________________________________________________
Server: rpm5.org Name: Jeff Johnson Root: /v/rpm/cvs Email: j...@rpm5.org Module: rpm Date: 03-Jul-2016 10:41:57 Branch: rpm-5_4 Handle: 2016070308415700 Modified files: (Branch: rpm-5_4) rpm/rpmio rpmmqtt.c rpmmqtt.h tmqtt.c Log: - mqtt: attach input/output modes. Summary: Revision Changes Path 1.1.2.12 +351 -107 rpm/rpmio/rpmmqtt.c 1.1.2.11 +33 -7 rpm/rpmio/rpmmqtt.h 1.1.2.8 +4 -0 rpm/rpmio/tmqtt.c ____________________________________________________________________________ patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmqtt.c ============================================================================ $ cvs diff -u -r1.1.2.11 -r1.1.2.12 rpmmqtt.c --- rpm/rpmio/rpmmqtt.c 2 Jul 2016 14:32:59 -0000 1.1.2.11 +++ rpm/rpmio/rpmmqtt.c 3 Jul 2016 08:41:57 -0000 1.1.2.12 @@ -7,6 +7,8 @@ #include <stdlib.h> #include <string.h> +#define _RPMIOB_INTERNAL +#include <rpmiotypes.h> /* for rpmiobSlurp */ #include <rpmio.h> /* for *Pool methods */ #include <rpmlog.h> #include <rpmcb.h> @@ -74,17 +76,21 @@ msg, mqtt); if (mqtt) { - argvPrint("mqtt->av", (ARGV_t)mqtt->av, fp); #define PRINT(_fmt, _foo) fprintf(fp, "%19s: %"#_fmt"\n", #_foo, mqtt->_foo) + PRINT(x, flags); + PRINT(d, msg_input); + PRINT(d, msg_output); + argvPrint("mqtt->av", (ARGV_t)mqtt->av, fp); + PRINT(s, _address); - PRINT(s, _infile); - PRINT(s, _message); + argvPrint("mqtt->msgs", (ARGV_t)mqtt->msgs, fp); PRINT(s, cafile); PRINT(s, _capath); PRINT(s, cert); PRINT(s, privkey); PRINT(s, ciphers); + PRINT(s, _tls_version); PRINT(s, _psk_key); PRINT(s, _psk_identity); @@ -94,6 +100,7 @@ PRINT(d, keepalive); PRINT(d, max_inflight); PRINT(s, protocol_version); + PRINT(s, will_message); PRINT(d, will_qos); PRINT(s, will_topic); @@ -136,6 +143,14 @@ PRINT(d, sessionPresent); PRINT(s, dn); + + if (mqtt->ifn) + argvPrint("mqtt->ifn", (ARGV_t)mqtt->ifn, fp); + PRINT(p, ifd); + PRINT(s, ofn); + PRINT(p, ofd); + PRINT(p, iob); + #undef PRINT } } @@ -215,10 +230,31 @@ if (_rpmmqtt_debug < 0) rpmlog(RPMLOG_DEBUG, "MQTT rcvd topic(%s) \"%.*s\"\n", topic, ns, s); - printf("%.*s:\t", topicLen, topic); - for (size_t i = 0; i < ns; i++) - putchar(s[i]); - putchar('\n'); + if (mqtt->iob) { + mqtt->iob = rpmiobAppend(mqtt->iob, topic, 0); + mqtt->iob = rpmiobAppend(mqtt->iob, ":\t", 0); + { char * t = memcpy(alloca(ns+1), s, ns); + t[ns] = '\0'; + mqtt->iob = rpmiobAppend(mqtt->iob, t, 1); + } + } + switch (mqtt->msg_output) { + case MQTT_OUTPUT_UNKNOWN: + case MQTT_OUTPUT_CALLBACK: + default: + break; + case MQTT_OUTPUT_STDOUT: + case MQTT_OUTPUT_FILE: + if (mqtt->ofd) { + size_t nw; + nw = Fwrite(topic, sizeof(*topic), topicLen, mqtt->ofd); + nw = Fwrite(":\t", 1, sizeof(":\t")-1, mqtt->ofd); + nw = Fwrite(s, sizeof(*s), ns, mqtt->ofd); + nw = Fwrite("\n", 1, sizeof("\n")-1, mqtt->ofd); + (void)nw; + } + break; + } MQTTAsync_freeMessage(&message); MQTTAsync_free(topic); @@ -976,9 +1012,11 @@ { NULL, 'A', POPT_ARG_STRING, &mqtt->_address, 0, N_("Connect from local <ADDR>."), N_("<ADDR>") }, + { "buffer", 'b', POPT_BIT_SET, &mqtt->flags, MQTT_FLAGS_BUFFER, + N_("Collect input messages in buffer."), NULL }, { "debug", 'd', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmmqtt_debug, -1, N_("Debug spewage."), NULL }, - { "file", 'f', POPT_ARG_STRING, &mqtt->_infile, 0, + { "file", 'f', POPT_ARG_ARGV, &mqtt->ifn, 0, N_("Send <FILE> as message."), N_("<FILE>") }, { "host", 'h', POPT_ARG_STRING|POPT_ARGFLAG_SHOW_DEFAULT,&mqtt->host, 0, N_("Connect to <HOST>."), N_("<HOST>") }, @@ -988,14 +1026,16 @@ N_("Use <IDPREFIX>-%{pid} for clientid."), N_("<IDPREFIX>") }, { "keepalive", 'k', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT, &mqtt->keepalive, 0, N_("Keep alive in <SECS>."), N_("<SECS>") }, - { NULL, 'l', POPT_BIT_SET, &mqtt->flags, MQTT_FLAGS_STDIN_EACH, - N_("Send messages line-by-line from stdin."), NULL }, - { "message", 'm', POPT_ARG_STRING, &mqtt->_message, 0, + { "lines", 'l', POPT_BIT_SET, &mqtt->flags, MQTT_FLAGS_LINES, + N_("Send messages line-by-line."), NULL }, + { "message", 'm', POPT_ARG_ARGV, &mqtt->msgs, 0, N_("MQTT payload <MESSAGE> to send."), N_("<MESSAGE>") }, { NULL, 'M', POPT_ARG_INT, &mqtt->max_inflight, 0, N_("Permit <MAX> inflight messages."), N_("<MAX>") }, { "null", 'n', POPT_BIT_SET, &mqtt->flags, MQTT_FLAGS_EMPTY, N_("Send a null (zero length) message."), NULL }, + { "output", 'o', POPT_ARG_STRING, &mqtt->ofn, 0, + N_("Write input mssages to <FILE>."), N_("<FILE>") }, { "port", 'p', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT, &mqtt->port, 0, N_("Connect to network <PORT>."), N_("<PORT>") }, { "pass", 'P', POPT_ARG_STRING, &mqtt->password, 0, @@ -1004,7 +1044,7 @@ N_("MQTT <QOS> level."), N_("<QOS>") }, { "retain", 'r', POPT_BIT_SET, &mqtt->flags, MQTT_FLAGS_RETAIN, N_("Retain the message on the host."), NULL }, - { NULL, 's', POPT_BIT_SET, &mqtt->flags, MQTT_FLAGS_STDIN_ALL, + { NULL, 's', POPT_BIT_SET, &mqtt->flags, MQTT_FLAGS_STDIN, N_("Send stdin lines as a single message."), NULL }, { NULL, 'S', POPT_BIT_SET, &mqtt->flags, MQTT_FLAGS_DNSSRV, N_("Use SRV record to find remote host."), NULL }, @@ -1087,38 +1127,239 @@ return rc; } +static rpmRC rpmmqttInitMacros(rpmmqtt mqtt) +{ + rpmRC rc = RPMRC_OK; + + /* XXX MQTT_FLAGS_CLEAN */ + /* XXX MQTT_FLAGS_EOL */ + /* XXX MQTT_FLAGS_NOSTALE */ + /* XXX MQTT_FLAGS_LINES */ + /* XXX MQTT_FLAGS_STDIN */ + /* XXX MQTT_FLAGS_EMPTY */ + /* XXX MQTT_FLAGS_DNSSRV */ + /* XXX MQTT_FLAGS_INSECURE */ + /* XXX MQTT_FLAGS_RETAIN */ + /* XXX MQTT_FLAGS_WILL_RETAIN */ +static const char _mqtt_qos[] = "%{?_mqtt_qos}%{!?_mqtt_qos:2}"; + mqtt->qos = (rpmExpandNumeric(_mqtt_qos) % 3); +static const char _mqtt_timeout[] = "%{?_mqtt_timeout}%{!?_mqtt_timeout:10000}"; + mqtt->timeout = rpmExpandNumeric(_mqtt_timeout); +static const char _mqtt_max_inflight[] = "%{?_mqtt_max_inflight}%{!?_mqtt_max_inflight:20}"; + mqtt->max_inflight = rpmExpandNumeric(_mqtt_max_inflight); +static const char _mqtt_keepalive[] = "%{?_mqtt_keepalive}%{!?_mqtt_keepalive:60}"; + mqtt->keepalive = rpmExpandNumeric(_mqtt_keepalive); +static const char _mqtt_trace[] = "%{?_mqtt_trace}%{!?_mqtt_trace:0}"; + mqtt->trace = rpmExpandNumeric(_mqtt_trace); + +#ifdef NOTYET +static const char _mqtt_topic[] = + "%{?_mqtt_topic}%{!?_mqtt_topic:rpm/%{pid}/mqtt}"; +mqtt->topic = _free(mqtt->topic); + mqtt->topic = rpmExpand(_mqtt_topic, NULL); +static const char _mqtt_clientid[] = + "%{?_mqtt_clientid}%{!?_mqtt_clientid:rpm}"; +mqtt->clientid = _free(mqtt->clientid); + mqtt->clientid = rpmExpand(_mqtt_clientid, NULL); +#endif + + return rc; +} + +static rpmRC rpmmqttInitURIQuery(rpmmqtt mqtt, const char *query) +{ + ARGV_t av = NULL; + int ac; + const char *t; + const char *te; + rpmRC rc = RPMRC_OK; + + (void) argvSplit(&av, query, ","); + ac = argvCount(av); + for (int i = 0; i < ac; i++) { + t = av[i]; + if ((te = strchr(t, '=')) == NULL) + te += strlen(t); + if (!strncmp(t, "clean", (te - t))) { + mqtt->flags |= MQTT_FLAGS_CLEAN; + continue; + } + /* XXX MQTT_FLAGS_EOL */ + /* XXX MQTT_FLAGS_NOSTALE */ + /* XXX MQTT_FLAGS_LINES */ + /* XXX MQTT_FLAGS_STDIN */ + /* XXX MQTT_FLAGS_EMPTY */ + /* XXX MQTT_FLAGS_DNSSRV */ + /* XXX MQTT_FLAGS_INSECURE */ + if (!strncmp(t, "retain", (te - t))) { + mqtt->flags |= MQTT_FLAGS_RETAIN; + continue; + } + /* XXX MQTT_FLAGS_WILL_RETAIN */ + if (!strncmp(t, "qos", (te - t))) { + mqtt->qos = (te[0] == '=' && xisdigit(te[1])) + ? strtoul(te+1, NULL, 0) + : 2; /* XXX default qos=2? */ + mqtt->qos %= 3; + continue; + } + if (!strncmp(t, "timeout", (te - t))) { + mqtt->timeout = (te[0] == '=' && xisdigit(te[1])) + ? strtoul(te+1, NULL, 0) + : 10000; /* XXX default timeout=10000? */ + continue; + } + if (!strncmp(t, "max_inflight", (te - t))) { + mqtt->max_inflight = (te[0] == '=' && xisdigit(te[1])) + ? strtoul(te+1, NULL, 0) + : 20; /* XXX default max_inflight=20? */ + continue; + } + if (!strncmp(t, "keepalive", (te - t))) { + mqtt->max_inflight = (te[0] == '=' && xisdigit(te[1])) + ? strtoul(te+1, NULL, 0) + : 60; /* XXX default keepalive=60? */ + continue; + } + /* XXX set global mqtt->trace from URI query string? */ + if (!strncmp(t, "trace", (te - t))) { + mqtt->trace = (te[0] == '=' && xisdigit(te[1])) + ? strtoul(te+1, NULL, 0) + : 4; /* XXX default trace=4? */ + continue; + } + } + av = argvFree(av); + + return rc; +} + +static rpmRC rpmmqttInitURI(rpmmqtt mqtt, const char *url) +{ + urlinfo u = NULL; + rpmRC rc = RPMRC_OK; + + /* -- Set unspecified MQTT options from the URI parameters. */ + mqtt->ut = urlSplit(url, &u); + mqtt->u = u; + + if (u->scheme == NULL + || !strcmp(u->scheme, "mqtt") + || !strcmp(u->scheme, "mqtts")) + { + if (u->portstr == NULL) { + u->portstr = !strcmp(u->scheme, "mqtts") + ? xstrdup("8883") : xstrdup("1883"); + + } + } + + if (u->user != NULL) { +mqtt->user = _free(mqtt->user); + mqtt->user = xstrdup(u->user); + } + if (u->password != NULL) { +mqtt->password = _free(mqtt->password); + mqtt->password = xstrdup(u->password); + } + if (u->host != NULL) { +mqtt->host = _free(mqtt->host); + mqtt->host = xstrdup(u->host); + } + if (u->portstr != 0) { + mqtt->port = strtoul(u->portstr, NULL, 0); + } + + mqtt->uri = rpmExpand("tcp://", u->host, ":", u->portstr, NULL); + + { const char * topic; + (void) urlPath(u->url, &topic); + if (topic && *topic == '/') + topic++; + if (topic && *topic) { +mqtt->topic = _free(mqtt->topic); + mqtt->topic = rpmExpand(topic, NULL); + } + } + + if (u->query) { + rc = rpmmqttInitURIQuery(mqtt, u->query); + } + + return rc; +} + static rpmRC rpmmqttInit(rpmmqtt mqtt, int ac, const char ** av, mqttFlags flags) { - static mqttFlags _flags = (MQTT_FLAGS_CLEAN|MQTT_FLAGS_EOL); - urlinfo u = NULL; - const char *s = NULL; rpmRC rc; SPEW((stderr, "--> %s(%p,%p[%d],0x%x)\n", __FUNCTION__, mqtt, av, ac, flags)); - /* -- Initialize the default values before popt processing. */ + static mqttFlags _flags = MQTT_FLAGS_DEFAULT; /* CLEAN|EOL */ mqtt->flags = (flags ? flags : _flags); - mqtt->port = 1883; - mqtt->max_inflight = 20; - mqtt->keepalive = 60; - mqtt->qos = 0; + + /* -- Initialize oddball values. */ #ifdef DYING mqtt->protocol_version = rpmExpand("31", NULL); #else mqtt->protocol_version = rpmExpand("auto", NULL); mqtt->will_message = rpmExpand("", NULL); #endif - mqtt->host = rpmExpand("localhost", NULL); + mqtt->_tls_version = rpmExpand("1.2", NULL); mqtt->idprefix = rpmExpand("rpm", NULL); mqtt->_clientid = rpmExpand(mqtt->idprefix, "-%%{pid}", NULL); - mqtt->_tls_version = rpmExpand("1.2", NULL); - /* -- Process the options/arguments. */ + /* -- Initialize values frpm default URI. */ +static const char _mqtt_default_uri[] = + "mqtt://localhost:1883/?qos=0,timeout=10000,max_inflight=20,keepalive=60"; + rc = rpmmqttInitURI(mqtt, _mqtt_default_uri); + if (rc) + goto exit; + + /* -- Override with values from macros (if any). */ + rc = rpmmqttInitMacros(mqtt); + if (rc) + goto exit; + + /* -- Process the explicit options/arguments. */ rc = rpmmqttInitPopt(mqtt, ac, (char *const *)av); if (rc) goto exit; + /* -- Set message input/output modes from flags/options. */ + if (MF_ISSET(STDIN)) + (void) argvAdd(&mqtt->ifn, "-"); + + if (mqtt->msgs) + mqtt->msg_input = MQTT_INPUT_OPTION; + else if (mqtt->ifn) + mqtt->msg_input = MF_ISSET(LINES) + ? MQTT_INPUT_FILES_LINES : MQTT_INPUT_FILES; + else + mqtt->msg_input = MQTT_INPUT_UNKNOWN; + + if (mqtt->ofn) { + mqtt->msg_output = strcmp(mqtt->ofn, "-") + ? MQTT_OUTPUT_FILE : MQTT_OUTPUT_STDOUT; + } else { + mqtt->msg_output = MQTT_OUTPUT_STDOUT; + mqtt->ofn = xstrdup("-"); + } + + /* -- XXX Ensure topic/clientid are set. */ + if (mqtt->clientid == NULL) { +assert(mqtt->_clientid); + mqtt->clientid = rpmExpand(mqtt->_clientid, NULL); +fprintf(stderr, "*** %s: clientid %s\n", __FUNCTION__, mqtt->clientid); + } + if (mqtt->topic == NULL) { +assert(mqtt->idprefix); + mqtt->topic = rpmExpand(mqtt->idprefix, "/mqtt", NULL); +fprintf(stderr, "*** %s: topic %s\n", __FUNCTION__, mqtt->topic); + } + + /* -- XXX Add default argv (if not specified) */ if (mqtt->ac == 0) { static const char *_av[] = { "mqtt://luser:jasnl@localhost:1883/rpm/mqtt?trace=4", @@ -1129,85 +1370,6 @@ mqtt->ac = argvCount((ARGV_t)mqtt->av); } - /* -- Set unspecified options from the URI parameters. */ - /* XXX W2DO: which takes precedence, options or URI??? */ - mqtt->ut = urlSplit(mqtt->av[0], &u); - mqtt->u = u; - - if (u->scheme == NULL - || !strcmp(u->scheme, "mqtt") - || !strcmp(u->scheme, "mqtts")) - { - /* XXX propagate mqtt->port ??? */ - if (u->portstr == NULL) { - u->portstr = !strcmp(u->scheme, "mqtts") - ? xstrdup("8883") : xstrdup("1883"); - - } - u->scheme = _free(u->scheme); - u->scheme = xstrdup("tcp"); - } -#ifdef DYING -dumpU(__FUNCTION__, u); -#endif - - if (mqtt->user == NULL && u->user != NULL) - mqtt->user = xstrdup(u->user); - if (mqtt->password == NULL && u->password != NULL) - mqtt->password = xstrdup(u->password); - /* XXX mqtt->host? */ - if (mqtt->port == 0 && u->port != 0) - mqtt->port = u->port; - mqtt->uri = rpmExpand(u->scheme, "://", u->host, ":", u->portstr, NULL); - - (void) urlPath(u->url, &s); -static const char _mqtt_topic[] = - "%{?_mqtt_topic}%{!?_mqtt_topic:rpm/%{pid}/mqtt}"; - if (s && *s == '/') - s++; - if (s == NULL || *s == '\0') - s = _mqtt_topic; - mqtt->topic = rpmExpand(s, NULL); - -static const char _mqtt_clientid[] = - "%{?_mqtt_clientid}%{!?_mqtt_clientid:rpm}"; - mqtt->clientid = rpmExpand(_mqtt_clientid, NULL); - - /* XXX CLI options clobbered */ -static const char _mqtt_qos[] = "%{?_mqtt_qos}%{!?_mqtt_qos:1}"; - mqtt->qos = (rpmExpandNumeric(_mqtt_qos) % 3); -static const char _mqtt_timeout[] = "%{?_mqtt_timeout}%{!?_mqtt_timeout:10000}"; - mqtt->timeout = rpmExpandNumeric(_mqtt_timeout); - - if (u->query) { - ARGV_t qav = NULL; - int qac; - const char *t; - const char *te; - - (void) argvSplit(&qav, u->query, ","); - qac = argvCount(qav); - for (int i = 0; i < qac; i++) { - t = qav[i]; - if ((te = strchr(t, '=')) == NULL) - continue; - if (!strncmp(t, "qos", (te - t)) && xisdigit(te[1])) { - mqtt->qos = strtol(te+1, NULL, 0); - mqtt->qos %= 3; - continue; - } - if (!strncmp(t, "timeout", (te - t)) && xisdigit(te[1])) { - mqtt->timeout = strtol(te+1, NULL, 0); - continue; - } - if (!strncmp(t, "trace", (te - t)) && xisdigit(te[1])) { - mqtt->trace = strtol(te+1, NULL, 0); - continue; - } - } - qav = argvFree(qav); - } - rc = RPMRC_OK; exit: @@ -1228,11 +1390,10 @@ /* ========== */ mqtt->_address = _free(mqtt->_address); - mqtt->_infile = _free(mqtt->_infile); mqtt->host = _free(mqtt->host); mqtt->_clientid = _free(mqtt->_clientid); mqtt->idprefix = _free(mqtt->idprefix); - mqtt->_message = _free(mqtt->_message); + mqtt->msgs = argvFree((ARGV_t)mqtt->msgs); mqtt->password = _free(mqtt->password); mqtt->_topics = argvFree((ARGV_t)mqtt->_topics); mqtt->user = _free(mqtt->user); @@ -1259,11 +1420,19 @@ mqtt->persist_ctx = _free(mqtt->persist_ctx); mqtt->dn = _free(mqtt->dn); + mqtt->ifn = argvFree(mqtt->ifn); + if (mqtt->ifd) + (void) Fclose(mqtt->ifd); + mqtt->ifd = NULL; + mqtt->ofn = _free(mqtt->ofn); + if (mqtt->ofd) + (void) Fclose(mqtt->ofd); + mqtt->ofd = NULL; + mqtt->iob = rpmiobFree(mqtt->iob); + mqtt->serverURI = _free(mqtt->serverURI); - if (mqtt->av) - (void) argvFree((ARGV_t)mqtt->av); - mqtt->av = NULL; + mqtt->av = (char **) argvFree((ARGV_t)mqtt->av); mqtt->flags = 0; } @@ -1340,6 +1509,26 @@ } persist_path = _free(persist_path); + /* Prepare for subscription delivery. */ + if (MF_ISSET(BUFFER)) + mqtt->iob = rpmiobNew(0); + switch (mqtt->msg_output) { + default: + case MQTT_OUTPUT_UNKNOWN: + break; + case MQTT_OUTPUT_STDOUT: + case MQTT_OUTPUT_FILE: + mqtt->ofd = Fopen(mqtt->ofn, "a"); /* XXX always append? */ + if (mqtt->ofd == NULL || Ferror(mqtt->ofd)) { + if (mqtt->ofd) + (void) Fclose(mqtt->ofd); + mqtt->ofd = NULL; + } + break; + case MQTT_OUTPUT_CALLBACK: + break; + } + dumpMQTT(__FUNCTION__, mqtt); xx = check(mqtt, "create", @@ -1352,6 +1541,7 @@ onMessageArrived, onDeliveryComplete)); + /* Subscribe to channels (if any). */ if (mqtt->ac > 1) { #ifdef NOTYET /* XXX segfault here. */ xx = rpmmqttSubscribeMany(mqtt, mqtt->ac-1, mqtt->av+1); @@ -1360,6 +1550,60 @@ xx = rpmmqttRead(mqtt, mqtt->av[i], 0); #endif } + + /* Publish any initial messages. */ + switch (mqtt->msg_input) { + case MQTT_INPUT_UNKNOWN: + default: + break; + case MQTT_INPUT_OPTION: + if (mqtt->msgs) { + int nmsgs = argvCount((ARGV_t)mqtt->msgs); + for (int i = 0; i < nmsgs; i++) { + xx = rpmmqttWrite(mqtt, mqtt->msgs[i], 0); + } + } + break; + case MQTT_INPUT_STDIN: + case MQTT_INPUT_FILES: + if (mqtt->ifn) { + int nifn = argvCount(mqtt->ifn); + for (int i = 0; i < nifn; i++) { + const char * ifn = mqtt->ifn[i]; + rpmiob iob = NULL; + if (rpmiobSlurp(ifn, &iob) == 0) { + xx = rpmmqttWrite(mqtt, + rpmiobStr(iob), rpmiobLen(iob)); + } + iob = rpmiobFree(iob); + } + } + break; + case MQTT_INPUT_STDIN_LINES: + case MQTT_INPUT_FILES_LINES: + if (mqtt->ifn) { + int nifn = argvCount(mqtt->ifn); + for (int i = 0; i < nifn; i++) { + const char * ifn = mqtt->ifn[i]; + rpmiob iob = NULL; + if (rpmiobSlurp(ifn, &iob) == 0) { + ARGV_t lav = NULL; + int lac; + xx = argvSplit(&lav, rpmiobStr(iob), "\n\r"); + lac = argvCount(lav); + for (int j = 0; j < lac; j++) { + const char * s = lav[j]; + size_t ns = strlen(s); + if (ns > 0) /* XXX skip empty lines? */ + xx = rpmmqttWrite(mqtt, s, ns); + } + lav = argvFree(lav); + } + iob = rpmiobFree(iob); + } + } + break; + } } #endif /* WITH_MQTT */ @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmqtt.h ============================================================================ $ cvs diff -u -r1.1.2.10 -r1.1.2.11 rpmmqtt.h --- rpm/rpmio/rpmmqtt.h 2 Jul 2016 14:32:59 -0000 1.1.2.10 +++ rpm/rpmio/rpmmqtt.h 3 Jul 2016 08:41:57 -0000 1.1.2.11 @@ -18,19 +18,37 @@ MQTT_FLAGS_EOL = _MFB( 1), /*!< (sub) -N */ MQTT_FLAGS_NOSTALE = _MFB( 2), /*!< (sub) -R */ - MQTT_FLAGS_STDIN_EACH = _MFB( 3), /*!< -l */ - MQTT_FLAGS_EMPTY = _MFB( 4), /*!< -n */ - MQTT_FLAGS_STDIN_ALL = _MFB( 5), /*!< -s */ + MQTT_FLAGS_LINES = _MFB( 3), /*!< -l */ + MQTT_FLAGS_STDIN = _MFB( 4), /*!< -s */ + MQTT_FLAGS_EMPTY = _MFB( 5), /*!< -n */ MQTT_FLAGS_DNSSRV = _MFB( 6), /*!< -S */ MQTT_FLAGS_INSECURE = _MFB( 7), /*!< --insecure */ MQTT_FLAGS_RETAIN = _MFB( 8), /*!< -r */ MQTT_FLAGS_WILL_RETAIN = _MFB( 9), /*!< --will-retain */ + MQTT_FLAGS_BUFFER = _MFB(10), /*!< -b */ } mqttFlags; +#define MQTT_FLAGS_DEFAULT (MQTT_FLAGS_CLEAN|MQTT_FLAGS_EOL) #undef _MFB #undef _KFB +typedef enum mqttInput_e { + MQTT_INPUT_UNKNOWN = 0, + MQTT_INPUT_OPTION = 1, /*!< -m str */ + MQTT_INPUT_FILES_LINES = 2, /*!< -l -f fn */ + MQTT_INPUT_FILES = 3, /*!< -f fn */ + MQTT_INPUT_STDIN_LINES = 4, /*!< -l -f '-' */ + MQTT_INPUT_STDIN = 5, /*!< -s */ +} mqttInput; + +typedef enum mqttOutput_e { + MQTT_OUTPUT_UNKNOWN = 0, + MQTT_OUTPUT_STDOUT = (1 << 0), + MQTT_OUTPUT_FILE = (1 << 1), + MQTT_OUTPUT_CALLBACK = (1 << 2), +} mqttOutput; + #if defined(_RPMMQTT_INTERNAL) struct rpmmqtt_s { struct rpmioItem_s _item; /*!< usage mutex and pool identifier. */ @@ -38,13 +56,14 @@ int ac; mqttFlags flags; + mqttInput msg_input; + mqttOutput msg_output; /* ========== */ const char *_address; /*!< -A */ - const char *_infile; /*!< -f */ const char *host; /*!< -h */ const char *idprefix; /*!< -I */ int keepalive; /*!< -k */ - const char *_message; /*!< -m */ + const char **msgs; /*!< -m */ int max_inflight; /*!< -M */ int port; /*!< -p */ const char *password; /*!< -P */ @@ -55,19 +74,20 @@ const char * will_message; /*!< --will-payload */ int will_qos; /*!< --will-qos */ const char * will_topic; /*!< --will-topic */ + const char *cafile; /*!< --cafile */ const char *_capath; /*!< --capath */ const char *cert; /*!< --cert */ const char *privkey; /*!< --key */ const char *ciphers; /*!< --ciphers */ + const char *_tls_version; /*!< --tls-version */ const char *_psk_key; /*!< --psk */ const char *_psk_identity; /*!< --psk-identity */ const char *_proxy; /*!< --proxy */ - /* ========== */ - urltype ut; + urltype ut; /* "tcp://localhost:1833" */ urlinfo u; void * C; /* MQTTClient */ @@ -98,6 +118,12 @@ const char *dn; + FD_t ifd; + const char **ifn; /*!< -f */ + FD_t ofd; + const char *ofn; /*!< -o */ + rpmiob iob; + MQTTAsync_connectOptions Copts; MQTTAsync_willOptions Wopts; MQTTAsync_SSLOptions Sopts; @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/tmqtt.c ============================================================================ $ cvs diff -u -r1.1.2.7 -r1.1.2.8 tmqtt.c --- rpm/rpmio/tmqtt.c 2 Jul 2016 14:32:59 -0000 1.1.2.7 +++ rpm/rpmio/tmqtt.c 3 Jul 2016 08:41:57 -0000 1.1.2.8 @@ -89,6 +89,10 @@ mqtt = rpmmqttNew(argv, 0); rc = _DoMQTT(mqtt); + if (mqtt->iob && rpmiobLen(mqtt->iob) > 0) { + fprintf(stderr, "========== input msgs\n"); + fprintf(stderr, "%s", rpmiobStr(mqtt->iob)); + } mqtt = rpmmqttFree(mqtt); return rc; @@ . ______________________________________________________________________ RPM Package Manager http://rpm5.org CVS Sources Repository rpm-cvs@rpm5.org