Hi,

Thanks Robert. Right, now I see the need for having revision numbers on
aggregates.
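
For anyone else reading along, here's a minimal sketch of how I
understand the marker-entity approach Robert describes. The kind names,
properties, and the apply_work() helper below are my own guesses, not
from Brett's slides or Robert's code:

from google.appengine.ext import db

class Aggregate(db.Model):
  total = db.IntegerProperty(default=0)
  revision = db.IntegerProperty(default=0)

class WorkMarker(db.Model):
  # Lives in the aggregate's entity group so it can be read and
  # written in the same transaction as the aggregate itself.
  revision = db.IntegerProperty()

def apply_work(agg_key, work_id, amount):
  # Apply one work unit to the aggregate at most once, even if a
  # retried join task refetches work units it already processed.
  def txn():
    agg = db.get(agg_key)
    marker = WorkMarker.get_by_key_name(work_id, parent=agg_key)
    if marker is not None:
      return  # stale update: this work unit was already summed
    agg.revision += 1
    agg.total += amount
    agg.put()
    WorkMarker(key_name=work_id, parent=agg_key,
               revision=agg.revision).put()
  db.run_in_transaction(txn)

If two join tasks do overlap, they contend on the same entity group, so
one transaction fails and retries, and the marker check turns the retry
into a no-op for anything already applied.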

Thanks
Rob



On Jul 26, 7:21 am, Robert Kluin <robert.kl...@gmail.com> wrote:
> Hi,
>   No, they won't be summed twice.  The update to the aggregate occurs
> within a transaction, so one of the conflicting writes will fail and
> retry.  Even if some of the update "work units" are refetched, the
> marker entities stored within the aggregate's entity group have a
> revision number that lets you skip stale updates.
>
> Robert
>
> On Mon, Jul 25, 2011 at 04:39, Bert <robertbcur...@gmail.com> wrote:
> > Hi,
>
> > Based on Brett's talk
> > http://www.google.com/url?sa=D&q=http://www.google.com/events/io/2010...,
> > is it possible to have a scenario where two join tasks for the same
> > index run closer than 1 second (or the given batch duration) apart?
>
> > A scenario where I think it may happen is as follows:
> > Batch time is 1 second
> > Stall timeout is 30 seconds
> > The first insert is added at hh:mm:29.9 (multiple inserts follow)
> > The task name is e.g. "sumName-1-1237847"
> > The task will only execute at about hh:mm:30.9
>
> > At hh:mm:30.0 a new task is inserted, but the index remains the same
> > (the task sumName-1-1237847 hasn't incremented the index yet).
> > The task name is "sumName-2-1237847" and it will execute at hh:mm:31.0
> > (only 0.1 seconds after the previous task for the same index).
>
> > In most cases this may be OK (two updates in a second to an entity
> > group is probably fine), but what happens if there are many records
> > for index 1237847 (more than the task can process in 0.1 seconds)?
> > Task "sumName-2-1237847" could then start executing before task
> > "sumName-1-1237847" has had time to finish, in which case it will
> > also retrieve the work items for index 1237847 and may sum the same
> > values twice.
>
> > Here's the code (from the slides):
>
> > import datetime
> > import time
> >
> > from google.appengine.api import taskqueue
> >
> > now = time.time()
> > stall_timeout = 30
> > try:
> >   # The task name buckets inserts into stall_timeout-wide windows,
> >   # so only one join task can be enqueued per window and index.
> >   taskqueue.add(
> >       name='%s-%d-%d' % (sum_name, int(now / stall_timeout), index),
> >       url='/work',
> >       eta=datetime.datetime.utcfromtimestamp(now) +
> >           datetime.timedelta(seconds=1))
> > except taskqueue.TaskAlreadyExistsError:
> >   pass  # a join task for this window and index is already queued
>
> > from google.appengine.api import memcache
> >
> > def join(sum_name, index):
> >   # Force new writers to use the next index.
> >   memcache.incr('index-' + sum_name)
> >   lock = '%s-lock-%d' % (sum_name, index)
> >   memcache.decr(lock, 2**15)  # you missed the boat
> >   # Busy-wait for in-flight writers to finish.
> >   for i in xrange(20):  # timeout after 5s
> >     counter = memcache.get(lock)
> >     if counter is None or int(counter) <= 2**15:
> >       break
> >     time.sleep(0.250)
> >   # Get the work items for this index, sum (or whatever you want),
> >   # then delete the work items.
>
> > If what I've said is valid, is a workaround to check the lock at
> > join time to see if it has already been decremented by 2**15 before
> > starting the join, as that would indicate the join has already run?
>
> > def join(sum_name, index):
> >   lock = '%s-lock-%d' % (sum_name, index)
> >   counter = memcache.get(lock)
> >   if counter is not None and int(counter) < 2**16:
> >     # The join task for this index has already started running;
> >     # return to prevent overlap.
> >     return False
>
> >   # rest of code here...
>
> > Thanks
> > Rob
>

