From 52b649b39343a179f4857189b209c6fb862bfe12 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Fri, 13 Mar 2026 12:37:25 +0800
Subject: [PATCH v12 2/3] Cache sequence information in the sequence sync
 worker

Previously, the sequence sync worker would fetch sequence metadata from
the catalog each time it needed to synchronize sequences. This could be
inefficient when many sequences are involved, as the worker would need
to repeatedly open and scan pg_subscription_rel.

To improve this, introduce a cache for sequence information in the sequence sync
worker. The cache is populated on first use and kept across synchronization
cycles. It is invalidated when pg_subscription_rel is modified, ensuring that
changes to subscription relations are reflected promptly.
---
 .../replication/logical/sequencesync.c        | 93 +++++++++++++------
 1 file changed, 66 insertions(+), 27 deletions(-)

diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c
index f04c50f6cae..a4807ee8017 100644
--- a/src/backend/replication/logical/sequencesync.c
+++ b/src/backend/replication/logical/sequencesync.c
@@ -98,6 +98,19 @@ typedef enum CopySeqResult
 
 static MemoryContext SequenceSyncContext = NULL;
 
+/*
+ * Cached list of sequence information (LogicalRepSequenceInfo) for the current
+ * subscription. The cache is invalidated when pg_subscription_rel is modified.
+ *
+ * Note: To avoid the cost of searching for a specific sequence on relcache
+ * invalidation, we do not invalidate the cache immediately when a sequence is
+ * altered (e.g., renamed or moved to another namespace). Instead, we validate
+ * the sequence name and namespace when next attempting to sync it, at which
+ * point we verify the local sequence state.
+ */
+static List *sequence_infos = NIL;
+static bool sequence_infos_valid = false;
+
 /*
  * Apply worker determines whether a sequence sync worker is needed.
  *
@@ -499,6 +512,9 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos, bool update_lsn)
 
 #define MAX_SEQUENCES_SYNC_PER_BATCH 100
 
+	if (seqinfos == NIL)
+		return false;
+
 	while (cur_batch_base_index < n_seqinfos)
 	{
 		Oid			seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID,
@@ -724,24 +740,44 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos, bool update_lsn)
 }
 
 /*
- * Identifies sequences that require synchronization and initiates the
- * synchronization process.
+ * Callback from syscache invalidation.
+ */
+static void
+invalidate_syncing_sequence_infos(Datum arg, SysCacheIdentifier cacheid,
+								  uint32 hashvalue)
+{
+	sequence_infos_valid = false;
+}
+
+/*
+ * Get the list of sequence information for the current subscription.
  *
- * Returns true if sequences have been updated.
+ * Return cached sequence states if valid; otherwise fetches them from the
+ * catalog, caches the result, and return it.
  */
-static bool
-LogicalRepSyncSequences(WalReceiverConn *conn, bool update_lsn)
+static List *
+fetch_sequence_infos(void)
 {
 	Relation	rel;
 	HeapTuple	tup;
 	ScanKeyData skey[1];
 	SysScanDesc scan;
 	Oid			subid = MyLogicalRepWorker->subid;
-	bool		sequence_copied = false;
-	List	   *seqinfos = NIL;
-	MemoryContext oldctx;
+	List	   *tmp_seqinfos = NIL;
+
+	if (sequence_infos_valid)
+		return sequence_infos;
 
-	Assert(SequenceSyncContext);
+	/* Free the existing invalid cache entries */
+	foreach_ptr(LogicalRepSequenceInfo, seqinfo, sequence_infos)
+	{
+		pfree(seqinfo->nspname);
+		pfree(seqinfo->seqname);
+		pfree(seqinfo);
+	}
+
+	list_free(sequence_infos);
+	sequence_infos = NIL;
 
 	StartTransactionCommand();
 
@@ -762,6 +798,7 @@ LogicalRepSyncSequences(WalReceiverConn *conn, bool update_lsn)
 		LogicalRepSequenceInfo *seq;
 		Relation	sequence_rel;
 		char		relstate;
+		MemoryContext oldctx;
 
 		CHECK_FOR_INTERRUPTS();
 
@@ -784,17 +821,14 @@ LogicalRepSyncSequences(WalReceiverConn *conn, bool update_lsn)
 
 		Assert(relstate == SUBREL_STATE_INIT || relstate == SUBREL_STATE_READY);
 
-		/*
-		 * Worker needs to process sequences across transaction boundary, so
-		 * allocate them under SequenceSyncContext.
-		 */
-		oldctx = MemoryContextSwitchTo(SequenceSyncContext);
+		/* Cache the information in a permanent memory context */
+		oldctx = MemoryContextSwitchTo(CacheMemoryContext);
 		seq = palloc0_object(LogicalRepSequenceInfo);
 		seq->localrelid = subrel->srrelid;
 		seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
 		seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
 		seq->relstate = relstate;
-		seqinfos = lappend(seqinfos, seq);
+		tmp_seqinfos = lappend(tmp_seqinfos, seq);
 		MemoryContextSwitchTo(oldctx);
 
 		table_close(sequence_rel, NoLock);
@@ -804,18 +838,12 @@ LogicalRepSyncSequences(WalReceiverConn *conn, bool update_lsn)
 	systable_endscan(scan);
 	table_close(rel, AccessShareLock);
 
-	CommitTransactionCommand();
-
-	/*
-	 * Exit early if no catalog entries found, likely due to concurrent drops.
-	 */
-	if (!seqinfos)
-		return false;
+	sequence_infos = tmp_seqinfos;
+	sequence_infos_valid = true;
 
-	/* Process sequences */
-	sequence_copied = copy_sequences(conn, seqinfos, update_lsn);
+	CommitTransactionCommand();
 
-	return sequence_copied;
+	return sequence_infos;
 }
 
 /*
@@ -830,6 +858,14 @@ start_sequence_sync(void)
 {
 	Assert(am_sequencesync_worker());
 
+	/*
+	 * Setup callback for syscache so that we know when something changes in
+	 * the subscription relation state.
+	 */
+	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
+								  invalidate_syncing_sequence_infos,
+								  (Datum) 0);
+
 	PG_TRY();
 	{
 		char	   *err;
@@ -875,6 +911,7 @@ start_sequence_sync(void)
 			bool		sequence_copied = false;
 			MemoryContext oldctx;
 			bool		update_lsn;
+			List	   *seqinfos;
 			TimestampTz now = GetCurrentTimestamp();
 
 			CHECK_FOR_INTERRUPTS();
@@ -906,8 +943,10 @@ start_sequence_sync(void)
 			/*
 			 * Synchronize all sequences (both READY and INIT states).
 			 */
-			sequence_copied = LogicalRepSyncSequences(LogRepWorkerWalRcvConn,
-													  update_lsn);
+			seqinfos = fetch_sequence_infos();
+
+			sequence_copied = copy_sequences(LogRepWorkerWalRcvConn, seqinfos,
+											 update_lsn);
 
 			MemoryContextReset(SequenceSyncContext);
 			MemoryContextSwitchTo(oldctx);
-- 
2.53.0.windows.2

