Hello, Manfred!

Thank you for your review. I've looked at your patch.

I forgot about the case with different message types. Now, with your patch,
a sender that does not use its reserved capacity will trigger another wakeup
of the waiting senders.

I have measured queue throughput and have pushed the results to:
https://github.com/artur-barsegyan/systemv_queue_research

But one thing still confuses me: in the main loop of the do_msgsnd()
function, we don't check whether pipelined sending is possible. Your case
would be optimized if we checked for pipelined sending inside the loop.
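
To make the sender accounting discussed in this thread concrete, here is a
minimal user-space sketch (not kernel code). It models only the byte limit of
the queue and counts how many sleeping senders a wakeup pass would select,
with and without summing the sizes of the senders already selected. The names
struct queue_model, fits_in_queue() and count_woken() are hypothetical and
exist only for this illustration; the real ss_wakeup() also handles the kill
case and moves blocked senders to the tail of the list.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical user-space model of a queue; not the kernel's struct msg_queue. */
struct queue_model {
	size_t q_cbytes;	/* bytes currently stored in the queue */
	size_t q_qbytes;	/* maximum number of bytes the queue may hold */
};

/* Simplified capacity check: only the byte limit is modeled (assumption). */
static bool fits_in_queue(const struct queue_model *q, size_t msgsz)
{
	assert(q->q_cbytes <= q->q_qbytes);
	return msgsz <= q->q_qbytes - q->q_cbytes;
}

/*
 * Count the senders that one wakeup pass would select.
 * account == false: each sender is checked on its own, so every sender
 *                   whose message fits individually is woken.
 * account == true:  the sizes of the senders already selected in this pass
 *                   are added to the check, so senders are woken only while
 *                   free capacity remains.
 */
size_t count_woken(const struct queue_model *q, const size_t *msgsz,
		   size_t nsenders, bool account)
{
	size_t quota_used = 0;
	size_t woken = 0;

	for (size_t i = 0; i < nsenders; i++) {
		size_t need = msgsz[i] + (account ? quota_used : 0);

		if (!fits_in_queue(q, need))
			continue;	/* this sender stays asleep */

		quota_used += msgsz[i];
		woken++;
	}
	return woken;
}
```

For the situation described in the quoted review (a 16384-byte queue that
still holds one 8192-byte message, and six senders of 8192 bytes each),
count_woken() returns 6 without accounting and 1 with accounting. The model
cannot show what happens after the wakeup: if the single woken sender hands
its message directly to a receiver, the reserved capacity stays unused, which
is the case the second patch addresses.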

On Sun, May 24, 2020 at 03:21:31PM +0200, Manfred Spraul wrote:
> Hello Artur,
> 
> On 5/23/20 10:34 PM, Artur Barsegyan wrote:
> > Take into account the total size of the already enqueued messages of
> > previously handled senders before another one.
> > 
> > Otherwise, we have serious degradation of receiver throughput for
> > case with multiple senders because another sender wakes up,
> > checks the queue capacity and falls into sleep again.
> > 
> > Each round-trip wastes CPU time a lot and leads to perceptible
> > throughput degradation.
> > 
> > Source code of:
> >     - sender/receiver
> >     - benchmark script
> >     - ready graphics of before/after results
> > 
> > is located here: https://github.com/artur-barsegyan/systemv_queue_research
> 
> Thanks for analyzing the issue!
> 
> > Signed-off-by: Artur Barsegyan <[email protected]>
> > ---
> >   ipc/msg.c | 4 +++-
> >   1 file changed, 3 insertions(+), 1 deletion(-)
> > 
> > diff --git a/ipc/msg.c b/ipc/msg.c
> > index caca67368cb5..52d634b0a65a 100644
> > --- a/ipc/msg.c
> > +++ b/ipc/msg.c
> > @@ -214,6 +214,7 @@ static void ss_wakeup(struct msg_queue *msq,
> >     struct msg_sender *mss, *t;
> >     struct task_struct *stop_tsk = NULL;
> >     struct list_head *h = &msq->q_senders;
> > +   size_t msq_quota_used = 0;
> >     list_for_each_entry_safe(mss, t, h, list) {
> >             if (kill)
> > @@ -233,7 +234,7 @@ static void ss_wakeup(struct msg_queue *msq,
> >              * move the sender to the tail on behalf of the
> >              * blocked task.
> >              */
> > -           else if (!msg_fits_inqueue(msq, mss->msgsz)) {
> > +           else if (!msg_fits_inqueue(msq, msq_quota_used + mss->msgsz)) {
> >                     if (!stop_tsk)
> >                             stop_tsk = mss->tsk;
> > @@ -241,6 +242,7 @@ static void ss_wakeup(struct msg_queue *msq,
> >                     continue;
> >             }
> > +           msq_quota_used += mss->msgsz;
> >             wake_q_add(wake_q, mss->tsk);
> 
> You have missed the case of a do_msgsnd() that doesn't enqueue the message:
> 
> Situation:
> 
> - 2 messages of type 1 in the queue (2x8192 bytes, queue full)
> 
> - 6 senders waiting to send messages of type 2
> 
> - 6 receivers waiting to get messages of type 2.
> 
> If now a receiver reads one message of type 1, then all 6 senders can send.
> 
> With your patch applied, only one sender sends the message to one receiver,
> and the remaining 10 tasks continue to sleep.
> 
> 
> Could you please check and (assuming that you agree) run your benchmarks
> with the patch applied?
> 
> --
> 
>     Manfred
> 
> 
> 

> From fe2f257b1950a19bf5c6f67e71aa25c2f13bcdc3 Mon Sep 17 00:00:00 2001
> From: Manfred Spraul <[email protected]>
> Date: Sun, 24 May 2020 14:47:31 +0200
> Subject: [PATCH 2/2] ipc/msg.c: Handle case of senders not enqueuing the
>  message
> 
> The patch "ipc/msg.c: wake up senders until there is a queue empty
> capacity" avoids the thundering herd problem by waking up
> only as many potential senders as there is free space in the queue.
> 
> This patch is a fix: If one of the senders doesn't enqueue its message,
> then a search for further potential senders must be performed.
> 
> Signed-off-by: Manfred Spraul <[email protected]>
> ---
>  ipc/msg.c | 21 +++++++++++++++++++++
>  1 file changed, 21 insertions(+)
> 
> diff --git a/ipc/msg.c b/ipc/msg.c
> index 52d634b0a65a..f6d5188db38a 100644
> --- a/ipc/msg.c
> +++ b/ipc/msg.c
> @@ -208,6 +208,12 @@ static inline void ss_del(struct msg_sender *mss)
>               list_del(&mss->list);
>  }
>  
> +/*
> + * ss_wakeup() assumes that the stored senders will enqueue the pending message.
> + * Thus: If a woken up task doesn't send the enqueued message for whatever
> + * reason, then that task must call ss_wakeup() again, to ensure that no
> + * wakeup is lost.
> + */
>  static void ss_wakeup(struct msg_queue *msq,
>                     struct wake_q_head *wake_q, bool kill)
>  {
> @@ -843,6 +849,7 @@ static long do_msgsnd(int msqid, long mtype, void __user *mtext,
>       struct msg_queue *msq;
>       struct msg_msg *msg;
>       int err;
> +     bool need_wakeup;
>       struct ipc_namespace *ns;
>       DEFINE_WAKE_Q(wake_q);
>  
> @@ -869,6 +876,7 @@ static long do_msgsnd(int msqid, long mtype, void __user *mtext,
>  
>       ipc_lock_object(&msq->q_perm);
>  
> +     need_wakeup = false;
>       for (;;) {
>               struct msg_sender s;
>  
> @@ -898,6 +906,13 @@ static long do_msgsnd(int msqid, long mtype, void __user *mtext,
>               /* enqueue the sender and prepare to block */
>               ss_add(msq, &s, msgsz);
>  
> +             /* Enqueuing a sender is actually an obligation:
> +              * The sender must either enqueue the message, or call
> +              * ss_wakeup(). Thus track that we have added our message
> +              * to the candidates for the message queue.
> +              */
> +             need_wakeup = true;
> +
>               if (!ipc_rcu_getref(&msq->q_perm)) {
>                       err = -EIDRM;
>                       goto out_unlock0;
> @@ -935,12 +950,18 @@ static long do_msgsnd(int msqid, long mtype, void __user *mtext,
>               msq->q_qnum++;
>               atomic_add(msgsz, &ns->msg_bytes);
>               atomic_inc(&ns->msg_hdrs);
> +
> +             /* we have fulfilled our obligation, no need for wakeup */
> +             need_wakeup = false;
>       }
>  
>       err = 0;
>       msg = NULL;
>  
>  out_unlock0:
> +     if (need_wakeup)
> +             ss_wakeup(msq, &wake_q, false);
> +
>       ipc_unlock_object(&msq->q_perm);
>       wake_up_q(&wake_q);
>  out_unlock1:
> -- 
> 2.26.2
> 
