n 02/10/2018 01:07 AM, Jon Maloy wrote:
> It is unnecessary to keep two structures, struct tipc_conn and struct
> tipc_subscriber, with a one-to-one relationship and still with different
> life cycles. The fact that the two often run in different contexts, and
> still may access each other via direct pointers constitutes an additional
> hazard, something we have experienced at several occasions, and still
> see happening.
>
> We have identified at least two remaining problems that are easier to
> fix if we simplify the topology server data structure somewhat.
>
> - When there is a race between a subscription up/down event and a
> timeout event, it is fully possible that the former might be delivered
> after the latter, leading to confusion for the receiver.
>
> - The function tipc_subcrp_timeout() is executing in interrupt context,
> while the following call chain is at least theoretically possible:
> tipc_subscrp_timeout()
> tipc_subscrp_send_event()
> tipc_conn_sendmsg()
> conn_put()
> tipc_conn_kref_release()
> sock_release(sock)
>
> Because of this, we sometimes see a warning "Trying to sleep in
> interrupt context". To eliminate this, we need to ensure that the
> tipc_conn structure and the socket, as well as the subscription
> instances, only are deleted in work queue context, i.e., after the
> timeout event really has been sent out.
>
> We now remove this unnecessary complexity, by merging data and
> functionality of the subscriber structure into struct tipc_conn
> and the associated file server.c. We thereafter add a spinlock and
> a new 'inactive' state to the subscription structure. Using those,
> both problems described above can be easily solved.
>
> Signed-off-by: Jon Maloy <[email protected]>
> ---
> net/tipc/server.c | 162 +++++++++++++++++++++++++++++++++-----------------
> net/tipc/server.h | 2 +-
> net/tipc/subscr.c | 173
> ++++++++++--------------------------------------------
> net/tipc/subscr.h | 17 +++---
> 4 files changed, 149 insertions(+), 205 deletions(-)
>
> diff --git a/net/tipc/server.c b/net/tipc/server.c
> index 8aa2a33..1706453 100644
> --- a/net/tipc/server.c
> +++ b/net/tipc/server.c
> @@ -2,6 +2,7 @@
> * net/tipc/server.c: TIPC server infrastructure
> *
> * Copyright (c) 2012-2013, Wind River Systems
> + * Copyright (c) 2017, Ericsson AB
> * All rights reserved.
> *
> * Redistribution and use in source and binary forms, with or without
> @@ -57,12 +58,13 @@
> * @sock: socket handler associated with connection
> * @flags: indicates connection state
> * @server: pointer to connected server
> + * @sub_list: lsit to all pertaing subscriptions
> + * @sub_lock: lock protecting the subscription list
> + * @outqueue_lock: control access to the outqueue
> * @rwork: receive work item
> - * @usr_data: user-specified field
> * @rx_action: what to do when connection socket is active
> * @outqueue: pointer to first outbound message in queue
> * @outqueue_lock: control access to the outqueue
> - * @outqueue: list of connection objects for its server
> * @swork: send work item
> */
> struct tipc_conn {
> @@ -71,9 +73,10 @@ struct tipc_conn {
> struct socket *sock;
> unsigned long flags;
> struct tipc_server *server;
> + struct list_head sub_list;
> + spinlock_t sub_lock; /* for subscription list */
> struct work_struct rwork;
> int (*rx_action) (struct tipc_conn *con);
> - void *usr_data;
> struct list_head outqueue;
> spinlock_t outqueue_lock;
> struct work_struct swork;
> @@ -81,6 +84,7 @@ struct tipc_conn {
>
> /* An entry waiting to be sent */
> struct outqueue_entry {
> + u32 evt;
> struct list_head list;
> struct kvec iov;
> };
> @@ -89,18 +93,35 @@ static void tipc_recv_work(struct work_struct *work);
> static void tipc_send_work(struct work_struct *work);
> static void tipc_clean_outqueues(struct tipc_conn *con);
>
> +static bool connected(struct tipc_conn *con)
> +{
> + if (!con)
> + return false;
> + return test_bit(CF_CONNECTED, &con->flags);
> +}
> +
> +/**
> + * htohl - convert value to endianness used by destination
> + * @in: value to convert
> + * @swap: non-zero if endianness must be reversed
> + *
> + * Returns converted value
> + */
> +static u32 htohl(u32 in, int swap)
> +{
> + return swap ? swab32(in) : in;
> +}
> +
> static void tipc_conn_kref_release(struct kref *kref)
> {
> struct tipc_conn *con = container_of(kref, struct tipc_conn, kref);
> struct tipc_server *s = con->server;
> struct socket *sock = con->sock;
> - struct sock *sk;
>
> if (sock) {
> - sk = sock->sk;
> if (test_bit(CF_SERVER, &con->flags)) {
> __module_get(sock->ops->owner);
> - __module_get(sk->sk_prot_creator->owner);
> + __module_get(sock->sk->sk_prot_creator->owner);
> }
> sock_release(sock);
> con->sock = NULL;
> @@ -129,11 +150,8 @@ static struct tipc_conn *tipc_conn_lookup(struct
> tipc_server *s, int conid)
>
> spin_lock_bh(&s->idr_lock);
> con = idr_find(&s->conn_idr, conid);
> - if (con) {
> - if (!test_bit(CF_CONNECTED, &con->flags) ||
> - !kref_get_unless_zero(&con->kref))
> - con = NULL;
> - }
> + if (!connected(con) || !kref_get_unless_zero(&con->kref))
> + con = NULL;
> spin_unlock_bh(&s->idr_lock);
> return con;
> }
> @@ -144,7 +162,7 @@ static void sock_data_ready(struct sock *sk)
>
> read_lock_bh(&sk->sk_callback_lock);
> con = sock2con(sk);
> - if (con && test_bit(CF_CONNECTED, &con->flags)) {
> + if (connected(con)) {
> conn_get(con);
> if (!queue_work(con->server->rcv_wq, &con->rwork))
> conn_put(con);
> @@ -158,7 +176,7 @@ static void sock_write_space(struct sock *sk)
>
> read_lock_bh(&sk->sk_callback_lock);
> con = sock2con(sk);
> - if (con && test_bit(CF_CONNECTED, &con->flags)) {
> + if (connected(con)) {
> conn_get(con);
> if (!queue_work(con->server->send_wq, &con->swork))
> conn_put(con);
> @@ -181,6 +199,24 @@ static void tipc_register_callbacks(struct socket *sock,
> struct tipc_conn *con)
> write_unlock_bh(&sk->sk_callback_lock);
> }
>
> +/* tipc_con_delete_sub - delete a specific or all subscriptions
> + * for a given subscriber
> + */
> +static void tipc_con_delete_sub(struct tipc_conn *con, struct tipc_subscr *s)
> +{
> + struct list_head *sub_list = &con->sub_list;
> + struct tipc_subscription *sub, *tmp;
> +
> + spin_lock_bh(&con->sub_lock);
> + list_for_each_entry_safe(sub, tmp, sub_list, subscrp_list) {
> + if (!s || !memcmp(s, &sub->evt.s, sizeof(*s)))
> + tipc_sub_delete(sub);
> + else if (s)
> + break;
> + }
> + spin_unlock_bh(&con->sub_lock);
> +}
> +
> static void tipc_close_conn(struct tipc_conn *con)
> {
> struct sock *sk = con->sock->sk;
> @@ -188,10 +224,11 @@ static void tipc_close_conn(struct tipc_conn *con)
>
> write_lock_bh(&sk->sk_callback_lock);
> disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags);
> +
> if (disconnect) {
> sk->sk_user_data = NULL;
> if (con->conid)
> - tipc_subscrb_delete(con->usr_data);
> + tipc_con_delete_sub(con, NULL);
> }
> write_unlock_bh(&sk->sk_callback_lock);
>
> @@ -215,7 +252,9 @@ static struct tipc_conn *tipc_alloc_conn(struct
> tipc_server *s)
>
> kref_init(&con->kref);
> INIT_LIST_HEAD(&con->outqueue);
> + INIT_LIST_HEAD(&con->sub_list);
> spin_lock_init(&con->outqueue_lock);
> + spin_lock_init(&con->sub_lock);
> INIT_WORK(&con->swork, tipc_send_work);
> INIT_WORK(&con->rwork, tipc_recv_work);
>
> @@ -236,6 +275,35 @@ static struct tipc_conn *tipc_alloc_conn(struct
> tipc_server *s)
> return con;
> }
>
> +int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con,
> + void *buf, size_t len)
> +{
> + struct tipc_subscr *s = (struct tipc_subscr *)buf;
> + struct tipc_subscription *sub;
> + bool status;
> + int swap;
> +
> + /* Determine subscriber's endianness */
> + swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE |
> + TIPC_SUB_CANCEL));
> +
> + /* Detect & process a subscription cancellation request */
> + if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
> + s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
> + tipc_con_delete_sub(con, s);
> + return 0;
> + }
> + status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap));
> + sub = tipc_subscrp_subscribe(net, s, conid, swap, status);
> + if (!sub)
> + return -1;
> +
> + spin_lock_bh(&con->sub_lock);
> + list_add(&sub->subscrp_list, &con->sub_list);
> + spin_unlock_bh(&con->sub_lock);
> + return 0;
> +}
> +
> static int tipc_receive_from_sock(struct tipc_conn *con)
> {
> struct tipc_server *s = con->server;
> @@ -262,9 +330,8 @@ static int tipc_receive_from_sock(struct tipc_conn *con)
> }
>
> read_lock_bh(&sk->sk_callback_lock);
> - if (test_bit(CF_CONNECTED, &con->flags))
> - ret = tipc_subscrb_rcv(sock_net(con->sock->sk), con->conid,
> - con->usr_data, buf, ret);
> + if (connected(con))
It's unnecessary to validate "con" because it's always valid here.
> + ret = tipc_con_rcv_sub(s->net, con->conid, con, buf, ret);
> read_unlock_bh(&sk->sk_callback_lock);
> kmem_cache_free(s->rcvbuf_cache, buf);
> if (ret < 0)
> @@ -302,15 +369,6 @@ static int tipc_accept_from_sock(struct tipc_conn *con)
> newcon->rx_action = tipc_receive_from_sock;
> tipc_register_callbacks(newsock, newcon);
>
> - /* Notify that new connection is incoming */
> - newcon->usr_data = tipc_subscrb_create(newcon->conid);
> -
> - if (!newcon->usr_data) {
> - sock_release(newsock);
> - conn_put(newcon);
> - return -ENOMEM;
> - }
> -
> /* Wake up receive process in case of 'SYN+' message */
> newsock->sk->sk_data_ready(newsock->sk);
> return ret;
> @@ -427,7 +485,7 @@ static void tipc_clean_outqueues(struct tipc_conn *con)
> }
>
> int tipc_conn_sendmsg(struct tipc_server *s, int conid,
> - void *data, size_t len)
> + u32 evt, void *data, size_t len)
> {
> struct outqueue_entry *e;
> struct tipc_conn *con;
> @@ -436,7 +494,7 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid,
> if (!con)
> return -EINVAL;
>
> - if (!test_bit(CF_CONNECTED, &con->flags)) {
> + if (con && !connected(con)) {
It's unnecessary to check "con" because we just validated before.
> conn_put(con);
> return 0;
> }
> @@ -446,7 +504,7 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid,
> conn_put(con);
> return -ENOMEM;
> }
> -
> + e->evt = evt;
> spin_lock_bh(&con->outqueue_lock);
> list_add_tail(&e->list, &con->outqueue);
> spin_unlock_bh(&con->outqueue_lock);
> @@ -470,10 +528,9 @@ void tipc_conn_terminate(struct tipc_server *s, int
> conid)
> bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
> u32 upper, u32 filter, int *conid)
> {
> - struct tipc_subscriber *scbr;
> struct tipc_subscr sub;
> - struct tipc_server *s;
> struct tipc_conn *con;
> + int rc;
>
> sub.seq.type = type;
> sub.seq.lower = lower;
> @@ -487,32 +544,23 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port,
> u32 type, u32 lower,
> return false;
>
> *conid = con->conid;
> - s = con->server;
> - scbr = tipc_subscrb_create(*conid);
> - if (!scbr) {
> - conn_put(con);
> - return false;
> - }
> -
> - con->usr_data = scbr;
> con->sock = NULL;
> - tipc_subscrb_rcv(net, *conid, scbr, &sub, sizeof(sub));
> - return true;
> + rc = tipc_con_rcv_sub(net, *conid, con, &sub, sizeof(sub));
> + if (rc < 0)
> + tipc_close_conn(con);
> + return !rc;
> }
>
> void tipc_topsrv_kern_unsubscr(struct net *net, int conid)
> {
> struct tipc_conn *con;
> - struct tipc_server *srv;
>
> con = tipc_conn_lookup(tipc_topsrv(net), conid);
> if (!con)
> return;
>
> test_and_clear_bit(CF_CONNECTED, &con->flags);
> - srv = con->server;
> - if (con->conid)
> - tipc_subscrb_delete(con->usr_data);
> + tipc_con_delete_sub(con, NULL);
> conn_put(con);
> conn_put(con);
> }
> @@ -537,7 +585,8 @@ static void tipc_send_kern_top_evt(struct net *net,
> struct tipc_event *evt)
>
> static void tipc_send_to_sock(struct tipc_conn *con)
> {
> - struct tipc_server *s = con->server;
> + struct list_head *queue = &con->outqueue;
> + struct tipc_server *srv = con->server;
> struct outqueue_entry *e;
> struct tipc_event *evt;
> struct msghdr msg;
> @@ -545,13 +594,19 @@ static void tipc_send_to_sock(struct tipc_conn *con)
> int ret;
>
> spin_lock_bh(&con->outqueue_lock);
> - while (test_bit(CF_CONNECTED, &con->flags)) {
> - e = list_entry(con->outqueue.next, struct outqueue_entry, list);
> - if ((struct list_head *) e == &con->outqueue)
> - break;
> +
> + while (connected(con) && !list_empty(queue)) {
It's unnecessary to validate con as it's always valid here.
> + e = list_first_entry(queue, struct outqueue_entry, list);
>
> spin_unlock_bh(&con->outqueue_lock);
>
> + if (e->evt == TIPC_SUBSCR_TIMEOUT) {
> + evt = (struct tipc_event *)e->iov.iov_base;
> + tipc_con_delete_sub(con, &evt->s);
> + }
> + memset(&msg, 0, sizeof(msg));
> + msg.msg_flags = MSG_DONTWAIT;
> +
> if (con->sock) {
Delete below two duplicated sentence because we have did above.
> memset(&msg, 0, sizeof(msg));
> msg.msg_flags = MSG_DONTWAIT;
> @@ -565,7 +620,7 @@ static void tipc_send_to_sock(struct tipc_conn *con)
> }
> } else {
> evt = e->iov.iov_base;
> - tipc_send_kern_top_evt(s->net, evt);
> + tipc_send_kern_top_evt(srv->net, evt);
> }
>
> /* Don't starve users filling buffers */
> @@ -573,7 +628,6 @@ static void tipc_send_to_sock(struct tipc_conn *con)
> cond_resched();
> count = 0;
> }
> -
> spin_lock_bh(&con->outqueue_lock);
> list_del(&e->list);
> tipc_free_entry(e);
> @@ -591,7 +645,7 @@ static void tipc_recv_work(struct work_struct *work)
> struct tipc_conn *con = container_of(work, struct tipc_conn, rwork);
> int count = 0;
>
> - while (test_bit(CF_CONNECTED, &con->flags)) {
> + while (connected(con)) {
"con" is also definitely valid in the funciton. So it's unnecesary
validate it again.
> if (con->rx_action(con))
> break;
>
> @@ -608,7 +662,7 @@ static void tipc_send_work(struct work_struct *work)
> {
> struct tipc_conn *con = container_of(work, struct tipc_conn, swork);
>
> - if (test_bit(CF_CONNECTED, &con->flags))
> + if (connected(con))
It's unnecessary to change this.
> tipc_send_to_sock(con);
>
> conn_put(con);
> diff --git a/net/tipc/server.h b/net/tipc/server.h
> index b4b83bd..fcc7232 100644
> --- a/net/tipc/server.h
> +++ b/net/tipc/server.h
> @@ -77,7 +77,7 @@ struct tipc_server {
> };
>
> int tipc_conn_sendmsg(struct tipc_server *s, int conid,
> - void *data, size_t len);
> + u32 evt, void *data, size_t len);
>
> bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
> u32 upper, u32 filter, int *conid);
> diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
> index b86fbbf..c6de145 100644
> --- a/net/tipc/subscr.c
> +++ b/net/tipc/subscr.c
> @@ -1,7 +1,7 @@
> /*
> * net/tipc/subscr.c: TIPC network topology service
> *
> - * Copyright (c) 2000-2006, Ericsson AB
> + * Copyright (c) 2000-2017, Ericsson AB
> * Copyright (c) 2005-2007, 2010-2013, Wind River Systems
> * All rights reserved.
> *
> @@ -39,22 +39,6 @@
> #include "subscr.h"
>
> /**
> - * struct tipc_subscriber - TIPC network topology subscriber
> - * @kref: reference counter to tipc_subscription object
> - * @conid: connection identifier to server connecting to subscriber
> - * @lock: control access to subscriber
> - * @subscrp_list: list of subscription objects for this subscriber
> - */
> -struct tipc_subscriber {
> - struct kref kref;
> - int conid;
> - spinlock_t lock;
> - struct list_head subscrp_list;
> -};
> -
> -static void tipc_subscrb_put(struct tipc_subscriber *subscriber);
> -
> -/**
> * htohl - convert value to endianness used by destination
> * @in: value to convert
> * @swap: non-zero if endianness must be reversed
> @@ -71,9 +55,10 @@ static void tipc_subscrp_send_event(struct
> tipc_subscription *sub,
> u32 event, u32 port_ref, u32 node)
> {
> struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
> - struct tipc_subscriber *subscriber = sub->subscriber;
> struct kvec msg_sect;
>
> + if (sub->inactive)
> + return;
> msg_sect.iov_base = (void *)&sub->evt;
> msg_sect.iov_len = sizeof(struct tipc_event);
> sub->evt.event = htohl(event, sub->swap);
> @@ -81,7 +66,7 @@ static void tipc_subscrp_send_event(struct
> tipc_subscription *sub,
> sub->evt.found_upper = htohl(found_upper, sub->swap);
> sub->evt.port.ref = htohl(port_ref, sub->swap);
> sub->evt.port.node = htohl(node, sub->swap);
> - tipc_conn_sendmsg(tn->topsrv, subscriber->conid,
> + tipc_conn_sendmsg(tn->topsrv, sub->conid, event,
> msg_sect.iov_base, msg_sect.iov_len);
> }
>
> @@ -132,41 +117,22 @@ void tipc_subscrp_report_overlap(struct
> tipc_subscription *sub, u32 found_lower,
> return;
> if (filter & TIPC_SUB_NODE_SCOPE && scope != TIPC_NODE_SCOPE)
> return;
> -
> - tipc_subscrp_send_event(sub, found_lower, found_upper, event, port_ref,
> - node);
> + spin_lock(&sub->lock);
> + tipc_subscrp_send_event(sub, found_lower, found_upper,
> + event, port_ref, node);
> + spin_unlock(&sub->lock);
> }
>
> static void tipc_subscrp_timeout(struct timer_list *t)
> {
> struct tipc_subscription *sub = from_timer(sub, t, timer);
> - struct tipc_subscriber *subscriber = sub->subscriber;
> -
> - spin_lock_bh(&subscriber->lock);
> - tipc_nametbl_unsubscribe(sub);
> - list_del(&sub->subscrp_list);
> - spin_unlock_bh(&subscriber->lock);
> + struct tipc_subscr *s = &sub->evt.s;
>
> - /* Notify subscriber of timeout */
> - tipc_subscrp_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
> + spin_lock(&sub->lock);
> + tipc_subscrp_send_event(sub, s->seq.lower, s->seq.upper,
> TIPC_SUBSCR_TIMEOUT, 0, 0);
> -
> - tipc_subscrp_put(sub);
> -}
> -
> -static void tipc_subscrb_kref_release(struct kref *kref)
> -{
> - kfree(container_of(kref,struct tipc_subscriber, kref));
> -}
> -
> -static void tipc_subscrb_put(struct tipc_subscriber *subscriber)
> -{
> - kref_put(&subscriber->kref, tipc_subscrb_kref_release);
> -}
> -
> -static void tipc_subscrb_get(struct tipc_subscriber *subscriber)
> -{
> - kref_get(&subscriber->kref);
> + sub->inactive = true;
> + spin_unlock(&sub->lock);
> }
>
> static void tipc_subscrp_kref_release(struct kref *kref)
> @@ -175,11 +141,9 @@ static void tipc_subscrp_kref_release(struct kref *kref)
> struct tipc_subscription,
> kref);
> struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
> - struct tipc_subscriber *subscriber = sub->subscriber;
>
> atomic_dec(&tn->subscription_count);
> kfree(sub);
> - tipc_subscrb_put(subscriber);
> }
>
> void tipc_subscrp_put(struct tipc_subscription *subscription)
> @@ -192,68 +156,9 @@ void tipc_subscrp_get(struct tipc_subscription
> *subscription)
> kref_get(&subscription->kref);
> }
>
> -/* tipc_subscrb_subscrp_delete - delete a specific subscription or all
> - * subscriptions for a given subscriber.
> - */
> -static void tipc_subscrb_subscrp_delete(struct tipc_subscriber *subscriber,
> - struct tipc_subscr *s)
> -{
> - struct list_head *subscription_list = &subscriber->subscrp_list;
> - struct tipc_subscription *sub, *temp;
> - u32 timeout;
> -
> - spin_lock_bh(&subscriber->lock);
> - list_for_each_entry_safe(sub, temp, subscription_list, subscrp_list) {
> - if (s && memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr)))
> - continue;
> -
> - timeout = htohl(sub->evt.s.timeout, sub->swap);
> - if (timeout == TIPC_WAIT_FOREVER || del_timer(&sub->timer)) {
> - tipc_nametbl_unsubscribe(sub);
> - list_del(&sub->subscrp_list);
> - tipc_subscrp_put(sub);
> - }
> -
> - if (s)
> - break;
> - }
> - spin_unlock_bh(&subscriber->lock);
> -}
> -
> -struct tipc_subscriber *tipc_subscrb_create(int conid)
> -{
> - struct tipc_subscriber *subscriber;
> -
> - subscriber = kzalloc(sizeof(*subscriber), GFP_ATOMIC);
> - if (!subscriber) {
> - pr_warn("Subscriber rejected, no memory\n");
> - return NULL;
> - }
> - INIT_LIST_HEAD(&subscriber->subscrp_list);
> - kref_init(&subscriber->kref);
> - subscriber->conid = conid;
> - spin_lock_init(&subscriber->lock);
> -
> - return subscriber;
> -}
> -
> -void tipc_subscrb_delete(struct tipc_subscriber *subscriber)
> -{
> - tipc_subscrb_subscrp_delete(subscriber, NULL);
> - tipc_subscrb_put(subscriber);
> -}
> -
> -static void tipc_subscrp_cancel(struct tipc_subscr *s,
> - struct tipc_subscriber *subscriber)
> -{
> - tipc_subscrb_get(subscriber);
> - tipc_subscrb_subscrp_delete(subscriber, s);
> - tipc_subscrb_put(subscriber);
> -}
> -
> static struct tipc_subscription *tipc_subscrp_create(struct net *net,
> struct tipc_subscr *s,
> - int swap)
> + int conid, bool swap)
> {
> struct tipc_net *tn = net_generic(net, tipc_net_id);
> struct tipc_subscription *sub;
> @@ -275,6 +180,8 @@ static struct tipc_subscription
> *tipc_subscrp_create(struct net *net,
>
> /* Initialize subscription object */
> sub->net = net;
> + sub->conid = conid;
> + sub->inactive = false;
> if (((filter & TIPC_SUB_PORTS) && (filter & TIPC_SUB_SERVICE)) ||
> (htohl(s->seq.lower, swap) > htohl(s->seq.upper, swap))) {
> pr_warn("Subscription rejected, illegal request\n");
> @@ -284,59 +191,39 @@ static struct tipc_subscription
> *tipc_subscrp_create(struct net *net,
>
> sub->swap = swap;
> memcpy(&sub->evt.s, s, sizeof(*s));
> + spin_lock_init(&sub->lock);
> atomic_inc(&tn->subscription_count);
> kref_init(&sub->kref);
> return sub;
> }
>
> -static int tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s,
> - struct tipc_subscriber *subscriber, int swap,
> - bool status)
> +struct tipc_subscription *tipc_subscrp_subscribe(struct net *net,
> + struct tipc_subscr *s,
> + int conid, bool swap,
> + bool status)
> {
> struct tipc_subscription *sub = NULL;
> u32 timeout;
>
> - sub = tipc_subscrp_create(net, s, swap);
> + sub = tipc_subscrp_create(net, s, conid, swap);
> if (!sub)
> - return -1;
> + return NULL;
>
> - spin_lock_bh(&subscriber->lock);
> - list_add(&sub->subscrp_list, &subscriber->subscrp_list);
> - sub->subscriber = subscriber;
> tipc_nametbl_subscribe(sub, status);
> - tipc_subscrb_get(subscriber);
> - spin_unlock_bh(&subscriber->lock);
> -
> timer_setup(&sub->timer, tipc_subscrp_timeout, 0);
> timeout = htohl(sub->evt.s.timeout, swap);
> -
> if (timeout != TIPC_WAIT_FOREVER)
> mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout));
> - return 0;
> + return sub;
> }
>
> -/* Handle one request to create a new subscription for the subscriber
> - */
> -int tipc_subscrb_rcv(struct net *net, int conid, void *usr_data,
> - void *buf, size_t len)
> +void tipc_sub_delete(struct tipc_subscription *sub)
> {
> - struct tipc_subscriber *subscriber = usr_data;
> - struct tipc_subscr *s = (struct tipc_subscr *)buf;
> - bool status;
> - int swap;
> -
> - /* Determine subscriber's endianness */
> - swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE |
> - TIPC_SUB_CANCEL));
> -
> - /* Detect & process a subscription cancellation request */
> - if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
> - s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
> - tipc_subscrp_cancel(s, subscriber);
> - return 0;
> - }
> - status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap));
> - return tipc_subscrp_subscribe(net, s, subscriber, swap, status);
> + tipc_nametbl_unsubscribe(sub);
> + if (sub->evt.s.timeout != TIPC_WAIT_FOREVER)
> + del_timer_sync(&sub->timer);
> + list_del(&sub->subscrp_list);
> + tipc_subscrp_put(sub);
> }
>
> int tipc_topsrv_start(struct net *net)
> diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
> index a736f29..cfd0cb3 100644
> --- a/net/tipc/subscr.h
> +++ b/net/tipc/subscr.h
> @@ -43,7 +43,7 @@
> #define TIPC_MAX_PUBLICATIONS 65535
>
> struct tipc_subscription;
> -struct tipc_subscriber;
> +struct tipc_conn;
>
> /**
> * struct tipc_subscription - TIPC network topology subscription object
> @@ -58,19 +58,22 @@ struct tipc_subscriber;
> */
> struct tipc_subscription {
> struct kref kref;
> - struct tipc_subscriber *subscriber;
> struct net *net;
> struct timer_list timer;
> struct list_head nameseq_list;
> struct list_head subscrp_list;
> - int swap;
> struct tipc_event evt;
> + int conid;
> + bool swap;
> + bool inactive;
> + spinlock_t lock; /* serialize up/down and timer events */
> };
>
> -struct tipc_subscriber *tipc_subscrb_create(int conid);
> -void tipc_subscrb_delete(struct tipc_subscriber *subscriber);
> -int tipc_subscrb_rcv(struct net *net, int conid, void *usr_data,
> - void *buf, size_t len);
> +struct tipc_subscription *tipc_subscrp_subscribe(struct net *net,
> + struct tipc_subscr *s,
> + int conid, bool swap,
> + bool status);
> +void tipc_sub_delete(struct tipc_subscription *sub);
> int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower,
> u32 found_upper);
> void tipc_subscrp_report_overlap(struct tipc_subscription *sub,
>
------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, Slashdot.org! http://sdm.link/slashdot
_______________________________________________
tipc-discussion mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/tipc-discussion