Here's a common scenario, and I'm looking for the best way to implement it 
with the scheduler.

I want to support a set of background tasks (task1, task2...), where each 
task:
  • processes a queue of items
  • waits a few seconds

It's safe to have task1 and task2 running in parallel, but I cannot have 
two task1s running in parallel, because they would both process the same 
queue of items.
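
For reference, here's a minimal sketch of how the tasks might be defined and 
registered, assuming they live in a model file; process_queue1 and 
process_queue2 are hypothetical helpers that each drain one queue:

from gluon.scheduler import Scheduler

def task1():
    process_queue1()   # hypothetical: handle every pending item in queue 1

def task2():
    process_queue2()   # hypothetical: handle every pending item in queue 2

# registering the functions creates the scheduler_* tables and lets any
# worker started with "python web2py.py -K <app>" pick the tasks up
scheduler = Scheduler(db, dict(task1=task1, task2=task2))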

I found the scheduler supports this nicely with parameters like:

from datetime import timedelta
now = request.now   # assuming this runs where web2py's request is available

db.scheduler_task.insert(function_name='task1',
                         task_name='task1',
                         stop_time=now + timedelta(days=90000),
                         repeats=0,
                         period=10)

I can launch 3 workers, and they coordinate amongst themselves to make sure 
that only one will run the task at a time. Great! This task will last 
forever...

...but now we encounter my problem...

What happens if it crashes, or passes stop_time? Then the task will turn 
off, and the queue is no longer processed. Or what happens if I reset the 
database, or install this code on a new server? It isn't nice if I have to 
re-run the insert function by hand.

So how can I ensure there is always EXACTLY ONE of each task in the 
database?

I tried putting this code into models:

def initialize_task_queue(task_name, period=10):
    # count tasks for this function that are still live in the scheduler
    num_tasks = db((db.scheduler_task.function_name == task_name)
                   & ((db.scheduler_task.status == 'QUEUED')
                      | (db.scheduler_task.status == 'ASSIGNED')
                      | (db.scheduler_task.status == 'RUNNING')
                      | (db.scheduler_task.status == 'ACTIVE'))).count()

    # Add a task if there isn't one already
    if num_tasks < 1:
        db.scheduler_task.insert(function_name=task_name,
                                 task_name=task_name,
                                 stop_time=now + timedelta(days=90000),
                                 repeats=0,
                                 period=period)
        db.commit()

initialize_task_queue('task1')
initialize_task_queue('task2')
initialize_task_queue('task3')

This worked, except it introduces a race condition! If you start three 
web2py processes simultaneously (e.g., three scheduler workers), they will 
insert duplicate tasks:
    process 1: count number of 'task1' tasks
    process 2: count number of 'task1' tasks
    process 1: there are less than 1, insert a 'task1' task
    process 2: there are less than 1, insert a 'task1' task

I was counting on PostgreSQL's MVCC transaction support to make each of 
these check-then-insert sequences atomic. Unfortunately, that's not how it 
works, and I don't understand why. As a workaround, I'm currently wrapping 
the code inside "initialize_task_queue" with a PostgreSQL advisory lock:

    if not db.executesql('select pg_try_advisory_lock(1);')[0][0]:
        return

    ... count tasks, add one if needed ...

    db.executesql('select pg_advisory_unlock(1);')
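
Put together, the workaround looks roughly like this (a sketch, assuming 
now and the period are defined as in the snippets above; the unlock sits in 
a finally block so the session-level lock is released even if the count or 
insert raises):

def initialize_task_queue(task_name, period=10):
    # bail out if another process already holds the advisory lock
    if not db.executesql('select pg_try_advisory_lock(1);')[0][0]:
        return
    try:
        # same status filter as above, written with belongs()
        num_tasks = db((db.scheduler_task.function_name == task_name)
                       & (db.scheduler_task.status.belongs(
                           ('QUEUED', 'ASSIGNED', 'RUNNING', 'ACTIVE')))).count()
        if num_tasks < 1:
            db.scheduler_task.insert(function_name=task_name,
                                     task_name=task_name,
                                     stop_time=now + timedelta(days=90000),
                                     repeats=0,
                                     period=period)
            db.commit()  # commit before releasing the lock
    finally:
        db.executesql('select pg_advisory_unlock(1);')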

But this sucks.
What's a better way to ensure there is always exactly one 
infinitely-repeating task of each kind in the scheduler? Or... am I using 
the wrong design entirely?
