On 04/28/2011 09:40 PM, Sasha Levin wrote:
> This patch adds a generic pool to create a common interface for working with 
> threads within the kvm tool.
> Main idea here is using this threadpool for all I/O threads instead of having 
> every I/O module write it's own thread code.
> 
> The process of working with the thread pool is supposed to be very simple.
> During initialization, Each module which is interested in working with the 
> threadpool will call threadpool__add_jobtype with the callback function and a 
> void* parameter. For example, virtio modules will register every virt_queue 
> as a new job type.
> During operation, When theres work to do for a specific job, the module will 
> signal it to the queue and would expect the callback to be called with proper 
> parameters. It is assured that the callback will be called once for every 
> signal action and each callback will be called only once at a time (i.e. 
> callback functions themselves don't need to handle threading).
> 
> Signed-off-by: Sasha Levin <levinsasha...@gmail.com>
> ---
>  tools/kvm/Makefile                 |    1 +
>  tools/kvm/include/kvm/threadpool.h |   16 ++++
>  tools/kvm/kvm-run.c                |    5 +
>  tools/kvm/threadpool.c             |  171 
> ++++++++++++++++++++++++++++++++++++
>  4 files changed, 193 insertions(+), 0 deletions(-)
>  create mode 100644 tools/kvm/include/kvm/threadpool.h
>  create mode 100644 tools/kvm/threadpool.c
> 
> diff --git a/tools/kvm/Makefile b/tools/kvm/Makefile
> index 1b0c76e..fbce14d 100644
> --- a/tools/kvm/Makefile
> +++ b/tools/kvm/Makefile
> @@ -36,6 +36,7 @@ OBJS    += kvm-cmd.o
>  OBJS    += kvm-run.o
>  OBJS    += qcow.o
>  OBJS    += mptable.o
> +OBJS    += threadpool.o
>  
>  DEPS := $(patsubst %.o,%.d,$(OBJS))
>  
> diff --git a/tools/kvm/include/kvm/threadpool.h 
> b/tools/kvm/include/kvm/threadpool.h
> new file mode 100644
> index 0000000..25b5eb8
> --- /dev/null
> +++ b/tools/kvm/include/kvm/threadpool.h
> @@ -0,0 +1,16 @@
> +#ifndef KVM__THREADPOOL_H
> +#define KVM__THREADPOOL_H
> +
> +#include <stdint.h>
> +
> +struct kvm;
> +
> +typedef void (*kvm_thread_callback_fn_t)(struct kvm *kvm, void *data);
> +
> +int thread_pool__init(unsigned long thread_count);
> +
> +void *thread_pool__add_jobtype(struct kvm *kvm, kvm_thread_callback_fn_t 
> callback, void *data);
> +
> +void thread_pool__signal_work(void *job);
> +
> +#endif
> diff --git a/tools/kvm/kvm-run.c b/tools/kvm/kvm-run.c
> index 071157a..97a17dd 100644
> --- a/tools/kvm/kvm-run.c
> +++ b/tools/kvm/kvm-run.c
> @@ -24,6 +24,7 @@
>  #include <kvm/pci.h>
>  #include <kvm/term.h>
>  #include <kvm/ioport.h>
> +#include <kvm/threadpool.h>
>  
>  /* header files for gitish interface  */
>  #include <kvm/kvm-run.h>
> @@ -312,6 +313,7 @@ int kvm_cmd_run(int argc, const char **argv, const char 
> *prefix)
>       int i;
>       struct virtio_net_parameters net_params;
>       char *hi;
> +     unsigned int nr_online_cpus;
>  
>       signal(SIGALRM, handle_sigalrm);
>       signal(SIGQUIT, handle_sigquit);
> @@ -457,6 +459,9 @@ int kvm_cmd_run(int argc, const char **argv, const char 
> *prefix)
>  
>       kvm__init_ram(kvm);
>  
> +     nr_online_cpus = sysconf(_SC_NPROCESSORS_ONLN);
> +     thread_pool__init(nr_online_cpus);

We may benefit from more threads than the number of hardware thread we
have. Currently, virtio_console consumes two,  virio_net consumes two,
and virtio_blk consumes one. Can we adjust the thread pool size when
devices register to use thread pool?

> +
>       for (i = 0; i < nrcpus; i++) {
>               if (pthread_create(&kvm_cpus[i]->thread, NULL, kvm_cpu_thread, 
> kvm_cpus[i]) != 0)
>                       die("unable to create KVM VCPU thread");
> diff --git a/tools/kvm/threadpool.c b/tools/kvm/threadpool.c
> new file mode 100644
> index 0000000..e78db3a
> --- /dev/null
> +++ b/tools/kvm/threadpool.c
> @@ -0,0 +1,171 @@
> +#include "kvm/threadpool.h"
> +#include "kvm/mutex.h"
> +
> +#include <linux/kernel.h>
> +#include <linux/list.h>
> +#include <pthread.h>
> +#include <stdbool.h>
> +
> +struct thread_pool__job_info {
> +     kvm_thread_callback_fn_t callback;
> +     struct kvm *kvm;
> +     void *data;
> +
> +     int signalcount;
> +     pthread_mutex_t mutex;
> +
> +     struct list_head queue;
> +};

Does 'struct thread_pool__job' sound better?

> +static pthread_mutex_t       job_mutex               = 
> PTHREAD_MUTEX_INITIALIZER;
> +static pthread_mutex_t       thread_mutex    = PTHREAD_MUTEX_INITIALIZER;
> +static pthread_cond_t        job_cond                = 
> PTHREAD_COND_INITIALIZER;

These mutex and cond are global. As the number of thread/job grows,
there may be a lot of contention.

> +
> +static LIST_HEAD(head);
> +
> +static pthread_t     *threads;
> +static long                  threadcount;
> +
> +static struct thread_pool__job_info *thread_pool__job_info_pop(void)
> +{
> +     struct thread_pool__job_info *job;
> +
> +     if (list_empty(&head))
> +             return NULL;
> +
> +     job = list_first_entry(&head, struct thread_pool__job_info, queue);
> +     list_del(&job->queue);
> +
> +     return job;
> +}
> +
> +static void thread_pool__job_info_push(struct thread_pool__job_info *job)
> +{
> +     list_add_tail(&job->queue, &head);
> +}
> +
> +static struct thread_pool__job_info *thread_pool__job_info_pop_locked(void)
> +{
> +     struct thread_pool__job_info *job;
> +
> +     mutex_lock(&job_mutex);
> +     job = thread_pool__job_info_pop();
> +     mutex_unlock(&job_mutex);
> +     return job;
> +}
> +
> +static void thread_pool__job_info_push_locked(struct thread_pool__job_info 
> *job)
> +{
> +     mutex_lock(&job_mutex);
> +     thread_pool__job_info_push(job);
> +     mutex_unlock(&job_mutex);
> +}
> +
> +static void thread_pool__handle_job(struct thread_pool__job_info *job)
> +{
> +     while (job) {
> +             job->callback(job->kvm, job->data);
> +
> +             mutex_lock(&job->mutex);
> +
> +             if (--job->signalcount > 0)
> +                     /* If the job was signaled again while we were working 
> */
> +                     thread_pool__job_info_push_locked(job);
> +
> +             mutex_unlock(&job->mutex);
> +
> +             job = thread_pool__job_info_pop_locked();
> +     }
> +}
> +
> +static void thread_pool__threadfunc_cleanup(void *param)
> +{
> +     mutex_unlock(&job_mutex);
> +}
> +
> +static void *thread_pool__threadfunc(void *param)
> +{
> +     pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL);
> +
> +     for (;;) {
> +             struct thread_pool__job_info *curjob;
> +
> +             mutex_lock(&job_mutex);
> +             pthread_cond_wait(&job_cond, &job_mutex);
> +             curjob = thread_pool__job_info_pop();
> +             mutex_unlock(&job_mutex);
> +
> +             if (curjob)
> +                     thread_pool__handle_job(curjob);
> +     }
> +
> +     pthread_cleanup_pop(0);
> +
> +     return NULL;
> +}
> +
> +static int thread_pool__addthread(void)
> +{
> +     int res;
> +     void *newthreads;
> +
> +     mutex_lock(&thread_mutex);
> +     newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t));
> +     if (newthreads == NULL) {
> +             mutex_unlock(&thread_mutex);
> +             return -1;
> +     }
> +
> +     threads = newthreads;
> +
> +     res = pthread_create(threads + threadcount, NULL,
> +                                                     
> thread_pool__threadfunc, NULL);
> +
> +     if (res == 0)
> +             threadcount++;
> +     mutex_unlock(&thread_mutex);
> +
> +     return res;
> +}
> +
> +int thread_pool__init(unsigned long thread_count)
> +{
> +     unsigned long i;
> +
> +     for (i = 0 ; i < thread_count ; i++)
> +             if (thread_pool__addthread() < 0)
> +                     return i;
> +
> +     return i;
> +}
> +
> +void *thread_pool__add_jobtype(struct kvm *kvm,
> +                                                             
> kvm_thread_callback_fn_t callback,
> +                                                             void *data)

Is thread_pool__add_job() better?

> +{
> +     struct thread_pool__job_info *job = calloc(1, sizeof(*job));
> +
> +     *job = (struct thread_pool__job_info) {
> +             .kvm            = kvm,
> +             .data           = data,
> +             .callback       = callback,
> +             .mutex          = PTHREAD_MUTEX_INITIALIZER
> +     };
> +
> +     return job;
> +}
> +
> +void thread_pool__signal_work(void *job)

I think thread_pool__signal_job() or thread_pool__do_job()
would be more consistent.

Consumer of this API can simply use it with: thread_pool_{add,do}_job().

> +{
> +     struct thread_pool__job_info *jobinfo = job;
> +
> +     if (jobinfo == NULL)
> +             return;
> +
> +     mutex_lock(&jobinfo->mutex);
> +     if (jobinfo->signalcount++ == 0)
> +             thread_pool__job_info_push_locked(job);
> +     mutex_unlock(&jobinfo->mutex);
> +
> +     pthread_cond_signal(&job_cond);
> +}


-- 
Best Regards,
Asias He
--
To unsubscribe from this list: send the line "unsubscribe kvm" in
the body of a message to majord...@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Reply via email to