[posted in de.clp in German]

Hello,

I want to implement an alternative concept to worker threads processing a job queue. The alternative consists of threads being the jobs themselves and thus running only this one job. The job threads are started after a Semaphore's acquire() "giving the OK" to do so. The Semaphore's release() gets called inside the jobs/threads, saying "done, the next one please".

But the program doesn't always stop on Ctrl-C.

So I had a look on threading.Semaphore and found

    def acquire(self, blocking=1):
        rc = False
        self.__cond.acquire() [1]
        [...]
        self.__cond.release() [2]
        return rc

    __enter__ = acquire

    def release(self):
        self.__cond.acquire() [3]
        [...]
        self.__cond.release()

Due to Strg-C, after returning from [1] there was a KeyboardInterrupt. [2] was never reached, so the threads deadlocked on [3].

In the other classes (Event, Condition) the "helper lock calls" are wrapped with try...finally gepackt, where after Ctrl-C self.__cond.release() would be called as well, allowing for the currently running jobs to finish.

First I thought about writing a bug report. But does t make it better? Because, with the finally: solution mentionned, an exception still can happen between finally: and release() - or not? It is less probable, the time window being smaller, but not impossible, so this error becomes more subtle.


Thus, the following questions:

0. (First and in general) Am I doing "bad things" when calling release() at a completely different place than acquire() with a Semaphore? I used to believe that's why there are Semaphores?

1. Is a bug report useful, or would that worsen the problem, as the race condition doesn't disappear completely?

2. Is in, in general, an error to work with Semaphores (ore, more general with Locks) in the MainThread? Or should this be done in a separate thread being informed about a keyboard exception by the main thread?


Below is a minimal example (t.py) demonstrating the behaviour.

python -m t 1 does the IMHO bad thing: Ctrl-C stops the MainThread and has the others starve on the lock.

python -m t 2 does spawning in a separate thread. The MainThread reacts to Strg-C with run = False, telling the other threads to stop ASAP.

python -m t 3 improves Semaphore() by putting self.__cond.release() into a finally: clause.

python -m t 4 "destroys" this beautiful new concept by throwing an Asserion at the appropriate (worst case) place (instead of a KeyboardInterrupt).


So the question is: Bug(report) doe to finally:, or used the wrong way?

TIA and greetings,
Thomas

import threading
import time
import random

run = True
destroyed = False

sema = threading.Semaphore(5)
def target():
    print threading.current_thread(), ' works'
    time.sleep(max(random.normalvariate(2, .4), 0))
    sema.release()

class MySema(threading._Semaphore):
    def acquire(self, blocking=1):
        rc = False
        ok = False # for destroying
        try:
            self._Semaphore__cond.acquire()
            while self._Semaphore__value == 0:
                if not blocking:
                    break
                if __debug__:
                    self._note("%s.acquire(%s): blocked waiting, value=%s",
                            self, blocking, self._Semaphore__value)
                self._Semaphore__cond.wait()
            else:
                self._Semaphore__value = self._Semaphore__value - 1
                if __debug__:
                    self._note("%s.acquire: success, value=%s",
                            self, self._Semaphore__value)
                rc = True
            ok = True # for not destroying
        finally:
if destroyed and not ok: assert 0, "hier kaputt, release() doch nicht"
            self._Semaphore__cond.release()
        return rc

    __enter__ = acquire

    def release(self):
        try:
            self._Semaphore__cond.acquire()
            self._Semaphore__value = self._Semaphore__value + 1
            if __debug__:
                self._note("%s.release: success, value=%s",
                           self, self._Semaphore__value)
            self._Semaphore__cond.notify()
        finally:
            self._Semaphore__cond.release()

def thread():
    t = threading.Thread(target=target)
    t.start()
    return t

def erroneous():
    global run
    try:
        while True:
            sema.acquire()
            t = thread()
            print 'main thread spawned', t
    finally:
        run = False

def extrathread():
    global run
    def extra():
        while run:
            sema.acquire()
            t = thread()
            print 'control thread spawned', t
    threading.Thread(target=extra).start()
    try:
        while True:
            time.sleep(.1)
    finally:
        run = False

def alternative():
    global sema
    sema = MySema(5)
    erroneous()


if __name__ == '__main__':
    import sys
    if len(sys.argv) < 2 or sys.argv[1] == '1':
        f = erroneous
    elif sys.argv[1] == '2':
        f = extrathread
    elif sys.argv[1] == '3':
        f = alternative
    else:
        destroyed = True
        f = alternative
    print f
    f()
--
http://mail.python.org/mailman/listinfo/python-list

Reply via email to