Thanks. Fixed.
New version here: 
http://openvswitch.org/pipermail/dev/2015-September/059895.html

Best regards, Ilya Maximets.

On 10.09.2015 21:03, Daniele Di Proietto wrote:
> Sorry for the delay.
> 
> There's still one problem with this patch:
> 
> when a non-DPDK port is added to the datapath, its txqs are not
> added to the pmd threads.
> 
> Can you confirm the issue?
> 
> Thanks
> 
> On 10/09/2015 07:52, "Ilya Maximets" <i.maxim...@samsung.com> wrote:
> 
>> Ping.
>>
>> On 02.09.2015 14:44, Ilya Maximets wrote:
>>> Currently tx_qid is equal to pmd->core_id. This leads to unexpected
>>> behavior if pmd-cpu-mask different from '/(0*)(1|3|7)?(f*)/',
>>> e.g. if core_ids are not sequential, or doesn't start from 0, or both.
>>>
>>> Example:
>>>     starting 2 pmd threads with 1 port, 2 rxqs per port,
>>>     pmd-cpu-mask = 00000014 and let dev->real_n_txq = 2
>>>
>>>     It that case pmd_1->tx_qid = 2, pmd_2->tx_qid = 4 and
>>>     txq_needs_locking = true (if device hasn't ovs_numa_get_n_cores()+1
>>>     queues).
>>>
>>>     In that case, after truncating in netdev_dpdk_send__():
>>>             'qid = qid % dev->real_n_txq;'
>>>     pmd_1: qid = 2 % 2 = 0
>>>     pmd_2: qid = 4 % 2 = 0
>>>
>>>     So, both threads will call dpdk_queue_pkts() with same qid = 0.
>>>     This is unexpected behavior if there is 2 tx queues in device.
>>>     Queue #1 will not be used and both threads will lock queue #0
>>>     on each send.
>>>
>>> Fix that by introducing per pmd thread hash map 'tx_queues', where will
>>> be stored all available tx queues for that pmd thread with
>>> port_no as a key(hash). All tx_qid-s will be unique per port and
>>> sequential to prevent described unexpected mapping after truncating.
>>>
>>> Implemented infrastructure can be used in the future to choose
>>> between all tx queues available for that pmd thread.
>>>
>>> Signed-off-by: Ilya Maximets <i.maxim...@samsung.com>
>>> ---
>>> version 4:
>>>     * fixed distribution of tx queues if multiqueue is not supported
>>>
>>> version 3:
>>>     * fixed failing of unit tests by adding tx queues of non
>>>       pmd devices to non pmd thread. (they haven't been used by any thread)
>>>     * pmd_flush_tx_queues --> dp_netdev_pmd_detach_tx_queues
>>>     * function names changed to dp_netdev_*
>>>     * dp_netdev_pmd_lookup_txq now looks by port_no.
>>>     * removed unnecessary dp_netdev_lookup_port in dp_execute_cb
>>>       for OVS_ACTION_ATTR_OUTPUT.
>>>     * refactoring
>>>
>>>  lib/dpif-netdev.c | 160
>>> +++++++++++++++++++++++++++++++++++++++++++++++-------
>>>  1 file changed, 139 insertions(+), 21 deletions(-)
>>>
>>> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
>>> index cf5b064..d430bc9 100644
>>> --- a/lib/dpif-netdev.c
>>> +++ b/lib/dpif-netdev.c
>>> @@ -371,6 +371,13 @@ struct dp_netdev_pmd_cycles {
>>>      atomic_ullong n[PMD_N_CYCLES];
>>>  };
>>>  
>>> +struct dp_netdev_pmd_txq {
>>> +    struct cmap_node node;        /* In owning dp_netdev_pmd_thread's
>>> */
>>> +                                  /* 'tx_queues'. */
>>> +    struct dp_netdev_port *port;
>>> +    int tx_qid;
>>> +};
>>> +
>>>  /* PMD: Poll modes drivers.  PMD accesses devices via polling to
>>> eliminate
>>>   * the performance overhead of interrupt processing.  Therefore netdev
>>> can
>>>   * not implement rx-wait for these devices.  dpif-netdev needs to poll
>>> @@ -426,8 +433,8 @@ struct dp_netdev_pmd_thread {
>>>                                      /* threads on same numa node. */
>>>      unsigned core_id;               /* CPU core id of this pmd thread.
>>> */
>>>      int numa_id;                    /* numa node id of this pmd
>>> thread. */
>>> -    int tx_qid;                     /* Queue id used by this pmd
>>> thread to
>>> -                                     * send packets on all netdevs */
>>> +    struct cmap tx_queues;          /* Queue ids used by this pmd
>>> thread to
>>> +                                     * send packets to ports */
>>>  
>>>      /* Only a pmd thread can write on its own 'cycles' and 'stats'.
>>>       * The main thread keeps 'stats_zero' and 'cycles_zero' as base
>>> @@ -469,6 +476,15 @@ static void dp_netdev_input(struct
>>> dp_netdev_pmd_thread *,
>>>  
>>>  static void dp_netdev_disable_upcall(struct dp_netdev *);
>>>  void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
>>> +static void dp_netdev_configure_non_pmd_txqs(struct
>>> dp_netdev_pmd_thread *pmd);
>>> +static void dp_netdev_pmd_add_txq(struct dp_netdev_pmd_thread *pmd,
>>> +                                  struct dp_netdev_port *port, int
>>> queue_id);
>>> +static void dp_netdev_pmd_del_txq(struct dp_netdev_pmd_thread *pmd,
>>> +                                  struct dp_netdev_pmd_txq *txq);
>>> +static void dp_netdev_pmd_detach_tx_queues(struct dp_netdev_pmd_thread
>>> *pmd);
>>> +static struct dp_netdev_pmd_txq *
>>> +dp_netdev_pmd_lookup_txq(const struct dp_netdev_pmd_thread *pmd,
>>> +                         odp_port_t port_no);
>>>  static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
>>>                                      struct dp_netdev *dp, int index,
>>>                                      unsigned core_id, int numa_id);
>>> @@ -1050,6 +1066,7 @@ do_add_port(struct dp_netdev *dp, const char
>>> *devname, const char *type,
>>>      struct netdev_saved_flags *sf;
>>>      struct dp_netdev_port *port;
>>>      struct netdev *netdev;
>>> +    struct dp_netdev_pmd_thread *non_pmd;
>>>      enum netdev_flags flags;
>>>      const char *open_type;
>>>      int error;
>>> @@ -1126,6 +1143,11 @@ do_add_port(struct dp_netdev *dp, const char
>>> *devname, const char *type,
>>>      ovs_refcount_init(&port->ref_cnt);
>>>      cmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
>>>  
>>> +    non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
>>> +    if (non_pmd) {
>>> +        dp_netdev_pmd_add_txq(non_pmd, port, ovs_numa_get_n_cores());
>>> +        dp_netdev_pmd_unref(non_pmd);
>>> +    }
>>>      if (netdev_is_pmd(netdev)) {
>>>          dp_netdev_set_pmds_on_numa(dp, netdev_get_numa_id(netdev));
>>>          dp_netdev_reload_pmds(dp);
>>> @@ -1307,8 +1329,21 @@ static void
>>>  do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
>>>      OVS_REQUIRES(dp->port_mutex)
>>>  {
>>> +    struct dp_netdev_pmd_thread *non_pmd;
>>> +
>>>      cmap_remove(&dp->ports, &port->node, hash_odp_port(port->port_no));
>>>      seq_change(dp->port_seq);
>>> +
>>> +    non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
>>> +    if (non_pmd) {
>>> +        /* There is only one txq for each port for non pmd thread */
>>> +        struct dp_netdev_pmd_txq *txq;
>>> +        txq = dp_netdev_pmd_lookup_txq(non_pmd, port->port_no);
>>> +        if (OVS_LIKELY(txq))
>>> +            dp_netdev_pmd_del_txq(non_pmd, txq);
>>> +        dp_netdev_pmd_unref(non_pmd);
>>> +    }
>>> +
>>>      if (netdev_is_pmd(port->netdev)) {
>>>          int numa_id = netdev_get_numa_id(port->netdev);
>>>  
>>> @@ -2578,6 +2613,80 @@ dpif_netdev_wait(struct dpif *dpif)
>>>      seq_wait(tnl_conf_seq, dp->last_tnl_conf_seq);
>>>  }
>>>  
>>> +static void
>>> +dp_netdev_pmd_add_txq(struct dp_netdev_pmd_thread *pmd,
>>> +                      struct dp_netdev_port *port, int queue_id)
>>> +{
>>> +    if (port_try_ref(port)) {
>>> +        struct dp_netdev_pmd_txq * txq = xmalloc(sizeof *txq);
>>> +        txq->port = port;
>>> +        txq->tx_qid = queue_id;
>>> +        cmap_insert(&pmd->tx_queues, &txq->node,
>>> +                        hash_port_no(port->port_no));
>>> +    }
>>> +}
>>> +
>>> +/* Configures tx_queues for non pmd thread. */
>>> +static void
>>> +dp_netdev_configure_non_pmd_txqs(struct dp_netdev_pmd_thread *pmd)
>>> +{
>>> +    if (!cmap_is_empty(&pmd->tx_queues))
>>> +        dp_netdev_pmd_detach_tx_queues(pmd);
>>> +
>>> +    struct dp_netdev_port *port;
>>> +    CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
>>> +        dp_netdev_pmd_add_txq(pmd, port, ovs_numa_get_n_cores());
>>> +    }
>>> +}
>>> +
>>> +static void
>>> +dp_netdev_pmd_del_txq(struct dp_netdev_pmd_thread *pmd,
>>> +                      struct dp_netdev_pmd_txq *txq)
>>> +{
>>> +    cmap_remove(&pmd->tx_queues, &txq->node,
>>> +                hash_port_no(txq->port->port_no));
>>> +    port_unref(txq->port);
>>> +    free(txq);
>>> +}
>>> +
>>> +/* Removes all queues from 'tx_queues' of pmd thread. */
>>> +static void
>>> +dp_netdev_pmd_detach_tx_queues(struct dp_netdev_pmd_thread *pmd)
>>> +{
>>> +    struct dp_netdev_pmd_txq *txq;
>>> +
>>> +    CMAP_FOR_EACH (txq, node, &pmd->tx_queues) {
>>> +        dp_netdev_pmd_del_txq(pmd, txq);
>>> +    }
>>> +}
>>> +
>>> +static void OVS_UNUSED
>>> +dp_netdev_pmd_tx_queues_print(struct dp_netdev_pmd_thread *pmd)
>>> +{
>>> +    struct dp_netdev_pmd_txq *txq;
>>> +
>>> +    CMAP_FOR_EACH (txq, node, &pmd->tx_queues) {
>>> +        VLOG_INFO("Core_id: %d, Port: %s, tx_qid: %d\n",
>>> +                   pmd->core_id, netdev_get_name(txq->port->netdev),
>>> +                   txq->tx_qid);
>>> +    }
>>> +}
>>> +
>>> +static struct dp_netdev_pmd_txq *
>>> +dp_netdev_pmd_lookup_txq(const struct dp_netdev_pmd_thread *pmd,
>>> +                         odp_port_t port_no)
>>> +{
>>> +    struct dp_netdev_pmd_txq *txq;
>>> +
>>> +    CMAP_FOR_EACH_WITH_HASH (txq, node, hash_port_no(port_no),
>>> +                             &pmd->tx_queues) {
>>> +        if (txq->port->port_no == port_no) {
>>> +            return txq;
>>> +        }
>>> +    }
>>> +    return NULL;
>>> +}
>>> +
>>>  struct rxq_poll {
>>>      struct dp_netdev_port *port;
>>>      struct netdev_rxq *rx;
>>> @@ -2589,16 +2698,19 @@ pmd_load_queues(struct dp_netdev_pmd_thread
>>> *pmd,
>>>  {
>>>      struct rxq_poll *poll_list = *ppoll_list;
>>>      struct dp_netdev_port *port;
>>> -    int n_pmds_on_numa, index, i;
>>> +    int n_pmds_on_numa, rx_index, tx_index, i, n_txq;
>>>  
>>>      /* Simple scheduler for netdev rx polling. */
>>> +    dp_netdev_pmd_detach_tx_queues(pmd);
>>> +
>>>      for (i = 0; i < poll_cnt; i++) {
>>>          port_unref(poll_list[i].port);
>>>      }
>>>  
>>>      poll_cnt = 0;
>>>      n_pmds_on_numa = get_n_pmd_threads_on_numa(pmd->dp, pmd->numa_id);
>>> -    index = 0;
>>> +    rx_index = 0;
>>> +    tx_index = 0;
>>>  
>>>      CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
>>>          /* Calls port_try_ref() to prevent the main thread
>>> @@ -2609,7 +2721,7 @@ pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
>>>                  int i;
>>>  
>>>                  for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
>>> -                    if ((index % n_pmds_on_numa) == pmd->index) {
>>> +                    if ((rx_index % n_pmds_on_numa) == pmd->index) {
>>>                          poll_list = xrealloc(poll_list,
>>>                                          sizeof *poll_list * (poll_cnt
>>> + 1));
>>>  
>>> @@ -2618,7 +2730,16 @@ pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
>>>                          poll_list[poll_cnt].rx = port->rxq[i];
>>>                          poll_cnt++;
>>>                      }
>>> -                    index++;
>>> +                    rx_index++;
>>> +                }
>>> +
>>> +                n_txq = netdev_n_txq(port->netdev);
>>> +                /* Last queue reserved for non pmd threads */
>>> +                n_txq = n_txq == 1 ? 1 : n_txq - 1;
>>> +                for (i = 0; i < n_txq; i++) {
>>> +                    if ((tx_index % n_pmds_on_numa) == pmd->index ||
>>> n_txq == 1)
>>> +                        dp_netdev_pmd_add_txq(pmd, port, i);
>>> +                    tx_index++;
>>>                  }
>>>              }
>>>              /* Unrefs the port_try_ref(). */
>>> @@ -2689,6 +2810,8 @@ reload:
>>>          goto reload;
>>>      }
>>>  
>>> +    dp_netdev_pmd_detach_tx_queues(pmd);
>>> +
>>>      for (i = 0; i < poll_cnt; i++) {
>>>           port_unref(poll_list[i].port);
>>>      }
>>> @@ -2802,16 +2925,6 @@ dp_netdev_pmd_get_next(struct dp_netdev *dp,
>>> struct cmap_position *pos)
>>>      return next;
>>>  }
>>>  
>>> -static int
>>> -core_id_to_qid(unsigned core_id)
>>> -{
>>> -    if (core_id != NON_PMD_CORE_ID) {
>>> -        return core_id;
>>> -    } else {
>>> -        return ovs_numa_get_n_cores();
>>> -    }
>>> -}
>>> -
>>>  /* Configures the 'pmd' based on the input argument. */
>>>  static void
>>>  dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct
>>> dp_netdev *dp,
>>> @@ -2820,7 +2933,6 @@ dp_netdev_configure_pmd(struct
>>> dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>>>      pmd->dp = dp;
>>>      pmd->index = index;
>>>      pmd->core_id = core_id;
>>> -    pmd->tx_qid = core_id_to_qid(core_id);
>>>      pmd->numa_id = numa_id;
>>>  
>>>      ovs_refcount_init(&pmd->ref_cnt);
>>> @@ -2831,9 +2943,11 @@ dp_netdev_configure_pmd(struct
>>> dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>>>      ovs_mutex_init(&pmd->flow_mutex);
>>>      dpcls_init(&pmd->cls);
>>>      cmap_init(&pmd->flow_table);
>>> -    /* init the 'flow_cache' since there is no
>>> +    cmap_init(&pmd->tx_queues);
>>> +    /* init the 'flow_cache' and 'tx_queues' since there is no
>>>       * actual thread created for NON_PMD_CORE_ID. */
>>>      if (core_id == NON_PMD_CORE_ID) {
>>> +        dp_netdev_configure_non_pmd_txqs(pmd);
>>>          emc_cache_init(&pmd->flow_cache);
>>>      }
>>>      cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *,
>>> &pmd->node),
>>> @@ -2846,6 +2960,7 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread
>>> *pmd)
>>>      dp_netdev_pmd_flow_flush(pmd);
>>>      dpcls_destroy(&pmd->cls);
>>>      cmap_destroy(&pmd->flow_table);
>>> +    cmap_destroy(&pmd->tx_queues);
>>>      ovs_mutex_destroy(&pmd->flow_mutex);
>>>      latch_destroy(&pmd->exit_latch);
>>>      xpthread_cond_destroy(&pmd->cond);
>>> @@ -2862,6 +2977,7 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct
>>> dp_netdev_pmd_thread *pmd)
>>>       * no actual thread uninit it for NON_PMD_CORE_ID. */
>>>      if (pmd->core_id == NON_PMD_CORE_ID) {
>>>          emc_cache_uninit(&pmd->flow_cache);
>>> +        dp_netdev_pmd_detach_tx_queues(pmd);
>>>      } else {
>>>          latch_set(&pmd->exit_latch);
>>>          dp_netdev_reload_pmd__(pmd);
>>> @@ -3471,13 +3587,15 @@ dp_execute_cb(void *aux_, struct dp_packet
>>> **packets, int cnt,
>>>      struct dp_netdev *dp = pmd->dp;
>>>      int type = nl_attr_type(a);
>>>      struct dp_netdev_port *p;
>>> +    struct dp_netdev_pmd_txq *txq;
>>>      int i;
>>>  
>>>      switch ((enum ovs_action_attr)type) {
>>>      case OVS_ACTION_ATTR_OUTPUT:
>>> -        p = dp_netdev_lookup_port(dp, u32_to_odp(nl_attr_get_u32(a)));
>>> -        if (OVS_LIKELY(p)) {
>>> -            netdev_send(p->netdev, pmd->tx_qid, packets, cnt,
>>> may_steal);
>>> +        txq = dp_netdev_pmd_lookup_txq(pmd,
>>> u32_to_odp(nl_attr_get_u32(a)));
>>> +        if (OVS_LIKELY(txq)) {
>>> +            netdev_send(txq->port->netdev, txq->tx_qid,
>>> +                        packets, cnt, may_steal);
>>>              return;
>>>          }
>>>          break;
>>>
> 
> 
_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev

Reply via email to