From: Long Li <lon...@microsoft.com>

With RX coalescing, a single CQE entry can indicate up to 4 packets on the
receive queue. This saves CPU processing time and PCI bandwidth over the CQ.
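
As context for reviewers, below is a minimal, self-contained sketch (not
driver code) of the resume logic this patch introduces: the queue remembers
which CQE (comp_buf_idx) and which packet slot within it (backlog_idx) to
continue from when a burst fills up mid-CQE. The struct layout and the
deliver step are hypothetical stand-ins; only the indexing scheme mirrors
the patch.

    /* Hypothetical stand-ins; only the indexing mirrors the patch. */
    #include <stdint.h>
    #include <stdio.h>

    #define NUM_SEGMENTS 4 /* stands in for RX_COM_OOB_NUM_PACKETINFO_SEGMENTS */

    struct pkt_info { uint32_t packet_length; };
    struct cqe { struct pkt_info packet_info[NUM_SEGMENTS]; };

    /* Consume up to budget packets, resuming from (*cqe_idx, *pkt_idx).
     * A zero packet_length marks the end of a partially filled CQE.
     */
    static uint32_t
    consume(const struct cqe *buf, uint32_t n_cqe,
            uint32_t *cqe_idx, uint32_t *pkt_idx, uint32_t budget)
    {
            uint32_t received = 0;

            while (*cqe_idx < n_cqe && received < budget) {
                    uint32_t len =
                            buf[*cqe_idx].packet_info[*pkt_idx].packet_length;

                    if (len) {
                            printf("pkt len %u (cqe %u slot %u)\n",
                                   len, *cqe_idx, *pkt_idx);
                            received++;
                    }
                    if (!len || ++(*pkt_idx) == NUM_SEGMENTS) {
                            *pkt_idx = 0;
                            (*cqe_idx)++;
                    }
            }
            return received;
    }

    int main(void)
    {
            struct cqe buf[2] = {
                    { .packet_info = { {64}, {128}, {256}, {512} } },
                    { .packet_info = { {1500}, {0}, {0}, {0} } },
            };
            uint32_t ci = 0, pi = 0;

            consume(buf, 2, &ci, &pi, 3); /* stops mid-CQE: ci == 0, pi == 3 */
            consume(buf, 2, &ci, &pi, 8); /* resumes at slot 3 of CQE 0 */
            return 0;
    }

In the patch, the same roles are played by rxq->comp_buf_idx and
rxq->backlog_idx, which are saved at the end of mana_rx_burst so the next
burst resumes exactly where this one stopped.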

Cc: sta...@dpdk.org
Signed-off-by: Long Li <lon...@microsoft.com>
---
 drivers/net/mana/mana.h |  4 +++
 drivers/net/mana/rx.c   | 71 ++++++++++++++++++++++++++++++++++-------
 2 files changed, 64 insertions(+), 11 deletions(-)

diff --git a/drivers/net/mana/mana.h b/drivers/net/mana/mana.h
index be88537f1a..dfeda531eb 100644
--- a/drivers/net/mana/mana.h
+++ b/drivers/net/mana/mana.h
@@ -429,6 +429,10 @@ struct mana_rxq {
        struct mana_gdma_queue gdma_cq;
        struct gdma_comp *gdma_comp_buf;
 
+       uint32_t comp_buf_len;
+       uint32_t comp_buf_idx;
+       uint32_t backlog_idx;
+
        struct mana_stats stats;
        struct mana_mr_btree mr_btree;
 
diff --git a/drivers/net/mana/rx.c b/drivers/net/mana/rx.c
index 6e1c397be8..cacfd9ae1b 100644
--- a/drivers/net/mana/rx.c
+++ b/drivers/net/mana/rx.c
@@ -361,6 +361,10 @@ mana_start_rx_queues(struct rte_eth_dev *dev)
                DRV_LOG(INFO, "rxq rq id %u buf %p count %u size %u",
                        rxq->gdma_rq.id, rxq->gdma_rq.buffer,
                        rxq->gdma_rq.count, rxq->gdma_rq.size);
+
+               rxq->comp_buf_len = 0;
+               rxq->comp_buf_idx = 0;
+               rxq->backlog_idx = 0;
        }
 
        for (i = 0; i < priv->num_queues; i++) {
@@ -385,16 +389,29 @@ mana_rx_burst(void *dpdk_rxq, struct rte_mbuf **pkts, uint16_t pkts_n)
        struct mana_priv *priv = rxq->priv;
        struct rte_mbuf *mbuf;
        int ret;
-       uint32_t num_pkts;
+       uint32_t pkt_idx = rxq->backlog_idx;
+       uint32_t pkt_len;
+       uint32_t i;
+       int polled = 0;
+
+repoll:
+       /* Poll for new completions if we have no backlog */
+       if (rxq->comp_buf_idx == rxq->comp_buf_len) {
+               RTE_ASSERT(!pkt_idx);
+               rxq->comp_buf_len =
+                       gdma_poll_completion_queue(&rxq->gdma_cq,
+                                                  rxq->gdma_comp_buf, pkts_n);
+               rxq->comp_buf_idx = 0;
+               polled = 1;
+       }
 
-       num_pkts = gdma_poll_completion_queue(&rxq->gdma_cq, rxq->gdma_comp_buf, pkts_n);
-       for (uint32_t i = 0; i < num_pkts; i++) {
+       i = rxq->comp_buf_idx;
+       while (i < rxq->comp_buf_len) {
                struct mana_rx_comp_oob *oob = (struct mana_rx_comp_oob *)
                        rxq->gdma_comp_buf[i].cqe_data;
                struct mana_rxq_desc *desc =
                        &rxq->desc_ring[rxq->desc_ring_tail];
 
-               rxq->gdma_rq.tail += desc->wqe_size_in_bu;
                mbuf = desc->pkt;
 
                switch (oob->cqe_hdr.cqe_type) {
@@ -409,8 +426,8 @@ mana_rx_burst(void *dpdk_rxq, struct rte_mbuf **pkts, uint16_t pkts_n)
                        goto drop;
 
                case CQE_RX_COALESCED_4:
-                       DP_LOG(ERR, "RX coalescing is not supported");
-                       continue;
+                       /* Proceed to process mbuf */
+                       break;
 
                default:
                        DP_LOG(ERR, "Unknown RX CQE type %d",
@@ -418,13 +435,22 @@ mana_rx_burst(void *dpdk_rxq, struct rte_mbuf **pkts, uint16_t pkts_n)
                        continue;
                }
 
-               DP_LOG(DEBUG, "mana_rx_comp_oob CQE_RX_OKAY rxq %p", rxq);
+               DP_LOG(DEBUG, "mana_rx_comp_oob type %d rxq %p",
+                      oob->cqe_hdr.cqe_type, rxq);
+
+               pkt_len = oob->packet_info[pkt_idx].packet_length;
+               if (!pkt_len) {
+                       /* Move on to the next completion */
+                       pkt_idx = 0;
+                       i++;
+                       continue;
+               }
 
                mbuf->data_off = RTE_PKTMBUF_HEADROOM;
                mbuf->nb_segs = 1;
                mbuf->next = NULL;
-               mbuf->pkt_len = oob->packet_info[0].packet_length;
-               mbuf->data_len = oob->packet_info[0].packet_length;
+               mbuf->data_len = pkt_len;
+               mbuf->pkt_len = pkt_len;
                mbuf->port = priv->port_id;
 
                if (oob->rx_ip_header_checksum_succeeded)
@@ -447,19 +473,28 @@ mana_rx_burst(void *dpdk_rxq, struct rte_mbuf **pkts, uint16_t pkts_n)
                if (oob->rx_hash_type == MANA_HASH_L3 ||
                    oob->rx_hash_type == MANA_HASH_L4) {
                        mbuf->ol_flags |= RTE_MBUF_F_RX_RSS_HASH;
-                       mbuf->hash.rss = oob->packet_info[0].packet_hash;
+                       mbuf->hash.rss = oob->packet_info[pkt_idx].packet_hash;
                }
 
                pkts[pkt_received++] = mbuf;
                rxq->stats.packets++;
                rxq->stats.bytes += mbuf->data_len;
 
+               pkt_idx++;
+               /* Move on to the next completion if all packets are processed */
+               if (pkt_idx >= RX_COM_OOB_NUM_PACKETINFO_SEGMENTS) {
+                       pkt_idx = 0;
+                       i++;
+               }
+
 drop:
                rxq->desc_ring_tail++;
                if (rxq->desc_ring_tail >= rxq->num_desc)
                        rxq->desc_ring_tail = 0;
 
-               /* Post another request */
+               rxq->gdma_rq.tail += desc->wqe_size_in_bu;
+
+               /* Consume this request and post another request */
                ret = mana_alloc_and_post_rx_wqe(rxq);
                if (ret) {
                        DP_LOG(ERR, "failed to post rx wqe ret=%d", ret);
@@ -467,6 +502,20 @@ mana_rx_burst(void *dpdk_rxq, struct rte_mbuf **pkts, uint16_t pkts_n)
                }
 
                wqe_posted++;
+               if (pkt_received == pkts_n)
+                       break;
+       }
+
+       rxq->backlog_idx = pkt_idx;
+       rxq->comp_buf_idx = i;
+
+       /* If all CQEs are processed but there are more packets to read, poll the
+        * completion queue again because we may not have polled it in the
+        * previous rx_burst when a CQE was not fully processed
+        */
+       if (pkt_received < pkts_n && !polled) {
+               polled = 1;
+               goto repoll;
        }
 
        if (wqe_posted)
-- 
2.34.1
