On 3 April 2017 at 13:46, Craig Ringer <cr...@2ndquadrant.com> wrote:

> OK, updated catalog_xmin logging patch attached.

Ahem, that should be v5.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From 7f742f582e1f6f8f23c4e9d78cd0298180e5387c Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Wed, 22 Mar 2017 13:36:49 +0800
Subject: [PATCH] Log catalog_xmin advances before removing catalog tuples

Before advancing the effective catalog_xmin we use to remove old catalog
tuple versions, make sure it is written to WAL. This allows standbys
to know the oldest xid they can safely create a historic snapshot for.
They can then refuse to start decoding from a slot or raise a recovery
conflict.

The catalog_xmin advance is logged in the next xl_running_xacts record, so
vacuum of catalogs may be held back by up to about 15 seconds (the interval at
which the bgwriter logs standby snapshots) when a replication slot with
catalog_xmin is holding down the global catalog_xmin.
---
 src/backend/access/heap/rewriteheap.c       |  3 +-
 src/backend/access/rmgrdesc/standbydesc.c   |  5 +-
 src/backend/access/rmgrdesc/xlogdesc.c      |  3 +-
 src/backend/access/transam/varsup.c         | 15 +++++
 src/backend/access/transam/xlog.c           | 26 +++++++-
 src/backend/postmaster/bgwriter.c           |  2 +-
 src/backend/replication/slot.c              |  2 +-
 src/backend/replication/walreceiver.c       |  2 +-
 src/backend/replication/walsender.c         |  8 +++
 src/backend/storage/ipc/procarray.c         | 61 ++++++++++++++++---
 src/backend/storage/ipc/standby.c           | 60 +++++++++++++++++--
 src/bin/pg_controldata/pg_controldata.c     |  2 +
 src/include/access/transam.h                |  5 ++
 src/include/catalog/pg_control.h            |  1 +
 src/include/storage/procarray.h             |  3 +-
 src/include/storage/standby.h               |  8 ++-
 src/include/storage/standbydefs.h           |  1 +
 src/test/recovery/t/006_logical_decoding.pl | 93 +++++++++++++++++++++++++++--
 18 files changed, 269 insertions(+), 31 deletions(-)

diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index d7f65a5..d1400ec 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -812,7 +812,8 @@ logical_begin_heap_rewrite(RewriteState state)
 	if (!state->rs_logical_rewrite)
 		return;
 
-	ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
+	/* Use oldestCatalogXmin here */
+	ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin, NULL);
 
 	/*
 	 * If there are no logical slots in progress we don't need to do anything,
diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c
index 278546a..4aaae59 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -21,10 +21,11 @@ standby_desc_running_xacts(StringInfo buf, xl_running_xacts *xlrec)
 {
 	int			i;
 
-	appendStringInfo(buf, "nextXid %u latestCompletedXid %u oldestRunningXid %u",
+	appendStringInfo(buf, "nextXid %u latestCompletedXid %u oldestRunningXid %u oldestCatalogXmin %u",
 					 xlrec->nextXid,
 					 xlrec->latestCompletedXid,
-					 xlrec->oldestRunningXid);
+					 xlrec->oldestRunningXid,
+					 xlrec->oldestCatalogXmin);
 	if (xlrec->xcnt > 0)
 	{
 		appendStringInfo(buf, "; %d xacts:", xlrec->xcnt);
diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index 5f07eb1..a66cfc6 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -47,7 +47,7 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
 						 "tli %u; prev tli %u; fpw %s; xid %u:%u; oid %u; multi %u; offset %u; "
 						 "oldest xid %u in DB %u; oldest multi %u in DB %u; "
 						 "oldest/newest commit timestamp xid: %u/%u; "
-						 "oldest running xid %u; %s",
+						 "oldest running xid %u; oldest catalog xmin %u; %s",
 				(uint32) (checkpoint->redo >> 32), (uint32) checkpoint->redo,
 						 checkpoint->ThisTimeLineID,
 						 checkpoint->PrevTimeLineID,
@@ -63,6 +63,7 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
 						 checkpoint->oldestCommitTsXid,
 						 checkpoint->newestCommitTsXid,
 						 checkpoint->oldestActiveXid,
+						 checkpoint->oldestCatalogXmin,
 				 (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
 	}
 	else if (info == XLOG_NEXTOID)
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index 5efbfbd..ffabf1c 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -414,6 +414,21 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
 	}
 }
 
+/*
+ * Set ShmemVariableCache->oldestCatalogXmin, the limit that decides when
+ * tuples may be removed from catalogs and user-catalogs accessible from
+ * logical decoding. Overwrites unconditionally, with no monotonicity check.
+ *
+ * Used at bootstrap, at startup and during WAL replay. A running primary
+ * instead advances it with UpdateOldestCatalogXmin() once it is WAL-logged.
+ */
+void
+SetOldestCatalogXmin(TransactionId oldestCatalogXmin)
+{
+	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+	ShmemVariableCache->oldestCatalogXmin = oldestCatalogXmin;
+	LWLockRelease(ProcArrayLock);
+}
 
 /*
  * ForceTransactionIdLimitUpdate -- does the XID wrap-limit data need updating?
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 5d58f09..cec68b2 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5021,6 +5021,7 @@ BootStrapXLOG(void)
 	MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
 	AdvanceOldestClogXid(checkPoint.oldestXid);
 	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+	SetOldestCatalogXmin(checkPoint.oldestCatalogXmin);
 	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true);
 	SetCommitTsLimit(InvalidTransactionId, InvalidTransactionId);
 
@@ -6611,6 +6612,9 @@ StartupXLOG(void)
 	   (errmsg_internal("oldest unfrozen transaction ID: %u, in database %u",
 						checkPoint.oldestXid, checkPoint.oldestXidDB)));
 	ereport(DEBUG1,
+			(errmsg_internal("oldest catalog-only transaction ID: %u",
+							 checkPoint.oldestCatalogXmin)));
+	ereport(DEBUG1,
 			(errmsg_internal("oldest MultiXactId: %u, in database %u",
 						 checkPoint.oldestMulti, checkPoint.oldestMultiDB)));
 	ereport(DEBUG1,
@@ -6628,6 +6632,7 @@ StartupXLOG(void)
 	MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
 	AdvanceOldestClogXid(checkPoint.oldestXid);
 	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+	SetOldestCatalogXmin(checkPoint.oldestCatalogXmin);
 	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true);
 	SetCommitTsLimit(checkPoint.oldestCommitTsXid,
 					 checkPoint.newestCommitTsXid);
@@ -6908,6 +6913,8 @@ StartupXLOG(void)
 				running.subxid_overflow = false;
 				running.nextXid = checkPoint.nextXid;
 				running.oldestRunningXid = oldestActiveXID;
+				running.pendingOldestCatalogXmin
+					= checkPoint.oldestCatalogXmin;
 				latestCompletedXid = checkPoint.nextXid;
 				TransactionIdRetreat(latestCompletedXid);
 				Assert(TransactionIdIsNormal(latestCompletedXid));
@@ -8786,7 +8793,16 @@ CreateCheckPoint(int flags)
 	 * recovery we don't need to write running xact data.
 	 */
 	if (!shutdown && XLogStandbyInfoActive())
-		LogStandbySnapshot();
+		LogStandbySnapshot(true);
+
+	/*
+	 * We must copy oldestCatalogXmin after the standby snapshot so we get any
+	 * updated value and don't clobber the value written in xl_running_xacts
+	 * with an older one in the checkpoint.
+	 */
+	LWLockAcquire(ProcArrayLock, LW_SHARED);
+	checkPoint.oldestCatalogXmin = ShmemVariableCache->oldestCatalogXmin;
+	LWLockRelease(ProcArrayLock);
 
 	START_CRIT_SECTION();
 
@@ -9670,6 +9686,8 @@ xlog_redo(XLogReaderState *record)
 			running.subxid_overflow = false;
 			running.nextXid = checkPoint.nextXid;
 			running.oldestRunningXid = oldestActiveXID;
+			running.pendingOldestCatalogXmin =
+				checkPoint.oldestCatalogXmin;
 			latestCompletedXid = checkPoint.nextXid;
 			TransactionIdRetreat(latestCompletedXid);
 			Assert(TransactionIdIsNormal(latestCompletedXid));
@@ -9729,8 +9747,10 @@ xlog_redo(XLogReaderState *record)
 							   checkPoint.oldestMultiDB);
 		if (TransactionIdPrecedes(ShmemVariableCache->oldestXid,
 								  checkPoint.oldestXid))
-			SetTransactionIdLimit(checkPoint.oldestXid,
-								  checkPoint.oldestXidDB);
+			SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+
+		SetOldestCatalogXmin(checkPoint.oldestCatalogXmin);
+
 		/* ControlFile->checkPointCopy always tracks the latest ckpt XID */
 		ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
 		ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index dcb4cf2..258c955 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -330,7 +330,7 @@ BackgroundWriterMain(void)
 			if (now >= timeout &&
 				last_snapshot_lsn < GetLastImportantRecPtr())
 			{
-				last_snapshot_lsn = LogStandbySnapshot();
+				last_snapshot_lsn = LogStandbySnapshot(false);
 				last_snapshot_ts = now;
 			}
 		}
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 6c5ec7a..605990f 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -947,7 +947,7 @@ ReplicationSlotReserveWal(void)
 			slot->data.restart_lsn = GetXLogInsertRecPtr();
 
 			/* make sure we have enough information to start */
-			flushptr = LogStandbySnapshot();
+			flushptr = LogStandbySnapshot(false);
 
 			/* and make sure it's fsynced to disk */
 			XLogFlush(flushptr);
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index df93265..277f196 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1234,7 +1234,7 @@ XLogWalRcvSendHSFeedback(bool immed)
 		xmin = GetOldestXmin(NULL,
 							 PROCARRAY_FLAGS_DEFAULT|PROCARRAY_SLOTS_XMIN);
 
-		ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
+		ProcArrayGetReplicationSlotXmin(&slot_xmin, NULL, &catalog_xmin);
 
 		if (TransactionIdIsValid(slot_xmin) &&
 			TransactionIdPrecedes(slot_xmin, xmin))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index cfc3fba..7c46e24 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1778,6 +1778,14 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbac
 		slot->data.xmin = feedbackXmin;
 		slot->effective_xmin = feedbackXmin;
 	}
+	/*
+	 * If the physical slot is relaying catalog_xmin for logical replication
+	 * slots on the replica it's safe to act on catalog_xmin advances
+	 * immediately too. The replica will only send a new catalog_xmin via
+	 * feedback when it advances its effective_catalog_xmin, so it's done the
+	 * delay-until-confirmed dance for us and knows it won't need the data
+	 * we're protecting from vacuum again.
+	 */
 	if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
 		!TransactionIdIsNormal(feedbackCatalogXmin) ||
 		TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 7c2e1e1..898a5ca 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -87,7 +87,11 @@ typedef struct ProcArrayStruct
 
 	/* oldest xmin of any replication slot */
 	TransactionId replication_slot_xmin;
-	/* oldest catalog xmin of any replication slot */
+	/*
+	 * Oldest catalog xmin of any replication slot
+	 *
+	 * See also ShmemVariableCache->oldestCatalogXmin
+	 */
 	TransactionId replication_slot_catalog_xmin;
 
 	/* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */
@@ -679,6 +683,15 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
 	ExpireOldKnownAssignedTransactionIds(running->oldestRunningXid);
 
 	/*
+	 * Update our knowledge of the oldest xid we can safely create historic
+	 * snapshots for.
+	 *
+	 * If we allow logical decoding on standbys in future we must raise
+	 * recovery conflicts with catalog_xmin advances here.
+	 */
+	SetOldestCatalogXmin(running->pendingOldestCatalogXmin);
+
+	/*
 	 * Remove stale locks, if any.
 	 *
 	 * Locks are always assigned to the toplevel xid so we don't need to care
@@ -1306,6 +1319,9 @@ TransactionIdIsActive(TransactionId xid)
  * The return value is also adjusted with vacuum_defer_cleanup_age, so
  * increasing that setting on the fly is another easy way to make
  * GetOldestXmin() move backwards, with no consequences for data integrity.
+ *
+ * When changing GetOldestXmin, check to see whether RecentGlobalXmin
+ * computation in GetSnapshotData also needs changing.
  */
 TransactionId
 GetOldestXmin(Relation rel, int flags)
@@ -1700,7 +1716,7 @@ GetSnapshotData(Snapshot snapshot)
 
 	/* fetch into volatile var while ProcArrayLock is held */
 	replication_slot_xmin = procArray->replication_slot_xmin;
-	replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
+	replication_slot_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
 
 	if (!TransactionIdIsValid(MyPgXact->xmin))
 		MyPgXact->xmin = TransactionXmin = xmin;
@@ -1711,6 +1727,9 @@ GetSnapshotData(Snapshot snapshot)
 	 * Update globalxmin to include actual process xids.  This is a slightly
 	 * different way of computing it than GetOldestXmin uses, but should give
 	 * the same result.
+	 *
+	 * If you change computation of RecentGlobalXmin here you may need to
+	 * change GetOldestXmin(...) as well.
 	 */
 	if (TransactionIdPrecedes(xmin, globalxmin))
 		globalxmin = xmin;
@@ -2041,12 +2060,16 @@ GetRunningTransactionData(void)
 	}
 
 	/*
-	 * It's important *not* to include the limits set by slots here because
+	 * It's important *not* to include the xmin set by slots here because
 	 * snapbuild.c uses oldestRunningXid to manage its xmin horizon. If those
 	 * were to be included here the initial value could never increase because
-	 * of a circular dependency where slots only increase their limits when
-	 * running xacts increases oldestRunningXid and running xacts only
+	 * of a circular dependency where slots only increase their xmin limits
+	 * when running xacts increases oldestRunningXid and running xacts only
 	 * increases if slots do.
+	 *
+	 * We can safely report the catalog_xmin limit for replication slots here
+	 * because it's only used to advance oldestCatalogXmin. Slots' catalog_xmin
+	 * advance does not depend on it so there's no circularity.
 	 */
 
 	CurrentRunningXacts->xcnt = count - subcount;
@@ -2055,6 +2078,8 @@ GetRunningTransactionData(void)
 	CurrentRunningXacts->nextXid = ShmemVariableCache->nextXid;
 	CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
 	CurrentRunningXacts->latestCompletedXid = latestCompletedXid;
+	CurrentRunningXacts->pendingOldestCatalogXmin =
+		procArray->replication_slot_catalog_xmin;
 
 	Assert(TransactionIdIsValid(CurrentRunningXacts->nextXid));
 	Assert(TransactionIdIsValid(CurrentRunningXacts->oldestRunningXid));
@@ -2171,6 +2196,13 @@ GetOldestSafeDecodingTransactionId(void)
 	 * If there's already a slot pegging the xmin horizon, we can start with
 	 * that value, it's guaranteed to be safe since it's computed by this
 	 * routine initially and has been enforced since.
+	 *
+	 * We don't use ShmemVariableCache->oldestCatalogXmin here because another
+	 * backend may have already logged its intention to advance it to a higher
+	 * value (still <= replication_slot_catalog_xmin) and just be waiting on
+	 * ProcArrayLock to actually apply the change. On a standby
+	 * replication_slot_catalog_xmin is what the walreceiver will be sending in
+	 * hot_standby_feedback, not oldestCatalogXmin.
 	 */
 	if (TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
 		TransactionIdPrecedes(procArray->replication_slot_catalog_xmin,
@@ -2965,18 +2997,31 @@ ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin,
  *
  * Return the current slot xmin limits. That's useful to be able to remove
  * data that's older than those limits.
+ *
+ * For logical replication slots' catalog_xmins, we return both the effective
+ * catalog_xmin being used for tuple removal (retained catalog_xmin) and the
+ * catalog_xmin actually needed by replication slots (needed_catalog_xmin).
+ *
+ * retained_catalog_xmin should be older than needed_catalog_xmin but is not
+ * guaranteed to be if there are replication slots on a replica currently
+ * attempting to start up and reserve catalogs, outdated replicas sending
+ * feedback, etc.
  */
 void
 ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
-								TransactionId *catalog_xmin)
+								TransactionId *retained_catalog_xmin,
+								TransactionId *needed_catalog_xmin)
 {
 	LWLockAcquire(ProcArrayLock, LW_SHARED);
 
 	if (xmin != NULL)
 		*xmin = procArray->replication_slot_xmin;
 
-	if (catalog_xmin != NULL)
-		*catalog_xmin = procArray->replication_slot_catalog_xmin;
+	if (retained_catalog_xmin != NULL)
+		*retained_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
+
+	if (needed_catalog_xmin != NULL)
+		*needed_catalog_xmin = procArray->replication_slot_catalog_xmin;
 
 	LWLockRelease(ProcArrayLock);
 }
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 8e57f93..7f73180 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -45,6 +45,7 @@ static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlis
 static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason);
 static XLogRecPtr LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
 static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
+static void UpdateOldestCatalogXmin(TransactionId pendingOldestCatalogXmin);
 
 
 /*
@@ -822,6 +823,7 @@ standby_redo(XLogReaderState *record)
 		running.latestCompletedXid = xlrec->latestCompletedXid;
 		running.oldestRunningXid = xlrec->oldestRunningXid;
 		running.xids = xlrec->xids;
+		running.pendingOldestCatalogXmin = xlrec->oldestCatalogXmin;
 
 		ProcArrayApplyRecoveryInfo(&running);
 	}
@@ -906,7 +908,7 @@ standby_redo(XLogReaderState *record)
  * Returns the RecPtr of the last inserted record.
  */
 XLogRecPtr
-LogStandbySnapshot(void)
+LogStandbySnapshot(bool in_checkpoint)
 {
 	XLogRecPtr	recptr;
 	RunningTransactions running;
@@ -924,6 +926,17 @@ LogStandbySnapshot(void)
 	pfree(locks);
 
 	/*
+	 * We must lock out concurrent checkpoints so that a checkpoint doesn't
+	 * copy oldestCatalogXmin after we've written a pending new value to xlog
+	 * but before we've updated it in shmem. Otherwise a standby will get the
+	 * old value after replaying the checkpoint.
+	 */
+	if (in_checkpoint)
+		Assert(LWLockHeldByMe(CheckpointLock));
+	else
+		LWLockAcquire(CheckpointLock, LW_SHARED);
+
+	/*
 	 * Log details of all in-progress transactions. This should be the last
 	 * record we write, because standby will open up when it sees this.
 	 */
@@ -953,12 +966,29 @@ LogStandbySnapshot(void)
 	/* GetRunningTransactionData() acquired XidGenLock, we must release it */
 	LWLockRelease(XidGenLock);
 
+	/*
+	 * Now that we've recorded our intention to allow cleanup of catalog tuples
+	 * no longer needed by our replication slots we can make the new threshold
+	 * effective for vacuum etc.
+	 */
+	UpdateOldestCatalogXmin(running->pendingOldestCatalogXmin);
+
+	if (!in_checkpoint)
+		LWLockRelease(CheckpointLock);
+
 	return recptr;
 }
 
 /*
  * Record an enhanced snapshot of running transactions into WAL.
  *
+ * We also record the value of procArray->replication_slot_catalog_xmin
+ * obtained from GetRunningTransactionData here. We intend to advance
+ * ShmemVariableCache->oldestCatalogXmin to it once standbys have been informed
+ * of the new value, which will permit removal of previously-protected dead
+ * catalog tuples. The standby needs to know about that before any WAL
+ * records containing such tuple removals could possibly arrive.
+ *
  * The definitions of RunningTransactionsData and xl_xact_running_xacts are
  * similar. We keep them separate because xl_xact_running_xacts is a
  * contiguous chunk of memory and never exists fully until it is assembled in
@@ -977,6 +1007,7 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
 	xlrec.nextXid = CurrRunningXacts->nextXid;
 	xlrec.oldestRunningXid = CurrRunningXacts->oldestRunningXid;
 	xlrec.latestCompletedXid = CurrRunningXacts->latestCompletedXid;
+	xlrec.oldestCatalogXmin = CurrRunningXacts->pendingOldestCatalogXmin;
 
 	/* Header */
 	XLogBeginInsert();
@@ -992,20 +1023,22 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
 
 	if (CurrRunningXacts->subxid_overflow)
 		elog(trace_recovery(DEBUG2),
-			 "snapshot of %u running transactions overflowed (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
+			 "snapshot of %u running transactions overflowed (lsn %X/%X oldest xid %u latest complete %u next xid %u oldestCatalogXmin %u)",
 			 CurrRunningXacts->xcnt,
 			 (uint32) (recptr >> 32), (uint32) recptr,
 			 CurrRunningXacts->oldestRunningXid,
 			 CurrRunningXacts->latestCompletedXid,
-			 CurrRunningXacts->nextXid);
+			 CurrRunningXacts->nextXid,
+			 CurrRunningXacts->pendingOldestCatalogXmin);
 	else
 		elog(trace_recovery(DEBUG2),
-			 "snapshot of %u+%u running transaction ids (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
+			 "snapshot of %u+%u running transaction ids (lsn %X/%X oldest xid %u latest complete %u next xid %u oldestCatalogXmin %u)",
 			 CurrRunningXacts->xcnt, CurrRunningXacts->subxcnt,
 			 (uint32) (recptr >> 32), (uint32) recptr,
 			 CurrRunningXacts->oldestRunningXid,
 			 CurrRunningXacts->latestCompletedXid,
-			 CurrRunningXacts->nextXid);
+			 CurrRunningXacts->nextXid,
+			 CurrRunningXacts->pendingOldestCatalogXmin);
 
 	/*
 	 * Ensure running_xacts information is synced to disk not too far in the
@@ -1022,6 +1055,23 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
 }
 
 /*
+ * Advance ShmemVariableCache->oldestCatalogXmin to pendingOldestCatalogXmin,
+ * the oldest catalog_xmin of any replication slot. Between valid xids it only
+ * moves forward; a change to or from InvalidTransactionId is always applied.
+ * The caller must already have written the new value to WAL (see
+ * LogCurrentRunningXacts) so standbys learn of it before any tuple removal.
+ */
+static void
+UpdateOldestCatalogXmin(TransactionId pendingOldestCatalogXmin)
+{
+	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+	if ((TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin) != TransactionIdIsValid(pendingOldestCatalogXmin))
+		|| TransactionIdPrecedes(ShmemVariableCache->oldestCatalogXmin, pendingOldestCatalogXmin))
+		ShmemVariableCache->oldestCatalogXmin = pendingOldestCatalogXmin;
+	LWLockRelease(ProcArrayLock);
+}
+
+/*
  * Wholesale logging of AccessExclusiveLocks. Other lock types need not be
  * logged, as described in backend/storage/lmgr/README.
  */
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index 2ea8931..5c7eb77 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -248,6 +248,8 @@ main(int argc, char *argv[])
 		   ControlFile->checkPointCopy.oldestCommitTsXid);
 	printf(_("Latest checkpoint's newestCommitTsXid:%u\n"),
 		   ControlFile->checkPointCopy.newestCommitTsXid);
+	printf(_("Latest checkpoint's oldestCatalogXmin:%u\n"),
+		   ControlFile->checkPointCopy.oldestCatalogXmin);
 	printf(_("Time of latest checkpoint:            %s\n"),
 		   ckpttime_str);
 	printf(_("Fake LSN counter for unlogged rels:   %X/%X\n"),
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index d25a2dd..0123fc8 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -134,6 +134,10 @@ typedef struct VariableCacheData
 	 */
 	TransactionId latestCompletedXid;	/* newest XID that has committed or
 										 * aborted */
+	TransactionId oldestCatalogXmin;	/* oldestCatalogXmin guarantees that no
+										 * valid catalog tuples >= than it are
+										 * removed. That property is used for
+										 * logical decoding. */
 
 	/*
 	 * These fields are protected by CLogTruncationLock
@@ -179,6 +183,7 @@ extern TransactionId GetNewTransactionId(bool isSubXact);
 extern TransactionId ReadNewTransactionId(void);
 extern void SetTransactionIdLimit(TransactionId oldest_datfrozenxid,
 					  Oid oldest_datoid);
+extern void SetOldestCatalogXmin(TransactionId oldestCatalogXmin);
 extern void AdvanceOldestClogXid(TransactionId oldest_datfrozenxid);
 extern bool ForceTransactionIdLimitUpdate(void);
 extern Oid	GetNewObjectId(void);
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index 3a25cc8..1fe89ae 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -45,6 +45,7 @@ typedef struct CheckPoint
 	MultiXactOffset nextMultiOffset;	/* next free MultiXact offset */
 	TransactionId oldestXid;	/* cluster-wide minimum datfrozenxid */
 	Oid			oldestXidDB;	/* database with minimum datfrozenxid */
+	TransactionId oldestCatalogXmin;	/* catalog retained after this xid */
 	MultiXactId oldestMulti;	/* cluster-wide minimum datminmxid */
 	Oid			oldestMultiDB;	/* database with minimum datminmxid */
 	pg_time_t	time;			/* time stamp of checkpoint */
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 9b42e49..05ace64 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -120,6 +120,7 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin,
 							TransactionId catalog_xmin, bool already_locked);
 
 extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
-								TransactionId *catalog_xmin);
+								TransactionId *retained_catalog_xmin,
+								TransactionId *needed_catalog_xmin);
 
 #endif   /* PROCARRAY_H */
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index 3ecc446..4728fa5 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -65,6 +65,10 @@ extern void StandbyReleaseOldLocks(int nxids, TransactionId *xids);
  * is written to WAL as a separate record immediately after each
  * checkpoint. That means that wherever we start a standby from we will
  * almost immediately see the data we need to begin executing queries.
+ *
+ * Information about the oldest catalog_xmin needed by any replication slot is
+ * also included here, so we can use it to update the catalog tuple removal
+ * limit and convey the new limit to standbys.
  */
 
 typedef struct RunningTransactionsData
@@ -75,6 +79,8 @@ typedef struct RunningTransactionsData
 	TransactionId nextXid;		/* copy of ShmemVariableCache->nextXid */
 	TransactionId oldestRunningXid;		/* *not* oldestXmin */
 	TransactionId latestCompletedXid;	/* so we can set xmax */
+	/* so we can update ShmemVariableCache->oldestCatalogXmin: */
+	TransactionId pendingOldestCatalogXmin;
 
 	TransactionId *xids;		/* array of (sub)xids still running */
 } RunningTransactionsData;
@@ -84,7 +90,7 @@ typedef RunningTransactionsData *RunningTransactions;
 extern void LogAccessExclusiveLock(Oid dbOid, Oid relOid);
 extern void LogAccessExclusiveLockPrepare(void);
 
-extern XLogRecPtr LogStandbySnapshot(void);
+extern XLogRecPtr LogStandbySnapshot(bool in_checkpoint);
 extern void LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
 						bool relcacheInitFileInval);
 
diff --git a/src/include/storage/standbydefs.h b/src/include/storage/standbydefs.h
index f8444c7..6153675 100644
--- a/src/include/storage/standbydefs.h
+++ b/src/include/storage/standbydefs.h
@@ -52,6 +52,7 @@ typedef struct xl_running_xacts
 	TransactionId nextXid;		/* copy of ShmemVariableCache->nextXid */
 	TransactionId oldestRunningXid;		/* *not* oldestXmin */
 	TransactionId latestCompletedXid;	/* so we can set xmax */
+	TransactionId oldestCatalogXmin;	/* oldest safe historic snapshot */
 
 	TransactionId xids[FLEXIBLE_ARRAY_MEMBER];
 } xl_running_xacts;
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index bf9b50a..a1c92c8 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -7,24 +7,79 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 16;
+use Test::More tests => 44;
 
 # Initialize master node
 my $node_master = get_new_node('master');
 $node_master->init(allows_streaming => 1);
-$node_master->append_conf(
-		'postgresql.conf', qq(
+$node_master->append_conf('postgresql.conf', qq(
 wal_level = logical
+hot_standby_feedback = on
+wal_receiver_status_interval = 1
+log_min_messages = debug1
 ));
 $node_master->start;
-my $backup_name = 'master_backup';
 
+# Set up some changes before we make base backups
 $node_master->safe_psql('postgres', qq[CREATE TABLE decoding_test(x integer, y text);]);
 
 $node_master->safe_psql('postgres', qq[SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');]);
 
 $node_master->safe_psql('postgres', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]);
 
+# Launch two streaming replicas, one with and one without
+# physical replication slots. We'll use these for tests
+# involving interaction of logical and physical standby.
+#
+# Both replicas are initialised from the same pg_basebackup base backup.
+#
+my $backup_name = 'master_backup';
+$node_master->backup($backup_name);
+
+$node_master->safe_psql('postgres', q[SELECT pg_create_physical_replication_slot('slot_replica');]);
+my $node_slot_replica = get_new_node('slot_replica');
+$node_slot_replica->init_from_backup($node_master, $backup_name, has_streaming => 1);
+$node_slot_replica->append_conf('recovery.conf', "primary_slot_name = 'slot_replica'");
+
+my $node_noslot_replica = get_new_node('noslot_replica');
+$node_noslot_replica->init_from_backup($node_master, $backup_name, has_streaming => 1);
+
+$node_slot_replica->start;
+$node_noslot_replica->start;
+
+sub restartpoint_standbys
+{
+	# Force restartpoints to update control files on replicas
+	$node_slot_replica->safe_psql('postgres', 'CHECKPOINT');
+	$node_noslot_replica->safe_psql('postgres', 'CHECKPOINT');
+}
+
+sub wait_standbys
+{
+	my $lsn = $node_master->lsn('insert');
+	$node_master->wait_for_catchup($node_noslot_replica, 'replay', $lsn);
+	$node_master->wait_for_catchup($node_slot_replica, 'replay', $lsn);
+}
+
+# pg_basebackup doesn't copy replication slots
+is($node_slot_replica->slot('test_slot')->{'slot_type'}, '',
+	'logical slot test_slot on master not copied by pg_basebackup');
+
+# Make sure oldestCatalogXmin lands in the control file on master
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+my @nodes = ($node_master, $node_slot_replica, $node_noslot_replica);
+
+wait_standbys();
+restartpoint_standbys();
+foreach my $node (@nodes)
+{
+	# Master had an oldestCatalogXmin, so we must've inherited it via checkpoint
+	command_like(['pg_controldata', $node->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:[1-9]\d*$/m,
+		"pg_controldata's oldestCatalogXmin is nonzero after start on " . $node->name);
+}
+
 # Basic decoding works
 my($result) = $node_master->safe_psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]);
 is(scalar(my @foobar = split /^/m, $result), 12, 'Decoding produced 12 rows inc BEGIN/COMMIT');
@@ -64,6 +119,9 @@ $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpo
 chomp($stdout_recv);
 is($stdout_recv, '', 'pg_recvlogical acknowledged changes, nothing pending on slot');
 
+# Create a second DB we'll use for testing dropping and accessing slots across
+# databases. This matters since logical slots are globally visible objects that
+# can only actually be used on one DB for most purposes.
 $node_master->safe_psql('postgres', 'CREATE DATABASE otherdb');
 
 is($node_master->psql('otherdb', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;"), 3,
@@ -96,9 +154,32 @@ isnt($node_master->slot('test_slot')->{'catalog_xmin'}, '0',
 	'restored slot catalog_xmin is nonzero');
 is($node_master->psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]), 3,
 	'reading from slot with wal_level < logical fails');
+wait_standbys();
+restartpoint_standbys();
+foreach my $node (@nodes)
+{
+	command_like(['pg_controldata', $node->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:[1-9]\d*$/m,
+		"pg_controldata's oldestCatalogXmin is nonzero on " . $node->name);
+}
+
+# Dropping the slot must clear catalog_xmin
 is($node_master->psql('postgres', q[SELECT pg_drop_replication_slot('test_slot')]), 0,
 	'can drop logical slot while wal_level = replica');
 is($node_master->slot('test_slot')->{'catalog_xmin'}, '', 'slot was dropped');
+$node_master->safe_psql('postgres', 'VACUUM;');
+# First checkpoint forces xl_running_xacts with the new oldestCatalogXmin
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+# Then we need a second checkpoint to write the control file with the new value
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+wait_standbys();
+restartpoint_standbys();
+foreach my $node (@nodes)
+{
+	command_like(['pg_controldata', $node->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:0$/m,
+		"pg_controldata's oldestCatalogXmin is zero after drop, vacuum and checkpoint on " . $node->name);
+}
 
-# done with the node
-$node_master->stop;
+foreach my $node (@nodes)
+{
+	$node->stop;
+}
-- 
2.5.5

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to