Here is a new version of the patch incorporating most of Noah's
suggestions. The patch now also adds documentation. Since I couldn't
really find a suitable section to document the two new functions, I
added a new one for now. Any better ideas where it should go?

On Thu, Jan 20, 2011 at 1:37 AM, Noah Misch <n...@leadboat.com> wrote:
> Just to clarify, I was envisioning something like:
>
> typedef struct { bool valid; char value[16]; } snapshotChksum;
>
> I don't object to the way you've done it, but someone else might not like the
> extra marshalling between text and binary.  Your call.

I didn't like it in the beginning but later on I adopted your
proposal. I have also changed the locking to be more natural. Even
though we don't strictly need it, I now take ProcArrayLock in shared
mode for any reads of the shared memory area and in exclusive mode for
writes. Of course no additional lock is taken if the feature is not used.
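
To make the access pattern concrete, here is a minimal standalone sketch of
the per-backend checksum slot. It is not the patch code: SlotSketch,
MAX_SLOTS and the slot_* functions are hypothetical names, the array is a
plain static instead of shared memory, and the locking is only indicated in
comments.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define CHKSUM_LEN 16
#define MAX_SLOTS  8	/* hypothetical; the patch sizes the array by PROCARRAY_MAXPROCS */

/* Hypothetical stand-in for the patch's snapshotChksum struct. */
typedef struct
{
	char		chksum[CHKSUM_LEN];
	bool		valid;
} SlotSketch;

static SlotSketch slots[MAX_SLOTS];	/* zero-initialized, so all slots start invalid */

/* Writer side: the patch does the equivalent under exclusive ProcArrayLock. */
static void
slot_publish(int id, const char *chksum)
{
	assert(id >= 0 && id < MAX_SLOTS);
	memcpy(slots[id].chksum, chksum, CHKSUM_LEN);
	slots[id].valid = true;
}

/* Writer side as well: called when the exporting transaction ends. */
static void
slot_invalidate(int id)
{
	assert(id >= 0 && id < MAX_SLOTS);
	slots[id].valid = false;
}

/* Reader side: the patch does the equivalent under shared ProcArrayLock. */
static bool
slot_matches(int id, const char *chksum)
{
	/* Range-check first; the id comes from user-supplied snapshot data. */
	if (id < 0 || id >= MAX_SLOTS)
		return false;
	return slots[id].valid &&
		memcmp(slots[id].chksum, chksum, CHKSUM_LEN) == 0;
}
```

Only the owning backend ever calls the two writer functions for its own slot;
an importing backend only calls slot_matches() on somebody else's slot.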


> You're right.  Then consider "VALUES (pg_import_snapshot('...'), (SELECT
> count(*) from t))" at READ COMMITTED.  It works roughly as I'd guess; the
> subquery uses the imported snapshot.  If I flip the two expressions and do
> "VALUES ((SELECT count(*) from t), pg_import_snapshot('...'))", the subquery
> uses the normal snapshot.  That makes sense, but I can't really see a use case
> for it.  A warning, like your code has today, seems appropriate.

Yeah, that would do what you wanted to illustrate but it truly cannot
be considered the standard use case :-)


>> > Is it valid to scribble directly on snapshots like this?
>> I figured that previously executed code still has references pointing
>> to the snapshots and so we cannot just push a new snapshot on top but
>> really need to change the memory where they are pointing to.
> Okay.  Had no special reason to believe otherwise, just didn't know.

This is one part that I'd like someone more experienced than me to look into.
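
The reasoning about existing references can be shown with a small standalone
sketch. SnapSketch and both functions are hypothetical and heavily reduced;
they only illustrate why overwriting the memory in place reaches code that
already holds a pointer, while installing a new object does not.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, heavily reduced stand-in for a snapshot. */
typedef struct
{
	uint32_t	xmin;
	uint32_t	xmax;
} SnapSketch;

/* Overwrite in place: every pointer to *target sees the imported values. */
static void
snap_overwrite(SnapSketch *target, const SnapSketch *imported)
{
	assert(target != NULL && imported != NULL);
	*target = *imported;
}

/*
 * Install a different object instead: only *current changes, and any
 * pointer that was saved earlier keeps pointing at the old contents.
 */
static void
snap_swap_pointer(SnapSketch **current, SnapSketch *imported)
{
	assert(current != NULL && imported != NULL);
	*current = imported;
}
```

With snap_swap_pointer(), a pointer taken before the call still sees the old
xmin and xmax, which is the situation the in-place update avoids.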


> Thanks; that was handy.  One thing I noticed is that the second "SELECT * FROM
> kidseen" yields zero rows instead of yielding "ERROR:  relation "kidseen" does
> not exist".  I changed things around per the attached "test-drop.pl", such that
> the table is already gone in the ordinary snapshot, but still visible to the
> imported snapshot.  Note how the pg_class row is visible, but an actual attempt
> to query the table fails.  Must some kind of syscache invalidation follow the
> snapshot alteration to make this behave normally?

As discussed with Noah off-list, this is just not an MVCC-safe
operation. You could hit this in a regular SERIALIZABLE transaction
as well: somebody else can drop a table and you won't be able to
access it anymore. This is also the reason why pg_dump, right at the
beginning, takes a shared lock on every table that it will dump later
on.


Thanks for the review, Noah,

Joachim
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index ed2039c..4169594 100644
*** a/doc/src/sgml/func.sgml
--- b/doc/src/sgml/func.sgml
*************** FOR EACH ROW EXECUTE PROCEDURE suppress_
*** 14761,14764 ****
--- 14761,14860 ----
          <xref linkend="SQL-CREATETRIGGER">.
      </para>
    </sect1>
+ 
+   <sect1 id="functions-snapshotsync">
+    <title>Snapshot Synchronization Functions</title>
+ 
+    <indexterm>
+      <primary>pg_export_snapshot</primary>
+    </indexterm>
+    <indexterm>
+      <primary>pg_import_snapshot</primary>
+    </indexterm>
+ 
+    <para>
+      <productname>PostgreSQL</> allows different sessions to synchronize their
+      snapshots. A database snapshot determines which data is visible to
+      the client that is using this snapshot. Synchronized snapshots are necessary if
+      two clients need to see the same content in the database. If these two clients
+      just connected to the database and opened their transactions, they could
+      never be sure that no data was modified between the start of the two
+      transactions. To solve this, <productname>PostgreSQL</> offers the two functions
+      <function>pg_export_snapshot</> and <function>pg_import_snapshot</>.
+    </para>
+    <para>
+      The idea is that one client retrieves the information about the snapshot that it
+      is currently using and then the second client passes this information back to
+      the server. The database server will then modify the second client's snapshot
+      to match the snapshot of the first and from then on both can see the identical
+      data in the database.
+    </para>
+    <para>
+      Note that a snapshot can only be imported as long as the transaction that
+      exported it originally is held open. Also note that even after the
+      synchronization both clients still run their own independent transactions.
+      As a consequence, even though synchronized with respect to reading pre-existing
+      data, both transactions won't be able to see each other's uncommitted data.
+    </para>
+    <table id="functions-snapshot-synchronization">
+     <title>Snapshot Synchronization Functions</title>
+     <tgroup cols="3">
+      <thead>
+       <row><entry>Name</entry> <entry>Return Type</entry> <entry>Description</entry>
+       </row>
+      </thead>
+ 
+      <tbody>
+       <row>
+        <entry>
+         <literal><function>pg_export_snapshot()</function></literal>
+        </entry>
+        <entry><type>bytea</type></entry>
+        <entry>Export the snapshot and return its textual representation</entry>
+       </row>
+       <row>
+        <entry>
+         <literal><function>pg_import_snapshot(<parameter>snapshotData</> <type>bytea</>)</function></literal>
+        </entry>
+        <entry><type>boolean</type></entry>
+        <entry>Import a previously exported snapshot</entry>
+       </row>
+      </tbody>
+     </tgroup>
+    </table>
+ 
+    <para>
+       The function <function>pg_export_snapshot</> does not take an argument
+       and returns the current snapshot information as <type>bytea</type> data.
+       You should not assume any maximum length for this data. Also make sure that
+       the transaction that exports the snapshot will continue to live at least until
+       the snapshot has been imported into another connection. As soon as the
+       transaction ends, the exported snapshot becomes invalid and cannot be imported
+       anymore. You should not rely on what exactly the function returns. If you need
+       to access any transaction IDs of the current transaction, please refer to <xref
+       linkend="functions-txid-snapshot"> instead.
+    </para>
+ <programlisting>
+ BEGIN TRANSACTION READ ONLY;
+ SELECT pg_export_snapshot();
+ </programlisting>
+    <para>
+       <function>pg_import_snapshot</> takes an argument of type <type>bytea</type>
+       and returns a <type>boolean</type> value. Actually the function will
+       always either return true or throw an error. As the argument you need to pass
+       the exact value that has been returned by <function>pg_export_snapshot</>.
+       The function can only be used reasonably if your <literal>TRANSACTION ISOLATION
+       LEVEL</> (see <xref
+       linkend="transaction-iso">) is set to <literal>SERIALIZABLE</> (or
+       <literal>REPEATABLE READ</> which will implicitly use <literal>SERIALIZABLE</>).
+       The reason is that if your transaction is set to <literal>READ COMMITTED</>,
+       it will acquire a new snapshot for the next query and will not use the one that
+       has just been imported.
+    </para>
+ <programlisting>
+ BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
+ SELECT pg_import_snapshot('\x6469643a31206269643a3320786d693a31303133313420786d613a3130313331342078636e743a30207378636e743a3020');
+ </programlisting>
+   </sect1>
  </chapter>
+ 
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 2dbac56..c96942a 100644
*** a/src/backend/storage/ipc/ipci.c
--- b/src/backend/storage/ipc/ipci.c
*************** CreateSharedMemoryAndSemaphores(bool mak
*** 124,129 ****
--- 124,130 ----
  		size = add_size(size, BTreeShmemSize());
  		size = add_size(size, SyncScanShmemSize());
  		size = add_size(size, AsyncShmemSize());
+ 		size = add_size(size, SyncSnapshotShmemSize());
  #ifdef EXEC_BACKEND
  		size = add_size(size, ShmemBackendArraySize());
  #endif
*************** CreateSharedMemoryAndSemaphores(bool mak
*** 228,233 ****
--- 229,235 ----
  	BTreeShmemInit();
  	SyncScanShmemInit();
  	AsyncShmemInit();
+ 	SyncSnapshotInit();
  
  #ifdef EXEC_BACKEND
  
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 8b36df4..5caddec 100644
*** a/src/backend/storage/ipc/procarray.c
--- b/src/backend/storage/ipc/procarray.c
***************
*** 50,60 ****
--- 50,63 ----
  #include "access/transam.h"
  #include "access/xact.h"
  #include "access/twophase.h"
+ #include "libpq/md5.h"
  #include "miscadmin.h"
  #include "storage/procarray.h"
  #include "storage/spin.h"
  #include "storage/standby.h"
  #include "utils/builtins.h"
+ #include "utils/bytea.h"
+ #include "utils/memutils.h"
  #include "utils/snapmgr.h"
  
  
*************** static int KnownAssignedXidsGetAndSetXmi
*** 158,163 ****
--- 161,175 ----
  							   TransactionId xmax);
  static TransactionId KnownAssignedXidsGetOldestXmin(void);
  static void KnownAssignedXidsDisplay(int trace_level);
+ static void InvalidateExportedSnapshot(void);
+ 
+ typedef char snapshotChksumPlain[16];
+ typedef struct {
+ 	snapshotChksumPlain chksum;
+ 	bool			valid;
+ } snapshotChksum;
+ static snapshotChksum *syncSnapshotChksums;
+ static Snapshot exportedSnapshot;
  
  /*
   * Report shared-memory space needed by CreateSharedProcArray.
*************** ProcArrayRemove(PGPROC *proc, Transactio
*** 350,355 ****
--- 362,373 ----
  void
  ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
  {
+ 	/*
+ 	 * Release any exported snapshots.
+ 	 */
+ 	if (exportedSnapshot)
+ 		InvalidateExportedSnapshot();
+ 
  	if (TransactionIdIsValid(latestXid))
  	{
  		/*
*************** void
*** 415,420 ****
--- 433,445 ----
  ProcArrayClearTransaction(PGPROC *proc)
  {
  	/*
+ 	 * Release any exported snapshots (Note that this will take exclusive
+ 	 * ProcArrayLock if the feature has been used).
+ 	 */
+ 	if (exportedSnapshot)
+ 		InvalidateExportedSnapshot();
+ 
+ 	/*
  	 * We can skip locking ProcArrayLock here, because this action does not
  	 * actually change anyone's view of the set of running XIDs: our entry is
  	 * duplicate with the gxact that has already been inserted into the
*************** KnownAssignedXidsDisplay(int trace_level
*** 3065,3067 ****
--- 3090,3469 ----
  
  	pfree(buf.data);
  }
+ 
+ 
+ /*
+  *  Report space needed for our shared memory area, which is basically an
+  *  md5 checksum per connection.
+  */
+ Size
+ SyncSnapshotShmemSize(void)
+ {
+ 	return PROCARRAY_MAXPROCS * sizeof(snapshotChksum);
+ }
+ 
+ void
+ SyncSnapshotInit(void)
+ {
+ 	Size		size;
+ 	bool		found;
+ 	int			i;
+ 
+ 	size = SyncSnapshotShmemSize();
+ 
+ 	syncSnapshotChksums = (snapshotChksum*) ShmemInitStruct("SyncSnapshotChksums",
+ 															size, &found);
+ 	if (!found)
+ 		for (i = 0; i < PROCARRAY_MAXPROCS; i++)
+ 			syncSnapshotChksums[i].valid = false;
+ }
+ 
+ static void
+ InvalidateExportedSnapshot(void)
+ {
+ 	/* See lock considerations at ImportSnapshot() */
+ 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ 	syncSnapshotChksums[MyBackendId].valid = false;
+ 	LWLockRelease(ProcArrayLock);
+ 
+ 	UnregisterSnapshotFromOwner(exportedSnapshot, TopTransactionResourceOwner);
+ 	exportedSnapshot = NULL;
+ }
+ 
+ static void
+ snapshot_appendf(char ** buffer, int *bufsize_filled, int *bufsize_left, char *fmt, ...)
+ {
+ 	va_list		ap;
+ 	int			ret;
+ 
+ 	if (*bufsize_left < 64)
+ 	{
+ 		/* enlarge buffer by 1024 bytes */
+ 		*buffer = repalloc(*buffer, *bufsize_filled + 1024);
+ 		*bufsize_left = 1024;
+ 	}
+ 
+ 	va_start(ap, fmt);
+ 	ret = vsnprintf(*buffer + *bufsize_filled, *bufsize_left, fmt, ap);
+ 	va_end(ap);
+ 
+ 	/* There shouldn't be any error, we leave enough room for the data that we
+ 	 * are writing (only numbers basically). */
+ 	Assert(ret < *bufsize_left && ret > 0);
+ 
+ 	*bufsize_left -= ret;
+ 	*bufsize_filled += ret;
+ 
+ 	Assert(strlen(*buffer) == *bufsize_filled);
+ }
+ 
+ bytea *
+ ExportSnapshot(Snapshot snapshot)
+ {
+ #define SNAPSHOT_APPEND(x, y) \
+ 	(snapshot_appendf(&buf, &bufsize_filled, &bufsize_left, (x), (y)))
+ 	int			bufsize = 1024;
+ 	int			bufsize_filled = 0; /* doesn't include NUL byte */
+ 	int			bufsize_left = bufsize - bufsize_filled;
+ 	char	   *buf = (char *) palloc(bufsize);
+ 	TransactionId *children;
+ 	int			i;
+ 	int			nchildren;
+ 
+ 	/* In a subtransaction we don't see our open subxip values in the snapshot
+ 	 * so they would be missing in the backend applying it. */
+ 	if (GetCurrentTransactionNestLevel() != 1)
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+ 				 errmsg("cannot export a snapshot from a subtransaction")));
+ 
+ 	/* We do however see our already committed subxip values and add them to
+ 	 * the subxip array. */
+ 	nchildren = xactGetCommittedChildren(&children);
+ 
+ 	/* Write up all the data that we return */
+ 	SNAPSHOT_APPEND("did:%d ", MyDatabaseId);
+ 	SNAPSHOT_APPEND("bid:%d ", MyBackendId);
+ 	SNAPSHOT_APPEND("xmi:%d ", snapshot->xmin);
+ 	SNAPSHOT_APPEND("xma:%d ", snapshot->xmax);
+ 	/* Include our own transaction ID into the count if any. */
+ 	SNAPSHOT_APPEND("xcnt:%d ", TransactionIdIsValid(GetTopTransactionIdIfAny()) ?
+ 								snapshot->xcnt + 1 : snapshot->xcnt);
+ 	for (i = 0; i < snapshot->xcnt; i++)
+ 		SNAPSHOT_APPEND("xip:%d ", snapshot->xip[i]);
+ 	/*
+ 	 * Finally add our own XID if we have one, since by definition we will
+ 	 * still be running when the other transaction takes over the snapshot.
+ 	 */
+ 	if (TransactionIdIsValid(GetTopTransactionIdIfAny()))
+ 		SNAPSHOT_APPEND("xip:%d ", GetTopTransactionIdIfAny());
+ 	if (snapshot->suboverflowed || snapshot->subxcnt + nchildren > TOTAL_MAX_CACHED_SUBXIDS)
+ 		SNAPSHOT_APPEND("sof:%d ", 1);
+ 	else
+ 	{
+ 		SNAPSHOT_APPEND("sxcnt:%d ", snapshot->subxcnt + nchildren);
+ 		for (i = 0; i < snapshot->subxcnt; i++)
+ 			SNAPSHOT_APPEND("sxp:%d ", snapshot->subxip[i]);
+ 		/* Add already committed subtransactions. */
+ 		for (i = 0; i < nchildren; i++)
+ 			SNAPSHOT_APPEND("sxp:%d ", children[i]);
+ 	}
+ 
+ 	/*
+ 	 * buf ends with a trailing space but we leave it in for simplicity. The
+ 	 * parsing routines also depend on it.
+ 	 */
+ 
+ 	/* Register the snapshot */
+ 	snapshot = RegisterSnapshotOnOwner(snapshot, TopTransactionResourceOwner);
+ 	exportedSnapshot = snapshot;
+ 
+ 	/* See lock considerations at ImportSnapshot() */
+ 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ 	if (!pg_md5_binary(buf, bufsize_filled, syncSnapshotChksums[MyBackendId].chksum))
+ 	{
+ 		LWLockRelease(ProcArrayLock);
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_OUT_OF_MEMORY),
+ 				 errmsg("out of memory")));
+ 	}
+ 	syncSnapshotChksums[MyBackendId].valid = true;
+ 	LWLockRelease(ProcArrayLock);
+ 
+ 	return DatumGetByteaP(DirectFunctionCall1(byteain, CStringGetDatum(buf)));
+ #undef SNAPSHOT_APPEND
+ }
+ 
+ static Oid
+ parseOidFromText(char **s, const char *prefix)
+ {
+ 	char	   *n, *p = strstr(*s, prefix);
+ 	Oid			oid;
+ 
+ 	if (!p)
+ 		return InvalidOid;
+ 	p += strlen(prefix);
+ 	n = strchr(p, ' ');
+ 	if (!n)
+ 		return InvalidOid;
+ 	*n = '\0';
+ 	oid = DatumGetObjectId(DirectFunctionCall1(oidin, CStringGetDatum(p)));
+ 	*s = n + 1;
+ 	return oid;
+ }
+ 
+ static int
+ parseIntFromText(char **s, const char *prefix, int notfound)
+ {
+ 	char	   *n, *p = strstr(*s, prefix);
+ 	int			i;
+ 
+ 	if (!p)
+ 		return notfound;
+ 	p += strlen(prefix);
+ 	n = strchr(p, ' ');
+ 	if (!n)
+ 		return notfound;
+ 	*n = '\0';
+ 	i = DatumGetInt32(DirectFunctionCall1(int4in, CStringGetDatum(p)));
+ 	*s = n + 1;
+ 	return i;
+ }
+ 
+ static bool
+ parseBoolFromText(char **s, const char *prefix)
+ {
+ 	return (bool) parseIntFromText(s, prefix, 0);
+ }
+ 
+ static TransactionId
+ parseXactFromText(char **s, const char *prefix)
+ {
+ 	char	   *n, *p = strstr(*s, prefix);
+ 	TransactionId xid;
+ 
+ 	if (!p)
+ 		return InvalidTransactionId;
+ 	p += strlen(prefix);
+ 	n = strchr(p, ' ');
+ 	if (!n)
+ 		return InvalidTransactionId;
+ 	*n = '\0';
+ 	xid = DatumGetTransactionId(DirectFunctionCall1(xidin, CStringGetDatum(p)));
+ 	*s = n + 1;
+ 	return xid;
+ }
+ 
+ /*
+  * Import a previously exported snapshot: First check the data, then compare
+  * the checksum with the exported snapshots. Finally update the snapshot that
+  * we get passed in.
+  *
+  * Lock considerations:
+  *
+  * syncSnapshotChksums[backendId] is only changed by the backend with ID
+  * backendId and read by another backend that is asked to import a snapshot.
+  *
+  * We'd not really need to take a lock because of the access pattern. Instead we
+  * need to take one because we might run on an architecture with weak memory
+  * ordering: Usually we first invalidate our exported snapshot and then release
+  * our ProcArray entry. A different backend however could see the updated
+  * ProcArray entry but still see our checksum as valid. This can be solved with
+  * a few shared locks but in order to make the access least awkward, we take
+  * exclusive lock on writes and shared lock on reads.
+  */
+ bool
+ ImportSnapshot(bytea *snapshotData, Snapshot snapshot)
+ {
+ 	BackendId	backendId;
+ 	snapshotChksumPlain cksum;
+ 	Oid			databaseId;
+ 	int			i;
+ 	int			len = VARSIZE_ANY_EXHDR(snapshotData);
+ 	char	   *s = palloc(len + 1);
+ 	int			sxcnt, xcnt;
+ 	TransactionId xid;
+ 
+ 	if (GetCurrentTransactionNestLevel() != 1)
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+ 				 errmsg("cannot import a snapshot into a subtransaction")));
+ 
+ 	if (len == 0)
+ 		ereport(ERROR,
+ 			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ 			 errmsg("invalid data in snapshot information")));
+ 
+ 	strncpy(s, VARDATA_ANY(snapshotData), len);
+ 	s[len] = '\0';
+ 
+ 	if (!pg_md5_binary(s, len, (void *) &cksum))
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_OUT_OF_MEMORY),
+ 				 errmsg("out of memory")));
+ 
+ 	databaseId = parseOidFromText(&s, "did:");
+ 	backendId = (BackendId) parseIntFromText(&s, "bid:", (int) InvalidBackendId);
+ 
+ 	if (databaseId != MyDatabaseId)
+ 		ereport(ERROR,
+ 			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ 			 errmsg("cannot import snapshot from a different database")));
+ 
+ 	if (backendId == InvalidBackendId
+ 		/*
+ 		 * Make sure backendId is in a reasonable range before using it as an
+ 		 * array subscript.
+ 		 */
+ 		|| backendId >= PROCARRAY_MAXPROCS
+ 		|| backendId < 0)
+ 	{
+ 		ereport(ERROR,
+ 			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ 			 errmsg("invalid data in snapshot information")));
+ 	}
+ 
+ 	if (backendId == MyBackendId)
+ 		ereport(ERROR,
+ 			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ 			 errmsg("cannot export and import a snapshot in the same connection")));
+ 
+ 	/*
+ 	 * Verify that the checksum matches before doing any more work. We will
+ 	 * later verify again to make sure that the exporting transaction has not
+ 	 * yet terminated by then. We don't want to optimize this into just one
+ 	 * verification call at the very end because the instructions that follow
+ 	 * this comment rely on a sane format of the textual snapshot data. In
+ 	 * particular they assume that there are not more XactIds than
+ 	 * advertised...
+ 	 */
+ 	if (syncSnapshotChksums[backendId].valid == false
+ 		|| memcmp(cksum, syncSnapshotChksums[backendId].chksum,
+ 						 sizeof(syncSnapshotChksums[backendId].chksum)) != 0)
+ 		ereport(ERROR,
+ 			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ 			 errmsg("snapshot not found exported by any running transaction")));
+ 
+ 	Assert(databaseId != InvalidOid);
+ 
+ 	snapshot->xmin = parseXactFromText(&s, "xmi:");
+ 	Assert(snapshot->xmin != InvalidTransactionId);
+ 	snapshot->xmax = parseXactFromText(&s, "xma:");
+ 	Assert(snapshot->xmax != InvalidTransactionId);
+ 
+ 	xcnt = parseIntFromText(&s, "xcnt:", 0);
+ 	/*
+ 	 * Unfortunately CopySnapshot allocates just one large chunk of memory, and
+ 	 * makes snapshot->xip then point past the end of the fixed-size snapshot
+ 	 * data. That way we cannot just repalloc(snapshot->xip, ...).  Neither can
+ 	 * we just change the base address of the snapshot because this address
+ 	 * might still be saved somewhere.
+ 	 */
+ 	if (snapshot->copied && snapshot->xcnt < xcnt)
+ 		snapshot->xip = MemoryContextAlloc(TopTransactionContext,
+ 										   xcnt * sizeof(TransactionId));
+ 
+ 	i = 0;
+ 	while ((xid = parseXactFromText(&s, "xip:")) != InvalidTransactionId)
+ 	{
+ 		if (xid == GetTopTransactionIdIfAny())
+ 			continue;
+ 		snapshot->xip[i++] = xid;
+ 	}
+ 	snapshot->xcnt = i;
+ 	Assert(snapshot->xcnt == xcnt);
+ 
+ 	/*
+ 	 * We only write "sof:1" if the snapshot overflowed. If not, then there is
+ 	 * no "sof:x" entry at all and parseBoolFromText() will just return false.
+ 	 */
+ 	snapshot->suboverflowed = parseBoolFromText(&s, "sof:");
+ 
+ 	if (!snapshot->suboverflowed)
+ 	{
+ 		sxcnt = parseIntFromText(&s, "sxcnt:", 0);
+ 		if (snapshot->copied && snapshot->subxcnt < sxcnt)
+ 			snapshot->subxip = MemoryContextAlloc(TopTransactionContext,
+ 												  sxcnt * sizeof(TransactionId));
+ 
+ 		i = 0;
+ 		while ((xid = parseXactFromText(&s, "sxp:")) != InvalidTransactionId)
+ 			snapshot->subxip[i++] = xid;
+ 		snapshot->subxcnt = i;
+ 		Assert(snapshot->subxcnt == sxcnt);
+ 	}
+ 
+ 	/* Leave snapshot->curcid as is. */
+ 
+ 	LWLockAcquire(ProcArrayLock, LW_SHARED);
+ 
+ 	/*
+ 	 * Note that MyProc->xmin can go backwards here. However this is safe
+ 	 * because the xmin we set here is the same as in the backend's proc->xmin
+ 	 * whose snapshot we are copying. At this very moment, anybody computing a
+ 	 * minimum will calculate at least this xmin as the overall xmin with or
+ 	 * without us setting MyProc->xmin to this value.
+ 	 * Instead we must check to not go forward (we might have opened a cursor
+ 	 * in this transaction and still have its snapshot registered)
+ 	 */
+ 	if (TransactionIdPrecedes(snapshot->xmin, MyProc->xmin))
+ 		MyProc->xmin = snapshot->xmin;
+ 
+ 	/*
+ 	 * Check the checksum again to prevent a race condition. If the exporting
+ 	 * backend invalidated its snapshot since we last checked then we fail here
+ 	 * and error out, thus invalidating the snapshot we've built up.
+ 	 */
+ 	if (syncSnapshotChksums[backendId].valid == false
+ 		|| memcmp(cksum, syncSnapshotChksums[backendId].chksum,
+ 				  sizeof(syncSnapshotChksums[backendId].chksum)) != 0)
+ 	{
+ 		LWLockRelease(ProcArrayLock);
+ 		ereport(ERROR,
+ 			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ 			 errmsg("invalid data in snapshot information")));
+ 	}
+ 	LWLockRelease(ProcArrayLock);
+ 
+ 	return true;
+ }
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 45b92a0..c5ef1ab 100644
*** a/src/backend/utils/time/snapmgr.c
--- b/src/backend/utils/time/snapmgr.c
*************** AtEOXact_Snapshot(bool isCommit)
*** 559,561 ****
--- 559,593 ----
  	FirstSnapshotSet = false;
  	registered_xact_snapshot = false;
  }
+ 
+ Datum
+ pg_export_snapshot(PG_FUNCTION_ARGS)
+ {
+ 	bytea	   *snapshotData = ExportSnapshot(GetTransactionSnapshot());
+ 	PG_RETURN_BYTEA_P(snapshotData);
+ }
+ 
+ Datum
+ pg_import_snapshot(PG_FUNCTION_ARGS)
+ {
+ 	bytea	   *snapshotData = PG_GETARG_BYTEA_P(0);
+ 	bool		ret = true;
+ 
+ 	if (ActiveSnapshotSet())
+ 		ret = ImportSnapshot(snapshotData, GetActiveSnapshot());
+ 
+ 	ret &= ImportSnapshot(snapshotData, GetTransactionSnapshot());
+ 
+ 	/*
+ 	 * If we are in read committed mode then the next query will execute with a
+ 	 * new snapshot making this function call quite useless.
+ 	 */
+ 	if (!IsolationUsesXactSnapshot())
+ 		ereport(WARNING,
+ 				(errcode(ERRCODE_INAPPROPRIATE_ISOLATION_LEVEL_FOR_BRANCH_TRANSACTION),
+ 				 errmsg("a snapshot importing transaction should be ISOLATION LEVEL SERIALIZABLE/REPEATABLE READ"),
+ 				 errhint("A transaction of isolation level READ COMMITTED/READ UNCOMMITTED gives you a new snapshot for each query.")));
+ 
+ 	PG_RETURN_BOOL(ret);
+ }
+ 
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index f8b5d4d..3a4bc6d 100644
*** a/src/include/catalog/pg_proc.h
--- b/src/include/catalog/pg_proc.h
*************** DATA(insert OID = 2171 ( pg_cancel_backe
*** 3391,3396 ****
--- 3391,3400 ----
  DESCR("cancel a server process' current query");
  DATA(insert OID = 2096 ( pg_terminate_backend		PGNSP PGUID 12 1 0 0 f f f t f v 1 0 16 "23" _null_ _null_ _null_ _null_ pg_terminate_backend _null_ _null_ _null_ ));
  DESCR("terminate a server process");
+ DATA(insert OID = 3115 ( pg_export_snapshot		PGNSP PGUID 12 1 0 0 f f f t f v 0 0 17 "" _null_ _null_ _null_ _null_ pg_export_snapshot _null_ _null_ _null_ ));
+ DESCR("export a snapshot");
+ DATA(insert OID = 3116 ( pg_import_snapshot		PGNSP PGUID 12 1 0 0 f f f t f v 1 0 16 "17" _null_ _null_ _null_ _null_ pg_import_snapshot _null_ _null_ _null_ ));
+ DESCR("import a snapshot");
  DATA(insert OID = 2172 ( pg_start_backup		PGNSP PGUID 12 1 0 0 f f f t f v 2 0 25 "25 16" _null_ _null_ _null_ _null_ pg_start_backup _null_ _null_ _null_ ));
  DESCR("prepare for taking an online backup");
  DATA(insert OID = 2173 ( pg_stop_backup			PGNSP PGUID 12 1 0 0 f f f t f v 0 0 25 "" _null_ _null_ _null_ _null_ pg_stop_backup _null_ _null_ _null_ ));
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 334f9a2..9f47a2e 100644
*** a/src/include/storage/procarray.h
--- b/src/include/storage/procarray.h
*************** extern void XidCacheRemoveRunningXids(Tr
*** 71,74 ****
--- 71,80 ----
  						  int nxids, const TransactionId *xids,
  						  TransactionId latestXid);
  
+ extern Size SyncSnapshotShmemSize(void);
+ extern void SyncSnapshotInit(void);
+ 
+ extern bytea *ExportSnapshot(Snapshot snapshot);
+ extern bool ImportSnapshot(bytea *snapshotData, Snapshot snapshot);
+ 
  #endif   /* PROCARRAY_H */
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index 6d784d2..3dcb924 100644
*** a/src/include/utils/snapmgr.h
--- b/src/include/utils/snapmgr.h
*************** extern void AtSubAbort_Snapshot(int leve
*** 43,46 ****
--- 43,49 ----
  extern void AtEarlyCommit_Snapshot(void);
  extern void AtEOXact_Snapshot(bool isCommit);
  
+ extern Datum pg_export_snapshot(PG_FUNCTION_ARGS);
+ extern Datum pg_import_snapshot(PG_FUNCTION_ARGS);
+ 
  #endif   /* SNAPMGR_H */