On 01/18/2017 01:07 PM, Parthasarathy Bhuvaragan wrote:
> On 01/18/2017 11:06 AM, Ying Xue wrote:
>> On 01/16/2017 11:41 PM, Parthasarathy Bhuvaragan wrote:
>>> Until now, the subscribers keep track of the subscriptions using
>>> reference count at subscriber level. At subscription cancel or
>>> subscriber delete, we delete the subscription by checking for
>>> pending timer using del_timer(). del_timer() is not SMP safe: if
>>> on CPU0 the check for a pending timer returns true, CPU1 might still
>>> schedule the timer callback, thereby deleting the subscription.
>>> Thus when CPU0 is scheduled, it deletes an invalid subscription.
>>>
>>> We can fix the above issue using either a synchronous timer function
>>> or by introducing a subscription reference count. The latter solution
>>> is preferred as it's consistent with the asynchronous design model
>>> used by the rest of the code.
>>>
>>> In this commit, we introduce subscription refcount to avoid deleting
>>> an invalid subscription.
>>>
>>> Signed-off-by: Parthasarathy Bhuvaragan
>>> <[email protected]>
>>> ---
>>> v2: Address comments from Ying Xue, to avoid race conditions by using
>>> refcount only instead of del_timer_sync().
>>> ---
>>> net/tipc/subscr.c | 120
>>> +++++++++++++++++++++++++++++-------------------------
>>> net/tipc/subscr.h | 1 +
>>> 2 files changed, 66 insertions(+), 55 deletions(-)
>>>
>>> diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
>>> index 0dd02244e21d..d6c1f35f6518 100644
>>> --- a/net/tipc/subscr.c
>>> +++ b/net/tipc/subscr.c
>>> @@ -54,6 +54,7 @@ struct tipc_subscriber {
>>>
>>> static void tipc_subscrp_delete(struct tipc_subscription *sub);
>>> static void tipc_subscrb_put(struct tipc_subscriber *subscriber);
>>> +static void tipc_subscrp_put(struct tipc_subscription *subscription);
>>>
>>> /**
>>> * htohl - convert value to endianness used by destination
>>> @@ -137,25 +138,17 @@ void tipc_subscrp_report_overlap(struct
>>> tipc_subscription *sub, u32 found_lower,
>>> static void tipc_subscrp_timeout(unsigned long data)
>>> {
>>> struct tipc_subscription *sub = (struct tipc_subscription *)data;
>>> - struct tipc_subscriber *subscriber = sub->subscriber;
>>>
>>> /* Notify subscriber of timeout */
>>> tipc_subscrp_send_event(sub, sub->evt.s.seq.lower,
>>> sub->evt.s.seq.upper,
>>> TIPC_SUBSCR_TIMEOUT, 0, 0);
>>>
>>> - spin_lock_bh(&subscriber->lock);
>>> - tipc_subscrp_delete(sub);
>>> - spin_unlock_bh(&subscriber->lock);
>>> -
>>> - tipc_subscrb_put(subscriber);
>>> + tipc_subscrp_put(sub);
>>> }
>>>
>>> static void tipc_subscrb_kref_release(struct kref *kref)
>>> {
>>> - struct tipc_subscriber *subcriber = container_of(kref,
>>> - struct tipc_subscriber, kref);
>>> -
>>> - kfree(subcriber);
>>> + kfree(container_of(kref,struct tipc_subscriber, kref));
>>> }
>>>
>>> static void tipc_subscrb_put(struct tipc_subscriber *subscriber)
>>> @@ -168,6 +161,56 @@ static void tipc_subscrb_get(struct
>>> tipc_subscriber *subscriber)
>>> kref_get(&subscriber->kref);
>>> }
>>>
>>> +static void tipc_subscrp_kref_release(struct kref *kref)
>>> +{
>>> + struct tipc_subscription *sub = container_of(kref,
>>> + struct tipc_subscription,
>>> + kref);
>>> + struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
>>> + struct tipc_subscriber *subscriber = sub->subscriber;
>>> +
>>> + spin_lock_bh(&subscriber->lock);
>>> + tipc_nametbl_unsubscribe(sub);
>>> + list_del(&sub->subscrp_list);
>>> + atomic_dec(&tn->subscription_count);
>>> + spin_unlock_bh(&subscriber->lock);
>>> + kfree(sub);
>>> + tipc_subscrb_put(subscriber);
>>> +}
>>> +
>>> +static void tipc_subscrp_put(struct tipc_subscription *subscription)
>>> +{
>>> + kref_put(&subscription->kref, tipc_subscrp_kref_release);
>>> +}
>>> +
>>> +static void tipc_subscrp_get(struct tipc_subscription *subscription)
>>> +{
>>> + kref_get(&subscription->kref);
>>> +}
>>> +
>>> +/* tipc_subscrb_subscrp_delete - delete a specific subscription or all
>>> + * subscriptions for a given subscriber.
>>> + */
>>> +static void tipc_subscrb_subscrp_delete(struct tipc_subscriber
>>> *subscriber,
>>> + struct tipc_subscr *s)
>>> +{
>>> + struct tipc_subscription *sub, *temp;
>>> +
>>> + spin_lock_bh(&subscriber->lock);
>>> + list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
>>> + subscrp_list) {
>>> + spin_unlock_bh(&subscriber->lock);
>>> + if (s && !memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) {
>>> + tipc_subscrp_delete(sub);
>>> + return;
>>> + } else {
>>> + tipc_subscrp_delete(sub);
>>> + }
>>
>> If we change the above code as below, it seems simpler:
>>
>> list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
>> subscrp_list) {
>> spin_unlock_bh(&subscriber->lock);
>> tipc_subscrp_delete(sub);
> Ying,
> In the case of subscription cancel request, I need to find the correct
> subscription and delete only that subscription. If we use the above
> version, we will end up deleting all previous subscriptions in the list
> for the given subscriber.
Ying,
My patch was wrong: it already deletes all prior subscriptions
for a given subscriber up to the specific subscription.
I will spin a new version with the fix.
/Partha
>
> Thanks for the comments and feedback.
>
> I found the fix for John's issue and will send a new version soon.
>
> /Partha
>> if (s && !memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr)))
>> return;
>> spin_lock_bh(&subscriber->lock);
>> }
>>
>>
>> Besides the comment above, this version is exactly what I expected. It's very good!
>>
>> Thanks for the nice job!
>>
>> Please add my Acked-by tag to it.
>>
>> Regards,
>> Ying
>>
>>> + spin_lock_bh(&subscriber->lock);
>>> + }
>>> + spin_unlock_bh(&subscriber->lock);
>>> +}
>>> +
>>> static struct tipc_subscriber *tipc_subscrb_create(int conid)
>>> {
>>> struct tipc_subscriber *subscriber;
>>> @@ -177,8 +220,8 @@ static struct tipc_subscriber
>>> *tipc_subscrb_create(int conid)
>>> pr_warn("Subscriber rejected, no memory\n");
>>> return NULL;
>>> }
>>> - kref_init(&subscriber->kref);
>>> INIT_LIST_HEAD(&subscriber->subscrp_list);
>>> + kref_init(&subscriber->kref);
>>> subscriber->conid = conid;
>>> spin_lock_init(&subscriber->lock);
>>>
>>> @@ -187,55 +230,21 @@ static struct tipc_subscriber
>>> *tipc_subscrb_create(int conid)
>>>
>>> static void tipc_subscrb_delete(struct tipc_subscriber *subscriber)
>>> {
>>> - struct tipc_subscription *sub, *temp;
>>> - u32 timeout;
>>> -
>>> - spin_lock_bh(&subscriber->lock);
>>> - /* Destroy any existing subscriptions for subscriber */
>>> - list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
>>> - subscrp_list) {
>>> - timeout = htohl(sub->evt.s.timeout, sub->swap);
>>> - if ((timeout == TIPC_WAIT_FOREVER) || del_timer(&sub->timer)) {
>>> - tipc_subscrp_delete(sub);
>>> - tipc_subscrb_put(subscriber);
>>> - }
>>> - }
>>> - spin_unlock_bh(&subscriber->lock);
>>> -
>>> + tipc_subscrb_subscrp_delete(subscriber, NULL);
>>> tipc_subscrb_put(subscriber);
>>> }
>>>
>>> static void tipc_subscrp_delete(struct tipc_subscription *sub)
>>> {
>>> - struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
>>> -
>>> - tipc_nametbl_unsubscribe(sub);
>>> - list_del(&sub->subscrp_list);
>>> - kfree(sub);
>>> - atomic_dec(&tn->subscription_count);
>>> + if (del_timer(&sub->timer))
>>> + tipc_subscrp_put(sub);
>>> + tipc_subscrp_put(sub);
>>> }
>>>
>>> static void tipc_subscrp_cancel(struct tipc_subscr *s,
>>> struct tipc_subscriber *subscriber)
>>> {
>>> - struct tipc_subscription *sub, *temp;
>>> - u32 timeout;
>>> -
>>> - spin_lock_bh(&subscriber->lock);
>>> - /* Find first matching subscription, exit if not found */
>>> - list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
>>> - subscrp_list) {
>>> - if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) {
>>> - timeout = htohl(sub->evt.s.timeout, sub->swap);
>>> - if ((timeout == TIPC_WAIT_FOREVER) ||
>>> - del_timer(&sub->timer)) {
>>> - tipc_subscrp_delete(sub);
>>> - tipc_subscrb_put(subscriber);
>>> - }
>>> - break;
>>> - }
>>> - }
>>> - spin_unlock_bh(&subscriber->lock);
>>> + tipc_subscrb_subscrp_delete(subscriber, s);
>>> }
>>>
>>> static struct tipc_subscription *tipc_subscrp_create(struct net *net,
>>> @@ -272,6 +281,7 @@ static struct tipc_subscription
>>> *tipc_subscrp_create(struct net *net,
>>> sub->swap = swap;
>>> memcpy(&sub->evt.s, s, sizeof(*s));
>>> atomic_inc(&tn->subscription_count);
>>> + kref_init(&sub->kref);
>>> return sub;
>>> }
>>>
>>> @@ -288,17 +298,17 @@ static void tipc_subscrp_subscribe(struct net
>>> *net, struct tipc_subscr *s,
>>>
>>> spin_lock_bh(&subscriber->lock);
>>> list_add(&sub->subscrp_list, &subscriber->subscrp_list);
>>> - tipc_subscrb_get(subscriber);
>>> sub->subscriber = subscriber;
>>> tipc_nametbl_subscribe(sub);
>>> + tipc_subscrb_get(subscriber);
>>> spin_unlock_bh(&subscriber->lock);
>>>
>>> + tipc_subscrp_get(sub);
>>> + setup_timer(&sub->timer, tipc_subscrp_timeout, (unsigned long)sub);
>>> timeout = htohl(sub->evt.s.timeout, swap);
>>> - if (timeout == TIPC_WAIT_FOREVER)
>>> - return;
>>>
>>> - setup_timer(&sub->timer, tipc_subscrp_timeout, (unsigned long)sub);
>>> - mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout));
>>> + if (timeout != TIPC_WAIT_FOREVER)
>>> + mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout));
>>> }
>>>
>>> /* Handle one termination request for the subscriber */
>>> diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
>>> index be60103082c9..ffdc214c117a 100644
>>> --- a/net/tipc/subscr.h
>>> +++ b/net/tipc/subscr.h
>>> @@ -57,6 +57,7 @@ struct tipc_subscriber;
>>> * @evt: template for events generated by subscription
>>> */
>>> struct tipc_subscription {
>>> + struct kref kref;
>>> struct tipc_subscriber *subscriber;
>>> struct net *net;
>>> struct timer_list timer;
>>>
>>
------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, SlashDot.org! http://sdm.link/slashdot
_______________________________________________
tipc-discussion mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/tipc-discussion