Hi, Thanks Robert. Right, now see the need for having revision numbers on aggregates.
Thanks Rob On Jul 26, 7:21 am, Robert Kluin <robert.kl...@gmail.com> wrote: > Hi, > No, they won't be summed twice. The update to the aggregate occurs > within a transaction, so one of the writes will fail and retry. Even > if some of the update "work units" are refetched the marker entities > stored within the aggregates entity group have a revision number that > allows you to skip stale updates. > > Robert > > > > > > > > On Mon, Jul 25, 2011 at 04:39, Bert <robertbcur...@gmail.com> wrote: > > Hi, > > > Based on Brett's talk > >http://www.google.com/url?sa=D&q=http://www.google.com/events/io/2010..., > > is it possible to have a scenario where you get 2 join tasks happening > > closer than 1 second(or given batch time duration) apart? > > > A scenario where I think it may happen is as follows: > > Batch time 1 second > > Stall timeout is 30 seconds > > The first insert is added at hh:mm:29.9 (multiple inserts follow) > > Task_name is e.g. "sumName-1-1237847" > > The task will only execute at about hh:mm:30.9 > > > At hh:mm:30.0 a new task is inserted but the index remains the same > > (the task sumName-1-1237847 hasn't incremented index yet). > > The task_name is "sumName-2-1237847" and it will execute at hh:mm:31.0 > > (only 0.1 seconds after the previous task for the same index). > > > In most cases this may be OK (because 2 updates in a second for an > > entity group is probably OK), but what happens if there are many > > records for index 1237847 (more than task can finish in 0.1 seconds)? > > Potentially task "sumName-2-1237847" could start executing before task > > "sumName-1-1237847" has had time to finish executing, in which case it > > will also retrieve the work items for index 1237847 and may sum the > > same values twice. > > > Here's the code (from the slides): > > > now = time.time() > > stallTimeout =30 > > try: > > taskqueue.add( > > name='%s-%d-%d' % ( > > sum_name, int(now / stallTimeout ), index), > > url='/work', > > eta=datetime.datetime.utcfromtimestamp(now) + > > datetime.timedelta(seconds=1)) > > except taskqueue.TaskAlreadyExistsError: > > pass > > > def join(sum_name, index): > > # force new writers to use the next index > > memcache.incr('index-' + sum_name) > > lock = '%s-lock-%d' % (sum_name, index) > > memcache.decr(lock, 2**15) # You missed the boat > > # busy wait for writers > > for i in xrange(20): # timeout after 5s > > counter = memcache.get(lock) > > if counter is None or int(counter) <= 2**15: > > break > > time.sleep(0.250) > > #get work for index > > #sum or whatever you want > > #delete work > > > If what I've said is valid, is a workaround to check the lock at join > > time to see if its already been decremented by 2**15 before starting > > the join as that would indicate it's already been run? > > > def join(sum_name,index): > > lock = '%s-lock-%d' % (sum_name, index) > > counter =memcache.get(lock) > > if int(counter) < 2**16: > > #the task for index has already started running, return to prevent > > overlap > > return False > > > #rest of code here... > > > Thanks > > Rob > > > -- > > You received this message because you are subscribed to the Google Groups > > "Google App Engine" group. > > To post to this group, send email to google-appengine@googlegroups.com. > > To unsubscribe from this group, send email to > > google-appengine+unsubscr...@googlegroups.com. > > For more options, visit this group > > athttp://groups.google.com/group/google-appengine?hl=en. -- You received this message because you are subscribed to the Google Groups "Google App Engine" group. To post to this group, send email to google-appengine@googlegroups.com. To unsubscribe from this group, send email to google-appengine+unsubscr...@googlegroups.com. For more options, visit this group at http://groups.google.com/group/google-appengine?hl=en.