From 5305d9295ffc404fa593bb318dea66a0f8a2f607 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumarb@google.com>
Date: Wed, 12 Nov 2025 10:43:19 +0530
Subject: [PATCH v3 1/2] Add configurable conflict log history table for 
 Logical Replication

This patch adds a feature to provide a structured, queryable record of all
logical replication conflicts. The current approach of logging conflicts as
plain text in the server logs makes it difficult to query, analyze, and
use for external monitoring and automation.

This patch addresses these limitations by introducing a configurable
conflict_log_table option in the CREATE SUBSCRIPTION command. Key design
decisions include:

User-Defined Table: The conflict log is stored in a user-managed table
rather than a system catalog.

Structured Data: Conflict details, including the original and remote tuples,
are stored in JSON columns, providing a flexible format to accommodate different
table schemas.

Comprehensive Information: The log table captures essential attributes such as
local and remote transaction IDs, LSNs, commit timestamps, and conflict type,
providing a complete record for post-mortem analysis.

This feature will make logical replication conflicts easier to monitor and manage,
significantly improving the overall resilience and operability of replication setups.
---
 src/backend/commands/subscriptioncmds.c    | 177 ++++++++++++++++++++-
 src/backend/replication/logical/conflict.c | 166 +++++++++++++++++++
 src/backend/replication/logical/worker.c   |  10 +-
 src/backend/utils/cache/lsyscache.c        |  40 +++++
 src/include/catalog/pg_subscription.h      |   5 +
 src/include/replication/conflict.h         |   2 +
 src/include/replication/worker_internal.h  |   4 +
 src/include/utils/lsyscache.h              |   1 +
 src/test/regress/expected/subscription.out |  79 +++++++++
 src/test/regress/sql/subscription.sql      |  55 +++++++
 10 files changed, 535 insertions(+), 4 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 5930e8c5816..a5dc9a11c60 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
 #include "commands/event_trigger.h"
 #include "commands/subscriptioncmds.h"
 #include "executor/executor.h"
+#include "executor/spi.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
@@ -47,10 +48,12 @@
 #include "storage/lmgr.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
+#include "utils/fmgroids.h"
 #include "utils/guc.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/pg_lsn.h"
+#include "utils/regproc.h"
 #include "utils/syscache.h"
 
 /*
@@ -75,6 +78,7 @@
 #define SUBOPT_MAX_RETENTION_DURATION	0x00008000
 #define SUBOPT_LSN					0x00010000
 #define SUBOPT_ORIGIN				0x00020000
+#define SUBOPT_CONFLICT_TABLE		0x00030000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -103,6 +107,7 @@ typedef struct SubOpts
 	bool		retaindeadtuples;
 	int32		maxretention;
 	char	   *origin;
+	char	   *conflicttable;
 	XLogRecPtr	lsn;
 } SubOpts;
 
@@ -135,7 +140,8 @@ static List *merge_publications(List *oldpublist, List *newpublist, bool addpub,
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
 static void CheckAlterSubOption(Subscription *sub, const char *option,
 								bool slot_needs_update, bool isTopLevel);
-
+static void CreateConflictHistoryTable(Oid namespaceId, char *conflictrel);
+static void DropConflictHistoryTable(Oid namespaceId, char *conflictrel);
 
 /*
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -191,6 +197,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->maxretention = 0;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
 		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
+	if (IsSet(supported_opts, SUBOPT_CONFLICT_TABLE))
+		opts->conflicttable = NULL;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -402,6 +410,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_LSN;
 			opts->lsn = lsn;
 		}
+		else if (IsSet(supported_opts, SUBOPT_CONFLICT_TABLE) &&
+				 strcmp(defel->defname, "conflict_log_table") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_CONFLICT_TABLE))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_CONFLICT_TABLE;
+			opts->conflicttable = defGetString(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -599,6 +616,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	bits32		supported_opts;
 	SubOpts		opts = {0};
 	AclResult	aclresult;
+	Oid			conflict_table_nspid;
+	char	   *conflict_table;
 
 	/*
 	 * Parse and check options.
@@ -612,7 +631,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
 					  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
 					  SUBOPT_RETAIN_DEAD_TUPLES |
-					  SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN);
+					  SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN |
+					  SUBOPT_CONFLICT_TABLE);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -747,6 +767,25 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_suborigin - 1] =
 		CStringGetTextDatum(opts.origin);
 
+	/*
+	 * If a conflict log history table name is specified, parse the schema and
+	 * table name from the string. Store the namespace OID and the table name in
+	 * the pg_subscription catalog tuple.
+	 */
+	if (opts.conflicttable)
+	{
+		List   *names = stringToQualifiedNameList(opts.conflicttable, NULL);
+
+		conflict_table_nspid =
+				QualifiedNameGetCreationNamespace(names, &conflict_table);
+		values[Anum_pg_subscription_subconflictnspid - 1] =
+					ObjectIdGetDatum(conflict_table_nspid);
+		values[Anum_pg_subscription_subconflicttable - 1] =
+					CStringGetTextDatum(conflict_table);
+	}
+	else
+		nulls[Anum_pg_subscription_subconflicttable - 1] = true;
+
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
 	/* Insert tuple into catalog. */
@@ -768,6 +807,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
 	replorigin_create(originname);
 
+	/* If conflict log history table name is given than create the table. */
+	if (opts.conflicttable)
+		CreateConflictHistoryTable(conflict_table_nspid, conflict_table);
+
 	/*
 	 * Connect to remote side to execute requested commands and fetch table
 	 * and sequence info.
@@ -1410,7 +1453,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 								  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
 								  SUBOPT_RETAIN_DEAD_TUPLES |
 								  SUBOPT_MAX_RETENTION_DURATION |
-								  SUBOPT_ORIGIN);
+								  SUBOPT_ORIGIN |
+								  SUBOPT_CONFLICT_TABLE);
 
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -1665,6 +1709,25 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					origin = opts.origin;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_CONFLICT_TABLE))
+				{
+					Oid		nspid;
+					char   *relname = NULL;
+					List   *names =
+						stringToQualifiedNameList(opts.conflicttable, NULL);
+
+					nspid = QualifiedNameGetCreationNamespace(names, &relname);
+					values[Anum_pg_subscription_subconflictnspid - 1] =
+								ObjectIdGetDatum(nspid);
+					values[Anum_pg_subscription_subconflicttable - 1] =
+						CStringGetTextDatum(relname);
+
+					replaces[Anum_pg_subscription_subconflictnspid - 1] = true;
+					replaces[Anum_pg_subscription_subconflicttable - 1] = true;
+
+					CreateConflictHistoryTable(nspid, relname);
+				}
+
 				update_tuple = true;
 				break;
 			}
@@ -2027,6 +2090,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	Form_pg_subscription form;
 	List	   *rstates;
 	bool		must_use_password;
+	Oid			conflict_table_nsp = InvalidOid;
+	char	   *conflict_table = NULL;
 
 	/*
 	 * The launcher may concurrently start a new worker for this subscription.
@@ -2110,6 +2175,19 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	ObjectAddressSet(myself, SubscriptionRelationId, subid);
 	EventTriggerSQLDropAddObject(&myself, true, true);
 
+	/* Fetch the conflict log table information. */
+	conflict_table = get_subscription_conflictrel(subid, &conflict_table_nsp);
+
+	/*
+	 * If the subscription had a conflict log history table, drop it now.
+	 * This happens before deleting the subscription tuple.
+	 */
+	if (conflict_table)
+	{
+		DropConflictHistoryTable(conflict_table_nsp, conflict_table);
+		pfree(conflict_table);
+	}
+
 	/* Remove the tuple from catalog. */
 	CatalogTupleDelete(rel, &tup->t_self);
 
@@ -3188,3 +3266,96 @@ defGetStreamingMode(DefElem *def)
 					def->defname)));
 	return LOGICALREP_STREAM_OFF;	/* keep compiler quiet */
 }
+
+/*
+ * Create conflict log history table.
+ *
+ * The subscription owner becomes the owner of this table and has all
+ * privileges on it.
+ */
+static void
+CreateConflictHistoryTable(Oid namespaceId, char *conflictrel)
+{
+	StringInfoData 	querybuf;
+
+	/*
+	 * Check if table with same name already present, if so report an error
+	 * as currently we do not support user created table as conflict history
+	 * table.
+	 */
+	if (OidIsValid(get_relname_relid(conflictrel, namespaceId)))
+		ereport(ERROR,
+				(errcode(ERRCODE_DUPLICATE_TABLE),
+				 errmsg("cannot create conflict history table \"%s.%s\" because a table with that name already exists",
+						get_namespace_name(namespaceId), conflictrel),
+				 errhint("Use a different name for the conflict history table or drop the existing table.")));
+
+	initStringInfo(&querybuf);
+
+	/* build and execute the CREATE TABLE query. */
+	appendStringInfo(&querybuf,
+					 "CREATE TABLE %s.%s ("
+					 "relid	Oid,"
+					 "local_xid xid,"
+					 "remote_xid xid,"
+					 "remote_commit_lsn pg_lsn,"
+					 "local_commit_ts TIMESTAMPTZ,"
+					 "remote_commit_ts TIMESTAMPTZ,"
+					 "table_schema	TEXT,"
+					 "table_name	TEXT,"
+					 "conflict_type TEXT,"
+					 "local_origin	TEXT,"
+					 "remote_origin	TEXT,"
+					 "key_tuple		JSON,"
+					 "local_tuple	JSON,"
+					 "remote_tuple	JSON)",
+					 quote_identifier(get_namespace_name(namespaceId)),
+					 quote_identifier(conflictrel));
+
+	if (SPI_connect() != SPI_OK_CONNECT)
+		elog(ERROR, "SPI_connect failed");
+
+	if (SPI_execute(querybuf.data, false, 0) != SPI_OK_UTILITY)
+		elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+
+	if (SPI_finish() != SPI_OK_FINISH)
+		elog(ERROR, "SPI_finish failed");
+
+	pfree(querybuf.data);
+}
+
+/*
+ * Drop the conflict log history table.
+ *
+ * This function uses SPI to execute DROP TABLE IF EXISTS.
+ * We use IF EXISTS to avoid errors if the user manually dropped it first.
+ */
+static void
+DropConflictHistoryTable(Oid namespaceId, char *conflictrel)
+{
+	StringInfoData 	querybuf;
+
+	initStringInfo(&querybuf);
+
+	/*
+	 * Use DROP TABLE IF EXISTS and quote the identifiers to handle case-sensitive
+	 * or non-simple names. We use RESTRICT (default) since the table should
+	 * not have external dependencies preventing its removal, but IF EXISTS
+	 * ensures the command won't error if the table is already gone.
+	 */
+	appendStringInfo(&querybuf,
+					 "DROP TABLE IF EXISTS %s.%s",
+					 quote_identifier(get_namespace_name(namespaceId)),
+					 quote_identifier(conflictrel));
+
+	if (SPI_connect() != SPI_OK_CONNECT)
+		elog(ERROR, "SPI_connect failed");
+
+	if (SPI_execute(querybuf.data, false, 0) != SPI_OK_UTILITY)
+		elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+
+	if (SPI_finish() != SPI_OK_FINISH)
+		elog(ERROR, "SPI_finish failed");
+
+	pfree(querybuf.data);
+}
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 16695592265..1d9f8fabe6f 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -15,13 +15,23 @@
 #include "postgres.h"
 
 #include "access/commit_ts.h"
+#include "access/heapam.h"
 #include "access/tableam.h"
+#include "access/table.h"
+#include "catalog/indexing.h"
+#include "catalog/namespace.h"
+#include "catalog/pg_namespace_d.h"
+#include "catalog/pg_type.h"
 #include "executor/executor.h"
+#include "executor/spi.h"
 #include "pgstat.h"
 #include "replication/conflict.h"
 #include "replication/worker_internal.h"
 #include "storage/lmgr.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
+#include "utils/pg_lsn.h"
 
 static const char *const ConflictTypeNames[] = {
 	[CT_INSERT_EXISTS] = "insert_exists",
@@ -52,6 +62,16 @@ static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
 									   Oid indexoid);
 static char *build_index_value_desc(EState *estate, Relation localrel,
 									TupleTableSlot *slot, Oid indexoid);
+static Datum TupleTableSlotToJsonDatum(TupleTableSlot *slot);
+
+static void InsertConflictLog(Relation rel,
+							  TransactionId local_xid,
+							  TimestampTz local_ts,
+							  ConflictType conflict_type,
+							  RepOriginId origin_id,
+							  TupleTableSlot *searchslot,
+							  TupleTableSlot *localslot,
+							  TupleTableSlot *remoteslot);
 
 /*
  * Get the xmin and commit timestamp data (origin and timestamp) associated
@@ -112,6 +132,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 
 	/* Form errdetail message by combining conflicting tuples information. */
 	foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
+	{
 		errdetail_apply_conflict(estate, relinfo, type, searchslot,
 								 conflicttuple->slot, remoteslot,
 								 conflicttuple->indexoid,
@@ -120,6 +141,15 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 								 conflicttuple->ts,
 								 &err_detail);
 
+		/* Insert conflict details to log history table. */
+		InsertConflictLog(relinfo->ri_RelationDesc,
+						  conflicttuple->xmin,
+						  conflicttuple->ts, type,
+						  conflicttuple->origin,
+						  searchslot, conflicttuple->slot,
+						  remoteslot);
+	}
+
 	pgstat_report_subscription_conflict(MySubscription->oid, type);
 
 	ereport(elevel,
@@ -525,3 +555,139 @@ build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot,
 
 	return index_value;
 }
+
+/*
+ * Helper function to convert a TupleTableSlot to Jsonb
+ *
+ * This would be a new internal helper function for logical replication
+ * Needs to handle various data types and potentially TOASTed data
+ */
+static Datum
+TupleTableSlotToJsonDatum(TupleTableSlot *slot)
+{
+	HeapTuple	tuple = ExecCopySlotHeapTuple(slot);
+	Datum		datum = heap_copy_tuple_as_datum(tuple, slot->tts_tupleDescriptor);
+	Datum		json;
+
+	if (TupIsNull(slot))
+		return 0;
+
+	json = DirectFunctionCall1(row_to_json, datum);
+	heap_freetuple(tuple);
+
+	return json;
+}
+
+/*
+ * InsertConflictLog
+ *
+ * Insert details about a logical replication conflict to a conflict history
+ * table.
+ */
+static void
+InsertConflictLog(Relation rel, TransactionId local_xid, TimestampTz local_ts,
+				  ConflictType conflict_type, RepOriginId origin_id,
+				  TupleTableSlot *searchslot, TupleTableSlot *localslot,
+				  TupleTableSlot *remoteslot)
+{
+	Datum		values[MAX_CONFLICT_ATTR_NUM];
+	bool		nulls[MAX_CONFLICT_ATTR_NUM];
+	Oid			nspid;
+	Oid			relid;
+	Relation	conflictrel;
+	int			attno;
+	int			options = HEAP_INSERT_NO_LOGICAL;
+	char	   *relname;
+	char	   *origin = NULL;
+	char	   *remote_origin = NULL;
+	HeapTuple	tup;
+
+	/* If conflict history is not enabled for the subscription just return. */
+	relname = get_subscription_conflictrel(MyLogicalRepWorker->subid, &nspid);
+	if (relname == NULL)
+		return;
+
+	/* TODO: proper error code */
+	relid = get_relname_relid(relname, nspid);
+	if (!OidIsValid(relid))
+		elog(ERROR, "conflict log history table does not exists");
+	conflictrel = table_open(relid, RowExclusiveLock);
+	if (conflictrel == NULL)
+		elog(ERROR, "could not open conflict log history table");
+
+
+	/* Initialize values and nulls arrays */
+	memset(values, 0, sizeof(Datum) * MAX_CONFLICT_ATTR_NUM);
+	memset(nulls, 0, sizeof(bool) * MAX_CONFLICT_ATTR_NUM);
+
+	/* Populate the values and nulls arrays */
+	attno = 0;
+	values[attno] = ObjectIdGetDatum(RelationGetRelid(rel));
+	attno++;
+
+	values[attno] = TransactionIdGetDatum(local_xid);
+	attno++;
+
+	values[attno] = TransactionIdGetDatum(remote_xid);
+	attno++;
+
+	values[attno] = LSNGetDatum(remote_final_lsn);
+	attno++;
+
+	values[attno] = TimestampTzGetDatum(local_ts);
+	attno++;
+
+	values[attno] = TimestampTzGetDatum(remote_commit_ts);
+	attno++;
+
+	values[attno] =
+			CStringGetTextDatum(get_namespace_name(RelationGetNamespace(rel)));
+	attno++;
+
+	values[attno] = CStringGetTextDatum(RelationGetRelationName(rel));
+	attno++;
+
+	values[attno] = CStringGetTextDatum(ConflictTypeNames[conflict_type]);
+	attno++;
+
+	if (origin_id != InvalidRepOriginId)
+		replorigin_by_oid(origin_id, true, &origin);
+
+	if (origin != NULL)
+		values[attno] = CStringGetTextDatum(origin);
+	else
+		nulls[attno] = true;
+	attno++;
+
+	if (replorigin_session_origin != InvalidRepOriginId)
+		replorigin_by_oid(replorigin_session_origin, true, &remote_origin);
+
+	if (remote_origin != NULL)
+		values[attno] = CStringGetTextDatum(remote_origin);
+	else
+		nulls[attno] = true;
+	attno++;
+
+	if (searchslot != NULL)
+		values[attno] = TupleTableSlotToJsonDatum(searchslot);
+	else
+		nulls[attno] = true;
+	attno++;
+
+	if (localslot != NULL)
+		values[attno] = TupleTableSlotToJsonDatum(localslot);
+	else
+		nulls[attno] = true;
+	attno++;
+
+	if (remoteslot != NULL)
+		values[attno] = TupleTableSlotToJsonDatum(remoteslot);
+	else
+		nulls[attno] = true;
+
+	tup = heap_form_tuple(RelationGetDescr(conflictrel), values, nulls);
+	heap_insert(conflictrel, tup, GetCurrentCommandId(true), options, NULL);
+	table_close(conflictrel, RowExclusiveLock);
+
+	pfree(relname);
+}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 93970c6af29..e6c02685ec9 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -482,7 +482,9 @@ static bool MySubscriptionValid = false;
 static List *on_commit_wakeup_workers_subids = NIL;
 
 bool		in_remote_transaction = false;
-static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+TransactionId	remote_xid = InvalidTransactionId;
+TimestampTz	remote_commit_ts = 0;
 
 /* fields valid only when processing streamed transaction */
 static bool in_streamed_transaction = false;
@@ -1219,6 +1221,8 @@ apply_handle_begin(StringInfo s)
 	set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
 
 	remote_final_lsn = begin_data.final_lsn;
+	remote_commit_ts = begin_data.committime;
+	remote_xid = begin_data.xid;
 
 	maybe_start_skipping_changes(begin_data.final_lsn);
 
@@ -1745,6 +1749,10 @@ apply_handle_stream_start(StringInfo s)
 	/* extract XID of the top-level transaction */
 	stream_xid = logicalrep_read_stream_start(s, &first_segment);
 
+	remote_xid = stream_xid;
+	remote_final_lsn = InvalidXLogRecPtr;
+	remote_commit_ts = 0;
+
 	if (!TransactionIdIsValid(stream_xid))
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c
index fa7cd7e06a7..34fe347ebb7 100644
--- a/src/backend/utils/cache/lsyscache.c
+++ b/src/backend/utils/cache/lsyscache.c
@@ -3881,3 +3881,43 @@ get_subscription_name(Oid subid, bool missing_ok)
 
 	return subname;
 }
+
+/*
+ * get_subscription_conflictrel
+ *
+ * Get conflict relation name and namespace id from subscription.
+ */
+char *
+get_subscription_conflictrel(Oid subid, Oid *nspid)
+{
+	HeapTuple	tup;
+	Datum		datum;
+	bool		isnull;
+	char	   *relname;
+	Form_pg_subscription subform;
+
+	tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
+
+	if (!HeapTupleIsValid(tup))
+		return NULL;
+
+	subform = (Form_pg_subscription) GETSTRUCT(tup);
+
+	/* Get conflict table name */
+	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
+							tup,
+							Anum_pg_subscription_subconflicttable,
+							&isnull);
+	if (isnull)
+	{
+		ReleaseSysCache(tup);
+		return NULL;
+	}
+
+	*nspid = subform->subconflictnspid;
+	relname = pstrdup(TextDatumGetCString(datum));
+
+	ReleaseSysCache(tup);
+
+	return relname;
+}
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 55cb9b1eefa..ec31e2b1d56 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -80,6 +80,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	bool		subretaindeadtuples;	/* True if dead tuples useful for
 										 * conflict detection are retained */
+	Oid			subconflictnspid;	/* Namespace Oid in which the conflict history
+									 * table is created. */
 
 	int32		submaxretention;	/* The maximum duration (in milliseconds)
 									 * for which information useful for
@@ -105,6 +107,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	/* Only publish data originating from the specified origin */
 	text		suborigin BKI_DEFAULT(LOGICALREP_ORIGIN_ANY);
+
+	/* conflict log history table name if valid */
+	text		subconflicttable;
 #endif
 } FormData_pg_subscription;
 
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index c8fbf9e51b8..adc46e79286 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -62,6 +62,7 @@ typedef enum
 } ConflictType;
 
 #define CONFLICT_NUM_TYPES (CT_MULTIPLE_UNIQUE_CONFLICTS + 1)
+#define	MAX_CONFLICT_ATTR_NUM	15
 
 /*
  * Information for the existing local row that caused the conflict.
@@ -89,4 +90,5 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
 								TupleTableSlot *remoteslot,
 								List *conflicttuples);
 extern void InitConflictIndexes(ResultRelInfo *relInfo);
+
 #endif
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index f081619f151..314ac5dc746 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -256,6 +256,10 @@ extern PGDLLIMPORT bool InitializingApplyWorker;
 
 extern PGDLLIMPORT List *table_states_not_ready;
 
+extern XLogRecPtr remote_final_lsn;
+extern TimestampTz remote_commit_ts;
+extern TransactionId	remote_xid;
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype,
 												Oid subid, Oid relid,
diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h
index 50fb149e9ac..dc6df5843a4 100644
--- a/src/include/utils/lsyscache.h
+++ b/src/include/utils/lsyscache.h
@@ -210,6 +210,7 @@ extern Oid	get_publication_oid(const char *pubname, bool missing_ok);
 extern char *get_publication_name(Oid pubid, bool missing_ok);
 extern Oid	get_subscription_oid(const char *subname, bool missing_ok);
 extern char *get_subscription_name(Oid subid, bool missing_ok);
+extern char *get_subscription_conflictrel(Oid subid, Oid *nspid);
 
 #define type_is_array(typid)  (get_element_type(typid) != InvalidOid)
 /* type_is_array_domain accepts both plain arrays and domains over arrays */
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 327d1e7731f..72eea5e1601 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -517,6 +517,85 @@ COMMIT;
 -- ok, owning it is enough for this stuff
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
+--
+-- CONFLICT LOG HISTORY TABLE TESTS
+--
+SET SESSION AUTHORIZATION 'regress_subscription_user';
+-- fail - conflict_log_table specified when table already exists
+CREATE TABLE public.regress_conflict_log_temp (id int);
+CREATE SUBSCRIPTION regress_conflict_fail CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log_temp');
+ERROR:  cannot create conflict history table "public.regress_conflict_log_temp" because a table with that name already exists
+HINT:  Use a different name for the conflict history table or drop the existing table.
+DROP TABLE public.regress_conflict_log_temp;
+-- ok - conflict_log_table creation with CREATE SUBSCRIPTION
+CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log1');
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
+-- check metadata in pg_subscription
+SELECT subname, subconflicttable, subconflictnspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema
+FROM pg_subscription WHERE subname = 'regress_conflict_test1';
+        subname         |   subconflicttable    | is_public_schema 
+------------------------+-----------------------+------------------
+ regress_conflict_test1 | regress_conflict_log1 | t
+(1 row)
+
+-- check if the table exists and has the correct schema (15 columns)
+SELECT count(*) FROM pg_attribute WHERE attrelid = 'public.regress_conflict_log1'::regclass AND attnum > 0;
+ count 
+-------
+    14
+(1 row)
+
+-- check a specific column type (e.g., key_tuple should be JSON)
+SELECT format_type(atttypid, atttypmod) FROM pg_attribute WHERE attrelid = 'public.regress_conflict_log1'::regclass AND attname = 'key_tuple';
+ format_type 
+-------------
+ json
+(1 row)
+
+-- ok - adding conflict_log_table with ALTER SUBSCRIPTION
+CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log2');
+-- check metadata after ALTER
+SELECT subname, subconflicttable, subconflictnspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema
+FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+        subname         |   subconflicttable    | is_public_schema 
+------------------------+-----------------------+------------------
+ regress_conflict_test2 | regress_conflict_log2 | t
+(1 row)
+
+-- ok - dropping subscription also drops the log table
+ALTER SUBSCRIPTION regress_conflict_test1 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_test1;
+-- should return NULL, meaning the table was dropped
+SELECT to_regclass('public.regress_conflict_log1');
+ to_regclass 
+-------------
+ 
+(1 row)
+
+-- ok - dropping subscription when the log table was manually dropped first
+CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log1');
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
+DROP TABLE public.regress_conflict_log1;
+ALTER SUBSCRIPTION regress_conflict_test1 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_test1;
+NOTICE:  table "regress_conflict_log1" does not exist, skipping
+-- should return NULL, meaning the subscription was dropped successfully
+SELECT subname FROM pg_subscription WHERE subname = 'regress_conflict_test1';
+ subname 
+---------
+(0 rows)
+
+-- Clean up remaining test subscription
+ALTER SUBSCRIPTION regress_conflict_test2 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test2 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_test2;
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index ef0c298d2df..02afbf5c213 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -365,6 +365,61 @@ COMMIT;
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 
+--
+-- CONFLICT LOG HISTORY TABLE TESTS
+--
+
+SET SESSION AUTHORIZATION 'regress_subscription_user';
+
+-- fail - conflict_log_table specified when table already exists
+CREATE TABLE public.regress_conflict_log_temp (id int);
+CREATE SUBSCRIPTION regress_conflict_fail CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log_temp');
+DROP TABLE public.regress_conflict_log_temp;
+
+-- ok - conflict_log_table creation with CREATE SUBSCRIPTION
+CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log1');
+
+-- check metadata in pg_subscription
+SELECT subname, subconflicttable, subconflictnspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema
+FROM pg_subscription WHERE subname = 'regress_conflict_test1';
+
+-- check if the table exists and has the correct schema (15 columns)
+SELECT count(*) FROM pg_attribute WHERE attrelid = 'public.regress_conflict_log1'::regclass AND attnum > 0;
+
+-- check a specific column type (e.g., key_tuple should be JSON)
+SELECT format_type(atttypid, atttypmod) FROM pg_attribute WHERE attrelid = 'public.regress_conflict_log1'::regclass AND attname = 'key_tuple';
+
+-- ok - adding conflict_log_table with ALTER SUBSCRIPTION
+CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false);
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log2');
+
+-- check metadata after ALTER
+SELECT subname, subconflicttable, subconflictnspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema
+FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+
+-- ok - dropping subscription also drops the log table
+ALTER SUBSCRIPTION regress_conflict_test1 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_test1;
+
+-- should return NULL, meaning the table was dropped
+SELECT to_regclass('public.regress_conflict_log1');
+
+-- ok - dropping subscription when the log table was manually dropped first
+CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log1');
+DROP TABLE public.regress_conflict_log1;
+ALTER SUBSCRIPTION regress_conflict_test1 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_test1;
+
+-- should return NULL, meaning the subscription was dropped successfully
+SELECT subname FROM pg_subscription WHERE subname = 'regress_conflict_test1';
+
+-- Clean up remaining test subscription
+ALTER SUBSCRIPTION regress_conflict_test2 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test2 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_test2;
+
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
-- 
2.49.0

