On Thu, Sep 02, 2021 at 02:45:54PM +0900, Kyotaro Horiguchi wrote:
> At Wed, 1 Sep 2021 10:30:05 +0530, Bharath Rupireddy
> <bharath.rupireddyforpostg...@gmail.com> wrote in
> If I read the patch correctly the situation above is warned by the
> following message then continue to the next step giving up to identify
> start position from slot data.

Better to fall back to the past behavior if attempting to use a
pg_receivewal >= 15 with a PG instance older than 14.

>> "server does not suport fetching the slot's position, resuming from the
>> current server position instead"
>
> (The message looks a bit too long, though..)

Agreed.  Falling back to a warning is not the best answer we can have
here, as there could be various failure types, and for some of them a
hard failure is more appropriate:
- Failure in the backend while running READ_REPLICATION_SLOT.  This
should imply a hard failure, no?
- Slot that does not exist.  In this case, we could fall back to the
current write position of the server.

by default if the slot information cannot be retrieved.
Something that's disturbing me in patch 0002 is that we would ignore
the results of GetSlotInformation() if any error happens, even if
there is a problem in the backend, like an OOM.  We should be careful
about the semantics here.

> However, if the operator doesn't know the server is old, pg_receivewal
> starts streaming from unexpected position, which is a kind of
> disaster. So I'm inclined to agree to Bharath, but rather I imagine of
> an option to explicitly specify how to determine the start position.
>
> --start-source=[server,wal,slot] specify starting-LSN source, default is
> trying all of them in the order of wal, slot, server.
>
> I don't think the option doesn't need to accept multiple values at once.

What is the difference between "wal" and "server"?  "wal" stands for
the start position of the set of files stored in the location
directory, and "server" is the location that we'd receive from the
server?  I don't think that we need that because, when using a slot,
we know that we can rely on the LSN that the slot retains for
pg_receivewal as that should be the same point as what has been
streamed last.

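
To make the distinction between the failure types discussed above concrete, here is a minimal C sketch of how a frontend could map the outcome of the slot lookup to an action. Every name in it (SlotLookupStatus, StartAction, decide_start) is hypothetical and appears in none of the patches; it only encodes the cases listed in this message: hard failure on a backend error, the server's current write position for a missing slot, and the past behavior for a server that does not know the command.

```c
#include <assert.h>

/* Hypothetical outcomes of asking the server for the slot's position. */
typedef enum
{
	SLOT_LOOKUP_FOUND,			/* a row with a usable position came back */
	SLOT_LOOKUP_MISSING,		/* row of NULLs: the slot does not exist */
	SLOT_LOOKUP_UNSUPPORTED,	/* server too old to know the command */
	SLOT_LOOKUP_BACKEND_ERROR	/* command failed in the backend, e.g. OOM */
} SlotLookupStatus;

/* Hypothetical actions the frontend can take afterwards. */
typedef enum
{
	START_FROM_SLOT,					/* use the position the slot retains */
	START_FROM_SERVER_WRITE_POSITION,	/* current write position of server */
	START_WITH_PAST_BEHAVIOR,			/* behave as before the new command */
	START_HARD_FAILURE					/* stop instead of guessing */
} StartAction;

/* Map a lookup outcome to an action, one case per failure type. */
StartAction
decide_start(SlotLookupStatus status)
{
	switch (status)
	{
		case SLOT_LOOKUP_FOUND:
			return START_FROM_SLOT;
		case SLOT_LOOKUP_MISSING:
			return START_FROM_SERVER_WRITE_POSITION;
		case SLOT_LOOKUP_UNSUPPORTED:
			return START_WITH_PAST_BEHAVIOR;
		case SLOT_LOOKUP_BACKEND_ERROR:
			return START_HARD_FAILURE;
	}

	/* Unknown status: failing is safer than streaming from a wrong point. */
	return START_HARD_FAILURE;
}
```

Keeping the mapping in one function means an unexpected error cannot be silently turned into a fallback, which is the concern raised about ignoring the results of GetSlotInformation().
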
Could there be an argument instead for changing the default and
relying on the slot information rather than scanning the local WAL
archive path for the start point when using --slot?  When using
pg_receivewal as a service, relying on a scan of the WAL archive
directory if there is no slot and falling back to an invalid LSN if
there is nothing is fine by me, but I think that just relying on the
slot information is saner as the backend makes sure that nothing is
missing.  That's also more useful when streaming changes from a single
slot from multiple locations (stream to location 1 with a slot, stop
pg_receivewal, stream to location 2 that completes 1 with the same
slot).

+       pg_log_error("Slot \"%s\" is not a physical replication slot",
+                    replication_slot);
In 0003, the format of this error is not really project-like.
Something like that perhaps would be more appropriate:
"cannot use the slot provided, physical slot expected."

I am not really convinced about the need of getting the active state
and the PID used in the backend when fetching the slot data,
particularly if that's just for some frontend-side checks.  The
backend has safeguards already for all that.

While looking at that, I have applied de1d4fe to add
PostgresNode::command_fails_like(), coming from 0003, and put my hands
on 0001 as per the attached, as the starting point.  That basically
comes down to all the points raised upthread, plus some tweaks for
things I bumped into to get the semantics of the command to what looks
like the right shape.
--
Michael

From adbd72f70ee3592965f2a52500820d1387dcbf85 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Thu, 2 Sep 2021 16:25:25 +0900
Subject: [PATCH v4] Add READ_REPLICATION_SLOT command

---
 src/include/nodes/nodes.h                   |   1 +
 src/include/nodes/replnodes.h               |  10 ++
 src/backend/replication/repl_gram.y         |  16 ++-
 src/backend/replication/repl_scanner.l      |   1 +
 src/backend/replication/walsender.c         | 112 ++++++++++++++++++++
 src/test/recovery/t/001_stream_rep.pl       |  47 +++++++-
 src/test/recovery/t/006_logical_decoding.pl |  15 ++-
 doc/src/sgml/protocol.sgml                  |  66 ++++++++++++
 src/tools/pgindent/typedefs.list            |   1 +
 9 files changed, 266 insertions(+), 3 deletions(-)

diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 6a4d82f0a8..5f78bdd573 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -495,6 +495,7 @@ typedef enum NodeTag
 	 * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
 	 */
 	T_IdentifySystemCmd,
+	T_ReadReplicationSlotCmd,
 	T_BaseBackupCmd,
 	T_CreateReplicationSlotCmd,
 	T_DropReplicationSlotCmd,
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index faa3a251f2..46384ea074 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -33,6 +33,16 @@ typedef struct IdentifySystemCmd
 	NodeTag		type;
 } IdentifySystemCmd;
 
+/* ----------------------
+ *		READ_REPLICATION_SLOT command
+ * ----------------------
+ */
+typedef struct ReadReplicationSlotCmd
+{
+	NodeTag		type;
+	char	   *slotname;
+} ReadReplicationSlotCmd;
+
 
 /* ----------------------
  *		BASE_BACKUP command
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index e1e8ec29cc..d34d617045 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -64,6 +64,7 @@ static SQLCmd *make_sqlcmd(void);
 /* Keyword tokens. */
 %token K_BASE_BACKUP
 %token K_IDENTIFY_SYSTEM
+%token K_READ_REPLICATION_SLOT
 %token K_SHOW
 %token K_START_REPLICATION
 %token K_CREATE_REPLICATION_SLOT
@@ -94,7 +95,7 @@ static SQLCmd *make_sqlcmd(void);
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
 				create_replication_slot drop_replication_slot identify_system
-				timeline_history show sql_cmd
+				read_replication_slot timeline_history show sql_cmd
 %type <list>	base_backup_opt_list
 %type <defelt>	base_backup_opt
 %type <uintval>	opt_timeline
@@ -120,6 +121,7 @@ opt_semicolon:	';'
 
 command:
 			identify_system
+			| read_replication_slot
 			| base_backup
 			| start_replication
 			| start_logical_replication
@@ -140,6 +142,18 @@ identify_system:
 				}
 			;
 
+/*
+ * READ_REPLICATION_SLOT %s
+ */
+read_replication_slot:
+			K_READ_REPLICATION_SLOT var_name
+				{
+					ReadReplicationSlotCmd *n = makeNode(ReadReplicationSlotCmd);
+					n->slotname = $2;
+					$$ = (Node *) n;
+				}
+			;
+
 /*
  * SHOW setting
  */
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index c038a636c3..1b599c255e 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -85,6 +85,7 @@ identifier		{ident_start}{ident_cont}*
 BASE_BACKUP			{ return K_BASE_BACKUP; }
 FAST			{ return K_FAST; }
 IDENTIFY_SYSTEM		{ return K_IDENTIFY_SYSTEM; }
+READ_REPLICATION_SLOT	{ return K_READ_REPLICATION_SLOT; }
 SHOW		{ return K_SHOW; }
 LABEL			{ return K_LABEL; }
 NOWAIT			{ return K_NOWAIT; }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3ca2a11389..afcc5f6612 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -232,6 +232,7 @@ static void XLogSendLogical(void);
 static void WalSndDone(WalSndSendDataCallback send_data);
 static XLogRecPtr GetStandbyFlushRecPtr(void);
 static void IdentifySystem(void);
+static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd);
 static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
 static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
 static void StartReplication(StartReplicationCmd *cmd);
@@ -457,6 +458,110 @@ IdentifySystem(void)
 	end_tup_output(tstate);
 }
 
+/* Handle READ_REPLICATION_SLOT command */
+static void
+ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
+{
+#define READ_REPLICATION_SLOT_COLS 5
+	ReplicationSlot *slot;
+	DestReceiver *dest;
+	TupOutputState *tstate;
+	TupleDesc	tupdesc;
+	Datum		values[READ_REPLICATION_SLOT_COLS];
+	bool		nulls[READ_REPLICATION_SLOT_COLS];
+
+	tupdesc = CreateTemplateTupleDesc(READ_REPLICATION_SLOT_COLS);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "confirmed_flush_lsn",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "restart_tli",
+							  INT4OID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 5, "confirmed_flush_tli",
+							  INT4OID, -1, 0);
+
+	MemSet(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	slot = SearchNamedReplicationSlot(cmd->slotname, false);
+	if (slot == NULL || !slot->in_use)
+	{
+		LWLockRelease(ReplicationSlotControlLock);
+	}
+	else
+	{
+		List	   *timeline_history = NIL;
+		ReplicationSlot slot_contents;
+		int			i = 0;
+		char		xloc[MAXFNAMELEN];
+		TimeLineID	slots_position_timeline;
+
+		/* Copy slot contents while holding spinlock */
+		SpinLockAcquire(&slot->mutex);
+		slot_contents = *slot;
+		SpinLockRelease(&slot->mutex);
+		LWLockRelease(ReplicationSlotControlLock);
+
+		if (OidIsValid(slot_contents.data.database))
+			values[i] = CStringGetTextDatum("logical");
+		else
+			values[i] = CStringGetTextDatum("physical");
+		nulls[i] = false;
+		i++;
+
+		if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+		{
+			snprintf(xloc, sizeof(xloc), "%X/%X",
+					 LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
+			values[i] = CStringGetTextDatum(xloc);
+			nulls[i] = false;
+		}
+		i++;
+
+		if (!XLogRecPtrIsInvalid(slot_contents.data.confirmed_flush))
+		{
+			snprintf(xloc, sizeof(xloc), "%X/%X",
+					 LSN_FORMAT_ARGS(slot_contents.data.confirmed_flush));
+			values[i] = CStringGetTextDatum(xloc);
+			nulls[i] = false;
+		}
+		i++;
+
+		/*
+		 * Now get the timeline this wal was produced on, to get to the
+		 * current timeline
+		 */
+		if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+		{
+			timeline_history = readTimeLineHistory(ThisTimeLineID);
+			slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
+														  timeline_history);
+			values[i] = Int32GetDatum(slots_position_timeline);
+			nulls[i] = false;
+		}
+		i++;
+
+		if (!XLogRecPtrIsInvalid(slot_contents.data.confirmed_flush))
+		{
+			if (!timeline_history)
+				timeline_history = readTimeLineHistory(ThisTimeLineID);
+			slots_position_timeline = tliOfPointInHistory(slot_contents.data.confirmed_flush,
+														  timeline_history);
+			values[i] = Int32GetDatum(slots_position_timeline);
+			nulls[i] = false;
+		}
+		i++;
+		Assert(i == READ_REPLICATION_SLOT_COLS);
+	}
+
+	dest = CreateDestReceiver(DestRemoteSimple);
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+	do_tup_output(tstate, values, nulls);
+	end_tup_output(tstate);
+}
+
 
 /*
  * Handle TIMELINE_HISTORY command.
@@ -1618,6 +1723,13 @@ exec_replication_command(const char *cmd_string)
 			EndReplicationCommand(cmdtag);
 			break;
 
+		case T_ReadReplicationSlotCmd:
+			cmdtag = "READ_REPLICATION_SLOT";
+			set_ps_display(cmdtag);
+			ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node);
+			EndReplicationCommand(cmdtag);
+			break;
+
 		case T_BaseBackupCmd:
 			cmdtag = "BASE_BACKUP";
 			set_ps_display(cmdtag);
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index ac581c1c07..cbd817cda3 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -6,7 +6,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 49;
+use Test::More tests => 55;
 
 # Initialize primary node
 my $node_primary = PostgresNode->new('primary');
@@ -252,6 +252,51 @@ ok( $ret == 0,
 	"SHOW with superuser-settable parameter, replication role and logical replication"
 );
 
+note "testing READ_REPLICATION_SLOT command";
+
+my $slotname = 'test_read_replication_slot_physical';
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+	'postgres',
+	'READ_REPLICATION_SLOT non_existent_slot;',
+	extra_params => [ '-d', $connstr_rep ]);
+ok( $ret == 0,
+	"READ_REPLICATION_SLOT does not produce an error with non existent slot");
+ok($stdout eq '||||',
+	"READ_REPLICATION_SLOT returns NULL values if slot does not exist");
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+	'postgres',
+	"CREATE_REPLICATION_SLOT $slotname PHYSICAL RESERVE_WAL;",
+	extra_params => [ '-d', $connstr_rep ],
+	0,
+	'physical slot created on primary');
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+	'postgres',
+	"READ_REPLICATION_SLOT $slotname;",
+	extra_params => [ '-d', $connstr_rep ]);
+ok($ret == 0,
+	"READ_REPLICATION_SLOT does not produce an error with existing slot");
+ok($stdout =~ 'physical\|[^|]*\|\|1\|',
+	"READ_REPLICATION_SLOT returns tuple corresponding to the slot");
+
+$node_primary->psql(
+	'postgres',
+	"DROP_REPLICATION_SLOT $slotname;",
+	extra_params => [ '-d', $connstr_rep ],
+	0,
+	'physical slot dropped on primary');
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+	'postgres',
+	"READ_REPLICATION_SLOT $slotname;",
+	extra_params => [ '-d', $connstr_rep ]);
+ok($ret == 0,
+	"READ_REPLICATION_SLOT does not produce an error with dropped slot");
+ok($stdout eq '||||',
+	"READ_REPLICATION_SLOT returns NULL values if slot has been dropped");
+
 note "switching to physical replication slot";
 
 # Switch to using a physical replication slot. We can do this without a new
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index cc116062c2..50e4fa1042 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -10,7 +10,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 14;
+use Test::More tests => 16;
 use Config;
 
 # Initialize primary node
@@ -39,6 +39,19 @@ ok( $stderr =~
 	  m/replication slot "test_slot" was not created in this database/,
 	"Logical decoding correctly fails to start");
 
+($result, $stdout, $stderr) = $node_primary->psql(
+	'template1',
+	qq[READ_REPLICATION_SLOT test_slot;],
+	replication => 'database');
+ok($stdout =~ 'logical\|[^|]*\|[^|]*\|1\|1',
+	'Logical replication slot can be read on any logical connection');
+($result, $stdout, $stderr) = $node_primary->psql(
+	'postgres',
+	qq[READ_REPLICATION_SLOT test_slot;],
+	replication => '1');
+ok($stdout =~ 'logical\|[^|]*\|[^|]*\|1\|1',
+	'Logical replication slot can be read on a physical connection');
+
 # Check case of walsender not using a database connection. Logical
 # decoding should not be allowed.
 ($result, $stdout, $stderr) = $node_primary->psql(
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index a232546b1d..8191f17137 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2052,6 +2052,72 @@ The commands accepted in replication mode are:
     </listitem>
   </varlistentry>
 
+  <varlistentry>
+    <term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable>
+      <indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm>
+    </term>
+    <listitem>
+      <para>
+       Read the information of a replication slot. Returns a tuple with
+       <literal>NULL</literal> values if the replication slot does not
+       exist.
+      </para>
+      <para>
+       In response to this command, the server will return a one-row result set,
+       containing the following fields:
+       <variablelist>
+        <varlistentry>
+         <term><literal>slot_type</literal> (<type>text</type>)</term>
+         <listitem>
+          <para>
+           The replication slot's type, either <literal>physical</literal> or
+           <literal>logical</literal>.
+          </para>
+         </listitem>
+        </varlistentry>
+
+        <varlistentry>
+         <term><literal>restart_lsn</literal> (<type>text</type>)</term>
+         <listitem>
+          <para>
+           The replication slot's <literal>restart_lsn</literal>.
+          </para>
+         </listitem>
+        </varlistentry>
+
+        <varlistentry>
+         <term><literal>confirmed_flush_lsn</literal> (<type>text</type>)</term>
+         <listitem>
+          <para>
+           The replication slot's <literal>confirmed_flush_lsn</literal>.
+          </para>
+         </listitem>
+        </varlistentry>
+
+        <varlistentry>
+         <term><literal>restart_tli</literal> (<type>int4</type>)</term>
+         <listitem>
+          <para>
+           The timeline ID for the <literal>restart_lsn</literal> position,
+           when following the current timeline history.
+          </para>
+         </listitem>
+        </varlistentry>
+
+        <varlistentry>
+         <term><literal>confirmed_flush_tli</literal> (<type>int4</type>)</term>
+         <listitem>
+          <para>
+           The timeline ID for the <literal>confirmed_flush_lsn</literal>
+           position, when following the current timeline history.
+          </para>
+         </listitem>
+        </varlistentry>
+       </variablelist>
+      </para>
+    </listitem>
+  </varlistentry>
+
   <varlistentry>
     <term><literal>START_REPLICATION</literal> [ <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ] [ <literal>PHYSICAL</literal> ] <replaceable class="parameter">XXX/XXX</replaceable> [ <literal>TIMELINE</literal> <replaceable class="parameter">tli</replaceable> ]
      <indexterm><primary>START_REPLICATION</primary></indexterm>
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index f31a1e4e1e..b256ee3be6 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2126,6 +2126,7 @@ ReadBufferMode
 ReadBytePtrType
 ReadExtraTocPtrType
 ReadFunc
+ReadReplicationSlotCmd
 ReassignOwnedStmt
 RecheckForeignScan_function
 RecordCacheEntry
-- 
2.33.0
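
The command in the patch above returns restart_lsn and confirmed_flush_lsn as text built with the "%X/%X" format, so a client has to turn that text back into a 64-bit position before it can start streaming from it. The following is a minimal sketch of that step in plain C; parse_lsn() is a hypothetical helper, not part of the patch, and it assumes the text holds exactly two 32-bit hexadecimal halves separated by a slash.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Parse an LSN written as "hi/lo" in hexadecimal into a 64-bit value.
 * Returns false if the text is NULL, malformed, or has trailing characters.
 * Hypothetical helper for illustration only.
 */
bool
parse_lsn(const char *text, uint64_t *lsn)
{
	unsigned int hi;
	unsigned int lo;
	int			consumed = 0;

	assert(lsn != NULL);

	if (text == NULL)
		return false;

	/* %n records how many characters were read, to detect trailing junk */
	if (sscanf(text, "%X/%X%n", &hi, &lo, &consumed) != 2)
		return false;
	if (text[consumed] != '\0')
		return false;

	/* The high half holds the upper 32 bits, as in the backend's output */
	*lsn = ((uint64_t) hi << 32) | lo;
	return true;
}
```

A caller would pass the value of the restart_lsn column and treat a false result, or a NULL column for a slot that does not exist, as "no position available" rather than as position zero.
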