The receive callback was not safe with multiple queues.
If one receive queue callback decides to take a sample, it
needs to add that sample and atomically update the previous
TSC sample value. Add a new lock for that.

Also, add code to handle TSC wraparound in comparisons.
Perhaps this should move to rte_cycles.h?

Signed-off-by: Stephen Hemminger <step...@networkplumber.org>
Fixes: 5cd3cac9ed22 ("latency: added new library for latency stats")
Cc: sta...@dpdk.org
---
 lib/latencystats/rte_latencystats.c | 47 ++++++++++++++++++-----------
 1 file changed, 29 insertions(+), 18 deletions(-)

diff --git a/lib/latencystats/rte_latencystats.c 
b/lib/latencystats/rte_latencystats.c
index 6873a44a92..181c53dd0e 100644
--- a/lib/latencystats/rte_latencystats.c
+++ b/lib/latencystats/rte_latencystats.c
@@ -45,10 +45,19 @@ timestamp_dynfield(struct rte_mbuf *mbuf)
                        timestamp_dynfield_offset, rte_mbuf_timestamp_t *);
 }
 
+/* Compare two 64-bit timer counters but deal with wraparound correctly. */
+static inline bool tsc_after(uint64_t t0, uint64_t t1)
+{
+       return (int64_t)(t1 - t0) < 0;
+}
+
+#define tsc_before(a, b) tsc_after(b, a)
+
 static const char *MZ_RTE_LATENCY_STATS = "rte_latencystats";
 static int latency_stats_index;
+
+static rte_spinlock_t sample_lock = RTE_SPINLOCK_INITIALIZER;
 static uint64_t samp_intvl;
-static uint64_t timer_tsc;
 static uint64_t prev_tsc;
 
 #define LATENCY_AVG_SCALE     4
@@ -147,25 +156,27 @@ add_time_stamps(uint16_t pid __rte_unused,
                void *user_cb __rte_unused)
 {
        unsigned int i;
-       uint64_t diff_tsc, now;
+       uint64_t now = rte_rdtsc();
 
-       /*
-        * For every sample interval,
-        * time stamp is marked on one received packet.
-        */
-       now = rte_rdtsc();
-       for (i = 0; i < nb_pkts; i++) {
-               diff_tsc = now - prev_tsc;
-               timer_tsc += diff_tsc;
-
-               if ((pkts[i]->ol_flags & timestamp_dynflag) == 0
-                               && (timer_tsc >= samp_intvl)) {
-                       *timestamp_dynfield(pkts[i]) = now;
-                       pkts[i]->ol_flags |= timestamp_dynflag;
-                       timer_tsc = 0;
+       /* Check without locking */
+       if (likely(tsc_before(now, prev_tsc + samp_intvl)))
+               return nb_pkts;
+
+       /* Try and get sample, skip if sample is being done by other core. */
+       if (likely(rte_spinlock_trylock(&sample_lock))) {
+               for (i = 0; i < nb_pkts; i++) {
+                       struct rte_mbuf *m = pkts[i];
+
+                       /* skip if already timestamped */
+                       if (unlikely(m->ol_flags & timestamp_dynflag))
+                               continue;
+
+                       m->ol_flags |= timestamp_dynflag;
+                       *timestamp_dynfield(m) = now;
+                       prev_tsc = now;
+                       break;
                }
-               prev_tsc = now;
-               now = rte_rdtsc();
+               rte_spinlock_unlock(&sample_lock);
        }
 
        return nb_pkts;
-- 
2.47.2

Reply via email to