On Tue, Nov 30, 2021 at 03:11:16PM +0000, Visa Hankala wrote:
> Here is a revised version of kqueue-based poll(2).
> 
> The two most important changes to the previous version are:
> 
> * Properly handle pollfd arrays where more than one entry refers
>   to the same file descriptor.
> 
> * Fix poll(2)'s return value.
> 
> The first change would be somewhat tricky with plain kqueue since
> an fd-based knote is identified by a (filter, fd) pair. The poll(2)
> translation layer could manage this by using an auxiliary array
> to map overlapping pollfd entries into a single knote. However, most
> users of poll(2) appear to avoid fd overlaps. So the mapping would be
> a hindrance in the typical case. Also, overlaps might suggest
> suboptimal I/O patterns in the userspace software.
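
For illustration, here is the overlap case from userspace; a minimal
sketch, not part of the patch, that passes two pollfd entries for the
same descriptor with different event masks:

	#include <poll.h>
	#include <stdio.h>
	#include <unistd.h>

	int
	main(void)
	{
		struct pollfd pfd[2];
		int fds[2];

		if (pipe(fds) == -1)
			return (1);

		/* Both entries refer to the read end of the pipe. */
		pfd[0].fd = fds[0];
		pfd[0].events = POLLIN;
		pfd[1].fd = fds[0];
		pfd[1].events = POLLPRI;

		/* With the patch, each entry gets its own knote. */
		if (poll(pfd, 2, 0) == -1)
			return (1);
		printf("revents: %02x %02x\n",
		    pfd[0].revents, pfd[1].revents);
		return (0);
	}
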
> 
> The patch handles fd overlaps by adding a pollid field to the knote
> identifier. This allows the co-existence of multiple knotes with the
> same filter and fd. Of course, the extra identifier is not totally free
> either. Nevertheless, it appears to solve the problem relatively nicely.
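
Concretely, the knote lookup in kqueue_register() now matches on the
(filter, ident, pollid) triple, as in the kern_event.c hunk below:

	if (kev->filter == kn->kn_filter &&
	    kev->ident == kn->kn_id &&
	    pollid == kn->kn_pollid)
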
> 
> I have shuffled the code a bit so that the register and collect
> routines are next to each other.
> 
> In addition, the collect routine now warns if kqueue_scan() returns
> an event that does not activate a pollfd entry. Such a situation is
> likely the result of a bug somewhere.
> 
> Please test. Once this way of implementing poll(2) has proven reliable
> enough, it should be possible to replace selwakeup() with
> kernel-lock-free KNOTE() in subsystems that are sufficiently MP-safe.

Here is a revised version of the patch.

A number of fixes to event filter routines have already been committed.

Changes to the previous version:

* Prevent excessive use of kernel memory with poll(2). The code now
  tracks how many knotes are registered. If the number grows high,
  kqpoll_done() removes knotes eagerly (see the snippet after this
  list). The condition could include `num' as well, but I think the
  heuristic is good enough already.

* Set forcehup anew on each iteration in ppollregister().
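
For reference, the eager cleanup added to kqpoll_done() in the
kern_event.c hunk below fires once the number of registered knotes
exceeds four times the size of the fd-based knote list:

	if (kq->kq_nknotes > 4 * kq->kq_knlistsize)
		kqueue_purge(p, kq);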

Please test.

Index: kern/kern_event.c
===================================================================
RCS file: src/sys/kern/kern_event.c,v
retrieving revision 1.178
diff -u -p -r1.178 kern_event.c
--- kern/kern_event.c   25 Dec 2021 11:04:58 -0000      1.178
+++ kern/kern_event.c   25 Dec 2021 13:09:06 -0000
@@ -221,6 +221,7 @@ KQRELE(struct kqueue *kq)
        }
 
        KASSERT(TAILQ_EMPTY(&kq->kq_head));
+       KASSERT(kq->kq_nknotes == 0);
 
        free(kq->kq_knlist, M_KEVENT, kq->kq_knlistsize *
            sizeof(struct knlist));
@@ -451,7 +452,7 @@ filt_proc(struct knote *kn, long hint)
                kev.fflags = kn->kn_sfflags;
                kev.data = kn->kn_id;                   /* parent */
                kev.udata = kn->kn_udata;               /* preserve udata */
-               error = kqueue_register(kq, &kev, NULL);
+               error = kqueue_register(kq, &kev, 0, NULL);
                if (error)
                        kn->kn_fflags |= NOTE_TRACKERR;
        }
@@ -814,11 +815,15 @@ void
 kqpoll_done(unsigned int num)
 {
        struct proc *p = curproc;
+       struct kqueue *kq = p->p_kq;
 
        KASSERT(p->p_kq != NULL);
        KASSERT(p->p_kq_serial + num >= p->p_kq_serial);
 
        p->p_kq_serial += num;
+
+       if (kq->kq_nknotes > 4 * kq->kq_knlistsize)
+               kqueue_purge(p, kq);
 }
 
 void
@@ -944,7 +949,7 @@ sys_kevent(struct proc *p, void *v, regi
                for (i = 0; i < n; i++) {
                        kevp = &kev[i];
                        kevp->flags &= ~EV_SYSFLAGS;
-                       error = kqueue_register(kq, kevp, p);
+                       error = kqueue_register(kq, kevp, 0, p);
                        if (error || (kevp->flags & EV_RECEIPT)) {
                                if (SCARG(uap, nevents) != 0) {
                                        kevp->flags = EV_ERROR;
@@ -1040,7 +1045,8 @@ bad:
 #endif
 
 int
-kqueue_register(struct kqueue *kq, struct kevent *kev, struct proc *p)
+kqueue_register(struct kqueue *kq, struct kevent *kev, unsigned int pollid,
+    struct proc *p)
 {
        struct filedesc *fdp = kq->kq_fdp;
        const struct filterops *fops = NULL;
@@ -1049,6 +1055,8 @@ kqueue_register(struct kqueue *kq, struc
        struct knlist *list = NULL;
        int active, error = 0;
 
+       KASSERT(pollid == 0 || (p != NULL && p->p_kq == kq));
+
        if (kev->filter < 0) {
                if (kev->filter + EVFILT_SYSCOUNT < 0)
                        return (EINVAL);
@@ -1096,7 +1104,8 @@ again:
        if (list != NULL) {
                SLIST_FOREACH(kn, list, kn_link) {
                        if (kev->filter == kn->kn_filter &&
-                           kev->ident == kn->kn_id) {
+                           kev->ident == kn->kn_id &&
+                           pollid == kn->kn_pollid) {
                                if (!knote_acquire(kn, NULL, 0)) {
                                        /* knote_acquire() has released
                                         * kq_lock. */
@@ -1141,6 +1150,7 @@ again:
                        kev->fflags = 0;
                        kev->data = 0;
                        kn->kn_kevent = *kev;
+                       kn->kn_pollid = pollid;
 
                        knote_attach(kn);
                        mtx_leave(&kq->kq_lock);
@@ -1905,6 +1915,7 @@ knote_attach(struct knote *kn)
                list = &kq->kq_knhash[KN_HASH(kn->kn_id, kq->kq_knhashmask)];
        }
        SLIST_INSERT_HEAD(list, kn, kn_link);
+       kq->kq_nknotes++;
 }
 
 void
@@ -1916,6 +1927,7 @@ knote_detach(struct knote *kn)
        MUTEX_ASSERT_LOCKED(&kq->kq_lock);
        KASSERT(kn->kn_status & KN_PROCESSING);
 
+       kq->kq_nknotes--;
        if (kn->kn_fop->f_flags & FILTEROP_ISFD)
                list = &kq->kq_knlist[kn->kn_id];
        else
Index: kern/sys_generic.c
===================================================================
RCS file: src/sys/kern/sys_generic.c,v
retrieving revision 1.146
diff -u -p -r1.146 sys_generic.c
--- kern/sys_generic.c  11 Dec 2021 09:28:26 -0000      1.146
+++ kern/sys_generic.c  25 Dec 2021 13:09:06 -0000
@@ -81,8 +81,9 @@ int kqpoll_debug = 0;
 
 int pselregister(struct proc *, fd_set *[], fd_set *[], int, int *, int *);
 int pselcollect(struct proc *, struct kevent *, fd_set *[], int *);
+void ppollregister(struct proc *, struct pollfd *, int, int *, int *);
+int ppollcollect(struct proc *, struct kevent *, struct pollfd *, u_int);
 
-void pollscan(struct proc *, struct pollfd *, u_int, register_t *);
 int pollout(struct pollfd *, struct pollfd *, u_int);
 int dopselect(struct proc *, int, fd_set *, fd_set *, fd_set *,
     struct timespec *, const sigset_t *, register_t *);
@@ -768,7 +769,7 @@ pselregister(struct proc *p, fd_set *pib
                                if (KTRPOINT(p, KTR_STRUCT))
                                        ktrevent(p, &kev, 1);
 #endif
-                               error = kqueue_register(p->p_kq, &kev, p);
+                               error = kqueue_register(p->p_kq, &kev, 0, p);
                                switch (error) {
                                case 0:
                                        nevents++;
@@ -910,33 +911,6 @@ doselwakeup(struct selinfo *sip)
        }
 }
 
-void
-pollscan(struct proc *p, struct pollfd *pl, u_int nfd, register_t *retval)
-{
-       struct filedesc *fdp = p->p_fd;
-       struct file *fp;
-       u_int i;
-       int n = 0;
-
-       for (i = 0; i < nfd; i++, pl++) {
-               /* Check the file descriptor. */
-               if (pl->fd < 0) {
-                       pl->revents = 0;
-                       continue;
-               }
-               if ((fp = fd_getfile(fdp, pl->fd)) == NULL) {
-                       pl->revents = POLLNVAL;
-                       n++;
-                       continue;
-               }
-               pl->revents = (*fp->f_ops->fo_poll)(fp, pl->events, p);
-               FRELE(fp, p);
-               if (pl->revents != 0)
-                       n++;
-       }
-       *retval = n;
-}
-
 /*
  * Only copyout the revents field.
  */
@@ -1024,11 +998,11 @@ int
 doppoll(struct proc *p, struct pollfd *fds, u_int nfds,
     struct timespec *timeout, const sigset_t *sigmask, register_t *retval)
 {
-       size_t sz;
+       struct kqueue_scan_state scan;
+       struct timespec zerots = {};
        struct pollfd pfds[4], *pl = pfds;
-       struct timespec elapsed, start, stop;
-       uint64_t nsecs;
-       int ncoll, i, s, error;
+       int error, ncollected = 0, nevents = 0;
+       size_t sz;
 
        /* Standards say no more than MAX_OPEN; this is possibly better. */
        if (nfds > min((int)lim_cur(RLIMIT_NOFILE), maxfiles))
@@ -1042,58 +1016,80 @@ doppoll(struct proc *p, struct pollfd *f
                        return (EINVAL);
        }
 
+       kqpoll_init(nfds);
+
        sz = nfds * sizeof(*pl);
 
        if ((error = copyin(fds, pl, sz)) != 0)
                goto bad;
 
-       for (i = 0; i < nfds; i++) {
-               pl[i].events &= ~POLL_NOHUP;
-               pl[i].revents = 0;
-       }
-
        if (sigmask)
                dosigsuspend(p, *sigmask &~ sigcantmask);
 
-retry:
-       ncoll = nselcoll;
-       atomic_setbits_int(&p->p_flag, P_SELECT);
-       pollscan(p, pl, nfds, retval);
-       if (*retval)
-               goto done;
-       if (timeout == NULL || timespecisset(timeout)) {
-               if (timeout != NULL) {
-                       getnanouptime(&start);
-                       nsecs = MIN(TIMESPEC_TO_NSEC(timeout), MAXTSLP);
-               } else
-                       nsecs = INFSLP;
-               s = splhigh();
-               if ((p->p_flag & P_SELECT) == 0 || nselcoll != ncoll) {
-                       splx(s);
-                       goto retry;
-               }
-               atomic_clearbits_int(&p->p_flag, P_SELECT);
-               error = tsleep_nsec(&selwait, PSOCK | PCATCH, "poll", nsecs);
-               splx(s);
+       /* Register kqueue events */
+       ppollregister(p, pl, nfds, &nevents, &ncollected);
+
+       /*
+        * The poll/select family of syscalls has been designed to
+        * block when file descriptors are not available, even if
+        * there's nothing to wait for.
+        */
+       if (nevents == 0 && ncollected == 0) {
+               uint64_t nsecs = INFSLP;
+
                if (timeout != NULL) {
-                       getnanouptime(&stop);
-                       timespecsub(&stop, &start, &elapsed);
-                       timespecsub(timeout, &elapsed, timeout);
-                       if (timeout->tv_sec < 0)
-                               timespecclear(timeout);
+                       if (!timespecisset(timeout))
+                               goto done;
+                       nsecs = MAX(1, MIN(TIMESPEC_TO_NSEC(timeout), MAXTSLP));
                }
-               if (error == 0 || error == EWOULDBLOCK)
-                       goto retry;
+
+               error = tsleep_nsec(&nowake, PSOCK | PCATCH, "kqpoll", nsecs);
+               if (error == ERESTART)
+                       error = EINTR;
+               if (error == EWOULDBLOCK)
+                       error = 0;
+               goto done;
        }
 
+       /* Do not block if registering found pending events. */
+       if (ncollected > 0)
+               timeout = &zerots;
+
+       /* Collect at most `nevents' possibly waiting in kqueue_scan() */
+       kqueue_scan_setup(&scan, p->p_kq);
+       while (nevents > 0) {
+               struct kevent kev[KQ_NEVENTS];
+               int i, ready, count;
+
+               /* Maximum number of events per iteration */
+               count = MIN(nitems(kev), nevents);
+               ready = kqueue_scan(&scan, count, kev, timeout, p, &error);
+#ifdef KTRACE
+               if (KTRPOINT(p, KTR_STRUCT))
+                       ktrevent(p, kev, ready);
+#endif
+               /* Convert back events that are ready. */
+               for (i = 0; i < ready; i++)
+                       ncollected += ppollcollect(p, &kev[i], pl, nfds);
+
+               /*
+                * Stop if there was an error or if we had enough
+                * space to collect all events that were ready.
+                */
+               if (error || ready < count)
+                       break;
+
+               nevents -= ready;
+       }
+       kqueue_scan_finish(&scan);
+       *retval = ncollected;
 done:
-       atomic_clearbits_int(&p->p_flag, P_SELECT);
        /*
         * NOTE: poll(2) is not restarted after a signal and EWOULDBLOCK is
         *       ignored (since the whole point is to see what would block).
         */
        switch (error) {
-       case ERESTART:
+       case EINTR:
                error = pollout(pl, fds, nfds);
                if (error == 0)
                        error = EINTR;
@@ -1110,9 +1106,235 @@ done:
 bad:
        if (pl != pfds)
                free(pl, M_TEMP, sz);
+
+       kqpoll_done(nfds);
+
        return (error);
 }
 
+int
+ppollregister_evts(struct proc *p, struct kevent *kevp, int nkev,
+    struct pollfd *pl, unsigned int pollid)
+{
+       int i, error, nevents = 0;
+
+       KASSERT(pl->revents == 0);
+
+#ifdef KTRACE
+       if (KTRPOINT(p, KTR_STRUCT))
+               ktrevent(p, kevp, nkev);
+#endif
+       for (i = 0; i < nkev; i++, kevp++) {
+again:
+               error = kqueue_register(p->p_kq, kevp, pollid, p);
+               switch (error) {
+               case 0:
+                       nevents++;
+                       break;
+               case EOPNOTSUPP:/* No underlying kqfilter */
+               case EINVAL:    /* Unimplemented filter */
+                       break;
+               case EBADF:     /* Bad file descriptor */
+                       pl->revents |= POLLNVAL;
+                       break;
+               case EPERM:     /* Specific to FIFO */
+                       KASSERT(kevp->filter == EVFILT_WRITE);
+                       if (nkev == 1) {
+                               /*
+                                * If this is the only filter make sure
+                                * POLLHUP is passed to userland.
+                                */
+                               kevp->filter = EVFILT_EXCEPT;
+                               goto again;
+                       }
+                       break;
+               case EPIPE:     /* Specific to pipes */
+                       KASSERT(kevp->filter == EVFILT_WRITE);
+                       pl->revents |= POLLHUP;
+                       break;
+               default:
+                       DPRINTFN(0, "poll err %lu fd %d revents %02x serial"
+                           " %lu filt %d ERROR=%d\n",
+                           ((unsigned long)kevp->udata - p->p_kq_serial),
+                           pl->fd, pl->revents, p->p_kq_serial, kevp->filter,
+                           error);
+                       /* FALLTHROUGH */
+               case ENXIO:     /* Device has been detached */
+                       pl->revents |= POLLERR;
+                       break;
+               }
+       }
+
+       return (nevents);
+}
+
+/*
+ * Convert pollfd into kqueue events and register them on the
+ * per-thread queue.
+ *
+ * At most 3 events can correspond to a single pollfd.
+ */
+void
+ppollregister(struct proc *p, struct pollfd *pl, int nfds, int *nregistered,
+    int *ncollected)
+{
+       int i, nkev, nevt, forcehup;
+       struct kevent kev[3], *kevp;
+
+       for (i = 0; i < nfds; i++) {
+               pl[i].events &= ~POLL_NOHUP;
+               pl[i].revents = 0;
+
+               if (pl[i].fd < 0)
+                       continue;
+
+               /*
+                * POLLHUP checking is implicit in the event filters.
+                * However, the checking must be done even if no events
+                * are requested.
+                */
+               forcehup = ((pl[i].events & ~POLLHUP) == 0);
+
+               DPRINTFN(1, "poll set %d/%d fd %d events %02x serial %lu\n",
+                   i+1, nfds, pl[i].fd, pl[i].events, p->p_kq_serial);
+
+               nevt = 0;
+               nkev = 0;
+               kevp = kev;
+               if (pl[i].events & (POLLIN | POLLRDNORM)) {
+                       EV_SET(kevp, pl[i].fd, EVFILT_READ,
+                           EV_ADD|EV_ENABLE|__EV_POLL, 0, 0,
+                           (void *)(p->p_kq_serial + i));
+                       nkev++;
+                       kevp++;
+               }
+               if (pl[i].events & (POLLOUT | POLLWRNORM)) {
+                       EV_SET(kevp, pl[i].fd, EVFILT_WRITE,
+                           EV_ADD|EV_ENABLE|__EV_POLL, 0, 0,
+                           (void *)(p->p_kq_serial + i));
+                       nkev++;
+                       kevp++;
+               }
+               if ((pl[i].events & (POLLPRI | POLLRDBAND)) || forcehup) {
+                       int evff = forcehup ? 0 : NOTE_OOB;
+
+                       EV_SET(kevp, pl[i].fd, EVFILT_EXCEPT,
+                           EV_ADD|EV_ENABLE|__EV_POLL, evff, 0,
+                           (void *)(p->p_kq_serial + i));
+                       nkev++;
+                       kevp++;
+               }
+
+               if (nkev == 0)
+                       continue;
+
+               *nregistered += ppollregister_evts(p, kev, nkev, &pl[i], i);
+
+               if (pl[i].revents != 0)
+                       (*ncollected)++;
+       }
+
+       DPRINTFN(1, "poll registered = %d, collected = %d\n", *nregistered,
+           *ncollected);
+}
+
+/*
+ * Convert given kqueue event into corresponding poll(2) revents bit.
+ */
+int
+ppollcollect(struct proc *p, struct kevent *kevp, struct pollfd *pl, u_int nfds)
+{
+       static struct timeval poll_errintvl = { 5, 0 };
+       static struct timeval poll_lasterr;
+       int already_seen;
+       unsigned long i;
+
+       /* Extract poll array index */
+       i = (unsigned long)kevp->udata - p->p_kq_serial;
+
+       if (i >= nfds) {
+               panic("%s: spurious kevp %p nfds %u udata 0x%lx serial 0x%lx",
+                   __func__, kevp, nfds,
+                   (unsigned long)kevp->udata, p->p_kq_serial);
+       }
+       if ((int)kevp->ident != pl[i].fd) {
+               panic("%s: kevp %p %lu/%d mismatch fd %d!=%d serial 0x%lx",
+                   __func__, kevp, i + 1, nfds, (int)kevp->ident, pl[i].fd,
+                   p->p_kq_serial);
+       }
+
+       /*
+        * A given descriptor may already have generated an error
+        * against another filter during kqueue_register().
+        *
+        * Make sure to set the appropriate flags but do not
+        * increment `*retval' more than once.
+        */
+       already_seen = (pl[i].revents != 0);
+
+       /* POLLNVAL preempts other events. */
+       if ((kevp->flags & EV_ERROR) && kevp->data == EBADF) {
+               pl[i].revents = POLLNVAL;
+               goto done;
+       } else if (pl[i].revents & POLLNVAL) {
+               goto done;
+       }
+
+       switch (kevp->filter) {
+       case EVFILT_READ:
+               if (kevp->flags & __EV_HUP)
+                       pl[i].revents |= POLLHUP;
+               if (pl[i].events & (POLLIN | POLLRDNORM))
+                       pl[i].revents |= pl[i].events & (POLLIN | POLLRDNORM);
+               break;
+       case EVFILT_WRITE:
+               /* POLLHUP and POLLOUT/POLLWRNORM are mutually exclusive */
+               if (kevp->flags & __EV_HUP) {
+                       pl[i].revents |= POLLHUP;
+               } else if (pl[i].events & (POLLOUT | POLLWRNORM)) {
+                       pl[i].revents |= pl[i].events & (POLLOUT | POLLWRNORM);
+               }
+               break;
+       case EVFILT_EXCEPT:
+               if (kevp->flags & __EV_HUP) {
+                       if (pl[i].events != 0 && pl[i].events != POLLOUT)
+                               DPRINTFN(0, "weird events %x\n", pl[i].events);
+                       pl[i].revents |= POLLHUP;
+                       break;
+               }
+               if (pl[i].events & (POLLPRI | POLLRDBAND))
+                       pl[i].revents |= pl[i].events & (POLLPRI | POLLRDBAND);
+               break;
+       default:
+               KASSERT(0);
+       }
+
+done:
+       DPRINTFN(1, "poll get %lu/%d fd %d revents %02x serial %lu filt %d\n",
+           i+1, nfds, pl[i].fd, pl[i].revents, (unsigned long)kevp->udata,
+           kevp->filter);
+
+       /*
+        * Make noise about unclaimed events as they might indicate a bug
+        * and can result in spurious-looking wakeups of poll(2).
+        *
+        * Live-locking within the system call should not happen because
+        * the scan loop in doppoll() has an upper limit for the number
+        * of events to process.
+        */
+       if (pl[i].revents == 0 && ratecheck(&poll_lasterr, &poll_errintvl)) {
+               printf("%s[%d]: poll index %lu fd %d events 0x%x "
+                   "filter %d/0x%x unclaimed\n",
+                   p->p_p->ps_comm, p->p_tid, i, pl[i].fd,
+                   pl[i].events, kevp->filter, kevp->flags);
+       }
+
+       if (!already_seen && (pl[i].revents != 0))
+               return (1);
+
+       return (0);
+}
+
 /*
  * utrace system call
  */
Index: sys/event.h
===================================================================
RCS file: src/sys/sys/event.h,v
retrieving revision 1.61
diff -u -p -r1.61 event.h
--- sys/event.h 11 Dec 2021 09:28:26 -0000      1.61
+++ sys/event.h 25 Dec 2021 13:09:06 -0000
@@ -246,6 +246,8 @@ struct knote {
        } kn_ptr;
        const struct            filterops *kn_fop;
        void                    *kn_hook;       /* [o] */
+       unsigned int            kn_pollid;      /* [I] */
+
 #define KN_ACTIVE      0x0001                  /* event has been triggered */
 #define KN_QUEUED      0x0002                  /* event is on queue */
 #define KN_DISABLED    0x0004                  /* event is disabled */
@@ -296,8 +298,8 @@ extern void knote_modify(const struct ke
 extern void    knote_submit(struct knote *, struct kevent *);
 extern void    kqueue_init(void);
 extern void    kqueue_init_percpu(void);
-extern int     kqueue_register(struct kqueue *kq,
-                   struct kevent *kev, struct proc *p);
+extern int     kqueue_register(struct kqueue *kq, struct kevent *kev,
+                   unsigned int pollid, struct proc *p);
 extern int     kqueue_scan(struct kqueue_scan_state *, int, struct kevent *,
                    struct timespec *, struct proc *, int *);
 extern void    kqueue_scan_setup(struct kqueue_scan_state *, struct kqueue *);
Index: sys/eventvar.h
===================================================================
RCS file: src/sys/sys/eventvar.h,v
retrieving revision 1.12
diff -u -p -r1.12 eventvar.h
--- sys/eventvar.h      10 Jun 2021 15:10:56 -0000      1.12
+++ sys/eventvar.h      25 Dec 2021 13:09:06 -0000
@@ -53,6 +53,8 @@ struct kqueue {
 
        LIST_ENTRY(kqueue) kq_next;
 
+       u_int           kq_nknotes;             /* [q] # of registered knotes */
+
        int             kq_knlistsize;          /* [q] size of kq_knlist */
        struct          knlist *kq_knlist;      /* [q] list of
                                                 *     attached knotes */
