On Sun, Oct 9, 2022 at 3:22 AM Nathan Bossart <nathandboss...@gmail.com> wrote:
>
> As I mentioned upthread [0], I'm still a little concerned that this patch
> will cause the state machine to go straight from archive recovery to
> streaming replication, skipping recovery from pg_wal.
>
> [0] https://postgr.es/m/20220906215704.GA2084086%40nathanxps13

Yes, it goes straight to streaming replication skipping recovery from
pg_wal with the patch.

> I wonder if this
> could be resolved by moving the standby to the pg_wal phase instead.
> Concretely, this line
>
> +                               if (switchSource)
> +                                       break;
>
> would instead change currentSource from XLOG_FROM_ARCHIVE to
> XLOG_FROM_PG_WAL before the call to XLogFileReadAnyTLI().  I suspect the
> behavior would be basically the same, but it would maintain the existing
> ordering.

We can give it a chance to restore from pg_wal before switching to
streaming, so that the behaviour of the state machine doesn't change.
But I'd rather not do that by setting currentSource to XLOG_FROM_PG_WAL:
we basically never set currentSource to XLOG_FROM_PG_WAL explicitly,
other than when not in archive recovery, i.e. when InArchiveRecovery is
false. Also, see the comment [1].

Instead, the simplest would be to just pass XLOG_FROM_PG_WAL to
XLogFileReadAnyTLI() when we're about to switch the source to stream
mode. This doesn't change the existing behaviour.
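
To make the idea concrete, here is a minimal standalone sketch of that
selection logic. It is only a model, not code lifted from the patch:
WalSource, the SRC_* values and choose_read_source() are hypothetical
stand-ins for XLogSource and for the logic around the
XLogFileReadAnyTLI() call.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for PostgreSQL's XLogSource values. */
typedef enum
{
	SRC_ANY,					/* try the archive first, then pg_wal */
	SRC_ARCHIVE,
	SRC_PG_WAL,
	SRC_STREAM
} WalSource;

/*
 * Pick the source to hand to the segment reader while the state machine is
 * in the archive/pg_wal branch.  The caller's current source is only read
 * here, never changed, so the state machine's own transitions stay as is.
 */
static WalSource
choose_read_source(WalSource current_source, bool switch_source)
{
	/* This branch is only reached from the archive or pg_wal states. */
	assert(current_source == SRC_ARCHIVE || current_source == SRC_PG_WAL);

	if (switch_source)
		return SRC_PG_WAL;		/* one last look at pg_wal before streaming */
	if (current_source == SRC_ARCHIVE)
		return SRC_ANY;			/* usual case: archive, then pg_wal */
	return current_source;		/* not in archive recovery: pg_wal only */
}
```

In this model, choose_read_source(SRC_ARCHIVE, true) yields SRC_PG_WAL
while the current source stays at the archive state, which is why the
ordering of the state machine is preserved.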

> However, I do see the following note elsewhere in xlogrecovery.c:
>
>  * The segment can be fetched via restore_command, or via walreceiver having
>  * streamed the record, or it can already be present in pg_wal. Checking
>  * pg_wal is mainly for crash recovery, but it will be polled in standby mode
>  * too, in case someone copies a new segment directly to pg_wal. That is not
>  * documented or recommended, though.
>
> Given this information, the present behavior might not be too important,
> but I don't see a point in changing it without good reason.

Yeah, with the attached patch we don't skip pg_wal before switching to
streaming mode.

I've also added a note in the 'Standby Server Operation' section about
the new feature.

Please review the v8 patch further.

Unrelated to this patch: the statement that the standby polling pg_wal
is "not documented or recommended" is not accurate; it is actually
documented [2]. Whether or not we change the docs to something like [3]
is a separate discussion.

[1]
            /*
             * We just successfully read a file in pg_wal. We prefer files in
             * the archive over ones in pg_wal, so try the next file again
             * from the archive first.
             */

[2] 
https://www.postgresql.org/docs/current/warm-standby.html#STANDBY-SERVER-OPERATION
The standby server will also attempt to restore any WAL found in the
standby cluster's pg_wal directory. That typically happens after a
server restart, when the standby replays again WAL that was streamed
from the primary before the restart, but you can also manually copy
files to pg_wal at any time to have them replayed.

[3]
The standby server will also attempt to restore any WAL found in the
standby cluster's pg_wal directory. That typically happens after a
server restart, when the standby replays again WAL that was streamed
from the primary before the restart, but you can also manually copy
files to pg_wal at any time to have them replayed. However, copying of
WAL files manually is not recommended.

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 171e11088cca63e99726b49b1b9b408eed81f299 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sun, 9 Oct 2022 08:11:10 +0000
Subject: [PATCH v8] Allow standby to switch WAL source from archive to
 streaming replication

A standby typically switches to streaming replication (getting WAL
from the primary) only when receiving from the WAL archive finishes
(no more WAL left there) or fails for any reason. Reading WAL from
the archive may not always be as efficient or cheap as reading it
from the primary: network latency and disk I/O cost can differ
between the two, and the archive often sits far from the standby,
hurting recovery performance on the standby. Hence, reading WAL
from the primary instead of the archive, by setting this parameter,
lets the standby catch up with the primary sooner, reducing
replication lag and avoiding WAL file accumulation on the primary.

This feature adds a new GUC that specifies the amount of time after
which the standby attempts to switch WAL source from the WAL archive
to streaming replication. If the standby fails to switch to stream
mode, it falls back to archive mode.

Reported-by: SATYANARAYANA NARLAPURAM
Author: Bharath Rupireddy
Reviewed-by: Cary Huang, Nathan Bossart
Reviewed-by: Kyotaro Horiguchi
Discussion: https://www.postgresql.org/message-id/CAHg+QDdLmfpS0n0U3U+e+dw7X7jjEOsJJ0aLEsrtxs-tUyf5Ag@mail.gmail.com
---
 doc/src/sgml/config.sgml                      |  45 ++++++
 doc/src/sgml/high-availability.sgml           |   7 +
 src/backend/access/transam/xlogrecovery.c     | 132 ++++++++++++++++--
 src/backend/utils/misc/guc_tables.c           |  12 ++
 src/backend/utils/misc/postgresql.conf.sample |   4 +
 src/include/access/xlogrecovery.h             |   1 +
 src/test/recovery/t/034_wal_source_switch.pl  | 126 +++++++++++++++++
 7 files changed, 312 insertions(+), 15 deletions(-)
 create mode 100644 src/test/recovery/t/034_wal_source_switch.pl

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 66312b53b8..85baac9bbb 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4847,6 +4847,51 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
       </listitem>
      </varlistentry>
 
+    <varlistentry id="guc-streaming-replication-retry-interval" xreflabel="streaming_replication_retry_interval">
+      <term><varname>streaming_replication_retry_interval</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>streaming_replication_retry_interval</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Specifies the amount of time after which the standby attempts to
+        switch WAL source from the WAL archive to streaming replication (get
+        WAL from the primary). If the standby fails to switch to stream mode,
+        it falls back to archive mode.
+        If this value is specified without units, it is taken as milliseconds.
+        The default is five minutes (<literal>5min</literal>).
+        With a low setting of this parameter, the standby makes frequent WAL
+        source switch attempts when the primary is unavailable for a long
+        time. To avoid this, set a reasonable value.
+        A setting of <literal>0</literal> disables the feature. When disabled,
+        the standby typically switches to stream mode only when receiving from
+        the archive finishes (no more WAL left there) or fails for any reason.
+        This parameter can only be set in
+        the <filename>postgresql.conf</filename> file or on the server
+        command line.
+       </para>
+       <para>
+        Reading WAL from the archive may not always be as efficient or cheap
+        as reading it from the primary: network latency and disk I/O cost can
+        differ between the two, and the archive often sits far from the
+        standby, hurting its recovery performance. Hence, setting this
+        parameter so that the standby reads WAL from the primary instead of the
+        archive lets it catch up sooner, reducing replication lag and avoiding
+        WAL file accumulation on the primary.
+      </para>
+       <para>
+        Note that the standby may not always attempt to switch source from
+        WAL archive to streaming replication at exact
+        <varname>streaming_replication_retry_interval</varname> intervals.
+        For example, if the parameter is set to <literal>1min</literal> and
+        fetching a WAL file from the archive takes <literal>5min</literal>,
+        then the switch is attempted for the next WAL file, after the current
+        one has been fetched from the archive and applied.
+      </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-recovery-min-apply-delay" xreflabel="recovery_min_apply_delay">
       <term><varname>recovery_min_apply_delay</varname> (<type>integer</type>)
       <indexterm>
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index b2b3129397..e38ce258e7 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -636,6 +636,13 @@ protocol to make nodes agree on a serializable transactional order.
     <filename>pg_wal</filename> at any time to have them replayed.
    </para>
 
+   <para>
+    The standby server can attempt to switch to streaming replication after
+    reading WAL from the archive; see
+    <xref linkend="guc-streaming-replication-retry-interval"/> for more
+    details.
+   </para>
+
    <para>
     At startup, the standby begins by restoring all WAL available in the
     archive location, calling <varname>restore_command</varname>. Once it
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index cb07694aea..d0939f9b0b 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -68,6 +68,12 @@
 #define RECOVERY_COMMAND_FILE	"recovery.conf"
 #define RECOVERY_COMMAND_DONE	"recovery.done"
 
+
+#define SwitchFromArchiveToStreamEnabled() \
+	(streaming_replication_retry_interval > 0 && \
+	 StandbyMode && \
+	 currentSource == XLOG_FROM_ARCHIVE)
+
 /*
  * GUC support
  */
@@ -91,6 +97,7 @@ TimestampTz recoveryTargetTime;
 const char *recoveryTargetName;
 XLogRecPtr	recoveryTargetLSN;
 int			recovery_min_apply_delay = 0;
+int			streaming_replication_retry_interval = 300000;
 
 /* options formerly taken from recovery.conf for XLOG streaming */
 char	   *PrimaryConnInfo = NULL;
@@ -298,6 +305,11 @@ bool		reachedConsistency = false;
 static char *replay_image_masked = NULL;
 static char *primary_image_masked = NULL;
 
+/*
+ * Holds the timestamp at which WaitForWALToBecomeAvailable()'s state machine
+ * switches to XLOG_FROM_ARCHIVE.
+ */
+static TimestampTz switched_to_archive_at = 0;
 
 /*
  * Shared-memory state for WAL recovery.
@@ -440,6 +452,8 @@ static bool HotStandbyActiveInReplay(void);
 static void SetCurrentChunkStartTime(TimestampTz xtime);
 static void SetLatestXTime(TimestampTz xtime);
 
+static bool ShouldSwitchWALSourceToPrimary(void);
+
 /*
  * Initialization of shared memory for WAL recovery
  */
@@ -3416,8 +3430,10 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 							bool nonblocking)
 {
 	static TimestampTz last_fail_time = 0;
+	bool	switchSource = false;
 	TimestampTz now;
 	bool		streaming_reply_sent = false;
+	XLogSource	readFrom;
 
 	/*-------
 	 * Standby mode is implemented by a state machine:
@@ -3437,6 +3453,11 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 	 * those actions are taken when reading from the previous source fails, as
 	 * part of advancing to the next state.
 	 *
+	 * Try reading WAL from primary after being in XLOG_FROM_ARCHIVE state for
+	 * at least streaming_replication_retry_interval milliseconds. If
+	 * successful, the state machine moves to XLOG_FROM_STREAM state, otherwise
+	 * it falls back to XLOG_FROM_ARCHIVE state.
+	 *
 	 * If standby mode is turned off while reading WAL from stream, we move
 	 * to XLOG_FROM_ARCHIVE and reset lastSourceFailed, to force fetching
 	 * the files (which would be required at end of recovery, e.g., timeline
@@ -3460,19 +3481,20 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 		bool		startWalReceiver = false;
 
 		/*
-		 * First check if we failed to read from the current source, and
-		 * advance the state machine if so. The failure to read might've
-		 * happened outside this function, e.g when a CRC check fails on a
-		 * record, or within this loop.
+		 * First check if we failed to read from the current source or we
+		 * intentionally would want to switch the source from archive to
+		 * primary, and advance the state machine if so. The failure to read
+		 * might've happened outside this function, e.g when a CRC check fails
+		 * on a record, or within this loop.
 		 */
-		if (lastSourceFailed)
+		if (lastSourceFailed || switchSource)
 		{
 			/*
 			 * Don't allow any retry loops to occur during nonblocking
-			 * readahead.  Let the caller process everything that has been
-			 * decoded already first.
+			 * readahead if we failed to read from the current source. Let the
+			 * caller process everything that has been decoded already first.
 			 */
-			if (nonblocking)
+			if (nonblocking && lastSourceFailed)
 				return XLREAD_WOULDBLOCK;
 
 			switch (currentSource)
@@ -3601,15 +3623,30 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 		}
 
 		if (currentSource != oldSource)
-			elog(DEBUG2, "switched WAL source from %s to %s after %s",
-				 xlogSourceNames[oldSource], xlogSourceNames[currentSource],
-				 lastSourceFailed ? "failure" : "success");
+		{
+			/* Save the timestamp at which we're switching to archive. */
+			if (SwitchFromArchiveToStreamEnabled())
+				switched_to_archive_at = GetCurrentTimestamp();
+
+			if (switchSource)
+				elog(DEBUG2,
+					 "switched WAL source to %s after fetching WAL from %s for at least %d milliseconds",
+					 xlogSourceNames[currentSource],
+					 xlogSourceNames[oldSource],
+					 streaming_replication_retry_interval);
+			else
+				elog(DEBUG2, "switched WAL source from %s to %s after %s",
+					 xlogSourceNames[oldSource],
+					 xlogSourceNames[currentSource],
+					 lastSourceFailed ? "failure" : "success");
+		}
 
 		/*
 		 * We've now handled possible failure. Try to read from the chosen
 		 * source.
 		 */
 		lastSourceFailed = false;
+		switchSource = false;
 
 		switch (currentSource)
 		{
@@ -3632,13 +3669,21 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 				if (randAccess)
 					curFileTLI = 0;
 
+				switchSource = ShouldSwitchWALSourceToPrimary();
+
 				/*
 				 * Try to restore the file from archive, or read an existing
-				 * file from pg_wal.
+				 * file from pg_wal. However, before switching the source to
+				 * stream mode, give it a chance to read from pg_wal.
 				 */
-				readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2,
-											  currentSource == XLOG_FROM_ARCHIVE ? XLOG_FROM_ANY :
-											  currentSource);
+				if (switchSource)
+					readFrom = XLOG_FROM_PG_WAL;
+				else if (currentSource == XLOG_FROM_ARCHIVE)
+					readFrom = XLOG_FROM_ANY;
+				else
+					readFrom = currentSource;
+
+				readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2, readFrom);
 				if (readFile >= 0)
 					return XLREAD_SUCCESS;	/* success! */
 
@@ -3874,6 +3919,63 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 	return XLREAD_FAIL;			/* not reached */
 }
 
+/*
+ * This function tells whether a standby should attempt to read WAL from the
+ * primary after reading from the archive for at least
+ * streaming_replication_retry_interval milliseconds. Reading WAL from the
+ * archive may not always be as efficient or cheap as reading it from the
+ * primary: network latency and disk I/O cost can differ between the two, and
+ * the archive often sits far from the standby, all of which hurts recovery
+ * performance on the standby. Hence reading WAL from the primary instead of
+ * the archive lets the standby catch up with the primary sooner, reducing
+ * replication lag and avoiding WAL file accumulation on the primary.
+ *
+ * We are here for any of the following reasons:
+ * 1) standby in initial recovery after start/restart.
+ * 2) standby stopped streaming from primary because of connectivity issues
+ * with the primary (either due to network issues or crash in the primary or
+ * something else) or walreceiver got killed or crashed for whatever reasons.
+ */
+static bool
+ShouldSwitchWALSourceToPrimary(void)
+{
+	bool shouldSwitchSource = false;
+
+	if (!SwitchFromArchiveToStreamEnabled())
+		return shouldSwitchSource;
+
+	if (switched_to_archive_at > 0)
+	{
+		TimestampTz curr_time;
+
+		curr_time = GetCurrentTimestamp();
+
+		if (TimestampDifferenceExceeds(switched_to_archive_at, curr_time,
+									   streaming_replication_retry_interval))
+		{
+			elog(DEBUG2,
+				 "trying to switch WAL source to %s after fetching WAL from %s for at least %d milliseconds",
+				 xlogSourceNames[XLOG_FROM_STREAM],
+				 xlogSourceNames[currentSource],
+				 streaming_replication_retry_interval);
+
+			shouldSwitchSource = true;
+		}
+		else
+			shouldSwitchSource = false;
+	}
+	else if (switched_to_archive_at == 0)
+	{
+		/*
+		 * Save the timestamp if we're about to fetch WAL from archive for the
+		 * first time.
+		 */
+		switched_to_archive_at = GetCurrentTimestamp();
+		shouldSwitchSource = false;
+	}
+
+	return shouldSwitchSource;
+}
 
 /*
  * Determine what log level should be used to report a corrupt WAL record
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 05ab087934..df09125611 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -3065,6 +3065,18 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"streaming_replication_retry_interval", PGC_SIGHUP, REPLICATION_STANDBY,
+			gettext_noop("Sets the time after which standby attempts to switch WAL "
+						 "source from archive to streaming replication."),
+			gettext_noop("0 turns this feature off."),
+			GUC_UNIT_MS
+		},
+		&streaming_replication_retry_interval,
+		300000, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"wal_segment_size", PGC_INTERNAL, PRESET_OPTIONS,
 			gettext_noop("Shows the size of write ahead log segments."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 868d21c351..97bc00d5e6 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -351,6 +351,10 @@
 					# in milliseconds; 0 disables
 #wal_retrieve_retry_interval = 5s	# time to wait before retrying to
 					# retrieve WAL after a failed attempt
+#streaming_replication_retry_interval = 5min	# time after which standby
+					# attempts to switch WAL source from archive to
+					# streaming replication
+					# in milliseconds; 0 disables
 #recovery_min_apply_delay = 0		# minimum delay for applying changes during recovery
 
 # - Subscribers -
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index 0e3e246bd2..8c5be66946 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -57,6 +57,7 @@ extern PGDLLIMPORT char *PrimarySlotName;
 extern PGDLLIMPORT char *recoveryRestoreCommand;
 extern PGDLLIMPORT char *recoveryEndCommand;
 extern PGDLLIMPORT char *archiveCleanupCommand;
+extern PGDLLIMPORT int streaming_replication_retry_interval;
 
 /* indirectly set via GUC system */
 extern PGDLLIMPORT TransactionId recoveryTargetXid;
diff --git a/src/test/recovery/t/034_wal_source_switch.pl b/src/test/recovery/t/034_wal_source_switch.pl
new file mode 100644
index 0000000000..d33ca9635c
--- /dev/null
+++ b/src/test/recovery/t/034_wal_source_switch.pl
@@ -0,0 +1,126 @@
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# Test for WAL source switch feature
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Utils;
+use PostgreSQL::Test::Cluster;
+use Test::More;
+
+$ENV{PGDATABASE} = 'postgres';
+
+# Initialize primary node, setting wal-segsize to 1MB
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(
+    allows_streaming => 1,
+    has_archiving => 1,
+    extra => ['--wal-segsize=1']);
+# Ensure checkpoint doesn't come in our way
+$node_primary->append_conf(
+	'postgresql.conf', qq(
+    min_wal_size = 2MB
+    max_wal_size = 1GB
+    checkpoint_timeout = 1h
+    wal_recycle = off
+));
+$node_primary->start;
+$node_primary->safe_psql('postgres',
+	"SELECT pg_create_physical_replication_slot('rep1')");
+
+# Take backup
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a standby linking to it using the replication slot
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+$node_standby->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1,
+    has_restoring => 1);
+$node_standby->append_conf(
+	'postgresql.conf', qq(
+primary_slot_name = 'rep1'
+min_wal_size = 2MB
+max_wal_size = 1GB
+checkpoint_timeout = 1h
+streaming_replication_retry_interval = 100ms
+wal_recycle = off
+log_min_messages = 'debug2'
+));
+
+$node_standby->start;
+
+# Wait until standby has replayed enough data
+$node_primary->wait_for_catchup($node_standby);
+
+# Stop standby
+$node_standby->stop;
+
+# Advance WAL by 100 segments (= 100MB) on primary
+advance_wal($node_primary, 100);
+
+# Wait for primary to generate requested WAL files
+$node_primary->poll_query_until('postgres',
+	q|SELECT COUNT(*) >= 100 FROM pg_ls_waldir()|, 't');
+
+# Standby now connects to primary during initial recovery after fetching WAL
+# from archive for about streaming_replication_retry_interval milliseconds.
+$node_standby->start;
+
+$node_primary->wait_for_catchup($node_standby);
+
+ok(find_in_log(
+		$node_standby,
+        qr/restored log file ".*" from archive/),
+	    'check that some of WAL segments were fetched from archive');
+
+ok(find_in_log(
+		$node_standby,
+        qr/trying to switch WAL source to .* after fetching WAL from .* for at least .* milliseconds/),
+	    'check that standby tried to switch WAL source to primary from archive');
+
+ok(find_in_log(
+		$node_standby,
+        qr/switched WAL source to .* after fetching WAL from .* for at least .* milliseconds/),
+	    'check that standby actually switched WAL source to primary from archive');
+
+ok(find_in_log(
+		$node_standby,
+        qr/started streaming WAL from primary at .* on timeline .*/),
+	    'check that standby started streaming from primary');
+
+# Stop standby
+$node_standby->stop;
+
+# Stop primary
+$node_primary->stop;
+#####################################
+# Advance WAL of $node by $n segments
+sub advance_wal
+{
+	my ($node, $n) = @_;
+
+	# Advance by $n segments (= (wal_segment_size * $n) bytes) on primary.
+	for (my $i = 0; $i < $n; $i++)
+	{
+		$node->safe_psql('postgres',
+			"CREATE TABLE t (); DROP TABLE t; SELECT pg_switch_wal();");
+	}
+	return;
+}
+
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+	my ($node, $pat, $off) = @_;
+
+	$off = 0 unless defined $off;
+	my $log = PostgreSQL::Test::Utils::slurp_file($node->logfile);
+	return 0 if (length($log) <= $off);
+
+	$log = substr($log, $off);
+
+	return $log =~ m/$pat/;
+}
+
+done_testing();
-- 
2.34.1
