From 7a26e5a3a55b0bf32406dc314d208584a20430f9 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Wed, 30 Apr 2025 06:52:30 -0400
Subject: [PATCH v18 3/3] Tests for filtering unpublished changes

Since filtering is throttled, this patch removes throttling to be able to correctly test filtering.
---
 src/backend/replication/logical/reorderbuffer.c |  6 +-
 src/test/subscription/t/001_rep_changes.pl      | 89 +++++++++++++++++++++++++
 2 files changed, 94 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index cd2b052..bd12d77 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -378,7 +378,7 @@ static void ReorderBufferMemoryResetcallback(void *arg);
  * hash table search for each record, especially when most changes are not
  * filterable.
  */
-#define CHANGES_THRESHOLD_FOR_FILTER 100
+#define CHANGES_THRESHOLD_FOR_FILTER 0
 
 /*
  * Allocate a new ReorderBuffer and clean out any old serialized state from
@@ -5800,6 +5800,10 @@ ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
 		entry->filterable = false;
 		rb->try_to_filter_change = rb->filter_change(rb, entry->relid, change_type,
 												  true, &cache_valid);
+		if (rb->try_to_filter_change)
+			elog(DEBUG1,"Filtering change for relation \"%s\"",
+						RelationGetRelationName(relation));
+
 		RelationClose(relation);
 	}
 	else
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 916fdb4..1de1c0c 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -571,6 +571,95 @@ $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*) FROM pg_replication_origin");
 is($result, qq(0), 'check replication origin was dropped on subscriber');
 
+$node_publisher->stop('fast');
+
+# Additional tests for filtering of unpublished changes
+# Bump up log verbosity to DEBUG1 for confirmation logs
+$node_publisher->append_conf('postgresql.conf', "log_min_messages = debug1");
+$node_publisher->start;
+
+# Create new tables on publisher and subscriber
+$node_publisher->safe_psql('postgres',
+							"CREATE TABLE pub_table (id int primary key, data text);
+							 CREATE TABLE unpub_table (id int primary key, data text);
+							 CREATE TABLE insert_only_table (id int primary key, data text);
+							 CREATE TABLE delete_only_table (id int primary key, data text);");
+
+$node_subscriber->safe_psql('postgres',
+							"CREATE TABLE pub_table (id int primary key, data text);
+							 CREATE TABLE unpub_table (id int primary key, data text);
+							 CREATE TABLE insert_only_table (id int primary key, data text);
+							 CREATE TABLE delete_only_table (id int primary key, data text);");
+
+# Setup logical replication publications
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub_all FOR TABLE pub_table;
+	 CREATE PUBLICATION pub_insert_only FOR TABLE insert_only_table WITH (publish = insert);
+	 CREATE PUBLICATION pub_delete_only FOR TABLE delete_only_table WITH (publish = delete);");
+
+# Setup logical replication subscriptions
+$node_subscriber->safe_psql('postgres',
+"CREATE SUBSCRIPTION sub_all CONNECTION '$publisher_connstr' PUBLICATION pub_all;
+ CREATE SUBSCRIPTION sub_insert_only CONNECTION '$publisher_connstr' PUBLICATION pub_insert_only;
+ CREATE SUBSCRIPTION sub_delete_only CONNECTION '$publisher_connstr' PUBLICATION pub_delete_only;");
+
+# Wait for initial sync
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub_all');
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub_insert_only');
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub_delete_only');
+
+# Insert into an unpublished table (should not be replicated)
+$log_location = -s $node_publisher->logfile;
+$node_publisher->safe_psql('postgres', "INSERT INTO unpub_table VALUES (1, 'unpublished')");
+$node_publisher->wait_for_catchup('sub_all');
+
+$logfile = slurp_file($node_publisher->logfile, $log_location);
+ok($logfile =~ qr/Filtering INSERT/,
+	'unpublished INSERT is filtered');
+ok($logfile =~ qr/Filtering change for relation "unpub_table"/,
+	 'change for relation unpub_table is filtered');
+
+# Update, delete and insert tests for publication with restricted tables
+$log_location = -s $node_publisher->logfile;
+$node_publisher->safe_psql('postgres', "INSERT INTO insert_only_table VALUES (1, 'to be inserted')");
+$node_publisher->safe_psql('postgres', "UPDATE insert_only_table SET data = 'updated' WHERE id = 1");
+$node_publisher->wait_for_catchup('sub_all');
+
+$logfile = slurp_file($node_publisher->logfile, $log_location);
+ok($logfile =~ qr/Filtering UPDATE/,
+	'unpublished UPDATE is filtered');
+ok($logfile =~ qr/Filtering change for relation "insert_only_table"/,
+	'change for relation insert_only_table is filtered');
+
+$log_location = -s $node_publisher->logfile;
+$node_publisher->safe_psql('postgres', "DELETE FROM insert_only_table WHERE id = 1");
+$node_publisher->wait_for_catchup('sub_all');
+
+$logfile = slurp_file($node_publisher->logfile, $log_location);
+ok($logfile =~ qr/Filtering DELETE/,
+	'unpublished DELETE is filtered');
+
+$log_location = -s $node_publisher->logfile;
+$node_publisher->safe_psql('postgres', "INSERT INTO delete_only_table VALUES (1, 'to be deleted')");
+$node_publisher->wait_for_catchup('sub_all');
+
+$logfile = slurp_file($node_publisher->logfile, $log_location);
+ok($logfile =~ qr/Filtering INSERT/,
+	'unpublished INSERT is filtered');
+ok($logfile =~ qr/Filtering change for relation "delete_only_table"/,
+	'change for relation delete_only_table is filtered');
+
+#cleanup
+$node_subscriber->safe_psql('postgres',
+					"DROP SUBSCRIPTION sub_all;
+					 DROP SUBSCRIPTION sub_insert_only;
+					 DROP SUBSCRIPTION sub_delete_only;");
+
+$node_publisher->safe_psql('postgres',
+					"DROP PUBLICATION pub_all;
+					 DROP PUBLICATION pub_insert_only;
+					 DROP PUBLICATION pub_delete_only;");
+
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
 
-- 
1.8.3.1

