RPM Package Manager, CVS Repository
  http://rpm5.org/cvs/
  ____________________________________________________________________________

  Server: rpm5.org                         Name:   Jeff Johnson
  Root:   /v/rpm/cvs                       Email:  j...@rpm5.org
  Module: rpm                              Date:   06-Jul-2016 11:06:25
  Branch: rpm-5_4                          Handle: 2016070609062500

  Modified files:           (Branch: rpm-5_4)
    rpm/rpmio               librpmio.vers rpmmqtt.c rpmmqtt.h tmqtt.c

  Log:
    - mqtt: wrap subscribe*/unsubscribe* functions.

  Summary:
    Revision    Changes     Path
    2.199.2.60  +4  -0      rpm/rpmio/librpmio.vers
    1.1.2.17    +193 -35    rpm/rpmio/rpmmqtt.c
    1.1.2.15    +13 -5      rpm/rpmio/rpmmqtt.h
    1.1.2.11    +18 -11     rpm/rpmio/tmqtt.c
  ____________________________________________________________________________

  patch -p0 <<'@@ .'
  Index: rpm/rpmio/librpmio.vers
  ============================================================================
  $ cvs diff -u -r2.199.2.59 -r2.199.2.60 librpmio.vers
  --- rpm/rpmio/librpmio.vers   4 Jul 2016 07:45:23 -0000       2.199.2.59
  +++ rpm/rpmio/librpmio.vers   6 Jul 2016 09:06:25 -0000       2.199.2.60
  @@ -643,6 +643,10 @@
       rpmmqttPub;
       rpmmqttRun;
       rpmmqttSub;
  +    rpmmqttSubscribe;
  +    rpmmqttSubscribeMany;
  +    rpmmqttUnsubscribe;
  +    rpmmqttUnsubscribeMany;
       _rpmmrb_debug;
       _rpmmrbI;
       _rpmmrbPool;
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmqtt.c
  ============================================================================
  $ cvs diff -u -r1.1.2.16 -r1.1.2.17 rpmmqtt.c
  --- rpm/rpmio/rpmmqtt.c       5 Jul 2016 19:48:13 -0000       1.1.2.16
  +++ rpm/rpmio/rpmmqtt.c       6 Jul 2016 09:06:25 -0000       1.1.2.17
  @@ -425,6 +425,21 @@
       (void) rpmmqttConnect(mqtt);
   }
   
  +static void onFailure(void * _mqtt, MQTTAsync_failureData * response)
  +{
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +    rpmlog(RPMLOG_WARNING, "MQTT failed\n");
  +SPEW((stderr, "<-- %s(%p,%p)\n", __FUNCTION__, _mqtt, response));
  +    mqtt->finished = 1;
  +}
  +
  +static void onSuccess(void * _mqtt, MQTTAsync_successData * response)
  +{
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +SPEW((stderr, "<-- %s(%p,%p)\n", __FUNCTION__, _mqtt, response));
  +    mqtt->finished = 1;
  +}
  +
   static void onDisconnectFailure(void * _mqtt, MQTTAsync_failureData * 
response)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
  @@ -496,9 +511,17 @@
       mqtt->finished = 1;
   }
   
  +static void onSubscribeManyFailure(void * _mqtt, MQTTAsync_failureData * 
response)
  +{
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +    rpmlog(RPMLOG_WARNING, "MQTT    subscribeMany failed\n");
  +    mqtt->finished = 1;
  +}
  +
   static void onSubscribeMany(void * _mqtt, MQTTAsync_successData * response)
   {
       rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +#ifdef       DYING
       int *subqos = response->alt.qosList;
   
   SPEW((stderr, "--> %s(%p,%p) subqos %p[%u]\n", __FUNCTION__, _mqtt, 
response, subqos, mqtt->ac));
  @@ -506,6 +529,23 @@
        if (mqtt->debug || _rpmmqtt_debug)
            rpmlog(RPMLOG_DEBUG, "MQTT  subscribe qos(%d)\n", subqos[i]);
       }
  +#endif
  +    mqtt->finished = 1;
  +}
  +
  +static void onUnsubscribeManyFailure(void * _mqtt, MQTTAsync_failureData * 
response)
  +{
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +    rpmlog(RPMLOG_WARNING, "MQTT    unsubscribeMany failed\n");
  +    mqtt->finished = 1;
  +}
  +
  +static void onUnsubscribeMany(void * _mqtt, MQTTAsync_successData * response)
  +{
  +    rpmmqtt mqtt = (rpmmqtt) _mqtt;
  +
  +    rpmlog(RPMLOG_DEBUG, "MQTT  unsubscribeMany\n");
  +SPEW((stderr, "<-- %s(%p,%p)\n", __FUNCTION__, _mqtt, response));
       mqtt->finished = 1;
   }
   
  @@ -978,9 +1018,9 @@
       return ptr;
   }
   
  -int rpmmqttConnect(rpmmqtt mqtt)
  +rpmRC rpmmqttConnect(rpmmqtt mqtt)
   {
  -    int rc = -1;
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
   
   SPEW((stderr, "--> %s(%p)\n", __FUNCTION__, mqtt));
   #ifdef       WITH_MQTT
  @@ -990,16 +1030,19 @@
                MQTTAsync_connect(mqtt->I, AOBJ(mqtt, 'C')));
        while (!mqtt->finished)
            usleep(1000);
  -    } else
  -     rc = 0;
  +     if (rc)
  +         goto exit;
  +    }
  +    rc = RPMRC_OK;
  +exit:
   #endif       /* WITH_MQTT */
   SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, mqtt, rc));
       return rc;
   }
   
  -int rpmmqttDisconnect(rpmmqtt mqtt)
  +rpmRC rpmmqttDisconnect(rpmmqtt mqtt)
   {
  -    int rc = -1;
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
   #ifdef       WITH_MQTT
       if (MQTTAsync_isConnected(mqtt->I)) {
        mqtt->finished = 0;
  @@ -1007,17 +1050,20 @@
                MQTTAsync_disconnect(mqtt->I, AOBJ(mqtt, 'D')));
        while (!mqtt->finished)
            usleep(100);
  -    } else
  -     rc = 0;
  +     if (rc)
  +         goto exit;
  +    }
  +    rc = RPMRC_OK;
  +exit:
   #endif
   SPEW((stderr, "<-- %s(%p) rc %d\n", __FUNCTION__, mqtt, rc));
       return rc;
   }
   
  -int rpmmqttSendMessage(rpmmqtt mqtt, const char * topic,
  +rpmRC rpmmqttSendMessage(rpmmqtt mqtt, const char * topic,
                const char * s, size_t ns)
   {
  -    int rc = -1;
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
   
       if (topic == NULL)
        topic = mqtt->topic;
  @@ -1027,6 +1073,8 @@
        ns = strlen(s);
   
   #ifdef       WITH_MQTT
  +    if (rpmmqttConnect(mqtt))
  +     goto exit;
       MQTTAsync_message *M = AOBJ(mqtt, 'M');
       M->payloadlen = ns;
       M->payload = (char *) s;
  @@ -1040,20 +1088,26 @@
                MQTTAsync_sendMessage(mqtt->I, topic, M, R));
       while (!mqtt->finished)
        usleep(100);
  +    if (rc)
  +     goto exit;
  +    rc = RPMRC_OK;
  +exit:
   #endif       /* WITH_MQTT */
   
   SPEW((stderr, "<-- %s(%p,%p[%u]) rc %d\n", __FUNCTION__, mqtt, s, 
(unsigned)ns, rc));
       return rc;
   }
   
  -static int rpmmqttSubscribeMany(rpmmqtt mqtt, int ac, char ** av)
  +rpmRC rpmmqttSubscribeMany(rpmmqtt mqtt, int ac, char ** av)
   {
  -    int rc = -1;
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
   
   SPEW((stderr, "--> %s(%p,%p[%d])\n", __FUNCTION__, mqtt, av, ac));
       if (ac <= 0)
        goto exit;
   #ifdef       WITH_MQTT
  +    if (rpmmqttConnect(mqtt))
  +     goto exit;
       int _lvl = RPMLOG_DEBUG;
       int *subqos = xcalloc(ac, sizeof(*subqos));
       for (int i = 0; i < ac; i++) {
  @@ -1069,17 +1123,119 @@
        } else
   #endif
            subqos[i] = mqtt->qos;      /* XXX */
  -     rpmlog(_lvl, "%19s: %s qos(%u)\n", "subtopic", t, subqos[i]);
  +     rpmlog(_lvl, "%19s: %s qos(%u)\n", "subscribe", t, subqos[i]);
       }
   
       MQTTAsync_responseOptions *R = AOBJ(mqtt, 'R');
       R->onSuccess = onSubscribeMany;
  -    R->onFailure = onSubscribeFailure;       /* XXX */
  +    R->onFailure = onSubscribeManyFailure;
   
  +    mqtt->finished = 0;
       rc = check(mqtt, "subscribeMany",
  -             MQTTAsync_subscribeMany(&mqtt->I,
  -                     ac, av, subqos, R));
  +             MQTTAsync_subscribeMany(mqtt->I, ac, av, subqos, R));
  +    while (!mqtt->finished)
  +     usleep(100);
       subqos = _free(subqos);
  +    if (rc)
  +     goto exit;
  +    rc = RPMRC_OK;
  +#endif       /* WITH_MQTT */
  +
  +exit:
  +SPEW((stderr, "<-- %s(%p,%p[%d]) rc %d\n", __FUNCTION__, mqtt, av, ac, rc));
  +    return rc;
  +}
  +
  +rpmRC rpmmqttSubscribe(rpmmqtt mqtt, const char * topic, int qos)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +
  +SPEW((stderr, "--> %s(%p,%p,%d)\n", __FUNCTION__, mqtt, topic, qos));
  +#ifdef       WITH_MQTT
  +    if (rpmmqttConnect(mqtt))
  +     goto exit;
  +    int _lvl = RPMLOG_DEBUG;
  +
  +    rpmlog(_lvl, "%19s: %s qos(%d)\n", "subscribe", topic, qos);
  +
  +    MQTTAsync_responseOptions *R = AOBJ(mqtt, 'R');
  +    R->onSuccess = onSubscribe;
  +    R->onFailure = onSubscribeFailure;
  +
  +    mqtt->finished = 0;
  +    rc = check(mqtt, "subscribe",
  +             MQTTAsync_subscribe(mqtt->I, topic, qos, R));
  +    while (!mqtt->finished)
  +     usleep(100);
  +    if (rc)
  +     goto exit;
  +    rc = RPMRC_OK;
  +#endif       /* WITH_MQTT */
  +
  +exit:
  +SPEW((stderr, "<-- %s(%p,%p[%d]) rc %d\n", __FUNCTION__, mqtt, topic, qos, 
rc));
  +    return rc;
  +}
  +
  +rpmRC rpmmqttUnsubscribe(rpmmqtt mqtt, const char * topic)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +
  +SPEW((stderr, "--> %s(%p,\"%s\")\n", __FUNCTION__, mqtt, topic));
  +#ifdef       WITH_MQTT
  +    if (rpmmqttConnect(mqtt))
  +     goto exit;
  +    int _lvl = RPMLOG_DEBUG;
  +
  +    rpmlog(_lvl, "%19s: %s\n", "unsubscribe", topic);
  +
  +    MQTTAsync_responseOptions *R = AOBJ(mqtt, 'R');
  +    R->onSuccess = onUnsubscribeMany;
  +    R->onFailure = onUnsubscribeManyFailure;
  +
  +    mqtt->finished = 0;
  +    rc = check(mqtt, "unsubscribe",
  +             MQTTAsync_unsubscribe(mqtt->I, topic, R));
  +    while (!mqtt->finished)
  +     usleep(100);
  +    if (rc)
  +     goto exit;
  +    rc = RPMRC_OK;
  +#endif       /* WITH_MQTT */
  +
  +exit:
  +SPEW((stderr, "<-- %s(%p,\"%s\") rc %d\n", __FUNCTION__, mqtt, topic, rc));
  +    return rc;
  +}
  +
  +rpmRC rpmmqttUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av)
  +{
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
  +
  +SPEW((stderr, "--> %s(%p,%p[%d])\n", __FUNCTION__, mqtt, av, ac));
  +    if (ac <= 0)
  +     goto exit;
  +#ifdef       WITH_MQTT
  +    if (rpmmqttConnect(mqtt))
  +     goto exit;
  +    int _lvl = RPMLOG_DEBUG;
  +    for (int i = 0; i < ac; i++) {
  +     char * t = av[i];
  +     rpmlog(_lvl, "%19s: %s\n", "unsubscribe", t);
  +    }
  +
  +    MQTTAsync_responseOptions *R = AOBJ(mqtt, 'R');
  +    R->onSuccess = onUnsubscribeMany;
  +    R->onFailure = onUnsubscribeManyFailure;
  +
  +    mqtt->finished = 0;
  +    rc = check(mqtt, "unsubscribeMany",
  +             MQTTAsync_unsubscribeMany(mqtt->I, ac, av, R));
  +    while (!mqtt->finished)
  +     usleep(100);
  +    if (rc)
  +     goto exit;
  +    rc = RPMRC_OK;
   #endif       /* WITH_MQTT */
   
   exit:
  @@ -1088,9 +1244,9 @@
   }
   
   /*==============================================================*/
  -int rpmmqttPub(rpmmqtt mqtt, const char * topic, const char *s, size_t ns)
  +rpmRC rpmmqttPub(rpmmqtt mqtt, const char * topic, const char *s, size_t ns)
   {
  -    int ret = -1;    /* assume failure */
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
   
       if (topic == NULL)
        topic = mqtt->topic;
  @@ -1099,7 +1255,6 @@
       if (ns == 0)
        ns = strlen(s);
   
  -
   #ifdef       WITH_MQTT
       if (rpmmqttConnect(mqtt))
        goto exit;
  @@ -1108,22 +1263,23 @@
        char * t = rpmExpand(_mqtt_prefix, " ", s, NULL);
        size_t nt = strlen(t);
   
  -     if (!rpmmqttSendMessage(mqtt, topic, t, nt))
  -         ret = nt;
  -
  +     rc = rpmmqttSendMessage(mqtt, topic, t, nt);
        t = _free(t);
  +     if (rc)
  +         goto exit;
  +     rc = RPMRC_OK;
       }
   
   exit:
   #endif       /* WITH_MQTT */
   
  -SPEW((stderr, "<-- %s(%p,%p[%u]) ret %d\n", __FUNCTION__, mqtt, s, 
(unsigned)ns, (int)ret));
  -    return ret;
  +SPEW((stderr, "<-- %s(%p,%p[%u]) rc %d\n", __FUNCTION__, mqtt, s, 
(unsigned)ns, (int)rc));
  +    return rc;
   }
   
  -int rpmmqttSub(rpmmqtt mqtt, const char *s, size_t ns)
  +rpmRC rpmmqttSub(rpmmqtt mqtt, const char *s, size_t ns)
   {
  -    int ret = -1;    /* assume failure */
  +    rpmRC rc = RPMRC_FAIL;   /* assume failure */
   
       if (ns == 0) ns = strlen(s);
   
  @@ -1135,7 +1291,6 @@
        unsigned subqos = mqtt->qos;
        char *t, *te;
        int _lvl = RPMLOG_DEBUG;
  -     int rc;
   
        if ((t = strchr(subtopic, '?')) != NULL) {
            ARGV_t qav = NULL;
  @@ -1163,23 +1318,26 @@
        R->onSuccess = onSubscribe;
        R->onFailure = onSubscribeFailure;
   
  +#ifdef       DYING
        mqtt->finished = 0;
        rc = check(mqtt, "subscribe",
                MQTTAsync_subscribe(mqtt->I, subtopic, subqos, R));
        while (rc == 0 && !mqtt->finished)
            usleep(100);
  -
  -     ret = (rc ? -1 : 0);
  -
  +#else
  +     rc = rpmmqttSubscribe(mqtt, subtopic, subqos);
  +#endif
        subtopic = _free(subtopic);
  -
  +     if (rc)
  +         goto exit;
  +     rc = RPMRC_OK;
       }
   
   exit:
   #endif       /* WITH_MQTT */
   
  -SPEW((stderr, "<-- %s(%p,%p[%u]) ret %d\n", __FUNCTION__, mqtt, s, 
(unsigned)ns, (int)ret));
  -    return ret;
  +SPEW((stderr, "<-- %s(%p,%p[%u]) ret %d\n", __FUNCTION__, mqtt, s, 
(unsigned)ns, (int)rc));
  +    return rc;
   }
   
   /*==============================================================*/
  @@ -1623,9 +1781,9 @@
   
       if (topics) {
        int nsubs = argvCount((ARGV_t)topics);
  -#ifdef       NOTYET  /* XXX MQTT segfault if done here. */
  +#ifndef      NOTYET  /* XXX MQTT segfault if done here. */
   argvPrint(__FUNCTION__, (ARGV_t)topics, NULL);
  -     xx = rpmmqttSubscribeMany(mqtt, nsubs, topics);
  +     xx = rpmmqttSubscribeMany(mqtt, nsubs, (char **)topics);
   #else
        for (int i = 0; i < nsubs; i++) {
            xx = rpmmqttSub(mqtt, topics[i], 0);
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmqtt.h
  ============================================================================
  $ cvs diff -u -r1.1.2.14 -r1.1.2.15 rpmmqtt.h
  --- rpm/rpmio/rpmmqtt.h       5 Jul 2016 19:48:14 -0000       1.1.2.14
  +++ rpm/rpmio/rpmmqtt.h       6 Jul 2016 09:06:25 -0000       1.1.2.15
  @@ -172,16 +172,24 @@
    */
   rpmmqtt rpmmqttNew(char ** av, uint32_t flags);
   
  -int rpmmqttConnect(rpmmqtt mqtt);
  +rpmRC rpmmqttConnect(rpmmqtt mqtt);
   
  -int rpmmqttDisconnect(rpmmqtt mqtt);
  +rpmRC rpmmqttDisconnect(rpmmqtt mqtt);
   
  -int rpmmqttSendMessage(rpmmqtt mqtt, const char * topic,
  +rpmRC rpmmqttSendMessage(rpmmqtt mqtt, const char * topic,
                const char *s, size_t ns);
   
  -int rpmmqttPub(rpmmqtt mqtt, const char * topic, const char *s, size_t ns);
  +rpmRC rpmmqttSubscribeMany(rpmmqtt mqtt, int ac, char ** av);
   
  -int rpmmqttSub(rpmmqtt mqtt, const char *s, size_t ns);
  +rpmRC rpmmqttUnsubscribeMany(rpmmqtt mqtt, int ac, char ** av);
  +
  +rpmRC rpmmqttSubscribe(rpmmqtt mqtt, const char *topic, int qos);
  +
  +rpmRC rpmmqttUnsubscribe(rpmmqtt mqtt, const char *topic);
  +
  +rpmRC rpmmqttPub(rpmmqtt mqtt, const char * topic, const char *s, size_t ns);
  +
  +rpmRC rpmmqttSub(rpmmqtt mqtt, const char *s, size_t ns);
   
   rpmRC rpmmqttRun(rpmmqtt mqtt, const char * str, const char ** resultp);
   
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/tmqtt.c
  ============================================================================
  $ cvs diff -u -r1.1.2.10 -r1.1.2.11 tmqtt.c
  --- rpm/rpmio/tmqtt.c 5 Jul 2016 14:49:11 -0000       1.1.2.10
  +++ rpm/rpmio/tmqtt.c 6 Jul 2016 09:06:25 -0000       1.1.2.11
  @@ -25,23 +25,30 @@
   
   static int _DoMQTT(rpmmqtt mqtt)
   {
  -    ssize_t nw;
  +    rpmRC xx;
       int rc = 0;
  -
  +static char *av[] = { "foo", "bar", "baz", NULL };
  +int ac = 3;
  +int qos = 2;
  +
  +    xx = rpmmqttConnect(mqtt);
  +    xx = rpmmqttSubscribe(mqtt, av[0], qos);
  +    xx = rpmmqttUnsubscribe(mqtt, av[0]);
  +    xx = rpmmqttSubscribeMany(mqtt, ac, av);
  +    xx = rpmmqttUnsubscribeMany(mqtt, ac, av);
  +    xx = rpmmqttDisconnect(mqtt);
  + 
   #ifdef       DYING
       (void) rpmmqttSub(mqtt, "rpm/#?qos=0", 0);
       (void) rpmmqttSub(mqtt, "$SYS/broker/version?qos=0", 0);
   #endif
   
  -    nw = rpmmqttPub(mqtt, NULL, "bzzt ...", 0);
  -    nw = rpmmqttPub(mqtt, NULL, "bzzT ...", 0);
  -    nw = rpmmqttPub(mqtt, NULL, "bzZT ...", 0);
  -    nw = rpmmqttPub(mqtt, NULL, "bZZT ...", 0);
  -    nw = rpmmqttPub(mqtt, NULL, "BZZT ...", 0);
  -    (void) rpmmqttDisconnect(mqtt);
  -    nw = rpmmqttPub(mqtt, NULL, "SWAT !!!", 0);
  -    (void) rpmmqttDisconnect(mqtt);
  -    (void) rpmmqttConnect(mqtt);
  +    xx = rpmmqttPub(mqtt, NULL, "bzzt ...", 0);
  +    xx = rpmmqttPub(mqtt, NULL, "bzzT ...", 0);
  +    xx = rpmmqttPub(mqtt, NULL, "bzZT ...", 0);
  +    xx = rpmmqttPub(mqtt, NULL, "bZZT ...", 0);
  +    xx = rpmmqttPub(mqtt, NULL, "BZZT ...", 0);
  +    xx = rpmmqttPub(mqtt, NULL, "SWAT !!!", 0);
   
       return rc;
   }
  @@ .
______________________________________________________________________
RPM Package Manager                                    http://rpm5.org
CVS Sources Repository                                rpm-cvs@rpm5.org

Reply via email to