This improves the startup performance of our AMS-IX border router
(200 configured sessions, plus Route Servers).

Bringing all sessions up went from 45 minutes to 3 minutes, and bgpctl
stays acceptably responsive during the thundering herd.  Memory
pressure is also far more relaxed.

OK


On 2017 May 26 (Fri) at 21:27:46 +0200, Claudio Jeker wrote:
:Both bgpctl and BGP neighbors are often not fast enough to keep up with
:the RDE. The result is quite a bit of memory bloat, or some ugly
:workarounds for bgpctl that can end up starving other bgpctl calls to
:death.
:
:This implements a simple XON / XOFF protocol for peers and control
:sessions and helps reduce the pain on busy boxes. It is a first step.
:There are still some major changes needed to reduce the update overhead
:seen when many sessions start up at the same time.
:
:I would love to hear from people with larger setups if there are any
:problems.
:-- 
::wq Claudio
:
:
:Index: bgpd.h
:===================================================================
:RCS file: /cvs/src/usr.sbin/bgpd/bgpd.h,v
:retrieving revision 1.300
:diff -u -p -r1.300 bgpd.h
:--- bgpd.h     25 Jan 2017 00:11:07 -0000      1.300
:+++ bgpd.h     25 Jan 2017 04:22:34 -0000
:@@ -87,13 +87,17 @@
: #define       F_RTLABEL               0x10000
: 
: /*
:- * Limit the number of control messages generated by the RDE and queued in
:- * session engine. The RDE limit defines how many imsg are generated in
:- * one poll round. Then if the SE limit is hit the RDE control socket will no
:- * longer be polled.
:+ * Limit the number of messages queued in the session engine.
:+ * The SE will send an IMSG_XOFF message to the RDE if the high water mark
:+ * is reached. The RDE should then throttle this peer or control connection.
:+ * Once the message queue in the SE drops below the low water mark an
:+ * IMSG_XON message will be sent and the RDE will produce more messages again.
:  */
: #define RDE_RUNNER_ROUNDS     100
:-#define SESSION_CTL_QUEUE_MAX 10000
:+#define SESS_MSG_HIGH_MARK    300
:+#define SESS_MSG_LOW_MARK     50
:+#define CTL_MSG_HIGH_MARK     500
:+#define CTL_MSG_LOW_MARK      100
: 
: enum bgpd_process {
:       PROC_MAIN,
:@@ -425,7 +429,9 @@ enum imsg_type {
:       IMSG_PFTABLE_COMMIT,
:       IMSG_REFRESH,
:       IMSG_IFINFO,
:-      IMSG_DEMOTE
:+      IMSG_DEMOTE,
:+      IMSG_XON,
:+      IMSG_XOFF
: };
: 
: struct demote_msg {
:Index: control.c
:===================================================================
:RCS file: /cvs/src/usr.sbin/bgpd/control.c,v
:retrieving revision 1.87
:diff -u -p -r1.87 control.c
:--- control.c  13 Feb 2017 14:48:44 -0000      1.87
:+++ control.c  16 Feb 2017 19:20:23 -0000
:@@ -213,11 +213,16 @@ control_dispatch_msg(struct pollfd *pfd,
:               return (0);
:       }
: 
:-      if (pfd->revents & POLLOUT)
:+      if (pfd->revents & POLLOUT) {
:               if (msgbuf_write(&c->ibuf.w) <= 0 && errno != EAGAIN) {
:                       *ctl_cnt -= control_close(pfd->fd);
:                       return (1);
:               }
:+              if (c->throttled && c->ibuf.w.queued < CTL_MSG_LOW_MARK) {
:+                      if (imsg_ctl_rde(IMSG_XON, c->ibuf.pid, NULL, 0) != -1)
:+                              c->throttled = 0;
:+              }
:+      }
: 
:       if (!(pfd->revents & POLLIN))
:               return (0);
:@@ -521,6 +526,11 @@ control_imsg_relay(struct imsg *imsg)
: 
:       if ((c = control_connbypid(imsg->hdr.pid)) == NULL)
:               return (0);
:+
:+      if (!c->throttled && c->ibuf.w.queued > CTL_MSG_HIGH_MARK) {
:+              if (imsg_ctl_rde(IMSG_XOFF, imsg->hdr.pid, NULL, 0) != -1)
:+                      c->throttled = 1;
:+      }
: 
:       return (imsg_compose(&c->ibuf, imsg->hdr.type, 0, imsg->hdr.pid, -1,
:           imsg->data, imsg->hdr.len - IMSG_HEADER_SIZE));
:Index: rde.c
:===================================================================
:RCS file: /cvs/src/usr.sbin/bgpd/rde.c,v
:retrieving revision 1.361
:diff -u -p -r1.361 rde.c
:--- rde.c      25 Jan 2017 03:21:55 -0000      1.361
:+++ rde.c      26 May 2017 18:57:51 -0000
:@@ -76,7 +76,7 @@ void          rde_update_log(const char *, u_in
: void           rde_as4byte_fixup(struct rde_peer *, struct rde_aspath *);
: void           rde_reflector(struct rde_peer *, struct rde_aspath *);
: 
:-void           rde_dump_rib_as(struct prefix *, struct rde_aspath *,pid_t,
:+void           rde_dump_rib_as(struct prefix *, struct rde_aspath *, pid_t,
:                    int);
: void           rde_dump_filter(struct prefix *,
:                    struct ctl_show_rib_request *);
:@@ -86,8 +86,14 @@ void                 rde_dump_upcall(struct rib_entry 
: void           rde_dump_prefix_upcall(struct rib_entry *, void *);
: void           rde_dump_ctx_new(struct ctl_show_rib_request *, pid_t,
:                    enum imsg_type);
:-void           rde_dump_mrt_new(struct mrt *, pid_t, int);
:+void           rde_dump_ctx_throttle(pid_t pid, int throttle);
:+void           rde_dump_runner(void);
:+int            rde_dump_pending(void);
: void           rde_dump_done(void *);
:+void           rde_dump_mrt_new(struct mrt *, pid_t, int);
:+void           rde_dump_rib_free(struct rib *);
:+void           rde_dump_mrt_free(struct rib *);
:+void           rde_rib_free(struct rib_desc *);
: 
: int            rde_rdomain_import(struct rde_aspath *, struct rdomain *);
: void           rde_reload_done(void);
:@@ -131,15 +137,19 @@ struct imsgbuf           *ibuf_main;
: struct rde_memstats    rdemem;
: 
: struct rde_dump_ctx {
:+      LIST_ENTRY(rde_dump_ctx)        entry;
:       struct rib_context              ribctx;
:       struct ctl_show_rib_request     req;
:       sa_family_t                     af;
:+      u_int8_t                        throttled;
: };
: 
:+LIST_HEAD(, rde_dump_ctx) rde_dump_h = LIST_HEAD_INITIALIZER(rde_dump_h);
:+
: struct rde_mrt_ctx {
:-      struct mrt              mrt;
:-      struct rib_context      ribctx;
:       LIST_ENTRY(rde_mrt_ctx) entry;
:+      struct rib_context      ribctx;
:+      struct mrt              mrt;
: };
: 
: LIST_HEAD(, rde_mrt_ctx) rde_mrts = LIST_HEAD_INITIALIZER(rde_mrts);
:@@ -248,7 +258,7 @@ rde_main(int debug, int verbose)
:               set_pollfd(&pfd[PFD_PIPE_SESSION], ibuf_se);
:               set_pollfd(&pfd[PFD_PIPE_SESSION_CTL], ibuf_se_ctl);
: 
:-              if (rib_dump_pending() &&
:+              if (rde_dump_pending() &&
:                   ibuf_se_ctl && ibuf_se_ctl->w.queued == 0)
:                       timeout = 0;
: 
:@@ -261,7 +271,6 @@ rde_main(int debug, int verbose)
:                               i++;
:                       } else if (mctx->mrt.state == MRT_STATE_REMOVE) {
:                               close(mctx->mrt.wbuf.fd);
:-                              LIST_REMOVE(&mctx->ribctx, entry);
:                               LIST_REMOVE(mctx, entry);
:                               free(mctx);
:                               rde_mrt_cnt--;
:@@ -307,9 +316,9 @@ rde_main(int debug, int verbose)
:               rde_update_queue_runner();
:               for (aid = AID_INET6; aid < AID_MAX; aid++)
:                       rde_update6_queue_runner(aid);
:-              if (rib_dump_pending() &&
:+              if (rde_dump_pending() &&
:                   ibuf_se_ctl && ibuf_se_ctl->w.queued <= 10)
:-                      rib_dump_runner();
:+                      rde_dump_runner();
:       }
: 
:       /* close pipes */
:@@ -334,7 +343,6 @@ rde_main(int debug, int verbose)
:       while ((mctx = LIST_FIRST(&rde_mrts)) != NULL) {
:               msgbuf_clear(&mctx->mrt.wbuf);
:               close(mctx->mrt.wbuf.fd);
:-              LIST_REMOVE(&mctx->ribctx, entry);
:               LIST_REMOVE(mctx, entry);
:               free(mctx);
:       }
:@@ -611,6 +619,23 @@ badnet:
:                       memcpy(&verbose, imsg.data, sizeof(verbose));
:                       log_setverbose(verbose);
:                       break;
:+              case IMSG_XON:
:+                      if (imsg.hdr.peerid) {
:+                              peer = peer_get(imsg.hdr.peerid);
:+                              if (peer)
:+                                      peer->throttled = 0;
:+                              break;
:+                      } else
:+                              rde_dump_ctx_throttle(imsg.hdr.pid, 0);
:+                      break;
:+              case IMSG_XOFF:
:+                      if (imsg.hdr.peerid) {
:+                              peer = peer_get(imsg.hdr.peerid);
:+                              if (peer)
:+                                      peer->throttled = 1;
:+                      } else
:+                              rde_dump_ctx_throttle(imsg.hdr.pid, 1);
:+                      break;
:               default:
:                       break;
:               }
:@@ -737,7 +762,7 @@ rde_dispatch_imsg_parent(struct imsgbuf 
:                                */
:                               in_rules = ribd->in_rules;
:                               ribd->in_rules = NULL;
:-                              rib_free(rib);
:+                              rde_rib_free(ribd);
:                               rib = rib_new(rn.name, rn.rtableid, rn.flags);
:                               ribd->in_rules = in_rules;
:                       } else
:@@ -2356,7 +2381,7 @@ rde_dump_ctx_new(struct ctl_show_rib_req
:       memcpy(&ctx->req, req, sizeof(struct ctl_show_rib_request));
:       ctx->req.pid = pid;
:       ctx->req.type = type;
:-      ctx->ribctx.ctx_count = RDE_RUNNER_ROUNDS;
:+      ctx->ribctx.ctx_count = CTL_MSG_HIGH_MARK;
:       ctx->ribctx.ctx_rib = rib;
:       switch (ctx->req.type) {
:       case IMSG_CTL_SHOW_NETWORK:
:@@ -2398,20 +2423,72 @@ rde_dump_ctx_new(struct ctl_show_rib_req
:       ctx->ribctx.ctx_done = rde_dump_done;
:       ctx->ribctx.ctx_arg = ctx;
:       ctx->ribctx.ctx_aid = ctx->req.aid;
:+      LIST_INSERT_HEAD(&rde_dump_h, ctx, entry);
:       rib_dump_r(&ctx->ribctx);
: }
: 
: void
:+rde_dump_ctx_throttle(pid_t pid, int throttle)
:+{
:+      struct rde_dump_ctx     *ctx;
:+
:+      LIST_FOREACH(ctx, &rde_dump_h, entry) {
:+              if (ctx->req.pid == pid) {
:+                      ctx->throttled = throttle;
:+                      return;
:+              }
:+      }
:+}
:+
:+void
:+rde_dump_runner(void)
:+{
:+      struct rde_dump_ctx     *ctx, *next;
:+
:+      for (ctx = LIST_FIRST(&rde_dump_h); ctx != NULL; ctx = next) {
:+              next = LIST_NEXT(ctx, entry);
:+              if (!ctx->throttled)
:+                      rib_dump_r(&ctx->ribctx);
:+      }
:+}
:+
:+int
:+rde_dump_pending(void)
:+{
:+      struct rde_dump_ctx     *ctx;
:+
:+      /* return true if there is at least one unthrottled context */
:+      LIST_FOREACH(ctx, &rde_dump_h, entry)
:+              if (!ctx->throttled)
:+                      return (1);
:+
:+      return (0);
:+}
:+
:+void
: rde_dump_done(void *arg)
: {
:       struct rde_dump_ctx     *ctx = arg;
: 
:       imsg_compose(ibuf_se_ctl, IMSG_CTL_END, 0, ctx->req.pid,
:           -1, NULL, 0);
:+      LIST_REMOVE(ctx, entry);
:       free(ctx);
: }
: 
: void
:+rde_dump_rib_free(struct rib *rib)
:+{
:+      struct rde_dump_ctx     *ctx, *next;
:+
:+      for (ctx = LIST_FIRST(&rde_dump_h); ctx != NULL; ctx = next) {
:+              next = LIST_NEXT(ctx, entry);
:+              if (ctx->ribctx.ctx_rib == rib)
:+                      rde_dump_done(ctx);
:+      }
:+}
:+
:+void
: rde_dump_mrt_new(struct mrt *mrt, pid_t pid, int fd)
: {
:       struct rde_mrt_ctx      *ctx;
:@@ -2435,7 +2512,7 @@ rde_dump_mrt_new(struct mrt *mrt, pid_t 
:       if (ctx->mrt.type == MRT_TABLE_DUMP_V2)
:               mrt_dump_v2_hdr(&ctx->mrt, conf, &peerlist);
: 
:-      ctx->ribctx.ctx_count = RDE_RUNNER_ROUNDS;
:+      ctx->ribctx.ctx_count = CTL_MSG_HIGH_MARK;
:       ctx->ribctx.ctx_rib = rib;
:       ctx->ribctx.ctx_upcall = mrt_dump_upcall;
:       ctx->ribctx.ctx_done = mrt_done;
:@@ -2446,6 +2523,28 @@ rde_dump_mrt_new(struct mrt *mrt, pid_t 
:       rib_dump_r(&ctx->ribctx);
: }
: 
:+void
:+rde_dump_mrt_free(struct rib *rib)
:+{
:+      struct rde_mrt_ctx      *ctx, *next;
:+
:+      for (ctx = LIST_FIRST(&rde_mrts); ctx != NULL; ctx = next) {
:+              next = LIST_NEXT(ctx, entry);
:+              if (ctx->ribctx.ctx_rib == rib)
:+                      mrt_done(&ctx->mrt);
:+      }
:+}
:+
:+void
:+rde_rib_free(struct rib_desc *rd)
:+{
:+      /* abort pending rib_dumps */
:+      rde_dump_rib_free(&rd->rib);
:+      rde_dump_mrt_free(&rd->rib);
:+
:+      rib_free(&rd->rib);
:+}
:+
: /*
:  * kroute specific functions
:  */
:@@ -2847,7 +2946,7 @@ rde_reload_done(void)
: 
:               switch (ribs[rid].state) {
:               case RECONF_DELETE:
:-                      rib_free(&ribs[rid].rib);
:+                      rde_rib_free(&ribs[rid]);
:                       break;
:               case RECONF_KEEP:
:                       if (rde_filter_equal(ribs[rid].in_rules,
:Index: rde.h
:===================================================================
:RCS file: /cvs/src/usr.sbin/bgpd/rde.h,v
:retrieving revision 1.160
:diff -u -p -r1.160 rde.h
:--- rde.h      25 Jan 2017 03:21:55 -0000      1.160
:+++ rde.h      25 Jan 2017 04:22:34 -0000
:@@ -81,6 +81,7 @@ struct rde_peer {
:       u_int16_t                        mrt_idx;
:       u_int8_t                         reconf_out;    /* out filter changed */
:       u_int8_t                         reconf_rib;    /* rib changed */
:+      u_int8_t                         throttled;
: };
: 
: #define AS_SET                        1
:@@ -267,7 +268,6 @@ struct pt_entry_vpn4 {
: };
: 
: struct rib_context {
:-      LIST_ENTRY(rib_context)          entry;
:       struct rib_entry                *ctx_re;
:       struct rib                      *ctx_rib;
:       void            (*ctx_upcall)(struct rib_entry *, void *);
:@@ -436,8 +436,6 @@ struct rib_entry *rib_lookup(struct rib 
: void           rib_dump(struct rib *, void (*)(struct rib_entry *, void *),
:                    void *, u_int8_t);
: void           rib_dump_r(struct rib_context *);
:-void           rib_dump_runner(void);
:-int            rib_dump_pending(void);
: 
: static inline struct rib *
: re_rib(struct rib_entry *re)
:Index: rde_rib.c
:===================================================================
:RCS file: /cvs/src/usr.sbin/bgpd/rde_rib.c,v
:retrieving revision 1.153
:diff -u -p -r1.153 rde_rib.c
:--- rde_rib.c  25 Jan 2017 03:21:55 -0000      1.153
:+++ rde_rib.c  26 May 2017 18:08:29 -0000
:@@ -38,8 +38,6 @@
: u_int16_t rib_size;
: struct rib_desc *ribs;
: 
:-LIST_HEAD(, rib_context) rib_dump_h = LIST_HEAD_INITIALIZER(rib_dump_h);
:-
: struct rib_entry *rib_add(struct rib *, struct bgpd_addr *, int);
: int rib_compare(const struct rib_entry *, const struct rib_entry *);
: void rib_remove(struct rib_entry *);
:@@ -136,25 +134,10 @@ rib_desc(struct rib *rib)
: void
: rib_free(struct rib *rib)
: {
:-      struct rib_context *ctx, *next;
:       struct rib_desc *rd;
:       struct rib_entry *re, *xre;
:       struct prefix *p, *np;
: 
:-      /* abort pending rib_dumps */
:-      for (ctx = LIST_FIRST(&rib_dump_h); ctx != NULL; ctx = next) {
:-              next = LIST_NEXT(ctx, entry);
:-              if (ctx->ctx_rib == rib) {
:-                      re = ctx->ctx_re;
:-                      re_unlock(re);
:-                      LIST_REMOVE(ctx, entry);
:-                      if (ctx->ctx_done)
:-                              ctx->ctx_done(ctx->ctx_arg);
:-                      else
:-                              free(ctx);
:-              }
:-      }
:-
:       for (re = RB_MIN(rib_tree, rib_tree(rib)); re != NULL; re = xre) {
:               xre = RB_NEXT(rib_tree, rib_tree(rib), re);
: 
:@@ -311,10 +294,9 @@ rib_dump_r(struct rib_context *ctx)
:       struct rib_entry        *re;
:       unsigned int             i;
: 
:-      if (ctx->ctx_re == NULL) {
:+      if (ctx->ctx_re == NULL)
:               re = RB_MIN(rib_tree, rib_tree(ctx->ctx_rib));
:-              LIST_INSERT_HEAD(&rib_dump_h, ctx, entry);
:-      } else
:+      else
:               re = rib_restart(ctx);
: 
:       for (i = 0; re != NULL; re = RB_NEXT(rib_tree, unused, re)) {
:@@ -322,7 +304,7 @@ rib_dump_r(struct rib_context *ctx)
:                   ctx->ctx_aid != re->prefix->aid)
:                       continue;
:               if (ctx->ctx_count && i++ >= ctx->ctx_count &&
:-                  re_is_locked(re)) {
:+                  !re_is_locked(re)) {
:                       /* store and lock last element */
:                       ctx->ctx_re = re;
:                       re_lock(re);
:@@ -331,7 +313,6 @@ rib_dump_r(struct rib_context *ctx)
:               ctx->ctx_upcall(re, ctx->ctx_arg);
:       }
: 
:-      LIST_REMOVE(ctx, entry);
:       if (ctx->ctx_done)
:               ctx->ctx_done(ctx->ctx_arg);
:       else
:@@ -355,23 +336,6 @@ rib_restart(struct rib_context *ctx)
:               rib_remove(ctx->ctx_re);
:       ctx->ctx_re = NULL;
:       return (re);
:-}
:-
:-void
:-rib_dump_runner(void)
:-{
:-      struct rib_context      *ctx, *next;
:-
:-      for (ctx = LIST_FIRST(&rib_dump_h); ctx != NULL; ctx = next) {
:-              next = LIST_NEXT(ctx, entry);
:-              rib_dump_r(ctx);
:-      }
:-}
:-
:-int
:-rib_dump_pending(void)
:-{
:-      return (!LIST_EMPTY(&rib_dump_h));
: }
: 
: /* used to bump correct prefix counters */
:Index: session.c
:===================================================================
:RCS file: /cvs/src/usr.sbin/bgpd/session.c,v
:retrieving revision 1.359
:diff -u -p -r1.359 session.c
:--- session.c  13 Feb 2017 14:48:44 -0000      1.359
:+++ session.c  26 May 2017 18:58:46 -0000
:@@ -195,7 +195,6 @@ session_main(int debug, int verbose)
:       u_int                    pfd_elms = 0, peer_l_elms = 0, mrt_l_elms = 0;
:       u_int                    listener_cnt, ctl_cnt, mrt_cnt;
:       u_int                    new_cnt;
:-      u_int32_t                ctl_queued;
:       struct passwd           *pw;
:       struct peer             *p, **peer_l = NULL, *last, *next;
:       struct mrt              *m, *xm, **mrt_l = NULL;
:@@ -356,17 +355,7 @@ session_main(int debug, int verbose)
: 
:               set_pollfd(&pfd[PFD_PIPE_MAIN], ibuf_main);
:               set_pollfd(&pfd[PFD_PIPE_ROUTE], ibuf_rde);
:-
:-              ctl_queued = 0;
:-              TAILQ_FOREACH(ctl_conn, &ctl_conns, entry)
:-                      ctl_queued += ctl_conn->ibuf.w.queued;
:-
:-              /*
:-               * Do not act as unlimited buffer. Don't read in more
:-               * messages if the ctl sockets are getting full.
:-               */
:-              if (ctl_queued < SESSION_CTL_QUEUE_MAX)
:-                      set_pollfd(&pfd[PFD_PIPE_ROUTE_CTL], ibuf_rde_ctl);
:+              set_pollfd(&pfd[PFD_PIPE_ROUTE_CTL], ibuf_rde_ctl);
: 
:               if (pauseaccept == 0) {
:                       pfd[PFD_SOCK_CTL].fd = csock;
:@@ -1389,6 +1378,13 @@ session_sendmsg(struct bgp_msg *msg, str
:       }
: 
:       ibuf_close(&p->wbuf, msg->buf);
:+      if (!p->throttled && p->wbuf.queued > SESS_MSG_HIGH_MARK) {
:+              if (imsg_compose(ibuf_rde, IMSG_XOFF, p->conf.id, 0, -1,
:+                  NULL, 0) == -1)
:+                      log_peer_warn(&p->conf, "imsg_compose XOFF");
:+              p->throttled = 1;
:+      }
:+
:       free(msg);
:       return (0);
: }
:@@ -1773,6 +1769,12 @@ session_dispatch_msg(struct pollfd *pfd,
:                               log_peer_warn(&p->conf, "write error");
:                       bgp_fsm(p, EVNT_CON_FATAL);
:                       return (1);
:+              }
:+              if (p->throttled && p->wbuf.queued < SESS_MSG_LOW_MARK) {
:+                      if (imsg_compose(ibuf_rde, IMSG_XON, p->conf.id, 0, -1,
:+                          NULL, 0) == -1)
:+                              log_peer_warn(&p->conf, "imsg_compose XON");
:+                      p->throttled = 0;
:               }
:               if (!(pfd->revents & POLLIN))
:                       return (1);
:Index: session.h
:===================================================================
:RCS file: /cvs/src/usr.sbin/bgpd/session.h,v
:retrieving revision 1.122
:diff -u -p -r1.122 session.h
:--- session.h  13 Jan 2017 18:59:12 -0000      1.122
:+++ session.h  19 Jan 2017 23:32:14 -0000
:@@ -142,6 +142,7 @@ struct ctl_conn {
:       TAILQ_ENTRY(ctl_conn)   entry;
:       struct imsgbuf          ibuf;
:       int                     restricted;
:+      int                     throttled;
: };
: 
: TAILQ_HEAD(ctl_conns, ctl_conn)       ctl_conns;
:@@ -225,6 +226,7 @@ struct peer {
:       u_int8_t                 depend_ok;
:       u_int8_t                 demoted;
:       u_int8_t                 passive;
:+      u_int8_t                 throttled;
: };
: 
: extern struct peer    *peers;
:
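
To make the mechanism concrete: the flow control added here is a simple
hysteresis on the per-connection output queue.  The toy program below is
only a sketch with made-up names (struct conn, enqueue(), drained()); the
real code watches ibuf.w.queued / wbuf.queued and signals the RDE with
IMSG_XOFF / IMSG_XON over the imsg pipe, as in the control.c and
session.c hunks above.

	/*
	 * Toy model of the XON/XOFF flow control in the diff above.
	 * Names and the main() driver are illustrative only.
	 */
	#include <stdio.h>

	#define HIGH_MARK	300	/* tell the producer to stop above this */
	#define LOW_MARK	50	/* tell the producer to resume below this */

	struct conn {
		int	queued;		/* messages waiting to be written out */
		int	throttled;	/* 1 while an XOFF is outstanding */
	};

	/* session engine side: a message was queued for this connection */
	static void
	enqueue(struct conn *c)
	{
		c->queued++;
		if (!c->throttled && c->queued > HIGH_MARK) {
			printf("XOFF (queued %d)\n", c->queued);
			c->throttled = 1;
		}
	}

	/* session engine side: the socket drained some messages */
	static void
	drained(struct conn *c, int n)
	{
		c->queued -= n;
		if (c->throttled && c->queued < LOW_MARK) {
			printf("XON (queued %d)\n", c->queued);
			c->throttled = 0;
		}
	}

	int
	main(void)
	{
		struct conn	 c = { 0, 0 };
		int		 i;

		for (i = 0; i < 400; i++)	/* producer floods the queue */
			enqueue(&c);		/* XOFF fires once past 300 */
		drained(&c, 380);		/* queue drops below 50: XON fires */
		return (0);
	}

On the RDE side those messages just flip the throttled flag on the peer
or dump context; rde_dump_runner() then skips throttled contexts, and the
update runners can do the same for throttled peers, which is where the
queue bloat goes away.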

-- 
Pardon this fortune.  Database under reconstruction.