We need some way to ensure a smooth change from TIPC replicast to broadcast and vice versa.
Currently, a multicast stream may start out using replicast, because there are few destinations, and then it should ideally switch to L2 broadcast/IGMP multicast when the number of destinations grows beyond a certain limit. The opposite should happen when the number decreases below the limit.

To guarantee sequence order, we currently do not allow such a switch to happen unless there is a 5 second pause in the traffic from the sender socket. This means that a sender which never takes such a pause is forever stuck with the method it started out with.

We now let the switch happen immediately: when the method changes, the sender emits a small SYN-tagged dummy message via the old method, while the first message sent via the new method carries a matching SYN tag, so that the receiver can defer delivery until both have arrived and thereby preserve the original sequence.

Signed-off-by: Hoang Le <[email protected]>
---
Note for reviewers: two illustrative userspace sketches of the sender-
and receiver-side logic are appended after the patch.

 net/tipc/bcast.c  | 116 +++++++++++++++++++++++++++++++++++++++++++++-
 net/tipc/bcast.h  |   5 ++
 net/tipc/core.c   |   2 +
 net/tipc/core.h   |   3 ++
 net/tipc/msg.h    |  10 ++++
 net/tipc/node.c   |  10 ++++
 net/tipc/node.h   |   6 ++-
 net/tipc/socket.c |  10 ++++
 8 files changed, 159 insertions(+), 3 deletions(-)

diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index d8026543bf4c..3f9015b1b6bc 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -295,11 +295,15 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
 		    struct tipc_mc_method *method, struct tipc_nlist *dests,
 		    u16 *cong_link_cnt)
 {
-	struct sk_buff_head inputq, localq;
+	struct sk_buff_head inputq, localq, tmpq;
+	bool rcast = method->rcast;
+	struct sk_buff *skb, *_skb;
+	struct tipc_msg *hdr, *_hdr;
 	int rc = 0;
 
 	skb_queue_head_init(&inputq);
 	skb_queue_head_init(&localq);
+	skb_queue_head_init(&tmpq);
 
 	/* Clone packets before they are consumed by next call */
 	if (dests->local && !tipc_msg_reassemble(pkts, &localq)) {
@@ -309,6 +313,53 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
 	/* Send according to determined transmit method */
 	if (dests->remote) {
 		tipc_bcast_select_xmit_method(net, dests->remote, method);
+
+		if (tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) {
+			skb = skb_peek(pkts);
+			hdr = buf_msg(skb);
+
+			if (msg_user(hdr) == MSG_FRAGMENTER)
+				hdr = msg_get_wrapped(hdr);
+			if (msg_type(hdr) != TIPC_MCAST_MSG)
+				goto xmit;
+
+			msg_set_syn(hdr, 0);
+			msg_set_is_rcast(hdr, method->rcast);
+
+			/* switch mode */
+			if (rcast != method->rcast) {
+				/* Build a copy of the message header */
+				_skb = tipc_buf_acquire(MCAST_H_SIZE,
+							GFP_KERNEL);
+				if (!_skb) {
+					rc = -ENOMEM;
+					goto exit;
+				}
+				skb_orphan(_skb);
+				skb_copy_to_linear_data(_skb, hdr,
+							MCAST_H_SIZE);
+
+				/* Build dummy header */
+				_hdr = buf_msg(_skb);
+				msg_set_size(_hdr, MCAST_H_SIZE);
+				__skb_queue_tail(&tmpq, _skb);
+
+				msg_set_syn(hdr, 1);
+				msg_set_syn(_hdr, 1);
+				msg_set_is_rcast(hdr, rcast);
+				/* Send dummy via old method for 'synching' */
+				if (rcast)
+					tipc_rcast_xmit(net, &tmpq, dests,
+							cong_link_cnt);
+				else
+					tipc_bcast_xmit(net, &tmpq,
+							cong_link_cnt);
+
+				/* This queue should normally be empty by now */
+				__skb_queue_purge(&tmpq);
+			}
+		}
+xmit:
 		if (method->rcast)
 			rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
 		else
@@ -576,3 +627,66 @@ void tipc_nlist_purge(struct tipc_nlist *nl)
 	nl->remote = 0;
 	nl->local = false;
 }
+
+void tipc_mcast_filter_msg(struct sk_buff_head *defq,
+			   struct sk_buff_head *inputq)
+{
+	struct sk_buff *skb, *_skb;
+	struct tipc_msg *hdr, *_hdr;
+	u32 node, port, _node, _port;
+	bool match = false;
+
+	skb = __skb_dequeue(inputq);
+	if (!skb)
+		return;
+
+	hdr = buf_msg(skb);
+	node = msg_orignode(hdr);
+	port = msg_origport(hdr);
+
+	/* Match the peer message at the head of the deferred queue */
+	while ((_skb = skb_peek(defq))) {
+		_hdr = buf_msg(_skb);
+		_node = msg_orignode(_hdr);
+		_port = msg_origport(_hdr);
+
+		if (_node != node)
+			break;
+		if (_port != port)
+			break;
+
+		if (!match) {
+			if (msg_is_syn(hdr) &&
+			    msg_is_rcast(hdr) != msg_is_rcast(_hdr)) {
+				__skb_dequeue(defq);
+				if (msg_data_sz(hdr)) {
+					__skb_queue_tail(inputq, skb);
+					kfree_skb(_skb);
+				} else {
+					__skb_queue_tail(inputq, _skb);
+					kfree_skb(skb);
+				}
+				match = true;
+			} else {
+				break;
+			}
+		} else {
+			if (msg_is_syn(_hdr))
+				return;
+			/* Move deferred message to the receive buffer */
+			__skb_dequeue(defq);
+			__skb_queue_tail(inputq, _skb);
+		}
+	}
+
+	if (match)
+		return;
+
+	if (msg_is_syn(hdr)) {
+		/* Enqueue and defer until the next 'synching' point */
+		__skb_queue_tail(defq, skb);
+	} else {
+		/* Deliver directly */
+		__skb_queue_tail(inputq, skb);
+	}
+}
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 751530ab0c49..165d88a503e4 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -63,11 +63,13 @@ void tipc_nlist_del(struct tipc_nlist *nl, u32 node);
 
 /* Cookie to be used between socket and broadcast layer
  * @rcast: replicast (instead of broadcast) was used at previous xmit
  * @mandatory: broadcast/replicast indication was set by user
+ * @deferredq: defer queue used to keep messages in sequence
  * @expires: re-evaluate non-mandatory transmit method if we are past this
  */
 struct tipc_mc_method {
 	bool rcast;
 	bool mandatory;
+	struct sk_buff_head deferredq;
 	unsigned long expires;
 };
@@ -92,6 +94,9 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
 int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
 int tipc_bclink_reset_stats(struct net *net);
 
+void tipc_mcast_filter_msg(struct sk_buff_head *defq,
+			   struct sk_buff_head *inputq);
+
 static inline void tipc_bcast_lock(struct net *net)
 {
 	spin_lock_bh(&tipc_net(net)->bclock);
diff --git a/net/tipc/core.c b/net/tipc/core.c
index 5b38f5164281..27cccd101ef6 100644
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -43,6 +43,7 @@
 #include "net.h"
 #include "socket.h"
 #include "bcast.h"
+#include "node.h"
 
 #include <linux/module.h>
 
@@ -59,6 +60,7 @@ static int __net_init tipc_init_net(struct net *net)
 	tn->node_addr = 0;
 	tn->trial_addr = 0;
 	tn->addr_trial_end = 0;
+	tn->capabilities = TIPC_NODE_CAPABILITIES;
 	memset(tn->node_id, 0, sizeof(tn->node_id));
 	memset(tn->node_id_string, 0, sizeof(tn->node_id_string));
 	tn->mon_threshold = TIPC_DEF_MON_THRESHOLD;
diff --git a/net/tipc/core.h b/net/tipc/core.h
index 8020a6c360ff..7a68e1b6a066 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -122,6 +122,9 @@ struct tipc_net {
 	/* Topology subscription server */
 	struct tipc_topsrv *topsrv;
 	atomic_t subscription_count;
+
+	/* Cluster capabilities */
+	u16 capabilities;
 };
 
 static inline struct tipc_net *tipc_net(struct net *net)
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index a0924956bb61..70ddff2206a0 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -257,6 +257,16 @@ static inline void msg_set_src_droppable(struct tipc_msg *m, u32 d)
 	msg_set_bits(m, 0, 18, 1, d);
 }
 
+static inline bool msg_is_rcast(struct tipc_msg *m)
+{
+	return msg_bits(m, 0, 18, 0x1);
+}
+
+static inline void msg_set_is_rcast(struct tipc_msg *m, bool d)
+{
+	msg_set_bits(m, 0, 18, 0x1, d);
+}
+
 static inline void msg_set_size(struct tipc_msg *m, u32 sz)
 {
 	m->hdr[0] = htonl((msg_word(m, 0) & ~0x1ffff) | sz);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index db2a6c3e0be9..1386e44d965c 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -383,6 +383,11 @@ static struct tipc_node *tipc_node_create(struct net *net, u32 addr,
 			tipc_link_update_caps(l, capabilities);
 		}
 		write_unlock_bh(&n->lock);
+		/* Recalculate cluster capabilities */
+		tn->capabilities = TIPC_NODE_CAPABILITIES;
+		list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
+			tn->capabilities &= temp_node->capabilities;
+		}
 		goto exit;
 	}
 	n = kzalloc(sizeof(*n), GFP_ATOMIC);
@@ -433,6 +438,11 @@ static struct tipc_node *tipc_node_create(struct net *net, u32 addr,
 		break;
 	}
 	list_add_tail_rcu(&n->list, &temp_node->list);
+	/* Recalculate cluster capabilities */
+	tn->capabilities = TIPC_NODE_CAPABILITIES;
+	list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
+		tn->capabilities &= temp_node->capabilities;
+	}
 	trace_tipc_node_create(n, true, " ");
 exit:
 	spin_unlock_bh(&tn->node_list_lock);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 4f59a30e989a..2404225c5d58 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -51,7 +51,8 @@ enum {
 	TIPC_BLOCK_FLOWCTL	= (1 << 3),
 	TIPC_BCAST_RCAST	= (1 << 4),
 	TIPC_NODE_ID128		= (1 << 5),
-	TIPC_LINK_PROTO_SEQNO	= (1 << 6)
+	TIPC_LINK_PROTO_SEQNO	= (1 << 6),
+	TIPC_MCAST_RBCTL	= (1 << 7)
 };
 
 #define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT       |  \
@@ -60,7 +61,8 @@ enum {
 				TIPC_BCAST_RCAST   |  \
 				TIPC_BLOCK_FLOWCTL |  \
 				TIPC_NODE_ID128    |  \
-				TIPC_LINK_PROTO_SEQNO)
+				TIPC_LINK_PROTO_SEQNO | \
+				TIPC_MCAST_RBCTL)
 
 #define INVALID_BEARER_ID -1
 void tipc_node_stop(struct net *net);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 1217c90a363b..f8d8fa7216b9 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -483,6 +483,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
 		tsk_set_unreturnable(tsk, true);
 		if (sock->type == SOCK_DGRAM)
 			tsk_set_unreliable(tsk, true);
+		__skb_queue_head_init(&tsk->mc_method.deferredq);
 	}
 
 	trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " ");
@@ -580,6 +581,7 @@ static int tipc_release(struct socket *sock)
 	sk->sk_shutdown = SHUTDOWN_MASK;
 	tipc_sk_leave(tsk);
 	tipc_sk_withdraw(tsk, 0, NULL);
+	__skb_queue_purge(&tsk->mc_method.deferredq);
 	sk_stop_timer(sk, &sk->sk_timer);
 	tipc_sk_remove(tsk);
 
@@ -817,6 +819,11 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
 				     &tsk->cong_link_cnt);
 	}
 
+	/* Let the next xmit re-evaluate the transmit method */
+	if (rc == 0) {
+		method->mandatory = false;
+		method->expires = jiffies;
+	}
 	tipc_nlist_purge(&dsts);
 
 	return rc ? rc : dlen;
@@ -2157,6 +2164,9 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
 	if (unlikely(grp))
 		tipc_group_filter_msg(grp, &inputq, xmitq);
 
+	if (msg_type(hdr) == TIPC_MCAST_MSG)
+		tipc_mcast_filter_msg(&tsk->mc_method.deferredq, &inputq);
+
 	/* Validate and add to receive buffer if there is space */
 	while ((skb = __skb_dequeue(&inputq))) {
 		hdr = buf_msg(skb);
-- 
2.17.1


_______________________________________________
tipc-discussion mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/tipc-discussion
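As promised above, here is a minimal userspace sketch of the sender-side
rule, for anyone who wants to experiment with it outside the kernel. This
is not TIPC code: struct msg, send_rcast() and send_bcast() below are
hypothetical stand-ins, and only the tagging logic mirrors the block added
to tipc_mcast_xmit().

#include <stdbool.h>
#include <stdio.h>

struct msg {
	bool syn;      /* first message after a method change */
	bool is_rcast; /* tells the receiver which twin to expect */
	int len;       /* payload length; 0 for the dummy header */
};

static void send_rcast(struct msg m)
{
	printf("rcast path: syn=%d is_rcast=%d len=%d\n",
	       m.syn, m.is_rcast, m.len);
}

static void send_bcast(struct msg m)
{
	printf("bcast path: syn=%d is_rcast=%d len=%d\n",
	       m.syn, m.is_rcast, m.len);
}

/* On a method change, a dummy SYN goes out via the old method while
 * the payload, sent via the new method, is SYN-tagged with the old
 * method's identity.
 */
static void mcast_xmit(bool old_rcast, bool new_rcast, int len)
{
	struct msg payload = { .syn = false, .is_rcast = new_rcast, .len = len };

	if (old_rcast != new_rcast) {
		struct msg dummy = { .syn = true, .is_rcast = new_rcast, .len = 0 };

		payload.syn = true;
		payload.is_rcast = old_rcast;
		if (old_rcast)
			send_rcast(dummy); /* dummy travels the old path */
		else
			send_bcast(dummy);
	}
	if (new_rcast)
		send_rcast(payload); /* payload travels the new path */
	else
		send_bcast(payload);
}

int main(void)
{
	mcast_xmit(true, true, 100);  /* steady state on replicast */
	mcast_xmit(true, false, 100); /* switch: dummy on rcast, payload on bcast */
	return 0;
}

Running it shows the steady-state message going out untagged on the
replicast path, then, at the switch point, a dummy SYN on the replicast
path followed by the SYN-tagged payload on the broadcast path.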

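And the matching receiver side: a sketch of the pairing rule that
tipc_mcast_filter_msg() implements. Also purely illustrative: a fixed-size
array stands in for the sk_buff defer queue, a single sending peer is
assumed (the kernel code also compares origin node and port), and the
deferral condition is simplified to "hold everything behind a pending SYN".

#include <stdbool.h>
#include <stdio.h>

struct msg {
	bool syn, is_rcast;
	int len;        /* 0 => dummy header only */
	char data[16];
};

static struct msg defq[8]; /* stand-in for mc_method.deferredq */
static int defq_len;

static void deliver(const struct msg *m)
{
	printf("deliver: \"%s\" (is_rcast=%d)\n", m->data, m->is_rcast);
}

static void filter_rcv(struct msg m)
{
	/* Twin SYNs carry opposite is_rcast flags; pairing them marks
	 * the point where both paths have caught up.
	 */
	if (defq_len && defq[0].syn && m.syn &&
	    m.is_rcast != defq[0].is_rcast) {
		deliver(m.len ? &m : &defq[0]); /* drop the empty dummy */
		for (int i = 1; i < defq_len; i++)
			deliver(&defq[i]); /* release held-back traffic */
		defq_len = 0;
	} else if (m.syn || defq_len) {
		defq[defq_len++] = m; /* wait for the twin SYN */
	} else {
		deliver(&m); /* steady state: deliver directly */
	}
}

int main(void)
{
	struct msg payload = { .syn = true, .is_rcast = true,
			       .len = 5, .data = "hello" };
	struct msg dummy = { .syn = true, .is_rcast = false, .len = 0 };

	filter_rcv(payload); /* SYN via the new path arrives first: deferred */
	filter_rcv(dummy);   /* twin SYN via the old path: "hello" delivered */
	return 0;
}

Whichever of the twin SYNs arrives second releases the deferred traffic;
the empty dummy is dropped and the payload-carrying twin is delivered.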