From 7bbf7bc3278b3ad9e6071dca9eb78c8c6b80f4b0 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 4 Mar 2025 16:51:19 +0900
Subject: [PATCH v9 1/2] Introduce a new invalidation message to invalidate
 caches in output plugins

A new invalidation message is generated when either ALTER PUBLICATION RENAME TO
or is executed. The primal use-case of the message is to invalidate caches on
the logical decoding output plugin. Plugins can register callback functions for
the message via CacheRegisterRelSyncCallback(), and the function can invalidate
the cache for the specified relation.

A new invalidation message is transactional, and decoder processes should
recognize the message and invalidate specified caches. Thus, the messages are
stored in InvalMessageArray and serialized at the end of the transaction.
---
 src/backend/access/rmgrdesc/standbydesc.c |   2 +
 src/backend/utils/cache/inval.c           | 127 ++++++++++++++++++++++
 src/include/pg_config_manual.h            |   8 +-
 src/include/storage/sinval.h              |  11 ++
 src/include/utils/inval.h                 |  10 ++
 5 files changed, 154 insertions(+), 4 deletions(-)

diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c
index d849f8e54b..81eff5f31c 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -132,6 +132,8 @@ standby_desc_invalidations(StringInfo buf,
 			appendStringInfo(buf, " relmap db %u", msg->rm.dbId);
 		else if (msg->id == SHAREDINVALSNAPSHOT_ID)
 			appendStringInfo(buf, " snapshot %u", msg->sn.relId);
+		else if (msg->id == SHAREDINVALRELSYNC_ID)
+			appendStringInfo(buf, " relsync %u", msg->rs.relid);
 		else
 			appendStringInfo(buf, " unrecognized id %d", msg->id);
 	}
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 700ccb6df9..35df9be5e5 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -271,6 +271,7 @@ int			debug_discard_caches = 0;
 
 #define MAX_SYSCACHE_CALLBACKS 64
 #define MAX_RELCACHE_CALLBACKS 10
+#define MAX_RELSYNC_CALLBACKS 10
 
 static struct SYSCACHECALLBACK
 {
@@ -292,6 +293,15 @@ static struct RELCACHECALLBACK
 
 static int	relcache_callback_count = 0;
 
+static struct RELSYNCCALLBACK
+{
+	RelSyncCallbackFunction function;
+	Datum		arg;
+}			relsync_callback_list[MAX_RELSYNC_CALLBACKS];
+
+static int	relsync_callback_count = 0;
+
+
 /* ----------------------------------------------------------------
  *				Invalidation subgroup support functions
  * ----------------------------------------------------------------
@@ -484,6 +494,34 @@ AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group,
 	AddInvalidationMessage(group, RelCacheMsgs, &msg);
 }
 
+/*
+ * Add a relsync inval entry
+ *
+ * We put these into the relcache subgroup for simplicity.
+ */
+static void
+AddRelsyncInvalidationMessage(InvalidationMsgsGroup *group,
+							  Oid dbId, Oid relId)
+{
+	SharedInvalidationMessage msg;
+
+	/* Don't add a duplicate item */
+	/* We assume dbId need not be checked because it will never change */
+	ProcessMessageSubGroup(group, RelCacheMsgs,
+						   if (msg->rc.id == SHAREDINVALRELSYNC_ID &&
+							   msg->rc.relId == relId)
+						   return);
+
+	/* OK, add the item */
+	msg.rc.id = SHAREDINVALRELSYNC_ID;
+	msg.rc.dbId = dbId;
+	msg.rc.relId = relId;
+	/* check AddCatcacheInvalidationMessage() for an explanation */
+	VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
+
+	AddInvalidationMessage(group, RelCacheMsgs, &msg);
+}
+
 /*
  * Add a snapshot inval entry
  *
@@ -611,6 +649,18 @@ RegisterRelcacheInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
 		info->RelcacheInitFileInval = true;
 }
 
+/*
+ * RegisterRelcacheInvalidation
+ *
+ * As above, but register a relsync invalidation event.
+ */
+static void
+RegisterRelsyncInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
+{
+	AddRelsyncInvalidationMessage(&info->CurrentCmdInvalidMsgs, dbId, relId);
+}
+
+
 /*
  * RegisterSnapshotInvalidation
  *
@@ -751,6 +801,13 @@ InvalidateSystemCachesExtended(bool debug_discard)
 
 		ccitem->function(ccitem->arg, InvalidOid);
 	}
+
+	for (i = 0; i < relsync_callback_count; i++)
+	{
+		struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+		ccitem->function(ccitem->arg, InvalidOid);
+	}
 }
 
 /*
@@ -832,6 +889,12 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
 		else if (msg->sn.dbId == MyDatabaseId)
 			InvalidateCatalogSnapshot();
 	}
+	else if (msg->id == SHAREDINVALRELSYNC_ID)
+	{
+		/* We only care about our own database */
+		if (msg->rs.dbId == MyDatabaseId)
+			CallRelSyncCallbacks(msg->rs.relid);
+	}
 	else
 		elog(FATAL, "unrecognized SI message ID: %d", msg->id);
 }
@@ -1622,6 +1685,35 @@ CacheInvalidateRelcacheByRelid(Oid relid)
 }
 
 
+/*
+ * RelationCacheInvalidate
+ *		Register invalidation of the cache in logical decoding output plugin
+ *		for a database.
+ *
+ * This type of invalidation message is used for the specific purpose of output
+ * plugins. Processes which do not decode WALs would do nothing even when it
+ * receives the message.
+ */
+void
+CacheInvalidateRelSync(Oid relid)
+{
+	RegisterRelsyncInvalidation(PrepareInvalidationState(),
+								MyDatabaseId, relid);
+}
+
+
+/*
+ * CacheInvalidateRelSyncAll
+ *		Register invalidation of the whole cache in logical decoding output
+ *		plugin.
+ */
+void
+CacheInvalidateRelSyncAll(void)
+{
+	CacheInvalidateRelSync(InvalidOid);
+}
+
+
 /*
  * CacheInvalidateSmgr
  *		Register invalidation of smgr references to a physical relation.
@@ -1763,6 +1855,27 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 	++relcache_callback_count;
 }
 
+/*
+ * CacheRegisterRelSyncCallback
+ *		Register the specified function to be called for all future
+ *		decoding-cache invalidation events.
+ *
+ * This function is intended to be call from the logical decoding output
+ * plugins.
+ */
+void
+CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+							 Datum arg)
+{
+	if (relsync_callback_count >= MAX_RELSYNC_CALLBACKS)
+		elog(FATAL, "out of relsync_callback_list slots");
+
+	relsync_callback_list[relsync_callback_count].function = func;
+	relsync_callback_list[relsync_callback_count].arg = arg;
+
+	++relsync_callback_count;
+}
+
 /*
  * CallSyscacheCallbacks
  *
@@ -1788,6 +1901,20 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
 	}
 }
 
+/*
+ * CallSyscacheCallbacks
+ */
+void
+CallRelSyncCallbacks(Oid relid)
+{
+	for (int i = 0; i < relsync_callback_count; i++)
+	{
+		struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+		ccitem->function(ccitem->arg, relid);
+	}
+}
+
 /*
  * LogLogicalInvalidations
  *
diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h
index 449e50bd78..23308f1de1 100644
--- a/src/include/pg_config_manual.h
+++ b/src/include/pg_config_manual.h
@@ -282,10 +282,10 @@
 
 /*
  * For cache-invalidation debugging, define DISCARD_CACHES_ENABLED to enable
- * use of the debug_discard_caches GUC to aggressively flush syscache/relcache
- * entries whenever it's possible to deliver invalidations.  See
- * AcceptInvalidationMessages() in src/backend/utils/cache/inval.c for
- * details.
+ * use of the debug_discard_caches GUC to aggressively flush
+ * syscache/relcache/relsync cache entries whenever it's possible to deliver
+ * invalidations.  See AcceptInvalidationMessages() in
+ * src/backend/utils/cache/inval.c for details.
  *
  * USE_ASSERT_CHECKING builds default to enabling this.  It's possible to use
  * DISCARD_CACHES_ENABLED without a cassert build and the implied
diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h
index 2463c0f9fa..90a5af4ed8 100644
--- a/src/include/storage/sinval.h
+++ b/src/include/storage/sinval.h
@@ -27,6 +27,7 @@
  *	* invalidate an smgr cache entry for a specific physical relation
  *	* invalidate the mapped-relation mapping for a given database
  *	* invalidate any saved snapshot that might be used to scan a given relation
+ *	* invalidate a specific entry for specific output plugin
  * More types could be added if needed.  The message type is identified by
  * the first "int8" field of the message struct.  Zero or positive means a
  * specific-catcache inval message (and also serves as the catcache ID field).
@@ -110,6 +111,15 @@ typedef struct
 	Oid			relId;			/* relation ID */
 } SharedInvalSnapshotMsg;
 
+#define SHAREDINVALRELSYNC_ID	(-6)
+
+typedef struct
+{
+	int8		id;				/* type field --- must be first */
+	Oid			dbId;			/* database ID */
+	Oid			relid;			/* relation ID, or 0 if whole relcache */
+} SharedInvalRelSyncMsg;
+
 typedef union
 {
 	int8		id;				/* type field --- must be first */
@@ -119,6 +129,7 @@ typedef union
 	SharedInvalSmgrMsg sm;
 	SharedInvalRelmapMsg rm;
 	SharedInvalSnapshotMsg sn;
+	SharedInvalRelSyncMsg rs;
 } SharedInvalidationMessage;
 
 
diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h
index 40658ba2ff..9b871caef6 100644
--- a/src/include/utils/inval.h
+++ b/src/include/utils/inval.h
@@ -22,6 +22,7 @@ extern PGDLLIMPORT int debug_discard_caches;
 
 typedef void (*SyscacheCallbackFunction) (Datum arg, int cacheid, uint32 hashvalue);
 typedef void (*RelcacheCallbackFunction) (Datum arg, Oid relid);
+typedef void (*RelSyncCallbackFunction) (Datum arg, Oid relid);
 
 
 extern void AcceptInvalidationMessages(void);
@@ -55,6 +56,10 @@ extern void CacheInvalidateRelcacheByTuple(HeapTuple classTuple);
 
 extern void CacheInvalidateRelcacheByRelid(Oid relid);
 
+extern void CacheInvalidateRelSync(Oid relid);
+
+extern void CacheInvalidateRelSyncAll(void);
+
 extern void CacheInvalidateSmgr(RelFileLocatorBackend rlocator);
 
 extern void CacheInvalidateRelmap(Oid databaseId);
@@ -66,8 +71,13 @@ extern void CacheRegisterSyscacheCallback(int cacheid,
 extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 										  Datum arg);
 
+extern void CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+										 Datum arg);
+
 extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue);
 
+extern void CallRelSyncCallbacks(Oid relid);
+
 extern void InvalidateSystemCaches(void);
 extern void InvalidateSystemCachesExtended(bool debug_discard);
 
-- 
2.43.5

