This improves our AMS-IX border router (200 configured sessions, plus Route Servers) starting performance.
From 45 minutes to bring everyone up, to 3 minutes, and bgpctl stays acceptably responsive during the thundering herd. Memory pressure is also far more relaxed. OK On 2017 May 26 (Fri) at 21:27:46 +0200 (+0200), Claudio Jeker wrote: :Both bgpctl and bgp neighbors are often not fast enough to keep up with :the RDE. The result is quite a bit of memory bloat or some ugly :workarounds for bgpctl which can result in starving other bgpctl calls to :death. : :This implements a simple XON / XOFF protocol for peers and control :sessions and helps reducing the pain on busy boxes. It is a first step. :There is still some major changes needed to reduce the update overhead :seen when many session start up at the same time. : :I would love to hear from people with larger setups if there are any :problems. :-- ::wq Claudio : : :Index: bgpd.h :=================================================================== :RCS file: /cvs/src/usr.sbin/bgpd/bgpd.h,v :retrieving revision 1.300 :diff -u -p -r1.300 bgpd.h :--- bgpd.h 25 Jan 2017 00:11:07 -0000 1.300 :+++ bgpd.h 25 Jan 2017 04:22:34 -0000 :@@ -87,13 +87,17 @@ : #define F_RTLABEL 0x10000 : : /* :- * Limit the number of control messages generated by the RDE and queued in :- * session engine. The RDE limit defines how many imsg are generated in :- * one poll round. Then if the SE limit is hit the RDE control socket will no :- * longer be polled. :+ * Limit the number of messages queued in the session engine. :+ * The SE will send an IMSG_XOFF messages to the RDE if the high water mark :+ * is reached. The RDE should then throttle this peer or control connection. :+ * Once the message queue in the SE drops below the low water mark an :+ * IMSG_XON message will be sent and the RDE will produce more messages again. 
: */ : #define RDE_RUNNER_ROUNDS 100 :-#define SESSION_CTL_QUEUE_MAX 10000 :+#define SESS_MSG_HIGH_MARK 300 :+#define SESS_MSG_LOW_MARK 50 :+#define CTL_MSG_HIGH_MARK 500 :+#define CTL_MSG_LOW_MARK 100 : : enum bgpd_process { : PROC_MAIN, :@@ -425,7 +429,9 @@ enum imsg_type { : IMSG_PFTABLE_COMMIT, : IMSG_REFRESH, : IMSG_IFINFO, :- IMSG_DEMOTE :+ IMSG_DEMOTE, :+ IMSG_XON, :+ IMSG_XOFF : }; : : struct demote_msg { :Index: control.c :=================================================================== :RCS file: /cvs/src/usr.sbin/bgpd/control.c,v :retrieving revision 1.87 :diff -u -p -r1.87 control.c :--- control.c 13 Feb 2017 14:48:44 -0000 1.87 :+++ control.c 16 Feb 2017 19:20:23 -0000 :@@ -213,11 +213,16 @@ control_dispatch_msg(struct pollfd *pfd, : return (0); : } : :- if (pfd->revents & POLLOUT) :+ if (pfd->revents & POLLOUT) { : if (msgbuf_write(&c->ibuf.w) <= 0 && errno != EAGAIN) { : *ctl_cnt -= control_close(pfd->fd); : return (1); : } :+ if (c->throttled && c->ibuf.w.queued < CTL_MSG_LOW_MARK) { :+ if (imsg_ctl_rde(IMSG_XON, c->ibuf.pid, NULL, 0) != -1) :+ c->throttled = 0; :+ } :+ } : : if (!(pfd->revents & POLLIN)) : return (0); :@@ -521,6 +526,11 @@ control_imsg_relay(struct imsg *imsg) : : if ((c = control_connbypid(imsg->hdr.pid)) == NULL) : return (0); :+ :+ if (!c->throttled && c->ibuf.w.queued > CTL_MSG_HIGH_MARK) { :+ if (imsg_ctl_rde(IMSG_XOFF, imsg->hdr.pid, NULL, 0) != -1) :+ c->throttled = 1; :+ } : : return (imsg_compose(&c->ibuf, imsg->hdr.type, 0, imsg->hdr.pid, -1, : imsg->data, imsg->hdr.len - IMSG_HEADER_SIZE)); :Index: rde.c :=================================================================== :RCS file: /cvs/src/usr.sbin/bgpd/rde.c,v :retrieving revision 1.361 :diff -u -p -r1.361 rde.c :--- rde.c 25 Jan 2017 03:21:55 -0000 1.361 :+++ rde.c 26 May 2017 18:57:51 -0000 :@@ -76,7 +76,7 @@ void rde_update_log(const char *, u_in : void rde_as4byte_fixup(struct rde_peer *, struct rde_aspath *); : void rde_reflector(struct rde_peer *, struct 
rde_aspath *); : :-void rde_dump_rib_as(struct prefix *, struct rde_aspath *,pid_t, :+void rde_dump_rib_as(struct prefix *, struct rde_aspath *, pid_t, : int); : void rde_dump_filter(struct prefix *, : struct ctl_show_rib_request *); :@@ -86,8 +86,14 @@ void rde_dump_upcall(struct rib_entry : void rde_dump_prefix_upcall(struct rib_entry *, void *); : void rde_dump_ctx_new(struct ctl_show_rib_request *, pid_t, : enum imsg_type); :-void rde_dump_mrt_new(struct mrt *, pid_t, int); :+void rde_dump_ctx_throttle(pid_t pid, int throttle); :+void rde_dump_runner(void); :+int rde_dump_pending(void); : void rde_dump_done(void *); :+void rde_dump_mrt_new(struct mrt *, pid_t, int); :+void rde_dump_rib_free(struct rib *); :+void rde_dump_mrt_free(struct rib *); :+void rde_rib_free(struct rib_desc *); : : int rde_rdomain_import(struct rde_aspath *, struct rdomain *); : void rde_reload_done(void); :@@ -131,15 +137,19 @@ struct imsgbuf *ibuf_main; : struct rde_memstats rdemem; : : struct rde_dump_ctx { :+ LIST_ENTRY(rde_dump_ctx) entry; : struct rib_context ribctx; : struct ctl_show_rib_request req; : sa_family_t af; :+ u_int8_t throttled; : }; : :+LIST_HEAD(, rde_dump_ctx) rde_dump_h = LIST_HEAD_INITIALIZER(rde_dump_h); :+ : struct rde_mrt_ctx { :- struct mrt mrt; :- struct rib_context ribctx; : LIST_ENTRY(rde_mrt_ctx) entry; :+ struct rib_context ribctx; :+ struct mrt mrt; : }; : : LIST_HEAD(, rde_mrt_ctx) rde_mrts = LIST_HEAD_INITIALIZER(rde_mrts); :@@ -248,7 +258,7 @@ rde_main(int debug, int verbose) : set_pollfd(&pfd[PFD_PIPE_SESSION], ibuf_se); : set_pollfd(&pfd[PFD_PIPE_SESSION_CTL], ibuf_se_ctl); : :- if (rib_dump_pending() && :+ if (rde_dump_pending() && : ibuf_se_ctl && ibuf_se_ctl->w.queued == 0) : timeout = 0; : :@@ -261,7 +271,6 @@ rde_main(int debug, int verbose) : i++; : } else if (mctx->mrt.state == MRT_STATE_REMOVE) { : close(mctx->mrt.wbuf.fd); :- LIST_REMOVE(&mctx->ribctx, entry); : LIST_REMOVE(mctx, entry); : free(mctx); : rde_mrt_cnt--; :@@ -307,9 +316,9 @@ 
rde_main(int debug, int verbose) : rde_update_queue_runner(); : for (aid = AID_INET6; aid < AID_MAX; aid++) : rde_update6_queue_runner(aid); :- if (rib_dump_pending() && :+ if (rde_dump_pending() && : ibuf_se_ctl && ibuf_se_ctl->w.queued <= 10) :- rib_dump_runner(); :+ rde_dump_runner(); : } : : /* close pipes */ :@@ -334,7 +343,6 @@ rde_main(int debug, int verbose) : while ((mctx = LIST_FIRST(&rde_mrts)) != NULL) { : msgbuf_clear(&mctx->mrt.wbuf); : close(mctx->mrt.wbuf.fd); :- LIST_REMOVE(&mctx->ribctx, entry); : LIST_REMOVE(mctx, entry); : free(mctx); : } :@@ -611,6 +619,23 @@ badnet: : memcpy(&verbose, imsg.data, sizeof(verbose)); : log_setverbose(verbose); : break; :+ case IMSG_XON: :+ if (imsg.hdr.peerid) { :+ peer = peer_get(imsg.hdr.peerid); :+ if (peer) :+ peer->throttled = 0; :+ break; :+ } else :+ rde_dump_ctx_throttle(imsg.hdr.pid, 0); :+ break; :+ case IMSG_XOFF: :+ if (imsg.hdr.peerid) { :+ peer = peer_get(imsg.hdr.peerid); :+ if (peer) :+ peer->throttled = 1; :+ } else :+ rde_dump_ctx_throttle(imsg.hdr.pid, 1); :+ break; : default: : break; : } :@@ -737,7 +762,7 @@ rde_dispatch_imsg_parent(struct imsgbuf : */ : in_rules = ribd->in_rules; : ribd->in_rules = NULL; :- rib_free(rib); :+ rde_rib_free(ribd); : rib = rib_new(rn.name, rn.rtableid, rn.flags); : ribd->in_rules = in_rules; : } else :@@ -2356,7 +2381,7 @@ rde_dump_ctx_new(struct ctl_show_rib_req : memcpy(&ctx->req, req, sizeof(struct ctl_show_rib_request)); : ctx->req.pid = pid; : ctx->req.type = type; :- ctx->ribctx.ctx_count = RDE_RUNNER_ROUNDS; :+ ctx->ribctx.ctx_count = CTL_MSG_HIGH_MARK; : ctx->ribctx.ctx_rib = rib; : switch (ctx->req.type) { : case IMSG_CTL_SHOW_NETWORK: :@@ -2398,20 +2423,72 @@ rde_dump_ctx_new(struct ctl_show_rib_req : ctx->ribctx.ctx_done = rde_dump_done; : ctx->ribctx.ctx_arg = ctx; : ctx->ribctx.ctx_aid = ctx->req.aid; :+ LIST_INSERT_HEAD(&rde_dump_h, ctx, entry); : rib_dump_r(&ctx->ribctx); : } : : void :+rde_dump_ctx_throttle(pid_t pid, int throttle) :+{ :+ struct 
rde_dump_ctx *ctx; :+ :+ LIST_FOREACH(ctx, &rde_dump_h, entry) { :+ if (ctx->req.pid == pid) { :+ ctx->throttled = throttle; :+ return; :+ } :+ } :+} :+ :+void :+rde_dump_runner(void) :+{ :+ struct rde_dump_ctx *ctx, *next; :+ :+ for (ctx = LIST_FIRST(&rde_dump_h); ctx != NULL; ctx = next) { :+ next = LIST_NEXT(ctx, entry); :+ if (!ctx->throttled) :+ rib_dump_r(&ctx->ribctx); :+ } :+} :+ :+int :+rde_dump_pending(void) :+{ :+ struct rde_dump_ctx *ctx; :+ :+ /* return true if there is at least one unthrottled context */ :+ LIST_FOREACH(ctx, &rde_dump_h, entry) :+ if (!ctx->throttled) :+ return (1); :+ :+ return (0); :+} :+ :+void : rde_dump_done(void *arg) : { : struct rde_dump_ctx *ctx = arg; : : imsg_compose(ibuf_se_ctl, IMSG_CTL_END, 0, ctx->req.pid, : -1, NULL, 0); :+ LIST_REMOVE(ctx, entry); : free(ctx); : } : : void :+rde_dump_rib_free(struct rib *rib) :+{ :+ struct rde_dump_ctx *ctx, *next; :+ :+ for (ctx = LIST_FIRST(&rde_dump_h); ctx != NULL; ctx = next) { :+ next = LIST_NEXT(ctx, entry); :+ if (ctx->ribctx.ctx_rib == rib) :+ rde_dump_done(ctx); :+ } :+} :+ :+void : rde_dump_mrt_new(struct mrt *mrt, pid_t pid, int fd) : { : struct rde_mrt_ctx *ctx; :@@ -2435,7 +2512,7 @@ rde_dump_mrt_new(struct mrt *mrt, pid_t : if (ctx->mrt.type == MRT_TABLE_DUMP_V2) : mrt_dump_v2_hdr(&ctx->mrt, conf, &peerlist); : :- ctx->ribctx.ctx_count = RDE_RUNNER_ROUNDS; :+ ctx->ribctx.ctx_count = CTL_MSG_HIGH_MARK; : ctx->ribctx.ctx_rib = rib; : ctx->ribctx.ctx_upcall = mrt_dump_upcall; : ctx->ribctx.ctx_done = mrt_done; :@@ -2446,6 +2523,28 @@ rde_dump_mrt_new(struct mrt *mrt, pid_t : rib_dump_r(&ctx->ribctx); : } : :+void :+rde_dump_mrt_free(struct rib *rib) :+{ :+ struct rde_mrt_ctx *ctx, *next; :+ :+ for (ctx = LIST_FIRST(&rde_mrts); ctx != NULL; ctx = next) { :+ next = LIST_NEXT(ctx, entry); :+ if (ctx->ribctx.ctx_rib == rib) :+ mrt_done(&ctx->mrt); :+ } :+} :+ :+void :+rde_rib_free(struct rib_desc *rd) :+{ :+ /* abort pending rib_dumps */ :+ rde_dump_rib_free(&rd->rib); :+ 
rde_dump_mrt_free(&rd->rib); :+ :+ rib_free(&rd->rib); :+} :+ : /* : * kroute specific functions : */ :@@ -2847,7 +2946,7 @@ rde_reload_done(void) : : switch (ribs[rid].state) { : case RECONF_DELETE: :- rib_free(&ribs[rid].rib); :+ rde_rib_free(&ribs[rid]); : break; : case RECONF_KEEP: : if (rde_filter_equal(ribs[rid].in_rules, :Index: rde.h :=================================================================== :RCS file: /cvs/src/usr.sbin/bgpd/rde.h,v :retrieving revision 1.160 :diff -u -p -r1.160 rde.h :--- rde.h 25 Jan 2017 03:21:55 -0000 1.160 :+++ rde.h 25 Jan 2017 04:22:34 -0000 :@@ -81,6 +81,7 @@ struct rde_peer { : u_int16_t mrt_idx; : u_int8_t reconf_out; /* out filter changed */ : u_int8_t reconf_rib; /* rib changed */ :+ u_int8_t throttled; : }; : : #define AS_SET 1 :@@ -267,7 +268,6 @@ struct pt_entry_vpn4 { : }; : : struct rib_context { :- LIST_ENTRY(rib_context) entry; : struct rib_entry *ctx_re; : struct rib *ctx_rib; : void (*ctx_upcall)(struct rib_entry *, void *); :@@ -436,8 +436,6 @@ struct rib_entry *rib_lookup(struct rib : void rib_dump(struct rib *, void (*)(struct rib_entry *, void *), : void *, u_int8_t); : void rib_dump_r(struct rib_context *); :-void rib_dump_runner(void); :-int rib_dump_pending(void); : : static inline struct rib * : re_rib(struct rib_entry *re) :Index: rde_rib.c :=================================================================== :RCS file: /cvs/src/usr.sbin/bgpd/rde_rib.c,v :retrieving revision 1.153 :diff -u -p -r1.153 rde_rib.c :--- rde_rib.c 25 Jan 2017 03:21:55 -0000 1.153 :+++ rde_rib.c 26 May 2017 18:08:29 -0000 :@@ -38,8 +38,6 @@ : u_int16_t rib_size; : struct rib_desc *ribs; : :-LIST_HEAD(, rib_context) rib_dump_h = LIST_HEAD_INITIALIZER(rib_dump_h); :- : struct rib_entry *rib_add(struct rib *, struct bgpd_addr *, int); : int rib_compare(const struct rib_entry *, const struct rib_entry *); : void rib_remove(struct rib_entry *); :@@ -136,25 +134,10 @@ rib_desc(struct rib *rib) : void : rib_free(struct rib *rib) : { 
:- struct rib_context *ctx, *next; : struct rib_desc *rd; : struct rib_entry *re, *xre; : struct prefix *p, *np; : :- /* abort pending rib_dumps */ :- for (ctx = LIST_FIRST(&rib_dump_h); ctx != NULL; ctx = next) { :- next = LIST_NEXT(ctx, entry); :- if (ctx->ctx_rib == rib) { :- re = ctx->ctx_re; :- re_unlock(re); :- LIST_REMOVE(ctx, entry); :- if (ctx->ctx_done) :- ctx->ctx_done(ctx->ctx_arg); :- else :- free(ctx); :- } :- } :- : for (re = RB_MIN(rib_tree, rib_tree(rib)); re != NULL; re = xre) { : xre = RB_NEXT(rib_tree, rib_tree(rib), re); : :@@ -311,10 +294,9 @@ rib_dump_r(struct rib_context *ctx) : struct rib_entry *re; : unsigned int i; : :- if (ctx->ctx_re == NULL) { :+ if (ctx->ctx_re == NULL) : re = RB_MIN(rib_tree, rib_tree(ctx->ctx_rib)); :- LIST_INSERT_HEAD(&rib_dump_h, ctx, entry); :- } else :+ else : re = rib_restart(ctx); : : for (i = 0; re != NULL; re = RB_NEXT(rib_tree, unused, re)) { :@@ -322,7 +304,7 @@ rib_dump_r(struct rib_context *ctx) : ctx->ctx_aid != re->prefix->aid) : continue; : if (ctx->ctx_count && i++ >= ctx->ctx_count && :- re_is_locked(re)) { :+ !re_is_locked(re)) { : /* store and lock last element */ : ctx->ctx_re = re; : re_lock(re); :@@ -331,7 +313,6 @@ rib_dump_r(struct rib_context *ctx) : ctx->ctx_upcall(re, ctx->ctx_arg); : } : :- LIST_REMOVE(ctx, entry); : if (ctx->ctx_done) : ctx->ctx_done(ctx->ctx_arg); : else :@@ -355,23 +336,6 @@ rib_restart(struct rib_context *ctx) : rib_remove(ctx->ctx_re); : ctx->ctx_re = NULL; : return (re); :-} :- :-void :-rib_dump_runner(void) :-{ :- struct rib_context *ctx, *next; :- :- for (ctx = LIST_FIRST(&rib_dump_h); ctx != NULL; ctx = next) { :- next = LIST_NEXT(ctx, entry); :- rib_dump_r(ctx); :- } :-} :- :-int :-rib_dump_pending(void) :-{ :- return (!LIST_EMPTY(&rib_dump_h)); : } : : /* used to bump correct prefix counters */ :Index: session.c :=================================================================== :RCS file: /cvs/src/usr.sbin/bgpd/session.c,v :retrieving revision 1.359 :diff -u 
-p -r1.359 session.c :--- session.c 13 Feb 2017 14:48:44 -0000 1.359 :+++ session.c 26 May 2017 18:58:46 -0000 :@@ -195,7 +195,6 @@ session_main(int debug, int verbose) : u_int pfd_elms = 0, peer_l_elms = 0, mrt_l_elms = 0; : u_int listener_cnt, ctl_cnt, mrt_cnt; : u_int new_cnt; :- u_int32_t ctl_queued; : struct passwd *pw; : struct peer *p, **peer_l = NULL, *last, *next; : struct mrt *m, *xm, **mrt_l = NULL; :@@ -356,17 +355,7 @@ session_main(int debug, int verbose) : : set_pollfd(&pfd[PFD_PIPE_MAIN], ibuf_main); : set_pollfd(&pfd[PFD_PIPE_ROUTE], ibuf_rde); :- :- ctl_queued = 0; :- TAILQ_FOREACH(ctl_conn, &ctl_conns, entry) :- ctl_queued += ctl_conn->ibuf.w.queued; :- :- /* :- * Do not act as unlimited buffer. Don't read in more :- * messages if the ctl sockets are getting full. :- */ :- if (ctl_queued < SESSION_CTL_QUEUE_MAX) :- set_pollfd(&pfd[PFD_PIPE_ROUTE_CTL], ibuf_rde_ctl); :+ set_pollfd(&pfd[PFD_PIPE_ROUTE_CTL], ibuf_rde_ctl); : : if (pauseaccept == 0) { : pfd[PFD_SOCK_CTL].fd = csock; :@@ -1389,6 +1378,13 @@ session_sendmsg(struct bgp_msg *msg, str : } : : ibuf_close(&p->wbuf, msg->buf); :+ if (!p->throttled && p->wbuf.queued > SESS_MSG_HIGH_MARK) { :+ if (imsg_compose(ibuf_rde, IMSG_XOFF, p->conf.id, 0, -1, :+ NULL, 0) == -1) :+ log_peer_warn(&p->conf, "imsg_compose XOFF"); :+ p->throttled = 1; :+ } :+ : free(msg); : return (0); : } :@@ -1773,6 +1769,12 @@ session_dispatch_msg(struct pollfd *pfd, : log_peer_warn(&p->conf, "write error"); : bgp_fsm(p, EVNT_CON_FATAL); : return (1); :+ } :+ if (p->throttled && p->wbuf.queued < SESS_MSG_LOW_MARK) { :+ if (imsg_compose(ibuf_rde, IMSG_XON, p->conf.id, 0, -1, :+ NULL, 0) == -1) :+ log_peer_warn(&p->conf, "imsg_compose XON"); :+ p->throttled = 0; : } : if (!(pfd->revents & POLLIN)) : return (1); :Index: session.h :=================================================================== :RCS file: /cvs/src/usr.sbin/bgpd/session.h,v :retrieving revision 1.122 :diff -u -p -r1.122 session.h :--- session.h 13 Jan 2017 
18:59:12 -0000 1.122 :+++ session.h 19 Jan 2017 23:32:14 -0000 :@@ -142,6 +142,7 @@ struct ctl_conn { : TAILQ_ENTRY(ctl_conn) entry; : struct imsgbuf ibuf; : int restricted; :+ int throttled; : }; : : TAILQ_HEAD(ctl_conns, ctl_conn) ctl_conns; :@@ -225,6 +226,7 @@ struct peer { : u_int8_t depend_ok; : u_int8_t demoted; : u_int8_t passive; :+ u_int8_t throttled; : }; : : extern struct peer *peers; : -- Pardon this fortune. Database under reconstruction.