Le jeudi 2 septembre 2021, 10:37:20 CEST Michael Paquier a écrit :
> On Thu, Sep 02, 2021 at 10:08:26AM +0200, Ronan Dunklau wrote:
> > I could see the use for sending active_pid for use within pg_basebackup:
> > at
> > least we could fail early if the slot is already in use. But at the same
> > time, maybe it won't be in use anymore once we need it.
> 
> There is no real concurrent protection with this design.  You could
> read that the slot is not active during READ_REPLICATION_SLOT just to
> find out after in the process of pg_basebackup streaming WAL that it
> became in use in-between.  And the backend-side protections would kick
> in at this stage.
> 
> Hmm.  The logic doing the decision-making with pg_receivewal may
> become more tricky when it comes to pg_replication_slots.wal_status,
> max_slot_wal_keep_size and pg_replication_slots.safe_wal_size.  The
> number of cases we'd like to consider impacts directly the amount of
> data send through READ_REPLICATION_SLOT.  That's not really different
> than deciding of a failure, a success or a retry with active_pid at an
> earlier or a later stage of a base backup.  pg_receivewal, on the
> contrary, can just rely on what the backend tells when it begins
> streaming.  So I'd prefer keeping things simple and limit the number
> of fields a minimum for this command.

Ok. Please find attached new patches rebased on master.*

0001 is yours without any modification.

0002 for pg_receivewal tried to simplify the logic of information to return, 
by using a dedicated struct for this. This accounts for Bahrath's demands to 
return every possible field.
In particular, an is_logical field simplifies the detection of the type of 
slot. 
In my opinion it makes sense to simplify it like this on the client side while 
being more open-minded on the server side if we ever need to provide a new 
type of slot. Also, GetSlotInformation now returns an enum to be able to 
handle the different modes of failures, which differ between pg_receivewal and 
pg_basebackup. 

0003 is the pg_basebackup one, not much changed except for the concerns you 
had about the log message and handling of different failure modes.

There is still the concern raised by Bharath about being able to select the 
way to fetch the replication slot information for the user, and what should or 
should not be included in the documentation. I am in favor of documenting the 
process of selecting the wal start, and maybe include a --start-lsn option for 
the user to override it, but that's maybe for another patch.

-- 
Ronan Dunklau
>From 60b8cedb196f5acdd75b489c1d2155c2698084a4 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Thu, 2 Sep 2021 16:25:25 +0900
Subject: [PATCH v5 1/3] Add READ_REPLICATION_SLOT command

---
 doc/src/sgml/protocol.sgml                  |  66 ++++++++++++
 src/backend/replication/repl_gram.y         |  16 ++-
 src/backend/replication/repl_scanner.l      |   1 +
 src/backend/replication/walsender.c         | 112 ++++++++++++++++++++
 src/include/nodes/nodes.h                   |   1 +
 src/include/nodes/replnodes.h               |  10 ++
 src/test/recovery/t/001_stream_rep.pl       |  47 +++++++-
 src/test/recovery/t/006_logical_decoding.pl |  15 ++-
 src/tools/pgindent/typedefs.list            |   1 +
 9 files changed, 266 insertions(+), 3 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index a232546b1d..8191f17137 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2052,6 +2052,72 @@ The commands accepted in replication mode are:
     </listitem>
   </varlistentry>
 
+  <varlistentry>
+    <term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable>
+      <indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm>
+    </term>
+    <listitem>
+     <para>
+      Read the information of a replication slot. Returns a tuple with
+      <literal>NULL</literal> values if the replication slot does not
+      exist.
+     </para>
+     <para>
+      In response to this command, the server will return a one-row result set,
+      containing the following fields:
+      <variablelist>
+       <varlistentry>
+        <term><literal>slot_type</literal> (<type>text</type>)</term>
+        <listitem>
+         <para>
+          The replication slot's type, either <literal>physical</literal> or
+          <literal>logical</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
+
+       <varlistentry>
+        <term><literal>restart_lsn</literal> (<type>text</type>)</term>
+        <listitem>
+         <para>
+          The replication slot's <literal>restart_lsn</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
+
+       <varlistentry>
+        <term><literal>confirmed_flush_lsn</literal> (<type>text</type>)</term>
+        <listitem>
+         <para>
+          The replication slot's <literal>confirmed_flush_lsn</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
+
+       <varlistentry>
+        <term><literal>restart_tli</literal> (<type>int4</type>)</term>
+        <listitem>
+         <para>
+          The timeline ID for the <literal>restart_lsn</literal> position,
+          when following the current timeline history.
+         </para>
+        </listitem>
+       </varlistentry>
+
+       <varlistentry>
+        <term><literal>confirmed_flush_tli</literal> (<type>int4</type>)</term>
+        <listitem>
+         <para>
+          The timeline ID for the <literal>confirmed_flush_lsn</literal>
+          position, when following the current timeline history.
+         </para>
+        </listitem>
+       </varlistentry>
+      </variablelist>
+     </para>
+    </listitem>
+  </varlistentry>
+
   <varlistentry>
     <term><literal>START_REPLICATION</literal> [ <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ] [ <literal>PHYSICAL</literal> ] <replaceable class="parameter">XXX/XXX</replaceable> [ <literal>TIMELINE</literal> <replaceable class="parameter">tli</replaceable> ]
      <indexterm><primary>START_REPLICATION</primary></indexterm>
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index e1e8ec29cc..d34d617045 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -64,6 +64,7 @@ static SQLCmd *make_sqlcmd(void);
 /* Keyword tokens. */
 %token K_BASE_BACKUP
 %token K_IDENTIFY_SYSTEM
+%token K_READ_REPLICATION_SLOT
 %token K_SHOW
 %token K_START_REPLICATION
 %token K_CREATE_REPLICATION_SLOT
@@ -94,7 +95,7 @@ static SQLCmd *make_sqlcmd(void);
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
 				create_replication_slot drop_replication_slot identify_system
-				timeline_history show sql_cmd
+				read_replication_slot timeline_history show sql_cmd
 %type <list>	base_backup_opt_list
 %type <defelt>	base_backup_opt
 %type <uintval>	opt_timeline
@@ -120,6 +121,7 @@ opt_semicolon:	';'
 
 command:
 			identify_system
+			| read_replication_slot
 			| base_backup
 			| start_replication
 			| start_logical_replication
@@ -140,6 +142,18 @@ identify_system:
 				}
 			;
 
+/*
+ * READ_REPLICATION_SLOT %s
+ */
+read_replication_slot:
+			K_READ_REPLICATION_SLOT var_name
+				{
+					ReadReplicationSlotCmd *n = makeNode(ReadReplicationSlotCmd);
+					n->slotname = $2;
+					$$ = (Node *) n;
+				}
+			;
+
 /*
  * SHOW setting
  */
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index c038a636c3..1b599c255e 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -85,6 +85,7 @@ identifier		{ident_start}{ident_cont}*
 BASE_BACKUP			{ return K_BASE_BACKUP; }
 FAST			{ return K_FAST; }
 IDENTIFY_SYSTEM		{ return K_IDENTIFY_SYSTEM; }
+READ_REPLICATION_SLOT	{ return K_READ_REPLICATION_SLOT; }
 SHOW		{ return K_SHOW; }
 LABEL			{ return K_LABEL; }
 NOWAIT			{ return K_NOWAIT; }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3ca2a11389..afcc5f6612 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -232,6 +232,7 @@ static void XLogSendLogical(void);
 static void WalSndDone(WalSndSendDataCallback send_data);
 static XLogRecPtr GetStandbyFlushRecPtr(void);
 static void IdentifySystem(void);
+static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd);
 static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
 static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
 static void StartReplication(StartReplicationCmd *cmd);
@@ -457,6 +458,110 @@ IdentifySystem(void)
 	end_tup_output(tstate);
 }
 
+/* Handle READ_REPLICATION_SLOT command */
+static void
+ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
+{
+#define READ_REPLICATION_SLOT_COLS 5
+	ReplicationSlot *slot;
+	DestReceiver *dest;
+	TupOutputState *tstate;
+	TupleDesc	tupdesc;
+	Datum		values[READ_REPLICATION_SLOT_COLS];
+	bool		nulls[READ_REPLICATION_SLOT_COLS];
+
+	tupdesc = CreateTemplateTupleDesc(READ_REPLICATION_SLOT_COLS);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "confirmed_flush_lsn",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "restart_tli",
+							  INT4OID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 5, "confirmed_flush_tli",
+							  INT4OID, -1, 0);
+
+	MemSet(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	slot = SearchNamedReplicationSlot(cmd->slotname, false);
+	if (slot == NULL || !slot->in_use)
+	{
+		LWLockRelease(ReplicationSlotControlLock);
+	}
+	else
+	{
+		List	   *timeline_history = NIL;
+		ReplicationSlot slot_contents;
+		int			i = 0;
+		char		xloc[MAXFNAMELEN];
+		TimeLineID	slots_position_timeline;
+
+		/* Copy slot contents while holding spinlock */
+		SpinLockAcquire(&slot->mutex);
+		slot_contents = *slot;
+		SpinLockRelease(&slot->mutex);
+		LWLockRelease(ReplicationSlotControlLock);
+
+		if (OidIsValid(slot_contents.data.database))
+			values[i] = CStringGetTextDatum("logical");
+		else
+			values[i] = CStringGetTextDatum("physical");
+		nulls[i] = false;
+		i++;
+
+		if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+		{
+			snprintf(xloc, sizeof(xloc), "%X/%X",
+					 LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
+			values[i] = CStringGetTextDatum(xloc);
+			nulls[i] = false;
+		}
+		i++;
+
+		if (!XLogRecPtrIsInvalid(slot_contents.data.confirmed_flush))
+		{
+			snprintf(xloc, sizeof(xloc), "%X/%X",
+					 LSN_FORMAT_ARGS(slot_contents.data.confirmed_flush));
+			values[i] = CStringGetTextDatum(xloc);
+			nulls[i] = false;
+		}
+		i++;
+
+		/*
+		 * Now get the timeline this wal was produced on, to get to the
+		 * current timeline
+		 */
+		if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+		{
+			timeline_history = readTimeLineHistory(ThisTimeLineID);
+			slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
+														  timeline_history);
+			values[i] = Int32GetDatum(slots_position_timeline);
+			nulls[i] = false;
+		}
+		i++;
+
+		if (!XLogRecPtrIsInvalid(slot_contents.data.confirmed_flush))
+		{
+			if (!timeline_history)
+				timeline_history = readTimeLineHistory(ThisTimeLineID);
+			slots_position_timeline = tliOfPointInHistory(slot_contents.data.confirmed_flush,
+														  timeline_history);
+			values[i] = Int32GetDatum(slots_position_timeline);
+			nulls[i] = false;
+		}
+		i++;
+		Assert(i == READ_REPLICATION_SLOT_COLS);
+	}
+
+	dest = CreateDestReceiver(DestRemoteSimple);
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+	do_tup_output(tstate, values, nulls);
+	end_tup_output(tstate);
+}
+
 
 /*
  * Handle TIMELINE_HISTORY command.
@@ -1618,6 +1723,13 @@ exec_replication_command(const char *cmd_string)
 			EndReplicationCommand(cmdtag);
 			break;
 
+		case T_ReadReplicationSlotCmd:
+			cmdtag = "READ_REPLICATION_SLOT";
+			set_ps_display(cmdtag);
+			ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node);
+			EndReplicationCommand(cmdtag);
+			break;
+
 		case T_BaseBackupCmd:
 			cmdtag = "BASE_BACKUP";
 			set_ps_display(cmdtag);
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 6a4d82f0a8..5f78bdd573 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -495,6 +495,7 @@ typedef enum NodeTag
 	 * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
 	 */
 	T_IdentifySystemCmd,
+	T_ReadReplicationSlotCmd,
 	T_BaseBackupCmd,
 	T_CreateReplicationSlotCmd,
 	T_DropReplicationSlotCmd,
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index faa3a251f2..46384ea074 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -33,6 +33,16 @@ typedef struct IdentifySystemCmd
 	NodeTag		type;
 } IdentifySystemCmd;
 
+/* ----------------------
+ *		READ_REPLICATION_SLOT command
+ * ----------------------
+ */
+typedef struct ReadReplicationSlotCmd
+{
+	NodeTag		type;
+	char	   *slotname;
+} ReadReplicationSlotCmd;
+
 
 /* ----------------------
  *		BASE_BACKUP command
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index ac581c1c07..cbd817cda3 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -6,7 +6,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 49;
+use Test::More tests => 55;
 
 # Initialize primary node
 my $node_primary = PostgresNode->new('primary');
@@ -252,6 +252,51 @@ ok( $ret == 0,
 	"SHOW with superuser-settable parameter, replication role and logical replication"
 );
 
+note "testing READ_REPLICATION_SLOT command";
+
+my $slotname = 'test_read_replication_slot_physical';
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+	'postgres',
+	'READ_REPLICATION_SLOT non_existent_slot;',
+	extra_params => [ '-d', $connstr_rep ]);
+ok( $ret == 0,
+	"READ_REPLICATION_SLOT does not produce an error with non existent slot");
+ok($stdout eq '||||',
+	"READ_REPLICATION_SLOT returns NULL values if slot does not exist");
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+	'postgres',
+	"CREATE_REPLICATION_SLOT $slotname PHYSICAL RESERVE_WAL;",
+	extra_params => [ '-d', $connstr_rep ],
+	0,
+	'physical slot created on primary');
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+	'postgres',
+	"READ_REPLICATION_SLOT $slotname;",
+	extra_params => [ '-d', $connstr_rep ]);
+ok($ret == 0,
+	"READ_REPLICATION_SLOT does not produce an error with existing slot");
+ok($stdout =~ 'physical\|[^|]*\|\|1\|',
+	"READ_REPLICATION_SLOT returns tuple corresponding to the slot");
+
+$node_primary->psql(
+	'postgres',
+	"DROP_REPLICATION_SLOT $slotname;",
+	extra_params => [ '-d', $connstr_rep ],
+	0,
+	'physical slot dropped on primary');
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+	'postgres',
+	"READ_REPLICATION_SLOT $slotname;",
+	extra_params => [ '-d', $connstr_rep ]);
+ok($ret == 0,
+	"READ_REPLICATION_SLOT does not produce an error with dropped slot");
+ok($stdout eq '||||',
+	"READ_REPLICATION_SLOT returns NULL values if slot has been dropped");
+
 note "switching to physical replication slot";
 
 # Switch to using a physical replication slot. We can do this without a new
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index cc116062c2..50e4fa1042 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -10,7 +10,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 14;
+use Test::More tests => 16;
 use Config;
 
 # Initialize primary node
@@ -39,6 +39,19 @@ ok( $stderr =~
 	  m/replication slot "test_slot" was not created in this database/,
 	"Logical decoding correctly fails to start");
 
+($result, $stdout, $stderr) = $node_primary->psql(
+	'template1',
+	qq[READ_REPLICATION_SLOT test_slot;],
+	replication => 'database');
+ok($stdout =~ 'logical\|[^|]*\|[^|]*\|1\|1',
+	'Logical replication slot can be read on any logical connection');
+($result, $stdout, $stderr) = $node_primary->psql(
+	'postgres',
+	qq[READ_REPLICATION_SLOT test_slot;],
+	replication => '1');
+ok($stdout =~ 'logical\|[^|]*\|[^|]*\|1\|1',
+	'Logical replication slot can be read on a physical connection');
+
 # Check case of walsender not using a database connection.  Logical
 # decoding should not be allowed.
 ($result, $stdout, $stderr) = $node_primary->psql(
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index f31a1e4e1e..b256ee3be6 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2126,6 +2126,7 @@ ReadBufferMode
 ReadBytePtrType
 ReadExtraTocPtrType
 ReadFunc
+ReadReplicationSlotCmd
 ReassignOwnedStmt
 RecheckForeignScan_function
 RecordCacheEntry
-- 
2.33.0

>From a1ca1f98096b05344e8b2c7025d90885ad6f8b8e Mon Sep 17 00:00:00 2001
From: Ronan Dunklau <ronan.dunk...@aiven.io>
Date: Wed, 1 Sep 2021 15:52:32 +0200
Subject: [PATCH v5 2/3] Use READ_REPLICATION_SLOT command in pg_receivewal.

Prior to this patch, when running pg_receivewal, the start LSN is determined by looking at
the WAL files currently stored on disk, then using the current flush lsn
from the server.

If for some reason the WAL files from pg_receivewal were moved, we want
to restart where we left at, which is the replication slot's restart_lsn
instead of skipping right to the current flush location.

To keep compatibility with prior server versions, we only attempt it if
the version is < 15.
---
 src/bin/pg_basebackup/pg_receivewal.c        |  53 +++++++++-
 src/bin/pg_basebackup/streamutil.c           | 100 +++++++++++++++++++
 src/bin/pg_basebackup/streamutil.h           |  16 +++
 src/bin/pg_basebackup/t/020_pg_receivewal.pl |  51 +++++++++-
 4 files changed, 216 insertions(+), 4 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index 9d1843728d..9f63df120b 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -408,10 +408,59 @@ StreamLog(void)
 	stream.startpos = FindStreamingStart(&stream.timeline);
 	if (stream.startpos == InvalidXLogRecPtr)
 	{
-		stream.startpos = serverpos;
-		stream.timeline = servertli;
+		/* Try to get it from the slot if any, and the server supports it */
+		if (replication_slot)
+		{
+			SlotInformation slot_info;
+
+			switch (GetSlotInformation(conn, replication_slot, &slot_info))
+			{
+				case READ_REPLICATION_SLOT_ERROR:
+
+					/*
+					 * Error has been logged by GetSlotInformation, return and
+					 * maybe retry
+					 */
+					return;
+				case READ_REPLICATION_SLOT_NONEXISTENT:
+					pg_log_error("replication slot \"%s\" does not exist", replication_slot);
+					return;
+				case READ_REPLICATION_SLOT_UNSUPPORTED:
+
+					/*
+					 * Server doesn't support the command, so fallback to the
+					 * default case
+					 */
+					pg_log_warning("server does not support fetching the slot's position, "
+								   "resuming from current server position instead");
+					break;
+				default:
+					if (slot_info.is_logical)
+					{
+						/*
+						 * If the slot is not physical we can't expect to
+						 * recover from that
+						 */
+						pg_log_error("cannot use the slot provided, physical slot expected.");
+						exit(1);
+					}
+					stream.startpos = slot_info.restart_lsn;
+					stream.timeline = slot_info.restart_tli;
+			}
+		}
+
+		/*
+		 * If it is still unknown, use the current flush value from the server
+		 */
+		if (stream.startpos == InvalidXLogRecPtr)
+		{
+			stream.startpos = serverpos;
+			stream.timeline = servertli;
+		}
 	}
 
+	Assert(stream.startpos != InvalidXLogRecPtr);
+
 	/*
 	 * Always start streaming at the beginning of a segment
 	 */
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index f5b3b476e5..a4d25552b3 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -479,6 +479,106 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
 	return true;
 }
 
+
+/*
+ * Returns wether a replication slot exists through a given connection,
+ * and fills in the slot_info with the results if passed by the caller.
+ */
+ReadReplicationSlotStatus
+GetSlotInformation(PGconn *conn, const char *slot_name, SlotInformation * slot_info)
+{
+	PGresult   *res;
+	PQExpBuffer query;
+
+	Assert(slot_name != NULL);
+
+	if (PQserverVersion(conn) < 150000)
+		return READ_REPLICATION_SLOT_UNSUPPORTED;
+
+
+	query = createPQExpBuffer();
+	appendPQExpBuffer(query, "READ_REPLICATION_SLOT %s", slot_name);
+	res = PQexec(conn, query->data);
+	destroyPQExpBuffer(query);
+	/* The commpand should always return precisely one tuple */
+	if (PQresultStatus(res) != PGRES_TUPLES_OK || PQntuples(res) != 1)
+	{
+		pg_log_error("could not read replication slot : %s",
+					 PQerrorMessage(conn));
+		PQclear(res);
+		return READ_REPLICATION_SLOT_ERROR;
+	}
+
+	/*
+	 * When the slot doesn't exist, the command returns an all-null tuple. The
+	 * first column (slot_type) will only be null if the slot doesn't exists.
+	 */
+	if (PQgetisnull(res, 0, 0))
+	{
+		PQclear(res);
+		return READ_REPLICATION_SLOT_NONEXISTENT;
+	}
+	if (PQntuples(res) != 1 || PQnfields(res) < 5)
+	{
+		pg_log_error("could not fetch replication slot: got %d rows and %d fields, expected %d rows and %d or more fields",
+					 PQntuples(res), PQnfields(res), 1, 5);
+		PQclear(res);
+		return READ_REPLICATION_SLOT_ERROR;
+	}
+	/* If no slotinformation has been passed, we can return immediately */
+	if (slot_info == NULL)
+	{
+		PQclear(res);
+		return READ_REPLICATION_SLOT_OK;
+	}
+
+	slot_info->is_logical = strcmp(PQgetvalue(res, 0, 0), "logical") == 0;
+	/* Restart LSN */
+	if (PQgetisnull(res, 0, 1))
+		slot_info->restart_lsn = InvalidXLogRecPtr;
+	else
+	{
+		uint32		hi,
+					lo;
+
+		if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
+		{
+			pg_log_error("could not parse slot's restart_lsn \"%s\"",
+						 PQgetvalue(res, 0, 1));
+			PQclear(res);
+			return READ_REPLICATION_SLOT_ERROR;
+		}
+		slot_info->restart_lsn = ((uint64) hi) << 32 | lo;
+	}
+	if (PQgetisnull(res, 0, 2))
+		slot_info->confirmed_flush_lsn = InvalidXLogRecPtr;
+	else
+	{
+		uint32		hi,
+					lo;
+
+		if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
+		{
+			pg_log_error("could not parse slot's confirmed_flush_lsn \"%s\"",
+						 PQgetvalue(res, 0, 2));
+			PQclear(res);
+			return READ_REPLICATION_SLOT_ERROR;
+		}
+		slot_info->confirmed_flush_lsn = ((uint64) hi) << 32 | lo;
+	}
+
+	if (PQgetisnull(res, 0, 3))
+		slot_info->restart_tli = 0;
+	else
+		slot_info->restart_tli = atoi(PQgetvalue(res, 0, 3));
+	if (PQgetisnull(res, 0, 4))
+		slot_info->confirmed_flush_tli = 0;
+	else
+		slot_info->confirmed_flush_tli = atoi(PQgetvalue(res, 0, 4));
+	PQclear(res);
+	return READ_REPLICATION_SLOT_OK;
+}
+
 /*
  * Create a replication slot for the given connection. This function
  * returns true in case of success.
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
index 504803b976..cd3bc3229a 100644
--- a/src/bin/pg_basebackup/streamutil.h
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -28,6 +28,14 @@ extern uint32 WalSegSz;
 /* Connection kept global so we can disconnect easily */
 extern PGconn *conn;
 
+typedef struct SlotInformation {
+	bool	   is_logical;
+	XLogRecPtr restart_lsn;
+	XLogRecPtr confirmed_flush_lsn;
+	TimeLineID restart_tli;
+	TimeLineID confirmed_flush_tli;
+} SlotInformation;
+
 extern PGconn *GetConnection(void);
 
 /* Replication commands */
@@ -40,6 +48,14 @@ extern bool RunIdentifySystem(PGconn *conn, char **sysid,
 							  TimeLineID *starttli,
 							  XLogRecPtr *startpos,
 							  char **db_name);
+typedef enum {
+	READ_REPLICATION_SLOT_OK,
+	READ_REPLICATION_SLOT_UNSUPPORTED,
+	READ_REPLICATION_SLOT_ERROR,
+	READ_REPLICATION_SLOT_NONEXISTENT
+} ReadReplicationSlotStatus;
+
+extern ReadReplicationSlotStatus GetSlotInformation(PGconn *conn, const char *slot_name, SlotInformation *slot_info);
 extern bool RetrieveWalSegSize(PGconn *conn);
 extern TimestampTz feGetCurrentTimestamp(void);
 extern void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
diff --git a/src/bin/pg_basebackup/t/020_pg_receivewal.pl b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
index 0b33d73900..e79fd9d7f0 100644
--- a/src/bin/pg_basebackup/t/020_pg_receivewal.pl
+++ b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
@@ -5,7 +5,7 @@ use strict;
 use warnings;
 use TestLib;
 use PostgresNode;
-use Test::More tests => 27;
+use Test::More tests => 32;
 
 program_help_ok('pg_receivewal');
 program_version_ok('pg_receivewal');
@@ -49,7 +49,7 @@ is($primary->slot($slot_name)->{'slot_type'},
 
 # Generate some WAL.  Use --synchronous at the same time to add more
 # code coverage.  Switch to the next segment first so that subsequent
-# restarts of pg_receivewal will see this segment as full..
+# restarts of pg_receivewal will see this segment as full
 $primary->psql('postgres', 'CREATE TABLE test_table(x integer);');
 $primary->psql('postgres', 'SELECT pg_switch_wal();');
 my $nextlsn =
@@ -146,6 +146,53 @@ $primary->command_ok(
 $partial_wals[0] =~ s/(\.gz)?.partial//;
 ok(-e $partial_wals[0], "check that previously partial WAL is now complete");
 
+# Verify that if we use a replication slot, we resume where we left even in the
+# absence of WALs
+
+# Setup the slot, and connect to it a first time
+$primary->run_log(
+	[ 'pg_receivewal', '--slot', $slot_name, '--create-slot' ],
+	'creating a replication slot');
+$primary->psql('postgres',
+	'INSERT INTO test_table VALUES (generate_series(1,100));');
+$primary->psql('postgres', 'SELECT pg_switch_wal();');
+$nextlsn =
+  $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+chomp($nextlsn);
+$primary->run_log(
+	[ 'pg_receivewal', '-D', $stream_dir, '--slot', $slot_name, '--verbose', '--endpos', $nextlsn ],
+	"streaming some WAL");
+
+# Get the slot restart_lsn and make sure we retrieve the associated WAL file
+# even after deletion of the previous stored WAL files.
+$slot = $primary->slot($slot_name);
+my $restart_lsn = $slot->{'restart_lsn'};
+# Add one so that the restart_lsn doesn't correspond to the previous file.
+$restart_lsn =~ s/.$/1/;
+my $walfile_to_be_archived = $primary->safe_psql('postgres',
+	"SELECT pg_walfile_name('$restart_lsn');");
+isnt($restart_lsn, '', 'restart LSN of new slot is not null');
+
+unlink glob "'${stream_dir}/*'";
+
+$primary->psql('postgres',
+	'INSERT INTO test_table VALUES (generate_series(1,100));');
+$primary->psql('postgres', 'SELECT pg_switch_wal();');
+$nextlsn =
+  $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+chomp($nextlsn);
+# First verify what happens if we try with a non-existing slot.
+$primary->command_fails_like(
+  [ 'pg_receivewal', '-D', $stream_dir, '--slot', 'nonexistentslot', '-n', '--verbose', '--endpos', $nextlsn ],
+  qr/pg_receivewal: error: replication slot "nonexistentslot" does not exist/,
+  'pg_receivewal fails with a non-existing slot');
+$primary->command_ok(
+	[ 'pg_receivewal', '-D', $stream_dir, '--slot', $slot_name, '--verbose', '--endpos', $nextlsn ],
+	"Stream some wal starting from the slot's restart_lsn");
+$slot = $primary->slot($slot_name);
+my @walfiles = glob "${stream_dir}/*";
+ok(-e "$stream_dir/$walfile_to_be_archived", "WAL from the slot's restart_lsn has been archived");
+
 # Permissions on WAL files should be default
 SKIP:
 {
-- 
2.33.0

>From bbd70460e8de09da8bcacb0df3aa92e043fbff14 Mon Sep 17 00:00:00 2001
From: Ronan Dunklau <ronan.dunk...@aiven.io>
Date: Wed, 1 Sep 2021 15:52:53 +0200
Subject: [PATCH v5 3/3] Check slot existence in pg_basebackup.

Use the newly introduced READ_REPLICATION_SLOT command to check for a
slot existence in pg_basebackup. That way, we can fail early.
---
 src/bin/pg_basebackup/pg_basebackup.c        | 31 ++++++++++++++++++++
 src/bin/pg_basebackup/t/010_pg_basebackup.pl |  7 +++--
 2 files changed, 35 insertions(+), 3 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 7296eb97d0..a807d5a699 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -1867,6 +1867,37 @@ BaseBackup(void)
 	if (!RunIdentifySystem(conn, &sysidentifier, &latesttli, NULL, NULL))
 		exit(1);
 
+	/*
+	 * Check the replication slot exists if applicable
+	 */
+	if (replication_slot && !(temp_replication_slot || create_slot))
+	{
+		SlotInformation slot_info;
+
+		switch (GetSlotInformation(conn, replication_slot, &slot_info))
+		{
+			case READ_REPLICATION_SLOT_ERROR:
+
+				/*
+				 * Error has been logged by GetSlotInformation
+				 */
+				exit(1);
+			case READ_REPLICATION_SLOT_UNSUPPORTED:
+				/* We don't care about the result here */
+				break;
+			case READ_REPLICATION_SLOT_NONEXISTENT:
+				pg_log_error("replication slot \"%s\" does not exist", replication_slot);
+				exit(1);
+			case READ_REPLICATION_SLOT_OK:
+				if (slot_info.is_logical)
+				{
+					pg_log_error("cannot use the slot provided, physical slot expected.");
+					exit(1);
+				}
+				break;
+		}
+	}
+
 	/*
 	 * Start the actual backup
 	 */
diff --git a/src/bin/pg_basebackup/t/010_pg_basebackup.pl b/src/bin/pg_basebackup/t/010_pg_basebackup.pl
index a2cb2a7679..69fd262a97 100644
--- a/src/bin/pg_basebackup/t/010_pg_basebackup.pl
+++ b/src/bin/pg_basebackup/t/010_pg_basebackup.pl
@@ -10,7 +10,7 @@ use File::Path qw(rmtree);
 use Fcntl qw(:seek);
 use PostgresNode;
 use TestLib;
-use Test::More tests => 110;
+use Test::More tests => 111;
 
 program_help_ok('pg_basebackup');
 program_version_ok('pg_basebackup');
@@ -465,14 +465,15 @@ $node->command_ok(
 	'pg_basebackup -X stream runs with --no-slot');
 rmtree("$tempdir/backupnoslot");
 
-$node->command_fails(
+$node->command_fails_like(
 	[
 		'pg_basebackup',             '-D',
 		"$tempdir/backupxs_sl_fail", '-X',
 		'stream',                    '-S',
 		'slot0'
 	],
-	'pg_basebackup fails with nonexistent replication slot');
+	qr/pg_basebackup: error: replication slot "slot0" does not exist/,
+	'pg_basebackup fails early with nonexistent replication slot');
 
 $node->command_fails(
 	[ 'pg_basebackup', '-D', "$tempdir/backupxs_slot", '-C' ],
-- 
2.33.0

Reply via email to