<snip> > > The rte_distributor_return_pkt function, which is run on worker cores, must wait > for the distributor core to clear the handshake on retptr64 before using those > buffers. While the handshake is set, the distributor core controls the buffers, and any > operations on the worker side might overwrite buffers which have not been read yet. > The same situation appears in the legacy single distributor. The function > rte_distributor_return_pkt_single shouldn't modify bufptr64 until > the handshake on it is cleared by the distributor lcore. > > Fixes: 775003ad2f96 ("distributor: add new burst-capable library") > Cc: david.h...@intel.com > Cc: sta...@dpdk.org > > Signed-off-by: Lukasz Wojciechowski <l.wojciec...@partner.samsung.com> > Acked-by: David Hunt <david.h...@intel.com> Looks good. Reviewed-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
> --- > lib/librte_distributor/rte_distributor.c | 12 ++++++++++++ > lib/librte_distributor/rte_distributor_single.c | 4 ++++ > 2 files changed, 16 insertions(+) > > diff --git a/lib/librte_distributor/rte_distributor.c > b/lib/librte_distributor/rte_distributor.c > index 1c047f065..c6b19a388 100644 > --- a/lib/librte_distributor/rte_distributor.c > +++ b/lib/librte_distributor/rte_distributor.c > @@ -169,6 +169,18 @@ rte_distributor_return_pkt(struct rte_distributor *d, > return -EINVAL; > } > > + /* Spin while handshake bits are set (scheduler clears it). > + * Sync with worker on GET_BUF flag. > + */ > + while (unlikely(__atomic_load_n(&(buf->retptr64[0]), > __ATOMIC_RELAXED) > + & RTE_DISTRIB_GET_BUF)) { > + rte_pause(); > + uint64_t t = rte_rdtsc()+100; > + > + while (rte_rdtsc() < t) > + rte_pause(); > + } > + > /* Sync with distributor to acquire retptrs */ > __atomic_thread_fence(__ATOMIC_ACQUIRE); > for (i = 0; i < RTE_DIST_BURST_SIZE; i++) diff --git > a/lib/librte_distributor/rte_distributor_single.c > b/lib/librte_distributor/rte_distributor_single.c > index abaf7730c..f4725b1d0 100644 > --- a/lib/librte_distributor/rte_distributor_single.c > +++ b/lib/librte_distributor/rte_distributor_single.c > @@ -74,6 +74,10 @@ rte_distributor_return_pkt_single(struct > rte_distributor_single *d, > union rte_distributor_buffer_single *buf = &d->bufs[worker_id]; > uint64_t req = (((int64_t)(uintptr_t)oldpkt) << > RTE_DISTRIB_FLAG_BITS) > | RTE_DISTRIB_RETURN_BUF; > + while (unlikely(__atomic_load_n(&buf->bufptr64, > __ATOMIC_RELAXED) > + & RTE_DISTRIB_FLAGS_MASK)) > + rte_pause(); > + > /* Sync with distributor on RETURN_BUF flag. */ > __atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE); > return 0; > -- > 2.17.1