use strict;
use warnings FATAL => 'all';
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
use Time::HiRes qw(gettimeofday);


# Number of concurrent transactions to open on the publisher
my $concurrent_txns = 1000;

# Set max_connections high enough for every background session plus
# headroom for the test's own control connections and replication workers.
my $max_connections = $concurrent_txns + 100;

# Set up the publisher
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
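# Generous resource settings: checkpoints are pushed far out and autovacuum
# is disabled so that background activity does not perturb the timing below.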
$node_publisher->append_conf(
	'postgresql.conf', q[
shared_buffers = 10GB
max_worker_processes = 32
max_parallel_maintenance_workers = 24
max_parallel_workers = 32
synchronous_commit = on
checkpoint_timeout = 1d
max_wal_size = 24GB
min_wal_size = 15GB
autovacuum = off
]);
$node_publisher->append_conf('postgresql.conf',
	"max_connections = $max_connections");
$node_publisher->start;

# Set up the subscriber
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init;
$node_subscriber->append_conf(
	'postgresql.conf', q[
shared_buffers = 10GB
max_worker_processes = 32
max_parallel_maintenance_workers = 24
max_parallel_workers = 32
synchronous_commit = on
checkpoint_timeout = 1d
max_wal_size = 24GB
min_wal_size = 15GB
autovacuum = off
]);
$node_subscriber->start;

# initial logical replication setup
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql(
	'postgres', qq(
	CREATE TABLE tab_conc1(a int);
	CREATE PUBLICATION regress_pub1 FOR TABLE tab_conc1;
));
$node_subscriber->safe_psql(
	'postgres', qq(
	CREATE TABLE tab_conc1(a int);
	CREATE SUBSCRIPTION regress_sub1 CONNECTION '$publisher_connstr' PUBLICATION regress_pub1;
));
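# Wait for the initial table synchronization to finish so that the
# subscription is fully up before the concurrent workload starts.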
$node_subscriber->wait_for_subscription_sync($node_publisher, 'regress_sub1');

# Allow generous psql timeouts; with this many sessions, individual steps
# can take much longer than the default.
my $psql_timeout_secs = 4 * $PostgreSQL::Test::Utils::timeout_default;

my @background_psqls;

# Open $concurrent_txns background psql sessions on the publisher
foreach my $i (1 .. $concurrent_txns)
{
	my $background_psql = $node_publisher->background_psql(
		'postgres',
		on_error_stop => 0,
		timeout => $psql_timeout_secs);

	push(@background_psqls, $background_psql);
}

# Record the start time
my $start = gettimeofday();

# Begin a transaction in each session and do an initial insert so that
# every transaction builds its cache for tab_conc1.
foreach my $background_psql (@background_psqls)
{
	$background_psql->query_safe('BEGIN;');
	$background_psql->query_safe('INSERT INTO tab_conc1 VALUES (11);');
}

# Drop and re-add the table so that an invalidation message is distributed
# to all $concurrent_txns open transactions.
$node_publisher->safe_psql('postgres', 'ALTER PUBLICATION regress_pub1 DROP TABLE tab_conc1;');
$node_publisher->safe_psql('postgres', 'ALTER PUBLICATION regress_pub1 ADD TABLE tab_conc1;');
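
# Optional sanity check (an addition, not part of the original flow):
# confirm the table is back in the publication after the drop/add cycle.
# pg_publication_tables is a standard system view.
my $pub_count = $node_publisher->safe_psql('postgres',
	"SELECT count(*) FROM pg_publication_tables WHERE pubname = 'regress_pub1' AND tablename = 'tab_conc1'"
);
is($pub_count, '1', 'tab_conc1 is published again after drop/add');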

# The previous step invalidated each transaction's cache, so this insert
# forces every transaction to rebuild it.
foreach my $background_psql (@background_psqls)
{
	$background_psql->query_safe('INSERT INTO tab_conc1 VALUES (12);');
}

# Commit all the concurrent transactions
foreach my $background_psql (@background_psqls)
{
	$background_psql->query_safe('COMMIT;');
}

# Wait for the subscriber to catch up and verify that every row replicated
my $rows_inserted = 2 * $concurrent_txns;
$node_publisher->wait_for_catchup('regress_sub1');
my $result =
	$node_subscriber->safe_psql('postgres', 'SELECT count(*) FROM tab_conc1');
is($result, $rows_inserted, 'check replicated inserts on subscriber');

# Record the end time and report the elapsed time as a TAP-safe note
my $end = gettimeofday();
note(sprintf("time elapsed %f", $end - $start));
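
# A stricter check (optional addition): verify that both inserted values
# replicated in equal numbers rather than relying on the total count alone.
my $per_value = $node_subscriber->safe_psql('postgres',
	'SELECT count(*) FILTER (WHERE a = 11), count(*) FILTER (WHERE a = 12) FROM tab_conc1'
);
is($per_value, "$concurrent_txns|$concurrent_txns",
	'both inserts replicated for every transaction');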

# Clean up: close every background session, then stop both nodes
foreach my $background_psql (@background_psqls)
{
	$background_psql->quit;
}
$node_publisher->stop('fast');
$node_subscriber->stop('fast');

done_testing();
