RPM Package Manager, CVS Repository http://rpm5.org/cvs/ ____________________________________________________________________________
Server: rpm5.org Name: Jeff Johnson Root: /v/rpm/cvs Email: j...@rpm5.org Module: rpm Date: 28-Jun-2016 09:20:22 Branch: rpm-5_4 Handle: 2016062807202200 Modified files: (Branch: rpm-5_4) rpm/rpmio rpmmqtt.c rpmmqtt.h rpmurl.h tmqtt.c url.c Log: - mqtt: lily gilding before integrating with rpmio. Summary: Revision Changes Path 1.1.2.4 +439 -18 rpm/rpmio/rpmmqtt.c 1.1.2.4 +11 -0 rpm/rpmio/rpmmqtt.h 1.41.4.9 +1 -0 rpm/rpmio/rpmurl.h 1.1.2.2 +3 -1 rpm/rpmio/tmqtt.c 1.73.4.15 +8 -0 rpm/rpmio/url.c ____________________________________________________________________________ patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmqtt.c ============================================================================ $ cvs diff -u -r1.1.2.3 -r1.1.2.4 rpmmqtt.c --- rpm/rpmio/rpmmqtt.c 27 Jun 2016 22:00:14 -0000 1.1.2.3 +++ rpm/rpmio/rpmmqtt.c 28 Jun 2016 07:20:22 -0000 1.1.2.4 @@ -9,6 +9,9 @@ #include <rpmio.h> /* for *Pool methods */ #include <rpmlog.h> +#include <rpmdir.h> +#include <rpmmacro.h> +#include <rpmurl.h> #include <argv.h> #define _RPMMQTT_INTERNAL @@ -16,19 +19,61 @@ #include "debug.h" -int _rpmmqtt_debug = -1; - -static char _test_mqtt[] = "test/mqtt"; +int _rpmmqtt_debug = 1; /*==============================================================*/ +typedef struct key_s { + int v; + const char *n; +} KEY; + +#define _ENTRY(_v) { MQTTCLIENT_##_v, #_v, } +static KEY rpmmqtt_errs[] = { +#ifdef WITH_MQTT + _ENTRY(SUCCESS), + _ENTRY(FAILURE), + _ENTRY(PERSISTENCE_ERROR), + _ENTRY(DISCONNECTED), + _ENTRY(MAX_MESSAGES_INFLIGHT), + _ENTRY(BAD_UTF8_STRING), + _ENTRY(NULL_PARAMETER), + _ENTRY(TOPICNAME_TRUNCATED), + _ENTRY(BAD_STRUCTURE), + _ENTRY(BAD_QOS), +#else + { 0, NULL }, +#endif +}; +static size_t rpmmqtt_nerrs = sizeof(rpmmqtt_errs) / sizeof(rpmmqtt_errs[0]); + +static const char * rpmmqttStrerror(int v) +{ + KEY * tbl = rpmmqtt_errs; + size_t ntbl = rpmmqtt_nerrs; + const char * n = NULL; + static char buf[64]; + + for (size_t i = 0; i < ntbl; i++) { + if (v != tbl[i].v) + continue; + n = tbl[i].n; + break; + } + if (n == NULL) { + (void) snprintf(buf, sizeof(buf), "0x%x", v); + n = buf; + } + return n; +} + static int Xcheck(rpmmqtt mqtt, const char * msg, int rc, int printit, const char * func, const char * fn, unsigned ln) { if (rc != 0) { /* MQTTCLIENT_SUCCESS */ int _lvl = RPMLOG_WARNING; - rpmlog(_lvl, "%s:%s:%u: MQTTClient_%s(%d)\n", - func, fn, ln, msg, rc); + rpmlog(_lvl, "%s:%s:%u: MQTTClient_%s: %s(%d)\n", + func, fn, ln, msg, rpmmqttStrerror(rc), rc); } return rc; } @@ -49,7 +94,7 @@ const char * s = message->payload; size_t ns = message->payloadlen; if (_rpmmqtt_debug < 0) - rpmlog(RPMLOG_DEBUG, "+++ MQTT rcvd topic(%s) \"%*s\"\n", topic, ns, s); + rpmlog(RPMLOG_DEBUG, "+++ MQTT rcvd topic(%s) \"%.*s\"\n", topic, ns, s); MQTTClient_freeMessage(&message); MQTTClient_free(topic); #endif @@ -82,6 +127,300 @@ } /*==============================================================*/ +#ifdef WITH_MQTT +static int rpmmqttOpen(void **_mqttp, const char *clientID, const char *serverURI, + void *_mqtt) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + char * dn; + char * te; + int rc = MQTTCLIENT_PERSISTENCE_ERROR; + + *_mqttp = _mqtt; + + mqtt->dn = _free(mqtt->dn); + dn = rpmGetPath(mqtt->persist_path, "/", clientID, "-", serverURI, NULL); + for (te = dn; (te = strchr(te, ':')) != NULL; te++) + *te = '-'; + + if (rpmioMkpath(dn, (mode_t)0775, (uid_t)-1, (gid_t)-1)) { + dn = _free(dn); + goto exit; + } + mqtt->dn = dn; + + rc = 0; + +exit: +if (mqtt->debug || _rpmmqtt_debug < 0) +fprintf(stderr, "<-- %s(%p,\"%s\",\"%s\",%p) rc %d dn %s\n", __FUNCTION__, _mqttp, clientID, serverURI, _mqtt, rc, mqtt->dn); + + return rc; +} + +static int rpmmqttClose(void *_mqtt) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + int rc = MQTTCLIENT_PERSISTENCE_ERROR; + + + if (mqtt->dn == NULL) + goto exit; + + if (Rmdir(mqtt->dn) && (errno != ENOENT && errno != ENOTEMPTY)) + goto exit; + + rc = 0; + +exit: +if (mqtt->debug || _rpmmqtt_debug < 0) +fprintf(stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, _mqtt, rc); + return rc; +} + +static int rpmmqttPut(void *_mqtt, char *key, + int bufcount, char *buffers[], int buflens[]) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + char *fn = NULL; + FD_t fd = NULL; + size_t nb = 0; + size_t nw = 0; + int rc = MQTTCLIENT_PERSISTENCE_ERROR; + + if (mqtt->dn == NULL) + goto exit; + + /* XXX add .msg extension? */ + fn = rpmGetPath(mqtt->dn, "/", key, NULL); + fd = Fopen(fn, "wb"); + if (fd == NULL || Ferror(fd)) + goto exit; + + for (int i = 0; i < bufcount; i++) { + const char * s = buffers[i]; + int ns = buflens[i]; +if (mqtt->debug || _rpmmqtt_debug < 0) +fprintf(stderr, "%5d\t%p[%d]\t\"%.*s\"\n", i, s, ns, ns, s); + nb += buflens[i]; + nw += Fwrite(buffers[i], sizeof(*buffers[i]), buflens[i], fd); + } + (void) Fclose(fd); + fd = NULL; + + if (nb != nw) { + (void) Unlink(fn); + goto exit; + } + + rc = 0; + +exit: + if (fd) + Fclose(fd); +if (mqtt->debug || _rpmmqtt_debug < 0) +fprintf(stderr, "<-- %s(%p,\"%s\",%d,%p,%p) rc %d fn %s\n", __FUNCTION__, _mqtt, key, bufcount, buffers, buflens, rc, fn); + fn = _free(fn); + return rc; +} + +static int rpmmqttGet(void *_mqtt, char *key, + char *buffer[], int *buflen) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + char *fn = NULL; + FD_t fd = NULL; + size_t nr = 0; + char *b = NULL; + size_t nb = 0; + struct stat sb; + int rc = MQTTCLIENT_PERSISTENCE_ERROR; + + if (mqtt->dn == NULL) + goto exit; + + /* XXX add .msg extension? */ + fn = rpmGetPath(mqtt->dn, "/", key, NULL); + fd = Fopen(fn, "rb"); + if (fd == NULL || Ferror(fd)) + goto exit; + + if (Fstat(fd, &sb) != 0) + goto exit; + nb = sb.st_size; + b = xmalloc(nb); + nr = Fread(b, sizeof(*b), nb, fd); + (void) Fclose(fd); + fd = NULL; + + if (nb != nr) { + b = _free(b); + nb = 0; + goto exit; + } + + rc = 0; + +exit: + if (fd) + Fclose(fd); +if (mqtt->debug || _rpmmqtt_debug < 0) +fprintf(stderr, "<-- %s(%p,\"%s\",%p,%p) rc %d fn %s\n", __FUNCTION__, _mqtt, key, buffer, buflen, rc, fn); + fn = _free(fn); + *buffer = b; + *buflen = nb; + return rc; +} + +static int rpmmqttRemove(void *_mqtt, char *key) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + char *fn = NULL; + int rc = MQTTCLIENT_PERSISTENCE_ERROR; + + if (mqtt->dn == NULL) + goto exit; + + /* XXX add .msg extension? */ + fn = rpmGetPath(mqtt->dn, "/", key, NULL); + + if (Unlink(fn) && errno != ENOENT) + goto exit; + rc = 0; + +exit: +if (mqtt->debug || _rpmmqtt_debug < 0) +fprintf(stderr, "<-- %s(%p,\"%s\") rc %d fn %s\n", __FUNCTION__, _mqtt, key, rc, fn); + fn = _free(fn); + return rc; +} + +static int rpmmqttKeys(void *_mqtt, char ***keys, int *nkeys) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + ARGV_t av = NULL; + int ac = 0; + DIR * dir = NULL; + struct dirent *dp; + int rc = MQTTCLIENT_PERSISTENCE_ERROR; + + if (mqtt->dn == NULL) + goto exit; + + if ((dir = Opendir(mqtt->dn)) == NULL) + goto exit; + + while ((dp = Readdir(dir)) != NULL) { + char * fn = rpmGetPath(mqtt->dn, "/", dp->d_name, NULL); + struct stat sb; + + if (Lstat(fn, &sb) == 0 && S_ISREG(sb.st_mode)) + (void) argvAdd(&av, dp->d_name); + /* XXX strip .msg extension? */ + fn = _free(fn); + } + ac = argvCount(av); + if (ac == 0) + av = argvFree(av); + rc = 0; + +exit: + if (dir) + (void) Closedir(dir); +if (mqtt->debug || _rpmmqtt_debug < 0) +fprintf(stderr, "<-- %s(%p,%p,%p) rc %d keys %p[%u]\n", __FUNCTION__, _mqtt, keys, nkeys, rc, av, ac); + *keys = (char **) av; + *nkeys = ac; + return rc; +} + +static int rpmmqttClear(void *_mqtt) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + DIR * dir = NULL; + struct dirent *dp; + int nerrs = 0; + int rc = MQTTCLIENT_PERSISTENCE_ERROR; + + if (mqtt->dn == NULL) + goto exit; + + if ((dir = Opendir(mqtt->dn)) == NULL) + goto exit; + + while ((dp = Readdir(dir)) != NULL) { + char * fn = rpmGetPath(mqtt->dn, "/", dp->d_name, NULL); + struct stat sb; + + if (Lstat(fn, &sb) == 0 && S_ISREG(sb.st_mode)) { + if (Unlink(fn)) + nerrs++; + } + fn = _free(fn); + } + + if (nerrs == 0) + rc = 0; + +exit: + if (dir) + (void) Closedir(dir); +if (mqtt->debug || _rpmmqtt_debug < 0) +fprintf(stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, _mqtt, rc); + return rc; +} + +static int rpmmqttContainsKey(void *_mqtt, char *key) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + DIR * dir = NULL; + struct dirent *dp; + int rc = MQTTCLIENT_PERSISTENCE_ERROR; + + if (mqtt->dn == NULL) + goto exit; + + if ((dir = Opendir(mqtt->dn)) == NULL) + goto exit; + + while ((dp = Readdir(dir)) != NULL) { + char * fn = rpmGetPath(mqtt->dn, "/", dp->d_name, NULL); + struct stat sb; + + if (Lstat(fn, &sb) == 0 && S_ISREG(sb.st_mode)) { + /* XXX strip .msg extension? */ + if (!strcmp(dp->d_name, key)) { + fn = _free(fn); + rc = 0; + goto exit; + } + } + fn = _free(fn); + } + +exit: + if (dir) + (void) Closedir(dir); +if (mqtt->debug || _rpmmqtt_debug < 0) +fprintf(stderr, "<-- %s(%p,\"%s\") rc %d\n", __FUNCTION__, _mqtt, key, rc); + return rc; +} + +static MQTTClient_persistence _rpmmqtt_persistence = { + NULL, + rpmmqttOpen, + rpmmqttClose, + rpmmqttPut, + rpmmqttGet, + rpmmqttRemove, + rpmmqttKeys, + rpmmqttClear, + rpmmqttContainsKey, +}; + +#endif /* WITH_MQTT */ + +/*==============================================================*/ int rpmmqttConnect(rpmmqtt mqtt) { int rc = -1; @@ -115,7 +454,7 @@ mqtt->serverURI = xstrdup(Copts.returned.serverURI); mqtt->MQTTVersion = Copts.returned.MQTTVersion; mqtt->sessionPresent = Copts.returned.sessionPresent; - if (mqtt->debug || _rpmmqtt_debug < 0) + if (mqtt->debug || _rpmmqtt_debug) rpmlog(RPMLOG_DEBUG, "+++ MQTT connect(mqtt://%s) version(%d) present(%d)\n", mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent); @@ -133,7 +472,7 @@ if (MQTTClient_isConnected(mqtt->C)) { rc = check(mqtt, "disconnect", MQTTClient_disconnect(mqtt->C, mqtt->msecs)); - if (mqtt->debug || _rpmmqtt_debug < 0) + if (mqtt->debug || _rpmmqtt_debug) rpmlog(RPMLOG_DEBUG, "+++ MQTT disconnect(mqtt://%s) version(%d) present(%d)\n", mqtt->serverURI, mqtt->MQTTVersion, mqtt->sessionPresent); @@ -164,10 +503,9 @@ rc = check(mqtt, "publishMessage", MQTTClient_publishMessage(mqtt->C, mqtt->topic, &pubmsg, &mqtt->token)); - if (_rpmmqtt_debug < 0) - rpmlog(RPMLOG_DEBUG, "+++ MQTT sent(%d) topic(%s) \"%*s\"\n", + if (_rpmmqtt_debug) + rpmlog(RPMLOG_DEBUG, "+++ MQTT sent(%d) topic(%s) \"%.*s\"\n", mqtt->token, mqtt->topic, ns, s); - sleep(1); if (!mqtt->delivered) rc = check(mqtt, "waitForCompletion", @@ -224,6 +562,13 @@ } #endif mqtt->C = NULL; + mqtt->uri = _free(mqtt->uri); + mqtt->topic = _free(mqtt->topic); + mqtt->clientid = _free(mqtt->clientid); + mqtt->persist_path = _free(mqtt->persist_path); + mqtt->persist_ctx = _free(mqtt->persist_ctx); + mqtt->dn = _free(mqtt->dn); + mqtt->serverURI = _free(mqtt->serverURI); if (mqtt->av) @@ -234,28 +579,81 @@ RPMIOPOOL_MODULE(mqtt) +#ifdef DYING +static void dumpURL(const char * msg, urlinfo u) +{ + FILE * fp = stderr; + const char * url = (u ? u->url : NULL); + const char *path = NULL; + urltype ut = urlPath(url, &path);; + + if (msg) + fprintf(fp, "===================================== %s u %p ut %d\n", + msg, u, ut); + if (u) { + + fprintf(fp, " url: %s\n", u->url); + fprintf(fp, " scheme: %s\n", u->scheme); + fprintf(fp, " user: %s\n", u->user); + fprintf(fp, "password: %s\n", u->password); + fprintf(fp, " host: %s\n", u->host); + fprintf(fp, " portstr: %s\n", u->portstr); + fprintf(fp, " query: %s\n", u->query); + fprintf(fp, "fragment: %s\n", u->fragment); + fprintf(fp, " path: %s\n", path); + } +} +#endif + rpmmqtt rpmmqttNew(char ** av, uint32_t flags) { +static char _test_mqtt[] = "test/mqtt"; + + static const char *_av[] = { "tcp://localhost:1883/test/mqtt", NULL }; rpmmqtt mqtt = rpmmqttGetPool(_rpmmqttPool); + urlinfo u = NULL; + const char *s = NULL; mqtt->flags = flags; mqtt->av = NULL; + if (av == NULL) + av = (char **)_av; if (av) { int ac = argvCount((ARGV_t)av); for (int i = 0; i < ac; i++) (void) argvAdd((ARGV_t *)&mqtt->av, av[i]); } - mqtt->topic = _test_mqtt; + mqtt->ut = urlSplit(av[0], &u); + mqtt->u = u; + if (u->scheme == NULL || !strcmp(u->scheme, "mqtt")) { + u->scheme = _free(u->scheme); + u->scheme = xstrdup("tcp"); + if (u->portstr == NULL) + u->portstr = xstrdup("1883"); + } +#ifdef DYING +dumpURL(__FUNCTION__, u); +#endif + + mqtt->uri = rpmExpand(u->scheme, "://", u->host, ":", u->portstr, NULL); + + (void) urlPath(u->url, &s); + if (s && *s == '/') + s++; + if (s == NULL || *s == '\0') + s = _test_mqtt; + mqtt->topic = xstrdup(s); + + mqtt->clientid = xstrdup("rpm"); + mqtt->qos = 1; mqtt->msecs = 10000; #ifdef WITH_MQTT + { static int oneshot; - static char * _uri = "tcp://localhost:1883"; - static char * _clientid = "ExampleClientPub"; - static int _persist_type = MQTTCLIENT_PERSISTENCE_NONE; - static void * _persist_ctx = NULL; + static char _var_cache_mqtt[] = "/var/cache/mqtt"; int xx; if (!oneshot) { @@ -268,10 +666,32 @@ } oneshot++; } + + mqtt->persist_type = MQTTCLIENT_PERSISTENCE_USER; + switch (mqtt->persist_type) { + default: + case MQTTCLIENT_PERSISTENCE_NONE: + mqtt->persist_ctx = NULL; + break; + case MQTTCLIENT_PERSISTENCE_DEFAULT: + { + /* XXX rpmmqttFini double free */ + mqtt->persist_ctx = (void *)xstrdup(mqtt->persist_path); + mqtt->persist_path = xstrdup(_var_cache_mqtt); + } break; + case MQTTCLIENT_PERSISTENCE_USER: + { + MQTTClient_persistence * ctx = xmalloc(sizeof(*ctx)); + *ctx = _rpmmqtt_persistence; /* structure assignment */ + ctx->context = mqtt; + mqtt->persist_ctx = ctx; + mqtt->persist_path = xstrdup(_var_cache_mqtt); + } break; + } xx = check(mqtt, "create", - MQTTClient_create(&mqtt->C, _uri, _clientid, - _persist_type, _persist_ctx)); + MQTTClient_create(&mqtt->C, mqtt->uri, mqtt->clientid, + mqtt->persist_type, mqtt->persist_ctx)); xx = check(mqtt, "setCallbacks", MQTTClient_setCallbacks(mqtt->C, mqtt, @@ -280,6 +700,7 @@ rpmmqttDeliveryComplete)); xx = rpmmqttConnect(mqtt); + /* XXX exit if cannot connect? */ } #endif @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmqtt.h ============================================================================ $ cvs diff -u -r1.1.2.3 -r1.1.2.4 rpmmqtt.h --- rpm/rpmio/rpmmqtt.h 27 Jun 2016 22:00:14 -0000 1.1.2.3 +++ rpm/rpmio/rpmmqtt.h 28 Jun 2016 07:20:22 -0000 1.1.2.4 @@ -16,9 +16,20 @@ char ** av; uint32_t flags; + urltype ut; + urlinfo u; + void * C; /* MQTTClient */ + const char * uri; const char * topic; + const char * clientid; + + int persist_type; + void * persist_ctx; /* MQTTClient_persistence */ + const char * persist_path; + const char *dn; + int qos; int token; int msecs; @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmurl.h ============================================================================ $ cvs diff -u -r1.41.4.8 -r1.41.4.9 rpmurl.h --- rpm/rpmio/rpmurl.h 25 Jun 2016 22:36:54 -0000 1.41.4.8 +++ rpm/rpmio/rpmurl.h 28 Jun 2016 07:20:22 -0000 1.41.4.9 @@ -31,6 +31,7 @@ #define URL_IS_MYSQL (urltype)33 #define URL_IS_POSTGRES (urltype)34 #define URL_IS_SQLSERVER (urltype)35 +#define URL_IS_MQTT (urltype)36 #define URLMAGIC 0xd00b1ed0U #define URLSANE(u) assert(u && u->magic == URLMAGIC) @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/tmqtt.c ============================================================================ $ cvs diff -u -r1.1.2.1 -r1.1.2.2 tmqtt.c --- rpm/rpmio/tmqtt.c 27 Jun 2016 22:00:14 -0000 1.1.2.1 +++ rpm/rpmio/tmqtt.c 28 Jun 2016 07:20:22 -0000 1.1.2.2 @@ -57,7 +57,9 @@ #ifdef UNUSED int ac = argvCount(av); #endif - rpmmqtt mqtt = rpmmqttNew((char **)av, 0); + (void)av; + static char *_av[] = { "mqtt://localhost/test/mqtt", NULL, }; + rpmmqtt mqtt = rpmmqttNew(_av, 0); int rc = -1; rc = _DoMQTT(mqtt); @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/url.c ============================================================================ $ cvs diff -u -r1.73.4.14 -r1.73.4.15 url.c --- rpm/rpmio/url.c 25 Jun 2016 22:36:54 -0000 1.73.4.14 +++ rpm/rpmio/url.c 28 Jun 2016 07:20:22 -0000 1.73.4.15 @@ -31,6 +31,9 @@ #ifndef IPPORT_HTTPS #define IPPORT_HTTPS 443 #endif +#ifndef IPPORT_MQTT +#define IPPORT_MQTT 1883 +#endif #ifndef IPPORT_PGPKEYSERVER #define IPPORT_PGPKEYSERVER 11371 #endif @@ -410,7 +413,12 @@ { "mssql://", sizeof("mssql://")-1, URL_IS_SQLSERVER }, { "sqlserver://", sizeof("sqlserver://")-1, URL_IS_SQLSERVER }, + { "mqtt://", sizeof("mqtt://")-1, URL_IS_MQTT }, + { "-", sizeof("-")-1, URL_IS_DASH }, + { "ssh://", sizeof("ssh://")-1, URL_IS_UNKNOWN }, + { "tcp://", sizeof("tcp://")-1, URL_IS_UNKNOWN }, + { "udp://", sizeof("udp://")-1, URL_IS_UNKNOWN }, { NULL, 0, URL_IS_UNKNOWN } }; @@ . ______________________________________________________________________ RPM Package Manager http://rpm5.org CVS Sources Repository rpm-cvs@rpm5.org