<snip>

> 
> Statistics of handled packets are cleared and read on main lcore, while they
> are increased in workers handlers on different lcores.
> 
> Without synchronization occasionally showed invalid values.
What exactly do you mean by invalid values? Can you elaborate?

> This patch uses atomic acquire/release mechanisms to synchronize.
> 
> Fixes: c3eabff124e6 ("distributor: add unit tests")
> Cc: bruce.richard...@intel.com
> Cc: sta...@dpdk.org
> 
> Signed-off-by: Lukasz Wojciechowski <l.wojciec...@partner.samsung.com>
> ---
>  app/test/test_distributor.c | 39 ++++++++++++++++++++++++-------------
>  1 file changed, 26 insertions(+), 13 deletions(-)
> 
> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c index
> 35b25463a..0e49e3714 100644
> --- a/app/test/test_distributor.c
> +++ b/app/test/test_distributor.c
> @@ -43,7 +43,8 @@ total_packet_count(void)  {
>       unsigned i, count = 0;
>       for (i = 0; i < worker_idx; i++)
> -             count += worker_stats[i].handled_packets;
> +             count +=
> __atomic_load_n(&worker_stats[i].handled_packets,
> +                             __ATOMIC_ACQUIRE);
>       return count;
>  }
> 
> @@ -52,6 +53,7 @@ static inline void
>  clear_packet_count(void)
>  {
>       memset(&worker_stats, 0, sizeof(worker_stats));
> +     rte_atomic_thread_fence(__ATOMIC_RELEASE);
>  }
> 
>  /* this is the basic worker function for sanity test @@ -72,13 +74,13 @@
> handle_work(void *arg)
>       num = rte_distributor_get_pkt(db, id, buf, buf, num);
>       while (!quit) {
>               __atomic_fetch_add(&worker_stats[id].handled_packets,
> num,
> -                             __ATOMIC_RELAXED);
> +                             __ATOMIC_ACQ_REL);
>               count += num;
>               num = rte_distributor_get_pkt(db, id,
>                               buf, buf, num);
>       }
>       __atomic_fetch_add(&worker_stats[id].handled_packets, num,
> -                     __ATOMIC_RELAXED);
> +                     __ATOMIC_ACQ_REL);
>       count += num;
>       rte_distributor_return_pkt(db, id, buf, num);
>       return 0;
> @@ -134,7 +136,8 @@ sanity_test(struct worker_params *wp, struct
> rte_mempool *p)
> 
>       for (i = 0; i < rte_lcore_count() - 1; i++)
>               printf("Worker %u handled %u packets\n", i,
> -                             worker_stats[i].handled_packets);
> +                     __atomic_load_n(&worker_stats[i].handled_packets,
> +                                     __ATOMIC_ACQUIRE));
>       printf("Sanity test with all zero hashes done.\n");
> 
>       /* pick two flows and check they go correctly */ @@ -159,7 +162,9
> @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
> 
>               for (i = 0; i < rte_lcore_count() - 1; i++)
>                       printf("Worker %u handled %u packets\n", i,
> -                                     worker_stats[i].handled_packets);
> +                             __atomic_load_n(
> +                                     &worker_stats[i].handled_packets,
> +                                     __ATOMIC_ACQUIRE));
>               printf("Sanity test with two hash values done\n");
>       }
> 
> @@ -185,7 +190,8 @@ sanity_test(struct worker_params *wp, struct
> rte_mempool *p)
> 
>       for (i = 0; i < rte_lcore_count() - 1; i++)
>               printf("Worker %u handled %u packets\n", i,
> -                             worker_stats[i].handled_packets);
> +                     __atomic_load_n(&worker_stats[i].handled_packets,
> +                                     __ATOMIC_ACQUIRE));
>       printf("Sanity test with non-zero hashes done\n");
> 
>       rte_mempool_put_bulk(p, (void *)bufs, BURST); @@ -280,15
> +286,17 @@ handle_work_with_free_mbufs(void *arg)
>               buf[i] = NULL;
>       num = rte_distributor_get_pkt(d, id, buf, buf, num);
>       while (!quit) {
> -             worker_stats[id].handled_packets += num;
>               count += num;
> +             __atomic_fetch_add(&worker_stats[id].handled_packets,
> num,
> +                             __ATOMIC_ACQ_REL);
>               for (i = 0; i < num; i++)
>                       rte_pktmbuf_free(buf[i]);
>               num = rte_distributor_get_pkt(d,
>                               id, buf, buf, num);
>       }
> -     worker_stats[id].handled_packets += num;
>       count += num;
> +     __atomic_fetch_add(&worker_stats[id].handled_packets, num,
> +                     __ATOMIC_ACQ_REL);
>       rte_distributor_return_pkt(d, id, buf, num);
>       return 0;
>  }
> @@ -363,8 +371,9 @@ handle_work_for_shutdown_test(void *arg)
>       /* wait for quit single globally, or for worker zero, wait
>        * for zero_quit */
>       while (!quit && !(id == zero_id && zero_quit)) {
> -             worker_stats[id].handled_packets += num;
>               count += num;
> +             __atomic_fetch_add(&worker_stats[id].handled_packets,
> num,
> +                             __ATOMIC_ACQ_REL);
>               for (i = 0; i < num; i++)
>                       rte_pktmbuf_free(buf[i]);
>               num = rte_distributor_get_pkt(d,
> @@ -379,10 +388,11 @@ handle_work_for_shutdown_test(void *arg)
> 
>               total += num;
>       }
> -     worker_stats[id].handled_packets += num;
>       count += num;
>       returned = rte_distributor_return_pkt(d, id, buf, num);
> 
> +     __atomic_fetch_add(&worker_stats[id].handled_packets, num,
> +                     __ATOMIC_ACQ_REL);
>       if (id == zero_id) {
>               /* for worker zero, allow it to restart to pick up last packet
>                * when all workers are shutting down.
> @@ -394,10 +404,11 @@ handle_work_for_shutdown_test(void *arg)
>                               id, buf, buf, num);
> 
>               while (!quit) {
> -                     worker_stats[id].handled_packets += num;
>                       count += num;
>                       rte_pktmbuf_free(pkt);
>                       num = rte_distributor_get_pkt(d, id, buf, buf, num);
> +
>       __atomic_fetch_add(&worker_stats[id].handled_packets,
> +                                     num, __ATOMIC_ACQ_REL);
>               }
>               returned = rte_distributor_return_pkt(d,
>                               id, buf, num);
> @@ -461,7 +472,8 @@ sanity_test_with_worker_shutdown(struct
> worker_params *wp,
> 
>       for (i = 0; i < rte_lcore_count() - 1; i++)
>               printf("Worker %u handled %u packets\n", i,
> -                             worker_stats[i].handled_packets);
> +                     __atomic_load_n(&worker_stats[i].handled_packets,
> +                                     __ATOMIC_ACQUIRE));
> 
>       if (total_packet_count() != BURST * 2) {
>               printf("Line %d: Error, not all packets flushed. "
> @@ -514,7 +526,8 @@ test_flush_with_worker_shutdown(struct
> worker_params *wp,
>       zero_quit = 0;
>       for (i = 0; i < rte_lcore_count() - 1; i++)
>               printf("Worker %u handled %u packets\n", i,
> -                             worker_stats[i].handled_packets);
> +                     __atomic_load_n(&worker_stats[i].handled_packets,
> +                                     __ATOMIC_ACQUIRE));
> 
>       if (total_packet_count() != BURST) {
>               printf("Line %d: Error, not all packets flushed. "
> --
> 2.17.1

Reply via email to