From 25d17db1e61f9078dab37417ebb8e06d2a4ba766 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Sat, 28 Feb 2026 16:14:14 +0800
Subject: [PATCH v9 2/2] Synchronize sequences directly in REFRESH SEQUENCES
 command

The ALTER SUBSCRIPTION ... REFRESH SEQUENCES command currently sets all
sequence states in pg_subscription_rel to INIT and relies on the sequence sync
worker to perform the actual synchronization and update states to READY.

With the recent change making the sequence sync worker long-lived, most
sequences are now synchronized in the background, reducing the need for
REFRESH SEQUENCES. However, the command remains necessary for sequences that
haven't been synchronized.

This commit enhances REFRESH SEQUENCES to synchronize sequences directly within
the command itself, eliminating the overhead of launching a worker and updating
catalog entries unnecessarily.
---
 src/backend/commands/subscriptioncmds.c       |  17 +-
 .../replication/logical/sequencesync.c        | 165 ++++++++++++++----
 src/include/replication/logicalworker.h       |   5 +
 src/test/subscription/t/036_sequences.pl      |  49 ++++++
 4 files changed, 190 insertions(+), 46 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 5e3c0964d38..0a5acfda0ff 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1245,25 +1245,10 @@ AlterSubscription_refresh_seq(Subscription *sub)
 
 	PG_TRY();
 	{
-		List	   *subrel_states;
-
 		check_publications_origin_sequences(wrconn, sub->publications, true,
 											sub->origin, NULL, 0, sub->name);
 
-		/* Get local sequence list. */
-		subrel_states = GetSubscriptionRelations(sub->oid, false, true, false);
-		foreach_ptr(SubscriptionRelState, subrel, subrel_states)
-		{
-			Oid			relid = subrel->relid;
-
-			UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT,
-									   InvalidXLogRecPtr, false);
-			ereport(DEBUG1,
-					errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
-									get_namespace_name(get_rel_namespace(relid)),
-									get_rel_name(relid),
-									sub->name));
-		}
+		AlterSubSyncSequences(wrconn, sub->oid, sub->name, sub->runasowner);
 	}
 	PG_FINALLY();
 	{
diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c
index fad1bb548b2..b4081216477 100644
--- a/src/backend/replication/logical/sequencesync.c
+++ b/src/backend/replication/logical/sequencesync.c
@@ -209,7 +209,7 @@ get_sequences_string(List *seqindexes, List *seqinfos, StringInfo buf)
  */
 static void
 report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
-					   List *missing_seqs_idx, List *seqinfos)
+					   List *missing_seqs_idx, List *seqinfos, char *subname)
 {
 	StringInfo	seqstr;
 
@@ -255,7 +255,7 @@ report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
 	ereport(ERROR,
 			errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 			errmsg("logical replication sequence synchronization failed for subscription \"%s\"",
-				   MySubscription->name));
+				   subname));
 }
 
 /*
@@ -283,6 +283,7 @@ get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel,
 	HeapTuple	tup;
 	Form_pg_sequence local_seq;
 	LogicalRepSequenceInfo *seqinfo_local;
+	LOCKMODE	lockmode;
 
 	*seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull));
 	Assert(!isnull);
@@ -329,7 +330,20 @@ get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel,
 
 	seqinfo_local->found_on_pub = true;
 
-	*sequence_rel = try_table_open(seqinfo_local->localrelid, RowExclusiveLock);
+	/*
+	 * We take a stronger lock during DDL commands (currently only ALTER
+	 * SUBSCRIPTION ... REFRESH SEQUENCES) to prevent concurrent sequencesync
+	 * workers from updating the page_lsn while the DDL is also updating the
+	 * same sequence. This ensures we can always fetch the latest page_lsn to
+	 * determine whether the remote sequence value should be synchronized (see
+	 * validate_seqsync_state).
+	 */
+	if (IsLogicalWorker())
+		lockmode = RowExclusiveLock;
+	else
+		lockmode = ShareRowExclusiveLock;
+
+	*sequence_rel = try_table_open(seqinfo_local->localrelid, lockmode);
 
 	/* Sequence was concurrently dropped? */
 	if (!*sequence_rel)
@@ -367,7 +381,8 @@ get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel,
  * Preliminary check to determine if copying the sequence is allowed.
  */
 static CopySeqResult
-validate_seqsync_state(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel)
+validate_seqsync_state(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel,
+					   XLogRecPtr local_page_lsn)
 {
 	AclResult	aclresult;
 	Oid			seqoid = seqinfo->localrelid;
@@ -377,6 +392,16 @@ validate_seqsync_state(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel)
 	/* Perform drift check if it's not the initial sync */
 	if (seqinfo->relstate == SUBREL_STATE_READY)
 	{
+		/*
+		 * Skip synchronization if we are processing outdated sequence info
+		 * based on the LSN. This occurs when the sequence has been updated to
+		 * more recent data concurrently (via either ALTER SUBSCRIPTION ...
+		 * REFRESH SEQUENCES or the sequencesync worker).
+		 */
+		if (XLogRecPtrIsValid(local_page_lsn) &&
+			local_page_lsn > seqinfo->page_lsn)
+			return COPYSEQ_NO_DRIFT;
+
 		/*
 		 * Verify that the current user has SELECT privilege on the sequence.
 		 * This is required to read the sequence state below.
@@ -390,9 +415,32 @@ validate_seqsync_state(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel)
 		GetSequence(sequence_rel, &local_last_value, &local_is_called);
 
 		/*
-		 * Skip synchronization if the sequence is already in READY state and
-		 * has not drifted from the publisher's value.
+		 * Skip synchronization if the local sequence value is already ahead of
+		 * the publisher's value.
+		 *
+		 * XXX This occurs not only when the local sequence has been
+		 * synchronized to a newer value from the publisher (where skipping is
+		 * necessary to avoid backward movement), but also when the local
+		 * sequence has been manually updated by the user on the subscriber. The
+		 * latter could be considered a replication conflict, and overwriting
+		 * the user's update might be acceptable. However, since we cannot
+		 * easily distinguish between these two scenarios, we choose to skip
+		 * synchronization in all cases and emit a WARNING to notify the user to
+		 * manually resolve the conflict.
 		 */
+		if (local_last_value > seqinfo->last_value)
+		{
+			ereport(WARNING,
+					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					errmsg("skipped synchronizing the sequence \"%s.%s\"",
+						   seqinfo->nspname, seqinfo->seqname),
+					errdetail("The local last_value %lld is ahead of the one on publisher",
+							  (long long int) local_last_value));
+
+			return COPYSEQ_NO_DRIFT;
+		}
+
+		/* Skip synchronization if the sequence hasn't drifted */
 		if (local_last_value == seqinfo->last_value &&
 			local_is_called == seqinfo->is_called)
 			return COPYSEQ_NO_DRIFT;
@@ -413,12 +461,16 @@ validate_seqsync_state(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel)
  * synchronized.
  */
 static CopySeqResult
-copy_sequence(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel)
+copy_sequence(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel,
+			  Oid subid, bool run_as_owner)
 {
 	UserContext ucxt;
-	bool		run_as_owner = MySubscription->runasowner;
 	Oid			seqoid = seqinfo->localrelid;
 	CopySeqResult result;
+	XLogRecPtr	local_page_lsn;
+
+	(void) GetSubscriptionRelState(subid, RelationGetRelid(sequence_rel),
+								   &local_page_lsn);
 
 	/*
 	 * If the user did not opt to run as the owner of the subscription
@@ -427,7 +479,7 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel)
 	if (!run_as_owner)
 		SwitchToUntrustedUser(sequence_rel->rd_rel->relowner, &ucxt);
 
-	result = validate_seqsync_state(seqinfo, sequence_rel);
+	result = validate_seqsync_state(seqinfo, sequence_rel, local_page_lsn);
 
 	if (result != COPYSEQ_SUCCESS)
 	{
@@ -454,8 +506,9 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel)
 	 * Record the remote sequence's LSN in pg_subscription_rel and mark the
 	 * sequence as READY if updating a sequence that is in INIT state.
 	 */
-	if (seqinfo->relstate == SUBREL_STATE_INIT)
-		UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY,
+	if (seqinfo->relstate == SUBREL_STATE_INIT ||
+		seqinfo->page_lsn != local_page_lsn)
+		UpdateSubscriptionRelState(subid, seqoid, SUBREL_STATE_READY,
 								   seqinfo->page_lsn, false);
 
 	return COPYSEQ_SUCCESS;
@@ -471,7 +524,8 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel)
  * Returns true/false if any sequences were actually copied.
  */
 static bool
-copy_sequences(WalReceiverConn *conn, List *seqinfos)
+copy_sequences(WalReceiverConn *conn, List *seqinfos, Oid subid, char *subname,
+			   bool runasowner)
 {
 	int			cur_batch_base_index = 0;
 	int			n_seqinfos = list_length(seqinfos);
@@ -497,11 +551,16 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos)
 		int			batch_no_drift = 0;
 		int			batch_missing_count;
 		Relation	sequence_rel = NULL;
+		bool		started_tx = false;
 
 		WalRcvExecResult *res;
 		TupleTableSlot *slot;
 
-		StartTransactionCommand();
+		if (!IsTransactionState())
+		{
+			StartTransactionCommand();
+			started_tx = true;
+		}
 
 		for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++)
 		{
@@ -597,14 +656,15 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos)
 			 * sequences in READY state, only sync if there's drift.
 			 */
 			if (sync_status == COPYSEQ_SUCCESS)
-				sync_status = copy_sequence(seqinfo, sequence_rel);
+				sync_status = copy_sequence(seqinfo, sequence_rel,
+											subid, runasowner);
 
 			switch (sync_status)
 			{
 				case COPYSEQ_SUCCESS:
 					elog(DEBUG1,
 						 "logical replication synchronization has updated sequence \"%s.%s\" in subscription \"%s\"",
-						 seqinfo->nspname, seqinfo->seqname, MySubscription->name);
+						 seqinfo->nspname, seqinfo->seqname, subname);
 					batch_succeeded_count++;
 					sequence_copied = true;
 					break;
@@ -612,9 +672,8 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos)
 				case COPYSEQ_MISMATCH:
 
 					/*
-					 * Remember mismatched sequences in SequenceSyncContext
-					 * since these will be used after the transaction is
-					 * committed.
+					 * Remember mismatched sequences in SequenceSyncContext since
+					 * these will be used after the transaction is committed.
 					 */
 					oldctx = MemoryContextSwitchTo(SequenceSyncContext);
 					mismatched_seqs_idx = lappend_int(mismatched_seqs_idx,
@@ -680,13 +739,17 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos)
 
 		elog(DEBUG1,
 			 "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped, %d no drift",
-			 MySubscription->name,
+			 subname,
 			 (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1,
 			 batch_size, batch_succeeded_count, batch_mismatched_count,
 			 batch_insuffperm_count, batch_missing_count, batch_skipped_count, batch_no_drift);
 
-		/* Commit this batch, and prepare for next batch */
-		CommitTransactionCommand();
+		/*
+		 * Commit this batch if started a transaction, and prepare for next
+		 * batch.
+		 */
+		if (started_tx)
+			CommitTransactionCommand();
 
 		if (batch_missing_count)
 		{
@@ -711,7 +774,7 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos)
 
 	/* Report mismatches, permission issues, or missing sequences */
 	report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx,
-						   missing_seqs_idx, seqinfos);
+						   missing_seqs_idx, seqinfos, subname);
 
 	return sequence_copied;
 }
@@ -723,20 +786,23 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos)
  * Returns true if sequences have been updated.
  */
 static bool
-LogicalRepSyncSequences(WalReceiverConn *conn)
+LogicalRepSyncSequences(WalReceiverConn *conn, Oid subid, char *subname,
+						bool runasowner)
 {
 	Relation	rel;
 	HeapTuple	tup;
 	ScanKeyData skey[1];
 	SysScanDesc scan;
-	Oid			subid = MyLogicalRepWorker->subid;
 	bool		sequence_copied = false;
 	List	   *seqinfos = NIL;
 	MemoryContext oldctx;
+	bool		started_tx = false;
 
-	Assert(SequenceSyncContext);
-
-	StartTransactionCommand();
+	if (!IsTransactionState())
+	{
+		StartTransactionCommand();
+		started_tx = true;
+	}
 
 	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
 
@@ -796,7 +862,8 @@ LogicalRepSyncSequences(WalReceiverConn *conn)
 	systable_endscan(scan);
 	table_close(rel, AccessShareLock);
 
-	CommitTransactionCommand();
+	if (started_tx)
+		CommitTransactionCommand();
 
 	/*
 	 * Exit early if no catalog entries found, likely due to concurrent drops.
@@ -805,7 +872,8 @@ LogicalRepSyncSequences(WalReceiverConn *conn)
 		return false;
 
 	/* Process sequences */
-	sequence_copied = copy_sequences(conn, seqinfos);
+	sequence_copied = copy_sequences(conn, seqinfos, subid, subname,
+									 runasowner);
 
 	return sequence_copied;
 }
@@ -880,7 +948,10 @@ start_sequence_sync(void)
 			/*
 			 * Synchronize all sequences (both READY and INIT states).
 			 */
-			sequence_copied = LogicalRepSyncSequences(LogRepWorkerWalRcvConn);
+			sequence_copied = LogicalRepSyncSequences(LogRepWorkerWalRcvConn,
+													  MySubscription->oid,
+													  MySubscription->name,
+													  MySubscription->runasowner);
 
 			MemoryContextReset(SequenceSyncContext);
 			MemoryContextSwitchTo(oldctx);
@@ -942,3 +1013,37 @@ SequenceSyncWorkerMain(Datum main_arg)
 
 	FinishSyncWorker();
 }
+
+/*
+ * Wrapper for LogicalRepSyncSequences to synchronize all sequences of a
+ * subscription from outside the sequencesync worker
+ */
+void
+AlterSubSyncSequences(WalReceiverConn *conn, Oid subid, char *subname,
+					  bool runasowner)
+{
+	/*
+	 * Init the SequenceSyncContext which we clean up after the sequence
+	 * synchronization.
+	 */
+	SequenceSyncContext = AllocSetContextCreate(CurrentMemoryContext,
+												"SequenceSyncContext",
+												ALLOCSET_DEFAULT_SIZES);
+
+	PG_TRY();
+	{
+		MemoryContext	oldctx;
+
+		oldctx = MemoryContextSwitchTo(SequenceSyncContext);
+
+		LogicalRepSyncSequences(conn, subid, subname, runasowner);
+
+		MemoryContextSwitchTo(oldctx);
+	}
+	PG_FINALLY();
+	{
+		MemoryContextDelete(SequenceSyncContext);
+		SequenceSyncContext = NULL;
+	}
+	PG_END_TRY();
+}
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 7d748a28da8..73afd7853d0 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -14,6 +14,8 @@
 
 #include <signal.h>
 
+#include "replication/walreceiver.h"
+
 extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending;
 
 extern void ApplyWorkerMain(Datum main_arg);
@@ -31,4 +33,7 @@ extern void LogicalRepWorkersWakeupAtCommit(Oid subid);
 
 extern void AtEOXact_LogicalRepWorkers(bool isCommit);
 
+extern void AlterSubSyncSequences(WalReceiverConn *conn, Oid subid,
+								  char *subname, bool runasowner);
+
 #endif							/* LOGICALWORKER_H */
diff --git a/src/test/subscription/t/036_sequences.pl b/src/test/subscription/t/036_sequences.pl
index 1d81518fe22..9a61b7bd0c8 100644
--- a/src/test/subscription/t/036_sequences.pl
+++ b/src/test/subscription/t/036_sequences.pl
@@ -176,4 +176,53 @@ $node_subscriber->wait_for_log(
 	qr/WARNING: ( [A-Z0-9]+:)? missing sequence on publisher \("public.regress_s4"\)/,
 	$log_offset);
 
+##########
+# ALTER SUBSCRIPTION ... REFRESH SEQUENCES synchronizes sequences online,
+# eliminating the need to launch a sequencesync worker.
+##########
+
+# Reduce max_logical_replication_workers to disallow sequence worker from running
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_logical_replication_workers = 0));
+$node_subscriber->restart;
+
+# Verify there is no logical replication apply worker running
+$result = $node_subscriber->safe_psql(
+	'postgres',
+	"SELECT count(*) FROM pg_stat_activity WHERE backend_type = 'logical replication apply worker'");
+
+is($result, '0', 'no logical replication worker is running');
+
+# Increment sequence on publisher
+$node_publisher->safe_psql('postgres',
+	qq(SELECT nextval('regress_s1');));
+
+# The command should fail due to missing sequence ('regress_s4')
+my ($cmdret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+	"ALTER SUBSCRIPTION regress_seq_sub REFRESH SEQUENCES;");
+
+like(
+	$stderr,
+	qr/WARNING:  missing sequence on publisher \("public.regress_s4"\)/,
+	"output the wanring for the missing sequence regress_s4");
+
+like(
+	$stderr,
+	qr/ERROR:  logical replication sequence synchronization failed for subscription \"regress_seq_sub\"/,
+	"the command failed due to the missing sequence regress_s4");
+
+# Refresh the publication to remove the missing sequence
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION;");
+
+# Sync the sequence regress_s1
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION regress_seq_sub REFRESH SEQUENCES;");
+
+# Get the current sequence value on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	qq(SELECT last_value FROM regress_s1;));
+
+is($result, '201', 'sequence regress_s1 is synced now');
+
 done_testing();
-- 
2.51.1.windows.1

