From c1ea66e5028e74403a1ef33e9c7c7ea3bb9786fd Mon Sep 17 00:00:00 2001
From: Takamichi Osumi <osumi.takamichi@fujitsu.com>
Date: Mon, 20 Dec 2021 09:13:08 +0000
Subject: [PATCH v18 2/2] Extend pg_stat_subscription_workers to include
 general transaction statistics

Categorize transactions of logical replication subscriber
into three types (commit, abort, error) and introduce
cumulative columns of those numbers in the pg_stat_subscription_workers.

In order to avoid having a large number of entries to be created
by the table synchronization for many tables, the new stats columns
are utilized only by the apply worker.

Author: Takamichi Osumi
Reviewed-by: Amit Kapila, Masahiko Sawada, Hou Zhijie, Greg Nancarrow, Vignesh C, Ajin Cherian, Kyotaro Horiguchi
Discussion: https://www.postgresql.org/message-id/OSBPR01MB48887CA8F40C8D984A6DC00CED199%40OSBPR01MB4888.jpnprd01.prod.outlook.com
---
 doc/src/sgml/monitoring.sgml                |  42 +++++++++-
 src/backend/catalog/system_views.sql        |   3 +
 src/backend/postmaster/pgstat.c             | 124 ++++++++++++++++++++++++++++
 src/backend/replication/logical/launcher.c  |   4 +
 src/backend/replication/logical/worker.c    |  22 +++++
 src/backend/utils/adt/pgstatfuncs.c         |  25 ++++--
 src/include/catalog/pg_proc.dat             |   6 +-
 src/include/pgstat.h                        |  32 +++++++
 src/include/replication/worker_internal.h   |   6 ++
 src/test/regress/expected/rules.out         |   5 +-
 src/test/subscription/t/026_worker_stats.pl | 109 +++++++++++++++++++++++-
 src/tools/pgindent/typedefs.list            |   1 +
 12 files changed, 360 insertions(+), 19 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 62f2a33..e795a1d 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -629,8 +629,8 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
 
      <row>
       <entry><structname>pg_stat_subscription_workers</structname><indexterm><primary>pg_stat_subscription_workers</primary></indexterm></entry>
-      <entry>One row per subscription worker, showing statistics about errors
-      that occurred on that subscription worker.
+      <entry>One row per subscription worker, showing statistics about transactions
+      and errors that occurred on that subscription worker.
       See <link linkend="monitoring-pg-stat-subscription-workers">
       <structname>pg_stat_subscription_workers</structname></link> for details.
       </entry>
@@ -3074,8 +3074,10 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
    The <structname>pg_stat_subscription_workers</structname> view will contain
    one row per subscription worker on which errors have occurred, for workers
    applying logical replication changes and workers handling the initial data
-   copy of the subscribed tables.  The statistics entry is removed when the
-   corresponding subscription is dropped.
+   copy of the subscribed tables. The row corresponding to the apply
+   worker shows transaction statistics of the main apply worker on the
+   subscription. The statistics entry is removed when the corresponding
+   subscription is dropped.
   </para>
 
   <table id="pg-stat-subscription-workers" xreflabel="pg_stat_subscription_workers">
@@ -3123,6 +3125,38 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>commit_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions successfully applied in this subscription.
+       Both COMMIT and COMMIT PREPARED increment this counter.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>abort_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions aborted in this subscription.
+       ROLLBACK PREPARED increments this counter.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>error_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions that failed to be applied by the table
+       sync worker or main apply worker in this subscription. This
+       counter is updated after confirming the error is not same as
+       the previous one.
+       </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>last_error_relid</structfield> <type>oid</type>
       </para>
       <para>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 61b515c..dda3cd7 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1267,6 +1267,9 @@ CREATE VIEW pg_stat_subscription_workers AS
         w.subid,
         s.subname,
         w.subrelid,
+        w.commit_count,
+        w.abort_count,
+        w.error_count,
         w.last_error_relid,
         w.last_error_command,
         w.last_error_xid,
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 7264d2c..927c187 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -382,6 +382,7 @@ static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
 static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
 static void pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len);
 static void pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len);
+static void pgstat_recv_subworker_xact_end(PgStat_MsgSubWorkerXactEnd *msg, int len);
 
 /* ------------------------------------------------------------
  * Public functions called from postmaster follow
@@ -1949,6 +1950,42 @@ pgstat_report_replslot_drop(const char *slotname)
 }
 
 /* ----------
+ * pgstat_report_subworker_xact_end() -
+ *
+ *  Tell the collector that worker transaction has successfully completed.
+ *  This should be called before the call of process_syning_tables() not to
+ *  miss an increment of transaction stats in case it leads to a process exit.
+ *  See process_syncing_tables_for_apply().
+ * ----------
+ */
+void
+pgstat_report_subworker_xact_end(LogicalRepWorker *repWorker,
+								 LogicalRepMsgType command, bool force)
+{
+	Assert(command == LOGICAL_REP_MSG_COMMIT ||
+		   command == LOGICAL_REP_MSG_STREAM_COMMIT ||
+		   command == LOGICAL_REP_MSG_COMMIT_PREPARED ||
+		   command == LOGICAL_REP_MSG_ROLLBACK_PREPARED);
+
+	switch (command)
+	{
+		case LOGICAL_REP_MSG_COMMIT:
+		case LOGICAL_REP_MSG_STREAM_COMMIT:
+		case LOGICAL_REP_MSG_COMMIT_PREPARED:
+			repWorker->commit_count++;
+			break;
+		case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
+			repWorker->abort_count++;
+			break;
+		default:
+			elog(ERROR, "unexpected logical message type as transaction end");
+			break;
+	}
+
+	pgstat_send_subworker_xact_stats(repWorker, force);
+}
+
+/* ----------
  * pgstat_report_subworker_error() -
  *
  *	Tell the collector about the subscription worker error.
@@ -3484,6 +3521,58 @@ pgstat_send_subscription_purge(PgStat_MsgSubscriptionPurge *msg)
 }
 
 /* ----------
+ * pgstat_send_subworker_xact_stats() -
+ *
+ *	Send a subworker transaction stats to the collector.
+ *	'force' becomes true only when the subscription worker process exits.
+ * ----------
+ */
+void
+pgstat_send_subworker_xact_stats(LogicalRepWorker *repWorker, bool force)
+{
+	static TimestampTz last_report = 0;
+	PgStat_MsgSubWorkerXactEnd msg;
+
+	if (!force)
+	{
+		TimestampTz now = GetCurrentTimestamp();
+
+		/*
+		 * Don't send a message unless it's been at least PGSTAT_STAT_INTERVAL
+		 * msec since we last sent one to avoid overloading the stats
+		 * collector.
+		 */
+		if (!TimestampDifferenceExceeds(last_report, now, PGSTAT_STAT_INTERVAL))
+			return;
+		last_report = now;
+	}
+
+	/*
+	 * This function can be called even if nothing at all has happened. In
+	 * this case, avoid sending a completely empty message to the stats
+	 * collector.
+	 */
+	if (repWorker->commit_count == 0 && repWorker->abort_count == 0)
+		return;
+
+	/*
+	 * Prepare and send the message
+	 */
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBWORKERXACTEND);
+	msg.m_databaseid = MyDatabaseId;
+	msg.m_subid = repWorker->subid;
+	msg.commit_count = repWorker->commit_count;
+	msg.abort_count = repWorker->abort_count;
+	pgstat_send(&msg, sizeof(PgStat_MsgSubWorkerXactEnd));
+
+	/*
+	 * Clear out the statistics buffer, so it can be re-used.
+	 */
+	repWorker->commit_count = 0;
+	repWorker->abort_count = 0;
+}
+
+/* ----------
  * PgstatCollectorMain() -
  *
  *	Start up the statistics collector process.  This is the body of the
@@ -3746,6 +3835,10 @@ PgstatCollectorMain(int argc, char *argv[])
 					pgstat_recv_subworker_error(&msg.msg_subworkererror, len);
 					break;
 
+				case PGSTAT_MTYPE_SUBWORKERXACTEND:
+					pgstat_recv_subworker_xact_end(&msg.msg_subworkerxactend, len);
+					break;
+
 				default:
 					break;
 			}
@@ -3965,6 +4058,9 @@ pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry, Oid subid, Oid subrelid,
 	/* If not found, initialize the new one */
 	if (!found)
 	{
+		subwentry->commit_count = 0;
+		subwentry->abort_count = 0;
+		subwentry->error_count = 0;
 		subwentry->last_error_relid = InvalidOid;
 		subwentry->last_error_command = 0;
 		subwentry->last_error_xid = InvalidTransactionId;
@@ -6153,6 +6249,27 @@ pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len)
 }
 
 /* ----------
+ * pgstat_recv_subworker_xact_end() -
+ *
+ *	Process a SUBWORKERXACTEND message.
+ * ----------
+ */
+static void
+pgstat_recv_subworker_xact_end(PgStat_MsgSubWorkerXactEnd *msg, int len)
+{
+	PgStat_StatDBEntry *dbentry;
+	PgStat_StatSubWorkerEntry *wentry;
+
+	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
+	wentry = pgstat_get_subworker_entry(dbentry, msg->m_subid,
+										InvalidOid, true);
+	Assert(wentry);
+
+	wentry->commit_count += msg->commit_count;
+	wentry->abort_count += msg->abort_count;
+}
+
+/* ----------
  * pgstat_recv_subworker_error() -
  *
  *	Process a SUBWORKERERROR message.
@@ -6193,6 +6310,13 @@ pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len)
 	subwentry->last_error_time = msg->m_timestamp;
 	strlcpy(subwentry->last_error_message, msg->m_message,
 			PGSTAT_SUBWORKERERROR_MSGLEN);
+
+	/*
+	 * Only if this is a new error reported by the apply worker, increment the
+	 * counter of error.
+	 */
+	if (!OidIsValid(msg->m_subrelid))
+		subwentry->error_count++;
 }
 
 /* ----------
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 3fb4caa..bb67c66 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -385,6 +385,8 @@ retry:
 	TIMESTAMP_NOBEGIN(worker->last_recv_time);
 	worker->reply_lsn = InvalidXLogRecPtr;
 	TIMESTAMP_NOBEGIN(worker->reply_time);
+	worker->commit_count = 0;
+	worker->abort_count = 0;
 
 	/* Before releasing lock, remember generation for future identification. */
 	generation = worker->generation;
@@ -647,6 +649,8 @@ logicalrep_worker_onexit(int code, Datum arg)
 	if (LogRepWorkerWalRcvConn)
 		walrcv_disconnect(LogRepWorkerWalRcvConn);
 
+	pgstat_send_subworker_xact_stats(MyLogicalRepWorker, true);
+
 	logicalrep_worker_detach();
 
 	/* Cleanup fileset used for streaming transactions. */
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 2e79302..7bcf052 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -815,6 +815,10 @@ apply_handle_commit(StringInfo s)
 
 	apply_handle_commit_internal(&commit_data);
 
+	/* Update stats */
+	pgstat_report_subworker_xact_end(MyLogicalRepWorker,
+									 LOGICAL_REP_MSG_COMMIT, false);
+
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(commit_data.end_lsn);
 
@@ -960,6 +964,10 @@ apply_handle_commit_prepared(StringInfo s)
 	store_flush_position(prepare_data.end_lsn);
 	in_remote_transaction = false;
 
+	/* Update stats */
+	pgstat_report_subworker_xact_end(MyLogicalRepWorker,
+									 LOGICAL_REP_MSG_COMMIT_PREPARED, false);
+
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(prepare_data.end_lsn);
 
@@ -1011,6 +1019,10 @@ apply_handle_rollback_prepared(StringInfo s)
 	store_flush_position(rollback_data.rollback_end_lsn);
 	in_remote_transaction = false;
 
+	/* Update stats */
+	pgstat_report_subworker_xact_end(MyLogicalRepWorker,
+									 LOGICAL_REP_MSG_ROLLBACK_PREPARED, false);
+
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(rollback_data.rollback_end_lsn);
 
@@ -1435,6 +1447,10 @@ apply_handle_stream_commit(StringInfo s)
 	/* unlink the files with serialized changes and subxact info */
 	stream_cleanup_files(MyLogicalRepWorker->subid, xid);
 
+	/* Update stats */
+	pgstat_report_subworker_xact_end(MyLogicalRepWorker,
+									 LOGICAL_REP_MSG_STREAM_COMMIT, false);
+
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(commit_data.end_lsn);
 
@@ -2561,6 +2577,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 		MemoryContextSwitchTo(ApplyMessageContext);
 
+		/*
+		 * Process if we have pending transaction stats and reached
+		 * PGSTAT_STAT_INTERVAL
+		 */
+		pgstat_send_subworker_xact_stats(MyLogicalRepWorker, false);
+
 		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 
 		if (len != 0)
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index f529c15..87b5254 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2415,7 +2415,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_WORKER_COLS	8
+#define PG_STAT_GET_SUBSCRIPTION_WORKER_COLS	11
 	Oid			subid = PG_GETARG_OID(0);
 	Oid			subrelid;
 	TupleDesc	tupdesc;
@@ -2442,17 +2442,23 @@ pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
 					   OIDOID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "subrelid",
 					   OIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "last_error_relid",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "commit_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "abort_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "error_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "last_error_relid",
 					   OIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "last_error_command",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "last_error_command",
 					   TEXTOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "last_error_xid",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "last_error_xid",
 					   XIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "last_error_count",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "last_error_count",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "last_error_message",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "last_error_message",
 					   TEXTOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "last_error_time",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 11, "last_error_time",
 					   TIMESTAMPTZOID, -1, 0);
 	BlessTupleDesc(tupdesc);
 
@@ -2470,6 +2476,11 @@ pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
 	else
 		nulls[i++] = true;
 
+	/* transaction stats */
+	values[i++] = Int64GetDatum(wentry->commit_count);
+	values[i++] = Int64GetDatum(wentry->abort_count);
+	values[i++] = Int64GetDatum(wentry->error_count);
+
 	/* last_error_relid */
 	if (OidIsValid(wentry->last_error_relid))
 		values[i++] = ObjectIdGetDatum(wentry->last_error_relid);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 4d992dc..b76f84a 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5375,9 +5375,9 @@
   proname => 'pg_stat_get_subscription_worker', prorows => '1', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'oid oid',
-  proallargtypes => '{oid,oid,oid,oid,oid,text,xid,int8,text,timestamptz}',
-  proargmodes => '{i,i,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subrelid,subid,subrelid,last_error_relid,last_error_command,last_error_xid,last_error_count,last_error_message,last_error_time}',
+  proallargtypes => '{oid,oid,oid,oid,int8,int8,int8,oid,text,xid,int8,text,timestamptz}',
+  proargmodes => '{i,i,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subrelid,subid,subrelid,commit_count,abort_count,error_count,last_error_relid,last_error_command,last_error_xid,last_error_count,last_error_message,last_error_time}',
   prosrc => 'pg_stat_get_subscription_worker' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 5b51b58..be55b55 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -15,6 +15,7 @@
 #include "portability/instr_time.h"
 #include "postmaster/pgarch.h"	/* for MAX_XFN_CHARS */
 #include "replication/logicalproto.h"
+#include "replication/worker_internal.h"
 #include "utils/backend_progress.h" /* for backward compatibility */
 #include "utils/backend_status.h"	/* for backward compatibility */
 #include "utils/hsearch.h"
@@ -86,6 +87,7 @@ typedef enum StatMsgType
 	PGSTAT_MTYPE_DISCONNECT,
 	PGSTAT_MTYPE_SUBSCRIPTIONPURGE,
 	PGSTAT_MTYPE_SUBWORKERERROR,
+	PGSTAT_MTYPE_SUBWORKERXACTEND
 } StatMsgType;
 
 /* ----------
@@ -558,6 +560,23 @@ typedef struct PgStat_MsgSubscriptionPurge
 } PgStat_MsgSubscriptionPurge;
 
 /* ----------
+ * PgStat_MsgSubscriptionXactEnd	Sent by the apply worker to report transaction
+ *									ends.
+ * ----------
+ */
+typedef struct PgStat_MsgSubWorkerXactEnd
+{
+	PgStat_MsgHdr m_hdr;
+
+	/* determine the worker entry */
+	Oid			m_databaseid;
+	Oid			m_subid;
+
+	PgStat_Counter commit_count;
+	PgStat_Counter abort_count;
+} PgStat_MsgSubWorkerXactEnd;
+
+/* ----------
  * PgStat_MsgSubWorkerError		Sent by the apply worker or the table sync
  *								worker to report the error occurred while
  *								processing changes.
@@ -769,6 +788,7 @@ typedef union PgStat_Msg
 	PgStat_MsgDisconnect msg_disconnect;
 	PgStat_MsgSubscriptionPurge msg_subscriptionpurge;
 	PgStat_MsgSubWorkerError msg_subworkererror;
+	PgStat_MsgSubWorkerXactEnd msg_subworkerxactend;
 } PgStat_Msg;
 
 
@@ -1010,6 +1030,13 @@ typedef struct PgStat_StatSubWorkerEntry
 	PgStat_StatSubWorkerKey key;	/* hash key (must be first) */
 
 	/*
+	 * Cumulative transaction statistics of subscription worker
+	 */
+	PgStat_Counter commit_count;
+	PgStat_Counter abort_count;
+	PgStat_Counter error_count;
+
+	/*
 	 * Subscription worker error statistics representing an error that
 	 * occurred during application of changes or the initial table
 	 * synchronization.
@@ -1131,6 +1158,9 @@ extern void pgstat_report_checksum_failure(void);
 extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat);
 extern void pgstat_report_replslot_create(const char *slotname);
 extern void pgstat_report_replslot_drop(const char *slotname);
+extern void pgstat_report_subworker_xact_end(LogicalRepWorker *repWorker,
+											 LogicalRepMsgType command,
+											 bool bforce);
 extern void pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
 										  LogicalRepMsgType command,
 										  TransactionId xid, const char *errmsg);
@@ -1217,6 +1247,8 @@ extern void pgstat_send_archiver(const char *xlog, bool failed);
 extern void pgstat_send_bgwriter(void);
 extern void pgstat_send_checkpointer(void);
 extern void pgstat_send_wal(bool force);
+extern void pgstat_send_subworker_xact_stats(LogicalRepWorker *repWorker,
+											 bool force);
 
 /* ----------
  * Support functions for the SQL-callable functions to
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 9d29849..d6e4570 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -66,6 +66,12 @@ typedef struct LogicalRepWorker
 	TimestampTz last_recv_time;
 	XLogRecPtr	reply_lsn;
 	TimestampTz reply_time;
+
+	/*
+	 * Transaction statistics of subscription worker
+	 */
+	int64		commit_count;
+	int64		abort_count;
 } LogicalRepWorker;
 
 /* Main memory context for apply worker. Permanent during worker lifetime. */
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index b58b062..2b0bfae 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2097,6 +2097,9 @@ pg_stat_subscription| SELECT su.oid AS subid,
 pg_stat_subscription_workers| SELECT w.subid,
     s.subname,
     w.subrelid,
+    w.commit_count,
+    w.abort_count,
+    w.error_count,
     w.last_error_relid,
     w.last_error_command,
     w.last_error_xid,
@@ -2110,7 +2113,7 @@ pg_stat_subscription_workers| SELECT w.subid,
          SELECT pg_subscription_rel.srsubid AS subid,
             pg_subscription_rel.srrelid AS relid
            FROM pg_subscription_rel) sr,
-    (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w(subid, subrelid, last_error_relid, last_error_command, last_error_xid, last_error_count, last_error_message, last_error_time)
+    (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w(subid, subrelid, commit_count, abort_count, error_count, last_error_relid, last_error_command, last_error_xid, last_error_count, last_error_message, last_error_time)
      JOIN pg_subscription s ON ((w.subid = s.oid)));
 pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid,
     pg_stat_all_indexes.indexrelid,
diff --git a/src/test/subscription/t/026_worker_stats.pl b/src/test/subscription/t/026_worker_stats.pl
index 8005c54..99253f4 100644
--- a/src/test/subscription/t/026_worker_stats.pl
+++ b/src/test/subscription/t/026_worker_stats.pl
@@ -1,7 +1,7 @@
 
 # Copyright (c) 2021, PostgreSQL Global Development Group
 
-# Tests for subscription error stats.
+# Tests for subscription stats.
 use strict;
 use warnings;
 use PostgreSQL::Test::Cluster;
@@ -44,9 +44,32 @@ WHERE last_error_relid = '$relname'::regclass
 	  or die "Timed out while waiting for " . $msg;
 }
 
+# Test whether the update of general transaction stats satisfies the expected
+# condition or not.
+sub confirm_transaction_stats_update
+{
+	my ($node, $condition, $msg) = @_;
+
+	# Check only the stats of the apply worker
+	my $sql = qq[
+SELECT count(1) = 1
+FROM pg_stat_subscription_workers
+WHERE subrelid IS NULL AND $condition];
+
+	$node->poll_query_until('postgres', $sql)
+	  or die "Timed out while waiting for " . $msg;
+}
+
 # Create publisher node.
 my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
 $node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf(
+	'postgresql.conf', qq[
+logical_decoding_work_mem = 64kB
+max_prepared_transactions = 10
+max_wal_senders = 10
+wal_sender_timeout = 0
+]);
 $node_publisher->start;
 
 # Create subscriber node.
@@ -58,6 +81,7 @@ $node_subscriber->init(allows_streaming => 'logical');
 $node_subscriber->append_conf(
 	'postgresql.conf',
 	qq[
+max_prepared_transactions = 10
 wal_retrieve_retry_interval = 2s
 ]);
 $node_subscriber->start;
@@ -98,7 +122,7 @@ is($result, qq(0), 'check no subscription error');
 # Create subscription. The table sync for test_tab2 on tap_sub will enter into
 # infinite error loop due to violating the unique constraint.
 $node_subscriber->safe_psql('postgres',
-	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub;"
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (streaming = on, two_phase = on);"
 );
 
 $node_publisher->wait_for_catchup('tap_sub');
@@ -132,6 +156,11 @@ test_subscription_error(
 	qq(duplicate key value violates unique constraint),
 	'error reported by the apply worker');
 
+confirm_transaction_stats_update(
+	$node_subscriber,
+	'error_count = 1',
+	'the error_count increment by the apply worker');
+
 # Check the table sync worker's error in the view.
 test_subscription_error(
 	$node_subscriber, 'test_tab2', '', '',
@@ -150,11 +179,83 @@ $node_subscriber->poll_query_until('postgres',
 $node_subscriber->poll_query_until('postgres',
 	"SELECT count(1) > 0 FROM test_tab2");
 
-# There shouldn't be any errors in the view after dropping the subscription.
+# Check updation of subscription worker transaction count statistics.
+# COMMIT of an insertion of single record to test_tab1
+confirm_transaction_stats_update(
+	$node_subscriber,
+	'commit_count = 1',
+	'the commit_count increment by the apply worker');
+
+# Some more tests for transaction stats
+# PREPARE & COMMIT PREPARED
+$node_publisher->safe_psql(
+	'postgres', qq[
+BEGIN;
+INSERT INTO test_tab1 VALUES (2);
+PREPARE TRANSACTION 'gid1'
+]);
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'gid1'");
+confirm_transaction_stats_update(
+	$node_subscriber,
+	'commit_count = 2',
+	'the commit_count increment by commit prepared');
+
+# STREAM COMMIT
+$node_publisher->safe_psql(
+	'postgres', qq[
+BEGIN;
+INSERT INTO test_tab1 VALUES(generate_series(1001, 2000));
+COMMIT;
+]);
+confirm_transaction_stats_update(
+	$node_subscriber,
+	'commit_count = 3',
+	'the commit_count increment by stream commit');
+
+# STREAM PREPARE & COMMIT PREPARED
+$node_publisher->safe_psql(
+	'postgres', qq[
+BEGIN;
+INSERT INTO test_tab1 VALUES(generate_series(2001, 3000));
+PREPARE TRANSACTION 'gid2'
+]);
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'gid2'");
+confirm_transaction_stats_update(
+	$node_subscriber,
+	'commit_count = 4',
+	'the commit_count increment by streamed commit prepared');
+
+# ROLLBACK PREPARED
+$node_publisher->safe_psql(
+	'postgres', qq[
+BEGIN;
+INSERT INTO test_tab1 VALUES (3);
+PREPARE TRANSACTION 'gid3';
+]);
+$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'gid3'");
+confirm_transaction_stats_update(
+	$node_subscriber,
+	'abort_count = 1',
+	'the abort_count increment by rollback prepared');
+
+# STREAM PREPARE & ROLLBACK PREPARED
+$node_publisher->safe_psql(
+	'postgres', qq[
+BEGIN;
+INSERT INTO test_tab1 VALUES (generate_series(3001, 4000));
+PREPARE TRANSACTION 'gid4';
+]);
+$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'gid4'");
+confirm_transaction_stats_update(
+	$node_subscriber,
+	'abort_count = 2',
+	'the abort_count increment by streamed rollback prepared');
+
+# There shouldn't be any records in the view after dropping the subscription.
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub;");
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(1) FROM pg_stat_subscription_workers");
-is($result, q(0), 'no error after dropping subscription');
+is($result, q(0), 'no record after dropping subscription');
 
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 0c61ccb..238e9fa 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1947,6 +1947,7 @@ PgStat_MsgResetslrucounter
 PgStat_MsgSLRU
 PgStat_MsgSubscriptionPurge
 PgStat_MsgSubWorkerError
+PgStat_MsgSubWorkerXactEnd
 PgStat_MsgTabpurge
 PgStat_MsgTabstat
 PgStat_MsgTempFile
-- 
1.8.3.1

