From: David George <david.geo...@sophos.com>

Replace the pending queue with one that supports a concurrent single
producer and single consumer. This relaxes the restriction that only a
single lcore may operate on a given queue pair; the lcore that enqueues
to a queue pair no longer has to be the one that dequeues from it.

Signed-off-by: David George <david.geo...@sophos.com>
Signed-off-by: Anoob Joseph <ano...@marvell.com>
---
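Notes (not part of the commit message):

Below is a minimal sketch, not part of the patch, of how the new pending
queue helpers in cpt_common.h are meant to compose on the fast path,
assuming a power-of-two queue size. QSIZE, example_enqueue() and
example_dequeue() are illustrative names only; the helper calls and the
acquire/release pairing mirror the driver changes in the diff.

#include <stdint.h>
#include <rte_atomic.h>
#include "cpt_common.h"  /* pending_queue helpers added by this patch */

#define QSIZE 2048       /* must be a power of two for the index masking */

/* Producer: check free space once, stage entries, publish with one commit. */
static uint16_t
example_enqueue(struct pending_queue *q, void **reqs, uint16_t nb_reqs)
{
        unsigned int free_slots;
        uint16_t i;

        free_slots = pending_queue_free_slots(q, QSIZE, 0);
        if (nb_reqs > free_slots)
                nb_reqs = free_slots;

        for (i = 0; i < nb_reqs; i++)
                pending_queue_push(q, reqs[i], i, QSIZE);

        if (nb_reqs)
                /* The release fence inside commit publishes the entries */
                pending_queue_commit(q, nb_reqs, QSIZE);

        return nb_reqs;
}

/* Consumer: read the level once, then peek/pop without touching the tail. */
static uint16_t
example_dequeue(struct pending_queue *q, void **reqs, uint16_t nb_reqs)
{
        unsigned int level;
        uint16_t i;

        level = pending_queue_level(q, QSIZE);

        /* Pair with the release fence in pending_queue_commit() */
        rte_atomic_thread_fence(__ATOMIC_ACQUIRE);

        if (nb_reqs > level)
                nb_reqs = level;

        for (i = 0; i < nb_reqs; i++) {
                pending_queue_peek(q, &reqs[i], QSIZE, i + 1 < nb_reqs);
                pending_queue_pop(q, QSIZE);
        }

        return nb_reqs;
}
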
 doc/guides/cryptodevs/octeontx.rst                 |   6 --
 doc/guides/cryptodevs/octeontx2.rst                |   6 --
 drivers/common/cpt/cpt_common.h                    |  73 +++++++++++++--
 drivers/crypto/octeontx/otx_cryptodev_hw_access.c  |  19 ++--
 drivers/crypto/octeontx/otx_cryptodev_hw_access.h  |  14 ++-
 drivers/crypto/octeontx/otx_cryptodev_ops.c        |  75 +++++++---------
 .../crypto/octeontx2/otx2_cryptodev_hw_access.h    |   8 +-
 drivers/crypto/octeontx2/otx2_cryptodev_ops.c      | 100 ++++++++++++---------
 8 files changed, 179 insertions(+), 122 deletions(-)
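
For reference, the OCTEON TX command queue arithmetic that follows from the
new defines in otx_cryptodev_hw_access.h:

    DEFAULT_CMD_QLEN        = 2048                 (power of two, index mask)
    DEFAULT_CMD_QRSVD_SLOTS = DEFAULT_CMD_QCHUNKS  = 2
    DEFAULT_CMD_QCHUNK_SIZE = (2048 - 2) / 2       = 1023

so each instruction chunk keeps the preferred 1023 entries while the pending
queue size stays a power of two, and pending_queue_free_slots() subtracts the
reserved slots (plus one) so software can never submit more instructions than
the chunks can hold.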

diff --git a/doc/guides/cryptodevs/octeontx.rst b/doc/guides/cryptodevs/octeontx.rst
index 4972274..a39f3f3 100644
--- a/doc/guides/cryptodevs/octeontx.rst
+++ b/doc/guides/cryptodevs/octeontx.rst
@@ -135,9 +135,3 @@ application:
 
         ./dpdk-test
         RTE>>cryptodev_octeontx_asym_autotest
-
-Limitations
------------
-
-Multiple lcores may not operate on the same crypto queue pair. The lcore that
-enqueues to a queue pair is the one that must dequeue from it.
diff --git a/doc/guides/cryptodevs/octeontx2.rst b/doc/guides/cryptodevs/octeontx2.rst
index f0beb92..811e61a 100644
--- a/doc/guides/cryptodevs/octeontx2.rst
+++ b/doc/guides/cryptodevs/octeontx2.rst
@@ -186,9 +186,3 @@ Features supported
 * AES-128/192/256-GCM
 * AES-128/192/256-CBC-SHA1-HMAC
 * AES-128/192/256-CBC-SHA256-128-HMAC
-
-Limitations
------------
-
-Multiple lcores may not operate on the same crypto queue pair. The lcore that
-enqueues to a queue pair is the one that must dequeue from it.
diff --git a/drivers/common/cpt/cpt_common.h b/drivers/common/cpt/cpt_common.h
index 724e5ec..d70668a 100644
--- a/drivers/common/cpt/cpt_common.h
+++ b/drivers/common/cpt/cpt_common.h
@@ -5,6 +5,7 @@
 #ifndef _CPT_COMMON_H_
 #define _CPT_COMMON_H_
 
+#include <rte_prefetch.h>
 #include <rte_mempool.h>
 
 /*
@@ -32,14 +33,12 @@ struct cpt_qp_meta_info {
  *
  */
 struct pending_queue {
-       /** Pending requests count */
-       uint64_t pending_count;
        /** Array of pending requests */
-       uintptr_t *req_queue;
+       void **rid_queue;
        /** Tail of queue to be used for enqueue */
-       uint16_t enq_tail;
+       unsigned int tail;
        /** Head of queue to be used for dequeue */
-       uint16_t deq_head;
+       unsigned int head;
 };
 
 struct cpt_request_info {
@@ -61,4 +60,68 @@ struct cpt_request_info {
        uint8_t extra_time;
 } __rte_aligned(8);
 
+static __rte_always_inline void
+pending_queue_push(struct pending_queue *q, void *rid, unsigned int off,
+                       const int qsize)
+{
+       /* NOTE: no free space check, but it is expected that one is made */
+       q->rid_queue[(q->tail + off) & (qsize - 1)] = rid;
+}
+
+static __rte_always_inline void
+pending_queue_commit(struct pending_queue *q, unsigned int cnt,
+                       const unsigned int qsize)
+{
+       /* Ensure ordering between setting the entry and updating the tail */
+       rte_atomic_thread_fence(__ATOMIC_RELEASE);
+
+       q->tail = (q->tail + cnt) & (qsize - 1);
+}
+
+static __rte_always_inline void
+pending_queue_pop(struct pending_queue *q, const int qsize)
+{
+       /* NOTE: no empty check, but it is expected that one is made prior */
+
+       q->head = (q->head + 1) & (qsize - 1);
+}
+
+static __rte_always_inline void
+pending_queue_peek(struct pending_queue *q, void **rid, const int qsize,
+                       int prefetch_next)
+{
+       void *next_rid;
+       /* NOTE: no empty check, but it is expected that one is made */
+
+       *rid = q->rid_queue[q->head];
+
+       if (likely(prefetch_next)) {
+               next_rid = q->rid_queue[(q->head + 1) & (qsize - 1)];
+               rte_prefetch_non_temporal((void *)next_rid);
+       }
+}
+
+static __rte_always_inline unsigned int
+pending_queue_level(struct pending_queue *q, const int qsize)
+{
+       return (q->tail - q->head) & (qsize - 1);
+}
+
+static __rte_always_inline unsigned int
+pending_queue_free_slots(struct pending_queue *q, const int qsize,
+               const int reserved_slots)
+{
+       int free_slots;
+
+       free_slots = qsize - pending_queue_level(q, qsize);
+
+       /* Only use qsize - 1 slots */
+       free_slots -= 1 + reserved_slots;
+
+       if (unlikely(free_slots < 0))
+               return 0;
+
+       return free_slots;
+}
+
 #endif /* _CPT_COMMON_H_ */
diff --git a/drivers/crypto/octeontx/otx_cryptodev_hw_access.c b/drivers/crypto/octeontx/otx_cryptodev_hw_access.c
index ab335c6..7b89a62 100644
--- a/drivers/crypto/octeontx/otx_cryptodev_hw_access.c
+++ b/drivers/crypto/octeontx/otx_cryptodev_hw_access.c
@@ -527,10 +527,10 @@ otx_cpt_get_resource(const struct rte_cryptodev *dev, uint8_t group,
        memset(&cptvf->pqueue, 0, sizeof(cptvf->pqueue));
 
        /* Chunks are of fixed size buffers */
+
+       qlen = DEFAULT_CMD_QLEN;
        chunks = DEFAULT_CMD_QCHUNKS;
        chunk_len = DEFAULT_CMD_QCHUNK_SIZE;
-
-       qlen = chunks * chunk_len;
        /* Chunk size includes 8 bytes of next chunk ptr */
        chunk_size = chunk_len * CPT_INST_SIZE + CPT_NEXT_CHUNK_PTR_SIZE;
 
@@ -538,7 +538,7 @@ otx_cpt_get_resource(const struct rte_cryptodev *dev, uint8_t group,
        len = chunks * RTE_ALIGN(sizeof(struct command_chunk), 8);
 
        /* For pending queue */
-       len += qlen * sizeof(uintptr_t);
+       len += qlen * RTE_ALIGN(sizeof(cptvf->pqueue.rid_queue[0]), 8);
 
        /* So that instruction queues start as pg size aligned */
        len = RTE_ALIGN(len, pg_sz);
@@ -573,14 +573,11 @@ otx_cpt_get_resource(const struct rte_cryptodev *dev, uint8_t group,
        }
 
        /* Pending queue setup */
-       cptvf->pqueue.req_queue = (uintptr_t *)mem;
-       cptvf->pqueue.enq_tail = 0;
-       cptvf->pqueue.deq_head = 0;
-       cptvf->pqueue.pending_count = 0;
-
-       mem +=  qlen * sizeof(uintptr_t);
-       len -=  qlen * sizeof(uintptr_t);
-       dma_addr += qlen * sizeof(uintptr_t);
+       cptvf->pqueue.rid_queue = (void **)mem;
+
+       mem +=  qlen * RTE_ALIGN(sizeof(cptvf->pqueue.rid_queue[0]), 8);
+       len -=  qlen * RTE_ALIGN(sizeof(cptvf->pqueue.rid_queue[0]), 8);
+       dma_addr += qlen * RTE_ALIGN(sizeof(cptvf->pqueue.rid_queue[0]), 8);
 
        /* Alignment wastage */
        used_len = alloc_len - len;
diff --git a/drivers/crypto/octeontx/otx_cryptodev_hw_access.h b/drivers/crypto/octeontx/otx_cryptodev_hw_access.h
index f7b1e93..7c6b1e4 100644
--- a/drivers/crypto/octeontx/otx_cryptodev_hw_access.h
+++ b/drivers/crypto/octeontx/otx_cryptodev_hw_access.h
@@ -23,10 +23,16 @@
 #define CPT_INTR_POLL_INTERVAL_MS      (50)
 
 /* Default command queue length */
-#define DEFAULT_CMD_QCHUNKS            2
-#define DEFAULT_CMD_QCHUNK_SIZE                1023
-#define DEFAULT_CMD_QLEN \
-               (DEFAULT_CMD_QCHUNK_SIZE * DEFAULT_CMD_QCHUNKS)
+#define DEFAULT_CMD_QLEN       2048
+#define DEFAULT_CMD_QCHUNKS    2
+
+/* The instruction queue chunks work best at a size of 1023, so reserve
+ * entries so that the pending queue cannot overrun the instruction queue.
+ */
+#define DEFAULT_CMD_QRSVD_SLOTS DEFAULT_CMD_QCHUNKS
+#define DEFAULT_CMD_QCHUNK_SIZE \
+               ((DEFAULT_CMD_QLEN - DEFAULT_CMD_QRSVD_SLOTS) / \
+               DEFAULT_CMD_QCHUNKS)
 
 #define CPT_CSR_REG_BASE(cpt)          ((cpt)->reg_base)
 
diff --git a/drivers/crypto/octeontx/otx_cryptodev_ops.c b/drivers/crypto/octeontx/otx_cryptodev_ops.c
index c51be63..76ea1f8 100644
--- a/drivers/crypto/octeontx/otx_cryptodev_ops.c
+++ b/drivers/crypto/octeontx/otx_cryptodev_ops.c
@@ -431,16 +431,10 @@ otx_cpt_asym_session_clear(struct rte_cryptodev *dev,
 
 static __rte_always_inline void * __rte_hot
 otx_cpt_request_enqueue(struct cpt_instance *instance,
-                       struct pending_queue *pqueue,
                        void *req, uint64_t cpt_inst_w7)
 {
        struct cpt_request_info *user_req = (struct cpt_request_info *)req;
 
-       if (unlikely(pqueue->pending_count >= DEFAULT_CMD_QLEN)) {
-               rte_errno = EAGAIN;
-               return NULL;
-       }
-
        fill_cpt_inst(instance, req, cpt_inst_w7);
 
        CPT_LOG_DP_DEBUG("req: %p op: %p ", req, user_req->op);
@@ -460,8 +454,7 @@ otx_cpt_request_enqueue(struct cpt_instance *instance,
 
 static __rte_always_inline void * __rte_hot
 otx_cpt_enq_single_asym(struct cpt_instance *instance,
-                       struct rte_crypto_op *op,
-                       struct pending_queue *pqueue)
+                       struct rte_crypto_op *op)
 {
        struct cpt_qp_meta_info *minfo = &instance->meta_info;
        struct rte_crypto_asym_op *asym_op = op->asym;
@@ -525,8 +518,7 @@ otx_cpt_enq_single_asym(struct cpt_instance *instance,
                goto req_fail;
        }
 
-       req = otx_cpt_request_enqueue(instance, pqueue, params.req,
-                                     sess->cpt_inst_w7);
+       req = otx_cpt_request_enqueue(instance, params.req, sess->cpt_inst_w7);
        if (unlikely(req == NULL)) {
                CPT_LOG_DP_ERR("Could not enqueue crypto req");
                goto req_fail;
@@ -542,8 +534,7 @@ otx_cpt_enq_single_asym(struct cpt_instance *instance,
 
 static __rte_always_inline void * __rte_hot
 otx_cpt_enq_single_sym(struct cpt_instance *instance,
-                      struct rte_crypto_op *op,
-                      struct pending_queue *pqueue)
+                      struct rte_crypto_op *op)
 {
        struct cpt_sess_misc *sess;
        struct rte_crypto_sym_op *sym_op = op->sym;
@@ -573,8 +564,7 @@ otx_cpt_enq_single_sym(struct cpt_instance *instance,
        }
 
        /* Enqueue prepared instruction to h/w */
-       req = otx_cpt_request_enqueue(instance, pqueue, prep_req,
-                                     sess->cpt_inst_w7);
+       req = otx_cpt_request_enqueue(instance, prep_req, sess->cpt_inst_w7);
        if (unlikely(req == NULL))
                /* Buffer allocated for request preparation need to be freed */
                free_op_meta(mdata, instance->meta_info.pool);
@@ -584,8 +574,7 @@ otx_cpt_enq_single_sym(struct cpt_instance *instance,
 
 static __rte_always_inline void * __rte_hot
 otx_cpt_enq_single_sym_sessless(struct cpt_instance *instance,
-                               struct rte_crypto_op *op,
-                               struct pending_queue *pend_q)
+                               struct rte_crypto_op *op)
 {
        const int driver_id = otx_cryptodev_driver_id;
        struct rte_crypto_sym_op *sym_op = op->sym;
@@ -607,8 +596,8 @@ otx_cpt_enq_single_sym_sessless(struct cpt_instance *instance,
 
        sym_op->session = sess;
 
-       req = otx_cpt_enq_single_sym(instance, op, pend_q);
-
+       /* Enqueue op with the tmp session set */
+       req = otx_cpt_enq_single_sym(instance, op);
        if (unlikely(req == NULL))
                goto priv_put;
 
@@ -627,22 +616,20 @@ otx_cpt_enq_single_sym_sessless(struct cpt_instance *instance,
 static __rte_always_inline void *__rte_hot
 otx_cpt_enq_single(struct cpt_instance *inst,
                   struct rte_crypto_op *op,
-                  struct pending_queue *pqueue,
                   const uint8_t op_type)
 {
        /* Check for the type */
 
        if (op_type == OP_TYPE_SYM) {
                if (op->sess_type == RTE_CRYPTO_OP_WITH_SESSION)
-                       return otx_cpt_enq_single_sym(inst, op, pqueue);
+                       return otx_cpt_enq_single_sym(inst, op);
                else
-                       return otx_cpt_enq_single_sym_sessless(inst, op,
-                                                              pqueue);
+                       return otx_cpt_enq_single_sym_sessless(inst, op);
        }
 
        if (op_type == OP_TYPE_ASYM) {
                if (op->sess_type == RTE_CRYPTO_OP_WITH_SESSION)
-                       return otx_cpt_enq_single_asym(inst, op, pqueue);
+                       return otx_cpt_enq_single_asym(inst, op);
        }
 
        /* Should not reach here */
@@ -655,30 +642,33 @@ otx_cpt_pkt_enqueue(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops,
                    const uint8_t op_type)
 {
        struct cpt_instance *instance = (struct cpt_instance *)qptr;
-       uint16_t count;
+       uint16_t count, free_slots;
        void *req;
        struct cpt_vf *cptvf = (struct cpt_vf *)instance;
        struct pending_queue *pqueue = &cptvf->pqueue;
 
-       count = DEFAULT_CMD_QLEN - pqueue->pending_count;
-       if (nb_ops > count)
-               nb_ops = count;
+       free_slots = pending_queue_free_slots(pqueue, DEFAULT_CMD_QLEN,
+                               DEFAULT_CMD_QRSVD_SLOTS);
+       if (nb_ops > free_slots)
+               nb_ops = free_slots;
 
        count = 0;
        while (likely(count < nb_ops)) {
 
                /* Enqueue single op */
-               req = otx_cpt_enq_single(instance, ops[count], pqueue, op_type);
+               req = otx_cpt_enq_single(instance, ops[count], op_type);
 
                if (unlikely(req == NULL))
                        break;
 
-               pqueue->req_queue[pqueue->enq_tail] = (uintptr_t)req;
-               MOD_INC(pqueue->enq_tail, DEFAULT_CMD_QLEN);
-               pqueue->pending_count += 1;
+               pending_queue_push(pqueue, req, count, DEFAULT_CMD_QLEN);
                count++;
        }
-       otx_cpt_ring_dbell(instance, count);
+
+       if (likely(count)) {
+               pending_queue_commit(pqueue, count, DEFAULT_CMD_QLEN);
+               otx_cpt_ring_dbell(instance, count);
+       }
        return count;
 }
 
@@ -756,8 +746,7 @@ otx_crypto_adapter_enqueue(void *port, struct rte_crypto_op *op)
 
        op_type = op->type == RTE_CRYPTO_OP_TYPE_SYMMETRIC ? OP_TYPE_SYM :
                                                             OP_TYPE_ASYM;
-       req = otx_cpt_enq_single(instance, op,
-                                &((struct cpt_vf *)instance)->pqueue, op_type);
+       req = otx_cpt_enq_single(instance, op, op_type);
        if (unlikely(req == NULL))
                return 0;
 
@@ -971,17 +960,16 @@ otx_cpt_pkt_dequeue(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops,
        int nb_completed;
        struct pending_queue *pqueue = &cptvf->pqueue;
 
-       pcount = pqueue->pending_count;
+       pcount = pending_queue_level(pqueue, DEFAULT_CMD_QLEN);
+
+       /* Ensure pcount isn't read before data lands */
+       rte_atomic_thread_fence(__ATOMIC_ACQUIRE);
+
        count = (nb_ops > pcount) ? pcount : nb_ops;
 
        for (i = 0; i < count; i++) {
-               user_req = (struct cpt_request_info *)
-                               pqueue->req_queue[pqueue->deq_head];
-
-               if (likely((i+1) < count)) {
-                       rte_prefetch_non_temporal(
-                               (void *)pqueue->req_queue[i+1]);
-               }
+               pending_queue_peek(pqueue, (void **) &user_req,
+                       DEFAULT_CMD_QLEN, i + 1 < count);
 
                ret = check_nb_command_id(user_req, instance);
 
@@ -997,8 +985,7 @@ otx_cpt_pkt_dequeue(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops,
                CPT_LOG_DP_DEBUG("Request %p Op %p completed with code %d",
                                 user_req, user_req->op, ret);
 
-               MOD_INC(pqueue->deq_head, DEFAULT_CMD_QLEN);
-               pqueue->pending_count -= 1;
+               pending_queue_pop(pqueue, DEFAULT_CMD_QLEN);
        }
 
        nb_completed = i;
diff --git a/drivers/crypto/octeontx2/otx2_cryptodev_hw_access.h b/drivers/crypto/octeontx2/otx2_cryptodev_hw_access.h
index f9981ea..90a338e 100644
--- a/drivers/crypto/octeontx2/otx2_cryptodev_hw_access.h
+++ b/drivers/crypto/octeontx2/otx2_cryptodev_hw_access.h
@@ -17,10 +17,10 @@
 #include "otx2_dev.h"
 #include "otx2_cryptodev_qp.h"
 
-/* CPT instruction queue length */
-#define OTX2_CPT_IQ_LEN                        8200
-
-#define OTX2_CPT_DEFAULT_CMD_QLEN      OTX2_CPT_IQ_LEN
+/* CPT instruction queue length.
+ * Use a power-of-2 queue size to simplify the pending queue calculations.
+ */
+#define OTX2_CPT_DEFAULT_CMD_QLEN      8192
 
 /* Mask which selects all engine groups */
 #define OTX2_CPT_ENG_GRPS_MASK         0xFF
diff --git a/drivers/crypto/octeontx2/otx2_cryptodev_ops.c b/drivers/crypto/octeontx2/otx2_cryptodev_ops.c
index 952d135..7134fa7 100644
--- a/drivers/crypto/octeontx2/otx2_cryptodev_ops.c
+++ b/drivers/crypto/octeontx2/otx2_cryptodev_ops.c
@@ -49,6 +49,7 @@ otx2_cpt_metabuf_mempool_create(const struct rte_cryptodev *dev,
 {
        char mempool_name[RTE_MEMPOOL_NAMESIZE];
        struct cpt_qp_meta_info *meta_info;
+       int lcore_cnt = rte_lcore_count();
        int ret, max_mlen, mb_pool_sz;
        struct rte_mempool *pool;
        int asym_mlen = 0;
@@ -87,7 +88,13 @@ otx2_cpt_metabuf_mempool_create(const struct rte_cryptodev *dev,
        snprintf(mempool_name, RTE_MEMPOOL_NAMESIZE, "otx2_cpt_mb_%u:%u",
                 dev->data->dev_id, qp_id);
 
-       mb_pool_sz = RTE_MAX(nb_elements, (METABUF_POOL_CACHE_SIZE * rte_lcore_count()));
+       mb_pool_sz = nb_elements;
+
+       /* For poll mode, the core that enqueues and the core that dequeues
+        * can be different. For event mode, all cores are allowed to use the
+        * same crypto queue pair.
+        */
+       mb_pool_sz += (RTE_MAX(2, lcore_cnt) * METABUF_POOL_CACHE_SIZE);
 
        pool = rte_mempool_create_empty(mempool_name, mb_pool_sz, max_mlen,
                                        METABUF_POOL_CACHE_SIZE, 0,
@@ -187,7 +194,13 @@ otx2_cpt_qp_create(const struct rte_cryptodev *dev, uint16_t qp_id,
                return NULL;
        }
 
-       iq_len = OTX2_CPT_IQ_LEN;
+       /*
+        * The pending queue helpers assume that the queue size is a power
+        * of 2.
+        */
+       RTE_BUILD_BUG_ON(!RTE_IS_POWER_OF_2(OTX2_CPT_DEFAULT_CMD_QLEN));
+
+       iq_len = OTX2_CPT_DEFAULT_CMD_QLEN;
 
        /*
         * Queue size must be a multiple of 40 and effective queue size to
@@ -196,7 +209,7 @@ otx2_cpt_qp_create(const struct rte_cryptodev *dev, uint16_t qp_id,
        size_div40 = (iq_len + 40 - 1) / 40 + 1;
 
        /* For pending queue */
-       len = iq_len * sizeof(uintptr_t);
+       len = iq_len * RTE_ALIGN(sizeof(qp->pend_q.rid_queue[0]), 8);
 
        /* Space for instruction group memory */
        len += size_div40 * 16;
@@ -205,7 +218,7 @@ otx2_cpt_qp_create(const struct rte_cryptodev *dev, uint16_t qp_id,
        len = RTE_ALIGN(len, pg_sz);
 
        /* For instruction queues */
-       len += OTX2_CPT_IQ_LEN * sizeof(union cpt_inst_s);
+       len += OTX2_CPT_DEFAULT_CMD_QLEN * sizeof(union cpt_inst_s);
 
        /* Wastage after instruction queues */
        len = RTE_ALIGN(len, pg_sz);
@@ -233,12 +246,11 @@ otx2_cpt_qp_create(const struct rte_cryptodev *dev, uint16_t qp_id,
        }
 
        /* Initialize pending queue */
-       qp->pend_q.req_queue = (uintptr_t *)va;
-       qp->pend_q.enq_tail = 0;
-       qp->pend_q.deq_head = 0;
-       qp->pend_q.pending_count = 0;
+       qp->pend_q.rid_queue = (void **)va;
+       qp->pend_q.tail = 0;
+       qp->pend_q.head = 0;
 
-       used_len = iq_len * sizeof(uintptr_t);
+       used_len = iq_len * RTE_ALIGN(sizeof(qp->pend_q.rid_queue[0]), 8);
        used_len += size_div40 * 16;
        used_len = RTE_ALIGN(used_len, pg_sz);
        iova += used_len;
@@ -514,7 +526,8 @@ otx2_cpt_enqueue_req(const struct otx2_cpt_qp *qp,
                     struct pending_queue *pend_q,
                     struct cpt_request_info *req,
                     struct rte_crypto_op *op,
-                    uint64_t cpt_inst_w7)
+                    uint64_t cpt_inst_w7,
+                    unsigned int burst_index)
 {
        void *lmtline = qp->lmtline;
        union cpt_inst_s inst;
@@ -523,9 +536,6 @@ otx2_cpt_enqueue_req(const struct otx2_cpt_qp *qp,
        if (qp->ca_enable)
                return otx2_ca_enqueue_req(qp, req, lmtline, op, cpt_inst_w7);
 
-       if (unlikely(pend_q->pending_count >= OTX2_CPT_DEFAULT_CMD_QLEN))
-               return -EAGAIN;
-
        inst.u[0] = 0;
        inst.s9x.res_addr = req->comp_baddr;
        inst.u[2] = 0;
@@ -553,11 +563,7 @@ otx2_cpt_enqueue_req(const struct otx2_cpt_qp *qp,
                lmt_status = otx2_lmt_submit(qp->lf_nq_reg);
        } while (lmt_status == 0);
 
-       pend_q->req_queue[pend_q->enq_tail] = (uintptr_t)req;
-
-       /* We will use soft queue length here to limit requests */
-       MOD_INC(pend_q->enq_tail, OTX2_CPT_DEFAULT_CMD_QLEN);
-       pend_q->pending_count += 1;
+       pending_queue_push(pend_q, req, burst_index, OTX2_CPT_DEFAULT_CMD_QLEN);
 
        return 0;
 }
@@ -565,7 +571,8 @@ otx2_cpt_enqueue_req(const struct otx2_cpt_qp *qp,
 static __rte_always_inline int32_t __rte_hot
 otx2_cpt_enqueue_asym(struct otx2_cpt_qp *qp,
                      struct rte_crypto_op *op,
-                     struct pending_queue *pend_q)
+                     struct pending_queue *pend_q,
+                     unsigned int burst_index)
 {
        struct cpt_qp_meta_info *minfo = &qp->meta_info;
        struct rte_crypto_asym_op *asym_op = op->asym;
@@ -626,8 +633,7 @@ otx2_cpt_enqueue_asym(struct otx2_cpt_qp *qp,
        }
 
        ret = otx2_cpt_enqueue_req(qp, pend_q, params.req, op,
-                                  sess->cpt_inst_w7);
-
+                                  sess->cpt_inst_w7, burst_index);
        if (unlikely(ret)) {
                CPT_LOG_DP_ERR("Could not enqueue crypto req");
                goto req_fail;
@@ -643,7 +649,7 @@ otx2_cpt_enqueue_asym(struct otx2_cpt_qp *qp,
 
 static __rte_always_inline int __rte_hot
 otx2_cpt_enqueue_sym(struct otx2_cpt_qp *qp, struct rte_crypto_op *op,
-                    struct pending_queue *pend_q)
+                    struct pending_queue *pend_q, unsigned int burst_index)
 {
        struct rte_crypto_sym_op *sym_op = op->sym;
        struct cpt_request_info *req;
@@ -670,8 +676,8 @@ otx2_cpt_enqueue_sym(struct otx2_cpt_qp *qp, struct rte_crypto_op *op,
                return ret;
        }
 
-       ret = otx2_cpt_enqueue_req(qp, pend_q, req, op, sess->cpt_inst_w7);
-
+       ret = otx2_cpt_enqueue_req(qp, pend_q, req, op, sess->cpt_inst_w7,
+                                   burst_index);
        if (unlikely(ret)) {
                /* Free buffer allocated by fill params routines */
                free_op_meta(mdata, qp->meta_info.pool);
@@ -682,7 +688,8 @@ otx2_cpt_enqueue_sym(struct otx2_cpt_qp *qp, struct rte_crypto_op *op,
 
 static __rte_always_inline int __rte_hot
 otx2_cpt_enqueue_sec(struct otx2_cpt_qp *qp, struct rte_crypto_op *op,
-                    struct pending_queue *pend_q)
+                    struct pending_queue *pend_q,
+                    const unsigned int burst_index)
 {
        uint32_t winsz, esn_low = 0, esn_hi = 0, seql = 0, seqh = 0;
        struct rte_mbuf *m_src = op->sym->m_src;
@@ -739,7 +746,8 @@ otx2_cpt_enqueue_sec(struct otx2_cpt_qp *qp, struct rte_crypto_op *op,
                return ret;
        }
 
-       ret = otx2_cpt_enqueue_req(qp, pend_q, req, op, sess->cpt_inst_w7);
+       ret = otx2_cpt_enqueue_req(qp, pend_q, req, op, sess->cpt_inst_w7,
+                                   burst_index);
 
        if (winsz && esn) {
                seq_in_sa = ((uint64_t)esn_hi << 32) | esn_low;
@@ -754,7 +762,8 @@ otx2_cpt_enqueue_sec(struct otx2_cpt_qp *qp, struct rte_crypto_op *op,
 
 static __rte_always_inline int __rte_hot
 otx2_cpt_enqueue_sym_sessless(struct otx2_cpt_qp *qp, struct rte_crypto_op *op,
-                             struct pending_queue *pend_q)
+                             struct pending_queue *pend_q,
+                             unsigned int burst_index)
 {
        const int driver_id = otx2_cryptodev_driver_id;
        struct rte_crypto_sym_op *sym_op = op->sym;
@@ -773,7 +782,7 @@ otx2_cpt_enqueue_sym_sessless(struct otx2_cpt_qp *qp, struct rte_crypto_op *op,
 
        sym_op->session = sess;
 
-       ret = otx2_cpt_enqueue_sym(qp, op, pend_q);
+       ret = otx2_cpt_enqueue_sym(qp, op, pend_q, burst_index);
 
        if (unlikely(ret))
                goto priv_put;
@@ -798,23 +807,26 @@ otx2_cpt_enqueue_burst(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops)
 
        pend_q = &qp->pend_q;
 
-       nb_allowed = OTX2_CPT_DEFAULT_CMD_QLEN - pend_q->pending_count;
-       if (nb_ops > nb_allowed)
-               nb_ops = nb_allowed;
+       nb_allowed = pending_queue_free_slots(pend_q,
+                               OTX2_CPT_DEFAULT_CMD_QLEN, 0);
+       nb_ops = RTE_MIN(nb_ops, nb_allowed);
 
        for (count = 0; count < nb_ops; count++) {
                op = ops[count];
                if (op->type == RTE_CRYPTO_OP_TYPE_SYMMETRIC) {
                        if (op->sess_type == RTE_CRYPTO_OP_SECURITY_SESSION)
-                               ret = otx2_cpt_enqueue_sec(qp, op, pend_q);
+                               ret = otx2_cpt_enqueue_sec(qp, op, pend_q,
+                                                          count);
                        else if (op->sess_type == RTE_CRYPTO_OP_WITH_SESSION)
-                               ret = otx2_cpt_enqueue_sym(qp, op, pend_q);
+                               ret = otx2_cpt_enqueue_sym(qp, op, pend_q,
+                                                          count);
                        else
                                ret = otx2_cpt_enqueue_sym_sessless(qp, op,
-                                                                   pend_q);
+                                               pend_q, count);
                } else if (op->type == RTE_CRYPTO_OP_TYPE_ASYMMETRIC) {
                        if (op->sess_type == RTE_CRYPTO_OP_WITH_SESSION)
-                               ret = otx2_cpt_enqueue_asym(qp, op, pend_q);
+                               ret = otx2_cpt_enqueue_asym(qp, op, pend_q,
+                                                               count);
                        else
                                break;
                } else
@@ -824,6 +836,9 @@ otx2_cpt_enqueue_burst(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops)
                        break;
        }
 
+       if (unlikely(!qp->ca_enable))
+               pending_queue_commit(pend_q, count, OTX2_CPT_DEFAULT_CMD_QLEN);
+
        return count;
 }
 
@@ -1059,14 +1074,16 @@ otx2_cpt_dequeue_burst(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops)
 
        pend_q = &qp->pend_q;
 
-       nb_pending = pend_q->pending_count;
+       nb_pending = pending_queue_level(pend_q, OTX2_CPT_DEFAULT_CMD_QLEN);
+
+       /* Ensure nb_pending isn't read before the data lands */
+       rte_atomic_thread_fence(__ATOMIC_ACQUIRE);
 
-       if (nb_ops > nb_pending)
-               nb_ops = nb_pending;
+       nb_ops = RTE_MIN(nb_ops, nb_pending);
 
        for (i = 0; i < nb_ops; i++) {
-               req = (struct cpt_request_info *)
-                               pend_q->req_queue[pend_q->deq_head];
+               pending_queue_peek(pend_q, (void **)&req,
+                       OTX2_CPT_DEFAULT_CMD_QLEN, 0);
 
                cc[i] = otx2_cpt_compcode_get(req);
 
@@ -1075,8 +1092,7 @@ otx2_cpt_dequeue_burst(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops)
 
                ops[i] = req->op;
 
-               MOD_INC(pend_q->deq_head, OTX2_CPT_DEFAULT_CMD_QLEN);
-               pend_q->pending_count -= 1;
+               pending_queue_pop(pend_q, OTX2_CPT_DEFAULT_CMD_QLEN);
        }
 
        nb_completed = i;
-- 
2.7.4
