Re: the semi-resident thread pool
On Saturday, 30 May 2009 at 13:36:41 UTC, zsxxsz wrote: Sweet! Does the code want a license? The thread pool is just one small part of my plan to migrate acl_project, written in C, to adl_project, written in D. The original acl_project has many server frameworks. Anyone can use it under the GPL. I have copied the source from this article into my own source files and have just started to use the thread pool. Since you did not provide a specific license, it will effectively be licensed under the GPL v2, as stated for my own source. I'll add a comment above the code stating who wrote it.
Re: the semi-resident thread pool
> Sweet! Does the code want a license? The thread pool is just one small part of my plan to migrate acl_project, written in C, to adl_project, written in D. The original acl_project has many server frameworks. Anyone can use it under the GPL.
Re: the semi-resident thread pool
On Sat, 30 May 2009 08:14:08 +0400, zsxxsz wrote: Hi, I have written a thread pool in which each thread is semi-resident. This thread pool is different from Tango's. Any thread in the pool will exit once it has been idle for the timeout period. That is to say, threads exist only while there are jobs: no jobs, no threads. The thread pool was ported from my C version. With D, it was much easier to write thanks to delegates. You may consider adding your code to the scrapple project at dsource.org (http://www.dsource.org/projects/scrapple). This way you will keep your code up to date and more people will be able to use it.
Re: the semi-resident thread pool
zsxxsz wrote: Hi, I written one thread pool in which each thread is semi-resident. The thread-pool is different from the Tango's one. Any thread of the thread-pool will exit when it is idle for the timeout. That is to say, all threads for jobs, and no job no thread. The thread-pool was from my C version. With D, I wrote it more easier with the delegate function. Below is the source code: module adl.thread_pool; import core.sys.posix.pthread; // just for pthread_self() import core.thread; import core.sync.mutex; import core.sync.condition; import std.c.time; private struct Job { Job *next; void function() fn; void delegate() dg; void *arg; int call; } /** * semi-daemon thread of thread pool */ class CThreadPool { public: /** * Constructs a CThreadPool * @param nMaxThread {int} the max number threads in thread pool * @param idleTimeout {int} when > 0, the idle thread will * exit after idleTimeout seconds, if == 0, the idle thread * will not exit * @param sz {size_t} when > 0, the thread will be created which * stack size is sz. */ this(int nMaxThread, int idleTimeout, size_t sz = 0) { m_nMaxThread = nMaxThread; m_idleTimeout = idleTimeout; m_stackSize = sz; m_mutex = new Mutex; m_cond = new Condition(m_mutex); } /** * Append one task into the thread pool's task queue * @param fn {void function()} */ void append(void function() fn) { Job *job; char buf[256]; if (fn == null) throw new Exception("fn null"); job = new Job; job.fn = fn; job.next = null; job.call = Call.FN; m_mutex.lock(); append(job); m_mutex.unlock(); } /** * Append one task into the thread pool's task queue * @param dg {void delegate()} */ void append(void delegate() dg) { Job *job; char buf[256]; if (dg == null) throw new Exception("dg null"); job = new Job; job.dg = dg; job.next = null; job.call = Call.DG; m_mutex.lock(); append(job); m_mutex.unlock(); } /** * If dg not null, when one new thread is created, dg will be called. 
* @param dg {void delegate()} */ void onThreadInit(void delegate() dg) { m_onThreadInit = dg; } /** * If dg not null, before one thread exits, db will be called. * @param dg {void delegate()} */ void onThreadExit(void delegate() dg) { m_onThreadExit = dg; } private: enum Call { NO, FN, DG } Mutex m_mutex; Condition m_cond; size_t m_stackSize = 0; Job* m_jobHead = null, m_jobTail = null; int m_nJob = 0; bool m_isQuit = false; int m_nThread = 0; int m_nMaxThread; int m_nIdleThread = 0; int m_overloadTimeWait = 0; int m_idleTimeout; time_t m_lastWarn; void delegate() m_onThreadInit; void delegate() m_onThreadExit; void append(Job *job) { if (m_jobHead == null) m_jobHead = job; else m_jobTail.next = job; m_jobTail = job; m_nJob++; if (m_nIdleThread > 0) { m_cond.notify(); } else if (m_nThread < m_nMaxThread) { Thread thread = new Thread(&doJob); thread.isDaemon = true; thread.start(); m_nThread++; } else if (m_nJob > 10 * m_nMaxThread) { time_t now = time(null); if (now - m_lastWarn >= 2) { m_lastWarn = now; } if (m_overloadTimeWait > 0) { Thread.sleep(m_overloadTimeWait); } } } void doJob() { Job *job; int status; bool timedout; long period = m_idleTimeout * 10_000_000; if (m_onThreadInit != null) m_onThreadInit(); m_mutex.lock(); for (;;) { timedout = false; while (m_jobHead == null && !m_isQuit) { m_nIdle
the semi-resident thread pool
Hi, I written one thread pool in which each thread is semi-resident. The thread-pool is different from the Tango's one. Any thread of the thread-pool will exit when it is idle for the timeout. That is to say, all threads for jobs, and no job no thread. The thread-pool was from my C version. With D, I wrote it more easier with the delegate function. Below is the source code: module adl.thread_pool; import core.sys.posix.pthread; // just for pthread_self() import core.thread; import core.sync.mutex; import core.sync.condition; import std.c.time; private struct Job { Job *next; void function() fn; void delegate() dg; void *arg; int call; } /** * semi-daemon thread of thread pool */ class CThreadPool { public: /** * Constructs a CThreadPool * @param nMaxThread {int} the max number threads in thread pool * @param idleTimeout {int} when > 0, the idle thread will * exit after idleTimeout seconds, if == 0, the idle thread * will not exit * @param sz {size_t} when > 0, the thread will be created which * stack size is sz. */ this(int nMaxThread, int idleTimeout, size_t sz = 0) { m_nMaxThread = nMaxThread; m_idleTimeout = idleTimeout; m_stackSize = sz; m_mutex = new Mutex; m_cond = new Condition(m_mutex); } /** * Append one task into the thread pool's task queue * @param fn {void function()} */ void append(void function() fn) { Job *job; char buf[256]; if (fn == null) throw new Exception("fn null"); job = new Job; job.fn = fn; job.next = null; job.call = Call.FN; m_mutex.lock(); append(job); m_mutex.unlock(); } /** * Append one task into the thread pool's task queue * @param dg {void delegate()} */ void append(void delegate() dg) { Job *job; char buf[256]; if (dg == null) throw new Exception("dg null"); job = new Job; job.dg = dg; job.next = null; job.call = Call.DG; m_mutex.lock(); append(job); m_mutex.unlock(); } /** * If dg not null, when one new thread is created, dg will be called. 
* @param dg {void delegate()} */ void onThreadInit(void delegate() dg) { m_onThreadInit = dg; } /** * If dg not null, before one thread exits, db will be called. * @param dg {void delegate()} */ void onThreadExit(void delegate() dg) { m_onThreadExit = dg; } private: enum Call { NO, FN, DG } Mutex m_mutex; Condition m_cond; size_t m_stackSize = 0; Job* m_jobHead = null, m_jobTail = null; int m_nJob = 0; bool m_isQuit = false; int m_nThread = 0; int m_nMaxThread; int m_nIdleThread = 0; int m_overloadTimeWait = 0; int m_idleTimeout; time_t m_lastWarn; void delegate() m_onThreadInit; void delegate() m_onThreadExit; void append(Job *job) { if (m_jobHead == null) m_jobHead = job; else m_jobTail.next = job; m_jobTail = job; m_nJob++; if (m_nIdleThread > 0) { m_cond.notify(); } else if (m_nThread < m_nMaxThread) { Thread thread = new Thread(&doJob); thread.isDaemon = true; thread.start(); m_nThread++; } else if (m_nJob > 10 * m_nMaxThread) { time_t now = time(null); if (now - m_lastWarn >= 2) { m_lastWarn = now; } if (m_overloadTimeWait > 0) { Thread.sleep(m_overloadTimeWait); } } } void doJob() { Job *job; int status; bool timedout; long period = m_idleTimeout * 10_000_000; if (m_onThreadInit != null) m_onThreadInit(); m_mutex.lock(); for (;;) { timedout = false; while (m_jobHead == null && !m_isQuit) { m_nIdleThread++;