From 7ca7e66746d76b9659ef64ccba8d074c0f8e4e43 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 16 Jul 2021 23:10:22 +0900
Subject: [PATCH v13 1/3] Add pg_stat_subscription_errors statistics view.

This commit adds a new system view, pg_stat_subscription_errors,
that shows information about any errors which occur during application
of logical replication changes as well as during initial table
synchronization.

The subscription error entries are removed by autovacuum workers after
table synchronization completes in table sync worker cases and after
dropping the subscription in apply worker cases.

It also adds SQL function pg_stat_reset_subscription_error() to
reset the single subscription error.
---
 doc/src/sgml/monitoring.sgml             | 169 ++++++
 src/backend/catalog/system_functions.sql |   2 +
 src/backend/catalog/system_views.sql     |  27 +
 src/backend/postmaster/pgstat.c          | 685 +++++++++++++++++++++++
 src/backend/replication/logical/worker.c |  51 +-
 src/backend/utils/adt/pgstatfuncs.c      | 112 ++++
 src/include/catalog/pg_proc.dat          |  13 +
 src/include/pgstat.h                     | 106 ++++
 src/test/regress/expected/rules.out      |  22 +
 src/tools/pgindent/typedefs.list         |   5 +
 10 files changed, 1189 insertions(+), 3 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 2281ba120f..3a4c98ba42 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -346,6 +346,15 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       </entry>
      </row>
 
+     <row>
+      <entry><structname>pg_stat_subscription_errors</structname><indexterm><primary>pg_stat_subscription_errors</primary></indexterm></entry>
+      <entry>One row per error that occurred on subscription, showing information about
+       each subscription error.
+       See <link linkend="monitoring-pg-stat-subscription-errors">
+       <structname>pg_stat_subscription_errors</structname></link> for details.
+      </entry>
+     </row>
+
      <row>
       <entry><structname>pg_stat_ssl</structname><indexterm><primary>pg_stat_ssl</primary></indexterm></entry>
       <entry>One row per connection (regular and replication), showing information about
@@ -3050,6 +3059,144 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
  </sect2>
 
+ <sect2 id="monitoring-pg-stat-subscription-errors">
+  <title><structname>pg_stat_subscription_errors</structname></title>
+
+  <indexterm>
+   <primary>pg_stat_subscription_errors</primary>
+  </indexterm>
+
+  <para>
+   The <structname>pg_stat_subscription_errors</structname> view will contain one
+   row per subscription error reported by workers applying logical replication
+   changes and workers handling the initial data copy of the subscribed tables.
+  </para>
+
+  <table id="pg-stat-subscription-errors" xreflabel="pg_stat_subscription_errors">
+   <title><structname>pg_stat_subscription_errors</structname> View</title>
+   <tgroup cols="1">
+    <thead>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       Column Type
+      </para>
+      <para>
+       Description
+      </para></entry>
+     </row>
+    </thead>
+
+    <tbody>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>datname</structfield> <type>name</type>
+      </para>
+      <para>
+        Name of the database in which the subscription was created.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subid</structfield> <type>oid</type>
+      </para>
+      <para>
+       OID of the subscription.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subname</structfield> <type>name</type>
+      </para>
+      <para>
+       Name of the subscription.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>relid</structfield> <type>oid</type>
+      </para>
+      <para>
+       OID of the relation that the worker was processing when the
+       error occurred.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>command</structfield> <type>text</type>
+      </para>
+      <para>
+        Name of command being applied when the error occurred.  This
+        field is always NULL if the error is reported by
+        <literal>tablesync</literal> worker.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>xid</structfield> <type>xid</type>
+      </para>
+      <para>
+        Transaction ID of the publisher node being applied when the error
+        occurred.  This field is always NULL if the error is reported
+        by <literal>tablesync</literal> worker.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>failure_source</structfield> <type>text</type>
+      </para>
+      <para>
+        Type of worker reporting the error: <literal>apply</literal> or
+        <literal>tablesync</literal>.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>failure_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times the error occurred in the worker.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>last_failure</structfield> <type>timestamp with time zone</type>
+      </para>
+      <para>
+       Time at which the last error occurred.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>last_failure_message</structfield> <type>text</type>
+      </para>
+      <para>
+       Last reported error message.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
+      </para>
+      <para>
+       Time at which these statistics were last reset.
+      </para></entry>
+     </row>
+    </tbody>
+   </tgroup>
+  </table>
+
+ </sect2>
+
  <sect2 id="monitoring-pg-stat-ssl-view">
   <title><structname>pg_stat_ssl</structname></title>
 
@@ -5172,6 +5319,28 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
          can be granted EXECUTE to run the function.
        </para></entry>
       </row>
+
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+          <primary>pg_stat_reset_subscription_error</primary>
+        </indexterm>
+        <function>pg_stat_reset_subscription_error</function> ( <parameter>subid</parameter> <type>oid</type>, <parameter>relid</parameter> <type>oid</type> )
+        <returnvalue>void</returnvalue>
+       </para>
+       <para>
+        Resets statistics of a single subscription error.  If
+        the argument <parameter>relid</parameter> is not <literal>NULL</literal>,
+        resets error statistics of the <literal>tablesync</literal> worker for
+        the relation with <parameter>relid</parameter>.  Otherwise, resets the
+        error statistics of the <literal>apply</literal> worker running on the
+        subscription with <parameter>subid</parameter>.
+       </para>
+       <para>
+         This function is restricted to superusers by default, but other users
+         can be granted EXECUTE to run the function.
+       </para></entry>
+      </row>
      </tbody>
     </tgroup>
    </table>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index a416e94d37..c9aa6f04d3 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -639,6 +639,8 @@ REVOKE EXECUTE ON FUNCTION pg_stat_reset_single_function_counters(oid) FROM publ
 
 REVOKE EXECUTE ON FUNCTION pg_stat_reset_replication_slot(text) FROM public;
 
+REVOKE EXECUTE ON FUNCTION pg_stat_reset_subscription_error(oid, oid) FROM public;
+
 REVOKE EXECUTE ON FUNCTION lo_import(text) FROM public;
 
 REVOKE EXECUTE ON FUNCTION lo_import(text, oid) FROM public;
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 55f6e3711d..b0cd8d2546 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1257,3 +1257,30 @@ REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
               substream, subtwophasestate, subslotname, subsynccommit, subpublications)
     ON pg_subscription TO public;
+
+CREATE VIEW pg_stat_subscription_errors AS
+    SELECT
+	d.datname,
+	sr.subid,
+	s.subname,
+	e.relid,
+	e.command,
+	e.xid,
+	e.failure_source,
+	e.failure_count,
+	e.last_failure,
+	e.last_failure_message,
+	e.stats_reset
+    FROM (SELECT
+              oid as subid,
+              NULL as relid
+          FROM pg_subscription
+          UNION ALL
+          SELECT
+              srsubid as subid,
+              srrelid as relid
+          FROM pg_subscription_rel
+          WHERE srsubstate <> 'r') sr,
+          LATERAL pg_stat_get_subscription_error(sr.subid, sr.relid) e
+          JOIN pg_subscription s ON (e.subid = s.oid)
+          JOIN pg_database d ON (s.subdbid = d.oid);
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 3450a10129..0178186838 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -41,6 +41,8 @@
 #include "catalog/catalog.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_proc.h"
+#include "catalog/pg_subscription.h"
+#include "catalog/pg_subscription_rel.h"
 #include "common/ip.h"
 #include "executor/instrument.h"
 #include "libpq/libpq.h"
@@ -106,6 +108,7 @@
 #define PGSTAT_TAB_HASH_SIZE	512
 #define PGSTAT_FUNCTION_HASH_SIZE	512
 #define PGSTAT_REPLSLOT_HASH_SIZE	32
+#define PGSTAT_SUBSCRIPTION_HASH_SIZE	32
 
 
 /* ----------
@@ -280,6 +283,7 @@ static PgStat_GlobalStats globalStats;
 static PgStat_WalStats walStats;
 static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
 static HTAB *replSlotStatHash = NULL;
+static HTAB *subscriptionHash = NULL;
 
 /*
  * List of OIDs of databases we need to write out.  If an entry is InvalidOid,
@@ -330,6 +334,14 @@ static bool pgstat_db_requested(Oid databaseid);
 static PgStat_StatReplSlotEntry *pgstat_get_replslot_entry(NameData name, bool create_it);
 static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, TimestampTz ts);
 
+static PgStat_StatSubEntry *pgstat_get_subscription_entry(Oid subid,
+														  bool create);
+static PgStat_StatSubErrEntry *pgstat_get_subscription_error_entry(Oid subid,
+																   Oid subrelid,
+																   bool create);
+static void pgstat_reset_subscription_error_entry(PgStat_StatSubErrEntry *errent,
+												  TimestampTz ts);
+
 static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg);
 static void pgstat_send_funcstats(void);
 static void pgstat_send_slru(void);
@@ -369,6 +381,10 @@ static void pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len
 static void pgstat_recv_connstat(PgStat_MsgConn *msg, int len);
 static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
 static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
+static void pgstat_recv_subscription_error(PgStat_MsgSubscriptionErr *msg, int len);
+static void pgstat_recv_subscription_error_purge(PgStat_MsgSubscriptionErrPurge *msg,
+												 int len);
+static void pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len);
 
 /* ------------------------------------------------------------
  * Public functions called from postmaster follow
@@ -1147,6 +1163,164 @@ pgstat_vacuum_stat(void)
 		}
 	}
 
+	/*
+	 * Search for all the dead subscriptions and error entries in stats
+	 * hashtable and tell the stats collector to drop them.
+	 */
+	if (subscriptionHash)
+	{
+		PgStat_MsgSubscriptionPurge submsg;
+		PgStat_StatSubEntry *subent;
+		HTAB	   *htab;
+
+		htab = pgstat_collect_oids(SubscriptionRelationId, Anum_pg_subscription_oid);
+
+		submsg.m_nentries = 0;
+		hash_seq_init(&hstat, subscriptionHash);
+		while ((subent = (PgStat_StatSubEntry *) hash_seq_search(&hstat)) != NULL)
+		{
+			PgStat_MsgSubscriptionErrPurge errmsg;
+			PgStat_StatSubErrEntry *errent;
+			HASH_SEQ_STATUS hstat_rel;
+			List	   *not_ready_rels_list;
+			HTAB	   *not_ready_rels_htab;
+			ListCell   *lc;
+			HASHCTL		hash_ctl;
+
+			CHECK_FOR_INTERRUPTS();
+
+			/* Check if the subscription is still live */
+			if (hash_search(htab, (void *) &(subent->subid), HASH_FIND, NULL) == NULL)
+			{
+				/* This subscription is dead, add subid to the message */
+				submsg.m_subids[submsg.m_nentries++] = subent->subid;
+
+				/*
+				 * If the message is full, send it out and reinitialize to
+				 * empty
+				 */
+				if (submsg.m_nentries >= PGSTAT_NUM_SUBSCRIPTIONPURGE)
+				{
+					len = offsetof(PgStat_MsgSubscriptionPurge, m_subids[0])
+						+ submsg.m_nentries * sizeof(Oid);
+
+					pgstat_setheader(&submsg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONPURGE);
+					pgstat_send(&submsg, len);
+					submsg.m_nentries = 0;
+				}
+
+				continue;
+			}
+
+			/*
+			 * Nothing to do here if the subscription exists but has no table
+			 * sync error entries.
+			 */
+			if (subent->sync_errors == NULL)
+				continue;
+
+			/*
+			 * The subscription has table sync error entries.  We search errors of
+			 * the table sync workers who are already in sync state.  Those errors
+			 * should be removed.
+			 *
+			 * Note that the lifetime of error entries of the apply worker and
+			 * the table sync worker are different.  The former lives until
+			 * the subscription is dropped whereas the latter lives until the table
+			 * synchronization is completed.
+			 */
+			not_ready_rels_list = GetSubscriptionNotReadyRelations(subent->subid);
+
+			hash_ctl.keysize = sizeof(Oid);
+			hash_ctl.entrysize = sizeof(SubscriptionRelState);
+			not_ready_rels_htab = hash_create("not ready relations in subscription",
+											  64,
+											  &hash_ctl,
+											  HASH_ELEM | HASH_BLOBS);
+
+			/*
+			 * The number of not-ready relations can be high for example right
+			 * after creating a subscription, so we load the list of
+			 * SubscriptionRelState into the hash table for faster lookups.
+			 */
+			foreach(lc, not_ready_rels_list)
+			{
+				SubscriptionRelState *r_elem = (SubscriptionRelState *) lfirst(lc);
+				SubscriptionRelState *r_entry;
+
+				CHECK_FOR_INTERRUPTS();
+				r_entry = hash_search(not_ready_rels_htab, (void *) &(r_elem->relid),
+									  HASH_ENTER, NULL);
+				memcpy(r_entry, r_elem, sizeof(SubscriptionRelState));
+			}
+
+			list_free(not_ready_rels_list);
+
+			errmsg.m_nentries = 0;
+			errmsg.m_subid = subent->subid;
+
+			/*
+			 * Search for all table sync error entries of which relation is already
+			 * ready state
+			 */
+			hash_seq_init(&hstat_rel, subent->sync_errors);
+			while ((errent = (PgStat_StatSubErrEntry *) hash_seq_search(&hstat_rel)) != NULL)
+			{
+				Assert(OidIsValid(errent->relid));
+
+				CHECK_FOR_INTERRUPTS();
+
+				/*
+				 * Add the relid to the message if the table synchronization
+				 * for this relation already completes or the table is no
+				 * longer subscribed.
+				 */
+				if (hash_search(not_ready_rels_htab, (void *) &(errent->relid),
+								HASH_FIND, NULL) == NULL)
+					errmsg.m_relids[errmsg.m_nentries++] = errent->relid;
+
+				/*
+				 * If the message is full, send it out and reinitialize to
+				 * empty
+				 */
+				if (errmsg.m_nentries >= PGSTAT_NUM_SUBSCRIPTIONERRPURGE)
+				{
+					len = offsetof(PgStat_MsgSubscriptionErrPurge, m_relids[0])
+						+ errmsg.m_nentries * sizeof(Oid);
+
+					pgstat_setheader(&errmsg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONERRPURGE);
+					pgstat_send(&errmsg, len);
+					errmsg.m_nentries = 0;
+				}
+			}
+
+			/* Send the rest of dead error entries */
+			if (errmsg.m_nentries > 0)
+			{
+				len = offsetof(PgStat_MsgSubscriptionErrPurge, m_relids[0])
+					+ errmsg.m_nentries * sizeof(Oid);
+
+				pgstat_setheader(&errmsg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONERRPURGE);
+				pgstat_send(&errmsg, len);
+				errmsg.m_nentries = 0;
+			}
+
+			hash_destroy(not_ready_rels_htab);
+		}
+
+		/* Send the rest of dead subscriptions */
+		if (submsg.m_nentries > 0)
+		{
+			len = offsetof(PgStat_MsgSubscriptionPurge, m_subids[0])
+				+ submsg.m_nentries * sizeof(Oid);
+
+			pgstat_setheader(&submsg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONPURGE);
+			pgstat_send(&submsg, len);
+		}
+
+		hash_destroy(htab);
+	}
+
 	/*
 	 * Lookup our own database entry; if not found, nothing more to do.
 	 */
@@ -1556,6 +1730,25 @@ pgstat_reset_replslot_counter(const char *name)
 	pgstat_send(&msg, sizeof(msg));
 }
 
+/* ----------
+ * pgstat_reset_subscription_error() -
+ *
+ *	Tell the collector to reset the subscription error.
+ * ----------
+ */
+void
+pgstat_reset_subscription_error(Oid subid, Oid subrelid)
+{
+	PgStat_MsgSubscriptionErr msg;
+
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONERR);
+	msg.m_subid = subid;
+	msg.m_subrelid = subrelid;
+	msg.m_reset = true;
+
+	pgstat_send(&msg, offsetof(PgStat_MsgSubscriptionErr, m_reset) + sizeof(bool));
+}
+
 /* ----------
  * pgstat_report_autovac() -
  *
@@ -1824,6 +2017,36 @@ pgstat_report_replslot_drop(const char *slotname)
 	pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
 }
 
+/* ----------
+ * pgstat_report_subscription_error() -
+ *
+ *	Tell the collector about the subscription error.  subrelid is InvalidOid
+ *	for the apply worker; errmsg is cut to PGSTAT_SUBSCRIPTIONERR_MSGLEN - 1.
+ * ----------
+ */
+void
+pgstat_report_subscription_error(Oid subid, Oid subrelid, Oid relid,
+								 LogicalRepMsgType command, TransactionId xid,
+								 const char *errmsg)
+{
+	PgStat_MsgSubscriptionErr msg;
+	int			len;
+
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONERR);
+	msg.m_subid = subid;
+	msg.m_subrelid = subrelid;
+	msg.m_reset = false;
+	msg.m_relid = relid;
+	msg.m_command = command;
+	msg.m_xid = xid;
+	msg.m_failure_time = GetCurrentTimestamp();
+	strlcpy(msg.m_errmsg, errmsg, PGSTAT_SUBSCRIPTIONERR_MSGLEN);
+
+	/* Size from the truncated copy (so len stays within msg), plus its NUL */
+	len = offsetof(PgStat_MsgSubscriptionErr, m_errmsg[0]) + strlen(msg.m_errmsg) + 1;
+	pgstat_send(&msg, len);
+}
+
 /* ----------
  * pgstat_ping() -
  *
@@ -2892,6 +3115,22 @@ pgstat_fetch_replslot(NameData slotname)
 	return pgstat_get_replslot_entry(slotname, false);
 }
 
+/*
+ * ---------
+ * pgstat_fetch_subscription_error() -
+ *
+ *	Support function for the SQL-callable pgstat* functions. Returns
+ *	a pointer to the subscription error struct.
+ * ---------
+ */
+PgStat_StatSubErrEntry *
+pgstat_fetch_subscription_error(Oid subid, Oid relid)
+{
+	backend_read_statsfile();
+
+	return pgstat_get_subscription_error_entry(subid, relid, false);
+}
+
 /*
  * Shut down a single backend's statistics reporting at process exit.
  *
@@ -3469,6 +3708,19 @@ PgstatCollectorMain(int argc, char *argv[])
 					pgstat_recv_connstat(&msg.msg_conn, len);
 					break;
 
+				case PGSTAT_MTYPE_SUBSCRIPTIONERR:
+					pgstat_recv_subscription_error(&msg.msg_subscriptionerr, len);
+					break;
+
+				case PGSTAT_MTYPE_SUBSCRIPTIONERRPURGE:
+					pgstat_recv_subscription_error_purge(&msg.msg_subscriptionerrpurge,
+														 len);
+					break;
+
+				case PGSTAT_MTYPE_SUBSCRIPTIONPURGE:
+					pgstat_recv_subscription_purge(&msg.msg_subscriptionpurge, len);
+					break;
+
 				default:
 					break;
 			}
@@ -3769,6 +4021,57 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
 		}
 	}
 
+	/*
+	 * Write subscription error structs
+	 */
+	if (subscriptionHash)
+	{
+		PgStat_StatSubEntry *subent;
+
+		hash_seq_init(&hstat, subscriptionHash);
+		while ((subent = (PgStat_StatSubEntry *) hash_seq_search(&hstat)) != NULL)
+		{
+			PgStat_StatSubErrEntry *errent;
+			int32		nerrors = (subent->sync_errors == NULL)
+			? 0
+			: hash_get_num_entries(subent->sync_errors);
+
+			/*
+			 * We always write at least subscription entry since it could have
+			 * apply worker error.
+			 */
+			fputc('S', fpout);
+			rc = fwrite(subent, sizeof(PgStat_StatSubEntry), 1, fpout);
+			(void) rc;			/* we'll check for error with ferror */
+
+			/* The number of errors follows */
+			rc = fwrite(&nerrors, sizeof(int32), 1, fpout);
+			(void) rc;			/* we'll check for error with ferror */
+
+			/* Then, the error entries follow */
+			if (nerrors > 0)
+			{
+				HASH_SEQ_STATUS relhstat;
+
+				hash_seq_init(&relhstat, subent->sync_errors);
+				while ((errent = (PgStat_StatSubErrEntry *) hash_seq_search(&relhstat)) != NULL)
+				{
+					/*
+					 * XXX we write the whole PgStat_StatSubErrEntry entry
+					 * that contains the fixed-length error message string
+					 * which is PGSTAT_SUBSCRIPTIONERR_MSGLEN in length,
+					 * making the stats file bloat.  It's okay since we assume
+					 * that the number of error entries is not high.  But if
+					 * the expectation became false we should write the string
+					 * and its length instead.
+					 */
+					rc = fwrite(errent, sizeof(PgStat_StatSubErrEntry), 1, fpout);
+					(void) rc;	/* we'll check for error with ferror */
+				}
+			}
+		}
+	}
+
 	/*
 	 * No more output to be done. Close the temp file and replace the old
 	 * pgstat.stat with it.  The ferror() check replaces testing for error
@@ -4230,6 +4533,108 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 					break;
 				}
 
+				/*
+				 * 'S'	A PgStat_StatSubEntry struct followed by the number of
+				 * errors and PgStat_StatSubErrEntry structs, describing
+				 * subscription errors.
+				 */
+			case 'S':
+				{
+					PgStat_StatSubEntry subbuf;
+					PgStat_StatSubEntry *subent;
+					int32		nerrors;
+
+					/* Read the subscription entry */
+					if (fread(&subbuf, 1, sizeof(PgStat_StatSubEntry), fpin) !=
+						sizeof(PgStat_StatSubEntry))
+					{
+						ereport(pgStatRunningInCollector ? LOG : WARNING,
+								(errmsg("corrupted statistics file \"%s\"",
+										statfile)));
+						goto done;
+					}
+
+					/* Create hash table if we don't have it already. */
+					if (subscriptionHash == NULL)
+					{
+						HASHCTL		hash_ctl;
+
+						hash_ctl.keysize = sizeof(Oid);
+						hash_ctl.entrysize = sizeof(PgStat_StatSubEntry);
+						hash_ctl.hcxt = pgStatLocalContext;
+						subscriptionHash = hash_create("Subscription stat entries",
+													   PGSTAT_SUBSCRIPTION_HASH_SIZE,
+													   &hash_ctl,
+													   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+					}
+
+					/*
+					 * Enter the subscription error entry and initialize
+					 * fields
+					 */
+					subent =
+						(PgStat_StatSubEntry *) hash_search(subscriptionHash,
+															(void *) &(subbuf.subid),
+															HASH_ENTER, NULL);
+					memcpy(&(subent->apply_error), &(subbuf.apply_error),
+						   sizeof(PgStat_StatSubErrEntry));
+					subent->sync_errors = NULL;
+
+					/*
+					 * Read the number of table sync errors in the
+					 * subscription
+					 */
+					if (fread(&nerrors, 1, sizeof(int32), fpin) != sizeof(int32))
+					{
+						ereport(pgStatRunningInCollector ? LOG : WARNING,
+								(errmsg("corrupted statistics file \"%s\"",
+										statfile)));
+						goto done;
+					}
+
+					/* Read table sync error entries */
+					for (int i = 0; i < nerrors; i++)
+					{
+						PgStat_StatSubErrEntry errbuf;
+						PgStat_StatSubErrEntry *errent;
+
+						if (fread(&errbuf, 1, sizeof(PgStat_StatSubErrEntry), fpin) !=
+							sizeof(PgStat_StatSubErrEntry))
+						{
+							ereport(pgStatRunningInCollector ? LOG : WARNING,
+									(errmsg("corrupted statistics file \"%s\"",
+											statfile)));
+							goto done;
+						}
+
+						if (subent->sync_errors == NULL)
+						{
+							HASHCTL		hash_ctl;
+
+							hash_ctl.keysize = sizeof(Oid);
+							hash_ctl.entrysize = sizeof(PgStat_StatSubErrEntry);
+							hash_ctl.hcxt = pgStatLocalContext;
+							subent->sync_errors = hash_create("Subscription error hash",
+															  PGSTAT_SUBSCRIPTION_HASH_SIZE,
+															  &hash_ctl,
+															  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+						}
+
+						/*
+						 * Enter the table sync error information to the
+						 * subscription hash
+						 */
+						errent =
+							(PgStat_StatSubErrEntry *) hash_search(subent->sync_errors,
+																   (void *) &(errbuf.relid),
+																   HASH_ENTER, NULL);
+
+						memcpy(errent, &errbuf, sizeof(PgStat_StatSubErrEntry));
+					}
+
+					break;
+				}
+
 			case 'E':
 				goto done;
 
@@ -4572,6 +4977,50 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
 				}
 				break;
 
+				/*
+				 * 'S'	A PgStat_StatSubEntry, the number of table sync errors,
+				 * then that many PgStat_StatSubErrEntry structs; skipped here.
+				 */
+			case 'S':
+				{
+					PgStat_StatSubEntry subbuf;
+					PgStat_StatSubErrEntry errbuf;
+					int32		nerrors;
+
+					if (fread(&subbuf, 1, sizeof(PgStat_StatSubEntry), fpin)
+						!= sizeof(PgStat_StatSubEntry))
+					{
+						ereport(pgStatRunningInCollector ? LOG : WARNING,
+								(errmsg("corrupted statistics file \"%s\"",
+										statfile)));
+						FreeFile(fpin);
+						return false;
+					}
+					/* Every short read below fails the same way as the one above */
+					if (fread(&nerrors, 1, sizeof(int32), fpin) != sizeof(int32))
+					{
+						ereport(pgStatRunningInCollector ? LOG : WARNING,
+								(errmsg("corrupted statistics file \"%s\"",
+										statfile)));
+						FreeFile(fpin);
+						return false;
+					}
+
+					for (int i = 0; i < nerrors; i++)
+					{
+						if (fread(&errbuf, 1, sizeof(PgStat_StatSubErrEntry), fpin) !=
+							sizeof(PgStat_StatSubErrEntry))
+						{
+							ereport(pgStatRunningInCollector ? LOG : WARNING,
+									(errmsg("corrupted statistics file \"%s\"",
+											statfile)));
+							FreeFile(fpin);
+							return false;
+						}
+					}
+				}
+				break;
+
 			case 'E':
 				goto done;
 
@@ -4777,6 +5226,7 @@ pgstat_clear_snapshot(void)
 	pgStatLocalContext = NULL;
 	pgStatDBHash = NULL;
 	replSlotStatHash = NULL;
+	subscriptionHash = NULL;
 
 	/*
 	 * Historically the backend_status.c facilities lived in this file, and
@@ -5699,6 +6149,125 @@ pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len)
 	}
 }
 
+/* ----------
+ * pgstat_recv_subscription_error() -
+ *
+ *	Process a SUBSCRIPTIONERR message.
+ * ----------
+ */
+static void
+pgstat_recv_subscription_error(PgStat_MsgSubscriptionErr *msg, int len)
+{
+	PgStat_StatSubErrEntry *errent;
+	bool		create = !msg->m_reset;
+
+	/* Get subscription error */
+	errent = pgstat_get_subscription_error_entry(msg->m_subid,
+												 msg->m_subrelid,
+												 create);
+
+	if (msg->m_reset)
+	{
+		Assert(!create);
+
+		if (errent == NULL)
+			return;
+
+		/* reset fields and set reset timestamp */
+		pgstat_reset_subscription_error_entry(errent,
+											  GetCurrentTimestamp());
+	}
+	else
+	{
+		Assert(errent);
+		Assert((OidIsValid(msg->m_subrelid) && msg->m_subrelid == msg->m_relid &&
+				msg->m_subrelid == errent->relid) || !OidIsValid(msg->m_subrelid));
+
+		/*
+		 * If reported by the apply worker, we have to update the relid since
+		 * the apply worker could report different relid per error.  In table
+		 * sync error case, relid should have been set by a hash table lookup.
+		 * So we don't update the hash entry key.
+		 */
+		if (!OidIsValid(msg->m_subrelid))
+			errent->relid = msg->m_relid;
+
+		/* update the error entry */
+		errent->command = msg->m_command;
+		errent->xid = msg->m_xid;
+		errent->failure_count++;
+		errent->last_failure = msg->m_failure_time;
+		strlcpy(errent->last_errmsg, msg->m_errmsg, PGSTAT_SUBSCRIPTIONERR_MSGLEN);
+	}
+}
+
+/* ----------
+ * pgstat_recv_subscription_purge() -
+ *
+ *	Process a SUBSCRIPTIONPURGE message.
+ * ----------
+ */
+static void
+pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len)
+{
+	if (subscriptionHash == NULL)
+		return;
+
+	for (int i = 0; i < msg->m_nentries; i++)
+	{
+		PgStat_StatSubEntry *subent;
+
+		subent = pgstat_get_subscription_entry(msg->m_subids[i], false);
+
+		/*
+		 * Nothing to do if the subscription entry is not found.  This could
+		 * happen when the subscription is dropped and the message for
+		 * dropping subscription entry arrived before the message for
+		 * reporting the error.
+		 */
+		if (subent == NULL)
+			continue;
+
+		/* Cleanup the table sync errors */
+		if (subent->sync_errors != NULL)
+			hash_destroy(subent->sync_errors);
+
+		/* Remove the subscription entry */
+		(void) hash_search(subscriptionHash, (void *) &(msg->m_subids[i]),
+						   HASH_REMOVE, NULL);
+	}
+}
+
+/* ----------
+ * pgstat_recv_subscription_error_purge() -
+ *
+ *	Process a SUBSCRIPTIONERRPURGE message: drop the listed tablesync errors.
+ * ----------
+ */
+static void
+pgstat_recv_subscription_error_purge(PgStat_MsgSubscriptionErrPurge *msg, int len)
+{
+	PgStat_StatSubEntry *subent;
+
+	subent = pgstat_get_subscription_entry(msg->m_subid, false);
+
+	/*
+	 * Nothing to do if the subscription entry is not found or it has no
+	 * table sync error hash.  The entry may have been removed, or created
+	 * anew with sync_errors == NULL, before this purge message arrived;
+	 * hash_search() must not be called with a NULL table.
+	 */
+	if (subent == NULL || subent->sync_errors == NULL)
+		return;
+
+	for (int i = 0; i < msg->m_nentries; i++)
+	{
+		Assert(OidIsValid(msg->m_relids[i]));
+		(void) hash_search(subent->sync_errors, (void *) &(msg->m_relids[i]),
+						   HASH_REMOVE, NULL);
+	}
+}
+
 /* ----------
  * pgstat_write_statsfile_needed() -
  *
@@ -5817,6 +6386,122 @@ pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotent, TimestampTz ts)
 	slotent->stat_reset_timestamp = ts;
 }
 
+/* ----------
+ * pgstat_get_subscription_entry
+ *
+ * Return the subscription statistics with the subscription OID. Return NULL
+ * if not found and the caller didn't request to create it.  'create' tells
+ * whether to create (and initialize) the entry, and the subscription hash
+ * table itself, if not found.
+ * ----------
+ */
+static PgStat_StatSubEntry *
+pgstat_get_subscription_entry(Oid subid, bool create)
+{
+	PgStat_StatSubEntry *subent;
+	HASHACTION	action = (create ? HASH_ENTER : HASH_FIND);
+	bool		found;
+
+	if (subscriptionHash == NULL)
+	{
+		HASHCTL		hash_ctl;
+
+		if (!create)
+			return NULL;
+
+		hash_ctl.keysize = sizeof(Oid);
+		hash_ctl.entrysize = sizeof(PgStat_StatSubEntry);
+		subscriptionHash = hash_create("Subscription stat entries",
+									   PGSTAT_SUBSCRIPTION_HASH_SIZE,
+									   &hash_ctl,
+									   HASH_ELEM | HASH_BLOBS);
+	}
+
+	subent = (PgStat_StatSubEntry *) hash_search(subscriptionHash,
+												 (void *) &subid,
+												 action, &found);
+
+	/* initialize fields; the reset helper does not touch relid */
+	if (create && !found)
+	{
+		subent->apply_error.relid = InvalidOid;
+		pgstat_reset_subscription_error_entry(&(subent->apply_error), 0);
+		subent->sync_errors = NULL;
+	}
+
+	return subent;
+}
+
+/* ----------
+ * pgstat_get_subscription_error_entry
+ *
+ * Return the entry of subscription error entry with the subscription
+ * OID and relation OID.  Return NULL if not found and the caller didn't
+ * request to create it.
+ *
+ * 'create' tells whether to create the new subscription relation entry if it is
+ * not found.
+ * ----------
+ */
+static PgStat_StatSubErrEntry *
+pgstat_get_subscription_error_entry(Oid subid, Oid subrelid, bool create)
+{
+	PgStat_StatSubEntry *subent;
+	PgStat_StatSubErrEntry *errent;
+	HASHACTION	action = (create ? HASH_ENTER : HASH_FIND);
+	bool		found;
+
+	subent = pgstat_get_subscription_entry(subid, create);
+
+	if (subent == NULL)
+	{
+		Assert(!create);
+		return NULL;
+	}
+
+	/* Return the apply error worker if requested */
+	if (!OidIsValid(subrelid))
+		return &(subent->apply_error);
+
+	if (subent->sync_errors == NULL)
+	{
+		HASHCTL		hash_ctl;
+
+		if (!create)
+			return NULL;
+
+		hash_ctl.keysize = sizeof(Oid);
+		hash_ctl.entrysize = sizeof(PgStat_StatSubErrEntry);
+		subent->sync_errors = hash_create("Subscription error hash",
+										  PGSTAT_SUBSCRIPTION_HASH_SIZE,
+										  &hash_ctl,
+										  HASH_ELEM | HASH_BLOBS);
+	}
+
+	errent = (PgStat_StatSubErrEntry *) hash_search(subent->sync_errors,
+													(void *) &subrelid,
+													action, &found);
+
+	/* initialize fields */
+	if (create && !found)
+		pgstat_reset_subscription_error_entry(errent, 0);
+
+	return errent;
+}
+
+/* Reset fields other than relid */
+static void
+pgstat_reset_subscription_error_entry(PgStat_StatSubErrEntry *errent,
+									  TimestampTz ts)
+{
+	errent->command = 0;
+	errent->xid = InvalidTransactionId;
+	errent->failure_count = 0;
+	errent->last_failure = 0;
+	errent->last_errmsg[0] = '\0';
+	errent->stat_reset_timestamp = ts;
+}
+
 /*
  * pgstat_slru_index
  *
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 8d96c926b4..e91fa86b1a 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3329,6 +3329,7 @@ void
 ApplyWorkerMain(Datum main_arg)
 {
 	int			worker_slot = DatumGetInt32(main_arg);
+	MemoryContext cctx = CurrentMemoryContext;
 	MemoryContext oldctx;
 	char		originname[NAMEDATALEN];
 	XLogRecPtr	origin_startpos;
@@ -3429,8 +3430,27 @@ ApplyWorkerMain(Datum main_arg)
 	{
 		char	   *syncslotname;
 
-		/* This is table synchronization worker, call initial sync. */
-		syncslotname = LogicalRepSyncTableStart(&origin_startpos);
+		PG_TRY();
+		{
+			/* This is table synchronization worker, call initial sync. */
+			syncslotname = LogicalRepSyncTableStart(&origin_startpos);
+		}
+		PG_CATCH();
+		{
+			MemoryContext ecxt = MemoryContextSwitchTo(cctx);
+			ErrorData *errdata = CopyErrorData();
+
+			/* report the table sync error */
+			pgstat_report_subscription_error(MyLogicalRepWorker->subid,
+											 MyLogicalRepWorker->relid,
+											 MyLogicalRepWorker->relid,
+											 0,
+											 InvalidTransactionId,
+											 errdata->message);
+			MemoryContextSwitchTo(ecxt);
+			PG_RE_THROW();
+		}
+		PG_END_TRY();
 
 		/* allocate slot name in long-lived context */
 		myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
@@ -3548,7 +3568,32 @@ ApplyWorkerMain(Datum main_arg)
 	}
 
 	/* Run the main loop. */
-	LogicalRepApplyLoop(origin_startpos);
+	PG_TRY();
+	{
+		LogicalRepApplyLoop(origin_startpos);
+	}
+	PG_CATCH();
+	{
+		/* report the apply error */
+		if (apply_error_callback_arg.command != 0)
+		{
+			MemoryContext ecxt = MemoryContextSwitchTo(cctx);
+			ErrorData *errdata = CopyErrorData();
+
+			pgstat_report_subscription_error(MySubscription->oid,
+											 InvalidOid,
+											 apply_error_callback_arg.rel != NULL
+											 ? apply_error_callback_arg.rel->localreloid
+											 : InvalidOid,
+											 apply_error_callback_arg.command,
+											 apply_error_callback_arg.remote_xid,
+											 errdata->message);
+			MemoryContextSwitchTo(ecxt);
+		}
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
 
 	proc_exit(0);
 }
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index ff5aedc99c..51f693c22b 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -24,6 +24,7 @@
 #include "pgstat.h"
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/postmaster.h"
+#include "replication/logicalproto.h"
 #include "replication/slot.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
@@ -2239,6 +2240,23 @@ pg_stat_reset_replication_slot(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+/* Reset the statistics of a single subscription error */
+Datum
+pg_stat_reset_subscription_error(PG_FUNCTION_ARGS)
+{
+	Oid			subid = PG_GETARG_OID(0);
+	Oid			relid;
+
+	if (PG_ARGISNULL(1))
+		relid = InvalidOid;		/* reset apply worker error stats */
+	else
+		relid = PG_GETARG_OID(1);	/* reset table sync worker error stats */
+
+	pgstat_reset_subscription_error(subid, relid);
+
+	PG_RETURN_VOID();
+}
+
 Datum
 pg_stat_get_archiver(PG_FUNCTION_ARGS)
 {
@@ -2379,3 +2397,97 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
 }
+
+/*
+ * Get the subscription error for the given subscription (and relation).
+ */
+Datum
+pg_stat_get_subscription_error(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_GET_SUBSCRIPTION_ERROR_COLS 9
+	Oid			subid = PG_GETARG_OID(0);
+	Oid			relid;
+	TupleDesc	tupdesc;
+	Datum		values[PG_STAT_GET_SUBSCRIPTION_ERROR_COLS];
+	bool		nulls[PG_STAT_GET_SUBSCRIPTION_ERROR_COLS];
+	PgStat_StatSubErrEntry *errent;
+	int			i;
+
+	/* Initialise values and NULL flags arrays */
+	MemSet(values, 0, sizeof(values));
+	MemSet(nulls, 0, sizeof(nulls));
+
+	/* Initialise attributes information in the tuple descriptor */
+	tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_SUBSCRIPTION_ERROR_COLS);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "subid",
+					   OIDOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
+					   OIDOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "command",
+					   TEXTOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "xid",
+					   XIDOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "failure_source",
+					   TEXTOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "failure_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "last_failure",
+					   TIMESTAMPTZOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "last_failure_message",
+					   TEXTOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "stats_reset",
+					   TIMESTAMPTZOID, -1, 0);
+	BlessTupleDesc(tupdesc);
+
+	if (PG_ARGISNULL(1))
+		relid = InvalidOid;
+	else
+		relid = PG_GETARG_OID(1);
+
+	/* Get subscription errors */
+	errent = pgstat_fetch_subscription_error(subid, relid);
+
+	/* Return NULL if the subscription doesn't have any errors */
+	if (errent == NULL)
+		PG_RETURN_NULL();
+
+	i = 0;
+	values[i++] = ObjectIdGetDatum(subid);
+
+	if (OidIsValid(errent->relid))
+		values[i++] = ObjectIdGetDatum(errent->relid);
+	else
+		nulls[i++] = true;
+
+	if (errent->command == 0)
+		nulls[i++] = true;
+	else
+		values[i++] = CStringGetTextDatum(logicalrep_message_type(errent->command));
+
+	if (TransactionIdIsValid(errent->xid))
+		values[i++] = TransactionIdGetDatum(errent->xid);
+	else
+		nulls[i++] = true;
+
+	if (OidIsValid(relid))
+		values[i++] = CStringGetTextDatum("tablesync");
+	else
+		values[i++] = CStringGetTextDatum("apply");
+
+	values[i++] = Int64GetDatum(errent->failure_count);
+
+	if (errent->last_failure == 0)
+		nulls[i++] = true;
+	else
+		values[i++] = TimestampTzGetDatum(errent->last_failure);
+
+	values[i++] = CStringGetTextDatum(errent->last_errmsg);
+
+	if (errent->stat_reset_timestamp == 0)
+		nulls[i++] = true;
+	else
+		values[i++] = TimestampTzGetDatum(errent->stat_reset_timestamp);
+
+	/* Returns the record as Datum */
+	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
+}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index d068d6532e..ac02061347 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5385,6 +5385,14 @@
   proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
   proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
   prosrc => 'pg_stat_get_replication_slot' },
+{ oid => '8523', descr => 'statistics: information about subscription error',
+  proname => 'pg_stat_get_subscription_error', prorows => '1', proisstrict => 'f',
+  proretset => 't', provolatile => 's', proparallel => 'r',
+  prorettype => 'record', proargtypes => 'oid oid',
+  proallargtypes => '{oid,oid,oid,oid,text,xid,text,int8,timestamptz,text,timestamptz}',
+  proargmodes => '{i,i,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,relid,subid,relid,command,xid,failure_source,failure_count,last_failure,last_failure_message,stats_reset}',
+  prosrc => 'pg_stat_get_subscription_error' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
@@ -5772,6 +5780,11 @@
   proname => 'pg_stat_reset_replication_slot', proisstrict => 'f',
   provolatile => 'v', prorettype => 'void', proargtypes => 'text',
   prosrc => 'pg_stat_reset_replication_slot' },
+{ oid => '8524',
+  descr => 'statistics: reset collected statistics for a single subscription error',
+  proname => 'pg_stat_reset_subscription_error', proisstrict => 'f',
+  provolatile => 'v', prorettype => 'void', proargtypes => 'oid oid',
+  prosrc => 'pg_stat_reset_subscription_error' },
 
 { oid => '3163', descr => 'current trigger depth',
   proname => 'pg_trigger_depth', provolatile => 's', proparallel => 'r',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 509849c7ff..6ff8720631 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -14,6 +14,7 @@
 #include "datatype/timestamp.h"
 #include "portability/instr_time.h"
 #include "postmaster/pgarch.h"	/* for MAX_XFN_CHARS */
+#include "replication/logicalproto.h"
 #include "utils/backend_progress.h" /* for backward compatibility */
 #include "utils/backend_status.h"	/* for backward compatibility */
 #include "utils/hsearch.h"
@@ -66,6 +67,9 @@ typedef enum StatMsgType
 	PGSTAT_MTYPE_RESETSINGLECOUNTER,
 	PGSTAT_MTYPE_RESETSLRUCOUNTER,
 	PGSTAT_MTYPE_RESETREPLSLOTCOUNTER,
+	PGSTAT_MTYPE_SUBSCRIPTIONERR,
+	PGSTAT_MTYPE_SUBSCRIPTIONERRPURGE,
+	PGSTAT_MTYPE_SUBSCRIPTIONPURGE,
 	PGSTAT_MTYPE_AUTOVAC_START,
 	PGSTAT_MTYPE_VACUUM,
 	PGSTAT_MTYPE_ANALYZE,
@@ -530,6 +534,67 @@ typedef struct PgStat_MsgReplSlot
 	PgStat_Counter m_total_bytes;
 } PgStat_MsgReplSlot;
 
+/* ----------
+ * PgStat_MsgSubscriptionErr	Sent by the apply worker or the table sync worker to
+ *								update/reset the error happening during logical
+ *								replication.
+ * ----------
+ */
+#define PGSTAT_SUBSCRIPTIONERR_MSGLEN 256
+typedef struct PgStat_MsgSubscriptionErr
+{
+	PgStat_MsgHdr m_hdr;
+
+	/*
+	 * m_subid and m_subrelid are used to determine the subscription and the
+	 * reporter of this error.  m_subrelid is InvalidOid if reported by the
+	 * apply worker, otherwise by the table sync worker.  In the table sync
+	 * worker case, m_subrelid must be the same as m_relid.
+	 */
+	Oid			m_subid;
+	Oid			m_subrelid;
+
+	/* The reset message uses the field below */
+	bool		m_reset;		/* Reset all fields and set reset_stats
+								 * timestamp */
+
+	/* The error report message uses the fields below */
+	Oid			m_relid;
+	LogicalRepMsgType m_command;
+	TransactionId m_xid;
+	TimestampTz m_failure_time;
+	char		m_errmsg[PGSTAT_SUBSCRIPTIONERR_MSGLEN];
+} PgStat_MsgSubscriptionErr;
+
+/* ----------
+ * PgStat_MsgSubscriptionPurge	Sent by the autovacuum to purge the subscriptions.
+ * ----------
+ */
+#define PGSTAT_NUM_SUBSCRIPTIONPURGE  \
+	((PGSTAT_MSG_PAYLOAD - sizeof(int)) / sizeof(Oid))
+
+typedef struct PgStat_MsgSubscriptionPurge
+{
+	PgStat_MsgHdr m_hdr;
+	int			m_nentries;
+	Oid			m_subids[PGSTAT_NUM_SUBSCRIPTIONPURGE];
+} PgStat_MsgSubscriptionPurge;
+
+/* ----------
+ * PgStat_MsgSubscriptionErrPurge	Sent by the autovacuum to purge the subscription
+ *									errors.
+ * ----------
+ */
+#define PGSTAT_NUM_SUBSCRIPTIONERRPURGE  \
+	((PGSTAT_MSG_PAYLOAD - sizeof(Oid) - sizeof(int)) / sizeof(Oid))
+
+typedef struct PgStat_MsgSubscriptionErrPurge
+{
+	PgStat_MsgHdr m_hdr;
+	Oid			m_subid;
+	int			m_nentries;
+	Oid			m_relids[PGSTAT_NUM_SUBSCRIPTIONERRPURGE];
+} PgStat_MsgSubscriptionErrPurge;
 
 /* ----------
  * PgStat_MsgRecoveryConflict	Sent by the backend upon recovery conflict
@@ -701,6 +766,9 @@ typedef union PgStat_Msg
 	PgStat_MsgChecksumFailure msg_checksumfailure;
 	PgStat_MsgReplSlot msg_replslot;
 	PgStat_MsgConn msg_conn;
+	PgStat_MsgSubscriptionErr msg_subscriptionerr;
+	PgStat_MsgSubscriptionErrPurge msg_subscriptionerrpurge;
+	PgStat_MsgSubscriptionPurge msg_subscriptionpurge;
 } PgStat_Msg;
 
 
@@ -916,6 +984,39 @@ typedef struct PgStat_StatReplSlotEntry
 	TimestampTz stat_reset_timestamp;
 } PgStat_StatReplSlotEntry;
 
+/*
+ * Subscription error statistics kept in the stats collector, representing
+ * an error that occurred during application of logical replication changes
+ * or initial table synchronization.
+ */
+typedef struct PgStat_StatSubErrEntry
+{
+	Oid			relid;			/* hash table key */
+	LogicalRepMsgType command;
+	TransactionId xid;
+	PgStat_Counter failure_count;
+	TimestampTz last_failure;
+	char		last_errmsg[PGSTAT_SUBSCRIPTIONERR_MSGLEN];
+	TimestampTz stat_reset_timestamp;
+} PgStat_StatSubErrEntry;
+
+/*
+ * Subscription statistics kept in the stats collector.
+ */
+typedef struct PgStat_StatSubEntry
+{
+	Oid			subid;			/* hash table key */
+
+	/*
+	 * Statistics of errors that occurred during logical replication.  Table
+	 * sync errors are kept in a hash table, but the apply error has its own
+	 * separate entry (apply_error).  That way we can avoid building a nested
+	 * hash table when there is no table sync error, which is the common case
+	 * in practice.
+	 */
+	PgStat_StatSubErrEntry apply_error;
+	HTAB	   *sync_errors;
+} PgStat_StatSubEntry;
 
 /*
  * Working state needed to accumulate per-function-call timing statistics.
@@ -1009,6 +1110,7 @@ extern void pgstat_reset_shared_counters(const char *);
 extern void pgstat_reset_single_counter(Oid objectid, PgStat_Single_Reset_Type type);
 extern void pgstat_reset_slru_counter(const char *);
 extern void pgstat_reset_replslot_counter(const char *name);
+extern void pgstat_reset_subscription_error(Oid subid, Oid subrelid);
 
 extern void pgstat_report_autovac(Oid dboid);
 extern void pgstat_report_vacuum(Oid tableoid, bool shared,
@@ -1024,6 +1126,9 @@ extern void pgstat_report_checksum_failure(void);
 extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat);
 extern void pgstat_report_replslot_create(const char *slotname);
 extern void pgstat_report_replslot_drop(const char *slotname);
+extern void pgstat_report_subscription_error(Oid subid, Oid subrelid, Oid relid,
+											 LogicalRepMsgType command,
+											 TransactionId xid, const char *errmsg);
 
 extern void pgstat_initialize(void);
 
@@ -1122,6 +1227,7 @@ extern PgStat_GlobalStats *pgstat_fetch_global(void);
 extern PgStat_WalStats *pgstat_fetch_stat_wal(void);
 extern PgStat_SLRUStats *pgstat_fetch_slru(void);
 extern PgStat_StatReplSlotEntry *pgstat_fetch_replslot(NameData slotname);
+extern PgStat_StatSubErrEntry *pgstat_fetch_subscription_error(Oid subid, Oid relid);
 
 extern void pgstat_count_slru_page_zeroed(int slru_idx);
 extern void pgstat_count_slru_page_hit(int slru_idx);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 2fa00a3c29..3719b8a41a 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2094,6 +2094,28 @@ pg_stat_subscription| SELECT su.oid AS subid,
     st.latest_end_time
    FROM (pg_subscription su
      LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
+pg_stat_subscription_errors| SELECT d.datname,
+    sr.subid,
+    s.subname,
+    e.relid,
+    e.command,
+    e.xid,
+    e.failure_source,
+    e.failure_count,
+    e.last_failure,
+    e.last_failure_message,
+    e.stats_reset
+   FROM ( SELECT pg_subscription.oid AS subid,
+            NULL::oid AS relid
+           FROM pg_subscription
+        UNION ALL
+         SELECT pg_subscription_rel.srsubid AS subid,
+            pg_subscription_rel.srrelid AS relid
+           FROM pg_subscription_rel
+          WHERE (pg_subscription_rel.srsubstate <> 'r'::"char")) sr,
+    ((LATERAL pg_stat_get_subscription_error(sr.subid, sr.relid) e(subid, relid, command, xid, failure_source, failure_count, last_failure, last_failure_message, stats_reset)
+     JOIN pg_subscription s ON ((e.subid = s.oid)))
+     JOIN pg_database d ON ((s.subdbid = d.oid)));
 pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid,
     pg_stat_all_indexes.indexrelid,
     pg_stat_all_indexes.schemaname,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 423780652f..141b4a3276 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1939,6 +1939,9 @@ PgStat_MsgResetsharedcounter
 PgStat_MsgResetsinglecounter
 PgStat_MsgResetslrucounter
 PgStat_MsgSLRU
+PgStat_MsgSubscriptionErr
+PgStat_MsgSubscriptionErrPurge
+PgStat_MsgSubscriptionPurge
 PgStat_MsgTabpurge
 PgStat_MsgTabstat
 PgStat_MsgTempFile
@@ -1950,6 +1953,8 @@ PgStat_Single_Reset_Type
 PgStat_StatDBEntry
 PgStat_StatFuncEntry
 PgStat_StatReplSlotEntry
+PgStat_StatSubEntry
+PgStat_StatSubErrEntry
 PgStat_StatTabEntry
 PgStat_SubXactStatus
 PgStat_TableCounts
-- 
2.24.3 (Apple Git-128)

