From dc3316c3cb06a4e97f8c5c66c5b90868f4750590 Mon Sep 17 00:00:00 2001
From: Takamichi Osumi <osumi.takamichi@fujitsu.com>
Date: Thu, 24 Feb 2022 22:29:35 +0000
Subject: [PATCH v24] Extend pg_stat_subscription_workers to include general
 transaction statistics

Categorize transactions of logical replication subscriber
into three types (commit, abort, error) and introduce
cumulative columns of those numbers in the pg_stat_subscription_workers.

In order to avoid having a large number of entries to be created
by the table synchronization for many tables, the new stats columns
are utilized only by the apply worker.

The timing when the data of transaction statistics is sent to the
stats collector is adjusted with PGSTAT_STAT_INTERVAL to avoid overload.

Author: Takamichi Osumi
Reviewed-by: Amit Kapila, Masahiko Sawada, Hou Zhijie, Greg Nancarrow,
             Vignesh C, Ajin Cherian, Kyotaro Horiguchi, Tang Haiying
Tested-by: Wang wei
Discussion: https://www.postgresql.org/message-id/OSBPR01MB48887CA8F40C8D984A6DC00CED199%40OSBPR01MB4888.jpnprd01.prod.outlook.com
---
 doc/src/sgml/monitoring.sgml               | 47 +++++++++++++--
 src/backend/catalog/system_views.sql       |  3 +
 src/backend/postmaster/pgstat.c            | 92 ++++++++++++++++++++++++++++++
 src/backend/replication/logical/launcher.c |  2 +
 src/backend/replication/logical/worker.c   | 30 ++++++++++
 src/backend/utils/adt/pgstatfuncs.c        | 25 +++++---
 src/include/catalog/pg_proc.dat            |  6 +-
 src/include/pgstat.h                       | 27 +++++++++
 src/include/replication/worker_internal.h  |  8 +++
 src/test/regress/expected/rules.out        |  5 +-
 src/tools/pgindent/typedefs.list           |  1 +
 11 files changed, 229 insertions(+), 17 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index bf7625d..2f7af46 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -629,8 +629,8 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
 
      <row>
       <entry><structname>pg_stat_subscription_workers</structname><indexterm><primary>pg_stat_subscription_workers</primary></indexterm></entry>
-      <entry>One row per subscription worker, showing statistics about errors
-      that occurred on that subscription worker.
+      <entry>One row per subscription worker, showing statistics about transactions
+      and errors that occurred on that subscription worker.
       See <link linkend="monitoring-pg-stat-subscription-workers">
       <structname>pg_stat_subscription_workers</structname></link> for details.
       </entry>
@@ -3072,10 +3072,11 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
   <para>
    The <structname>pg_stat_subscription_workers</structname> view will contain
-   one row per subscription worker on which errors have occurred, for workers
-   applying logical replication changes and workers handling the initial data
-   copy of the subscribed tables.  The statistics entry is removed when the
-   corresponding subscription is dropped.
+   the statistics of the main apply worker or create one row per subscription
+   worker on which errors have occurred, for workers applying logical
+   replication changes and workers handling the initial data copy of the
+   subscribed tables. The statistics entry is removed when the corresponding
+   subscription is dropped.
   </para>
 
   <table id="pg-stat-subscription-workers" xreflabel="pg_stat_subscription_workers">
@@ -3123,6 +3124,40 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>commit_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions successfully applied in this subscription.
+       Both <command>COMMIT</command> and <command>COMMIT PREPARED</command>
+       increment this counter.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>abort_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions aborted in this subscription. Both
+       <command>ROLLBACK</command> of transaction streamed as in-progress
+       transaction and <command>ROLLBACK PREPARED</command> increment this
+       counter.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>error_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions that failed to be applied by the apply
+       worker in this subscription. This counter is updated after
+       confirming the error is not the same as the previous one.
+       </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>last_error_relid</structfield> <type>oid</type>
       </para>
       <para>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 3cb69b1..e083a79 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1269,6 +1269,9 @@ CREATE VIEW pg_stat_subscription_workers AS
         w.subid,
         s.subname,
         w.subrelid,
+        w.commit_count,
+        w.abort_count,
+        w.error_count,
         w.last_error_relid,
         w.last_error_command,
         w.last_error_xid,
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 0646f53..0f8f21d 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -55,6 +55,7 @@
 #include "postmaster/postmaster.h"
 #include "replication/slot.h"
 #include "replication/walsender.h"
+#include "replication/worker_internal.h"
 #include "storage/backendid.h"
 #include "storage/dsm.h"
 #include "storage/fd.h"
@@ -285,6 +286,7 @@ static PgStat_WalStats walStats;
 static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
 static HTAB *replSlotStatHash = NULL;
 
+extern LogicalRepSubscriptionStats subStats;
 /*
  * List of OIDs of databases we need to write out.  If an entry is InvalidOid,
  * it means to write only the shared-catalog stats ("DB 0"); otherwise, we
@@ -382,6 +384,7 @@ static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
 static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
 static void pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len);
 static void pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len);
+static void pgstat_recv_subworker_xact_end(PgStat_MsgSubWorkerXactEnd *msg, int len);
 
 /* ------------------------------------------------------------
  * Public functions called from postmaster follow
@@ -3484,6 +3487,60 @@ pgstat_send_subscription_purge(PgStat_MsgSubscriptionPurge *msg)
 }
 
 /* ----------
+ * pgstat_send_subworker_xact_stats() -
+ *
+ *	Send a subworker's transaction stats to the collector.
+ *	The statistics are cleared upon sending.
+ *
+ *	'force' is true only when the subscription worker process exits.
+ * ----------
+ */
+void
+pgstat_send_subworker_xact_stats(bool force)
+{
+	static TimestampTz last_report = 0;
+	PgStat_MsgSubWorkerXactEnd msg;
+
+	/*
+	 * This function can be called even if nothing at all has happened. In
+	 * this case, there's no need to go forward.
+	 */
+	if (subStats.subid == InvalidOid ||
+		(subStats.commit_count == 0 && subStats.abort_count == 0))
+		return;
+
+	if (!force)
+	{
+		TimestampTz now = GetCurrentTimestamp();
+
+		/*
+		 * Don't send a message unless it's been at least PGSTAT_STAT_INTERVAL
+		 * msec since we last sent one to avoid overloading the stats
+		 * collector.
+		 */
+		if (!TimestampDifferenceExceeds(last_report, now, PGSTAT_STAT_INTERVAL))
+			return;
+		last_report = now;
+	}
+
+	/*
+	 * Prepare and send the message.
+	 */
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBWORKERXACTEND);
+	msg.m_databaseid = MyDatabaseId;
+	msg.m_subid = subStats.subid;
+	msg.commit_count = subStats.commit_count;
+	msg.abort_count = subStats.abort_count;
+	pgstat_send(&msg, sizeof(PgStat_MsgSubWorkerXactEnd));
+
+	/*
+	 * Clear out the statistics.
+	 */
+	subStats.commit_count = 0;
+	subStats.abort_count = 0;
+}
+
+/* ----------
  * PgstatCollectorMain() -
  *
  *	Start up the statistics collector process.  This is the body of the
@@ -3746,6 +3803,10 @@ PgstatCollectorMain(int argc, char *argv[])
 					pgstat_recv_subworker_error(&msg.msg_subworkererror, len);
 					break;
 
+				case PGSTAT_MTYPE_SUBWORKERXACTEND:
+					pgstat_recv_subworker_xact_end(&msg.msg_subworkerxactend, len);
+					break;
+
 				default:
 					break;
 			}
@@ -3965,6 +4026,9 @@ pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry, Oid subid, Oid subrelid,
 	/* If not found, initialize the new one */
 	if (!found)
 	{
+		subwentry->commit_count = 0;
+		subwentry->abort_count = 0;
+		subwentry->error_count = 0;
 		subwentry->last_error_relid = InvalidOid;
 		subwentry->last_error_command = 0;
 		subwentry->last_error_xid = InvalidTransactionId;
@@ -6193,6 +6257,34 @@ pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len)
 	subwentry->last_error_time = msg->m_timestamp;
 	strlcpy(subwentry->last_error_message, msg->m_message,
 			PGSTAT_SUBWORKERERROR_MSGLEN);
+
+	/*
+	 * Only if this is a new error reported by the apply worker, increment the
+	 * counter of error.
+	 */
+	if (!OidIsValid(msg->m_subrelid))
+		subwentry->error_count++;
+}
+
+/* ----------
+ * pgstat_recv_subworker_xact_end() -
+ *
+ *	Process a SUBWORKERXACTEND message.
+ * ----------
+ */
+static void
+pgstat_recv_subworker_xact_end(PgStat_MsgSubWorkerXactEnd *msg, int len)
+{
+	PgStat_StatDBEntry *dbentry;
+	PgStat_StatSubWorkerEntry *wentry;
+
+	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
+	wentry = pgstat_get_subworker_entry(dbentry, msg->m_subid,
+										InvalidOid, true);
+	Assert(wentry);
+
+	wentry->commit_count += msg->commit_count;
+	wentry->abort_count += msg->abort_count;
 }
 
 /* ----------
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 5a68d6d..70f9a44 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -647,6 +647,8 @@ logicalrep_worker_onexit(int code, Datum arg)
 	if (LogRepWorkerWalRcvConn)
 		walrcv_disconnect(LogRepWorkerWalRcvConn);
 
+	pgstat_send_subworker_xact_stats(true);
+
 	logicalrep_worker_detach();
 
 	/* Cleanup fileset used for streaming transactions. */
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 5d9acc6..8e08a50 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -238,6 +238,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
 	.ts = 0,
 };
 
+LogicalRepSubscriptionStats subStats = {InvalidOid, 0, 0};
+
 static MemoryContext ApplyMessageContext = NULL;
 MemoryContext ApplyContext = NULL;
 
@@ -337,6 +339,25 @@ static void apply_error_callback(void *arg);
 static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz ts);
 static inline void reset_apply_error_context_info(void);
 
+/* ----------
+ * subscription_xact_end() -
+ *
+ *  Increment subscription stats and call the timer function.
+ * ----------
+ */
+static void
+subscription_xact_end(bool is_commit)
+{
+	Assert(OidIsValid(subStats.subid));
+
+	if (is_commit)
+		subStats.commit_count++;
+	else
+		subStats.abort_count++;
+
+	pgstat_send_subworker_xact_stats(false);
+}
+
 /*
  * Should this worker apply changes for given relation.
  *
@@ -958,6 +979,7 @@ apply_handle_commit_prepared(StringInfo s)
 	end_replication_step();
 	CommitTransactionCommand();
 	pgstat_report_stat(false);
+	subscription_xact_end(true);
 
 	store_flush_position(prepare_data.end_lsn);
 	in_remote_transaction = false;
@@ -1006,6 +1028,7 @@ apply_handle_rollback_prepared(StringInfo s)
 		FinishPreparedTransaction(gid, false);
 		end_replication_step();
 		CommitTransactionCommand();
+		subscription_xact_end(false);
 	}
 
 	pgstat_report_stat(false);
@@ -1217,6 +1240,7 @@ apply_handle_stream_abort(StringInfo s)
 	{
 		set_apply_error_context_xact(xid, 0);
 		stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+		subscription_xact_end(false);
 	}
 	else
 	{
@@ -1462,6 +1486,7 @@ apply_handle_commit_internal(LogicalRepCommitData *commit_data)
 
 		CommitTransactionCommand();
 		pgstat_report_stat(false);
+		subscription_xact_end(true);
 
 		store_flush_position(commit_data->end_lsn);
 	}
@@ -2717,6 +2742,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		if (endofstream)
 			break;
 
+		pgstat_send_subworker_xact_stats(false);
+
 		/*
 		 * Wait for more data or latch.  If we have unflushed transactions,
 		 * wake up after WalWriterDelay to see if they've been flushed yet (in
@@ -3470,6 +3497,9 @@ ApplyWorkerMain(Datum main_arg)
 
 	CommitTransactionCommand();
 
+	/* Set the subid for subscription stats */
+	subStats.subid = MyLogicalRepWorker->subid;
+
 	/* Connect to the origin and start the replication. */
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 30e8dfa..2f1aea3 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2406,7 +2406,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_WORKER_COLS	8
+#define PG_STAT_GET_SUBSCRIPTION_WORKER_COLS	11
 	Oid			subid = PG_GETARG_OID(0);
 	Oid			subrelid;
 	TupleDesc	tupdesc;
@@ -2433,17 +2433,23 @@ pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
 					   OIDOID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "subrelid",
 					   OIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "last_error_relid",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "commit_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "abort_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "error_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "last_error_relid",
 					   OIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "last_error_command",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "last_error_command",
 					   TEXTOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "last_error_xid",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "last_error_xid",
 					   XIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "last_error_count",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "last_error_count",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "last_error_message",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "last_error_message",
 					   TEXTOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "last_error_time",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 11, "last_error_time",
 					   TIMESTAMPTZOID, -1, 0);
 	BlessTupleDesc(tupdesc);
 
@@ -2461,6 +2467,11 @@ pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
 	else
 		nulls[i++] = true;
 
+	/* transaction stats */
+	values[i++] = Int64GetDatum(wentry->commit_count);
+	values[i++] = Int64GetDatum(wentry->abort_count);
+	values[i++] = Int64GetDatum(wentry->error_count);
+
 	/* last_error_relid */
 	if (OidIsValid(wentry->last_error_relid))
 		values[i++] = ObjectIdGetDatum(wentry->last_error_relid);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 7f1ee97..fcce132 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5381,9 +5381,9 @@
   proname => 'pg_stat_get_subscription_worker', prorows => '1', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'oid oid',
-  proallargtypes => '{oid,oid,oid,oid,oid,text,xid,int8,text,timestamptz}',
-  proargmodes => '{i,i,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subrelid,subid,subrelid,last_error_relid,last_error_command,last_error_xid,last_error_count,last_error_message,last_error_time}',
+  proallargtypes => '{oid,oid,oid,oid,int8,int8,int8,oid,text,xid,int8,text,timestamptz}',
+  proargmodes => '{i,i,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subrelid,subid,subrelid,commit_count,abort_count,error_count,last_error_relid,last_error_command,last_error_xid,last_error_count,last_error_message,last_error_time}',
   prosrc => 'pg_stat_get_subscription_worker' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index e10d202..81ef769 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -86,6 +86,7 @@ typedef enum StatMsgType
 	PGSTAT_MTYPE_DISCONNECT,
 	PGSTAT_MTYPE_SUBSCRIPTIONPURGE,
 	PGSTAT_MTYPE_SUBWORKERERROR,
+	PGSTAT_MTYPE_SUBWORKERXACTEND
 } StatMsgType;
 
 /* ----------
@@ -591,6 +592,23 @@ typedef struct PgStat_MsgSubWorkerError
 } PgStat_MsgSubWorkerError;
 
 /* ----------
+ * PgStat_MsgSubscriptionXactEnd	Sent by the apply worker to report transaction
+ *									ends.
+ * ----------
+ */
+typedef struct PgStat_MsgSubWorkerXactEnd
+{
+	PgStat_MsgHdr m_hdr;
+
+	/* determine the worker entry */
+	Oid			m_databaseid;
+	Oid			m_subid;
+
+	PgStat_Counter commit_count;
+	PgStat_Counter abort_count;
+} PgStat_MsgSubWorkerXactEnd;
+
+/* ----------
  * PgStat_MsgRecoveryConflict	Sent by the backend upon recovery conflict
  * ----------
  */
@@ -769,6 +787,7 @@ typedef union PgStat_Msg
 	PgStat_MsgDisconnect msg_disconnect;
 	PgStat_MsgSubscriptionPurge msg_subscriptionpurge;
 	PgStat_MsgSubWorkerError msg_subworkererror;
+	PgStat_MsgSubWorkerXactEnd msg_subworkerxactend;
 } PgStat_Msg;
 
 
@@ -1010,6 +1029,13 @@ typedef struct PgStat_StatSubWorkerEntry
 	PgStat_StatSubWorkerKey key;	/* hash key (must be first) */
 
 	/*
+	 * Cumulative transaction statistics of subscription worker
+	 */
+	PgStat_Counter commit_count;
+	PgStat_Counter abort_count;
+	PgStat_Counter error_count;
+
+	/*
 	 * Subscription worker error statistics representing an error that
 	 * occurred during application of changes or the initial table
 	 * synchronization.
@@ -1217,6 +1243,7 @@ extern void pgstat_send_archiver(const char *xlog, bool failed);
 extern void pgstat_send_bgwriter(void);
 extern void pgstat_send_checkpointer(void);
 extern void pgstat_send_wal(bool force);
+extern void pgstat_send_subworker_xact_stats(bool force);
 
 /* ----------
  * Support functions for the SQL-callable functions to
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 3c3f5f6..cadd134 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -68,6 +68,14 @@ typedef struct LogicalRepWorker
 	TimestampTz reply_time;
 } LogicalRepWorker;
 
+typedef struct LogicalRepSubscriptionStats
+{
+	Oid			subid;
+
+	int64		commit_count;
+	int64		abort_count;
+}			LogicalRepSubscriptionStats;
+
 /* Main memory context for apply worker. Permanent during worker lifetime. */
 extern MemoryContext ApplyContext;
 
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 1420288..f459919 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2075,6 +2075,9 @@ pg_stat_subscription| SELECT su.oid AS subid,
 pg_stat_subscription_workers| SELECT w.subid,
     s.subname,
     w.subrelid,
+    w.commit_count,
+    w.abort_count,
+    w.error_count,
     w.last_error_relid,
     w.last_error_command,
     w.last_error_xid,
@@ -2088,7 +2091,7 @@ pg_stat_subscription_workers| SELECT w.subid,
          SELECT pg_subscription_rel.srsubid AS subid,
             pg_subscription_rel.srrelid AS relid
            FROM pg_subscription_rel) sr,
-    (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w(subid, subrelid, last_error_relid, last_error_command, last_error_xid, last_error_count, last_error_message, last_error_time)
+    (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w(subid, subrelid, commit_count, abort_count, error_count, last_error_relid, last_error_command, last_error_xid, last_error_count, last_error_message, last_error_time)
      JOIN pg_subscription s ON ((w.subid = s.oid)));
 pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid,
     pg_stat_all_indexes.indexrelid,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index c6b302c..cf723fc 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1946,6 +1946,7 @@ PgStat_MsgResetslrucounter
 PgStat_MsgSLRU
 PgStat_MsgSubscriptionPurge
 PgStat_MsgSubWorkerError
+PgStat_MsgSubWorkerXactEnd
 PgStat_MsgTabpurge
 PgStat_MsgTabstat
 PgStat_MsgTempFile
-- 
1.8.3.1

