[sqlalchemy] Re: non-blocking row locks?
On Oct 3, 2007, at 10:40 AM, sacha wrote:

> Hi, I am attempting to implement a job queue in a Postgres database. There is a simple job table in which each row represents a job to be run. There are multiple dispatcher threads that pull jobs off the queue and run them. I need concurrency control to prevent multiple threads from dispatching the same job.
>
> I think the most elegant place to do this is in the database itself, using row locking (rather than with OS or application concurrency-control mechanisms, which would restrict the code to a single process or host). I can get and lock the next job using:
>
>     job = session.query(Job).with_lockmode('update').first()
>
> However, another thread running the same query would presumably block until the first releases the lock. Is there a non-blocking update mode, such that the second thread returns immediately (and I can look for a different job), or some way for the query to exclude locked jobs?
>
> Apologies if this is a SQLAlchemy 101 (or SQL 101) question; I'm new to it all and I've not been able to find answers via FAQs/Google.

I usually take a low-tech approach to this problem and update the rows I want to process with a status flag, such as IN PROGRESS. Subsequent queries for job rows by other threads look for rows whose status is QUEUED, thereby ignoring the IN PROGRESS rows. That way nothing is locked outside the span of a single short-running transaction, i.e.:

    BEGIN;
    SELECT * FROM jobs WHERE status='QUEUED' FOR UPDATE;
    UPDATE jobs SET status='IN PROGRESS' WHERE status='QUEUED';
    COMMIT;

If you want just one job, adjust the WHERE criterion of the UPDATE statement accordingly to match the job(s) you are actually going to process (or just use an ORM flush if you're using the ORM). With Postgres 8.2 (and the latest trunk of 0.4) you can even do a RETURNING and get the whole thing in one query:

    UPDATE jobs SET status='IN PROGRESS' WHERE status='QUEUED' RETURNING *;

When jobs are finished I usually mark them as COMPLETE; that way you get a log of job history as a bonus.
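[Editor's note: for readers who want the status-flag pattern above spelled out end to end, here is a minimal sketch in Python. It is written against a later SQLAlchemy API (create_engine, text, engine.begin()) than the 0.4 trunk discussed in this thread, and the jobs table layout and connection URL are assumptions for illustration, not something from the original posts.]

    from sqlalchemy import create_engine, text

    # Hypothetical connection URL; substitute your own database.
    engine = create_engine("postgresql://user:pass@localhost/jobs_db")

    def claim_one_job():
        """Flip exactly one QUEUED job to IN PROGRESS and return its id, or None."""
        with engine.begin() as conn:  # one short-running transaction
            # Lock a single queued row for the duration of this transaction ...
            row = conn.execute(text(
                "SELECT id FROM jobs WHERE status = 'QUEUED' LIMIT 1 FOR UPDATE"
            )).fetchone()
            if row is None:
                return None
            # ... then flip its status flag so other workers' queries skip it.
            conn.execute(
                text("UPDATE jobs SET status = 'IN PROGRESS' WHERE id = :id"),
                {"id": row[0]},
            )
            return row[0]

    def complete_job(job_id):
        """Mark a finished job COMPLETE, leaving a history of processed jobs."""
        with engine.begin() as conn:
            conn.execute(
                text("UPDATE jobs SET status = 'COMPLETE' WHERE id = :id"),
                {"id": job_id},
            )

[A second worker that selects the same locked row still waits until the first transaction commits, but since that transaction only flips the flag the wait is brief; if the re-checked row no longer matches, the SELECT may come back empty and the worker simply tries again later. On Postgres 8.2+ the UPDATE ... RETURNING form shown above collapses the two statements into one.]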
[sqlalchemy] Re: non-blocking row locks?
Coincidentally, I had to implement almost exactly the same thing today. I used separate job checkout/check-in transactions, the serializable isolation level to find out when there is a collision, and a job checkout time to see which jobs are currently running. By default the checkout time is a special date far in the past rather than NULL, to make it work better with indexing. This implements a kind of optimistic locking that raises serialization errors when two workers check out concurrently. I'm hoping this won't hit performance problems, because the checkout is quite fast compared to the processing, although there will be around 40 workers running concurrently in the cluster.

The code I used is basically this (slightly edited to omit confidential/superfluous stuff):

    import sqlalchemy
    from datetime import datetime, timedelta

    # Session, Dataset, MAX_RETRIES, TIMEOUT and logger are defined elsewhere
    # in the application and omitted here.

    def check_out_dataset_for_processing():
        for retries in xrange(MAX_RETRIES):
            try:
                session = Session()
                # This is necessary to avoid duplicate checkouts
                session.connection(Dataset).execute(
                    "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
                # A database-side CURRENT_TIMESTAMP would be a lot better
                now = datetime.now()
                timeout_horizon = now - timedelta(seconds=TIMEOUT)
                dataset = session.query(Dataset).filter_by(is_processed=False)\
                    .filter(Dataset.last_checkout < timeout_horizon).first()
                if dataset:
                    # If we found something, mark it checked out
                    dataset.last_checkout = now
                    result = dataset.id, dataset.data_for_processing
                else:
                    result = None
                session.commit()
                return result
            except sqlalchemy.exceptions.ProgrammingError, e:
                if e.orig.pgcode != '40001':  # ignore serialization conflicts
                    raise
        logger.error('Failed to check out a dataset')

    def store_processing_result(dataset_id, processing_result):
        session = Session()
        dataset = session.query(Dataset).filter_by(id=dataset_id).first()
        dataset.result = processing_result
        dataset.is_processed = True
        session.commit()

In my case duplicate execution is only a performance issue, so when a worker times out (due to crashing or just being slow) its dataset is handed to another worker. The code can easily be modified to do something different on timeout.

I don't have a separate job table, and the datasets table is millions of rows, so to get good performance I used a partial index (currently only in trunk):

    Index('unprocessed_datasets_idx', datasets.c.last_checkout,
          postgres_where=datasets.c.is_processed == False)

This reduces the job lookup to a simple index lookup.
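[Editor's note: to show how the two functions above might fit together in a worker process, here is a minimal usage sketch; process_data and POLL_INTERVAL are hypothetical names introduced for illustration and are not part of the original post.]

    import time

    POLL_INTERVAL = 5  # seconds to wait when nothing is available (assumed value)

    def worker_loop():
        while True:
            checked_out = check_out_dataset_for_processing()
            if checked_out is None:
                # Nothing to do right now (or every retry hit a serialization
                # conflict); back off briefly before polling again.
                time.sleep(POLL_INTERVAL)
                continue
            dataset_id, data = checked_out
            result = process_data(data)  # hypothetical processing step
            store_processing_result(dataset_id, result)

[Because check_out_dataset_for_processing commits before any processing starts, a worker that crashes mid-job simply leaves last_checkout set; once TIMEOUT has elapsed the dataset becomes eligible again and another worker picks it up, matching the timeout behaviour described above.]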