The attached patch implements the idea of Heikki / Simon published in

http://archives.postgresql.org/pgsql-hackers/2009-11/msg00271.php

Since nobody objected to the idea in general, I have implemented it.

As this is not currently used anywhere, it doesn't give any immediate benefit; it
is, however, a prerequisite for a parallel version of pg_dump, which quite a few
people (including myself) seem to be interested in.

Here's the comment from the patch explaining it in more detail:

/*
 * This function is for synchronization of snapshots: it can be used to let
 * new transactions in several sessions get the same snapshot. Its signature is
 *
 * pg_synchronize_snapshots(text, int, int);
 *
 * The first parameter is an identifier so that several groups can request
 * synchronized snapshots concurrently.
 *
 * The second parameter is the number of backends that are expected to join
 * the current group (i.e. call in with the same identifier).
 *
 * The third parameter is the timeout in milliseconds.
 *
 * Note that while we are holding the ProcArrayLock in shared mode we are
 * severely impairing the usability of the database server: for example, nobody
 * can commit nontrivial transactions during that time, nor can you establish a
 * new connection! This is why you need to be superuser to use this function.
 *
 * The idea is that from one connection you call, for example,
 *
 * pg_synchronize_snapshots('7bd0320c4ff9252716972e160fb33b8a', 4, 1000)
 *
 * and then have 1000ms to run the following in four other (already connected)
 * sessions (the transaction must be serializable, otherwise the next statement
 * would take a new snapshot anyway):
 *
 * BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
 * SELECT pg_synchronize_snapshot_taken('7bd0320c4ff9252716972e160fb33b8a');
 *
 * If all four pg_synchronize_snapshot_taken() calls return true and the
 * function pg_synchronize_snapshots() returns true as well, you can go on:
 * all four transactions now see the same snapshot (which in general is not the
 * snapshot seen by the transaction that initially called
 * pg_synchronize_snapshots()).
 */

Thoughts?


Joachim
diff -cr cvs.head/src/backend/storage/ipc/procarray.c cvs.build/src/backend/storage/ipc/procarray.c
*** cvs.head/src/backend/storage/ipc/procarray.c	2010-01-05 12:39:28.000000000 +0100
--- cvs.build/src/backend/storage/ipc/procarray.c	2010-01-08 20:50:31.000000000 +0100
***************
*** 81,86 ****
--- 81,96 ----
  
  static ProcArrayStruct *procArray;
  
+ typedef struct SyncSnapshotGroupStruct
+ {
+ 	char		id[NAMEDATALEN];	/* group identifier; "" marks a free slot */
+ 	int			refCount;		/* members still expected; -1 = group failed */
+ } SyncSnapshotGroupStruct;
+ 
+ static SyncSnapshotGroupStruct *syncSnapshotGroups;	/* array in shmem, guarded by SyncSnapshotLock */
+ #define SYNC_SNAPSHOT_GROUPS_MAX	5	/* max. number of concurrent groups */
+ static void freeSyncSnapshotGroup(int idx);	/* mark slot idx as free */
+ 
  /*
   * Bookkeeping for tracking emulated transactions in recovery
   */
***************
*** 182,187 ****
--- 192,199 ----
  CreateSharedProcArray(void)
  {
  	bool		found;
+ 	bool		foundSyncSnapshot;
+ 	Size		syncSnapshotSize;
  
  	/* Create or attach to the ProcArray shared structure */
  	procArray = (ProcArrayStruct *)
***************
*** 189,194 ****
--- 201,212 ----
  							mul_size(sizeof(PGPROC *), PROCARRAY_MAXPROCS),
  							&found);
  
+ 	syncSnapshotSize = mul_size(sizeof(SyncSnapshotGroupStruct),
+ 								SYNC_SNAPSHOT_GROUPS_MAX);
+ 	syncSnapshotGroups = (SyncSnapshotGroupStruct *)
+ 		 ShmemInitStruct("Sync Snapshot Groups",
+ 						 syncSnapshotSize, &foundSyncSnapshot);
+ 
  	if (!found)
  	{
  		/*
***************
*** 201,206 ****
--- 219,230 ----
  		if (XLogRequestRecoveryConnections)
  			KnownAssignedXidsInit(TOTAL_MAX_CACHED_SUBXIDS);
  	}
+ 	if (!foundSyncSnapshot)
+ 	{
+ 		int i;
+ 		for (i = 0; i < SYNC_SNAPSHOT_GROUPS_MAX; i++)
+ 			freeSyncSnapshotGroup(i);
+ 	}
  }
  
  /*
***************
*** 2499,2501 ****
--- 2523,2806 ----
  
  	pfree(buf.data);
  }
+ 
+ /*
+  * Mark the sync snapshot group slot at index idx as free.
+  *
+  * The caller must hold SyncSnapshotLock in exclusive mode.  The only
+  * exception is CreateSharedProcArray(), which calls this without the lock
+  * while initializing a freshly created array.
+  */
+ static void
+ freeSyncSnapshotGroup(int idx)
+ {
+ 	syncSnapshotGroups[idx].id[0] = '\0';
+ 	syncSnapshotGroups[idx].refCount = 0;
+ }
+ 
+ /*
+  * pg_synchronize_snapshots(text, int, int) returns bool
+  *
+  * This function is for synchronization of snapshots: it lets a group of
+  * sessions start transactions that all see the same snapshot.
+  *
+  * The first parameter is an identifier, so that several groups (at most
+  * SYNC_SNAPSHOT_GROUPS_MAX) can request synchronized snapshots concurrently.
+  *
+  * The second parameter is the number of backends that are expected to join
+  * the current group (i.e. call pg_synchronize_snapshot_taken() with the same
+  * identifier).  It must be greater than zero.
+  *
+  * The third parameter is the timeout in milliseconds.  It must not be
+  * negative.
+  *
+  * While waiting we hold ProcArrayLock in shared mode, which severely impairs
+  * the usability of the database server: for example, nobody can commit
+  * nontrivial transactions during that time, nor can you establish a new
+  * connection!  This is why you need to be superuser to use this function.
+  *
+  * The idea is that from one connection you call, for example,
+  *
+  * pg_synchronize_snapshots('7bd0320c4ff9252716972e160fb33b8a', 4, 1000)
+  *
+  * and then have 1000ms to run the following in four other (already
+  * connected) sessions:
+  *
+  * BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
+  * SELECT pg_synchronize_snapshot_taken('7bd0320c4ff9252716972e160fb33b8a');
+  *
+  * If all four pg_synchronize_snapshot_taken() calls return true and
+  * pg_synchronize_snapshots() returns true as well, you can go on: all four
+  * transactions now see the same snapshot (which in general is not the
+  * snapshot seen by the transaction that called pg_synchronize_snapshots()).
+  *
+  * Returns false if the timeout expires before all participants have joined.
+  * If a cancel or termination request arrives while waiting, the group is
+  * marked as failed and the request is serviced once the locks are released.
+  */
+ Datum
+ pg_synchronize_snapshots(PG_FUNCTION_ARGS)
+ {
+ 	text	   *idText = PG_GETARG_TEXT_P(0);
+ 	char	   *id;
+ 	int			nBackends = PG_GETARG_INT32(1);
+ 	int			timeout = PG_GETARG_INT32(2);
+ 	int			i;
+ 	int			myIdx;
+ 	TimestampTz	tstart;
+ 	bool		failed;
+ 
+ 	id = text_to_cstring(idText);
+ 	tstart = GetCurrentTimestamp();
+ 
+ 	if (!superuser())
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ 				 errmsg("must be superuser to execute \"%s\"",
+ 						"pg_synchronize_snapshots")));
+ 
+ 	if (strlen(id) > NAMEDATALEN - 1)
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_NAME_TOO_LONG),
+ 				 errmsg("identifier too long"),
+ 				 errdetail("Identifier must be less than %d characters.",
+ 						   NAMEDATALEN)));
+ 
+ 	if (id[0] == '\0')
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_INVALID_NAME),
+ 				 errmsg("invalid identifier")));
+ 
+ 	/*
+ 	 * With nBackends <= 0 the wait loop below would return at once without
+ 	 * anybody having joined, so reject such a group size up front.
+ 	 */
+ 	if (nBackends <= 0)
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ 				 errmsg("number of backends must be greater than zero")));
+ 
+ 	if (timeout < 0)
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ 				 errmsg("timeout must not be negative")));
+ 
+ 	myIdx = -1;
+ 	LWLockAcquire(SyncSnapshotLock, LW_EXCLUSIVE);
+ 	for (i = 0; i < SYNC_SNAPSHOT_GROUPS_MAX; i++)
+ 	{
+ 		if (strcmp(syncSnapshotGroups[i].id, id) == 0)
+ 		{
+ 			/* identifier is already taken, abort */
+ 			if (myIdx >= 0)
+ 				freeSyncSnapshotGroup(myIdx);
+ 
+ 			/*
+ 			 * As we expect a certain concurrency with these functions, we
+ 			 * explicitly release SyncSnapshotLock immediately instead of
+ 			 * implicitly in the error handler.
+ 			 */
+ 			LWLockRelease(SyncSnapshotLock);
+ 			ereport(ERROR,
+ 					(errcode(ERRCODE_INVALID_NAME),
+ 					 errmsg("identifier already in use")));
+ 		}
+ 		else if (myIdx < 0 && syncSnapshotGroups[i].id[0] == '\0')
+ 		{
+ 			/* claim the first free slot, but keep checking for duplicates */
+ 			myIdx = i;
+ 			/* we know that id is at most NAMEDATALEN - 1 bytes */
+ 			strcpy(syncSnapshotGroups[i].id, id);
+ 		}
+ 	}
+ 
+ 	if (myIdx < 0)
+ 	{
+ 		/* see above about releasing the lock explicitly */
+ 		LWLockRelease(SyncSnapshotLock);
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+ 				 errmsg("no more sync snapshot slots available")));
+ 	}
+ 
+ 	syncSnapshotGroups[myIdx].refCount = nBackends;
+ 
+ 	/*
+ 	 * We need to make sure that we hold the ProcArrayLock before releasing
+ 	 * SyncSnapshotLock: as soon as we release SyncSnapshotLock, the other
+ 	 * backends can enter pg_synchronize_snapshot_taken() so we should be
+ 	 * prepared for it and have ProcArrayLock already. Since we get
+ 	 * ProcArrayLock in SHARED mode only, everybody else can also get it and we
+ 	 * cannot run into a deadlock.
+ 	 */
+ 	LWLockAcquire(ProcArrayLock, LW_SHARED);
+ 	LWLockRelease(SyncSnapshotLock);
+ 
+ 	/*
+ 	 * Poll until all participants have joined, the timeout expires or we are
+ 	 * asked to cancel.  SyncSnapshotLock must be taken in LW_EXCLUSIVE mode
+ 	 * here because we may mark the group as failed (refCount = -1).
+ 	 */
+ 	for (;;)
+ 	{
+ 		LWLockAcquire(SyncSnapshotLock, LW_EXCLUSIVE);
+ 
+ 		/*
+ 		 * Check for completion first: if everybody joined in time, a timeout
+ 		 * that expired while we were sleeping must not turn that into a
+ 		 * failure, since the participants have already been told "true".
+ 		 */
+ 		if (syncSnapshotGroups[myIdx].refCount <= 0)
+ 			break;
+ 
+ 		/*
+ 		 * CHECK_FOR_INTERRUPTS() does not work here because we are holding
+ 		 * a lock and so we have InterruptHoldoffCount > 0. So we check for
+ 		 * QueryCancelPending or ProcDiePending manually and give up in that
+ 		 * case; the interrupt itself is serviced after the locks are released.
+ 		 */
+ 		if (TimestampDifferenceExceeds(tstart, GetCurrentTimestamp(), timeout) ||
+ 			QueryCancelPending || ProcDiePending)
+ 		{
+ 			syncSnapshotGroups[myIdx].refCount = -1;
+ 			break;
+ 		}
+ 
+ 		LWLockRelease(SyncSnapshotLock);
+ 		pg_usleep(50000L);
+ 	}
+ 
+ 	/*
+ 	 * We save the status here in order to be able to call
+ 	 * freeSyncSnapshotGroup() before releasing ProcArrayLock. Once it is
+ 	 * released we could be interrupted and the query cancelled. Then we would
+ 	 * leave dead data in syncSnapshotGroups.
+ 	 */
+ 	failed = (syncSnapshotGroups[myIdx].refCount < 0);
+ 	freeSyncSnapshotGroup(myIdx);
+ 	LWLockRelease(SyncSnapshotLock);
+ 
+ 	LWLockRelease(ProcArrayLock);
+ 
+ 	/* Has the query been cancelled externally? */
+ 	CHECK_FOR_INTERRUPTS();
+ 
+ 	PG_RETURN_BOOL(!failed);
+ }
+ 
+ /*
+  * pg_synchronize_snapshot_taken(text) returns bool
+  *
+  * Joins the group with the given identifier that was set up by
+  * pg_synchronize_snapshots().  Returns true if this session was counted as a
+  * member of the group, false if the group has already failed (timeout or
+  * cancel) or already has all the members it expects.  Raises an error if no
+  * such group exists or if the current transaction is not serializable.
+  *
+  * NOTE(review): presumably the snapshot of the calling statement is taken
+  * before this function runs.  If the group was created after that but before
+  * we get SyncSnapshotLock, transactions may have committed in between, and
+  * we would return true for a snapshot that is not the synchronized one.
+  */
+ Datum
+ pg_synchronize_snapshot_taken(PG_FUNCTION_ARGS)
+ {
+ 	text	   *idText = PG_GETARG_TEXT_P(0);
+ 	char	   *id;
+ 	int			myIdx = -1;
+ 	int			i;
+ 	bool		joined;
+ 
+ 	id = text_to_cstring(idText);
+ 
+ 	if (id[0] == '\0')
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_INVALID_NAME),
+ 				 errmsg("identifier not found")));
+ 
+ 	/*
+ 	 * Outside a serializable transaction, each statement takes a new
+ 	 * snapshot, so the synchronized one would not survive this statement.
+ 	 */
+ 	if (!IsXactIsoLevelSerializable)
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+ 				 errmsg("pg_synchronize_snapshot_taken() must be called in a serializable transaction")));
+ 
+ 	/*
+ 	 * One could think that an LW_SHARED is enough here and for the first part
+ 	 * it is. However later on we need an LW_EXCLUSIVE anyway. So let's just
+ 	 * get LW_EXCLUSIVE right in the beginning, also because it is more likely
+ 	 * that we find the identifier.
+ 	 *
+ 	 * Also we expect some level of concurrency and it seems better to release
+ 	 * the SyncSnapshotLock explicitly as soon as possible instead of
+ 	 * implicitly at transaction cleanup.
+ 	 */
+ 	LWLockAcquire(SyncSnapshotLock, LW_EXCLUSIVE);
+ 	for (i = 0; i < SYNC_SNAPSHOT_GROUPS_MAX; i++)
+ 	{
+ 		if (strcmp(syncSnapshotGroups[i].id, id) == 0)
+ 		{
+ 			myIdx = i;
+ 			break;
+ 		}
+ 	}
+ 
+ 	if (myIdx < 0)
+ 	{
+ 		LWLockRelease(SyncSnapshotLock);
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_INVALID_NAME),
+ 				 errmsg("identifier not found")));
+ 	}
+ 
+ 	/* refCount <= 0 means the group has already failed or is complete */
+ 	joined = (syncSnapshotGroups[myIdx].refCount > 0);
+ 	if (joined)
+ 		syncSnapshotGroups[myIdx].refCount--;
+ 	LWLockRelease(SyncSnapshotLock);
+ 
+ 	PG_RETURN_BOOL(joined);
+ }
+ 
diff -cr cvs.head/src/include/catalog/pg_proc.h cvs.build/src/include/catalog/pg_proc.h
*** cvs.head/src/include/catalog/pg_proc.h	2010-01-08 10:48:32.000000000 +0100
--- cvs.build/src/include/catalog/pg_proc.h	2010-01-08 14:52:24.000000000 +0100
***************
*** 3270,3275 ****
--- 3270,3279 ----
  DESCR("cancel a server process' current query");
  DATA(insert OID = 2096 ( pg_terminate_backend		PGNSP PGUID 12 1 0 0 f f f t f v 1 0 16 "23" _null_ _null_ _null_ _null_ pg_terminate_backend _null_ _null_ _null_ ));
  DESCR("terminate a server process");
+ DATA(insert OID = 3030 ( pg_synchronize_snapshots		PGNSP PGUID 12 1 0 0 f f f t f v 3 0 16 "25 23 23" _null_ _null_ _null_ _null_ pg_synchronize_snapshots _null_ _null_ _null_ ));
+ DESCR("get synchronized snapshots");
+ DATA(insert OID = 3031 ( pg_synchronize_snapshot_taken		PGNSP PGUID 12 1 0 0 f f f t f v 1 0 16 "25" _null_ _null_ _null_ _null_ pg_synchronize_snapshot_taken _null_ _null_ _null_ ));
+ DESCR("report that a synchronized snapshot has been taken");
  DATA(insert OID = 2172 ( pg_start_backup		PGNSP PGUID 12 1 0 0 f f f t f v 2 0 25 "25 16" _null_ _null_ _null_ _null_ pg_start_backup _null_ _null_ _null_ ));
  DESCR("prepare for taking an online backup");
  DATA(insert OID = 2173 ( pg_stop_backup			PGNSP PGUID 12 1 0 0 f f f t f v 0 0 25 "" _null_ _null_ _null_ _null_ pg_stop_backup _null_ _null_ _null_ ));
diff -cr cvs.head/src/include/storage/lwlock.h cvs.build/src/include/storage/lwlock.h
*** cvs.head/src/include/storage/lwlock.h	2010-01-05 12:39:36.000000000 +0100
--- cvs.build/src/include/storage/lwlock.h	2010-01-08 14:52:24.000000000 +0100
***************
*** 67,72 ****
--- 67,73 ----
  	AutovacuumLock,
  	AutovacuumScheduleLock,
  	SyncScanLock,
+ 	SyncSnapshotLock,
  	/* Individual lock IDs end here */
  	FirstBufMappingLock,
  	FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
diff -cr cvs.head/src/include/storage/procarray.h cvs.build/src/include/storage/procarray.h
*** cvs.head/src/include/storage/procarray.h	2010-01-05 12:39:36.000000000 +0100
--- cvs.build/src/include/storage/procarray.h	2010-01-08 14:52:24.000000000 +0100
***************
*** 71,74 ****
--- 71,77 ----
  						  int nxids, const TransactionId *xids,
  						  TransactionId latestXid);
  
+ extern Datum pg_synchronize_snapshot_taken(PG_FUNCTION_ARGS);
+ extern Datum pg_synchronize_snapshots(PG_FUNCTION_ARGS);
+ 
  #endif   /* PROCARRAY_H */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to