From 0c663dfac6ac5b41420a0f61ccfd40c14f5c2e59 Mon Sep 17 00:00:00 2001
From: Takamichi Osumi <osumi.takamichi@fujitsu.com>
Date: Tue, 4 Jan 2022 09:50:31 +0000
Subject: [PATCH v20 2/2] Extend pg_stat_subscription_workers to include
 general transaction statistics

Categorize transactions of logical replication subscriber
into three types (commit, abort, error) and introduce
cumulative columns of those numbers in the pg_stat_subscription_workers.

In order to avoid having a large number of entries to be created
by the table synchronization for many tables, the new stats columns
are utilized only by the apply worker.

The timing when the data of transaction statistics is sent to the
stats collector is adjusted to avoid overload.

Author: Takamichi Osumi
Reviewed-by: Amit Kapila, Masahiko Sawada, Hou Zhijie, Greg Nancarrow,
             Vignesh C, Ajin Cherian, Kyotaro Horiguchi, Tang Haiying
Discussion: https://www.postgresql.org/message-id/OSBPR01MB48887CA8F40C8D984A6DC00CED199%40OSBPR01MB4888.jpnprd01.prod.outlook.com
---
 doc/src/sgml/monitoring.sgml                |  42 +++++++++-
 src/backend/catalog/system_views.sql        |   3 +
 src/backend/postmaster/pgstat.c             | 126 ++++++++++++++++++++++++++++
 src/backend/replication/logical/launcher.c  |   4 +
 src/backend/replication/logical/worker.c    |  18 ++++
 src/backend/utils/adt/pgstatfuncs.c         |  25 ++++--
 src/include/catalog/pg_proc.dat             |   6 +-
 src/include/pgstat.h                        |  28 +++++++
 src/include/replication/worker_internal.h   |   6 ++
 src/test/regress/expected/rules.out         |   5 +-
 src/test/subscription/t/026_worker_stats.pl | 109 +++++++++++++++++++++++-
 src/tools/pgindent/typedefs.list            |   1 +
 12 files changed, 354 insertions(+), 19 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 62f2a33..c1d56be 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -629,8 +629,8 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
 
      <row>
       <entry><structname>pg_stat_subscription_workers</structname><indexterm><primary>pg_stat_subscription_workers</primary></indexterm></entry>
-      <entry>One row per subscription worker, showing statistics about errors
-      that occurred on that subscription worker.
+      <entry>One row per subscription worker, showing statistics about transactions
+      and errors that occurred on that subscription worker.
       See <link linkend="monitoring-pg-stat-subscription-workers">
       <structname>pg_stat_subscription_workers</structname></link> for details.
       </entry>
@@ -3074,8 +3074,10 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
    The <structname>pg_stat_subscription_workers</structname> view will contain
    one row per subscription worker on which errors have occurred, for workers
    applying logical replication changes and workers handling the initial data
-   copy of the subscribed tables.  The statistics entry is removed when the
-   corresponding subscription is dropped.
+   copy of the subscribed tables. The row corresponding to the apply
+   worker shows transaction statistics of the main apply worker on the
+   subscription. The statistics entry is removed when the corresponding
+   subscription is dropped.
   </para>
 
   <table id="pg-stat-subscription-workers" xreflabel="pg_stat_subscription_workers">
@@ -3123,6 +3125,38 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>commit_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions successfully applied in this subscription.
+       Both <command>COMMIT</command> and <command>COMMIT PREPARED</command>
+       increment this counter.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>abort_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions aborted in this subscription.
+       <command>ROLLBACK PREPARED</command> increments this counter.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>error_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions that failed to be applied by the apply
+       worker in this subscription. This counter is updated after
+       confirming the error is not the same as the previous one.
+       </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>last_error_relid</structfield> <type>oid</type>
       </para>
       <para>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 61b515c..dda3cd7 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1267,6 +1267,9 @@ CREATE VIEW pg_stat_subscription_workers AS
         w.subid,
         s.subname,
         w.subrelid,
+        w.commit_count,
+        w.abort_count,
+        w.error_count,
         w.last_error_relid,
         w.last_error_command,
         w.last_error_xid,
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 7264d2c..29462da 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -55,6 +55,7 @@
 #include "postmaster/postmaster.h"
 #include "replication/slot.h"
 #include "replication/walsender.h"
+#include "replication/worker_internal.h"
 #include "storage/backendid.h"
 #include "storage/dsm.h"
 #include "storage/fd.h"
@@ -382,6 +383,7 @@ static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
 static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
 static void pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len);
 static void pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len);
+static void pgstat_recv_subworker_xact_end(PgStat_MsgSubWorkerXactEnd *msg, int len);
 
 /* ------------------------------------------------------------
  * Public functions called from postmaster follow
@@ -1977,6 +1979,42 @@ pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
 }
 
 /* ----------
+ * pgstat_report_subworker_xact_end() -
+ *
+ *  Tell the collector that worker transaction has successfully completed.
+ *  This should be called before the call of process_syncing_tables() so to not
+ *  miss an increment of transaction stats in case it leads to a process exit.
+ *  See process_syncing_tables_for_apply().
+ * ----------
+ */
+void
+pgstat_report_subworker_xact_end(LogicalRepMsgType command)
+{
+	Assert(command == LOGICAL_REP_MSG_COMMIT ||
+		   command == LOGICAL_REP_MSG_STREAM_COMMIT ||
+		   command == LOGICAL_REP_MSG_COMMIT_PREPARED ||
+		   command == LOGICAL_REP_MSG_ROLLBACK_PREPARED);
+
+	switch (command)
+	{
+		case LOGICAL_REP_MSG_COMMIT:
+		case LOGICAL_REP_MSG_STREAM_COMMIT:
+		case LOGICAL_REP_MSG_COMMIT_PREPARED:
+			MyLogicalRepWorker->commit_count++;
+			break;
+		case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
+			MyLogicalRepWorker->abort_count++;
+			break;
+		default:
+			ereport(ERROR,
+					errmsg("unexpected logical message type for subworker's transaction statistics"));
+			break;
+	}
+
+	pgstat_send_subworker_xact_stats(false);
+}
+
+/* ----------
  * pgstat_report_subscription_drop() -
  *
  *	Tell the collector about dropping the subscription.
@@ -3484,6 +3522,59 @@ pgstat_send_subscription_purge(PgStat_MsgSubscriptionPurge *msg)
 }
 
 /* ----------
+ * pgstat_send_subworker_xact_stats() -
+ *
+ *	Send a subworker's transaction stats to the collector.
+ *	The statistics are cleared upon return.
+ *
+ *	'force' is true only when the subscription worker process exits.
+ * ----------
+ */
+void
+pgstat_send_subworker_xact_stats(bool force)
+{
+	static TimestampTz last_report = 0;
+	PgStat_MsgSubWorkerXactEnd msg;
+
+	/*
+	 * This function can be called even if nothing at all has happened. In
+	 * this case, there's no need to go forward.
+	 */
+	if (MyLogicalRepWorker->commit_count == 0 && MyLogicalRepWorker->abort_count == 0)
+		return;
+
+	if (!force)
+	{
+		TimestampTz now = GetCurrentTimestamp();
+
+		/*
+		 * Don't send a message unless it's been at least PGSTAT_STAT_INTERVAL
+		 * msec since we last sent one to avoid overloading the stats
+		 * collector.
+		 */
+		if (!TimestampDifferenceExceeds(last_report, now, PGSTAT_STAT_INTERVAL))
+			return;
+		last_report = now;
+	}
+
+	/*
+	 * Prepare and send the message.
+	 */
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBWORKERXACTEND);
+	msg.m_databaseid = MyDatabaseId;
+	msg.m_subid = MyLogicalRepWorker->subid;
+	msg.commit_count = MyLogicalRepWorker->commit_count;
+	msg.abort_count = MyLogicalRepWorker->abort_count;
+	pgstat_send(&msg, sizeof(PgStat_MsgSubWorkerXactEnd));
+
+	/*
+	 * Clear out the statistics.
+	 */
+	MyLogicalRepWorker->commit_count = 0;
+	MyLogicalRepWorker->abort_count = 0;
+}
+
+/* ----------
  * PgstatCollectorMain() -
  *
  *	Start up the statistics collector process.  This is the body of the
@@ -3746,6 +3837,10 @@ PgstatCollectorMain(int argc, char *argv[])
 					pgstat_recv_subworker_error(&msg.msg_subworkererror, len);
 					break;
 
+				case PGSTAT_MTYPE_SUBWORKERXACTEND:
+					pgstat_recv_subworker_xact_end(&msg.msg_subworkerxactend, len);
+					break;
+
 				default:
 					break;
 			}
@@ -3965,6 +4060,9 @@ pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry, Oid subid, Oid subrelid,
 	/* If not found, initialize the new one */
 	if (!found)
 	{
+		subwentry->commit_count = 0;
+		subwentry->abort_count = 0;
+		subwentry->error_count = 0;
 		subwentry->last_error_relid = InvalidOid;
 		subwentry->last_error_command = 0;
 		subwentry->last_error_xid = InvalidTransactionId;
@@ -6193,6 +6291,34 @@ pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len)
 	subwentry->last_error_time = msg->m_timestamp;
 	strlcpy(subwentry->last_error_message, msg->m_message,
 			PGSTAT_SUBWORKERERROR_MSGLEN);
+
+	/*
+	 * Only if this is a new error reported by the apply worker, increment the
+	 * counter of error.
+	 */
+	if (!OidIsValid(msg->m_subrelid))
+		subwentry->error_count++;
+}
+
+/* ----------
+ * pgstat_recv_subworker_xact_end() -
+ *
+ *	Process a SUBWORKERXACTEND message.
+ * ----------
+ */
+static void
+pgstat_recv_subworker_xact_end(PgStat_MsgSubWorkerXactEnd *msg, int len)
+{
+	PgStat_StatDBEntry *dbentry;
+	PgStat_StatSubWorkerEntry *wentry;
+
+	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
+	wentry = pgstat_get_subworker_entry(dbentry, msg->m_subid,
+										InvalidOid, true);
+	Assert(wentry);
+
+	wentry->commit_count += msg->commit_count;
+	wentry->abort_count += msg->abort_count;
 }
 
 /* ----------
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 3fb4caa..8fb9b2a 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -385,6 +385,8 @@ retry:
 	TIMESTAMP_NOBEGIN(worker->last_recv_time);
 	worker->reply_lsn = InvalidXLogRecPtr;
 	TIMESTAMP_NOBEGIN(worker->reply_time);
+	worker->commit_count = 0;
+	worker->abort_count = 0;
 
 	/* Before releasing lock, remember generation for future identification. */
 	generation = worker->generation;
@@ -647,6 +649,8 @@ logicalrep_worker_onexit(int code, Datum arg)
 	if (LogRepWorkerWalRcvConn)
 		walrcv_disconnect(LogRepWorkerWalRcvConn);
 
+	pgstat_send_subworker_xact_stats(true);
+
 	logicalrep_worker_detach();
 
 	/* Cleanup fileset used for streaming transactions. */
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 2e79302..3815627c 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -815,6 +815,9 @@ apply_handle_commit(StringInfo s)
 
 	apply_handle_commit_internal(&commit_data);
 
+	/* Update stats */
+	pgstat_report_subworker_xact_end(LOGICAL_REP_MSG_COMMIT);
+
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(commit_data.end_lsn);
 
@@ -960,6 +963,9 @@ apply_handle_commit_prepared(StringInfo s)
 	store_flush_position(prepare_data.end_lsn);
 	in_remote_transaction = false;
 
+	/* Update stats */
+	pgstat_report_subworker_xact_end(LOGICAL_REP_MSG_COMMIT_PREPARED);
+
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(prepare_data.end_lsn);
 
@@ -1011,6 +1017,9 @@ apply_handle_rollback_prepared(StringInfo s)
 	store_flush_position(rollback_data.rollback_end_lsn);
 	in_remote_transaction = false;
 
+	/* Update stats */
+	pgstat_report_subworker_xact_end(LOGICAL_REP_MSG_ROLLBACK_PREPARED);
+
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(rollback_data.rollback_end_lsn);
 
@@ -1435,6 +1444,9 @@ apply_handle_stream_commit(StringInfo s)
 	/* unlink the files with serialized changes and subxact info */
 	stream_cleanup_files(MyLogicalRepWorker->subid, xid);
 
+	/* Update stats */
+	pgstat_report_subworker_xact_end(LOGICAL_REP_MSG_STREAM_COMMIT);
+
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(commit_data.end_lsn);
 
@@ -2561,6 +2573,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 		MemoryContextSwitchTo(ApplyMessageContext);
 
+		/*
+		 * Process if we have pending transaction stats and reached
+		 * PGSTAT_STAT_INTERVAL
+		 */
+		pgstat_send_subworker_xact_stats(false);
+
 		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 
 		if (len != 0)
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index f529c15..87b5254 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2415,7 +2415,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_WORKER_COLS	8
+#define PG_STAT_GET_SUBSCRIPTION_WORKER_COLS	11
 	Oid			subid = PG_GETARG_OID(0);
 	Oid			subrelid;
 	TupleDesc	tupdesc;
@@ -2442,17 +2442,23 @@ pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
 					   OIDOID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "subrelid",
 					   OIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "last_error_relid",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "commit_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "abort_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "error_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "last_error_relid",
 					   OIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "last_error_command",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "last_error_command",
 					   TEXTOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "last_error_xid",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "last_error_xid",
 					   XIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "last_error_count",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "last_error_count",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "last_error_message",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "last_error_message",
 					   TEXTOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "last_error_time",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 11, "last_error_time",
 					   TIMESTAMPTZOID, -1, 0);
 	BlessTupleDesc(tupdesc);
 
@@ -2470,6 +2476,11 @@ pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
 	else
 		nulls[i++] = true;
 
+	/* transaction stats */
+	values[i++] = Int64GetDatum(wentry->commit_count);
+	values[i++] = Int64GetDatum(wentry->abort_count);
+	values[i++] = Int64GetDatum(wentry->error_count);
+
 	/* last_error_relid */
 	if (OidIsValid(wentry->last_error_relid))
 		values[i++] = ObjectIdGetDatum(wentry->last_error_relid);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 4d992dc..b76f84a 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5375,9 +5375,9 @@
   proname => 'pg_stat_get_subscription_worker', prorows => '1', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'oid oid',
-  proallargtypes => '{oid,oid,oid,oid,oid,text,xid,int8,text,timestamptz}',
-  proargmodes => '{i,i,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subrelid,subid,subrelid,last_error_relid,last_error_command,last_error_xid,last_error_count,last_error_message,last_error_time}',
+  proallargtypes => '{oid,oid,oid,oid,int8,int8,int8,oid,text,xid,int8,text,timestamptz}',
+  proargmodes => '{i,i,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subrelid,subid,subrelid,commit_count,abort_count,error_count,last_error_relid,last_error_command,last_error_xid,last_error_count,last_error_message,last_error_time}',
   prosrc => 'pg_stat_get_subscription_worker' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 5b51b58..90b01d2 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -86,6 +86,7 @@ typedef enum StatMsgType
 	PGSTAT_MTYPE_DISCONNECT,
 	PGSTAT_MTYPE_SUBSCRIPTIONPURGE,
 	PGSTAT_MTYPE_SUBWORKERERROR,
+	PGSTAT_MTYPE_SUBWORKERXACTEND
 } StatMsgType;
 
 /* ----------
@@ -591,6 +592,23 @@ typedef struct PgStat_MsgSubWorkerError
 } PgStat_MsgSubWorkerError;
 
 /* ----------
+ * PgStat_MsgSubscriptionXactEnd	Sent by the apply worker to report transaction
+ *									ends.
+ * ----------
+ */
+typedef struct PgStat_MsgSubWorkerXactEnd
+{
+	PgStat_MsgHdr m_hdr;
+
+	/* determine the worker entry */
+	Oid			m_databaseid;
+	Oid			m_subid;
+
+	PgStat_Counter commit_count;
+	PgStat_Counter abort_count;
+} PgStat_MsgSubWorkerXactEnd;
+
+/* ----------
  * PgStat_MsgRecoveryConflict	Sent by the backend upon recovery conflict
  * ----------
  */
@@ -769,6 +787,7 @@ typedef union PgStat_Msg
 	PgStat_MsgDisconnect msg_disconnect;
 	PgStat_MsgSubscriptionPurge msg_subscriptionpurge;
 	PgStat_MsgSubWorkerError msg_subworkererror;
+	PgStat_MsgSubWorkerXactEnd msg_subworkerxactend;
 } PgStat_Msg;
 
 
@@ -1010,6 +1029,13 @@ typedef struct PgStat_StatSubWorkerEntry
 	PgStat_StatSubWorkerKey key;	/* hash key (must be first) */
 
 	/*
+	 * Cumulative transaction statistics of subscription worker
+	 */
+	PgStat_Counter commit_count;
+	PgStat_Counter abort_count;
+	PgStat_Counter error_count;
+
+	/*
 	 * Subscription worker error statistics representing an error that
 	 * occurred during application of changes or the initial table
 	 * synchronization.
@@ -1134,6 +1160,7 @@ extern void pgstat_report_replslot_drop(const char *slotname);
 extern void pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
 										  LogicalRepMsgType command,
 										  TransactionId xid, const char *errmsg);
+extern void pgstat_report_subworker_xact_end(LogicalRepMsgType command);
 extern void pgstat_report_subscription_drop(Oid subid);
 
 extern void pgstat_initialize(void);
@@ -1217,6 +1244,7 @@ extern void pgstat_send_archiver(const char *xlog, bool failed);
 extern void pgstat_send_bgwriter(void);
 extern void pgstat_send_checkpointer(void);
 extern void pgstat_send_wal(bool force);
+extern void pgstat_send_subworker_xact_stats(bool force);
 
 /* ----------
  * Support functions for the SQL-callable functions to
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 9d29849..d6e4570 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -66,6 +66,12 @@ typedef struct LogicalRepWorker
 	TimestampTz last_recv_time;
 	XLogRecPtr	reply_lsn;
 	TimestampTz reply_time;
+
+	/*
+	 * Transaction statistics of subscription worker
+	 */
+	int64		commit_count;
+	int64		abort_count;
 } LogicalRepWorker;
 
 /* Main memory context for apply worker. Permanent during worker lifetime. */
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index b58b062..2b0bfae 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2097,6 +2097,9 @@ pg_stat_subscription| SELECT su.oid AS subid,
 pg_stat_subscription_workers| SELECT w.subid,
     s.subname,
     w.subrelid,
+    w.commit_count,
+    w.abort_count,
+    w.error_count,
     w.last_error_relid,
     w.last_error_command,
     w.last_error_xid,
@@ -2110,7 +2113,7 @@ pg_stat_subscription_workers| SELECT w.subid,
          SELECT pg_subscription_rel.srsubid AS subid,
             pg_subscription_rel.srrelid AS relid
            FROM pg_subscription_rel) sr,
-    (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w(subid, subrelid, last_error_relid, last_error_command, last_error_xid, last_error_count, last_error_message, last_error_time)
+    (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w(subid, subrelid, commit_count, abort_count, error_count, last_error_relid, last_error_command, last_error_xid, last_error_count, last_error_message, last_error_time)
      JOIN pg_subscription s ON ((w.subid = s.oid)));
 pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid,
     pg_stat_all_indexes.indexrelid,
diff --git a/src/test/subscription/t/026_worker_stats.pl b/src/test/subscription/t/026_worker_stats.pl
index 8005c54..99253f4 100644
--- a/src/test/subscription/t/026_worker_stats.pl
+++ b/src/test/subscription/t/026_worker_stats.pl
@@ -1,7 +1,7 @@
 
 # Copyright (c) 2021, PostgreSQL Global Development Group
 
-# Tests for subscription error stats.
+# Tests for subscription stats.
 use strict;
 use warnings;
 use PostgreSQL::Test::Cluster;
@@ -44,9 +44,32 @@ WHERE last_error_relid = '$relname'::regclass
 	  or die "Timed out while waiting for " . $msg;
 }
 
+# Test whether the update of general transaction stats satisfies the expected
+# condition or not.
+sub confirm_transaction_stats_update
+{
+	my ($node, $condition, $msg) = @_;
+
+	# Check only the stats of the apply worker
+	my $sql = qq[
+SELECT count(1) = 1
+FROM pg_stat_subscription_workers
+WHERE subrelid IS NULL AND $condition];
+
+	$node->poll_query_until('postgres', $sql)
+	  or die "Timed out while waiting for " . $msg;
+}
+
 # Create publisher node.
 my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
 $node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf(
+	'postgresql.conf', qq[
+logical_decoding_work_mem = 64kB
+max_prepared_transactions = 10
+max_wal_senders = 10
+wal_sender_timeout = 0
+]);
 $node_publisher->start;
 
 # Create subscriber node.
@@ -58,6 +81,7 @@ $node_subscriber->init(allows_streaming => 'logical');
 $node_subscriber->append_conf(
 	'postgresql.conf',
 	qq[
+max_prepared_transactions = 10
 wal_retrieve_retry_interval = 2s
 ]);
 $node_subscriber->start;
@@ -98,7 +122,7 @@ is($result, qq(0), 'check no subscription error');
 # Create subscription. The table sync for test_tab2 on tap_sub will enter into
 # infinite error loop due to violating the unique constraint.
 $node_subscriber->safe_psql('postgres',
-	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub;"
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (streaming = on, two_phase = on);"
 );
 
 $node_publisher->wait_for_catchup('tap_sub');
@@ -132,6 +156,11 @@ test_subscription_error(
 	qq(duplicate key value violates unique constraint),
 	'error reported by the apply worker');
 
+confirm_transaction_stats_update(
+	$node_subscriber,
+	'error_count = 1',
+	'the error_count increment by the apply worker');
+
 # Check the table sync worker's error in the view.
 test_subscription_error(
 	$node_subscriber, 'test_tab2', '', '',
@@ -150,11 +179,83 @@ $node_subscriber->poll_query_until('postgres',
 $node_subscriber->poll_query_until('postgres',
 	"SELECT count(1) > 0 FROM test_tab2");
 
-# There shouldn't be any errors in the view after dropping the subscription.
+# Check updation of subscription worker transaction count statistics.
+# COMMIT of an insertion of single record to test_tab1
+confirm_transaction_stats_update(
+	$node_subscriber,
+	'commit_count = 1',
+	'the commit_count increment by the apply worker');
+
+# Some more tests for transaction stats
+# PREPARE & COMMIT PREPARED
+$node_publisher->safe_psql(
+	'postgres', qq[
+BEGIN;
+INSERT INTO test_tab1 VALUES (2);
+PREPARE TRANSACTION 'gid1'
+]);
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'gid1'");
+confirm_transaction_stats_update(
+	$node_subscriber,
+	'commit_count = 2',
+	'the commit_count increment by commit prepared');
+
+# STREAM COMMIT
+$node_publisher->safe_psql(
+	'postgres', qq[
+BEGIN;
+INSERT INTO test_tab1 VALUES(generate_series(1001, 2000));
+COMMIT;
+]);
+confirm_transaction_stats_update(
+	$node_subscriber,
+	'commit_count = 3',
+	'the commit_count increment by stream commit');
+
+# STREAM PREPARE & COMMIT PREPARED
+$node_publisher->safe_psql(
+	'postgres', qq[
+BEGIN;
+INSERT INTO test_tab1 VALUES(generate_series(2001, 3000));
+PREPARE TRANSACTION 'gid2'
+]);
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'gid2'");
+confirm_transaction_stats_update(
+	$node_subscriber,
+	'commit_count = 4',
+	'the commit_count increment by streamed commit prepared');
+
+# ROLLBACK PREPARED
+$node_publisher->safe_psql(
+	'postgres', qq[
+BEGIN;
+INSERT INTO test_tab1 VALUES (3);
+PREPARE TRANSACTION 'gid3';
+]);
+$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'gid3'");
+confirm_transaction_stats_update(
+	$node_subscriber,
+	'abort_count = 1',
+	'the abort_count increment by rollback prepared');
+
+# STREAM PREPARE & ROLLBACK PREPARED
+$node_publisher->safe_psql(
+	'postgres', qq[
+BEGIN;
+INSERT INTO test_tab1 VALUES (generate_series(3001, 4000));
+PREPARE TRANSACTION 'gid4';
+]);
+$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'gid4'");
+confirm_transaction_stats_update(
+	$node_subscriber,
+	'abort_count = 2',
+	'the abort_count increment by streamed rollback prepared');
+
+# There shouldn't be any records in the view after dropping the subscription.
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub;");
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(1) FROM pg_stat_subscription_workers");
-is($result, q(0), 'no error after dropping subscription');
+is($result, q(0), 'no record after dropping subscription');
 
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index f093605..8bde390 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1945,6 +1945,7 @@ PgStat_MsgResetslrucounter
 PgStat_MsgSLRU
 PgStat_MsgSubscriptionPurge
 PgStat_MsgSubWorkerError
+PgStat_MsgSubWorkerXactEnd
 PgStat_MsgTabpurge
 PgStat_MsgTabstat
 PgStat_MsgTempFile
-- 
1.8.3.1

