The company I work for has over the last couple of years created an application server for use in most of our customer projects. It embeds Python and most project code is written in Python by now. It is quite resource-hungry (several GB of RAM, MySQL databases of 50-100GB). And of course it is multi-threaded and, at least originally, we hoped to make it utilize multiple processor cores. Which, as we all know, doesn't sit very well with Python. Our application runs heavy background calculations most of the time (in Python) and has to service multiple (few) GUI clients at the same time, also using Python. The problem was that a single background thread would increase the response time of the client threads by a factor of 10 or (usually) more.
This led me to add a dirty hack to the Python core to make it switch threads more frequently. While this hack greatly improved response time for the GUI clients, it also slowed down the background threads quite a bit. top would often show significantly less CPU usage -- 80% instead of the more usual 100%. The problem with thread switching in Python is that the global semaphore used for the GIL is regularly released and immediately reacquired. Unfortunately, most of the time this leads to the very same thread winning the race on the semaphore again and thus more wait time for the other threads. This is where my dirty patch intervened and just did a nanosleep() for a short amount of time (I used 1000 nsecs). I have then created a better scheduling scheme and written a small test program that nicely mimics what Python does for some statistics. I call the scheduling algorithm the round-robin semaphore because threads can now run in a more or less round-robin fashion. Actually, it's just a semaphore with FIFO semantics. The implementation problem with the round-robin semaphore is the __thread variable I had to use because I did not want to change the signature of the Enter() and Leave() methods. For CPython, I have replaced this thread-local allocation with an additional field in the PyThreadState. Because of that, the patch for CPython I have already created is a bit more involved than the simple nanosleep() hack. Consequently, it's not very polished yet and not at all as portable as the rest of the Python core. I now show you the results from the test program which compares all three scheduling mechanisms -- standard python, my dirty hack and the new round-robin semaphore. I also show you the test program containing the three implementations nicely encapsulated. The program was run on a quad-core Xeon 1.86 GHz on Fedora 5 x86_64. The first three lines from the output (including the name of the algorithm) should be self-explanatory. 
The fourth and fifth lines show the distribution of wait times for the individual threads. The ideal distribution would have everything in the slot equal to the number of threads (2 in this case) and zero everywhere else. As you can see, the round-robin semaphore comes pretty close to that ideal. Also, because of the high thread-switching frequency, we could lower Python's checkinterval -- the jury is still out on the actual value, but it is likely something between 1000 and 10000. I can post my Python patch if there is enough interest. Thanks for your attention.
Synch: Python lock iteration count: 24443 thread switches: 10 1 2 3 4 5 6 7 8 9 10 -10 -50 -100 -1k more 24433 0 0 0 0 0 0 0 0 0 0 1 1 6 0 Synch: Dirty lock iteration count: 25390 thread switches: 991 1 2 3 4 5 6 7 8 9 10 -10 -50 -100 -1k more 24399 10 0 0 0 0 1 0 1 0 975 1 1 0 0 Synch: round-robin semaphore iteration count: 23023 thread switches: 22987 1 2 3 4 5 6 7 8 9 10 -10 -50 -100 -1k more 36 22984 0 0 0 0 0 0 0 0 1 0 0 0 0
// compile with g++ -g -O0 -pthread -Wall p.cpp
//
// Benchmark comparing three GIL-style lock implementations:
//   1. TStandardPython      - plain semaphore (models CPython's GIL)
//   2. TMyDirtyLock         - semaphore plus a nanosleep() before reacquire
//   3. TRoundRobinSemaphore - FIFO-ordered handoff via a wait queue
// Two worker threads repeatedly acquire the lock, do a fixed amount of
// busy-work, and release it; the program records how often the lock
// actually changes hands and a histogram of per-thread wait times.
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <assert.h>

//////////////////////////////
// posix stuff

// Thin RAII-free wrapper over pthread_mutex_t (destroyed in dtor).
// Non-copyable: copy ctor/assignment are declared private and undefined.
class TMutex {
    pthread_mutex_t mutex;
    static pthread_mutex_t initializer_normal;     // PTHREAD_MUTEX_INITIALIZER
    static pthread_mutex_t initializer_recursive;  // recursive variant (glibc _NP extension)
    TMutex(const TMutex &);
    TMutex &operator=(const TMutex &);
public:
    // Defaults to a recursive mutex; pass false for a normal one.
    TMutex(bool recursive = true);
    ~TMutex() { pthread_mutex_destroy(&mutex); }
    void Lock() { pthread_mutex_lock(&mutex); }
    bool TryLock() { return pthread_mutex_trylock(&mutex) == 0;}
    void Unlock() { pthread_mutex_unlock(&mutex); }
    friend class TCondVar;  // TCondVar needs the raw pthread_mutex_t for cond waits
};

// Thin wrapper over pthread_cond_t; waits take the TMutex to atomically
// release while blocking. Non-copyable (private, undefined copy ops).
class TCondVar {
    pthread_cond_t cond;
    static pthread_cond_t initializer;  // PTHREAD_COND_INITIALIZER
    TCondVar(const TCondVar &);
    TCondVar &operator=(const TCondVar &);
public:
    TCondVar();
    ~TCondVar() { pthread_cond_destroy(&cond); }
    void Wait(TMutex *mutex) { pthread_cond_wait(&cond, &mutex->mutex); }
    // NOTE(review): the pthread_cond_timedwait return value (e.g. ETIMEDOUT)
    // is discarded, so callers cannot distinguish timeout from wakeup.
    void TimedWait(TMutex *mutex, const struct timespec *abstime) {
        pthread_cond_timedwait(&cond, &mutex->mutex, abstime);
    }
    void Signal() { pthread_cond_signal(&cond); }
    void Broadcast() { pthread_cond_broadcast(&cond); }
};

// Thin wrapper over an unnamed POSIX semaphore (sem_init with pshared=0,
// i.e. shared between threads of this process only). Non-copyable.
class TSemaphore {
    sem_t sem;
    TSemaphore(const TSemaphore &);
    TSemaphore &operator=(const TSemaphore &);
public:
    TSemaphore(unsigned value) { sem_init(&sem, 0, value); }
    ~TSemaphore() { sem_destroy(&sem); }
    void Wait();  // retries on EINTR; defined below
    bool TryWait() { return sem_trywait(&sem) == 0; }
    void Post() { sem_post(&sem); }
    int Value() { int val; sem_getvalue(&sem, &val); return val; }
};

pthread_mutex_t TMutex::initializer_normal = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t TMutex::initializer_recursive = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;

// Initialize by struct-copying the appropriate static initializer.
TMutex::TMutex(bool recursive) {
    mutex = recursive ? initializer_recursive : initializer_normal;
}

pthread_cond_t TCondVar::initializer = PTHREAD_COND_INITIALIZER;

TCondVar::TCondVar() { cond = initializer; }

// Block until the semaphore is acquired; any failure other than an
// EINTR interruption is treated as a programming error.
void TSemaphore::Wait() {
    while (sem_wait(&sem) == -1)
        assert(errno == EINTR);
}

//////////////////////////////
// misc stuff

// Sleep for ms milliseconds; nanosleep() updates t with the remaining
// time on EINTR, so the loop resumes where it left off.
void Sleep(unsigned long ms) {
    struct timespec t;
    t.tv_sec = ms / 1000;
    t.tv_nsec = (ms % 1000) * 1000000;
    do { } while (nanosleep(&t, &t) < 0 && errno == EINTR);
}

// RAII lock guard: locks in the ctor, unlocks in the dtor.
class ScopedLock {
    TMutex &mutex;
public:
    ScopedLock(TMutex &mutex):mutex(mutex) {mutex.Lock();}
    ~ScopedLock() {mutex.Unlock();}
};

//////////////////////////////
// statistics

// Histogram of per-thread wait times, measured in elapsed global
// iterations between two consecutive lock acquisitions by one thread.
// Slots 0-9 are exact counts 1-10; slots 10-14 bucket larger ranges
// (printed as -10, -50, -100, -1k, more).
struct Stats {
    static const int N_SLOTS = 15;
    int the_counts[N_SLOTS];
    Stats();
    void count(int slot);
    void print();
    static int categorize(int slot);
};

Stats::Stats() { for (int i=0; i<N_SLOTS; i++) the_counts[i] = 0; }

void Stats::print() {
    printf(" 1 2 3 4 5 6 7 8 9 10 -10 -50 -100 -1k more\n");
    for (int i=0; i<N_SLOTS; i++) printf("%6d", the_counts[i]);
    printf("\n");
}

// Increment the given slot; anything past the last slot is folded into
// the final "more" bucket (count() is called with categorize()'s result,
// whose >=15 return values all land here).
void Stats::count(int slot) {
    assert(slot >= 0);
    if (slot < N_SLOTS) the_counts[slot]++;
    else the_counts[N_SLOTS-1]++;
}

// Map a raw wait time to a histogram slot. A wait of k iterations maps
// to slot k-1 for k<=10 (slot < 10 because callers pass it_count deltas
// starting at 1), then to coarser buckets.
// NOTE(review): the "return 15" and "return 100000" branches both end up
// in the same "more" slot via count()'s clamping, so the distinction is
// cosmetic.
int Stats::categorize(int slot) {
    if (slot < 10) return slot;
    if (slot < 50) return 10;
    if (slot < 100) return 11;
    if (slot < 1000) return 12;
    if (slot < 10000) return 13;
    if (slot < 100000) return 14;
    if (slot < 1000000) return 15;
    return 100000;
}

Stats stats;  // global histogram shared by all worker threads

//////////////////////////////

// Scheduling scheme 1: a bare semaphore, mimicking CPython's GIL.
// NeedsSwitch() is unconditionally true, so the worker loop releases and
// immediately reacquires every iteration -- usually won again by the
// same thread (see the near-zero switch count in the posted output).
class TStandardPython {
    TSemaphore the_sem;
public:
    TStandardPython(): the_sem(1) { }
    void Enter() { the_sem.Wait(); }
    void Leave() { the_sem.Post(); }
    bool NeedsSwitch() const { return true; }
    static const char *name() { return "Python lock"; }
};

// Scheduling scheme 2: the "dirty hack" -- before reacquiring, a thread
// that saw a waiter nanosleep()s briefly to give the waiter a chance to
// win the semaphore.
class TMyDirtyLock {
    TSemaphore the_sem;
    bool waitflag;  // heuristic "someone is in Enter()" flag; read/written unsynchronized by design
public:
    TMyDirtyLock(): the_sem(1) , waitflag(false) { }
    void Enter();
    void Leave() { the_sem.Post(); }
    bool NeedsSwitch() const { return waitflag; }
    static const char *name() { return "Dirty lock"; }
};

// Back off for 1000 ns if another thread appears to be waiting, then
// advertise ourselves as a waiter while blocked on the semaphore.
void TMyDirtyLock::Enter() {
    if (waitflag) {
        struct timespec t;
        t.tv_sec = 0;
        t.tv_nsec = 1000;
        nanosleep(&t, &t);
    }
    waitflag = true;
    the_sem.Wait();
    waitflag = false;
}

// Scheduling scheme 3: a semaphore with FIFO semantics. Each thread owns
// a thread-local LinkStruct node; waiters queue up and the head of the
// queue is the current holder. Leave() hands the lock to the next node
// in line, guaranteeing round-robin ordering.
struct TRoundRobinSemaphore {
    struct LinkStruct {
        LinkStruct *queue_next;  // next waiter in FIFO order, 0 at the tail
        TCondVar wait;           // per-thread condvar this waiter blocks on
        LinkStruct(): queue_next(0) { }
    };
private:
    TMutex the_lock;                     // guards the queue below
    LinkStruct *wait_queue, *wait_last;  // head (= current holder) and tail
public:
    // NOTE(review): wait_last is left uninitialized here; it is only read
    // after wait_queue became non-null, which also set wait_last -- but an
    // explicit wait_last(0) would be safer.
    TRoundRobinSemaphore(): wait_queue(0) { }
    void Enter();
    void Leave();
    // True whenever the queue is non-empty. Since the head of the queue
    // is the holder itself, this reads true for any holder, so the worker
    // loop releases after every iteration -- presumably intentional to
    // maximize switching; confirm. Also an unlocked read of wait_queue.
    bool NeedsSwitch() const { return wait_queue; }
    static const char *name() { return "round-robin semaphore"; }
};

// Per-thread queue node, lazily allocated on first Enter(). __thread is
// used to avoid changing the Enter()/Leave() signatures (in the CPython
// patch this lives in PyThreadState instead). Never freed: leaks one
// node per thread, acceptable for this test program.
__thread TRoundRobinSemaphore::LinkStruct *t_linkstruct = 0;

// Acquire: append our node to the queue; we own the semaphore once our
// node reaches the head. The queue mutex is held for the whole wait
// (released inside the condvar wait).
void TRoundRobinSemaphore::Enter() {
    if (!t_linkstruct) t_linkstruct = new LinkStruct;
    LinkStruct *ls = t_linkstruct;
    ScopedLock l(the_lock);
    // queue empty -> we are the holder immediately
    if (!wait_queue) {
        wait_queue = ls;
        wait_last = ls;
        return;
    }
    assert(wait_queue != ls);              // no recursive Enter()
    assert(wait_last->queue_next == 0);    // tail invariant
    wait_last->queue_next = ls;
    wait_last = ls;
    // Loop guards against spurious condvar wakeups: only proceed once we
    // have been promoted to the head of the queue.
    while (wait_queue != ls)
        ls->wait.Wait(&the_lock);
}

// Release: pop our node off the head and wake the next waiter, if any.
void TRoundRobinSemaphore::Leave() {
    ScopedLock l(the_lock);
    LinkStruct *ls = t_linkstruct;
    assert(wait_queue == ls);  // only the holder may Leave()
    wait_queue = ls->queue_next;
    if (wait_queue) {
        ls->queue_next = 0;    // detach our node for reuse
        wait_queue->wait.Signal();
    }
}

// Shared state for the benchmark run. Change the LockInUse typedef to
// select which of the three schemes is measured.
struct sync_stuff {
    typedef TRoundRobinSemaphore LockInUse;
    LockInUse the_lock;       // the lock under test
    int go_count;             // rendezvous counter (threads started / threads finished)
    TMutex go_lock;           // guards the rendezvous state below
    TCondVar go_wait, go_wait2;  // workers wait on go_wait; main waits on go_wait2
    bool gogogo;              // start signal from main
    bool quit_now;            // stop signal from main (read by workers without go_lock)
    int it_count;             // total iterations across all threads (guarded by the_lock)
    int sw_count;             // number of times the lock changed hands
    int last_thread;          // num of the thread that ran the previous iteration
    sync_stuff(): go_count(0)
        , gogogo(false)
        , quit_now(false)
        , it_count(0)
        , sw_count(0)
        , last_thread(-1)
    { printf("Synch: %s\n", LockInUse::name()); }
};

// Per-thread startup argument (heap-allocated in main, never freed).
struct thread_struct {
    sync_stuff *ss;
    int num;  // 1-based thread id
};

// Worker thread: rendezvous with main, then loop holding the lock for a
// burst of busy-work per iteration, recording wait-time and switch
// statistics, until quit_now is observed.
static void *the_thread(void *startup) {
    thread_struct *ss2 = static_cast<thread_struct*>(startup);
    sync_stuff *ss = ss2->ss;
    // Signal main that we are up, then wait for the collective start.
    {
        ScopedLock l(ss->go_lock);
        ss->go_count++;
        ss->go_wait2.Signal();
        while (!ss->gogogo)
            ss->go_wait.Wait(&ss->go_lock);
    }
    Sleep(10);
    bool firstrun = true;
    bool reenter = false;      // did we release the lock at the end of the last iteration?
    int last_count = -1;       // it_count at our previous iteration, -1 = none yet
    for (;;) {
        if (firstrun || reenter) ss->the_lock.Enter();
        if (firstrun) Sleep(1);  // let the other thread queue up behind us once
        // do some work while holding the lock (compiled with -O0 so the
        // empty loop is not optimized away)
        for (int i=0; i<10000; i++)
            ;
        // Wait time = global iterations elapsed since our last turn.
        if (last_count != -1) {
            int sl = ss->it_count - last_count;
            stats.count(Stats::categorize(sl));
        }
        if (ss->last_thread != ss2->num) ss->sw_count++;
        ss->last_thread = ss2->num;
        last_count = ++ss->it_count;
        // Only release if the scheme says a switch is warranted; otherwise
        // keep holding the lock (reenter==false -> skip Enter() next time).
        reenter = ss->the_lock.NeedsSwitch();
        if (reenter) ss->the_lock.Leave();
        firstrun = false;
        if (ss->quit_now) {
            if (!reenter) ss->the_lock.Leave();  // still holding it -> release before exit
            break;
        }
    }
    // Signal main that we are done (go_count was reset to 0 by main).
    {
        ScopedLock l(ss->go_lock);
        ss->go_count++;
        ss->go_wait2.Signal();
    }
    return 0;
}

// Spawn two workers, run them for one second, then print totals and the
// wait-time histogram. Threads are not joined; the final rendezvous on
// go_count serves as the completion barrier.
int main() {
    sync_stuff stuff;
    int go_num;
    for (int i=0; i<2; i++) {
        pthread_t tid;
        thread_struct *ss = new thread_struct;  // NOTE(review): leaked, as is tid (never joined)
        ss->ss = &stuff;
        ss->num = i+1;
        pthread_create(&tid, NULL, the_thread, ss);
        go_num = i+1;
    }
    // Wait until all workers have checked in, then release them together.
    {
        ScopedLock l(stuff.go_lock);
        while (stuff.go_count != go_num)
            stuff.go_wait2.Wait(&stuff.go_lock);
        stuff.go_count = 0;
        stuff.gogogo = true;
        stuff.go_wait.Broadcast();
    }
    Sleep(1000);  // measurement window: 1 second
    // Ask workers to stop and wait until all have checked out.
    {
        ScopedLock l(stuff.go_lock);
        stuff.quit_now = true;
        while (stuff.go_count != go_num)
            stuff.go_wait2.Wait(&stuff.go_lock);
    }
    printf("iteration count: %d\n", stuff.it_count);
    printf("thread switches: %d\n", stuff.sw_count);
    stats.print();
    return 0;
}
_______________________________________________ Python-Dev mailing list Python-Dev@python.org http://mail.python.org/mailman/listinfo/python-dev Unsubscribe: http://mail.python.org/mailman/options/python-dev/archive%40mail-archive.com