The pdump callback can race with other CPUs in the datapath.
Handle this by using a use count whose least significant bit marks
the callback as in use, in a manner similar to seqcount and the bpf code.

Signed-off-by: Stephen Hemminger <step...@networkplumber.org>
---
 lib/pdump/rte_pdump.c | 48 +++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 46 insertions(+), 2 deletions(-)

diff --git a/lib/pdump/rte_pdump.c b/lib/pdump/rte_pdump.c
index ba75b828f2..bfd63fa8c2 100644
--- a/lib/pdump/rte_pdump.c
+++ b/lib/pdump/rte_pdump.c
@@ -12,6 +12,7 @@
 #include <rte_memzone.h>
 #include <rte_errno.h>
 #include <rte_string_fns.h>
+#include <rte_pause.h>
 #include <rte_pcapng.h>
 
 #include "rte_pdump.h"
@@ -62,6 +63,7 @@ static struct pdump_rxtx_cbs {
        const struct rte_bpf *filter;
        enum pdump_version ver;
        uint32_t snaplen;
+       RTE_ATOMIC(uint32_t) use_count;
 } rx_cbs[RTE_MAX_ETHPORTS][RTE_MAX_QUEUES_PER_PORT],
 tx_cbs[RTE_MAX_ETHPORTS][RTE_MAX_QUEUES_PER_PORT];
 
@@ -78,6 +80,36 @@ static struct {
        const struct rte_memzone *mz;
 } *pdump_stats;
 
+/*
+ * Wait until the data-path callback using cbs is not executing.
+ *
+ * An odd use_count means pdump_cb_hold() has been called without the
+ * matching pdump_cb_release() yet; spin until the count is seen to be even.
+ */
+static void
+pdump_cb_wait(struct pdump_rxtx_cbs *cbs)
+{
+       /* make sure the data loads happen before the use count load */
+       rte_atomic_thread_fence(rte_memory_order_acquire);
+
+       /*
+        * Wait until use_count is even (not in use).
+        * The load is an acquire so that it pairs with the release store in
+        * pdump_cb_release(): everything the callback did with cbs is then
+        * ordered before whatever the caller does after this returns.
+        */
+       RTE_WAIT_UNTIL_MASKED(&cbs->use_count, 1, ==, 0, rte_memory_order_acquire);
+}
+
+/*
+ * Mark cbs as in use by the calling data-path thread.
+ *
+ * Increments use_count; with the count reset to 0 at registration and every
+ * hold paired with a pdump_cb_release(), the count is odd while the callback
+ * runs, which is what pdump_cb_wait() spins on.
+ */
+static __rte_always_inline void
+pdump_cb_hold(struct pdump_rxtx_cbs *cbs)
+{
+       /*
+        * Explicit relaxed load instead of an implicit read of the atomic.
+        * NOTE(review): assumes only one thread at a time runs the callback
+        * for a given queue, so this load/store pair has no competing writer
+        * - confirm.
+        */
+       uint32_t count = rte_atomic_load_explicit(&cbs->use_count, rte_memory_order_relaxed) + 1;
+
+       rte_atomic_store_explicit(&cbs->use_count, count, rte_memory_order_relaxed);
+
+       /*
+        * Prevent stores after this from happening before the use_count update.
+        * NOTE(review): a release fence orders the store above only against
+        * later stores, not later loads; confirm that is sufficient for the
+        * reads of cbs performed by the callback.
+        */
+       rte_atomic_thread_fence(rte_memory_order_release);
+}
+
+/*
+ * Mark cbs as no longer in use by the calling data-path thread.
+ *
+ * Increments use_count a second time, so a hold/release pair leaves the
+ * count even again and pdump_cb_wait() can stop spinning.
+ */
+static __rte_always_inline void
+pdump_cb_release(struct pdump_rxtx_cbs *cbs)
+{
+       /* Explicit relaxed load instead of an implicit read of the atomic. */
+       uint32_t count = rte_atomic_load_explicit(&cbs->use_count, rte_memory_order_relaxed) + 1;
+
+       /*
+        * Release store: the callback's accesses to cbs are ordered before
+        * the new count becomes visible to a thread in pdump_cb_wait().
+        */
+       rte_atomic_store_explicit(&cbs->use_count, count, rte_memory_order_release);
+}
+
 /* Create a clone of mbuf to be placed into ring. */
 static void
 pdump_copy(uint16_t port_id, uint16_t queue,
@@ -146,11 +178,14 @@ pdump_rx(uint16_t port, uint16_t queue,
        struct rte_mbuf **pkts, uint16_t nb_pkts,
        uint16_t max_pkts __rte_unused, void *user_params)
 {
-       const struct pdump_rxtx_cbs *cbs = user_params;
+       struct pdump_rxtx_cbs *cbs = user_params;
        struct rte_pdump_stats *stats = &pdump_stats->rx[port][queue];
 
+       pdump_cb_hold(cbs);
        pdump_copy(port, queue, RTE_PCAPNG_DIRECTION_IN,
                   pkts, nb_pkts, cbs, stats);
+       pdump_cb_release(cbs);
+
        return nb_pkts;
 }
 
@@ -158,14 +193,18 @@ static uint16_t
 pdump_tx(uint16_t port, uint16_t queue,
                struct rte_mbuf **pkts, uint16_t nb_pkts, void *user_params)
 {
-       const struct pdump_rxtx_cbs *cbs = user_params;
+       struct pdump_rxtx_cbs *cbs = user_params;
        struct rte_pdump_stats *stats = &pdump_stats->tx[port][queue];
 
+       pdump_cb_hold(cbs);
        pdump_copy(port, queue, RTE_PCAPNG_DIRECTION_OUT,
                   pkts, nb_pkts, cbs, stats);
+       pdump_cb_release(cbs);
+
        return nb_pkts;
 }
 
+
 static int
 pdump_register_rx_callbacks(enum pdump_version ver,
                            uint16_t end_q, uint16_t port, uint16_t queue,
@@ -186,6 +225,7 @@ pdump_register_rx_callbacks(enum pdump_version ver,
                                        port, qid);
                                return -EEXIST;
                        }
+                       cbs->use_count = 0;
                        cbs->ver = ver;
                        cbs->ring = ring;
                        cbs->mp = mp;
@@ -218,6 +258,7 @@ pdump_register_rx_callbacks(enum pdump_version ver,
                                        -ret);
                                return ret;
                        }
+                       pdump_cb_wait(cbs);
                        cbs->cb = NULL;
                }
        }
@@ -246,6 +287,7 @@ pdump_register_tx_callbacks(enum pdump_version ver,
                                        port, qid);
                                return -EEXIST;
                        }
+                       cbs->use_count = 0;
                        cbs->ver = ver;
                        cbs->ring = ring;
                        cbs->mp = mp;
@@ -277,6 +319,8 @@ pdump_register_tx_callbacks(enum pdump_version ver,
                                        -ret);
                                return ret;
                        }
+
+                       pdump_cb_wait(cbs);
                        cbs->cb = NULL;
                }
        }
-- 
2.47.2

Reply via email to