# Test logical replication when publications are repeatedly altered while a
# concurrent transaction is inserting rows, which generates a large number of
# cache invalidations.

use strict;
use warnings FATAL => 'all';
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
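# Use settings that keep checkpoints and autovacuum from interfering with the
# large number of operations performed below.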
$node_publisher->append_conf(
	'postgresql.conf', q[
shared_buffers = 10GB
max_worker_processes = 32
max_parallel_maintenance_workers = 24
max_parallel_workers = 32
synchronous_commit = on
checkpoint_timeout = 1d
max_wal_size = 24GB
min_wal_size = 15GB
autovacuum = off
max_connections = 20
]);
$node_publisher->start;

my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init;
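# Use a matching configuration on the subscriber, with additional debug
# logging enabled below.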
$node_subscriber->append_conf(
	'postgresql.conf', q[
shared_buffers = 10GB
max_worker_processes = 32
max_parallel_maintenance_workers = 24
max_parallel_workers = 32
synchronous_commit = on
checkpoint_timeout = 1d
max_wal_size = 24GB
min_wal_size = 15GB
autovacuum = off
]);
$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1");
$node_subscriber->start;

my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';

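# Create the two tables to be replicated on the publisher, each with its own
# publication.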
$node_publisher->safe_psql(
	'postgres', qq(
	CREATE TABLE tab_conc1(a int);
	CREATE TABLE tp(a int);
	CREATE PUBLICATION regress_pub2 FOR TABLE tp;
	CREATE PUBLICATION regress_pub1 FOR TABLE tab_conc1;
));

# Create additional tables so that each publication change produces a larger
# number of cache invalidations, since these tables are invalidated as well.
my $tcount = 100;
foreach my $i (1 .. $tcount)
{
	$node_publisher->safe_psql('postgres', qq(CREATE TABLE t$i(a int);));
}

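# Create the matching tables and one subscription per publication on the
# subscriber.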
$node_subscriber->safe_psql(
	'postgres', qq(
	CREATE TABLE tp(a int);
	CREATE TABLE tab_conc1(a int);
	CREATE SUBSCRIPTION regress_sub1 CONNECTION '$publisher_connstr' PUBLICATION regress_pub1;
	CREATE SUBSCRIPTION regress_sub2 CONNECTION '$publisher_connstr' PUBLICATION regress_pub2;
));

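# Wait for the initial table synchronization of regress_sub1 to finish.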
$node_subscriber->wait_for_subscription_sync($node_publisher, 'regress_sub1');

my $psql_timeout_secs = 4 * $PostgreSQL::Test::Utils::timeout_default;
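# Start a background psql session on the publisher.  It is used below to keep
# a single transaction open while publications are altered concurrently.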
my $background_psql1 = $node_publisher->background_psql(
	'postgres',
	on_error_stop => 0,
	timeout => $psql_timeout_secs);
$background_psql1->set_query_timer_restart();

my $count = 100;

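# Within a single open transaction, insert rows into tab_conc1 while other
# sessions insert into the extra tables and repeatedly alter regress_pub2.
# Each publication change sends cache invalidations that the open transaction
# has to process.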
$background_psql1->query_safe('BEGIN;');
foreach my $i (1 .. $count)
{
	# Insert a row within the still-open transaction.
	$background_psql1->query_safe(qq[INSERT INTO tab_conc1 VALUES ($i);]);

	# Insert into each of the extra tables from separate sessions.
	foreach my $j (1 .. $tcount)
	{
		$node_publisher->safe_psql('postgres', qq(INSERT INTO t$j VALUES ($i);));
	}

	# Alternately drop and re-add tp so that every iteration changes
	# regress_pub2 and triggers cache invalidations.
	if ($i % 2 == 1)
	{
		$node_publisher->safe_psql('postgres',
			'ALTER PUBLICATION regress_pub2 DROP TABLE tp;');
	}
	else
	{
		$node_publisher->safe_psql('postgres',
			'ALTER PUBLICATION regress_pub2 ADD TABLE tp;');
	}
}

$background_psql1->query_safe('COMMIT;');
$background_psql1->quit;

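# After the transaction commits, all of its inserts must be replicated to the
# subscriber.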
$node_publisher->wait_for_catchup('regress_sub1');
my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_conc1");
is($result, qq($count), 'check all inserts are replicated to the subscriber');

$node_publisher->stop('fast');
$node_subscriber->stop('fast');

done_testing();
