On 23 March 2017 at 17:44, Craig Ringer <cr...@2ndquadrant.com> wrote:

Minor update to catalog_xmin walsender patch to fix failure to
parenthesize definition of PROCARRAY_PROC_FLAGS_MASK .

This one's ready to go. Working on drop slots on DB drop now.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From b5e34ecaa8f43825fe41ae2e2bbf0a97258cb56a Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Wed, 22 Mar 2017 12:29:13 +0800
Subject: [PATCH] Report catalog_xmin separately to xmin in hot standby
 feedback

The catalog_xmin of slots on a standby was reported as part of the standby's
xmin, causing the master's xmin to be held down. This could cause considerable
unnecessary bloat on the master.

Instead, report catalog_xmin as a separate field in hot_standby_feedback. If
the upstream walsender is using a physical replication slot, store the
catalog_xmin in the slot's catalog_xmin field. If the upstream doesn't use a
slot and has only a PGPROC entry behaviour doesn't change, as we store the
combined xmin and catalog_xmin in the PGPROC entry.

There's no backward compatibility concern here, as nothing except another
postgres instance of the same major version has any business sending hot
standby feedback and it's only used on the physical replication protocol.
---
 doc/src/sgml/protocol.sgml                         |  33 ++++++-
 src/backend/replication/walreceiver.c              |  45 +++++++--
 src/backend/replication/walsender.c                | 110 +++++++++++++++------
 src/backend/storage/ipc/procarray.c                |  12 ++-
 src/include/storage/proc.h                         |   5 +
 src/include/storage/procarray.h                    |  11 +++
 .../recovery/t/010_logical_decoding_timelines.pl   |  38 ++++++-
 7 files changed, 202 insertions(+), 52 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 48ca414..b3a5026 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1916,10 +1916,11 @@ The commands accepted in walsender mode are:
       </term>
       <listitem>
       <para>
-          The standby's current xmin. This may be 0, if the standby is
-          sending notification that Hot Standby feedback will no longer
-          be sent on this connection. Later non-zero messages may
-          reinitiate the feedback mechanism.
+          The standby's current global xmin, excluding the catalog_xmin from any
+          replication slots. If both this value and the following
+          catalog_xmin are 0 this is treated as a notification that Hot Standby
+          feedback will no longer be sent on this connection. Later non-zero
+          messages may reinitiate the feedback mechanism.
       </para>
       </listitem>
       </varlistentry>
@@ -1929,7 +1930,29 @@ The commands accepted in walsender mode are:
       </term>
       <listitem>
       <para>
-          The standby's current epoch.
+          The epoch of the global xmin xid on the standby.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Int32
+      </term>
+      <listitem>
+      <para>
+          The lowest catalog_xmin of any replication slots on the standby. Set to 0
+          if no catalog_xmin exists on the standby or if hot standby feedback is being
+          disabled.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Int32
+      </term>
+      <listitem>
+      <para>
+          The epoch of the catalog_xmin xid on the standby.
       </para>
       </listitem>
       </varlistentry>
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 31c567b..0f22f17 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1175,8 +1175,8 @@ XLogWalRcvSendHSFeedback(bool immed)
 {
 	TimestampTz now;
 	TransactionId nextXid;
-	uint32		nextEpoch;
-	TransactionId xmin;
+	uint32		xmin_epoch, catalog_xmin_epoch;
+	TransactionId xmin, catalog_xmin;
 	static TimestampTz sendTime = 0;
 	/* initially true so we always send at least one feedback message */
 	static bool master_has_standby_xmin = true;
@@ -1221,29 +1221,56 @@ XLogWalRcvSendHSFeedback(bool immed)
 	 * everything else has been checked.
 	 */
 	if (hot_standby_feedback)
-		xmin = GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT);
+	{
+		TransactionId slot_xmin;
+
+		/*
+		 * Usually GetOldestXmin() would include the global replication slot
+		 * xmin and catalog_xmin in its calculations, but we don't want to hold
+		 * upstream back from vacuuming normal user table tuples just because
+		 * they're within the catalog_xmin horizon of logical replication slots
+		 * on this standby, so we ignore slot xmin and catalog_xmin GetOldestXmin
+		 * then deal with them ourselves.
+		 */
+		xmin = GetOldestXmin(NULL,
+							 PROCARRAY_FLAGS_DEFAULT|PROCARRAY_SLOTS_XMIN);
+
+		ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
+
+		if (TransactionIdIsValid(slot_xmin) &&
+			TransactionIdPrecedes(slot_xmin, xmin))
+			xmin = slot_xmin;
+	}
 	else
+	{
 		xmin = InvalidTransactionId;
+		catalog_xmin = InvalidTransactionId;
+	}
 
 	/*
 	 * Get epoch and adjust if nextXid and oldestXmin are different sides of
 	 * the epoch boundary.
 	 */
-	GetNextXidAndEpoch(&nextXid, &nextEpoch);
+	GetNextXidAndEpoch(&nextXid, &xmin_epoch);
+	catalog_xmin_epoch = xmin_epoch;
 	if (nextXid < xmin)
-		nextEpoch--;
+		xmin_epoch --;
+	if (nextXid < catalog_xmin)
+		catalog_xmin_epoch --;
 
-	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
-		 xmin, nextEpoch);
+	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
+		 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
 
 	/* Construct the message and send it. */
 	resetStringInfo(&reply_message);
 	pq_sendbyte(&reply_message, 'h');
 	pq_sendint64(&reply_message, GetCurrentTimestamp());
 	pq_sendint(&reply_message, xmin, 4);
-	pq_sendint(&reply_message, nextEpoch, 4);
+	pq_sendint(&reply_message, xmin_epoch, 4);
+	pq_sendint(&reply_message, catalog_xmin, 4);
+	pq_sendint(&reply_message, catalog_xmin_epoch, 4);
 	walrcv_send(wrconn, reply_message.data, reply_message.len);
-	if (TransactionIdIsValid(xmin))
+	if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
 		master_has_standby_xmin = true;
 	else
 		master_has_standby_xmin = false;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index a29d0e7..59ae22d 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -242,6 +242,7 @@ static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tran
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
+static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
 
@@ -1756,7 +1757,7 @@ ProcessStandbyReplyMessage(void)
 
 /* compute new replication slot xmin horizon if needed */
 static void
-PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
+PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 {
 	bool		changed = false;
 	ReplicationSlot *slot = MyReplicationSlot;
@@ -1777,6 +1778,14 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
 		slot->data.xmin = feedbackXmin;
 		slot->effective_xmin = feedbackXmin;
 	}
+	if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
+		!TransactionIdIsNormal(feedbackCatalogXmin) ||
+		TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
+	{
+		changed = true;
+		slot->data.catalog_xmin = feedbackCatalogXmin;
+		slot->effective_catalog_xmin = feedbackCatalogXmin;
+	}
 	SpinLockRelease(&slot->mutex);
 
 	if (changed)
@@ -1787,59 +1796,92 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
 }
 
 /*
- * Hot Standby feedback
+ * Check that the provided xmin/epoch are sane, that is, not in the future
+ * and not so far back as to be already wrapped around.
+ *
+ * Epoch of nextXid should be same as standby, or if the counter has
+ * wrapped, then one greater than standby.
+ *
+ * This check doesn't care about whether clog exists for these xids
+ * at all.
  */
-static void
-ProcessStandbyHSFeedbackMessage(void)
+static bool
+TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
 {
 	TransactionId nextXid;
 	uint32		nextEpoch;
+
+	GetNextXidAndEpoch(&nextXid, &nextEpoch);
+
+	if (xid <= nextXid)
+	{
+		if (epoch != nextEpoch)
+			return false;
+	}
+	else
+	{
+		if (epoch + 1 != nextEpoch)
+			return false;
+	}
+
+	if (!TransactionIdPrecedesOrEquals(xid, nextXid))
+		return false;				/* epoch OK, but it's wrapped around */
+
+	return true;
+}
+
+/*
+ * Hot Standby feedback
+ */
+static void
+ProcessStandbyHSFeedbackMessage(void)
+{
 	TransactionId feedbackXmin;
 	uint32		feedbackEpoch;
+	TransactionId feedbackCatalogXmin;
+	uint32		feedbackCatalogEpoch;
 
 	/*
 	 * Decipher the reply message. The caller already consumed the msgtype
-	 * byte.
+	 * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
+	 * of this message.
 	 */
 	(void) pq_getmsgint64(&reply_message);		/* sendTime; not used ATM */
 	feedbackXmin = pq_getmsgint(&reply_message, 4);
 	feedbackEpoch = pq_getmsgint(&reply_message, 4);
+	feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
+	feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
 
-	elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
+	elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
 		 feedbackXmin,
-		 feedbackEpoch);
+		 feedbackEpoch,
+		 feedbackCatalogXmin,
+		 feedbackCatalogEpoch);
 
-	/* Unset WalSender's xmin if the feedback message value is invalid */
-	if (!TransactionIdIsNormal(feedbackXmin))
+	/*
+	 * Unset WalSender's xmins if the feedback message values are invalid.
+	 * This happens when the downstream turned hot_standby_feedback off.
+	 */
+	if (!TransactionIdIsNormal(feedbackXmin)
+		&& !TransactionIdIsNormal(feedbackCatalogXmin))
 	{
 		MyPgXact->xmin = InvalidTransactionId;
 		if (MyReplicationSlot != NULL)
-			PhysicalReplicationSlotNewXmin(feedbackXmin);
+			PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
 		return;
 	}
 
 	/*
 	 * Check that the provided xmin/epoch are sane, that is, not in the future
 	 * and not so far back as to be already wrapped around.  Ignore if not.
-	 *
-	 * Epoch of nextXid should be same as standby, or if the counter has
-	 * wrapped, then one greater than standby.
 	 */
-	GetNextXidAndEpoch(&nextXid, &nextEpoch);
+	if (TransactionIdIsNormal(feedbackXmin) &&
+		!TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
+		return;
 
-	if (feedbackXmin <= nextXid)
-	{
-		if (feedbackEpoch != nextEpoch)
-			return;
-	}
-	else
-	{
-		if (feedbackEpoch + 1 != nextEpoch)
-			return;
-	}
-
-	if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid))
-		return;					/* epoch OK, but it's wrapped around */
+	if (TransactionIdIsNormal(feedbackCatalogXmin) &&
+		!TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
+		return;
 
 	/*
 	 * Set the WalSender's xmin equal to the standby's requested xmin, so that
@@ -1864,15 +1906,23 @@ ProcessStandbyHSFeedbackMessage(void)
 	 * already since a VACUUM could have just finished calling GetOldestXmin.)
 	 *
 	 * If we're using a replication slot we reserve the xmin via that,
-	 * otherwise via the walsender's PGXACT entry.
+	 * otherwise via the walsender's PGXACT entry. We can only track the
+	 * catalog xmin separately when using a slot, so we store the least
+	 * of the two provided when not using a slot.
 	 *
 	 * XXX: It might make sense to generalize the ephemeral slot concept and
 	 * always use the slot mechanism to handle the feedback xmin.
 	 */
 	if (MyReplicationSlot != NULL)		/* XXX: persistency configurable? */
-		PhysicalReplicationSlotNewXmin(feedbackXmin);
+		PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
 	else
-		MyPgXact->xmin = feedbackXmin;
+	{
+		if (TransactionIdIsNormal(feedbackCatalogXmin)
+			&& TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
+			MyPgXact->xmin = feedbackCatalogXmin;
+		else
+			MyPgXact->xmin = feedbackXmin;
+	}
 }
 
 /*
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 40c3247..7c2e1e1 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -1264,6 +1264,10 @@ TransactionIdIsActive(TransactionId xid)
  * corresponding flags is set. Typically, if you want to ignore ones with
  * PROC_IN_VACUUM flag, you can use PROCARRAY_FLAGS_VACUUM.
  *
+ * PROCARRAY_SLOTS_XMIN causes GetOldestXmin to ignore the xmin and
+ * catalog_xmin of any replication slots that exist in the system when
+ * calculating the oldest xmin.
+ *
  * This is used by VACUUM to decide which deleted tuples must be preserved in
  * the passed in table. For shared relations backends in all databases must be
  * considered, but for non-shared relations that's not required, since only
@@ -1342,7 +1346,7 @@ GetOldestXmin(Relation rel, int flags)
 		volatile PGPROC *proc = &allProcs[pgprocno];
 		volatile PGXACT *pgxact = &allPgXact[pgprocno];
 
-		if (pgxact->vacuumFlags & flags)
+		if (pgxact->vacuumFlags & (flags & PROCARRAY_PROC_FLAGS_MASK))
 			continue;
 
 		if (allDbs ||
@@ -1418,7 +1422,8 @@ GetOldestXmin(Relation rel, int flags)
 	/*
 	 * Check whether there are replication slots requiring an older xmin.
 	 */
-	if (TransactionIdIsValid(replication_slot_xmin) &&
+	if (!(flags & PROCARRAY_SLOTS_XMIN) &&
+		TransactionIdIsValid(replication_slot_xmin) &&
 		NormalTransactionIdPrecedes(replication_slot_xmin, result))
 		result = replication_slot_xmin;
 
@@ -1428,7 +1433,8 @@ GetOldestXmin(Relation rel, int flags)
 	 * possible. We need to do so if we're computing the global limit (rel =
 	 * NULL) or if the passed relation is a catalog relation of some kind.
 	 */
-	if ((rel == NULL ||
+	if (!(flags & PROCARRAY_SLOTS_XMIN) &&
+		(rel == NULL ||
 		 RelationIsAccessibleInLogicalDecoding(rel)) &&
 		TransactionIdIsValid(replication_slot_catalog_xmin) &&
 		NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 945dd1d..1b345fa 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -44,6 +44,10 @@ struct XidCache
  *
  * Note: If you modify these flags, you need to modify PROCARRAY_XXX flags
  * in src/include/storage/procarray.h.
+ *
+ * PROC_RESERVED may later be assigned for use in vacuumFlags, but its value is
+ * used for PROCARRAY_SLOTS_XMIN in procarray.h, so GetOldestXmin won't be able
+ * to match and ignore processes with this flag set.
  */
 #define		PROC_IS_AUTOVACUUM	0x01	/* is it an autovac worker? */
 #define		PROC_IN_VACUUM		0x02	/* currently running lazy vacuum */
@@ -51,6 +55,7 @@ struct XidCache
 #define		PROC_VACUUM_FOR_WRAPAROUND	0x08	/* set by autovac only */
 #define		PROC_IN_LOGICAL_DECODING	0x10	/* currently doing logical
 												 * decoding outside xact */
+#define		PROC_RESERVED				0x20	/* reserved for procarray */
 
 /* flags reset at EOXact */
 #define		PROC_VACUUM_STATE_MASK \
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index c8e1ae5..9b42e49 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -32,6 +32,17 @@
 #define		PROCARRAY_LOGICAL_DECODING_FLAG	0x10	/* currently doing logical
 													 * decoding outside xact */
 
+#define		PROCARRAY_SLOTS_XMIN			0x20	/* replication slot xmin,
+													 * catalog_xmin */
+/*
+ * Only flags in PROCARRAY_PROC_FLAGS_MASK are considered when matching
+ * PGXACT->vacuumFlags. Other flags are used for different purposes and
+ * have no corresponding PROC flag equivalent.
+ */
+#define		PROCARRAY_PROC_FLAGS_MASK	(PROCARRAY_VACUUM_FLAG | \
+										 PROCARRAY_ANALYZE_FLAG | \
+										 PROCARRAY_LOGICAL_DECODING_FLAG)
+
 /* Use the following flags as an input "flags" to GetOldestXmin function */
 /* Consider all backends except for logical decoding ones which manage xmin separately */
 #define		PROCARRAY_FLAGS_DEFAULT			PROCARRAY_LOGICAL_DECODING_FLAG
diff --git a/src/test/recovery/t/010_logical_decoding_timelines.pl b/src/test/recovery/t/010_logical_decoding_timelines.pl
index 09830dc..4561a06 100644
--- a/src/test/recovery/t/010_logical_decoding_timelines.pl
+++ b/src/test/recovery/t/010_logical_decoding_timelines.pl
@@ -20,7 +20,7 @@ use warnings;
 
 use PostgresNode;
 use TestLib;
-use Test::More tests => 7;
+use Test::More tests => 10;
 use RecursiveCopy;
 use File::Copy;
 use IPC::Run ();
@@ -31,10 +31,14 @@ my ($stdout, $stderr, $ret);
 # Initialize master node
 my $node_master = get_new_node('master');
 $node_master->init(allows_streaming => 1, has_archiving => 1);
-$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n");
-$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n");
-$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n");
-$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n");
+$node_master->append_conf('postgresql.conf', q[
+wal_level = 'logical'
+max_replication_slots = 3
+max_wal_senders = 2
+log_min_messages = 'debug2'
+hot_standby_feedback = on
+wal_receiver_status_interval = 1
+]);
 $node_master->dump_info;
 $node_master->start;
 
@@ -51,11 +55,17 @@ $node_master->safe_psql('postgres', 'CHECKPOINT;');
 my $backup_name = 'b1';
 $node_master->backup_fs_hot($backup_name);
 
+$node_master->safe_psql('postgres',
+	q[SELECT pg_create_physical_replication_slot('phys_slot');]);
+
 my $node_replica = get_new_node('replica');
 $node_replica->init_from_backup(
 	$node_master, $backup_name,
 	has_streaming => 1,
 	has_restoring => 1);
+$node_replica->append_conf(
+	'recovery.conf', q[primary_slot_name = 'phys_slot']);
+
 $node_replica->start;
 
 $node_master->safe_psql('postgres',
@@ -71,6 +81,24 @@ $stdout = $node_replica->safe_psql('postgres',
 is($stdout, 'before_basebackup',
 	'Expected to find only slot before_basebackup on replica');
 
+# Examine the physical slot the replica uses to stream changes
+# from the master to make sure its hot_standby_feedback
+# has locked in a catalog_xmin on the physical slot, and that
+# any xmin is < the catalog_xmin
+$node_master->poll_query_until('postgres', q[
+	SELECT catalog_xmin IS NOT NULL
+	FROM pg_replication_slots
+	WHERE slot_name = 'phys_slot'
+	]);
+my $phys_slot = $node_master->slot('phys_slot');
+isnt($phys_slot->{'xmin'}, '',
+	'xmin assigned on physical slot of master');
+isnt($phys_slot->{'catalog_xmin'}, '',
+	'catalog_xmin assigned on physical slot of master');
+# Ignore wrap-around here, we're on a new cluster:
+cmp_ok($phys_slot->{'xmin'}, '>=', $phys_slot->{'catalog_xmin'},
+	   'xmin on physical slot must not be lower than catalog_xmin');
+
 # Boom, crash
 $node_master->stop('immediate');
 
-- 
2.5.5

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to