On Tue, 03 Nov 2015, Sebastian Andrzej Siewior wrote:

@@ -577,26 +570,23 @@ static inline int pipelined_send(struct msg_queue *msq, 
struct msg_msg *msg)

                        list_del(&msr->r_list);
                        if (msr->r_maxsize < msg->m_ts) {
-                               /* initialize pipelined send ordering */
-                               msr->r_msg = NULL;
-                               wake_up_process(msr->r_tsk);
-                               /* barrier (B) see barrier comment below */
-                               smp_wmb();
+                               wake_q_add(wake_q, msr->r_tsk);
                                msr->r_msg = ERR_PTR(-E2BIG);
                        } else {
-                               msr->r_msg = NULL;
                                msq->q_lrpid = task_pid_vnr(msr->r_tsk);
                                msq->q_rtime = get_seconds();
-                               wake_up_process(msr->r_tsk);
-                               /*
-                                * Ensure that the wakeup is visible before
-                                * setting r_msg, as the receiving can otherwise
-                                * exit - once r_msg is set, the receiver can
-                                * continue. See lockless receive part 1 and 2
-                                * in do_msgrcv(). Barrier (B).
-                                */
-                               smp_wmb();
+                               wake_q_add(wake_q, msr->r_tsk);
                                msr->r_msg = msg;
+                               /*
+                                * Rely on the implicit cmpxchg barrier from
+                                * wake_q_add such that we can ensure that
+                                * updating msr->r_msg is the last write
+                                * operation: As once set, the receiver can
+                                * continue, and if we don't have the reference
+                                * count from the wake_q, yet, at that point we
+                                * can later have a use-after-free condition and
+                                * bogus wakeup.
+                                */

Not sure why you placed the comment here. Why not between smp_wmb() and the 
r_msg
write as we have it?

You might also want to add a reference to this comment in expunge_all(), which
does the same thing.

[...]

                /* Lockless receive, part 2:
-                * Wait until pipelined_send or expunge_all are outside of
-                * wake_up_process(). There is a race with exit(), see
-                * ipc/mqueue.c for the details. The correct serialization
-                * ensures that a receiver cannot continue without the wakeup
-                * being visibible _before_ setting r_msg:
+                * The work in pipelined_send() and expunge_all():
+                * - Set pointer to message
+                * - Queue the receiver task for later wakeup
+                * - Wake up the process after the lock is dropped.
                 *
-                * CPU 0                             CPU 1
-                * <loop receiver>
-                *   smp_rmb(); (A) <-- pair -.      <waker thread>
-                *   <load ->r_msg>           |        msr->r_msg = NULL;
-                *                            |        wake_up_process();
-                * <continue>                 `------> smp_wmb(); (B)
-                *                                     msr->r_msg = msg;
-                *
-                * Where (A) orders the message value read and where (B) orders
-                * the write to the r_msg -- done in both pipelined_send and
-                * expunge_all.
+                * Should the process wake up before this wakeup (due to a
+                * signal) it will either see the message and continue ...
                 */
-               for (;;) {
-                       /*
-                        * Pairs with writer barrier in pipelined_send
-                        * or expunge_all.
-                        */
-                       smp_rmb(); /* barrier (A) */
-                       msg = (struct msg_msg *)msr_d.r_msg;
-                       if (msg)
-                               break;

-                       /*
-                        * The cpu_relax() call is a compiler barrier
-                        * which forces everything in this loop to be
-                        * re-loaded.
-                        */
-                       cpu_relax();
-               }
-
-               /* Lockless receive, part 3:
-                * If there is a message or an error then accept it without
-                * locking.
-                */
+               msg = msr_d.r_msg;

But you're getting rid of the barrier pairing (smp_rmb) we have in pipelined 
sends
and expunge_all, which is necesary even if we don't busy wait on nil. Likewise,
there's no need to remove the comment above that illustrates this.

Thanks,
Davidlohr
--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to [email protected]
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/

Reply via email to