Hi,

On 12/20/22 7:31 PM, Robert Haas wrote:
On Tue, Dec 20, 2022 at 1:19 PM Andres Freund <and...@anarazel.de> wrote:
I don't understand what the "may*" or "*Possible" really are
about. snapshotConflictHorizon is a conflict with a certain xid - there
commonly won't be anything to conflict with. If there's a conflict in
the logical-decoding-on-standby case, we won't be able to apply it only
sometimes or such.

The way I was imagining it is that snapshotConflictHorizon tells us
whether there is a conflict with this record and then, if there is,
this new Boolean tells us whether it's relevant to logical decoding as
well.


The "may*" / "*Possible" naming was, most probably, because I preferred to have
the uncertainty of the conflict reflected in the name.
But I was overlooking the relationship with snapshotConflictHorizon.
So I agree with both of you that mentioning the uncertainty of the conflict
is useless.

How about "affectsLogicalDecoding", "conflictsWithSlots" or
"isCatalogRel" or such?

isCatalogRel seems fine to me.

For me too, please find attached v34 using it.

Regards,

--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 46a728b39f7ea85f2dec60d72cb400094955b785 Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 20 Dec 2022 19:56:22 +0000
Subject: [PATCH v34 6/6] Fixing Walsender corner case with logical decoding on
 standby.

The problem is that WalSndWaitForWal() waits for the *replay* LSN to
increase, but gets woken up by walreceiver when new WAL has been
flushed. Which means that typically walsenders will get woken up at the
same time that the startup process will be - which means that by the
time the logical walsender checks GetXLogReplayRecPtr() it's unlikely
that the startup process already replayed the record and updated
XLogCtl->lastReplayedEndRecPtr.

Introduce a new condition variable, broadcast by the startup process after each replayed record, that logical walsenders on a standby sleep on instead.
---
 src/backend/access/transam/xlogrecovery.c | 28 ++++++++++++++++++++
 src/backend/replication/walsender.c       | 31 +++++++++++++++++------
 src/backend/utils/activity/wait_event.c   |  3 +++
 src/include/access/xlogrecovery.h         |  3 +++
 src/include/replication/walsender.h       |  1 +
 src/include/utils/wait_event.h            |  1 +
 6 files changed, 59 insertions(+), 8 deletions(-)
  41.2% src/backend/access/transam/
  48.5% src/backend/replication/
   3.6% src/backend/utils/activity/
   3.4% src/include/access/

diff --git a/src/backend/access/transam/xlogrecovery.c 
b/src/backend/access/transam/xlogrecovery.c
index d5a81f9d83..ac8b169ab5 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -358,6 +358,9 @@ typedef struct XLogRecoveryCtlData
        RecoveryPauseState recoveryPauseState;
        ConditionVariable recoveryNotPausedCV;
 
+       /* Replay state (see getReplayedCV() for more explanation) */
+       ConditionVariable replayedCV;
+
        slock_t         info_lck;               /* locks shared variables shown 
above */
 } XLogRecoveryCtlData;
 
@@ -467,6 +470,7 @@ XLogRecoveryShmemInit(void)
        SpinLockInit(&XLogRecoveryCtl->info_lck);
        InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
        ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV);
+       ConditionVariableInit(&XLogRecoveryCtl->replayedCV);
 }
 
 /*
@@ -1916,6 +1920,11 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord 
*record, TimeLineID *repl
        XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
        SpinLockRelease(&XLogRecoveryCtl->info_lck);
 
+       /*
+        * wake up walsender(s) used by logical decoding on standby.
+        */
+       ConditionVariableBroadcast(&XLogRecoveryCtl->replayedCV);
+
        /*
         * If rm_redo called XLogRequestWalReceiverReply, then we wake up the
         * receiver so that it notices the updated lastReplayedEndRecPtr and 
sends
@@ -4916,3 +4925,22 @@ assign_recovery_target_xid(const char *newval, void 
*extra)
        else
                recoveryTarget = RECOVERY_TARGET_UNSET;
 }
+
+/*
+ * Return the ConditionVariable that is broadcast each time the startup
+ * process has finished replaying a WAL record.
+ *
+ * This is needed for logical decoding on standby.  WalSndWaitForWal() waits
+ * for the *replay* LSN to advance, but walsenders get woken up by the
+ * walreceiver as soon as new WAL has been flushed.  That is typically the
+ * same moment the startup process gets woken up, so by the time a logical
+ * walsender checks GetXLogReplayRecPtr() the startup process has most likely
+ * not yet replayed the record and updated lastReplayedEndRecPtr.
+ *
+ * Sleeping on XLogRecoveryCtl->replayedCV instead closes that window.
+ */
+ConditionVariable *
+getReplayedCV(void)
+{
+       return &XLogRecoveryCtl->replayedCV;
+}
diff --git a/src/backend/replication/walsender.c 
b/src/backend/replication/walsender.c
index 9662e316c9..8c8dbe812f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1548,6 +1548,7 @@ WalSndWaitForWal(XLogRecPtr loc)
 {
        int                     wakeEvents;
        static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
+       ConditionVariable *replayedCV = getReplayedCV();
 
        /*
         * Fast path to avoid acquiring the spinlock in case we already know we
@@ -1566,7 +1567,6 @@ WalSndWaitForWal(XLogRecPtr loc)
 
        for (;;)
        {
-               long            sleeptime;
 
                /* Clear any already-pending wakeups */
                ResetLatch(MyLatch);
@@ -1650,20 +1650,35 @@ WalSndWaitForWal(XLogRecPtr loc)
                WalSndKeepaliveIfNecessary();
 
                /*
-                * Sleep until something happens or we time out.  Also wait for 
the
-                * socket becoming writable, if there's still pending output.
+                * When not in recovery, sleep until something happens or we 
time out.
+                * Also wait for the socket becoming writable, if there's still 
pending output.
                 * Otherwise we might sit on sendable output data while waiting 
for
                 * new WAL to be generated.  (But if we have nothing to send, 
we don't
                 * want to wake on socket-writable.)
                 */
-               sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
+               if (!RecoveryInProgress())
+               {
+                       long            sleeptime;
+                       sleeptime = 
WalSndComputeSleeptime(GetCurrentTimestamp());
 
-               wakeEvents = WL_SOCKET_READABLE;
+                       wakeEvents = WL_SOCKET_READABLE;
 
-               if (pq_is_send_pending())
-                       wakeEvents |= WL_SOCKET_WRITEABLE;
+                       if (pq_is_send_pending())
+                               wakeEvents |= WL_SOCKET_WRITEABLE;
 
-               WalSndWait(wakeEvents, sleeptime, 
WAIT_EVENT_WAL_SENDER_WAIT_WAL);
+                       WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
+               }
+               else
+               /*
+                * We are in the logical decoding on standby case.
+                * We are waiting for the startup process to replay wal 
record(s) using
+                * a timeout in case we are requested to stop.
+                */
+               {
+                       ConditionVariablePrepareToSleep(replayedCV);
+                       ConditionVariableTimedSleep(replayedCV, 1000,
+                                                                               
WAIT_EVENT_WAL_SENDER_WAIT_REPLAY);
+               }
        }
 
        /* reactivate latch so WalSndLoop knows to continue */
diff --git a/src/backend/utils/activity/wait_event.c 
b/src/backend/utils/activity/wait_event.c
index b2abd75ddb..3f6059805a 100644
--- a/src/backend/utils/activity/wait_event.c
+++ b/src/backend/utils/activity/wait_event.c
@@ -457,6 +457,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
                case WAIT_EVENT_WAL_RECEIVER_WAIT_START:
                        event_name = "WalReceiverWaitStart";
                        break;
+               case WAIT_EVENT_WAL_SENDER_WAIT_REPLAY:
+                       event_name = "WalSenderWaitReplay";
+                       break;
                case WAIT_EVENT_XACT_GROUP_UPDATE:
                        event_name = "XactGroupUpdate";
                        break;
diff --git a/src/include/access/xlogrecovery.h 
b/src/include/access/xlogrecovery.h
index f3398425d8..0afd57ecac 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -15,6 +15,7 @@
 #include "catalog/pg_control.h"
 #include "lib/stringinfo.h"
 #include "utils/timestamp.h"
+#include "storage/condition_variable.h"
 
 /*
  * Recovery target type.
@@ -155,4 +156,6 @@ extern void RecoveryRequiresIntParameter(const char 
*param_name, int currValue,
 
 extern void xlog_outdesc(StringInfo buf, XLogReaderState *record);
 
+extern ConditionVariable *getReplayedCV(void);
+
 #endif                                                 /* XLOGRECOVERY_H */
diff --git a/src/include/replication/walsender.h 
b/src/include/replication/walsender.h
index 8336a6e719..550ef3107f 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -13,6 +13,7 @@
 #define _WALSENDER_H
 
 #include <signal.h>
+#include "storage/condition_variable.h"
 
 /*
  * What to do with a snapshot in create replication slot command.
diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h
index 0b2100be4a..30c2cf35ae 100644
--- a/src/include/utils/wait_event.h
+++ b/src/include/utils/wait_event.h
@@ -128,6 +128,7 @@ typedef enum
        WAIT_EVENT_SYNC_REP,
        WAIT_EVENT_WAL_RECEIVER_EXIT,
        WAIT_EVENT_WAL_RECEIVER_WAIT_START,
+       WAIT_EVENT_WAL_SENDER_WAIT_REPLAY,
        WAIT_EVENT_XACT_GROUP_UPDATE
 } WaitEventIPC;
 
-- 
2.34.1

From 3fdfcf1f6e836e87091c2047cc33338ef7abd8b5 Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 20 Dec 2022 19:55:38 +0000
Subject: [PATCH v34 5/6] Doc changes describing details about logical
 decoding.

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes 
Mello
---
 doc/src/sgml/logicaldecoding.sgml | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)
 100.0% doc/src/sgml/

diff --git a/doc/src/sgml/logicaldecoding.sgml 
b/doc/src/sgml/logicaldecoding.sgml
index 38ee69dccc..9acf16037a 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -316,6 +316,27 @@ postgres=# select * from 
pg_logical_slot_get_changes('regression_slot', NULL, NU
      may consume changes from a slot at any given time.
     </para>
 
+    <para>
+     A logical replication slot can also be created on a hot standby. To prevent
+     <command>VACUUM</command> from removing required rows from the system
+     catalogs, <varname>hot_standby_feedback</varname> should be set on the
+     standby. If required rows are removed in spite of that, the slot is
+     invalidated. It is highly recommended to use a physical slot between the
+     primary and the standby; otherwise, <varname>hot_standby_feedback</varname>
+     only works while the connection is alive (for example, a node restart
+     would break it). Existing logical slots on the standby are also invalidated
+     if <varname>wal_level</varname> on the primary is reduced below <literal>logical</literal>.
+    </para>
+
+    <para>
+     Creating a logical slot requires building a historic snapshot, for which
+     information about all currently running transactions is essential. On the
+     primary, this information is available directly, but on a standby it has
+     to be obtained from the primary. Slot creation may therefore have to wait
+     for some activity to happen on the primary. If the primary is idle,
+     creating a logical slot on a standby may take a noticeable time.
+    </para>
+
     <caution>
      <para>
       Replication slots persist across crashes and know nothing about the state
-- 
2.34.1

From 5429feffc82c7cf18482d5e95da90b5f74e1e9c2 Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 20 Dec 2022 19:54:46 +0000
Subject: [PATCH v34 4/6] New TAP test for logical decoding on standby.

Author: Craig Ringer (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes 
Mello
---
 src/test/perl/PostgreSQL/Test/Cluster.pm      |  37 ++
 src/test/recovery/meson.build                 |   1 +
 .../t/034_standby_logical_decoding.pl         | 479 ++++++++++++++++++
 3 files changed, 517 insertions(+)
   6.0% src/test/perl/PostgreSQL/Test/
  93.7% src/test/recovery/t/

diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm 
b/src/test/perl/PostgreSQL/Test/Cluster.pm
index 7411188dc8..171dc85388 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -3037,6 +3037,43 @@ $SIG{TERM} = $SIG{INT} = sub {
 
 =pod
 
+=item $node->create_logical_slot_on_standby(master, slot_name, dbname)
+
+Create logical slot slot_name in database dbname on this standby; a CHECKPOINT is run on master so that slot creation can complete.
+
+=cut
+
+sub create_logical_slot_on_standby
+{
+       my ($self, $master, $slot_name, $dbname) = @_;
+       my ($stdout, $stderr);
+
+       my $handle;
+
+       $handle = IPC::Run::start(['pg_recvlogical', '-d', $self->connstr($dbname), '-P', 'test_decoding', '-S', $slot_name, '--create-slot'], '>', \$stdout, '2>', \$stderr);
+
+       # Once slot restart_lsn is created, the standby looks for xl_running_xacts
+       # WAL record from the restart_lsn onwards. So firstly, wait until the slot
+       # restart_lsn is evaluated.
+
+       $self->poll_query_until(
+               'postgres', qq[
+               SELECT restart_lsn IS NOT NULL
+               FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name'
+       ]) or die "timed out waiting for logical slot to calculate its restart_lsn";
+
+       # Now arrange for the xl_running_xacts record for which pg_recvlogical
+       # is waiting.
+       $master->safe_psql('postgres', 'CHECKPOINT');
+
+       $handle->finish();
+
+       is($self->slot($slot_name)->{'slot_type'}, 'logical', $slot_name . ' on standby created')
+               or die "could not create slot " . $slot_name;
+}
+
+=pod
+
 =back
 
 =cut
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index b8c3c104ae..81913bdfd6 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -40,6 +40,7 @@ tests += {
       't/031_recovery_conflict.pl',
       't/032_relfilenode_reuse.pl',
       't/033_replay_tsp_drops.pl',
+      't/034_standby_logical_decoding.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/034_standby_logical_decoding.pl 
b/src/test/recovery/t/034_standby_logical_decoding.pl
new file mode 100644
index 0000000000..4258844c8f
--- /dev/null
+++ b/src/test/recovery/t/034_standby_logical_decoding.pl
@@ -0,0 +1,479 @@
+# logical decoding on standby : test logical decoding,
+# recovery conflict and standby promotion.
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use Test::More tests => 42;
+
+my ($stdin, $stdout, $stderr, $ret, $handle, $slot);
+
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+
+# Name for the physical slot on primary
+my $primary_slotname = 'primary_physical';
+
+# Return true if regex $pat matches $node's logfile past byte offset $off (default 0).
+sub find_in_log
+{
+       my ($node, $pat, $off) = @_;
+
+       $off = 0 unless defined $off;
+       my $log = PostgreSQL::Test::Utils::slurp_file($node->logfile);
+       return 0 if (length($log) <= $off);
+
+       $log = substr($log, $off);
+
+       return $log =~ m/$pat/;
+}
+
+# Wait until the boolean SQL expression $check_expr, evaluated against the
+# pg_replication_slots row of $slotname on $node, becomes true; die on timeout.
+sub wait_for_xmins
+{
+       my ($node, $slotname, $check_expr) = @_;
+
+       $node->poll_query_until(
+               'postgres', qq[
+               SELECT $check_expr
+               FROM pg_catalog.pg_replication_slots
+               WHERE slot_name = '$slotname';
+       ]) or die "Timed out waiting for slot xmins to advance";
+}
+
+# Create logical slots 'inactiveslot' and 'activeslot' in database testdb on the standby.
+sub create_logical_slots
+{
+       $node_standby->create_logical_slot_on_standby($node_primary, 
'inactiveslot', 'testdb');
+       $node_standby->create_logical_slot_on_standby($node_primary, 
'activeslot', 'testdb');
+}
+
+# Start pg_recvlogical on 'activeslot' (created by create_logical_slots()) and return its handle.
+# In case wait is true we are waiting for an active pid on the 'activeslot' 
slot.
+# If wait is false, the caller is testing a scenario where pg_recvlogical is expected to fail.
+sub make_slot_active
+{
+       my $wait = shift;
+       my $slot_user_handle;
+
+       print "starting pg_recvlogical\n";
+       $slot_user_handle = IPC::Run::start(['pg_recvlogical', '-d', 
$node_standby->connstr('testdb'), '-S', 'activeslot', '-f', '-', '--no-loop', 
'--start'], '>', \$stdout, '2>', \$stderr);
+
+       if ($wait)
+       # poll until pg_replication_slots shows an active_pid for activeslot
+       {
+               $node_standby->poll_query_until('testdb',
+                       "SELECT EXISTS (SELECT 1 FROM pg_replication_slots 
WHERE slot_name = 'activeslot' AND active_pid IS NOT NULL)"
+               ) or die "slot never became active";
+       }
+
+       return $slot_user_handle;
+}
+
+# Finish pg_recvlogical, expect a non-zero exit, and match its stderr against $check_stderr.
+sub check_pg_recvlogical_stderr
+{
+       my ($slot_user_handle, $check_stderr) = @_;
+       my $return;
+
+       # our client should have terminated in response to the walsender error
+       $slot_user_handle->finish;
+       $return = $?;
+       cmp_ok($return, "!=", 0, "pg_recvlogical exited non-zero");
+       if ($return) {
+               like($stderr, qr/$check_stderr/, 'slot has been invalidated');
+       }
+
+       return 0;
+}
+
+# Check that both 'inactiveslot' and 'activeslot' are gone from the standby, and
+# that the pg_recvlogical behind $slot_user_handle failed with a recovery conflict.
+sub check_slots_dropped
+{
+       my ($slot_user_handle) = @_;
+
+       is($node_standby->slot('inactiveslot')->{'slot_type'}, '', 
'inactiveslot on standby dropped');
+       is($node_standby->slot('activeslot')->{'slot_type'}, '', 'activeslot on 
standby dropped');
+
+       check_pg_recvlogical_stderr($slot_user_handle, "conflict with 
recovery");
+}
+
+########################
+# Initialize primary node
+########################
+
+$node_primary->init(allows_streaming => 1, has_archiving => 1);
+$node_primary->append_conf('postgresql.conf', q{
+wal_level = 'logical'
+max_replication_slots = 4
+max_wal_senders = 4
+log_min_messages = 'debug2'
+log_error_verbosity = verbose
+});
+$node_primary->dump_info;
+$node_primary->start;
+
+$node_primary->psql('postgres', q[CREATE DATABASE testdb]);
+
+$node_primary->safe_psql('testdb', qq[SELECT * FROM 
pg_create_physical_replication_slot('$primary_slotname');]);
+my $backup_name = 'b1';
+$node_primary->backup($backup_name);
+
+#######################
+# Initialize standby node
+#######################
+
+$node_standby->init_from_backup(
+       $node_primary, $backup_name,
+       has_streaming => 1,
+       has_restoring => 1);
+$node_standby->append_conf('postgresql.conf',
+       qq[primary_slot_name = '$primary_slotname']);
+$node_standby->start;
+$node_primary->wait_for_catchup($node_standby, 'replay', 
$node_primary->lsn('flush'));
+
+
+##################################################
+# Test that logical decoding on the standby
+# behaves correctly.
+##################################################
+
+create_logical_slots();
+
+$node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y 
text);]);
+$node_primary->safe_psql('testdb', qq[INSERT INTO decoding_test(x,y) SELECT s, 
s::text FROM generate_series(1,10) s;]);
+
+$node_primary->wait_for_catchup($node_standby, 'replay', 
$node_primary->lsn('flush'));
+
+my $result = $node_standby->safe_psql('testdb',
+       qq[SELECT pg_logical_slot_get_changes('activeslot', NULL, NULL);]);
+
+# test if basic decoding works
+is(scalar(my @foobar = split /^/m, $result),
+       14, 'Decoding produced 14 rows');
+
+# Insert some rows and verify that we get the same results from pg_recvlogical
+# and the SQL interface.
+$node_primary->safe_psql('testdb',
+       qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM 
generate_series(1,4) s;]
+);
+
+my $expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT};
+
+$node_primary->wait_for_catchup($node_standby, 'replay', 
$node_primary->lsn('flush'));
+
+my $stdout_sql = $node_standby->safe_psql('testdb',
+       qq[SELECT data FROM pg_logical_slot_peek_changes('activeslot', NULL, 
NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session');
+
+my $endpos = $node_standby->safe_psql('testdb',
+       "SELECT lsn FROM pg_logical_slot_peek_changes('activeslot', NULL, NULL) 
ORDER BY lsn DESC LIMIT 1;"
+);
+print "waiting to replay $endpos\n";
+
+# Insert some rows after $endpos, which we won't read.
+$node_primary->safe_psql('testdb',
+       qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM 
generate_series(5,50) s;]
+);
+
+$node_primary->wait_for_catchup($node_standby, 'replay', 
$node_primary->lsn('flush'));
+
+my $stdout_recv = $node_standby->pg_recvlogical_upto(
+    'testdb', 'activeslot', $endpos, 180,
+    'include-xids'     => '0',
+    'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, $expected,
+    'got same expected output from pg_recvlogical decoding session');
+
+$node_standby->poll_query_until('testdb',
+       "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 
'activeslot' AND active_pid IS NULL)"
+) or die "slot never became inactive";
+
+$stdout_recv = $node_standby->pg_recvlogical_upto(
+    'testdb', 'activeslot', $endpos, 180,
+    'include-xids'     => '0',
+    'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, '', 'pg_recvlogical acknowledged changes');
+
+$node_primary->safe_psql('postgres', 'CREATE DATABASE otherdb');
+
+is( $node_primary->psql(
+        'otherdb',
+        "SELECT lsn FROM pg_logical_slot_peek_changes('activeslot', NULL, 
NULL) ORDER BY lsn DESC LIMIT 1;"
+    ),
+    3,
+    'replaying logical slot from another database fails');
+
+# drop the logical slots
+$node_standby->psql('postgres', q[SELECT 
pg_drop_replication_slot('inactiveslot')]);
+$node_standby->psql('postgres', q[SELECT 
pg_drop_replication_slot('activeslot')]);
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 1: hot_standby_feedback off and vacuum FULL
+##################################################
+
+create_logical_slots();
+
+# One way to reproduce recovery conflict is to run VACUUM FULL with
+# hot_standby_feedback turned off on the standby.
+$node_standby->append_conf('postgresql.conf',q[
+hot_standby_feedback = off
+]);
+$node_standby->restart;
+# ensure walreceiver feedback off by waiting for expected xmin and
+# catalog_xmin on primary. Both should be NULL since hs_feedback is off
+wait_for_xmins($node_primary, $primary_slotname,
+                          "xmin IS NULL AND catalog_xmin IS NULL");
+
+$handle = make_slot_active(1);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', 'VACUUM FULL');
+
+$node_primary->wait_for_catchup($node_standby, 'replay', 
$node_primary->lsn('flush'));
+
+# message should be issued
+ok( find_in_log(
+   $node_standby,
+  "invalidating slot \"inactiveslot\" because it conflicts with recovery"),
+  'inactiveslot slot invalidation is logged with vacuum FULL');
+
+ok( find_in_log(
+   $node_standby,
+  "invalidating slot \"activeslot\" because it conflicts with recovery"),
+  'activeslot slot invalidation is logged with vacuum FULL');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been 
updated
+ok( $node_standby->poll_query_until(
+       'postgres',
+       "select (confl_active_logicalslot = 1) from pg_stat_database_conflicts 
where datname = 'testdb'", 't'),
+       'confl_active_logicalslot updated') or die "Timed out waiting 
confl_active_logicalslot to be updated";
+
+$handle = make_slot_active(0);
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication 
slot \"activeslot\"");
+
+# Turn hot_standby_feedback back on
+$node_standby->append_conf('postgresql.conf',q[
+hot_standby_feedback = on
+]);
+$node_standby->restart;
+
+# ensure walreceiver feedback sent by waiting for expected xmin and
+# catalog_xmin on primary. With hot_standby_feedback on, xmin should advance,
+# but catalog_xmin should still remain NULL since there is no logical slot.
+wait_for_xmins($node_primary, $primary_slotname,
+                          "xmin IS NOT NULL AND catalog_xmin IS NULL");
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 2: conflict due to row removal with hot_standby_feedback off.
+##################################################
+
+# get the position to search from in the standby logfile
+my $logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+$node_standby->psql('postgres', q[SELECT 
pg_drop_replication_slot('inactiveslot')]);
+$node_standby->psql('postgres', q[SELECT 
pg_drop_replication_slot('activeslot')]);
+
+create_logical_slots();
+
+# One way to produce recovery conflict is to create/drop a relation and launch 
a vacuum
+# with hot_standby_feedback turned off on the standby.
+$node_standby->append_conf('postgresql.conf',q[
+hot_standby_feedback = off
+]);
+$node_standby->restart;
+# ensure walreceiver feedback off by waiting for expected xmin and
+# catalog_xmin on primary. Both should be NULL since hs_feedback is off
+wait_for_xmins($node_primary, $primary_slotname,
+                          "xmin IS NULL AND catalog_xmin IS NULL");
+
+$handle = make_slot_active(1);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y 
text);]);
+$node_primary->safe_psql('testdb', qq[DROP TABLE conflict_test;]);
+$node_primary->safe_psql('testdb', 'VACUUM');
+
+$node_primary->wait_for_catchup($node_standby, 'replay', 
$node_primary->lsn('flush'));
+
+# message should be issued
+ok( find_in_log(
+   $node_standby,
+  "invalidating slot \"inactiveslot\" because it conflicts with recovery", 
$logstart),
+  'inactiveslot slot invalidation is logged due to row removal');
+
+ok( find_in_log(
+   $node_standby,
+  "invalidating slot \"activeslot\" because it conflicts with recovery", 
$logstart),
+  'activeslot slot invalidation is logged due to row removal');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been 
updated
+# we now expect 2 conflicts reported as the counter persists across restarts
+ok( $node_standby->poll_query_until(
+       'postgres',
+       "select (confl_active_logicalslot = 2) from pg_stat_database_conflicts 
where datname = 'testdb'", 't'),
+       'confl_active_logicalslot updated') or die "Timed out waiting 
confl_active_logicalslot to be updated";
+
+$handle = make_slot_active(0);
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication 
slot \"activeslot\"");
+
+# Turn hot_standby_feedback back on
+$node_standby->append_conf('postgresql.conf',q[
+hot_standby_feedback = on
+]);
+$node_standby->restart;
+
+# ensure walreceiver feedback sent by waiting for expected xmin and
+# catalog_xmin on primary. With hot_standby_feedback on, xmin should advance,
+# but catalog_xmin should still remain NULL since there is no logical slot.
+wait_for_xmins($node_primary, $primary_slotname,
+                          "xmin IS NOT NULL AND catalog_xmin IS NULL");
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 3: incorrect wal_level on primary.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+$node_standby->psql('postgres', q[SELECT 
pg_drop_replication_slot('inactiveslot')]);
+$node_standby->psql('postgres', q[SELECT 
pg_drop_replication_slot('activeslot')]);
+
+create_logical_slots();
+
+$handle = make_slot_active(1);
+
+# Make primary wal_level replica. This will trigger slot conflict.
+$node_primary->append_conf('postgresql.conf',q[
+wal_level = 'replica'
+]);
+$node_primary->restart;
+
+$node_primary->wait_for_catchup($node_standby, 'replay', 
$node_primary->lsn('flush'));
+
+# message should be issued
+ok( find_in_log(
+   $node_standby,
+  "invalidating slot \"inactiveslot\" because it conflicts with recovery", 
$logstart),
+  'inactiveslot slot invalidation is logged due to wal_level');
+
+ok( find_in_log(
+   $node_standby,
+  "invalidating slot \"activeslot\" because it conflicts with recovery", 
$logstart),
+  'activeslot slot invalidation is logged due to wal_level');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been 
updated
+# we now expect 3 conflicts reported as the counter persists across restarts
+ok( $node_standby->poll_query_until(
+       'postgres',
+       "select (confl_active_logicalslot = 3) from pg_stat_database_conflicts 
where datname = 'testdb'", 't'),
+       'confl_active_logicalslot updated') or die "Timed out waiting 
confl_active_logicalslot to be updated";
+
+$handle = make_slot_active(0);
+# We are not able to read from the slot as it requires wal_level at least 
logical on master
+check_pg_recvlogical_stderr($handle, "logical decoding on standby requires 
wal_level to be at least logical on master");
+
+# Restore primary wal_level
+$node_primary->append_conf('postgresql.conf',q[
+wal_level = 'logical'
+]);
+$node_primary->restart;
+$node_primary->wait_for_catchup($node_standby, 'replay', 
$node_primary->lsn('flush'));
+
+$handle = make_slot_active(0);
+# as the slot has been invalidated we should not be able to read
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication 
slot \"activeslot\"");
+
+##################################################
+# DROP DATABASE should drop its slots, including active slots.
+##################################################
+
+$node_standby->psql('postgres', q[SELECT 
pg_drop_replication_slot('inactiveslot')]);
+$node_standby->psql('postgres', q[SELECT 
pg_drop_replication_slot('activeslot')]);
+create_logical_slots();
+$handle = make_slot_active(1);
+# Create a slot on a database that would not be dropped. This slot should not
+# get dropped.
+$node_standby->create_logical_slot_on_standby($node_primary, 'otherslot', 
'postgres');
+
+# dropdb on the primary to verify slots are dropped on standby
+$node_primary->safe_psql('postgres', q[DROP DATABASE testdb]);
+
+$node_primary->wait_for_catchup($node_standby, 'replay', 
$node_primary->lsn('flush'));
+
+is($node_standby->safe_psql('postgres',
+       q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 
'f',
+       'database dropped on standby');
+
+check_slots_dropped($handle);
+
+is($node_standby->slot('otherslot')->{'slot_type'}, 'logical',
+       'otherslot on standby not dropped');
+
+# Cleanup : manually drop the slot that was not dropped.
+$node_standby->psql('postgres', q[SELECT 
pg_drop_replication_slot('otherslot')]);
+
+##################################################
+# Test standby promotion and logical decoding behavior
+# after the standby gets promoted.
+##################################################
+
+$node_primary->psql('postgres', q[CREATE DATABASE testdb]);
+$node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y 
text);]);
+
+# create the logical slots
+create_logical_slots();
+
+# Insert some rows before the promotion
+$node_primary->safe_psql('testdb',
+       qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM 
generate_series(1,4) s;]
+);
+
+$node_primary->wait_for_catchup($node_standby, 'replay', 
$node_primary->lsn('flush'));
+
+# promote
+$node_standby->promote;
+
+# insert some rows on promoted standby
+$node_standby->safe_psql('testdb',
+       qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM 
generate_series(5,7) s;]
+);
+
+
+$expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT
+BEGIN
+table public.decoding_test: INSERT: x[integer]:5 y[text]:'5'
+table public.decoding_test: INSERT: x[integer]:6 y[text]:'6'
+table public.decoding_test: INSERT: x[integer]:7 y[text]:'7'
+COMMIT};
+
+# check that we are decoding pre and post promotion inserted rows
+$stdout_sql = $node_standby->safe_psql('testdb',
+       qq[SELECT data FROM pg_logical_slot_peek_changes('activeslot', NULL, 
NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session on 
promoted standby');
-- 
2.34.1

From 21ea7c8c793e0e0bfce764811e78aa540753f16f Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 20 Dec 2022 19:53:39 +0000
Subject: [PATCH v34 3/6] Allow logical decoding on standby.

Allow a logical slot to be created on standby. Restrict its usage
or its creation if wal_level on primary is less than logical.
During slot creation, its restart_lsn is set to the last replayed
LSN. Effectively, a logical slot creation on standby waits for an
xl_running_xacts record to arrive from primary. Conflicting slots
are handled in subsequent commits.

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes 
Mello
---
 src/backend/access/transam/xlog.c         | 11 ++++
 src/backend/replication/logical/decode.c  | 22 ++++++-
 src/backend/replication/logical/logical.c | 37 +++++++-----
 src/backend/replication/slot.c            | 73 +++++++++++++++--------
 src/backend/replication/walsender.c       | 27 +++++----
 src/include/access/xlog.h                 |  1 +
 6 files changed, 118 insertions(+), 53 deletions(-)
   4.5% src/backend/access/transam/
  36.6% src/backend/replication/logical/
  57.9% src/backend/replication/

diff --git a/src/backend/access/transam/xlog.c 
b/src/backend/access/transam/xlog.c
index fca6ee4584..f9cc842a6a 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4462,6 +4462,17 @@ LocalProcessControlFile(bool reset)
        ReadControlFile();
 }
 
+/*
+ * Get the wal_level from the control file. For a standby, this value should be
+ * considered as its active wal_level, because it may be different from what
+ * was originally configured on standby.
+ */
+WalLevel
+GetActiveWalLevelOnStandby(void)
+{
+       return ControlFile->wal_level;
+}
+
 /*
  * Initialization of shared memory for XLOG
  */
diff --git a/src/backend/replication/logical/decode.c 
b/src/backend/replication/logical/decode.c
index 2cc0ac9eb0..c210721ab0 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -152,11 +152,31 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer 
*buf)
                         * can restart from there.
                         */
                        break;
+               case XLOG_PARAMETER_CHANGE:
+               {
+                       xl_parameter_change *xlrec =
+                               (xl_parameter_change *) 
XLogRecGetData(buf->record);
+
+                       /*
+                        * If wal_level on primary is reduced to less than 
logical, then we
+                        * want to prevent existing logical slots from being 
used.
+                        * Existing logical slots on standby get invalidated 
when this WAL
+                        * record is replayed; and further, slot creation fails 
when the
+                        * wal level is not sufficient; but all these 
operations are not
+                        * synchronized, so a logical slot may creep in while 
the wal_level
+                        * is being reduced. Hence this extra check.
+                        */
+                       if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
+                               ereport(ERROR,
+                                               
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                                errmsg("logical decoding on 
standby requires "
+                                                               "wal_level to 
be at least logical on master")));
+                       break;
+               }
                case XLOG_NOOP:
                case XLOG_NEXTOID:
                case XLOG_SWITCH:
                case XLOG_BACKUP_END:
-               case XLOG_PARAMETER_CHANGE:
                case XLOG_RESTORE_POINT:
                case XLOG_FPW_CHANGE:
                case XLOG_FPI_FOR_HINT:
diff --git a/src/backend/replication/logical/logical.c 
b/src/backend/replication/logical/logical.c
index 625a7f4273..a9567f2d8c 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -119,23 +119,22 @@ CheckLogicalDecodingRequirements(void)
                                
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                 errmsg("logical decoding requires a database 
connection")));
 
-       /* ----
-        * TODO: We got to change that someday soon...
-        *
-        * There's basically three things missing to allow this:
-        * 1) We need to be able to correctly and quickly identify the timeline 
a
-        *        LSN belongs to
-        * 2) We need to force hot_standby_feedback to be enabled at all times 
so
-        *        the primary cannot remove rows we need.
-        * 3) support dropping replication slots referring to a database, in
-        *        dbase_redo. There can't be any active ones due to HS recovery
-        *        conflicts, so that should be relatively easy.
-        * ----
-        */
        if (RecoveryInProgress())
-               ereport(ERROR,
-                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                errmsg("logical decoding cannot be used while 
in recovery")));
+       {
+               /*
+                * This check may have race conditions, but whenever
+                * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, 
we
+                * verify that there are no existing logical replication slots. 
And to
+                * avoid races around creating a new slot,
+                * CheckLogicalDecodingRequirements() is called once before 
creating
+                * the slot, and once when logical decoding is initially 
starting up.
+                */
+               if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL)
+                       ereport(ERROR,
+                                       
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                        errmsg("logical decoding on standby 
requires "
+                                                       "wal_level to be at 
least logical on master")));
+       }
 }
 
 /*
@@ -331,6 +330,12 @@ CreateInitDecodingContext(const char *plugin,
        LogicalDecodingContext *ctx;
        MemoryContext old_context;
 
+       /*
+        * On standby, this check is also required while creating the slot. 
See
+        * the comments in CheckLogicalDecodingRequirements().
+        */
+       CheckLogicalDecodingRequirements();
+
        /* shorter lines... */
        slot = MyReplicationSlot;
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 6a4e2cd19b..f554dac6fd 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -51,6 +51,7 @@
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/builtins.h"
+#include "access/xlogrecovery.h"
 
 /*
  * Replication slot on-disk data structure.
@@ -1175,37 +1176,46 @@ ReplicationSlotReserveWal(void)
                /*
                 * For logical slots log a standby snapshot and start logical 
decoding
                 * at exactly that position. That allows the slot to start up 
more
-                * quickly.
+                * quickly. But on a standby we cannot do WAL writes, so just 
use the
+                * replay pointer; effectively, an attempt to create a logical 
slot on
+                * standby will cause it to wait for an xl_running_xact record 
to be
+                * logged independently on the primary, so that a snapshot can 
be built
+                * using the record.
                 *
-                * That's not needed (or indeed helpful) for physical slots as 
they'll
-                * start replay at the last logged checkpoint anyway. Instead 
return
-                * the location of the last redo LSN. While that slightly 
increases
-                * the chance that we have to retry, it's where a base backup 
has to
-                * start replay at.
+                * None of this is needed (or indeed helpful) for physical 
slots as
+                * they'll start replay at the last logged checkpoint anyway. 
Instead
+                * return the location of the last redo LSN. While that slightly
+                * increases the chance that we have to retry, it's where a 
base backup
+                * has to start replay at.
                 */
-               if (!RecoveryInProgress() && SlotIsLogical(slot))
+               if (SlotIsPhysical(slot))
+                       restart_lsn = GetRedoRecPtr();
+               else if (RecoveryInProgress())
                {
-                       XLogRecPtr      flushptr;
-
-                       /* start at current insert position */
-                       restart_lsn = GetXLogInsertRecPtr();
-                       SpinLockAcquire(&slot->mutex);
-                       slot->data.restart_lsn = restart_lsn;
-                       SpinLockRelease(&slot->mutex);
-
-                       /* make sure we have enough information to start */
-                       flushptr = LogStandbySnapshot();
-
-                       /* and make sure it's fsynced to disk */
-                       XLogFlush(flushptr);
+                       restart_lsn = GetXLogReplayRecPtr(NULL);
+                       /*
+                        * Replay pointer may point one past the end of the 
record. If that
+                        * is a XLOG page boundary, it will not be a valid LSN 
for the
+                        * start of a record, so bump it up past the page 
header.
+                        */
+                       if (!XRecOffIsValid(restart_lsn))
+                       {
+                               if (restart_lsn % XLOG_BLCKSZ != 0)
+                                       elog(ERROR, "invalid replay pointer");
+
+                               /* For the first page of a segment file, it's a 
long header */
+                               if (XLogSegmentOffset(restart_lsn, 
wal_segment_size) == 0)
+                                       restart_lsn += SizeOfXLogLongPHD;
+                               else
+                                       restart_lsn += SizeOfXLogShortPHD;
+                       }
                }
                else
-               {
-                       restart_lsn = GetRedoRecPtr();
-                       SpinLockAcquire(&slot->mutex);
-                       slot->data.restart_lsn = restart_lsn;
-                       SpinLockRelease(&slot->mutex);
-               }
+                       restart_lsn = GetXLogInsertRecPtr();
+
+               SpinLockAcquire(&slot->mutex);
+               slot->data.restart_lsn = restart_lsn;
+               SpinLockRelease(&slot->mutex);
 
                /* prevent WAL removal as fast as possible */
                ReplicationSlotsComputeRequiredLSN();
@@ -1221,6 +1231,17 @@ ReplicationSlotReserveWal(void)
                if (XLogGetLastRemovedSegno() < segno)
                        break;
        }
+
+       if (!RecoveryInProgress() && SlotIsLogical(slot))
+       {
+               XLogRecPtr      flushptr;
+
+               /* make sure we have enough information to start */
+               flushptr = LogStandbySnapshot();
+
+               /* and make sure it's fsynced to disk */
+               XLogFlush(flushptr);
+       }
 }
 
 /*
diff --git a/src/backend/replication/walsender.c 
b/src/backend/replication/walsender.c
index 64fbd52e34..9662e316c9 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -906,14 +906,18 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr 
targetPagePtr, int req
        int                     count;
        WALReadError errinfo;
        XLogSegNo       segno;
-       TimeLineID      currTLI = GetWALInsertionTimeLine();
+       TimeLineID      currTLI;
 
        /*
-        * Since logical decoding is only permitted on a primary server, we know
-        * that the current timeline ID can't be changing any more. If we did 
this
-        * on a standby, we'd have to worry about the values we compute here
-        * becoming invalid due to a promotion or timeline change.
+        * Since logical decoding is also permitted on a standby server, we need
+        * to check if the server is in recovery to decide how to get the 
current
+        * timeline ID (so that it also covers the promotion or timeline change 
cases).
         */
+       if (!RecoveryInProgress())
+               currTLI = GetWALInsertionTimeLine();
+       else
+               GetXLogReplayRecPtr(&currTLI);
+
        XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
        sendTimeLineIsHistoric = (state->currTLI != currTLI);
        sendTimeLine = state->currTLI;
@@ -3074,10 +3078,12 @@ XLogSendLogical(void)
         * If first time through in this session, initialize flushPtr.  
Otherwise,
         * we only need to update flushPtr if EndRecPtr is past it.
         */
-       if (flushPtr == InvalidXLogRecPtr)
-               flushPtr = GetFlushRecPtr(NULL);
-       else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
-               flushPtr = GetFlushRecPtr(NULL);
+       if (flushPtr == InvalidXLogRecPtr ||
+               logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
+       {
+               flushPtr = (am_cascading_walsender ?
+                                       GetStandbyFlushRecPtr(NULL) : 
GetFlushRecPtr(NULL));
+       }
 
        /* If EndRecPtr is still past our flushPtr, it means we caught up. */
        if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
@@ -3168,7 +3174,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli)
        receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
        replayPtr = GetXLogReplayRecPtr(&replayTLI);
 
-       *tli = replayTLI;
+       if (tli)
+               *tli = replayTLI;
 
        result = replayPtr;
        if (receiveTLI == replayTLI && receivePtr > replayPtr)
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 1fbd48fbda..027e155e8e 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -230,6 +230,7 @@ extern void XLOGShmemInit(void);
 extern void BootStrapXLOG(void);
 extern void InitializeWalConsistencyChecking(void);
 extern void LocalProcessControlFile(bool reset);
+extern WalLevel GetActiveWalLevelOnStandby(void);
 extern void StartupXLOG(void);
 extern void ShutdownXLOG(int code, Datum arg);
 extern void CreateCheckPoint(int flags);
-- 
2.34.1

From dd9e7719d008f8c54ab0f59c31b576a5881e36e9 Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 20 Dec 2022 19:52:46 +0000
Subject: [PATCH v34 2/6] Handle logical slot conflicts on standby.

During WAL replay on standby, when slot conflict is identified,
invalidate such slots. Also do the same thing if wal_level on master
is reduced to below logical and there are existing logical slots
on standby. Introduce a new ProcSignalReason value for slot
conflict recovery. Arrange for a new pg_stat_database_conflicts field:
confl_active_logicalslot.

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes 
Mello
---
 doc/src/sgml/monitoring.sgml                  |  11 +
 src/backend/access/gist/gistxlog.c            |   2 +
 src/backend/access/hash/hash_xlog.c           |   1 +
 src/backend/access/heap/heapam.c              |   3 +
 src/backend/access/nbtree/nbtxlog.c           |   2 +
 src/backend/access/spgist/spgxlog.c           |   1 +
 src/backend/access/transam/xlog.c             |  13 ++
 src/backend/catalog/system_views.sql          |   3 +-
 .../replication/logical/logicalfuncs.c        |   7 +-
 src/backend/replication/slot.c                | 209 ++++++++++++++++++
 src/backend/replication/walsender.c           |   8 +
 src/backend/storage/ipc/procarray.c           |   4 +
 src/backend/storage/ipc/procsignal.c          |   3 +
 src/backend/storage/ipc/standby.c             |  13 +-
 src/backend/tcop/postgres.c                   |  22 ++
 src/backend/utils/activity/pgstat_database.c  |   4 +
 src/backend/utils/adt/pgstatfuncs.c           |   3 +
 src/include/catalog/pg_proc.dat               |   5 +
 src/include/pgstat.h                          |   1 +
 src/include/replication/slot.h                |   2 +
 src/include/storage/procsignal.h              |   1 +
 src/include/storage/standby.h                 |   2 +
 src/test/regress/expected/rules.out           |   3 +-
 23 files changed, 318 insertions(+), 5 deletions(-)
   3.9% doc/src/sgml/
   5.3% src/backend/access/transam/
   3.1% src/backend/access/
   3.9% src/backend/replication/logical/
  59.0% src/backend/replication/
   6.7% src/backend/storage/ipc/
   8.0% src/backend/tcop/
   3.4% src/backend/
   5.6% src/include/

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 363b183e5f..27235418a6 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -4317,6 +4317,17 @@ SELECT pid, wait_event_type, wait_event FROM 
pg_stat_activity WHERE wait_event i
        deadlocks
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_active_logicalslot</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of active logical slots in this database that have been
+       invalidated because they conflict with recovery (note that inactive ones
+       are also invalidated but do not increment this counter)
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/access/gist/gistxlog.c 
b/src/backend/access/gist/gistxlog.c
index f47587b8f5..285126ce50 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -196,6 +196,7 @@ gistRedoDeleteRecord(XLogReaderState *record)
                XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
 
                
ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+                                                                               
        xldata->isCatalogRel,
                                                                                
        rlocator);
        }
 
@@ -396,6 +397,7 @@ gistRedoPageReuse(XLogReaderState *record)
         */
        if (InHotStandby)
                
ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
+                                                                               
                   xlrec->isCatalogRel,
                                                                                
                   xlrec->locator);
 }
 
diff --git a/src/backend/access/hash/hash_xlog.c 
b/src/backend/access/hash/hash_xlog.c
index b452697a2f..75dd33e581 100644
--- a/src/backend/access/hash/hash_xlog.c
+++ b/src/backend/access/hash/hash_xlog.c
@@ -1001,6 +1001,7 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
 
                XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
                
ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+                                                                               
        xldata->isCatalogRel,
                                                                                
        rlocator);
        }
 
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 1b344eace7..1116eb3e3a 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8693,6 +8693,7 @@ heap_xlog_prune(XLogReaderState *record)
         */
        if (InHotStandby)
                
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+                                                                               
        xlrec->isCatalogRel,
                                                                                
        rlocator);
 
        /*
@@ -8862,6 +8863,7 @@ heap_xlog_visible(XLogReaderState *record)
         */
        if (InHotStandby)
                
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+                                                                               
        xlrec->flags & VISIBILITYMAP_ON_CATALOG_ACCESSIBLE_IN_LOGICAL_DECODING,
                                                                                
        rlocator);
 
        /*
@@ -9117,6 +9119,7 @@ heap_xlog_freeze_page(XLogReaderState *record)
 
                XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
                
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+                                                                               
        xlrec->isCatalogRel,
                                                                                
        rlocator);
        }
 
diff --git a/src/backend/access/nbtree/nbtxlog.c 
b/src/backend/access/nbtree/nbtxlog.c
index 3e311a98a6..cfede906a3 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -669,6 +669,7 @@ btree_xlog_delete(XLogReaderState *record)
                XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
 
                
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+                                                                               
        xlrec->isCatalogRel,
                                                                                
        rlocator);
        }
 
@@ -1007,6 +1008,7 @@ btree_xlog_reuse_page(XLogReaderState *record)
 
        if (InHotStandby)
                
ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
+                                                                               
                   xlrec->isCatalogRel,
                                                                                
                   xlrec->locator);
 }
 
diff --git a/src/backend/access/spgist/spgxlog.c 
b/src/backend/access/spgist/spgxlog.c
index 44adc2098f..20165bc588 100644
--- a/src/backend/access/spgist/spgxlog.c
+++ b/src/backend/access/spgist/spgxlog.c
@@ -879,6 +879,7 @@ spgRedoVacuumRedirect(XLogReaderState *record)
 
                XLogRecGetBlockTag(record, 0, &locator, NULL, NULL);
                
ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
+                                                                               
        xldata->isCatalogRel,
                                                                                
        locator);
        }
 
diff --git a/src/backend/access/transam/xlog.c 
b/src/backend/access/transam/xlog.c
index 91473b00d9..fca6ee4584 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7958,6 +7958,19 @@ xlog_redo(XLogReaderState *record)
                /* Update our copy of the parameters in pg_control */
                memcpy(&xlrec, XLogRecGetData(record), 
sizeof(xl_parameter_change));
 
+               /*
+                * Invalidate logical slots if we are in hot standby and the 
primary does not
+                * have a WAL level sufficient for logical decoding. No need to 
search
+                * for potentially conflicting logical slots if standby is 
running
+                * with wal_level lower than logical, because in that case, we 
would
+                * have either disallowed creation of logical slots or 
invalidated existing
+                * ones.
+                */
+               if (InRecovery && InHotStandby &&
+                       xlrec.wal_level < WAL_LEVEL_LOGICAL &&
+                       wal_level >= WAL_LEVEL_LOGICAL)
+                       
InvalidateConflictingLogicalReplicationSlots(InvalidOid,InvalidTransactionId);
+
                LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
                ControlFile->MaxConnections = xlrec.MaxConnections;
                ControlFile->max_worker_processes = xlrec.max_worker_processes;
diff --git a/src/backend/catalog/system_views.sql 
b/src/backend/catalog/system_views.sql
index 2d8104b090..0e0b8ef415 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1065,7 +1065,8 @@ CREATE VIEW pg_stat_database_conflicts AS
             pg_stat_get_db_conflict_lock(D.oid) AS confl_lock,
             pg_stat_get_db_conflict_snapshot(D.oid) AS confl_snapshot,
             pg_stat_get_db_conflict_bufferpin(D.oid) AS confl_bufferpin,
-            pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock
+            pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock,
+            pg_stat_get_db_conflict_logicalslot(D.oid) AS 
confl_active_logicalslot
     FROM pg_database D;
 
 CREATE VIEW pg_stat_user_functions AS
diff --git a/src/backend/replication/logical/logicalfuncs.c 
b/src/backend/replication/logical/logicalfuncs.c
index 5c23178570..8432de219b 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -216,11 +216,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, 
bool confirm, bool bin
 
                /*
                 * After the sanity checks in CreateDecodingContext, make sure 
the
-                * restart_lsn is valid.  Avoid "cannot get changes" wording in 
this
+                * restart_lsn is valid and that xmin and catalog_xmin are not both invalid.
+                * Avoid "cannot get changes" wording in this
                 * errmsg because that'd be confusingly ambiguous about no 
changes
                 * being available.
                 */
-               if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
+               if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)
+                       || (!TransactionIdIsValid(MyReplicationSlot->data.xmin)
+                               && 
!TransactionIdIsValid(MyReplicationSlot->data.catalog_xmin)))
                        ereport(ERROR,
                                        
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                         errmsg("can no longer get changes from 
replication slot \"%s\"",
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 899acfd912..6a4e2cd19b 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1432,6 +1432,215 @@ restart:
        return invalidated;
 }
 
+/*
+ * Helper for InvalidateConflictingLogicalReplicationSlots -- acquires the 
given slot
+ * and marks it invalid, if necessary and possible.
+ *
+ * Returns whether ReplicationSlotControlLock was released in the interim (and
+ * in that case we're not holding the lock at return, otherwise we are).
+ *
+ * This is inherently racy, because we release the LWLock
+ * for syscalls, so caller must restart if we return true.
+ */
+static bool
+InvalidatePossiblyConflictingLogicalReplicationSlot(ReplicationSlot *s, 
TransactionId xid)
+{
+       int             last_signaled_pid = 0;
+       bool    released_lock = false;
+
+       for (;;)
+       {
+               TransactionId slot_xmin;
+               TransactionId slot_catalog_xmin;
+               NameData        slotname;
+               int                     active_pid = 0;
+
+               Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, 
LW_SHARED));
+
+               if (!s->in_use)
+               {
+                       if (released_lock)
+                               LWLockRelease(ReplicationSlotControlLock);
+                       break;
+               }
+
+               /*
+                * Check if the slot needs to be invalidated. If it needs to be
+                * invalidated, and is not currently acquired, acquire it and 
mark it
+                * as having been invalidated. We do this with the spinlock 
held to
+                * avoid race conditions -- for example the xmin(s) could move 
forward,
+                * or the slot could be dropped.
+                */
+               SpinLockAcquire(&s->mutex);
+
+               slot_xmin = s->data.xmin;
+               slot_catalog_xmin = s->data.catalog_xmin;
+
+               /*
+                * If the slot is already invalid or is not conflicting, we 
don't need to
+                * do anything.
+                */
+
+               /* slot has been invalidated */
+               if ((!TransactionIdIsValid(slot_xmin) && 
!TransactionIdIsValid(slot_catalog_xmin))
+                       ||
+               /*
+                * we are not forcing for invalidation because the xid is valid
+                * and this is a non conflicting slot
+                */
+                       (TransactionIdIsValid(xid) && !(
+                               (TransactionIdIsValid(slot_xmin) && 
TransactionIdPrecedesOrEquals(slot_xmin, xid))
+                               ||
+                               (TransactionIdIsValid(slot_catalog_xmin) && 
TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid))
+                               ))
+                       )
+               {
+                       SpinLockRelease(&s->mutex);
+                       if (released_lock)
+                               LWLockRelease(ReplicationSlotControlLock);
+                       break;
+               }
+
+               slotname = s->data.name;
+               active_pid = s->active_pid;
+
+               /*
+                * If the slot can be acquired, do so and mark it invalidated
+                * immediately.  Otherwise we'll signal the owning process, 
below, and
+                * retry.
+                */
+               if (active_pid == 0)
+               {
+                       MyReplicationSlot = s;
+                       s->active_pid = MyProcPid;
+                       s->data.xmin = InvalidTransactionId;
+                       s->data.catalog_xmin = InvalidTransactionId;
+               }
+
+               SpinLockRelease(&s->mutex);
+
+               if (active_pid != 0)
+               {
+                       /*
+                        * Prepare the sleep on the slot's condition variable 
before
+                        * releasing the lock, to close a possible race 
condition if the
+                        * slot is released before the sleep below.
+                        */
+
+                       ConditionVariablePrepareToSleep(&s->active_cv);
+
+                       LWLockRelease(ReplicationSlotControlLock);
+                       released_lock = true;
+
+                       /*
+                        * Signal to terminate the process that owns the slot, 
if we
+                        * haven't already signalled it.  (Avoidance of repeated
+                        * signalling is the only reason for there to be a loop 
in this
+                        * routine; otherwise we could rely on caller's restart 
loop.)
+                        *
+                        * There is a race condition in that another process may 
own the slot
+                        * after its current owner process is terminated and 
before this
+                        * process owns it. To handle that, we signal only if 
the PID of
+                        * the owning process has changed from the previous 
time. (This
+                        * logic assumes that the same PID is not reused very 
quickly.)
+                        */
+                       if (last_signaled_pid != active_pid)
+                       {
+                               ereport(LOG,
+                                               (errmsg("terminating process %d 
because replication slot \"%s\" conflicts with recovery",
+                                                               active_pid, 
NameStr(slotname))));
+
+                               (void) SendProcSignal(active_pid, 
PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, InvalidBackendId);
+                               last_signaled_pid = active_pid;
+                       }
+
+                       /* Wait until the slot is released. */
+                       ConditionVariableSleep(&s->active_cv,
+                                                                       
WAIT_EVENT_REPLICATION_SLOT_DROP);
+
+                       /*
+                        * Re-acquire lock and start over; we expect to 
invalidate the
+                        * slot next time (unless another process acquires the 
slot in the
+                        * meantime).
+                        */
+                       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+                       continue;
+               }
+               else
+               {
+                       /*
+                        * We hold the slot now and have already invalidated 
it; flush it
+                        * to ensure that state persists.
+                        *
+                        * Don't want to hold ReplicationSlotControlLock across 
file
+                        * system operations, so release it now but be sure to 
tell caller
+                        * to restart from scratch.
+                        */
+                       LWLockRelease(ReplicationSlotControlLock);
+                       released_lock = true;
+
+                       /* Make sure the invalidated state persists across 
server restart */
+                       ReplicationSlotMarkDirty();
+                       ReplicationSlotSave();
+                       ReplicationSlotRelease();
+                       pgstat_drop_replslot(s);
+
+                       ereport(LOG,
+                                       (errmsg("invalidating slot \"%s\" 
because it conflicts with recovery", NameStr(slotname))));
+
+                       /* done with this slot for now */
+                       break;
+               }
+       }
+
+       Assert(!released_lock == 
LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
+
+       return released_lock;
+}
+
+/*
+ * Resolve recovery conflicts with logical slots.
+ *
+ * When xid is valid, it means that we are about to remove rows older than xid.
+ * Therefore we need to invalidate slots that depend on seeing those rows.
+ * When xid is invalid, invalidate all logical slots. This is required when the
+ * master wal_level is set back to replica, so existing logical slots need to
+ * be invalidated.
+ */
+void
+InvalidateConflictingLogicalReplicationSlots(Oid dboid, TransactionId xid)
+{
+
+       Assert(max_replication_slots >= 0);
+
+       if (max_replication_slots == 0)
+               return;
+restart:
+       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+       for (int i = 0; i < max_replication_slots; i++)
+       {
+               ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+               if (!s->in_use)
+                       continue;
+
+               /* We are only dealing with *logical* slot conflicts. */
+               if (!SlotIsLogical(s))
+                       continue;
+
+               /* slot is in another database and we are not invalidating all logical slots, skip */
+               if (s->data.database != dboid && TransactionIdIsValid(xid))
+                       continue;
+
+               if (InvalidatePossiblyConflictingLogicalReplicationSlot(s, xid))
+               {
+                       /* if the lock was released, we need to restart from 
scratch */
+                       goto restart;
+               }
+       }
+       LWLockRelease(ReplicationSlotControlLock);
+}
+
 /*
  * Flush all replication slots to disk.
  *
diff --git a/src/backend/replication/walsender.c 
b/src/backend/replication/walsender.c
index c11bb3716f..64fbd52e34 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1253,6 +1253,14 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
        ReplicationSlotAcquire(cmd->slotname, true);
 
+       if (!TransactionIdIsValid(MyReplicationSlot->data.xmin)
+                && !TransactionIdIsValid(MyReplicationSlot->data.catalog_xmin))
+               ereport(ERROR,
+                               
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("cannot read from logical replication 
slot \"%s\"",
+                                               cmd->slotname),
+                                errdetail("This slot has been invalidated 
because it was conflicting with recovery.")));
+
        if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
                ereport(ERROR,
                                
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
diff --git a/src/backend/storage/ipc/procarray.c 
b/src/backend/storage/ipc/procarray.c
index 0176f30270..d68b752c91 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -3477,6 +3477,10 @@ SignalVirtualTransaction(VirtualTransactionId vxid, 
ProcSignalReason sigmode,
 
                GET_VXID_FROM_PGPROC(procvxid, *proc);
 
+               /*
+                * Note: vxid.localTransactionId can be invalid, which means the
+                * request is to signal the pid that is not running a 
transaction.
+                */
                if (procvxid.backendId == vxid.backendId &&
                        procvxid.localTransactionId == vxid.localTransactionId)
                {
diff --git a/src/backend/storage/ipc/procsignal.c 
b/src/backend/storage/ipc/procsignal.c
index 7767657f27..1b3bf943c1 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -669,6 +669,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
        if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT))
                RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
 
+       if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT))
+               
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT);
+
        if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK))
                
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
 
diff --git a/src/backend/storage/ipc/standby.c 
b/src/backend/storage/ipc/standby.c
index f43229dfda..f78cf5de68 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -35,6 +35,7 @@
 #include "utils/ps_status.h"
 #include "utils/timeout.h"
 #include "utils/timestamp.h"
+#include "replication/slot.h"
 
 /* User-settable GUC parameters */
 int                    vacuum_defer_cleanup_age;
@@ -475,6 +476,7 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId 
*waitlist,
  */
 void
 ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
+                                                                       bool 
isCatalogRel,
                                                                        
RelFileLocator locator)
 {
        VirtualTransactionId *backends;
@@ -499,6 +501,9 @@ ResolveRecoveryConflictWithSnapshot(TransactionId 
snapshotConflictHorizon,
                                                                                
   PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
                                                                                
   WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
                                                                                
   true);
+
+       if (isCatalogRel)
+               InvalidateConflictingLogicalReplicationSlots(locator.dbOid, 
snapshotConflictHorizon);
 }
 
 /*
@@ -507,6 +512,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId 
snapshotConflictHorizon,
  */
 void
 ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId 
snapshotConflictHorizon,
+                                                                               
   bool isCatalogRel,
                                                                                
   RelFileLocator locator)
 {
        /*
@@ -525,7 +531,9 @@ 
ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHor
                TransactionId truncated;
 
                truncated = XidFromFullTransactionId(snapshotConflictHorizon);
-               ResolveRecoveryConflictWithSnapshot(truncated, locator);
+               ResolveRecoveryConflictWithSnapshot(truncated,
+                                                                               
        isCatalogRel,
+                                                                               
        locator);
        }
 }
 
@@ -1486,6 +1494,9 @@ get_recovery_conflict_desc(ProcSignalReason reason)
                case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
                        reasonDesc = _("recovery conflict on snapshot");
                        break;
+               case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+                       reasonDesc = _("recovery conflict on replication slot");
+                       break;
                case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
                        reasonDesc = _("recovery conflict on buffer deadlock");
                        break;
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 01d264b5ab..05da83bf5b 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -2482,6 +2482,9 @@ errdetail_recovery_conflict(void)
                case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
                        errdetail("User query might have needed to see row 
versions that must be removed.");
                        break;
+               case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+                       errdetail("User was using the logical slot that must be 
dropped.");
+                       break;
                case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
                        errdetail("User transaction caused buffer deadlock with 
recovery.");
                        break;
@@ -3051,6 +3054,25 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
                        case PROCSIG_RECOVERY_CONFLICT_LOCK:
                        case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
                        case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
+                       case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+                               /*
+                                * For conflicts that require a logical slot to 
be invalidated, the
+                                * requirement is for the signal receiver to 
release the slot,
+                                * so that it could be invalidated by the 
signal sender. So for
+                                * normal backends, the transaction should be 
aborted, just
+                                * like for other recovery conflicts. But if 
it's walsender on
+                                * standby, then it has to be killed so as to 
release an
+                                * acquired logical slot.
+                                */
+                               if (am_cascading_walsender &&
+                                       reason == 
PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT &&
+                                       MyReplicationSlot && 
SlotIsLogical(MyReplicationSlot))
+                               {
+                                       RecoveryConflictPending = true;
+                                       QueryCancelPending = true;
+                                       InterruptPending = true;
+                                       break;
+                               }
 
                                /*
                                 * If we aren't in a transaction any longer 
then ignore.
diff --git a/src/backend/utils/activity/pgstat_database.c 
b/src/backend/utils/activity/pgstat_database.c
index 290086fc22..7a8909d8b9 100644
--- a/src/backend/utils/activity/pgstat_database.c
+++ b/src/backend/utils/activity/pgstat_database.c
@@ -109,6 +109,9 @@ pgstat_report_recovery_conflict(int reason)
                case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN:
                        dbentry->conflict_bufferpin++;
                        break;
+               case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+                       dbentry->conflict_logicalslot++;
+                       break;
                case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
                        dbentry->conflict_startup_deadlock++;
                        break;
@@ -387,6 +390,7 @@ pgstat_database_flush_cb(PgStat_EntryRef *entry_ref, bool 
nowait)
        PGSTAT_ACCUM_DBCOUNT(conflict_tablespace);
        PGSTAT_ACCUM_DBCOUNT(conflict_lock);
        PGSTAT_ACCUM_DBCOUNT(conflict_snapshot);
+       PGSTAT_ACCUM_DBCOUNT(conflict_logicalslot);
        PGSTAT_ACCUM_DBCOUNT(conflict_bufferpin);
        PGSTAT_ACCUM_DBCOUNT(conflict_startup_deadlock);
 
diff --git a/src/backend/utils/adt/pgstatfuncs.c 
b/src/backend/utils/adt/pgstatfuncs.c
index 46f98fd67f..41eb6256ea 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1055,6 +1055,8 @@ PG_STAT_GET_DBENTRY_INT64(xact_commit)
 /* pg_stat_get_db_xact_rollback */
 PG_STAT_GET_DBENTRY_INT64(xact_rollback)
 
+/* pg_stat_get_db_conflict_logicalslot */
+PG_STAT_GET_DBENTRY_INT64(conflict_logicalslot)
 
 Datum
 pg_stat_get_db_stat_reset_time(PG_FUNCTION_ARGS)
@@ -1088,6 +1090,7 @@ pg_stat_get_db_conflict_all(PG_FUNCTION_ARGS)
                result = (int64) (dbentry->conflict_tablespace +
                                                  dbentry->conflict_lock +
                                                  dbentry->conflict_snapshot +
+                                                 dbentry->conflict_logicalslot 
+
                                                  dbentry->conflict_bufferpin +
                                                  
dbentry->conflict_startup_deadlock);
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 98d90d9338..21dd65a483 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5546,6 +5546,11 @@
   proname => 'pg_stat_get_db_conflict_snapshot', provolatile => 's',
   proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
   prosrc => 'pg_stat_get_db_conflict_snapshot' },
+{ oid => '9901',
+  descr => 'statistics: recovery conflicts in database caused by logical 
replication slot',
+  proname => 'pg_stat_get_db_conflict_logicalslot', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
+  prosrc => 'pg_stat_get_db_conflict_logicalslot' },
 { oid => '3068',
   descr => 'statistics: recovery conflicts in database caused by shared buffer 
pin',
   proname => 'pg_stat_get_db_conflict_bufferpin', provolatile => 's',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index a3df8d27c3..7ffce84d07 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -291,6 +291,7 @@ typedef struct PgStat_StatDBEntry
        PgStat_Counter conflict_tablespace;
        PgStat_Counter conflict_lock;
        PgStat_Counter conflict_snapshot;
+       PgStat_Counter conflict_logicalslot;
        PgStat_Counter conflict_bufferpin;
        PgStat_Counter conflict_startup_deadlock;
        PgStat_Counter temp_files;
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 65f2c74239..0ed1d8af28 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -216,6 +216,7 @@ extern XLogRecPtr 
ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
 extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
+extern void InvalidateConflictingLogicalReplicationSlots(Oid dboid, 
TransactionId xid);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool 
need_lock);
 extern int     ReplicationSlotIndex(ReplicationSlot *slot);
 extern bool ReplicationSlotName(int index, Name name);
@@ -227,5 +228,6 @@ extern void CheckPointReplicationSlots(void);
 
 extern void CheckSlotRequirements(void);
 extern void CheckSlotPermissions(void);
+extern void ResolveRecoveryConflictWithLogicalSlots(Oid dboid, TransactionId 
xid, char *reason);
 
 #endif                                                 /* SLOT_H */
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index ee636900f3..56096bd3e2 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -41,6 +41,7 @@ typedef enum
        PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
        PROCSIG_RECOVERY_CONFLICT_LOCK,
        PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
+       PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
        PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
        PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
 
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index e46c934c56..7df66d6136 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -30,8 +30,10 @@ extern void InitRecoveryTransactionEnvironment(void);
 extern void ShutdownRecoveryTransactionEnvironment(void);
 
 extern void ResolveRecoveryConflictWithSnapshot(TransactionId 
snapshotConflictHorizon,
+                                                                               
                bool isCatalogRel,
                                                                                
                RelFileLocator locator);
 extern void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId 
snapshotConflictHorizon,
+                                                                               
                           bool isCatalogRel,
                                                                                
                           RelFileLocator locator);
 extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
 extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
diff --git a/src/test/regress/expected/rules.out 
b/src/test/regress/expected/rules.out
index fb9f936d43..1cc62c447d 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1868,7 +1868,8 @@ pg_stat_database_conflicts| SELECT d.oid AS datid,
     pg_stat_get_db_conflict_lock(d.oid) AS confl_lock,
     pg_stat_get_db_conflict_snapshot(d.oid) AS confl_snapshot,
     pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin,
-    pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock
+    pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock,
+    pg_stat_get_db_conflict_logicalslot(d.oid) AS confl_active_logicalslot
    FROM pg_database d;
 pg_stat_gssapi| SELECT s.pid,
     s.gss_auth AS gss_authenticated,
-- 
2.34.1

From df1ed6b773908407c2c165eef1627eb54be11d10 Mon Sep 17 00:00:00 2001
From: bdrouvotAWS <bdrou...@amazon.com>
Date: Tue, 20 Dec 2022 19:46:06 +0000
Subject: [PATCH v34 1/6] Add info in WAL records in preparation for logical
 slot conflict handling.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Overall design:

1. We want to enable logical decoding on standbys, but replay of WAL
from the primary might remove data that is needed by logical decoding,
causing replication conflicts much as hot standby does.

2. Our chosen strategy for dealing with this type of replication conflict
is to invalidate logical slots for which needed data has been removed.

3. To do this we need the snapshotConflictHorizon (formerly known as
latestRemovedXid) for each change, just as we do for physical
replication conflicts, but we also need to know whether any particular
change was to data that logical replication might access.

4. We can't rely on the standby's relcache entries for this purpose in
any way, because the WAL record that causes the problem might be
replayed before the standby even reaches consistency.

5. Therefore every WAL record that potentially removes data from the
index or heap must carry a flag indicating whether or not it is one
that might be accessed during logical decoding.

Why do we need this for logical decoding on standby?

First, let's forget about logical decoding on standby and recall that
on a primary database, any catalog rows that may be needed by a logical
decoding replication slot are not removed.

This is done thanks to the catalog_xmin associated with the logical
replication slot.

But, with logical decoding on standby, in the following cases:

- hot_standby_feedback is off
- hot_standby_feedback is on but there is no physical slot between
  the primary and the standby. Then, hot_standby_feedback will work,
  but only while the connection is alive (for example a node restart
  would break it)

Then, the primary may delete system catalog rows that could be needed
by the logical decoding on the standby (as it does not know about the
catalog_xmin on the standby).

So, it’s mandatory to identify those rows and invalidate the slots
that may need them if any. Identifying those rows is the purpose of
this commit.

Implementation:

When a WAL replay on standby indicates that a catalog table tuple is
to be deleted by an xid that is greater than a logical slot's
catalog_xmin, then that means the slot's catalog_xmin conflicts with
the xid, and we need to handle the conflict. While subsequent commits
will do the actual conflict handling, this commit adds a new field
isCatalogRel in such WAL records (and a new bit set in the
xl_heap_visible flags field), that is true for catalog tables, so as to
arrange for conflict handling.

Author: Andres Freund (in an older version), Amit Khandekar, Bertrand
Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de
Royes Mello
---
 contrib/test_decoding/expected/ddl.out  | 65 +++++++++++++++++++++++++
 contrib/test_decoding/sql/ddl.sql       | 23 +++++++++
 doc/src/sgml/catalogs.sgml              | 11 +++++
 src/backend/access/common/reloptions.c  |  2 +-
 src/backend/access/gist/gistxlog.c      |  1 +
 src/backend/access/hash/hashinsert.c    |  1 +
 src/backend/access/heap/heapam.c        |  5 +-
 src/backend/access/heap/pruneheap.c     |  1 +
 src/backend/access/heap/visibilitymap.c |  3 +-
 src/backend/access/nbtree/nbtpage.c     |  2 +
 src/backend/access/spgist/spgvacuum.c   |  1 +
 src/backend/catalog/index.c             | 10 ++--
 src/backend/commands/tablecmds.c        | 55 ++++++++++++++++++++-
 src/include/access/gistxlog.h           |  6 ++-
 src/include/access/hash_xlog.h          |  3 +-
 src/include/access/heapam_xlog.h        |  8 +--
 src/include/access/nbtxlog.h            |  6 ++-
 src/include/access/spgxlog.h            |  1 +
 src/include/access/visibilitymapdefs.h  |  9 ++--
 src/include/catalog/pg_index.h          |  2 +
 src/include/utils/rel.h                 | 14 +++++-
 21 files changed, 208 insertions(+), 21 deletions(-)
  28.0% contrib/test_decoding/expected/
  12.0% contrib/test_decoding/sql/
   4.7% doc/src/sgml/
   5.8% src/backend/access/heap/
   4.0% src/backend/access/
   3.1% src/backend/catalog/
  16.3% src/backend/commands/
  19.9% src/include/access/
   4.7% src/include/utils/

diff --git a/contrib/test_decoding/expected/ddl.out 
b/contrib/test_decoding/expected/ddl.out
index 9a28b5ddc5..48fb44c575 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -483,6 +483,7 @@ CREATE TABLE replication_metadata (
 )
 WITH (user_catalog_table = true)
 ;
+CREATE INDEX replication_metadata_idx1 on replication_metadata(relation);
 \d+ replication_metadata
                                                  Table 
"public.replication_metadata"
   Column  |  Type   | Collation | Nullable |                     Default       
               | Storage  | Stats target | Description 
@@ -492,11 +493,19 @@ WITH (user_catalog_table = true)
  options  | text[]  |           |          |                                   
               | extended |              | 
 Indexes:
     "replication_metadata_pkey" PRIMARY KEY, btree (id)
+    "replication_metadata_idx1" btree (relation)
 Options: user_catalog_table=true
 
+SELECT bool_and(indisusercatalog) from pg_index where indrelid = 
'replication_metadata'::regclass;
+ bool_and 
+----------
+ t
+(1 row)
+
 INSERT INTO replication_metadata(relation, options)
 VALUES ('foo', ARRAY['a', 'b']);
 ALTER TABLE replication_metadata RESET (user_catalog_table);
+CREATE INDEX replication_metadata_idx2 on replication_metadata(relation);
 \d+ replication_metadata
                                                  Table 
"public.replication_metadata"
   Column  |  Type   | Collation | Nullable |                     Default       
               | Storage  | Stats target | Description 
@@ -506,10 +515,19 @@ ALTER TABLE replication_metadata RESET 
(user_catalog_table);
  options  | text[]  |           |          |                                   
               | extended |              | 
 Indexes:
     "replication_metadata_pkey" PRIMARY KEY, btree (id)
+    "replication_metadata_idx1" btree (relation)
+    "replication_metadata_idx2" btree (relation)
+
+SELECT bool_or(indisusercatalog) from pg_index where indrelid = 
'replication_metadata'::regclass;
+ bool_or 
+---------
+ f
+(1 row)
 
 INSERT INTO replication_metadata(relation, options)
 VALUES ('bar', ARRAY['a', 'b']);
 ALTER TABLE replication_metadata SET (user_catalog_table = true);
+CREATE INDEX replication_metadata_idx3 on replication_metadata(relation);
 \d+ replication_metadata
                                                  Table 
"public.replication_metadata"
   Column  |  Type   | Collation | Nullable |                     Default       
               | Storage  | Stats target | Description 
@@ -519,15 +537,52 @@ ALTER TABLE replication_metadata SET (user_catalog_table 
= true);
  options  | text[]  |           |          |                                   
               | extended |              | 
 Indexes:
     "replication_metadata_pkey" PRIMARY KEY, btree (id)
+    "replication_metadata_idx1" btree (relation)
+    "replication_metadata_idx2" btree (relation)
+    "replication_metadata_idx3" btree (relation)
 Options: user_catalog_table=true
 
+SELECT bool_and(indisusercatalog) from pg_index where indrelid = 
'replication_metadata'::regclass;
+ bool_and 
+----------
+ t
+(1 row)
+
 INSERT INTO replication_metadata(relation, options)
 VALUES ('blub', NULL);
+-- Also checking that indisusercatalog is set correctly when a table is 
created with user_catalog_table = false
+CREATE TABLE replication_metadata_false (
+    id serial primary key,
+    relation name NOT NULL,
+    options text[]
+)
+WITH (user_catalog_table = false)
+;
+CREATE INDEX replication_metadata_false_idx1 on 
replication_metadata_false(relation);
+\d+ replication_metadata_false
+                                                 Table 
"public.replication_metadata_false"
+  Column  |  Type   | Collation | Nullable |                        Default    
                     | Storage  | Stats target | Description 
+----------+---------+-----------+----------+--------------------------------------------------------+----------+--------------+-------------
+ id       | integer |           | not null | 
nextval('replication_metadata_false_id_seq'::regclass) | plain    |             
 | 
+ relation | name    |           | not null |                                   
                     | plain    |              | 
+ options  | text[]  |           |          |                                   
                     | extended |              | 
+Indexes:
+    "replication_metadata_false_pkey" PRIMARY KEY, btree (id)
+    "replication_metadata_false_idx1" btree (relation)
+Options: user_catalog_table=false
+
+SELECT bool_or(indisusercatalog) from pg_index where indrelid = 
'replication_metadata_false'::regclass;
+ bool_or 
+---------
+ f
+(1 row)
+
 -- make sure rewrites don't work
 ALTER TABLE replication_metadata ADD COLUMN rewritemeornot int;
 ALTER TABLE replication_metadata ALTER COLUMN rewritemeornot TYPE text;
 ERROR:  cannot rewrite table "replication_metadata" used as a catalog table
 ALTER TABLE replication_metadata SET (user_catalog_table = false);
+CREATE INDEX replication_metadata_idx4 on replication_metadata(relation);
 \d+ replication_metadata
                                                     Table 
"public.replication_metadata"
      Column     |  Type   | Collation | Nullable |                     Default 
                     | Storage  | Stats target | Description 
@@ -538,8 +593,18 @@ ALTER TABLE replication_metadata SET (user_catalog_table = 
false);
  rewritemeornot | integer |           |          |                             
                     | plain    |              | 
 Indexes:
     "replication_metadata_pkey" PRIMARY KEY, btree (id)
+    "replication_metadata_idx1" btree (relation)
+    "replication_metadata_idx2" btree (relation)
+    "replication_metadata_idx3" btree (relation)
+    "replication_metadata_idx4" btree (relation)
 Options: user_catalog_table=false
 
+SELECT bool_or(indisusercatalog) from pg_index where indrelid = 
'replication_metadata'::regclass;
+ bool_or 
+---------
+ f
+(1 row)
+
 INSERT INTO replication_metadata(relation, options)
 VALUES ('zaphod', NULL);
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 
'include-xids', '0', 'skip-empty-xacts', '1');
diff --git a/contrib/test_decoding/sql/ddl.sql 
b/contrib/test_decoding/sql/ddl.sql
index 4f76bed72c..51baac5c4e 100644
--- a/contrib/test_decoding/sql/ddl.sql
+++ b/contrib/test_decoding/sql/ddl.sql
@@ -276,29 +276,52 @@ CREATE TABLE replication_metadata (
 )
 WITH (user_catalog_table = true)
 ;
+
+CREATE INDEX replication_metadata_idx1 on replication_metadata(relation);
+
 \d+ replication_metadata
+SELECT bool_and(indisusercatalog) from pg_index where indrelid = 
'replication_metadata'::regclass;
 
 INSERT INTO replication_metadata(relation, options)
 VALUES ('foo', ARRAY['a', 'b']);
 
 ALTER TABLE replication_metadata RESET (user_catalog_table);
+CREATE INDEX replication_metadata_idx2 on replication_metadata(relation);
 \d+ replication_metadata
+SELECT bool_or(indisusercatalog) from pg_index where indrelid = 
'replication_metadata'::regclass;
 
 INSERT INTO replication_metadata(relation, options)
 VALUES ('bar', ARRAY['a', 'b']);
 
 ALTER TABLE replication_metadata SET (user_catalog_table = true);
+CREATE INDEX replication_metadata_idx3 on replication_metadata(relation);
 \d+ replication_metadata
+SELECT bool_and(indisusercatalog) from pg_index where indrelid = 
'replication_metadata'::regclass;
 
 INSERT INTO replication_metadata(relation, options)
 VALUES ('blub', NULL);
 
+-- Also checking that indisusercatalog is set correctly when a table is 
created with user_catalog_table = false
+CREATE TABLE replication_metadata_false (
+    id serial primary key,
+    relation name NOT NULL,
+    options text[]
+)
+WITH (user_catalog_table = false)
+;
+
+CREATE INDEX replication_metadata_false_idx1 on 
replication_metadata_false(relation);
+\d+ replication_metadata_false
+SELECT bool_or(indisusercatalog) from pg_index where indrelid = 
'replication_metadata_false'::regclass;
+
 -- make sure rewrites don't work
 ALTER TABLE replication_metadata ADD COLUMN rewritemeornot int;
 ALTER TABLE replication_metadata ALTER COLUMN rewritemeornot TYPE text;
 
 ALTER TABLE replication_metadata SET (user_catalog_table = false);
+CREATE INDEX replication_metadata_idx4 on replication_metadata(relation);
 \d+ replication_metadata
+SELECT bool_or(indisusercatalog) from pg_index where indrelid = 
'replication_metadata'::regclass;
 
 INSERT INTO replication_metadata(relation, options)
 VALUES ('zaphod', NULL);
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 9316b811ac..459539b761 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -4447,6 +4447,17 @@ SCRAM-SHA-256$<replaceable>&lt;iteration 
count&gt;</replaceable>:<replaceable>&l
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>indisusercatalog</structfield> <type>bool</type>
+      </para>
+      <para>
+       If true, the index is linked to a table that is declared as an additional
+       catalog table for purposes of logical replication (that is, the table has <link linkend="sql-createtable"><literal>user_catalog_table</literal></link>
+       set to true).
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>indisreplident</structfield> <type>bool</type>
diff --git a/src/backend/access/common/reloptions.c 
b/src/backend/access/common/reloptions.c
index 75b7344891..4b41f5e68d 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -120,7 +120,7 @@ static relopt_bool boolRelOpts[] =
                        RELOPT_KIND_HEAP,
                        AccessExclusiveLock
                },
-               false
+               HEAP_DEFAULT_USER_CATALOG_TABLE
        },
        {
                {
diff --git a/src/backend/access/gist/gistxlog.c 
b/src/backend/access/gist/gistxlog.c
index cb5affa3d2..f47587b8f5 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -608,6 +608,7 @@ gistXLogPageReuse(Relation rel, BlockNumber blkno, 
FullTransactionId deleteXid)
         */
 
        /* XLOG stuff */
+       xlrec_reuse.isCatalogRel = RelationIsAccessibleInLogicalDecoding(rel);
        xlrec_reuse.locator = rel->rd_locator;
        xlrec_reuse.block = blkno;
        xlrec_reuse.snapshotConflictHorizon = deleteXid;
diff --git a/src/backend/access/hash/hashinsert.c 
b/src/backend/access/hash/hashinsert.c
index 9a921e341e..06c2659068 100644
--- a/src/backend/access/hash/hashinsert.c
+++ b/src/backend/access/hash/hashinsert.c
@@ -432,6 +432,7 @@ _hash_vacuum_one_page(Relation rel, Relation hrel, Buffer 
metabuf, Buffer buf)
                        xl_hash_vacuum_one_page xlrec;
                        XLogRecPtr      recptr;
 
+                       xlrec.isCatalogRel = 
RelationIsAccessibleInLogicalDecoding(hrel);
                        xlrec.snapshotConflictHorizon = snapshotConflictHorizon;
                        xlrec.ntuples = ndeletable;
 
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 42756a9e6d..1b344eace7 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -6827,6 +6827,7 @@ heap_freeze_execute_prepared(Relation rel, Buffer buffer,
                snapshotConflictHorizon = FreezeLimit;
                TransactionIdRetreat(snapshotConflictHorizon);
 
+               xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(rel);
                xlrec.snapshotConflictHorizon = snapshotConflictHorizon;
                xlrec.nplans = nplans;
 
@@ -8244,7 +8245,7 @@ bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate)
  * update the heap page's LSN.
  */
 XLogRecPtr
-log_heap_visible(RelFileLocator rlocator, Buffer heap_buffer, Buffer vm_buffer,
+log_heap_visible(Relation rel, Buffer heap_buffer, Buffer vm_buffer,
                                 TransactionId snapshotConflictHorizon, uint8 
vmflags)
 {
        xl_heap_visible xlrec;
@@ -8256,6 +8257,8 @@ log_heap_visible(RelFileLocator rlocator, Buffer 
heap_buffer, Buffer vm_buffer,
 
        xlrec.snapshotConflictHorizon = snapshotConflictHorizon;
        xlrec.flags = vmflags;
+       if (RelationIsAccessibleInLogicalDecoding(rel))
+               xlrec.flags |= 
VISIBILITYMAP_ON_CATALOG_ACCESSIBLE_IN_LOGICAL_DECODING;
        XLogBeginInsert();
        XLogRegisterData((char *) &xlrec, SizeOfHeapVisible);
 
diff --git a/src/backend/access/heap/pruneheap.c 
b/src/backend/access/heap/pruneheap.c
index 91c5f5e9ef..184e5123af 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -418,6 +418,7 @@ heap_page_prune(Relation relation, Buffer buffer,
                        xl_heap_prune xlrec;
                        XLogRecPtr      recptr;
 
+                       xlrec.isCatalogRel = 
RelationIsAccessibleInLogicalDecoding(relation);
                        xlrec.snapshotConflictHorizon = 
prstate.snapshotConflictHorizon;
                        xlrec.nredirected = prstate.nredirected;
                        xlrec.ndead = prstate.ndead;
diff --git a/src/backend/access/heap/visibilitymap.c 
b/src/backend/access/heap/visibilitymap.c
index 4ed70275e2..0bd73f4d9f 100644
--- a/src/backend/access/heap/visibilitymap.c
+++ b/src/backend/access/heap/visibilitymap.c
@@ -283,8 +283,7 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer 
heapBuf,
                        if (XLogRecPtrIsInvalid(recptr))
                        {
                                Assert(!InRecovery);
-                               recptr = log_heap_visible(rel->rd_locator, 
heapBuf, vmBuf,
-                                                                               
  cutoff_xid, flags);
+                               recptr = log_heap_visible(rel, heapBuf, vmBuf, 
cutoff_xid, flags);
 
                                /*
                                 * If data checksums are enabled (or 
wal_log_hints=on), we
diff --git a/src/backend/access/nbtree/nbtpage.c 
b/src/backend/access/nbtree/nbtpage.c
index 65aa44893c..426a5df4fb 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -836,6 +836,7 @@ _bt_log_reuse_page(Relation rel, BlockNumber blkno, 
FullTransactionId safexid)
         */
 
        /* XLOG stuff */
+       xlrec_reuse.isCatalogRel = RelationIsAccessibleInLogicalDecoding(rel);
        xlrec_reuse.locator = rel->rd_locator;
        xlrec_reuse.block = blkno;
        xlrec_reuse.snapshotConflictHorizon = safexid;
@@ -1358,6 +1359,7 @@ _bt_delitems_delete(Relation rel, Buffer buf,
                XLogRecPtr      recptr;
                xl_btree_delete xlrec_delete;
 
+               xlrec_delete.isCatalogRel = 
RelationIsAccessibleInLogicalDecoding(rel);
                xlrec_delete.snapshotConflictHorizon = snapshotConflictHorizon;
                xlrec_delete.ndeleted = ndeletable;
                xlrec_delete.nupdated = nupdatable;
diff --git a/src/backend/access/spgist/spgvacuum.c 
b/src/backend/access/spgist/spgvacuum.c
index ad90b213b9..2e62e3fa3b 100644
--- a/src/backend/access/spgist/spgvacuum.c
+++ b/src/backend/access/spgist/spgvacuum.c
@@ -503,6 +503,7 @@ vacuumRedirectAndPlaceholder(Relation index, Buffer buffer)
        spgxlogVacuumRedirect xlrec;
        GlobalVisState *vistest;
 
+       xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(index);
        xlrec.nToPlaceholder = 0;
        xlrec.snapshotConflictHorizon = InvalidTransactionId;
 
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 61f1d3926a..f7540f4101 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -123,7 +123,8 @@ static void UpdateIndexRelation(Oid indexoid, Oid heapoid,
                                                                bool 
isexclusion,
                                                                bool immediate,
                                                                bool isvalid,
-                                                               bool isready);
+                                                               bool isready,
+                                                               bool 
is_user_catalog);
 static void index_update_stats(Relation rel,
                                                           bool hasindex,
                                                           double reltuples);
@@ -545,7 +546,8 @@ UpdateIndexRelation(Oid indexoid,
                                        bool isexclusion,
                                        bool immediate,
                                        bool isvalid,
-                                       bool isready)
+                                       bool isready,
+                                       bool is_user_catalog)
 {
        int2vector *indkey;
        oidvector  *indcollation;
@@ -622,6 +624,7 @@ UpdateIndexRelation(Oid indexoid,
        values[Anum_pg_index_indcheckxmin - 1] = BoolGetDatum(false);
        values[Anum_pg_index_indisready - 1] = BoolGetDatum(isready);
        values[Anum_pg_index_indislive - 1] = BoolGetDatum(true);
+       values[Anum_pg_index_indisusercatalog - 1] = 
BoolGetDatum(is_user_catalog);
        values[Anum_pg_index_indisreplident - 1] = BoolGetDatum(false);
        values[Anum_pg_index_indkey - 1] = PointerGetDatum(indkey);
        values[Anum_pg_index_indcollation - 1] = PointerGetDatum(indcollation);
@@ -1020,7 +1023,8 @@ index_create(Relation heapRelation,
                                                isprimary, is_exclusion,
                                                (constr_flags & 
INDEX_CONSTR_CREATE_DEFERRABLE) == 0,
                                                !concurrent && !invalid,
-                                               !concurrent);
+                                               !concurrent,
+                                               
RelationIsUsedAsCatalogTable(heapRelation));
 
        /*
         * Register relcache invalidation on the indexes' heap relation, to
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 56dc995713..fd8200e670 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -103,6 +103,7 @@
 #include "utils/syscache.h"
 #include "utils/timestamp.h"
 #include "utils/typcache.h"
+#include "utils/rel.h"
 
 /*
  * ON COMMIT action list
@@ -14148,6 +14149,10 @@ ATExecSetRelOptions(Relation rel, List *defList, 
AlterTableType operation,
        Datum           repl_val[Natts_pg_class];
        bool            repl_null[Natts_pg_class];
        bool            repl_repl[Natts_pg_class];
+       ListCell   *cell;
+       List       *rel_options;
+       bool            catalog_table_val = HEAP_DEFAULT_USER_CATALOG_TABLE;
+       bool            catalog_table = false;
        static char *validnsps[] = HEAP_RELOPT_NAMESPACES;
 
        if (defList == NIL && operation != AT_ReplaceRelOptions)
@@ -14214,7 +14219,6 @@ ATExecSetRelOptions(Relation rel, List *defList, 
AlterTableType operation,
        {
                Query      *view_query = get_view_query(rel);
                List       *view_options = untransformRelOptions(newOptions);
-               ListCell   *cell;
                bool            check_option = false;
 
                foreach(cell, view_options)
@@ -14242,6 +14246,20 @@ ATExecSetRelOptions(Relation rel, List *defList, 
AlterTableType operation,
                }
        }
 
+       /* If user_catalog_table is part of the new options, record its new 
value */
+       rel_options = untransformRelOptions(newOptions);
+
+       foreach(cell, rel_options)
+       {
+               DefElem    *defel = (DefElem *) lfirst(cell);
+
+               if (strcmp(defel->defname, "user_catalog_table") == 0)
+               {
+                       catalog_table = true;
+                       catalog_table_val = defGetBoolean(defel);
+               }
+       }
+
        /*
         * All we need do here is update the pg_class row; the new options will 
be
         * propagated into relcaches during post-commit cache inval.
@@ -14268,6 +14286,41 @@ ATExecSetRelOptions(Relation rel, List *defList, 
AlterTableType operation,
 
        ReleaseSysCache(tuple);
 
+       /* Update indisusercatalog for the relation's indexes, if needed */
+       if (catalog_table || operation == AT_ResetRelOptions)
+       {
+               Relation        pg_index;
+               HeapTuple       pg_index_tuple;
+               Form_pg_index pg_index_form;
+               ListCell   *index;
+
+               pg_index = table_open(IndexRelationId, RowExclusiveLock);
+
+               foreach(index, RelationGetIndexList(rel))
+               {
+                       Oid                     thisIndexOid = 
lfirst_oid(index);
+
+                       pg_index_tuple = SearchSysCacheCopy1(INDEXRELID,
+                                                                               
                 ObjectIdGetDatum(thisIndexOid));
+                       if (!HeapTupleIsValid(pg_index_tuple))
+                               elog(ERROR, "cache lookup failed for index %u", 
thisIndexOid);
+                       pg_index_form = (Form_pg_index) 
GETSTRUCT(pg_index_tuple);
+
+                       /* Modify the index only if user_catalog_table differs */
+                       if (catalog_table_val != 
pg_index_form->indisusercatalog)
+                       {
+                               pg_index_form->indisusercatalog = 
catalog_table_val;
+                               CatalogTupleUpdate(pg_index, 
&pg_index_tuple->t_self, pg_index_tuple);
+                               InvokeObjectPostAlterHookArg(IndexRelationId, 
thisIndexOid, 0,
+                                                                               
         InvalidOid, true);
+                       }
+
+                       heap_freetuple(pg_index_tuple);
+               }
+
+               table_close(pg_index, RowExclusiveLock);
+       }
+
        /* repeat the whole exercise for the toast table, if there's one */
        if (OidIsValid(rel->rd_rel->reltoastrelid))
        {
diff --git a/src/include/access/gistxlog.h b/src/include/access/gistxlog.h
index 33f1c7e31b..2d293fc8f4 100644
--- a/src/include/access/gistxlog.h
+++ b/src/include/access/gistxlog.h
@@ -51,13 +51,14 @@ typedef struct gistxlogDelete
 {
        TransactionId snapshotConflictHorizon;
        uint16          ntodelete;              /* number of deleted offsets */
+       bool        isCatalogRel;
 
        /*
         * In payload of blk 0 : todelete OffsetNumbers
         */
 } gistxlogDelete;
 
-#define SizeOfGistxlogDelete   (offsetof(gistxlogDelete, ntodelete) + 
sizeof(uint16))
+#define SizeOfGistxlogDelete   (offsetof(gistxlogDelete, isCatalogRel) + 
sizeof(bool))
 
 /*
  * Backup Blk 0: If this operation completes a page split, by inserting a
@@ -100,9 +101,10 @@ typedef struct gistxlogPageReuse
        RelFileLocator locator;
        BlockNumber block;
        FullTransactionId snapshotConflictHorizon;
+       bool        isCatalogRel;
 } gistxlogPageReuse;
 
-#define SizeOfGistxlogPageReuse        (offsetof(gistxlogPageReuse, 
snapshotConflictHorizon) + sizeof(FullTransactionId))
+#define SizeOfGistxlogPageReuse        (offsetof(gistxlogPageReuse, 
isCatalogRel) + sizeof(bool))
 
 extern void gist_redo(XLogReaderState *record);
 extern void gist_desc(StringInfo buf, XLogReaderState *record);
diff --git a/src/include/access/hash_xlog.h b/src/include/access/hash_xlog.h
index 6dafb4a598..1df1c626e5 100644
--- a/src/include/access/hash_xlog.h
+++ b/src/include/access/hash_xlog.h
@@ -252,12 +252,13 @@ typedef struct xl_hash_vacuum_one_page
 {
        TransactionId snapshotConflictHorizon;
        int                     ntuples;
+       bool        isCatalogRel;
 
        /* TARGET OFFSET NUMBERS FOLLOW AT THE END */
 } xl_hash_vacuum_one_page;
 
 #define SizeOfHashVacuumOnePage \
-       (offsetof(xl_hash_vacuum_one_page, ntuples) + sizeof(int))
+       (offsetof(xl_hash_vacuum_one_page, isCatalogRel) + sizeof(bool))
 
 extern void hash_redo(XLogReaderState *record);
 extern void hash_desc(StringInfo buf, XLogReaderState *record);
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 5c77290eec..68cacd532a 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -245,10 +245,11 @@ typedef struct xl_heap_prune
        TransactionId snapshotConflictHorizon;
        uint16          nredirected;
        uint16          ndead;
+       bool        isCatalogRel;
        /* OFFSET NUMBERS are in the block reference 0 */
 } xl_heap_prune;
 
-#define SizeOfHeapPrune (offsetof(xl_heap_prune, ndead) + sizeof(uint16))
+#define SizeOfHeapPrune (offsetof(xl_heap_prune, isCatalogRel) + sizeof(bool))
 
 /*
  * The vacuum page record is similar to the prune record, but can only mark
@@ -344,12 +345,13 @@ typedef struct xl_heap_freeze_page
 {
        TransactionId snapshotConflictHorizon;
        uint16          nplans;
+       bool        isCatalogRel;
 
        /* FREEZE PLANS FOLLOW */
        /* OFFSET NUMBER ARRAY FOLLOWS */
 } xl_heap_freeze_page;
 
-#define SizeOfHeapFreezePage (offsetof(xl_heap_freeze_page, nplans) + 
sizeof(uint16))
+#define SizeOfHeapFreezePage (offsetof(xl_heap_freeze_page, isCatalogRel) + 
sizeof(bool))
 
 /*
  * This is what we need to know about setting a visibility map bit
@@ -408,7 +410,7 @@ extern void heap2_desc(StringInfo buf, XLogReaderState 
*record);
 extern const char *heap2_identify(uint8 info);
 extern void heap_xlog_logical_rewrite(XLogReaderState *r);
 
-extern XLogRecPtr log_heap_visible(RelFileLocator rlocator, Buffer heap_buffer,
+extern XLogRecPtr log_heap_visible(Relation rel, Buffer heap_buffer,
                                                                   Buffer 
vm_buffer,
                                                                   
TransactionId snapshotConflictHorizon,
                                                                   uint8 
vmflags);
diff --git a/src/include/access/nbtxlog.h b/src/include/access/nbtxlog.h
index 3b2d959c69..fbeb9cfbe0 100644
--- a/src/include/access/nbtxlog.h
+++ b/src/include/access/nbtxlog.h
@@ -188,9 +188,10 @@ typedef struct xl_btree_reuse_page
        RelFileLocator locator;
        BlockNumber block;
        FullTransactionId snapshotConflictHorizon;
+       bool        isCatalogRel;
 } xl_btree_reuse_page;
 
-#define SizeOfBtreeReusePage   (sizeof(xl_btree_reuse_page))
+#define SizeOfBtreeReusePage   (offsetof(xl_btree_reuse_page, isCatalogRel) + 
sizeof(bool))
 
 /*
  * xl_btree_vacuum and xl_btree_delete records describe deletion of index
@@ -235,13 +236,14 @@ typedef struct xl_btree_delete
        TransactionId snapshotConflictHorizon;
        uint16          ndeleted;
        uint16          nupdated;
+       bool        isCatalogRel;
 
        /* DELETED TARGET OFFSET NUMBERS FOLLOW */
        /* UPDATED TARGET OFFSET NUMBERS FOLLOW */
        /* UPDATED TUPLES METADATA (xl_btree_update) ARRAY FOLLOWS */
 } xl_btree_delete;
 
-#define SizeOfBtreeDelete      (offsetof(xl_btree_delete, nupdated) + 
sizeof(uint16))
+#define SizeOfBtreeDelete      (offsetof(xl_btree_delete, isCatalogRel) + 
sizeof(bool))
 
 /*
  * The offsets that appear in xl_btree_update metadata are offsets into the
diff --git a/src/include/access/spgxlog.h b/src/include/access/spgxlog.h
index 82332cb694..2ec0931a12 100644
--- a/src/include/access/spgxlog.h
+++ b/src/include/access/spgxlog.h
@@ -240,6 +240,7 @@ typedef struct spgxlogVacuumRedirect
        uint16          nToPlaceholder; /* number of redirects to make 
placeholders */
        OffsetNumber firstPlaceholder;  /* first placeholder tuple to remove */
        TransactionId snapshotConflictHorizon;  /* newest XID of removed 
redirects */
+       bool        isCatalogRel;
 
        /* offsets of redirect tuples to make placeholders follow */
        OffsetNumber offsets[FLEXIBLE_ARRAY_MEMBER];
diff --git a/src/include/access/visibilitymapdefs.h 
b/src/include/access/visibilitymapdefs.h
index 2803ef5c07..6005df3278 100644
--- a/src/include/access/visibilitymapdefs.h
+++ b/src/include/access/visibilitymapdefs.h
@@ -17,9 +17,10 @@
 #define BITS_PER_HEAPBLOCK 2
 
 /* Flags for bit map */
-#define VISIBILITYMAP_ALL_VISIBLE      0x01
-#define VISIBILITYMAP_ALL_FROZEN       0x02
-#define VISIBILITYMAP_VALID_BITS       0x03    /* OR of all valid visibilitymap
-                                                                               
         * flags bits */
+#define VISIBILITYMAP_ALL_VISIBLE                                              
                0x01
+#define VISIBILITYMAP_ALL_FROZEN                                               
                0x02
+#define VISIBILITYMAP_VALID_BITS                                               
                0x03    /* OR of all valid visibilitymap
+                                                                               
                                                                 * flags bits */
+#define VISIBILITYMAP_ON_CATALOG_ACCESSIBLE_IN_LOGICAL_DECODING        0x04
 
 #endif                                                 /* VISIBILITYMAPDEFS_H 
*/
diff --git a/src/include/catalog/pg_index.h b/src/include/catalog/pg_index.h
index f853846ee1..dd16431378 100644
--- a/src/include/catalog/pg_index.h
+++ b/src/include/catalog/pg_index.h
@@ -43,6 +43,8 @@ CATALOG(pg_index,2610,IndexRelationId) BKI_SCHEMA_MACRO
        bool            indcheckxmin;   /* must we wait for xmin to be old? */
        bool            indisready;             /* is this index ready for 
inserts? */
        bool            indislive;              /* is this index alive at all? 
*/
+       bool            indisusercatalog;       /* is this index linked to a 
user catalog
+                                                                        * 
relation? */
        bool            indisreplident; /* is this index the identity for 
replication? */
 
        /* variable-length fields start here, but we allow direct access to 
indkey */
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index f383a2fca9..5d41ef6505 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -27,6 +27,7 @@
 #include "storage/smgr.h"
 #include "utils/relcache.h"
 #include "utils/reltrigger.h"
+#include "catalog/catalog.h"
 
 
 /*
@@ -343,6 +344,7 @@ typedef struct StdRdOptions
 
 #define HEAP_MIN_FILLFACTOR                    10
 #define HEAP_DEFAULT_FILLFACTOR                100
+#define HEAP_DEFAULT_USER_CATALOG_TABLE                false
 
 /*
  * RelationGetToastTupleTarget
@@ -385,6 +387,15 @@ typedef struct StdRdOptions
          (relation)->rd_rel->relkind == RELKIND_MATVIEW) ? \
         ((StdRdOptions *) (relation)->rd_options)->user_catalog_table : false)
 
+/*
+ * IndexIsLinkedToUserCatalogTable
+ *             Returns whether the relation should be treated as an index 
linked to
+ *             a user catalog table from the point of view of logical decoding.
+ */
+#define IndexIsLinkedToUserCatalogTable(relation)      \
+       ((relation)->rd_rel->relkind == RELKIND_INDEX && \
+        (relation)->rd_index->indisusercatalog)
+
 /*
  * RelationGetParallelWorkers
  *             Returns the relation's parallel_workers reloption setting.
@@ -682,7 +693,8 @@ RelationCloseSmgr(Relation relation)
 #define RelationIsAccessibleInLogicalDecoding(relation) \
        (XLogLogicalInfoActive() && \
         RelationNeedsWAL(relation) && \
-        (IsCatalogRelation(relation) || 
RelationIsUsedAsCatalogTable(relation)))
+        (IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation) 
|| \
+         IndexIsLinkedToUserCatalogTable(relation)))
 
 /*
  * RelationIsLogicallyLogged
-- 
2.34.1

Reply via email to