From 8e0959f68cb75b6699300375f6df59efbc0dc6b8 Mon Sep 17 00:00:00 2001
From: Shveta Malik <shveta.malik@gmail.com>
Date: Thu, 23 Apr 2026 10:48:49 +0530
Subject: [PATCH] LogicalRep Message Related Change

---
 .../replication/logical/applyparallelworker.c    | 12 ++++++++----
 src/backend/replication/logical/worker.c         | 10 +++++-----
 src/include/replication/logicalworker.h          |  2 ++
 src/include/replication/worker_internal.h        | 16 +---------------
 4 files changed, 16 insertions(+), 24 deletions(-)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 7ae67a061ad..f748854505c 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -864,12 +864,16 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
 				 */
 				s.cursor += SIZE_STATS_MESSAGE;
 
-				apply_dispatch(&s);
+				apply_dispatch(&s, READ_NEXT_MSG);
 			}
-			else if (c == PARALLEL_APPLY_INTERNAL_MESSAGE)
+			else if (c == LOGICAL_REP_MSG_INTERNAL_MESSAGE)
 			{
-				/* Handle the internal message. */
-				apply_dispatch(&s);
+				/*
+				 * Since it is an internal message type (sent by leader worker), handle
+				 * directly by sending the action to apply_dispatch(). We don't expect
+				 * internaly generated messages to be wrapped in PqReplMsg_* format types.
+				 */
+				apply_dispatch(&s, c);
 			}
 			else
 			{
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 2c0091e4fb1..16766c551cb 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2357,7 +2357,7 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
 		/* Ensure we are reading the data into our memory context. */
 		oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
 
-		apply_dispatch(&s2);
+		apply_dispatch(&s2, READ_NEXT_MSG);
 
 		MemoryContextReset(ApplyMessageContext);
 
@@ -3779,9 +3779,10 @@ apply_handle_truncate(StringInfo s)
  * Logical replication protocol message dispatcher.
  */
 void
-apply_dispatch(StringInfo s)
+apply_dispatch(StringInfo s, int action_in)
 {
-	LogicalRepMsgType action = pq_getmsgbyte(s);
+	LogicalRepMsgType action = (action_in == READ_NEXT_MSG) ?
+					pq_getmsgbyte(s) : (LogicalRepMsgType) action_in;
 	LogicalRepMsgType saved_command;
 
 	/*
@@ -3878,7 +3879,6 @@ apply_dispatch(StringInfo s)
 		case LOGICAL_REP_MSG_INTERNAL_MESSAGE:
 			apply_handle_internal_message(s);
 			break;
-
 		default:
 			ereport(ERROR,
 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -4101,7 +4101,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 						UpdateWorkerStats(last_received, send_time, false);
 
-						apply_dispatch(&s);
+						apply_dispatch(&s, READ_NEXT_MSG);
 
 						maybe_advance_nonremovable_xid(&rdt_data, false);
 					}
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 7d748a28da8..8107d4fd059 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -14,6 +14,8 @@
 
 #include <signal.h>
 
+#define READ_NEXT_MSG 0
+
 extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending;
 
 extern void ApplyWorkerMain(Datum main_arg);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 96d90c2418c..921fc178534 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -316,7 +316,7 @@ extern void stream_stop_internal(TransactionId xid);
 extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
 								   XLogRecPtr lsn);
 
-extern void apply_dispatch(StringInfo s);
+extern void apply_dispatch(StringInfo s, int action_in);
 
 extern void maybe_reread_subscription(void);
 
@@ -383,20 +383,6 @@ extern void pa_wait_for_depended_transaction(TransactionId xid);
 #define isSequenceSyncWorker(worker) ((worker)->in_use && \
 									  (worker)->type == WORKERTYPE_SEQUENCESYNC)
 
-/*
- * The message code for internal messages sent by the leader apply worker to the
- * parallel apply worker.
- */
-#define PARALLEL_APPLY_INTERNAL_MESSAGE	'i'
-
-/*
- * Ensure PARALLEL_APPLY_INTERNAL_MESSAGE does not conflict with
- * PqReplMsg_WALData ('d'), as parallel apply workers may receive both types of
- * messages.
- */
-StaticAssertDecl(PARALLEL_APPLY_INTERNAL_MESSAGE != PqReplMsg_WALData,
-				 "PARALLEL_APPLY_INTERNAL_MESSAGE conflicts with PqReplMsg_WALData");
-
 static inline bool
 am_tablesync_worker(void)
 {
-- 
2.34.1

