RPM Package Manager, CVS Repository http://rpm5.org/cvs/ ____________________________________________________________________________
Server: rpm5.org Name: Jeff Johnson Root: /v/rpm/cvs Email: j...@rpm5.org Module: rpm Date: 06-Jul-2016 11:06:25 Branch: rpm-5_4 Handle: 2016070609062500 Modified files: (Branch: rpm-5_4) rpm/rpmio librpmio.vers rpmmqtt.c rpmmqtt.h tmqtt.c Log: - mqtt: wrap subscribe*/unsubscribe* functions. Summary: Revision Changes Path 2.199.2.60 +4 -0 rpm/rpmio/librpmio.vers 1.1.2.17 +193 -35 rpm/rpmio/rpmmqtt.c 1.1.2.15 +13 -5 rpm/rpmio/rpmmqtt.h 1.1.2.11 +18 -11 rpm/rpmio/tmqtt.c ____________________________________________________________________________ patch -p0 <<'@@ .' Index: rpm/rpmio/librpmio.vers ============================================================================ $ cvs diff -u -r2.199.2.59 -r2.199.2.60 librpmio.vers --- rpm/rpmio/librpmio.vers 4 Jul 2016 07:45:23 -0000 2.199.2.59 +++ rpm/rpmio/librpmio.vers 6 Jul 2016 09:06:25 -0000 2.199.2.60 @@ -643,6 +643,10 @@ rpmmqttPub; rpmmqttRun; rpmmqttSub; + rpmmqttSubscribe; + rpmmqttSubscribeMany; + rpmmqttUnsubscribe; + rpmmqttUnsubscribeMany; _rpmmrb_debug; _rpmmrbI; _rpmmrbPool; @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmqtt.c ============================================================================ $ cvs diff -u -r1.1.2.16 -r1.1.2.17 rpmmqtt.c --- rpm/rpmio/rpmmqtt.c 5 Jul 2016 19:48:13 -0000 1.1.2.16 +++ rpm/rpmio/rpmmqtt.c 6 Jul 2016 09:06:25 -0000 1.1.2.17 @@ -425,6 +425,21 @@ (void) rpmmqttConnect(mqtt); } +static void onFailure(void * _mqtt, MQTTAsync_failureData * response) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + rpmlog(RPMLOG_WARNING, "MQTT failed\n"); +SPEW((stderr, "<-- %s(%p,%p)\n", __FUNCTION__, _mqtt, response)); + mqtt->finished = 1; +} + +static void onSuccess(void * _mqtt, MQTTAsync_successData * response) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; +SPEW((stderr, "<-- %s(%p,%p)\n", __FUNCTION__, _mqtt, response)); + mqtt->finished = 1; +} + static void onDisconnectFailure(void * _mqtt, MQTTAsync_failureData * response) { rpmmqtt mqtt = (rpmmqtt) _mqtt; @@ -496,9 +511,17 @@ mqtt->finished = 1; } +static void onSubscribeManyFailure(void * _mqtt, MQTTAsync_failureData * response) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + rpmlog(RPMLOG_WARNING, "MQTT subscribeMany failed\n"); + mqtt->finished = 1; +} + static void onSubscribeMany(void * _mqtt, MQTTAsync_successData * response) { rpmmqtt mqtt = (rpmmqtt) _mqtt; +#ifdef DYING int *subqos = response->alt.qosList; SPEW((stderr, "--> %s(%p,%p) subqos %p[%u]\n", __FUNCTION__, _mqtt, response, subqos, mqtt->ac)); @@ -506,6 +529,23 @@ if (mqtt->debug || _rpmmqtt_debug) rpmlog(RPMLOG_DEBUG, "MQTT subscribe qos(%d)\n", subqos[i]); } +#endif + mqtt->finished = 1; +} + +static void onUnsubscribeManyFailure(void * _mqtt, MQTTAsync_failureData * response) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + rpmlog(RPMLOG_WARNING, "MQTT unsubscribeMany failed\n"); + mqtt->finished = 1; +} + +static void onUnsubscribeMany(void * _mqtt, MQTTAsync_successData * response) +{ + rpmmqtt mqtt = (rpmmqtt) _mqtt; + + rpmlog(RPMLOG_DEBUG, "MQTT unsubscribeMany\n"); +SPEW((stderr, "<-- %s(%p,%p)\n", __FUNCTION__, _mqtt, response)); mqtt->finished = 1; } @@ -978,9 +1018,9 @@ return ptr; } -int rpmmqttConnect(rpmmqtt mqtt) +rpmRC rpmmqttConnect(rpmmqtt mqtt) { - int rc = -1; + rpmRC rc = RPMRC_FAIL; /* assume failure */ SPEW((stderr, "--> %s(%p)\n", __FUNCTION__, mqtt)); #ifdef WITH_MQTT @@ -990,16 +1030,19 @@ MQTTAsync_connect(mqtt->I, AOBJ(mqtt, 'C'))); while (!mqtt->finished) usleep(1000); - } else - rc = 0; + if (rc) + goto exit; + } + rc = RPMRC_OK; +exit: #endif /* WITH_MQTT */ SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, mqtt, rc)); return rc; } -int rpmmqttDisconnect(rpmmqtt mqtt) +rpmRC rpmmqttDisconnect(rpmmqtt mqtt) { - int rc = -1; + rpmRC rc = RPMRC_FAIL; /* assume failure */ #ifdef WITH_MQTT if (MQTTAsync_isConnected(mqtt->I)) { mqtt->finished = 0; @@ -1007,17 +1050,20 @@ MQTTAsync_disconnect(mqtt->I, AOBJ(mqtt, 'D'))); while (!mqtt->finished) usleep(100); - } else - rc = 0; + if (rc) + goto exit; + } + rc = RPMRC_OK; +exit: #endif SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, mqtt, rc)); return rc; } -int rpmmqttSendMessage(rpmmqtt mqtt, const char * topic, +rpmRC rpmmqttSendMessage(rpmmqtt mqtt, const char * topic, const char * s, size_t ns) { - int rc = -1; + rpmRC rc = RPMRC_FAIL; /* assume failure */ if (topic == NULL) topic = mqtt->topic; @@ -1027,6 +1073,8 @@ ns = strlen(s); #ifdef WITH_MQTT + if (rpmmqttConnect(mqtt)) + goto exit; MQTTAsync_message *M = AOBJ(mqtt, 'M'); M->payloadlen = ns; M->payload = (char *) s; @@ -1040,20 +1088,26 @@ MQTTAsync_sendMessage(mqtt->I, topic, M, R)); while (!mqtt->finished) usleep(100); + if (rc) + goto exit; + rc = RPMRC_OK; +exit: #endif /* WITH_MQTT */ SPEW((stderr, "<-- %s(%p,%p[%u]) rc %d\n", __FUNCTION__, mqtt, s, (unsigned)ns, rc)); return rc; } -static int rpmmqttSubscribeMany(rpmmqtt mqtt, int ac, char ** av) +rpmRC rpmmqttSubscribeMany(rpmmqtt mqtt, int ac, char ** av) { - int rc = -1; + rpmRC rc = RPMRC_FAIL; /* assume failure */ SPEW((stderr, "--> %s(%p,%p[%d])\n", __FUNCTION__, mqtt, av, ac)); if (ac <= 0) goto exit; #ifdef WITH_MQTT + if (rpmmqttConnect(mqtt)) + goto exit; int _lvl = RPMLOG_DEBUG; int *subqos = xcalloc(ac, sizeof(*subqos)); for (int i = 0; i < ac; i++) { @@ -1069,17 +1123,119 @@ } else #endif subqos[i] = mqtt->qos; /* XXX */ - rpmlog(_lvl, "%19s: %s qos(%u)\n", "subtopic", t, subqos[i]); + rpmlog(_lvl, "%19s: %s qos(%u)\n", "subscribe", t, subqos[i]); } MQTTAsync_responseOptions *R = AOBJ(mqtt, 'R'); R->onSuccess = onSubscribeMany; - R->onFailure = onSubscribeFailure; /* XXX */ + R->onFailure = onSubscribeManyFailure; + mqtt->finished = 0; rc = check(mqtt, "subscribeMany", - MQTTAsync_subscribeMany(&mqtt->I, - ac, av, subqos, R)); + MQTTAsync_subscribeMany(mqtt->I, ac, av, subqos, R)); + while (!mqtt->finished) + usleep(100); subqos = _free(subqos); + if (rc) + goto exit; + rc = RPMRC_OK; +#endif /* WITH_MQTT */ + +exit: +SPEW((stderr, "<-- %s(%p,%p[%d]) rc %d\n", __FUNCTION__, mqtt, av, ac, rc)); + return rc; +} + +rpmRC rpmmqttSubscribe(rpmmqtt mqtt, const char * topic, int qos) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + +SPEW((stderr, "--> %s(%p,%p,%d)\n", __FUNCTION__, mqtt, topic, qos)); +#ifdef WITH_MQTT + if (rpmmqttConnect(mqtt)) + goto exit; + int _lvl = RPMLOG_DEBUG; + + rpmlog(_lvl, "%19s: %s qos(%d)\n", "subscribe", topic, qos); + + MQTTAsync_responseOptions *R = AOBJ(mqtt, 'R'); + R->onSuccess = onSubscribe; + R->onFailure = onSubscribeFailure; + + mqtt->finished = 0; + rc = check(mqtt, "subscribe", + MQTTAsync_subscribe(mqtt->I, topic, qos, R)); + while (!mqtt->finished) + usleep(100); + if (rc) + goto exit; + rc = RPMRC_OK; +#endif /* WITH_MQTT */ + +exit: +SPEW((stderr, "<-- %s(%p,%p[%d]) rc %d\n", __FUNCTION__, mqtt, topic, qos, rc)); + return rc; +} + +rpmRC rpmmqttUnsubscribe(rpmmqtt mqtt, const char * topic) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + +SPEW((stderr, "--> %s(%p,\"%s\")\n", __FUNCTION__, mqtt, topic)); +#ifdef WITH_MQTT + if (rpmmqttConnect(mqtt)) + goto exit; + int _lvl = RPMLOG_DEBUG; + + rpmlog(_lvl, "%19s: %s\n", "unsubscribe", topic); + + MQTTAsync_responseOptions *R = AOBJ(mqtt, 'R'); + R->onSuccess = onUnsubscribeMany; + R->onFailure = onUnsubscribeManyFailure; + + mqtt->finished = 0; + rc = check(mqtt, "unsubscribe", + MQTTAsync_unsubscribe(mqtt->I, topic, R)); + while (!mqtt->finished) + usleep(100); + if (rc) + goto exit; + rc = RPMRC_OK; +#endif /* WITH_MQTT */ + +exit: +SPEW((stderr, "<-- %s(%p,\"%s\") rc %d\n", __FUNCTION__, mqtt, topic, rc)); + return rc; +} + +rpmRC rpmmqttUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av) +{ + rpmRC rc = RPMRC_FAIL; /* assume failure */ + +SPEW((stderr, "--> %s(%p,%p[%d])\n", __FUNCTION__, mqtt, av, ac)); + if (ac <= 0) + goto exit; +#ifdef WITH_MQTT + if (rpmmqttConnect(mqtt)) + goto exit; + int _lvl = RPMLOG_DEBUG; + for (int i = 0; i < ac; i++) { + char * t = av[i]; + rpmlog(_lvl, "%19s: %s\n", "unsubscribe", t); + } + + MQTTAsync_responseOptions *R = AOBJ(mqtt, 'R'); + R->onSuccess = onUnsubscribeMany; + R->onFailure = onUnsubscribeManyFailure; + + mqtt->finished = 0; + rc = check(mqtt, "unsubscribeMany", + MQTTAsync_unsubscribeMany(mqtt->I, ac, av, R)); + while (!mqtt->finished) + usleep(100); + if (rc) + goto exit; + rc = RPMRC_OK; #endif /* WITH_MQTT */ exit: @@ -1088,9 +1244,9 @@ } /*==============================================================*/ -int rpmmqttPub(rpmmqtt mqtt, const char * topic, const char *s, size_t ns) +rpmRC rpmmqttPub(rpmmqtt mqtt, const char * topic, const char *s, size_t ns) { - int ret = -1; /* assume failure */ + rpmRC rc = RPMRC_FAIL; /* assume failure */ if (topic == NULL) topic = mqtt->topic; @@ -1099,7 +1255,6 @@ if (ns == 0) ns = strlen(s); - #ifdef WITH_MQTT if (rpmmqttConnect(mqtt)) goto exit; @@ -1108,22 +1263,23 @@ char * t = rpmExpand(_mqtt_prefix, " ", s, NULL); size_t nt = strlen(t); - if (!rpmmqttSendMessage(mqtt, topic, t, nt)) - ret = nt; - + rc = rpmmqttSendMessage(mqtt, topic, t, nt); t = _free(t); + if (rc) + goto exit; + rc = RPMRC_OK; } exit: #endif /* WITH_MQTT */ -SPEW((stderr, "<-- %s(%p,%p[%u]) ret %d\n", __FUNCTION__, mqtt, s, (unsigned)ns, (int)ret)); - return ret; +SPEW((stderr, "<-- %s(%p,%p[%u]) rc %d\n", __FUNCTION__, mqtt, s, (unsigned)ns, (int)rc)); + return rc; } -int rpmmqttSub(rpmmqtt mqtt, const char *s, size_t ns) +rpmRC rpmmqttSub(rpmmqtt mqtt, const char *s, size_t ns) { - int ret = -1; /* assume failure */ + rpmRC rc = RPMRC_FAIL; /* assume failure */ if (ns == 0) ns = strlen(s); @@ -1135,7 +1291,6 @@ unsigned subqos = mqtt->qos; char *t, *te; int _lvl = RPMLOG_DEBUG; - int rc; if ((t = strchr(subtopic, '?')) != NULL) { ARGV_t qav = NULL; @@ -1163,23 +1318,26 @@ R->onSuccess = onSubscribe; R->onFailure = onSubscribeFailure; +#ifdef DYING mqtt->finished = 0; rc = check(mqtt, "subscribe", MQTTAsync_subscribe(mqtt->I, subtopic, subqos, R)); while (rc == 0 && !mqtt->finished) usleep(100); - - ret = (rc ? -1 : 0); - +#else + rc = rpmmqttSubscribe(mqtt, subtopic, subqos); +#endif subtopic = _free(subtopic); - + if (rc) + goto exit; + rc = RPMRC_OK; } exit: #endif /* WITH_MQTT */ -SPEW((stderr, "<-- %s(%p,%p[%u]) ret %d\n", __FUNCTION__, mqtt, s, (unsigned)ns, (int)ret)); - return ret; +SPEW((stderr, "<-- %s(%p,%p[%u]) ret %d\n", __FUNCTION__, mqtt, s, (unsigned)ns, (int)rc)); + return rc; } /*==============================================================*/ @@ -1623,9 +1781,9 @@ if (topics) { int nsubs = argvCount((ARGV_t)topics); -#ifdef NOTYET /* XXX MQTT segfault if done here. */ +#ifndef NOTYET /* XXX MQTT segfault if done here. */ argvPrint(__FUNCTION__, (ARGV_t)topics, NULL); - xx = rpmmqttSubscribeMany(mqtt, nsubs, topics); + xx = rpmmqttSubscribeMany(mqtt, nsubs, (char **)topics); #else for (int i = 0; i < nsubs; i++) { xx = rpmmqttSub(mqtt, topics[i], 0); @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmqtt.h ============================================================================ $ cvs diff -u -r1.1.2.14 -r1.1.2.15 rpmmqtt.h --- rpm/rpmio/rpmmqtt.h 5 Jul 2016 19:48:14 -0000 1.1.2.14 +++ rpm/rpmio/rpmmqtt.h 6 Jul 2016 09:06:25 -0000 1.1.2.15 @@ -172,16 +172,24 @@ */ rpmmqtt rpmmqttNew(char ** av, uint32_t flags); -int rpmmqttConnect(rpmmqtt mqtt); +rpmRC rpmmqttConnect(rpmmqtt mqtt); -int rpmmqttDisconnect(rpmmqtt mqtt); +rpmRC rpmmqttDisconnect(rpmmqtt mqtt); -int rpmmqttSendMessage(rpmmqtt mqtt, const char * topic, +rpmRC rpmmqttSendMessage(rpmmqtt mqtt, const char * topic, const char *s, size_t ns); -int rpmmqttPub(rpmmqtt mqtt, const char * topic, const char *s, size_t ns); +rpmRC rpmmqttSubscribeMany(rpmmqtt mqtt, int ac, char ** av); -int rpmmqttSub(rpmmqtt mqtt, const char *s, size_t ns); +rpmRC rpmmqttUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av); + +rpmRC rpmmqttSubscribe(rpmmqtt mqtt, const char *topic, int qos); + +rpmRC rpmmqttUnsubscribe(rpmmqtt mqtt, const char *topic); + +rpmRC rpmmqttPub(rpmmqtt mqtt, const char * topic, const char *s, size_t ns); + +rpmRC rpmmqttSub(rpmmqtt mqtt, const char *s, size_t ns); rpmRC rpmmqttRun(rpmmqtt mqtt, const char * str, const char ** resultp); @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/tmqtt.c ============================================================================ $ cvs diff -u -r1.1.2.10 -r1.1.2.11 tmqtt.c --- rpm/rpmio/tmqtt.c 5 Jul 2016 14:49:11 -0000 1.1.2.10 +++ rpm/rpmio/tmqtt.c 6 Jul 2016 09:06:25 -0000 1.1.2.11 @@ -25,23 +25,30 @@ static int _DoMQTT(rpmmqtt mqtt) { - ssize_t nw; + rpmRC xx; int rc = 0; - +static char *av[] = { "foo", "bar", "baz", NULL }; +int ac = 3; +int qos = 2; + + xx = rpmmqttConnect(mqtt); + xx = rpmmqttSubscribe(mqtt, av[0], qos); + xx = rpmmqttUnsubscribe(mqtt, av[0]); + xx = rpmmqttSubscribeMany(mqtt, ac, av); + xx = rpmmqttUnsubscribeMany(mqtt, ac, av); + xx = rpmmqttDisconnect(mqtt); + #ifdef DYING (void) rpmmqttSub(mqtt, "rpm/#?qos=0", 0); (void) rpmmqttSub(mqtt, "$SYS/broker/version?qos=0", 0); #endif - nw = rpmmqttPub(mqtt, NULL, "bzzt ...", 0); - nw = rpmmqttPub(mqtt, NULL, "bzzT ...", 0); - nw = rpmmqttPub(mqtt, NULL, "bzZT ...", 0); - nw = rpmmqttPub(mqtt, NULL, "bZZT ...", 0); - nw = rpmmqttPub(mqtt, NULL, "BZZT ...", 0); - (void) rpmmqttDisconnect(mqtt); - nw = rpmmqttPub(mqtt, NULL, "SWAT !!!", 0); - (void) rpmmqttDisconnect(mqtt); - (void) rpmmqttConnect(mqtt); + xx = rpmmqttPub(mqtt, NULL, "bzzt ...", 0); + xx = rpmmqttPub(mqtt, NULL, "bzzT ...", 0); + xx = rpmmqttPub(mqtt, NULL, "bzZT ...", 0); + xx = rpmmqttPub(mqtt, NULL, "bZZT ...", 0); + xx = rpmmqttPub(mqtt, NULL, "BZZT ...", 0); + xx = rpmmqttPub(mqtt, NULL, "SWAT !!!", 0); return rc; } @@ . ______________________________________________________________________ RPM Package Manager http://rpm5.org CVS Sources Repository rpm-cvs@rpm5.org