On 31 March 2017 at 12:49, Craig Ringer <cr...@2ndquadrant.com> wrote:
> On 31 March 2017 at 01:16, Andres Freund <and...@anarazel.de> wrote:
>>> @@ -9633,6 +9643,12 @@ xlog_redo(XLogReaderState *record)
>>>               SetTransactionIdLimit(checkPoint.oldestXid, 
>>> checkPoint.oldestXidDB);
>>>
>>>               /*
>>> +              * There can be no concurrent writers to oldestCatalogXmin 
>>> during
>>> +              * recovery, so no need to take ProcArrayLock.
>>> +              */
>>> +             ShmemVariableCache->oldestCatalogXmin = 
>>> checkPoint.oldestCatalogXmin;
>>
>> s/writers/writes/?
>
> I meant writers, i.e. nothing else can be writing to it. But writes works too.
>

Fixed.

>>> @@ -276,8 +279,21 @@ CreateInitDecodingContext(char *plugin,
>>>
>>>       ReplicationSlotsComputeRequiredXmin(true);
>>>
>>> +     /*
>>> +      * If this is the first slot created on the master we won't have a
>>> +      * persistent record of the oldest safe xid for historic snapshots 
>>> yet.
>>> +      * Force one to be recorded so that when we go to replay from this 
>>> slot we
>>> +      * know it's safe.
>>> +      */
>>> +     force_standby_snapshot =
>>> +             !TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin);
>>
>> s/first slot/first logical slot/. Also, the reference to master doesn't
>> seem right.
>
> Unsure what you mean re reference to master not seeming right.
>
> If oldestCatalogXmin is 0 we'll ERROR when trying to start decoding
> from the new slot so we need to make sure it gets advanced once we've
> decided on our starting catalog_xmin.

Moved to next patch, will address there.

>>>       LWLockRelease(ProcArrayLock);
>>>
>>> +     /* Update ShmemVariableCache->oldestCatalogXmin */
>>> +     if (force_standby_snapshot)
>>> +             LogStandbySnapshot();
>>
>> The comment and code don't quite square to me - it's far from obvious
>> that LogStandbySnapshot does something like that. I'd even say it's a
>> bad idea to have it do that.
>
> So you prefer the prior approach with separate xl_catalog_xmin advance 
> records?
>
> I don't have much preference; I liked the small code reduction of
> Simon's proposed approach, but it landed up being a bit awkward in
> terms of ordering and locking. I don't think catalog_xmin tracking is
> really closely related to the standby snapshot stuff and it feels a
> bit like it's a tacked-on afterthought where it is now.

This code has moved to the next patch. But we do need to agree on the best approach.

If we're not going to force a standby snapshot here, then it's
probably better to use the separate xl_catalog_xmin design instead.

>>>       /*
>>>        * tell the snapshot builder to only assemble snapshot once reaching 
>>> the
>>>        * running_xact's record with the respective xmin.
>>> @@ -376,6 +392,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
>>>               start_lsn = slot->data.confirmed_flush;
>>>       }
>>>
>>> +     EnsureActiveLogicalSlotValid();
>>
>> This seems like it should be in a separate patch, and separately
>> reviewed. It's code that's currently unreachable (and thus untestable).
>
> It is reached and is run, those checks run whenever creating a
> non-initial decoding context on master or replica.

Again, moved to next patch.

>>>       /*
>>> +      * Update our knowledge of the oldest xid we can safely create 
>>> historic
>>> +      * snapshots for.
>>> +      *
>>> +      * There can be no concurrent writers to oldestCatalogXmin during
>>> +      * recovery, so no need to take ProcArrayLock.
>>
>> By now I think this is pretty flawed logic, because there can be concurrent
>> readers that need to be safe against oldestCatalogXmin advancing
>> concurrently.
>
> You're right, we'll need a lock or suitable barriers here to ensure
> that slot conflict with recovery and startup of new decoding sessions
> doesn't see outdated values. This would be the peer of the
> pg_memory_barrier() above. Or could just take a lock; there's enough
> other locking activity in redo that it should be fine.

Now takes ProcArrayLock briefly.

oldestCatalogXmin is also used in GetOldestSafeDecodingTransactionId,
and there we want to prevent it from being advanced. But on further
thought, relying on oldestCatalogXmin there is actually unsafe; on the
master, we might've already logged our intent to advance it to some
greater value of procArray->replication_slot_catalog_xmin and will do
so as ProcArrayLock is released. On standby we're also better off
relying on procArray->replication_slot_catalog_xmin since that's what
we'll be sending in feedback.

Went back to using replication_slot_catalog_xmin here, with the comment:

     *
     * We don't use ShmemVariableCache->oldestCatalogXmin here because another
     * backend may have already logged its intention to advance it to a higher
     * value (still <= replication_slot_catalog_xmin) and just be waiting on
     * ProcArrayLock to actually apply the change. On a standby
     * replication_slot_catalog_xmin is what the walreceiver will be sending in
     * hot_standby_feedback, not oldestCatalogXmin.
     */
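To make that window concrete, here is a standalone toy model in C. It is not PostgreSQL code: TransactionId is a plain uint32_t, xid_precedes() is a simplified stand-in for TransactionIdPrecedes(), and HorizonState, apply_pending_advance() and start_is_safe() are hypothetical names. It assumes one backend has already logged an advance of oldestCatalogXmin to replication_slot_catalog_xmin but has not yet re-acquired ProcArrayLock to apply it.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define FIRST_NORMAL_XID ((TransactionId) 3)

/* Simplified stand-in for TransactionIdPrecedes(): modulo-2^32 comparison. */
static bool
xid_precedes(TransactionId a, TransactionId b)
{
	if (a < FIRST_NORMAL_XID || b < FIRST_NORMAL_XID)
		return a < b;
	return (int32_t) (a - b) < 0;
}

/* Hypothetical pair of shared values while an advance is pending. */
typedef struct
{
	TransactionId oldest_catalog_xmin;	/* models ShmemVariableCache->oldestCatalogXmin */
	TransactionId slot_catalog_xmin;	/* models procArray->replication_slot_catalog_xmin */
} HorizonState;

/* The logging backend gets ProcArrayLock back and applies the advance. */
static void
apply_pending_advance(HorizonState *s)
{
	s->oldest_catalog_xmin = s->slot_catalog_xmin;
}

/* A decoding start xid is safe only if the applied horizon is not past it. */
static bool
start_is_safe(TransactionId start, const HorizonState *s)
{
	return !xid_precedes(start, s->oldest_catalog_xmin);
}
```

With oldest_catalog_xmin = 100 and slot_catalog_xmin = 150 already logged, a start point taken from oldestCatalogXmin (100) falls behind the horizon once the pending advance lands, while one taken from replication_slot_catalog_xmin (150) does not.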


>>>       /*
>>> -      * It's important *not* to include the limits set by slots here 
>>> because
>>> +      * It's important *not* to include the xmin set by slots here because
>>>        * snapbuild.c uses oldestRunningXid to manage its xmin horizon. If 
>>> those
>>>        * were to be included here the initial value could never increase 
>>> because
>>>        * of a circular dependency where slots only increase their limits 
>>> when
>>>        * running xacts increases oldestRunningXid and running xacts only
>>>        * increases if slots do.
>>> +      *
>>> +      * We can include the catalog_xmin limit here; there's no similar
>>> +      * circularity, and we need it to log xl_running_xacts records for
>>> +      * standbys.
>>>        */
>>
>> Those comments seem to need some more heavyhanded reconciliation.
>
> OK. To me it seems clear that the first refers to xmin, the second to
> catalog_xmin. But after all I wrote it, and the important thing is
> what it says to people who are not me. Will adjust.

Changed to:

     * We can safely report the catalog_xmin limit for replication slots here
     * because it's only used to advance oldestCatalogXmin. Slots'
     * catalog_xmin advance does not depend on it so there's no circularity.



>
>>>   *
>>>   * Return the current slot xmin limits. That's useful to be able to remove
>>>   * data that's older than those limits.
>>> + *
>>> + * For logical replication slots' catalog_xmin, we return both the 
>>> effective
>>
>> This seems to need some light editing.
>
> catalog_xmin => catalog_xmins I guess.

Amended.

>>>  /*
>>>   * Record an enhanced snapshot of running transactions into WAL.
>>>   *
>>> + * We also record the value of procArray->replication_slot_catalog_xmin
>>> + * obtained from GetRunningTransactionData here, so standbys know we're 
>>> about
>>> + * to advance ShmemVariableCache->oldestCatalogXmin to its value and start
>>> + * removing dead catalog tuples below that threshold.
>>
>> I think this needs some rephrasing. We're not necessarily about to remove
>> catalog tuples here, nor are we necessarily advancing oldestCatalogXmin.
>
> Agreed

 * We also record the value of procArray->replication_slot_catalog_xmin
 * obtained from GetRunningTransactionData here. We intend to advance
 * ShmemVariableCache->oldestCatalogXmin to it once standbys have been informed
 * of the new value, which will permit removal of previously-protected dead
 * catalog tuples. The standby needs to know about that before any WAL
 * records containing such tuple removals could possibly arrive.


>>> +static void
>>> +UpdateOldestCatalogXmin(TransactionId pendingOldestCatalogXmin)
>>> +{
>>> +     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
>>> +     if (TransactionIdPrecedes(ShmemVariableCache->oldestCatalogXmin, 
>>> pendingOldestCatalogXmin)
>>> +             || 
>>> (TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin) != 
>>> TransactionIdIsValid(pendingOldestCatalogXmin)))
>>> +             ShmemVariableCache->oldestCatalogXmin = 
>>> pendingOldestCatalogXmin;
>>> +     LWLockRelease(ProcArrayLock);
>>> +}
>>
>> Doing TransactionIdPrecedes before ensuring
>> ShmemVariableCache->oldestCatalogXmin is actually valid doesn't strike
>> me as a good idea.  Generally, the expression as it stands is hard to
>> understand.
>
> OK.
>
> I found other formulations to be long and hard to read. Expressing it
> as "if validity has changed or value has increased" made more sense.
> Agree order should change.

Re-ordered, otherwise left the same.

Could add a comment like

"we must set oldestCatalogXmin if its validity has changed or it is advancing"

but seems rather redundant to the code.
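For comparison, the reordered condition can be written as a small standalone predicate. This is a sketch rather than the patch's code: catalog_xmin_needs_update(), xid_is_valid() and xid_precedes() are hypothetical names, and xid_precedes() is a simplified re-implementation of the wraparound-aware comparison in transam.c. Checking validity first means the ordered comparison is only reached when both xids are valid.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define INVALID_XID      ((TransactionId) 0)
#define FIRST_NORMAL_XID ((TransactionId) 3)

static bool
xid_is_valid(TransactionId xid)
{
	return xid != INVALID_XID;
}

/* Simplified stand-in for TransactionIdPrecedes(): modulo-2^32 comparison. */
static bool
xid_precedes(TransactionId a, TransactionId b)
{
	if (a < FIRST_NORMAL_XID || b < FIRST_NORMAL_XID)
		return a < b;			/* special xids compare by plain value */
	return (int32_t) (a - b) < 0;
}

/*
 * Should oldestCatalogXmin be overwritten with the pending value?
 * Set it if its validity has changed or if it is advancing.
 */
static bool
catalog_xmin_needs_update(TransactionId current, TransactionId pending)
{
	if (xid_is_valid(current) != xid_is_valid(pending))
		return true;			/* became set, or became unset */
	return xid_is_valid(current) && xid_precedes(current, pending);
}
```

Because the comparison is modulo 2^32, current = 0xFFFFFFF0 and pending = 5 still counts as an advance, while an equal or older pending value leaves the field alone.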


>>> diff --git a/src/include/access/transam.h b/src/include/access/transam.h
>>> index d25a2dd..a4ecfb7 100644
>>> --- a/src/include/access/transam.h
>>> +++ b/src/include/access/transam.h
>>> @@ -136,6 +136,17 @@ typedef struct VariableCacheData
>>>                                                                             
>>>    * aborted */
>>>
>>>       /*
>>> +      * This field is protected by ProcArrayLock except
>>> +      * during recovery, when it's set unlocked.
>>> +      *
>>> +      * oldestCatalogXmin is the oldest xid it is
>>> +      * guaranteed to be safe to create a historic
>>> +      * snapshot for. See also
>>> +      * procArray->replication_slot_catalog_xmin
>>> +      */
>>> +     TransactionId oldestCatalogXmin;
>>
>> Maybe it'd be better to rephrase that to something like
>> "oldestCatalogXmin guarantees that no valid catalog tuples >= it
>> are removed. That property is used for logical decoding." or similar?
>
> Fine by me.
>
> I'll adjust this per discussion and per a comment Simon made
> separately. Whether we use it right away or not it's worth having it
> updated while it's still freshly in mind.

OK, updated catalog_xmin logging patch attached.

Important fix included: when faking up a RunningTransactions snapshot
in StartupXLOG for replay of shutdown checkpoints, copy the
checkpoint's oldestCatalogXmin so we apply it instead of clobbering
the replica's value. It's kind of roundabout to set this once when we
apply the checkpoint and again via ProcArrayApplyRecoveryInfo, but
it's necessary if we're using xl_running_xacts to carry
oldestCatalogXmin info.

Found another issue too. We log our intention to advance
oldestCatalogXmin in LogStandbySnapshot when we write
xl_running_xacts. We then release ProcArrayLock and re-acquire it
LW_EXCLUSIVE so we can advance oldestCatalogXmin in shmem. But a
checkpoint could run and copy the old oldestCatalogXmin value after we
wrote the xlog record but before we updated shmem. On the standby, redo
would then apply the new value and clobber it with the old one.

To fix this, take CheckpointLock in LogStandbySnapshot (unless called
during a checkpoint, which already holds it) so an xl_running_xacts
record carrying the new oldestCatalogXmin can't land in WAL before a
checkpoint record carrying an older value. Also read oldestCatalogXmin
in CreateCheckPoint only after the checkpoint has forced
LogStandbySnapshot.
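A toy model of the race, assuming (as in this patch's xlog_redo and ProcArrayApplyRecoveryInfo) that standby redo overwrites oldestCatalogXmin unconditionally from both checkpoint and xl_running_xacts records, so the last record replayed wins. WalRecord, Primary, checkpoint() and run_scenario() are hypothetical names; this is a simulation of record ordering, not PostgreSQL code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* Hypothetical WAL record: only the oldestCatalogXmin it carries matters. */
typedef struct
{
	const char *kind;			/* "running_xacts" or "checkpoint" */
	TransactionId oldest_catalog_xmin;
} WalRecord;

typedef struct
{
	TransactionId shmem_oldest_catalog_xmin;	/* primary's in-memory value */
	WalRecord	wal[8];
	size_t		nrecords;
} Primary;

static void
wal_append(Primary *p, const char *kind, TransactionId xmin)
{
	assert(p->nrecords < sizeof(p->wal) / sizeof(p->wal[0]));
	p->wal[p->nrecords].kind = kind;
	p->wal[p->nrecords].oldest_catalog_xmin = xmin;
	p->nrecords++;
}

/* A checkpoint copies whatever shmem holds right now. */
static void
checkpoint(Primary *p)
{
	wal_append(p, "checkpoint", p->shmem_oldest_catalog_xmin);
}

/* Standby redo: every record overwrites the value, so the last one wins. */
static TransactionId
replay(const Primary *p)
{
	TransactionId standby = 0;

	for (size_t i = 0; i < p->nrecords; i++)
		standby = p->wal[i].oldest_catalog_xmin;
	return standby;
}

/*
 * LogStandbySnapshot() split into "log intent" and "apply in shmem".
 * checkpoint_in_between models the unlocked interleaving; otherwise
 * CheckpointLock pushes the checkpoint after the shmem update.
 */
static TransactionId
run_scenario(TransactionId old_xmin, TransactionId new_xmin,
			 bool checkpoint_in_between)
{
	Primary		p = {.shmem_oldest_catalog_xmin = old_xmin, .nrecords = 0};

	wal_append(&p, "running_xacts", new_xmin);	/* log intent */
	if (checkpoint_in_between)
		checkpoint(&p);			/* copies the stale value */
	p.shmem_oldest_catalog_xmin = new_xmin;		/* apply in shmem */
	if (!checkpoint_in_between)
		checkpoint(&p);

	return replay(&p);
}
```

In the unlocked case the standby ends up at the old value while the primary has already moved to the new one; with the checkpoint serialized after the shmem update both sides agree.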

Extended tests a bit to cover redo on standbys.

Personally I'm not a huge fan of how integrating this with logging
standby snapshots has turned out. It seemed to make sense initially,
but I think the way it works out is more convoluted than necessary for
little benefit. I'll prep an updated version of the
xl_advance_catalog_xmin patch with the same fixes for side by side
comparison.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From 7f742f582e1f6f8f23c4e9d78cd0298180e5387c Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Wed, 22 Mar 2017 13:36:49 +0800
Subject: [PATCH] Log catalog_xmin advances before removing catalog tuples

Before advancing the effective catalog_xmin we use to remove old catalog
tuple versions, make sure it is written to WAL. This allows standbys
to know the oldest xid they can safely create a historic snapshot for.
They can then refuse to start decoding from a slot or raise a recovery
conflict.

The catalog_xmin advance is logged in the next xl_running_xacts record, so
vacuum of catalogs may be held back up to 15 seconds (the bgwriter's standby
snapshot interval) when a replication slot with catalog_xmin is holding down
the global catalog_xmin.
---
 src/backend/access/heap/rewriteheap.c       |  3 +-
 src/backend/access/rmgrdesc/standbydesc.c   |  5 +-
 src/backend/access/rmgrdesc/xlogdesc.c      |  3 +-
 src/backend/access/transam/varsup.c         | 15 +++++
 src/backend/access/transam/xlog.c           | 26 +++++++-
 src/backend/postmaster/bgwriter.c           |  2 +-
 src/backend/replication/slot.c              |  2 +-
 src/backend/replication/walreceiver.c       |  2 +-
 src/backend/replication/walsender.c         |  8 +++
 src/backend/storage/ipc/procarray.c         | 61 ++++++++++++++++---
 src/backend/storage/ipc/standby.c           | 60 +++++++++++++++++--
 src/bin/pg_controldata/pg_controldata.c     |  2 +
 src/include/access/transam.h                |  5 ++
 src/include/catalog/pg_control.h            |  1 +
 src/include/storage/procarray.h             |  3 +-
 src/include/storage/standby.h               |  8 ++-
 src/include/storage/standbydefs.h           |  1 +
 src/test/recovery/t/006_logical_decoding.pl | 93 +++++++++++++++++++++++++++--
 18 files changed, 269 insertions(+), 31 deletions(-)

diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index d7f65a5..d1400ec 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -812,7 +812,8 @@ logical_begin_heap_rewrite(RewriteState state)
 	if (!state->rs_logical_rewrite)
 		return;
 
-	ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
+	/* Use oldestCatalogXmin here */
+	ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin, NULL);
 
 	/*
 	 * If there are no logical slots in progress we don't need to do anything,
diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c
index 278546a..4aaae59 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -21,10 +21,11 @@ standby_desc_running_xacts(StringInfo buf, xl_running_xacts *xlrec)
 {
 	int			i;
 
-	appendStringInfo(buf, "nextXid %u latestCompletedXid %u oldestRunningXid %u",
+	appendStringInfo(buf, "nextXid %u latestCompletedXid %u oldestRunningXid %u oldestCatalogXmin %u",
 					 xlrec->nextXid,
 					 xlrec->latestCompletedXid,
-					 xlrec->oldestRunningXid);
+					 xlrec->oldestRunningXid,
+					 xlrec->oldestCatalogXmin);
 	if (xlrec->xcnt > 0)
 	{
 		appendStringInfo(buf, "; %d xacts:", xlrec->xcnt);
diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index 5f07eb1..a66cfc6 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -47,7 +47,7 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
 						 "tli %u; prev tli %u; fpw %s; xid %u:%u; oid %u; multi %u; offset %u; "
 						 "oldest xid %u in DB %u; oldest multi %u in DB %u; "
 						 "oldest/newest commit timestamp xid: %u/%u; "
-						 "oldest running xid %u; %s",
+						 "oldest running xid %u; oldest catalog xmin %u; %s",
 				(uint32) (checkpoint->redo >> 32), (uint32) checkpoint->redo,
 						 checkpoint->ThisTimeLineID,
 						 checkpoint->PrevTimeLineID,
@@ -63,6 +63,7 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
 						 checkpoint->oldestCommitTsXid,
 						 checkpoint->newestCommitTsXid,
 						 checkpoint->oldestActiveXid,
+						 checkpoint->oldestCatalogXmin,
 				 (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
 	}
 	else if (info == XLOG_NEXTOID)
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index 5efbfbd..ffabf1c 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -414,6 +414,21 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
 	}
 }
 
+/*
+ * Set the global oldest catalog_xmin used to determine when tuples
+ * may be removed from catalogs and user-catalogs accessible from logical
+ * decoding.
+ *
+ * Only to be called during bootstrap or by the startup process; on a primary,
+ * UpdateOldestCatalogXmin() advances it after LogStandbySnapshot() logs it.
+ */
+void
+SetOldestCatalogXmin(TransactionId oldestCatalogXmin)
+{
+	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+	ShmemVariableCache->oldestCatalogXmin = oldestCatalogXmin;
+	LWLockRelease(ProcArrayLock);
+}
 
 /*
  * ForceTransactionIdLimitUpdate -- does the XID wrap-limit data need updating?
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 5d58f09..cec68b2 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5021,6 +5021,7 @@ BootStrapXLOG(void)
 	MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
 	AdvanceOldestClogXid(checkPoint.oldestXid);
 	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+	SetOldestCatalogXmin(checkPoint.oldestCatalogXmin);
 	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true);
 	SetCommitTsLimit(InvalidTransactionId, InvalidTransactionId);
 
@@ -6611,6 +6612,9 @@ StartupXLOG(void)
 	   (errmsg_internal("oldest unfrozen transaction ID: %u, in database %u",
 						checkPoint.oldestXid, checkPoint.oldestXidDB)));
 	ereport(DEBUG1,
+			(errmsg_internal("oldest catalog-only transaction ID: %u",
+							 checkPoint.oldestCatalogXmin)));
+	ereport(DEBUG1,
 			(errmsg_internal("oldest MultiXactId: %u, in database %u",
 						 checkPoint.oldestMulti, checkPoint.oldestMultiDB)));
 	ereport(DEBUG1,
@@ -6628,6 +6632,7 @@ StartupXLOG(void)
 	MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
 	AdvanceOldestClogXid(checkPoint.oldestXid);
 	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+	SetOldestCatalogXmin(checkPoint.oldestCatalogXmin);
 	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true);
 	SetCommitTsLimit(checkPoint.oldestCommitTsXid,
 					 checkPoint.newestCommitTsXid);
@@ -6908,6 +6913,8 @@ StartupXLOG(void)
 				running.subxid_overflow = false;
 				running.nextXid = checkPoint.nextXid;
 				running.oldestRunningXid = oldestActiveXID;
+				running.pendingOldestCatalogXmin
+					= checkPoint.oldestCatalogXmin;
 				latestCompletedXid = checkPoint.nextXid;
 				TransactionIdRetreat(latestCompletedXid);
 				Assert(TransactionIdIsNormal(latestCompletedXid));
@@ -8786,7 +8793,16 @@ CreateCheckPoint(int flags)
 	 * recovery we don't need to write running xact data.
 	 */
 	if (!shutdown && XLogStandbyInfoActive())
-		LogStandbySnapshot();
+		LogStandbySnapshot(true);
+
+	/*
+	 * We must copy oldestCatalogXmin after the standby snapshot so we get any
+	 * updated value and don't clobber the value written in xl_running_xacts
+	 * with an older one in the checkpoint.
+	 */
+	LWLockAcquire(ProcArrayLock, LW_SHARED);
+	checkPoint.oldestCatalogXmin = ShmemVariableCache->oldestCatalogXmin;
+	LWLockRelease(ProcArrayLock);
 
 	START_CRIT_SECTION();
 
@@ -9670,6 +9686,8 @@ xlog_redo(XLogReaderState *record)
 			running.subxid_overflow = false;
 			running.nextXid = checkPoint.nextXid;
 			running.oldestRunningXid = oldestActiveXID;
+			running.pendingOldestCatalogXmin =
+				checkPoint.oldestCatalogXmin;
 			latestCompletedXid = checkPoint.nextXid;
 			TransactionIdRetreat(latestCompletedXid);
 			Assert(TransactionIdIsNormal(latestCompletedXid));
@@ -9729,8 +9747,10 @@ xlog_redo(XLogReaderState *record)
 							   checkPoint.oldestMultiDB);
 		if (TransactionIdPrecedes(ShmemVariableCache->oldestXid,
 								  checkPoint.oldestXid))
-			SetTransactionIdLimit(checkPoint.oldestXid,
-								  checkPoint.oldestXidDB);
+			SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+
+		SetOldestCatalogXmin(checkPoint.oldestCatalogXmin);
+
 		/* ControlFile->checkPointCopy always tracks the latest ckpt XID */
 		ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
 		ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index dcb4cf2..258c955 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -330,7 +330,7 @@ BackgroundWriterMain(void)
 			if (now >= timeout &&
 				last_snapshot_lsn < GetLastImportantRecPtr())
 			{
-				last_snapshot_lsn = LogStandbySnapshot();
+				last_snapshot_lsn = LogStandbySnapshot(false);
 				last_snapshot_ts = now;
 			}
 		}
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 6c5ec7a..605990f 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -947,7 +947,7 @@ ReplicationSlotReserveWal(void)
 			slot->data.restart_lsn = GetXLogInsertRecPtr();
 
 			/* make sure we have enough information to start */
-			flushptr = LogStandbySnapshot();
+			flushptr = LogStandbySnapshot(false);
 
 			/* and make sure it's fsynced to disk */
 			XLogFlush(flushptr);
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index df93265..277f196 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1234,7 +1234,7 @@ XLogWalRcvSendHSFeedback(bool immed)
 		xmin = GetOldestXmin(NULL,
 							 PROCARRAY_FLAGS_DEFAULT|PROCARRAY_SLOTS_XMIN);
 
-		ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
+		ProcArrayGetReplicationSlotXmin(&slot_xmin, NULL, &catalog_xmin);
 
 		if (TransactionIdIsValid(slot_xmin) &&
 			TransactionIdPrecedes(slot_xmin, xmin))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index cfc3fba..7c46e24 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1778,6 +1778,14 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbac
 		slot->data.xmin = feedbackXmin;
 		slot->effective_xmin = feedbackXmin;
 	}
+	/*
+	 * If the physical slot is relaying catalog_xmin for logical replication
+	 * slots on the replica it's safe to act on catalog_xmin advances
+	 * immediately too. The replica will only send a new catalog_xmin via
+	 * feedback when it advances its effective_catalog_xmin, so it's done the
+	 * delay-until-confirmed dance for us and knows it won't need the data
+	 * we're protecting from vacuum again.
+	 */
 	if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
 		!TransactionIdIsNormal(feedbackCatalogXmin) ||
 		TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 7c2e1e1..898a5ca 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -87,7 +87,11 @@ typedef struct ProcArrayStruct
 
 	/* oldest xmin of any replication slot */
 	TransactionId replication_slot_xmin;
-	/* oldest catalog xmin of any replication slot */
+	/*
+	 * Oldest catalog xmin of any replication slot
+	 *
+	 * See also ShmemVariableCache->oldestCatalogXmin
+	 */
 	TransactionId replication_slot_catalog_xmin;
 
 	/* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */
@@ -679,6 +683,15 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
 	ExpireOldKnownAssignedTransactionIds(running->oldestRunningXid);
 
 	/*
+	 * Update our knowledge of the oldest xid we can safely create historic
+	 * snapshots for.
+	 *
+	 * If we allow logical decoding on standbys in future we must raise
+	 * recovery conflicts with catalog_xmin advances here.
+	 */
+	SetOldestCatalogXmin(running->pendingOldestCatalogXmin);
+
+	/*
 	 * Remove stale locks, if any.
 	 *
 	 * Locks are always assigned to the toplevel xid so we don't need to care
@@ -1306,6 +1319,9 @@ TransactionIdIsActive(TransactionId xid)
  * The return value is also adjusted with vacuum_defer_cleanup_age, so
  * increasing that setting on the fly is another easy way to make
  * GetOldestXmin() move backwards, with no consequences for data integrity.
+ *
+ * When changing GetOldestXmin, check to see whether RecentGlobalXmin
+ * computation in GetSnapshotData also needs changing.
  */
 TransactionId
 GetOldestXmin(Relation rel, int flags)
@@ -1700,7 +1716,7 @@ GetSnapshotData(Snapshot snapshot)
 
 	/* fetch into volatile var while ProcArrayLock is held */
 	replication_slot_xmin = procArray->replication_slot_xmin;
-	replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
+	replication_slot_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
 
 	if (!TransactionIdIsValid(MyPgXact->xmin))
 		MyPgXact->xmin = TransactionXmin = xmin;
@@ -1711,6 +1727,9 @@ GetSnapshotData(Snapshot snapshot)
 	 * Update globalxmin to include actual process xids.  This is a slightly
 	 * different way of computing it than GetOldestXmin uses, but should give
 	 * the same result.
+	 *
+	 * If you change computation of RecentGlobalXmin here you may need to
+	 * change GetOldestXmin(...) as well.
 	 */
 	if (TransactionIdPrecedes(xmin, globalxmin))
 		globalxmin = xmin;
@@ -2041,12 +2060,16 @@ GetRunningTransactionData(void)
 	}
 
 	/*
-	 * It's important *not* to include the limits set by slots here because
+	 * It's important *not* to include the xmin set by slots here because
 	 * snapbuild.c uses oldestRunningXid to manage its xmin horizon. If those
 	 * were to be included here the initial value could never increase because
-	 * of a circular dependency where slots only increase their limits when
-	 * running xacts increases oldestRunningXid and running xacts only
+	 * of a circular dependency where slots only increase their xmin limits
+	 * when running xacts increases oldestRunningXid and running xacts only
 	 * increases if slots do.
+	 *
+	 * We can safely report the catalog_xmin limit for replication slots here
+	 * because it's only used to advance oldestCatalogXmin. Slots' catalog_xmin
+	 * advance does not depend on it so there's no circularity.
 	 */
 
 	CurrentRunningXacts->xcnt = count - subcount;
@@ -2055,6 +2078,8 @@ GetRunningTransactionData(void)
 	CurrentRunningXacts->nextXid = ShmemVariableCache->nextXid;
 	CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
 	CurrentRunningXacts->latestCompletedXid = latestCompletedXid;
+	CurrentRunningXacts->pendingOldestCatalogXmin =
+		procArray->replication_slot_catalog_xmin;
 
 	Assert(TransactionIdIsValid(CurrentRunningXacts->nextXid));
 	Assert(TransactionIdIsValid(CurrentRunningXacts->oldestRunningXid));
@@ -2171,6 +2196,13 @@ GetOldestSafeDecodingTransactionId(void)
 	 * If there's already a slot pegging the xmin horizon, we can start with
 	 * that value, it's guaranteed to be safe since it's computed by this
 	 * routine initially and has been enforced since.
+	 *
+	 * We don't use ShmemVariableCache->oldestCatalogXmin here because another
+	 * backend may have already logged its intention to advance it to a higher
+	 * value (still <= replication_slot_catalog_xmin) and just be waiting on
+	 * ProcArrayLock to actually apply the change. On a standby
+	 * replication_slot_catalog_xmin is what the walreceiver will be sending in
+	 * hot_standby_feedback, not oldestCatalogXmin.
 	 */
 	if (TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
 		TransactionIdPrecedes(procArray->replication_slot_catalog_xmin,
@@ -2965,18 +2997,31 @@ ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin,
  *
  * Return the current slot xmin limits. That's useful to be able to remove
  * data that's older than those limits.
+ *
+ * For logical replication slots' catalog_xmins, we return both the effective
+ * catalog_xmin being used for tuple removal (retained catalog_xmin) and the
+ * catalog_xmin actually needed by replication slots (needed_catalog_xmin).
+ *
+ * retained_catalog_xmin should be older than needed_catalog_xmin but is not
+ * guaranteed to be if there are replication slots on a replica currently
+ * attempting to start up and reserve catalogs, outdated replicas sending
+ * feedback, etc.
  */
 void
 ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
-								TransactionId *catalog_xmin)
+								TransactionId *retained_catalog_xmin,
+								TransactionId *needed_catalog_xmin)
 {
 	LWLockAcquire(ProcArrayLock, LW_SHARED);
 
 	if (xmin != NULL)
 		*xmin = procArray->replication_slot_xmin;
 
-	if (catalog_xmin != NULL)
-		*catalog_xmin = procArray->replication_slot_catalog_xmin;
+	if (retained_catalog_xmin != NULL)
+		*retained_catalog_xmin = ShmemVariableCache->oldestCatalogXmin;
+
+	if (needed_catalog_xmin != NULL)
+		*needed_catalog_xmin = procArray->replication_slot_catalog_xmin;
 
 	LWLockRelease(ProcArrayLock);
 }
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 8e57f93..7f73180 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -45,6 +45,7 @@ static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlis
 static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason);
 static XLogRecPtr LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
 static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
+static void UpdateOldestCatalogXmin(TransactionId pendingOldestCatalogXmin);
 
 
 /*
@@ -822,6 +823,7 @@ standby_redo(XLogReaderState *record)
 		running.latestCompletedXid = xlrec->latestCompletedXid;
 		running.oldestRunningXid = xlrec->oldestRunningXid;
 		running.xids = xlrec->xids;
+		running.pendingOldestCatalogXmin = xlrec->oldestCatalogXmin;
 
 		ProcArrayApplyRecoveryInfo(&running);
 	}
@@ -906,7 +908,7 @@ standby_redo(XLogReaderState *record)
  * Returns the RecPtr of the last inserted record.
  */
 XLogRecPtr
-LogStandbySnapshot(void)
+LogStandbySnapshot(bool in_checkpoint)
 {
 	XLogRecPtr	recptr;
 	RunningTransactions running;
@@ -924,6 +926,17 @@ LogStandbySnapshot(void)
 	pfree(locks);
 
 	/*
+	 * We must lock out concurrent checkpoints so that a checkpoint doesn't
+	 * copy oldestCatalogXmin after we've written a pending new value to xlog
+	 * but before we've updated it in shmem. Otherwise a standby will get the
+	 * old value after replaying the checkpoint.
+	 */
+	if (in_checkpoint)
+		Assert(LWLockHeldByMe(CheckpointLock));
+	else
+		LWLockAcquire(CheckpointLock, LW_SHARED);
+
+	/*
 	 * Log details of all in-progress transactions. This should be the last
 	 * record we write, because standby will open up when it sees this.
 	 */
@@ -953,12 +966,29 @@ LogStandbySnapshot(void)
 	/* GetRunningTransactionData() acquired XidGenLock, we must release it */
 	LWLockRelease(XidGenLock);
 
+	/*
+	 * Now that we've recorded our intention to allow cleanup of catalog tuples
+	 * no longer needed by our replication slots we can make the new threshold
+	 * effective for vacuum etc.
+	 */
+	UpdateOldestCatalogXmin(running->pendingOldestCatalogXmin);
+
+	if (!in_checkpoint)
+		LWLockRelease(CheckpointLock);
+
 	return recptr;
 }
 
 /*
  * Record an enhanced snapshot of running transactions into WAL.
  *
+ * We also record the value of procArray->replication_slot_catalog_xmin
+ * obtained from GetRunningTransactionData here. We intend to advance
+ * ShmemVariableCache->oldestCatalogXmin to it once standbys have been informed
+ * of the new value, which will permit removal of previously-protected dead
+ * catalog tuples. The standby needs to know about that before any WAL
+ * records containing such tuple removals could possibly arrive.
+ *
  * The definitions of RunningTransactionsData and xl_xact_running_xacts are
  * similar. We keep them separate because xl_xact_running_xacts is a
  * contiguous chunk of memory and never exists fully until it is assembled in
@@ -977,6 +1007,7 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
 	xlrec.nextXid = CurrRunningXacts->nextXid;
 	xlrec.oldestRunningXid = CurrRunningXacts->oldestRunningXid;
 	xlrec.latestCompletedXid = CurrRunningXacts->latestCompletedXid;
+	xlrec.oldestCatalogXmin = CurrRunningXacts->pendingOldestCatalogXmin;
 
 	/* Header */
 	XLogBeginInsert();
@@ -992,20 +1023,22 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
 
 	if (CurrRunningXacts->subxid_overflow)
 		elog(trace_recovery(DEBUG2),
-			 "snapshot of %u running transactions overflowed (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
+			 "snapshot of %u running transactions overflowed (lsn %X/%X oldest xid %u latest complete %u next xid %u oldestCatalogXmin %u)",
 			 CurrRunningXacts->xcnt,
 			 (uint32) (recptr >> 32), (uint32) recptr,
 			 CurrRunningXacts->oldestRunningXid,
 			 CurrRunningXacts->latestCompletedXid,
-			 CurrRunningXacts->nextXid);
+			 CurrRunningXacts->nextXid,
+			 CurrRunningXacts->pendingOldestCatalogXmin);
 	else
 		elog(trace_recovery(DEBUG2),
-			 "snapshot of %u+%u running transaction ids (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
+			 "snapshot of %u+%u running transaction ids (lsn %X/%X oldest xid %u latest complete %u next xid %u oldestCatalogXmin %u)",
 			 CurrRunningXacts->xcnt, CurrRunningXacts->subxcnt,
 			 (uint32) (recptr >> 32), (uint32) recptr,
 			 CurrRunningXacts->oldestRunningXid,
 			 CurrRunningXacts->latestCompletedXid,
-			 CurrRunningXacts->nextXid);
+			 CurrRunningXacts->nextXid,
+			 CurrRunningXacts->pendingOldestCatalogXmin);
 
 	/*
 	 * Ensure running_xacts information is synced to disk not too far in the
@@ -1022,6 +1055,23 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
 }
 
 /*
+ * Advance the oldestCatalogXmin used for removal of dead catalog tuples to the
+ * lowest catalog_xmin threshold of any current replication slot.
+ *
+ * Should only be called during a checkpoint or after writing an xlog record to
+ * record the advance.
+ */
+static void
+UpdateOldestCatalogXmin(TransactionId pendingOldestCatalogXmin)
+{
+	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+	if ((TransactionIdIsValid(ShmemVariableCache->oldestCatalogXmin) != TransactionIdIsValid(pendingOldestCatalogXmin))
+		|| TransactionIdPrecedes(ShmemVariableCache->oldestCatalogXmin, pendingOldestCatalogXmin))
+		ShmemVariableCache->oldestCatalogXmin = pendingOldestCatalogXmin;
+	LWLockRelease(ProcArrayLock);
+}
+
+/*
  * Wholesale logging of AccessExclusiveLocks. Other lock types need not be
  * logged, as described in backend/storage/lmgr/README.
  */
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index 2ea8931..5c7eb77 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -248,6 +248,8 @@ main(int argc, char *argv[])
 		   ControlFile->checkPointCopy.oldestCommitTsXid);
 	printf(_("Latest checkpoint's newestCommitTsXid:%u\n"),
 		   ControlFile->checkPointCopy.newestCommitTsXid);
+	printf(_("Latest checkpoint's oldestCatalogXmin:%u\n"),
+		   ControlFile->checkPointCopy.oldestCatalogXmin);
 	printf(_("Time of latest checkpoint:            %s\n"),
 		   ckpttime_str);
 	printf(_("Fake LSN counter for unlogged rels:   %X/%X\n"),
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index d25a2dd..0123fc8 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -134,6 +134,10 @@ typedef struct VariableCacheData
 	 */
 	TransactionId latestCompletedXid;	/* newest XID that has committed or
 										 * aborted */
+	TransactionId oldestCatalogXmin;	/* oldestCatalogXmin guarantees that no
+										 * valid catalog tuples >= it are
+										 * removed. That property is used for
+										 * logical decoding. */
 
 	/*
 	 * These fields are protected by CLogTruncationLock
@@ -179,6 +183,7 @@ extern TransactionId GetNewTransactionId(bool isSubXact);
 extern TransactionId ReadNewTransactionId(void);
 extern void SetTransactionIdLimit(TransactionId oldest_datfrozenxid,
 					  Oid oldest_datoid);
+extern void SetOldestCatalogXmin(TransactionId oldestCatalogXmin);
 extern void AdvanceOldestClogXid(TransactionId oldest_datfrozenxid);
 extern bool ForceTransactionIdLimitUpdate(void);
 extern Oid	GetNewObjectId(void);
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index 3a25cc8..1fe89ae 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -45,6 +45,7 @@ typedef struct CheckPoint
 	MultiXactOffset nextMultiOffset;	/* next free MultiXact offset */
 	TransactionId oldestXid;	/* cluster-wide minimum datfrozenxid */
 	Oid			oldestXidDB;	/* database with minimum datfrozenxid */
+	TransactionId oldestCatalogXmin;	/* catalog retained after this xid */
 	MultiXactId oldestMulti;	/* cluster-wide minimum datminmxid */
 	Oid			oldestMultiDB;	/* database with minimum datminmxid */
 	pg_time_t	time;			/* time stamp of checkpoint */
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 9b42e49..05ace64 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -120,6 +120,7 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin,
 							TransactionId catalog_xmin, bool already_locked);
 
 extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
-								TransactionId *catalog_xmin);
+								TransactionId *retained_catalog_xmin,
+								TransactionId *needed_catalog_xmin);
 
 #endif   /* PROCARRAY_H */
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index 3ecc446..4728fa5 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -65,6 +65,10 @@ extern void StandbyReleaseOldLocks(int nxids, TransactionId *xids);
  * is written to WAL as a separate record immediately after each
  * checkpoint. That means that wherever we start a standby from we will
  * almost immediately see the data we need to begin executing queries.
+ *
+ * Information about the oldest catalog_xmin needed by any replication slot is
+ * also included here, so we can use it to update the catalog tuple removal
+ * limit and convey the new limit to standbys.
  */
 
 typedef struct RunningTransactionsData
@@ -75,6 +79,8 @@ typedef struct RunningTransactionsData
 	TransactionId nextXid;		/* copy of ShmemVariableCache->nextXid */
 	TransactionId oldestRunningXid;		/* *not* oldestXmin */
 	TransactionId latestCompletedXid;	/* so we can set xmax */
+	/* so we can update ShmemVariableCache->oldestCatalogXmin: */
+	TransactionId pendingOldestCatalogXmin;
 
 	TransactionId *xids;		/* array of (sub)xids still running */
 } RunningTransactionsData;
@@ -84,7 +90,7 @@ typedef RunningTransactionsData *RunningTransactions;
 extern void LogAccessExclusiveLock(Oid dbOid, Oid relOid);
 extern void LogAccessExclusiveLockPrepare(void);
 
-extern XLogRecPtr LogStandbySnapshot(void);
+extern XLogRecPtr LogStandbySnapshot(bool in_checkpoint);
 extern void LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
 						bool relcacheInitFileInval);
 
diff --git a/src/include/storage/standbydefs.h b/src/include/storage/standbydefs.h
index f8444c7..6153675 100644
--- a/src/include/storage/standbydefs.h
+++ b/src/include/storage/standbydefs.h
@@ -52,6 +52,7 @@ typedef struct xl_running_xacts
 	TransactionId nextXid;		/* copy of ShmemVariableCache->nextXid */
 	TransactionId oldestRunningXid;		/* *not* oldestXmin */
 	TransactionId latestCompletedXid;	/* so we can set xmax */
+	TransactionId oldestCatalogXmin;	/* oldest safe historic snapshot */
 
 	TransactionId xids[FLEXIBLE_ARRAY_MEMBER];
 } xl_running_xacts;
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index bf9b50a..a1c92c8 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -7,24 +7,79 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 16;
+use Test::More tests => 44;
 
 # Initialize master node
 my $node_master = get_new_node('master');
 $node_master->init(allows_streaming => 1);
-$node_master->append_conf(
-		'postgresql.conf', qq(
+$node_master->append_conf('postgresql.conf', qq(
 wal_level = logical
+hot_standby_feedback = on
+wal_receiver_status_interval = 1
+log_min_messages = debug1
 ));
 $node_master->start;
-my $backup_name = 'master_backup';
 
+# Set up some changes before we make base backups
 $node_master->safe_psql('postgres', qq[CREATE TABLE decoding_test(x integer, y text);]);
 
 $node_master->safe_psql('postgres', qq[SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');]);
 
 $node_master->safe_psql('postgres', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]);
 
+# Launch two streaming replicas, one with and one without
+# physical replication slots. We'll use these for tests
+# involving interaction of logical and physical standby.
+#
+# Both replicas are initialized from the same pg_basebackup base backup.
+#
+my $backup_name = 'master_backup';
+$node_master->backup($backup_name);
+
+$node_master->safe_psql('postgres', q[SELECT pg_create_physical_replication_slot('slot_replica');]);
+my $node_slot_replica = get_new_node('slot_replica');
+$node_slot_replica->init_from_backup($node_master, $backup_name, has_streaming => 1);
+$node_slot_replica->append_conf('recovery.conf', "primary_slot_name = 'slot_replica'");
+
+my $node_noslot_replica = get_new_node('noslot_replica');
+$node_noslot_replica->init_from_backup($node_master, $backup_name, has_streaming => 1);
+
+$node_slot_replica->start;
+$node_noslot_replica->start;
+
+sub restartpoint_standbys
+{
+	# Force restartpoints to update control files on replicas
+	$node_slot_replica->safe_psql('postgres', 'CHECKPOINT');
+	$node_noslot_replica->safe_psql('postgres', 'CHECKPOINT');
+}
+
+sub wait_standbys
+{
+	my $lsn = $node_master->lsn('insert');
+	$node_master->wait_for_catchup($node_noslot_replica, 'replay', $lsn);
+	$node_master->wait_for_catchup($node_slot_replica, 'replay', $lsn);
+}
+
+# pg_basebackup doesn't copy replication slots
+is($node_slot_replica->slot('test_slot')->{'slot_name'}, undef,
+	'logical slot test_slot on master not copied by pg_basebackup');
+
+# Make sure oldestCatalogXmin lands in the control file on master
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+my @nodes = ($node_master, $node_slot_replica, $node_noslot_replica);
+
+wait_standbys();
+restartpoint_standbys();
+foreach my $node (@nodes)
+{
+	# Master had an oldestCatalogXmin, so we must've inherited it via checkpoint
+	command_like(['pg_controldata', $node->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:[^0][\d]*$/m,
+		"pg_controldata's oldestCatalogXmin is nonzero after start on " . $node->name);
+}
+
 # Basic decoding works
 my($result) = $node_master->safe_psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]);
 is(scalar(my @foobar = split /^/m, $result), 12, 'Decoding produced 12 rows inc BEGIN/COMMIT');
@@ -64,6 +119,9 @@ $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpo
 chomp($stdout_recv);
 is($stdout_recv, '', 'pg_recvlogical acknowledged changes, nothing pending on slot');
 
+# Create a second DB we'll use for testing dropping and accessing slots across
+# databases. This matters since logical slots are globally visible objects that
+# can only actually be used on one DB for most purposes.
 $node_master->safe_psql('postgres', 'CREATE DATABASE otherdb');
 
 is($node_master->psql('otherdb', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;"), 3,
@@ -96,9 +154,32 @@ isnt($node_master->slot('test_slot')->{'catalog_xmin'}, '0',
 	'restored slot catalog_xmin is nonzero');
 is($node_master->psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]), 3,
 	'reading from slot with wal_level < logical fails');
+wait_standbys();
+restartpoint_standbys();
+foreach my $node (@nodes)
+{
+	command_like(['pg_controldata', $node->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:[^0][\d]*$/m,
+		"pg_controldata's oldestCatalogXmin is nonzero on " . $node->name);
+}
+
+# Dropping the slot must clear catalog_xmin
 is($node_master->psql('postgres', q[SELECT pg_drop_replication_slot('test_slot')]), 0,
 	'can drop logical slot while wal_level = replica');
 is($node_master->slot('test_slot')->{'catalog_xmin'}, '', 'slot was dropped');
+$node_master->safe_psql('postgres', 'VACUUM;');
+# First checkpoint forces xl_running_xacts with the new oldestCatalogXmin
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+# Then we need a second checkpoint to write the control file with the new value
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+wait_standbys();
+restartpoint_standbys();
+foreach my $node (@nodes)
+{
+	command_like(['pg_controldata', $node->data_dir], qr/^Latest checkpoint's oldestCatalogXmin:0$/m,
+		"pg_controldata's oldestCatalogXmin is zero after drop, vacuum and checkpoint on " . $node->name);
+}
 
-# done with the node
-$node_master->stop;
+foreach my $node (@nodes)
+{
+	$node->stop;
+}
-- 
2.5.5
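
For reviewers, here is a standalone sketch (not part of the patch) of the update rule in UpdateOldestCatalogXmin(), with the ProcArrayLock handling left out. The TransactionId macros and TransactionIdPrecedes() below are simplified re-implementations modelled on access/transam.h and transam.c rather than the real headers, and advance_oldest_catalog_xmin() is a hypothetical name used only for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-ins modelled on access/transam.h (not the real header). */
typedef uint32_t TransactionId;

#define InvalidTransactionId		((TransactionId) 0)
#define FirstNormalTransactionId	((TransactionId) 3)
#define TransactionIdIsValid(xid)	((xid) != InvalidTransactionId)
#define TransactionIdIsNormal(xid)	((xid) >= FirstNormalTransactionId)

/* Wraparound-aware "id1 is older than id2", modelled on TransactionIdPrecedes(). */
static bool
TransactionIdPrecedes(TransactionId id1, TransactionId id2)
{
	int32_t		diff;

	/* Special xids (invalid, bootstrap, frozen) compare numerically. */
	if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2))
		return id1 < id2;

	/* Normal xids compare modulo 2^32. */
	diff = (int32_t) (id1 - id2);
	return diff < 0;
}

/*
 * Hypothetical helper mirroring the condition in UpdateOldestCatalogXmin(),
 * minus locking. The pending value is adopted when exactly one of the two is
 * valid (first slot created or last slot dropped) or when it is newer than the
 * current threshold; an older pending value is ignored.
 */
static TransactionId
advance_oldest_catalog_xmin(TransactionId current, TransactionId pending)
{
	TransactionId result = current;

	if (TransactionIdIsValid(current) != TransactionIdIsValid(pending) ||
		TransactionIdPrecedes(current, pending))
		result = pending;

	/* Validity always follows the slots: no slots means no threshold. */
	assert(TransactionIdIsValid(result) == TransactionIdIsValid(pending));
	return result;
}
```

So the value can drop to InvalidTransactionId once the last slot is gone, which matches what 006_logical_decoding.pl expects after dropping test_slot, while an older catalog_xmin cannot pull a valid threshold backwards, including across xid wraparound.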

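And a sequential model, again not patch code and with every name in it hypothetical, of why LogStandbySnapshot() holds CheckpointLock between writing xl_running_xacts and calling UpdateOldestCatalogXmin(). It assumes, for simplicity, that replaying either record type just overwrites the standby's oldestCatalogXmin, and that a checkpoint records whatever value is in shared memory at the moment it copies it.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t TransactionId;

typedef enum { REC_RUNNING_XACTS, REC_CHECKPOINT } RecordType;

typedef struct
{
	RecordType	type;
	TransactionId oldestCatalogXmin;
} WalRecord;

#define MAX_WAL 8				/* small fixed-size log, enough for the demo */

typedef struct
{
	TransactionId shmem_oldest;	/* models ShmemVariableCache->oldestCatalogXmin */
	WalRecord	wal[MAX_WAL];
	size_t		nwal;
} Primary;

static void
wal_append(Primary *p, RecordType type, TransactionId xmin)
{
	if (p->nwal >= MAX_WAL)
		abort();				/* demo log overflow */
	p->wal[p->nwal].type = type;
	p->wal[p->nwal].oldestCatalogXmin = xmin;
	p->nwal++;
}

/* A checkpoint copies whatever shared memory holds right now. */
static void
checkpoint(Primary *p)
{
	wal_append(p, REC_CHECKPOINT, p->shmem_oldest);
}

/*
 * Models the patch's order: WAL-log the pending value, then publish it in
 * shared memory. checkpoint_in_gap simulates the interleaving that holding
 * CheckpointLock is meant to prevent.
 */
static void
log_standby_snapshot(Primary *p, TransactionId pending, bool checkpoint_in_gap)
{
	wal_append(p, REC_RUNNING_XACTS, pending);
	if (checkpoint_in_gap)
		checkpoint(p);
	p->shmem_oldest = pending;
}

/* Standby replay: in this model every record overwrites the standby's value. */
static TransactionId
replay(const Primary *p)
{
	TransactionId standby = 0;

	for (size_t i = 0; i < p->nwal; i++)
		standby = p->wal[i].oldestCatalogXmin;
	return standby;
}
```

Starting from shmem_oldest = 100 and logging 200 with checkpoint_in_gap set, the standby finishes replay believing 100 while the master has already published 200 and may let vacuum remove catalog tuples in between. Taking CheckpointLock in shared mode rules that interleaving out, since CreateCheckPoint() holds it exclusively; a checkpoint then sees either the old value before xl_running_xacts is written or the new one after shmem is updated.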
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
