From: 'rean' via OSv Development <osv-dev@googlegroups.com>
Committer: Nadav Har'El <n...@scylladb.com>
Branch: master

pthread_barrier*: Add OSv-specific pthread_barrier_init, wait, destroy + test

- pthread_barrier* patch clean up (address review comments: r1)
- pthread_barrier* patch clean up (address review comments: r2)
- Abridged test output with up to 7 microsec usleep, 10 threads, 4 rounds
  of barrier crossings (tst-pthread-barrier.so)
- pthread_barrier* patch clean up (address review comments: r3)
- reformat patch per OSv style (r1)
- reformat patch per OSv style (r2)
Special return value PTHREAD_BARRIER_SERIAL_THREAD == -1:

[Thread 8] crossed barrier with -1 (round 1)
[Thread 9] crossed barrier with -1 (round 2)
[Thread 9] crossed barrier with -1 (round 3)
[Thread 3] crossed barrier with -1 (round 4)

Signed-off-by: rean <r...@caa.columbia.edu>
Message-Id: <1482401000-9043-1-git-send-email-r...@caa.columbia.edu>

---
diff --git a/include/osv/latch.hh b/include/osv/latch.hh
--- a/include/osv/latch.hh
+++ b/include/osv/latch.hh
@@ -58,6 +58,18 @@ public:
         std::unique_lock<std::mutex> l(_mutex);
         return _condvar.wait_for(l, duration, [&] () -> bool { return is_released(); });
     }
+
+    // Useful if latches are being used as a primitive for implementing
+    // pthread_barrier_t, so threads can wait on a barrier multiple times
+    // (over multiple rounds). No other thread may be in count_down() or
+    // await() while the latch is being reset; the caller must provide
+    // external locking to guarantee that (i.e., to prevent one thread
+    // from resetting while another is concurrently in count_down() or
+    // await()). Without such locking, resetting the latch is unsafe.
+    void unsafe_reset(int count)
+    {
+        _count.store(count, std::memory_order_relaxed);
+    }
 };

 class thread_barrier
diff --git a/libc/pthread.cc b/libc/pthread.cc
--- a/libc/pthread.cc
+++ b/libc/pthread.cc
@@ -29,7 +29,7 @@

 #include <api/time.h>
 #include <osv/rwlock.h>
-
+#include <osv/latch.hh>
 #include "pthread.hh"

 namespace pthread_private {
@@ -1128,3 +1128,117 @@ int pthread_attr_getaffinity_np(const pthread_attr_t *attr, size_t cpusetsize,

     return 0;
 }
+
+// Private definitions of the internal structs backing pthread_barrier_t and
+// pthread_barrierattr_t
+typedef struct
+{
+    unsigned int out;
+    unsigned int count;
+    latch *ltch;
+    pthread_mutex_t *mtx;
+} pthread_barrier_t_int;
+
+typedef struct
+{
+    unsigned pshared;
+} pthread_barrierattr_t_int;
+
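+// The internal pthread_barrier_t_int lives inside the storage of the opaque
+// pthread_barrier_t supplied by the application (32 bytes, as the
+// accompanying test checks), hence the static_asserts below verifying that
+// the internal struct fits.
+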
+int pthread_barrier_init(pthread_barrier_t *barrier_opq,
+                         const pthread_barrierattr_t *attr_opq,
+                         unsigned count)
+{
+    pthread_barrier_t_int *barrier = (pthread_barrier_t_int*) barrier_opq;
+    static_assert(sizeof(pthread_barrier_t_int) <= sizeof(pthread_barrier_t),
+                  "pthread_barrier_t_int is larger than pthread_barrier_t");
+
+    // Linux returns EINVAL if count == 0 or count >= INT_MAX, so we do too.
+    // In theory, we could go up to UINT_MAX since count is unsigned.
+    if (!barrier || count == 0 || count >= INT_MAX) {
+        return EINVAL;
+    }
+
+    // Always ignore attr; it has no meaning in the context of a unikernel.
+    // pthread_barrierattr_t has a single member variable, pshared, which
+    // can be set to PTHREAD_PROCESS_PRIVATE or PTHREAD_PROCESS_SHARED.
+    // These have the same effect in a unikernel - there is only a single
+    // process and all threads can manipulate the memory area associated
+    // with the pthread_barrier_t, so the value of pshared doesn't matter.
+    barrier->count = count;
+    barrier->out = 0;
+    barrier->ltch = new latch(count);
+    barrier->mtx = new pthread_mutex_t;
+    pthread_mutex_init(barrier->mtx, NULL);
+    return 0;
+}
+
+int pthread_barrier_wait(pthread_barrier_t *barrier_opq)
+{
+    pthread_barrier_t_int *barrier = (pthread_barrier_t_int*) barrier_opq;
+    static_assert(sizeof(pthread_barrier_t_int) <= sizeof(pthread_barrier_t),
+                  "pthread_barrier_t_int is larger than pthread_barrier_t");
+
+    if (!barrier || !barrier->ltch || !barrier->mtx) {
+        return EINVAL;
+    }
+
+    int retval = 0;
+    pthread_mutex_t *mtx = barrier->mtx;
+
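+    // Lock and immediately unlock the barrier's mutex before touching the
+    // latch: if the last thread of the previous round is still in the
+    // critical section below (resetting the latch and the 'out' counter),
+    // threads arriving for the next round block here until that reset is
+    // done.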
+    pthread_mutex_lock(mtx);
+    pthread_mutex_unlock(mtx);
+
+    latch *l = barrier->ltch;
+    l->count_down();
+    // All threads stuck here until we get at least 'count' waiters
+    l->await();
+
+    // If the last thread (thread x) to wait on the barrier is descheduled
+    // here (immediately after being the count'th thread crossing the
+    // barrier), the barrier remains open (a new waiting thread will cross)
+    // until it is reset below, once thread x is rescheduled. That doesn't
+    // seem technically incorrect. Only one of the crossing threads will get
+    // a retval of PTHREAD_BARRIER_SERIAL_THREAD - the one that observes
+    // barrier->out == barrier->count; all other crossing threads get 0.
+
+    pthread_mutex_lock(mtx);
+    barrier->out++;
+    // Make the last thread out responsible for resetting the barrier's
+    // latch. The last thread also gets the special return value
+    // PTHREAD_BARRIER_SERIAL_THREAD; every other thread gets a retval of 0.
+    if (barrier->out == barrier->count) {
+        retval = PTHREAD_BARRIER_SERIAL_THREAD;
+        // Reset the latch for the next round of waiters. We're using an
+        // external lock (mtx) to ensure that no other thread is calling
+        // count_down or waiting in await while we're resetting it. Without
+        // the external lock, resetting the latch isn't safe.
+        l->unsafe_reset(barrier->count);
+        // Reset the 'out' counter so that the equality check above works
+        // across multiple rounds of threads waiting on the barrier.
+        barrier->out = 0;
+    }
+    pthread_mutex_unlock(mtx);
+    return retval;
+}
+
+int pthread_barrier_destroy(pthread_barrier_t *barrier_opq)
+{
+    pthread_barrier_t_int *barrier = (pthread_barrier_t_int*) barrier_opq;
+
+    static_assert(sizeof(pthread_barrier_t_int) <= sizeof(pthread_barrier_t),
+                  "pthread_barrier_t_int is larger than pthread_barrier_t");
+
+    if (!barrier || !barrier->ltch || !barrier->mtx) {
+        return EINVAL;
+    }
+
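+    // Per POSIX, destroying a barrier while any thread is blocked on it is
+    // undefined behavior, so no attempt is made to detect that case here.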
+    delete barrier->ltch;
+    barrier->ltch = nullptr;
+
+    pthread_mutex_destroy(barrier->mtx);
+    delete barrier->mtx;
+    barrier->mtx = nullptr;
+
+    return 0;
+}
diff --git a/modules/tests/Makefile b/modules/tests/Makefile
--- a/modules/tests/Makefile
+++ b/modules/tests/Makefile
@@ -85,7 +85,7 @@ tests := tst-pthread.so misc-ramdisk.so tst-vblk.so tst-bsd-evh.so \
        payload-merge-env.so misc-execve.so misc-execve-payload.so misc-mutex2.so \
        tst-pthread-setcancelstate.so tst-syscall.so tst-pin.so tst-run.so \
        tst-ifaddrs.so tst-pthread-affinity-inherit.so tst-sem-timed-wait.so \
-       tst-ttyname.so
+       tst-ttyname.so tst-pthread-barrier.so

 #      libstatic-thread-variable.so tst-static-thread-variable.so \

diff --git a/tests/tst-pthread-barrier.cc b/tests/tst-pthread-barrier.cc
--- a/tests/tst-pthread-barrier.cc
+++ b/tests/tst-pthread-barrier.cc
@@ -0,0 +1,109 @@
+#include <stdio.h>
+#include <unistd.h>
+#include <memory.h>
+#include <errno.h>
+#include <stdbool.h>
+#include <pthread.h>
+#include <limits.h>
+#include <stdlib.h>
+#include <atomic>
+
+unsigned int tests_total = 0, tests_failed = 0;
+
+void report(const char* name, bool passed)
+{
+    static const char* status[] = {"FAIL", "PASS"};
+    printf("%s: %s\n", status[passed], name);
+    tests_total += 1;
+    tests_failed += !passed;
+}
+
+// Opaque type 32 bytes in size
+static pthread_barrier_t barrier;
+// Opaque type 4 bytes in size
+pthread_barrierattr_t attr;
+// Number of crossings across the barrier
+static int numCrossings;
+// Counter to track the number of special return values to threads
+static std::atomic<int> specialRetVals;
+
+static void* thread_func(void *arg)
+{
+    int threadNum = arg ? *((int*) arg) : 0;
+    int retval = 0;
+    printf("[Thread %d] starting...\n", threadNum);
+
+    for (int crossing = 0; crossing < numCrossings; crossing++) {
+        // Force threads to sleep for a random interval so we randomize
+        // which thread might get the special return value
+        // PTHREAD_BARRIER_SERIAL_THREAD
+        int delay = random() % 7 + 1;
+        usleep(delay);
+
+        printf("[Thread %d] waiting on barrier\n", threadNum);
+        retval = pthread_barrier_wait(&barrier);
+        if (retval == PTHREAD_BARRIER_SERIAL_THREAD) {
+            printf("[Thread %d] crossed barrier with %d\n", threadNum,
+                   PTHREAD_BARRIER_SERIAL_THREAD);
+            // Increment the counter of special return values
+            specialRetVals.fetch_add(1, std::memory_order_relaxed);
+        } else if (retval == 0) {
+ printf("[Thread %d] crossed barrier with %d\n", threadNum, retval);
+        }
+    }
+    return 0;
+}
+
+int main(void)
+{
+    // Number of threads that must call into the barrier before they all
+    // unblock
+    const int numThreads = 10;
+    // Pass through the barrier k times
+    numCrossings = 4;
+    specialRetVals.store(0, std::memory_order_relaxed);
+    pthread_t threads[numThreads];
+    int threadIds[numThreads];
+    int retval = -1;
+    printf("Sizeof pthread_barrier_t    : %ld\n", sizeof(barrier));
+ report("sizeof pthread_barrier_t is 32 bytes\n", sizeof(barrier) == 32);
+    printf("Sizeof pthread_barrierattr_t: %ld\n", sizeof(attr));
+    report("sizeof pthread_barrierattr_t is 4 bytes\n", sizeof(attr) == 4);
+
+    // Try invalid initializations (NULL barrier, or count == -1, 0, INT_MAX)
+    retval = pthread_barrier_init(NULL, NULL, 4);
+    report("pthread_barrier_init (pthread_barrier_t* == NULL)",
+           retval == EINVAL);
+    retval = pthread_barrier_init(&barrier, NULL, -1);
+    report("pthread_barrier_init (count == -1)", retval == EINVAL);
+    retval = pthread_barrier_init(&barrier, NULL, 0);
+    report("pthread_barrier_init (count == 0)", retval == EINVAL);
+    retval = pthread_barrier_init(&barrier, NULL, INT_MAX);
+    report("pthread_barrier_init (count == INT_MAX)", retval == EINVAL);
+
+    // Initialize a barrier with NULL attributes. In general it doesn't
+    // really matter what we do with pthread_barrierattr_t:
+    // PTHREAD_PROCESS_PRIVATE vs PTHREAD_PROCESS_SHARED have the same effect
+    // in a unikernel - there's only a single process and all threads can
+    // manipulate the barrier, so we can just ignore pthread_barrierattr_t.
+    retval = pthread_barrier_init(&barrier, NULL, numThreads);
+    report("pthread_barrier_init", retval == 0);
+    if (retval != 0) {
+ printf("Early exit, pthread_barrier_init returned %d instead of 0\n",
+               retval);
+        goto exit;
+    }
+
+    for (int t = 0; t < numThreads; t++) {
+        threadIds[t] = t;
+        retval = pthread_create(&threads[t], NULL, thread_func,
+                                &threadIds[t]);
+    }
+    for (int t = 0; t < numThreads; t++) {
+        pthread_join(threads[t], NULL);
+    }
+ exit:
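+    // Exactly one thread per round gets PTHREAD_BARRIER_SERIAL_THREAD, so
+    // after numCrossings rounds specialRetVals should equal numCrossings.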
+    report("pthread_barrier_wait (special retvals)",
+           specialRetVals.load(std::memory_order_relaxed) == numCrossings);
+    pthread_barrier_destroy(&barrier);
+    printf("SUMMARY: %u tests / %u failures\n", tests_total, tests_failed);
+    return tests_failed == 0 ? 0 : 1;
+}
