TIPC multicast messages are currently carried over a reliable
'broadcast link', making use of the underlying media's ability to
transport packets as L2 broadcast or IP multicast to all nodes in
the cluster.

When the bearer in use lacks that ability, we can instead emulate
the broadcast service by replicating the packets and sending a copy
over as many unicast links as needed to reach all identified
destinations. We now introduce a new TIPC link-level 'replicast'
service that does this.

Signed-off-by: Jon Maloy <jon.ma...@ericsson.com>
---
 net/tipc/bcast.c  | 105 ++++++++++++++++++++++++++++++++++++++++++------------
 net/tipc/bcast.h  |   5 +--
 net/tipc/link.c   |   8 ++++-
 net/tipc/msg.c    |  17 +++++++++
 net/tipc/msg.h    |   2 ++
 net/tipc/node.c   |  27 +++++++++-----
 net/tipc/socket.c |  29 ++++++++++-----
 7 files changed, 149 insertions(+), 44 deletions(-)

diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 886df68..e7b4d6b 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -39,7 +39,7 @@
 #include "socket.h"
 #include "msg.h"
 #include "bcast.h"
-#include "name_distr.h"
+#include "name_table.h"
 #include "link.h"
 #include "node.h"
 
@@ -176,43 +176,102 @@ static void tipc_bcbase_xmit(struct net *net, struct 
sk_buff_head *xmitq)
        __skb_queue_purge(&_xmitq);
 }
 
-/* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster
- *                    and to identified node local sockets
+/* tipc_bcast_xmit - broadcast the buffer chain to all external nodes
  * @net: the applicable net namespace
- * @list: chain of buffers containing message
+ * @msg: chain of buffers containing message; left untouched if no bc peers
  * Consumes the buffer chain, except when returning -ELINKCONG
  * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
  */
-int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
+static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *msg)
 {
        struct tipc_link *l = tipc_bc_sndlink(net);
-       struct sk_buff_head xmitq, inputq, rcvq;
+       struct sk_buff_head xmitq;
        int rc = 0;
 
-       __skb_queue_head_init(&rcvq);
        __skb_queue_head_init(&xmitq);
-       skb_queue_head_init(&inputq);
-
-       /* Prepare message clone for local node */
-       if (unlikely(!tipc_msg_reassemble(list, &rcvq)))
-               return -EHOSTUNREACH;
-
        tipc_bcast_lock(net);
        if (tipc_link_bc_peers(l))
-               rc = tipc_link_xmit(l, list, &xmitq);
+               rc = tipc_link_xmit(l, msg, &xmitq);
        tipc_bcast_unlock(net);
+       tipc_bcbase_xmit(net, &xmitq);
+       return rc;
+}
+
+/* tipc_rcast_xmit - replicate and send a message to given destination nodes
+ * @net: the applicable net namespace
+ * @msg: chain of buffers containing message
+ * @dests: destination nodes; only those on its 'unsent' list are sent to
+ * Returns -ENOMEM on copy failure, -ELINKCONG if a link is congested, else 0
+ */
+static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *msg,
+                          struct tipc_nlist *dests)
+{
+       struct sk_buff_head _msg;
+       struct tipc_nitem *n, *tmp;
+       int rc = 0;
+       u32 dst;
+
+       __skb_queue_head_init(&_msg);
+
+       list_for_each_entry_safe(n, tmp, &dests->unsent, list) {
+               dst = n->node;
+               if (!tipc_msg_pskb_copy(dst, msg, &_msg))
+                       return -ENOMEM;
+
+               /* Already congested? Ensure there will be only one wakeup */
+               TIPC_SKB_CB(skb_peek(&_msg))->wakeup_pending = rc;
+
+               /* Failures other than -ELINKCONG are ignored */
+               if (tipc_node_xmit(net, &_msg, dst, dst) == -ELINKCONG)
+                       rc = -ELINKCONG;
+               else
+                       tipc_nlist_sent(dests, n);
+
+               /* Message copy list is non-empty if sending failed */
+               __skb_queue_purge(&_msg);
+       }
+       return rc;
+}
 
-       /* Don't send to local node if adding to link failed */
-       if (unlikely(rc)) {
-               __skb_queue_purge(&rcvq);
-               return rc;
+/* tipc_mcast_xmit - deliver message to indicated destination nodes
+ *                   and to identified node local sockets
+ * @net: the applicable net namespace
+ * @msg: chain of buffers containing message
+ * @dests: destination nodes for message.
+ * Consumes buffer chain, except when returning -ELINKCONG
+ * On return, every item of @dests is on either its 'sent' or 'unsent' list
+ * Returns 0 if success, otherwise errno
+ */
+int tipc_mcast_xmit(struct net *net, struct sk_buff_head *msg,
+                   struct tipc_nlist *dests)
+{
+       struct tipc_bc_base *bb = tipc_bc_base(net);
+       struct sk_buff_head inputq;
+       int rc = 0;
+
+       skb_queue_head_init(&inputq);
+
+       /* Clone for local node once; the clone stays in localq on -ELINKCONG */
+       if (dests->local && skb_queue_empty(&dests->localq) &&
+           !tipc_msg_reassemble(msg, &dests->localq)) {
+               rc  = -EHOSTUNREACH;
+               goto exit;
        }
 
-       /* Broadcast to all nodes, inluding local node */
-       tipc_bcbase_xmit(net, &xmitq);
-       tipc_sk_mcast_rcv(net, &rcvq, &inputq);
-       __skb_queue_purge(list);
-       return 0;
+       if (dests->remote) {
+               if (!bb->bcast_support)
+                       rc = tipc_rcast_xmit(net, msg, dests);
+               else
+                       rc = tipc_bcast_xmit(net, msg);
+
+               if (rc == -ELINKCONG)
+                       return rc;
+       }
+       if (dests->local && !rc)
+               tipc_sk_mcast_rcv(net, &dests->localq, &inputq);
+exit:
+       __skb_queue_purge(msg);
+       return rc;
 }
 
 /* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 5ffe344..65de265 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -42,7 +42,7 @@
 struct tipc_node;
 struct tipc_msg;
 struct tipc_nl_msg;
-struct tipc_node_map;
+struct tipc_nlist;
 extern const char tipc_bclink_name[];
 
 int tipc_bcast_init(struct net *net);
@@ -53,7 +53,8 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link 
*rcv_bcl);
 void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id);
 void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id);
 int  tipc_bcast_get_mtu(struct net *net);
-int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list);
+int tipc_mcast_xmit(struct net *net, struct sk_buff_head *msg,
+                   struct tipc_nlist *dests);
 int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
 void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked);
 int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
diff --git a/net/tipc/link.c b/net/tipc/link.c
index b36e16c..d06c491 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1036,11 +1036,18 @@ int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to,
 static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
                            struct sk_buff_head *inputq)
 {
-       switch (msg_user(buf_msg(skb))) {
+       struct tipc_msg *hdr = buf_msg(skb);
+
+       switch (msg_user(hdr)) {
        case TIPC_LOW_IMPORTANCE:
        case TIPC_MEDIUM_IMPORTANCE:
        case TIPC_HIGH_IMPORTANCE:
        case TIPC_CRITICAL_IMPORTANCE:
+               if (unlikely(msg_type(hdr) == TIPC_MCAST_MSG)) {
+                       skb_queue_tail(l->bc_rcvlink->inputq, skb);
+                       return true;
+               }
+               /* fall through */
        case CONN_MANAGER:
                skb_queue_tail(inputq, skb);
                return true;
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 17201aa..f4e6197 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -607,6 +607,30 @@ error:
       return false;
 }
 
+/* tipc_msg_pskb_copy - make a copy of a buffer chain for one destination node
+ * @dst: destination node to write into the header of each copy
+ * @msg: chain of buffers to be copied
+ * @cpy: queue to which the copies are appended
+ * Copies use pskb_copy(GFP_ATOMIC); on failure all of @cpy is purged
+ * Returns true if all buffers were copied, otherwise false
+ */
+bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
+                       struct sk_buff_head *cpy)
+{
+       struct sk_buff *skb, *_skb;
+
+       skb_queue_walk(msg, skb) {
+               _skb = pskb_copy(skb, GFP_ATOMIC);
+               if (!_skb) {
+                       __skb_queue_purge(cpy);
+                       return false;
+               }
+               msg_set_destnode(buf_msg(_skb), dst);
+               __skb_queue_tail(cpy, _skb);
+       }
+       return true;
+}
+
 /* tipc_skb_queue_sorted(); sort pkt into list according to sequence number
  * @list: list to be appended to
  * @seqno: sequence number of buffer to add
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index c3832cd..b15374b 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -820,6 +820,8 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
                   int offset, int dsz, int mtu, struct sk_buff_head *list);
 bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err);
 bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq);
+bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
+                       struct sk_buff_head *cpy);
 void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
                             struct sk_buff *skb);
 
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 7ef14e2..efc5673 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -1262,6 +1262,19 @@ void tipc_node_broadcast(struct net *net, struct sk_buff 
*skb)
        kfree_skb(skb);
 }
 
+static void tipc_node_mcast_rcv(struct tipc_node *n)
+{
+       struct tipc_bclink_entry *be = &n->bc_entry;
+
+       /* Splice inputq1 onto 'arrvq', which is guarded by inputq2's lock */
+       spin_lock_bh(&be->inputq2.lock);
+       spin_lock_bh(&be->inputq1.lock);
+       skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
+       spin_unlock_bh(&be->inputq1.lock);
+       spin_unlock_bh(&be->inputq2.lock);
+       tipc_sk_mcast_rcv(n->net, &be->arrvq, &be->inputq2);
+}
+
 static void tipc_node_bc_sync_rcv(struct tipc_node *n, struct tipc_msg *hdr,
                                  int bearer_id, struct sk_buff_head *xmitq)
 {
@@ -1335,15 +1348,8 @@ static void tipc_node_bc_rcv(struct net *net, struct 
sk_buff *skb, int bearer_id
        if (!skb_queue_empty(&xmitq))
                tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
 
-       /* Deliver. 'arrvq' is under inputq2's lock protection */
-       if (!skb_queue_empty(&be->inputq1)) {
-               spin_lock_bh(&be->inputq2.lock);
-               spin_lock_bh(&be->inputq1.lock);
-               skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
-               spin_unlock_bh(&be->inputq1.lock);
-               spin_unlock_bh(&be->inputq2.lock);
-               tipc_sk_mcast_rcv(net, &be->arrvq, &be->inputq2);
-       }
+       if (!skb_queue_empty(&be->inputq1))
+               tipc_node_mcast_rcv(n);
 
        if (rc & TIPC_LINK_DOWN_EVT) {
                /* Reception reassembly failure => reset all links to peer */
@@ -1570,6 +1576,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, 
struct tipc_bearer *b)
        if (unlikely(!skb_queue_empty(&n->bc_entry.namedq)))
                tipc_named_rcv(net, &n->bc_entry.namedq);
 
+       if (unlikely(!skb_queue_empty(&n->bc_entry.inputq1)))
+               tipc_node_mcast_rcv(n);
+
        if (!skb_queue_empty(&le->inputq))
                tipc_sk_rcv(net, &le->inputq);
 
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index f9f5f3c..38e1eef 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -694,8 +694,11 @@ static int tipc_sendmcast(struct  socket *sock, struct 
tipc_name_seq *seq,
        struct tipc_msg *mhdr = &tsk->phdr;
        struct sk_buff_head pktchain;
        struct iov_iter save = msg->msg_iter;
-       uint mtu;
-       int rc;
+       struct tipc_nlist dests;
+       u32 domain;
+       int mtu, rc;
+
+       skb_queue_head_init(&pktchain);
 
        msg_set_type(mhdr, TIPC_MCAST_MSG);
        msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
@@ -706,19 +709,25 @@ static int tipc_sendmcast(struct  socket *sock, struct 
tipc_name_seq *seq,
        msg_set_nameupper(mhdr, seq->upper);
        msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
 
-       skb_queue_head_init(&pktchain);
-
+       /* Find all destination nodes */
+       tipc_nlist_init(&dests);
+       domain = addr_domain(net, TIPC_CLUSTER_SCOPE);
+       tipc_nametbl_lookup_dst_nodes(net, seq->type, seq->lower,
+                                     seq->upper, domain, &dests);
+       if (!dests.local && !dests.remote)
+               return -EHOSTUNREACH;
 new_mtu:
        mtu = tipc_bcast_get_mtu(net);
        rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &pktchain);
        if (unlikely(rc < 0))
-               return rc;
+               goto exit;
 
        do {
-               rc = tipc_bcast_xmit(net, &pktchain);
-               if (likely(!rc))
-                       return dsz;
-
+               rc = tipc_mcast_xmit(net, &pktchain, &dests);
+               if (likely(!rc)) {
+                       rc = dsz;
+                       break;
+               }
                if (rc == -ELINKCONG) {
                        tsk->link_cong = 1;
                        rc = tipc_wait_for_sndmsg(sock, &timeo);
@@ -732,6 +741,8 @@ new_mtu:
                }
                break;
        } while (1);
+exit:
+       tipc_nlist_purge(&dests);
        return rc;
 }
 
-- 
2.7.4


------------------------------------------------------------------------------
_______________________________________________
tipc-discussion mailing list
tipc-discussion@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/tipc-discussion

Reply via email to