From f79cb31eeb96993e63cc5c8208a444a052532490 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 1 Apr 2025 16:45:59 +0900
Subject: [PATCH v3] Allow pg_recvlogical to create slots with failover=true

--two-phase option in pg_recvlogical allows users to create replication slots
two_phase=true. However, another slot option, failover, could not be enabled
from the command.

This patch adds a new option --failover in pg_recvlogical. When specified, the
command tries to create a slot with failover=true, which can be synchronized
to the standby.

Author: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Michael Banck <mbanck@gmx.net>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
---
 doc/src/sgml/ref/pg_recvlogical.sgml          | 14 +++++++--
 src/bin/pg_basebackup/pg_basebackup.c         |  3 +-
 src/bin/pg_basebackup/pg_receivewal.c         |  2 +-
 src/bin/pg_basebackup/pg_recvlogical.c        | 29 +++++++++++++++----
 src/bin/pg_basebackup/streamutil.c            |  7 ++++-
 src/bin/pg_basebackup/streamutil.h            |  3 +-
 src/bin/pg_basebackup/t/030_pg_recvlogical.pl | 15 ++++++++++
 7 files changed, 62 insertions(+), 11 deletions(-)

diff --git a/doc/src/sgml/ref/pg_recvlogical.sgml b/doc/src/sgml/ref/pg_recvlogical.sgml
index 2946bdae1e5..5166393abeb 100644
--- a/doc/src/sgml/ref/pg_recvlogical.sgml
+++ b/doc/src/sgml/ref/pg_recvlogical.sgml
@@ -79,8 +79,8 @@ PostgreSQL documentation
        </para>
 
        <para>
-        The <option>--two-phase</option> can be specified with
-        <option>--create-slot</option> to enable decoding of prepared transactions.
+        The <option>--two-phase</option> and <option>--falover</option> options
+        can be specified with <option>--create-slot</option>.
        </para>
       </listitem>
      </varlistentry>
@@ -165,6 +165,16 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>--failover</option></term>
+      <listitem>
+       <para>
+        Enables the slot to be synchronized to the standbys. This option may
+        only be specified with <option>--create-slot</option>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>-f <replaceable>filename</replaceable></option></term>
       <term><option>--file=<replaceable>filename</replaceable></option></term>
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 1da4bfc2351..eb7354200bc 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -667,7 +667,8 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier,
 	if (temp_replication_slot || create_slot)
 	{
 		if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL,
-								   temp_replication_slot, true, true, false, false))
+								   temp_replication_slot, true, true, false,
+								   false, false))
 			exit(1);
 
 		if (verbose)
diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index de3584018b0..e816cf58101 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -889,7 +889,7 @@ main(int argc, char **argv)
 			pg_log_info("creating replication slot \"%s\"", replication_slot);
 
 		if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false,
-								   slot_exists_ok, false))
+								   slot_exists_ok, false, false))
 			exit(1);
 		exit(0);
 	}
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index e6158251ec1..339d3a539c5 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -42,6 +42,7 @@ typedef enum
 static char *outfile = NULL;
 static int	verbose = 0;
 static bool two_phase = false;
+static bool failover = false;
 static int	noloop = 0;
 static int	standby_message_timeout = 10 * 1000;	/* 10 sec = default */
 static int	fsync_interval = 10 * 1000; /* 10 sec = default */
@@ -89,6 +90,8 @@ usage(void)
 	printf(_("      --start            start streaming in a replication slot (for the slot's name see --slot)\n"));
 	printf(_("\nOptions:\n"));
 	printf(_("  -E, --endpos=LSN       exit after receiving the specified LSN\n"));
+	printf(_("      --failover         enable the replication slot being synchronized to the standby\n"
+			 "                         when creating a slot\n"));
 	printf(_("  -f, --file=FILE        receive log into this file, - for stdout\n"));
 	printf(_("  -F  --fsync-interval=SECS\n"
 			 "                         time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
@@ -695,6 +698,7 @@ main(int argc, char **argv)
 		{"file", required_argument, NULL, 'f'},
 		{"fsync-interval", required_argument, NULL, 'F'},
 		{"no-loop", no_argument, NULL, 'n'},
+		{"failover", no_argument, NULL, 5},
 		{"verbose", no_argument, NULL, 'v'},
 		{"two-phase", no_argument, NULL, 't'},
 		{"version", no_argument, NULL, 'V'},
@@ -770,6 +774,9 @@ main(int argc, char **argv)
 			case 'v':
 				verbose++;
 				break;
+			case 5:
+				failover = true;
+				break;
 /* connection options */
 			case 'd':
 				dbname = pg_strdup(optarg);
@@ -917,11 +924,22 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
-	if (two_phase && !do_create_slot)
+	if (!do_create_slot)
 	{
-		pg_log_error("--two-phase may only be specified with --create-slot");
-		pg_log_error_hint("Try \"%s --help\" for more information.", progname);
-		exit(1);
+		if (two_phase)
+		{
+			pg_log_error("--two-phase may only be specified with --create-slot");
+			pg_log_error_hint("Try \"%s --help\" for more information.",
+							  progname);
+			exit(1);
+		}
+		else if (failover)
+		{
+			pg_log_error("--failover may only be specified with --create-slot");
+			pg_log_error_hint("Try \"%s --help\" for more information.",
+							  progname);
+			exit(1);
+		}
 	}
 
 	/*
@@ -984,7 +1002,8 @@ main(int argc, char **argv)
 			pg_log_info("creating replication slot \"%s\"", replication_slot);
 
 		if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
-								   false, false, slot_exists_ok, two_phase))
+								   false, false, slot_exists_ok, two_phase,
+								   failover))
 			exit(1);
 		startpos = InvalidXLogRecPtr;
 	}
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index 8e605f43ffe..c7b8a4c3a4b 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -583,7 +583,7 @@ GetSlotInformation(PGconn *conn, const char *slot_name,
 bool
 CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
 					  bool is_temporary, bool is_physical, bool reserve_wal,
-					  bool slot_exists_ok, bool two_phase)
+					  bool slot_exists_ok, bool two_phase, bool failover)
 {
 	PQExpBuffer query;
 	PGresult   *res;
@@ -594,6 +594,7 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
 	Assert((is_physical && plugin == NULL) ||
 		   (!is_physical && plugin != NULL));
 	Assert(!(two_phase && is_physical));
+	Assert(!(failover && is_physical));
 	Assert(slot_name != NULL);
 
 	/* Build base portion of query */
@@ -616,6 +617,10 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
 	}
 	else
 	{
+		if (failover && PQserverVersion(conn) >= 170000)
+			AppendPlainCommandOption(query, use_new_option_syntax,
+									 "FAILOVER");
+
 		if (two_phase && PQserverVersion(conn) >= 150000)
 			AppendPlainCommandOption(query, use_new_option_syntax,
 									 "TWO_PHASE");
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
index f917c43517f..017b227303c 100644
--- a/src/bin/pg_basebackup/streamutil.h
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -35,7 +35,8 @@ extern PGconn *GetConnection(void);
 extern bool CreateReplicationSlot(PGconn *conn, const char *slot_name,
 								  const char *plugin, bool is_temporary,
 								  bool is_physical, bool reserve_wal,
-								  bool slot_exists_ok, bool two_phase);
+								  bool slot_exists_ok, bool two_phase,
+								  bool failover);
 extern bool DropReplicationSlot(PGconn *conn, const char *slot_name);
 extern bool RunIdentifySystem(PGconn *conn, char **sysid,
 							  TimeLineID *starttli,
diff --git a/src/bin/pg_basebackup/t/030_pg_recvlogical.pl b/src/bin/pg_basebackup/t/030_pg_recvlogical.pl
index 62bbc5a3f98..c82e78847b3 100644
--- a/src/bin/pg_basebackup/t/030_pg_recvlogical.pl
+++ b/src/bin/pg_basebackup/t/030_pg_recvlogical.pl
@@ -135,4 +135,19 @@ $node->command_ok(
 	],
 	'drop could work without dbname');
 
+# test with failover option enabled
+$node->command_ok(
+	[
+		'pg_recvlogical',
+		'--slot' => 'test',
+		'--dbname' => $node->connstr('postgres'),
+		'--create-slot',
+		'--failover',
+	],
+	'slot with failover created');
+
+my $result = $node->safe_psql('postgres',
+	"SELECT failover FROM pg_catalog.pg_replication_slots WHERE slot_name = 'test'");
+is($result, 't', "failover is enabled for the new slot");
+
 done_testing();
-- 
2.43.5

