From c1940cb37539030efd016d6a409ea39c72302d82 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Thu, 10 Feb 2022 21:18:11 +0900
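Subject: [PATCH v11 1/2] Report error transaction's commit LSN instead of XID
 to pg_stat_subscription_workers.

Replace the last_error_xid column (type xid) of the
pg_stat_subscription_workers view with last_error_lsn (type pg_lsn).
The apply worker's error context and the statistics message now carry
the LSN read from the BEGIN, BEGIN PREPARE, COMMIT PREPARED, ROLLBACK
PREPARED, STREAM PREPARE and STREAM COMMIT messages instead of the
remote transaction ID.  STREAM START and STREAM ABORT no longer set
transaction information in the error context.  The error context
message and the 026_worker_stats.pl test are adjusted accordingly.
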
---
 doc/src/sgml/monitoring.sgml                |  6 +--
 src/backend/catalog/system_views.sql        |  2 +-
 src/backend/postmaster/pgstat.c             | 10 ++---
 src/backend/replication/logical/worker.c    | 45 ++++++++++-----------
 src/backend/utils/adt/pgstatfuncs.c         | 11 ++---
 src/include/catalog/pg_proc.dat             |  4 +-
 src/include/pgstat.h                        |  6 +--
 src/test/regress/expected/rules.out         |  4 +-
 src/test/subscription/t/026_worker_stats.pl | 14 ++-----
 9 files changed, 47 insertions(+), 55 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 62f2a3332b..0820d4a320 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -3143,11 +3143,11 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>last_error_xid</structfield> <type>xid</type>
+       <structfield>last_error_lsn</structfield> <type>pg_lsn</type>
       </para>
       <para>
-       Transaction ID of the publisher node being applied when the error
-       occurred.  This field is null if the error was reported
+       Commit LSN of the publisher-node transaction that was being applied
+       when the error occurred.  This field is null if the error was reported
        during the initial data copy.
       </para></entry>
      </row>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 3cb69b1f87..9e9578bad4 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1271,7 +1271,7 @@ CREATE VIEW pg_stat_subscription_workers AS
         w.subrelid,
         w.last_error_relid,
         w.last_error_command,
-        w.last_error_xid,
+        w.last_error_lsn,
         w.last_error_count,
         w.last_error_message,
         w.last_error_time
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 0646f53098..9d95bcb0e3 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -1956,7 +1956,7 @@ pgstat_report_replslot_drop(const char *slotname)
  */
 void
 pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
-							  LogicalRepMsgType command, TransactionId xid,
+							  LogicalRepMsgType command, XLogRecPtr lsn,
 							  const char *errmsg)
 {
 	PgStat_MsgSubWorkerError msg;
@@ -1968,7 +1968,7 @@ pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
 	msg.m_subrelid = subrelid;
 	msg.m_relid = relid;
 	msg.m_command = command;
-	msg.m_xid = xid;
+	msg.m_lsn = lsn;
 	msg.m_timestamp = GetCurrentTimestamp();
 	strlcpy(msg.m_message, errmsg, PGSTAT_SUBWORKERERROR_MSGLEN);
 
@@ -3967,7 +3967,7 @@ pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry, Oid subid, Oid subrelid,
 	{
 		subwentry->last_error_relid = InvalidOid;
 		subwentry->last_error_command = 0;
-		subwentry->last_error_xid = InvalidTransactionId;
+		subwentry->last_error_lsn = InvalidXLogRecPtr;
 		subwentry->last_error_count = 0;
 		subwentry->last_error_time = 0;
 		subwentry->last_error_message[0] = '\0';
@@ -6173,7 +6173,7 @@ pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len)
 
 	if (subwentry->last_error_relid == msg->m_relid &&
 		subwentry->last_error_command == msg->m_command &&
-		subwentry->last_error_xid == msg->m_xid &&
+		subwentry->last_error_lsn == msg->m_lsn &&
 		strcmp(subwentry->last_error_message, msg->m_message) == 0)
 	{
 		/*
@@ -6188,7 +6188,7 @@ pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len)
 	/* Otherwise, update the error information */
 	subwentry->last_error_relid = msg->m_relid;
 	subwentry->last_error_command = msg->m_command;
-	subwentry->last_error_xid = msg->m_xid;
+	subwentry->last_error_lsn = msg->m_lsn;
 	subwentry->last_error_count = 1;
 	subwentry->last_error_time = msg->m_timestamp;
 	strlcpy(subwentry->last_error_message, msg->m_message,
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d77bb32bb9..2d2c83cd53 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -225,7 +225,7 @@ typedef struct ApplyErrorCallbackArg
 
 	/* Remote node information */
 	int			remote_attnum;	/* -1 if invalid */
-	TransactionId remote_xid;
+	XLogRecPtr	remote_lsn;
 	TimestampTz ts;				/* commit, rollback, or prepare timestamp */
 } ApplyErrorCallbackArg;
 
@@ -234,7 +234,7 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
 	.command = 0,
 	.rel = NULL,
 	.remote_attnum = -1,
-	.remote_xid = InvalidTransactionId,
+	.remote_lsn = InvalidXLogRecPtr,
 	.ts = 0,
 };
 
@@ -334,7 +334,7 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
 
 /* Functions for apply error callback */
 static void apply_error_callback(void *arg);
-static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz ts);
+static inline void set_apply_error_context_xact(XLogRecPtr lsn, TimestampTz ts);
 static inline void reset_apply_error_context_info(void);
 
 /*
@@ -787,7 +787,7 @@ apply_handle_begin(StringInfo s)
 	LogicalRepBeginData begin_data;
 
 	logicalrep_read_begin(s, &begin_data);
-	set_apply_error_context_xact(begin_data.xid, begin_data.committime);
+	set_apply_error_context_xact(begin_data.final_lsn, begin_data.committime);
 
 	remote_final_lsn = begin_data.final_lsn;
 
@@ -839,7 +839,7 @@ apply_handle_begin_prepare(StringInfo s)
 				 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
 
 	logicalrep_read_begin_prepare(s, &begin_data);
-	set_apply_error_context_xact(begin_data.xid, begin_data.prepare_time);
+	set_apply_error_context_xact(begin_data.prepare_lsn, begin_data.prepare_time);
 
 	remote_final_lsn = begin_data.prepare_lsn;
 
@@ -938,7 +938,7 @@ apply_handle_commit_prepared(StringInfo s)
 	char		gid[GIDSIZE];
 
 	logicalrep_read_commit_prepared(s, &prepare_data);
-	set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_time);
+	set_apply_error_context_xact(prepare_data.commit_lsn, prepare_data.commit_time);
 
 	/* Compute GID for two_phase transactions. */
 	TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
@@ -979,7 +979,8 @@ apply_handle_rollback_prepared(StringInfo s)
 	char		gid[GIDSIZE];
 
 	logicalrep_read_rollback_prepared(s, &rollback_data);
-	set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_time);
+	set_apply_error_context_xact(rollback_data.rollback_end_lsn,
+								 rollback_data.rollback_time);
 
 	/* Compute GID for two_phase transactions. */
 	TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
@@ -1044,7 +1045,8 @@ apply_handle_stream_prepare(StringInfo s)
 				 errmsg_internal("tablesync worker received a STREAM PREPARE message")));
 
 	logicalrep_read_stream_prepare(s, &prepare_data);
-	set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_time);
+	set_apply_error_context_xact(prepare_data.prepare_lsn,
+								 prepare_data.prepare_time);
 
 	elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
 
@@ -1126,8 +1128,6 @@ apply_handle_stream_start(StringInfo s)
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
 				 errmsg_internal("invalid transaction ID in streamed replication transaction")));
 
-	set_apply_error_context_xact(stream_xid, 0);
-
 	/*
 	 * Initialize the worker's stream_fileset if we haven't yet. This will be
 	 * used for the entire duration of the worker so create it in a permanent
@@ -1214,10 +1214,7 @@ apply_handle_stream_abort(StringInfo s)
 	 * just delete the files with serialized info.
 	 */
 	if (xid == subxid)
-	{
-		set_apply_error_context_xact(xid, 0);
 		stream_cleanup_files(MyLogicalRepWorker->subid, xid);
-	}
 	else
 	{
 		/*
@@ -1241,8 +1238,6 @@ apply_handle_stream_abort(StringInfo s)
 		bool		found = false;
 		char		path[MAXPGPATH];
 
-		set_apply_error_context_xact(subxid, 0);
-
 		subidx = -1;
 		begin_replication_step();
 		subxact_info_read(MyLogicalRepWorker->subid, xid);
@@ -1426,7 +1421,7 @@ apply_handle_stream_commit(StringInfo s)
 				 errmsg_internal("STREAM COMMIT message without STREAM STOP")));
 
 	xid = logicalrep_read_stream_commit(s, &commit_data);
-	set_apply_error_context_xact(xid, commit_data.committime);
+	set_apply_error_context_xact(commit_data.commit_lsn, commit_data.committime);
 
 	elog(DEBUG1, "received commit for streamed transaction %u", xid);
 
@@ -3499,7 +3494,7 @@ ApplyWorkerMain(Datum main_arg)
 										  MyLogicalRepWorker->relid,
 										  MyLogicalRepWorker->relid,
 										  0,	/* message type */
-										  InvalidTransactionId,
+										  InvalidXLogRecPtr,
 										  errdata->message);
 			MemoryContextSwitchTo(ecxt);
 			PG_RE_THROW();
@@ -3640,7 +3635,7 @@ ApplyWorkerMain(Datum main_arg)
 										  ? apply_error_callback_arg.rel->localreloid
 										  : InvalidOid,
 										  apply_error_callback_arg.command,
-										  apply_error_callback_arg.remote_xid,
+										  apply_error_callback_arg.remote_lsn,
 										  errdata->message);
 			MemoryContextSwitchTo(ecxt);
 		}
@@ -3687,11 +3682,13 @@ apply_error_callback(void *arg)
 	}
 
 	/* append transaction information */
-	if (TransactionIdIsNormal(errarg->remote_xid))
+	if (!XLogRecPtrIsInvalid(errarg->remote_lsn))
 	{
-		appendStringInfo(&buf, _(" in transaction %u"), errarg->remote_xid);
+		appendStringInfo(&buf, _(" in transaction which committed at %X/%X"),
+						 LSN_FORMAT_ARGS(errarg->remote_lsn));
+
 		if (errarg->ts != 0)
-			appendStringInfo(&buf, _(" at %s"),
+			appendStringInfo(&buf, _(", at %s"),
 							 timestamptz_to_str(errarg->ts));
 	}
 
@@ -3701,9 +3698,9 @@ apply_error_callback(void *arg)
 
 /* Set transaction information of apply error callback */
 static inline void
-set_apply_error_context_xact(TransactionId xid, TimestampTz ts)
+set_apply_error_context_xact(XLogRecPtr lsn, TimestampTz ts)
 {
-	apply_error_callback_arg.remote_xid = xid;
+	apply_error_callback_arg.remote_lsn = lsn;
 	apply_error_callback_arg.ts = ts;
 }
 
@@ -3714,5 +3711,5 @@ reset_apply_error_context_info(void)
 	apply_error_callback_arg.command = 0;
 	apply_error_callback_arg.rel = NULL;
 	apply_error_callback_arg.remote_attnum = -1;
-	set_apply_error_context_xact(InvalidTransactionId, 0);
+	set_apply_error_context_xact(InvalidXLogRecPtr, 0);
 }
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 15cb17ace4..697f72c276 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -30,6 +30,7 @@
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/inet.h"
+#include "utils/pg_lsn.h"
 #include "utils/timestamp.h"
 
 #define UINT32_ACCESS_ONCE(var)		 ((uint32)(*((volatile uint32 *)&(var))))
@@ -2446,8 +2447,8 @@ pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
 					   OIDOID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "last_error_command",
 					   TEXTOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "last_error_xid",
-					   XIDOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "last_error_lsn",
+					   LSNOID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "last_error_count",
 					   INT8OID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "last_error_message",
@@ -2483,9 +2484,9 @@ pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
 	else
 		nulls[i++] = true;
 
-	/* last_error_xid */
-	if (TransactionIdIsValid(wentry->last_error_xid))
-		values[i++] = TransactionIdGetDatum(wentry->last_error_xid);
+	/* last_error_lsn */
+	if (!XLogRecPtrIsInvalid(wentry->last_error_lsn))
+		values[i++] = LSNGetDatum(wentry->last_error_lsn);
 	else
 		nulls[i++] = true;
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 7024dbe10a..1b6b745d11 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5375,9 +5375,9 @@
   proname => 'pg_stat_get_subscription_worker', prorows => '1', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'oid oid',
-  proallargtypes => '{oid,oid,oid,oid,oid,text,xid,int8,text,timestamptz}',
+  proallargtypes => '{oid,oid,oid,oid,oid,text,pg_lsn,int8,text,timestamptz}',
   proargmodes => '{i,i,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subrelid,subid,subrelid,last_error_relid,last_error_command,last_error_xid,last_error_count,last_error_message,last_error_time}',
+  proargnames => '{subid,subrelid,subid,subrelid,last_error_relid,last_error_command,last_error_lsn,last_error_count,last_error_message,last_error_time}',
   prosrc => 'pg_stat_get_subscription_worker' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index e10d20222a..77eb799e81 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -585,7 +585,7 @@ typedef struct PgStat_MsgSubWorkerError
 	Oid			m_relid;
 
 	LogicalRepMsgType m_command;
-	TransactionId m_xid;
+	XLogRecPtr	m_lsn;
 	TimestampTz m_timestamp;
 	char		m_message[PGSTAT_SUBWORKERERROR_MSGLEN];
 } PgStat_MsgSubWorkerError;
@@ -1016,7 +1016,7 @@ typedef struct PgStat_StatSubWorkerEntry
 	 */
 	Oid			last_error_relid;
 	LogicalRepMsgType last_error_command;
-	TransactionId last_error_xid;
+	XLogRecPtr last_error_lsn;
 	PgStat_Counter last_error_count;
 	TimestampTz last_error_time;
 	char		last_error_message[PGSTAT_SUBWORKERERROR_MSGLEN];
@@ -1133,7 +1133,7 @@ extern void pgstat_report_replslot_create(const char *slotname);
 extern void pgstat_report_replslot_drop(const char *slotname);
 extern void pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
 										  LogicalRepMsgType command,
-										  TransactionId xid, const char *errmsg);
+										  XLogRecPtr lsn, const char *errmsg);
 extern void pgstat_report_subscription_drop(Oid subid);
 
 extern void pgstat_initialize(void);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index d652f7b5fb..0b2b2f81e9 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2099,7 +2099,7 @@ pg_stat_subscription_workers| SELECT w.subid,
     w.subrelid,
     w.last_error_relid,
     w.last_error_command,
-    w.last_error_xid,
+    w.last_error_lsn,
     w.last_error_count,
     w.last_error_message,
     w.last_error_time
@@ -2110,7 +2110,7 @@ pg_stat_subscription_workers| SELECT w.subid,
          SELECT pg_subscription_rel.srsubid AS subid,
             pg_subscription_rel.srrelid AS relid
            FROM pg_subscription_rel) sr,
-    (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w(subid, subrelid, last_error_relid, last_error_command, last_error_xid, last_error_count, last_error_message, last_error_time)
+    (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w(subid, subrelid, last_error_relid, last_error_command, last_error_lsn, last_error_count, last_error_message, last_error_time)
      JOIN pg_subscription s ON ((w.subid = s.oid)));
 pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid,
     pg_stat_all_indexes.indexrelid,
diff --git a/src/test/subscription/t/026_worker_stats.pl b/src/test/subscription/t/026_worker_stats.pl
index 6cf21c8fee..d7f6e702df 100644
--- a/src/test/subscription/t/026_worker_stats.pl
+++ b/src/test/subscription/t/026_worker_stats.pl
@@ -11,7 +11,7 @@ use Test::More tests => 3;
 # Test if the error reported on pg_stat_subscription_workers view is expected.
 sub test_subscription_error
 {
-    my ($node, $relname, $command, $xid, $by_apply_worker, $errmsg_prefix, $msg)
+    my ($node, $relname, $command, $by_apply_worker, $errmsg_prefix, $msg)
 	= @_;
 
     my $check_sql = qq[
@@ -30,11 +30,6 @@ WHERE last_error_relid = '$relname'::regclass
 	? qq[ AND last_error_command IS NULL]
 	: qq[ AND last_error_command = '$command'];
 
-    # last_error_xid
-    $check_sql .= $xid eq ''
-	? qq[ AND last_error_xid IS NULL]
-	: qq[ AND last_error_xid = '$xid'::xid];
-
     # Wait for the particular error statistics to be reported.
     $node->poll_query_until('postgres', $check_sql,
 ) or die "Timed out while waiting for " . $msg;
@@ -116,21 +111,20 @@ is($result, q(1), 'check initial data are copied to subscriber');
 
 # Insert more data to test_tab1, raising an error on the subscriber due to
 # violation of the unique constraint on test_tab1.
-my $xid = $node_publisher->safe_psql(
+$node_publisher->safe_psql(
     'postgres',
     qq[
 BEGIN;
 INSERT INTO test_tab1 VALUES (1);
-SELECT pg_current_xact_id()::xid;
 COMMIT;
 ]);
-test_subscription_error($node_subscriber, 'test_tab1', 'INSERT', $xid,
+test_subscription_error($node_subscriber, 'test_tab1', 'INSERT',
 			1,	# check apply worker error
 			qq(duplicate key value violates unique constraint),
 			'error reported by the apply worker');
 
 # Check the table sync worker's error in the view.
-test_subscription_error($node_subscriber, 'test_tab2', '', '',
+test_subscription_error($node_subscriber, 'test_tab2', '',
 			0,	# check tablesync worker error
 			qq(duplicate key value violates unique constraint),
 			'the error reported by the table sync worker');
-- 
2.24.3 (Apple Git-128)

