The company I work for has over the last couple of years created an
application server for use in most of our customer projects. It embeds Python
and most project code is written in Python by now. It is quite resource-hungry
(several GB of RAM, MySQL databases of 50-100GB). And of course it is
multi-threaded and, at least originally, we hoped to make it utilize multiple
processor cores. Which, as we all know, doesn't sit very well with Python. Our
application runs heavy background calculations most of the time (in Python)
and has to service multiple (few) GUI clients at the same time, also using
Python. The problem was that a single background thread would increase the
response time of the client threads by a factor of 10 or (usually) more.

This led me to add a dirty hack to the Python core to make it switch threads
more frequently. While this hack greatly improved response time for the GUI
clients, it also slowed down the background threads quite a bit. top would
often show significantly less CPU usage -- 80% instead of the more usual 100%.

The problem with thread switching in Python is that the global semaphore used
for the GIL is regularly released and immediately reacquired. Unfortunately,
most of the time this leads to the very same thread winning the race on the
semaphore again and thus more wait time for the other threads. This is where
my dirty patch intervened and just did a nanosleep() for a short amount of
time (I used 1000 nsecs).

I have then created a better scheduling scheme and written a small test
program that nicely mimics what Python does for some statistics. I call the
scheduling algorithm the round-robin semaphore because threads can now run in
a more or less round-robin fashion. Actually, it's just a semaphore with FIFO
semantics.

The implementation problem with the round-robin semaphore is the __thread
variable I had to use because I did not want to change the signature of the
Enter() and Leave() methods. For CPython, I have replaced this thread-local
allocation with an additional field in the PyThreadState. Because of that, the
patch for CPython I have already created is a bit more involved than the
simple nanosleep() hack. Consequently, it's not very polished yet and not at
all as portable as the rest of the Python core.

I now show you the results from the test program which compares all three
scheduling mechanisms -- standard python, my dirty hack and the new
round-robin semaphore. I also show you the test program containing the three
implementations nicely encapsulated.

The program was run on a quad-core Xeon 1.86 GHz on Fedora 5 x86_64. The first
three lines from the output (including the name of the algorithm) should be
self-explanatory. The fourth and the fifth show a distribution of wait times
for the individual threads. The ideal distribution would be everything on the
number of threads (2 in this case) and zero everywhere else. As you can see,
the round-robin semaphore is pretty close to that. Also, because of the high
thread switching frequency, we could lower Python's checkinterval -- the jury
is still out on the actual value, likely something between 1000 and 10000.

I can post my Python patch if there is enough interest.

Thanks for your attention.


Synch: Python lock
iteration count: 24443
thread switches: 10
     1     2     3     4     5     6     7     8     9    10   -10   -50  -100  
 -1k more
 24433     0     0     0     0     0     0     0     0     0     0     1     1  
   6     0

Synch: Dirty lock
iteration count: 25390
thread switches: 991
     1     2     3     4     5     6     7     8     9    10   -10   -50  -100  
 -1k more
 24399    10     0     0     0     0     1     0     1     0   975     1     1  
   0     0

Synch: round-robin semaphore
iteration count: 23023
thread switches: 22987
     1     2     3     4     5     6     7     8     9    10   -10   -50  -100  
 -1k more
    36 22984     0     0     0     0     0     0     0     0     1     0     0  
   0     0
// compile with g++ -g -O0 -pthread -Wall p.cpp

#include <pthread.h>
#include <semaphore.h>

#include <stdio.h>
#include <stdlib.h>

#include <string.h>
#include <errno.h>
#include <assert.h>

//////////////////////////////
// posix stuff

class TMutex {
    pthread_mutex_t mutex;

    static pthread_mutex_t initializer_normal;
    static pthread_mutex_t initializer_recursive;
    TMutex(const TMutex &);
    TMutex &operator=(const TMutex &);
public:
    TMutex(bool recursive = true);
    ~TMutex() { pthread_mutex_destroy(&mutex); }
    void Lock() { pthread_mutex_lock(&mutex); }
    bool TryLock() { return pthread_mutex_trylock(&mutex) == 0;}
    void Unlock() { pthread_mutex_unlock(&mutex); }

    friend class TCondVar;
};

class TCondVar {
    pthread_cond_t cond;

    static pthread_cond_t initializer;
    TCondVar(const TCondVar &);
    TCondVar &operator=(const TCondVar &);
public:
    TCondVar();
    ~TCondVar() { pthread_cond_destroy(&cond); }
    void Wait(TMutex *mutex) { pthread_cond_wait(&cond, &mutex->mutex); }
    void TimedWait(TMutex *mutex, const struct timespec *abstime) {
        pthread_cond_timedwait(&cond, &mutex->mutex, abstime); }
    void Signal() { pthread_cond_signal(&cond); }
    void Broadcast() { pthread_cond_broadcast(&cond); }
};

class TSemaphore {
    sem_t sem;
    TSemaphore(const TSemaphore &);
    TSemaphore &operator=(const TSemaphore &);
public:
    TSemaphore(unsigned value) { sem_init(&sem, 0, value); }
    ~TSemaphore() { sem_destroy(&sem); }
    void Wait();
    bool TryWait() { return sem_trywait(&sem) == 0; }
    void Post() { sem_post(&sem); }
    int Value() { int val; sem_getvalue(&sem, &val); return val; }
};

pthread_mutex_t TMutex::initializer_normal =
    PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t TMutex::initializer_recursive =
    PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;

TMutex::TMutex(bool recursive)
{
    mutex = recursive ? initializer_recursive : initializer_normal;
}

pthread_cond_t TCondVar::initializer = PTHREAD_COND_INITIALIZER;

TCondVar::TCondVar()
{
    cond = initializer;
}

void TSemaphore::Wait()
{
    while (sem_wait(&sem) == -1)
        assert(errno == EINTR);
}

//////////////////////////////
// misc stuff

void Sleep(unsigned long ms)
{
    struct timespec t;
    t.tv_sec = ms / 1000;
    t.tv_nsec = (ms % 1000) * 1000000;
    do { } while (nanosleep(&t, &t) < 0 && errno == EINTR);
}

class ScopedLock
{
    TMutex &mutex;
public:
    ScopedLock(TMutex &mutex):mutex(mutex) {mutex.Lock();}
    ~ScopedLock() {mutex.Unlock();}
};

//////////////////////////////
// statistics

struct Stats {
    static const int N_SLOTS = 15;
    int the_counts[N_SLOTS];

    Stats();
    void count(int slot);
    void print();

    static int categorize(int slot);
};

Stats::Stats()
{
    for (int i=0; i<N_SLOTS; i++)
        the_counts[i] = 0;
}

void Stats::print()
{
    printf("     1     2     3     4     5     6     7     8     9    10   -10  
 -50  -100   -1k more\n");

    for (int i=0; i<N_SLOTS; i++)
        printf("%6d", the_counts[i]);
    printf("\n");
}

void Stats::count(int slot)
{
    assert(slot >= 0);
    if (slot < N_SLOTS)
        the_counts[slot]++;
    else
        the_counts[N_SLOTS-1]++;
}

int Stats::categorize(int slot)
{
    if (slot < 10)
        return slot;
    if (slot < 50)
        return 10;
    if (slot < 100)
        return 11;
    if (slot < 1000)
        return 12;
    if (slot < 10000)
        return 13;
    if (slot < 100000)
        return 14;
    if (slot < 1000000)
        return 15;
    return 100000;
}

Stats stats;

//////////////////////////////


class TStandardPython {
    TSemaphore the_sem;

public:
    TStandardPython(): the_sem(1)
        { }
    void Enter() { the_sem.Wait(); }
    void Leave() { the_sem.Post(); }
    bool NeedsSwitch() const { return true; }

    static const char *name() { return "Python lock"; }
};

class TMyDirtyLock {
    TSemaphore the_sem;
    bool waitflag;

public:
    TMyDirtyLock(): the_sem(1)
                   , waitflag(false)
        { }
    void Enter();
    void Leave() { the_sem.Post(); }
    bool NeedsSwitch() const { return waitflag; }

    static const char *name() { return "Dirty lock"; }
};

void TMyDirtyLock::Enter()
{
    if (waitflag)
    {
        struct timespec t;
        t.tv_sec = 0;
        t.tv_nsec = 1000;
        nanosleep(&t, &t);
    }

    waitflag = true;
    the_sem.Wait();
    waitflag = false;
}

struct TRoundRobinSemaphore {
    struct LinkStruct {
        LinkStruct *queue_next;
        TCondVar wait;

        LinkStruct(): queue_next(0)
            { }
    };

private:
    TMutex the_lock;
    LinkStruct *wait_queue, *wait_last;

public:
    TRoundRobinSemaphore(): wait_queue(0)
        { }

    void Enter();
    void Leave();
    bool NeedsSwitch() const { return wait_queue; }

    static const char *name() { return "round-robin semaphore"; }
};

__thread TRoundRobinSemaphore::LinkStruct *t_linkstruct = 0;

void TRoundRobinSemaphore::Enter()
{
    if (!t_linkstruct)
        t_linkstruct = new LinkStruct;

    LinkStruct *ls = t_linkstruct;

    ScopedLock l(the_lock);

    // nobody there? -> done
    if (!wait_queue)
    {
        wait_queue = ls;
        wait_last = ls;
        return;
    }

    assert(wait_queue != ls);
    assert(wait_last->queue_next == 0);
    wait_last->queue_next = ls;
    wait_last = ls;

    while (wait_queue != ls)
        ls->wait.Wait(&the_lock);
}

void TRoundRobinSemaphore::Leave()
{
    ScopedLock l(the_lock);

    LinkStruct *ls = t_linkstruct;
    assert(wait_queue == ls);

    wait_queue = ls->queue_next;
    if (wait_queue)
    {
        ls->queue_next = 0;
        wait_queue->wait.Signal();
    }
}

struct sync_stuff {
    typedef TRoundRobinSemaphore LockInUse;
    LockInUse the_lock;

    int go_count;
    TMutex go_lock;
    TCondVar go_wait, go_wait2;
    bool gogogo;
    bool quit_now;

    int it_count;
    int sw_count;
    int last_thread;

    sync_stuff(): go_count(0)
                , gogogo(false)
                , quit_now(false)
                , it_count(0)
                , sw_count(0)
                , last_thread(-1)
        {
            printf("Synch: %s\n", LockInUse::name());
        }
};

struct thread_struct {
    sync_stuff *ss;
    int num;
};

static void *the_thread(void *startup)
{
    thread_struct *ss2 = static_cast<thread_struct*>(startup);
    sync_stuff *ss = ss2->ss;

    {
        ScopedLock l(ss->go_lock);
        ss->go_count++;
        ss->go_wait2.Signal();
        while (!ss->gogogo)
            ss->go_wait.Wait(&ss->go_lock);
    }

    Sleep(10);

    bool firstrun = true;
    bool reenter = false;

    int last_count = -1;
    for (;;)
    {
        if (firstrun || reenter)
            ss->the_lock.Enter();
        if (firstrun)
            Sleep(1);

        // do some work
        for (int i=0; i<10000; i++)
            ;

        if (last_count != -1)
        {
            int sl = ss->it_count - last_count;
            stats.count(Stats::categorize(sl));
        }

        if (ss->last_thread != ss2->num)
            ss->sw_count++;
        ss->last_thread = ss2->num;
        last_count = ++ss->it_count;

        reenter = ss->the_lock.NeedsSwitch();
        if (reenter)
            ss->the_lock.Leave();

        firstrun = false;

        if (ss->quit_now) {
            if (!reenter)
                ss->the_lock.Leave();
            break;
        }
    }

    {
        ScopedLock l(ss->go_lock);
        ss->go_count++;
        ss->go_wait2.Signal();
    }

    return 0;
}

int main()
{
    sync_stuff stuff;
    int go_num;

    for (int i=0; i<2; i++)
    {
        pthread_t tid;

        thread_struct *ss = new thread_struct;
        ss->ss = &stuff;
        ss->num = i+1;

        pthread_create(&tid, NULL, the_thread, ss);
        go_num = i+1;
    }

    {
        ScopedLock l(stuff.go_lock);
        while (stuff.go_count != go_num)
            stuff.go_wait2.Wait(&stuff.go_lock);
        stuff.go_count = 0;
        stuff.gogogo = true;
        stuff.go_wait.Broadcast();
    }

    Sleep(1000);

    {
        ScopedLock l(stuff.go_lock);
        stuff.quit_now = true;
        while (stuff.go_count != go_num)
            stuff.go_wait2.Wait(&stuff.go_lock);
    }

    printf("iteration count: %d\n", stuff.it_count);
    printf("thread switches: %d\n", stuff.sw_count);
    stats.print();

    return 0;
}
_______________________________________________
Python-Dev mailing list
Python-Dev@python.org
http://mail.python.org/mailman/listinfo/python-dev
Unsubscribe: 
http://mail.python.org/mailman/options/python-dev/archive%40mail-archive.com

Reply via email to