The following workqueue related patch is a port of the original VFCIPI PI
patch I submitted earlier.  There is still more work to be done to add the
"schedule_on_cpu()" type behavior, and even more if we want to use this as
part of KVM.  But for now, this patch can stand alone so I thought I would
get it out there.

------------------------------------------------------------------

Mainline workqueues treat all requests the same.  This patch prioritizes
requests in the queue based on the callers current rt_priority.  It also
adds a priority-inheritance feature to avoid inversion.

Signed-off-by: Gregory Haskins <[EMAIL PROTECTED]>
---

 include/linux/workqueue.h |    2 
 kernel/workqueue.c        |  198 +++++++++++++++++++++++++++++++++------------
 2 files changed, 149 insertions(+), 51 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 925d898..ae740b6 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -28,6 +28,7 @@ struct work_struct {
 #define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK)
        struct list_head entry;
        work_func_t func;
+       int prio;
 };
 
 #define WORK_DATA_INIT()       ATOMIC_LONG_INIT(0)
@@ -81,6 +82,7 @@ struct execute_work {
                (_work)->data = (atomic_long_t) WORK_DATA_INIT();       \
                INIT_LIST_HEAD(&(_work)->entry);                        \
                PREPARE_WORK((_work), (_func));                         \
+                (_work)->prio = -1;                                     \
        } while (0)
 
 #define INIT_DELAYED_WORK(_work, _func)                                \
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index e339c80..ecd5e01 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -14,6 +14,8 @@
  *   Theodore Ts'o <[EMAIL PROTECTED]>
  *
  * Made to use alloc_percpu by Christoph Lameter <[EMAIL PROTECTED]>.
+ *
+ * RT-queues and PI by Gregory Haskins <[EMAIL PROTECTED]>
  */
 
 #include <linux/module.h>
@@ -36,6 +38,13 @@
 
 #include <asm/uaccess.h>
 
+/* Note: prio_array inspiration credit goes to the RT scheduler...*/
+struct prio_array {
+       DECLARE_BITMAP(bitmap, MAX_RT_PRIO+1); /* +1 bit for delimiter */
+       unsigned long    count;
+       struct list_head queue[MAX_RT_PRIO];
+};
+
 /*
  * The per-CPU workqueue (if single thread, we always use the first
  * possible cpu).
@@ -45,6 +54,7 @@ struct cpu_workqueue_struct {
        spinlock_t lock;
 
        struct list_head worklist;
+       struct prio_array rt_worklist;
        wait_queue_head_t more_work;
        struct work_struct *current_work;
 
@@ -52,6 +62,7 @@ struct cpu_workqueue_struct {
        struct task_struct *thread;
 
        int run_depth;          /* Detect run_workqueue() recursion depth */
+       int prio;               /* -1 = normal priority, 0-99 = RT priority */
 } ____cacheline_aligned;
 
 /*
@@ -82,6 +93,94 @@ static cpumask_t cpu_singlethread_map __read_mostly;
  */
 static cpumask_t cpu_populated_map __read_mostly;
 
+/*
+ * ----------------------------------------
+ * prio_array
+ * ----------------------------------------
+ */
+static void prio_array_init(struct prio_array *array)
+{
+       int i;
+
+       memset(array->bitmap, 0, sizeof(array->bitmap));
+       array->count = 0;
+       
+       for (i=0; i<MAX_RT_PRIO; i++)
+               INIT_LIST_HEAD(&array->queue[i]);
+}
+
+static struct work_struct* prio_array_dequeue(struct prio_array *array)
+{
+       struct list_head   *head;
+       struct work_struct *work;
+       int                 idx;
+       
+       if (!array->count)
+               return NULL;
+       
+       idx = sched_find_first_bit(array->bitmap);
+       
+       head = array->queue + idx;
+       
+       /* If we got here, there better be something in the list */
+       BUG_ON(!head);
+       BUG_ON(list_empty(head));
+       
+       work = list_first_entry(head, struct work_struct, entry);
+       BUG_ON(!work);
+       
+       list_del_init(&work->entry);
+       array->count--;
+       
+       if (list_empty(head))
+               __clear_bit(idx, &array->bitmap);
+       
+       return work;
+}
+
+static void prio_array_enqueue(struct prio_array *array,
+                              struct work_struct *work,
+                              int prio, int tail)
+{
+       struct list_head *head;
+
+       BUG_ON(prio < 0);
+       BUG_ON(prio > MAX_RT_PRIO);
+
+       head = array->queue + prio;
+
+       if (tail)
+               list_add_tail(&work->entry, head);
+       else
+               list_add(&work->entry, head);
+
+       __set_bit(prio, &array->bitmap);
+       array->count++;
+}
+
+#define CWQ_DEFAULT_PRIO 1
+
+/* Assumes lock is held */
+static void wq_task_setprio(struct cpu_workqueue_struct *cwq, int prio)
+{
+       struct sched_param param = { 0, };
+       pid_t              pid   = cwq->thread->pid;
+
+       if (prio < CWQ_DEFAULT_PRIO)
+               prio = CWQ_DEFAULT_PRIO;
+
+       if (cwq->prio != prio) {
+               if (prio != -1) {
+                       param.sched_priority = prio;
+                       sys_sched_setscheduler(pid, SCHED_FIFO, &param);
+               } else {
+                       sys_sched_setscheduler(pid, SCHED_NORMAL, &param);
+               }
+
+               cwq->prio = prio;
+       }
+}
+
 /* If it's single threaded, it isn't in the list of workqueues. */
 static inline int is_single_threaded(struct workqueue_struct *wq)
 {
@@ -133,10 +232,22 @@ static void insert_work(struct cpu_workqueue_struct *cwq,
         * result of list_add() below, see try_to_grab_pending().
         */
        smp_wmb();
-       if (tail)
-               list_add_tail(&work->entry, &cwq->worklist);
-       else
-               list_add(&work->entry, &cwq->worklist);
+       if (rt_task(current)) {
+               work->prio = current->rt_priority;
+               prio_array_enqueue(&cwq->rt_worklist, work, work->prio, tail);
+
+               /* Priority inheritance on the kthread */
+               if (cwq->prio < work->prio)
+                       wq_task_setprio(cwq, work->prio);
+       } else {
+               work->prio = -1;
+               if (tail)
+                       list_add_tail(&work->entry, &cwq->worklist);
+               else
+                       list_add(&work->entry, &cwq->worklist);
+
+       }
+
        wake_up(&cwq->more_work);
 }
 
@@ -254,8 +365,30 @@ static void leak_check(void *func)
        dump_stack();
 }
 
+static struct work_struct* cwq_dequeue(struct cpu_workqueue_struct *cwq)
+{
+       struct work_struct *work;
+       
+       /* First check the RT items */
+       work = prio_array_dequeue(&cwq->rt_worklist);
+       if (!work) {
+               /* If nothing is found there, check the normal queue */
+               if (!list_empty(&cwq->worklist)) {
+                       work = list_first_entry(&cwq->worklist,
+                                               struct work_struct,
+                                               entry);
+                       BUG_ON(!work);
+                       list_del_init(&work->entry);
+               }
+       }
+
+       return work;
+}
+
 static void run_workqueue(struct cpu_workqueue_struct *cwq)
 {
+       struct work_struct *work;
+
        spin_lock_irq(&cwq->lock);
        cwq->run_depth++;
        if (cwq->run_depth > 3) {
@@ -264,13 +397,12 @@ static void run_workqueue(struct cpu_workqueue_struct 
*cwq)
                        __FUNCTION__, cwq->run_depth);
                dump_stack();
        }
-       while (!list_empty(&cwq->worklist)) {
-               struct work_struct *work = list_entry(cwq->worklist.next,
-                                               struct work_struct, entry);
+       while ((work = cwq_dequeue(cwq))) {
                work_func_t f = work->func;
 
+               wq_task_setprio(cwq, work->prio);
                cwq->current_work = work;
-               list_del_init(cwq->worklist.next);
+
                spin_unlock_irq(&cwq->lock);
 
                BUG_ON(get_wq_data(work) != cwq);
@@ -283,6 +415,10 @@ static void run_workqueue(struct cpu_workqueue_struct *cwq)
                spin_lock_irq(&cwq->lock);
                cwq->current_work = NULL;
        }
+
+       /* Make sure we are back to normal priority before sleeping */
+       wq_task_setprio(cwq, -1);
+
        cwq->run_depth--;
        spin_unlock_irq(&cwq->lock);
 }
@@ -295,7 +431,7 @@ static int worker_thread(void *__cwq)
        if (!cwq->wq->freezeable)
                current->flags |= PF_NOFREEZE;
 
-       set_user_nice(current, -5);
+       wq_task_setprio(cwq, -1);
 
        for (;;) {
                prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
@@ -691,7 +827,9 @@ init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
        cwq->wq = wq;
        spin_lock_init(&cwq->lock);
        INIT_LIST_HEAD(&cwq->worklist);
+       prio_array_init(&cwq->rt_worklist);
        init_waitqueue_head(&cwq->more_work);
+       cwq->prio = -1;
 
        return cwq;
 }
@@ -803,47 +941,6 @@ static void cleanup_workqueue_thread(struct 
cpu_workqueue_struct *cwq, int cpu)
        cwq->thread = NULL;
 }
 
-void set_workqueue_thread_prio(struct workqueue_struct *wq, int cpu,
-                              int policy, int rt_priority, int nice)
-{
-       struct sched_param param = { .sched_priority = rt_priority };
-       struct cpu_workqueue_struct *cwq;
-       mm_segment_t oldfs = get_fs();
-       struct task_struct *p;
-       unsigned long flags;
-       int ret;
-
-       cwq = per_cpu_ptr(wq->cpu_wq, cpu);
-       spin_lock_irqsave(&cwq->lock, flags);
-       p = cwq->thread;
-       spin_unlock_irqrestore(&cwq->lock, flags);
-
-       set_user_nice(p, nice);
-
-       set_fs(KERNEL_DS);
-       ret = sys_sched_setscheduler(p->pid, policy, &param);
-       set_fs(oldfs);
-
-       WARN_ON(ret);
-}
-
- void set_workqueue_prio(struct workqueue_struct *wq, int policy,
-                       int rt_priority, int nice)
-{
-       int cpu;
-
-       /* We don't need the distraction of CPUs appearing and vanishing. */
-       mutex_lock(&workqueue_mutex);
-       if (is_single_threaded(wq))
-               set_workqueue_thread_prio(wq, 0, policy, rt_priority, nice);
-       else {
-               for_each_online_cpu(cpu)
-                       set_workqueue_thread_prio(wq, cpu, policy,
-                                                 rt_priority, nice);
-       }
-       mutex_unlock(&workqueue_mutex);
-}
-
 /**
  * destroy_workqueue - safely terminate a workqueue
  * @wq: target workqueue
@@ -926,5 +1023,4 @@ void __init init_workqueues(void)
        hotcpu_notifier(workqueue_cpu_callback, 0);
        keventd_wq = create_workqueue("events");
        BUG_ON(!keventd_wq);
-       set_workqueue_prio(keventd_wq, SCHED_FIFO, 1, -20);
 }

-
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to [EMAIL PROTECTED]
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/

Reply via email to