Move crypto ops to a circular buffer so that they are retained
when the cryptodev/eventdev is temporarily full

Signed-off-by: Ganapati Kundapura <ganapati.kundap...@intel.com>

---
v4:
* Retain the crypto ops that could not be enqueued in the circular
  buffer to process them later, and stop dequeuing from eventdev
  until all buffered crypto ops are enqueued to cryptodev.

  Check the space in the circular buffer and stop dequeuing from
  eventdev if some ops failed to flush to cdev and no space for
  another batch is available in the circular buffer.

  Re-enable dequeue from eventdev after all the ops are flushed.

v3:
* update eca_ops_buffer_flush() to flush all the crypto
  ops out of the circular buffer.
* remove freeing of failed crypto ops from eca_ops_enqueue_burst()
  and add them to the circular buffer for later processing.

v2:
* reset crypto adapter next cdev id before dequeueing from the
  next cdev
---

diff --git a/lib/eventdev/rte_event_crypto_adapter.c 
b/lib/eventdev/rte_event_crypto_adapter.c
index d840803..0faac36 100644
--- a/lib/eventdev/rte_event_crypto_adapter.c
+++ b/lib/eventdev/rte_event_crypto_adapter.c
@@ -25,11 +25,27 @@
 #define CRYPTO_ADAPTER_MEM_NAME_LEN 32
 #define CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES 100
 
+#define CRYPTO_ADAPTER_OPS_BUFFER_SZ (BATCH_SIZE + BATCH_SIZE)
+#define CRYPTO_ADAPTER_BUFFER_SZ 1024
+
 /* Flush an instance's enqueue buffers every CRYPTO_ENQ_FLUSH_THRESHOLD
  * iterations of eca_crypto_adapter_enq_run()
  */
 #define CRYPTO_ENQ_FLUSH_THRESHOLD 1024
 
+struct crypto_ops_circular_buffer {
+       /* index of head element: the next buffered op to be flushed */
+       uint16_t head;
+       /* index of tail element: the slot the next added op is stored in */
+       uint16_t tail;
+       /* number of ops currently held in the buffer */
+       uint16_t count;
+       /* capacity of the buffer, in number of op pointers */
+       uint16_t size;
+       /* Array of 'size' rte_crypto_op pointers used for batching */
+       struct rte_crypto_op **op_buffer;
+} __rte_cache_aligned;
+
 struct event_crypto_adapter {
        /* Event device identifier */
        uint8_t eventdev_id;
@@ -37,6 +53,10 @@ struct event_crypto_adapter {
        uint8_t event_port_id;
        /* Store event device's implicit release capability */
        uint8_t implicit_release_disabled;
+       /* Flag to indicate backpressure at cryptodev
+        * Stop further dequeuing events from eventdev
+        */
+       bool stop_enq_to_cryptodev;
        /* Max crypto ops processed in any service function invocation */
        uint32_t max_nb;
        /* Lock to serialize config updates with service function */
@@ -47,6 +67,8 @@ struct event_crypto_adapter {
        struct crypto_device_info *cdevs;
        /* Loop counter to flush crypto ops */
        uint16_t transmit_loop_count;
+       /* Circular buffer for batching crypto ops to eventdev */
+       struct crypto_ops_circular_buffer ebuf;
        /* Per instance stats structure */
        struct rte_event_crypto_adapter_stats crypto_stats;
        /* Configuration callback for rte_service configuration */
@@ -93,10 +115,8 @@ struct crypto_device_info {
 struct crypto_queue_pair_info {
        /* Set to indicate queue pair is enabled */
        bool qp_enabled;
-       /* Pointer to hold rte_crypto_ops for batching */
-       struct rte_crypto_op **op_buffer;
-       /* No of crypto ops accumulated */
-       uint8_t len;
+       /* Circular buffer for batching crypto ops to cdev */
+       struct crypto_ops_circular_buffer cbuf;
 } __rte_cache_aligned;
 
 static struct event_crypto_adapter **event_crypto_adapter;
@@ -141,6 +161,84 @@ eca_init(void)
        return 0;
 }
 
+/* Return true when the buffer holds at least BATCH_SIZE ops. */
+static inline bool
+eca_circular_buffer_batch_ready(struct crypto_ops_circular_buffer *bufp)
+{
+       return bufp->count >= BATCH_SIZE;
+}
+
+/* Return true when at least BATCH_SIZE free slots remain in the buffer. */
+static inline bool
+eca_circular_buffer_space_for_batch(struct crypto_ops_circular_buffer *bufp)
+{
+       return (bufp->size - bufp->count) >= BATCH_SIZE;
+}
+
+/* Release the op pointer array of the buffer.
+ * Only the array itself is freed; ops still referenced by it are not.
+ */
+static inline void
+eca_circular_buffer_free(struct crypto_ops_circular_buffer *bufp)
+{
+       rte_free(bufp->op_buffer);
+}
+
+/* Allocate the op pointer array of a circular buffer.
+ *
+ * name: allocation tag passed to rte_zmalloc()
+ * bufp: buffer to set up
+ * sz:   capacity, in number of op pointers
+ *
+ * Only op_buffer and size are written here; head, tail and count are
+ * left untouched.
+ * Returns 0 on success, -ENOMEM when the allocation fails.
+ */
+static inline int
+eca_circular_buffer_init(const char *name,
+                        struct crypto_ops_circular_buffer *bufp,
+                        uint16_t sz)
+{
+       bufp->op_buffer = rte_zmalloc(name,
+                                     sizeof(struct rte_crypto_op *) * sz,
+                                     0);
+       if (bufp->op_buffer == NULL)
+               return -ENOMEM;
+
+       bufp->size = sz;
+       return 0;
+}
+
+/* Append one op at the tail of the circular buffer.
+ *
+ * No check for a full buffer is made here, so adding to a full buffer
+ * overwrites the slot at head; callers have to make sure space is left.
+ * Always returns 0.
+ */
+static inline int
+eca_circular_buffer_add(struct crypto_ops_circular_buffer *bufp,
+                       struct rte_crypto_op *op)
+{
+       uint16_t *tailp = &bufp->tail;
+
+       bufp->op_buffer[*tailp] = op;
+       /* circular buffer, go round */
+       *tailp = (*tailp + 1) % bufp->size;
+       bufp->count++;
+
+       return 0;
+}
+
+/* Flush one contiguous run of buffered ops, starting at head, to the
+ * given cryptodev queue pair.
+ *
+ * When the stored ops wrap around the end of the array, only the run
+ * from head up to the end of the array is submitted; the remainder is
+ * left for a later call.
+ *
+ * nb_ops_flushed is set to the number of ops the cryptodev accepted.
+ * Returns 0 if every op of the run was accepted (or the buffer was
+ * empty), -1 otherwise.
+ */
+static inline int
+eca_circular_buffer_flush_to_cdev(struct crypto_ops_circular_buffer *bufp,
+                                 uint8_t cdev_id, uint16_t qp_id,
+                                 uint16_t *nb_ops_flushed)
+{
+       uint16_t n = 0;
+       uint16_t *headp = &bufp->head;
+       uint16_t *tailp = &bufp->tail;
+       struct rte_crypto_op **ops = bufp->op_buffer;
+
+       /* head == tail means either empty or completely full; use count
+        * to tell the two apart so that a full buffer is still flushed.
+        */
+       if (bufp->count == 0) {
+               *nb_ops_flushed = 0;
+               return 0;  /* buffer empty */
+       }
+
+       if (*tailp > *headp)
+               n = *tailp - *headp;
+       else
+               /* wrapped around, or full with head == tail */
+               n = bufp->size - *headp;
+
+       *nb_ops_flushed = rte_cryptodev_enqueue_burst(cdev_id, qp_id,
+                                                     &ops[*headp], n);
+       bufp->count -= *nb_ops_flushed;
+       if (!bufp->count) {
+               /* drained: restart from the beginning of the array */
+               *headp = 0;
+               *tailp = 0;
+       } else
+               *headp = (*headp + *nb_ops_flushed) % bufp->size;
+
+       return *nb_ops_flushed == n ? 0 : -1;
+}
+
 static inline struct event_crypto_adapter *
 eca_id_to_adapter(uint8_t id)
 {
@@ -237,10 +335,19 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t 
dev_id,
                return -ENOMEM;
        }
 
+       if (eca_circular_buffer_init("eca_edev_circular_buffer",
+                                    &adapter->ebuf,
+                                    CRYPTO_ADAPTER_BUFFER_SZ)) {
+               RTE_EDEV_LOG_ERR("Failed to get memory for eventdev buffer");
+               rte_free(adapter);
+               return -ENOMEM;
+       }
+
        ret = rte_event_dev_info_get(dev_id, &dev_info);
        if (ret < 0) {
                RTE_EDEV_LOG_ERR("Failed to get info for eventdev %d: %s!",
                                 dev_id, dev_info.driver_name);
+               eca_circular_buffer_free(&adapter->ebuf);
                rte_free(adapter);
                return ret;
        }
@@ -259,6 +366,7 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t 
dev_id,
                                        socket_id);
        if (adapter->cdevs == NULL) {
                RTE_EDEV_LOG_ERR("Failed to get mem for crypto devices\n");
+               eca_circular_buffer_free(&adapter->ebuf);
                rte_free(adapter);
                return -ENOMEM;
        }
@@ -337,10 +445,10 @@ eca_enq_to_cryptodev(struct event_crypto_adapter 
*adapter, struct rte_event *ev,
        struct crypto_queue_pair_info *qp_info = NULL;
        struct rte_crypto_op *crypto_op;
        unsigned int i, n;
-       uint16_t qp_id, len, ret;
+       uint16_t qp_id, nb_enqueued = 0;
        uint8_t cdev_id;
+       int ret;
 
-       len = 0;
        ret = 0;
        n = 0;
        stats->event_deq_count += cnt;
@@ -366,9 +474,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, 
struct rte_event *ev,
                                rte_crypto_op_free(crypto_op);
                                continue;
                        }
-                       len = qp_info->len;
-                       qp_info->op_buffer[len] = crypto_op;
-                       len++;
+                       eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
                } else if (crypto_op->sess_type == RTE_CRYPTO_OP_SESSIONLESS &&
                                crypto_op->private_data_offset) {
                        m_data = (union rte_event_crypto_metadata *)
@@ -382,87 +488,91 @@ eca_enq_to_cryptodev(struct event_crypto_adapter 
*adapter, struct rte_event *ev,
                                rte_crypto_op_free(crypto_op);
                                continue;
                        }
-                       len = qp_info->len;
-                       qp_info->op_buffer[len] = crypto_op;
-                       len++;
+                       eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
                } else {
                        rte_pktmbuf_free(crypto_op->sym->m_src);
                        rte_crypto_op_free(crypto_op);
                        continue;
                }
 
-               if (len == BATCH_SIZE) {
-                       struct rte_crypto_op **op_buffer = qp_info->op_buffer;
-                       ret = rte_cryptodev_enqueue_burst(cdev_id,
-                                                         qp_id,
-                                                         op_buffer,
-                                                         BATCH_SIZE);
-
-                       stats->crypto_enq_count += ret;
-
-                       while (ret < len) {
-                               struct rte_crypto_op *op;
-                               op = op_buffer[ret++];
-                               stats->crypto_enq_fail++;
-                               rte_pktmbuf_free(op->sym->m_src);
-                               rte_crypto_op_free(op);
-                       }
-
-                       len = 0;
+               if (eca_circular_buffer_batch_ready(&qp_info->cbuf)) {
+                       ret = eca_circular_buffer_flush_to_cdev(&qp_info->cbuf,
+                                                               cdev_id,
+                                                               qp_id,
+                                                               &nb_enqueued);
+                       /**
+                        * If some crypto ops failed to flush to cdev and
+                        * space for another batch is not available, stop
+                        * dequeue from eventdev momentarily
+                        */
+                       if (unlikely(ret < 0 &&
+                               !eca_circular_buffer_space_for_batch(
+                                                       &qp_info->cbuf)))
+                               adapter->stop_enq_to_cryptodev = true;
                }
 
-               if (qp_info)
-                       qp_info->len = len;
-               n += ret;
+               stats->crypto_enq_count += nb_enqueued;
+               n += nb_enqueued;
        }
 
        return n;
 }
 
 static unsigned int
-eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
+eca_crypto_cdev_flush(struct event_crypto_adapter *adapter,
+                     uint8_t cdev_id, uint16_t *nb_ops_flushed)
 {
-       struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
        struct crypto_device_info *curr_dev;
        struct crypto_queue_pair_info *curr_queue;
-       struct rte_crypto_op **op_buffer;
        struct rte_cryptodev *dev;
-       uint8_t cdev_id;
+       uint16_t nb = 0, nb_enqueued = 0;
        uint16_t qp;
-       uint16_t ret;
-       uint16_t num_cdev = rte_cryptodev_count();
 
-       ret = 0;
-       for (cdev_id = 0; cdev_id < num_cdev; cdev_id++) {
-               curr_dev = &adapter->cdevs[cdev_id];
-               dev = curr_dev->dev;
-               if (dev == NULL)
-                       continue;
-               for (qp = 0; qp < dev->data->nb_queue_pairs; qp++) {
+       curr_dev = &adapter->cdevs[cdev_id];
+       if (unlikely(curr_dev == NULL))
+               return 0;
 
-                       curr_queue = &curr_dev->qpairs[qp];
-                       if (!curr_queue->qp_enabled)
-                               continue;
+       dev = rte_cryptodev_pmd_get_dev(cdev_id);
+       for (qp = 0; qp < dev->data->nb_queue_pairs; qp++) {
 
-                       op_buffer = curr_queue->op_buffer;
-                       ret = rte_cryptodev_enqueue_burst(cdev_id,
-                                                         qp,
-                                                         op_buffer,
-                                                         curr_queue->len);
-                       stats->crypto_enq_count += ret;
-
-                       while (ret < curr_queue->len) {
-                               struct rte_crypto_op *op;
-                               op = op_buffer[ret++];
-                               stats->crypto_enq_fail++;
-                               rte_pktmbuf_free(op->sym->m_src);
-                               rte_crypto_op_free(op);
-                       }
-                       curr_queue->len = 0;
-               }
+               curr_queue = &curr_dev->qpairs[qp];
+               if (unlikely(curr_queue == NULL || !curr_queue->qp_enabled))
+                       continue;
+
+               eca_circular_buffer_flush_to_cdev(&curr_queue->cbuf,
+                                                 cdev_id,
+                                                 qp,
+                                                 &nb_enqueued);
+               *nb_ops_flushed += curr_queue->cbuf.count;
+               nb += nb_enqueued;
        }
 
-       return ret;
+       return nb;
+}
+
+/* Try to flush the buffered crypto ops of every cryptodev counted by
+ * rte_cryptodev_count() and add the result to the enqueue statistics.
+ *
+ * Clears adapter->stop_enq_to_cryptodev when no op is left buffered.
+ * Returns the number of ops enqueued to the cryptodevs by this call.
+ */
+static unsigned int
+eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
+{
+       struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
+       uint8_t cdev_id;
+       uint16_t nb_enqueued = 0;
+       /* Despite its name, eca_crypto_cdev_flush() adds to this the
+        * number of ops still buffered after the flush attempt.
+        */
+       uint16_t nb_ops_flushed = 0;
+       uint16_t num_cdev = rte_cryptodev_count();
+
+       for (cdev_id = 0; cdev_id < num_cdev; cdev_id++)
+               nb_enqueued += eca_crypto_cdev_flush(adapter,
+                                                   cdev_id,
+                                                   &nb_ops_flushed);
+       /**
+        * Enable dequeue from eventdev if all ops from circular
+        * buffer flushed to cdev
+        */
+       if (!nb_ops_flushed)
+               adapter->stop_enq_to_cryptodev = false;
+
+       stats->crypto_enq_count += nb_enqueued;
+
+       return nb_enqueued;
 }
 
 static int
@@ -480,6 +590,13 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter 
*adapter,
        if (adapter->mode == RTE_EVENT_CRYPTO_ADAPTER_OP_NEW)
                return 0;
 
+       if (adapter->stop_enq_to_cryptodev) {
+               nb_enqueued += eca_crypto_enq_flush(adapter);
+
+               if (adapter->stop_enq_to_cryptodev)
+                       goto skip_event_dequeue_burst;
+       }
+
        for (nb_enq = 0; nb_enq < max_enq; nb_enq += n) {
                stats->event_poll_count++;
                n = rte_event_dequeue_burst(event_dev_id,
@@ -491,6 +608,8 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter 
*adapter,
                nb_enqueued += eca_enq_to_cryptodev(adapter, ev, n);
        }
 
+skip_event_dequeue_burst:
+
        if ((++adapter->transmit_loop_count &
                (CRYPTO_ENQ_FLUSH_THRESHOLD - 1)) == 0) {
                nb_enqueued += eca_crypto_enq_flush(adapter);
@@ -499,9 +618,9 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter 
*adapter,
        return nb_enqueued;
 }
 
-static inline void
+static inline uint16_t
 eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
-                     struct rte_crypto_op **ops, uint16_t num)
+                 struct rte_crypto_op **ops, uint16_t num)
 {
        struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
        union rte_event_crypto_metadata *m_data = NULL;
@@ -518,6 +637,8 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
        num = RTE_MIN(num, BATCH_SIZE);
        for (i = 0; i < num; i++) {
                struct rte_event *ev = &events[nb_ev++];
+
+               m_data = NULL;
                if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) {
                        m_data = rte_cryptodev_sym_session_get_user_data(
                                        ops[i]->sym->session);
@@ -548,21 +669,56 @@ eca_ops_enqueue_burst(struct event_crypto_adapter 
*adapter,
                                                  event_port_id,
                                                  &events[nb_enqueued],
                                                  nb_ev - nb_enqueued);
+
        } while (retry++ < CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES &&
                 nb_enqueued < nb_ev);
 
-       /* Free mbufs and rte_crypto_ops for failed events */
-       for (i = nb_enqueued; i < nb_ev; i++) {
-               struct rte_crypto_op *op = events[i].event_ptr;
-               rte_pktmbuf_free(op->sym->m_src);
-               rte_crypto_op_free(op);
-       }
-
        stats->event_enq_fail_count += nb_ev - nb_enqueued;
        stats->event_enq_count += nb_enqueued;
        stats->event_enq_retry_count += retry - 1;
+
+       return nb_enqueued;
+}
+
+/* Flush one contiguous run of buffered ops, starting at head, to the
+ * eventdev through eca_ops_enqueue_burst().
+ *
+ * When the stored ops wrap around the end of the array, only the run
+ * from head up to the end of the array is offered; the remainder is
+ * left for a later call.
+ *
+ * Returns 0 when the buffer is empty after the call, 1 when ops remain.
+ */
+static int
+eca_circular_buffer_flush_to_evdev(struct event_crypto_adapter *adapter,
+                                  struct crypto_ops_circular_buffer *bufp)
+{
+       uint16_t n = 0, nb_ops_flushed;
+       uint16_t *headp = &bufp->head;
+       uint16_t *tailp = &bufp->tail;
+       struct rte_crypto_op **ops = bufp->op_buffer;
+
+       /* head == tail means either empty or completely full; use count
+        * to tell the two apart so that a full buffer is still flushed.
+        */
+       if (bufp->count == 0)
+               return 0;  /* buffer empty */
+
+       if (*tailp > *headp)
+               n = *tailp - *headp;
+       else
+               /* wrapped around, or full with head == tail */
+               n = bufp->size - *headp;
+
+       /* Submit from the head slot, not from the start of the array */
+       nb_ops_flushed = eca_ops_enqueue_burst(adapter, &ops[*headp], n);
+       bufp->count -= nb_ops_flushed;
+       if (!bufp->count) {
+               /* drained: restart from the beginning of the array */
+               *headp = 0;
+               *tailp = 0;
+               return 0;  /* buffer empty */
+       }
+
+       *headp = (*headp + nb_ops_flushed) % bufp->size;
+       return 1;
 }
 
+
+/* Flush the adapter's eventdev circular buffer (ebuf).
+ *
+ * Returns at once when the buffer is empty, otherwise keeps calling
+ * eca_circular_buffer_flush_to_evdev() until it reports an empty buffer.
+ * NOTE(review): the loop has no other exit, so it keeps spinning for as
+ * long as ops remain buffered - confirm this is acceptable when the
+ * eventdev stays full.
+ */
+static void
+eca_ops_buffer_flush(struct event_crypto_adapter *adapter)
+{
+       if (adapter->ebuf.count == 0)
+               return;
+
+       while (eca_circular_buffer_flush_to_evdev(adapter,
+                                                 &adapter->ebuf))
+               ;
+}
 static inline unsigned int
 eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
                           unsigned int max_deq)
@@ -571,7 +727,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter 
*adapter,
        struct crypto_device_info *curr_dev;
        struct crypto_queue_pair_info *curr_queue;
        struct rte_crypto_op *ops[BATCH_SIZE];
-       uint16_t n, nb_deq;
+       uint16_t n, nb_deq, nb_enqueued, i;
        struct rte_cryptodev *dev;
        uint8_t cdev_id;
        uint16_t qp, dev_qps;
@@ -579,16 +735,20 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter 
*adapter,
        uint16_t num_cdev = rte_cryptodev_count();
 
        nb_deq = 0;
+       eca_ops_buffer_flush(adapter);
+
        do {
-               uint16_t queues = 0;
                done = true;
 
                for (cdev_id = adapter->next_cdev_id;
                        cdev_id < num_cdev; cdev_id++) {
+                       uint16_t queues = 0;
+
                        curr_dev = &adapter->cdevs[cdev_id];
                        dev = curr_dev->dev;
                        if (dev == NULL)
                                continue;
+
                        dev_qps = dev->data->nb_queue_pairs;
 
                        for (qp = curr_dev->next_queue_pair_id;
@@ -596,7 +756,8 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter 
*adapter,
                                queues++) {
 
                                curr_queue = &curr_dev->qpairs[qp];
-                               if (!curr_queue->qp_enabled)
+                               if (curr_queue == NULL ||
+                                   !curr_queue->qp_enabled)
                                        continue;
 
                                n = rte_cryptodev_dequeue_burst(cdev_id, qp,
@@ -605,11 +766,27 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter 
*adapter,
                                        continue;
 
                                done = false;
+                               nb_enqueued = 0;
+
                                stats->crypto_deq_count += n;
-                               eca_ops_enqueue_burst(adapter, ops, n);
+
+                               if (unlikely(!adapter->ebuf.count))
+                                       nb_enqueued = eca_ops_enqueue_burst(
+                                                       adapter, ops, n);
+
+                               if (nb_enqueued == n)
+                                       goto check;
+
+                               /* Failed to enqueue events case */
+                               for (i = nb_enqueued; i < n; i++)
+                                       eca_circular_buffer_add(
+                                               &adapter->ebuf,
+                                               ops[nb_enqueued]);
+
+check:
                                nb_deq += n;
 
-                               if (nb_deq > max_deq) {
+                               if (nb_deq >= max_deq) {
                                        if ((qp + 1) == dev_qps) {
                                                adapter->next_cdev_id =
                                                        (cdev_id + 1)
@@ -622,6 +799,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter 
*adapter,
                                }
                        }
                }
+               adapter->next_cdev_id = 0;
        } while (done == false);
        return nb_deq;
 }
@@ -751,11 +929,12 @@ eca_add_queue_pair(struct event_crypto_adapter *adapter, 
uint8_t cdev_id,
                        return -ENOMEM;
 
                qpairs = dev_info->qpairs;
-               qpairs->op_buffer = rte_zmalloc_socket(adapter->mem_name,
-                                       BATCH_SIZE *
-                                       sizeof(struct rte_crypto_op *),
-                                       0, adapter->socket_id);
-               if (!qpairs->op_buffer) {
+
+               if (eca_circular_buffer_init("eca_cdev_circular_buffer",
+                                            &qpairs->cbuf,
+                                            CRYPTO_ADAPTER_OPS_BUFFER_SZ)) {
+                       RTE_EDEV_LOG_ERR("Failed to get memory for cryptodev "
+                                        "buffer");
                        rte_free(qpairs);
                        return -ENOMEM;
                }
-- 
2.6.4

Reply via email to