On Tue, Nov 30, 2021 at 03:11:16PM +0000, Visa Hankala wrote:
> Here is a revised version of kqueue-based poll(2).
>
> The two most important changes to the previous version are:
>
> * Properly handle pollfd arrays where more than one entry refers
> to the same file descriptor.
>
> * Fix poll(2)'s return value.
>
> The first change would be somewhat tricky with plain kqueue since
> an fd-based knote is identified by a (filter, fd) pair. The poll(2)
> translation layer could cope with this by using an auxiliary array
> to map overlapping pollfd entries into a single knote. However, most
> users of poll(2) appear to avoid fd overlaps. So the mapping would be
> a hindrance in the typical case. Also, overlaps might suggest
> suboptimal I/O patterns in the userspace software.
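
To make the overlap case concrete, here is a hypothetical userland
snippet (not part of the diff): two pollfd entries request POLLIN on
the same descriptor, so with plain kqueue both would map to the same
(EVFILT_READ, fd) knote.

    #include <poll.h>
    #include <stdio.h>
    #include <unistd.h>

    int
    main(void)
    {
            struct pollfd pfd[2];

            /* Two entries for the same fd and the same condition. */
            pfd[0].fd = STDIN_FILENO;
            pfd[0].events = POLLIN;
            pfd[1].fd = STDIN_FILENO;
            pfd[1].events = POLLIN;

            /* Both entries must get their revents set independently. */
            if (poll(pfd, 2, 0) == -1)
                    return (1);
            printf("revents: 0x%x 0x%x\n", (unsigned)pfd[0].revents,
                (unsigned)pfd[1].revents);
            return (0);
    }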
>
> The patch handles fd overlaps by adding a pollid field to the knote
> identifier. This allows the co-existence of multiple knotes with the
> same filter and fd. Of course, the extra identifier is not totally free
> either. Nevertheless, it appears to solve the problem relatively nicely.
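
A simplified standalone model of the widened identity, using
hypothetical types (the real lookup is the SLIST_FOREACH change in
kqueue_register() in the diff below): the key becomes the triple
(filter, ident, pollid) instead of the pair (filter, ident), so two
notes for the same filter and fd can coexist.

    #include <stdio.h>

    /* Hypothetical stand-in for struct knote's identity fields. */
    struct note {
            int             filter;         /* e.g. EVFILT_READ */
            int             ident;          /* file descriptor */
            unsigned int    pollid;         /* index of the pollfd entry */
    };

    /* Match on the full triple, as the patched kqueue_register() does. */
    static struct note *
    lookup(struct note *list, int n, int filter, int ident,
        unsigned int pollid)
    {
            int i;

            for (i = 0; i < n; i++) {
                    if (list[i].filter == filter &&
                        list[i].ident == ident &&
                        list[i].pollid == pollid)
                            return (&list[i]);
            }
            return (NULL);
    }

    int
    main(void)
    {
            /* Same filter and fd, distinct pollid: both notes coexist. */
            struct note list[] = { { 1, 4, 0 }, { 1, 4, 1 } };

            printf("%p %p\n", (void *)lookup(list, 2, 1, 4, 0),
                (void *)lookup(list, 2, 1, 4, 1));
            return (0);
    }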
>
> I have shuffled the code a bit so that the register and collect
> routines are next to each other.
>
> In addition, the collect routine now warns if kqueue_scan() returns
> an event that does not activate a pollfd entry. Such a situation is
> likely the result of a bug somewhere.
>
> Please test. Once this way of implementing poll(2) has proven reliable
> enough, it should be possible to replace selwakeup() with a
> kernel-lock-free KNOTE() in subsystems that are sufficiently MP-safe.

Here is a revised version of the patch.

A number of fixes to event filter routines have already been committed.

Changes since the previous version:

* Prevent excessive use of kernel memory with poll(2). The code now
tracks how many knotes are registered. If the number looks high,
kqpoll_done() removes knotes eagerly. The condition could take `num'
into account as well, but I think the heuristic is good enough already.
* Set forcehup anew on each iteration in ppollregister().

Please test.

Index: kern/kern_event.c
===================================================================
RCS file: src/sys/kern/kern_event.c,v
retrieving revision 1.178
diff -u -p -r1.178 kern_event.c
--- kern/kern_event.c 25 Dec 2021 11:04:58 -0000 1.178
+++ kern/kern_event.c 25 Dec 2021 13:09:06 -0000
@@ -221,6 +221,7 @@ KQRELE(struct kqueue *kq)
}
KASSERT(TAILQ_EMPTY(&kq->kq_head));
+ KASSERT(kq->kq_nknotes == 0);
free(kq->kq_knlist, M_KEVENT, kq->kq_knlistsize *
sizeof(struct knlist));
@@ -451,7 +452,7 @@ filt_proc(struct knote *kn, long hint)
kev.fflags = kn->kn_sfflags;
kev.data = kn->kn_id; /* parent */
kev.udata = kn->kn_udata; /* preserve udata */
- error = kqueue_register(kq, &kev, NULL);
+ error = kqueue_register(kq, &kev, 0, NULL);
if (error)
kn->kn_fflags |= NOTE_TRACKERR;
}
@@ -814,11 +815,15 @@ void
kqpoll_done(unsigned int num)
{
struct proc *p = curproc;
+ struct kqueue *kq = p->p_kq;
KASSERT(p->p_kq != NULL);
KASSERT(p->p_kq_serial + num >= p->p_kq_serial);
p->p_kq_serial += num;
+
+ if (kq->kq_nknotes > 4 * kq->kq_knlistsize)
+ kqueue_purge(p, kq);
}
void
@@ -944,7 +949,7 @@ sys_kevent(struct proc *p, void *v, regi
for (i = 0; i < n; i++) {
kevp = &kev[i];
kevp->flags &= ~EV_SYSFLAGS;
- error = kqueue_register(kq, kevp, p);
+ error = kqueue_register(kq, kevp, 0, p);
if (error || (kevp->flags & EV_RECEIPT)) {
if (SCARG(uap, nevents) != 0) {
kevp->flags = EV_ERROR;
@@ -1040,7 +1045,8 @@ bad:
#endif
int
-kqueue_register(struct kqueue *kq, struct kevent *kev, struct proc *p)
+kqueue_register(struct kqueue *kq, struct kevent *kev, unsigned int pollid,
+ struct proc *p)
{
struct filedesc *fdp = kq->kq_fdp;
const struct filterops *fops = NULL;
@@ -1049,6 +1055,8 @@ kqueue_register(struct kqueue *kq, struc
struct knlist *list = NULL;
int active, error = 0;
+ KASSERT(pollid == 0 || (p != NULL && p->p_kq == kq));
+
if (kev->filter < 0) {
if (kev->filter + EVFILT_SYSCOUNT < 0)
return (EINVAL);
@@ -1096,7 +1104,8 @@ again:
if (list != NULL) {
SLIST_FOREACH(kn, list, kn_link) {
if (kev->filter == kn->kn_filter &&
- kev->ident == kn->kn_id) {
+ kev->ident == kn->kn_id &&
+ pollid == kn->kn_pollid) {
if (!knote_acquire(kn, NULL, 0)) {
/* knote_acquire() has released
* kq_lock. */
@@ -1141,6 +1150,7 @@ again:
kev->fflags = 0;
kev->data = 0;
kn->kn_kevent = *kev;
+ kn->kn_pollid = pollid;
knote_attach(kn);
mtx_leave(&kq->kq_lock);
@@ -1905,6 +1915,7 @@ knote_attach(struct knote *kn)
list = &kq->kq_knhash[KN_HASH(kn->kn_id, kq->kq_knhashmask)];
}
SLIST_INSERT_HEAD(list, kn, kn_link);
+ kq->kq_nknotes++;
}
void
@@ -1916,6 +1927,7 @@ knote_detach(struct knote *kn)
MUTEX_ASSERT_LOCKED(&kq->kq_lock);
KASSERT(kn->kn_status & KN_PROCESSING);
+ kq->kq_nknotes--;
if (kn->kn_fop->f_flags & FILTEROP_ISFD)
list = &kq->kq_knlist[kn->kn_id];
else
Index: kern/sys_generic.c
===================================================================
RCS file: src/sys/kern/sys_generic.c,v
retrieving revision 1.146
diff -u -p -r1.146 sys_generic.c
--- kern/sys_generic.c 11 Dec 2021 09:28:26 -0000 1.146
+++ kern/sys_generic.c 25 Dec 2021 13:09:06 -0000
@@ -81,8 +81,9 @@ int kqpoll_debug = 0;
int pselregister(struct proc *, fd_set *[], fd_set *[], int, int *, int *);
int pselcollect(struct proc *, struct kevent *, fd_set *[], int *);
+void ppollregister(struct proc *, struct pollfd *, int, int *, int *);
+int ppollcollect(struct proc *, struct kevent *, struct pollfd *, u_int);
-void pollscan(struct proc *, struct pollfd *, u_int, register_t *);
int pollout(struct pollfd *, struct pollfd *, u_int);
int dopselect(struct proc *, int, fd_set *, fd_set *, fd_set *,
struct timespec *, const sigset_t *, register_t *);
@@ -768,7 +769,7 @@ pselregister(struct proc *p, fd_set *pib
if (KTRPOINT(p, KTR_STRUCT))
ktrevent(p, &kev, 1);
#endif
- error = kqueue_register(p->p_kq, &kev, p);
+ error = kqueue_register(p->p_kq, &kev, 0, p);
switch (error) {
case 0:
nevents++;
@@ -910,33 +911,6 @@ doselwakeup(struct selinfo *sip)
}
}
-void
-pollscan(struct proc *p, struct pollfd *pl, u_int nfd, register_t *retval)
-{
- struct filedesc *fdp = p->p_fd;
- struct file *fp;
- u_int i;
- int n = 0;
-
- for (i = 0; i < nfd; i++, pl++) {
- /* Check the file descriptor. */
- if (pl->fd < 0) {
- pl->revents = 0;
- continue;
- }
- if ((fp = fd_getfile(fdp, pl->fd)) == NULL) {
- pl->revents = POLLNVAL;
- n++;
- continue;
- }
- pl->revents = (*fp->f_ops->fo_poll)(fp, pl->events, p);
- FRELE(fp, p);
- if (pl->revents != 0)
- n++;
- }
- *retval = n;
-}
-
/*
* Only copyout the revents field.
*/
@@ -1024,11 +998,11 @@ int
doppoll(struct proc *p, struct pollfd *fds, u_int nfds,
struct timespec *timeout, const sigset_t *sigmask, register_t *retval)
{
- size_t sz;
+ struct kqueue_scan_state scan;
+ struct timespec zerots = {};
struct pollfd pfds[4], *pl = pfds;
- struct timespec elapsed, start, stop;
- uint64_t nsecs;
- int ncoll, i, s, error;
+ int error, ncollected = 0, nevents = 0;
+ size_t sz;
/* Standards say no more than MAX_OPEN; this is possibly better. */
if (nfds > min((int)lim_cur(RLIMIT_NOFILE), maxfiles))
@@ -1042,58 +1016,80 @@ doppoll(struct proc *p, struct pollfd *f
return (EINVAL);
}
+ kqpoll_init(nfds);
+
sz = nfds * sizeof(*pl);
if ((error = copyin(fds, pl, sz)) != 0)
goto bad;
- for (i = 0; i < nfds; i++) {
- pl[i].events &= ~POLL_NOHUP;
- pl[i].revents = 0;
- }
-
if (sigmask)
dosigsuspend(p, *sigmask &~ sigcantmask);
-retry:
- ncoll = nselcoll;
- atomic_setbits_int(&p->p_flag, P_SELECT);
- pollscan(p, pl, nfds, retval);
- if (*retval)
- goto done;
- if (timeout == NULL || timespecisset(timeout)) {
- if (timeout != NULL) {
- getnanouptime(&start);
- nsecs = MIN(TIMESPEC_TO_NSEC(timeout), MAXTSLP);
- } else
- nsecs = INFSLP;
- s = splhigh();
- if ((p->p_flag & P_SELECT) == 0 || nselcoll != ncoll) {
- splx(s);
- goto retry;
- }
- atomic_clearbits_int(&p->p_flag, P_SELECT);
- error = tsleep_nsec(&selwait, PSOCK | PCATCH, "poll", nsecs);
- splx(s);
+ /* Register kqueue events */
+ ppollregister(p, pl, nfds, &nevents, &ncollected);
+
+ /*
+ * The poll/select family of syscalls has been designed to
+ * block when file descriptors are not available, even if
+ * there's nothing to wait for.
+ */
+ if (nevents == 0 && ncollected == 0) {
+ uint64_t nsecs = INFSLP;
+
if (timeout != NULL) {
- getnanouptime(&stop);
- timespecsub(&stop, &start, &elapsed);
- timespecsub(timeout, &elapsed, timeout);
- if (timeout->tv_sec < 0)
- timespecclear(timeout);
+ if (!timespecisset(timeout))
+ goto done;
+ nsecs = MAX(1, MIN(TIMESPEC_TO_NSEC(timeout), MAXTSLP));
}
- if (error == 0 || error == EWOULDBLOCK)
- goto retry;
+
+ error = tsleep_nsec(&nowake, PSOCK | PCATCH, "kqpoll", nsecs);
+ if (error == ERESTART)
+ error = EINTR;
+ if (error == EWOULDBLOCK)
+ error = 0;
+ goto done;
}
+ /* Do not block if registering found pending events. */
+ if (ncollected > 0)
+ timeout = &zerots;
+
+ /* Collect at most `nevents' possibly waiting in kqueue_scan() */
+ kqueue_scan_setup(&scan, p->p_kq);
+ while (nevents > 0) {
+ struct kevent kev[KQ_NEVENTS];
+ int i, ready, count;
+
+ /* Maximum number of events per iteration */
+ count = MIN(nitems(kev), nevents);
+ ready = kqueue_scan(&scan, count, kev, timeout, p, &error);
+#ifdef KTRACE
+ if (KTRPOINT(p, KTR_STRUCT))
+ ktrevent(p, kev, ready);
+#endif
+ /* Convert back events that are ready. */
+ for (i = 0; i < ready; i++)
+ ncollected += ppollcollect(p, &kev[i], pl, nfds);
+
+ /*
+ * Stop if there was an error or if we had enough
+ * place to collect all events that were ready.
+ */
+ if (error || ready < count)
+ break;
+
+ nevents -= ready;
+ }
+ kqueue_scan_finish(&scan);
+ *retval = ncollected;
done:
- atomic_clearbits_int(&p->p_flag, P_SELECT);
/*
* NOTE: poll(2) is not restarted after a signal and EWOULDBLOCK is
* ignored (since the whole point is to see what would block).
*/
switch (error) {
- case ERESTART:
+ case EINTR:
error = pollout(pl, fds, nfds);
if (error == 0)
error = EINTR;
@@ -1110,9 +1106,235 @@ done:
bad:
if (pl != pfds)
free(pl, M_TEMP, sz);
+
+ kqpoll_done(nfds);
+
return (error);
}
+int
+ppollregister_evts(struct proc *p, struct kevent *kevp, int nkev,
+ struct pollfd *pl, unsigned int pollid)
+{
+ int i, error, nevents = 0;
+
+ KASSERT(pl->revents == 0);
+
+#ifdef KTRACE
+ if (KTRPOINT(p, KTR_STRUCT))
+ ktrevent(p, kevp, nkev);
+#endif
+ for (i = 0; i < nkev; i++, kevp++) {
+again:
+ error = kqueue_register(p->p_kq, kevp, pollid, p);
+ switch (error) {
+ case 0:
+ nevents++;
+ break;
+ case EOPNOTSUPP:/* No underlying kqfilter */
+ case EINVAL: /* Unimplemented filter */
+ break;
+ case EBADF: /* Bad file descriptor */
+ pl->revents |= POLLNVAL;
+ break;
+ case EPERM: /* Specific to FIFO */
+ KASSERT(kevp->filter == EVFILT_WRITE);
+ if (nkev == 1) {
+ /*
+ * If this is the only filter make sure
+ * POLLHUP is passed to userland.
+ */
+ kevp->filter = EVFILT_EXCEPT;
+ goto again;
+ }
+ break;
+ case EPIPE: /* Specific to pipes */
+ KASSERT(kevp->filter == EVFILT_WRITE);
+ pl->revents |= POLLHUP;
+ break;
+ default:
+ DPRINTFN(0, "poll err %lu fd %d revents %02x serial"
+ " %lu filt %d ERROR=%d\n",
+ ((unsigned long)kevp->udata - p->p_kq_serial),
+ pl->fd, pl->revents, p->p_kq_serial, kevp->filter,
+ error);
+ /* FALLTHROUGH */
+ case ENXIO: /* Device has been detached */
+ pl->revents |= POLLERR;
+ break;
+ }
+ }
+
+ return (nevents);
+}
+
+/*
+ * Convert pollfd into kqueue events and register them on the
+ * per-thread queue.
+ *
+ * At most 3 events can correspond to a single pollfd.
+ */
+void
+ppollregister(struct proc *p, struct pollfd *pl, int nfds, int *nregistered,
+ int *ncollected)
+{
+ int i, nkev, nevt, forcehup;
+ struct kevent kev[3], *kevp;
+
+ for (i = 0; i < nfds; i++) {
+ pl[i].events &= ~POLL_NOHUP;
+ pl[i].revents = 0;
+
+ if (pl[i].fd < 0)
+ continue;
+
+ /*
+ * POLLHUP checking is implicit in the event filters.
+ * However, the checking must be done even if no events are
+ * requested.
+ */
+ forcehup = ((pl[i].events & ~POLLHUP) == 0);
+
+ DPRINTFN(1, "poll set %d/%d fd %d events %02x serial %lu\n",
+ i+1, nfds, pl[i].fd, pl[i].events, p->p_kq_serial);
+
+ nevt = 0;
+ nkev = 0;
+ kevp = kev;
+ if (pl[i].events & (POLLIN | POLLRDNORM)) {
+ EV_SET(kevp, pl[i].fd, EVFILT_READ,
+ EV_ADD|EV_ENABLE|__EV_POLL, 0, 0,
+ (void *)(p->p_kq_serial + i));
+ nkev++;
+ kevp++;
+ }
+ if (pl[i].events & (POLLOUT | POLLWRNORM)) {
+ EV_SET(kevp, pl[i].fd, EVFILT_WRITE,
+ EV_ADD|EV_ENABLE|__EV_POLL, 0, 0,
+ (void *)(p->p_kq_serial + i));
+ nkev++;
+ kevp++;
+ }
+ if ((pl[i].events & (POLLPRI | POLLRDBAND)) || forcehup) {
+ int evff = forcehup ? 0 : NOTE_OOB;
+
+ EV_SET(kevp, pl[i].fd, EVFILT_EXCEPT,
+ EV_ADD|EV_ENABLE|__EV_POLL, evff, 0,
+ (void *)(p->p_kq_serial + i));
+ nkev++;
+ kevp++;
+ }
+
+ if (nkev == 0)
+ continue;
+
+ *nregistered += ppollregister_evts(p, kev, nkev, &pl[i], i);
+
+ if (pl[i].revents != 0)
+ (*ncollected)++;
+ }
+
+ DPRINTFN(1, "poll registered = %d, collected = %d\n", *nregistered,
+ *ncollected);
+}
+
+/*
+ * Convert given kqueue event into corresponding poll(2) revents bit.
+ */
+int
+ppollcollect(struct proc *p, struct kevent *kevp, struct pollfd *pl, u_int nfds)
+{
+ static struct timeval poll_errintvl = { 5, 0 };
+ static struct timeval poll_lasterr;
+ int already_seen;
+ unsigned long i;
+
+ /* Extract poll array index */
+ i = (unsigned long)kevp->udata - p->p_kq_serial;
+
+ if (i >= nfds) {
+ panic("%s: spurious kevp %p nfds %u udata 0x%lx serial 0x%lx",
+ __func__, kevp, nfds,
+ (unsigned long)kevp->udata, p->p_kq_serial);
+ }
+ if ((int)kevp->ident != pl[i].fd) {
+ panic("%s: kevp %p %lu/%d mismatch fd %d!=%d serial 0x%lx",
+ __func__, kevp, i + 1, nfds, (int)kevp->ident, pl[i].fd,
+ p->p_kq_serial);
+ }
+
+ /*
+ * A given descriptor may already have generated an error
+ * against another filter during kqueue_register().
+ *
+ * Make sure to set the appropriate flags but do not
+ * increment `*retval' more than once.
+ */
+ already_seen = (pl[i].revents != 0);
+
+ /* POLLNVAL preempts other events. */
+ if ((kevp->flags & EV_ERROR) && kevp->data == EBADF) {
+ pl[i].revents = POLLNVAL;
+ goto done;
+ } else if (pl[i].revents & POLLNVAL) {
+ goto done;
+ }
+
+ switch (kevp->filter) {
+ case EVFILT_READ:
+ if (kevp->flags & __EV_HUP)
+ pl[i].revents |= POLLHUP;
+ if (pl[i].events & (POLLIN | POLLRDNORM))
+ pl[i].revents |= pl[i].events & (POLLIN | POLLRDNORM);
+ break;
+ case EVFILT_WRITE:
+ /* POLLHUP and POLLOUT/POLLWRNORM are mutually exclusive */
+ if (kevp->flags & __EV_HUP) {
+ pl[i].revents |= POLLHUP;
+ } else if (pl[i].events & (POLLOUT | POLLWRNORM)) {
+ pl[i].revents |= pl[i].events & (POLLOUT | POLLWRNORM);
+ }
+ break;
+ case EVFILT_EXCEPT:
+ if (kevp->flags & __EV_HUP) {
+ if (pl[i].events != 0 && pl[i].events != POLLOUT)
+ DPRINTFN(0, "weird events %x\n", pl[i].events);
+ pl[i].revents |= POLLHUP;
+ break;
+ }
+ if (pl[i].events & (POLLPRI | POLLRDBAND))
+ pl[i].revents |= pl[i].events & (POLLPRI | POLLRDBAND);
+ break;
+ default:
+ KASSERT(0);
+ }
+
+done:
+ DPRINTFN(1, "poll get %lu/%d fd %d revents %02x serial %lu filt %d\n",
+ i+1, nfds, pl[i].fd, pl[i].revents, (unsigned long)kevp->udata,
+ kevp->filter);
+
+ /*
+ * Make noise about unclaimed events as they might indicate a bug
+ * and can result in spurious-looking wakeups of poll(2).
+ *
+ * Live-locking within the system call should not happen because
+ * the scan loop in doppoll() has an upper limit for the number
+ * of events to process.
+ */
+ if (pl[i].revents == 0 && ratecheck(&poll_lasterr, &poll_errintvl)) {
+ printf("%s[%d]: poll index %lu fd %d events 0x%x "
+ "filter %d/0x%x unclaimed\n",
+ p->p_p->ps_comm, p->p_tid, i, pl[i].fd,
+ pl[i].events, kevp->filter, kevp->flags);
+ }
+
+ if (!already_seen && (pl[i].revents != 0))
+ return (1);
+
+ return (0);
+}
+
/*
* utrace system call
*/
Index: sys/event.h
===================================================================
RCS file: src/sys/sys/event.h,v
retrieving revision 1.61
diff -u -p -r1.61 event.h
--- sys/event.h 11 Dec 2021 09:28:26 -0000 1.61
+++ sys/event.h 25 Dec 2021 13:09:06 -0000
@@ -246,6 +246,8 @@ struct knote {
} kn_ptr;
const struct filterops *kn_fop;
void *kn_hook; /* [o] */
+ unsigned int kn_pollid; /* [I] */
+
#define KN_ACTIVE 0x0001 /* event has been triggered */
#define KN_QUEUED 0x0002 /* event is on queue */
#define KN_DISABLED 0x0004 /* event is disabled */
@@ -296,8 +298,8 @@ extern void knote_modify(const struct ke
extern void knote_submit(struct knote *, struct kevent *);
extern void kqueue_init(void);
extern void kqueue_init_percpu(void);
-extern int kqueue_register(struct kqueue *kq,
- struct kevent *kev, struct proc *p);
+extern int kqueue_register(struct kqueue *kq, struct kevent *kev,
+ unsigned int pollid, struct proc *p);
extern int kqueue_scan(struct kqueue_scan_state *, int, struct kevent *,
struct timespec *, struct proc *, int *);
extern void kqueue_scan_setup(struct kqueue_scan_state *, struct kqueue *);
Index: sys/eventvar.h
===================================================================
RCS file: src/sys/sys/eventvar.h,v
retrieving revision 1.12
diff -u -p -r1.12 eventvar.h
--- sys/eventvar.h 10 Jun 2021 15:10:56 -0000 1.12
+++ sys/eventvar.h 25 Dec 2021 13:09:06 -0000
@@ -53,6 +53,8 @@ struct kqueue {
LIST_ENTRY(kqueue) kq_next;
+ u_int kq_nknotes; /* [q] # of registered knotes */
+
int kq_knlistsize; /* [q] size of kq_knlist */
struct knlist *kq_knlist; /* [q] list of
* attached knotes */