[sqlalchemy] Re: non-blocking row locks?

2007-10-03 Thread Michael Bayer


On Oct 3, 2007, at 10:40 AM, sacha wrote:


 Hi,

 I am attempting to implement a job queue in a postgres database. There
 is a simple job table in which each row represents a job to be run.

 There are multiple dispatcher threads that pull jobs off the queue and
 run them. I need concurrency control to prevent multiple threads from
 dispatching the same job. I think the most elegant place to do this is
 in the database itself using row locking (rather than with OS or
 application concurrency control mechanisms, which would restrict the
 code to a single process or host).

 I can get and lock the next job using

   job = session.query(Job).with_lockmode('update').first()

 However, another thread running the same query would presumably block
 until the first releases the lock.

 Is there a non-blocking update mode, such that the second thread
 returns immediately (and I can look for a different job), or some way
 for the query to exclude locked jobs?

 Apologies if this is a sqlalchemy 101 (or SQL 101) question; I'm new
 to it all and I've not been able to find answers via FAQs/Google.

I usually take a low-tech approach to this problem and update the
rows which I want to process with a status flag, such as IN
PROGRESS.  Subsequent queries for job rows by other threads look only
for rows which have QUEUED as their status flag, thereby ignoring
the IN PROGRESS rows.  That way nothing is locked outside the span
of a single short-running transaction, i.e.

BEGIN
SELECT * FROM jobs WHERE status='QUEUED' FOR UPDATE
UPDATE jobs SET status='IN PROGRESS' WHERE status='QUEUED'
COMMIT

If you want just one job, then update the WHERE criterion of the
UPDATE statement accordingly to match the job(s) you are actually
going to process (or just use an ORM flush if you're using the ORM).
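
For example, a minimal sketch of that pattern through the ORM might
look like the following.  It assumes a mapped Job class with a status
column and a transactional Session; those names are illustrative, not
from the original post:

def claim_next_job():
    session = Session()
    try:
        # lock one QUEUED row so no other worker grabs it mid-transaction
        job = session.query(Job).filter_by(status='QUEUED')\
                     .with_lockmode('update').first()
        if job is not None:
            job.status = 'IN PROGRESS'   # flag it while we hold the lock
        session.commit()                 # row lock is released here
        return job
    except:
        session.rollback()
        raise

Because the transaction is short, a second thread issuing the same
query blocks only briefly; once the first thread commits, the
re-evaluated WHERE clause no longer matches the now IN PROGRESS row.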

With Postgres 8.2 (and the latest trunk of 0.4) you can even do a
RETURNING and get the whole thing in one query:

UPDATE jobs SET status='IN PROGRESS' WHERE status='QUEUED' RETURNING *

When jobs are finished I usually mark them as COMPLETE; that way you
get a log of job history as a bonus.
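
A rough sketch of driving that single-query version from SQLAlchemy
follows; the engine URL, the run_job() callable and the jobs table's
id column are assumptions for illustration, not part of the post:

from sqlalchemy import create_engine, text

engine = create_engine('postgres://user:pass@localhost/jobsdb')
conn = engine.connect()

# claim all queued jobs and get the claimed rows back in one round trip
trans = conn.begin()
claimed = conn.execute(text(
    "UPDATE jobs SET status='IN PROGRESS' "
    "WHERE status='QUEUED' RETURNING *")).fetchall()
trans.commit()

for row in claimed:
    run_job(row)   # hypothetical job runner
    conn.execute(text(
        "UPDATE jobs SET status='COMPLETE' WHERE id=:id"), id=row['id'])

conn.close()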











[sqlalchemy] Re: non-blocking row locks?

2007-10-03 Thread Ants Aasma

Coincidentally I had to implement almost exactly the same thing today.
I used separate job checkout/checkin transactions, a serializable
isolation level to detect collisions, and the job checkout time to see
which jobs are currently running. By default the checkout time is a
special date way in the past instead of NULL, to make it work better
with indexing. This implements a kind of optimistic locking that
throws serialization errors when two workers check out concurrently.
I'm hoping that this won't hit performance problems, because the
checkout process is quite fast compared to the processing, although
there will be around 40 workers running concurrently in the cluster.

The code I used is basically this (slightly edited to omit
confidential/superfluous stuff):

import sqlalchemy
from datetime import datetime, timedelta

def check_out_dataset_for_processing():
    for retries in xrange(MAX_RETRIES):
        try:
            session = Session()

            # This is necessary to avoid duplicate checkouts
            session.connection(Dataset).execute(
                "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")

            # Database side CURRENT_TIMESTAMP would be a lot better
            now = datetime.now()
            timeout_horizon = now - timedelta(seconds=TIMEOUT)

            dataset = session.query(Dataset).filter_by(is_processed=False)\
                .filter(Dataset.last_checkout < timeout_horizon).first()

            if dataset:
                # If found something mark it checked out
                dataset.last_checkout = now
                result = dataset.id, dataset.data_for_processing
            else:
                result = None

            session.commit()

            return result
        except sqlalchemy.exceptions.ProgrammingError, e:
            if e.orig.pgcode != '40001':  # Ignore serialization conflicts
                raise
    logger.error('Failed to checkout a dataset')

def store_processing_result(dataset_id, processing_result):
    session = Session()

    dataset = session.query(Dataset).filter_by(id=dataset_id).first()

    dataset.result = processing_result
    dataset.is_processed = True

    session.commit()

In my case duplicate execution is only a performance issue, so when a
worker times out due to crashing or just being slow, that dataset is
handed to another worker. The code can easily be modified to do
something different in case of a timeout, though. I don't have a
separate job table, and the datasets table has millions of rows, so to
get good performance I used a partial index (currently only in trunk):

Index('unprocessed_datasets_idx', datasets.c.last_checkout,
      postgres_where=datasets.c.is_processed == False)

This reduces the job lookup to a simple index lookup.
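
For completeness, a hypothetical worker loop tying the two functions
together could look roughly like this; the endless loop,
process_dataset() and POLL_INTERVAL are assumptions, not part of the
code above:

import time

def worker_loop():
    while True:
        checked_out = check_out_dataset_for_processing()
        if checked_out is None:
            time.sleep(POLL_INTERVAL)   # nothing to check out right now
            continue
        dataset_id, data = checked_out
        result = process_dataset(data)  # hypothetical processing step
        store_processing_result(dataset_id, result)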

