On 01/16/2017 11:41 PM, Parthasarathy Bhuvaragan wrote:
> Until now, the subscribers keep track of the subscriptions using
> reference count at subscriber level. At subscription cancel or
> subscriber delete, we delete the subscription by checking for
> pending timer using del_timer(). del_timer() is not SMP safe: if
> on CPU0 the check for a pending timer returns true, CPU1 might still
> schedule the timer callback, thereby deleting the subscription.
> Thus, when CPU0 is scheduled, it deletes an invalid subscription.
>
> We can fix the above issue using either a synchronous timer function
> or by introducing subscription reference count. The latter solution
> is preferred as it's consistent with the asynchronous design model
> used by the rest of the code.
>
> In this commit, we introduce subscription refcount to avoid deleting
> an invalid subscription.
>
> Signed-off-by: Parthasarathy Bhuvaragan
> <[email protected]>
> ---
> v2: Address comments from Ying Xue, to avoid race conditions by using
> refcount only instead of del_timer_sync().
> ---
> net/tipc/subscr.c | 120
> +++++++++++++++++++++++++++++-------------------------
> net/tipc/subscr.h | 1 +
> 2 files changed, 66 insertions(+), 55 deletions(-)
>
> diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
> index 0dd02244e21d..d6c1f35f6518 100644
> --- a/net/tipc/subscr.c
> +++ b/net/tipc/subscr.c
> @@ -54,6 +54,7 @@ struct tipc_subscriber {
>
> static void tipc_subscrp_delete(struct tipc_subscription *sub);
> static void tipc_subscrb_put(struct tipc_subscriber *subscriber);
> +static void tipc_subscrp_put(struct tipc_subscription *subscription);
>
> /**
> * htohl - convert value to endianness used by destination
> @@ -137,25 +138,17 @@ void tipc_subscrp_report_overlap(struct
> tipc_subscription *sub, u32 found_lower,
> static void tipc_subscrp_timeout(unsigned long data)
> {
> struct tipc_subscription *sub = (struct tipc_subscription *)data;
> - struct tipc_subscriber *subscriber = sub->subscriber;
>
> /* Notify subscriber of timeout */
> tipc_subscrp_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
> TIPC_SUBSCR_TIMEOUT, 0, 0);
>
> - spin_lock_bh(&subscriber->lock);
> - tipc_subscrp_delete(sub);
> - spin_unlock_bh(&subscriber->lock);
> -
> - tipc_subscrb_put(subscriber);
> + tipc_subscrp_put(sub);
> }
>
> static void tipc_subscrb_kref_release(struct kref *kref)
> {
> - struct tipc_subscriber *subcriber = container_of(kref,
> - struct tipc_subscriber, kref);
> -
> - kfree(subcriber);
> + kfree(container_of(kref,struct tipc_subscriber, kref));
> }
>
> static void tipc_subscrb_put(struct tipc_subscriber *subscriber)
> @@ -168,6 +161,56 @@ static void tipc_subscrb_get(struct tipc_subscriber
> *subscriber)
> kref_get(&subscriber->kref);
> }
>
> +static void tipc_subscrp_kref_release(struct kref *kref)
> +{
> + struct tipc_subscription *sub = container_of(kref,
> + struct tipc_subscription,
> + kref);
> + struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
> + struct tipc_subscriber *subscriber = sub->subscriber;
> +
> + spin_lock_bh(&subscriber->lock);
> + tipc_nametbl_unsubscribe(sub);
> + list_del(&sub->subscrp_list);
> + atomic_dec(&tn->subscription_count);
> + spin_unlock_bh(&subscriber->lock);
> + kfree(sub);
> + tipc_subscrb_put(subscriber);
> +}
> +
> +static void tipc_subscrp_put(struct tipc_subscription *subscription)
> +{
> + kref_put(&subscription->kref, tipc_subscrp_kref_release);
> +}
> +
> +static void tipc_subscrp_get(struct tipc_subscription *subscription)
> +{
> + kref_get(&subscription->kref);
> +}
> +
> +/* tipc_subscrb_subscrp_delete - delete a specific subscription or all
> + * subscriptions for a given subscriber.
> + */
> +static void tipc_subscrb_subscrp_delete(struct tipc_subscriber *subscriber,
> + struct tipc_subscr *s)
> +{
> + struct tipc_subscription *sub, *temp;
> +
> + spin_lock_bh(&subscriber->lock);
> + list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
> + subscrp_list) {
> + spin_unlock_bh(&subscriber->lock);
> + if (s && !memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) {
> + tipc_subscrp_delete(sub);
> + return;
> + } else {
> + tipc_subscrp_delete(sub);
> + }
If we change the above code as shown below, it would be simpler:
list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
subscrp_list) {
spin_unlock_bh(&subscriber->lock);
tipc_subscrp_delete(sub);
if (s && !memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr)))
return;
spin_lock_bh(&subscriber->lock);
}
Besides the commit, this version is exactly what I expected. It's very good!
Thanks for the nice job!
Please add my Acked-by tag to it.
Regards,
Ying
> + spin_lock_bh(&subscriber->lock);
> + }
> + spin_unlock_bh(&subscriber->lock);
> +}
> +
> static struct tipc_subscriber *tipc_subscrb_create(int conid)
> {
> struct tipc_subscriber *subscriber;
> @@ -177,8 +220,8 @@ static struct tipc_subscriber *tipc_subscrb_create(int
> conid)
> pr_warn("Subscriber rejected, no memory\n");
> return NULL;
> }
> - kref_init(&subscriber->kref);
> INIT_LIST_HEAD(&subscriber->subscrp_list);
> + kref_init(&subscriber->kref);
> subscriber->conid = conid;
> spin_lock_init(&subscriber->lock);
>
> @@ -187,55 +230,21 @@ static struct tipc_subscriber *tipc_subscrb_create(int
> conid)
>
> static void tipc_subscrb_delete(struct tipc_subscriber *subscriber)
> {
> - struct tipc_subscription *sub, *temp;
> - u32 timeout;
> -
> - spin_lock_bh(&subscriber->lock);
> - /* Destroy any existing subscriptions for subscriber */
> - list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
> - subscrp_list) {
> - timeout = htohl(sub->evt.s.timeout, sub->swap);
> - if ((timeout == TIPC_WAIT_FOREVER) || del_timer(&sub->timer)) {
> - tipc_subscrp_delete(sub);
> - tipc_subscrb_put(subscriber);
> - }
> - }
> - spin_unlock_bh(&subscriber->lock);
> -
> + tipc_subscrb_subscrp_delete(subscriber, NULL);
> tipc_subscrb_put(subscriber);
> }
>
> static void tipc_subscrp_delete(struct tipc_subscription *sub)
> {
> - struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
> -
> - tipc_nametbl_unsubscribe(sub);
> - list_del(&sub->subscrp_list);
> - kfree(sub);
> - atomic_dec(&tn->subscription_count);
> + if (del_timer(&sub->timer))
> + tipc_subscrp_put(sub);
> + tipc_subscrp_put(sub);
> }
>
> static void tipc_subscrp_cancel(struct tipc_subscr *s,
> struct tipc_subscriber *subscriber)
> {
> - struct tipc_subscription *sub, *temp;
> - u32 timeout;
> -
> - spin_lock_bh(&subscriber->lock);
> - /* Find first matching subscription, exit if not found */
> - list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
> - subscrp_list) {
> - if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) {
> - timeout = htohl(sub->evt.s.timeout, sub->swap);
> - if ((timeout == TIPC_WAIT_FOREVER) ||
> - del_timer(&sub->timer)) {
> - tipc_subscrp_delete(sub);
> - tipc_subscrb_put(subscriber);
> - }
> - break;
> - }
> - }
> - spin_unlock_bh(&subscriber->lock);
> + tipc_subscrb_subscrp_delete(subscriber, s);
> }
>
> static struct tipc_subscription *tipc_subscrp_create(struct net *net,
> @@ -272,6 +281,7 @@ static struct tipc_subscription
> *tipc_subscrp_create(struct net *net,
> sub->swap = swap;
> memcpy(&sub->evt.s, s, sizeof(*s));
> atomic_inc(&tn->subscription_count);
> + kref_init(&sub->kref);
> return sub;
> }
>
> @@ -288,17 +298,17 @@ static void tipc_subscrp_subscribe(struct net *net,
> struct tipc_subscr *s,
>
> spin_lock_bh(&subscriber->lock);
> list_add(&sub->subscrp_list, &subscriber->subscrp_list);
> - tipc_subscrb_get(subscriber);
> sub->subscriber = subscriber;
> tipc_nametbl_subscribe(sub);
> + tipc_subscrb_get(subscriber);
> spin_unlock_bh(&subscriber->lock);
>
> + tipc_subscrp_get(sub);
> + setup_timer(&sub->timer, tipc_subscrp_timeout, (unsigned long)sub);
> timeout = htohl(sub->evt.s.timeout, swap);
> - if (timeout == TIPC_WAIT_FOREVER)
> - return;
>
> - setup_timer(&sub->timer, tipc_subscrp_timeout, (unsigned long)sub);
> - mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout));
> + if (timeout != TIPC_WAIT_FOREVER)
> + mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout));
> }
>
> /* Handle one termination request for the subscriber */
> diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
> index be60103082c9..ffdc214c117a 100644
> --- a/net/tipc/subscr.h
> +++ b/net/tipc/subscr.h
> @@ -57,6 +57,7 @@ struct tipc_subscriber;
> * @evt: template for events generated by subscription
> */
> struct tipc_subscription {
> + struct kref kref;
> struct tipc_subscriber *subscriber;
> struct net *net;
> struct timer_list timer;
>
------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, SlashDot.org! http://sdm.link/slashdot
_______________________________________________
tipc-discussion mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/tipc-discussion