Hi Cameron,

Massive apologies for the delayed answer!

Your explanation definitely clears up quite a bit of my misunderstanding,
thank you for that!

There was a reason why I shied away from using Queue, but for the life of
me I can't remember right now what that reason was. I will have to modify
my code using your example and give it another try. I will make sure to
let you know if I run into any issues or have additional questions. :)

Regards,
Dimitar

On Tue, 20 Nov 2018 at 08:39, Cameron Simpson <c...@cskk.id.au> wrote:

> On 19Nov2018 23:52, Dimitar Ivanov <dimitarxiva...@gmail.com> wrote:
> >I'm having a hard time getting my head around threads so I was hoping
> >someone who has better understanding of their underlying functionality
> >could lend me a helping hand, in particular how threads work with each
> >other when using thread.join() and Semaphore set with maximum value. I'll
> >try to keep it as clear and concise as possible, but please don't hesitate
> >to ask if anything about my approach is unclear or, frankly, awful.
> >
> >I'm writing a script that performs a couple of I/O operations and CLI
> >commands for each element in a list of IDs. The whole process takes a while
> >and may vary based on the ID, hence the threading approach sounded like the
> >best fit since next ID can start once space has freed up. I'm pasting an
> >extract of my code below and will explain what I can't properly understand
> >underneath.
>
> I'm just going to scatter a few random remarks about your code in here
> with your listing before addressing your other queries lower down...
>
> >file1.py
> >---------
> >ids = [<IDs listed here>]
> >threadsPool = []
> >for id in ids:
>
> The name "id" is unfortunate as it conflicts with the id() builtin
> function. Maybe "element_id"? Wordier, but also more clear.
>
> >  thread = threading.Thread(target=file2.runStuff, name=str(id), args=(id,))
> >  threadsPool.append(thread)
> >for thread in threadsPool:
> >  thread.start()
>
> You could start each thread right after creating it if you wished.
>
> >for thread in threadsPool:
> >  print(thread.enumerate())
>
> "enumerate" is a function from the threading module, not a method of a
> Thread. So try:
>
>   print(threading.enumerate())
>
> Frankly I'm surprised that "thread.enumerate" works at all.
>
> >  print("Queuing thread" + str(thread))
> >  thread.join()
> >
> >file2.py
> >----------
> >queue = threading.Semaphore(2)
>
> I'd be disinclined to call this a "queue", which usually implies a FIFO
> list of some variety: put things onto it, and pull things off it,
> usually first in first off. Maybe just "sem" or "thread_capacity" or
> something?
>
> >def runStuff(id):
> >  queue.acquire()
> >  print("Lock acquired for " + str(id))
> >  file3.doMoreStuff()
> >  file4.evenMoreStuff()
> >  queue.release()
> >
> >Onto my confusion - as long as I don't try to print information about the
> >thread that's being queued or the total amount of threads using
> >.enumerate(), the script is working absolutely flawlessly, each thread that
> >doesn't have a lock is waiting until it acquires it and then moves on.
> >I decided it'd be nice to be able to provide more information about which
> >thread starts next and how many threads are active right now (each can take
> >a different amount of time), however, when I tried to do that, my log was
> >showing me some pretty funky output which at first made me believe I've
> >messed up all my threads, example:
> >
> ><< 2018-11-19 15:01:38,094 file2 [ID09] INFO - Lock acquired for
> >ID09 <---- this is from file2.py
> >------ some time later and other logs in here ---------
> >[<_MainThread(MainThread, started 140431033562880)>, <Thread(ID09, started
> >140430614177536)>] <---- output from thread.enumerate(), file1.py
> ><< 2018-11-19 15:01:38,103 file1 [MainThread] DEBUG - Queuing thread -
> ><Thread(ID09, started 140430614177536)> <---- output from print() right
> >after thread.enumerate()
> >
> >After some head scratching, I believe I've finally tracked down the reason
> >for my confusion:
> >
> >The .start() loop starts the threads and the first 2 acquire a lock
> >immediately and start running, later on the .join() queue puts the rest in
> >waiting for lock, that's fine, what I didn't realize, of course, is that
> >the .join() loop goes through threads that have already been instantly
> >kicked off by the .start() loop (the first 2 threads since Semaphore allows
> >2 locks) and then my print in that loop is telling me that those threads
> >are being queued, except they aren't since they are already running, it's
> >just my text is telling me that, since I wasn't smart enough to realize
> >what's about to happen, as seen below:
> >
> ><< 2018-11-19 15:01:33,094 file1.py [MainThread] DEBUG - Queuing thread -
> ><Thread(ID02, stopped 140430666626816)> <--- makes it clear the thread has
> >already even finished
>
> Yes. The .join() has NO EFFECT on the Thread itself: it doesn't start it
> or stop it. It just waits for the Thread to complete. So yes, your log
> message is misleading.
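
The remark that .join() only waits can be checked with a small sketch. This is an illustration only: the work() function, the "ID02" name and the sleep times are made up and stand in for the real per-ID work.

```python
import threading
import time

def work(delay):
    # Hypothetical stand-in for the real per-ID work.
    time.sleep(delay)

t = threading.Thread(target=work, args=(0.01,), name="ID02")
t.start()

time.sleep(0.5)                          # ample time for the worker to finish
finished_before_join = not t.is_alive()  # the thread is already done here

t.join()                                 # nothing left to wait for, returns at once
print(finished_before_join, t.is_alive())
```

A log line printed just before join() therefore describes a thread that may already have stopped, which would explain the "stopped" state in the log extract.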
>
> >Which finally gets me to my cry for help - I know I can't modify the
> >threadsPool list to remove the threads already created on the fly, so I can
> >have only the ones pending to be queued in the 2nd loop, but for the life
> >of me I can't think of a proper way to try and extract some information
> >about what threads are still going (or rather, have finished since
> >thread.enumerate() shows both running and queued).
>
> Well, what you'd probably _like_ is a way to be told about each Thread
> as it completes, and report them then. Which you can do using a Queue,
> getting each Thread to report its completion to the Queue as it happens.
>
> Untested example:
>
>   from queue import Queue
>   q = Queue()
>   threadsPool = []
>   for id in ids:
>     thread = threading.Thread(target=file2.runStuff, name=str(id), args=(id, q))
>     threadsPool.append(thread)
>
> and modify runStuff thus:
>
>   def runStuff(id, q):
>     sem.acquire()
>     ...
>     sem.release()
>     q.put(id)
>
> After the threads are started, collect completed ids:
>
>   for count in range(len(ids)):
>     id = q.get()
>     print("completed work on id %r" % (id,))
>
> You'll notice no .join() there. Getting the id off the queue "q" implies
> that the Thread has completed.
>
> >I have the feeling I'm using a very wrong approach in trying to extract
> >that information in the .join() loop, since it only goes back to it once a
> >thread has finished, but at the same time it feels like the perfect timing.
>
> You're collecting specific threads. If other threads complete earlier
> than that specific thread, they don't get reported immediately. You're
> reporting threads in the order you made them, not in the order they
> complete. Using a Queue and not worrying about the threads themselves
> lets you get ids as they're done, in whatever order.
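
For reference, the Queue approach described above could be put together roughly as follows. This is a self-contained sketch under assumptions, not the original script: run_stuff, the sample ids and the random sleep are placeholders for file2.runStuff and the real I/O work, and the try/finally is an addition so that an id is still reported if the work raises.

```python
import random
import threading
import time
from queue import Queue

sem = threading.Semaphore(2)            # at most 2 workers do real work at once

def run_stuff(element_id, q):
    try:
        with sem:                       # acquire on entry, release on exit
            time.sleep(random.uniform(0.01, 0.05))  # placeholder for real work
    finally:
        q.put(element_id)               # always report, even after an error

ids = ["ID01", "ID02", "ID03", "ID04", "ID05"]   # sample data
q = Queue()

threads = [threading.Thread(target=run_stuff, name=i, args=(i, q)) for i in ids]
for t in threads:
    t.start()

completed = []
for _ in ids:
    element_id = q.get()                # blocks until some worker reports
    completed.append(element_id)
    print("completed work on id %r" % (element_id,))
```

The ids come off the queue in completion order, which will usually differ from the order in which the threads were created.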
>
> >And just in case you are wondering why I have my threads starting in
> >file1.py and my Semaphore queue in file2.py, it's because I wanted to split
> >the runStuff(id) function in a separate module due to its length. I don't
> >know if it's a good way to do it, but thankfully the Python interpreter is
> >smart enough to see through my ignorance.
>
> I usually split modules based on function, not size. Put related things
> in the same module. Often in classes, but let us not go there yet.
>
> If you want to get a little funky, separate the runStuff code which
> works on the id from the control code (the semaphore use). You could
> then run the queue collection in its own thread. Have the main control
> code also manage the semaphore:
>
>   # defined before the collector thread that uses it
>   def collect(q, ids):
>     for count in range(len(ids)):
>       id = q.get()
>       sem.release()
>
>   q = Queue()
>   threadsPool = []
>   for id in ids:
>     thread = threading.Thread(target=file2.runStuff, name=str(id), args=(id, q))
>     threadsPool.append(thread)
>
>   collector = threading.Thread(target=collect, args=(q, ids))
>   collector.start()
>
>   for thread in threadsPool:
>     sem.acquire()
>     thread.start()
>
>   # wait for collection to complete
>   collector.join()
>
> so that you acquire the semaphore before starting each thread, and
> release the semaphore as threads complete and report their id values.
>
> Because these things need to happen in parallel (acquire, start versus
> q.get, release) you run the collector in its own thread.
>
> Does this clear anything up?
>
> Cheers,
> Cameron Simpson <c...@cskk.id.au>
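
The second variant, where the main code owns the semaphore and a collector thread releases it, might look like this when made runnable. It is a sketch under assumptions: run_stuff, the sample ids, MAX_ACTIVE and the active/peak counters are illustrative additions (the counters exist only to show that the limit holds) and are not part of the original code.

```python
import threading
import time
from queue import Queue

MAX_ACTIVE = 2                          # assumed limit, as in Semaphore(2)
sem = threading.Semaphore(MAX_ACTIVE)

state_lock = threading.Lock()
active = 0                              # workers currently running
peak = 0                                # highest value "active" reached

def run_stuff(element_id, q):
    global active, peak
    with state_lock:
        active += 1
        peak = max(peak, active)
    try:
        time.sleep(0.01)                # placeholder for the real work
    finally:
        with state_lock:
            active -= 1
        q.put(element_id)               # report completion as the last step

def collect(q, expected, done):
    for _ in range(expected):
        done.append(q.get())            # blocks until a worker reports
        sem.release()                   # free a slot for the next worker

ids = ["ID%02d" % n for n in range(1, 7)]   # sample data
q = Queue()
done = []

collector = threading.Thread(target=collect, args=(q, len(ids), done))
collector.start()

for element_id in ids:
    sem.acquire()                       # wait for a free slot before starting
    threading.Thread(target=run_stuff, name=element_id,
                     args=(element_id, q)).start()

collector.join()                        # returns once every id was reported
print("order of completion:", done)
print("peak concurrent workers:", peak)
```

With this layout the worker function no longer touches the semaphore at all, so the concurrency limit lives in one place alongside the code that starts the threads.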