--- doc/configuration.txt | 22 ++++ include/proto/dumpstats.h | 5 + include/types/stream_interface.h | 5 + src/dumpstats.c | 242 +++++++++++++++++++++++++++++++++++++- src/session.c | 9 ++ 5 files changed, 280 insertions(+), 3 deletions(-)
diff --git a/doc/configuration.txt b/doc/configuration.txt
index 2ede208..5b1ca85 100644
--- a/doc/configuration.txt
+++ b/doc/configuration.txt
@@ -10031,6 +10031,28 @@ shutdown sessions <backend>/<server>
   maintenance mode, for instance. Such terminated sessions are reported with a
   'K' flag in the logs.
 
+debug sess [proxy:<proxy_name>[:<server_name>]]
+
+  Dump a stream of events about sessions as they are added and removed.
+  The possible event formats are "Forward" and "Close":
+
+    "F <session_id> <in_peer> - <in_sock> | <out_sock> - <out_peer>\n"
+    "C <session_id>\n"
+
+  Streaming will continue until a new command is received or the
+  connection is closed. If <proxy_name> or <server_name> is specified, limit to
+  events concerning only the proxy and server specified.
+
+  This command is restricted and can only be issued on sockets configured
+  for levels "operator" or "admin".
+
+  Example:
+    >>> $ echo "set timeout cli 3600; debug sess" | socat stdio,ignoreeof /tmp/sock1
+    F 1 127.0.0.1:50869 - 127.0.0.1:9418 | 127.0.0.1:50870 - 127.0.0.1:6000
+    C 1
+    F 2 127.0.0.1:50874 - 127.0.0.1:9418 | 127.0.0.1:50875 - 127.0.0.1:6000
+    C 2
+
 /*
  * Local variables:
  *  fill-column: 79
diff --git a/include/proto/dumpstats.h b/include/proto/dumpstats.h
index eb44a36..d328881 100644
--- a/include/proto/dumpstats.h
+++ b/include/proto/dumpstats.h
@@ -55,6 +55,8 @@
 #define STAT_CLI_O_TAB  8   /* dump tables */
 #define STAT_CLI_O_CLR  9   /* clear tables */
 
+#define STAT_CLI_EVENTS 10  /* event stream; must stay distinct from every STAT_CLI_O_* value */
+
 /* status codes (strictly 4 chars) used in the URL to display a message */
 #define STAT_STATUS_UNKN "UNKN"	/* an unknown error occured, shouldn't happen */
 #define STAT_STATUS_DONE "DONE"	/* the action is successful */
@@ -63,8 +65,11 @@
 #define STAT_STATUS_DENY "DENY"	/* action denied */
 
 extern struct si_applet http_stats_applet;
+extern int stats_event_enabled; /* non-zero while at least one event listener is registered */
 
 void stats_io_handler(struct stream_interface *si);
+void stats_event_new_session(struct session *s);
+void stats_event_end_session(struct session *s);
 
 
 #endif /* _PROTO_DUMPSTATS_H */
diff --git a/include/types/stream_interface.h b/include/types/stream_interface.h
index 16af806..d730559 100644
--- a/include/types/stream_interface.h
+++ b/include/types/stream_interface.h
@@ -169,6 +169,11 @@ struct stream_interface {
 				int bol;		/* pointer to beginning of current line */
 			} errors;
 			struct {
+				struct list list;	/* list of stats streams in the STAT_CLI_EVENTS state */
+				struct proxy *px;	/* if not NULL, only send events associated with this proxy */
+				struct server *srv;	/* if not NULL, only send events associated with this server */
+			} events;
+			struct {
 				void *target;		/* table we want to dump, or NULL for all */
 				struct proxy *proxy;	/* table being currently dumped (first if NULL) */
 				struct stksess *entry;	/* last entry we were trying to dump (or first if NULL) */
diff --git a/src/dumpstats.c b/src/dumpstats.c
index 74ad966..b3f4423 100644
--- a/src/dumpstats.c
+++ b/src/dumpstats.c
@@ -86,6 +86,7 @@ static const char stats_sock_usage_msg[] =
 	"  disable        : put a server or frontend in maintenance mode\n"
 	"  enable         : re-enable a server or frontend which is in maintenance mode\n"
 	"  shutdown       : kill a session or a frontend (eg:to release listening ports)\n"
+	"  debug sess     : stream events about proxied sessions\n"
 	"";
 
 static const char stats_permission_denied_msg[] =
@@ -114,6 +115,77 @@ enum {
 	STAT_PX_ST_FIN,
 };
 
+/* Keep track of sessions that want streaming events (STAT_CLI_EVENTS).
+ * stats_event_enabled mirrors "the listener list is not empty". */
+int stats_event_enabled = 0;
+static struct list stats_event_listeners = LIST_HEAD_INIT(stats_event_listeners);
+
+/* Append <si> to the list of event listeners and raise the global
+ * stats_event_enabled flag. */
+static inline void stats_event_listener_add(struct stream_interface *si)
+{
+	LIST_ADDQ(&stats_event_listeners, &si->applet.ctx.events.list);
+	stats_event_enabled = 1;
+}
+
+/* Remove a session from the list of listeners, but only if it is a
+ * registered listener. This enables us to invoke the method on all
+ * disconnecting stats sockets to ensure they are cleaned up, regardless
+ * of how many times they switch between streaming and other commands.
+ */
+static inline void stats_event_listener_remove(struct stream_interface *si)
+{
+	int found = 0;
+	struct stream_interface *curr;
+	list_for_each_entry(curr, &stats_event_listeners, applet.ctx.events.list) {
+		if (curr == si) {
+			found = 1;
+			break;
+		}
+	}
+
+	if (found) {
+		si->applet.ctx.events.px = NULL;
+		si->applet.ctx.events.srv = NULL;
+		LIST_DEL(&si->applet.ctx.events.list);
+	}
+
+	if (LIST_ISEMPTY(&stats_event_listeners))
+		stats_event_enabled = 0;
+
+	/* Re-initialize stats output (done whether or not <si> was registered) */
+	memset(&si->applet.ctx.stats, 0, sizeof(si->applet.ctx.stats));
+}
+
+/* Feed <msg> to every registered listener whose proxy/server filter
+ * matches session <s>; <msg> is only read, never modified. */
+static inline void stats_event_listener_message_all(const char *msg, struct session *s)
+{
+	struct stream_interface *curr;
+
+	list_for_each_entry(curr, &stats_event_listeners, applet.ctx.events.list) {
+		struct proxy *px;
+		struct server *srv;
+
+		if (!(curr->flags & SI_FL_DONT_WAKE) && curr->owner) {
+			/* filter by proxy and server if required */
+			if ((px = curr->applet.ctx.events.px)) {
+				if (s->be != px && s->fe != px)
+					continue; /* ignore */
+				if ((srv = curr->applet.ctx.events.srv)) {
+					if (target_srv(&s->target) != srv)
+						continue; /* ignore */
+				}
+			}
+
+			if (buffer_feed(curr->ib, msg) == -1) { /* other return values are not handled: no retry */
+				curr->ib->flags |= BF_SEND_DONTWAIT;
+				task_wakeup(curr->owner, TASK_WOKEN_MSG);
+			}
+		}
+	}
+}
+
 /* This function is called from the session-level accept() in order to instanciate
  * a new stats socket. It returns a positive value upon success, 0 if the connection
  * needs to be closed and ignored, or a negative value upon critical failure.
@@ -772,7 +844,54 @@ static int stats_sock_parse_request(struct stream_interface *si, char *line)
 		args[arg] = line;
 
 	si->applet.ctx.stats.flags = 0;
-	if (strcmp(args[0], "show") == 0) {
+	if (strcmp(args[0], "debug") == 0) {
+		if (strcmp(args[1], "sess") == 0) {
+			if (s->listener->perm.ux.level < ACCESS_LVL_OPER) {
+				si->applet.ctx.cli.msg = stats_permission_denied_msg;
+				si->applet.st0 = STAT_CLI_PRINT;
+				return 1;
+			}
+			if (*args[2] && !strncmp(args[2], "proxy:", 6)) { /* ':' is required so that args[2] + 6 stays inside the string */
+				struct proxy *px = NULL;
+				struct server *srv = NULL;
+				char *px_name = args[2] + 6, *srv_name;
+
+				if ((srv_name = strchr(px_name, ':'))) {
+					*srv_name = 0;
+					srv_name += 1;
+				}
+
+				px = findproxy(px_name, PR_CAP_FE|PR_CAP_BE);
+				if (!px) {
+					si->applet.ctx.cli.msg = "Invalid proxy filter for event stream.";
+					si->applet.st0 = STAT_CLI_PRINT;
+					return 1;
+				}
+
+				if (srv_name && *srv_name) {
+					srv = findserver(px, srv_name);
+					if (!srv) {
+						si->applet.ctx.cli.msg = "Invalid server filter for event stream.";
+						si->applet.st0 = STAT_CLI_PRINT;
+						return 1;
+					}
+				}
+
+				si->applet.ctx.events.srv = srv;
+				si->applet.ctx.events.px = px;
+			} else {
+				si->applet.ctx.events.srv = NULL;
+				si->applet.ctx.events.px = NULL;
+			}
+
+			stats_event_listener_add(si);
+			si->applet.st0 = STAT_CLI_EVENTS;
+		}
+		else { /* not "sess" */
+			return 0;
+		}
+	}
+	else if (strcmp(args[0], "show") == 0) {
 		if (strcmp(args[1], "stat") == 0) {
 			if (*args[2] && *args[3] && *args[4]) {
 				si->applet.ctx.stats.flags |= STAT_BOUND;
@@ -1346,6 +1465,14 @@ static int stats_sock_parse_request(struct stream_interface *si, char *line)
 	return 1;
 }
 
+/* Release callback installed as cli_applet.release: makes sure a closing
+ * CLI stream interface is no longer referenced by the listener list. */
+static void cli_session_release(struct stream_interface *si)
+{
+	/* remove if registered as event listener */
+	stats_event_listener_remove(si);
+}
+
 /* This I/O handler runs as an applet embedded in a stream interface. It is
  * used to processes I/O from/to the stats unix socket. The system relies on a
  * state machine handling requests and various responses. We read a request,
@@ -1377,7 +1504,7 @@ static void cli_io_handler(struct stream_interface *si)
 			si->shutw(si);
 			break;
 		}
-		else if (si->applet.st0 == STAT_CLI_GETREQ) {
+		else if (si->applet.st0 == STAT_CLI_GETREQ || si->applet.st0 == STAT_CLI_EVENTS) {
 			/* ensure we have some output room left in the event we
 			 * would want to return some info right after parsing.
 			 */
@@ -1417,7 +1544,10 @@ static void cli_io_handler(struct stream_interface *si)
 
 			trash[len] = '\0';
 
+			if (si->applet.st0 == STAT_CLI_EVENTS) /* a new command line ends the event stream */
+				stats_event_listener_remove(si);
 			si->applet.st0 = STAT_CLI_PROMPT;
+
 			if (len) {
 				if (strcmp(trash, "quit") == 0) {
 					si->applet.st0 = STAT_CLI_END;
@@ -3725,6 +3855,112 @@ static int stats_table_request(struct stream_interface *si, bool show)
 	return 1;
 }
 
+/* Called whenever a new session is successfully established (reaches
+ * SI_ST_EST). If there are any stats sockets listening in the
+ * STAT_CLI_EVENTS state, they will be notified of this session's unique
+ * id, along with the sockname and peername of both sides of the session.
+ */
+void stats_event_new_session(struct session *s)
+{
+	if (LIST_ISEMPTY(&stats_event_listeners))
+		return;
+
+	char addrs[4][INET6_ADDRSTRLEN + sizeof(":65535")] = {"","","",""}; /* sizeof counts the NUL and keeps the bound constant */
+	int i;
+
+	struct stream_interface *si0 = &s->si[0], *si1 = &s->si[1];
+	socklen_t namelen;
+
+	/* si0 from/to = peer/sock */
+	if (!(si0->flags & SI_FL_FROM_SET)) {
+		namelen = sizeof(si0->addr.from);
+		getpeername(si0->fd, (struct sockaddr *)&si0->addr.from, &namelen);
+		si0->flags |= SI_FL_FROM_SET;
+	}
+	if (!(si0->flags & SI_FL_TO_SET)) {
+		namelen = sizeof(si0->addr.to);
+		getsockname(si0->fd, (struct sockaddr *)&si0->addr.to, &namelen);
+		si0->flags |= SI_FL_TO_SET;
+	}
+
+	/* si1 from/to = sock/peer (reversed) */
+	if (!(si1->flags & SI_FL_FROM_SET)) {
+		namelen = sizeof(si1->addr.from);
+		getsockname(si1->fd, (struct sockaddr *)&si1->addr.from, &namelen);
+		si1->flags |= SI_FL_FROM_SET;
+	}
+	if (!(si1->flags & SI_FL_TO_SET)) {
+		namelen = sizeof(si1->addr.to);
+		getpeername(si1->fd, (struct sockaddr *)&si1->addr.to, &namelen);
+		si1->flags |= SI_FL_TO_SET;
+	}
+
+	for(i = 0; i < 4; i++) {
+		struct sockaddr_storage *sock;
+		const void *sin_addr = NULL;
+		int port = 0;
+
+		switch (i) {
+		case 0: // inbound peer
+			sock = &si0->addr.from;
+			break;
+		case 1: // inbound sock
+			sock = &si0->addr.to;
+			break;
+		case 2: // outbound sock
+			sock = &si1->addr.from;
+			break;
+		default: // outbound peer (i == 3); default so <sock> is always assigned
+			sock = &si1->addr.to;
+			break;
+		}
+
+		switch (sock->ss_family) {
+		case AF_INET:
+			sin_addr = (const void *)&((struct sockaddr_in *)sock)->sin_addr;
+			port = ntohs(((struct sockaddr_in *)sock)->sin_port);
+			break;
+		case AF_INET6:
+			sin_addr = (const void *)&((struct sockaddr_in6 *)sock)->sin6_addr;
+			port = ntohs(((struct sockaddr_in6 *)sock)->sin6_port);
+			break;
+		}
+
+		switch (sock->ss_family) {
+		case AF_INET:
+		case AF_INET6:
+			inet_ntop(sock->ss_family, sin_addr, addrs[i], sizeof(addrs[i]));
+			snprintf(addrs[i]+strlen(addrs[i]), sizeof(addrs[i])-strlen(addrs[i])-1, ":%d", port);
+			break;
+		case AF_UNIX:
+			sprintf(addrs[i], "unix:%d", s->listener->luid);
+			break;
+		default:
+			sprintf(addrs[i], "%s", "unknown");
+		}
+	}
+
+	snprintf(trash, sizeof(trash), "F %u %s - %s | %s - %s\n", s->uniq_id,
+		 addrs[0], // inbound peer
+		 addrs[1], // inbound sock
+		 addrs[2], // outbound sock
+		 addrs[3]  // outbound peer
+		);
+	stats_event_listener_message_all(trash, s);
+}
+
+/* Called when the session argument's s->si[1]->state goes from SI_ST_EST
+ * to SI_ST_CLO. All stats listeners are notified of this destroy event.
+ */
+void stats_event_end_session(struct session *s)
+{
+	if (LIST_ISEMPTY(&stats_event_listeners))
+		return;
+
+	snprintf(trash, sizeof(trash), "C %u\n", s->uniq_id);
+	stats_event_listener_message_all(trash, s);
+}
+
 /* print a line of text buffer (limited to 70 bytes) to <out>. The format is :
  * <2 spaces> <offset=5 digits> <space or plus> <space> <70 chars max> <\n>
  * which is 60 chars per line. Non-printable chars \t, \n, \r and \e are
@@ -3939,7 +4175,7 @@ struct si_applet http_stats_applet = {
 static struct si_applet cli_applet = {
 	.name = "<CLI>", /* used for logging */
 	.fct = cli_io_handler,
-	.release = NULL,
+	.release = cli_session_release,
 };
 
 static struct cfg_kw_list cfg_kws = {{ },{
diff --git a/src/session.c b/src/session.c
index e5b76eb..48c5435 100644
--- a/src/session.c
+++ b/src/session.c
@@ -708,6 +708,9 @@ static void sess_establish(struct session *s, struct stream_interface *si)
 		rep->rto = s->be->timeout.server;
 	}
 	req->wex = TICK_ETERNITY;
+
+	if (unlikely(stats_event_enabled)) /* skip the call entirely when nobody listens */
+		stats_event_new_session(s);
 }
 
 /* Update stream interface status for input states SI_ST_ASS, SI_ST_QUE, SI_ST_TAR.
@@ -2148,6 +2151,12 @@ struct task *process_session(struct task *t)
 		s->do_log(s);
 	}
 
+	if (unlikely(stats_event_enabled)) { /* flag is raised only while a CLI event listener is registered */
+		if (s->si[1].state == SI_ST_CLO &&
+		    s->si[1].prev_state == SI_ST_EST) /* si[1] went EST -> CLO; NOTE(review): other paths to CLO emit no "C" event - confirm intended */
+			stats_event_end_session(s);
+	}
+
 	/* the task MUST not be in the run queue anymore */
 	session_free(s);
 	task_delete(t);
-- 
1.7.5.3