Thanks for the patch. I think the caller of dp_netdev_execute_actions() should always pass a valid timestamp. We can pass it from aux->now to dp_execute_userspace_actions(), and we can add it to fast_path_processing() so that it can be passed down to handle_packet_upcall(). In the other cases it's fine to call time_msec(); we're in the slow path anyway.
One more thing: I think we should avoid XPS entirely if there are enough txqs, to avoid any possible locks and even writing tx->last_used. Thanks, Daniele On 13/07/2016 05:34, "Ilya Maximets" <i.maxim...@samsung.com> wrote: >If CPU number in pmd-cpu-mask is not divisible by the number of queues and >in a few more complex situations there may be unfair distribution of TX >queue-ids between PMD threads. > >For example, if we have 2 ports with 4 queues and 6 CPUs in pmd-cpu-mask >such distribution is possible: ><------------------------------------------------------------------------> >pmd thread numa_id 0 core_id 13: > port: vhost-user1 queue-id: 1 > port: dpdk0 queue-id: 3 >pmd thread numa_id 0 core_id 14: > port: vhost-user1 queue-id: 2 >pmd thread numa_id 0 core_id 16: > port: dpdk0 queue-id: 0 >pmd thread numa_id 0 core_id 17: > port: dpdk0 queue-id: 1 >pmd thread numa_id 0 core_id 12: > port: vhost-user1 queue-id: 0 > port: dpdk0 queue-id: 2 >pmd thread numa_id 0 core_id 15: > port: vhost-user1 queue-id: 3 ><------------------------------------------------------------------------> > >As we can see above dpdk0 port polled by threads on cores: > 12, 13, 16 and 17. > >By design of dpif-netdev, there is only one TX queue-id assigned to each >pmd thread. This queue-id's are sequential similar to core-id's. And >thread will send packets to queue with exact this queue-id regardless >of port. > >In previous example: > > pmd thread on core 12 will send packets to tx queue 0 > pmd thread on core 13 will send packets to tx queue 1 > ... > pmd thread on core 17 will send packets to tx queue 5 > >So, for dpdk0 port after truncating in netdev-dpdk: > > core 12 --> TX queue-id 0 % 4 == 0 > core 13 --> TX queue-id 1 % 4 == 1 > core 16 --> TX queue-id 4 % 4 == 0 > core 17 --> TX queue-id 5 % 4 == 1 > >As a result only 2 of 4 queues used. > >To fix this issue some kind of XPS implemented in following way: > > * TX queue-ids are allocated dynamically. 
> * When PMD thread first time tries to send packets to new port > it allocates less used TX queue for this port. > * PMD threads periodically performes revalidation of > allocated TX queue-ids. If queue wasn't used in last > XPS_TIMEOUT_MS milliseconds it will be freed while revalidation. > >Reported-by: Zhihong Wang <zhihong.w...@intel.com> >Signed-off-by: Ilya Maximets <i.maxim...@samsung.com> >--- > lib/dpif-netdev.c | 170 +++++++++++++++++++++++++++++++++++++----------------- > 1 file changed, 117 insertions(+), 53 deletions(-) > >diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c >index e0107b7..6345944 100644 >--- a/lib/dpif-netdev.c >+++ b/lib/dpif-netdev.c >@@ -248,6 +248,8 @@ enum pmd_cycles_counter_type { > PMD_N_CYCLES > }; > >+#define XPS_TIMEOUT_MS 500LL >+ > /* A port in a netdev-based datapath. */ > struct dp_netdev_port { > odp_port_t port_no; >@@ -256,6 +258,8 @@ struct dp_netdev_port { > struct netdev_saved_flags *sf; > unsigned n_rxq; /* Number of elements in 'rxq' */ > struct netdev_rxq **rxq; >+ unsigned *txq_used; /* Number of threads that uses each tx queue. >*/ >+ struct ovs_mutex txq_used_mutex; > char *type; /* Port type as requested by user. */ > }; > >@@ -384,8 +388,9 @@ struct rxq_poll { > > /* Contained by struct dp_netdev_pmd_thread's 'port_cache' or 'tx_ports'. 
*/ > struct tx_port { >- odp_port_t port_no; >- struct netdev *netdev; >+ struct dp_netdev_port *port; >+ int qid; >+ long long last_used; > struct hmap_node node; > }; > >@@ -498,7 +503,8 @@ static void dp_netdev_execute_actions(struct >dp_netdev_pmd_thread *pmd, > struct dp_packet_batch *, > bool may_steal, > const struct nlattr *actions, >- size_t actions_len); >+ size_t actions_len, >+ long long now); > static void dp_netdev_input(struct dp_netdev_pmd_thread *, > struct dp_packet_batch *, odp_port_t port_no); > static void dp_netdev_recirculate(struct dp_netdev_pmd_thread *, >@@ -541,6 +547,12 @@ static void dp_netdev_pmd_flow_flush(struct >dp_netdev_pmd_thread *pmd); > static void pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd) > OVS_REQUIRES(pmd->port_mutex); > >+static void >+dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd, >+ long long now, bool purge); >+static int dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd, >+ struct tx_port *tx, long long now); >+ > static inline bool emc_entry_alive(struct emc_entry *ce); > static void emc_clear_entry(struct emc_entry *ce); > >@@ -1185,7 +1197,9 @@ port_create(const char *devname, const char *open_type, >const char *type, > port->netdev = netdev; > port->n_rxq = netdev_n_rxq(netdev); > port->rxq = xcalloc(port->n_rxq, sizeof *port->rxq); >+ port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used); > port->type = xstrdup(type); >+ ovs_mutex_init(&port->txq_used_mutex); > > for (i = 0; i < port->n_rxq; i++) { > error = netdev_rxq_open(netdev, &port->rxq[i], i); >@@ -1211,7 +1225,9 @@ out_rxq_close: > for (i = 0; i < n_open_rxqs; i++) { > netdev_rxq_close(port->rxq[i]); > } >+ ovs_mutex_destroy(&port->txq_used_mutex); > free(port->type); >+ free(port->txq_used); > free(port->rxq); > free(port); > >@@ -1351,7 +1367,8 @@ port_destroy(struct dp_netdev_port *port) > for (unsigned i = 0; i < port->n_rxq; i++) { > netdev_rxq_close(port->rxq[i]); > } >- >+ 
ovs_mutex_destroy(&port->txq_used_mutex); >+ free(port->txq_used); > free(port->rxq); > free(port->type); > free(port); >@@ -1374,13 +1391,6 @@ get_port_by_name(struct dp_netdev *dp, > } > > static int >-get_n_pmd_threads(struct dp_netdev *dp) >-{ >- /* There is one non pmd thread in dp->poll_threads */ >- return cmap_count(&dp->poll_threads) - 1; >-} >- >-static int > get_n_pmd_threads_on_numa(struct dp_netdev *dp, int numa_id) > { > struct dp_netdev_pmd_thread *pmd; >@@ -2476,7 +2486,7 @@ dpif_netdev_execute(struct dpif *dpif, struct >dpif_execute *execute) > > packet_batch_init_packet(&pp, execute->packet); > dp_netdev_execute_actions(pmd, &pp, false, execute->actions, >- execute->actions_len); >+ execute->actions_len, 0); > > if (pmd->core_id == NON_PMD_CORE_ID) { > ovs_mutex_unlock(&dp->non_pmd_mutex); >@@ -2660,6 +2670,10 @@ port_reconfigure(struct dp_netdev_port *port) > } > /* If the netdev_reconfigure() above succeeds, reopens the 'rxq's. */ > port->rxq = xrealloc(port->rxq, sizeof *port->rxq * netdev_n_rxq(netdev)); >+ /* Realloc 'used' counters for tx queues. */ >+ free(port->txq_used); >+ port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used); >+ > for (i = 0; i < netdev_n_rxq(netdev); i++) { > err = netdev_rxq_open(netdev, &port->rxq[i], i); > if (err) { >@@ -2737,6 +2751,7 @@ dpif_netdev_run(struct dpif *dpif) > } > } > } >+ dpif_netdev_xps_revalidate_pmd(non_pmd, time_msec(), false); > ovs_mutex_unlock(&dp->non_pmd_mutex); > > dp_netdev_pmd_unref(non_pmd); >@@ -2786,6 +2801,9 @@ pmd_free_cached_ports(struct dp_netdev_pmd_thread *pmd) > { > struct tx_port *tx_port_cached; > >+ /* Free all used tx queue ids. 
*/ >+ dpif_netdev_xps_revalidate_pmd(pmd, 0, true); >+ > HMAP_FOR_EACH_POP (tx_port_cached, node, &pmd->port_cache) { > free(tx_port_cached); > } >@@ -2805,7 +2823,7 @@ pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd) > HMAP_FOR_EACH (tx_port, node, &pmd->tx_ports) { > tx_port_cached = xmemdup(tx_port, sizeof *tx_port_cached); > hmap_insert(&pmd->port_cache, &tx_port_cached->node, >- hash_port_no(tx_port_cached->port_no)); >+ hash_port_no(tx_port_cached->port->port_no)); > } > } > >@@ -3021,11 +3039,6 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread >*pmd, struct dp_netdev *dp, > pmd->numa_id = numa_id; > pmd->poll_cnt = 0; > >- atomic_init(&pmd->tx_qid, >- (core_id == NON_PMD_CORE_ID) >- ? ovs_numa_get_n_cores() >- : get_n_pmd_threads(dp)); >- > ovs_refcount_init(&pmd->ref_cnt); > latch_init(&pmd->exit_latch); > atomic_init(&pmd->change_seq, PMD_INITIAL_SEQ); >@@ -3116,18 +3129,16 @@ dp_netdev_destroy_all_pmds(struct dp_netdev *dp) > free(pmd_list); > } > >-/* Deletes all pmd threads on numa node 'numa_id' and >- * fixes tx_qids of other threads to keep them sequential. */ >+/* Deletes all pmd threads on numa node 'numa_id'. */ > static void > dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id) > { > struct dp_netdev_pmd_thread *pmd; >- int n_pmds_on_numa, n_pmds; >- int *free_idx, k = 0; >+ int n_pmds_on_numa; >+ int k = 0; > struct dp_netdev_pmd_thread **pmd_list; > > n_pmds_on_numa = get_n_pmd_threads_on_numa(dp, numa_id); >- free_idx = xcalloc(n_pmds_on_numa, sizeof *free_idx); > pmd_list = xcalloc(n_pmds_on_numa, sizeof *pmd_list); > > CMAP_FOR_EACH (pmd, node, &dp->poll_threads) { >@@ -3135,7 +3146,6 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int >numa_id) > * 'dp->poll_threads' (while we're iterating it) and it > * might quiesce. 
*/ > if (pmd->numa_id == numa_id) { >- atomic_read_relaxed(&pmd->tx_qid, &free_idx[k]); > pmd_list[k] = pmd; > ovs_assert(k < n_pmds_on_numa); > k++; >@@ -3146,21 +3156,7 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int >numa_id) > dp_netdev_del_pmd(dp, pmd_list[i]); > } > >- n_pmds = get_n_pmd_threads(dp); >- CMAP_FOR_EACH (pmd, node, &dp->poll_threads) { >- int old_tx_qid; >- >- atomic_read_relaxed(&pmd->tx_qid, &old_tx_qid); >- >- if (old_tx_qid >= n_pmds) { >- int new_tx_qid = free_idx[--k]; >- >- atomic_store_relaxed(&pmd->tx_qid, new_tx_qid); >- } >- } >- > free(pmd_list); >- free(free_idx); > } > > /* Deletes all rx queues from pmd->poll_list and all the ports from >@@ -3188,7 +3184,7 @@ tx_port_lookup(const struct hmap *hmap, odp_port_t >port_no) > struct tx_port *tx; > > HMAP_FOR_EACH_IN_BUCKET (tx, node, hash_port_no(port_no), hmap) { >- if (tx->port_no == port_no) { >+ if (tx->port->port_no == port_no) { > return tx; > } > } >@@ -3313,11 +3309,11 @@ dp_netdev_add_port_tx_to_pmd(struct >dp_netdev_pmd_thread *pmd, > { > struct tx_port *tx = xzalloc(sizeof *tx); > >- tx->netdev = port->netdev; >- tx->port_no = port->port_no; >+ tx->port = port; >+ tx->qid = -1; > > ovs_mutex_lock(&pmd->port_mutex); >- hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port_no)); >+ hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no)); > ovs_mutex_unlock(&pmd->port_mutex); > } > >@@ -3658,7 +3654,7 @@ packet_batch_per_flow_execute(struct >packet_batch_per_flow *batch, > actions = dp_netdev_flow_get_actions(flow); > > dp_netdev_execute_actions(pmd, &batch->array, true, >- actions->actions, actions->size); >+ actions->actions, actions->size, now); > } > > static inline void >@@ -3785,7 +3781,7 @@ handle_packet_upcall(struct dp_netdev_pmd_thread *pmd, >struct dp_packet *packet, > * we'll send the packet up twice. 
*/ > packet_batch_init_packet(&b, packet); > dp_netdev_execute_actions(pmd, &b, true, >- actions->data, actions->size); >+ actions->data, actions->size, 0); > > add_actions = put_actions->size ? put_actions : actions; > if (OVS_LIKELY(error != ENOSPC)) { >@@ -3954,6 +3950,7 @@ dp_netdev_recirculate(struct dp_netdev_pmd_thread *pmd, > > struct dp_netdev_execute_aux { > struct dp_netdev_pmd_thread *pmd; >+ long long now; > }; > > static void >@@ -3974,6 +3971,74 @@ dpif_netdev_register_upcall_cb(struct dpif *dpif, >upcall_callback *cb, > dp->upcall_cb = cb; > } > >+static void >+dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd, >+ long long now, bool purge) >+{ >+ struct tx_port *tx; >+ struct dp_netdev_port *port; >+ long long interval; >+ >+ HMAP_FOR_EACH (tx, node, &pmd->port_cache) { >+ interval = now - tx->last_used; >+ if (tx->qid >= 0 && (purge || interval >= XPS_TIMEOUT_MS)) { >+ port = tx->port; >+ ovs_mutex_lock(&port->txq_used_mutex); >+ port->txq_used[tx->qid]--; >+ ovs_mutex_unlock(&port->txq_used_mutex); >+ tx->qid = -1; >+ } >+ } >+} >+ >+static int >+dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd, >+ struct tx_port *tx, long long now) >+{ >+ struct dp_netdev_port *port; >+ long long interval; >+ int i, min_cnt, min_qid; >+ >+ if (OVS_UNLIKELY(!now)) { >+ now = time_msec(); >+ } >+ >+ interval = now - tx->last_used; >+ tx->last_used = now; >+ >+ if (OVS_LIKELY(tx->qid >= 0 && interval < XPS_TIMEOUT_MS)) { >+ return tx->qid; >+ } >+ >+ port = tx->port; >+ >+ ovs_mutex_lock(&port->txq_used_mutex); >+ if (tx->qid >= 0) { >+ port->txq_used[tx->qid]--; >+ tx->qid = -1; >+ } >+ >+ min_cnt = -1; >+ min_qid = 0; >+ for (i = 0; i < netdev_n_txq(port->netdev); i++) { >+ if (port->txq_used[i] < min_cnt || min_cnt == -1) { >+ min_cnt = port->txq_used[i]; >+ min_qid = i; >+ } >+ } >+ >+ port->txq_used[min_qid]++; >+ tx->qid = min_qid; >+ >+ ovs_mutex_unlock(&port->txq_used_mutex); >+ >+ dpif_netdev_xps_revalidate_pmd(pmd, 
now, false); >+ >+ VLOG_DBG("Core %d: New TX queue ID %d for port \'%s\'.", >+ pmd->core_id, tx->qid, netdev_get_name(tx->port->netdev)); >+ return min_qid; >+} >+ > static struct tx_port * > pmd_tx_port_cache_lookup(const struct dp_netdev_pmd_thread *pmd, > odp_port_t port_no) >@@ -3997,7 +4062,7 @@ push_tnl_action(const struct dp_netdev_pmd_thread *pmd, > err = -EINVAL; > goto error; > } >- err = netdev_push_header(tun_port->netdev, batch, data); >+ err = netdev_push_header(tun_port->port->netdev, batch, data); > if (!err) { > return 0; > } >@@ -4024,7 +4089,7 @@ dp_execute_userspace_action(struct dp_netdev_pmd_thread >*pmd, > if (!error || error == ENOSPC) { > packet_batch_init_packet(&b, packet); > dp_netdev_execute_actions(pmd, &b, may_steal, >- actions->data, actions->size); >+ actions->data, actions->size, 0); > } else if (may_steal) { > dp_packet_delete(packet); > } >@@ -4045,11 +4110,9 @@ dp_execute_cb(void *aux_, struct dp_packet_batch >*packets_, > case OVS_ACTION_ATTR_OUTPUT: > p = pmd_tx_port_cache_lookup(pmd, u32_to_odp(nl_attr_get_u32(a))); > if (OVS_LIKELY(p)) { >- int tx_qid; >- >- atomic_read_relaxed(&pmd->tx_qid, &tx_qid); >+ int tx_qid = dpif_netdev_xps_get_tx_qid(pmd, p, aux->now); > >- netdev_send(p->netdev, tx_qid, packets_, may_steal); >+ netdev_send(p->port->netdev, tx_qid, packets_, may_steal); > return; > } > break; >@@ -4096,7 +4159,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch >*packets_, > > dp_packet_batch_apply_cutlen(packets_); > >- netdev_pop_header(p->netdev, packets_); >+ netdev_pop_header(p->port->netdev, packets_); > if (!packets_->count) { > return; > } >@@ -4210,9 +4273,10 @@ static void > dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd, > struct dp_packet_batch *packets, > bool may_steal, >- const struct nlattr *actions, size_t actions_len) >+ const struct nlattr *actions, size_t actions_len, >+ long long now) > { >- struct dp_netdev_execute_aux aux = { pmd }; >+ struct dp_netdev_execute_aux aux = { pmd, 
now }; > > odp_execute_actions(&aux, packets, may_steal, actions, > actions_len, dp_execute_cb); >-- >2.7.4 > _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev