Buffer crypto ops in a circular buffer so that they are retained,
rather than dropped, when the cryptodev or eventdev is temporarily full.

Signed-off-by: Ganapati Kundapura <ganapati.kundap...@intel.com>

diff --git a/lib/eventdev/rte_event_crypto_adapter.c 
b/lib/eventdev/rte_event_crypto_adapter.c
index d840803..4469a89 100644
--- a/lib/eventdev/rte_event_crypto_adapter.c
+++ b/lib/eventdev/rte_event_crypto_adapter.c
@@ -25,11 +25,27 @@
 #define CRYPTO_ADAPTER_MEM_NAME_LEN 32
 #define CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES 100
 
+#define CRYPTO_ADAPTER_OPS_BUFFER_SZ (BATCH_SIZE + BATCH_SIZE)
+#define CRYPTO_ADAPTER_BUFFER_SZ 1024
+
 /* Flush an instance's enqueue buffers every CRYPTO_ENQ_FLUSH_THRESHOLD
  * iterations of eca_crypto_adapter_enq_run()
  */
 #define CRYPTO_ENQ_FLUSH_THRESHOLD 1024
 
+struct crypto_ops_circular_buffer {
+       /* Index of the oldest buffered op; flushing starts from here */
+       uint16_t head;
+       /* Index of the slot where the next added op is stored */
+       uint16_t tail;
+       /* Number of ops currently held in the buffer */
+       uint16_t count;
+       /* Capacity of op_buffer, in number of op pointers */
+       uint16_t size;
+       /* Array of rte_crypto_op pointers buffered for batching */
+       struct rte_crypto_op **op_buffer;
+} __rte_cache_aligned;
+
 struct event_crypto_adapter {
        /* Event device identifier */
        uint8_t eventdev_id;
@@ -47,6 +63,8 @@ struct event_crypto_adapter {
        struct crypto_device_info *cdevs;
        /* Loop counter to flush crypto ops */
        uint16_t transmit_loop_count;
+       /* Circular buffer for batching crypto ops to eventdev */
+       struct crypto_ops_circular_buffer ebuf;
        /* Per instance stats structure */
        struct rte_event_crypto_adapter_stats crypto_stats;
        /* Configuration callback for rte_service configuration */
@@ -93,8 +111,8 @@ struct crypto_device_info {
 struct crypto_queue_pair_info {
        /* Set to indicate queue pair is enabled */
        bool qp_enabled;
-       /* Pointer to hold rte_crypto_ops for batching */
-       struct rte_crypto_op **op_buffer;
+       /* Circular buffer for batching crypto ops to cdev */
+       struct crypto_ops_circular_buffer cbuf;
        /* No of crypto ops accumulated */
        uint8_t len;
 } __rte_cache_aligned;
@@ -141,6 +159,77 @@ eca_init(void)
        return 0;
 }
 
+/* Report whether the buffer holds at least one full batch (BATCH_SIZE ops). */
+static inline bool
+eca_circular_buffer_batch_ready(struct crypto_ops_circular_buffer *bufp)
+{
+       const uint16_t buffered = bufp->count;
+
+       return !(buffered < BATCH_SIZE);
+}
+
+/* Release the op pointer array owned by a circular buffer.
+ *
+ * Only the array itself is freed; ops still referenced from it are not.
+ * The buffer is left empty with a NULL array, so a repeated call is
+ * harmless (rte_free(NULL) is a no-op) and no dangling pointer remains.
+ */
+static inline void
+eca_circular_buffer_free(struct crypto_ops_circular_buffer *bufp)
+{
+       rte_free(bufp->op_buffer);
+       bufp->op_buffer = NULL;
+       bufp->head = 0;
+       bufp->tail = 0;
+       bufp->count = 0;
+       bufp->size = 0;
+}
+
+/* Allocate the op pointer array of a circular buffer and mark it empty.
+ *
+ * name: tag passed to rte_zmalloc() for the allocation
+ * bufp: circular buffer to initialise
+ * sz:   capacity in ops; must be non-zero because indices wrap modulo sz
+ *
+ * Returns 0 on success, -EINVAL if sz is 0, -ENOMEM if allocation fails.
+ */
+static inline int
+eca_circular_buffer_init(const char *name,
+                        struct crypto_ops_circular_buffer *bufp,
+                        uint16_t sz)
+{
+       if (sz == 0)
+               return -EINVAL;
+
+       bufp->op_buffer = rte_zmalloc(name,
+                                     sizeof(*bufp->op_buffer) * sz,
+                                     0);
+       if (bufp->op_buffer == NULL)
+               return -ENOMEM;
+
+       /* Do not rely on the caller having zeroed the structure */
+       bufp->head = 0;
+       bufp->tail = 0;
+       bufp->count = 0;
+       bufp->size = sz;
+       return 0;
+}
+
+/* Append one crypto op at the tail of a circular buffer.
+ *
+ * Returns 0 on success, or -ENOSPC if the buffer is already full. On
+ * failure the op is not stored and remains owned by the caller.
+ *
+ * NOTE(review): the callers in this file currently ignore the return
+ * value; they need to handle -ENOSPC to avoid losing the op.
+ */
+static inline int
+eca_circular_buffer_add(struct crypto_ops_circular_buffer *bufp,
+                       struct rte_crypto_op *op)
+{
+       /* A full ring must not be written to: that would overwrite the
+        * oldest unflushed op and push count past size.
+        */
+       if (bufp->count >= bufp->size)
+               return -ENOSPC;
+
+       bufp->op_buffer[bufp->tail] = op;
+       /* Advance the tail, wrapping at the end of the array */
+       bufp->tail = (bufp->tail + 1) % bufp->size;
+       bufp->count++;
+
+       return 0;
+}
+
+/* Enqueue buffered ops to a cryptodev queue pair, starting at the head.
+ *
+ * At most one contiguous run of the array is submitted per call: when the
+ * buffered ops wrap past the end of the array, only the part up to the end
+ * is sent and the remainder is left for the next call.
+ *
+ * bufp:           circular buffer to drain
+ * cdev_id, qp_id: destination passed to rte_cryptodev_enqueue_burst()
+ * nb_ops_flushed: set to the number of ops accepted by the cryptodev
+ *
+ * Returns 0 if the buffer was empty or the whole run was accepted, -1 if
+ * the cryptodev accepted fewer ops than were submitted.
+ */
+static inline int
+eca_circular_buffer_flush_to_cdev(struct crypto_ops_circular_buffer *bufp,
+                                 uint8_t cdev_id, uint16_t qp_id,
+                                 uint16_t *nb_ops_flushed)
+{
+       struct rte_crypto_op **ops = bufp->op_buffer;
+       uint16_t n;
+
+       /* head == tail means either empty or completely full, so emptiness
+        * is decided by count rather than by comparing the two indices.
+        */
+       if (bufp->count == 0) {
+               *nb_ops_flushed = 0;
+               return 0;
+       }
+
+       /* Length of the contiguous run that starts at head */
+       n = RTE_MIN(bufp->count, (uint16_t)(bufp->size - bufp->head));
+
+       *nb_ops_flushed = rte_cryptodev_enqueue_burst(cdev_id, qp_id,
+                                                     &ops[bufp->head], n);
+       bufp->count -= *nb_ops_flushed;
+       if (bufp->count == 0) {
+               /* Drained: restart at index 0 so later runs are contiguous */
+               bufp->head = 0;
+               bufp->tail = 0;
+       } else {
+               bufp->head = (bufp->head + *nb_ops_flushed) % bufp->size;
+       }
+
+       return *nb_ops_flushed == n ? 0 : -1;
+}
+
 static inline struct event_crypto_adapter *
 eca_id_to_adapter(uint8_t id)
 {
@@ -237,10 +326,19 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t 
dev_id,
                return -ENOMEM;
        }
 
+       if (eca_circular_buffer_init("eca_edev_circular_buffer",
+                                    &adapter->ebuf,
+                                    CRYPTO_ADAPTER_BUFFER_SZ)) {
+               RTE_EDEV_LOG_ERR("Failed to get mem for edev buffer");
+               rte_free(adapter);
+               return -ENOMEM;
+       }
+
        ret = rte_event_dev_info_get(dev_id, &dev_info);
        if (ret < 0) {
                RTE_EDEV_LOG_ERR("Failed to get info for eventdev %d: %s!",
                                 dev_id, dev_info.driver_name);
+               eca_circular_buffer_free(&adapter->ebuf);
                rte_free(adapter);
                return ret;
        }
@@ -259,6 +357,7 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t 
dev_id,
                                        socket_id);
        if (adapter->cdevs == NULL) {
                RTE_EDEV_LOG_ERR("Failed to get mem for crypto devices\n");
+               eca_circular_buffer_free(&adapter->ebuf);
                rte_free(adapter);
                return -ENOMEM;
        }
@@ -337,11 +436,10 @@ eca_enq_to_cryptodev(struct event_crypto_adapter 
*adapter, struct rte_event *ev,
        struct crypto_queue_pair_info *qp_info = NULL;
        struct rte_crypto_op *crypto_op;
        unsigned int i, n;
-       uint16_t qp_id, len, ret;
+       uint16_t qp_id, len, nb_enqueued = 0;
        uint8_t cdev_id;
 
        len = 0;
-       ret = 0;
        n = 0;
        stats->event_deq_count += cnt;
 
@@ -367,7 +465,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, 
struct rte_event *ev,
                                continue;
                        }
                        len = qp_info->len;
-                       qp_info->op_buffer[len] = crypto_op;
+                       eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
                        len++;
                } else if (crypto_op->sess_type == RTE_CRYPTO_OP_SESSIONLESS &&
                                crypto_op->private_data_offset) {
@@ -383,7 +481,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, 
struct rte_event *ev,
                                continue;
                        }
                        len = qp_info->len;
-                       qp_info->op_buffer[len] = crypto_op;
+                       eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
                        len++;
                } else {
                        rte_pktmbuf_free(crypto_op->sym->m_src);
@@ -391,18 +489,16 @@ eca_enq_to_cryptodev(struct event_crypto_adapter 
*adapter, struct rte_event *ev,
                        continue;
                }
 
-               if (len == BATCH_SIZE) {
-                       struct rte_crypto_op **op_buffer = qp_info->op_buffer;
-                       ret = rte_cryptodev_enqueue_burst(cdev_id,
+               if (eca_circular_buffer_batch_ready(&qp_info->cbuf)) {
+                       eca_circular_buffer_flush_to_cdev(&qp_info->cbuf,
+                                                         cdev_id,
                                                          qp_id,
-                                                         op_buffer,
-                                                         BATCH_SIZE);
+                                                         &nb_enqueued);
+                       stats->crypto_enq_count += nb_enqueued;
 
-                       stats->crypto_enq_count += ret;
-
-                       while (ret < len) {
+                       while (nb_enqueued < len) {
                                struct rte_crypto_op *op;
-                               op = op_buffer[ret++];
+                               op = qp_info->cbuf.op_buffer[nb_enqueued++];
                                stats->crypto_enq_fail++;
                                rte_pktmbuf_free(op->sym->m_src);
                                rte_crypto_op_free(op);
@@ -413,7 +509,8 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, 
struct rte_event *ev,
 
                if (qp_info)
                        qp_info->len = len;
-               n += ret;
+
+               n += nb_enqueued;
        }
 
        return n;
@@ -425,14 +522,12 @@ eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
        struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
        struct crypto_device_info *curr_dev;
        struct crypto_queue_pair_info *curr_queue;
-       struct rte_crypto_op **op_buffer;
        struct rte_cryptodev *dev;
        uint8_t cdev_id;
        uint16_t qp;
-       uint16_t ret;
+       uint16_t nb_enqueued = 0, nb = 0;
        uint16_t num_cdev = rte_cryptodev_count();
 
-       ret = 0;
        for (cdev_id = 0; cdev_id < num_cdev; cdev_id++) {
                curr_dev = &adapter->cdevs[cdev_id];
                dev = curr_dev->dev;
@@ -444,25 +539,26 @@ eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
                        if (!curr_queue->qp_enabled)
                                continue;
 
-                       op_buffer = curr_queue->op_buffer;
-                       ret = rte_cryptodev_enqueue_burst(cdev_id,
+                       eca_circular_buffer_flush_to_cdev(&curr_queue->cbuf,
+                                                         cdev_id,
                                                          qp,
-                                                         op_buffer,
-                                                         curr_queue->len);
-                       stats->crypto_enq_count += ret;
+                                                         &nb_enqueued);
+
+                       stats->crypto_enq_count += nb_enqueued;
 
-                       while (ret < curr_queue->len) {
+                       while (nb_enqueued < curr_queue->len) {
                                struct rte_crypto_op *op;
-                               op = op_buffer[ret++];
+                               op = curr_queue->cbuf.op_buffer[nb_enqueued++];
                                stats->crypto_enq_fail++;
                                rte_pktmbuf_free(op->sym->m_src);
                                rte_crypto_op_free(op);
                        }
                        curr_queue->len = 0;
+                       nb += nb_enqueued;
                }
        }
 
-       return ret;
+       return nb;
 }
 
 static int
@@ -499,9 +595,9 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter 
*adapter,
        return nb_enqueued;
 }
 
-static inline void
+static inline uint16_t
 eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
-                     struct rte_crypto_op **ops, uint16_t num)
+                 struct rte_crypto_op **ops, uint16_t num)
 {
        struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
        union rte_event_crypto_metadata *m_data = NULL;
@@ -518,6 +614,8 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
        num = RTE_MIN(num, BATCH_SIZE);
        for (i = 0; i < num; i++) {
                struct rte_event *ev = &events[nb_ev++];
+
+               m_data = NULL;
                if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) {
                        m_data = rte_cryptodev_sym_session_get_user_data(
                                        ops[i]->sym->session);
@@ -548,6 +646,7 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
                                                  event_port_id,
                                                  &events[nb_enqueued],
                                                  nb_ev - nb_enqueued);
+
        } while (retry++ < CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES &&
                 nb_enqueued < nb_ev);
 
@@ -561,8 +660,52 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
        stats->event_enq_fail_count += nb_ev - nb_enqueued;
        stats->event_enq_count += nb_enqueued;
        stats->event_enq_retry_count += retry - 1;
+
+       return nb_enqueued;
 }
 
+/* Enqueue buffered ops to the event device, starting at the head.
+ *
+ * At most one contiguous run of the array is submitted per call; ops that
+ * wrap past the end of the array are left for the next call.
+ *
+ * Returns 1 if every op of the submitted run was enqueued, so the caller
+ * may flush again; returns 0 if the buffer was empty or fewer ops than
+ * submitted were enqueued, so the caller should stop.
+ *
+ * NOTE(review): eca_ops_enqueue_burst() caps a burst at BATCH_SIZE, so a
+ * run longer than that is reported as a short flush even when the event
+ * device accepted everything it was given.
+ */
+static int
+eca_circular_buffer_flush_to_evdev(struct event_crypto_adapter *adapter,
+                                  struct crypto_ops_circular_buffer *bufp)
+{
+       struct rte_crypto_op **ops = bufp->op_buffer;
+       uint16_t n, nb_ops_flushed;
+
+       /* head == tail means either empty or completely full, so emptiness
+        * is decided by count rather than by comparing the two indices.
+        */
+       if (bufp->count == 0)
+               return 0;
+
+       /* Length of the contiguous run that starts at head */
+       n = RTE_MIN(bufp->count, (uint16_t)(bufp->size - bufp->head));
+
+       /* Submit from the head slot, not from the start of the array */
+       nb_ops_flushed = eca_ops_enqueue_burst(adapter, &ops[bufp->head], n);
+       bufp->count -= nb_ops_flushed;
+       if (bufp->count == 0) {
+               /* Drained: restart at index 0 so later runs are contiguous */
+               bufp->head = 0;
+               bufp->tail = 0;
+       } else {
+               bufp->head = (bufp->head + nb_ops_flushed) % bufp->size;
+       }
+
+       /* Stop further enqueue to eventdev, if nb_ops_flushed < n */
+       if (nb_ops_flushed < n)
+               return 0;
+
+       return 1;
+}
+
+
+/* Push ops held in the adapter's eventdev buffer to the event device,
+ * repeating until the flush helper reports that it should stop.
+ */
+static void
+eca_ops_buffer_flush(struct event_crypto_adapter *adapter)
+{
+       struct crypto_ops_circular_buffer *pending = &adapter->ebuf;
+       int keep_going;
+
+       /* Nothing buffered, nothing to do */
+       if (pending->count == 0)
+               return;
+
+       do {
+               keep_going = eca_circular_buffer_flush_to_evdev(adapter,
+                                                               pending);
+       } while (keep_going);
+}
 static inline unsigned int
 eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
                           unsigned int max_deq)
@@ -571,7 +714,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter 
*adapter,
        struct crypto_device_info *curr_dev;
        struct crypto_queue_pair_info *curr_queue;
        struct rte_crypto_op *ops[BATCH_SIZE];
-       uint16_t n, nb_deq;
+       uint16_t n, nb_deq, nb_enqueued, i;
        struct rte_cryptodev *dev;
        uint8_t cdev_id;
        uint16_t qp, dev_qps;
@@ -579,16 +722,20 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter 
*adapter,
        uint16_t num_cdev = rte_cryptodev_count();
 
        nb_deq = 0;
+       eca_ops_buffer_flush(adapter);
+
        do {
-               uint16_t queues = 0;
                done = true;
 
                for (cdev_id = adapter->next_cdev_id;
                        cdev_id < num_cdev; cdev_id++) {
+                       uint16_t queues = 0;
+
                        curr_dev = &adapter->cdevs[cdev_id];
                        dev = curr_dev->dev;
                        if (dev == NULL)
                                continue;
+
                        dev_qps = dev->data->nb_queue_pairs;
 
                        for (qp = curr_dev->next_queue_pair_id;
@@ -605,8 +752,24 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter 
*adapter,
                                        continue;
 
                                done = false;
+                               nb_enqueued = 0;
+
                                stats->crypto_deq_count += n;
-                               eca_ops_enqueue_burst(adapter, ops, n);
+
+                               if (unlikely(!adapter->ebuf.count))
+                                       nb_enqueued = eca_ops_enqueue_burst(
+                                                       adapter, ops, n);
+
+                               if (nb_enqueued == n)
+                                       goto check;
+
+                               /* Failed to enqueue events case */
+                               for (i = nb_enqueued; i < n; i++)
+                                       eca_circular_buffer_add(
+                                               &adapter->ebuf,
+                                               ops[nb_enqueued]);
+
+check:
                                nb_deq += n;
 
                                if (nb_deq > max_deq) {
@@ -751,11 +914,10 @@ eca_add_queue_pair(struct event_crypto_adapter *adapter, 
uint8_t cdev_id,
                        return -ENOMEM;
 
                qpairs = dev_info->qpairs;
-               qpairs->op_buffer = rte_zmalloc_socket(adapter->mem_name,
-                                       BATCH_SIZE *
-                                       sizeof(struct rte_crypto_op *),
-                                       0, adapter->socket_id);
-               if (!qpairs->op_buffer) {
+
+               if (eca_circular_buffer_init("eca_cdev_circular_buffer",
+                                            &qpairs->cbuf,
+                                            CRYPTO_ADAPTER_OPS_BUFFER_SZ)) {
                        rte_free(qpairs);
                        return -ENOMEM;
                }
-- 
2.6.4

Reply via email to