Hi Bruce,

> -----Original Message-----
> From: dev [mailto:dev-bounces at dpdk.org] On Behalf Of Richardson, Bruce
> Sent: Friday, July 04, 2014 12:39 AM
> To: dev at dpdk.org
> Subject: [dpdk-dev] Making space in mbuf data-structure
> 
> Hi all,
> 
> At this stage it's been well recognised that we need to make more space in 
> the mbuf data structure for new fields. We in Intel have
> had some discussion on this and this email is to outline what our current 
> thinking and approach on this issue is, and look for additional
> suggestions and feedback.
> 
> Firstly, we believe that there is no possible way that we can ever fit all 
> the fields we need to fit into a 64-byte mbuf, and so we need to
> start looking at a 128-byte mbuf instead. Examples of new fields that need to 
> fit in, include - 32-64 bits for additional offload flags, 32-
> bits more for filter information for support for the new filters in the i40e 
> driver, an additional 2-4 bytes for storing info on a second
> vlan tag, 4-bytes for storing a sequence number to enable packet reordering 
> in future, as well as potentially a number of other fields
> or splitting out fields that are superimposed over each other right now, e.g. 
> for the qos scheduler. We also want to allow space for use
> by other non-Intel NIC drivers that may be open-sourced to dpdk.org in the 
> future too, where they support fields and offloads that
> our hardware doesn't.
> 
> If we accept the fact of a 2-cache-line mbuf, then the issue becomes how to 
> rework things so that we spread our fields over the two
> cache lines while causing the lowest slow-down possible. The general approach 
> that we are looking to take is to focus the first cache
> line on fields that are updated on RX , so that receive only deals with one 
> cache line. The second cache line can be used for application
> data and information that will only be used on the TX leg. This would allow 
> us to work on the first cache line in RX as now, and have the
> second cache line being prefetched in the background so that it is available 
> when necessary. Hardware prefetches should help us out
> here. We also may move rarely used, or slow-path RX fields e.g. such as those 
> for chained mbufs with jumbo frames, to the second
> cache line, depending upon the performance impact and bytes savings achieved.
> 
> With this approach, we still need to make space in the first cache line for 
> information for the new or expanded receive offloads. First
> off the blocks is to look at moving the mempool pointer into the second cache 
> line. This will free-up 8 bytes in cache line  0, with a field
> that is only used when cleaning up after TX. A prototype patch for this 
> change is given below, for your review and comment. Initial
> quick tests with testpmd (testpmd -c 600 -n 4 -- --mbcache=250 
> --txqflags=0xf01 --burst=32 --txrst=32 --txfreet=32 --rxfreet=32 --
> total-num-mbufs=65536), and l3fwd (l3fwd -c 400 -n 4 -- -p F -P 
> --config="(0,0,10),(1,0,10),(2,0,10),(3,0,10)") showed only a slight
> performance decrease with testpmd and equal or slightly better performance 
> with l3fwd. These would both have been using the
> vector PMD - I haven't looked to change the fallback TX implementation yet 
> for this change, since it's not as performance optimized
> and hence cycle-count sensitive.

Regarding code changes itself:
If I understand your code changes correctly:
ixgbe_tx_free_bufs() will use rte_mempool_put_bulk() *only* if all mbufs in the 
bulk belong to the same mempool.
If we have let say 2 groups of mbufs from 2 different mempools - it wouldn't be 
able to use put_bulk. 
While, as I remember,  current implementation is able to use put_bulk() in such 
case.
I think, it is possible to change you code to do a better grouping.

> 
> Beyond this change, I'm also investigating potentially moving the "next" 
> pointer to the second cache line, but it's looking harder to
> move without serious impact, so we'll have to see there what can be done. 
> Other fields I'll examine in due course too, and send on
> any findings I may have.

Could you explain it a bit?
I always had an impression that moving next pointer to the 2-nd cache line 
would have less impact then moving mempool pointer.
My thinking was: next is used only in scatter versions of RX/TX which are 
slower than optimised RX/TX anyway.
So few extra cycles wouldn't be that noticeable. 
Though I admit I never did any measurements for that case.

About space savings for the first cache line:
- I still think that Oliver's suggestion of replacing data pointer (8 bytes) 
with data offset (2 bytes) makes a lot of sense.

> 
> Once we have freed up space, then we can start looking at what fields get to 
> use that space and what way we shuffle the existing
> fields about, but that's a discussion for another day!
> 
> Please see patch below for moving pool pointer to second cache line of mbuf. 
> All feedback welcome, naturally.
> 
> Regards,
> /Bruce
> 
> diff --git a/app/test/test_mbuf.c b/app/test/test_mbuf.c
> index 2b87521..e0cf30c 100644
> --- a/app/test/test_mbuf.c
> +++ b/app/test/test_mbuf.c
> @@ -832,7 +832,7 @@ test_failing_mbuf_sanity_check(void)
>  int
>  test_mbuf(void)
>  {
> -     RTE_BUILD_BUG_ON(sizeof(struct rte_mbuf) != 64);
> +     RTE_BUILD_BUG_ON(sizeof(struct rte_mbuf) != CACHE_LINE_SIZE*2);
> 
>       /* create pktmbuf pool if it does not exist */
>       if (pktmbuf_pool == NULL) {
> diff --git a/lib/librte_mbuf/rte_mbuf.h b/lib/librte_mbuf/rte_mbuf.h
> index 2735f37..aa3ec32 100644
> --- a/lib/librte_mbuf/rte_mbuf.h
> +++ b/lib/librte_mbuf/rte_mbuf.h
> @@ -181,7 +181,8 @@ enum rte_mbuf_type {
>   * The generic rte_mbuf, containing a packet mbuf or a control mbuf.
>   */
>  struct rte_mbuf {
> -     struct rte_mempool *pool; /**< Pool from which mbuf was allocated. */
> +     uint64_t this_space_for_rent; // 8-bytes free for reuse.
> +
>       void *buf_addr;           /**< Virtual address of segment buffer. */
>       phys_addr_t buf_physaddr; /**< Physical address of segment buffer. */
>       uint16_t buf_len;         /**< Length of segment buffer. */
> @@ -210,12 +211,16 @@ struct rte_mbuf {
>               struct rte_pktmbuf pkt;
>       };
> 
> +// second cache line starts here
> +     struct rte_mempool *pool __rte_cache_aligned;
> +                     /**< Pool from which mbuf was allocated. */
> +
>       union {
>               uint8_t metadata[0];
>               uint16_t metadata16[0];
>               uint32_t metadata32[0];
>               uint64_t metadata64[0];
> -     };
> +     } __rte_cache_aligned;
>  } __rte_cache_aligned;
> 
>  #define RTE_MBUF_METADATA_UINT8(mbuf, offset)              \
> diff --git a/lib/librte_pmd_ixgbe/ixgbe_rxtx.h 
> b/lib/librte_pmd_ixgbe/ixgbe_rxtx.h
> index 64c0695..7374e0d 100644
> --- a/lib/librte_pmd_ixgbe/ixgbe_rxtx.h
> +++ b/lib/librte_pmd_ixgbe/ixgbe_rxtx.h
> @@ -171,10 +171,6 @@ struct igb_tx_queue {
>       volatile union ixgbe_adv_tx_desc *tx_ring;
>       uint64_t            tx_ring_phys_addr; /**< TX ring DMA address. */
>       struct igb_tx_entry *sw_ring;      /**< virtual address of SW ring. */
> -#ifdef RTE_IXGBE_INC_VECTOR
> -     /** continuous tx entry sequence within the same mempool */
> -     struct igb_tx_entry_seq *sw_ring_seq;
> -#endif
>       volatile uint32_t   *tdt_reg_addr; /**< Address of TDT register. */
>       uint16_t            nb_tx_desc;    /**< number of TX descriptors. */
>       uint16_t            tx_tail;       /**< current value of TDT reg. */
> diff --git a/lib/librte_pmd_ixgbe/ixgbe_rxtx_vec.c 
> b/lib/librte_pmd_ixgbe/ixgbe_rxtx_vec.c
> index 09e19a3..e9fd739 100644
> --- a/lib/librte_pmd_ixgbe/ixgbe_rxtx_vec.c
> +++ b/lib/librte_pmd_ixgbe/ixgbe_rxtx_vec.c
> @@ -401,9 +401,8 @@ static inline int __attribute__((always_inline))
>  ixgbe_tx_free_bufs(struct igb_tx_queue *txq)
>  {
>       struct igb_tx_entry_v *txep;
> -     struct igb_tx_entry_seq *txsp;
>       uint32_t status;
> -     uint32_t n, k;
> +     uint32_t n;
>  #ifdef RTE_MBUF_SCATTER_GATHER
>       uint32_t i;
>       int nb_free = 0;
> @@ -423,25 +422,36 @@ ixgbe_tx_free_bufs(struct igb_tx_queue *txq)
>        */
>       txep = &((struct igb_tx_entry_v *)txq->sw_ring)[txq->tx_next_dd -
>                       (n - 1)];
> -     txsp = &txq->sw_ring_seq[txq->tx_next_dd - (n - 1)];
> 
> -     while (n > 0) {
> -             k = RTE_MIN(n, txsp[n-1].same_pool);
>  #ifdef RTE_MBUF_SCATTER_GATHER
> -             for (i = 0; i < k; i++) {
> -                     m = __rte_pktmbuf_prefree_seg((txep+n-k+i)->mbuf);
> +     m = __rte_pktmbuf_prefree_seg(txep[0].mbuf);
> +     if (likely(m != NULL)) {
> +             free[0] = m;
> +             nb_free = 1;
> +             for (i = 1; i < n; i++) {
> +                     m = __rte_pktmbuf_prefree_seg(txep[i].mbuf);
> +                     if (likely(m != NULL)) {
> +                             if (likely(m->pool == free[0]->pool))
> +                                     free[nb_free++] = m;
> +                             else
> +                                     rte_mempool_put(m->pool, m);
> +                     }
> +             }
> +             rte_mempool_put_bulk(free[0]->pool, (void **)free, nb_free);
> +     }
> +     else {
> +             for (i = 1; i < n; i++) {
> +                     m = __rte_pktmbuf_prefree_seg(txep[i].mbuf);
>                       if (m != NULL)
> -                             free[nb_free++] = m;
> +                             rte_mempool_put(m->pool, m);
>               }
> -             rte_mempool_put_bulk((void *)txsp[n-1].pool,
> -                             (void **)free, nb_free);
> -#else
> -             rte_mempool_put_bulk((void *)txsp[n-1].pool,
> -                             (void **)(txep+n-k), k);
> -#endif
> -             n -= k;
>       }
> 
> +#else /* no scatter_gather */
> +     for (i = 0; i < n; i++)
> +             rte_mempool_put(m->pool, txep[i].mbuf);
> +#endif /* scatter_gather */
> +
>       /* buffers were freed, update counters */
>       txq->nb_tx_free = (uint16_t)(txq->nb_tx_free + txq->tx_rs_thresh);
>       txq->tx_next_dd = (uint16_t)(txq->tx_next_dd + txq->tx_rs_thresh);
> @@ -453,19 +463,11 @@ ixgbe_tx_free_bufs(struct igb_tx_queue *txq)
> 
>  static inline void __attribute__((always_inline))
>  tx_backlog_entry(struct igb_tx_entry_v *txep,
> -              struct igb_tx_entry_seq *txsp,
>                struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
>  {
>       int i;
> -     for (i = 0; i < (int)nb_pkts; ++i) {
> +     for (i = 0; i < (int)nb_pkts; ++i)
>               txep[i].mbuf = tx_pkts[i];
> -             /* check and update sequence number */
> -             txsp[i].pool = tx_pkts[i]->pool;
> -             if (txsp[i-1].pool == tx_pkts[i]->pool)
> -                     txsp[i].same_pool = txsp[i-1].same_pool + 1;
> -             else
> -                     txsp[i].same_pool = 1;
> -     }
>  }
> 
>  uint16_t
> @@ -475,7 +477,6 @@ ixgbe_xmit_pkts_vec(void *tx_queue, struct rte_mbuf 
> **tx_pkts,
>       struct igb_tx_queue *txq = (struct igb_tx_queue *)tx_queue;
>       volatile union ixgbe_adv_tx_desc *txdp;
>       struct igb_tx_entry_v *txep;
> -     struct igb_tx_entry_seq *txsp;
>       uint16_t n, nb_commit, tx_id;
>       __m128i flags = _mm_set_epi32(DCMD_DTYP_FLAGS, 0, 0, 0);
>       __m128i rs = _mm_set_epi32(IXGBE_ADVTXD_DCMD_RS|DCMD_DTYP_FLAGS,
> @@ -495,14 +496,13 @@ ixgbe_xmit_pkts_vec(void *tx_queue, struct rte_mbuf 
> **tx_pkts,
>       tx_id = txq->tx_tail;
>       txdp = &txq->tx_ring[tx_id];
>       txep = &((struct igb_tx_entry_v *)txq->sw_ring)[tx_id];
> -     txsp = &txq->sw_ring_seq[tx_id];
> 
>       txq->nb_tx_free = (uint16_t)(txq->nb_tx_free - nb_pkts);
> 
>       n = (uint16_t)(txq->nb_tx_desc - tx_id);
>       if (nb_commit >= n) {
> 
> -             tx_backlog_entry(txep, txsp, tx_pkts, n);
> +             tx_backlog_entry(txep, tx_pkts, n);
> 
>               for (i = 0; i < n - 1; ++i, ++tx_pkts, ++txdp)
>                       vtx1(txdp, *tx_pkts, flags);
> @@ -517,10 +517,9 @@ ixgbe_xmit_pkts_vec(void *tx_queue, struct rte_mbuf 
> **tx_pkts,
>               /* avoid reach the end of ring */
>               txdp = &(txq->tx_ring[tx_id]);
>               txep = &(((struct igb_tx_entry_v *)txq->sw_ring)[tx_id]);
> -             txsp = &(txq->sw_ring_seq[tx_id]);
>       }
> 
> -     tx_backlog_entry(txep, txsp, tx_pkts, nb_commit);
> +     tx_backlog_entry(txep, tx_pkts, nb_commit);
> 
>       vtx(txdp, tx_pkts, nb_commit, flags);
> 
> @@ -544,7 +543,6 @@ ixgbe_tx_queue_release_mbufs(struct igb_tx_queue *txq)
>  {
>       unsigned i;
>       struct igb_tx_entry_v *txe;
> -     struct igb_tx_entry_seq *txs;
>       uint16_t nb_free, max_desc;
> 
>       if (txq->sw_ring != NULL) {
> @@ -562,10 +560,6 @@ ixgbe_tx_queue_release_mbufs(struct igb_tx_queue *txq)
>               for (i = 0; i < txq->nb_tx_desc; i++) {
>                       txe = (struct igb_tx_entry_v *)&txq->sw_ring[i];
>                       txe->mbuf = NULL;
> -
> -                     txs = &txq->sw_ring_seq[i];
> -                     txs->pool = NULL;
> -                     txs->same_pool = 0;
>               }
>       }
>  }
> @@ -580,11 +574,6 @@ ixgbe_tx_free_swring(struct igb_tx_queue *txq)
>               rte_free((struct igb_rx_entry *)txq->sw_ring - 1);
>               txq->sw_ring = NULL;
>       }
> -
> -     if (txq->sw_ring_seq != NULL) {
> -             rte_free(txq->sw_ring_seq - 1);
> -             txq->sw_ring_seq = NULL;
> -     }
>  }
> 
>  static void
> @@ -593,7 +582,6 @@ ixgbe_reset_tx_queue(struct igb_tx_queue *txq)
>       static const union ixgbe_adv_tx_desc zeroed_desc = { .read = {
>                       .buffer_addr = 0} };
>       struct igb_tx_entry_v *txe = (struct igb_tx_entry_v *)txq->sw_ring;
> -     struct igb_tx_entry_seq *txs = txq->sw_ring_seq;
>       uint16_t i;
> 
>       /* Zero out HW ring memory */
> @@ -605,8 +593,6 @@ ixgbe_reset_tx_queue(struct igb_tx_queue *txq)
>               volatile union ixgbe_adv_tx_desc *txd = &txq->tx_ring[i];
>               txd->wb.status = IXGBE_TXD_STAT_DD;
>               txe[i].mbuf = NULL;
> -             txs[i].pool = NULL;
> -             txs[i].same_pool = 0;
>       }
> 
>       txq->tx_next_dd = (uint16_t)(txq->tx_rs_thresh - 1);
> @@ -632,27 +618,14 @@ static struct ixgbe_txq_ops vec_txq_ops = {
>  };
> 
>  int ixgbe_txq_vec_setup(struct igb_tx_queue *txq,
> -                     unsigned int socket_id)
> +                     __rte_unused unsigned int socket_id)
>  {
> -     uint16_t nb_desc;
> -
>       if (txq->sw_ring == NULL)
>               return -1;
> 
> -     /* request addtional one entry for continous sequence check */
> -     nb_desc = (uint16_t)(txq->nb_tx_desc + 1);
> -
> -     txq->sw_ring_seq = rte_zmalloc_socket("txq->sw_ring_seq",
> -                             sizeof(struct igb_tx_entry_seq) * nb_desc,
> -                             CACHE_LINE_SIZE, socket_id);
> -     if (txq->sw_ring_seq == NULL)
> -             return -1;
> -
> -
>       /* leave the first one for overflow */
>       txq->sw_ring = (struct igb_tx_entry *)
>               ((struct igb_tx_entry_v *)txq->sw_ring + 1);
> -     txq->sw_ring_seq += 1;
>       txq->ops = &vec_txq_ops;
> 
>       return 0;

Reply via email to