This is an automated email from the ASF dual-hosted git repository. gsim pushed a commit to branch DISPATCH-1278 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit ecd5cb559aee818fecf436409ab3b7e478d58260 Author: Gordon Sim <g...@redhat.com> AuthorDate: Wed Mar 6 10:30:50 2019 +0000 DISPATCH-1278: initial support for prometheus metrics export --- include/qpid/dispatch/router_core.h | 24 ++++ include/qpid/dispatch/server.h | 5 + python/qpid_dispatch/management/qdrouter.json | 6 + src/connection_manager.c | 4 + src/dispatch.c | 4 + src/dispatch_private.h | 2 + src/http-libwebsockets.c | 170 +++++++++++++++++++++++++- src/router_core/router_core.c | 42 +++++++ src/router_core/router_core_private.h | 12 +- tests/system_tests_http.py | 36 ++++++ 10 files changed, 303 insertions(+), 2 deletions(-) diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index f7d6c51..2f7da25 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -837,4 +837,28 @@ qdr_connection_info_t *qdr_connection_info(bool is_encrypted, int ssl_ssf, bool ssl); + +typedef struct { + size_t connections; + size_t links; + size_t addrs; + size_t routers; + size_t link_routes; + size_t auto_links; + size_t presettled_deliveries; + size_t dropped_presettled_deliveries; + size_t accepted_deliveries; + size_t rejected_deliveries; + size_t released_deliveries; + size_t modified_deliveries; + size_t deliveries_ingress; + size_t deliveries_egress; + size_t deliveries_transit; + size_t deliveries_ingress_route_container; + size_t deliveries_egress_route_container; +} qdr_global_stats_t; +ALLOC_DECLARE(qdr_global_stats_t); +typedef void (*qdr_global_stats_handler_t) (void *context); +void qdr_request_global_stats(qdr_core_t *core, qdr_global_stats_t *stats, qdr_global_stats_handler_t callback, void *context); + #endif diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h index e56519f..94f14cc 100644 --- a/include/qpid/dispatch/server.h +++ b/include/qpid/dispatch/server.h @@ -133,6 +133,11 @@ typedef struct qd_server_config_t { char *protocol_family; /** + * Export metrics. + */ + bool metrics; + + /** * Accept HTTP connections, allow WebSocket "amqp" protocol upgrades. */ bool http; diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json index 861d620..e69ab17 100644 --- a/python/qpid_dispatch/management/qdrouter.json +++ b/python/qpid_dispatch/management/qdrouter.json @@ -828,6 +828,12 @@ "deprecationName": "failoverList", "description": "A comma-separated list of failover urls to be supplied to connected clients. Form: [(amqp|amqps|ws|wss)://]host_or_ip[:port]" }, + "metrics": { + "type": "boolean", + "default": true, + "description": "Export metrics in prometheus text format for the router (using path /metrics).", + "create": true + }, "http": { "type": "boolean", "default": false, diff --git a/src/connection_manager.c b/src/connection_manager.c index 98eadd4..8f8cbe7 100644 --- a/src/connection_manager.c +++ b/src/connection_manager.c @@ -312,6 +312,7 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf config->role = qd_entity_get_string(entity, "role"); CHECK(); config->inter_router_cost = qd_entity_opt_long(entity, "cost", 1); CHECK(); config->protocol_family = qd_entity_opt_string(entity, "protocolFamily", 0); CHECK(); + config->metrics = qd_entity_opt_bool(entity, "metrics", true); CHECK(); config->http = qd_entity_opt_bool(entity, "http", false); CHECK(); config->http_root_dir = qd_entity_opt_string(entity, "httpRootDir", false); CHECK(); @@ -323,6 +324,9 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf if (config->http && ! config->http_root_dir) { qd_log(qd->connection_manager->log_source, QD_LOG_INFO, "HTTP service is requested but no httpRootDir specified. The router will serve AMQP-over-websockets but no static content."); } + if (config->metrics && !config->http) { + qd_log(qd->connection_manager->log_source, QD_LOG_INFO, "Metrics can only be exported on listener with http enabled."); + } config->max_frame_size = qd_entity_get_long(entity, "maxFrameSize"); CHECK(); config->max_sessions = qd_entity_get_long(entity, "maxSessions"); CHECK(); diff --git a/src/dispatch.c b/src/dispatch.c index 4906b39..eb2d195 100644 --- a/src/dispatch.c +++ b/src/dispatch.c @@ -367,3 +367,7 @@ void qd_dispatch_free(qd_dispatch_t *qd) void qd_dispatch_router_lock(qd_dispatch_t *qd) { sys_mutex_lock(qd->router->lock); } void qd_dispatch_router_unlock(qd_dispatch_t *qd) { sys_mutex_unlock(qd->router->lock); } + +qdr_core_t* qd_dispatch_router_core(qd_dispatch_t *qd) { + return qd->router->router_core; +} diff --git a/src/dispatch_private.h b/src/dispatch_private.h index f5a089c..a01db9b 100644 --- a/src/dispatch_private.h +++ b/src/dispatch_private.h @@ -124,4 +124,6 @@ void qd_dispatch_unregister_entity(qd_dispatch_t *qd, void *impl); /** Set the agent */ void qd_dispatch_set_agent(qd_dispatch_t *qd, void *agent); +qdr_core_t* qd_dispatch_router_core(qd_dispatch_t *qd); + #endif diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c index b2f0d9c..0690c0c 100644 --- a/src/http-libwebsockets.c +++ b/src/http-libwebsockets.c @@ -19,6 +19,7 @@ #include <qpid/dispatch/atomic.h> #include <qpid/dispatch/amqp.h> +#include <qpid/dispatch/router_core.h> #include <qpid/dispatch/threading.h> #include <qpid/dispatch/timer.h> @@ -95,6 +96,14 @@ typedef struct connection_t { struct lws *wsi; } connection_t; +typedef struct stats_t { + size_t current; + bool headers_sent; + qdr_global_stats_t stats; + qd_http_server_t *server; + struct lws *wsi; +} stats_t; + /* Navigating from WSI pointer to qd objects */ static qd_http_server_t *wsi_server(struct lws *wsi); static qd_http_listener_t *wsi_listener(struct lws *wsi); @@ -106,6 +115,8 @@ static int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); +static int callback_metrics(struct lws *wsi, enum lws_callback_reasons reason, + void *user, void *in, size_t len); static struct lws_protocols protocols[] = { /* HTTP only protocol comes first */ @@ -128,6 +139,11 @@ static struct lws_protocols protocols[] = { callback_amqpws, sizeof(connection_t), }, + { + "http", + callback_metrics, + sizeof(stats_t), + }, { NULL, NULL, 0, 0 } /* terminator */ }; @@ -161,7 +177,7 @@ static int handle_events(connection_t* c) { /* The server has a bounded, thread-safe queue for external work */ typedef struct work_t { - enum { W_NONE, W_LISTEN, W_CLOSE, W_WAKE, W_STOP } type; + enum { W_NONE, W_LISTEN, W_CLOSE, W_WAKE, W_STOP, W_HANDLE_STATS } type; void *value; } work_t; @@ -177,6 +193,7 @@ typedef struct work_queue_t { /* HTTP Server runs in a single thread, communication from other threads via work_queue */ struct qd_http_server_t { qd_server_t *server; + qdr_core_t *core; sys_thread_t *thread; work_queue_t work; qd_log_source_t *log; @@ -230,6 +247,7 @@ struct qd_http_listener_t { qd_http_server_t *server; struct lws_vhost *vhost; struct lws_http_mount mount; + struct lws_http_mount metrics; }; void qd_http_listener_free(qd_http_listener_t *hl) { @@ -283,6 +301,14 @@ static void listener_start(qd_http_listener_t *hl, qd_http_server_t *hs) { m->def = "index.html"; /* Default file name */ m->origin_protocol = LWSMPRO_FILE; /* mount type is a directory in a filesystem */ m->extra_mimetypes = mime_types; + if (config->metrics) { + struct lws_http_mount *metrics = &hl->metrics; + m->mount_next = metrics; + metrics->mountpoint = "/metrics"; + metrics->mountpoint_len = strlen(metrics->mountpoint); + metrics->origin_protocol = LWSMPRO_CALLBACK; + metrics->protocol = "http"; + } struct lws_context_creation_info info = {0}; info.mounts = m; @@ -361,6 +387,143 @@ static void connection_wake(qd_connection_t *qd_conn) } } +static void handle_stats_results(void *context) +{ + stats_t* stats = (stats_t*) context; + qd_http_server_t *hs = stats->server; + if (hs) { + work_t w = { W_HANDLE_STATS, stats->wsi }; + work_push(hs, w); + } +} + +typedef int (*int_metric) (qdr_global_stats_t *stats); +typedef struct metric_definition { + const char* name; + const char* type; + int_metric value; +} metric_definition; + +static int stats_get_connections(qdr_global_stats_t *stats) { return stats->connections; } +static int stats_get_links(qdr_global_stats_t *stats) { return stats->links; } +static int stats_get_addrs(qdr_global_stats_t *stats) { return stats->addrs; } +static int stats_get_routers(qdr_global_stats_t *stats) { return stats->routers; } +static int stats_get_link_routes(qdr_global_stats_t *stats) { return stats->link_routes; } +static int stats_get_auto_links(qdr_global_stats_t *stats) { return stats->auto_links; } +static int stats_get_presettled_deliveries(qdr_global_stats_t *stats) { return stats->presettled_deliveries; } +static int stats_get_dropped_presettled_deliveries(qdr_global_stats_t *stats) { return stats->dropped_presettled_deliveries; } +static int stats_get_accepted_deliveries(qdr_global_stats_t *stats) { return stats->accepted_deliveries; } +static int stats_get_released_deliveries(qdr_global_stats_t *stats) { return stats->released_deliveries; } +static int stats_get_rejected_deliveries(qdr_global_stats_t *stats) { return stats->rejected_deliveries; } +static int stats_get_modified_deliveries(qdr_global_stats_t *stats) { return stats->modified_deliveries; } +static int stats_get_deliveries_ingress(qdr_global_stats_t *stats) { return stats->deliveries_ingress; } +static int stats_get_deliveries_egress(qdr_global_stats_t *stats) { return stats->deliveries_egress; } +static int stats_get_deliveries_transit(qdr_global_stats_t *stats) { return stats->deliveries_transit; } +static int stats_get_deliveries_ingress_route_container(qdr_global_stats_t *stats) { return stats->deliveries_ingress_route_container; } +static int stats_get_deliveries_egress_route_container(qdr_global_stats_t *stats) { return stats->deliveries_egress_route_container; } + +static struct metric_definition metrics[] = { + {"connections", "gauge", stats_get_connections}, + {"links", "gauge", stats_get_links}, + {"addresses", "gauge", stats_get_addrs}, + {"routers", "gauge", stats_get_routers}, + {"link_routes", "gauge", stats_get_link_routes}, + {"auto_links", "gauge", stats_get_auto_links}, + {"presettled_deliveries", "counter", stats_get_presettled_deliveries}, + {"dropped_presettled_deliveries", "counter", stats_get_dropped_presettled_deliveries}, + {"accepted_deliveries", "counter", stats_get_accepted_deliveries}, + {"released_deliveries", "counter", stats_get_released_deliveries}, + {"rejected_deliveries", "counter", stats_get_rejected_deliveries}, + {"modified_deliveries", "counter", stats_get_modified_deliveries}, + {"deliveries_ingress", "counter", stats_get_deliveries_ingress}, + {"deliveries_egress", "counter", stats_get_deliveries_egress}, + {"deliveries_transit", "counter", stats_get_deliveries_transit}, + {"deliveries_ingress_route_container", "counter", stats_get_deliveries_ingress_route_container}, + {"deliveries_egress_route_container", "counter", stats_get_deliveries_egress_route_container} +}; +static size_t metrics_length = sizeof(metrics)/sizeof(metrics[0]); + +static bool write_stats(uint8_t **position, const uint8_t * const end, const char* name, const char* type, int value) +{ + //11 chars + type + 2*name + 20 chars for int + size_t length = 11 + strlen(type) + strlen(name)*2 + 20; + if (end - *position >= length) { + *position += lws_snprintf((char*) *position, end - *position, "# TYPE %s %s\n", name, type); + *position += lws_snprintf((char*) *position, end - *position, "%s %i\n", name, value); + return true; + } else { + return false; + } +} + +static bool write_metric(uint8_t **position, const uint8_t * const end, metric_definition* definition, qdr_global_stats_t* stats) +{ + return write_stats(position, end, definition->name, definition->type, definition->value(stats)); +} + +static int add_header_by_name(struct lws *wsi, const char* name, const char* value, uint8_t** position, uint8_t* end) +{ + return lws_add_http_header_by_name(wsi, (unsigned char*) name, (unsigned char*) value, strlen(value), position, end); +} + +static int callback_metrics(struct lws *wsi, enum lws_callback_reasons reason, + void *user, void *in, size_t len) +{ + qd_http_server_t *hs = wsi_server(wsi); + stats_t *stats = (stats_t*) user; + uint8_t buffer[LWS_PRE + 2048]; + uint8_t *start = &buffer[LWS_PRE], *position = start, *end = &buffer[sizeof(buffer) - LWS_PRE - 1]; + + switch (reason) { + + case LWS_CALLBACK_HTTP: { + stats->wsi = wsi; + stats->server = hs; + //request stats from core thread + qdr_request_global_stats(hs->core, &stats->stats, handle_stats_results, (void*) stats); + return 0; + } + + case LWS_CALLBACK_HTTP_WRITEABLE: { + //encode stats into buffer + if (!stats->headers_sent) { + if (lws_add_http_header_status(wsi, HTTP_STATUS_OK, &position, end) + || add_header_by_name(wsi, "content-type:", "text/plain", &position, end) + || add_header_by_name(wsi, "connection:", "close", &position, end)) + return 1; + if (lws_finalize_http_header(wsi, &position, end)) + return 1; + stats->headers_sent = true; + } + + while (stats->current < metrics_length) { + if (write_metric(&position, end, &metrics[stats->current], &stats->stats)) { + stats->current++; + qd_log(hs->log, QD_LOG_DEBUG, "wrote metric %i of %i", stats->current, metrics_length); + } else { + qd_log(hs->log, QD_LOG_DEBUG, "insufficient space in buffer"); + break; + } + } + int n = stats->current < metrics_length ? LWS_WRITE_HTTP : LWS_WRITE_HTTP_FINAL; + + //write buffer + size_t available = position - start; + if (lws_write(wsi, (unsigned char*) start, available, n) != available) + return 1; + if (n == LWS_WRITE_HTTP_FINAL) { + if (lws_http_transaction_completed(wsi)) return -1; + } else { + lws_callback_on_writable(wsi); + } + return 0; + } + + default: + return 0; + } +} + /* Callbacks for promoted AMQP over WS connections. */ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) @@ -494,6 +657,9 @@ static void* http_thread_run(void* v) { case W_CLOSE: listener_close((qd_http_listener_t*)w.value, hs); break; + case W_HANDLE_STATS: + lws_callback_on_writable((struct lws*) w.value); + break; case W_WAKE: { connection_t *c = w.value; pn_collector_put(c->driver.collector, PN_OBJECT, c->driver.connection, @@ -546,6 +712,7 @@ qd_http_server_t *qd_http_server(qd_server_t *s, qd_log_source_t *log) { hs->context = lws_create_context(&info); hs->server = s; hs->log = log; /* For messages from this file */ + hs->core = 0; // not yet available if (!hs->context) { qd_log(hs->log, QD_LOG_CRITICAL, "No memory starting HTTP server"); qd_http_server_free(hs); @@ -559,6 +726,7 @@ qd_http_server_t *qd_http_server(qd_server_t *s, qd_log_source_t *log) { qd_http_listener_t *qd_http_server_listen(qd_http_server_t *hs, qd_listener_t *li) { + hs->core = qd_dispatch_router_core(qd_server_dispatch(hs->server)); sys_mutex_lock(hs->work.lock); if (!hs->thread) { hs->thread = sys_thread(http_thread_run, hs); diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index c2dbdaa..05fe970 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -748,3 +748,45 @@ void qdr_connection_work_free_CT(qdr_connection_work_t *work) qdr_terminus_free(work->target); free_qdr_connection_work_t(work); } + +static void qdr_post_global_stats_response(qdr_core_t *core, qdr_general_work_t *work) +{ + work->stats_handler(work->context); +} + +static void qdr_global_stats_request_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +{ + qdr_global_stats_t *stats = action->args.stats_request.stats; + stats->addrs = DEQ_SIZE(core->addrs); + stats->links = DEQ_SIZE(core->open_links); + stats->routers = DEQ_SIZE(core->routers); + stats->connections = DEQ_SIZE(core->open_connections); + stats->link_routes = DEQ_SIZE(core->link_routes); + stats->auto_links = DEQ_SIZE(core->auto_links); + stats->presettled_deliveries = core->presettled_deliveries; + stats->dropped_presettled_deliveries = core->dropped_presettled_deliveries; + stats->accepted_deliveries = core->accepted_deliveries; + stats->rejected_deliveries = core->rejected_deliveries; + stats->released_deliveries = core->released_deliveries; + stats->modified_deliveries = core->modified_deliveries; + stats->deliveries_ingress = core->deliveries_ingress; + stats->deliveries_egress = core->deliveries_egress; + stats->deliveries_transit = core->deliveries_transit; + stats->deliveries_ingress_route_container = core->deliveries_ingress_route_container; + stats->deliveries_egress_route_container = core->deliveries_egress_route_container; + + qdr_general_work_t *work = qdr_general_work(qdr_post_global_stats_response); + work->stats_handler = action->args.stats_request.handler; + work->context = action->args.stats_request.context; + qdr_post_general_work_CT(core, work); +} + +void qdr_request_global_stats(qdr_core_t *core, qdr_global_stats_t *stats, qdr_global_stats_handler_t callback, void *context) +{ + qdr_action_t *action = qdr_action(qdr_global_stats_request_CT, "global_stats_request"); + action->args.stats_request.stats = stats; + action->args.stats_request.handler = callback; + action->args.stats_request.context = context; + qdr_action_enqueue(core, action); +} + diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 0865c78..efba16f 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -166,6 +166,15 @@ struct qdr_action_t { } agent; // + // Arguments for stats request actions + // + struct { + qdr_global_stats_t *stats; + qdr_global_stats_handler_t handler; + void *context; + } stats_request; + + // // Arguments for general use // struct { @@ -203,6 +212,8 @@ struct qdr_general_work_t { qd_message_t *msg; uint64_t in_conn_id; int treatment; + qdr_global_stats_handler_t stats_handler; + void *context; }; ALLOC_DECLARE(qdr_general_work_t); @@ -301,7 +312,6 @@ struct qdr_query_t { DEQ_DECLARE(qdr_query_t, qdr_query_list_t); - struct qdr_node_t { DEQ_LINKS(qdr_node_t); qdr_address_t *owning_addr; diff --git a/tests/system_tests_http.py b/tests/system_tests_http.py index 70c87c4..7e7ba8e 100644 --- a/tests/system_tests_http.py +++ b/tests/system_tests_http.py @@ -149,6 +149,42 @@ class RouterTestHttp(TestCase): # https not configured self.assertRaises(URLError, urlopen, "https://localhost:%d/nosuch" % r.ports[0]) + def test_http_metrics(self): + + if not sys.version_info >= (2, 7): + return + + config = Qdrouterd.Config([ + ('router', {'id': 'QDR.METRICS'}), + ('listener', {'port': self.get_port(), 'http': 'yes'}), + ('listener', {'port': self.get_port(), 'httpRootDir': os.path.dirname(__file__)}), + ]) + r = self.qdrouterd('metrics-test-router', config) + + def test(port): + result = urlopen("http://localhost:%d/metrics" % port, cafile=self.ssl_file('ca-certificate.pem')) + self.assertEqual(200, result.getcode()) + data = result.read().decode('utf-8') + assert('connections' in data) + assert('deliveries_ingress' in data) + + # Sequential calls on multiple ports + for port in r.ports: test(port) + + # Concurrent calls on multiple ports + class TestThread(threading.Thread): + def __init__(self, port): + threading.Thread.__init__(self) + self.port, self.ex = port, None + self.start() + def run(self): + try: test(self.port) + except Exception as e: self.ex = e + threads = [TestThread(p) for p in r.ports + r.ports] + for t in threads: t.join() + for t in threads: + if t.ex: raise t.ex + def test_https_get(self): if not sys.version_info >= (2, 9): return --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org