Hi Hoang,
Nice job, but still a few things to improve. See below.

> -----Original Message-----
> From: Hoang Le <[email protected]>
> Sent: 29-Jan-19 04:22
> To: Jon Maloy <[email protected]>; [email protected];
> [email protected]; [email protected]
> Subject: [net-next] tipc: smooth change between replicast and broadcast
> 
> Currently, a multicast stream may start out using replicast, because there are
> few destinations, and then it should ideally switch to L2/broadcast
> IGMP/multicast when the number of destinations grows beyond a certain
> limit. The opposite should happen when the number decreases below the
> limit.
> 
> To eliminate the risk of message reordering caused by a method change, a
> sending socket must stick to a previously selected method until it enters an
> idle period of 5 seconds. This means there is a 5 second pause in the traffic
> from the sender socket.
> 
> In this fix, we allow such a switch between replicast and broadcast without a
> 5 second pause in the traffic.
> 
> The solution is to send a dummy message containing only the header, with the
> SYN bit set, via broadcast/replicast. The data message also has the SYN bit
> set and is sent via replicast/broadcast.
> 
> At the receiving side, any messages that follow the first SYN bit message
> (data or dummy) are held in a deferred queue until the other message of the
> pair (dummy or data) has arrived.
> 
> For compatibility reasons we have to introduce a new capability flag,
> TIPC_MCAST_RBCTL, to handle this new feature. Because a dummy message is
> sent out, poll would return empty on old machines.
> 
> Signed-off-by: Jon Maloy <[email protected]>
> Signed-off-by: Hoang Le <[email protected]>
> ---
>  net/tipc/bcast.c  | 116 +++++++++++++++++++++++++++++++++++++++++++++-
>  net/tipc/bcast.h  |   5 ++
>  net/tipc/core.c   |   2 +
>  net/tipc/core.h   |   3 ++
>  net/tipc/msg.h    |  10 ++++
>  net/tipc/node.c   |  10 ++++
>  net/tipc/node.h   |   6 ++-
>  net/tipc/socket.c |  10 ++++
>  8 files changed, 159 insertions(+), 3 deletions(-)
> 
> diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
> index d8026543bf4c..e3a85227d4aa 100644
> --- a/net/tipc/bcast.c
> +++ b/net/tipc/bcast.c
> @@ -295,11 +295,15 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
>                   struct tipc_mc_method *method, struct tipc_nlist *dests,
>                   u16 *cong_link_cnt)
>  {
> -     struct sk_buff_head inputq, localq;
> +     struct sk_buff_head inputq, localq, tmpq;
> +     bool rcast = method->rcast;
> +     struct sk_buff *skb, *_skb;
> +     struct tipc_msg *hdr, *_hdr;
>       int rc = 0;
> 
>       skb_queue_head_init(&inputq);
>       skb_queue_head_init(&localq);
> +     skb_queue_head_init(&tmpq);
> 
>       /* Clone packets before they are consumed by next call */
>       if (dests->local && !tipc_msg_reassemble(pkts, &localq)) {
> @@ -309,6 +313,53 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
>       /* Send according to determined transmit method */
>       if (dests->remote) {
>               tipc_bcast_select_xmit_method(net, dests->remote, method);

I would suggest that you move the whole code block below into a separate
function, so that the call site reduces to something like:
                              msg_set_is_rcast(hdr, method->rcast);
                              if (rcast != method->rcast)
                                  tipc_mcast_send_sync(net, skb_peek(pkts));

This function sets the SYN bit in the packet header, then copies that header
into a dummy header, inverts the is_rcast bit in that header and sends it out
via the appropriate method.
Note that this involves a small change: the real message is sent out via the
selected method, the dummy message always via the other method, whichever it is.
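
To make it concrete, here is a rough, untested sketch of what such a
tipc_mcast_send_sync() could look like. It is only an illustration of the
description above; note that it would need a few more arguments than in the
call above (method, dests and cong_link_cnt) to be able to transmit the dummy,
and the MSG_FRAGMENTER unwrapping is just carried over from your inline code:

/* tipc_mcast_send_sync - sketch only, untested
 * Set the SYN bit on the real message header, build a header-only dummy
 * copy with the is_rcast bit inverted, and send the dummy out via the
 * method we are switching away from.
 */
static int tipc_mcast_send_sync(struct net *net, struct sk_buff *skb,
                                struct tipc_mc_method *method,
                                struct tipc_nlist *dests,
                                u16 *cong_link_cnt)
{
        struct tipc_msg *hdr, *_hdr;
        struct sk_buff_head tmpq;
        struct sk_buff *_skb;

        hdr = buf_msg(skb);
        if (msg_user(hdr) == MSG_FRAGMENTER)
                hdr = msg_get_wrapped(hdr);
        if (msg_type(hdr) != TIPC_MCAST_MSG)
                return 0;

        /* Allocate a buffer for the header-only dummy message */
        _skb = tipc_buf_acquire(MCAST_H_SIZE, GFP_KERNEL);
        if (!_skb)
                return -ENOMEM;
        skb_orphan(_skb);

        /* Mark the real message as a synch point and copy its header */
        msg_set_syn(hdr, 1);
        skb_copy_to_linear_data(_skb, hdr, MCAST_H_SIZE);

        /* Dummy header: no payload, is_rcast bit inverted */
        _hdr = buf_msg(_skb);
        msg_set_size(_hdr, MCAST_H_SIZE);
        msg_set_is_rcast(_hdr, !method->rcast);

        /* Send the dummy via the method we are leaving */
        skb_queue_head_init(&tmpq);
        __skb_queue_tail(&tmpq, _skb);
        if (method->rcast)
                tipc_bcast_xmit(net, &tmpq, cong_link_cnt);
        else
                tipc_rcast_xmit(net, &tmpq, dests, cong_link_cnt);

        /* This queue should normally be empty by now */
        __skb_queue_purge(&tmpq);
        return 0;
}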

> +
> +             if (tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) {
> +                     skb = skb_peek(pkts);
> +                     hdr = buf_msg(skb);
> +
> +                     if (msg_user(hdr) == MSG_FRAGMENTER)
> +                             hdr = msg_get_wrapped(hdr);
> +                     if (msg_type(hdr) != TIPC_MCAST_MSG)
> +                             goto xmit;
> +
> +                     msg_set_syn(hdr, 0);
> +                     msg_set_is_rcast(hdr, method->rcast);
> +
> +                     /* switch mode */
> +                     if (rcast != method->rcast) {
> +                             /* Build a copy of the message */
> +                             _skb = tipc_buf_acquire(MCAST_H_SIZE,
> +                                                     GFP_KERNEL);
> +                             if (!skb) {
> +                                     rc = -ENOMEM;
> +                                     goto exit;
> +                             }
> +                             skb_orphan(_skb);
> +                             skb_copy_to_linear_data(_skb, hdr,
> +                                                     MCAST_H_SIZE);
> +
> +                             /* Build dummy header */
> +                             _hdr = buf_msg(_skb);
> +                             msg_set_size(_hdr, MCAST_H_SIZE);
> +                             __skb_queue_tail(&tmpq, _skb);
> +
> +                             msg_set_syn(hdr, 1);
> +                             msg_set_syn(_hdr, 1);
> +                             msg_set_is_rcast(_hdr, rcast);
> +                             /* Prepare for 'synching' */
> +                             if (rcast)
> +                                     tipc_rcast_xmit(net, &tmpq, dests,
> +                                                     cong_link_cnt);
> +                             else
> +                                     tipc_bcast_xmit(net, &tmpq,
> +                                                     cong_link_cnt);
> +
> +                             /* This queue should normally be empty by now */
> +                             __skb_queue_purge(&tmpq);
> +                     }
> +             }
> +xmit:
>               if (method->rcast)
>                       rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
>               else
> @@ -576,3 +627,66 @@ void tipc_nlist_purge(struct tipc_nlist *nl)
>       nl->remote = 0;
>       nl->local = false;
>  }
> +
> +void tipc_mcast_filter_msg(struct sk_buff_head *defq,
> +                        struct sk_buff_head *inputq)
> +{
> +     struct sk_buff *skb, *_skb;
> +     struct tipc_msg *hdr, *_hdr;
> +     u32 node, port, _node, _port;
> +     bool match = false;

If you put in the following lines here, we will save some instruction cycles:
              hdr = buf_msg(skb_peek(inputq));
              if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq)))
                         return;

After all, this will be the case in the vast majority of calls. It is safe to
just peek the queue and access the buffer, since inputq can never be empty here.

> +
> +     skb = __skb_dequeue(inputq);
> +     if (!skb)
> +             return;
> +
> +     hdr = buf_msg(skb);
> +     node =  msg_orignode(hdr);
> +     port = msg_origport(hdr);
> +
> +     /* Find the peer port if it exists in the deferred queue */
> +     while ((_skb = skb_peek(defq))) {
> +             _hdr = buf_msg(_skb);
> +             _node = msg_orignode(_hdr);
> +             _port = msg_origport(_hdr);
> +
> +             if (_node != node)
> +                     continue;
> +             if (_port != port)
> +                     continue;
> +
> +             if (!match) {
> +                     if (msg_is_syn(hdr) &&
> +                         msg_is_rcast(hdr) != msg_is_rcast(_hdr)) {
> +                             __skb_dequeue(defq);
> +                             if (msg_data_sz(hdr)) {
> +                                     __skb_queue_tail(inputq, skb);
> +                                     kfree_skb(_skb);
> +                             } else {
> +                                     __skb_queue_tail(inputq, _skb);
> +                                     kfree_skb(skb);
> +                             }
> +                             match = true;
> +                     } else {
> +                             break;
> +                     }
> +             } else {
> +                     if (msg_is_syn(_hdr))
> +                             return;
> +                     /* Dequeued to receive buffer */
> +                     __skb_dequeue(defq);
> +                     __skb_queue_tail(inputq, _skb);
> +             }
> +     }
> +
> +     if (match)
> +             return;
> +
> +     if (msg_is_syn(hdr)) {
> +             /* Enqueue and defer to next synching */
> +             __skb_queue_tail(defq, skb);
> +     } else {
> +             /* Direct enqueued */
> +             __skb_queue_tail(inputq, skb);
> +     }
> +}

The above function is hard to follow and does not convince me. What if there are
messages from many sources, and you find the match in the middle of the queue?
I suggest you break the logic down into smaller tasks, e.g., as follows (the
code compiles ok, but is untested):

void tipc_mcast_filter_msg(struct sk_buff_head *defq,
                           struct sk_buff_head *inputq)
{
        struct sk_buff *skb, *_skb, *tmp;
        struct tipc_msg *hdr, *_hdr;
        bool match = false;
        u32 node, port;

        skb = skb_peek(inputq);
        hdr = buf_msg(skb);
        if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq)))
                return;

        __skb_dequeue(inputq);
        node =  msg_orignode(hdr);
        port = msg_origport(hdr);

        /* Has the twin SYN message already arrived ? */
        skb_queue_walk(defq, _skb) {
                _hdr = buf_msg(_skb);
                if (msg_orignode(_hdr) != node)
                        continue;
                if (msg_origport(_hdr) != port)
                        continue;
                match = true;
                break;
        }

        if (!match)
                return __skb_queue_tail(defq, skb);

        if (!msg_is_syn(_hdr)) {
                pr_warn_ratelimited("Non-sync mcast heads deferred queue\n");
                __skb_queue_purge(defq);
                return __skb_queue_tail(inputq, skb);
        }

        /* Non-SYN message from other link can be delivered right away */
        if (!msg_is_syn(hdr)) {
                if (msg_is_rcast(hdr) != msg_is_rcast(_hdr))
                        return __skb_queue_tail(inputq, skb);
                else
                        return __skb_queue_tail(defq, skb);
        }

        /* Matching SYN messages => return the one with data, if any */
        __skb_unlink(_skb, defq);
        if (msg_data_sz(hdr)) {
                kfree_skb(_skb);
                __skb_queue_tail(inputq, skb);
        } else {
                kfree_skb(skb);
                __skb_queue_tail(inputq, _skb);
        }
        /* Deliver subsequent non-SYN messages from same peer */
        skb_queue_walk_safe(defq, _skb, tmp) {
                _hdr = buf_msg(_skb);
                if (msg_orignode(_hdr) != node)
                        continue;
                if (msg_origport(_hdr) != port)
                        continue;
                if (msg_is_syn(_hdr))
                        break;
                __skb_unlink(_skb, defq);
                __skb_queue_tail(inputq, _skb);
        }
}

> diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
> index 751530ab0c49..165d88a503e4 100644
> --- a/net/tipc/bcast.h
> +++ b/net/tipc/bcast.h
> @@ -63,11 +63,13 @@ void tipc_nlist_del(struct tipc_nlist *nl, u32 node);
>  /* Cookie to be used between socket and broadcast layer
>   * @rcast: replicast (instead of broadcast) was used at previous xmit
>   * @mandatory: broadcast/replicast indication was set by user
> + * @deferredq: deferred queue to keep messages in order
>   * @expires: re-evaluate non-mandatory transmit method if we are past this
>   */

[...]

> @@ -383,6 +383,11 @@ static struct tipc_node *tipc_node_create(struct net *net, u32 addr,
>                               tipc_link_update_caps(l, capabilities);
>               }
>               write_unlock_bh(&n->lock);
> +             /* Calculate cluster capabilities */
> +             tn->capabilities = TIPC_NODE_CAPABILITIES;
> +             list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
> +                     tn->capabilities &= temp_node->capabilities;
> +             }

Yes, you are right here. During a cluster upgrade a node can come back with new
capabilities, which also must be reflected in the cluster capabilities field.
Actually, I think it would be a good idea to add the cluster capabilities
calculation as a separate patch. That would make this rather complex patch
slightly smaller.

>               goto exit;
>       }
>       n = kzalloc(sizeof(*n), GFP_ATOMIC);
> @@ -433,6 +438,11 @@ static struct tipc_node *tipc_node_create(struct net *net, u32 addr,
>                       break;

[...]

> 
> @@ -817,6 +819,11 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
>                                    &tsk->cong_link_cnt);
>       }
> 
> +     /* Broadcast link is now free to choose method for next broadcast */
> +     if (rc == 0) {
> +             method->mandatory = false;
> +             method->expires = jiffies;
> +     }

No, we should leave the socket code as is, so we are sure it works with legacy
nodes.
We should instead make tipc_bcast_select_xmit_method() slightly smarter:

static void tipc_bcast_select_xmit_method(struct net *net, int dests,
                                          struct tipc_mc_method *method)
{
        .......
        /* Can current method be changed ? */
        method->expires = jiffies + TIPC_METHOD_EXPIRE;
        if (method->mandatory)
                return;

        if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) &&
            time_before(jiffies, exp))
                return;

        /* Determine method to use now */
        method->rcast = dests <= bb->bc_threshold;
}

I.e., we respect the 'mandatory' setting, because we need that for group_cast
to work correctly, but we override 'method->expires' if the cluster
capabilities field says that all nodes support MCAST_RBCTL.
Combined with the patch where we add forced BCAST or REPLICAST (make sure you
add this patch on top of that one) I think we have achieved a pretty smart
and adaptive multicast subsystem.

BR
///jon


>       tipc_nlist_purge(&dsts);
> 
>       return rc ? rc : dlen;
> @@ -2157,6 +2164,9 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct
> sk_buff *skb,
>       if (unlikely(grp))
>               tipc_group_filter_msg(grp, &inputq, xmitq);
> 
> +     if (msg_type(hdr) == TIPC_MCAST_MSG)
> +             tipc_mcast_filter_msg(&tsk->mc_method.deferredq, &inputq);
> +
>       /* Validate and add to receive buffer if there is space */
>       while ((skb = __skb_dequeue(&inputq))) {
>               hdr = buf_msg(skb);
> --
> 2.17.1



_______________________________________________
tipc-discussion mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/tipc-discussion
