This diff changes the io read functions to operate on ibufs. The poll loops now call io_buf_read() to consume data until a full message has been received, and only then is that message processed. As a result, the processes no longer block inside the io read functions while waiting for more data.
With this also the blocking/nonblocking dance done in a few places is no longer needed. Everything is now nonblocking. Further cleanup of the various marshaling functions can follow in a later step. -- :wq Claudio Index: cert.c =================================================================== RCS file: /cvs/src/usr.sbin/rpki-client/cert.c,v retrieving revision 1.39 diff -u -p -r1.39 cert.c --- cert.c 15 Oct 2021 22:30:33 -0000 1.39 +++ cert.c 23 Oct 2021 11:58:43 -0000 @@ -1281,33 +1281,31 @@ cert_buffer(struct ibuf *b, const struct } static void -cert_ip_read(int fd, struct cert_ip *p) +cert_ip_read(struct ibuf *b, struct cert_ip *p) { - - io_simple_read(fd, &p->afi, sizeof(enum afi)); - io_simple_read(fd, &p->type, sizeof(enum cert_ip_type)); + io_read_buf(b, &p->afi, sizeof(enum afi)); + io_read_buf(b, &p->type, sizeof(enum cert_ip_type)); if (p->type != CERT_IP_INHERIT) { - io_simple_read(fd, &p->min, sizeof(p->min)); - io_simple_read(fd, &p->max, sizeof(p->max)); + io_read_buf(b, &p->min, sizeof(p->min)); + io_read_buf(b, &p->max, sizeof(p->max)); } if (p->type == CERT_IP_RANGE) - ip_addr_range_read(fd, &p->range); + ip_addr_range_read(b, &p->range); else if (p->type == CERT_IP_ADDR) - ip_addr_read(fd, &p->ip); + ip_addr_read(b, &p->ip); } static void -cert_as_read(int fd, struct cert_as *p) +cert_as_read(struct ibuf *b, struct cert_as *p) { - - io_simple_read(fd, &p->type, sizeof(enum cert_as_type)); + io_read_buf(b, &p->type, sizeof(enum cert_as_type)); if (p->type == CERT_AS_RANGE) { - io_simple_read(fd, &p->range.min, sizeof(uint32_t)); - io_simple_read(fd, &p->range.max, sizeof(uint32_t)); + io_read_buf(b, &p->range.min, sizeof(uint32_t)); + io_read_buf(b, &p->range.max, sizeof(uint32_t)); } else if (p->type == CERT_AS_ID) - io_simple_read(fd, &p->id, sizeof(uint32_t)); + io_read_buf(b, &p->id, sizeof(uint32_t)); } /* @@ -1316,7 +1314,7 @@ cert_as_read(int fd, struct cert_as *p) * Always returns a valid pointer. 
*/ struct cert * -cert_read(int fd) +cert_read(struct ibuf *b) { struct cert *p; size_t i; @@ -1324,35 +1322,36 @@ cert_read(int fd) if ((p = calloc(1, sizeof(struct cert))) == NULL) err(1, NULL); - io_simple_read(fd, &p->valid, sizeof(int)); - io_simple_read(fd, &p->expires, sizeof(time_t)); - io_simple_read(fd, &p->purpose, sizeof(enum cert_purpose)); - io_simple_read(fd, &p->ipsz, sizeof(size_t)); + io_read_buf(b, &p->valid, sizeof(int)); + io_read_buf(b, &p->expires, sizeof(time_t)); + io_read_buf(b, &p->purpose, sizeof(enum cert_purpose)); + io_read_buf(b, &p->ipsz, sizeof(size_t)); + p->ips = calloc(p->ipsz, sizeof(struct cert_ip)); if (p->ips == NULL) err(1, NULL); for (i = 0; i < p->ipsz; i++) - cert_ip_read(fd, &p->ips[i]); + cert_ip_read(b, &p->ips[i]); - io_simple_read(fd, &p->asz, sizeof(size_t)); + io_read_buf(b, &p->asz, sizeof(size_t)); p->as = calloc(p->asz, sizeof(struct cert_as)); if (p->as == NULL) err(1, NULL); for (i = 0; i < p->asz; i++) - cert_as_read(fd, &p->as[i]); + cert_as_read(b, &p->as[i]); + + io_read_str(b, &p->mft); + io_read_str(b, &p->notify); + io_read_str(b, &p->repo); + io_read_str(b, &p->crl); + io_read_str(b, &p->aia); + io_read_str(b, &p->aki); + io_read_str(b, &p->ski); + io_read_str(b, &p->tal); + io_read_str(b, &p->pubkey); - io_str_read(fd, &p->mft); assert(p->mft != NULL || p->purpose == CERT_PURPOSE_BGPSEC_ROUTER); - io_str_read(fd, &p->notify); - io_str_read(fd, &p->repo); - io_str_read(fd, &p->crl); - io_str_read(fd, &p->aia); - io_str_read(fd, &p->aki); - io_str_read(fd, &p->ski); assert(p->ski); - io_str_read(fd, &p->tal); - io_str_read(fd, &p->pubkey); - return p; } Index: extern.h =================================================================== RCS file: /cvs/src/usr.sbin/rpki-client/extern.h,v retrieving revision 1.73 diff -u -p -r1.73 extern.h --- extern.h 22 Oct 2021 11:13:06 -0000 1.73 +++ extern.h 23 Oct 2021 12:03:19 -0000 @@ -399,25 +399,25 @@ void tal_buffer(struct ibuf *, const s void tal_free(struct 
tal *); struct tal *tal_parse(const char *, char *); char *tal_read_file(const char *); -struct tal *tal_read(int); +struct tal *tal_read(struct ibuf *); void cert_buffer(struct ibuf *, const struct cert *); void cert_free(struct cert *); struct cert *cert_parse(X509 **, const char *); struct cert *ta_parse(X509 **, const char *, const unsigned char *, size_t); -struct cert *cert_read(int); +struct cert *cert_read(struct ibuf *); void cert_insert_brks(struct brk_tree *, struct cert *); void mft_buffer(struct ibuf *, const struct mft *); void mft_free(struct mft *); struct mft *mft_parse(X509 **, const char *); int mft_check(const char *, struct mft *); -struct mft *mft_read(int); +struct mft *mft_read(struct ibuf *); void roa_buffer(struct ibuf *, const struct roa *); void roa_free(struct roa *); struct roa *roa_parse(X509 **, const char *); -struct roa *roa_read(int); +struct roa *roa_read(struct ibuf *); void roa_insert_vrps(struct vrp_tree *, struct roa *, size_t *, size_t *); @@ -460,8 +460,8 @@ void ip_addr_print(const struct ip_add void ip_addr_buffer(struct ibuf *, const struct ip_addr *); void ip_addr_range_buffer(struct ibuf *, const struct ip_addr_range *); -void ip_addr_read(int, struct ip_addr *); -void ip_addr_range_read(int, struct ip_addr_range *); +void ip_addr_read(struct ibuf *, struct ip_addr *); +void ip_addr_range_read(struct ibuf *, struct ip_addr_range *); int ip_addr_cmp(const struct ip_addr *, const struct ip_addr *); int ip_addr_check_overlap(const struct cert_ip *, const char *, const struct cert_ip *, size_t); @@ -480,7 +480,7 @@ int as_check_covered(uint32_t, uint32_ /* Parser-specific */ void entity_free(struct entity *); -void entity_read_req(int fd, struct entity *); +void entity_read_req(struct ibuf *, struct entity *); void entityq_flush(struct entityq *, struct repo *); void proc_parser(int) __attribute__((noreturn)); @@ -535,8 +535,6 @@ char *hex_encode(const unsigned char *, /* Functions for moving data between processes. 
*/ -void io_socket_blocking(int); -void io_socket_nonblocking(int); struct ibuf *io_buf_new(void); void io_simple_buffer(struct ibuf *, const void *, size_t); void io_buf_buffer(struct ibuf *, const void *, size_t); @@ -545,7 +543,11 @@ void io_buf_close(struct msgbuf *, str void io_simple_read(int, void *, size_t); void io_buf_read_alloc(int, void **, size_t *); void io_str_read(int, char **); -int io_recvfd(int, void *, size_t); +void io_read_buf(struct ibuf *, void *, size_t); +void io_read_str(struct ibuf *, char **); +void io_read_buf_alloc(struct ibuf *, void **, size_t *); +struct ibuf *io_buf_read(int, struct ibuf **); +struct ibuf *io_buf_recvfd(int, struct ibuf **); /* X509 helpers. */ Index: http.c =================================================================== RCS file: /cvs/src/usr.sbin/rpki-client/http.c,v retrieving revision 1.43 diff -u -p -r1.43 http.c --- http.c 22 Oct 2021 11:13:06 -0000 1.43 +++ http.c 23 Oct 2021 11:59:14 -0000 @@ -1769,6 +1769,7 @@ proc_http(char *bind_addr, int fd) struct pollfd pfds[NPFDS]; struct http_connection *conn, *nc; struct http_request *req, *nr; + struct ibuf *b, *inbuf = NULL; if (bind_addr != NULL) { struct addrinfo hints, *res; @@ -1859,18 +1860,20 @@ proc_http(char *bind_addr, int fd) } } if (pfds[0].revents & POLLIN) { - size_t id, size; - int outfd; - char *uri; - char *mod; - - outfd = io_recvfd(fd, &size, sizeof(size)); - io_simple_read(fd, &id, sizeof(id)); - io_str_read(fd, &uri); - io_str_read(fd, &mod); - - /* queue up new requests */ - http_req_new(id, uri, mod, outfd); + b = io_buf_recvfd(fd, &inbuf); + if (b != NULL) { + size_t id; + char *uri; + char *mod; + + io_read_buf(b, &id, sizeof(id)); + io_read_str(b, &uri); + io_read_str(b, &mod); + + /* queue up new requests */ + http_req_new(id, uri, mod, b->fd); + ibuf_free(b); + } } now = getmonotime(); Index: io.c =================================================================== RCS file: /cvs/src/usr.sbin/rpki-client/io.c,v retrieving revision 
1.14 diff -u -p -r1.14 io.c --- io.c 22 Oct 2021 11:13:06 -0000 1.14 +++ io.c 23 Oct 2021 12:13:41 -0000 @@ -1,4 +1,4 @@ -/* $OpenBSD: io.c,v 1.14 2021/10/22 11:13:06 claudio Exp $ */ +/* $OpenBSD: io.c,v 1.13 2021/03/04 13:01:41 claudio Exp $ */ /* * Copyright (c) 2019 Kristaps Dzonsons <krist...@bsd.lv> * @@ -30,28 +30,6 @@ #include "extern.h" -void -io_socket_blocking(int fd) -{ - int fl; - - if ((fl = fcntl(fd, F_GETFL, 0)) == -1) - err(1, "fcntl"); - if (fcntl(fd, F_SETFL, fl & ~O_NONBLOCK) == -1) - err(1, "fcntl"); -} - -void -io_socket_nonblocking(int fd) -{ - int fl; - - if ((fl = fcntl(fd, F_GETFL, 0)) == -1) - err(1, "fcntl"); - if (fcntl(fd, F_SETFL, fl | O_NONBLOCK) == -1) - err(1, "fcntl"); -} - /* * Create new io buffer, call io_close() when done with it. * Function always returns a new buffer. @@ -109,80 +87,148 @@ io_buf_close(struct msgbuf *msgbuf, stru { size_t len; - len = ibuf_size(b); + len = ibuf_size(b) - sizeof(len); memcpy(ibuf_seek(b, 0, sizeof(len)), &len, sizeof(len)); ibuf_close(msgbuf, b); } /* - * Read of a binary buffer that must be on a blocking descriptor. + * Read of an ibuf and extract sz byte from there. * Does nothing if "sz" is zero. - * This will fail and exit on EOF. + * Return 1 on success or 0 if there was not enough data. */ void -io_simple_read(int fd, void *res, size_t sz) +io_read_buf(struct ibuf *b, void *res, size_t sz) { - ssize_t ssz; char *tmp; - tmp = res; /* arithmetic on a pointer to void is a GNU extension */ -again: if (sz == 0) return; - if ((ssz = read(fd, tmp, sz)) == -1) - err(1, "read"); - else if (ssz == 0) - errx(1, "read: unexpected end of file"); - else if ((size_t)ssz == sz) + tmp = ibuf_seek(b, b->rpos, sz); + if (tmp == NULL) + errx(1, "bad internal framing, buffer too short"); + b->rpos += sz; + memcpy(res, tmp, sz); +} + +/* + * Read a string (returns NULL for zero-length strings), allocating + * space for it. + * Return 1 on success or 0 if there was not enough data. 
+ */ +void +io_read_str(struct ibuf *b, char **res) +{ + size_t sz; + + io_read_buf(b, &sz, sizeof(sz)); + if (sz == 0) { + *res = NULL; return; - sz -= ssz; - tmp += ssz; - goto again; + } + if ((*res = calloc(sz + 1, 1)) == NULL) + err(1, NULL); + io_read_buf(b, *res, sz); } /* * Read a binary buffer, allocating space for it. * If the buffer is zero-sized, this won't allocate "res", but * will still initialise it to NULL. + * Return 1 on success or 0 if there was not enough data. */ void -io_buf_read_alloc(int fd, void **res, size_t *sz) +io_read_buf_alloc(struct ibuf *b, void **res, size_t *sz) { - *res = NULL; - io_simple_read(fd, sz, sizeof(size_t)); + io_read_buf(b, sz, sizeof(sz)); if (*sz == 0) return; if ((*res = malloc(*sz)) == NULL) err(1, NULL); - io_simple_read(fd, *res, *sz); + io_read_buf(b, *res, *sz); +} + +/* XXX copy from imsg-buffer.c */ +static int +ibuf_realloc(struct ibuf *buf, size_t len) +{ + unsigned char *b; + + /* on static buffers max is eq size and so the following fails */ + if (buf->wpos + len > buf->max) { + errno = ERANGE; + return (-1); + } + + b = recallocarray(buf->buf, buf->size, buf->wpos + len, 1); + if (b == NULL) + return (-1); + buf->buf = b; + buf->size = buf->wpos + len; + + return (0); } /* - * Read a string (returns NULL for zero-length strings), allocating - * space for it. + * Read once and fill a ibuf until it is finished. + * Returns NULL if more data is needed, returns a full ibuf once + * all data is received. 
*/ -void -io_str_read(int fd, char **res) +struct ibuf * +io_buf_read(int fd, struct ibuf **ib) { - size_t sz; + struct ibuf *b = *ib; + ssize_t n; + size_t sz; - io_simple_read(fd, &sz, sizeof(size_t)); - if (sz == 0) { - *res = NULL; - return; + /* if ibuf == NULL allocate a new buffer */ + if (b == NULL) { + if ((b = ibuf_dynamic(sizeof(sz), INT32_MAX)) == NULL) + err(1, NULL); + *ib = b; } - if ((*res = calloc(sz + 1, 1)) == NULL) - err(1, NULL); - io_simple_read(fd, *res, sz); + + /* read some data */ + while ((n = read(fd, b->buf + b->wpos, b->size - b->wpos)) == -1) { + if (errno == EINTR) + continue; + err(1, "read"); + } + + if (n == 0) + errx(1, "read: unexpected end of file"); + b->wpos += n; + + /* got full message */ + if (b->wpos == b->size) { + /* only header received */ + if (b->wpos == sizeof(sz)) { + memcpy(&sz, b->buf, sizeof(sz)); + if (sz == 0 || sz > INT32_MAX) + errx(1, "bad internal framing, bad size"); + if (ibuf_realloc(b, sz) == -1) + err(1, "ibuf_realloc"); + return NULL; + } + + /* skip over initial size header */ + b->rpos += sizeof(sz); + *ib = NULL; + return b; + } + + return NULL; } + /* * Read data from socket but receive a file descriptor at the same time. 
*/ -int -io_recvfd(int fd, void *res, size_t sz) +struct ibuf * +io_buf_recvfd(int fd, struct ibuf **ib) { + struct ibuf *b = *ib; struct iovec iov; struct msghdr msg; struct cmsghdr *cmsg; @@ -190,15 +236,22 @@ io_recvfd(int fd, void *res, size_t sz) struct cmsghdr hdr; char buf[CMSG_SPACE(sizeof(int))]; } cmsgbuf; - int outfd = -1; - char *b = res; ssize_t n; + size_t sz; + /* fd are only passed on the head, just use regular read afterwards */ + if (b != NULL) + return io_buf_read(fd, ib); + + if ((b = ibuf_dynamic(sizeof(sz), INT32_MAX)) == NULL) + err(1, NULL); + *ib = b; + memset(&msg, 0, sizeof(msg)); memset(&cmsgbuf, 0, sizeof(cmsgbuf)); - iov.iov_base = res; - iov.iov_len = sz; + iov.iov_base = b->buf; + iov.iov_len = b->size; msg.msg_iov = &iov; msg.msg_iovlen = 1; @@ -225,29 +278,32 @@ io_recvfd(int fd, void *res, size_t sz) for (i = 0; i < j; i++) { f = ((int *)CMSG_DATA(cmsg))[i]; if (i == 0) - outfd = f; + b->fd = f; else close(f); } } } - b += n; - sz -= n; - while (sz > 0) { - /* short receive */ - n = recv(fd, b, sz, 0); - if (n == -1) { - if (errno == EINTR) - continue; - err(1, "recv"); + b->wpos += n; + + /* got full message */ + if (b->wpos == b->size) { + /* only header received */ + if (b->wpos == sizeof(sz)) { + memcpy(&sz, b->buf, sizeof(sz)); + if (sz == 0 || sz > INT32_MAX) + errx(1, "read: bad internal framing, %zu", sz); + if (ibuf_realloc(b, sz) == -1) + err(1, "ibuf_realloc"); + return NULL; } - if (n == 0) - errx(1, "recv: unexpected end of file"); - b += n; - sz -= n; + /* skip over initial size header */ + b->rpos += sizeof(sz); + *ib = NULL; + return b; } - return outfd; + return NULL; } Index: ip.c =================================================================== RCS file: /cvs/src/usr.sbin/rpki-client/ip.c,v retrieving revision 1.17 diff -u -p -r1.17 ip.c --- ip.c 19 Apr 2021 17:04:35 -0000 1.17 +++ ip.c 23 Oct 2021 11:59:43 -0000 @@ -314,14 +314,14 @@ ip_addr_range_buffer(struct ibuf *b, con * Matched with ip_addr_buffer(). 
*/ void -ip_addr_read(int fd, struct ip_addr *p) +ip_addr_read(struct ibuf *b, struct ip_addr *p) { size_t sz; - io_simple_read(fd, &p->prefixlen, sizeof(unsigned char)); + io_read_buf(b, &p->prefixlen, sizeof(unsigned char)); sz = PREFIX_SIZE(p->prefixlen); assert(sz <= 16); - io_simple_read(fd, p->addr, sz); + io_read_buf(b, p->addr, sz); } /* @@ -329,11 +329,10 @@ ip_addr_read(int fd, struct ip_addr *p) * Matched with ip_addr_range_buffer(). */ void -ip_addr_range_read(int fd, struct ip_addr_range *p) +ip_addr_range_read(struct ibuf *b, struct ip_addr_range *p) { - - ip_addr_read(fd, &p->min); - ip_addr_read(fd, &p->max); + ip_addr_read(b, &p->min); + ip_addr_read(b, &p->max); } /* Index: main.c =================================================================== RCS file: /cvs/src/usr.sbin/rpki-client/main.c,v retrieving revision 1.150 diff -u -p -r1.150 main.c --- main.c 22 Oct 2021 11:13:06 -0000 1.150 +++ main.c 23 Oct 2021 12:03:01 -0000 @@ -99,17 +99,14 @@ entity_free(struct entity *ent) * The pointer must be passed entity_free(). */ void -entity_read_req(int fd, struct entity *ent) +entity_read_req(struct ibuf *b, struct entity *ent) { - size_t size; - - io_simple_read(fd, &size, sizeof(size)); - io_simple_read(fd, &ent->type, sizeof(enum rtype)); - io_str_read(fd, &ent->file); - io_simple_read(fd, &ent->has_pkey, sizeof(int)); + io_read_buf(b, &ent->type, sizeof(ent->type)); + io_read_str(b, &ent->file); + io_read_buf(b, &ent->has_pkey, sizeof(ent->has_pkey)); if (ent->has_pkey) - io_buf_read_alloc(fd, (void **)&ent->pkey, &ent->pkeysz); - io_str_read(fd, &ent->descr); + io_read_buf_alloc(b, (void **)&ent->pkey, &ent->pkeysz); + io_read_str(b, &ent->descr); } /* @@ -459,7 +456,7 @@ queue_add_from_cert(const struct cert *c * In all cases, we gather statistics. 
*/ static void -entity_process(int proc, struct stats *st, struct vrp_tree *tree, +entity_process(struct ibuf *b, struct stats *st, struct vrp_tree *tree, struct brk_tree *brktree) { enum rtype type; @@ -467,7 +464,6 @@ entity_process(int proc, struct stats *s struct cert *cert; struct mft *mft; struct roa *roa; - size_t size; int c; /* @@ -476,24 +472,23 @@ entity_process(int proc, struct stats *s * certificate, for example). * We follow that up with whether the resources didn't parse. */ - io_simple_read(proc, &size, sizeof(size)); - io_simple_read(proc, &type, sizeof(type)); + io_read_buf(b, &type, sizeof(type)); switch (type) { case RTYPE_TAL: st->tals++; - tal = tal_read(proc); + tal = tal_read(b); queue_add_from_tal(tal); tal_free(tal); break; case RTYPE_CER: st->certs++; - io_simple_read(proc, &c, sizeof(int)); + io_read_buf(b, &c, sizeof(c)); if (c == 0) { st->certs_fail++; break; } - cert = cert_read(proc); + cert = cert_read(b); if (cert->purpose == CERT_PURPOSE_CA) { if (cert->valid) { /* @@ -517,12 +512,12 @@ entity_process(int proc, struct stats *s break; case RTYPE_MFT: st->mfts++; - io_simple_read(proc, &c, sizeof(int)); + io_read_buf(b, &c, sizeof(c)); if (c == 0) { st->mfts_fail++; break; } - mft = mft_read(proc); + mft = mft_read(b); if (mft->stale) st->mfts_stale++; queue_add_from_mft_set(mft); @@ -533,12 +528,12 @@ entity_process(int proc, struct stats *s break; case RTYPE_ROA: st->roas++; - io_simple_read(proc, &c, sizeof(int)); + io_read_buf(b, &c, sizeof(c)); if (c == 0) { st->roas_fail++; break; } - roa = roa_read(proc); + roa = roa_read(b); if (roa->valid) roa_insert_vrps(tree, roa, &st->vrps, &st->uniqs); else @@ -555,6 +550,57 @@ entity_process(int proc, struct stats *s entity_queue--; } +static void +rrdp_process(struct ibuf *b) +{ + enum rrdp_msg type; + enum publish_type pt; + struct rrdp_session s; + char *uri, *last_mod, *data; + char hash[SHA256_DIGEST_LENGTH]; + size_t dsz, id; + int ok; + + io_read_buf(b, &type, sizeof(type)); + 
io_read_buf(b, &id, sizeof(id)); + + switch (type) { + case RRDP_END: + io_read_buf(b, &ok, sizeof(ok)); + rrdp_finish(id, ok); + break; + case RRDP_HTTP_REQ: + io_read_str(b, &uri); + io_read_str(b, &last_mod); + rrdp_http_fetch(id, uri, last_mod); + break; + case RRDP_SESSION: + io_read_str(b, &s.session_id); + io_read_buf(b, &s.serial, sizeof(s.serial)); + io_read_str(b, &s.last_mod); + rrdp_save_state(id, &s); + free(s.session_id); + free(s.last_mod); + break; + case RRDP_FILE: + io_read_buf(b, &pt, sizeof(pt)); + if (pt != PUB_ADD) + io_read_buf(b, &hash, sizeof(hash)); + io_read_str(b, &uri); + io_read_buf_alloc(b, (void **)&data, &dsz); + + ok = rrdp_handle_file(id, pt, uri, hash, sizeof(hash), + data, dsz); + rrdp_file_resp(id, ok); + + free(uri); + free(data); + break; + default: + errx(1, "unexpected rrdp response"); + } +} + /* * Assign filenames ending in ".tal" in "/etc/rpki" into "tals", * returning the number of files found and filled-in. @@ -623,19 +669,21 @@ suicide(int sig __attribute__((unused))) int main(int argc, char *argv[]) { - int rc, c, st, proc, rsync, http, rrdp, ok, - hangup = 0, fl = SOCK_STREAM | SOCK_CLOEXEC; - size_t i, id, talsz = 0, size; + int rc, c, st, proc, rsync, http, rrdp, ok, hangup = 0; + int fl = SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK; + size_t i, id, talsz = 0; pid_t pid, procpid, rsyncpid, httppid, rrdppid; int fd[2]; struct pollfd pfd[NPFD]; struct msgbuf *queues[NPFD]; + struct ibuf *b, *httpbuf = NULL, *procbuf = NULL; + struct ibuf *rrdpbuf = NULL, *rsyncbuf = NULL; char *rsync_prog = "openrsync"; char *bind_addr = NULL; const char *cachedir = NULL, *outputdir = NULL; const char *tals[TALSZ_MAX], *errs, *name; - struct vrp_tree v = RB_INITIALIZER(&v); - struct brk_tree b = RB_INITIALIZER(&b); + struct vrp_tree vrps = RB_INITIALIZER(&vrps); + struct brk_tree brks = RB_INITIALIZER(&brks); struct rusage ru; struct timeval start_time, now_time; @@ -972,12 +1020,6 @@ main(int argc, char *argv[]) hangup = 1; } if 
(pfd[i].revents & POLLOUT) { - /* - * XXX work around deadlocks because of - * blocking read vs non-blocking writes. - */ - if (i > 1) - io_socket_nonblocking(pfd[i].fd); switch (msgbuf_write(queues[i])) { case 0: errx(1, "write[%zu]: " @@ -985,8 +1027,6 @@ main(int argc, char *argv[]) case -1: err(1, "write[%zu]", i); } - if (i > 1) - io_socket_blocking(pfd[i].fd); } } if (hangup) @@ -1000,75 +1040,38 @@ main(int argc, char *argv[]) */ if ((pfd[1].revents & POLLIN)) { - io_simple_read(rsync, &size, sizeof(size)); - io_simple_read(rsync, &id, sizeof(id)); - io_simple_read(rsync, &ok, sizeof(ok)); - rsync_finish(id, ok); + b = io_buf_read(rsync, &rsyncbuf); + if (b != NULL) { + io_read_buf(b, &id, sizeof(id)); + io_read_buf(b, &ok, sizeof(ok)); + rsync_finish(id, ok); + ibuf_free(b); + } } if ((pfd[2].revents & POLLIN)) { - enum http_result res; - char *last_mod; - - io_simple_read(http, &size, sizeof(size)); - io_simple_read(http, &id, sizeof(id)); - io_simple_read(http, &res, sizeof(res)); - io_str_read(http, &last_mod); - http_finish(id, res, last_mod); - free(last_mod); + b = io_buf_read(http, &httpbuf); + if (b != NULL) { + enum http_result res; + char *last_mod; + + io_read_buf(b, &id, sizeof(id)); + io_read_buf(b, &res, sizeof(res)); + io_read_str(b, &last_mod); + http_finish(id, res, last_mod); + free(last_mod); + ibuf_free(b); + } } /* * Handle RRDP requests here. 
*/ if ((pfd[3].revents & POLLIN)) { - enum rrdp_msg type; - enum publish_type pt; - struct rrdp_session s; - char *uri, *last_mod, *data; - char hash[SHA256_DIGEST_LENGTH]; - size_t dsz; - - io_simple_read(rrdp, &size, sizeof(size)); - io_simple_read(rrdp, &type, sizeof(type)); - io_simple_read(rrdp, &id, sizeof(id)); - - switch (type) { - case RRDP_END: - io_simple_read(rrdp, &ok, sizeof(ok)); - rrdp_finish(id, ok); - break; - case RRDP_HTTP_REQ: - io_str_read(rrdp, &uri); - io_str_read(rrdp, &last_mod); - rrdp_http_fetch(id, uri, last_mod); - break; - case RRDP_SESSION: - io_str_read(rrdp, &s.session_id); - io_simple_read(rrdp, &s.serial, - sizeof(s.serial)); - io_str_read(rrdp, &s.last_mod); - rrdp_save_state(id, &s); - free(s.session_id); - free(s.last_mod); - break; - case RRDP_FILE: - io_simple_read(rrdp, &pt, sizeof(pt)); - if (pt != PUB_ADD) - io_simple_read(rrdp, &hash, - sizeof(hash)); - io_str_read(rrdp, &uri); - io_buf_read_alloc(rrdp, (void **)&data, &dsz); - - ok = rrdp_handle_file(id, pt, uri, - hash, sizeof(hash), data, dsz); - rrdp_file_resp(id, ok); - - free(uri); - free(data); - break; - default: - errx(1, "unexpected rrdp response"); + b = io_buf_read(rrdp, &rrdpbuf); + if (b != NULL) { + rrdp_process(b); + ibuf_free(b); } } @@ -1078,7 +1081,11 @@ main(int argc, char *argv[]) */ if ((pfd[0].revents & POLLIN)) { - entity_process(proc, &stats, &v, &b); + b = io_buf_read(proc, &procbuf); + if (b != NULL) { + entity_process(b, &stats, &vrps, &brks); + ibuf_free(b); + } } } @@ -1154,7 +1161,7 @@ main(int argc, char *argv[]) if (fchdir(outdirfd) == -1) err(1, "fchdir output dir"); - if (outputfiles(&v, &b, &stats)) + if (outputfiles(&vrps, &brks, &stats)) rc = 1; Index: mft.c =================================================================== RCS file: /cvs/src/usr.sbin/rpki-client/mft.c,v retrieving revision 1.38 diff -u -p -r1.38 mft.c --- mft.c 9 Sep 2021 14:15:49 -0000 1.38 +++ mft.c 23 Oct 2021 12:04:20 -0000 @@ -564,7 +564,7 @@ mft_buffer(struct 
ibuf *b, const struct * Result must be passed to mft_free(). */ struct mft * -mft_read(int fd) +mft_read(struct ibuf *b) { struct mft *p = NULL; size_t i; @@ -572,22 +572,22 @@ mft_read(int fd) if ((p = calloc(1, sizeof(struct mft))) == NULL) err(1, NULL); - io_simple_read(fd, &p->stale, sizeof(int)); - io_str_read(fd, &p->file); - assert(p->file); - io_simple_read(fd, &p->filesz, sizeof(size_t)); + io_read_buf(b, &p->stale, sizeof(int)); + io_read_str(b, &p->file); + io_read_buf(b, &p->filesz, sizeof(size_t)); + assert(p->file); if ((p->files = calloc(p->filesz, sizeof(struct mftfile))) == NULL) err(1, NULL); for (i = 0; i < p->filesz; i++) { - io_str_read(fd, &p->files[i].file); - io_simple_read(fd, p->files[i].hash, SHA256_DIGEST_LENGTH); + io_read_str(b, &p->files[i].file); + io_read_buf(b, p->files[i].hash, SHA256_DIGEST_LENGTH); } - io_str_read(fd, &p->aia); - io_str_read(fd, &p->aki); - io_str_read(fd, &p->ski); + io_read_str(b, &p->aia); + io_read_str(b, &p->aki); + io_read_str(b, &p->ski); assert(p->aia && p->aki && p->ski); return p; Index: parser.c =================================================================== RCS file: /cvs/src/usr.sbin/rpki-client/parser.c,v retrieving revision 1.14 diff -u -p -r1.14 parser.c --- parser.c 22 Oct 2021 11:13:06 -0000 1.14 +++ parser.c 23 Oct 2021 12:04:54 -0000 @@ -525,7 +525,7 @@ proc_parser(int fd) struct entityq q; struct msgbuf msgq; struct pollfd pfd; - struct ibuf *b; + struct ibuf *b, *inbuf = NULL; X509_STORE_CTX *ctx; struct auth_tree auths = RB_INITIALIZER(&auths); struct crl_tree crlt = RB_INITIALIZER(&crlt); @@ -545,8 +545,6 @@ proc_parser(int fd) pfd.fd = fd; - io_socket_nonblocking(pfd.fd); - for (;;) { pfd.events = POLLIN; if (msgq.queued) @@ -571,13 +569,16 @@ proc_parser(int fd) */ if ((pfd.revents & POLLIN)) { - io_socket_blocking(fd); - entp = calloc(1, sizeof(struct entity)); - if (entp == NULL) - err(1, NULL); - entity_read_req(fd, entp); - TAILQ_INSERT_TAIL(&q, entp, entries); - 
io_socket_nonblocking(fd); + b = io_buf_read(fd, &inbuf); + + if (b != NULL) { + entp = calloc(1, sizeof(struct entity)); + if (entp == NULL) + err(1, NULL); + entity_read_req(b, entp); + TAILQ_INSERT_TAIL(&q, entp, entries); + ibuf_free(b); + } } if (pfd.revents & POLLOUT) { Index: roa.c =================================================================== RCS file: /cvs/src/usr.sbin/rpki-client/roa.c,v retrieving revision 1.26 diff -u -p -r1.26 roa.c --- roa.c 7 Oct 2021 08:28:45 -0000 1.26 +++ roa.c 23 Oct 2021 12:05:47 -0000 @@ -448,7 +448,7 @@ roa_buffer(struct ibuf *b, const struct * Result must be passed to roa_free(). */ struct roa * -roa_read(int fd) +roa_read(struct ibuf *b) { struct roa *p; size_t i; @@ -456,26 +456,26 @@ roa_read(int fd) if ((p = calloc(1, sizeof(struct roa))) == NULL) err(1, NULL); - io_simple_read(fd, &p->valid, sizeof(int)); - io_simple_read(fd, &p->asid, sizeof(uint32_t)); - io_simple_read(fd, &p->ipsz, sizeof(size_t)); - io_simple_read(fd, &p->expires, sizeof(time_t)); + io_read_buf(b, &p->valid, sizeof(int)); + io_read_buf(b, &p->asid, sizeof(uint32_t)); + io_read_buf(b, &p->ipsz, sizeof(size_t)); + io_read_buf(b, &p->expires, sizeof(time_t)); if ((p->ips = calloc(p->ipsz, sizeof(struct roa_ip))) == NULL) err(1, NULL); for (i = 0; i < p->ipsz; i++) { - io_simple_read(fd, &p->ips[i].afi, sizeof(enum afi)); - io_simple_read(fd, &p->ips[i].maxlength, sizeof(size_t)); - io_simple_read(fd, &p->ips[i].min, sizeof(p->ips[i].min)); - io_simple_read(fd, &p->ips[i].max, sizeof(p->ips[i].max)); - ip_addr_read(fd, &p->ips[i].addr); + io_read_buf(b, &p->ips[i].afi, sizeof(enum afi)); + io_read_buf(b, &p->ips[i].maxlength, sizeof(size_t)); + io_read_buf(b, &p->ips[i].min, sizeof(p->ips[i].min)); + io_read_buf(b, &p->ips[i].max, sizeof(p->ips[i].max)); + ip_addr_read(b, &p->ips[i].addr); } - io_str_read(fd, &p->aia); - io_str_read(fd, &p->aki); - io_str_read(fd, &p->ski); - io_str_read(fd, &p->tal); + io_read_str(b, &p->aia); + io_read_str(b, 
&p->aki); + io_read_str(b, &p->ski); + io_read_str(b, &p->tal); assert(p->aia && p->aki && p->ski && p->tal); return p; Index: rrdp.c =================================================================== RCS file: /cvs/src/usr.sbin/rpki-client/rrdp.c,v retrieving revision 1.12 diff -u -p -r1.12 rrdp.c --- rrdp.c 22 Oct 2021 11:13:06 -0000 1.12 +++ rrdp.c 23 Oct 2021 12:06:53 -0000 @@ -378,32 +378,37 @@ rrdp_finished(struct rrdp *s) static void rrdp_input_handler(int fd) { + static struct ibuf *inbuf; char *local, *notify, *session_id, *last_mod; + struct ibuf *b; struct rrdp *s; enum rrdp_msg type; enum http_result res; long long serial; - size_t id, size; - int infd, ok; + size_t id; + int ok; - infd = io_recvfd(fd, &size, sizeof(size)); - io_simple_read(fd, &type, sizeof(type)); - io_simple_read(fd, &id, sizeof(id)); + b = io_buf_recvfd(fd, &inbuf); + if (b == NULL) + return; + + io_read_buf(b, &type, sizeof(type)); + io_read_buf(b, &id, sizeof(id)); switch (type) { case RRDP_START: - io_str_read(fd, &local); - io_str_read(fd, ¬ify); - io_str_read(fd, &session_id); - io_simple_read(fd, &serial, sizeof(serial)); - io_str_read(fd, &last_mod); - if (infd != -1) - errx(1, "received unexpected fd %d", infd); + io_read_str(b, &local); + io_read_str(b, ¬ify); + io_read_str(b, &session_id); + io_read_buf(b, &serial, sizeof(serial)); + io_read_str(b, &last_mod); + if (b->fd != -1) + errx(1, "received unexpected fd"); s = rrdp_new(id, local, notify, session_id, serial, last_mod); break; case RRDP_HTTP_INI: - if (infd == -1) + if (b->fd == -1) errx(1, "expected fd not received"); s = rrdp_get(id); if (s == NULL) @@ -411,13 +416,13 @@ rrdp_input_handler(int fd) if (s->state != RRDP_STATE_WAIT) errx(1, "%s: bad internal state", s->local); - s->infd = infd; + s->infd = b->fd; s->state = RRDP_STATE_PARSE; break; case RRDP_HTTP_FIN: - io_simple_read(fd, &res, sizeof(res)); - io_str_read(fd, &last_mod); - if (infd != -1) + io_read_buf(b, &res, sizeof(res)); + io_read_str(b, 
&last_mod); + if (b->fd != -1) errx(1, "received unexpected fd"); s = rrdp_get(id); @@ -435,12 +440,11 @@ rrdp_input_handler(int fd) s = rrdp_get(id); if (s == NULL) errx(1, "rrdp session %zu does not exist", id); - if (infd != -1) - errx(1, "received unexpected fd %d", infd); - io_simple_read(fd, &ok, sizeof(ok)); - if (ok != 1) { + if (b->fd != -1) + errx(1, "received unexpected fd"); + io_read_buf(b, &ok, sizeof(ok)); + if (ok != 1) s->file_failed++; - } s->file_pending--; if (s->file_pending == 0) rrdp_finished(s); @@ -448,6 +452,7 @@ rrdp_input_handler(int fd) default: errx(1, "unexpected message %d", type); } + ibuf_free(b); } static void @@ -558,14 +563,12 @@ proc_rrdp(int fd) if (pfds[0].revents & POLLHUP) break; if (pfds[0].revents & POLLOUT) { - io_socket_nonblocking(fd); switch (msgbuf_write(&msgq)) { case 0: errx(1, "write: connection closed"); case -1: err(1, "write"); } - io_socket_blocking(fd); } if (pfds[0].revents & POLLIN) rrdp_input_handler(fd); Index: rsync.c =================================================================== RCS file: /cvs/src/usr.sbin/rpki-client/rsync.c,v retrieving revision 1.26 diff -u -p -r1.26 rsync.c --- rsync.c 22 Oct 2021 11:13:06 -0000 1.26 +++ rsync.c 23 Oct 2021 12:07:41 -0000 @@ -120,6 +120,7 @@ proc_rsync(char *prog, char *bind_addr, int rc = 0; struct pollfd pfd; struct msgbuf msgq; + struct ibuf *b, *inbuf = NULL; sigset_t mask, oldmask; struct rsyncproc *ids = NULL; @@ -178,7 +179,7 @@ proc_rsync(char *prog, char *bind_addr, for (;;) { char *uri = NULL, *dst = NULL; - size_t id, size; + size_t id; pid_t pid; int st; @@ -198,7 +199,6 @@ proc_rsync(char *prog, char *bind_addr, */ while ((pid = waitpid(WAIT_ANY, &st, WNOHANG)) > 0) { - struct ibuf *b; int ok = 1; for (i = 0; i < idsz; i++) @@ -247,11 +247,17 @@ proc_rsync(char *prog, char *bind_addr, if (!(pfd.revents & POLLIN)) continue; + b = io_buf_read(fd, &inbuf); + if (b == NULL) + continue; + /* Read host and module. 
*/ - io_simple_read(fd, &size, sizeof(size)); - io_simple_read(fd, &id, sizeof(id)); - io_str_read(fd, &dst); - io_str_read(fd, &uri); + io_read_buf(b, &id, sizeof(id)); + io_read_str(b, &dst); + io_read_str(b, &uri); + + ibuf_free(b); + assert(dst); assert(uri); Index: tal.c =================================================================== RCS file: /cvs/src/usr.sbin/rpki-client/tal.c,v retrieving revision 1.30 diff -u -p -r1.30 tal.c --- tal.c 1 Apr 2021 06:43:23 -0000 1.30 +++ tal.c 23 Oct 2021 12:07:17 -0000 @@ -284,7 +284,7 @@ tal_buffer(struct ibuf *b, const struct * A returned pointer must be freed with tal_free(). */ struct tal * -tal_read(int fd) +tal_read(struct ibuf *b) { size_t i; struct tal *p; @@ -292,18 +292,18 @@ tal_read(int fd) if ((p = calloc(1, sizeof(struct tal))) == NULL) err(1, NULL); - io_buf_read_alloc(fd, (void **)&p->pkey, &p->pkeysz); + io_read_buf_alloc(b, (void **)&p->pkey, &p->pkeysz); + io_read_str(b, &p->descr); + io_read_buf(b, &p->urisz, sizeof(size_t)); assert(p->pkeysz > 0); - io_str_read(fd, &p->descr); assert(p->descr); - io_simple_read(fd, &p->urisz, sizeof(size_t)); assert(p->urisz > 0); if ((p->uri = calloc(p->urisz, sizeof(char *))) == NULL) err(1, NULL); for (i = 0; i < p->urisz; i++) { - io_str_read(fd, &p->uri[i]); + io_read_str(b, &p->uri[i]); assert(p->uri[i]); }