This diff changes the io read functions to work on ibufs.
With this, the poll loops consume data with io_buf_read() until a full
message has been received, and only then is that message processed. As a
result, the processes no longer block in the io read functions while
waiting for more data.

With this, the blocking/nonblocking dance done in a few places is also no
longer needed. Everything is now nonblocking.

Further cleanup of the various marshaling functions can follow in a later
step.
-- 
:wq Claudio

Index: cert.c
===================================================================
RCS file: /cvs/src/usr.sbin/rpki-client/cert.c,v
retrieving revision 1.39
diff -u -p -r1.39 cert.c
--- cert.c      15 Oct 2021 22:30:33 -0000      1.39
+++ cert.c      23 Oct 2021 11:58:43 -0000
@@ -1281,33 +1281,31 @@ cert_buffer(struct ibuf *b, const struct
 }
 
 static void
-cert_ip_read(int fd, struct cert_ip *p)
+cert_ip_read(struct ibuf *b, struct cert_ip *p)
 {
-
-       io_simple_read(fd, &p->afi, sizeof(enum afi));
-       io_simple_read(fd, &p->type, sizeof(enum cert_ip_type));
+       io_read_buf(b, &p->afi, sizeof(enum afi));
+       io_read_buf(b, &p->type, sizeof(enum cert_ip_type));
 
        if (p->type != CERT_IP_INHERIT) {
-               io_simple_read(fd, &p->min, sizeof(p->min));
-               io_simple_read(fd, &p->max, sizeof(p->max));
+               io_read_buf(b, &p->min, sizeof(p->min));
+               io_read_buf(b, &p->max, sizeof(p->max));
        }
 
        if (p->type == CERT_IP_RANGE)
-               ip_addr_range_read(fd, &p->range);
+               ip_addr_range_read(b, &p->range);
        else if (p->type == CERT_IP_ADDR)
-               ip_addr_read(fd, &p->ip);
+               ip_addr_read(b, &p->ip);
 }
 
 static void
-cert_as_read(int fd, struct cert_as *p)
+cert_as_read(struct ibuf *b, struct cert_as *p)
 {
-
-       io_simple_read(fd, &p->type, sizeof(enum cert_as_type));
+       io_read_buf(b, &p->type, sizeof(enum cert_as_type));
        if (p->type == CERT_AS_RANGE) {
-               io_simple_read(fd, &p->range.min, sizeof(uint32_t));
-               io_simple_read(fd, &p->range.max, sizeof(uint32_t));
+               io_read_buf(b, &p->range.min, sizeof(uint32_t));
+               io_read_buf(b, &p->range.max, sizeof(uint32_t));
        } else if (p->type == CERT_AS_ID)
-               io_simple_read(fd, &p->id, sizeof(uint32_t));
+               io_read_buf(b, &p->id, sizeof(uint32_t));
 }
 
 /*
@@ -1316,7 +1314,7 @@ cert_as_read(int fd, struct cert_as *p)
  * Always returns a valid pointer.
  */
 struct cert *
-cert_read(int fd)
+cert_read(struct ibuf *b)
 {
        struct cert     *p;
        size_t           i;
@@ -1324,35 +1322,36 @@ cert_read(int fd)
        if ((p = calloc(1, sizeof(struct cert))) == NULL)
                err(1, NULL);
 
-       io_simple_read(fd, &p->valid, sizeof(int));
-       io_simple_read(fd, &p->expires, sizeof(time_t));
-       io_simple_read(fd, &p->purpose, sizeof(enum cert_purpose));
-       io_simple_read(fd, &p->ipsz, sizeof(size_t));
+       io_read_buf(b, &p->valid, sizeof(int));
+       io_read_buf(b, &p->expires, sizeof(time_t));
+       io_read_buf(b, &p->purpose, sizeof(enum cert_purpose));
+       io_read_buf(b, &p->ipsz, sizeof(size_t));
+
        p->ips = calloc(p->ipsz, sizeof(struct cert_ip));
        if (p->ips == NULL)
                err(1, NULL);
        for (i = 0; i < p->ipsz; i++)
-               cert_ip_read(fd, &p->ips[i]);
+               cert_ip_read(b, &p->ips[i]);
 
-       io_simple_read(fd, &p->asz, sizeof(size_t));
+       io_read_buf(b, &p->asz, sizeof(size_t));
        p->as = calloc(p->asz, sizeof(struct cert_as));
        if (p->as == NULL)
                err(1, NULL);
        for (i = 0; i < p->asz; i++)
-               cert_as_read(fd, &p->as[i]);
+               cert_as_read(b, &p->as[i]);
+
+       io_read_str(b, &p->mft);
+       io_read_str(b, &p->notify);
+       io_read_str(b, &p->repo);
+       io_read_str(b, &p->crl);
+       io_read_str(b, &p->aia);
+       io_read_str(b, &p->aki);
+       io_read_str(b, &p->ski);
+       io_read_str(b, &p->tal);
+       io_read_str(b, &p->pubkey);
 
-       io_str_read(fd, &p->mft);
        assert(p->mft != NULL || p->purpose == CERT_PURPOSE_BGPSEC_ROUTER);
-       io_str_read(fd, &p->notify);
-       io_str_read(fd, &p->repo);
-       io_str_read(fd, &p->crl);
-       io_str_read(fd, &p->aia);
-       io_str_read(fd, &p->aki);
-       io_str_read(fd, &p->ski);
        assert(p->ski);
-       io_str_read(fd, &p->tal);
-       io_str_read(fd, &p->pubkey);
-
        return p;
 }
 
Index: extern.h
===================================================================
RCS file: /cvs/src/usr.sbin/rpki-client/extern.h,v
retrieving revision 1.73
diff -u -p -r1.73 extern.h
--- extern.h    22 Oct 2021 11:13:06 -0000      1.73
+++ extern.h    23 Oct 2021 12:03:19 -0000
@@ -399,25 +399,25 @@ void               tal_buffer(struct ibuf *, const s
 void            tal_free(struct tal *);
 struct tal     *tal_parse(const char *, char *);
 char           *tal_read_file(const char *);
-struct tal     *tal_read(int);
+struct tal     *tal_read(struct ibuf *);
 
 void            cert_buffer(struct ibuf *, const struct cert *);
 void            cert_free(struct cert *);
 struct cert    *cert_parse(X509 **, const char *);
 struct cert    *ta_parse(X509 **, const char *, const unsigned char *, size_t);
-struct cert    *cert_read(int);
+struct cert    *cert_read(struct ibuf *);
 void            cert_insert_brks(struct brk_tree *, struct cert *);
 
 void            mft_buffer(struct ibuf *, const struct mft *);
 void            mft_free(struct mft *);
 struct mft     *mft_parse(X509 **, const char *);
 int             mft_check(const char *, struct mft *);
-struct mft     *mft_read(int);
+struct mft     *mft_read(struct ibuf *);
 
 void            roa_buffer(struct ibuf *, const struct roa *);
 void            roa_free(struct roa *);
 struct roa     *roa_parse(X509 **, const char *);
-struct roa     *roa_read(int);
+struct roa     *roa_read(struct ibuf *);
 void            roa_insert_vrps(struct vrp_tree *, struct roa *, size_t *,
                    size_t *);
 
@@ -460,8 +460,8 @@ void                 ip_addr_print(const struct ip_add
 void            ip_addr_buffer(struct ibuf *, const struct ip_addr *);
 void            ip_addr_range_buffer(struct ibuf *,
                        const struct ip_addr_range *);
-void            ip_addr_read(int, struct ip_addr *);
-void            ip_addr_range_read(int, struct ip_addr_range *);
+void            ip_addr_read(struct ibuf *, struct ip_addr *);
+void            ip_addr_range_read(struct ibuf *, struct ip_addr_range *);
 int             ip_addr_cmp(const struct ip_addr *, const struct ip_addr *);
 int             ip_addr_check_overlap(const struct cert_ip *,
                        const char *, const struct cert_ip *, size_t);
@@ -480,7 +480,7 @@ int          as_check_covered(uint32_t, uint32_
 
 /* Parser-specific */
 void            entity_free(struct entity *);
-void            entity_read_req(int fd, struct entity *);
+void            entity_read_req(struct ibuf *, struct entity *);
 void            entityq_flush(struct entityq *, struct repo *);
 void            proc_parser(int) __attribute__((noreturn));
 
@@ -535,8 +535,6 @@ char                *hex_encode(const unsigned char *,
 
 /* Functions for moving data between processes. */
 
-void            io_socket_blocking(int);
-void            io_socket_nonblocking(int);
 struct ibuf    *io_buf_new(void);
 void            io_simple_buffer(struct ibuf *, const void *, size_t);
 void            io_buf_buffer(struct ibuf *, const void *, size_t);
@@ -545,7 +543,11 @@ void                io_buf_close(struct msgbuf *, str
 void            io_simple_read(int, void *, size_t);
 void            io_buf_read_alloc(int, void **, size_t *);
 void            io_str_read(int, char **);
-int             io_recvfd(int, void *, size_t);
+void            io_read_buf(struct ibuf *, void *, size_t);
+void            io_read_str(struct ibuf *, char **);
+void            io_read_buf_alloc(struct ibuf *, void **, size_t *);
+struct ibuf    *io_buf_read(int, struct ibuf **);
+struct ibuf    *io_buf_recvfd(int, struct ibuf **);
 
 /* X509 helpers. */
 
Index: http.c
===================================================================
RCS file: /cvs/src/usr.sbin/rpki-client/http.c,v
retrieving revision 1.43
diff -u -p -r1.43 http.c
--- http.c      22 Oct 2021 11:13:06 -0000      1.43
+++ http.c      23 Oct 2021 11:59:14 -0000
@@ -1769,6 +1769,7 @@ proc_http(char *bind_addr, int fd)
        struct pollfd pfds[NPFDS];
        struct http_connection *conn, *nc;
        struct http_request *req, *nr;
+       struct ibuf *b, *inbuf = NULL;
 
        if (bind_addr != NULL) {
                struct addrinfo hints, *res;
@@ -1859,18 +1860,20 @@ proc_http(char *bind_addr, int fd)
                        }
                }
                if (pfds[0].revents & POLLIN) {
-                       size_t id, size;
-                       int outfd;
-                       char *uri;
-                       char *mod;
-
-                       outfd = io_recvfd(fd, &size, sizeof(size));
-                       io_simple_read(fd, &id, sizeof(id));
-                       io_str_read(fd, &uri);
-                       io_str_read(fd, &mod);
-
-                       /* queue up new requests */
-                       http_req_new(id, uri, mod, outfd);
+                       b = io_buf_recvfd(fd, &inbuf);
+                       if (b != NULL) {
+                               size_t id;
+                               char *uri;
+                               char *mod;
+
+                               io_read_buf(b, &id, sizeof(id));
+                               io_read_str(b, &uri);
+                               io_read_str(b, &mod);
+
+                               /* queue up new requests */
+                               http_req_new(id, uri, mod, b->fd);
+                               ibuf_free(b);
+                       }
                }
 
                now = getmonotime();
Index: io.c
===================================================================
RCS file: /cvs/src/usr.sbin/rpki-client/io.c,v
retrieving revision 1.14
diff -u -p -r1.14 io.c
--- io.c        22 Oct 2021 11:13:06 -0000      1.14
+++ io.c        23 Oct 2021 12:13:41 -0000
@@ -1,4 +1,4 @@
-/*     $OpenBSD: io.c,v 1.14 2021/10/22 11:13:06 claudio Exp $ */
+/*     $OpenBSD: io.c,v 1.13 2021/03/04 13:01:41 claudio Exp $ */
 /*
  * Copyright (c) 2019 Kristaps Dzonsons <krist...@bsd.lv>
  *
@@ -30,28 +30,6 @@
 
 #include "extern.h"
 
-void
-io_socket_blocking(int fd)
-{
-       int      fl;
-
-       if ((fl = fcntl(fd, F_GETFL, 0)) == -1)
-               err(1, "fcntl");
-       if (fcntl(fd, F_SETFL, fl & ~O_NONBLOCK) == -1)
-               err(1, "fcntl");
-}
-
-void
-io_socket_nonblocking(int fd)
-{
-       int      fl;
-
-       if ((fl = fcntl(fd, F_GETFL, 0)) == -1)
-               err(1, "fcntl");
-       if (fcntl(fd, F_SETFL, fl | O_NONBLOCK) == -1)
-               err(1, "fcntl");
-}
-
 /*
  * Create new io buffer, call io_close() when done with it.
  * Function always returns a new buffer.
@@ -109,80 +87,148 @@ io_buf_close(struct msgbuf *msgbuf, stru
 {
        size_t len;
 
-       len = ibuf_size(b);
+       len = ibuf_size(b) - sizeof(len);
        memcpy(ibuf_seek(b, 0, sizeof(len)), &len, sizeof(len));
        ibuf_close(msgbuf, b);
 }
 
 /*
- * Read of a binary buffer that must be on a blocking descriptor.
+ * Read from an ibuf and extract sz bytes from it.
  * Does nothing if "sz" is zero.
- * This will fail and exit on EOF.
+ * Exits with an error if the ibuf does not hold sz more bytes.
  */
 void
-io_simple_read(int fd, void *res, size_t sz)
+io_read_buf(struct ibuf *b, void *res, size_t sz)
 {
-       ssize_t  ssz;
        char    *tmp;
 
-       tmp = res; /* arithmetic on a pointer to void is a GNU extension */
-again:
        if (sz == 0)
                return;
-       if ((ssz = read(fd, tmp, sz)) == -1)
-               err(1, "read");
-       else if (ssz == 0)
-               errx(1, "read: unexpected end of file");
-       else if ((size_t)ssz == sz)
+       tmp = ibuf_seek(b, b->rpos, sz);
+       if (tmp == NULL)
+               errx(1, "bad internal framing, buffer too short");
+       b->rpos += sz;
+       memcpy(res, tmp, sz);
+}
+
+/*
+ * Read a string (returns NULL for zero-length strings), allocating
+ * space for it.
+ * Exits with an error if the ibuf holds less data than expected.
+ */
+void
+io_read_str(struct ibuf *b, char **res)
+{
+       size_t   sz;
+
+       io_read_buf(b, &sz, sizeof(sz));
+       if (sz == 0) {
+               *res = NULL;
                return;
-       sz -= ssz;
-       tmp += ssz;
-       goto again;
+       }
+       if ((*res = calloc(sz + 1, 1)) == NULL)
+               err(1, NULL);
+       io_read_buf(b, *res, sz);
 }
 
 /*
  * Read a binary buffer, allocating space for it.
  * If the buffer is zero-sized, this won't allocate "res", but
  * will still initialise it to NULL.
+ * Exits via errx() if the ibuf holds less data than announced.
  */
 void
-io_buf_read_alloc(int fd, void **res, size_t *sz)
+io_read_buf_alloc(struct ibuf *b, void **res, size_t *sz)
 {
-
        *res = NULL;
-       io_simple_read(fd, sz, sizeof(size_t));
+       io_read_buf(b, sz, sizeof(*sz));
        if (*sz == 0)
                return;
        if ((*res = malloc(*sz)) == NULL)
                err(1, NULL);
-       io_simple_read(fd, *res, *sz);
+       io_read_buf(b, *res, *sz);
+}
+
+/* XXX copy from imsg-buffer.c */
+static int
+ibuf_realloc(struct ibuf *buf, size_t len)
+{
+       unsigned char   *b;
+
+       /* on static buffers max is eq size and so the following fails */
+       if (buf->wpos + len > buf->max) {
+               errno = ERANGE;
+               return (-1);
+       }
+
+       b = recallocarray(buf->buf, buf->size, buf->wpos + len, 1);
+       if (b == NULL)
+               return (-1);
+       buf->buf = b;
+       buf->size = buf->wpos + len;
+
+       return (0);
 }
 
 /*
- * Read a string (returns NULL for zero-length strings), allocating
- * space for it.
+ * Read once and fill an ibuf until it is finished.
+ * Returns NULL if more data is needed, returns a full ibuf once
+ * all data is received.
  */
-void
-io_str_read(int fd, char **res)
+struct ibuf *
+io_buf_read(int fd, struct ibuf **ib)
 {
-       size_t   sz;
+       struct ibuf *b = *ib;
+       ssize_t n;
+       size_t sz;
 
-       io_simple_read(fd, &sz, sizeof(size_t));
-       if (sz == 0) {
-               *res = NULL;
-               return;
+       /* if ibuf == NULL allocate a new buffer */
+       if (b == NULL) {
+               if ((b = ibuf_dynamic(sizeof(sz), INT32_MAX)) == NULL)
+                       err(1, NULL);
+               *ib = b;
        }
-       if ((*res = calloc(sz + 1, 1)) == NULL)
-               err(1, NULL);
-       io_simple_read(fd, *res, sz);
+
+       /* read some data */
+       while ((n = read(fd, b->buf + b->wpos, b->size - b->wpos)) == -1) {
+               if (errno == EINTR)
+                       continue;
+               err(1, "read");
+       }
+
+       if (n == 0)
+               errx(1, "read: unexpected end of file");
+       b->wpos += n;
+
+       /* got full message */
+       if (b->wpos == b->size) {
+               /* only header received */
+               if (b->wpos == sizeof(sz)) {
+                       memcpy(&sz, b->buf, sizeof(sz));
+                       if (sz == 0 || sz > INT32_MAX)
+                               errx(1, "bad internal framing, bad size");
+                       if (ibuf_realloc(b, sz) == -1)
+                               err(1, "ibuf_realloc");
+                       return NULL;
+               }
+
+               /* skip over initial size header */
+               b->rpos += sizeof(sz);
+               *ib = NULL;
+               return b;
+       }
+
+       return NULL;
 }
 
+
 /*
  * Read data from socket but receive a file descriptor at the same time.
  */
-int
-io_recvfd(int fd, void *res, size_t sz)
+struct ibuf *
+io_buf_recvfd(int fd, struct ibuf **ib)
 {
+       struct ibuf *b = *ib;
        struct iovec iov;
        struct msghdr msg;
        struct cmsghdr *cmsg;
@@ -190,15 +236,22 @@ io_recvfd(int fd, void *res, size_t sz)
                struct cmsghdr  hdr;
                char            buf[CMSG_SPACE(sizeof(int))];
        } cmsgbuf;
-       int outfd = -1;
-       char *b = res;
        ssize_t n;
+       size_t sz;
 
+       /* fds are only passed with the header, use regular read afterwards */
+       if (b != NULL)
+               return io_buf_read(fd, ib);
+
+       if ((b = ibuf_dynamic(sizeof(sz), INT32_MAX)) == NULL)
+               err(1, NULL);
+       *ib = b;
+       
        memset(&msg, 0, sizeof(msg));
        memset(&cmsgbuf, 0, sizeof(cmsgbuf));
 
-       iov.iov_base = res;
-       iov.iov_len = sz;
+       iov.iov_base = b->buf;
+       iov.iov_len = b->size;
 
        msg.msg_iov = &iov;
        msg.msg_iovlen = 1;
@@ -225,29 +278,32 @@ io_recvfd(int fd, void *res, size_t sz)
                        for (i = 0; i < j; i++) {
                                f = ((int *)CMSG_DATA(cmsg))[i];
                                if (i == 0)
-                                       outfd = f;
+                                       b->fd = f;
                                else
                                        close(f);
                        }
                }
        }
 
-       b += n;
-       sz -= n;
-       while (sz > 0) {
-               /* short receive */
-               n = recv(fd, b, sz, 0);
-               if (n == -1) {
-                       if (errno == EINTR)
-                               continue;
-                       err(1, "recv");
+       b->wpos += n;
+
+       /* got full message */
+       if (b->wpos == b->size) {
+               /* only header received */
+               if (b->wpos == sizeof(sz)) {
+                       memcpy(&sz, b->buf, sizeof(sz));
+                       if (sz == 0 || sz > INT32_MAX)
+                               errx(1, "read: bad internal framing, %zu", sz);
+                       if (ibuf_realloc(b, sz) == -1)
+                               err(1, "ibuf_realloc");
+                       return NULL;
                }
-               if (n == 0)
-                       errx(1, "recv: unexpected end of file");
 
-               b += n;
-               sz -= n;
+               /* skip over initial size header */
+               b->rpos += sizeof(sz);
+               *ib = NULL;
+               return b;
        }
 
-       return outfd;
+       return NULL;
 }
Index: ip.c
===================================================================
RCS file: /cvs/src/usr.sbin/rpki-client/ip.c,v
retrieving revision 1.17
diff -u -p -r1.17 ip.c
--- ip.c        19 Apr 2021 17:04:35 -0000      1.17
+++ ip.c        23 Oct 2021 11:59:43 -0000
@@ -314,14 +314,14 @@ ip_addr_range_buffer(struct ibuf *b, con
  * Matched with ip_addr_buffer().
  */
 void
-ip_addr_read(int fd, struct ip_addr *p)
+ip_addr_read(struct ibuf *b, struct ip_addr *p)
 {
        size_t sz;
 
-       io_simple_read(fd, &p->prefixlen, sizeof(unsigned char));
+       io_read_buf(b, &p->prefixlen, sizeof(unsigned char));
        sz = PREFIX_SIZE(p->prefixlen);
        assert(sz <= 16);
-       io_simple_read(fd, p->addr, sz);
+       io_read_buf(b, p->addr, sz);
 }
 
 /*
@@ -329,11 +329,10 @@ ip_addr_read(int fd, struct ip_addr *p)
  * Matched with ip_addr_range_buffer().
  */
 void
-ip_addr_range_read(int fd, struct ip_addr_range *p)
+ip_addr_range_read(struct ibuf *b, struct ip_addr_range *p)
 {
-
-       ip_addr_read(fd, &p->min);
-       ip_addr_read(fd, &p->max);
+       ip_addr_read(b, &p->min);
+       ip_addr_read(b, &p->max);
 }
 
 /*
Index: main.c
===================================================================
RCS file: /cvs/src/usr.sbin/rpki-client/main.c,v
retrieving revision 1.150
diff -u -p -r1.150 main.c
--- main.c      22 Oct 2021 11:13:06 -0000      1.150
+++ main.c      23 Oct 2021 12:03:01 -0000
@@ -99,17 +99,14 @@ entity_free(struct entity *ent)
  * The pointer must be passed entity_free().
  */
 void
-entity_read_req(int fd, struct entity *ent)
+entity_read_req(struct ibuf *b, struct entity *ent)
 {
-       size_t size;
-
-       io_simple_read(fd, &size, sizeof(size));
-       io_simple_read(fd, &ent->type, sizeof(enum rtype));
-       io_str_read(fd, &ent->file);
-       io_simple_read(fd, &ent->has_pkey, sizeof(int));
+       io_read_buf(b, &ent->type, sizeof(ent->type));
+       io_read_str(b, &ent->file);
+       io_read_buf(b, &ent->has_pkey, sizeof(ent->has_pkey));
        if (ent->has_pkey)
-               io_buf_read_alloc(fd, (void **)&ent->pkey, &ent->pkeysz);
-       io_str_read(fd, &ent->descr);
+               io_read_buf_alloc(b, (void **)&ent->pkey, &ent->pkeysz);
+       io_read_str(b, &ent->descr);
 }
 
 /*
@@ -459,7 +456,7 @@ queue_add_from_cert(const struct cert *c
  * In all cases, we gather statistics.
  */
 static void
-entity_process(int proc, struct stats *st, struct vrp_tree *tree,
+entity_process(struct ibuf *b, struct stats *st, struct vrp_tree *tree,
     struct brk_tree *brktree)
 {
        enum rtype       type;
@@ -467,7 +464,6 @@ entity_process(int proc, struct stats *s
        struct cert     *cert;
        struct mft      *mft;
        struct roa      *roa;
-       size_t           size;
        int              c;
 
        /*
@@ -476,24 +472,23 @@ entity_process(int proc, struct stats *s
         * certificate, for example).
         * We follow that up with whether the resources didn't parse.
         */
-       io_simple_read(proc, &size, sizeof(size));
-       io_simple_read(proc, &type, sizeof(type));
+       io_read_buf(b, &type, sizeof(type));
 
        switch (type) {
        case RTYPE_TAL:
                st->tals++;
-               tal = tal_read(proc);
+               tal = tal_read(b);
                queue_add_from_tal(tal);
                tal_free(tal);
                break;
        case RTYPE_CER:
                st->certs++;
-               io_simple_read(proc, &c, sizeof(int));
+               io_read_buf(b, &c, sizeof(c));
                if (c == 0) {
                        st->certs_fail++;
                        break;
                }
-               cert = cert_read(proc);
+               cert = cert_read(b);
                if (cert->purpose == CERT_PURPOSE_CA) {
                        if (cert->valid) {
                                /*
@@ -517,12 +512,12 @@ entity_process(int proc, struct stats *s
                break;
        case RTYPE_MFT:
                st->mfts++;
-               io_simple_read(proc, &c, sizeof(int));
+               io_read_buf(b, &c, sizeof(c));
                if (c == 0) {
                        st->mfts_fail++;
                        break;
                }
-               mft = mft_read(proc);
+               mft = mft_read(b);
                if (mft->stale)
                        st->mfts_stale++;
                queue_add_from_mft_set(mft);
@@ -533,12 +528,12 @@ entity_process(int proc, struct stats *s
                break;
        case RTYPE_ROA:
                st->roas++;
-               io_simple_read(proc, &c, sizeof(int));
+               io_read_buf(b, &c, sizeof(c));
                if (c == 0) {
                        st->roas_fail++;
                        break;
                }
-               roa = roa_read(proc);
+               roa = roa_read(b);
                if (roa->valid)
                        roa_insert_vrps(tree, roa, &st->vrps, &st->uniqs);
                else
@@ -555,6 +550,57 @@ entity_process(int proc, struct stats *s
        entity_queue--;
 }
 
+static void
+rrdp_process(struct ibuf *b)
+{
+       enum rrdp_msg type;
+       enum publish_type pt;
+       struct rrdp_session s;
+       char *uri, *last_mod, *data;
+       char hash[SHA256_DIGEST_LENGTH];
+       size_t dsz, id;
+       int ok;
+
+       io_read_buf(b, &type, sizeof(type));
+       io_read_buf(b, &id, sizeof(id));
+
+       switch (type) {
+       case RRDP_END:
+               io_read_buf(b, &ok, sizeof(ok));
+               rrdp_finish(id, ok);
+               break;
+       case RRDP_HTTP_REQ:
+               io_read_str(b, &uri);
+               io_read_str(b, &last_mod);
+               rrdp_http_fetch(id, uri, last_mod);
+               break;
+       case RRDP_SESSION:
+               io_read_str(b, &s.session_id);
+               io_read_buf(b, &s.serial, sizeof(s.serial));
+               io_read_str(b, &s.last_mod);
+               rrdp_save_state(id, &s);
+               free(s.session_id);
+               free(s.last_mod);
+               break;
+       case RRDP_FILE:
+               io_read_buf(b, &pt, sizeof(pt));
+               if (pt != PUB_ADD)
+                       io_read_buf(b, &hash, sizeof(hash));
+               io_read_str(b, &uri);
+               io_read_buf_alloc(b, (void **)&data, &dsz);
+
+               ok = rrdp_handle_file(id, pt, uri, hash, sizeof(hash),
+                   data, dsz);
+               rrdp_file_resp(id, ok);
+
+               free(uri);
+               free(data);
+               break;
+       default:
+               errx(1, "unexpected rrdp response");
+       }
+}
+
 /*
  * Assign filenames ending in ".tal" in "/etc/rpki" into "tals",
  * returning the number of files found and filled-in.
@@ -623,19 +669,21 @@ suicide(int sig __attribute__((unused)))
 int
 main(int argc, char *argv[])
 {
-       int              rc, c, st, proc, rsync, http, rrdp, ok,
-                        hangup = 0, fl = SOCK_STREAM | SOCK_CLOEXEC;
-       size_t           i, id, talsz = 0, size;
+       int              rc, c, st, proc, rsync, http, rrdp, ok, hangup = 0;
+       int              fl = SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK;
+       size_t           i, id, talsz = 0;
        pid_t            pid, procpid, rsyncpid, httppid, rrdppid;
        int              fd[2];
        struct pollfd    pfd[NPFD];
        struct msgbuf   *queues[NPFD];
+       struct ibuf     *b, *httpbuf = NULL, *procbuf = NULL;
+       struct ibuf     *rrdpbuf = NULL, *rsyncbuf = NULL;
        char            *rsync_prog = "openrsync";
        char            *bind_addr = NULL;
        const char      *cachedir = NULL, *outputdir = NULL;
        const char      *tals[TALSZ_MAX], *errs, *name;
-       struct vrp_tree  v = RB_INITIALIZER(&v);
-       struct brk_tree  b = RB_INITIALIZER(&b);
+       struct vrp_tree  vrps = RB_INITIALIZER(&vrps);
+       struct brk_tree  brks = RB_INITIALIZER(&brks);
        struct rusage   ru;
        struct timeval  start_time, now_time;
 
@@ -972,12 +1020,6 @@ main(int argc, char *argv[])
                                hangup = 1;
                        }
                        if (pfd[i].revents & POLLOUT) {
-                               /*
-                                * XXX work around deadlocks because of
-                                * blocking read vs non-blocking writes.
-                                */
-                               if (i > 1)
-                                       io_socket_nonblocking(pfd[i].fd);
                                switch (msgbuf_write(queues[i])) {
                                case 0:
                                        errx(1, "write[%zu]: "
@@ -985,8 +1027,6 @@ main(int argc, char *argv[])
                                case -1:
                                        err(1, "write[%zu]", i);
                                }
-                               if (i > 1)
-                                       io_socket_blocking(pfd[i].fd);
                        }
                }
                if (hangup)
@@ -1000,75 +1040,38 @@ main(int argc, char *argv[])
                 */
 
                if ((pfd[1].revents & POLLIN)) {
-                       io_simple_read(rsync, &size, sizeof(size));
-                       io_simple_read(rsync, &id, sizeof(id));
-                       io_simple_read(rsync, &ok, sizeof(ok));
-                       rsync_finish(id, ok);
+                       b = io_buf_read(rsync, &rsyncbuf);
+                       if (b != NULL) {
+                               io_read_buf(b, &id, sizeof(id));
+                               io_read_buf(b, &ok, sizeof(ok));
+                               rsync_finish(id, ok);
+                               ibuf_free(b);
+                       }
                }
 
                if ((pfd[2].revents & POLLIN)) {
-                       enum http_result res;
-                       char *last_mod;
-
-                       io_simple_read(http, &size, sizeof(size));
-                       io_simple_read(http, &id, sizeof(id));
-                       io_simple_read(http, &res, sizeof(res));
-                       io_str_read(http, &last_mod);
-                       http_finish(id, res, last_mod);
-                       free(last_mod);
+                       b = io_buf_read(http, &httpbuf);
+                       if (b != NULL) {
+                               enum http_result res;
+                               char *last_mod;
+
+                               io_read_buf(b, &id, sizeof(id));
+                               io_read_buf(b, &res, sizeof(res));
+                               io_read_str(b, &last_mod);
+                               http_finish(id, res, last_mod);
+                               free(last_mod);
+                               ibuf_free(b);
+                       }
                }
 
                /*
                 * Handle RRDP requests here.
                 */
                if ((pfd[3].revents & POLLIN)) {
-                       enum rrdp_msg type;
-                       enum publish_type pt;
-                       struct rrdp_session s;
-                       char *uri, *last_mod, *data;
-                       char hash[SHA256_DIGEST_LENGTH];
-                       size_t dsz;
-
-                       io_simple_read(rrdp, &size, sizeof(size));
-                       io_simple_read(rrdp, &type, sizeof(type));
-                       io_simple_read(rrdp, &id, sizeof(id));
-
-                       switch (type) {
-                       case RRDP_END:
-                               io_simple_read(rrdp, &ok, sizeof(ok));
-                               rrdp_finish(id, ok);
-                               break;
-                       case RRDP_HTTP_REQ:
-                               io_str_read(rrdp, &uri);
-                               io_str_read(rrdp, &last_mod);
-                               rrdp_http_fetch(id, uri, last_mod);
-                               break;
-                       case RRDP_SESSION:
-                               io_str_read(rrdp, &s.session_id);
-                               io_simple_read(rrdp, &s.serial,
-                                   sizeof(s.serial));
-                               io_str_read(rrdp, &s.last_mod);
-                               rrdp_save_state(id, &s);
-                               free(s.session_id);
-                               free(s.last_mod);
-                               break;
-                       case RRDP_FILE:
-                               io_simple_read(rrdp, &pt, sizeof(pt));
-                               if (pt != PUB_ADD)
-                                       io_simple_read(rrdp, &hash,
-                                           sizeof(hash));
-                               io_str_read(rrdp, &uri);
-                               io_buf_read_alloc(rrdp, (void **)&data, &dsz);
-
-                               ok = rrdp_handle_file(id, pt, uri,
-                                   hash, sizeof(hash), data, dsz);
-                               rrdp_file_resp(id, ok);
-
-                               free(uri);
-                               free(data);
-                               break;
-                       default:
-                               errx(1, "unexpected rrdp response");
+                       b = io_buf_read(rrdp, &rrdpbuf);
+                       if (b != NULL) {
+                               rrdp_process(b);
+                               ibuf_free(b);
                        }
                }
 
@@ -1078,7 +1081,11 @@ main(int argc, char *argv[])
                 */
 
                if ((pfd[0].revents & POLLIN)) {
-                       entity_process(proc, &stats, &v, &b);
+                       b = io_buf_read(proc, &procbuf);
+                       if (b != NULL) {
+                               entity_process(b, &stats, &vrps, &brks);
+                               ibuf_free(b);
+                       }
                }
        }
 
@@ -1154,7 +1161,7 @@ main(int argc, char *argv[])
        if (fchdir(outdirfd) == -1)
                err(1, "fchdir output dir");
 
-       if (outputfiles(&v, &b, &stats))
+       if (outputfiles(&vrps, &brks, &stats))
                rc = 1;
 
 
Index: mft.c
===================================================================
RCS file: /cvs/src/usr.sbin/rpki-client/mft.c,v
retrieving revision 1.38
diff -u -p -r1.38 mft.c
--- mft.c       9 Sep 2021 14:15:49 -0000       1.38
+++ mft.c       23 Oct 2021 12:04:20 -0000
@@ -564,7 +564,7 @@ mft_buffer(struct ibuf *b, const struct 
  * Result must be passed to mft_free().
  */
 struct mft *
-mft_read(int fd)
+mft_read(struct ibuf *b)
 {
        struct mft      *p = NULL;
        size_t           i;
@@ -572,22 +572,22 @@ mft_read(int fd)
        if ((p = calloc(1, sizeof(struct mft))) == NULL)
                err(1, NULL);
 
-       io_simple_read(fd, &p->stale, sizeof(int));
-       io_str_read(fd, &p->file);
-       assert(p->file);
-       io_simple_read(fd, &p->filesz, sizeof(size_t));
+       io_read_buf(b, &p->stale, sizeof(int));
+       io_read_str(b, &p->file);
+       io_read_buf(b, &p->filesz, sizeof(size_t));
 
+       assert(p->file);
        if ((p->files = calloc(p->filesz, sizeof(struct mftfile))) == NULL)
                err(1, NULL);
 
        for (i = 0; i < p->filesz; i++) {
-               io_str_read(fd, &p->files[i].file);
-               io_simple_read(fd, p->files[i].hash, SHA256_DIGEST_LENGTH);
+               io_read_str(b, &p->files[i].file);
+               io_read_buf(b, p->files[i].hash, SHA256_DIGEST_LENGTH);
        }
 
-       io_str_read(fd, &p->aia);
-       io_str_read(fd, &p->aki);
-       io_str_read(fd, &p->ski);
+       io_read_str(b, &p->aia);
+       io_read_str(b, &p->aki);
+       io_read_str(b, &p->ski);
        assert(p->aia && p->aki && p->ski);
 
        return p;
Index: parser.c
===================================================================
RCS file: /cvs/src/usr.sbin/rpki-client/parser.c,v
retrieving revision 1.14
diff -u -p -r1.14 parser.c
--- parser.c    22 Oct 2021 11:13:06 -0000      1.14
+++ parser.c    23 Oct 2021 12:04:54 -0000
@@ -525,7 +525,7 @@ proc_parser(int fd)
        struct entityq   q;
        struct msgbuf    msgq;
        struct pollfd    pfd;
-       struct ibuf     *b;
+       struct ibuf     *b, *inbuf = NULL;
        X509_STORE_CTX  *ctx;
        struct auth_tree auths = RB_INITIALIZER(&auths);
        struct crl_tree  crlt = RB_INITIALIZER(&crlt);
@@ -545,8 +545,6 @@ proc_parser(int fd)
 
        pfd.fd = fd;
 
-       io_socket_nonblocking(pfd.fd);
-
        for (;;) {
                pfd.events = POLLIN;
                if (msgq.queued)
@@ -571,13 +569,16 @@ proc_parser(int fd)
                 */
 
                if ((pfd.revents & POLLIN)) {
-                       io_socket_blocking(fd);
-                       entp = calloc(1, sizeof(struct entity));
-                       if (entp == NULL)
-                               err(1, NULL);
-                       entity_read_req(fd, entp);
-                       TAILQ_INSERT_TAIL(&q, entp, entries);
-                       io_socket_nonblocking(fd);
+                       b = io_buf_read(fd, &inbuf);
+
+                       if (b != NULL) {
+                               entp = calloc(1, sizeof(struct entity));
+                               if (entp == NULL)
+                                       err(1, NULL);
+                               entity_read_req(b, entp);
+                               TAILQ_INSERT_TAIL(&q, entp, entries);
+                               ibuf_free(b);
+                       }
                }
 
                if (pfd.revents & POLLOUT) {
Index: roa.c
===================================================================
RCS file: /cvs/src/usr.sbin/rpki-client/roa.c,v
retrieving revision 1.26
diff -u -p -r1.26 roa.c
--- roa.c       7 Oct 2021 08:28:45 -0000       1.26
+++ roa.c       23 Oct 2021 12:05:47 -0000
@@ -448,7 +448,7 @@ roa_buffer(struct ibuf *b, const struct 
  * Result must be passed to roa_free().
  */
 struct roa *
-roa_read(int fd)
+roa_read(struct ibuf *b)
 {
        struct roa      *p;
        size_t           i;
@@ -456,26 +456,26 @@ roa_read(int fd)
        if ((p = calloc(1, sizeof(struct roa))) == NULL)
                err(1, NULL);
 
-       io_simple_read(fd, &p->valid, sizeof(int));
-       io_simple_read(fd, &p->asid, sizeof(uint32_t));
-       io_simple_read(fd, &p->ipsz, sizeof(size_t));
-       io_simple_read(fd, &p->expires, sizeof(time_t));
+       io_read_buf(b, &p->valid, sizeof(int));
+       io_read_buf(b, &p->asid, sizeof(uint32_t));
+       io_read_buf(b, &p->ipsz, sizeof(size_t));
+       io_read_buf(b, &p->expires, sizeof(time_t));
 
        if ((p->ips = calloc(p->ipsz, sizeof(struct roa_ip))) == NULL)
                err(1, NULL);
 
        for (i = 0; i < p->ipsz; i++) {
-               io_simple_read(fd, &p->ips[i].afi, sizeof(enum afi));
-               io_simple_read(fd, &p->ips[i].maxlength, sizeof(size_t));
-               io_simple_read(fd, &p->ips[i].min, sizeof(p->ips[i].min));
-               io_simple_read(fd, &p->ips[i].max, sizeof(p->ips[i].max));
-               ip_addr_read(fd, &p->ips[i].addr);
+               io_read_buf(b, &p->ips[i].afi, sizeof(enum afi));
+               io_read_buf(b, &p->ips[i].maxlength, sizeof(size_t));
+               io_read_buf(b, &p->ips[i].min, sizeof(p->ips[i].min));
+               io_read_buf(b, &p->ips[i].max, sizeof(p->ips[i].max));
+               ip_addr_read(b, &p->ips[i].addr);
        }
 
-       io_str_read(fd, &p->aia);
-       io_str_read(fd, &p->aki);
-       io_str_read(fd, &p->ski);
-       io_str_read(fd, &p->tal);
+       io_read_str(b, &p->aia);
+       io_read_str(b, &p->aki);
+       io_read_str(b, &p->ski);
+       io_read_str(b, &p->tal);
        assert(p->aia && p->aki && p->ski && p->tal);
 
        return p;
Index: rrdp.c
===================================================================
RCS file: /cvs/src/usr.sbin/rpki-client/rrdp.c,v
retrieving revision 1.12
diff -u -p -r1.12 rrdp.c
--- rrdp.c      22 Oct 2021 11:13:06 -0000      1.12
+++ rrdp.c      23 Oct 2021 12:06:53 -0000
@@ -378,32 +378,37 @@ rrdp_finished(struct rrdp *s)
 static void
 rrdp_input_handler(int fd)
 {
+       static struct ibuf *inbuf;
        char *local, *notify, *session_id, *last_mod;
+       struct ibuf *b;
        struct rrdp *s;
        enum rrdp_msg type;
        enum http_result res;
        long long serial;
-       size_t id, size;
-       int infd, ok;
+       size_t id;
+       int ok;
 
-       infd = io_recvfd(fd, &size, sizeof(size));
-       io_simple_read(fd, &type, sizeof(type));
-       io_simple_read(fd, &id, sizeof(id));
+       b = io_buf_recvfd(fd, &inbuf);
+       if (b == NULL)
+               return;
+
+       io_read_buf(b, &type, sizeof(type));
+       io_read_buf(b, &id, sizeof(id));
 
        switch (type) {
        case RRDP_START:
-               io_str_read(fd, &local);
-               io_str_read(fd, &notify);
-               io_str_read(fd, &session_id);
-               io_simple_read(fd, &serial, sizeof(serial));
-               io_str_read(fd, &last_mod);
-               if (infd != -1)
-                       errx(1, "received unexpected fd %d", infd);
+               io_read_str(b, &local);
+               io_read_str(b, &notify);
+               io_read_str(b, &session_id);
+               io_read_buf(b, &serial, sizeof(serial));
+               io_read_str(b, &last_mod);
+               if (b->fd != -1)
+                       errx(1, "received unexpected fd");
 
                s = rrdp_new(id, local, notify, session_id, serial, last_mod);
                break;
        case RRDP_HTTP_INI:
-               if (infd == -1)
+               if (b->fd == -1)
                        errx(1, "expected fd not received");
                s = rrdp_get(id);
                if (s == NULL)
@@ -411,13 +416,13 @@ rrdp_input_handler(int fd)
                if (s->state != RRDP_STATE_WAIT)
                        errx(1, "%s: bad internal state", s->local);
 
-               s->infd = infd;
+               s->infd = b->fd;
                s->state = RRDP_STATE_PARSE;
                break;
        case RRDP_HTTP_FIN:
-               io_simple_read(fd, &res, sizeof(res));
-               io_str_read(fd, &last_mod);
-               if (infd != -1)
+               io_read_buf(b, &res, sizeof(res));
+               io_read_str(b, &last_mod);
+               if (b->fd != -1)
                        errx(1, "received unexpected fd");
 
                s = rrdp_get(id);
@@ -435,12 +440,11 @@ rrdp_input_handler(int fd)
                s = rrdp_get(id);
                if (s == NULL)
                        errx(1, "rrdp session %zu does not exist", id);
-               if (infd != -1)
-                       errx(1, "received unexpected fd %d", infd);
-               io_simple_read(fd, &ok, sizeof(ok));
-               if (ok != 1) {
+               if (b->fd != -1)
+                       errx(1, "received unexpected fd");
+               io_read_buf(b, &ok, sizeof(ok));
+               if (ok != 1)
                        s->file_failed++;
-               }
                s->file_pending--;
                if (s->file_pending == 0)
                        rrdp_finished(s);
@@ -448,6 +452,7 @@ rrdp_input_handler(int fd)
        default:
                errx(1, "unexpected message %d", type);
        }
+       ibuf_free(b);
 }
 
 static void
@@ -558,14 +563,12 @@ proc_rrdp(int fd)
                if (pfds[0].revents & POLLHUP)
                        break;
                if (pfds[0].revents & POLLOUT) {
-                       io_socket_nonblocking(fd);
                        switch (msgbuf_write(&msgq)) {
                        case 0:
                                errx(1, "write: connection closed");
                        case -1:
                                err(1, "write");
                        }
-                       io_socket_blocking(fd);
                }
                if (pfds[0].revents & POLLIN)
                        rrdp_input_handler(fd);
Index: rsync.c
===================================================================
RCS file: /cvs/src/usr.sbin/rpki-client/rsync.c,v
retrieving revision 1.26
diff -u -p -r1.26 rsync.c
--- rsync.c     22 Oct 2021 11:13:06 -0000      1.26
+++ rsync.c     23 Oct 2021 12:07:41 -0000
@@ -120,6 +120,7 @@ proc_rsync(char *prog, char *bind_addr, 
        int                      rc = 0;
        struct pollfd            pfd;
        struct msgbuf            msgq;
+       struct ibuf             *b, *inbuf = NULL;
        sigset_t                 mask, oldmask;
        struct rsyncproc        *ids = NULL;
 
@@ -178,7 +179,7 @@ proc_rsync(char *prog, char *bind_addr, 
 
        for (;;) {
                char *uri = NULL, *dst = NULL;
-               size_t id, size;
+               size_t id;
                pid_t pid;
                int st;
 
@@ -198,7 +199,6 @@ proc_rsync(char *prog, char *bind_addr, 
                         */
 
                        while ((pid = waitpid(WAIT_ANY, &st, WNOHANG)) > 0) {
-                               struct ibuf *b;
                                int ok = 1;
 
                                for (i = 0; i < idsz; i++)
@@ -247,11 +247,17 @@ proc_rsync(char *prog, char *bind_addr, 
                if (!(pfd.revents & POLLIN))
                        continue;
 
+               b = io_buf_read(fd, &inbuf);
+               if (b == NULL)
+                       continue;
+
                /* Read host and module. */
-               io_simple_read(fd, &size, sizeof(size));
-               io_simple_read(fd, &id, sizeof(id));
-               io_str_read(fd, &dst);
-               io_str_read(fd, &uri);
+               io_read_buf(b, &id, sizeof(id));
+               io_read_str(b, &dst);
+               io_read_str(b, &uri);
+
+               ibuf_free(b);
+
                assert(dst);
                assert(uri);
 
Index: tal.c
===================================================================
RCS file: /cvs/src/usr.sbin/rpki-client/tal.c,v
retrieving revision 1.30
diff -u -p -r1.30 tal.c
--- tal.c       1 Apr 2021 06:43:23 -0000       1.30
+++ tal.c       23 Oct 2021 12:07:17 -0000
@@ -284,7 +284,7 @@ tal_buffer(struct ibuf *b, const struct 
  * A returned pointer must be freed with tal_free().
  */
 struct tal *
-tal_read(int fd)
+tal_read(struct ibuf *b)
 {
        size_t           i;
        struct tal      *p;
@@ -292,18 +292,18 @@ tal_read(int fd)
        if ((p = calloc(1, sizeof(struct tal))) == NULL)
                err(1, NULL);
 
-       io_buf_read_alloc(fd, (void **)&p->pkey, &p->pkeysz);
+       io_read_buf_alloc(b, (void **)&p->pkey, &p->pkeysz);
+       io_read_str(b, &p->descr);
+       io_read_buf(b, &p->urisz, sizeof(size_t));
        assert(p->pkeysz > 0);
-       io_str_read(fd, &p->descr);
        assert(p->descr);
-       io_simple_read(fd, &p->urisz, sizeof(size_t));
        assert(p->urisz > 0);
 
        if ((p->uri = calloc(p->urisz, sizeof(char *))) == NULL)
                err(1, NULL);
 
        for (i = 0; i < p->urisz; i++) {
-               io_str_read(fd, &p->uri[i]);
+               io_read_str(b, &p->uri[i]);
                assert(p->uri[i]);
        }
 

Reply via email to