This commit adds the following trace_events for the TIPC socket object:

trace_tipc_sk_create()
trace_tipc_sk_sendmsg()
trace_tipc_sk_sendmcast()
trace_tipc_sk_sendstream()
trace_tipc_sk_filter_rcv()
trace_tipc_sk_advance_rx()
trace_tipc_sk_rej_msg()
trace_tipc_sk_drop_msg()
trace_tipc_sk_release()
trace_tipc_sk_shutdown()
trace_tipc_sk_overlimit1()
trace_tipc_sk_overlimit2()

It also enables these traces in the following cases (a sketch of how to
switch them on from user space follows the list):
- When a user creates a TIPC socket;
- When a user sends a dgram/mcast/stream message;
- When a message is put into the socket 'sk_receive_queue';
- When a message is released from the socket 'sk_receive_queue';
- When a message is rejected (e.g. due to no port, an invalid message, etc.);
- When a message is dropped (e.g. due to the peer port not being connected);
- When a socket is released;
- When a socket is shut down;
- When the socket rcvq allocation exceeds 90% of its limit;
- When the socket rcvq + bklq allocation exceeds 90% of its limit;
- When the 'TIPC_ERR_OVERLOAD/2' issue happens.
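
For example, the new events can be switched on at run time via tracefs.
A minimal user-space sketch, assuming tracefs is mounted under
/sys/kernel/debug/tracing (it may also be /sys/kernel/tracing) and the
events are grouped under the 'tipc' subsystem; it is just the C
equivalent of "echo 1 > events/tipc/<event>/enable":

  /* enable_tipc_sk_events.c - hypothetical helper, not part of this patch */
  #include <stdio.h>

  static int enable_event(const char *name)
  {
          char path[256];
          FILE *f;

          snprintf(path, sizeof(path),
                   "/sys/kernel/debug/tracing/events/tipc/%s/enable", name);
          f = fopen(path, "w");
          if (!f)
                  return -1;
          fputs("1", f);          /* write "0" to disable it again */
          return fclose(f);
  }

  int main(void)
  {
          /* any of the events added by this patch can be listed here */
          enable_event("tipc_sk_create");
          enable_event("tipc_sk_sendmsg");
          enable_event("tipc_sk_filter_rcv");
          return 0;
  }

The resulting records then show up in 'trace' / 'trace_pipe' as usual.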

Note:
The 'tipc_sk_overlimit1/2' events are conditional trace_events that fire
only when the socket receive queue (and backlog queue) is about to be
overloaded, i.e. when the queue allocation exceeds 90% of its limit. With
the trace enabled, the last skbs leading to a 'TIPC_ERR_OVERLOAD/2' issue
can then be captured.
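
Concretely, the condition compares the current queue allocation (rcvq
only, or rcvq + backlog) against 90% of the limit returned by
rcvbuf_limit(). A stand-alone sketch of that arithmetic, with an
arbitrary 2 MB limit picked purely for illustration (the real checks
are tipc_sk_overlimit1/2 in socket.c below):

  /* Illustration only - the in-kernel checks are tipc_sk_overlimit1/2 */
  #include <stdbool.h>
  #include <stdio.h>

  static bool over_watermark(unsigned int qsize, unsigned int lim)
  {
          return qsize > lim * 90 / 100;  /* i.e. more than 90% allocated */
  }

  int main(void)
  {
          unsigned int lim = 2 * 1024 * 1024;     /* example limit: 2 MB */

          printf("%d\n", over_watermark(1800000, lim));   /* 0 (~85.8%) */
          printf("%d\n", over_watermark(1900000, lim));   /* 1 (~90.6%) */
          return 0;
  }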

The trace events are designed as an 'upper watermark' notification, so
that other traces (e.g. 'tipc_sk_advance_rx' vs 'tipc_sk_filter_rcv') or
actions can be triggered from that point on to see what is going on with
the socket queues (see the sketch after this paragraph).
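
One possible way to exploit this (not part of the patch itself) is to
attach an ftrace 'enable_event' trigger to the watermark event, so that
the more verbose per-packet events are only recorded once a queue gets
close to overload. A sketch assuming the standard tracefs trigger
interface and the same mount point as above, i.e. the C equivalent of
"echo 'enable_event:tipc:tipc_sk_advance_rx' >
events/tipc/tipc_sk_overlimit1/trigger":

  /* set_overlimit_trigger.c - hypothetical helper, not part of this patch */
  #include <stdio.h>

  #define TRIGGER \
          "/sys/kernel/debug/tracing/events/tipc/tipc_sk_overlimit1/trigger"

  static int add_trigger(const char *cmd)
  {
          FILE *f = fopen(TRIGGER, "w");

          if (!f)
                  return -1;
          fputs(cmd, f);          /* prefix the command with '!' to remove it */
          return fclose(f);
  }

  int main(void)
  {
          /* once the watermark trips, start recording the verbose events */
          add_trigger("enable_event:tipc:tipc_sk_advance_rx\n");
          add_trigger("enable_event:tipc:tipc_sk_filter_rcv\n");
          return 0;
  }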

In addition, 'trace_tipc_sk_dump()' is placed at the 'TIPC_ERR_OVERLOAD/2'
cases, so the socket and the last skb can be dumped for post-analysis.

Signed-off-by: Tuong Lien <[email protected]>
---
 net/tipc/socket.c | 74 ++++++++++++++++++++++++++++++++++++++++++++++++++++---
 net/tipc/socket.h |  2 ++
 net/tipc/trace.h  | 19 ++++++++++++++
 3 files changed, 91 insertions(+), 4 deletions(-)

diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index d70c26908836..f8d9c2349387 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -119,6 +119,13 @@ struct tipc_sock {
        bool group_is_open;
 };
 
+/* Error message prefixes
+ */
+static const char *sk_overlim = "warning: rx queues > 90% allocated!";
+static const char *sk_overlim2 = "warning: rcvq > 90% allocated!";
+static const char *sk_conges = "error: rx queues congested!";
+static const char *sk_conges2 = "error: rcvq congested!";
+
 static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb);
 static void tipc_data_ready(struct sock *sk);
 static void tipc_write_space(struct sock *sk);
@@ -234,6 +241,7 @@ static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
  */
 static void tsk_advance_rx_queue(struct sock *sk)
 {
+       trace_tipc_sk_advance_rx(sk, NULL, TIPC_DUMP_SK_RCVQ, "rx advancing");
        kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
 }
 
@@ -262,8 +270,10 @@ static void tsk_rej_rx_queue(struct sock *sk)
 {
        struct sk_buff *skb;
 
-       while ((skb = __skb_dequeue(&sk->sk_receive_queue)))
+       while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
+               trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_NONE, "rx rejected!");
                tipc_sk_respond(sk, skb, TIPC_ERR_NO_PORT);
+       }
 }
 
 static bool tipc_sk_connected(struct sock *sk)
@@ -483,6 +493,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
                        tsk_set_unreliable(tsk, true);
        }
 
+       trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " ");
        return 0;
 }
 
@@ -572,6 +583,7 @@ static int tipc_release(struct socket *sock)
        tsk = tipc_sk(sk);
        lock_sock(sk);
 
+       trace_tipc_sk_release(sk, NULL, TIPC_DUMP_ALL, " ");
        __tipc_shutdown(sock, TIPC_ERR_NO_PORT);
        sk->sk_shutdown = SHUTDOWN_MASK;
        tipc_sk_leave(tsk);
@@ -803,6 +815,7 @@ static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
        /* Build message as chain of buffers */
        skb_queue_head_init(&pkts);
        rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
+       trace_tipc_sk_sendmcast(sk, skb_peek(&pkts), TIPC_DUMP_SK_SNDQ, " ");
 
        /* Send message if build was successful */
        if (unlikely(rc == dlen))
@@ -1209,8 +1222,11 @@ static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
        bool conn_cong;
 
        /* Ignore if connection cannot be validated: */
-       if (!tsk_peer_msg(tsk, hdr))
+       if (!tsk_peer_msg(tsk, hdr)) {
+               trace_tipc_sk_drop_msg(sk, skb, TIPC_DUMP_NONE,
+                                      "peer no conn!");
                goto exit;
+       }
 
        if (unlikely(msg_errcode(hdr))) {
                tipc_set_sk_state(sk, TIPC_DISCONNECTING);
@@ -1378,6 +1394,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
        if (unlikely(syn && !tipc_msg_skb_clone(&pkts, &sk->sk_write_queue)))
                return -ENOMEM;
 
+       trace_tipc_sk_sendmsg(sk, skb_peek(&pkts), TIPC_DUMP_SK_SNDQ, " ");
        rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
        if (unlikely(rc == -ELINKCONG)) {
                tipc_dest_push(clinks, dnode, 0);
@@ -1452,6 +1469,8 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
 
                send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
                rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts);
+               trace_tipc_sk_sendstream(sk, skb_peek(&pkts),
+                                        TIPC_DUMP_SK_SNDQ, " ");
                if (unlikely(rc != send))
                        break;
 
@@ -2129,6 +2148,8 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
        struct sk_buff_head inputq;
        int limit, err = TIPC_OK;
 
+       trace_tipc_sk_filter_rcv(sk, skb, TIPC_DUMP_ALL, "rcvq enqueuing");
+
        TIPC_SKB_CB(skb)->bytes_read = 0;
        __skb_queue_head_init(&inputq);
        __skb_queue_tail(&inputq, skb);
@@ -2145,9 +2166,12 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
                limit = rcvbuf_limit(sk, skb);
                if ((sk_conn && !tipc_sk_filter_connect(tsk, skb)) ||
                    (!sk_conn && msg_connected(hdr)) ||
-                   (!grp && msg_in_group(hdr)))
+                   (!grp && msg_in_group(hdr))) {
+                       trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_NONE,
+                                             "msg invalid!");
                        err = TIPC_ERR_NO_PORT;
-               else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit) {
+               } else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit) {
+                       trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL, sk_conges2);
                        atomic_inc(&sk->sk_drops);
                        err = TIPC_ERR_OVERLOAD;
                }
@@ -2157,6 +2181,8 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
                        err = TIPC_OK;
                        continue;
                }
+
+               trace_tipc_sk_overlimit2(sk, skb, TIPC_DUMP_ALL, sk_overlim2);
                __skb_queue_tail(&sk->sk_receive_queue, skb);
                skb_set_owner_r(skb, sk);
                sk->sk_data_ready(sk);
@@ -2224,9 +2250,12 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
                if (!sk->sk_backlog.len)
                        atomic_set(dcnt, 0);
                lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
+               trace_tipc_sk_overlimit1(sk, skb, TIPC_DUMP_ALL, sk_overlim);
+
                if (likely(!sk_add_backlog(sk, skb, lim)))
                        continue;
 
+               trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL, sk_conges);
                /* Overload => reject message back to sender */
                onode = tipc_own_addr(sock_net(sk));
                atomic_inc(&sk->sk_drops);
@@ -2277,6 +2306,7 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
                if (tipc_msg_lookup_dest(net, skb, &err))
                        goto xmit;
 
+               trace_tipc_sk_rej_msg(NULL, skb, TIPC_DUMP_NONE, "no port!");
                /* Prepare for message rejection */
                if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err))
                        continue;
@@ -2553,6 +2583,7 @@ static int tipc_shutdown(struct socket *sock, int how)
 
        lock_sock(sk);
 
+       trace_tipc_sk_shutdown(sk, NULL, TIPC_DUMP_ALL, "conn: shutdown!");
        __tipc_shutdown(sock, TIPC_CONN_SHUTDOWN);
        sk->sk_shutdown = SEND_SHUTDOWN;
 
@@ -3572,6 +3603,41 @@ u32 tipc_sock_get_portid(struct sock *sk)
 }
 
 /**
+ * tipc_sk_overlimit1 - check if socket rx queue is about to be overloaded,
+ *                     both the rcv and backlog queues are considered
+ * @sk: tipc sk to be checked
+ * @skb: tipc msg to be checked
+ *
+ * Returns true if the socket rx queue allocation is > 90%, otherwise false
+ */
+
+bool tipc_sk_overlimit1(struct sock *sk, struct sk_buff *skb)
+{
+       atomic_t *dcnt = &tipc_sk(sk)->dupl_rcvcnt;
+       unsigned int lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
+       unsigned int qsize = sk->sk_backlog.len + sk_rmem_alloc_get(sk);
+
+       return (qsize > lim * 90 / 100);
+}
+
+/**
+ * tipc_sk_overlimit2 - check if socket rx queue is about to be overloaded,
+ *                     only the rcv queue is considered
+ * @sk: tipc sk to be checked
+ * @skb: tipc msg to be checked
+ *
+ * Returns true if the socket rx queue allocation is > 90%, otherwise false
+ */
+
+bool tipc_sk_overlimit2(struct sock *sk, struct sk_buff *skb)
+{
+       unsigned int lim = rcvbuf_limit(sk, skb);
+       unsigned int qsize = sk_rmem_alloc_get(sk);
+
+       return (qsize > lim * 90 / 100);
+}
+
+/**
  * tipc_sk_dump - dump TIPC socket
  * @sk: tipc sk to be dumped
  * @dqueues: bitmask to decide if any socket queue to be dumped?
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index 07e36545b696..235b9679acee 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -72,5 +72,7 @@ int tipc_dump_start(struct netlink_callback *cb);
 int __tipc_dump_start(struct netlink_callback *cb, struct net *net);
 int tipc_dump_done(struct netlink_callback *cb);
 u32 tipc_sock_get_portid(struct sock *sk);
+bool tipc_sk_overlimit1(struct sock *sk, struct sk_buff *skb);
+bool tipc_sk_overlimit2(struct sock *sk, struct sk_buff *skb);
 
 #endif
diff --git a/net/tipc/trace.h b/net/tipc/trace.h
index 1059fe35f343..9f70156690bf 100644
--- a/net/tipc/trace.h
+++ b/net/tipc/trace.h
@@ -202,6 +202,25 @@ DEFINE_EVENT(tipc_sk_class, name, \
                 const char *header), \
        TP_ARGS(sk, skb, dqueues, header))
 DEFINE_SK_EVENT(tipc_sk_dump);
+DEFINE_SK_EVENT(tipc_sk_create);
+DEFINE_SK_EVENT(tipc_sk_sendmcast);
+DEFINE_SK_EVENT(tipc_sk_sendmsg);
+DEFINE_SK_EVENT(tipc_sk_sendstream);
+DEFINE_SK_EVENT(tipc_sk_filter_rcv);
+DEFINE_SK_EVENT(tipc_sk_advance_rx);
+DEFINE_SK_EVENT(tipc_sk_rej_msg);
+DEFINE_SK_EVENT(tipc_sk_drop_msg);
+DEFINE_SK_EVENT(tipc_sk_release);
+DEFINE_SK_EVENT(tipc_sk_shutdown);
+
+#define DEFINE_SK_EVENT_COND(name, cond) \
+DEFINE_EVENT_CONDITION(tipc_sk_class, name, \
+       TP_PROTO(struct sock *sk, struct sk_buff *skb, u16 dqueues, \
+                const char *header), \
+       TP_ARGS(sk, skb, dqueues, header), \
+       TP_CONDITION(cond))
+DEFINE_SK_EVENT_COND(tipc_sk_overlimit1, tipc_sk_overlimit1(sk, skb));
+DEFINE_SK_EVENT_COND(tipc_sk_overlimit2, tipc_sk_overlimit2(sk, skb));
 
 DECLARE_EVENT_CLASS(tipc_link_class,
 
-- 
2.13.7


