On Sun, Dec 5, 2010 at 9:27 PM, Robert Haas <[email protected]> wrote:
> On Sun, Dec 5, 2010 at 9:04 PM, Andrew Dunstan <[email protected]> wrote:
>> Why not just say give me the snapshot currently held by process nnnn?
>>
>> And please, not temp files if possible.
>
> As far as I'm aware, the full snapshot doesn't normally exist in
> shared memory, hence the need for publication of some sort. We could
> dedicate a shared memory region for publication but then you have to
> decide how many slots to allocate, and any number you pick will be too
> many for some people and not enough for others, not to mention that
> shared memory is a fairly precious resource.

So here is a patch that I played with a while back; thanks go to
Koichi Suzuki for his helpful comments. I have not published it
earlier because I haven't worked on it recently, and from the
discussion that I brought up in March I got the feeling that people
are fine with having a first version of parallel dump without
synchronized snapshots.

I am not really sure whether what the patch does is sufficient, nor
whether it does it in the right way, but I hope that it can serve as
a basis for collecting ideas (and doubts).

My idea is very similar to Robert's idea of publishing snapshots and
subscribing to them; the patch even uses these words.
Basically, a transaction running at isolation level SERIALIZABLE can
publish its snapshot under a name, and as long as this transaction is
alive, other transactions in the same database can adopt that snapshot.
Requiring the publishing transaction to be serializable guarantees that
the copy of the snapshot in shared memory is always current: such a
transaction uses a single snapshot for its whole lifetime, so the
published copy never falls behind the snapshot the publisher itself is
using. When the transaction ends, the published copy is invalidated and
can no longer be adopted. So instead of doing explicit validity checks,
the patch relies on always having a live reference transaction that
guarantees the validity of the snapshot information in shared memory.
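To make the publish/subscribe life cycle concrete, here is a minimal, self-contained sketch of the slot logic described above. It is a toy model, not the patch's code: the types (ToySnapshot, ToySlot), the helpers (init_slots, publish, subscribe, end_transaction) and the sizes MAX_XIDS and NAME_LEN are hypothetical stand-ins, and there is no locking because everything runs in a single process. In the patch the slots live in shared memory and are protected by SyncSnapshotLock.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical toy stand-ins for PostgreSQL types; sizes are illustrative. */
typedef uint32_t TransactionId;
#define MAX_SLOTS       4   /* mirrors MAX_SYNC_SNAPSHOT_SETS in the patch */
#define MAX_XIDS        8   /* hypothetical stand-in for PROCARRAY_MAXPROCS */
#define NAME_LEN        64  /* stand-in for NAMEDATALEN */
#define INVALID_BACKEND (-1)

typedef struct {
    TransactionId xmin, xmax;
    int           xcnt;
    TransactionId xip[MAX_XIDS];   /* embedded array, unlike the patch */
} ToySnapshot;

typedef struct {
    ToySnapshot snap;
    char        name[NAME_LEN];
    int         backendId;         /* INVALID_BACKEND marks a free slot */
    unsigned    databaseId;
} ToySlot;

static ToySlot slots[MAX_SLOTS];

static void init_slots(void)
{
    for (int i = 0; i < MAX_SLOTS; i++) {
        memset(&slots[i], 0, sizeof(ToySlot));
        slots[i].backendId = INVALID_BACKEND;
    }
}

/* Publisher: copy its snapshot into a free slot under a per-database name. */
static bool publish(const ToySnapshot *s, const char *name, int backend, unsigned db)
{
    for (int i = 0; i < MAX_SLOTS; i++)
        if (slots[i].backendId != INVALID_BACKEND && slots[i].databaseId == db &&
            strcmp(slots[i].name, name) == 0)
            return false;          /* name taken; the patch raises an ERROR here */

    for (int i = 0; i < MAX_SLOTS; i++) {
        if (slots[i].backendId == INVALID_BACKEND) {
            slots[i].snap = *s;    /* struct copy also copies the xip array */
            snprintf(slots[i].name, NAME_LEN, "%s", name);
            slots[i].backendId = backend;
            slots[i].databaseId = db;
            return true;
        }
    }
    return false;                  /* no free slot left */
}

/* Subscriber: adopt a live snapshot with this name in the same database. */
static bool subscribe(const char *name, unsigned db, ToySnapshot *out)
{
    for (int i = 0; i < MAX_SLOTS; i++) {
        if (slots[i].backendId != INVALID_BACKEND && slots[i].databaseId == db &&
            strcmp(slots[i].name, name) == 0) {
            *out = slots[i].snap;
            return true;
        }
    }
    return false;
}

/* End of the publishing transaction: invalidate every slot it owns. */
static void end_transaction(int backend)
{
    for (int i = 0; i < MAX_SLOTS; i++)
        if (slots[i].backendId == backend) {
            memset(&slots[i], 0, sizeof(ToySlot));
            slots[i].backendId = INVALID_BACKEND;
        }
}
```

Because ToySnapshot embeds its xip array, a plain struct assignment copies the xids. In the patch, xip and subxip are pointers into separately reserved shared memory, so those pointers have to be preserved and the arrays copied through them.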

The patch currently creates a new area in shared memory to store the
snapshot information, but we can certainly discuss this. The number of
slots is hard-coded (MAX_SYNC_SNAPSHOT_SETS = 4); I had in mind a GUC
that controls the number of available "slots", similar to
max_prepared_transactions. Snapshot information can become quite
large: each slot reserves room for PROCARRAY_MAXPROCS xids plus
TOTAL_MAX_CACHED_SUBXIDS subxids, so its size grows with
max_connections.
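As a rough, hedged estimate of how large that gets, the helpers below compute the xid storage reserved per slot. They assume the definitions in procarray.c and proc.h at the time: PROCARRAY_MAXPROCS is MaxBackends + max_prepared_xacts, TOTAL_MAX_CACHED_SUBXIDS is (PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS, and PGPROC_MAX_CACHED_SUBXIDS is 64. The function names are hypothetical, and the small NamedSnapshotData header is not counted.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;     /* 4 bytes, as in PostgreSQL */

/* Assumed value from storage/proc.h of that era. */
#define PGPROC_MAX_CACHED_SUBXIDS 64

/* Hypothetical helper: bytes of xid storage one slot reserves, i.e.
 * PROCARRAY_MAXPROCS xids plus TOTAL_MAX_CACHED_SUBXIDS subxids. */
static size_t xid_bytes_per_slot(size_t maxprocs)
{
    size_t xids = maxprocs;                                      /* PROCARRAY_MAXPROCS */
    size_t subxids = (PGPROC_MAX_CACHED_SUBXIDS + 1) * maxprocs; /* TOTAL_MAX_CACHED_SUBXIDS */

    return (xids + subxids) * sizeof(TransactionId);
}

/* Hypothetical helper: xid storage for nslots slots
 * (MAX_SYNC_SNAPSHOT_SETS in the patch). */
static size_t xid_bytes_total(size_t maxprocs, size_t nslots)
{
    return nslots * xid_bytes_per_slot(maxprocs);
}
```

With PROCARRAY_MAXPROCS = 100, a slot needs (100 + 6500) * 4 = 26,400 bytes, so the four hard-coded slots cost about 105 kB; at 1000 it is 264,000 bytes per slot, i.e. about 1 MB for four slots.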

Known limitations: the patch is completely unaware of prepared
transactions, and it doesn't check whether the publishing and the
subscribing backend belong to the same user.

Joachim
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 95beba8..c24150f 100644
*** a/src/backend/storage/ipc/ipci.c
--- b/src/backend/storage/ipc/ipci.c
*************** CreateSharedMemoryAndSemaphores(bool mak
*** 124,129 ****
--- 124,130 ----
size = add_size(size, BTreeShmemSize());
size = add_size(size, SyncScanShmemSize());
size = add_size(size, AsyncShmemSize());
+ size = add_size(size, SyncSnapshotShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
*************** CreateSharedMemoryAndSemaphores(bool mak
*** 228,233 ****
--- 229,235 ----
BTreeShmemInit();
SyncScanShmemInit();
AsyncShmemInit();
+ SyncSnapshotInit();
#ifdef EXEC_BACKEND
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 6e7a6db..00522fb 100644
*** a/src/backend/storage/ipc/procarray.c
--- b/src/backend/storage/ipc/procarray.c
*************** typedef struct ProcArrayStruct
*** 91,96 ****
--- 91,111 ----
static ProcArrayStruct *procArray;
+
+ /* this should be a GUC later... */
+ #define MAX_SYNC_SNAPSHOT_SETS 4
+ typedef struct
+ {
+ SnapshotData ssd;
+ char name[NAMEDATALEN];
+ BackendId backendId;
+ Oid databaseId;
+ } NamedSnapshotData;
+
+ typedef NamedSnapshotData* NamedSnapshot;
+
+ static NamedSnapshot syncSnapshots;
+
/*
* Bookkeeping for tracking emulated transactions in recovery
*/
*************** static int KnownAssignedXidsGetAndSetXmi
*** 159,164 ****
--- 174,182 ----
static TransactionId KnownAssignedXidsGetOldestXmin(void);
static void KnownAssignedXidsDisplay(int trace_level);
+ static bool DeleteSyncSnapshot(const char *name);
+ static bool snapshotPublished = false; /* true if we have published at least one snapshot */
+
/*
* Report shared-memory space needed by CreateSharedProcArray.
*/
*************** ProcArrayRemove(PGPROC *proc, Transactio
*** 350,355 ****
--- 368,379 ----
void
ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
{
+ if (snapshotPublished)
+ {
+ DeleteSyncSnapshot(NULL);
+ snapshotPublished = false;
+ }
+
if (TransactionIdIsValid(latestXid))
{
/*
*************** KnownAssignedXidsDisplay(int trace_level
*** 3104,3106 ****
--- 3132,3374 ----
pfree(buf.data);
}
+
+
+ /*
+ * Report space needed for our shared memory area.
+ *
+ * Memory is structured as follows:
+ *
+ * NamedSnapshotData[0]
+ * NamedSnapshotData[1]
+ * NamedSnapshotData[2]
+ * Xids for NamedSnapshotData[0]
+ * Sub-Xids for NamedSnapshotData[0]
+ * Xids for NamedSnapshotData[1]
+ * Sub-Xids for NamedSnapshotData[1]
+ * Xids for NamedSnapshotData[2]
+ * Sub-Xids for NamedSnapshotData[2]
+ */
+ Size
+ SyncSnapshotShmemSize(void)
+ {
+ Size size;
+
+ size = sizeof(NamedSnapshotData);
+ size = add_size(size, PROCARRAY_MAXPROCS * sizeof(TransactionId));
+ size = add_size(size, TOTAL_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
+ size = mul_size(size, MAX_SYNC_SNAPSHOT_SETS);
+
+ return size;
+ }
+
+ void
+ SyncSnapshotInit(void)
+ {
+ Size size;
+ bool found;
+
+ size = SyncSnapshotShmemSize();
+
+ syncSnapshots = (NamedSnapshot) ShmemInitStruct("SyncSnapshotSets",
+ size, &found);
+ if (!found)
+ {
+ int i;
+ /* XXX is this always properly aligned? */
+ void *ptr = (void *) &syncSnapshots[MAX_SYNC_SNAPSHOT_SETS];
+ /* ptr now points past the last syncSnapshot entry */
+ for (i = 0; i < MAX_SYNC_SNAPSHOT_SETS; i++)
+ {
+ NamedSnapshot ns = &syncSnapshots[i];
+
+ /* ptr is aligned in the beginning and as we add only multiples of
+ * sizeof(TransactionId), it is still aligned for every loop */
+ ns->ssd.xip = (TransactionId *) ptr;
+
+ ptr = (TransactionId *) ptr + PROCARRAY_MAXPROCS;
+
+ /* ptr now points past what we reserve for xip */
+ ns->ssd.subxip = ptr;
+ ptr = (TransactionId *) ptr + TOTAL_MAX_CACHED_SUBXIDS;
+
+ ns->name[0] = '\0';
+ ns->backendId = InvalidBackendId;
+ ns->databaseId = InvalidOid;
+ }
+ }
+ }
+
+ static bool
+ DeleteSyncSnapshot(const char* name)
+ {
+ TransactionId *xip,
+ *subxip;
+ NamedSnapshot ns;
+ int i;
+ bool found = false;
+
+ LWLockAcquire(SyncSnapshotLock, LW_EXCLUSIVE);
+ for (i = 0; i < MAX_SYNC_SNAPSHOT_SETS; i++)
+ {
+ ns = &syncSnapshots[i];
+
+ /* don't look at other backends' snapshots */
+ if (ns->backendId != MyBackendId)
+ continue;
+
+ Assert(ns->databaseId == MyDatabaseId);
+
+ /* name == NULL means that we want to delete all of our snapshots */
+ if (!name || strcmp(name, syncSnapshots[i].name) == 0)
+ {
+ found = true;
+
+ /* save pointers */
+ xip = ns->ssd.xip;
+ subxip = ns->ssd.subxip;
+
+ memset(ns, 0, sizeof(NamedSnapshotData));
+
+ /* Actually it would be sufficient to set the backendId to
+ * InvalidBackendId to invalidate this snapshot */
+ ns->backendId = InvalidBackendId;
+
+ /* restore pointers */
+ ns->ssd.xip = xip;
+ ns->ssd.subxip = subxip;
+
+ memset(xip, 0, sizeof(TransactionId) * PROCARRAY_MAXPROCS);
+ memset(subxip, 0, sizeof(TransactionId) * TOTAL_MAX_CACHED_SUBXIDS);
+ }
+ }
+ LWLockRelease(SyncSnapshotLock);
+
+ return found;
+ }
+
+ bool
+ UnpublishSnapshot(const char *name)
+ {
+ return DeleteSyncSnapshot(name);
+ }
+
+ bool
+ PublishSnapshot(Snapshot snapshot, const char *name)
+ {
+ int i;
+ bool found = false;
+ NamedSnapshot ns;
+ TransactionId *xip, *subxip;
+
+ if (!IsolationUsesXactSnapshot())
+ elog(ERROR, "Transaction must use TRANSACTION ISOLATION LEVEL "
+ "SERIALIZABLE to publish snapshots");
+
+ if (!XactReadOnly)
+ elog(WARNING, "Transaction is not read only");
+
+ LWLockAcquire(SyncSnapshotLock, LW_EXCLUSIVE);
+
+ /* First check for an existing publication with the same name in the same
+ * database. */
+ for (i = 0; i < MAX_SYNC_SNAPSHOT_SETS; i++)
+ {
+ if (syncSnapshots[i].databaseId == MyDatabaseId &&
+ strcmp(syncSnapshots[i].name, name) == 0 &&
+ syncSnapshots[i].backendId != InvalidBackendId)
+
+ elog(ERROR, "A snapshot with this name has already been published");
+ }
+
+ /* find some free space in shared memory to copy the snapshot to.
+ * Make sure the name is unique. */
+ for (i = 0; i < MAX_SYNC_SNAPSHOT_SETS; i++)
+ {
+ if (syncSnapshots[i].backendId == InvalidBackendId)
+ {
+ found = true;
+ /* only valid for now with redundant cleanup upon init and deletion */
+ Assert(syncSnapshots[i].name[0] == '\0');
+ ns = &syncSnapshots[i];
+ break;
+ }
+ }
+
+ if (found)
+ {
+ /* save pointers */
+ xip = ns->ssd.xip;
+ subxip = ns->ssd.subxip;
+
+ memcpy(&ns->ssd, snapshot, sizeof(SnapshotData));
+
+ /* restore pointers */
+ ns->ssd.xip = xip;
+ ns->ssd.subxip = subxip;
+
+ memcpy(ns->ssd.xip, snapshot->xip,
+ sizeof(TransactionId) * snapshot->xcnt);
+ memcpy(ns->ssd.subxip, snapshot->subxip,
+ sizeof(TransactionId) * snapshot->subxcnt);
+
+ /* set the name and backend id */
+ strcpy(ns->name, name);
+ ns->backendId = MyBackendId;
+ ns->databaseId = MyDatabaseId;
+
+ snapshotPublished = true;
+ }
+
+ LWLockRelease(SyncSnapshotLock);
+
+ return found;
+ }
+
+ bool
+ SubscribeSnapshot(const char *name, Snapshot snapshot)
+ {
+ NamedSnapshot ns;
+ bool found = false;
+ int i;
+
+ LWLockAcquire(SyncSnapshotLock, LW_SHARED);
+
+ for (i = 0; i < MAX_SYNC_SNAPSHOT_SETS; i++)
+ {
+ if (strcmp(syncSnapshots[i].name, name) == 0 &&
+ MyDatabaseId == syncSnapshots[i].databaseId &&
+ syncSnapshots[i].backendId != InvalidBackendId)
+ {
+ found = true;
+ ns = &syncSnapshots[i];
+ break;
+ }
+ }
+
+ if (found)
+ {
+ /* Do we somehow need to unregister the old snapshot and register the
+ * new one?
+ * Do we need to set any of the other fields?
+ * active_count / regd_count / curcid ?
+ */
+ snapshot->xmin = ns->ssd.xmin;
+ snapshot->xmax = ns->ssd.xmax;
+
+ if ((snapshot->xcnt = ns->ssd.xcnt))
+ memcpy(snapshot->xip, ns->ssd.xip,
+ sizeof(TransactionId) * ns->ssd.xcnt);
+ if ((snapshot->subxcnt = ns->ssd.subxcnt))
+ memcpy(snapshot->subxip, ns->ssd.subxip,
+ sizeof(TransactionId) * ns->ssd.subxcnt);
+
+ if (!IsolationUsesXactSnapshot())
+ elog(WARNING, "Transaction is not ISOLATION LEVEL SERIALIZABLE");
+ }
+
+ LWLockRelease(SyncSnapshotLock);
+
+ return found;
+ }
+
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 273d8bd..cf83acb 100644
*** a/src/backend/utils/time/snapmgr.c
--- b/src/backend/utils/time/snapmgr.c
***************
*** 29,34 ****
--- 29,35 ----
#include "access/xact.h"
#include "storage/proc.h"
#include "storage/procarray.h"
+ #include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/memutils.h"
#include "utils/resowner.h"
*************** AtEOXact_Snapshot(bool isCommit)
*** 559,561 ****
--- 560,628 ----
FirstSnapshotSet = false;
registered_xact_snapshot = false;
}
+
+
+
+ Datum
+ pg_publish_snapshot(PG_FUNCTION_ARGS)
+ {
+ text *nameText = PG_GETARG_TEXT_P(0);
+ char *name;
+
+ name = text_to_cstring(nameText);
+
+ if (strlen(name) > NAMEDATALEN - 1)
+ ereport(ERROR, (errcode(ERRCODE_NAME_TOO_LONG),
+ errmsg("identifier too long"),
+ errdetail("Identifier must be less than %d characters.",
+ NAMEDATALEN)));
+
+ if (name[0] == '\0')
+ ereport(ERROR, (errcode(ERRCODE_INVALID_NAME),
+ errmsg("invalid identifier")));
+
+ if (PublishSnapshot(GetTransactionSnapshot(), name))
+ PG_RETURN_BOOL(true);
+ else
+ PG_RETURN_BOOL(false);
+ }
+
+ Datum
+ pg_unpublish_snapshot(PG_FUNCTION_ARGS)
+ {
+ text *nameText = PG_GETARG_TEXT_P(0);
+ char *name;
+
+ name = text_to_cstring(nameText);
+
+ if (strlen(name) > NAMEDATALEN - 1)
+ ereport(ERROR, (errcode(ERRCODE_NAME_TOO_LONG),
+ errmsg("identifier too long"),
+ errdetail("Identifier must be less than %d characters.",
+ NAMEDATALEN)));
+
+ if (name[0] == '\0')
+ ereport(ERROR, (errcode(ERRCODE_INVALID_NAME),
+ errmsg("invalid identifier")));
+
+ if (UnpublishSnapshot(name))
+ PG_RETURN_BOOL(true);
+ else
+ PG_RETURN_BOOL(false);
+ }
+
+ Datum
+ pg_subscribe_snapshot(PG_FUNCTION_ARGS)
+ {
+ text *nameText = PG_GETARG_TEXT_P(0);
+ char *name;
+
+ name = text_to_cstring(nameText);
+
+ if (name[0] == '\0')
+ ereport(ERROR, (errcode(ERRCODE_INVALID_NAME),
+ errmsg("invalid identifier")));
+
+ PG_RETURN_BOOL(SubscribeSnapshot(name, GetTransactionSnapshot()));
+ }
+
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 25a3912..bb040d4 100644
*** a/src/include/catalog/pg_proc.h
--- b/src/include/catalog/pg_proc.h
*************** DATA(insert OID = 2171 ( pg_cancel_backe
*** 3369,3374 ****
--- 3369,3380 ----
DESCR("cancel a server process' current query");
DATA(insert OID = 2096 ( pg_terminate_backend PGNSP PGUID 12 1 0 0 f f f t f v 1 0 16 "23" _null_ _null_ _null_ _null_ pg_terminate_backend _null_ _null_ _null_ ));
DESCR("terminate a server process");
+ DATA(insert OID = 3115 ( pg_publish_snapshot PGNSP PGUID 12 1 0 0 f f f t f v 1 0 16 "25" _null_ _null_ _null_ _null_ pg_publish_snapshot _null_ _null_ _null_ ));
+ DESCR("publish a snapshot");
+ DATA(insert OID = 3116 ( pg_unpublish_snapshot PGNSP PGUID 12 1 0 0 f f f t f v 1 0 16 "25" _null_ _null_ _null_ _null_ pg_unpublish_snapshot _null_ _null_ _null_ ));
+ DESCR("unpublish a snapshot");
+ DATA(insert OID = 3117 ( pg_subscribe_snapshot PGNSP PGUID 12 1 0 0 f f f t f v 1 0 16 "25" _null_ _null_ _null_ _null_ pg_subscribe_snapshot _null_ _null_ _null_ ));
+ DESCR("subscribe to a published snapshot");
DATA(insert OID = 2172 ( pg_start_backup PGNSP PGUID 12 1 0 0 f f f t f v 2 0 25 "25 16" _null_ _null_ _null_ _null_ pg_start_backup _null_ _null_ _null_ ));
DESCR("prepare for taking an online backup");
DATA(insert OID = 2173 ( pg_stop_backup PGNSP PGUID 12 1 0 0 f f f t f v 0 0 25 "" _null_ _null_ _null_ _null_ pg_stop_backup _null_ _null_ _null_ ));
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 548e7e0..44f0ac8 100644
*** a/src/include/storage/lwlock.h
--- b/src/include/storage/lwlock.h
*************** typedef enum LWLockId
*** 70,75 ****
--- 70,76 ----
RelationMappingLock,
AsyncCtlLock,
AsyncQueueLock,
+ SyncSnapshotLock,
/* Individual lock IDs end here */
FirstBufMappingLock,
FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 959033e..f3387fc 100644
*** a/src/include/storage/procarray.h
--- b/src/include/storage/procarray.h
*************** extern void XidCacheRemoveRunningXids(Tr
*** 72,75 ****
--- 72,81 ----
int nxids, const TransactionId *xids,
TransactionId latestXid);
+ extern Size SyncSnapshotShmemSize(void);
+ extern void SyncSnapshotInit(void);
+ extern bool PublishSnapshot(Snapshot snapshot, const char *name);
+ extern bool UnpublishSnapshot(const char *name);
+ extern bool SubscribeSnapshot(const char *name, Snapshot snapshot);
+
#endif /* PROCARRAY_H */
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index f03647b..a8292be 100644
*** a/src/include/utils/snapmgr.h
--- b/src/include/utils/snapmgr.h
*************** extern void AtSubAbort_Snapshot(int leve
*** 43,46 ****
--- 43,50 ----
extern void AtEarlyCommit_Snapshot(void);
extern void AtEOXact_Snapshot(bool isCommit);
+ extern Datum pg_publish_snapshot(PG_FUNCTION_ARGS);
+ extern Datum pg_unpublish_snapshot(PG_FUNCTION_ARGS);
+ extern Datum pg_subscribe_snapshot(PG_FUNCTION_ARGS);
+
#endif /* SNAPMGR_H */
--
Sent via pgsql-hackers mailing list ([email protected])