To be honest, I don't very like this solution. But I cannot think one
solution is better than this one although I tried at least the following
two methods to solve the issue.

- Introduce one timeout list to subscriber. When subscription is
expired, it will be moved to its timeout list. When name service is
deleted or inserted, we are going to release all subscriptions linked in
timeout list. After I tried to do the experiment, it's found we have to
make a big change for current code;

- In subscription's timeout handler, when subsription timeout event is
sent to server through tipc_subscrp_send_event(), we can add a new flag
in the message. When server sends the timeout message to user space send
workqueue, it can send back a TIPC_SUB_CANCEL message to subscriber
through tipc_subscrb_rcv_cb() if that new flag is set. When the
TIPC_SUB_CANCEL message arrives at subscriber, the expired subscription
can be deleted at the moment. The approach is still infeasible due to
the same reason as previous one.

In all, this solution is a bit elegant compared to above two.

If you have any suggestion, please let me know.

Thanks,
Ying

On 03/09/2017 10:21 PM, Ying Xue wrote:
> After the design policy of subscription timeout is adjusted,
> subscription is still present in name table after it's expired, and
> hence the subscriber will receive new events for this subscription
> until the subscriber terminates.
> 
> Now when subscription is expired, it will be first deleted from name
> table and its subscriber, and then schedule its work. Finally the
> subscription will be destroyed in workqueue asynchronously.
> 
> Signed-off-by: Ying Xue <ying....@windriver.com>
> ---
>  net/tipc/server.h |  2 ++
>  net/tipc/subscr.c | 32 ++++++++++++++++++++++++++++++++
>  net/tipc/subscr.h |  1 +
>  3 files changed, 35 insertions(+)
> 
> diff --git a/net/tipc/server.h b/net/tipc/server.h
> index 34f8055..7d5588f 100644
> --- a/net/tipc/server.h
> +++ b/net/tipc/server.h
> @@ -59,6 +59,7 @@
>   * @name: server name
>   * @imp: message importance
>   * @type: socket type
> + * @wq: workqueue
>   */
>  struct tipc_server {
>       struct idr conn_idr;
> @@ -78,6 +79,7 @@ struct tipc_server {
>       char name[TIPC_SERVER_NAME_LEN];
>       int imp;
>       int type;
> +     struct workqueue_struct *wq;
>  };
>  
>  int tipc_conn_sendmsg(struct tipc_server *s, int conid,
> diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
> index ea67cce..3dcb009 100644
> --- a/net/tipc/subscr.c
> +++ b/net/tipc/subscr.c
> @@ -137,14 +137,34 @@ void tipc_subscrp_report_overlap(struct 
> tipc_subscription *sub, u32 found_lower,
>  static void tipc_subscrp_timeout(unsigned long data)
>  {
>       struct tipc_subscription *sub = (struct tipc_subscription *)data;
> +     struct tipc_subscriber *subscriber = sub->subscriber;
> +     struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
>  
>       /* Notify subscriber of timeout */
>       tipc_subscrp_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
>                               TIPC_SUBSCR_TIMEOUT, 0, 0);
>  
> +     spin_lock_bh(&subscriber->lock);
> +     list_del(&sub->subscrp_list);
> +     tipc_nametbl_unsubscribe(sub);
> +     spin_unlock_bh(&subscriber->lock);
> +
> +     queue_work(tn->topsrv->wq, &sub->work);
> +
>       tipc_subscrp_put(sub);
>  }
>  
> +static void tipc_subscrp_work(struct work_struct *work)
> +{
> +     struct tipc_subscription *sub = container_of(work,
> +                                     struct tipc_subscription, work);
> +     struct tipc_subscriber *subscriber = sub->subscriber;
> +
> +     spin_lock_bh(&subscriber->lock);
> +     tipc_subscrp_delete(sub);
> +     spin_unlock_bh(&subscriber->lock);
> +}
> +
>  static void tipc_subscrb_kref_release(struct kref *kref)
>  {
>       kfree(container_of(kref,struct tipc_subscriber, kref));
> @@ -282,6 +302,9 @@ static struct tipc_subscription 
> *tipc_subscrp_create(struct net *net,
>       memcpy(&sub->evt.s, s, sizeof(*s));
>       atomic_inc(&tn->subscription_count);
>       kref_init(&sub->kref);
> +
> +     INIT_WORK(&sub->work, tipc_subscrp_work);
> +
>       return sub;
>  }
>  
> @@ -379,6 +402,13 @@ int tipc_topsrv_start(struct net *net)
>       topsrv->tipc_conn_release       = tipc_subscrb_release_cb;
>  
>       strncpy(topsrv->name, name, strlen(name) + 1);
> +     topsrv->wq = create_singlethread_workqueue(topsrv->name);
> +     if (!topsrv->wq) {
> +             kfree(saddr);
> +             kfree(topsrv);
> +             return -ENOMEM;
> +     }
> +
>       tn->topsrv = topsrv;
>       atomic_set(&tn->subscription_count, 0);
>  
> @@ -391,6 +421,8 @@ void tipc_topsrv_stop(struct net *net)
>       struct tipc_server *topsrv = tn->topsrv;
>  
>       tipc_server_stop(topsrv);
> +     flush_workqueue(topsrv->wq);
> +     destroy_workqueue(topsrv->wq);
>       kfree(topsrv->saddr);
>       kfree(topsrv);
>  }
> diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
> index ee52957..50fc64b 100644
> --- a/net/tipc/subscr.h
> +++ b/net/tipc/subscr.h
> @@ -65,6 +65,7 @@ struct tipc_subscription {
>       struct list_head subscrp_list;
>       int swap;
>       struct tipc_event evt;
> +     struct work_struct work;
>  };
>  
>  int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower,
> 

------------------------------------------------------------------------------
Announcing the Oxford Dictionaries API! The API offers world-renowned
dictionary content that is easy and intuitive to access. Sign up for an
account today to start using our lexical data to power your apps and
projects. Get started today and enter our developer competition.
http://sdm.link/oxford
_______________________________________________
tipc-discussion mailing list
tipc-discussion@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/tipc-discussion

Reply via email to