On 9 Feb 2026, at 14:29, Eli Britstein wrote:
> From: Gaetan Rivet <[email protected]>
>
> Add a way to schedule executions with the RCU using memory embedded
> within the object being scheduled, if applicable.
>
> This way, freeing a high volume of objects does not require many small
> allocations, reducing memory pressure and heap fragmentation.
>
> This fragmentation is seen as the heap grows instead of shrinking when
> a high volume of objects are freed using RCU.
Hi Gaetan,
Thanks for the patch. I have some comments inline below (not a complete
review yet).
I'm curious about the design choice: why implement this as a parallel
mechanism rather than refactoring the existing RCU implementation to handle
both cases? Was this driven by performance concerns, or are there fundamental
differences that require separate code paths?
Cheers,
Eelco
> Signed-off-by: Gaetan Rivet <[email protected]>
> Co-authored-by: Eli Britstein <[email protected]>
> Signed-off-by: Eli Britstein <[email protected]>
> ---
> lib/ovs-rcu.c | 125 ++++++++++++++++++++++++++++++++++------
> lib/ovs-rcu.h | 39 +++++++++++++
> tests/automake.mk | 1 +
> tests/library.at | 4 ++
> tests/test-rcu-inline.c | 118 +++++++++++++++++++++++++++++++++++++
> 5 files changed, 270 insertions(+), 17 deletions(-)
> create mode 100644 tests/test-rcu-inline.c
>
> diff --git a/lib/ovs-rcu.c b/lib/ovs-rcu.c
> index 49afcc55c..539d17a7c 100644
> --- a/lib/ovs-rcu.c
> +++ b/lib/ovs-rcu.c
> @@ -20,6 +20,7 @@
> #include "fatal-signal.h"
> #include "guarded-list.h"
> #include "latch.h"
> +#include "mpsc-queue.h"
> #include "openvswitch/list.h"
> #include "ovs-thread.h"
> #include "openvswitch/poll-loop.h"
> @@ -49,6 +50,7 @@ struct ovsrcu_perthread {
>
> uint64_t seqno;
> struct ovsrcu_cbset *cbset;
> + bool do_inline;
Do we need this bool? It is written without any atomic operation or lock, and
once set to true it remains true for the lifetime of the thread, even after the
queue drains. Would it be more robust to check whether inline_queue is empty
instead of relying on this sticky flag?
> char name[16]; /* This thread's name. */
> };
>
> @@ -61,6 +63,8 @@ static struct ovs_mutex ovsrcu_threads_mutex;
> static struct guarded_list flushed_cbsets;
> static struct seq *flushed_cbsets_seq;
>
> +static struct seq *inline_seq;
If do_inline is removed (i.e. we always signal on quiesce), then inline_seq
becomes redundant with global_seqno and could be dropped entirely.
> + static struct latch postpone_exit;
> static struct ovs_barrier postpone_barrier;
>
> @@ -68,7 +72,8 @@ static void ovsrcu_init_module(void);
> static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool);
> static void ovsrcu_flush_cbset(struct ovsrcu_perthread *);
> static void ovsrcu_unregister__(struct ovsrcu_perthread *);
> -static bool ovsrcu_call_postponed(void);
> +static bool ovsrcu_call_inline(uint64_t);
> +static bool ovsrcu_call_postponed(struct ovs_list *cbsets);
> static void *ovsrcu_postpone_thread(void *arg OVS_UNUSED);
>
> static struct ovsrcu_perthread *
> @@ -85,6 +90,7 @@ ovsrcu_perthread_get(void)
> perthread = xmalloc(sizeof *perthread);
> perthread->seqno = seq_read(global_seqno);
> perthread->cbset = NULL;
> + perthread->do_inline = false;
> ovs_strlcpy(perthread->name, name[0] ? name : "main",
> sizeof perthread->name);
>
> @@ -112,7 +118,11 @@ static void
> ovsrcu_quiesced(void)
> {
> if (single_threaded()) {
> - ovsrcu_call_postponed();
> + struct ovs_list cbsets = OVS_LIST_INITIALIZER(&cbsets);
> +
> + ovsrcu_call_inline(seq_read(global_seqno));
> + guarded_list_pop_all(&flushed_cbsets, &cbsets);
> + ovsrcu_call_postponed(&cbsets);
> } else {
> static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
> if (ovsthread_once_start(&once)) {
> @@ -156,6 +166,9 @@ ovsrcu_quiesce(void)
> if (perthread->cbset) {
> ovsrcu_flush_cbset(perthread);
> }
> + if (perthread->do_inline) {
> + seq_change(inline_seq);
> + }
> seq_change(global_seqno);
>
> ovsrcu_quiesced();
> @@ -174,6 +187,9 @@ ovsrcu_try_quiesce(void)
> if (perthread->cbset) {
> ovsrcu_flush_cbset__(perthread, true);
> }
> + if (perthread->do_inline) {
> + seq_change_protected(inline_seq);
> + }
> seq_change_protected(global_seqno);
> seq_unlock();
> ovsrcu_quiesced();
> @@ -189,21 +205,28 @@ ovsrcu_is_quiescent(void)
> return pthread_getspecific(perthread_key) == NULL;
> }
>
> -void
> -ovsrcu_synchronize(void)
> +static uint64_t
> +ovsrcu_synchronize_(struct ovs_list *cbsets)
Naming nit: the OVS convention is a double-underscore suffix for internal
variants of a public function, so this should be ovsrcu_synchronize__(), not
ovsrcu_synchronize_().
> {
> unsigned int warning_threshold = 1000;
> uint64_t target_seqno;
> long long int start;
>
> if (single_threaded()) {
> - return;
> + return UINT64_MAX;
> }
>
> target_seqno = seq_read(global_seqno);
> ovsrcu_quiesce_start();
> start = time_msec();
>
> + if (cbsets != NULL) {
> + /* Move the flushed 'cbsets' after 'ovsrcu_quiesce_start',
> + * as this function has the potential in single-threaded mode
> + * to itself execute those 'cbsets'. */
This comment is misleading: we can only reach this point in multi-threaded
mode, since the single-threaded case returns early above, so the scenario it
describes cannot occur here. Please drop or reword the comment.
> + guarded_list_pop_all(&flushed_cbsets, cbsets);
> + }
> +
> for (;;) {
> uint64_t cur_seqno = seq_read(global_seqno);
> struct ovsrcu_perthread *perthread;
> @@ -237,6 +260,15 @@ ovsrcu_synchronize(void)
> poll_block();
> }
> ovsrcu_quiesce_end();
> +
> + /* Return the 'seqno' that is safe to consider reached by all threads. */
> + return target_seqno;
> +}
> +
> +void
> +ovsrcu_synchronize(void)
> +{
> + ovs_ignore(ovsrcu_synchronize_(NULL));
> }
>
> /* Waits until as many postponed callbacks as possible have executed.
> @@ -275,11 +307,57 @@ ovsrcu_exit(void)
> * infinite loop. This function is just for making memory leaks easier
> to
> * spot so there's no point in breaking things on that basis. */
> for (int i = 0; i < 8; i++) {
> - ovsrcu_synchronize();
> - if (!ovsrcu_call_postponed()) {
> + struct ovs_list cbsets = OVS_LIST_INITIALIZER(&cbsets);
> + uint64_t target = ovsrcu_synchronize_(&cbsets);
> + bool inline_active;
> + bool cbsets_active;
> +
> + /* Both RCU calls must be examined for activity. */
> + inline_active = ovsrcu_call_inline(target);
The "inline" naming is confusing, since it suggests the compiler keyword.
Consider using "embedded" instead, e.g. ovsrcu_postpone_embedded(); for
consistency, this function would then be named
ovsrcu_call_postpone_embedded().
> + cbsets_active = ovsrcu_call_postponed(&cbsets);
> + if (!inline_active && !cbsets_active) {
> + break;
> + }
> + }
> +}
> +
> +static struct mpsc_queue inline_queue =
> MPSC_QUEUE_INITIALIZER(&inline_queue);
> +
> +void
> +ovsrcu_postpone_inline__(void (*function)(void *aux), void *aux,
> + struct ovsrcu_inline_node *rcu_node)
> +{
> + struct ovsrcu_perthread *perthread = ovsrcu_perthread_get();
> +
> + rcu_node->seqno = perthread->seqno;
> + rcu_node->cb = function;
> + rcu_node->aux = aux;
> + mpsc_queue_insert(&inline_queue, &rcu_node->node);
> +
> + perthread->do_inline = true;
> +}
> +
> +static bool
> +ovsrcu_call_inline(uint64_t target_seqno)
> +{
> + struct mpsc_queue_node *msg;
> + unsigned int count = 0;
> +
> + mpsc_queue_acquire(&inline_queue);
> + MPSC_QUEUE_FOR_EACH_POP (msg, &inline_queue) {
> + struct ovsrcu_inline_node *node;
> +
> + node = CONTAINER_OF(msg, struct ovsrcu_inline_node, node);
> + if (node->seqno >= target_seqno) {
> + mpsc_queue_push_front(&inline_queue, msg);
> break;
> }
> + node->cb(node->aux);
> + count++;
> }
> + mpsc_queue_release(&inline_queue);
> +
> + return count > 0;
> }
>
> /* Registers 'function' to be called, passing 'aux' as argument, after the
> @@ -327,19 +405,18 @@ ovsrcu_postpone__(void (*function)(void *aux), void
> *aux)
> }
>
> static bool OVS_NO_SANITIZE_FUNCTION
> -ovsrcu_call_postponed(void)
> +ovsrcu_call_postponed(struct ovs_list *cbsets)
> {
> struct ovsrcu_cbset *cbset;
> - struct ovs_list cbsets;
>
> - guarded_list_pop_all(&flushed_cbsets, &cbsets);
> - if (ovs_list_is_empty(&cbsets)) {
> + if (cbsets == NULL) {
> + return false;
> + }
> + if (ovs_list_is_empty(cbsets)) {
These two ifs could be combined. Also, why does cbsets need to be a parameter?
(See earlier comment about the confusing comment in ovsrcu_synchronize_.)
> return false;
> }
>
> - ovsrcu_synchronize();
> -
> - LIST_FOR_EACH_POP (cbset, list_node, &cbsets) {
> + LIST_FOR_EACH_POP (cbset, list_node, cbsets) {
> struct ovsrcu_cb *cb;
>
> for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) {
> @@ -358,9 +435,19 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED)
> pthread_detach(pthread_self());
>
> while (!latch_is_set(&postpone_exit)) {
> - uint64_t seqno = seq_read(flushed_cbsets_seq);
> - if (!ovsrcu_call_postponed()) {
> - seq_wait(flushed_cbsets_seq, seqno);
> + struct ovs_list cbsets = OVS_LIST_INITIALIZER(&cbsets);
> + uint64_t cb_seqno = seq_read(flushed_cbsets_seq);
> + uint64_t target = ovsrcu_synchronize_(&cbsets);
> + uint64_t inline_seqno = seq_read(inline_seq);
> + bool inline_active;
> + bool cbsets_active;
> +
> + /* Both RCU calls must be examined for activity. */
> + inline_active = ovsrcu_call_inline(target);
> + cbsets_active = ovsrcu_call_postponed(&cbsets);
> + if (!inline_active && !cbsets_active) {
> + seq_wait(flushed_cbsets_seq, cb_seqno);
> + seq_wait(inline_seq, inline_seqno);
> latch_wait(&postpone_exit);
> poll_block();
> }
> @@ -399,6 +486,9 @@ ovsrcu_unregister__(struct ovsrcu_perthread *perthread)
> if (perthread->cbset) {
> ovsrcu_flush_cbset(perthread);
> }
> + if (perthread->do_inline) {
> + seq_change(inline_seq);
> + }
>
> ovs_mutex_lock(&ovsrcu_threads_mutex);
> ovs_list_remove(&perthread->list_node);
> @@ -440,6 +530,7 @@ ovsrcu_init_module(void)
>
> guarded_list_init(&flushed_cbsets);
> flushed_cbsets_seq = seq_create();
> + inline_seq = seq_create();
>
> ovsthread_once_done(&once);
> }
> diff --git a/lib/ovs-rcu.h b/lib/ovs-rcu.h
> index a1c15c126..ed756c1c2 100644
> --- a/lib/ovs-rcu.h
> +++ b/lib/ovs-rcu.h
> @@ -125,6 +125,22 @@
> * ovs_mutex_unlock(&mutex);
> * }
> *
> + * As an alternative to ovsrcu_postpone(), the same deferred execution can be
> + * achieved using ovsrcu_postpone_inline():
> + *
> + * struct deferrable {
> + * struct ovsrcu_inline_node rcu_node;
> + * };
> + *
> + * void
> + * deferred_free(struct deferrable *d)
> + * {
> + * ovsrcu_postpone_inline(free, d, rcu_node);
> + * }
> + *
> + * Using inline fields can be preferred sometimes to avoid the small
> + * allocations done in ovsrcu_postpone().
> + *
> * In some rare cases an object may not be addressable with a pointer, but
> only
> * through an array index (e.g. because it's provided by another library).
> It
> * is still possible to have RCU semantics by using the ovsrcu_index type.
> @@ -171,6 +187,7 @@
> */
>
> #include "compiler.h"
> +#include "mpsc-queue.h"
> #include "ovs-atomic.h"
>
> #if __GNUC__
> @@ -256,6 +273,28 @@ void ovsrcu_postpone__(void (*function)(void *aux), void
> *aux);
> (void) sizeof(*(ARG)), \
> ovsrcu_postpone__((void (*)(void *))(FUNCTION), ARG))
>
> +struct ovsrcu_inline_node {
> + struct mpsc_queue_node node;
> + void (*cb)(void *aux);
> + void *aux;
> + uint64_t seqno;
> +};
> +
> +/* Calls FUNCTION passing ARG as its pointer-type argument, which
> + * contains an 'ovsrcu_inline_node' as a field named MEMBER. The function
> + * is called following the next grace period. See 'Usage' above for an
> + * example.
> + */
> +void ovsrcu_postpone_inline__(void (*function)(void *aux), void *aux,
> + struct ovsrcu_inline_node *node);
> +#define ovsrcu_postpone_inline(FUNCTION, ARG, MEMBER) \
> + (/* Verify that ARG is appropriate for FUNCTION. */ \
> + (void) sizeof((FUNCTION)(ARG), 1), \
> + /* Verify that ARG is a pointer type. */ \
> + (void) sizeof(*(ARG)), \
> + ovsrcu_postpone_inline__((void (*)(void *))(FUNCTION), ARG, \
> + &(ARG)->MEMBER))
> +
> /* An array index protected by RCU semantics. This is an easier alternative
> to
> * an RCU protected pointer to a malloc'd int. */
> typedef struct { atomic_int v; } ovsrcu_index;
> diff --git a/tests/automake.mk b/tests/automake.mk
> index da569b022..da4d2e0b8 100644
> --- a/tests/automake.mk
> +++ b/tests/automake.mk
> @@ -500,6 +500,7 @@ tests_ovstest_SOURCES = \
> tests/test-random.c \
> tests/test-rcu.c \
> tests/test-rculist.c \
> + tests/test-rcu-inline.c \
> tests/test-reconnect.c \
> tests/test-rstp.c \
> tests/test-sflow.c \
> diff --git a/tests/library.at b/tests/library.at
> index 82ac80a27..16820ff49 100644
> --- a/tests/library.at
> +++ b/tests/library.at
> @@ -275,6 +275,10 @@ AT_SETUP([rcu])
> AT_CHECK([ovstest test-rcu], [0], [])
> AT_CLEANUP
>
> +AT_SETUP([rcu inline])
> +AT_CHECK([ovstest test-rcu-inline], [0], [])
> +AT_CLEANUP
> +
> AT_SETUP([stopwatch module])
> AT_CHECK([ovstest test-stopwatch], [0], [......
> ], [ignore])
> diff --git a/tests/test-rcu-inline.c b/tests/test-rcu-inline.c
> new file mode 100644
> index 000000000..c72410223
> --- /dev/null
> +++ b/tests/test-rcu-inline.c
> @@ -0,0 +1,118 @@
> +/*
> + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION &
> AFFILIATES.
> + * All rights reserved.
> + * SPDX-License-Identifier: Apache-2.0
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#include <config.h>
> +
> +#undef NDEBUG
> +#include "ovs-atomic.h"
> +#include "ovs-rcu.h"
> +#include "ovs-thread.h"
> +#include "ovstest.h"
> +#include "seq.h"
> +#include "timeval.h"
> +#include "util.h"
> +
> +#include "openvswitch/poll-loop.h"
> +
> +struct element {
> + struct ovsrcu_inline_node rcu_node;
> + struct seq *trigger;
> + atomic_bool wait;
> +};
> +
> +static void
> +do_inline(void *e_)
> +{
> + struct element *e = (struct element *) e_;
> +
> + seq_change(e->trigger);
> +}
> +
> +static void *
> +wait_main(void *aux)
> +{
> + struct element *e = aux;
> +
> + for (;;) {
> + bool wait;
> +
> + atomic_read(&e->wait, &wait);
> + if (!wait) {
> + break;
> + }
> + }
> +
> + seq_wait(e->trigger, seq_read(e->trigger));
> + poll_block();
> +
> + return NULL;
> +}
> +
> +static void
> +test_rcu_inline_main(bool multithread)
> +{
> + long long int timeout;
> + pthread_t waiter;
> + struct element e;
> + uint64_t seqno;
> +
> + atomic_init(&e.wait, true);
> +
> + if (multithread) {
> + waiter = ovs_thread_create("waiter", wait_main, &e);
> + }
> +
> + e.trigger = seq_create();
> + seqno = seq_read(e.trigger);
> +
> + ovsrcu_postpone_inline(do_inline, &e, rcu_node);
> +
> + /* Check that GC holds out until all threads are quiescent. */
> + timeout = time_msec();
> + if (multithread) {
> + timeout += 200;
> + }
> + while (time_msec() <= timeout) {
> + ovs_assert(seq_read(e.trigger) == seqno);
> + }
> +
> + atomic_store(&e.wait, false);
> +
> + seq_wait(e.trigger, seqno);
> + poll_timer_wait_until(time_msec() + 200);
> + poll_block();
> +
> + /* Verify that GC executed. */
> + ovs_assert(seq_read(e.trigger) != seqno);
> + seq_destroy(e.trigger);
> +
> + if (multithread) {
> + xpthread_join(waiter, NULL);
> + }
> +}
> +
> +static void
> +test_rcu_inline(int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
> +{
> + const bool multithread = true;
> +
> + test_rcu_inline_main(!multithread);
> + test_rcu_inline_main(multithread);
> +}
> +
> +OVSTEST_REGISTER("test-rcu-inline", test_rcu_inline);
> --
> 2.34.1
_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev