Hi Ganapati,

> -----Original Message-----
> From: Kundapura, Ganapati <ganapati.kundap...@intel.com>
> Sent: Tuesday, January 11, 2022 4:07 PM
> To: jerinjac...@gmail.com; Jayatheerthan, Jay <jay.jayatheert...@intel.com>;
> dev@dpdk.org
> Cc: Gujjar, Abhinandan S <abhinandan.guj...@intel.com>
> Subject: [PATCH v3 1/2] eventdev/crypto_adapter: move crypto ops to circular
> buffer
> 
> Move crypto ops to circular buffer to retain crypto ops when
> cryptodev/eventdev are temporarily full
> 
> ---
> v3:
> * update eca_ops_buffer_flush() to flush out all the crypto
>   ops out of circular buffer.
> * remove freeing of failed crypto ops from eca_ops_enqueue_burst()
>   and add to cirular buffer for later processing.
> 
> v2:
> * reset cryptp adapter next cdev id before dequeueing from the
Typo: cryptp -> crypto
>   next cdev
> ---
> 
> Signed-off-by: Ganapati Kundapura <ganapati.kundap...@intel.com>
I don't see the Signed-off-by tag in the commit message after applying the patch. Could you take a look?
commit 18d9c3b1728f325dba5fe5dbb51df2366b70527a
Author: Ganapati Kundapura <ganapati.kundap...@intel.com>
Date:   Tue Jan 11 04:36:30 2022 -0600

    eventdev/crypto_adapter: move crypto ops to circular buffer
    
    Move crypto ops to circular buffer to retain crypto
    ops when cryptodev/eventdev are temporarily full


> 
> diff --git a/lib/eventdev/rte_event_crypto_adapter.c
> b/lib/eventdev/rte_event_crypto_adapter.c
> index d840803..9086368 100644
> --- a/lib/eventdev/rte_event_crypto_adapter.c
> +++ b/lib/eventdev/rte_event_crypto_adapter.c
> @@ -25,11 +25,27 @@
>  #define CRYPTO_ADAPTER_MEM_NAME_LEN 32
>  #define CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES 100
> 
> +#define CRYPTO_ADAPTER_OPS_BUFFER_SZ (BATCH_SIZE + BATCH_SIZE)
> #define
> +CRYPTO_ADAPTER_BUFFER_SZ 1024
> +
>  /* Flush an instance's enqueue buffers every CRYPTO_ENQ_FLUSH_THRESHOLD
>   * iterations of eca_crypto_adapter_enq_run()
>   */
>  #define CRYPTO_ENQ_FLUSH_THRESHOLD 1024
> 
> +struct crypto_ops_circular_buffer {
> +     /* index of head element in circular buffer */
> +     uint16_t head;
> +     /* index of tail element in circular buffer */
> +     uint16_t tail;
> +     /* number elements in buffer */
Typo: number elements -> number of elements
> +     uint16_t count;
> +     /* size of circular buffer */
> +     uint16_t size;
> +     /* Pointer to hold rte_crypto_ops for batching */
> +     struct rte_crypto_op **op_buffer;
> +} __rte_cache_aligned;
> +
>  struct event_crypto_adapter {
>       /* Event device identifier */
>       uint8_t eventdev_id;
> @@ -47,6 +63,8 @@ struct event_crypto_adapter {
>       struct crypto_device_info *cdevs;
>       /* Loop counter to flush crypto ops */
>       uint16_t transmit_loop_count;
> +     /* Circular buffer for batching crypto ops to eventdev */
> +     struct crypto_ops_circular_buffer ebuf;
>       /* Per instance stats structure */
>       struct rte_event_crypto_adapter_stats crypto_stats;
>       /* Configuration callback for rte_service configuration */ @@ -93,8
> +111,8 @@ struct crypto_device_info {  struct crypto_queue_pair_info {
>       /* Set to indicate queue pair is enabled */
>       bool qp_enabled;
> -     /* Pointer to hold rte_crypto_ops for batching */
> -     struct rte_crypto_op **op_buffer;
> +     /* Circular buffer for batching crypto ops to cdev */
> +     struct crypto_ops_circular_buffer cbuf;
>       /* No of crypto ops accumulated */
>       uint8_t len;
>  } __rte_cache_aligned;
> @@ -141,6 +159,77 @@ eca_init(void)
>       return 0;
>  }
> 
> +static inline bool
> +eca_circular_buffer_batch_ready(struct crypto_ops_circular_buffer
> +*bufp) {
> +     return bufp->count >= BATCH_SIZE;
> +}
> +
> +static inline void
> +eca_circular_buffer_free(struct crypto_ops_circular_buffer *bufp) {
> +     rte_free(bufp->op_buffer);
> +}
> +
> +static inline int
> +eca_circular_buffer_init(const char *name,
> +                      struct crypto_ops_circular_buffer *bufp,
> +                      uint16_t sz)
> +{
> +     bufp->op_buffer = rte_zmalloc(name,
> +                                   sizeof(struct rte_crypto_op *) * sz,
> +                                   0);
> +     if (bufp->op_buffer == NULL)
> +             return -ENOMEM;
> +
> +     bufp->size = sz;
> +     return 0;
> +}
> +
> +static inline int
> +eca_circular_buffer_add(struct crypto_ops_circular_buffer *bufp,
> +                     struct rte_crypto_op *op)
> +{
> +     uint16_t *tailp = &bufp->tail;
> +
> +     bufp->op_buffer[*tailp] = op;
> +     *tailp = (*tailp + 1) % bufp->size;
Please add a comment explaining that the modulo handles the buffer rollover (wrap-around) condition.
> +     bufp->count++;
> +
> +     return 0;
> +}
> +
> +static inline int
> +eca_circular_buffer_flush_to_cdev(struct crypto_ops_circular_buffer *bufp,
> +                               uint8_t cdev_id, uint16_t qp_id,
> +                               uint16_t *nb_ops_flushed)
This function returns a value, but the callers do not check it!
> +{
> +     uint16_t n = 0;
> +     uint16_t *headp = &bufp->head;
> +     uint16_t *tailp = &bufp->tail;
> +     struct rte_crypto_op **ops = bufp->op_buffer;
> +
> +     if (*tailp > *headp)
> +             n = *tailp - *headp;
> +     else if (*tailp < *headp)
> +             n = bufp->size - *headp;
> +     else {
> +             *nb_ops_flushed = 0;
> +             return 0;  /* buffer empty */
> +     }
> +
> +     *nb_ops_flushed = rte_cryptodev_enqueue_burst(cdev_id, qp_id,
> +                                                   &ops[*headp], n);
> +     bufp->count -= *nb_ops_flushed;
> +     if (!bufp->count) {
> +             *headp = 0;
> +             *tailp = 0;
> +     } else
> +             *headp = (*headp + *nb_ops_flushed) % bufp->size;
> +
> +     return *nb_ops_flushed == n ? 0 : -1;
> +}
> +
>  static inline struct event_crypto_adapter *  eca_id_to_adapter(uint8_t id)  {
> @@ -237,10 +326,19 @@ rte_event_crypto_adapter_create_ext(uint8_t id,
> uint8_t dev_id,
>               return -ENOMEM;
>       }
> 
> +     if (eca_circular_buffer_init("eca_edev_circular_buffer",
> +                                  &adapter->ebuf,
> +                                  CRYPTO_ADAPTER_BUFFER_SZ)) {
> +             RTE_EDEV_LOG_ERR("Failed to get mem for edev buffer");
mem -> memory, edev -> eventdev.
> +             rte_free(adapter);
> +             return -ENOMEM;
> +     }
> +
>       ret = rte_event_dev_info_get(dev_id, &dev_info);
>       if (ret < 0) {
>               RTE_EDEV_LOG_ERR("Failed to get info for eventdev %d: %s!",
>                                dev_id, dev_info.driver_name);
> +             eca_circular_buffer_free(&adapter->ebuf);
>               rte_free(adapter);
>               return ret;
>       }
> @@ -259,6 +357,7 @@ rte_event_crypto_adapter_create_ext(uint8_t id,
> uint8_t dev_id,
>                                       socket_id);
>       if (adapter->cdevs == NULL) {
>               RTE_EDEV_LOG_ERR("Failed to get mem for crypto devices\n");
> +             eca_circular_buffer_free(&adapter->ebuf);
>               rte_free(adapter);
>               return -ENOMEM;
>       }
> @@ -337,11 +436,10 @@ eca_enq_to_cryptodev(struct event_crypto_adapter
> *adapter, struct rte_event *ev,
>       struct crypto_queue_pair_info *qp_info = NULL;
>       struct rte_crypto_op *crypto_op;
>       unsigned int i, n;
> -     uint16_t qp_id, len, ret;
> +     uint16_t qp_id, len, nb_enqueued = 0;
>       uint8_t cdev_id;
> 
>       len = 0;
> -     ret = 0;
>       n = 0;
>       stats->event_deq_count += cnt;
> 
> @@ -367,7 +465,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter
> *adapter, struct rte_event *ev,
>                               continue;
>                       }
>                       len = qp_info->len;
> -                     qp_info->op_buffer[len] = crypto_op;
> +                     eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
>                       len++;
>               } else if (crypto_op->sess_type ==
> RTE_CRYPTO_OP_SESSIONLESS &&
>                               crypto_op->private_data_offset) {
> @@ -383,7 +481,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter
> *adapter, struct rte_event *ev,
>                               continue;
>                       }
>                       len = qp_info->len;
> -                     qp_info->op_buffer[len] = crypto_op;
> +                     eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
>                       len++;
>               } else {
>                       rte_pktmbuf_free(crypto_op->sym->m_src);
> @@ -391,18 +489,17 @@ eca_enq_to_cryptodev(struct event_crypto_adapter
> *adapter, struct rte_event *ev,
>                       continue;
>               }
> 
> -             if (len == BATCH_SIZE) {
> -                     struct rte_crypto_op **op_buffer = qp_info-
> >op_buffer;
> -                     ret = rte_cryptodev_enqueue_burst(cdev_id,
> +             if (eca_circular_buffer_batch_ready(&qp_info->cbuf)) {
> +                     eca_circular_buffer_flush_to_cdev(&qp_info->cbuf,
> +                                                       cdev_id,
>                                                         qp_id,
> -                                                       op_buffer,
> -                                                       BATCH_SIZE);
> +                                                       &nb_enqueued);
> +                     stats->crypto_enq_count += nb_enqueued;
> +                     n += nb_enqueued;
> 
> -                     stats->crypto_enq_count += ret;
> -
> -                     while (ret < len) {
> +                     while (nb_enqueued < len) {
>                               struct rte_crypto_op *op;
> -                             op = op_buffer[ret++];
> +                             op = qp_info-
> >cbuf.op_buffer[nb_enqueued++];
>                               stats->crypto_enq_fail++;
>                               rte_pktmbuf_free(op->sym->m_src);
>                               rte_crypto_op_free(op);
I'm not sure why you are freeing the ops that were not enqueued to the cryptodev.
Isn't the goal of the patch to retain those non-enqueued crypto_ops
temporarily and retry them later?

> @@ -413,7 +510,6 @@ eca_enq_to_cryptodev(struct event_crypto_adapter
> *adapter, struct rte_event *ev,
> 
>               if (qp_info)
>                       qp_info->len = len;
> -             n += ret;
>       }
> 
>       return n;
> @@ -425,14 +521,12 @@ eca_crypto_enq_flush(struct event_crypto_adapter
> *adapter)
>       struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
>       struct crypto_device_info *curr_dev;
>       struct crypto_queue_pair_info *curr_queue;
> -     struct rte_crypto_op **op_buffer;
>       struct rte_cryptodev *dev;
>       uint8_t cdev_id;
>       uint16_t qp;
> -     uint16_t ret;
> +     uint16_t nb_enqueued = 0, nb = 0;
>       uint16_t num_cdev = rte_cryptodev_count();
> 
> -     ret = 0;
>       for (cdev_id = 0; cdev_id < num_cdev; cdev_id++) {
>               curr_dev = &adapter->cdevs[cdev_id];
>               dev = curr_dev->dev;
> @@ -444,16 +538,17 @@ eca_crypto_enq_flush(struct event_crypto_adapter
> *adapter)
>                       if (!curr_queue->qp_enabled)
>                               continue;
> 
> -                     op_buffer = curr_queue->op_buffer;
> -                     ret = rte_cryptodev_enqueue_burst(cdev_id,
> +                     eca_circular_buffer_flush_to_cdev(&curr_queue->cbuf,
> +                                                       cdev_id,
>                                                         qp,
> -                                                       op_buffer,
> -                                                       curr_queue->len);
> -                     stats->crypto_enq_count += ret;
> +                                                       &nb_enqueued);
> 
> -                     while (ret < curr_queue->len) {
> +                     stats->crypto_enq_count += nb_enqueued;
> +                     nb += nb_enqueued;
> +
> +                     while (nb_enqueued < curr_queue->len) {
>                               struct rte_crypto_op *op;
> -                             op = op_buffer[ret++];
> +                             op = curr_queue-
> >cbuf.op_buffer[nb_enqueued++];
>                               stats->crypto_enq_fail++;
>                               rte_pktmbuf_free(op->sym->m_src);
>                               rte_crypto_op_free(op);
> @@ -462,7 +557,7 @@ eca_crypto_enq_flush(struct event_crypto_adapter
> *adapter)
>               }
>       }
> 
> -     return ret;
> +     return nb;
>  }
> 
>  static int
> @@ -499,9 +594,9 @@ eca_crypto_adapter_enq_run(struct
> event_crypto_adapter *adapter,
>       return nb_enqueued;
>  }
> 
> -static inline void
> +static inline uint16_t
>  eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
> -                   struct rte_crypto_op **ops, uint16_t num)
> +               struct rte_crypto_op **ops, uint16_t num)
>  {
>       struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
>       union rte_event_crypto_metadata *m_data = NULL; @@ -518,6 +613,8
> @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
>       num = RTE_MIN(num, BATCH_SIZE);
>       for (i = 0; i < num; i++) {
>               struct rte_event *ev = &events[nb_ev++];
> +
> +             m_data = NULL;
>               if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) {
>                       m_data = rte_cryptodev_sym_session_get_user_data(
>                                       ops[i]->sym->session);
> @@ -548,21 +645,58 @@ eca_ops_enqueue_burst(struct event_crypto_adapter
> *adapter,
>                                                 event_port_id,
>                                                 &events[nb_enqueued],
>                                                 nb_ev - nb_enqueued);
> +
>       } while (retry++ < CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES &&
>                nb_enqueued < nb_ev);
> 
> -     /* Free mbufs and rte_crypto_ops for failed events */
> -     for (i = nb_enqueued; i < nb_ev; i++) {
> -             struct rte_crypto_op *op = events[i].event_ptr;
> -             rte_pktmbuf_free(op->sym->m_src);
> -             rte_crypto_op_free(op);
> -     }
> -
>       stats->event_enq_fail_count += nb_ev - nb_enqueued;
>       stats->event_enq_count += nb_enqueued;
>       stats->event_enq_retry_count += retry - 1;
> +
> +     return nb_enqueued;
>  }
> 
> +static int
> +eca_circular_buffer_flush_to_evdev(struct event_crypto_adapter *adapter,
> +                                struct crypto_ops_circular_buffer *bufp) {
> +     uint16_t n = 0, nb_ops_flushed;
> +     uint16_t *headp = &bufp->head;
> +     uint16_t *tailp = &bufp->tail;
> +     struct rte_crypto_op **ops = bufp->op_buffer;
> +
> +     if (*tailp > *headp)
> +             n = *tailp - *headp;
> +     else if (*tailp < *headp)
> +             n = bufp->size - *headp;
> +     else
> +             return 0;  /* buffer empty */
> +
> +     nb_ops_flushed =  eca_ops_enqueue_burst(adapter, ops, n);
> +     bufp->count -= nb_ops_flushed;
> +     if (!bufp->count) {
> +             *headp = 0;
> +             *tailp = 0;
> +
The extra blank line above is not required.
> +             return 0;  /* buffer empty */
> +     }
> +
> +     *headp = (*headp + nb_ops_flushed) % bufp->size;
> +
The extra blank line above is not required.
> +     return 1;
> +}
> +
> +
> +static void
> +eca_ops_buffer_flush(struct event_crypto_adapter *adapter) {
> +     if (adapter->ebuf.count == 0)
> +             return;
> +
> +     while (eca_circular_buffer_flush_to_evdev(adapter,
> +                                               &adapter->ebuf))
> +             ;
> +}
>  static inline unsigned int
>  eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
>                          unsigned int max_deq)
> @@ -571,7 +705,7 @@ eca_crypto_adapter_deq_run(struct
> event_crypto_adapter *adapter,
>       struct crypto_device_info *curr_dev;
>       struct crypto_queue_pair_info *curr_queue;
>       struct rte_crypto_op *ops[BATCH_SIZE];
> -     uint16_t n, nb_deq;
> +     uint16_t n, nb_deq, nb_enqueued, i;
>       struct rte_cryptodev *dev;
>       uint8_t cdev_id;
>       uint16_t qp, dev_qps;
> @@ -579,16 +713,20 @@ eca_crypto_adapter_deq_run(struct
> event_crypto_adapter *adapter,
>       uint16_t num_cdev = rte_cryptodev_count();
> 
>       nb_deq = 0;
> +     eca_ops_buffer_flush(adapter);
> +
>       do {
> -             uint16_t queues = 0;
>               done = true;
> 
>               for (cdev_id = adapter->next_cdev_id;
>                       cdev_id < num_cdev; cdev_id++) {
> +                     uint16_t queues = 0;
> +
>                       curr_dev = &adapter->cdevs[cdev_id];
>                       dev = curr_dev->dev;
>                       if (dev == NULL)
>                               continue;
> +
>                       dev_qps = dev->data->nb_queue_pairs;
> 
>                       for (qp = curr_dev->next_queue_pair_id; @@ -596,7
> +734,8 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter
> *adapter,
>                               queues++) {
> 
>                               curr_queue = &curr_dev->qpairs[qp];
> -                             if (!curr_queue->qp_enabled)
> +                             if (curr_queue == NULL ||
> +                                 !curr_queue->qp_enabled)
>                                       continue;
> 
>                               n = rte_cryptodev_dequeue_burst(cdev_id, qp,
> @@ -605,11 +744,27 @@ eca_crypto_adapter_deq_run(struct
> event_crypto_adapter *adapter,
>                                       continue;
> 
>                               done = false;
> +                             nb_enqueued = 0;
> +
>                               stats->crypto_deq_count += n;
> -                             eca_ops_enqueue_burst(adapter, ops, n);
> +
> +                             if (unlikely(!adapter->ebuf.count))
> +                                     nb_enqueued =
> eca_ops_enqueue_burst(
> +                                                     adapter, ops, n);
> +
> +                             if (nb_enqueued == n)
> +                                     goto check;
> +
> +                             /* Failed to enqueue events case */
> +                             for (i = nb_enqueued; i < n; i++)
> +                                     eca_circular_buffer_add(
> +                                             &adapter->ebuf,
> +                                             ops[nb_enqueued]);
> +
> +check:
>                               nb_deq += n;
> 
> -                             if (nb_deq > max_deq) {
> +                             if (nb_deq >= max_deq) {
>                                       if ((qp + 1) == dev_qps) {
>                                               adapter->next_cdev_id =
>                                                       (cdev_id + 1)
> @@ -622,6 +777,7 @@ eca_crypto_adapter_deq_run(struct
> event_crypto_adapter *adapter,
>                               }
>                       }
>               }
> +             adapter->next_cdev_id = 0;
>       } while (done == false);
>       return nb_deq;
>  }
> @@ -751,11 +907,10 @@ eca_add_queue_pair(struct event_crypto_adapter
> *adapter, uint8_t cdev_id,
>                       return -ENOMEM;
> 
>               qpairs = dev_info->qpairs;
> -             qpairs->op_buffer = rte_zmalloc_socket(adapter->mem_name,
> -                                     BATCH_SIZE *
> -                                     sizeof(struct rte_crypto_op *),
> -                                     0, adapter->socket_id);
> -             if (!qpairs->op_buffer) {
> +
> +             if (eca_circular_buffer_init("eca_cdev_circular_buffer",
> +                                          &qpairs->cbuf,
> +
> CRYPTO_ADAPTER_OPS_BUFFER_SZ)) {
>                       rte_free(qpairs);
>                       return -ENOMEM;
>               }
> --
> 2.6.4

Reply via email to