Hi,
  No, they won't be summed twice.  The update to the aggregate occurs
within a transaction, so one of the writes will fail and retry.  Even
if some of the update "work units" are refetched the marker entities
stored within the aggregates entity group have a revision number that
allows you to skip stale updates.



Robert


On Mon, Jul 25, 2011 at 04:39, Bert <robertbcur...@gmail.com> wrote:
> Hi,
>
> Based on Brett's talk
> http://www.google.com/url?sa=D&q=http://www.google.com/events/io/2010/sessions/high-throughput-data-pipelines-appengine.html,
> is it possible to have a scenario where you get 2 join tasks happening
> closer than 1 second(or given batch time duration) apart?
>
> A scenario where I think it may happen is as follows:
> Batch time 1 second
> Stall timeout is 30 seconds
> The first insert is added at hh:mm:29.9 (multiple inserts follow)
> Task_name is e.g. "sumName-1-1237847"
> The task will only execute at about hh:mm:30.9
>
> At hh:mm:30.0 a new task is inserted but the index remains the same
> (the task sumName-1-1237847 hasn't incremented index yet).
> The task_name is "sumName-2-1237847" and it will execute at hh:mm:31.0
> (only 0.1 seconds after the previous task for the same index).
>
> In most cases this may be OK (because 2 updates in a second for an
> entity group is probably OK), but what happens if there are many
> records for index 1237847 (more than task can finish in 0.1 seconds)?
> Potentially task "sumName-2-1237847" could start executing before task
> "sumName-1-1237847" has had time to finish executing, in which case it
> will also retrieve the work items for index 1237847 and may sum the
> same values twice.
>
> Here's the code (from the slides):
>
> now = time.time()
> stallTimeout =30
> try:
>  taskqueue.add(
>  name='%s-%d-%d' % (
>  sum_name, int(now / stallTimeout ), index),
>  url='/work',
>  eta=datetime.datetime.utcfromtimestamp(now) +
>  datetime.timedelta(seconds=1))
> except taskqueue.TaskAlreadyExistsError:
>  pass
>
>
> def join(sum_name, index):
>  # force new writers to use the next index
>  memcache.incr('index-' + sum_name)
>  lock = '%s-lock-%d' % (sum_name, index)
>  memcache.decr(lock, 2**15) # You missed the boat
>  # busy wait for writers
>  for i in xrange(20): # timeout after 5s
>    counter = memcache.get(lock)
>    if counter is None or int(counter) <= 2**15:
>      break
>    time.sleep(0.250)
>  #get work for index
>  #sum or whatever you want
>  #delete work
>
> If what I've said is valid, is a workaround to check the lock at join
> time to see if its already been decremented by 2**15 before starting
> the join as that would indicate it's already been run?
>
> def join(sum_name,index):
>  lock = '%s-lock-%d' % (sum_name, index)
>  counter =memcache.get(lock)
>  if int(counter) < 2**16:
>    #the task for index has already started running, return to prevent
> overlap
>    return False
>
>   #rest of code here...
>
> Thanks
> Rob
>
> --
> You received this message because you are subscribed to the Google Groups 
> "Google App Engine" group.
> To post to this group, send email to google-appengine@googlegroups.com.
> To unsubscribe from this group, send email to 
> google-appengine+unsubscr...@googlegroups.com.
> For more options, visit this group at 
> http://groups.google.com/group/google-appengine?hl=en.
>
>

-- 
You received this message because you are subscribed to the Google Groups 
"Google App Engine" group.
To post to this group, send email to google-appengine@googlegroups.com.
To unsubscribe from this group, send email to 
google-appengine+unsubscr...@googlegroups.com.
For more options, visit this group at 
http://groups.google.com/group/google-appengine?hl=en.

Reply via email to