Hi,

Based on Brett's talk (http://www.google.com/url?sa=D&q=http://www.google.com/events/io/2010/sessions/high-throughput-data-pipelines-appengine.html), is it possible to end up with two join tasks executing less than 1 second (or whatever the batch time is) apart?
Here is a scenario where I think it can happen:

- Batch time: 1 second.
- Stall timeout: 30 seconds.
- The first insert is added at hh:mm:29.9 (more inserts follow). The task name is, e.g., "sumName-1-1237847", and the task will only execute at about hh:mm:30.9.
- At hh:mm:30.0 a new task is inserted, but the index is still the same, because task sumName-1-1237847 hasn't run yet and so hasn't incremented the index. Since int(now / stallTimeout) has just rolled over, the new task gets a different name, "sumName-2-1237847", so TaskAlreadyExistsError does not suppress it. It will execute at hh:mm:31.0, only 0.1 seconds after the previous task for the same index.

In most cases this may be OK (two updates per second to one entity group is probably fine). But what happens if there are many records for index 1237847, more than the first task can process in 0.1 seconds? Task "sumName-2-1237847" could start executing before "sumName-1-1237847" has finished. In that case it would also retrieve the work items for index 1237847 and might sum the same values twice.

Here's the code (from the slides):

    import datetime
    import time

    from google.appengine.api import memcache
    from google.appengine.api import taskqueue  # google.appengine.api.labs.taskqueue in older SDKs

    # Writer side: schedule a join task for the current index.
    # sum_name and index are supplied by the writer.
    now = time.time()
    stallTimeout = 30
    try:
        taskqueue.add(
            # The name only changes every stallTimeout seconds, so repeat
            # inserts within the same window are rejected and ignored.
            name='%s-%d-%d' % (sum_name, int(now / stallTimeout), index),
            url='/work',
            eta=datetime.datetime.utcfromtimestamp(now) + datetime.timedelta(seconds=1))
    except taskqueue.TaskAlreadyExistsError:
        pass

    def join(sum_name, index):
        # force new writers to use the next index
        memcache.incr('index-' + sum_name)
        lock = '%s-lock-%d' % (sum_name, index)
        memcache.decr(lock, 2**15)  # You missed the boat
        # busy wait for writers
        for i in xrange(20):  # timeout after 5s (20 x 0.25s)
            counter = memcache.get(lock)
            if counter is None or int(counter) <= 2**15:
                break
            time.sleep(0.250)
        # get work for index
        # sum or whatever you want
        # delete work

If what I've said is valid, would a workaround be to check the lock at join time, before starting the join, to see whether it has already been decremented by 2**15? That would indicate the join for this index has already run.
    def join(sum_name, index):
        lock = '%s-lock-%d' % (sum_name, index)
        counter = memcache.get(lock)
        # Writers keep the lock at or above 2**16 until a join subtracts
        # 2**15, so a value below 2**16 means a join for this index has
        # already started. memcache.get() returns None if the key was
        # evicted, so check for that before calling int().
        if counter is not None and int(counter) < 2**16:
            # the task for index has already started running; return to prevent overlap
            return False
        # rest of code here...

Thanks,
Rob

--
You received this message because you are subscribed to the Google Groups "Google App Engine" group.
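The timeline and the proposed guard can be exercised without App Engine. The sketch below is a local simulation under stated assumptions, not the talk's implementation: `FakeMemcache` is a hypothetical dict-backed stand-in that only roughly mimics memcache's `get`/`set`/`incr`/`decr`, the lock is assumed to start at 2**16 (as the `< 2**16` check implies), and `task_name`/`task_eta` just reproduce the naming and ETA expressions from the slide code. Note that the real bucket numbers are whatever int(now / 30) yields (0 and 1 below); the names in the post were illustrative.

```python
import datetime

STALL_TIMEOUT = 30  # seconds, as in the slides
BATCH = 1           # seconds between insert and task execution (eta)
LOCK_BASE = 2**16   # assumed initial lock value, implied by the `< 2**16` check
EPOCH = datetime.datetime(1970, 1, 1)


def task_name(sum_name, now, index):
    # Same naming expression as the slide code.
    return '%s-%d-%d' % (sum_name, int(now / STALL_TIMEOUT), index)


def task_eta(now):
    # Same as utcfromtimestamp(now) + timedelta(seconds=1) in the slides.
    return EPOCH + datetime.timedelta(seconds=now + BATCH)


class FakeMemcache(object):
    """Hypothetical in-memory stand-in for memcache (not the real API)."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def set(self, key, value):
        self._data[key] = value

    def incr(self, key, delta=1):
        if key not in self._data:
            return None  # missing key: nothing to increment
        self._data[key] += delta
        return self._data[key]

    def decr(self, key, delta=1):
        if key not in self._data:
            return None
        # memcached-style decr: clamps at zero instead of going negative
        self._data[key] = max(0, self._data[key] - delta)
        return self._data[key]


def join(cache, sum_name, index):
    """Proposed join(): bail out if another join already took the lock."""
    lock = '%s-lock-%d' % (sum_name, index)
    counter = cache.get(lock)
    if counter is not None and int(counter) < LOCK_BASE:
        return False  # a join for this index already started
    cache.incr('index-' + sum_name)
    cache.decr(lock, 2**15)
    # ... busy-wait for writers, then sum and delete work (omitted) ...
    return True


if __name__ == '__main__':
    sum_name, index = 'sumName', 1237847
    first, second = 29.9, 30.0  # insert times, seconds past hh:mm:00
    print(task_name(sum_name, first, index))
    print(task_name(sum_name, second, index))
    print((task_eta(second) - task_eta(first)).total_seconds())

    cache = FakeMemcache()
    cache.set('%s-lock-%d' % (sum_name, index), LOCK_BASE)
    print(join(cache, sum_name, index))  # first join proceeds
    print(join(cache, sum_name, index))  # second join bails out
```

In this simulation the two inserts get different names with ETAs 0.1 s apart, and the second `join` returns False. One caveat: `get()` followed by `decr()` is two separate memcache operations, so two joins starting at almost the same instant could both read a value of at least 2**16 and both proceed. The check narrows the overlap window rather than closing it.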