On 30 March 2017 at 11:34, Craig Ringer <cr...@2ndquadrant.com> wrote:
> On 29 March 2017 at 23:13, Simon Riggs <simon.ri...@2ndquadrant.com> wrote:
>> On 29 March 2017 at 10:17, Craig Ringer <cr...@2ndquadrant.com> wrote:
>>> On 29 March 2017 at 16:44, Craig Ringer <cr...@2ndquadrant.com> wrote:
>>>
>>>> * Split oldestCatalogXmin tracking into separate patch
>>>
>>> Regarding this, Simon raised concerns about xlog volume here.
>>>
>>> It's pretty negligible.
>>>
>>> We only write a new record when a vacuum runs after catalog_xmin
>>> advances on the slot with the currently-lowest catalog_xmin (or, if
>>> vacuum doesn't run reasonably soon, when the bgworker next looks).
>>
>> I'd prefer to slow things down a little, not be so eager.
>>
>> If we hold back update of the catalog_xmin until when we run
>> GetRunningTransactionData() we wouldn't need to produce any WAL
>> records at all AND we wouldn't need to have VACUUM do
>> UpdateOldestCatalogXmin(). Bgwriter wouldn't need to perform an extra
>> task.
>>
>> That would also make this patch about half the length it is.
>>
>> Let me know what you think.
>
> Good idea.
>
> We can always add a heuristic later to make xl_running_xacts get
> emitted more often at high transaction rates if it's necessary.
>
> Patch coming soon.

Attached.

A bit fiddlier than expected, but I like the result more.

In the process I identified an issue with both the prior patch and
this one: we don't check slot validity for slots that already existed
on a standby before it was promoted to master. We were just assuming
that being the master was good enough, since it controls
replication_slot_catalog_xmin, but that's not true for pre-existing
slots.
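
To make the check concrete, here is a minimal standalone sketch of the
validity test applied before decoding starts from a slot. It is not the
patch code: xid_follows() is a simplified stand-in for
TransactionIdFollows(), and slot_is_usable() is a hypothetical helper
name used only for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define InvalidTransactionId     ((TransactionId) 0)
#define FirstNormalTransactionId ((TransactionId) 3)

/*
 * Stand-in for TransactionIdFollows(): true if a is logically later than b.
 * Normal xids are compared modulo 2^32; special xids compare as plain
 * unsigned integers.
 */
static bool
xid_follows(TransactionId a, TransactionId b)
{
	if (a < FirstNormalTransactionId || b < FirstNormalTransactionId)
		return a > b;
	return (int32_t) (a - b) > 0;
}

/*
 * Hypothetical helper: can a slot that needs slot_catalog_xmin still be
 * decoded when catalog tuples are only retained back to
 * oldest_catalog_xmin?
 *
 * Assumption of this sketch: the slot already has a catalog_xmin assigned.
 */
static bool
slot_is_usable(TransactionId oldest_catalog_xmin,
			   TransactionId slot_catalog_xmin)
{
	assert(slot_catalog_xmin >= FirstNormalTransactionId);

	/* No persistent threshold recorded yet: nothing proves it is safe. */
	if (oldest_catalog_xmin == InvalidTransactionId)
		return false;

	/* Threshold moved past what the slot needs: catalogs may be gone. */
	if (xid_follows(oldest_catalog_xmin, slot_catalog_xmin))
		return false;

	return true;
}
```

With a threshold of 500, a slot needing catalog_xmin 600 is usable. With
a threshold of 700, or with no threshold recorded at all, it is not.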

Fixed by forcing an update of the persistent safe catalog xmin after
the first slot is created on the master. This is now done by calling
LogStandbySnapshot() immediately after assigning the slot's
catalog_xmin.
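
The ordering matters here: the new threshold is written to WAL first and
only then made effective for vacuum. A toy model of that sequence,
assuming a single slot and using invented names (ToyCluster,
toy_log_standby_snapshot, toy_create_first_slot) in place of the real
shared-memory state and WAL machinery:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define InvalidTransactionId     ((TransactionId) 0)
#define FirstNormalTransactionId ((TransactionId) 3)

static bool
xid_is_valid(TransactionId xid)
{
	return xid != InvalidTransactionId;
}

/* Stand-in for TransactionIdPrecedes(): modulo-2^32 compare for normal xids. */
static bool
xid_precedes(TransactionId a, TransactionId b)
{
	if (a < FirstNormalTransactionId || b < FirstNormalTransactionId)
		return a < b;
	return (int32_t) (a - b) < 0;
}

/* Hypothetical toy state, not a PostgreSQL struct. */
typedef struct ToyCluster
{
	TransactionId needed_catalog_xmin;	/* what the slot needs */
	TransactionId oldest_catalog_xmin;	/* what vacuum honours */
	TransactionId last_logged_xmin;		/* value in the last toy WAL record */
	int			records_logged;
} ToyCluster;

/* Step 1: log the pending threshold. Step 2: make it effective. */
static void
toy_log_standby_snapshot(ToyCluster *c)
{
	TransactionId pending;

	assert(c != NULL);
	pending = c->needed_catalog_xmin;

	c->last_logged_xmin = pending;	/* stands in for the WAL insert */
	c->records_logged++;

	/* Advance, or switch between "set" and "unset"; never move backwards. */
	if (xid_precedes(c->oldest_catalog_xmin, pending) ||
		xid_is_valid(c->oldest_catalog_xmin) != xid_is_valid(pending))
		c->oldest_catalog_xmin = pending;
}

/* Creating the first slot forces an immediate snapshot record. */
static void
toy_create_first_slot(ToyCluster *c, TransactionId slot_catalog_xmin)
{
	bool		force = !xid_is_valid(c->oldest_catalog_xmin);

	c->needed_catalog_xmin = slot_catalog_xmin;
	if (force)
		toy_log_standby_snapshot(c);
}
```

In this model a later advance of the slot's catalog_xmin has no effect on
oldest_catalog_xmin until the next snapshot record is logged, which is
the delay the patch accepts in exchange for writing no extra WAL records.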

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From 0df4f4ae04f8d37c623d3a533699966c3cc0479a Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Wed, 22 Mar 2017 13:36:49 +0800
Subject: [PATCH v2] Log catalog_xmin advances before removing catalog tuples

Before advancing the effective catalog_xmin we use to remove old catalog
tuple versions, make sure it is written to WAL. This allows standbys
to know the oldest xid they can safely create a historic snapshot for.
They can then refuse to start decoding from a slot or raise a recovery
conflict.

The catalog_xmin advance is logged in the next xl_running_xacts record, so
vacuum of catalogs may be held back up to 10 seconds when a replication slot
with catalog_xmin is holding down the global catalog_xmin.
---
 src/backend/access/heap/rewriteheap.c       |  3 +-
 src/backend/access/rmgrdesc/standbydesc.c   |  5 ++-
 src/backend/access/transam/varsup.c         |  1 -
 src/backend/access/transam/xlog.c           | 26 ++++++++++-
 src/backend/replication/logical/logical.c   | 54 +++++++++++++++++++++++
 src/backend/replication/walreceiver.c       |  2 +-
 src/backend/replication/walsender.c         | 13 ++++++
 src/backend/storage/ipc/procarray.c         | 68 +++++++++++++++++++++++------
 src/backend/storage/ipc/standby.c           | 25 +++++++++++
 src/bin/pg_controldata/pg_controldata.c     |  2 +
 src/include/access/transam.h                | 11 +++++
 src/include/catalog/pg_control.h            |  1 +
 src/include/storage/procarray.h             |  3 +-
 src/include/storage/standby.h               |  6 +++
 src/include/storage/standbydefs.h           |  1 +
 src/test/recovery/t/006_logical_decoding.pl | 15 ++++++-
 16 files changed, 214 insertions(+), 22 deletions(-)

diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index d7f65a5..d1400ec 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -812,7 +812,8 @@ logical_begin_heap_rewrite(RewriteState state)
 	if (!state->rs_logical_rewrite)
 		return;
 
-	ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
+	/* Use oldestCatalogXmin here */
+	ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin, NULL);
 
 	/*
 	 * If there are no logical slots in progress we don't need to do anything,
diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c
index 278546a..4aaae59 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -21,10 +21,11 @@ standby_desc_running_xacts(StringInfo buf, xl_running_xacts *xlrec)
 {
 	int			i;
 
-	appendStringInfo(buf, "nextXid %u latestCompletedXid %u oldestRunningXid %u",
+	appendStringInfo(buf, "nextXid %u latestCompletedXid %u oldestRunningXid %u oldestCatalogXmin %u",
 					 xlrec->nextXid,
 					 xlrec->latestCompletedXid,
-					 xlrec->oldestRunningXid);
+					 xlrec->oldestRunningXid,
+					 xlrec->oldestCatalogXmin);
 	if (xlrec->xcnt > 0)
 	{
 		appendStringInfo(buf, "; %d xacts:", xlrec->xcnt);
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index 5efbfbd..4babdf9 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -414,7 +414,6 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
 	}
 }
 
-
 /*
  * ForceTransactionIdLimitUpdate -- does the XID wrap-limit data need updating?
  *
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 5d58f09..19e0116 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5021,6 +5021,7 @@ BootStrapXLOG(void)
 	MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
 	AdvanceOldestClogXid(checkPoint.oldestXid);
 	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+	ShmemVariableCache->oldestCatalogXmin = checkPoint.oldestCatalogXmin;
 	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true);
 	SetCommitTsLimit(InvalidTransactionId, InvalidTransactionId);
 
@@ -6611,6 +6612,9 @@ StartupXLOG(void)
 	   (errmsg_internal("oldest unfrozen transaction ID: %u, in database %u",
 						checkPoint.oldestXid, checkPoint.oldestXidDB)));
 	ereport(DEBUG1,
+			(errmsg_internal("oldest catalog-only transaction ID: %u",
+							 checkPoint.oldestCatalogXmin)));
+	ereport(DEBUG1,
 			(errmsg_internal("oldest MultiXactId: %u, in database %u",
 						 checkPoint.oldestMulti, checkPoint.oldestMultiDB)));
 	ereport(DEBUG1,
@@ -6628,6 +6632,7 @@ StartupXLOG(void)
 	MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
 	AdvanceOldestClogXid(checkPoint.oldestXid);
 	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+	ShmemVariableCache->oldestCatalogXmin = checkPoint.oldestCatalogXmin;
 	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true);
 	SetCommitTsLimit(checkPoint.oldestCommitTsXid,
 					 checkPoint.newestCommitTsXid);
@@ -8704,6 +8709,10 @@ CreateCheckPoint(int flags)
 	checkPoint.oldestXidDB = ShmemVariableCache->oldestXidDB;
 	LWLockRelease(XidGenLock);
 
+	LWLockAcquire(ProcArrayLock, LW_SHARED);
+	checkPoint.oldestCatalogXmin = ShmemVariableCache->oldestCatalogXmin;
+	LWLockRelease(ProcArrayLock);
+
 	LWLockAcquire(CommitTsLock, LW_SHARED);
 	checkPoint.oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
 	checkPoint.newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
@@ -9633,6 +9642,12 @@ xlog_redo(XLogReaderState *record)
 		SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
 
 		/*
+		 * There can be no concurrent writers to oldestCatalogXmin during
+		 * recovery, so no need to take ProcArrayLock.
+		 */
+		ShmemVariableCache->oldestCatalogXmin = checkPoint.oldestCatalogXmin;
+
+		/*
 		 * If we see a shutdown checkpoint while waiting for an end-of-backup
 		 * record, the backup was canceled and the end-of-backup record will
 		 * never arrive.
@@ -9729,8 +9744,15 @@ xlog_redo(XLogReaderState *record)
 							   checkPoint.oldestMultiDB);
 		if (TransactionIdPrecedes(ShmemVariableCache->oldestXid,
 								  checkPoint.oldestXid))
-			SetTransactionIdLimit(checkPoint.oldestXid,
-								  checkPoint.oldestXidDB);
+			SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+
+
+		/*
+		 * There can be no concurrent writers to oldestCatalogXmin during
+		 * recovery, so no need to take ProcArrayLock.
+		 */
+		ShmemVariableCache->oldestCatalogXmin = checkPoint.oldestCatalogXmin;
+
 		/* ControlFile->checkPointCopy always tracks the latest ckpt XID */
 		ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
 		ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 5529ac8..76155bf 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -68,6 +68,8 @@ static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
 
+static void EnsureActiveLogicalSlotValid(void);
+
 /*
  * Make sure the current settings & environment are capable of doing logical
  * decoding.
@@ -218,6 +220,7 @@ CreateInitDecodingContext(char *plugin,
 	ReplicationSlot *slot;
 	LogicalDecodingContext *ctx;
 	MemoryContext old_context;
+	bool force_standby_snapshot;
 
 	/* shorter lines... */
 	slot = MyReplicationSlot;
@@ -276,8 +279,21 @@ CreateInitDecodingContext(char *plugin,
 
 	ReplicationSlotsComputeRequiredXmin(true);
 
+	/*
+	 * If this is the first slot created on the master we won't have a
+	 * persistent record of the oldest safe xid for historic snapshots yet.
+	 * Force one to be recorded so that when we go to replay from this slot we
+	 * know it's safe.
+	 */
+	force_standby_snapshot =
+		!TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin);
+
 	LWLockRelease(ProcArrayLock);
 
+	/* Update ShmemVariableCache->oldestCatalogXmin */
+	if (force_standby_snapshot)
+		LogStandbySnapshot();
+
 	/*
 	 * tell the snapshot builder to only assemble snapshot once reaching the
 	 * running_xact's record with the respective xmin.
@@ -376,6 +392,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 		start_lsn = slot->data.confirmed_flush;
 	}
 
+	EnsureActiveLogicalSlotValid();
+
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId,
 								 read_page, prepare_write, do_write);
@@ -963,3 +981,39 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 		SpinLockRelease(&MyReplicationSlot->mutex);
 	}
 }
+
+/*
+ * Test to see if the active logical slot is usable.
+ */
+static void
+EnsureActiveLogicalSlotValid(void)
+{
+	TransactionId shmem_catalog_xmin;
+
+	Assert(MyReplicationSlot != NULL);
+
+	/*
+	 * A logical slot can become unusable if we're doing logical decoding on a
+	 * standby or using a slot created before we were promoted from standby
+	 * to master. If the master advanced its global catalog_xmin past the
+	 * threshold we need it could've removed catalog tuple versions that
+	 * we'll require to start decoding at our restart_lsn.
+	 *
+	 * We need a barrier so that if we decode in recovery on a standby we
+	 * don't allow new decoding sessions to start after redo has advanced
+	 * the threshold.
+	 */
+	if (RecoveryInProgress())
+		pg_memory_barrier();
+
+	shmem_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
+
+	if (!TransactionIdIsValid(shmem_catalog_xmin) ||
+		TransactionIdFollows(shmem_catalog_xmin, MyReplicationSlot->data.catalog_xmin))
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("replication slot '%s' requires catalogs removed by master",
+						NameStr(MyReplicationSlot->data.name)),
+				 errdetail("need catalog_xmin %u, have oldestCatalogXmin %u",
+						   MyReplicationSlot->data.catalog_xmin, shmem_catalog_xmin)));
+}
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 771ac30..c2ad791 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1233,7 +1233,7 @@ XLogWalRcvSendHSFeedback(bool immed)
 		xmin = GetOldestXmin(NULL,
 							 PROCARRAY_FLAGS_DEFAULT|PROCARRAY_SLOTS_XMIN);
 
-		ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
+		ProcArrayGetReplicationSlotXmin(&slot_xmin, NULL, &catalog_xmin);
 
 		if (TransactionIdIsValid(slot_xmin) &&
 			TransactionIdPrecedes(slot_xmin, xmin))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index cfc3fba..cdc5f95 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1658,6 +1658,11 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 	 * be energy wasted - the worst lost information can do here is give us
 	 * wrong information in a statistics view - we'll just potentially be more
 	 * conservative in removing files.
+	 *
+	 * We don't have to do any effective_xmin / effective_catalog_xmin testing
+	 * here either, like for LogicalConfirmReceivedLocation. If we received
+	 * the xmin and catalog_xmin from downstream replication slots we know they
+	 * were already confirmed there.
 	 */
 }
 
@@ -1778,6 +1783,14 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbac
 		slot->data.xmin = feedbackXmin;
 		slot->effective_xmin = feedbackXmin;
 	}
+	/*
+	 * If the physical slot is relaying catalog_xmin for logical replication
+	 * slots on the replica it's safe to act on catalog_xmin advances
+	 * immediately too. The replica will only send a new catalog_xmin via
+	 * feedback when it advances its effective_catalog_xmin, so it's done the
+	 * delay-until-confirmed dance for us and knows it won't need the data
+	 * we're protecting from vacuum again.
+	 */
 	if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
 		!TransactionIdIsNormal(feedbackCatalogXmin) ||
 		TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 7c2e1e1..a5b26dd 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -87,7 +87,11 @@ typedef struct ProcArrayStruct
 
 	/* oldest xmin of any replication slot */
 	TransactionId replication_slot_xmin;
-	/* oldest catalog xmin of any replication slot */
+	/*
+	 * Oldest catalog xmin of any replication slot
+	 *
+	 * See also ShmemVariableCache->oldestCatalogXmin
+	 */
 	TransactionId replication_slot_catalog_xmin;
 
 	/* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */
@@ -679,6 +683,18 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
 	ExpireOldKnownAssignedTransactionIds(running->oldestRunningXid);
 
 	/*
+	 * Update our knowledge of the oldest xid we can safely create historic
+	 * snapshots for.
+	 *
+	 * There can be no concurrent writers to oldestCatalogXmin during
+	 * recovery, so no need to take ProcArrayLock.
+	 *
+	 * If we allow logical decoding on standbys in future we must raise
+	 * recovery conflicts with catalog_xmin advances here.
+	 */
+	ShmemVariableCache->oldestCatalogXmin = running->pendingOldestCatalogXmin;
+
+	/*
 	 * Remove stale locks, if any.
 	 *
 	 * Locks are always assigned to the toplevel xid so we don't need to care
@@ -1306,6 +1322,9 @@ TransactionIdIsActive(TransactionId xid)
  * The return value is also adjusted with vacuum_defer_cleanup_age, so
  * increasing that setting on the fly is another easy way to make
  * GetOldestXmin() move backwards, with no consequences for data integrity.
+ *
+ * When changing GetOldestXmin, check to see whether RecentGlobalXmin
+ * computation in GetSnapshotData also needs changing.
  */
 TransactionId
 GetOldestXmin(Relation rel, int flags)
@@ -1493,7 +1512,8 @@ GetMaxSnapshotSubxidCount(void)
  *			older than this are known not running any more.
  *		RecentGlobalXmin: the global xmin (oldest TransactionXmin across all
  *			running transactions, except those running LAZY VACUUM).  This is
- *			the same computation done by GetOldestXmin(true, true).
+ *			the same computation done by
+ *			GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT|PROCARRAY_FLAGS_VACUUM)
  *		RecentGlobalDataXmin: the global xmin for non-catalog tables
  *			>= RecentGlobalXmin
  *
@@ -1700,7 +1720,7 @@ GetSnapshotData(Snapshot snapshot)
 
 	/* fetch into volatile var while ProcArrayLock is held */
 	replication_slot_xmin = procArray->replication_slot_xmin;
-	replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
+	replication_slot_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
 
 	if (!TransactionIdIsValid(MyPgXact->xmin))
 		MyPgXact->xmin = TransactionXmin = xmin;
@@ -1711,6 +1731,9 @@ GetSnapshotData(Snapshot snapshot)
 	 * Update globalxmin to include actual process xids.  This is a slightly
 	 * different way of computing it than GetOldestXmin uses, but should give
 	 * the same result.
+	 *
+	 * If you change computation of RecentGlobalXmin here you may need to
+	 * change GetOldestXmin(...) as well.
 	 */
 	if (TransactionIdPrecedes(xmin, globalxmin))
 		globalxmin = xmin;
@@ -2041,12 +2064,16 @@ GetRunningTransactionData(void)
 	}
 
 	/*
-	 * It's important *not* to include the limits set by slots here because
+	 * It's important *not* to include the xmin set by slots here because
 	 * snapbuild.c uses oldestRunningXid to manage its xmin horizon. If those
 	 * were to be included here the initial value could never increase because
 	 * of a circular dependency where slots only increase their limits when
 	 * running xacts increases oldestRunningXid and running xacts only
 	 * increases if slots do.
+	 *
+	 * We can include the catalog_xmin limit here; there's no similar
+	 * circularity, and we need it to log xl_running_xacts records for
+	 * standbys.
 	 */
 
 	CurrentRunningXacts->xcnt = count - subcount;
@@ -2055,6 +2082,8 @@ GetRunningTransactionData(void)
 	CurrentRunningXacts->nextXid = ShmemVariableCache->nextXid;
 	CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
 	CurrentRunningXacts->latestCompletedXid = latestCompletedXid;
+	CurrentRunningXacts->pendingOldestCatalogXmin =
+		procArray->replication_slot_catalog_xmin;
 
 	Assert(TransactionIdIsValid(CurrentRunningXacts->nextXid));
 	Assert(TransactionIdIsValid(CurrentRunningXacts->oldestRunningXid));
@@ -2168,14 +2197,14 @@ GetOldestSafeDecodingTransactionId(void)
 	oldestSafeXid = ShmemVariableCache->nextXid;
 
 	/*
-	 * If there's already a slot pegging the xmin horizon, we can start with
-	 * that value, it's guaranteed to be safe since it's computed by this
-	 * routine initially and has been enforced since.
+	 * If there's already an effectiveCatalogXmin held down by an existing
+	 * replication slot it's definitely safe to start there, and it can't
+	 * advance while we hold ProcArrayLock.
 	 */
-	if (TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
-		TransactionIdPrecedes(procArray->replication_slot_catalog_xmin,
+	if (TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin) &&
+		TransactionIdPrecedes(ShmemVariableCache->oldestCatalogXmin,
 							  oldestSafeXid))
-		oldestSafeXid = procArray->replication_slot_catalog_xmin;
+		oldestSafeXid = ShmemVariableCache->oldestCatalogXmin;
 
 	/*
 	 * If we're not in recovery, we walk over the procarray and collect the
@@ -2965,18 +2994,31 @@ ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin,
  *
  * Return the current slot xmin limits. That's useful to be able to remove
  * data that's older than those limits.
+ *
+ * For logical replication slots' catalog_xmin, we return both the effective
+ * catalog_xmin being used for tuple removal (retained_catalog_xmin) and the
+ * catalog_xmin actually needed by replication slots (needed_catalog_xmin).
+ *
+ * retained_catalog_xmin should be older than needed_catalog_xmin but is not
+ * guaranteed to be if there are replication slots on a replica currently
+ * attempting to start up and reserve catalogs, outdated replicas sending
+ * feedback, etc.
  */
 void
 ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
-								TransactionId *catalog_xmin)
+								TransactionId *retained_catalog_xmin,
+								TransactionId *needed_catalog_xmin)
 {
 	LWLockAcquire(ProcArrayLock, LW_SHARED);
 
 	if (xmin != NULL)
 		*xmin = procArray->replication_slot_xmin;
 
-	if (catalog_xmin != NULL)
-		*catalog_xmin = procArray->replication_slot_catalog_xmin;
+	if (retained_catalog_xmin != NULL)
+		*retained_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
+
+	if (needed_catalog_xmin != NULL)
+		*needed_catalog_xmin = procArray->replication_slot_catalog_xmin;
 
 	LWLockRelease(ProcArrayLock);
 }
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 8e57f93..819abf7 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -45,6 +45,7 @@ static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlis
 static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason);
 static XLogRecPtr LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
 static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
+static void UpdateOldestCatalogXmin(TransactionId pendingOldestCatalogXmin);
 
 
 /*
@@ -822,6 +823,7 @@ standby_redo(XLogReaderState *record)
 		running.latestCompletedXid = xlrec->latestCompletedXid;
 		running.oldestRunningXid = xlrec->oldestRunningXid;
 		running.xids = xlrec->xids;
+		running.pendingOldestCatalogXmin = xlrec->oldestCatalogXmin;
 
 		ProcArrayApplyRecoveryInfo(&running);
 	}
@@ -953,12 +955,24 @@ LogStandbySnapshot(void)
 	/* GetRunningTransactionData() acquired XidGenLock, we must release it */
 	LWLockRelease(XidGenLock);
 
+	/*
+	 * Now that we've recorded our intention to allow cleanup of catalog tuples
+	 * no longer needed by our replication slots we can make the new threshold
+	 * effective for vacuum etc.
+	 */
+	UpdateOldestCatalogXmin(running->pendingOldestCatalogXmin);
+
 	return recptr;
 }
 
 /*
  * Record an enhanced snapshot of running transactions into WAL.
  *
+ * We also record the value of procArray->replication_slot_catalog_xmin
+ * obtained from GetRunningTransactionData here, so standbys know we're about
+ * to advance ShmemVariableCache->oldestCatalogXmin to its value and start
+ * removing dead catalog tuples below that threshold.
+ *
  * The definitions of RunningTransactionsData and xl_xact_running_xacts are
  * similar. We keep them separate because xl_xact_running_xacts is a
  * contiguous chunk of memory and never exists fully until it is assembled in
@@ -977,6 +991,7 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
 	xlrec.nextXid = CurrRunningXacts->nextXid;
 	xlrec.oldestRunningXid = CurrRunningXacts->oldestRunningXid;
 	xlrec.latestCompletedXid = CurrRunningXacts->latestCompletedXid;
+	xlrec.oldestCatalogXmin = CurrRunningXacts->pendingOldestCatalogXmin;
 
 	/* Header */
 	XLogBeginInsert();
@@ -1021,6 +1036,16 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
 	return recptr;
 }
 
+static void
+UpdateOldestCatalogXmin(TransactionId pendingOldestCatalogXmin)
+{
+	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+	if (TransactionIdPrecedes(ShmemVariableCache->oldestCatalogXmin, pendingOldestCatalogXmin)
+		|| (TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin) != TransactionIdIsValid(pendingOldestCatalogXmin)))
+		ShmemVariableCache->oldestCatalogXmin = pendingOldestCatalogXmin;
+	LWLockRelease(ProcArrayLock);
+}
+
 /*
  * Wholesale logging of AccessExclusiveLocks. Other lock types need not be
  * logged, as described in backend/storage/lmgr/README.
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index 2ea8931..5c7eb77 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -248,6 +248,8 @@ main(int argc, char *argv[])
 		   ControlFile->checkPointCopy.oldestCommitTsXid);
 	printf(_("Latest checkpoint's newestCommitTsXid:%u\n"),
 		   ControlFile->checkPointCopy.newestCommitTsXid);
+	printf(_("Latest checkpoint's oldestCatalogXmin:%u\n"),
+		   ControlFile->checkPointCopy.oldestCatalogXmin);
 	printf(_("Time of latest checkpoint:            %s\n"),
 		   ckpttime_str);
 	printf(_("Fake LSN counter for unlogged rels:   %X/%X\n"),
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index d25a2dd..a4ecfb7 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -136,6 +136,17 @@ typedef struct VariableCacheData
 										 * aborted */
 
 	/*
+	 * This field is protected by ProcArrayLock except
+	 * during recovery, when it's set unlocked.
+	 *
+	 * oldestCatalogXmin is the oldest xid it is
+	 * guaranteed to be safe to create a historic
+	 * snapshot for. See also
+	 * procArray->replication_slot_catalog_xmin
+	 */
+	TransactionId oldestCatalogXmin;
+
+	/*
 	 * These fields are protected by CLogTruncationLock
 	 */
 	TransactionId oldestClogXid;	/* oldest it's safe to look up in clog */
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index 3a25cc8..1fe89ae 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -45,6 +45,7 @@ typedef struct CheckPoint
 	MultiXactOffset nextMultiOffset;	/* next free MultiXact offset */
 	TransactionId oldestXid;	/* cluster-wide minimum datfrozenxid */
 	Oid			oldestXidDB;	/* database with minimum datfrozenxid */
+	TransactionId oldestCatalogXmin;	/* catalog retained after this xid */
 	MultiXactId oldestMulti;	/* cluster-wide minimum datminmxid */
 	Oid			oldestMultiDB;	/* database with minimum datminmxid */
 	pg_time_t	time;			/* time stamp of checkpoint */
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 9b42e49..05ace64 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -120,6 +120,7 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin,
 							TransactionId catalog_xmin, bool already_locked);
 
 extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
-								TransactionId *catalog_xmin);
+								TransactionId *retained_catalog_xmin,
+								TransactionId *needed_catalog_xmin);
 
 #endif   /* PROCARRAY_H */
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index 3ecc446..7756a27 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -65,6 +65,10 @@ extern void StandbyReleaseOldLocks(int nxids, TransactionId *xids);
  * is written to WAL as a separate record immediately after each
  * checkpoint. That means that wherever we start a standby from we will
  * almost immediately see the data we need to begin executing queries.
+ *
+ * Information about the oldest catalog_xmin needed by any replication slot is
+ * also included here, so we can use it to update the catalog tuple removal
+ * limit and convey the new limit to standbys.
  */
 
 typedef struct RunningTransactionsData
@@ -75,6 +79,8 @@ typedef struct RunningTransactionsData
 	TransactionId nextXid;		/* copy of ShmemVariableCache->nextXid */
 	TransactionId oldestRunningXid;		/* *not* oldestXmin */
 	TransactionId latestCompletedXid;	/* so we can set xmax */
+	/* so we can update ShmemVariableCache->oldestCatalogXmin: */
+	TransactionId pendingOldestCatalogXmin;
 
 	TransactionId *xids;		/* array of (sub)xids still running */
 } RunningTransactionsData;
diff --git a/src/include/storage/standbydefs.h b/src/include/storage/standbydefs.h
index f8444c7..6153675 100644
--- a/src/include/storage/standbydefs.h
+++ b/src/include/storage/standbydefs.h
@@ -52,6 +52,7 @@ typedef struct xl_running_xacts
 	TransactionId nextXid;		/* copy of ShmemVariableCache->nextXid */
 	TransactionId oldestRunningXid;		/* *not* oldestXmin */
 	TransactionId latestCompletedXid;	/* so we can set xmax */
+	TransactionId oldestCatalogXmin;	/* oldest safe historic snapshot */
 
 	TransactionId xids[FLEXIBLE_ARRAY_MEMBER];
 } xl_running_xacts;
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index bf9b50a..2cfa9ac 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -7,7 +7,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 16;
+use Test::More tests => 25;
 
 # Initialize master node
 my $node_master = get_new_node('master');
@@ -17,6 +17,10 @@ $node_master->append_conf(
 wal_level = logical
 ));
 $node_master->start;
+
+command_like(['pg_controldata', $node_master->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:0$/m,
+	"pg_controldata's oldestCatalogXmin is zero after start");
+
 my $backup_name = 'master_backup';
 
 $node_master->safe_psql('postgres', qq[CREATE TABLE decoding_test(x integer, y text);]);
@@ -96,9 +100,18 @@ isnt($node_master->slot('test_slot')->{'catalog_xmin'}, '0',
 	'restored slot catalog_xmin is nonzero');
 is($node_master->psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]), 3,
 	'reading from slot with wal_level < logical fails');
+command_like(['pg_controldata', $node_master->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:[^0][\d]*$/m,
+	"pg_controldata's oldestCatalogXmin is nonzero");
 is($node_master->psql('postgres', q[SELECT pg_drop_replication_slot('test_slot')]), 0,
 	'can drop logical slot while wal_level = replica');
 is($node_master->slot('test_slot')->{'catalog_xmin'}, '', 'slot was dropped');
+$node_master->safe_psql('postgres', 'VACUUM;');
+# First checkpoint forces xl_running_xacts with the new oldestCatalogXmin
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+# Then we need a second checkpoint to write the control file with the new value
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+command_like(['pg_controldata', $node_master->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:0$/m,
+	"pg_controldata's oldestCatalogXmin is zero after drop, vacuum and checkpoint");
 
 # done with the node
 $node_master->stop;
-- 
2.5.5
