From 354b0b6a0fa00c3cb487103084ce5618d6c6a38f Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 28 Jun 2021 13:22:13 +0900
Subject: [PATCH v1 3/3] Add pg_stat_logical_replication_error statistics view.

---
 src/backend/catalog/system_views.sql     |  10 ++
 src/backend/postmaster/pgstat.c          | 207 +++++++++++++++++++++++
 src/backend/replication/logical/worker.c |  28 ++-
 src/backend/utils/adt/pgstatfuncs.c      |  57 +++++++
 src/include/catalog/pg_proc.dat          |   8 +
 src/include/pgstat.h                     |  31 ++++
 src/test/regress/expected/rules.out      |   7 +
 7 files changed, 347 insertions(+), 1 deletion(-)

diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 999d984068..e00d6e4fc0 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1257,3 +1257,13 @@ REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
               substream, subslotname, subsynccommit, subpublications)
     ON pg_subscription TO public;
+
+CREATE VIEW pg_stat_logical_replication_error AS
+    SELECT
+            s.subname,
+	    e.relid,
+	    e.action,
+	    e.xid,
+	    e.last_failure
+    FROM pg_subscription as s,
+        LATERAL pg_stat_get_logical_replication_error(oid) as e;
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index b0d07c0e0b..51bb73196e 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -41,6 +41,7 @@
 #include "catalog/partition.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_proc.h"
+#include "catalog/pg_subscription.h"
 #include "common/ip.h"
 #include "executor/instrument.h"
 #include "libpq/libpq.h"
@@ -106,6 +107,7 @@
 #define PGSTAT_TAB_HASH_SIZE	512
 #define PGSTAT_FUNCTION_HASH_SIZE	512
 #define PGSTAT_REPLSLOT_HASH_SIZE	32
+#define PGSTAT_LOGICALREP_ERR_HASH_SIZE	32
 
 
 /* ----------
@@ -279,6 +281,7 @@ static PgStat_GlobalStats globalStats;
 static PgStat_WalStats walStats;
 static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
 static HTAB *replSlotStatHash = NULL;
+static HTAB *logicalRepErrHash = NULL;
 
 /*
  * List of OIDs of databases we need to write out.  If an entry is InvalidOid,
@@ -320,6 +323,8 @@ static bool pgstat_db_requested(Oid databaseid);
 static PgStat_StatReplSlotEntry *pgstat_get_replslot_entry(NameData name, bool create_it);
 static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, TimestampTz ts);
 
+static PgStat_LogicalRepErrEntry * pgstat_get_logicalrep_error_entry(Oid subid, bool create);
+
 static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg);
 static void pgstat_send_funcstats(void);
 static void pgstat_send_slru(void);
@@ -358,6 +363,7 @@ static void pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len
 static void pgstat_recv_connstat(PgStat_MsgConn *msg, int len);
 static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
 static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
+static void pgstat_recv_logicalrep_error(PgStat_MsgLogicalRepErr *msg, int len);
 
 /* ------------------------------------------------------------
  * Public functions called from postmaster follow
@@ -1134,6 +1140,25 @@ pgstat_vacuum_stat(void)
 		}
 	}
 
+	if (logicalRepErrHash)
+	{
+		PgStat_LogicalRepErrEntry *errentry;
+		HTAB	*htab;
+
+		htab = pgstat_collect_oids(SubscriptionRelationId, Anum_pg_subscription_oid);
+
+		hash_seq_init(&hstat, logicalRepErrHash);
+		while ((errentry = (PgStat_LogicalRepErrEntry *) hash_seq_search(&hstat)) != NULL)
+		{
+			CHECK_FOR_INTERRUPTS();
+
+			if (hash_search(htab, (void *) &errentry->subid, HASH_FIND, NULL) == NULL)
+				pgstat_report_logicalrep_error_clear(errentry->subid);
+		}
+
+		hash_destroy(htab);
+	}
+
 	/*
 	 * Lookup our own database entry; if not found, nothing more to do.
 	 */
@@ -1863,6 +1888,46 @@ pgstat_report_replslot_drop(const char *slotname)
 	pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
 }
 
+/* ----------
+ * pgstat_report_logicalrep_error() -
+ *
+ *	Tell the collector about error of logical replication transaction.
+ * ----------
+ */
+void
+pgstat_report_logicalrep_error(Oid subid, LogicalRepMsgType action,
+							   TransactionId xid, Oid relid)
+{
+	PgStat_MsgLogicalRepErr msg;
+
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_LOGICALREPERROR);
+	msg.m_subid = subid;
+	msg.m_clear = false;
+	msg.m_action = action;
+	msg.m_xid = xid;
+	msg.m_relid = relid;
+	msg.m_last_failure = GetCurrentTimestamp();
+	pgstat_send(&msg, sizeof(PgStat_MsgLogicalRepErr));
+}
+
+/* ----------
+ * pgstat_report_logicalrep_error_clear() -
+ *
+ *	Tell the collector about dropping the subscription, clearing
+ *	the corresponding logical replication error information.
+ * ----------
+ */
+void
+pgstat_report_logicalrep_error_clear(Oid subid)
+{
+	PgStat_MsgLogicalRepErr msg;
+
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_LOGICALREPERROR);
+	msg.m_subid = subid;
+	msg.m_clear = true;
+	pgstat_send(&msg, sizeof(PgStat_MsgLogicalRepErr));
+}
+
 /* ----------
  * pgstat_ping() -
  *
@@ -2895,6 +2960,23 @@ pgstat_fetch_replslot(NameData slotname)
 	return pgstat_get_replslot_entry(slotname, false);
 }
 
+/*
+ * ---------
+ * pgstat_fetch_replslot() -
+ *
+ *	Support function for the SQL-callable pgstat* functions. Returns
+ *	a pointer to the logical replication error struct.
+ * ---------
+ */
+PgStat_LogicalRepErrEntry *
+pgstat_fetch_logicalrep_error(Oid subid)
+{
+	backend_read_statsfile();
+
+	return pgstat_get_logicalrep_error_entry(subid, false);
+}
+
+
 /*
  * Shut down a single backend's statistics reporting at process exit.
  *
@@ -3424,6 +3506,10 @@ PgstatCollectorMain(int argc, char *argv[])
 					pgstat_recv_connstat(&msg.msg_conn, len);
 					break;
 
+				case PGSTAT_MTYPE_LOGICALREPERROR:
+					pgstat_recv_logicalrep_error(&msg.msg_logicalreperr, len);
+					break;
+
 				default:
 					break;
 			}
@@ -3725,6 +3811,22 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
 		}
 	}
 
+	/*
+	 * Write logical replication transaction error struct.
+	 */
+	if (logicalRepErrHash)
+	{
+		PgStat_LogicalRepErrEntry *errent;
+
+		hash_seq_init(&hstat, logicalRepErrHash);
+		while ((errent = (PgStat_LogicalRepErrEntry *) hash_seq_search(&hstat)) != NULL)
+		{
+			fputc('L', fpout);
+			rc = fwrite(errent, sizeof(PgStat_LogicalRepErrEntry), 1, fpout);
+			(void) rc;			/* we'll check for error with ferror */
+		}
+	}
+
 	/*
 	 * No more output to be done. Close the temp file and replace the old
 	 * pgstat.stat with it.  The ferror() check replaces testing for error
@@ -4184,6 +4286,46 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 					break;
 				}
 
+			case 'L':
+				{
+					PgStat_LogicalRepErrEntry errbuf;
+					PgStat_LogicalRepErrEntry *errent;
+
+					if (fread(&errbuf, 1, sizeof(PgStat_LogicalRepErrEntry), fpin)
+						!= sizeof(PgStat_LogicalRepErrEntry))
+					{
+						ereport(pgStatRunningInCollector ? LOG : WARNING,
+								(errmsg("corrupted statistics file \"%s\"",
+										statfile)));
+						goto done;
+					}
+
+					/* Create hash table if we don't have it already. */
+					if (logicalRepErrHash == NULL)
+					{
+						HASHCTL		hash_ctl;
+
+						hash_ctl.keysize = sizeof(Oid);
+						hash_ctl.entrysize = sizeof(PgStat_LogicalRepErrEntry);
+						hash_ctl.hcxt = pgStatLocalContext;
+						logicalRepErrHash = hash_create("Logical replication transaction error hash",
+														PGSTAT_LOGICALREP_ERR_HASH_SIZE,
+														&hash_ctl,
+														HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+					}
+
+					errent =
+						(PgStat_LogicalRepErrEntry *) hash_search(logicalRepErrHash,
+																  (void *) &errbuf.subid,
+																  HASH_ENTER, NULL);
+					errent->subid = errbuf.subid;
+					errent->relid = errbuf.relid;
+					errent->action = errbuf.action;
+					errent->xid = errbuf.xid;
+					errent->last_failure = errbuf.last_failure;
+					break;
+				}
+
 			case 'E':
 				goto done;
 
@@ -4396,6 +4538,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
 	PgStat_WalStats myWalStats;
 	PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
 	PgStat_StatReplSlotEntry myReplSlotStats;
+	PgStat_LogicalRepErrEntry myLogicalRepErrs;
 	FILE	   *fpin;
 	int32		format_id;
 	const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
@@ -4526,6 +4669,18 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
 				}
 				break;
 
+			case 'L':
+				if (fread(&myLogicalRepErrs, 1, sizeof(PgStat_LogicalRepErrEntry), fpin)
+					!= sizeof(PgStat_LogicalRepErrEntry))
+				{
+					ereport(pgStatRunningInCollector ? LOG : WARNING,
+							(errmsg("corrupted statistics file \"%s\"",
+									statfile)));
+					FreeFile(fpin);
+					return false;
+				}
+				break;
+
 			case 'E':
 				goto done;
 
@@ -4716,6 +4871,7 @@ pgstat_clear_snapshot(void)
 	pgStatLocalContext = NULL;
 	pgStatDBHash = NULL;
 	replSlotStatHash = NULL;
+	logicalRepErrHash = NULL;
 
 	/*
 	 * Historically the backend_status.c facilities lived in this file, and
@@ -5650,6 +5806,33 @@ pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len)
 	}
 }
 
+/* ----------
+ * pgstat_recv_logicalrep_error() -
+ *
+ *	Process a LOGICALREPERROR message.
+ * ----------
+ */
+static void
+pgstat_recv_logicalrep_error(PgStat_MsgLogicalRepErr *msg, int len)
+{
+	PgStat_LogicalRepErrEntry *errent;
+
+	if (msg->m_clear)
+	{
+		if (logicalRepErrHash != NULL)
+			hash_search(logicalRepErrHash, (void *) &msg->m_subid,
+						HASH_REMOVE, NULL);
+		return;
+	}
+
+	errent = pgstat_get_logicalrep_error_entry(msg->m_subid, true);
+
+	errent->relid = msg->m_relid;
+	errent->action = msg->m_action;
+	errent->xid = msg->m_xid;
+	errent->last_failure = msg->m_last_failure;
+}
+
 /* ----------
  * pgstat_write_statsfile_needed() -
  *
@@ -5747,6 +5930,30 @@ pgstat_get_replslot_entry(NameData name, bool create)
 	return slotent;
 }
 
+static PgStat_LogicalRepErrEntry *
+pgstat_get_logicalrep_error_entry(Oid subid, bool create)
+{
+	PgStat_LogicalRepErrEntry *errent;
+	HASHACTION	action = (create ? HASH_ENTER : HASH_FIND);
+
+	if (logicalRepErrHash == NULL)
+	{
+		HASHCTL		hash_ctl;
+
+		hash_ctl.keysize = sizeof(Oid);
+		hash_ctl.entrysize = sizeof(PgStat_LogicalRepErrEntry);
+		logicalRepErrHash = hash_create("Logical replication transaction error hash",
+										PGSTAT_LOGICALREP_ERR_HASH_SIZE,
+										&hash_ctl,
+										HASH_ELEM | HASH_BLOBS);
+	}
+
+	errent = (PgStat_LogicalRepErrEntry *) hash_search(logicalRepErrHash,
+													   (void *) &subid,
+													   action, NULL);
+	return errent;
+}
+
 /* ----------
  * pgstat_reset_replslot
  *
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b65f72c9a4..3a09c27b16 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3349,7 +3349,30 @@ ApplyWorkerMain(Datum main_arg)
 	walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
 
 	/* Run the main loop. */
-	LogicalRepApplyLoop(origin_startpos);
+	PG_TRY();
+	{
+		LogicalRepApplyLoop(origin_startpos);
+	}
+	PG_CATCH();
+	{
+		if (apply_error_callback_arg.action != -1)
+		{
+			Oid relid;
+
+			if (apply_error_callback_arg.rel)
+				relid = apply_error_callback_arg.rel->localreloid;
+			else
+				relid = InvalidOid;
+
+			pgstat_report_logicalrep_error(MySubscription->oid,
+										   apply_error_callback_arg.action,
+										   apply_error_callback_arg.remote_xid,
+										   relid);
+		}
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
 
 	proc_exit(0);
 }
@@ -3459,6 +3482,9 @@ stop_skipping_changes(bool reset_xid, LogicalRepCommitData *commit_data)
 
 	CommitTransactionCommand();
 
+	/* Clear the error information in the stats collector too */
+	pgstat_report_logicalrep_error_clear(MySubscription->oid);
+
 	return true;
 }
 
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 14056f5347..6dd4f3887f 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -24,6 +24,7 @@
 #include "pgstat.h"
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/postmaster.h"
+#include "replication/logicalproto.h"
 #include "replication/slot.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
@@ -2380,3 +2381,59 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
 }
+
+/*
+ * Get the logical replication error for the given subscription.
+ */
+Datum
+pg_stat_get_logical_replication_error(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_GET_LOGICAL_REPLICATION_ERROR_COLS 5
+	Oid			subid = PG_GETARG_OID(0);
+	TupleDesc	tupdesc;
+	Datum		values[PG_STAT_GET_LOGICAL_REPLICATION_ERROR_COLS];
+	bool		nulls[PG_STAT_GET_LOGICAL_REPLICATION_ERROR_COLS];
+	PgStat_LogicalRepErrEntry *errent;
+
+	/* Initialise values and NULL flags arrays */
+	MemSet(values, 0, sizeof(values));
+	MemSet(nulls, 0, sizeof(nulls));
+
+	/* Initialise attributes information in the tuple descriptor */
+	tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_LOGICAL_REPLICATION_ERROR_COLS);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "subid",
+					   OIDOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
+					   OIDOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "action",
+					   TEXTOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "xid",
+					   XIDOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "last_failure",
+					   TIMESTAMPTZOID, -1, 0);
+	BlessTupleDesc(tupdesc);
+
+	errent = pgstat_fetch_logicalrep_error(subid);
+
+	values[0] = ObjectIdGetDatum(subid);
+
+	if (!errent)
+	{
+		MemSet(nulls, true, sizeof(nulls));
+		nulls[0] = false;
+	}
+	else
+	{
+		if (OidIsValid(errent->relid))
+			values[1] = ObjectIdGetDatum(errent->relid);
+		else
+			nulls[1] = true;
+
+		values[2] = CStringGetTextDatum(logicalrep_action(errent->action));
+		values[3] = TransactionIdGetDatum(errent->xid);
+		values[4] = TimestampTzGetDatum(errent->last_failure);
+	}
+
+	/* Returns the record as Datum */
+	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
+}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index fde251fa4f..1c5283a4e3 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5317,6 +5317,14 @@
   proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
   proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
   prosrc => 'pg_stat_get_replication_slot' },
+{ oid => '8523', descr => 'statistics: information about logical replication error',
+  proname => 'pg_stat_get_logical_replication_error', prorows => '1', proisstrict => 'f',
+  proretset => 't', provolatile => 's', proparallel => 'r',
+  prorettype => 'record', proargtypes => 'oid',
+  proallargtypes => '{oid,oid,oid,text,xid,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o}',
+  proargnames => '{subid,subid,relid,action,xid,last_failure}',
+  prosrc => 'pg_stat_get_logical_replication_error' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 9612c0a6c2..46f6d29a21 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -14,6 +14,7 @@
 #include "datatype/timestamp.h"
 #include "portability/instr_time.h"
 #include "postmaster/pgarch.h"	/* for MAX_XFN_CHARS */
+#include "replication/logicalproto.h"
 #include "utils/backend_progress.h" /* for backward compatibility */
 #include "utils/backend_status.h"	/* for backward compatibility */
 #include "utils/hsearch.h"
@@ -66,6 +67,7 @@ typedef enum StatMsgType
 	PGSTAT_MTYPE_RESETSINGLECOUNTER,
 	PGSTAT_MTYPE_RESETSLRUCOUNTER,
 	PGSTAT_MTYPE_RESETREPLSLOTCOUNTER,
+	PGSTAT_MTYPE_LOGICALREPERROR,
 	PGSTAT_MTYPE_AUTOVAC_START,
 	PGSTAT_MTYPE_VACUUM,
 	PGSTAT_MTYPE_ANALYZE,
@@ -539,6 +541,21 @@ typedef struct PgStat_MsgReplSlot
 	PgStat_Counter m_total_bytes;
 } PgStat_MsgReplSlot;
 
+/* ----------
+ * PgStat_MsgLogicalRepErr	Sent by a apply worker to to update the error
+ *							of logical replication transaction.
+ * ----------
+ */
+typedef struct PgStat_MsgLogicalRepErr
+{
+	PgStat_MsgHdr m_hdr;
+	Oid		m_subid;
+	bool	m_clear;
+	Oid		m_relid;
+	LogicalRepMsgType	m_action;
+	TransactionId		m_xid;
+	TimestampTz			m_last_failure;
+} PgStat_MsgLogicalRepErr;
 
 /* ----------
  * PgStat_MsgRecoveryConflict	Sent by the backend upon recovery conflict
@@ -710,6 +727,7 @@ typedef union PgStat_Msg
 	PgStat_MsgChecksumFailure msg_checksumfailure;
 	PgStat_MsgReplSlot msg_replslot;
 	PgStat_MsgConn msg_conn;
+	PgStat_MsgLogicalRepErr msg_logicalreperr;
 } PgStat_Msg;
 
 
@@ -908,6 +926,15 @@ typedef struct PgStat_StatReplSlotEntry
 	TimestampTz stat_reset_timestamp;
 } PgStat_StatReplSlotEntry;
 
+typedef struct PgStat_LogicalRepErrEntry
+{
+	Oid	subid;
+	Oid		relid;
+	LogicalRepMsgType	action;
+	TransactionId		xid;
+	TimestampTz			last_failure;
+} PgStat_LogicalRepErrEntry;
+
 
 /*
  * Working state needed to accumulate per-function-call timing statistics.
@@ -1011,6 +1038,9 @@ extern void pgstat_report_checksum_failure(void);
 extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat);
 extern void pgstat_report_replslot_create(const char *slotname);
 extern void pgstat_report_replslot_drop(const char *slotname);
+extern void pgstat_report_logicalrep_error(Oid subid, LogicalRepMsgType action,
+										   TransactionId xid, Oid relid);
+extern void pgstat_report_logicalrep_error_clear(Oid subid);
 
 extern void pgstat_initialize(void);
 
@@ -1106,6 +1136,7 @@ extern PgStat_GlobalStats *pgstat_fetch_global(void);
 extern PgStat_WalStats *pgstat_fetch_stat_wal(void);
 extern PgStat_SLRUStats *pgstat_fetch_slru(void);
 extern PgStat_StatReplSlotEntry *pgstat_fetch_replslot(NameData slotname);
+extern PgStat_LogicalRepErrEntry *pgstat_fetch_logicalrep_error(Oid subid);
 
 extern void pgstat_count_slru_page_zeroed(int slru_idx);
 extern void pgstat_count_slru_page_hit(int slru_idx);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index e5ab11275d..506801e19b 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1879,6 +1879,13 @@ pg_stat_gssapi| SELECT s.pid,
     s.gss_enc AS encrypted
    FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id)
   WHERE (s.client_port IS NOT NULL);
+pg_stat_logical_replication_error| SELECT s.subname,
+    e.relid,
+    e.action,
+    e.xid,
+    e.last_failure
+   FROM pg_subscription s,
+    LATERAL pg_stat_get_logical_replication_error(s.oid) e(subid, relid, action, xid, last_failure);
 pg_stat_progress_analyze| SELECT s.pid,
     s.datid,
     d.datname,
-- 
2.24.3 (Apple Git-128)

