From 3c3ab0bd589aef7ba165f022c7adfe018fa65cec Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 16 Jul 2021 23:10:22 +0900
Subject: [PATCH v15 1/3] Add a subscription errors statistics view
 "pg_stat_subscription_errors".

This commit adds a new system view pg_stat_logical_replication_errors,
that shows information about any errors which occur during application
of logical replication changes as well as during performing initial table
synchronization.

The subscription error entries are removed by autovacuum workers after
table synchronization completes in table sync worker cases and after
dropping the subscription in apply worker cases.

It also adds an SQL function pg_stat_reset_subscription_error() to
reset a single subscription error.
---
 doc/src/sgml/monitoring.sgml                | 160 +++++
 src/backend/catalog/system_functions.sql    |   2 +
 src/backend/catalog/system_views.sql        |  25 +
 src/backend/postmaster/pgstat.c             | 609 ++++++++++++++++++++
 src/backend/replication/logical/worker.c    |  51 +-
 src/backend/utils/adt/pgstatfuncs.c         | 121 ++++
 src/include/catalog/pg_proc.dat             |  13 +
 src/include/pgstat.h                        | 121 ++++
 src/test/regress/expected/rules.out         |  20 +
 src/test/subscription/t/025_error_report.pl | 154 +++++
 src/tools/pgindent/typedefs.list            |   7 +
 11 files changed, 1280 insertions(+), 3 deletions(-)
 create mode 100644 src/test/subscription/t/025_error_report.pl

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 2cd8920645..6c57cd61d5 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -346,6 +346,15 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       </entry>
      </row>
 
+     <row>
+      <entry><structname>pg_stat_subscription_errors</structname><indexterm><primary>pg_stat_subscription_errors</primary></indexterm></entry>
+      <entry>One row per error that occurred on subscription, showing information about
+       each subscription error.
+       See <link linkend="monitoring-pg-stat-subscription-errors">
+       <structname>pg_stat_subscription_errors</structname></link> for details.
+      </entry>
+     </row>
+
      <row>
       <entry><structname>pg_stat_ssl</structname><indexterm><primary>pg_stat_ssl</primary></indexterm></entry>
       <entry>One row per connection (regular and replication), showing information about
@@ -3050,6 +3059,135 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
  </sect2>
 
+ <sect2 id="monitoring-pg-stat-subscription-errors">
+  <title><structname>pg_stat_subscription_errors</structname></title>
+
+  <indexterm>
+   <primary>pg_stat_subscription_errors</primary>
+  </indexterm>
+
+  <para>
+   The <structname>pg_stat_subscription_errors</structname> view will contain one
+   row per subscription error reported by workers applying logical replication
+   changes and workers handling the initial data copy of the subscribed tables.
+  </para>
+
+  <table id="pg-stat-subscription-errors" xreflabel="pg_stat_subscription-errors">
+   <title><structname>pg_stat_subscription_errors</structname> View</title>
+   <tgroup cols="1">
+    <thead>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       Column Type
+      </para>
+      <para>
+       Description
+      </para></entry>
+     </row>
+    </thead>
+
+    <tbody>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subid</structfield> <type>oid</type>
+      </para>
+      <para>
+       OID of the subscription
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subname</structfield> <type>name</type>
+      </para>
+      <para>
+       Name of the subscription
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subrelid</structfield> <type>oid</type>
+      </para>
+      <para>
+       OID of the relation that the worker is synchronizing; NULL for the
+       main apply worker
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>relid</structfield> <type>oid</type>
+      </para>
+      <para>
+       OID of the relation that the worker was processing when the
+       error occurred
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>command</structfield> <type>text</type>
+      </para>
+      <para>
+       Name of command being applied when the error occurred.  This field
+       is always NULL if the error was reported during the initial data
+       copy.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>xid</structfield> <type>xid</type>
+      </para>
+      <para>
+       Transaction ID of the publisher node being applied when the error
+       occurred.  This field is always NULL if the error was reported
+       during the initial data copy.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>count</structfield> <type>uint8</type>
+      </para>
+      <para>
+       Number of consecutive times the error occurred
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>error_message</structfield> <type>text</type>
+      </para>
+      <para>
+       Message of the error
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>last_failed_time</structfield> <type>timestamp with time zone</type>
+      </para>
+      <para>
+       Time at which the last error occurred
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
+      </para>
+      <para>
+       Time at which these statistics were last reset
+      </para></entry>
+     </row>
+    </tbody>
+   </tgroup>
+  </table>
+
+ </sect2>
+
  <sect2 id="monitoring-pg-stat-ssl-view">
   <title><structname>pg_stat_ssl</structname></title>
 
@@ -5172,6 +5310,28 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
          can be granted EXECUTE to run the function.
        </para></entry>
       </row>
+
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+          <primary>pg_stat_reset_subscription_error</primary>
+        </indexterm>
+        <function>pg_stat_reset_subscription_error</function> ( <parameter>subid</parameter> <type>oid</type>, <parameter>relid</parameter> <type>oid</type> )
+        <returnvalue>void</returnvalue>
+       </para>
+       <para>
+        Resets statistics of a single subscription error.  If
+        the argument <parameter>relid</parameter> is not <literal>NULL</literal>,
+        resets error statistics of the <literal>tablesync</literal> worker for
+        the relation with <parameter>relid</parameter>.  Otherwise, resets the
+        error statistics of the <literal>apply</literal> worker running on the
+        subscription with <parameter>subid</parameter>.
+       </para>
+       <para>
+         This function is restricted to superusers by default, but other users
+         can be granted EXECUTE to run the function.
+       </para></entry>
+      </row>
      </tbody>
     </tgroup>
    </table>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index a416e94d37..c9aa6f04d3 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -639,6 +639,8 @@ REVOKE EXECUTE ON FUNCTION pg_stat_reset_single_function_counters(oid) FROM publ
 
 REVOKE EXECUTE ON FUNCTION pg_stat_reset_replication_slot(text) FROM public;
 
+REVOKE EXECUTE ON FUNCTION pg_stat_reset_subscription_error(oid, oid) FROM public;
+
 REVOKE EXECUTE ON FUNCTION lo_import(text) FROM public;
 
 REVOKE EXECUTE ON FUNCTION lo_import(text, oid) FROM public;
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 55f6e3711d..6e891b960e 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1257,3 +1257,28 @@ REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
               substream, subtwophasestate, subslotname, subsynccommit, subpublications)
     ON pg_subscription TO public;
+
+CREATE VIEW pg_stat_subscription_errors AS
+    SELECT
+	e.subid,
+	s.subname,
+	e.subrelid,
+	e.relid,
+	e.command,
+	e.xid,
+	e.count,
+	e.error_message,
+	e.last_failed_time,
+	e.stats_reset
+    FROM (SELECT
+              oid as subid,
+              NULL as relid
+          FROM pg_subscription
+          UNION ALL
+          SELECT
+              srsubid as subid,
+              srrelid as relid
+          FROM pg_subscription_rel
+          WHERE srsubstate <> 'r') sr,
+          LATERAL pg_stat_get_subscription_error(sr.subid, sr.relid) e
+          JOIN pg_subscription s ON (e.subid = s.oid);
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index b7d0fbaefd..7a5615c1df 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -41,6 +41,8 @@
 #include "catalog/catalog.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_proc.h"
+#include "catalog/pg_subscription.h"
+#include "catalog/pg_subscription_rel.h"
 #include "common/ip.h"
 #include "executor/instrument.h"
 #include "libpq/libpq.h"
@@ -106,6 +108,7 @@
 #define PGSTAT_TAB_HASH_SIZE	512
 #define PGSTAT_FUNCTION_HASH_SIZE	512
 #define PGSTAT_REPLSLOT_HASH_SIZE	32
+#define PGSTAT_SUBWORKER_HASH_SIZE	32
 
 
 /* ----------
@@ -282,6 +285,7 @@ static PgStat_GlobalStats globalStats;
 static PgStat_WalStats walStats;
 static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
 static HTAB *replSlotStatHash = NULL;
+static HTAB *subWorkerStatHash = NULL;
 
 /*
  * List of OIDs of databases we need to write out.  If an entry is InvalidOid,
@@ -332,6 +336,13 @@ static bool pgstat_db_requested(Oid databaseid);
 static PgStat_StatReplSlotEntry *pgstat_get_replslot_entry(NameData name, bool create_it);
 static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, TimestampTz ts);
 
+static PgStat_StatSubWorkerEntry *pgstat_get_subworker_entry(Oid subid, Oid subrelid,
+															 bool create);
+static void pgstat_reset_subworker_error(PgStat_StatSubWorkerEntry *wentry, TimestampTz ts);
+static void pgstat_report_subworker_purge(PgStat_MsgSubWorkerPurge *msg);
+static void pgstat_report_subworker_error_purge(PgStat_MsgSubWorkerErrorPurge *msg);
+static void pgstat_vacuum_subworker_stats(void);
+
 static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg, TimestampTz now);
 static void pgstat_send_funcstats(void);
 static void pgstat_send_slru(void);
@@ -356,6 +367,7 @@ static void pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, in
 static void pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len);
 static void pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len);
 static void pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, int len);
+static void pgstat_recv_resetsubworkererror(PgStat_MsgResetsubworkererror *msg, int len);
 static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len);
 static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len);
 static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len);
@@ -373,6 +385,10 @@ static void pgstat_recv_connect(PgStat_MsgConnect *msg, int len);
 static void pgstat_recv_disconnect(PgStat_MsgDisconnect *msg, int len);
 static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
 static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
+static void pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len);
+static void pgstat_recv_subworker_error_purge(PgStat_MsgSubWorkerErrorPurge *msg,
+											  int len);
+static void pgstat_recv_subworker_purge(PgStat_MsgSubWorkerPurge *msg, int len);
 
 /* ------------------------------------------------------------
  * Public functions called from postmaster follow
@@ -1178,6 +1194,10 @@ pgstat_vacuum_stat(void)
 		}
 	}
 
+	/* Cleanup the dead subscription workers statistics */
+	if (subWorkerStatHash)
+		pgstat_vacuum_subworker_stats();
+
 	/*
 	 * Lookup our own database entry; if not found, nothing more to do.
 	 */
@@ -1355,6 +1375,218 @@ pgstat_collect_oids(Oid catalogid, AttrNumber anum_oid)
 }
 
 
+/* PgStat_StatSubWorkerEntry comparator, sorting subid and subrelid */
+static int
+subworker_stats_comparator(const ListCell *a, const ListCell *b)
+{
+	PgStat_StatSubWorkerEntry *entry1 = (PgStat_StatSubWorkerEntry *) lfirst(a);
+	PgStat_StatSubWorkerEntry *entry2 = (PgStat_StatSubWorkerEntry *) lfirst(b);
+	int			ret;
+
+	ret = oid_cmp(&entry1->key.subid, &entry2->key.subid);
+	if (ret != 0)
+		return ret;
+
+	return oid_cmp(&entry1->key.subrelid, &entry2->key.subrelid);
+}
+
+/* ----------
+ * pgstat_vacuum_subworker_stat() -
+ *
+ * This is a subroutine for pgstat_vacuum_stat to tell the collector about
+ * the all the dead subscription worker statistics.
+ */
+static void
+pgstat_vacuum_subworker_stats(void)
+{
+	struct subid_dbid_mapping
+	{
+		Oid			subid;
+		Oid			dbid;
+	};
+	HTAB	   *subdbmap;
+	HASHCTL		hash_ctl;
+	HASH_SEQ_STATUS hstat;
+	Relation	rel;
+	HeapTuple	tup;
+	Snapshot	snapshot;
+	TupleDesc	desc;
+	TableScanDesc scan;
+	PgStat_MsgSubWorkerPurge wpmsg;
+	PgStat_MsgSubWorkerErrorPurge epmsg;
+	PgStat_StatSubWorkerEntry *wentry;
+	List	   *subworker_stats = NIL;
+	List	   *not_ready_rels = NIL;
+	ListCell   *lc1;
+
+	/* Create a map for mapping subscriptoin OID and database OID */
+	hash_ctl.keysize = sizeof(Oid);
+	hash_ctl.entrysize = sizeof(struct subid_dbid_mapping);
+	subdbmap = hash_create("Temporary map of subscription and database OIDs",
+						   PGSTAT_SUBWORKER_HASH_SIZE,
+						   &hash_ctl,
+						   HASH_ELEM | HASH_BLOBS);
+
+	rel = table_open(SubscriptionRelationId, AccessShareLock);
+	snapshot = RegisterSnapshot(GetLatestSnapshot());
+	scan = table_beginscan(rel, snapshot, 0, NULL);
+	desc = RelationGetDescr(rel);
+
+	/* Register entries into the hash table */
+	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
+	{
+		struct subid_dbid_mapping buf;
+		struct subid_dbid_mapping *entry;
+		bool		isnull;
+
+		CHECK_FOR_INTERRUPTS();
+
+		buf.subid = heap_getattr(tup, Anum_pg_subscription_oid, desc, &isnull);
+		Assert(!isnull);
+
+		buf.dbid = heap_getattr(tup, Anum_pg_subscription_subdbid, desc, &isnull);
+		Assert(!isnull);
+
+		entry = hash_search(subdbmap, (void *) &(buf.subid), HASH_ENTER, NULL);
+		entry->dbid = buf.dbid;
+	}
+	table_endscan(scan);
+	UnregisterSnapshot(snapshot);
+	table_close(rel, AccessShareLock);
+
+	/* Build the list of worker stats and sort it by subid and relid */
+	hash_seq_init(&hstat, subWorkerStatHash);
+	while ((wentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&hstat)) != NULL)
+		subworker_stats = lappend(subworker_stats, wentry);
+	list_sort(subworker_stats, subworker_stats_comparator);
+
+	wpmsg.m_nentries = 0;
+	epmsg.m_nentries = 0;
+	epmsg.m_subid = InvalidOid;
+
+	/*
+	 * Search for all the dead subscriptions and unnecessary table sync worker
+	 * entries in stats hashtable and tell the stats collector to drop them.
+	 */
+	foreach(lc1, subworker_stats)
+	{
+		struct subid_dbid_mapping *hentry;
+		ListCell   *lc2;
+		bool		keep_it = false;
+
+		wentry = (PgStat_StatSubWorkerEntry *) lfirst(lc1);
+
+		CHECK_FOR_INTERRUPTS();
+
+		/* Skip if we already registered this subscription to purge */
+		if (wpmsg.m_nentries > 0 &&
+			wpmsg.m_subids[wpmsg.m_nentries - 1] == wentry->key.subid)
+			continue;
+
+		/* Check if the subscription is dead */
+		if ((hentry = hash_search(subdbmap, (void *) &(wentry->key.subid),
+								  HASH_FIND, NULL)) == NULL)
+		{
+			/* This subscription is dead, add the subid to the message */
+			wpmsg.m_subids[wpmsg.m_nentries++] = wentry->key.subid;
+
+			/*
+			 * If the message is full, send it out and reinitialize to empty
+			 */
+			if (wpmsg.m_nentries >= PGSTAT_NUM_SUBWORKERPURGE)
+			{
+				pgstat_report_subworker_purge(&wpmsg);
+				wpmsg.m_nentries = 0;
+			}
+
+			continue;
+		}
+
+		/*
+		 * This subscription is live.  The next step is that we search errors
+		 * of the table sync workers who are already in sync state. These
+		 * errors should be removed.
+		 */
+
+		/* We remove only table sync errors in the current database */
+		if (hentry->dbid != MyDatabaseId)
+			continue;
+
+		/* Skip if it's an apply worker error */
+		if (!OidIsValid(wentry->key.subrelid))
+			continue;
+
+		if (epmsg.m_subid != wentry->key.subid)
+		{
+			/*
+			 * Send the purge message for previously collected table sync
+			 * errors, if there is.
+			 */
+			if (epmsg.m_nentries > 0)
+			{
+				pgstat_report_subworker_error_purge(&epmsg);
+				epmsg.m_nentries = 0;
+			}
+
+			/* Clean up if necessary */
+			if (not_ready_rels != NIL)
+				list_free_deep(not_ready_rels);
+
+			/* Refresh the not-ready-relations of this subscription */
+			not_ready_rels = GetSubscriptionNotReadyRelations(wentry->key.subid);
+
+			/* Prepare the error purge message for the subscription */
+			epmsg.m_subid = wentry->key.subid;
+		}
+
+		/*
+		 * Check if the table is still being synchronized or no longer belongs
+		 * to the subscription.
+		 */
+		foreach(lc2, not_ready_rels)
+		{
+			SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc2);
+
+			if (relstate->relid == wentry->key.subrelid)
+			{
+				/* This table is still being synchronized, so keep it */
+				keep_it = true;
+				break;
+			}
+		}
+
+		if (keep_it)
+			continue;
+
+		/* Add the table to the error purge message */
+		epmsg.m_relids[epmsg.m_nentries++] = wentry->key.subrelid;
+
+		/*
+		 * If the error purge message is full, send it out and reinitialize to
+		 * empty
+		 */
+		if (epmsg.m_nentries >= PGSTAT_NUM_SUBWORKERERRORPURGE)
+		{
+			pgstat_report_subworker_error_purge(&epmsg);
+			epmsg.m_nentries = 0;
+		}
+	}
+
+	/* Send the rest of dead subscriptions */
+	if (wpmsg.m_nentries > 0)
+		pgstat_report_subworker_purge(&wpmsg);
+
+	/* Send the rest of dead error entries */
+	if (epmsg.m_nentries > 0)
+		pgstat_report_subworker_error_purge(&epmsg);
+
+	/* Clean up */
+	if (not_ready_rels != NIL)
+		list_free_deep(not_ready_rels);
+
+	hash_destroy(subdbmap);
+}
+
 /* ----------
  * pgstat_drop_database() -
  *
@@ -1544,6 +1776,24 @@ pgstat_reset_replslot_counter(const char *name)
 	pgstat_send(&msg, sizeof(msg));
 }
 
+/* ----------
+ * pgstat_reset_subscription_error_stats() -
+ *
+ *	Tell the collector to reset the subscription worker error.
+ * ----------
+ */
+void
+pgstat_reset_subworker_error_stats(Oid subid, Oid subrelid)
+{
+	PgStat_MsgResetsubworkererror msg;
+
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSUBWORKERERROR);
+	msg.m_subid = subid;
+	msg.m_subrelid = subrelid;
+
+	pgstat_send(&msg, sizeof(PgStat_MsgResetsubworkererror));
+}
+
 /* ----------
  * pgstat_report_autovac() -
  *
@@ -1804,6 +2054,47 @@ pgstat_should_report_connstat(void)
 	return MyBackendType == B_BACKEND;
 }
 
+/* --------
+ * pgstat_report_subworker_purge() -
+ *
+ *	Tell the collector about dead subscriptions.
+ * --------
+ */
+static void
+pgstat_report_subworker_purge(PgStat_MsgSubWorkerPurge *msg)
+{
+	int			len;
+
+	Assert(msg->m_nentries > 0);
+
+	len = offsetof(PgStat_MsgSubWorkerPurge, m_subids[0])
+		+ msg->m_nentries * sizeof(Oid);
+
+	pgstat_setheader(&msg->m_hdr, PGSTAT_MTYPE_SUBWORKERPURGE);
+	pgstat_send(msg, len);
+}
+
+/* --------
+ * pgstat_report_subworker_error_purge() -
+ *
+ *	Tell the collector to remove table sync errors.
+ * --------
+ */
+static void
+pgstat_report_subworker_error_purge(PgStat_MsgSubWorkerErrorPurge *msg)
+{
+	int			len;
+
+	Assert(OidIsValid(msg->m_subid));
+	Assert(msg->m_nentries > 0);
+
+	len = offsetof(PgStat_MsgSubWorkerErrorPurge, m_relids[0])
+		+ msg->m_nentries * sizeof(Oid);
+
+	pgstat_setheader(&msg->m_hdr, PGSTAT_MTYPE_SUBWORKERERRORPURGE);
+	pgstat_send(msg, len);
+}
+
 /* ----------
  * pgstat_report_replslot() -
  *
@@ -1869,6 +2160,35 @@ pgstat_report_replslot_drop(const char *slotname)
 	pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
 }
 
+/* ----------
+ * pgstat_report_subworker_error() -
+ *
+ *	Tell the collector about the subscription worker error.
+ * ----------
+ */
+void
+pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
+							  LogicalRepMsgType command, TransactionId xid,
+							  const char *errmsg)
+{
+	PgStat_MsgSubWorkerError msg;
+	int			len;
+
+	Assert(strlen(errmsg) < PGSTAT_SUBWORKERERROR_MSGLEN);
+	len = offsetof(PgStat_MsgSubWorkerError, m_message[0]) + strlen(errmsg) + 1;
+
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBWORKERERROR);
+	msg.m_subid = subid;
+	msg.m_subrelid = subrelid;
+	msg.m_relid = relid;
+	msg.m_command = command;
+	msg.m_xid = xid;
+	msg.m_timestamp = GetCurrentTimestamp();
+	strlcpy(msg.m_message, errmsg, PGSTAT_SUBWORKERERROR_MSGLEN);
+
+	pgstat_send(&msg, len);
+}
+
 /* ----------
  * pgstat_ping() -
  *
@@ -2987,6 +3307,22 @@ pgstat_fetch_replslot(NameData slotname)
 	return pgstat_get_replslot_entry(slotname, false);
 }
 
+/*
+ * ---------
+ * pgstat_fetch_subworker() -
+ *
+ *	Support function for the SQL-callable pgstat* functions. Returns
+ *	a pointer to the subscription worker struct.
+ * ---------
+ */
+PgStat_StatSubWorkerEntry *
+pgstat_fetch_subworker(Oid subid, Oid subrelid)
+{
+	backend_read_statsfile();
+
+	return pgstat_get_subworker_entry(subid, subrelid, false);
+}
+
 /*
  * Shut down a single backend's statistics reporting at process exit.
  *
@@ -3498,6 +3834,11 @@ PgstatCollectorMain(int argc, char *argv[])
 													 len);
 					break;
 
+				case PGSTAT_MTYPE_RESETSUBWORKERERROR:
+					pgstat_recv_resetsubworkererror(&msg.msg_resetsubworkererror,
+													len);
+					break;
+
 				case PGSTAT_MTYPE_AUTOVAC_START:
 					pgstat_recv_autovac(&msg.msg_autovacuum_start, len);
 					break;
@@ -3568,6 +3909,19 @@ PgstatCollectorMain(int argc, char *argv[])
 					pgstat_recv_disconnect(&msg.msg_disconnect, len);
 					break;
 
+				case PGSTAT_MTYPE_SUBWORKERERROR:
+					pgstat_recv_subworker_error(&msg.msg_subworkererror, len);
+					break;
+
+				case PGSTAT_MTYPE_SUBWORKERERRORPURGE:
+					pgstat_recv_subworker_error_purge(&msg.msg_subworkererrorpurge,
+													  len);
+					break;
+
+				case PGSTAT_MTYPE_SUBWORKERPURGE:
+					pgstat_recv_subworker_purge(&msg.msg_subworkerpurge, len);
+					break;
+
 				default:
 					break;
 			}
@@ -3868,6 +4222,22 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
 		}
 	}
 
+	/*
+	 * Write subscription worker stats struct
+	 */
+	if (subWorkerStatHash)
+	{
+		PgStat_StatSubWorkerEntry *wentry;
+
+		hash_seq_init(&hstat, subWorkerStatHash);
+		while ((wentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&hstat)) != NULL)
+		{
+			fputc('S', fpout);
+			rc = fwrite(wentry, sizeof(PgStat_StatSubWorkerEntry), 1, fpout);
+			(void) rc;			/* we'll check for error with ferror */
+		}
+	}
+
 	/*
 	 * No more output to be done. Close the temp file and replace the old
 	 * pgstat.stat with it.  The ferror() check replaces testing for error
@@ -4329,6 +4699,48 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 					break;
 				}
 
+				/*
+				 * 'S'	A PgStat_StatSubWorkerEntry struct describing a
+				 * subscription worker statistics.
+				 */
+			case 'S':
+				{
+					PgStat_StatSubWorkerEntry wbuf;
+					PgStat_StatSubWorkerEntry *wentry;
+
+					/* Read the subscription entry */
+					if (fread(&wbuf, 1, sizeof(PgStat_StatSubWorkerEntry), fpin)
+						!= sizeof(PgStat_StatSubWorkerEntry))
+					{
+						ereport(pgStatRunningInCollector ? LOG : WARNING,
+								(errmsg("corrupted statistics file \"%s\"",
+										statfile)));
+						goto done;
+					}
+
+					/* Create hash table if we don't have it already. */
+					if (subWorkerStatHash == NULL)
+					{
+						HASHCTL		hash_ctl;
+
+						hash_ctl.keysize = sizeof(PgStat_StatSubWorkerKey);
+						hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerEntry);
+						hash_ctl.hcxt = pgStatLocalContext;
+						subWorkerStatHash = hash_create("Subscription worker stat entries",
+														PGSTAT_SUBWORKER_HASH_SIZE,
+														&hash_ctl,
+														HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+					}
+
+					/* Enter the subscription entry and initialize fields */
+					wentry =
+						(PgStat_StatSubWorkerEntry *) hash_search(subWorkerStatHash,
+																  (void *) &wbuf.key,
+																  HASH_ENTER, NULL);
+					memcpy(wentry, &wbuf, sizeof(PgStat_StatSubWorkerEntry));
+					break;
+				}
+
 			case 'E':
 				goto done;
 
@@ -4541,6 +4953,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
 	PgStat_WalStats myWalStats;
 	PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
 	PgStat_StatReplSlotEntry myReplSlotStats;
+	PgStat_StatSubWorkerEntry mySubWorkerStats;
 	FILE	   *fpin;
 	int32		format_id;
 	const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
@@ -4671,6 +5084,22 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
 				}
 				break;
 
+				/*
+				 * 'S'	A PgStat_StatSubWorkerEntry struct describing a
+				 * subscription worker statistics.
+				 */
+			case 'S':
+				if (fread(&mySubWorkerStats, 1, sizeof(mySubWorkerStats), fpin)
+					!= sizeof(mySubWorkerStats))
+				{
+					ereport(pgStatRunningInCollector ? LOG : WARNING,
+							(errmsg("corrupted statistics file \"%s\"",
+									statfile)));
+					FreeFile(fpin);
+					return false;
+				}
+				break;
+
 			case 'E':
 				goto done;
 
@@ -4876,6 +5305,7 @@ pgstat_clear_snapshot(void)
 	pgStatLocalContext = NULL;
 	pgStatDBHash = NULL;
 	replSlotStatHash = NULL;
+	subWorkerStatHash = NULL;
 
 	/*
 	 * Historically the backend_status.c facilities lived in this file, and
@@ -5344,6 +5774,33 @@ pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
 	}
 }
 
+/* ----------
+ * pgstat_recv_resetsubworkererror() -
+ *
+ *	Process a RESETSUBWORKERERROR message.
+ * ----------
+ */
+static void
+pgstat_recv_resetsubworkererror(PgStat_MsgResetsubworkererror *msg, int len)
+{
+	PgStat_StatSubWorkerEntry *wentry;
+
+	Assert(OidIsValid(msg->m_subid));
+
+	/* Get subscription worker stats */
+	wentry = pgstat_get_subworker_entry(msg->m_subid, msg->m_subrelid, false);
+
+	/*
+	 * Nothing to do if the subscription error entry is not found.  This could
+	 * happen when the subscription is dropped and the message for dropping
+	 * subscription entry arrived before the message for resetting the error.
+	 */
+	if (wentry == NULL)
+		return;
+
+	/* reset the entry and set reset timestamp */
+	pgstat_reset_subworker_error(wentry, GetCurrentTimestamp());
+}
 
 /* ----------
  * pgstat_recv_autovac() -
@@ -5816,6 +6273,93 @@ pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len)
 	}
 }
 
+/* ----------
+ * pgstat_recv_subworker_error() -
+ *
+ *	Process a SUBWORKERERROR message.
+ * ----------
+ */
+static void
+pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len)
+{
+	PgStat_StatSubWorkerEntry *wentry;
+
+	/* Get the subscription worker stats */
+	wentry = pgstat_get_subworker_entry(msg->m_subid, msg->m_subrelid, true);
+	Assert(wentry);
+
+	/*
+	 * Update only the counter and timestamp if we received the same error
+	 * again
+	 */
+	if (wentry->relid == msg->m_relid &&
+		wentry->command == msg->m_command &&
+		wentry->xid == msg->m_xid &&
+		strncmp(wentry->message, msg->m_message, strlen(wentry->message)) == 0)
+	{
+		wentry->count++;
+		wentry->timestamp = msg->m_timestamp;
+		return;
+	}
+
+	/* Otherwise, update the error information */
+	wentry->relid = msg->m_relid;
+	wentry->command = msg->m_command;
+	wentry->xid = msg->m_xid;
+	wentry->count = 1;
+	wentry->timestamp = msg->m_timestamp;
+	strlcpy(wentry->message, msg->m_message, PGSTAT_SUBWORKERERROR_MSGLEN);
+}
+
+/* ----------
+ * pgstat_recv_subworker_purge() -
+ *
+ *	Process a SUBWORKERPURGE message.
+ * ----------
+ */
+static void
+pgstat_recv_subworker_purge(PgStat_MsgSubWorkerPurge *msg, int len)
+{
+	if (subWorkerStatHash == NULL)
+		return;
+
+	for (int i = 0; i < msg->m_nentries; i++)
+	{
+		HASH_SEQ_STATUS sstat;
+		PgStat_StatSubWorkerEntry *wentry;
+
+		/* Remove all worker statistics of the subscription */
+		hash_seq_init(&sstat, subWorkerStatHash);
+		while ((wentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&sstat)) != NULL)
+		{
+			if (wentry->key.subid == msg->m_subids[i])
+				(void) hash_search(subWorkerStatHash, (void *) &(wentry->key),
+								   HASH_REMOVE, NULL);
+		}
+	}
+}
+
+/* ----------
+ * pgstat_recv_subworker_error_purge() -
+ *
+ *	Process a SUBWORKERERRORPURGE message.
+ * ----------
+ */
+static void
+pgstat_recv_subworker_error_purge(PgStat_MsgSubWorkerErrorPurge *msg, int len)
+{
+	PgStat_StatSubWorkerKey key;
+
+	key.subid = msg->m_subid;
+	for (int i = 0; i < msg->m_nentries; i++)
+	{
+		Assert(OidIsValid(msg->m_relids[i]));
+
+		key.subrelid = msg->m_relids[i];
+		(void) hash_search(subWorkerStatHash, (void *) &key, HASH_REMOVE, NULL);
+	}
+}
+
 /* ----------
  * pgstat_write_statsfile_needed() -
  *
@@ -5934,6 +6478,71 @@ pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotent, TimestampTz ts)
 	slotent->stat_reset_timestamp = ts;
 }
 
+/* ----------
+ * pgstat_get_subworker_entry
+ *
+ * Return the entry of subscription worker entry with the subscription
+ * OID and relation OID.  If subrelid is InvalidOid, it returns an entry
+ * of the apply worker otherwise of the table sync worker associated with
+ * subrelid. If no subscription entry exists, initialize it, if the
+ * create parameter is true.  Else, return NULL.
+ * ----------
+ */
+static PgStat_StatSubWorkerEntry *
+pgstat_get_subworker_entry(Oid subid, Oid subrelid, bool create)
+{
+	PgStat_StatSubWorkerEntry *wentry;
+	PgStat_StatSubWorkerKey key;
+	HASHACTION	action;
+	bool		found;
+
+	if (subWorkerStatHash == NULL)
+	{
+		HASHCTL		hash_ctl;
+
+		if (!create)
+			return NULL;
+
+		hash_ctl.keysize = sizeof(PgStat_StatSubWorkerKey);
+		hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerEntry);
+		subWorkerStatHash = hash_create("Subscription worker stat entries",
+										PGSTAT_SUBWORKER_HASH_SIZE,
+										&hash_ctl,
+										HASH_ELEM | HASH_BLOBS);
+	}
+
+	key.subid = subid;
+	key.subrelid = subrelid;
+	action = (create ? HASH_ENTER : HASH_FIND);
+	wentry = (PgStat_StatSubWorkerEntry *) hash_search(subWorkerStatHash,
+													   (void *) &key,
+													   action, &found);
+
+	/* initialize fields */
+	if (create && !found)
+		pgstat_reset_subworker_error(wentry, 0);
+
+	return wentry;
+}
+
+/* ----------
+ * pgstat_reset_subworker_error
+ *
+ * Reset the given subscription worker error stats.
+ * ----------
+ */
+static void
+pgstat_reset_subworker_error(PgStat_StatSubWorkerEntry *wentry, TimestampTz ts)
+{
+	wentry->relid = InvalidOid;
+	wentry->command = 0;
+	wentry->xid = InvalidTransactionId;
+	wentry->count = 0;
+	wentry->timestamp = 0;
+	wentry->message[0] = '\0';
+	wentry->stat_reset_timestamp = ts;
+}
+
 /*
  * pgstat_slru_index
  *
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 8d96c926b4..ac3236a573 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3329,6 +3329,7 @@ void
 ApplyWorkerMain(Datum main_arg)
 {
 	int			worker_slot = DatumGetInt32(main_arg);
+	MemoryContext cctx = CurrentMemoryContext;
 	MemoryContext oldctx;
 	char		originname[NAMEDATALEN];
 	XLogRecPtr	origin_startpos;
@@ -3429,8 +3430,27 @@ ApplyWorkerMain(Datum main_arg)
 	{
 		char	   *syncslotname;
 
-		/* This is table synchronization worker, call initial sync. */
-		syncslotname = LogicalRepSyncTableStart(&origin_startpos);
+		PG_TRY();
+		{
+			/* This is table synchronization worker, call initial sync. */
+			syncslotname = LogicalRepSyncTableStart(&origin_startpos);
+		}
+		PG_CATCH();
+		{
+			MemoryContext ecxt = MemoryContextSwitchTo(cctx);
+			ErrorData  *errdata = CopyErrorData();
+
+			/* report the table sync error */
+			pgstat_report_subworker_error(MyLogicalRepWorker->subid,
+										  MyLogicalRepWorker->relid,
+										  MyLogicalRepWorker->relid,
+										  0,
+										  InvalidTransactionId,
+										  errdata->message);
+			MemoryContextSwitchTo(ecxt);
+			PG_RE_THROW();
+		}
+		PG_END_TRY();
 
 		/* allocate slot name in long-lived context */
 		myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
@@ -3548,7 +3568,32 @@ ApplyWorkerMain(Datum main_arg)
 	}
 
 	/* Run the main loop. */
-	LogicalRepApplyLoop(origin_startpos);
+	PG_TRY();
+	{
+		LogicalRepApplyLoop(origin_startpos);
+	}
+	PG_CATCH();
+	{
+		/* report the apply error */
+		if (apply_error_callback_arg.command != 0)
+		{
+			MemoryContext ecxt = MemoryContextSwitchTo(cctx);
+			ErrorData  *errdata = CopyErrorData();
+
+			pgstat_report_subworker_error(MyLogicalRepWorker->subid,
+										  MyLogicalRepWorker->relid,
+										  apply_error_callback_arg.rel != NULL
+										  ? apply_error_callback_arg.rel->localreloid
+										  : InvalidOid,
+										  apply_error_callback_arg.command,
+										  apply_error_callback_arg.remote_xid,
+										  errdata->message);
+			MemoryContextSwitchTo(ecxt);
+		}
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
 
 	proc_exit(0);
 }
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index ff5aedc99c..b2e324036c 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -24,6 +24,7 @@
 #include "pgstat.h"
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/postmaster.h"
+#include "replication/logicalproto.h"
 #include "replication/slot.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
@@ -2239,6 +2240,23 @@ pg_stat_reset_replication_slot(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+/* Reset a subscription error stats */
+Datum
+pg_stat_reset_subscription_error(PG_FUNCTION_ARGS)
+{
+	Oid			subid = PG_GETARG_OID(0);
+	Oid			relid;
+
+	if (PG_ARGISNULL(1))
+		relid = InvalidOid;		/* reset apply worker error stats */
+	else
+		relid = PG_GETARG_OID(1);	/* reset table sync worker error stats */
+
+	pgstat_reset_subworker_error_stats(subid, relid);
+
+	PG_RETURN_VOID();
+}
+
 Datum
 pg_stat_get_archiver(PG_FUNCTION_ARGS)
 {
@@ -2379,3 +2397,106 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
 }
+
+/*
+ * Get the subscription error for the given subscription (and relation).
+ */
+Datum
+pg_stat_get_subscription_error(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_GET_SUBSCRIPTION_ERROR_COLS 9
+	Oid			subid = PG_GETARG_OID(0);
+	Oid			subrelid;
+	TupleDesc	tupdesc;
+	Datum		values[PG_STAT_GET_SUBSCRIPTION_ERROR_COLS];
+	bool		nulls[PG_STAT_GET_SUBSCRIPTION_ERROR_COLS];
+	PgStat_StatSubWorkerEntry *wentry;
+	int			i;
+
+	if (PG_ARGISNULL(1))
+		subrelid = InvalidOid;
+	else
+		subrelid = PG_GETARG_OID(1);
+
+	/* Initialise attributes information in the tuple descriptor */
+	tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_SUBSCRIPTION_ERROR_COLS);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "subid",
+					   OIDOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "subrelid",
+					   OIDOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "relid",
+					   OIDOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "command",
+					   TEXTOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "xid",
+					   XIDOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "error_message",
+					   TEXTOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "last_failed_time",
+					   TIMESTAMPTZOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "stats_reset",
+					   TIMESTAMPTZOID, -1, 0);
+	BlessTupleDesc(tupdesc);
+
+	/* Get subscription worker stats */
+	wentry = pgstat_fetch_subworker(subid, subrelid);
+
+	/* Return NULL if the subscription doesn't have any errors */
+	if (wentry == NULL)
+		PG_RETURN_NULL();
+
+	/* Initialise values and NULL flags arrays */
+	MemSet(values, 0, sizeof(values));
+	MemSet(nulls, 0, sizeof(nulls));
+
+	i = 0;
+	/* subid */
+	values[i++] = ObjectIdGetDatum(subid);
+
+	/* subrelid */
+	if (OidIsValid(subrelid))
+		values[i++] = ObjectIdGetDatum(subrelid);
+	else
+		nulls[i++] = true;
+
+	/* relid */
+	if (OidIsValid(wentry->relid))
+		values[i++] = ObjectIdGetDatum(wentry->relid);
+	else
+		nulls[i++] = true;
+
+	/* command */
+	if (wentry->command != 0)
+		values[i++] = CStringGetTextDatum(logicalrep_message_type(wentry->command));
+	else
+		nulls[i++] = true;
+
+	/* xid */
+	if (TransactionIdIsValid(wentry->xid))
+		values[i++] = TransactionIdGetDatum(wentry->xid);
+	else
+		nulls[i++] = true;
+
+	/* count */
+	values[i++] = Int64GetDatum(wentry->count);
+
+	/* error_message */
+	values[i++] = CStringGetTextDatum(wentry->message);
+
+	/* last_failed_time */
+	if (wentry->timestamp != 0)
+		values[i++] = TimestampTzGetDatum(wentry->timestamp);
+	else
+		nulls[i++] = true;
+
+	/* stats_reset */
+	if (wentry->stat_reset_timestamp != 0)
+		values[i++] = TimestampTzGetDatum(wentry->stat_reset_timestamp);
+	else
+		nulls[i++] = true;
+
+	/* Returns the record as Datum */
+	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
+}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index d068d6532e..a901fe9a55 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5385,6 +5385,14 @@
   proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
   proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
   prosrc => 'pg_stat_get_replication_slot' },
+{ oid => '8523', descr => 'statistics: information about subscription error',
+  proname => 'pg_stat_get_subscription_error', prorows => '1', proisstrict => 'f',
+  proretset => 't', provolatile => 's', proparallel => 'r',
+  prorettype => 'record', proargtypes => 'oid oid',
+  proallargtypes => '{oid,oid,oid,oid,oid,text,xid,int8,text,timestamptz,timestamptz}',
+  proargmodes => '{i,i,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subrelid,subid,subrelid,relid,command,xid,count,error_message,last_failed_time,stats_reset}',
+  prosrc => 'pg_stat_get_subscription_error' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
@@ -5772,6 +5780,11 @@
   proname => 'pg_stat_reset_replication_slot', proisstrict => 'f',
   provolatile => 'v', prorettype => 'void', proargtypes => 'text',
   prosrc => 'pg_stat_reset_replication_slot' },
+{ oid => '8524',
+  descr => 'statistics: reset collected statistics for a single subscription error',
+  proname => 'pg_stat_reset_subscription_error', proisstrict => 'f',
+  provolatile => 'v', prorettype => 'void', proargtypes => 'oid oid',
+  prosrc => 'pg_stat_reset_subscription_error' },
 
 { oid => '3163', descr => 'current trigger depth',
   proname => 'pg_trigger_depth', provolatile => 's', proparallel => 'r',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index bcd3588ea2..fdcfea3ec4 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -14,6 +14,7 @@
 #include "datatype/timestamp.h"
 #include "portability/instr_time.h"
 #include "postmaster/pgarch.h"	/* for MAX_XFN_CHARS */
+#include "replication/logicalproto.h"
 #include "utils/backend_progress.h" /* for backward compatibility */
 #include "utils/backend_status.h"	/* for backward compatibility */
 #include "utils/hsearch.h"
@@ -66,6 +67,7 @@ typedef enum StatMsgType
 	PGSTAT_MTYPE_RESETSINGLECOUNTER,
 	PGSTAT_MTYPE_RESETSLRUCOUNTER,
 	PGSTAT_MTYPE_RESETREPLSLOTCOUNTER,
+	PGSTAT_MTYPE_RESETSUBWORKERERROR,
 	PGSTAT_MTYPE_AUTOVAC_START,
 	PGSTAT_MTYPE_VACUUM,
 	PGSTAT_MTYPE_ANALYZE,
@@ -83,6 +85,9 @@ typedef enum StatMsgType
 	PGSTAT_MTYPE_REPLSLOT,
 	PGSTAT_MTYPE_CONNECT,
 	PGSTAT_MTYPE_DISCONNECT,
+	PGSTAT_MTYPE_SUBWORKERERROR,
+	PGSTAT_MTYPE_SUBWORKERERRORPURGE,
+	PGSTAT_MTYPE_SUBWORKERPURGE,
 } StatMsgType;
 
 /* ----------
@@ -389,6 +394,24 @@ typedef struct PgStat_MsgResetreplslotcounter
 	bool		clearall;
 } PgStat_MsgResetreplslotcounter;
 
+/* ----------
+ * PgStat_MsgRestsubworkererror	Sent by the backend to reset the subscription
+ *								worker error information.
+ * ----------
+ */
+typedef struct PgStat_MsgResetsubworkererror
+{
+	PgStat_MsgHdr m_hdr;
+
+	/*
+	 * Same as PgStat_MsgSubWorkerError, m_subid and m_subrelid are used to
+	 * determine the subscription and the reporter of the error: the apply
+	 * worker or the table sync worker.
+	 */
+	Oid			m_subid;
+	Oid			m_subrelid;
+} PgStat_MsgResetsubworkererror;
+
 /* ----------
  * PgStat_MsgAutovacStart		Sent by the autovacuum daemon to signal
  *								that a database is going to be processed
@@ -536,6 +559,67 @@ typedef struct PgStat_MsgReplSlot
 	PgStat_Counter m_total_bytes;
 } PgStat_MsgReplSlot;
 
+/* ----------
+ * PgStat_MsgSubWorkerError		Sent by the apply worker or the table sync worker to
+ *								report the error occurred during logical replication.
+ * ----------
+ */
+#define PGSTAT_SUBWORKERERROR_MSGLEN 256
+typedef struct PgStat_MsgSubWorkerError
+{
+	PgStat_MsgHdr m_hdr;
+
+	/*
+	 * m_subid and m_subrelid are used to determine the subscription and the
+	 * reporter of the error. m_subrelid is InvalidOid if reported by an apply
+	 * worker otherwise reported by a table sync worker.
+	 */
+	Oid			m_subid;
+	Oid			m_subrelid;
+
+	/*
+	 * Oid of the table that the reporter was actually processing. This can be
+	 * InvalidOid if the worker was applying a non-data-modification change
+	 * such as STREAM_STOP.
+	 */
+	Oid			m_relid;
+
+	LogicalRepMsgType m_command;
+	TransactionId m_xid;
+	TimestampTz m_timestamp;
+	char		m_message[PGSTAT_SUBWORKERERROR_MSGLEN];
+} PgStat_MsgSubWorkerError;
+
+/* ----------
+ * PgStat_MsgSubWorkerPurge		Sent by the backend and autovacuum to tell the
+ *								collector about the dead subscriptions.
+ * ----------
+ */
+#define PGSTAT_NUM_SUBWORKERPURGE  \
+	((PGSTAT_MSG_PAYLOAD - sizeof(int)) / sizeof(Oid))
+
+typedef struct PgStat_MsgSubWorkerPurge
+{
+	PgStat_MsgHdr m_hdr;
+	int			m_nentries;
+	Oid			m_subids[PGSTAT_NUM_SUBWORKERPURGE];
+} PgStat_MsgSubWorkerPurge;
+
+/* ----------
+ * PgStat_MsgSubWorkerErrorPurge	Sent by the backend and autovacuum to purge
+ *									the subscription errors.
+ * ----------
+ */
+#define PGSTAT_NUM_SUBWORKERERRORPURGE  \
+	((PGSTAT_MSG_PAYLOAD - sizeof(Oid) - sizeof(int)) / sizeof(Oid))
+
+typedef struct PgStat_MsgSubWorkerErrorPurge
+{
+	PgStat_MsgHdr m_hdr;
+	Oid			m_subid;
+	int			m_nentries;
+	Oid			m_relids[PGSTAT_NUM_SUBWORKERERRORPURGE];
+} PgStat_MsgSubWorkerErrorPurge;
 
 /* ----------
  * PgStat_MsgRecoveryConflict	Sent by the backend upon recovery conflict
@@ -697,6 +781,7 @@ typedef union PgStat_Msg
 	PgStat_MsgResetsinglecounter msg_resetsinglecounter;
 	PgStat_MsgResetslrucounter msg_resetslrucounter;
 	PgStat_MsgResetreplslotcounter msg_resetreplslotcounter;
+	PgStat_MsgResetsubworkererror msg_resetsubworkererror;
 	PgStat_MsgAutovacStart msg_autovacuum_start;
 	PgStat_MsgVacuum msg_vacuum;
 	PgStat_MsgAnalyze msg_analyze;
@@ -714,6 +799,9 @@ typedef union PgStat_Msg
 	PgStat_MsgReplSlot msg_replslot;
 	PgStat_MsgConnect msg_connect;
 	PgStat_MsgDisconnect msg_disconnect;
+	PgStat_MsgSubWorkerError msg_subworkererror;
+	PgStat_MsgSubWorkerErrorPurge msg_subworkererrorpurge;
+	PgStat_MsgSubWorkerPurge msg_subworkerpurge;
 } PgStat_Msg;
 
 
@@ -929,6 +1017,34 @@ typedef struct PgStat_StatReplSlotEntry
 	TimestampTz stat_reset_timestamp;
 } PgStat_StatReplSlotEntry;
 
+/* The lookup key for subscription worker hash table */
+typedef struct PgStat_StatSubWorkerKey
+{
+	Oid			subid;
+	Oid			subrelid;		/* InvalidOid for apply worker, otherwise for
+								 * table sync worker */
+} PgStat_StatSubWorkerKey;
+
+/*
+ * Logical replication apply worker and table sync worker statistics kept in the
+ * stats collector.
+ */
+typedef struct PgStat_StatSubWorkerEntry
+{
+	PgStat_StatSubWorkerKey key;	/* hash key (must be first) */
+
+	/*
+	 * Subscription worker error statistics representing an error that occurred
+	 * during application of logical replication or the initial table synchronization.
+	 */
+	Oid			relid;
+	LogicalRepMsgType command;
+	TransactionId xid;
+	PgStat_Counter count;
+	TimestampTz timestamp;
+	char		message[PGSTAT_SUBWORKERERROR_MSGLEN];
+	TimestampTz stat_reset_timestamp;
+} PgStat_StatSubWorkerEntry;
 
 /*
  * Working state needed to accumulate per-function-call timing statistics.
@@ -1022,6 +1138,7 @@ extern void pgstat_reset_shared_counters(const char *);
 extern void pgstat_reset_single_counter(Oid objectid, PgStat_Single_Reset_Type type);
 extern void pgstat_reset_slru_counter(const char *);
 extern void pgstat_reset_replslot_counter(const char *name);
+extern void pgstat_reset_subworker_error_stats(Oid subid, Oid subrelid);
 
 extern void pgstat_report_connect(Oid dboid);
 extern void pgstat_report_autovac(Oid dboid);
@@ -1038,6 +1155,9 @@ extern void pgstat_report_checksum_failure(void);
 extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat);
 extern void pgstat_report_replslot_create(const char *slotname);
 extern void pgstat_report_replslot_drop(const char *slotname);
+extern void pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
+										  LogicalRepMsgType command,
+										  TransactionId xid, const char *errmsg);
 
 extern void pgstat_initialize(void);
 
@@ -1136,6 +1256,7 @@ extern PgStat_GlobalStats *pgstat_fetch_global(void);
 extern PgStat_WalStats *pgstat_fetch_stat_wal(void);
 extern PgStat_SLRUStats *pgstat_fetch_slru(void);
 extern PgStat_StatReplSlotEntry *pgstat_fetch_replslot(NameData slotname);
+extern PgStat_StatSubWorkerEntry *pgstat_fetch_subworker(Oid subid, Oid subrelid);
 
 extern void pgstat_count_slru_page_zeroed(int slru_idx);
 extern void pgstat_count_slru_page_hit(int slru_idx);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 2fa00a3c29..7ecd4f167a 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2094,6 +2094,26 @@ pg_stat_subscription| SELECT su.oid AS subid,
     st.latest_end_time
    FROM (pg_subscription su
      LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
+pg_stat_subscription_errors| SELECT e.subid,
+    s.subname,
+    e.subrelid,
+    e.relid,
+    e.command,
+    e.xid,
+    e.count,
+    e.error_message,
+    e.last_failed_time,
+    e.stats_reset
+   FROM ( SELECT pg_subscription.oid AS subid,
+            NULL::oid AS relid
+           FROM pg_subscription
+        UNION ALL
+         SELECT pg_subscription_rel.srsubid AS subid,
+            pg_subscription_rel.srrelid AS relid
+           FROM pg_subscription_rel
+          WHERE (pg_subscription_rel.srsubstate <> 'r'::"char")) sr,
+    (LATERAL pg_stat_get_subscription_error(sr.subid, sr.relid) e(subid, subrelid, relid, command, xid, count, error_message, last_failed_time, stats_reset)
+     JOIN pg_subscription s ON ((e.subid = s.oid)));
 pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid,
     pg_stat_all_indexes.indexrelid,
     pg_stat_all_indexes.schemaname,
diff --git a/src/test/subscription/t/025_error_report.pl b/src/test/subscription/t/025_error_report.pl
new file mode 100644
index 0000000000..c6fea0d046
--- /dev/null
+++ b/src/test/subscription/t/025_error_report.pl
@@ -0,0 +1,154 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Tests for subscription error reporting.
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 5;
+
+# Test if the error reported on pg_subscription_errors view is expected.
+sub test_subscription_error
+{
+    my ($node, $relname, $xid, $expected_error, $msg) = @_;
+
+    my $check_sql = qq[
+SELECT count(1) > 0 FROM pg_stat_subscription_errors
+WHERE relid = '$relname'::regclass];
+    $check_sql .= " AND xid = '$xid'::xid;" if $xid ne '';
+
+    # Wait for the error statistics to be updated.
+    $node->poll_query_until(
+	'postgres', $check_sql,
+) or die "Timed out while waiting for statistics to be updated";
+
+    my $result = $node->safe_psql(
+	'postgres',
+	qq[
+SELECT subname, command, relid::regclass, count > 0
+FROM pg_stat_subscription_errors
+WHERE relid = '$relname'::regclass;
+]);
+    is($result, $expected_error, $msg);
+}
+
+# Create publisher node.
+my $node_publisher = PostgresNode->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+			     qq[
+max_prepared_transactions = 10
+logical_decoding_work_mem = 64kB
+]);
+$node_publisher->start;
+
+# Create subscriber node.
+my $node_subscriber = PostgresNode->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+
+# The subscriber will enter an infinite error loop, so we don't want
+# to overflow the server log with error messages.
+$node_subscriber->append_conf('postgresql.conf',
+			      qq[
+max_prepared_transactions = 10
+wal_retrieve_retry_interval = 5s
+]);
+$node_subscriber->start;
+
+# Initial table setup on both publisher and subscriber. On subscriber we create
+# the same tables but with primary keys. Also, insert some data that will conflict
+# with the data replicated from publisher later.
+$node_publisher->safe_psql(
+    'postgres',
+    q[
+BEGIN;
+CREATE TABLE test_tab1 (a int);
+CREATE TABLE test_tab2 (a int);
+CREATE TABLE test_tab_streaming (a int, b text);
+INSERT INTO test_tab1 VALUES (1);
+INSERT INTO test_tab2 VALUES (1);
+COMMIT;
+]);
+$node_subscriber->safe_psql(
+    'postgres',
+    q[
+BEGIN;
+CREATE TABLE test_tab1 (a int primary key);
+CREATE TABLE test_tab2 (a int primary key);
+CREATE TABLE test_tab_streaming (a int primary key, b text);
+INSERT INTO test_tab2 VALUES (1);
+INSERT INTO test_tab_streaming SELECT 10000, md5(10000::text);
+COMMIT;
+]);
+
+# Setup publications.
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+			   q[
+CREATE PUBLICATION tap_pub FOR TABLE test_tab1, test_tab2;
+CREATE PUBLICATION tap_pub_streaming FOR TABLE test_tab_streaming;
+]);
+
+# Check if there is no subscription errors before starting logical replication.
+my $result =
+    $node_subscriber->safe_psql('postgres',
+				"SELECT count(1) FROM pg_stat_subscription_errors");
+is($result, qq(0), 'check no subscription error');
+
+# Create subscriptions. The table sync for test_tab2 on tap_sub will enter to
+# infinite error due to violating the unique constraint.
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql(
+    'postgres',
+    "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = off, two_phase = on);");
+my $appname_streaming = 'tap_sub_streaming';
+$node_subscriber->safe_psql(
+    'postgres',
+    "CREATE SUBSCRIPTION tap_sub_streaming CONNECTION '$publisher_connstr application_name=$appname_streaming' PUBLICATION tap_pub_streaming WITH (streaming = on, two_phase = on);");
+
+$node_publisher->wait_for_catchup($appname);
+$node_publisher->wait_for_catchup($appname_streaming);
+
+# Wait for initial table sync for test_tab1 and test_tab_streaming to finish.
+$node_subscriber->poll_query_until('postgres',
+				   q[
+SELECT count(1) = 2 FROM pg_subscription_rel
+WHERE srrelid in ('test_tab1'::regclass, 'test_tab_streaming'::regclass) AND srsubstate in ('r', 's')
+]) or die "Timed out while waiting for subscriber to synchronize data";
+
+# Check the initial data.
+$result = $node_subscriber->safe_psql('postgres',
+				      "SELECT count(a) FROM test_tab1");
+is($result, q(1), 'check initial data are copied to subscriber');
+
+# Insert more data to test_tab1, raising an error on the subscriber due to violation
+# of the unique constraint on test_tab1.
+my $xid = $node_publisher->safe_psql(
+    'postgres',
+    qq[
+BEGIN;
+INSERT INTO test_tab1 VALUES (1);
+SELECT pg_current_xact_id()::xid;
+COMMIT;
+]);
+test_subscription_error($node_subscriber, 'test_tab1', $xid,
+			qq(tap_sub|INSERT|test_tab1|t),
+			'check the error reported by the apply worker');
+
+# Check the table sync worker's error in the view.
+test_subscription_error($node_subscriber, 'test_tab2', '',
+			qq(tap_sub||test_tab2|t),
+			'check the error reported by the table sync worker');
+
+# Check if the view doesn't show any entries after dropping the subscriptions.
+$node_subscriber->safe_psql(
+    'postgres',
+    q[
+DROP SUBSCRIPTION tap_sub;
+DROP SUBSCRIPTION tap_sub_streaming;
+]);
+$result = $node_subscriber->safe_psql('postgres',
+				      "SELECT count(1) FROM pg_stat_subscription_errors");
+is($result, q(0), 'no error after dropping subscription');
+
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index cb5b5ec74c..8ff6294267 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1939,7 +1939,11 @@ PgStat_MsgResetreplslotcounter
 PgStat_MsgResetsharedcounter
 PgStat_MsgResetsinglecounter
 PgStat_MsgResetslrucounter
+PgStat_MsgResetsubworkererror
 PgStat_MsgSLRU
+PgStat_MsgSubWorkerError
+PgStat_MsgSubWorkerErrorPurge
+PgStat_MsgSubWorkerPurge
 PgStat_MsgTabpurge
 PgStat_MsgTabstat
 PgStat_MsgTempFile
@@ -1951,6 +1955,9 @@ PgStat_Single_Reset_Type
 PgStat_StatDBEntry
 PgStat_StatFuncEntry
 PgStat_StatReplSlotEntry
+PgStat_StatSubWorkerEntry
+PgStat_StatSubWorkerKey
+PgStat_SubWorkerError
 PgStat_StatTabEntry
 PgStat_SubXactStatus
 PgStat_TableCounts
-- 
2.24.3 (Apple Git-128)

