Here is a revised version of kqueue-based poll(2).

The two most important changes compared to the previous version are:

* Properly handle pollfd arrays where more than one entry refers
  to the same file descriptor.

* Fix poll(2)'s return value.

The first change would be somewhat tricky with plain kqueue since
an fd-based knote is identified by a (filter, fd) pair. The poll(2)
translation layer could cope with this by using an auxiliary array
that maps overlapping pollfd entries onto a single knote. However,
most users of poll(2) appear to avoid fd overlaps, so the mapping
would be a hindrance in the typical case. Also, overlaps might
indicate suboptimal I/O patterns in the userspace software.
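
For illustration, here is a minimal standalone program (not part of
the patch) that watches the same pipe descriptor through two pollfd
entries. With a bare (filter, fd) knote key, the second EVFILT_READ
registration for the descriptor would collide with the first, and
only one of the two entries could be activated:

#include <poll.h>
#include <stdio.h>
#include <unistd.h>

int
main(void)
{
        int fd[2];
        struct pollfd pfd[2];

        if (pipe(fd) == -1)
                return (1);
        (void)write(fd[1], "x", 1);

        /* Two pollfd entries referring to the same descriptor. */
        pfd[0].fd = fd[0];
        pfd[0].events = POLLIN;
        pfd[1].fd = fd[0];
        pfd[1].events = POLLIN;

        if (poll(pfd, 2, 0) == -1)
                return (1);

        /* With the overlap fix, both entries report POLLIN. */
        printf("revents: %#x %#x\n", pfd[0].revents, pfd[1].revents);
        return (0);
}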

The patch handles fd overlaps by adding a pollid field to the knote
identifier. This allows multiple knotes with the same filter and fd
to coexist. Of course, the extra identifier is not totally free
either. Nevertheless, it appears to solve the problem relatively
cleanly.
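
In kqueue_register(), the lookup key thus grows from a (filter, fd)
pair into a (filter, fd, pollid) triple. Condensed from the
kern_event.c hunk below (the real code goes on to acquire the knote):

        SLIST_FOREACH(kn, list, kn_link) {
                if (kev->filter == kn->kn_filter &&
                    kev->ident == kn->kn_id &&
                    pollid == kn->kn_pollid)
                        break;  /* knote for this pollfd slot */
        }

The poll(2) layer passes the pollfd array index as pollid (see
ppollregister_evts() below), while kevent(2) and filt_proc() pass
zero, so their behavior is unchanged.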

I have shuffled the code a bit so that the register and collect
routines are next to each other.

In addition, the collect routine now warns if kqueue_scan() returns
an event that does not activate a pollfd entry. Such a situation is
likely the result of a bug somewhere.

Please test. Once this way of implementing poll(2) has proven reliable
enough, it should be possible to replace selwakeup() with a
kernel-lock-free KNOTE() in subsystems that are sufficiently MP-safe.
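
For a subsystem that is already MP-safe, that eventual conversion
could look roughly like the following sketch (the softc and selinfo
field names are hypothetical, not from this patch):

        /* Before: wake pollers through the kernel-locked machinery. */
        selwakeup(&sc->sc_rsel);

        /* After: activate the knotes directly, without the kernel lock. */
        KNOTE(&sc->sc_rsel.si_note, 0);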

Index: sys/kern/kern_event.c
===================================================================
RCS file: src/sys/kern/kern_event.c,v
retrieving revision 1.174
diff -u -p -r1.174 kern_event.c
--- sys/kern/kern_event.c       29 Nov 2021 15:54:04 -0000      1.174
+++ sys/kern/kern_event.c       30 Nov 2021 15:05:06 -0000
@@ -450,7 +450,7 @@ filt_proc(struct knote *kn, long hint)
                kev.fflags = kn->kn_sfflags;
                kev.data = kn->kn_id;                   /* parent */
                kev.udata = kn->kn_udata;               /* preserve udata */
-               error = kqueue_register(kq, &kev, NULL);
+               error = kqueue_register(kq, &kev, 0, NULL);
                if (error)
                        kn->kn_fflags |= NOTE_TRACKERR;
        }
@@ -928,7 +928,7 @@ sys_kevent(struct proc *p, void *v, regi
                for (i = 0; i < n; i++) {
                        kevp = &kev[i];
                        kevp->flags &= ~EV_SYSFLAGS;
-                       error = kqueue_register(kq, kevp, p);
+                       error = kqueue_register(kq, kevp, 0, p);
                        if (error || (kevp->flags & EV_RECEIPT)) {
                                if (SCARG(uap, nevents) != 0) {
                                        kevp->flags = EV_ERROR;
@@ -1024,7 +1024,8 @@ bad:
 #endif
 
 int
-kqueue_register(struct kqueue *kq, struct kevent *kev, struct proc *p)
+kqueue_register(struct kqueue *kq, struct kevent *kev, unsigned int pollid,
+    struct proc *p)
 {
        struct filedesc *fdp = kq->kq_fdp;
        const struct filterops *fops = NULL;
@@ -1033,6 +1034,8 @@ kqueue_register(struct kqueue *kq, struc
        struct knlist *list = NULL;
        int active, error = 0;
 
+       KASSERT(pollid == 0 || (p != NULL && p->p_kq == kq));
+
        if (kev->filter < 0) {
                if (kev->filter + EVFILT_SYSCOUNT < 0)
                        return (EINVAL);
@@ -1080,7 +1083,8 @@ again:
        if (list != NULL) {
                SLIST_FOREACH(kn, list, kn_link) {
                        if (kev->filter == kn->kn_filter &&
-                           kev->ident == kn->kn_id) {
+                           kev->ident == kn->kn_id &&
+                           pollid == kn->kn_pollid) {
                                if (!knote_acquire(kn, NULL, 0)) {
                                        /* knote_acquire() has released
                                         * kq_lock. */
@@ -1125,6 +1129,7 @@ again:
                        kev->fflags = 0;
                        kev->data = 0;
                        kn->kn_kevent = *kev;
+                       kn->kn_pollid = pollid;
 
                        knote_attach(kn);
                        mtx_leave(&kq->kq_lock);
Index: sys/kern/sys_generic.c
===================================================================
RCS file: src/sys/kern/sys_generic.c,v
retrieving revision 1.144
diff -u -p -r1.144 sys_generic.c
--- sys/kern/sys_generic.c      30 Nov 2021 02:58:33 -0000      1.144
+++ sys/kern/sys_generic.c      30 Nov 2021 15:05:06 -0000
@@ -81,8 +81,11 @@ int kqpoll_debug = 0;
 
 int pselregister(struct proc *, fd_set *[], fd_set *[], int, int *, int *);
 int pselcollect(struct proc *, struct kevent *, fd_set *[], int *);
+void ppollregister(struct proc *, struct pollfd *, int, int *, int *);
+int ppollregister_evts(struct proc *, struct kevent *, int, struct pollfd *,
+    unsigned int);
+int ppollcollect(struct proc *, struct kevent *, struct pollfd *, u_int);
 
-void pollscan(struct proc *, struct pollfd *, u_int, register_t *);
 int pollout(struct pollfd *, struct pollfd *, u_int);
 int dopselect(struct proc *, int, fd_set *, fd_set *, fd_set *,
     struct timespec *, const sigset_t *, register_t *);
@@ -768,7 +771,7 @@ pselregister(struct proc *p, fd_set *pib
                                if (KTRPOINT(p, KTR_STRUCT))
                                        ktrevent(p, &kev, 1);
 #endif
-                               error = kqueue_register(p->p_kq, &kev, p);
+                               error = kqueue_register(p->p_kq, &kev, 0, p);
                                switch (error) {
                                case 0:
                                        nevents++;
@@ -909,33 +912,6 @@ doselwakeup(struct selinfo *sip)
        }
 }
 
-void
-pollscan(struct proc *p, struct pollfd *pl, u_int nfd, register_t *retval)
-{
-       struct filedesc *fdp = p->p_fd;
-       struct file *fp;
-       u_int i;
-       int n = 0;
-
-       for (i = 0; i < nfd; i++, pl++) {
-               /* Check the file descriptor. */
-               if (pl->fd < 0) {
-                       pl->revents = 0;
-                       continue;
-               }
-               if ((fp = fd_getfile(fdp, pl->fd)) == NULL) {
-                       pl->revents = POLLNVAL;
-                       n++;
-                       continue;
-               }
-               pl->revents = (*fp->f_ops->fo_poll)(fp, pl->events, p);
-               FRELE(fp, p);
-               if (pl->revents != 0)
-                       n++;
-       }
-       *retval = n;
-}
-
 /*
  * Only copyout the revents field.
  */
@@ -1023,11 +999,11 @@ int
 doppoll(struct proc *p, struct pollfd *fds, u_int nfds,
     struct timespec *timeout, const sigset_t *sigmask, register_t *retval)
 {
-       size_t sz;
+       struct kqueue_scan_state scan;
        struct pollfd pfds[4], *pl = pfds;
-       struct timespec elapsed, start, stop;
-       uint64_t nsecs;
-       int ncoll, i, s, error;
+       struct timespec zerots = {};
+       int error, ncollected = 0, nevents = 0;
+       size_t sz;
 
        /* Standards say no more than MAX_OPEN; this is possibly better. */
        if (nfds > min((int)lim_cur(RLIMIT_NOFILE), maxfiles))
@@ -1041,58 +1017,80 @@ doppoll(struct proc *p, struct pollfd *f
                        return (EINVAL);
        }
 
+       kqpoll_init(nfds);
+
        sz = nfds * sizeof(*pl);
 
        if ((error = copyin(fds, pl, sz)) != 0)
                goto bad;
 
-       for (i = 0; i < nfds; i++) {
-               pl[i].events &= ~POLL_NOHUP;
-               pl[i].revents = 0;
-       }
-
        if (sigmask)
                dosigsuspend(p, *sigmask &~ sigcantmask);
 
-retry:
-       ncoll = nselcoll;
-       atomic_setbits_int(&p->p_flag, P_SELECT);
-       pollscan(p, pl, nfds, retval);
-       if (*retval)
-               goto done;
-       if (timeout == NULL || timespecisset(timeout)) {
-               if (timeout != NULL) {
-                       getnanouptime(&start);
-                       nsecs = MIN(TIMESPEC_TO_NSEC(timeout), MAXTSLP);
-               } else
-                       nsecs = INFSLP;
-               s = splhigh();
-               if ((p->p_flag & P_SELECT) == 0 || nselcoll != ncoll) {
-                       splx(s);
-                       goto retry;
-               }
-               atomic_clearbits_int(&p->p_flag, P_SELECT);
-               error = tsleep_nsec(&selwait, PSOCK | PCATCH, "poll", nsecs);
-               splx(s);
+       /* Register kqueue events */
+       ppollregister(p, pl, nfds, &nevents, &ncollected);
+
+       /*
+        * The poll/select family of syscalls has been designed to
+        * block when file descriptors are not available, even if
+        * there's nothing to wait for.
+        */
+       if (nevents == 0 && ncollected == 0) {
+               uint64_t nsecs = INFSLP;
+
                if (timeout != NULL) {
-                       getnanouptime(&stop);
-                       timespecsub(&stop, &start, &elapsed);
-                       timespecsub(timeout, &elapsed, timeout);
-                       if (timeout->tv_sec < 0)
-                               timespecclear(timeout);
+                       if (!timespecisset(timeout))
+                               goto done;
+                       nsecs = MAX(1, MIN(TIMESPEC_TO_NSEC(timeout), MAXTSLP));
                }
-               if (error == 0 || error == EWOULDBLOCK)
-                       goto retry;
+
+               error = tsleep_nsec(&nowake, PSOCK | PCATCH, "kqpoll", nsecs);
+               if (error == ERESTART)
+                       error = EINTR;
+               if (error == EWOULDBLOCK)
+                       error = 0;
+               goto done;
        }
 
+       /* Do not block if registering found pending events. */
+       if (ncollected > 0)
+               timeout = &zerots;
+
+       /* Collect at most `nevents' possibly waiting in kqueue_scan() */
+       kqueue_scan_setup(&scan, p->p_kq);
+       while (nevents > 0) {
+               struct kevent kev[KQ_NEVENTS];
+               int i, ready, count;
+
+               /* Maximum number of events per iteration */
+               count = MIN(nitems(kev), nevents);
+               ready = kqueue_scan(&scan, count, kev, timeout, p, &error);
+#ifdef KTRACE
+               if (KTRPOINT(p, KTR_STRUCT))
+                       ktrevent(p, kev, ready);
+#endif
+               /* Convert back events that are ready. */
+               for (i = 0; i < ready; i++)
+                       ncollected += ppollcollect(p, &kev[i], pl, nfds);
+
+               /*
+                * Stop if there was an error or if we had enough
+                * place to collect all events that were ready.
+                */
+               if (error || ready < count)
+                       break;
+
+               nevents -= ready;
+       }
+       kqueue_scan_finish(&scan);
+       *retval = ncollected;
 done:
-       atomic_clearbits_int(&p->p_flag, P_SELECT);
        /*
         * NOTE: poll(2) is not restarted after a signal and EWOULDBLOCK is
         *       ignored (since the whole point is to see what would block).
         */
        switch (error) {
-       case ERESTART:
+       case EINTR:
                error = pollout(pl, fds, nfds);
                if (error == 0)
                        error = EINTR;
@@ -1109,9 +1107,231 @@ done:
 bad:
        if (pl != pfds)
                free(pl, M_TEMP, sz);
+
+       kqpoll_done(nfds);
+
        return (error);
 }
 
+int
+ppollregister_evts(struct proc *p, struct kevent *kevp, int nkev,
+    struct pollfd *pl, unsigned int pollid)
+{
+       int i, error, nevents = 0;
+
+       KASSERT(pl->revents == 0);
+
+#ifdef KTRACE
+       if (KTRPOINT(p, KTR_STRUCT))
+               ktrevent(p, kevp, nkev);
+#endif
+       for (i = 0; i < nkev; i++, kevp++) {
+again:
+               error = kqueue_register(p->p_kq, kevp, pollid, p);
+               switch (error) {
+               case 0:
+                       nevents++;
+                       break;
+               case EOPNOTSUPP:/* No underlying kqfilter */
+               case EINVAL:    /* Unimplemented filter */
+                       break;
+               case EBADF:     /* Bad file descriptor */
+                       pl->revents |= POLLNVAL;
+                       break;
+               case EPERM:     /* Specific to FIFO */
+                       KASSERT(kevp->filter == EVFILT_WRITE);
+                       if (nkev == 1) {
+                               /*
+                                * If this is the only filter make sure
+                                * POLLHUP is passed to userland.
+                                */
+                               kevp->filter = EVFILT_EXCEPT;
+                               goto again;
+                       }
+                       break;
+               case EPIPE:     /* Specific to pipes */
+                       KASSERT(kevp->filter == EVFILT_WRITE);
+                       pl->revents |= POLLHUP;
+                       break;
+               default:
+                       DPRINTFN(0, "poll err %lu fd %d revents %02x serial"
+                           " %lu filt %d ERROR=%d\n",
+                           ((unsigned long)kevp->udata - p->p_kq_serial),
+                           pl->fd, pl->revents, p->p_kq_serial, kevp->filter,
+                           error);
+                       /* FALLTHROUGH */
+               case ENXIO:     /* Device has been detached */
+                       pl->revents |= POLLERR;
+                       break;
+               }
+       }
+
+       return (nevents);
+}
+
+/*
+ * Convert pollfd into kqueue events and register them on the
+ * per-thread queue.
+ *
+ * At most 3 events can correspond to a single pollfd.
+ */
+void
+ppollregister(struct proc *p, struct pollfd *pl, int nfds, int *nregistered,
+    int *ncollected)
+{
+       int i, nkev, nevt, forcehup = 0;
+       struct kevent kev[3], *kevp;
+
+       for (i = 0; i < nfds; i++) {
+               pl[i].events &= ~POLL_NOHUP;
+               pl[i].revents = 0;
+
+               if (pl[i].fd < 0)
+                       continue;
+
+               if (pl[i].events == 0)
+                       forcehup = 1;
+
+               DPRINTFN(1, "poll set %d/%d fd %d events %02x serial %lu\n",
+                   i+1, nfds, pl[i].fd, pl[i].events, p->p_kq_serial);
+
+               nevt = 0;
+               nkev = 0;
+               kevp = kev;
+               if (pl[i].events & (POLLIN | POLLRDNORM)) {
+                       EV_SET(kevp, pl[i].fd, EVFILT_READ,
+                           EV_ADD|EV_ENABLE|__EV_POLL, 0, 0,
+                           (void *)(p->p_kq_serial + i));
+                       nkev++;
+                       kevp++;
+               }
+               if (pl[i].events & (POLLOUT | POLLWRNORM)) {
+                       EV_SET(kevp, pl[i].fd, EVFILT_WRITE,
+                           EV_ADD|EV_ENABLE|__EV_POLL, 0, 0,
+                           (void *)(p->p_kq_serial + i));
+                       nkev++;
+                       kevp++;
+               }
+               if ((pl[i].events & (POLLPRI | POLLRDBAND)) || forcehup) {
+                       int evff = forcehup ? 0 : NOTE_OOB;
+
+                       EV_SET(kevp, pl[i].fd, EVFILT_EXCEPT,
+                           EV_ADD|EV_ENABLE|__EV_POLL, evff, 0,
+                           (void *)(p->p_kq_serial + i));
+                       nkev++;
+                       kevp++;
+               }
+
+               if (nkev == 0)
+                       continue;
+
+               *nregistered += ppollregister_evts(p, kev, nkev, &pl[i], i);
+
+               if (pl[i].revents != 0)
+                       (*ncollected)++;
+       }
+
+       DPRINTFN(1, "poll registered = %d, collected = %d\n", *nregistered,
+           *ncollected);
+}
+
+/*
+ * Convert given kqueue event into corresponding poll(2) revents bit.
+ */
+int
+ppollcollect(struct proc *p, struct kevent *kevp, struct pollfd *pl, u_int nfds)
+{
+       static struct timeval poll_errintvl = { 5, 0 };
+       static struct timeval poll_lasterr;
+       int already_seen;
+       unsigned long i;
+
+       /*  Extract poll array index */
+       i = (unsigned long)kevp->udata - p->p_kq_serial;
+
+       if (i >= nfds) {
+               panic("%s: spurious kevp %p nfds %u udata 0x%lx serial 0x%lx",
+                   __func__, kevp, nfds,
+                   (unsigned long)kevp->udata, p->p_kq_serial);
+       }
+       if ((int)kevp->ident != pl[i].fd) {
+               panic("%s: kevp %p %lu/%d mismatch fd %d!=%d serial 0x%lx",
+                   __func__, kevp, i + 1, nfds, (int)kevp->ident, pl[i].fd,
+                   p->p_kq_serial);
+       }
+
+       /*
+        * A given descriptor may already have generated an error
+        * against another filter during kqueue_register().
+        *
+        * Make sure to set the appropriate flags but do not
+        * increment `*retval' more than once.
+        */
+       already_seen = (pl[i].revents != 0);
+
+       /* POLLNVAL preempts other events. */
+       if ((kevp->flags & EV_ERROR) && kevp->data == EBADF) {
+               pl[i].revents = POLLNVAL;
+               goto done;
+       } else if (pl[i].revents & POLLNVAL) {
+               goto done;
+       }
+
+       switch (kevp->filter) {
+       case EVFILT_READ:
+               if (kevp->flags & __EV_HUP)
+                       pl[i].revents |= POLLHUP;
+               if (pl[i].events & (POLLIN | POLLRDNORM))
+                       pl[i].revents |= pl[i].events & (POLLIN | POLLRDNORM);
+               break;
+       case EVFILT_WRITE:
+               /* POLLHUP and POLLOUT/POLLWRNORM are mutually exclusive */
+               if (kevp->flags & __EV_HUP) {
+                       pl[i].revents |= POLLHUP;
+               } else if (pl[i].events & (POLLOUT | POLLWRNORM)) {
+                       pl[i].revents |= pl[i].events & (POLLOUT | POLLWRNORM);
+               }
+               break;
+       case EVFILT_EXCEPT:
+               if (kevp->flags & __EV_HUP) {
+                       if (pl[i].events != 0 && pl[i].events != POLLOUT)
+                               DPRINTFN(0, "weird events %x\n", pl[i].events);
+                       pl[i].revents |= POLLHUP;
+                       break;
+               }
+               if (pl[i].events & (POLLPRI | POLLRDBAND))
+                       pl[i].revents |= pl[i].events & (POLLPRI | POLLRDBAND);
+               break;
+       default:
+               KASSERT(0);
+       }
+
+done:
+       DPRINTFN(1, "poll get %lu/%d fd %d revents %02x serial %lu filt %d\n",
+           i+1, nfds, pl[i].fd, pl[i].revents, (unsigned long)kevp->udata,
+           kevp->filter);
+
+       /*
+        * Make noise about unclaimed events as they might indicate a bug
+        * and can result in spurious-looking wakeups of poll(2).
+        *
+        * Live-locking within the system call should not happen because
+        * the scan loop in doppoll() has an upper limit for the number
+        * of events to process.
+        */
+       if (pl[i].revents == 0 && ratecheck(&poll_lasterr, &poll_errintvl)) {
+               printf("%s[%d]: poll index %lu fd %d events 0x%x "
+                   "filter %d/0x%x unclaimed\n",
+                   p->p_p->ps_comm, p->p_tid, i, pl[i].fd,
+                   pl[i].events, kevp->filter, kevp->flags);
+       }
+
+       if (!already_seen && (pl[i].revents != 0))
+               return (1);
+
+       return (0);
+}
+
 /*
  * utrace system call
  */
Index: sys/sys/event.h
===================================================================
RCS file: src/sys/sys/event.h,v
retrieving revision 1.59
diff -u -p -r1.59 event.h
--- sys/sys/event.h     29 Nov 2021 15:54:04 -0000      1.59
+++ sys/sys/event.h     30 Nov 2021 15:05:06 -0000
@@ -245,6 +245,8 @@ struct knote {
        } kn_ptr;
        const struct            filterops *kn_fop;
        void                    *kn_hook;       /* [o] */
+       unsigned int            kn_pollid;      /* [I] */
+
 #define KN_ACTIVE      0x0001                  /* event has been triggered */
 #define KN_QUEUED      0x0002                  /* event is on queue */
 #define KN_DISABLED    0x0004                  /* event is disabled */
@@ -295,8 +297,8 @@ extern void knote_modify(const struct ke
 extern void    knote_submit(struct knote *, struct kevent *);
 extern void    kqueue_init(void);
 extern void    kqueue_init_percpu(void);
-extern int     kqueue_register(struct kqueue *kq,
-                   struct kevent *kev, struct proc *p);
+extern int     kqueue_register(struct kqueue *kq, struct kevent *kev,
+                   unsigned int pollid, struct proc *p);
 extern int     kqueue_scan(struct kqueue_scan_state *, int, struct kevent *,
                    struct timespec *, struct proc *, int *);
 extern void    kqueue_scan_setup(struct kqueue_scan_state *, struct kqueue *);
Index: regress/sys/kern/poll/poll_close.c
===================================================================
RCS file: src/regress/sys/kern/poll/poll_close.c,v
retrieving revision 1.3
diff -u -p -r1.3 poll_close.c
--- regress/sys/kern/poll/poll_close.c  27 Nov 2021 15:07:26 -0000      1.3
+++ regress/sys/kern/poll/poll_close.c  30 Nov 2021 15:05:07 -0000
@@ -132,7 +132,7 @@ main(void)
        }
 
        /* Let the thread settle in poll(). */
-       wait_wchan("poll");
+       wait_wchan("kqread");
 
        /* Awaken poll(). */
        write(sock[0], "x", 1);
@@ -153,7 +153,7 @@ main(void)
        write(barrier[0], "x", 1);
 
        /* Let the thread settle in poll(). */
-       wait_wchan("poll");
+       wait_wchan("kqread");
 
        /* Close the fd to awaken poll(). */
        close(sock[1]);
