Re: [PATCH 3/3] ipc/mqueue: lockless pipelined wakeups

2015-05-04 Thread George Spelvin
> You are not wrong, but I'd rather leave the comment as is; how much of
> this needs spelling out will vary from user to user. The comments in
> the sched wake_q bits are already pretty clear, and users who cannot
> see for themselves why a reference must be held so the task cannot
> disappear have no business using wake_q. Furthermore, I think the
> comment serves better in mqueue, where the need for it isn't
> immediately obvious.

Okay, but the comment is still rather awkward and hard-to-follow English.

How about:
/*
 * Rely on the implicit cmpxchg barrier from wake_q_add to
 * ensure that updating receiver->state is the last write
 * operation.  Once set, the receiver can continue, and if we
 * hadn't placed it on the wake_q (which takes a reference to
 * the task), any later use might cause a use-after-free
 * condition.
 */

Part of the confusion is that there are two different ordering conditions
that wake_q_add is involved in, and the comment above (even my version)
isn't good about explaining the distinction:

1) It, itself, must come before the receiver->state update, because
   after that, the receiver may run (and possibly exit).
2) It serves as a write barrier for all the other state writes above.

If I wanted to be clearer, I'd have to do more extensive edits:

/*
 * wake_q_add must come before updating receiver->state, since
 * that write lets the receiver continue (and possibly exit).
 * The reference count from the wake_q prevents use-after-free.
 *
 * The cmpxchg inside wake_q_add also serves as a write barrier
 * for all the other state updates that must be visible before
 * receiver->state.
 */
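
For concreteness, here is pipelined_send() from the patch (quoted further
down in this thread) with that wording dropped in; the code is exactly as
posted, only the comment text differs:

static inline void pipelined_send(struct wake_q_head *wake_q,
				  struct mqueue_inode_info *info,
				  struct msg_msg *message,
				  struct ext_wait_queue *receiver)
{
	receiver->msg = message;
	list_del(&receiver->list);
	wake_q_add(wake_q, receiver->task);
	/*
	 * wake_q_add must come before updating receiver->state, since
	 * that write lets the receiver continue (and possibly exit).
	 * The reference count from the wake_q prevents use-after-free.
	 *
	 * The cmpxchg inside wake_q_add also serves as a write barrier
	 * for all the other state updates that must be visible before
	 * receiver->state.
	 */
	receiver->state = STATE_READY;
}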

None of this affects the code, which is
Acked-by: George Spelvin 


Re: [PATCH 3/3] ipc/mqueue: lockless pipelined wakeups

2015-05-01 Thread Davidlohr Bueso
On Fri, 2015-05-01 at 17:52 -0400, George Spelvin wrote:
> In general, Acked-by, but you're making me fix all your comments. :-)
> 
> This is a nice use of the wake queue, since the code was already handling
> the same problem in a similar way with STATE_PENDING.
> 
> >  * The receiver accepts the message and returns without grabbing the queue
> >+ * spinlock. The used algorithm is different from sysv semaphores (ipc/sem.c):
> 
> Is that last sentence even wanted?

Yeah, we can probably remove it now.

> >+ *
> >+ * - Set pointer to message.
> >+ * - Queue the receiver task's for later wakeup (without the info->lock).
> 
> It's "task" singular, and the apostrophe would be wrong if it were plural.
> 
> >+ * - Update its state to STATE_READY. Now the receiver can continue.
> >+ * - Wake up the process after the lock is dropped. Should the process wake up
> >+ *   before this wakeup (due to a timeout or a signal) it will either see
> >+ *   STATE_READY and continue or acquire the lock to check the sate again.
> 
> "check the sTate again".
> 
> >+wake_q_add(wake_q, receiver->task);
> >+/*
> >+ * Rely on the implicit cmpxchg barrier from wake_q_add such
> >+ * that we can ensure that updating receiver->state is the last
> >+ * write operation: As once set, the receiver can continue,
> >+ * and if we don't have the reference count from the wake_q,
> >+ * yet, at that point we can later have a use-after-free
> >+ * condition and bogus wakeup.
> >+ */
> > receiver->state = STATE_READY;
> 
> How about:
>   /*
>* There must be a write barrier here; setting STATE_READY
>* lets the receiver proceed without further synchronization.
>* The cmpxchg inside wake_q_add serves as the barrier here.
>*/
> 
> The need for a wake queue to take a reference to avoid use-after-free
> is generic to wake queues, and handled in generic code; I don't see why
> it needs a comment here.

You are not wrong, but I'd rather leave the comment as is; how much of
this needs spelling out will vary from user to user. The comments in
the sched wake_q bits are already pretty clear, and users who cannot
see for themselves why a reference must be held so the task cannot
disappear have no business using wake_q. Furthermore, I think the
comment serves better in mqueue, where the need for it isn't
immediately obvious.

 
> >@@ -1084,6 +1094,7 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
> > ktime_t expires, *timeout = NULL;
> > struct timespec ts;
> > struct posix_msg_tree_node *new_leaf = NULL;
> >+WAKE_Q(wake_q);
> > 
> > if (u_abs_timeout) {
> > int res = prepare_timeout(u_abs_timeout, &expires, &ts);
> >@@ -1155,8 +1166,9 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
> > CURRENT_TIME;
> > 
> > /* There is now free space in queue. */
> >-pipelined_receive(info);
> >+pipelined_receive(&wake_q, info);
> > spin_unlock(&info->lock);
> >+wake_up_q(&wake_q);
> > ret = 0;
> > }
> > if (ret == 0) {
> 
> Since WAKE_Q actually involves some initialization, would it make sense to
> move its declaration to inside the condition that needs it?
> 
> (I'm also a fan of declaring variables in the smallest scope possible,
> just on general principles.)

Agreed.
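
Something like the below, with the rest of the branch untouched (sketch
only; the elided line stands for the receive path exactly as posted, and
the WAKE_Q(wake_q) at the top of mq_timedreceive goes away):

	} else {
		WAKE_Q(wake_q);

		/* ... take the message off the queue, as in the patch ... */

		/* There is now free space in queue. */
		pipelined_receive(&wake_q, info);
		spin_unlock(&info->lock);
		wake_up_q(&wake_q);
		ret = 0;
	}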

Thanks,
Davidlohr




Re: [PATCH 3/3] ipc/mqueue: lockless pipelined wakeups

2015-05-01 Thread George Spelvin
In general, Acked-by, but you're making me fix all your comments. :-)

This is a nice use of the wake queue, since the code was already handling
the same problem in a similar way with STATE_PENDING.
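
For anyone skimming the thread, the handshake before and after, condensed
from the diff below (just the relevant lines pulled together, not literal
code):

old waker (under info->lock):

	receiver->state = STATE_PENDING;
	wake_up_process(receiver->task);
	smp_wmb();
	receiver->state = STATE_READY;

old waiter (after schedule_hrtimeout_range_clock() returns):

	while (ewp->state == STATE_PENDING)
		cpu_relax();

new waker (under info->lock):

	wake_q_add(wake_q, receiver->task);
	receiver->state = STATE_READY;

with the actual wake_up_q() happening only after info->lock is dropped,
and the waiter simply testing for STATE_READY with no spinning.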

>  * The receiver accepts the message and returns without grabbing the queue
>+ * spinlock. The used algorithm is different from sysv semaphores (ipc/sem.c):

Is that last sentence even wanted?

>+ *
>+ * - Set pointer to message.
>+ * - Queue the receiver task's for later wakeup (without the info->lock).

It's "task" singular, and the apostrophe would be wrong if it were plural.

>+ * - Update its state to STATE_READY. Now the receiver can continue.
>+ * - Wake up the process after the lock is dropped. Should the process wake up
>+ *   before this wakeup (due to a timeout or a signal) it will either see
>+ *   STATE_READY and continue or acquire the lock to check the sate again.

"check the sTate again".

>+  wake_q_add(wake_q, receiver->task);
>+  /*
>+   * Rely on the implicit cmpxchg barrier from wake_q_add such
>+   * that we can ensure that updating receiver->state is the last
>+   * write operation: As once set, the receiver can continue,
>+   * and if we don't have the reference count from the wake_q,
>+   * yet, at that point we can later have a use-after-free
>+   * condition and bogus wakeup.
>+   */
>   receiver->state = STATE_READY;

How about:
/*
 * There must be a write barrier here; setting STATE_READY
 * lets the receiver proceed without further synchronization.
 * The cmpxchg inside wake_q_add serves as the barrier here.
 */

The need for a wake queue to take a reference to avoid use-after-free
is generic to wake queues, and handled in generic code; I don't see why
it needs a comment here.
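
(For the record, the generic side already does roughly the following; this
is a paraphrase of the wake_q implementation this series builds on, so take
the details with a grain of salt:

	void wake_q_add(struct wake_q_head *head, struct task_struct *task)
	{
		struct wake_q_node *node = &task->wake_q;

		/* Already queued by someone else?  That wakeup covers us. */
		if (cmpxchg(&node->next, NULL, WAKE_Q_TAIL))
			return;

		/* Pin the task until wake_up_q() runs. */
		get_task_struct(task);

		/* The head is context local; no concurrency on the list. */
		*head->lastp = node;
		head->lastp = &node->next;
	}

wake_up_q() then does wake_up_process() followed by put_task_struct() for
each queued task, so a queued task cannot go away in between.)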

 
>@@ -1084,6 +1094,7 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
>   ktime_t expires, *timeout = NULL;
>   struct timespec ts;
>   struct posix_msg_tree_node *new_leaf = NULL;
>+  WAKE_Q(wake_q);
> 
>   if (u_abs_timeout) {
>   int res = prepare_timeout(u_abs_timeout, &expires, &ts);
>@@ -1155,8 +1166,9 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
>   CURRENT_TIME;
> 
>   /* There is now free space in queue. */
>-  pipelined_receive(info);
>+  pipelined_receive(&wake_q, info);
>   spin_unlock(&info->lock);
>+  wake_up_q(&wake_q);
>   ret = 0;
>   }
>   if (ret == 0) {

Since WAKE_Q actually involves some initialization, would it make sense to
move its declaration to inside the condition that needs it?

(I'm also a fan of declaring variables in the smallest scope possible,
just on general principles.)


[PATCH 3/3] ipc/mqueue: lockless pipelined wakeups

2015-05-01 Thread Davidlohr Bueso
This patch moves the wake_up_process() invocation so it is not done under
the info->lock by making use of a lockless wake_q. With this change, the
waiter is woken up only once its state is set to STATE_READY, so it no
longer needs to busy-loop on SMP waiting for STATE_PENDING to become
STATE_READY. In the timeout case we still need to grab the info->lock to
verify the state.

This change should also avoid having to introduce preempt_disable() in
-RT to work around the busy loop that polls for the STATE_PENDING ->
STATE_READY transition when the waiter has a higher priority than the
waker.

Additionally, this patch micro-optimizes wq_sleep by using
__set_current_state(), the cheaper cousin of
set_current_state(TASK_INTERRUPTIBLE), as we will block no matter what,
thus getting rid of the implied barrier.

Signed-off-by: Davidlohr Bueso 
---
 ipc/mqueue.c | 50 +++---
 1 file changed, 31 insertions(+), 19 deletions(-)

diff --git a/ipc/mqueue.c b/ipc/mqueue.c
index 3aaea7f..5f422f2 100644
--- a/ipc/mqueue.c
+++ b/ipc/mqueue.c
@@ -47,8 +47,7 @@
 #define RECV   1
 
 #define STATE_NONE 0
-#define STATE_PENDING  1
-#define STATE_READY2
+#define STATE_READY1
 
 struct posix_msg_tree_node {
struct rb_node  rb_node;
@@ -571,15 +570,12 @@ static int wq_sleep(struct mqueue_inode_info *info, int sr,
wq_add(info, sr, ewp);
 
for (;;) {
-   set_current_state(TASK_INTERRUPTIBLE);
+   __set_current_state(TASK_INTERRUPTIBLE);
 
spin_unlock(&info->lock);
time = schedule_hrtimeout_range_clock(timeout, 0,
HRTIMER_MODE_ABS, CLOCK_REALTIME);
 
-   while (ewp->state == STATE_PENDING)
-   cpu_relax();
-
if (ewp->state == STATE_READY) {
retval = 0;
goto out;
@@ -909,9 +905,14 @@ out_name:
  * bypasses the message array and directly hands the message over to the
  * receiver.
  * The receiver accepts the message and returns without grabbing the queue
- * spinlock. Therefore an intermediate STATE_PENDING state and memory barriers
- * are necessary. The same algorithm is used for sysv semaphores, see
- * ipc/sem.c for more details.
+ * spinlock. The used algorithm is different from sysv semaphores (ipc/sem.c):
+ *
+ * - Set pointer to message.
+ * - Queue the receiver task's for later wakeup (without the info->lock).
+ * - Update its state to STATE_READY. Now the receiver can continue.
+ * - Wake up the process after the lock is dropped. Should the process wake up
+ *   before this wakeup (due to a timeout or a signal) it will either see
+ *   STATE_READY and continue or acquire the lock to check the sate again.
  *
  * The same algorithm is used for senders.
  */
@@ -919,21 +920,29 @@ out_name:
 /* pipelined_send() - send a message directly to the task waiting in
  * sys_mq_timedreceive() (without inserting message into a queue).
  */
-static inline void pipelined_send(struct mqueue_inode_info *info,
+static inline void pipelined_send(struct wake_q_head *wake_q,
+ struct mqueue_inode_info *info,
  struct msg_msg *message,
  struct ext_wait_queue *receiver)
 {
receiver->msg = message;
list_del(&receiver->list);
-   receiver->state = STATE_PENDING;
-   wake_up_process(receiver->task);
-   smp_wmb();
+   wake_q_add(wake_q, receiver->task);
+   /*
+* Rely on the implicit cmpxchg barrier from wake_q_add such
+* that we can ensure that updating receiver->state is the last
+* write operation: As once set, the receiver can continue,
+* and if we don't have the reference count from the wake_q,
+* yet, at that point we can later have a use-after-free
+* condition and bogus wakeup.
+*/
receiver->state = STATE_READY;
 }
 
 /* pipelined_receive() - if there is task waiting in sys_mq_timedsend()
  * gets its message and put to the queue (we have one free place for sure). */
-static inline void pipelined_receive(struct mqueue_inode_info *info)
+static inline void pipelined_receive(struct wake_q_head *wake_q,
+struct mqueue_inode_info *info)
 {
struct ext_wait_queue *sender = wq_get_first_waiter(info, SEND);
 
@@ -944,10 +953,9 @@ static inline void pipelined_receive(struct mqueue_inode_info *info)
}
if (msg_insert(sender->msg, info))
return;
+
list_del(&sender->list);
-   sender->state = STATE_PENDING;
-   wake_up_process(sender->task);
-   smp_wmb();
+   wake_q_add(wake_q, sender->task);
sender->state = STATE_READY;
 }
 
@@ -965,6 +973,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
struct timespec ts;
struct posix_msg_tree_node *new_leaf = NULL;
int ret = 0;
+   WAKE_Q(wake_q);
 
if