On Sat, Apr 28, 2018 at 07:40:38PM +0300, Paul Irofti wrote:
> On Sun, Apr 22, 2018 at 03:34:45PM +0300, Paul Irofti wrote:
> > Hi,
> > 
> > Here is a new semaphore implementation that uses atomic operations,
> > where available, and futexes for locking. 
> > 
> > The reason we need this is to make semaphores safe for asynchronous
> > signals.
> > 
> > 
> > All posixsuite tests (semaphore and sigaction) pass with this.
> > They do not pass with our current implementation.  Unfortunately I cannot
> > get our sem_timedwait regression test to work.
> > 
> >   regress/lib/libpthread/semaphore/sem_timedwait
> > 
> > My investigation so far suggests that the current implementation is
> > flawed because it does not respect ERESTART and treats EINTR as if it
> > were equivalent to EAGAIN. The POSIX standard and other
> > implementations disagree with that: ERESTART should restart the
> > semaphore waiting and EINTR should exit the call. The above regression
> > test relies on our current EINTR abuse and I think that is why it fails.
> > I added a few helpful printfs to that test in my diff.
> > 
> > I hope future discussions at the Nantes hackathon will clarify this
> > issue.
> > 
> > 
> > Otherwise I have been running with this implementation for a couple of
> > weeks. LaTeX, octave, chrome, firefox, thunderbird, vim, mutt, vlc,
> > mplayer etc. run just fine.
> > 
> > I would like to get wider testing to see if there are any defects left
> > in the current version. 
> > 
> > 
> > I have also added all the changes in a fork on github.
> > 
> >   https://github.com/bulibuta/openbsd-src/tree/sem_atomicfutex
> > 
> > 
> > Please test and get back to me if you see any issues.
> > 
> > Thank you,
> > Paul
> 
> Here is the same diff adapted to what happened in -current this week.
> All required bits are now in, so the current diff neatly contains just the
> implementation.

People started testing this, thank you!

Here is an updated diff that:

  Adds barriers and debug printfs, and handles EAGAIN.

  The barriers bit is mostly from visa@, thanks!

  tb@ found offlineimap faulting because the futex syscall returned EAGAIN
  and sem_wait failed instead of retrying. Now we loop again on EAGAIN.

  Debug printfs are for future debugging.

Martin, is handling EAGAIN like this correct?

diff --git lib/librthread/Makefile lib/librthread/Makefile
index 4c3e127491d..5dfb140290e 100644
--- lib/librthread/Makefile
+++ lib/librthread/Makefile
@@ -30,12 +30,19 @@ SRCS=       rthread.c \
        rthread_rwlock.c \
        rthread_rwlockattr.c \
        rthread_sched.c \
-       rthread_sem.c \
        rthread_sig.c \
        rthread_stack.c \
        rthread_spin_lock.c \
        sched_prio.c
 
+# Only these archs provide the atomics rthread_sem_atomic.c relies on
+.if ${MACHINE_ARCH} == "amd64" || ${MACHINE_ARCH} == "i386" || \
+    ${MACHINE_ARCH} == "mips64" || ${MACHINE_ARCH} == "mips64el"
+SRCS+= rthread_sem_atomic.c
+.else
+SRCS+= rthread_sem.c
+.endif
+
 SRCDIR= ${.CURDIR}/../libpthread
 .include "${SRCDIR}/man/Makefile.inc"
 .include <bsd.lib.mk>
diff --git lib/librthread/rthread_sem_atomic.c lib/librthread/rthread_sem_atomic.c
new file mode 100644
index 00000000000..e5c8015d27c
--- /dev/null
+++ lib/librthread/rthread_sem_atomic.c
@@ -0,0 +1,445 @@
+/*     $OpenBSD$ */
+/*
+ * Copyright (c) 2004,2005,2013 Ted Unangst <t...@openbsd.org>
+ * Copyright (c) 2018 Paul Irofti <piro...@openbsd.org>
+ * All Rights Reserved.
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <sys/types.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <sys/atomic.h>
+#include <sys/time.h>
+#include <sys/futex.h>
+
+#include <errno.h>
+#include <fcntl.h>
+#include <sha2.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <pthread.h>
+
+#include "rthread.h"
+#include "cancel.h"            /* in libc/include */
+#include "synch.h"
+
+#ifndef SEM_ATOMIC_DEBUG
+/* Tracing compiled out: DPRINTF((fmt, ...)) expands to nothing. */
+#define DPRINTF(x)
+#else
+/* Tracing enabled: x is a parenthesized printf(3) argument list. */
+#define DPRINTF(x)     printf x
+#endif
+
+/* NOTE(review): SHARED_IDENT is not referenced in this file; confirm before removing. */
+#define SHARED_IDENT ((void *)-1)
+
+/* SHA256_DIGEST_STRING_LENGTH includes nul */
+/* "/tmp/" (5 chars) + sha256 digest string + ".sem" (4 chars) */
+#define SEM_PATH_SIZE (5 + SHA256_DIGEST_STRING_LENGTH + 4)
+
+/*
+ * long enough to be hard to guess; only used by the "notyet"
+ * process-shared branch of sem_init()
+ */
+#define SEM_RANDOM_NAME_LEN    10
+
+/*
+ * Size of memory to be mmap()'ed by named semaphores.
+ * Should be >= SEM_PATH_SIZE and page-aligned.
+ * NOTE(review): sem_open() stores the semaphore structure itself in this
+ * mapping, so the meaningful lower bound looks like sizeof(*sem), not
+ * SEM_PATH_SIZE -- confirm.  _thread_pagesize is presumably set up by
+ * _rthread_init(), which is why users of this macro initialize threads
+ * first.
+ */
+#define SEM_MMAP_SIZE  _thread_pagesize
+
+/*
+ * Internal implementation of semaphores
+ */
+
+/*
+ * Take one unit from sem, sleeping on the futex while the count is zero.
+ *
+ * can_eintr:      non-zero if an EINTR from the wait may end the call.
+ * abstime:        absolute CLOCK_REALTIME deadline, or NULL for none.
+ * delayed_cancel: caller's delayed-cancel flag; an ECANCELED from the
+ *                 wait is only reported when it is set.
+ *
+ * Returns 0 once a unit was taken, otherwise the error from _twait().
+ */
+int
+_sem_wait(sem_t sem, int can_eintr, const struct timespec *abstime,
+    int *delayed_cancel)
+{
+       unsigned int v;
+       int r = 0;
+
+       for (;;) {
+               /*
+                * The snapshot is unsigned to match atomic_cas_uint(); as
+                * an int, a count above INT_MAX (if SEM_VALUE_MAX allows
+                * one -- confirm) would look non-positive and we would
+                * sleep although units are available.
+                */
+               while ((v = sem->value) > 0) {
+                       if (atomic_cas_uint(&sem->value, v, v - 1) == v) {
+                               /* order later accesses after the take */
+                               membar_enter_after_atomic();
+                               return 0;
+                       }
+               }
+               /* the wait already failed; the loop above was the retry */
+               if (r)
+                       break;
+
+               atomic_inc_int(&sem->waitcount);
+               /* sleeps only while sem->value still reads 0 */
+               r = _twait(&sem->value, 0, CLOCK_REALTIME, abstime);
+               /*
+                * Retry rather than fail on EAGAIN (the futex word no
+                * longer held 0), on ECANCELED with no pending cancel,
+                * and on EINTR when the caller is not interruptible.
+                */
+               if ((r == ECANCELED && *delayed_cancel == 0) ||
+                   (r == EINTR && !can_eintr) || r == EAGAIN)
+                       r = 0;
+               atomic_dec_int(&sem->waitcount);
+       }
+
+       return r;
+}
+
+/*
+ * Post one unit: always increment the count, then wake at most one
+ * thread sleeping on sem->value.  Always returns 0.
+ * NOTE(review): there is no check against SEM_VALUE_MAX here.
+ */
+int
+_sem_post(sem_t sem)
+{
+       /* barrier before the increment so earlier stores are published first */
+       membar_exit_before_atomic();
+       atomic_inc_int(&sem->value);
+       /*
+        * Wake unconditionally.  A waiter that has not yet entered the
+        * futex finds the value non-zero and does not sleep.
+        */
+       _wake(&sem->value, 1);
+       return 0;
+}
+
+/*
+ * exported semaphores
+ */
+
+/*
+ * sem_init: allocate an unnamed, process-private semaphore holding
+ * `value` units and store it in *semp.  Returns 0, or -1 with errno set
+ * to EINVAL (value too large), EPERM (pshared requested) or ENOSPC
+ * (allocation failure).
+ */
+int
+sem_init(sem_t *semp, int pshared, unsigned int value)
+{
+       sem_t newsem;
+
+       if (value > SEM_VALUE_MAX) {
+               errno = EINVAL;
+               return (-1);
+       }
+
+       /* process-shared unnamed semaphores are not supported yet */
+       if (pshared) {
+               errno = EPERM;
+               return (-1);
+#ifdef notyet
+               char name[SEM_RANDOM_NAME_LEN];
+               sem_t *sempshared;
+               int i;
+
+               for (;;) {
+                       for (i = 0; i < SEM_RANDOM_NAME_LEN - 1; i++)
+                               name[i] = arc4random_uniform(255) + 1;
+                       name[SEM_RANDOM_NAME_LEN - 1] = '\0';
+                       sempshared = sem_open(name, O_CREAT | O_EXCL, 0, value);
+                       if (sempshared != SEM_FAILED)
+                               break;
+                       if (errno == EEXIST)
+                               continue;
+                       if (errno != EPERM)
+                               errno = ENOSPC;
+                       return (-1);
+               }
+
+               /* unnamed semaphore should not be opened twice */
+               if (sem_unlink(name) == -1) {
+                       sem_close(sempshared);
+                       errno = ENOSPC;
+                       return (-1);
+               }
+
+               *semp = *sempshared;
+               free(sempshared);
+               return (0);
+#endif
+       }
+
+       if ((newsem = calloc(1, sizeof(*newsem))) == NULL) {
+               errno = ENOSPC;
+               return (-1);
+       }
+
+       /* calloc() zeroed waitcount and shared; set the remaining fields */
+       newsem->lock = _SPINLOCK_UNLOCKED;
+       newsem->value = value;
+       *semp = newsem;
+       return (0);
+}
+
+/*
+ * sem_destroy: release a semaphore -- free it, or unmap it when it is a
+ * shared (sem_open) one -- and clear *semp.  Refuses with EBUSY, after
+ * writing a note to fd 2, while any waiter is counted.
+ */
+int
+sem_destroy(sem_t *semp)
+{
+       static const char busymsg[] =
+           "sem_destroy on semaphore with waiters!\n";
+       sem_t victim;
+
+       /* SEM_MMAP_SIZE needs the thread library initialized */
+       if (!_threads_ready)
+               _rthread_init();
+
+       if (semp == NULL || (victim = *semp) == NULL) {
+               errno = EINVAL;
+               return (-1);
+       }
+
+       if (victim->waitcount != 0) {
+               write(2, busymsg, sizeof(busymsg) - 1);
+               errno = EBUSY;
+               return (-1);
+       }
+
+       *semp = NULL;
+       if (!victim->shared)
+               free(victim);
+       else
+               munmap(victim, SEM_MMAP_SIZE);
+       return (0);
+}
+
+/*
+ * sem_getvalue: store a snapshot of the semaphore count in *sval.
+ * Other threads may change the count right after it is read, so a plain
+ * load is used; no barrier is taken.  Returns 0, or -1 with errno
+ * EINVAL for a null semaphore.
+ */
+int
+sem_getvalue(sem_t *semp, int *sval)
+{
+       sem_t sem;
+
+       if (!semp || !(sem = *semp)) {
+               errno = EINVAL;
+               return (-1);
+       }
+
+       *sval = sem->value;
+
+       return (0);
+}
+
+/*
+ * sem_post: release one unit via _sem_post().  Returns 0, or -1 with
+ * errno EINVAL when semp or *semp is null.
+ */
+int
+sem_post(sem_t *semp)
+{
+       sem_t sem = (semp != NULL) ? *semp : NULL;
+
+       if (sem == NULL) {
+               errno = EINVAL;
+               return (-1);
+       }
+
+       _sem_post(sem);
+       return (0);
+}
+
+/*
+ * sem_wait: block until a unit can be taken from the semaphore.  Runs as
+ * a delayed cancellation point and passes can_eintr = 1, so an EINTR from
+ * the futex wait is returned to the caller.  Returns 0, or -1 with errno
+ * set.
+ */
+int
+sem_wait(sem_t *semp)
+{
+       struct tib *tib = TIB_GET();
+       pthread_t self;
+       sem_t sem;
+       int error, r;
+       PREP_CANCEL_POINT(tib);
+
+       if (!_threads_ready)
+               _rthread_init();
+       self = tib->tib_thread;
+
+       if (!semp || !(sem = *semp)) {
+               errno = EINVAL;
+               return (-1);
+       }
+
+       ENTER_DELAYED_CANCEL_POINT(tib, self);
+       r = _sem_wait(sem, 1, NULL, &self->delayed_cancel);
+       LEAVE_CANCEL_POINT_INNER(tib, r);
+
+       if (r) {
+               error = r;
+#ifdef SEM_ATOMIC_DEBUG
+               sem_getvalue(&sem, &r);
+               DPRINTF(("%s: v=%d errno=%d\n", __func__, r, error));
+#endif
+               /* set errno last: the debug printf may clobber it */
+               errno = error;
+               return (-1);
+       }
+
+       return (0);
+}
+
+/*
+ * sem_timedwait: like sem_wait(), but fail with ETIMEDOUT once the
+ * absolute CLOCK_REALTIME deadline abstime has passed.  abstime must be
+ * non-NULL with tv_nsec in [0, 1000000000), otherwise EINVAL.  Returns
+ * 0, or -1 with errno set.
+ */
+int
+sem_timedwait(sem_t *semp, const struct timespec *abstime)
+{
+       struct tib *tib = TIB_GET();
+       pthread_t self;
+       sem_t sem;
+       int error, r;
+       PREP_CANCEL_POINT(tib);
+
+       if (!semp || !(sem = *semp) || abstime == NULL ||
+           abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000) {
+               errno = EINVAL;
+               return (-1);
+       }
+
+       if (!_threads_ready)
+               _rthread_init();
+       self = tib->tib_thread;
+
+       ENTER_DELAYED_CANCEL_POINT(tib, self);
+       r = _sem_wait(sem, 1, abstime, &self->delayed_cancel);
+       LEAVE_CANCEL_POINT_INNER(tib, r);
+
+       if (r) {
+               error = (r == EWOULDBLOCK) ? ETIMEDOUT : r;
+#ifdef SEM_ATOMIC_DEBUG
+               sem_getvalue(&sem, &r);
+               DPRINTF(("%s: v=%d errno=%d\n", __func__, r, error));
+#endif
+               /* set errno last: the debug printf may clobber it */
+               errno = error;
+               return (-1);
+       }
+
+       return (0);
+}
+
+/*
+ * sem_trywait: take one unit if available without blocking.  Returns 0
+ * on success, or -1 with errno EINVAL (null semaphore) or EAGAIN (count
+ * was zero).
+ */
+int
+sem_trywait(sem_t *semp)
+{
+       sem_t sem;
+       unsigned int v;
+
+       if (!semp || !(sem = *semp)) {
+               errno = EINVAL;
+               return (-1);
+       }
+
+       /* unsigned snapshot to match atomic_cas_uint(), as in _sem_wait() */
+       while ((v = sem->value) > 0) {
+               if (atomic_cas_uint(&sem->value, v, v - 1) == v) {
+                       membar_enter_after_atomic();
+                       return (0);
+               }
+       }
+
+#ifdef SEM_ATOMIC_DEBUG
+       {
+               int sval;
+
+               sem_getvalue(&sem, &sval);
+               DPRINTF(("%s: v=%d errno=%d\n", __func__, sval, EAGAIN));
+       }
+#endif
+       /* set errno last: the debug printf may clobber it */
+       errno = EAGAIN;
+       return (-1);
+}
+
+
+/*
+ * makesempath: build the backing-file path for a semaphore name,
+ * "/tmp/<SHA-256 digest string of origpath>.sem", into sempath
+ * (len bytes, truncated by snprintf if too small).
+ */
+static void
+makesempath(const char *origpath, char *sempath, size_t len)
+{
+       char digest[SHA256_DIGEST_STRING_LENGTH];
+       size_t namelen = strlen(origpath);
+
+       SHA256Data(origpath, namelen, digest);
+       snprintf(sempath, len, "/tmp/%s.sem", digest);
+}
+
+/*
+ * sem_open: open, and with O_CREAT possibly create, the named semaphore
+ * `name`.  It is backed by a SEM_MMAP_SIZE file under /tmp that is mapped
+ * MAP_SHARED.  Only O_CREAT and O_EXCL are accepted in oflag.  With
+ * O_CREAT the variadic mode argument is read but ignored, and the value
+ * argument seeds a freshly created semaphore.  Returns a malloc'ed
+ * handle for sem_close(), or SEM_FAILED with errno set.
+ */
+sem_t *
+sem_open(const char *name, int oflag, ...)
+{
+       char sempath[SEM_PATH_SIZE];
+       struct stat sb;
+       sem_t sem, *semp;
+       unsigned int value = 0;
+       int created = 0, error, fd;
+
+       if (!_threads_ready)
+               _rthread_init();
+
+       if ((oflag & ~(O_CREAT | O_EXCL)) != 0) {
+               errno = EINVAL;
+               return (SEM_FAILED);
+       }
+
+       if ((oflag & O_CREAT) != 0) {
+               va_list ap;
+
+               va_start(ap, oflag);
+               (void)va_arg(ap, mode_t);       /* mode: not used */
+               value = va_arg(ap, unsigned);
+               va_end(ap);
+
+               if (value > SEM_VALUE_MAX) {
+                       errno = EINVAL;
+                       return (SEM_FAILED);
+               }
+       }
+
+       makesempath(name, sempath, sizeof(sempath));
+       fd = open(sempath, O_RDWR | O_NOFOLLOW | oflag, 0600);
+       if (fd == -1)
+               return (SEM_FAILED);
+
+       /* the backing object must be a regular file owned by us */
+       if (fstat(fd, &sb) == -1 || !S_ISREG(sb.st_mode)) {
+               error = EINVAL;
+               goto fail;
+       }
+       if (sb.st_uid != geteuid()) {
+               error = EPERM;
+               goto fail;
+       }
+
+       if (sb.st_size != (off_t)SEM_MMAP_SIZE) {
+               /*
+                * Wrong size: only an empty file we were asked to create
+                * may be grown; it is then initialized after mapping.
+                */
+               error = EINVAL;
+               if (!(oflag & O_CREAT) || sb.st_size != 0 ||
+                   ftruncate(fd, SEM_MMAP_SIZE) == -1)
+                       goto fail;
+               created = 1;
+       }
+
+       sem = mmap(NULL, SEM_MMAP_SIZE, PROT_READ | PROT_WRITE,
+           MAP_SHARED, fd, 0);
+       close(fd);
+       if (sem == MAP_FAILED) {
+               errno = EINVAL;
+               return (SEM_FAILED);
+       }
+
+       if ((semp = malloc(sizeof(*semp))) == NULL) {
+               munmap(sem, SEM_MMAP_SIZE);
+               errno = ENOSPC;
+               return (SEM_FAILED);
+       }
+       if (created) {
+               sem->lock = _SPINLOCK_UNLOCKED;
+               sem->value = value;
+               sem->shared = 1;
+       }
+       *semp = sem;
+       return (semp);
+
+fail:
+       close(fd);
+       errno = error;
+       return (SEM_FAILED);
+}
+
+/*
+ * sem_close: unmap a named semaphore returned by sem_open() and free
+ * its handle.  Non-shared semaphores are rejected with EINVAL.
+ */
+int
+sem_close(sem_t *semp)
+{
+       sem_t mapped = (semp != NULL) ? *semp : NULL;
+
+       if (mapped == NULL || !mapped->shared) {
+               errno = EINVAL;
+               return (-1);
+       }
+
+       *semp = NULL;
+       munmap(mapped, SEM_MMAP_SIZE);
+       free(semp);
+       return (0);
+}
+
+/*
+ * sem_unlink: remove the /tmp backing file of the named semaphore.
+ * Returns 0, or -1 with errno set by unlink(2).
+ */
+int
+sem_unlink(const char *name)
+{
+       char path[SEM_PATH_SIZE];
+
+       makesempath(name, path, sizeof(path));
+       if (unlink(path) == -1)
+               return (-1);
+       return (0);
+}
diff --git lib/librthread/synch.h lib/librthread/synch.h
new file mode 100644
index 00000000000..8ab379530e8
--- /dev/null
+++ lib/librthread/synch.h
@@ -0,0 +1,61 @@
+/*     $OpenBSD: synch.h,v 1.2 2017/09/05 02:40:54 guenther Exp $ */
+/*
+ * Copyright (c) 2017 Martin Pieuchot
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <sys/atomic.h>
+#include <sys/time.h>
+#include <sys/futex.h>
+
+/*
+ * _wake: issue FUTEX_WAKE for up to n threads sleeping on the word *p
+ * and hand back futex()'s return value.
+ */
+static inline int
+_wake(volatile uint32_t *p, int n)
+{
+       int ret;
+
+       ret = futex(p, FUTEX_WAKE, n, NULL, NULL);
+       return ret;
+}
+
+/*
+ * _wait: block until *p == val.
+ *
+ * FUTEX_WAIT only sleeps while the word still holds the value it is
+ * given, so sleep on the value actually observed: if *p moves on before
+ * the call, futex() returns at once and the loop re-reads it.  Whoever
+ * changes *p must call _wake() for a sleeper to notice.
+ */
+static inline void
+_wait(volatile uint32_t *p, int val)
+{
+       uint32_t cur;
+
+       while ((cur = *p) != (uint32_t)val)
+               futex(p, FUTEX_WAIT, (int)cur, NULL, NULL);
+}
+
+/*
+ * _twait: sleep on the futex word *p while it holds val, until woken or
+ * until the absolute deadline abstime, measured on clockid, passes.
+ * A NULL abstime waits without a timeout.  Returns futex()'s result,
+ * EINVAL for a malformed deadline or a clock_gettime() failure, or
+ * ETIMEDOUT when the deadline is already in the past.
+ */
+static inline int
+_twait(volatile uint32_t *p, int val, clockid_t clockid,
+    const struct timespec *abstime)
+{
+       struct timespec rel;
+
+       if (abstime == NULL)
+               return futex(p, FUTEX_WAIT, val, NULL, NULL);
+
+       /* a negative tv_nsec would otherwise yield a bogus relative timeout */
+       if (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000 ||
+           clock_gettime(clockid, &rel))
+               return (EINVAL);
+
+       /* convert the absolute deadline into a relative timeout */
+       rel.tv_sec = abstime->tv_sec - rel.tv_sec;
+       if ((rel.tv_nsec = abstime->tv_nsec - rel.tv_nsec) < 0) {
+               rel.tv_sec--;
+               rel.tv_nsec += 1000000000;
+       }
+       if (rel.tv_sec < 0)
+               return (ETIMEDOUT);
+
+       return futex(p, FUTEX_WAIT, val, &rel, NULL);
+}
+
+/*
+ * _requeue: issue FUTEX_REQUEUE on *p with wake count n, targeting the
+ * word *q.  The second count m travels in futex()'s timeout-pointer slot.
+ */
+static inline int
+_requeue(volatile uint32_t *p, int n, int m, volatile uint32_t *q)
+{
+       void *mptr = (void *)(long)m;
+
+       return futex(p, FUTEX_REQUEUE, n, mptr, q);
+}

Reply via email to