On 06/12/2018 11:34 AM, Hoang Le wrote:
> It is currently impossible for a client to send a connection setup
> message with data, directly followed by one or more other data messages.

Does TCP have the similar capability?

> 
> A way to fix this is to note that the we are sending a
> service addressed message from a CONNECTING SOCK_SEQPACKET/SOCK_STREAM
> socket, and define a new FOLLOW_UP (e.g., the unused bit 18 in word 0)
> in the message header, and when the accepting socket sees this bit set,
> it looks if there is already a spawned socket with that peer.
> If so, it sends the message to that socket, instead of creating a new one.
> 

We already support the implicit connection with data like SYN+. I doubt
whether the enhancement is necessary.

> Signed-off-by: Hoang Le <[email protected]>
> ---
>  net/tipc/msg.h    | 18 ++++++++++++
>  net/tipc/node.c   | 26 +++++++++++++++++
>  net/tipc/node.h   |  1 +
>  net/tipc/socket.c | 84 
> +++++++++++++++++++++++++++++++++++++++++++++++++++----
>  4 files changed, 124 insertions(+), 5 deletions(-)
> 
> diff --git a/net/tipc/msg.h b/net/tipc/msg.h
> index a4e944d59394..0ca56c44f4b4 100644
> --- a/net/tipc/msg.h
> +++ b/net/tipc/msg.h
> @@ -246,6 +246,19 @@ static inline void msg_set_src_droppable(struct tipc_msg 
> *m, u32 d)
>       msg_set_bits(m, 0, 18, 1, d);
>  }
>  
> +/* Use same bit 18 to indicate data messages come along with
> + * connection setup message
> + */
> +static inline int msg_follow_up(struct tipc_msg *m)
> +{
> +     return msg_bits(m, 0, 18, 1);
> +}
> +
> +static inline void msg_set_follow_up(struct tipc_msg *m, u32 d)
> +{
> +     msg_set_bits(m, 0, 18, 1, d);
> +}
> +
>  static inline void msg_set_size(struct tipc_msg *m, u32 sz)
>  {
>       m->hdr[0] = htonl((msg_word(m, 0) & ~0x1ffff) | sz);
> @@ -564,6 +577,11 @@ static inline void msg_set_nameupper(struct tipc_msg *m, 
> u32 n)
>  #define GRP_REMIT_MSG        5
>  
>  /*
> + * Message follow with connection setup message
> + */
> +#define FOLLOW_UP_MSG       1
> +
> +/*
>   * Word 1
>   */
>  static inline u32 msg_seq_gap(struct tipc_msg *m)
> diff --git a/net/tipc/node.c b/net/tipc/node.c
> index 6a44eb812baf..4ce059194dd2 100644
> --- a/net/tipc/node.c
> +++ b/net/tipc/node.c
> @@ -521,6 +521,32 @@ int tipc_node_add_conn(struct net *net, u32 dnode, u32 
> port, u32 peer_port)
>       return err;
>  }
>  
> +/* Lookup spawned socket that connecting with service addressed */
> +u32 tipc_node_lkup_conn(struct net *net, u32 dnode, u32 peer_port)
> +{
> +     struct tipc_node *node;
> +     struct tipc_sock_conn *conn, *safe;
> +     u32 portid = 0;
> +
> +     if (in_own_node(net, dnode))
> +             return 0;
> +
> +     node = tipc_node_find(net, dnode);
> +     if (!node)
> +             return 0;
> +
> +     tipc_node_read_lock(node);
> +     list_for_each_entry_safe(conn, safe, &node->conn_sks, list) {
> +             if (peer_port == conn->peer_port) {
> +                     portid = conn->port;
> +                     break;
> +             }
> +     }
> +     tipc_node_read_unlock(node);
> +     tipc_node_put(node);
> +     return portid;
> +}
> +
>  void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port)
>  {
>       struct tipc_node *node;
> diff --git a/net/tipc/node.h b/net/tipc/node.h
> index 846c8f240872..20d1699f4b94 100644
> --- a/net/tipc/node.h
> +++ b/net/tipc/node.h
> @@ -81,6 +81,7 @@ void tipc_node_unsubscribe(struct net *net, struct 
> list_head *subscr, u32 addr);
>  void tipc_node_broadcast(struct net *net, struct sk_buff *skb);
>  int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);
>  void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
> +u32 tipc_node_lkup_conn(struct net *net, u32 dnode, u32 port);
>  int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel);
>  bool tipc_node_is_up(struct net *net, u32 addr);
>  u16 tipc_node_get_capabilities(struct net *net, u32 addr);
> diff --git a/net/tipc/socket.c b/net/tipc/socket.c
> index 14a5d055717d..552e435ffa06 100644
> --- a/net/tipc/socket.c
> +++ b/net/tipc/socket.c
> @@ -138,6 +138,9 @@ static void tipc_sk_remove(struct tipc_sock *tsk);
>  static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t 
> dsz);
>  static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
>  
> +/* Forward to CONNECTING socket if it is already being spawned */
> +static bool __tipc_sk_spawned_enqueue(struct sock *sk, struct sk_buff *skb);
> +
>  static const struct proto_ops packet_ops;
>  static const struct proto_ops stream_ops;
>  static const struct proto_ops msg_ops;
> @@ -1310,7 +1313,10 @@ static int __tipc_sendmsg(struct socket *sock, struct 
> msghdr *m, size_t dlen)
>               if (sk->sk_state == TIPC_LISTEN)
>                       return -EPIPE;
>               if (sk->sk_state != TIPC_OPEN)
> -                     return -EISCONN;
> +                     if (sk->sk_state != TIPC_CONNECTING ||
> +                         (msg_follow_up(hdr) != FOLLOW_UP_MSG))
> +                             return -EISCONN;
> +
>               if (tsk->published)
>                       return -EOPNOTSUPP;
>               if (dest->addrtype == TIPC_ADDR_NAME) {
> @@ -1416,6 +1422,7 @@ static int __tipc_sendstream(struct socket *sock, 
> struct msghdr *m, size_t dlen)
>  
>       /* Handle implicit connection setup */
>       if (unlikely(dest)) {
> +             msg_set_follow_up(hdr, FOLLOW_UP_MSG);
>               rc = __tipc_sendmsg(sock, m, dlen);
>               if (dlen && (dlen == rc))
>                       tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr));
> @@ -2079,6 +2086,37 @@ static unsigned int rcvbuf_limit(struct sock *sk, 
> struct sk_buff *skb)
>       return FLOWCTL_MSG_LIM;
>  }
>  
> +static bool __tipc_sk_spawned_enqueue(struct sock *sk, struct sk_buff *skb)
> +{
> +     struct tipc_msg *hdr = buf_msg(skb);
> +     struct net *net = sock_net(sk);
> +     struct tipc_sock *tsk;
> +     struct sock *spawned_sk;
> +     u32 portid = 0;
> +
> +     if (sk->sk_state == TIPC_LISTEN &&
> +         (msg_follow_up(hdr) == FOLLOW_UP_MSG)) {
> +             // Lookup spawned socket in net

No, the above comment format doesn't match Linux kernel code style.
Please change it as

/* ... */

> +             portid = tipc_node_lkup_conn(net, msg_orignode(hdr),
> +                                          msg_origport(hdr));
> +             if (portid) {
> +                     tsk = tipc_sk_lookup(net, portid);
> +                     if (likely(tsk)) {
> +                             spawned_sk = &tsk->sk;
> +                             if 
> (likely(spin_trylock_bh(&spawned_sk->sk_lock.slock))) {
> +                                     
> __skb_queue_tail(&spawned_sk->sk_receive_queue, skb);
> +                                     skb_set_owner_r(skb, spawned_sk);
> +                                     spawned_sk->sk_data_ready(spawned_sk);
> +                                     
> spin_unlock_bh(&spawned_sk->sk_lock.slock);
> +                             }
> +                             sock_put(spawned_sk);
> +                             return true;
> +                     }
> +             }
> +     }
> +     return false;
> +}
> +
>  /**
>   * tipc_sk_filter_rcv - validate incoming message
>   * @sk: socket
> @@ -2114,6 +2152,7 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct 
> sk_buff *skb,
>       /* Validate and add to receive buffer if there is space */
>       while ((skb = __skb_dequeue(&inputq))) {
>               hdr = buf_msg(skb);
> +
>               limit = rcvbuf_limit(sk, skb);
>               if ((sk_conn && !tipc_sk_filter_connect(tsk, skb)) ||
>                   (!sk_conn && msg_connected(hdr)) ||
> @@ -2129,9 +2168,14 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct 
> sk_buff *skb,
>                       err = TIPC_OK;
>                       continue;
>               }
> -             __skb_queue_tail(&sk->sk_receive_queue, skb);
> -             skb_set_owner_r(skb, sk);
> -             sk->sk_data_ready(sk);
> +
> +             /* Redirect to spawned socket (CONNECTING/ESTABLISHED)
> +              * if this skb is sent together with connection setup message. 
> */
> +             if (!__tipc_sk_spawned_enqueue(sk, skb)) {

As this is a very hot path, I don't think this is a good idea to go
through so many branches for each message in _tipc_sk_spawned_enqueue(),
which is too expensive.

> +                     __skb_queue_tail(&sk->sk_receive_queue, skb);
> +                     skb_set_owner_r(skb, sk);
> +                     sk->sk_data_ready(sk);
> +             }
>       }
>  }
>  
> @@ -2443,6 +2487,9 @@ static int tipc_accept(struct socket *sock, struct 
> socket *new_sock, int flags,
>       struct sk_buff *buf;
>       struct tipc_sock *new_tsock;
>       struct tipc_msg *msg;
> +     u32 peer_port = 0;
> +     u32 peer_node = 0;
> +     u32 portid = 0;
>       long timeo;
>       int res;
>  
> @@ -2458,15 +2505,24 @@ static int tipc_accept(struct socket *sock, struct 
> socket *new_sock, int flags,
>               goto exit;
>  
>       buf = skb_peek(&sk->sk_receive_queue);
> +     msg = buf_msg(buf);
> +
> +     /*
> +      * Forward to CONNECTING socket if it is already being spawned
> +      */

Change it as:

/*Forward to CONNECTING socket if it is already being spawned*/

> +     if (__tipc_sk_spawned_enqueue(sk, buf)) {
> +             __skb_dequeue(&sk->sk_receive_queue);
> +             goto exit;
> +     }
>  
>       res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, kern);
>       if (res)
>               goto exit;
> +
>       security_sk_clone(sock->sk, new_sock->sk);
>  
>       new_sk = new_sock->sk;
>       new_tsock = tipc_sk(new_sk);
> -     msg = buf_msg(buf);
>  
>       /* we lock on new_sk; but lockdep sees the lock on sk */
>       lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
> @@ -2499,7 +2555,25 @@ static int tipc_accept(struct socket *sock, struct 
> socket *new_sock, int flags,
>               __skb_dequeue(&sk->sk_receive_queue);
>               __skb_queue_head(&new_sk->sk_receive_queue, buf);
>               skb_set_owner_r(buf, new_sk);
> +
> +             /*
> +              * If there are more other data messages follow with
> +              * connection setup message, enqueue on new socket.
> +              */
> +             peer_port = tsk_peer_port(new_tsock);
> +             peer_node = tsk_peer_node(new_tsock);
> +             portid = msg_destport(msg);
> +             while ((buf = tipc_skb_dequeue(&sk->sk_receive_queue, portid)) 
> != NULL) {
> +                     if ((msg_orignode(buf_msg(buf)) == peer_node) &&
> +                         (msg_origport(buf_msg(buf)) == peer_port)) {
> +                             __skb_queue_tail(&new_sk->sk_receive_queue, 
> buf);
> +                             skb_set_owner_r(buf, new_sk);
> +                     } else {
> +                             __skb_queue_tail(&sk->sk_receive_queue, buf);
> +                     }
> +             }
>       }
> +
>       release_sock(new_sk);
>  exit:
>       release_sock(sk);
> 

------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, Slashdot.org! http://sdm.link/slashdot
_______________________________________________
tipc-discussion mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/tipc-discussion

Reply via email to