From 2b220abdf8ee18b8c60ef0dce584d9ec9e36aedd Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Thu, 24 Feb 2022 16:56:58 +0900
Subject: [PATCH v12 2/3] Add the origin name and remote commit-LSN to logical
 replication worker errcontext.

This commits adds both the commit-LSN and replication origin name to
the existing error context message.

This will help users in specifying the origin name and commit-LSN to
pg_replication_origin_advance() SQL function to skip the particular transaction.
---
 doc/src/sgml/logical-replication.sgml    | 19 +++++--
 src/backend/replication/logical/worker.c | 71 ++++++++++++++++++------
 2 files changed, 67 insertions(+), 23 deletions(-)

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index fb4472356d..57272e641e 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -352,12 +352,21 @@
   <para>
    The resolution can be done either by changing data or permissions on the subscriber so
    that it does not conflict with the incoming change or by skipping the
-   transaction that conflicts with the existing data.  The transaction can be
-   skipped by calling the <link linkend="pg-replication-origin-advance">
+   transaction that conflicts with the existing data.  When a conflict produces
+   an error, it is shown in the subscriber's server logs as follows:
+<screen>
+ERROR:  duplicate key value violates unique constraint "test_pkey"
+DETAIL:  Key (c)=(1) already exists.
+CONTEXT:  processing remote data during "INSERT" for replication target relation "public.test" in transaction 725 committed at LSN 0/14BFA88 and timestamp 2022-02-28 20:58:27.964238+00 from replication origin "pg_16395"
+</screen>
+   The LSN of the transaction that contains the change violating the constraint and
+   the replication origin name can be found from those outputs (LSN 0/14C0378 and
+   replication origin <literal>pg_16395</literal> in the above case).  The transaction
+   can be skipped by calling the <link linkend="pg-replication-origin-advance">
    <function>pg_replication_origin_advance()</function></link> function with
-   a <parameter>node_name</parameter> corresponding to the subscription name,
-   and a position.  The current position of origins can be seen in the
-   <link linkend="view-pg-replication-origin-status">
+   the <parameter>node_name</parameter> and the next LSN of the commit LSN
+   (i.e., 0/14C0379) from those outputs.  The current position of origins can be
+   seen in the <link linkend="view-pg-replication-origin-status">
    <structname>pg_replication_origin_status</structname></link> system view.
   </para>
  </sect1>
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index e81f85e2a3..a159561e31 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -226,6 +226,8 @@ typedef struct ApplyErrorCallbackArg
 	/* Remote node information */
 	int			remote_attnum;	/* -1 if invalid */
 	TransactionId remote_xid;
+	XLogRecPtr	commit_lsn;
+	char	   *origin_name;
 	TimestampTz ts;				/* commit, rollback, or prepare timestamp */
 } ApplyErrorCallbackArg;
 
@@ -235,6 +237,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
 	.rel = NULL,
 	.remote_attnum = -1,
 	.remote_xid = InvalidTransactionId,
+	.commit_lsn = InvalidXLogRecPtr,
+	.origin_name = NULL,
 	.ts = 0,
 };
 
@@ -334,7 +338,8 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
 
 /* Functions for apply error callback */
 static void apply_error_callback(void *arg);
-static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz ts);
+static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn,
+												TimestampTz ts);
 static inline void reset_apply_error_context_info(void);
 
 /*
@@ -787,7 +792,8 @@ apply_handle_begin(StringInfo s)
 	LogicalRepBeginData begin_data;
 
 	logicalrep_read_begin(s, &begin_data);
-	set_apply_error_context_xact(begin_data.xid, begin_data.committime);
+	set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn,
+								 begin_data.committime);
 
 	remote_final_lsn = begin_data.final_lsn;
 
@@ -839,7 +845,8 @@ apply_handle_begin_prepare(StringInfo s)
 				 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
 
 	logicalrep_read_begin_prepare(s, &begin_data);
-	set_apply_error_context_xact(begin_data.xid, begin_data.prepare_time);
+	set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn,
+								 begin_data.prepare_time);
 
 	remote_final_lsn = begin_data.prepare_lsn;
 
@@ -938,7 +945,8 @@ apply_handle_commit_prepared(StringInfo s)
 	char		gid[GIDSIZE];
 
 	logicalrep_read_commit_prepared(s, &prepare_data);
-	set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_time);
+	set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn,
+								 prepare_data.commit_time);
 
 	/* Compute GID for two_phase transactions. */
 	TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
@@ -979,7 +987,8 @@ apply_handle_rollback_prepared(StringInfo s)
 	char		gid[GIDSIZE];
 
 	logicalrep_read_rollback_prepared(s, &rollback_data);
-	set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_time);
+	set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn,
+								 rollback_data.rollback_time);
 
 	/* Compute GID for two_phase transactions. */
 	TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
@@ -1044,7 +1053,8 @@ apply_handle_stream_prepare(StringInfo s)
 				 errmsg_internal("tablesync worker received a STREAM PREPARE message")));
 
 	logicalrep_read_stream_prepare(s, &prepare_data);
-	set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_time);
+	set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn,
+								 prepare_data.prepare_time);
 
 	elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
 
@@ -1126,7 +1136,7 @@ apply_handle_stream_start(StringInfo s)
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
 				 errmsg_internal("invalid transaction ID in streamed replication transaction")));
 
-	set_apply_error_context_xact(stream_xid, 0);
+	set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr, 0);
 
 	/*
 	 * Initialize the worker's stream_fileset if we haven't yet. This will be
@@ -1215,7 +1225,7 @@ apply_handle_stream_abort(StringInfo s)
 	 */
 	if (xid == subxid)
 	{
-		set_apply_error_context_xact(xid, 0);
+		set_apply_error_context_xact(xid, InvalidXLogRecPtr, 0);
 		stream_cleanup_files(MyLogicalRepWorker->subid, xid);
 	}
 	else
@@ -1241,7 +1251,7 @@ apply_handle_stream_abort(StringInfo s)
 		bool		found = false;
 		char		path[MAXPGPATH];
 
-		set_apply_error_context_xact(subxid, 0);
+		set_apply_error_context_xact(subxid, InvalidXLogRecPtr, 0);
 
 		subidx = -1;
 		begin_replication_step();
@@ -1426,7 +1436,7 @@ apply_handle_stream_commit(StringInfo s)
 				 errmsg_internal("STREAM COMMIT message without STREAM STOP")));
 
 	xid = logicalrep_read_stream_commit(s, &commit_data);
-	set_apply_error_context_xact(xid, commit_data.committime);
+	set_apply_error_context_xact(xid, commit_data.commit_lsn, commit_data.committime);
 
 	elog(DEBUG1, "received commit for streamed transaction %u", xid);
 
@@ -3501,6 +3511,17 @@ ApplyWorkerMain(Datum main_arg)
 		myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
 
 		pfree(syncslotname);
+
+		/*
+		 * Allocate the origin name in long-lived context for error context
+		 * message
+		 */
+		ReplicationOriginNameForTablesync(MySubscription->oid,
+										  MyLogicalRepWorker->relid,
+										  originname,
+										  sizeof(originname));
+		apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+																   originname);
 	}
 	else
 	{
@@ -3544,6 +3565,13 @@ ApplyWorkerMain(Datum main_arg)
 		 * does some initializations on the upstream so let's still call it.
 		 */
 		(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
+
+		/*
+		 * Allocate the origin name in long-lived context for error context
+		 * message
+		 */
+		apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+																   originname);
 	}
 
 	/*
@@ -3659,33 +3687,40 @@ apply_error_callback(void *arg)
 			errcontext("processing remote data during \"%s\"",
 					   logicalrep_message_type(errarg->command));
 		else
-			errcontext("processing remote data during \"%s\" in transaction %u at %s",
+			errcontext("processing remote data during \"%s\" in transaction %u committed at LSN %X/%X and timestamp %s from replication origin \"%s\"",
 					   logicalrep_message_type(errarg->command),
 					   errarg->remote_xid,
-					   (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)");
+					   LSN_FORMAT_ARGS(errarg->commit_lsn),
+					   (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)",
+					   errarg->origin_name);
 	}
 	else if (errarg->remote_attnum < 0)
-		errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u at %s",
+		errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u committed at LSN %X/%X and timestamp %s from replication origin \"%s\"",
 				   logicalrep_message_type(errarg->command),
 				   errarg->rel->remoterel.nspname,
 				   errarg->rel->remoterel.relname,
 				   errarg->remote_xid,
-				   (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)");
+				   LSN_FORMAT_ARGS(errarg->commit_lsn),
+				   (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)",
+				   errarg->origin_name);
 	else
-		errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u at %s",
+		errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u committed at LSN %X/%X and timestamp %s from replication origin \"%s\"",
 				   logicalrep_message_type(errarg->command),
 				   errarg->rel->remoterel.nspname,
 				   errarg->rel->remoterel.relname,
 				   errarg->rel->remoterel.attnames[errarg->remote_attnum],
 				   errarg->remote_xid,
-				   (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)");
+				   LSN_FORMAT_ARGS(errarg->commit_lsn),
+				   (errarg->ts != 0) ? timestamptz_to_str(errarg->ts) : "(not-set)",
+				   errarg->origin_name);
 }
 
 /* Set transaction information of apply error callback */
 static inline void
-set_apply_error_context_xact(TransactionId xid, TimestampTz ts)
+set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn, TimestampTz ts)
 {
 	apply_error_callback_arg.remote_xid = xid;
+	apply_error_callback_arg.commit_lsn = lsn;
 	apply_error_callback_arg.ts = ts;
 }
 
@@ -3696,5 +3731,5 @@ reset_apply_error_context_info(void)
 	apply_error_callback_arg.command = 0;
 	apply_error_callback_arg.rel = NULL;
 	apply_error_callback_arg.remote_attnum = -1;
-	set_apply_error_context_xact(InvalidTransactionId, 0);
+	set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr, 0);
 }
-- 
2.24.3 (Apple Git-128)

