The attached patch implements the idea of Heikki / Simon published in http://archives.postgresql.org/pgsql-hackers/2009-11/msg00271.php
Since nobody objected to the idea in general, I have implemented it. As this is not currently used anywhere, it doesn't give any immediate benefit; it is, however, a prerequisite for a parallel version of pg_dump that quite a few people (including myself) seem to be interested in.

Here's the comment from the patch explaining it in more detail:

/*
 * This function is for synchronization of snapshots: It can be called by
 * new transactions to get the same snapshots. Its signature is
 *
 * pg_synchronize_snapshots(text, int, int);
 *
 * The first parameter is an identifier so that several groups can request
 * synchronized snapshots concurrently.
 *
 * The second parameter is the number of backends that are expected to connect
 * in the current group (i.e. same identifier).
 *
 * The third parameter is the timeout in milliseconds.
 *
 * Note that once we are holding the ProcArrayLock in shared mode we are
 * severely impairing the usability of the database server: for example, nobody
 * can commit nontrivial transactions during that time nor can you establish a
 * new connection! This is why you need to be superuser to use this function.
 *
 * The idea is that from one connection you call for example
 *
 * pg_synchronize_snapshots('7bd0320c4ff9252716972e160fb33b8a', 4, 1000)
 *
 * and then have 1000ms to call, from four other (already connected)
 * sessions,
 *
 * BEGIN TRANSACTION;
 * SELECT pg_synchronize_snapshot_taken('7bd0320c4ff9252716972e160fb33b8a');
 *
 * If all four pg_synchronize_snapshot_taken() calls return true and the
 * function pg_synchronize_snapshots() returns true as well, you can go on and
 * all four transactions now see the same snapshot (which in general is not the
 * snapshot seen by the transaction that initially called
 * pg_synchronize_snapshots()).
 */

Thoughts?

Joachim
diff -cr cvs.head/src/backend/storage/ipc/procarray.c cvs.build/src/backend/storage/ipc/procarray.c *** cvs.head/src/backend/storage/ipc/procarray.c 2010-01-05 12:39:28.000000000 +0100 --- cvs.build/src/backend/storage/ipc/procarray.c 2010-01-08 20:50:31.000000000 +0100 *************** *** 81,86 **** --- 81,96 ---- static ProcArrayStruct *procArray; + typedef struct SyncSnapshotGroupStruct + { + char id[NAMEDATALEN]; + int refCount; + } SyncSnapshotGroupStruct; + + static SyncSnapshotGroupStruct *syncSnapshotGroups; + #define SYNC_SNAPSHOT_GROUPS_MAX 5 + static void freeSyncSnapshotGroup(int idx); + /* * Bookkeeping for tracking emulated transactions in recovery */ *************** *** 182,187 **** --- 192,199 ---- CreateSharedProcArray(void) { bool found; + bool foundSyncSnapshot; + Size syncSnapshotSize; /* Create or attach to the ProcArray shared structure */ procArray = (ProcArrayStruct *) *************** *** 189,194 **** --- 201,212 ---- mul_size(sizeof(PGPROC *), PROCARRAY_MAXPROCS), &found); + syncSnapshotSize = mul_size(sizeof(SyncSnapshotGroupStruct), + SYNC_SNAPSHOT_GROUPS_MAX); + syncSnapshotGroups = (SyncSnapshotGroupStruct *) + ShmemInitStruct("Sync Snapshot Groups", + syncSnapshotSize, &foundSyncSnapshot); + if (!found) { /* *************** *** 201,206 **** --- 219,230 ---- if (XLogRequestRecoveryConnections) KnownAssignedXidsInit(TOTAL_MAX_CACHED_SUBXIDS); } + if (!foundSyncSnapshot) + { + int i; + for (i = 0; i < SYNC_SNAPSHOT_GROUPS_MAX; i++) + freeSyncSnapshotGroup(i); + } } /* *************** *** 2499,2501 **** --- 2523,2755 ---- pfree(buf.data); } + + static void + freeSyncSnapshotGroup(int idx) + { + /* caller needs to take care of proper locking */ + syncSnapshotGroups[idx].id[0] = '\0'; + syncSnapshotGroups[idx].refCount = 0; + } + + /* + * This function is for synchronization of snapshots: It can be called by + * new transactions to get the same snapshots. 
Its signature is + * + * pg_synchronize_snapshots(text, int, int); + * + * The first parameter is an identifier so that several groups can request + * synchronized snapshots concurrently. + * + * The second parameter is the number of backends that are expected to connect + * in the current group (i.e. same identifier). + * + * The third parameter is the timeout in milliseconds. + * + * Note that once we are holding the ProcArrayLock in shared mode we are + * severely impairing the usability of the database server: for example, nobody + * can commit nontrivial transactions during that time nor can you establish a + * new connection! This is why you need to be superuser to use this function. + * + * The idea is that from one connection you call for example + * + * pg_synchronize_snapshots('7bd0320c4ff9252716972e160fb33b8a', 4, 1000) + * + * and then have 1000ms to call, from four other (already connected) + * sessions, + * + * BEGIN TRANSACTION; + * SELECT pg_synchronize_snapshot_taken('7bd0320c4ff9252716972e160fb33b8a'); + * + * If all four pg_synchronize_snapshot_taken() calls return true and the + * function pg_synchronize_snapshots() returns true as well, you can go on and + * all four transactions now see the same snapshot (which in general is not the + * snapshot seen by the transaction that initially called + * pg_synchronize_snapshots()).
+ */ + Datum + pg_synchronize_snapshots(PG_FUNCTION_ARGS) + { + text *idText = PG_GETARG_TEXT_P(0); + char *id; + int nBackends = PG_GETARG_INT32(1); + int timeout = PG_GETARG_INT32(2); + int i; + int myIdx; + TimestampTz tstart; + bool failed; + + id = text_to_cstring(idText); + tstart = GetCurrentTimestamp(); + + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + (errmsg("must be superuser to execute \"%s\"", + "pg_synchronize_snapshots")))); + + if (strlen(id) > NAMEDATALEN - 1) + ereport(ERROR, + (errcode(ERRCODE_NAME_TOO_LONG), + errmsg("identifier too long"), + errdetail("Identifier must be less than %d characters.", + NAMEDATALEN))); + + if (id[0] == '\0') + ereport(ERROR, + (errcode(ERRCODE_INVALID_NAME), + errmsg("invalid identifier"))); + + myIdx = -1; + LWLockAcquire(SyncSnapshotLock, LW_EXCLUSIVE); + for (i = 0; i < SYNC_SNAPSHOT_GROUPS_MAX; i++) + { + if (strcmp(syncSnapshotGroups[i].id, id) == 0) + { + /* identifier is already taken, abort */ + if (myIdx >= 0) + freeSyncSnapshotGroup(myIdx); + /* As we expect a certain concurrency with these functions, we + * explicitly release SyncSnapshotLock immediately instead of + * implicitly in the error handler. 
+ */ + LWLockRelease(SyncSnapshotLock); + ereport(ERROR, + (errcode(ERRCODE_INVALID_NAME), + errmsg("identifier already in use"))); + } + else if (myIdx < 0 && syncSnapshotGroups[i].id[0] == '\0') + { + myIdx = i; + /* we know that id is at most NAMEDATALEN - 1 bytes */ + strcpy(syncSnapshotGroups[i].id, id); + } + } + + if (myIdx < 0) + { + /* see above about releasing the lock explicitly */ + LWLockRelease(SyncSnapshotLock); + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("no more sync snapshot slots available"))); + } + + syncSnapshotGroups[myIdx].refCount = nBackends; + + /* + * We need to make sure that we hold the ProcArrayLock before releasing + * SyncSnapshotLock: as soon as we release SyncSnapshotLock, the other + * backends can enter pg_synchronize_snapshot_taken() so we should be + * prepared for it and have ProcArrayLock already. Since we get + * ProcArrayLock in SHARED mode only, everybody else can also get it and we + * cannot run into a deadlock. + */ + LWLockAcquire(ProcArrayLock, LW_SHARED); + LWLockRelease(SyncSnapshotLock); + + /* + * This is the only place where an LW_SHARED SyncSnapshotLock is sufficient + * but for simplicity we just acquire an LW_EXCLUSIVE as the LW_SHARED + * doesn't give a lot of benefits (pg_synchronize_snapshot_taken() acquires + * only LW_EXCLUSIVE locks). + */ + for(;;) + { + LWLockAcquire(SyncSnapshotLock, LW_EXCLUSIVE); + + if (TimestampDifferenceExceeds(tstart, GetCurrentTimestamp(), timeout)) + /* cancel due to expired timeout */ + syncSnapshotGroups[myIdx].refCount = -1; + + /* CHECK_FOR_INTERRUPTS() does not work here because we are holding + * a lock and so we have InterruptHoldoffCount > 0. So we check + * for QueryCancelPending or ProcDiePending manually and abort in that + * case. 
*/ + if (QueryCancelPending || ProcDiePending) + syncSnapshotGroups[myIdx].refCount = -1; + + if (syncSnapshotGroups[myIdx].refCount <= 0) + break; + + LWLockRelease(SyncSnapshotLock); + pg_usleep(50000); + } + + /* + * We save the status here in order to be able to call + * freeSyncSnapshotGroup() before releasing ProcArrayLock. Once it is + * released we could be interrupted and the query cancelled. Then we have + * dead data in syncSnapshotGroups. + */ + failed = syncSnapshotGroups[myIdx].refCount < 0 ? true : false; + freeSyncSnapshotGroup(myIdx); + LWLockRelease(SyncSnapshotLock); + + LWLockRelease(ProcArrayLock); + + /* Has the query been cancelled externally ? */ + CHECK_FOR_INTERRUPTS(); + + if (failed) + PG_RETURN_BOOL(false); + else + PG_RETURN_BOOL(true); + } + + Datum + pg_synchronize_snapshot_taken(PG_FUNCTION_ARGS) + { + text *idText = PG_GETARG_TEXT_P(0); + char *id; + int myIdx = -1; + int i; + + id = text_to_cstring(idText); + + if (id[0] == '\0') + ereport(ERROR, + (errcode(ERRCODE_INVALID_NAME), + errmsg("identifier not found"))); + + /* + * One could think that an LW_SHARED is enough here and for the first part + * it is. However later on we need an LW_EXCLUSIVE anyway. So let's just + * get LW_EXCLUSIVE right in the beginning, also because it is more likely + * that we find the identifier. + * + * Also we expect some level of concurrency and it seems better to release + * the SyncSnapshotLock explicitly as soon as possible instead of + * implicitly at transaction cleanup. + */ + LWLockAcquire(SyncSnapshotLock, LW_EXCLUSIVE); + for (i = 0; i < SYNC_SNAPSHOT_GROUPS_MAX; i++) + if (strcmp(syncSnapshotGroups[i].id, id) == 0) + { + myIdx = i; + break; + } + + if (myIdx < 0) + { + LWLockRelease(SyncSnapshotLock); + ereport(ERROR, + (errcode(ERRCODE_INVALID_NAME), + errmsg("identifier not found"))); + } + + /* + * Should we enforce that IsXactIsoLevelSerializable == true ? 
+ */ + + if (syncSnapshotGroups[myIdx].refCount <= 0) + { + LWLockRelease(SyncSnapshotLock); + PG_RETURN_BOOL(false); + } + + syncSnapshotGroups[myIdx].refCount--; + LWLockRelease(SyncSnapshotLock); + PG_RETURN_BOOL(true); + } + diff -cr cvs.head/src/include/catalog/pg_proc.h cvs.build/src/include/catalog/pg_proc.h *** cvs.head/src/include/catalog/pg_proc.h 2010-01-08 10:48:32.000000000 +0100 --- cvs.build/src/include/catalog/pg_proc.h 2010-01-08 14:52:24.000000000 +0100 *************** *** 3270,3275 **** --- 3270,3279 ---- DESCR("cancel a server process' current query"); DATA(insert OID = 2096 ( pg_terminate_backend PGNSP PGUID 12 1 0 0 f f f t f v 1 0 16 "23" _null_ _null_ _null_ _null_ pg_terminate_backend _null_ _null_ _null_ )); DESCR("terminate a server process"); + DATA(insert OID = 3030 ( pg_synchronize_snapshots PGNSP PGUID 12 1 0 0 f f f t f v 3 0 16 "25 23 23" _null_ _null_ _null_ _null_ pg_synchronize_snapshots _null_ _null_ _null_ )); + DESCR("get synchronized snapshots"); + DATA(insert OID = 3031 ( pg_synchronize_snapshot_taken PGNSP PGUID 12 1 0 0 f f f t f v 1 0 16 "25" _null_ _null_ _null_ _null_ pg_synchronize_snapshot_taken _null_ _null_ _null_ )); + DESCR("report that a synchronized snapshot has been taken"); DATA(insert OID = 2172 ( pg_start_backup PGNSP PGUID 12 1 0 0 f f f t f v 2 0 25 "25 16" _null_ _null_ _null_ _null_ pg_start_backup _null_ _null_ _null_ )); DESCR("prepare for taking an online backup"); DATA(insert OID = 2173 ( pg_stop_backup PGNSP PGUID 12 1 0 0 f f f t f v 0 0 25 "" _null_ _null_ _null_ _null_ pg_stop_backup _null_ _null_ _null_ )); diff -cr cvs.head/src/include/storage/lwlock.h cvs.build/src/include/storage/lwlock.h *** cvs.head/src/include/storage/lwlock.h 2010-01-05 12:39:36.000000000 +0100 --- cvs.build/src/include/storage/lwlock.h 2010-01-08 14:52:24.000000000 +0100 *************** *** 67,72 **** --- 67,73 ---- AutovacuumLock, AutovacuumScheduleLock, SyncScanLock, + SyncSnapshotLock, /* Individual lock IDs end here 
*/ FirstBufMappingLock, FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS, diff -cr cvs.head/src/include/storage/procarray.h cvs.build/src/include/storage/procarray.h *** cvs.head/src/include/storage/procarray.h 2010-01-05 12:39:36.000000000 +0100 --- cvs.build/src/include/storage/procarray.h 2010-01-08 14:52:24.000000000 +0100 *************** *** 71,74 **** --- 71,77 ---- int nxids, const TransactionId *xids, TransactionId latestXid); + Datum pg_synchronize_snapshot_taken(PG_FUNCTION_ARGS); + Datum pg_synchronize_snapshots(PG_FUNCTION_ARGS); + #endif /* PROCARRAY_H */
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers