This also looks very good now; I only have some comments on the log text.
Just make sure you test this with the groupcast test program before you post it.

Acked-by: Jon


> -----Original Message-----
> From: Hoang Le <[email protected]>
> Sent: 18-Feb-19 05:08
> To: [email protected]; Jon Maloy
> <[email protected]>; [email protected]; [email protected]
> Subject: [net-next v3 3/3] tipc: smooth change between replicast and
> broadcast
> 
> Currently, a multicast stream may start out using replicast, because there are
> few destinations, and then it should ideally switch to L2/broadcast
> IGMP/multicast when the number of destinations grows beyond a certain
> limit. The opposite should happen when the number decreases below the
> limit.
> 
> To eliminate the risk of message reordering caused by method change, a
> sending socket must stick to a previously selected method until it enters an
> idle period of 5 seconds. This means there must be a 5-second pause in the
> traffic from the sender socket.

If the sender never makes such a pause, the method will never change, and 
transmission may become very inefficient as the cluster grows.

> 
> With this commit, we allow such a switch between replicast and broadcast
> without 5 seconds pause in the traffic.

without any need for a traffic pause.

> 
> The solution is to send a dummy message with only the header, also with the SYN
> bit set, via broadcast or replicast. The data message also has the SYN bit set
> and is sent via replicast or broadcast (the inverse of the method used for the
> dummy).
> 
> Then, at receiving side any messages follow first SYN bit message (data or
> dummy message), they will be hold

s/hold/held

 in deferred queue until another pair

> (dummy or data message) arrives on the other link.

///jon

> 
> Signed-off-by: Hoang Le <[email protected]>
> ---
>  net/tipc/bcast.c  | 165
> +++++++++++++++++++++++++++++++++++++++++++++-
>  net/tipc/bcast.h  |   5 ++
>  net/tipc/msg.h    |  10 +++
>  net/tipc/socket.c |   5 ++
>  4 files changed, 184 insertions(+), 1 deletion(-)
> 
> diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index
> 12b59268bdd6..d806e3714280 100644
> --- a/net/tipc/bcast.c
> +++ b/net/tipc/bcast.c
> @@ -220,9 +220,24 @@ static void tipc_bcast_select_xmit_method(struct
> net *net, int dests,
>       }
>       /* Can current method be changed ? */
>       method->expires = jiffies + TIPC_METHOD_EXPIRE;
> -     if (method->mandatory || time_before(jiffies, exp))
> +     if (method->mandatory)
>               return;
> 
> +     if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) &&
> +         time_before(jiffies, exp))
> +             return;
> +
> +     /* Configuration as force 'broadcast' method */
> +     if (bb->force_bcast) {
> +             method->rcast = false;
> +             return;
> +     }
> +     /* Configuration as force 'replicast' method */
> +     if (bb->force_rcast) {
> +             method->rcast = true;
> +             return;
> +     }
> +     /* Configuration as 'selectable' or default method */
>       /* Determine method to use now */
>       method->rcast = dests <= bb->bc_threshold;  } @@ -285,6 +300,63
> @@ static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
>       return 0;
>  }
> 
> +/* tipc_mcast_send_sync - deliver a dummy message with SYN bit
> + * @net: the applicable net namespace
> + * @skb: socket buffer to copy
> + * @method: send method to be used
> + * @dests: destination nodes for message.
> + * @cong_link_cnt: returns number of encountered congested destination
> +links
> + * Returns 0 if success, otherwise errno  */ static int
> +tipc_mcast_send_sync(struct net *net, struct sk_buff *skb,
> +                             struct tipc_mc_method *method,
> +                             struct tipc_nlist *dests,
> +                             u16 *cong_link_cnt)
> +{
> +     struct sk_buff_head tmpq;
> +     struct sk_buff *_skb;
> +     struct tipc_msg *hdr, *_hdr;
> +
> +     /* Is a cluster supporting with new capabilities ? */
> +     if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL))
> +             return 0;
> +
> +     hdr = buf_msg(skb);
> +     if (msg_user(hdr) == MSG_FRAGMENTER)
> +             hdr = msg_get_wrapped(hdr);
> +     if (msg_type(hdr) != TIPC_MCAST_MSG)
> +             return 0;
> +
> +     /* Allocate dummy message */
> +     _skb = tipc_buf_acquire(MCAST_H_SIZE, GFP_KERNEL);
> +     if (!skb)
> +             return -ENOMEM;
> +
> +     /* Preparing for 'synching' header */
> +     msg_set_syn(hdr, 1);
> +
> +     /* Copy skb's header into a dummy header */
> +     skb_copy_to_linear_data(_skb, hdr, MCAST_H_SIZE);
> +     skb_orphan(_skb);
> +
> +     /* Reverse method for dummy message */
> +     _hdr = buf_msg(_skb);
> +     msg_set_size(_hdr, MCAST_H_SIZE);
> +     msg_set_is_rcast(_hdr, !msg_is_rcast(hdr));
> +
> +     skb_queue_head_init(&tmpq);
> +     __skb_queue_tail(&tmpq, _skb);
> +     if (method->rcast)
> +             tipc_bcast_xmit(net, &tmpq, cong_link_cnt);
> +     else
> +             tipc_rcast_xmit(net, &tmpq, dests, cong_link_cnt);
> +
> +     /* This queue should normally be empty by now */
> +     __skb_queue_purge(&tmpq);
> +
> +     return 0;
> +}
> +
>  /* tipc_mcast_xmit - deliver message to indicated destination nodes
>   *                   and to identified node local sockets
>   * @net: the applicable net namespace
> @@ -300,6 +372,9 @@ int tipc_mcast_xmit(struct net *net, struct
> sk_buff_head *pkts,
>                   u16 *cong_link_cnt)
>  {
>       struct sk_buff_head inputq, localq;
> +     struct sk_buff *skb;
> +     struct tipc_msg *hdr;
> +     bool rcast = method->rcast;
>       int rc = 0;
> 
>       skb_queue_head_init(&inputq);
> @@ -313,6 +388,18 @@ int tipc_mcast_xmit(struct net *net, struct
> sk_buff_head *pkts,
>       /* Send according to determined transmit method */
>       if (dests->remote) {
>               tipc_bcast_select_xmit_method(net, dests->remote,
> method);
> +
> +             skb = skb_peek(pkts);
> +             hdr = buf_msg(skb);
> +             if (msg_user(hdr) == MSG_FRAGMENTER)
> +                     hdr = msg_get_wrapped(hdr);
> +             msg_set_is_rcast(hdr, method->rcast);
> +
> +             /* Switch method ? */
> +             if (rcast != method->rcast)
> +                     tipc_mcast_send_sync(net, skb, method,
> +                                          dests, cong_link_cnt);
> +
>               if (method->rcast)
>                       rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
>               else
> @@ -672,3 +759,79 @@ u32 tipc_bcast_get_broadcast_ratio(struct net *net)
> 
>       return bb->rc_ratio;
>  }
> +
> +void tipc_mcast_filter_msg(struct sk_buff_head *defq,
> +                        struct sk_buff_head *inputq)
> +{
> +     struct sk_buff *skb, *_skb, *tmp;
> +     struct tipc_msg *hdr, *_hdr;
> +     bool match = false;
> +     u32 node, port;
> +
> +     skb = skb_peek(inputq);
> +     hdr = buf_msg(skb);
> +
> +     if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq)))
> +             return;
> +
> +     node = msg_orignode(hdr);
> +     port = msg_origport(hdr);
> +
> +     /* Has the twin SYN message already arrived ? */
> +     skb_queue_walk(defq, _skb) {
> +             _hdr = buf_msg(_skb);
> +             if (msg_orignode(_hdr) != node)
> +                     continue;
> +             if (msg_origport(_hdr) != port)
> +                     continue;
> +             match = true;
> +             break;
> +     }
> +
> +     if (!match) {
> +             if (!msg_is_syn(hdr))
> +                     return;
> +             __skb_dequeue(inputq);
> +             __skb_queue_tail(defq, skb);
> +             return;
> +     }
> +
> +     /* Deliver non-SYN message from other link, otherwise queue it */
> +     if (!msg_is_syn(hdr)) {
> +             if (msg_is_rcast(hdr) != msg_is_rcast(_hdr))
> +                     return;
> +             __skb_dequeue(inputq);
> +             __skb_queue_tail(defq, skb);
> +             return;
> +     }
> +
> +     /* Queue non-SYN/SYN message from same link */
> +     if (msg_is_rcast(hdr) == msg_is_rcast(_hdr)) {
> +             __skb_dequeue(inputq);
> +             __skb_queue_tail(defq, skb);
> +             return;
> +     }
> +
> +     /* Matching SYN messages => return the one with data, if any */
> +     __skb_unlink(_skb, defq);
> +     if (msg_data_sz(hdr)) {
> +             kfree_skb(_skb);
> +     } else {
> +             __skb_dequeue(inputq);
> +             kfree_skb(skb);
> +             __skb_queue_tail(inputq, _skb);
> +     }
> +
> +     /* Deliver subsequent non-SYN messages from same peer */
> +     skb_queue_walk_safe(defq, _skb, tmp) {
> +             _hdr = buf_msg(_skb);
> +             if (msg_orignode(_hdr) != node)
> +                     continue;
> +             if (msg_origport(_hdr) != port)
> +                     continue;
> +             if (msg_is_syn(_hdr))
> +                     break;
> +             __skb_unlink(_skb, defq);
> +             __skb_queue_tail(inputq, _skb);
> +     }
> +}
> diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index
> 37c55e7347a5..484bde289d3a 100644
> --- a/net/tipc/bcast.h
> +++ b/net/tipc/bcast.h
> @@ -67,11 +67,13 @@ void tipc_nlist_del(struct tipc_nlist *nl, u32 node);
>  /* Cookie to be used between socket and broadcast layer
>   * @rcast: replicast (instead of broadcast) was used at previous xmit
>   * @mandatory: broadcast/replicast indication was set by user
> + * @deferredq: defer queue to make message in order
>   * @expires: re-evaluate non-mandatory transmit method if we are past this
>   */
>  struct tipc_mc_method {
>       bool rcast;
>       bool mandatory;
> +     struct sk_buff_head deferredq;
>       unsigned long expires;
>  };
> 
> @@ -99,6 +101,9 @@ int tipc_bclink_reset_stats(struct net *net);
>  u32 tipc_bcast_get_broadcast_mode(struct net *net);
>  u32 tipc_bcast_get_broadcast_ratio(struct net *net);
> 
> +void tipc_mcast_filter_msg(struct sk_buff_head *defq,
> +                        struct sk_buff_head *inputq);
> +
>  static inline void tipc_bcast_lock(struct net *net)  {
>       spin_lock_bh(&tipc_net(net)->bclock);
> diff --git a/net/tipc/msg.h b/net/tipc/msg.h index
> d7e4b8b93f9d..528ba9241acc 100644
> --- a/net/tipc/msg.h
> +++ b/net/tipc/msg.h
> @@ -257,6 +257,16 @@ static inline void msg_set_src_droppable(struct
> tipc_msg *m, u32 d)
>       msg_set_bits(m, 0, 18, 1, d);
>  }
> 
> +static inline bool msg_is_rcast(struct tipc_msg *m) {
> +     return msg_bits(m, 0, 18, 0x1);
> +}
> +
> +static inline void msg_set_is_rcast(struct tipc_msg *m, bool d) {
> +     msg_set_bits(m, 0, 18, 0x1, d);
> +}
> +
>  static inline void msg_set_size(struct tipc_msg *m, u32 sz)  {
>       m->hdr[0] = htonl((msg_word(m, 0) & ~0x1ffff) | sz); diff --git
> a/net/tipc/socket.c b/net/tipc/socket.c index 8fc5acd4820d..de83eb1e718e
> 100644
> --- a/net/tipc/socket.c
> +++ b/net/tipc/socket.c
> @@ -483,6 +483,7 @@ static int tipc_sk_create(struct net *net, struct socket
> *sock,
>               tsk_set_unreturnable(tsk, true);
>               if (sock->type == SOCK_DGRAM)
>                       tsk_set_unreliable(tsk, true);
> +             __skb_queue_head_init(&tsk->mc_method.deferredq);
>       }
> 
>       trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " "); @@ -580,6
> +581,7 @@ static int tipc_release(struct socket *sock)
>       sk->sk_shutdown = SHUTDOWN_MASK;
>       tipc_sk_leave(tsk);
>       tipc_sk_withdraw(tsk, 0, NULL);
> +     __skb_queue_purge(&tsk->mc_method.deferredq);
>       sk_stop_timer(sk, &sk->sk_timer);
>       tipc_sk_remove(tsk);
> 
> @@ -2157,6 +2159,9 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct
> sk_buff *skb,
>       if (unlikely(grp))
>               tipc_group_filter_msg(grp, &inputq, xmitq);
> 
> +     if (msg_type(hdr) == TIPC_MCAST_MSG)
> +             tipc_mcast_filter_msg(&tsk->mc_method.deferredq,
> &inputq);
> +
>       /* Validate and add to receive buffer if there is space */
>       while ((skb = __skb_dequeue(&inputq))) {
>               hdr = buf_msg(skb);
> --
> 2.17.1



_______________________________________________
tipc-discussion mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/tipc-discussion

Reply via email to