Hi Jon,

It's a very nice solution.

But I have a few questions about the algorithm:

1. Is it possible to set a minimum node count that determines whether we deploy 
the new algorithm or not? For instance, if the number of nodes in a cluster is 
below a threshold, such as 16 or a smaller value, we would still use the old 
algorithm to detect link failures. Of course, I am not sure whether it's worth 
doing this. Please advise.

2. Is it possible to set different tolerance values for nodes in the local 
domain and nodes in remote domains? In other words, if both neighbors are in 
the same local domain, the tolerance can be set to a short time. In contrast, 
if one neighbor is in the local domain and the other is in a remote domain, the 
tolerance of the link between them can be set a bit longer so that the link is 
not broken unnecessarily. Maybe this idea is not reasonable. Alternatively, as 
the number of nodes in a cluster grows, we could automatically increase the 
default link tolerance to avoid links being broken unnecessarily, which is 
a bit simpler than the former.

Regards,
Ying

-----Original Message-----
From: Jon Maloy [mailto:jon.ma...@ericsson.com] 
Sent: April 10, 2016 2:38
To: tipc-discussion@lists.sourceforge.net; 
parthasarathy.bhuvara...@ericsson.com; Xue, Ying; richard.a...@ericsson.com; 
jon.ma...@ericsson.com
Cc: ma...@donjonn.com
Subject: [PATCH net-next 1/1] tipc: add neighbor monitoring framework

TIPC based clusters are by default set up with full-mesh link connectivity 
between all nodes. Those links are expected to provide a short failure 
detection time, by default set to 1500 ms. Because of this, the background load 
for neighbor monitoring in an N-node cluster increases by a factor of N on each 
node, while the overall monitoring traffic through the network infrastructure 
increases at a ~(N * (N - 1)) rate. Experience has shown that such clusters 
don't scale well beyond ~100 nodes unless we significantly increase failure 
discovery tolerance.

This commit introduces a framework and an algorithm that drastically reduces 
this background load, while basically maintaining the original failure 
detection times across the whole cluster. Using this algorithm, background 
load will now grow at a rate of ~(2 * sqrt(N)) per node, and at ~(2 * N * 
sqrt(N)) in traffic overhead. As an example, each node will now have to 
actively monitor 38 neighbors in a 400-node cluster, instead of 399 as before.
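
The arithmetic behind the 400-node figure can be reproduced with a small sketch. It reuses the rounding rule of dom_size() from the patch (smallest i with i * i >= peers) but leaves out the TIPC_MAX_MON_DOMAIN cap, and it assumes that a node monitors about as many remote domain heads as local domain members. The helper monitored_neighbors() is hypothetical and not part of the patch.

```c
/* Smallest i with i * i >= peers, mirroring dom_size() in the patch.
 * Assumption: the TIPC_MAX_MON_DOMAIN cap is left out for clarity.
 */
static unsigned int dom_size(unsigned int peers)
{
	unsigned int i = 0;

	while (i * i < peers)
		i++;
	return i;
}

/* Hypothetical helper: rough count of actively monitored neighbors.
 * Assumption: (dom_size - 1) local domain members plus the same
 * number of remote domain heads.
 */
static unsigned int monitored_neighbors(unsigned int n)
{
	unsigned int dz = dom_size(n);

	if (dz == 0)
		return 0;
	return 2 * (dz - 1);
}
```

Under these assumptions monitored_neighbors(400) evaluates to 38, compared with 399 actively monitored links per node in a full mesh.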

This "Overlapping Ring Supervision Algorithm" is completely distributed and 
employs no centralized state. It goes as follows:

- Each node makes up a linearly ascending, circular list of all its
  N known neighbors, based on their TIPC node identity. This algorithm
  must be the same on all nodes.

- The node then selects the next M = sqrt(N)-1 nodes downstream in the
  list, and chooses to actively monitor those. This is called its
  "local monitoring domain".

- It creates a domain record describing the monitoring domain, and
  piggy-backs this in the data area of all neighbor monitoring messages
  (LINK_PROTOCOL/STATE) leaving that node. This means that all nodes in
  the cluster eventually (default within 400 ms) will learn about
  its monitoring domain.

- Whenever a node discovers a change in its local domain, e.g., a node
  has been added or has gone down, it creates and sends out a new
  version of its node record to inform all neighbors about the change.

- A node receiving a domain record from anybody outside its local domain
  matches this against its own list (which may not look the same), and
  chooses to not actively monitor those members of the received domain
  record that are also present in its own list. Instead, it relies on
  indications from the direct monitoring nodes if an indirectly monitored
  node has gone up or down. If a node is indicated lost, the receiving
  node temporarily activates its own direct monitoring towards that node
  in order to confirm, or not, that it is actually gone.

- Since each node is actively monitoring sqrt(N) downstream neighbors,
  each node is also actively monitored by the same number of upstream
  neighbors. This means that all non-direct monitoring nodes normally
  will receive sqrt(N) indications that a node is gone.

- A major drawback with ring monitoring is how to handle failures that
  cause massive network partitionings. If both a lost node and all its
  direct monitoring neighbors are inside the lost partition, the nodes in
  the remaining partition will never receive indications about the loss.
  To overcome this, each node also chooses to actively monitor some
  nodes outside its local domain. Those nodes are called remote domain
  "heads", and are selected in such a way that no node in the cluster
  is more than one indirect monitoring hop away. Because of this, each
  node, apart from monitoring the members of its local domain, will also
  typically monitor sqrt(N) remote head nodes.

- As an optimization, local list status, domain status and domain
  records are marked with a generation number. This saves senders from
  unnecessarily conveying unchanged domain records, and receivers from
  performing unneeded re-adaptations of their node monitoring list, such
  as re-assigning domain heads.
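
The first two steps of the list above (sort the nodes into a circular list, then pick the next sqrt(N)-1 downstream nodes) can be sketched in isolation as follows. This is only an illustration: the helper local_domain() is hypothetical, it works on a plain sorted array rather than the kernel's linked list, and it assumes that n counts the node itself, as mon->peer_cnt does in the patch.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical helper, not part of the patch: copy the local monitoring
 * domain of the node at 'self_idx' into 'out' and return its size.
 * 'sorted' holds all n node identities in ascending order and is
 * treated as circular. Assumption: n includes the node itself.
 */
static size_t local_domain(const uint32_t *sorted, size_t n, size_t self_idx,
			   uint32_t *out, size_t out_cap)
{
	size_t dz = 0, m, i;

	/* Same rounding as dom_size(): smallest dz with dz * dz >= n */
	while (dz * dz < n)
		dz++;
	m = dz ? dz - 1 : 0;
	if (m > out_cap)
		m = out_cap;

	/* Walk downstream from self, wrapping at the end of the array */
	for (i = 0; i < m; i++)
		out[i] = sorted[(self_idx + 1 + i) % n];
	return m;
}
```

For nine nodes the domain size is two, so the last node in the sorted array monitors the first two, which shows the wrap-around that makes the list circular.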

Signed-off-by: Jon Maloy <jon.ma...@ericsson.com>
---
 net/tipc/Makefile  |   2 +-
 net/tipc/addr.h    |   1 +
 net/tipc/bearer.c  |   8 +-
 net/tipc/bearer.h  |   2 +-
 net/tipc/core.h    |  15 ++
 net/tipc/link.c    |  32 +++-
 net/tipc/monitor.c | 537 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 net/tipc/monitor.h |  65 +++++++
 net/tipc/node.c    |  25 ++-
 9 files changed, 664 insertions(+), 23 deletions(-)
 create mode 100644 net/tipc/monitor.c
 create mode 100644 net/tipc/monitor.h

diff --git a/net/tipc/Makefile b/net/tipc/Makefile
index 57e460b..31b9f9c 100644
--- a/net/tipc/Makefile
+++ b/net/tipc/Makefile
@@ -6,7 +6,7 @@ obj-$(CONFIG_TIPC) := tipc.o
 
 tipc-y += addr.o bcast.o bearer.o \
           core.o link.o discover.o msg.o  \
-          name_distr.o  subscr.o name_table.o net.o  \
+          name_distr.o  subscr.o monitor.o name_table.o net.o  \
           netlink.o netlink_compat.o node.o socket.o eth_media.o \
           server.o socket.o
 
diff --git a/net/tipc/addr.h b/net/tipc/addr.h
index 93f7c98..64f4004 100644
--- a/net/tipc/addr.h
+++ b/net/tipc/addr.h
@@ -73,4 +73,5 @@ int tipc_addr_node_valid(u32 addr);
 int tipc_in_scope(u32 domain, u32 addr);
 int tipc_addr_scope(u32 domain);
 char *tipc_addr_string_fill(char *string, u32 addr);
+
 #endif
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
index 6f11c62..9a70e1d 100644
--- a/net/tipc/bearer.c
+++ b/net/tipc/bearer.c
@@ -1,7 +1,7 @@
 /*
  * net/tipc/bearer.c: TIPC bearer code
  *
- * Copyright (c) 1996-2006, 2013-2014, Ericsson AB
+ * Copyright (c) 1996-2006, 2013-2016, Ericsson AB
  * Copyright (c) 2004-2006, 2010-2013, Wind River Systems
  * All rights reserved.
  *
@@ -39,6 +39,7 @@
 #include "bearer.h"
 #include "link.h"
 #include "discover.h"
+#include "monitor.h"
 #include "bcast.h"
 #include "netlink.h"
 
@@ -313,6 +314,10 @@ restart:
        rcu_assign_pointer(tn->bearer_list[bearer_id], b);
        if (skb)
                tipc_bearer_xmit_skb(net, bearer_id, skb, &b->bcast_addr);
+
+       if (tipc_mon_create(net, bearer_id))
+               return -ENOMEM;
+
        pr_info("Enabled bearer <%s>, discovery domain %s, priority %u\n",
                name,
                 tipc_addr_string_fill(addr_string, disc_domain), priority);
@@ -348,6 +353,7 @@ static void bearer_disable(struct net *net, struct tipc_bearer *b)
                tipc_disc_delete(b->link_req);
        RCU_INIT_POINTER(tn->bearer_list[bearer_id], NULL);
        kfree_rcu(b, rcu);
+       tipc_mon_delete(net, bearer_id);
 }
 
 int tipc_enable_l2_media(struct net *net, struct tipc_bearer *b,
diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
index e318205..7051b0f 100644
--- a/net/tipc/bearer.h
+++ b/net/tipc/bearer.h
@@ -1,7 +1,7 @@
 /*
  * net/tipc/bearer.h: Include file for TIPC bearer code
  *
- * Copyright (c) 1996-2006, 2013-2014, Ericsson AB
+ * Copyright (c) 1996-2006, 2013-2016, Ericsson AB
  * Copyright (c) 2005, 2010-2011, Wind River Systems
  * All rights reserved.
  *
diff --git a/net/tipc/core.h b/net/tipc/core.h
index eff58dc..f1555bd 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -66,6 +66,7 @@ struct tipc_bc_base;
 struct tipc_link;
 struct tipc_name_table;
 struct tipc_server;
+struct tipc_monitor;
 
 #define TIPC_MOD_VER "2.0.0"
 
@@ -88,6 +89,9 @@ struct tipc_net {
        u32 num_nodes;
        u32 num_links;
 
+       /* Neighbor monitoring list */
+       struct tipc_monitor *monitors[MAX_BEARERS];
+
        /* Bearer list */
        struct tipc_bearer __rcu *bearer_list[MAX_BEARERS + 1];
 
@@ -126,6 +130,17 @@ static inline struct list_head *tipc_nodes(struct net *net)
        return &tipc_net(net)->node_list;
 }
 
+static inline struct tipc_monitor *tipc_monitor(struct net *net,
+                                               int bearer_id)
+{
+       return tipc_net(net)->monitors[bearer_id];
+}
+
+static inline unsigned int tipc_hashfn(u32 addr)
+{
+       return addr & (NODE_HTABLE_SIZE - 1);
+}
+
 static inline u16 mod(u16 x)
 {
        return x & 0xffffu;
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 2e28a7d..d760fdc 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -42,6 +42,7 @@
 #include "name_distr.h"
 #include "discover.h"
 #include "netlink.h"
+#include "monitor.h"
 
 #include <linux/pkt_sched.h>
 
@@ -96,6 +97,7 @@ struct tipc_stats {
  * @pmsg: convenience pointer to "proto_msg" field
  * @priority: current link priority
  * @net_plane: current link network plane ('A' through 'H')
+ * @mon_state: cookie with information needed by link monitor
  * @backlog_limit: backlog queue congestion thresholds (indexed by importance)
  * @exp_msg_count: # of tunnelled messages expected during link changeover
  * @reset_rcv_checkpt: seq # of last acknowledged message at time of link reset
@@ -140,6 +142,7 @@ struct tipc_link {
        char if_name[TIPC_MAX_IF_NAME];
        u32 priority;
        char net_plane;
+       struct tipc_mon_state mon_state;
        u16 rst_cnt;
 
        /* Failover/synch */
@@ -710,18 +713,22 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
        bool setup = false;
        u16 bc_snt = l->bc_sndlink->snd_nxt - 1;
        u16 bc_acked = l->bc_rcvlink->acked;
-
-       link_profile_stats(l);
+       struct tipc_mon_state *mstate = &l->mon_state;
 
        switch (l->state) {
        case LINK_ESTABLISHED:
        case LINK_SYNCHING:
+               link_profile_stats(l);
                if (l->silent_intv_cnt > l->abort_limit)
                        return tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
                mtyp = STATE_MSG;
                state = bc_acked != bc_snt;
-               probe = l->silent_intv_cnt;
-               if (probe)
+               state |= l->rcv_unacked;
+               state |= skb_queue_len(&l->transmq);
+               state |= skb_queue_len(&l->deferdq);
+               probe = tipc_mon_probed(l->net, l->addr, mstate, l->bearer_id);
+               probe |= l->silent_intv_cnt;
+               if (probe || mstate->monitored)
                        l->silent_intv_cnt++;
                break;
        case LINK_RESET:
@@ -833,6 +840,7 @@ void tipc_link_reset(struct tipc_link *l)
        l->stats.recv_info = 0;
        l->stale_count = 0;
        l->bc_peer_is_up = false;
+       memset(&l->mon_state, 0, sizeof(l->mon_state));
        tipc_link_reset_stats(l);
 }
 
@@ -1241,6 +1249,9 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
        struct tipc_msg *hdr;
        struct sk_buff_head *dfq = &l->deferdq;
        bool node_up = link_is_up(l->bc_rcvlink);
+       struct tipc_mon_state *mstate = &l->mon_state;
+       int dlen;
+       void *data;
 
        /* Don't send protocol message during reset or link failover */
        if (tipc_link_is_blocked(l))
@@ -1253,12 +1264,13 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
                rcvgap = buf_seqno(skb_peek(dfq)) - l->rcv_nxt;
 
        skb = tipc_msg_create(LINK_PROTOCOL, mtyp, INT_H_SIZE,
-                             TIPC_MAX_IF_NAME, l->addr,
+                             tipc_domain_size, l->addr,
                              tipc_own_addr(l->net), 0, 0, 0);
        if (!skb)
                return;
 
        hdr = buf_msg(skb);
+       data = msg_data(hdr);
        msg_set_session(hdr, l->session);
        msg_set_bearer_id(hdr, l->bearer_id);
        msg_set_net_plane(hdr, l->net_plane);
@@ -1276,12 +1288,14 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
                msg_set_seq_gap(hdr, rcvgap);
                msg_set_size(hdr, INT_H_SIZE);
                msg_set_probe(hdr, probe);
+               tipc_mon_prep(l->net, data, &dlen, mstate, l->bearer_id);
+               msg_set_size(hdr, INT_H_SIZE + dlen);
                l->stats.sent_states++;
                l->rcv_unacked = 0;
        } else {
                /* RESET_MSG or ACTIVATE_MSG */
                msg_set_max_pkt(hdr, l->advertised_mtu);
-               strcpy(msg_data(hdr), l->if_name);
+               strcpy(data, l->if_name);
        }
        if (probe)
                l->stats.sent_probes++;
@@ -1374,7 +1388,9 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
        u16 peers_tol = msg_link_tolerance(hdr);
        u16 peers_prio = msg_linkprio(hdr);
        u16 rcv_nxt = l->rcv_nxt;
+       u16 dlen = msg_data_sz(hdr);
        int mtyp = msg_type(hdr);
+       void *data = msg_data(hdr);
        char *if_name;
        int rc = 0;
 
@@ -1403,7 +1419,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
                        break;
                if (msg_data_sz(hdr) < TIPC_MAX_IF_NAME)
                        break;
-               strncpy(if_name, msg_data(hdr), TIPC_MAX_IF_NAME);
+               strncpy(if_name, data, TIPC_MAX_IF_NAME);
 
                /* Update own tolerance if peer indicates a non-zero value */
                if (in_range(peers_tol, TIPC_MIN_LINK_TOL, TIPC_MAX_LINK_TOL)) 
@@ -1451,6 +1467,8 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
                                rc = TIPC_LINK_UP_EVT;
                        break;
                }
+               tipc_mon_rcv(l->net, data, dlen, l->addr,
+                            &l->mon_state, l->bearer_id);
 
                /* Send NACK if peer has sent pkts we haven't received yet */
                if (more(peers_snd_nxt, rcv_nxt) && !tipc_link_is_synching(l)) 
diff --git a/net/tipc/monitor.c b/net/tipc/monitor.c
new file mode 100644
index 0000000..ec0c6b4
--- /dev/null
+++ b/net/tipc/monitor.c
@@ -0,0 +1,537 @@
+/*
+ * net/tipc/monitor.c
+ *
+ * Copyright (c) 2016, Ericsson AB
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. Neither the names of the copyright holders nor the names of its
+ *    contributors may be used to endorse or promote products derived from
+ *    this software without specific prior written permission.
+ *
+ * Alternatively, this software may be distributed under the terms of the
+ * GNU General Public License ("GPL") version 2 as published by the Free
+ * Software Foundation.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "core.h"
+#include "addr.h"
+#include "monitor.h"
+
+#define TIPC_MAX_MON_DOMAIN     64
+
+/* struct tipc_mon_domain: domain record to be transferred between peers
+ * @dom_gen: current generation of peer's domain
+ * @ack_dom_gen: most recent generation of self's domain acked by peer
+ * @member_cnt: number of domain member nodes described in this record
+ * @up_map: bit map indicating which of the members the sender considers up
+ * @members: identity of the domain members
+ */
+struct tipc_mon_domain {
+       u16 len;
+       u16 dom_gen;
+       u16 ack_dom_gen;
+       u16 member_cnt;
+       u64 up_map;
+       u32 members[TIPC_MAX_MON_DOMAIN];
+};
+
+/* struct tipc_peer: state of a peer node and its domain
+ * @addr: tipc node identity of peer
+ * @head_map: shows which other nodes currently consider peer 'up'
+ * @domain: most recent domain record from peer
+ * @hash: position in hashed lookup list
+ * @list: position in linked list, in circular ascending order by 'addr'
+ * @monitoring: number of nodes monitored by peer, as seen from this node
+ * @up: peer is up as seen from this node
+ * @head: peer is assigned domain head by this node
+ * @local: peer is in local domain and should be continuously monitored
+ * @confirm: - set and start probing if some other peer has lost link
+ */
+struct tipc_peer {
+       u32 addr;
+       u64 head_map;
+       struct tipc_mon_domain *domain;
+       struct hlist_node hash;
+       struct list_head list;
+       unsigned int monitoring   : 6;
+       unsigned int up           : 1;
+       unsigned int head         : 1;
+       unsigned int local        : 1;
+       unsigned int confirm      : 1;
+       u32 upcnt;
+};
+
+struct tipc_monitor {
+       struct hlist_head peers[NODE_HTABLE_SIZE];
+       int peer_cnt;
+       struct tipc_peer *self;
+       rwlock_t lock;
+       struct tipc_mon_domain cache;
+       u16 net_gen;
+       u16 dom_gen;
+       bool disabled;
+       struct net *net;
+};
+
+const int tipc_domain_size = sizeof(struct tipc_mon_domain);
+
+static int dom_rec_len(struct tipc_mon_domain *dom, u16 mcnt)
+{
+       return ((void *)&dom->members - (void *)dom) + (mcnt * sizeof(u32));
+}
+
+static void map_set(u64 *up_map, int i, unsigned int v)
+{
+       *up_map &= ~(1ULL << i);
+       *up_map |= ((u64)v << i);
+}
+
+static int map_get(u64 up_map, int i)
+{
+       return (up_map & (1ULL << i)) >> i;
+}
+
+static struct tipc_peer *peer_prev(struct tipc_peer *peer)
+{
+       return list_entry(peer->list.prev, struct tipc_peer, list);
+}
+
+static struct tipc_peer *peer_nxt(struct tipc_peer *peer)
+{
+       return list_entry(peer->list.next, struct tipc_peer, list);
+}
+
+static struct tipc_peer *peer_head(struct tipc_peer *peer)
+{
+       while (!peer->head)
+               peer = peer_prev(peer);
+       return peer;
+}
+
+/* dom_size() : calculate size of own domain based on number of peers
+ */
+static int dom_size(int peers)
+{
+       unsigned int i = 0;
+
+       while ((i * i) < peers)
+               i++;
+       return i < TIPC_MAX_MON_DOMAIN ? i : TIPC_MAX_MON_DOMAIN;
+}
+
+static struct tipc_peer *mon_peer(struct tipc_monitor *mon, u32 addr)
+{
+       struct tipc_peer *peer;
+       unsigned int thash = tipc_hashfn(addr);
+
+       hlist_for_each_entry(peer, &mon->peers[thash], hash) {
+               if (peer->addr == addr)
+                       return peer;
+       }
+       return NULL;
+}
+
+/* mon_match_domain() : match a peer's domain record against monitor list
+ */
+static void mon_match_domain(struct tipc_monitor *mon,
+                            struct tipc_peer *peer)
+{
+       struct tipc_mon_domain *dom = peer->domain;
+       struct tipc_peer *mmbr;
+       u64 prev_map;
+       u32 addr;
+       int up, i = 0;
+
+       if (!dom || !peer->up)
+               return;
+
+       /* Scan across domain members and match against monitor list */
+       peer->monitoring = 0;
+       mmbr = peer_nxt(peer);
+       for (; i < dom->member_cnt; i++, mmbr = peer_nxt(mmbr)) {
+               addr = dom->members[i];
+               if (addr != mmbr->addr)
+                       return;
+               if (addr == tipc_own_addr(mon->net))
+                       return;
+               peer->monitoring++;
+               prev_map = mmbr->head_map;
+               up = map_get(dom->up_map, i);
+               map_set(&mmbr->head_map, i, up);
+               if (mmbr->up && (mmbr->head_map != prev_map) && !up)
+                       mmbr->confirm = 1;
+       }
+}
+
+/* mon_update_local_domain() : update after peer addition/removal/up/down
+ */
+static void mon_update_local_domain(struct tipc_monitor *mon)
+{
+       struct tipc_peer *self = mon->self;
+       struct tipc_mon_domain *cache = &mon->cache;
+       struct tipc_mon_domain *dom = self->domain;
+       struct tipc_peer *peer = self;
+       int mmbr_cnt = self->monitoring;
+       int i;
+
+       /* Update native and cached outgoing local domain records */
+       dom->len = dom_rec_len(dom, mmbr_cnt);
+       dom->dom_gen = ++mon->dom_gen;
+       dom->member_cnt = mmbr_cnt;
+       for (i = 0; i < mmbr_cnt; i++) {
+               peer = peer_nxt(peer);
+               dom->members[i] = peer->addr;
+               map_set(&dom->up_map, i, peer->up);
+               cache->members[i] = htonl(peer->addr);
+       }
+       cache->len = htons(dom->len);
+       cache->dom_gen = htons(dom->dom_gen);
+       cache->member_cnt = htons(mmbr_cnt);
+       cache->up_map = cpu_to_be64(dom->up_map);
+       mon_match_domain(mon, self);
+}
+
+/* mon_update_neighbors() : update neighbors around an added/removed peer
+ */
+static void mon_update_neighbors(struct tipc_monitor *mon,
+                                struct tipc_peer *peer)
+{
+       int dz, i;
+
+       dz = dom_size(mon->peer_cnt);
+       for (i = 0; i < dz; i++, peer = peer_nxt(peer))
+               peer->head_map = 0;
+       for (i = 0; i < (dz * 2); i++, peer = peer_prev(peer))
+               mon_match_domain(mon, peer);
+}
+
+/* mon_assign_roles() : reassign peer roles after a network change
+ * The monitor list is consistent at this stage; i.e., each peer is monitoring
+ * a set of domain members as matched between domain record and the monitor list
+ */
+static void mon_assign_roles(struct tipc_monitor *mon, struct tipc_peer *head)
+{
+       struct tipc_peer *peer = peer_nxt(head);
+       int i = 0;
+
+       for (; peer != mon->self; peer = peer_nxt(peer)) {
+               peer->local = 0;
+
+               /* Update domain member */
+               if (i++ < head->monitoring) {
+                       peer->head = 0;
+                       if (head == mon->self)
+                               peer->local = 1;
+                       continue;
+               }
+               /* Assign next domain head */
+               if (!peer->up)
+                       continue;
+               if (peer->head)
+                       break;
+               head = peer;
+               head->head = 1;
+               i = 0;
+       }
+       mon->net_gen++;
+}
+
+void tipc_mon_remove_peer(struct net *net, u32 addr, int bearer_id)
+{
+       struct tipc_monitor *mon = tipc_monitor(net, bearer_id);
+       struct tipc_peer *self = mon->self;
+       struct tipc_peer *peer, *prev, *head;
+
+       write_lock_bh(&mon->lock);
+       peer = mon_peer(mon, addr);
+       if (!peer)
+               goto exit;
+       prev = peer_prev(peer);
+       list_del(&peer->list);
+       hlist_del(&peer->hash);
+       kfree(peer->domain);
+       kfree(peer);
+       mon->peer_cnt--;
+       head = peer_head(prev);
+       if (head == self) {
+               self->monitoring = dom_size(mon->peer_cnt) - 1;
+               mon_update_local_domain(mon);
+       }
+       mon_update_neighbors(mon, prev);
+       mon_assign_roles(mon, head);
+exit:
+       write_unlock_bh(&mon->lock);
+}
+
+void tipc_mon_peer_up(struct net *net, u32 addr, int bearer_id)
+{
+       struct tipc_monitor *mon = tipc_monitor(net, bearer_id);
+       struct tipc_peer *self = mon->self;
+       struct tipc_peer *peer, *head, *cur, *prev;
+       int pc = mon->peer_cnt;
+
+       write_lock_bh(&mon->lock);
+       peer = mon_peer(mon, addr);
+
+       /* Create peer and sort into ascending circular list if applicable */
+       if (!peer) {
+               peer = kzalloc(sizeof(*peer), GFP_ATOMIC);
+               if (!peer)
+                       goto exit;
+               peer->addr = addr;
+               INIT_LIST_HEAD(&peer->list);
+               hlist_add_head(&peer->hash, &mon->peers[tipc_hashfn(addr)]);
+               prev = self;
+               list_for_each_entry(cur, &self->list, list) {
+                       if ((addr > prev->addr) && (addr < cur->addr))
+                               break;
+                       if (((addr < cur->addr) || (addr > prev->addr)) &&
+                           (prev->addr > cur->addr))
+                               break;
+                       prev = cur;
+               }
+               list_add_tail(&peer->list, &cur->list);
+               mon->peer_cnt++;
+       }
+       peer->up = 1;
+       head = peer_head(peer);
+       if (head == self) {
+               self->monitoring = dom_size(mon->peer_cnt) - 1;
+               mon_update_local_domain(mon);
+       }
+       /* If new peer, update a safe number of peers around it */
+       if (mon->peer_cnt != pc)
+               mon_update_neighbors(mon, peer);
+       mon_assign_roles(mon, head);
+       peer->upcnt++;
+exit:
+       write_unlock_bh(&mon->lock);
+}
+
+void tipc_mon_peer_down(struct net *net, u32 addr, int bearer_id)
+{
+       struct tipc_monitor *mon = tipc_monitor(net, bearer_id);
+       struct tipc_peer *peer, *mmbr, *head;
+       struct tipc_peer *self = mon->self;
+       int i = 0;
+
+       write_lock_bh(&mon->lock);
+       peer = mon_peer(mon, addr);
+       if (!peer) {
+               pr_warn("Mon: unknown link %x/%u DOWN\n", addr, bearer_id);
+               goto exit;
+       }
+       /* Update domain members' head_map field */
+       if (peer->domain) {
+               peer->domain->up_map = 0;
+               mon_match_domain(mon, peer);
+       }
+       /* Suppress member probing if peer was not domain head */
+       mmbr = peer_nxt(peer);
+       while (!peer->head && (i++ < peer->monitoring)) {
+               mmbr->confirm = 0;
+               mmbr = peer_nxt(mmbr);
+       }
+       peer->up = 0;
+       peer->head = 0;
+       peer->local = 0;
+       peer->monitoring = 0;
+       peer->confirm = 0;
+       kfree(peer->domain);
+       peer->domain = 0;
+       head = peer_head(peer);
+       if (head == self)
+               mon_update_local_domain(mon);
+       mon_assign_roles(mon, head);
+exit:
+       write_unlock_bh(&mon->lock);
+}
+
+/* tipc_mon_rcv - process monitor domain event message
+ */
+void tipc_mon_rcv(struct net *net, void *data, u16 dlen, u32 addr,
+                 struct tipc_mon_state *state, int bearer_id)
+{
+       struct tipc_monitor *mon = tipc_monitor(net, bearer_id);
+       struct tipc_mon_domain *ndom = data;
+       u16 nmmbr_cnt = ntohs(ndom->member_cnt);
+       int ndlen = dom_rec_len(ndom, nmmbr_cnt);
+       u16 ndgen = ntohs(ndom->dom_gen);
+       struct tipc_mon_domain *dom;
+       struct tipc_peer *peer;
+       int i;
+
+       if ((dlen != ntohs(ndom->len)) || (dlen != ndlen))
+               return;
+       state->ack_dom_gen = ntohs(ndom->ack_dom_gen);
+
+       /* Ignore if this generation already received */
+       if (!more(ndgen, state->peer_dom_gen) && !state->probed)
+               return;
+       state->probed = 0;
+
+       write_lock_bh(&mon->lock);
+       peer = mon_peer(mon, addr);
+       if (!peer)
+               goto exit;
+       if (!more(ndgen, state->peer_dom_gen))
+               goto exit;
+       state->peer_dom_gen = ndgen;
+       if (!peer->up)
+               goto exit;
+
+       /* Transform and store received domain record */
+       dom = peer->domain;
+       if (!dom || (dom->len < ndlen)) {
+               kfree(dom);
+               dom = kmalloc(ndlen, GFP_ATOMIC);
+               peer->domain = dom;
+               if (!dom)
+                       goto exit;
+       }
+       dom->len = ndlen;
+       dom->dom_gen = ndgen;
+       dom->member_cnt = nmmbr_cnt;
+       dom->up_map = be64_to_cpu(ndom->up_map);
+       for (i = 0; i < nmmbr_cnt; i++)
+               dom->members[i] = ntohl(ndom->members[i]);
+
+       /* Update affected peer and its domain */
+       mon_match_domain(mon, peer);
+       peer->confirm = 0;
+       mon_assign_roles(mon, peer_head(peer));
+exit:
+       write_unlock_bh(&mon->lock);
+}
+
+void tipc_mon_prep(struct net *net, void *data, int *dlen,
+                  struct tipc_mon_state *state, int bearer_id)
+{
+       struct tipc_monitor *mon = tipc_monitor(net, bearer_id);
+       struct tipc_mon_domain *dom = data;
+       u16 dgen = state->dom_gen;
+
+       if (!less(state->ack_dom_gen, dgen) || mon->disabled) {
+               *dlen = dom_rec_len(dom, 0);
+               dom->len = htons(dom_rec_len(dom, 0));
+               dom->dom_gen = htons(dgen);
+               dom->ack_dom_gen = htons(state->peer_dom_gen);
+               dom->member_cnt = 0;
+               return;
+       }
+       read_lock_bh(&mon->lock);
+       *dlen = ntohs(mon->cache.len);
+       memcpy(data, &mon->cache, *dlen);
+       read_unlock_bh(&mon->lock);
+       dom->ack_dom_gen = htons(state->peer_dom_gen);
+}
+
+bool tipc_mon_probed(struct net *net, u32 addr, struct tipc_mon_state *state,
+                    int bearer_id)
+{
+       struct tipc_monitor *mon = tipc_monitor(net, bearer_id);
+       struct tipc_peer *peer;
+
+       if (mon->disabled)
+               return false;
+
+       if (!state->probed &&
+           !less(state->net_gen, mon->net_gen) &&
+           !less(state->ack_dom_gen, state->dom_gen))
+               return false;
+
+       read_lock_bh(&mon->lock);
+       peer = mon_peer(mon, addr);
+       if (peer) {
+               state->probed = less(state->dom_gen, mon->dom_gen);
+               state->probed |= less(state->ack_dom_gen, state->dom_gen);
+               state->probed |= peer->confirm;
+               peer->confirm = 0;
+               state->monitored = peer->local;
+               state->monitored |= peer->head;
+               state->monitored |= !peer->head_map;
+               state->net_gen = mon->net_gen;
+               state->dom_gen = mon->dom_gen;
+       }
+       read_unlock_bh(&mon->lock);
+       return state->probed || state->monitored;
+}
+
+int tipc_mon_create(struct net *net, int bearer_id)
+{
+       struct tipc_net *tn = tipc_net(net);
+       struct tipc_monitor *mon;
+       struct tipc_peer *self;
+       struct tipc_mon_domain *dom;
+
+       if (tn->monitors[bearer_id])
+               return 0;
+
+       mon = kzalloc(sizeof(*mon), GFP_ATOMIC);
+       self = kzalloc(sizeof(*self), GFP_ATOMIC);
+       dom = kzalloc(sizeof(*dom), GFP_ATOMIC);
+       if (!mon || !self || !dom) {
+               kfree(mon);
+               kfree(self);
+               kfree(dom);
+               return -ENOMEM;
+       }
+       tn->monitors[bearer_id] = mon;
+       rwlock_init(&mon->lock);
+       mon->net = net;
+       mon->peer_cnt = 1;
+       mon->self = self;
+       self->domain = dom;
+       self->addr = tipc_own_addr(net);
+       self->up = 1;
+       self->head = 1;
+       INIT_LIST_HEAD(&self->list);
+       return 0;
+}
+
+void tipc_mon_disable(struct net *net, int bearer_id)
+{
+       tipc_monitor(net, bearer_id)->disabled = true;
+}
+
+void tipc_mon_delete(struct net *net, int bearer_id)
+{
+       struct tipc_net *tn = tipc_net(net);
+       struct tipc_monitor *mon = tipc_monitor(net, bearer_id);
+       struct tipc_peer *self = mon->self;
+       struct tipc_peer *peer, *tmp;
+
+       write_lock_bh(&mon->lock);
+       tn->monitors[bearer_id] = NULL;
+       list_for_each_entry_safe(peer, tmp, &self->list, list) {
+               list_del(&peer->list);
+               hlist_del(&peer->hash);
+               kfree(peer->domain);
+               kfree(peer);
+       }
+       kfree(self->domain);
+       kfree(self);
+       write_unlock_bh(&mon->lock);
+       tn->monitors[bearer_id] = NULL;
+       kfree(mon);
+}
diff --git a/net/tipc/monitor.h b/net/tipc/monitor.h
new file mode 100644
index 0000000..1975d55
--- /dev/null
+++ b/net/tipc/monitor.h
@@ -0,0 +1,65 @@
+/*
+ * net/tipc/monitor.h
+ *
+ * Copyright (c) 2015, Ericsson AB
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. Neither the names of the copyright holders nor the names of its
+ *    contributors may be used to endorse or promote products derived from
+ *    this software without specific prior written permission.
+ *
+ * Alternatively, this software may be distributed under the terms of the
+ * GNU General Public License ("GPL") version 2 as published by the Free
+ * Software Foundation.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _TIPC_MONITOR_H
+#define _TIPC_MONITOR_H
+
+struct tipc_mon_state {
+       u16 net_gen;
+       u16 dom_gen;
+       u16 peer_dom_gen;
+       u16 ack_dom_gen;
+       bool monitored;
+       bool probed;
+};
+
+int tipc_mon_create(struct net *net, int bearer_id);
+void tipc_mon_disable(struct net *net, int bearer_id);
+void tipc_mon_delete(struct net *net, int bearer_id);
+
+void tipc_mon_peer_up(struct net *net, u32 addr, int bearer_id);
+void tipc_mon_peer_down(struct net *net, u32 addr, int bearer_id);
+void tipc_mon_prep(struct net *net, void *data, int *dlen,
+                  struct tipc_mon_state *state, int bearer_id);
+void tipc_mon_rcv(struct net *net, void *data, u16 dlen, u32 addr,
+                 struct tipc_mon_state *state, int bearer_id);
+bool tipc_mon_probed(struct net *net, u32 addr,
+                    struct tipc_mon_state *state,
+                    int bearer_id);
+void tipc_mon_add_peer(struct net *net, u32 addr, int bearer_id);
+void tipc_mon_remove_peer(struct net *net, u32 addr, int bearer_id);
+
+extern const int tipc_domain_size;
+#endif
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 68d9f7b..43f2d78 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -40,6 +40,7 @@
 #include "name_distr.h"
 #include "socket.h"
 #include "bcast.h"
+#include "monitor.h"
 #include "discover.h"
 #include "netlink.h"
 
@@ -191,16 +192,6 @@ int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel)
        tipc_node_put(n);
        return mtu;
 }
-/*
- * A trivial power-of-two bitmask technique is used for speed, since this
- * operation is done for every incoming TIPC packet. The number of hash table
- * entries has been chosen so that no hash chain exceeds 8 nodes and will
- * usually be much smaller (typically only a single node).
- */
-static unsigned int tipc_hashfn(u32 addr)
-{
-       return addr & (NODE_HTABLE_SIZE - 1);
-}
 
 static void tipc_node_kref_release(struct kref *kref)
 {
@@ -265,6 +256,7 @@ static void tipc_node_write_unlock(struct tipc_node *n)
        u32 addr = 0;
        u32 flags = n->action_flags;
        u32 link_id = 0;
+       u32 bearer_id;
        struct list_head *publ_list;
 
        if (likely(!flags)) {
@@ -274,6 +266,7 @@ static void tipc_node_write_unlock(struct tipc_node *n)
 
        addr = n->addr;
        link_id = n->link_id;
+       bearer_id = link_id & 0xffff;
        publ_list = &n->publ_list;
 
        n->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
@@ -287,13 +280,16 @@ static void tipc_node_write_unlock(struct tipc_node *n)
        if (flags & TIPC_NOTIFY_NODE_UP)
                tipc_named_node_up(net, addr);
 
-       if (flags & TIPC_NOTIFY_LINK_UP)
+       if (flags & TIPC_NOTIFY_LINK_UP) {
+               tipc_mon_peer_up(net, addr, bearer_id);
                tipc_nametbl_publish(net, TIPC_LINK_STATE, addr, addr,
                                     TIPC_NODE_SCOPE, link_id, addr);
-
-       if (flags & TIPC_NOTIFY_LINK_DOWN)
+       }
+       if (flags & TIPC_NOTIFY_LINK_DOWN) {
+               tipc_mon_peer_down(net, addr, bearer_id);
                tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
                                      link_id, addr);
+       }
 }
 
 struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
@@ -674,6 +670,7 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete)
        struct tipc_link *l = le->link;
        struct tipc_media_addr *maddr;
        struct sk_buff_head xmitq;
+       int old_bearer_id = bearer_id;
 
        if (!l)
                return;
@@ -693,6 +690,8 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete)
                tipc_link_fsm_evt(l, LINK_RESET_EVT);
        }
        tipc_node_write_unlock(n);
+       if (delete)
+               tipc_mon_remove_peer(n->net, n->addr, old_bearer_id);
        tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr);
        tipc_sk_rcv(n->net, &le->inputq);
 }
--
1.9.1
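
A side note on the generation checks in tipc_mon_probed() above: they only work if less() copes with the u16 counters wrapping around. The excerpt does not show how less() is defined, so the user-space sketch below is an assumption about its behaviour, not the patch's code. The names gen_less() and gen_less_examples() are hypothetical and used only for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical helper (not from the patch): returns true if generation
 * 'a' is strictly older than 'b', treating the 16-bit counter as a
 * circular sequence space. 'b' counts as newer when it is between 1 and
 * 0x7fff steps ahead of 'a' modulo 65536.
 */
static bool gen_less(uint16_t a, uint16_t b)
{
	uint16_t diff = (uint16_t)(b - a);

	return diff != 0 && diff < 0x8000u;
}

/* A few cases, including the wrap from 0xffff to 0. */
static void gen_less_examples(void)
{
	assert(gen_less(1, 2));        /* plain ordering */
	assert(!gen_less(2, 2));       /* equal is not "less" */
	assert(gen_less(0xffff, 0));   /* 0 is one step ahead of 0xffff */
	assert(!gen_less(0, 0xffff));  /* 0xffff is behind 0 after a wrap */
}
```

Under that assumption, a check such as less(state->dom_gen, mon->dom_gen) keeps reporting "my view is stale" correctly after mon->dom_gen wraps past 0xffff, as long as the two counters never drift more than 0x7fff generations apart.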
