Re: [PATCH 3/3] ipc/mqueue: lockless pipelined wakeups
> You are not wrong, but I'd rather leave the comment as is, as it will
> vary from user to user. The comments in the sched wake_q bits are
> already pretty clear, and if users cannot see the need for holding
> reference and the task disappearing on their own they have no business
> using wake_q. Furthermore, I think my comment serves better in mqueues
> as the need for it isn't immediately obvious.

Okay, but the comment is still rather awkward and hard-to-follow English.
How about:

	/*
	 * Rely on the implicit cmpxchg barrier from wake_q_add to
	 * ensure that updating receiver->state is the last write
	 * operation.  Once set, the receiver can continue, and if we
	 * hadn't placed it on the wake_q (which takes a reference to
	 * the task), any later use might cause a use-after-free
	 * condition.
	 */

Part of the confusion is that there are two different ordering conditions
that wake_q_add is involved in, and the comment above (even my version)
isn't good about explaining the distinction:

1) It, itself, must come before the receiver->state update, because
   after that, the receiver may run (and possibly exit).
2) It serves as a write barrier for all the other state writes above.

If I wanted to be clearer, I'd have to do more extensive edits:

	/*
	 * wake_q_add must come before updating receiver->state, since
	 * that write lets the receiver continue (and possibly exit).
	 * The reference count from the wake_q prevents use-after-free.
	 *
	 * The cmpxchg inside wake_q_add also serves as a write barrier
	 * for all the other state updates that must be visible before
	 * receiver->state.
	 */

None of this affects the code, which is

Acked-by: George Spelvin

--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majord...@vger.kernel.org
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/
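
The two ordering conditions described in this message can be modelled in user space with C11 atomics. What follows is only a sketch under stated assumptions, not the kernel code: every toy_* name is hypothetical, the queue is LIFO for brevity, and a release store on the state stands in for the ordering that the kernel code gets from the cmpxchg inside wake_q_add.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical user-space stand-ins; none of these are kernel APIs. */
enum { STATE_NONE = 0, STATE_READY = 1 };

struct toy_task {
	atomic_int refcount;             /* models the task reference count */
	_Atomic(struct toy_task *) next; /* NULL: not on any wake queue */
};

struct toy_wake_q {
	struct toy_task *head;           /* private to the waker, no locking */
};

struct toy_waiter {
	struct toy_task *task;
	void *msg;
	atomic_int state;
};

static struct toy_task toy_tail;         /* sentinel: end of a wake queue */

static void toy_task_init(struct toy_task *t)
{
	atomic_init(&t->refcount, 1);    /* the task's own reference */
	atomic_init(&t->next, NULL);
}

static void toy_waiter_init(struct toy_waiter *w, struct toy_task *t)
{
	w->task = t;
	w->msg = NULL;
	atomic_init(&w->state, STATE_NONE);
}

/* Queue a task for a later wakeup; returns false if it was already queued. */
static bool toy_wake_q_add(struct toy_wake_q *q, struct toy_task *t)
{
	struct toy_task *expected = NULL;
	struct toy_task *link = q->head ? q->head : &toy_tail;

	/* Sequentially consistent compare-exchange: the barrier in this model. */
	if (!atomic_compare_exchange_strong(&t->next, &expected, link))
		return false;
	atomic_fetch_add(&t->refcount, 1); /* keeps the task alive until woken */
	q->head = t;
	return true;
}

/* Waker side: hand over the message, then publish STATE_READY last. */
static void toy_pipelined_send(struct toy_wake_q *q, struct toy_waiter *w,
			       void *msg)
{
	w->msg = msg;                    /* must be visible before the state */
	toy_wake_q_add(q, w->task);      /* condition 1: reference taken first */
	/* Condition 2: earlier writes are ordered before this final store. */
	atomic_store_explicit(&w->state, STATE_READY, memory_order_release);
}

/* Run after the lock is dropped; returns the number of queued tasks. */
static int toy_wake_up_q(struct toy_wake_q *q)
{
	struct toy_task *t = q->head;
	int woken = 0;

	while (t && t != &toy_tail) {
		struct toy_task *next = atomic_load(&t->next);

		atomic_store(&t->next, NULL);      /* may be queued again */
		/* A real implementation would wake the task here. */
		atomic_fetch_sub(&t->refcount, 1); /* drop the queue's reference */
		woken++;
		t = next;
	}
	q->head = NULL;
	return woken;
}
```

In this model a caller would run toy_pipelined_send() while holding the queue lock and toy_wake_up_q() after releasing it. Swapping the toy_wake_q_add() call and the state store would let the receiver observe STATE_READY and exit before the extra reference exists, which is the use-after-free both proposed comments warn about.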
Re: [PATCH 3/3] ipc/mqueue: lockless pipelined wakeups
On Fri, 2015-05-01 at 17:52 -0400, George Spelvin wrote:
> In general, Acked-by, but you're making me fix all your comments. :-)
>
> This is a nice use of the wake queue, since the code was already handling
> the same problem in a similar way with STATE_PENDING.
>
> >  * The receiver accepts the message and returns without grabbing the queue
> >+ * spinlock. The used algorithm is different from sysv semaphores (ipc/sem.c):
>
> Is that last sentence even wanted?

Yeah, we can probably remove it now.

> >+ *
> >+ * - Set pointer to message.
> >+ * - Queue the receiver task's for later wakeup (without the info->lock).
>
> It's "task" singular, and the apostrophe would be wrong if it were plural.
>
> >+ * - Update its state to STATE_READY. Now the receiver can continue.
> >+ * - Wake up the process after the lock is dropped. Should the process wake up
> >+ *   before this wakeup (due to a timeout or a signal) it will either see
> >+ *   STATE_READY and continue or acquire the lock to check the sate again.
>
> "check the sTate again".
>
> >+	wake_q_add(wake_q, receiver->task);
> >+	/*
> >+	 * Rely on the implicit cmpxchg barrier from wake_q_add such
> >+	 * that we can ensure that updating receiver->state is the last
> >+	 * write operation: As once set, the receiver can continue,
> >+	 * and if we don't have the reference count from the wake_q,
> >+	 * yet, at that point we can later have a use-after-free
> >+	 * condition and bogus wakeup.
> >+	 */
> > 	receiver->state = STATE_READY;
>
> How about:
> 	/*
> 	 * There must be a write barrier here; setting STATE_READY
> 	 * lets the receiver proceed without further synchronization.
> 	 * The cmpxchg inside wake_q_add serves as the barrier here.
> 	 */
>
> The need for a wake queue to take a reference to avoid use-after-free
> is generic to wake queues, and handled in generic code; I don't see why
> it needs a comment here.

You are not wrong, but I'd rather leave the comment as is, as it will
vary from user to user. The comments in the sched wake_q bits are
already pretty clear, and if users cannot see the need for holding
reference and the task disappearing on their own they have no business
using wake_q. Furthermore, I think my comment serves better in mqueues
as the need for it isn't immediately obvious.

> >@@ -1084,6 +1094,7 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
> > 	ktime_t expires, *timeout = NULL;
> > 	struct timespec ts;
> > 	struct posix_msg_tree_node *new_leaf = NULL;
> >+	WAKE_Q(wake_q);
> >
> > 	if (u_abs_timeout) {
> > 		int res = prepare_timeout(u_abs_timeout, &expires, &ts);
> >@@ -1155,8 +1166,9 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
> > 				CURRENT_TIME;
> >
> > 		/* There is now free space in queue. */
> >-		pipelined_receive(info);
> >+		pipelined_receive(&wake_q, info);
> > 		spin_unlock(&info->lock);
> >+		wake_up_q(&wake_q);
> > 		ret = 0;
> > 	}
> > 	if (ret == 0) {
>
> Since WAKE_Q actually involves some initialization, would it make sense to
> move its declaration to inside the condition that needs it?
>
> (I'm also a fan of declaring variables in the smallest scope possible,
> just on general principles.)

Agreed.

Thanks,
Davidlohr
Re: [PATCH 3/3] ipc/mqueue: lockless pipelined wakeups
In general, Acked-by, but you're making me fix all your comments. :-)

This is a nice use of the wake queue, since the code was already handling
the same problem in a similar way with STATE_PENDING.

>  * The receiver accepts the message and returns without grabbing the queue
>+ * spinlock. The used algorithm is different from sysv semaphores (ipc/sem.c):

Is that last sentence even wanted?

>+ *
>+ * - Set pointer to message.
>+ * - Queue the receiver task's for later wakeup (without the info->lock).

It's "task" singular, and the apostrophe would be wrong if it were plural.

>+ * - Update its state to STATE_READY. Now the receiver can continue.
>+ * - Wake up the process after the lock is dropped. Should the process wake up
>+ *   before this wakeup (due to a timeout or a signal) it will either see
>+ *   STATE_READY and continue or acquire the lock to check the sate again.

"check the sTate again".

>+	wake_q_add(wake_q, receiver->task);
>+	/*
>+	 * Rely on the implicit cmpxchg barrier from wake_q_add such
>+	 * that we can ensure that updating receiver->state is the last
>+	 * write operation: As once set, the receiver can continue,
>+	 * and if we don't have the reference count from the wake_q,
>+	 * yet, at that point we can later have a use-after-free
>+	 * condition and bogus wakeup.
>+	 */
> 	receiver->state = STATE_READY;

How about:
	/*
	 * There must be a write barrier here; setting STATE_READY
	 * lets the receiver proceed without further synchronization.
	 * The cmpxchg inside wake_q_add serves as the barrier here.
	 */

The need for a wake queue to take a reference to avoid use-after-free
is generic to wake queues, and handled in generic code; I don't see why
it needs a comment here.

>@@ -1084,6 +1094,7 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
> 	ktime_t expires, *timeout = NULL;
> 	struct timespec ts;
> 	struct posix_msg_tree_node *new_leaf = NULL;
>+	WAKE_Q(wake_q);
>
> 	if (u_abs_timeout) {
> 		int res = prepare_timeout(u_abs_timeout, &expires, &ts);
>@@ -1155,8 +1166,9 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
> 				CURRENT_TIME;
>
> 		/* There is now free space in queue. */
>-		pipelined_receive(info);
>+		pipelined_receive(&wake_q, info);
> 		spin_unlock(&info->lock);
>+		wake_up_q(&wake_q);
> 		ret = 0;
> 	}
> 	if (ret == 0) {

Since WAKE_Q actually involves some initialization, would it make sense to
move its declaration to inside the condition that needs it?

(I'm also a fan of declaring variables in the smallest scope possible,
just on general principles.)
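
The last item in the comment list quoted in this review covers a waiter that wakes up before the deferred wakeup arrives: it either sees STATE_READY or takes the lock and checks again. A minimal user-space model of that check might look like the sketch below. It is an illustration, not the kernel's wq_sleep: the toy_* names are hypothetical, an atomic_flag spinlock stands in for info->lock, and it assumes, as in the quoted hunk where pipelined_receive() runs before spin_unlock(), that the waker only writes the state while holding that lock.

```c
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical user-space model; the names below are not kernel APIs. */
enum { STATE_NONE = 0, STATE_READY = 1 };

enum toy_result { TOY_HANDED_OVER, TOY_GAVE_UP, TOY_SLEEP_AGAIN };

struct toy_queue {
	atomic_flag lock;   /* toy spinlock standing in for info->lock */
};

struct toy_waiter {
	atomic_int state;   /* written by the waker as its final step */
	bool queued;        /* still on the wait list; protected by the lock */
};

static void toy_lock(struct toy_queue *q)
{
	while (atomic_flag_test_and_set_explicit(&q->lock, memory_order_acquire))
		; /* spin until the flag was previously clear */
}

static void toy_unlock(struct toy_queue *q)
{
	atomic_flag_clear_explicit(&q->lock, memory_order_release);
}

/* Called by the waiter each time it returns from sleeping. */
static enum toy_result toy_check_after_wakeup(struct toy_queue *q,
					       struct toy_waiter *w,
					       bool timed_out)
{
	enum toy_result res;

	/* Fast path: a completed hand-off needs no lock. */
	if (atomic_load_explicit(&w->state, memory_order_acquire) == STATE_READY)
		return TOY_HANDED_OVER;

	toy_lock(q);
	/* Re-check: the waker may have finished while we waited for the lock. */
	if (atomic_load_explicit(&w->state, memory_order_acquire) == STATE_READY) {
		res = TOY_HANDED_OVER;
	} else if (timed_out) {
		w->queued = false;     /* leave the wait list under the lock */
		res = TOY_GAVE_UP;
	} else {
		res = TOY_SLEEP_AGAIN; /* nothing happened yet: stay queued */
	}
	toy_unlock(q);
	return res;
}
```

Because the state can only change under the lock in this model, the second check is stable for as long as the waiter holds it, so no intermediate pending state or busy loop is needed.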
[PATCH 3/3] ipc/mqueue: lockless pipelined wakeups
This patch moves the wake_up_process() invocation so it is not done under
the info->lock by making use of a lockless wake_q. With this change, the
waiter is woken up once it is STATE_READY and it does not need to loop
on SMP if it is still in STATE_PENDING. In the timeout case we still need
to grab the info->lock to verify the state.

This change should also avoid the introduction of preempt_disable() in -RT
which avoids a busy-loop which polls for the STATE_PENDING -> STATE_READY
change if the waiter has a higher priority compared to the waker.

Additionally, this patch micro-optimizes wq_sleep by using the cheaper
cousin of set_current_state(TASK_INTERRUPTIBLE) as we will block no
matter what, thus getting rid of the implied barrier.

Signed-off-by: Davidlohr Bueso
---
 ipc/mqueue.c | 50 +++---
 1 file changed, 31 insertions(+), 19 deletions(-)

diff --git a/ipc/mqueue.c b/ipc/mqueue.c
index 3aaea7f..5f422f2 100644
--- a/ipc/mqueue.c
+++ b/ipc/mqueue.c
@@ -47,8 +47,7 @@
 #define RECV		1
 
 #define STATE_NONE	0
-#define STATE_PENDING	1
-#define STATE_READY	2
+#define STATE_READY	1
 
 struct posix_msg_tree_node {
 	struct rb_node		rb_node;
@@ -571,15 +570,12 @@ static int wq_sleep(struct mqueue_inode_info *info, int sr,
 	wq_add(info, sr, ewp);
 
 	for (;;) {
-		set_current_state(TASK_INTERRUPTIBLE);
+		__set_current_state(TASK_INTERRUPTIBLE);
 
 		spin_unlock(&info->lock);
 		time = schedule_hrtimeout_range_clock(timeout, 0,
 			HRTIMER_MODE_ABS, CLOCK_REALTIME);
 
-		while (ewp->state == STATE_PENDING)
-			cpu_relax();
-
 		if (ewp->state == STATE_READY) {
 			retval = 0;
 			goto out;
@@ -909,9 +905,14 @@ out_name:
  * bypasses the message array and directly hands the message over to the
  * receiver.
  * The receiver accepts the message and returns without grabbing the queue
- * spinlock. Therefore an intermediate STATE_PENDING state and memory barriers
- * are necessary. The same algorithm is used for sysv semaphores, see
- * ipc/sem.c for more details.
+ * spinlock. The used algorithm is different from sysv semaphores (ipc/sem.c):
+ *
+ * - Set pointer to message.
+ * - Queue the receiver task's for later wakeup (without the info->lock).
+ * - Update its state to STATE_READY. Now the receiver can continue.
+ * - Wake up the process after the lock is dropped. Should the process wake up
+ *   before this wakeup (due to a timeout or a signal) it will either see
+ *   STATE_READY and continue or acquire the lock to check the sate again.
  *
  * The same algorithm is used for senders.
  */
@@ -919,21 +920,29 @@ out_name:
 /* pipelined_send() - send a message directly to the task waiting in
  * sys_mq_timedreceive() (without inserting message into a queue).
  */
-static inline void pipelined_send(struct mqueue_inode_info *info,
+static inline void pipelined_send(struct wake_q_head *wake_q,
+				  struct mqueue_inode_info *info,
 				  struct msg_msg *message,
 				  struct ext_wait_queue *receiver)
 {
 	receiver->msg = message;
 	list_del(&receiver->list);
-	receiver->state = STATE_PENDING;
-	wake_up_process(receiver->task);
-	smp_wmb();
+	wake_q_add(wake_q, receiver->task);
+	/*
+	 * Rely on the implicit cmpxchg barrier from wake_q_add such
+	 * that we can ensure that updating receiver->state is the last
+	 * write operation: As once set, the receiver can continue,
+	 * and if we don't have the reference count from the wake_q,
+	 * yet, at that point we can later have a use-after-free
+	 * condition and bogus wakeup.
+	 */
 	receiver->state = STATE_READY;
 }
 
 /* pipelined_receive() - if there is task waiting in sys_mq_timedsend()
  * gets its message and put to the queue (we have one free place for sure). */
-static inline void pipelined_receive(struct mqueue_inode_info *info)
+static inline void pipelined_receive(struct wake_q_head *wake_q,
+				     struct mqueue_inode_info *info)
 {
 	struct ext_wait_queue *sender = wq_get_first_waiter(info, SEND);
 
@@ -944,10 +953,9 @@ static inline void pipelined_receive(struct mqueue_inode_info *info)
 	}
 	if (msg_insert(sender->msg, info))
 		return;
+
 	list_del(&sender->list);
-	sender->state = STATE_PENDING;
-	wake_up_process(sender->task);
-	smp_wmb();
+	wake_q_add(wake_q, sender->task);
 	sender->state = STATE_READY;
 }
 
@@ -965,6 +973,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
 	struct timespec ts;
 	struct posix_msg_tree_node *new_leaf = NULL;
 	int ret = 0;
+	WAKE_Q(wake_q);
 
 	if