The attached patch implements the idea of Heikki / Simon published in
http://archives.postgresql.org/pgsql-hackers/2009-11/msg00271.php
Since nobody objected to the idea in general, I have implemented it.
As this is not currently used anywhere, it doesn't give any immediate benefit.
It is, however, a prerequisite for a parallel version of pg_dump that quite a
few people (including myself) seem to be interested in.
Here's the comment from the patch explaining it in more detail:
/*
* This function is for synchronization of snapshots: It can be called by
* new transactions to get the same snapshots. Its signature is
*
* pg_synchronize_snapshots(text, int, int);
*
* The first parameter is an identifier so that several groups can request
* synchronized snapshots concurrently.
*
* The second parameter is the number of backends that are expected to connect
* in the current group (i.e. same identifier).
*
* The third parameter is the timeout in milliseconds.
*
* Note that once we are holding the ProcArrayLock in shared mode we are
* severely hitting the usability of the database server: for example, nobody
* can commit nontrivial transactions during that time nor can you establish a
* new connection! This is why you need to be superuser to use this function.
*
* The idea is that from one connection you call for example
*
* pg_synchronize_snapshots('7bd0320c4ff9252716972e160fb33b8a', 4, 1000)
*
* and then have 1000ms to issue the following from four other (already
* connected) sessions:
*
* BEGIN TRANSACTION;
* SELECT pg_synchronize_snapshot_taken('7bd0320c4ff9252716972e160fb33b8a');
*
* If all four pg_synchronize_snapshot_taken() calls return true and the
* function pg_synchronize_snapshots() returns true as well, you can go on and
* all four transactions now see the same snapshot (which in general is not the
* snapshot seen by the transaction that initially called
* pg_synchronize_snapshots()).
*/
Thoughts?
Joachim
diff -cr cvs.head/src/backend/storage/ipc/procarray.c cvs.build/src/backend/storage/ipc/procarray.c
*** cvs.head/src/backend/storage/ipc/procarray.c 2010-01-05 12:39:28.000000000 +0100
--- cvs.build/src/backend/storage/ipc/procarray.c 2010-01-08 20:50:31.000000000 +0100
***************
*** 81,86 ****
--- 81,96 ----
static ProcArrayStruct *procArray;
+ typedef struct SyncSnapshotGroupStruct /* one slot per group of backends synchronizing their snapshots */
+ {
+ char id[NAMEDATALEN]; /* group identifier; an empty string marks the slot as unused */
+ int refCount; /* backends still expected to report; set to -1 on timeout or cancel */
+ } SyncSnapshotGroupStruct;
+
+ static SyncSnapshotGroupStruct *syncSnapshotGroups; /* array of SYNC_SNAPSHOT_GROUPS_MAX slots, set up via ShmemInitStruct() */
+ #define SYNC_SNAPSHOT_GROUPS_MAX 5 /* number of group slots, i.e. maximum number of concurrent groups */
+ static void freeSyncSnapshotGroup(int idx); /* resets slot idx to the unused state */
+
/*
* Bookkeeping for tracking emulated transactions in recovery
*/
***************
*** 182,187 ****
--- 192,199 ----
CreateSharedProcArray(void)
{
bool found;
+ bool foundSyncSnapshot;
+ Size syncSnapshotSize;
/* Create or attach to the ProcArray shared structure */
procArray = (ProcArrayStruct *)
***************
*** 189,194 ****
--- 201,212 ----
mul_size(sizeof(PGPROC *), PROCARRAY_MAXPROCS),
&found);
+ syncSnapshotSize = mul_size(sizeof(SyncSnapshotGroupStruct),
+ SYNC_SNAPSHOT_GROUPS_MAX);
+ syncSnapshotGroups = (SyncSnapshotGroupStruct *)
+ ShmemInitStruct("Sync Snapshot Groups",
+ syncSnapshotSize, &foundSyncSnapshot);
+
if (!found)
{
/*
***************
*** 201,206 ****
--- 219,230 ----
if (XLogRequestRecoveryConnections)
KnownAssignedXidsInit(TOTAL_MAX_CACHED_SUBXIDS);
}
+ if (!foundSyncSnapshot)
+ {
+ int i;
+ for (i = 0; i < SYNC_SNAPSHOT_GROUPS_MAX; i++)
+ freeSyncSnapshotGroup(i);
+ }
}
/*
***************
*** 2499,2501 ****
--- 2523,2755 ----
pfree(buf.data);
}
+
+ static void
+ freeSyncSnapshotGroup(int idx)
+ {
+ /*
+ * Reset slot idx of syncSnapshotGroups to the unused state: the identifier
+ * becomes the empty string (which is what the slot search treats as "free")
+ * and the reference count is cleared.
+ *
+ * No bounds check is done on idx and no lock is taken here; the
+ * caller needs to take care of proper locking.
+ */
+ syncSnapshotGroups[idx].id[0] = '\0';
+ syncSnapshotGroups[idx].refCount = 0;
+ }
+
+ /*
+ * This function is for synchronization of snapshots: It can be called by
+ * new transactions to get the same snapshots. Its signature is
+ *
+ * pg_synchronize_snapshots(text, int, int);
+ *
+ * The first parameter is an identifier so that several groups can request
+ * synchronized snapshots concurrently.
+ *
+ * The second parameter is the number of backends that are expected to connect
+ * in the current group (i.e. same identifier).
+ *
+ * The third parameter is the timeout in milliseconds.
+ *
+ * Note that once we are holding the ProcArrayLock in shared mode we are
+ * severely hitting the usability of the database server: for example, nobody
+ * can commit nontrivial transactions during that time nor can you establish a
+ * new connection! This is why you need to be superuser to use this function.
+ *
+ * The idea is that from one connection you call for example
+ *
+ * pg_synchronize_snapshots('7bd0320c4ff9252716972e160fb33b8a', 4, 1000)
+ *
+ * and then have 1000ms to issue the following from four other (already
+ * connected) sessions:
+ *
+ * BEGIN TRANSACTION;
+ * SELECT pg_synchronize_snapshot_taken('7bd0320c4ff9252716972e160fb33b8a');
+ *
+ * If all four pg_synchronize_snapshot_taken() calls return true and the
+ * function pg_synchronize_snapshots() returns true as well, you can go on and
+ * all four transactions now see the same snapshot (which in general is not the
+ * snapshot seen by the transaction that initially called
+ * pg_synchronize_snapshots()).
+ *
+ * Summary of what the code below does:
+ *
+ * 1. Validate the caller (superuser) and the identifier (non-empty, shorter
+ *    than NAMEDATALEN).
+ * 2. Under SyncSnapshotLock, claim the first free slot for the identifier;
+ *    raise an error if the identifier is already registered or no slot is
+ *    free.
+ * 3. Store the expected backend count in the slot, take ProcArrayLock in
+ *    shared mode, and poll the slot every 50ms until the count has dropped
+ *    to zero, the timeout has expired, or a cancel/die request is pending.
+ * 4. Free the slot, release both locks and return true on success, false
+ *    on timeout or cancellation.
+ *
+ * NOTE(review): the backend count is not validated. A count of 0 leaves the
+ * polling loop on its first iteration and returns true; a negative count
+ * returns false.
+ */
+ Datum
+ pg_synchronize_snapshots(PG_FUNCTION_ARGS)
+ {
+ text *idText = PG_GETARG_TEXT_P(0);
+ char *id;
+ int nBackends = PG_GETARG_INT32(1);
+ int timeout = PG_GETARG_INT32(2);
+ int i;
+ int myIdx; /* index of the slot claimed for this group, -1 while none */
+ TimestampTz tstart; /* start of the timeout period */
+ bool failed;
+
+ id = text_to_cstring(idText);
+ tstart = GetCurrentTimestamp();
+
+ if (!superuser())
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ (errmsg("must be superuser to execute \"%s\"",
+ "pg_synchronize_snapshots"))));
+
+ if (strlen(id) > NAMEDATALEN - 1)
+ ereport(ERROR,
+ (errcode(ERRCODE_NAME_TOO_LONG),
+ errmsg("identifier too long"),
+ errdetail("Identifier must be less than %d characters.",
+ NAMEDATALEN)));
+
+ if (id[0] == '\0') /* the empty string is reserved as the "free slot" marker */
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_NAME),
+ errmsg("invalid identifier")));
+
+ myIdx = -1;
+ LWLockAcquire(SyncSnapshotLock, LW_EXCLUSIVE);
+ for (i = 0; i < SYNC_SNAPSHOT_GROUPS_MAX; i++)
+ {
+ if (strcmp(syncSnapshotGroups[i].id, id) == 0)
+ {
+ /*
+ * The identifier is already taken, abort. If we claimed a free slot
+ * earlier in this scan, give it back first.
+ */
+ if (myIdx >= 0)
+ freeSyncSnapshotGroup(myIdx);
+ /* As we expect a certain concurrency with these functions, we
+ * explicitly release SyncSnapshotLock immediately instead of
+ * implicitly in the error handler.
+ */
+ LWLockRelease(SyncSnapshotLock);
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_NAME),
+ errmsg("identifier already in use")));
+ }
+ else if (myIdx < 0 && syncSnapshotGroups[i].id[0] == '\0')
+ {
+ /*
+ * First free slot: claim it, but keep scanning the remaining slots so
+ * that a duplicate identifier further on is still detected.
+ */
+ myIdx = i;
+ /* we know that id is at most NAMEDATALEN - 1 bytes */
+ strcpy(syncSnapshotGroups[i].id, id);
+ }
+ }
+
+ if (myIdx < 0)
+ {
+ /* all slots are in use; see above about releasing the lock explicitly */
+ LWLockRelease(SyncSnapshotLock);
+ ereport(ERROR,
+ (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+ errmsg("no more sync snapshot slots available")));
+ }
+
+ syncSnapshotGroups[myIdx].refCount = nBackends;
+
+ /*
+ * We need to make sure that we hold the ProcArrayLock before releasing
+ * SyncSnapshotLock: as soon as we release SyncSnapshotLock, the other
+ * backends can enter pg_synchronize_snapshot_taken() so we should be
+ * prepared for it and have ProcArrayLock already. Since we get
+ * ProcArrayLock in SHARED mode only, everybody else can also get it and we
+ * cannot run into a deadlock.
+ */
+ LWLockAcquire(ProcArrayLock, LW_SHARED);
+ LWLockRelease(SyncSnapshotLock);
+
+ /*
+ * This is the only place where an LW_SHARED SyncSnapshotLock is sufficient
+ * but for simplicity we just acquire an LW_EXCLUSIVE as the LW_SHARED
+ * doesn't give a lot of benefits (pg_synchronize_snapshot_taken() acquires
+ * only LW_EXCLUSIVE locks).
+ *
+ * NOTE(review): the loop body writes refCount = -1 on timeout and on
+ * cancellation, so confirm whether LW_SHARED would really be sufficient.
+ *
+ * The loop exits with SyncSnapshotLock still held; it is released only
+ * after the slot has been freed below. ProcArrayLock stays held across the
+ * pg_usleep() calls.
+ */
+ for(;;)
+ {
+ LWLockAcquire(SyncSnapshotLock, LW_EXCLUSIVE);
+
+ if (TimestampDifferenceExceeds(tstart, GetCurrentTimestamp(), timeout))
+ /* cancel due to expired timeout */
+ syncSnapshotGroups[myIdx].refCount = -1;
+
+ /* CHECK_FOR_INTERRUPTS() does not work here because we are holding
+ * a lock and so we have InterruptHoldoffCount > 0. So we check
+ * for QueryCancelPending or ProcDiePending manually and abort in that
+ * case. */
+ if (QueryCancelPending || ProcDiePending)
+ syncSnapshotGroups[myIdx].refCount = -1;
+
+ if (syncSnapshotGroups[myIdx].refCount <= 0) /* 0: everyone reported, -1: failure */
+ break;
+
+ LWLockRelease(SyncSnapshotLock);
+ pg_usleep(50000); /* poll again in 50ms */
+ }
+
+ /*
+ * We save the status here in order to be able to call
+ * freeSyncSnapshotGroup() before releasing ProcArrayLock. Once it is
+ * released we could be interrupted and the query cancelled. Then we would
+ * have dead data in syncSnapshotGroups.
+ */
+ failed = syncSnapshotGroups[myIdx].refCount < 0 ? true : false;
+ freeSyncSnapshotGroup(myIdx);
+ LWLockRelease(SyncSnapshotLock);
+
+ LWLockRelease(ProcArrayLock);
+
+ /* Has the query been cancelled externally ? */
+ CHECK_FOR_INTERRUPTS();
+
+ if (failed)
+ PG_RETURN_BOOL(false);
+ else
+ PG_RETURN_BOOL(true);
+ }
+ /*
+ * pg_synchronize_snapshot_taken(text)
+ *
+ * Counterpart of pg_synchronize_snapshots(): each backend taking part in a
+ * group calls this with the group's identifier. The function looks up the
+ * slot registered under that identifier and, if its reference count is still
+ * positive, decrements it and returns true.
+ *
+ * It returns false when the slot exists but its reference count is already
+ * <= 0, i.e. all expected backends have reported or the group has been
+ * marked as failed (-1). It raises an "identifier not found" error for an
+ * empty identifier and when no slot carries the identifier.
+ *
+ * NOTE(review): unlike pg_synchronize_snapshots(), there is no superuser
+ * check here. Nothing in this function takes a snapshot or inspects the
+ * transaction state; it only updates the shared counter, so the guarantee
+ * depends on when the calling statement's snapshot was taken. Confirm
+ * against the callers' expectations.
+ */
+ Datum
+ pg_synchronize_snapshot_taken(PG_FUNCTION_ARGS)
+ {
+ text *idText = PG_GETARG_TEXT_P(0);
+ char *id;
+ int myIdx = -1; /* index of the matching slot, -1 while not found */
+ int i;
+
+ id = text_to_cstring(idText);
+
+ if (id[0] == '\0') /* would otherwise match every unused slot below */
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_NAME),
+ errmsg("identifier not found")));
+
+ /*
+ * One could think that an LW_SHARED is enough here and for the first part
+ * it is. However later on we need an LW_EXCLUSIVE anyway. So let's just
+ * get LW_EXCLUSIVE right in the beginning, also because it is more likely
+ * that we find the identifier.
+ *
+ * Also we expect some level of concurrency and it seems better to release
+ * the SyncSnapshotLock explicitly as soon as possible instead of
+ * implicitly at transaction cleanup.
+ */
+ LWLockAcquire(SyncSnapshotLock, LW_EXCLUSIVE);
+ for (i = 0; i < SYNC_SNAPSHOT_GROUPS_MAX; i++)
+ if (strcmp(syncSnapshotGroups[i].id, id) == 0)
+ {
+ myIdx = i;
+ break;
+ }
+
+ if (myIdx < 0)
+ {
+ LWLockRelease(SyncSnapshotLock);
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_NAME),
+ errmsg("identifier not found")));
+ }
+
+ /*
+ * Should we enforce that IsXactIsoLevelSerializable == true ?
+ */
+
+ if (syncSnapshotGroups[myIdx].refCount <= 0)
+ {
+ /* nobody is expected any more, or the group has been marked as failed */
+ LWLockRelease(SyncSnapshotLock);
+ PG_RETURN_BOOL(false);
+ }
+
+ syncSnapshotGroups[myIdx].refCount--;
+ LWLockRelease(SyncSnapshotLock);
+ PG_RETURN_BOOL(true);
+ }
+
diff -cr cvs.head/src/include/catalog/pg_proc.h cvs.build/src/include/catalog/pg_proc.h
*** cvs.head/src/include/catalog/pg_proc.h 2010-01-08 10:48:32.000000000 +0100
--- cvs.build/src/include/catalog/pg_proc.h 2010-01-08 14:52:24.000000000 +0100
***************
*** 3270,3275 ****
--- 3270,3279 ----
DESCR("cancel a server process' current query");
DATA(insert OID = 2096 ( pg_terminate_backend PGNSP PGUID 12 1 0 0 f f f t f v 1 0 16 "23" _null_ _null_ _null_ _null_ pg_terminate_backend _null_ _null_ _null_ ));
DESCR("terminate a server process");
+ DATA(insert OID = 3030 ( pg_synchronize_snapshots PGNSP PGUID 12 1 0 0 f f f t f v 3 0 16 "25 23 23" _null_ _null_ _null_ _null_ pg_synchronize_snapshots _null_ _null_ _null_ ));
+ DESCR("get synchronized snapshots");
+ DATA(insert OID = 3031 ( pg_synchronize_snapshot_taken PGNSP PGUID 12 1 0 0 f f f t f v 1 0 16 "25" _null_ _null_ _null_ _null_ pg_synchronize_snapshot_taken _null_ _null_ _null_ ));
+ DESCR("report that a synchronized snapshot has been taken");
DATA(insert OID = 2172 ( pg_start_backup PGNSP PGUID 12 1 0 0 f f f t f v 2 0 25 "25 16" _null_ _null_ _null_ _null_ pg_start_backup _null_ _null_ _null_ ));
DESCR("prepare for taking an online backup");
DATA(insert OID = 2173 ( pg_stop_backup PGNSP PGUID 12 1 0 0 f f f t f v 0 0 25 "" _null_ _null_ _null_ _null_ pg_stop_backup _null_ _null_ _null_ ));
diff -cr cvs.head/src/include/storage/lwlock.h cvs.build/src/include/storage/lwlock.h
*** cvs.head/src/include/storage/lwlock.h 2010-01-05 12:39:36.000000000 +0100
--- cvs.build/src/include/storage/lwlock.h 2010-01-08 14:52:24.000000000 +0100
***************
*** 67,72 ****
--- 67,73 ----
AutovacuumLock,
AutovacuumScheduleLock,
SyncScanLock,
+ SyncSnapshotLock,
/* Individual lock IDs end here */
FirstBufMappingLock,
FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
diff -cr cvs.head/src/include/storage/procarray.h cvs.build/src/include/storage/procarray.h
*** cvs.head/src/include/storage/procarray.h 2010-01-05 12:39:36.000000000 +0100
--- cvs.build/src/include/storage/procarray.h 2010-01-08 14:52:24.000000000 +0100
***************
*** 71,74 ****
--- 71,77 ----
int nxids, const TransactionId *xids,
TransactionId latestXid);
+ Datum pg_synchronize_snapshot_taken(PG_FUNCTION_ARGS);
+ Datum pg_synchronize_snapshots(PG_FUNCTION_ARGS);
+
#endif /* PROCARRAY_H */
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers