On Thu, Jun 18, 2026 at 5:27 PM Tamir Duberstein <[email protected]> wrote:
>
> After consuming the last visible record, ringbuf_process_ring()
> publishes the consumer position and checks the producer position. These
> operations lack a full StoreLoad barrier. A producer can therefore
> commit a new record but read the old consumer position while the
> consumer reads the old producer position. The producer sends no
> notification and the consumer waits despite a queued record.
>
> Insert a full barrier between publishing a consumer position and the
> next producer position load. When a record bound or callback ends the
> current invocation first, execute the barrier before returning so the
> load in a later invocation completes the same handshake.
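
The handshake described above can be modeled in user space with C11 atomics. This is only a sketch: `struct model_ring` and both helpers are made-up names, and the producer side is a simplified assumption about the commit path (notify only when the consumer had already caught up to the record being committed). None of it is libbpf or kernel code.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical two-word model of the shared positions. */
struct model_ring {
	_Atomic uint64_t consumer_pos;
	_Atomic uint64_t producer_pos;
};

/*
 * Consumer side: publish the new position, issue a full barrier, then
 * re-read the producer position. Returns true if more data is visible.
 */
static bool model_consumer_publish(struct model_ring *r, uint64_t cons_pos)
{
	atomic_store_explicit(&r->consumer_pos, cons_pos, memory_order_release);
	atomic_thread_fence(memory_order_seq_cst); /* StoreLoad */
	return atomic_load_explicit(&r->producer_pos,
				    memory_order_acquire) != cons_pos;
}

/*
 * Producer side (assumed, simplified): publish the record, issue a full
 * barrier, then read the consumer position. Returns true if the consumer
 * had caught up to this record, i.e. a notification is needed.
 */
static bool model_producer_commit(struct model_ring *r, uint64_t rec_pos,
				  uint64_t len)
{
	atomic_store_explicit(&r->producer_pos, rec_pos + len,
			      memory_order_release);
	atomic_thread_fence(memory_order_seq_cst); /* StoreLoad */
	return atomic_load_explicit(&r->consumer_pos,
				    memory_order_acquire) == rec_pos;
}
```

With a full fence on both sides, at least one side observes the other's store: either the producer sees the new consumer position and notifies, or the consumer sees the new producer position and keeps draining. Dropping the consumer-side fence allows both loads to return stale values.
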
>
> Add an edge-triggered epoll test that drains one record per call while a
> concurrent producer fills the ring. Without the barrier, a missed
> notification leaves the producer dropping records from a full ring while
> the consumer times out. Document that bounded consumers and callbacks
> that terminate consumption must drain before waiting again.
>
> Fixes: bf99c936f947 ("libbpf: Add BPF ring buffer support")
> Reported-by: Andrew Werner <[email protected]>
> Reported-by: Sashiko <[email protected]>
> Closes: https://lore.kernel.org/bpf/[email protected]/
> Assisted-by: Codex:gpt-5.5
> Signed-off-by: Tamir Duberstein <[email protected]>
> ---
>  tools/lib/bpf/libbpf.h                           | 23 +++++++
>  tools/lib/bpf/ringbuf.c                          | 24 +++++--
>  tools/testing/selftests/bpf/prog_tests/ringbuf.c | 83 ++++++++++++++++++++++++
>  3 files changed, 123 insertions(+), 7 deletions(-)
>
> diff --git a/tools/lib/bpf/libbpf.h b/tools/lib/bpf/libbpf.h
> index ae46b17feaa6..3a649ed87034 100644
> --- a/tools/lib/bpf/libbpf.h
> +++ b/tools/lib/bpf/libbpf.h
> @@ -1440,6 +1440,11 @@ struct ring;
>  struct user_ring_buffer;
>
>  /* Callback-based consumption is unsupported for BPF_F_RB_OVERWRITE maps. */
> +/*
> + * A negative return stops consumption; non-negative values continue. Stopping
> + * can leave records queued without a new readiness notification. Before
> + * waiting for readiness again, consume until no records remain.
> + */
>  typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size);
>
>  struct ring_buffer_opts {
> @@ -1456,6 +1461,20 @@ LIBBPF_API int ring_buffer__add(struct ring_buffer *rb, int map_fd,
>                                 ring_buffer_sample_fn sample_cb, void *ctx);
>  LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms);
>  LIBBPF_API int ring_buffer__consume(struct ring_buffer *rb);
> +
> +/**
> + * @brief **ring_buffer__consume_n()** consumes up to a requested number of
> + * records from a ring buffer manager without event polling.
> + *
> + * @param rb A ring buffer manager object.
> + * @param n Maximum number of records to consume.
> + * @return The number of records consumed, or a negative error code on failure.
> + *
> + * Reaching the requested bound does not establish that every ring is empty.
> + * Records can remain queued without a new readiness notification. Before
> + * calling ring_buffer__poll() or waiting on ring_buffer__epoll_fd(), call
> + * ring_buffer__consume() until it returns 0.
> + */
>  LIBBPF_API int ring_buffer__consume_n(struct ring_buffer *rb, size_t n);
>  LIBBPF_API int ring_buffer__epoll_fd(const struct ring_buffer *rb);
>
> @@ -1538,6 +1557,10 @@ LIBBPF_API int ring__consume(struct ring *r);
>   * @param r A ringbuffer object.
>   * @param n Maximum number of records to consume.
>   * @return The number of records consumed, or a negative error code on failure.
> + *
> + * Reaching the requested bound does not establish that the ring is empty.
> + * Records can remain queued without a new readiness notification. Before
> + * waiting on ring__map_fd(), call ring__consume() until it returns 0.
>   */
>  LIBBPF_API int ring__consume_n(struct ring *r, size_t n);
>
> diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c
> index 141f2cbe56eb..0598f6c2f7da 100644
> --- a/tools/lib/bpf/ringbuf.c
> +++ b/tools/lib/bpf/ringbuf.c
> @@ -271,7 +271,7 @@ static int64_t ringbuf_process_ring(struct ring *r, size_t n)
>                 return 0;
>
>         cons_pos = __atomic_load_n(r->consumer_pos, __ATOMIC_ACQUIRE);
> -       do {
> +       for (;;) {
>                 got_new_data = false;
>                 prod_pos = __atomic_load_n(r->producer_pos, __ATOMIC_ACQUIRE);
>                 /* Positions wrap; the consumer cannot logically pass the producer. */
> @@ -279,9 +279,9 @@ static int64_t ringbuf_process_ring(struct ring *r, size_t n)
>                         len_ptr = r->data + (cons_pos & r->mask);
>                         len = __atomic_load_n(len_ptr, __ATOMIC_ACQUIRE);
>
> -                       /* sample not committed yet, bail out for now */
> +                       /* Retry a busy record once after publishing prior records. */
>                         if (len & BPF_RINGBUF_BUSY_BIT)
> -                               goto done;
> +                               break;

So we do this one-time retry just so that we can reuse the same memory
barrier, is that right? And similarly for all those error-returning
conditions with cnt = err? Wouldn't it be cleaner and easier to follow
to have a `goto done` that just duplicates the memory barrier and exits
with whatever value err is?
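
A rough user-space model of that alternative shape, for illustration only and not a patch against ringbuf.c. Everything prefixed `model_`/`MODEL_` is hypothetical: records are reduced to one header word per slot, and the callback only sees the length. The sketch does not settle whether the busy-record path can exit without a re-check after the barrier.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define MODEL_BUSY_BIT (1U << 31)
#define MODEL_SLOTS 8

/* Hypothetical ring: one header word per fixed-size slot. */
struct model_ring {
	_Atomic uint64_t consumer_pos;
	_Atomic uint64_t producer_pos;
	_Atomic uint32_t hdr[MODEL_SLOTS];
};

typedef int (*model_sample_fn)(void *ctx, uint32_t len);

/* Hypothetical callback: stop with -1 when len matches *ctx. */
static int model_stop_on_len(void *ctx, uint32_t len)
{
	return len == *(uint32_t *)ctx ? -1 : 0;
}

static int64_t model_process_ring(struct model_ring *r, size_t n,
				  model_sample_fn cb, void *ctx)
{
	uint64_t cons_pos, prod_pos;
	bool got_new_data;
	int64_t cnt = 0;
	int err = 0;

	cons_pos = atomic_load_explicit(&r->consumer_pos, memory_order_acquire);
	do {
		got_new_data = false;
		prod_pos = atomic_load_explicit(&r->producer_pos,
						memory_order_acquire);
		while (cons_pos < prod_pos) {
			uint32_t len = atomic_load_explicit(
				&r->hdr[cons_pos % MODEL_SLOTS],
				memory_order_acquire);

			/* As in the pre-patch code: bail out on a busy record. */
			if (len & MODEL_BUSY_BIT)
				goto done;

			got_new_data = true;
			cons_pos++;
			err = cb(ctx, len);
			atomic_store_explicit(&r->consumer_pos, cons_pos,
					      memory_order_release);
			if (err < 0)
				goto done;
			cnt++;
			if ((size_t)cnt >= n)
				goto done;
		}
		/* Ring looked empty: full barrier before re-reading producer_pos. */
		if (got_new_data)
			atomic_thread_fence(memory_order_seq_cst);
	} while (got_new_data);
	return cnt;

done:
	/* Duplicated barrier: every early exit still orders the last store. */
	atomic_thread_fence(memory_order_seq_cst);
	return err < 0 ? err : cnt;
}
```

The early exits all funnel through one label, so the return value is either the callback's error or the count, and the barrier appears in exactly two places.
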

>
>                         got_new_data = true;
>                         cons_pos += roundup_len(len);

[...]
