The following workqueue-related patch is a port of the original VFCIPI PI patch I submitted earlier. There is still more work to be done to add "schedule_on_cpu()"-type behavior, and even more if we want to use this as part of KVM. But for now the patch stands alone, so I thought I would get it out there.
------------------------------------------------------------------

Mainline workqueues treat all requests the same. This patch prioritizes
requests in the queue based on the caller's current rt_priority. It also
adds a priority-inheritance feature to avoid inversion.

Signed-off-by: Gregory Haskins <[EMAIL PROTECTED]>
---

 include/linux/workqueue.h |    2 
 kernel/workqueue.c        |  198 +++++++++++++++++++++++++++++++++------------
 2 files changed, 149 insertions(+), 51 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 925d898..ae740b6 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -28,6 +28,7 @@ struct work_struct {
 #define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK)
 	struct list_head entry;
 	work_func_t func;
+	int prio;
 };
 
 #define WORK_DATA_INIT()	ATOMIC_LONG_INIT(0)
@@ -81,6 +82,7 @@ struct execute_work {
 		(_work)->data = (atomic_long_t) WORK_DATA_INIT();	\
 		INIT_LIST_HEAD(&(_work)->entry);			\
 		PREPARE_WORK((_work), (_func));				\
+		(_work)->prio = -1;					\
 	} while (0)
 
 #define INIT_DELAYED_WORK(_work, _func)					\
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index e339c80..ecd5e01 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -14,6 +14,8 @@
  *     Theodore Ts'o <[EMAIL PROTECTED]>
  *
  * Made to use alloc_percpu by Christoph Lameter <[EMAIL PROTECTED]>.
+ *
+ * RT-queues and PI by Gregory Haskins <[EMAIL PROTECTED]>
  */
 
 #include <linux/module.h>
@@ -36,6 +38,13 @@
 
 #include <asm/uaccess.h>
 
+/* Note: prio_array inspiration credit goes to the RT scheduler...*/
+struct prio_array {
+	DECLARE_BITMAP(bitmap, MAX_RT_PRIO+1); /* +1 bit for delimiter */
+	unsigned long count;
+	struct list_head queue[MAX_RT_PRIO];
+};
+
 /*
  * The per-CPU workqueue (if single thread, we always use the first
  * possible cpu).
@@ -45,6 +54,7 @@ struct cpu_workqueue_struct {
 
 	spinlock_t lock;
 	struct list_head worklist;
+	struct prio_array rt_worklist;
 	wait_queue_head_t more_work;
 	struct work_struct *current_work;
 
@@ -52,6 +62,7 @@ struct cpu_workqueue_struct {
 	struct task_struct *thread;
 
 	int run_depth;		/* Detect run_workqueue() recursion depth */
+	int prio;		/* -1 = normal priority, 0-99 = RT priority */
 } ____cacheline_aligned;
 
 /*
@@ -82,6 +93,94 @@ static cpumask_t cpu_singlethread_map __read_mostly;
  */
 static cpumask_t cpu_populated_map __read_mostly;
 
+/*
+ * ----------------------------------------
+ * prio_array
+ * ----------------------------------------
+ */
+static void prio_array_init(struct prio_array *array)
+{
+	int i;
+
+	memset(array->bitmap, 0, sizeof(array->bitmap));
+	array->count = 0;
+
+	for (i=0; i<MAX_RT_PRIO; i++)
+		INIT_LIST_HEAD(&array->queue[i]);
+}
+
+static struct work_struct* prio_array_dequeue(struct prio_array *array)
+{
+	struct list_head *head;
+	struct work_struct *work;
+	int idx;
+
+	if (!array->count)
+		return NULL;
+
+	idx = sched_find_first_bit(array->bitmap);
+
+	head = array->queue + idx;
+
+	/* If we got here, there better be something in the list */
+	BUG_ON(!head);
+	BUG_ON(list_empty(head));
+
+	work = list_first_entry(head, struct work_struct, entry);
+	BUG_ON(!work);
+
+	list_del_init(&work->entry);
+	array->count--;
+
+	if (list_empty(head))
+		__clear_bit(idx, &array->bitmap);
+
+	return work;
+}
+
+static void prio_array_enqueue(struct prio_array *array,
+			       struct work_struct *work,
+			       int prio, int tail)
+{
+	struct list_head *head;
+
+	BUG_ON(prio < 0);
+	BUG_ON(prio > MAX_RT_PRIO);
+
+	head = array->queue + prio;
+
+	if (tail)
+		list_add_tail(&work->entry, head);
+	else
+		list_add(&work->entry, head);
+
+	__set_bit(prio, &array->bitmap);
+	array->count++;
+}
+
+#define CWQ_DEFAULT_PRIO 1
+
+/* Assumes lock is held */
+static void wq_task_setprio(struct cpu_workqueue_struct *cwq, int prio)
+{
+	struct sched_param param = { 0, };
+	pid_t pid = cwq->thread->pid;
+
+	if (prio < CWQ_DEFAULT_PRIO)
+		prio = CWQ_DEFAULT_PRIO;
+
+	if (cwq->prio != prio) {
+		if (prio != -1) {
+			param.sched_priority = prio;
+			sys_sched_setscheduler(pid, SCHED_FIFO, &param);
+		} else {
+			sys_sched_setscheduler(pid, SCHED_NORMAL, &param);
+		}
+
+		cwq->prio = prio;
+	}
+}
+
 /* If it's single threaded, it isn't in the list of workqueues. */
 static inline int is_single_threaded(struct workqueue_struct *wq)
 {
@@ -133,10 +232,22 @@ static void insert_work(struct cpu_workqueue_struct *cwq,
 	 * result of list_add() below, see try_to_grab_pending().
 	 */
 	smp_wmb();
-	if (tail)
-		list_add_tail(&work->entry, &cwq->worklist);
-	else
-		list_add(&work->entry, &cwq->worklist);
+	if (rt_task(current)) {
+		work->prio = current->rt_priority;
+		prio_array_enqueue(&cwq->rt_worklist, work, work->prio, tail);
+
+		/* Priority inheritance on the kthread */
+		if (cwq->prio < work->prio)
+			wq_task_setprio(cwq, work->prio);
+	} else {
+		work->prio = -1;
+		if (tail)
+			list_add_tail(&work->entry, &cwq->worklist);
+		else
+			list_add(&work->entry, &cwq->worklist);
+
+	}
+
 	wake_up(&cwq->more_work);
 }
 
@@ -254,8 +365,30 @@ static void leak_check(void *func)
 		dump_stack();
 }
 
+static struct work_struct* cwq_dequeue(struct cpu_workqueue_struct *cwq)
+{
+	struct work_struct *work;
+
+	/* First check the RT items */
+	work = prio_array_dequeue(&cwq->rt_worklist);
+	if (!work) {
+		/* If nothing is found there, check the normal queue */
+		if (!list_empty(&cwq->worklist)) {
+			work = list_first_entry(&cwq->worklist,
+						struct work_struct,
+						entry);
+			BUG_ON(!work);
+			list_del_init(&work->entry);
+		}
+	}
+
+	return work;
+}
+
 static void run_workqueue(struct cpu_workqueue_struct *cwq)
 {
+	struct work_struct *work;
+
 	spin_lock_irq(&cwq->lock);
 	cwq->run_depth++;
 	if (cwq->run_depth > 3) {
@@ -264,13 +397,12 @@ static void run_workqueue(struct cpu_workqueue_struct *cwq)
 		       __FUNCTION__, cwq->run_depth);
 		dump_stack();
 	}
-	while (!list_empty(&cwq->worklist)) {
-		struct work_struct *work = list_entry(cwq->worklist.next,
-						struct work_struct, entry);
+	while ((work = cwq_dequeue(cwq))) {
 		work_func_t f = work->func;
 
+		wq_task_setprio(cwq, work->prio);
 		cwq->current_work = work;
-		list_del_init(cwq->worklist.next);
+
 		spin_unlock_irq(&cwq->lock);
 
 		BUG_ON(get_wq_data(work) != cwq);
@@ -283,6 +415,10 @@ static void run_workqueue(struct cpu_workqueue_struct *cwq)
 		spin_lock_irq(&cwq->lock);
 		cwq->current_work = NULL;
 	}
+
+	/* Make sure we are back to normal priority before sleeping */
+	wq_task_setprio(cwq, -1);
+
 	cwq->run_depth--;
 	spin_unlock_irq(&cwq->lock);
 }
@@ -295,7 +431,7 @@ static int worker_thread(void *__cwq)
 	if (!cwq->wq->freezeable)
 		current->flags |= PF_NOFREEZE;
 
-	set_user_nice(current, -5);
+	wq_task_setprio(cwq, -1);
 
 	for (;;) {
 		prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
@@ -691,7 +827,9 @@ init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
 	cwq->wq = wq;
 	spin_lock_init(&cwq->lock);
 	INIT_LIST_HEAD(&cwq->worklist);
+	prio_array_init(&cwq->rt_worklist);
 	init_waitqueue_head(&cwq->more_work);
+	cwq->prio = -1;
 
 	return cwq;
 }
@@ -803,47 +941,6 @@ static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
 	cwq->thread = NULL;
 }
 
-void set_workqueue_thread_prio(struct workqueue_struct *wq, int cpu,
-			       int policy, int rt_priority, int nice)
-{
-	struct sched_param param = { .sched_priority = rt_priority };
-	struct cpu_workqueue_struct *cwq;
-	mm_segment_t oldfs = get_fs();
-	struct task_struct *p;
-	unsigned long flags;
-	int ret;
-
-	cwq = per_cpu_ptr(wq->cpu_wq, cpu);
-	spin_lock_irqsave(&cwq->lock, flags);
-	p = cwq->thread;
-	spin_unlock_irqrestore(&cwq->lock, flags);
-
-	set_user_nice(p, nice);
-
-	set_fs(KERNEL_DS);
-	ret = sys_sched_setscheduler(p->pid, policy, &param);
-	set_fs(oldfs);
-
-	WARN_ON(ret);
-}
-
-void set_workqueue_prio(struct workqueue_struct *wq, int policy,
-			int rt_priority, int nice)
-{
-	int cpu;
-
-	/* We don't need the distraction of CPUs appearing and vanishing.
-	 */
-	mutex_lock(&workqueue_mutex);
-	if (is_single_threaded(wq))
-		set_workqueue_thread_prio(wq, 0, policy, rt_priority, nice);
-	else {
-		for_each_online_cpu(cpu)
-			set_workqueue_thread_prio(wq, cpu, policy,
-						  rt_priority, nice);
-	}
-	mutex_unlock(&workqueue_mutex);
-}
-
 /**
  * destroy_workqueue - safely terminate a workqueue
  * @wq: target workqueue
@@ -926,5 +1023,4 @@ void __init init_workqueues(void)
 	hotcpu_notifier(workqueue_cpu_callback, 0);
 	keventd_wq = create_workqueue("events");
 	BUG_ON(!keventd_wq);
-	set_workqueue_prio(keventd_wq, SCHED_FIFO, 1, -20);
 }
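
As an aside, the queueing trick here is the same one the RT scheduler
uses: a bitmap records which per-priority FIFOs are non-empty, so
picking the next item is a single find-first-bit plus a list pop,
independent of how many items are queued. For anyone who wants to poke
at the idea outside the kernel, here is a rough userspace sketch. All
names (struct item, pa_enqueue(), pa_dequeue()) are made up for
illustration, and note one deliberate difference: this sketch pops the
numerically highest priority first (rt_priority semantics, where a
bigger number means more important), while the patch above indexes by
rt_priority and scans with sched_find_first_bit().

#include <stdio.h>

#define MAX_RT_PRIO	100
#define BITS_PER_LONG	((int)(8 * sizeof(unsigned long)))
#define BITMAP_LONGS	((MAX_RT_PRIO + BITS_PER_LONG - 1) / BITS_PER_LONG)

struct item {
	int prio;		/* 0..MAX_RT_PRIO-1, higher = more important */
	struct item *next;
};

struct prio_array {
	unsigned long bitmap[BITMAP_LONGS];	/* bit p set => queue p non-empty */
	struct item *head[MAX_RT_PRIO];
	struct item *tail[MAX_RT_PRIO];
};

static void pa_enqueue(struct prio_array *pa, struct item *it)
{
	int p = it->prio;

	/* FIFO within a priority level: append at the tail */
	it->next = NULL;
	if (pa->tail[p])
		pa->tail[p]->next = it;
	else
		pa->head[p] = it;
	pa->tail[p] = it;

	pa->bitmap[p / BITS_PER_LONG] |= 1UL << (p % BITS_PER_LONG);
}

static struct item *pa_dequeue(struct prio_array *pa)
{
	int w;

	/* Scan bitmap words from the top so higher priorities win */
	for (w = BITMAP_LONGS - 1; w >= 0; w--) {
		struct item *it;
		int p;

		if (!pa->bitmap[w])
			continue;

		/* Highest set bit in this word == highest ready priority
		 * (__builtin_clzl is a gcc builtin; input is non-zero here) */
		p = w * BITS_PER_LONG +
		    (BITS_PER_LONG - 1 - __builtin_clzl(pa->bitmap[w]));

		it = pa->head[p];
		pa->head[p] = it->next;
		if (!pa->head[p]) {
			pa->tail[p] = NULL;
			pa->bitmap[w] &= ~(1UL << (p % BITS_PER_LONG));
		}
		return it;
	}

	return NULL;
}

int main(void)
{
	static struct prio_array pa;	/* zero-initialized: all lists empty */
	struct item a = { 10, NULL }, b = { 50, NULL }, c = { 50, NULL };
	struct item *it;

	pa_enqueue(&pa, &a);
	pa_enqueue(&pa, &b);
	pa_enqueue(&pa, &c);

	/* Prints 50, 50, 10: priority order, FIFO within a priority */
	while ((it = pa_dequeue(&pa)) != NULL)
		printf("dequeued prio %d\n", it->prio);

	return 0;
}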
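
From a caller's perspective nothing changes; the prioritization is
transparent and the stock API is untouched. A hypothetical sketch of
what a user sees (my_work_fn() and submit_from_rt_context() are made-up
names; schedule_work() and DECLARE_WORK() are the existing interfaces):

#include <linux/workqueue.h>

static void my_work_fn(struct work_struct *work)
{
	/*
	 * With this patch, the events/N kthread runs this item at
	 * (at least) the rt_priority of the task that queued it, and
	 * RT items jump ahead of any non-RT work already in the queue.
	 */
}

static DECLARE_WORK(my_work, my_work_fn);

/* Called from, say, a SCHED_FIFO prio-50 thread */
static void submit_from_rt_context(void)
{
	/*
	 * insert_work() sees rt_task(current), stamps work->prio = 50,
	 * enqueues on the prio_array, and boosts the kthread via
	 * wq_task_setprio() before the wake_up().
	 */
	schedule_work(&my_work);
}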