Hello Artur,
On 5/23/20 10:34 PM, Artur Barsegyan wrote:
Take into account the total size of the already enqueued messages of
previously handled senders before waking up another one.
Otherwise, we have a serious degradation of receiver throughput in the
case of multiple senders, because another sender wakes up,
checks the queue capacity and falls asleep again.
Each round-trip wastes a lot of CPU time and leads to perceptible
throughput degradation.
Source code of:
- sender/receiver
- benchmark script
- ready-made graphs of before/after results
is located here: https://github.com/artur-barsegyan/systemv_queue_research
Thanks for analyzing the issue!
Signed-off-by: Artur Barsegyan <a.barsegya...@gmail.com>
---
ipc/msg.c | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/ipc/msg.c b/ipc/msg.c
index caca67368cb5..52d634b0a65a 100644
--- a/ipc/msg.c
+++ b/ipc/msg.c
@@ -214,6 +214,7 @@ static void ss_wakeup(struct msg_queue *msq,
struct msg_sender *mss, *t;
struct task_struct *stop_tsk = NULL;
struct list_head *h = &msq->q_senders;
+ size_t msq_quota_used = 0;
list_for_each_entry_safe(mss, t, h, list) {
if (kill)
@@ -233,7 +234,7 @@ static void ss_wakeup(struct msg_queue *msq,
* move the sender to the tail on behalf of the
* blocked task.
*/
- else if (!msg_fits_inqueue(msq, mss->msgsz)) {
+ else if (!msg_fits_inqueue(msq, msq_quota_used + mss->msgsz)) {
if (!stop_tsk)
stop_tsk = mss->tsk;
@@ -241,6 +242,7 @@ static void ss_wakeup(struct msg_queue *msq,
continue;
}
+ msq_quota_used += mss->msgsz;
wake_q_add(wake_q, mss->tsk);
You have missed the case of a do_msgsnd() that doesn't enqueue the message:
Situation:
- 2 messages of type 1 in the queue (2x8192 bytes, queue full)
- 6 senders waiting to send messages of type 2
- 6 receivers waiting to get messages of type 2.
If now a receiver reads one message of type 1, then all 6 senders can send.
With your patch applied, only one sender sends the message to one
receiver, and the remaining 10 tasks continue to sleep.
Could you please check and (assuming that you agree) run your benchmarks
with the patch applied?
--
Manfred
From fe2f257b1950a19bf5c6f67e71aa25c2f13bcdc3 Mon Sep 17 00:00:00 2001
From: Manfred Spraul <manf...@colorfullife.com>
Date: Sun, 24 May 2020 14:47:31 +0200
Subject: [PATCH 2/2] ipc/msg.c: Handle case of senders not enqueuing the
message
The patch "ipc/msg.c: wake up senders until there is a queue empty
capacity" avoids the thundering herd problem by waking up
only as many potential senders as there is free space in the queue.
This patch is a fix: If one of the senders doesn't enqueue its message,
then a search for further potential senders must be performed.
Signed-off-by: Manfred Spraul <manf...@colorfullife.com>
---
ipc/msg.c | 21 +++++++++++++++++++++
1 file changed, 21 insertions(+)
diff --git a/ipc/msg.c b/ipc/msg.c
index 52d634b0a65a..f6d5188db38a 100644
--- a/ipc/msg.c
+++ b/ipc/msg.c
@@ -208,6 +208,12 @@ static inline void ss_del(struct msg_sender *mss)
list_del(&mss->list);
}
+/*
+ * ss_wakeup() assumes that the stored senders will enqueue the pending message.
+ * Thus: If a woken up task doesn't send the enqueued message for whatever
+ * reason, then that task must call ss_wakeup() again, to ensure that no
+ * wakeup is lost.
+ */
static void ss_wakeup(struct msg_queue *msq,
struct wake_q_head *wake_q, bool kill)
{
@@ -843,6 +849,7 @@ static long do_msgsnd(int msqid, long mtype, void __user *mtext,
struct msg_queue *msq;
struct msg_msg *msg;
int err;
+ bool need_wakeup;
struct ipc_namespace *ns;
DEFINE_WAKE_Q(wake_q);
@@ -869,6 +876,7 @@ static long do_msgsnd(int msqid, long mtype, void __user *mtext,
ipc_lock_object(&msq->q_perm);
+ need_wakeup = false;
for (;;) {
struct msg_sender s;
@@ -898,6 +906,13 @@ static long do_msgsnd(int msqid, long mtype, void __user *mtext,
/* enqueue the sender and prepare to block */
ss_add(msq, &s, msgsz);
+ /* Enqueuing a sender is actually an obligation:
+ * The sender must either enqueue the message, or call
+ * ss_wakeup(). Thus track that we have added our message
+ * to the candidates for the message queue.
+ */
+ need_wakeup = true;
+
if (!ipc_rcu_getref(&msq->q_perm)) {
err = -EIDRM;
goto out_unlock0;
@@ -935,12 +950,18 @@ static long do_msgsnd(int msqid, long mtype, void __user *mtext,
msq->q_qnum++;
atomic_add(msgsz, &ns->msg_bytes);
atomic_inc(&ns->msg_hdrs);
+
+ /* we have fulfilled our obligation, no need for wakeup */
+ need_wakeup = false;
}
err = 0;
msg = NULL;
out_unlock0:
+ if (need_wakeup)
+ ss_wakeup(msq, &wake_q, false);
+
ipc_unlock_object(&msq->q_perm);
wake_up_q(&wake_q);
out_unlock1:
--
2.26.2