Re: Throttle bgpd's RDE if the client is slow

2017-05-27 Thread Peter Hessler
This improves our AMS-IX border router (200 configured sessions, plus
Route Servers) starting performance.

From 45 minutes to bring everyone up, to 3 minutes, and bgpctl stays
acceptably responsive during the thundering herd.  Memory pressure is
also far more relaxed.

OK


On 2017 May 26 (Fri) at 21:27:46 +0200 (+0200), Claudio Jeker wrote:
:Both bgpctl and bgp neighbors are often not fast enough to keep up with
:the RDE. The result is quite a bit of memory bloat or some ugly
:workarounds for bgpctl which can result in starving other bgpctl calls to
:death.
:
:This implements a simple XON / XOFF protocol for peers and control
:sessions and helps reducing the pain on busy boxes. It is a first step.
:There is still some major changes needed to reduce the update overhead
:seen when many session start up at the same time.
:
:I would love to hear from people with larger setups if there are any
:problems.
:-- 
::wq Claudio
:
:
:Index: bgpd.h
:===
:RCS file: /cvs/src/usr.sbin/bgpd/bgpd.h,v
:retrieving revision 1.300
:diff -u -p -r1.300 bgpd.h
:--- bgpd.h 25 Jan 2017 00:11:07 -  1.300
:+++ bgpd.h 25 Jan 2017 04:22:34 -
:@@ -87,13 +87,17 @@
: #define   F_RTLABEL   0x1
: 
: /*
:- * Limit the number of control messages generated by the RDE and queued in
:- * session engine. The RDE limit defines how many imsg are generated in
:- * one poll round. Then if the SE limit is hit the RDE control socket will no
:- * longer be polled.
:+ * Limit the number of messages queued in the session engine.
:+ * The SE will send an IMSG_XOFF messages to the RDE if the high water mark
:+ * is reached. The RDE should then throttle this peer or control connection.
:+ * Once the message queue in the SE drops below the low water mark an
:+ * IMSG_XON message will be sent and the RDE will produce more messages again.
:  */
: #define RDE_RUNNER_ROUNDS 100
:-#define SESSION_CTL_QUEUE_MAX 1
:+#define SESS_MSG_HIGH_MARK300
:+#define SESS_MSG_LOW_MARK 50
:+#define CTL_MSG_HIGH_MARK 500
:+#define CTL_MSG_LOW_MARK  100
: 
: enum bgpd_process {
:   PROC_MAIN,
:@@ -425,7 +429,9 @@ enum imsg_type {
:   IMSG_PFTABLE_COMMIT,
:   IMSG_REFRESH,
:   IMSG_IFINFO,
:-  IMSG_DEMOTE
:+  IMSG_DEMOTE,
:+  IMSG_XON,
:+  IMSG_XOFF
: };
: 
: struct demote_msg {
:Index: control.c
:===
:RCS file: /cvs/src/usr.sbin/bgpd/control.c,v
:retrieving revision 1.87
:diff -u -p -r1.87 control.c
:--- control.c  13 Feb 2017 14:48:44 -  1.87
:+++ control.c  16 Feb 2017 19:20:23 -
:@@ -213,11 +213,16 @@ control_dispatch_msg(struct pollfd *pfd,
:   return (0);
:   }
: 
:-  if (pfd->revents & POLLOUT)
:+  if (pfd->revents & POLLOUT) {
:   if (msgbuf_write(&c->ibuf.w) <= 0 && errno != EAGAIN) {
:   *ctl_cnt -= control_close(pfd->fd);
:   return (1);
:   }
:+  if (c->throttled && c->ibuf.w.queued < CTL_MSG_LOW_MARK) {
:+  if (imsg_ctl_rde(IMSG_XON, c->ibuf.pid, NULL, 0) != -1)
:+  c->throttled = 0;
:+  }
:+  }
: 
:   if (!(pfd->revents & POLLIN))
:   return (0);
:@@ -521,6 +526,11 @@ control_imsg_relay(struct imsg *imsg)
: 
:   if ((c = control_connbypid(imsg->hdr.pid)) == NULL)
:   return (0);
:+
:+  if (!c->throttled && c->ibuf.w.queued > CTL_MSG_HIGH_MARK) {
:+  if (imsg_ctl_rde(IMSG_XOFF, imsg->hdr.pid, NULL, 0) != -1)
:+  c->throttled = 1;
:+  }
: 
:   return (imsg_compose(&c->ibuf, imsg->hdr.type, 0, imsg->hdr.pid, -1,
:   imsg->data, imsg->hdr.len - IMSG_HEADER_SIZE));
:Index: rde.c
:===
:RCS file: /cvs/src/usr.sbin/bgpd/rde.c,v
:retrieving revision 1.361
:diff -u -p -r1.361 rde.c
:--- rde.c  25 Jan 2017 03:21:55 -  1.361
:+++ rde.c  26 May 2017 18:57:51 -
:@@ -76,7 +76,7 @@ void  rde_update_log(const char *, u_in
: void   rde_as4byte_fixup(struct rde_peer *, struct rde_aspath *);
: void   rde_reflector(struct rde_peer *, struct rde_aspath *);
: 
:-void   rde_dump_rib_as(struct prefix *, struct rde_aspath *,pid_t,
:+void   rde_dump_rib_as(struct prefix *, struct rde_aspath *, pid_t,
:int);
: void   rde_dump_filter(struct prefix *,
:struct ctl_show_rib_request *);
:@@ -86,8 +86,14 @@ void rde_dump_upcall(struct rib_entry 
: void   rde_dump_prefix_upcall(struct rib_entry *, void *);
: void   rde_dump_ctx_new(struct ctl_show_rib_request *, pid_t,
:enum imsg_type);
:-void   rde_dump_mrt_new(struct mrt *, pid_t, int);
:+void   rde_dump_ctx_throttle(pid_t pid, int throttle);

Throttle bgpd's RDE if the client is slow

2017-05-26 Thread Claudio Jeker
Both bgpctl and bgp neighbors are often not fast enough to keep up with
the RDE. The result is quite a bit of memory bloat or some ugly
workarounds for bgpctl which can result in starving other bgpctl calls to
death.

This implements a simple XON / XOFF protocol for peers and control
sessions and helps reducing the pain on busy boxes. It is a first step.
There is still some major changes needed to reduce the update overhead
seen when many session start up at the same time.

I would love to hear from people with larger setups if there are any
problems.
-- 
:wq Claudio


Index: bgpd.h
===
RCS file: /cvs/src/usr.sbin/bgpd/bgpd.h,v
retrieving revision 1.300
diff -u -p -r1.300 bgpd.h
--- bgpd.h  25 Jan 2017 00:11:07 -  1.300
+++ bgpd.h  25 Jan 2017 04:22:34 -
@@ -87,13 +87,17 @@
 #defineF_RTLABEL   0x1
 
 /*
- * Limit the number of control messages generated by the RDE and queued in
- * session engine. The RDE limit defines how many imsg are generated in
- * one poll round. Then if the SE limit is hit the RDE control socket will no
- * longer be polled.
+ * Limit the number of messages queued in the session engine.
+ * The SE will send an IMSG_XOFF messages to the RDE if the high water mark
+ * is reached. The RDE should then throttle this peer or control connection.
+ * Once the message queue in the SE drops below the low water mark an
+ * IMSG_XON message will be sent and the RDE will produce more messages again.
  */
 #define RDE_RUNNER_ROUNDS  100
-#define SESSION_CTL_QUEUE_MAX  1
+#define SESS_MSG_HIGH_MARK 300
+#define SESS_MSG_LOW_MARK  50
+#define CTL_MSG_HIGH_MARK  500
+#define CTL_MSG_LOW_MARK   100
 
 enum bgpd_process {
PROC_MAIN,
@@ -425,7 +429,9 @@ enum imsg_type {
IMSG_PFTABLE_COMMIT,
IMSG_REFRESH,
IMSG_IFINFO,
-   IMSG_DEMOTE
+   IMSG_DEMOTE,
+   IMSG_XON,
+   IMSG_XOFF
 };
 
 struct demote_msg {
Index: control.c
===
RCS file: /cvs/src/usr.sbin/bgpd/control.c,v
retrieving revision 1.87
diff -u -p -r1.87 control.c
--- control.c   13 Feb 2017 14:48:44 -  1.87
+++ control.c   16 Feb 2017 19:20:23 -
@@ -213,11 +213,16 @@ control_dispatch_msg(struct pollfd *pfd,
return (0);
}
 
-   if (pfd->revents & POLLOUT)
+   if (pfd->revents & POLLOUT) {
if (msgbuf_write(&c->ibuf.w) <= 0 && errno != EAGAIN) {
*ctl_cnt -= control_close(pfd->fd);
return (1);
}
+   if (c->throttled && c->ibuf.w.queued < CTL_MSG_LOW_MARK) {
+   if (imsg_ctl_rde(IMSG_XON, c->ibuf.pid, NULL, 0) != -1)
+   c->throttled = 0;
+   }
+   }
 
if (!(pfd->revents & POLLIN))
return (0);
@@ -521,6 +526,11 @@ control_imsg_relay(struct imsg *imsg)
 
if ((c = control_connbypid(imsg->hdr.pid)) == NULL)
return (0);
+
+   if (!c->throttled && c->ibuf.w.queued > CTL_MSG_HIGH_MARK) {
+   if (imsg_ctl_rde(IMSG_XOFF, imsg->hdr.pid, NULL, 0) != -1)
+   c->throttled = 1;
+   }
 
return (imsg_compose(&c->ibuf, imsg->hdr.type, 0, imsg->hdr.pid, -1,
imsg->data, imsg->hdr.len - IMSG_HEADER_SIZE));
Index: rde.c
===
RCS file: /cvs/src/usr.sbin/bgpd/rde.c,v
retrieving revision 1.361
diff -u -p -r1.361 rde.c
--- rde.c   25 Jan 2017 03:21:55 -  1.361
+++ rde.c   26 May 2017 18:57:51 -
@@ -76,7 +76,7 @@ void   rde_update_log(const char *, u_in
 voidrde_as4byte_fixup(struct rde_peer *, struct rde_aspath *);
 voidrde_reflector(struct rde_peer *, struct rde_aspath *);
 
-voidrde_dump_rib_as(struct prefix *, struct rde_aspath *,pid_t,
+voidrde_dump_rib_as(struct prefix *, struct rde_aspath *, pid_t,
 int);
 voidrde_dump_filter(struct prefix *,
 struct ctl_show_rib_request *);
@@ -86,8 +86,14 @@ void  rde_dump_upcall(struct rib_entry 
 voidrde_dump_prefix_upcall(struct rib_entry *, void *);
 voidrde_dump_ctx_new(struct ctl_show_rib_request *, pid_t,
 enum imsg_type);
-voidrde_dump_mrt_new(struct mrt *, pid_t, int);
+voidrde_dump_ctx_throttle(pid_t pid, int throttle);
+voidrde_dump_runner(void);
+int rde_dump_pending(void);
 voidrde_dump_done(void *);
+voidrde_dump_mrt_new(struct mrt *, pid_t, int);
+voidrde_dump_rib_free(struct rib *);
+voidrde_dump_mrt_free(struct rib *);
+voidrde_rib_free(struct rib_desc *);
 
 int rde_rdomain_import(struct rde_aspath *, struct rdomain *