Coincidentally, I had to implement almost exactly the same thing today. I used separate job checkout/check-in transactions, the serializable isolation level to detect collisions, and the job checkout time to see which jobs are currently running. By default the checkout time is a special date far in the past instead of NULL, to make it play better with indexing. This implements a kind of optimistic locking that throws serialization errors when two workers check out concurrently. I'm hoping this won't hit performance problems, because the checkout is quite fast compared to the processing, although there will be around 40 workers running concurrently in the cluster.
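
For reference, the table behind this looks roughly like the sketch below. This is only an illustration of the "date far in the past instead of NULL" idea: the column names come from the code further down, but the column types and the DISTANT_PAST value are my assumptions, not necessarily what the real schema uses.

from datetime import datetime
from sqlalchemy import (MetaData, Table, Column, Integer, Boolean,
                        DateTime, Text)

metadata = MetaData()

# "Never checked out" is represented by a date far in the past rather than
# NULL, so "last_checkout < timeout_horizon" also matches fresh rows and the
# lookup stays a plain index range scan.
DISTANT_PAST = datetime(1970, 1, 1)  # assumed sentinel value

datasets = Table('datasets', metadata,
    Column('id', Integer, primary_key=True),
    Column('data_for_processing', Text),   # assumed type for the payload
    Column('result', Text),                # filled in at check-in
    Column('is_processed', Boolean, default=False, nullable=False),
    Column('last_checkout', DateTime, default=DISTANT_PAST, nullable=False),
)
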
The code I used is basically this (slightly edited to omit confidential/superfluous stuff):

def check_out_dataset_for_processing():
    for retries in xrange(MAX_RETRIES):
        try:
            session = Session()
            # Serializable isolation is necessary to avoid duplicate checkouts
            session.connection(Dataset).execute(
                "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
            # A database-side CURRENT_TIMESTAMP would be a lot better
            now = datetime.now()
            timeout_horizon = now - timedelta(seconds=TIMEOUT)
            dataset = session.query(Dataset).filter_by(is_processed=False)\
                .filter(Dataset.last_checkout < timeout_horizon).first()
            if dataset:
                # Found something: mark it as checked out
                dataset.last_checkout = now
                result = dataset.id, dataset.data_for_processing
            else:
                result = None
            session.commit()
            return result
        except sqlalchemy.exceptions.ProgrammingError, e:
            if e.orig.pgcode != '40001':
                raise
            # Serialization conflict: another worker got there first, so
            # roll back and retry
            session.rollback()
    logger.error('Failed to checkout a dataset')

def store_processing_result(dataset_id, processing_result):
    session = Session()
    dataset = session.query(Dataset).filter_by(id=dataset_id).first()
    dataset.result = processing_result
    dataset.is_processed = True
    session.commit()

In my case duplicate execution is only a performance issue, so when a worker times out (because it crashed or is just slow) its dataset is simply handed to another worker. The code can easily be modified to do something different on timeout.

I don't have a separate job table, and the datasets table has millions of rows, so to get good performance I used a partial index (currently only in trunk):

Index('unprocessed_datasets_idx', datasets.c.last_checkout,
      postgres_where=datasets.c.is_processed == False)

This reduces the job lookup to a simple index lookup.
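
For completeness, the worker side just alternates between the two calls above. A minimal sketch of that loop follows; process_dataset and POLL_INTERVAL are placeholders of mine, not part of the real code:

import time

POLL_INTERVAL = 10  # seconds between polls when nothing is eligible; placeholder

def worker_loop():
    while True:
        checked_out = check_out_dataset_for_processing()
        if checked_out is None:
            # Nothing eligible (or checkout kept conflicting): wait and retry
            time.sleep(POLL_INTERVAL)
            continue
        dataset_id, data = checked_out
        result = process_dataset(data)   # placeholder for the actual work
        store_processing_result(dataset_id, result)

Since checkout and check-in are separate transactions, a worker that dies mid-processing never holds a lock; its dataset just becomes eligible again once last_checkout falls behind the timeout horizon.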