From bcf31aa24d8d2694a912c56280707a574e31d74a Mon Sep 17 00:00:00 2001
From: Takamichi Osumi <osumi.takamichi@fujitsu.com>
Date: Tue, 1 Mar 2022 01:22:05 +0000
Subject: [PATCH v25] Extend pg_stat_subscription_stats to include general
 transaction statistics

Introduce cumulative transaction counters for logical replication
subscribers in the pg_stat_subscription_stats view.

Transaction statistics are sent to the stats collector at most once per
PGSTAT_STAT_INTERVAL to avoid overloading it.

Author: Takamichi Osumi
Reviewed-by: Amit Kapila, Masahiko Sawada, Hou Zhijie, Greg Nancarrow,
             Vignesh C, Ajin Cherian, Kyotaro Horiguchi, Tang Haiying
Tested-by: Wang wei
Discussion: https://www.postgresql.org/message-id/OSBPR01MB48887CA8F40C8D984A6DC00CED199%40OSBPR01MB4888.jpnprd01.prod.outlook.com
---
 doc/src/sgml/monitoring.sgml               | 23 +++++++++
 src/backend/catalog/system_views.sql       |  2 +
 src/backend/postmaster/pgstat.c            | 83 ++++++++++++++++++++++++++++++
 src/backend/replication/logical/launcher.c |  2 +
 src/backend/replication/logical/worker.c   | 33 ++++++++++++
 src/backend/utils/adt/pgstatfuncs.c        | 18 +++++--
 src/include/catalog/pg_proc.dat            |  6 +--
 src/include/pgstat.h                       | 26 ++++++++++
 src/include/replication/worker_internal.h  |  9 ++++
 src/test/regress/expected/rules.out        |  4 +-
 src/tools/pgindent/typedefs.list           |  2 +
 11 files changed, 200 insertions(+), 8 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 9fb62fe..a50a13f 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -635,6 +635,29 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       </entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>apply_commit_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions successfully applied in this subscription.
+       Both <command>COMMIT</command> and <command>COMMIT PREPARED</command>
+       increment this counter.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>apply_rollback_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions rolled back in this subscription. Both
+       <command>ROLLBACK</command> of a transaction streamed as an in-progress
+       transaction and <command>ROLLBACK PREPARED</command> increment this
+       counter.
+      </para></entry>
+     </row>
+
     </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 40b7bca..eae957f 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1270,6 +1270,8 @@ CREATE VIEW pg_stat_subscription_stats AS
         s.subname,
         ss.apply_error_count,
         ss.sync_error_count,
+        ss.apply_commit_count,
+        ss.apply_rollback_count,
         ss.stats_reset
     FROM pg_subscription as s,
          pg_stat_get_subscription_stats(s.oid) as ss;
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 53ddd93..67a64ad 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -55,6 +55,7 @@
 #include "postmaster/postmaster.h"
 #include "replication/slot.h"
 #include "replication/walsender.h"
+#include "replication/worker_internal.h"
 #include "storage/backendid.h"
 #include "storage/dsm.h"
 #include "storage/fd.h"
@@ -286,6 +287,8 @@ static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
 static HTAB *replSlotStatHash = NULL;
 static HTAB *subscriptionStatHash = NULL;
 
+extern LogicalRepSubscriptionStats subStats;
+
 /*
  * List of OIDs of databases we need to write out.  If an entry is InvalidOid,
  * it means to write only the shared-catalog stats ("DB 0"); otherwise, we
@@ -382,6 +385,7 @@ static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
 static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
 static void pgstat_recv_subscription_drop(PgStat_MsgSubscriptionDrop *msg, int len);
 static void pgstat_recv_subscription_error(PgStat_MsgSubscriptionError *msg, int len);
+static void pgstat_recv_subscription_xact(PgStat_MsgSubscriptionXact *msg, int len);
 
 /* ------------------------------------------------------------
  * Public functions called from postmaster follow
@@ -3421,6 +3425,60 @@ pgstat_send_slru(void)
 }
 
 /* ----------
+ * pgstat_report_subscription_xact() -
+ *
+ *	Send the accumulated subscription transaction stats to the collector.
+ *	The local counters are cleared upon sending.
+ *
+ *	'force' (true only at worker exit) skips the PGSTAT_STAT_INTERVAL check.
+ * ----------
+ */
+void
+pgstat_report_subscription_xact(bool force)
+{
+	static TimestampTz last_report = 0;
+	PgStat_MsgSubscriptionXact msg;
+
+	/*
+	 * Nothing to report if the subscription id has not been set yet, or if
+	 * no commit or rollback has been counted since the counters were cleared.
+	 */
+	if (subStats.subid == InvalidOid ||
+		(subStats.apply_commit_count == 0 && subStats.apply_rollback_count == 0))
+		return;
+
+	if (!force)
+	{
+		TimestampTz now = GetCurrentTimestamp();
+
+		/*
+		 * Rate limit: skip sending unless at least PGSTAT_STAT_INTERVAL msec
+		 * have passed since the last non-forced send, so that the stats
+		 * collector is not overloaded.
+		 */
+		if (!TimestampDifferenceExceeds(last_report, now, PGSTAT_STAT_INTERVAL))
+			return;
+		last_report = now;
+	}
+
+	/*
+	 * Build the message from the local counters and send it.
+	 */
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONXACT);
+	msg.m_databaseid = MyDatabaseId;
+	msg.m_subid = subStats.subid;
+	msg.apply_commit_count = subStats.apply_commit_count;
+	msg.apply_rollback_count = subStats.apply_rollback_count;
+	pgstat_send(&msg, sizeof(PgStat_MsgSubscriptionXact));
+
+	/*
+	 * Reset the local counters now that their values have been sent.
+	 */
+	subStats.apply_commit_count = 0;
+	subStats.apply_rollback_count = 0;
+}
+
+/* ----------
  * PgstatCollectorMain() -
  *
  *	Start up the statistics collector process.  This is the body of the
@@ -3687,6 +3745,10 @@ PgstatCollectorMain(int argc, char *argv[])
 					pgstat_recv_subscription_error(&msg.msg_subscriptionerror, len);
 					break;
 
+				case PGSTAT_MTYPE_SUBSCRIPTIONXACT:
+					pgstat_recv_subscription_xact(&msg.msg_subscriptionxact, len);
+					break;
+
 				default:
 					break;
 			}
@@ -6092,6 +6154,25 @@ pgstat_recv_subscription_error(PgStat_MsgSubscriptionError *msg, int len)
 }
 
 /* ----------
+ * pgstat_recv_subscription_xact() -
+ *
+ *	Process a SUBSCRIPTIONXACT message by adding its counts to the entry.
+ * ----------
+ */
+static void
+pgstat_recv_subscription_xact(PgStat_MsgSubscriptionXact *msg, int len)
+{
+	PgStat_StatSubEntry *subentry;
+
+	/* Look up the stats entry of the subscription named in the message */
+	subentry = pgstat_get_subscription_entry(msg->m_subid, true);
+	Assert(subentry);
+
+	subentry->apply_commit_count += msg->apply_commit_count;
+	subentry->apply_rollback_count += msg->apply_rollback_count;
+}
+
+/* ----------
  * pgstat_write_statsfile_needed() -
  *
  *	Do we need to write out any stats files?
@@ -6268,6 +6349,8 @@ pgstat_reset_subscription(PgStat_StatSubEntry *subentry, TimestampTz ts)
 {
 	subentry->apply_error_count = 0;
 	subentry->sync_error_count = 0;
+	subentry->apply_commit_count = 0;
+	subentry->apply_rollback_count = 0;
 	subentry->stat_reset_timestamp = ts;
 }
 
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 5a68d6d..4dfcac8 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -647,6 +647,8 @@ logicalrep_worker_onexit(int code, Datum arg)
 	if (LogRepWorkerWalRcvConn)
 		walrcv_disconnect(LogRepWorkerWalRcvConn);
 
+	pgstat_report_subscription_xact(true);
+
 	logicalrep_worker_detach();
 
 	/* Cleanup fileset used for streaming transactions. */
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 7e267f7..ad1e5aa 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -238,6 +238,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
 	.ts = 0,
 };
 
+LogicalRepSubscriptionStats subStats = {InvalidOid, 0, 0};
+
 static MemoryContext ApplyMessageContext = NULL;
 MemoryContext ApplyContext = NULL;
 
@@ -329,6 +331,8 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
 /* Compute GID for two_phase transactions */
 static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
 
+static void subscription_stats_update(bool is_commit);
+
 /* Common streaming function to apply all the spooled messages */
 static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
 
@@ -959,6 +963,8 @@ apply_handle_commit_prepared(StringInfo s)
 	CommitTransactionCommand();
 	pgstat_report_stat(false);
 
+	subscription_stats_update(true);
+
 	store_flush_position(prepare_data.end_lsn);
 	in_remote_transaction = false;
 
@@ -1006,6 +1012,8 @@ apply_handle_rollback_prepared(StringInfo s)
 		FinishPreparedTransaction(gid, false);
 		end_replication_step();
 		CommitTransactionCommand();
+
+		subscription_stats_update(false);
 	}
 
 	pgstat_report_stat(false);
@@ -1217,6 +1225,8 @@ apply_handle_stream_abort(StringInfo s)
 	{
 		set_apply_error_context_xact(xid, 0);
 		stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+
+		subscription_stats_update(false);
 	}
 	else
 	{
@@ -1463,6 +1473,8 @@ apply_handle_commit_internal(LogicalRepCommitData *commit_data)
 		CommitTransactionCommand();
 		pgstat_report_stat(false);
 
+		subscription_stats_update(true);
+
 		store_flush_position(commit_data->end_lsn);
 	}
 	else
@@ -2717,6 +2729,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		if (endofstream)
 			break;
 
+		pgstat_report_subscription_xact(false);
+
 		/*
 		 * Wait for more data or latch.  If we have unflushed transactions,
 		 * wake up after WalWriterDelay to see if they've been flushed yet (in
@@ -3372,6 +3386,22 @@ TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
 	snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
 }
 
+/*
+ * Count one commit (is_commit) or one rollback, then try to report the stats.
+ */
+static void
+subscription_stats_update(bool is_commit)
+{
+	Assert(OidIsValid(subStats.subid));
+
+	if (is_commit)
+		subStats.apply_commit_count++;
+	else
+		subStats.apply_rollback_count++;
+
+	pgstat_report_subscription_xact(false);
+}
+
 /* Logical Replication Apply worker entry point */
 void
 ApplyWorkerMain(Datum main_arg)
@@ -3469,6 +3499,9 @@ ApplyWorkerMain(Datum main_arg)
 
 	CommitTransactionCommand();
 
+	/* Set the subid for subscription statistics */
+	subStats.subid = MyLogicalRepWorker->subid;
+
 	/* Connect to the origin and start the replication. */
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index fd993d0..a8dcc27 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2405,7 +2405,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS	4
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS	6
 	Oid			subid = PG_GETARG_OID(0);
 	TupleDesc	tupdesc;
 	Datum		values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS];
@@ -2424,7 +2424,11 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 					   INT8OID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "stats_reset",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "apply_commit_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "apply_rollback_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "stats_reset",
 					   TIMESTAMPTZOID, -1, 0);
 	BlessTupleDesc(tupdesc);
 
@@ -2448,11 +2452,17 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 	/* sync_error_count */
 	values[2] = Int64GetDatum(subentry->sync_error_count);
 
+	/* apply_commit_count */
+	values[3] = Int64GetDatum(subentry->apply_commit_count);
+
+	/* apply_rollback_count */
+	values[4] = Int64GetDatum(subentry->apply_rollback_count);
+
 	/* stats_reset */
 	if (subentry->stat_reset_timestamp == 0)
-		nulls[3] = true;
+		nulls[5] = true;
 	else
-		values[3] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
+		values[5] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
 
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index bf88858..5a18b9a 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5380,9 +5380,9 @@
   proname => 'pg_stat_get_subscription_stats', proisstrict => 'f',
   provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,int8,int8,timestamptz}',
-  proargmodes => '{i,o,o,o,o}',
-  proargnames => '{subid,subid,apply_error_count,sync_error_count,stats_reset}',
+  proallargtypes => '{oid,oid,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,apply_error_count,sync_error_count,apply_commit_count,apply_rollback_count,stats_reset}',
   prosrc => 'pg_stat_get_subscription_stats' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index be2f7e2..f191047 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -87,6 +87,7 @@ typedef enum StatMsgType
 	PGSTAT_MTYPE_DISCONNECT,
 	PGSTAT_MTYPE_SUBSCRIPTIONDROP,
 	PGSTAT_MTYPE_SUBSCRIPTIONERROR,
+	PGSTAT_MTYPE_SUBSCRIPTIONXACT
 } StatMsgType;
 
 /* ----------
@@ -577,6 +578,25 @@ typedef struct PgStat_MsgSubscriptionError
 	bool		m_is_apply_error;
 } PgStat_MsgSubscriptionError;
 
+
+/* ----------
+ * PgStat_MsgSubscriptionXact	Sent by the apply worker to report counts of
+ *								committed and rolled-back transactions.
+ * ----------
+ */
+typedef struct PgStat_MsgSubscriptionXact
+{
+	PgStat_MsgHdr m_hdr;
+
+	/* identify the subscription stats entry */
+	Oid			m_databaseid;
+	Oid			m_subid;
+
+	PgStat_Counter apply_commit_count;
+	PgStat_Counter apply_rollback_count;
+} PgStat_MsgSubscriptionXact;
+
+
 /* ----------
  * PgStat_MsgRecoveryConflict	Sent by the backend upon recovery conflict
  * ----------
@@ -757,6 +777,7 @@ typedef union PgStat_Msg
 	PgStat_MsgDisconnect msg_disconnect;
 	PgStat_MsgSubscriptionError msg_subscriptionerror;
 	PgStat_MsgSubscriptionDrop msg_subscriptiondrop;
+	PgStat_MsgSubscriptionXact msg_subscriptionxact;
 } PgStat_Msg;
 
 
@@ -981,6 +1002,9 @@ typedef struct PgStat_StatSubEntry
 
 	PgStat_Counter apply_error_count;
 	PgStat_Counter sync_error_count;
+	PgStat_Counter apply_commit_count;
+	PgStat_Counter apply_rollback_count;
+
 	TimestampTz stat_reset_timestamp;
 } PgStat_StatSubEntry;
 
@@ -1177,6 +1201,8 @@ extern void pgstat_send_archiver(const char *xlog, bool failed);
 extern void pgstat_send_bgwriter(void);
 extern void pgstat_send_checkpointer(void);
 extern void pgstat_send_wal(bool force);
+extern void pgstat_report_subscription_xact(bool force);
+
 
 /* ----------
  * Support functions for the SQL-callable functions to
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 3c3f5f6..c5d01fa 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -68,6 +68,15 @@ typedef struct LogicalRepWorker
 	TimestampTz reply_time;
 } LogicalRepWorker;
 
+/* Transaction counters a worker accumulates before sending them as stats. */
+typedef struct LogicalRepSubscriptionStats
+{
+	Oid			subid;			/* subscription the counters belong to */
+
+	int64		apply_commit_count;		/* commits counted, not yet sent */
+	int64		apply_rollback_count;	/* rollbacks counted, not yet sent */
+} LogicalRepSubscriptionStats;
+
 /* Main memory context for apply worker. Permanent during worker lifetime. */
 extern MemoryContext ApplyContext;
 
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index ac46856..a7ef303 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2076,9 +2076,11 @@ pg_stat_subscription_stats| SELECT ss.subid,
     s.subname,
     ss.apply_error_count,
     ss.sync_error_count,
+    ss.apply_commit_count,
+    ss.apply_rollback_count,
     ss.stats_reset
    FROM pg_subscription s,
-    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, stats_reset);
+    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, apply_commit_count, apply_rollback_count, stats_reset);
 pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid,
     pg_stat_all_indexes.indexrelid,
     pg_stat_all_indexes.schemaname,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index d9b83f7..c5f7aec 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1401,6 +1401,7 @@ LogicalRepRelId
 LogicalRepRelMapEntry
 LogicalRepRelation
 LogicalRepRollbackPreparedTxnData
+LogicalRepSubscriptionStats
 LogicalRepTupleData
 LogicalRepTyp
 LogicalRepWorker
@@ -1947,6 +1948,7 @@ PgStat_MsgResetsubcounter
 PgStat_MsgSLRU
 PgStat_MsgSubscriptionDrop
 PgStat_MsgSubscriptionError
+PgStat_MsgSubscriptionXact
 PgStat_MsgTabpurge
 PgStat_MsgTabstat
 PgStat_MsgTempFile
-- 
1.8.3.1

