Sorry, I agree that example is incorrect. It is really not true, because
of using ovs_numa_get_n_cores() to call netdev_set_multiq().
No, I didn't actually observe a bug.

But there is another example:

        same configuration(2 pmd threads with 1 port,
        2 rxqs per port and pmd-cpu-mask = 00000014).

        pmd_1->tx_qid = 2, pmd_2->tx_qid = 4,
        txq_needs_locking = true (if device hasn't ovs_numa_get_n_cores() 
queues)

        Lets netdev->real_n_txq = 2; (device has 2 queues)

        In that case, after truncating in netdev_dpdk_send__()
                'qid = qid % dev->real_n_txq;'
        pmd_1: qid = 2 % 2 = 0
        pmd_2: qid = 4 % 2 = 0

        So, both threads will call dpdk_queue_pkts() with same qid = 0.
        This is unexpected behavior if there is 2 tx queues in device.
        Queue #1 will not be used and both threads will lock queue #0
        on each send.

About your example:
        2 pmd threads can't call netdev_send() with same tx_qid,
        because pmd->tx_qid = pmd->core_id and there is only one thread
        with core_id = 0. See dp_netdev_configure_pmd().

        So,
        pmd1 will call netdev_send(netdev=dpdk0, tx_qid= *pmd1->core_id* )
        pmd2 will call netdev_send(netdev=dpdk0, tx_qid= *pmd2->core_id* )

        
On 05.08.2015 17:54, Daniele Di Proietto wrote:
> 
> 
> On 05/08/2015 13:28, "Ilya Maximets" <i.maxim...@samsung.com> wrote:
> 
>> Currently tx_qid is equal to pmd->core_id. This leads to wrong
>> behavior if pmd-cpu-mask different from '/(0*)(1|3|7)?(f*)/',
>> e.g. if core_ids are not sequential, or doesn't start from 0, or both.
>>
>> Example(just one of possible wrong scenarios):
>>
>>      starting 2 pmd threads with 1 port, 2 rxqs per port and
>>      pmd-cpu-mask = 00000014.
>>
>>      It that case pmd_1->tx_qid = 2, pmd_2->tx_qid = 4 and
>>      txq_needs_locking = false (if device has 2 queues).
>>
>>      While netdev_dpdk_send__() qid will not be truncated and
>>      dpdk_queue_pkts() will be called for nonexistent queues (2 and 4).
> 
> This shouldn't be possible, because the datapath requests one txq
> for each core in the system, not for each core in pmd-cpu-mask
> (see the calls to netdev_set_multiq() in dpif-netdev.c).
> 
> Did you actually observe a bug or an unexpected behaviour?
> 
> I didn't read the patch carefully (I want to understand the problem first),
> but it appears that two pmd threads could call netdev_send on the same
> port, with the same tx_qid concurrently.  Example:
> 
>     pmd1 is processing dpdk0 with rx_qid 0, pmd2 is processing dpdk1 with
>     rx_qid 0.
> 
>     The flow table is configured to send everything to dpdk0.
> 
>     pmd1 will call netdev_send(netdev=dpdk0, tx_qid=0)
>     pmd2 will call netdev_send(netdev=dpdk0, tx_qid=0)
>     
>     these calls can happen concurrently
> 
>>
>> Fix that by calculating tx_qid from rxq indexes for each rxq separately.
>> 'rxq_poll' structure supplemented by tx_qid and renamed to 'q_poll'.
>> 'poll_list' moved inside dp_netdev_pmd_thread structure to be able
>> to get proper tx_qid for current port while calling netdev_send().
>> Also, information about queues of each thread added to log.
>>
>> Signed-off-by: Ilya Maximets <i.maxim...@samsung.com>
>> ---
>> lib/dpif-netdev.c | 102
>> ++++++++++++++++++++++++++----------------------------
>> lib/netdev.c      |   6 ++++
>> lib/netdev.h      |   1 +
>> 3 files changed, 57 insertions(+), 52 deletions(-)
>>
>> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
>> index 83e55e7..03af4bf 100644
>> --- a/lib/dpif-netdev.c
>> +++ b/lib/dpif-netdev.c
>> @@ -365,6 +365,13 @@ struct dp_netdev_pmd_cycles {
>>     atomic_ullong n[PMD_N_CYCLES];
>> };
>>
>> +/* Contained by struct dp_netdev_pmd_thread's 'poll_list' member.  */
>> +struct q_poll {
>> +    struct dp_netdev_port *port;
>> +    struct netdev_rxq *rx;
>> +    unsigned tx_qid;
>> +};
>> +
>> /* PMD: Poll modes drivers.  PMD accesses devices via polling to
>> eliminate
>>  * the performance overhead of interrupt processing.  Therefore netdev
>> can
>>  * not implement rx-wait for these devices.  dpif-netdev needs to poll
>> @@ -420,8 +427,10 @@ struct dp_netdev_pmd_thread {
>>                                     /* threads on same numa node. */
>>     unsigned core_id;               /* CPU core id of this pmd thread. */
>>     int numa_id;                    /* numa node id of this pmd thread.
>> */
>> -    int tx_qid;                     /* Queue id used by this pmd thread
>> to
>> -                                     * send packets on all netdevs */
>> +
>> +    struct q_poll *poll_list;       /* List of queues polling by this
>> pmd */
>> +    unsigned int poll_cnt;          /* Number of queues in poll_list */
>> +    unsigned int poll_cur;          /* Index of current queue in
>> poll_list */
>>
>>     /* Only a pmd thread can write on its own 'cycles' and 'stats'.
>>      * The main thread keeps 'stats_zero' and 'cycles_zero' as base
>> @@ -2624,25 +2633,18 @@ dpif_netdev_wait(struct dpif *dpif)
>>     seq_wait(tnl_conf_seq, dp->last_tnl_conf_seq);
>> }
>>
>> -struct rxq_poll {
>> -    struct dp_netdev_port *port;
>> -    struct netdev_rxq *rx;
>> -};
>> -
>> -static int
>> -pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
>> -                struct rxq_poll **ppoll_list, int poll_cnt)
>> +static void
>> +pmd_load_queues(struct dp_netdev_pmd_thread *pmd)
>> {
>> -    struct rxq_poll *poll_list = *ppoll_list;
>>     struct dp_netdev_port *port;
>> -    int n_pmds_on_numa, index, i;
>> +    int n_pmds_on_numa, n_txqs, index, i;
>>
>>     /* Simple scheduler for netdev rx polling. */
>> -    for (i = 0; i < poll_cnt; i++) {
>> -        port_unref(poll_list[i].port);
>> +    for (i = 0; i < pmd->poll_cnt; i++) {
>> +        port_unref(pmd->poll_list[i].port);
>>     }
>>
>> -    poll_cnt = 0;
>> +    pmd->poll_cnt = 0;
>>     n_pmds_on_numa = get_n_pmd_threads_on_numa(pmd->dp, pmd->numa_id);
>>     index = 0;
>>
>> @@ -2652,17 +2654,18 @@ pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
>>         if (port_try_ref(port)) {
>>             if (netdev_is_pmd(port->netdev)
>>                 && netdev_get_numa_id(port->netdev) == pmd->numa_id) {
>> -                int i;
>> +                n_txqs = netdev_n_txq(port->netdev);
>>
>>                 for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
>>                     if ((index % n_pmds_on_numa) == pmd->index) {
>> -                        poll_list = xrealloc(poll_list,
>> -                                        sizeof *poll_list * (poll_cnt +
>> 1));
>> +                        pmd->poll_list = xrealloc(pmd->poll_list,
>> +                                sizeof *pmd->poll_list * (pmd->poll_cnt
>> + 1));
>>
>>                         port_ref(port);
>> -                        poll_list[poll_cnt].port = port;
>> -                        poll_list[poll_cnt].rx = port->rxq[i];
>> -                        poll_cnt++;
>> +                        pmd->poll_list[pmd->poll_cnt].port = port;
>> +                        pmd->poll_list[pmd->poll_cnt].rx = port->rxq[i];
>> +                        pmd->poll_list[pmd->poll_cnt].tx_qid = i %
>> n_txqs;
>> +                        pmd->poll_cnt++;
>>                     }
>>                     index++;
>>                 }
>> @@ -2671,9 +2674,6 @@ pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
>>             port_unref(port);
>>         }
>>     }
>> -
>> -    *ppoll_list = poll_list;
>> -    return poll_cnt;
>> }
>>
>> static void *
>> @@ -2681,24 +2681,23 @@ pmd_thread_main(void *f_)
>> {
>>     struct dp_netdev_pmd_thread *pmd = f_;
>>     unsigned int lc = 0;
>> -    struct rxq_poll *poll_list;
>>     unsigned int port_seq = PMD_INITIAL_SEQ;
>> -    int poll_cnt;
>>     int i;
>>
>> -    poll_cnt = 0;
>> -    poll_list = NULL;
>> -
>>     /* Stores the pmd thread's 'pmd' to 'per_pmd_key'. */
>>     ovsthread_setspecific(pmd->dp->per_pmd_key, pmd);
>>     pmd_thread_setaffinity_cpu(pmd->core_id);
>> reload:
>>     emc_cache_init(&pmd->flow_cache);
>> -    poll_cnt = pmd_load_queues(pmd, &poll_list, poll_cnt);
>> +    pmd_load_queues(pmd);
>>
>> -    /* List port/core affinity */
>> -    for (i = 0; i < poll_cnt; i++) {
>> -       VLOG_INFO("Core %d processing port \'%s\'\n", pmd->core_id,
>> netdev_get_name(poll_list[i].port->netdev));
>> +    /* List port/core affinity and queues */
>> +    for (i = 0; i < pmd->poll_cnt; i++) {
>> +       VLOG_INFO("Core %d processing port \'%s\' "
>> +                 "with rx_qid = %d, tx_qid = %d\n",
>> +                 pmd->core_id,
>> netdev_get_name(pmd->poll_list[i].port->netdev),
>> +                 netdev_rxq_get_queue_id(pmd->poll_list[i].rx),
>> +                 pmd->poll_list[i].tx_qid);
>>     }
>>
>>     /* Signal here to make sure the pmd finishes
>> @@ -2708,8 +2707,10 @@ reload:
>>     for (;;) {
>>         int i;
>>
>> -        for (i = 0; i < poll_cnt; i++) {
>> -            dp_netdev_process_rxq_port(pmd, poll_list[i].port,
>> poll_list[i].rx);
>> +        for (i = 0; i < pmd->poll_cnt; i++) {
>> +            pmd->poll_cur = i;
>> +            dp_netdev_process_rxq_port(pmd, pmd->poll_list[i].port,
>> +                                            pmd->poll_list[i].rx);
>>         }
>>
>>         if (lc++ > 1024) {
>> @@ -2734,13 +2735,13 @@ reload:
>>         goto reload;
>>     }
>>
>> -    for (i = 0; i < poll_cnt; i++) {
>> -         port_unref(poll_list[i].port);
>> +    for (i = 0; i < pmd->poll_cnt; i++) {
>> +         port_unref(pmd->poll_list[i].port);
>>     }
>>
>>     dp_netdev_pmd_reload_done(pmd);
>>
>> -    free(poll_list);
>> +    free(pmd->poll_list);
>>     return NULL;
>> }
>>
>> @@ -2847,16 +2848,6 @@ dp_netdev_pmd_get_next(struct dp_netdev *dp,
>> struct cmap_position *pos)
>>     return next;
>> }
>>
>> -static int
>> -core_id_to_qid(unsigned core_id)
>> -{
>> -    if (core_id != NON_PMD_CORE_ID) {
>> -        return core_id;
>> -    } else {
>> -        return ovs_numa_get_n_cores();
>> -    }
>> -}
>> -
>> /* Configures the 'pmd' based on the input argument. */
>> static void
>> dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct
>> dp_netdev *dp,
>> @@ -2865,7 +2856,8 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread
>> *pmd, struct dp_netdev *dp,
>>     pmd->dp = dp;
>>     pmd->index = index;
>>     pmd->core_id = core_id;
>> -    pmd->tx_qid = core_id_to_qid(core_id);
>> +    pmd->poll_cnt = 0;
>> +    pmd->poll_list = NULL;
>>     pmd->numa_id = numa_id;
>>
>>     ovs_refcount_init(&pmd->ref_cnt);
>> @@ -2876,9 +2868,13 @@ dp_netdev_configure_pmd(struct
>> dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>>     ovs_mutex_init(&pmd->flow_mutex);
>>     dpcls_init(&pmd->cls);
>>     cmap_init(&pmd->flow_table);
>> -    /* init the 'flow_cache' since there is no
>> +    /* init the 'flow_cache' and 'poll_list' since there is no
>>      * actual thread created for NON_PMD_CORE_ID. */
>>     if (core_id == NON_PMD_CORE_ID) {
>> +        pmd->poll_list = xmalloc(sizeof *pmd->poll_list);
>> +        pmd->poll_list[0].tx_qid = ovs_numa_get_n_cores();
>> +        pmd->poll_cnt = 1;
>> +        pmd->poll_cur = 0;
>>         emc_cache_init(&pmd->flow_cache);
>>     }
>>     cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *,
>> &pmd->node),
>> @@ -2903,9 +2899,10 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread
>> *pmd)
>> static void
>> dp_netdev_del_pmd(struct dp_netdev_pmd_thread *pmd)
>> {
>> -    /* Uninit the 'flow_cache' since there is
>> +    /* Uninit the 'flow_cache' and 'poll_list' since there is
>>      * no actual thread uninit it for NON_PMD_CORE_ID. */
>>     if (pmd->core_id == NON_PMD_CORE_ID) {
>> +        free(pmd->poll_list);
>>         emc_cache_uninit(&pmd->flow_cache);
>>     } else {
>>         latch_set(&pmd->exit_latch);
>> @@ -3456,7 +3453,8 @@ dp_execute_cb(void *aux_, struct dp_packet
>> **packets, int cnt,
>>     case OVS_ACTION_ATTR_OUTPUT:
>>         p = dp_netdev_lookup_port(dp, u32_to_odp(nl_attr_get_u32(a)));
>>         if (OVS_LIKELY(p)) {
>> -            netdev_send(p->netdev, pmd->tx_qid, packets, cnt, may_steal);
>> +            netdev_send(p->netdev, pmd->poll_list[pmd->poll_cur].tx_qid,
>> +                                   packets, cnt, may_steal);
>>             return;
>>         }
>>         break;
>> diff --git a/lib/netdev.c b/lib/netdev.c
>> index 4819ae9..c292f83 100644
>> --- a/lib/netdev.c
>> +++ b/lib/netdev.c
>> @@ -1800,6 +1800,12 @@ netdev_rxq_get_name(const struct netdev_rxq *rx)
>>     return netdev_get_name(netdev_rxq_get_netdev(rx));
>> }
>>
>> +unsigned int
>> +netdev_rxq_get_queue_id(const struct netdev_rxq *rx)
>> +{
>> +    return rx->queue_id;
>> +}
>> +
>> static void
>> restore_all_flags(void *aux OVS_UNUSED)
>> {
>> diff --git a/lib/netdev.h b/lib/netdev.h
>> index 9d412ee..212cbe2 100644
>> --- a/lib/netdev.h
>> +++ b/lib/netdev.h
>> @@ -174,6 +174,7 @@ int netdev_rxq_open(struct netdev *, struct
>> netdev_rxq **, int id);
>> void netdev_rxq_close(struct netdev_rxq *);
>>
>> const char *netdev_rxq_get_name(const struct netdev_rxq *);
>> +unsigned int netdev_rxq_get_queue_id(const struct netdev_rxq *);
>>
>> int netdev_rxq_recv(struct netdev_rxq *rx, struct dp_packet **buffers,
>>                     int *cnt);
>> -- 
>> 2.1.4
>>
> 
> 
_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev

Reply via email to