On Tue, 19 Dec 2023 18:45:54 +0000
Vincent Donnefort <vdonnef...@google.com> wrote:

> The tracing ring-buffers can be stored on disk or sent to network
> without any copy via splice. However the latter doesn't allow real time
> processing of the traces. A solution is to give userspace direct access
> to the ring-buffer pages via a mapping. An application can now become a
> consumer of the ring-buffer, in a similar fashion to what trace_pipe
> offers.
> 
> Attached to this cover letter an example of consuming read for a
> ring-buffer, using libtracefs.
> 

I'm still testing this, but I needed to add this patch to fix two bugs. One
is that rb_wake_up_waiters() is called for the irq_work of both the buffer
and the cpu_buffer, so it needs to know which of the two it was called for
in order to pass the right type to the container_of() macro.
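
To illustrate the first bug outside the kernel, here is a minimal userspace
sketch, not the kernel code. The struct layouts are cut down to a couple of
fields, container_of() is reimplemented locally, and meta_pages_to_update()
and the "cpus" field are hypothetical names made up for the example. The
point is only that the same embedded struct needs a flag to say which parent
type it lives in.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Userspace stand-in for the kernel's container_of() macro. */
#define container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

/* Simplified stand-ins for the kernel structures; fields are illustrative. */
struct rb_irq_work {
	bool is_cpu_buffer;	/* true when embedded in a per-CPU buffer */
};

struct ring_buffer_per_cpu {
	int cpu;
	struct rb_irq_work irq_work;
};

struct trace_buffer {
	int cpus;		/* hypothetical: number of per-CPU buffers */
	struct rb_irq_work irq_work;
};

/*
 * Hypothetical helper: returns how many meta pages a wakeup would touch.
 * Without the flag there is no way to tell which parent type to hand to
 * container_of(), and picking the wrong one yields a bogus pointer.
 */
static int meta_pages_to_update(struct rb_irq_work *rbwork)
{
	assert(rbwork);

	if (rbwork->is_cpu_buffer) {
		struct ring_buffer_per_cpu *cpu_buffer =
			container_of(rbwork, struct ring_buffer_per_cpu, irq_work);
		(void)cpu_buffer;	/* exactly one per-CPU buffer */
		return 1;
	}

	/* Embedded in the top-level buffer: every per-CPU buffer is touched. */
	return container_of(rbwork, struct trace_buffer, irq_work)->cpus;
}
```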

The other is a "goto unlock" that unlocks two locks where only one was taken.

-- Steve

diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index 35f3736f660b..987ad7bd1e8b 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -389,6 +389,7 @@ struct rb_irq_work {
        bool                            waiters_pending;
        bool                            full_waiters_pending;
        bool                            wakeup_full;
+       bool                            is_cpu_buffer;
 };
 
 /*
@@ -771,10 +772,20 @@ static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
 static void rb_wake_up_waiters(struct irq_work *work)
 {
        struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
-       struct ring_buffer_per_cpu *cpu_buffer =
-               container_of(rbwork, struct ring_buffer_per_cpu, irq_work);
+       struct ring_buffer_per_cpu *cpu_buffer;
+       struct trace_buffer *buffer;
+       int cpu;
 
-       rb_update_meta_page(cpu_buffer);
+       if (rbwork->is_cpu_buffer) {
+               cpu_buffer = container_of(rbwork, struct ring_buffer_per_cpu, irq_work);
+               rb_update_meta_page(cpu_buffer);
+       } else {
+               buffer = container_of(rbwork, struct trace_buffer, irq_work);
+               for_each_buffer_cpu(buffer, cpu) {
+                       cpu_buffer = buffer->buffers[cpu];
+                       rb_update_meta_page(cpu_buffer);
+               }
+       }
 
        wake_up_all(&rbwork->waiters);
        if (rbwork->full_waiters_pending || rbwork->wakeup_full) {
@@ -1569,6 +1580,7 @@ rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
        init_waitqueue_head(&cpu_buffer->irq_work.waiters);
        init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
        mutex_init(&cpu_buffer->mapping_lock);
+       cpu_buffer->irq_work.is_cpu_buffer = true;
 
        bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
                            GFP_KERNEL, cpu_to_node(cpu));
@@ -6209,7 +6221,8 @@ int ring_buffer_map(struct trace_buffer *buffer, int cpu)
 
        if (cpu_buffer->mapped) {
                WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped + 1);
-               goto unlock;
+               mutex_unlock(&cpu_buffer->mapping_lock);
+               return 0;
        }
 
        /* prevent another thread from changing buffer sizes */
