Thanks Ilya for your review; please see my comments inline.

Regards,
Antonio
> -----Original Message-----
> From: Ilya Maximets [mailto:i.maxim...@samsung.com]
> Sent: Wednesday, January 18, 2017 7:37 AM
> To: Fischetti, Antonio <antonio.fische...@intel.com>; d...@openvswitch.org
> Cc: acon...@redhat.com; diproiet...@vmware.com; Bodireddy, Bhanuprakash
> <bhanuprakash.bodire...@intel.com>; markus.magnus...@ericsson.com
> Subject: Re: [PATCH v2 1/2] netdev-dpdk: Use intermediate queue during
> packet transmission.
>
> Not a complete review. This code is full of races.
> See details inline.
>
> Best regards, Ilya Maximets.
>
> On 17.01.2017 18:37, antonio.fische...@intel.com wrote:
> > This patch implements the intermediate Tx queues on 'dpdk' type ports.
> >
> > Test results:
> >  * In worst case scenario with fewer packets per batch, a significant
> >    bottleneck is observed for netdev_dpdk_eth_send() function due to
> >    expensive MMIO writes.
> >
> >  * Also its observed that CPI(cycles per instruction) Rate for the function
> >    stood between 3.15 and 4.1 which is significantly higher than acceptable
> >    limit of 1.0 for HPC applications and theoretical limit of 0.25
> >    (As Backend pipeline can retire 4 micro-operations in a cycle).
> >
> >  * With this patch, CPI for netdev_dpdk_eth_send() is at 0.55 and the
> >    overall throughput improved significantly.
> >
> > Signed-off-by: Antonio Fischetti <antonio.fische...@intel.com>
> > Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodire...@intel.com>
> > Co-authored-by: Bhanuprakash Bodireddy <bhanuprakash.bodire...@intel.com>
> > Signed-off-by: Markus Magnusson <markus.magnus...@ericsson.com>
> > Co-authored-by: Markus Magnusson <markus.magnus...@ericsson.com>
> > ---
> >  lib/dpif-netdev.c     | 53 +++++++++++++++++++++++++++++++--
> >  lib/netdev-bsd.c      |  1 +
> >  lib/netdev-dpdk.c     | 82 ++++++++++++++++++++++++++++++++++++++++++++++-----
> >  lib/netdev-dummy.c    |  1 +
> >  lib/netdev-linux.c    |  1 +
> >  lib/netdev-provider.h |  8 +++++
> >  lib/netdev-vport.c    |  3 +-
> >  lib/netdev.c          |  9 ++++++
> >  lib/netdev.h          |  1 +
> >  9 files changed, 149 insertions(+), 10 deletions(-)
> >
> > diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> > index 3901129..58ac429 100644
> > --- a/lib/dpif-netdev.c
> > +++ b/lib/dpif-netdev.c
> > @@ -289,6 +289,8 @@ struct dp_netdev_rxq {
> >      struct dp_netdev_pmd_thread *pmd;  /* pmd thread that will poll this queue. */
> >  };
> >
> > +#define LAST_USED_QID_NONE -1
> > +
> >  /* A port in a netdev-based datapath. */
> >  struct dp_netdev_port {
> >      odp_port_t port_no;
> > @@ -303,6 +305,8 @@ struct dp_netdev_port {
> >      char *type;                 /* Port type as requested by user. */
> >      char *rxq_affinity_list;    /* Requested affinity of rx queues. */
> >      bool need_reconfigure;      /* True if we should reconfigure netdev. */
> > +    int last_used_qid;          /* Last queue id where packets could be
> > +                                   enqueued. */
> >  };
> >
> >  /* Contained by struct dp_netdev_flow's 'stats' member. */
> > @@ -619,6 +623,9 @@ static int dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
> >  static inline bool emc_entry_alive(struct emc_entry *ce);
> >  static void emc_clear_entry(struct emc_entry *ce);
> >
> > +static struct tx_port *pmd_send_port_cache_lookup
> > +(const struct dp_netdev_pmd_thread *pmd, odp_port_t port_no);
> > +
> >  static void
> >  emc_cache_init(struct emc_cache *flow_cache)
> >  {
> > @@ -3507,15 +3514,19 @@ pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
> >      return i;
> >  }
> >
> > +enum { DRAIN_TSC = 20000ULL };
> > +
> >  static void *
> >  pmd_thread_main(void *f_)
> >  {
> >      struct dp_netdev_pmd_thread *pmd = f_;
> > -    unsigned int lc = 0;
> > +    unsigned int lc = 0, lc_drain = 0;
> >      struct polled_queue *poll_list;
> >      bool exiting;
> >      int poll_cnt;
> >      int i;
> > +    uint64_t prev = 0, now = 0;
> > +    struct tx_port *tx_port;
> >
> >      poll_list = NULL;
> >
> > @@ -3548,6 +3559,26 @@ reload:
> >                                         poll_list[i].port_no);
> >      }
> >
> > +#define MAX_LOOP_TO_DRAIN 128
> > +        if (lc_drain++ > MAX_LOOP_TO_DRAIN) {
> > +            lc_drain = 0;
> > +            prev = now;
> > +            now = pmd->last_cycles;
> > +            if ((now - prev) > DRAIN_TSC) {
> > +                HMAP_FOR_EACH (tx_port, node, &pmd->tx_ports) {
>
> 'pmd->tx_ports' must be protected by 'pmd->port_mutex'. Also it can be
> changed while pmd still working. I think you wanted something like
> 'pmd->send_port_cache'.

Ok, I will replace it with HMAP_FOR_EACH (tx_port, node, &pmd->send_port_cache).

>
> > +                    if (tx_port->port->last_used_qid != LAST_USED_QID_NONE) {
> > +                        /* This queue may contain some buffered packets waiting
> > +                         * to be sent out. */
> > +                        netdev_txq_drain(tx_port->port->netdev,
> > +                                         tx_port->port->last_used_qid,
> > +                                         tx_port->port->dynamic_txqs);
> > +                        /* Mark it as empty. */
> > +                        tx_port->port->last_used_qid = LAST_USED_QID_NONE;
>
> 'port' is a pointer to the common structure --> 'port->last_used_qid' will
> be concurrently updated by all threads --> total mess.

Ok, I will fix this. What about moving 'last_used_qid' into 'struct tx_port'? Each pmd thread has its own tx_port instances, so if I loop over the send port cache with HMAP_FOR_EACH (tx_port, node, &pmd->send_port_cache) I can refer to tx_port->last_used_qid without any concurrency issue. Do you think this would be ok? Any suggestion is welcome, thanks.
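To make that concrete, here is a rough, untested sketch of what I mean. It reuses the names already in the patch (LAST_USED_QID_NONE, netdev_txq_drain()) and only shows the parts that would change; the existing members of 'struct tx_port' stay as they are:

    /* dpif-netdev.c: track the last used tx queue per pmd, inside tx_port. */
    struct tx_port {
        /* ... existing members ... */
        int last_used_qid;   /* Last txq id this pmd enqueued packets to, or
                              * LAST_USED_QID_NONE if nothing is buffered. */
    };

    /* dp_netdev_add_port_tx_to_pmd(): initialize the new member. */
    tx->last_used_qid = LAST_USED_QID_NONE;

    /* dp_execute_cb(): after XPS has picked 'tx_qid', drain the queue used
     * before if the assignment changed, then remember the current one. */
    if (OVS_UNLIKELY(p->last_used_qid != LAST_USED_QID_NONE
                     && p->last_used_qid != tx_qid)) {
        netdev_txq_drain(p->port->netdev, p->last_used_qid,
                         p->port->dynamic_txqs);
    }
    p->last_used_qid = tx_qid;

    /* pmd_thread_main(): periodic drain over the pmd-local cache. */
    HMAP_FOR_EACH (tx_port, node, &pmd->send_port_cache) {
        if (tx_port->last_used_qid != LAST_USED_QID_NONE) {
            netdev_txq_drain(tx_port->port->netdev, tx_port->last_used_qid,
                             tx_port->port->dynamic_txqs);
            tx_port->last_used_qid = LAST_USED_QID_NONE;
        }
    }

Since each pmd thread owns its tx_port instances, only the owning thread would ever read or write 'last_used_qid', so it needs no locking.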
>
> > +                    }
> > +                }
> > +            }
> > +        }
> > +
> >          if (lc++ > 1024) {
> >              bool reload;
> >
> > @@ -3883,6 +3914,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
> >
> >      tx->port = port;
> >      tx->qid = -1;
> > +    port->last_used_qid = LAST_USED_QID_NONE;
> >
> >      hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
> >      pmd->need_reload = true;
> > @@ -4538,7 +4570,24 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
> >          } else {
> >              tx_qid = pmd->static_tx_qid;
> >          }
> > +//TODO Add UNLIKELY to the 1st condition?
> > +        /* Is the current qid the same as the last one we used? */
> > +        if ((p->port->last_used_qid != LAST_USED_QID_NONE) &&
> > +            (p->port->last_used_qid != tx_qid)) {
> > +            /* The current assigned queue was changed, we need to drain
> > +             * packets from the previous queue. */
> > +            netdev_txq_drain(p->port->netdev,
> > +                             p->port->last_used_qid,
> > +                             p->port->dynamic_txqs);
> > +            p->port->last_used_qid = LAST_USED_QID_NONE;
> > +        }
> >
> > +        /* In case these packets gets buffered into an intermediate
> > +         * queue and XPS is enabled the drain function could find a
> > +         * different Tx qid assigned to its thread. We keep track
> > +         * of the qid we're now using, that will trigger the drain
> > +         * function and will select the right queue to flush. */
> > +        p->port->last_used_qid = tx_qid;
> >          netdev_send(p->port->netdev, tx_qid, packets_, may_steal,
> >                      dynamic_txqs);
> >          return;
> > @@ -4952,7 +5001,7 @@ dpif_dummy_register(enum dummy_level level)
> >                                   "dp port new-number",
> >                                   3, 3, dpif_dummy_change_port_number, NULL);
> >      }
> > -
> > +
> >  /* Datapath Classifier. */
> >
> >  /* A set of rules that all have the same fields wildcarded. */
> > diff --git a/lib/netdev-bsd.c b/lib/netdev-bsd.c
> > index 94c515d..00d5263 100644
> > --- a/lib/netdev-bsd.c
> > +++ b/lib/netdev-bsd.c
> > @@ -1547,6 +1547,7 @@ netdev_bsd_update_flags(struct netdev *netdev_, enum netdev_flags off,
> >      netdev_bsd_rxq_recv,                        \
> >      netdev_bsd_rxq_wait,                        \
> >      netdev_bsd_rxq_drain,                       \
> > +    NULL,                                       \
> >  }
> >
> >  const struct netdev_class netdev_bsd_class =
> > diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
> > index 94568a1..d560bf6 100644
> > --- a/lib/netdev-dpdk.c
> > +++ b/lib/netdev-dpdk.c
> > @@ -166,7 +166,6 @@ static const struct rte_eth_conf port_conf = {
> >
> >  enum { DPDK_RING_SIZE = 256 };
> >  BUILD_ASSERT_DECL(IS_POW2(DPDK_RING_SIZE));
> > -enum { DRAIN_TSC = 200000ULL };
> >
> >  enum dpdk_dev_type {
> >      DPDK_DEV_ETH = 0,
> > @@ -289,12 +288,18 @@ struct dpdk_mp {
> >  /* There should be one 'struct dpdk_tx_queue' created for
> >   * each cpu core. */
> >  struct dpdk_tx_queue {
> > +    int count;                     /* Number of buffered packets waiting to
> > +                                      be sent. */
> >      rte_spinlock_t tx_lock;        /* Protects the members and the NIC queue
> >                                      * from concurrent access. It is used only
> >                                      * if the queue is shared among different
> >                                      * pmd threads (see 'concurrent_txq'). */
> >      int map;                       /* Mapping of configured vhost-user queues
> >                                      * to enabled by guest. */
> > +    struct rte_mbuf *burst_pkts[NETDEV_MAX_BURST];
> > +                                   /* Intermediate queues where packets can
> > +                                    * be buffered to amortize the cost of MMIO
> > +                                    * writes. */
> >  };
> >
> >  /* dpdk has no way to remove dpdk ring ethernet devices
> > @@ -1381,6 +1386,7 @@ static inline int
> >  netdev_dpdk_eth_tx_burst(struct netdev_dpdk *dev, int qid,
> >                           struct rte_mbuf **pkts, int cnt)
> >  {
> > +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
> >      uint32_t nb_tx = 0;
> >
> >      while (nb_tx != cnt) {
> > @@ -1404,6 +1410,8 @@ netdev_dpdk_eth_tx_burst(struct netdev_dpdk *dev, int qid,
> >          }
> >      }
> >
> > +    txq->count = cnt - nb_tx;
> > +
> >      return cnt - nb_tx;
> >  }
> >
> > @@ -1788,6 +1796,37 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
> >      }
> >  }
> >
> > +/* Enqueue packets in an intermediate queue and call the burst
> > + * function when the queue is full. This way we can amortize the
> > + * cost of MMIO writes. */
> > +static inline int
> > +netdev_dpdk_eth_tx_queue(struct netdev_dpdk *dev, int qid,
> > +                         struct rte_mbuf **pkts, int cnt)
> > +{
> > +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
> > +
> > +    int i = 0;
> > +    int dropped = 0;
> > +
> > +    while (i < cnt) {
> > +        int freeslots = NETDEV_MAX_BURST - txq->count;
> > +        int tocopy = MIN(freeslots, cnt-i);
> > +
> > +        memcpy(&txq->burst_pkts[txq->count], &pkts[i],
> > +               tocopy * sizeof (struct rte_mbuf *));
> > +
> > +        txq->count += tocopy;
> > +        i += tocopy;
> > +
> > +        /* Queue full, burst the packets */
> > +        if (txq->count >= NETDEV_MAX_BURST) {
> > +            dropped += netdev_dpdk_eth_tx_burst(dev, qid, txq->burst_pkts,
> > +                                                txq->count);
> > +        }
> > +    }
> > +    return dropped;
> > +}
> > +
> >  static int
> >  netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
> >                         struct dp_packet_batch *batch,
> > @@ -1836,7 +1875,7 @@ netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,
> >      cnt = netdev_dpdk_qos_run(dev, pkts, cnt);
> >      dropped = batch->count - cnt;
> >
> > -    dropped += netdev_dpdk_eth_tx_burst(dev, qid, pkts, cnt);
> > +    dropped += netdev_dpdk_eth_tx_queue(dev, qid, pkts, cnt);
> >
> >      if (OVS_UNLIKELY(dropped)) {
> >          rte_spinlock_lock(&dev->stats_lock);
> > @@ -1850,6 +1889,30 @@ netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,
> >      }
> >  }
> >
> > +/* Drain tx queues, this is called periodically to empty the
> > + * intermediate queue in case of few packets (< NETDEV_MAX_BURST)
> > + * are buffered into the queue. */
> > +static int
> > +netdev_dpdk_txq_drain(struct netdev *netdev, int qid, bool concurrent_txq)
> > +{
> > +    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
> > +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
> > +
> > +    if (OVS_LIKELY(txq->count)) {
> > +        if (OVS_UNLIKELY(concurrent_txq)) {
> > +            qid = qid % dev->up.n_txq; // TODO: do we need this?
> > +            rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
> > +        }
> > +
> > +        netdev_dpdk_eth_tx_burst(dev, qid, txq->burst_pkts, txq->count);
> > +
> > +        if (OVS_UNLIKELY(concurrent_txq)) {
> > +            rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
> > +        }
> > +    }
> > +    return 0;
> > +}
> > +
> >  static int
> >  netdev_dpdk_eth_send(struct netdev *netdev, int qid,
> >                       struct dp_packet_batch *batch, bool may_steal,
> > @@ -3243,7 +3306,7 @@ unlock:
> >      SET_CONFIG, SET_TX_MULTIQ, SEND,          \
> >      GET_CARRIER, GET_STATS,                   \
> >      GET_FEATURES, GET_STATUS,                 \
> > -    RECONFIGURE, RXQ_RECV)                    \
> > +    RECONFIGURE, RXQ_RECV, TXQ_DRAIN)         \
> >  {                                             \
> >      NAME,                                     \
> >      true,                       /* is_pmd */  \
> > @@ -3310,6 +3373,7 @@ unlock:
> >      RXQ_RECV,                                 \
> >      NULL,                       /* rx_wait */ \
> >      NULL,                       /* rxq_drain */ \
> > +    TXQ_DRAIN,                  /* txq_drain */ \
> >  }
> >
> >  static const struct netdev_class dpdk_class =
> > @@ -3326,7 +3390,8 @@ static const struct netdev_class dpdk_class =
> >      netdev_dpdk_get_features,
> >      netdev_dpdk_get_status,
> >      netdev_dpdk_reconfigure,
> > -    netdev_dpdk_rxq_recv);
> > +    netdev_dpdk_rxq_recv,
> > +    netdev_dpdk_txq_drain);
> >
> >  static const struct netdev_class dpdk_ring_class =
> >      NETDEV_DPDK_CLASS(
> > @@ -3342,7 +3407,8 @@ static const struct netdev_class dpdk_ring_class =
> >      netdev_dpdk_get_features,
> >      netdev_dpdk_get_status,
> >      netdev_dpdk_reconfigure,
> > -    netdev_dpdk_rxq_recv);
> > +    netdev_dpdk_rxq_recv,
> > +    NULL);
> >
> >  static const struct netdev_class dpdk_vhost_class =
> >      NETDEV_DPDK_CLASS(
> > @@ -3358,7 +3424,8 @@ static const struct netdev_class dpdk_vhost_class =
> >      NULL,
> >      NULL,
> >      netdev_dpdk_vhost_reconfigure,
> > -    netdev_dpdk_vhost_rxq_recv);
> > +    netdev_dpdk_vhost_rxq_recv,
> > +    NULL);
> >  static const struct netdev_class dpdk_vhost_client_class =
> >      NETDEV_DPDK_CLASS(
> >          "dpdkvhostuserclient",
> > @@ -3373,7 +3440,8 @@ static const struct netdev_class dpdk_vhost_client_class =
> >      NULL,
> >      NULL,
> >      netdev_dpdk_vhost_client_reconfigure,
> > -    netdev_dpdk_vhost_rxq_recv);
> > +    netdev_dpdk_vhost_rxq_recv,
> > +    NULL);
> >
> >  void
> >  netdev_dpdk_register(void)
> > diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c
> > index e6e36cd..6a8ad45 100644
> > --- a/lib/netdev-dummy.c
> > +++ b/lib/netdev-dummy.c
> > @@ -1409,6 +1409,7 @@ netdev_dummy_update_flags(struct netdev *netdev_,
> >      netdev_dummy_rxq_recv,                  \
> >      netdev_dummy_rxq_wait,                  \
> >      netdev_dummy_rxq_drain,                 \
> > +    NULL,                                   \
> >  }
> >
> >  static const struct netdev_class dummy_class =
> > diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c
> > index a5a9ec1..2499b3e 100644
> > --- a/lib/netdev-linux.c
> > +++ b/lib/netdev-linux.c
> > @@ -2831,6 +2831,7 @@ netdev_linux_update_flags(struct netdev *netdev_, enum netdev_flags off,
> >      netdev_linux_rxq_recv,                  \
> >      netdev_linux_rxq_wait,                  \
> >      netdev_linux_rxq_drain,                 \
> > +    NULL,                                   \
> >  }
> >
> >  const struct netdev_class netdev_linux_class =
> > diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
> > index 8346fc4..97e72c6 100644
> > --- a/lib/netdev-provider.h
> > +++ b/lib/netdev-provider.h
> > @@ -335,6 +335,11 @@ struct netdev_class {
> >       * If the function returns a non-zero value, some of the packets might have
> >       * been sent anyway.
> >       *
> > +     * Some netdev provider - like in case of 'dpdk' - may buffer the batch
> > +     * of packets into an intermediate queue. Buffered packets will be sent
> > +     * out when their number will exceed a threshold or by the periodic call
> > +     * to the drain function.
> > +     *
> >       * If 'may_steal' is false, the caller retains ownership of all the
> >       * packets. If 'may_steal' is true, the caller transfers ownership of all
> >       * the packets to the network device, regardless of success.
> > @@ -769,6 +774,9 @@ struct netdev_class {
> >
> >      /* Discards all packets waiting to be received from 'rx'. */
> >      int (*rxq_drain)(struct netdev_rxq *rx);
> > +
> > +    /* Drain all packets waiting to be sent on queue 'qid'. */
> > +    int (*txq_drain)(struct netdev *netdev, int qid, bool concurrent_txq);
> >  };
> >
> >  int netdev_register_provider(const struct netdev_class *);
> > diff --git a/lib/netdev-vport.c b/lib/netdev-vport.c
> > index 4c2ced5..77225b8 100644
> > --- a/lib/netdev-vport.c
> > +++ b/lib/netdev-vport.c
> > @@ -838,7 +838,8 @@ get_stats(const struct netdev *netdev, struct netdev_stats *stats)
> >      NULL,                   /* rx_dealloc */               \
> >      NULL,                   /* rx_recv */                  \
> >      NULL,                   /* rx_wait */                  \
> > -    NULL,                   /* rx_drain */
> > +    NULL,                   /* rx_drain */                 \
> > +    NULL,                   /* tx_drain */
> >
> >
> >  #define TUNNEL_CLASS(NAME, DPIF_PORT, BUILD_HEADER, PUSH_HEADER, POP_HEADER) \
> > diff --git a/lib/netdev.c b/lib/netdev.c
> > index 839b1f6..0d48e41 100644
> > --- a/lib/netdev.c
> > +++ b/lib/netdev.c
> > @@ -670,6 +670,15 @@ netdev_rxq_drain(struct netdev_rxq *rx)
> >              : 0);
> >  }
> >
> > +/* Flush packets on the queue 'qid'. */
> > +int
> > +netdev_txq_drain(struct netdev *netdev, int qid, bool netdev_txq_drain)
> > +{
> > +    return (netdev->netdev_class->txq_drain
> > +            ? netdev->netdev_class->txq_drain(netdev, qid, netdev_txq_drain)
> > +            : EOPNOTSUPP);
> > +}
> > +
> >  /* Configures the number of tx queues of 'netdev'. Returns 0 if successful,
> >   * otherwise a positive errno value.
> >   *
> > diff --git a/lib/netdev.h b/lib/netdev.h
> > index bef9cdd..49969a1 100644
> > --- a/lib/netdev.h
> > +++ b/lib/netdev.h
> > @@ -153,6 +153,7 @@ int netdev_rxq_drain(struct netdev_rxq *);
> >  int netdev_send(struct netdev *, int qid, struct dp_packet_batch *,
> >                  bool may_steal, bool concurrent_txq);
> >  void netdev_send_wait(struct netdev *, int qid);
> > +int netdev_txq_drain(struct netdev *, int qid, bool concurrent_txq);
> >
> >  /* native tunnel APIs */
> >  /* Structure to pass parameters required to build a tunnel header. */