The commit adds the new trace_events for TIPC socket object: trace_tipc_sk_create() trace_tipc_sk_sendmsg() trace_tipc_sk_sendmcast() trace_tipc_sk_sendstream() trace_tipc_sk_filter_rcv() trace_tipc_sk_advance_rx() trace_tipc_sk_rej_msg() trace_tipc_sk_drop_msg() trace_tipc_sk_release() trace_tipc_sk_shutdown() trace_tipc_sk_overlimit1() trace_tipc_sk_overlimit2()
Also, enables the traces for the following cases: - When user creates a TIPC socket; - When user sends a dgram/mcast/stream message; - When a message is put into the socket 'sk_receive_queue'; - When a message is released from the socket 'sk_receive_queue'; - When a message is rejected (e.g. due to no port, invalid, etc.); - When a message is dropped (e.g. due to peer port not connected); - When socket is released; - When socket is shutdown; - When socket rcvq's allocation is overlimit (> 90%); - When socket rcvq + bklq's allocation is overlimit (> 90%); - When the 'TIPC_ERR_OVERLOAD/2' issue happens; Note: The 'tipc_sk_overlimit1/2' event is a conditional trace_event which happens when the socket receive queue (and backlog queue) is about to be overloaded, when the queue allocation is > 90%. Then, when the trace is enabled, the last skbs leading to the TIPC_ERR_OVERLOAD/2 issue can be traced. The trace event is designed as an 'upper watermark' notification that the other traces (e.g. 'tipc_sk_advance_rx' vs 'tipc_sk_filter_rcv') or actions can be triggered in the meantime to see what is going on with the socket queue. In addition, the 'trace_tipc_sk_dump()' is also placed at the 'TIPC_ERR_OVERLOAD/2' case, so the socket and last skb can be dumped for post-analysis. 
Signed-off-by: Tuong Lien <[email protected]> --- net/tipc/socket.c | 74 ++++++++++++++++++++++++++++++++++++++++++++++++++++--- net/tipc/socket.h | 2 ++ net/tipc/trace.h | 19 ++++++++++++++ 3 files changed, 91 insertions(+), 4 deletions(-) diff --git a/net/tipc/socket.c b/net/tipc/socket.c index d70c26908836..f8d9c2349387 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -119,6 +119,13 @@ struct tipc_sock { bool group_is_open; }; +/* Error message prefixes + */ +static const char *sk_overlim = "warning: rx queues > 90 allocated!"; +static const char *sk_overlim2 = "warning: rcvq > 90 allocated!"; +static const char *sk_conges = "error: rx queues congested!"; +static const char *sk_conges2 = "error: rcvq congested!"; + static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb); static void tipc_data_ready(struct sock *sk); static void tipc_write_space(struct sock *sk); @@ -234,6 +241,7 @@ static u16 tsk_inc(struct tipc_sock *tsk, int msglen) */ static void tsk_advance_rx_queue(struct sock *sk) { + trace_tipc_sk_advance_rx(sk, NULL, TIPC_DUMP_SK_RCVQ, "rx advancing"); kfree_skb(__skb_dequeue(&sk->sk_receive_queue)); } @@ -262,8 +270,10 @@ static void tsk_rej_rx_queue(struct sock *sk) { struct sk_buff *skb; - while ((skb = __skb_dequeue(&sk->sk_receive_queue))) + while ((skb = __skb_dequeue(&sk->sk_receive_queue))) { + trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_NONE, "rx rejected!"); tipc_sk_respond(sk, skb, TIPC_ERR_NO_PORT); + } } static bool tipc_sk_connected(struct sock *sk) @@ -483,6 +493,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock, tsk_set_unreliable(tsk, true); } + trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " "); return 0; } @@ -572,6 +583,7 @@ static int tipc_release(struct socket *sock) tsk = tipc_sk(sk); lock_sock(sk); + trace_tipc_sk_release(sk, NULL, TIPC_DUMP_ALL, " "); __tipc_shutdown(sock, TIPC_ERR_NO_PORT); sk->sk_shutdown = SHUTDOWN_MASK; tipc_sk_leave(tsk); @@ -803,6 +815,7 @@ static int 
tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, /* Build message as chain of buffers */ skb_queue_head_init(&pkts); rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts); + trace_tipc_sk_sendmcast(sk, skb_peek(&pkts), TIPC_DUMP_SK_SNDQ, " "); /* Send message if build was successful */ if (unlikely(rc == dlen)) @@ -1209,8 +1222,11 @@ static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb, bool conn_cong; /* Ignore if connection cannot be validated: */ - if (!tsk_peer_msg(tsk, hdr)) + if (!tsk_peer_msg(tsk, hdr)) { + trace_tipc_sk_drop_msg(sk, skb, TIPC_DUMP_NONE, + "peer no conn!"); goto exit; + } if (unlikely(msg_errcode(hdr))) { tipc_set_sk_state(sk, TIPC_DISCONNECTING); @@ -1378,6 +1394,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) if (unlikely(syn && !tipc_msg_skb_clone(&pkts, &sk->sk_write_queue))) return -ENOMEM; + trace_tipc_sk_sendmsg(sk, skb_peek(&pkts), TIPC_DUMP_SK_SNDQ, " "); rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid); if (unlikely(rc == -ELINKCONG)) { tipc_dest_push(clinks, dnode, 0); @@ -1452,6 +1469,8 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen) send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE); rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts); + trace_tipc_sk_sendstream(sk, skb_peek(&pkts), + TIPC_DUMP_SK_SNDQ, " "); if (unlikely(rc != send)) break; @@ -2129,6 +2148,8 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb, struct sk_buff_head inputq; int limit, err = TIPC_OK; + trace_tipc_sk_filter_rcv(sk, skb, TIPC_DUMP_ALL, "rcvq enqueuing"); + TIPC_SKB_CB(skb)->bytes_read = 0; __skb_queue_head_init(&inputq); __skb_queue_tail(&inputq, skb); @@ -2145,9 +2166,12 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb, limit = rcvbuf_limit(sk, skb); if ((sk_conn && !tipc_sk_filter_connect(tsk, skb)) || (!sk_conn && msg_connected(hdr)) || - (!grp && msg_in_group(hdr))) + (!grp && 
msg_in_group(hdr))) { + trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_NONE, + "msg invalid!"); err = TIPC_ERR_NO_PORT; - else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit) { + } else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit) { + trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL, sk_conges2); atomic_inc(&sk->sk_drops); err = TIPC_ERR_OVERLOAD; } @@ -2157,6 +2181,8 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb, err = TIPC_OK; continue; } + + trace_tipc_sk_overlimit2(sk, skb, TIPC_DUMP_ALL, sk_overlim2); __skb_queue_tail(&sk->sk_receive_queue, skb); skb_set_owner_r(skb, sk); sk->sk_data_ready(sk); @@ -2224,9 +2250,12 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk, if (!sk->sk_backlog.len) atomic_set(dcnt, 0); lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt); + trace_tipc_sk_overlimit1(sk, skb, TIPC_DUMP_ALL, sk_overlim); + if (likely(!sk_add_backlog(sk, skb, lim))) continue; + trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL, sk_conges); /* Overload => reject message back to sender */ onode = tipc_own_addr(sock_net(sk)); atomic_inc(&sk->sk_drops); @@ -2277,6 +2306,7 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq) if (tipc_msg_lookup_dest(net, skb, &err)) goto xmit; + trace_tipc_sk_rej_msg(NULL, skb, TIPC_DUMP_NONE, "no port!"); /* Prepare for message rejection */ if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err)) continue; @@ -2553,6 +2583,7 @@ static int tipc_shutdown(struct socket *sock, int how) lock_sock(sk); + trace_tipc_sk_shutdown(sk, NULL, TIPC_DUMP_ALL, "conn: shutdown!"); __tipc_shutdown(sock, TIPC_CONN_SHUTDOWN); sk->sk_shutdown = SEND_SHUTDOWN; @@ -3572,6 +3603,41 @@ u32 tipc_sock_get_portid(struct sock *sk) } /** + * tipc_sk_overlimit1 - check if socket rx queue is about to be overloaded, + * both the rcv and backlog queues are considered + * @sk: tipc sk to be checked + * @skb: tipc msg to be checked + * + * Returns true if the socket rx queue allocation is > 90%, otherwise false + */ 
+ +bool tipc_sk_overlimit1(struct sock *sk, struct sk_buff *skb) +{ + atomic_t *dcnt = &tipc_sk(sk)->dupl_rcvcnt; + unsigned int lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt); + unsigned int qsize = sk->sk_backlog.len + sk_rmem_alloc_get(sk); + + return (qsize > lim * 90 / 100); +} + +/** + * tipc_sk_overlimit2 - check if socket rx queue is about to be overloaded, + * only the rcv queue is considered + * @sk: tipc sk to be checked + * @skb: tipc msg to be checked + * + * Returns true if the socket rx queue allocation is > 90%, otherwise false + */ + +bool tipc_sk_overlimit2(struct sock *sk, struct sk_buff *skb) +{ + unsigned int lim = rcvbuf_limit(sk, skb); + unsigned int qsize = sk_rmem_alloc_get(sk); + + return (qsize > lim * 90 / 100); +} + +/** * tipc_sk_dump - dump TIPC socket * @sk: tipc sk to be dumped * @dqueues: bitmask to decide if any socket queue to be dumped? diff --git a/net/tipc/socket.h b/net/tipc/socket.h index 07e36545b696..235b9679acee 100644 --- a/net/tipc/socket.h +++ b/net/tipc/socket.h @@ -72,5 +72,7 @@ int tipc_dump_start(struct netlink_callback *cb); int __tipc_dump_start(struct netlink_callback *cb, struct net *net); int tipc_dump_done(struct netlink_callback *cb); u32 tipc_sock_get_portid(struct sock *sk); +bool tipc_sk_overlimit1(struct sock *sk, struct sk_buff *skb); +bool tipc_sk_overlimit2(struct sock *sk, struct sk_buff *skb); #endif diff --git a/net/tipc/trace.h b/net/tipc/trace.h index 1059fe35f343..9f70156690bf 100644 --- a/net/tipc/trace.h +++ b/net/tipc/trace.h @@ -202,6 +202,25 @@ DEFINE_EVENT(tipc_sk_class, name, \ const char *header), \ TP_ARGS(sk, skb, dqueues, header)) DEFINE_SK_EVENT(tipc_sk_dump); +DEFINE_SK_EVENT(tipc_sk_create); +DEFINE_SK_EVENT(tipc_sk_sendmcast); +DEFINE_SK_EVENT(tipc_sk_sendmsg); +DEFINE_SK_EVENT(tipc_sk_sendstream); +DEFINE_SK_EVENT(tipc_sk_filter_rcv); +DEFINE_SK_EVENT(tipc_sk_advance_rx); +DEFINE_SK_EVENT(tipc_sk_rej_msg); +DEFINE_SK_EVENT(tipc_sk_drop_msg); +DEFINE_SK_EVENT(tipc_sk_release); 
+DEFINE_SK_EVENT(tipc_sk_shutdown); + +#define DEFINE_SK_EVENT_COND(name, cond) \ +DEFINE_EVENT_CONDITION(tipc_sk_class, name, \ + TP_PROTO(struct sock *sk, struct sk_buff *skb, u16 dqueues, \ + const char *header), \ + TP_ARGS(sk, skb, dqueues, header), \ + TP_CONDITION(cond)) +DEFINE_SK_EVENT_COND(tipc_sk_overlimit1, tipc_sk_overlimit1(sk, skb)); +DEFINE_SK_EVENT_COND(tipc_sk_overlimit2, tipc_sk_overlimit2(sk, skb)); DECLARE_EVENT_CLASS(tipc_link_class, -- 2.13.7 _______________________________________________ tipc-discussion mailing list [email protected] https://lists.sourceforge.net/lists/listinfo/tipc-discussion
