Hi,

On 4/6/23 4:20 PM, Drouvot, Bertrand wrote:
Hi,

On 4/6/23 3:39 PM, Amit Kapila wrote:
On Thu, Apr 6, 2023 at 6:32 PM Drouvot, Bertrand
<bertranddrouvot...@gmail.com> wrote:


I don't think it could be possible to create logical walsenders on a standby if
AllowCascadeReplication() is not true, or am I missing something?


Right, so why even traverse the walsenders in that case? The code I was
imagining is something like:
if (AllowCascadeReplication())
     WalSndWakeup(switchedTLI, true);

Do you see any problem with this change?

Not at all, it looks good to me.



Done in the attached V63; I also changed the associated comment a bit.


Few more minor comments on 0005
=============================
0005
1.
+       <para>
+        Take a snapshot of running transactions and write this to WAL without
+        having to wait bgwriter or checkpointer to log one.

/wait bgwriter/wait for bgwriter

2.
+use Test::More tests => 67;

We no longer specify the number of tests. Please refer to other similar tests.


Thanks! Will update 0005.


Done in V63.
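
For anyone following along, here is a rough sketch of the workflow that the 0006 doc change describes, assuming the v63 patches are applied. The slot name 'standby_slot' and the test_decoding plugin are only examples, and the conflicting column comes from this patch series:

```sql
-- On the standby: this call may block until an xl_running_xacts record
-- written after the slot's restart_lsn has been replayed.
SELECT * FROM pg_create_logical_replication_slot('standby_slot', 'test_decoding');

-- Meanwhile, on the primary (separate session): log a running-xacts
-- snapshot right away instead of waiting for bgwriter or checkpointer.
SELECT pg_log_standby_snapshot();

-- Back on the standby: check that the slot exists and is not conflicting.
SELECT slot_name, slot_type, conflicting
FROM pg_replication_slots
WHERE slot_name = 'standby_slot';
```

This is the same sequence that create_logical_slot_on_standby() in Cluster.pm automates for the TAP test.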

Regards,

--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From b5ff35d8ace1e429c6a15d53203b00304a3ff1f4 Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 7 Feb 2023 14:08:11 +0000
Subject: [PATCH v63 6/6] Doc changes describing details about logical
 decoding.

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes Mello
---
 doc/src/sgml/logicaldecoding.sgml | 27 +++++++++++++++++++++++++++
 1 file changed, 27 insertions(+)
 100.0% doc/src/sgml/

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 4e912b4bd4..8651024b8a 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -316,6 +316,33 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
      may consume changes from a slot at any given time.
     </para>
 
+    <para>
+     A logical replication slot can also be created on a hot standby. To prevent
+     <command>VACUUM</command> from removing required rows from the system
+     catalogs, <varname>hot_standby_feedback</varname> should be set on the
+     standby. In spite of that, if any required rows get removed, the slot gets
+     invalidated. It's highly recommended to use a physical slot between the primary
+     and the standby. Otherwise, <varname>hot_standby_feedback</varname> will work, but only while the
+     connection is alive (for example, a node restart would break it). Then, the
+     primary may delete system catalog rows that could be needed by the logical
+     decoding on the standby (as it does not know about the catalog_xmin on the
+     standby). Existing logical slots on standby also get invalidated if <varname>wal_level</varname>
+     on the primary is reduced to less than <literal>logical</literal>. This is done as soon as the
+     standby detects such a change in the WAL stream. This means that, for walsenders
+     that are lagging (if any), some WAL records up to the <varname>wal_level</varname> parameter change
+     on the primary won't be decoded.
+    </para>
+
+    <para>
+     For a logical slot to be created, it builds a historic snapshot, for which
+     information of all the currently running transactions is essential. On
+     primary, this information is available, but on standby, this information
+     has to be obtained from primary. So, slot creation may wait for some
+     activity to happen on the primary. If the primary is idle, creating a
+     logical slot on standby may take a noticeable time. One option to speed it
+     up is to call the <function>pg_log_standby_snapshot</function> function on the primary.
+    </para>
+
     <caution>
      <para>
       Replication slots persist across crashes and know nothing about the state
-- 
2.34.1

From ee35ea655de6d3178a1ec1b7b70345e2f4adccb5 Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 7 Feb 2023 09:04:12 +0000
Subject: [PATCH v63 5/6] New TAP test for logical decoding on standby.

In addition to the new TAP test, this commit introduces a new
pg_log_standby_snapshot() function.

The idea is to be able to take a snapshot of running transactions and write this
to WAL without requesting a (costly) checkpoint.

Author: Craig Ringer (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes Mello
---
 doc/src/sgml/func.sgml                        |  15 +
 src/backend/access/transam/xlogfuncs.c        |  32 +
 src/backend/catalog/system_functions.sql      |   2 +
 src/include/catalog/pg_proc.dat               |   3 +
 src/test/perl/PostgreSQL/Test/Cluster.pm      |  37 +
 src/test/recovery/meson.build                 |   1 +
 .../t/035_standby_logical_decoding.pl         | 706 ++++++++++++++++++
 7 files changed, 796 insertions(+)
   3.1% src/backend/
   4.0% src/test/perl/PostgreSQL/Test/
  89.6% src/test/recovery/t/

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index dc44a74eb2..9253cd1c18 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -27032,6 +27032,21 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         prepared with <xref linkend="sql-prepare-transaction"/>.
        </para></entry>
       </row>
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+         <primary>pg_log_standby_snapshot</primary>
+        </indexterm>
+        <function>pg_log_standby_snapshot</function> ()
+        <returnvalue>pg_lsn</returnvalue>
+       </para>
+       <para>
+        Take a snapshot of running transactions and write it to WAL, without
+        having to wait for bgwriter or checkpointer to log one. This is useful for
+        logical decoding on standby, as logical slot creation has to wait
+        until such a record is replayed on the standby.
+       </para></entry>
+      </row>
      </tbody>
     </tgroup>
    </table>
diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c
index c07daa874f..36a309b54c 100644
--- a/src/backend/access/transam/xlogfuncs.c
+++ b/src/backend/access/transam/xlogfuncs.c
@@ -31,6 +31,7 @@
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/smgr.h"
+#include "storage/standby.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
@@ -196,6 +197,37 @@ pg_switch_wal(PG_FUNCTION_ARGS)
        PG_RETURN_LSN(switchpoint);
 }
 
+/*
+ * pg_log_standby_snapshot: call LogStandbySnapshot()
+ *
+ * Permission checking for this function is managed through the normal
+ * GRANT system.
+ */
+Datum
+pg_log_standby_snapshot(PG_FUNCTION_ARGS)
+{
+       XLogRecPtr      recptr;
+
+       if (RecoveryInProgress())
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("recovery is in progress"),
+                                errhint("pg_log_standby_snapshot() cannot be executed during recovery.")));
+
+       if (!XLogStandbyInfoActive())
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("wal_level is not in desired state"),
+                                errhint("wal_level has to be >= WAL_LEVEL_REPLICA.")));
+
+       recptr = LogStandbySnapshot();
+
+       /*
+        * As a convenience, return the WAL location of the last inserted record
+        */
+       PG_RETURN_LSN(recptr);
+}
+
 /*
  * pg_create_restore_point: a named point for restore
  *
diff --git a/src/backend/catalog/system_functions.sql 
b/src/backend/catalog/system_functions.sql
index 83ca893444..b7c65ea37d 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -644,6 +644,8 @@ REVOKE EXECUTE ON FUNCTION pg_create_restore_point(text) FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_switch_wal() FROM public;
 
+REVOKE EXECUTE ON FUNCTION pg_log_standby_snapshot() FROM public;
+
 REVOKE EXECUTE ON FUNCTION pg_wal_replay_pause() FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_wal_replay_resume() FROM public;
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index bcbae9036d..284138727e 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -6426,6 +6426,9 @@
 { oid => '2848', descr => 'switch to new wal file',
   proname => 'pg_switch_wal', provolatile => 'v', prorettype => 'pg_lsn',
   proargtypes => '', prosrc => 'pg_switch_wal' },
+{ oid => '9658', descr => 'log details of the current snapshot to WAL',
+  proname => 'pg_log_standby_snapshot', provolatile => 'v', prorettype => 'pg_lsn',
+  proargtypes => '', prosrc => 'pg_log_standby_snapshot' },
 { oid => '3098', descr => 'create a named restore point',
   proname => 'pg_create_restore_point', provolatile => 'v',
   prorettype => 'pg_lsn', proargtypes => 'text',
diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm
index a3aef8b5e9..62376de602 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -3076,6 +3076,43 @@ $SIG{TERM} = $SIG{INT} = sub {
 
 =pod
 
+=item $node->create_logical_slot_on_standby(self, primary, slot_name, dbname)
+
+Create logical replication slot on given standby
+
+=cut
+
+sub create_logical_slot_on_standby
+{
+       my ($self, $primary, $slot_name, $dbname) = @_;
+       my ($stdout, $stderr);
+
+       my $handle;
+
+       $handle = IPC::Run::start(['pg_recvlogical', '-d', $self->connstr($dbname), '-P', 'test_decoding', '-S', $slot_name, '--create-slot'], '>', \$stdout, '2>', \$stderr);
+
+       # Once the slot's restart_lsn is determined, the standby looks for an
+       # xl_running_xacts WAL record from the restart_lsn onwards. So first, wait
+       # until the slot's restart_lsn is determined.
+
+       $self->poll_query_until(
+               'postgres', qq[
+               SELECT restart_lsn IS NOT NULL
+               FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name'
+       ]) or die "timed out waiting for logical slot to calculate its restart_lsn";
+
+       # Now arrange for the xl_running_xacts record for which pg_recvlogical
+       # is waiting.
+       $primary->safe_psql('postgres', 'SELECT pg_log_standby_snapshot()');
+
+       $handle->finish();
+
+       is($self->slot($slot_name)->{'slot_type'}, 'logical', $slot_name . ' on standby created')
+               or die "could not create slot " . $slot_name;
+}
+
+=pod
+
 =back
 
 =cut
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index 59465b97f3..e834ad5e0d 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -40,6 +40,7 @@ tests += {
       't/032_relfilenode_reuse.pl',
       't/033_replay_tsp_drops.pl',
       't/034_create_database.pl',
+      't/035_standby_logical_decoding.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
new file mode 100644
index 0000000000..5b6d19d379
--- /dev/null
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -0,0 +1,706 @@
+# logical decoding on standby : test logical decoding,
+# recovery conflict and standby promotion.
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my ($stdin, $stdout, $stderr, $cascading_stdout, $cascading_stderr, $ret, $handle, $slot);
+
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $node_cascading_standby = PostgreSQL::Test::Cluster->new('cascading_standby');
+my $default_timeout = $PostgreSQL::Test::Utils::timeout_default;
+my $res;
+
+# Name for the physical slot on primary
+my $primary_slotname = 'primary_physical';
+my $standby_physical_slotname = 'standby_physical';
+
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+       my ($node, $pat, $off) = @_;
+
+       $off = 0 unless defined $off;
+       my $log = PostgreSQL::Test::Utils::slurp_file($node->logfile);
+       return 0 if (length($log) <= $off);
+
+       $log = substr($log, $off);
+
+       return $log =~ m/$pat/;
+}
+
+# Fetch xmin columns from slot's pg_replication_slots row, after waiting for
+# given boolean condition to be true to ensure we've reached a quiescent state.
+sub wait_for_xmins
+{
+       my ($node, $slotname, $check_expr) = @_;
+
+       $node->poll_query_until(
+               'postgres', qq[
+               SELECT $check_expr
+               FROM pg_catalog.pg_replication_slots
+               WHERE slot_name = '$slotname';
+       ]) or die "Timed out waiting for slot xmins to advance";
+}
+
+# Create the required logical slots on standby.
+sub create_logical_slots
+{
+       my ($node) = @_;
+       $node->create_logical_slot_on_standby($node_primary, 'inactiveslot', 'testdb');
+       $node->create_logical_slot_on_standby($node_primary, 'activeslot', 'testdb');
+}
+
+# Drop the logical slots on standby.
+sub drop_logical_slots
+{
+       $node_standby->psql('postgres', q[SELECT pg_drop_replication_slot('inactiveslot')]);
+       $node_standby->psql('postgres', q[SELECT pg_drop_replication_slot('activeslot')]);
+}
+
+# Acquire one of the standby logical slots created by create_logical_slots().
+# In case wait is true we are waiting for an active pid on the 'activeslot' slot.
+# If wait is not true it means we are testing a known failure scenario.
+sub make_slot_active
+{
+       my ($node, $wait, $to_stdout, $to_stderr) = @_;
+       my $slot_user_handle;
+
+       $slot_user_handle = IPC::Run::start(['pg_recvlogical', '-d', $node->connstr('testdb'), '-S', 'activeslot', '-o', 'include-xids=0', '-o', 'skip-empty-xacts=1', '--no-loop', '--start', '-f', '-'], '>', $to_stdout, '2>', $to_stderr);
+
+       if ($wait)
+       {
+               # make sure activeslot is in use
+               $node->poll_query_until('testdb',
+                       "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'activeslot' AND active_pid IS NOT NULL)"
+               ) or die "slot never became active";
+       }
+       return $slot_user_handle;
+}
+
+# Check pg_recvlogical stderr
+sub check_pg_recvlogical_stderr
+{
+       my ($slot_user_handle, $check_stderr) = @_;
+       my $return;
+
+       # our client should've terminated in response to the walsender error
+       $slot_user_handle->finish;
+       $return = $?;
+       cmp_ok($return, "!=", 0, "pg_recvlogical exited non-zero");
+       if ($return) {
+               like($stderr, qr/$check_stderr/, 'slot has been invalidated');
+       }
+
+       return 0;
+}
+
+# Check if all the slots on standby are dropped. These include the 'activeslot'
+# that was acquired by make_slot_active(), and the non-active 'inactiveslot'.
+sub check_slots_dropped
+{
+       my ($slot_user_handle) = @_;
+
+       is($node_standby->slot('inactiveslot')->{'slot_type'}, '', 'inactiveslot on standby dropped');
+       is($node_standby->slot('activeslot')->{'slot_type'}, '', 'activeslot on standby dropped');
+
+       check_pg_recvlogical_stderr($slot_user_handle, "conflict with recovery");
+}
+
+# Change hot_standby_feedback and check xmin and catalog_xmin values.
+sub change_hot_standby_feedback_and_wait_for_xmins
+{
+       my ($hsf, $invalidated) = @_;
+
+       $node_standby->append_conf('postgresql.conf',qq[
+       hot_standby_feedback = $hsf
+       ]);
+
+       $node_standby->reload;
+
+       if ($hsf && $invalidated)
+       {
+               # With hot_standby_feedback on, xmin should advance,
+               # but catalog_xmin should still remain NULL since there is no logical slot.
+               wait_for_xmins($node_primary, $primary_slotname,
+                          "xmin IS NOT NULL AND catalog_xmin IS NULL");
+       }
+       elsif ($hsf)
+       {
+               # With hot_standby_feedback on, xmin and catalog_xmin should advance.
+               wait_for_xmins($node_primary, $primary_slotname,
+                          "xmin IS NOT NULL AND catalog_xmin IS NOT NULL");
+       }
+       else
+       {
+               # Both should be NULL since hs_feedback is off
+               wait_for_xmins($node_primary, $primary_slotname,
+                          "xmin IS NULL AND catalog_xmin IS NULL");
+
+       }
+}
+
+# Check conflicting status in pg_replication_slots.
+sub check_slots_conflicting_status
+{
+       my ($conflicting) = @_;
+
+       if ($conflicting)
+       {
+               $res = $node_standby->safe_psql(
+                               'postgres', qq(
+                                select bool_and(conflicting) from pg_replication_slots;));
+
+               is($res, 't',
+                       "Logical slots are reported as conflicting");
+       }
+       else
+       {
+               $res = $node_standby->safe_psql(
+                               'postgres', qq(
+                               select bool_or(conflicting) from pg_replication_slots;));
+
+               is($res, 'f',
+                       "Logical slots are reported as non conflicting");
+       }
+}
+
+########################
+# Initialize primary node
+########################
+
+$node_primary->init(allows_streaming => 1, has_archiving => 1);
+$node_primary->append_conf('postgresql.conf', q{
+wal_level = 'logical'
+max_replication_slots = 4
+max_wal_senders = 4
+log_min_messages = 'debug2'
+log_error_verbosity = verbose
+});
+$node_primary->dump_info;
+$node_primary->start;
+
+$node_primary->psql('postgres', q[CREATE DATABASE testdb]);
+
+$node_primary->safe_psql('testdb', qq[SELECT * FROM pg_create_physical_replication_slot('$primary_slotname');]);
+
+# Check conflicting is NULL for physical slot
+$res = $node_primary->safe_psql(
+               'postgres', qq[
+                SELECT conflicting is null FROM pg_replication_slots where slot_name = '$primary_slotname';]);
+
+is($res, 't',
+       "Physical slot reports conflicting as NULL");
+
+my $backup_name = 'b1';
+$node_primary->backup($backup_name);
+
+#######################
+# Initialize standby node
+#######################
+
+$node_standby->init_from_backup(
+       $node_primary, $backup_name,
+       has_streaming => 1,
+       has_restoring => 1);
+$node_standby->append_conf('postgresql.conf',
+       qq[primary_slot_name = '$primary_slotname']);
+$node_standby->start;
+$node_primary->wait_for_replay_catchup($node_standby);
+$node_standby->safe_psql('testdb', qq[SELECT * FROM pg_create_physical_replication_slot('$standby_physical_slotname');]);
+
+#######################
+# Initialize cascading standby node
+#######################
+$node_standby->backup($backup_name);
+$node_cascading_standby->init_from_backup(
+       $node_standby, $backup_name,
+       has_streaming => 1,
+       has_restoring => 1);
+$node_cascading_standby->append_conf('postgresql.conf',
+       qq[primary_slot_name = '$standby_physical_slotname']);
+$node_cascading_standby->start;
+$node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary);
+
+##################################################
+# Test that logical decoding on the standby
+# behaves correctly.
+##################################################
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+$node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y text);]);
+$node_primary->safe_psql('testdb', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+my $result = $node_standby->safe_psql('testdb',
+       qq[SELECT pg_logical_slot_get_changes('activeslot', NULL, NULL);]);
+
+# test if basic decoding works
+is(scalar(my @foobar = split /^/m, $result),
+       14, 'Decoding produced 14 rows (2 BEGIN/COMMIT pairs and 10 rows)');
+
+# Insert some rows and verify that we get the same results from pg_recvlogical
+# and the SQL interface.
+$node_primary->safe_psql('testdb',
+       qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]
+);
+
+my $expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT};
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+my $stdout_sql = $node_standby->safe_psql('testdb',
+       qq[SELECT data FROM pg_logical_slot_peek_changes('activeslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session');
+
+my $endpos = $node_standby->safe_psql('testdb',
+       "SELECT lsn FROM pg_logical_slot_peek_changes('activeslot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
+);
+
+# Insert some rows after $endpos, which we won't read.
+$node_primary->safe_psql('testdb',
+       qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,50) s;]
+);
+
+$node_primary->wait_for_catchup($node_standby);
+
+my $stdout_recv = $node_standby->pg_recvlogical_upto(
+    'testdb', 'activeslot', $endpos, $default_timeout,
+    'include-xids'     => '0',
+    'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, $expected,
+    'got same expected output from pg_recvlogical decoding session');
+
+$node_standby->poll_query_until('testdb',
+       "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'activeslot' AND active_pid IS NULL)"
+) or die "slot never became inactive";
+
+$stdout_recv = $node_standby->pg_recvlogical_upto(
+    'testdb', 'activeslot', $endpos, $default_timeout,
+    'include-xids'     => '0',
+    'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, '', 'pg_recvlogical acknowledged changes');
+
+$node_primary->safe_psql('postgres', 'CREATE DATABASE otherdb');
+
+is( $node_primary->psql(
+        'otherdb',
+        "SELECT lsn FROM pg_logical_slot_peek_changes('activeslot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
+    ),
+    3,
+    'replaying logical slot from another database fails');
+
+# drop the logical slots
+drop_logical_slots();
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 1: hot_standby_feedback off and vacuum FULL
+##################################################
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# One way to produce recovery conflict is to create/drop a relation and
+# launch a vacuum full on pg_class with hot_standby_feedback turned off on
+# the standby.
+change_hot_standby_feedback_and_wait_for_xmins(0,1);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y text);]);
+$node_primary->safe_psql('testdb', qq[DROP TABLE conflict_test;]);
+$node_primary->safe_psql('testdb', 'VACUUM full pg_class;');
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should be issued
+ok( find_in_log(
+   $node_standby,
+  "invalidating obsolete replication slot \"inactiveslot\""),
+  'inactiveslot slot invalidation is logged with vacuum FULL on pg_class');
+
+ok( find_in_log(
+   $node_standby,
+  "invalidating obsolete replication slot \"activeslot\""),
+  'activeslot slot invalidation is logged with vacuum FULL on pg_class');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been updated
+ok( $node_standby->poll_query_until(
+       'postgres',
+       "select (confl_active_logicalslot = 1) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+       'confl_active_logicalslot updated') or die "Timed out waiting for confl_active_logicalslot to be updated";
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"activeslot\"");
+
+# Turn hot_standby_feedback back on
+change_hot_standby_feedback_and_wait_for_xmins(1,1);
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 2: conflict due to row removal with hot_standby_feedback off.
+##################################################
+
+# get the position to search from in the standby logfile
+my $logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# One way to produce recovery conflict is to create/drop a relation and
+# launch a vacuum on pg_class with hot_standby_feedback turned off on the standby.
+change_hot_standby_feedback_and_wait_for_xmins(0,1);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y text);]);
+$node_primary->safe_psql('testdb', qq[DROP TABLE conflict_test;]);
+$node_primary->safe_psql('testdb', 'VACUUM pg_class;');
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should be issued
+ok( find_in_log(
+   $node_standby,
+  "invalidating obsolete replication slot \"inactiveslot\"", $logstart),
+  'inactiveslot slot invalidation is logged with vacuum on pg_class');
+
+ok( find_in_log(
+   $node_standby,
+  "invalidating obsolete replication slot \"activeslot\"", $logstart),
+  'activeslot slot invalidation is logged with vacuum on pg_class');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been updated
+# we now expect 2 conflicts reported as the counter persists across reloads
+ok( $node_standby->poll_query_until(
+       'postgres',
+       "select (confl_active_logicalslot = 2) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+       'confl_active_logicalslot updated') or die "Timed out waiting for confl_active_logicalslot to be updated";
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"activeslot\"");
+
+##################################################
+# Recovery conflict: Same as Scenario 2 but on a non catalog table
+# Scenario 3: No conflict expected.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# put hot standby feedback to off
+change_hot_standby_feedback_and_wait_for_xmins(0,1);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# This should not trigger a conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y text);]);
+$node_primary->safe_psql('testdb', qq[INSERT INTO conflict_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]);
+$node_primary->safe_psql('testdb', qq[UPDATE conflict_test set x=1, y=1;]);
+$node_primary->safe_psql('testdb', 'VACUUM conflict_test;');
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should not be issued
+ok( !find_in_log(
+   $node_standby,
+  "invalidating obsolete replication slot \"inactiveslot\"", $logstart),
+  'inactiveslot slot invalidation is not logged with vacuum on conflict_test');
+
+ok( !find_in_log(
+   $node_standby,
+  "invalidating obsolete replication slot \"activeslot\"", $logstart),
+  'activeslot slot invalidation is not logged with vacuum on conflict_test');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has not been updated
+# we still expect 2 conflicts reported as the counter persists across reloads
+ok( $node_standby->poll_query_until(
+       'postgres',
+       "select (confl_active_logicalslot = 2) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+       'confl_active_logicalslot not updated') or die "Timed out waiting for confl_active_logicalslot to be updated";
+
+# Verify slots are reported as non conflicting in pg_replication_slots
+check_slots_conflicting_status(0);
+
+# Turn hot_standby_feedback back on
+change_hot_standby_feedback_and_wait_for_xmins(1, 0);
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 4: conflict due to on-access pruning.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# One way to produce recovery conflict is to trigger an on-access pruning
+# on a relation marked as user_catalog_table.
+change_hot_standby_feedback_and_wait_for_xmins(0,0);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE prun(id integer, s char(2000)) WITH (fillfactor = 75, user_catalog_table = true);]);
+$node_primary->safe_psql('testdb', qq[INSERT INTO prun VALUES (1, 'A');]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'B';]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'C';]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'D';]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'E';]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should be issued
+ok( find_in_log(
+   $node_standby,
+  "invalidating obsolete replication slot \"inactiveslot\"", $logstart),
+  'inactiveslot slot invalidation is logged with on-access pruning');
+
+ok( find_in_log(
+   $node_standby,
+  "invalidating obsolete replication slot \"activeslot\"", $logstart),
+  'activeslot slot invalidation is logged with on-access pruning');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been updated
+# we now expect 3 conflicts reported as the counter persists across reloads
+ok( $node_standby->poll_query_until(
+       'postgres',
+       "select (confl_active_logicalslot = 3) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+       'confl_active_logicalslot updated') or die "Timed out waiting for confl_active_logicalslot to be updated";
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"activeslot\"");
+
+# Turn hot_standby_feedback back on
+change_hot_standby_feedback_and_wait_for_xmins(1, 1);
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 5: incorrect wal_level on primary.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+
+# Make primary wal_level replica. This will trigger slot conflict.
+$node_primary->append_conf('postgresql.conf',q[
+wal_level = 'replica'
+]);
+$node_primary->restart;
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should be issued
+ok( find_in_log(
+   $node_standby,
+  "invalidating obsolete replication slot \"inactiveslot\"", $logstart),
+  'inactiveslot slot invalidation is logged due to wal_level');
+
+ok( find_in_log(
+   $node_standby,
+  "invalidating obsolete replication slot \"activeslot\"", $logstart),
+  'activeslot slot invalidation is logged due to wal_level');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been updated
+# we now expect 4 conflicts reported as the counter persists across reloads
+ok( $node_standby->poll_query_until(
+       'postgres',
+       "select (confl_active_logicalslot = 4) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+       'confl_active_logicalslot updated') or die "Timed out waiting for confl_active_logicalslot to be updated";
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+# We are not able to read from the slot as it requires wal_level at least logical on the primary server
+check_pg_recvlogical_stderr($handle, "logical decoding on standby requires wal_level to be at least logical on the primary server");
+
+# Restore primary wal_level
+$node_primary->append_conf('postgresql.conf',q[
+wal_level = 'logical'
+]);
+$node_primary->restart;
+$node_primary->wait_for_replay_catchup($node_standby);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+# as the slot has been invalidated we should not be able to read
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"activeslot\"");
+
+##################################################
+# DROP DATABASE should drop its slots, including active slots.
+##################################################
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+# Create a slot on a database that would not be dropped. This slot should not
+# get dropped.
+$node_standby->create_logical_slot_on_standby($node_primary, 'otherslot', 'postgres');
+
+# dropdb on the primary to verify slots are dropped on standby
+$node_primary->safe_psql('postgres', q[DROP DATABASE testdb]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+is($node_standby->safe_psql('postgres',
+       q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 'f',
+       'database dropped on standby');
+
+check_slots_dropped($handle);
+
+is($node_standby->slot('otherslot')->{'slot_type'}, 'logical',
+       'otherslot on standby not dropped');
+
+# Cleanup : manually drop the slot that was not dropped.
+$node_standby->psql('postgres', q[SELECT pg_drop_replication_slot('otherslot')]);
+
+##################################################
+# Test standby promotion and logical decoding behavior
+# after the standby gets promoted.
+##################################################
+
+$node_standby->reload;
+
+$node_primary->psql('postgres', q[CREATE DATABASE testdb]);
+$node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y text);]);
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+# create the logical slots on the cascading standby too
+create_logical_slots($node_cascading_standby);
+
+# Make slots active
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+my $cascading_handle = make_slot_active($node_cascading_standby, 1, \$cascading_stdout, \$cascading_stderr);
+
+# Insert some rows before the promotion
+$node_primary->safe_psql('testdb',
+       qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]
+);
+
+# Wait for both standbys to catch up
+$node_primary->wait_for_replay_catchup($node_standby);
+$node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary);
+
+# promote
+$node_standby->promote;
+
+# insert some rows on promoted standby
+$node_standby->safe_psql('testdb',
+       qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,7) s;]
+);
+
+# Wait for the cascading standby to catch up
+$node_standby->wait_for_replay_catchup($node_cascading_standby);
+
+$expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT
+BEGIN
+table public.decoding_test: INSERT: x[integer]:5 y[text]:'5'
+table public.decoding_test: INSERT: x[integer]:6 y[text]:'6'
+table public.decoding_test: INSERT: x[integer]:7 y[text]:'7'
+COMMIT};
+
+# check that we are decoding pre and post promotion inserted rows
+$stdout_sql = $node_standby->safe_psql('testdb',
+       qq[SELECT data FROM pg_logical_slot_peek_changes('inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session on promoted standby');
+
+# check that we are decoding pre and post promotion inserted rows
+# with pg_recvlogical that has started before the promotion
+my $pump_timeout = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default);
+
+ok( pump_until(
+        $handle, $pump_timeout, \$stdout, qr/^.*COMMIT.*COMMIT$/s),
+    'got 2 COMMIT from pg_recvlogical output');
+
+chomp($stdout);
+is($stdout, $expected,
+    'got same expected output from pg_recvlogical decoding session');
+
+# check that we are decoding pre and post promotion inserted rows on the cascading standby
+$stdout_sql = $node_cascading_standby->safe_psql('testdb',
+       qq[SELECT data FROM pg_logical_slot_peek_changes('inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session on cascading standby');
+
+# check that we are decoding pre and post promotion inserted rows
+# with pg_recvlogical that has started before the promotion on the cascading standby
+ok( pump_until(
+        $cascading_handle, $pump_timeout, \$cascading_stdout, qr/^.*COMMIT.*COMMIT$/s),
+    'got 2 COMMIT from pg_recvlogical output');
+
+chomp($cascading_stdout);
+is($cascading_stdout, $expected,
+    'got same expected output from pg_recvlogical decoding session on cascading standby');
+
+done_testing();
-- 
2.34.1

From c8cc725b967a705f4626089f083d56190d5c5d44 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Mon, 3 Apr 2023 16:46:09 +0000
Subject: [PATCH v63 4/6] For cascading replication, wake up physical
 walsenders separately from logical walsenders.

Physical walsenders can't send data until it's been flushed; logical
walsenders can't decode and send data until it's been applied. On the
standby, the WAL is flushed first, which will only wake up physical
walsenders; and then applied, which will only wake up logical
walsenders.

Previously, all walsenders were awakened when the WAL was flushed. That
was fine for logical walsenders on the primary; but on the standby the
flushed WAL would not have been applied yet, so logical walsenders were
awakened too early.

Author: Bertrand Drouvot per idea from Jeff Davis and Amit Kapila.
Reviewed-By: Sawada Masahiko, Robert Haas.
---
 src/backend/access/transam/xlog.c           |  6 +--
 src/backend/access/transam/xlogarchive.c    |  2 +-
 src/backend/access/transam/xlogrecovery.c   | 30 +++++++++++---
 src/backend/replication/walreceiver.c       |  2 +-
 src/backend/replication/walsender.c         | 43 +++++++++++++++++----
 src/include/replication/walsender.h         | 22 +++++------
 src/include/replication/walsender_private.h |  3 ++
 7 files changed, 79 insertions(+), 29 deletions(-)
  35.6% src/backend/access/transam/
  47.8% src/backend/replication/
  16.5% src/include/replication/

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 68dfb0344c..caeffc5860 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -2645,7 +2645,7 @@ XLogFlush(XLogRecPtr record)
        END_CRIT_SECTION();
 
        /* wake up walsenders now that we've released heavily contended locks */
-       WalSndWakeupProcessRequests();
+       WalSndWakeupProcessRequests(true, !RecoveryInProgress());
 
        /*
         * If we still haven't flushed to the request point then we have a
@@ -2816,7 +2816,7 @@ XLogBackgroundFlush(void)
        END_CRIT_SECTION();
 
        /* wake up walsenders now that we've released heavily contended locks */
-       WalSndWakeupProcessRequests();
+       WalSndWakeupProcessRequests(true, !RecoveryInProgress());
 
        /*
         * Great, done. To take some work off the critical path, try to initialize
@@ -5773,7 +5773,7 @@ StartupXLOG(void)
         * If there were cascading standby servers connected to us, nudge any wal
         * sender processes to notice that we've been promoted.
         */
-       WalSndWakeup();
+       WalSndWakeup(true, true);
 
        /*
         * If this was a promotion, request an (online) checkpoint now. This isn't
diff --git a/src/backend/access/transam/xlogarchive.c b/src/backend/access/transam/xlogarchive.c
index a0f5aa24b5..f3fb92c8f9 100644
--- a/src/backend/access/transam/xlogarchive.c
+++ b/src/backend/access/transam/xlogarchive.c
@@ -421,7 +421,7 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname)
         * if we restored something other than a WAL segment, but it does no harm
         * either.
         */
-       WalSndWakeup();
+       WalSndWakeup(true, false);
 }
 
 /*
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index dbe9394762..e6427c54c5 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -1935,6 +1935,30 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
        XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
        SpinLockRelease(&XLogRecoveryCtl->info_lck);
 
+       /*
+        * Wakeup walsenders:
+        *
+        * On the standby, the WAL is flushed first (which will only wake up
+        * physical walsenders) and then applied, which will only wake up logical
+        * walsenders.
+        *
+        * Indeed, logical walsenders on standby can't decode and send data until
+        * it's been applied.
+        *
+        * Physical walsenders don't need to be woken up during replay unless
+        * cascading replication is allowed and time line change occurred (so that
+        * they can notice that they are on a new time line).
+        *
+        * That's why the wake up conditions are for:
+        *
+        *  - physical walsenders in case of new time line and cascade
+        *  replication is allowed.
+        *  - logical walsenders in case cascade replication is allowed (could not
+        *  be created otherwise).
+        */
+       if (AllowCascadeReplication())
+               WalSndWakeup(switchedTLI, true);
+
        /*
         * If rm_redo called XLogRequestWalReceiverReply, then we wake up the
         * receiver so that it notices the updated lastReplayedEndRecPtr and sends
@@ -1958,12 +1982,6 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
                 */
                RemoveNonParentXlogFiles(xlogreader->EndRecPtr, *replayTLI);
 
-               /*
-                * Wake up any walsenders to notice that we are on a new timeline.
-                */
-               if (AllowCascadeReplication())
-                       WalSndWakeup();
-
                /* Reset the prefetcher. */
                XLogPrefetchReconfigure();
        }
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 685af51d5d..feff709435 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
                /* Signal the startup process and walsender that new WAL has arrived */
                WakeupRecovery();
                if (AllowCascadeReplication())
-                       WalSndWakeup();
+                       WalSndWakeup(true, false);
 
                /* Report XLOG streaming progress in PS display */
                if (update_process_title)
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 2d908d1de2..97990e1827 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2628,6 +2628,23 @@ InitWalSenderSlot(void)
                        walsnd->sync_standby_priority = 0;
                        walsnd->latch = &MyProc->procLatch;
                        walsnd->replyTime = 0;
+
+                       /*
+                        * The kind assignment is done here and not in StartReplication()
+                        * and StartLogicalReplication(). Indeed, the logical walsender
+                        * needs to read WAL records (like snapshot of running
+                        * transactions) during the slot creation. So it needs to be woken
+                        * up based on its kind.
+                        *
+                        * The kind assignment could also be done in StartReplication(),
+                        * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
+                        * seems better to set it in one place.
+                        */
+                       if (MyDatabaseId == InvalidOid)
+                               walsnd->kind = REPLICATION_KIND_PHYSICAL;
+                       else
+                               walsnd->kind = REPLICATION_KIND_LOGICAL;
+
                        SpinLockRelease(&walsnd->mutex);
                        /* don't need the lock anymore */
                        MyWalSnd = (WalSnd *) walsnd;
@@ -3310,30 +3327,42 @@ WalSndShmemInit(void)
 }
 
 /*
- * Wake up all walsenders
+ * Wake up physical, logical or both kinds of walsenders
+ *
+ * The distinction between physical and logical walsenders is made because:
+ * - physical walsenders can't send data until it's been flushed
+ * - logical walsenders on standby can't decode and send data until it's been
+ * applied
+ *
+ * For cascading replication we need to wake up physical
+ * walsenders separately from logical walsenders (see the comment before calling
+ * WalSndWakeup() in ApplyWalRecord() for more details).
  *
  * This will be called inside critical sections, so throwing an error is not
  * advisable.
  */
 void
-WalSndWakeup(void)
+WalSndWakeup(bool physical, bool logical)
 {
        int                     i;
 
        for (i = 0; i < max_wal_senders; i++)
        {
                Latch      *latch;
+               ReplicationKind kind;
                WalSnd     *walsnd = &WalSndCtl->walsnds[i];
 
-               /*
-                * Get latch pointer with spinlock held, for the unlikely case that
-                * pointer reads aren't atomic (as they're 8 bytes).
-                */
+               /* get latch pointer and kind with spinlock held */
                SpinLockAcquire(&walsnd->mutex);
                latch = walsnd->latch;
+               kind = walsnd->kind;
                SpinLockRelease(&walsnd->mutex);
 
-               if (latch != NULL)
+               if (latch == NULL)
+                       continue;
+
+               if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
+                       (logical && kind == REPLICATION_KIND_LOGICAL))
                        SetLatch(latch);
        }
 }
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 52bb3e2aae..9df7e50f94 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -42,7 +42,7 @@ extern void WalSndResourceCleanup(bool isCommit);
 extern void WalSndSignals(void);
 extern Size WalSndShmemSize(void);
 extern void WalSndShmemInit(void);
-extern void WalSndWakeup(void);
+extern void WalSndWakeup(bool physical, bool logical);
 extern void WalSndInitStopping(void);
 extern void WalSndWaitStopping(void);
 extern void HandleWalSndInitStopping(void);
@@ -60,15 +60,15 @@ extern void WalSndRqstFileReload(void);
 /*
  * wakeup walsenders if there is work to be done
  */
-#define WalSndWakeupProcessRequests()          \
-       do                                      \
-       {                                       \
-               if (wake_wal_senders)                           \
-               {                               \
-                       wake_wal_senders = false;               \
-                       if (max_wal_senders > 0)                \
-                               WalSndWakeup();                         \
-               }                               \
-       } while (0)
+static inline void
+WalSndWakeupProcessRequests(bool physical, bool logical)
+{
+       if (wake_wal_senders)
+       {
+               wake_wal_senders = false;
+               if (max_wal_senders > 0)
+                       WalSndWakeup(physical, logical);
+       }
+}
 
 #endif                                                 /* _WALSENDER_H */
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 5310e054c4..ff25aa70a8 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -15,6 +15,7 @@
 #include "access/xlog.h"
 #include "lib/ilist.h"
 #include "nodes/nodes.h"
+#include "nodes/replnodes.h"
 #include "replication/syncrep.h"
 #include "storage/latch.h"
 #include "storage/shmem.h"
@@ -79,6 +80,8 @@ typedef struct WalSnd
         * Timestamp of the last message received from standby.
         */
        TimestampTz replyTime;
+
+       ReplicationKind kind;
 } WalSnd;
 
 extern PGDLLIMPORT WalSnd *MyWalSnd;
-- 
2.34.1
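
For readers skimming 4/6, the selection rule in the patched WalSndWakeup() can be tried out in isolation. The following is only a sketch under stated assumptions: Sender, Kind and wake_senders() are hypothetical stand-ins invented for illustration (they are not PostgreSQL symbols), the in_use flag stands in for "latch != NULL", and a counter replaces the real SetLatch() call. No spinlock or shared memory is modelled.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for ReplicationKind; not the PostgreSQL enum. */
typedef enum
{
	KIND_PHYSICAL,
	KIND_LOGICAL
} Kind;

/* Hypothetical stand-in for a WalSnd entry. */
typedef struct
{
	bool		in_use;		/* models "latch != NULL" */
	Kind		kind;		/* models walsnd->kind */
	int			wakeups;	/* counts where SetLatch() would be called */
} Sender;

/*
 * Same filter as the patched WalSndWakeup(): skip unused entries, then
 * wake an entry only if its kind was requested by the caller.
 */
void
wake_senders(Sender *senders, size_t n, bool physical, bool logical)
{
	assert(senders != NULL);

	for (size_t i = 0; i < n; i++)
	{
		if (!senders[i].in_use)
			continue;

		if ((physical && senders[i].kind == KIND_PHYSICAL) ||
			(logical && senders[i].kind == KIND_LOGICAL))
			senders[i].wakeups++;
	}
}
```

With this model, a standby flush corresponds to wake_senders(s, n, true, false), replay of a record without a timeline switch to wake_senders(s, n, false, true), and the promotion path in StartupXLOG() to wake_senders(s, n, true, true).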

From 3ea6d1da37e4c48eee518a0ff1e72225df6a0bed Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Mon, 3 Apr 2023 12:45:20 +0000
Subject: [PATCH v63 3/6] Allow logical decoding on standby.

Allow a logical slot to be created on standby. Restrict its usage
or its creation if wal_level on primary is less than logical.
During slot creation, its restart_lsn is set to the last replayed
LSN. Effectively, a logical slot creation on standby waits for an
xl_running_xacts record to arrive from primary.
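
As a rough illustration of the restart_lsn choice described above (and made in ReplicationSlotReserveWal() by this patch), here is a minimal sketch. The names Lsn, WalPositions, choose_restart_lsn() and must_log_standby_snapshot() are hypothetical and exist only for this example; the three fields merely stand in for the values that GetRedoRecPtr(), GetXLogReplayRecPtr(NULL) and GetXLogInsertRecPtr() would return.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for XLogRecPtr. */
typedef uint64_t Lsn;

/* Hypothetical snapshot of the three candidate positions. */
typedef struct
{
	Lsn			redo_ptr;	/* stands in for GetRedoRecPtr() */
	Lsn			replay_ptr;	/* stands in for GetXLogReplayRecPtr(NULL) */
	Lsn			insert_ptr;	/* stands in for GetXLogInsertRecPtr() */
} WalPositions;

/*
 * Physical slots start at the last redo LSN; logical slots start at the
 * replay pointer on a standby and at the insert pointer on a primary.
 */
Lsn
choose_restart_lsn(bool slot_is_physical, bool in_recovery,
				   const WalPositions *pos)
{
	assert(pos != NULL);

	if (slot_is_physical)
		return pos->redo_ptr;
	if (in_recovery)
		return pos->replay_ptr;
	return pos->insert_ptr;
}

/*
 * Only a logical slot on a primary logs (and flushes) a standby snapshot
 * itself; a standby cannot write WAL and has to wait for the primary.
 */
bool
must_log_standby_snapshot(bool slot_is_physical, bool in_recovery)
{
	return !in_recovery && !slot_is_physical;
}
```

The second helper mirrors the block that the patch moves after the retry loop: on a standby it returns false, which is why slot creation there has to wait for a running-transactions record to be replayed.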

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes Mello
---
 src/backend/access/transam/xlog.c         | 11 +++++
 src/backend/replication/logical/decode.c  | 30 +++++++++++-
 src/backend/replication/logical/logical.c | 36 +++++++-------
 src/backend/replication/slot.c            | 58 ++++++++++++-----------
 src/backend/replication/walsender.c       | 48 ++++++++++++-------
 src/include/access/xlog.h                 |  1 +
 6 files changed, 123 insertions(+), 61 deletions(-)
   4.3% src/backend/access/transam/
  38.9% src/backend/replication/logical/
  55.9% src/backend/replication/

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 713b61a9bf..68dfb0344c 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4466,6 +4466,17 @@ LocalProcessControlFile(bool reset)
        ReadControlFile();
 }
 
+/*
+ * Get the wal_level from the control file. For a standby, this value should be
+ * considered as its active wal_level, because it may be different from what
+ * was originally configured on standby.
+ */
+WalLevel
+GetActiveWalLevelOnStandby(void)
+{
+       return ControlFile->wal_level;
+}
+
 /*
  * Initialization of shared memory for XLOG
  */
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 8fe7bb65f1..8352dbf5df 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -152,11 +152,39 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
                         * can restart from there.
                         */
                        break;
+               case XLOG_PARAMETER_CHANGE:
+                       {
+                               xl_parameter_change *xlrec =
+                               (xl_parameter_change *) XLogRecGetData(buf->record);
+
+                               /*
+                                * If wal_level on primary is reduced to less than logical,
+                                * then we want to prevent existing logical slots from being
+                                * used. Existing logical slots on standby get invalidated
+                                * when this WAL record is replayed; and further, slot
+                                * creation fails when the wal level is not sufficient; but
+                                * all these operations are not synchronized, so a logical
+                                * slot may creep in while the wal_level is being reduced.
+                                * Hence this extra check.
+                                */
+                               if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
+                               {
+                                       /*
+                                        * This can occur only on a standby, as a primary would
+                                        * refuse to restart with wal_level below logical if a
+                                        * logical slot already exists.
+                                        */
+                                       Assert(RecoveryInProgress());
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                                        errmsg("logical decoding on standby requires wal_level to be at least logical on the primary server")));
+                               }
+                               break;
+                       }
                case XLOG_NOOP:
                case XLOG_NEXTOID:
                case XLOG_SWITCH:
                case XLOG_BACKUP_END:
-               case XLOG_PARAMETER_CHANGE:
                case XLOG_RESTORE_POINT:
                case XLOG_FPW_CHANGE:
                case XLOG_FPI_FOR_HINT:
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index c3ec97a0a6..60a5008b6d 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -124,23 +124,21 @@ CheckLogicalDecodingRequirements(void)
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                 errmsg("logical decoding requires a database connection")));
 
-       /* ----
-        * TODO: We got to change that someday soon...
-        *
-        * There's basically three things missing to allow this:
-        * 1) We need to be able to correctly and quickly identify the timeline 
a
-        *        LSN belongs to
-        * 2) We need to force hot_standby_feedback to be enabled at all times 
so
-        *        the primary cannot remove rows we need.
-        * 3) support dropping replication slots referring to a database, in
-        *        dbase_redo. There can't be any active ones due to HS recovery
-        *        conflicts, so that should be relatively easy.
-        * ----
-        */
        if (RecoveryInProgress())
-               ereport(ERROR,
-                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                errmsg("logical decoding cannot be used while in recovery")));
+       {
+               /*
+                * This check may have race conditions, but whenever
+                * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we
+                * verify that there are no existing logical replication slots. And to
+                * avoid races around creating a new slot,
+                * CheckLogicalDecodingRequirements() is called once before creating
+                * the slot, and once when logical decoding is initially starting up.
+                */
+               if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                        errmsg("logical decoding on standby requires wal_level to be at least logical on the primary server")));
+       }
 }
 
 /*
@@ -342,6 +340,12 @@ CreateInitDecodingContext(const char *plugin,
        LogicalDecodingContext *ctx;
        MemoryContext old_context;
 
+       /*
+        * On standby, this check is also required while creating the slot. Check
+        * the comments in this function.
+        */
+       CheckLogicalDecodingRequirements();
+
        /* shorter lines... */
        slot = MyReplicationSlot;
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 376d453374..b9b16b9191 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -41,6 +41,7 @@
 
 #include "access/transam.h"
 #include "access/xlog_internal.h"
+#include "access/xlogrecovery.h"
 #include "common/file_utils.h"
 #include "common/string.h"
 #include "miscadmin.h"
@@ -1183,37 +1184,28 @@ ReplicationSlotReserveWal(void)
                /*
                 * For logical slots log a standby snapshot and start logical decoding
                 * at exactly that position. That allows the slot to start up more
-                * quickly.
+                * quickly. But on a standby we cannot do WAL writes, so just use the
+                * replay pointer; effectively, an attempt to create a logical slot on
+                * standby will cause it to wait for an xl_running_xacts record to be
+                * logged independently on the primary, so that a snapshot can be
+                * built using the record.
                 *
-                * That's not needed (or indeed helpful) for physical slots as they'll
-                * start replay at the last logged checkpoint anyway. Instead return
-                * the location of the last redo LSN. While that slightly increases
-                * the chance that we have to retry, it's where a base backup has to
-                * start replay at.
+                * None of this is needed (or indeed helpful) for physical slots as
+                * they'll start replay at the last logged checkpoint anyway. Instead
+                * return the location of the last redo LSN. While that slightly
+                * increases the chance that we have to retry, it's where a base
+                * backup has to start replay at.
                 */
-               if (!RecoveryInProgress() && SlotIsLogical(slot))
-               {
-                       XLogRecPtr      flushptr;
-
-                       /* start at current insert position */
+               if (SlotIsPhysical(slot))
+                       restart_lsn = GetRedoRecPtr();
+               else if (RecoveryInProgress())
+                       restart_lsn = GetXLogReplayRecPtr(NULL);
+               else
                        restart_lsn = GetXLogInsertRecPtr();
-                       SpinLockAcquire(&slot->mutex);
-                       slot->data.restart_lsn = restart_lsn;
-                       SpinLockRelease(&slot->mutex);
-
-                       /* make sure we have enough information to start */
-                       flushptr = LogStandbySnapshot();
 
-                       /* and make sure it's fsynced to disk */
-                       XLogFlush(flushptr);
-               }
-               else
-               {
-                       restart_lsn = GetRedoRecPtr();
-                       SpinLockAcquire(&slot->mutex);
-                       slot->data.restart_lsn = restart_lsn;
-                       SpinLockRelease(&slot->mutex);
-               }
+               SpinLockAcquire(&slot->mutex);
+               slot->data.restart_lsn = restart_lsn;
+               SpinLockRelease(&slot->mutex);
 
                /* prevent WAL removal as fast as possible */
                ReplicationSlotsComputeRequiredLSN();
@@ -1229,8 +1221,18 @@ ReplicationSlotReserveWal(void)
                if (XLogGetLastRemovedSegno() < segno)
                        break;
        }
-}
 
+       if (!RecoveryInProgress() && SlotIsLogical(slot))
+       {
+               XLogRecPtr      flushptr;
+
+               /* make sure we have enough information to start */
+               flushptr = LogStandbySnapshot();
+
+               /* and make sure it's fsynced to disk */
+               XLogFlush(flushptr);
+       }
+}
 
 /*
  * Report terminating or conflicting message.
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index b686691ca2..2d908d1de2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -906,23 +906,34 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
        int                     count;
        WALReadError errinfo;
        XLogSegNo       segno;
-       TimeLineID      currTLI = GetWALInsertionTimeLine();
+       TimeLineID      currTLI;
+
+       /*
+        * Make sure we have enough WAL available before retrieving the current
+        * timeline. This is needed to determine am_cascading_walsender accurately
+        * which is needed to determine the current timeline.
+        */
+       flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
 
        /*
-        * Since logical decoding is only permitted on a primary server, we know
-        * that the current timeline ID can't be changing any more. If we did this
-        * on a standby, we'd have to worry about the values we compute here
-        * becoming invalid due to a promotion or timeline change.
+        * Since logical decoding is also permitted on a standby server, we need
+        * to check if the server is in recovery to decide how to get the current
+        * timeline ID (so that it also covers the promotion or timeline change
+        * cases).
         */
+       am_cascading_walsender = RecoveryInProgress();
+
+       if (am_cascading_walsender)
+               GetXLogReplayRecPtr(&currTLI);
+       else
+               currTLI = GetWALInsertionTimeLine();
+
        XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
        sendTimeLineIsHistoric = (state->currTLI != currTLI);
        sendTimeLine = state->currTLI;
        sendTimeLineValidUpto = state->currTLIValidUntil;
        sendTimeLineNextTLI = state->nextTLI;
 
-       /* make sure we have enough WAL available */
-       flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
-
        /* fail if not (implies we are going to shut down) */
        if (flushptr < targetPagePtr + reqLen)
                return -1;
@@ -937,9 +948,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
                                 cur_page,
                                 targetPagePtr,
                                 XLOG_BLCKSZ,
-                                state->seg.ws_tli, /* Pass the current TLI because only
-                                                                        * WalSndSegmentOpen controls whether new
-                                                                        * TLI is needed. */
+                                currTLI,               /* Pass the current TLI because only
+                                                                * WalSndSegmentOpen controls whether new TLI
+                                                                * is needed. */
                                 &errinfo))
                WALReadRaiseError(&errinfo);
 
@@ -3073,10 +3084,14 @@ XLogSendLogical(void)
         * If first time through in this session, initialize flushPtr.  Otherwise,
         * we only need to update flushPtr if EndRecPtr is past it.
         */
-       if (flushPtr == InvalidXLogRecPtr)
-               flushPtr = GetFlushRecPtr(NULL);
-       else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
-               flushPtr = GetFlushRecPtr(NULL);
+       if (flushPtr == InvalidXLogRecPtr ||
+               logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
+       {
+               if (am_cascading_walsender)
+                       flushPtr = GetStandbyFlushRecPtr(NULL);
+               else
+                       flushPtr = GetFlushRecPtr(NULL);
+       }
 
        /* If EndRecPtr is still past our flushPtr, it means we caught up. */
        if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
@@ -3167,7 +3182,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli)
        receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
        replayPtr = GetXLogReplayRecPtr(&replayTLI);
 
-       *tli = replayTLI;
+       if (tli)
+               *tli = replayTLI;
 
        result = replayPtr;
        if (receiveTLI == replayTLI && receivePtr > replayPtr)
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index cfe5409738..48ca852381 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -230,6 +230,7 @@ extern void XLOGShmemInit(void);
 extern void BootStrapXLOG(void);
 extern void InitializeWalConsistencyChecking(void);
 extern void LocalProcessControlFile(bool reset);
+extern WalLevel GetActiveWalLevelOnStandby(void);
 extern void StartupXLOG(void);
 extern void ShutdownXLOG(int code, Datum arg);
 extern void CreateCheckPoint(int flags);
-- 
2.34.1

From 79e89eebbe7d69bb5c18831e6d05b3ce391eec42 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Mon, 3 Apr 2023 11:28:30 +0000
Subject: [PATCH v63 2/6] Arrange for a new pg_stat_database_conflicts and
 pg_replication_slots field

Now that the previous commit handles logical slot conflicts on standby,
expose those conflicts in pg_stat_database_conflicts and pg_replication_slots.

To do so, add:

- confl_active_logicalslot in pg_stat_database_conflicts
- conflicting in pg_replication_slots
---
 doc/src/sgml/monitoring.sgml                 | 11 +++++++++++
 doc/src/sgml/system-views.sgml               | 10 ++++++++++
 src/backend/catalog/system_views.sql         |  6 ++++--
 src/backend/replication/slotfuncs.c          | 12 +++++++++++-
 src/backend/utils/activity/pgstat_database.c |  4 ++++
 src/backend/utils/adt/pgstatfuncs.c          |  3 +++
 src/include/catalog/pg_proc.dat              | 11 ++++++++---
 src/include/pgstat.h                         |  1 +
 src/test/regress/expected/rules.out          |  8 +++++---
 9 files changed, 57 insertions(+), 9 deletions(-)
  33.7% doc/src/sgml/
   8.1% src/backend/catalog/
  13.1% src/backend/replication/
   5.9% src/backend/utils/activity/
   5.6% src/backend/utils/adt/
  24.6% src/include/catalog/
   6.9% src/test/regress/expected/

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index bce9ae4661..fa3b0f810c 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -4674,6 +4674,17 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
        deadlocks
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_active_logicalslot</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of active logical slots in this database that have been
+       invalidated because they conflict with recovery (note that inactive ones
+       are also invalidated but do not increment this counter)
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index bb1a418450..57b228076e 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2517,6 +2517,16 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
        false for physical slots.
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>conflicting</structfield> <type>bool</type>
+      </para>
+      <para>
+       True if this logical slot conflicted with recovery (and so is now
+       invalidated). Always NULL for physical slots.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 6b098234f8..c25067d06d 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -999,7 +999,8 @@ CREATE VIEW pg_replication_slots AS
             L.confirmed_flush_lsn,
             L.wal_status,
             L.safe_wal_size,
-            L.two_phase
+            L.two_phase,
+            L.conflicting
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
@@ -1067,7 +1068,8 @@ CREATE VIEW pg_stat_database_conflicts AS
             pg_stat_get_db_conflict_lock(D.oid) AS confl_lock,
             pg_stat_get_db_conflict_snapshot(D.oid) AS confl_snapshot,
             pg_stat_get_db_conflict_bufferpin(D.oid) AS confl_bufferpin,
-            pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock
+            pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock,
+            pg_stat_get_db_conflict_logicalslot(D.oid) AS confl_active_logicalslot
     FROM pg_database D;
 
 CREATE VIEW pg_stat_user_functions AS
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 015d276fd9..6473c73eca 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -232,7 +232,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 14
+#define PG_GET_REPLICATION_SLOTS_COLS 15
        ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
        XLogRecPtr      currlsn;
        int                     slotno;
@@ -403,6 +403,16 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
                values[i++] = BoolGetDatum(slot_contents.data.two_phase);
 
+               if (slot_contents.data.database == InvalidOid)
+                       nulls[i++] = true;
+               else
+               {
+                       if (LogicalReplicationSlotIsInvalid(slot))
+                               values[i++] = BoolGetDatum(true);
+                       else
+                               values[i++] = BoolGetDatum(false);
+               }
+
                Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
 
                tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
diff --git a/src/backend/utils/activity/pgstat_database.c b/src/backend/utils/activity/pgstat_database.c
index 6e650ceaad..7149f22f72 100644
--- a/src/backend/utils/activity/pgstat_database.c
+++ b/src/backend/utils/activity/pgstat_database.c
@@ -109,6 +109,9 @@ pgstat_report_recovery_conflict(int reason)
                case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN:
                        dbentry->conflict_bufferpin++;
                        break;
+               case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+                       dbentry->conflict_logicalslot++;
+                       break;
                case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
                        dbentry->conflict_startup_deadlock++;
                        break;
@@ -387,6 +390,7 @@ pgstat_database_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
        PGSTAT_ACCUM_DBCOUNT(conflict_tablespace);
        PGSTAT_ACCUM_DBCOUNT(conflict_lock);
        PGSTAT_ACCUM_DBCOUNT(conflict_snapshot);
+       PGSTAT_ACCUM_DBCOUNT(conflict_logicalslot);
        PGSTAT_ACCUM_DBCOUNT(conflict_bufferpin);
        PGSTAT_ACCUM_DBCOUNT(conflict_startup_deadlock);
 
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index eec9f3cf9b..4de60d8aa1 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1066,6 +1066,8 @@ PG_STAT_GET_DBENTRY_INT64(xact_commit)
 /* pg_stat_get_db_xact_rollback */
 PG_STAT_GET_DBENTRY_INT64(xact_rollback)
 
+/* pg_stat_get_db_conflict_logicalslot */
+PG_STAT_GET_DBENTRY_INT64(conflict_logicalslot)
 
 Datum
 pg_stat_get_db_stat_reset_time(PG_FUNCTION_ARGS)
@@ -1099,6 +1101,7 @@ pg_stat_get_db_conflict_all(PG_FUNCTION_ARGS)
                result = (int64) (dbentry->conflict_tablespace +
                                                  dbentry->conflict_lock +
                                                  dbentry->conflict_snapshot +
+                                                 dbentry->conflict_logicalslot +
                                                  dbentry->conflict_bufferpin +
                                                  dbentry->conflict_startup_deadlock);
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index f9f2642201..bcbae9036d 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5605,6 +5605,11 @@
   proname => 'pg_stat_get_db_conflict_snapshot', provolatile => 's',
   proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
   prosrc => 'pg_stat_get_db_conflict_snapshot' },
+{ oid => '9901',
+  descr => 'statistics: recovery conflicts in database caused by logical replication slot',
+  proname => 'pg_stat_get_db_conflict_logicalslot', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
+  prosrc => 'pg_stat_get_db_conflict_logicalslot' },
 { oid => '3068',
   descr => 'statistics: recovery conflicts in database caused by shared buffer pin',
   proname => 'pg_stat_get_db_conflict_bufferpin', provolatile => 's',
@@ -11071,9 +11076,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index e79b8a34eb..5e8b04d21b 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -331,6 +331,7 @@ typedef struct PgStat_StatDBEntry
        PgStat_Counter conflict_tablespace;
        PgStat_Counter conflict_lock;
        PgStat_Counter conflict_snapshot;
+       PgStat_Counter conflict_logicalslot;
        PgStat_Counter conflict_bufferpin;
        PgStat_Counter conflict_startup_deadlock;
        PgStat_Counter temp_files;
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index ab1aebfde4..06d3f1f5d3 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1472,8 +1472,9 @@ pg_replication_slots| SELECT l.slot_name,
     l.confirmed_flush_lsn,
     l.wal_status,
     l.safe_wal_size,
-    l.two_phase
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase)
+    l.two_phase,
+    l.conflicting
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
@@ -1869,7 +1870,8 @@ pg_stat_database_conflicts| SELECT oid AS datid,
     pg_stat_get_db_conflict_lock(oid) AS confl_lock,
     pg_stat_get_db_conflict_snapshot(oid) AS confl_snapshot,
     pg_stat_get_db_conflict_bufferpin(oid) AS confl_bufferpin,
-    pg_stat_get_db_conflict_startup_deadlock(oid) AS confl_deadlock
+    pg_stat_get_db_conflict_startup_deadlock(oid) AS confl_deadlock,
+    pg_stat_get_db_conflict_logicalslot(oid) AS confl_active_logicalslot
    FROM pg_database d;
 pg_stat_gssapi| SELECT pid,
     gss_auth AS gss_authenticated,
-- 
2.34.1
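
A minimal sketch of how the new "conflicting" column is filled in by the
slotfuncs.c hunk above: NULL for physical slots, otherwise true or false.
The ConflictingColumn enum and conflicting_column() are hypothetical names
for illustration; the patch itself sets nulls[i] or stores a BoolGetDatum()
value, and obtains the second input from LogicalReplicationSlotIsInvalid().

```c
#include <stdbool.h>
#include <stdint.h>

/* Stand-ins for the PostgreSQL definitions, so the sketch compiles alone. */
typedef uint32_t Oid;
#define InvalidOid ((Oid) 0)

/* Hypothetical three-valued result modelling a nullable SQL boolean. */
typedef enum ConflictingColumn
{
    CONFLICTING_NULL,           /* physical slot: column is NULL */
    CONFLICTING_FALSE,          /* logical slot that is still usable */
    CONFLICTING_TRUE            /* logical slot invalidated by a conflict */
} ConflictingColumn;

/*
 * A slot with no database is a physical slot, so the column is NULL.
 * For logical slots the column reflects whether the slot was invalidated.
 */
static ConflictingColumn
conflicting_column(Oid slot_database, bool logical_slot_is_invalid)
{
    if (slot_database == InvalidOid)
        return CONFLICTING_NULL;

    return logical_slot_is_invalid ? CONFLICTING_TRUE : CONFLICTING_FALSE;
}
```

This matches the documented behaviour in system-views.sgml, where the column
is described as always NULL for physical slots.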

From 67ce45537f9766da93de828a40bb46dee75dd846 Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 7 Feb 2023 08:57:56 +0000
Subject: [PATCH v63 1/6] Handle logical slot conflicts on standby.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

During WAL replay on a standby, invalidate any logical slot for which a
conflict is identified. Do the same if wal_level on the primary server
is reduced below logical while logical slots exist on the standby.
Introduce a new ProcSignalReason value for slot conflict recovery.

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes Mello,
Bharath Rupireddy, Amit Kapila, Álvaro Herrera
---
 src/backend/access/gist/gistxlog.c            |   2 +
 src/backend/access/hash/hash_xlog.c           |   1 +
 src/backend/access/heap/heapam.c              |   3 +
 src/backend/access/nbtree/nbtxlog.c           |   2 +
 src/backend/access/spgist/spgxlog.c           |   1 +
 src/backend/access/transam/xlog.c             |  20 ++-
 .../replication/logical/logicalfuncs.c        |  13 +-
 src/backend/replication/slot.c                | 170 +++++++++++++-----
 src/backend/replication/slotfuncs.c           |   3 +-
 src/backend/replication/walsender.c           |   7 +
 src/backend/storage/ipc/procsignal.c          |   3 +
 src/backend/storage/ipc/standby.c             |  14 +-
 src/backend/tcop/postgres.c                   |  18 ++
 src/include/replication/slot.h                |  61 ++++++-
 src/include/storage/procsignal.h              |   1 +
 src/include/storage/standby.h                 |   2 +
 16 files changed, 270 insertions(+), 51 deletions(-)
   7.5% src/backend/access/transam/
   5.6% src/backend/replication/logical/
  58.7% src/backend/replication/
   5.2% src/backend/storage/ipc/
   4.8% src/backend/tcop/
  14.2% src/include/replication/
   3.5% src/

diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index b7678f3c14..9a86fb3fef 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -197,6 +197,7 @@ gistRedoDeleteRecord(XLogReaderState *record)
                XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
 
                ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+                                                                                       xldata->isCatalogRel,
                                                                                        rlocator);
        }
 
@@ -390,6 +391,7 @@ gistRedoPageReuse(XLogReaderState *record)
         */
        if (InHotStandby)
                ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
+                                                                                                  xlrec->isCatalogRel,
                                                                                                   xlrec->locator);
 }
 
diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c
index f2dd9be8d3..e8e06c62a9 100644
--- a/src/backend/access/hash/hash_xlog.c
+++ b/src/backend/access/hash/hash_xlog.c
@@ -1003,6 +1003,7 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
 
                XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
                ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+                                                                                       xldata->isCatalogRel,
                                                                                        rlocator);
        }
 
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 8b13e3f892..f389ceee1e 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8769,6 +8769,7 @@ heap_xlog_prune(XLogReaderState *record)
         */
        if (InHotStandby)
                ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+                                                                                       xlrec->isCatalogRel,
                                                                                        rlocator);
 
        /*
@@ -8940,6 +8941,7 @@ heap_xlog_visible(XLogReaderState *record)
         */
        if (InHotStandby)
                ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+                                                                                       xlrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL,
                                                                                        rlocator);
 
        /*
@@ -9061,6 +9063,7 @@ heap_xlog_freeze_page(XLogReaderState *record)
 
                XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
                ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+                                                                                       xlrec->isCatalogRel,
                                                                                        rlocator);
        }
 
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index 414ca4f6de..c87e46ed66 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -669,6 +669,7 @@ btree_xlog_delete(XLogReaderState *record)
                XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
 
                ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+                                                                                       xlrec->isCatalogRel,
                                                                                        rlocator);
        }
 
@@ -1007,6 +1008,7 @@ btree_xlog_reuse_page(XLogReaderState *record)
 
        if (InHotStandby)
                ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
+                                                                                                  xlrec->isCatalogRel,
                                                                                                   xlrec->locator);
 }
 
diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c
index b071b59c8a..459ac929ba 100644
--- a/src/backend/access/spgist/spgxlog.c
+++ b/src/backend/access/spgist/spgxlog.c
@@ -879,6 +879,7 @@ spgRedoVacuumRedirect(XLogReaderState *record)
 
                XLogRecGetBlockTag(record, 0, &locator, NULL, NULL);
                ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+                                                                                       xldata->isCatalogRel,
                                                                                        locator);
        }
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 46821ad605..713b61a9bf 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6806,7 +6806,8 @@ CreateCheckPoint(int flags)
         */
        XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
        KeepLogSeg(recptr, &_logSegNo);
-       if (InvalidateObsoleteReplicationSlots(_logSegNo))
+       if (InvalidateObsoleteReplicationSlots(_logSegNo, InvalidOid,
+                                                                                  InvalidTransactionId, false))
        {
                /*
                 * Some slots have been invalidated; recalculate the old-segment
@@ -7250,7 +7251,8 @@ CreateRestartPoint(int flags)
        replayPtr = GetXLogReplayRecPtr(&replayTLI);
        endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
        KeepLogSeg(endptr, &_logSegNo);
-       if (InvalidateObsoleteReplicationSlots(_logSegNo))
+       if (InvalidateObsoleteReplicationSlots(_logSegNo, InvalidOid,
+                                                                                  InvalidTransactionId, false))
        {
                /*
                 * Some slots have been invalidated; recalculate the old-segment
@@ -7963,6 +7965,20 @@ xlog_redo(XLogReaderState *record)
                /* Update our copy of the parameters in pg_control */
                memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
 
+               /*
+                * Invalidate logical slots if we are in hot standby and the primary
+                * does not have a WAL level sufficient for logical decoding. No need
+                * to search for potentially conflicting logical slots if standby is
+                * running with wal_level lower than logical, because in that case, we
+                * would have either disallowed creation of logical slots or
+                * invalidated existing ones.
+                */
+               if (InRecovery && InHotStandby &&
+                       xlrec.wal_level < WAL_LEVEL_LOGICAL &&
+                       wal_level >= WAL_LEVEL_LOGICAL)
+                       InvalidateObsoleteReplicationSlots(0, InvalidOid,
+                                                                                          InvalidTransactionId, true);
+
                LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
                ControlFile->MaxConnections = xlrec.MaxConnections;
                ControlFile->max_worker_processes = xlrec.max_worker_processes;
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index fa1b641a2b..575a047e53 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -216,9 +216,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 
                /*
                 * After the sanity checks in CreateDecodingContext, make sure the
-                * restart_lsn is valid.  Avoid "cannot get changes" wording in this
-                * errmsg because that'd be confusingly ambiguous about no changes
-                * being available.
+                * restart_lsn is valid or both effective_xmin and catalog_xmin are
+                * valid. Avoid "cannot get changes" wording in this errmsg because
+                * that'd be confusingly ambiguous about no changes being available.
                 */
                if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
                        ereport(ERROR,
@@ -227,6 +227,13 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
                                                        NameStr(*name)),
                                         errdetail("This slot has never previously reserved WAL, or it has been invalidated.")));
 
+               if (LogicalReplicationSlotIsInvalid(MyReplicationSlot))
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                        errmsg("cannot read from logical replication slot \"%s\"",
+                                                       NameStr(*name)),
+                                        errdetail("This slot has been invalidated because it was conflicting with recovery.")));
+
                MemoryContextSwitchTo(oldcontext);
 
                /*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 2293c0c6fc..376d453374 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -110,6 +110,13 @@ static void RestoreSlotFromDisk(const char *name);
 static void CreateSlotOnDisk(ReplicationSlot *slot);
 static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
 
+/* to report termination/invalidation */
+static void ReportTerminationInvalidation(bool terminating, bool check_on_xid,
+                                                                                 int pid, NameData slotname,
+                                                                                 TransactionId xid,
+                                                                                 XLogRecPtr restart_lsn,
+                                                                                 XLogRecPtr oldestLSN);
+
 /*
  * Report shared-memory space needed by ReplicationSlotsShmemInit.
  */
@@ -855,8 +862,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
                SpinLockAcquire(&s->mutex);
                effective_xmin = s->effective_xmin;
                effective_catalog_xmin = s->effective_catalog_xmin;
-               invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
-                                          XLogRecPtrIsInvalid(s->data.restart_lsn));
+               invalidated = ObsoleteSlotIsInvalid(s, true) || LogicalReplicationSlotIsInvalid(s);
                SpinLockRelease(&s->mutex);
 
                /* invalidated slots need not apply */
@@ -1225,28 +1231,85 @@ ReplicationSlotReserveWal(void)
        }
 }
 
+
+/*
+ * Report a termination or invalidation message.
+ *
+ * Handles both logical slot conflicts on standby and obsolete slots.
+ */
+static void
+ReportTerminationInvalidation(bool terminating, bool check_on_xid, int pid,
+                                                         NameData slotname, TransactionId xid,
+                                                         XLogRecPtr restart_lsn, XLogRecPtr oldestLSN)
+{
+       StringInfoData err_detail;
+       bool            hint = false;
+
+       initStringInfo(&err_detail);
+
+       if (check_on_xid)
+       {
+               if (TransactionIdIsValid(xid))
+                       appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."), xid);
+               else
+                       appendStringInfo(&err_detail, _("Logical decoding on standby requires wal_level to be at least logical on the primary server."));
+       }
+       else
+       {
+               appendStringInfo(&err_detail, _("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes."),
+                                                LSN_FORMAT_ARGS(restart_lsn),
+                                                (unsigned long long) (oldestLSN - restart_lsn));
+
+               hint = true;
+       }
+
+       ereport(LOG,
+                       terminating ? errmsg("terminating process %d to release replication slot \"%s\"", pid, NameStr(slotname)) :
+                       errmsg("invalidating obsolete replication slot \"%s\"", NameStr(slotname)),
+                       errdetail_internal("%s", err_detail.data),
+                       hint ? errhint("You might need to increase max_slot_wal_keep_size.") : 0);
+
+       pfree(err_detail.data);
+}
+
 /*
- * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
- * and mark it invalid, if necessary and possible.
+ * Helper for InvalidateObsoleteReplicationSlots
+ *
+ * Acquires the given slot and marks it invalid, if necessary and possible.
  *
  * Returns whether ReplicationSlotControlLock was released in the interim (and
  * in that case we're not holding the lock at return, otherwise we are).
  *
- * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
+ * Sets *invalidated true if an obsolete slot was invalidated. (Untouched otherwise.)
  *
  * This is inherently racy, because we release the LWLock
  * for syscalls, so caller must restart if we return true.
  */
 static bool
 InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
-                                                          bool *invalidated)
+                                                          Oid dboid, bool *invalidated, TransactionId xid,
+                                                          bool check_on_xid)
 {
        int                     last_signaled_pid = 0;
        bool            released_lock = false;
 
+       if (check_on_xid)
+       {
+               /* we are only dealing with *logical* slot conflicts */
+               if (!SlotIsLogical(s))
+                       return false;
+
+               /*
+                * Skip slots that belong to another database, unless the caller
+                * passed an invalid xid to request the slots of all databases.
+                */
+               if (s->data.database != dboid && TransactionIdIsValid(xid))
+                       return false;
+       }
        for (;;)
        {
                XLogRecPtr      restart_lsn;
+
                NameData        slotname;
                int                     active_pid = 0;
 
@@ -1263,19 +1326,20 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
                 * Check if the slot needs to be invalidated. If it needs to be
                 * invalidated, and is not currently acquired, acquire it and mark it
                 * as having been invalidated.  We do this with the spinlock held to
-                * avoid race conditions -- for example the restart_lsn could move
-                * forward, or the slot could be dropped.
+                * avoid race conditions -- for example the restart_lsn (or the
+                * xmin(s)) could move forward, or the slot could be dropped.
                 */
                SpinLockAcquire(&s->mutex);
 
                restart_lsn = s->data.restart_lsn;
 
                /*
-                * If the slot is already invalid or is fresh enough, we don't need to
-                * do anything.
+                * If the slot is already invalid or is a non-conflicting slot, we
+                * don't need to do anything.
                 */
-               if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
+               if (DoNotInvalidateSlot(s, xid, oldestLSN, check_on_xid))
                {
+                       /* no invalidation is needed for this slot */
                        SpinLockRelease(&s->mutex);
                        if (released_lock)
                                LWLockRelease(ReplicationSlotControlLock);
@@ -1294,9 +1358,16 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, 
XLogRecPtr oldestLSN,
                {
                        MyReplicationSlot = s;
                        s->active_pid = MyProcPid;
-                       s->data.invalidated_at = restart_lsn;
-                       s->data.restart_lsn = InvalidXLogRecPtr;
-
+                       if (check_on_xid)
+                       {
+                               s->effective_xmin = InvalidTransactionId;
+                               s->data.catalog_xmin = InvalidTransactionId;
+                       }
+                       else
+                       {
+                               s->data.invalidated_at = restart_lsn;
+                               s->data.restart_lsn = InvalidXLogRecPtr;
+                       }
                        /* Let caller know */
                        *invalidated = true;
                }
@@ -1329,15 +1400,15 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, 
XLogRecPtr oldestLSN,
                         */
                        if (last_signaled_pid != active_pid)
                        {
-                               ereport(LOG,
-                                               errmsg("terminating process %d 
to release replication slot \"%s\"",
-                                                          active_pid, 
NameStr(slotname)),
-                                               errdetail("The slot's 
restart_lsn %X/%X exceeds the limit by %llu bytes.",
-                                                                 
LSN_FORMAT_ARGS(restart_lsn),
-                                                                 (unsigned 
long long) (oldestLSN - restart_lsn)),
-                                               errhint("You might need to 
increase max_slot_wal_keep_size."));
-
-                               (void) kill(active_pid, SIGTERM);
+                               ReportTerminationInvalidation(true, 
check_on_xid, active_pid,
+                                                                               
          slotname, xid, restart_lsn,
+                                                                               
          oldestLSN);
+
+                               if (check_on_xid)
+                                       (void) SendProcSignal(active_pid, 
PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, InvalidBackendId);
+                               else
+                                       (void) kill(active_pid, SIGTERM);
+
                                last_signaled_pid = active_pid;
                        }
 
@@ -1370,14 +1441,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, 
XLogRecPtr oldestLSN,
                        ReplicationSlotMarkDirty();
                        ReplicationSlotSave();
                        ReplicationSlotRelease();
+                       pgstat_drop_replslot(s);
 
-                       ereport(LOG,
-                                       errmsg("invalidating obsolete 
replication slot \"%s\"",
-                                                  NameStr(slotname)),
-                                       errdetail("The slot's restart_lsn %X/%X 
exceeds the limit by %llu bytes.",
-                                                         
LSN_FORMAT_ARGS(restart_lsn),
-                                                         (unsigned long long) 
(oldestLSN - restart_lsn)),
-                                       errhint("You might need to increase 
max_slot_wal_keep_size."));
+                       ReportTerminationInvalidation(false, check_on_xid, 
active_pid,
+                                                                               
  slotname, xid, restart_lsn,
+                                                                               
  oldestLSN);
 
                        /* done with this slot for now */
                        break;
@@ -1390,20 +1458,39 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, 
XLogRecPtr oldestLSN,
 }
 
 /*
- * Mark any slot that points to an LSN older than the given segment
- * as invalid; it requires WAL that's about to be removed.
+ * Invalidate Obsolete slots.
+ *
+ * WAL case (aka check_on_xid is false):
+ *
+ *      Mark any slot that points to an LSN older than the given segment
+ *      as invalid; it requires WAL that's about to be removed.
+ *      invalidated is set to true when any slot has been invalidated.
  *
- * Returns true when any slot have got invalidated.
+ * Xid case (aka check_on_xid is true):
+ *
+ *      When xid is valid, it means that we are about to remove rows older 
than xid.
+ *      Therefore we need to invalidate slots that depend on seeing those rows.
+ *      When xid is invalid, invalidate all logical slots. This is required 
when the
+ *      primary's wal_level is set back to replica, so existing logical slots 
need to
+ *      be invalidated. Note that WaitExceedsMaxStandbyDelay() is not taken 
into
+ *      account here (as opposed to ResolveRecoveryConflictWithVirtualXIDs()): 
XXXX
  *
- * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
 bool
-InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
+InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno, Oid dboid,
+                                                                  
TransactionId xid, bool check_on_xid)
 {
-       XLogRecPtr      oldestLSN;
+
+       XLogRecPtr      oldestLSN = InvalidXLogRecPtr;
        bool            invalidated = false;
 
-       XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
+       Assert(max_replication_slots >= 0);
+
+       if (max_replication_slots == 0)
+               return invalidated;
+
+       if (!check_on_xid)
+               XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, 
oldestLSN);
 
 restart:
        LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
@@ -1414,16 +1501,15 @@ restart:
                if (!s->in_use)
                        continue;
 
-               if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
-               {
-                       /* if the lock was released, start from scratch */
+               if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, dboid, 
&invalidated, xid, check_on_xid))
                        goto restart;
-               }
        }
+
        LWLockRelease(ReplicationSlotControlLock);
 
        /*
-        * If any slots have been invalidated, recalculate the resource limits.
+        * If any slots have been invalidated, recalculate the required xmin and
+        * the required lsn (if appropriate).
         */
        if (invalidated)
        {
diff --git a/src/backend/replication/slotfuncs.c 
b/src/backend/replication/slotfuncs.c
index 2f3c964824..015d276fd9 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -319,8 +319,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
                 * certain that the slot has been invalidated.  Otherwise, test
                 * availability from restart_lsn.
                 */
-               if (XLogRecPtrIsInvalid(slot_contents.data.restart_lsn) &&
-                       !XLogRecPtrIsInvalid(slot_contents.data.invalidated_at))
+               if (ObsoleteSlotIsInvalid(slot, true))
                        walstate = WALAVAIL_REMOVED;
                else
                        walstate = 
GetWALAvailability(slot_contents.data.restart_lsn);
diff --git a/src/backend/replication/walsender.c 
b/src/backend/replication/walsender.c
index 75e8363e24..b686691ca2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1253,6 +1253,13 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
        ReplicationSlotAcquire(cmd->slotname, true);
 
+       if (LogicalReplicationSlotIsInvalid(MyReplicationSlot))
+               ereport(ERROR,
+                               
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("cannot read from logical replication 
slot \"%s\"",
+                                               cmd->slotname),
+                                errdetail("This slot has been invalidated 
because it was conflicting with recovery.")));
+
        if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
                ereport(ERROR,
                                
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
diff --git a/src/backend/storage/ipc/procsignal.c 
b/src/backend/storage/ipc/procsignal.c
index 395b2cf690..c85cb5cc18 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -673,6 +673,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
        if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT))
                RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
 
+       if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT))
+               
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT);
+
        if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK))
                
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
 
diff --git a/src/backend/storage/ipc/standby.c 
b/src/backend/storage/ipc/standby.c
index 9f56b4e95c..a23220cae7 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -24,6 +24,7 @@
 #include "access/xlogutils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/slot.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
@@ -466,6 +467,7 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId 
*waitlist,
  */
 void
 ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
+                                                                       bool 
isCatalogRel,
                                                                        
RelFileLocator locator)
 {
        VirtualTransactionId *backends;
@@ -491,6 +493,10 @@ ResolveRecoveryConflictWithSnapshot(TransactionId 
snapshotConflictHorizon,
                                                                                
   PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
                                                                                
   WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
                                                                                
   true);
+
+       if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel)
+               InvalidateObsoleteReplicationSlots(0, locator.dbOid,
+                                                                               
   snapshotConflictHorizon, true);
 }
 
 /*
@@ -499,6 +505,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId 
snapshotConflictHorizon,
  */
 void
 ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId 
snapshotConflictHorizon,
+                                                                               
   bool isCatalogRel,
                                                                                
   RelFileLocator locator)
 {
        /*
@@ -517,7 +524,9 @@ 
ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHor
                TransactionId truncated;
 
                truncated = XidFromFullTransactionId(snapshotConflictHorizon);
-               ResolveRecoveryConflictWithSnapshot(truncated, locator);
+               ResolveRecoveryConflictWithSnapshot(truncated,
+                                                                               
        isCatalogRel,
+                                                                               
        locator);
        }
 }
 
@@ -1478,6 +1487,9 @@ get_recovery_conflict_desc(ProcSignalReason reason)
                case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
                        reasonDesc = _("recovery conflict on snapshot");
                        break;
+               case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+                       reasonDesc = _("recovery conflict on replication slot");
+                       break;
                case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
                        reasonDesc = _("recovery conflict on buffer deadlock");
                        break;
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index a10ecbaf50..4ec64b0a4a 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -2526,6 +2526,9 @@ errdetail_recovery_conflict(void)
                case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
                        errdetail("User query might have needed to see row 
versions that must be removed.");
                        break;
+               case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+                       errdetail("User was using the logical slot that must be 
dropped.");
+                       break;
                case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
                        errdetail("User transaction caused buffer deadlock with 
recovery.");
                        break;
@@ -3143,6 +3146,21 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
                                InterruptPending = true;
                                break;
 
+                       case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+                               /* This signal is only used for logical slots, 
sanity check */
+                               Assert(MyReplicationSlot && 
SlotIsLogical(MyReplicationSlot));
+                               RecoveryConflictPending = true;
+                               QueryCancelPending = true;
+                               InterruptPending = true;
+
+                               /*
+                                * Normal backends should exit, so that the 
startup process
+                                * can mark the slot invalid.
+                                */
+                               if (!am_cascading_walsender)
+                                       ProcDiePending = true;
+                               break;
+
                        default:
                                elog(FATAL, "unrecognized conflict mode: %d",
                                         (int) reason);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 8872c80cdf..f2838022e5 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -168,6 +168,64 @@ typedef struct ReplicationSlot
 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
 #define SlotIsLogical(slot) ((slot)->data.database != InvalidOid)
 
+static inline bool
+ObsoleteSlotIsInvalid(ReplicationSlot *s, bool check_invalidated_at)
+{
+       if (check_invalidated_at)
+               return (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
+                               XLogRecPtrIsInvalid(s->data.restart_lsn));
+       else
+               return (XLogRecPtrIsInvalid(s->data.restart_lsn));
+}
+
+static inline bool
+LogicalReplicationSlotIsInvalid(ReplicationSlot *s)
+{
+       return (!TransactionIdIsValid(s->effective_xmin) &&
+                       !TransactionIdIsValid(s->data.catalog_xmin));
+}
+
+static inline bool
+TransactionIdIsValidPrecedesOrEquals(TransactionId xid1, TransactionId xid2)
+{
+       return (TransactionIdIsValid(xid1) && 
TransactionIdPrecedesOrEquals(xid1, xid2));
+}
+
+static inline bool
+LogicalReplicationSlotXidsConflict(ReplicationSlot *s, TransactionId xid)
+{
+       TransactionId slot_effective_xmin;
+       TransactionId slot_catalog_xmin;
+
+       slot_effective_xmin = s->effective_xmin;
+       slot_catalog_xmin = s->data.catalog_xmin;
+
+       return (TransactionIdIsValidPrecedesOrEquals(slot_effective_xmin, xid) 
||
+                       TransactionIdIsValidPrecedesOrEquals(slot_catalog_xmin, 
xid));
+}
+
+static inline bool
+SlotIsFreshEnough(ReplicationSlot *s, XLogRecPtr oldestLSN)
+{
+       return (s->data.restart_lsn >= oldestLSN);
+}
+
+static inline bool
+LogicalSlotIsNotConflicting(ReplicationSlot *s, TransactionId xid)
+{
+       return (TransactionIdIsValid(xid) && 
!LogicalReplicationSlotXidsConflict(s, xid));
+}
+
+static inline bool
+DoNotInvalidateSlot(ReplicationSlot *s, TransactionId xid, XLogRecPtr 
oldestLSN, bool check_on_xid)
+{
+       if (check_on_xid)
+               return (LogicalReplicationSlotIsInvalid(s) || 
LogicalSlotIsNotConflicting(s, xid));
+       else
+               return (ObsoleteSlotIsInvalid(s, false) || SlotIsFreshEnough(s, 
oldestLSN));
+
+}
+
 /*
  * Shared memory control area for all of replication slots.
  */
@@ -215,7 +273,8 @@ extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
-extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
+extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno, Oid 
dboid,
+                                                                               
           TransactionId xid, bool check_on_xid);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool 
need_lock);
 extern int     ReplicationSlotIndex(ReplicationSlot *slot);
 extern bool ReplicationSlotName(int index, Name name);
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index 905af2231b..2f52100b00 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -42,6 +42,7 @@ typedef enum
        PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
        PROCSIG_RECOVERY_CONFLICT_LOCK,
        PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
+       PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
        PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
        PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
 
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index 2effdea126..41f4dc372e 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -30,8 +30,10 @@ extern void InitRecoveryTransactionEnvironment(void);
 extern void ShutdownRecoveryTransactionEnvironment(void);
 
 extern void ResolveRecoveryConflictWithSnapshot(TransactionId 
snapshotConflictHorizon,
+                                                                               
                bool isCatalogRel,
                                                                                
                RelFileLocator locator);
 extern void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId 
snapshotConflictHorizon,
+                                                                               
                           bool isCatalogRel,
                                                                                
                           RelFileLocator locator);
 extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
 extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
-- 
2.34.1
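
For reviewers who want to check the xid-based branch of DoNotInvalidateSlot()
in isolation, here is a minimal standalone sketch of that decision. It is not
part of the patch. SketchSlot, the sketch_* and xid_* names, and the simplified
TransactionId definitions are assumptions made for illustration only; the
wraparound comparison is modeled on TransactionIdPrecedesOrEquals() but is not
the server's code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-ins for the PostgreSQL definitions (assumptions). */
typedef uint32_t TransactionId;
#define InvalidTransactionId		((TransactionId) 0)
#define FirstNormalTransactionId	((TransactionId) 3)

/* Hypothetical reduced slot: only the two xmins the xid check looks at. */
typedef struct SketchSlot
{
	TransactionId effective_xmin;
	TransactionId catalog_xmin;
} SketchSlot;

static bool
xid_is_valid(TransactionId xid)
{
	return xid != InvalidTransactionId;
}

/* Wraparound-aware "xid1 <= xid2" for normal xids, plain compare otherwise. */
static bool
xid_precedes_or_equals(TransactionId xid1, TransactionId xid2)
{
	if (xid1 < FirstNormalTransactionId || xid2 < FirstNormalTransactionId)
		return xid1 <= xid2;
	return (int32_t) (xid1 - xid2) <= 0;
}

/* Same test as LogicalReplicationSlotIsInvalid(): both xmins are invalid. */
static bool
sketch_slot_is_invalid(const SketchSlot *s)
{
	return !xid_is_valid(s->effective_xmin) &&
		!xid_is_valid(s->catalog_xmin);
}

/* Same test as LogicalReplicationSlotXidsConflict(). */
static bool
sketch_slot_xids_conflict(const SketchSlot *s, TransactionId xid)
{
	return (xid_is_valid(s->effective_xmin) &&
			xid_precedes_or_equals(s->effective_xmin, xid)) ||
		(xid_is_valid(s->catalog_xmin) &&
		 xid_precedes_or_equals(s->catalog_xmin, xid));
}

/*
 * check_on_xid branch of DoNotInvalidateSlot(): leave the slot alone if it
 * is already invalid, or if a valid horizon was given and neither xmin
 * precedes or equals it.  An invalid xid means "invalidate every logical
 * slot that is still valid".
 */
static bool
sketch_do_not_invalidate(const SketchSlot *s, TransactionId xid)
{
	return sketch_slot_is_invalid(s) ||
		(xid_is_valid(xid) && !sketch_slot_xids_conflict(s, xid));
}
```

With a slot whose two xmins are both 100, a conflict horizon of 150 leads to
invalidation, a horizon of 50 leaves the slot untouched, and an invalid horizon
invalidates it regardless. A slot whose xmins are already invalid is always
skipped.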
