On Thu, Dec 30, 2010 at 7:31 AM, Joachim Wieland <j...@mcknight.de> wrote:
> What I am proposing now is the following:
> We return snapshot information as a chunk of data to the client. At
> the same time however, we set a checksum in shared memory to protect
> against modification of the snapshot. A publishing backend can revoke
> its snapshot by deleting the checksum and a backend that is asked to
> install a snapshot can verify that the snapshot is correct and current
> by calculating the checksum and comparing it with the one in shared
> memory.

So here's the patch implementing this idea.

I named the user visible functions pg_export_snapshot() and

A user starts a transaction and calls pg_export_snapshot() to get a
chunk of bytea data. In a different connection he opens a transaction in
isolation level serializable and passes that chunk of data into
pg_import_snapshot(). Now subsequent queries of the second connection see the
snapshot that was current when pg_export_snapshot() has been executed. In case
both transactions are in isolation level serializable then both see the same
data from this moment on (this is the case of pg_dump).

There are most probably a few loose ends and someone who is more familiar to
this area than me needs to look into it but at least everybody should be happy
now with the overall approach.

These are the implementation details and restrictions of the patch:

The exporting transaction

    - should be read-only (to discourage people from expecting that writes of
      the exporting transaction can be seen by the importing transaction)
    - must not be a subtransaction (we don't add subxips of our own transaction
      to the snapshot, so importing the snapshot later would result in missing
    - adds its own xid (if any) to the xip-array

The importing transaction

    - will not import a snapshot of the same backend (even though it would
      probably work)
    - will not import a snapshot of a different database in the cluster
    - should be isolation level serializable
    - must not be a subtransaction (can we completely rollback on
    - leaves curcid as is, otherwise effects of previous commands would get lost
      and magically appear later when curcid increases
    - applies xmin, xmax, xip, subxip values of the exported snapshot to
      GetActiveSnapshot() and GetTransactionSnapshot()
    - takes itself out of the xip array
    - updates MyProc->xmin but sets it only backwards but not forwards

The snapshot is invalidated on transaction commit/rollback as well as when doing
"prepare transaction".

Comments welcome.

diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 95beba8..c24150f 100644
*** a/src/backend/storage/ipc/ipci.c
--- b/src/backend/storage/ipc/ipci.c
*************** CreateSharedMemoryAndSemaphores(bool mak
*** 124,129 ****
--- 124,130 ----
  		size = add_size(size, BTreeShmemSize());
  		size = add_size(size, SyncScanShmemSize());
  		size = add_size(size, AsyncShmemSize());
+ 		size = add_size(size, SyncSnapshotShmemSize());
  		size = add_size(size, ShmemBackendArraySize());
*************** CreateSharedMemoryAndSemaphores(bool mak
*** 228,233 ****
--- 229,235 ----
+ 	SyncSnapshotInit();
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 980996e..e851bcd 100644
*** a/src/backend/storage/ipc/procarray.c
--- b/src/backend/storage/ipc/procarray.c
*** 50,60 ****
--- 50,62 ----
  #include "access/transam.h"
  #include "access/xact.h"
  #include "access/twophase.h"
+ #include "libpq/md5.h"
  #include "miscadmin.h"
  #include "storage/procarray.h"
  #include "storage/spin.h"
  #include "storage/standby.h"
  #include "utils/builtins.h"
+ #include "utils/bytea.h"
  #include "utils/snapmgr.h"
*************** static int KnownAssignedXidsGetAndSetXmi
*** 159,164 ****
--- 161,170 ----
  static TransactionId KnownAssignedXidsGetOldestXmin(void);
  static void KnownAssignedXidsDisplay(int trace_level);
+ typedef char snapshotChksum[16];
+ static snapshotChksum *syncSnapshotChksums;
+ static Snapshot exportedSnapshot;
   * Report shared-memory space needed by CreateSharedProcArray.
*************** KnownAssignedXidsDisplay(int trace_level
*** 3065,3067 ****
--- 3071,3335 ----
+ /*
+  *  Report space needed for our shared memory area, which is basically an
+  *  md5 checksum per connection.
+  */
+ Size
+ SyncSnapshotShmemSize(void)
+ {
+ 	return PROCARRAY_MAXPROCS * sizeof(snapshotChksum);
+ }
+ void
+ SyncSnapshotInit(void)
+ {
+ 	Size	size;
+ 	bool	found;
+ 	int		i;
+ 	size = SyncSnapshotShmemSize();
+ 	syncSnapshotChksums = (snapshotChksum*) ShmemInitStruct("SyncSnapshotChksums",
+ 															size, &found);
+ 	if (!found)
+ 		for (i = 0; i < PROCARRAY_MAXPROCS; i++)
+ 			memset(syncSnapshotChksums[i], 0, sizeof(snapshotChksum));
+ }
+ void
+ InvalidateExportedSnapshot(void)
+ {
+ 	if (syncSnapshotChksums[MyBackendId][0] == '\0')
+ 		return;
+ 	memset(syncSnapshotChksums[MyBackendId], 0, sizeof(snapshotChksum));
+ 	UnregisterSnapshotFromOwner(exportedSnapshot, TopTransactionResourceOwner);
+ 	exportedSnapshot = NULL;
+ }
+ static void
+ snapshot_appendf(char ** buffer, int *bufsize_filled, int *bufsize_left, char *fmt, ...)
+ {
+ 	va_list		ap;
+ 	int			ret;
+ 	if (*bufsize_left < 64)
+ 	{
+ 		/* enlarge buffer by 1024 bytes */
+ 		*buffer = repalloc(*buffer, *bufsize_filled + 1024);
+ 		*bufsize_left = *bufsize_filled + 1024;
+ 	}
+ 	va_start(ap, fmt);
+ 	ret = vsnprintf(*buffer + *bufsize_filled, *bufsize_left, fmt, ap);
+ 	va_end(ap);
+ 	/* There shouldn't be any error, we leave enough room for the data that we
+ 	 * are writing (only numbers basically). */
+ 	Assert(ret < *bufsize_left && ret > 0);
+ 	*bufsize_left -= ret;
+ 	*bufsize_filled += ret;
+ 	Assert(strlen(*buffer) == *bufsize_filled);
+ }
+ bytea *
+ ExportSnapshot(Snapshot snapshot)
+ {
+ 	int				bufsize = 1024;
+ 	int				bufsize_filled = 0; /* doesn't include NUL byte */
+ 	int				bufsize_left = bufsize - bufsize_filled;
+ 	char		   *buf = (char *) palloc(bufsize);
+ 	snapshotChksum	cksum;
+ 	int				i;
+ 	/* In a subtransaction we don't see our subxip values in the snapshot so
+ 	 * they would be missing in the backend applying it. */
+ 	if (GetCurrentTransactionNestLevel() != 1)
+ 		elog(ERROR, "Can only export a snapshot from a top level transaction.");
+ 	/* Write up all the data that we return */
+ 	snapshot_appendf(&buf, &bufsize_filled, &bufsize_left,
+ 							"did:%d ", MyDatabaseId);
+ 	snapshot_appendf(&buf, &bufsize_filled, &bufsize_left,
+ 							"bid:%d ", MyBackendId);
+ 	snapshot_appendf(&buf, &bufsize_filled, &bufsize_left,
+ 							"xmi:%d ", snapshot->xmin);
+ 	snapshot_appendf(&buf, &bufsize_filled, &bufsize_left,
+ 							"xma:%d ", snapshot->xmax);
+ 	for (i = 0; i < snapshot->xcnt; i++)
+ 		snapshot_appendf(&buf, &bufsize_filled, &bufsize_left,
+ 								"xip:%d ", snapshot->xip[i]);
+ 	/*
+ 	 * Finally add our own XID if we have one, since by definition we will
+ 	 * still be running when the other transaction takes over the snapshot.
+ 	 */
+ 	if (TransactionIdIsValid(GetTopTransactionIdIfAny()))
+ 		snapshot_appendf(&buf, &bufsize_filled, &bufsize_left,
+ 								"xip:%d ", GetTopTransactionIdIfAny());
+ 	if (snapshot->suboverflowed)
+ 		snapshot_appendf(&buf, &bufsize_filled, &bufsize_left,
+ 								"sof:1 ");
+ 	else
+ 		for (i = 0; i < snapshot->subxcnt; i++)
+ 			snapshot_appendf(&buf, &bufsize_filled, &bufsize_left,
+ 									"sxp:%d ", snapshot->subxip[i]);
+ 	/* Register the snapshot */
+ 	snapshot = RegisterSnapshotOnOwner(snapshot, TopTransactionResourceOwner);
+ 	exportedSnapshot = snapshot;
+ 	pg_md5_binary(buf, bufsize_filled, (void *) &cksum);
+ 	memcpy(syncSnapshotChksums[MyBackendId], &cksum, sizeof(snapshotChksum));
+ 	if (!XactReadOnly)
+ 		elog(WARNING, "A snapshot exporting function should be readonly.");
+ 	return DatumGetByteaP(DirectFunctionCall1(byteain, CStringGetDatum(buf)));
+ }
+ static Oid
+ parseOidFromText(char **s, const char *prefix)
+ {
+ 	char   *n, *p = strstr(*s, prefix);
+ 	Oid		oid;
+ 	if (!p)
+ 		return InvalidOid;
+ 	p += strlen(prefix);
+ 	n = strchr(p, ' ');
+ 	if (!n)
+ 		return InvalidOid;
+ 	*n = '\0';
+ 	oid = DatumGetObjectId(DirectFunctionCall1(oidin, CStringGetDatum(p)));
+ 	*s = n + 1;
+ 	return oid;
+ }
+ static bool
+ parseBoolFromText(char **s, const char *prefix)
+ {
+ 	/* It's safe to overload parseOid to parse 0/1. This returns false if the
+ 	 * entry could not be found, at least as long as InvalidOid is defined to be 0. */
+ 	Assert(InvalidOid == (Oid) 0);
+ 	return (bool) parseOidFromText(s, prefix);
+ }
+ static TransactionId
+ parseXactFromText(char **s, const char *prefix)
+ {
+ 	char			*n, *p = strstr(*s, prefix);
+ 	TransactionId	xid;
+ 	if (!p)
+ 		return InvalidTransactionId;
+ 	p += strlen(prefix);
+ 	n = strchr(p, ' ');
+ 	if (!n)
+ 		return InvalidTransactionId;
+ 	*n = '\0';
+ 	xid = DatumGetTransactionId(DirectFunctionCall1(xidin, CStringGetDatum(p)));
+ 	*s = n + 1;
+ 	return xid;
+ }
+ bool
+ ImportSnapshot(bytea *snapshotData, Snapshot snapshot)
+ {
+ 	snapshotChksum	cksum;
+ 	Oid				databaseId;
+ 	Oid				backendId;
+ 	int				i;
+ 	TransactionId	xid;
+ 	int				len = VARSIZE_ANY_EXHDR(snapshotData);
+ 	char			*s = palloc(len + 1);
+ 	if (GetCurrentTransactionNestLevel() != 1)
+ 		elog(ERROR, "Can only import a snapshot to a top level transaction.");
+ 	Assert(len > 0);
+ 	strncpy(s, VARDATA_ANY(snapshotData), len);
+ 	s[len] = '\0';
+ 	pg_md5_binary(s, strlen(s), (void *) &cksum);
+ 	if ((databaseId = parseOidFromText(&s, "did:")) == InvalidOid)
+ 		return false;
+ 	if (databaseId != MyDatabaseId)
+ 		return false;
+ 	if ((backendId = parseOidFromText(&s, "bid:")) == InvalidOid)
+ 		return false;
+ 	if (backendId == MyBackendId)
+ 		return false;
+ 	/*
+ 	 * Lock considerations:
+ 	 *
+ 	 * syncSnapshotChksums[backendId] is only changed by the backend with ID
+ 	 * backendID and read by another backend that is asked to import a
+ 	 * snapshot.
+ 	 * syncSnapshotChksums[backendId] contains either NUL bytes or the checksum
+ 	 * of a valid snapshot.
+ 	 * Every comparision to the checksum while it is being written or
+ 	 * deleted is okay to fail, so we don't need to take a lock at all.
+ 	 */
+ 	if (memcmp(&cksum, syncSnapshotChksums[backendId], sizeof(snapshotChksum)))
+ 		return false;
+ 	snapshot->xmin = parseXactFromText(&s, "xmi:");
+ 	Assert(snapshot->xmin != InvalidTransactionId);
+ 	snapshot->xmax = parseXactFromText(&s, "xma:");
+ 	Assert(snapshot->xmax != InvalidTransactionId);
+ 	i = 0;
+ 	while ((xid = parseXactFromText(&s, "xip:")) != InvalidTransactionId)
+ 	{
+ 		if (xid == GetTopTransactionIdIfAny())
+ 			continue;
+ 		snapshot->xip[i++] = xid;
+ 	}
+ 	snapshot->xcnt = i;
+ 	/*
+ 	 * We only write "sof:1" if the snapshot overflowed. If not, then there is
+ 	 * no "sof:x" entry at all and parseBoolFromText() will just return false.
+ 	 */
+ 	snapshot->suboverflowed = parseBoolFromText(&s, "sof:");
+ 	i = 0;
+ 	while ((xid = parseXactFromText(&s, "sxp:")) != InvalidTransactionId)
+ 		snapshot->subxip[i++] = xid;
+ 	snapshot->subxcnt = i;
+ 	/* Leave snapshot->curcid as is. */
+ 	/*
+ 	 * No ProcArrayLock held here, we assume that a write is atomic. Also note
+ 	 * that MyProc->xmin can go backwards here. However this is safe because
+ 	 * the xmin we set here is the same as in the backend's proc->xmin whose
+ 	 * snapshot we are copying. At this very moment, anybody computing a
+ 	 * minimum will calculate at least this xmin as the overall xmin with or
+ 	 * without us setting MyProc->xmin to this value.
+ 	 * Instead we must check to not go forward (we might have opened a cursor
+ 	 * in this transaction and still have its snapshot registered)
+ 	 */
+ 	if (TransactionIdPrecedes(snapshot->xmin, MyProc->xmin))
+ 		MyProc->xmin = snapshot->xmin;
+ 	/*
+ 	 * If we are in read committed mode then the next query will execute with a
+ 	 * new snapshot making this function call quite useless.
+ 	 */
+ 	if (!IsolationUsesXactSnapshot())
+ 		elog(WARNING, "A snapshot importing transaction should be ISOLATION LEVEL SERIALIZABLE");
+ 	return true;
+ }
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 273d8bd..75315bb 100644
*** a/src/backend/utils/time/snapmgr.c
--- b/src/backend/utils/time/snapmgr.c
*** 29,34 ****
--- 29,35 ----
  #include "access/xact.h"
  #include "storage/proc.h"
  #include "storage/procarray.h"
+ #include "utils/builtins.h"
  #include "utils/memutils.h"
  #include "utils/memutils.h"
  #include "utils/resowner.h"
*************** AtEarlyCommit_Snapshot(void)
*** 523,528 ****
--- 524,530 ----
  	registered_xact_snapshot = false;
+ 	InvalidateExportedSnapshot();
*************** AtEOXact_Snapshot(bool isCommit)
*** 559,561 ****
--- 561,585 ----
  	FirstSnapshotSet = false;
  	registered_xact_snapshot = false;
+ Datum
+ pg_export_snapshot(PG_FUNCTION_ARGS)
+ {
+ 	bytea  *snapshotData = ExportSnapshot(GetTransactionSnapshot());
+ 	PG_RETURN_BYTEA_P(snapshotData);
+ }
+ Datum
+ pg_import_snapshot(PG_FUNCTION_ARGS)
+ {
+ 	bytea  *snapshotData = PG_GETARG_BYTEA_P(0);
+ 	bool	ret = true;
+ 	if (ActiveSnapshotSet())
+ 		ret = ImportSnapshot(snapshotData, GetActiveSnapshot());
+ 	ret &= ImportSnapshot(snapshotData, GetTransactionSnapshot());
+ }
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index c624243..fa06f07 100644
*** a/src/include/catalog/pg_proc.h
--- b/src/include/catalog/pg_proc.h
*************** DATA(insert OID = 2171 ( pg_cancel_backe
*** 3375,3380 ****
--- 3375,3384 ----
  DESCR("cancel a server process' current query");
  DATA(insert OID = 2096 ( pg_terminate_backend		PGNSP PGUID 12 1 0 0 f f f t f v 1 0 16 "23" _null_ _null_ _null_ _null_ pg_terminate_backend _null_ _null_ _null_ ));
  DESCR("terminate a server process");
+ DATA(insert OID = 3115 ( pg_export_snapshot		PGNSP PGUID 12 1 0 0 f f f t f v 0 0 17 "" _null_ _null_ _null_ _null_ pg_export_snapshot _null_ _null_ _null_ ));
+ DESCR("export a snapshot");
+ DATA(insert OID = 3116 ( pg_import_snapshot		PGNSP PGUID 12 1 0 0 f f f t f v 1 0 16 "17" _null_ _null_ _null_ _null_ pg_import_snapshot _null_ _null_ _null_ ));
+ DESCR("import a snapshot");
  DATA(insert OID = 2172 ( pg_start_backup		PGNSP PGUID 12 1 0 0 f f f t f v 2 0 25 "25 16" _null_ _null_ _null_ _null_ pg_start_backup _null_ _null_ _null_ ));
  DESCR("prepare for taking an online backup");
  DATA(insert OID = 2173 ( pg_stop_backup			PGNSP PGUID 12 1 0 0 f f f t f v 0 0 25 "" _null_ _null_ _null_ _null_ pg_stop_backup _null_ _null_ _null_ ));
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index ea030d6..a1ac8c6 100644
*** a/src/include/storage/procarray.h
--- b/src/include/storage/procarray.h
*************** extern void XidCacheRemoveRunningXids(Tr
*** 71,74 ****
--- 71,81 ----
  						  int nxids, const TransactionId *xids,
  						  TransactionId latestXid);
+ extern Size SyncSnapshotShmemSize(void);
+ extern void SyncSnapshotInit(void);
+ extern bytea *ExportSnapshot(Snapshot snapshot);
+ extern bool ImportSnapshot(bytea *snapshotData, Snapshot snapshot);
+ extern void InvalidateExportedSnapshot(void);
  #endif   /* PROCARRAY_H */
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index f03647b..d0e3f9a 100644
*** a/src/include/utils/snapmgr.h
--- b/src/include/utils/snapmgr.h
*************** extern void AtSubAbort_Snapshot(int leve
*** 43,46 ****
--- 43,49 ----
  extern void AtEarlyCommit_Snapshot(void);
  extern void AtEOXact_Snapshot(bool isCommit);
+ extern Datum pg_export_snapshot(PG_FUNCTION_ARGS);
+ extern Datum pg_import_snapshot(PG_FUNCTION_ARGS);
  #endif   /* SNAPMGR_H */
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:

Reply via email to