From e7f4a93fdeea15489fbed4b3404ffd08a958af69 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Mon, 25 Aug 2025 10:03:28 +0800
Subject: [PATCH v71 2/2] Add a dead_tuple_retention_active column in
 pg_stat_subscription

To monitor worker's conflict retention status, this patch also introduces a new
column 'dead_tuple_retention_active' in the pg_stat_subscription view. This column
indicates whether the apply worker is effectively retaining conflict
information. The value is set to true only if retain_dead_tuples is enabled
for the associated subscription, and the retention duration for conflict
detection by the apply worker has not exceeded max_retention_duration.
---
 doc/src/sgml/monitoring.sgml               | 12 ++++++++++++
 src/backend/catalog/system_views.sql       |  3 ++-
 src/backend/replication/logical/launcher.c | 18 +++++++++++++++++-
 src/include/catalog/pg_proc.dat            |  6 +++---
 src/test/regress/expected/rules.out        |  5 +++--
 src/test/subscription/t/035_conflicts.pl   | 12 ++++++++++++
 6 files changed, 49 insertions(+), 7 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 3f4a27a736e..96270f03bf2 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2114,6 +2114,18 @@ description | Waiting for a newly initialized WAL file to reach durable storage
        sender; NULL for parallel apply workers
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>dead_tuple_retention_active</structfield> <type>boolean</type>
+      </para>
+      <para>
+       True if <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>
+       is enabled and the retention duration for information used in conflict detection is
+       within <link linkend="sql-createsubscription-params-with-max-retention-duration"><literal>max_retention_duration</literal></link>; NULL for
+       parallel apply workers and table synchronization workers.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index c77fa0234bb..b47109ab11d 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -996,7 +996,8 @@ CREATE VIEW pg_stat_subscription AS
             st.last_msg_send_time,
             st.last_msg_receipt_time,
             st.latest_end_lsn,
-            st.latest_end_time
+            st.latest_end_time,
+            st.dead_tuple_retention_active
     FROM pg_subscription su
             LEFT JOIN pg_stat_get_subscription(NULL) st
                       ON (st.subid = su.oid);
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 226421e3dfa..cd54ac7cbf2 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1593,7 +1593,7 @@ GetLeaderApplyWorkerPid(pid_t pid)
 Datum
 pg_stat_get_subscription(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_COLS	10
+#define PG_STAT_GET_SUBSCRIPTION_COLS	11
 	Oid			subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
 	int			i;
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
@@ -1670,6 +1670,22 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 				elog(ERROR, "unknown worker type");
 		}
 
+		/*
+		 * Use the worker's oldest_nonremovable_xid instead of
+		 * pg_subscription.subretentionactive to determine whether retention
+		 * is active, as retention resumption might not be complete even when
+		 * subretentionactive is set to true; this is because the launcher
+		 * assigns the initial oldest_nonremovable_xid after the apply worker
+		 * updates the catalog (see resume_conflict_info_retention).
+		 *
+		 * Only the leader apply worker manages conflict retention (see
+		 * maybe_advance_nonremovable_xid() for details).
+		 */
+		if (!isParallelApplyWorker(&worker) && !isTablesyncWorker(&worker))
+			values[10] = TransactionIdIsValid(worker.oldest_nonremovable_xid);
+		else
+			nulls[10] = true;
+
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
 							 values, nulls);
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 118d6da1ace..3810f3883b7 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5696,9 +5696,9 @@
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz,text}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,worker_type}',
+  proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz,text,bool}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,worker_type,dead_tuple_retention_active}',
   prosrc => 'pg_stat_get_subscription' },
 { oid => '2026', descr => 'statistics: current backend PID',
   proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r',
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 35e8aad7701..183fc193ad3 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2174,9 +2174,10 @@ pg_stat_subscription| SELECT su.oid AS subid,
     st.last_msg_send_time,
     st.last_msg_receipt_time,
     st.latest_end_lsn,
-    st.latest_end_time
+    st.latest_end_time,
+    st.dead_tuple_retention_active
    FROM (pg_subscription su
-     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, worker_type) ON ((st.subid = su.oid)));
+     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, worker_type, dead_tuple_retention_active) ON ((st.subid = su.oid)));
 pg_stat_subscription_stats| SELECT ss.subid,
     s.subname,
     ss.apply_error_count,
diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl
index 947ea131c4d..f53209120a3 100644
--- a/src/test/subscription/t/035_conflicts.pl
+++ b/src/test/subscription/t/035_conflicts.pl
@@ -580,6 +580,10 @@ if ($injection_points_supported != 0)
 # max_retention_duration.
 ###############################################################################
 
+$result = $node_A->safe_psql('postgres',
+	"SELECT dead_tuple_retention_active FROM pg_stat_subscription WHERE subname='$subname_AB';");
+is($result, qq(t), 'worker on node A retains dead tuples');
+
 # Create a physical slot
 $node_B->safe_psql('postgres',
 	"SELECT * FROM pg_create_physical_replication_slot('blocker');");
@@ -622,6 +626,10 @@ $result = $node_A->safe_psql('postgres',
 	"SELECT subretentionactive FROM pg_subscription WHERE subname='$subname_AB';");
 is($result, qq(f), 'retention is inactive');
 
+$result = $node_A->safe_psql('postgres',
+	"SELECT dead_tuple_retention_active FROM pg_stat_subscription WHERE subname='$subname_AB';");
+is($result, qq(f), 'worker on node A does not retain dead tuples');
+
 # Drop the physical slot and reset the synchronized_standby_slots setting
 $node_B->safe_psql('postgres',
 	"SELECT * FROM pg_drop_replication_slot('blocker');");
@@ -655,6 +663,10 @@ $result = $node_A->safe_psql('postgres',
 	"SELECT subretentionactive FROM pg_subscription WHERE subname='$subname_AB';");
 is($result, qq(t), 'retention is active');
 
+$result = $node_A->safe_psql('postgres',
+	"SELECT dead_tuple_retention_active FROM pg_stat_subscription WHERE subname='$subname_AB';");
+is($result, qq(t), 'worker on node A retains dead tuples');
+
 ###############################################################################
 # Check that the replication slot pg_conflict_detection is dropped after
 # removing all the subscriptions.
-- 
2.51.0.windows.1

