Hi Hoang,
I realized one more thing. Since this mechanism is intended only for MCAST 
messages, we must filter them out at the send side (in tipc_mcast_send_sync()) 
so that SYN messages are sent only for those. Group messaging, which has its 
own mechanism, does not need this new method, and cannot discover message 
duplicates; it would just deliver them. This must be avoided.

BR
///jon


> -----Original Message-----
> From: Jon Maloy
> Sent: 31-Jan-19 16:05
> To: 'Hoang Le' <[email protected]>; [email protected];
> [email protected]; [email protected]
> Subject: RE: [net-next] tipc: smooth change between replicast and broadcast
> 
> Hi Hoang,
> Nice job, but still a few things to improve. See below.
> 
> > -----Original Message-----
> > From: Hoang Le <[email protected]>
> > Sent: 29-Jan-19 04:22
> > To: Jon Maloy <[email protected]>; [email protected];
> > [email protected]; [email protected]
> > Subject: [net-next] tipc: smooth change between replicast and
> > broadcast
> >
> > Currently, a multicast stream may start out using replicast, because
> > there are few destinations, and then it should ideally switch to
> > L2/broadcast IGMP/multicast when the number of destinations grows
> > beyond a certain limit. The opposite should happen when the number
> > decreases below the limit.
> >
> > To eliminate the risk of message reordering caused by a method change, a
> > sending socket must stick to the previously selected method until it
> > enters an idle period of 5 seconds. This means there is a 5 second pause
> > in the traffic from the sender socket.
> >
> > In this fix, we allow such a switch between replicast and broadcast
> > without a 5 second pause in the traffic.
> >
> > The solution is to send a dummy message, carrying only the header and
> > with the SYN bit set, via broadcast/replicast, while the data message,
> > also with the SYN bit set, is sent via replicast/broadcast.
> >
> > Then, at the receiving side, any messages following the first message
> > with the SYN bit set (data or dummy) are held in a deferred queue until
> > their counterpart (dummy or data) has arrived.
> >
> > For compatibility reasons we have to introduce a new capability flag,
> > TIPC_MCAST_RBCTL, to handle this new feature. Because a dummy message
> > is sent out, poll() would otherwise return empty on old nodes.
> >
> > Signed-off-by: Jon Maloy <[email protected]>
> > Signed-off-by: Hoang Le <[email protected]>
> > ---
> >  net/tipc/bcast.c  | 116
> > +++++++++++++++++++++++++++++++++++++++++++++-
> >  net/tipc/bcast.h  |   5 ++
> >  net/tipc/core.c   |   2 +
> >  net/tipc/core.h   |   3 ++
> >  net/tipc/msg.h    |  10 ++++
> >  net/tipc/node.c   |  10 ++++
> >  net/tipc/node.h   |   6 ++-
> >  net/tipc/socket.c |  10 ++++
> >  8 files changed, 159 insertions(+), 3 deletions(-)
> >
> > diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index
> > d8026543bf4c..e3a85227d4aa 100644
> > --- a/net/tipc/bcast.c
> > +++ b/net/tipc/bcast.c
> > @@ -295,11 +295,15 @@ int tipc_mcast_xmit(struct net *net, struct
> > sk_buff_head *pkts,
> >                 struct tipc_mc_method *method, struct tipc_nlist *dests,
> >                 u16 *cong_link_cnt)
> >  {
> > -   struct sk_buff_head inputq, localq;
> > +   struct sk_buff_head inputq, localq, tmpq;
> > +   bool rcast = method->rcast;
> > +   struct sk_buff *skb, *_skb;
> > +   struct tipc_msg *hdr, *_hdr;
> >     int rc = 0;
> >
> >     skb_queue_head_init(&inputq);
> >     skb_queue_head_init(&localq);
> > +   skb_queue_head_init(&tmpq);
> >
> >     /* Clone packets before they are consumed by next call */
> >     if (dests->local && !tipc_msg_reassemble(pkts, &localq)) { @@ -
> > 309,6 +313,53 @@ int tipc_mcast_xmit(struct net *net, struct
> > sk_buff_head *pkts,
> >     /* Send according to determined transmit method */
> >     if (dests->remote) {
> >             tipc_bcast_select_xmit_method(net, dests->remote,
> method);
> 
> I would suggest that you move the whole code block below into a separate
> function:
>                               msg_set_is_rcast(hdr, method->rcast);
>                               if (rcast != method->rcast)
>                                 tipc_mcast_send_sync(net, skb_peek(pkts));
> 
> This function sets the SYN bit in the packet header, then copies that header
> into a dummy header, inverts the is_rcast bit in that header, and sends it out
> via the appropriate method.
> Note that this involves a small change: the real message is sent out via the
> selected method, the dummy message always via the other method,
> whichever it is.
> 
> > +
> > +           if (tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) {
> > +                   skb = skb_peek(pkts);
> > +                   hdr = buf_msg(skb);
> > +
> > +                   if (msg_user(hdr) == MSG_FRAGMENTER)
> > +                           hdr = msg_get_wrapped(hdr);
> > +                   if (msg_type(hdr) != TIPC_MCAST_MSG)
> > +                           goto xmit;
> > +
> > +                   msg_set_syn(hdr, 0);
> > +                   msg_set_is_rcast(hdr, method->rcast);
> > +
> > +                   /* switch mode */
> > +                   if (rcast != method->rcast) {
> > +                           /* Build a copy of the message header */
> > +                           _skb = tipc_buf_acquire(MCAST_H_SIZE,
> > +                                                   GFP_KERNEL);
> > +                           if (!_skb) {
> > +                                   rc = -ENOMEM;
> > +                                   goto exit;
> > +                           }
> > +                           skb_orphan(_skb);
> > +                           skb_copy_to_linear_data(_skb, hdr,
> > +                                                   MCAST_H_SIZE);
> > +
> > +                           /* Build dummy header */
> > +                           _hdr = buf_msg(_skb);
> > +                           msg_set_size(_hdr, MCAST_H_SIZE);
> > +                           __skb_queue_tail(&tmpq, _skb);
> > +
> > +                           msg_set_syn(hdr, 1);
> > +                           msg_set_syn(_hdr, 1);
> > +                           msg_set_is_rcast(_hdr, rcast);
> > +                           /* Prepare for 'synching' */
> > +                           if (rcast)
> > +                                   tipc_rcast_xmit(net, &tmpq, dests,
> > +                                                   cong_link_cnt);
> > +                           else
> > +                                   tipc_bcast_xmit(net, &tmpq,
> > +                                                   cong_link_cnt);
> > +
> > +                           /* This queue should normally be empty by
> > now */
> > +                           __skb_queue_purge(&tmpq);
> > +                   }
> > +           }
> > +xmit:
> >             if (method->rcast)
> >                     rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
> >             else
> > @@ -576,3 +627,66 @@ void tipc_nlist_purge(struct tipc_nlist *nl)
> >     nl->remote = 0;
> >     nl->local = false;
> >  }
> > +
> > +void tipc_mcast_filter_msg(struct sk_buff_head *defq,
> > +                      struct sk_buff_head *inputq)
> > +{
> > +   struct sk_buff *skb, *_skb;
> > +   struct tipc_msg *hdr, *_hdr;
> > +   u32 node, port, _node, _port;
> > +   bool match = false;
> 
> If you put in the following lines here we will save some instruction cycles:
>               hdr = buf_msg(skb_peek(inputq));
>               if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq)))
>                          return;
> 
> After all, this will be the case in the vast majority of cases. It is safe to
> just peek the queue and access the header, since inputq can never be empty
> here.
> 
> > +
> > +   skb = __skb_dequeue(inputq);
> > +   if (!skb)
> > +           return;
> > +
> > +   hdr = buf_msg(skb);
> > +   node =  msg_orignode(hdr);
> > +   port = msg_origport(hdr);
> > +
> > +   /* Find the peer port if it already exists in the deferred queue */
> > +   while ((_skb = skb_peek(defq))) {
> > +           _hdr = buf_msg(_skb);
> > +           _node = msg_orignode(_hdr);
> > +           _port = msg_origport(_hdr);
> > +
> > +           if (_node != node)
> > +                   continue;
> > +           if (_port != port)
> > +                   continue;
> > +
> > +           if (!match) {
> > +                   if (msg_is_syn(hdr) &&
> > +                       msg_is_rcast(hdr) != msg_is_rcast(_hdr)) {
> > +                           __skb_dequeue(defq);
> > +                           if (msg_data_sz(hdr)) {
> > +                                   __skb_queue_tail(inputq, skb);
> > +                                   kfree_skb(_skb);
> > +                           } else {
> > +                                   __skb_queue_tail(inputq, _skb);
> > +                                   kfree_skb(skb);
> > +                           }
> > +                           match = true;
> > +                   } else {
> > +                           break;
> > +                   }
> > +           } else {
> > +                   if (msg_is_syn(_hdr))
> > +                           return;
> > +                   /* Dequeued to receive buffer */
> > +                   __skb_dequeue(defq);
> > +                   __skb_queue_tail(inputq, _skb);
> > +           }
> > +   }
> > +
> > +   if (match)
> > +           return;
> > +
> > +   if (msg_is_syn(hdr)) {
> > +           /* Enqueue and defer to next synching */
> > +           __skb_queue_tail(defq, skb);
> > +   } else {
> > +           /* Direct enqueued */
> > +           __skb_queue_tail(inputq, skb);
> > +   }
> > +}
> 
> The function above is hard to follow and does not convince me. What if there
> are messages from many sources, and you find the match in the middle of the
> queue?
> I suggest you break the logic down into smaller tasks, e.g., as follows (the
> code compiles, but is untested):
> 
> void tipc_mcast_filter_msg(struct sk_buff_head *defq,
>                            struct sk_buff_head *inputq) {
>         struct sk_buff *skb, *_skb, *tmp;
>         struct tipc_msg *hdr, *_hdr;
>         bool match = false;
>         u32 node, port;
> 
>         skb = skb_peek(inputq);
>         hdr = buf_msg(skb);
>         if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq)))
>                 return;
> 
>         __skb_dequeue(inputq);
>         node =  msg_orignode(hdr);
>         port = msg_origport(hdr);
> 
>         /* Has the twin SYN message already arrived ? */
>         skb_queue_walk(defq, _skb) {
>                 _hdr = buf_msg(_skb);
>                 if (msg_orignode(_hdr) != node)
>                         continue;
>                 if (msg_origport(_hdr) != port)
>                         continue;
>                 match = true;
>                 break;
>         }
> 
>         if (!match) {
>                 if (msg_is_syn(hdr))
>                         return __skb_queue_tail(defq, skb);
>                 /* Non-SYN msg from a peer with nothing deferred */
>                 return __skb_queue_tail(inputq, skb);
>         }
> 
>         if (!msg_is_syn(_hdr)) {
>                 pr_warn_ratelimited("Non-sync mcast heads deferred queue\n");
>                 __skb_queue_purge(defq);
>                 return __skb_queue_tail(inputq, skb);
>         }
> 
>         /* Non-SYN message from other link can be delivered right away */
>         if (!msg_is_syn(hdr)) {
>                 if (msg_is_rcast(hdr) != msg_is_rcast(_hdr))
>                         return __skb_queue_tail(inputq, skb);
>                 else
>                         return __skb_queue_tail(defq, skb);
>         }
> 
>         /* Matching SYN messages => return the one with data, if any */
>         __skb_unlink(_skb, defq);
>         if (msg_data_sz(hdr)) {
>                 kfree_skb(_skb);
>                 __skb_queue_tail(inputq, skb);
>         } else {
>                 kfree_skb(skb);
>                 __skb_queue_tail(inputq, _skb);
>         }
>         /* Deliver subsequent non-SYN messages from same peer */
>         skb_queue_walk_safe(defq, _skb, tmp) {
>                 _hdr = buf_msg(_skb);
>                 if (msg_orignode(_hdr) != node)
>                         continue;
>                 if (msg_origport(_hdr) != port)
>                         continue;
>                 if (msg_is_syn(_hdr))
>                         break;
>                 __skb_unlink(_skb, defq);
>                 __skb_queue_tail(inputq, _skb);
>         }
> }
> 
> > diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index
> > 751530ab0c49..165d88a503e4 100644
> > --- a/net/tipc/bcast.h
> > +++ b/net/tipc/bcast.h
> > @@ -63,11 +63,13 @@ void tipc_nlist_del(struct tipc_nlist *nl, u32
> > node);
> >  /* Cookie to be used between socket and broadcast layer
> >   * @rcast: replicast (instead of broadcast) was used at previous xmit
> >   * @mandatory: broadcast/replicast indication was set by user
> > + * @deferredq: deferred queue used to keep messages in order
> >   * @expires: re-evaluate non-mandatory transmit method if we are past
> this
> >   */
> 
> [...]
> 
> > @@ -383,6 +383,11 @@ static struct tipc_node *tipc_node_create(struct
> > net *net, u32 addr,
> >                             tipc_link_update_caps(l, capabilities);
> >             }
> >             write_unlock_bh(&n->lock);
> > +           /* Calculate cluster capabilities */
> > +           tn->capabilities = TIPC_NODE_CAPABILITIES;
> > +           list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
> > +                   tn->capabilities &= temp_node->capabilities;
> > +           }
> 
> Yes, you are right here. During a cluster upgrade a node can come back with
> new capabilities, which must also be reflected in the cluster capabilities
> field.
> Actually, I think it would be a good idea to add the cluster capabilities as a
> separate patch. That would make this rather complex patch slightly smaller.
> 
> >             goto exit;
> >     }
> >     n = kzalloc(sizeof(*n), GFP_ATOMIC); @@ -433,6 +438,11 @@ static
> > struct tipc_node *tipc_node_create(struct net *net, u32 addr,
> >                     break;
> 
> [...]
> 
> >
> > @@ -817,6 +819,11 @@ static int tipc_sendmcast(struct  socket *sock,
> > struct tipc_name_seq *seq,
> >                                  &tsk->cong_link_cnt);
> >     }
> >
> > +   /* Broadcast link is now free to choose method for next broadcast */
> > +   if (rc == 0) {
> > +           method->mandatory = false;
> > +           method->expires = jiffies;
> > +   }
> 
> No, we should leave the socket code as is, so we are sure it works with legacy
> nodes.
> We should instead make tipc_bcast_select_xmit_method() slightly smarter:
> 
> static void tipc_bcast_select_xmit_method(struct net *net, int dests,
>                                           struct tipc_mc_method *method) {
>          .......
>         /* Can current method be changed ? */
>         method->expires = jiffies + TIPC_METHOD_EXPIRE;
>         if (method->mandatory)
>                      return;
>         if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) &&
>             time_before(jiffies, exp))
>                 return;
> 
>         /* Determine method to use now */
>         method->rcast = dests <= bb->bc_threshold; }
> 
> I.e., we respect the 'mandatory' setting, because we need that for group
> cast to work correctly, but we override 'method->expires' if the cluster
> capabilities say that all nodes support MCAST_RBCTL.
> Combined with the patch where we add forced BCAST or REPLICAST (make
> sure you apply this patch on top of that one), I think we have achieved a
> pretty smart and adaptive multicast subsystem.
> 
> BR
> ///jon
> 
> 
> >     tipc_nlist_purge(&dsts);
> >
> >     return rc ? rc : dlen;
> > @@ -2157,6 +2164,9 @@ static void tipc_sk_filter_rcv(struct sock *sk,
> > struct sk_buff *skb,
> >     if (unlikely(grp))
> >             tipc_group_filter_msg(grp, &inputq, xmitq);
> >
> > +   if (msg_type(hdr) == TIPC_MCAST_MSG)
> > +           tipc_mcast_filter_msg(&tsk->mc_method.deferredq,
> > &inputq);
> > +
> >     /* Validate and add to receive buffer if there is space */
> >     while ((skb = __skb_dequeue(&inputq))) {
> >             hdr = buf_msg(skb);
> > --
> > 2.17.1



_______________________________________________
tipc-discussion mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/tipc-discussion
