Coincidentally, I had to implement almost exactly the same thing
today. I used separate job checkout/checkin transactions, the
serializable isolation level to detect when two checkouts collide,
and the job checkout time to see which jobs are currently running.
The default checkout time is a special date far in the past rather
than NULL, which works better with indexing. This implements a kind
of optimistic locking that raises serialization errors when two
workers check out concurrently. I'm hoping this won't run into
performance problems, because the checkout step is quite fast
compared to the processing itself, although there will be around 40
workers running concurrently in the cluster.
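
For reference, the relevant part of the datasets table looks roughly
like this. It's a simplified sketch rather than the real definition,
and the epoch default is just one way to spell the "date way in the
past":

from datetime import datetime

from sqlalchemy import (MetaData, Table, Column, Integer, Boolean,
                        DateTime, Text)

metadata = MetaData()

# Only the columns the code below touches; the real table has more.
datasets = Table('datasets', metadata,
    Column('id', Integer, primary_key=True),
    Column('data_for_processing', Text),
    Column('result', Text),
    Column('is_processed', Boolean, default=False, nullable=False),
    # Sentinel default far in the past instead of NULL
    Column('last_checkout', DateTime, default=datetime(1970, 1, 1),
           nullable=False),
)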

The code I used is basically this (slightly edited to omit
confidential/superfluous stuff):

from datetime import datetime, timedelta

import sqlalchemy

def check_out_dataset_for_processing():
    for retries in xrange(MAX_RETRIES):
        try:
            session = Session()

            # This is necessary to avoid duplicate checkouts
            session.connection(Dataset).execute(
                "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")

            # Database side CURRENT_TIMESTAMP would be a lot better
            now = datetime.now()
            timeout_horizon = now - timedelta(seconds=TIMEOUT)

            dataset = session.query(Dataset)\
                .filter_by(is_processed=False)\
                .filter(Dataset.last_checkout < timeout_horizon)\
                .first()

            if dataset:
                # If we found something, mark it checked out
                dataset.last_checkout = now
                result = dataset.id, dataset.data_for_processing
            else:
                result = None

            session.commit()

            return result
        except sqlalchemy.exceptions.ProgrammingError, e:
            if e.orig.pgcode != '40001': # Ignore serialization conflicts
                raise
            # Another worker won the race; discard the failed
            # transaction and retry
            session.rollback()
    logger.error('Failed to check out a dataset')

def store_processing_result(dataset_id, processing_result):
    session = Session()

    dataset = session.query(Dataset).filter_by(id=dataset_id).first()

    dataset.result = processing_result
    dataset.is_processed = True

    session.commit()
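
A worker would tie these two together with something like the
following loop. This is only a sketch: process_dataset and
POLL_INTERVAL are placeholders for the real processing code and the
polling delay.

import time

def worker_loop():
    while True:
        checked_out = check_out_dataset_for_processing()
        if checked_out is None:
            # Nothing available (or checkout kept conflicting);
            # back off before polling again
            time.sleep(POLL_INTERVAL)
            continue
        dataset_id, data = checked_out
        result = process_dataset(data)  # placeholder for the real work
        store_processing_result(dataset_id, result)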

In my case duplicate execution is only a performance issue, so when
a worker times out, because it crashed or is just slow, its dataset
is handed to another worker. The code can easily be modified to do
something different on timeout, though. I don't have a separate job
table, and the datasets table has millions of rows, so to get good
performance I used a partial index (support for which is currently
only in trunk):

Index('unprocessed_datasets_idx', datasets.c.last_checkout,
    postgres_where=datasets.c.is_processed == False)

This reduces the job lookup to a simple index lookup.
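
If you want to check that the planner actually uses it, something
like this should do (session is an open Session here, the interval
is arbitrary, and I haven't verified the exact plan output):

plan = session.connection(Dataset).execute(
    "EXPLAIN SELECT id FROM datasets "
    "WHERE is_processed = false "
    "AND last_checkout < now() - interval '1 hour'")
for row in plan:
    print row[0]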

