Hi,

Based on Brett's talk
http://www.google.com/url?sa=D&q=http://www.google.com/events/io/2010/sessions/high-throughput-data-pipelines-appengine.html,
is it possible to get 2 join tasks for the same index running less
than 1 second (or whatever the batch time is) apart?

A scenario where I think it may happen is as follows:
Batch time 1 second
Stall timeout is 30 seconds
The first insert is added at hh:mm:29.9 (multiple inserts follow)
Task_name is e.g. "sumName-1-1237847"
The task will only execute at about hh:mm:30.9

At hh:mm:30.0 a new task is inserted. The time bucket in the task name
has rolled over (int(now / stallTimeout) is now one higher), but the
index remains the same (the task sumName-1-1237847 hasn't incremented
the index yet).
The task_name is therefore "sumName-2-1237847" and it will execute at
hh:mm:31.0 (only 0.1 seconds after the previous task for the same index).
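
To make the timing concrete, here is a minimal sketch of the arithmetic
above. It only mirrors the naming and ETA logic from the slides in plain
Python; task_name_and_eta is a hypothetical helper and the boundary
timestamp is made up so that it falls exactly on a 30 second bucket edge.

```python
STALL_TIMEOUT = 30  # seconds, as in the scenario
BATCH_TIME = 1      # seconds, as in the scenario


def task_name_and_eta(sum_name, index, now):
    """Hypothetical helper: same name/ETA formula as the slide code."""
    name = '%s-%d-%d' % (sum_name, int(now / STALL_TIMEOUT), index)
    eta = now + BATCH_TIME
    return name, eta


# Made-up epoch time that is an exact multiple of STALL_TIMEOUT.
boundary = 30 * 1000000

# First insert lands 0.1 s before the bucket edge, second one on the edge.
first_name, first_eta = task_name_and_eta('sumName', 1237847, boundary - 0.1)
second_name, second_eta = task_name_and_eta('sumName', 1237847, boundary)

# Different task names (so no TaskAlreadyExistsError), same index,
# and ETAs only about 0.1 s apart.
gap = second_eta - first_eta
```

The two names differ only in the time bucket, so both tasks are accepted
by the queue even though they target the same index.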

In most cases this may be OK (2 updates per second to an entity group
is probably fine), but what happens if there are many records for
index 1237847 (more than a task can process in 0.1 seconds)?
Task "sumName-2-1237847" could then start executing before task
"sumName-1-1237847" has finished, in which case it will also retrieve
the work items for index 1237847 and may sum the same values twice.
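
A minimal sketch of the overlap I'm worried about, using plain
in-memory dicts as stand-ins for the datastore (work_items, totals,
fetch_work and apply_and_delete are all hypothetical names, not App
Engine APIs). The join is split into "read" and "apply + delete" so the
interleaving can be shown deterministically.

```python
# Hypothetical in-memory stand-ins for the datastore.
work_items = {1237847: [5, 7, 11]}  # pending values keyed by index
totals = {'sumName': 0}


def fetch_work(index):
    """Step 1 of a join: read the pending work for an index."""
    return list(work_items.get(index, []))


def apply_and_delete(sum_name, index, values):
    """Steps 2 and 3 of a join: add to the total, then delete the work."""
    totals[sum_name] += sum(values)
    work_items.pop(index, None)


# Task "sumName-1-1237847" reads its work but has not finished yet...
first_batch = fetch_work(1237847)
# ...when task "sumName-2-1237847" starts 0.1 s later and reads the same rows.
second_batch = fetch_work(1237847)

apply_and_delete('sumName', 1237847, first_batch)
apply_and_delete('sumName', 1237847, second_batch)

expected_total = 5 + 7 + 11
double_counted_total = totals['sumName']  # twice the expected total
```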

Here's the code (from the slides):

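import datetime
import time

from google.appengine.api import taskqueue

# sum_name and index come from the surrounding insert code (not shown).
now = time.time()
stallTimeout = 30
try:
  taskqueue.add(
      name='%s-%d-%d' % (
          sum_name, int(now / stallTimeout), index),
      url='/work',
      eta=datetime.datetime.utcfromtimestamp(now) +
          datetime.timedelta(seconds=1))
except taskqueue.TaskAlreadyExistsError:
  pass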


def join(sum_name, index):
  # force new writers to use the next index
  memcache.incr('index-' + sum_name)
  lock = '%s-lock-%d' % (sum_name, index)
  memcache.decr(lock, 2**15) # You missed the boat
  # busy wait for writers
  for i in xrange(20): # timeout after 5s
    counter = memcache.get(lock)
    if counter is None or int(counter) <= 2**15:
      break
    time.sleep(0.250)
  # get work for index
  # sum or whatever you want
  # delete work

If what I've said is valid, would a workaround be to check the lock at
the start of the join to see whether it has already been decremented by
2**15? That would indicate a join for this index has already started.

def join(sum_name, index):
  lock = '%s-lock-%d' % (sum_name, index)
  counter = memcache.get(lock)
  # memcache.get returns None if the key is missing, so guard before int()
  if counter is not None and int(counter) < 2**16:
    # the task for index has already started running; return to prevent
    # overlap
    return False

  # rest of code here...
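
Here is a minimal, runnable sketch of that check so the intended
behaviour is clear. Assumptions: FakeMemcache is a hypothetical
in-memory stand-in (not the real memcache module), the lock is assumed
to start at 2**16 with no writers active, and join takes the cache as an
extra argument only so the sketch is self-contained. Note that the get
and the decr are two separate calls, so this sketch does not cover two
tasks that start at almost exactly the same moment.

```python
class FakeMemcache(object):
    """Hypothetical in-memory stand-in for memcache, for illustration only."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def set(self, key, value):
        self._data[key] = value

    def decr(self, key, delta=1):
        if key not in self._data:
            return None
        # Counters never go below zero in this stand-in.
        self._data[key] = max(0, self._data[key] - delta)
        return self._data[key]


LOCK_START = 2**16  # assumed initial lock value (no writers active)
JOIN_MARK = 2**15   # amount the join subtracts ("You missed the boat")


def join(cache, sum_name, index):
    lock = '%s-lock-%d' % (sum_name, index)
    counter = cache.get(lock)
    if counter is not None and int(counter) < LOCK_START:
        # A join for this index already subtracted JOIN_MARK; skip.
        return False
    cache.decr(lock, JOIN_MARK)
    # ... busy wait for writers, get work, sum, delete work would go here
    return True


cache = FakeMemcache()
cache.set('sumName-lock-1237847', LOCK_START)

first = join(cache, 'sumName', 1237847)   # proceeds and marks the lock
second = join(cache, 'sumName', 1237847)  # sees the mark and returns False
```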

Thanks
Rob
