RPM Package Manager, CVS Repository http://rpm5.org/cvs/ ____________________________________________________________________________
Server: rpm5.org Name: Jeff Johnson Root: /v/rpm/cvs Email: j...@rpm5.org Module: rpm Date: 14-Jul-2016 20:52:27 Branch: rpm-5_4 Handle: 2016071418522700 Modified files: (Branch: rpm-5_4) rpm/rpmio macro.c rpmmqtt.c rpmmqtt.h Log: - mqtt: lily gilding. Summary: Revision Changes Path 2.249.2.42 +17 -1 rpm/rpmio/macro.c 1.1.2.25 +316 -315 rpm/rpmio/rpmmqtt.c 1.1.2.22 +4 -0 rpm/rpmio/rpmmqtt.h ____________________________________________________________________________ patch -p0 <<'@@ .' Index: rpm/rpmio/macro.c ============================================================================ $ cvs diff -u -r2.249.2.41 -r2.249.2.42 macro.c --- rpm/rpmio/macro.c 14 Jul 2016 11:09:49 -0000 2.249.2.41 +++ rpm/rpmio/macro.c 14 Jul 2016 18:52:27 -0000 2.249.2.42 @@ -14,6 +14,7 @@ #define iseol(_c) ((char)(_c) == '\n' || (char)(_c) == '\r') #define STREQ(_t, _f, _fn) ((_fn) == (sizeof(_t)-1) && !strncmp((_t), (_f), (_fn))) +#define MEMEM(_t, _f, _fn) memem((_f), (_fn), (_t), sizeof(_t)-1) #ifdef DEBUG_MACROS #undef WITH_LUA /* XXX fixme */ @@ -2301,7 +2302,22 @@ #if defined(WITH_PAHO) || defined(WITH_MOSQUITTO) || defined(WITH_RABBITMQ) || defined(WITH_PROTON) || defined(WITH_ZEROMQ) if (STREQ("mqtt", f, fn) || STREQ("pub", f, fn) - || STREQ("sub", f, fn)) + || STREQ("sub", f, fn) + || STREQ("paho", f, fn) + || STREQ("mosquitto", f, fn) + || STREQ("amqp", f, fn) + || STREQ("qpid", f, fn) + || STREQ("zmq", f, fn) + || STREQ("paho_pub", f, fn) + || STREQ("amqp_pub", f, fn) + || STREQ("mosquitto_pub", f, fn) + || STREQ("qpid_pub", f, fn) + || STREQ("zmq_pub", f, fn) + || STREQ("paho_sub", f, fn) + || STREQ("amqp_sub", f, fn) + || STREQ("mosquitto_sub", f, fn) + || STREQ("qpid_sub", f, fn) + || STREQ("zmq_sub", f, fn)) { RPMIOPOOL_INTERP_EXPAND(mqtt) continue; @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmqtt.c ============================================================================ $ cvs diff -u -r1.1.2.24 -r1.1.2.25 rpmmqtt.c --- rpm/rpmio/rpmmqtt.c 14 Jul 2016 12:59:21 -0000 1.1.2.24 +++ rpm/rpmio/rpmmqtt.c 14 Jul 2016 18:52:27 -0000 1.1.2.25 @@ -150,6 +150,14 @@ _ENTRY(RETAIN), _ENTRY(WILL_RETAIN), _ENTRY(BUFFER), + _ENTRY(DURABLE), + _ENTRY(EXCLUSIVE), + _ENTRY(NOACK), + _ENTRY(IFEMPTY), + _ENTRY(IFUNUSED), + _ENTRY(PASSIVE), + _ENTRY(AUTODELETE), + _ENTRY(NOLOCAL), #undef _ENTRY }; static size_t nMqttFlags = sizeof(MqttFlags) / sizeof(MqttFlags[0]); @@ -1172,36 +1180,34 @@ rpmRC rc = RPMRC_FAIL; /* assume failure */ int xx; -#ifdef DYING -mqtt->trace = 4; /* XXX */ -#endif - if (mqtt->trace && rpmIsDebug()) { - xx = mqchk(mqtt, "setTraceCallback", + if (mqtt->I == NULL) { + if (mqtt->trace && rpmIsDebug()) { + xx = mqchk(mqtt, "setTraceCallback", (MQTTAsync_setTraceCallback(onTrace), 0)); - xx = mqchk(mqtt, "setTraceLevel", + xx = mqchk(mqtt, "setTraceLevel", (MQTTAsync_setTraceLevel((enum MQTTASYNC_TRACE_LEVELS)mqtt->trace), 0)); - } + } - rpmlog(_lvl, "==================== %s\n", mqtt->vec->name); + rpmlog(_lvl, "==================== %s\n", mqtt->vec->name); - if (!oneshot) { - if (mqtt->trace == 0) { - MQTTAsync_nameValue *NV = MQTTAsync_getVersionInfo(); - while (NV->name) { - rpmlog(_lvl, "%19s: %s\n", NV->name, NV->value); - NV++; + if (!oneshot) { + if (mqtt->trace == 0) { + MQTTAsync_nameValue *NV = MQTTAsync_getVersionInfo(); + while (NV->name) { + rpmlog(_lvl, "%19s: %s\n", NV->name, NV->value); + NV++; + } } + oneshot++; } - oneshot++; - } - { - char portstr[32] = ""; - (void) snprintf(portstr, sizeof(portstr), "%d", - (mqtt->port ? mqtt->port : 1883)); - (void) rpmmqttExpand(mqtt, &mqtt->uri, - "tcp://", mqtt->host, ":", portstr, NULL); - } + { + char portstr[32] = ""; + (void) snprintf(portstr, sizeof(portstr), "%d", + (mqtt->port ? mqtt->port : 1883)); + (void) rpmmqttExpand(mqtt, &mqtt->uri, + "tcp://", mqtt->host, ":", portstr, NULL); + } /* XXX set through uri?query */ static const char _mqtt_persist[] = @@ -1211,45 +1217,50 @@ "%{?_mqtt_cachedir}%{!?_mqtt_cachedir:/var/cache/mqtt}"; char *persist_path = rpmGetPath(_mqtt_cachedir, NULL); - rpmlog(_lvl, "%19s: %s\n", "uri", mqtt->uri); - rpmlog(_lvl, "%19s: %s\n", "host", mqtt->host); - rpmlog(_lvl, "%19s: %d\n", "port", mqtt->port); - rpmlog(_lvl, "%19s: %s\n", "user", mqtt->user); - rpmlog(_lvl, "%19s: %s\n", "pass", mqtt->password); + if (strcmp(mqtt->uri, "tcp://localhost:1883")) + rpmlog(_lvl, "%19s: %s\n", "uri", mqtt->uri); + if (strcmp(mqtt->host, "localhost")) + rpmlog(_lvl, "%19s: %s\n", "host", mqtt->host); + if (mqtt->port != 1883) + rpmlog(_lvl, "%19s: %d\n", "port", mqtt->port); + if (mqtt->user) + rpmlog(_lvl, "%19s: %s\n", "user", mqtt->user); + if (mqtt->password) + rpmlog(_lvl, "%19s: %s\n", "pass", mqtt->password); - rpmlog(_lvl, "%19s: %s\n", "clientid", mqtt->clientid); - rpmlog(_lvl, "%19s: %s qos(%d) timeout(%u msecs)\n", + rpmlog(_lvl, "%19s: %s\n", "clientid", mqtt->clientid); + rpmlog(_lvl, "%19s: %s qos(%d) timeout(%u msecs)\n", "topic", mqtt->topic, mqtt->qos, mqtt->timeout); - rpmlog(_lvl, "%19s: type(%u) %s\n", + rpmlog(_lvl, "%19s: type(%u) %s\n", "persist", mqtt->persist_type, (mqtt->persist_type ? persist_path : "")); - rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags)); - if (mqtt->trace) - rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace); + rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags)); + if (mqtt->trace) + rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace); mqtt->persist_path = _free(mqtt->persist_path); mqtt->persist_ctx = _free(mqtt->persist_ctx); - switch (mqtt->persist_type) { - default: - case MQTTCLIENT_PERSISTENCE_NONE: - mqtt->persist_ctx = NULL; - break; - case MQTTCLIENT_PERSISTENCE_DEFAULT: - { - mqtt->persist_path = xstrdup(persist_path); - /* XXX rpmmqttFini double free */ - mqtt->persist_ctx = (void *)xstrdup(mqtt->persist_path); - } break; - case MQTTCLIENT_PERSISTENCE_USER: - { - mqtt->persist_path = xstrdup(persist_path); - MQTTClient_persistence * ctx = + switch (mqtt->persist_type) { + default: + case MQTTCLIENT_PERSISTENCE_NONE: + mqtt->persist_ctx = NULL; + break; + case MQTTCLIENT_PERSISTENCE_DEFAULT: + { + mqtt->persist_path = xstrdup(persist_path); + /* XXX rpmmqttFini double free */ + mqtt->persist_ctx = (void *)xstrdup(mqtt->persist_path); + } break; + case MQTTCLIENT_PERSISTENCE_USER: + { + mqtt->persist_path = xstrdup(persist_path); + MQTTClient_persistence * ctx = (MQTTClient_persistence *) xmalloc(sizeof(*ctx)); - *ctx = _mqtt_persistence; /* structure assignment */ - ctx->context = mqtt; - mqtt->persist_ctx = ctx; - } break; - } + *ctx = _mqtt_persistence; /* structure assignment */ + ctx->context = mqtt; + mqtt->persist_ctx = ctx; + } break; + } persist_path = _free(persist_path); #ifdef DYING @@ -1257,7 +1268,6 @@ dumpMQTT(__FUNCTION__, mqtt); #endif - if (mqtt->I == NULL) { xx = mqchk(mqtt, "createWithOptions", MQTTAsync_createWithOptions(&mqtt->I, mqtt->uri, mqtt->clientid, @@ -1716,38 +1726,36 @@ int _lvl = RPMLOG_DEBUG; int xx; -#ifdef DYING -mqtt->trace = 4; /* XXX */ -#endif + if (mqtt->I == NULL) { - rpmlog(_lvl, "==================== %s\n", mqtt->vec->name); + rpmlog(_lvl, "==================== %s\n", mqtt->vec->name); - if (!oneshot) { - int major = 0; - int minor = 0; - int revision = 0; - int version = mosquitto_lib_version(&major, &minor, &revision); - rpmlog(_lvl, "%19s: %d.%d.%d (%d)\n", "version", - major, minor, revision, version); - oneshot++; - } + if (!oneshot) { + int major = 0; + int minor = 0; + int revision = 0; + int version = 0; - rpmlog(_lvl, "%19s: %s\n", "clientid", mqtt->clientid); - rpmlog(_lvl, "%19s: %s qos(%d) timeout(%u msecs)\n", + xx = mqchk(mqtt, "lib_init", + mosquitto_lib_init()); + version = mosquitto_lib_version(&major, &minor, &revision); + rpmlog(_lvl, "%19s: %d.%d.%d (%d)\n", "version", + major, minor, revision, version); + oneshot++; + } + + rpmlog(_lvl, "%19s: %s\n", "clientid", mqtt->clientid); + rpmlog(_lvl, "%19s: %s qos(%d) timeout(%u msecs)\n", "topic", mqtt->topic, mqtt->qos, mqtt->timeout); - rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags)); - if (mqtt->trace) - rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace); + rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags)); + if (mqtt->trace) + rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace); #ifdef DYING mqtt->u = NULL; dumpMQTT(__FUNCTION__, mqtt); #endif - xx = mqchk(mqtt, "lib_init", - mosquitto_lib_init()); - - if (mqtt->I == NULL) { xx = mqchk(mqtt, "new", (mqtt->I = mosquitto_new(mqtt->clientid, (MF_ISSET(CLEAN) ? true : false), @@ -1759,14 +1767,13 @@ mosquitto_message_callback_set(mqtt->I, mosqOnMessage); mosquitto_subscribe_callback_set(mqtt->I, mosqOnSubscribe); mosquitto_unsubscribe_callback_set(mqtt->I, mosqOnUnsubscribe); - } #ifdef NOTYET - if (mqtt->trace && rpmIsDebug()) + if (mqtt->trace && rpmIsDebug()) #else - if (mqtt->trace || mqtt->debug) + if (mqtt->trace || mqtt->debug) #endif - mosquitto_log_callback_set(mqtt->I, mosqOnLog); + mosquitto_log_callback_set(mqtt->I, mosqOnLog); /* * Example 1: @@ -1777,15 +1784,17 @@ * delay=3, delay_max=30, exponential_backoff=True * Delays would be: 3, 6, 12, 24, 30, 30, ... */ - xx = mqchk(mqtt, "reconnect_delay_set", + xx = mqchk(mqtt, "reconnect_delay_set", mosquitto_reconnect_delay_set(mqtt->I, 2, 10, false)); - xx = mqchk(mqtt, "user_data_set", + xx = mqchk(mqtt, "user_data_set", (mosquitto_user_data_set(mqtt->I, mqtt), 0)); - xx = mqchk(mqtt, "loop_start", + xx = mqchk(mqtt, "loop_start", mosquitto_loop_start(mqtt->I)); + } + rc = RPMRC_OK; return rc; @@ -2188,7 +2197,6 @@ #define mqchkrpc(_o, _m, _rc) \ Xmqchkrpc(_o, _m, _rc, _rpmmqtt_debug, __FUNCTION__, __FILE__, __LINE__) -#ifdef REFERENCE static void dump_row(long count, int numinrow, int *chs) { int i; @@ -2277,7 +2285,6 @@ printf("%08lX:\n", count); } } -#endif /* REFERENCE */ static rpmRC amqpDestroy(rpmmqtt mqtt) @@ -2297,68 +2304,74 @@ static int oneshot; int _lvl = RPMLOG_DEBUG; rpmRC rc = RPMRC_FAIL; /* assume failure */ - int xx; - -#ifdef DYING -mqtt->trace = 4; /* XXX */ -#endif - - rpmlog(_lvl, "==================== %s\n", mqtt->vec->name); - if (!oneshot) { - int major = AMQP_VERSION_MAJOR; - int minor = AMQP_VERSION_MINOR; - int patch = AMQP_VERSION_PATCH; - int version = AMQP_VERSION; - rpmlog(_lvl, "%19s: %d.%d.%d (%d)\n", "version", - major, minor, patch, version); - oneshot++; - } + if (mqtt->I == NULL) { + rpmlog(_lvl, "==================== %s\n", mqtt->vec->name); - rpmlog(_lvl, "%19s: %s\n", "vhost", mqtt->vhost); - rpmlog(_lvl, "%19s: %s\n", "queue", mqtt->queue); - rpmlog(_lvl, "%19s: %s\n", "exchange", mqtt->exchange); - rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags)); - if (mqtt->trace) - rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace); + if (!oneshot) { + int major = AMQP_VERSION_MAJOR; + int minor = AMQP_VERSION_MINOR; + int patch = AMQP_VERSION_PATCH; + int version = AMQP_VERSION; + rpmlog(_lvl, "%19s: %d.%d.%d (0x%x)\n", "version", + major, minor, patch, version); + oneshot++; + } + + rpmlog(_lvl, "%19s: %s\n", "vhost", mqtt->vhost); + rpmlog(_lvl, "%19s: %s\n", "queue", mqtt->queue); + rpmlog(_lvl, "%19s: %s\n", "exchange", mqtt->exchange); + rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags)); + if (mqtt->trace) + rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace); #ifdef DYING mqtt->u = NULL; dumpMQTT(__FUNCTION__, mqtt); #endif - if (mqtt->I == NULL) { amqp_socket_t *socket = NULL; int port; - xx = mqchk(mqtt, "new_connection", + rc = mqchk(mqtt, "new_connection", ((mqtt->I = amqp_new_connection()) == NULL)); if (mqtt->cacertfile || mqtt->_capath) { port = mqtt->vec->sport; /* XXX elsewhere */ - xx = mqchk(mqtt, "ssl_socket_new", + rc = mqchk(mqtt, "ssl_socket_new", ((socket = amqp_ssl_socket_new(mqtt->I)) == NULL)); - if (socket == NULL) + if (rc || socket == NULL) goto exit; +#ifdef NOTYET + amqp_ssl_socket_set_verify_peer(socket, 1); + amqp_ssl_socket_set_verify_hostname(socket, 1); +#endif if (mqtt->cacertfile) - xx = mqchk(mqtt, "ssl_socket_set_cacert", + rc = mqchk(mqtt, "ssl_socket_set_cacert", amqp_ssl_socket_set_cacert(mqtt->I, mqtt->cacertfile)); if (mqtt->keyfile) - xx = mqchk(mqtt, "ssl_socket_set_key", + rc = mqchk(mqtt, "ssl_socket_set_key", amqp_ssl_socket_set_key(mqtt->I, mqtt->certfile, mqtt->keyfile)); } else { port = mqtt->vec->port; /* XXX elsewhere */ - xx = mqchk(mqtt, "tcp_socket_new", + rc = mqchk(mqtt, "tcp_socket_new", ((socket = amqp_tcp_socket_new(mqtt->I)) == NULL)); - if (socket == NULL) + if (rc || socket == NULL) goto exit; } -if (_rpmmqtt_debug) -fprintf(stderr, "*** socket_open(%p,\"%s:%d\")\n", socket, mqtt->host, port); - xx = mqchk(mqtt, "socket_open", +#ifdef NOTYET + { strict timeval tv = { mqtt->connect_timeout, 0 }; + rc = mqchk(mqtt, "socket_open_noblock", + amqp_socket_open(socket, mqtt->host, port, &tv)); + + } +#endif + rc = mqchk(mqtt, "socket_open", amqp_socket_open(socket, mqtt->host, port)); + if (rc) + goto exit; } @@ -2374,12 +2387,14 @@ rpmRC rc = RPMRC_FAIL; /* assume failure */ if (rpmmqttIsConnected(mqtt) == RPMRC_OK) { + amqp_channel_t _channel = 1; + int _code = AMQP_REPLY_SUCCESS; rc = mqchkrpc(mqtt, "channel_close", - amqp_channel_close(mqtt->I, 1, AMQP_REPLY_SUCCESS)); + amqp_channel_close(mqtt->I, _channel, _code)); if (rc) goto exit; rc = mqchkrpc(mqtt, "connection_close", - amqp_connection_close(mqtt->I, AMQP_REPLY_SUCCESS)); + amqp_connection_close(mqtt->I, _code)); if (rc) goto exit; mqtt->connected = 0; @@ -2395,26 +2410,35 @@ rpmRC rc = RPMRC_FAIL; /* assume failure */ if (rpmmqttIsConnected(mqtt) == RPMRC_NOTFOUND) { - amqp_channel_open_ok_t * open_ok = NULL; - const char * username = "guest"; /* XXX elsewhere */ - const char * password = "guest"; /* XXX elsewhere */ + int _channel_max = AMQP_DEFAULT_MAX_CHANNELS; + int _frame_max = AMQP_DEFAULT_FRAME_SIZE; + const amqp_table_t *_properties = NULL; + /* XXX AMQP_SASL_METHOD_EXTERNAL uses stdargs */ + amqp_sasl_method_enum _sasl_method = + (mqtt->_sasl_mechanism == NULL + || !strcmp(mqtt->_sasl_mechanism, "plain") + ? AMQP_SASL_METHOD_PLAIN + : AMQP_SASL_METHOD_PLAIN); /* XXX EXTERNAL */ -if (_rpmmqtt_debug) -fprintf(stderr, "*** login(%p, \"%s\", %d,%d,%d, \"%s:%s\")\n", mqtt->I, mqtt->vhost, 0, 131072, mqtt->keepalive, username, password); - rc = mqchkrpc(mqtt, "login", - amqp_login(mqtt->I, + rc = mqchkrpc(mqtt, "login_with_properties", + amqp_login_with_properties(mqtt->I, (mqtt->vhost ? mqtt->vhost : "/"), - 0, 131072, mqtt->keepalive, - AMQP_SASL_METHOD_PLAIN, - username, password)); + _channel_max, + _frame_max, + mqtt->keepalive, + _properties, + _sasl_method, + mqtt->user, + mqtt->password)); if (rc) goto exit; + amqp_channel_open_ok_t * open_ok = NULL; + amqp_channel_t _channel = 1; rc = mqchk(mqtt, "channel_open", - ((open_ok = amqp_channel_open(mqtt->I, 1)), + ((open_ok = amqp_channel_open(mqtt->I, _channel)), 0)); - - rc = mqchkrpc(mqtt, "get_rpc_reply", + rc = mqchkrpc(mqtt, "get_rpc_reply", amqp_get_rpc_reply(mqtt->I)); if (rc) @@ -2448,18 +2472,31 @@ ns = strlen(s); if (!rpmmqttConnect(mqtt)) { + amqp_channel_t _channel = 1; + amqp_boolean_t _mandatory = false; + amqp_boolean_t _immediate = false; + struct amqp_basic_properties_t_ const *_properties = NULL; + amqp_bytes_t message_bytes; message_bytes.len = ns; message_bytes.bytes = (char *) s; +#ifdef NOTYET + amqp_basic_properties_t props; + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; + props.content_type = amqp_cstring_bytes("text/plain"); + props.delivery_mode = 2; /* persistent delivery mode */ + _properties = &props; +#endif + rc = mqchk(mqtt, "basic_publish", amqp_basic_publish(mqtt->I, - 1, + _channel, amqp_cstring_bytes(mqtt->exchange), amqp_cstring_bytes(mqtt->queue), - 0, - 0, - NULL, + _mandatory, + _immediate, + _properties, message_bytes)); rpmlog(RPMLOG_DEBUG, "%s: send() exchange(%s) queue(%s) \"%.*s\"\n", @@ -2482,41 +2519,110 @@ if (!rpmmqttConnect(mqtt)) { int _lvl = RPMLOG_DEBUG; +fprintf(stderr, "XXX %s: FIXME\n", __FUNCTION__); rpmlog(_lvl, "%19s: %s qos(%d)\n", "subscribe", topic, qos); - amqp_bytes_t queuename; + amqp_bytes_t _queuename; + + amqp_channel_t _channel = 1; + amqp_bytes_t _queue = amqp_empty_bytes; + amqp_boolean_t _passive = (MF_ISSET(PASSIVE) ? true : false); + amqp_boolean_t _durable = (MF_ISSET(DURABLE) ? true : false); + amqp_boolean_t _exclusive = (MF_ISSET(EXCLUSIVE) ? true : false); +#ifdef NOTYET + amqp_boolean_t _auto_delete = (MF_ISSET(AUTODELETE) ? true : false); +#else + amqp_boolean_t _auto_delete = true; +#endif + amqp_table_t _arguments = amqp_empty_table; { amqp_queue_declare_ok_t *r = - amqp_queue_declare(mqtt->I, 1, amqp_empty_bytes, - 0, 0, 0, 1, amqp_empty_table); + amqp_queue_declare(mqtt->I, + _channel, + _queue, + _passive, + _durable, + _exclusive, + _auto_delete, + _arguments); rc = mqchkrpc(mqtt, "queue_declare", - amqp_get_rpc_reply(mqtt->I)); + amqp_get_rpc_reply(mqtt->I)); if (rc) goto exit; - queuename = amqp_bytes_malloc_dup(r->queue); -assert(queuename.bytes); + _queuename = amqp_bytes_malloc_dup(r->queue); +assert(_queuename.bytes); } - const char * _bindingkey = "test queue"; /* XXX W2DO? */ + const char * _routing_key = "test queue"; /* XXX W2DO? */ amqp_queue_bind(mqtt->I, - 1, - queuename, + _channel, + _queuename, amqp_cstring_bytes(mqtt->exchange), - amqp_cstring_bytes(_bindingkey), - amqp_empty_table); + amqp_cstring_bytes(_routing_key), + _arguments); rc = mqchkrpc(mqtt, "queue_bind", - amqp_get_rpc_reply(mqtt->I)); + amqp_get_rpc_reply(mqtt->I)); if (rc) goto exit; - amqp_basic_consume(mqtt->I, 1, queuename, amqp_empty_bytes, - 0, 1, 0, amqp_empty_table); + amqp_bytes_t _consumer_tag = amqp_empty_bytes; + amqp_boolean_t _no_local = (MF_ISSET(NOLOCAL) ? true : false); +#ifdef NOTYET + amqp_boolean_t _no_ack = (MF_ISSET(NOACK) ? true : false); +#else + amqp_boolean_t _no_ack = true; +#endif + amqp_basic_consume(mqtt->I, + _channel, + _queuename, + _consumer_tag, + _no_local, + _no_ack, + _exclusive, + _arguments); rc = mqchkrpc(mqtt, "basic_consume", - amqp_get_rpc_reply(mqtt->I)); + amqp_get_rpc_reply(mqtt->I)); if (rc) goto exit; - /* XXX todo++ */ +#ifdef NOTYET /* XXX make asynchronous */ + for (;;) { + amqp_rpc_reply_t res; + amqp_envelope_t envelope; + struct timeval *_tvp = NULL; + int _flags = 0; + + rc = mqchk(mqtt, "maybe_release_buffers", + (amqp_maybe_release_buffers(mqtt->I),0)); + + rc = mqchkrpc(mqtt, "consume_message", + (res = amqp_consume_message(mqtt->I, + &envelope, _tvp, _flags))); + if (rc) + break; + + fprintf(stderr, "Delivery %u, exchange %.*s routingkey %.*s\n", + (unsigned)envelope.delivery_tag, + (int)envelope.exchange.len, + (char *)envelope.exchange.bytes, + (int)envelope.routing_key.len, + (char *)envelope.routing_key.bytes); + + if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { + fprintf(stderr, "Content-type: %.*s\n", + (int) envelope.message.properties.content_type.len, + (char *)envelope.message.properties.content_type.bytes); + } + fprintf(stderr, "----\n"); + + amqp_dump(envelope.message.body.bytes, envelope.message.body.len); + + amqp_destroy_envelope(&envelope); + + } + if (rc) + goto exit; +#endif rc = RPMRC_OK; } @@ -2530,7 +2636,6 @@ { rpmRC rc = RPMRC_FAIL; /* assume failure */ -SPEW((stderr, "--> %s\n", __FUNCTION__)); if (!rpmmqttConnect(mqtt)) { int _lvl = RPMLOG_DEBUG; @@ -2582,7 +2687,7 @@ static struct mqttVec_s amqpVec = { .name = "amqp", - .uri = "amqp://localhost:5672/" /* XXX vhost? */ + .uri = "amqp://guest:guest@localhost:5672/" /* XXX vhost? */ "?vhost=/" /* XXX vhost? */ "&exchange=amq.direct" "&queue=test queue" @@ -2635,45 +2740,38 @@ static int oneshot; int _lvl = RPMLOG_DEBUG; rpmRC rc = RPMRC_FAIL; /* assume failure */ - int xx; -#ifdef DYING -mqtt->trace = 4; /* XXX */ -#endif - - rpmlog(_lvl, "==================== %s\n", mqtt->vec->name); + if (mqtt->I == NULL) { + rpmlog(_lvl, "==================== %s\n", mqtt->vec->name); - if (!oneshot) { + if (!oneshot) { #ifdef NOTYET - int major = AMQP_VERSION_MAJOR; - int minor = AMQP_VERSION_MINOR; - int patch = AMQP_VERSION_PATCH; - int version = AMQP_VERSION; - rpmlog(_lvl, "%19s: %d.%d.%d (%d)\n", "version", + int major = AMQP_VERSION_MAJOR; + int minor = AMQP_VERSION_MINOR; + int patch = AMQP_VERSION_PATCH; + int version = AMQP_VERSION; + rpmlog(_lvl, "%19s: %d.%d.%d (%d)\n", "version", major, minor, patch, version); #endif - oneshot++; - } + oneshot++; + } - rpmlog(_lvl, "%19s: %s\n", "vhost", mqtt->vhost); - rpmlog(_lvl, "%19s: %s\n", "queue", mqtt->queue); - rpmlog(_lvl, "%19s: %s\n", "exchange", mqtt->exchange); - rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags)); - if (mqtt->trace) - rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace); + rpmlog(_lvl, "%19s: %s\n", "vhost", mqtt->vhost); + rpmlog(_lvl, "%19s: %s\n", "queue", mqtt->queue); + rpmlog(_lvl, "%19s: %s\n", "exchange", mqtt->exchange); + rpmlog(_lvl, "%19s: %s\n", "flags", _MQTTFLAGS(mqtt->flags)); + if (mqtt->trace) + rpmlog(_lvl, "%19s: %d\n", "trace", mqtt->trace); #ifdef DYING mqtt->u = NULL; -#endif dumpMQTT(__FUNCTION__, mqtt); +#endif - if (mqtt->I == NULL) { - - xx = mqchk(mqtt, "messenger", + rc = mqchk(mqtt, "messenger", ((mqtt->I = pn_messenger(NULL)) == NULL)); - xx = mqchk(mqtt, "messenger_start", + rc = mqchk(mqtt, "messenger_start", pn_messenger_start(mqtt->I)); - } rc = RPMRC_OK; @@ -2686,21 +2784,11 @@ { rpmRC rc = RPMRC_FAIL; /* assume failure */ -#ifdef NOTYET if (rpmmqttIsConnected(mqtt) == RPMRC_OK) { - rc = mqchkrpc(mqtt, "channel_close", - qpid_channel_close(mqtt->I, 1, AMQP_REPLY_SUCCESS)); - if (rc) - goto exit; - rc = mqchkrpc(mqtt, "connection_close", - qpid_connection_close(mqtt->I, AMQP_REPLY_SUCCESS)); - if (rc) - goto exit; +fprintf(stderr, "XXX %s: FIXME\n", __FUNCTION__); mqtt->connected = 0; } -#else rc = RPMRC_OK; -#endif return rc; } @@ -2710,38 +2798,11 @@ { rpmRC rc = RPMRC_FAIL; /* assume failure */ -#ifdef DYING if (rpmmqttIsConnected(mqtt) == RPMRC_NOTFOUND) { - qpid_channel_open_ok_t * open_ok = NULL; - const char * username = "guest"; /* XXX elsewhere */ - const char * password = "guest"; /* XXX elsewhere */ - -if (_rpmmqtt_debug) -fprintf(stderr, "*** login(%p, \"%s\", %d,%d,%d, \"%s:%s\")\n", mqtt->I, mqtt->vhost, 0, 131072, mqtt->keepalive, username, password); - rc = mqchkrpc(mqtt, "login", - qpid_login(mqtt->I, - (mqtt->vhost ? mqtt->vhost : "/"), - 0, 131072, mqtt->keepalive, - AMQP_SASL_METHOD_PLAIN, - username, password)); - if (rc) - goto exit; - - rc = mqchk(mqtt, "channel_open", - ((open_ok = qpid_channel_open(mqtt->I, 1)), - 0)); - - rc = mqchkrpc(mqtt, "get_rpc_reply", - qpid_get_rpc_reply(mqtt->I)); - - if (rc) - goto exit; +fprintf(stderr, "XXX %s: FIXME\n", __FUNCTION__); mqtt->connected = 1; - } else - rc = RPMRC_OK; -#else + } rc = RPMRC_OK; -#endif return rc; } @@ -2749,12 +2810,8 @@ static rpmRC qpidIsConnected(rpmmqtt mqtt) { -#ifdef DYING rpmRC rc = (mqtt->connected ? RPMRC_OK : RPMRC_NOTFOUND); return rc; -#else - return RPMRC_OK; -#endif } static @@ -2762,7 +2819,6 @@ const char * s, size_t ns) { rpmRC rc = RPMRC_FAIL; /* assume failure */ - int xx; if (topic == NULL) topic = mqtt->topic; @@ -2771,54 +2827,45 @@ if (ns == 0) ns = strlen(s); -#ifdef DYING if (!rpmmqttConnect(mqtt)) { - qpid_bytes_t message_bytes; - message_bytes.len = ns; - message_bytes.bytes = (char *) s; - - rc = mqchk(mqtt, "basic_publish", - qpid_basic_publish(mqtt->I, - 1, - qpid_cstring_bytes(mqtt->exchange), - qpid_cstring_bytes(mqtt->queue), - 0, - 0, - NULL, - message_bytes)); - - rpmlog(RPMLOG_DEBUG, "%s: send() exchange(%s) queue(%s) \"%.*s\"\n", - mqtt->vec->name, mqtt->exchange, mqtt->queue, (int)ns, s); - - if (rc) - goto exit; - rc = RPMRC_OK; - } -#else - if (mqtt->M == NULL) - xx = mqchk(mqtt, "message", + if (mqtt->M == NULL) { + rc = mqchk(mqtt, "message", ((mqtt->M = pn_message()) == NULL)); + if (rc) + goto exit; + } + - const char * _address = "amqp://localhost/test"; - xx = mqchk(mqtt, "message_set_address", +fprintf(stderr, "*** %s: host %s topic %s\n", __FUNCTION__, mqtt->host, mqtt->topic); + const char * _address = "amqp://localhost/test"; /* XXX */ + rc = mqchk(mqtt, "message_set_address", pn_message_set_address(mqtt->M, _address)); + { pn_data_t * body = pn_message_body(mqtt->M); - xx = mqchk(mqtt, "data_put_string", + rc = mqchk(mqtt, "data_put_string", pn_data_put_string(body, pn_bytes(ns, s))); - xx = mqchk(mqtt, "messenger_put", + rc = mqchk(mqtt, "messenger_put", pn_messenger_put(mqtt->I, mqtt->M)); #ifdef NOTYET check(messenger); #endif - xx = mqchk(mqtt, "messenger_send", + rc = mqchk(mqtt, "messenger_send", pn_messenger_send(mqtt->I, -1)); + #ifdef NOTYET check(messenger); #endif } + + rpmlog(RPMLOG_DEBUG, "%s: send() exchange(%s) queue(%s) \"%.*s\"\n", + mqtt->vec->name, mqtt->exchange, mqtt->queue, (int)ns, s); + + if (rc) + goto exit; rc = RPMRC_OK; -#endif + } +exit: return rc; } @@ -2827,52 +2874,18 @@ { rpmRC rc = RPMRC_FAIL; /* assume failure */ -#ifdef DYING if (!rpmmqttConnect(mqtt)) { int _lvl = RPMLOG_DEBUG; +fprintf(stderr, "XXX %s: FIXME\n", __FUNCTION__); rpmlog(_lvl, "%19s: %s qos(%d)\n", "subscribe", topic, qos); - qpid_bytes_t queuename; - { qpid_queue_declare_ok_t *r = - qpid_queue_declare(mqtt->I, 1, qpid_empty_bytes, - 0, 0, 0, 1, qpid_empty_table); - rc = mqchkrpc(mqtt, "queue_declare", - qpid_get_rpc_reply(mqtt->I)); - if (rc) - goto exit; - queuename = qpid_bytes_malloc_dup(r->queue); -assert(queuename.bytes); - } - - const char * _bindingkey = "test queue"; /* XXX W2DO? */ - - qpid_queue_bind(mqtt->I, - 1, - queuename, - qpid_cstring_bytes(mqtt->exchange), - qpid_cstring_bytes(_bindingkey), - qpid_empty_table); - rc = mqchkrpc(mqtt, "queue_bind", - qpid_get_rpc_reply(mqtt->I)); if (rc) goto exit; - - qpid_basic_consume(mqtt->I, 1, queuename, qpid_empty_bytes, - 0, 1, 0, qpid_empty_table); - rc = mqchkrpc(mqtt, "basic_consume", - qpid_get_rpc_reply(mqtt->I)); - if (rc) - goto exit; - - /* XXX todo++ */ - rc = RPMRC_OK; } -#else - rc = RPMRC_OK; -#endif +exit: return rc; } @@ -2881,34 +2894,18 @@ { rpmRC rc = RPMRC_FAIL; /* assume failure */ -#ifdef DYING -SPEW((stderr, "--> %s\n", __FUNCTION__)); if (!rpmmqttConnect(mqtt)) { int _lvl = RPMLOG_DEBUG; +fprintf(stderr, "XXX %s: FIXME\n", __FUNCTION__); rpmlog(_lvl, "%19s: %s\n", "unsubscribe", topic); -#ifdef NOTYET - MQTTAsync_responseOptions *R = - (MQTTAsync_responseOptions *) AOBJ(mqtt, 'R'); - R->onSuccess = qpidOnUnsubscribeMany; - R->onFailure = qpidOnUnsubscribeManyFailure; - - mqtt->finished = 0; - rc = mqchk(mqtt, "unsubscribe", - MQTTAsync_unsubscribe(mqtt->I, topic, R)); - while (!mqtt->finished) - usleep(100); -#endif if (rc) goto exit; rc = RPMRC_OK; } exit: -#else - rc = RPMRC_OK; -#endif return rc; } @@ -3985,12 +3982,16 @@ rpmmqtt rpmmqttNew(char ** av, uint32_t flags) { static const char * _av[] = { (char *) "mqtt", NULL }; - rpmmqtt mqtt = (flags & 0x80000000) - ? rpmmqttI() : rpmmqttGetPool(_rpmmqttPool); + static char ** _avp; + rpmmqtt mqtt = NULL; -SPEW((stderr, "--> %s(%p,0x%x)\n", __FUNCTION__, av, flags)); + /* XXX quick-n-dirty means to instantiate _rpmmqttI with parent argv. */ + if ((flags & 0x80000000) && _rpmmqttI == NULL) + _avp = av; + mqtt = (flags & 0x80000000) + ? rpmmqttI() : rpmmqttGetPool(_rpmmqttPool); - /* XXX quick-n-dirty recursion avoidance. */ + if (av == NULL) av = _avp; if (av == NULL) av = (char **) _av; #ifdef DYING @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmqtt.h ============================================================================ $ cvs diff -u -r1.1.2.21 -r1.1.2.22 rpmmqtt.h --- rpm/rpmio/rpmmqtt.h 14 Jul 2016 12:59:21 -0000 1.1.2.21 +++ rpm/rpmio/rpmmqtt.h 14 Jul 2016 18:52:27 -0000 1.1.2.22 @@ -35,6 +35,10 @@ MQTT_FLAGS_IFEMPTY = _MFB(19), /*!< -e,--if-empty */ MQTT_FLAGS_IFUNUSED = _MFB(20), /*!< -u,--if-unused */ + MQTT_FLAGS_PASSIVE = _MFB(21), /*!< XXX W2DO? */ + MQTT_FLAGS_AUTODELETE = _MFB(22), /*!< XXX W2DO? */ + MQTT_FLAGS_NOLOCAL = _MFB(23), /*!< XXX W2DO? */ + } mqttFlags; #define MQTT_FLAGS_DEFAULT ((mqttFlags)(MQTT_FLAGS_CLEAN|MQTT_FLAGS_EOL)) #undef _MFB @@ . ______________________________________________________________________ RPM Package Manager http://rpm5.org CVS Sources Repository rpm-cvs@rpm5.org