From d670d696ce3a264c1b566c44574cd8403bde079b Mon Sep 17 00:00:00 2001
From: bovenshi <bovenshi@tencent.com>
Date: Mon, 27 Feb 2023 14:59:32 +0800
Subject: [PATCH] Optimize invalidating cache in pgoutput and relfilenodeMap.

Before PG 14, we used to execute all the invalidations at each command end
as we had no way of knowing which invalidations happened before that command.
Because of this, transactions involving large numbers of DDLs used to take
more time and also led to high CPU usage.

In this commit, we add a flag to optimize the pgoutput part and add a hash
table in the relfilenodemap part. This way the walsender process can handle
the 'drop publication' statement more quickly.
---
 src/backend/replication/pgoutput/pgoutput.c   |  17 ++-
 src/backend/utils/cache/relfilenodemap.c      | 105 ++++++++++++++---
 .../subscription/t/014_large_publication.pl   | 108 ++++++++++++++++++
 3 files changed, 212 insertions(+), 18 deletions(-)
 create mode 100644 src/test/subscription/t/014_large_publication.pl

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index c31af57bd4..0d51df026d 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -94,6 +94,9 @@ typedef struct RelationSyncEntry
 /* Map used to remember which relation schemas we sent. */
 static HTAB *RelationSyncCache = NULL;
 
+/* Flag to track whether a full cache invalidation can be skipped. */
+static bool all_cache_valid = false;
+
 static void init_rel_sync_cache(MemoryContext decoding_context);
 static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
@@ -846,6 +849,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 				/* The new value is an ancestor, so let's keep it. */
 				publish_as_relid = pub_relid;
 				publish_ancestor_level = ancestor_level;
+				all_cache_valid = false;
 			}
 		}
 
@@ -930,10 +934,17 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 	/*
 	 * There is no way to find which entry in our cache the hash belongs to so
 	 * mark the whole cache as invalid.
+	 *
+	 * We use the all_cache_valid flag to improve performance: once the whole
+	 * cache has been invalidated, further invalidations can be skipped.
 	 */
-	hash_seq_init(&status, RelationSyncCache);
-	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
-		entry->replicate_valid = false;
+	if (!all_cache_valid)
+	{ 
+		hash_seq_init(&status, RelationSyncCache);
+		while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
+			entry->replicate_valid = false;
+		all_cache_valid = true;
+	}
 }
 
 /*
diff --git a/src/backend/utils/cache/relfilenodemap.c b/src/backend/utils/cache/relfilenodemap.c
index 68b01ca68f..ff0535632f 100644
--- a/src/backend/utils/cache/relfilenodemap.c
+++ b/src/backend/utils/cache/relfilenodemap.c
@@ -32,6 +32,9 @@
 /* Hash table for information about each relfilenode <-> oid pair */
 static HTAB *RelfilenodeMapHash = NULL;
 
+/* Hash table for information about each oid <-> relfilenode pair */
+static HTAB *RelidMapHash = NULL;
+
 /* built first time through in InitializeRelfilenodeMap */
 static ScanKeyData relfilenode_skey[2];
 
@@ -47,6 +50,12 @@ typedef struct
 	Oid			relid;			/* pg_class.oid */
 } RelfilenodeMapEntry;
 
+typedef struct
+{
+	Oid			key;			/* pg_class.oid */
+	RelfilenodeMapKey relfilenode;
+} RelidMapEntry;
+
 /*
  * RelfilenodeMapInvalidateCallback
  *		Flush mapping entries when pg_class is updated in a relevant fashion.
@@ -54,30 +63,78 @@ typedef struct
 static void
 RelfilenodeMapInvalidateCallback(Datum arg, Oid relid)
 {
-	HASH_SEQ_STATUS status;
-	RelfilenodeMapEntry *entry;
-
 	/* callback only gets registered after creating the hash */
 	Assert(RelfilenodeMapHash != NULL);
-
-	hash_seq_init(&status, RelfilenodeMapHash);
-	while ((entry = (RelfilenodeMapEntry *) hash_seq_search(&status)) != NULL)
+	Assert(RelidMapHash != NULL);
+
+	/*
+	 * Two cases:
+	 *
+	 * 1. relid is valid: Find the relfilenode for relid in RelidMapHash and
+	 *    remove that entry, then use the found relfilenode as the key to
+	 *    remove the matching entry from RelfilenodeMapHash.
+	 * 2. relid is InvalidOid (complete reset): Remove all entries from both.
+	 */
+	if (relid != InvalidOid)
 	{
-		/*
-		 * If relid is InvalidOid, signaling a complete reset, we must remove
-		 * all entries, otherwise just remove the specific relation's entry.
-		 * Always remove negative cache entries.
-		 */
-		if (relid == InvalidOid ||	/* complete reset */
-			entry->relid == InvalidOid ||	/* negative cache entry */
-			entry->relid == relid)	/* individual flushed relation */
+		RelidMapEntry *relid_entry;
+		bool found;
+
+		relid_entry = (RelidMapEntry *) hash_search(RelidMapHash,
+													(void *) &relid,
+													HASH_FIND, &found);
+		if (found)
 		{
 			if (hash_search(RelfilenodeMapHash,
-							(void *) &entry->key,
+						    (void *) &relid_entry->relfilenode,
+						    HASH_REMOVE,
+						    NULL) == NULL)
+				elog(ERROR, "hash table corrupted");
+			
+			if (hash_search(RelidMapHash,
+						    (void *) &relid,
+						    HASH_REMOVE,
+						    NULL) == NULL)
+				elog(ERROR, "hash table corrupted");
+		}
+	}
+	else
+	{
+		HASH_SEQ_STATUS status;
+		RelfilenodeMapEntry *entry;
+		RelidMapEntry *relid_entry;
+
+		hash_seq_init(&status, RelfilenodeMapHash);
+		while ((entry = (RelfilenodeMapEntry *) hash_seq_search(&status)) != NULL)
+		{
+			/*
+			 * If relid is InvalidOid, signaling a complete reset, we must remove
+			 * all entries, otherwise just remove the specific relation's entry.
+			 * Always remove negative cache entries.
+			 */
+			if (relid == InvalidOid ||	/* complete reset */
+				entry->relid == InvalidOid ||	/* negative cache entry */
+				entry->relid == relid)	/* individual flushed relation */
+			{
+				if (hash_search(RelfilenodeMapHash,
+								(void *) &entry->key,
+								HASH_REMOVE,
+								NULL) == NULL)
+					elog(ERROR, "hash table corrupted");
+			}
+		}
+
+		/* Remove all elems in RelidMapHash. */
+		hash_seq_init(&status, RelidMapHash);
+		while ((relid_entry = (RelidMapEntry *) hash_seq_search(&status)) != NULL)
+		{
+			if (hash_search(RelidMapHash,
+							(void *) &relid_entry->key,
 							HASH_REMOVE,
 							NULL) == NULL)
 				elog(ERROR, "hash table corrupted");
 		}
+
 	}
 }
 
@@ -126,6 +183,15 @@ InitializeRelfilenodeMap(void)
 		hash_create("RelfilenodeMap cache", 64, &ctl,
 					HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 
+	MemSet(&ctl, 0, sizeof(ctl));
+	ctl.keysize = sizeof(Oid);
+	ctl.entrysize = sizeof(RelidMapEntry);
+	ctl.hcxt = CacheMemoryContext;
+
+	RelidMapHash =
+		hash_create("RelidMap cache", 64, &ctl,
+					HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
 	/* Watch for invalidation events. */
 	CacheRegisterRelcacheCallback(RelfilenodeMapInvalidateCallback,
 								  (Datum) 0);
@@ -148,6 +214,7 @@ RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
 	HeapTuple	ntp;
 	ScanKeyData skey[2];
 	Oid			relid;
+	RelidMapEntry *relid_entry;
 
 	if (RelfilenodeMapHash == NULL)
 		InitializeRelfilenodeMap();
@@ -243,5 +310,13 @@ RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
 		elog(ERROR, "corrupted hashtable");
 	entry->relid = relid;
 
+	/*
+	 * Remember the relid -> relfilenode mapping for the invalidation callback.
+	 */
+	if (relid != InvalidOid)
+	{
+		relid_entry = hash_search(RelidMapHash, (void *) &relid, HASH_ENTER, &found);
+		relid_entry->relfilenode = key;
+	}
 	return relid;
 }
diff --git a/src/test/subscription/t/014_large_publication.pl b/src/test/subscription/t/014_large_publication.pl
new file mode 100644
index 0000000000..8b940d5ac5
--- /dev/null
+++ b/src/test/subscription/t/014_large_publication.pl
@@ -0,0 +1,108 @@
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More;
+use DBI;
+
+plan tests => 4;
+
+my $pub_table_num = 1000;
+my $total_table_num = 10000;
+
+# Initialize node_publisher node
+my $node_publisher = get_new_node('node_publisher');
+$node_publisher->init;
+$node_publisher->append_conf(
+	'postgresql.conf', qq(
+wal_level=logical
+max_connections=100
+max_wal_senders=10
+max_replication_slots=10
+log_min_messages = LOG
+log_error_verbosity = verbose
+log_statement = all
+));
+$node_publisher->start;
+
+my $result;
+
+# create table
+$node_publisher->safe_psql('postgres', "CREATE TABLE t(a int,b int);");
+$node_publisher->safe_psql('postgres', "INSERT INTO t VALUES(1,1);");
+$result = $node_publisher->safe_psql('postgres', "SELECT COUNT(*) FROM t;");
+is($result, qq(1), "create table and insert data success");
+
+# create publication
+my $node_publisher_port = $node_publisher->port;
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub_t FOR TABLE t;");
+
+# create subscription
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf(
+	'postgresql.conf', qq(
+wal_level=logical
+max_connections=100
+max_wal_senders=10
+max_replication_slots=10
+log_error_verbosity = verbose
+log_statement = all
+));
+$node_subscriber->start;
+
+$node_subscriber->safe_psql('postgres', "CREATE TABLE t(a int,b int);");
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_subscriber->safe_psql('postgres',
+"CREATE SUBSCRIPTION sub_t CONNECTION '$publisher_connstr' PUBLICATION pub_t;");
+
+$node_publisher->safe_psql('postgres',
+"INSERT INTO t VALUES(2,2);");
+
+sleep(1);
+$result = $node_subscriber->safe_psql('postgres',
+"SELECT COUNT(*) FROM t;");
+is($result, qq(2), "subscription test ok");
+
+# create large publication
+$node_publisher->safe_psql('postgres',
+"CREATE PUBLICATION pub_large;");
+for (my $i = 0; $i<$pub_table_num; $i = $i+1)
+{
+	my $table_name=qq(t_$i);
+
+	$node_publisher->safe_psql('postgres',
+	"CREATE TABLE $table_name(a int);");
+	$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION pub_large ADD TABLE ONLY $table_name;");
+}
+
+# add large number of relations into walsender hash table
+for (my $i = 0; $i<$total_table_num; $i = $i+1)
+{
+	my $table_name=qq(t_$i);
+
+	$node_publisher->safe_psql('postgres',
+	"CREATE TABLE IF NOT EXISTS $table_name(a int);
+	 INSERT INTO $table_name VALUES(1);");
+}
+
+# test subscription
+$node_publisher->safe_psql('postgres',
+"INSERT INTO t VALUES(1);");
+sleep(1);
+$result = $node_subscriber->safe_psql('postgres',
+"SELECT COUNT(*) FROM t;");
+is($result, qq(3), "subscription test ok");
+
+# drop publication and test again
+$node_publisher->safe_psql('postgres',
+"DROP PUBLICATION pub_large;");
+
+$node_publisher->safe_psql('postgres',
+"INSERT INTO t VALUES(1);");
+sleep(3);
+$result = $node_subscriber->safe_psql('postgres', 
+"SELECT COUNT(*) FROM t;");
+is($result, qq(4), "subscription test ok");
\ No newline at end of file
-- 
2.37.1

