Hi Ganapati,

It looks good to me. Some minor comments inline.
> -----Original Message-----
> From: Kundapura, Ganapati <ganapati.kundap...@intel.com>
> Sent: Monday, February 7, 2022 4:21 PM
> To: Jayatheerthan, Jay <jay.jayatheert...@intel.com>;
> jerinjac...@gmail.com; Gujjar, Abhinandan S <abhinandan.guj...@intel.com>;
> dev@dpdk.org
> Subject: [PATCH v4 1/2] eventdev/crypto_adapter: move crypto ops to
> circular buffer
>
> Move crypto ops to a circular buffer to retain them when
> cryptodev/eventdev are temporarily full
>
> Signed-off-by: Ganapati Kundapura <ganapati.kundap...@intel.com>
>
> ---
> v4:
> * Retain the non-enqueued crypto ops in the circular buffer to
>   process later and stop dequeuing from the eventdev until
>   all the crypto ops are enqueued to the cryptodev.
>
>   Check for space in the circular buffer and stop dequeuing from
>   the eventdev if some ops failed to flush to the cdev and no
>   space for another batch is available in the circular buffer.
>
>   Re-enable dequeuing from the eventdev after all the ops are flushed.
>
> v3:
> * Update eca_ops_buffer_flush() to flush all the crypto ops
>   out of the circular buffer.
> * Remove freeing of failed crypto ops from eca_ops_enqueue_burst()
>   and add them to the circular buffer for later processing.
>
> v2:
> * Reset crypto adapter next cdev id before dequeuing from the
>   next cdev.
> ---
>
> diff --git a/lib/eventdev/rte_event_crypto_adapter.c
> b/lib/eventdev/rte_event_crypto_adapter.c
> index d840803..0faac36 100644
> --- a/lib/eventdev/rte_event_crypto_adapter.c
> +++ b/lib/eventdev/rte_event_crypto_adapter.c
> @@ -25,11 +25,27 @@
>  #define CRYPTO_ADAPTER_MEM_NAME_LEN 32
>  #define CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES 100
>
> +#define CRYPTO_ADAPTER_OPS_BUFFER_SZ (BATCH_SIZE + BATCH_SIZE)
> +#define CRYPTO_ADAPTER_BUFFER_SZ 1024
> +
>  /* Flush an instance's enqueue buffers every CRYPTO_ENQ_FLUSH_THRESHOLD
>   * iterations of eca_crypto_adapter_enq_run()
>   */
>  #define CRYPTO_ENQ_FLUSH_THRESHOLD 1024
>
> +struct crypto_ops_circular_buffer {
> +	/* index of head element in circular buffer */
> +	uint16_t head;
> +	/* index of tail element in circular buffer */
> +	uint16_t tail;
> +	/* number of elements in buffer */
> +	uint16_t count;
> +	/* size of circular buffer */
> +	uint16_t size;
> +	/* Pointer to hold rte_crypto_ops for batching */
> +	struct rte_crypto_op **op_buffer;
> +} __rte_cache_aligned;
> +
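The new structure reads well. Since head/tail/count arithmetic is easy to
get subtly wrong, I modeled the scheme standalone to convince myself it
holds up (my names, not code from the patch; cc -Wall cbuf_model.c):

	#include <stdio.h>
	#include <stdint.h>

	#define SZ 8

	struct model {
		uint16_t head, tail, count;
		const void *slot[SZ];
	};

	/* same arithmetic as eca_circular_buffer_add() */
	static void model_add(struct model *b, const void *op)
	{
		b->slot[b->tail] = op;
		b->tail = (b->tail + 1) % SZ;
		b->count++;
	}

	/* same arithmetic as the "n" computation in the flush helpers
	 * below: the contiguous run starting at head stops at the end
	 * of the array when the buffer has wrapped (tail < head), so
	 * draining a wrapped buffer takes two calls.
	 */
	static uint16_t model_contig(const struct model *b)
	{
		if (b->tail > b->head)
			return b->tail - b->head;
		if (b->tail < b->head)
			return SZ - b->head;
		return 0; /* head == tail: empty, or completely full */
	}

	int main(void)
	{
		struct model b = { .head = 6, .tail = 6 };
		int i;

		for (i = 0; i < 4; i++)
			model_add(&b, NULL); /* fills slots 6, 7, 0, 1 */
		printf("count=%u contig=%u\n", b.count, model_contig(&b));
		/* prints count=4 contig=2 */
		return 0;
	}

One thing the model makes visible: head == tail means either empty or
completely full, and the flush helpers below treat it as empty. count
keeps the two cases apart today, but a comment (or an assert that the
buffers are never allowed to fill completely) would make that invariant
explicit.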
>  struct event_crypto_adapter {
>  	/* Event device identifier */
>  	uint8_t eventdev_id;
> @@ -37,6 +53,10 @@ struct event_crypto_adapter {
>  	uint8_t event_port_id;
>  	/* Store event device's implicit release capability */
>  	uint8_t implicit_release_disabled;
> +	/* Flag to indicate backpressure at cryptodev
> +	 * Stop further dequeuing events from eventdev
> +	 */
> +	bool stop_enq_to_cryptodev;
>  	/* Max crypto ops processed in any service function invocation */
>  	uint32_t max_nb;
>  	/* Lock to serialize config updates with service function */
> @@ -47,6 +67,8 @@ struct event_crypto_adapter {
>  	struct crypto_device_info *cdevs;
>  	/* Loop counter to flush crypto ops */
>  	uint16_t transmit_loop_count;
> +	/* Circular buffer for batching crypto ops to eventdev */
> +	struct crypto_ops_circular_buffer ebuf;
>  	/* Per instance stats structure */
>  	struct rte_event_crypto_adapter_stats crypto_stats;
>  	/* Configuration callback for rte_service configuration */
> @@ -93,10 +115,8 @@ struct crypto_device_info {
>  struct crypto_queue_pair_info {
>  	/* Set to indicate queue pair is enabled */
>  	bool qp_enabled;
> -	/* Pointer to hold rte_crypto_ops for batching */
> -	struct rte_crypto_op **op_buffer;
> -	/* No of crypto ops accumulated */
> -	uint8_t len;
> +	/* Circular buffer for batching crypto ops to cdev */
> +	struct crypto_ops_circular_buffer cbuf;
>  } __rte_cache_aligned;
>
>  static struct event_crypto_adapter **event_crypto_adapter;
> @@ -141,6 +161,84 @@ eca_init(void)
>  	return 0;
>  }
>
> +static inline bool
> +eca_circular_buffer_batch_ready(struct crypto_ops_circular_buffer *bufp)
> +{
> +	return bufp->count >= BATCH_SIZE;
> +}
> +
> +static inline bool
> +eca_circular_buffer_space_for_batch(struct crypto_ops_circular_buffer *bufp)
> +{
> +	return (bufp->size - bufp->count) >= BATCH_SIZE;
> +}
> +
> +static inline void
> +eca_circular_buffer_free(struct crypto_ops_circular_buffer *bufp)
> +{
> +	rte_free(bufp->op_buffer);
> +}
> +
> +static inline int
> +eca_circular_buffer_init(const char *name,
> +			 struct crypto_ops_circular_buffer *bufp,
> +			 uint16_t sz)
> +{
> +	bufp->op_buffer = rte_zmalloc(name,
> +				      sizeof(struct rte_crypto_op *) * sz,
> +				      0);
> +	if (bufp->op_buffer == NULL)
> +		return -ENOMEM;
> +
> +	bufp->size = sz;
> +	return 0;
> +}
> +
> +static inline int
> +eca_circular_buffer_add(struct crypto_ops_circular_buffer *bufp,
> +			struct rte_crypto_op *op)
> +{
> +	uint16_t *tailp = &bufp->tail;
> +
> +	bufp->op_buffer[*tailp] = op;
> +	/* circular buffer, go round */
> +	*tailp = (*tailp + 1) % bufp->size;
> +	bufp->count++;
> +
> +	return 0;
> +}
> +
> +static inline int
> +eca_circular_buffer_flush_to_cdev(struct crypto_ops_circular_buffer *bufp,
> +				  uint8_t cdev_id, uint16_t qp_id,
> +				  uint16_t *nb_ops_flushed)
> +{
> +	uint16_t n = 0;
> +	uint16_t *headp = &bufp->head;
> +	uint16_t *tailp = &bufp->tail;
> +	struct rte_crypto_op **ops = bufp->op_buffer;
> +
> +	if (*tailp > *headp)
> +		n = *tailp - *headp;
> +	else if (*tailp < *headp)
> +		n = bufp->size - *headp;
> +	else {
> +		*nb_ops_flushed = 0;
> +		return 0;  /* buffer empty */
> +	}
> +
> +	*nb_ops_flushed = rte_cryptodev_enqueue_burst(cdev_id, qp_id,
> +						      &ops[*headp], n);
> +	bufp->count -= *nb_ops_flushed;
> +	if (!bufp->count) {
> +		*headp = 0;
> +		*tailp = 0;
> +	} else
> +		*headp = (*headp + *nb_ops_flushed) % bufp->size;
> +
> +	return *nb_ops_flushed == n ? 0 : -1;
> +}
> +
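Related, and maybe worth a comment on eca_circular_buffer_flush_to_cdev():
when the buffer has wrapped, a single call only submits the contiguous
segment up to the end of the array, and the return code reflects only that
segment. Pseudo-usage (cbuf, cdev_id and qp_id are placeholders, and the
PMD is assumed to accept everything offered):

	uint16_t flushed;

	/* wrapped buffer: head = 6, tail = 2, count = 4, size = 8 */
	eca_circular_buffer_flush_to_cdev(&cbuf, cdev_id, qp_id, &flushed);
	/* flushed == 2 (slots 6..7), returns 0, yet 2 ops remain */
	eca_circular_buffer_flush_to_cdev(&cbuf, cdev_id, qp_id, &flushed);
	/* flushed == 2 (slots 0..1), buffer now empty */

Not a defect, since the periodic flush picks up the remainder, but the
"returns success with ops still buffered" case surprised me on first read.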
>  static inline struct event_crypto_adapter *
>  eca_id_to_adapter(uint8_t id)
>  {
> @@ -237,10 +335,19 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
>  		return -ENOMEM;
>  	}
>
> +	if (eca_circular_buffer_init("eca_edev_circular_buffer",
> +				     &adapter->ebuf,
> +				     CRYPTO_ADAPTER_BUFFER_SZ)) {
> +		RTE_EDEV_LOG_ERR("Failed to get memory for eventdev buffer");
> +		rte_free(adapter);
> +		return -ENOMEM;
> +	}
> +
>  	ret = rte_event_dev_info_get(dev_id, &dev_info);
>  	if (ret < 0) {
>  		RTE_EDEV_LOG_ERR("Failed to get info for eventdev %d: %s!",
>  				 dev_id, dev_info.driver_name);
> +		eca_circular_buffer_free(&adapter->ebuf);
>  		rte_free(adapter);
>  		return ret;
>  	}
> @@ -259,6 +366,7 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
>  			socket_id);
>  	if (adapter->cdevs == NULL) {
>  		RTE_EDEV_LOG_ERR("Failed to get mem for crypto devices\n");
> +		eca_circular_buffer_free(&adapter->ebuf);
>  		rte_free(adapter);
>  		return -ENOMEM;
>  	}
> @@ -337,10 +445,10 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
>  	struct crypto_queue_pair_info *qp_info = NULL;
>  	struct rte_crypto_op *crypto_op;
>  	unsigned int i, n;
> -	uint16_t qp_id, len, ret;
> +	uint16_t qp_id, nb_enqueued = 0;
>  	uint8_t cdev_id;
> +	int ret;
>
> -	len = 0;
>  	ret = 0;
>  	n = 0;
>  	stats->event_deq_count += cnt;
> @@ -366,9 +474,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
>  				rte_crypto_op_free(crypto_op);
>  				continue;
>  			}
> -			len = qp_info->len;
> -			qp_info->op_buffer[len] = crypto_op;
> -			len++;
> +			eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
>  		} else if (crypto_op->sess_type == RTE_CRYPTO_OP_SESSIONLESS &&
>  				crypto_op->private_data_offset) {
>  			m_data = (union rte_event_crypto_metadata *)
> @@ -382,87 +488,91 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
>  				rte_crypto_op_free(crypto_op);
>  				continue;
>  			}
> -			len = qp_info->len;
> -			qp_info->op_buffer[len] = crypto_op;
> -			len++;
> +			eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
>  		} else {
>  			rte_pktmbuf_free(crypto_op->sym->m_src);
>  			rte_crypto_op_free(crypto_op);
>  			continue;
>  		}
>
> -		if (len == BATCH_SIZE) {
> -			struct rte_crypto_op **op_buffer = qp_info->op_buffer;
> -			ret = rte_cryptodev_enqueue_burst(cdev_id,
> -							  qp_id,
> -							  op_buffer,
> -							  BATCH_SIZE);
> -
> -			stats->crypto_enq_count += ret;
> -
> -			while (ret < len) {
> -				struct rte_crypto_op *op;
> -				op = op_buffer[ret++];
> -				stats->crypto_enq_fail++;
> -				rte_pktmbuf_free(op->sym->m_src);
> -				rte_crypto_op_free(op);
> -			}
> -
> -			len = 0;
> +		if (eca_circular_buffer_batch_ready(&qp_info->cbuf)) {
> +			ret = eca_circular_buffer_flush_to_cdev(&qp_info->cbuf,
> +								cdev_id,
> +								qp_id,
> +								&nb_enqueued);
> +			/*
> +			 * If some crypto ops failed to flush to cdev and
> +			 * space for another batch is not available, stop
> +			 * dequeue from eventdev momentarily
> +			 */
> +			if (unlikely(ret < 0 &&
> +				     !eca_circular_buffer_space_for_batch(
> +							&qp_info->cbuf)))
> +				adapter->stop_enq_to_cryptodev = true;
>  		}
>
> -		if (qp_info)
> -			qp_info->len = len;
> -		n += ret;
> +		stats->crypto_enq_count += nb_enqueued;
> +		n += nb_enqueued;
>  	}
>
>  	return n;
>  }
>
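The split of responsibilities took me a second read: only
eca_enq_to_cryptodev() sets the backpressure flag, and only
eca_crypto_enq_flush() (further down) clears it, once every queue pair's
buffer has drained. A toy model of that contract as I read it (BATCH,
flush() and buffered are stand-ins, not adapter code):

	#include <stdio.h>
	#include <stdbool.h>
	#include <stdint.h>

	#define BATCH 32

	static uint16_t buffered;	/* ops held in the circular buffer */
	static bool stop_enq;		/* adapter->stop_enq_to_cryptodev */

	/* the device accepts up to cap ops per call; 0 models a full QP */
	static uint16_t flush(uint16_t cap)
	{
		uint16_t sent = buffered < cap ? buffered : cap;

		buffered -= sent;
		return sent;
	}

	int main(void)
	{
		const uint16_t size = 2 * BATCH;

		/* enqueue path: flush fails, no room for another batch */
		buffered = size - BATCH + 1;
		flush(0);			/* device takes nothing */
		if (buffered != 0 && (size - buffered) < BATCH)
			stop_enq = true;	/* stop eventdev dequeue */
		printf("stopped=%d buffered=%u\n", stop_enq, buffered);

		/* service loop: keep flushing; resume only when drained */
		while (stop_enq) {
			flush(BATCH);
			if (buffered == 0)
				stop_enq = false;
		}
		printf("stopped=%d buffered=%u\n", stop_enq, buffered);
		return 0;
	}

If that matches your intent, a sentence to this effect above the
stop_enq_to_cryptodev declaration would help future readers.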
>  static unsigned int
> -eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
> +eca_crypto_cdev_flush(struct event_crypto_adapter *adapter,
> +		      uint8_t cdev_id, uint16_t *nb_ops_flushed)
>  {
> -	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
>  	struct crypto_device_info *curr_dev;
>  	struct crypto_queue_pair_info *curr_queue;
> -	struct rte_crypto_op **op_buffer;
>  	struct rte_cryptodev *dev;
> -	uint8_t cdev_id;
> +	uint16_t nb = 0, nb_enqueued = 0;
>  	uint16_t qp;
> -	uint16_t ret;
> -	uint16_t num_cdev = rte_cryptodev_count();
>
> -	ret = 0;
> -	for (cdev_id = 0; cdev_id < num_cdev; cdev_id++) {
> -		curr_dev = &adapter->cdevs[cdev_id];
> -		dev = curr_dev->dev;
> -		if (dev == NULL)
> -			continue;
> -		for (qp = 0; qp < dev->data->nb_queue_pairs; qp++) {
> +	curr_dev = &adapter->cdevs[cdev_id];
> +	if (unlikely(curr_dev == NULL))
> +		return 0;
>
> -			curr_queue = &curr_dev->qpairs[qp];
> -			if (!curr_queue->qp_enabled)
> -				continue;
> +	dev = rte_cryptodev_pmd_get_dev(cdev_id);
> +	for (qp = 0; qp < dev->data->nb_queue_pairs; qp++) {
>
> -			op_buffer = curr_queue->op_buffer;
> -			ret = rte_cryptodev_enqueue_burst(cdev_id,
> -							  qp,
> -							  op_buffer,
> -							  curr_queue->len);
> -			stats->crypto_enq_count += ret;
> -
> -			while (ret < curr_queue->len) {
> -				struct rte_crypto_op *op;
> -				op = op_buffer[ret++];
> -				stats->crypto_enq_fail++;
> -				rte_pktmbuf_free(op->sym->m_src);
> -				rte_crypto_op_free(op);
> -			}
> -			curr_queue->len = 0;
> -		}
> +		curr_queue = &curr_dev->qpairs[qp];
> +		if (unlikely(curr_queue == NULL || !curr_queue->qp_enabled))
> +			continue;
> +
> +		eca_circular_buffer_flush_to_cdev(&curr_queue->cbuf,
> +						  cdev_id,
> +						  qp,
> +						  &nb_enqueued);
> +		*nb_ops_flushed += curr_queue->cbuf.count;
> +		nb += nb_enqueued;
>  	}
>
> -	return ret;
> +	return nb;
> +}
> +
> +static unsigned int
> +eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
> +{
> +	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
> +	uint8_t cdev_id;
> +	uint16_t nb_enqueued = 0;
> +	uint16_t nb_ops_flushed = 0;
> +	uint16_t num_cdev = rte_cryptodev_count();
> +
> +	for (cdev_id = 0; cdev_id < num_cdev; cdev_id++)
> +		nb_enqueued += eca_crypto_cdev_flush(adapter,
> +						     cdev_id,
> +						     &nb_ops_flushed);
> +	/*
> +	 * Enable dequeue from eventdev if all ops from circular
> +	 * buffer flushed to cdev
> +	 */
> +	if (!nb_ops_flushed)
> +		adapter->stop_enq_to_cryptodev = false;
> +
> +	stats->crypto_enq_count += nb_enqueued;
> +
> +	return nb_enqueued;
>  }
>
>  static int
> @@ -480,6 +590,13 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter *adapter,
>  	if (adapter->mode == RTE_EVENT_CRYPTO_ADAPTER_OP_NEW)
>  		return 0;
>
> +	if (adapter->stop_enq_to_cryptodev) {

Add unlikely() here.

> +		nb_enqueued += eca_crypto_enq_flush(adapter);
> +
> +		if (adapter->stop_enq_to_cryptodev)

Add unlikely() here as well; see the snippet after this hunk.

> +			goto skip_event_dequeue_burst;
> +	}
> +
>  	for (nb_enq = 0; nb_enq < max_enq; nb_enq += n) {
>  		stats->event_poll_count++;
>  		n = rte_event_dequeue_burst(event_dev_id,
> @@ -491,6 +608,8 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter *adapter,
>  		nb_enqueued += eca_enq_to_cryptodev(adapter, ev, n);
>  	}
>
> +skip_event_dequeue_burst:
> +
>  	if ((++adapter->transmit_loop_count &
>  	    (CRYPTO_ENQ_FLUSH_THRESHOLD - 1)) == 0) {
>  		nb_enqueued += eca_crypto_enq_flush(adapter);
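That is, since backpressure should be the exceptional path, something
like (untested):

	if (unlikely(adapter->stop_enq_to_cryptodev)) {
		nb_enqueued += eca_crypto_enq_flush(adapter);

		if (unlikely(adapter->stop_enq_to_cryptodev))
			goto skip_event_dequeue_burst;
	}

One naming nit while here: in eca_crypto_cdev_flush() above, the out
parameter nb_ops_flushed accumulates curr_queue->cbuf.count, i.e. the ops
still pending, not the ops flushed. A name like nb_ops_pending would
avoid the double-take.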
> @@ -499,9 +618,9 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter *adapter,
>  	return nb_enqueued;
>  }
>
> -static inline void
> +static inline uint16_t
>  eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
> -		  struct rte_crypto_op **ops, uint16_t num)
> +		      struct rte_crypto_op **ops, uint16_t num)
>  {
>  	struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
>  	union rte_event_crypto_metadata *m_data = NULL;
> @@ -518,6 +637,8 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
>  	num = RTE_MIN(num, BATCH_SIZE);
>  	for (i = 0; i < num; i++) {
>  		struct rte_event *ev = &events[nb_ev++];
> +
> +		m_data = NULL;
>  		if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) {
>  			m_data = rte_cryptodev_sym_session_get_user_data(
>  					ops[i]->sym->session);
> @@ -548,21 +669,56 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
>  				event_port_id,
>  				&events[nb_enqueued],
>  				nb_ev - nb_enqueued);
> +
>  	} while (retry++ < CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES &&
>  		 nb_enqueued < nb_ev);
>
> -	/* Free mbufs and rte_crypto_ops for failed events */
> -	for (i = nb_enqueued; i < nb_ev; i++) {
> -		struct rte_crypto_op *op = events[i].event_ptr;
> -		rte_pktmbuf_free(op->sym->m_src);
> -		rte_crypto_op_free(op);
> -	}
> -
>  	stats->event_enq_fail_count += nb_ev - nb_enqueued;
>  	stats->event_enq_count += nb_enqueued;
>  	stats->event_enq_retry_count += retry - 1;
> +
> +	return nb_enqueued;
> +}
> +
> +static int
> +eca_circular_buffer_flush_to_evdev(struct event_crypto_adapter *adapter,
> +				   struct crypto_ops_circular_buffer *bufp)
> +{
> +	uint16_t n = 0, nb_ops_flushed;
> +	uint16_t *headp = &bufp->head;
> +	uint16_t *tailp = &bufp->tail;
> +	struct rte_crypto_op **ops = bufp->op_buffer;
> +
> +	if (*tailp > *headp)
> +		n = *tailp - *headp;
> +	else if (*tailp < *headp)
> +		n = bufp->size - *headp;
> +	else
> +		return 0;  /* buffer empty */
> +
> +	nb_ops_flushed = eca_ops_enqueue_burst(adapter, &ops[*headp], n);
> +	bufp->count -= nb_ops_flushed;
> +	if (!bufp->count) {
> +		*headp = 0;
> +		*tailp = 0;
> +		return 0;  /* buffer empty */
> +	}
> +
> +	*headp = (*headp + nb_ops_flushed) % bufp->size;
> +	return 1;
> +}
> +
> +static void
> +eca_ops_buffer_flush(struct event_crypto_adapter *adapter)
> +{
> +	if (adapter->ebuf.count == 0)

Add likely() here; see the snippet below.

> +		return;
> +
> +	while (eca_circular_buffer_flush_to_evdev(adapter,
> +						  &adapter->ebuf))
> +		;
> +}
> +
>  static inline unsigned int
>  eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
>  			   unsigned int max_deq)
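Same idea, inverted, for the fast path above: the eventdev-side buffer is
normally empty, so (untested):

	if (likely(adapter->ebuf.count == 0))
		return;

The unlikely() asks below in eca_crypto_adapter_deq_run() follow the same
reasoning: NULL devices and disabled queue pairs are the exception on the
hot path.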
> @@ -571,7 +727,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
>  	struct crypto_device_info *curr_dev;
>  	struct crypto_queue_pair_info *curr_queue;
>  	struct rte_crypto_op *ops[BATCH_SIZE];
> -	uint16_t n, nb_deq;
> +	uint16_t n, nb_deq, nb_enqueued, i;
>  	struct rte_cryptodev *dev;
>  	uint8_t cdev_id;
>  	uint16_t qp, dev_qps;
> @@ -579,16 +735,20 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
>  	uint16_t num_cdev = rte_cryptodev_count();
>
>  	nb_deq = 0;
> +	eca_ops_buffer_flush(adapter);
> +
>  	do {
> -		uint16_t queues = 0;
>  		done = true;
>
>  		for (cdev_id = adapter->next_cdev_id;
>  		     cdev_id < num_cdev; cdev_id++) {
> +			uint16_t queues = 0;
> +
>  			curr_dev = &adapter->cdevs[cdev_id];
>  			dev = curr_dev->dev;
>  			if (dev == NULL)

Add unlikely() here.

>  				continue;
> +
>  			dev_qps = dev->data->nb_queue_pairs;
>
>  			for (qp = curr_dev->next_queue_pair_id;
> @@ -596,7 +756,8 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
>  			     queues++) {
>
>  				curr_queue = &curr_dev->qpairs[qp];
> -				if (!curr_queue->qp_enabled)
> +				if (curr_queue == NULL ||
> +				    !curr_queue->qp_enabled)

Add unlikely() here too.

>  					continue;
>
>  				n = rte_cryptodev_dequeue_burst(cdev_id, qp,
> @@ -605,11 +766,27 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
>  					continue;
>
>  				done = false;
> +				nb_enqueued = 0;
> +
>  				stats->crypto_deq_count += n;
> -				eca_ops_enqueue_burst(adapter, ops, n);
> +
> +				if (unlikely(!adapter->ebuf.count))
> +					nb_enqueued = eca_ops_enqueue_burst(
> +							adapter, ops, n);
> +
> +				if (nb_enqueued == n)

Add likely() here.

> +					goto check;
> +
> +				/* Failed to enqueue events case */
> +				for (i = nb_enqueued; i < n; i++)
> +					eca_circular_buffer_add(
> +							&adapter->ebuf,
> +							ops[i]);
> +
> +check:
>  				nb_deq += n;
>
> -				if (nb_deq > max_deq) {
> +				if (nb_deq >= max_deq) {
>  					if ((qp + 1) == dev_qps) {
>  						adapter->next_cdev_id =
>  							(cdev_id + 1)
> @@ -622,6 +799,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
>  				}
>  			}
>  		}
> +		adapter->next_cdev_id = 0;
>  	} while (done == false);
>
>  	return nb_deq;
>  }
> @@ -751,11 +929,12 @@ eca_add_queue_pair(struct event_crypto_adapter *adapter, uint8_t cdev_id,
>  		return -ENOMEM;
>
>  	qpairs = dev_info->qpairs;
> -	qpairs->op_buffer = rte_zmalloc_socket(adapter->mem_name,
> -					BATCH_SIZE *
> -					sizeof(struct rte_crypto_op *),
> -					0, adapter->socket_id);
> -	if (!qpairs->op_buffer) {
> +
> +	if (eca_circular_buffer_init("eca_cdev_circular_buffer",
> +				     &qpairs->cbuf,
> +				     CRYPTO_ADAPTER_OPS_BUFFER_SZ)) {
> +		RTE_EDEV_LOG_ERR("Failed to get memory for cryptodev buffer");
> +		rte_free(qpairs);
> +		return -ENOMEM;
> +	}
> --
> 2.6.4