Jon, Erik, see my responses below:

On 11/21/2016 05:51 PM, Erik Hugne wrote:
> I have not read the patch, nor the tipc code, for quite some time, so
> this may be invalid..
> But as I remember, wakeups are done unconditionally for all
> senders waiting on the same link when it becomes unblocked.
>
Erik, before Jon's series we wake up a user only if there is enough room 
on the link for that user's priority level. Otherwise we keep iterating 
the wakeup queue and find the next one that fits, roughly as in the 
sketch below.
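
A simplified userspace model of that behavior (made-up names and numbers, 
not the actual link_prepare_wakeup() code):

/* Simplified model: a waiting sender is woken only if its own importance
 * level still has room below the backlog limit; otherwise we move on and
 * check the next waiting sender.
 */
#include <stdio.h>

#define LEVELS 5                        /* LOW..CRITICAL + SYSTEM */

struct waiter { int imp; };

int main(void)
{
        int limit[LEVELS]   = { 50, 100, 150, 200, 300 };
        int backlog[LEVELS] = { 50,  80,  10,   5,   0 };  /* LOW is full */
        struct waiter waitq[] = { {0}, {1}, {0}, {2} };
        unsigned int i;

        for (i = 0; i < sizeof(waitq) / sizeof(waitq[0]); i++) {
                int imp = waitq[i].imp;

                if (backlog[imp] >= limit[imp]) {
                        printf("waiter %u (imp %d): no room, try next\n", i, imp);
                        continue;
                }
                printf("waiter %u (imp %d): wake up\n", i, imp);
        }
        return 0;
}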

> If you change the sk_wait_event wakeup condition to separate, per-prio
> wakeup flags (not belonging to any individual socket, but rather to the
> destination link), then, when messages are drained, signal the highest
> prio first. If there's space left in the link output buffer, toggle
> the next lower prio flag, and so on.
>
Part of the problem is that we have queue limits mimicking Weighted 
Fair Queuing (WFQ), but we lack per-priority queues. We just perform 
rudimentary shaping and place everything else in a single unsorted queue.
Even if we implement FQ, we will have two sets of priority queues:
1. When there is no congestion: a per-priority link transmit queue.
2. When there is congestion: a per-priority link backlog queue.

Then, based on the assigned weights (1, 2, 3, 4), we can schedule the 
packets from these queues, roughly as in the sketch below.
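
A toy userspace model of such a weighted scheduler (invented numbers; in 
the real stack the per-level limits live in l->backlog[]):

/* Weighted round-robin over per-priority queues: in each round, level i
 * may dequeue up to weight[i] packets, highest priority served first.
 */
#include <stdio.h>

#define LEVELS 4

int main(void)
{
        int weight[LEVELS] = { 1, 2, 3, 4 };    /* LOW..CRITICAL */
        int queued[LEVELS] = { 10, 4, 2, 1 };   /* packets waiting per level */
        int sent = 0;
        int i, n;

        while (queued[0] + queued[1] + queued[2] + queued[3]) {
                for (i = LEVELS - 1; i >= 0; i--) {     /* highest prio first */
                        for (n = 0; n < weight[i] && queued[i]; n++) {
                                queued[i]--;
                                printf("pkt %2d: dequeued from level %d\n", ++sent, i);
                        }
                }
        }
        return 0;
}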

But I believe the traffic control (tc) qdisc already does this pretty 
well, although it can act only on the packets transmitted on the 
interface.

// The way I test prioritizing TIPC packets based on importance:
tc filter add dev data0 parent 1:0 prio 10 protocol tipc u32 \
   match u32 0x4f000000 0xff000000 at 0 flowid 1:200

> Selecting which queue flag to wait on is decided by the last (current)
> packet importance level.
>
> Shouldn't that take care of the prio-race?
> Getting fairness within prio levels is a different story though..
>
> //E
>
>
> On Nov 21, 2016 4:20 PM, "Jon Maloy" <jon.ma...@ericsson.com> wrote:
>
>     I am having some new doubts about our current link congestion
>     criteria. See below.
>
>     ///jon
>
>
>     > -----Original Message-----
>     > From: Jon Maloy [mailto:jon.ma...@ericsson.com]
>     > Sent: Monday, 21 November, 2016 09:57
>     > To: tipc-discussion@lists.sourceforge.net; Parthasarathy Bhuvaragan
>     > <parthasarathy.bhuvara...@ericsson.com>; Ying Xue
>     > <ying....@windriver.com>; Jon Maloy <jon.ma...@ericsson.com>
>     > Cc: ma...@donjonn.com; thompa....@gmail.com
>     > Subject: [PATCH net-next 3/3] tipc: reduce risk of user starvation
>     > during link congestion
>     >
>     > The socket code currently handles link congestion by either blocking
>     > and trying to send again when the congestion has abated, or just
>     > returning to the user with -EAGAIN and let him re-try later.
>     >
>     > This mechanism is prone to starvation, because the wakeup algorithm is
>     > non-atomic. During the time the link issues a wakeup signal, until the
>     > socket wakes up and re-attempts sending, other senders may have come
>     > in between and occupied the free buffer space in the link. This in
>     turn
>     > may lead to a socket having to make many send attempts before it is
>     > successful. In extremely loaded systems we have observed latency times
>     > of several seconds before a low-priority socket is able to send out a
>     > message.
>     >
>     > In this commit, we simplify this mechanism and reduce the risk of the
>     > described scenario happening. When a message is sent to a congested
>     > link, we now let it keep the message in the wakeup-item that it has to
>     > create anyway, and immediately add it to the link's send queue when
>     > enough space has been freed up. Only when this is done do we issue a
>     > wakeup signal to the socket, which can now immediately go on and send
>     > the next message, if any.
>     >
>     > The fact that a socket now can consider a message sent even when the
>     > link returns a congestion code means that the sending socket code can
>     > be simplified. Also, since this is a good opportunity to get rid
>     of the
>     > obsolete 'mtu change' condition in the three socket send functions, we
>     > now choose to refactor those functions completely.
>     >
>     > Signed-off-by: Jon Maloy <jon.ma...@ericsson.com>
>     > ---
>     >  net/tipc/bcast.c  |   2 +-
>     >  net/tipc/link.c   |  90 ++++++++------
>     >  net/tipc/msg.h    |   8 +-
>     >  net/tipc/node.c   |   2 +-
>     >  net/tipc/socket.c | 346
>     ++++++++++++++++++++++++------------------------------
>     >  5 files changed, 209 insertions(+), 239 deletions(-)
>     >
>     > diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
>     > index aa1babb..1a56cab 100644
>     > --- a/net/tipc/bcast.c
>     > +++ b/net/tipc/bcast.c
>     > @@ -174,7 +174,7 @@ static void tipc_bcbase_xmit(struct net *net,
>     struct
>     > sk_buff_head *xmitq)
>     >   *                    and to identified node local sockets
>     >   * @net: the applicable net namespace
>     >   * @list: chain of buffers containing message
>     > - * Consumes the buffer chain, except when returning -ELINKCONG
>     > + * Consumes the buffer chain.
>     >   * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-
>     > EMSGSIZE
>     >   */
>     >  int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
>     > diff --git a/net/tipc/link.c b/net/tipc/link.c
>     > index 1055164..db3dcab 100644
>     > --- a/net/tipc/link.c
>     > +++ b/net/tipc/link.c
>     > @@ -774,60 +774,77 @@ int tipc_link_timeout(struct tipc_link *l,
>     struct
>     > sk_buff_head *xmitq)
>     >       return rc;
>     >  }
>     >
>     > +static bool tipc_link_congested(struct tipc_link *l, int imp)
>     > +{
>     > +     int i;
>     > +
>     > +     if (skb_queue_empty(&l->backlogq))
>     > +             return false;
>     > +
>     > +     /* Match msg importance against this and all higher backlog
>     limits: */
>     > +     for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
>     > +             if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
>     > +                     return true;
>     > +     }
>     > +     return false;
>     > +}
>
>     After introducing this patch I ran into a problem with spontaneous
>     and premature connection disconnects in the ptty test program.
>     After some troubleshooting I realized that with the new algorithm,
>     high-priority shutdown messages sometimes bypass messages which are
>     reported "delivered" but are still waiting in the wakeup pending queue.
>     This was easy to fix by adding a potentially blocking
>     tipc_wait_for_cond() call in __tipc_shutdown().
I understand the problem, but I cannot follow your solution. Can you 
elaborate on this? What happens if the socket is non-blocking?
>
>     But I also realized that we may have (and always have had) another
>     starvation scenario, which is not addressed by this patch.
>     Gergely's algorithm (above) ensures that a socket cannot be starved
>     by lower- or equal-priority users, but it may still be starved by
>     higher-priority users. While a lower-prio message is waiting in the
>     wakeup queue, higher-prio users may drive their own levels into
>     congestion, thus potentially stopping the lower-prio user from
>     sending its message for a long time.
>     After the current patch, wouldn't it be better to go back to the
>     algorithm I originally suggested, where we only check the sender's
>     own level, and nothing else?
>     At a given level, messages would then be delivered in strict
>     first-come-first-served order, and no starvation could occur.
>
>     What do you think?
I agree that we should follow wire order within a given priority.

However, the fact that we allow a user to change TIPC_IMPORTANCE at any 
time using setsockopt() will introduce out-of-order delivery for stream 
sockets under any scheme that modifies wire order.
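
For example, nothing stops an application from bumping the importance of 
an established stream socket in the middle of a transfer (a minimal 
illustration, error handling omitted; SOL_TIPC and TIPC_IMPORTANCE are 
the regular socket option names from linux/tipc.h):

#include <sys/socket.h>
#include <linux/tipc.h>

#ifndef SOL_TIPC
#define SOL_TIPC 271
#endif

/* Raise the importance of an already connected stream socket. Data sent
 * after this call is queued at the new level and may overtake earlier
 * data if the scheduler strictly follows priority.
 */
static int raise_importance(int sd)
{
        int imp = TIPC_CRITICAL_IMPORTANCE;

        return setsockopt(sd, SOL_TIPC, TIPC_IMPORTANCE, &imp, sizeof(imp));
}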

Is the purpose of the queue limits only to decide the link congestion 
criteria? Or are we trying to do more with them?

/Partha
>
>     ///jon
>
>     > +
>     >  /**
>     >   * link_schedule_user - schedule a message sender for wakeup
>     after congestion
>     > - * @link: congested link
>     > + * @l: congested link
>     >   * @list: message that was attempted sent
>     >   * Create pseudo msg to send back to user when congestion abates
>     > - * Does not consume buffer list
>     > + * Consumes buffer list
>     >   */
>     > -static int link_schedule_user(struct tipc_link *link, struct
>     sk_buff_head *list)
>     > +static int link_schedule_user(struct tipc_link *l, struct
>     sk_buff_head *list)
>     >  {
>     > -     struct tipc_msg *msg = buf_msg(skb_peek(list));
>     > -     int imp = msg_importance(msg);
>     > -     u32 oport = msg_origport(msg);
>     > -     u32 addr = tipc_own_addr(link->net);
>     > +     struct tipc_msg *hdr = buf_msg(skb_peek(list));
>     > +     int imp = msg_importance(hdr);
>     > +     u32 oport = msg_origport(hdr);
>     > +     u32 dnode = tipc_own_addr(l->net);
>     >       struct sk_buff *skb;
>     > +     struct sk_buff_head *pkts;
>     >
>     >       /* This really cannot happen...  */
>     >       if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
>     > -             pr_warn("%s<%s>, send queue full", link_rst_msg,
>     link->name);
>     > +             pr_warn("%s<%s>, send queue full", link_rst_msg,
>     l->name);
>     >               return -ENOBUFS;
>     >       }
>     > -     /* Non-blocking sender: */
>     > -     if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending)
>     > -             return -ELINKCONG;
>     >
>     >       /* Create and schedule wakeup pseudo message */
>     >       skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
>     > -                           addr, addr, oport, 0, 0);
>     > +                           dnode, l->addr, oport, 0, 0);
>     >       if (!skb)
>     >               return -ENOBUFS;
>     > -     TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list);
>     > -     TIPC_SKB_CB(skb)->chain_imp = imp;
>     > -     skb_queue_tail(&link->wakeupq, skb);
>     > -     link->stats.link_congs++;
>     > +     msg_set_dest_droppable(buf_msg(skb), true);
>     > +     skb_queue_tail(&l->wakeupq, skb);
>     > +
>     > +     /* Keep the packet chain until we can send it */
>     > +     pkts = &TIPC_SKB_CB(skb)->pkts;
>     > +     __skb_queue_head_init(pkts);
>     > +     skb_queue_splice_init(list, pkts);
>     > +     l->stats.link_congs++;
>     >       return -ELINKCONG;
>     >  }
>     >
>     >  /**
>     >   * link_prepare_wakeup - prepare users for wakeup after congestion
>     > - * @link: congested link
>     > - * Move a number of waiting users, as permitted by available space in
>     > - * the send queue, from link wait queue to node wait queue for wakeup
>     > + * @l: congested link
>     > + * Wake up a number of waiting users, as permitted by available space
>     > + * in the send queue
>     >   */
>     > -void link_prepare_wakeup(struct tipc_link *l)
>     > +void link_prepare_wakeup(struct tipc_link *l, struct sk_buff_head
>     *xmitq)
>     >  {
>     > -     int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
>     > -     int imp, lim;
>     > +     struct sk_buff_head *pkts;
>     >       struct sk_buff *skb, *tmp;
>     > +     int imp;
>     >
>     >       skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
>     > -             imp = TIPC_SKB_CB(skb)->chain_imp;
>     > -             lim = l->backlog[imp].limit;
>     > -             pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
>     > -             if ((pnd[imp] + l->backlog[imp].len) >= lim)
>     > +             pkts = &TIPC_SKB_CB(skb)->pkts;
>     > +             imp = msg_importance(buf_msg(skb_peek(pkts)));
>     > +             if (tipc_link_congested(l, imp))
>     >                       break;
>     > +             tipc_link_xmit(l, pkts, xmitq);
>     >               skb_unlink(skb, &l->wakeupq);
>     >               skb_queue_tail(l->inputq, skb);
>     >       }
>     > @@ -870,8 +887,7 @@ void tipc_link_reset(struct tipc_link *l)
>     >   * @list: chain of buffers containing message
>     >   * @xmitq: returned list of packets to be sent by caller
>     >   *
>     > - * Consumes the buffer chain, except when returning -ELINKCONG,
>     > - * since the caller then may want to make more send attempts.
>     > + * Consumes the buffer chain.
>     >   * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
>     >   * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
>     >   */
>     > @@ -880,7 +896,7 @@ int tipc_link_xmit(struct tipc_link *l, struct
>     sk_buff_head
>     > *list,
>     >  {
>     >       struct tipc_msg *hdr = buf_msg(skb_peek(list));
>     >       unsigned int maxwin = l->window;
>     > -     unsigned int i, imp = msg_importance(hdr);
>     > +     int imp = msg_importance(hdr);
>     >       unsigned int mtu = l->mtu;
>     >       u16 ack = l->rcv_nxt - 1;
>     >       u16 seqno = l->snd_nxt;
>     > @@ -889,18 +905,14 @@ int tipc_link_xmit(struct tipc_link *l, struct
>     > sk_buff_head *list,
>     >       struct sk_buff_head *backlogq = &l->backlogq;
>     >       struct sk_buff *skb, *_skb, *bskb;
>     >
>     > -     /* Match msg importance against this and all higher backlog
>     limits: */
>     > -     if (!skb_queue_empty(backlogq)) {
>     > -             for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
>     > -                     if (unlikely(l->backlog[i].len >=
>     l->backlog[i].limit))
>     > -                             return link_schedule_user(l, list);
>     > -             }
>     > -     }
>     >       if (unlikely(msg_size(hdr) > mtu)) {
>     >               skb_queue_purge(list);
>     >               return -EMSGSIZE;
>     >       }
>     >
>     > +     if (unlikely(tipc_link_congested(l, imp)))
>     > +             return link_schedule_user(l, list);
>     > +
>     >       /* Prepare each packet for sending, and add to relevant
>     queue: */
>     >       while (skb_queue_len(list)) {
>     >               skb = skb_peek(list);
>     > @@ -1248,7 +1260,7 @@ int tipc_link_rcv(struct tipc_link *l,
>     struct sk_buff *skb,
>     >               if (likely(tipc_link_release_pkts(l, msg_ack(hdr)))) {
>     >                       tipc_link_advance_backlog(l, xmitq);
>     >                       if (unlikely(!skb_queue_empty(&l->wakeupq)))
>     > -                             link_prepare_wakeup(l);
>     > +                             link_prepare_wakeup(l, xmitq);
>     >               }
>     >
>     >               /* Defer delivery if sequence gap */
>     > @@ -1527,7 +1539,7 @@ static int tipc_link_proto_rcv(struct
>     tipc_link *l, struct
>     > sk_buff *skb,
>     >
>     >               tipc_link_advance_backlog(l, xmitq);
>     >               if (unlikely(!skb_queue_empty(&l->wakeupq)))
>     > -                     link_prepare_wakeup(l);
>     > +                     link_prepare_wakeup(l, xmitq);
>     >       }
>     >  exit:
>     >       kfree_skb(skb);
>     > @@ -1739,7 +1751,7 @@ void tipc_link_bc_ack_rcv(struct tipc_link
>     *l, u16 acked,
>     >       l->acked = acked;
>     >       tipc_link_advance_backlog(snd_l, xmitq);
>     >       if (unlikely(!skb_queue_empty(&snd_l->wakeupq)))
>     > -             link_prepare_wakeup(snd_l);
>     > +             link_prepare_wakeup(snd_l, xmitq);
>     >  }
>     >
>     >  /* tipc_link_bc_nack_rcv(): receive broadcast nack message
>     > diff --git a/net/tipc/msg.h b/net/tipc/msg.h
>     > index 8d40861..53cb404 100644
>     > --- a/net/tipc/msg.h
>     > +++ b/net/tipc/msg.h
>     > @@ -95,13 +95,11 @@ struct plist;
>     >  #define TIPC_MEDIA_INFO_OFFSET       5
>     >
>     >  struct tipc_skb_cb {
>     > -     u32 bytes_read;
>     >       struct sk_buff *tail;
>     > -     bool validated;
>     > -     bool wakeup_pending;
>     > -     u16 chain_sz;
>     > -     u16 chain_imp;
>     > +     struct sk_buff_head pkts;
>     > +     u32 bytes_read;
>     >       u16 ackers;
>     > +     bool validated;
>     >  };
>     >
>     >  #define TIPC_SKB_CB(__skb) ((struct tipc_skb_cb *)&((__skb)->cb[0]))
>     > diff --git a/net/tipc/node.c b/net/tipc/node.c
>     > index 9d2f4c2..f51d360 100644
>     > --- a/net/tipc/node.c
>     > +++ b/net/tipc/node.c
>     > @@ -1167,7 +1167,7 @@ static int __tipc_nl_add_node(struct
>     tipc_nl_msg *msg,
>     > struct tipc_node *node)
>     >   * @list: chain of buffers containing message
>     >   * @dnode: address of destination node
>     >   * @selector: a number used for deterministic link selection
>     > - * Consumes the buffer chain, except when returning -ELINKCONG
>     > + * Consumes the buffer chain.
>     >   * Returns 0 if success, otherwise:
>     -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE,-
>     > ENOBUF
>     >   */
>     >  int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
>     > diff --git a/net/tipc/socket.c b/net/tipc/socket.c
>     > index 7b8ff92..07e9469 100644
>     > --- a/net/tipc/socket.c
>     > +++ b/net/tipc/socket.c
>     > @@ -67,12 +67,14 @@ enum {
>     >   * @max_pkt: maximum packet size "hint" used when building
>     messages sent by
>     > port
>     >   * @portid: unique port identity in TIPC socket hash table
>     >   * @phdr: preformatted message header used when sending messages
>     > + * #cong_links: list of congested links
>     >   * @publications: list of publications for port
>     > + * @blocking_link: address of the congested link we are currently
>     sleeping on
>     >   * @pub_count: total # of publications port has made during its
>     lifetime
>     >   * @probing_state:
>     >   * @conn_timeout: the time we can wait for an unresponded setup
>     request
>     >   * @dupl_rcvcnt: number of bytes counted twice, in both backlog
>     and rcv queue
>     > - * @link_cong: non-zero if owner must sleep because of link
>     congestion
>     > + * @cong_link_cnt: number of congested links
>     >   * @sent_unacked: # messages sent by socket, and not yet acked by
>     peer
>     >   * @rcv_unacked: # messages read by user, but not yet acked back
>     to peer
>     >   * @peer: 'connected' peer for dgram/rdm
>     > @@ -87,13 +89,13 @@ struct tipc_sock {
>     >       u32 max_pkt;
>     >       u32 portid;
>     >       struct tipc_msg phdr;
>     > -     struct list_head sock_list;
>     > +     struct list_head cong_links;
>     >       struct list_head publications;
>     >       u32 pub_count;
>     >       uint conn_timeout;
>     >       atomic_t dupl_rcvcnt;
>     >       bool probe_unacked;
>     > -     bool link_cong;
>     > +     u16 cong_link_cnt;
>     >       u16 snt_unacked;
>     >       u16 snd_win;
>     >       u16 peer_caps;
>     > @@ -118,8 +120,7 @@ static int tipc_sk_withdraw(struct tipc_sock
>     *tsk, uint
>     > scope,
>     >  static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
>     >  static int tipc_sk_insert(struct tipc_sock *tsk);
>     >  static void tipc_sk_remove(struct tipc_sock *tsk);
>     > -static int __tipc_send_stream(struct socket *sock, struct msghdr *m,
>     > -                           size_t dsz);
>     > +static int __tipc_sendstream(struct socket *sock, struct msghdr
>     *m, size_t dsz);
>     >  static int __tipc_sendmsg(struct socket *sock, struct msghdr *m,
>     size_t dsz);
>     >
>     >  static const struct proto_ops packet_ops;
>     > @@ -424,6 +425,7 @@ static int tipc_sk_create(struct net *net,
>     struct socket
>     > *sock,
>     >       tsk = tipc_sk(sk);
>     >       tsk->max_pkt = MAX_PKT_DEFAULT;
>     >       INIT_LIST_HEAD(&tsk->publications);
>     > +     INIT_LIST_HEAD(&tsk->cong_links);
>     >       msg = &tsk->phdr;
>     >       tn = net_generic(sock_net(sk), tipc_net_id);
>     >       tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE,
>     > TIPC_NAMED_MSG,
>     > @@ -474,9 +476,14 @@ static void __tipc_shutdown(struct socket
>     *sock, int
>     > error)
>     >       struct sock *sk = sock->sk;
>     >       struct tipc_sock *tsk = tipc_sk(sk);
>     >       struct net *net = sock_net(sk);
>     > +     long timeout = CONN_TIMEOUT_DEFAULT;
>     >       u32 dnode = tsk_peer_node(tsk);
>     >       struct sk_buff *skb;
>     >
>     > +     /* Avoid that hi-prio shutdown msgs bypass msgs in link
>     wakeup queue
>     > */
>     > +     tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt &&
>     > +                                         !tsk_conn_cong(tsk)));
>     > +
>     >       /* Reject all unreceived messages, except on an active
>     connection
>     >        * (which disconnects locally & sends a 'FIN+' to peer).
>     >        */
>     > @@ -545,7 +552,7 @@ static int tipc_release(struct socket *sock)
>     >
>     >       /* Reject any messages that accumulated in backlog queue */
>     >       release_sock(sk);
>     > -
>     > +     u32_list_purge(&tsk->cong_links);
>     >       call_rcu(&tsk->rcu, tipc_sk_callback);
>     >       sock->sk = NULL;
>     >
>     > @@ -688,7 +695,7 @@ static unsigned int tipc_poll(struct file
>     *file, struct socket
>     > *sock,
>     >
>     >       switch (sk->sk_state) {
>     >       case TIPC_ESTABLISHED:
>     > -             if (!tsk->link_cong && !tsk_conn_cong(tsk))
>     > +             if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))
>     >                       mask |= POLLOUT;
>     >               /* fall thru' */
>     >       case TIPC_LISTEN:
>     > @@ -697,7 +704,7 @@ static unsigned int tipc_poll(struct file
>     *file, struct socket
>     > *sock,
>     >                       mask |= (POLLIN | POLLRDNORM);
>     >               break;
>     >       case TIPC_OPEN:
>     > -             if (!tsk->link_cong)
>     > +             if (!tsk->cong_link_cnt)
>     >                       mask |= POLLOUT;
>     >               if (tipc_sk_type_connectionless(sk) &&
>     >                   (!skb_queue_empty(&sk->sk_receive_queue)))
>     > @@ -716,63 +723,48 @@ static unsigned int tipc_poll(struct file
>     *file, struct
>     > socket *sock,
>     >   * @sock: socket structure
>     >   * @seq: destination address
>     >   * @msg: message to send
>     > - * @dsz: total length of message data
>     > - * @timeo: timeout to wait for wakeup
>     > + * @dlen: length of data to send
>     > + * @timeout: timeout to wait for wakeup
>     >   *
>     >   * Called from function tipc_sendmsg(), which has done all sanity
>     checks
>     >   * Returns the number of bytes sent on success, or errno
>     >   */
>     >  static int tipc_sendmcast(struct  socket *sock, struct
>     tipc_name_seq *seq,
>     > -                       struct msghdr *msg, size_t dsz, long timeo)
>     > +                       struct msghdr *msg, size_t dlen, long timeout)
>     >  {
>     >       struct sock *sk = sock->sk;
>     >       struct tipc_sock *tsk = tipc_sk(sk);
>     > +     struct tipc_msg *hdr = &tsk->phdr;
>     >       struct net *net = sock_net(sk);
>     > -     struct tipc_msg *mhdr = &tsk->phdr;
>     > -     struct sk_buff_head pktchain;
>     > -     struct iov_iter save = msg->msg_iter;
>     > -     uint mtu;
>     > +     int mtu = tipc_bcast_get_mtu(net);
>     > +     struct sk_buff_head pkts;
>     >       int rc;
>     >
>     > -     if (!timeo && tsk->link_cong)
>     > -             return -ELINKCONG;
>     > -
>     > -     msg_set_type(mhdr, TIPC_MCAST_MSG);
>     > -     msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
>     > -     msg_set_destport(mhdr, 0);
>     > -     msg_set_destnode(mhdr, 0);
>     > -     msg_set_nametype(mhdr, seq->type);
>     > -     msg_set_namelower(mhdr, seq->lower);
>     > -     msg_set_nameupper(mhdr, seq->upper);
>     > -     msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
>     > -
>     > -     skb_queue_head_init(&pktchain);
>     > +     rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
>     > +     if (unlikely(rc))
>     > +             return rc;
>     >
>     > -new_mtu:
>     > -     mtu = tipc_bcast_get_mtu(net);
>     > -     rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &pktchain);
>     > -     if (unlikely(rc < 0))
>     > +     msg_set_type(hdr, TIPC_MCAST_MSG);
>     > +     msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
>     > +     msg_set_destport(hdr, 0);
>     > +     msg_set_destnode(hdr, 0);
>     > +     msg_set_nametype(hdr, seq->type);
>     > +     msg_set_namelower(hdr, seq->lower);
>     > +     msg_set_nameupper(hdr, seq->upper);
>     > +     msg_set_hdr_sz(hdr, MCAST_H_SIZE);
>     > +
>     > +     skb_queue_head_init(&pkts);
>     > +     rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
>     > +     if (unlikely(rc != dlen))
>     >               return rc;
>     >
>     > -     do {
>     > -             rc = tipc_bcast_xmit(net, &pktchain);
>     > -             if (likely(!rc))
>     > -                     return dsz;
>     > -
>     > -             if (rc == -ELINKCONG) {
>     > -                     tsk->link_cong = 1;
>     > -                     rc = tipc_wait_for_cond(sock, &timeo,
>     !tsk->link_cong);
>     > -                     if (!rc)
>     > -                             continue;
>     > -             }
>     > -             __skb_queue_purge(&pktchain);
>     > -             if (rc == -EMSGSIZE) {
>     > -                     msg->msg_iter = save;
>     > -                     goto new_mtu;
>     > -             }
>     > -             break;
>     > -     } while (1);
>     > -     return rc;
>     > +     rc = tipc_bcast_xmit(net, &pkts);
>     > +     if (unlikely(rc == -ELINKCONG)) {
>     > +             tsk->cong_link_cnt = 1;
>     > +             rc = 0;
>     > +     }
>     > +
>     > +     return rc ? rc : dlen;
>     >  }
>     >
>     >  /**
>     > @@ -896,35 +888,38 @@ static int tipc_sendmsg(struct socket *sock,
>     >       return ret;
>     >  }
>     >
>     > -static int __tipc_sendmsg(struct socket *sock, struct msghdr *m,
>     size_t dsz)
>     > +static int __tipc_sendmsg(struct socket *sock, struct msghdr *m,
>     size_t dlen)
>     >  {
>     > -     DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
>     >       struct sock *sk = sock->sk;
>     > -     struct tipc_sock *tsk = tipc_sk(sk);
>     >       struct net *net = sock_net(sk);
>     > -     struct tipc_msg *mhdr = &tsk->phdr;
>     > -     u32 dnode, dport;
>     > -     struct sk_buff_head pktchain;
>     > -     bool is_connectionless = tipc_sk_type_connectionless(sk);
>     > -     struct sk_buff *skb;
>     > +     struct tipc_sock *tsk = tipc_sk(sk);
>     > +     DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
>     > +     long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
>     > +     struct list_head *clinks = &tsk->cong_links;
>     > +     bool syn = !tipc_sk_type_connectionless(sk);
>     > +     struct tipc_msg *hdr = &tsk->phdr;
>     >       struct tipc_name_seq *seq;
>     > -     struct iov_iter save;
>     > -     u32 mtu;
>     > -     long timeo;
>     > -     int rc;
>     > +     struct sk_buff_head pkts;
>     > +     u32 type, inst, domain;
>     > +     u32 dnode, dport;
>     > +     int mtu, rc;
>     >
>     > -     if (dsz > TIPC_MAX_USER_MSG_SIZE)
>     > +     if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
>     >               return -EMSGSIZE;
>     > +
>     >       if (unlikely(!dest)) {
>     > -             if (is_connectionless && tsk->peer.family == AF_TIPC)
>     > -                     dest = &tsk->peer;
>     > -             else
>     > +             dest = &tsk->peer;
>     > +             if (!syn || dest->family != AF_TIPC)
>     >                       return -EDESTADDRREQ;
>     > -     } else if (unlikely(m->msg_namelen < sizeof(*dest)) ||
>     > -                dest->family != AF_TIPC) {
>     > -             return -EINVAL;
>     >       }
>     > -     if (!is_connectionless) {
>     > +
>     > +     if (unlikely(m->msg_namelen < sizeof(*dest)))
>     > +             return -EINVAL;
>     > +
>     > +     if (unlikely(dest->family != AF_TIPC))
>     > +             return -EINVAL;
>     > +
>     > +     if (unlikely(syn)) {
>     >               if (sk->sk_state == TIPC_LISTEN)
>     >                       return -EPIPE;
>     >               if (sk->sk_state != TIPC_OPEN)
>     > @@ -936,72 +931,62 @@ static int __tipc_sendmsg(struct socket
>     *sock, struct
>     > msghdr *m, size_t dsz)
>     >                       tsk->conn_instance =
>     dest->addr.name.name.instance;
>     >               }
>     >       }
>     > -     seq = &dest->addr.nameseq;
>     > -     timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
>     >
>     > -     if (dest->addrtype == TIPC_ADDR_MCAST) {
>     > -             return tipc_sendmcast(sock, seq, m, dsz, timeo);
>     > -     } else if (dest->addrtype == TIPC_ADDR_NAME) {
>     > -             u32 type = dest->addr.name.name.type;
>     > -             u32 inst = dest->addr.name.name.instance;
>     > -             u32 domain = dest->addr.name.domain;
>     > +     seq = &dest->addr.nameseq;
>     > +     if (dest->addrtype == TIPC_ADDR_MCAST)
>     > +             return tipc_sendmcast(sock, seq, m, dlen, timeout);
>     >
>     > +     if (dest->addrtype == TIPC_ADDR_NAME) {
>     > +             type = dest->addr.name.name.type;
>     > +             inst = dest->addr.name.name.instance;
>     > +             domain = dest->addr.name.domain;
>     >               dnode = domain;
>     > -             msg_set_type(mhdr, TIPC_NAMED_MSG);
>     > -             msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
>     > -             msg_set_nametype(mhdr, type);
>     > -             msg_set_nameinst(mhdr, inst);
>     > -             msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
>     > +             msg_set_type(hdr, TIPC_NAMED_MSG);
>     > +             msg_set_hdr_sz(hdr, NAMED_H_SIZE);
>     > +             msg_set_nametype(hdr, type);
>     > +             msg_set_nameinst(hdr, inst);
>     > +             msg_set_lookup_scope(hdr, tipc_addr_scope(domain));
>     >               dport = tipc_nametbl_translate(net, type, inst, &dnode);
>     > -             msg_set_destnode(mhdr, dnode);
>     > -             msg_set_destport(mhdr, dport);
>     > +             msg_set_destnode(hdr, dnode);
>     > +             msg_set_destport(hdr, dport);
>     >               if (unlikely(!dport && !dnode))
>     >                       return -EHOSTUNREACH;
>     > +
>     >       } else if (dest->addrtype == TIPC_ADDR_ID) {
>     >               dnode = dest->addr.id.node;
>     > -             msg_set_type(mhdr, TIPC_DIRECT_MSG);
>     > -             msg_set_lookup_scope(mhdr, 0);
>     > -             msg_set_destnode(mhdr, dnode);
>     > -             msg_set_destport(mhdr, dest->addr.id.ref);
>     > -             msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
>     > +             msg_set_type(hdr, TIPC_DIRECT_MSG);
>     > +             msg_set_lookup_scope(hdr, 0);
>     > +             msg_set_destnode(hdr, dnode);
>     > +             msg_set_destport(hdr, dest->addr.id.ref);
>     > +             msg_set_hdr_sz(hdr, BASIC_H_SIZE);
>     >       }
>     >
>     > -     skb_queue_head_init(&pktchain);
>     > -     save = m->msg_iter;
>     > -new_mtu:
>     > +     /* Block or return if destination link is congested */
>     > +     rc = tipc_wait_for_cond(sock, &timeout, !u32_find(clinks,
>     dnode));
>     > +     if (unlikely(rc))
>     > +             return rc;
>     > +
>     > +     skb_queue_head_init(&pkts);
>     >       mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
>     > -     rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &pktchain);
>     > -     if (rc < 0)
>     > +     rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
>     > +     if (unlikely(rc != dlen))
>     >               return rc;
>     >
>     > -     do {
>     > -             skb = skb_peek(&pktchain);
>     > -             TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
>     > -             rc = tipc_node_xmit(net, &pktchain, dnode, tsk->portid);
>     > -             if (likely(!rc)) {
>     > -                     if (!is_connectionless)
>     > -                             tipc_set_sk_state(sk, TIPC_CONNECTING);
>     > -                     return dsz;
>     > -             }
>     > -             if (rc == -ELINKCONG) {
>     > -                     tsk->link_cong = 1;
>     > -                     rc = tipc_wait_for_cond(sock, &timeo,
>     !tsk->link_cong);
>     > -                     if (!rc)
>     > -                             continue;
>     > -             }
>     > -             __skb_queue_purge(&pktchain);
>     > -             if (rc == -EMSGSIZE) {
>     > -                     m->msg_iter = save;
>     > -                     goto new_mtu;
>     > -             }
>     > -             break;
>     > -     } while (1);
>     > +     rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
>     > +     if (unlikely(rc == -ELINKCONG)) {
>     > +             u32_push(clinks, dnode);
>     > +             tsk->cong_link_cnt++;
>     > +             rc = 0;
>     > +     }
>     >
>     > -     return rc;
>     > +     if (unlikely(syn && !rc))
>     > +             tipc_set_sk_state(sk, TIPC_CONNECTING);
>     > +
>     > +     return rc ? rc : dlen;
>     >  }
>     >
>     >  /**
>     > - * tipc_send_stream - send stream-oriented data
>     > + * tipc_sendstream - send stream-oriented data
>     >   * @sock: socket structure
>     >   * @m: data to send
>     >   * @dsz: total length of data to be transmitted
>     > @@ -1011,97 +996,69 @@ static int __tipc_sendmsg(struct socket
>     *sock, struct
>     > msghdr *m, size_t dsz)
>     >   * Returns the number of bytes sent on success (or partial success),
>     >   * or errno if no data sent
>     >   */
>     > -static int tipc_send_stream(struct socket *sock, struct msghdr
>     *m, size_t dsz)
>     > +static int tipc_sendstream(struct socket *sock, struct msghdr *m,
>     size_t dsz)
>     >  {
>     >       struct sock *sk = sock->sk;
>     >       int ret;
>     >
>     >       lock_sock(sk);
>     > -     ret = __tipc_send_stream(sock, m, dsz);
>     > +     ret = __tipc_sendstream(sock, m, dsz);
>     >       release_sock(sk);
>     >
>     >       return ret;
>     >  }
>     >
>     > -static int __tipc_send_stream(struct socket *sock, struct msghdr
>     *m, size_t dsz)
>     > +static int __tipc_sendstream(struct socket *sock, struct msghdr
>     *m, size_t dlen)
>     >  {
>     >       struct sock *sk = sock->sk;
>     > -     struct net *net = sock_net(sk);
>     > -     struct tipc_sock *tsk = tipc_sk(sk);
>     > -     struct tipc_msg *mhdr = &tsk->phdr;
>     > -     struct sk_buff_head pktchain;
>     >       DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
>     > -     u32 portid = tsk->portid;
>     > -     int rc = -EINVAL;
>     > -     long timeo;
>     > -     u32 dnode;
>     > -     uint mtu, send, sent = 0;
>     > -     struct iov_iter save;
>     > -     int hlen = MIN_H_SIZE;
>     > -
>     > -     /* Handle implied connection establishment */
>     > -     if (unlikely(dest)) {
>     > -             rc = __tipc_sendmsg(sock, m, dsz);
>     > -             hlen = msg_hdr_sz(mhdr);
>     > -             if (dsz && (dsz == rc))
>     > -                     tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
>     > -             return rc;
>     > -     }
>     > -     if (dsz > (uint)INT_MAX)
>     > -             return -EMSGSIZE;
>     > -
>     > -     if (unlikely(!tipc_sk_connected(sk))) {
>     > -             if (sk->sk_state == TIPC_DISCONNECTING)
>     > -                     return -EPIPE;
>     > -             else
>     > -                     return -ENOTCONN;
>     > -     }
>     > +     long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
>     > +     struct tipc_sock *tsk = tipc_sk(sk);
>     > +     struct tipc_msg *hdr = &tsk->phdr;
>     > +     struct net *net = sock_net(sk);
>     > +     struct sk_buff_head pkts;
>     > +     u32 dnode = tsk_peer_node(tsk);
>     > +     int send, sent = 0;
>     > +     int rc = 0;
>     >
>     > -     timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
>     > -     if (!timeo && tsk->link_cong)
>     > -             return -ELINKCONG;
>     > +     skb_queue_head_init(&pkts);
>     >
>     > -     dnode = tsk_peer_node(tsk);
>     > -     skb_queue_head_init(&pktchain);
>     > +     if (unlikely(dlen > INT_MAX))
>     > +             return -EMSGSIZE;
>     >
>     > -next:
>     > -     save = m->msg_iter;
>     > -     mtu = tsk->max_pkt;
>     > -     send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
>     > -     rc = tipc_msg_build(mhdr, m, sent, send, mtu, &pktchain);
>     > -     if (unlikely(rc < 0))
>     > +     /* Handle implicit connection setup */
>     > +     if (unlikely(dest)) {
>     > +             rc = __tipc_sendmsg(sock, m, dlen);
>     > +             if (dlen && (dlen == rc))
>     > +                     tsk->snt_unacked = tsk_inc(tsk, dlen +
>     msg_hdr_sz(hdr));
>     >               return rc;
>     > +     }
>     >
>     >       do {
>     > -             if (likely(!tsk_conn_cong(tsk))) {
>     > -                     rc = tipc_node_xmit(net, &pktchain, dnode,
>     portid);
>     > -                     if (likely(!rc)) {
>     > -                             tsk->snt_unacked += tsk_inc(tsk,
>     send + hlen);
>     > -                             sent += send;
>     > -                             if (sent == dsz)
>     > -                                     return dsz;
>     > -                             goto next;
>     > -                     }
>     > -                     if (rc == -EMSGSIZE) {
>     > -                             __skb_queue_purge(&pktchain);
>     > -                             tsk->max_pkt =
>     tipc_node_get_mtu(net, dnode,
>     > -
>     portid);
>     > -                             m->msg_iter = save;
>     > -                             goto next;
>     > -                     }
>     > -                     if (rc != -ELINKCONG)
>     > -                             break;
>     > -
>     > -                     tsk->link_cong = 1;
>     > -             }
>     > -             rc = tipc_wait_for_cond(sock, &timeo,
>     > -                                     (!tsk->link_cong &&
>     > +             rc = tipc_wait_for_cond(sock, &timeout,
>     > +                                     (!tsk->cong_link_cnt &&
>     >                                        !tsk_conn_cong(tsk) &&
>     >                                        tipc_sk_connected(sk)));
>     > -     } while (!rc);
>     > +             if (unlikely(rc))
>     > +                     break;
>     > +
>     > +             send = min_t(size_t, dlen - sent,
>     TIPC_MAX_USER_MSG_SIZE);
>     > +             rc = tipc_msg_build(hdr, m, sent, send,
>     tsk->max_pkt, &pkts);
>     > +             if (unlikely(rc != send))
>     > +                     break;
>     > +
>     > +             rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
>     > +             if (unlikely(rc == -ELINKCONG)) {
>     > +                     tsk->cong_link_cnt = 1;
>     > +                     rc = 0;
>     > +             }
>     > +             if (likely(!rc)) {
>     > +                     tsk->snt_unacked += tsk_inc(tsk, send +
>     MIN_H_SIZE);
>     > +                     sent += send;
>     > +             }
>     > +     } while (sent < dlen && !rc);
>     >
>     > -     __skb_queue_purge(&pktchain);
>     > -     return sent ? sent : rc;
>     > +     return rc ? rc : sent;
>     >  }
>     >
>     >  /**
>     > @@ -1119,7 +1076,7 @@ static int tipc_send_packet(struct socket
>     *sock, struct
>     > msghdr *m, size_t dsz)
>     >       if (dsz > TIPC_MAX_USER_MSG_SIZE)
>     >               return -EMSGSIZE;
>     >
>     > -     return tipc_send_stream(sock, m, dsz);
>     > +     return tipc_sendstream(sock, m, dsz);
>     >  }
>     >
>     >  /* tipc_sk_finish_conn - complete the setup of a connection
>     > @@ -1686,6 +1643,7 @@ static bool filter_rcv(struct sock *sk,
>     struct sk_buff
>     > *skb,
>     >       unsigned int limit = rcvbuf_limit(sk, skb);
>     >       int err = TIPC_OK;
>     >       int usr = msg_user(hdr);
>     > +     u32 onode;
>     >
>     >       if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
>     >               tipc_sk_proto_rcv(tsk, skb, xmitq);
>     > @@ -1693,8 +1651,10 @@ static bool filter_rcv(struct sock *sk,
>     struct sk_buff
>     > *skb,
>     >       }
>     >
>     >       if (unlikely(usr == SOCK_WAKEUP)) {
>     > +             onode = msg_orignode(hdr);
>     >               kfree_skb(skb);
>     > -             tsk->link_cong = 0;
>     > +             u32_del(&tsk->cong_links, onode);
>     > +             tsk->cong_link_cnt--;
>     >               sk->sk_write_space(sk);
>     >               return false;
>     >       }
>     > @@ -2102,7 +2062,7 @@ static int tipc_accept(struct socket *sock,
>     struct socket
>     > *new_sock, int flags)
>     >               struct msghdr m = {NULL,};
>     >
>     >               tsk_advance_rx_queue(sk);
>     > -             __tipc_send_stream(new_sock, &m, 0);
>     > +             __tipc_sendstream(new_sock, &m, 0);
>     >       } else {
>     >               __skb_dequeue(&sk->sk_receive_queue);
>     >               __skb_queue_head(&new_sk->sk_receive_queue, buf);
>     > @@ -2563,7 +2523,7 @@ static const struct proto_ops stream_ops = {
>     >       .shutdown       = tipc_shutdown,
>     >       .setsockopt     = tipc_setsockopt,
>     >       .getsockopt     = tipc_getsockopt,
>     > -     .sendmsg        = tipc_send_stream,
>     > +     .sendmsg        = tipc_sendstream,
>     >       .recvmsg        = tipc_recv_stream,
>     >       .mmap           = sock_no_mmap,
>     >       .sendpage       = sock_no_sendpage
>     > --
>     > 2.7.4
>
>
