On May 12, 1:40 am, Rhamphoryncus <[EMAIL PROTECTED]> wrote:
> On May 11, 10:16 am, skunkwerk <[EMAIL PROTECTED]> wrote:
> > On May 10, 1:31 pm, Dennis Lee Bieber <[EMAIL PROTECTED]> wrote:
> > > On Fri, 9 May 2008 08:40:38 -0700 (PDT), skunkwerk <[EMAIL PROTECTED]>
> > > declaimed the following in comp.lang.python:
> > >
> > >         Coming in late...
> > >
> > > > On May 9, 12:12 am, John Nagle <[EMAIL PROTECTED]> wrote:
> > > > > skunkwerk wrote:
> > > > > > i've declared a bunch of worker threads (100) and a queue into which
> > > > > > new requests are inserted, like so:
> > > <snip>
> > > > > > queue = Queue.Queue(0)
> > > > > > WORKERS = 100
> > > > > > for i in range(WORKERS):
> > > > > >     thread = SDBThread(queue)
> > > > > >     thread.setDaemon(True)
> > > > > >     thread.start()
> > > > > >
> > > > > > the thread:
> > > > > >
> > > > > > class SimpleDBThread(threading.Thread):
> > > > > >     def __init__(self, queue):
> > > > > >         self.__queue = queue
> > >
> > >         Note: a double leading __ means "name mangling" -- typically only
> > > needed when doing multiple layers of inheritance where different parents
> > > have similarly named items that need to be kept independent; a single _ is
> > > the convention for "don't touch me unless you know what you are doing".
> > >
> > > > > >         threading.Thread.__init__(self)
> > > > > >     def run(self):
> > > > > >         while 1:
> > > > > >             item = self.__queue.get()
> > > > > >             if item != None:
> > > > > >                 model = domain.get_item(item[0])
> > > > > >                 logger.debug('sdbthread item:' + item[0])
> > > > > >                 title = model['title']
> > > > > >                 scraped = model['scraped']
> > > > > >                 logger.debug("sdbthread title:" + title)
> > > > > >
> > > > > > any suggestions?
> > > > > > thanks
> > > <snip>
> > > > thanks John, Gabriel,
> > > > here's the 'put' side of the requests:
> > > >
> > > > def prepSDBSearch(results):
> > > >     modelList = [0]
> > > >     counter = 1
> > > >     for result in results:
> > > >         data = [result.item, counter, modelList]
> > > >         queue.put(data)
> > > >         counter += 1
> > > >     while modelList[0] < len(results):
> > > >         print 'waiting...'  # wait for them to come home
> > > >     modelList.pop(0)  # now remove '0'
> > > >     return modelList
> > >
> > >         My suggestion, if you really want diagnostic help -- follow the
> > > common recommendation of posting minimal /runnable (if erroneous)/
> > > code... If "domain.get_item()" is some sort of RDBM access, you might
> > > fake it using a pre-loaded dictionary -- anything that allows it to
> > > return something when given the key value.
> > >
> > > > responses to your follow-ups:
> > > > 1) 'item' in the threads is a list that corresponds to the 'data'
> > > > list in the above function.  it's not global, and the initial values
> > > > seem ok, but i'm not sure if every time i pass in data to the queue it
> > > > passes in the same memory address or declares a new 'data' list (which
> > > > I guess is what I want)
> > >
> > >         Rather confusing usage... In your "put" you have a list whose
> > > first element is "result.item", but then in the worker thread you refer
> > > to the entire list as "item".
> > >
> > > > 3) the first item in modelList is a counter that keeps track of
> > > > the number of threads for this call that have completed - is there any
> > > > better way of doing this?
> > >
> > >         Where? None of your posted code shows either "counter" or
> > > modelList being used by the threads.
> > >
> > >         And yes, if you have threads trying to update a shared mutable,
> > > you have a race condition.
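To make that race concrete, here is a minimal, self-contained demo -- not code
from this thread, just plain CPython threading: several threads doing an
unsynchronized read-modify-write on a shared list element can lose updates,
while the lock-protected version cannot.

    import threading

    counter = [0]                 # shared mutable, like modelList[0] above
    lock = threading.Lock()

    def unsafe_bump(n):
        for i in xrange(n):
            counter[0] = counter[0] + 1      # read-modify-write: not atomic

    def safe_bump(n):
        for i in xrange(n):
            lock.acquire()
            try:
                counter[0] = counter[0] + 1  # same update, now serialized
            finally:
                lock.release()

    threads = [threading.Thread(target=unsafe_bump, args=(100000,))
               for i in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print counter[0]   # can come out below 400000: updates were lost

Swap unsafe_bump for safe_bump in the Thread(...) call and the total is
always 400000.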
> > >         You also have a problem if you are using "counter" to define
> > > where in modelList a thread is supposed to store its results -- as you
> > > cannot access an element that doesn't already exist:
> > >
> > > a = [0]
> > > a[3] = 1    # failure; need to create elements 1, 2, 3 first
> > >
> > >         Now, if position is irrelevant, and a thread just appends its
> > > results to modelList, then you don't need a counter; all you need is
> > > to check the length of modelList against the count expected.
> > >
> > >         Overall -- even though you are passing things via the queue, the
> > > contents being passed via the queue are being treated as if they were
> > > global entities (you could make modelList a global, remove it from the
> > > queue entries, and have the same net access)...
> > >
> > >         IOWs, you have too much coupling between the threads and the
> > > feed routine...
> > >
> > >         As for me... I'd be using a second queue for return values:
> > >
> > > WORKERTHREADS = 100
> > > feed = Queue.Queue()
> > > results = Queue.Queue()
> > >
> > > def worker():
> > >     while True:
> > >         (ID, item) = feed.get()   # I leave the queues global
> > >                                   # since they perform locking
> > >                                   # internally
> > >         model = domain.get_item(item)
> > >         results.put( (ID, model["title"], model["scraped"]) )
> > >
> > > for i in range(WORKERTHREADS):
> > >     aThread = threading.Thread(target=worker)
> > >     # overkill to subclass as there is now no specialized init,
> > >     # and if I really wanted to make the queues non-global
> > >     # I'd pass them as arguments:
> > >     #     threading.Thread(target=worker, args=(feed, results))
> > >     # where worker is now
> > >     #     def worker(feed, results):
> > >     aThread.setDaemon(True)
> > >     aThread.start()
> > >
> > > ...
> > >
> > > def prepSearch(searches):
> > >     modelList = []
> > >     counter = 0
> > >     for searchItem in searches:
> > >         feed.put( (counter, searchItem) )
> > >         counter += 1
> > >         modelList.append(None)    # extend list one element per search
> > >     while counter:
> > >         (ID, title, scraped) = results.get()
> > >         modelList[ID] = (title, scraped)
> > >         counter -= 1
> > >     return modelList
> > >
> > >         The only place counter and modelList are modified is within
> > > prepSearch. I'm passing counter out and back to use as an ID value in
> > > case the final results are supposed to be in order -- that way, if one
> > > thread finishes before another, the items can be placed into the list
> > > where they'd have been sequentially.
> > >
> > >         I can only hope that "domain.get_item" is an activity that is
> > > I/O bound AND that it supports parallel accesses... Otherwise the above
> > > worker threads seem to be adding a lot of overhead for queue I/O and
> > > threading swaps for what is otherwise a rather linear process.
> > >
> > >         Perhaps your posts don't reveal enough... Maybe you have multiple
> > > main threads that are posting to the worker feed queue (and you were
> > > using separate mutables for storing the results). In this situation, I'd
> > > remove the results queue from being a global entity, create one queue
> > > per main processing thread, and pass the queue as one of the parameters.
> > > This way, a worker can return data to any source thread by using the
> > > supplied queue for the return...
> > >
> > >         Modify prepSearch with:
> > >
> > > myQueue = Queue.Queue()
> > > ...
> > > feed.put( (counter, searchItem, myQueue) )
> > > ...
> > > (ID, title, scraped) = myQueue.get()
> > >
> > >         Modify worker with:
> > >
> > > (ID, item, retqueue) = feed.get()
> > > ...
> > > retqueue.put( (ID, model["title"], model["scraped"]) )
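Pulling Dennis's two "modify" fragments together into something runnable -- a
toy sketch, with domain.get_item faked by a pre-loaded dictionary as he
suggests above (FakeDomain and FAKE_DB are invented stand-ins, not the real
API):

    import threading, Queue

    # fake domain.get_item with a pre-loaded dict, per the suggestion above
    FAKE_DB = {'spam': {'title': 'Spam', 'scraped': '2008-05-09'},
               'eggs': {'title': 'Eggs', 'scraped': '2008-05-10'}}

    class FakeDomain:
        def get_item(self, key):
            return FAKE_DB[key]

    domain = FakeDomain()
    feed = Queue.Queue()

    def worker():
        while True:
            ID, item, retqueue = feed.get()    # per-caller return queue
            model = domain.get_item(item)
            retqueue.put((ID, model['title'], model['scraped']))

    for i in range(4):
        t = threading.Thread(target=worker)
        t.setDaemon(True)
        t.start()

    def prepSearch(searches):
        myQueue = Queue.Queue()                # private to this caller
        modelList = [None] * len(searches)
        for counter, searchItem in enumerate(searches):
            feed.put((counter, searchItem, myQueue))
        for i in searches:
            ID, title, scraped = myQueue.get() # blocks until a worker replies
            modelList[ID] = (title, scraped)
        return modelList

    print prepSearch(['spam', 'eggs'])

Because each caller hands the workers its own private queue, several main
threads can share one worker pool without their results getting mixed up.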
>
> > wow.  thanks for the help.
> > i seem to have fixed my problem though - it turns out
> > domain.get_item was not thread-safe, as it was using the underlying
> > httplib.  the solution was to create a new connection to the database
> > for each thread (which is ok, as the database is meant to be queried in
> > a massively parallel fashion).  the missing part of the code included
> > a part where i inserted the results at the given position into the
> > list.
> >
> > the only issue i have now is that it takes a long time for 100 threads
> > to initialize that connection (>5 minutes) - and as i'm doing this on
> > a webserver, any time i update the code i have to restart all those
> > threads, which i'm doing right now in a for loop.  is there any way i
> > can keep the thread stuff separate from the rest of the code for this
> > file, yet allow access?  it wouldn't help having a .pyc or using
> > Psyco, correct, as the time is being spent in the runtime?  something
> > along the lines of 'start a new thread every minute until you get to
> > 100' without blocking the execution of the rest of the code in that
> > file?  or maybe, any time i need to do a search, start a new thread if
> > the number of threads is < 100?
>
> $ python2.5 -m timeit -s 'import threading' 't = threading.Thread();
> t.start(); t.join()'
> 10000 loops, best of 3: 131 usec per loop
>
> Clearly it is not the threads themselves, but something else which is
> expensive.
>
> It's not clear why you need threads at all.  Unless you've got a lot
> of cpus/cores running that DBMS, or it's got fairly high latency (and
> no way to pipeline), attacking it with more threads isn't gonna give
> significant speedups.
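For the "start a new thread any time i need to do a search, if the count is
under 100" idea, one way it could look -- a sketch only; the spawn-if-busy
heuristic and the make_connection/handle stubs are invented for illustration,
not taken from the thread:

    import threading, Queue, time

    MAX_WORKERS = 100
    feed = Queue.Queue()
    pool_lock = threading.Lock()
    started = [0]                    # workers launched so far

    def make_connection():
        # stand-in for the slow (~3 s) per-thread database connection
        time.sleep(3)
        return object()

    def handle(item, conn):
        print 'handled', item        # stand-in for the real query

    def worker():
        conn = make_connection()     # one connection per thread, made lazily
        while True:
            item = feed.get()
            handle(item, conn)

    def submit(item):
        feed.put(item)
        pool_lock.acquire()
        try:
            # only pay for a new thread (and its slow connection) when
            # work is queued and we are still under the cap
            if started[0] < MAX_WORKERS and not feed.empty():
                t = threading.Thread(target=worker)
                t.setDaemon(True)
                t.start()
                started[0] += 1
        finally:
            pool_lock.release()

Because the slow connection happens inside worker(), submit() returns
immediately, and the pool only grows as searches actually arrive.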
correct.  the threads themselves are not taking up the time, but the
initialization of each thread (which includes making a new connection to
the database) - typically this is ~3 seconds.  the database is Amazon's
SimpleDB, which is meant to support massively parallel queries.  once the
connection has been made, queries are very fast.
thanks
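One observation on the five minutes: 100 threads at ~3 s per connection is
~300 s only if the connections are opened one after another, e.g. in the
startup for loop. If each thread opens its own connection inside run(), the
setups overlap, and total startup should drop to roughly one connection's
worth. A sketch, with connect_sdb() as an invented stand-in for the real
SimpleDB connection call:

    import threading, Queue, time

    queue = Queue.Queue()

    def connect_sdb():
        # stand-in for the real SimpleDB connection setup (~3 s)
        time.sleep(3)
        return object()

    class SDBThread(threading.Thread):
        def run(self):
            # the slow part now happens inside the thread, so all 100
            # setups overlap: total startup is ~3 s, not ~300 s
            self.conn = connect_sdb()
            while True:
                item = queue.get()
                # ... query SimpleDB through self.conn here ...

    start = time.time()
    for i in range(100):
        t = SDBThread()
        t.setDaemon(True)
        t.start()        # returns immediately; connection happens in-thread
    print 'spawned 100 threads in %.2f s' % (time.time() - start)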