This is an automated email from the ASF dual-hosted git repository.

gsim pushed a commit to branch DISPATCH-1278
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit ecd5cb559aee818fecf436409ab3b7e478d58260
Author: Gordon Sim <g...@redhat.com>
AuthorDate: Wed Mar 6 10:30:50 2019 +0000

    DISPATCH-1278: initial support for prometheus metrics export
---
 include/qpid/dispatch/router_core.h           |  24 ++++
 include/qpid/dispatch/server.h                |   5 +
 python/qpid_dispatch/management/qdrouter.json |   6 +
 src/connection_manager.c                      |   4 +
 src/dispatch.c                                |   4 +
 src/dispatch_private.h                        |   2 +
 src/http-libwebsockets.c                      | 170 +++++++++++++++++++++++++-
 src/router_core/router_core.c                 |  42 +++++++
 src/router_core/router_core_private.h         |  12 +-
 tests/system_tests_http.py                    |  36 ++++++
 10 files changed, 303 insertions(+), 2 deletions(-)

diff --git a/include/qpid/dispatch/router_core.h 
b/include/qpid/dispatch/router_core.h
index f7d6c51..2f7da25 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -837,4 +837,28 @@ qdr_connection_info_t *qdr_connection_info(bool            
 is_encrypted,
                                            int              ssl_ssf,
                                            bool             ssl);
 
+
+typedef struct {  /* Snapshot of router-wide statistics, populated by the handler behind qdr_request_global_stats() */
+    size_t connections;  /* size of the core's open_connections list */
+    size_t links;  /* size of the core's open_links list */
+    size_t addrs;  /* size of the core's addrs list */
+    size_t routers;  /* size of the core's routers list */
+    size_t link_routes;  /* size of the core's link_routes list */
+    size_t auto_links;  /* size of the core's auto_links list */
+    size_t presettled_deliveries;  /* this and every field below is a copy of the core counter of the same name */
+    size_t dropped_presettled_deliveries;
+    size_t accepted_deliveries;
+    size_t rejected_deliveries;
+    size_t released_deliveries;
+    size_t modified_deliveries;
+    size_t deliveries_ingress;
+    size_t deliveries_egress;
+    size_t deliveries_transit;
+    size_t deliveries_ingress_route_container;
+    size_t deliveries_egress_route_container;
+}  qdr_global_stats_t;
+ALLOC_DECLARE(qdr_global_stats_t);
+typedef void (*qdr_global_stats_handler_t) (void *context);
+void qdr_request_global_stats(qdr_core_t *core, qdr_global_stats_t *stats, 
qdr_global_stats_handler_t callback, void *context);
+
 #endif
diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h
index e56519f..94f14cc 100644
--- a/include/qpid/dispatch/server.h
+++ b/include/qpid/dispatch/server.h
@@ -133,6 +133,11 @@ typedef struct qd_server_config_t {
     char *protocol_family;
 
     /**
+     * Export metrics.
+     */
+    bool metrics;
+
+    /**
      * Accept HTTP connections, allow WebSocket "amqp" protocol upgrades.
      */
     bool http;
diff --git a/python/qpid_dispatch/management/qdrouter.json 
b/python/qpid_dispatch/management/qdrouter.json
index 861d620..e69ab17 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -828,6 +828,12 @@
                     "deprecationName": "failoverList",
                     "description": "A comma-separated list of failover urls to 
be supplied to connected clients.  Form: 
[(amqp|amqps|ws|wss)://]host_or_ip[:port]"
                 },
+                "metrics": {
+                    "type": "boolean",
+                    "default": true,
+                    "description": "Export metrics in prometheus text format 
for the router (using path /metrics).",
+                    "create": true
+                },
                 "http": {
                     "type": "boolean",
                     "default": false,
diff --git a/src/connection_manager.c b/src/connection_manager.c
index 98eadd4..8f8cbe7 100644
--- a/src/connection_manager.c
+++ b/src/connection_manager.c
@@ -312,6 +312,7 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, 
qd_server_config_t *conf
     config->role                 = qd_entity_get_string(entity, "role");       
       CHECK();
     config->inter_router_cost    = qd_entity_opt_long(entity, "cost", 1);      
       CHECK();
     config->protocol_family      = qd_entity_opt_string(entity, 
"protocolFamily", 0); CHECK();
+    config->metrics              = qd_entity_opt_bool(entity, "metrics", 
true);       CHECK();
     config->http                 = qd_entity_opt_bool(entity, "http", false);  
       CHECK();
     config->http_root_dir        = qd_entity_opt_string(entity, "httpRootDir", 
false);   CHECK();
 
@@ -323,6 +324,9 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, 
qd_server_config_t *conf
     if (config->http && ! config->http_root_dir) {
         qd_log(qd->connection_manager->log_source, QD_LOG_INFO, "HTTP service 
is requested but no httpRootDir specified. The router will serve 
AMQP-over-websockets but no static content.");
     }
+    if (config->metrics && !config->http) {
+        qd_log(qd->connection_manager->log_source, QD_LOG_INFO, "Metrics can 
only be exported on listener with http enabled.");
+    }
 
     config->max_frame_size       = qd_entity_get_long(entity, "maxFrameSize"); 
       CHECK();
     config->max_sessions         = qd_entity_get_long(entity, "maxSessions");  
       CHECK();
diff --git a/src/dispatch.c b/src/dispatch.c
index 4906b39..eb2d195 100644
--- a/src/dispatch.c
+++ b/src/dispatch.c
@@ -367,3 +367,7 @@ void qd_dispatch_free(qd_dispatch_t *qd)
 
 void qd_dispatch_router_lock(qd_dispatch_t *qd) { 
sys_mutex_lock(qd->router->lock); }
 void qd_dispatch_router_unlock(qd_dispatch_t *qd) { 
sys_mutex_unlock(qd->router->lock); }
+
+/** Return the router core held by the router of the given dispatch instance. */
+qdr_core_t *qd_dispatch_router_core(qd_dispatch_t *qd)
+{
+    qdr_core_t *core = qd->router->router_core;
+    return core;
+}
diff --git a/src/dispatch_private.h b/src/dispatch_private.h
index f5a089c..a01db9b 100644
--- a/src/dispatch_private.h
+++ b/src/dispatch_private.h
@@ -124,4 +124,6 @@ void qd_dispatch_unregister_entity(qd_dispatch_t *qd, void 
*impl);
 /** Set the agent */
 void qd_dispatch_set_agent(qd_dispatch_t *qd, void *agent);
 
+qdr_core_t* qd_dispatch_router_core(qd_dispatch_t *qd);
+
 #endif
diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c
index b2f0d9c..0690c0c 100644
--- a/src/http-libwebsockets.c
+++ b/src/http-libwebsockets.c
@@ -19,6 +19,7 @@
 
 #include <qpid/dispatch/atomic.h>
 #include <qpid/dispatch/amqp.h>
+#include <qpid/dispatch/router_core.h>
 #include <qpid/dispatch/threading.h>
 #include <qpid/dispatch/timer.h>
 
@@ -95,6 +96,14 @@ typedef struct connection_t {
     struct lws *wsi;
 } connection_t;
 
+typedef struct stats_t {  /* Per-request state for the metrics callback (registered as that protocol's user data) */
+    size_t current;  /* index of the next metrics[] entry still to be written to the response */
+    bool headers_sent;  /* set once the HTTP response headers have been added to the output */
+    qdr_global_stats_t stats;  /* destination handed to qdr_request_global_stats() */
+    qd_http_server_t *server;  /* server whose work queue receives the W_HANDLE_STATS item */
+    struct lws *wsi;  /* connection to be marked writable once the stats are available */
+} stats_t;
+
 /* Navigating from WSI pointer to qd objects */
 static qd_http_server_t *wsi_server(struct lws *wsi);
 static qd_http_listener_t *wsi_listener(struct lws *wsi);
@@ -106,6 +115,8 @@ static int callback_http(struct lws *wsi, enum 
lws_callback_reasons reason,
                          void *user, void *in, size_t len);
 static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
                            void *user, void *in, size_t len);
+static int callback_metrics(struct lws *wsi, enum lws_callback_reasons reason,
+                               void *user, void *in, size_t len);
 
 static struct lws_protocols protocols[] = {
     /* HTTP only protocol comes first */
@@ -128,6 +139,11 @@ static struct lws_protocols protocols[] = {
         callback_amqpws,
         sizeof(connection_t),
     },
+    {
+        "http",
+        callback_metrics,
+        sizeof(stats_t),
+    },
     { NULL, NULL, 0, 0 } /* terminator */
 };
 
@@ -161,7 +177,7 @@ static int handle_events(connection_t* c) {
 
 /* The server has a bounded, thread-safe queue for external work */
 typedef struct work_t {
-    enum { W_NONE, W_LISTEN, W_CLOSE, W_WAKE, W_STOP } type;
+    enum { W_NONE, W_LISTEN, W_CLOSE, W_WAKE, W_STOP, W_HANDLE_STATS } type;
     void *value;
 } work_t;
 
@@ -177,6 +193,7 @@ typedef struct work_queue_t {
 /* HTTP Server runs in a single thread, communication from other threads via 
work_queue */
 struct qd_http_server_t {
     qd_server_t *server;
+    qdr_core_t *core;
     sys_thread_t *thread;
     work_queue_t work;
     qd_log_source_t *log;
@@ -230,6 +247,7 @@ struct qd_http_listener_t {
     qd_http_server_t *server;
     struct lws_vhost *vhost;
     struct lws_http_mount mount;
+    struct lws_http_mount metrics;
 };
 
 void qd_http_listener_free(qd_http_listener_t *hl) {
@@ -283,6 +301,14 @@ static void listener_start(qd_http_listener_t *hl, 
qd_http_server_t *hs) {
     m->def = "index.html";  /* Default file name */
     m->origin_protocol = LWSMPRO_FILE; /* mount type is a directory in a 
filesystem */
     m->extra_mimetypes = mime_types;
+    if (config->metrics) {
+        struct lws_http_mount *metrics = &hl->metrics;
+        m->mount_next = metrics;
+        metrics->mountpoint = "/metrics";
+        metrics->mountpoint_len = strlen(metrics->mountpoint);
+        metrics->origin_protocol = LWSMPRO_CALLBACK;
+        metrics->protocol = "http";
+    }
 
     struct lws_context_creation_info info = {0};
     info.mounts = m;
@@ -361,6 +387,143 @@ static void connection_wake(qd_connection_t *qd_conn)
     }
 }
 
+/*
+ * Completion callback handed to qdr_request_global_stats().
+ *
+ * Queues a W_HANDLE_STATS work item carrying the request's wsi; the HTTP
+ * thread handles that item by calling lws_callback_on_writable() on it.
+ * Nothing is queued when the request has no server recorded.
+ */
+static void handle_stats_results(void *context)
+{
+    stats_t *request = (stats_t*) context;
+    qd_http_server_t *server = request->server;
+    if (!server)
+        return;
+
+    work_t item = { W_HANDLE_STATS, request->wsi };
+    work_push(server, item);
+}
+
+/*
+ * Table-driven description of the exported metrics.
+ *
+ * Each entry names a metric, gives its Prometheus type ("gauge" or
+ * "counter") and supplies an accessor that reads the value from a
+ * qdr_global_stats_t snapshot.  The accessors return size_t, the type of
+ * every field in the snapshot, so that large counters are not truncated
+ * (an int return would wrap to a negative value above INT_MAX).
+ */
+typedef size_t (*int_metric) (qdr_global_stats_t *stats);
+typedef struct metric_definition {
+    const char* name;
+    const char* type;
+    int_metric value;
+} metric_definition;
+
+static size_t stats_get_connections(qdr_global_stats_t *stats) { return stats->connections; }
+static size_t stats_get_links(qdr_global_stats_t *stats) { return stats->links; }
+static size_t stats_get_addrs(qdr_global_stats_t *stats) { return stats->addrs; }
+static size_t stats_get_routers(qdr_global_stats_t *stats) { return stats->routers; }
+static size_t stats_get_link_routes(qdr_global_stats_t *stats) { return stats->link_routes; }
+static size_t stats_get_auto_links(qdr_global_stats_t *stats) { return stats->auto_links; }
+static size_t stats_get_presettled_deliveries(qdr_global_stats_t *stats) { return stats->presettled_deliveries; }
+static size_t stats_get_dropped_presettled_deliveries(qdr_global_stats_t *stats) { return stats->dropped_presettled_deliveries; }
+static size_t stats_get_accepted_deliveries(qdr_global_stats_t *stats) { return stats->accepted_deliveries; }
+static size_t stats_get_released_deliveries(qdr_global_stats_t *stats) { return stats->released_deliveries; }
+static size_t stats_get_rejected_deliveries(qdr_global_stats_t *stats) { return stats->rejected_deliveries; }
+static size_t stats_get_modified_deliveries(qdr_global_stats_t *stats) { return stats->modified_deliveries; }
+static size_t stats_get_deliveries_ingress(qdr_global_stats_t *stats) { return stats->deliveries_ingress; }
+static size_t stats_get_deliveries_egress(qdr_global_stats_t *stats) { return stats->deliveries_egress; }
+static size_t stats_get_deliveries_transit(qdr_global_stats_t *stats) { return stats->deliveries_transit; }
+static size_t stats_get_deliveries_ingress_route_container(qdr_global_stats_t *stats) { return stats->deliveries_ingress_route_container; }
+static size_t stats_get_deliveries_egress_route_container(qdr_global_stats_t *stats) { return stats->deliveries_egress_route_container; }
+
+static struct metric_definition metrics[] = {
+    {"connections", "gauge", stats_get_connections},
+    {"links", "gauge", stats_get_links},
+    {"addresses", "gauge", stats_get_addrs},
+    {"routers", "gauge", stats_get_routers},
+    {"link_routes", "gauge", stats_get_link_routes},
+    {"auto_links", "gauge", stats_get_auto_links},
+    {"presettled_deliveries", "counter", stats_get_presettled_deliveries},
+    {"dropped_presettled_deliveries", "counter", stats_get_dropped_presettled_deliveries},
+    {"accepted_deliveries", "counter", stats_get_accepted_deliveries},
+    {"released_deliveries", "counter", stats_get_released_deliveries},
+    {"rejected_deliveries", "counter", stats_get_rejected_deliveries},
+    {"modified_deliveries", "counter", stats_get_modified_deliveries},
+    {"deliveries_ingress", "counter", stats_get_deliveries_ingress},
+    {"deliveries_egress", "counter", stats_get_deliveries_egress},
+    {"deliveries_transit", "counter", stats_get_deliveries_transit},
+    {"deliveries_ingress_route_container", "counter", stats_get_deliveries_ingress_route_container},
+    {"deliveries_egress_route_container", "counter", stats_get_deliveries_egress_route_container}
+};
+static size_t metrics_length = sizeof(metrics)/sizeof(metrics[0]);
+
+/*
+ * Append one metric ("# TYPE <name> <type>\n<name> <value>\n") at *position.
+ *
+ * Nothing is written unless the worst-case encoding fits before 'end'.
+ * Returns true and advances *position on success, false when there is not
+ * enough room (the caller is expected to retry with an empty buffer).
+ */
+static bool write_stats(uint8_t **position, const uint8_t * const end, const char* name, const char* type, size_t value)
+{
+    //11 fixed chars + type + 2*name + up to 20 digits for the value + 1 for
+    //the NUL terminator that the formatter writes after the text
+    size_t length = 11 + strlen(type) + strlen(name)*2 + 20 + 1;
+    if (*position <= end && (size_t) (end - *position) >= length) {
+        *position += lws_snprintf((char*) *position, end - *position, "# TYPE %s %s\n", name, type);
+        *position += lws_snprintf((char*) *position, end - *position, "%s %zu\n", name, value);
+        return true;
+    } else {
+        return false;
+    }
+}
+
+/* Encode the metric described by 'definition', reading its value from 'stats'. */
+static bool write_metric(uint8_t **position, const uint8_t * const end, metric_definition* definition, qdr_global_stats_t* stats)
+{
+    return write_stats(position, end, definition->name, definition->type, definition->value(stats));
+}
+
+/*
+ * Convenience wrapper around lws_add_http_header_by_name() for
+ * NUL-terminated C strings: the value length is taken with strlen().
+ * Returns whatever lws_add_http_header_by_name() returns.
+ */
+static int add_header_by_name(struct lws *wsi, const char* name, const char* value, uint8_t** position, uint8_t* end)
+{
+    unsigned char *header_name  = (unsigned char*) name;
+    unsigned char *header_value = (unsigned char*) value;
+
+    return lws_add_http_header_by_name(wsi, header_name, header_value, strlen(value), position, end);
+}
+
+/*
+ * libwebsockets callback for the metrics mount.
+ *
+ * LWS_CALLBACK_HTTP:           records the wsi and server in the per-request
+ *                              stats_t and asks the router core for a
+ *                              statistics snapshot; handle_stats_results()
+ *                              is invoked when it is ready.
+ * LWS_CALLBACK_HTTP_WRITEABLE: writes the response headers (first time only)
+ *                              followed by as many metrics as fit in the
+ *                              local buffer, asking to be called again until
+ *                              every entry of metrics[] has been sent.
+ *
+ * Returns 0 to continue, non-zero to make libwebsockets close the connection.
+ */
+static int callback_metrics(struct lws *wsi, enum lws_callback_reasons reason,
+                               void *user, void *in, size_t len)
+{
+    qd_http_server_t *hs = wsi_server(wsi);
+    stats_t *stats = (stats_t*) user;
+    uint8_t buffer[LWS_PRE + 2048];
+    uint8_t *start = &buffer[LWS_PRE], *position = start, *end = &buffer[sizeof(buffer) - LWS_PRE - 1];
+
+    switch (reason) {
+
+    case LWS_CALLBACK_HTTP: {
+        stats->wsi = wsi;
+        stats->server = hs;
+        // NOTE(review): 'stats' is this connection's user data; nothing here
+        // cancels the request if the connection goes away before the
+        // callback fires - confirm the lifetime is safe.
+        //request stats from core thread
+        qdr_request_global_stats(hs->core, &stats->stats, handle_stats_results, (void*) stats);
+        return 0;
+    }
+
+    case LWS_CALLBACK_HTTP_WRITEABLE: {
+        // assumes current/headers_sent start out zeroed in the user data
+        // supplied by libwebsockets - TODO confirm
+        //encode stats into buffer
+        if (!stats->headers_sent) {
+            if (lws_add_http_header_status(wsi, HTTP_STATUS_OK, &position, end)
+                || add_header_by_name(wsi, "content-type:", "text/plain", &position, end)
+                || add_header_by_name(wsi, "connection:", "close", &position, end))
+                return 1;
+            if (lws_finalize_http_header(wsi, &position, end))
+                return 1;
+            stats->headers_sent = true;
+        }
+
+        while (stats->current < metrics_length) {
+            if (write_metric(&position, end, &metrics[stats->current], &stats->stats)) {
+                stats->current++;
+                // both values are size_t, so %zu (not %i) is the matching conversion
+                qd_log(hs->log, QD_LOG_DEBUG, "wrote metric %zu of %zu", stats->current, metrics_length);
+            } else {
+                qd_log(hs->log, QD_LOG_DEBUG, "insufficient space in buffer");
+                break;
+            }
+        }
+        int n = stats->current < metrics_length ? LWS_WRITE_HTTP : LWS_WRITE_HTTP_FINAL;
+
+        //write buffer
+        size_t available = position - start;
+        int written = lws_write(wsi, (unsigned char*) start, available, n);
+        // check for an error return before comparing against the unsigned length
+        if (written < 0 || (size_t) written != available)
+            return 1;
+        if (n == LWS_WRITE_HTTP_FINAL) {
+            if (lws_http_transaction_completed(wsi)) return -1;
+        } else {
+            lws_callback_on_writable(wsi);
+        }
+        return 0;
+    }
+
+    default:
+        return 0;
+    }
+}
+
 /* Callbacks for promoted AMQP over WS connections. */
 static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
                            void *user, void *in, size_t len)
@@ -494,6 +657,9 @@ static void* http_thread_run(void* v) {
             case W_CLOSE:
                 listener_close((qd_http_listener_t*)w.value, hs);
                 break;
+            case W_HANDLE_STATS:
+                lws_callback_on_writable((struct lws*) w.value);
+                break;
             case W_WAKE: {
                 connection_t *c = w.value;
                 pn_collector_put(c->driver.collector, PN_OBJECT, 
c->driver.connection,
@@ -546,6 +712,7 @@ qd_http_server_t *qd_http_server(qd_server_t *s, 
qd_log_source_t *log) {
         hs->context = lws_create_context(&info);
         hs->server = s;
         hs->log = log;              /* For messages from this file */
+        hs->core = 0; // not yet available
         if (!hs->context) {
             qd_log(hs->log, QD_LOG_CRITICAL, "No memory starting HTTP server");
             qd_http_server_free(hs);
@@ -559,6 +726,7 @@ qd_http_server_t *qd_http_server(qd_server_t *s, 
qd_log_source_t *log) {
 
 qd_http_listener_t *qd_http_server_listen(qd_http_server_t *hs, qd_listener_t 
*li)
 {
+    hs->core = qd_dispatch_router_core(qd_server_dispatch(hs->server));
     sys_mutex_lock(hs->work.lock);
     if (!hs->thread) {
         hs->thread = sys_thread(http_thread_run, hs);
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index c2dbdaa..05fe970 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -748,3 +748,45 @@ void qdr_connection_work_free_CT(qdr_connection_work_t 
*work)
     qdr_terminus_free(work->target);
     free_qdr_connection_work_t(work);
 }
+
+/*
+ * General-work handler for a completed stats request: invokes the
+ * requester's callback with the context it supplied.
+ */
+static void qdr_post_global_stats_response(qdr_core_t *core, qdr_general_work_t *work)
+{
+    qdr_global_stats_handler_t notify = work->stats_handler;
+    notify(work->context);
+}
+
+/*
+ * Action handler behind qdr_request_global_stats().
+ *
+ * Copies the core's list sizes and delivery counters into the caller's
+ * qdr_global_stats_t, then posts a general-work item that will invoke the
+ * caller's handler with its context.
+ */
+static void qdr_global_stats_request_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+    //
+    // NOTE(review): 'discard' is taken to mean the action is being dropped
+    // rather than executed, in which case the caller's memory must not be
+    // touched and no response should be posted - confirm against the other
+    // action handlers.
+    //
+    if (discard)
+        return;
+
+    qdr_global_stats_t *stats = action->args.stats_request.stats;
+    if (stats) {
+        stats->addrs = DEQ_SIZE(core->addrs);
+        stats->links = DEQ_SIZE(core->open_links);
+        stats->routers = DEQ_SIZE(core->routers);
+        stats->connections = DEQ_SIZE(core->open_connections);
+        stats->link_routes = DEQ_SIZE(core->link_routes);
+        stats->auto_links = DEQ_SIZE(core->auto_links);
+        stats->presettled_deliveries = core->presettled_deliveries;
+        stats->dropped_presettled_deliveries = core->dropped_presettled_deliveries;
+        stats->accepted_deliveries = core->accepted_deliveries;
+        stats->rejected_deliveries = core->rejected_deliveries;
+        stats->released_deliveries = core->released_deliveries;
+        stats->modified_deliveries = core->modified_deliveries;
+        stats->deliveries_ingress = core->deliveries_ingress;
+        stats->deliveries_egress = core->deliveries_egress;
+        stats->deliveries_transit = core->deliveries_transit;
+        stats->deliveries_ingress_route_container = core->deliveries_ingress_route_container;
+        stats->deliveries_egress_route_container = core->deliveries_egress_route_container;
+    }
+
+    // Hand the completion notification over as general work
+    qdr_general_work_t *work = qdr_general_work(qdr_post_global_stats_response);
+    work->stats_handler = action->args.stats_request.handler;
+    work->context = action->args.stats_request.context;
+    qdr_post_general_work_CT(core, work);
+}
+
+/**
+ * Request a snapshot of the router-wide statistics.
+ *
+ * Enqueues a "global_stats_request" action for the core.  The action's
+ * handler writes the snapshot through 'stats' and then arranges for
+ * 'callback' to be invoked with 'context'; the 'stats' pointer is stored,
+ * not copied, so it has to remain valid until then.
+ */
+void qdr_request_global_stats(qdr_core_t *core, qdr_global_stats_t *stats, qdr_global_stats_handler_t callback, void *context)
+{
+    qdr_action_t *request = qdr_action(qdr_global_stats_request_CT, "global_stats_request");
+
+    request->args.stats_request.context = context;
+    request->args.stats_request.handler = callback;
+    request->args.stats_request.stats   = stats;
+
+    qdr_action_enqueue(core, request);
+}
+
diff --git a/src/router_core/router_core_private.h 
b/src/router_core/router_core_private.h
index 0865c78..efba16f 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -166,6 +166,15 @@ struct qdr_action_t {
         } agent;
 
         //
+        // Arguments for stats request actions
+        //
+        struct {
+            qdr_global_stats_t             *stats;
+            qdr_global_stats_handler_t     handler;
+            void                           *context;
+        } stats_request;
+
+        //
         // Arguments for general use
         //
         struct {
@@ -203,6 +212,8 @@ struct qdr_general_work_t {
     qd_message_t               *msg;
     uint64_t                    in_conn_id;
     int                         treatment;
+    qdr_global_stats_handler_t  stats_handler;
+    void                       *context;
 };
 
 ALLOC_DECLARE(qdr_general_work_t);
@@ -301,7 +312,6 @@ struct qdr_query_t {
 
 DEQ_DECLARE(qdr_query_t, qdr_query_list_t); 
 
-
 struct qdr_node_t {
     DEQ_LINKS(qdr_node_t);
     qdr_address_t    *owning_addr;
diff --git a/tests/system_tests_http.py b/tests/system_tests_http.py
index 70c87c4..7e7ba8e 100644
--- a/tests/system_tests_http.py
+++ b/tests/system_tests_http.py
@@ -149,6 +149,42 @@ class RouterTestHttp(TestCase):
         # https not configured
         self.assertRaises(URLError, urlopen, "https://localhost:%d/nosuch" % r.ports[0])
 
+    def test_http_metrics(self):
+
+        if not sys.version_info >= (2, 7):
+            return
+
+        config = Qdrouterd.Config([
+            ('router', {'id': 'QDR.METRICS'}),
+            ('listener', {'port': self.get_port(), 'http': 'yes'}),
+            ('listener', {'port': self.get_port(), 'httpRootDir': 
os.path.dirname(__file__)}),
+        ])
+        r = self.qdrouterd('metrics-test-router', config)
+
+        def test(port):
+            result = urlopen("http://localhost:%d/metrics"; % port, 
cafile=self.ssl_file('ca-certificate.pem'))
+            self.assertEqual(200, result.getcode())
+            data = result.read().decode('utf-8')
+            assert('connections' in data)
+            assert('deliveries_ingress' in data)
+
+        # Sequential calls on multiple ports
+        for port in r.ports: test(port)
+
+        # Concurrent calls on multiple ports
+        class TestThread(threading.Thread):
+            def __init__(self, port):
+                threading.Thread.__init__(self)
+                self.port, self.ex = port, None
+                self.start()
+            def run(self):
+                try: test(self.port)
+                except Exception as e: self.ex = e
+        threads = [TestThread(p) for p in r.ports + r.ports]
+        for t in threads: t.join()
+        for t in threads:
+            if t.ex: raise t.ex
+
     def test_https_get(self):
         if not sys.version_info >= (2, 9):
             return


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to