We need some way to ensure a smooth transition from TIPC replicast to
broadcast and vice versa.

Currently, a multicast stream may start out using replicast, because
there are few destinations, and then it should ideally switch to
L2/broadcast IGMP/multicast when the number of destinations grows beyond
a certain limit. The opposite should happen when the number decreases
below the limit.

Currently, to guarantee sequence order, we don’t allow such a switch to
happen unless there is a 5 seconds pause in the traffic from the sender
socket. That means that if a sender never takes such a pause, it will
forever be stuck with the method it started out with.

Signed-off-by: Hoang Le <[email protected]>
---
 net/tipc/bcast.c  | 116 +++++++++++++++++++++++++++++++++++++++++++++-
 net/tipc/bcast.h  |   5 ++
 net/tipc/core.c   |   2 +
 net/tipc/core.h   |   3 ++
 net/tipc/msg.h    |  10 ++++
 net/tipc/node.c   |  10 ++++
 net/tipc/node.h   |   6 ++-
 net/tipc/socket.c |  10 ++++
 8 files changed, 159 insertions(+), 3 deletions(-)

diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index d8026543bf4c..3f9015b1b6bc 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -295,11 +295,15 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head 
*pkts,
                    struct tipc_mc_method *method, struct tipc_nlist *dests,
                    u16 *cong_link_cnt)
 {
-       struct sk_buff_head inputq, localq;
+       struct sk_buff_head inputq, localq, tmpq;
+       bool rcast = method->rcast;
+       struct sk_buff *skb, *_skb;
+       struct tipc_msg *hdr, *_hdr;
        int rc = 0;
 
        skb_queue_head_init(&inputq);
        skb_queue_head_init(&localq);
+       skb_queue_head_init(&tmpq);
 
        /* Clone packets before they are consumed by next call */
        if (dests->local && !tipc_msg_reassemble(pkts, &localq)) {
@@ -309,6 +313,53 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head 
*pkts,
        /* Send according to determined transmit method */
        if (dests->remote) {
                tipc_bcast_select_xmit_method(net, dests->remote, method);
+
+               if (tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) {
+                       skb = skb_peek(pkts);
+                       hdr = buf_msg(skb);
+
+                       if (msg_user(hdr) == MSG_FRAGMENTER)
+                               hdr = msg_get_wrapped(hdr);
+                       if (msg_type(hdr) != TIPC_MCAST_MSG)
+                               goto xmit;
+
+                       msg_set_syn(hdr, 0);
+                       msg_set_is_rcast(hdr, method->rcast);
+
+                       /* switch mode */
+                       if (rcast != method->rcast) {
+                               /* Build a dummy copy of the message */
+                               _skb = tipc_buf_acquire(MCAST_H_SIZE,
+                                                       GFP_KERNEL);
+                               if (!_skb) {
+                                       rc = -ENOMEM;
+                                       goto exit;
+                               }
+                               skb_orphan(_skb);
+                               skb_copy_to_linear_data(_skb, hdr,
+                                                       MCAST_H_SIZE);
+
+                               /* Build dummy header */
+                               _hdr = buf_msg(_skb);
+                               msg_set_size(_hdr, MCAST_H_SIZE);
+                               __skb_queue_tail(&tmpq, _skb);
+
+                               msg_set_syn(hdr, 1);
+                               msg_set_syn(_hdr, 1);
+                               msg_set_is_rcast(hdr, rcast);
+                               /* Prepare for 'synching' */
+                               if (rcast)
+                                       tipc_rcast_xmit(net, &tmpq, dests,
+                                                       cong_link_cnt);
+                               else
+                                       tipc_bcast_xmit(net, &tmpq,
+                                                       cong_link_cnt);
+
+                               /* This queue should normally be empty by now */
+                               __skb_queue_purge(&tmpq);
+                       }
+               }
+xmit:
                if (method->rcast)
                        rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
                else
@@ -576,3 +627,66 @@ void tipc_nlist_purge(struct tipc_nlist *nl)
        nl->remote = 0;
        nl->local = false;
 }
+
+void tipc_mcast_filter_msg(struct sk_buff_head *defq,
+                          struct sk_buff_head *inputq)
+{
+       struct sk_buff *skb, *_skb;
+       struct tipc_msg *hdr, *_hdr;
+       u32 node, port, _node, _port;
+       bool match = false;
+
+       skb = __skb_dequeue(inputq);
+       if (!skb)
+               return;
+
+       hdr = buf_msg(skb);
+       node =  msg_orignode(hdr);
+       port = msg_origport(hdr);
+
+       /* Find a peer port if it exists in the defer queue */
+       while ((_skb = skb_peek(defq))) {
+               _hdr = buf_msg(_skb);
+               _node = msg_orignode(_hdr);
+               _port = msg_origport(_hdr);
+
+               if (_node != node)
+                       break;
+               if (_port != port)
+                       break;
+
+               if (!match) {
+                       if (msg_is_syn(hdr) &&
+                           msg_is_rcast(hdr) != msg_is_rcast(_hdr)) {
+                               __skb_dequeue(defq);
+                               if (msg_data_sz(hdr)) {
+                                       __skb_queue_tail(inputq, skb);
+                                       kfree_skb(_skb);
+                               } else {
+                                       __skb_queue_tail(inputq, _skb);
+                                       kfree_skb(skb);
+                               }
+                               match = true;
+                       } else {
+                               break;
+                       }
+               } else {
+                       if (msg_is_syn(_hdr))
+                               return;
+                       /* Dequeued to receive buffer */
+                       __skb_dequeue(defq);
+                       __skb_queue_tail(inputq, _skb);
+               }
+       }
+
+       if (match)
+               return;
+
+       if (msg_is_syn(hdr)) {
+               /* Enqueue and defer to next synching */
+               __skb_queue_tail(defq, skb);
+       } else {
+               /* Direct enqueued */
+               __skb_queue_tail(inputq, skb);
+       }
+}
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 751530ab0c49..165d88a503e4 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -63,11 +63,13 @@ void tipc_nlist_del(struct tipc_nlist *nl, u32 node);
 /* Cookie to be used between socket and broadcast layer
  * @rcast: replicast (instead of broadcast) was used at previous xmit
  * @mandatory: broadcast/replicast indication was set by user
+ * @deferredq: defer queue to make message in order
  * @expires: re-evaluate non-mandatory transmit method if we are past this
  */
 struct tipc_mc_method {
        bool rcast;
        bool mandatory;
+       struct sk_buff_head deferredq;
        unsigned long expires;
 };
 
@@ -92,6 +94,9 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg 
*msg);
 int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
 int tipc_bclink_reset_stats(struct net *net);
 
+void tipc_mcast_filter_msg(struct sk_buff_head *defq,
+                          struct sk_buff_head *inputq);
+
 static inline void tipc_bcast_lock(struct net *net)
 {
        spin_lock_bh(&tipc_net(net)->bclock);
diff --git a/net/tipc/core.c b/net/tipc/core.c
index 5b38f5164281..27cccd101ef6 100644
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -43,6 +43,7 @@
 #include "net.h"
 #include "socket.h"
 #include "bcast.h"
+#include "node.h"
 
 #include <linux/module.h>
 
@@ -59,6 +60,7 @@ static int __net_init tipc_init_net(struct net *net)
        tn->node_addr = 0;
        tn->trial_addr = 0;
        tn->addr_trial_end = 0;
+       tn->capabilities = TIPC_NODE_CAPABILITIES;
        memset(tn->node_id, 0, sizeof(tn->node_id));
        memset(tn->node_id_string, 0, sizeof(tn->node_id_string));
        tn->mon_threshold = TIPC_DEF_MON_THRESHOLD;
diff --git a/net/tipc/core.h b/net/tipc/core.h
index 8020a6c360ff..7a68e1b6a066 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -122,6 +122,9 @@ struct tipc_net {
        /* Topology subscription server */
        struct tipc_topsrv *topsrv;
        atomic_t subscription_count;
+
+       /* Cluster capabilities */
+       u16 capabilities;
 };
 
 static inline struct tipc_net *tipc_net(struct net *net)
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index a0924956bb61..70ddff2206a0 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -257,6 +257,16 @@ static inline void msg_set_src_droppable(struct tipc_msg 
*m, u32 d)
        msg_set_bits(m, 0, 18, 1, d);
 }
 
+static inline bool msg_is_rcast(struct tipc_msg *m)
+{
+       return msg_bits(m, 0, 18, 0x1);
+}
+
+static inline void msg_set_is_rcast(struct tipc_msg *m, bool d)
+{
+       msg_set_bits(m, 0, 18, 0x1, d);
+}
+
 static inline void msg_set_size(struct tipc_msg *m, u32 sz)
 {
        m->hdr[0] = htonl((msg_word(m, 0) & ~0x1ffff) | sz);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index db2a6c3e0be9..1386e44d965c 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -383,6 +383,11 @@ static struct tipc_node *tipc_node_create(struct net *net, 
u32 addr,
                                tipc_link_update_caps(l, capabilities);
                }
                write_unlock_bh(&n->lock);
+               /* Calculate cluster capabilities */
+               tn->capabilities = TIPC_NODE_CAPABILITIES;
+               list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
+                       tn->capabilities &= temp_node->capabilities;
+               }
                goto exit;
        }
        n = kzalloc(sizeof(*n), GFP_ATOMIC);
@@ -433,6 +438,11 @@ static struct tipc_node *tipc_node_create(struct net *net, 
u32 addr,
                        break;
        }
        list_add_tail_rcu(&n->list, &temp_node->list);
+       /* Calculate cluster capabilities */
+       tn->capabilities = TIPC_NODE_CAPABILITIES;
+       list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
+               tn->capabilities &= temp_node->capabilities;
+       }
        trace_tipc_node_create(n, true, " ");
 exit:
        spin_unlock_bh(&tn->node_list_lock);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 4f59a30e989a..2404225c5d58 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -51,7 +51,8 @@ enum {
        TIPC_BLOCK_FLOWCTL    = (1 << 3),
        TIPC_BCAST_RCAST      = (1 << 4),
        TIPC_NODE_ID128       = (1 << 5),
-       TIPC_LINK_PROTO_SEQNO = (1 << 6)
+       TIPC_LINK_PROTO_SEQNO = (1 << 6),
+       TIPC_MCAST_RBCTL      = (1 << 7)
 };
 
 #define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT           |  \
@@ -60,7 +61,8 @@ enum {
                                TIPC_BCAST_RCAST       |   \
                                TIPC_BLOCK_FLOWCTL     |   \
                                TIPC_NODE_ID128        |   \
-                               TIPC_LINK_PROTO_SEQNO)
+                               TIPC_LINK_PROTO_SEQNO  |   \
+                               TIPC_MCAST_RBCTL)
 #define INVALID_BEARER_ID -1
 
 void tipc_node_stop(struct net *net);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 1217c90a363b..f8d8fa7216b9 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -483,6 +483,7 @@ static int tipc_sk_create(struct net *net, struct socket 
*sock,
                tsk_set_unreturnable(tsk, true);
                if (sock->type == SOCK_DGRAM)
                        tsk_set_unreliable(tsk, true);
+               __skb_queue_head_init(&tsk->mc_method.deferredq);
        }
 
        trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " ");
@@ -580,6 +581,7 @@ static int tipc_release(struct socket *sock)
        sk->sk_shutdown = SHUTDOWN_MASK;
        tipc_sk_leave(tsk);
        tipc_sk_withdraw(tsk, 0, NULL);
+       __skb_queue_purge(&tsk->mc_method.deferredq);
        sk_stop_timer(sk, &sk->sk_timer);
        tipc_sk_remove(tsk);
 
@@ -817,6 +819,11 @@ static int tipc_sendmcast(struct  socket *sock, struct 
tipc_name_seq *seq,
                                     &tsk->cong_link_cnt);
        }
 
+       /* Update broadcast sequence number */
+       if (rc == 0) {
+               method->mandatory = false;
+               method->expires = jiffies;
+       }
        tipc_nlist_purge(&dsts);
 
        return rc ? rc : dlen;
@@ -2157,6 +2164,9 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct 
sk_buff *skb,
        if (unlikely(grp))
                tipc_group_filter_msg(grp, &inputq, xmitq);
 
+       if (msg_type(hdr) == TIPC_MCAST_MSG)
+               tipc_mcast_filter_msg(&tsk->mc_method.deferredq, &inputq);
+
        /* Validate and add to receive buffer if there is space */
        while ((skb = __skb_dequeue(&inputq))) {
                hdr = buf_msg(skb);
-- 
2.17.1



_______________________________________________
tipc-discussion mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/tipc-discussion

Reply via email to