From c4add94376eb6455ac9951791cd097439dee083e Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 16 Jul 2021 23:10:22 +0900
Subject: [PATCH v14 1/3] Add pg_stat_subscription_errors statistics view.

This commit adds a new system view, pg_stat_subscription_errors,
that shows information about any errors which occur during application
of logical replication changes as well as during initial table
synchronization.

The subscription error entries are removed by autovacuum workers after
table synchronization completes in table sync worker cases and after
dropping the subscription in apply worker cases.

It also adds the SQL function pg_stat_reset_subscription_error() to
reset a single subscription error.
---
 doc/src/sgml/monitoring.sgml             | 169 ++++++
 src/backend/catalog/system_functions.sql |   2 +
 src/backend/catalog/system_views.sql     |  27 +
 src/backend/postmaster/pgstat.c          | 709 +++++++++++++++++++++++
 src/backend/replication/logical/worker.c |  51 +-
 src/backend/utils/adt/pgstatfuncs.c      | 112 ++++
 src/include/catalog/pg_proc.dat          |  13 +
 src/include/pgstat.h                     | 127 ++++
 src/test/regress/expected/rules.out      |  22 +
 src/tools/pgindent/typedefs.list         |   5 +
 10 files changed, 1234 insertions(+), 3 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 2281ba120f..b0e426033e 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -346,6 +346,15 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       </entry>
      </row>
 
+     <row>
+      <entry><structname>pg_stat_subscription_errors</structname><indexterm><primary>pg_stat_subscription_errors</primary></indexterm></entry>
+      <entry>One row per error that occurred on a subscription, showing information
+       about each subscription error.
+       See <link linkend="monitoring-pg-stat-subscription-errors">
+       <structname>pg_stat_subscription_errors</structname></link> for details.
+      </entry>
+     </row>
+
      <row>
       <entry><structname>pg_stat_ssl</structname><indexterm><primary>pg_stat_ssl</primary></indexterm></entry>
       <entry>One row per connection (regular and replication), showing information about
@@ -3050,6 +3059,144 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
  </sect2>
 
+ <sect2 id="monitoring-pg-stat-subscription-errors">
+  <title><structname>pg_stat_subscription_errors</structname></title>
+
+  <indexterm>
+   <primary>pg_stat_subscription_errors</primary>
+  </indexterm>
+
+  <para>
+   The <structname>pg_stat_subscription_errors</structname> view will contain one
+   row per subscription error reported by workers applying logical replication
+   changes and workers handling the initial data copy of the subscribed tables.
+  </para>
+
+  <table id="pg-stat-subscription-errors" xreflabel="pg_stat_subscription_errors">
+   <title><structname>pg_stat_subscription_errors</structname> View</title>
+   <tgroup cols="1">
+    <thead>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       Column Type
+      </para>
+      <para>
+       Description
+      </para></entry>
+     </row>
+    </thead>
+
+    <tbody>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>datname</structfield> <type>name</type>
+      </para>
+      <para>
+        Name of the database in which the subscription was created.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subid</structfield> <type>oid</type>
+      </para>
+      <para>
+       OID of the subscription.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subname</structfield> <type>name</type>
+      </para>
+      <para>
+       Name of the subscription.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>relid</structfield> <type>oid</type>
+      </para>
+      <para>
+       OID of the relation that the worker was processing when the
+       error occurred.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>command</structfield> <type>text</type>
+      </para>
+      <para>
+        Name of command being applied when the error occurred.  This
+        field is always NULL if the error is reported by the
+        <literal>tablesync</literal> worker.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>xid</structfield> <type>xid</type>
+      </para>
+      <para>
+        Transaction ID of the publisher node being applied when the error
+        occurred.  This field is always NULL if the error is reported
+        by the <literal>tablesync</literal> worker.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>failure_source</structfield> <type>text</type>
+      </para>
+      <para>
+        Type of worker reporting the error: <literal>apply</literal> or
+        <literal>tablesync</literal>.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>failure_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times the error occurred in the worker.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>last_failure</structfield> <type>timestamp with time zone</type>
+      </para>
+      <para>
+       Time at which the last error occurred.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>last_failure_message</structfield> <type>text</type>
+      </para>
+      <para>
+       Last reported error message.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
+      </para>
+      <para>
+       Time at which these statistics were last reset.
+      </para></entry>
+     </row>
+    </tbody>
+   </tgroup>
+  </table>
+
+ </sect2>
+
  <sect2 id="monitoring-pg-stat-ssl-view">
   <title><structname>pg_stat_ssl</structname></title>
 
@@ -5172,6 +5319,28 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
          can be granted EXECUTE to run the function.
        </para></entry>
       </row>
+
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+          <primary>pg_stat_reset_subscription_error</primary>
+        </indexterm>
+        <function>pg_stat_reset_subscription_error</function> ( <parameter>subid</parameter> <type>oid</type>, <parameter>relid</parameter> <type>oid</type> )
+        <returnvalue>void</returnvalue>
+       </para>
+       <para>
+        Resets statistics of a single subscription error.  If
+        the argument <parameter>relid</parameter> is not <literal>NULL</literal>,
+        resets error statistics of the <literal>tablesync</literal> worker for
+        the relation with <parameter>relid</parameter>.  Otherwise, resets the
+        error statistics of the <literal>apply</literal> worker running on the
+        subscription with <parameter>subid</parameter>.
+       </para>
+       <para>
+         This function is restricted to superusers by default, but other users
+         can be granted EXECUTE to run the function.
+       </para></entry>
+      </row>
      </tbody>
     </tgroup>
    </table>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index a416e94d37..c9aa6f04d3 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -639,6 +639,8 @@ REVOKE EXECUTE ON FUNCTION pg_stat_reset_single_function_counters(oid) FROM publ
 
 REVOKE EXECUTE ON FUNCTION pg_stat_reset_replication_slot(text) FROM public;
 
+REVOKE EXECUTE ON FUNCTION pg_stat_reset_subscription_error(oid, oid) FROM public;
+
 REVOKE EXECUTE ON FUNCTION lo_import(text) FROM public;
 
 REVOKE EXECUTE ON FUNCTION lo_import(text, oid) FROM public;
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 55f6e3711d..b0cd8d2546 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1257,3 +1257,30 @@ REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
               substream, subtwophasestate, subslotname, subsynccommit, subpublications)
     ON pg_subscription TO public;
+
+-- The subquery lists every subscription with a NULL relid, plus every
+-- pg_subscription_rel row whose srsubstate is not 'r'.  Each (subid, relid)
+-- pair is passed to pg_stat_get_subscription_error(), and the result is
+-- joined to pg_subscription and pg_database to resolve the names.
+CREATE VIEW pg_stat_subscription_errors AS
+    SELECT
+        d.datname,
+        sr.subid,
+        s.subname,
+        e.relid,
+        e.command,
+        e.xid,
+        e.failure_source,
+        e.failure_count,
+        e.last_failure,
+        e.last_failure_message,
+        e.stats_reset
+    FROM (SELECT oid AS subid, NULL AS relid
+          FROM pg_subscription
+          UNION ALL
+          SELECT srsubid AS subid, srrelid AS relid
+          FROM pg_subscription_rel
+          WHERE srsubstate <> 'r') sr,
+          LATERAL pg_stat_get_subscription_error(sr.subid, sr.relid) e
+          JOIN pg_subscription s ON (e.subid = s.oid)
+          JOIN pg_database d ON (s.subdbid = d.oid);
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index b7d0fbaefd..b01c6b5fcc 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -41,6 +41,8 @@
 #include "catalog/catalog.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_proc.h"
+#include "catalog/pg_subscription.h"
+#include "catalog/pg_subscription_rel.h"
 #include "common/ip.h"
 #include "executor/instrument.h"
 #include "libpq/libpq.h"
@@ -106,6 +108,7 @@
 #define PGSTAT_TAB_HASH_SIZE	512
 #define PGSTAT_FUNCTION_HASH_SIZE	512
 #define PGSTAT_REPLSLOT_HASH_SIZE	32
+#define PGSTAT_SUBSCRIPTION_HASH_SIZE	32
 
 
 /* ----------
@@ -282,6 +285,7 @@ static PgStat_GlobalStats globalStats;
 static PgStat_WalStats walStats;
 static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
 static HTAB *replSlotStatHash = NULL;
+static HTAB *subscriptionHash = NULL;
 
 /*
  * List of OIDs of databases we need to write out.  If an entry is InvalidOid,
@@ -332,6 +336,14 @@ static bool pgstat_db_requested(Oid databaseid);
 static PgStat_StatReplSlotEntry *pgstat_get_replslot_entry(NameData name, bool create_it);
 static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, TimestampTz ts);
 
+static PgStat_StatSubEntry *pgstat_get_subscription_entry(Oid subid,
+														  bool create);
+static PgStat_StatSubErrEntry *pgstat_get_subscription_error_entry(Oid subid,
+																   Oid subrelid,
+																   bool create);
+static void pgstat_reset_subscription_error_entry(PgStat_StatSubErrEntry *errent,
+												  TimestampTz ts);
+
 static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg, TimestampTz now);
 static void pgstat_send_funcstats(void);
 static void pgstat_send_slru(void);
@@ -373,6 +385,12 @@ static void pgstat_recv_connect(PgStat_MsgConnect *msg, int len);
 static void pgstat_recv_disconnect(PgStat_MsgDisconnect *msg, int len);
 static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
 static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
+static void pgstat_recv_subscription_error(PgStat_MsgSubscriptionErr *msg, int len);
+static void pgstat_recv_subscription_error_reset(PgStat_MsgSubscriptionErrReset *msg,
+												 int len);
+static void pgstat_recv_subscription_error_purge(PgStat_MsgSubscriptionErrPurge *msg,
+												 int len);
+static void pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len);
 
 /* ------------------------------------------------------------
  * Public functions called from postmaster follow
@@ -1178,6 +1196,165 @@ pgstat_vacuum_stat(void)
 		}
 	}
 
+	/*
+	 * Search for all the dead subscriptions and error entries in stats
+	 * hashtable and tell the stats collector to drop them.
+	 */
+	if (subscriptionHash)
+	{
+		PgStat_MsgSubscriptionPurge submsg;
+		PgStat_StatSubEntry *subent;
+		HTAB	   *htab;
+
+		htab = pgstat_collect_oids(SubscriptionRelationId, Anum_pg_subscription_oid);
+
+		submsg.m_nentries = 0;
+		hash_seq_init(&hstat, subscriptionHash);
+		while ((subent = (PgStat_StatSubEntry *) hash_seq_search(&hstat)) != NULL)
+		{
+			PgStat_MsgSubscriptionErrPurge errmsg;
+			PgStat_StatSubErrEntry *errent;
+			HASH_SEQ_STATUS hstat_rel;
+			List	   *not_ready_rels_list;
+			HTAB	   *not_ready_rels_htab = NULL;
+
+			CHECK_FOR_INTERRUPTS();
+
+			/* Check if the subscription is dead */
+			if (hash_search(htab, (void *) &(subent->subid), HASH_FIND, NULL) == NULL)
+			{
+				/* This subscription is dead, add the subid to the message */
+				submsg.m_subids[submsg.m_nentries++] = subent->subid;
+
+				/*
+				 * If the message is full, send it out and reinitialize to
+				 * empty
+				 */
+				if (submsg.m_nentries >= PGSTAT_NUM_SUBSCRIPTIONPURGE)
+				{
+					len = offsetof(PgStat_MsgSubscriptionPurge, m_subids[0])
+						+ submsg.m_nentries * sizeof(Oid);
+
+					pgstat_setheader(&submsg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONPURGE);
+					pgstat_send(&submsg, len);
+					submsg.m_nentries = 0;
+				}
+
+				continue;
+			}
+
+			/*
+			 * Nothing to do here if the subscription exists but has no table
+			 * sync error entries.
+			 */
+			if (subent->sync_errors == NULL)
+				continue;
+
+			/*
+			 * The subscription has table sync error entries.  Look for errors of
+			 * table sync workers whose relation is no longer in the not-ready
+			 * list; those error entries should be removed.
+			 */
+			not_ready_rels_list = GetSubscriptionNotReadyRelations(subent->subid);
+
+			if (not_ready_rels_list != NIL)
+			{
+				HASHCTL		hash_ctl;
+				ListCell   *lc;
+
+				hash_ctl.keysize = sizeof(Oid);
+				hash_ctl.entrysize = sizeof(SubscriptionRelState);
+				not_ready_rels_htab = hash_create("not ready relations in subscription",
+												  64,
+												  &hash_ctl,
+												  HASH_ELEM | HASH_BLOBS);
+
+				/*
+				 * The number of not-ready relations can be high for example right
+				 * after creating a subscription, so we load the list of
+				 * SubscriptionRelState into the hash table for faster lookups.
+				 */
+				foreach(lc, not_ready_rels_list)
+				{
+					SubscriptionRelState *r_elem = (SubscriptionRelState *) lfirst(lc);
+					SubscriptionRelState *r_entry;
+
+					CHECK_FOR_INTERRUPTS();
+					r_entry = hash_search(not_ready_rels_htab, (void *) &(r_elem->relid),
+										  HASH_ENTER, NULL);
+					memcpy(r_entry, r_elem, sizeof(SubscriptionRelState));
+				}
+
+				list_free(not_ready_rels_list);
+			}
+
+			errmsg.m_nentries = 0;
+			errmsg.m_subid = subent->subid;
+
+			/*
+			 * Search for all table sync error entries whose relation is no longer
+			 * listed as not ready
+			 */
+			hash_seq_init(&hstat_rel, subent->sync_errors);
+			while ((errent = (PgStat_StatSubErrEntry *) hash_seq_search(&hstat_rel)) != NULL)
+			{
+				Assert(OidIsValid(errent->relid));
+
+				CHECK_FOR_INTERRUPTS();
+
+				/* Skip this table if its synchronization has not completed yet */
+				if (not_ready_rels_htab != NULL &&
+					hash_search(not_ready_rels_htab, (void *) &(errent->relid),
+								HASH_FIND, NULL) != NULL)
+					continue;
+
+				errmsg.m_relids[errmsg.m_nentries++] = errent->relid;
+
+				/*
+				 * If the message is full, send it out and reinitialize to
+				 * empty
+				 */
+				if (errmsg.m_nentries >= PGSTAT_NUM_SUBSCRIPTIONERRPURGE)
+				{
+					len = offsetof(PgStat_MsgSubscriptionErrPurge, m_relids[0])
+						+ errmsg.m_nentries * sizeof(Oid);
+
+					pgstat_setheader(&errmsg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONERRPURGE);
+					pgstat_send(&errmsg, len);
+					errmsg.m_nentries = 0;
+				}
+			}
+
+			/* Send the rest of dead error entries */
+			if (errmsg.m_nentries > 0)
+			{
+				len = offsetof(PgStat_MsgSubscriptionErrPurge, m_relids[0])
+					+ errmsg.m_nentries * sizeof(Oid);
+
+				pgstat_setheader(&errmsg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONERRPURGE);
+				pgstat_send(&errmsg, len);
+				errmsg.m_nentries = 0;
+			}
+
+			/* Clean up */
+			if (not_ready_rels_htab != NULL)
+				hash_destroy(not_ready_rels_htab);
+		}
+
+		/* Send the rest of dead subscriptions */
+		if (submsg.m_nentries > 0)
+		{
+			len = offsetof(PgStat_MsgSubscriptionPurge, m_subids[0])
+				+ submsg.m_nentries * sizeof(Oid);
+
+			pgstat_setheader(&submsg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONPURGE);
+			pgstat_send(&submsg, len);
+		}
+
+		/* Clean up */
+		hash_destroy(htab);
+	}
+
 	/*
 	 * Lookup our own database entry; if not found, nothing more to do.
 	 */
@@ -1544,6 +1721,24 @@ pgstat_reset_replslot_counter(const char *name)
 	pgstat_send(&msg, sizeof(msg));
 }
 
+/* ----------
+ * pgstat_reset_subscription_error() -
+ *
+ *	Send a SUBSCRIPTIONERRRESET message for (subid, subrelid) to the
+ *	collector.
+ * ----------
+ */
+void
+pgstat_reset_subscription_error(Oid subid, Oid subrelid)
+{
+	PgStat_MsgSubscriptionErrReset resetmsg;
+
+	resetmsg.m_subid = subid;
+	resetmsg.m_subrelid = subrelid;
+	pgstat_setheader(&resetmsg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONERRRESET);
+	pgstat_send(&resetmsg, sizeof(resetmsg));
+}
+
 /* ----------
  * pgstat_report_autovac() -
  *
@@ -1869,6 +2064,35 @@ pgstat_report_replslot_drop(const char *slotname)
 	pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
 }
 
+/* ----------
+ * pgstat_report_subscription_error() -
+ *
+ *	Tell the collector about the subscription error.
+ * ----------
+ */
+void
+pgstat_report_subscription_error(Oid subid, Oid subrelid, Oid relid,
+								 LogicalRepMsgType command, TransactionId xid,
+								 const char *errmsg)
+{
+	PgStat_MsgSubscriptionErr msg;
+	int			len;
+
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONERR);
+	msg.m_subid = subid;
+	msg.m_subrelid = subrelid;
+	msg.m_relid = relid;
+	msg.m_command = command;
+	msg.m_xid = xid;
+	msg.m_failure_time = GetCurrentTimestamp();
+
+	/* Copy (truncating if needed), then size the message by what was copied */
+	strlcpy(msg.m_errmsg, errmsg, PGSTAT_SUBSCRIPTIONERR_MSGLEN);
+	len = offsetof(PgStat_MsgSubscriptionErr, m_errmsg[0]) +
+		strlen(msg.m_errmsg) + 1;
+	pgstat_send(&msg, len);
+}
+
 /* ----------
  * pgstat_ping() -
  *
@@ -2987,6 +3211,22 @@ pgstat_fetch_replslot(NameData slotname)
 	return pgstat_get_replslot_entry(slotname, false);
 }
 
+/*
+ * pgstat_fetch_subscription_error() -
+ *
+ *	Support routine for the SQL-callable pgstat* functions.  Calls
+ *	backend_read_statsfile(), then looks up the error entry for
+ *	(subid, relid) without creating it.
+ */
+PgStat_StatSubErrEntry *
+pgstat_fetch_subscription_error(Oid subid, Oid relid)
+{
+	PgStat_StatSubErrEntry *errent;
+
+	backend_read_statsfile();
+	errent = pgstat_get_subscription_error_entry(subid, relid, false);
+	return errent;
+}
+
 /*
  * Shut down a single backend's statistics reporting at process exit.
  *
@@ -3568,6 +3808,24 @@ PgstatCollectorMain(int argc, char *argv[])
 					pgstat_recv_disconnect(&msg.msg_disconnect, len);
 					break;
 
+				case PGSTAT_MTYPE_SUBSCRIPTIONERR:
+					pgstat_recv_subscription_error(&msg.msg_subscriptionerr, len);
+					break;
+
+				case PGSTAT_MTYPE_SUBSCRIPTIONERRRESET:
+					pgstat_recv_subscription_error_reset(&msg.msg_subscriptionerrreset,
+														 len);
+					break;
+
+				case PGSTAT_MTYPE_SUBSCRIPTIONERRPURGE:
+					pgstat_recv_subscription_error_purge(&msg.msg_subscriptionerrpurge,
+														 len);
+					break;
+
+				case PGSTAT_MTYPE_SUBSCRIPTIONPURGE:
+					pgstat_recv_subscription_purge(&msg.msg_subscriptionpurge, len);
+					break;
+
 				default:
 					break;
 			}
@@ -3868,6 +4126,57 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
 		}
 	}
 
+	/*
+	 * Write subscription error structs
+	 */
+	if (subscriptionHash)
+	{
+		PgStat_StatSubEntry *subent;
+
+		hash_seq_init(&hstat, subscriptionHash);
+		while ((subent = (PgStat_StatSubEntry *) hash_seq_search(&hstat)) != NULL)
+		{
+			PgStat_StatSubErrEntry *errent;
+			int32		nerrors = (subent->sync_errors == NULL)
+				? 0
+				: (int32) hash_get_num_entries(subent->sync_errors);
+
+			/*
+			 * We always write the subscription entry itself, even without any
+			 * table sync errors, since it could hold an apply worker error.
+			 */
+			fputc('S', fpout);
+			rc = fwrite(subent, sizeof(PgStat_StatSubEntry), 1, fpout);
+			(void) rc;			/* we'll check for error with ferror */
+
+			/* The number of errors follows */
+			rc = fwrite(&nerrors, sizeof(nerrors), 1, fpout);
+			(void) rc;			/* we'll check for error with ferror */
+
+			/* Then, the error entries follow */
+			if (nerrors > 0)
+			{
+				HASH_SEQ_STATUS relhstat;
+
+				hash_seq_init(&relhstat, subent->sync_errors);
+				while ((errent = (PgStat_StatSubErrEntry *) hash_seq_search(&relhstat)) != NULL)
+				{
+					/*
+					 * XXX we write the whole PgStat_StatSubErrEntry entry
+					 * that contains the fixed-length error message string
+					 * which is PGSTAT_SUBSCRIPTIONERR_MSGLEN in length,
+					 * making the stats file bloat.  It's okay since we assume
+					 * that the number of error entries is not high.  But if
+					 * the expectation became false we should write the string
+					 * and its length instead.
+					 */
+					rc = fwrite(errent, sizeof(PgStat_StatSubErrEntry), 1, fpout);
+					(void) rc;	/* we'll check for error with ferror */
+				}
+			}
+		}
+	}
+
 	/*
 	 * No more output to be done. Close the temp file and replace the old
 	 * pgstat.stat with it.  The ferror() check replaces testing for error
@@ -4329,6 +4638,105 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 					break;
 				}
 
+				/*
+				 * 'S'	A PgStat_StatSubEntry struct followed by the number of
+				 * errors and PgStat_StatSubErrEntry structs, describing
+				 * subscription and its errors.
+				 */
+			case 'S':
+				{
+					PgStat_StatSubEntry subbuf;
+					PgStat_StatSubEntry *subent;
+					int32		nerrors;
+
+					/* Read the subscription entry */
+					if (fread(&subbuf, 1, sizeof(PgStat_StatSubEntry), fpin) !=
+						sizeof(PgStat_StatSubEntry))
+					{
+						ereport(pgStatRunningInCollector ? LOG : WARNING,
+								(errmsg("corrupted statistics file \"%s\"",
+										statfile)));
+						goto done;
+					}
+
+					/* Create hash table if we don't have it already. */
+					if (subscriptionHash == NULL)
+					{
+						HASHCTL		hash_ctl;
+
+						hash_ctl.keysize = sizeof(Oid);
+						hash_ctl.entrysize = sizeof(PgStat_StatSubEntry);
+						hash_ctl.hcxt = pgStatLocalContext;
+						subscriptionHash = hash_create("Subscription stat entries",
+													   PGSTAT_SUBSCRIPTION_HASH_SIZE,
+													   &hash_ctl,
+													   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+					}
+
+					/* Enter the subscription entry and initialize fields */
+					subent =
+						(PgStat_StatSubEntry *) hash_search(subscriptionHash,
+															(void *) &(subbuf.subid),
+															HASH_ENTER, NULL);
+					memcpy(&(subent->apply_error), &(subbuf.apply_error),
+						   sizeof(PgStat_StatSubErrEntry));
+					subent->sync_errors = NULL;
+
+					/*
+					 * Read the number of table sync errors in the
+					 * subscription
+					 */
+					if (fread(&nerrors, 1, sizeof(nerrors), fpin) != sizeof(nerrors))
+					{
+						ereport(pgStatRunningInCollector ? LOG : WARNING,
+								(errmsg("corrupted statistics file \"%s\"",
+										statfile)));
+						goto done;
+					}
+
+					/* Read table sync error entries */
+					for (int i = 0; i < nerrors; i++)
+					{
+						PgStat_StatSubErrEntry errbuf;
+						PgStat_StatSubErrEntry *errent;
+
+						if (fread(&errbuf, 1, sizeof(PgStat_StatSubErrEntry), fpin) !=
+							sizeof(PgStat_StatSubErrEntry))
+						{
+							ereport(pgStatRunningInCollector ? LOG : WARNING,
+									(errmsg("corrupted statistics file \"%s\"",
+											statfile)));
+							goto done;
+						}
+
+						if (subent->sync_errors == NULL)
+						{
+							HASHCTL		hash_ctl;
+
+							hash_ctl.keysize = sizeof(Oid);
+							hash_ctl.entrysize = sizeof(PgStat_StatSubErrEntry);
+							hash_ctl.hcxt = pgStatLocalContext;
+							subent->sync_errors = hash_create("table sync errors",
+															  PGSTAT_SUBSCRIPTION_HASH_SIZE,
+															  &hash_ctl,
+															  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+						}
+
+						/*
+						 * Enter the table sync error information to the
+						 * subscription hash
+						 */
+						errent =
+							(PgStat_StatSubErrEntry *) hash_search(subent->sync_errors,
+																   (void *) &(errbuf.relid),
+																   HASH_ENTER, NULL);
+
+						memcpy(errent, &errbuf, sizeof(PgStat_StatSubErrEntry));
+					}
+
+					break;
+				}
+
 			case 'E':
 				goto done;
 
@@ -4671,6 +5079,50 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
 				}
 				break;
 
+				/*
+				 * 'S'	A PgStat_StatSubEntry struct followed by the number of
+				 * errors and PgStat_StatSubErrEntry structs describing
+				 * subscription errors.  They are read only to skip past them.
+				 */
+			case 'S':
+				{
+					PgStat_StatSubEntry subbuf;
+					PgStat_StatSubErrEntry errbuf;
+					int32		nerrors;
+
+					/* Read the subscription entry and its error count */
+					if (fread(&subbuf, 1, sizeof(PgStat_StatSubEntry), fpin)
+						!= sizeof(PgStat_StatSubEntry) ||
+						fread(&nerrors, 1, sizeof(nerrors), fpin)
+						!= sizeof(nerrors))
+					{
+						ereport(pgStatRunningInCollector ? LOG : WARNING,
+								(errmsg("corrupted statistics file \"%s\"",
+										statfile)));
+						FreeFile(fpin);
+						return false;
+					}
+
+					/*
+					 * Read past the table sync error entries; a short read is
+					 * a corrupted file and fails the same way as above.
+					 */
+					for (int i = 0; i < nerrors; i++)
+					{
+						if (fread(&errbuf, 1, sizeof(PgStat_StatSubErrEntry), fpin) !=
+							sizeof(PgStat_StatSubErrEntry))
+						{
+							ereport(pgStatRunningInCollector ? LOG : WARNING,
+									(errmsg("corrupted statistics file \"%s\"",
+											statfile)));
+							FreeFile(fpin);
+							return false;
+						}
+					}
+				}
+
+				break;
+
 			case 'E':
 				goto done;
 
@@ -4876,6 +5328,7 @@ pgstat_clear_snapshot(void)
 	pgStatLocalContext = NULL;
 	pgStatDBHash = NULL;
 	replSlotStatHash = NULL;
+	subscriptionHash = NULL;
 
 	/*
 	 * Historically the backend_status.c facilities lived in this file, and
@@ -5816,6 +6269,147 @@ pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len)
 	}
 }
 
+/* ----------
+ * pgstat_recv_subscription_error() -
+ *
+ *	Process a SUBSCRIPTIONERR message.
+ * ----------
+ */
+static void
+pgstat_recv_subscription_error(PgStat_MsgSubscriptionErr *msg, int len)
+{
+	bool		from_tablesync = OidIsValid(msg->m_subrelid);
+	PgStat_StatSubErrEntry *entry;
+
+	/* Look up the error entry, creating it if it does not exist yet */
+	entry = pgstat_get_subscription_error_entry(msg->m_subid, msg->m_subrelid,
+												true);
+	Assert(entry);
+
+	/*
+	 * A valid m_subrelid means the reporter is a table sync worker; then the
+	 * relation OIDs in the message and in the entry must all agree.  An
+	 * invalid m_subrelid means the reporter is the apply worker.
+	 */
+	Assert(!from_tablesync ||
+		   (msg->m_subrelid == msg->m_relid &&
+			msg->m_subrelid == entry->relid));
+
+	/*
+	 * The apply worker may fail on a different relation each time, so its
+	 * entry's relid is overwritten on every report.  A table sync entry's
+	 * relid is its hash key and is therefore left untouched.
+	 */
+	if (!from_tablesync)
+		entry->relid = msg->m_relid;
+
+	/* Record the details of this failure */
+	entry->failure_count++;
+	entry->last_failure = msg->m_failure_time;
+	entry->xid = msg->m_xid;
+	entry->command = msg->m_command;
+	strlcpy(entry->last_errmsg, msg->m_errmsg, PGSTAT_SUBSCRIPTIONERR_MSGLEN);
+}
+
+/* ----------
+ * pgstat_recv_subscription_error_reset() -
+ *
+ *	Process a SUBSCRIPTIONERRRESET message.
+ * ----------
+ */
+static void
+pgstat_recv_subscription_error_reset(PgStat_MsgSubscriptionErrReset *msg, int len)
+{
+	PgStat_StatSubErrEntry *entry;
+
+	/* Find the existing error entry; never create one just to reset it */
+	entry = pgstat_get_subscription_error_entry(msg->m_subid,
+												msg->m_subrelid,
+												false);
+
+	/*
+	 * The entry can legitimately be missing, e.g. when the message dropping
+	 * the subscription entry was processed before this reset message.  In
+	 * that case there is nothing to reset.
+	 */
+	if (entry != NULL)
+	{
+		/* Reset the fields and stamp the entry with the reset time */
+		pgstat_reset_subscription_error_entry(entry, GetCurrentTimestamp());
+
+		/* An apply worker entry additionally forgets its relation */
+		if (!OidIsValid(msg->m_subrelid))
+			entry->relid = InvalidOid;
+	}
+}
+
+/* ----------
+ * pgstat_recv_subscription_purge() -
+ *
+ *	Process a SUBSCRIPTIONPURGE message.
+ * ----------
+ */
+static void
+pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len)
+{
+	int			i;
+
+	/* Without a subscription hash table there is nothing to purge */
+	if (subscriptionHash == NULL)
+		return;
+
+	for (i = 0; i < msg->m_nentries; i++)
+	{
+		Oid			subid = msg->m_subids[i];
+		PgStat_StatSubEntry *subent;
+
+		/*
+		 * Nothing to do when no entry exists for this subscription.
+		 */
+		subent = pgstat_get_subscription_entry(subid, false);
+		if (subent == NULL)
+			continue;
+
+		/* Free the hash table of table sync errors, if one was created */
+		if (subent->sync_errors != NULL)
+			hash_destroy(subent->sync_errors);
+
+		/* Finally drop the subscription entry itself */
+		(void) hash_search(subscriptionHash, (void *) &subid,
+						   HASH_REMOVE, NULL);
+	}
+}
+
+/* ----------
+ * pgstat_recv_subscription_error_purge() -
+ *
+ *	Process a SUBSCRIPTIONERRPURGE message.
+ * ----------
+ */
+static void
+pgstat_recv_subscription_error_purge(PgStat_MsgSubscriptionErrPurge *msg, int len)
+{
+	PgStat_StatSubEntry *subent;
+
+	subent = pgstat_get_subscription_entry(msg->m_subid, false);
+
+	/*
+	 * Nothing to do if the subscription entry is not found, or if it has no
+	 * table sync error hash.  Either can happen when the entry was removed
+	 * (and possibly re-created by a later error report) before this purge
+	 * message arrived; hash_search() must not be called with a NULL table.
+	 */
+	if (subent == NULL || subent->sync_errors == NULL)
+		return;
+
+	for (int i = 0; i < msg->m_nentries; i++)
+	{
+		Assert(OidIsValid(msg->m_relids[i]));
+		(void) hash_search(subent->sync_errors, (void *) &(msg->m_relids[i]),
+						   HASH_REMOVE, NULL);
+	}
+}
+
 /* ----------
  * pgstat_write_statsfile_needed() -
  *
@@ -5934,6 +6528,121 @@ pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotent, TimestampTz ts)
 	slotent->stat_reset_timestamp = ts;
 }
 
+/* ----------
+ * pgstat_get_subscription_entry
+ *
+ * Look up the statistics entry of the subscription with OID subid.  When no
+ * entry exists yet, an initialized one is made if create is true;
+ * otherwise NULL is returned.
+ * ----------
+ */
+static PgStat_StatSubEntry *
+pgstat_get_subscription_entry(Oid subid, bool create)
+{
+	PgStat_StatSubEntry *entry;
+	bool		existed;
+
+	/* Build the top-level hash lazily, and only on behalf of a creator */
+	if (subscriptionHash == NULL)
+	{
+		HASHCTL		ctl;
+
+		if (!create)
+			return NULL;
+
+		ctl.keysize = sizeof(Oid);
+		ctl.entrysize = sizeof(PgStat_StatSubEntry);
+		subscriptionHash = hash_create("Subscription stat entries",
+									   PGSTAT_SUBSCRIPTION_HASH_SIZE,
+									   &ctl,
+									   HASH_ELEM | HASH_BLOBS);
+	}
+
+	entry = (PgStat_StatSubEntry *)
+		hash_search(subscriptionHash, (void *) &subid,
+					create ? HASH_ENTER : HASH_FIND, &existed);
+
+	/* Only a freshly created entry needs initializing */
+	if (!create || existed)
+		return entry;
+
+	/* Start with a zeroed apply error and no table sync error hash */
+	MemSet(&(entry->apply_error), 0, sizeof(PgStat_StatSubErrEntry));
+	entry->sync_errors = NULL;
+
+	return entry;
+}
+
+/* ----------
+ * pgstat_get_subscription_error_entry
+ *
+ * Find the error entry identified by subscription OID and relation OID.  An
+ * invalid subrelid selects the apply error, a valid one a table sync error.
+ * Missing entries are created if create is true; else NULL is returned.
+ * ----------
+ */
+static PgStat_StatSubErrEntry *
+pgstat_get_subscription_error_entry(Oid subid, Oid subrelid, bool create)
+{
+	PgStat_StatSubEntry *parent;
+	PgStat_StatSubErrEntry *result;
+	bool		existed;
+
+	parent = pgstat_get_subscription_entry(subid, create);
+
+	/* The subscription entry can be missing only for a pure lookup */
+	if (parent == NULL)
+	{
+		Assert(!create);
+		return NULL;
+	}
+
+	/* The apply error is stored inline in the subscription entry */
+	if (!OidIsValid(subrelid))
+		return &(parent->apply_error);
+
+	/* Table sync errors live in a nested hash that is built lazily */
+	if (parent->sync_errors == NULL)
+	{
+		HASHCTL		ctl;
+
+		if (!create)
+			return NULL;
+
+		ctl.keysize = sizeof(Oid);
+		ctl.entrysize = sizeof(PgStat_StatSubErrEntry);
+		parent->sync_errors = hash_create("table sync errors",
+										  PGSTAT_SUBSCRIPTION_HASH_SIZE,
+										  &ctl,
+										  HASH_ELEM | HASH_BLOBS);
+	}
+
+	result = (PgStat_StatSubErrEntry *)
+		hash_search(parent->sync_errors, (void *) &subrelid,
+					create ? HASH_ENTER : HASH_FIND, &existed);
+
+	/* Nothing to initialize unless we just created the entry */
+	if (!create || existed)
+		return result;
+
+	/* relid is the hash key and is already set; clear everything else */
+	pgstat_reset_subscription_error_entry(result, 0);
+	return result;
+}
+
+/* Clear all error details except relid, stamping the entry with ts */
+static void
+pgstat_reset_subscription_error_entry(PgStat_StatSubErrEntry *errent,
+									  TimestampTz ts)
+{
+	errent->stat_reset_timestamp = ts;
+	errent->last_errmsg[0] = '\0';	/* now reads as an empty string */
+	errent->last_failure = 0;
+	errent->failure_count = 0;
+	errent->xid = InvalidTransactionId;
+	errent->command = 0;
+}
+
 /*
  * pgstat_slru_index
  *
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 8d96c926b4..e91fa86b1a 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3329,6 +3329,7 @@ void
 ApplyWorkerMain(Datum main_arg)
 {
 	int			worker_slot = DatumGetInt32(main_arg);
+	MemoryContext cctx = CurrentMemoryContext;
 	MemoryContext oldctx;
 	char		originname[NAMEDATALEN];
 	XLogRecPtr	origin_startpos;
@@ -3429,8 +3430,27 @@ ApplyWorkerMain(Datum main_arg)
 	{
 		char	   *syncslotname;
 
-		/* This is table synchronization worker, call initial sync. */
-		syncslotname = LogicalRepSyncTableStart(&origin_startpos);
+		PG_TRY();
+		{
+			/* This is table synchronization worker, call initial sync. */
+			syncslotname = LogicalRepSyncTableStart(&origin_startpos);
+		}
+		PG_CATCH();
+		{
+			MemoryContext ecxt = MemoryContextSwitchTo(cctx);
+			ErrorData *errdata = CopyErrorData();
+
+			/* report the table sync error */
+			pgstat_report_subscription_error(MyLogicalRepWorker->subid,
+											 MyLogicalRepWorker->relid,
+											 MyLogicalRepWorker->relid,
+											 0,
+											 InvalidTransactionId,
+											 errdata->message);
+			MemoryContextSwitchTo(ecxt);
+			PG_RE_THROW();
+		}
+		PG_END_TRY();
 
 		/* allocate slot name in long-lived context */
 		myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
@@ -3548,7 +3568,32 @@ ApplyWorkerMain(Datum main_arg)
 	}
 
 	/* Run the main loop. */
-	LogicalRepApplyLoop(origin_startpos);
+	PG_TRY();
+	{
+		LogicalRepApplyLoop(origin_startpos);
+	}
+	PG_CATCH();
+	{
+		/* report the apply error */
+		if (apply_error_callback_arg.command != 0)
+		{
+			MemoryContext ecxt = MemoryContextSwitchTo(cctx);
+			ErrorData *errdata = CopyErrorData();
+
+			pgstat_report_subscription_error(MySubscription->oid,
+											 InvalidOid,
+											 apply_error_callback_arg.rel != NULL
+											 ? apply_error_callback_arg.rel->localreloid
+											 : InvalidOid,
+											 apply_error_callback_arg.command,
+											 apply_error_callback_arg.remote_xid,
+											 errdata->message);
+			MemoryContextSwitchTo(ecxt);
+		}
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
 
 	proc_exit(0);
 }
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index ff5aedc99c..51f693c22b 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -24,6 +24,7 @@
 #include "pgstat.h"
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/postmaster.h"
+#include "replication/logicalproto.h"
 #include "replication/slot.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
@@ -2239,6 +2240,23 @@ pg_stat_reset_replication_slot(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+/* SQL-callable: reset the error stats of one subscription worker */
+Datum
+pg_stat_reset_subscription_error(PG_FUNCTION_ARGS)
+{
+	Oid			subid = PG_GETARG_OID(0);
+
+	/*
+	 * A NULL relid targets the apply worker's error stats (passed on as
+	 * InvalidOid); a non-NULL relid targets that table sync worker's stats.
+	 */
+	Oid			relid = PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1);
+
+	pgstat_reset_subscription_error(subid, relid);
+
+	PG_RETURN_VOID();
+}
+
 Datum
 pg_stat_get_archiver(PG_FUNCTION_ARGS)
 {
@@ -2379,3 +2397,97 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
 }
+
+/*
+ * Get the apply (NULL relid) or table sync error of the given subscription.
+ */
+Datum
+pg_stat_get_subscription_error(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_GET_SUBSCRIPTION_ERROR_COLS 9
+	Oid			subid = PG_GETARG_OID(0);
+	Oid			relid;
+	TupleDesc	tupdesc;
+	Datum		values[PG_STAT_GET_SUBSCRIPTION_ERROR_COLS];
+	bool		nulls[PG_STAT_GET_SUBSCRIPTION_ERROR_COLS];
+	PgStat_StatSubErrEntry *errent;
+	int			i;
+
+	/* Initialise values and NULL flags arrays */
+	MemSet(values, 0, sizeof(values));
+	MemSet(nulls, 0, sizeof(nulls));
+
+	/* Describe the result row; values[] below is filled in this same order */
+	tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_SUBSCRIPTION_ERROR_COLS);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "subid",
+					   OIDOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
+					   OIDOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "command",
+					   TEXTOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "xid",
+					   XIDOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "failure_source",
+					   TEXTOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "failure_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "last_failure",
+					   TIMESTAMPTZOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "last_failure_message",
+					   TEXTOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "stats_reset",
+					   TIMESTAMPTZOID, -1, 0);
+	BlessTupleDesc(tupdesc);
+
+	if (PG_ARGISNULL(1))
+		relid = InvalidOid;		/* NULL relid: look up the apply error */
+	else
+		relid = PG_GETARG_OID(1);	/* look up that table's sync error */
+
+	/* Get subscription errors */
+	errent = pgstat_fetch_subscription_error(subid, relid);
+
+	/* Return NULL if no error entry was fetched for this subid/relid pair */
+	if (errent == NULL)
+		PG_RETURN_NULL();
+
+	i = 0;						/* index of the next output column */
+	values[i++] = ObjectIdGetDatum(subid);
+
+	if (OidIsValid(errent->relid))
+		values[i++] = ObjectIdGetDatum(errent->relid);
+	else
+		nulls[i++] = true;		/* entry carries no valid relation OID */
+
+	if (errent->command == 0)	/* 0: no command recorded in the entry */
+		nulls[i++] = true;
+	else
+		values[i++] = CStringGetTextDatum(logicalrep_message_type(errent->command));
+
+	if (TransactionIdIsValid(errent->xid))
+		values[i++] = TransactionIdGetDatum(errent->xid);
+	else
+		nulls[i++] = true;
+
+	if (OidIsValid(relid))		/* decided by the argument, not the entry */
+		values[i++] = CStringGetTextDatum("tablesync");
+	else
+		values[i++] = CStringGetTextDatum("apply");
+
+	values[i++] = Int64GetDatum(errent->failure_count);
+
+	if (errent->last_failure == 0)	/* 0: no failure time recorded */
+		nulls[i++] = true;
+	else
+		values[i++] = TimestampTzGetDatum(errent->last_failure);
+
+	values[i++] = CStringGetTextDatum(errent->last_errmsg);
+
+	if (errent->stat_reset_timestamp == 0)	/* 0: entry was never reset */
+		nulls[i++] = true;
+	else
+		values[i++] = TimestampTzGetDatum(errent->stat_reset_timestamp);
+
+	/* Returns the record as Datum */
+	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
+}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index d068d6532e..ac02061347 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5385,6 +5385,14 @@
   proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
   proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
   prosrc => 'pg_stat_get_replication_slot' },
+{ oid => '8523', descr => 'statistics: information about subscription error',
+  proname => 'pg_stat_get_subscription_error', prorows => '1', proisstrict => 'f',
+  proretset => 't', provolatile => 's', proparallel => 'r',
+  prorettype => 'record', proargtypes => 'oid oid',
+  proallargtypes => '{oid,oid,oid,oid,text,xid,text,int8,timestamptz,text,timestamptz}',
+  proargmodes => '{i,i,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,relid,subid,relid,command,xid,failure_source,failure_count,last_failure,last_failure_message,stats_reset}',
+  prosrc => 'pg_stat_get_subscription_error' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
@@ -5772,6 +5780,11 @@
   proname => 'pg_stat_reset_replication_slot', proisstrict => 'f',
   provolatile => 'v', prorettype => 'void', proargtypes => 'text',
   prosrc => 'pg_stat_reset_replication_slot' },
+{ oid => '8524',
+  descr => 'statistics: reset collected statistics for a single subscription error',
+  proname => 'pg_stat_reset_subscription_error', proisstrict => 'f',
+  provolatile => 'v', prorettype => 'void', proargtypes => 'oid oid',
+  prosrc => 'pg_stat_reset_subscription_error' },
 
 { oid => '3163', descr => 'current trigger depth',
   proname => 'pg_trigger_depth', provolatile => 's', proparallel => 'r',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index bcd3588ea2..e9702ec150 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -14,6 +14,7 @@
 #include "datatype/timestamp.h"
 #include "portability/instr_time.h"
 #include "postmaster/pgarch.h"	/* for MAX_XFN_CHARS */
+#include "replication/logicalproto.h"
 #include "utils/backend_progress.h" /* for backward compatibility */
 #include "utils/backend_status.h"	/* for backward compatibility */
 #include "utils/hsearch.h"
@@ -66,6 +67,10 @@ typedef enum StatMsgType
 	PGSTAT_MTYPE_RESETSINGLECOUNTER,
 	PGSTAT_MTYPE_RESETSLRUCOUNTER,
 	PGSTAT_MTYPE_RESETREPLSLOTCOUNTER,
+	PGSTAT_MTYPE_SUBSCRIPTIONERR,
+	PGSTAT_MTYPE_SUBSCRIPTIONERRRESET,
+	PGSTAT_MTYPE_SUBSCRIPTIONERRPURGE,
+	PGSTAT_MTYPE_SUBSCRIPTIONPURGE,
 	PGSTAT_MTYPE_AUTOVAC_START,
 	PGSTAT_MTYPE_VACUUM,
 	PGSTAT_MTYPE_ANALYZE,
@@ -536,6 +541,81 @@ typedef struct PgStat_MsgReplSlot
 	PgStat_Counter m_total_bytes;
 } PgStat_MsgReplSlot;
 
+/* ----------
+ * PgStat_MsgSubscriptionErr	Sent by the apply worker or the table sync worker to
+ *								report an error that occurred during logical replication.
+ * ----------
+ */
+#define PGSTAT_SUBSCRIPTIONERR_MSGLEN 256	/* size of the error message buffer */
+typedef struct PgStat_MsgSubscriptionErr
+{
+	PgStat_MsgHdr m_hdr;
+
+	/*
+	 * m_subid and m_subrelid are used to determine the subscription and the
+	 * reporter of this error.  m_subrelid is InvalidOid if reported by the
+	 * apply worker, otherwise by the table sync worker.  In the table sync
+	 * worker case, m_subrelid must be the same as m_relid.
+	 */
+	Oid			m_subid;
+	Oid			m_subrelid;
+
+	/* Error information */
+	Oid			m_relid;		/* relation involved, or InvalidOid if none */
+	LogicalRepMsgType m_command;	/* 0 when no command applies */
+	TransactionId m_xid;		/* InvalidTransactionId for table sync errors */
+	TimestampTz m_failure_time;
+	char		m_errmsg[PGSTAT_SUBSCRIPTIONERR_MSGLEN];
+} PgStat_MsgSubscriptionErr;
+
+/* ----------
+ * PgStat_MsgSubscriptionErrReset	Sent by the backend to reset the
+ *									subscription error fields.
+ * ----------
+ */
+typedef struct PgStat_MsgSubscriptionErrReset
+{
+	PgStat_MsgHdr m_hdr;
+
+	/*
+	 * As in PgStat_MsgSubscriptionErr, m_subid and m_subrelid identify the
+	 * subscription and the reporter of the error: the apply worker
+	 * (m_subrelid is InvalidOid) or the table sync worker.
+	 */
+	Oid		m_subid;
+	Oid		m_subrelid;
+} PgStat_MsgSubscriptionErrReset;
+
+/* ----------
+ * PgStat_MsgSubscriptionPurge	Sent by the backend and autovacuum to tell the
+ *								collector about the dead subscriptions.
+ * ----------
+ */
+#define PGSTAT_NUM_SUBSCRIPTIONPURGE  \
+	((PGSTAT_MSG_PAYLOAD - sizeof(int)) / sizeof(Oid))
+
+typedef struct PgStat_MsgSubscriptionPurge
+{
+	PgStat_MsgHdr m_hdr;
+	int			m_nentries;		/* number of valid entries in m_subids[] */
+	Oid			m_subids[PGSTAT_NUM_SUBSCRIPTIONPURGE];	/* entries to remove */
+} PgStat_MsgSubscriptionPurge;
+
+/* ----------
+ * PgStat_MsgSubscriptionErrPurge	Sent by the backend and autovacuum to purge
+ *									table sync errors of one subscription.
+ * ----------
+ */
+#define PGSTAT_NUM_SUBSCRIPTIONERRPURGE  \
+	((PGSTAT_MSG_PAYLOAD - sizeof(Oid) - sizeof(int)) / sizeof(Oid))
+
+typedef struct PgStat_MsgSubscriptionErrPurge
+{
+	PgStat_MsgHdr m_hdr;
+	Oid			m_subid;		/* subscription the errors belong to */
+	int			m_nentries;		/* number of valid entries in m_relids[] */
+	Oid			m_relids[PGSTAT_NUM_SUBSCRIPTIONERRPURGE];	/* sync errors to drop */
+} PgStat_MsgSubscriptionErrPurge;
 
 /* ----------
  * PgStat_MsgRecoveryConflict	Sent by the backend upon recovery conflict
@@ -714,6 +794,10 @@ typedef union PgStat_Msg
 	PgStat_MsgReplSlot msg_replslot;
 	PgStat_MsgConnect msg_connect;
 	PgStat_MsgDisconnect msg_disconnect;
+	PgStat_MsgSubscriptionErr msg_subscriptionerr;
+	PgStat_MsgSubscriptionErrReset msg_subscriptionerrreset;
+	PgStat_MsgSubscriptionErrPurge msg_subscriptionerrpurge;
+	PgStat_MsgSubscriptionPurge msg_subscriptionpurge;
 } PgStat_Msg;
 
 
@@ -929,6 +1013,44 @@ typedef struct PgStat_StatReplSlotEntry
 	TimestampTz stat_reset_timestamp;
 } PgStat_StatReplSlotEntry;
 
+/*
+ * Subscription error statistics kept in the stats collector, representing
+ * an error that occurred during application of logical replication changes
+ * or during initial table synchronization.
+ */
+typedef struct PgStat_StatSubErrEntry
+{
+	Oid			relid;			/* hash table key for table sync errors */
+	LogicalRepMsgType command;	/* 0 if no command is recorded */
+	TransactionId xid;			/* InvalidTransactionId if none is recorded */
+	PgStat_Counter failure_count;
+	TimestampTz last_failure;	/* 0 if no failure time is recorded */
+	char		last_errmsg[PGSTAT_SUBSCRIPTIONERR_MSGLEN];
+	TimestampTz stat_reset_timestamp;	/* 0 if the entry was never reset */
+} PgStat_StatSubErrEntry;
+
+/*
+ * Subscription statistics kept in the stats collector.
+ */
+typedef struct PgStat_StatSubEntry
+{
+	Oid			subid;			/* hash table key */
+
+	/*
+	 * Statistics of errors that occurred during logical replication.  The
+	 * apply error is kept inline (apply_error) rather than in the hash
+	 * table used for table sync errors, so that the nested hash table
+	 * need not be built when there is no table sync error, which is the
+	 * common case in practice.
+	 *
+	 * Note that the lifetimes of the apply worker's and the table sync
+	 * workers' error entries also differ.  All of them are removed once
+	 * the subscription is dropped, but table sync errors are additionally
+	 * removed after the table synchronization has completed.
+	 */
+	PgStat_StatSubErrEntry apply_error;
+	HTAB	   *sync_errors;	/* NULL until a table sync error entry is made */
+} PgStat_StatSubEntry;
 
 /*
  * Working state needed to accumulate per-function-call timing statistics.
@@ -1022,6 +1144,7 @@ extern void pgstat_reset_shared_counters(const char *);
 extern void pgstat_reset_single_counter(Oid objectid, PgStat_Single_Reset_Type type);
 extern void pgstat_reset_slru_counter(const char *);
 extern void pgstat_reset_replslot_counter(const char *name);
+extern void pgstat_reset_subscription_error(Oid subid, Oid subrelid);
 
 extern void pgstat_report_connect(Oid dboid);
 extern void pgstat_report_autovac(Oid dboid);
@@ -1038,6 +1161,9 @@ extern void pgstat_report_checksum_failure(void);
 extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat);
 extern void pgstat_report_replslot_create(const char *slotname);
 extern void pgstat_report_replslot_drop(const char *slotname);
+extern void pgstat_report_subscription_error(Oid subid, Oid subrelid, Oid relid,
+											 LogicalRepMsgType command,
+											 TransactionId xid, const char *errmsg);
 
 extern void pgstat_initialize(void);
 
@@ -1136,6 +1262,7 @@ extern PgStat_GlobalStats *pgstat_fetch_global(void);
 extern PgStat_WalStats *pgstat_fetch_stat_wal(void);
 extern PgStat_SLRUStats *pgstat_fetch_slru(void);
 extern PgStat_StatReplSlotEntry *pgstat_fetch_replslot(NameData slotname);
+extern PgStat_StatSubErrEntry *pgstat_fetch_subscription_error(Oid subid, Oid relid);
 
 extern void pgstat_count_slru_page_zeroed(int slru_idx);
 extern void pgstat_count_slru_page_hit(int slru_idx);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 2fa00a3c29..3719b8a41a 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2094,6 +2094,28 @@ pg_stat_subscription| SELECT su.oid AS subid,
     st.latest_end_time
    FROM (pg_subscription su
      LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
+pg_stat_subscription_errors| SELECT d.datname,
+    sr.subid,
+    s.subname,
+    e.relid,
+    e.command,
+    e.xid,
+    e.failure_source,
+    e.failure_count,
+    e.last_failure,
+    e.last_failure_message,
+    e.stats_reset
+   FROM ( SELECT pg_subscription.oid AS subid,
+            NULL::oid AS relid
+           FROM pg_subscription
+        UNION ALL
+         SELECT pg_subscription_rel.srsubid AS subid,
+            pg_subscription_rel.srrelid AS relid
+           FROM pg_subscription_rel
+          WHERE (pg_subscription_rel.srsubstate <> 'r'::"char")) sr,
+    ((LATERAL pg_stat_get_subscription_error(sr.subid, sr.relid) e(subid, relid, command, xid, failure_source, failure_count, last_failure, last_failure_message, stats_reset)
+     JOIN pg_subscription s ON ((e.subid = s.oid)))
+     JOIN pg_database d ON ((s.subdbid = d.oid)));
 pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid,
     pg_stat_all_indexes.indexrelid,
     pg_stat_all_indexes.schemaname,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 402a6617a9..82dec0851f 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1940,6 +1940,9 @@ PgStat_MsgResetsharedcounter
 PgStat_MsgResetsinglecounter
 PgStat_MsgResetslrucounter
 PgStat_MsgSLRU
+PgStat_MsgSubscriptionErr
+PgStat_MsgSubscriptionErrPurge
+PgStat_MsgSubscriptionPurge
 PgStat_MsgTabpurge
 PgStat_MsgTabstat
 PgStat_MsgTempFile
@@ -1951,6 +1954,8 @@ PgStat_Single_Reset_Type
 PgStat_StatDBEntry
 PgStat_StatFuncEntry
 PgStat_StatReplSlotEntry
+PgStat_StatSubEntry
+PgStat_StatSubErrEntry
 PgStat_StatTabEntry
 PgStat_SubXactStatus
 PgStat_TableCounts
-- 
2.24.3 (Apple Git-128)

