Hi,
relayd suffers from the same issue that has been fixed in httpd:
The new fork+exec mode briefly uses too many fds in the parent process on
startup, so we needed an rlimit hack in relayd.c.
rzalamena@ has fixed proc.c and I added the proc_flush_imsg()
mechanism in httpd that makes sure that each fd is immediately closed
after forwarding it to a child process instead of queueing it up.
Sync both changes from httpd; now I can open many relays with many fds.
OK?
Reyk
Index: usr.sbin/relayd/config.c
===================================================================
RCS file: /cvs/src/usr.sbin/relayd/config.c,v
retrieving revision 1.30
diff -u -p -u -p -r1.30 config.c
--- usr.sbin/relayd/config.c 2 Sep 2016 14:45:51 -0000 1.30
+++ usr.sbin/relayd/config.c 20 Nov 2016 11:45:18 -0000
@@ -827,11 +827,30 @@ config_setrelay(struct relayd *env, stru
for (n = 0; n < m; n++) {
if ((fd = dup(rlay->rl_s)) == -1)
return (-1);
- proc_composev_imsg(ps, id, n,
- IMSG_CFG_RELAY, -1, fd, iov, c);
+ if (proc_composev_imsg(ps, id, n,
+ IMSG_CFG_RELAY, -1, fd, iov, c) != 0) {
+ log_warn("%s: failed to compose "
+ "IMSG_CFG_RELAY imsg for `%s'",
+ __func__, rlay->rl_conf.name);
+ return (-1);
+ }
+
+ /* Prevent fd exhaustion in the parent. */
+ if (proc_flush_imsg(ps, id, n) == -1) {
+ log_warn("%s: failed to flush "
+ "IMSG_CFG_RELAY imsg for `%s'",
+ __func__, rlay->rl_conf.name);
+ return (-1);
+ }
}
} else {
- proc_composev(ps, id, IMSG_CFG_RELAY, iov, c);
+ if (proc_composev(ps, id,
+ IMSG_CFG_RELAY, iov, c) != 0) {
+ log_warn("%s: failed to compose "
+ "IMSG_CFG_RELAY imsg for `%s'",
+ __func__, rlay->rl_conf.name);
+ return (-1);
+ }
}
if ((what & CONFIG_TABLES) == 0)
@@ -852,8 +871,11 @@ config_setrelay(struct relayd *env, stru
}
}
- close(rlay->rl_s);
- rlay->rl_s = -1;
+ /* Close server socket early to prevent fd exhaustion in the parent. */
+ if (rlay->rl_s != -1) {
+ close(rlay->rl_s);
+ rlay->rl_s = -1;
+ }
return (0);
}
Index: usr.sbin/relayd/proc.c
===================================================================
RCS file: /cvs/src/usr.sbin/relayd/proc.c,v
retrieving revision 1.36
diff -u -p -u -p -r1.36 proc.c
--- usr.sbin/relayd/proc.c 5 Oct 2016 17:31:28 -0000 1.36
+++ usr.sbin/relayd/proc.c 20 Nov 2016 11:45:18 -0000
@@ -1,4 +1,4 @@
-/* $OpenBSD: proc.c,v 1.36 2016/10/05 17:31:28 rzalamena Exp $ */
+/* $OpenBSD: proc.c,v 1.34 2016/10/12 10:57:30 reyk Exp $ */
/*
* Copyright (c) 2010 - 2016 Reyk Floeter <[email protected]>
@@ -37,8 +37,6 @@
void proc_exec(struct privsep *, struct privsep_proc *, unsigned int,
int, char **);
-void proc_connectpeer(struct privsep *, enum privsep_procid, int,
- struct privsep_pipes *);
void proc_setup(struct privsep *, struct privsep_proc *, unsigned int);
void proc_open(struct privsep *, int, int);
void proc_accept(struct privsep *, int, enum privsep_procid,
@@ -157,72 +155,38 @@ proc_exec(struct privsep *ps, struct pri
}
void
-proc_connectpeer(struct privsep *ps, enum privsep_procid id, int inst,
- struct privsep_pipes *pp)
-{
- unsigned int i, j;
- struct privsep_fd pf;
-
- for (i = 0; i < PROC_MAX; i++) {
- /* Parent is already connected with everyone. */
- if (i == PROC_PARENT)
- continue;
-
- for (j = 0; j < ps->ps_instances[i]; j++) {
- /* Don't send socket to child itself. */
- if (i == (unsigned int)id &&
- j == (unsigned int)inst)
- continue;
- if (pp->pp_pipes[i][j] == -1)
- continue;
-
- pf.pf_procid = i;
- pf.pf_instance = j;
- proc_compose_imsg(ps, id, inst, IMSG_CTL_PROCFD,
- -1, pp->pp_pipes[i][j], &pf, sizeof(pf));
- pp->pp_pipes[i][j] = -1;
- }
- }
-}
-
-/* Inter-connect all process except with ourself. */
-void
proc_connect(struct privsep *ps)
{
- unsigned int src, i, j;
- struct privsep_pipes *pp;
struct imsgev *iev;
+ unsigned int src, dst, inst;
- /* Listen on appropriate pipes. */
- src = privsep_process;
- pp = &ps->ps_pipes[src][ps->ps_instance];
-
- for (i = 0; i < PROC_MAX; i++) {
- /* Don't listen to ourself. */
- if (i == src)
- continue;
+ /* Don't distribute any sockets if we are not really going to run. */
+ if (ps->ps_noaction)
+ return;
- for (j = 0; j < ps->ps_instances[i]; j++) {
- if (pp->pp_pipes[i][j] == -1)
- continue;
+ for (dst = 0; dst < PROC_MAX; dst++) {
+ /* We don't communicate with ourselves. */
+ if (dst == PROC_PARENT)
+ continue;
- iev = &ps->ps_ievs[i][j];
- imsg_init(&iev->ibuf, pp->pp_pipes[i][j]);
+ for (inst = 0; inst < ps->ps_instances[dst]; inst++) {
+ iev = &ps->ps_ievs[dst][inst];
+ imsg_init(&iev->ibuf, ps->ps_pp->pp_pipes[dst][inst]);
event_set(&iev->ev, iev->ibuf.fd, iev->events,
iev->handler, iev->data);
event_add(&iev->ev, NULL);
}
}
- /* Exchange pipes between process. */
- for (i = 0; i < PROC_MAX; i++) {
- /* Parent is already connected with everyone. */
- if (i == PROC_PARENT)
- continue;
+ /* Distribute the socketpair()s for everyone. */
+ for (src = 0; src < PROC_MAX; src++)
+ for (dst = src; dst < PROC_MAX; dst++) {
+ /* Parent already distributed its fds. */
+ if (src == PROC_PARENT || dst == PROC_PARENT)
+ continue;
- for (j = 0; j < ps->ps_instances[i]; j++)
- proc_connectpeer(ps, i, j, &ps->ps_pipes[i][j]);
- }
+ proc_open(ps, src, dst);
+ }
}
void
@@ -230,17 +194,41 @@ proc_init(struct privsep *ps, struct pri
int argc, char **argv, enum privsep_procid proc_id)
{
struct privsep_proc *p = NULL;
+ struct privsep_pipes *pa, *pb;
unsigned int proc;
- unsigned int src, dst;
+ unsigned int dst;
+ int fds[2];
+
+ /* Don't initiate anything if we are not really going to run. */
+ if (ps->ps_noaction)
+ return;
if (proc_id == PROC_PARENT) {
privsep_process = PROC_PARENT;
proc_setup(ps, procs, nproc);
- /* Open socketpair()s for everyone. */
- for (src = 0; src < PROC_MAX; src++)
- for (dst = 0; dst < PROC_MAX; dst++)
- proc_open(ps, src, dst);
+ /*
+ * Create the children sockets so we can use them
+ * to distribute the rest of the socketpair()s using
+ * proc_connect() later.
+ */
+ for (dst = 0; dst < PROC_MAX; dst++) {
+ /* Don't create socket for ourselves. */
+ if (dst == PROC_PARENT)
+ continue;
+
+ for (proc = 0; proc < ps->ps_instances[dst]; proc++) {
+ pa = &ps->ps_pipes[PROC_PARENT][0];
+ pb = &ps->ps_pipes[dst][proc];
+ if (socketpair(AF_UNIX,
+ SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC,
+ PF_UNSPEC, fds) == -1)
+ fatal("%s: socketpair", __func__);
+
+ pa->pp_pipes[dst][proc] = fds[0];
+ pb->pp_pipes[PROC_PARENT][0] = fds[1];
+ }
+ }
/* Engage! */
proc_exec(ps, procs, nproc, argc, argv);
@@ -317,7 +305,7 @@ proc_setup(struct privsep *ps, struct pr
ps->ps_title[id] = procs[src].p_title;
if ((ps->ps_ievs[id] = calloc(ps->ps_instances[id],
sizeof(struct imsgev))) == NULL)
- fatal(__func__);
+ fatal("%s: calloc", __func__);
/* With this set up, we are ready to call imsg_init(). */
for (i = 0; i < ps->ps_instances[id]; i++) {
@@ -409,9 +397,11 @@ void
proc_open(struct privsep *ps, int src, int dst)
{
struct privsep_pipes *pa, *pb;
+ struct privsep_fd pf;
int fds[2];
unsigned int i, j;
+ /* Exchange pipes between process. */
for (i = 0; i < ps->ps_instances[src]; i++) {
for (j = 0; j < ps->ps_instances[dst]; j++) {
/* Don't create sockets for ourself. */
@@ -420,16 +410,33 @@ proc_open(struct privsep *ps, int src, i
pa = &ps->ps_pipes[src][i];
pb = &ps->ps_pipes[dst][j];
- if (pb->pp_pipes[dst][j] != -1)
- continue;
-
if (socketpair(AF_UNIX,
SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC,
PF_UNSPEC, fds) == -1)
- fatal(__func__);
+ fatal("%s: socketpair", __func__);
pa->pp_pipes[dst][j] = fds[0];
pb->pp_pipes[src][i] = fds[1];
+
+ pf.pf_procid = src;
+ pf.pf_instance = i;
+ if (proc_compose_imsg(ps, dst, j, IMSG_CTL_PROCFD,
+ -1, pb->pp_pipes[src][i], &pf, sizeof(pf)) == -1)
+ fatal("%s: proc_compose_imsg", __func__);
+
+ pf.pf_procid = dst;
+ pf.pf_instance = j;
+ if (proc_compose_imsg(ps, src, i, IMSG_CTL_PROCFD,
+ -1, pa->pp_pipes[dst][j], &pf, sizeof(pf)) == -1)
+ fatal("%s: proc_compose_imsg", __func__);
+
+ /*
+ * We have to flush to send the descriptors and close
+ * them to avoid the fd ramp on startup.
+ */
+ if (proc_flush_imsg(ps, src, i) == -1 ||
+ proc_flush_imsg(ps, dst, j) == -1)
+ fatal("%s: imsg_flush", __func__);
}
}
}
@@ -512,9 +519,6 @@ proc_run(struct privsep *ps, struct priv
const char *root;
struct control_sock *rcs;
- if (ps->ps_noaction)
- exit(0);
-
log_procinit(p->p_title);
/* Set the process group of the current process */
@@ -522,10 +526,10 @@ proc_run(struct privsep *ps, struct priv
if (p->p_id == PROC_CONTROL && ps->ps_instance == 0) {
if (control_init(ps, &ps->ps_csock) == -1)
- fatalx(__func__);
+ fatalx("%s: control_init", __func__);
TAILQ_FOREACH(rcs, &ps->ps_rcsocks, cs_entry)
if (control_init(ps, rcs) == -1)
- fatalx(__func__);
+ fatalx("%s: control_init", __func__);
}
/* Use non-standard user */
@@ -575,10 +579,10 @@ proc_run(struct privsep *ps, struct priv
if (p->p_id == PROC_CONTROL && ps->ps_instance == 0) {
TAILQ_INIT(&ctl_conns);
if (control_listen(&ps->ps_csock) == -1)
- fatalx(__func__);
+ fatalx("%s: control_listen", __func__);
TAILQ_FOREACH(rcs, &ps->ps_rcsocks, cs_entry)
if (control_listen(rcs) == -1)
- fatalx(__func__);
+ fatalx("%s: control_listen", __func__);
}
DPRINTF("%s: %s %d/%d, pid %d", __func__, p->p_title,
@@ -610,7 +614,7 @@ proc_dispatch(int fd, short event, void
if (event & EV_READ) {
if ((n = imsg_read(ibuf)) == -1 && errno != EAGAIN)
- fatal(__func__);
+ fatal("%s: imsg_read", __func__);
if (n == 0) {
/* this pipe is dead, so remove the event handler */
event_del(&iev->ev);
@@ -620,13 +624,19 @@ proc_dispatch(int fd, short event, void
}
if (event & EV_WRITE) {
- if (msgbuf_write(&ibuf->w) <= 0 && errno != EAGAIN)
- fatal(__func__);
+ if ((n = msgbuf_write(&ibuf->w)) == -1 && errno != EAGAIN)
+ fatal("%s: msgbuf_write", __func__);
+ if (n == 0) {
+ /* this pipe is dead, so remove the event handler */
+ event_del(&iev->ev);
+ event_loopexit(NULL);
+ return;
+ }
}
for (;;) {
if ((n = imsg_get(ibuf, &imsg)) == -1)
- fatal(__func__);
+ fatal("%s: imsg_get", __func__);
if (n == 0)
break;
@@ -661,12 +671,11 @@ proc_dispatch(int fd, short event, void
pf.pf_instance);
break;
default:
- log_warnx("%s: %s %d got invalid imsg %d peerid %d "
+ fatalx("%s: %s %d got invalid imsg %d peerid %d "
"from %s %d",
__func__, title, ps->ps_instance + 1,
imsg.hdr.type, imsg.hdr.peerid,
p->p_title, imsg.hdr.pid);
- fatalx(__func__);
}
imsg_free(&imsg);
}
@@ -808,4 +817,26 @@ proc_iev(struct privsep *ps, enum privse
proc_range(ps, id, &n, &m);
return (&ps->ps_ievs[id][n]);
+}
+
+/*
+ * Flush the imsgs queued for instance n of process id right away instead
+ * of waiting for the event loop (proc_range() presumably expands n into
+ * the instance range [n, m) -- confirm against its definition).
+ * For each instance, imsg_flush() is retried for as long as it fails with
+ * EAGAIN, and imsg_event_add() is called once the flush has succeeded.
+ * Returns -1 if an imsgbuf cannot be looked up or a flush fails with any
+ * other error; otherwise returns the result of the last imsg_flush()
+ * (0 if the range is empty).
+ * This function should only be called with care as it breaks async I/O.
+ */
+int
+proc_flush_imsg(struct privsep *ps, enum privsep_procid id, int n)
+{
+ struct imsgbuf *ibuf;
+ int m, ret = 0;
+
+ proc_range(ps, id, &n, &m);
+ for (; n < m; n++) {
+ if ((ibuf = proc_ibuf(ps, id, n)) == NULL)
+ return (-1);
+ do {
+ ret = imsg_flush(ibuf);
+ } while (ret == -1 && errno == EAGAIN);
+ if (ret == -1)
+ break;
+ imsg_event_add(&ps->ps_ievs[id][n]);
+ }
+
+ return (ret);
}
Index: usr.sbin/relayd/relayd.c
===================================================================
RCS file: /cvs/src/usr.sbin/relayd/relayd.c,v
retrieving revision 1.162
diff -u -p -u -p -r1.162 relayd.c
--- usr.sbin/relayd/relayd.c 28 Sep 2016 12:16:44 -0000 1.162
+++ usr.sbin/relayd/relayd.c 20 Nov 2016 11:45:18 -0000
@@ -214,11 +214,6 @@ main(int argc, char *argv[])
if (title != NULL)
ps->ps_title[proc_id] = title;
- if (proc_id == PROC_PARENT) {
- /* XXX the parent opens too many fds in proc_open() */
- socket_rlimit(-1);
- }
-
/* only the parent returns */
proc_init(ps, procs, nitems(procs), argc0, argv, proc_id);
Index: usr.sbin/relayd/relayd.h
===================================================================
RCS file: /cvs/src/usr.sbin/relayd/relayd.h,v
retrieving revision 1.235
diff -u -p -u -p -r1.235 relayd.h
--- usr.sbin/relayd/relayd.h 5 Oct 2016 16:58:19 -0000 1.235
+++ usr.sbin/relayd/relayd.h 20 Nov 2016 11:45:18 -0000
@@ -1373,6 +1373,7 @@ __dead void fatalx(const char *, ...)
/* proc.c */
enum privsep_procid
proc_getid(struct privsep_proc *, unsigned int, const char *);
+int proc_flush_imsg(struct privsep *, enum privsep_procid, int);
void proc_init(struct privsep *, struct privsep_proc *, unsigned int,
int, char **, enum privsep_procid);
void proc_kill(struct privsep *);