On 06/12/2018 11:34 AM, Hoang Le wrote: > It is currently impossible for a client to send a connection setup > message with data, directly followed by one or more other data messages.
Does TCP have the similar capability? > > A way to fix this is to note that the we are sending a > service addressed message from a CONNECTING SOCK_SEQPACKET/SOCK_STREAM > socket, and define a new FOLLOW_UP (e.g., the unused bit 18 in word 0) > in the message header, and when the accepting socket sees this bit set, > it looks if there is already a spawned socket with that peer. > If so, it sends the message to that socket, instead of creating a new one. > We already support the implicit connection with data like SYN+. I doubt whether the enhancement is necessary. > Signed-off-by: Hoang Le <[email protected]> > --- > net/tipc/msg.h | 18 ++++++++++++ > net/tipc/node.c | 26 +++++++++++++++++ > net/tipc/node.h | 1 + > net/tipc/socket.c | 84 > +++++++++++++++++++++++++++++++++++++++++++++++++++---- > 4 files changed, 124 insertions(+), 5 deletions(-) > > diff --git a/net/tipc/msg.h b/net/tipc/msg.h > index a4e944d59394..0ca56c44f4b4 100644 > --- a/net/tipc/msg.h > +++ b/net/tipc/msg.h > @@ -246,6 +246,19 @@ static inline void msg_set_src_droppable(struct tipc_msg > *m, u32 d) > msg_set_bits(m, 0, 18, 1, d); > } > > +/* Use same bit 18 to indicate data messages come along with > + * connection setup message > + */ > +static inline int msg_follow_up(struct tipc_msg *m) > +{ > + return msg_bits(m, 0, 18, 1); > +} > + > +static inline void msg_set_follow_up(struct tipc_msg *m, u32 d) > +{ > + msg_set_bits(m, 0, 18, 1, d); > +} > + > static inline void msg_set_size(struct tipc_msg *m, u32 sz) > { > m->hdr[0] = htonl((msg_word(m, 0) & ~0x1ffff) | sz); > @@ -564,6 +577,11 @@ static inline void msg_set_nameupper(struct tipc_msg *m, > u32 n) > #define GRP_REMIT_MSG 5 > > /* > + * Message follow with connection setup message > + */ > +#define FOLLOW_UP_MSG 1 > + > +/* > * Word 1 > */ > static inline u32 msg_seq_gap(struct tipc_msg *m) > diff --git a/net/tipc/node.c b/net/tipc/node.c > index 6a44eb812baf..4ce059194dd2 100644 > --- a/net/tipc/node.c > +++ b/net/tipc/node.c > @@ -521,6 +521,32 @@ int tipc_node_add_conn(struct net *net, u32 dnode, u32 > port, u32 peer_port) > return err; > } > > +/* Lookup spawned socket that connecting with service addressed */ > +u32 tipc_node_lkup_conn(struct net *net, u32 dnode, u32 peer_port) > +{ > + struct tipc_node *node; > + struct tipc_sock_conn *conn, *safe; > + u32 portid = 0; > + > + if (in_own_node(net, dnode)) > + return 0; > + > + node = tipc_node_find(net, dnode); > + if (!node) > + return 0; > + > + tipc_node_read_lock(node); > + list_for_each_entry_safe(conn, safe, &node->conn_sks, list) { > + if (peer_port == conn->peer_port) { > + portid = conn->port; > + break; > + } > + } > + tipc_node_read_unlock(node); > + tipc_node_put(node); > + return portid; > +} > + > void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port) > { > struct tipc_node *node; > diff --git a/net/tipc/node.h b/net/tipc/node.h > index 846c8f240872..20d1699f4b94 100644 > --- a/net/tipc/node.h > +++ b/net/tipc/node.h > @@ -81,6 +81,7 @@ void tipc_node_unsubscribe(struct net *net, struct > list_head *subscr, u32 addr); > void tipc_node_broadcast(struct net *net, struct sk_buff *skb); > int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port); > void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port); > +u32 tipc_node_lkup_conn(struct net *net, u32 dnode, u32 port); > int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel); > bool tipc_node_is_up(struct net *net, u32 addr); > u16 tipc_node_get_capabilities(struct net *net, u32 addr); > diff --git a/net/tipc/socket.c b/net/tipc/socket.c > index 14a5d055717d..552e435ffa06 100644 > --- a/net/tipc/socket.c > +++ b/net/tipc/socket.c > @@ -138,6 +138,9 @@ static void tipc_sk_remove(struct tipc_sock *tsk); > static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t > dsz); > static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz); > > +/* Forward to CONNECTING socket if it is already being spawned */ > +static bool __tipc_sk_spawned_enqueue(struct sock *sk, struct sk_buff *skb); > + > static const struct proto_ops packet_ops; > static const struct proto_ops stream_ops; > static const struct proto_ops msg_ops; > @@ -1310,7 +1313,10 @@ static int __tipc_sendmsg(struct socket *sock, struct > msghdr *m, size_t dlen) > if (sk->sk_state == TIPC_LISTEN) > return -EPIPE; > if (sk->sk_state != TIPC_OPEN) > - return -EISCONN; > + if (sk->sk_state != TIPC_CONNECTING || > + (msg_follow_up(hdr) != FOLLOW_UP_MSG)) > + return -EISCONN; > + > if (tsk->published) > return -EOPNOTSUPP; > if (dest->addrtype == TIPC_ADDR_NAME) { > @@ -1416,6 +1422,7 @@ static int __tipc_sendstream(struct socket *sock, > struct msghdr *m, size_t dlen) > > /* Handle implicit connection setup */ > if (unlikely(dest)) { > + msg_set_follow_up(hdr, FOLLOW_UP_MSG); > rc = __tipc_sendmsg(sock, m, dlen); > if (dlen && (dlen == rc)) > tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr)); > @@ -2079,6 +2086,37 @@ static unsigned int rcvbuf_limit(struct sock *sk, > struct sk_buff *skb) > return FLOWCTL_MSG_LIM; > } > > +static bool __tipc_sk_spawned_enqueue(struct sock *sk, struct sk_buff *skb) > +{ > + struct tipc_msg *hdr = buf_msg(skb); > + struct net *net = sock_net(sk); > + struct tipc_sock *tsk; > + struct sock *spawned_sk; > + u32 portid = 0; > + > + if (sk->sk_state == TIPC_LISTEN && > + (msg_follow_up(hdr) == FOLLOW_UP_MSG)) { > + // Lookup spawned socket in net No, the above comment format doesn't match Linux kernel code style. Please change it as /* ... */ > + portid = tipc_node_lkup_conn(net, msg_orignode(hdr), > + msg_origport(hdr)); > + if (portid) { > + tsk = tipc_sk_lookup(net, portid); > + if (likely(tsk)) { > + spawned_sk = &tsk->sk; > + if > (likely(spin_trylock_bh(&spawned_sk->sk_lock.slock))) { > + > __skb_queue_tail(&spawned_sk->sk_receive_queue, skb); > + skb_set_owner_r(skb, spawned_sk); > + spawned_sk->sk_data_ready(spawned_sk); > + > spin_unlock_bh(&spawned_sk->sk_lock.slock); > + } > + sock_put(spawned_sk); > + return true; > + } > + } > + } > + return false; > +} > + > /** > * tipc_sk_filter_rcv - validate incoming message > * @sk: socket > @@ -2114,6 +2152,7 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct > sk_buff *skb, > /* Validate and add to receive buffer if there is space */ > while ((skb = __skb_dequeue(&inputq))) { > hdr = buf_msg(skb); > + > limit = rcvbuf_limit(sk, skb); > if ((sk_conn && !tipc_sk_filter_connect(tsk, skb)) || > (!sk_conn && msg_connected(hdr)) || > @@ -2129,9 +2168,14 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct > sk_buff *skb, > err = TIPC_OK; > continue; > } > - __skb_queue_tail(&sk->sk_receive_queue, skb); > - skb_set_owner_r(skb, sk); > - sk->sk_data_ready(sk); > + > + /* Redirect to spawned socket (CONNECTING/ESTABLISHED) > + * if this skb is sent together with connection setup message. > */ > + if (!__tipc_sk_spawned_enqueue(sk, skb)) { As this is a very hot path, I don't think this is a good idea to go through so many branches for each message in _tipc_sk_spawned_enqueue(), which is too expensive. > + __skb_queue_tail(&sk->sk_receive_queue, skb); > + skb_set_owner_r(skb, sk); > + sk->sk_data_ready(sk); > + } > } > } > > @@ -2443,6 +2487,9 @@ static int tipc_accept(struct socket *sock, struct > socket *new_sock, int flags, > struct sk_buff *buf; > struct tipc_sock *new_tsock; > struct tipc_msg *msg; > + u32 peer_port = 0; > + u32 peer_node = 0; > + u32 portid = 0; > long timeo; > int res; > > @@ -2458,15 +2505,24 @@ static int tipc_accept(struct socket *sock, struct > socket *new_sock, int flags, > goto exit; > > buf = skb_peek(&sk->sk_receive_queue); > + msg = buf_msg(buf); > + > + /* > + * Forward to CONNECTING socket if it is already being spawned > + */ Change it as: /*Forward to CONNECTING socket if it is already being spawned*/ > + if (__tipc_sk_spawned_enqueue(sk, buf)) { > + __skb_dequeue(&sk->sk_receive_queue); > + goto exit; > + } > > res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, kern); > if (res) > goto exit; > + > security_sk_clone(sock->sk, new_sock->sk); > > new_sk = new_sock->sk; > new_tsock = tipc_sk(new_sk); > - msg = buf_msg(buf); > > /* we lock on new_sk; but lockdep sees the lock on sk */ > lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING); > @@ -2499,7 +2555,25 @@ static int tipc_accept(struct socket *sock, struct > socket *new_sock, int flags, > __skb_dequeue(&sk->sk_receive_queue); > __skb_queue_head(&new_sk->sk_receive_queue, buf); > skb_set_owner_r(buf, new_sk); > + > + /* > + * If there are more other data messages follow with > + * connection setup message, enqueue on new socket. > + */ > + peer_port = tsk_peer_port(new_tsock); > + peer_node = tsk_peer_node(new_tsock); > + portid = msg_destport(msg); > + while ((buf = tipc_skb_dequeue(&sk->sk_receive_queue, portid)) > != NULL) { > + if ((msg_orignode(buf_msg(buf)) == peer_node) && > + (msg_origport(buf_msg(buf)) == peer_port)) { > + __skb_queue_tail(&new_sk->sk_receive_queue, > buf); > + skb_set_owner_r(buf, new_sk); > + } else { > + __skb_queue_tail(&sk->sk_receive_queue, buf); > + } > + } > } > + > release_sock(new_sk); > exit: > release_sock(sk); > ------------------------------------------------------------------------------ Check out the vibrant tech community on one of the world's most engaging tech sites, Slashdot.org! http://sdm.link/slashdot _______________________________________________ tipc-discussion mailing list [email protected] https://lists.sourceforge.net/lists/listinfo/tipc-discussion
