From 45f456701eb015d9e34ab28d580124981f90e420 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 28 Jun 2021 13:21:58 +0900
Subject: [PATCH v1 2/3] Add errcontext to errors of the applying logical
 replication changes.

---
 src/backend/replication/logical/proto.c  |  41 ++++++++
 src/backend/replication/logical/worker.c | 119 +++++++++++++++++++----
 src/include/replication/logicalproto.h   |   1 +
 3 files changed, 143 insertions(+), 18 deletions(-)

diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 1cf59e0fb0..08e63c3a89 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -898,3 +898,44 @@ logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
 	*xid = pq_getmsgint(in, 4);
 	*subxid = pq_getmsgint(in, 4);
 }
+
+/*
+ * get string representing LogicalRepMsgType.
+ */
+const char *
+logicalrep_action(LogicalRepMsgType action)
+{
+	switch (action)
+	{
+		case LOGICAL_REP_MSG_BEGIN:
+			return "BEGIN";
+		case LOGICAL_REP_MSG_COMMIT:
+			return "COMMIT";
+		case LOGICAL_REP_MSG_INSERT:
+			return "INSERT";
+		case LOGICAL_REP_MSG_UPDATE:
+			return "UPDATE";
+		case LOGICAL_REP_MSG_DELETE:
+			return "DELETE";
+		case LOGICAL_REP_MSG_TRUNCATE:
+			return "TRUNCATE";
+		case LOGICAL_REP_MSG_RELATION:
+			return "RELATION";
+		case LOGICAL_REP_MSG_TYPE:
+			return "TYPE";
+		case LOGICAL_REP_MSG_ORIGIN:
+			return "ORIGIN";
+		case LOGICAL_REP_MSG_MESSAGE:
+			return "MESSAGE";
+		case LOGICAL_REP_MSG_STREAM_START:
+			return "STREAM START";
+		case LOGICAL_REP_MSG_STREAM_END:
+			return "STREAM END";
+		case LOGICAL_REP_MSG_STREAM_ABORT:
+			return "STREAM ABORT";
+		case LOGICAL_REP_MSG_STREAM_COMMIT:
+			return "STREAM COMMIT";
+		default:
+			return "UNKNOWN";
+	}
+}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b90a8df166..b65f72c9a4 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -149,6 +149,21 @@ typedef struct ApplyExecutionData
 	PartitionTupleRouting *proute;	/* partition routing info */
 } ApplyExecutionData;
 
+typedef struct ApplyErrCallbackArg
+{
+	LogicalRepMsgType action;	/* 0 if invalid */
+	LogicalRepRelMapEntry *rel;
+	TransactionId	remote_xid;
+	TimestampTz	committs;
+} ApplyErrCallbackArg;
+static ApplyErrCallbackArg apply_error_callback_arg =
+{
+	.action = 0,
+	.rel = NULL,
+	.remote_xid = InvalidTransactionId,
+	.committs = 0,
+};
+
 /*
  * Stream xid hash entry. Whenever we see a new xid we create this entry in the
  * xidhash and along with it create the streaming file and store the fileset handle.
@@ -276,6 +291,8 @@ static bool start_skipping_changes(TransactionId xid);
 static bool stop_skipping_changes(bool reset_xid,
 								  LogicalRepCommitData *commit_data);
 
+static void apply_error_callback(void *arg);
+static void reset_apply_error_context_info(void);
 
 /*
  * Should this worker apply changes for given relation.
@@ -788,6 +805,8 @@ apply_handle_begin(StringInfo s)
 	LogicalRepBeginData begin_data;
 
 	logicalrep_read_begin(s, &begin_data);
+	apply_error_callback_arg.remote_xid = begin_data.xid;
+	apply_error_callback_arg.committs = begin_data.committime;
 
 	remote_final_lsn = begin_data.final_lsn;
 
@@ -828,6 +847,7 @@ apply_handle_commit(StringInfo s)
 	process_syncing_tables(commit_data.end_lsn);
 
 	pgstat_report_activity(STATE_IDLE, NULL);
+	reset_apply_error_context_info();
 }
 
 /*
@@ -876,6 +896,7 @@ apply_handle_stream_start(StringInfo s)
 	 * streaming data and subxact info.
 	 */
 	begin_replication_step();
+	apply_error_callback_arg.remote_xid = stream_xid;
 
 	/* notify handle methods we're processing a remote transaction */
 	in_streamed_transaction = true;
@@ -941,6 +962,7 @@ apply_handle_stream_stop(StringInfo s)
 	MemoryContextReset(LogicalStreamingContext);
 
 	pgstat_report_activity(STATE_IDLE, NULL);
+	reset_apply_error_context_info();
 }
 
 /*
@@ -964,7 +986,10 @@ apply_handle_stream_abort(StringInfo s)
 	 * just delete the files with serialized info.
 	 */
 	if (xid == subxid)
+	{
+		apply_error_callback_arg.remote_xid = xid;
 		stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+	}
 	else
 	{
 		/*
@@ -989,6 +1014,7 @@ apply_handle_stream_abort(StringInfo s)
 		char		path[MAXPGPATH];
 		StreamXidHash *ent;
 
+		apply_error_callback_arg.remote_xid = subxid;
 		subidx = -1;
 		begin_replication_step();
 		subxact_info_read(MyLogicalRepWorker->subid, xid);
@@ -1013,6 +1039,7 @@ apply_handle_stream_abort(StringInfo s)
 			cleanup_subxact_info();
 			end_replication_step();
 			CommitTransactionCommand();
+			reset_apply_error_context_info();
 			return;
 		}
 
@@ -1047,6 +1074,8 @@ apply_handle_stream_abort(StringInfo s)
 
 	/* Stop the skipping transaction if enabled */
 	stop_skipping_changes(true, NULL);
+
+	reset_apply_error_context_info();
 }
 
 /*
@@ -1064,6 +1093,8 @@ apply_handle_stream_commit(StringInfo s)
 				 errmsg_internal("STREAM COMMIT message without STREAM STOP")));
 
 	xid = logicalrep_read_stream_commit(s, &commit_data);
+	apply_error_callback_arg.remote_xid = xid;
+	apply_error_callback_arg.committs = commit_data.committime;
 
 	remote_final_lsn = commit_data.commit_lsn;
 
@@ -1083,6 +1114,8 @@ apply_handle_stream_commit(StringInfo s)
 	process_syncing_tables(commit_data.end_lsn);
 
 	pgstat_report_activity(STATE_IDLE, NULL);
+
+	reset_apply_error_context_info();
 }
 
 /*
@@ -1330,6 +1363,8 @@ apply_handle_insert(StringInfo s)
 		return;
 	}
 
+	apply_error_callback_arg.rel = rel;
+
 	/* Initialize the executor state. */
 	edata = create_edata_for_relation(rel);
 	estate = edata->estate;
@@ -1451,6 +1486,8 @@ apply_handle_update(StringInfo s)
 		return;
 	}
 
+	apply_error_callback_arg.rel = rel;
+
 	/* Check if we can do the update. */
 	check_relation_updatable(rel);
 
@@ -1607,6 +1644,8 @@ apply_handle_delete(StringInfo s)
 		return;
 	}
 
+	apply_error_callback_arg.rel = rel;
+
 	/* Check if we can do the delete. */
 	check_relation_updatable(rel);
 
@@ -2066,6 +2105,7 @@ static void
 apply_dispatch(StringInfo s)
 {
 	LogicalRepMsgType action = pq_getmsgbyte(s);
+	ErrorContextCallback errcallback;
 
 	/*
 	 * Skip all data-modification changes if we're skipping changes of this
@@ -2078,43 +2118,49 @@ apply_dispatch(StringInfo s)
 		 action == LOGICAL_REP_MSG_TRUNCATE))
 		return;
 
+	/* Push apply error context callback */
+	apply_error_callback_arg.action = action;
+	errcallback.callback  = apply_error_callback;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
 	switch (action)
 	{
 		case LOGICAL_REP_MSG_BEGIN:
 			apply_handle_begin(s);
-			return;
+			break;
 
 		case LOGICAL_REP_MSG_COMMIT:
 			apply_handle_commit(s);
-			return;
+			break;
 
 		case LOGICAL_REP_MSG_INSERT:
 			apply_handle_insert(s);
-			return;
+			break;
 
 		case LOGICAL_REP_MSG_UPDATE:
 			apply_handle_update(s);
-			return;
+			break;
 
 		case LOGICAL_REP_MSG_DELETE:
 			apply_handle_delete(s);
-			return;
+			break;
 
 		case LOGICAL_REP_MSG_TRUNCATE:
 			apply_handle_truncate(s);
-			return;
+			break;
 
 		case LOGICAL_REP_MSG_RELATION:
 			apply_handle_relation(s);
-			return;
+			break;
 
 		case LOGICAL_REP_MSG_TYPE:
 			apply_handle_type(s);
-			return;
+			break;
 
 		case LOGICAL_REP_MSG_ORIGIN:
 			apply_handle_origin(s);
-			return;
+			break;
 
 		case LOGICAL_REP_MSG_MESSAGE:
 
@@ -2123,29 +2169,32 @@ apply_dispatch(StringInfo s)
 			 * Although, it could be used by other applications that use this
 			 * output plugin.
 			 */
-			return;
+			break;
 
 		case LOGICAL_REP_MSG_STREAM_START:
 			apply_handle_stream_start(s);
-			return;
+			break;
 
 		case LOGICAL_REP_MSG_STREAM_END:
 			apply_handle_stream_stop(s);
-			return;
+			break;
 
 		case LOGICAL_REP_MSG_STREAM_ABORT:
 			apply_handle_stream_abort(s);
-			return;
+			break;
 
 		case LOGICAL_REP_MSG_STREAM_COMMIT:
 			apply_handle_stream_commit(s);
-			return;
+			break;
+
+		default:
+			ereport(ERROR,
+					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+					 errmsg("invalid logical replication message type \"%c\"", action)));
 	}
 
-	ereport(ERROR,
-			(errcode(ERRCODE_PROTOCOL_VIOLATION),
-			 errmsg_internal("invalid logical replication message type \"%c\"",
-							 action)));
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
 }
 
 /*
@@ -3412,3 +3461,37 @@ stop_skipping_changes(bool reset_xid, LogicalRepCommitData *commit_data)
 
 	return true;
 }
+
+static void
+apply_error_callback(void *arg)
+{
+	StringInfoData buf;
+
+	initStringInfo(&buf);
+	appendStringInfo(&buf, _("during apply of \"%s\""),
+					 logicalrep_action(apply_error_callback_arg.action));
+
+
+	if (apply_error_callback_arg.rel)
+		appendStringInfo(&buf, _(" for relation \"%s.%s\""),
+						 apply_error_callback_arg.rel->remoterel.nspname,
+						 apply_error_callback_arg.rel->remoterel.relname);
+
+	if (TransactionIdIsNormal(apply_error_callback_arg.remote_xid))
+		appendStringInfo(&buf, _(" in transaction with xid %u committs %s"),
+						 apply_error_callback_arg.remote_xid,
+						 apply_error_callback_arg.committs == 0
+						 ? "(unset)"
+						 : timestamptz_to_str(apply_error_callback_arg.committs));
+
+	errcontext("%s", buf.data);
+}
+
+static void
+reset_apply_error_context_info(void)
+{
+	apply_error_callback_arg.action = -1;
+	apply_error_callback_arg.rel = NULL;
+	apply_error_callback_arg.remote_xid = InvalidTransactionId;
+	apply_error_callback_arg.committs = 0;
+}
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 55b90c03ea..a1bd3e2d9a 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -173,5 +173,6 @@ extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
 										  TransactionId subxid);
 extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
 										 TransactionId *subxid);
+extern const char *logicalrep_action(LogicalRepMsgType action);
 
 #endif							/* LOGICAL_PROTO_H */
-- 
2.24.3 (Apple Git-128)

