On 01/16/2017 11:41 PM, Parthasarathy Bhuvaragan wrote:
> Until now, the subscribers keep track of the subscriptions using
> reference count at subscriber level. At subscription cancel or
> subscriber delete, we delete the subscription by checking for
> pending timer using del_timer(). del_timer() is not SMP safe, if
> on CPU0 the check for pending timer returns true but CPU1 might
> schedule the timer callback thereby deleting the subscription.
> Thus when CPU0 is scheduled, it deletes an invalid subscription.
>
> We can fix the above issue using either a synchronous timer function
> or by introducing subscription reference count. The later solution
> is preferred as it's consistent with the asynchronous design model
> used by the rest of the code.
>
> In this commit, we introduce subscription refcount to avoid deleting
> an invalid subscription.
>
> Signed-off-by: Parthasarathy Bhuvaragan 
> <parthasarathy.bhuvara...@ericsson.com>
> ---
> v2: Address comments from Ying Xue, to avoid race conditions by using
>     refcount only instead of del_timer_sync().
> ---
>  net/tipc/subscr.c | 120 
> +++++++++++++++++++++++++++++-------------------------
>  net/tipc/subscr.h |   1 +
>  2 files changed, 66 insertions(+), 55 deletions(-)
>
> diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
> index 0dd02244e21d..d6c1f35f6518 100644
> --- a/net/tipc/subscr.c
> +++ b/net/tipc/subscr.c
> @@ -54,6 +54,7 @@ struct tipc_subscriber {
>
>  static void tipc_subscrp_delete(struct tipc_subscription *sub);
>  static void tipc_subscrb_put(struct tipc_subscriber *subscriber);
> +static void tipc_subscrp_put(struct tipc_subscription *subscription);
>
>  /**
>   * htohl - convert value to endianness used by destination
> @@ -137,25 +138,17 @@ void tipc_subscrp_report_overlap(struct 
> tipc_subscription *sub, u32 found_lower,
>  static void tipc_subscrp_timeout(unsigned long data)
>  {
>       struct tipc_subscription *sub = (struct tipc_subscription *)data;
> -     struct tipc_subscriber *subscriber = sub->subscriber;
>
>       /* Notify subscriber of timeout */
>       tipc_subscrp_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
>                               TIPC_SUBSCR_TIMEOUT, 0, 0);
>
> -     spin_lock_bh(&subscriber->lock);
> -     tipc_subscrp_delete(sub);
> -     spin_unlock_bh(&subscriber->lock);
> -
> -     tipc_subscrb_put(subscriber);
> +     tipc_subscrp_put(sub);
>  }
>
>  static void tipc_subscrb_kref_release(struct kref *kref)
>  {
> -     struct tipc_subscriber *subcriber = container_of(kref,
> -                                         struct tipc_subscriber, kref);
> -
> -     kfree(subcriber);
> +     kfree(container_of(kref,struct tipc_subscriber, kref));
>  }
>
>  static void tipc_subscrb_put(struct tipc_subscriber *subscriber)
> @@ -168,6 +161,56 @@ static void tipc_subscrb_get(struct tipc_subscriber 
> *subscriber)
>       kref_get(&subscriber->kref);
>  }
>
> +static void tipc_subscrp_kref_release(struct kref *kref)
> +{
> +     struct tipc_subscription *sub = container_of(kref,
> +                                                  struct tipc_subscription,
> +                                                  kref);
> +     struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
> +     struct tipc_subscriber *subscriber = sub->subscriber;
> +
> +     spin_lock_bh(&subscriber->lock);
> +     tipc_nametbl_unsubscribe(sub);
> +     list_del(&sub->subscrp_list);
> +     atomic_dec(&tn->subscription_count);
> +     spin_unlock_bh(&subscriber->lock);
> +     kfree(sub);
> +     tipc_subscrb_put(subscriber);
> +}
> +
> +static void tipc_subscrp_put(struct tipc_subscription *subscription)
> +{
> +     kref_put(&subscription->kref, tipc_subscrp_kref_release);
> +}
> +
> +static void tipc_subscrp_get(struct tipc_subscription *subscription)
> +{
> +     kref_get(&subscription->kref);
> +}
> +
> +/* tipc_subscrb_subscrp_delete - delete a specific subscription or all
> + * subscriptions for a given subscriber.
> + */
> +static void tipc_subscrb_subscrp_delete(struct tipc_subscriber *subscriber,
> +                                     struct tipc_subscr *s)
> +{
> +     struct tipc_subscription *sub, *temp;
> +
> +     spin_lock_bh(&subscriber->lock);
> +     list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
> +                              subscrp_list) {
> +             spin_unlock_bh(&subscriber->lock);
> +             if (s && !memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) {
> +                     tipc_subscrp_delete(sub);
> +                     return;
> +             } else {
> +                     tipc_subscrp_delete(sub);
> +             }

If we change above code as below, it seems more simple:

list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list, 
subscrp_list) {
     spin_unlock_bh(&subscriber->lock);
     tipc_subscrp_delete(sub);
     if (s && !memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr)))
         return;
     spin_lock_bh(&subscriber->lock);
}


Besides the commit,this version is exactly what I expect. It's very good!

Thanks for the nice job!

Please add my ack-by flag to it.

Regards,
Ying

> +             spin_lock_bh(&subscriber->lock);
> +     }
> +     spin_unlock_bh(&subscriber->lock);
> +}
> +
>  static struct tipc_subscriber *tipc_subscrb_create(int conid)
>  {
>       struct tipc_subscriber *subscriber;
> @@ -177,8 +220,8 @@ static struct tipc_subscriber *tipc_subscrb_create(int 
> conid)
>               pr_warn("Subscriber rejected, no memory\n");
>               return NULL;
>       }
> -     kref_init(&subscriber->kref);
>       INIT_LIST_HEAD(&subscriber->subscrp_list);
> +     kref_init(&subscriber->kref);
>       subscriber->conid = conid;
>       spin_lock_init(&subscriber->lock);
>
> @@ -187,55 +230,21 @@ static struct tipc_subscriber *tipc_subscrb_create(int 
> conid)
>
>  static void tipc_subscrb_delete(struct tipc_subscriber *subscriber)
>  {
> -     struct tipc_subscription *sub, *temp;
> -     u32 timeout;
> -
> -     spin_lock_bh(&subscriber->lock);
> -     /* Destroy any existing subscriptions for subscriber */
> -     list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
> -                              subscrp_list) {
> -             timeout = htohl(sub->evt.s.timeout, sub->swap);
> -             if ((timeout == TIPC_WAIT_FOREVER) || del_timer(&sub->timer)) {
> -                     tipc_subscrp_delete(sub);
> -                     tipc_subscrb_put(subscriber);
> -             }
> -     }
> -     spin_unlock_bh(&subscriber->lock);
> -
> +     tipc_subscrb_subscrp_delete(subscriber, NULL);
>       tipc_subscrb_put(subscriber);
>  }
>
>  static void tipc_subscrp_delete(struct tipc_subscription *sub)
>  {
> -     struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
> -
> -     tipc_nametbl_unsubscribe(sub);
> -     list_del(&sub->subscrp_list);
> -     kfree(sub);
> -     atomic_dec(&tn->subscription_count);
> +     if (del_timer(&sub->timer))
> +             tipc_subscrp_put(sub);
> +     tipc_subscrp_put(sub);
>  }
>
>  static void tipc_subscrp_cancel(struct tipc_subscr *s,
>                               struct tipc_subscriber *subscriber)
>  {
> -     struct tipc_subscription *sub, *temp;
> -     u32 timeout;
> -
> -     spin_lock_bh(&subscriber->lock);
> -     /* Find first matching subscription, exit if not found */
> -     list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
> -                              subscrp_list) {
> -             if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) {
> -                     timeout = htohl(sub->evt.s.timeout, sub->swap);
> -                     if ((timeout == TIPC_WAIT_FOREVER) ||
> -                         del_timer(&sub->timer)) {
> -                             tipc_subscrp_delete(sub);
> -                             tipc_subscrb_put(subscriber);
> -                     }
> -                     break;
> -             }
> -     }
> -     spin_unlock_bh(&subscriber->lock);
> +     tipc_subscrb_subscrp_delete(subscriber, s);
>  }
>
>  static struct tipc_subscription *tipc_subscrp_create(struct net *net,
> @@ -272,6 +281,7 @@ static struct tipc_subscription 
> *tipc_subscrp_create(struct net *net,
>       sub->swap = swap;
>       memcpy(&sub->evt.s, s, sizeof(*s));
>       atomic_inc(&tn->subscription_count);
> +     kref_init(&sub->kref);
>       return sub;
>  }
>
> @@ -288,17 +298,17 @@ static void tipc_subscrp_subscribe(struct net *net, 
> struct tipc_subscr *s,
>
>       spin_lock_bh(&subscriber->lock);
>       list_add(&sub->subscrp_list, &subscriber->subscrp_list);
> -     tipc_subscrb_get(subscriber);
>       sub->subscriber = subscriber;
>       tipc_nametbl_subscribe(sub);
> +     tipc_subscrb_get(subscriber);
>       spin_unlock_bh(&subscriber->lock);
>
> +     tipc_subscrp_get(sub);
> +     setup_timer(&sub->timer, tipc_subscrp_timeout, (unsigned long)sub);
>       timeout = htohl(sub->evt.s.timeout, swap);
> -     if (timeout == TIPC_WAIT_FOREVER)
> -             return;
>
> -     setup_timer(&sub->timer, tipc_subscrp_timeout, (unsigned long)sub);
> -     mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout));
> +     if (timeout != TIPC_WAIT_FOREVER)
> +             mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout));
>  }
>
>  /* Handle one termination request for the subscriber */
> diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
> index be60103082c9..ffdc214c117a 100644
> --- a/net/tipc/subscr.h
> +++ b/net/tipc/subscr.h
> @@ -57,6 +57,7 @@ struct tipc_subscriber;
>   * @evt: template for events generated by subscription
>   */
>  struct tipc_subscription {
> +     struct kref kref;
>       struct tipc_subscriber *subscriber;
>       struct net *net;
>       struct timer_list timer;
>


------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, SlashDot.org! http://sdm.link/slashdot
_______________________________________________
tipc-discussion mailing list
tipc-discussion@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/tipc-discussion

Reply via email to