On Friday, 27 August 2021, 05:44:32 CEST, Michael Paquier wrote:
> On Thu, Aug 26, 2021 at 02:14:27PM +0200, Ronan Dunklau wrote:
> > Following the discussion at [1], I refactored the implementation into
> > streamutil and added a third patch making use of it in pg_basebackup
> > itself in order to fail early if the replication slot doesn't exist, so
> > please find attached v2 for that.
> 
> Thanks for the split.  That helps a lot.
> 

Thank you very much for the review, please find attached an updated patchset.
I've also taken into account some remarks made by Bharath Rupireddy.

> +
> +
>  /*
>   * Run IDENTIFY_SYSTEM through a given connection and give back to caller
> 
> The patch series has some noise diffs here and there, you may want to
> clean up that for clarity.

Ok, sorry about that.

> 
> +   if (slot == NULL || !slot->in_use)
> +   {
> +       LWLockRelease(ReplicationSlotControlLock);
> +
> +       ereport(ERROR,
> +               (errcode(ERRCODE_UNDEFINED_OBJECT),
> LWLocks are released on ERROR, so no need for LWLockRelease() here.
> 

Following your suggestion of not erroring out on a nonexistent slot, this point 
is no longer relevant, but thanks for pointing it out anyway.

> +    <listitem>
> +      <para>
> +      Read information about the named replication slot.  This is
> useful to determine which WAL location we should be asking the server
> to start streaming at.
> 
> A nit.  You may want to be more careful with the indentation of the
> documentation.  Things are usually limited in width for readability.
> More <literal> markups would be nice for the field names used in the
> descriptions.

Ok.

> 
> +   if (slot == NULL || !slot->in_use)
> [...]
> +       ereport(ERROR,
> +               (errcode(ERRCODE_UNDEFINED_OBJECT),
> +                errmsg("replication slot \"%s\" does not exist",
> +                       cmd->slotname)));
> [...]
> +       if (PQntuples(res) == 0)
> +       {
> +               pg_log_error("replication slot %s does not exist", slot_name);
> +               PQclear(0);
> +               return false;
> So, the backend and ReadReplicationSlot() report an ERROR if a slot
> does not exist but pg_basebackup's GetSlotInformation() does the same
> if there are no tuples returned.  That's inconsistent.  Wouldn't it be
> more instinctive to return a NULL tuple instead if the slot does not
> exist to be able to check after real ERRORs in frontends using this
> interface?  

The attached patch returns no tuple at all when the replication slot doesn't 
exist. I'm not sure whether that is what you meant by returning a NULL tuple?
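
To make the intended frontend behaviour concrete, here is a small
self-contained sketch of how a caller could tell a real error apart from a
missing slot once the command returns zero rows. The names SlotLookup and
classify_slot_result are hypothetical and not part of the attached patches;
the two parameters stand in for what PQresultStatus() and PQntuples() would
report.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical outcome of one READ_REPLICATION_SLOT round trip. */
typedef enum SlotLookup
{
	SLOT_LOOKUP_ERROR,			/* the command failed, or the result is malformed */
	SLOT_LOOKUP_MISSING,		/* the command succeeded with zero rows */
	SLOT_LOOKUP_FOUND			/* the command succeeded with exactly one row */
} SlotLookup;

/*
 * tuples_ok: whether the result status was PGRES_TUPLES_OK (assumption: the
 * caller has already checked it).
 * ntuples: the number of rows in the result.
 */
static SlotLookup
classify_slot_result(bool tuples_ok, int ntuples)
{
	assert(ntuples >= 0);

	if (!tuples_ok)
		return SLOT_LOOKUP_ERROR;
	if (ntuples == 0)
		return SLOT_LOOKUP_MISSING;
	if (ntuples == 1)
		return SLOT_LOOKUP_FOUND;

	/* More than one row is not expected for a single slot name. */
	return SLOT_LOOKUP_ERROR;
}
```

With this split, a caller such as pg_receivewal could fall back to the
server's current position on SLOT_LOOKUP_MISSING, while pg_basebackup could
treat the same outcome as fatal.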

> A slot in use exists, so the error is a bit confusing here
> anyway, no?

From my understanding, a slot that is *not* in use no longer exists, so I 
don't really understand this point. Could you clarify?


> 
> +    * XXX: should we allow the caller to specify which target timeline it wants
> +    * ?
> +    */
> What are you thinking about here?

I was thinking that, instead of walking back the timeline history from 
where we currently are on the server, we could allow an additional argument 
for the client to specify which timeline it wants. But I guess a replication 
slot cannot be present for a past, divergent timeline? I have removed that 
suggestion.
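
For reference, "walking back the timeline history" amounts to something like
the following simplified sketch. This is not the backend code: HistoryEntry
and tli_of_point are hypothetical names, LSNs are plain 64-bit integers, and 0
is used to mean "no bound". It returns 0 when no timeline covers the position,
which is a choice made for this sketch only.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* One timeline in a history, covering the LSN range [begin, end). */
typedef struct HistoryEntry
{
	uint32_t	tli;			/* timeline ID */
	uint64_t	begin;			/* first LSN on this timeline, 0 = unbounded */
	uint64_t	end;			/* LSN where the next timeline forked, 0 = none */
} HistoryEntry;

/*
 * Return the ID of the timeline that contains lsn, or 0 if none does.
 * The history is ordered newest timeline first.
 */
static uint32_t
tli_of_point(const HistoryEntry *history, size_t nentries, uint64_t lsn)
{
	assert(history != NULL || nentries == 0);

	for (size_t i = 0; i < nentries; i++)
	{
		const HistoryEntry *entry = &history[i];

		if ((entry->begin == 0 || entry->begin <= lsn) &&
			(entry->end == 0 || lsn < entry->end))
			return entry->tli;
	}
	return 0;
}
```

For a history where timeline 2 forked off timeline 1 at some LSN, any position
before the fork resolves to timeline 1, and the fork point itself and anything
after it resolve to timeline 2.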

> 
> -# restarts of pg_receivewal will see this segment as full..
> +# restarts of pg_receivewal will see this segment as full../
> Typo.

Ok.

> 
> +   TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "restart_lsn_timeline",
> +                             INT4OID, -1, 0);
> +   TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 5, "confirmed_flush_lsn_timeline",
> +                             INT4OID, -1, 0);
> I would call these restart_tli and confirmed_flush_tli, without the
> "lsn" part.

Ok.
> 
> The patch for READ_REPLICATION_SLOT could have some tests using a
> connection that has replication=1 in some TAP tests.  We do that in
> 001_stream_rep.pl with SHOW, as one example.

Ok. I added the physical part to 001_stream_rep.pl, using the protocol 
interface directly for creating / dropping the slot, and some tests for 
logical replication slots to 006_logical_decoding.pl.

> 
> -               'slot0'
> +               'slot0',                     '-p',
> +               "$port"
> Something we are missing here?

The thing we're missing here is a wrapper for command_fails_like. I've added 
this to PostgresNode.pm.

Best regards,


-- 
Ronan Dunklau
From 9fa01789f663975b963c26875b70857055cadb9b Mon Sep 17 00:00:00 2001
From: Ronan Dunklau <ronan.dunk...@aiven.io>
Date: Wed, 28 Jul 2021 16:34:54 +0200
Subject: [PATCH v3 1/3] Add READ_REPLICATION_SLOT command.

This commit introduces a new READ_REPLICATION_SLOT <slot_name> command.
This command is used to read information about a replication slot when
using a physical replication connection.

In this first version it returns the slot type, restart_lsn,
confirmed_flush_lsn and the timeline of each of those LSNs, obtained by
following the current timeline history. No row is returned if the slot
does not exist.
---
 doc/src/sgml/protocol.sgml                  |  66 ++++++++++
 src/backend/replication/repl_gram.y         |  16 ++-
 src/backend/replication/repl_scanner.l      |   1 +
 src/backend/replication/walsender.c         | 129 ++++++++++++++++++++
 src/include/nodes/nodes.h                   |   1 +
 src/include/nodes/replnodes.h               |  10 ++
 src/test/recovery/t/001_stream_rep.pl       |  46 ++++++-
 src/test/recovery/t/006_logical_decoding.pl |  13 +-
 8 files changed, 279 insertions(+), 3 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index a232546b1d..0e6fb01054 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2052,6 +2052,72 @@ The commands accepted in replication mode are:
     </listitem>
   </varlistentry>
 
+  <varlistentry>
+    <term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable>
+      <indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm>
+    </term>
+    <listitem>
+      <para>
+      Read information about the named replication slot.
+      This is useful to determine which WAL location we should be asking the
+      server to start streaming at.
+      </para>
+      <para>
+      In response to this command, the server will return a one-row result set
+      (or no rows if the slot does not exist), containing the following fields:
+        <variablelist>
+          <varlistentry>
+            <term><literal>type</literal> (<type>text</type>)</term>
+            <listitem>
+              <para>
+               The replication slot's type, either <literal>physical</literal> or
+               <literal>logical</literal>.
+              </para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term><literal>restart_lsn</literal> (<type>text</type>)</term>
+            <listitem>
+              <para>
+               The replication slot's <literal>restart_lsn</literal>.
+              </para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term><literal>confirmed_flush_lsn</literal> (<type>text</type>)</term>
+            <listitem>
+              <para>
+               The replication slot's <literal>confirmed_flush_lsn</literal>.
+              </para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term><literal>restart_tli</literal> (<type>int4</type>)</term>
+            <listitem>
+              <para>
+               The timeline ID for the <literal>restart_lsn</literal> position,
+               when following the current timeline history.
+              </para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term><literal>confirmed_flush_tli</literal> (<type>int4</type>)</term>
+            <listitem>
+              <para>
+               The timeline ID for the <literal>confirmed_flush_lsn</literal>
+               position, when following the current timeline history.
+              </para>
+            </listitem>
+          </varlistentry>
+        </variablelist>
+      </para>
+    </listitem>
+  </varlistentry>
+
   <varlistentry>
     <term><literal>START_REPLICATION</literal> [ <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ] [ <literal>PHYSICAL</literal> ] <replaceable class="parameter">XXX/XXX</replaceable> [ <literal>TIMELINE</literal> <replaceable class="parameter">tli</replaceable> ]
      <indexterm><primary>START_REPLICATION</primary></indexterm>
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index e1e8ec29cc..5de003b7dc 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -64,6 +64,7 @@ static SQLCmd *make_sqlcmd(void);
 /* Keyword tokens. */
 %token K_BASE_BACKUP
 %token K_IDENTIFY_SYSTEM
+%token K_READ_REPLICATION_SLOT
 %token K_SHOW
 %token K_START_REPLICATION
 %token K_CREATE_REPLICATION_SLOT
@@ -94,7 +95,7 @@ static SQLCmd *make_sqlcmd(void);
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
 				create_replication_slot drop_replication_slot identify_system
-				timeline_history show sql_cmd
+				identify_replication_slot timeline_history show sql_cmd
 %type <list>	base_backup_opt_list
 %type <defelt>	base_backup_opt
 %type <uintval>	opt_timeline
@@ -120,6 +121,7 @@ opt_semicolon:	';'
 
 command:
 			identify_system
+			| identify_replication_slot
 			| base_backup
 			| start_replication
 			| start_logical_replication
@@ -140,6 +142,18 @@ identify_system:
 				}
 			;
 
+/*
+ * READ_REPLICATION_SLOT %s
+ */
+identify_replication_slot:
+			K_READ_REPLICATION_SLOT var_name
+				{
+					ReadReplicationSlotCmd *n = makeNode(ReadReplicationSlotCmd);
+					n->slotname = $2;
+					$$ = (Node *) n;
+				}
+			;
+
 /*
  * SHOW setting
  */
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index c038a636c3..1b599c255e 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -85,6 +85,7 @@ identifier		{ident_start}{ident_cont}*
 BASE_BACKUP			{ return K_BASE_BACKUP; }
 FAST			{ return K_FAST; }
 IDENTIFY_SYSTEM		{ return K_IDENTIFY_SYSTEM; }
+READ_REPLICATION_SLOT	{ return K_READ_REPLICATION_SLOT; }
 SHOW		{ return K_SHOW; }
 LABEL			{ return K_LABEL; }
 NOWAIT			{ return K_NOWAIT; }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3ca2a11389..de50f06e90 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -232,6 +232,7 @@ static void XLogSendLogical(void);
 static void WalSndDone(WalSndSendDataCallback send_data);
 static XLogRecPtr GetStandbyFlushRecPtr(void);
 static void IdentifySystem(void);
+static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd);
 static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
 static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
 static void StartReplication(StartReplicationCmd *cmd);
@@ -457,6 +458,127 @@ IdentifySystem(void)
 	end_tup_output(tstate);
 }
 
+/* Handle READ_REPLICATION_SLOT command */
+static void
+ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
+{
+	ReplicationSlot *slot;
+	ReplicationSlot slot_contents;
+	DestReceiver *dest;
+	TupOutputState *tstate;
+	TupleDesc	tupdesc;
+	Datum		values[5];
+	bool		nulls[5];
+	char		xloc[MAXFNAMELEN];
+	int			i = 0;
+	List	   *timeline_history = NIL;
+	TimeLineID	slots_position_timeline;
+	bool		has_value;
+
+	tupdesc = CreateTemplateTupleDesc(5);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "type",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "confirmed_flush_lsn",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "restart_tli",
+							  INT4OID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 5, "confirmed_flush_tli",
+							  INT4OID, -1, 0);
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	slot = SearchNamedReplicationSlot(cmd->slotname, false);
+	if (slot == NULL || !slot->in_use)
+	{
+		LWLockRelease(ReplicationSlotControlLock);
+		has_value = false;
+	}
+	else
+	{
+		/* Copy slot contents while holding spinlock */
+		SpinLockAcquire(&slot->mutex);
+		slot_contents = *slot;
+		SpinLockRelease(&slot->mutex);
+		LWLockRelease(ReplicationSlotControlLock);
+
+
+		if (slot_contents.data.database == InvalidOid)
+			values[i] = CStringGetTextDatum("physical");
+		else
+			values[i] = CStringGetTextDatum("logical");
+		nulls[i] = false;
+		i++;
+
+		if (slot_contents.data.restart_lsn != InvalidXLogRecPtr)
+		{
+			snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
+			values[i] = CStringGetTextDatum(xloc);
+			nulls[i] = false;
+		}
+		else
+		{
+			values[i] = 0;
+			nulls[i] = true;
+
+		}
+		i++;
+
+		if (slot_contents.data.confirmed_flush != InvalidXLogRecPtr)
+		{
+			snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(slot_contents.data.confirmed_flush));
+			values[i] = CStringGetTextDatum(xloc);
+			nulls[i] = false;
+		}
+		else
+		{
+			values[i] = 0;
+			nulls[i] = true;
+		}
+		i++;
+
+		/*
+		 * Look up the timeline each WAL position belongs to, following the
+		 * history of the current timeline
+		 */
+		if (slot_contents.data.restart_lsn != InvalidXLogRecPtr)
+		{
+			timeline_history = readTimeLineHistory(ThisTimeLineID);
+			slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn, timeline_history);
+			values[i] = Int32GetDatum(slots_position_timeline);
+			nulls[i] = false;
+		}
+		else
+		{
+			values[i] = 0;
+			nulls[i] = true;
+		}
+		i++;
+
+		if (slot_contents.data.confirmed_flush != InvalidXLogRecPtr)
+		{
+			if (!timeline_history)
+				timeline_history = readTimeLineHistory(ThisTimeLineID);
+			slots_position_timeline = tliOfPointInHistory(slot_contents.data.confirmed_flush, timeline_history);
+			values[i] = Int32GetDatum(slots_position_timeline);
+			nulls[i] = false;
+		}
+		else
+		{
+			values[i] = 0;
+			nulls[i] = true;
+		}
+		i++;
+		has_value = true;
+	}
+
+	dest = CreateDestReceiver(DestRemoteSimple);
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+	if (has_value)
+		do_tup_output(tstate, values, nulls);
+	end_tup_output(tstate);
+}
+
 
 /*
  * Handle TIMELINE_HISTORY command.
@@ -1618,6 +1740,13 @@ exec_replication_command(const char *cmd_string)
 			EndReplicationCommand(cmdtag);
 			break;
 
+		case T_ReadReplicationSlotCmd:
+			cmdtag = "READ_REPLICATION_SLOT";
+			set_ps_display(cmdtag);
+			ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node);
+			EndReplicationCommand(cmdtag);
+			break;
+
 		case T_BaseBackupCmd:
 			cmdtag = "BASE_BACKUP";
 			set_ps_display(cmdtag);
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 6a4d82f0a8..5f78bdd573 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -495,6 +495,7 @@ typedef enum NodeTag
 	 * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
 	 */
 	T_IdentifySystemCmd,
+	T_ReadReplicationSlotCmd,
 	T_BaseBackupCmd,
 	T_CreateReplicationSlotCmd,
 	T_DropReplicationSlotCmd,
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index faa3a251f2..ec85b7d993 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -33,6 +33,16 @@ typedef struct IdentifySystemCmd
 	NodeTag		type;
 } IdentifySystemCmd;
 
+/* ----------------------
+ *		READ_REPLICATION_SLOT command
+ * ----------------------
+ */
+typedef struct ReadReplicationSlotCmd
+{
+	NodeTag		type;
+	char		*slotname;
+} ReadReplicationSlotCmd;
+
 
 /* ----------------------
  *		BASE_BACKUP command
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index ac581c1c07..75a8c6e45e 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -6,7 +6,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 49;
+use Test::More tests => 55;
 
 # Initialize primary node
 my $node_primary = PostgresNode->new('primary');
@@ -252,6 +252,50 @@ ok( $ret == 0,
 	"SHOW with superuser-settable parameter, replication role and logical replication"
 );
 
+note "testing READ_REPLICATION_SLOT command";
+
+my $slotname = 'test_read_replication_slot_physical';
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+  'postgres', 'READ_REPLICATION_SLOT non_existent_slot;',
+  extra_params => [ '-d', $connstr_rep ]);
+ok( $ret == 0,
+  "READ_REPLICATION_SLOT does not produce an error with non existent slot");
+ok( $stdout eq '',
+    "READ_REPLICATION_SLOT returns no tuple if a slot is non existent");
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+  'postgres',
+  "CREATE_REPLICATION_SLOT $slotname PHYSICAL RESERVE_WAL;",
+  extra_params => [ '-d', $connstr_rep ],
+  0,
+  'physical slot created on primary');
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+  'postgres',
+  "READ_REPLICATION_SLOT $slotname;",
+  extra_params => [ '-d', $connstr_rep ]);
+ok( $ret == 0,
+  "READ_REPLICATION_SLOT does not produce an error with existing slot");
+ok( $stdout =~ 'physical\|[^|]*\|\|1\|',
+    "READ_REPLICATION_SLOT returns tuple corresponding to the slot");
+
+$node_primary->psql(
+  'postgres',
+  "DROP_REPLICATION_SLOT $slotname;",
+  extra_params => [ '-d', $connstr_rep ],
+  0,
+  'physical slot dropped on primary');
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+  'postgres',
+  "READ_REPLICATION_SLOT $slotname;",
+  extra_params => [ '-d', $connstr_rep ]);
+ok( $ret == 0,
+  "READ_REPLICATION_SLOT does not produce an error with dropped slot");
+ok( $stdout eq '',
+    "READ_REPLICATION_SLOT returns no tuple if a slot has been dropped");
+
 note "switching to physical replication slot";
 
 # Switch to using a physical replication slot. We can do this without a new
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index cc116062c2..07396f40e8 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -10,7 +10,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 14;
+use Test::More tests => 16;
 use Config;
 
 # Initialize primary node
@@ -39,6 +39,17 @@ ok( $stderr =~
 	  m/replication slot "test_slot" was not created in this database/,
 	"Logical decoding correctly fails to start");
 
+($result, $stdout, $stderr) = $node_primary->psql('template1',
+  qq[READ_REPLICATION_SLOT test_slot;],
+  replication => 'database');
+ok ($stdout =~ 'logical\|[^|]*\|[^|]*\|1\|1',
+  'Logical replication slot can be read on any logical connection');
+($result, $stdout, $stderr) = $node_primary->psql('postgres',
+  qq[READ_REPLICATION_SLOT test_slot;],
+  replication => '1');
+ok ($stdout =~ 'logical\|[^|]*\|[^|]*\|1\|1',
+  'Logical replication slot can be read on a physical connection');
+
 # Check case of walsender not using a database connection.  Logical
 # decoding should not be allowed.
 ($result, $stdout, $stderr) = $node_primary->psql(
-- 
2.32.0

From 546af79c4d0c2da5fa5bbf00d9280f6fee961e26 Mon Sep 17 00:00:00 2001
From: Ronan Dunklau <ronan.dunk...@aiven.io>
Date: Wed, 28 Jul 2021 16:35:39 +0200
Subject: [PATCH v3 2/3] Use READ_REPLICATION_SLOT command in pg_receivewal.

Prior to this patch, when running pg_receivewal, the start LSN is
determined by looking at the WAL files currently stored on disk and, if
none is found, by using the current flush LSN from the server.

If for some reason the WAL files from pg_receivewal were moved, we want
to restart where we left off, which is the replication slot's restart_lsn,
instead of skipping right to the current flush location.

To keep compatibility with prior server versions, we only attempt it if
the server version is >= 15.
---
 src/bin/pg_basebackup/pg_receivewal.c        | 35 ++++++++-
 src/bin/pg_basebackup/streamutil.c           | 74 ++++++++++++++++++++
 src/bin/pg_basebackup/streamutil.h           |  3 +
 src/bin/pg_basebackup/t/020_pg_receivewal.pl | 46 +++++++++++-
 4 files changed, 154 insertions(+), 4 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index 4474273daf..1f769f6179 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -26,6 +26,7 @@
 #include "fe_utils/option_utils.h"
 #include "getopt_long.h"
 #include "libpq-fe.h"
+#include "pqexpbuffer.h"
 #include "receivelog.h"
 #include "streamutil.h"
 
@@ -408,10 +409,40 @@ StreamLog(void)
 	stream.startpos = FindStreamingStart(&stream.timeline);
 	if (stream.startpos == InvalidXLogRecPtr)
 	{
-		stream.startpos = serverpos;
-		stream.timeline = servertli;
+		/* Try to get it from the slot if any, and the server supports it */
+		if (replication_slot)
+		{
+			if (PQserverVersion(conn) >= 150000)
+			{
+				char	   *slot_type = NULL;
+
+				if (!GetSlotInformation(conn, replication_slot, &stream.startpos, &stream.timeline, &slot_type))
+					pg_log_warning("could not fetch information for replication slot \"%s\", "
+								   "resuming from the current server position instead", replication_slot);
+				if (slot_type != NULL && strcmp(slot_type, "physical") != 0)
+				{
+					pg_log_error("slot \"%s\" is not a physical replication slot",
+								 replication_slot);
+					exit(1);
+				}
+			}
+			else
+				pg_log_warning("server does not support fetching the slot's position, "
+							   "resuming from the current server position instead");
+		}
+
+		/*
+		 * If it is still unknown, use the current flush value from the server
+		 */
+		if (stream.startpos == InvalidXLogRecPtr)
+		{
+			stream.startpos = serverpos;
+			stream.timeline = servertli;
+		}
 	}
 
+	Assert(stream.startpos != InvalidXLogRecPtr);
+
 	/*
 	 * Always start streaming at the beginning of a segment
 	 */
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index f5b3b476e5..b100a073d9 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -479,6 +479,80 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
 	return true;
 }
 
+
+/*
+ * Check a replication slot exists through a given connection, and give back to
+ * caller some result information if requested:
+ * 	- restart_lsn
+ * 	- timeline
+ */
+bool
+GetSlotInformation(PGconn *conn, const char *slot_name, XLogRecPtr *restart_lsn,
+				   uint32 *restart_lsn_timeline, char **slot_type)
+{
+	PGresult   *res;
+	PQExpBuffer query;
+	uint32		hi,
+				lo;
+
+	if (slot_name == NULL)
+		return false;
+
+	query = createPQExpBuffer();
+	appendPQExpBuffer(query, "READ_REPLICATION_SLOT %s",
+					  slot_name);
+	res = PQexec(conn, query->data);
+	destroyPQExpBuffer(query);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not read replication slot: %s",
+					 PQerrorMessage(conn));
+		PQclear(res);
+		return false;
+	}
+	if (PQntuples(res) == 0)
+	{
+		pg_log_error("replication slot \"%s\" does not exist", slot_name);
+		PQclear(res);
+		return false;
+	}
+	if (slot_type != NULL)
+	{
+		*slot_type = NULL;
+		if (!PQgetisnull(res, 0, 0))
+			*slot_type = pg_strdup(PQgetvalue(res, 0, 0));
+	}
+
+	if (PQntuples(res) != 1 || PQnfields(res) < 4)
+	{
+		pg_log_error("could not fetch replication slot LSN: got %d rows and %d fields, expected %d rows and %d or more fields",
+					 PQntuples(res), PQnfields(res), 1, 4);
+		PQclear(res);
+		return false;
+	}
+	if (PQgetisnull(res, 0, 1))
+	{
+		hi = 0;
+		lo = 0;
+	}
+	else
+	{
+		if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
+		{
+			pg_log_error("could not parse slot's restart_lsn \"%s\"",
+						 PQgetvalue(res, 0, 1));
+			PQclear(res);
+			return false;
+		}
+	}
+	if (restart_lsn)
+		*restart_lsn = ((uint64) hi) << 32 | lo;
+	if (restart_lsn_timeline && !PQgetisnull(res, 0, 3))
+		*restart_lsn_timeline = atoi(PQgetvalue(res, 0, 3));
+	PQclear(res);
+	return true;
+}
+
 /*
  * Create a replication slot for the given connection. This function
  * returns true in case of success.
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
index 504803b976..81115029d2 100644
--- a/src/bin/pg_basebackup/streamutil.h
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -40,6 +40,9 @@ extern bool RunIdentifySystem(PGconn *conn, char **sysid,
 							  TimeLineID *starttli,
 							  XLogRecPtr *startpos,
 							  char **db_name);
+extern bool GetSlotInformation(PGconn *conn, const char *slot_name,
+							   XLogRecPtr *restart_lsn, uint32 *restart_lsn_timeline,
+							   char **slot_type);
 extern bool RetrieveWalSegSize(PGconn *conn);
 extern TimestampTz feGetCurrentTimestamp(void);
 extern void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
diff --git a/src/bin/pg_basebackup/t/020_pg_receivewal.pl b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
index 0b33d73900..6b40778429 100644
--- a/src/bin/pg_basebackup/t/020_pg_receivewal.pl
+++ b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
@@ -5,7 +5,7 @@ use strict;
 use warnings;
 use TestLib;
 use PostgresNode;
-use Test::More tests => 27;
+use Test::More tests => 30;
 
 program_help_ok('pg_receivewal');
 program_version_ok('pg_receivewal');
@@ -49,7 +49,7 @@ is($primary->slot($slot_name)->{'slot_type'},
 
 # Generate some WAL.  Use --synchronous at the same time to add more
 # code coverage.  Switch to the next segment first so that subsequent
-# restarts of pg_receivewal will see this segment as full..
+# restarts of pg_receivewal will see this segment as full
 $primary->psql('postgres', 'CREATE TABLE test_table(x integer);');
 $primary->psql('postgres', 'SELECT pg_switch_wal();');
 my $nextlsn =
@@ -146,6 +146,48 @@ $primary->command_ok(
 $partial_wals[0] =~ s/(\.gz)?.partial//;
 ok(-e $partial_wals[0], "check that previously partial WAL is now complete");
 
+# Verify that if we use a replication slot, we resume where we left off even in
+# the absence of WALs
+
+# Setup the slot, and connect to it a first time
+$primary->run_log(
+	[ 'pg_receivewal', '--slot', $slot_name, '--create-slot' ],
+	'creating a replication slot');
+$primary->psql('postgres',
+	'INSERT INTO test_table VALUES (generate_series(1,100));');
+$primary->psql('postgres', 'SELECT pg_switch_wal();');
+$nextlsn =
+  $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+chomp($nextlsn);
+$primary->run_log(
+	[ 'pg_receivewal', '-D', $stream_dir, '--slot', $slot_name, '--verbose', '--endpos', $nextlsn ],
+	"streaming some WAL");
+
+# Get the slot restart_lsn and make sure we retrieve the associated WAL file
+# even after deletion of the previous stored WAL files.
+$slot = $primary->slot($slot_name);
+my $restart_lsn = $slot->{'restart_lsn'};
+# Add one so that the restart_lsn doesn't correspond to the previous file.
+$restart_lsn =~ s/.$/1/;
+my $walfile_to_be_archived = $primary->safe_psql('postgres',
+	"SELECT pg_walfile_name('$restart_lsn');");
+isnt($restart_lsn, '', 'restart LSN of new slot is not null');
+
+unlink glob "'${stream_dir}/*'";
+
+$primary->psql('postgres',
+	'INSERT INTO test_table VALUES (generate_series(1,100));');
+$primary->psql('postgres', 'SELECT pg_switch_wal();');
+$nextlsn =
+  $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+chomp($nextlsn);
+$primary->command_ok(
+	[ 'pg_receivewal', '-D', $stream_dir, '--slot', $slot_name, '--verbose', '--endpos', $nextlsn ],
+	"Stream some wal starting from the slot's restart_lsn");
+$slot = $primary->slot($slot_name);
+my @walfiles = glob "${stream_dir}/*";
+ok(-e "$stream_dir/$walfile_to_be_archived", "WAL from the slot's restart_lsn has been archived");
+
 # Permissions on WAL files should be default
 SKIP:
 {
-- 
2.32.0

From e6a867c8a9724d37ce04b56bd132f8bfc571d659 Mon Sep 17 00:00:00 2001
From: Ronan Dunklau <ronan.dunk...@aiven.io>
Date: Thu, 26 Aug 2021 14:05:26 +0200
Subject: [PATCH v3 3/3] Check slot existence in pg_basebackup.

Use the newly introduced READ_REPLICATION_SLOT command to check for a
slot's existence in pg_basebackup. That way, we can fail early.
---
 src/bin/pg_basebackup/pg_basebackup.c        | 17 +++++++++++++++++
 src/bin/pg_basebackup/t/010_pg_basebackup.pl |  7 ++++---
 src/test/perl/PostgresNode.pm                | 20 ++++++++++++++++++++
 3 files changed, 41 insertions(+), 3 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 7296eb97d0..b93db32e23 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -1867,6 +1867,23 @@ BaseBackup(void)
 	if (!RunIdentifySystem(conn, &sysidentifier, &latesttli, NULL, NULL))
 		exit(1);
 
+	/*
+	 * Check the replication slot exists if applicable
+	 */
+	if (replication_slot && !(temp_replication_slot || create_slot) && PQserverVersion(conn) >= 150000)
+	{
+		char	   *slot_type = NULL;
+
+		if (!GetSlotInformation(conn, replication_slot, NULL, NULL, &slot_type))
+			exit(1);
+		if (strcmp(slot_type, "physical") != 0)
+		{
+			pg_log_error("slot \"%s\" is not a physical replication slot",
+						 replication_slot);
+			exit(1);
+		}
+	}
+
 	/*
 	 * Start the actual backup
 	 */
diff --git a/src/bin/pg_basebackup/t/010_pg_basebackup.pl b/src/bin/pg_basebackup/t/010_pg_basebackup.pl
index a2cb2a7679..69fd262a97 100644
--- a/src/bin/pg_basebackup/t/010_pg_basebackup.pl
+++ b/src/bin/pg_basebackup/t/010_pg_basebackup.pl
@@ -10,7 +10,7 @@ use File::Path qw(rmtree);
 use Fcntl qw(:seek);
 use PostgresNode;
 use TestLib;
-use Test::More tests => 110;
+use Test::More tests => 111;
 
 program_help_ok('pg_basebackup');
 program_version_ok('pg_basebackup');
@@ -465,14 +465,15 @@ $node->command_ok(
 	'pg_basebackup -X stream runs with --no-slot');
 rmtree("$tempdir/backupnoslot");
 
-$node->command_fails(
+$node->command_fails_like(
 	[
 		'pg_basebackup',             '-D',
 		"$tempdir/backupxs_sl_fail", '-X',
 		'stream',                    '-S',
 		'slot0'
 	],
-	'pg_basebackup fails with nonexistent replication slot');
+	qr/pg_basebackup: error: replication slot "slot0" does not exist/,
+	'pg_basebackup fails early with nonexistent replication slot');
 
 $node->command_fails(
 	[ 'pg_basebackup', '-D', "$tempdir/backupxs_slot", '-C' ],
diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index 8158ea5b2f..c59da758c7 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -2267,6 +2267,26 @@ sub command_like
 
 =pod
 
+=item $node->command_fails_like(...)
+
+TestLib::command_fails_like with our connection parameters. See command_ok(...)
+
+=cut
+
+sub command_fails_like
+{
+	local $Test::Builder::Level = $Test::Builder::Level + 1;
+
+	my $self = shift;
+
+	local %ENV = $self->_get_env();
+
+	TestLib::command_fails_like(@_);
+	return;
+}
+
+=pod
+
 =item $node->command_checks_all(...)
 
 TestLib::command_checks_all with our connection parameters. See
-- 
2.32.0
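
As a side note on the text format of the LSN columns: restart_lsn comes back
as a "%X/%X" string, which GetSlotInformation() in patch 2/3 turns back into a
64-bit position. A standalone sketch of that conversion could look like the
following. parse_lsn is a hypothetical helper name, and plain C integer types
are used instead of XLogRecPtr so that the example compiles on its own.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Parse an LSN written as two hexadecimal halves separated by a slash
 * ("high/low").  Returns false if the text does not have that form.
 */
static bool
parse_lsn(const char *text, uint64_t *lsn)
{
	unsigned int hi;
	unsigned int lo;

	assert(text != NULL && lsn != NULL);

	if (sscanf(text, "%X/%X", &hi, &lo) != 2)
		return false;

	/* The high half holds the upper 32 bits of the 64-bit position. */
	*lsn = ((uint64_t) hi << 32) | lo;
	return true;
}
```

Note that sscanf() stops at the first character it cannot match, so trailing
garbage after a valid LSN is accepted by this sketch; a stricter parser would
need an extra check.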
