---
 doc/configuration.txt            |   22 ++++
 include/proto/dumpstats.h        |    5 +
 include/types/stream_interface.h |    5 +
 src/dumpstats.c                  |  242 +++++++++++++++++++++++++++++++++++++-
 src/session.c                    |    9 ++
 5 files changed, 280 insertions(+), 3 deletions(-)

diff --git a/doc/configuration.txt b/doc/configuration.txt
index 2ede208..5b1ca85 100644
--- a/doc/configuration.txt
+++ b/doc/configuration.txt
@@ -10031,6 +10031,28 @@ shutdown sessions <backend>/<server>
   maintenance mode, for instance. Such terminated sessions are reported with a
   'K' flag in the logs.
 
+debug sess [proxy:<proxy_name>[:<server_name>]]
+
+  Dump a stream of events about sessions as they are added and removed.
+  The possible event formats are "Forward" and "Close":
+
+    "F <session_id> <in_peer> - <in_sock> | <out_sock> - <out_peer>\n"
+    "C <session_id>\n"
+
+  Streaming will continue until a new command is received or the
+  connection is closed. If <proxy_name> or <server_name> is specified, limit to
+  events concerning only the proxy and server specified.
+
+  This command is restricted and can only be issued on sockets configured
+  for levels "operator" or "admin".
+
+  Example:
+    >>> $ echo "set timeout cli 3600; debug sess" | socat stdio,ignoreeof 
/tmp/sock1
+        F 1 127.0.0.1:50869 - 127.0.0.1:9418 | 127.0.0.1:50870 - 127.0.0.1:6000
+        C 1
+        F 2 127.0.0.1:50874 - 127.0.0.1:9418 | 127.0.0.1:50875 - 127.0.0.1:6000
+        C 2
+
 /*
  * Local variables:
  *  fill-column: 79
diff --git a/include/proto/dumpstats.h b/include/proto/dumpstats.h
index eb44a36..d328881 100644
--- a/include/proto/dumpstats.h
+++ b/include/proto/dumpstats.h
@@ -55,6 +55,8 @@
 #define STAT_CLI_O_TAB  8   /* dump tables */
 #define STAT_CLI_O_CLR  9   /* clear tables */
 
+#define STAT_CLI_EVENTS 8   /* event stream */
+
 /* status codes (strictly 4 chars) used in the URL to display a message */
 #define STAT_STATUS_UNKN "UNKN"        /* an unknown error occured, shouldn't 
happen */
 #define STAT_STATUS_DONE "DONE"        /* the action is successful */
@@ -63,8 +65,11 @@
 #define STAT_STATUS_DENY "DENY"        /* action denied */
 
 extern struct si_applet http_stats_applet;
+extern int stats_event_enabled;
 
 void stats_io_handler(struct stream_interface *si);
+void stats_event_new_session(struct session *s);
+void stats_event_end_session(struct session *s);
 
 
 #endif /* _PROTO_DUMPSTATS_H */
diff --git a/include/types/stream_interface.h b/include/types/stream_interface.h
index 16af806..d730559 100644
--- a/include/types/stream_interface.h
+++ b/include/types/stream_interface.h
@@ -169,6 +169,11 @@ struct stream_interface {
                                int bol;                /* pointer to beginning 
of current line */
                        } errors;
                        struct {
+                               struct list list;       /* list of stats 
streams in the STAT_CLI_EVENTS state */
+                               struct proxy *px;       /* if not NULL, only 
send events associated with this proxy */
+                               struct server *srv;     /* if not NULL, only 
send events associated with this server */
+                       } events;
+                       struct {
                                void *target;           /* table we want to 
dump, or NULL for all */
                                struct proxy *proxy;    /* table being 
currently dumped (first if NULL) */
                                struct stksess *entry;  /* last entry we were 
trying to dump (or first if NULL) */
diff --git a/src/dumpstats.c b/src/dumpstats.c
index 74ad966..b3f4423 100644
--- a/src/dumpstats.c
+++ b/src/dumpstats.c
@@ -86,6 +86,7 @@ static const char stats_sock_usage_msg[] =
        "  disable        : put a server or frontend in maintenance mode\n"
        "  enable         : re-enable a server or frontend which is in 
maintenance mode\n"
        "  shutdown       : kill a session or a frontend (eg:to release 
listening ports)\n"
+       "  debug sess     : stream events about proxied sessions\n"
        "";
 
 static const char stats_permission_denied_msg[] =
@@ -114,6 +115,77 @@ enum {
        STAT_PX_ST_FIN,
 };
 
+/* Keep track of sessions that want streaming events (STAT_CLI_EVENT).
+ */
+int stats_event_enabled = 0;
+static struct list stats_event_listeners = 
LIST_HEAD_INIT(stats_event_listeners);
+
+/* Add a session to the list of event listeners.
+ */
+static inline void stats_event_listener_add(struct stream_interface *si)
+{
+       LIST_ADDQ(&stats_event_listeners, &si->applet.ctx.events.list);
+       stats_event_enabled = 1;
+}
+
+/* Remove a session from the list of listeners, but only if it is a
+ * registered listener. This enables us to invoke the method on all
+ * disconnecting stats sockets to ensure they are cleaned up, regardless
+ * of how many times they switch between streaming and other commands.
+ */
+static inline void stats_event_listener_remove(struct stream_interface *si)
+{
+       int found = 0;
+       struct stream_interface *curr;
+       list_for_each_entry(curr, &stats_event_listeners, 
applet.ctx.events.list) {
+               if (curr == si) {
+                       found = 1;
+                       break;
+               }
+       }
+
+       if (found) {
+               si->applet.ctx.events.px = NULL;
+               si->applet.ctx.events.srv = NULL;
+               LIST_DEL(&si->applet.ctx.events.list);
+       }
+
+       if (LIST_ISEMPTY(&stats_event_listeners))
+               stats_event_enabled = 0;
+
+       /* Re-initialize stats output */
+       memset(&si->applet.ctx.stats, 0, sizeof(si->applet.ctx.stats));
+}
+
+/* Send a message to all registered event listeners.
+ */
+static inline void stats_event_listener_message_all(char *msg, struct session 
*s)
+{
+       struct stream_interface *curr;
+
+       list_for_each_entry(curr, &stats_event_listeners, 
applet.ctx.events.list) {
+               struct proxy *px;
+               struct server *srv;
+
+               if (!(curr->flags & SI_FL_DONT_WAKE) && curr->owner) {
+                       /* filter by proxy and server if required */
+                       if ((px = curr->applet.ctx.events.px)) {
+                               if (s->be != px && s->fe != px)
+                                       continue; /* ignore */
+                               if ((srv = curr->applet.ctx.events.srv)) {
+                                       if (target_srv(&s->target) != srv)
+                                               continue; /* ignore */
+                               }
+                       }
+
+                       if (buffer_feed(curr->ib, msg) == -1) {
+                               curr->ib->flags |= BF_SEND_DONTWAIT;
+                               task_wakeup(curr->owner, TASK_WOKEN_MSG);
+                       }
+               }
+       }
+}
+
 /* This function is called from the session-level accept() in order to 
instanciate
  * a new stats socket. It returns a positive value upon success, 0 if the 
connection
  * needs to be closed and ignored, or a negative value upon critical failure.
@@ -772,7 +844,54 @@ static int stats_sock_parse_request(struct 
stream_interface *si, char *line)
                args[arg] = line;
 
        si->applet.ctx.stats.flags = 0;
-       if (strcmp(args[0], "show") == 0) {
+       if (strcmp(args[0], "debug") == 0) {
+               if (strcmp(args[1], "sess") == 0) {
+                       if (s->listener->perm.ux.level < ACCESS_LVL_OPER) {
+                               si->applet.ctx.cli.msg = 
stats_permission_denied_msg;
+                               si->applet.st0 = STAT_CLI_PRINT;
+                               return 1;
+                       }
+                       if (*args[2] && !strncmp(args[2], "proxy", 5)) {
+                               struct proxy *px = NULL;
+                               struct server *srv = NULL;
+                               char *px_name = args[2] + 6, *srv_name;
+
+                               if ((srv_name = strchr(px_name, ':'))) {
+                                       *srv_name = 0;
+                                       srv_name += 1;
+                               }
+
+                               px = findproxy(px_name, PR_CAP_FE|PR_CAP_BE);
+                               if (!px) {
+                                       si->applet.ctx.cli.msg = "Invalid proxy 
filter for event stream.";
+                                       si->applet.st0 = STAT_CLI_PRINT;
+                                       return 1;
+                               }
+
+                               if (srv_name && *srv_name) {
+                                       srv = findserver(px, srv_name);
+                                       if (!srv) {
+                                               si->applet.ctx.cli.msg = 
"Invalid server filter for event stream.";
+                                               si->applet.st0 = STAT_CLI_PRINT;
+                                               return 1;
+                                       }
+                               }
+
+                               si->applet.ctx.events.srv = srv;
+                               si->applet.ctx.events.px = px;
+                       } else {
+                               si->applet.ctx.events.srv = NULL;
+                               si->applet.ctx.events.px = NULL;
+                       }
+
+                       stats_event_listener_add(si);
+                       si->applet.st0 = STAT_CLI_EVENTS;
+               }
+               else { /* not "sess" */
+                       return 0;
+               }
+       }
+       else if (strcmp(args[0], "show") == 0) {
                if (strcmp(args[1], "stat") == 0) {
                        if (*args[2] && *args[3] && *args[4]) {
                                si->applet.ctx.stats.flags |= STAT_BOUND;
@@ -1346,6 +1465,14 @@ static int stats_sock_parse_request(struct 
stream_interface *si, char *line)
        return 1;
 }
 
+/* Callback to release a cli session.
+ */
+static void cli_session_release(struct stream_interface *si)
+{
+       /* remove if registered as event listener */
+       stats_event_listener_remove(si);
+}
+
 /* This I/O handler runs as an applet embedded in a stream interface. It is
  * used to processes I/O from/to the stats unix socket. The system relies on a
  * state machine handling requests and various responses. We read a request,
@@ -1377,7 +1504,7 @@ static void cli_io_handler(struct stream_interface *si)
                        si->shutw(si);
                        break;
                }
-               else if (si->applet.st0 == STAT_CLI_GETREQ) {
+               else if (si->applet.st0 == STAT_CLI_GETREQ || si->applet.st0 == 
STAT_CLI_EVENTS) {
                        /* ensure we have some output room left in the event we
                         * would want to return some info right after parsing.
                         */
@@ -1417,7 +1544,10 @@ static void cli_io_handler(struct stream_interface *si)
 
                        trash[len] = '\0';
 
+                       if (si->applet.st0 == STAT_CLI_EVENTS)
+                               stats_event_listener_remove(si);
                        si->applet.st0 = STAT_CLI_PROMPT;
+
                        if (len) {
                                if (strcmp(trash, "quit") == 0) {
                                        si->applet.st0 = STAT_CLI_END;
@@ -3725,6 +3855,112 @@ static int stats_table_request(struct stream_interface 
*si, bool show)
        return 1;
 }
 
+/* Called whenever a new session is successfully established (reaches
+ * SI_ST_EST). If there are any stats sockets listening in the
+ * STAT_CLI_EVENTS state, they will be notified of this session's unique
+ * id, along with the sockname and peername of both sides of the session.
+ */
+void stats_event_new_session(struct session *s)
+{
+       if (LIST_ISEMPTY(&stats_event_listeners))
+               return;
+
+       char addrs[4][INET6_ADDRSTRLEN + strlen(":65535") + 1] = {"","","",""};
+       int i;
+
+       struct stream_interface *si0 = &s->si[0], *si1 = &s->si[1];
+       socklen_t namelen;
+
+       /* si0 from/to = peer/sock */
+       if (!(si0->flags & SI_FL_FROM_SET)) {
+               namelen = sizeof(si0->addr.from);
+               getpeername(si0->fd, (struct sockaddr *)&si0->addr.from, 
&namelen);
+               si0->flags |= SI_FL_FROM_SET;
+       }
+       if (!(si0->flags & SI_FL_TO_SET)) {
+               namelen = sizeof(si0->addr.to);
+               getsockname(si0->fd, (struct sockaddr *)&si0->addr.to, 
&namelen);
+               si0->flags |= SI_FL_TO_SET;
+       }
+
+       /* si1 from/to = sock/peer (reversed) */
+       if (!(si1->flags & SI_FL_FROM_SET)) {
+               namelen = sizeof(si1->addr.from);
+               getsockname(si1->fd, (struct sockaddr *)&si1->addr.from, 
&namelen);
+               si1->flags |= SI_FL_FROM_SET;
+       }
+       if (!(si1->flags & SI_FL_TO_SET)) {
+               namelen = sizeof(si1->addr.to);
+               getpeername(si1->fd, (struct sockaddr *)&si1->addr.to, 
&namelen);
+               si1->flags |= SI_FL_TO_SET;
+       }
+
+       for(i = 0; i < 4; i++) {
+               struct sockaddr_storage *sock;
+               const void *sin_addr = NULL;
+               int port = 0;
+
+               switch (i) {
+               case 0: // inbound peer
+                       sock = &si0->addr.from;
+                       break;
+               case 1: // inbound sock
+                       sock = &si0->addr.to;
+                       break;
+               case 2: // outbound sock
+                       sock = &si1->addr.from;
+                       break;
+               case 3: // outbound peer
+                       sock = &si1->addr.to;
+                       break;
+               }
+
+               switch (sock->ss_family) {
+               case AF_INET:
+                       sin_addr = (const void *)&((struct sockaddr_in 
*)sock)->sin_addr;
+                       port = ntohs(((struct sockaddr_in *)sock)->sin_port);
+                       break;
+               case AF_INET6:
+                       sin_addr = (const void *)&((struct sockaddr_in6 
*)sock)->sin6_addr;
+                       port = ntohs(((struct sockaddr_in6 *)sock)->sin6_port);
+                       break;
+               }
+
+               switch (sock->ss_family) {
+               case AF_INET:
+               case AF_INET6:
+                       inet_ntop(sock->ss_family, sin_addr, addrs[i], 
sizeof(addrs[i]));
+                       snprintf(addrs[i]+strlen(addrs[i]), 
sizeof(addrs[i])-strlen(addrs[i])-1, ":%d", port);
+                       break;
+               case AF_UNIX:
+                       sprintf(addrs[i], "unix:%d", s->listener->luid);
+                       break;
+               default:
+                       sprintf(addrs[i], "%s", "unknown");
+               }
+       }
+
+       snprintf(trash, sizeof(trash), "F %u %s - %s | %s - %s\n", s->uniq_id,
+         addrs[0], // inbound peer
+         addrs[1], // inbound sock
+         addrs[2], // outbound sock
+         addrs[3]  // outbound peer
+       );
+       stats_event_listener_message_all(trash, s);
+}
+
+/* Called when the session argument's s->si[1]->state goes from SI_ST_EST
+ * to SI_ST_CLO. All stats listeners are notified of this destroy event.
+ */
+void stats_event_end_session(struct session *s)
+{
+       if (LIST_ISEMPTY(&stats_event_listeners))
+               return;
+
+       snprintf(trash, sizeof(trash), "C %u\n", s->uniq_id);
+       stats_event_listener_message_all(trash, s);
+}
+
 /* print a line of text buffer (limited to 70 bytes) to <out>. The format is :
  * <2 spaces> <offset=5 digits> <space or plus> <space> <70 chars max> <\n>
  * which is 60 chars per line. Non-printable chars \t, \n, \r and \e are
@@ -3939,7 +4175,7 @@ struct si_applet http_stats_applet = {
 static struct si_applet cli_applet = {
        .name = "<CLI>", /* used for logging */
        .fct = cli_io_handler,
-       .release = NULL,
+       .release = cli_session_release,
 };
 
 static struct cfg_kw_list cfg_kws = {{ },{
diff --git a/src/session.c b/src/session.c
index e5b76eb..48c5435 100644
--- a/src/session.c
+++ b/src/session.c
@@ -708,6 +708,9 @@ static void sess_establish(struct session *s, struct 
stream_interface *si)
                rep->rto = s->be->timeout.server;
        }
        req->wex = TICK_ETERNITY;
+
+       if (unlikely(stats_event_enabled))
+               stats_event_new_session(s);
 }
 
 /* Update stream interface status for input states SI_ST_ASS, SI_ST_QUE, 
SI_ST_TAR.
@@ -2148,6 +2151,12 @@ struct task *process_session(struct task *t)
                s->do_log(s);
        }
 
+       if (unlikely(stats_event_enabled)) {
+               if (s->si[1].state      == SI_ST_CLO &&
+                   s->si[1].prev_state == SI_ST_EST)
+                       stats_event_end_session(s);
+       }
+
        /* the task MUST not be in the run queue anymore */
        session_free(s);
        task_delete(t);
-- 
1.7.5.3


Reply via email to