On 4/25/21 1:55 PM, Gaetan Rivet wrote:
> Add a lockless multi-producer/single-consumer (MPSC), linked-list based,
> intrusive, unbounded queue that does not require deferred memory
> management.
>
> The queue is an implementation of the structure described by Dmitri
> Vyukov[1]. It adds a slightly more explicit API explaining the proper use
> of the queue.
> Alternatives were considered such as a Treiber Stack [2] or a
> Michael-Scott queue [3], but this one is faster, simpler and scalable.
>
> [1]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
>
> [2]: R. K. Treiber. Systems programming: Coping with parallelism.
>      Technical Report RJ 5118, IBM Almaden Research Center, April 1986.
>
> [3]: M. M. Michael, Simple, Fast, and Practical Non-Blocking and Blocking
>      Concurrent Queue Algorithms.
>      https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
>
> The queue is designed to improve the specific MPSC setup. A benchmark
> accompanies the unit tests to measure the difference in this
> configuration. A single reader thread polls the queue while N writers
> enqueue elements as fast as possible. The mpsc-queue is compared against
> the regular ovs-list as well as the guarded list. The latter usually
> offers a slight improvement by batching the element removal, however the
> mpsc-queue is faster.
>
> The average is of each producer thread's time:
>
> $ ./tests/ovstest test-mpsc-queue benchmark 3000000 1
> Benchmarking n=3000000 on 1 + 1 threads.
>  type\thread:  Reader      1    Avg
>   mpsc-queue:     161    161    161 ms
>         list:     803    803    803 ms
> guarded list:     665    665    665 ms
>
> $ ./tests/ovstest test-mpsc-queue benchmark 3000000 2
> Benchmarking n=3000000 on 1 + 2 threads.
>  type\thread:  Reader      1      2    Avg
>   mpsc-queue:     102    101     97     99 ms
>         list:     246    212    246    229 ms
> guarded list:     264    263    214    238 ms
>
> $ ./tests/ovstest test-mpsc-queue benchmark 3000000 3
> Benchmarking n=3000000 on 1 + 3 threads.
>  type\thread:  Reader      1      2      3    Avg
>   mpsc-queue:      92     91     92     91     91 ms
>         list:     520    517    515    520    517 ms
> guarded list:     405    395    401    404    400 ms
>
> $ ./tests/ovstest test-mpsc-queue benchmark 3000000 4
> Benchmarking n=3000000 on 1 + 4 threads.
>  type\thread:  Reader      1      2      3      4    Avg
>   mpsc-queue:      77     73     73     77     75     74 ms
>         list:     371    359    361    287    370    344 ms
> guarded list:     389    388    359    363    357    366 ms
Hi, Gaetan.  This is an interesting implementation.  However, I spent a few
hours benchmarking it on my machine, and the benchmark itself has a few
issues:

1. The benchmark needs to control which cores the threads run on, otherwise
   the results are highly inconsistent, e.g. if several threads end up
   scheduled on the same core.  Results also differ between threads
   scheduled on sibling cores and on separate physical cores.

2. The ovs-list test is not fair compared with the test for mpsc.  In the
   ovs-list test, the reader blocks the writers for a whole loop that also
   includes some computation.  I see your point that taking a mutex for
   every pop() operation would likely destroy the performance, but you're
   missing the fact that a spinlock could be used instead of a mutex.

So I replaced the mutex with a spinlock, set thread affinity to separate
physical cores and ran a series of tests.  Each test variant was executed
10 times, and I computed the average, min and max of the 'Avg' column
across these runs.  Since spinlocks are not fair, they sometimes perform
badly under high contention; for that reason I also included a 90% average
that cuts off the single worst result.

Results:

  ./tests/ovstest test-mpsc-queue benchmark 3000000 1
    mpsc-queue:  avg 183.5 | max 200 | min 169 | 90% avg 181
    spin+list :  avg  89.2 | max 142 | min  64 | 90% avg  89

  ./tests/ovstest test-mpsc-queue benchmark 3000000 2
    mpsc-queue:  avg  83.3 | max  90 | min  79 | 90% avg  83
    spin+list :  avg 141.8 | max 675 | min  43 | 90% avg  83

  ./tests/ovstest test-mpsc-queue benchmark 3000000 3
    mpsc-queue:  avg  75.5 | max  80 | min  70 | 90% avg  75
    spin+list :  avg  96.2 | max 166 | min  63 | 90% avg  88

In this setup, list+spinlock decisively beats mpsc in the single reader +
single writer case, which is the most likely case for a non-synthetic
workload.  It is on par with mpsc for 1+2 and very similar for 1+3.  I
didn't have 5 physical cores to test 1+4.
I also ran a test without contention (all reads happen after the writers
are joined), and there is no noticeable difference between the
implementations in this case:

  ./tests/ovstest test-mpsc-queue benchmark 3000000 1  (without contention)
    mpsc-queue:  avg 60.9 | max 75 | min 42 | 90% avg 59
    spin+list :  avg 60.5 | max 73 | min 43 | 90% avg 59
    guarded   :  avg 79.7 | max 95 | min 61 | 90% avg 78

Results are much more flaky when threads are scheduled on sibling cores,
but even then spinlock+list is still not far from mpsc.

So, the question is: do we really need this new data structure?
spinlock + ovs-list shows very good results and should work just fine,
especially in non-synthetic cases, where contention between more than 2
threads is very unlikely (assuming critical sections are short).  In OVS,
these data structures will be used from PMD threads, which are guaranteed
to run on different cores.  Non-PMD threads run on a different set of
cores too, so there should be no problem with contention between threads
running on the same core.

Of course, spinlocks should be used wisely, i.e. we should not hold them
longer than necessary.  But if the code is already written for mpsc, it
should not be hard to convert it to spinlock + list without holding the
lock for longer than a single push() or pop().

Any thoughts?
Below is the patch that I applied on top of this one to perform tests (on
my machine cores 0 and 1 are siblings):

diff --git a/tests/test-mpsc-queue.c b/tests/test-mpsc-queue.c
index 26d48b9ff..67cbce421 100644
--- a/tests/test-mpsc-queue.c
+++ b/tests/test-mpsc-queue.c
@@ -25,7 +25,10 @@
 #include "guarded-list.h"
 #include "mpsc-queue.h"
 #include "openvswitch/list.h"
+#include "openvswitch/vlog.h"
 #include "openvswitch/util.h"
+#include "ovs-numa.h"
+#include "ovs-rcu.h"
 #include "ovs-thread.h"
 #include "ovstest.h"
 #include "timeval.h"
@@ -318,7 +321,10 @@ mpsc_queue_insert_thread(void *aux_)
     unsigned int id;
     size_t i;
 
+    ovsrcu_quiesce_start();
+
     atomic_add(&aux->thread_id, 1u, &id);
+    ovs_numa_thread_setaffinity_core((id + 1) * 2);
     n_elems_per_thread = n_elems / n_threads;
     th_elements = &elements[id * n_elems_per_thread];
 
@@ -358,6 +364,7 @@ benchmark_mpsc_queue(void)
 
     aux.queue = &queue;
     atomic_store(&aux.thread_id, 0);
+    ovs_numa_thread_setaffinity_core(0);
 
     for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {
         mpsc_queue_insert(&queue, &elements[i].node.mpscq);
@@ -424,7 +431,7 @@ benchmark_mpsc_queue(void)
 
 struct list_aux {
     struct ovs_list *list;
-    struct ovs_mutex *lock;
+    struct ovs_spin *lock;
     atomic_uint thread_id;
 };
 
@@ -438,7 +445,10 @@ locked_list_insert_thread(void *aux_)
     unsigned int id;
     size_t i;
 
+    ovsrcu_quiesce_start();
+
     atomic_add(&aux->thread_id, 1u, &id);
+    ovs_numa_thread_setaffinity_core((id + 1) * 2);
     n_elems_per_thread = n_elems / n_threads;
     th_elements = &elements[id * n_elems_per_thread];
 
@@ -446,9 +456,9 @@ locked_list_insert_thread(void *aux_)
 
     xgettimeofday(&start);
     for (i = 0; i < n_elems_per_thread; i++) {
-        ovs_mutex_lock(aux->lock);
+        ovs_spin_lock(aux->lock);
         ovs_list_push_front(aux->list, &th_elements[i].node.list);
-        ovs_mutex_unlock(aux->lock);
+        ovs_spin_unlock(aux->lock);
     }
     thread_working_ms[id] = elapsed(&start);
 
@@ -462,7 +472,7 @@ locked_list_insert_thread(void *aux_)
 static void
 benchmark_list(void)
 {
-    struct ovs_mutex lock;
+    struct ovs_spin lock;
     struct ovs_list list;
     struct element *elem;
     struct timeval start;
@@ -477,18 +487,19 @@ benchmark_list(void)
     memset(elements, 0, n_elems * sizeof *elements);
     memset(thread_working_ms, 0, n_threads * sizeof *thread_working_ms);
 
-    ovs_mutex_init(&lock);
+    ovs_spin_init(&lock);
     ovs_list_init(&list);
 
     aux.list = &list;
     aux.lock = &lock;
     atomic_store(&aux.thread_id, 0);
+    ovs_numa_thread_setaffinity_core(0);
 
-    ovs_mutex_lock(&lock);
+    ovs_spin_lock(&lock);
     for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {
         ovs_list_push_front(&list, &elements[i].node.list);
     }
-    ovs_mutex_unlock(&lock);
+    ovs_spin_unlock(&lock);
 
     working = true;
 
@@ -505,12 +516,21 @@ benchmark_list(void)
     counter = 0;
     epoch = 1;
     do {
-        ovs_mutex_lock(&lock);
-        LIST_FOR_EACH_POP (elem, node.list, &list) {
-            elem->mark = epoch;
-            counter++;
+        struct ovs_list *node = NULL;
+
+        ovs_spin_lock(&lock);
+        if (!ovs_list_is_empty(&list)) {
+            node = ovs_list_pop_front(&list);
+        }
+        ovs_spin_unlock(&lock);
+
+        if (!node) {
+            continue;
         }
-        ovs_mutex_unlock(&lock);
+
+        elem = CONTAINER_OF(node, struct element, node.list);
+        elem->mark = epoch;
+        counter++;
         if (epoch == UINT64_MAX) {
             epoch = 0;
         }
@@ -525,12 +545,12 @@ benchmark_list(void)
     avg /= n_threads;
 
     /* Elements might have been inserted before threads were joined. */
-    ovs_mutex_lock(&lock);
+    ovs_spin_lock(&lock);
     LIST_FOR_EACH_POP (elem, node.list, &list) {
         elem->mark = epoch;
         counter++;
     }
-    ovs_mutex_unlock(&lock);
+    ovs_spin_unlock(&lock);
 
     printf(" list: %6d", elapsed(&start));
     for (i = 0; i < n_threads; i++) {
@@ -566,7 +586,10 @@ guarded_list_insert_thread(void *aux_)
     unsigned int id;
     size_t i;
 
+    ovsrcu_quiesce_start();
+
     atomic_add(&aux->thread_id, 1u, &id);
+    ovs_numa_thread_setaffinity_core((id + 1) * 2);
     n_elems_per_thread = n_elems / n_threads;
     th_elements = &elements[id * n_elems_per_thread];
 
@@ -608,6 +631,7 @@ benchmark_guarded_list(void)
 
     aux.glist = &glist;
     atomic_store(&aux.thread_id, 0);
+    ovs_numa_thread_setaffinity_core(0);
 
     for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {
         guarded_list_push_back(&glist, &elements[i].node.list, n_elems);
@@ -680,6 +704,9 @@ run_benchmarks(struct ovs_cmdl_context *ctx)
     long int l_elems;
     size_t i;
 
+    vlog_set_levels(NULL, VLF_ANY_DESTINATION, VLL_OFF);
+    ovs_numa_init();
+
     l_elems = strtol(ctx->argv[1], NULL, 10);
     l_threads = strtol(ctx->argv[2], NULL, 10);
     ovs_assert(l_elems > 0 && l_threads > 0);
---
_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev