Hi,

Currently, the main loop of the apply worker looks like [1] below. There are two nested loops: the inner loop keeps receiving and applying messages from the publisher until no messages are left, and the worker only reloads the configuration in the outer loop. This means that if the publisher keeps sending messages (it could keep sending multiple transactions), the apply worker won't get a chance to update its GUCs.
[1]
    for (;;)    /* outer loop */
    {
        for (;;)    /* inner loop */
        {
            len = walrcv_receive()
            if (len == 0)
                break;
            ...
            apply change
        }

        ...
        if (ConfigReloadPending)
        {
            ConfigReloadPending = false;
            ProcessConfigFile(PGC_SIGHUP);
        }
        ...
    }

I think it would be better if the apply worker reflected the user's configuration changes sooner. To achieve this, we can add one more ProcessConfigFile() call in the inner loop. Attached is a patch for that. What do you think?

BTW, I saw one BF failure[2] (it's very rare and has only happened once in 4 months), which I think is due to the infrequent reload in the apply worker.

The attached TAP test shows how the failure happens. The test uses streaming parallel mode and changes logical_replication_mode to immediate, so we expect serialization to happen in the test. To reproduce the failure more easily, we need to add a sleep(1s) in the inner loop of the apply worker so that it cannot consume all messages quickly and stays busy in the inner loop. The attached test then fails because the leader apply worker didn't reload the configuration, and thus serialization didn't happen.

[2] https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=mamba&dt=2023-05-12%2008%3A05%3A41

Best Regards,
Hou zj
0001-Reload-configuration-more-frequently-in-apply-worker.patch
# Copyright (c) 2021-2023, PostgreSQL Global Development Group

# Test streaming of simple large transaction
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

# Create publisher node
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf('postgresql.conf',
	'logical_decoding_work_mem = 64kB');
$node_publisher->append_conf('postgresql.conf',
	'logical_replication_mode = immediate');
$node_publisher->start;

# Create subscriber node
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->append_conf('postgresql.conf',
	"log_min_messages = DEBUG1");
$node_subscriber->start;

$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");
$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");

# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
	"CREATE PUBLICATION tap_pub FOR TABLE test_tab_2");

my $appname = 'tap_sub';

######################################
# Test using streaming mode 'parallel'
######################################
$node_subscriber->safe_psql('postgres',
	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = parallel)"
);

# Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);

$node_publisher->wait_for_catchup($appname);

sleep 3;

$node_publisher->safe_psql('postgres',
	"INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5) s(i)");

# Test serializing changes to files and notify the parallel apply worker to
# apply them at the end of the transaction.
$node_subscriber->append_conf('postgresql.conf',
	'logical_replication_mode = immediate');

# Reset the log_min_messages to default.
$node_subscriber->append_conf('postgresql.conf',
	"log_min_messages = warning");
$node_subscriber->reload;

# Run a query to make sure that the reload has taken effect.
$node_subscriber->safe_psql('postgres', q{SELECT 1});

my $offset = -s $node_subscriber->logfile;

$node_publisher->safe_psql('postgres',
	"INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5) s(i)");

# Ensure that the changes are serialized.
$node_subscriber->wait_for_log(
	qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/,
	$offset);

$node_subscriber->stop;
$node_publisher->stop;

done_testing();