Hi all,

I've just started a blog on professional-level AppEngine coding, and
the first substantive post is about a Python class I built for using
tasks on Push Queues to do scheduled job processing.

It's here: http://appenginedevelopment.blogspot.com/2011/10/worker.html

I'd love some feedback on this. I'm still a bit of a n00b to
AppEngine, so I might be doing it wrong. Are there better ways to do
job processing on AppEngine that I'm missing?

I've posted it inline assuming there are people as lazy as me who
can't be bothered clicking. You're missing links and such, but you can
get the gist.

----
The Worker
Emlyn O'Regan 1 Oct 2011

One of the first serious Google AppEngine subjects I've approached
recently is the problem of doing work in the background. In my
particular case I needed to do some intensive and error prone tasks,
then send an email with the results (which is also error prone), on a
schedule.

I was going to write some standard job-processing-in-a-loop kind of
code, with the loop being processed as a cron job (set up in
cron.yaml). That's what Syyncc does. But some bit of my brain kept
grumbling about the inelegance of that approach. You're on a platform
that wants to do it a different way, says my brain (and who am I to
disagree?).

And the cron thing is kind of bad, because it doesn't scale. Let's say
I schedule a job every two minutes. It can get through some fixed
amount of work (maybe 10 jobs?) before it hits its time limit. It can
never do more than that. That's nasty.

People often recommend backends for this kind of work. With them, you
stick jobs on a pull queue and pull them off with the backend. Each
backend can process a limited number of jobs, but you can set backends
to be created automatically in response to workload, which is cool.
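
For reference, the pull side looks roughly like this (a sketch only;
the queue name and payload are made up, and the queue would need
mode: pull in queue.yaml):

  from google.appengine.api import taskqueue

  # Producer: drop a payload onto a pull queue.
  lqueue = taskqueue.Queue('jobs-pull')
  lqueue.add(taskqueue.Task(payload='some job data', method='PULL'))

  # Consumer (e.g. on a backend): lease a batch, process, then delete.
  ltasks = lqueue.lease_tasks(lease_seconds=60, max_tasks=10)
  for ltask in ltasks:
      process(ltask.payload)  # process() stands in for your own work
  lqueue.delete_tasks(ltasks)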

But I'm partial to push queues, which were previously just called Task
Queues. At any point in your code you can schedule a task to run, which
simply comes through as a POST to a url in your app:

  taskqueue.add(url='/dosomething', params={'key': key})

It's a bit clunky, because you need to set up a handler for the url
and implement its post() method.
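
Something like this, roughly (a sketch; the handler class name here is
mine, just to show the shape of it):

  from google.appengine.ext import webapp
  from google.appengine.ext.webapp.util import run_wsgi_app

  class DoSomethingHandler(webapp.RequestHandler):
      def post(self):
          key = self.request.get('key')
          # ...go and do the actual work with key here...

  application = webapp.WSGIApplication([('/dosomething', DoSomethingHandler)])

  def main():
      run_wsgi_app(application)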

Oh wait, no you don't. Nick Johnson wrote the excellent deferred
library, which takes care of the public url and thunks the call from
there into a function or method of your choice. So instead your call
can look like this:


  import logging

  from google.appengine.ext import deferred

  def do_something_expensive(a, b, c=None):
      logging.info("Doing something expensive!")
      # Do your work here

  # Somewhere else
  deferred.defer(do_something_expensive, "Hello, world!", 42, c=True)


That's cool, isn't it!

What's also cool about tasks is that you can delay them, either by
specifying a countdown or an eta. Using a countdown (the number of
seconds before execution) is interesting because it lets you delay
tasks, i.e. spread the work out a bit. But using an eta is really fascinating,
because it lets you schedule work for specific times. So if you need
to schedule an email to go out at midnight, a task with an eta will do
that for you, with no real plumbing required on your part. (Can you do
this with a pull queue? You may be able to use eta to stop tasks
showing up through the lease system before a specified time, I'm not
sure about this.)
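
For example (a sketch; send_report here is a made-up stand-in for the
real work):

  from datetime import datetime, timedelta

  from google.appengine.ext import deferred

  def send_report():
      # stand-in for the real work
      pass

  # countdown: run it 600 seconds (10 minutes) from now
  deferred.defer(send_report, _countdown=600)

  # eta: run it at a specific UTC time, here the coming midnight
  lmidnight = (datetime.utcnow() + timedelta(days=1)).replace(
      hour=0, minute=0, second=0, microsecond=0)
  deferred.defer(send_report, _eta=lmidnight)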

This is all great for performing scheduled background tasks. Except,
what if they fail? Or take a long time to complete? In fact, how can
you report on the status of these tasks? Well, you can't. There's no
way to go in and find out much about the task through any APIs. Even
if there was, you'd probably need custom information suited to the job
at hand anyway.

What I need is an object in the datastore that maps to the task. I
personally prefer an object oriented approach (ok, I'm an old man set
in my ways, yes I know). So, what I'd like is a base object which lets
me set up a task, kick it off, record its progress, and lets me see
afterwards how it went.

So I created the Worker. The Worker is a PolyModel base class that you
can use to do background jobs. You subclass it and provide a job to do
(doExecute()) and, if you want a repeating job, a method for
calculating the next time to run (doCalculateNextRun()). You can also
provide a specific queue name (override GetQueue()) and specify
whether or not it should run immediately (override
ExecuteImmediately()). If ExecuteImmediately() returns False, then on
the first, immediate run it won't call doExecute(), but instead will
call doCalculateNextRun() and reschedule itself.

So for instance, if you want to run a background job immediately (say
send an email), you make this class:

  import logging

  from google.appengine.api import mail

  class SendAnEmailImmediately(Worker):
      def doExecute(self):
          logging.info("Sending an email to Betty")
          lmessage = mail.EmailMessage(
                          sender="a...@example.com",
                          to="be...@example.com",
                          subject="Hi Betty",
                          body="I know you love email!"
                          )
          lmessage.send()

      def doCalculateNextRun(self, aUtcNow, alastDue):
          return None # never reschedule

To kick it off, do this:

  lsender = SendAnEmailImmediately()
  lsender.status = 0
  lsender.enabled = True
  lsender.put()

And what do you get out of that? Well, not only does the email get
sent from a background task, but afterward you'll have a
SendAnEmailImmediately object in the datastore, with these properties:

    lastRunSucceeded = db.BooleanProperty()
    lastRunMessage = db.StringProperty()
    lastRunStartTime = db.DateTimeProperty()
    lastRunFinishTime = db.DateTimeProperty()

which give you information on when it ran and how the run actually
went: did it fail? If so, what errors occurred?
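
So later on you can just load that record back and have a look; a
quick sketch (the query is only one way to find it):

  import logging

  # Grab the most recently created SendAnEmailImmediately worker and
  # see how its last run went.
  lworker = SendAnEmailImmediately.all().order('-createTime').get()
  if lworker:
      if lworker.lastRunSucceeded:
          logging.info("Ran ok, finished at %s" % lworker.lastRunFinishTime)
      else:
          logging.info("Failed: %s" % lworker.lastRunMessage)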

How about a recurring task? Try this one, which sends an email once per hour:

  class SendAnEmailEveryHour(Worker):
      def doExecute(self):
          logging.info("Sending another email to Betty")
          lmessage = mail.EmailMessage(
                          sender="a...@example.com",
                          to="be...@example.com",
                          subject="Hi again Betty",
                          body="Are you feeling loved yet?"
                          )
          lmessage.send()

      def doCalculateNextRun(self, aUtcNow, alastDue):
          if alastDue:
              lbaseDate = alastDue
          else:
              lbaseDate = aUtcNow
          return lbaseDate + timedelta(minutes=60)


and again, kick it off like this:


  lsender = SendAnEmailEveryHour()
  lsender.status = 0
  lsender.enabled = True
  lsender.put()


Ok, that'll work. However, what if we want a record of each run? Then
do it like this instead:

  class SendAnEmailEveryHour2(Worker):
      def doExecute(self):
          lsender = SendAnEmailImmediately()
          lsender.status = 0
          lsender.enabled = True
          lsender.put()

      def doCalculateNextRun(self, aUtcNow, alastDue):
          if alastDue:
              lbaseDate = alastDue
          else:
              lbaseDate = aUtcNow
          return lbaseDate + timedelta(minutes=60)



So now you get a recurring worker kicking off other workers, one per job.


You can see how powerful this is as a simple method of structuring
background jobs!

Ok, hold onto your hats, excuse my n00bish python, and get ready for a
slab of code. Here's the implementation of Worker:

##################################################################
from google.appengine.ext import db
from google.appengine.ext.db import polymodel
import logging
from datetime import datetime
from datetime import timedelta
from google.appengine.ext import deferred
from lib.pytz.gae import pytz
import uuid

class Worker(polymodel.PolyModel):
    nextDue = db.DateTimeProperty()
    enabled = db.BooleanProperty()
    status = db.IntegerProperty() # 0 = ready, 1 = running, 2 = stopped
    lastRunSucceeded = db.BooleanProperty()
    lastRunMessage = db.StringProperty() # only set if the last run failed
    lastRunStartTime = db.DateTimeProperty()
    lastRunFinishTime = db.DateTimeProperty()
    createTime = db.DateTimeProperty(auto_now_add = True)
    taskid = db.StringProperty()

    # override to change queues
    def GetQueue(self):
        return "default"

    # override to do first run in the future
    def ExecuteImmediately(self):
        return True

    # must override to perform work
    def doExecute(self):
        raise NotImplementedError

    # override to tell us when next to run
    def doCalculateNextRun(self, aUtcNow, alastDue):
        raise NotImplementedError

    def Execute(self, aTaskID, aIsFirstRun, **kwargs):
        try:
            #Don't trust depickled self, go reload self
            #Nick Johnson told me not to do this - needs to be fixed
            self = db.get(self.key())
        except db.NotSavedError, ex:
            self = None

        lutcNow = datetime.utcnow()

        if not self:
            logging.warning("eek I am gone! (disappears in a puff of logic)")
        elif not aTaskID:
            logging.warning("No aTaskID, skipping")
        elif aTaskID != self.taskid:
            logging.debug("TaskIDs do not match, skipping")
        elif not self.enabled:
            logging.warning("Disabled, skipping")
        elif self.status != 0:
            logging.warning("Wrong status to execute Worker, status =
%s, skipping" % (self.status))
        elif self.nextDue and self.nextDue > lutcNow:
            logging.debug("Don't run till %s, reschedule..." % (self.nextDue))
            if (self.nextDue - lutcNow) > timedelta(1):
                # don't reschedule more than a day forward
                lresched = lutcNow + timedelta(1) # add a day
            else:
                lresched = self.nextDue

            lqueue = self.GetQueue()

            deferred.defer(
                self.Execute,
                _queue_name=lqueue,
                _eta=lresched,
                aTaskID=self.taskid,
                aIsFirstRun=aIsFirstRun,
            )
        else:
            if (aIsFirstRun and not self.nextDue
                    and not self.ExecuteImmediately()):
                logging.debug("First run, don't execute")
            else:
                logging.debug("We can execute")
                try:
                    self.status = 1 # running
                    self.lastRunStartTime = datetime.utcnow()
                    self.put()

                    logging.debug("Before doExecute()")
                    self.doExecute()
                    logging.debug("After doExecute()")

                    self.status = 0 # ready to run
                    self.lastRunSucceeded = True
                    self.lastRunMessage = None
                except Exception, ex:
                    self.status = 0
                    self.lastRunSucceeded = False
                    self.lastRunMessage = unicode(ex)
                    logging.error(ex)

                self.lastRunFinishTime = datetime.utcnow()

            logging.debug("calculate lnextRun")
            lnextRun = None
            try:
                lutcnow = datetime.utcnow()
                lnextRun = self.doCalculateNextRun(
                    datetime.utcnow(), self.nextDue)
            except Exception, ex:
                logging.error(ex)

            if lnextRun:
                logging.debug("got lnextRun, need to reschedule")
                self.nextDue = lnextRun
                self.status = 0
                self.put()

                lqueue = self.GetQueue()

                if (lnextRun - lutcnow) > timedelta(1):
                    lresched = lutcnow + timedelta(1)
                else:
                    lresched = lnextRun

                if lresched <= lutcnow:
                    # run immediately, no eta provided
                    deferred.defer(
                        self.Execute,
                        _queue_name=lqueue,
                        aTaskID=self.taskid,
                        aIsFirstRun=False
                    )
                else:
                    # schedule future run
                    deferred.defer(
                        self.Execute,
                        _queue_name=lqueue,
                        _eta=lresched,
                        aTaskID=self.taskid,
                        aIsFirstRun=False
                    )
            else:
                logging.debug("no lnextRun, we are finished.")
                self.status = 2
                self.put()

    # Need to override put to kick off the task if enabled is
    # set to True.
    def put(self, **kwargs):
        lneedPut = True

        # first grab a copy of what's currently stored.
        logging.debug("Entered put, new self = %s" % (self))

        loldself = None
        if self.enabled:
            logging.debug("Need to find out if enabled has been newly
set. Load old self from datastore")

            try:
                loldself = self.get(self.key())
            except Exception, ex:
                logging.error(ex)
                loldself = None

        logging.debug("See if newly enabled has changed")
        if self.enabled and (not loldself or not loldself.enabled):
            logging.debug("Newly enabled. Need to schedule self to run")

            self.taskid = unicode(uuid.uuid4())
            logging.debug("taskid == %s" % (self.taskid))

            logging.debug("Now schedule to run immediately")

            #self.nextDue = None
            self.status = 0

            logging.debug("Pre-save")
            super(Worker, self).put(**kwargs)
            lneedPut = False

            lqueue = self.GetQueue()

            # run immediately
            logging.debug("call deferred.defer")
            deferred.defer(
                self.Execute,
                _queue_name=lqueue,
                aTaskID=self.taskid,
                aIsFirstRun=True
            )
        else:
            logging.debug("not newly enabled")

        if lneedPut:
            logging.debug("Do the actual put")
            super(Worker, self).put(**kwargs)
##################################################################

A couple of footnotes before I leave you with this:

1: I've used deferred.defer on an instance method, which has an issue.
Specifically, it has to pickle the whole instance, then unpickle it
when the task is run. That's a little expensive, and it leaves the
running task with a stale copy of the object. So, I have to do this:

  self = db.get(self.key())

to replace the passed-in version of the object with the object from
the datastore.

What would be better would be to make Execute a class method, pass
self.key() to it as a parameter, and load the full instance from the
key on entry to Execute. It's a simple change, but I want to test it
before I change it here. I'm sure people will point out all kinds of
issues, so I'll wait to change it until then.
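
Untested, but what I mean is roughly this (a sketch only; ExecuteByKey
is a name I've just made up, it's not in the class above):

  class Worker(polymodel.PolyModel):
      # ...properties and other methods as above...

      @classmethod
      def ExecuteByKey(cls, aKey, aTaskID, aIsFirstRun):
          # Load a fresh instance from the datastore instead of
          # unpickling a stale copy of the object.
          lworker = db.get(aKey)
          if lworker:
              lworker.Execute(aTaskID, aIsFirstRun)

  # ...and inside put()/Execute(), each defer call passes the key
  # instead of the bound method:
  #
  #   deferred.defer(
  #       Worker.ExecuteByKey,
  #       self.key(),
  #       aTaskID=self.taskid,
  #       aIsFirstRun=True
  #   )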

2: You'd think I'd have some kind of "Go()" method to kick things off,
instead of using an override on put() to detect a change in the
"enabled" property. However, I've been specifically using this in the
context of a REST api, where I don't want to be calling methods. So,
this approach of overriding put() has been just the ticket. To complete
the implementation I should also do some monkey patching of db.put(),
but I haven't needed that yet and it's a minor PITA to do, so it's
left to the reader for now. Actually, this approach of overriding
put() to do work in a REST context is a paradigm I'll explore in
detail in a subsequent post; it goes really well with the REST library
appengine-rest-server.
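
To make that concrete: whatever handles the update only has to save
the entity; something roughly like this (the handler name and the
'key' parameter are made up, just to show the shape):

  from google.appengine.ext import db
  from google.appengine.ext import webapp

  class EnableWorkerHandler(webapp.RequestHandler):
      def post(self):
          # The REST layer (or anything else) just flips enabled and
          # saves; the overridden put() notices and schedules the task.
          lworker = db.get(db.Key(self.request.get('key')))
          lworker.enabled = True
          lworker.status = 0
          lworker.put()  # this is what kicks off the deferred task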

---

-- 
Emlyn

http://my.syyn.cc - Synchronise Google+, Facebook, WordPress and Google
Buzz posts, comments and all.
http://point7.wordpress.com - My blog
Find me on Facebook and Buzz
