From f26897ebe46dc0822df9a8b843f404ff2187684c Mon Sep 17 00:00:00 2001
From: "houzj.fnst" <houzj.fnst@cn.fujitsu.com>
Date: Thu, 6 Oct 2022 14:42:24 +0800
Subject: [PATCH v82] Display the leader apply worker's PID for parallel apply
 workers.

Add leader_pid to pg_stat_subscription. leader_pid is the process ID of
the leader apply worker if this process is a parallel apply worker. If
this field is NULL, it indicates that the process is a leader apply
worker or a synchronization worker. The new column makes it easier to
distinguish parallel apply workers from other kinds of workers and helps
to identify the leader of the parallel apply workers corresponding to a
particular subscription.
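
For example, a query like the following (illustrative only; "mysub" is a
hypothetical subscription name) lists a subscription's workers, with
leader_pid set only in the rows of parallel apply workers:

    SELECT subname, pid, leader_pid, relid
    FROM pg_stat_subscription
    WHERE subname = 'mysub';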

Additionally, update the leader_pid column in pg_stat_activity to
display the PID of the leader apply worker for parallel apply workers.
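
As an illustration (a sketch, not part of the patch's tests), the following
query lists every backend whose leader_pid is set, which after this change
covers parallel apply workers in addition to parallel query workers:

    SELECT pid, leader_pid, backend_type
    FROM pg_stat_activity
    WHERE leader_pid IS NOT NULL;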

Author: Hou Zhijie
Reviewed-by: Peter Smith, Sawada Masahiko, Amit Kapila
Discussion: https://postgr.es/m/CAA4eK1+wyN6zpaHUkCLorEWNx75MG0xhMwcFhvjqm2KURZEAGw@mail.gmail.com
---
 doc/src/sgml/logical-replication.sgml         |  3 +-
 doc/src/sgml/monitoring.sgml                  | 36 ++++++++---
 src/backend/catalog/system_views.sql          |  1 +
 .../replication/logical/applyparallelworker.c |  6 +-
 src/backend/replication/logical/launcher.c    | 64 ++++++++++++++-----
 src/backend/utils/adt/pgstatfuncs.c           | 21 ++++--
 src/include/catalog/pg_proc.dat               |  6 +-
 src/include/replication/logicallauncher.h     |  2 +
 src/include/replication/worker_internal.h     |  4 +-
 src/test/regress/expected/rules.out           |  3 +-
 10 files changed, 104 insertions(+), 42 deletions(-)

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 54f48be87f..f4b4e641be 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1692,7 +1692,8 @@ CONTEXT:  processing remote data for replication origin "pg_16395" during "INSER
    subscription.  A disabled subscription or a crashed subscription will have
    zero rows in this view.  If the initial data synchronization of any
    table is in progress, there will be additional workers for the tables
-   being synchronized.
+   being synchronized. Moreover, if streaming transactions are applied in
+   parallel, there may be additional parallel apply workers.
   </para>
  </sect1>
 
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 358d2ff90f..e3a783abd0 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -743,9 +743,11 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
        <structfield>leader_pid</structfield> <type>integer</type>
       </para>
       <para>
-       Process ID of the parallel group leader, if this process is a
-       parallel query worker.  <literal>NULL</literal> if this process is a
-       parallel group leader or does not participate in parallel query.
+       Process ID of the parallel group leader if this process is a parallel
+       query worker, or process ID of the leader apply worker if this process
+       is a parallel apply worker.  <literal>NULL</literal> indicates that this
+       process is a parallel group leader or leader apply worker, or does not
+       participate in any parallel operation.
       </para></entry>
      </row>
 
@@ -3206,13 +3208,24 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>leader_pid</structfield> <type>integer</type>
+      </para>
+      <para>
+       Process ID of the leader apply worker if this process is a parallel
+       apply worker; NULL if this process is a leader apply worker or a
+       synchronization worker
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>relid</structfield> <type>oid</type>
       </para>
       <para>
-       OID of the relation that the worker is synchronizing; null for the
-       main apply worker
+       OID of the relation that the worker is synchronizing; NULL for the
+       leader apply worker and parallel apply workers
       </para></entry>
      </row>
 
@@ -3222,7 +3235,7 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
       </para>
       <para>
        Last write-ahead log location received, the initial value of
-       this field being 0
+       this field being 0; NULL for parallel apply workers
       </para></entry>
      </row>
 
@@ -3231,7 +3244,8 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
        <structfield>last_msg_send_time</structfield> <type>timestamp with time zone</type>
       </para>
       <para>
-       Send time of last message received from origin WAL sender
+       Send time of last message received from origin WAL sender; NULL for
+       parallel apply workers
       </para></entry>
      </row>
 
@@ -3240,7 +3254,8 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
        <structfield>last_msg_receipt_time</structfield> <type>timestamp with time zone</type>
       </para>
       <para>
-       Receipt time of last message received from origin WAL sender
+       Receipt time of last message received from origin WAL sender; NULL for
+       parallel apply workers
       </para></entry>
      </row>
 
@@ -3249,7 +3264,8 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
        <structfield>latest_end_lsn</structfield> <type>pg_lsn</type>
       </para>
       <para>
-       Last write-ahead log location reported to origin WAL sender
+       Last write-ahead log location reported to origin WAL sender; NULL for
+       parallel apply workers
       </para></entry>
      </row>
 
@@ -3259,7 +3275,7 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
       </para>
       <para>
        Time of last write-ahead log location reported to origin WAL
-       sender
+       sender; NULL for parallel apply workers
       </para></entry>
      </row>
     </tbody>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index d2a8c82900..8608e3fa5b 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -948,6 +948,7 @@ CREATE VIEW pg_stat_subscription AS
             su.oid AS subid,
             su.subname,
             st.pid,
+            st.leader_pid,
             st.relid,
             st.received_lsn,
             st.last_msg_send_time,
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 3dfcff2798..3579e704fe 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -849,7 +849,7 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
 static void
 pa_shutdown(int code, Datum arg)
 {
-	SendProcSignal(MyLogicalRepWorker->apply_leader_pid,
+	SendProcSignal(MyLogicalRepWorker->leader_pid,
 				   PROCSIG_PARALLEL_APPLY_MESSAGE,
 				   InvalidBackendId);
 
@@ -932,7 +932,7 @@ ParallelApplyWorkerMain(Datum main_arg)
 	error_mqh = shm_mq_attach(mq, seg, NULL);
 
 	pq_redirect_to_shm_mq(seg, error_mqh);
-	pq_set_parallel_leader(MyLogicalRepWorker->apply_leader_pid,
+	pq_set_parallel_leader(MyLogicalRepWorker->leader_pid,
 						   InvalidBackendId);
 
 	MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
@@ -950,7 +950,7 @@ ParallelApplyWorkerMain(Datum main_arg)
 	 * The parallel apply worker doesn't need to monopolize this replication
 	 * origin which was already acquired by its leader process.
 	 */
-	replorigin_session_setup(originid, MyLogicalRepWorker->apply_leader_pid);
+	replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
 	replorigin_session_origin = originid;
 	CommitTransactionCommand();
 
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index afb7acddaa..ff97ae4ffb 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -410,7 +410,7 @@ retry:
 	worker->relstate = SUBREL_STATE_UNKNOWN;
 	worker->relstate_lsn = InvalidXLogRecPtr;
 	worker->stream_fileset = NULL;
-	worker->apply_leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
+	worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
 	worker->parallel_apply = is_parallel_apply_worker;
 	worker->last_lsn = InvalidXLogRecPtr;
 	TIMESTAMP_NOBEGIN(worker->last_send_time);
@@ -732,7 +732,7 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
 	worker->userid = InvalidOid;
 	worker->subid = InvalidOid;
 	worker->relid = InvalidOid;
-	worker->apply_leader_pid = InvalidPid;
+	worker->leader_pid = InvalidPid;
 	worker->parallel_apply = false;
 }
 
@@ -1066,13 +1066,41 @@ IsLogicalLauncher(void)
 	return LogicalRepCtx->launcher_pid == MyProcPid;
 }
 
+/*
+ * Return the pid of the leader apply worker if the given pid is the pid of a
+ * parallel apply worker; otherwise, return InvalidPid.
+ */
+pid_t
+GetLeaderApplyWorkerPid(pid_t pid)
+{
+	int			leader_pid = InvalidPid;
+	int			i;
+
+	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+	for (i = 0; i < max_logical_replication_workers; i++)
+	{
+		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+		if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
+		{
+			leader_pid = w->leader_pid;
+			break;
+		}
+	}
+
+	LWLockRelease(LogicalRepWorkerLock);
+
+	return leader_pid;
+}
+
 /*
  * Returns state of the subscriptions.
  */
 Datum
 pg_stat_get_subscription(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_COLS	8
+#define PG_STAT_GET_SUBSCRIPTION_COLS	9
 	Oid			subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
 	int			i;
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
@@ -1098,10 +1126,6 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 		if (OidIsValid(subid) && worker.subid != subid)
 			continue;
 
-		/* Skip if this is a parallel apply worker */
-		if (isParallelApplyWorker(&worker))
-			continue;
-
 		worker_pid = worker.proc->pid;
 
 		values[0] = ObjectIdGetDatum(worker.subid);
@@ -1110,26 +1134,32 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 		else
 			nulls[1] = true;
 		values[2] = Int32GetDatum(worker_pid);
-		if (XLogRecPtrIsInvalid(worker.last_lsn))
+
+		if (isParallelApplyWorker(&worker))
+			values[3] = Int32GetDatum(worker.leader_pid);
+		else
 			nulls[3] = true;
+
+		if (XLogRecPtrIsInvalid(worker.last_lsn))
+			nulls[4] = true;
 		else
-			values[3] = LSNGetDatum(worker.last_lsn);
+			values[4] = LSNGetDatum(worker.last_lsn);
 		if (worker.last_send_time == 0)
-			nulls[4] = true;
+			nulls[5] = true;
 		else
-			values[4] = TimestampTzGetDatum(worker.last_send_time);
+			values[5] = TimestampTzGetDatum(worker.last_send_time);
 		if (worker.last_recv_time == 0)
-			nulls[5] = true;
+			nulls[6] = true;
 		else
-			values[5] = TimestampTzGetDatum(worker.last_recv_time);
+			values[6] = TimestampTzGetDatum(worker.last_recv_time);
 		if (XLogRecPtrIsInvalid(worker.reply_lsn))
-			nulls[6] = true;
+			nulls[7] = true;
 		else
-			values[6] = LSNGetDatum(worker.reply_lsn);
+			values[7] = LSNGetDatum(worker.reply_lsn);
 		if (worker.reply_time == 0)
-			nulls[7] = true;
+			nulls[8] = true;
 		else
-			values[7] = TimestampTzGetDatum(worker.reply_time);
+			values[8] = TimestampTzGetDatum(worker.reply_time);
 
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
 							 values, nulls);
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 58bd1360b9..a3bf95ff0f 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -25,6 +25,7 @@
 #include "pgstat.h"
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/postmaster.h"
+#include "replication/logicallauncher.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/acl.h"
@@ -409,9 +410,9 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
 
 			/*
 			 * If a PGPROC entry was retrieved, display wait events and lock
-			 * group leader information if any.  To avoid extra overhead, no
-			 * extra lock is being held, so there is no guarantee of
-			 * consistency across multiple rows.
+			 * group leader or apply leader information if any.  To avoid
+			 * extra overhead, no extra lock is being held, so there is no
+			 * guarantee of consistency across multiple rows.
 			 */
 			if (proc != NULL)
 			{
@@ -426,14 +427,24 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
 
 				/*
 				 * Show the leader only for active parallel workers.  This
-				 * leaves the field as NULL for the leader of a parallel
-				 * group.
+				 * leaves the field as NULL for the leader of a parallel group
+				 * or the leader of parallel apply workers.
 				 */
 				if (leader && leader->pid != beentry->st_procpid)
 				{
 					values[28] = Int32GetDatum(leader->pid);
 					nulls[28] = false;
 				}
+				else
+				{
+					int			leader_pid = GetLeaderApplyWorkerPid(beentry->st_procpid);
+
+					if (leader_pid != InvalidPid)
+					{
+						values[28] = Int32GetDatum(leader_pid);
+						nulls[28] = false;
+					}
+				}
 			}
 
 			if (wait_event_type)
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 3810de7b22..86eb8e8c58 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5430,9 +5430,9 @@
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,oid,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subid,relid,pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
+  proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
   prosrc => 'pg_stat_get_subscription' },
 { oid => '2026', descr => 'statistics: current backend PID',
   proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r',
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index c85593ad11..360e98702a 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -27,4 +27,6 @@ extern void AtEOXact_ApplyLauncher(bool isCommit);
 
 extern bool IsLogicalLauncher(void);
 
+extern pid_t GetLeaderApplyWorkerPid(pid_t pid);
+
 #endif							/* LOGICALLAUNCHER_H */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index db891eea8a..dc87a4edd1 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -71,7 +71,7 @@ typedef struct LogicalRepWorker
 	 * PID of leader apply worker if this slot is used for a parallel apply
 	 * worker, InvalidPid otherwise.
 	 */
-	pid_t		apply_leader_pid;
+	pid_t		leader_pid;
 
 	/* Indicates whether apply can be performed in parallel. */
 	bool		parallel_apply;
@@ -303,7 +303,7 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
 
-#define isParallelApplyWorker(worker) ((worker)->apply_leader_pid != InvalidPid)
+#define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
 
 static inline bool
 am_tablesync_worker(void)
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index a969ae63eb..05eec2adfd 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2094,6 +2094,7 @@ pg_stat_ssl| SELECT s.pid,
 pg_stat_subscription| SELECT su.oid AS subid,
     su.subname,
     st.pid,
+    st.leader_pid,
     st.relid,
     st.received_lsn,
     st.last_msg_send_time,
@@ -2101,7 +2102,7 @@ pg_stat_subscription| SELECT su.oid AS subid,
     st.latest_end_lsn,
     st.latest_end_time
    FROM (pg_subscription su
-     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
+     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
 pg_stat_subscription_stats| SELECT ss.subid,
     s.subname,
     ss.apply_error_count,
-- 
2.31.1

