The diff below is the counterpart of the select(2) one I just committed: it makes poll(2) and ppoll(2) use kqueue internally.
They use the same logic as select(2): convert the pollfd structures into
kqueue events with EV_SET(2), then wait in kqueue_scan().

To make this implementation compatible with the existing poll(2)
semantics, I added a new FIFO-specific kqueue filter to handle the case
where POLLOUT is specified on a read-only file descriptor.  Thanks to
millert@ for the idea.  The sys/fifofs regress tests pass with this.

As with the select(2) diff, I'm currently interested in hearing about
any incompatibility with the current behavior that you find.  Two small
userland programs are appended after the diff to help with testing.

Thanks for testing,
Martin

Index: kern/sys_generic.c
===================================================================
RCS file: /cvs/src/sys/kern/sys_generic.c,v
retrieving revision 1.136
diff -u -p -r1.136 sys_generic.c
--- kern/sys_generic.c	14 Oct 2021 08:46:01 -0000	1.136
+++ kern/sys_generic.c	14 Oct 2021 09:00:22 -0000
@@ -81,6 +81,8 @@ int kqpoll_debug = 0;
 
 int pselregister(struct proc *, fd_set *[], int, int *);
 int pselcollect(struct proc *, struct kevent *, fd_set *[], int *);
+int ppollregister(struct proc *, struct pollfd *, int, int *);
+int ppollcollect(struct proc *, struct kevent *, struct pollfd *, u_int);
 
 int pollout(struct pollfd *, struct pollfd *, u_int);
 int dopselect(struct proc *, int, fd_set *, fd_set *, fd_set *,
@@ -769,6 +771,7 @@ pselregister(struct proc *p, fd_set *pib
 			/* FALLTHROUGH */
 		case EOPNOTSUPP:/* No underlying kqfilter */
 		case EINVAL:	/* Unimplemented filter */
+		case EPERM:	/* Specific to FIFO */
 			error = 0;
 			break;
 		case ENXIO:	/* Device has been detached */
@@ -899,31 +902,132 @@ doselwakeup(struct selinfo *sip)
 	}
 }
 
-void
-pollscan(struct proc *p, struct pollfd *pl, u_int nfd, register_t *retval)
+int
+ppollregister_evts(struct proc *p, struct kevent *kevp, int nkev,
+    struct pollfd *pl)
 {
-	struct filedesc *fdp = p->p_fd;
-	struct file *fp;
-	u_int i;
-	int n = 0;
+	int i, error, nevents = 0;
 
-	for (i = 0; i < nfd; i++, pl++) {
-		/* Check the file descriptor. */
-		if (pl->fd < 0) {
-			pl->revents = 0;
-			continue;
+	KASSERT(pl->revents == 0);
+
+#ifdef KTRACE
+	if (KTRPOINT(p, KTR_STRUCT))
+		ktrevent(p, kevp, nkev);
+#endif
+	for (i = 0; i < nkev; i++, kevp++) {
+again:
+		error = kqueue_register(p->p_kq, kevp, p);
+		switch (error) {
+		case 0:
+			nevents++;
+			break;
+		case EOPNOTSUPP:/* No underlying kqfilter */
+		case EINVAL:	/* Unimplemented filter */
+			break;
+		case EBADF:	/* Bad file descriptor */
+			pl->revents |= POLLNVAL;
+			break;
+		case EPERM:	/* Specific to FIFO */
+			KASSERT(kevp->filter == EVFILT_WRITE);
+			if (nkev == 1) {
+				/*
+				 * If this is the only filter make sure
+				 * POLLHUP is passed to userland.
+				 */
+				kevp->filter = EVFILT_EXCEPT;
+				goto again;
+			}
+			break;
+		case EPIPE:	/* Specific to pipes */
+			KASSERT(kevp->filter == EVFILT_WRITE);
+			pl->revents |= POLLHUP;
+			break;
+		default:
+#ifdef DIAGNOSTIC
+			DPRINTFN(0, "poll err %lu fd %d revents %02x serial"
+			    " %lu filt %d ERROR=%d\n",
+			    ((unsigned long)kevp->udata - p->p_kq_serial),
+			    pl->fd, pl->revents, p->p_kq_serial, kevp->filter,
+			    error);
+#endif
+			/* FALLTHROUGH */
+		case ENXIO:	/* Device has been detached */
+			pl->revents |= POLLERR;
+			break;
 		}
-		if ((fp = fd_getfile(fdp, pl->fd)) == NULL) {
-			pl->revents = POLLNVAL;
-			n++;
+	}
+
+	return (nevents);
+}
+
+/*
+ * Convert pollfd into kqueue events and register them on the
+ * per-thread queue.
+ *
+ * Return the number of pollfd that triggered at least one error and aren't
+ * completely monitored. These pollfd should have the corresponding error bit
+ * set in `revents'.
+ *
+ * At most 3 events can correspond to a single pollfd.
+ */
+int
+ppollregister(struct proc *p, struct pollfd *pl, int nfds, int *nregistered)
+{
+	int i, nkev, nevt, errcount = 0, forcehup = 0;
+	struct kevent kev[3], *kevp;
+
+	for (i = 0; i < nfds; i++) {
+		pl[i].events &= ~POLL_NOHUP;
+		pl[i].revents = 0;
+
+		if (pl[i].fd < 0) continue;
+
+		/* With no event to monitor only check for hang-up. */
+		forcehup = (pl[i].events == 0);
+
+		DPRINTFN(1, "poll set %d/%d fd %d events %02x serial %lu\n",
+		    i+1, nfds, pl[i].fd, pl[i].events, p->p_kq_serial);
+
+		nevt = 0;
+		nkev = 0;
+		kevp = kev;
+		if (pl[i].events & (POLLIN | POLLRDNORM)) {
+			EV_SET(kevp, pl[i].fd, EVFILT_READ,
+			    EV_ADD|EV_ENABLE|EV_ONESHOT|__EV_POLL, 0, 0,
+			    (void *)(p->p_kq_serial + i));
+			nkev++;
+			kevp++;
+		}
+		if (pl[i].events & (POLLOUT | POLLWRNORM)) {
+			EV_SET(kevp, pl[i].fd, EVFILT_WRITE,
+			    EV_ADD|EV_ENABLE|EV_ONESHOT|__EV_POLL, 0, 0,
+			    (void *)(p->p_kq_serial + i));
+			nkev++;
+			kevp++;
 		}
-		pl->revents = (*fp->f_ops->fo_poll)(fp, pl->events, p);
-		FRELE(fp, p);
-		if (pl->revents != 0)
-			n++;
+		if ((pl[i].events & (POLLPRI | POLLRDBAND)) || forcehup) {
+			EV_SET(kevp, pl[i].fd, EVFILT_EXCEPT,
+			    EV_ADD|EV_ENABLE|EV_ONESHOT|__EV_POLL, 0, 0,
+			    (void *)(p->p_kq_serial + i));
+			nkev++;
+			kevp++;
+		}
+
+		if (nkev == 0)
+			continue;
+
+		nevt = ppollregister_evts(p, kev, nkev, &pl[i]);
+		if (nevt == 0 && !forcehup)
+			errcount++;
+		*nregistered += nevt;
 	}
-	*retval = n;
+
+#ifdef DIAGNOSTIC
+	DPRINTFN(1, "poll registered = %d, errors = %d\n", *nregistered,
+	    errcount);
+#endif
+	return (errcount);
 }
 
 /*
@@ -1013,11 +1117,10 @@ int
 doppoll(struct proc *p, struct pollfd *fds, u_int nfds,
     struct timespec *timeout, const sigset_t *sigmask, register_t *retval)
 {
-	size_t sz;
+	struct kqueue_scan_state scan;
 	struct pollfd pfds[4], *pl = pfds;
-	struct timespec elapsed, start, stop;
-	uint64_t nsecs;
-	int ncoll, i, s, error;
+	int error, nevents = 0;
+	size_t sz;
 
 	/* Standards say no more than MAX_OPEN; this is possibly better. */
 	if (nfds > min((int)lim_cur(RLIMIT_NOFILE), maxfiles))
@@ -1031,58 +1134,75 @@ doppoll(struct proc *p, struct pollfd *f
 		return (EINVAL);
 	}
 
+	kqpoll_init();
+
 	sz = nfds * sizeof(*pl);
 
 	if ((error = copyin(fds, pl, sz)) != 0)
 		goto bad;
 
-	for (i = 0; i < nfds; i++) {
-		pl[i].events &= ~POLL_NOHUP;
-		pl[i].revents = 0;
-	}
-
 	if (sigmask)
 		dosigsuspend(p, *sigmask &~ sigcantmask);
 
-retry:
-	ncoll = nselcoll;
-	atomic_setbits_int(&p->p_flag, P_SELECT);
-	pollscan(p, pl, nfds, retval);
-	if (*retval)
-		goto done;
-	if (timeout == NULL || timespecisset(timeout)) {
-		if (timeout != NULL) {
-			getnanouptime(&start);
-			nsecs = MIN(TIMESPEC_TO_NSEC(timeout), MAXTSLP);
-		} else
-			nsecs = INFSLP;
-		s = splhigh();
-		if ((p->p_flag & P_SELECT) == 0 || nselcoll != ncoll) {
-			splx(s);
-			goto retry;
-		}
-		atomic_clearbits_int(&p->p_flag, P_SELECT);
-		error = tsleep_nsec(&selwait, PSOCK | PCATCH, "poll", nsecs);
-		splx(s);
+	/* Register kqueue events */
+	*retval = ppollregister(p, pl, nfds, &nevents);
+
+	/*
+	 * The poll/select family of syscalls has been designed to
+	 * block when file descriptors are not available, even if
+	 * there's nothing to wait for.
+	 */
+	if (nevents == 0) {
+		uint64_t nsecs = INFSLP;
+
 		if (timeout != NULL) {
-			getnanouptime(&stop);
-			timespecsub(&stop, &start, &elapsed);
-			timespecsub(timeout, &elapsed, timeout);
-			if (timeout->tv_sec < 0)
-				timespecclear(timeout);
+			if (!timespecisset(timeout))
+				goto done;
+			nsecs = MAX(1, MIN(TIMESPEC_TO_NSEC(timeout), MAXTSLP));
 		}
-		if (error == 0 || error == EWOULDBLOCK)
-			goto retry;
+
+		error = tsleep_nsec(&p->p_kq, PSOCK | PCATCH, "kqpoll", nsecs);
+		if (error == ERESTART)
+			error = EINTR;
+		if (error == EWOULDBLOCK)
+			error = 0;
+		goto done;
 	}
 
+	/* Collect at most `nevents' events possibly waiting in kqueue_scan() */
+	kqueue_scan_setup(&scan, p->p_kq);
+	while (nevents > 0) {
+		struct kevent kev[KQ_NEVENTS];
+		int i, ready, count;
+
+		/* Maximum number of events per iteration */
+		count = MIN(nitems(kev), nevents);
+		ready = kqueue_scan(&scan, count, kev, timeout, p, &error);
+#ifdef KTRACE
+		if (KTRPOINT(p, KTR_STRUCT))
+			ktrevent(p, kev, ready);
+#endif
+		/* Convert back events that are ready. */
+		for (i = 0; i < ready; i++)
+			*retval += ppollcollect(p, &kev[i], pl, nfds);
+
+		/*
+		 * Stop if there was an error or if we had enough
+		 * room to collect all events that were ready.
+		 */
+		if (error || ready < count)
+			break;
+
+		nevents -= ready;
+	}
+	kqueue_scan_finish(&scan);
 done:
-	atomic_clearbits_int(&p->p_flag, P_SELECT);
 	/*
 	 * NOTE: poll(2) is not restarted after a signal and EWOULDBLOCK is
 	 * ignored (since the whole point is to see what would block).
 	 */
 	switch (error) {
-	case ERESTART:
+	case EINTR:
 		error = pollout(pl, fds, nfds);
 		if (error == 0)
 			error = EINTR;
@@ -1099,7 +1219,95 @@ done:
 bad:
 	if (pl != pfds)
 		free(pl, M_TEMP, sz);
+
+	kqueue_purge(p, p->p_kq);
+	p->p_kq_serial += nfds;
+
 	return (error);
+}
+
+/*
+ * Convert given kqueue event into the corresponding poll(2) revents bit.
+ */
+int
+ppollcollect(struct proc *p, struct kevent *kevp, struct pollfd *pl, u_int nfds)
+{
+	int already_seen;
+	unsigned long i;
+
+	/* Extract poll array index */
+	i = (unsigned long)kevp->udata - p->p_kq_serial;
+
+#ifdef DIAGNOSTIC
+	/*
+	 * Lazily delete spurious events.
+	 *
+	 * This should not happen as long as kqueue_purge() is called
+	 * at the end of every syscall. It might be interesting to do
+	 * as DragonFlyBSD does and not always allocate a new knote in
+	 * kqueue_register(); with that, this lazy removal makes sense.
+	 */
+	if (i >= nfds) {
+		DPRINTFN(0, "poll get out of range udata %lu vs serial %lu\n",
+		    (unsigned long)kevp->udata, p->p_kq_serial);
+		kevp->flags = EV_DISABLE|EV_DELETE;
+		kqueue_register(p->p_kq, kevp, p);
+		return (0);
+	}
+	if ((int)kevp->ident != pl[i].fd) {
+		DPRINTFN(0, "poll get %lu/%d mismatch fd %u!=%d serial %lu\n",
+		    i+1, nfds, (int)kevp->ident, pl[i].fd, p->p_kq_serial);
+		return (0);
+	}
+#endif
+
+	/*
+	 * A given descriptor may already have generated an error
+	 * against another filter during kqueue_register().
+	 *
+	 * Make sure to set the appropriate flags but do not
+	 * increment `*retval' more than once.
+	 */
+	already_seen = (pl[i].revents != 0);
+
+	switch (kevp->filter) {
+	case EVFILT_READ:
+		if (kevp->flags & __EV_HUP)
+			pl[i].revents |= POLLHUP;
+		if (pl[i].events & (POLLIN | POLLRDNORM))
+			pl[i].revents |= pl[i].events & (POLLIN | POLLRDNORM);
+		break;
+	case EVFILT_WRITE:
+		/* NOTE: POLLHUP and POLLOUT/POLLWRNORM are mutually exclusive */
+		if (kevp->flags & __EV_HUP) {
+			pl[i].revents |= POLLHUP;
+		} else if (pl[i].events & (POLLOUT | POLLWRNORM)) {
+			pl[i].revents |= pl[i].events & (POLLOUT | POLLWRNORM);
+		}
+		break;
+	case EVFILT_EXCEPT:
+		if (kevp->flags & __EV_HUP) {
+#if 1
+			if (pl[i].events != 0 && pl[i].events != POLLOUT)
+				DPRINTFN(0, "weird events %x\n", pl[i].events);
+#endif
+			pl[i].revents |= POLLHUP;
+			break;
+		}
+		if (pl[i].events & (POLLPRI | POLLRDBAND))
+			pl[i].revents |= pl[i].events & (POLLPRI | POLLRDBAND);
+		break;
+	default:
+		KASSERT(0);
+	}
+
+	DPRINTFN(1, "poll get %lu/%d fd %d revents %02x serial %lu filt %d\n",
+	    i+1, nfds, pl[i].fd, pl[i].revents, (unsigned long)kevp->udata,
+	    kevp->filter);
+	if (!already_seen && (pl[i].revents != 0))
+		return (1);
+
+	return (0);
 }
 
 /*
Index: miscfs/fifofs/fifo_vnops.c
===================================================================
RCS file: /cvs/src/sys/miscfs/fifofs/fifo_vnops.c,v
retrieving revision 1.81
diff -u -p -r1.81 fifo_vnops.c
--- miscfs/fifofs/fifo_vnops.c	2 Oct 2021 08:51:41 -0000	1.81
+++ miscfs/fifofs/fifo_vnops.c	14 Oct 2021 09:00:22 -0000
@@ -112,6 +112,10 @@ int	filt_fifowrite(struct knote *kn, lon
 int	filt_fifowritemodify(struct kevent *kev, struct knote *kn);
 int	filt_fifowriteprocess(struct knote *kn, struct kevent *kev);
 int	filt_fifowrite_common(struct knote *kn, struct socket *so);
+int	filt_fiforhup(struct knote *kn, long hint);
+int	filt_fiforhupmodify(struct kevent *kev, struct knote *kn);
+int	filt_fiforhupprocess(struct knote *kn, struct kevent *kev);
+int	filt_fiforhup_common(struct knote *kn, struct socket *so);
 
 const struct filterops fiforead_filtops = {
 	.f_flags	= FILTEROP_ISFD,
@@ -131,6 +135,15 @@ const struct filterops fifowrite_filtops
 	.f_process	= filt_fifowriteprocess,
 };
 
+const struct filterops fifohup_filtops = {
+	.f_flags	= FILTEROP_ISFD,
+	.f_attach	= NULL,
+	.f_detach	= filt_fifordetach,
+	.f_event	= filt_fiforhup,
+	.f_modify	= filt_fiforhupmodify,
+	.f_process	= filt_fiforhupprocess,
+};
+
 /*
  * Open called to set up a new instance of a fifo or
 * to find an active instance of a fifo.
@@ -516,12 +529,27 @@ fifo_kqfilter(void *v)
 		sb = &so->so_rcv;
 		break;
 	case EVFILT_WRITE:
-		if (!(ap->a_fflag & FWRITE))
+		if (!(ap->a_fflag & FWRITE)) {
+			/* Tell upper layer to ask for POLLHUP only */
+			if (ap->a_kn->kn_flags & __EV_POLL)
+				return (EPERM);
 			return (EINVAL);
+		}
 		ap->a_kn->kn_fop = &fifowrite_filtops;
 		so = fip->fi_writesock;
 		sb = &so->so_snd;
 		break;
+	case EVFILT_EXCEPT:
+		/*
+		 * Filter used to set POLLHUP when no poll(2) flags are
+		 * specified or if POLLOUT is passed on a read-only fd.
+ */ + if (!(ap->a_kn->kn_flags & __EV_POLL)) + return (EINVAL); + ap->a_kn->kn_fop = &fifohup_filtops; + so = fip->fi_readsock; + sb = &so->so_rcv; + break; default: return (EINVAL); } @@ -670,3 +698,60 @@ filt_fifowriteprocess(struct knote *kn, return (rv); } + +int +filt_fiforhup_common(struct knote *kn, struct socket *so) +{ + int rv = 0; + + soassertlocked(so); + KASSERT(kn->kn_flags & __EV_POLL); + + if (so->so_state & SS_ISDISCONNECTED) { + kn->kn_flags |= __EV_HUP; + rv = 1; + } + + return (rv); +} + +int +filt_fiforhup(struct knote *kn, long hint) +{ + struct socket *so = kn->kn_hook; + + return (filt_fiforhup_common(kn, so)); +} + +int +filt_fiforhupmodify(struct kevent *kev, struct knote *kn) +{ + struct socket *so = kn->kn_hook; + int rv, s; + + s = solock(so); + knote_modify(kev, kn); + rv = filt_fiforhup_common(kn, so); + sounlock(so, s); + + return (rv); +} + +int +filt_fiforhupprocess(struct knote *kn, struct kevent *kev) +{ + struct socket *so = kn->kn_hook; + int rv, s; + + s = solock(so); + if (kev != NULL && (kn->kn_flags & EV_ONESHOT)) + rv = 1; + else + rv = filt_fiforhup_common(kn, so); + if (rv != 0) + knote_submit(kn, kev); + sounlock(so, s); + + return (rv); +} +
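
For reviewers who want to play with the pollfd -> kevent mapping from
userland, below is a rough analogue of what ppollregister() and
kqueue_scan() now do in the kernel.  It is only an illustration, not
part of the diff: __EV_POLL and the per-thread kqueue are
kernel-internal, so a regular kqueue(2) descriptor and plain kevent(2)
flags stand in for them, and stdin is used as an arbitrary fd.

/*
 * Illustrative sketch: map one pollfd (events = POLLIN|POLLOUT) to
 * one kevent per event class, register them and wait once, mimicking
 * in userland what the kernel now does for poll(2).
 */
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>

#include <err.h>
#include <poll.h>
#include <stdio.h>
#include <unistd.h>

int
main(void)
{
	struct kevent kev[2], res[2];
	struct timespec ts = { 1, 0 };	/* like a 1000ms poll() timeout */
	short events = POLLIN | POLLOUT;
	int kq, fd = STDIN_FILENO, nkev = 0, i, n;

	if ((kq = kqueue()) == -1)
		err(1, "kqueue");

	/* One kevent per event class, as ppollregister() does. */
	if (events & (POLLIN | POLLRDNORM))
		EV_SET(&kev[nkev++], fd, EVFILT_READ,
		    EV_ADD | EV_ENABLE | EV_ONESHOT, 0, 0, NULL);
	if (events & (POLLOUT | POLLWRNORM))
		EV_SET(&kev[nkev++], fd, EVFILT_WRITE,
		    EV_ADD | EV_ENABLE | EV_ONESHOT, 0, 0, NULL);

	/* Register and collect in a single kevent(2) call. */
	if ((n = kevent(kq, kev, nkev, res, nkev, &ts)) == -1)
		err(1, "kevent");

	for (i = 0; i < n; i++)
		printf("fd %lu filter %d ready\n",
		    (unsigned long)res[i].ident, res[i].filter);

	close(kq);
	return (0);
}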
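
And here is a minimal sketch to exercise the FIFO corner case the new
EVFILT_EXCEPT filter is there for: POLLOUT on a descriptor opened
read-only.  The expectation is that poll(2) keeps reporting POLLHUP
once the write side is gone, as it did with the old code.  The fifo
path is a throwaway placeholder; reports of diverging behavior are
welcome.

#include <sys/stat.h>

#include <err.h>
#include <fcntl.h>
#include <poll.h>
#include <stdio.h>
#include <unistd.h>

int
main(void)
{
	const char *path = "fifo.test";	/* placeholder path */
	struct pollfd pfd;
	int rfd, wfd, n;

	(void)unlink(path);
	if (mkfifo(path, 0600) == -1)
		err(1, "mkfifo");

	/* O_NONBLOCK so the read-only open doesn't wait for a writer. */
	if ((rfd = open(path, O_RDONLY | O_NONBLOCK)) == -1)
		err(1, "open ro");
	if ((wfd = open(path, O_WRONLY)) == -1)
		err(1, "open wo");
	close(wfd);		/* hang up the write side */

	pfd.fd = rfd;
	pfd.events = POLLOUT;	/* write event on a read-only fd */
	if ((n = poll(&pfd, 1, 1000)) == -1)
		err(1, "poll");
	printf("poll: %d revents: %#x (POLLHUP is %#x)\n",
	    n, pfd.revents, POLLHUP);

	close(rfd);
	(void)unlink(path);
	return (0);
}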