Hi,

No, they won't be summed twice. The update to the aggregate happens inside a transaction, so if two join tasks race, one of the writes will fail and be retried. Even if some of the update "work units" are fetched more than once, the marker entities stored in the aggregate's entity group carry a revision number that lets you skip stale updates.
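Roughly, the shape of it is something like this. This is just my own sketch of the idea, not the exact code from the talk, and the model and property names (Aggregate, BatchMarker, apply_batch) are made up for illustration:

    from google.appengine.ext import db

    class Aggregate(db.Model):
        total = db.IntegerProperty(default=0)

    class BatchMarker(db.Model):
        # child of the Aggregate, keyed by the work index, so it can be
        # read and written in the same transaction as the aggregate
        pass

    def apply_batch(agg_key, index, batch_sum):
        def txn():
            marker_key = db.Key.from_path(
                'BatchMarker', 'batch-%d' % index, parent=agg_key)
            if db.get(marker_key) is not None:
                # this batch was already folded in by an earlier (or racing,
                # now-committed) join task, so skip the stale update
                return False
            agg = db.get(agg_key)
            agg.total += batch_sum
            db.put([agg,
                    BatchMarker(key_name='batch-%d' % index, parent=agg_key)])
            return True
        # if two join tasks collide on the same entity group, one transaction
        # fails and is retried; the retry then sees the marker and skips
        return db.run_in_transaction(txn)

So even if "sumName-1-1237847" and "sumName-2-1237847" both read the same work items, only the first commit for a given index actually changes the total.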
Robert

On Mon, Jul 25, 2011 at 04:39, Bert <robertbcur...@gmail.com> wrote:
> Hi,
>
> Based on Brett's talk
> http://www.google.com/url?sa=D&q=http://www.google.com/events/io/2010/sessions/high-throughput-data-pipelines-appengine.html,
> is it possible to have a scenario where you get 2 join tasks happening
> closer than 1 second (or the given batch time duration) apart?
>
> A scenario where I think it may happen is as follows:
> Batch time is 1 second
> Stall timeout is 30 seconds
> The first insert is added at hh:mm:29.9 (multiple inserts follow)
> Task_name is e.g. "sumName-1-1237847"
> The task will only execute at about hh:mm:30.9
>
> At hh:mm:30.0 a new task is inserted but the index remains the same
> (the task sumName-1-1237847 hasn't incremented the index yet).
> The task_name is "sumName-2-1237847" and it will execute at hh:mm:31.0
> (only 0.1 seconds after the previous task for the same index).
>
> In most cases this may be OK (because 2 updates in a second for an
> entity group is probably OK), but what happens if there are many
> records for index 1237847 (more than the task can finish in 0.1 seconds)?
> Potentially task "sumName-2-1237847" could start executing before task
> "sumName-1-1237847" has had time to finish, in which case it will also
> retrieve the work items for index 1237847 and may sum the same values
> twice.
>
> Here's the code (from the slides):
>
> now = time.time()
> stallTimeout = 30
> try:
>     taskqueue.add(
>         name='%s-%d-%d' % (
>             sum_name, int(now / stallTimeout), index),
>         url='/work',
>         eta=datetime.datetime.utcfromtimestamp(now) +
>             datetime.timedelta(seconds=1))
> except taskqueue.TaskAlreadyExistsError:
>     pass
>
> def join(sum_name, index):
>     # force new writers to use the next index
>     memcache.incr('index-' + sum_name)
>     lock = '%s-lock-%d' % (sum_name, index)
>     memcache.decr(lock, 2**15)  # You missed the boat
>     # busy wait for writers
>     for i in xrange(20):  # timeout after 5s
>         counter = memcache.get(lock)
>         if counter is None or int(counter) <= 2**15:
>             break
>         time.sleep(0.250)
>     # get work for index
>     # sum or whatever you want
>     # delete work
>
> If what I've said is valid, is a workaround to check the lock at join
> time to see if it's already been decremented by 2**15 before starting
> the join, as that would indicate it's already been run?
>
> def join(sum_name, index):
>     lock = '%s-lock-%d' % (sum_name, index)
>     counter = memcache.get(lock)
>     if int(counter) < 2**16:
>         # the task for this index has already started running; return
>         # to prevent overlap
>         return False
>
>     # rest of code here...
>
> Thanks
> Rob