On Wed, Sep 13, 2023 at 09:59:04AM -0700, Nathan Bossart wrote:
> On Wed, Sep 13, 2023 at 05:06:28PM +0300, Maxim Orlov wrote:
>> So, should we mark this thread as RfC?
> 
> I've done so.  Barring additional feedback, I intend to commit this in the
> next few days.

I did some staging work for the patch (attached).  The one code change I
made was for the new test.  Instead of adding a new test, I figured we
could modify the preceding test to check for the expected worker type
instead of whether relid is NULL.  ISTM this relid check is intended to
filter for the apply worker, anyway.

The only reason I didn't apply this already is because IMHO we should
adjust the worker types and the documentation for the view to be
consistent.  For example, the docs say "leader apply worker" but the view
just calls them "apply" workers.  The docs say "synchronization worker" but
the view calls them "table synchronization" workers.  My first instinct is
to call apply workers "leader apply" workers in the view, and to call table
synchronization workers "table synchronization workers" in the docs.

Thoughts?

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
>From a49c8d92c4ddaf99d067d03e6adabea068497e93 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Thu, 14 Sep 2023 14:31:44 -0700
Subject: [PATCH v6 1/1] Add worker type to pg_stat_subscription.

Thanks to 2a8b40e368, the logical replication worker type is easily
determined, and this information is a nice addition to the
pg_stat_subscription view.  The worker type could already be
deduced via other columns such as leader_pid and relid, but that is
unnecessary complexity for users.

Bumps catversion.

Author: Peter Smith
Reviewed-by: Michael Paquier, Maxim Orlov
Discussion: https://postgr.es/m/CAHut%2BPtmbSMfErSk0S7xxVdZJ9XVE3xVLhqBTmT91kf57BeKDQ%40mail.gmail.com
---
 doc/src/sgml/monitoring.sgml               | 11 +++++++++++
 src/backend/catalog/system_views.sql       |  1 +
 src/backend/replication/logical/launcher.c | 18 +++++++++++++++++-
 src/include/catalog/catversion.h           |  2 +-
 src/include/catalog/pg_proc.dat            |  6 +++---
 src/test/regress/expected/rules.out        |  3 ++-
 src/test/subscription/t/004_sync.pl        |  2 +-
 7 files changed, 36 insertions(+), 7 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 4ff415d6a0..17f9323f23 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1993,6 +1993,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>worker_type</structfield> <type>text</type>
+      </para>
+      <para>
+       Type of the subscription worker process.  Possible types are
+       <literal>apply</literal>, <literal>parallel</literal>, and
+       <literal>table synchronization</literal>.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>pid</structfield> <type>integer</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 77b06e2a7a..fcb14976c0 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -949,6 +949,7 @@ CREATE VIEW pg_stat_subscription AS
     SELECT
             su.oid AS subid,
             su.subname,
+            st.worker_type,
             st.pid,
             st.leader_pid,
             st.relid,
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 7882fc91ce..501910b445 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1278,7 +1278,7 @@ GetLeaderApplyWorkerPid(pid_t pid)
 Datum
 pg_stat_get_subscription(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_COLS	9
+#define PG_STAT_GET_SUBSCRIPTION_COLS	10
 	Oid			subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
 	int			i;
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
@@ -1339,6 +1339,22 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 		else
 			values[8] = TimestampTzGetDatum(worker.reply_time);
 
+		switch (worker.type)
+		{
+			case WORKERTYPE_APPLY:
+				values[9] = CStringGetTextDatum("apply");
+				break;
+			case WORKERTYPE_PARALLEL_APPLY:
+				values[9] = CStringGetTextDatum("parallel apply");
+				break;
+			case WORKERTYPE_TABLESYNC:
+				values[9] = CStringGetTextDatum("table synchronization");
+				break;
+			case WORKERTYPE_UNKNOWN:
+				/* Should never happen. */
+				elog(ERROR, "unknown worker type");
+		}
+
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
 							 values, nulls);
 
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 4eaef54d0c..a0c1dbca95 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -57,6 +57,6 @@
  */
 
 /*							yyyymmddN */
-#define CATALOG_VERSION_NO	202309061
+#define CATALOG_VERSION_NO	202309141
 
 #endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9805bc6118..f0b7b9cbd8 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5484,9 +5484,9 @@
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
+  proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz,text}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,worker_type}',
   prosrc => 'pg_stat_get_subscription' },
 { oid => '2026', descr => 'statistics: current backend PID',
   proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r',
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 5058be5411..2c60400ade 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2118,6 +2118,7 @@ pg_stat_ssl| SELECT pid,
   WHERE (client_port IS NOT NULL);
 pg_stat_subscription| SELECT su.oid AS subid,
     su.subname,
+    st.worker_type,
     st.pid,
     st.leader_pid,
     st.relid,
@@ -2127,7 +2128,7 @@ pg_stat_subscription| SELECT su.oid AS subid,
     st.latest_end_lsn,
     st.latest_end_time
    FROM (pg_subscription su
-     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
+     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, worker_type) ON ((st.subid = su.oid)));
 pg_stat_subscription_stats| SELECT ss.subid,
     s.subname,
     ss.apply_error_count,
diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl
index bf4d59efba..ee07d28b37 100644
--- a/src/test/subscription/t/004_sync.pl
+++ b/src/test/subscription/t/004_sync.pl
@@ -80,7 +80,7 @@ $node_subscriber->safe_psql('postgres',
 
 # wait for it to start
 $node_subscriber->poll_query_until('postgres',
-	"SELECT pid IS NOT NULL FROM pg_stat_subscription WHERE subname = 'tap_sub2' AND relid IS NULL"
+	"SELECT pid IS NOT NULL FROM pg_stat_subscription WHERE subname = 'tap_sub2' AND worker_type = 'apply'"
 ) or die "Timed out while waiting for subscriber to start";
 
 # and drop both subscriptions
-- 
2.25.1

Reply via email to