Use release ordering when updating the processed packets counter on
worker lcores, so that a worker lcore's data processing is complete
before its updated processed packets number becomes visible.
Fixes: 314bcf58ca8f ("app/eventdev: add pipeline queue worker functions")
Cc: [email protected]
Cc: [email protected]
Signed-off-by: Phil Yang <[email protected]>
Signed-off-by: Feifei Wang <[email protected]>
Reviewed-by: Honnappa Nagarahalli <[email protected]>
Reviewed-by: Ruifeng Wang <[email protected]>
---
app/test-eventdev/test_pipeline_queue.c | 64 +++++++++++++++++++++----
1 file changed, 56 insertions(+), 8 deletions(-)
diff --git a/app/test-eventdev/test_pipeline_queue.c
b/app/test-eventdev/test_pipeline_queue.c
index 7bebac34f..8e499d8d5 100644
--- a/app/test-eventdev/test_pipeline_queue.c
+++ b/app/test-eventdev/test_pipeline_queue.c
@@ -30,7 +30,13 @@ pipeline_queue_worker_single_stage_tx(void *arg)
if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
pipeline_event_tx(dev, port, &ev);
- w->processed_pkts++;
+
+ /* release barrier here ensures stored operation
+ * of the event completes before the number of
+ * processed pkts is visible to the main core
+ */
+ __atomic_fetch_add(&(w->processed_pkts), 1,
+ __ATOMIC_RELEASE);
} else {
ev.queue_id++;
pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
@@ -59,7 +65,13 @@ pipeline_queue_worker_single_stage_fwd(void *arg)
rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0);
pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
pipeline_event_enqueue(dev, port, &ev);
- w->processed_pkts++;
+
+ /* release barrier here ensures stored operation
+ * of the event completes before the number of
+ * processed pkts is visible to the main core
+ */
+ __atomic_fetch_add(&(w->processed_pkts), 1,
+ __ATOMIC_RELEASE);
}
return 0;
@@ -84,7 +96,13 @@ pipeline_queue_worker_single_stage_burst_tx(void *arg)
if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
pipeline_event_tx(dev, port, &ev[i]);
ev[i].op = RTE_EVENT_OP_RELEASE;
- w->processed_pkts++;
+
+ /* release barrier here ensures stored operation
+ * of the event completes before the number of
+ * processed pkts is visible to the main core
+ */
+ __atomic_fetch_add(&(w->processed_pkts), 1,
+ __ATOMIC_RELEASE);
} else {
ev[i].queue_id++;
pipeline_fwd_event(&ev[i],
@@ -121,7 +139,13 @@ pipeline_queue_worker_single_stage_burst_fwd(void *arg)
}
pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
- w->processed_pkts += nb_rx;
+
+ /* release barrier here ensures stored operation
+ * of the event completes before the number of
+ * processed pkts is visible to the main core
+ */
+ __atomic_fetch_add(&(w->processed_pkts), nb_rx,
+ __ATOMIC_RELEASE);
}
return 0;
@@ -146,7 +170,13 @@ pipeline_queue_worker_multi_stage_tx(void *arg)
if (ev.queue_id == tx_queue[ev.mbuf->port]) {
pipeline_event_tx(dev, port, &ev);
- w->processed_pkts++;
+
+ /* release barrier here ensures stored operation
+ * of the event completes before the number of
+ * processed pkts is visible to the main core
+ */
+ __atomic_fetch_add(&(w->processed_pkts), 1,
+ __ATOMIC_RELEASE);
continue;
}
@@ -180,7 +210,13 @@ pipeline_queue_worker_multi_stage_fwd(void *arg)
ev.queue_id = tx_queue[ev.mbuf->port];
rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0);
pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
- w->processed_pkts++;
+
+ /* release barrier here ensures stored operation
+ * of the event completes before the number of
+ * processed pkts is visible to the main core
+ */
+ __atomic_fetch_add(&(w->processed_pkts), 1,
+ __ATOMIC_RELEASE);
} else {
ev.queue_id++;
pipeline_fwd_event(&ev, sched_type_list[cq_id]);
@@ -214,7 +250,13 @@ pipeline_queue_worker_multi_stage_burst_tx(void *arg)
if (ev[i].queue_id == tx_queue[ev[i].mbuf->port]) {
pipeline_event_tx(dev, port, &ev[i]);
ev[i].op = RTE_EVENT_OP_RELEASE;
- w->processed_pkts++;
+
+ /* release barrier here ensures stored operation
+ * of the event completes before the number of
+ * processed pkts is visible to the main core
+ */
+ __atomic_fetch_add(&(w->processed_pkts), 1,
+ __ATOMIC_RELEASE);
continue;
}
@@ -254,7 +296,13 @@ pipeline_queue_worker_multi_stage_burst_fwd(void *arg)
rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
pipeline_fwd_event(&ev[i],
RTE_SCHED_TYPE_ATOMIC);
- w->processed_pkts++;
+
+ /* release barrier here ensures stored operation
+ * of the event completes before the number of
+ * processed pkts is visible to the main core
+ */
+ __atomic_fetch_add(&(w->processed_pkts), 1,
+ __ATOMIC_RELEASE);
} else {
ev[i].queue_id++;
pipeline_fwd_event(&ev[i],
--
2.17.1