By design, the code assumes that the global counter rings are large enough to hold all the port counters, so enqueuing to these global rings should always succeed.
Add assertions to help for debugging this assumption. In addition, change mlx5_hws_cnt_pool_put() function to return void due to those assumptions. Signed-off-by: Michael Baum <michae...@nvidia.com> Acked-by: Matan Azrad <ma...@nvidia.com> Acked-by: Xiaoyu Min <jack...@nvidia.com> --- drivers/net/mlx5/mlx5_flow_hw.c | 2 +- drivers/net/mlx5/mlx5_hws_cnt.c | 25 ++++---- drivers/net/mlx5/mlx5_hws_cnt.h | 106 +++++++++++++++++--------------- 3 files changed, 72 insertions(+), 61 deletions(-) diff --git a/drivers/net/mlx5/mlx5_flow_hw.c b/drivers/net/mlx5/mlx5_flow_hw.c index 2d275ad111..54a0afe45f 100644 --- a/drivers/net/mlx5/mlx5_flow_hw.c +++ b/drivers/net/mlx5/mlx5_flow_hw.c @@ -7874,7 +7874,7 @@ flow_hw_action_handle_destroy(struct rte_eth_dev *dev, uint32_t queue, * time to update the AGE. */ mlx5_hws_age_nb_cnt_decrease(priv, age_idx); - ret = mlx5_hws_cnt_shared_put(priv->hws_cpool, &act_idx); + mlx5_hws_cnt_shared_put(priv->hws_cpool, &act_idx); break; case MLX5_INDIRECT_ACTION_TYPE_CT: ret = flow_hw_conntrack_destroy(dev, act_idx, error); diff --git a/drivers/net/mlx5/mlx5_hws_cnt.c b/drivers/net/mlx5/mlx5_hws_cnt.c index b8ce69af57..24c01eace0 100644 --- a/drivers/net/mlx5/mlx5_hws_cnt.c +++ b/drivers/net/mlx5/mlx5_hws_cnt.c @@ -58,13 +58,14 @@ __hws_cnt_id_load(struct mlx5_hws_cnt_pool *cpool) static void __mlx5_hws_cnt_svc(struct mlx5_dev_ctx_shared *sh, - struct mlx5_hws_cnt_pool *cpool) + struct mlx5_hws_cnt_pool *cpool) { struct rte_ring *reset_list = cpool->wait_reset_list; struct rte_ring *reuse_list = cpool->reuse_list; uint32_t reset_cnt_num; struct rte_ring_zc_data zcdr = {0}; struct rte_ring_zc_data zcdu = {0}; + uint32_t ret __rte_unused; reset_cnt_num = rte_ring_count(reset_list); do { @@ -72,17 +73,19 @@ __mlx5_hws_cnt_svc(struct mlx5_dev_ctx_shared *sh, mlx5_aso_cnt_query(sh, cpool); zcdr.n1 = 0; zcdu.n1 = 0; - rte_ring_enqueue_zc_burst_elem_start(reuse_list, - sizeof(cnt_id_t), reset_cnt_num, &zcdu, - NULL); - 
rte_ring_dequeue_zc_burst_elem_start(reset_list, - sizeof(cnt_id_t), reset_cnt_num, &zcdr, - NULL); + ret = rte_ring_enqueue_zc_burst_elem_start(reuse_list, + sizeof(cnt_id_t), + reset_cnt_num, &zcdu, + NULL); + MLX5_ASSERT(ret == reset_cnt_num); + ret = rte_ring_dequeue_zc_burst_elem_start(reset_list, + sizeof(cnt_id_t), + reset_cnt_num, &zcdr, + NULL); + MLX5_ASSERT(ret == reset_cnt_num); __hws_cnt_r2rcpy(&zcdu, &zcdr, reset_cnt_num); - rte_ring_dequeue_zc_elem_finish(reset_list, - reset_cnt_num); - rte_ring_enqueue_zc_elem_finish(reuse_list, - reset_cnt_num); + rte_ring_dequeue_zc_elem_finish(reset_list, reset_cnt_num); + rte_ring_enqueue_zc_elem_finish(reuse_list, reset_cnt_num); reset_cnt_num = rte_ring_count(reset_list); } while (reset_cnt_num > 0); } diff --git a/drivers/net/mlx5/mlx5_hws_cnt.h b/drivers/net/mlx5/mlx5_hws_cnt.h index 338ee4d688..030dcead86 100644 --- a/drivers/net/mlx5/mlx5_hws_cnt.h +++ b/drivers/net/mlx5/mlx5_hws_cnt.h @@ -116,7 +116,7 @@ enum { HWS_AGE_CANDIDATE_INSIDE_RING, /* * AGE assigned to flows but it still in ring. It was aged-out but the - * timeout was changed, so it in ring but stiil candidate. + * timeout was changed, so it in ring but still candidate. */ HWS_AGE_AGED_OUT_REPORTED, /* @@ -182,7 +182,7 @@ mlx5_hws_cnt_id_valid(cnt_id_t cnt_id) * * @param cpool * The pointer to counter pool - * @param index + * @param iidx * The internal counter index. * * @return @@ -231,32 +231,32 @@ __hws_cnt_query_raw(struct mlx5_hws_cnt_pool *cpool, cnt_id_t cnt_id, } /** - * Copy elems from one zero-copy ring to zero-copy ring in place. + * Copy elements from one zero-copy ring to zero-copy ring in place. * * The input is a rte ring zero-copy data struct, which has two pointer. * in case of the wrapper happened, the ptr2 will be meaningful. 
* - * So this rountin needs to consider the situation that the address given by + * So this routine needs to consider the situation that the address given by * source and destination could be both wrapped. * First, calculate the first number of element needs to be copied until wrapped * address, which could be in source or destination. * Second, copy left number of element until second wrapped address. If in first * step the wrapped address is source, then this time it must be in destination. - * and vice-vers. - * Third, copy all left numbe of element. + * and vice-versa. + * Third, copy all left number of element. * * In worst case, we need copy three pieces of continuous memory. * * @param zcdd - * A pointer to zero-copy data of dest ring. + * A pointer to zero-copy data of destination ring. * @param zcds * A pointer to zero-copy data of source ring. * @param n - * Number of elems to copy. + * Number of elements to copy. */ static __rte_always_inline void __hws_cnt_r2rcpy(struct rte_ring_zc_data *zcdd, struct rte_ring_zc_data *zcds, - unsigned int n) + unsigned int n) { unsigned int n1, n2, n3; void *s1, *s2, *s3; @@ -291,22 +291,23 @@ static __rte_always_inline int mlx5_hws_cnt_pool_cache_flush(struct mlx5_hws_cnt_pool *cpool, uint32_t queue_id) { - unsigned int ret; + unsigned int ret __rte_unused; struct rte_ring_zc_data zcdr = {0}; struct rte_ring_zc_data zcdc = {0}; struct rte_ring *reset_list = NULL; struct rte_ring *qcache = cpool->cache->qcache[queue_id]; + uint32_t ring_size = rte_ring_count(qcache); - ret = rte_ring_dequeue_zc_burst_elem_start(qcache, - sizeof(cnt_id_t), rte_ring_count(qcache), &zcdc, - NULL); - MLX5_ASSERT(ret); + ret = rte_ring_dequeue_zc_burst_elem_start(qcache, sizeof(cnt_id_t), + ring_size, &zcdc, NULL); + MLX5_ASSERT(ret == ring_size); reset_list = cpool->wait_reset_list; - rte_ring_enqueue_zc_burst_elem_start(reset_list, - sizeof(cnt_id_t), ret, &zcdr, NULL); - __hws_cnt_r2rcpy(&zcdr, &zcdc, ret); - 
rte_ring_enqueue_zc_elem_finish(reset_list, ret); - rte_ring_dequeue_zc_elem_finish(qcache, ret); + ret = rte_ring_enqueue_zc_burst_elem_start(reset_list, sizeof(cnt_id_t), + ring_size, &zcdr, NULL); + MLX5_ASSERT(ret == ring_size); + __hws_cnt_r2rcpy(&zcdr, &zcdc, ring_size); + rte_ring_enqueue_zc_elem_finish(reset_list, ring_size); + rte_ring_dequeue_zc_elem_finish(qcache, ring_size); return 0; } @@ -323,7 +324,7 @@ mlx5_hws_cnt_pool_cache_fetch(struct mlx5_hws_cnt_pool *cpool, struct rte_ring_zc_data zcdu = {0}; struct rte_ring_zc_data zcds = {0}; struct mlx5_hws_cnt_pool_caches *cache = cpool->cache; - unsigned int ret; + unsigned int ret, actual_fetch_size __rte_unused; reuse_list = cpool->reuse_list; ret = rte_ring_dequeue_zc_burst_elem_start(reuse_list, @@ -334,7 +335,9 @@ mlx5_hws_cnt_pool_cache_fetch(struct mlx5_hws_cnt_pool *cpool, rte_ring_dequeue_zc_elem_finish(reuse_list, 0); free_list = cpool->free_list; ret = rte_ring_dequeue_zc_burst_elem_start(free_list, - sizeof(cnt_id_t), cache->fetch_sz, &zcdf, NULL); + sizeof(cnt_id_t), + cache->fetch_sz, + &zcdf, NULL); zcds = zcdf; list = free_list; if (unlikely(ret == 0)) { /* no free counter. */ @@ -344,8 +347,10 @@ mlx5_hws_cnt_pool_cache_fetch(struct mlx5_hws_cnt_pool *cpool, return -ENOENT; } } - rte_ring_enqueue_zc_burst_elem_start(qcache, sizeof(cnt_id_t), - ret, &zcdc, NULL); + actual_fetch_size = ret; + ret = rte_ring_enqueue_zc_burst_elem_start(qcache, sizeof(cnt_id_t), + ret, &zcdc, NULL); + MLX5_ASSERT(ret == actual_fetch_size); __hws_cnt_r2rcpy(&zcdc, &zcds, ret); rte_ring_dequeue_zc_elem_finish(list, ret); rte_ring_enqueue_zc_elem_finish(qcache, ret); @@ -378,15 +383,14 @@ __mlx5_hws_cnt_pool_enqueue_revert(struct rte_ring *r, unsigned int n, * * @param cpool * A pointer to the counter pool structure. + * @param queue + * A pointer to HWS queue. If null, it means put into common pool. * @param cnt_id * A counter id to be added. 
- * @return - * - 0: Success; object taken - * - -ENOENT: not enough entry in pool */ -static __rte_always_inline int -mlx5_hws_cnt_pool_put(struct mlx5_hws_cnt_pool *cpool, - uint32_t *queue, cnt_id_t *cnt_id) +static __rte_always_inline void +mlx5_hws_cnt_pool_put(struct mlx5_hws_cnt_pool *cpool, uint32_t *queue, + cnt_id_t *cnt_id) { unsigned int ret = 0; struct rte_ring_zc_data zcdc = {0}; @@ -404,25 +408,29 @@ mlx5_hws_cnt_pool_put(struct mlx5_hws_cnt_pool *cpool, qcache = cpool->cache->qcache[*queue]; if (unlikely(qcache == NULL)) { ret = rte_ring_enqueue_elem(cpool->wait_reset_list, cnt_id, - sizeof(cnt_id_t)); + sizeof(cnt_id_t)); MLX5_ASSERT(ret == 0); - return ret; + return; } ret = rte_ring_enqueue_burst_elem(qcache, cnt_id, sizeof(cnt_id_t), 1, NULL); if (unlikely(ret == 0)) { /* cache is full. */ + struct rte_ring *reset_list = cpool->wait_reset_list; + wb_num = rte_ring_count(qcache) - cpool->cache->threshold; MLX5_ASSERT(wb_num < rte_ring_count(qcache)); __mlx5_hws_cnt_pool_enqueue_revert(qcache, wb_num, &zcdc); - rte_ring_enqueue_zc_burst_elem_start(cpool->wait_reset_list, - sizeof(cnt_id_t), wb_num, &zcdr, NULL); - __hws_cnt_r2rcpy(&zcdr, &zcdc, wb_num); - rte_ring_enqueue_zc_elem_finish(cpool->wait_reset_list, wb_num); + ret = rte_ring_enqueue_zc_burst_elem_start(reset_list, + sizeof(cnt_id_t), + wb_num, &zcdr, NULL); + MLX5_ASSERT(ret == wb_num); + __hws_cnt_r2rcpy(&zcdr, &zcdc, ret); + rte_ring_enqueue_zc_elem_finish(reset_list, ret); /* write-back THIS counter too */ - ret = rte_ring_enqueue_burst_elem(cpool->wait_reset_list, - cnt_id, sizeof(cnt_id_t), 1, NULL); + ret = rte_ring_enqueue_burst_elem(reset_list, cnt_id, + sizeof(cnt_id_t), 1, NULL); } - return ret == 1 ? 
0 : -ENOENT; + MLX5_ASSERT(ret == 1); } /** @@ -482,15 +490,17 @@ mlx5_hws_cnt_pool_get(struct mlx5_hws_cnt_pool *cpool, uint32_t *queue, return 0; } ret = rte_ring_dequeue_zc_burst_elem_start(qcache, sizeof(cnt_id_t), 1, - &zcdc, NULL); + &zcdc, NULL); if (unlikely(ret == 0)) { /* local cache is empty. */ rte_ring_dequeue_zc_elem_finish(qcache, 0); /* let's fetch from global free list. */ ret = mlx5_hws_cnt_pool_cache_fetch(cpool, *queue); if (unlikely(ret != 0)) return ret; - rte_ring_dequeue_zc_burst_elem_start(qcache, sizeof(cnt_id_t), - 1, &zcdc, NULL); + ret = rte_ring_dequeue_zc_burst_elem_start(qcache, + sizeof(cnt_id_t), 1, + &zcdc, NULL); + MLX5_ASSERT(ret == 1); } /* get one from local cache. */ *cnt_id = (*(cnt_id_t *)zcdc.ptr1); @@ -504,8 +514,10 @@ mlx5_hws_cnt_pool_get(struct mlx5_hws_cnt_pool *cpool, uint32_t *queue, ret = mlx5_hws_cnt_pool_cache_fetch(cpool, *queue); if (unlikely(ret != 0)) return ret; - rte_ring_dequeue_zc_burst_elem_start(qcache, sizeof(cnt_id_t), - 1, &zcdc, NULL); + ret = rte_ring_dequeue_zc_burst_elem_start(qcache, + sizeof(cnt_id_t), 1, + &zcdc, NULL); + MLX5_ASSERT(ret == 1); *cnt_id = *(cnt_id_t *)zcdc.ptr1; iidx = mlx5_hws_cnt_iidx(cpool, *cnt_id); } @@ -553,17 +565,13 @@ mlx5_hws_cnt_shared_get(struct mlx5_hws_cnt_pool *cpool, cnt_id_t *cnt_id, return 0; } -static __rte_always_inline int +static __rte_always_inline void mlx5_hws_cnt_shared_put(struct mlx5_hws_cnt_pool *cpool, cnt_id_t *cnt_id) { - int ret; uint32_t iidx = mlx5_hws_cnt_iidx(cpool, *cnt_id); cpool->pool[iidx].share = 0; - ret = mlx5_hws_cnt_pool_put(cpool, NULL, cnt_id); - if (unlikely(ret != 0)) - cpool->pool[iidx].share = 1; /* fail to release, restore. */ - return ret; + mlx5_hws_cnt_pool_put(cpool, NULL, cnt_id); } static __rte_always_inline bool -- 2.25.1