With the new RB tree structure for service ranges it becomes possible to solve an old problem: we can now allow overlapping service ranges in the table.
When inserting a new service range to the tree, we use 'lower' as primary key, and when necessary 'upper' as secondary key. This guarantees that a well-formed binding item from a peer never will be rejected, and makes it possible to eliminate the problematic backlog functionality we currently have for handling such cases. Signed-off-by: Jon Maloy <[email protected]> --- net/tipc/binding_distr.c | 102 +++++++++++------------------------------------ net/tipc/binding_table.c | 74 +++++++++++++++------------------- net/tipc/binding_table.h | 7 ++-- net/tipc/net.c | 2 +- net/tipc/node.c | 4 +- net/tipc/socket.c | 4 +- 6 files changed, 65 insertions(+), 128 deletions(-) diff --git a/net/tipc/binding_distr.c b/net/tipc/binding_distr.c index 207ff87..961f50b 100644 --- a/net/tipc/binding_distr.c +++ b/net/tipc/binding_distr.c @@ -194,32 +194,21 @@ void tipc_bindist_node_up(struct net *net, u32 dnode) } /** - * tipc_publ_purge - remove publication associated with a failed node + * tipc_purge_binding - remove binding associated with a lost peer node * * Invoked for each publication issued by a newly failed node. * Removes publication structure from name table & deletes it. 
*/ -static void tipc_purge_binding(struct net *net, struct tipc_binding *b, - u32 addr) +static void tipc_purge_binding(struct net *net, struct tipc_binding *b) { - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct tipc_binding *p; + struct tipc_net *tn = tipc_net(net); spin_lock_bh(&tn->bindingtbl_lock); - p = tipc_bindtbl_remove_binding(net, b->type, b->lower, - b->node, b->port, b->key); - if (p) - tipc_node_unsubscribe(net, &p->binding_node, addr); + tipc_bindtbl_remove_binding(net, b->type, b->lower, + b->upper, b->node, b->key); + tipc_node_unsubscribe(net, &b->binding_node, b->node); spin_unlock_bh(&tn->bindingtbl_lock); - - if (p != b) { - pr_err("Unable to remove publication from failed node\n" - " (type=%u, lower=%u, node=0x%x, port=%u, key=%u)\n", - b->type, b->lower, b->node, b->port, - b->key); - } - - kfree_rcu(p, rcu); + kfree_rcu(b, rcu); } /** @@ -245,13 +234,12 @@ void tipc_publ_notify(struct net *net, struct list_head *nsub_list, u32 addr) struct tipc_binding *b, *tmp; list_for_each_entry_safe(b, tmp, nsub_list, binding_node) - tipc_purge_binding(net, b, addr); + tipc_purge_binding(net, b); tipc_dist_queue_purge(net, addr); } /** - * tipc_update_nametbl - try to process a nametable update and notify - * subscribers + * tipc_update_nametbl - process a nametable update and notify subscribers * * tipc_bindtbl_lock must be held. * Returns the publication item if successful, otherwise NULL. 
@@ -260,27 +248,32 @@ static bool tipc_update_nametbl(struct net *net, struct distr_item *i, u32 node, u32 dtype) { struct tipc_binding *b = NULL; + u32 lower = ntohl(i->lower); + u32 upper = ntohl(i->upper); + u32 type = ntohl(i->type); + u32 port = ntohl(i->port); + u32 key = ntohl(i->key); if (dtype == PUBLICATION) { - b = tipc_bindtbl_insert_binding(net, ntohl(i->type), - ntohl(i->lower), - ntohl(i->upper), + b = tipc_bindtbl_insert_binding(net, type, lower, upper, TIPC_CLUSTER_SCOPE, node, - ntohl(i->port), ntohl(i->key)); + port, key); if (b) { tipc_node_subscribe(net, &b->binding_node, node); return true; } + pr_warn_ratelimited("Failed to add binding %u,%u,%u from %x\n", + type, lower, upper, node); } else if (dtype == WITHDRAWAL) { - b = tipc_bindtbl_remove_binding(net, ntohl(i->type), - ntohl(i->lower), - node, ntohl(i->port), - ntohl(i->key)); + b = tipc_bindtbl_remove_binding(net, type, lower, + upper, node, key); if (b) { tipc_node_unsubscribe(net, &b->binding_node, node); kfree_rcu(b, rcu); return true; } + pr_warn_ratelimited("Failed to remove binding %u,%u from %x\n", + type, lower, node); } else { pr_warn("Unrecognized name table message received\n"); } @@ -288,53 +281,6 @@ static bool tipc_update_nametbl(struct net *net, struct distr_item *i, } /** - * tipc_bindist_add_backlog - add a failed name table update to the backlog - * - */ -static void tipc_bindist_add_backlog(struct net *net, struct distr_item *i, - u32 type, u32 node) -{ - struct distr_queue_item *e; - struct tipc_net *tn = net_generic(net, tipc_net_id); - unsigned long now = get_jiffies_64(); - - e = kzalloc(sizeof(*e), GFP_ATOMIC); - if (!e) - return; - e->dtype = type; - e->node = node; - e->expires = now + msecs_to_jiffies(sysctl_tipc_bindist_timeout); - memcpy(e, i, sizeof(*i)); - list_add_tail(&e->next, &tn->dist_queue); -} - -/** - * tipc_bindist_process_backlog - try to process any pending name table updates - * from the network. 
- */ -void tipc_bindist_process_backlog(struct net *net) -{ - struct distr_queue_item *e, *tmp; - struct tipc_net *tn = net_generic(net, tipc_net_id); - unsigned long now = get_jiffies_64(); - - list_for_each_entry_safe(e, tmp, &tn->dist_queue, next) { - if (time_after(e->expires, now)) { - if (!tipc_update_nametbl(net, &e->i, e->node, e->dtype)) - continue; - } else { - pr_warn_ratelimited("Dropping name table update (%d) of {%u, %u, %u} from %x key=%u\n", - e->dtype, ntohl(e->i.type), - ntohl(e->i.lower), - ntohl(e->i.upper), - e->node, ntohl(e->i.key)); - } - list_del(&e->next); - kfree(e); - } -} - -/** * tipc_bindist_rcv - process name table update messages sent by another node */ void tipc_bindist_rcv(struct net *net, struct sk_buff_head *inputq) @@ -356,12 +302,10 @@ void tipc_bindist_rcv(struct net *net, struct sk_buff_head *inputq) count = msg_data_sz(hdr) / ITEM_SIZE; node = msg_orignode(hdr); while (count--) { - if (!tipc_update_nametbl(net, item, node, mtyp)) - tipc_bindist_add_backlog(net, item, mtyp, node); + tipc_update_nametbl(net, item, node, mtyp); item++; } kfree_skb(skb); - tipc_bindist_process_backlog(net); } spin_unlock_bh(&tn->bindingtbl_lock); } diff --git a/net/tipc/binding_table.c b/net/tipc/binding_table.c index 0203297..81a2de1 100644 --- a/net/tipc/binding_table.c +++ b/net/tipc/binding_table.c @@ -118,8 +118,6 @@ static struct tipc_binding *tipc_create_binding(u32 type, u32 lower, u32 upper, /** * tipc_service_create - create a service structure for the specified 'type' - * - * Allocates a single range structure and sets it to all 0's. */ static struct tipc_service *tipc_service_create(u32 type, struct hlist_head *hd) { @@ -141,8 +139,7 @@ static struct tipc_service *tipc_service_create(u32 type, struct hlist_head *hd) /** * tipc_service_find_range - find service range matching a service instance - * - * Very time-critical, so binary searches through range array. 
+ * Very time-critical */ static struct service_range *tipc_service_find_range(struct tipc_service *sc, u32 instance) @@ -175,10 +172,14 @@ static struct service_range *tipc_service_create_range(struct tipc_service *sc, tmp = container_of(parent, struct service_range, tree_node); if (lower < tmp->lower) n = &(*n)->rb_left; + else if (lower > tmp->lower) + n = &(*n)->rb_right; + else if (upper < tmp->upper) + n = &(*n)->rb_left; else if (upper > tmp->upper) n = &(*n)->rb_right; else - return NULL; + return tmp; } sr = kzalloc(sizeof(*sr), GFP_ATOMIC); if (!sr) @@ -204,17 +205,12 @@ static struct tipc_binding *tipc_service_insert_binding(struct net *net, struct tipc_binding *b; bool first = false; - sr = tipc_service_find_range(sc, lower); - if (!sr) { - sr = tipc_service_create_range(sc, lower, upper); - if (!sr) - goto err; - first = true; - } + /* Create range if not already existing */ + sr = tipc_service_create_range(sc, lower, upper); + if (!sr) + goto err; - /* Lower end overlaps existing entry, but we need an exact match */ - if (sr->lower != lower || sr->upper != upper) - return NULL; + first = list_empty(&sr->all_bindings); /* Return if the binding already exists */ list_for_each_entry(b, &sr->all_bindings, all_bindings) { @@ -241,32 +237,31 @@ static struct tipc_binding *tipc_service_insert_binding(struct net *net, return NULL; } -/** - * tipc_service_remove_binding - * - * NOTE: There may be cases where TIPC is asked to remove a binding - * that is not in the name table. For example, if another node issues a - * binding for a name range that overlaps an existing name range - * the binding will not be recorded, which means the binding won't - * be found when the name range is later withdrawn by that node. - * A failed withdraw request simply returns a failure indication and lets the - * caller issue any error or warning messages associated with such a problem. 
- */ static struct tipc_binding *tipc_service_remove_binding(struct net *net, struct tipc_service *sc, - u32 inst, u32 node, - u32 port, u32 key) + u32 lower, u32 upper, + u32 node, u32 key) { struct tipc_subscription *sub, *tmp; struct service_range *sr; struct tipc_binding *b; bool found = false; bool last = false; + struct rb_node *n; - sr = tipc_service_find_range(sc, inst); + sr = tipc_service_find_range(sc, lower); if (!sr) return NULL; + /* Find exact matching service range */ + for (n = &sr->tree_node; n; n = rb_next(n)) { + sr = container_of(n, struct service_range, tree_node); + if (sr->upper == upper) + break; + } + if (!n || sr->lower != lower || sr->upper != upper) + return NULL; + /* Find binding, if it exists */ list_for_each_entry(b, &sr->all_bindings, all_bindings) { if (b->key != key || (node && node != b->node)) @@ -379,8 +374,8 @@ struct tipc_binding *tipc_bindtbl_insert_binding(struct net *net, u32 type, } struct tipc_binding *tipc_bindtbl_remove_binding(struct net *net, u32 type, - u32 lower, u32 node, u32 port, - u32 key) + u32 lower, u32 upper, + u32 node, u32 key) { struct tipc_service *sc = tipc_bindtbl_find_service(net, type); struct tipc_binding *b = NULL; @@ -389,7 +384,7 @@ struct tipc_binding *tipc_bindtbl_remove_binding(struct net *net, u32 type, return NULL; spin_lock_bh(&sc->lock); - b = tipc_service_remove_binding(net, sc, lower, node, port, key); + b = tipc_service_remove_binding(net, sc, lower, upper, node, key); /* Delete service item if this no more bindings and subscriptions */ if (RB_EMPTY_ROOT(&sc->ranges) && list_empty(&sc->subscriptions)) { @@ -623,8 +618,6 @@ struct tipc_binding *tipc_bindtbl_bind(struct net *net, u32 type, u32 lower, if (b) { bt->local_bindings_count++; skb = tipc_bindist_publish(net, b); - /* Any pending external events? 
*/ - tipc_bindist_process_backlog(net); } exit: spin_unlock_bh(&tn->bindingtbl_lock); @@ -637,7 +630,8 @@ struct tipc_binding *tipc_bindtbl_bind(struct net *net, u32 type, u32 lower, /** * tipc_bindtbl_unbind - withdraw a service binding */ -int tipc_bindtbl_unbind(struct net *net, u32 type, u32 lower, u32 port, u32 key) +int tipc_bindtbl_unbind(struct net *net, u32 type, u32 lower, + u32 upper, u32 key) { struct tipc_binding_table *bt = tipc_binding_table(net); struct tipc_net *tn = tipc_net(net); @@ -647,17 +641,15 @@ int tipc_bindtbl_unbind(struct net *net, u32 type, u32 lower, u32 port, u32 key) spin_lock_bh(&tn->bindingtbl_lock); - b = tipc_bindtbl_remove_binding(net, type, lower, self, port, key); + b = tipc_bindtbl_remove_binding(net, type, lower, upper, self, key); if (b) { bt->local_bindings_count--; skb = tipc_bindist_withdraw(net, b); - /* Any pending external events? */ - tipc_bindist_process_backlog(net); list_del_init(&b->binding_sock); kfree_rcu(b, rcu); } else { pr_err("Failed to remove local binding {%u,%u,%u}/%u\n", - type, lower, port, key); + type, lower, upper, key); } spin_unlock_bh(&tn->bindingtbl_lock); @@ -756,8 +748,8 @@ static void tipc_purge_bindings(struct net *net, struct tipc_service *sc) rbtree_postorder_for_each_entry_safe(sr, tmpr, &sc->ranges, tree_node) { list_for_each_entry_safe(b, tmpb, &sr->all_bindings, all_bindings) { - tipc_service_remove_binding(net, sc, b->lower, b->node, - b->port, b->key); + tipc_service_remove_binding(net, sc, b->lower, b->upper, + b->node, b->key); kfree_rcu(b, rcu); } } diff --git a/net/tipc/binding_table.h b/net/tipc/binding_table.h index 2d9f156..da57137 100644 --- a/net/tipc/binding_table.h +++ b/net/tipc/binding_table.h @@ -115,14 +115,15 @@ bool tipc_bindtbl_lookup(struct net *net, u32 type, u32 instance, u32 domain, struct tipc_binding *tipc_bindtbl_bind(struct net *net, u32 type, u32 lower, u32 upper, u32 scope, u32 port_ref, u32 key); -int tipc_bindtbl_unbind(struct net *net, u32 type, u32 
lower, u32 ref, u32 key); +int tipc_bindtbl_unbind(struct net *net, u32 type, + u32 lower, u32 upper, u32 key); struct tipc_binding *tipc_bindtbl_insert_binding(struct net *net, u32 type, u32 lower, u32 upper, u32 scope, u32 node, u32 ref, u32 key); struct tipc_binding *tipc_bindtbl_remove_binding(struct net *net, u32 type, - u32 lower, u32 node, u32 ref, - u32 key); + u32 lower, u32 upper, + u32 node, u32 key); void tipc_bindtbl_subscribe(struct tipc_subscription *s); void tipc_bindtbl_unsubscribe(struct tipc_subscription *s); int tipc_bindtbl_init(struct net *net); diff --git a/net/tipc/net.c b/net/tipc/net.c index a37c8cb..3a1a1b8 100644 --- a/net/tipc/net.c +++ b/net/tipc/net.c @@ -136,7 +136,7 @@ void tipc_net_stop(struct net *net) if (!self) return; - tipc_bindtbl_unbind(net, TIPC_CFG_SRV, self, 0, self); + tipc_bindtbl_unbind(net, TIPC_CFG_SRV, self, self, self); rtnl_lock(); tipc_bearer_stop(net); tipc_node_stop(net); diff --git a/net/tipc/node.c b/net/tipc/node.c index b7a2413..6c1bb08 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -324,11 +324,11 @@ static void tipc_node_write_unlock(struct tipc_node *n) if (flags & TIPC_NOTIFY_LINK_UP) { tipc_mon_peer_up(net, addr, bearer_id); tipc_bindtbl_bind(net, TIPC_LINK_STATE, addr, addr, - TIPC_NODE_SCOPE, link_id, addr); + TIPC_NODE_SCOPE, link_id, link_id); } if (flags & TIPC_NOTIFY_LINK_DOWN) { tipc_mon_peer_down(net, addr, bearer_id); - tipc_bindtbl_unbind(net, TIPC_LINK_STATE, addr, link_id, addr); + tipc_bindtbl_unbind(net, TIPC_LINK_STATE, addr, addr, link_id); } } diff --git a/net/tipc/socket.c b/net/tipc/socket.c index a242869..dc8826f 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -2634,11 +2634,11 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope, if (b->upper != seq->upper) break; tipc_bindtbl_unbind(net, b->type, b->lower, - b->port, b->key); + b->upper, b->key); rc = 0; break; } - tipc_bindtbl_unbind(net, b->type, b->lower, b->port, b->key); + tipc_bindtbl_unbind(net, 
b->type, b->lower, b->upper, b->key); rc = 0; } if (list_empty(&tsk->publications)) -- 2.1.4 ------------------------------------------------------------------------------ Check out the vibrant tech community on one of the world's most engaging tech sites, Slashdot.org! http://sdm.link/slashdot _______________________________________________ tipc-discussion mailing list [email protected] https://lists.sourceforge.net/lists/listinfo/tipc-discussion
