First of all, thank you for taking the time to give such an in-depth
answer.

To answer points 1 and 3: I was actually using the random name to make
the task idempotent, but yes, named tasks will do better. I still don't
understand why the above code behaves the way it does, but using a
user-generated name should fix it.
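
Something along these lines is what I have in mind, just as a rough
sketch (the helper name, the name scheme and the error handling are my
own assumptions; the URL and queue name are the ones from my handler):

from google.appengine.api import taskqueue

def enqueue_delete_task(campaign_id, batch_number):
    # Deterministic name: adding the same task twice is rejected
    # instead of forking another copy.
    task_name = 'delete-recipients-%s-%d' % (campaign_id, batch_number)
    try:
        taskqueue.add(name=task_name,
                      url='/tasks/delete_recipients',
                      params={'campaign_id': str(campaign_id)},
                      queue_name='recipients-import')
    except (taskqueue.TaskAlreadyExistsError,
            taskqueue.TombstonedTaskError):
        # Already enqueued (or already ran and is tombstoned); skip it.
        pass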

Regarding point 2, I'm not sure you're correct on this one. Shouldn't
the query always be fast since there's no "skip" in the fetch method?
Since the query always gets the first X entities and deletes them, I
don't see why it would take longer and longer.

In this scenario I don't need the extra speed from the second method
you suggest. I will make sure to look into the Mapper API though.

Again, thank you for your time.

Jimmy

On 1 oct, 21:44, Eli Jones <eli.jo...@gmail.com> wrote:
> From the looks of this code.. you should completely rewrite it.
>
> 1.  You indicate you want to use a recursive task.. but you aren't using
> named tasks.  So, you have no way to guarantee that the task won't fork
> indefinitely.
> 2.  You aren't using a cursor.. so eventually the db.Query() will take
> longer and longer and longer to run if you have a very large number of
> entities to delete.
> 3.  Why are you saving the random task name to the datastore?
>
> There are a few patterns you can use to do batch deletes for a bunch of
> entities.
>
> 1.  Simple recursive named task with a cursor.
>  (For an intelligent explanation of this when using deferred, see here:
> http://blog.notdot.net/2010/03/Task-Queue-task-chaining-done-right)
>
> 2.  Recursive task using keys_only cursor and deferred task to do
> db.delete()
>
> You do 1 if you really only want each db.delete() to run one at a time (a
> rough sketch of that follows below).  You do 2 if you just want the deleting
> done as fast as possible.  (Or there is option 3.. familiarize yourself with
> the GAE Mapper API [aka the MapReduce API] and use that to fire off
> deletes.)
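>
> A minimal sketch of option 1 might look like this (Recipient and campaign_id
> are from the code below, BATCH_SIZE and the task name scheme are placeholders,
> and the blog post above covers the corner cases this skips):
>
> from google.appengine.ext import db, deferred
>
> BATCH_SIZE = 500  # placeholder batch size
>
> def delete_campaign_recipients(campaign_id, cursor=None, run=0):
>     query = db.Query(Recipient, keys_only=True).ancestor(campaign_id)
>     if cursor:
>         query.with_cursor(cursor)
>     keys = query.fetch(BATCH_SIZE)
>     if keys:
>         # Delete this batch in the current task, then chain the next one.
>         db.delete(keys)
>         deferred.defer(delete_campaign_recipients, campaign_id,
>                        query.cursor(), run + 1,
>                        _name='delete-recipients-%s-%d' % (campaign_id, run + 1))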
>
> Option 2 would look something like this:
>
> from google.appengine.ext import db, deferred
> from google.appengine.runtime import DeadlineExceededError
>
> def mainTask(campaign_id, new_import, cursor=None, i=0):
>     count = app_model.DATASTORE_PUT_LIMIT
>     try:
>         while count == app_model.DATASTORE_PUT_LIMIT:
>             query = db.Query(Recipient, keys_only=True).ancestor(
>                 campaign_id).filter('campaign', campaign_id)
>             if cursor is not None:
>                 query.with_cursor(cursor)
>             results = query.fetch(app_model.DATASTORE_PUT_LIMIT)
>             count = len(results)
>             # Hand this batch of keys off to a deferred task to delete.
>             deferred.defer(deleteByKeyList, repr(results))
>             cursor = query.cursor()
>         if do_weird_new_import_check(new_import):
>             campaign.Campaign.get_cache(campaign_id).spawn_import_tasks()
>     except DeadlineExceededError:
>         # Out of time: re-defer ourselves with the latest cursor.
>         i += 1
>         deferred.defer(mainTask, campaign_id, new_import, cursor, i,
>                        _name="mainTask-" + str(campaign_id) + "-run-" + str(i))
>
> def deleteByKeyList(keylist):
>     # keylist is the repr() of a list of Keys; eval() rebuilds the Key
>     # objects, which is why datastore_types must be in scope here.
>     from google.appengine.api import datastore_types
>     keylist = eval(keylist)
>     db.delete(keylist)
>
> You could call that first function from the remote_api shell.. and it would
> then begin firing off deferred deleteByKeyList batches until it got a
> DeadlineExceededError.. then it would just defer itself with all of the
> appropriate variables.
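>
> For example, from the remote_api shell the kickoff would be roughly this
> (the kind name and id here are placeholders for your campaign key):
>
> from google.appengine.ext import db
> mainTask(db.Key.from_path('Campaign', 12345), False)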
>
> It's a little sloppy.. and there's a chance that the deferred delete would
> get called.. and then the DeadlineExceededError would throw.. and you'd then
> have the old cursor value get passed along for the next run... but that
> shouldn't really change the ultimate behaviour.
>
> When the while loop completes (which will only happen when all of the
> entities have had a deferred deleteByKeyList() fired off), it will do the
> new_import check.
>
> If you are worried about the DeadlineExceededError being raised.. and then
> the deferred.defer() call not succeeding.. it wouldn't really matter. As
> long as it doesn't happen too often (so that your cursor value gets passed
> along) and the error occurs inside one of the deferred versions of the
> mainTask() function.. it will just run the task again with the previous
> cursor value.
>
> And, when it runs the query.. it will only get keys for entities that are
> still in the datastore.  (The only negative is that it will be looking for
> key values greater than the key value that started an already-deleted
> batch.)
>
> I'll admit, this method is a little nutty.. it's effectively trying to do
> the db.delete() calls as fast as possible.. and it results in a lot of
> deferred errors (the usual 10-second "appengine could not service the
> request in time" errors).  But it's faster than doing it serially.
>
> Again.. I am sad to say that the Mapper API pretty much makes this
> obsolete.. since once you know how to use it, it makes a lot of stuff like
> this easy.. and it seems to provide really neat stats and the ability to
> monitor the processes.
>
> http://googleappengine.blogspot.com/2010/07/introducing-mapper-api.html
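>
> With the mapreduce library, a delete-everything mapper can be as small as
> this sketch (module paths are placeholders; the handler gets registered in
> mapreduce.yaml with the DatastoreInputReader pointed at the Recipient kind):
>
> from mapreduce import operation as op
>
> def delete_recipient(entity):
>     # Called once per Recipient entity; the framework batches the yielded
>     # delete operations and applies them for you.
>     yield op.db.Delete(entity)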
>
>
>
> On Fri, Oct 1, 2010 at 3:54 PM, Jimmy Bourassa <jboura...@gmail.com> wrote:
> > I created a task that handles batch delete for my application. The
> > purpose of the task is to delete records recursively. Once all records
> > are deleted, a new process must be started.
>
> > I tried to implement this through the code posted below. I really don't
> > understand why this is not working as expected, but for some reason I end
> > up having many tasks running concurrently. Is this the expected behavior
> > of the TaskQueue?
>
> > Here's the actual code:
>
> > def post(self):
> >     task_unique_name = self.request.headers['X-AppEngine-TaskName']
> >     # Checks if this task has been executed already
> >     if task.ExecutedTask.all().filter('task_name =',
> >                                       task_unique_name).count():
> >         return
> >
> >     campaign_id = db.Key(self.request.POST['campaign_id'])
> >
> >     def delete_transaction(camp):
> >         query = db.Query(Recipient,
> >             keys_only=True).ancestor(campaign_id).filter('campaign', camp)
> >         results = query.fetch(app_model.DATASTORE_PUT_LIMIT)
> >         # If results are found, delete them and start a new delete task
> >         if results:
> >             db.delete(results)
> >             t = taskqueue.Task(url='/tasks/delete_recipients',
> >                                params=self.request.POST)
> >             t.add(queue_name='recipients-import')
> >         # No results found to delete, start the import if appropriate.
> >         elif self.request.POST.get('new_import', False):
> >             campaign.Campaign.get_cache(campaign_id).spawn_import_tasks()
> >
> >         return True
> >
> >     def mark_task_as_done():
> >         save_task = task.ExecutedTask(task_name=task_unique_name)
> >         save_task.put()
> >
> >     if db.run_in_transaction(delete_transaction, campaign_id):
> >         db.run_in_transaction(mark_task_as_done)
>
