From 9e5a4eda23ca23dcc33f90139e11dd27761763c1 Mon Sep 17 00:00:00 2001
From: Osumi Takamichi <osumi.takamichi@fujitsu.com>
Date: Tue, 16 Nov 2021 12:01:54 +0000
Subject: [PATCH v12 2/2] Extend pg_stat_subscription_workers to include
 general transaction statistics

Categorize the transactions applied by a logical replication
subscriber into three types (commit, error, abort) and introduce
cumulative columns for the number of transactions of each type and
for the amount of data consumed while applying them.

The data consumed by the subscriber's apply is computed from the data
structures used for message apply plus some extra data, which differs
from the publisher's decoding processing. At present, spool file
statistics, such as the amount of data spooled to disk and its count,
are not considered, but this can be added later.

The data applied for STREAM COMMIT and STREAM PREPARE is part of the
transaction's data consumption and is therefore added to the columns
introduced by this commit. The size of a prepared transaction is made
persistent so that the appropriate category for the transaction can
be concluded at either COMMIT PREPARED or ROLLBACK PREPARED time.
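
For example, the new counters can be checked with a query like the
following (illustrative only, not part of this patch):

    SELECT subid, subrelid, commit_count, commit_bytes,
           error_count, error_bytes, abort_count, abort_bytes
    FROM pg_stat_subscription_workers;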

Author: Takamichi Osumi
Discussed & Reviewed-by: Amit Kapila, Masahiko Sawada, Hou Zhijie, Greg Nancarrow, Vignesh C, Ajin Cherian
Discussion: https://www.postgresql.org/message-id/OSBPR01MB48887CA8F40C8D984A6DC00CED199%40OSBPR01MB4888.jpnprd01.prod.outlook.com
---
 doc/src/sgml/monitoring.sgml                     |  71 ++++-
 src/backend/catalog/system_views.sql             |   6 +
 src/backend/postmaster/pgstat.c                  | 324 ++++++++++++++++++++++-
 src/backend/replication/logical/proto.c          |  12 +
 src/backend/replication/logical/tablesync.c      |   6 +
 src/backend/replication/logical/worker.c         | 174 +++++++++++-
 src/backend/utils/adt/pgstatfuncs.c              |  36 ++-
 src/include/catalog/pg_proc.dat                  |   6 +-
 src/include/pgstat.h                             | 101 ++++++-
 src/include/replication/logicalworker.h          |   5 +
 src/test/regress/expected/rules.out              |   8 +-
 src/test/subscription/t/027_worker_xact_stats.pl | 146 ++++++++++
 src/tools/pgindent/typedefs.list                 |   4 +
 13 files changed, 878 insertions(+), 21 deletions(-)
 create mode 100644 src/test/subscription/t/027_worker_xact_stats.pl

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 5b10d18..22caeef 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -629,8 +629,8 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
 
      <row>
       <entry><structname>pg_stat_subscription_workers</structname><indexterm><primary>pg_stat_subscription_workers</primary></indexterm></entry>
-      <entry>One row per subscription worker, showing statistics about errors
-      that occurred on that subscription worker.
+      <entry>One row per subscription worker, showing transaction statistics
+      and information about errors that occurred on that subscription worker.
       See <link linkend="monitoring-pg-stat-subscription-workers">
       <structname>pg_stat_subscription_workers</structname></link> for details.
       </entry>
@@ -3052,10 +3052,12 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
   <para>
    The <structname>pg_stat_subscription_workers</structname> view will contain
-   one row per subscription error reported by workers applying logical
+   one row per subscription worker, showing the corresponding transaction
+   statistics and information about errors reported by workers applying logical
    replication changes and workers handling the initial data copy of the
    subscribed tables.  The statistics entry is removed when the subscription
-   the worker is running on is removed.
+   the worker is running on is removed.  Transaction size statistics are
+   reported only by the apply worker.
   </para>
 
   <table id="pg-stat-subscription-workers" xreflabel="pg_stat_subscription_workers">
@@ -3103,6 +3105,67 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>commit_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions successfully applied in this subscription.
+       COMMIT, the commit of a streamed transaction, and COMMIT PREPARED
+       increment this counter.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>commit_bytes</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Amount of data (in bytes) successfully applied in this subscription,
+       across <literal>commit_count</literal> transactions.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>error_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions that failed to be applied by the table
+       sync worker or main apply worker in this subscription.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>error_bytes</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Amount of data (in bytes) unsuccessfully applied in this subscription,
+       across <literal>error_count</literal> transactions.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>abort_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions aborted in this subscription.
+       ROLLBACK PREPARED increments this counter.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>abort_bytes</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Amount of data (in bytes) aborted in this subscription,
+       across <literal>abort_count</literal> transactions.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>last_error_relid</structfield> <type>oid</type>
       </para>
       <para>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index ecf1a0b..a8880e8 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1267,6 +1267,12 @@ CREATE VIEW pg_stat_subscription_workers AS
         w.subid,
         s.subname,
         w.subrelid,
+        w.commit_count,
+        w.commit_bytes,
+        w.error_count,
+        w.error_bytes,
+        w.abort_count,
+        w.abort_bytes,
         w.last_error_relid,
         w.last_error_command,
         w.last_error_xid,
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index ee3b39a..d0e4ee0 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -53,6 +53,7 @@
 #include "postmaster/fork_process.h"
 #include "postmaster/interrupt.h"
 #include "postmaster/postmaster.h"
+#include "replication/logicalworker.h"
 #include "replication/slot.h"
 #include "replication/walsender.h"
 #include "storage/backendid.h"
@@ -325,11 +326,14 @@ static PgStat_StatTabEntry *pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry,
 static PgStat_StatSubWorkerEntry *pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry,
 															 Oid subid, Oid subrelid,
 															 bool create);
+static PgStat_SW_PreparedXactEntry *pgstat_get_subworker_prepared_txn(Oid databaseid, Oid subid,
+																	  char *gid, bool create);
+
 static void pgstat_write_statsfiles(bool permanent, bool allDbs);
 static void pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent);
 static HTAB *pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep);
 static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
-									 HTAB *subworkerhash, bool permanent);
+									 HTAB *subworkerhash, HTAB *preparedtxnhash, bool permanent);
 static void backend_read_statsfile(void);
 
 static bool pgstat_write_statsfile_needed(void);
@@ -382,6 +386,9 @@ static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
 static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
 static void pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len);
 static void pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len);
+static void pgstat_recv_subworker_xact_end(PgStat_MsgSubWorkerXactEnd *msg, int len);
+static void pgstat_recv_subworker_twophase_xact(PgStat_MsgSubWorkerTwophaseXact *msg, int len);
+
 
 /* ------------------------------------------------------------
  * Public functions called from postmaster follow
@@ -1930,6 +1937,61 @@ pgstat_report_replslot_drop(const char *slotname)
 }
 
 /* ----------
+ * pgstat_report_subworker_xact_end() -
+ *
+ *  Tell the collector that a worker transaction has completed successfully.
+ * ----------
+ */
+void
+pgstat_report_subworker_xact_end(Oid subid, Oid subrel,
+								 LogicalRepMsgType command, PgStat_Counter xact_size)
+{
+	PgStat_MsgSubWorkerXactEnd msg;
+
+	Assert(command == 0 /* table sync worker */ ||
+		   command == LOGICAL_REP_MSG_COMMIT ||
+		   command == LOGICAL_REP_MSG_STREAM_COMMIT);
+
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBWORKERXACTEND);
+	msg.m_databaseid = MyDatabaseId;
+	msg.m_subid = subid;
+	msg.m_subrelid = subrel;
+	msg.m_command = command;
+	msg.m_xact_bytes = xact_size;
+	pgstat_send(&msg, sizeof(PgStat_MsgSubWorkerXactEnd));
+
+	reset_apply_xact_size();
+}
+
+/* ----------
+ * pgstat_report_subworker_twophase_xact() -
+ *
+ *  Tell the collector that the worker transaction has performed a
+ *  two-phase commit operation.
+ * ----------
+ */
+void
+pgstat_report_subworker_twophase_xact(Oid subid, LogicalRepMsgType command,
+									  PgStat_Counter xact_size, char *gid)
+{
+	PgStat_MsgSubWorkerTwophaseXact msg;
+
+	Assert(command == LOGICAL_REP_MSG_PREPARE ||
+		   command == LOGICAL_REP_MSG_STREAM_PREPARE ||
+		   command == LOGICAL_REP_MSG_COMMIT_PREPARED ||
+		   command == LOGICAL_REP_MSG_ROLLBACK_PREPARED);
+
+	/* set up the message */
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBWORKERTWOPHASEXACT);
+	msg.m_databaseid = MyDatabaseId;
+	msg.m_subid = subid;
+	msg.m_command = command;
+	strlcpy(msg.m_gid, gid, sizeof(msg.m_gid));
+	msg.m_xact_bytes = xact_size;
+	pgstat_send(&msg, sizeof(PgStat_MsgSubWorkerTwophaseXact));
+	reset_apply_xact_size();
+}
+
+/* ----------
  * pgstat_report_subworker_error() -
  *
  *	Tell the collector about the subscription worker error.
@@ -1947,6 +2009,7 @@ pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
 	msg.m_databaseid = MyDatabaseId;
 	msg.m_subid = subid;
 	msg.m_subrelid = subrelid;
+	msg.m_xact_error_bytes = get_apply_xact_size();
 	msg.m_relid = relid;
 	msg.m_command = command;
 	msg.m_xid = xid;
@@ -1955,6 +2018,8 @@ pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
 
 	len = offsetof(PgStat_MsgSubWorkerError, m_message) + strlen(msg.m_message) + 1;
 	pgstat_send(&msg, len);
+
+	reset_apply_xact_size();
 }
 
 /* ----------
@@ -3725,6 +3790,14 @@ PgstatCollectorMain(int argc, char *argv[])
 					pgstat_recv_subworker_error(&msg.msg_subworkererror, len);
 					break;
 
+				case PGSTAT_MTYPE_SUBWORKERXACTEND:
+					pgstat_recv_subworker_xact_end(&msg.msg_subworkerxactend, len);
+					break;
+
+				case PGSTAT_MTYPE_SUBWORKERTWOPHASEXACT:
+					pgstat_recv_subworker_twophase_xact(&msg.msg_subworkertwophasexact, len);
+					break;
+
 				default:
 					break;
 			}
@@ -3831,6 +3904,14 @@ reset_dbentry_counters(PgStat_StatDBEntry *dbentry)
 									  PGSTAT_SUBWORKER_HASH_SIZE,
 									  &hash_ctl,
 									  HASH_ELEM | HASH_BLOBS);
+
+	hash_ctl.keysize = sizeof(PgStat_SW_PreparedXactKey);
+	hash_ctl.entrysize = sizeof(PgStat_SW_PreparedXactEntry);
+	hash_ctl.hcxt = pgStatLocalContext;
+	dbentry->subworkers_preparedsizes = hash_create("Subscription worker stats of prepared txn",
+													PGSTAT_SUBWORKER_HASH_SIZE,
+													&hash_ctl,
+													HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
 }
 
 /*
@@ -3941,6 +4022,12 @@ pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry, Oid subid, Oid subrelid,
 	/* If not found, initialize the new one */
 	if (create && !found)
 	{
+		subwentry->xact_commit_count = 0;
+		subwentry->xact_commit_bytes = 0;
+		subwentry->xact_error_count = 0;
+		subwentry->xact_error_bytes = 0;
+		subwentry->xact_abort_count = 0;
+		subwentry->xact_abort_bytes = 0;
 		subwentry->relid = InvalidOid;
 		subwentry->command = 0;
 		subwentry->xid = InvalidTransactionId;
@@ -4152,9 +4239,11 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
 	HASH_SEQ_STATUS tstat;
 	HASH_SEQ_STATUS fstat;
 	HASH_SEQ_STATUS sstat;
+	HASH_SEQ_STATUS pstat;
 	PgStat_StatTabEntry *tabentry;
 	PgStat_StatFuncEntry *funcentry;
 	PgStat_StatSubWorkerEntry *subwentry;
+	PgStat_SW_PreparedXactEntry *pentry;
 	FILE	   *fpout;
 	int32		format_id;
 	Oid			dbid = dbentry->databaseid;
@@ -4221,6 +4310,17 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
 	}
 
 	/*
+	 * Write the subscription workers' prepared transaction statistics.
+	 */
+	hash_seq_init(&pstat, dbentry->subworkers_preparedsizes);
+	while ((pentry = (PgStat_SW_PreparedXactEntry *) hash_seq_search(&pstat)) != NULL)
+	{
+		fputc('P', fpout);
+		rc = fwrite(pentry, sizeof(PgStat_SW_PreparedXactEntry), 1, fpout);
+		(void) rc;				/* we'll check for error with ferror */
+	}
+
+	/*
 	 * No more output to be done. Close the temp file and replace the old
 	 * pgstat.stat with it.  The ferror() check replaces testing for error
 	 * after each individual fputc or fwrite above.
@@ -4459,6 +4559,7 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 				dbentry->tables = NULL;
 				dbentry->functions = NULL;
 				dbentry->subworkers = NULL;
+				dbentry->subworkers_preparedsizes = NULL;
 
 				/*
 				 * In the collector, disregard the timestamp we read from the
@@ -4504,6 +4605,14 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 												  &hash_ctl,
 												  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 
+				hash_ctl.keysize = sizeof(PgStat_SW_PreparedXactKey);
+				hash_ctl.entrysize = sizeof(PgStat_SW_PreparedXactEntry);
+				hash_ctl.hcxt = pgStatLocalContext;
+				dbentry->subworkers_preparedsizes = hash_create("Subscription worker stats of prepared txn",
+																PGSTAT_SUBWORKER_HASH_SIZE,
+																&hash_ctl,
+																HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
+
 				/*
 				 * If requested, read the data from the database-specific
 				 * file.  Otherwise we just leave the hashtables empty.
@@ -4513,6 +4622,7 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 											 dbentry->tables,
 											 dbentry->functions,
 											 dbentry->subworkers,
+											 dbentry->subworkers_preparedsizes,
 											 permanent);
 
 				break;
@@ -4597,7 +4707,7 @@ done:
  */
 static void
 pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
-						 HTAB *subworkerhash, bool permanent)
+						 HTAB *subworkerhash, HTAB *preparedtxnhash, bool permanent)
 {
 	PgStat_StatTabEntry *tabentry;
 	PgStat_StatTabEntry tabbuf;
@@ -4753,6 +4863,32 @@ pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
 				memcpy(subwentry, &subwbuf, sizeof(subwbuf));
 				break;
 
+			case 'P':
+				{
+					PgStat_SW_PreparedXactEntry buff;
+					PgStat_SW_PreparedXactEntry *prepared_xact_entry;
+
+					if (fread(&buff, 1, sizeof(PgStat_SW_PreparedXactEntry),
+							  fpin) != sizeof(PgStat_SW_PreparedXactEntry))
+					{
+						ereport(pgStatRunningInCollector ? LOG : WARNING,
+								(errmsg("corrupted statistics file \"%s\"",
+										statfile)));
+						goto done;
+					}
+
+					if (preparedtxnhash == NULL)
+						break;
+
+					prepared_xact_entry =
+						(PgStat_SW_PreparedXactEntry *) hash_search(preparedtxnhash,
+																	(void *) &buff.key,
+																	HASH_ENTER, NULL);
+
+					memcpy(prepared_xact_entry, &buff, sizeof(PgStat_SW_PreparedXactEntry));
+					break;
+				}
+
 				/*
 				 * 'E'	The EOF marker of a complete stats file.
 				 */
@@ -5428,6 +5564,8 @@ pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
 			hash_destroy(dbentry->functions);
 		if (dbentry->subworkers != NULL)
 			hash_destroy(dbentry->subworkers);
+		if (dbentry->subworkers_preparedsizes != NULL)
+			hash_destroy(dbentry->subworkers_preparedsizes);
 
 		if (hash_search(pgStatDBHash,
 						(void *) &dbid,
@@ -5467,10 +5605,13 @@ pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
 		hash_destroy(dbentry->functions);
 	if (dbentry->subworkers != NULL)
 		hash_destroy(dbentry->subworkers);
+	if (dbentry->subworkers_preparedsizes != NULL)
+		hash_destroy(dbentry->subworkers_preparedsizes);
 
 	dbentry->tables = NULL;
 	dbentry->functions = NULL;
 	dbentry->subworkers = NULL;
+	dbentry->subworkers_preparedsizes = NULL;
 
 	/*
 	 * Reset database-level stats, too.  This creates empty hash tables for
@@ -5546,10 +5687,28 @@ pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len)
 	else if (msg->m_resettype == RESET_SUBWORKER)
 	{
 		PgStat_StatSubWorkerKey key;
+		PgStat_SW_PreparedXactKey pkey;
 
 		key.subid = msg->m_objectid;
 		key.subrelid = msg->m_subobjectid;
 		(void) hash_search(dbentry->subworkers, (void *) &key, HASH_REMOVE, NULL);
+
+		/*
+		 * Clean up the entry of the prepared-size hash as well.
+		 *
+		 * There's no gid available here to build a complete key for
+		 * PgStat_SW_PreparedXactEntry.  Also, a valid 'subrelid' indicates
+		 * that the entry was created by a table sync worker, which has
+		 * nothing to do with the two-phase operations of the apply worker.
+		 * So proceed with this removal only when the specified subrelid is
+		 * invalid.
+		 */
+		if (msg->m_subobjectid == InvalidOid)
+		{
+			pkey.subid = msg->m_objectid;
+			(void) hash_search(dbentry->subworkers_preparedsizes,
+							   (void *) &pkey, HASH_REMOVE, NULL);
+		}
+
 	}
 }
 
@@ -6105,6 +6264,7 @@ pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len)
 	HASH_SEQ_STATUS hstat;
 	PgStat_StatDBEntry *dbentry;
 	PgStat_StatSubWorkerEntry *subwentry;
+	PgStat_SW_PreparedXactEntry *prepared_txn_entry;
 
 	dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
 
@@ -6126,6 +6286,127 @@ pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len)
 			}
 		}
 	}
+
+	/* Remove associated prepared transaction stats */
+	if (dbentry->subworkers_preparedsizes != NULL)
+	{
+		hash_seq_init(&hstat, dbentry->subworkers_preparedsizes);
+		while ((prepared_txn_entry = (PgStat_SW_PreparedXactEntry *) hash_seq_search(&hstat)) != NULL)
+		{
+			for (int i = 0; i < msg->m_nentries; i++)
+			{
+				if (prepared_txn_entry->key.subid == msg->m_subids[i])
+				{
+					(void) hash_search(dbentry->subworkers_preparedsizes,
+									   (void *) &(prepared_txn_entry->key),
+									   HASH_REMOVE, NULL);
+					break;
+				}
+			}
+		}
+	}
+}
+
+/* ----------
+ * pgstat_recv_subworker_xact_end() -
+ *
+ *	Process a SUBWORKERXACTEND message.
+ * ----------
+ */
+static void
+pgstat_recv_subworker_xact_end(PgStat_MsgSubWorkerXactEnd *msg, int len)
+{
+	PgStat_StatDBEntry *dbentry;
+	PgStat_StatSubWorkerEntry *wentry;
+
+	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
+	wentry = pgstat_get_subworker_entry(dbentry, msg->m_subid,
+										msg->m_subrelid, true);
+	Assert(wentry);
+
+	/* table sync worker */
+	if (msg->m_command == 0)
+		wentry->xact_commit_count++;
+	else
+	{
+		/* apply worker */
+		switch (msg->m_command)
+		{
+			case LOGICAL_REP_MSG_COMMIT:
+			case LOGICAL_REP_MSG_STREAM_COMMIT:
+				wentry->xact_commit_count++;
+				wentry->xact_commit_bytes += msg->m_xact_bytes;
+				break;
+			default:
+				elog(ERROR, "unexpected logical message type as normal apply end");
+				break;
+		}
+	}
+}
+
+/* ----------
+ * pgstat_recv_subworker_twophase_xact() -
+ *
+ *	Process a SUBWORKERTWOPHASEXACT message.
+ * ----------
+ */
+static void
+pgstat_recv_subworker_twophase_xact(PgStat_MsgSubWorkerTwophaseXact *msg, int len)
+{
+	PgStat_StatDBEntry *dbentry;
+	PgStat_SW_PreparedXactEntry *pentry;
+	PgStat_StatSubWorkerEntry *wentry;
+	PgStat_SW_PreparedXactKey key;
+
+	pentry = pgstat_get_subworker_prepared_txn(msg->m_databaseid,
+											   msg->m_subid,
+											   msg->m_gid, true);
+	Assert(pentry);
+	switch (msg->m_command)
+	{
+		case LOGICAL_REP_MSG_PREPARE:
+		case LOGICAL_REP_MSG_STREAM_PREPARE:
+
+			/*
+			 * Make the size of each prepared transaction persistent so that
+			 * we can update the stats across a server restart, once the
+			 * corresponding COMMIT PREPARED or ROLLBACK PREPARED arrives.
+			 */
+			pentry->subid = msg->m_subid;
+			strlcpy(pentry->gid, msg->m_gid, sizeof(pentry->gid));
+			pentry->xact_size = msg->m_xact_bytes;
+			break;
+
+		case LOGICAL_REP_MSG_COMMIT_PREPARED:
+		case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
+			/* Update exported xact stats now */
+			dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
+			wentry = pgstat_get_subworker_entry(dbentry,
+												msg->m_subid,
+												InvalidOid /* apply worker */ ,
+												true);
+			Assert(wentry);
+			if (msg->m_command == LOGICAL_REP_MSG_COMMIT_PREPARED)
+			{
+				wentry->xact_commit_count++;
+				wentry->xact_commit_bytes += pentry->xact_size;
+			}
+			else
+			{
+				wentry->xact_abort_count++;
+				wentry->xact_abort_bytes += pentry->xact_size;
+			}
+
+			/* Clean up this gid from transaction size hash */
+			key.subid = pentry->subid;
+			strlcpy(key.gid, msg->m_gid, sizeof(key.gid));
+			(void) hash_search(dbentry->subworkers_preparedsizes,
+							   (void *) &key, HASH_REMOVE, NULL);
+			break;
+
+		default:
+			elog(ERROR, "unexpected logical message type as prepare transaction");
+			break;
+	}
 }
 
 /* ----------
@@ -6147,6 +6428,10 @@ pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len)
 										   msg->m_subrelid, true);
 	Assert(subwentry);
 
+	/* general transaction stats for error */
+	subwentry->xact_error_count++;
+	subwentry->xact_error_bytes += msg->m_xact_error_bytes;
+
 	/*
 	 * Update only the counter and last error timestamp if we received
 	 * the same error again
@@ -6394,3 +6679,38 @@ pgstat_count_slru_truncate(int slru_idx)
 {
 	slru_entry(slru_idx)->m_truncate += 1;
 }
+
+/* ----------
+ * pgstat_get_subworker_prepared_txn
+ *
+ * Return the prepared transaction entry for the given subscription OID
+ * and gid.
+ * ----------
+ */
+static PgStat_SW_PreparedXactEntry *
+pgstat_get_subworker_prepared_txn(Oid databaseid, Oid subid,
+								  char *gid, bool create)
+{
+	PgStat_StatDBEntry *dbentry;
+	PgStat_SW_PreparedXactKey key;
+	PgStat_SW_PreparedXactEntry *pentry;
+	HASHACTION	action;
+	bool		found;
+
+	dbentry = pgstat_get_db_entry(databaseid, true);
+	key.subid = subid;
+	strlcpy(key.gid, gid, sizeof(key.gid));
+	action = (create ? HASH_ENTER : HASH_FIND);
+	pentry = (PgStat_SW_PreparedXactEntry *) hash_search(dbentry->subworkers_preparedsizes,
+														 (void *) &key,
+														 action, &found);
+
+	if (create && !found)
+	{
+		pentry->subid = 0;
+		pentry->gid[0] = '\0';
+		pentry->xact_size = 0;
+	}
+
+	return pentry;
+}
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 9f5bf4b..8306876 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -17,6 +17,7 @@
 #include "catalog/pg_type.h"
 #include "libpq/pqformat.h"
 #include "replication/logicalproto.h"
+#include "replication/logicalworker.h"
 #include "utils/lsyscache.h"
 #include "utils/syscache.h"
 
@@ -842,6 +843,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
 {
 	int			i;
 	int			natts;
+	Size		total_size;
 
 	/* Get number of attributes */
 	natts = pq_getmsgint(in, 2);
@@ -850,6 +852,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
 	tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
 	tuple->colstatus = (char *) palloc(natts * sizeof(char));
 	tuple->ncols = natts;
+	total_size = natts * (sizeof(char) + sizeof(StringInfoData));
 
 	/* Read the data */
 	for (i = 0; i < natts; i++)
@@ -880,6 +883,9 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
 				value->len = len;
 				value->cursor = 0;
 				value->maxlen = len;
+
+				/* memory for tuple */
+				total_size += len + 1;
 				break;
 			case LOGICALREP_COLUMN_BINARY:
 				len = pq_getmsgint(in, 4);	/* read length */
@@ -893,11 +899,17 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
 				value->len = len;
 				value->cursor = 0;
 				value->maxlen = len;
+
+				/* memory for tuple */
+				total_size += len + 1;
 				break;
 			default:
 				elog(ERROR, "unrecognized data representation type '%c'", kind);
 		}
 	}
+
+	/* Record memory consumption for tuple */
+	add_apply_xact_size(total_size);
 }
 
 /*
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f07983a..5bb1d63 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1149,6 +1149,12 @@ copy_table_done:
 	MyLogicalRepWorker->relstate_lsn = *origin_startpos;
 	SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
+	/* Report the success of table sync. */
+	pgstat_report_subworker_xact_end(MyLogicalRepWorker->subid,
+									 MyLogicalRepWorker->relid,
+									 0 /* no logical message type */ ,
+									 0 /* xact size */ );
+
 	/*
 	 * Finally, wait until the main apply worker tells us to catch up and then
 	 * return to let LogicalRepApplyLoop do it.
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 2e79302..ec9c270 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -221,6 +221,18 @@ typedef struct ApplyErrorCallbackArg
 	LogicalRepMsgType command;	/* 0 if invalid */
 	LogicalRepRelMapEntry *rel;
 
+	/*
+	 * Store the data size of this transaction.
+	 *
+	 * On the publisher, the byte size of a transaction is calculated by
+	 * ReorderBufferChangeSize() based on the ReorderBufferChange structure.
+	 * On the subscriber, the resources consumed are not the same as in the
+	 * publisher's decoding processing and must be computed differently.
+	 * Therefore, the value here usually does not exactly match the
+	 * publisher's byte size.
+	 */
+	PgStat_Counter bytes;
+
 	/* Remote node information */
 	int			remote_attnum;	/* -1 if invalid */
 	TransactionId remote_xid;
@@ -231,6 +243,7 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
 {
 	.command = 0,
 	.rel = NULL,
+	.bytes = 0,
 	.remote_attnum = -1,
 	.remote_xid = InvalidTransactionId,
 	.ts = 0,
@@ -293,6 +306,7 @@ static inline void cleanup_subxact_info(void);
 static void stream_cleanup_files(Oid subid, TransactionId xid);
 static void stream_open_file(Oid subid, TransactionId xid, bool first);
 static void stream_write_change(char action, StringInfo s);
+
 static void stream_close_file(void);
 
 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
@@ -304,6 +318,9 @@ static void maybe_reread_subscription(void);
 /* prototype needed because of stream_commit */
 static void apply_dispatch(StringInfo s);
 
+static void update_apply_change_size(LogicalRepMsgType action,
+									 bool is_partitioned);
+
 static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
 static void apply_handle_insert_internal(ApplyExecutionData *edata,
 										 ResultRelInfo *relinfo,
@@ -818,6 +835,11 @@ apply_handle_commit(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(commit_data.end_lsn);
 
+	pgstat_report_subworker_xact_end(MyLogicalRepWorker->subid,
+									 InvalidOid,
+									 LOGICAL_REP_MSG_COMMIT,
+									 get_apply_xact_size());
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 	reset_apply_error_context_info();
 }
@@ -922,6 +944,13 @@ apply_handle_prepare(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(prepare_data.end_lsn);
 
+	/* Update the memory consumption of the prepare and report it */
+	update_apply_change_size(LOGICAL_REP_MSG_PREPARE, false);
+	pgstat_report_subworker_twophase_xact(MyLogicalRepWorker->subid,
+										  LOGICAL_REP_MSG_PREPARE,
+										  get_apply_xact_size(),
+										  prepare_data.gid);
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 	reset_apply_error_context_info();
 }
@@ -963,6 +992,13 @@ apply_handle_commit_prepared(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(prepare_data.end_lsn);
 
+	/* Update the transaction size and report */
+	update_apply_change_size(LOGICAL_REP_MSG_COMMIT_PREPARED, false);
+	pgstat_report_subworker_twophase_xact(MyLogicalRepWorker->subid,
+										  LOGICAL_REP_MSG_COMMIT_PREPARED,
+										  get_apply_xact_size(),
+										  prepare_data.gid);
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 	reset_apply_error_context_info();
 }
@@ -1014,6 +1050,13 @@ apply_handle_rollback_prepared(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(rollback_data.rollback_end_lsn);
 
+	/* Update the transaction size and report */
+	update_apply_change_size(LOGICAL_REP_MSG_ROLLBACK_PREPARED, false);
+	pgstat_report_subworker_twophase_xact(MyLogicalRepWorker->subid,
+										  LOGICAL_REP_MSG_ROLLBACK_PREPARED,
+										  get_apply_xact_size(),
+										  rollback_data.gid);
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 	reset_apply_error_context_info();
 }
@@ -1066,6 +1109,15 @@ apply_handle_stream_prepare(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(prepare_data.end_lsn);
 
+	/*
+	 * Report the size of the streamed prepared transaction to the stats
+	 * collector as a prepared transaction so that it survives a server
+	 * restart.
+	 */
+	pgstat_report_subworker_twophase_xact(MyLogicalRepWorker->subid,
+										  LOGICAL_REP_MSG_STREAM_PREPARE,
+										  get_apply_xact_size(),
+										  prepare_data.gid);
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 
 	reset_apply_error_context_info();
@@ -1438,6 +1490,12 @@ apply_handle_stream_commit(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(commit_data.end_lsn);
 
+	/* Report the transaction end and reset the accumulated size */
+	pgstat_report_subworker_xact_end(MyLogicalRepWorker->subid,
+									 InvalidOid,
+									 LOGICAL_REP_MSG_STREAM_COMMIT,
+									 get_apply_xact_size());
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 
 	reset_apply_error_context_info();
@@ -1579,6 +1637,10 @@ apply_handle_insert(StringInfo s)
 	slot_fill_defaults(rel, estate, remoteslot);
 	MemoryContextSwitchTo(oldctx);
 
+	/* Update transaction size */
+	update_apply_change_size(LOGICAL_REP_MSG_INSERT,
+							 rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
+
 	/* For a partitioned table, insert the tuple into a partition. */
 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 		apply_handle_tuple_routing(edata,
@@ -1736,7 +1798,11 @@ apply_handle_update(StringInfo s)
 					has_oldtup ? &oldtup : &newtup);
 	MemoryContextSwitchTo(oldctx);
 
-	/* For a partitioned table, apply update to correct partition. */
+	/* Update transaction size */
+	update_apply_change_size(LOGICAL_REP_MSG_UPDATE,
+							 rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
+
+	/* For a partitioned table, apply update to correct partition. */
 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 		apply_handle_tuple_routing(edata,
 								   remoteslot, &newtup, CMD_UPDATE);
@@ -1870,6 +1936,10 @@ apply_handle_delete(StringInfo s)
 	slot_store_data(remoteslot, rel, &oldtup);
 	MemoryContextSwitchTo(oldctx);
 
+	/* Update transaction size */
+	update_apply_change_size(LOGICAL_REP_MSG_DELETE,
+							 rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
+
 	/* For a partitioned table, apply delete to correct partition. */
 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 		apply_handle_tuple_routing(edata,
@@ -2421,6 +2491,81 @@ apply_dispatch(StringInfo s)
 }
 
 /*
+ * Subscriber-side equivalent of the publisher's ReorderBufferChangeSize().
+ *
+ * Record the major resource consumption of this subscription for each
+ * logical replication message type.  At present, apart from the tuple
+ * length, we do not collect sizes from generic functions in order to keep
+ * the code simple, since the implementation complexity would not be worth
+ * the benefit.  For the tuple length accounting, see logicalrep_read_tuple().
+ *
+ * Also, multiple values are added at once to reduce the number of calls to
+ * this function; the drawback is that the transaction size may not be
+ * accurate when an error occurs.
+ *
+ * 'is_partitioned' is used to add some extra size.
+ */
+static void
+update_apply_change_size(LogicalRepMsgType action, bool is_partitioned)
+{
+	int64		size = 0;
+
+	switch (action)
+	{
+			/* No special memory consumption */
+		case LOGICAL_REP_MSG_BEGIN:
+		case LOGICAL_REP_MSG_COMMIT:
+		case LOGICAL_REP_MSG_TRUNCATE:
+		case LOGICAL_REP_MSG_TYPE:
+		case LOGICAL_REP_MSG_ORIGIN:
+		case LOGICAL_REP_MSG_MESSAGE:
+		case LOGICAL_REP_MSG_BEGIN_PREPARE:
+		case LOGICAL_REP_MSG_RELATION:
+			break;
+
+		case LOGICAL_REP_MSG_INSERT:
+		case LOGICAL_REP_MSG_UPDATE:
+		case LOGICAL_REP_MSG_DELETE:
+			Assert(!in_streamed_transaction);
+
+			/*
+			 * Compute size based on ApplyExecutionData. The size of
+			 * LogicalRepRelMapEntry can be skipped because it is obtained
+			 * from hash_search in logicalrep_rel_open.
+			 */
+			size += sizeof(ApplyExecutionData) + sizeof(EState) +
+				sizeof(ResultRelInfo) + sizeof(ResultRelInfo);
+
+			/* Add some extra size if the target relation is partitioned. */
+			if (is_partitioned)
+				size += sizeof(ModifyTableState) + sizeof(PartitionTupleRouting);
+			break;
+
+		case LOGICAL_REP_MSG_PREPARE:
+		case LOGICAL_REP_MSG_COMMIT_PREPARED:
+		case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
+			size += sizeof(FlushPosition);
+			break;
+
+		case LOGICAL_REP_MSG_STREAM_START:
+		case LOGICAL_REP_MSG_STREAM_STOP:
+		case LOGICAL_REP_MSG_STREAM_COMMIT:
+		case LOGICAL_REP_MSG_STREAM_PREPARE:
+		case LOGICAL_REP_MSG_STREAM_ABORT:
+		default:
+			ereport(ERROR,
+					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+					 errmsg("invalid logical replication message type \"%c\"", action)));
+	}
+
+	/* Update the total size of consumption when necessary */
+	if (size != 0)
+		add_apply_xact_size(size);
+}
+
+/*
  * Figure out which write/flush positions to report to the walsender process.
  *
  * We can't simply report back the last LSN the walsender sent us because the
@@ -3294,6 +3439,7 @@ stream_write_change(char action, StringInfo s)
 	BufFileWrite(stream_fd, &s->data[s->cursor], len);
 }
 
+
 /*
  * Cleanup the memory for subxacts and reset the related variables.
  */
@@ -3605,6 +3751,32 @@ ApplyWorkerMain(Datum main_arg)
 }
 
 /*
+ * Get the transaction size stored in the apply error callback argument.
+ * This is used not only for errors but also for commits and rollbacks.
+ * Exported so that the pgstat reporting functions can use this value.
+ */
+int64
+get_apply_xact_size(void)
+{
+	return apply_error_callback_arg.bytes;
+}
+
+/* Add size to apply transaction size */
+void
+add_apply_xact_size(int64 size)
+{
+	apply_error_callback_arg.bytes += size;
+}
+
+/* Reset transaction size on apply error callback */
+void
+reset_apply_xact_size(void)
+{
+	apply_error_callback_arg.bytes = 0;
+}
+
+/*
  * Is current process a logical replication worker?
  */
 bool
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index a34c0b6..d7264fc 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2414,7 +2414,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_WORKER_COLS	9
+#define PG_STAT_GET_SUBSCRIPTION_WORKER_COLS	15
 	Oid			subid = PG_GETARG_OID(0);
 	Oid			subrelid;
 	TupleDesc	tupdesc;
@@ -2441,19 +2441,31 @@ pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
 					   OIDOID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "subrelid",
 					   OIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "last_error_relid",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "commit_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "commit_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "error_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "error_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "abort_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "abort_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "last_error_relid",
 					   OIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "last_error_command",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "last_error_command",
 					   TEXTOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "last_error_xid",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 11, "last_error_xid",
 					   XIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "last_error_count",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 12, "last_error_count",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "last_error_message",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 13, "last_error_message",
 					   TEXTOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "first_error_time",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 14, "first_error_time",
 					   TIMESTAMPTZOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "last_error_time",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 15, "last_error_time",
 					   TIMESTAMPTZOID, -1, 0);
 	BlessTupleDesc(tupdesc);
 
@@ -2471,6 +2483,14 @@ pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
 	else
 		nulls[i++] = true;
 
+	/* transaction stats */
+	values[i++] = Int64GetDatum(wentry->xact_commit_count);
+	values[i++] = Int64GetDatum(wentry->xact_commit_bytes);
+	values[i++] = Int64GetDatum(wentry->xact_error_count);
+	values[i++] = Int64GetDatum(wentry->xact_error_bytes);
+	values[i++] = Int64GetDatum(wentry->xact_abort_count);
+	values[i++] = Int64GetDatum(wentry->xact_abort_bytes);
+
 	/* last_error_relid */
 	if (OidIsValid(wentry->relid))
 		values[i++] = ObjectIdGetDatum(wentry->relid);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 25f685f..fc654b4 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5389,9 +5389,9 @@
   proname => 'pg_stat_get_subscription_worker', prorows => '1', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'oid oid',
-  proallargtypes => '{oid,oid,oid,oid,oid,text,xid,int8,text,timestamptz,timestamptz}',
-  proargmodes => '{i,i,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subrelid,subid,subrelid,last_error_relid,last_error_command,last_error_xid,last_error_count,last_error_message,first_error_time,last_error_time}',
+  proallargtypes => '{oid,oid,oid,oid,int8,int8,int8,int8,int8,int8,oid,text,xid,int8,text,timestamptz,timestamptz}',
+  proargmodes => '{i,i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subrelid,subid,subrelid,commit_count,commit_bytes,error_count,error_bytes,abort_count,abort_bytes,last_error_relid,last_error_command,last_error_xid,last_error_count,last_error_message,first_error_time,last_error_time}',
   prosrc => 'pg_stat_get_subscription_worker' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 2c26b1c..266711b 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -86,6 +86,8 @@ typedef enum StatMsgType
 	PGSTAT_MTYPE_DISCONNECT,
 	PGSTAT_MTYPE_SUBSCRIPTIONPURGE,
 	PGSTAT_MTYPE_SUBWORKERERROR,
+	PGSTAT_MTYPE_SUBWORKERXACTEND,
+	PGSTAT_MTYPE_SUBWORKERTWOPHASEXACT,
 } StatMsgType;
 
 /* ----------
@@ -558,6 +560,56 @@ typedef struct PgStat_MsgSubscriptionPurge
 } PgStat_MsgSubscriptionPurge;
 
 /* ----------
+ * PgStat_MsgSubWorkerXactEnd		Sent by the apply worker or the table sync worker
+ *									to report successful transaction ends.
+ * ----------
+ */
+typedef struct PgStat_MsgSubWorkerXactEnd
+{
+	PgStat_MsgHdr m_hdr;
+
+	/* determine the worker entry */
+	Oid			m_databaseid;
+	Oid			m_subid;
+	Oid			m_subrelid;
+
+	/*
+	 * Distinguishes how the transaction ended: 0 for a table sync worker,
+	 * otherwise COMMIT or STREAM COMMIT.
+	 */
+	LogicalRepMsgType m_command;
+
+	/* memory consumption used by transaction */
+	PgStat_Counter m_xact_bytes;
+
+} PgStat_MsgSubWorkerXactEnd;
+
+/* ----------
+ * PgStat_MsgSubWorkerTwophaseXact	Sent by the apply worker to make the size of a
+ *									prepared transaction persistent across a server
+ *									restart and to make it visible once COMMIT PREPARED
+ *									or ROLLBACK PREPARED is applied.  This is kept
+ *									separate from PgStat_MsgSubWorkerXactEnd so that
+ *									more frequent operations (e.g. a normal COMMIT) do
+ *									not have to carry the gid and pay its message-size
+ *									cost.
+ * ----------
+ */
+typedef struct PgStat_MsgSubWorkerTwophaseXact
+{
+	PgStat_MsgHdr m_hdr;
+
+	/* determine the subscription */
+	Oid			m_databaseid;
+	Oid			m_subid;
+
+	LogicalRepMsgType m_command;
+	char		m_gid[GIDSIZE];
+	PgStat_Counter m_xact_bytes;
+
+} PgStat_MsgSubWorkerTwophaseXact;
+
+/* ----------
  * PgStat_MsgSubWorkerError		Sent by the apply worker or the table sync worker to
  *								report the error occurred during logical replication.
  * ----------
@@ -577,6 +629,12 @@ typedef struct PgStat_MsgSubWorkerError
 	Oid			m_subrelid;
 
 	/*
+	 * Transaction statistics of the subscription need to be updated when an
+	 * error occurs.
+	 */
+	PgStat_Counter m_xact_error_bytes;
+
+	/*
 	 * Oid of the table that the reporter was actually processing. m_relid can
 	 * be InvalidOid if an error occurred during worker applying a
 	 * non-data-modification message such as RELATION.
@@ -768,6 +826,8 @@ typedef union PgStat_Msg
 	PgStat_MsgDisconnect msg_disconnect;
 	PgStat_MsgSubscriptionPurge msg_subscriptionpurge;
 	PgStat_MsgSubWorkerError msg_subworkererror;
+	PgStat_MsgSubWorkerXactEnd msg_subworkerxactend;
+	PgStat_MsgSubWorkerTwophaseXact msg_subworkertwophasexact;
 } PgStat_Msg;
 
 
@@ -822,16 +882,23 @@ typedef struct PgStat_StatDBEntry
 	TimestampTz stats_timestamp;	/* time of db stats file update */
 
 	/*
-	 * tables, functions, and subscription workers must be last in the struct,
-	 * because we don't write the pointers out to the stats file.
+	 * tables, functions, subscription workers and their prepared transaction
+	 * stats must be last in the struct, because we don't write the pointers
+	 * out to the stats file.
 	 *
 	 * subworker is the hash table of PgStat_StatSubWorkerEntry which stores
 	 * statistics of logical replication workers: apply worker and table sync
 	 * worker.
+	 *
+	 * subworkers_preparedsizes records the size of each prepared transaction
+	 * so that the appropriate amount can be accounted at COMMIT PREPARED or
+	 * ROLLBACK PREPARED time, even after a server restart.  The apply worker
+	 * sends these statistics to the stats collector at prepare time.
 	 */
 	HTAB	   *tables;
 	HTAB	   *functions;
 	HTAB	   *subworkers;
+	HTAB	   *subworkers_preparedsizes;
 } PgStat_StatDBEntry;
 
 
@@ -1005,6 +1072,16 @@ typedef struct PgStat_StatSubWorkerEntry
 	PgStat_StatSubWorkerKey key;	/* hash key (must be first) */
 
 	/*
+	 * Cumulative transaction statistics of subscription worker
+	 */
+	PgStat_Counter xact_commit_count;
+	PgStat_Counter xact_commit_bytes;
+	PgStat_Counter xact_error_count;
+	PgStat_Counter xact_error_bytes;
+	PgStat_Counter xact_abort_count;
+	PgStat_Counter xact_abort_bytes;
+
+	/*
 	 * Subscription worker error statistics representing an error that
 	 * occurred during application of logical replication or the initial table
 	 * synchronization.
@@ -1018,6 +1095,22 @@ typedef struct PgStat_StatSubWorkerEntry
 	char		error_message[PGSTAT_SUBWORKERERROR_MSGLEN];
 } PgStat_StatSubWorkerEntry;
 
+/* Hash key and entry to track the size of each prepared transaction */
+typedef struct PgStat_SW_PreparedXactKey
+{
+	Oid			subid;
+	char		gid[GIDSIZE];
+} PgStat_SW_PreparedXactKey;
+
+typedef struct PgStat_SW_PreparedXactEntry
+{
+	PgStat_SW_PreparedXactKey key;	/* hash key */
+
+	Oid			subid;
+	char		gid[GIDSIZE];
+	PgStat_Counter xact_size;
+} PgStat_SW_PreparedXactEntry;
+
 /*
  * Working state needed to accumulate per-function-call timing statistics.
  */
@@ -1128,6 +1221,10 @@ extern void pgstat_report_checksum_failure(void);
 extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat);
 extern void pgstat_report_replslot_create(const char *slotname);
 extern void pgstat_report_replslot_drop(const char *slotname);
+extern void pgstat_report_subworker_xact_end(Oid subid, Oid subrel,
+											 LogicalRepMsgType command, PgStat_Counter xact_size);
+extern void pgstat_report_subworker_twophase_xact(Oid subid, LogicalRepMsgType command,
+												  PgStat_Counter xact_size, char *gid);
 extern void pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
 										  LogicalRepMsgType command,
 										  TransactionId xid, const char *errmsg);
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 2ad61a0..435884d 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -16,4 +16,9 @@ extern void ApplyWorkerMain(Datum main_arg);
 
 extern bool IsLogicalWorker(void);
 
+/* for transaction stats */
+extern int64 get_apply_xact_size(void);
+extern void add_apply_xact_size(int64 size);
+extern void reset_apply_xact_size(void);
+
 #endif							/* LOGICALWORKER_H */
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index d60c5a5..bd3e221 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2097,6 +2097,12 @@ pg_stat_subscription| SELECT su.oid AS subid,
 pg_stat_subscription_workers| SELECT w.subid,
     s.subname,
     w.subrelid,
+    w.commit_count,
+    w.commit_bytes,
+    w.error_count,
+    w.error_bytes,
+    w.abort_count,
+    w.abort_bytes,
     w.last_error_relid,
     w.last_error_command,
     w.last_error_xid,
@@ -2112,7 +2118,7 @@ pg_stat_subscription_workers| SELECT w.subid,
             pg_subscription_rel.srrelid AS relid
            FROM pg_subscription_rel
           WHERE (pg_subscription_rel.srsubstate <> 'r'::"char")) sr,
-    (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w(subid, subrelid, last_error_relid, last_error_command, last_error_xid, last_error_count, last_error_message, first_error_time, last_error_time)
+    (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w(subid, subrelid, commit_count, commit_bytes, error_count, error_bytes, abort_count, abort_bytes, last_error_relid, last_error_command, last_error_xid, last_error_count, last_error_message, first_error_time, last_error_time)
      JOIN pg_subscription s ON ((w.subid = s.oid)));
 pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid,
     pg_stat_all_indexes.indexrelid,
diff --git a/src/test/subscription/t/027_worker_xact_stats.pl b/src/test/subscription/t/027_worker_xact_stats.pl
new file mode 100644
index 0000000..e09be42
--- /dev/null
+++ b/src/test/subscription/t/027_worker_xact_stats.pl
@@ -0,0 +1,146 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Tests for subscription worker statistics during apply.
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 2;
+
+# Create publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf(
+	'postgresql.conf', qq[
+logical_decoding_work_mem = 64kB
+max_prepared_transactions = 10
+max_wal_senders = 10
+wal_sender_timeout = 0
+]);
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf(
+	'postgresql.conf', qq[
+max_prepared_transactions = 10
+log_min_messages = DEBUG1
+]);
+$node_subscriber->start;
+
+# Setup structures on the publisher and the subscriber
+$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab (a int)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_tab (a int primary key);");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+my $subopts = 'streaming = on, two_phase = on, copy_data = false';
+$node_subscriber->safe_psql(
+	'postgres', qq[
+CREATE SUBSCRIPTION tap_sub
+CONNECTION '$publisher_connstr application_name=$appname'
+PUBLICATION tap_pub WITH ($subopts);
+]);
+
+$node_publisher->wait_for_catchup($appname);
+
+# There's no entry at the beginning
+my $result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_stat_subscription_workers;");
+is($result, q(0), 'no entry for transaction stats yet');
+
+# COMMIT
+$node_publisher->safe_psql('postgres',
+	"BEGIN; INSERT INTO test_tab VALUES (1); COMMIT;");
+
+$node_subscriber->poll_query_until('postgres',
+	"SELECT count(1) = 1 FROM pg_stat_subscription_workers where commit_count = 1;"
+) or die "didn't get updates of xact stats by commit";
+
+# By now, the stats collector should have updated the bytes as well.
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT commit_bytes > 0 FROM pg_stat_subscription_workers;");
+is($result, q(t), 'got consumed bytes');
+
+# STREAM COMMIT
+$node_publisher->safe_psql('postgres',
+	"BEGIN; INSERT INTO test_tab VALUES(generate_series(1001, 2000)); COMMIT;"
+);
+$node_subscriber->poll_query_until('postgres',
+	"SELECT count(1) = 1 FROM pg_stat_subscription_workers where commit_count = 2;"
+) or die "didn't get updates of xact stats by stream commit";
+
+# STREAM PREPARE & COMMIT PREPARED
+# The size of this prepared transaction must be restored after the restart below.
+$node_publisher->safe_psql(
+	'postgres', q[
+BEGIN;
+INSERT INTO test_tab VALUES(generate_series(2001, 3000));
+PREPARE TRANSACTION 'gid1';
+]);
+
+# The streamed prepare is not reflected in the view until COMMIT PREPARED
+# or ROLLBACK PREPARED, so there is no direct way to confirm that the
+# stats collector has received the size of the prepared transaction.
+# Instead, issue one more committed transaction after the prepare and
+# wait until its update is visible, which implies that the earlier
+# streamed prepare has been processed by the stats collector as well.
+$node_publisher->safe_psql('postgres',
+	"BEGIN; INSERT INTO test_tab VALUES (2); COMMIT;");
+$node_subscriber->poll_query_until('postgres',
+	"SELECT count(1) = 1 FROM pg_stat_subscription_workers where commit_count = 3;"
+) or die "didn't process the updates of committed transaction";
+
+$node_subscriber->restart;
+
+# Get commit_bytes before commit prepared.
+my $tmp = $node_subscriber->safe_psql('postgres',
+	"SELECT commit_bytes FROM pg_stat_subscription_workers where commit_count = 3;"
+);
+
+# Commit prepared increments the commit_count.
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'gid1'");
+$node_subscriber->poll_query_until('postgres',
+	"SELECT count(1) = 1 FROM pg_stat_subscription_workers where commit_count = 4;"
+  )
+  or die
+  "didn't get updates of xact stats by stream prepare and commit prepared";
+
+$node_subscriber->poll_query_until('postgres',
+	"SELECT commit_bytes > $tmp FROM pg_stat_subscription_workers where commit_count = 4;"
+) or die "didn't get updates of commit_bytes by commit prepared";
+
+# ROLLBACK PREPARED
+$node_publisher->safe_psql(
+	'postgres', q[
+BEGIN;
+INSERT INTO test_tab VALUES (3);
+PREPARE TRANSACTION 'gid2';
+]);
+$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'gid2'");
+$node_subscriber->poll_query_until('postgres',
+	"SELECT count(1) = 1 FROM pg_stat_subscription_workers where abort_count = 1;"
+) or die "didn't get updates of xact stats by rollback prepared";
+
+# Error stats (caused by a duplicate key error)
+$node_publisher->safe_psql(
+	'postgres', q[
+BEGIN;
+INSERT INTO test_tab VALUES (1);
+COMMIT;
+]);
+
+$node_subscriber->poll_query_until('postgres',
+	"SELECT count(1) = 1 FROM pg_stat_subscription_workers where error_count > 0;"
+) or die "didn't get updates of xact stats by error";
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index f41ef0d..3403667 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1945,6 +1945,8 @@ PgStat_MsgResetslrucounter
 PgStat_MsgSLRU
 PgStat_MsgSubscriptionPurge
 PgStat_MsgSubWorkerError
+PgStat_MsgSubWorkerTwophaseXact
+PgStat_MsgSubWorkerXactEnd
 PgStat_MsgTabpurge
 PgStat_MsgTabstat
 PgStat_MsgTempFile
@@ -1958,6 +1960,8 @@ PgStat_StatFuncEntry
 PgStat_StatReplSlotEntry
 PgStat_StatSubWorkerEntry
 PgStat_StatSubWorkerKey
+PgStat_SW_PreparedXactKey
+PgStat_SW_PreparedXactEntry
 PgStat_StatTabEntry
 PgStat_SubXactStatus
 PgStat_TableCounts
-- 
2.2.0

