Hi,

On 4/25/23 6:23 AM, Amit Kapila wrote:
> On Mon, Apr 24, 2023 at 3:36 PM Drouvot, Bertrand <[email protected]> wrote:
>> Without the second "pg_log_standby_snapshot()" then wait_for_subscription_sync()
>> would be waiting some time on the poll for "SELECT count(1) = 0 FROM
>> pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"
>>
>> Adding a comment in V3 to explain the need for the second
>> pg_log_standby_snapshot().
>
> Won't this still be unpredictable because it is possible that the tablesync
> worker may take more time to get launched or create a replication slot? If
> that happens after your second pg_log_standby_snapshot() then
> wait_for_subscription_sync() will be hanging.

Oh right, that looks like a possible scenario.

> Wouldn't it be better to create a subscription with (copy_data = false) to
> make it predictable and then we won't need pg_log_standby_snapshot() to be
> performed twice? If you agree with the above suggestion then you probably
> need to move wait_for_subscription_sync() before Insert.

I like that idea, thanks! Done in V4 attached.

Not related to the above corner case, but while re-reading the patch I also
added:

"
$node_primary->wait_for_replay_catchup($node_standby);
"

between the publication creation on the primary and the subscription to the
standby (to ensure the publication gets replicated before we request the
subscription creation).

Regards,

-- 
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
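For readers skimming the attached patches, the ordering the discussion above converges on looks roughly like this. This is a simplified sketch only: the variables are the ones defined in patch 1/2 below, and the setup of the non-blocking psql session is omitted.

# Create the subscription against the standby with copy_data = off, through
# the non-blocking psql session, so no tablesync worker is involved.
$psql_subscriber{subscriber_stdin} .= qq[CREATE SUBSCRIPTION tap_sub
	CONNECTION '$standby_connstr'
	PUBLICATION tap_pub
	WITH (copy_data = off);];
$psql_subscriber{run}->pump_nb();

# A single pg_log_standby_snapshot() on the primary is now enough to let
# the subscription creation finish.
$node_primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot()");

# Wait for the subscription to be ready *before* generating changes ...
$node_subscriber->wait_for_subscription_sync($node_standby, 'tap_sub');

# ... and only then insert rows on the primary and check they arrive.
$node_primary->safe_psql('postgres',
	"INSERT INTO tab_rep SELECT generate_series(1,10)");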
From ed92ea9bf3385d2fe9e4ef0d8a04b87b1c7e6b3d Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <[email protected]>
Date: Tue, 25 Apr 2023 06:02:17 +0000
Subject: [PATCH v4 2/2] Add retained WAL test in 035_standby_logical_decoding.pl

Add one test to verify that invalidated logical slots do not lead to
retaining WAL.
---
 .../t/035_standby_logical_decoding.pl         | 78 ++++++++++++++++++-
 1 file changed, 76 insertions(+), 2 deletions(-)
 100.0% src/test/recovery/t/

diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
index 03346f44f2..6ae4fc1e02 100644
--- a/src/test/recovery/t/035_standby_logical_decoding.pl
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -9,6 +9,7 @@ use warnings;
 use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
+use Time::HiRes qw(usleep);
 
 my ($stdin, $stdout, $stderr,
 	$cascading_stdout, $cascading_stderr, $subscriber_stdin,
@@ -495,9 +496,82 @@ $node_standby->restart;
 check_slots_conflicting_status(1);
 
 ##################################################
-# Verify that invalidated logical slots do not lead to retaining WAL
+# Verify that invalidated logical slots do not lead to retaining WAL.
 ##################################################
-# XXXXX TODO
+
+# Get the restart_lsn from an invalidated slot
+my $restart_lsn = $node_standby->safe_psql('postgres',
+	"SELECT restart_lsn from pg_replication_slots WHERE slot_name = 'vacuum_full_activeslot' and conflicting is true;"
+);
+
+chomp($restart_lsn);
+
+# Get the WAL file name associated with this LSN on the primary
+my $walfile_name = $node_primary->safe_psql('postgres',
+	"SELECT pg_walfile_name('$restart_lsn')");
+
+chomp($walfile_name);
+
+# Check the WAL file is still on the primary
+ok(-f $node_primary->data_dir . '/pg_wal/' . $walfile_name,
+	"WAL file still on the primary");
+
+# Get the number of WAL files on the standby
+my $nb_standby_files = $node_standby->safe_psql('postgres',
+	"SELECT COUNT(*) FROM pg_ls_dir('pg_wal')");
+
+chomp($nb_standby_files);
+
+# Switch WAL files on the primary
+my @c = (1 .. $nb_standby_files);
+
+$node_primary->safe_psql('postgres', "create table retain_test(a int)");
+
+for (@c)
+{
+	$node_primary->safe_psql(
+		'postgres', "SELECT pg_switch_wal();
+		insert into retain_test values("
+		  . $_ . ");");
+}
+
+# Ask for a checkpoint
+$node_primary->safe_psql('postgres', 'checkpoint;');
+
+# Check that the WAL file has not been retained on the primary
+ok(!-f $node_primary->data_dir . '/pg_wal/' . $walfile_name,
+	"WAL file not on the primary anymore");
+
+# Wait for the standby to catch up
+$node_primary->wait_for_catchup($node_standby);
+
+# Generate another WAL switch, more activity and a checkpoint
+$node_primary->safe_psql(
+	'postgres', "SELECT pg_switch_wal();
+	insert into retain_test values(1);");
+$node_primary->safe_psql('postgres', 'checkpoint;');
+
+# Wait for the standby to catch up
+$node_primary->wait_for_catchup($node_standby);
+
+# Verify that the WAL file has not been retained on the standby
+my $standby_walfile = $node_standby->data_dir . '/pg_wal/' . $walfile_name;
+
+# We cannot test immediately whether the WAL file still exists;
+# we need to give the standby some time to actually "remove" it.
+my $i = 0;
+while (1)
+{
+	last if !-f $standby_walfile;
+	if ($i++ == 10 * $default_timeout)
+	{
+		die
+		  "could not determine if WAL file has been retained or not, can't continue";
+	}
+	usleep(100_000);
+}
+
+ok(1, "invalidated logical slots do not lead to retaining WAL");
 
 ##################################################
 # Recovery conflict: Invalidate conflicting slots, including in-use slots
-- 
2.34.1
From 53507b322cd0af79b15cb62eff8e39422a8f98c6 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <[email protected]>
Date: Mon, 24 Apr 2023 05:13:23 +0000
Subject: [PATCH v4 1/2] Add subscription to the standby test in 035_standby_logical_decoding.pl

Add one test to verify that subscription to the standby is possible.
---
 src/test/perl/PostgreSQL/Test/Cluster.pm      | 11 ++-
 .../t/035_standby_logical_decoding.pl         | 91 ++++++++++++++++++-
 2 files changed, 99 insertions(+), 3 deletions(-)
  7.5% src/test/perl/PostgreSQL/Test/
 92.4% src/test/recovery/t/

diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm
index 6f7f4e5de4..819667d42a 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -2644,7 +2644,16 @@ sub wait_for_catchup
 	}
 	if (!defined($target_lsn))
 	{
-		$target_lsn = $self->lsn('write');
+		my $isrecovery = $self->safe_psql('postgres', "SELECT pg_is_in_recovery()");
+		chomp($isrecovery);
+		if ($isrecovery eq 't')
+		{
+			$target_lsn = $self->lsn('replay');
+		}
+		else
+		{
+			$target_lsn = $self->lsn('write');
+		}
 	}
 	print "Waiting for replication conn "
 	  . $standby_name . "'s "
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
index b8f5311fe9..03346f44f2 100644
--- a/src/test/recovery/t/035_standby_logical_decoding.pl
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -10,12 +10,17 @@ use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
 
-my ($stdin, $stdout, $stderr, $cascading_stdout, $cascading_stderr, $ret, $handle, $slot);
+my ($stdin, $stdout, $stderr,
+	$cascading_stdout, $cascading_stderr, $subscriber_stdin,
+	$subscriber_stdout, $subscriber_stderr, $ret,
+	$handle, $slot);
 
 my $node_primary = PostgreSQL::Test::Cluster->new('primary');
 my $node_standby = PostgreSQL::Test::Cluster->new('standby');
 my $node_cascading_standby = PostgreSQL::Test::Cluster->new('cascading_standby');
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 my $default_timeout = $PostgreSQL::Test::Utils::timeout_default;
+my $psql_timeout = IPC::Run::timer(2 * $default_timeout);
 my $res;
 
 # Name for the physical slot on primary
@@ -267,7 +272,8 @@ $node_standby->init_from_backup(
 	has_streaming => 1,
 	has_restoring => 1);
 $node_standby->append_conf('postgresql.conf',
-	qq[primary_slot_name = '$primary_slotname']);
+	qq[primary_slot_name = '$primary_slotname'
+	max_replication_slots = 5]);
 $node_standby->start;
 $node_primary->wait_for_replay_catchup($node_standby);
 $node_standby->safe_psql('testdb', qq[SELECT * FROM pg_create_physical_replication_slot('$standby_physical_slotname');]);
@@ -285,6 +291,26 @@ $node_cascading_standby->append_conf('postgresql.conf',
 $node_cascading_standby->start;
 $node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary);
 
+#######################
+# Initialize subscriber node
+#######################
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+my %psql_subscriber = (
+	'subscriber_stdin' => '',
+	'subscriber_stdout' => '',
+	'subscriber_stderr' => '');
+$psql_subscriber{run} = IPC::Run::start(
+	[ 'psql', '-XA', '-f', '-', '-d', $node_subscriber->connstr('postgres') ],
+	'<',
+	\$psql_subscriber{subscriber_stdin},
+	'>',
+	\$psql_subscriber{subscriber_stdout},
+	'2>',
+	\$psql_subscriber{subscriber_stderr},
+	$psql_timeout);
+
 ##################################################
 # Test that logical decoding on the standby
 # behaves correctly.
@@ -365,6 +391,67 @@ is( $node_primary->psql(
 	3,
 	'replaying logical slot from another database fails');
 
+##################################################
+# Test that we can subscribe on the standby with the publication
+# created on the primary.
+##################################################
+
+# Create a table on the primary
+$node_primary->safe_psql('postgres',
+	"CREATE TABLE tab_rep (a int primary key)");
+
+# Create a table (same structure) on the subscriber node
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rep (a int primary key)");
+
+# Create a publication on the primary
+$node_primary->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub for table tab_rep");
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# Subscribe on the standby
+my $standby_connstr = $node_standby->connstr . ' dbname=postgres';
+
+# Not using safe_psql() here as it would wait for activity on the primary
+# and we wouldn't be able to launch pg_log_standby_snapshot() on the primary
+# while waiting.
+# psql_subscriber() lets us avoid waiting synchronously.
+$psql_subscriber{subscriber_stdin} .=
+	qq[CREATE SUBSCRIPTION tap_sub
+	CONNECTION '$standby_connstr'
+	PUBLICATION tap_pub
+	WITH (copy_data = off);];
+$psql_subscriber{subscriber_stdin} .= "\n";
+
+$psql_subscriber{run}->pump_nb();
+
+# Speed up the subscription creation
+$node_primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot()");
+
+# Explicitly shut down the psql instance gracefully - to avoid hangs
+# or worse on Windows
+$psql_subscriber{subscriber_stdin} .= "\\q\n";
+$psql_subscriber{run}->finish;
+
+$node_subscriber->wait_for_subscription_sync($node_standby, 'tap_sub');
+
+# Insert some rows on the primary
+$node_primary->safe_psql('postgres',
+	qq[INSERT INTO tab_rep select generate_series(1,10);]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+$node_standby->wait_for_catchup('tap_sub');
+
+# Check that the subscriber can see the rows inserted on the primary
+$result =
+	$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep");
+is($result, qq(10), 'check replicated inserts after subscription on standby');
+
+# We do not need the subscription and the subscriber anymore
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+$node_subscriber->stop;
+
 ##################################################
 # Recovery conflict: Invalidate conflicting slots, including in-use slots
 # Scenario 1: hot_standby_feedback off and vacuum FULL
-- 
2.34.1
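Side note on the technique patch 1/2 relies on: creating the logical slot for the subscription on the standby cannot finish until the standby has replayed a standby snapshot logged on the primary (hence pg_log_standby_snapshot()), which is why the test queues CREATE SUBSCRIPTION on a background psql session instead of using safe_psql(). Below is a minimal, self-contained sketch of that non-blocking psql pattern; the connection string and the queued SQL are placeholders for illustration, not part of the patch.

use strict;
use warnings;
use IPC::Run qw(start timer);

# Placeholder connection string; in the test this comes from
# $node_subscriber->connstr('postgres').
my $connstr = 'port=5432 dbname=postgres';

my ($in, $out, $err) = ('', '', '');
my $timer = timer(180);

# Start a psql session that reads SQL from our $in buffer.
my $h = start([ 'psql', '-XA', '-f', '-', '-d', $connstr ],
	'<', \$in, '>', \$out, '2>', \$err, $timer);

# Queue a command without blocking the script on its completion
# (in the test this is the CREATE SUBSCRIPTION; here a harmless stand-in).
$in .= "SELECT 1;\n";
$h->pump_nb();

# ... unblock the queued command from elsewhere if needed, e.g. by running
# pg_log_standby_snapshot() on the primary ...

# Quit psql gracefully and reap the process.
$in .= "\\q\n";
$h->finish;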
