From 7b5920dfde922bd12d3f957051f2afab7baf3b8b Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Fri, 7 Mar 2025 09:38:40 +0900
Subject: [PATCH] Avoid invalidating all entries when pg_namespace is modified

---
 src/backend/replication/pgoutput/pgoutput.c | 38 +++++++++++----------
 1 file changed, 20 insertions(+), 18 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 8357bf8b4c0..1f3de136798 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -182,6 +182,9 @@ typedef struct RelationSyncEntry
 	 * row filter expressions, column list, etc.
 	 */
 	MemoryContext entry_cxt;
+
+	/* Hash value of schema OID. Used for pg_namespace syscache callback */
+	uint32		schema_hashvalue;
 } RelationSyncEntry;
 
 /*
@@ -224,8 +227,7 @@ static void send_relation_and_attrs(Relation relation, TransactionId xid,
 									LogicalDecodingContext *ctx,
 									RelationSyncEntry *relentry);
 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
-static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
-										  uint32 hashvalue);
+static void rel_sync_cache_schema_cb(Datum arg, int cacheid, uint32 hashvalue);
 static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
 											TransactionId xid);
 static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
@@ -1971,18 +1973,11 @@ init_rel_sync_cache(MemoryContext cachectx)
 	CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
 
 	/*
-	 * Flush all cache entries after a pg_namespace change, in case it was a
+	 * Flush related entries after a pg_namespace change, in case it was a
 	 * schema rename affecting a relation being replicated.
-	 *
-	 * XXX: It is not a good idea to invalidate all the relation entries in
-	 * RelationSyncCache on schema rename. We can optimize it to invalidate
-	 * only the required relations by either having a specific invalidation
-	 * message containing impacted relations or by having schema information
-	 * in each RelationSyncCache entry and using hashvalue of pg_namespace.oid
-	 * passed to the callback.
 	 */
 	CacheRegisterSyscacheCallback(NAMESPACEOID,
-								  rel_sync_cache_publication_cb,
+								  rel_sync_cache_schema_cb,
 								  (Datum) 0);
 
 	relation_callbacks_registered = true;
@@ -2054,6 +2049,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->publish_as_relid = InvalidOid;
 		entry->columns = NULL;
 		entry->attrmap = NULL;
+		entry->schema_hashvalue = 0;
 	}
 
 	/* Validate the entry */
@@ -2300,6 +2296,10 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		list_free(schemaPubids);
 		list_free(rel_publications);
 
+		entry->schema_hashvalue =
+			GetSysCacheHashValue1(NAMESPACEOID,
+								  ObjectIdGetDatum(RelationGetNamespace(relation)));
+
 		entry->replicate_valid = true;
 	}
 
@@ -2398,12 +2398,12 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 }
 
 /*
- * Publication relation/schema map syscache invalidation callback
+ * schema syscache invalidation callback
  *
- * Called for invalidations on pg_publication and pg_namespace.
+ * Called for invalidations on pg_namespace.
  */
-static void
-rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
+void
+rel_sync_cache_schema_cb(Datum arg, int cacheid, uint32 hashvalue)
 {
 	HASH_SEQ_STATUS status;
 	RelationSyncEntry *entry;
@@ -2417,13 +2417,15 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 		return;
 
 	/*
-	 * We have no easy way to identify which cache entries this invalidation
-	 * event might have affected, so just mark them all invalid.
+	 * Identify entries which belongs to the specified schema, and invalidate
+	 * it.
 	 */
 	hash_seq_init(&status, RelationSyncCache);
 	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
 	{
-		entry->replicate_valid = false;
+		/* hashvalue == 0 means a cache reset, must clear all state */
+		if (hashvalue == 0 || entry->schema_hashvalue == hashvalue)
+			entry->replicate_valid = false;
 	}
 }
 
-- 
2.43.5

