On Thu, Sep 02, 2021 at 02:45:54PM +0900, Kyotaro Horiguchi wrote:
> At Wed, 1 Sep 2021 10:30:05 +0530, Bharath Rupireddy 
> <bharath.rupireddyforpostg...@gmail.com> wrote in 
> If I read the patch correctly, the situation above is reported by the
> following warning, and then we continue to the next step, giving up on
> identifying the start position from the slot data.

It would be better to fall back to the past behavior when using
pg_receivewal >= 15 with a PG instance older than 15.

>> "server does not support fetching the slot's position, resuming from the
>> current server position instead"
> 
> (The message looks a bit too long, though..)

Agreed.  Falling back to a warning is not the best answer we can have
here, as there could be various failure types, and for some of them a
hard failure is more appropriate:
- Failure in the backend while running READ_REPLICATION_SLOT.  This
should imply a hard failure, no?
- Slot that does not exist.  In this case, we could fall back to the
current write position of the server by default if the slot
information cannot be retrieved.

Something that bothers me in patch 0002 is that we would ignore
the results of GetSlotInformation() if any error happens, even if
there is a problem in the backend, like an OOM.  We should be careful
about the semantics here.
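To make those semantics concrete, here is a minimal sketch of how a client could map the outcome of READ_REPLICATION_SLOT to either a fallback or a hard failure.  The enums and choose_start_action() are hypothetical names, not pg_receivewal code.  It assumes the caller has already reduced the libpq result to one of three outcomes, and that server_version_num comes from PQserverVersion(), with the command assumed to exist only from version 15 on.

```c
/* Hypothetical reduction of the server's reply to READ_REPLICATION_SLOT. */
typedef enum SlotReadOutcome
{
	SLOT_READ_FOUND,			/* one row, slot_type is not NULL */
	SLOT_READ_MISSING,			/* one row, all columns NULL */
	SLOT_READ_BACKEND_ERROR		/* ERROR from the backend, e.g. out of memory */
} SlotReadOutcome;

/* Hypothetical decision taken by the client. */
typedef enum StartAction
{
	START_LEGACY_BEHAVIOR,		/* server too old: keep the pre-15 logic */
	START_FROM_SLOT,			/* use the LSN retained by the slot */
	START_FROM_SERVER_POSITION, /* slot missing: current write position */
	START_HARD_FAILURE			/* do not guess, report the error */
} StartAction;

static StartAction
choose_start_action(int server_version_num, SlotReadOutcome outcome)
{
	/* Assumption: READ_REPLICATION_SLOT only exists in version 15 and up */
	if (server_version_num < 150000)
		return START_LEGACY_BEHAVIOR;

	switch (outcome)
	{
		case SLOT_READ_FOUND:
			return START_FROM_SLOT;
		case SLOT_READ_MISSING:
			return START_FROM_SERVER_POSITION;
		case SLOT_READ_BACKEND_ERROR:
			return START_HARD_FAILURE;
	}

	/* Unreachable for valid input; fail closed rather than guess */
	return START_HARD_FAILURE;
}
```

The point of keeping the version check first is that an old server also answers with an error, so without it a missing command could not be told apart from a real backend failure.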

> However, if the operator doesn't know the server is old, pg_receivewal
> starts streaming from an unexpected position, which is a kind of
> disaster. So I'm inclined to agree with Bharath, but I would rather
> imagine an option to explicitly specify how to determine the start
> position.
> 
> --start-source=[server,wal,slot]  specify starting-LSN source, default is
>                      trying all of them in the order of wal, slot, server. 
> 
> I don't think the option needs to accept multiple values at once.
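For reference, the proposed option could be parsed roughly as follows.  --start-source is only a proposal in this thread, and StartSource and parse_start_source() are hypothetical names.  The sketch interprets "wal" as the local WAL directory and "server" as the server's current position, which is an assumption; as quoted, it accepts a single value, and omitting it means trying wal, slot, then server.

```c
#include <string.h>

/* Hypothetical starting-LSN sources for the proposed --start-source. */
typedef enum StartSource
{
	START_SOURCE_WAL,			/* assumed: end of WAL files already stored locally */
	START_SOURCE_SLOT,			/* position retained by the replication slot */
	START_SOURCE_SERVER			/* assumed: server's current write position */
} StartSource;

/*
 * Fill "order" with the sources to try and return how many were set.
 * A NULL value means the option was not given: try wal, slot, server.
 * Returns -1 for an unrecognized value.  "order" must hold 3 entries.
 */
static int
parse_start_source(const char *value, StartSource order[3])
{
	if (value == NULL)
	{
		order[0] = START_SOURCE_WAL;
		order[1] = START_SOURCE_SLOT;
		order[2] = START_SOURCE_SERVER;
		return 3;
	}

	/* Only one explicit value is accepted */
	if (strcmp(value, "wal") == 0)
		order[0] = START_SOURCE_WAL;
	else if (strcmp(value, "slot") == 0)
		order[0] = START_SOURCE_SLOT;
	else if (strcmp(value, "server") == 0)
		order[0] = START_SOURCE_SERVER;
	else
		return -1;

	return 1;
}
```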

What is the difference between "wal" and "server"?  Does "wal" stand
for the start position of the set of files stored in the location
directory, and "server" for the location that we'd receive from the
server?  I don't think that we need that because, when using a slot,
we know that we can rely on the LSN that the slot retains for
pg_receivewal, as that should be the same point as what has been
streamed last.  Could there instead be an argument for changing the
default and relying on the slot information rather than scanning the
local WAL archive path for the start point when using --slot?  When
using pg_receivewal as a service, relying on a scan of the WAL archive
directory if there is no slot, and falling back to an invalid LSN if
there is nothing, is fine by me, but I think that just relying on the
slot information is saner, as the backend makes sure that nothing is
missing.  That's also more useful when streaming changes from a single
slot into multiple locations (stream to location 1 with a slot, stop
pg_receivewal, then stream to location 2, which completes 1, with the
same slot).
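A minimal sketch of that precedence, assuming the candidate positions have already been obtained: with --slot, trust the slot and, if it carries no position, use the server's current write position; without a slot, use the local archive scan, then the server's position.  pick_start_lsn() is a hypothetical helper, and XLogRecPtr/InvalidXLogRecPtr are redefined locally only to keep the sketch self-contained.

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;	/* same width as the backend's XLogRecPtr */
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/*
 * Hypothetical helper choosing where to start streaming.
 * slot_lsn:   restart_lsn read from the slot (invalid if missing or NULL)
 * local_lsn:  position found by scanning the archive directory
 *             (invalid if nothing usable is stored locally)
 * server_lsn: current write position reported by the server
 */
static XLogRecPtr
pick_start_lsn(bool using_slot, XLogRecPtr slot_lsn,
			   XLogRecPtr local_lsn, XLogRecPtr server_lsn)
{
	/* With a slot, the backend guarantees nothing is missing from slot_lsn */
	if (using_slot)
		return (slot_lsn != InvalidXLogRecPtr) ? slot_lsn : server_lsn;

	/* Without a slot, resume from what is already archived locally */
	if (local_lsn != InvalidXLogRecPtr)
		return local_lsn;

	/* Nothing usable locally: fall back to the server's position */
	return server_lsn;
}
```

Skipping the local scan entirely when a slot is given is what makes the "location 1, then location 2 with the same slot" case work: location 2 starts where the slot says, not where its own (empty) directory says.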

+        pg_log_error("Slot \"%s\" is not a physical replication slot",
+                     replication_slot);
In 0003, the format of this error does not follow the project's
conventions.  Something like this would perhaps be more appropriate:
"cannot use the slot provided, physical slot expected."
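As an illustration, a frontend-side check on the slot_type column could look like the sketch below.  check_slot_type() and SlotTypeCheck are hypothetical; fprintf() stands in for the frontend's pg_log_error(), and the message follows the error style guide (no leading capital, no trailing period).  A NULL slot_type is reported separately, since a missing slot is a different case from a slot of the wrong type.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical result of checking the slot_type column. */
typedef enum SlotTypeCheck
{
	SLOT_TYPE_OK,				/* "physical": usable by pg_receivewal */
	SLOT_TYPE_MISSING,			/* NULL: the slot does not exist */
	SLOT_TYPE_WRONG				/* anything else, e.g. "logical" */
} SlotTypeCheck;

/*
 * slot_type is the first column returned by READ_REPLICATION_SLOT, or
 * NULL when that column is NULL.
 */
static SlotTypeCheck
check_slot_type(const char *slot_type)
{
	if (slot_type == NULL)
		return SLOT_TYPE_MISSING;

	if (strcmp(slot_type, "physical") != 0)
	{
		/* Primary message: no capital letter, no trailing period */
		fprintf(stderr, "cannot use the slot provided, physical slot expected\n");
		return SLOT_TYPE_WRONG;
	}

	return SLOT_TYPE_OK;
}
```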

I am not really convinced about the need for getting the active state
and the PID used in the backend when fetching the slot data,
particularly if that's just for some frontend-side checks.  The
backend already has safeguards for all that.

While looking at that, I have applied de1d4fe to add
PostgresNode::command_fails_like(), coming from 0003, and reworked
0001, attached, as the starting point.  That basically covers all
the points raised upthread, plus some tweaks for things I bumped into
while getting the semantics of the command into what looks like the
right shape.
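Since restart_lsn and confirmed_flush_lsn come back as text built with "%X/%X" and LSN_FORMAT_ARGS() (high 32 bits, then low 32 bits), a client such as the one in 0002 has to split that string back into a 64-bit position.  A minimal sketch, in the same spirit as the sscanf() parsing frontend tools already do for IDENTIFY_SYSTEM's xlogpos; parse_lsn_text() and format_lsn_text() are hypothetical names.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Parse an LSN printed as "%X/%X" back into a 64-bit value.
 * Returns false for NULL (a NULL column) or malformed text.
 */
static bool
parse_lsn_text(const char *text, uint64_t *lsn)
{
	unsigned int hi;
	unsigned int lo;

	if (text == NULL || sscanf(text, "%X/%X", &hi, &lo) != 2)
		return false;

	*lsn = ((uint64_t) hi << 32) | lo;
	return true;
}

/* Inverse, mirroring snprintf(xloc, ..., "%X/%X", LSN_FORMAT_ARGS(lsn)) */
static void
format_lsn_text(uint64_t lsn, char *buf, size_t buflen)
{
	snprintf(buf, buflen, "%X/%X",
			 (unsigned int) (lsn >> 32), (unsigned int) lsn);
}
```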
--
Michael
From adbd72f70ee3592965f2a52500820d1387dcbf85 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Thu, 2 Sep 2021 16:25:25 +0900
Subject: [PATCH v4] Add READ_REPLICATION_SLOT command

---
 src/include/nodes/nodes.h                   |   1 +
 src/include/nodes/replnodes.h               |  10 ++
 src/backend/replication/repl_gram.y         |  16 ++-
 src/backend/replication/repl_scanner.l      |   1 +
 src/backend/replication/walsender.c         | 112 ++++++++++++++++++++
 src/test/recovery/t/001_stream_rep.pl       |  47 +++++++-
 src/test/recovery/t/006_logical_decoding.pl |  15 ++-
 doc/src/sgml/protocol.sgml                  |  66 ++++++++++++
 src/tools/pgindent/typedefs.list            |   1 +
 9 files changed, 266 insertions(+), 3 deletions(-)

diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 6a4d82f0a8..5f78bdd573 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -495,6 +495,7 @@ typedef enum NodeTag
         * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
         */
        T_IdentifySystemCmd,
+       T_ReadReplicationSlotCmd,
        T_BaseBackupCmd,
        T_CreateReplicationSlotCmd,
        T_DropReplicationSlotCmd,
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index faa3a251f2..46384ea074 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -33,6 +33,16 @@ typedef struct IdentifySystemCmd
        NodeTag         type;
 } IdentifySystemCmd;
 
+/* ----------------------
+ *             READ_REPLICATION_SLOT command
+ * ----------------------
+ */
+typedef struct ReadReplicationSlotCmd
+{
+       NodeTag         type;
+       char       *slotname;
+} ReadReplicationSlotCmd;
+
 
 /* ----------------------
  *             BASE_BACKUP command
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index e1e8ec29cc..d34d617045 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -64,6 +64,7 @@ static SQLCmd *make_sqlcmd(void);
 /* Keyword tokens. */
 %token K_BASE_BACKUP
 %token K_IDENTIFY_SYSTEM
+%token K_READ_REPLICATION_SLOT
 %token K_SHOW
 %token K_START_REPLICATION
 %token K_CREATE_REPLICATION_SLOT
@@ -94,7 +95,7 @@ static SQLCmd *make_sqlcmd(void);
 %type <node>   command
 %type <node>   base_backup start_replication start_logical_replication
                                create_replication_slot drop_replication_slot identify_system
-                               timeline_history show sql_cmd
+                               read_replication_slot timeline_history show sql_cmd
 %type <list>   base_backup_opt_list
 %type <defelt> base_backup_opt
 %type <uintval>        opt_timeline
@@ -120,6 +121,7 @@ opt_semicolon:      ';'
 
 command:
                        identify_system
+                       | read_replication_slot
                        | base_backup
                        | start_replication
                        | start_logical_replication
@@ -140,6 +142,18 @@ identify_system:
                                }
                        ;
 
+/*
+ * READ_REPLICATION_SLOT %s
+ */
+read_replication_slot:
+                       K_READ_REPLICATION_SLOT var_name
+                               {
+                                       ReadReplicationSlotCmd *n = makeNode(ReadReplicationSlotCmd);
+                                       n->slotname = $2;
+                                       $$ = (Node *) n;
+                               }
+                       ;
+
 /*
  * SHOW setting
  */
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index c038a636c3..1b599c255e 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -85,6 +85,7 @@ identifier            {ident_start}{ident_cont}*
 BASE_BACKUP                    { return K_BASE_BACKUP; }
 FAST                   { return K_FAST; }
 IDENTIFY_SYSTEM                { return K_IDENTIFY_SYSTEM; }
+READ_REPLICATION_SLOT  { return K_READ_REPLICATION_SLOT; }
 SHOW           { return K_SHOW; }
 LABEL                  { return K_LABEL; }
 NOWAIT                 { return K_NOWAIT; }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3ca2a11389..afcc5f6612 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -232,6 +232,7 @@ static void XLogSendLogical(void);
 static void WalSndDone(WalSndSendDataCallback send_data);
 static XLogRecPtr GetStandbyFlushRecPtr(void);
 static void IdentifySystem(void);
+static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd);
 static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
 static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
 static void StartReplication(StartReplicationCmd *cmd);
@@ -457,6 +458,110 @@ IdentifySystem(void)
        end_tup_output(tstate);
 }
 
+/* Handle READ_REPLICATION_SLOT command */
+static void
+ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
+{
+#define READ_REPLICATION_SLOT_COLS 5
+       ReplicationSlot *slot;
+       DestReceiver *dest;
+       TupOutputState *tstate;
+       TupleDesc       tupdesc;
+       Datum           values[READ_REPLICATION_SLOT_COLS];
+       bool            nulls[READ_REPLICATION_SLOT_COLS];
+
+       tupdesc = CreateTemplateTupleDesc(READ_REPLICATION_SLOT_COLS);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
+                                                         TEXTOID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
+                                                         TEXTOID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "confirmed_flush_lsn",
+                                                         TEXTOID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "restart_tli",
+                                                         INT4OID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 5, "confirmed_flush_tli",
+                                                         INT4OID, -1, 0);
+
+       MemSet(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
+
+       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+       slot = SearchNamedReplicationSlot(cmd->slotname, false);
+       if (slot == NULL || !slot->in_use)
+       {
+               LWLockRelease(ReplicationSlotControlLock);
+       }
+       else
+       {
+               List       *timeline_history = NIL;
+               ReplicationSlot slot_contents;
+               int                     i = 0;
+               char            xloc[MAXFNAMELEN];
+               TimeLineID      slots_position_timeline;
+
+               /* Copy slot contents while holding spinlock */
+               SpinLockAcquire(&slot->mutex);
+               slot_contents = *slot;
+               SpinLockRelease(&slot->mutex);
+               LWLockRelease(ReplicationSlotControlLock);
+
+               if (OidIsValid(slot_contents.data.database))
+                       values[i] = CStringGetTextDatum("logical");
+               else
+                       values[i] = CStringGetTextDatum("physical");
+               nulls[i] = false;
+               i++;
+
+               if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+               {
+                       snprintf(xloc, sizeof(xloc), "%X/%X",
+                                        LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
+                       values[i] = CStringGetTextDatum(xloc);
+                       nulls[i] = false;
+               }
+               i++;
+
+               if (!XLogRecPtrIsInvalid(slot_contents.data.confirmed_flush))
+               {
+                       snprintf(xloc, sizeof(xloc), "%X/%X",
+                                        LSN_FORMAT_ARGS(slot_contents.data.confirmed_flush));
+                       values[i] = CStringGetTextDatum(xloc);
+                       nulls[i] = false;
+               }
+               i++;
+
+               /*
+                * Now get the timeline this wal was produced on, to get to the
+                * current timeline
+                */
+               if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+               {
+                       timeline_history = readTimeLineHistory(ThisTimeLineID);
+                       slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
+                                                                     timeline_history);
+                       values[i] = Int32GetDatum(slots_position_timeline);
+                       nulls[i] = false;
+               }
+               i++;
+
+               if (!XLogRecPtrIsInvalid(slot_contents.data.confirmed_flush))
+               {
+                       if (!timeline_history)
+                               timeline_history = readTimeLineHistory(ThisTimeLineID);
+                       slots_position_timeline = tliOfPointInHistory(slot_contents.data.confirmed_flush,
+                                                                     timeline_history);
+                       values[i] = Int32GetDatum(slots_position_timeline);
+                       nulls[i] = false;
+               }
+               i++;
+               Assert(i == READ_REPLICATION_SLOT_COLS);
+       }
+
+       dest = CreateDestReceiver(DestRemoteSimple);
+       tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+       do_tup_output(tstate, values, nulls);
+       end_tup_output(tstate);
+}
+
 
 /*
  * Handle TIMELINE_HISTORY command.
@@ -1618,6 +1723,13 @@ exec_replication_command(const char *cmd_string)
                        EndReplicationCommand(cmdtag);
                        break;
 
+               case T_ReadReplicationSlotCmd:
+                       cmdtag = "READ_REPLICATION_SLOT";
+                       set_ps_display(cmdtag);
+                       ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node);
+                       EndReplicationCommand(cmdtag);
+                       break;
+
                case T_BaseBackupCmd:
                        cmdtag = "BASE_BACKUP";
                        set_ps_display(cmdtag);
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index ac581c1c07..cbd817cda3 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -6,7 +6,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 49;
+use Test::More tests => 55;
 
 # Initialize primary node
 my $node_primary = PostgresNode->new('primary');
@@ -252,6 +252,51 @@ ok( $ret == 0,
        "SHOW with superuser-settable parameter, replication role and logical replication"
 );
 
+note "testing READ_REPLICATION_SLOT command";
+
+my $slotname = 'test_read_replication_slot_physical';
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+       'postgres',
+       'READ_REPLICATION_SLOT non_existent_slot;',
+       extra_params => [ '-d', $connstr_rep ]);
+ok( $ret == 0,
+       "READ_REPLICATION_SLOT does not produce an error with non-existent slot");
+ok($stdout eq '||||',
+       "READ_REPLICATION_SLOT returns NULL values if slot does not exist");
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+       'postgres',
+       "CREATE_REPLICATION_SLOT $slotname PHYSICAL RESERVE_WAL;",
+       extra_params => [ '-d', $connstr_rep ],
+       0,
+       'physical slot created on primary');
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+       'postgres',
+       "READ_REPLICATION_SLOT $slotname;",
+       extra_params => [ '-d', $connstr_rep ]);
+ok($ret == 0,
+       "READ_REPLICATION_SLOT does not produce an error with existing slot");
+ok($stdout =~ 'physical\|[^|]*\|\|1\|',
+       "READ_REPLICATION_SLOT returns tuple corresponding to the slot");
+
+$node_primary->psql(
+       'postgres',
+       "DROP_REPLICATION_SLOT $slotname;",
+       extra_params => [ '-d', $connstr_rep ],
+       0,
+       'physical slot dropped on primary');
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+       'postgres',
+       "READ_REPLICATION_SLOT $slotname;",
+       extra_params => [ '-d', $connstr_rep ]);
+ok($ret == 0,
+       "READ_REPLICATION_SLOT does not produce an error with dropped slot");
+ok($stdout eq '||||',
+       "READ_REPLICATION_SLOT returns NULL values if slot has been dropped");
+
 note "switching to physical replication slot";
 
 # Switch to using a physical replication slot. We can do this without a new
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index cc116062c2..50e4fa1042 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -10,7 +10,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 14;
+use Test::More tests => 16;
 use Config;
 
 # Initialize primary node
@@ -39,6 +39,19 @@ ok( $stderr =~
          m/replication slot "test_slot" was not created in this database/,
        "Logical decoding correctly fails to start");
 
+($result, $stdout, $stderr) = $node_primary->psql(
+       'template1',
+       qq[READ_REPLICATION_SLOT test_slot;],
+       replication => 'database');
+ok($stdout =~ 'logical\|[^|]*\|[^|]*\|1\|1',
+       'Logical replication slot can be read on any logical connection');
+($result, $stdout, $stderr) = $node_primary->psql(
+       'postgres',
+       qq[READ_REPLICATION_SLOT test_slot;],
+       replication => '1');
+ok($stdout =~ 'logical\|[^|]*\|[^|]*\|1\|1',
+       'Logical replication slot can be read on a physical connection');
+
 # Check case of walsender not using a database connection.  Logical
 # decoding should not be allowed.
 ($result, $stdout, $stderr) = $node_primary->psql(
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index a232546b1d..8191f17137 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2052,6 +2052,72 @@ The commands accepted in replication mode are:
     </listitem>
   </varlistentry>
 
+  <varlistentry>
+    <term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable>
+      <indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm>
+    </term>
+    <listitem>
+     <para>
+      Read information about a replication slot. Returns a tuple with
+      <literal>NULL</literal> values if the replication slot does not
+      exist.
+     </para>
+     <para>
+      In response to this command, the server will return a one-row result set,
+      containing the following fields:
+      <variablelist>
+       <varlistentry>
+        <term><literal>slot_type</literal> (<type>text</type>)</term>
+        <listitem>
+         <para>
+          The replication slot's type, either <literal>physical</literal> or
+          <literal>logical</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
+
+       <varlistentry>
+        <term><literal>restart_lsn</literal> (<type>text</type>)</term>
+        <listitem>
+         <para>
+          The replication slot's <literal>restart_lsn</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
+
+       <varlistentry>
+        <term><literal>confirmed_flush_lsn</literal> (<type>text</type>)</term>
+        <listitem>
+         <para>
+          The replication slot's <literal>confirmed_flush_lsn</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
+
+       <varlistentry>
+        <term><literal>restart_tli</literal> (<type>int4</type>)</term>
+        <listitem>
+         <para>
+          The timeline ID for the <literal>restart_lsn</literal> position,
+          when following the current timeline history.
+         </para>
+        </listitem>
+       </varlistentry>
+
+       <varlistentry>
+        <term><literal>confirmed_flush_tli</literal> (<type>int4</type>)</term>
+        <listitem>
+         <para>
+          The timeline ID for the <literal>confirmed_flush_lsn</literal>
+          position, when following the current timeline history.
+         </para>
+        </listitem>
+       </varlistentry>
+      </variablelist>
+     </para>
+    </listitem>
+  </varlistentry>
+
   <varlistentry>
     <term><literal>START_REPLICATION</literal> [ <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ] [ <literal>PHYSICAL</literal> ] <replaceable class="parameter">XXX/XXX</replaceable> [ <literal>TIMELINE</literal> <replaceable class="parameter">tli</replaceable> ]
      <indexterm><primary>START_REPLICATION</primary></indexterm>
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index f31a1e4e1e..b256ee3be6 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2126,6 +2126,7 @@ ReadBufferMode
 ReadBytePtrType
 ReadExtraTocPtrType
 ReadFunc
+ReadReplicationSlotCmd
 ReassignOwnedStmt
 RecheckForeignScan_function
 RecordCacheEntry
-- 
2.33.0
