n 02/10/2018 01:07 AM, Jon Maloy wrote:
> It is unnecessary to keep two structures, struct tipc_conn and struct
> tipc_subscriber, with a one-to-one relationship and still with different
> life cycles. The fact that the two often run in different contexts, and
> still may access each other via direct pointers constitutes an additional
> hazard, something we have experienced at several occasions, and still
> see happening.
> 
> We have identified at least two remaining problems that are easier to
> fix if we simplify the topology server data structure somewhat.
> 
> - When there is a race between a subscription up/down event and a
>   timeout event, it is fully possible that the former might be delivered
>   after the latter, leading to confusion for the receiver.
> 
> - The function tipc_subcrp_timeout() is executing in interrupt context,
>   while the following call chain is at least theoretically possible:
>   tipc_subscrp_timeout()
>     tipc_subscrp_send_event()
>       tipc_conn_sendmsg()
>         conn_put()
>           tipc_conn_kref_release()
>             sock_release(sock)
> 
> Because of this, we sometimes see a warning "Trying to sleep in
> interrupt context". To eliminate this, we need to ensure that the
> tipc_conn structure and the socket, as well as the subscription
> instances, only are deleted in work queue context, i.e., after the
> timeout event really has been sent out.
> 
> We now remove this unnecessary complexity, by merging data and
> functionality of the subscriber structure into struct tipc_conn
> and the associated file server.c. We thereafter add a spinlock and
> a new 'inactive' state to the subscription structure. Using those,
> both problems described above can be easily solved.
> 
> Signed-off-by: Jon Maloy <[email protected]>
> ---
>  net/tipc/server.c | 162 +++++++++++++++++++++++++++++++++-----------------
>  net/tipc/server.h |   2 +-
>  net/tipc/subscr.c | 173 
> ++++++++++--------------------------------------------
>  net/tipc/subscr.h |  17 +++---
>  4 files changed, 149 insertions(+), 205 deletions(-)
> 
> diff --git a/net/tipc/server.c b/net/tipc/server.c
> index 8aa2a33..1706453 100644
> --- a/net/tipc/server.c
> +++ b/net/tipc/server.c
> @@ -2,6 +2,7 @@
>   * net/tipc/server.c: TIPC server infrastructure
>   *
>   * Copyright (c) 2012-2013, Wind River Systems
> + * Copyright (c) 2017, Ericsson AB
>   * All rights reserved.
>   *
>   * Redistribution and use in source and binary forms, with or without
> @@ -57,12 +58,13 @@
>   * @sock: socket handler associated with connection
>   * @flags: indicates connection state
>   * @server: pointer to connected server
> + * @sub_list: lsit to all pertaing subscriptions
> + * @sub_lock: lock protecting the subscription list
> + * @outqueue_lock: control access to the outqueue
>   * @rwork: receive work item
> - * @usr_data: user-specified field
>   * @rx_action: what to do when connection socket is active
>   * @outqueue: pointer to first outbound message in queue
>   * @outqueue_lock: control access to the outqueue
> - * @outqueue: list of connection objects for its server
>   * @swork: send work item
>   */
>  struct tipc_conn {
> @@ -71,9 +73,10 @@ struct tipc_conn {
>       struct socket *sock;
>       unsigned long flags;
>       struct tipc_server *server;
> +     struct list_head sub_list;
> +     spinlock_t sub_lock; /* for subscription list */
>       struct work_struct rwork;
>       int (*rx_action) (struct tipc_conn *con);
> -     void *usr_data;
>       struct list_head outqueue;
>       spinlock_t outqueue_lock;
>       struct work_struct swork;
> @@ -81,6 +84,7 @@ struct tipc_conn {
>  
>  /* An entry waiting to be sent */
>  struct outqueue_entry {
> +     u32 evt;
>       struct list_head list;
>       struct kvec iov;
>  };
> @@ -89,18 +93,35 @@ static void tipc_recv_work(struct work_struct *work);
>  static void tipc_send_work(struct work_struct *work);
>  static void tipc_clean_outqueues(struct tipc_conn *con);
>  
> +static bool connected(struct tipc_conn *con)
> +{
> +     if (!con)
> +             return false;
> +     return test_bit(CF_CONNECTED, &con->flags);
> +}
> +
> +/**
> + * htohl - convert value to endianness used by destination
> + * @in: value to convert
> + * @swap: non-zero if endianness must be reversed
> + *
> + * Returns converted value
> + */
> +static u32 htohl(u32 in, int swap)
> +{
> +     return swap ? swab32(in) : in;
> +}
> +
>  static void tipc_conn_kref_release(struct kref *kref)
>  {
>       struct tipc_conn *con = container_of(kref, struct tipc_conn, kref);
>       struct tipc_server *s = con->server;
>       struct socket *sock = con->sock;
> -     struct sock *sk;
>  
>       if (sock) {
> -             sk = sock->sk;
>               if (test_bit(CF_SERVER, &con->flags)) {
>                       __module_get(sock->ops->owner);
> -                     __module_get(sk->sk_prot_creator->owner);
> +                     __module_get(sock->sk->sk_prot_creator->owner);
>               }
>               sock_release(sock);
>               con->sock = NULL;
> @@ -129,11 +150,8 @@ static struct tipc_conn *tipc_conn_lookup(struct 
> tipc_server *s, int conid)
>  
>       spin_lock_bh(&s->idr_lock);
>       con = idr_find(&s->conn_idr, conid);
> -     if (con) {
> -             if (!test_bit(CF_CONNECTED, &con->flags) ||
> -                 !kref_get_unless_zero(&con->kref))
> -                     con = NULL;
> -     }
> +     if (!connected(con) || !kref_get_unless_zero(&con->kref))
> +             con = NULL;
>       spin_unlock_bh(&s->idr_lock);
>       return con;
>  }
> @@ -144,7 +162,7 @@ static void sock_data_ready(struct sock *sk)
>  
>       read_lock_bh(&sk->sk_callback_lock);
>       con = sock2con(sk);
> -     if (con && test_bit(CF_CONNECTED, &con->flags)) {
> +     if (connected(con)) {
>               conn_get(con);
>               if (!queue_work(con->server->rcv_wq, &con->rwork))
>                       conn_put(con);
> @@ -158,7 +176,7 @@ static void sock_write_space(struct sock *sk)
>  
>       read_lock_bh(&sk->sk_callback_lock);
>       con = sock2con(sk);
> -     if (con && test_bit(CF_CONNECTED, &con->flags)) {
> +     if (connected(con)) {
>               conn_get(con);
>               if (!queue_work(con->server->send_wq, &con->swork))
>                       conn_put(con);
> @@ -181,6 +199,24 @@ static void tipc_register_callbacks(struct socket *sock, 
> struct tipc_conn *con)
>       write_unlock_bh(&sk->sk_callback_lock);
>  }
>  
> +/* tipc_con_delete_sub - delete a specific or all subscriptions
> + *  for a given subscriber
> + */
> +static void tipc_con_delete_sub(struct tipc_conn *con, struct tipc_subscr *s)
> +{
> +     struct list_head *sub_list = &con->sub_list;
> +     struct tipc_subscription *sub, *tmp;
> +
> +     spin_lock_bh(&con->sub_lock);
> +     list_for_each_entry_safe(sub, tmp, sub_list, subscrp_list) {
> +             if (!s || !memcmp(s, &sub->evt.s, sizeof(*s)))
> +                     tipc_sub_delete(sub);
> +             else if (s)
> +                     break;
> +     }
> +     spin_unlock_bh(&con->sub_lock);
> +}
> +
>  static void tipc_close_conn(struct tipc_conn *con)
>  {
>       struct sock *sk = con->sock->sk;
> @@ -188,10 +224,11 @@ static void tipc_close_conn(struct tipc_conn *con)
>  
>       write_lock_bh(&sk->sk_callback_lock);
>       disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags);
> +
>       if (disconnect) {
>               sk->sk_user_data = NULL;
>               if (con->conid)
> -                     tipc_subscrb_delete(con->usr_data);
> +                     tipc_con_delete_sub(con, NULL);
>       }
>       write_unlock_bh(&sk->sk_callback_lock);
>  
> @@ -215,7 +252,9 @@ static struct tipc_conn *tipc_alloc_conn(struct 
> tipc_server *s)
>  
>       kref_init(&con->kref);
>       INIT_LIST_HEAD(&con->outqueue);
> +     INIT_LIST_HEAD(&con->sub_list);
>       spin_lock_init(&con->outqueue_lock);
> +     spin_lock_init(&con->sub_lock);
>       INIT_WORK(&con->swork, tipc_send_work);
>       INIT_WORK(&con->rwork, tipc_recv_work);
>  
> @@ -236,6 +275,35 @@ static struct tipc_conn *tipc_alloc_conn(struct 
> tipc_server *s)
>       return con;
>  }
>  
> +int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con,
> +                  void *buf, size_t len)
> +{
> +     struct tipc_subscr *s = (struct tipc_subscr *)buf;
> +     struct tipc_subscription *sub;
> +     bool status;
> +     int swap;
> +
> +     /* Determine subscriber's endianness */
> +     swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE |
> +                           TIPC_SUB_CANCEL));
> +
> +     /* Detect & process a subscription cancellation request */
> +     if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
> +             s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
> +             tipc_con_delete_sub(con, s);
> +             return 0;
> +     }
> +     status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap));
> +     sub = tipc_subscrp_subscribe(net, s, conid, swap, status);
> +     if (!sub)
> +             return -1;
> +
> +     spin_lock_bh(&con->sub_lock);
> +     list_add(&sub->subscrp_list, &con->sub_list);
> +     spin_unlock_bh(&con->sub_lock);
> +     return 0;
> +}
> +
>  static int tipc_receive_from_sock(struct tipc_conn *con)
>  {
>       struct tipc_server *s = con->server;
> @@ -262,9 +330,8 @@ static int tipc_receive_from_sock(struct tipc_conn *con)
>       }
>  
>       read_lock_bh(&sk->sk_callback_lock);
> -     if (test_bit(CF_CONNECTED, &con->flags))
> -             ret = tipc_subscrb_rcv(sock_net(con->sock->sk), con->conid,
> -                                    con->usr_data, buf, ret);
> +     if (connected(con))

It's unnecessary to validate "con" because it's always valid here.

> +             ret = tipc_con_rcv_sub(s->net, con->conid, con, buf, ret);
>       read_unlock_bh(&sk->sk_callback_lock);
>       kmem_cache_free(s->rcvbuf_cache, buf);
>       if (ret < 0)
> @@ -302,15 +369,6 @@ static int tipc_accept_from_sock(struct tipc_conn *con)
>       newcon->rx_action = tipc_receive_from_sock;
>       tipc_register_callbacks(newsock, newcon);
>  
> -     /* Notify that new connection is incoming */
> -     newcon->usr_data = tipc_subscrb_create(newcon->conid);
> -
> -     if (!newcon->usr_data) {
> -             sock_release(newsock);
> -             conn_put(newcon);
> -             return -ENOMEM;
> -     }
> -
>       /* Wake up receive process in case of 'SYN+' message */
>       newsock->sk->sk_data_ready(newsock->sk);
>       return ret;
> @@ -427,7 +485,7 @@ static void tipc_clean_outqueues(struct tipc_conn *con)
>  }
>  
>  int tipc_conn_sendmsg(struct tipc_server *s, int conid,
> -                   void *data, size_t len)
> +                   u32 evt, void *data, size_t len)
>  {
>       struct outqueue_entry *e;
>       struct tipc_conn *con;
> @@ -436,7 +494,7 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid,
>       if (!con)
>               return -EINVAL;
>  
> -     if (!test_bit(CF_CONNECTED, &con->flags)) {
> +     if (con && !connected(con)) {

It's unnecessary to check "con" because we just validated before.

>               conn_put(con);
>               return 0;
>       }
> @@ -446,7 +504,7 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid,
>               conn_put(con);
>               return -ENOMEM;
>       }
> -
> +     e->evt = evt;
>       spin_lock_bh(&con->outqueue_lock);
>       list_add_tail(&e->list, &con->outqueue);
>       spin_unlock_bh(&con->outqueue_lock);
> @@ -470,10 +528,9 @@ void tipc_conn_terminate(struct tipc_server *s, int 
> conid)
>  bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
>                            u32 upper, u32 filter, int *conid)
>  {
> -     struct tipc_subscriber *scbr;
>       struct tipc_subscr sub;
> -     struct tipc_server *s;
>       struct tipc_conn *con;
> +     int rc;
>  
>       sub.seq.type = type;
>       sub.seq.lower = lower;
> @@ -487,32 +544,23 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port, 
> u32 type, u32 lower,
>               return false;
>  
>       *conid = con->conid;
> -     s = con->server;
> -     scbr = tipc_subscrb_create(*conid);
> -     if (!scbr) {
> -             conn_put(con);
> -             return false;
> -     }
> -
> -     con->usr_data = scbr;
>       con->sock = NULL;
> -     tipc_subscrb_rcv(net, *conid, scbr, &sub, sizeof(sub));
> -     return true;
> +     rc = tipc_con_rcv_sub(net, *conid, con, &sub, sizeof(sub));
> +     if (rc < 0)
> +             tipc_close_conn(con);
> +     return !rc;
>  }
>  
>  void tipc_topsrv_kern_unsubscr(struct net *net, int conid)
>  {
>       struct tipc_conn *con;
> -     struct tipc_server *srv;
>  
>       con = tipc_conn_lookup(tipc_topsrv(net), conid);
>       if (!con)
>               return;
>  
>       test_and_clear_bit(CF_CONNECTED, &con->flags);
> -     srv = con->server;
> -     if (con->conid)
> -             tipc_subscrb_delete(con->usr_data);
> +     tipc_con_delete_sub(con, NULL);
>       conn_put(con);
>       conn_put(con);
>  }
> @@ -537,7 +585,8 @@ static void tipc_send_kern_top_evt(struct net *net, 
> struct tipc_event *evt)
>  
>  static void tipc_send_to_sock(struct tipc_conn *con)
>  {
> -     struct tipc_server *s = con->server;
> +     struct list_head *queue = &con->outqueue;
> +     struct tipc_server *srv = con->server;
>       struct outqueue_entry *e;
>       struct tipc_event *evt;
>       struct msghdr msg;
> @@ -545,13 +594,19 @@ static void tipc_send_to_sock(struct tipc_conn *con)
>       int ret;
>  
>       spin_lock_bh(&con->outqueue_lock);
> -     while (test_bit(CF_CONNECTED, &con->flags)) {
> -             e = list_entry(con->outqueue.next, struct outqueue_entry, list);
> -             if ((struct list_head *) e == &con->outqueue)
> -                     break;
> +
> +     while (connected(con) && !list_empty(queue)) {

It's unnecessary to validate con as it's always valid here.

> +             e = list_first_entry(queue, struct outqueue_entry, list);
>  
>               spin_unlock_bh(&con->outqueue_lock);
>  
> +             if (e->evt == TIPC_SUBSCR_TIMEOUT) {
> +                     evt = (struct tipc_event *)e->iov.iov_base;
> +                     tipc_con_delete_sub(con, &evt->s);
> +             }
> +             memset(&msg, 0, sizeof(msg));
> +             msg.msg_flags = MSG_DONTWAIT;
> +
>               if (con->sock) {

Delete below two duplicated sentence because we have did above.


>                       memset(&msg, 0, sizeof(msg));
>                       msg.msg_flags = MSG_DONTWAIT;
> @@ -565,7 +620,7 @@ static void tipc_send_to_sock(struct tipc_conn *con)
>                       }
>               } else {
>                       evt = e->iov.iov_base;
> -                     tipc_send_kern_top_evt(s->net, evt);
> +                     tipc_send_kern_top_evt(srv->net, evt);
>               }
>  
>               /* Don't starve users filling buffers */
> @@ -573,7 +628,6 @@ static void tipc_send_to_sock(struct tipc_conn *con)
>                       cond_resched();
>                       count = 0;
>               }
> -
>               spin_lock_bh(&con->outqueue_lock);
>               list_del(&e->list);
>               tipc_free_entry(e);
> @@ -591,7 +645,7 @@ static void tipc_recv_work(struct work_struct *work)
>       struct tipc_conn *con = container_of(work, struct tipc_conn, rwork);
>       int count = 0;
>  
> -     while (test_bit(CF_CONNECTED, &con->flags)) {
> +     while (connected(con)) {

"con" is also definitely valid in the funciton. So it's unnecesary
validate it again.

>               if (con->rx_action(con))
>                       break;
>  
> @@ -608,7 +662,7 @@ static void tipc_send_work(struct work_struct *work)
>  {
>       struct tipc_conn *con = container_of(work, struct tipc_conn, swork);
>  
> -     if (test_bit(CF_CONNECTED, &con->flags))
> +     if (connected(con))

It's unnecessary to change this.

>               tipc_send_to_sock(con);
>  
>       conn_put(con);
> diff --git a/net/tipc/server.h b/net/tipc/server.h
> index b4b83bd..fcc7232 100644
> --- a/net/tipc/server.h
> +++ b/net/tipc/server.h
> @@ -77,7 +77,7 @@ struct tipc_server {
>  };
>  
>  int tipc_conn_sendmsg(struct tipc_server *s, int conid,
> -                   void *data, size_t len);
> +                   u32 evt, void *data, size_t len);
>  
>  bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
>                            u32 upper, u32 filter, int *conid);
> diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
> index b86fbbf..c6de145 100644
> --- a/net/tipc/subscr.c
> +++ b/net/tipc/subscr.c
> @@ -1,7 +1,7 @@
>  /*
>   * net/tipc/subscr.c: TIPC network topology service
>   *
> - * Copyright (c) 2000-2006, Ericsson AB
> + * Copyright (c) 2000-2017, Ericsson AB
>   * Copyright (c) 2005-2007, 2010-2013, Wind River Systems
>   * All rights reserved.
>   *
> @@ -39,22 +39,6 @@
>  #include "subscr.h"
>  
>  /**
> - * struct tipc_subscriber - TIPC network topology subscriber
> - * @kref: reference counter to tipc_subscription object
> - * @conid: connection identifier to server connecting to subscriber
> - * @lock: control access to subscriber
> - * @subscrp_list: list of subscription objects for this subscriber
> - */
> -struct tipc_subscriber {
> -     struct kref kref;
> -     int conid;
> -     spinlock_t lock;
> -     struct list_head subscrp_list;
> -};
> -
> -static void tipc_subscrb_put(struct tipc_subscriber *subscriber);
> -
> -/**
>   * htohl - convert value to endianness used by destination
>   * @in: value to convert
>   * @swap: non-zero if endianness must be reversed
> @@ -71,9 +55,10 @@ static void tipc_subscrp_send_event(struct 
> tipc_subscription *sub,
>                                   u32 event, u32 port_ref, u32 node)
>  {
>       struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
> -     struct tipc_subscriber *subscriber = sub->subscriber;
>       struct kvec msg_sect;
>  
> +     if (sub->inactive)
> +             return;
>       msg_sect.iov_base = (void *)&sub->evt;
>       msg_sect.iov_len = sizeof(struct tipc_event);
>       sub->evt.event = htohl(event, sub->swap);
> @@ -81,7 +66,7 @@ static void tipc_subscrp_send_event(struct 
> tipc_subscription *sub,
>       sub->evt.found_upper = htohl(found_upper, sub->swap);
>       sub->evt.port.ref = htohl(port_ref, sub->swap);
>       sub->evt.port.node = htohl(node, sub->swap);
> -     tipc_conn_sendmsg(tn->topsrv, subscriber->conid,
> +     tipc_conn_sendmsg(tn->topsrv, sub->conid, event,
>                         msg_sect.iov_base, msg_sect.iov_len);
>  }
>  
> @@ -132,41 +117,22 @@ void tipc_subscrp_report_overlap(struct 
> tipc_subscription *sub, u32 found_lower,
>               return;
>       if (filter & TIPC_SUB_NODE_SCOPE && scope != TIPC_NODE_SCOPE)
>               return;
> -
> -     tipc_subscrp_send_event(sub, found_lower, found_upper, event, port_ref,
> -                             node);
> +     spin_lock(&sub->lock);
> +     tipc_subscrp_send_event(sub, found_lower, found_upper,
> +                             event, port_ref, node);
> +     spin_unlock(&sub->lock);
>  }
>  
>  static void tipc_subscrp_timeout(struct timer_list *t)
>  {
>       struct tipc_subscription *sub = from_timer(sub, t, timer);
> -     struct tipc_subscriber *subscriber = sub->subscriber;
> -
> -     spin_lock_bh(&subscriber->lock);
> -     tipc_nametbl_unsubscribe(sub);
> -     list_del(&sub->subscrp_list);
> -     spin_unlock_bh(&subscriber->lock);
> +     struct tipc_subscr *s = &sub->evt.s;
>  
> -     /* Notify subscriber of timeout */
> -     tipc_subscrp_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
> +     spin_lock(&sub->lock);
> +     tipc_subscrp_send_event(sub, s->seq.lower, s->seq.upper,
>                               TIPC_SUBSCR_TIMEOUT, 0, 0);
> -
> -     tipc_subscrp_put(sub);
> -}
> -
> -static void tipc_subscrb_kref_release(struct kref *kref)
> -{
> -     kfree(container_of(kref,struct tipc_subscriber, kref));
> -}
> -
> -static void tipc_subscrb_put(struct tipc_subscriber *subscriber)
> -{
> -     kref_put(&subscriber->kref, tipc_subscrb_kref_release);
> -}
> -
> -static void tipc_subscrb_get(struct tipc_subscriber *subscriber)
> -{
> -     kref_get(&subscriber->kref);
> +     sub->inactive = true;
> +     spin_unlock(&sub->lock);
>  }
>  
>  static void tipc_subscrp_kref_release(struct kref *kref)
> @@ -175,11 +141,9 @@ static void tipc_subscrp_kref_release(struct kref *kref)
>                                                    struct tipc_subscription,
>                                                    kref);
>       struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
> -     struct tipc_subscriber *subscriber = sub->subscriber;
>  
>       atomic_dec(&tn->subscription_count);
>       kfree(sub);
> -     tipc_subscrb_put(subscriber);
>  }
>  
>  void tipc_subscrp_put(struct tipc_subscription *subscription)
> @@ -192,68 +156,9 @@ void tipc_subscrp_get(struct tipc_subscription 
> *subscription)
>       kref_get(&subscription->kref);
>  }
>  
> -/* tipc_subscrb_subscrp_delete - delete a specific subscription or all
> - * subscriptions for a given subscriber.
> - */
> -static void tipc_subscrb_subscrp_delete(struct tipc_subscriber *subscriber,
> -                                     struct tipc_subscr *s)
> -{
> -     struct list_head *subscription_list = &subscriber->subscrp_list;
> -     struct tipc_subscription *sub, *temp;
> -     u32 timeout;
> -
> -     spin_lock_bh(&subscriber->lock);
> -     list_for_each_entry_safe(sub, temp, subscription_list,  subscrp_list) {
> -             if (s && memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr)))
> -                     continue;
> -
> -             timeout = htohl(sub->evt.s.timeout, sub->swap);
> -             if (timeout == TIPC_WAIT_FOREVER || del_timer(&sub->timer)) {
> -                     tipc_nametbl_unsubscribe(sub);
> -                     list_del(&sub->subscrp_list);
> -                     tipc_subscrp_put(sub);
> -             }
> -
> -             if (s)
> -                     break;
> -     }
> -     spin_unlock_bh(&subscriber->lock);
> -}
> -
> -struct tipc_subscriber *tipc_subscrb_create(int conid)
> -{
> -     struct tipc_subscriber *subscriber;
> -
> -     subscriber = kzalloc(sizeof(*subscriber), GFP_ATOMIC);
> -     if (!subscriber) {
> -             pr_warn("Subscriber rejected, no memory\n");
> -             return NULL;
> -     }
> -     INIT_LIST_HEAD(&subscriber->subscrp_list);
> -     kref_init(&subscriber->kref);
> -     subscriber->conid = conid;
> -     spin_lock_init(&subscriber->lock);
> -
> -     return subscriber;
> -}
> -
> -void tipc_subscrb_delete(struct tipc_subscriber *subscriber)
> -{
> -     tipc_subscrb_subscrp_delete(subscriber, NULL);
> -     tipc_subscrb_put(subscriber);
> -}
> -
> -static void tipc_subscrp_cancel(struct tipc_subscr *s,
> -                             struct tipc_subscriber *subscriber)
> -{
> -     tipc_subscrb_get(subscriber);
> -     tipc_subscrb_subscrp_delete(subscriber, s);
> -     tipc_subscrb_put(subscriber);
> -}
> -
>  static struct tipc_subscription *tipc_subscrp_create(struct net *net,
>                                                    struct tipc_subscr *s,
> -                                                  int swap)
> +                                                  int conid, bool swap)
>  {
>       struct tipc_net *tn = net_generic(net, tipc_net_id);
>       struct tipc_subscription *sub;
> @@ -275,6 +180,8 @@ static struct tipc_subscription 
> *tipc_subscrp_create(struct net *net,
>  
>       /* Initialize subscription object */
>       sub->net = net;
> +     sub->conid = conid;
> +     sub->inactive = false;
>       if (((filter & TIPC_SUB_PORTS) && (filter & TIPC_SUB_SERVICE)) ||
>           (htohl(s->seq.lower, swap) > htohl(s->seq.upper, swap))) {
>               pr_warn("Subscription rejected, illegal request\n");
> @@ -284,59 +191,39 @@ static struct tipc_subscription 
> *tipc_subscrp_create(struct net *net,
>  
>       sub->swap = swap;
>       memcpy(&sub->evt.s, s, sizeof(*s));
> +     spin_lock_init(&sub->lock);
>       atomic_inc(&tn->subscription_count);
>       kref_init(&sub->kref);
>       return sub;
>  }
>  
> -static int tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s,
> -                               struct tipc_subscriber *subscriber, int swap,
> -                               bool status)
> +struct tipc_subscription *tipc_subscrp_subscribe(struct net *net,
> +                                              struct tipc_subscr *s,
> +                                              int conid, bool swap,
> +                                              bool status)
>  {
>       struct tipc_subscription *sub = NULL;
>       u32 timeout;
>  
> -     sub = tipc_subscrp_create(net, s, swap);
> +     sub = tipc_subscrp_create(net, s, conid, swap);
>       if (!sub)
> -             return -1;
> +             return NULL;
>  
> -     spin_lock_bh(&subscriber->lock);
> -     list_add(&sub->subscrp_list, &subscriber->subscrp_list);
> -     sub->subscriber = subscriber;
>       tipc_nametbl_subscribe(sub, status);
> -     tipc_subscrb_get(subscriber);
> -     spin_unlock_bh(&subscriber->lock);
> -
>       timer_setup(&sub->timer, tipc_subscrp_timeout, 0);
>       timeout = htohl(sub->evt.s.timeout, swap);
> -
>       if (timeout != TIPC_WAIT_FOREVER)
>               mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout));
> -     return 0;
> +     return sub;
>  }
>  
> -/* Handle one request to create a new subscription for the subscriber
> - */
> -int tipc_subscrb_rcv(struct net *net, int conid, void *usr_data,
> -                  void *buf, size_t len)
> +void tipc_sub_delete(struct tipc_subscription *sub)
>  {
> -     struct tipc_subscriber *subscriber = usr_data;
> -     struct tipc_subscr *s = (struct tipc_subscr *)buf;
> -     bool status;
> -     int swap;
> -
> -     /* Determine subscriber's endianness */
> -     swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE |
> -                           TIPC_SUB_CANCEL));
> -
> -     /* Detect & process a subscription cancellation request */
> -     if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
> -             s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
> -             tipc_subscrp_cancel(s, subscriber);
> -             return 0;
> -     }
> -     status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap));
> -     return tipc_subscrp_subscribe(net, s, subscriber, swap, status);
> +     tipc_nametbl_unsubscribe(sub);
> +     if (sub->evt.s.timeout != TIPC_WAIT_FOREVER)
> +             del_timer_sync(&sub->timer);
> +     list_del(&sub->subscrp_list);
> +     tipc_subscrp_put(sub);
>  }
>  
>  int tipc_topsrv_start(struct net *net)
> diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
> index a736f29..cfd0cb3 100644
> --- a/net/tipc/subscr.h
> +++ b/net/tipc/subscr.h
> @@ -43,7 +43,7 @@
>  #define TIPC_MAX_PUBLICATIONS        65535
>  
>  struct tipc_subscription;
> -struct tipc_subscriber;
> +struct tipc_conn;
>  
>  /**
>   * struct tipc_subscription - TIPC network topology subscription object
> @@ -58,19 +58,22 @@ struct tipc_subscriber;
>   */
>  struct tipc_subscription {
>       struct kref kref;
> -     struct tipc_subscriber *subscriber;
>       struct net *net;
>       struct timer_list timer;
>       struct list_head nameseq_list;
>       struct list_head subscrp_list;
> -     int swap;
>       struct tipc_event evt;
> +     int conid;
> +     bool swap;
> +     bool inactive;
> +     spinlock_t lock; /* serialize up/down and timer events */
>  };
>  
> -struct tipc_subscriber *tipc_subscrb_create(int conid);
> -void tipc_subscrb_delete(struct tipc_subscriber *subscriber);
> -int tipc_subscrb_rcv(struct net *net, int conid, void *usr_data,
> -                  void *buf, size_t len);
> +struct tipc_subscription *tipc_subscrp_subscribe(struct net *net,
> +                                              struct tipc_subscr *s,
> +                                              int conid, bool swap,
> +                                              bool status);
> +void tipc_sub_delete(struct tipc_subscription *sub);
>  int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower,
>                              u32 found_upper);
>  void tipc_subscrp_report_overlap(struct tipc_subscription *sub,
> 

------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, Slashdot.org! http://sdm.link/slashdot
_______________________________________________
tipc-discussion mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/tipc-discussion

Reply via email to