Script 'mail_helper' called by obssrc
Hello community,

here is the log from the commit of package dqlite for openSUSE:Factory checked 
in at 2024-02-28 19:48:14
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/dqlite (Old)
 and      /work/SRC/openSUSE:Factory/.dqlite.new.1770 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "dqlite"

Wed Feb 28 19:48:14 2024 rev:11 rq:1152974 version:1.16.4

Changes:
--------
--- /work/SRC/openSUSE:Factory/dqlite/dqlite.changes    2024-02-22 
20:59:13.952735401 +0100
+++ /work/SRC/openSUSE:Factory/.dqlite.new.1770/dqlite.changes  2024-02-28 
19:48:29.596802661 +0100
@@ -1,0 +2,6 @@
+Wed Feb 28 04:02:03 UTC 2024 - Andreas Stieger <andreas.stie...@gmx.de>
+
+- update to 1.16.4:
+  * build system fixes
+
+-------------------------------------------------------------------

Old:
----
  dqlite-1.16.3.tar.gz

New:
----
  dqlite-1.16.4.tar.gz

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ dqlite.spec ++++++
--- /var/tmp/diff_new_pack.MQ4UEI/_old  2024-02-28 19:48:31.244862165 +0100
+++ /var/tmp/diff_new_pack.MQ4UEI/_new  2024-02-28 19:48:31.248862310 +0100
@@ -19,7 +19,7 @@
 
 %define lname libdqlite0
 Name:           dqlite
-Version:        1.16.3
+Version:        1.16.4
 Release:        0
 Summary:        Distributed SQLite
 License:        LGPL-3.0-only WITH LGPL-3.0-linking-exception

++++++ dqlite-1.16.3.tar.gz -> dqlite-1.16.4.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dqlite-1.16.3/.clang-format 
new/dqlite-1.16.4/.clang-format
--- old/dqlite-1.16.3/.clang-format     2024-02-21 17:28:32.000000000 +0100
+++ new/dqlite-1.16.4/.clang-format     2024-02-23 16:28:33.000000000 +0100
@@ -2,7 +2,8 @@
 BreakBeforeBraces: Custom
 BraceWrapping:
  AfterFunction: true
- AfterStruct: true
+ AfterStruct: false
+Cpp11BracedListStyle: false
 IndentWidth: 8
 UseTab: ForContinuationAndIndentation
 PointerAlignment: Right
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dqlite-1.16.3/Makefile.am 
new/dqlite-1.16.4/Makefile.am
--- old/dqlite-1.16.3/Makefile.am       2024-02-21 17:28:32.000000000 +0100
+++ new/dqlite-1.16.4/Makefile.am       2024-02-23 16:28:33.000000000 +0100
@@ -44,6 +44,7 @@
   src/lib/buffer.c \
   src/lib/fs.c \
   src/lib/sm.c \
+  src/lib/threadpool.c \
   src/lib/transport.c \
   src/logger.c \
   src/message.c \
@@ -62,15 +63,15 @@
   src/tuple.c \
   src/vfs.c
 
+if BUILD_SQLITE_ENABLED
+basic_dqlite_sources += sqlite3.c
+endif
+
 lib_LTLIBRARIES = libdqlite.la
 libdqlite_la_CFLAGS = $(AM_CFLAGS) -fvisibility=hidden -DRAFT_API=''
 libdqlite_la_LDFLAGS = $(AM_LDFLAGS) -version-info 0:1:0
 libdqlite_la_SOURCES = $(basic_dqlite_sources)
 
-if BUILD_SQLITE_ENABLED
-libdqlite_la_SOURCES += sqlite3.c
-endif
-
 if BUILD_RAFT_ENABLED
 libraft_la_SOURCES = \
   src/raft/byte.c \
@@ -149,6 +150,7 @@
   test/test_error.c \
   test/test_integration.c \
   test/unit/ext/test_uv.c \
+  test/unit/ext/test_uv_pool.c \
   test/unit/lib/test_addr.c \
   test/unit/lib/test_buffer.c \
   test/unit/lib/test_byte.c \
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dqlite-1.16.3/configure.ac 
new/dqlite-1.16.4/configure.ac
--- old/dqlite-1.16.3/configure.ac      2024-02-21 17:28:32.000000000 +0100
+++ new/dqlite-1.16.4/configure.ac      2024-02-23 16:28:33.000000000 +0100
@@ -1,5 +1,5 @@
 AC_PREREQ(2.60)
-AC_INIT([libdqlite], [1.16.3], [https://github.com/canonical/dqlite])
+AC_INIT([libdqlite], [1.16.4], [https://github.com/canonical/dqlite])
 AC_CONFIG_MACRO_DIR([m4])
 AC_CONFIG_AUX_DIR([ac])
 
@@ -37,9 +37,6 @@
 AC_ARG_ENABLE(build-raft, AS_HELP_STRING([--enable-build-raft[=ARG]], [use the 
bundled raft sources instead of linking to libraft [default=no]]))
 AM_CONDITIONAL(BUILD_RAFT_ENABLED, test "x$enable_build_raft" = "xyes")
 
-# Allow not linking to liblz4 even if it's present.
-AC_ARG_WITH([lz4], AS_HELP_STRING([--without-lz4], [never link to liblz4]))
-
 # Whether to enable code coverage.
 AX_CODE_COVERAGE
 
@@ -59,8 +56,23 @@
 PKG_CHECK_MODULES(UV, [libuv >= 1.8.0], [], [])
 AS_IF([test "x$enable_build_raft" != "xyes"], [PKG_CHECK_MODULES(RAFT, [raft 
>= 0.18.1], [], [])], [])
 
-AS_IF([test "x$with_lz4" != "xno"], [PKG_CHECK_MODULES(LZ4, [liblz4 >= 1.7.1], 
[have_lz4=yes], [have_lz4=no])], [have_lz4=no])
-AS_IF([test "x$with_lz4" != "xno" -a "x$have_lz4" = "xno"], 
[AC_MSG_ERROR([liblz4 required but not found])], [])
+
+# Allow not linking to liblz4 even if it's present.
+AC_ARG_WITH([lz4], AS_HELP_STRING([--without-lz4], [never link to liblz4]))
+AS_IF([test "x$enable_build_raft" = "xyes"],
+      # Building raft
+      [AS_IF([test "x$with_lz4" != "xno"],
+            [PKG_CHECK_MODULES(LZ4, [liblz4 >= 1.7.1], [have_lz4=yes], 
[have_lz4=no])],
+            [have_lz4=no])
+       AS_IF([test "x$with_lz4" != "xno" -a "x$have_lz4" = "xno"],
+            [AC_MSG_ERROR([liblz4 required but not found])],
+            [])],
+      # Not building raft
+      [AS_IF([test "x$with_lz4" = "xyes"],
+            [AC_MSG_ERROR([linking lz4 doesn't make sense unless building 
raft])],
+            [])
+      have_lz4=no])
+
 AM_CONDITIONAL(LZ4_AVAILABLE, test "x$have_lz4" = "xyes")
 
 AC_ARG_ENABLE(lz4, AS_HELP_STRING([--disable-lz4], [when building with lz4, do 
not compress snapshots by default]))
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dqlite-1.16.3/include/dqlite.h 
new/dqlite-1.16.4/include/dqlite.h
--- old/dqlite-1.16.3/include/dqlite.h  2024-02-21 17:28:32.000000000 +0100
+++ new/dqlite-1.16.4/include/dqlite.h  2024-02-23 16:28:33.000000000 +0100
@@ -28,7 +28,7 @@
  */
 #define DQLITE_VERSION_MAJOR 1
 #define DQLITE_VERSION_MINOR 16
-#define DQLITE_VERSION_RELEASE 3
+#define DQLITE_VERSION_RELEASE 4
 #define DQLITE_VERSION_NUMBER                                            \
        (DQLITE_VERSION_MAJOR * 100 * 100 + DQLITE_VERSION_MINOR * 100 + \
         DQLITE_VERSION_RELEASE)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dqlite-1.16.3/src/lib/queue.h 
new/dqlite-1.16.4/src/lib/queue.h
--- old/dqlite-1.16.3/src/lib/queue.h   2024-02-21 17:28:32.000000000 +0100
+++ new/dqlite-1.16.4/src/lib/queue.h   2024-02-23 16:28:33.000000000 +0100
@@ -37,6 +37,19 @@
                QUEUE__PREV(q) = (e);            \
        }
 
+#define QUEUE__INSERT_TAIL(q, e) QUEUE__PUSH(q, e)
+
+/**
+ * Insert an element at the front of a queue.
+ */
+#define QUEUE__INSERT_HEAD(h, q)                 \
+       {                                        \
+               QUEUE__NEXT(q) = QUEUE__NEXT(h); \
+               QUEUE__PREV(q) = (h);            \
+               QUEUE__NEXT_PREV(q) = (q);       \
+               QUEUE__NEXT(h) = (q);            \
+       }
+
 /**
  * Remove the given element from the queue. Any element can be removed at any
  * time.
@@ -48,6 +61,25 @@
        }
 
 /**
+ * Moves elements from queue @h to queue @n
+ * Note: Removed QUEUE__SPLIT() and merged it into QUEUE__MOVE().
+ */
+#define QUEUE__MOVE(h, n)                                  \
+       {                                                  \
+               if (QUEUE__IS_EMPTY(h)) {                  \
+                       QUEUE__INIT(n);                    \
+               } else {                                   \
+                       queue *__q = QUEUE__HEAD(h);       \
+                       QUEUE__PREV(n) = QUEUE__PREV(h);   \
+                       QUEUE__PREV_NEXT(n) = (n);         \
+                       QUEUE__NEXT(n) = (__q);            \
+                       QUEUE__PREV(h) = QUEUE__PREV(__q); \
+                       QUEUE__PREV_NEXT(h) = (h);         \
+                       QUEUE__PREV(__q) = (n);            \
+               }                                          \
+       }
+
+/**
  * Return the element at the front of the queue.
  */
 #define QUEUE__HEAD(q) (QUEUE__NEXT(q))
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dqlite-1.16.3/src/lib/threadpool.c 
new/dqlite-1.16.4/src/lib/threadpool.c
--- old/dqlite-1.16.3/src/lib/threadpool.c      1970-01-01 01:00:00.000000000 
+0100
+++ new/dqlite-1.16.4/src/lib/threadpool.c      2024-02-23 16:28:33.000000000 
+0100
@@ -0,0 +1,554 @@
+#include "threadpool.h"
+#include <assert.h>
+#include <stddef.h>
+#include <stdlib.h>
+#include <string.h>
+#include <uv.h>
+#include "../../src/lib/queue.h"
+#include "../../src/lib/sm.h"
+#include "../../src/utils.h"
+
+/**
+ *  Planner thread state machine.
+ *
+ *     signal() &&
+ *     empty(o) &&                 signal() && exiting
+ *     empty(u) && +-----> NOTHING ----------------> EXITED
+ *     !exiting    +-------  ^ |
+ *                           | |
+ *               empty(o) && | | signal()
+ *               empty(u)    | | !empty(o) || !empty(u)
+ *                           | |
+ *                           | |
+ *                           | V
+ *    !empty(o) && +-----> DRAINING
+ *    !empty(u) && +-------  ^ |
+ * type(head(o)) != BAR      | |
+ *                           | | type(head(o)) == BAR
+ *        ord_in_flight == 0 | |
+ *                           | V
+ *                         BARRIER --------+ signal()
+ *                           ^ |   <-------+ ord_in_flight == 0
+ *                           | |
+ *                  empty(u) | | !empty(u)
+ *                           | V
+ *                      DRAINING_UNORD
+ */
+
+enum planner_states {
+       PS_NOTHING,
+       PS_DRAINING,
+       PS_BARRIER,
+       PS_DRAINING_UNORD,
+       PS_EXITED,
+       PS_NR,
+};
+
+static const struct sm_conf planner_states[PS_NR] = {
+       [PS_NOTHING] = {
+           .flags = SM_INITIAL,
+           .name = "nothing",
+           .allowed = BITS(PS_DRAINING) | BITS(PS_EXITED),
+       },
+       [PS_DRAINING] = {
+           .name = "draining",
+           .allowed = BITS(PS_DRAINING)
+                    | BITS(PS_NOTHING)
+                    | BITS(PS_BARRIER),
+       },
+       [PS_BARRIER] = {
+           .name = "barrier",
+           .allowed = BITS(PS_DRAINING_UNORD)
+                    | BITS(PS_DRAINING)
+                    | BITS(PS_BARRIER),
+       },
+       [PS_DRAINING_UNORD] = {
+           .name = "draining-unord",
+           .allowed = BITS(PS_BARRIER)
+       },
+       [PS_EXITED] = {
+           .flags = SM_FINAL,
+           .name = "exited",
+           .allowed = 0,
+       },
+};
+
+enum {
+       THREADPOOL_SIZE_MAX = 1024,
+};
+
+typedef struct pool_thread pool_thread_t;
+typedef struct pool_impl pool_impl_t;
+
+struct targs {
+       pool_impl_t *pi;
+       uv_sem_t *sem;
+       uint32_t idx; /* Thread's index */
+};
+
+/* Worker thread of the pool */
+struct pool_thread {
+       queue inq;          /* Thread's input queue */
+       uv_cond_t cond;     /* Signalled when work item appears in @inq */
+       uv_thread_t thread; /* Pool's worker thread */
+       struct targs arg;
+};
+
+struct pool_impl {
+       uv_mutex_t mutex; /* Input queue, planner_sm,
+                            worker and planner threads lock */
+       uint32_t threads_nr;
+       pool_thread_t *threads;
+
+       queue outq;            /* Output queue used by libuv part */
+       uv_mutex_t outq_mutex; /* Output queue lock */
+       uv_async_t outq_async; /* Signalled when output queue is not
+                                 empty and libuv loop has to process
+                                 items from it */
+       uint64_t active_ws;    /* Number of all work items in flight,
+                                 accessed from the main thread only */
+
+       queue ordered;        /* Queue of WT_ORD{N} items */
+       queue unordered;      /* Queue of WT_UNORD items */
+       struct sm planner_sm; /* State machine of the scheduler */
+       uv_cond_t planner_cond;
+       uv_thread_t planner_thread; /* Scheduler's thread */
+
+       uint32_t ord_in_flight; /* Number of WT_ORD{N} in flight */
+       bool exiting;           /* True when the pool is being stopped */
+       enum pool_work_type     /* Type of the previous work item, */
+           ord_prev;           /* used in WT_ORD{N} invariants */
+       uint32_t qos;           /* QoS token */
+       uint32_t qos_prio;      /* QoS prio */
+};
+
+static inline bool has_active_ws(pool_t *pool)
+{
+       return pool->pi->active_ws > 0;
+}
+
+static inline void w_register(pool_t *pool, pool_work_t *w)
+{
+       if (w->type != WT_BAR) {
+               pool->pi->active_ws++;
+       }
+}
+
+static inline void w_unregister(pool_t *pool, pool_work_t *w)
+{
+       (void)w;
+       PRE(has_active_ws(pool));
+       pool->pi->active_ws--;
+}
+
+static bool empty(const queue *q)
+{
+       return QUEUE__IS_EMPTY(q);
+}
+
+static queue *head(const queue *q)
+{
+       return QUEUE__HEAD(q);
+}
+
+static void push(queue *to, queue *what)
+{
+       QUEUE__INSERT_TAIL(to, what);
+}
+
+static queue *pop(queue *from)
+{
+       queue *q = QUEUE__HEAD(from);
+       PRE(q != NULL);
+       QUEUE__REMOVE(q);
+       QUEUE__INIT(q);
+       return q;
+}
+
+static queue *qos_pop(pool_impl_t *pi, queue *first, queue *second)
+{
+       PRE(!empty(first) || !empty(second));
+
+       if (empty(first)) {
+               return pop(second);
+       } else if (empty(second)) {
+               return pop(first);
+       }
+
+       return pop(pi->qos++ % pi->qos_prio ? first : second);
+}
+
+static pool_work_t *q_to_w(const queue *q)
+{
+       return QUEUE__DATA(q, pool_work_t, link);
+}
+
+static enum pool_work_type q_type(const queue *q)
+{
+       return q_to_w(q)->type;
+}
+
+static uint32_t q_tid(const queue *q)
+{
+       return q_to_w(q)->thread_id;
+}
+
+static bool planner_invariant(const struct sm *m, int prev_state)
+{
+       pool_impl_t *pi = CONTAINER_OF(m, pool_impl_t, planner_sm);
+       queue *o = &pi->ordered;
+       queue *u = &pi->unordered;
+
+       return ERGO(sm_state(m) == PS_NOTHING, empty(o) && empty(u)) &&
+              ERGO(sm_state(m) == PS_DRAINING,
+                   ERGO(prev_state == PS_BARRIER,
+                        pi->ord_in_flight == 0 && empty(u)) &&
+                       ERGO(prev_state == PS_NOTHING,
+                            !empty(u) || !empty(o))) &&
+              ERGO(sm_state(m) == PS_EXITED,
+                   pi->exiting && empty(o) && empty(u)) &&
+              ERGO(
+                  sm_state(m) == PS_BARRIER,
+                  ERGO(prev_state == PS_DRAINING, q_type(head(o)) == WT_BAR) &&
+                      ERGO(prev_state == PS_DRAINING_UNORD, empty(u))) &&
+              ERGO(sm_state(m) == PS_DRAINING_UNORD, !empty(u));
+}
+
+static void planner(void *arg)
+{
+       struct targs *ta = arg;
+       uv_sem_t *sem = ta->sem;
+       pool_impl_t *pi = ta->pi;
+       uv_mutex_t *mutex = &pi->mutex;
+       pool_thread_t *ts = pi->threads;
+       struct sm *planner_sm = &pi->planner_sm;
+       queue *o = &pi->ordered;
+       queue *u = &pi->unordered;
+       queue *q;
+
+       sm_init(planner_sm, planner_invariant, NULL, planner_states,
+               PS_NOTHING);
+       uv_sem_post(sem);
+       uv_mutex_lock(mutex);
+       for (;;) {
+               switch (sm_state(planner_sm)) {
+                       case PS_NOTHING:
+                               while (empty(o) && empty(u) && !pi->exiting) {
+                                       uv_cond_wait(&pi->planner_cond, mutex);
+                               }
+                               sm_move(planner_sm,
+                                       pi->exiting ? PS_EXITED : PS_DRAINING);
+                               break;
+                       case PS_DRAINING:
+                               while (!(empty(o) && empty(u))) {
+                                       sm_move(planner_sm, PS_DRAINING);
+                                       if (!empty(o) &&
+                                           q_type(head(o)) == WT_BAR) {
+                                               sm_move(planner_sm, PS_BARRIER);
+                                               goto ps_barrier;
+                                       }
+                                       q = qos_pop(pi, o, u);
+                                       push(&ts[q_tid(q)].inq, q);
+                                       uv_cond_signal(&ts[q_tid(q)].cond);
+                                       if (q_type(q) >= WT_ORD1) {
+                                               pi->ord_in_flight++;
+                                       }
+                               }
+                               sm_move(planner_sm, PS_NOTHING);
+                       ps_barrier:
+                               break;
+                       case PS_BARRIER:
+                               if (!empty(u)) {
+                                       sm_move(planner_sm, PS_DRAINING_UNORD);
+                                       break;
+                               }
+                               if (pi->ord_in_flight == 0) {
+                                       q = pop(o);
+                                       PRE(q_to_w(q)->type == WT_BAR);
+                                       free(q_to_w(q));
+                                       sm_move(planner_sm, PS_DRAINING);
+                                       break;
+                               }
+                               uv_cond_wait(&pi->planner_cond, mutex);
+                               sm_move(planner_sm, PS_BARRIER);
+                               break;
+                       case PS_DRAINING_UNORD:
+                               while (!empty(u)) {
+                                       q = pop(u);
+                                       push(&ts[q_tid(q)].inq, q);
+                                       uv_cond_signal(&ts[q_tid(q)].cond);
+                               }
+                               sm_move(planner_sm, PS_BARRIER);
+                               break;
+                       case PS_EXITED:
+                               sm_fini(planner_sm);
+                               uv_mutex_unlock(mutex);
+                               return;
+                       default:
+                               POST(false && "Impossible!");
+               }
+       }
+}
+
+static void queue_work(pool_work_t *w)
+{
+       w->work_cb(w);
+}
+
+static void queue_done(pool_work_t *w)
+{
+       w_unregister(w->pool, w);
+       if (w->after_work_cb != NULL) {
+               w->after_work_cb(w);
+       }
+}
+
+static void worker(void *arg)
+{
+       struct targs *ta = arg;
+       pool_impl_t *pi = ta->pi;
+       uv_mutex_t *mutex = &pi->mutex;
+       pool_thread_t *ts = pi->threads;
+       enum pool_work_type wtype;
+       pool_work_t *w;
+       queue *q;
+
+       uv_sem_post(ta->sem);
+       uv_mutex_lock(mutex);
+       for (;;) {
+               while (empty(&ts[ta->idx].inq)) {
+                       if (pi->exiting) {
+                               uv_mutex_unlock(mutex);
+                               return;
+                       }
+                       uv_cond_wait(&ts[ta->idx].cond, mutex);
+               }
+
+               q = pop(&ts[ta->idx].inq);
+               uv_mutex_unlock(mutex);
+
+               w = q_to_w(q);
+               wtype = w->type;
+               queue_work(w);
+
+               uv_mutex_lock(&pi->outq_mutex);
+               push(&pi->outq, &w->link);
+               uv_async_send(&pi->outq_async);
+               uv_mutex_unlock(&pi->outq_mutex);
+
+               uv_mutex_lock(mutex);
+               if (wtype > WT_BAR) {
+                       assert(pi->ord_in_flight > 0);
+                       if (--pi->ord_in_flight == 0) {
+                               uv_cond_signal(&pi->planner_cond);
+                       }
+               }
+       }
+}
+
+static void pool_cleanup(pool_t *pool)
+{
+       pool_impl_t *pi = pool->pi;
+       pool_thread_t *ts = pi->threads;
+       uint32_t i;
+
+       if (pi->threads_nr == 0) {
+               return;
+       }
+
+       pi->exiting = true;
+       uv_cond_signal(&pi->planner_cond);
+
+       if (uv_thread_join(&pi->planner_thread)) {
+               abort();
+       }
+       uv_cond_destroy(&pi->planner_cond);
+       POST(empty(&pi->ordered) && empty(&pi->unordered));
+
+       for (i = 0; i < pi->threads_nr; i++) {
+               uv_cond_signal(&ts[i].cond);
+               if (uv_thread_join(&ts[i].thread)) {
+                       abort();
+               }
+               POST(empty(&ts[i].inq));
+               uv_cond_destroy(&ts[i].cond);
+       }
+
+       free(pi->threads);
+       uv_mutex_destroy(&pi->mutex);
+       pi->threads_nr = 0;
+}
+
+static void pool_threads_init(pool_t *pool)
+{
+       uint32_t i;
+       uv_sem_t sem;
+       pool_impl_t *pi = pool->pi;
+       pool_thread_t *ts;
+       struct targs pa = {
+               .sem = &sem,
+               .pi = pi,
+       };
+       uv_thread_options_t config = {
+               .flags = UV_THREAD_HAS_STACK_SIZE,
+               .stack_size = 8u << 20,
+       };
+
+       if (uv_mutex_init(&pi->mutex)) {
+               abort();
+       }
+       if (uv_sem_init(&sem, 0)) {
+               abort();
+       }
+
+       pi->threads = calloc(pi->threads_nr, sizeof(pi->threads[0]));
+       if (pi->threads == NULL) {
+               abort();
+       }
+
+       for (i = 0, ts = pi->threads; i < pi->threads_nr; i++) {
+               ts[i].arg = (struct targs){
+                       .pi = pi,
+                       .sem = &sem,
+                       .idx = i,
+               };
+
+               QUEUE__INIT(&ts[i].inq);
+               if (uv_cond_init(&ts[i].cond)) {
+                       abort();
+               }
+               if (uv_thread_create_ex(&ts[i].thread, &config, worker,
+                                       &ts[i].arg)) {
+                       abort();
+               }
+       }
+
+       if (uv_cond_init(&pi->planner_cond)) {
+               abort();
+       }
+       if (uv_thread_create_ex(&pi->planner_thread, &config, planner, &pa)) {
+               abort();
+       }
+       for (i = 0; i < pi->threads_nr + 1 /* +planner */; i++) {
+               uv_sem_wait(&sem);
+       }
+
+       uv_sem_destroy(&sem);
+}
+
+static void pool_work_submit(pool_t *pool, pool_work_t *w)
+{
+       pool_impl_t *pi = pool->pi;
+       queue *o = &pi->ordered;
+       queue *u = &pi->unordered;
+
+       if (w->type > WT_UNORD) {
+               /* Make sure that elements in the ordered queue come in order.
+                */
+               PRE(ERGO(pi->ord_prev != WT_BAR && w->type != WT_BAR,
+                        pi->ord_prev == w->type));
+               pi->ord_prev = w->type;
+       }
+
+       uv_mutex_lock(&pi->mutex);
+       push(w->type == WT_UNORD ? u : o, &w->link);
+       uv_cond_signal(&pi->planner_cond);
+       uv_mutex_unlock(&pi->mutex);
+}
+
+void work_done(uv_async_t *handle)
+{
+       queue q = {};
+       pool_impl_t *pi = CONTAINER_OF(handle, pool_impl_t, outq_async);
+
+       uv_mutex_lock(&pi->outq_mutex);
+       QUEUE__MOVE(&pi->outq, &q);
+       uv_mutex_unlock(&pi->outq_mutex);
+
+       while (!empty(&q)) {
+               queue_done(q_to_w(pop(&q)));
+       }
+}
+
+void pool_queue_work(pool_t *pool,
+                    pool_work_t *w,
+                    uint32_t cookie,
+                    enum pool_work_type type,
+                    void (*work_cb)(pool_work_t *w),
+                    void (*after_work_cb)(pool_work_t *w))
+{
+       PRE(work_cb != NULL && type < WT_NR);
+
+       *w = (pool_work_t){
+               .pool = pool,
+               .type = type,
+               .thread_id = cookie % pool->pi->threads_nr,
+               .work_cb = work_cb,
+               .after_work_cb = after_work_cb,
+       };
+       w_register(pool, w);
+       pool_work_submit(pool, w);
+}
+
+int pool_init(pool_t *pool,
+             uv_loop_t *loop,
+             uint32_t threads_nr,
+             uint32_t qos_prio)
+{
+       int rc;
+       pool_impl_t *pi = pool->pi;
+
+       PRE(threads_nr <= THREADPOOL_SIZE_MAX);
+
+       pi = pool->pi = calloc(1, sizeof(*pool->pi));
+       if (pi == NULL) {
+               return UV_ENOMEM;
+       }
+
+       *pi = (pool_impl_t){
+               .qos = 0,
+               .qos_prio = qos_prio,
+               .exiting = false,
+               .ord_prev = WT_BAR,
+               .threads_nr = threads_nr,
+               .ord_in_flight = 0,
+       };
+       QUEUE__INIT(&pi->outq);
+       QUEUE__INIT(&pi->ordered);
+       QUEUE__INIT(&pi->unordered);
+
+       rc = uv_mutex_init(&pi->outq_mutex);
+       if (rc != 0) {
+               free(pi);
+               return rc;
+       }
+
+       rc = uv_async_init(loop, &pi->outq_async, work_done);
+       if (rc != 0) {
+               uv_mutex_destroy(&pi->outq_mutex);
+               free(pi);
+               return rc;
+       }
+
+       pool_threads_init(pool);
+       return 0;
+}
+
+void pool_fini(pool_t *pool)
+{
+       pool_impl_t *pi = pool->pi;
+
+       pool_cleanup(pool);
+
+       uv_mutex_lock(&pi->outq_mutex);
+       POST(empty(&pi->outq) && !has_active_ws(pool));
+       uv_mutex_unlock(&pi->outq_mutex);
+
+       uv_mutex_destroy(&pi->outq_mutex);
+       free(pi);
+}
+
+void pool_close(pool_t *pool)
+{
+       uv_close((uv_handle_t *)&pool->pi->outq_async, NULL);
+}
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dqlite-1.16.3/src/lib/threadpool.h 
new/dqlite-1.16.4/src/lib/threadpool.h
--- old/dqlite-1.16.3/src/lib/threadpool.h      1970-01-01 01:00:00.000000000 
+0100
+++ new/dqlite-1.16.4/src/lib/threadpool.h      2024-02-23 16:28:33.000000000 
+0100
@@ -0,0 +1,92 @@
+#ifndef __THREAD_POOL__
+#define __THREAD_POOL__
+
+#include <uv.h>
+#include "queue.h"
+
+/**
+   Thread pool
+
+   - Use-cases:
+
+     - Move sqlite3-, IO- related blocking operations from libuv
+       loop's thread to pool's threads in order to unblock serving
+       incoming dqlite requests during sqlite3 IO.
+       Multiple sqlite3_step()-s can be in flight and executed
+       concurrently, while thread's loop is not IO blocked.
+
+     - Introduced pool's work item thread affinity to serve sqlite3-
+       related items of each database in a "dedicated" thread which
+       allows not to make any assumption on sqlite3 threading model.
+       @see https://www.sqlite.org/threadsafe.html
+
+     - The pool supports servicing of the following types of work items:
+
+       - WT_UNORD - items, which can be processed by the pool in any
+        order, concurrency assumptions of this type of work are
+        guaranteed by other layers of the application. Read and write
+        transactions executed by sqlite3_step() are good examples for
+        such work item type.
+
+       - WT_ORD_N - items, which can NOT be processed by the pool in
+        any order. The pool's logic shall guarantee that servicing
+        all WT_ORD_{N}s happens before WT_ORD_{N + 1}s. WT_ORD_{N}s
+        and WT_ORD_{N + 1}s operations can't be put into the pool
+        interleaved. Sqlite3 checkpoints are an example of WT_ORD_{N}
+        and InstallSnapshot(CP(), MV()) is an example of WT_ORD_{N + 1}.
+
+       - WT_BAR - special purpose item, barrier. Delimits WT_ORD_{N}s
+        from WT_ORD_{N + 1}s.
+
+     - The pool supports servicing of work items with a given quality
+       of service (QoS) considerations. For example, the priority of
+       serving read/write sqlite3 transactions (WT_UNORD) can be set
+       higher than snapshot installation (WT_ORD{N}).
+ */
+
+struct pool_impl;
+typedef struct pool_s pool_t;
+typedef struct pool_work_s pool_work_t;
+
+enum pool_work_type {
+       WT_UNORD,
+       WT_BAR,
+       WT_ORD1,
+       WT_ORD2,
+       WT_NR,
+};
+
+struct pool_work_s
+{
+       queue link;         /* Link into ordered, unordered and outq */
+       uint32_t thread_id; /* Identifier of the thread the item is affined */
+       pool_t *pool;       /* The pool, item is being associated with */
+       enum pool_work_type type;
+
+       void (*work_cb)(pool_work_t *w);
+       void (*after_work_cb)(pool_work_t *w);
+};
+
+struct pool_s
+{
+       struct pool_impl *pi;
+};
+
+enum {
+       POOL_QOS_PRIO_FAIR = 2,
+};
+
+int pool_init(pool_t *pool,
+             uv_loop_t *loop,
+             uint32_t threads_nr,
+             uint32_t qos_prio);
+void pool_fini(pool_t *pool);
+void pool_close(pool_t *pool);
+void pool_queue_work(pool_t *pool,
+                    pool_work_t *w,
+                    uint32_t cookie,
+                    enum pool_work_type type,
+                    void (*work_cb)(pool_work_t *w),
+                    void (*after_work_cb)(pool_work_t *w));
+
+#endif /* __THREAD_POOL__ */
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/dqlite-1.16.3/test/unit/ext/test_uv_pool.c 
new/dqlite-1.16.4/test/unit/ext/test_uv_pool.c
--- old/dqlite-1.16.3/test/unit/ext/test_uv_pool.c      1970-01-01 
01:00:00.000000000 +0100
+++ new/dqlite-1.16.4/test/unit/ext/test_uv_pool.c      2024-02-23 
16:28:33.000000000 +0100
@@ -0,0 +1,113 @@
+#include "../../../src/lib/threadpool.h"
+#include "../../../src/utils.h"
+#include "../../lib/runner.h"
+#include "../../lib/uv.h"
+
+TEST_MODULE(ext_uv_pool);
+
+/******************************************************************************
+ *
+ * threadpool
+ *
+ 
******************************************************************************/
+
+enum { WORK_ITEMS_NR = 50000 };
+
+struct fixture {
+       pool_work_t w;
+       uv_loop_t loop;
+       pool_t pool;
+};
+
+static void loop_setup(struct fixture *f)
+{
+       int rc;
+
+       rc = uv_loop_init(&f->loop);
+       munit_assert_int(rc, ==, 0);
+
+       rc = pool_init(&f->pool, &f->loop, 4, POOL_QOS_PRIO_FAIR);
+       munit_assert_int(rc, ==, 0);
+}
+
+static void bottom_work_cb(pool_work_t *w)
+{
+       (void)w;
+}
+
+static void bottom_after_work_cb(pool_work_t *w)
+{
+       static int count = 0;
+
+       if (count == WORK_ITEMS_NR)
+               pool_close(w->pool);
+
+       count++;
+       assert(w->type != WT_BAR);
+       free(w);
+}
+
+static void after_work_cb(pool_work_t *w)
+{
+       enum pool_work_type pwt;
+       pool_work_t *work;
+       unsigned int wt;
+       unsigned int i;
+
+       for (i = 0; i <= WORK_ITEMS_NR + 1 /* +WT_BAR */; i++) {
+               work = malloc(sizeof(*work));
+
+               if (i < WORK_ITEMS_NR / 2)
+                       wt = WT_ORD1;
+               else if (i == WORK_ITEMS_NR / 2)
+                       wt = WT_BAR;
+               else
+                       wt = WT_ORD2;
+
+               pwt = i % 2 == 0 ? wt : WT_UNORD;
+               pool_queue_work(w->pool, work, i, pwt, bottom_work_cb,
+                               bottom_after_work_cb);
+       }
+}
+
+static void work_cb(pool_work_t *w)
+{
+       (void)w;
+}
+
+static void threadpool_tear_down(void *data)
+{
+       int rc;
+       struct fixture *f = data;
+
+       pool_fini(&f->pool);
+       rc = uv_loop_close(&f->loop);
+       munit_assert_int(rc, ==, 0);
+       free(f);
+}
+
+static void *threadpool_setup(const MunitParameter params[], void *user_data)
+{
+       (void)params;
+       (void)user_data;
+       struct fixture *f = calloc(1, sizeof *f);
+       loop_setup(f);
+       return f;
+}
+
+TEST_SUITE(threadpool);
+TEST_SETUP(threadpool, threadpool_setup);
+TEST_TEAR_DOWN(threadpool, threadpool_tear_down);
+TEST_CASE(threadpool, sync, NULL)
+{
+       (void)params;
+       struct fixture *f = data;
+       int rc;
+
+       pool_queue_work(&f->pool, &f->w, 0, WT_UNORD, work_cb, after_work_cb);
+
+       rc = uv_run(&f->loop, UV_RUN_DEFAULT);
+       munit_assert_int(rc, ==, 0);
+
+       return MUNIT_OK;
+}

Reply via email to