Tom Lane wrote:
> Boszormenyi Zoltan <z...@cybertec.at> writes:
>   
>> Tom Lane wrote:
>>     
>>> If this patch is touching those parts of relcache.c, it probably needs
>>> rethinking.
>>>       
>
>   
>> What I did there is to check the return value of LockRelationOid()
>> and also elog(PANIC) if the lock wasn't available.
>> Does it need rethinking?
>>     
>
> Yes.  What you have done is to change all the LockSomething primitives
> from return void to return bool and thereby require all call sites to
> check their results.  This is a bad idea.

Okay, can you tell me how I can get the relation name
out of the xid in XactLockTableWait()? There are several
call sites of this function, and your idea of putting the
error-reporting code into the LockSomething() functions to preserve
the API results in strange error messages, like

ERROR:  could not obtain lock on transaction with ID 658

when I want to UPDATE a tuple in one session while
this and another session both hold a FOR SHARE lock
on said tuple.
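
One possible way around the generic message, sketched here as a standalone
program rather than as backend code: the caller that knows the relation
records it before calling the low-level wait, and the low-level function
appends that context when it reports the failure. This is loosely modeled on
the idea behind PostgreSQL's error context callbacks; every name below
(LockWaitContext, lock_wait_context_stack, format_xact_lock_failure,
update_tuple_sketch) is hypothetical and exists only for illustration.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for a caller-supplied context stack. */
typedef struct LockWaitContext
{
	struct LockWaitContext *previous;	/* enclosing context, if any */
	const char *relname;				/* relation the caller works on */
} LockWaitContext;

LockWaitContext *lock_wait_context_stack = NULL;

/*
 * Low-level reporter: it knows only the xid.  It writes the basic message
 * into buf and appends the caller's context if one has been pushed.
 */
void
format_xact_lock_failure(unsigned int xid, char *buf, size_t buflen)
{
	size_t		used;

	assert(buf != NULL && buflen > 0);
	snprintf(buf, buflen,
			 "could not obtain lock on transaction with ID %u", xid);

	if (lock_wait_context_stack != NULL)
	{
		used = strlen(buf);		/* always < buflen: snprintf terminates */
		snprintf(buf + used, buflen - used,
				 " (while locking a row in relation \"%s\")",
				 lock_wait_context_stack->relname);
	}
}

/*
 * Caller that knows the relation: push context, run the low-level code,
 * pop the context again.
 */
void
update_tuple_sketch(const char *relname, unsigned int xid,
					char *buf, size_t buflen)
{
	LockWaitContext ctx;

	ctx.previous = lock_wait_context_stack;
	ctx.relname = relname;
	lock_wait_context_stack = &ctx;

	format_xact_lock_failure(xid, buf, buflen);

	lock_wait_context_stack = ctx.previous;
}
```

With no context pushed, the message is the bare "transaction with ID" text
shown above; called through update_tuple_sketch(), the relation name is
appended. Whether something along these lines is acceptable in lmgr.c is an
open question; the sketch only shows that the API of XactLockTableWait()
would not have to change for it.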

>   There is no way that you can
> ensure that all third-party modules will make the same change, meaning
> that accepting this patch will certainly introduce nasty, hard to
> reproduce bugs.  And what's the advantage?  The callers are all going
> to throw errors anyway, so you might as well do that within the Lock
> function and avoid the system-wide API change.
>
> I think this is a big patch with a small patch struggling to get out.
>   

Your smaller patch is attached, with the above strangeness. :-)
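
A note on the timeout arithmetic in the attached patch: the POSIX and SysV
variants of PGSemaphoreTimedLock() split lock_timeout (milliseconds) into a
struct timespec. The sketch below shows that split and, in addition, how an
absolute deadline could be derived from it. The helper names
(timeout_ms_to_timespec, deadline_from) are hypothetical and not part of the
patch.

```c
#include <assert.h>
#include <time.h>

/*
 * Split a millisecond timeout into seconds and nanoseconds, the same
 * arithmetic the patch uses for LockTimeout.
 */
struct timespec
timeout_ms_to_timespec(int timeout_ms)
{
	struct timespec ts;

	assert(timeout_ms >= 0);
	ts.tv_sec = timeout_ms / 1000;
	ts.tv_nsec = (long) (timeout_ms % 1000) * 1000000L;
	return ts;
}

/*
 * Add a relative millisecond timeout to a given point in time, keeping
 * tv_nsec normalized to [0, 1000000000).
 */
struct timespec
deadline_from(struct timespec now, int timeout_ms)
{
	struct timespec rel = timeout_ms_to_timespec(timeout_ms);
	struct timespec deadline;

	deadline.tv_sec = now.tv_sec + rel.tv_sec;
	deadline.tv_nsec = now.tv_nsec + rel.tv_nsec;
	if (deadline.tv_nsec >= 1000000000L)
	{
		deadline.tv_sec += 1;
		deadline.tv_nsec -= 1000000000L;
	}
	return deadline;
}
```

The distinction matters because, as far as I can tell, POSIX specifies the
timeout argument of sem_timedwait() as an absolute time, while semtimedop()
(which is not part of POSIX) takes a relative interval. So the relative value
fits the SysV code path, whereas the POSIX path would probably need
something like clock_gettime(CLOCK_REALTIME, &now) followed by
deadline_from(now, LockTimeout).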

Best regards,
Zoltán Böszörményi

-- 
Bible has answers for everything. Proof:
"But let your communication be, Yea, yea; Nay, nay: for whatsoever is more
than these cometh of evil." (Matthew 5:37) - basics of digital technology.
"May your kingdom come" - superficial description of plate tectonics

----------------------------------
Zoltán Böszörményi
Cybertec Schönig & Schönig GmbH
http://www.postgresql.at/

diff -dcrpN pgsql.orig/doc/src/sgml/config.sgml pgsql.5/doc/src/sgml/config.sgml
*** pgsql.orig/doc/src/sgml/config.sgml	2010-01-06 08:43:22.000000000 +0100
--- pgsql.5/doc/src/sgml/config.sgml	2010-01-13 18:59:19.000000000 +0100
*************** COPY postgres_log FROM '/full/path/to/lo
*** 4191,4196 ****
--- 4191,4220 ----
        </listitem>
       </varlistentry>
  
+      <varlistentry id="guc-lock-timeout" xreflabel="lock_timeout">
+       <term><varname>lock_timeout</varname> (<type>integer</type>)</term>
+       <indexterm>
+        <primary><varname>lock_timeout</> configuration parameter</primary>
+       </indexterm>
+       <listitem>
+        <para>
+         Abort any statement that has to wait longer than the specified number
+         of milliseconds while trying to acquire a heavy-weight lock (e.g. on
+         rows, pages, tables, indexes or other objects). The limit applies
+         separately to each lock the statement waits for.
+         If <varname>log_min_error_statement</> is set to <literal>ERROR</> or lower,
+         the statement that timed out will also be logged. A value of zero
+         (the default) turns off the limitation.
+        </para>
+ 
+        <para>
+         Setting <varname>lock_timeout</> in
+         <filename>postgresql.conf</> is not recommended because it
+         affects all sessions.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
       <varlistentry id="guc-vacuum-freeze-table-age" xreflabel="vacuum_freeze_table_age">
        <term><varname>vacuum_freeze_table_age</varname> (<type>integer</type>)</term>
        <indexterm>
diff -dcrpN pgsql.orig/doc/src/sgml/ref/lock.sgml pgsql.5/doc/src/sgml/ref/lock.sgml
*** pgsql.orig/doc/src/sgml/ref/lock.sgml	2009-09-18 08:26:40.000000000 +0200
--- pgsql.5/doc/src/sgml/ref/lock.sgml	2010-01-13 18:59:19.000000000 +0100
*************** LOCK [ TABLE ] [ ONLY ] <replaceable cla
*** 39,46 ****
     <literal>NOWAIT</literal> is specified, <command>LOCK
     TABLE</command> does not wait to acquire the desired lock: if it
     cannot be acquired immediately, the command is aborted and an
!    error is emitted.  Once obtained, the lock is held for the
!    remainder of the current transaction.  (There is no <command>UNLOCK
     TABLE</command> command; locks are always released at transaction
     end.)
    </para>
--- 39,49 ----
     <literal>NOWAIT</literal> is specified, <command>LOCK
     TABLE</command> does not wait to acquire the desired lock: if it
     cannot be acquired immediately, the command is aborted and an
!    error is emitted.  If <varname>lock_timeout</varname> is set to a value
!    greater than 0 and the lock cannot be acquired within that many
!    milliseconds, the command is aborted and an error is emitted.
!    Once obtained, the lock is held for the remainder of
!    the current transaction.  (There is no <command>UNLOCK
     TABLE</command> command; locks are always released at transaction
     end.)
    </para>
diff -dcrpN pgsql.orig/doc/src/sgml/ref/select.sgml pgsql.5/doc/src/sgml/ref/select.sgml
*** pgsql.orig/doc/src/sgml/ref/select.sgml	2009-10-29 15:23:52.000000000 +0100
--- pgsql.5/doc/src/sgml/ref/select.sgml	2010-01-13 18:59:19.000000000 +0100
*************** FOR SHARE [ OF <replaceable class="param
*** 1121,1126 ****
--- 1121,1134 ----
     </para>
  
     <para>
+     If the <literal>NOWAIT</> option is not specified and
+     <varname>lock_timeout</varname> is set to a value greater than 0, the
+     command reports an error if it has to wait for a lock longer than that
+     many milliseconds, rather than waiting indefinitely. The note in the
+     previous paragraph applies to <varname>lock_timeout</varname>, too.
+    </para>
+ 
+    <para>
      If specific tables are named in <literal>FOR UPDATE</literal>
      or <literal>FOR SHARE</literal>,
      then only rows coming from those tables are locked; any other
diff -dcrpN pgsql.orig/src/backend/access/transam/multixact.c pgsql.5/src/backend/access/transam/multixact.c
*** pgsql.orig/src/backend/access/transam/multixact.c	2010-01-03 12:54:03.000000000 +0100
--- pgsql.5/src/backend/access/transam/multixact.c	2010-01-13 19:26:27.000000000 +0100
***************
*** 58,63 ****
--- 58,64 ----
  #include "pg_trace.h"
  #include "storage/backendid.h"
  #include "storage/lmgr.h"
+ #include "storage/proc.h"
  #include "storage/procarray.h"
  #include "utils/builtins.h"
  #include "utils/memutils.h"
diff -dcrpN pgsql.orig/src/backend/port/posix_sema.c pgsql.5/src/backend/port/posix_sema.c
*** pgsql.orig/src/backend/port/posix_sema.c	2010-01-03 12:54:22.000000000 +0100
--- pgsql.5/src/backend/port/posix_sema.c	2010-01-13 18:59:19.000000000 +0100
***************
*** 24,29 ****
--- 24,30 ----
  #include "miscadmin.h"
  #include "storage/ipc.h"
  #include "storage/pg_sema.h"
+ #include "storage/proc.h" /* for LockTimeout */
  
  
  #ifdef USE_NAMED_POSIX_SEMAPHORES
*************** PGSemaphoreTryLock(PGSemaphore sema)
*** 313,315 ****
--- 314,358 ----
  
  	return true;
  }
+ 
+ /*
+  * PGSemaphoreTimedLock
+  *
+  * Lock a semaphore, waiting no longer than lock_timeout (0 means no limit)
+  */
+ bool
+ PGSemaphoreTimedLock(PGSemaphore sema, bool interruptOK)
+ {
+ 	int		errStatus;
+ 	struct timespec	timeout;
+ 
+ 	/*
+ 	 * See notes in sysv_sema.c's implementation of PGSemaphoreLock. Just as
+ 	 * that code does for semop(), we handle both the case where sem_wait()
+ 	 * returns errno == EINTR after a signal, and the case where it just keeps
+ 	 * waiting.
+ 	 */
+ 	do
+ 	{
+ 		ImmediateInterruptOK = interruptOK;
+ 		CHECK_FOR_INTERRUPTS();
+ 		if (LockTimeout)
+ 		{
+ 			timeout.tv_sec = LockTimeout / 1000;
+ 			timeout.tv_nsec = (LockTimeout % 1000) * 1000000;
+ 			errStatus = sem_timedwait(PG_SEM_REF(sema), &timeout);
+ 		}
+ 		else
+ 			errStatus = sem_wait(PG_SEM_REF(sema));
+ 		ImmediateInterruptOK = false;
+ 	} while (errStatus < 0 && errno == EINTR);
+ 
+ 	if (errStatus < 0)
+ 	{
+ 		if (errno == ETIMEDOUT)
+ 			return false;		/* failed to lock it */
+ 		/* Otherwise we got trouble */
+ 		elog(FATAL, "sem_wait failed: %m");
+ 	}
+ 	return true;
+ }
diff -dcrpN pgsql.orig/src/backend/port/sysv_sema.c pgsql.5/src/backend/port/sysv_sema.c
*** pgsql.orig/src/backend/port/sysv_sema.c	2010-01-03 12:54:22.000000000 +0100
--- pgsql.5/src/backend/port/sysv_sema.c	2010-01-13 18:59:19.000000000 +0100
***************
*** 30,35 ****
--- 30,36 ----
  #include "miscadmin.h"
  #include "storage/ipc.h"
  #include "storage/pg_sema.h"
+ #include "storage/proc.h" /* for LockTimeout */
  
  
  #ifndef HAVE_UNION_SEMUN
*************** PGSemaphoreTryLock(PGSemaphore sema)
*** 497,499 ****
--- 498,589 ----
  
  	return true;
  }
+ 
+ /*
+  * PGSemaphoreTimedLock
+  *
+  * Lock a semaphore, waiting no longer than lock_timeout (0 means no limit)
+  */
+ bool
+ PGSemaphoreTimedLock(PGSemaphore sema, bool interruptOK)
+ {
+ 	int			errStatus;
+ 	struct sembuf sops;
+ 	struct timespec timeout;
+ 
+ 	sops.sem_op = -1;			/* decrement */
+ 	sops.sem_flg = 0;
+ 	sops.sem_num = sema->semNum;
+ 
+ 	/*
+ 	 * Note: if errStatus is -1 and errno == EINTR then it means we returned
+ 	 * from the operation prematurely because we were sent a signal.  So we
+ 	 * try and lock the semaphore again.
+ 	 *
+ 	 * Each time around the loop, we check for a cancel/die interrupt.  On
+ 	 * some platforms, if such an interrupt comes in while we are waiting, it
+ 	 * will cause the semop() call to exit with errno == EINTR, allowing us to
+ 	 * service the interrupt (if not in a critical section already) during the
+ 	 * next loop iteration.
+ 	 *
+ 	 * Once we acquire the lock, we do NOT check for an interrupt before
+ 	 * returning.  The caller needs to be able to record ownership of the lock
+ 	 * before any interrupt can be accepted.
+ 	 *
+ 	 * There is a window of a few instructions between CHECK_FOR_INTERRUPTS
+ 	 * and entering the semop() call.  If a cancel/die interrupt occurs in
+ 	 * that window, we would fail to notice it until after we acquire the lock
+ 	 * (or get another interrupt to escape the semop()).  We can avoid this
+ 	 * problem by temporarily setting ImmediateInterruptOK to true before we
+ 	 * do CHECK_FOR_INTERRUPTS; then, a die() interrupt in this interval will
+ 	 * execute directly.  However, there is a huge pitfall: there is another
+ 	 * window of a few instructions after the semop() before we are able to
+ 	 * reset ImmediateInterruptOK.  If an interrupt occurs then, we'll lose
+ 	 * control, which means that the lock has been acquired but our caller did
+ 	 * not get a chance to record the fact. Therefore, we only set
+ 	 * ImmediateInterruptOK if the caller tells us it's OK to do so, ie, the
+ 	 * caller does not need to record acquiring the lock.  (This is currently
+ 	 * true for lockmanager locks, since the process that granted us the lock
+ 	 * did all the necessary state updates. It's not true for SysV semaphores
+ 	 * used to implement LW locks or emulate spinlocks --- but the wait time
+ 	 * for such locks should not be very long, anyway.)
+ 	 *
+ 	 * On some platforms, signals marked SA_RESTART (which is most, for us)
+ 	 * will not interrupt the semop(); it will just keep waiting.  Therefore
+ 	 * it's necessary for cancel/die interrupts to be serviced directly by the
+ 	 * signal handler.  On these platforms the behavior is really the same
+ 	 * whether the signal arrives just before the semop() begins, or while it
+ 	 * is waiting.  The loop on EINTR is thus important only for other types
+ 	 * of interrupts.
+ 	 */
+ 	do
+ 	{
+ 		ImmediateInterruptOK = interruptOK;
+ 		CHECK_FOR_INTERRUPTS();
+ 		if (LockTimeout)
+ 		{
+ 			timeout.tv_sec = LockTimeout / 1000;
+ 			timeout.tv_nsec = (LockTimeout % 1000) * 1000000;
+ 			errStatus = semtimedop(sema->semId, &sops, 1, &timeout);
+ 		}
+ 		else
+ 			errStatus = semop(sema->semId, &sops, 1);
+ 		ImmediateInterruptOK = false;
+ 	} while (errStatus < 0 && errno == EINTR);
+ 
+ 	if (errStatus < 0)
+ 	{
+ 		/* Expect EAGAIN or EWOULDBLOCK (platform-dependent) */
+ #ifdef EAGAIN
+ 		if (errno == EAGAIN)
+ 			return false;		/* failed to lock it */
+ #endif
+ #if defined(EWOULDBLOCK) && (!defined(EAGAIN) || (EWOULDBLOCK != EAGAIN))
+ 		if (errno == EWOULDBLOCK)
+ 			return false;		/* failed to lock it */
+ #endif
+ 		/* Otherwise we got trouble */
+ 		elog(FATAL, "semop(id=%d) failed: %m", sema->semId);
+ 	}
+ 	return true;
+ }
diff -dcrpN pgsql.orig/src/backend/port/win32_sema.c pgsql.5/src/backend/port/win32_sema.c
*** pgsql.orig/src/backend/port/win32_sema.c	2010-01-03 12:54:22.000000000 +0100
--- pgsql.5/src/backend/port/win32_sema.c	2010-01-13 18:59:19.000000000 +0100
***************
*** 16,21 ****
--- 16,22 ----
  #include "miscadmin.h"
  #include "storage/ipc.h"
  #include "storage/pg_sema.h"
+ #include "storage/proc.h" /* for LockTimeout */
  
  static HANDLE *mySemSet;		/* IDs of sema sets acquired so far */
  static int	numSems;			/* number of sema sets acquired so far */
*************** PGSemaphoreTryLock(PGSemaphore sema)
*** 205,207 ****
--- 206,266 ----
  	/* keep compiler quiet */
  	return false;
  }
+ 
+ /*
+  * PGSemaphoreTimedLock
+  *
+  * Lock a semaphore, waiting no longer than lock_timeout (0 means no limit).
+  * Serve the interrupt if interruptOK is true.
+  */
+ bool
+ PGSemaphoreTimedLock(PGSemaphore sema, bool interruptOK)
+ {
+ 	DWORD		ret;
+ 	HANDLE		wh[2];
+ 
+ 	wh[0] = *sema;
+ 	wh[1] = pgwin32_signal_event;
+ 
+ 	/*
+ 	 * As in other implementations of PGSemaphoreLock, we need to check for
+ 	 * cancel/die interrupts each time through the loop.  But here, there is
+ 	 * no hidden magic about whether the syscall will internally service a
+ 	 * signal --- we do that ourselves.
+ 	 */
+ 	do
+ 	{
+ 		ImmediateInterruptOK = interruptOK;
+ 		CHECK_FOR_INTERRUPTS();
+ 
+ 		errno = 0;
+ 		ret = WaitForMultipleObjectsEx(2, wh, FALSE, LockTimeout ? LockTimeout : INFINITE, TRUE);
+ 
+ 		if (ret == WAIT_OBJECT_0)
+ 		{
+ 			/* We got it! */
+ 			return true;
+ 		}
+ 		else if (ret == WAIT_TIMEOUT)
+ 		{
+ 			/* Can't get it */
+ 			errno = EAGAIN;
+ 			return false;
+ 		}
+ 		else if (ret == WAIT_OBJECT_0 + 1)
+ 		{
+ 			/* Signal event is set - we have a signal to deliver */
+ 			pgwin32_dispatch_queued_signals();
+ 			errno = EINTR;
+ 		}
+ 		else
+ 			/* Otherwise we are in trouble */
+ 			errno = EIDRM;
+ 
+ 		ImmediateInterruptOK = false;
+ 	} while (errno == EINTR);
+ 
+ 	ereport(FATAL,
+ 			(errmsg("could not lock semaphore: error code %d", (int) GetLastError())));
+ 	return false;				/* not reached; keep compiler quiet */
+ }
diff -dcrpN pgsql.orig/src/backend/storage/lmgr/lmgr.c pgsql.5/src/backend/storage/lmgr/lmgr.c
*** pgsql.orig/src/backend/storage/lmgr/lmgr.c	2010-01-03 12:54:25.000000000 +0100
--- pgsql.5/src/backend/storage/lmgr/lmgr.c	2010-01-13 19:55:58.000000000 +0100
***************
*** 19,26 ****
--- 19,29 ----
  #include "access/transam.h"
  #include "access/xact.h"
  #include "catalog/catalog.h"
+ #include "catalog/pg_database.h"
  #include "miscadmin.h"
  #include "storage/lmgr.h"
+ #include "utils/lsyscache.h"
+ #include "storage/proc.h"
  #include "storage/procarray.h"
  #include "utils/inval.h"
  
*************** LockRelationOid(Oid relid, LOCKMODE lock
*** 78,83 ****
--- 81,101 ----
  
  	res = LockAcquire(&tag, lockmode, false, false);
  
+ 	if (res == LOCKACQUIRE_NOT_AVAIL)
+ 	{
+ 		char	   *relname = get_rel_name(relid);
+ 		if (relname)
+ 			ereport(ERROR,
+ 					(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+ 						errmsg("could not obtain lock on relation \"%s\"",
+ 						relname)));
+ 		else
+ 			ereport(ERROR,
+ 					(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+ 						errmsg("could not obtain lock on relation with OID %u",
+ 						relid)));
+ 	}
+ 
  	/*
  	 * Now that we have the lock, check for invalidation messages, so that we
  	 * will update or flush any stale relcache entry before we try to use it.
*************** LockRelation(Relation relation, LOCKMODE
*** 173,178 ****
--- 191,202 ----
  
  	res = LockAcquire(&tag, lockmode, false, false);
  
+ 	if (res == LOCKACQUIRE_NOT_AVAIL)
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+ 					errmsg("could not obtain lock on relation \"%s\"",
+ 				RelationGetRelationName(relation))));
+ 
  	/*
  	 * Now that we have the lock, check for invalidation messages; see notes
  	 * in LockRelationOid.
*************** LockRelationIdForSession(LockRelId *reli
*** 250,256 ****
  
  	SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
  
! 	(void) LockAcquire(&tag, lockmode, true, false);
  }
  
  /*
--- 274,284 ----
  
  	SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
  
! 	if (LockAcquire(&tag, lockmode, true, false) == LOCKACQUIRE_NOT_AVAIL)
! 		ereport(ERROR,
! 				(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
! 					errmsg("could not obtain lock on relation with OID %u",
! 							relid->relId)));
  }
  
  /*
*************** LockRelationForExtension(Relation relati
*** 285,291 ****
  								relation->rd_lockInfo.lockRelId.dbId,
  								relation->rd_lockInfo.lockRelId.relId);
  
! 	(void) LockAcquire(&tag, lockmode, false, false);
  }
  
  /*
--- 313,323 ----
  								relation->rd_lockInfo.lockRelId.dbId,
  								relation->rd_lockInfo.lockRelId.relId);
  
! 	if (LockAcquire(&tag, lockmode, false, false) == LOCKACQUIRE_NOT_AVAIL)
! 		ereport(ERROR,
! 				(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
! 					errmsg("could not obtain extension lock on relation \"%s\"",
! 				RelationGetRelationName(relation))));
  }
  
  /*
*************** LockPage(Relation relation, BlockNumber 
*** 319,325 ****
  					 relation->rd_lockInfo.lockRelId.relId,
  					 blkno);
  
! 	(void) LockAcquire(&tag, lockmode, false, false);
  }
  
  /*
--- 351,361 ----
  					 relation->rd_lockInfo.lockRelId.relId,
  					 blkno);
  
! 	if (LockAcquire(&tag, lockmode, false, false) == LOCKACQUIRE_NOT_AVAIL)
! 		ereport(ERROR,
! 				(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
! 					errmsg("could not obtain lock on page %u of relation \"%s\"",
! 				blkno, RelationGetRelationName(relation))));
  }
  
  /*
*************** LockTuple(Relation relation, ItemPointer
*** 375,381 ****
  					  ItemPointerGetBlockNumber(tid),
  					  ItemPointerGetOffsetNumber(tid));
  
! 	(void) LockAcquire(&tag, lockmode, false, false);
  }
  
  /*
--- 411,421 ----
  					  ItemPointerGetBlockNumber(tid),
  					  ItemPointerGetOffsetNumber(tid));
  
! 	if (LockAcquire(&tag, lockmode, false, false) == LOCKACQUIRE_NOT_AVAIL)
! 		ereport(ERROR,
! 				(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
! 					errmsg("could not obtain lock on row in relation \"%s\"",
! 				RelationGetRelationName(relation))));
  }
  
  /*
*************** XactLockTableInsert(TransactionId xid)
*** 429,435 ****
  
  	SET_LOCKTAG_TRANSACTION(tag, xid);
  
! 	(void) LockAcquire(&tag, ExclusiveLock, false, false);
  }
  
  /*
--- 469,478 ----
  
  	SET_LOCKTAG_TRANSACTION(tag, xid);
  
! 	if (LockAcquire(&tag, ExclusiveLock, false, false) == LOCKACQUIRE_NOT_AVAIL)
! 		ereport(ERROR,
! 				(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
! 					errmsg("could not obtain lock on transaction with ID %u", xid)));
  }
  
  /*
*************** XactLockTableWait(TransactionId xid)
*** 473,479 ****
  
  		SET_LOCKTAG_TRANSACTION(tag, xid);
  
! 		(void) LockAcquire(&tag, ShareLock, false, false);
  
  		LockRelease(&tag, ShareLock, false);
  
--- 516,525 ----
  
  		SET_LOCKTAG_TRANSACTION(tag, xid);
  
! 		if (LockAcquire(&tag, ShareLock, false, false) == LOCKACQUIRE_NOT_AVAIL)
! 			ereport(ERROR,
! 					(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
! 						errmsg("could not obtain lock on transaction with ID %u", xid)));
  
  		LockRelease(&tag, ShareLock, false);
  
*************** VirtualXactLockTableInsert(VirtualTransa
*** 531,537 ****
  
  	SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid);
  
! 	(void) LockAcquire(&tag, ExclusiveLock, false, false);
  }
  
  /*
--- 577,587 ----
  
  	SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid);
  
! 	if (LockAcquire(&tag, ExclusiveLock, false, false) == LOCKACQUIRE_NOT_AVAIL)
! 		ereport(ERROR,
! 				(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
! 					errmsg("could not obtain lock on virtual transaction with ID %u",
! 				vxid.localTransactionId)));
  }
  
  /*
*************** VirtualXactLockTableWait(VirtualTransact
*** 549,555 ****
  
  	SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid);
  
! 	(void) LockAcquire(&tag, ShareLock, false, false);
  
  	LockRelease(&tag, ShareLock, false);
  }
--- 599,609 ----
  
  	SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid);
  
! 	if (LockAcquire(&tag, ShareLock, false, false) == LOCKACQUIRE_NOT_AVAIL)
! 		ereport(ERROR,
! 				(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
! 					errmsg("could not obtain lock on virtual transaction with ID %u",
! 				vxid.localTransactionId)));
  
  	LockRelease(&tag, ShareLock, false);
  }
*************** LockDatabaseObject(Oid classid, Oid obji
*** 598,604 ****
  					   objid,
  					   objsubid);
  
! 	(void) LockAcquire(&tag, lockmode, false, false);
  }
  
  /*
--- 652,662 ----
  					   objid,
  					   objsubid);
  
! 	if (LockAcquire(&tag, lockmode, false, false) == LOCKACQUIRE_NOT_AVAIL)
! 		ereport(ERROR,
! 				(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
! 					errmsg("could not obtain lock on class:object: %u:%u",
! 				classid, objid)));
  }
  
  /*
*************** LockSharedObject(Oid classid, Oid objid,
*** 636,642 ****
  					   objid,
  					   objsubid);
  
! 	(void) LockAcquire(&tag, lockmode, false, false);
  
  	/* Make sure syscaches are up-to-date with any changes we waited for */
  	AcceptInvalidationMessages();
--- 694,704 ----
  					   objid,
  					   objsubid);
  
! 	if (LockAcquire(&tag, lockmode, false, false) == LOCKACQUIRE_NOT_AVAIL)
! 		ereport(ERROR,
! 				(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
! 					errmsg("could not obtain lock on class:object: %u:%u",
! 				classid, objid)));
  
  	/* Make sure syscaches are up-to-date with any changes we waited for */
  	AcceptInvalidationMessages();
*************** LockSharedObjectForSession(Oid classid, 
*** 678,684 ****
  					   objid,
  					   objsubid);
  
! 	(void) LockAcquire(&tag, lockmode, true, false);
  }
  
  /*
--- 740,761 ----
  					   objid,
  					   objsubid);
  
! 	if (LockAcquire(&tag, lockmode, true, false) == LOCKACQUIRE_NOT_AVAIL)
! 		switch(classid)
! 		{
! 		case DatabaseRelationId:
! 			ereport(ERROR,
! 					(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
! 						errmsg("could not obtain lock on database with ID %u",
! 					objid)));
! 			break;
! 		default:
! 			ereport(ERROR,
! 					(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
! 						errmsg("could not obtain lock on class:object: %u:%u",
! 					classid, objid)));
! 			break;
! 		}
  }
  
  /*
diff -dcrpN pgsql.orig/src/backend/storage/lmgr/lock.c pgsql.5/src/backend/storage/lmgr/lock.c
*** pgsql.orig/src/backend/storage/lmgr/lock.c	2010-01-03 12:54:25.000000000 +0100
--- pgsql.5/src/backend/storage/lmgr/lock.c	2010-01-13 18:59:19.000000000 +0100
*************** PROCLOCK_PRINT(const char *where, const 
*** 255,261 ****
  static uint32 proclock_hash(const void *key, Size keysize);
  static void RemoveLocalLock(LOCALLOCK *locallock);
  static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
! static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
  static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
  			PROCLOCK *proclock, LockMethod lockMethodTable);
  static void CleanUpLock(LOCK *lock, PROCLOCK *proclock,
--- 255,261 ----
  static uint32 proclock_hash(const void *key, Size keysize);
  static void RemoveLocalLock(LOCALLOCK *locallock);
  static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
! static int WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
  static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
  			PROCLOCK *proclock, LockMethod lockMethodTable);
  static void CleanUpLock(LOCK *lock, PROCLOCK *proclock,
*************** ProcLockHashCode(const PROCLOCKTAG *proc
*** 451,457 ****
   *	dontWait: if true, don't wait to acquire lock
   *
   * Returns one of:
!  *		LOCKACQUIRE_NOT_AVAIL		lock not available, and dontWait=true
   *		LOCKACQUIRE_OK				lock successfully acquired
   *		LOCKACQUIRE_ALREADY_HELD	incremented count for lock already held
   *
--- 451,457 ----
   *	dontWait: if true, don't wait to acquire lock
   *
   * Returns one of:
!  *		LOCKACQUIRE_NOT_AVAIL		lock not available, either dontWait=true or timeout
   *		LOCKACQUIRE_OK				lock successfully acquired
   *		LOCKACQUIRE_ALREADY_HELD	incremented count for lock already held
   *
*************** LockAcquireExtended(const LOCKTAG *lockt
*** 849,855 ****
  										 locktag->locktag_type,
  										 lockmode);
  
! 		WaitOnLock(locallock, owner);
  
  		TRACE_POSTGRESQL_LOCK_WAIT_DONE(locktag->locktag_field1,
  										locktag->locktag_field2,
--- 849,855 ----
  										 locktag->locktag_type,
  										 lockmode);
  
! 		status = WaitOnLock(locallock, owner);
  
  		TRACE_POSTGRESQL_LOCK_WAIT_DONE(locktag->locktag_field1,
  										locktag->locktag_field2,
*************** LockAcquireExtended(const LOCKTAG *lockt
*** 864,883 ****
  		 * done when the lock was granted to us --- see notes in WaitOnLock.
  		 */
  
! 		/*
! 		 * Check the proclock entry status, in case something in the ipc
! 		 * communication doesn't work correctly.
! 		 */
! 		if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
  		{
! 			PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
! 			LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
! 			/* Should we retry ? */
! 			LWLockRelease(partitionLock);
! 			elog(ERROR, "LockAcquire failed");
  		}
- 		PROCLOCK_PRINT("LockAcquire: granted", proclock);
- 		LOCK_PRINT("LockAcquire: granted", lock, lockmode);
  	}
  
  	LWLockRelease(partitionLock);
--- 864,895 ----
  		 * done when the lock was granted to us --- see notes in WaitOnLock.
  		 */
  
! 		switch (status)
  		{
! 		case STATUS_OK:
! 			/*
! 			 * Check the proclock entry status, in case something in the ipc
! 			 * communication doesn't work correctly.
! 			 */
! 			if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
! 			{
! 				PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
! 				LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
! 				/* Should we retry ? */
! 				LWLockRelease(partitionLock);
! 				elog(ERROR, "LockAcquire failed");
! 			}
! 			PROCLOCK_PRINT("LockAcquire: granted", proclock);
! 			LOCK_PRINT("LockAcquire: granted", lock, lockmode);
! 			break;
! 		case STATUS_WAITING:
! 			PROCLOCK_PRINT("LockAcquire: timed out", proclock);
! 			LOCK_PRINT("LockAcquire: timed out", lock, lockmode);
! 			break;
! 		default:
! 			elog(ERROR, "LockAcquire invalid status");
! 			break;
  		}
  	}
  
  	LWLockRelease(partitionLock);
*************** LockAcquireExtended(const LOCKTAG *lockt
*** 903,909 ****
  							   locktag->locktag_field2);
  	}
  
! 	return LOCKACQUIRE_OK;
  }
  
  /*
--- 915,921 ----
  							   locktag->locktag_field2);
  	}
  
! 	return (status == STATUS_OK ? LOCKACQUIRE_OK : LOCKACQUIRE_NOT_AVAIL);
  }
  
  /*
*************** GrantAwaitedLock(void)
*** 1181,1194 ****
   * Caller must have set MyProc->heldLocks to reflect locks already held
   * on the lockable object by this process.
   *
   * The appropriate partition lock must be held at entry.
   */
! static void
  WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
  {
  	LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock);
  	LockMethod	lockMethodTable = LockMethods[lockmethodid];
  	char	   *volatile new_status = NULL;
  
  	LOCK_PRINT("WaitOnLock: sleeping on lock",
  			   locallock->lock, locallock->tag.mode);
--- 1193,1212 ----
   * Caller must have set MyProc->heldLocks to reflect locks already held
   * on the lockable object by this process.
   *
+  * Result: returns value of ProcSleep()
+  *	STATUS_OK if we acquired the lock
+  *	STATUS_ERROR if not (deadlock)
+  *	STATUS_WAITING if not (timeout)
+  *
   * The appropriate partition lock must be held at entry.
   */
! static int
  WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
  {
  	LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock);
  	LockMethod	lockMethodTable = LockMethods[lockmethodid];
  	char	   *volatile new_status = NULL;
+ 	int		wait_status;
  
  	LOCK_PRINT("WaitOnLock: sleeping on lock",
  			   locallock->lock, locallock->tag.mode);
*************** WaitOnLock(LOCALLOCK *locallock, Resourc
*** 1230,1237 ****
  	 */
  	PG_TRY();
  	{
! 		if (ProcSleep(locallock, lockMethodTable) != STATUS_OK)
  		{
  			/*
  			 * We failed as a result of a deadlock, see CheckDeadLock(). Quit
  			 * now.
--- 1248,1260 ----
  	 */
  	PG_TRY();
  	{
! 		wait_status = ProcSleep(locallock, lockMethodTable);
! 		switch (wait_status)
  		{
+ 		case STATUS_OK:
+ 		case STATUS_WAITING:
+ 			break;
+ 		default:
  			/*
  			 * We failed as a result of a deadlock, see CheckDeadLock(). Quit
  			 * now.
*************** WaitOnLock(LOCALLOCK *locallock, Resourc
*** 1276,1283 ****
  		pfree(new_status);
  	}
  
! 	LOCK_PRINT("WaitOnLock: wakeup on lock",
  			   locallock->lock, locallock->tag.mode);
  }
  
  /*
--- 1299,1312 ----
  		pfree(new_status);
  	}
  
! 	if (wait_status == STATUS_OK)
! 		LOCK_PRINT("WaitOnLock: wakeup on lock",
! 			   locallock->lock, locallock->tag.mode);
! 	else if (wait_status == STATUS_WAITING)
! 		LOCK_PRINT("WaitOnLock: timeout on lock",
  			   locallock->lock, locallock->tag.mode);
+ 
+ 	return wait_status;
  }
  
  /*
diff -dcrpN pgsql.orig/src/backend/storage/lmgr/proc.c pgsql.5/src/backend/storage/lmgr/proc.c
*** pgsql.orig/src/backend/storage/lmgr/proc.c	2010-01-03 12:54:25.000000000 +0100
--- pgsql.5/src/backend/storage/lmgr/proc.c	2010-01-13 18:59:19.000000000 +0100
***************
*** 46,55 ****
  #include "storage/procarray.h"
  #include "storage/spin.h"
  
- 
  /* GUC variables */
  int			DeadlockTimeout = 1000;
  int			StatementTimeout = 0;
  bool		log_lock_waits = false;
  
  /* Pointer to this process's PGPROC struct, if any */
--- 46,55 ----
  #include "storage/procarray.h"
  #include "storage/spin.h"
  
  /* GUC variables */
  int			DeadlockTimeout = 1000;
  int			StatementTimeout = 0;
+ int			LockTimeout = 0;
  bool		log_lock_waits = false;
  
  /* Pointer to this process's PGPROC struct, if any */
*************** ProcQueueInit(PROC_QUEUE *queue)
*** 743,749 ****
   * The lock table's partition lock must be held at entry, and will be held
   * at exit.
   *
!  * Result: STATUS_OK if we acquired the lock, STATUS_ERROR if not (deadlock).
   *
   * ASSUME: that no one will fiddle with the queue until after
   *		we release the partition lock.
--- 743,752 ----
   * The lock table's partition lock must be held at entry, and will be held
   * at exit.
   *
!  * Result:
!  *     STATUS_OK if we acquired the lock
!  *     STATUS_ERROR if not (deadlock)
!  *     STATUS_WAITING if not (timeout)
   *
   * ASSUME: that no one will fiddle with the queue until after
   *		we release the partition lock.
*************** ProcSleep(LOCALLOCK *locallock, LockMeth
*** 765,770 ****
--- 768,774 ----
  	LOCKMASK	myHeldLocks = MyProc->heldLocks;
  	bool		early_deadlock = false;
  	bool		allow_autovacuum_cancel = true;
+ 	bool		timeout_detected = false;
  	int			myWaitStatus;
  	PGPROC	   *proc;
  	int			i;
*************** ProcSleep(LOCALLOCK *locallock, LockMeth
*** 897,903 ****
  		elog(FATAL, "could not set timer for process wakeup");
  
  	/*
! 	 * If someone wakes us between LWLockRelease and PGSemaphoreLock,
  	 * PGSemaphoreLock will not block.	The wakeup is "saved" by the semaphore
  	 * implementation.	While this is normally good, there are cases where a
  	 * saved wakeup might be leftover from a previous operation (for example,
--- 901,907 ----
  		elog(FATAL, "could not set timer for process wakeup");
  
  	/*
! 	 * If someone wakes us between LWLockRelease and PGSemaphoreTimedLock,
  	 * PGSemaphoreLock will not block.	The wakeup is "saved" by the semaphore
  	 * implementation.	While this is normally good, there are cases where a
  	 * saved wakeup might be leftover from a previous operation (for example,
*************** ProcSleep(LOCALLOCK *locallock, LockMeth
*** 915,921 ****
  	 */
  	do
  	{
! 		PGSemaphoreLock(&MyProc->sem, true);
  
  		/*
  		 * waitStatus could change from STATUS_WAITING to something else
--- 919,929 ----
  	 */
  	do
  	{
! 		if (!PGSemaphoreTimedLock(&MyProc->sem, true))
! 		{
! 			timeout_detected = true;
! 			break;
! 		}
  
  		/*
  		 * waitStatus could change from STATUS_WAITING to something else
*************** ProcSleep(LOCALLOCK *locallock, LockMeth
*** 1055,1060 ****
--- 1063,1076 ----
  	LWLockAcquire(partitionLock, LW_EXCLUSIVE);
  
  	/*
+ 	 * If we timed out, we are no longer waiting and the lock will not
+ 	 * be granted to us, so remove ourselves from the wait queue
+ 	 * before going on.
+ 	 */
+ 	if (timeout_detected)
+ 		RemoveFromWaitQueue(MyProc, hashcode);
+ 
+ 	/*
  	 * We no longer want LockWaitCancel to do anything.
  	 */
  	lockAwaited = NULL;
*************** ProcSleep(LOCALLOCK *locallock, LockMeth
*** 1068,1075 ****
  	/*
  	 * We don't have to do anything else, because the awaker did all the
  	 * necessary update of the lock table and MyProc.
  	 */
! 	return MyProc->waitStatus;
  }
  
  
--- 1084,1093 ----
  	/*
  	 * We don't have to do anything else, because the awaker did all the
  	 * necessary update of the lock table and MyProc.
+ 	 * On timeout, RemoveFromWaitQueue() has set MyProc->waitStatus to
+ 	 * STATUS_ERROR, so report STATUS_WAITING to distinguish that case.
  	 */
! 	return (timeout_detected ? STATUS_WAITING : MyProc->waitStatus);
  }
  
  
diff -dcrpN pgsql.orig/src/backend/utils/misc/guc.c pgsql.5/src/backend/utils/misc/guc.c
*** pgsql.orig/src/backend/utils/misc/guc.c	2010-01-08 14:37:12.000000000 +0100
--- pgsql.5/src/backend/utils/misc/guc.c	2010-01-13 18:59:19.000000000 +0100
*************** static struct config_int ConfigureNamesI
*** 1584,1589 ****
--- 1584,1599 ----
  	},
  
  	{
+ 		{"lock_timeout", PGC_USERSET, CLIENT_CONN_STATEMENT,
+ 			gettext_noop("Sets the maximum allowed time to wait for any lock taken by a statement."),
+ 			gettext_noop("A value of 0 turns off the timeout."),
+ 			GUC_UNIT_MS
+ 		},
+ 		&LockTimeout,
+ 		0, 0, INT_MAX, NULL, NULL
+ 	},
+ 
+ 	{
  		{"vacuum_freeze_min_age", PGC_USERSET, CLIENT_CONN_STATEMENT,
  			gettext_noop("Minimum age at which VACUUM should freeze a table row."),
  			NULL
diff -dcrpN pgsql.orig/src/include/storage/pg_sema.h pgsql.5/src/include/storage/pg_sema.h
*** pgsql.orig/src/include/storage/pg_sema.h	2010-01-03 12:54:39.000000000 +0100
--- pgsql.5/src/include/storage/pg_sema.h	2010-01-13 18:59:19.000000000 +0100
*************** extern void PGSemaphoreUnlock(PGSemaphor
*** 80,83 ****
--- 80,86 ----
  /* Lock a semaphore only if able to do so without blocking */
  extern bool PGSemaphoreTryLock(PGSemaphore sema);
  
+ /* Lock a semaphore, waiting no longer than lock_timeout (0 means no limit) */
+ extern bool PGSemaphoreTimedLock(PGSemaphore sema, bool interruptOK);
+ 
  #endif   /* PG_SEMA_H */
diff -dcrpN pgsql.orig/src/include/storage/proc.h pgsql.5/src/include/storage/proc.h
*** pgsql.orig/src/include/storage/proc.h	2010-01-03 12:54:39.000000000 +0100
--- pgsql.5/src/include/storage/proc.h	2010-01-13 18:59:19.000000000 +0100
*************** typedef struct PROC_HDR
*** 161,166 ****
--- 161,167 ----
  /* configurable options */
  extern int	DeadlockTimeout;
  extern int	StatementTimeout;
+ extern int	LockTimeout;
  extern bool log_lock_waits;
  
  extern volatile bool cancel_from_timeout;
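
For readers of the patch above, the way a timeout travels from ProcSleep()
through WaitOnLock() to the result of LockAcquireExtended() can be
summarized in a small standalone sketch. The enum and function names are
hypothetical stand-ins for STATUS_OK / STATUS_ERROR / STATUS_WAITING and
LOCKACQUIRE_OK / LOCKACQUIRE_NOT_AVAIL; the sketch assumes that a deadlock
has already been reported as an error inside WaitOnLock() and never reaches
the final mapping.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for STATUS_OK, STATUS_ERROR, STATUS_WAITING */
typedef enum
{
	SK_STATUS_OK,
	SK_STATUS_ERROR,
	SK_STATUS_WAITING
} SketchWaitStatus;

/* Hypothetical stand-ins for LOCKACQUIRE_NOT_AVAIL, LOCKACQUIRE_OK */
typedef enum
{
	SK_ACQUIRE_NOT_AVAIL,
	SK_ACQUIRE_OK
} SketchAcquireResult;

/*
 * Result of the sleep: a detected timeout overrides whatever wait status
 * was left behind when the process removed itself from the wait queue.
 */
SketchWaitStatus
sketch_sleep_result(bool timeout_detected, SketchWaitStatus wait_status)
{
	return timeout_detected ? SK_STATUS_WAITING : wait_status;
}

/*
 * Result handed back to the LockSomething() callers: only an OK status
 * counts as success, a timeout becomes "not available".
 */
SketchAcquireResult
sketch_acquire_result(SketchWaitStatus status)
{
	/* assumption: deadlocks were raised as errors before this point */
	assert(status != SK_STATUS_ERROR);
	return status == SK_STATUS_OK ? SK_ACQUIRE_OK : SK_ACQUIRE_NOT_AVAIL;
}
```

The callers in lmgr.c then turn the "not available" result into an ERROR
with ERRCODE_LOCK_NOT_AVAILABLE, which is what keeps the return type of the
LockSomething() functions unchanged.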