On Fri, 30 Oct 2015, Sebastian Andrzej Siewior wrote:
> This was just compile tested. It would be nice if someone with real
> workload could test it. Otherwise I could hack something myself and check
> if it still works.

LTP and glibc should have tests, at least at the functional level.
 
> -static void expunge_all(struct msg_queue *msq, int res)
> +static void expunge_all(struct msg_queue *msq, int res,
> +                     struct wake_q_head *wake_q)
>  {
>       struct msg_receiver *msr, *t;
>  
>       list_for_each_entry_safe(msr, t, &msq->q_receivers, r_list) {
> -             msr->r_msg = NULL; /* initialize expunge ordering */

Don't we need to keep that NULL init? I might be missing something.

> -             wake_up_process(msr->r_tsk);
> -             /*
> -              * Ensure that the wakeup is visible before setting r_msg as
> -              * the receiving end depends on it: either spinning on a nil,
> -              * or dealing with -EAGAIN cases. See lockless receive part 1
> -              * and 2 in do_msgrcv().
> -              */
> -             smp_wmb(); /* barrier (B) */

Please keep the comment and update it, as it documents the mechanism
including the barriers. The barrier is now inside wake_q_add().

> +
> +             wake_q_add(wake_q, msr->r_tsk);
>               msr->r_msg = ERR_PTR(res);


> -static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
> +static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg,
> +                              struct wake_q_head *wake_q)
>  {
>       struct msg_receiver *msr, *t;
>  
> @@ -577,27 +576,13 @@ static inline int pipelined_send(struct msg_queue *msq, 
> struct msg_msg *msg)
>  
>                       list_del(&msr->r_list);
>                       if (msr->r_maxsize < msg->m_ts) {
> -                             /* initialize pipelined send ordering */
> -                             msr->r_msg = NULL;

See above.

> -                             wake_up_process(msr->r_tsk);
> -                             /* barrier (B) see barrier comment below */
> -                             smp_wmb();

Please do not remove barrier documentation. Update it.

> +                             wake_q_add(wake_q, msr->r_tsk);
>                               msr->r_msg = ERR_PTR(-E2BIG);
>                       } else {
> -                             msr->r_msg = NULL;
>                               msq->q_lrpid = task_pid_vnr(msr->r_tsk);
>                               msq->q_rtime = get_seconds();
> -                             wake_up_process(msr->r_tsk);
> -                             /*
> -                              * Ensure that the wakeup is visible before
> -                              * setting r_msg, as the receiving can otherwise
> -                              * exit - once r_msg is set, the receiver can
> -                              * continue. See lockless receive part 1 and 2
> -                              * in do_msgrcv(). Barrier (B).
> -                              */

Ditto.

> -                             smp_wmb();
> +                             wake_q_add(wake_q, msr->r_tsk);
>                               msr->r_msg = msg;
> -
>                               return 1;
>                       }
>               }

> @@ -932,57 +919,25 @@ long do_msgrcv(int msqid, void __user *buf, size_t 
> bufsz, long msgtyp, int msgfl
>               rcu_read_lock();
>  
>               /* Lockless receive, part 2:
> -              * Wait until pipelined_send or expunge_all are outside of
> -              * wake_up_process(). There is a race with exit(), see
> -              * ipc/mqueue.c for the details. The correct serialization
> -              * ensures that a receiver cannot continue without the wakeup
> -              * being visibible _before_ setting r_msg:
> +              * The work in pipelined_send() and expunge_all():
> +              * - Set pointer to message
> +              * - Queue the receiver task for later wakeup
> +              * - Wake up the process after the lock is dropped.
>                *
> -              * CPU 0                             CPU 1
> -              * <loop receiver>
> -              *   smp_rmb(); (A) <-- pair -.      <waker thread>
> -              *   <load ->r_msg>           |        msr->r_msg = NULL;
> -              *                            |        wake_up_process();
> -              * <continue>                 `------> smp_wmb(); (B)
> -              *                                     msr->r_msg = msg;
> -              *
> -              * Where (A) orders the message value read and where (B) orders
> -              * the write to the r_msg -- done in both pipelined_send and
> -              * expunge_all.
> +              * Should the process wake up before this wakeup (due to a
> +              * signal) it will either see the message and continue …

Please update the barrier documentation so it matches the new code.

>                */
> -             for (;;) {
> -                     /*
> -                      * Pairs with writer barrier in pipelined_send
> -                      * or expunge_all.
> -                      */
> -                     smp_rmb(); /* barrier (A) */
> -                     msg = (struct msg_msg *)msr_d.r_msg;
> -                     if (msg)
> -                             break;
>  
> -                     /*
> -                      * The cpu_relax() call is a compiler barrier
> -                      * which forces everything in this loop to be
> -                      * re-loaded.
> -                      */
> -                     cpu_relax();
> -             }

Are you sure that this is no longer required? If so, then please
explain and update the documentation accordingly.

Thanks,

        tglx

Reply via email to