Re: [tipc-discussion] [PATCH net v2 2/2] tipc: fix nametbl_lock soft lockup at module exit

2016-12-13 Thread Parthasarathy Bhuvaragan
On 12/12/2016 12:37 PM, Ying Xue wrote:
> Hi Parth,
>
> Sorry for the late response.
>
> As I could not find your v3 version, I am giving comments based on this
> version.
>
> On 11/22/2016 12:27 AM, Parthasarathy Bhuvaragan wrote:
>> Commit 333f796235a527 ("tipc: fix a race condition leading to
>> subscriber refcnt bug") reveals a soft lockup while acquiring
>> nametbl_lock.
>>
>> Before commit 333f796235a527, we call tipc_conn_shutdown() from
>> tipc_close_conn() in the context of tipc_topsrv_stop(). In that
>> context, we are allowed to grab the nametbl_lock.
>>
>> In commit 333f796235a527, I moved tipc_conn_release() (renamed from
>> tipc_conn_shutdown()) to the connection refcount cleanup.
>
> Can you please confirm whether the soft lockup doesn't happen any more
> if we don't adjust the sequence of tipc_conn_release?
>
> If yes, I think we can propose other method to fix the issue solved by
> commit 333f796235a527, but in the new method it's unnecessary to adjust
> the order of tipc_conn_release.
>
> In fact, the order of tipc_conn_shutdown, tipc_unregister_callbacks and
> tipc_conn_release is very important. When I wrote that code, I spent
> much time considering how to close the connection carefully.
>
> In my opinion, the ideal order is still as below:
>
> 1. Close the connection.
> 2. Call tipc_unregister_callbacks() to clear sk->sk_user_data. As long as
> sk->sk_user_data is NULL, no more work will be submitted to
> con->rwork/con->swork.
> 3. Release the socket.
Yes, with your proposed change the soft lockup reported by John will go
away, but it does not avoid the problem which is fixed by commit
333f796235a527. Once we yield and let the scheduler schedule a new
work item, we break the single-threaded work queue assumption.

I tested with the proposed ideal order, and it does not fix the fault
addressed by commit 333f796235a527.
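
For reference, this is roughly how I read your proposed order. It is only a
sketch using the existing helper names from net/tipc/server.c, not tested code:

/* Sketch of the proposed order inside tipc_close_conn(); illustration only. */
static void tipc_close_conn(struct tipc_conn *con)
{
        if (test_and_clear_bit(CF_CONNECTED, &con->flags)) {
                /* 1. Close the connection. */
                tipc_conn_shutdown(con);
                /* 2. Clear sk->sk_user_data so that no further work is
                 *    submitted to con->rwork / con->swork.
                 */
                tipc_unregister_callbacks(con);
                /* 3. Drop the reference; the socket is released via
                 *    tipc_sock_release() in tipc_conn_kref_release().
                 */
                conn_put(con);
        }
}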

Thanks for the review. I will spend some more time looking for a simpler
solution that addresses both.

Regards
Partha
>
> Regards,
> Ying
>
>> This allows
>> either tipc_nametbl_withdraw() or tipc_topsrv_stop() to perform
>> tipc_sock_release().
>>
>> Since tipc_exit_net() first calls tipc_topsrv_stop() and then
>> tipc_nametbl_withdraw(), the chances increase for the latter to
>> perform the connection cleanup.
>>
>> The soft lockup occurs in the call chain of tipc_nametbl_withdraw(),
>> when it performs tipc_conn_kref_release() and tries to grab
>> nametbl_lock again while already holding it.
>> tipc_nametbl_withdraw() grabs nametbl_lock
>>   tipc_nametbl_remove_publ()
>> tipc_subscrp_report_overlap()
>>   tipc_subscrp_send_event()
>> tipc_conn_sendmsg()
>>   << if (con->flags != CF_CONNECTED) we do conn_put(),
>>  triggering the cleanup as refcount=0. >>
>>   tipc_conn_kref_release
>> tipc_sock_release
>>   tipc_conn_release
>> tipc_subscrb_delete
>>   tipc_subscrp_delete
>> tipc_nametbl_unsubscribe << Soft Lockup >>
>>
>> Until now, tipc_server_stop() grabs and releases the idr_lock twice for
>> every connection: once to find the connection, and a second time to clear
>> the connection flag, remove it from the list and decrement the refcount.
>> The above lockup occurs when tipc_nametbl_withdraw() grabs the connection
>> in between the two, thereby taking ownership of the connection and
>> triggering the cleanup when it later decrements the refcount.
>>
>> In this commit, we:
>> - call tipc_nametbl_withdraw() before tipc_topsrv_stop(). This avoids:
>>   1. the soft lockup;
>>   2. notifying the subscribers too late, i.e. after the topology
>>      server has already started shutting down.
>> - in tipc_server_stop(), remove all the connections from the connection
>>   list within the scope of the idr_lock, to prevent any other thread from
>>   finding a connection whose CF_CONNECTED flag has already been cleared.
>>
>> Fixes: 333f796235a52727 ("tipc: fix a race condition leading to
>> subscriber refcnt bug")
>> Signed-off-by: Parthasarathy Bhuvaragan
>> 
>> ---
>> v2: commit message update.
>> cleanup tipc_server_stop() as per Ying Xue.
>> ---
>>  net/tipc/core.c   |  4 ++++
>>  net/tipc/net.c    |  2 --
>>  net/tipc/server.c | 13 ++++++++-----
>>  3 files changed, 12 insertions(+), 7 deletions(-)
>>
>> diff --git a/net/tipc/core.c b/net/tipc/core.c
>> index 236b043a4156..3ea1452e2f06 100644
>> --- a/net/tipc/core.c
>> +++ b/net/tipc/core.c
>> @@ -93,6 +93,10 @@ static int __net_init tipc_init_net(struct net *net)
>>
>>  static void __net_exit tipc_exit_net(struct net *net)
>>  {
>> +struct tipc_net *tn = net_generic(net, tipc_net_id);
>> +
>> +tipc_nametbl_withdraw(net, TIPC_CFG_SRV, tn->own_addr, 0,
>> +  tn->own_addr);
>>  tipc_topsrv_stop(net);
>>  tipc_net_stop(net);
>>  tipc_bcast_stop(net);
>> diff --git a/net/tipc/net.c b/net/tipc/net.c
>> index 28bf4feeb81c..92696cc6e763 100644
>> --- a/net/tipc/net.c
>> +++ b/net/tipc/net.c
>> @@ -130,8 +130,6 @@ void tipc_net_st

Re: [tipc-discussion] [net-next v3 3/3] tipc: reduce risk of user starvation during link congestion

2016-12-13 Thread Jon Maloy


> -----Original Message-----
> From: Ying Xue [mailto:ying@windriver.com]
> Sent: Tuesday, 13 December, 2016 07:39
> To: Jon Maloy ; tipc-discussion@lists.sourceforge.net;
> Parthasarathy Bhuvaragan 
> Cc: ma...@donjonn.com; thompa@gmail.com
> Subject: Re: [net-next v3 3/3] tipc: reduce risk of user starvation during link congestion
> 
> On 12/13/2016 06:42 AM, Jon Maloy wrote:
> >  void link_prepare_wakeup(struct tipc_link *l)
> >  {
> > -   int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
> > -   int imp, lim;
> > struct sk_buff *skb, *tmp;
> > +   int imp, i = 0;
> >
> > skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
> > imp = TIPC_SKB_CB(skb)->chain_imp;
> > -   lim = l->backlog[imp].limit;
> > -   pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
> > -   if ((pnd[imp] + l->backlog[imp].len) >= lim)
> > +   if (l->backlog[imp].len < l->backlog[imp].limit) {
> > +   skb_unlink(skb, &l->wakeupq);
> > +   skb_queue_tail(l->inputq, skb);
> > +   } else if (i++ > 10) {
> 
> About the wakeup skb number: we could probably make it smarter, for example
> by deriving its value from the link window size and the size of the available
> backlog queue, or something else. As this value is an important factor for
> us, I suggest it is worth considering further.

Sure, we can make it smarter, but I don't see why you consider this figure so
important. It is just an "emergency brake" in case the wakeup queue is very
long.
If we have failed more than ten times to find a user to wake up, it is very
likely that all relevant importance levels are congested, and it is meaningless
and a waste of CPU to continue. Note that the counter is NOT stepped when we do
find a user to wake up.
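
Just to make the intent explicit, this is how the loop reads with the patch
applied (hand-assembled from the diff above, so treat it as a sketch rather
than the authoritative code):

void link_prepare_wakeup(struct tipc_link *l)
{
        struct sk_buff *skb, *tmp;
        int imp, i = 0;

        skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
                imp = TIPC_SKB_CB(skb)->chain_imp;
                if (l->backlog[imp].len < l->backlog[imp].limit) {
                        /* Room at this importance level: wake the user by
                         * moving its pending skb to the input queue.
                         */
                        skb_unlink(skb, &l->wakeupq);
                        skb_queue_tail(l->inputq, skb);
                } else if (i++ > 10) {
                        /* Emergency brake: after more than ten misses (the
                         * counter is not stepped on successful wakeups), all
                         * relevant levels are most likely congested.
                         */
                        break;
                }
        }
}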

Regards
///jon


> 
> Regards,
> Ying
> 
> 
> > break;
> > -   skb_unlink(skb, &l->wakeupq);
> > -   skb_queue_tail(l->inputq, skb);
> > +   }
> > }
> >  }




Re: [tipc-discussion] [net-next v3 2/3] tipc: modify struct tipc_plist to be more versatile

2016-12-13 Thread Jon Maloy


> -----Original Message-----
> From: Ying Xue [mailto:ying@windriver.com]
> Sent: Tuesday, 13 December, 2016 06:04
> To: Jon Maloy ; tipc-discussion@lists.sourceforge.net;
> Parthasarathy Bhuvaragan 
> Cc: ma...@donjonn.com; thompa@gmail.com
> Subject: Re: [net-next v3 2/3] tipc: modify struct tipc_plist to be more versatile
> 
> On 12/13/2016 06:42 AM, Jon Maloy wrote:
> > During multicast reception we currently use a simple linked list with
> > push/pop semantics to store port numbers.
> >
> > We now see a need for a more generic list for storing values of type
> > u32. We therefore make some modifications to this list, while replacing
> > the prefix 'tipc_plist_' with 'u32_'.
> 
> It's a shame that we cannot use the interfaces defined in lib/plist.c;
> otherwise it would be unnecessary for us to implement this ourselves.
> 
> I still prefer to use a "tipc_u32" prefix, because the function names are a
> bit too generic without "tipc". Especially when we analyze a stack trace,
> overly generic function names cause us some inconvenience.

That is what I used first, but some code lines became awkwardly long, so I had
to split if-clauses and function calls over two lines, something I generally
try to avoid.
Also, remember that these are internal functions, only called from other
functions that carry the "tipc_" prefix, so you will never be in doubt about
where the problem is if you see a stack dump.
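
For what it is worth, a typical caller ends up looking roughly like this. It is
only a sketch; the real user is the multicast receive path in net/tipc/socket.c,
and deliver_to_port() is a made-up placeholder for whatever consumes each
destination port:

static void mcast_rcv_sketch(struct net *net, u32 type, u32 lower,
                             u32 upper, u32 limit)
{
        LIST_HEAD(dports);
        u32 port;

        /* Collect all matching destination ports into the u32 list. */
        tipc_nametbl_mc_translate(net, type, lower, upper, limit, &dports);

        /* Drain the list; u32_pop() unlinks and frees each entry. */
        while ((port = u32_pop(&dports)))
                deliver_to_port(net, port);
}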

///jon

> 
> Regards,
> Ying
> 
> > We also add a couple of new
> > functions which will come into use in the next commits.
> >
> > Signed-off-by: Jon Maloy 
> > ---
> >  net/tipc/name_table.c | 100 ------
> >  net/tipc/name_table.h |  21 ---
> >  net/tipc/socket.c |   8 ++--
> >  3 files changed, 83 insertions(+), 46 deletions(-)
> >
> > diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
> > index e190460..5a86df1 100644
> > --- a/net/tipc/name_table.c
> > +++ b/net/tipc/name_table.c
> > @@ -608,7 +608,7 @@ u32 tipc_nametbl_translate(struct net *net, u32 type,
> u32 instance,
> >   * Returns non-zero if any off-node ports overlap
> >   */
> >  int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32
> upper,
> > - u32 limit, struct tipc_plist *dports)
> > + u32 limit, struct list_head *dports)
> >  {
> > struct name_seq *seq;
> > struct sub_seq *sseq;
> > @@ -633,7 +633,7 @@ int tipc_nametbl_mc_translate(struct net *net, u32
> type, u32 lower, u32 upper,
> > info = sseq->info;
> > list_for_each_entry(publ, &info->node_list, node_list) {
> > if (publ->scope <= limit)
> > -   tipc_plist_push(dports, publ->ref);
> > +   u32_push(dports, publ->ref);
> > }
> >
> > if (info->cluster_list_size != info->node_list_size)
> > @@ -1022,40 +1022,84 @@ int tipc_nl_name_table_dump(struct sk_buff *skb,
> struct netlink_callback *cb)
> > return skb->len;
> >  }
> >
> > -void tipc_plist_push(struct tipc_plist *pl, u32 port)
> > +struct u32_item {
> > +   struct list_head list;
> > +   u32 value;
> > +};
> > +
> > +bool u32_find(struct list_head *l, u32 value)
> >  {
> > -   struct tipc_plist *nl;
> > +   struct u32_item *item;
> >
> > -   if (likely(!pl->port)) {
> > -   pl->port = port;
> > -   return;
> > +   list_for_each_entry(item, l, list) {
> > +   if (item->value == value)
> > +   return true;
> > }
> > -   if (pl->port == port)
> > -   return;
> > -   list_for_each_entry(nl, &pl->list, list) {
> > -   if (nl->port == port)
> > -   return;
> > +   return false;
> > +}
> > +
> > +bool u32_push(struct list_head *l, u32 value)
> > +{
> > +   struct u32_item *item;
> > +
> > +   list_for_each_entry(item, l, list) {
> > +   if (item->value == value)
> > +   return false;
> > +   }
> > +   item = kmalloc(sizeof(*item), GFP_ATOMIC);
> > +   if (unlikely(!item))
> > +   return false;
> > +
> > +   item->value = value;
> > +   list_add(&item->list, l);
> > +   return true;
> > +}
> > +
> > +u32 u32_pop(struct list_head *l)
> > +{
> > +   struct u32_item *item;
> > +   u32 value = 0;
> > +
> > +   if (list_empty(l))
> > +   return 0;
> > +   item = list_first_entry(l, typeof(*item), list);
> > +   value = item->value;
> > +   list_del(&item->list);
> > +   kfree(item);
> > +   return value;
> > +}
> > +
> > +bool u32_del(struct list_head *l, u32 value)
> > +{
> > +   struct u32_item *item, *tmp;
> > +
> > +   list_for_each_entry_safe(item, tmp, l, list) {
> > +   if (item->value != value)
> > +   continue;
> > +   list_del(&item->list);
> > +   kfree(item);
> > +   return true;
> > }
> > -   nl = kmalloc(sizeof(*nl), GFP_ATOMIC);
> > -   if (nl) {
> > -   nl->port = port;
> > -   list_add(&nl->list, &p

Re: [tipc-discussion] [net-next v3 3/3] tipc: reduce risk of user starvation during link congestion

2016-12-13 Thread Ying Xue
On 12/13/2016 06:42 AM, Jon Maloy wrote:
>  void link_prepare_wakeup(struct tipc_link *l)
>  {
> - int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
> - int imp, lim;
>   struct sk_buff *skb, *tmp;
> + int imp, i = 0;
>
>   skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
>   imp = TIPC_SKB_CB(skb)->chain_imp;
> - lim = l->backlog[imp].limit;
> - pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
> - if ((pnd[imp] + l->backlog[imp].len) >= lim)
> + if (l->backlog[imp].len < l->backlog[imp].limit) {
> + skb_unlink(skb, &l->wakeupq);
> + skb_queue_tail(l->inputq, skb);
> + } else if (i++ > 10) {

About the wakeup skb number: we could probably make it smarter, for example
by deriving its value from the link window size and the size of the available
backlog queue, or something else. As this value is an important factor for
us, I suggest it is worth considering further.
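
To illustrate what I have in mind (only a fragment, not a concrete proposal;
I am assuming l->window holds the configured link window):

        /* Illustration: derive the scan limit from the link window instead
         * of hard-coding 10, so a larger window allows a longer scan of the
         * wakeup queue.
         */
        int max_scan = max_t(int, 10, l->window / 2);

        /* ... same loop as in the patch, but with the derived limit ... */
        } else if (i++ > max_scan) {
                break;
        }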

Regards,
Ying


>   break;
> - skb_unlink(skb, &l->wakeupq);
> - skb_queue_tail(l->inputq, skb);
> + }
>   }
>  }




Re: [tipc-discussion] [net-next v3 2/3] tipc: modify struct tipc_plist to be more versatile

2016-12-13 Thread Ying Xue
On 12/13/2016 06:42 AM, Jon Maloy wrote:
> During multicast reception we currently use a simple linked list with
> push/pop semantics to store port numbers.
>
> We now see a need for a more generic list for storing values of type
> u32. We therefore make some modifications to this list, while replacing
> the prefix 'tipc_plist_' with 'u32_'.

It's a shame that we cannot use the interfaces defined in lib/plist.c;
otherwise it would be unnecessary for us to implement this ourselves.

I still prefer to use a "tipc_u32" prefix, because the function names are a
bit too generic without "tipc". Especially when we analyze a stack trace,
overly generic function names cause us some inconvenience.

Regards,
Ying

> We also add a couple of new
> functions which will come into use in the next commits.
>
> Signed-off-by: Jon Maloy 
> ---
>  net/tipc/name_table.c | 100 --
>  net/tipc/name_table.h |  21 ---
>  net/tipc/socket.c |   8 ++--
>  3 files changed, 83 insertions(+), 46 deletions(-)
>
> diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
> index e190460..5a86df1 100644
> --- a/net/tipc/name_table.c
> +++ b/net/tipc/name_table.c
> @@ -608,7 +608,7 @@ u32 tipc_nametbl_translate(struct net *net, u32 type, u32 
> instance,
>   * Returns non-zero if any off-node ports overlap
>   */
>  int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 
> upper,
> -   u32 limit, struct tipc_plist *dports)
> +   u32 limit, struct list_head *dports)
>  {
>   struct name_seq *seq;
>   struct sub_seq *sseq;
> @@ -633,7 +633,7 @@ int tipc_nametbl_mc_translate(struct net *net, u32 type, 
> u32 lower, u32 upper,
>   info = sseq->info;
>   list_for_each_entry(publ, &info->node_list, node_list) {
>   if (publ->scope <= limit)
> - tipc_plist_push(dports, publ->ref);
> + u32_push(dports, publ->ref);
>   }
>
>   if (info->cluster_list_size != info->node_list_size)
> @@ -1022,40 +1022,84 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, 
> struct netlink_callback *cb)
>   return skb->len;
>  }
>
> -void tipc_plist_push(struct tipc_plist *pl, u32 port)
> +struct u32_item {
> + struct list_head list;
> + u32 value;
> +};
> +
> +bool u32_find(struct list_head *l, u32 value)
>  {
> - struct tipc_plist *nl;
> + struct u32_item *item;
>
> - if (likely(!pl->port)) {
> - pl->port = port;
> - return;
> + list_for_each_entry(item, l, list) {
> + if (item->value == value)
> + return true;
>   }
> - if (pl->port == port)
> - return;
> - list_for_each_entry(nl, &pl->list, list) {
> - if (nl->port == port)
> - return;
> + return false;
> +}
> +
> +bool u32_push(struct list_head *l, u32 value)
> +{
> + struct u32_item *item;
> +
> + list_for_each_entry(item, l, list) {
> + if (item->value == value)
> + return false;
> + }
> + item = kmalloc(sizeof(*item), GFP_ATOMIC);
> + if (unlikely(!item))
> + return false;
> +
> + item->value = value;
> + list_add(&item->list, l);
> + return true;
> +}
> +
> +u32 u32_pop(struct list_head *l)
> +{
> + struct u32_item *item;
> + u32 value = 0;
> +
> + if (list_empty(l))
> + return 0;
> + item = list_first_entry(l, typeof(*item), list);
> + value = item->value;
> + list_del(&item->list);
> + kfree(item);
> + return value;
> +}
> +
> +bool u32_del(struct list_head *l, u32 value)
> +{
> + struct u32_item *item, *tmp;
> +
> + list_for_each_entry_safe(item, tmp, l, list) {
> + if (item->value != value)
> + continue;
> + list_del(&item->list);
> + kfree(item);
> + return true;
>   }
> - nl = kmalloc(sizeof(*nl), GFP_ATOMIC);
> - if (nl) {
> - nl->port = port;
> - list_add(&nl->list, &pl->list);
> + return false;
> +}
> +
> +void u32_list_purge(struct list_head *l)
> +{
> + struct u32_item *item, *tmp;
> +
> + list_for_each_entry_safe(item, tmp, l, list) {
> + list_del(&item->list);
> + kfree(item);
>   }
>  }
>
> -u32 tipc_plist_pop(struct tipc_plist *pl)
> +int u32_list_len(struct list_head *l)
>  {
> - struct tipc_plist *nl;
> - u32 port = 0;
> + struct u32_item *item;
> + int i = 0;
>
> - if (likely(list_empty(&pl->list))) {
> - port = pl->port;
> - pl->port = 0;
> - return port;
> + list_for_each_entry(item, l, list) {
> + i++;
>   }
> - nl = list_first_entry(&pl->list, typeof(*nl), list);
> - port = nl->port;
> - list_del(&nl->list);
> - kfree(nl);
> - return port;
> + ret

Re: [tipc-discussion] [net-next v3 1/3] tipc: unify tipc_wait_for_sndpkt() and tipc_wait_for_sndmsg() functions

2016-12-13 Thread Ying Xue
On 12/13/2016 06:42 AM, Jon Maloy wrote:
> The functions tipc_wait_for_sndpkt() and tipc_wait_for_sndmsg() are very
> similar. The latter function is also called from two locations, and
> there will be more in the coming commits, which will all need to test on
> different conditions.
>
> Instead of making yet another duplicate of these functions, we now
> introduce a new macro tipc_wait_for_cond() where the wakeup condition
> can be stated as an argument to the call. This macro replaces all
> current and future uses of the two functions, which can now be
> eliminated.
>
> Signed-off-by: Jon Maloy 

Acked-by: Ying Xue 

> ---
>  net/tipc/socket.c | 108 +-
>  1 file changed, 49 insertions(+), 59 deletions(-)
>
> diff --git a/net/tipc/socket.c b/net/tipc/socket.c
> index 333c5da..8f3ab08 100644
> --- a/net/tipc/socket.c
> +++ b/net/tipc/socket.c
> @@ -110,7 +110,6 @@ static void tipc_write_space(struct sock *sk);
>  static void tipc_sock_destruct(struct sock *sk);
>  static int tipc_release(struct socket *sock);
>  static int tipc_accept(struct socket *sock, struct socket *new_sock, int 
> flags);
> -static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
>  static void tipc_sk_timeout(unsigned long data);
>  static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
>  struct tipc_name_seq const *seq);
> @@ -334,6 +333,49 @@ static int tipc_set_sk_state(struct sock *sk, int state)
>   return res;
>  }
>
> +static int tipc_sk_sock_err(struct socket *sock, long *timeout)
> +{
> + struct sock *sk = sock->sk;
> + int err = sock_error(sk);
> + int typ = sock->type;
> +
> + if (err)
> + return err;
> + if (typ == SOCK_STREAM || typ == SOCK_SEQPACKET) {
> + if (sk->sk_state == TIPC_DISCONNECTING)
> + return -EPIPE;
> + else if (!tipc_sk_connected(sk))
> + return -ENOTCONN;
> + }
> + if (!*timeout)
> + return -EAGAIN;
> + if (signal_pending(current))
> + return sock_intr_errno(*timeout);
> +
> + return 0;
> +}
> +
> +#define tipc_wait_for_cond(sock_, timeout_, condition_)  
> \
> +({   \
> + int rc_ = 0;\
> + int done_ = 0;  \
> + \
> + while (!(condition_) && !done_) {   \
> + struct sock *sk_ = sock->sk;\
> + DEFINE_WAIT_FUNC(wait_, woken_wake_function);   \
> + \
> + rc_ = tipc_sk_sock_err(sock_, timeout_);\
> + if (rc_)\
> + break;  \
> + prepare_to_wait(sk_sleep(sk_), &wait_,  \
> + TASK_INTERRUPTIBLE);\
> + done_ = sk_wait_event(sk_, timeout_,\
> +   (condition_), &wait_);\
> + remove_wait_queue(sk_sleep(sk), &wait_);\
> + }   \
> + rc_;\
> +})
> +
>  /**
>   * tipc_sk_create - create a TIPC socket
>   * @net: network namespace (must be default network)
> @@ -719,7 +761,7 @@ static int tipc_sendmcast(struct  socket *sock, struct 
> tipc_name_seq *seq,
>
>   if (rc == -ELINKCONG) {
>   tsk->link_cong = 1;
> - rc = tipc_wait_for_sndmsg(sock, &timeo);
> + rc = tipc_wait_for_cond(sock, &timeo, !tsk->link_cong);
>   if (!rc)
>   continue;
>   }
> @@ -828,31 +870,6 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, 
> struct sk_buff *skb,
>   kfree_skb(skb);
>  }
>
> -static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
> -{
> - DEFINE_WAIT_FUNC(wait, woken_wake_function);
> - struct sock *sk = sock->sk;
> - struct tipc_sock *tsk = tipc_sk(sk);
> - int done;
> -
> - do {
> - int err = sock_error(sk);
> - if (err)
> - return err;
> - if (sk->sk_shutdown & SEND_SHUTDOWN)
> - return -EPIPE;
> - if (!*timeo_p)
> - return -EAGAIN;
> - if (signal_pending(current))
> - return sock_intr_errno(*timeo_p);
> -
> - add_wait_queue(sk_sleep(sk), &wait);
> - done = sk_wait_event(sk, timeo_p, !tsk->link_cong, &