Hi Hoang,
The (!match) case of my suggested code was wrong. To possibly save you some 
unnecessary troubleshooting I fixed that and made some other simplifications 
that became possible.

///jon


-----Original Message-----
From: Jon Maloy 
Sent: 31-Jan-19 16:05
To: 'Hoang Le' <[email protected]>; [email protected]; 
[email protected]; [email protected]
Subject: RE: [net-next] tipc: smooth change between replicast and broadcast

Hi Hoang,
Nice job, but still a few things to improve. See below.

> -----Original Message-----
> From: Hoang Le <[email protected]>
> Sent: 29-Jan-19 04:22
> To: Jon Maloy <[email protected]>; [email protected]; 
> [email protected]; [email protected]
> Subject: [net-next] tipc: smooth change between replicast and 

[...]

/* tipc_mcast_filter_msg - filter the head message of @inputq against @defq
 * @defq: deferred queue; holds messages that are held back for now
 * @inputq: queue whose head message is examined
 *
 * Only the first message of @inputq is inspected. Depending on its SYN
 * flag, and on whether @defq already holds a message from the same
 * originating node/port, the message is left in @inputq, moved to the
 * tail of @defq, or merged with its deferred "twin" SYN. In the last case
 * the non-SYN messages deferred for that peer are moved back to @inputq.
 * Whatever is left in @inputq on return remains there for the caller.
 *
 * NOTE(review): only the unlocked __skb_*() queue helpers are used, so the
 * caller presumably serializes access to both queues - confirm.
 */
void tipc_mcast_filter_msg(struct sk_buff_head *defq,
                           struct sk_buff_head *inputq)
{
        struct sk_buff *skb, *_skb, *tmp;
        struct tipc_msg *hdr, *_hdr;
        bool match = false;
        u32 node, port;

        /* skb_peek() returns NULL on an empty queue; nothing to filter then */
        skb = skb_peek(inputq);
        if (!skb)
                return;

        hdr = buf_msg(skb);

        /* Fast path: non-SYN message and nothing deferred */
        if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq)))
                return;

        node = msg_orignode(hdr);
        port = msg_origport(hdr);

        /* Has the twin SYN message already arrived ? */
        skb_queue_walk(defq, _skb) {
                _hdr = buf_msg(_skb);
                if (msg_orignode(_hdr) != node)
                        continue;
                if (msg_origport(_hdr) != port)
                        continue;
                match = true;
                break;
        }

        /* Nothing deferred for this peer: a SYN is parked in the deferred
         * queue, a non-SYN stays in inputq
         */
        if (!match) {
                if (!msg_is_syn(hdr))
                        return;
                __skb_dequeue(inputq);
                __skb_queue_tail(defq, skb);
                return;
        }

        /* From here on _skb/_hdr refer to the first deferred message from
         * this peer. It is expected to be a SYN; if not, warn and drop
         * everything in the deferred queue.
         */
        if (!msg_is_syn(_hdr)) {
                pr_warn_ratelimited("Non-sync mcast heads deferred queue\n");
                __skb_queue_purge(defq);
                return;
        }

        /* Deliver non-SYN message from other link, otherwise queue it */
        if (!msg_is_syn(hdr)) {
                if (msg_is_rcast(hdr) != msg_is_rcast(_hdr))
                        return;
                __skb_dequeue(inputq);
                __skb_queue_tail(defq, skb);
                return;
        }

        /* Matching SYN messages => return the one with data, if any */
        __skb_unlink(_skb, defq);
        if (msg_data_sz(hdr)) {
                kfree_skb(_skb);
        } else {
                /* Incoming SYN carries no data: replace it in inputq with
                 * the deferred one
                 */
                __skb_dequeue(inputq);
                kfree_skb(skb);
                __skb_queue_tail(inputq, _skb);
        }

        /* Deliver subsequent non-SYN messages from same peer, stopping at
         * the next SYN from that peer
         */
        skb_queue_walk_safe(defq, _skb, tmp) {
                _hdr = buf_msg(_skb);
                if (msg_orignode(_hdr) != node)
                        continue;
                if (msg_origport(_hdr) != port)
                        continue;
                if (msg_is_syn(_hdr))
                        break;
                __skb_unlink(_skb, defq);
                __skb_queue_tail(inputq, _skb);
        }
}

> diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index
> 751530ab0c49..165d88a503e4 100644
> --- a/net/tipc/bcast.h
> +++ b/net/tipc/bcast.h
> @@ -63,11 +63,13 @@ void tipc_nlist_del(struct tipc_nlist *nl, u32 
> node);
>  /* Cookie to be used between socket and broadcast layer
>   * @rcast: replicast (instead of broadcast) was used at previous xmit
>   * @mandatory: broadcast/replicast indication was set by user
> + * @deferredq: defer queue to make message in order
>   * @expires: re-evaluate non-mandatory transmit method if we are past this
>   */

[...]

> @@ -383,6 +383,11 @@ static struct tipc_node *tipc_node_create(struct 
> net *net, u32 addr,
>                               tipc_link_update_caps(l, capabilities);
>               }
>               write_unlock_bh(&n->lock);
> +             /* Calculate cluster capabilities */
> +             tn->capabilities = TIPC_NODE_CAPABILITIES;
> +             list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
> +                     tn->capabilities &= temp_node->capabilities;
> +             }

Yes, you are right here. During a cluster upgrade a node can come back with new 
capabilities which also must be reflected in the cluster capabilities field.
Actually, I think it would be a good idea to add cluster capabilities as a 
separate patch. This makes this rather complex patch slightly smaller.

>               goto exit;
>       }
>       n = kzalloc(sizeof(*n), GFP_ATOMIC); @@ -433,6 +438,11 @@ static 
> struct tipc_node *tipc_node_create(struct net *net, u32 addr,
>                       break;

[...]

> 
> @@ -817,6 +819,11 @@ static int tipc_sendmcast(struct  socket *sock, 
> struct tipc_name_seq *seq,
>                                    &tsk->cong_link_cnt);
>       }
> 
> +     /* Broadcast link is now free to choose method for next broadcast */
> +     if (rc == 0) {
> +             method->mandatory = false;
> +             method->expires = jiffies;
> +     }

No, we should leave the socket code as is, so we are sure it works with legacy 
nodes.
We should instead make tipc_bcast_select_xmit_method() slightly smarter:

static void tipc_bcast_select_xmit_method(struct net *net, int dests,
                                          struct tipc_mc_method *method) {
         .......
        /* Can current method be changed ? */
        method->expires = jiffies + TIPC_METHOD_EXPIRE;
        if (method->mandatory)
                     return;
        if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) &&
            time_before(jiffies, exp))
                return;

        /* Determine method to use now */
        method->rcast = dests <= bb->bc_threshold; }

I.e., we respect the 'mandatory' setting, because we need that for group_cast 
to work correctly, but we override 'method->expires' if the cluster capabilities 
say that all nodes support TIPC_MCAST_RBCTL.
Combined with the patch where we add forced BCAST or REPLICAST (make sure you 
add this patch on top of that one) I think we have achieved a pretty smart 
and adaptive multicast subsystem.

BR
///jon


>       tipc_nlist_purge(&dsts);
> 
>       return rc ? rc : dlen;
> @@ -2157,6 +2164,9 @@ static void tipc_sk_filter_rcv(struct sock *sk, 
> struct sk_buff *skb,
>       if (unlikely(grp))
>               tipc_group_filter_msg(grp, &inputq, xmitq);
> 
> +     if (msg_type(hdr) == TIPC_MCAST_MSG)
> +             tipc_mcast_filter_msg(&tsk->mc_method.deferredq,
> &inputq);
> +
>       /* Validate and add to receive buffer if there is space */
>       while ((skb = __skb_dequeue(&inputq))) {
>               hdr = buf_msg(skb);
> --
> 2.17.1



_______________________________________________
tipc-discussion mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/tipc-discussion

Reply via email to