On 19Nov2018 23:52, Dimitar Ivanov <dimitarxiva...@gmail.com> wrote:
I'm having a hard time getting my head around threads so I was hoping
someone who has better understanding of their underlying functionality
could lend me a helping hand, in particular how threads work with each
other when using thread.join() and Semaphore set with maximum value. I'll
try to keep it as clear and concise as possible, but please don't hesitate
to ask if anything about my approach is unclear or, frankly, awful.

I'm writing a script that performs a couple of I/O operations and CLI
commands for each element in a list of IDs. The whole process takes a while
and may vary based on the ID, hence the threading approach sounded like the
best fit, since the next ID can start once space has freed up. I'm pasting
an extract of my code below and will explain what I can't properly
understand underneath.

I'm just going to scatter a few random remarks about your code in here with your listing before addressing your other queries lower down...

file1.py
---------
ids = [<IDs listed here>]
threadsPool = []
for id in ids:

The name "id" is unfortunate as it conflicts with the id() builtin function. Maybe "element_id"? Wordier, but also more clear.

 thread = threading.Thread(target=file2.runStuff, name=str(id), args=(id,))
 threadsPool.append(thread)
for thread in threadsPool:
 thread.start()

You could start each thread right after creating it if you wished.

for thread in threadsPool:
 print(thread.enumerate())

"enumerate" is a function from the threading module, not a method of a Thread. So try:

 print(threading.enumerate())

Frankly I'm surprised that "thread.enumerate" works at all.
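For reference, threading.enumerate() returns a list of every Thread object currently alive, the main thread included. A minimal sketch (dummy worker, names mine):

```python
import threading
import time

def worker():
    # keep the thread alive long enough to be observed
    time.sleep(0.5)

t = threading.Thread(target=worker, name="demo")
t.start()

alive = threading.enumerate()   # all live threads, MainThread included
print([th.name for th in alive])

t.join()
```

Note that the list is a snapshot: a thread that finishes after the call is still in your copy of the list, just no longer alive.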

 print("Queuing thread" + str(thread))
 thread.join()

file2.py
----------
queue = threading.Semaphore(2)

I'd be disinclined to call this a "queue", which usually implies a FIFO list of some variety: put things onto it, and pull things off it, usually first in first off. Maybe just "sem" or "thread_capacity" or something?

def runStuff(id):
 queue.acquire()
 print("Lock acquired for " + str(id))
 file3.doMoreStuff()
 file4.evenMoreStuff()
 queue.release()
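To see the capping behaviour in isolation, here is a small self-contained sketch (dummy sleep-based work, all names mine) that records how many threads hold the semaphore at once:

```python
import threading
import time

sem = threading.Semaphore(2)    # at most 2 workers at a time
lock = threading.Lock()         # protects the two counters below
active = 0
peak = 0

def worker():
    global active, peak
    with sem:                   # acquire on entry, release on exit
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.1)         # pretend to do some work
        with lock:
            active -= 1

threads = [threading.Thread(target=worker) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("peak concurrent workers:", peak)   # never more than 2
```

Using the semaphore as a context manager (`with sem:`) also guarantees the release happens even if the work raises an exception, which the bare acquire/release pair above does not.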

Onto my confusion - as long as I don't try to print information about the
thread that's being queued or the total number of threads using
.enumerate(), the script works absolutely flawlessly: each thread that
doesn't have a lock waits until it acquires one and then moves on. I
decided it'd be nice to provide more information about which thread starts
next and how many threads are active right now (each can take a different
amount of time). However, when I tried that, my log showed some pretty
funky output which at first made me believe I'd messed up all my threads.
Example:


<<  2018-11-19 15:01:38,094 file2 [ID09] INFO - Lock acquired for
ID09                 <---- this is from file2.py
------ some time later and other logs in here ---------
[<_MainThread(MainThread, started 140431033562880)>, <Thread(ID09, started
140430614177536)>] <---- output from thread.enumerate(), file1.py
<<  2018-11-19 15:01:38,103 file1 [MainThread] DEBUG - Queuing thread -
<Thread(ID09, started 140430614177536)> <---- output from print() right
after thread.enumerate()

After some head scratching, I believe I've finally tracked down the reason
for my confusion:

The .start() loop starts the threads, and the first 2 acquire a lock
immediately and start running; later on, the .join() loop puts the rest in
waiting for a lock. That's fine. What I didn't realize, of course, is that
the .join() loop also goes through the threads that were instantly kicked
off by the .start() loop (the first 2 threads, since the Semaphore allows
2 locks), and then my print in that loop tells me those threads are being
queued, except they aren't, since they are already running. It's just my
text telling me that, since I wasn't smart enough to realize what's about
to happen, as seen below:

<<  2018-11-19 15:01:33,094 file1.py [MainThread] DEBUG - Queuing thread -
<Thread(ID02, stopped 140430666626816)> <--- makes it clear the thread has
already even finished

Yes. The .join() has NO EFFECT on the Thread itself: it doesn't start it or stop it. It just waits for the Thread to complete. So yes, your log message is misleading.
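A minimal sketch of that "join only waits" behaviour (dummy worker, names mine):

```python
import threading
import time

def worker():
    time.sleep(0.2)              # stand-in for real work

t = threading.Thread(target=worker)
t.start()                        # the thread is running from this point on
alive_before_join = t.is_alive() # True: started, not yet finished

t.join()                         # only waits; starts and stops nothing
alive_after_join = t.is_alive()  # False: worker() has returned
print(alive_before_join, alive_after_join)
```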

Which finally gets me to my cry for help - I know I can't modify the
threadsPool list on the fly to remove the threads already running, so that
only the pending ones would be left for the 2nd loop, but for the life of
me I can't think of a proper way to extract some information about which
threads are still going (or rather, which have finished, since
thread.enumerate() shows both running and queued).

Well, what you'd probably _like_ is a way to be told about each Thread as it completes, and report them then. Which you can do using a Queue, getting each Thread to report its completion to the Queue as it happens.

Untested example:

 from queue import Queue
 q = Queue()
 threadsPool = []
 for id in ids:
   thread = threading.Thread(target=file2.runStuff, name=str(id), args=(id, q))
   threadsPool.append(thread)

and modify runStuff thus:

 def runStuff(id, q):
   sem.acquire()
   ...
   sem.release()
   q.put(id)

After the threads are started, collect completed ids:

 for count in range(len(ids)):
   id = q.get()
   print("completed work on id %r" % (id,))

You'll notice no .join() there. Getting the id off the queue "q" implies that the Thread has completed.

I have the feeling I'm using a very wrong approach in trying to extract
that information in the .join() loop, since it only goes back to it once a
thread has finished, but at the same time it feels like the perfect timing.

You're collecting specific threads. If other threads complete earlier than that specific thread, they don't get reported immediately: you're reporting threads in the order you made them, not in the order they complete. Using a Queue and not worrying about the threads themselves lets you get ids as they're done, in whatever order.
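To make the ordering point concrete, here's a self-contained sketch (sleep-based dummy work, all names mine) where the thread created last finishes first, and the Queue hands ids back in completion order:

```python
import threading
import time
from queue import Queue

q = Queue()

def runStuff(element_id, delay, q):
    time.sleep(delay)    # stand-in for the real work
    q.put(element_id)    # report completion to the main thread

# "slow" is created first but takes longer than "fast"
for element_id, delay in [("slow", 0.4), ("fast", 0.1)]:
    threading.Thread(target=runStuff, args=(element_id, delay, q)).start()

done_order = [q.get() for _ in range(2)]
print(done_order)   # completion order, not creation order
```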

And just in case you are wondering why I have my threads starting in
file1.py and my Semaphore queue in file2.py, it's because I wanted to split
the runStuff(id) function in a separate module due to its length. I don't
know if it's a good way to do it, but thankfully the Python interpreter is
smart enough to see through my ignorance.

I usually split modules based on function, not size. Put related things in the same module. Often in classes, but let us not go there yet.

If you want to get a little funky, separate the runStuff code which works on the id from the control code (the semaphore use). You could then run the queue collection in its own thread. Have the main control code also manage the semaphore:

 sem = threading.Semaphore(2)
 q = Queue()

 def collect(q, ids):
   for count in range(len(ids)):
     id = q.get()
     sem.release()

 threadsPool = []
 for id in ids:
   thread = threading.Thread(target=file2.runStuff, name=str(id), args=(id, q))
   threadsPool.append(thread)

 collector = threading.Thread(target=collect, args=(q, ids))
 collector.start()

 for thread in threadsPool:
   sem.acquire()
   thread.start()

 # wait for collection to complete
 collector.join()

so that you acquire the semaphore before starting each thread, and release the semaphore as threads complete and report their id values.

Because these things need to happen in parallel (acquire, start versus q.get, release) you run the collector in its own thread.
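For completeness, here is that whole pattern as one self-contained, runnable sketch (your doMoreStuff/evenMoreStuff replaced by a sleep, and the ids invented for illustration), with the collector releasing the semaphore as ids come back:

```python
import threading
import time
from queue import Queue

ids = ["ID01", "ID02", "ID03", "ID04", "ID05"]
sem = threading.Semaphore(2)   # at most 2 workers running at once
q = Queue()
completed = []

def runStuff(element_id, q):
    time.sleep(0.1)            # stand-in for the real I/O work
    q.put(element_id)          # report completion

def collect(q, ids):
    for count in range(len(ids)):
        element_id = q.get()
        completed.append(element_id)
        sem.release()          # free a slot so the next thread may start

collector = threading.Thread(target=collect, args=(q, ids))
collector.start()

for element_id in ids:
    sem.acquire()              # wait for a free slot before starting
    threading.Thread(target=runStuff, name=element_id,
                     args=(element_id, q)).start()

collector.join()               # all ids have been collected once this returns
print(sorted(completed))
```

The order of items in `completed` is whatever order the work finished in; only the total count is guaranteed.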

Does this clear anything up?

Cheers,
Cameron Simpson <c...@cskk.id.au>
_______________________________________________
Tutor maillist  -  Tutor@python.org
To unsubscribe or change subscription options:
https://mail.python.org/mailman/listinfo/tutor
