Hi,

Currently, the main loop of the apply worker looks like below[1]. Since there
are two loops, the inner loop keeps receiving and applying messages from the
publisher until no more messages are left. The worker reloads the configuration
only in the outer loop. This means that if the publisher keeps sending messages
(it could keep sending multiple transactions), the apply worker won't get a
chance to update the GUCs.

[1]
for(;;) /* outer loop */
{
        for(;;) /* inner loop */
        {
                len = walrcv_receive()
                if (len == 0)
                        break;
                ...
                apply change
        }

        ...
        if (ConfigReloadPending)
        {
                ConfigReloadPending = false;
                ProcessConfigFile(PGC_SIGHUP);
        }
        ...
}

I think it would be better if the apply worker could reflect the user's
configuration changes sooner. To achieve this, we can add one more
ProcessConfigFile() call in the inner loop. A patch for the same is attached.
What do you think?
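
To make the idea concrete, here is a sketch of the inner loop with the extra
reload check, in the same pseudocode style as [1] (this illustrates the idea,
it is not the exact patch):

for(;;) /* inner loop */
{
        len = walrcv_receive()
        if (len == 0)
                break;
        ...
        apply change

        /* new: pick up GUC changes even while messages keep arriving */
        if (ConfigReloadPending)
        {
                ConfigReloadPending = false;
                ProcessConfigFile(PGC_SIGHUP);
        }
}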

BTW, I saw one BF failure[2] (it's very rare and has only happened once in 4
months) which I think is due to the infrequent reload in the apply worker.

The attached tap test shows how the failure happened.

The test uses streaming = parallel mode and changes logical_replication_mode to
immediate, so we expect serialization to happen in the test. To reproduce the
failure more easily, we need to add a sleep(1s) in the inner loop of the apply
worker so that the worker can't consume all messages quickly and stays busy in
the inner loop. The attached test then fails because the leader apply worker
didn't reload the configuration, so serialization didn't happen.
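
For reference, the reproduction tweak described above would look like this in
the same pseudocode style as [1] (the sleep is only for reproducing the
failure, not part of the patch):

for(;;) /* inner loop */
{
        len = walrcv_receive()
        if (len == 0)
                break;
        ...
        apply change
        sleep(1) /* added only for the repro: keeps the worker busy here */
}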

[2] 
https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=mamba&dt=2023-05-12%2008%3A05%3A41

Best Regards,
Hou zj

Attachment: 0001-Reload-configuration-more-frequently-in-apply-worker.patch

# Copyright (c) 2021-2023, PostgreSQL Global Development Group

# Test streaming of simple large transaction
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

# Create publisher node
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf('postgresql.conf',
        'logical_decoding_work_mem = 64kB');
$node_publisher->append_conf('postgresql.conf',
        'logical_replication_mode = immediate');
$node_publisher->start;

# Create subscriber node
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->append_conf('postgresql.conf',
        "log_min_messages = DEBUG1");
$node_subscriber->start;

$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");

$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");

# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
        "CREATE PUBLICATION tap_pub FOR TABLE test_tab_2");

my $appname = 'tap_sub';

################################
# Test using streaming mode 'on'
################################
$node_subscriber->safe_psql('postgres',
        "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = parallel)"
);

# Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
$node_publisher->wait_for_catchup($appname);

sleep 3;

$node_publisher->safe_psql('postgres',
        "INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5) s(i)");

# Test serializing changes to files and notify the parallel apply worker to
# apply them at the end of the transaction.
$node_subscriber->append_conf('postgresql.conf',
        'logical_replication_mode = immediate');
# Reset the log_min_messages to default.
$node_subscriber->append_conf('postgresql.conf', "log_min_messages = warning");
$node_subscriber->reload;

# Run a query to make sure that the reload has taken effect.
$node_subscriber->safe_psql('postgres', q{SELECT 1});

my $offset = -s $node_subscriber->logfile;

$node_publisher->safe_psql('postgres',
        "INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5) s(i)");

# Ensure that the changes are serialized.
$node_subscriber->wait_for_log(
        qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/,
        $offset);

$node_subscriber->stop;
$node_publisher->stop;

done_testing();
