On Thu, Feb 19, 2015 at 05:56:41PM +0000, John McNamara wrote:
> From: Richardson, Bruce <bruce.richardson at intel.com>
> 
> Add optional support for inline processing of packets inside the RX
> or TX call. For an RX callback, what happens is that we get a set of
> packets from the NIC and then pass them to a callback function, if
> configured, to allow additional processing to be done on them, e.g.
> filling in more mbuf fields, before passing back to the application.
> On TX, the packets are similarly post-processed before being handed
> to the NIC for transmission.
> 
> Signed-off-by: Bruce Richardson <bruce.richardson at intel.com>
> Signed-off-by: John McNamara <john.mcnamara at intel.com>
> ---
>  config/common_bsdapp                   |    1 +
>  config/common_linuxapp                 |    1 +
>  lib/librte_ether/rte_ethdev.c          |  192 +++++++++++++++++++++++++++++-
>  lib/librte_ether/rte_ethdev.h          |  204 
> +++++++++++++++++++++++++++++++-
>  lib/librte_ether/rte_ether_version.map |    4 +
>  5 files changed, 397 insertions(+), 5 deletions(-)
> 
> diff --git a/config/common_bsdapp b/config/common_bsdapp
> index f11ff39..e9c445e 100644
> --- a/config/common_bsdapp
> +++ b/config/common_bsdapp
> @@ -133,6 +133,7 @@ CONFIG_RTE_LIBRTE_ETHDEV_DEBUG=n
>  CONFIG_RTE_MAX_ETHPORTS=32
>  CONFIG_RTE_LIBRTE_IEEE1588=n
>  CONFIG_RTE_ETHDEV_QUEUE_STAT_CNTRS=16
> +CONFIG_RTE_LIBRTE_ETHDEV_RXTX_CALLBACKS=n

I see no reason why this should not be "y" by default. Those who are not using it
and don't want its tiny performance impact can turn it off.

/Bruce

>  
>  #
>  # Support NIC bypass logic
> diff --git a/config/common_linuxapp b/config/common_linuxapp
> index f921d8c..0cb850e 100644
> --- a/config/common_linuxapp
> +++ b/config/common_linuxapp
> @@ -131,6 +131,7 @@ CONFIG_RTE_LIBRTE_ETHDEV_DEBUG=n
>  CONFIG_RTE_MAX_ETHPORTS=32
>  CONFIG_RTE_LIBRTE_IEEE1588=n
>  CONFIG_RTE_ETHDEV_QUEUE_STAT_CNTRS=16
> +CONFIG_RTE_LIBRTE_ETHDEV_RXTX_CALLBACKS=n
>  
>  #
>  # Support NIC bypass logic
> diff --git a/lib/librte_ether/rte_ethdev.c b/lib/librte_ether/rte_ethdev.c
> index 7c4e772..8a4e0e7 100644
> --- a/lib/librte_ether/rte_ethdev.c
> +++ b/lib/librte_ether/rte_ethdev.c
> @@ -337,6 +337,20 @@ rte_eth_dev_rx_queue_config(struct rte_eth_dev *dev, 
> uint16_t nb_queues)
>                       dev->data->nb_rx_queues = 0;
>                       return -(ENOMEM);
>               }
> +
> +#ifdef RTE_LIBRTE_ETHDEV_RXTX_CALLBACKS
> +             dev->post_rx_burst_cbs = rte_zmalloc(
> +                     "ethdev->post_rx_burst_cbs",
> +                     sizeof(*dev->post_rx_burst_cbs) * nb_queues,
> +                     RTE_CACHE_LINE_SIZE);
> +             if (dev->post_rx_burst_cbs == NULL) {
> +                     rte_free(dev->data->rx_queues);
> +                     dev->data->rx_queues = NULL;
> +                     dev->data->nb_rx_queues = 0;
> +                     return -(ENOMEM);
> +             }
> +#endif
> +
>       } else { /* re-configure */
>               FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rx_queue_release, -ENOTSUP);
>  
> @@ -349,9 +363,25 @@ rte_eth_dev_rx_queue_config(struct rte_eth_dev *dev, 
> uint16_t nb_queues)
>               if (rxq == NULL)
>                       return -(ENOMEM);
>  
> -             if (nb_queues > old_nb_queues)
> +#ifdef RTE_LIBRTE_ETHDEV_RXTX_CALLBACKS
> +             dev->post_rx_burst_cbs = rte_realloc(
> +                     dev->post_rx_burst_cbs,
> +                     sizeof(*dev->post_rx_burst_cbs) *
> +                             nb_queues, RTE_CACHE_LINE_SIZE);
> +             if (dev->post_rx_burst_cbs == NULL)
> +                     return -(ENOMEM);
> +#endif
> +
> +             if (nb_queues > old_nb_queues) {
> +                     uint16_t new_qs = nb_queues - old_nb_queues;
>                       memset(rxq + old_nb_queues, 0,
> -                             sizeof(rxq[0]) * (nb_queues - old_nb_queues));
> +                             sizeof(rxq[0]) * new_qs);
> +
> +#ifdef RTE_LIBRTE_ETHDEV_RXTX_CALLBACKS
> +                     memset(dev->post_rx_burst_cbs + old_nb_queues, 0,
> +                             sizeof(dev->post_rx_burst_cbs[0]) * new_qs);
> +#endif
> +             }
>  
>               dev->data->rx_queues = rxq;
>  
> @@ -479,6 +509,20 @@ rte_eth_dev_tx_queue_config(struct rte_eth_dev *dev, 
> uint16_t nb_queues)
>                       dev->data->nb_tx_queues = 0;
>                       return -(ENOMEM);
>               }
> +
> +#ifdef RTE_LIBRTE_ETHDEV_RXTX_CALLBACKS
> +             dev->pre_tx_burst_cbs = rte_zmalloc(
> +                     "ethdev->pre_tx_burst_cbs",
> +                     sizeof(*dev->pre_tx_burst_cbs) * nb_queues,
> +                     RTE_CACHE_LINE_SIZE);
> +             if (dev->pre_tx_burst_cbs == NULL) {
> +                     rte_free(dev->data->tx_queues);
> +                     dev->data->tx_queues = NULL;
> +                     dev->data->nb_tx_queues = 0;
> +                     return -(ENOMEM);
> +             }
> +#endif
> +
>       } else { /* re-configure */
>               FUNC_PTR_OR_ERR_RET(*dev->dev_ops->tx_queue_release, -ENOTSUP);
>  
> @@ -491,9 +535,25 @@ rte_eth_dev_tx_queue_config(struct rte_eth_dev *dev, 
> uint16_t nb_queues)
>               if (txq == NULL)
>                       return -(ENOMEM);
>  
> -             if (nb_queues > old_nb_queues)
> +#ifdef RTE_LIBRTE_ETHDEV_RXTX_CALLBACKS
> +             dev->pre_tx_burst_cbs = rte_realloc(
> +                     dev->pre_tx_burst_cbs,
> +                     sizeof(*dev->pre_tx_burst_cbs) *
> +                             nb_queues, RTE_CACHE_LINE_SIZE);
> +             if (dev->pre_tx_burst_cbs == NULL)
> +                     return -(ENOMEM);
> +#endif
> +
> +             if (nb_queues > old_nb_queues) {
> +                     uint16_t new_qs = nb_queues - old_nb_queues;
>                       memset(txq + old_nb_queues, 0,
> -                             sizeof(txq[0]) * (nb_queues - old_nb_queues));
> +                             sizeof(txq[0]) * new_qs);
> +
> +#ifdef RTE_LIBRTE_ETHDEV_RXTX_CALLBACKS
> +                     memset(dev->pre_tx_burst_cbs + old_nb_queues, 0,
> +                             sizeof(dev->pre_tx_burst_cbs[0]) * new_qs);
> +#endif
> +             }
>  
>               dev->data->tx_queues = txq;
>  
> @@ -3258,3 +3318,127 @@ rte_eth_dev_filter_ctrl(uint8_t port_id, enum 
> rte_filter_type filter_type,
>       FUNC_PTR_OR_ERR_RET(*dev->dev_ops->filter_ctrl, -ENOTSUP);
>       return (*dev->dev_ops->filter_ctrl)(dev, filter_type, filter_op, arg);
>  }
> +
> +#ifdef RTE_LIBRTE_ETHDEV_RXTX_CALLBACKS
> +void *
> +rte_eth_add_rx_callback(uint8_t port_id, uint16_t queue_id,
> +             rte_rxtx_callback_fn fn, void *user_param)
> +{
> +     /* check input parameters */
> +     if (port_id >= nb_ports || fn == NULL ||
> +                 queue_id >= rte_eth_devices[port_id].data->nb_rx_queues) {
> +             rte_errno = EINVAL;
> +             return NULL;
> +     }
> +
> +     struct rte_eth_rxtx_callback *cb = rte_zmalloc(NULL, sizeof(*cb), 0);
> +
> +     if (cb == NULL) {
> +             rte_errno = ENOMEM;
> +             return NULL;
> +     }
> +
> +     cb->fn = fn;
> +     cb->param = user_param;
> +     cb->next = rte_eth_devices[port_id].post_rx_burst_cbs[queue_id];
> +     rte_eth_devices[port_id].post_rx_burst_cbs[queue_id] = cb;
> +     return cb;
> +}
> +
> +void *
> +rte_eth_add_tx_callback(uint8_t port_id, uint16_t queue_id,
> +             rte_rxtx_callback_fn fn, void *user_param)
> +{
> +     /* check input parameters */
> +     if (port_id >= nb_ports || fn == NULL ||
> +                 queue_id >= rte_eth_devices[port_id].data->nb_tx_queues) {
> +             rte_errno = EINVAL;
> +             return NULL;
> +     }
> +
> +     struct rte_eth_rxtx_callback *cb = rte_zmalloc(NULL, sizeof(*cb), 0);
> +
> +     if (cb == NULL) {
> +             rte_errno = ENOMEM;
> +             return NULL;
> +     }
> +
> +     cb->fn = fn;
> +     cb->param = user_param;
> +     cb->next = rte_eth_devices[port_id].pre_tx_burst_cbs[queue_id];
> +     rte_eth_devices[port_id].pre_tx_burst_cbs[queue_id] = cb;
> +     return cb;
> +}
> +
> +int
> +rte_eth_remove_rx_callback(uint8_t port_id, uint16_t queue_id,
> +             struct rte_eth_rxtx_callback *user_cb)
> +{
> +     /* Check input parameters. */
> +     if (port_id >= nb_ports || user_cb == NULL ||
> +                 queue_id >= rte_eth_devices[port_id].data->nb_rx_queues) {
> +             return (-EINVAL);
> +     }
> +
> +     struct rte_eth_dev *dev = &rte_eth_devices[port_id];
> +     struct rte_eth_rxtx_callback *cb = dev->post_rx_burst_cbs[queue_id];
> +     struct rte_eth_rxtx_callback *prev_cb;
> +
> +     /* Reset head pointer and remove user cb if first in the list. */
> +     if (cb == user_cb) {
> +             dev->post_rx_burst_cbs[queue_id] = user_cb->next;
> +             return 0;
> +     }
> +
> +     /* Remove the user cb from the callback list. */
> +     do {
> +             prev_cb = cb;
> +             cb = cb->next;
> +
> +             if (cb == user_cb) {
> +                     prev_cb->next = user_cb->next;
> +                     return 0;
> +             }
> +
> +     } while (cb != NULL);
> +
> +     /* Callback wasn't found. */
> +     return (-EINVAL);
> +}
> +
> +int
> +rte_eth_remove_tx_callback(uint8_t port_id, uint16_t queue_id,
> +             struct rte_eth_rxtx_callback *user_cb)
> +{
> +     /* Check input parameters. */
> +     if (port_id >= nb_ports || user_cb == NULL ||
> +                 queue_id >= rte_eth_devices[port_id].data->nb_tx_queues) {
> +             return (-EINVAL);
> +     }
> +
> +     struct rte_eth_dev *dev = &rte_eth_devices[port_id];
> +     struct rte_eth_rxtx_callback *cb = dev->pre_tx_burst_cbs[queue_id];
> +     struct rte_eth_rxtx_callback *prev_cb;
> +
> +     /* Reset head pointer and remove user cb if first in the list. */
> +     if (cb == user_cb) {
> +             dev->pre_tx_burst_cbs[queue_id] = user_cb->next;
> +             return 0;
> +     }
> +
> +     /* Remove the user cb from the callback list. */
> +     do {
> +             prev_cb = cb;
> +             cb = cb->next;
> +
> +             if (cb == user_cb) {
> +                     prev_cb->next = user_cb->next;
> +                     return 0;
> +             }
> +
> +     } while (cb != NULL);
> +
> +     /* Callback wasn't found. */
> +     return (-EINVAL);
> +}
> +#endif       /* RTE_LIBRTE_ETHDEV_RXTX_CALLBACKS */
> diff --git a/lib/librte_ether/rte_ethdev.h b/lib/librte_ether/rte_ethdev.h
> index 48e4ac9..f55eeea 100644
> --- a/lib/librte_ether/rte_ethdev.h
> +++ b/lib/librte_ether/rte_ethdev.h
> @@ -1522,6 +1522,49 @@ struct eth_dev_ops {
>       eth_filter_ctrl_t              filter_ctrl;          /**< common filter 
> control*/
>  };
>  
> +#ifdef RTE_LIBRTE_ETHDEV_RXTX_CALLBACKS
> +/**
> + * Function type used for callbacks for processing packets on RX and TX
> + *
> + * If configured for RX, it is called with a burst of packets that have just
> + * been received on the given port and queue. On TX, it is called with a 
> burst
> + * of packets immediately before those packets are put onto the hardware 
> queue
> + * for transmission.
> + *
> + * @param port
> + *   The ethernet port on which rx or tx is being performed
> + * @param queue
> + *   The queue on the ethernet port which is being used to receive or 
> transmit
> + *   the packets.
> + * @param pkts
> + *   The burst of packets on which processing is to be done. On RX, these
> + *   packets have just been received. On TX, they are about to be 
> transmitted.
> + * @param nb_pkts
> + *   The number of packets in the burst pointed to by "pkts"
> + * @param user_param
> + *   The arbitrary user parameter passed in by the application when the 
> callback
> + *   was originally configured.
> + * @return
> + *   The number of packets remaining in pkts after processing.
> + *   * On RX, this will be returned to the user as the return value from
> + *     rte_eth_rx_burst.
> + *   * On TX, this will be the number of packets actually written to the NIC.
> + */
> +typedef uint16_t (*rte_rxtx_callback_fn)(uint8_t port, uint16_t queue,
> +     struct rte_mbuf *pkts[], uint16_t nb_pkts, void *user_param);
> +
> +/**
> + * @internal
> + * Structure used to hold information about the callbacks to be called for a
> + * queue on RX and TX.
> + */
> +struct rte_eth_rxtx_callback {
> +     struct rte_eth_rxtx_callback *next;
> +     rte_rxtx_callback_fn fn;
> +     void *param;
> +};
> +#endif
> +
>  /**
>   * @internal
>   * The generic data structure associated with each ethernet device.
> @@ -1539,7 +1582,22 @@ struct rte_eth_dev {
>       const struct eth_driver *driver;/**< Driver for this device */
>       struct eth_dev_ops *dev_ops;    /**< Functions exported by PMD */
>       struct rte_pci_device *pci_dev; /**< PCI info. supplied by probing */
> -     struct rte_eth_dev_cb_list link_intr_cbs; /**< User application 
> callbacks on interrupt*/
> +     /** User application callbacks for NIC interrupts */
> +     struct rte_eth_dev_cb_list link_intr_cbs;
> +
> +#ifdef RTE_LIBRTE_ETHDEV_RXTX_CALLBACKS
> +     /**
> +      * User-supplied functions called from rx_burst to post-process
> +      * received packets before passing them to the user
> +      */
> +     struct rte_eth_rxtx_callback **post_rx_burst_cbs;
> +
> +     /**
> +      * User-supplied functions called from tx_burst to pre-process
> +      * packets before passing them to the driver for transmission.
> +      */
> +     struct rte_eth_rxtx_callback **pre_tx_burst_cbs;
> +#endif
>  };
>  
>  struct rte_eth_dev_sriov {
> @@ -2393,7 +2451,23 @@ rte_eth_rx_burst(uint8_t port_id, uint16_t queue_id,
>       struct rte_eth_dev *dev;
>  
>       dev = &rte_eth_devices[port_id];
> +
> +#ifndef RTE_LIBRTE_ETHDEV_RXTX_CALLBACKS
>       return (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id], rx_pkts, 
> nb_pkts);
> +#else
> +     nb_pkts = (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id], rx_pkts,
> +                     nb_pkts);
> +     struct rte_eth_rxtx_callback *cb = dev->post_rx_burst_cbs[queue_id];
> +
> +     if (unlikely(cb != NULL)) {
> +             do {
> +                     nb_pkts = cb->fn(port_id, queue_id, rx_pkts, nb_pkts,
> +                                     cb->param);
> +                     cb = cb->next;
> +             } while (cb != NULL);
> +     }
> +     return nb_pkts;
> +#endif
>  }
>  #endif
>  
> @@ -2520,6 +2594,17 @@ rte_eth_tx_burst(uint8_t port_id, uint16_t queue_id,
>       struct rte_eth_dev *dev;
>  
>       dev = &rte_eth_devices[port_id];
> +#ifdef RTE_LIBRTE_ETHDEV_RXTX_CALLBACKS
> +     struct rte_eth_rxtx_callback *cb = dev->pre_tx_burst_cbs[queue_id];
> +
> +     if (unlikely(cb != NULL)) {
> +             do {
> +                     nb_pkts = cb->fn(port_id, queue_id, tx_pkts, nb_pkts,
> +                                     cb->param);
> +                     cb = cb->next;
> +             } while (cb != NULL);
> +     }
> +#endif
>       return (*dev->tx_pkt_burst)(dev->data->tx_queues[queue_id], tx_pkts, 
> nb_pkts);
>  }
>  #endif
> @@ -3667,6 +3752,123 @@ int rte_eth_dev_filter_supported(uint8_t port_id, 
> enum rte_filter_type filter_ty
>  int rte_eth_dev_filter_ctrl(uint8_t port_id, enum rte_filter_type 
> filter_type,
>                       enum rte_filter_op filter_op, void *arg);
>  
> +#ifdef RTE_LIBRTE_ETHDEV_RXTX_CALLBACKS
> +/**
> + * Add a callback to be called on packet RX on a given port and queue.
> + *
> + * This API configures a function to be called for each burst of
> + * packets received on a given NIC port queue. The return value is a pointer
> + * that can be used to later remove the callback using
> + * rte_eth_remove_rx_callback().
> + *
> + * @param port_id
> + *   The port identifier of the Ethernet device.
> + * @param queue_id
> + *   The queue on the Ethernet device on which the callback is to be added.
> + * @param fn
> + *   The callback function
> + * @param user_param
> + *   A generic pointer parameter which will be passed to each invocation of 
> the
> + *   callback function on this port and queue.
> + *
> + * @return
> + *   NULL on error.
> + *   On success, a pointer value which can later be used to remove the 
> callback.
> + */
> +void *rte_eth_add_rx_callback(uint8_t port_id, uint16_t queue_id,
> +             rte_rxtx_callback_fn fn, void *user_param);
> +
> +/**
> + * Add a callback to be called on packet TX on a given port and queue.
> + *
> + * This API configures a function to be called for each burst of
> + * packets sent on a given NIC port queue. The return value is a pointer
> + * that can be used to later remove the callback using
> + * rte_eth_remove_tx_callback().
> + *
> + * @param port_id
> + *   The port identifier of the Ethernet device.
> + * @param queue_id
> + *   The queue on the Ethernet device on which the callback is to be added.
> + * @param fn
> + *   The callback function
> + * @param user_param
> + *   A generic pointer parameter which will be passed to each invocation of 
> the
> + *   callback function on this port and queue.
> + *
> + * @return
> + *   NULL on error.
> + *   On success, a pointer value which can later be used to remove the 
> callback.
> + */
> +void *rte_eth_add_tx_callback(uint8_t port_id, uint16_t queue_id,
> +             rte_rxtx_callback_fn fn, void *user_param);
> +
> +/**
> + * Remove an RX packet callback from a given port and queue.
> + *
> + * This function is used to remove callbacks that were added to a NIC port
> + * queue using rte_eth_add_rx_callback().
> + *
> + * Note: the callback is removed from the callback list but it isn't freed
> + * since it may still be in use. The memory for the callback can be
> + * subsequently freed back by the application by calling rte_free():
> + *
> + *  - Immediately - if the port is stopped, or the user knows that no
> + *    callbacks are in flight e.g. if called from the thread doing RX/TX
> + *    on that queue.
> + *
> + * - After a short delay - where the delay is sufficient to allow any
> + *   in-flight callbacks to complete.
> + *
> + * @param port_id
> + *   The port identifier of the Ethernet device.
> + * @param queue_id
> + *   The queue on the Ethernet device from which the callback is to be 
> removed.
> + * @param user_cb
> + *   User supplied callback created via rte_eth_add_rx_callback().
> + *
> + * @return
> + *   - 0: Success. Callback was removed.
> + *   - -EINVAL: The port_id or the queue_id is out of range, or the callback 
> is
> + *              NULL or not found for the port/queue.
> + */
> +int rte_eth_remove_rx_callback(uint8_t port_id, uint16_t queue_id,
> +             struct rte_eth_rxtx_callback *user_cb);
> +
> +/**
> + * Remove a TX packet callback from a given port and queue.
> + *
> + * This function is used to remove callbacks that were added to a NIC port
> + * queue using rte_eth_add_tx_callback().
> + *
> + * Note: the callback is removed from the callback list but it isn't freed
> + * since it may still be in use. The memory for the callback can be
> + * subsequently freed back by the application by calling rte_free():
> + *
> + *  - Immediately - if the port is stopped, or the user knows that no
> + *    callbacks are in flight e.g. if called from the thread doing RX/TX
> + *    on that queue.
> + *
> + * - After a short delay - where the delay is sufficient to allow any
> + *   in-flight callbacks to complete.
> + *
> + * @param port_id
> + *   The port identifier of the Ethernet device.
> + * @param queue_id
> + *   The queue on the Ethernet device from which the callback is to be 
> removed.
> + * @param user_cb
> + *   User supplied callback created via rte_eth_add_tx_callback().
> + *
> + * @return
> + *   - 0: Success. Callback was removed.
> + *   - -EINVAL: The port_id or the queue_id is out of range, or the callback 
> is
> + *              NULL or not found for the port/queue.
> + */
> +int rte_eth_remove_tx_callback(uint8_t port_id, uint16_t queue_id,
> +             struct rte_eth_rxtx_callback *user_cb);
> +
> +#endif /* RTE_LIBRTE_ETHDEV_RXTX_CALLBACKS */
> +
>  #ifdef __cplusplus
>  }
>  #endif
> diff --git a/lib/librte_ether/rte_ether_version.map 
> b/lib/librte_ether/rte_ether_version.map
> index 7316530..3227cda 100644
> --- a/lib/librte_ether/rte_ether_version.map
> +++ b/lib/librte_ether/rte_ether_version.map
> @@ -2,6 +2,8 @@ DPDK_2.0 {
>       global:
>  
>       _rte_eth_dev_callback_process;
> +     rte_eth_add_rx_callback;
> +     rte_eth_add_tx_callback;
>       rte_eth_allmulticast_disable;
>       rte_eth_allmulticast_enable;
>       rte_eth_allmulticast_get;
> @@ -96,6 +98,8 @@ DPDK_2.0 {
>       rte_eth_promiscuous_disable;
>       rte_eth_promiscuous_enable;
>       rte_eth_promiscuous_get;
> +     rte_eth_remove_rx_callback;
> +     rte_eth_remove_tx_callback;
>       rte_eth_rx_burst;
>       rte_eth_rx_descriptor_done;
>       rte_eth_rx_queue_count;
> -- 
> 1.7.4.1
> 

Reply via email to