This patch moves the wake_up_process() invocation so it is no longer done
under the info->lock. With this change, the waiter is woken up only once it
is "ready", which means its state is already STATE_READY, so it no longer
needs to loop on SMP while it is still in STATE_PENDING.
In the timeout case we still need to grab the info->lock to verify the state.

This change should also avoid the introduction of preempt_disable() in
-RT, which is there to avoid a busy-loop that polls for the
STATE_PENDING -> STATE_READY change if the waiter has a higher priority
compared to the waker.

Signed-off-by: Sebastian Andrzej Siewior <[email protected]>
---
 ipc/mqueue.c | 45 ++++++++++++++++++++++++++++-----------------
 1 file changed, 28 insertions(+), 17 deletions(-)

diff --git a/ipc/mqueue.c b/ipc/mqueue.c
index 7635a1cf99f3..95d179ed0923 100644
--- a/ipc/mqueue.c
+++ b/ipc/mqueue.c
@@ -47,7 +47,6 @@
 #define RECV           1
 
 #define STATE_NONE     0
-#define STATE_PENDING  1
 #define STATE_READY    2
 
 struct posix_msg_tree_node {
@@ -577,9 +576,6 @@ static int wq_sleep(struct mqueue_inode_info *info, int sr,
                time = schedule_hrtimeout_range_clock(timeout, 0,
                        HRTIMER_MODE_ABS, CLOCK_REALTIME);
 
-               while (ewp->state == STATE_PENDING)
-                       cpu_relax();
-
                if (ewp->state == STATE_READY) {
                        retval = 0;
                        goto out;
@@ -909,9 +905,8 @@ SYSCALL_DEFINE1(mq_unlink, const char __user *, u_name)
  * bypasses the message array and directly hands the message over to the
  * receiver.
  * The receiver accepts the message and returns without grabbing the queue
- * spinlock. Therefore an intermediate STATE_PENDING state and memory barriers
- * are necessary. The same algorithm is used for sysv semaphores, see
- * ipc/sem.c for more details.
+ * spinlock. The same algorithm is used for sysv semaphores, see ipc/sem.c
+ * for more details.
  *
  * The same algorithm is used for senders.
  */
@@ -919,36 +914,41 @@ SYSCALL_DEFINE1(mq_unlink, const char __user *, u_name)
 /* pipelined_send() - send a message directly to the task waiting in
  * sys_mq_timedreceive() (without inserting message into a queue).
  */
-static inline void pipelined_send(struct mqueue_inode_info *info,
+static struct task_struct *pipelined_send(struct mqueue_inode_info *info,
                                  struct msg_msg *message,
                                  struct ext_wait_queue *receiver)
 {
+       struct task_struct *r_task;
+
        receiver->msg = message;
        list_del(&receiver->list);
-       receiver->state = STATE_PENDING;
-       wake_up_process(receiver->task);
+       r_task = receiver->task;
+       get_task_struct(r_task);
        smp_wmb();
        receiver->state = STATE_READY;
+       return r_task;
 }
 
 /* pipelined_receive() - if there is task waiting in sys_mq_timedsend()
  * gets its message and put to the queue (we have one free place for sure). */
-static inline void pipelined_receive(struct mqueue_inode_info *info)
+static struct task_struct *pipelined_receive(struct mqueue_inode_info *info)
 {
+       struct task_struct *r_sender;
        struct ext_wait_queue *sender = wq_get_first_waiter(info, SEND);
 
        if (!sender) {
                /* for poll */
                wake_up_interruptible(&info->wait_q);
-               return;
+               return NULL;
        }
        if (msg_insert(sender->msg, info))
-               return;
+               return NULL;
        list_del(&sender->list);
-       sender->state = STATE_PENDING;
-       wake_up_process(sender->task);
+       r_sender = sender->task;
+       get_task_struct(r_sender);
        smp_wmb();
        sender->state = STATE_READY;
+       return r_sender;
 }
 
 SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
@@ -961,6 +961,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
        struct ext_wait_queue *receiver;
        struct msg_msg *msg_ptr;
        struct mqueue_inode_info *info;
+       struct task_struct *r_task = NULL;
        ktime_t expires, *timeout = NULL;
        struct timespec ts;
        struct posix_msg_tree_node *new_leaf = NULL;
@@ -1049,7 +1050,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
        } else {
                receiver = wq_get_first_waiter(info, RECV);
                if (receiver) {
-                       pipelined_send(info, msg_ptr, receiver);
+                       r_task = pipelined_send(info, msg_ptr, receiver);
                } else {
                        /* adds message to the queue */
                        ret = msg_insert(msg_ptr, info);
@@ -1062,6 +1063,10 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
        }
 out_unlock:
        spin_unlock(&info->lock);
+       if (r_task) {
+               wake_up_process(r_task);
+               put_task_struct(r_task);
+       }
 out_free:
        if (ret)
                free_msg(msg_ptr);
@@ -1149,14 +1154,20 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
                        msg_ptr = wait.msg;
                }
        } else {
+               struct task_struct *r_sender;
+
                msg_ptr = msg_get(info);
 
                inode->i_atime = inode->i_mtime = inode->i_ctime =
                                CURRENT_TIME;
 
                /* There is now free space in queue. */
-               pipelined_receive(info);
+               r_sender = pipelined_receive(info);
                spin_unlock(&info->lock);
+               if (r_sender) {
+                       wake_up_process(r_sender);
+                       put_task_struct(r_sender);
+               }
                ret = 0;
        }
        if (ret == 0) {
-- 
2.1.4

--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to [email protected]
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/

Reply via email to