From e6de87be6af810114d74b23e7d5a160499838157 Mon Sep 17 00:00:00 2001
From: Osumi Takamichi <osumi.takamichi@fujitsu.com>
Date: Fri, 5 Nov 2021 06:57:46 +0000
Subject: [PATCH v8] Extend pg_stat_subscription_workers to include general transaction statistics

Categorize transactions of logical replication subscriber
into three types (xact_commit, xact_error, xact_abort)
and introduce cumulative columns of those numbers and amounts of
consumed data during message apply respectively.

One scenario to utilize those columns is to suppress
unnecessary network bandwidth, when streaming transaction aborted
more than expected is observed by the column of xact_abort_count
or the column of its bytes.

The calculation of consumed resources by subscriber is computed
based on the data structure for message apply, which is different
from that of publisher's decoding processing.

Stats of prepared transaction becomes persistent to conclude
the appropriate category for the transaction at
either commit prepared or rollback prepared time.

Also, streaming transactions running in parallel on publisher
can cause data blocks in unpredictable order. Managing each
streaming tranaction by xid makes it possible to handle
stream abort, stream commit and stream prepare properly.

Author: Takamichi Osumi
Discussed & Reviewed-by: Amit Kapila, Masahiko Sawada, Hou Zhijie, Greg Nancarrow, Vignesh C
Discussion: https://www.postgresql.org/message-id/OSBPR01MB48887CA8F40C8D984A6DC00CED199%40OSBPR01MB4888.jpnprd01.prod.outlook.com
---
 doc/src/sgml/monitoring.sgml                     |  94 +++++--
 src/backend/catalog/system_views.sql             |  16 +-
 src/backend/executor/execPartition.c             |  10 +
 src/backend/postmaster/pgstat.c                  | 298 ++++++++++++++++++++++
 src/backend/replication/logical/tablesync.c      |   6 +
 src/backend/replication/logical/worker.c         | 304 +++++++++++++++++++++++
 src/backend/utils/adt/pgstatfuncs.c              |  46 +++-
 src/include/catalog/pg_proc.dat                  |   6 +-
 src/include/executor/execPartition.h             |   1 +
 src/include/pgstat.h                             |  91 +++++++
 src/include/replication/logicalworker.h          |   5 +
 src/test/regress/expected/rules.out              |  18 +-
 src/test/subscription/t/026_error_report.pl      |   8 +-
 src/test/subscription/t/027_worker_xact_stats.pl | 172 +++++++++++++
 14 files changed, 1030 insertions(+), 45 deletions(-)
 create mode 100644 src/test/subscription/t/027_worker_xact_stats.pl

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 094c723..4e9a68c 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -629,7 +629,7 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
 
      <row>
       <entry><structname>pg_stat_subscription_workers</structname><indexterm><primary>pg_stat_subscription_workers</primary></indexterm></entry>
-      <entry>At least one row per subscription, showing about errors that
+      <entry>At least one row per subscription, showing transaction statistics and information about errors that
       occurred on subscription.
       See <link linkend="monitoring-pg-stat-subscription-workers">
       <structname>pg_stat_subscription_workers</structname></link> for details.
@@ -3052,9 +3052,9 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
   <para>
    The <structname>pg_stat_subscription_workers</structname> view will contain
-   one row per subscription error reported by workers applying logical
-   replication changes and workers handling the initial data copy of the
-   subscribed tables.
+   one row per subscription, showing corresponding transaction statistics and
+   information about the last error reported by workers applying logical replication
+   changes or by workers handling the initial data copy of the subscribed tables.
   </para>
 
   <table id="pg-stat-subscription-workers" xreflabel="pg_stat_subscription_workers">
@@ -3102,20 +3102,86 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>relid</structfield> <type>oid</type>
+       <structfield>xact_commit_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions successfully applied in this subscription.
+       COMMIT, COMMIT of streaming transaction and COMMIT PREPARED increments
+       this counter.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>xact_commit_bytes</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Amount of data (in bytes) successfully applied in this subscription,
+       across <literal>xact_commit_count</literal> transactions.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>xact_error_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions that failed to be applied by the table
+       sync worker or main apply worker in this subscription.
+       </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>xact_error_bytes</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Amount of data (in bytes) unsuccessfully applied in this subscription,
+       across <literal>xact_error_count</literal> transactions.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>xact_abort_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions aborted in this subscription.
+       Rollback of the prepared transaction and abort of streaming transaction
+       increments this counter.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>xact_abort_bytes</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Amount of data (in bytes) aborted in this subscription,
+       across <literal>xact_abort_count</literal> transactions. 
+       In order to suppress unnecessary consumed network bandwidth,
+       increase <literal>logical_decoding_work_mem</literal> on the publisher
+       so that it exceeds the size of the whole streamed transaction, if
+       an unexpected amount of streamed transactions are aborted.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>last_error_relid</structfield> <type>oid</type>
       </para>
       <para>
        OID of the relation that the worker was processing when the
-       error occurred
+       last error occurred
       </para></entry>
      </row>
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>command</structfield> <type>text</type>
+       <structfield>last_error_command</structfield> <type>text</type>
       </para>
       <para>
-       Name of command being applied when the error occurred.  This field
+       Name of last command being applied when the error occurred.  This field
        is always NULL if the error was reported during the initial data
        copy.
       </para></entry>
@@ -3123,10 +3189,10 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>xid</structfield> <type>xid</type>
+       <structfield>last_error_xid</structfield> <type>xid</type>
       </para>
       <para>
-       Transaction ID of the publisher node being applied when the error
+       Transaction ID of the publisher node being applied when the last error
        occurred.  This field is always NULL if the error was reported
        during the initial data copy.
       </para></entry>
@@ -3134,19 +3200,19 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>error_count</structfield> <type>uint8</type>
+       <structfield>last_error_count</structfield> <type>uint8</type>
       </para>
       <para>
-       Number of consecutive times the error occurred
+       Number of consecutive times the last error occurred
       </para></entry>
      </row>
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>error_message</structfield> <type>text</type>
+       <structfield>last_error_message</structfield> <type>text</type>
       </para>
       <para>
-       The error message
+       The last error message
       </para></entry>
      </row>
 
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index a2ee00c..186b888 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1267,11 +1267,17 @@ CREATE VIEW pg_stat_subscription_workers AS
 	w.subid,
 	s.subname,
 	w.subrelid,
-	w.relid,
-	w.command,
-	w.xid,
-	w.error_count,
-	w.error_message,
+	w.xact_commit_count,
+	w.xact_commit_bytes,
+	w.xact_error_count,
+	w.xact_error_bytes,
+	w.xact_abort_count,
+	w.xact_abort_bytes,
+	w.last_error_relid,
+	w.last_error_command,
+	w.last_error_xid,
+	w.last_error_count,
+	w.last_error_message,
 	w.last_error_time,
 	w.stats_reset
     FROM (SELECT
diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c
index 5c723bc..20184c6 100644
--- a/src/backend/executor/execPartition.c
+++ b/src/backend/executor/execPartition.c
@@ -192,6 +192,16 @@ static void find_matching_subplans_recurse(PartitionPruningData *prunedata,
 										   bool initial_prune,
 										   Bitmapset **validsubplans);
 
+/*
+ * PartitionTupleRoutingSize - exported to calculate total data size
+ * of logical replication mesage apply, because this is one of the
+ * ApplyExecutionData struct members.
+ */
+size_t
+PartitionTupleRoutingSize(void)
+{
+	return sizeof(PartitionTupleRouting);
+}
 
 /*
  * ExecSetupPartitionTupleRouting - sets up information needed during
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index b7883ec..9edbe47 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -54,6 +54,7 @@
 #include "postmaster/fork_process.h"
 #include "postmaster/interrupt.h"
 #include "postmaster/postmaster.h"
+#include "replication/logicalworker.h"
 #include "replication/slot.h"
 #include "replication/walsender.h"
 #include "storage/backendid.h"
@@ -288,6 +289,15 @@ static HTAB *replSlotStatHash = NULL;
 static HTAB *subWorkerStatHash = NULL;
 
 /*
+ * Stats of prepared transactions should be displayed
+ * at either commit prepared or rollback prepared time, even when it's
+ * after the server restart. We have the apply worker send those statistics
+ * to the stats collector at prepare time and the startup process restore
+ * those at restart if necessary.
+ */
+static HTAB *subWorkerPreparedXactSizeHash = NULL;
+
+/*
  * List of OIDs of databases we need to write out.  If an entry is InvalidOid,
  * it means to write only the shared-catalog stats ("DB 0"); otherwise, we
  * will write both that DB's data and the shared stats.
@@ -338,6 +348,9 @@ static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, Timestamp
 
 static PgStat_StatSubWorkerEntry *pgstat_get_subworker_entry(Oid subid, Oid subrelid,
 															 bool create);
+static PgStat_StatSubWorkerPreparedXactSize *pgstat_get_subworker_prepared_txn(Oid subid,
+																			   char *gid, bool create);
+
 static void pgstat_reset_subworker_entry(PgStat_StatSubWorkerEntry *wentry, TimestampTz ts);
 static void pgstat_vacuum_subworker_stats(void);
 static void pgstat_send_subscription_purge(PgStat_MsgSubscriptionPurge *msg);
@@ -388,6 +401,9 @@ static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
 static void pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len);
 static void pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len);
 static void pgstat_recv_subworker_purge(PgStat_MsgSubWorkerPurge *msg, int len);
+static void pgstat_recv_subworker_xact_end(PgStat_MsgSubWorkerXactEnd *msg, int len);
+static void pgstat_recv_subworker_twophase_xact(PgStat_MsgSubWorkerTwophaseXact *msg, int len);
+
 
 /* ------------------------------------------------------------
  * Public functions called from postmaster follow
@@ -2111,6 +2127,72 @@ pgstat_report_replslot_drop(const char *slotname)
 }
 
 /* ----------
+ * pgstat_report_subworker_xact_end() -
+ *
+ *  Tell the collector that worker transaction has successfully completed.
+ * ----------
+ */
+void
+pgstat_report_subworker_xact_end(Oid subid, Oid subrel,
+								 LogicalRepMsgType command, PgStat_Counter xact_size)
+{
+	PgStat_MsgSubWorkerXactEnd msg;
+
+	Assert(command == 0 /* table sync worker */ ||
+		   command == LOGICAL_REP_MSG_COMMIT ||
+		   command == LOGICAL_REP_MSG_STREAM_ABORT ||
+		   command == LOGICAL_REP_MSG_STREAM_COMMIT);
+
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBWORKERXACTEND);
+	msg.m_subid = subid;
+	msg.m_subrelid = subrel;
+	msg.m_command = command;
+	msg.m_xact_bytes = xact_size;
+	pgstat_send(&msg, sizeof(PgStat_MsgSubWorkerXactEnd));
+
+	reset_apply_error_context_xact_size();
+}
+
+/* ----------
+ * pgstat_report_subworker_twophase_xact() -
+ *
+ *  Tell the collector that worker transaction has done 2PC related operation.
+ * ----------
+ */
+void
+pgstat_report_subworker_twophase_xact(Oid subid, LogicalRepMsgType command,
+									  PgStat_Counter xact_size,
+									  LogicalRepPreparedTxnData *prepare_data,
+									  LogicalRepCommitPreparedTxnData *commit_data,
+									  LogicalRepRollbackPreparedTxnData *rollback_data)
+{
+	PgStat_MsgSubWorkerTwophaseXact msg;
+
+	Assert(command == LOGICAL_REP_MSG_PREPARE ||
+		   command == LOGICAL_REP_MSG_STREAM_PREPARE ||
+		   command == LOGICAL_REP_MSG_COMMIT_PREPARED ||
+		   command == LOGICAL_REP_MSG_ROLLBACK_PREPARED);
+
+	/* setup the message */
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBWORKERTWOPHASEXACT);
+	msg.m_subid = subid;
+	msg.m_command = command;
+
+	/* get the gid for this two phase operation */
+	if (command == LOGICAL_REP_MSG_PREPARE ||
+		command == LOGICAL_REP_MSG_STREAM_PREPARE)
+		strlcpy(msg.m_gid, prepare_data->gid, sizeof(msg.m_gid));
+	else if (command == LOGICAL_REP_MSG_COMMIT_PREPARED)
+		strlcpy(msg.m_gid, commit_data->gid, sizeof(msg.m_gid));
+	else /* rollback prepared */
+		strlcpy(msg.m_gid, rollback_data->gid, sizeof(msg.m_gid));
+
+	msg.m_xact_bytes = xact_size;
+	pgstat_send(&msg, sizeof(PgStat_MsgSubWorkerTwophaseXact));
+	reset_apply_error_context_xact_size();
+}
+
+/* ----------
  * pgstat_report_subworker_error() -
  *
  *	Tell the collector about the subscription worker error.
@@ -2130,6 +2212,7 @@ pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBWORKERERROR);
 	msg.m_subid = subid;
 	msg.m_subrelid = subrelid;
+	msg.m_xact_error_bytes = get_apply_error_context_xact_size();
 	msg.m_dbid = MyDatabaseId;
 	msg.m_relid = relid;
 	msg.m_command = command;
@@ -2138,6 +2221,8 @@ pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
 	strlcpy(msg.m_message, errmsg, PGSTAT_SUBWORKERERROR_MSGLEN);
 
 	pgstat_send(&msg, len);
+
+	reset_apply_error_context_xact_size();
 }
 
 /* ----------
@@ -3906,6 +3991,14 @@ PgstatCollectorMain(int argc, char *argv[])
 					pgstat_recv_subworker_purge(&msg.msg_subworkerpurge, len);
 					break;
 
+				case PGSTAT_MTYPE_SUBWORKERXACTEND:
+					pgstat_recv_subworker_xact_end(&msg.msg_subworkerxactend, len);
+					break;
+
+				case PGSTAT_MTYPE_SUBWORKERTWOPHASEXACT:
+					pgstat_recv_subworker_twophase_xact(&msg.msg_subworkertwophasexact, len);
+					break;
+
 				default:
 					break;
 			}
@@ -4223,6 +4316,22 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
 	}
 
 	/*
+	 * Write subscription worker's prepared transaction struct
+	 */
+	if (subWorkerPreparedXactSizeHash)
+	{
+		PgStat_StatSubWorkerPreparedXactSize *prepared_size;
+
+		hash_seq_init(&hstat, subWorkerPreparedXactSizeHash);
+		while((prepared_size = (PgStat_StatSubWorkerPreparedXactSize *) hash_seq_search(&hstat)) != NULL)
+		{
+			fputc('P', fpout);
+			rc = fwrite(prepared_size, sizeof(PgStat_StatSubWorkerPreparedXactSize), 1, fpout);
+			(void) rc; 			/* we'll check for error with ferror */
+		}
+	}
+
+	/*
 	 * No more output to be done. Close the temp file and replace the old
 	 * pgstat.stat with it.  The ferror() check replaces testing for error
 	 * after each individual fputc or fwrite above.
@@ -4725,6 +4834,40 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 					break;
 				}
 
+			case 'P':
+				{
+					PgStat_StatSubWorkerPreparedXactSize buff;
+					PgStat_StatSubWorkerPreparedXactSize *prepared_xact_size;
+
+					if (fread(&buff, 1, sizeof(PgStat_StatSubWorkerPreparedXactSize),
+							  fpin) != sizeof(PgStat_StatSubWorkerPreparedXactSize))
+					{
+						ereport(pgStatRunningInCollector ? LOG : WARNING,
+								(errmsg("corrupted statistics file \"%s\"",
+										statfile)));
+						goto done;
+					}
+
+					if (subWorkerPreparedXactSizeHash == NULL)
+					{
+						HASHCTL		hash_ctl;
+
+						hash_ctl.keysize = sizeof(PgStat_StatSubWorkerPreparedXact);
+						hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerPreparedXactSize);
+						hash_ctl.hcxt = pgStatLocalContext;
+						subWorkerPreparedXactSizeHash = hash_create("Subscription worker stats of prepared txn",
+														PGSTAT_SUBWORKER_HASH_SIZE,
+														&hash_ctl,
+														HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
+					}
+
+					prepared_xact_size =
+						(PgStat_StatSubWorkerPreparedXactSize *) hash_search(subWorkerPreparedXactSizeHash,
+																			 (void *) &buff.key,
+																			 HASH_ENTER, NULL);
+					memcpy(prepared_xact_size, &buff, sizeof(PgStat_StatSubWorkerPreparedXactSize));
+					break;
+				}
 			case 'E':
 				goto done;
 
@@ -5290,6 +5433,7 @@ pgstat_clear_snapshot(void)
 	pgStatDBHash = NULL;
 	replSlotStatHash = NULL;
 	subWorkerStatHash = NULL;
+	subWorkerPreparedXactSizeHash = NULL;
 
 	/*
 	 * Historically the backend_status.c facilities lived in this file, and
@@ -6287,6 +6431,105 @@ pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len)
 }
 
 /* ----------
+ * pgstat_recv_subworker_xact_end() -
+ *
+ *	Process a SUBWORKERXACTEND message.
+ * ----------
+ */
+static void
+pgstat_recv_subworker_xact_end(PgStat_MsgSubWorkerXactEnd *msg, int len)
+{
+	PgStat_StatSubWorkerEntry *wentry;
+
+	wentry = pgstat_get_subworker_entry(msg->m_subid, msg->m_subrelid, true);
+	Assert(wentry);
+
+	/* table sync worker */
+	if (msg->m_command == 0)
+		wentry->xact_commit_count++;
+	else
+	{
+		/* apply worker */
+		switch(msg->m_command)
+		{
+			case LOGICAL_REP_MSG_COMMIT:
+			case LOGICAL_REP_MSG_STREAM_COMMIT:
+				wentry->xact_commit_count++;
+				wentry->xact_commit_bytes += msg->m_xact_bytes;
+				break;
+			case LOGICAL_REP_MSG_STREAM_ABORT:
+				wentry->xact_abort_count++;
+				wentry->xact_abort_bytes += msg->m_xact_bytes;
+				break;
+			default:
+				elog(ERROR, "unexpected logical message type as normal apply end");
+				break;
+		}
+	}
+}
+
+/* ----------
+ * pgstat_recv_subworker_twophase_xact() -
+ *
+ *	Process a SUBWORKERTWOPHASEXACT message.
+ * ----------
+ */
+static void
+pgstat_recv_subworker_twophase_xact(PgStat_MsgSubWorkerTwophaseXact *msg, int len)
+{
+	PgStat_StatSubWorkerPreparedXactSize *prepared_txn;
+	PgStat_StatSubWorkerEntry *wentry;
+	PgStat_StatSubWorkerPreparedXact key;
+
+	prepared_txn = pgstat_get_subworker_prepared_txn(msg->m_subid, msg->m_gid, true);
+	Assert(prepared_txn);
+	switch(msg->m_command)
+	{
+		case LOGICAL_REP_MSG_PREPARE:
+		case LOGICAL_REP_MSG_STREAM_PREPARE:
+			/*
+			 * Make each size of prepared transaction persistent
+			 * so that we can update stats over the server restart
+			 * and make prepared stats updated when commit prepared
+			 * or rollback prepared arrives.
+			 */
+			prepared_txn->subid = msg->m_subid;
+			strlcpy(prepared_txn->gid, msg->m_gid, sizeof(prepared_txn->gid));
+			prepared_txn->xact_size = msg->m_xact_bytes;
+			break;
+
+		case LOGICAL_REP_MSG_COMMIT_PREPARED:
+		case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
+			/* Update exported xact stats now */
+			wentry = pgstat_get_subworker_entry(msg->m_subid,
+												InvalidOid /* apply worker */,
+												true);
+			Assert(wentry);
+			if (msg->m_command == LOGICAL_REP_MSG_COMMIT_PREPARED)
+			{
+				wentry->xact_commit_count++;
+				wentry->xact_commit_bytes += prepared_txn->xact_size;
+			}
+			else
+			{
+				wentry->xact_abort_count++;
+				wentry->xact_abort_bytes += prepared_txn->xact_size;
+			}
+
+			/* Clean up this gid from transaction size hash */
+			key.subid = prepared_txn->subid;
+			strlcpy(key.gid, msg->m_gid, strlen(key.gid));
+			(void) hash_search(subWorkerPreparedXactSizeHash,
+							   (void *) &key, HASH_REMOVE, NULL);
+			break;
+
+		default:
+			elog(ERROR, "unexpected logical message type as prepare transaction");
+			break;
+	}
+}
+
+/* ----------
  * pgstat_recv_subworker_error() -
  *
  *	Process a SUBWORKERERROR message.
@@ -6301,6 +6544,10 @@ pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len)
 	wentry = pgstat_get_subworker_entry(msg->m_subid, msg->m_subrelid, true);
 	Assert(wentry);
 
+	/* general transaction stats for error */
+	wentry->xact_error_count++;
+	wentry->xact_error_bytes += msg->m_xact_error_bytes;
+
 	/*
 	 * Update only the counter and timestamp if we received the same error
 	 * again
@@ -6520,6 +6767,51 @@ pgstat_get_subworker_entry(Oid subid, Oid subrelid, bool create)
 }
 
 /* ----------
+ * pgstat_get_subworker_prepared_txn
+ *
+ * Return subscription worker entry with the given subscription OID and
+ * gid.
+ * ----------
+ */
+static PgStat_StatSubWorkerPreparedXactSize*
+pgstat_get_subworker_prepared_txn(Oid subid, char *gid, bool create)
+{
+	PgStat_StatSubWorkerPreparedXact key;
+	PgStat_StatSubWorkerPreparedXactSize *prepared_txn_size;
+	HASHACTION	action;
+	bool		found;
+
+	if (subWorkerPreparedXactSizeHash == NULL)
+	{
+		HASHCTL		hash_ctl;
+
+		hash_ctl.keysize = sizeof(PgStat_StatSubWorkerPreparedXact);
+		hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerPreparedXactSize);
+		hash_ctl.hcxt = pgStatLocalContext;
+		subWorkerPreparedXactSizeHash = hash_create("Subscription worker stats of prepared txn",
+													PGSTAT_SUBWORKER_HASH_SIZE,
+													&hash_ctl,
+													HASH_ELEM | HASH_STRINGS);
+	}
+
+	key.subid = subid;
+	memcpy(key.gid, gid, strlen(key.gid));
+	action = (create ? HASH_ENTER : HASH_FIND);
+	prepared_txn_size = (PgStat_StatSubWorkerPreparedXactSize *) hash_search(subWorkerPreparedXactSizeHash,
+																			 (void *) &key,
+																			 action, &found);
+
+	if (create && !found)
+	{
+		prepared_txn_size->subid = 0;
+		prepared_txn_size->gid[0] = '\0';
+		prepared_txn_size->xact_size = 0;
+	}
+
+	return prepared_txn_size;
+}
+
+/* ----------
  * pgstat_reset_subworker_entry
  *
  * Reset the given subscription worker statistics.
@@ -6528,6 +6820,12 @@ pgstat_get_subworker_entry(Oid subid, Oid subrelid, bool create)
 static void
 pgstat_reset_subworker_entry(PgStat_StatSubWorkerEntry *wentry, TimestampTz ts)
 {
+	wentry->xact_commit_count = 0;
+	wentry->xact_commit_bytes = 0;
+	wentry->xact_error_count = 0;
+	wentry->xact_error_bytes = 0;
+	wentry->xact_abort_count = 0;
+	wentry->xact_abort_bytes = 0;
 	wentry->dbid = InvalidOid;
 	wentry->relid = InvalidOid;
 	wentry->command = 0;
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 9b6d057..0d04ea5 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1162,6 +1162,12 @@ copy_table_done:
 	MyLogicalRepWorker->relstate_lsn = *origin_startpos;
 	SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
+	/* Report the success of table sync. */
+	pgstat_report_subworker_xact_end(MyLogicalRepWorker->subid,
+									 MyLogicalRepWorker->relid,
+									 0 /* no logical message type */,
+									 0 /* xact size */);
+
 	/*
 	 * Finally, wait until the main apply worker tells us to catch up and then
 	 * return to let LogicalRepApplyLoop do it.
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 3a40684..04d4236 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -221,21 +221,65 @@ typedef struct ApplyErrorCallbackArg
 	LogicalRepMsgType command;	/* 0 if invalid */
 	LogicalRepRelMapEntry *rel;
 
+	/*
+	 * Store data size of this transaction.
+	 *
+	 * The byte size of transaction on the publisher is calculated
+	 * by ReorderBufferChangeSize() based on the ReorderBufferChange
+	 * structure. But on the subscriber, consumed resources are
+	 * not same as the publisher's decoding processsing and required
+	 * to be computed in different way. Therefore, the exact same byte
+	 * size is not restored on the subscriber usually.
+	 *
+	 * Data size of streaming transactions is managed by streamingXactSize
+	 * for flexible data blocks handling.
+	 */
+	PgStat_Counter bytes;
+
 	/* Remote node information */
 	int			remote_attnum;	/* -1 if invalid */
 	TransactionId remote_xid;
 	TimestampTz ts;				/* commit, rollback, or prepare timestamp */
 } ApplyErrorCallbackArg;
 
+/* Struct to indicate extra consumption of transaction size */
+typedef union ApplyTxnExtraData
+{
+	LogicalRepRelMapEntry *relmapentry;
+	LogicalRepRelation *reprelation;
+	int         *stream_write_len;
+} ApplyTxnExtraData;
+
 static ApplyErrorCallbackArg apply_error_callback_arg =
 {
 	.command = 0,
 	.rel = NULL,
+	.bytes = 0,
 	.remote_attnum = -1,
 	.remote_xid = InvalidTransactionId,
 	.ts = 0,
 };
 
+/*
+ * Two or more streaming transactions in parallel on the publisher
+ * generate unexpected order of partial txn data demarcated stream start
+ * and stream stop to the subscriber, since whenever its size of one of
+ * the txns reaches the publisher's logical_decoding_work_mem,
+ * the part (and in the end, last remaining changes) is streamed.
+ * This creates mixed blocks of streaming data while
+ * there's possibility some are successfully committed but others are
+ * not by stream abort. Therefore, to track correct byte size
+ * it's necessary to trace each streaming transaction by making pair
+ * of xid and transaction size.
+ */
+#define PARALLEL_STREAMING_XACTS 32
+typedef struct XactSizeEntry
+{
+	TransactionId	key;
+	PgStat_Counter	xact_size;
+} XactSizeEntry;
+static HTAB *streamingXactSize = NULL;
+
 static MemoryContext ApplyMessageContext = NULL;
 MemoryContext ApplyContext = NULL;
 
@@ -304,6 +348,9 @@ static void maybe_reread_subscription(void);
 /* prototype needed because of stream_commit */
 static void apply_dispatch(StringInfo s);
 
+static void update_apply_change_size(LogicalRepMsgType action,
+									 ApplyTxnExtraData *extra_data);
+
 static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
 static void apply_handle_insert_internal(ApplyExecutionData *edata,
 										 ResultRelInfo *relinfo,
@@ -818,6 +865,11 @@ apply_handle_commit(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(commit_data.end_lsn);
 
+	pgstat_report_subworker_xact_end(MyLogicalRepWorker->subid,
+									 InvalidOid,
+									 LOGICAL_REP_MSG_COMMIT,
+									 get_apply_error_context_xact_size());
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 	reset_apply_error_context_info();
 }
@@ -922,6 +974,11 @@ apply_handle_prepare(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(prepare_data.end_lsn);
 
+	pgstat_report_subworker_twophase_xact(MyLogicalRepWorker->subid,
+										  LOGICAL_REP_MSG_PREPARE,
+										  get_apply_error_context_xact_size(),
+										  &prepare_data, NULL, NULL);
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 	reset_apply_error_context_info();
 }
@@ -963,6 +1020,13 @@ apply_handle_commit_prepared(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(prepare_data.end_lsn);
 
+	update_apply_change_size(LOGICAL_REP_MSG_COMMIT_PREPARED, NULL);
+
+	pgstat_report_subworker_twophase_xact(MyLogicalRepWorker->subid,
+										  LOGICAL_REP_MSG_COMMIT_PREPARED,
+										  get_apply_error_context_xact_size(),
+										  NULL, &prepare_data, NULL);
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 	reset_apply_error_context_info();
 }
@@ -1014,6 +1078,12 @@ apply_handle_rollback_prepared(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(rollback_data.rollback_end_lsn);
 
+	/* send rollback prepared message for this gid */
+	pgstat_report_subworker_twophase_xact(MyLogicalRepWorker->subid,
+										  LOGICAL_REP_MSG_ROLLBACK_PREPARED,
+										  get_apply_error_context_xact_size(),
+										  NULL, NULL, &rollback_data);
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 	reset_apply_error_context_info();
 }
@@ -1029,6 +1099,7 @@ static void
 apply_handle_stream_prepare(StringInfo s)
 {
 	LogicalRepPreparedTxnData prepare_data;
+	XactSizeEntry *streamed_entry;
 
 	if (in_streamed_transaction)
 		ereport(ERROR,
@@ -1066,6 +1137,19 @@ apply_handle_stream_prepare(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(prepare_data.end_lsn);
 
+	/*
+	 * Report the prepared streaming xact size to the stats collector
+	 * in a prepared xact manner to make it survive over the restart.
+	 */
+	streamed_entry = hash_search(streamingXactSize,
+								 (void *) &prepare_data.xid,
+								 HASH_FIND, NULL);
+	Assert(streamed_entry);
+	pgstat_report_subworker_twophase_xact(MyLogicalRepWorker->subid,
+										  LOGICAL_REP_MSG_STREAM_PREPARE,
+										  streamed_entry->xact_size,
+										  &prepare_data, NULL, NULL);
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 
 	reset_apply_error_context_info();
@@ -1143,6 +1227,8 @@ apply_handle_stream_start(StringInfo s)
 		MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
 		FileSetInit(MyLogicalRepWorker->stream_fileset);
 
+		update_apply_change_size(LOGICAL_REP_MSG_STREAM_START, NULL);
+
 		MemoryContextSwitchTo(oldctx);
 	}
 
@@ -1193,12 +1279,17 @@ apply_handle_stream_stop(StringInfo s)
 
 /*
  * Handle STREAM abort message.
+ *
+ * Currently, abort of streaming subtransaction does not affect
+ * size of streaming transaction resources because it has used the
+ * resources anyway.
  */
 static void
 apply_handle_stream_abort(StringInfo s)
 {
 	TransactionId xid;
 	TransactionId subxid;
+	XactSizeEntry *streamed_entry;
 
 	if (in_streamed_transaction)
 		ereport(ERROR,
@@ -1213,8 +1304,36 @@ apply_handle_stream_abort(StringInfo s)
 	 */
 	if (xid == subxid)
 	{
+		bool		found = false;
 		set_apply_error_context_xact(xid, 0);
 		stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+
+		/*
+		 * We've completed to handle stream abort without issue, so
+		 * get ready to report the transaction stats via normal
+		 * termination route instead of the apply error route.
+		 */
+		streamed_entry = hash_search(streamingXactSize,
+									 (void *) &xid,
+									 HASH_FIND, &found);
+		/*
+		 * It's possible that we get stream abort
+		 * earlier than any call of write_stream_change that
+		 * creates one hash entry for this xid. In this case,
+		 * to find a entry with this xid fails. So just check
+		 * if we've found it. Only when we confirm some writes
+		 * by write_stream_change, report the stream_abort.
+		 */
+		if (found)
+		{
+			pgstat_report_subworker_xact_end(MyLogicalRepWorker->subid,
+											 InvalidOid,
+											 LOGICAL_REP_MSG_STREAM_ABORT,
+											 streamed_entry->xact_size);
+			(void) hash_search(streamingXactSize,
+							   (void *) &xid,
+							   HASH_REMOVE, NULL);
+		}
 	}
 	else
 	{
@@ -1417,6 +1536,7 @@ apply_handle_stream_commit(StringInfo s)
 {
 	TransactionId xid;
 	LogicalRepCommitData commit_data;
+	XactSizeEntry *streamed_entry;
 
 	if (in_streamed_transaction)
 		ereport(ERROR,
@@ -1438,6 +1558,19 @@ apply_handle_stream_commit(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(commit_data.end_lsn);
 
+	/* Report and clean up the xid */
+	streamed_entry = hash_search(streamingXactSize,
+								 (void *) &xid,
+								 HASH_FIND, NULL);
+	Assert(streamed_entry);
+	pgstat_report_subworker_xact_end(MyLogicalRepWorker->subid,
+									 InvalidOid,
+									 LOGICAL_REP_MSG_STREAM_COMMIT,
+									 streamed_entry->xact_size);
+	(void) hash_search(streamingXactSize,
+					   (void *) &xid,
+					   HASH_REMOVE, NULL);
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 
 	reset_apply_error_context_info();
@@ -1541,6 +1674,7 @@ apply_handle_insert(StringInfo s)
 	EState	   *estate;
 	TupleTableSlot *remoteslot;
 	MemoryContext oldctx;
+	ApplyTxnExtraData extra_size;
 
 	if (handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
 		return;
@@ -1576,6 +1710,10 @@ apply_handle_insert(StringInfo s)
 	slot_fill_defaults(rel, estate, remoteslot);
 	MemoryContextSwitchTo(oldctx);
 
+	/* Update transaction size */
+	extra_size.relmapentry = rel;
+	update_apply_change_size(LOGICAL_REP_MSG_INSERT, &extra_size);
+
 	/* For a partitioned table, insert the tuple into a partition. */
 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 		apply_handle_tuple_routing(edata,
@@ -1667,6 +1805,7 @@ apply_handle_update(StringInfo s)
 	TupleTableSlot *remoteslot;
 	RangeTblEntry *target_rte;
 	MemoryContext oldctx;
+	ApplyTxnExtraData extra_size;
 
 	if (handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
 		return;
@@ -1733,6 +1872,10 @@ apply_handle_update(StringInfo s)
 					has_oldtup ? &oldtup : &newtup);
 	MemoryContextSwitchTo(oldctx);
 
+	/* Update transaction size */
+	extra_size.relmapentry = rel;
+	update_apply_change_size(LOGICAL_REP_MSG_UPDATE, &extra_size);
+
 	/* For a partitioned table, apply update to correct partition. */
 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 		apply_handle_tuple_routing(edata,
@@ -1830,6 +1973,7 @@ apply_handle_delete(StringInfo s)
 	EState	   *estate;
 	TupleTableSlot *remoteslot;
 	MemoryContext oldctx;
+	ApplyTxnExtraData extra_size;
 
 	if (handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
 		return;
@@ -1867,6 +2011,10 @@ apply_handle_delete(StringInfo s)
 	slot_store_data(remoteslot, rel, &oldtup);
 	MemoryContextSwitchTo(oldctx);
 
+	/* Update transaction size */
+	extra_size.relmapentry =  rel;
+	update_apply_change_size(LOGICAL_REP_MSG_DELETE, &extra_size);
+
 	/* For a partitioned table, apply delete to correct partition. */
 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 		apply_handle_tuple_routing(edata,
@@ -2418,6 +2566,111 @@ apply_dispatch(StringInfo s)
 }
 
 /*
+ * Subscriber side implementation equivalent to ReorderBufferChangeSize
+ * of the publisher.
+ *
+ * According to the logical replication message type, record major
+ * resource consumptions of this subscription for each message.
+ * At present, do not collect data from generic functions to keep
+ * code simplicity, since the implementation complexity versus benefit
+ * tradeoff should not be good. Also, add multiple values
+ * at once in order to reduce the number of calls to this function.
+ *
+ * 'extra_data' controls detail handling of data size calculation.
+ */
+static void
+update_apply_change_size(LogicalRepMsgType action, ApplyTxnExtraData *extra_data)
+{
+	int64       size = 0;
+
+	/*
+	 * In streaming mode, stream_write_change is called
+	 * instead of immediate apply. List up the messages types
+	 * that can be caught by handle_streamed_transaction and
+	 * treat the write length as the size of transaction so
+	 * that we can export it as part of pg_stat_subscription_worker.
+	 */
+	if (in_streamed_transaction &&
+		(action == LOGICAL_REP_MSG_INSERT ||
+		 action == LOGICAL_REP_MSG_UPDATE ||
+		 action == LOGICAL_REP_MSG_DELETE ||
+		 action == LOGICAL_REP_MSG_TRUNCATE ||
+		 action == LOGICAL_REP_MSG_RELATION ||
+		 action == LOGICAL_REP_MSG_TYPE))
+	{
+		size += *extra_data->stream_write_len;
+		add_apply_error_context_xact_size(size);
+		return;
+	}
+
+	switch (action)
+	{
+		/* No special memory consumption */
+		case LOGICAL_REP_MSG_BEGIN:
+		case LOGICAL_REP_MSG_COMMIT:
+		case LOGICAL_REP_MSG_TRUNCATE:
+		case LOGICAL_REP_MSG_TYPE:
+		case LOGICAL_REP_MSG_ORIGIN:
+		case LOGICAL_REP_MSG_MESSAGE:
+		case LOGICAL_REP_MSG_STREAM_STOP:
+		case LOGICAL_REP_MSG_STREAM_ABORT:
+		case LOGICAL_REP_MSG_BEGIN_PREPARE:
+			break;
+
+		case LOGICAL_REP_MSG_INSERT:
+		case LOGICAL_REP_MSG_UPDATE:
+		case LOGICAL_REP_MSG_DELETE:
+			Assert(extra_data != NULL);
+
+			/*
+			 * Compute size based on ApplyExecutionData.
+			 * The size of LogicalRepRelMapEntry can be skipped because
+			 * it is obtained from hash_search in logicalrep_rel_open.
+			 */
+			size += sizeof(ApplyExecutionData) + sizeof(EState) +
+				sizeof(ResultRelInfo) + sizeof(ResultRelInfo);
+
+			/*
+			 * Add some extra size if the target relation is partitioned.
+			 * PartitionTupleRouting isn't exported. Therefore, call the
+			 * function that returns its size instead.
+			 */
+			if (extra_data->relmapentry->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+				size += sizeof(ModifyTableState) + PartitionTupleRoutingSize();
+			break;
+
+		case LOGICAL_REP_MSG_RELATION:
+			Assert(extra_data != NULL);
+
+			/* See logicalrep_read_attrs for the last two */
+			size += sizeof(LogicalRepRelation) +
+				extra_data->reprelation->natts * sizeof(char *) +
+				extra_data->reprelation->natts * sizeof(Oid);
+			break;
+
+		case LOGICAL_REP_MSG_STREAM_START:
+			size += sizeof(FileSet);
+			break;
+
+		case LOGICAL_REP_MSG_PREPARE:
+		case LOGICAL_REP_MSG_COMMIT_PREPARED:
+		case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
+		case LOGICAL_REP_MSG_STREAM_COMMIT:
+		case LOGICAL_REP_MSG_STREAM_PREPARE:
+			size += sizeof(FlushPosition);
+			break;
+
+		default:
+			ereport(ERROR,
+					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+					 errmsg("invalid logical replication message type \"%c\"", action)));
+	}
+
+	/* update the total size of consumption */
+	add_apply_error_context_xact_size(size);
+}
+
+/*
  * Figure out which write/flush positions to report to the walsender process.
  *
  * We can't simply report back the last LSN the walsender sent us because the
@@ -3271,6 +3524,9 @@ static void
 stream_write_change(char action, StringInfo s)
 {
 	int			len;
+	int			total_len;
+	bool		found;
+	XactSizeEntry *streamed_entry;
 
 	Assert(in_streamed_transaction);
 	Assert(TransactionIdIsValid(stream_xid));
@@ -3289,6 +3545,16 @@ stream_write_change(char action, StringInfo s)
 	len = (s->len - s->cursor);
 
 	BufFileWrite(stream_fd, &s->data[s->cursor], len);
+
+	/* update xact size by xid */
+	total_len = (s->len - s->cursor) * 2 + sizeof(char) + sizeof(action);
+	streamed_entry = (XactSizeEntry *) hash_search(streamingXactSize,
+												   (void *) &stream_xid,
+												   HASH_ENTER, &found);
+	if (!found)
+		streamed_entry->xact_size = total_len; /* init */
+	else
+		streamed_entry->xact_size += total_len; /* update */
 }
 
 /*
@@ -3426,6 +3692,23 @@ ApplyWorkerMain(Datum main_arg)
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
 
+	/*
+	 * Initialize the apply worker's hash to manage bytes for
+	 * streaming txns.
+	 */
+	if (!am_tablesync_worker() && MySubscription->stream)
+	{
+		HASHCTL     hash_ctl;
+
+		hash_ctl.keysize = sizeof(TransactionId);
+		hash_ctl.entrysize = sizeof(XactSizeEntry);
+		hash_ctl.hcxt = LogicalStreamingContext;
+		streamingXactSize = hash_create("xact size per streaming xid",
+										PARALLEL_STREAMING_XACTS,
+										&hash_ctl,
+										HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+	}
+
 	if (am_tablesync_worker())
 	{
 		char	   *syncslotname;
@@ -3601,6 +3884,27 @@ ApplyWorkerMain(Datum main_arg)
 	proc_exit(0);
 }
 
+/* Exported so that stats collector can utilize this value */
+int64
+get_apply_error_context_xact_size(void)
+{
+	return apply_error_callback_arg.bytes;
+}
+
+/* Add size to apply error bytes */
+void
+add_apply_error_context_xact_size(int64 size)
+{
+	apply_error_callback_arg.bytes += size;
+}
+
+/* Reset information of apply error callback */
+void
+reset_apply_error_context_xact_size(void)
+{
+	apply_error_callback_arg.bytes = 0;
+}
+
 /*
  * Is current process a logical replication worker?
  */
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 2511df1..31aa46e 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2405,7 +2405,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_WORKER_COLS 9
+#define PG_STAT_GET_SUBSCRIPTION_WORKER_COLS 15
 	Oid			subid = PG_GETARG_OID(0);
 	Oid			subrelid;
 	TupleDesc	tupdesc;
@@ -2425,19 +2425,31 @@ pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
 					   OIDOID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "subrelid",
 					   OIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "relid",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "xact_commit_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "xact_commit_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "xact_error_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "xact_error_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "xact_abort_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "xact_abort_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "last_error_relid",
 					   OIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "command",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "last_error_command",
 					   TEXTOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "xid",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 11, "last_error_xid",
 					   XIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "error_count",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 12, "last_error_count",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "error_message",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 13, "last_error_message",
 					   TEXTOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "last_error_time",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 14, "last_error_time",
 					   TIMESTAMPTZOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "stats_reset",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 15, "stats_reset",
 					   TIMESTAMPTZOID, -1, 0);
 	BlessTupleDesc(tupdesc);
 
@@ -2462,28 +2474,36 @@ pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
 	else
 		nulls[i++] = true;
 
-	/* relid */
+	/* transaction stats */
+	values[i++] = Int64GetDatum(wentry->xact_commit_count);
+	values[i++] = Int64GetDatum(wentry->xact_commit_bytes);
+	values[i++] = Int64GetDatum(wentry->xact_error_count);
+	values[i++] = Int64GetDatum(wentry->xact_error_bytes);
+	values[i++] = Int64GetDatum(wentry->xact_abort_count);
+	values[i++] = Int64GetDatum(wentry->xact_abort_bytes);
+
+	/* last_error_relid */
 	if (OidIsValid(wentry->relid))
 		values[i++] = ObjectIdGetDatum(wentry->relid);
 	else
 		nulls[i++] = true;
 
-	/* command */
+	/* last_error_command */
 	if (wentry->command != 0)
 		values[i++] = CStringGetTextDatum(logicalrep_message_type(wentry->command));
 	else
 		nulls[i++] = true;
 
-	/* xid */
+	/* last_error_xid */
 	if (TransactionIdIsValid(wentry->xid))
 		values[i++] = TransactionIdGetDatum(wentry->xid);
 	else
 		nulls[i++] = true;
 
-	/* error_count */
+	/* last_error_count */
 	values[i++] = Int64GetDatum(wentry->error_count);
 
-	/* error_message */
+	/* last_error_message */
 	values[i++] = CStringGetTextDatum(wentry->error_message);
 
 	/* last_error_time */
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index e6c7abb..ae372a5 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5389,9 +5389,9 @@
   proname => 'pg_stat_get_subscription_worker', prorows => '1', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'oid oid',
-  proallargtypes => '{oid,oid,oid,oid,oid,text,xid,int8,text,timestamptz,timestamptz}',
-  proargmodes => '{i,i,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subrelid,subid,subrelid,relid,command,xid,error_count,error_message,last_error_time,stats_reset}',
+  proallargtypes => '{oid,oid,oid,oid,int8,int8,int8,int8,int8,int8,oid,text,xid,int8,text,timestamptz,timestamptz}',
+  proargmodes => '{i,i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subrelid,subid,subrelid,xact_commit_count,xact_commit_bytes,xact_error_count,xact_error_bytes,xact_abort_count,xact_abort_bytes,last_error_relid,last_error_command,last_error_xid,last_error_count,last_error_message,last_error_time,stats_reset}',
   prosrc => 'pg_stat_get_subscription_worker' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/executor/execPartition.h b/src/include/executor/execPartition.h
index 694e38b..773e46c 100644
--- a/src/include/executor/execPartition.h
+++ b/src/include/executor/execPartition.h
@@ -110,6 +110,7 @@ typedef struct PartitionPruneState
 	PartitionPruningData *partprunedata[FLEXIBLE_ARRAY_MEMBER];
 } PartitionPruneState;
 
+extern size_t PartitionTupleRoutingSize(void);
 extern PartitionTupleRouting *ExecSetupPartitionTupleRouting(EState *estate,
 															 Relation rel);
 extern ResultRelInfo *ExecFindPartition(ModifyTableState *mtstate,
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 7a26d6d..ccc9a93 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -88,6 +88,8 @@ typedef enum StatMsgType
 	PGSTAT_MTYPE_SUBSCRIPTIONPURGE,
 	PGSTAT_MTYPE_SUBWORKERERROR,
 	PGSTAT_MTYPE_SUBWORKERPURGE,
+	PGSTAT_MTYPE_SUBWORKERXACTEND,
+	PGSTAT_MTYPE_SUBWORKERTWOPHASEXACT,
 } StatMsgType;
 
 /* ----------
@@ -591,6 +593,54 @@ typedef struct PgStat_MsgSubWorkerPurge
 } PgStat_MsgSubWorkerPurge;
 
 /* ----------
+ * PgStat_MsgSubscriptionXactEnd	Sent by the apply worker or the table sync worker
+ *									to report successful transaction ends.
+ * ----------
+ */
+typedef struct PgStat_MsgSubWorkerXactEnd
+{
+	PgStat_MsgHdr m_hdr;
+
+	/* determine the worker entry */
+	Oid         m_subid;
+	Oid         m_subrelid;
+
+	/*
+	 * distinguish between transaction commits and streaming transaction aborts
+	 * that are handled without error.
+	 */
+	LogicalRepMsgType m_command;
+
+	/* memory consumption used by transaction */
+	PgStat_Counter m_xact_bytes;
+
+} PgStat_MsgSubWorkerXactEnd;
+
+/* ----------
+ * PgStat_MsgSubWorkerTwophaseXact	Sent by the apply worker to make size of prepared
+ *									txn persistent over the server restart and make it
+ *									visible after commit prepare or rollback prepared.
+ *									This is separated from PgStat_MsgSubWorkerXactEnd
+ *									so that we can reduce message size of gid for other
+ *									operations (e.g. normal COMMIT) that should happen more
+ *									frequently than prepare operation usually.
+ * ----------
+ */
+typedef struct PgStat_MsgSubWorkerTwophaseXact
+{
+	PgStat_MsgHdr m_hdr;
+
+	/* determine the subscription */
+	Oid         m_subid;
+
+	LogicalRepMsgType m_command;
+	char         m_gid[GIDSIZE];
+	int          gid_len;
+	PgStat_Counter m_xact_bytes;
+
+} PgStat_MsgSubWorkerTwophaseXact;
+
+/* ----------
  * PgStat_MsgSubWorkerError		Sent by the apply worker or the table sync worker to
  *								report the error occurred during logical replication.
  * ----------
@@ -609,6 +659,12 @@ typedef struct PgStat_MsgSubWorkerError
 	Oid			m_subrelid;
 
 	/*
+	 * Transaction stats of subscription needs to be updated when an
+	 * error occurs.
+	 */
+	PgStat_Counter m_xact_error_bytes;
+
+	/*
 	 * Oids of the database and the table that the reporter was actually
 	 * processing. m_relid can be InvalidOid if an error occurred during
 	 * worker applying a non-data-modification message such as RELATION.
@@ -803,6 +859,8 @@ typedef union PgStat_Msg
 	PgStat_MsgSubscriptionPurge msg_subscriptionpurge;
 	PgStat_MsgSubWorkerError msg_subworkererror;
 	PgStat_MsgSubWorkerPurge msg_subworkerpurge;
+	PgStat_MsgSubWorkerXactEnd msg_subworkerxactend;
+	PgStat_MsgSubWorkerTwophaseXact msg_subworkertwophasexact;
 } PgStat_Msg;
 
 
@@ -1035,6 +1093,16 @@ typedef struct PgStat_StatSubWorkerEntry
 	PgStat_StatSubWorkerKey key;	/* hash key (must be first) */
 
 	/*
+	 * Transaction statistics of subscription worker
+	 */
+	PgStat_Counter xact_commit_count;
+	PgStat_Counter xact_commit_bytes;
+	PgStat_Counter xact_error_count;
+	PgStat_Counter xact_error_bytes;
+	PgStat_Counter xact_abort_count;
+	PgStat_Counter xact_abort_bytes;
+
+	/*
 	 * Subscription worker error statistics representing an error that
 	 * occurred during application of logical replication or the initial table
 	 * synchronization.
@@ -1049,6 +1117,22 @@ typedef struct PgStat_StatSubWorkerEntry
 	TimestampTz stat_reset_timestamp;
 } PgStat_StatSubWorkerEntry;
 
+/* prepared transaction */
+typedef struct PgStat_StatSubWorkerPreparedXact
+{
+	Oid			subid;
+	char		gid[GIDSIZE];
+} PgStat_StatSubWorkerPreparedXact;
+
+typedef struct PgStat_StatSubWorkerPreparedXactSize
+{
+	PgStat_StatSubWorkerPreparedXact key; /* hash key */
+
+	Oid			subid;
+	char		gid[GIDSIZE];
+	PgStat_Counter xact_size;
+} PgStat_StatSubWorkerPreparedXactSize;
+
 /*
  * Working state needed to accumulate per-function-call timing statistics.
  */
@@ -1158,6 +1242,13 @@ extern void pgstat_report_checksum_failure(void);
 extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat);
 extern void pgstat_report_replslot_create(const char *slotname);
 extern void pgstat_report_replslot_drop(const char *slotname);
+extern void pgstat_report_subworker_xact_end(Oid subid, Oid subrel,
+											 LogicalRepMsgType command, PgStat_Counter xact_size);
+extern void pgstat_report_subworker_twophase_xact(Oid subid, LogicalRepMsgType command,
+												  PgStat_Counter xact_size,
+												  LogicalRepPreparedTxnData *prepared_data,
+												  LogicalRepCommitPreparedTxnData *commit_data,
+												  LogicalRepRollbackPreparedTxnData *rollback_data);
 extern void pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
 										  LogicalRepMsgType command,
 										  TransactionId xid, const char *errmsg);
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 2ad61a0..9a8447b 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -16,4 +16,9 @@ extern void ApplyWorkerMain(Datum main_arg);
 
 extern bool IsLogicalWorker(void);
 
+/* for transaction stats */
+extern int64 get_apply_error_context_xact_size(void);
+extern void add_apply_error_context_xact_size(int64 size);
+extern void reset_apply_error_context_xact_size(void);
+
 #endif							/* LOGICALWORKER_H */
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index f6b1bd6..8d7f065 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2097,11 +2097,17 @@ pg_stat_subscription| SELECT su.oid AS subid,
 pg_stat_subscription_workers| SELECT w.subid,
     s.subname,
     w.subrelid,
-    w.relid,
-    w.command,
-    w.xid,
-    w.error_count,
-    w.error_message,
+    w.xact_commit_count,
+    w.xact_commit_bytes,
+    w.xact_error_count,
+    w.xact_error_bytes,
+    w.xact_abort_count,
+    w.xact_abort_bytes,
+    w.last_error_relid,
+    w.last_error_command,
+    w.last_error_xid,
+    w.last_error_count,
+    w.last_error_message,
     w.last_error_time,
     w.stats_reset
    FROM ( SELECT pg_subscription.oid AS subid,
@@ -2112,7 +2118,7 @@ pg_stat_subscription_workers| SELECT w.subid,
             pg_subscription_rel.srrelid AS relid
            FROM pg_subscription_rel
           WHERE (pg_subscription_rel.srsubstate <> 'r'::"char")) sr,
-    (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w(subid, subrelid, relid, command, xid, error_count, error_message, last_error_time, stats_reset)
+    (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w(subid, subrelid, xact_commit_count, xact_commit_bytes, xact_error_count, xact_error_bytes, xact_abort_count, xact_abort_bytes, last_error_relid, last_error_command, last_error_xid, last_error_count, last_error_message, last_error_time, stats_reset)
      JOIN pg_subscription s ON ((w.subid = s.oid)));
 pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid,
     pg_stat_all_indexes.indexrelid,
diff --git a/src/test/subscription/t/026_error_report.pl b/src/test/subscription/t/026_error_report.pl
index 3d23bb5..9ff619e 100644
--- a/src/test/subscription/t/026_error_report.pl
+++ b/src/test/subscription/t/026_error_report.pl
@@ -15,8 +15,8 @@ sub test_subscription_error
 
     my $check_sql = qq[
 SELECT count(1) > 0 FROM pg_stat_subscription_workers
-WHERE relid = '$relname'::regclass];
-    $check_sql .= " AND xid = '$xid'::xid;" if $xid ne '';
+WHERE last_error_relid = '$relname'::regclass];
+    $check_sql .= " AND last_error_xid = '$xid'::xid;" if $xid ne '';
 
     # Wait for the error statistics to be updated.
     $node->poll_query_until(
@@ -26,9 +26,9 @@ WHERE relid = '$relname'::regclass];
     my $result = $node->safe_psql(
 	'postgres',
 	qq[
-SELECT subname, command, relid::regclass, error_count > 0
+SELECT subname, last_error_command, last_error_relid::regclass, last_error_count > 0
 FROM pg_stat_subscription_workers
-WHERE relid = '$relname'::regclass;
+WHERE last_error_relid = '$relname'::regclass;
 ]);
     is($result, $expected_error, $msg);
 }
diff --git a/src/test/subscription/t/027_worker_xact_stats.pl b/src/test/subscription/t/027_worker_xact_stats.pl
new file mode 100644
index 0000000..0dad552
--- /dev/null
+++ b/src/test/subscription/t/027_worker_xact_stats.pl
@@ -0,0 +1,172 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Tests for subscription worker statistics during apply.
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 2;
+
+# Create publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', qq[
+logical_decoding_work_mem = 64kB
+max_prepared_transactions = 10
+max_wal_senders = 10
+wal_sender_timeout = 0
+]);
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf', qq[
+max_prepared_transactions = 10
+]);
+$node_subscriber->start;
+
+# Setup structures on the publisher and the subscriber
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_tab (a int)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_tab (a int primary key);");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+my $subopts = 'streaming = on, two_phase = on, copy_data = false';
+$node_subscriber->safe_psql('postgres', qq[
+CREATE SUBSCRIPTION tap_sub
+CONNECTION '$publisher_connstr application_name=$appname'
+PUBLICATION tap_pub WITH ($subopts);
+]);
+
+$node_publisher->wait_for_catchup($appname);
+
+# There's no entry at the beginning
+my $result = $node_subscriber->safe_psql('postgres',
+"SELECT count(*) FROM pg_stat_subscription_workers;");
+is($result, q(0), 'no entry for transaction stats yet');
+
+# COMMIT
+$node_publisher->safe_psql('postgres',
+    "BEGIN; INSERT INTO test_tab VALUES (1); COMMIT;");
+
+$node_subscriber->poll_query_until('postgres',
+"SELECT count(1) = 1 FROM pg_stat_subscription_workers where xact_commit_count = 1;")
+  or die "didn't get updates of xact stats by commit";
+
+# Now, stats collector make the bytes updated also.
+$result = $node_subscriber->safe_psql('postgres',
+"SELECT xact_commit_bytes > 0 FROM pg_stat_subscription_workers;");
+is($result, q(t), 'got consumed bytes');
+
+# STREAM COMMIT
+$node_publisher->safe_psql(
+    'postgres',
+    "BEGIN; INSERT INTO test_tab VALUES(generate_series(1001, 2000)); COMMIT;");
+$node_subscriber->poll_query_until('postgres',
+"SELECT count(1) = 1 FROM pg_stat_subscription_workers where xact_commit_count = 2;")
+  or die "didn't get updates of xact stats by stream commit";
+
+# STREAM PREPARE & COMMIT PREPARED
+# This should restore the xact size before the shutdown.
+$node_publisher->safe_psql(
+    'postgres', q[
+BEGIN;
+INSERT INTO test_tab VALUES(generate_series(2001, 3000));
+PREPARE TRANSACTION 'gid1';
+]);
+
+# This streamed prepare is not displayed until the commit prepared
+# or rollback prepared. Hence, there's no way to confirm that
+# stats collector has received the bytes of prepared transaction.
+# So, instead of checking the view, issue one more committed transaction
+# after the prepare and make sure that this commit's update is done,
+# which should mean the previous streamed prepare is already processed
+# by the stats collector as well.
+$node_publisher->safe_psql('postgres',
+"BEGIN; INSERT INTO test_tab VALUES (2); COMMIT;");
+$node_subscriber->poll_query_until('postgres',
+"SELECT count(1) = 1 FROM pg_stat_subscription_workers where xact_commit_count = 3;")
+  or die "didn't process the updates of committed transaction";
+
+$node_subscriber->restart;
+
+# Get xact_commit_bytes before commit prepared.
+my $tmp = $node_subscriber->safe_psql('postgres',
+"SELECT xact_commit_bytes FROM pg_stat_subscription_workers where xact_commit_count = 3;");
+
+# Commit prepared increments the xact_commit.
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'gid1'");
+$node_subscriber->poll_query_until('postgres',
+"SELECT count(1) = 1 FROM pg_stat_subscription_workers where xact_commit_count = 4;")
+  or die "didn't get updates of xact stats by stream prepare and commit prepared";
+
+$node_subscriber->poll_query_until('postgres',
+"SELECT xact_commit_bytes > $tmp FROM pg_stat_subscription_workers where xact_commit_count = 4;");
+
+# STREAM ABORT
+# Store previous stream_counts to recognize
+# another streaming from the increase of this value below.
+$tmp = $node_publisher->safe_psql('postgres',
+"SELECT stream_count FROM pg_stat_replication_slots");
+
+# Cause a new streaming and check it by another session
+my $in = '';
+my $out = '';
+my $timer = IPC::Run::timeout(180);
+my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer,
+	on_error_stop => 1);
+
+$in .= q{
+BEGIN;
+INSERT INTO test_tab VALUES(generate_series(3001, 4000));
+};
+$h->pump_nb;
+
+# Wait until this transaction is streamed certainly
+# and after that rollback to send stream abort.
+$node_publisher->poll_query_until('postgres',
+"SELECT stream_count > $tmp FROM pg_stat_replication_slots;")
+  or die "didn't stream data of a new transaction";
+
+$in .= q{
+ROLLBACK;
+\q
+};
+$h->finish;
+
+$node_subscriber->poll_query_until('postgres',
+"SELECT count(1) = 1 FROM pg_stat_subscription_workers where xact_abort_count = 1;")
+  or die "didn't get updates of xact stats by stream abort";
+
+# ROLLBACK PREPARED
+$node_publisher->safe_psql('postgres', q[
+BEGIN;
+INSERT INTO test_tab VALUES (3);
+PREPARE TRANSACTION 'gid2';
+]);
+$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'gid2'");
+$node_subscriber->poll_query_until('postgres',
+"SELECT count(1) = 1 FROM pg_stat_subscription_workers where xact_abort_count = 2;")
+  or die "didn't get updates of xact stats by rollback prepared";
+
+# error stats (by duplication error)
+$node_publisher->safe_psql('postgres', q[
+BEGIN;
+INSERT INTO test_tab VALUES (1);
+COMMIT;
+]);
+
+$node_subscriber->poll_query_until('postgres',
+"SELECT count(1) = 1 FROM pg_stat_subscription_workers where xact_error_count > 0;")
+  or die "didn't get updates of xact stats by error";
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
-- 
2.2.0

