Hi Steve,

On Fri, 26 Jun 2020 21:00:42 -0400
Steven Rostedt <rost...@goodmis.org> wrote:

> From: "Steven Rostedt (VMware)" <rost...@goodmis.org>
> 
> Up until now, if an event is interrupted while it is recorded by an
> interrupt, and that interrupt records events, the time of those events will
> all be the same. This is because events only record the delta of the time
> since the previous event (or beginning of a page), and to handle updating
> the time keeping for that of nested events is extremely racy. After years of
> thinking about this and several failed attempts, I finally have a solution
> to solve this puzzle.
> 
> The problem is that you need to atomically calculate the delta and then
> update the time stamp you made the delta from, as well as then record it
> into the buffer, all this while at any time an interrupt can come in and
> do the same thing. This is easy to solve with heavy weight atomics, but that
> would be detrimental to the performance of the ring buffer. The current
> state of affairs sacrificed the time deltas for nested events for
> performance.
> 
> The reason for previous failed attempts at solving this puzzle was because I
> was trying to completely avoid slow atomic operations like cmpxchg. I finally
> came to the conclusion that always avoiding cmpxchg is not possible, which is why
> those previous attempts always failed. But it is possible to pick one path
> (the most common case) and avoid cmpxchg in that path, which is the "fast
> path". The most common case is that an event will not be interrupted and
> have other events added into it. An event can detect if it has
> interrupted another event, and for these cases we can make it the slow
> path and use the heavy operations like cmpxchg.
> 
> One more player was added to the game that made this possible, and that is
> the "absolute timestamp" (by Tom Zanussi) that allows us to inject a full 59
> bit time stamp. (Of course this breaks if a machine is running for more than
> 18 years without a reboot!).
> 
> There's barrier() placements around for being paranoid, even when they
> are not needed because of other atomic functions near by. But those
> should not hurt, as if they are not needed, they basically become a nop.
> 
> Note, this also makes the race window much smaller, which means there
> are fewer slow paths to slow down the performance.
> 
> Here's the design of this solution:
> 
>  All this is per cpu, and only needs to worry about nested events (not
>  parallel events).
>

This looks basically good to me. I have a few comments below
(clean-ups only).
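
As a quick sanity check on the "18 years" figure in the changelog, here is a
small userspace sketch of the arithmetic. It assumes the trace clock counts
nanoseconds and uses 365-day years; years_until_wrap() is a name made up for
illustration and is not part of ring_buffer.c.

```c
#include <assert.h>
#include <stdint.h>

/* Number of bits the changelog says an absolute time stamp carries. */
#define ABS_TS_BITS 59

/*
 * Whole 365-day years before a nanosecond counter of 'bits' bits wraps.
 * Hypothetical helper, assuming the clock unit is nanoseconds.
 */
static uint64_t years_until_wrap(unsigned int bits)
{
	const uint64_t ns_per_year = 365ULL * 24 * 60 * 60 * 1000000000ULL;

	assert(bits < 64);	/* shifting by 64 or more is undefined */
	return (UINT64_C(1) << bits) / ns_per_year;
}
```

With 59 bits this gives 18 whole years, which matches the changelog.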
 
> The players:
> 
>  write_tail: The index in the buffer where new events can be written to.
>      It is incremented via local_add() to reserve space for a new event.
> 
>  before_stamp: A time stamp set by all events before reserving space.
> 
>  write_stamp: A time stamp updated by events after it has successfully
>      reserved space.

So these stamps work like a seq-lock to detect interruption (from both the
interrupted context and the interrupting context).
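
To make sure I read this correctly, here is how I picture the two checks, as
a plain single-threaded userspace sketch. The struct and helper names are
mine, for illustration only; the real code uses local64_t and barrier(),
which this model leaves out.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for the two per-cpu stamps. */
struct stamps {
	uint64_t before_stamp;
	uint64_t write_stamp;
};

/*
 * Entry check: the stamps differ only while some event sits between
 * updating before_stamp [B] and updating write_stamp [D], so a mismatch
 * means we interrupted a time update.
 */
static bool interrupted_time_update(const struct stamps *s)
{
	return s->before_stamp != s->write_stamp;
}

/* [B] every event publishes its time stamp before reserving space */
static void begin_update(struct stamps *s, uint64_t ts)
{
	s->before_stamp = ts;
}

/* [D] an uninterrupted event publishes its time stamp after reserving */
static void finish_update(struct stamps *s, uint64_t ts)
{
	s->write_stamp = ts;
}

/*
 * Exit check [E]: if before_stamp no longer holds our ts, a nested
 * event ran after our [B].
 */
static bool was_interrupted(const struct stamps *s, uint64_t ts)
{
	return s->before_stamp != ts;
}
```

So the interrupting event notices the race on entry, and the interrupted
event notices it on exit, much like a reader retry check on a seqcount.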

> 
>  next_write: A copy of "write_tail" used to help with races.

It seems this player does nothing: it is written, but its value is never used.

> 
>       /* Save the current position of write */
>  [A]  w = local_read(write_tail);
>       barrier();
>       /* Read both before and write stamps before touching anything */
>       before = READ_ONCE(before_stamp);
>       after = local_read(write_stamp);

Is there any reason to use READ_ONCE() for one and local_read() for the other?
(In the code, both are local64_read().)

>       barrier();
> 
>       /*
>        * If before and after are the same, then this event is not
>        * interrupting a time update. If it is, then reserve space for adding
>        * a full time stamp (this can turn into a time extend which is
>        * just an extended time delta but fill up the extra space).
>        */
>       if (after != before)
>               abs = true;
> 
>       ts = clock();
> 
>       /* Now update the before_stamp (everyone does this!) */
>  [B]  WRITE_ONCE(before_stamp, ts);
> 
>       /* Read the current next_write and update it to what we want write
>        * to be after we reserve space. */
>       next = READ_ONCE(next_write);
>       WRITE_ONCE(next_write, w + len);

This seems meaningless, because neither "next" nor "next_write"
is referenced anywhere afterwards.
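
To convince myself that dropping those two lines loses nothing, I wrote a
rough userspace model of the pseudo-code above without "next" and
"next_write". It is only a sketch: it follows the changelog pseudo-code (not
the diff), it is single-threaded, the clock is a fake counter, the cmpxchg is
a plain store, and an "interrupt" is just a callback invoked between [B] and
[C]. All names (rb_model, model_reserve, nested_irq) are made up.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

struct rb_model {
	unsigned long write_tail;
	uint64_t before_stamp;
	uint64_t write_stamp;
	uint64_t now;			/* fake clock, one tick per read */
};

struct rb_result {
	unsigned long tail;
	uint64_t ts;
	uint64_t delta;
	bool abs;
};

typedef void (*irq_fn)(struct rb_model *rb);

static uint64_t model_clock(struct rb_model *rb)
{
	return ++rb->now;
}

/* Reserve 'len' bytes; 'irq', if set, runs between [B] and [C]. */
static struct rb_result model_reserve(struct rb_model *rb, unsigned long len,
				      irq_fn irq)
{
	struct rb_result r = { 0 };
	unsigned long w = rb->write_tail;		/* [A] */
	uint64_t before = rb->before_stamp;
	uint64_t after = rb->write_stamp;
	unsigned long write;

	r.abs = (after != before);
	r.ts = model_clock(rb);
	rb->before_stamp = r.ts;			/* [B] */

	if (irq)
		irq(rb);				/* simulated interrupt */

	write = (rb->write_tail += len);		/* [C] */
	r.tail = write - len;

	if (w == r.tail) {
		uint64_t save_before;

		rb->write_stamp = r.ts;			/* [D] */
		save_before = rb->before_stamp;		/* [E] */
		r.delta = r.abs ? r.ts : r.ts - after;
		if (r.ts != save_before) {
			after = rb->write_stamp;
			if (save_before > after)
				rb->write_stamp = save_before; /* cmpxchg in the pseudo-code */
		}
	} else {
		uint64_t temp_ts;

		after = rb->write_stamp;
		temp_ts = model_clock(rb);
		if (write == rb->write_tail && after < temp_ts) { /* [F] */
			r.delta = temp_ts - after;
			r.ts = temp_ts;
		} else {
			r.delta = 0;
		}
		r.abs = false;
	}
	return r;
}

/* Result of the nested event, saved for inspection. */
static struct rb_result nested;

static void nested_irq(struct rb_model *rb)
{
	nested = model_reserve(rb, 8, NULL);
}
```

Running one plain event and then one event interrupted by nested_irq() takes
the fast path, the forced-absolute path and the "interrupted between A and C"
slow path, and none of them needs next_write.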

> 
>       /* Now reserve space on the buffer */
>  [C]  write = local_add_return(len, write_tail);
> 
>       /* Set tail to be where this event's data is */
>       tail = write - len;
> 
>       if (w == tail) {
> 
>               /* Nothing interrupted this between A and C */
>  [D]          local_set(write_stamp, ts);
>               barrier();
>  [E]          save_before = READ_ONCE(before_stamp);
> 
>               if (!abs) {
>                       /* This did not interrupt a time update */
>                       delta = ts - after;
>               } else {
>                       delta = ts; /* The full time stamp will be in use */
>               }
>               if (ts != save_before) {
>                       /* slow path - Was interrupted between C and E */
>                       /* The update to write_stamp could have overwritten the
>                        * update to it by the interrupting event, but before
>                        * and after should be the same for all completed top
>                        * events */
>                       after = local_read(write_stamp);
>                       if (save_before > after)
>                               local_cmpxchg(write_stamp, after, save_before);
>               }
>       } else {
>               /* slow path - Interrupted between A and C */
> 
>               after = local_read(write_stamp);
>               temp_ts = clock();
>               barrier();
>  [F]          if (write == local_read(write_tail) && after < temp_ts) {
>                       /* This was not interrupted since C and F
>                        * The last write_stamp is still valid for the previous
>                        * event in the buffer. */
>                       delta = temp_ts - after;
>                       /* OK to keep this new time stamp */
>                       ts = temp_ts;
>               } else {
>                       /* Interrupted between C and F
>                        * Well, there's no use to try to know what the time
>                        * stamp is for the previous event. Just set delta to
>                        * zero and be the same time as that event that
>                        * interrupted us before the reservation of the buffer. */
> 
>                       delta = 0;
>               }
>               /* No need to use full timestamps here */
>               abs = 0;
>       }
> 
> Link: https://lkml.kernel.org/r/20200625094454.73279...@oasis.local.home
> 
> Signed-off-by: Steven Rostedt (VMware) <rost...@goodmis.org>
> ---
>  kernel/trace/ring_buffer.c | 281 ++++++++++++++++++++++++-------------
>  1 file changed, 186 insertions(+), 95 deletions(-)
> 
> diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
> index 00867ff82412..4f13ae38b8f8 100644
> --- a/kernel/trace/ring_buffer.c
> +++ b/kernel/trace/ring_buffer.c
> @@ -27,6 +27,7 @@
>  #include <linux/oom.h>
>  
>  #include <asm/local.h>
> +#include <asm/local64.h>
>  
>  static void update_pages_handler(struct work_struct *work);
>  
> @@ -418,6 +419,17 @@ struct rb_event_info {
>       int                     add_timestamp;
>  };
>  
> +/*
> + * Used for the add_timestamp
> + *  NONE
> + *  NORMAL - may be for either time extend or absolute
> + *  FORCE - force a full time stamp.
> + */
> +enum {
> +     RB_ADD_STAMP_NONE,
> +     RB_ADD_STAMP_NORMAL,
> +     RB_ADD_STAMP_FORCE
> +};
>  /*
>   * Used for which event context the event is in.
>   *  NMI     = 0
> @@ -470,7 +482,9 @@ struct ring_buffer_per_cpu {
>       size_t                          shortest_full;
>       unsigned long                   read;
>       unsigned long                   read_bytes;
> -     u64                             write_stamp;
> +     unsigned long                   next_write;
> +     local64_t                       write_stamp;
> +     local64_t                       before_stamp;
>       u64                             read_stamp;
>       /* ring buffer pages to update, > 0 to add, < 0 to remove */
>       long                            nr_pages_to_update;
> @@ -2416,16 +2430,13 @@ rb_update_event(struct ring_buffer_per_cpu *cpu_buffer,
>       unsigned length = info->length;
>       u64 delta = info->delta;
>  
> -     /* Only a commit updates the timestamp */
> -     if (unlikely(!rb_event_is_commit(cpu_buffer, event)))
> -             delta = 0;
> -
>       /*
>        * If we need to add a timestamp, then we
>        * add it to the start of the reserved space.
>        */
>       if (unlikely(info->add_timestamp)) {
> -             bool abs = ring_buffer_time_stamp_abs(cpu_buffer->buffer);
> +             bool abs = info->add_timestamp == RB_ADD_STAMP_FORCE ||
> +                     ring_buffer_time_stamp_abs(cpu_buffer->buffer);
>  
>               event = rb_add_time_stamp(event, abs ? info->delta : delta, abs);
>               length -= RB_LEN_TIME_EXTEND;
> @@ -2480,6 +2491,39 @@ static inline bool sched_clock_stable(void)
>  }
>  #endif
>  
> +static __always_inline bool
> +rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
> +                struct ring_buffer_event *event)
> +{
> +     unsigned long addr = (unsigned long)event;
> +     unsigned long index;
> +
> +     index = rb_event_index(event);
> +     addr &= PAGE_MASK;
> +
> +     return cpu_buffer->commit_page->page == (void *)addr &&
> +             rb_commit_index(cpu_buffer) == index;
> +}
> +
> +static u64 rb_time_delta(struct ring_buffer_event *event)
> +{
> +     switch (event->type_len) {
> +     case RINGBUF_TYPE_PADDING:
> +             return 0;
> +
> +     case RINGBUF_TYPE_TIME_EXTEND:
> +             return ring_buffer_event_time_stamp(event);
> +
> +     case RINGBUF_TYPE_TIME_STAMP:
> +             return 0;
> +
> +     case RINGBUF_TYPE_DATA:
> +             return event->time_delta;
> +     default:
> +             return 0;
> +     }
> +}
> +
>  static inline int
>  rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
>                 struct ring_buffer_event *event)
> @@ -2488,6 +2532,8 @@ rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
>       struct buffer_page *bpage;
>       unsigned long index;
>       unsigned long addr;
> +     u64 write_stamp;
> +     u64 delta;
>  
>       new_index = rb_event_index(event);
>       old_index = new_index + rb_event_ts_length(event);
> @@ -2496,10 +2542,32 @@ rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
>  
>       bpage = READ_ONCE(cpu_buffer->tail_page);
>  
> +     delta = rb_time_delta(event);
> +
> +     write_stamp = local64_read(&cpu_buffer->write_stamp);
> +
> +     /* Make sure the write stamp is read before testing the location */
> +     barrier();
> +
>       if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
>               unsigned long write_mask =
>                       local_read(&bpage->write) & ~RB_WRITE_MASK;
>               unsigned long event_length = rb_event_length(event);
> +             u64 ret;
> +
> +             ret = local64_cmpxchg(&cpu_buffer->write_stamp, write_stamp, write_stamp - delta);
> +             /* Something came in, can't discard */
> +             if (ret != write_stamp)
> +                     return 0;
> +
> +             /*
> +              * If an event were to come in now, it would see that the
> +              * write_stamp and the before_stamp are different, and assume
> +              * that this event just added itself before updating
> +              * the write stamp. The interrupting event will fix the
> +              * write stamp for us, and use the before stamp as its delta.
> +              */
> +
>               /*
>                * This is on the tail page. It is possible that
>                * a write could come in and move the tail page
> @@ -2551,10 +2619,6 @@ rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
>               local_set(&cpu_buffer->commit_page->page->commit,
>                         rb_page_write(cpu_buffer->commit_page));
>               rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
> -             /* Only update the write stamp if the page has an event */
> -             if (rb_page_write(cpu_buffer->commit_page))
> -                     cpu_buffer->write_stamp =
> -                             cpu_buffer->commit_page->page->time_stamp;
>               /* add barrier to keep gcc from optimizing too much */
>               barrier();
>       }
> @@ -2626,54 +2690,10 @@ static inline void rb_event_discard(struct ring_buffer_event *event)
>               event->time_delta = 1;
>  }
>  
> -static __always_inline bool
> -rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
> -                struct ring_buffer_event *event)
> -{
> -     unsigned long addr = (unsigned long)event;
> -     unsigned long index;
> -
> -     index = rb_event_index(event);
> -     addr &= PAGE_MASK;
> -
> -     return cpu_buffer->commit_page->page == (void *)addr &&
> -             rb_commit_index(cpu_buffer) == index;
> -}
> -
> -static __always_inline void
> -rb_update_write_stamp(struct ring_buffer_per_cpu *cpu_buffer,
> -                   struct ring_buffer_event *event)
> -{
> -     u64 delta;
> -
> -     /*
> -      * The event first in the commit queue updates the
> -      * time stamp.
> -      */
> -     if (rb_event_is_commit(cpu_buffer, event)) {
> -             /*
> -              * A commit event that is first on a page
> -              * updates the write timestamp with the page stamp
> -              */
> -             if (!rb_event_index(event))
> -                     cpu_buffer->write_stamp =
> -                             cpu_buffer->commit_page->page->time_stamp;
> -             else if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) {
> -                     delta = ring_buffer_event_time_stamp(event);
> -                     cpu_buffer->write_stamp += delta;
> -             } else if (event->type_len == RINGBUF_TYPE_TIME_STAMP) {
> -                     delta = ring_buffer_event_time_stamp(event);
> -                     cpu_buffer->write_stamp = delta;
> -             } else
> -                     cpu_buffer->write_stamp += event->time_delta;
> -     }
> -}
> -
>  static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
>                     struct ring_buffer_event *event)
>  {
>       local_inc(&cpu_buffer->entries);
> -     rb_update_write_stamp(cpu_buffer, event);
>       rb_end_commit(cpu_buffer);
>  }
>  
> @@ -2872,13 +2892,13 @@ rb_handle_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
>                 KERN_WARNING "Delta way too big! %llu ts=%llu write stamp = %llu\n%s",
>                 (unsigned long long)info->delta,
>                 (unsigned long long)info->ts,
> -               (unsigned long long)cpu_buffer->write_stamp,
> +               (unsigned long long)local64_read(&cpu_buffer->write_stamp),
>                 sched_clock_stable() ? "" :
>                 "If you just came from a suspend/resume,\n"
>                 "please switch to the trace global clock:\n"
>                 "  echo global > /sys/kernel/debug/tracing/trace_clock\n"
>                 "or add trace_clock=global to the kernel command line\n");
> -     info->add_timestamp = 1;
> +     info->add_timestamp = RB_ADD_STAMP_NORMAL;
>  }
>  
>  static struct ring_buffer_event *
> @@ -2887,8 +2907,36 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
>  {
>       struct ring_buffer_event *event;
>       struct buffer_page *tail_page;
> -     unsigned long tail, write;
> +     unsigned long tail, write, w, next;
> +     u64 delta, before, after;
> +     bool abs = false;
> +
> +     /* Don't let the compiler play games with cpu_buffer->tail_page */
> +     tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);
> +
> + /*A*/       w = local_read(&tail_page->write) & RB_WRITE_MASK;
> +     barrier();
> +     before = local64_read(&cpu_buffer->before_stamp);
> +     after = local64_read(&cpu_buffer->write_stamp);
> +     barrier();
> +     info->ts = rb_time_stamp(cpu_buffer->buffer);
> +
> +     if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) {
> +             info->delta = info->ts;
> +             abs = true;
> +     } else {
> +             info->delta = info->ts - after;
> +     }
> +
> +     if (unlikely(test_time_stamp(info->delta)))
> +             rb_handle_timestamp(cpu_buffer, info);
>  
> +     /*
> +      * If interrupting an event time update, we may need an absolute timestamp.
> +      * Don't bother if this is the start of a new page (w == 0).
> +      */
> +     if (unlikely(before != after && w))
> +             info->add_timestamp = RB_ADD_STAMP_FORCE;
>       /*
>        * If the time delta since the last event is too big to
>        * hold in the time field of the event, then we append a
> @@ -2897,25 +2945,91 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
>       if (unlikely(info->add_timestamp))
>               info->length += RB_LEN_TIME_EXTEND;
>  
> -     /* Don't let the compiler play games with cpu_buffer->tail_page */
> -     tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);
> -     write = local_add_return(info->length, &tail_page->write);
> +     next = READ_ONCE(cpu_buffer->next_write);
> +     WRITE_ONCE(cpu_buffer->next_write, w + info->length);

So "next" is read here but never used afterwards, and nothing else reads
next_write; these two lines seem to have no effect.

Thank you,

> +
> + /*B*/       local64_set(&cpu_buffer->before_stamp, info->ts);
> +
> + /*C*/       write = local_add_return(info->length, &tail_page->write);
>  
>       /* set write to only the index of the write */
>       write &= RB_WRITE_MASK;
> +
>       tail = write - info->length;
>  
> +     /* See if we shot pass the end of this buffer page */
> +     if (unlikely(write > BUF_PAGE_SIZE)) {
> +             if (tail != w) {
> +                     /* before and after may now different, fix it up*/
> +                     before = local64_read(&cpu_buffer->before_stamp);
> +                     after = local64_read(&cpu_buffer->write_stamp);
> +                     if (before != after)
> +                             (void)local64_cmpxchg(&cpu_buffer->before_stamp, before, after);
> +             }
> +             return rb_move_tail(cpu_buffer, tail, info);
> +     }
> +
> +     if (likely(tail == w)) {
> +             u64 save_before;
> +
> +             /* Nothing interrupted us between A and C */
> + /*D*/               local64_set(&cpu_buffer->write_stamp, info->ts);
> +             barrier();
> + /*E*/               save_before = local64_read(&cpu_buffer->before_stamp);
> +             if (likely(info->add_timestamp != RB_ADD_STAMP_FORCE))
> +                     /* This did not interrupt any time update */
> +                     info->delta = info->ts - after;
> +             else
> +                     /* Just use full timestamp for inerrupting event */
> +                     info->delta = info->ts;
> +             barrier();
> +             if (unlikely(info->ts != save_before)) {
> +                     /* SLOW PATH - Interrupted between C and E */
> +
> +                     after = local64_read(&cpu_buffer->write_stamp);
> +                     /* Write stamp must only go forward */
> +                     if (save_before > after) {
> +                             /*
> +                              * We do not care about the result, only that
> +                              * it gets updated atomically.
> +                              */
> +                             (void)local64_cmpxchg(&cpu_buffer->write_stamp, after, save_before);
> +                     }
> +             }
> +     } else {
> +             u64 ts;
> +             /* SLOW PATH - Interrupted between A and C */
> +             after = local64_read(&cpu_buffer->write_stamp);
> +             ts = rb_time_stamp(cpu_buffer->buffer);
> +             barrier();
> + /*E*/               if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) &&
> +                 after < ts) {
> +                     /* Nothing came after this event between C and E */
> +                     info->delta = ts - after;
> +                     (void)local64_cmpxchg(&cpu_buffer->write_stamp, after, info->ts);
> +                     info->ts = ts;
> +             } else {
> +                     /*
> +                      * Interrupted beween C and E:
> +                      * Lost the previous events time stamp. Just set the
> +                      * delta to zero, and this will be the same time as
> +                      * the event this event interrupted. And the events that
> +                      * came after this will still be correct (as they would
> +                      * have built their delta on the previous event.
> +                      */
> +                     info->delta = 0;
> +             }
> +             if (info->add_timestamp == RB_ADD_STAMP_FORCE)
> +                     info->add_timestamp = RB_ADD_STAMP_NORMAL;
> +     }
> +
>       /*
>        * If this is the first commit on the page, then it has the same
>        * timestamp as the page itself.
>        */
> -     if (!tail && !ring_buffer_time_stamp_abs(cpu_buffer->buffer))
> +     if (unlikely(!tail && info->add_timestamp != RB_ADD_STAMP_FORCE && !abs))
>               info->delta = 0;
>  
> -     /* See if we shot pass the end of this buffer page */
> -     if (unlikely(write > BUF_PAGE_SIZE))
> -             return rb_move_tail(cpu_buffer, tail, info);
> -
>       /* We reserved something on the buffer */
>  
>       event = __rb_page_index(tail_page, tail);
> @@ -2944,9 +3058,9 @@ rb_reserve_next_event(struct trace_buffer *buffer,
>       struct ring_buffer_event *event;
>       struct rb_event_info info;
>       int nr_loops = 0;
> -     u64 diff;
>  
>       rb_start_commit(cpu_buffer);
> +     /* The commit page can not change after this */
>  
>  #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
>       /*
> @@ -2965,7 +3079,7 @@ rb_reserve_next_event(struct trace_buffer *buffer,
>  
>       info.length = rb_calculate_event_length(length);
>   again:
> -     info.add_timestamp = 0;
> +     info.add_timestamp = RB_ADD_STAMP_NONE;
>       info.delta = 0;
>  
>       /*
> @@ -2980,22 +3094,6 @@ rb_reserve_next_event(struct trace_buffer *buffer,
>       if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
>               goto out_fail;
>  
> -     info.ts = rb_time_stamp(cpu_buffer->buffer);
> -     diff = info.ts - cpu_buffer->write_stamp;
> -
> -     /* make sure this diff is calculated here */
> -     barrier();
> -
> -     if (ring_buffer_time_stamp_abs(buffer)) {
> -             info.delta = info.ts;
> -             rb_handle_timestamp(cpu_buffer, &info);
> -     } else /* Did the write stamp get updated already? */
> -             if (likely(info.ts >= cpu_buffer->write_stamp)) {
> -             info.delta = diff;
> -             if (unlikely(test_time_stamp(info.delta)))
> -                     rb_handle_timestamp(cpu_buffer, &info);
> -     }
> -
>       event = __rb_reserve_next(cpu_buffer, &info);
>  
>       if (unlikely(PTR_ERR(event) == -EAGAIN)) {
> @@ -3004,11 +3102,8 @@ rb_reserve_next_event(struct trace_buffer *buffer,
>               goto again;
>       }
>  
> -     if (!event)
> -             goto out_fail;
> -
> -     return event;
> -
> +     if (likely(event))
> +             return event;
>   out_fail:
>       rb_end_commit(cpu_buffer);
>       return NULL;
> @@ -3154,11 +3249,6 @@ void ring_buffer_discard_commit(struct trace_buffer *buffer,
>       if (rb_try_to_discard(cpu_buffer, event))
>               goto out;
>  
> -     /*
> -      * The commit is still visible by the reader, so we
> -      * must still update the timestamp.
> -      */
> -     rb_update_write_stamp(cpu_buffer, event);
>   out:
>       rb_end_commit(cpu_buffer);
>  
> @@ -4475,8 +4565,9 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
>       cpu_buffer->read = 0;
>       cpu_buffer->read_bytes = 0;
>  
> -     cpu_buffer->write_stamp = 0;
> -     cpu_buffer->read_stamp = 0;
> +     local64_set(&cpu_buffer->write_stamp, 0);
> +     local64_set(&cpu_buffer->before_stamp, 0);
> +     cpu_buffer->next_write = 0;
>  
>       cpu_buffer->lost_events = 0;
>       cpu_buffer->last_overrun = 0;
> -- 
> 2.26.2
> 
> 


-- 
Masami Hiramatsu <mhira...@kernel.org>
