On 01/16/2017 11:41 PM, Parthasarathy Bhuvaragan wrote: > Until now, the subscribers keep track of the subscriptions using > reference count at subscriber level. At subscription cancel or > subscriber delete, we delete the subscription by checking for > pending timer using del_timer(). del_timer() is not SMP safe, if > on CPU0 the check for pending timer returns true but CPU1 might > schedule the timer callback thereby deleting the subscription. > Thus when CPU0 is scheduled, it deletes an invalid subscription. > > We can fix the above issue using either a synchronous timer function > or by introducing subscription reference count. The later solution > is preferred as it's consistent with the asynchronous design model > used by the rest of the code. > > In this commit, we introduce subscription refcount to avoid deleting > an invalid subscription. > > Signed-off-by: Parthasarathy Bhuvaragan > <parthasarathy.bhuvara...@ericsson.com> > --- > v2: Address comments from Ying Xue, to avoid race conditions by using > refcount only instead of del_timer_sync(). > --- > net/tipc/subscr.c | 120 > +++++++++++++++++++++++++++++------------------------- > net/tipc/subscr.h | 1 + > 2 files changed, 66 insertions(+), 55 deletions(-) > > diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c > index 0dd02244e21d..d6c1f35f6518 100644 > --- a/net/tipc/subscr.c > +++ b/net/tipc/subscr.c > @@ -54,6 +54,7 @@ struct tipc_subscriber { > > static void tipc_subscrp_delete(struct tipc_subscription *sub); > static void tipc_subscrb_put(struct tipc_subscriber *subscriber); > +static void tipc_subscrp_put(struct tipc_subscription *subscription); > > /** > * htohl - convert value to endianness used by destination > @@ -137,25 +138,17 @@ void tipc_subscrp_report_overlap(struct > tipc_subscription *sub, u32 found_lower, > static void tipc_subscrp_timeout(unsigned long data) > { > struct tipc_subscription *sub = (struct tipc_subscription *)data; > - struct tipc_subscriber *subscriber = sub->subscriber; > > /* Notify subscriber of timeout */ > tipc_subscrp_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper, > TIPC_SUBSCR_TIMEOUT, 0, 0); > > - spin_lock_bh(&subscriber->lock); > - tipc_subscrp_delete(sub); > - spin_unlock_bh(&subscriber->lock); > - > - tipc_subscrb_put(subscriber); > + tipc_subscrp_put(sub); > } > > static void tipc_subscrb_kref_release(struct kref *kref) > { > - struct tipc_subscriber *subcriber = container_of(kref, > - struct tipc_subscriber, kref); > - > - kfree(subcriber); > + kfree(container_of(kref,struct tipc_subscriber, kref)); > } > > static void tipc_subscrb_put(struct tipc_subscriber *subscriber) > @@ -168,6 +161,56 @@ static void tipc_subscrb_get(struct tipc_subscriber > *subscriber) > kref_get(&subscriber->kref); > } > > +static void tipc_subscrp_kref_release(struct kref *kref) > +{ > + struct tipc_subscription *sub = container_of(kref, > + struct tipc_subscription, > + kref); > + struct tipc_net *tn = net_generic(sub->net, tipc_net_id); > + struct tipc_subscriber *subscriber = sub->subscriber; > + > + spin_lock_bh(&subscriber->lock); > + tipc_nametbl_unsubscribe(sub); > + list_del(&sub->subscrp_list); > + atomic_dec(&tn->subscription_count); > + spin_unlock_bh(&subscriber->lock); > + kfree(sub); > + tipc_subscrb_put(subscriber); > +} > + > +static void tipc_subscrp_put(struct tipc_subscription *subscription) > +{ > + kref_put(&subscription->kref, tipc_subscrp_kref_release); > +} > + > +static void tipc_subscrp_get(struct tipc_subscription *subscription) > +{ > + kref_get(&subscription->kref); > +} > + > +/* tipc_subscrb_subscrp_delete - delete a specific subscription or all > + * subscriptions for a given subscriber. > + */ > +static void tipc_subscrb_subscrp_delete(struct tipc_subscriber *subscriber, > + struct tipc_subscr *s) > +{ > + struct tipc_subscription *sub, *temp; > + > + spin_lock_bh(&subscriber->lock); > + list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list, > + subscrp_list) { > + spin_unlock_bh(&subscriber->lock); > + if (s && !memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) { > + tipc_subscrp_delete(sub); > + return; > + } else { > + tipc_subscrp_delete(sub); > + }
If we change above code as below, it seems more simple: list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list, subscrp_list) { spin_unlock_bh(&subscriber->lock); tipc_subscrp_delete(sub); if (s && !memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) return; spin_lock_bh(&subscriber->lock); } Besides the commit,this version is exactly what I expect. It's very good! Thanks for the nice job! Please add my ack-by flag to it. Regards, Ying > + spin_lock_bh(&subscriber->lock); > + } > + spin_unlock_bh(&subscriber->lock); > +} > + > static struct tipc_subscriber *tipc_subscrb_create(int conid) > { > struct tipc_subscriber *subscriber; > @@ -177,8 +220,8 @@ static struct tipc_subscriber *tipc_subscrb_create(int > conid) > pr_warn("Subscriber rejected, no memory\n"); > return NULL; > } > - kref_init(&subscriber->kref); > INIT_LIST_HEAD(&subscriber->subscrp_list); > + kref_init(&subscriber->kref); > subscriber->conid = conid; > spin_lock_init(&subscriber->lock); > > @@ -187,55 +230,21 @@ static struct tipc_subscriber *tipc_subscrb_create(int > conid) > > static void tipc_subscrb_delete(struct tipc_subscriber *subscriber) > { > - struct tipc_subscription *sub, *temp; > - u32 timeout; > - > - spin_lock_bh(&subscriber->lock); > - /* Destroy any existing subscriptions for subscriber */ > - list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list, > - subscrp_list) { > - timeout = htohl(sub->evt.s.timeout, sub->swap); > - if ((timeout == TIPC_WAIT_FOREVER) || del_timer(&sub->timer)) { > - tipc_subscrp_delete(sub); > - tipc_subscrb_put(subscriber); > - } > - } > - spin_unlock_bh(&subscriber->lock); > - > + tipc_subscrb_subscrp_delete(subscriber, NULL); > tipc_subscrb_put(subscriber); > } > > static void tipc_subscrp_delete(struct tipc_subscription *sub) > { > - struct tipc_net *tn = net_generic(sub->net, tipc_net_id); > - > - tipc_nametbl_unsubscribe(sub); > - list_del(&sub->subscrp_list); > - kfree(sub); > - atomic_dec(&tn->subscription_count); > + if (del_timer(&sub->timer)) > + tipc_subscrp_put(sub); > + tipc_subscrp_put(sub); > } > > static void tipc_subscrp_cancel(struct tipc_subscr *s, > struct tipc_subscriber *subscriber) > { > - struct tipc_subscription *sub, *temp; > - u32 timeout; > - > - spin_lock_bh(&subscriber->lock); > - /* Find first matching subscription, exit if not found */ > - list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list, > - subscrp_list) { > - if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) { > - timeout = htohl(sub->evt.s.timeout, sub->swap); > - if ((timeout == TIPC_WAIT_FOREVER) || > - del_timer(&sub->timer)) { > - tipc_subscrp_delete(sub); > - tipc_subscrb_put(subscriber); > - } > - break; > - } > - } > - spin_unlock_bh(&subscriber->lock); > + tipc_subscrb_subscrp_delete(subscriber, s); > } > > static struct tipc_subscription *tipc_subscrp_create(struct net *net, > @@ -272,6 +281,7 @@ static struct tipc_subscription > *tipc_subscrp_create(struct net *net, > sub->swap = swap; > memcpy(&sub->evt.s, s, sizeof(*s)); > atomic_inc(&tn->subscription_count); > + kref_init(&sub->kref); > return sub; > } > > @@ -288,17 +298,17 @@ static void tipc_subscrp_subscribe(struct net *net, > struct tipc_subscr *s, > > spin_lock_bh(&subscriber->lock); > list_add(&sub->subscrp_list, &subscriber->subscrp_list); > - tipc_subscrb_get(subscriber); > sub->subscriber = subscriber; > tipc_nametbl_subscribe(sub); > + tipc_subscrb_get(subscriber); > spin_unlock_bh(&subscriber->lock); > > + tipc_subscrp_get(sub); > + setup_timer(&sub->timer, tipc_subscrp_timeout, (unsigned long)sub); > timeout = htohl(sub->evt.s.timeout, swap); > - if (timeout == TIPC_WAIT_FOREVER) > - return; > > - setup_timer(&sub->timer, tipc_subscrp_timeout, (unsigned long)sub); > - mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout)); > + if (timeout != TIPC_WAIT_FOREVER) > + mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout)); > } > > /* Handle one termination request for the subscriber */ > diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h > index be60103082c9..ffdc214c117a 100644 > --- a/net/tipc/subscr.h > +++ b/net/tipc/subscr.h > @@ -57,6 +57,7 @@ struct tipc_subscriber; > * @evt: template for events generated by subscription > */ > struct tipc_subscription { > + struct kref kref; > struct tipc_subscriber *subscriber; > struct net *net; > struct timer_list timer; > ------------------------------------------------------------------------------ Check out the vibrant tech community on one of the world's most engaging tech sites, SlashDot.org! http://sdm.link/slashdot _______________________________________________ tipc-discussion mailing list tipc-discussion@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/tipc-discussion