The pdump callback can race with other CPUs in the datapath. Handle this by using a per-callback use count whose least significant bit (LSB) indicates that the callback is in use (odd) or idle (even), in a manner similar to seqcount and the BPF code.
Signed-off-by: Stephen Hemminger <step...@networkplumber.org> --- lib/pdump/rte_pdump.c | 48 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 46 insertions(+), 2 deletions(-) diff --git a/lib/pdump/rte_pdump.c b/lib/pdump/rte_pdump.c index ba75b828f2..bfd63fa8c2 100644 --- a/lib/pdump/rte_pdump.c +++ b/lib/pdump/rte_pdump.c @@ -12,6 +12,7 @@ #include <rte_memzone.h> #include <rte_errno.h> #include <rte_string_fns.h> +#include <rte_pause.h> #include <rte_pcapng.h> #include "rte_pdump.h" @@ -62,6 +63,7 @@ static struct pdump_rxtx_cbs { const struct rte_bpf *filter; enum pdump_version ver; uint32_t snaplen; + RTE_ATOMIC(uint32_t) use_count; } rx_cbs[RTE_MAX_ETHPORTS][RTE_MAX_QUEUES_PER_PORT], tx_cbs[RTE_MAX_ETHPORTS][RTE_MAX_QUEUES_PER_PORT]; @@ -78,6 +80,36 @@ static struct { const struct rte_memzone *mz; } *pdump_stats; +static void +pdump_cb_wait(struct pdump_rxtx_cbs *cbs) +{ + /* make sure the data loads happens before the use count load */ + rte_atomic_thread_fence(rte_memory_order_acquire); + + /* wait until use_count is even (not in use) */ + RTE_WAIT_UNTIL_MASKED(&cbs->use_count, 1, ==, 0, rte_memory_order_relaxed); +} + +static __rte_always_inline void +pdump_cb_hold(struct pdump_rxtx_cbs *cbs) +{ + uint32_t count = cbs->use_count + 1; + + rte_atomic_store_explicit(&cbs->use_count, count, rte_memory_order_relaxed); + + /* prevent stores after this from happening before the use_count update */ + rte_atomic_thread_fence(rte_memory_order_release); +} + +static __rte_always_inline void +pdump_cb_release(struct pdump_rxtx_cbs *cbs) +{ + uint32_t count = cbs->use_count + 1; + + /* Synchronizes-with the load acquire in pdump_cb_wait */ + rte_atomic_store_explicit(&cbs->use_count, count, rte_memory_order_release); +} + /* Create a clone of mbuf to be placed into ring. 
*/ static void pdump_copy(uint16_t port_id, uint16_t queue, @@ -146,11 +178,14 @@ pdump_rx(uint16_t port, uint16_t queue, struct rte_mbuf **pkts, uint16_t nb_pkts, uint16_t max_pkts __rte_unused, void *user_params) { - const struct pdump_rxtx_cbs *cbs = user_params; + struct pdump_rxtx_cbs *cbs = user_params; struct rte_pdump_stats *stats = &pdump_stats->rx[port][queue]; + pdump_cb_hold(cbs); pdump_copy(port, queue, RTE_PCAPNG_DIRECTION_IN, pkts, nb_pkts, cbs, stats); + pdump_cb_release(cbs); + return nb_pkts; } @@ -158,14 +193,18 @@ static uint16_t pdump_tx(uint16_t port, uint16_t queue, struct rte_mbuf **pkts, uint16_t nb_pkts, void *user_params) { - const struct pdump_rxtx_cbs *cbs = user_params; + struct pdump_rxtx_cbs *cbs = user_params; struct rte_pdump_stats *stats = &pdump_stats->tx[port][queue]; + pdump_cb_hold(cbs); pdump_copy(port, queue, RTE_PCAPNG_DIRECTION_OUT, pkts, nb_pkts, cbs, stats); + pdump_cb_release(cbs); + return nb_pkts; } + static int pdump_register_rx_callbacks(enum pdump_version ver, uint16_t end_q, uint16_t port, uint16_t queue, @@ -186,6 +225,7 @@ pdump_register_rx_callbacks(enum pdump_version ver, port, qid); return -EEXIST; } + cbs->use_count = 0; cbs->ver = ver; cbs->ring = ring; cbs->mp = mp; @@ -218,6 +258,7 @@ pdump_register_rx_callbacks(enum pdump_version ver, -ret); return ret; } + pdump_cb_wait(cbs); cbs->cb = NULL; } } @@ -246,6 +287,7 @@ pdump_register_tx_callbacks(enum pdump_version ver, port, qid); return -EEXIST; } + cbs->use_count = 0; cbs->ver = ver; cbs->ring = ring; cbs->mp = mp; @@ -277,6 +319,8 @@ pdump_register_tx_callbacks(enum pdump_version ver, -ret); return ret; } + + pdump_cb_wait(cbs); cbs->cb = NULL; } } -- 2.47.2