Tracing TCP connections that make hops through haproxy is currently very
challenging. To get a list of proxied connection pairs inside haproxy,
one must use `show sess` to dump the session table, grab the fd pairs
for each session, resolve those to inodes via the process's file
descriptor table, and then resolve those inodes to addresses via the TCP
connection table. This is quite cumbersome and slow (especially when the
TCP connection table is huge), and does not scale when there are
hundreds or thousands of connections happening per second.

This patch adds a new `show events` command to the stats socket, which
streams an event whenever a session is established or destroyed:

  + <id> <inbound peer> <inbound sock> <outbound sock> <outbound peer>
  - <id>

This allows any interested party to subscribe to these events and
maintain its own copy of haproxy's session table. This data can then be
combined with data collected from other sources (like pcap) to follow a
connection through all the hops it makes.
---
 doc/configuration.txt     |   11 ++++
 include/proto/dumpstats.h |    4 ++
 include/types/session.h   |    3 +
 src/dumpstats.c           |  119 +++++++++++++++++++++++++++++++++++++++++++-
 src/session.c             |    4 ++
 5 files changed, 138 insertions(+), 3 deletions(-)

diff --git a/doc/configuration.txt b/doc/configuration.txt
index 36f68a5..b586251 100644
--- a/doc/configuration.txt
+++ b/doc/configuration.txt
@@ -8526,6 +8526,17 @@ show errors [<iid>]
     is the slash ('/') in header name "header/bizarre", which is not a valid
     HTTP character for a header name.
 
+show events
+  Dump a stream of events as sessions are established and destroyed.
+  Streaming continues until a new command is sent or the connection closes.
+
+  Example:
+    >>> $ echo "show events" | socat stdio,ignoreeof /tmp/sock1
+        + 1 127.0.0.1:50869 127.0.0.1:9418 127.0.0.1:50870 127.0.0.1:6000
+        - 1
+        + 2 127.0.0.1:50874 127.0.0.1:9418 127.0.0.1:50875 127.0.0.1:6000
+        - 2
+
 show info
   Dump info about haproxy status on current process.
 
diff --git a/include/proto/dumpstats.h b/include/proto/dumpstats.h
index 4728121..658fab7 100644
--- a/include/proto/dumpstats.h
+++ b/include/proto/dumpstats.h
@@ -53,6 +53,8 @@
 #define STAT_CLI_O_SESS 6   /* dump sessions */
 #define STAT_CLI_O_ERR  7   /* dump errors */
 
+#define STAT_CLI_EVENTS 8   /* streaming "show events" output */
+
 /* status codes (strictly 4 chars) used in the URL to display a message */
 #define STAT_STATUS_UNKN "UNKN"        /* an unknown error occured, shouldn't 
happen */
 #define STAT_STATUS_DONE "DONE"        /* the action is successful */
@@ -63,6 +65,8 @@
 
 int stats_sock_parse_request(struct stream_interface *si, char *line);
 void stats_io_handler(struct stream_interface *si);
+void stats_event_new_session(struct session *s); /* emit "+" event for an established session */
+void stats_event_end_session(struct session *s); /* emit "-" event for a terminating session */
 int stats_dump_raw_to_buffer(struct session *s, struct buffer *rep);
 int stats_dump_http(struct session *s, struct buffer *rep, struct uri_auth 
*uri);
 int stats_dump_proxy(struct session *s, struct proxy *px, struct uri_auth 
*uri);
diff --git a/include/types/session.h b/include/types/session.h
index 20d2a2c..a296dcd 100644
--- a/include/types/session.h
+++ b/include/types/session.h
@@ -231,6 +231,9 @@ struct session {
                        int bol;                /* pointer to beginning of 
current line */
                } errors;
                struct {
+                       struct list list;       /* link in the "show events" listener list */
+               } events;                       /* state of a "show events" listener */
+               struct {
                        const char *msg;        /* pointer to a persistent 
message to be returned in PRINT state */
                } cli;
        } data_ctx;                             /* used by stats I/O handlers 
to dump the stats */
diff --git a/src/dumpstats.c b/src/dumpstats.c
index f680134..959b114 100644
--- a/src/dumpstats.c
+++ b/src/dumpstats.c
@@ -52,6 +52,38 @@
 #include <proto/stream_interface.h>
 #include <proto/task.h>

+/* Stats sessions currently streaming "show events", chained through their
+ * data_ctx.events.list member.
+ */
+static struct list stats_event_listeners = LIST_HEAD_INIT(stats_event_listeners);
+
+/* Registers stats session <s> as a "show events" listener. */
+static inline void stats_event_listener_add(struct session *s)
+{
+       LIST_ADDQ(&stats_event_listeners, &s->data_ctx.events.list);
+}
+
+/* Unregisters session <s> if it currently is a "show events" listener and
+ * re-initializes its stats output; does nothing otherwise. The list member
+ * of a session that is not registered (never was, or already removed) is
+ * stale or aliases other dump state, so it must never be unlinked blindly:
+ * membership is checked by a lookup, linear in the number of listeners.
+ * Safe to call on any session, any number of times.
+ */
+static inline void stats_event_listener_remove(struct session *s)
+{
+       struct session *curr;
+
+       list_for_each_entry(curr, &stats_event_listeners, data_ctx.events.list) {
+               if (curr == s) {
+                       LIST_DEL(&s->data_ctx.events.list);
+                       /* Re-initialize stats output */
+                       memset(&s->data_ctx.stats, 0, sizeof(s->data_ctx.stats));
+                       return;
+               }
+       }
+}
+
 const char stats_sock_usage_msg[] =
         "Unknown command. Please enter one of the following commands only :\n"
         "  clear counters : clear max statistics counters (add 'all' for all counters)\n"
@@ -62,6 +94,7 @@ const char stats_sock_usage_msg[] =
         "  show stat      : report counters for each proxy and server\n"
         "  show errors    : report last request and response errors for each proxy\n"
         "  show sess [id] : report the list of current sessions or dump this session\n"
+       "  show events    : stream events about proxied sessions\n"
         "  get weight     : report a server's current weight\n"
         "  set weight     : change a server's weight\n"
         "  set timeout    : change a timeout setting\n"
@@ -358,7 +391,11 @@ int stats_sock_parse_request(struct stream_interface *si, char *line)
                         s->data_state = DATA_ST_INIT;
                         si->st0 = STAT_CLI_O_ERR; // stats_dump_errors_to_buffer
                 }
-               else { /* neither "stat" nor "info" nor "sess" nor "errors"*/
+               else if (strcmp(args[1], "events") == 0) {
+                       si->st0 = STAT_CLI_EVENTS;
+                       stats_event_listener_add(s);
+               }
+               else { /* neither "stat" nor "info" nor "sess" nor "errors" nor "events" */
                         return 0;
                 }
         }
@@ -681,6 +718,118 @@ int stats_sock_parse_request(struct stream_interface *si, char *line)
         return 1;
 }

+/* Formats into <out> (of size <len>) as "addr:port" the peer address of
+ * socket <fd> if <peer> is non-zero, or its local address otherwise. <out>
+ * is left empty if the address cannot be retrieved or printed, and is set
+ * to "unknown" for families other than AF_INET and AF_INET6.
+ */
+static void stats_event_fmt_addr(int fd, int peer, char *out, size_t len)
+{
+       struct sockaddr_storage sock;
+       socklen_t addr_size = sizeof(sock);
+       char host[INET6_ADDRSTRLEN];
+       const void *addr;
+       int port, ret;
+
+       out[0] = '\0';
+
+       if (peer)
+               ret = getpeername(fd, (struct sockaddr *)&sock, &addr_size);
+       else
+               ret = getsockname(fd, (struct sockaddr *)&sock, &addr_size);
+       if (ret != 0)
+               return;
+
+       switch (sock.ss_family) {
+       case AF_INET:
+               addr = &((struct sockaddr_in *)&sock)->sin_addr;
+               port = ntohs(((struct sockaddr_in *)&sock)->sin_port);
+               break;
+       case AF_INET6:
+               addr = &((struct sockaddr_in6 *)&sock)->sin6_addr;
+               port = ntohs(((struct sockaddr_in6 *)&sock)->sin6_port);
+               break;
+       default:
+               snprintf(out, len, "%s", "unknown");
+               return;
+       }
+
+       /* print the host into its own buffer: passing <out> to snprintf()
+        * both as destination and as a "%s" argument is undefined behaviour.
+        */
+       if (!inet_ntop(sock.ss_family, addr, host, sizeof(host)))
+               return;
+       snprintf(out, len, "%s:%d", host, port);
+}
+
+/* Appends <msg> to the response buffer of every "show events" listener and
+ * wakes the listener's task up so that the message gets flushed. A message
+ * that buffer_feed() does not accept (presumably because the buffer is
+ * full - TODO confirm its return codes) is not retried, so a slow listener
+ * may miss events.
+ */
+static void stats_event_broadcast(const char *msg)
+{
+       struct session *curr;
+       struct stream_interface *si;
+
+       list_for_each_entry(curr, &stats_event_listeners, data_ctx.events.list) {
+               si = &curr->si[1];
+               if (buffer_feed(si->ib, msg) == -1 && si->owner) {
+                       si->ib->flags |= BF_SEND_DONTWAIT;
+                       task_wakeup(si->owner, TASK_WOKEN_MSG);
+               }
+       }
+}
+
+/* Reports the establishment of session <s> to all "show events" listeners
+ * as "+ <id> <inbound peer> <inbound sock> <outbound sock> <outbound peer>".
+ */
+void stats_event_new_session(struct session *s)
+{
+       /* room for "<IPv6 address>:<port>" plus the trailing NUL; sizeof()
+        * (unlike strlen()) keeps the dimension a compile-time constant.
+        */
+       char addrs[4][INET6_ADDRSTRLEN + sizeof(":65535")];
+       int i;
+
+       /* skip the address lookups entirely when nobody listens */
+       if (LIST_ISEMPTY(&stats_event_listeners))
+               return;
+
+       /* even i: peer address, odd i: local address;
+        * i = 0,1: inbound side (si[0]), i = 2,3: outbound side (si[1])
+        */
+       for (i = 0; i < 4; i++)
+               stats_event_fmt_addr(s->si[i / 2].fd, i % 2 == 0,
+                                    addrs[i], sizeof(addrs[i]));
+
+       snprintf(trash, sizeof(trash), "+ %u %s %s %s %s\n",
+               s->uniq_id,
+               addrs[0],   /* inbound peer */
+               addrs[1],   /* inbound sock */
+               addrs[3],   /* outbound sock */
+               addrs[2]);  /* outbound peer */
+       stats_event_broadcast(trash);
+}
+
+/* Reports the termination of session <s> to all "show events" listeners as
+ * "- <id>". It is called on the session termination path, so an id may show
+ * up here without a prior "+" event (presumably for sessions which never
+ * reached sess_establish()). If <s> is itself a listener, it is unregistered
+ * first so that it never stays linked once it is released.
+ */
+void stats_event_end_session(struct session *s)
+{
+       stats_event_listener_remove(s);
+
+       if (LIST_ISEMPTY(&stats_event_listeners))
+               return;
+
+       snprintf(trash, sizeof(trash), "- %u\n", s->uniq_id);
+       stats_event_broadcast(trash);
+}
+
 /* This I/O handler runs as an applet embedded in a stream interface. It is
  * used to processes I/O from/to the stats unix socket. The system relies on a
  * state machine handling requests and various responses. We read a request,
@@ -714,7 +863,7 @@ void stats_io_handler(struct stream_interface *si)
                         si->shutw(si);
                         break;
                 }
-               else if (si->st0 == STAT_CLI_GETREQ) {
+               else if (si->st0 == STAT_CLI_GETREQ || si->st0 == STAT_CLI_EVENTS) {
                         /* ensure we have some output room left in the event we
                          * would want to return some info right after parsing.
                          */
@@ -754,7 +903,10 @@ void stats_io_handler(struct stream_interface *si)

                         trash[len] = '\0';

+                       if (si->st0 == STAT_CLI_EVENTS)
+                               stats_event_listener_remove(s);
+
                         si->st0 = STAT_CLI_PROMPT;
                         if (len) {
                                 if (strcmp(trash, "quit") == 0) {
                                         si->st0 = STAT_CLI_END;
@@ -840,7 +992,7 @@ void stats_io_handler(struct stream_interface *si)
         if ((res->flags & BF_SHUTR) && (si->state == SI_ST_EST) && (si->st0 != STAT_CLI_GETREQ)) {
                 DPRINTF(stderr, "%s@%d: si to buf closed. req=%08x, res=%08x, st=%d\n",
                         __FUNCTION__, __LINE__, req->flags, res->flags, si->state);
-               /* Other size has closed, let's abort if we have no more processing to do
+               /* Other side has closed, let's abort if we have no more processing to do
                  * and nothing more to consume. This is comparable to a broken pipe, so
                  * we forward the close to the request side so that it flows upstream to
                  * the client.
@@ -866,12 +1018,18 @@ void stats_io_handler(struct stream_interface *si)
         si->ib->rex = TICK_ETERNITY;
         si->ob->wex = TICK_ETERNITY;

+       /* no timeouts when streaming events */
+       if (si->st0 == STAT_CLI_EVENTS)
+               s->req->rex = TICK_ETERNITY;
+
  out:
         DPRINTF(stderr, "%s@%d: st=%d, rqf=%x, rpf=%x, rql=%d, rqs=%d, rl=%d, rs=%d\n",
                 __FUNCTION__, __LINE__,
                 si->state, req->flags, res->flags, req->l, req->send_max, res->l, res->send_max);

         if (unlikely(si->state == SI_ST_DIS || si->state == SI_ST_CLO)) {
+               /* no-op unless this session is a "show events" listener */
+               stats_event_listener_remove(s);
                 /* check that we have released everything then unregister */
                 stream_int_unregister_handler(si);
         }
diff --git a/src/session.c b/src/session.c
index 6906656..ae100db 100644
--- a/src/session.c
+++ b/src/session.c
@@ -350,6 +350,8 @@ void sess_establish(struct session *s, struct 
stream_interface *si)
        rep->analysers |= s->fe->fe_rsp_ana | s->be->be_rsp_ana;
        rep->flags |= BF_READ_ATTACHED; /* producer is now attached */
        req->wex = TICK_ETERNITY;
+
+       stats_event_new_session(s);
 }
 
 /* Update stream interface status for input states SI_ST_ASS, SI_ST_QUE, 
SI_ST_TAR.
@@ -1655,6 +1657,8 @@ resync_stream_interface:
                s->do_log(s);
        }
 
+       stats_event_end_session(s);
+
        /* the task MUST not be in the run queue anymore */
        session_free(s);
        task_delete(t);
-- 
1.7.5.3


Reply via email to