On 5 September 2016 at 14:44, Craig Ringer <cr...@2ndquadrant.com> wrote:
> On 5 September 2016 at 12:40, Craig Ringer <cr...@2ndquadrant.com> wrote:
>> Hi all
>>
>> Currently hot standby feedback sends GetOldestXmin()'s result to the
>> upstream as the required xmin. GetOldestXmin() returns a slot's
>> catalog_xmin if that's the lowest xmin on the system.
>
>
> Note that this patch changes the API to GetOldestXmin(), adding a new
> boolean to allow it to disregard the catalog_xmin of slots.
>
> Per Simon's feedback I'm going to split that out into a separate
> patch, so I will post a follow-up soon, split into a series.

Now formatted as a series:

1.      Send catalog_xmin in hot standby feedback protocol
2.      Make walsender respect catalog_xmin in hot standby feedback messages
3.      Allow GetOldestXmin(...) to optionally disregard the catalog_xmin
4.      Send catalog_xmin separately in hot_standby_feedback messages


Descriptions are in the patch headers.


1 adds the protocol field only. At this point the value is always sent
as 0 by walreceiver and ignored by walsender. There's no need to handle
backward compatibility in the addition to the hot standby protocol
message here, as only a Pg server of the same major version has any
business sending us hot standby feedback. pg_receivexlog, pg_basebackup
etc. don't use hot standby feedback. Includes the protocol docs change.
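For reference, the extended 'h' message body that 1 produces can be sketched in standalone C. The helpers put_be() and build_hs_feedback() are hypothetical and not part of the patch; the sketch only assumes, as pq_sendint64()/pq_sendint() do, that integers go out in network byte order. The message grows from 17 to 25 bytes.

```c
#include <stddef.h>
#include <stdint.h>

/* Append `value` as an nbytes-wide big-endian integer; return new offset. */
static size_t
put_be(uint8_t *buf, size_t off, uint64_t value, int nbytes)
{
	for (int i = nbytes - 1; i >= 0; i--)
		buf[off++] = (uint8_t) (value >> (8 * i));
	return off;
}

/*
 * Lay out the extended hot standby feedback message:
 *   Byte1('h') Int64(sendTime) Int32(xmin) Int32(xmin epoch)
 *   Int32(catalog_xmin) Int32(catalog_xmin epoch)
 * `buf` must have room for 25 bytes; returns the number of bytes written.
 */
static size_t
build_hs_feedback(uint8_t *buf, int64_t send_time,
				  uint32_t xmin, uint32_t xmin_epoch,
				  uint32_t catalog_xmin, uint32_t catalog_xmin_epoch)
{
	size_t		off = 0;

	buf[off++] = 'h';
	off = put_be(buf, off, (uint64_t) send_time, 8);
	off = put_be(buf, off, xmin, 4);
	off = put_be(buf, off, xmin_epoch, 4);
	off = put_be(buf, off, catalog_xmin, 4);	/* new field */
	off = put_be(buf, off, catalog_xmin_epoch, 4);	/* new field */
	return off;
}
```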

2 makes walsender pay attention to the sent catalog_xmin.
walreceiver doesn't set it yet and has no way to get it separately.
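Without a slot, 2 folds the two values into the walsender's single PGXACT xmin. A standalone sketch of that choice follows; TransactionId, the macros and TransactionIdPrecedes() are re-declared locally, modelled on PostgreSQL's transam definitions, and choose_walsender_xmin() is a hypothetical name.

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define InvalidTransactionId		((TransactionId) 0)
#define FirstNormalTransactionId	((TransactionId) 3)
#define TransactionIdIsNormal(xid)	((xid) >= FirstNormalTransactionId)

/* Modulo-2^32 comparison, following PostgreSQL's TransactionIdPrecedes(). */
static bool
TransactionIdPrecedes(TransactionId id1, TransactionId id2)
{
	if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2))
		return id1 < id2;		/* special xids compare numerically */
	return (int32_t) (id1 - id2) < 0;
}

/*
 * Pick the one xmin a slot-less walsender can advertise: the older of the
 * standby's xmin and catalog_xmin, ignoring whichever is not a normal xid.
 */
static TransactionId
choose_walsender_xmin(TransactionId xmin, TransactionId catalog_xmin)
{
	if (!TransactionIdIsNormal(catalog_xmin))
		return xmin;
	if (!TransactionIdIsNormal(xmin) ||
		TransactionIdPrecedes(catalog_xmin, xmin))
		return catalog_xmin;
	return xmin;
}
```

One difference from the hunk in 2: this sketch returns the catalog_xmin when the xmin is invalid but the catalog_xmin is valid. Because TransactionIdPrecedes() compares non-normal xids numerically, the patch's test `TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin)` is false when feedbackXmin is 0, so the invalid xmin would be stored in that case.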

3 provides a way to get the global xmin without considering the
catalog_xmin, so walreceiver can use it.
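The effect of the new ignoreCatalogXmin argument on the tail of GetOldestXmin() can be modelled as a pure function. This is a simplified sketch, not the procarray.c code: locking and the vacuum_defer_cleanup_age adjustment are omitted, catalog_relevant stands in for the `rel == NULL || RelationIsAccessibleInLogicalDecoding(rel)` test, and oldest_xmin_tail() and normal_xid_precedes() are hypothetical names.

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define InvalidTransactionId		((TransactionId) 0)
#define TransactionIdIsValid(xid)	((xid) != InvalidTransactionId)

/* Both xids assumed normal; modulo-2^32 "is older than". */
static bool
normal_xid_precedes(TransactionId id1, TransactionId id2)
{
	return (int32_t) (id1 - id2) < 0;
}

/*
 * Simplified tail of GetOldestXmin(): start from the procarray minimum,
 * back up for the slots' xmin, and back up for their catalog_xmin only
 * if the caller did not ask to ignore it and catalogs are relevant.
 */
static TransactionId
oldest_xmin_tail(TransactionId result,
				 TransactionId slot_xmin,
				 TransactionId slot_catalog_xmin,
				 bool catalog_relevant,
				 bool ignoreCatalogXmin)
{
	if (TransactionIdIsValid(slot_xmin) &&
		normal_xid_precedes(slot_xmin, result))
		result = slot_xmin;

	if (!ignoreCatalogXmin && catalog_relevant &&
		TransactionIdIsValid(slot_catalog_xmin) &&
		normal_xid_precedes(slot_catalog_xmin, result))
		result = slot_catalog_xmin;

	return result;
}
```

walreceiver passes ignoreCatalogXmin = true and reports the catalog_xmin separately; every other existing caller passes false, so their results are unchanged.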

4 makes walreceiver use the modified GetOldestXmin() and send the slots'
catalog_xmin separately.
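Once catalog_xmin is no longer always 0, the second epoch computation added in 1 starts to matter. Each xid gets nextXid's epoch, minus one if it is numerically above nextXid (it was assigned before the counter wrapped). A minimal sketch; xid_epoch() and xid_with_epoch() are hypothetical helpers, not functions in the tree.

```c
#include <stdint.h>

typedef uint32_t TransactionId;

/*
 * Epoch of `xid`, given the current nextXid and its epoch.  An xid that is
 * numerically above nextXid was assigned before the counter last wrapped,
 * so it belongs to the previous epoch.
 */
static uint32_t
xid_epoch(TransactionId xid, TransactionId nextXid, uint32_t nextEpoch)
{
	return (nextXid < xid) ? nextEpoch - 1 : nextEpoch;
}

/* Combine epoch and xid into one 64-bit value, for illustration only. */
static uint64_t
xid_with_epoch(TransactionId xid, uint32_t epoch)
{
	return ((uint64_t) epoch << 32) | xid;
}
```

An invalid (0) xid never compares above nextXid, so it keeps the current epoch, which matches what the walreceiver hunk does.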


(3) needs additional attention:

By ignoring the slot catalog_xmin in the GetOldestXmin() call and then
separately calling ProcArrayGetReplicationSlotXmin() to get the
catalog_xmin, we might produce a catalog_xmin slightly later than
what was in the procarray at the time we previously called
GetOldestXmin() to examine backend/session state. ProcArrayLock is
released between the two calls, so the slot catalog_xmin can advance
in between. That's harmless: the reported catalog_xmin doesn't need to
be exactly consistent with backend state. If it advances, it's safe to
report the new position, since we know the confirmed positions are
on disk locally.

The alternative here is to extend GetOldestXmin() to take an out-param
to report the slot catalog_xmin. That really just duplicates the
functionality of ProcArrayGetReplicationSlotXmin(), but means we can
grab it under a single acquisition of ProcArrayLock. Variants of
GetOldestXmin() and ProcArrayGetReplicationSlotXmin() that take an
already-locked flag would work too, but would hold the lock across
parts of GetOldestXmin() that currently don't retain it. I could also
convert the current boolean param ignoreVacuum into a flags argument
instead of adding another boolean. No real preference from me.
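If the two booleans were folded into a flags argument instead, the decoding side might look like the following. The GOX_* flag names, OldestXminOptions and decode_oldest_xmin_flags() are all hypothetical; the sketch shows only the flag handling, not the xmin computation.

```c
#include <stdbool.h>

/* Hypothetical flag bits that would replace GetOldestXmin()'s booleans. */
#define GOX_IGNORE_VACUUM			(1 << 0)
#define GOX_IGNORE_CATALOG_XMIN		(1 << 1)

/* The individual options the existing code paths consume. */
typedef struct OldestXminOptions
{
	bool		ignoreVacuum;
	bool		ignoreCatalogXmin;
} OldestXminOptions;

/* Decode a flags word into the individual options. */
static OldestXminOptions
decode_oldest_xmin_flags(int flags)
{
	OldestXminOptions opts;

	opts.ignoreVacuum = (flags & GOX_IGNORE_VACUUM) != 0;
	opts.ignoreCatalogXmin = (flags & GOX_IGNORE_CATALOG_XMIN) != 0;
	return opts;
}
```

Under this scheme walreceiver's `GetOldestXmin(NULL, false, true)` would become a call passing GOX_IGNORE_CATALOG_XMIN, and the VACUUM-style `GetOldestXmin(rel, true, false)` callers would pass GOX_IGNORE_VACUUM.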

I cut out some comment changes to be submitted separately; otherwise
this series is much the same as the original patch upthread.

Also available at
https://github.com/2ndQuadrant/postgres/tree/dev/feedback-catalog-xmin
(and tagged dev/feedback-catalog-xmin). Branch subject to rebasing.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From 275c6422962f1fc326cc5f0c92186de0b127c472 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Mon, 5 Sep 2016 15:30:53 +0800
Subject: [PATCH 1/6] Send catalog_xmin in hot standby feedback protocol

Add catalog_xmin to the hot standby feedback protocol so a read replica
that has logical slots can use its physical slot to the master to hold down the
master's catalog_xmin. This information will let a replica prevent vacuuming of
catalog tuples still required by the replica's logical slots.

This is only the hot standby feedback protocol change; the new value is always
set to zero by the walreceiver and is ignored by the walsender.
---
 doc/src/sgml/protocol.sgml            | 33 ++++++++++++++++++++++++++++-----
 src/backend/replication/walreceiver.c | 20 ++++++++++++++------
 src/backend/replication/walsender.c   | 14 ++++++++++++--
 3 files changed, 54 insertions(+), 13 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 68b0941..c4e41ca 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1783,10 +1783,11 @@ The commands accepted in walsender mode are:
       </term>
       <listitem>
       <para>
-          The standby's current xmin. This may be 0, if the standby is
-          sending notification that Hot Standby feedback will no longer
-          be sent on this connection. Later non-zero messages may
-          reinitiate the feedback mechanism.
+          The standby's current global xmin, excluding the catalog_xmin from any
+          replication slots. If both this value and the following
+          catalog_xmin are 0 this is treated as a notification that Hot Standby
+          feedback will no longer be sent on this connection. Later non-zero
+          messages may reinitiate the feedback mechanism.
       </para>
       </listitem>
       </varlistentry>
@@ -1796,7 +1797,29 @@ The commands accepted in walsender mode are:
       </term>
       <listitem>
       <para>
-          The standby's current epoch.
+          The epoch of the global xmin xid on the standby.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Int32
+      </term>
+      <listitem>
+      <para>
+          The lowest catalog_xmin of any replication slot on the standby. Set to 0
+          if no catalog_xmin exists on the standby or if hot standby feedback is being
+          disabled. New in 10.0.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Int32
+      </term>
+      <listitem>
+      <para>
+          The epoch of the catalog_xmin xid on the standby. New in 10.0.
       </para>
       </listitem>
       </varlistentry>
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 413ee3a..0b92aac 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1161,8 +1161,8 @@ XLogWalRcvSendHSFeedback(bool immed)
 {
 	TimestampTz now;
 	TransactionId nextXid;
-	uint32		nextEpoch;
-	TransactionId xmin;
+	uint32		nextEpoch, xmin_epoch, catalog_xmin_epoch;
+	TransactionId xmin, catalog_xmin;
 	static TimestampTz sendTime = 0;
 	static bool master_has_standby_xmin = false;
 
@@ -1207,23 +1207,31 @@ XLogWalRcvSendHSFeedback(bool immed)
 	else
 		xmin = InvalidTransactionId;
 
+	catalog_xmin = InvalidTransactionId;
+
 	/*
 	 * Get epoch and adjust if nextXid and oldestXmin are different sides of
 	 * the epoch boundary.
 	 */
 	GetNextXidAndEpoch(&nextXid, &nextEpoch);
+	xmin_epoch = nextEpoch;
 	if (nextXid < xmin)
-		nextEpoch--;
+		xmin_epoch--;
+	catalog_xmin_epoch = nextEpoch;
+	if (nextXid < catalog_xmin)
+		catalog_xmin_epoch--;
 
-	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
-		 xmin, nextEpoch);
+	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
+		 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
 
 	/* Construct the message and send it. */
 	resetStringInfo(&reply_message);
 	pq_sendbyte(&reply_message, 'h');
 	pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
 	pq_sendint(&reply_message, xmin, 4);
-	pq_sendint(&reply_message, nextEpoch, 4);
+	pq_sendint(&reply_message, xmin_epoch, 4);
+	pq_sendint(&reply_message, catalog_xmin, 4);
+	pq_sendint(&reply_message, catalog_xmin_epoch, 4);
 	walrcv_send(reply_message.data, reply_message.len);
 	if (TransactionIdIsValid(xmin))
 		master_has_standby_xmin = true;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 1ea2a5c..efa76e1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1633,6 +1633,8 @@ ProcessStandbyHSFeedbackMessage(void)
 	uint32		nextEpoch;
 	TransactionId feedbackXmin;
 	uint32		feedbackEpoch;
+	TransactionId feedbackCatalogXmin;
+	uint32		feedbackCatalogEpoch;
 
 	/*
 	 * Decipher the reply message. The caller already consumed the msgtype
@@ -1641,10 +1643,18 @@ ProcessStandbyHSFeedbackMessage(void)
 	(void) pq_getmsgint64(&reply_message);		/* sendTime; not used ATM */
 	feedbackXmin = pq_getmsgint(&reply_message, 4);
 	feedbackEpoch = pq_getmsgint(&reply_message, 4);
+	/*
+	 * A 10.0+ standby's walreceiver passes the lowest catalog xmin of any
+	 * replication slot up to the master.
+	 */
+	feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
+	feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
 
-	elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
+	elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
 		 feedbackXmin,
-		 feedbackEpoch);
+		 feedbackEpoch,
+		 feedbackCatalogXmin,
+		 feedbackCatalogEpoch);
 
 	/* Unset WalSender's xmin if the feedback message value is invalid */
 	if (!TransactionIdIsNormal(feedbackXmin))
-- 
2.5.5

From ade1ef3e9907bd6729dfff4840d0ec79f155a868 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Mon, 5 Sep 2016 15:38:40 +0800
Subject: [PATCH 2/6] Make walsender respect catalog_xmin in hot standby
 feedback messages

The walsender now respects the new catalog_xmin field in the hot standby
feedback message. It uses it to set the catalog_xmin field on its physical
replication slot if one is in use. Otherwise it sets its process xmin to the
older of the xmin and catalog_xmin, so the outcome is the same as before
the protocol change.

In the process, factor out walsender's sanity check for xid+epoch wraparound
into a separate TransactionIdInRecentPast() function since we're now checking
it in two places.
---
 src/backend/replication/walsender.c | 109 ++++++++++++++++++++++++++++--------
 1 file changed, 86 insertions(+), 23 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index efa76e1..c40f3e1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -215,6 +215,7 @@ static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
+static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
 
@@ -1531,6 +1532,11 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 	 * be energy wasted - the worst lost information can do here is give us
 	 * wrong information in a statistics view - we'll just potentially be more
 	 * conservative in removing files.
+	 *
+	 * We don't have to do any effective_xmin / effective_catalog_xmin testing
+	 * here either, as LogicalConfirmReceivedLocation does. If we received
+	 * the xmin and catalog_xmin from downstream replication slots we know they
+	 * were already confirmed there.
 	 */
 }
 
@@ -1593,7 +1599,7 @@ ProcessStandbyReplyMessage(void)
 
 /* compute new replication slot xmin horizon if needed */
 static void
-PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
+PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 {
 	bool		changed = false;
 	ReplicationSlot *slot = MyReplicationSlot;
@@ -1614,6 +1620,22 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
 		slot->data.xmin = feedbackXmin;
 		slot->effective_xmin = feedbackXmin;
 	}
+	/*
+	 * If the physical slot is relaying catalog_xmin for logical replication
+	 * slots on the replica it's safe to act on catalog_xmin advances
+	 * immediately too. The replica will only send a new catalog_xmin via
+	 * feedback when it advances its effective_catalog_xmin, so it's done the
+	 * delay-until-confirmed dance for us and knows it won't need the data
+	 * we're protecting from vacuum again.
+	 */
+	if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
+		!TransactionIdIsNormal(feedbackCatalogXmin) ||
+		TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
+	{
+		changed = true;
+		slot->data.catalog_xmin = feedbackCatalogXmin;
+		slot->effective_catalog_xmin = feedbackCatalogXmin;
+	}
 	SpinLockRelease(&slot->mutex);
 
 	if (changed)
@@ -1624,13 +1646,43 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
 }
 
 /*
+ * Check that the provided xmin/epoch are sane, that is, not in the future
+ * and not so far back as to be already wrapped around.
+ *
+ * Epoch of nextXid should be same as standby, or if the counter has
+ * wrapped, then one greater than standby.
+ */
+static bool
+TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
+{
+	TransactionId nextXid;
+	uint32		nextEpoch;
+
+	GetNextXidAndEpoch(&nextXid, &nextEpoch);
+
+	if (xid <= nextXid)
+	{
+		if (epoch != nextEpoch)
+			return false;
+	}
+	else
+	{
+		if (epoch + 1 != nextEpoch)
+			return false;
+	}
+
+	if (!TransactionIdPrecedesOrEquals(xid, nextXid))
+		return false;				/* epoch OK, but it's wrapped around */
+
+	return true;
+}
+
+/*
  * Hot Standby feedback
  */
 static void
 ProcessStandbyHSFeedbackMessage(void)
 {
-	TransactionId nextXid;
-	uint32		nextEpoch;
 	TransactionId feedbackXmin;
 	uint32		feedbackEpoch;
 	TransactionId feedbackCatalogXmin;
@@ -1656,38 +1708,35 @@ ProcessStandbyHSFeedbackMessage(void)
 		 feedbackCatalogXmin,
 		 feedbackCatalogEpoch);
 
-	/* Unset WalSender's xmin if the feedback message value is invalid */
-	if (!TransactionIdIsNormal(feedbackXmin))
+	/*
+	 * Unset WalSender's xmins if the feedback message values are invalid.
+	 * This happens when the downstream turned hot_standby_feedback off.
+	 */
+	if (!TransactionIdIsNormal(feedbackXmin)
+		&& !TransactionIdIsNormal(feedbackCatalogXmin))
 	{
 		MyPgXact->xmin = InvalidTransactionId;
 		if (MyReplicationSlot != NULL)
-			PhysicalReplicationSlotNewXmin(feedbackXmin);
+			PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
 		return;
 	}
 
 	/*
 	 * Check that the provided xmin/epoch are sane, that is, not in the future
 	 * and not so far back as to be already wrapped around.  Ignore if not.
-	 *
-	 * Epoch of nextXid should be same as standby, or if the counter has
-	 * wrapped, then one greater than standby.
 	 */
-	GetNextXidAndEpoch(&nextXid, &nextEpoch);
-
-	if (feedbackXmin <= nextXid)
+	if (TransactionIdIsNormal(feedbackXmin) &&
+		!TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
 	{
-		if (feedbackEpoch != nextEpoch)
-			return;
+		return;
 	}
-	else
+
+	if (TransactionIdIsNormal(feedbackCatalogXmin) &&
+		!TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
 	{
-		if (feedbackEpoch + 1 != nextEpoch)
-			return;
+		return;
 	}
 
-	if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid))
-		return;					/* epoch OK, but it's wrapped around */
-
 	/*
 	 * Set the WalSender's xmin equal to the standby's requested xmin, so that
 	 * the xmin will be taken into account by GetOldestXmin.  This will hold
@@ -1711,15 +1760,29 @@ ProcessStandbyHSFeedbackMessage(void)
 	 * already since a VACUUM could have just finished calling GetOldestXmin.)
 	 *
 	 * If we're using a replication slot we reserve the xmin via that,
-	 * otherwise via the walsender's PGXACT entry.
+	 * otherwise via the walsender's PGXACT entry. We can only track the
+	 * catalog xmin separately when using a slot, so we store the older
+	 * of the two values provided when not using a slot.
 	 *
 	 * XXX: It might make sense to generalize the ephemeral slot concept and
 	 * always use the slot mechanism to handle the feedback xmin.
 	 */
 	if (MyReplicationSlot != NULL)		/* XXX: persistency configurable? */
-		PhysicalReplicationSlotNewXmin(feedbackXmin);
+	{
+		PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
+	}
 	else
-		MyPgXact->xmin = feedbackXmin;
+	{
+		if (TransactionIdIsNormal(feedbackCatalogXmin)
+			&& TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
+		{
+			MyPgXact->xmin = feedbackCatalogXmin;
+		}
+		else
+		{
+			MyPgXact->xmin = feedbackXmin;
+		}
+	}
 }
 
 /*
-- 
2.5.5
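The sanity check that 2 factors out into TransactionIdInRecentPast() can be exercised in isolation. In this sketch nextXid and nextEpoch are parameters instead of coming from GetNextXidAndEpoch(), the xid helpers are local re-declarations modelled on PostgreSQL's transam.c, and xid_in_recent_past() is a hypothetical name.

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define FirstNormalTransactionId	((TransactionId) 3)
#define TransactionIdIsNormal(xid)	((xid) >= FirstNormalTransactionId)

/* Modulo-2^32 "older than or equal", following PostgreSQL's transam.c. */
static bool
TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
{
	if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2))
		return id1 <= id2;
	return (int32_t) (id1 - id2) <= 0;
}

/*
 * Same checks as TransactionIdInRecentPast() in 2, with nextXid/nextEpoch
 * passed in: the epoch must equal nextXid's epoch (or be one less if xid is
 * numerically above nextXid), and in modulo-2^32 terms xid must not be
 * after nextXid (roughly, no more than 2^31 xids behind it).
 */
static bool
xid_in_recent_past(TransactionId xid, uint32_t epoch,
				   TransactionId nextXid, uint32_t nextEpoch)
{
	if (xid <= nextXid)
	{
		if (epoch != nextEpoch)
			return false;
	}
	else
	{
		if (epoch + 1 != nextEpoch)
			return false;
	}

	if (!TransactionIdPrecedesOrEquals(xid, nextXid))
		return false;			/* epoch OK, but it's wrapped around */

	return true;
}
```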

From c85aec273b4345ee4198ce69cb1f8a29279495b7 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Mon, 5 Sep 2016 16:13:35 +0800
Subject: [PATCH 3/6] Allow GetOldestXmin(...) to optionally disregard the
 catalog_xmin

Add a new ignoreCatalogXmin option to GetOldestXmin(...), for use when
calculating hot standby feedback xmins. Adjust existing call sites.
---
 contrib/pg_visibility/pg_visibility.c |  4 ++--
 contrib/pgstattuple/pgstatapprox.c    |  2 +-
 src/backend/access/transam/xlog.c     |  4 ++--
 src/backend/catalog/index.c           |  2 +-
 src/backend/commands/analyze.c        |  2 +-
 src/backend/commands/vacuum.c         |  4 ++--
 src/backend/replication/walreceiver.c |  2 +-
 src/backend/storage/ipc/procarray.c   | 42 +++++++++++++++++++++--------------
 src/include/storage/procarray.h       |  2 +-
 9 files changed, 36 insertions(+), 28 deletions(-)

diff --git a/contrib/pg_visibility/pg_visibility.c b/contrib/pg_visibility/pg_visibility.c
index 7034066..318caab 100644
--- a/contrib/pg_visibility/pg_visibility.c
+++ b/contrib/pg_visibility/pg_visibility.c
@@ -523,7 +523,7 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
 	if (all_visible)
 	{
 		/* Don't pass rel; that will fail in recovery. */
-		OldestXmin = GetOldestXmin(NULL, true);
+		OldestXmin = GetOldestXmin(NULL, true, false);
 	}
 
 	rel = relation_open(relid, AccessShareLock);
@@ -646,7 +646,7 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen)
 				 * a buffer lock. And this shouldn't happen often, so it's
 				 * worth being careful so as to avoid false positives.
 				 */
-				RecomputedOldestXmin = GetOldestXmin(NULL, true);
+				RecomputedOldestXmin = GetOldestXmin(NULL, true, false);
 
 				if (!TransactionIdPrecedes(OldestXmin, RecomputedOldestXmin))
 					record_corrupt_item(items, &tuple.t_data->t_ctid);
diff --git a/contrib/pgstattuple/pgstatapprox.c b/contrib/pgstattuple/pgstatapprox.c
index a49ff54..3674c05 100644
--- a/contrib/pgstattuple/pgstatapprox.c
+++ b/contrib/pgstattuple/pgstatapprox.c
@@ -67,7 +67,7 @@ statapprox_heap(Relation rel, output_type *stat)
 	TransactionId OldestXmin;
 	uint64		misc_count = 0;
 
-	OldestXmin = GetOldestXmin(rel, true);
+	OldestXmin = GetOldestXmin(rel, true, false);
 	bstrategy = GetAccessStrategy(BAS_BULKREAD);
 
 	nblocks = RelationGetNumberOfBlocks(rel);
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 2189c22..8546649 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -8635,7 +8635,7 @@ CreateCheckPoint(int flags)
 	 * StartupSUBTRANS hasn't been called yet.
 	 */
 	if (!RecoveryInProgress())
-		TruncateSUBTRANS(GetOldestXmin(NULL, false));
+		TruncateSUBTRANS(GetOldestXmin(NULL, false, false));
 
 	/* Real work is done, but log and update stats before releasing lock. */
 	LogCheckpointEnd(false);
@@ -8974,7 +8974,7 @@ CreateRestartPoint(int flags)
 	 * this because StartupSUBTRANS hasn't been called yet.
 	 */
 	if (EnableHotStandby)
-		TruncateSUBTRANS(GetOldestXmin(NULL, false));
+		TruncateSUBTRANS(GetOldestXmin(NULL, false, false));
 
 	/* Real work is done, but log and update before releasing lock. */
 	LogCheckpointEnd(true);
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index b0b43cf..fd86530 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -2255,7 +2255,7 @@ IndexBuildHeapRangeScan(Relation heapRelation,
 	{
 		snapshot = SnapshotAny;
 		/* okay to ignore lazy VACUUMs here */
-		OldestXmin = GetOldestXmin(heapRelation, true);
+		OldestXmin = GetOldestXmin(heapRelation, true, false);
 	}
 
 	scan = heap_beginscan_strat(heapRelation,	/* relation */
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index c617abb..2170566 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -992,7 +992,7 @@ acquire_sample_rows(Relation onerel, int elevel,
 	totalblocks = RelationGetNumberOfBlocks(onerel);
 
 	/* Need a cutoff xmin for HeapTupleSatisfiesVacuum */
-	OldestXmin = GetOldestXmin(onerel, true);
+	OldestXmin = GetOldestXmin(onerel, true, false);
 
 	/* Prepare for sampling block numbers */
 	BlockSampler_Init(&bs, totalblocks, targrows, random());
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 58bbf55..79ec690 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -497,7 +497,7 @@ vacuum_set_xid_limits(Relation rel,
 	 * always an independent transaction.
 	 */
 	*oldestXmin =
-		TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, true), rel);
+		TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, true, false), rel);
 
 	Assert(TransactionIdIsNormal(*oldestXmin));
 
@@ -909,7 +909,7 @@ vac_update_datfrozenxid(void)
 	 * committed pg_class entries for new tables; see AddNewRelationTuple().
 	 * So we cannot produce a wrong minimum by starting with this.
 	 */
-	newFrozenXid = GetOldestXmin(NULL, true);
+	newFrozenXid = GetOldestXmin(NULL, true, false);
 
 	/*
 	 * Similarly, initialize the MultiXact "min" with the value that would be
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 0b92aac..0f38328 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1203,7 +1203,7 @@ XLogWalRcvSendHSFeedback(bool immed)
 	 * everything else has been checked.
 	 */
 	if (hot_standby_feedback)
-		xmin = GetOldestXmin(NULL, false);
+		xmin = GetOldestXmin(NULL, false, false);
 	else
 		xmin = InvalidTransactionId;
 
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index e5d487d..1912790 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -1298,17 +1298,22 @@ TransactionIdIsActive(TransactionId xid)
  * process can set its xmin based on transactions that are no longer running
  * in the master but are still being replayed on the standby, thus possibly
  * making the GetOldestXmin reading go backwards.  In this case there is a
- * possibility that we lose data that the standby would like to have, but
- * there is little we can do about that --- data is only protected if the
- * walsender runs continuously while queries are executed on the standby.
- * (The Hot Standby code deals with such cases by failing standby queries
- * that needed to access already-removed data, so there's no integrity bug.)
+ * possibility that we lose data that the standby would like to have
+ * unless the standby uses a replication slot to make its xmin persistent
+ * even when it isn't connected. The Hot Standby code deals with such cases by
+ * failing standby queries that needed to access already-removed data, so
+ * there's no integrity bug.
+ *
  * The return value is also adjusted with vacuum_defer_cleanup_age, so
  * increasing that setting on the fly is another easy way to make
  * GetOldestXmin() move backwards, with no consequences for data integrity.
+ *
+ * The caller may request that replication slots' catalog_xmin values be
+ * disregarded when calculating the global xmin. The caller must account
+ * for catalog_xmin separately.
  */
 TransactionId
-GetOldestXmin(Relation rel, bool ignoreVacuum)
+GetOldestXmin(Relation rel, bool ignoreVacuum, bool ignoreCatalogXmin)
 {
 	ProcArrayStruct *arrayP = procArray;
 	TransactionId result;
@@ -1433,17 +1438,20 @@ GetOldestXmin(Relation rel, bool ignoreVacuum)
 		NormalTransactionIdPrecedes(replication_slot_xmin, result))
 		result = replication_slot_xmin;
 
-	/*
-	 * After locks have been released and defer_cleanup_age has been applied,
-	 * check whether we need to back up further to make logical decoding
-	 * possible. We need to do so if we're computing the global limit (rel =
-	 * NULL) or if the passed relation is a catalog relation of some kind.
-	 */
-	if ((rel == NULL ||
-		 RelationIsAccessibleInLogicalDecoding(rel)) &&
-		TransactionIdIsValid(replication_slot_catalog_xmin) &&
-		NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
-		result = replication_slot_catalog_xmin;
+	if (!ignoreCatalogXmin)
+	{
+		/*
+		 * After locks have been released and defer_cleanup_age has been applied,
+		 * check whether we need to back up further to make logical decoding
+		 * possible. We need to do so if we're computing the global limit (rel =
+		 * NULL) or if the passed relation is a catalog relation of some kind.
+		 */
+		if ((rel == NULL ||
+			 RelationIsAccessibleInLogicalDecoding(rel)) &&
+			TransactionIdIsValid(replication_slot_catalog_xmin) &&
+			NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
+			result = replication_slot_catalog_xmin;
+	}
 
 	return result;
 }
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index dd37c0c..a63f6ac 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -53,7 +53,7 @@ extern RunningTransactions GetRunningTransactionData(void);
 
 extern bool TransactionIdIsInProgress(TransactionId xid);
 extern bool TransactionIdIsActive(TransactionId xid);
-extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum);
+extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum, bool ignoreCatalogXmin);
 extern TransactionId GetOldestActiveTransactionId(void);
 extern TransactionId GetOldestSafeDecodingTransactionId(void);
 
-- 
2.5.5

From 89956e6f6154d43f4b0aa89fd234e4b4a3ffb171 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Mon, 5 Sep 2016 16:23:57 +0800
Subject: [PATCH 4/6] Send catalog_xmin separately in hot_standby_feedback
 messages

Now that the protocol supports reporting catalog_xmin separately and
GetOldestXmin() allows us to exclude the catalog_xmin from the calculated xmin,
actually send a separate catalog_xmin to the master.

This change is necessary, but not sufficient, to allow logical decoding
on a standby.
---
 src/backend/replication/walreceiver.c | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 0f38328..344f0e8 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1203,11 +1203,22 @@ XLogWalRcvSendHSFeedback(bool immed)
 	 * everything else has been checked.
 	 */
 	if (hot_standby_feedback)
-		xmin = GetOldestXmin(NULL, false, false);
+	{
+		/*
+		 * Usually GetOldestXmin() would include the catalog_xmin in its
+		 * calculations, but we don't want to hold upstream back from vacuuming
+		 * normal user table tuples just because they're within the
+		 * catalog_xmin horizon of logical replication slots on this standby.
+		 * Instead we report the catalog_xmin to the upstream separately.
+		 */
+		xmin = GetOldestXmin(NULL, false, true);
+		ProcArrayGetReplicationSlotXmin(NULL, &catalog_xmin);
+	}
 	else
+	{
 		xmin = InvalidTransactionId;
-
-	catalog_xmin = InvalidTransactionId;
+		catalog_xmin = InvalidTransactionId;
+	}
 
 	/*
 	 * Get epoch and adjust if nextXid and oldestXmin are different sides of
@@ -1233,7 +1244,7 @@ XLogWalRcvSendHSFeedback(bool immed)
 	pq_sendint(&reply_message, catalog_xmin, 4);
 	pq_sendint(&reply_message, catalog_xmin_epoch, 4);
 	walrcv_send(reply_message.data, reply_message.len);
-	if (TransactionIdIsValid(xmin))
+	if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
 		master_has_standby_xmin = true;
 	else
 		master_has_standby_xmin = false;
-- 
2.5.5
