From dada5379d323dc91aaf3a27fb64012dc41d5962c Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumarb@google.com>
Date: Sun, 14 Sep 2025 08:38:22 +0530
Subject: [PATCH v1 1/2] Add configurable conflict log history table for
 Logical Replication

This patch adds a feature to provide a structured, queryable record of all
logical replication conflicts. The current approach of logging conflicts as
plain text in the server logs makes it difficult to query, analyze, and
use for external monitoring and automation.

This patch addresses these limitations by introducing a configurable
conflict_log_table option in the CREATE SUBSCRIPTION command. Key design
decisions include:

User-Defined Table: The conflict log is stored in a user-managed table
rather than a system catalog.

Structured Data: Conflict details, including the original and remote tuples,
are stored in JSON columns, providing a flexible format to accommodate different
table schemas.

Comprehensive Information: The log table captures essential attributes such as
local and remote transaction IDs, LSNs, commit timestamps, and conflict type,
providing a complete record for post-mortem analysis.

This feature will make logical replication conflicts easier to monitor and manage,
significantly improving the overall resilience and operability of replication setups.

Open Issues:
------------
1) Need to control publishing this table when publish ALL TABLES, or maybe we can leave it to
user to EXLUDE this.
2) Currently subscription creation owner will have full control on this table as we are
creating the table during create subscription, so they can alter or drop the table which can
cause error while inserting into the conflict table.
3) If we can create a buit-in type can create conflict table of that type then we can control
altering the table.  But currently we are still exploring how to create a built-in type without
creating a table, maybe add create type command in some scripts which get executed during initdb.
---
 src/backend/commands/subscriptioncmds.c    | 248 ++++++++++++++++++++-
 src/backend/replication/logical/conflict.c | 188 ++++++++++++++++
 src/backend/replication/logical/worker.c   |  10 +-
 src/backend/utils/cache/lsyscache.c        |  45 ++++
 src/include/catalog/pg_subscription.h      |   5 +
 src/include/replication/conflict.h         |   2 +
 src/include/replication/worker_internal.h  |   4 +
 src/include/utils/lsyscache.h              |   1 +
 8 files changed, 499 insertions(+), 4 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 750d262fcca..c2f2fdabadb 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
 #include "commands/event_trigger.h"
 #include "commands/subscriptioncmds.h"
 #include "executor/executor.h"
+#include "executor/spi.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
@@ -47,10 +48,12 @@
 #include "storage/lmgr.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
+#include "utils/fmgroids.h"
 #include "utils/guc.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/pg_lsn.h"
+#include "utils/regproc.h"
 #include "utils/syscache.h"
 
 /*
@@ -75,6 +78,7 @@
 #define SUBOPT_MAX_RETENTION_DURATION	0x00008000
 #define SUBOPT_LSN					0x00010000
 #define SUBOPT_ORIGIN				0x00020000
+#define SUBOPT_CONFLICT_TABLE		0x00030000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -103,6 +107,7 @@ typedef struct SubOpts
 	bool		retaindeadtuples;
 	int32		maxretention;
 	char	   *origin;
+	char	   *conflicttable;
 	XLogRecPtr	lsn;
 } SubOpts;
 
@@ -118,7 +123,7 @@ static List *merge_publications(List *oldpublist, List *newpublist, bool addpub,
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
 static void CheckAlterSubOption(Subscription *sub, const char *option,
 								bool slot_needs_update, bool isTopLevel);
-
+static void ValidateConflictHistoryTable(Oid namespaceId, char *conflictrel);
 
 /*
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -174,6 +179,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->maxretention = 0;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
 		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
+	if (IsSet(supported_opts, SUBOPT_CONFLICT_TABLE))
+		opts->conflicttable = NULL;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -385,6 +392,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_LSN;
 			opts->lsn = lsn;
 		}
+		else if (IsSet(supported_opts, SUBOPT_CONFLICT_TABLE) &&
+				 strcmp(defel->defname, "conflict_log_table") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_CONFLICT_TABLE))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_CONFLICT_TABLE;
+			opts->conflicttable = defGetString(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -560,6 +576,186 @@ publicationListToArray(List *publist)
 	return PointerGetDatum(arr);
 }
 
+/*
+ * ValidateConflictHistoryTable - Validate conflict history table
+ *
+ * Validate whether the input 'conflictrel' is suitable for considering as
+ * conflict log history table.
+ */
+static void
+ValidateConflictHistoryTable(Oid namespaceId, char *conflictrel)
+{
+	Datum		value;
+	Relation	pg_attribute;
+	Relation	rel;
+	Form_pg_attribute attForm;
+	ScanKeyData scankey;
+	SysScanDesc scan;
+	HeapTuple	atup;
+	Oid			relid;
+	int			attcnt = 0;
+	bool		tbl_ok = true;
+
+	relid = get_relname_relid(conflictrel, namespaceId);
+	if (!OidIsValid(relid))
+		ereport(ERROR,
+				errcode(ERRCODE_UNDEFINED_TABLE),
+				errmsg("relation \"%s.%s\" does not exist",
+						get_namespace_name(namespaceId), conflictrel));
+
+	/* log history table must be a regular realtion */
+	if (get_rel_relkind(relid) != RELKIND_RELATION)
+		ereport(ERROR,
+				errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				errmsg("cannot use relation \"%s.%s\" for storing conflict log",
+						get_namespace_name(namespaceId), conflictrel),
+				errdetail_relkind_not_supported(get_rel_relkind(relid)));
+
+	/*
+	 * We might insert tuples into the conflict log history table later, so we
+	 * first need to check its lock status. If it is already heavily locked,
+	 * our subsequent COPY operation may stuck. Instead of letting COPY FROM
+	 * hang, report an error indicating that the error-saving table is under
+	 * heavy lock.
+	 */
+	if (!ConditionalLockRelationOid(relid, RowExclusiveLock))
+		ereport(ERROR,
+				errcode(ERRCODE_OBJECT_IN_USE),
+				errmsg("can not use table \"%s.%s\" for storing conflict log because it was being locked",
+						get_namespace_name(namespaceId), conflictrel));
+
+	rel = table_open(relid, RowExclusiveLock);
+
+	/* The user should have INSERT privilege on conflict history table */
+	value = DirectFunctionCall3(has_table_privilege_id_id,
+								GetUserId(),
+								ObjectIdGetDatum(relid),
+								CStringGetTextDatum("INSERT"));
+	if (!DatumGetBool(value))
+		ereport(ERROR,
+				errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+				errmsg("permission denied to set table \"%s\" as conflict log history table",
+						RelationGetRelationName(rel)),
+				errhint("Ensure current user have enough privilege on \"%s.%s\" for conflict log history table",
+						get_namespace_name(namespaceId), conflictrel));
+
+	/*
+	 * Check whether the table definition (conflictrel) including its column
+	 * names, data types, and column ordering meets the requirements for conflict
+	 * log history table
+	 */
+	pg_attribute = table_open(AttributeRelationId, AccessShareLock);
+	ScanKeyInit(&scankey,
+				Anum_pg_attribute_attrelid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(relid));
+
+	scan = systable_beginscan(pg_attribute, AttributeRelidNumIndexId, true,
+								SnapshotSelf, 1, &scankey);
+	while (HeapTupleIsValid(atup = systable_getnext(scan)) && tbl_ok)
+	{
+		attForm = (Form_pg_attribute) GETSTRUCT(atup);
+
+		if (attForm->attnum < 1 || attForm->attisdropped)
+			continue;
+
+		attcnt++;
+		switch (attForm->attnum)
+		{
+			case 1:
+				if (attForm->atttypid != OIDOID ||
+					strcmp(NameStr(attForm->attname), "relid") != 0)
+					tbl_ok = false;
+				break;
+			case 2:
+				if (attForm->atttypid != XIDOID ||
+					strcmp(NameStr(attForm->attname), "local_xid") != 0)
+					tbl_ok = false;
+				break;
+			case 3:
+				if (attForm->atttypid != XIDOID ||
+					strcmp(NameStr(attForm->attname), "remote_xid") != 0)
+					tbl_ok = false;
+				break;
+			case 4:
+				if (attForm->atttypid != LSNOID ||
+					strcmp(NameStr(attForm->attname), "local_lsn") != 0)
+					tbl_ok = false;
+				break;
+			case 5:
+				if (attForm->atttypid != LSNOID ||
+					strcmp(NameStr(attForm->attname), "remote_commit_lsn") != 0)
+					tbl_ok = false;
+				break;
+			case 6:
+				if (attForm->atttypid != TIMESTAMPTZOID ||
+					strcmp(NameStr(attForm->attname), "local_commit_ts") != 0)
+					tbl_ok = false;
+				break;
+			case 7:
+				if (attForm->atttypid != TIMESTAMPTZOID ||
+					strcmp(NameStr(attForm->attname), "remote_commit_ts") != 0)
+					tbl_ok = false;
+				break;
+			case 8:
+				if (attForm->atttypid != TEXTOID ||
+					strcmp(NameStr(attForm->attname), "table_schema") != 0)
+					tbl_ok = false;
+				break;
+			case 9:
+				if (attForm->atttypid != TEXTOID ||
+					strcmp(NameStr(attForm->attname), "table_name") != 0)
+					tbl_ok = false;
+				break;
+			case 10:
+				if (attForm->atttypid != TEXTOID ||
+					strcmp(NameStr(attForm->attname), "conflict_type") != 0)
+					tbl_ok = false;
+				break;
+			case 11:
+				if (attForm->atttypid != TEXTOID ||
+					strcmp(NameStr(attForm->attname), "local_origin") != 0)
+					tbl_ok = false;
+				break;
+			case 12:
+				if (attForm->atttypid != TEXTOID ||
+					strcmp(NameStr(attForm->attname), "remote_origin") != 0)
+					tbl_ok = false;
+				break;
+			case 13:
+				if (attForm->atttypid != JSONOID ||
+					strcmp(NameStr(attForm->attname), "key_tuple") != 0)
+					tbl_ok = false;
+				break;
+			case 14:
+				if (attForm->atttypid != JSONOID ||
+					strcmp(NameStr(attForm->attname), "local_tuple") != 0)
+					tbl_ok = false;
+				break;
+			case 15:
+				if (attForm->atttypid != JSONOID ||
+					strcmp(NameStr(attForm->attname), "remote_tuple") != 0)
+					tbl_ok = false;
+				break;
+			default:
+				tbl_ok = false;
+				break;
+		}
+	}
+	systable_endscan(scan);
+	table_close(pg_attribute, AccessShareLock);
+
+	if (attcnt != MAX_CONFLICT_ATTR_NUM || !tbl_ok)
+		ereport(ERROR,
+				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				errmsg("table \"%s.%s\" cannot be used for storing conflict log",
+						get_namespace_name(namespaceId), conflictrel),
+				errdetail("Table \"%s.%s\" data definition is not suitable for storing conflict log",
+						  get_namespace_name(namespaceId), conflictrel));
+
+	table_close(rel, RowExclusiveLock);
+}
+
 /*
  * Create new subscription.
  */
@@ -580,6 +776,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	bits32		supported_opts;
 	SubOpts		opts = {0};
 	AclResult	aclresult;
+	Oid			conflict_table_nspid;
+	char	   *conflict_table;
 
 	/*
 	 * Parse and check options.
@@ -593,7 +791,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
 					  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
 					  SUBOPT_RETAIN_DEAD_TUPLES |
-					  SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN);
+					  SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN |
+					  SUBOPT_CONFLICT_TABLE);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -728,6 +927,25 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_suborigin - 1] =
 		CStringGetTextDatum(opts.origin);
 
+	/*
+	 * If a conflict log history table name is specified, parse the schema and
+	 * table name from the string. Store the namespace OID and the table name in
+	 * the pg_subscription catalog tuple.
+	 */
+	if (opts.conflicttable)
+	{
+		List   *names = stringToQualifiedNameList(opts.conflicttable, NULL);
+
+		conflict_table_nspid =
+				QualifiedNameGetCreationNamespace(names, &conflict_table);
+		values[Anum_pg_subscription_subconflictnspid - 1] =
+					ObjectIdGetDatum(conflict_table_nspid);
+		values[Anum_pg_subscription_subconflicttable - 1] =
+					CStringGetTextDatum(conflict_table);
+	}
+	else
+		nulls[Anum_pg_subscription_subconflicttable - 1] = true;
+
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
 	/* Insert tuple into catalog. */
@@ -739,6 +957,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
 	replorigin_create(originname);
 
+	/* If conflict log history table name is given than create the table. */
+	if (opts.conflicttable)
+		ValidateConflictHistoryTable(conflict_table_nspid,
+									 conflict_table);
 	/*
 	 * Connect to remote side to execute requested commands and fetch table
 	 * info.
@@ -1272,7 +1494,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 								  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
 								  SUBOPT_RETAIN_DEAD_TUPLES |
 								  SUBOPT_MAX_RETENTION_DURATION |
-								  SUBOPT_ORIGIN);
+								  SUBOPT_ORIGIN |
+								  SUBOPT_CONFLICT_TABLE);
 
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -1527,6 +1750,25 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					origin = opts.origin;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_CONFLICT_TABLE))
+				{
+					Oid		nspid;
+					char   *relname = NULL;
+					List   *names =
+						stringToQualifiedNameList(opts.conflicttable, NULL);
+
+					nspid = QualifiedNameGetCreationNamespace(names, &relname);
+					values[Anum_pg_subscription_subconflictnspid - 1] =
+								ObjectIdGetDatum(nspid);
+					values[Anum_pg_subscription_subconflicttable - 1] =
+						CStringGetTextDatum(relname);
+
+					replaces[Anum_pg_subscription_subconflictnspid - 1] = true;
+					replaces[Anum_pg_subscription_subconflicttable - 1] = true;
+
+					ValidateConflictHistoryTable(nspid, relname);
+				}
+
 				update_tuple = true;
 				break;
 			}
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 16695592265..89ff4c33f11 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -15,13 +15,23 @@
 #include "postgres.h"
 
 #include "access/commit_ts.h"
+#include "access/heapam.h"
 #include "access/tableam.h"
+#include "access/table.h"
+#include "catalog/indexing.h"
+#include "catalog/namespace.h"
+#include "catalog/pg_namespace_d.h"
+#include "catalog/pg_type.h"
 #include "executor/executor.h"
+#include "executor/spi.h"
 #include "pgstat.h"
 #include "replication/conflict.h"
 #include "replication/worker_internal.h"
 #include "storage/lmgr.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
+#include "utils/pg_lsn.h"
 
 static const char *const ConflictTypeNames[] = {
 	[CT_INSERT_EXISTS] = "insert_exists",
@@ -85,6 +95,174 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
 	return TransactionIdGetCommitTsData(*xmin, localts, localorigin);
 }
 
+/*
+ * Helper function to convert a TupleTableSlot to Jsonb
+ *
+ * This would be a new internal helper function for logical replication
+ * Needs to handle various data types and potentially TOASTed data
+ */
+static Datum
+TupleTableSlotToJsonDatum(TupleTableSlot *slot)
+{
+	HeapTuple	tuple = ExecCopySlotHeapTuple(slot);
+	Datum		datum = heap_copy_tuple_as_datum(tuple, slot->tts_tupleDescriptor);
+	Datum		json;
+
+	if (TupIsNull(slot))
+		return 0;
+
+	json = DirectFunctionCall1(row_to_json, datum);
+	heap_freetuple(tuple);
+
+	return json;
+}
+
+/*
+ * InsertIntoLogHistoryTable
+ *
+ * Logs details about a logical replication conflict to a conflict history
+ * table.
+ */
+static void
+InsertIntoLogHistoryTable(Relation rel, TransactionId local_xid,
+						  TimestampTz local_ts, ConflictType conflict_type,
+						  RepOriginId origin_id, TupleTableSlot *searchslot,
+						  TupleTableSlot *localslot,
+						  TupleTableSlot *remoteslot)
+{
+	Datum		values[MAX_CONFLICT_ATTR_NUM];
+	char		nulls[MAX_CONFLICT_ATTR_NUM];
+	Oid			argtypes[MAX_CONFLICT_ATTR_NUM];
+	int			attno;
+	char	   *origin = NULL;
+	char	   *conflict_rel;
+	char	   *remote_origin = NULL;
+	XLogRecPtr		local_lsn = 0;
+	StringInfoData 	querybuf;
+
+
+	/* If conflict history is not enabled for the subscription just return. */
+	conflict_rel = get_subscription_conflictrel(MyLogicalRepWorker->subid);
+	if (conflict_rel == NULL)
+		return;
+
+	/* Initialize values and nulls arrays */
+	memset(values, 0, sizeof(Datum) * MAX_CONFLICT_ATTR_NUM);
+	memset(nulls, ' ', sizeof(char) * MAX_CONFLICT_ATTR_NUM);
+
+	/* Populate the values and nulls arrays */
+	attno = 0;
+	argtypes[attno] = OIDOID;
+	values[attno] = ObjectIdGetDatum(RelationGetRelid(rel));
+	attno++;
+
+	argtypes[attno] = XIDOID;
+	values[attno] = TransactionIdGetDatum(local_xid);
+	attno++;
+
+	argtypes[attno] = XIDOID;
+	values[attno] = TransactionIdGetDatum(remote_xid);
+	attno++;
+
+	argtypes[attno] = LSNOID;
+	values[attno] = LSNGetDatum(local_lsn);
+	attno++;
+
+	argtypes[attno] = LSNOID;
+	values[attno] = LSNGetDatum(remote_final_lsn);
+	attno++;
+
+	argtypes[attno] = TIMESTAMPTZOID;
+	values[attno] = TimestampTzGetDatum(local_ts);
+	attno++;
+
+	argtypes[attno] = TIMESTAMPTZOID;
+	values[attno] = TimestampTzGetDatum(remote_commit_ts);
+	attno++;
+
+	argtypes[attno] = TEXTOID;
+	values[attno] =
+			CStringGetTextDatum(get_namespace_name(RelationGetNamespace(rel)));
+	attno++;
+
+	argtypes[attno] = TEXTOID;
+	values[attno] = CStringGetTextDatum(RelationGetRelationName(rel));
+	attno++;
+
+	argtypes[attno] = TEXTOID;
+	values[attno] = CStringGetTextDatum(ConflictTypeNames[conflict_type]);
+	attno++;
+
+	if (origin_id != InvalidRepOriginId)
+		replorigin_by_oid(origin_id, true, &origin);
+
+	argtypes[attno] = TEXTOID;
+	if (origin != NULL)
+		values[attno] = CStringGetTextDatum(origin);
+	else
+		nulls[attno] = 'n';
+	attno++;
+
+	if (replorigin_session_origin != InvalidRepOriginId)
+		replorigin_by_oid(replorigin_session_origin, true, &remote_origin);
+
+	argtypes[attno] = TEXTOID;
+	if (remote_origin != NULL)
+		values[attno] = CStringGetTextDatum(remote_origin);
+	else
+		nulls[attno] = 'n';
+	attno++;
+
+	argtypes[attno] = JSONOID;
+	if (searchslot != NULL)
+		values[attno] = TupleTableSlotToJsonDatum(searchslot);
+	else
+		nulls[attno] = 'n';
+	attno++;
+
+	argtypes[attno] = JSONOID;
+	if (localslot != NULL)
+		values[attno] = TupleTableSlotToJsonDatum(localslot);
+	else
+		nulls[attno] = 'n';
+	attno++;
+
+	argtypes[attno] = JSONOID;
+	if (remoteslot != NULL)
+		values[attno] = TupleTableSlotToJsonDatum(remoteslot);
+	else
+		nulls[attno] = 'n';
+
+	/* Prepare a insert query. */
+	initStringInfo(&querybuf);
+	appendStringInfo(&querybuf,
+					 "INSERT INTO %s VALUES ("
+					 "$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13,"
+					 "$14, $15)",
+					 conflict_rel);
+
+	if (SPI_connect() != SPI_OK_CONNECT)
+		elog(ERROR, "SPI_connect failed");
+
+	/*
+	 * XXX The following section uses SPI to execute the INSERT. If the\
+	 * insertion fails, we currently throw an ERROR. A future improvement might
+	 * be to log a WARNING instead, to avoid aborting the entire replication
+	 * worker on a logging failure.
+	 */
+	if (SPI_execute_with_args(querybuf.data,
+							  MAX_CONFLICT_ATTR_NUM, argtypes,
+							  values, nulls,
+							  false, 0) != SPI_OK_INSERT)
+		elog(ERROR, "SPI_execute_with_args failed: %s", querybuf.data);
+
+	if (SPI_finish() != SPI_OK_FINISH)
+		elog(ERROR, "SPI_finish failed");
+
+	pfree(querybuf.data);
+	pfree(conflict_rel);
+}
+
 /*
  * This function is used to report a conflict while applying replication
  * changes.
@@ -112,6 +290,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 
 	/* Form errdetail message by combining conflicting tuples information. */
 	foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
+	{
 		errdetail_apply_conflict(estate, relinfo, type, searchslot,
 								 conflicttuple->slot, remoteslot,
 								 conflicttuple->indexoid,
@@ -120,6 +299,15 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 								 conflicttuple->ts,
 								 &err_detail);
 
+		/* Insert conflict details to log history table. */
+		InsertIntoLogHistoryTable(relinfo->ri_RelationDesc,
+								  conflicttuple->xmin,
+								  conflicttuple->ts, type,
+								  conflicttuple->origin,
+								  searchslot, conflicttuple->slot,
+								  remoteslot);
+	}
+
 	pgstat_report_subscription_conflict(MySubscription->oid, type);
 
 	ereport(elevel,
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index ee6ac22329f..3d119a255fd 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -472,7 +472,9 @@ static bool MySubscriptionValid = false;
 static List *on_commit_wakeup_workers_subids = NIL;
 
 bool		in_remote_transaction = false;
-static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+TransactionId	remote_xid = InvalidTransactionId;
+TimestampTz	remote_commit_ts = 0;
 
 /* fields valid only when processing streamed transaction */
 static bool in_streamed_transaction = false;
@@ -1199,6 +1201,8 @@ apply_handle_begin(StringInfo s)
 	set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
 
 	remote_final_lsn = begin_data.final_lsn;
+	remote_commit_ts = begin_data.committime;
+	remote_xid = begin_data.xid;
 
 	maybe_start_skipping_changes(begin_data.final_lsn);
 
@@ -1710,6 +1714,10 @@ apply_handle_stream_start(StringInfo s)
 	/* extract XID of the top-level transaction */
 	stream_xid = logicalrep_read_stream_start(s, &first_segment);
 
+	remote_xid = stream_xid;
+	remote_final_lsn = InvalidXLogRecPtr;
+	remote_commit_ts = 0;
+
 	if (!TransactionIdIsValid(stream_xid))
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c
index fa7cd7e06a7..fcc6940bab0 100644
--- a/src/backend/utils/cache/lsyscache.c
+++ b/src/backend/utils/cache/lsyscache.c
@@ -3881,3 +3881,48 @@ get_subscription_name(Oid subid, bool missing_ok)
 
 	return subname;
 }
+
+/*
+ * get_subscription_conflictrel
+ *
+ * Returns the schema-qualified name of the conflict history table.
+ * The returned string is palloc'd and must be freed by the caller.
+ *
+ */
+char *
+get_subscription_conflictrel(Oid subid)
+{
+	HeapTuple	tup;
+	Datum		datum;
+	bool		isnull;
+	StringInfoData 	conflictrel;
+	Form_pg_subscription subform;
+
+	tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
+
+	if (!HeapTupleIsValid(tup))
+		return NULL;
+
+	subform = (Form_pg_subscription) GETSTRUCT(tup);
+
+	/* Get conflict table name */
+	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
+							tup,
+							Anum_pg_subscription_subconflicttable,
+							&isnull);
+
+	if (!isnull)
+	{
+		initStringInfo(&conflictrel);
+		appendStringInfo(&conflictrel, "%s.%s",
+						 get_namespace_name(subform->subconflictnspid),
+						 TextDatumGetCString(datum));
+		ReleaseSysCache(tup);
+		return conflictrel.data;
+	}
+	else
+	{
+		ReleaseSysCache(tup);
+		return NULL;
+	}
+}
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 55cb9b1eefa..ec31e2b1d56 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -80,6 +80,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	bool		subretaindeadtuples;	/* True if dead tuples useful for
 										 * conflict detection are retained */
+	Oid			subconflictnspid;	/* Namespace Oid in which the conflict history
+									 * table is created. */
 
 	int32		submaxretention;	/* The maximum duration (in milliseconds)
 									 * for which information useful for
@@ -105,6 +107,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	/* Only publish data originating from the specified origin */
 	text		suborigin BKI_DEFAULT(LOGICALREP_ORIGIN_ANY);
+
+	/* conflict log history table name if valid */
+	text		subconflicttable;
 #endif
 } FormData_pg_subscription;
 
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index e516caa5c73..4618e3c102c 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -55,6 +55,7 @@ typedef enum
 } ConflictType;
 
 #define CONFLICT_NUM_TYPES (CT_MULTIPLE_UNIQUE_CONFLICTS + 1)
+#define	MAX_CONFLICT_ATTR_NUM	15
 
 /*
  * Information for the existing local row that caused the conflict.
@@ -82,4 +83,5 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
 								TupleTableSlot *remoteslot,
 								List *conflicttuples);
 extern void InitConflictIndexes(ResultRelInfo *relInfo);
+
 #endif
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index de003802612..84bd6383615 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -251,6 +251,10 @@ extern PGDLLIMPORT bool in_remote_transaction;
 
 extern PGDLLIMPORT bool InitializingApplyWorker;
 
+extern XLogRecPtr remote_final_lsn;
+extern TimestampTz remote_commit_ts;
+extern TransactionId	remote_xid;
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 												bool only_running);
diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h
index c65cee4f24c..c4dc422ce2a 100644
--- a/src/include/utils/lsyscache.h
+++ b/src/include/utils/lsyscache.h
@@ -210,6 +210,7 @@ extern Oid	get_publication_oid(const char *pubname, bool missing_ok);
 extern char *get_publication_name(Oid pubid, bool missing_ok);
 extern Oid	get_subscription_oid(const char *subname, bool missing_ok);
 extern char *get_subscription_name(Oid subid, bool missing_ok);
+extern char *get_subscription_conflictrel(Oid subid);
 
 #define type_is_array(typid)  (get_element_type(typid) != InvalidOid)
 /* type_is_array_domain accepts both plain arrays and domains over arrays */
-- 
2.49.0

