
# Copyright (c) 2021-2022, PostgreSQL Global Development Group

# Test logical replication across a long transaction that only modifies catalogs
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

# Publisher: initialized for logical replication, with two extra settings
# appended to postgresql.conf before it is started.
# NOTE(review): the large max_locks_per_transaction is presumably needed
# because later statements touch about a thousand tables within a single
# transaction -- confirm.
my $publisher_extra_conf =
  "\n" . "max_locks_per_transaction=10000\n" . "log_min_messages=LOG\n";
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf('postgresql.conf', $publisher_extra_conf);
$node_publisher->start();

# Subscriber: initialized the same way, with no extra configuration.
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->start();

# The only table that is actually published and replicated in this test.
$node_publisher->safe_psql('postgres',
	"CREATE TABLE tab_full_pk (a int primary key, b text)");

# Create users and tables. These are not required to be replicated but will be
# used to craft a transaction which causes thousands of changes to the catalog
# tables to be logged. This will cause the WAL sender to spend a long time
# decoding a transaction without sending a single change downstream.
my $max_roles = 500;
my @roles = ();
my $max_tables = 1000;
my @tables = ();
my $fallback_user = 'fb_role';
my $tabname_prefix = 'tab_pre';
my $rolname_prefix = 'rol_pre';
my $tabcount_query = "SELECT count(*) FROM pg_class WHERE relkind = 'r' and relname like '" . $tabname_prefix . "%'";
my $rolcount_query = "SELECT count(*) FROM pg_roles WHERE rolname like '" . $rolname_prefix . "%'";

# Role that ends up owning all the tables once the other roles are dropped.
$node_publisher->safe_psql('postgres', q[CREATE USER ] . $fallback_user);

# Create all the roles with a single psql invocation instead of one process
# per role; the statements and their order are unchanged.
@roles = map { $rolname_prefix . $_ } 1 .. $max_roles;
$node_publisher->safe_psql('postgres',
	join('', map { "CREATE ROLE $_ superuser;\n" } @roles));

# Likewise create all the tables in one psql invocation. Ownership is spread
# over the roles round-robin by switching the session authorization before
# each CREATE TABLE; this can be repeated within one session because the
# connecting user is a superuser.
my $create_tables_sql = '';
foreach my $ntable (1 .. $max_tables)
{
	my $tabname = $tabname_prefix . $ntable;
	my $owner = $roles[ $ntable % $max_roles ];

	$create_tables_sql .= "SET SESSION AUTHORIZATION $owner;\n"
	  . "CREATE TABLE $tabname(a serial primary key, b int);\n";
	push @tables, $tabname;
}
$node_publisher->safe_psql('postgres', $create_tables_sql);

# The subscriber needs its own definition of the replicated table.
$node_subscriber->safe_psql('postgres',
	"CREATE TABLE tab_full_pk (a int primary key, b text)");

# Create the publication on the publisher and add tab_full_pk to it.
my $publisher_base_connstr = $node_publisher->connstr;
my $publisher_connstr = "$publisher_base_connstr dbname=postgres";
foreach my $publication_sql (
	"CREATE PUBLICATION tap_pub",
	"ALTER PUBLICATION tap_pub ADD TABLE tab_full_pk")
{
	$node_publisher->safe_psql('postgres', $publication_sql);
}

# Subscribe to that publication from the subscriber.
my $create_subscription_sql =
  "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub";
$node_subscriber->safe_psql('postgres', $create_subscription_sql);

# Block until the initial table synchronization has completed.
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');

# Insert the initial rows and wait for the subscription to catch up.
my $initial_rows_sql = "INSERT INTO tab_full_pk VALUES (1, 'foo'), (2, 'baz')";
$node_publisher->safe_psql('postgres', $initial_rows_sql);
$node_publisher->wait_for_catchup('tap_sub');

# Check the number of helper tables and roles on both nodes. Each entry is
# [node, counting query, expected count, test name]. The publisher must have
# all of them; the subscriber must have none, because the tables and roles
# created on the publisher should not get replicated automatically.
my $result;
my @object_count_checks = (
	[ $node_publisher, $tabcount_query, $max_tables,
		"expected number of tables created" ],
	[ $node_publisher, $rolcount_query, $max_roles,
		"expected number of users created" ],
	[ $node_subscriber, $tabcount_query, 0,
		"expected number of tables replicated" ],
	[ $node_subscriber, $rolcount_query, 0,
		"expected number of users replicated" ]);
foreach my $check (@object_count_checks)
{
	my ($node, $count_query, $expected_count, $test_name) = @{$check};

	$result = $node->safe_psql('postgres', $count_query);
	is($result, $expected_count, $test_name);
}


# Update one of the replicated rows on the publisher, wait for the subscriber
# to catch up, and verify that the subscriber sees the new value.
$node_publisher->safe_psql('postgres',
	"UPDATE tab_full_pk SET b = 'bar' WHERE a = 1");

$node_publisher->wait_for_catchup('tap_sub');

# tab_full_pk only has a primary key (no REPLICA IDENTITY FULL is ever set on
# it), so the test name describes exactly that.
$result = $node_subscriber->safe_psql('postgres',
	"SELECT * FROM tab_full_pk ORDER BY a");
is( $result, qq(1|bar
2|baz),
	'update is replicated for a table with a primary key');

# Build a DO block that, for every role created above, revokes its privileges
# on the tables in schema public, reassigns the objects it owns to the
# fallback user, and drops it. Roles that no longer exist are skipped with a
# NOTICE. Only the variables the block actually uses are declared.
my $users_to_remove_literal = '{' . join(',', @roles) . '}';
my $users_remove_do_loop = q[
DO $this$
DECLARE
  fb_role name := '] . $fallback_user . q[';
  user_to_remove name;
  users_to_remove name array := '] . $users_to_remove_literal . q[';
BEGIN
  FOREACH user_to_remove IN ARRAY users_to_remove LOOP
    IF NOT EXISTS(SELECT FROM pg_roles WHERE rolname = user_to_remove) THEN
      RAISE NOTICE 'User % does not exist, skipping', user_to_remove;
      CONTINUE;
    END IF;
    EXECUTE format('REVOKE ALL ON ALL TABLES IN SCHEMA public FROM %I', user_to_remove);
    EXECUTE format('REASSIGN OWNED BY %I TO %I', user_to_remove, fb_role);
    EXECUTE format('DROP USER %I', user_to_remove);
  END LOOP;
  RAISE NOTICE 'long running transaction id %', txid_current_if_assigned();
END;
$this$;
];

# Execute the script that touches thousands of rows of catalogs
$node_publisher->safe_psql('postgres', $users_remove_do_loop);

# None of the changes above belong to the publication; the subscription must
# nevertheless be able to catch up past this transaction.
$node_publisher->wait_for_catchup('tap_sub');

# Remove every helper table in one statement, acting as the fallback user that
# now owns them. Per the original rationale, this keeps the test from timing
# out while checking for consistency of empty tables.
my $drop_tables_sql = join('; ',
	'SET SESSION AUTHORIZATION ' . $fallback_user,
	'DROP TABLE ' . join(',', @tables));
$node_publisher->safe_psql('postgres', $drop_tables_sql);

# Neither the helper roles nor the helper tables may remain on the publisher.
# Each entry is [counting query, test name].
foreach my $leftover_check (
	[ $rolcount_query, "expected number of users dropped" ],
	[ $tabcount_query, "expected number of tables dropped" ])
{
	my ($count_query, $test_name) = @{$leftover_check};

	$result = $node_publisher->safe_psql('postgres', $count_query);
	is($result, 0, $test_name);
}

# Let the subscription catch up one last time before finishing.
$node_publisher->wait_for_catchup('tap_sub');

done_testing();
