From 8d8c9208193cb93170b5b0423d52c5459b45890c Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Tue, 18 Mar 2025 12:27:48 +0530
Subject: [PATCH v5] Implement the conflict detection for
 multiple_unique_conflicts in logical replication

Introduce a new conflict type, multiple_unique_conflicts, to handle cases
where an incoming row during logical replication violates multiple UNIQUE
constraints.

Previously, the apply worker detected and reported only the first
encountered key conflict (insert_exists/update_exists), causing repeated
failures as each constraint violation needed to be handled one by one,
making the process slow and error-prone.

Now, the apply worker checks all unique constraints upfront and reports
multiple_unique_conflicts if multiple violations exist. This allows users
to resolve all conflicts at once by deleting all conflicting tuples rather
than dealing with them individually or skipping the transaction.
---
 doc/src/sgml/logical-replication.sgml         |  13 ++
 src/backend/executor/execReplication.c        |  29 ++--
 src/backend/replication/logical/conflict.c    |  76 +++++++---
 src/backend/replication/logical/worker.c      |  18 +--
 src/include/replication/conflict.h            |  13 +-
 src/test/subscription/meson.build             |   1 +
 .../t/035_multiple_unique_conflicts.pl        | 133 ++++++++++++++++++
 7 files changed, 239 insertions(+), 44 deletions(-)
 create mode 100644 src/test/subscription/t/035_multiple_unique_conflicts.pl

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 3d18e507bbc..4817206af7d 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1877,6 +1877,19 @@ test_sub=# SELECT * from tab_gen_to_gen;
       </para>
      </listitem>
     </varlistentry>
+    <varlistentry id="conflict-multiple-unique-conflicts" xreflabel="multiple_unique_conflicts">
+     <term><literal>multiple_unique_conflicts</literal></term>
+     <listitem>
+      <para>
+       Inserting a row or updating values of a row violates more than one
+       <literal>NOT DEFERRABLE</literal> unique constraint. Note that to log
+       the origin and commit timestamp details of the conflicting key,
+       <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+       should be enabled on the subscriber. In this case, an error will be
+       raised until the conflict is resolved manually.
+      </para>
+     </listitem>
+    </varlistentry>
    </variablelist>
     Note that there are other conflict scenarios, such as exclusion constraint
     violations. Currently, we do not provide additional details for them in the
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 0a9b880d250..c5b31cd728b 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -493,25 +493,32 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
 					   ConflictType type, List *recheckIndexes,
 					   TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
 {
-	/* Check all the unique indexes for a conflict */
+	int			conflicts = 0;
+	List	   *conflictSlots = NIL;
+	List	   *conflictIndexes = NIL;
+	TupleTableSlot *conflictslot;
+
+	/* Check all the unique indexes for conflicts */
 	foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
 	{
-		TupleTableSlot *conflictslot;
-
 		if (list_member_oid(recheckIndexes, uniqueidx) &&
 			FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
 							  &conflictslot))
 		{
-			RepOriginId origin;
-			TimestampTz committs;
-			TransactionId xmin;
-
-			GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
-			ReportApplyConflict(estate, resultRelInfo, ERROR, type,
-								searchslot, conflictslot, remoteslot,
-								uniqueidx, xmin, origin, committs);
+			conflicts++;
+
+			/* Add the conflict slot and index to their respective lists */
+			conflictSlots = lappend(conflictSlots, conflictslot);
+			conflictIndexes = lappend_oid(conflictIndexes, uniqueidx);
 		}
 	}
+
+	/* Report the conflict if found */
+	if (conflicts)
+		ReportApplyConflict(estate, resultRelInfo, ERROR,
+							conflicts > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type,
+							searchslot, conflictSlots, remoteslot,
+							conflictIndexes, InvalidTransactionId, 0, 0);
 }
 
 /*
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 772fc83e88b..cba85c16888 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -29,11 +29,12 @@ static const char *const ConflictTypeNames[] = {
 	[CT_UPDATE_EXISTS] = "update_exists",
 	[CT_UPDATE_MISSING] = "update_missing",
 	[CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
-	[CT_DELETE_MISSING] = "delete_missing"
+	[CT_DELETE_MISSING] = "delete_missing",
+	[CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
 };
 
 static int	errcode_apply_conflict(ConflictType type);
-static int	errdetail_apply_conflict(EState *estate,
+static void errdetail_apply_conflict(EState *estate,
 									 ResultRelInfo *relinfo,
 									 ConflictType type,
 									 TupleTableSlot *searchslot,
@@ -41,7 +42,7 @@ static int	errdetail_apply_conflict(EState *estate,
 									 TupleTableSlot *remoteslot,
 									 Oid indexoid, TransactionId localxmin,
 									 RepOriginId localorigin,
-									 TimestampTz localts);
+									 TimestampTz localts, StringInfo err_msg);
 static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
 									   ConflictType type,
 									   TupleTableSlot *searchslot,
@@ -90,15 +91,15 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
  * 'searchslot' should contain the tuple used to search the local tuple to be
  * updated or deleted.
  *
- * 'localslot' should contain the existing local tuple, if any, that conflicts
- * with the remote tuple. 'localxmin', 'localorigin', and 'localts' provide the
- * transaction information related to this existing local tuple.
+ * The 'conflictSlots' list contains the existing local tuples, if any, that
+ * conflict with the remote tuple. 'localxmin', 'localorigin', and 'localts'
+ * provide the transaction information related to these local tuples.
  *
  * 'remoteslot' should contain the remote new tuple, if any.
  *
- * The 'indexoid' represents the OID of the unique index that triggered the
- * constraint violation error. We use this to report the key values for
- * conflicting tuple.
+ * The 'conflictIndexes' list represents the OIDs of the unique indexes that
+ * triggered the constraint violation errors. We use these to report the key
+ * values for the conflicting tuples.
  *
  * The caller must ensure that the index with the OID 'indexoid' is locked so
  * that we can fetch and display the conflicting key value.
@@ -106,16 +107,47 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
 void
 ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 					ConflictType type, TupleTableSlot *searchslot,
-					TupleTableSlot *localslot, TupleTableSlot *remoteslot,
-					Oid indexoid, TransactionId localxmin,
+					List *conflictSlots, TupleTableSlot *remoteslot,
+					List *conflictIndexes, TransactionId localxmin,
 					RepOriginId localorigin, TimestampTz localts)
 {
+	int			conflictNum = 0;
+	Oid			indexoid;
 	Relation	localrel = relinfo->ri_RelationDesc;
+	StringInfoData err_detail;
+
+	initStringInfo(&err_detail);
 
-	Assert(!OidIsValid(indexoid) ||
-		   CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
+	if (!conflictSlots)
+		errdetail_apply_conflict(estate, relinfo, type, searchslot,
+								 NULL, remoteslot, InvalidOid,
+								 localxmin, localorigin, localts, &err_detail);
 
-	pgstat_report_subscription_conflict(MySubscription->oid, type);
+	foreach_ptr(TupleTableSlot, slot, conflictSlots)
+	{
+		indexoid = lfirst_oid(list_nth_cell(conflictIndexes, conflictNum));
+
+		Assert(!OidIsValid(indexoid) ||
+			   CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
+
+		if (!localorigin)
+			GetTupleTransactionInfo(slot, &localxmin, &localorigin, &localts);
+
+		/*
+		 * Build the error detail message containing the conflicting key and
+		 * tuple information. The details for each conflict will be appended
+		 * to err_detail.
+		 */
+		errdetail_apply_conflict(estate, relinfo, type, searchslot,
+								 slot, remoteslot, indexoid,
+								 localxmin, localorigin, localts, &err_detail);
+
+		conflictNum++;
+	}
+
+	/* XXX: stats not supported for multiple_unique_conflicts in this patch */
+	if (type != CT_MULTIPLE_UNIQUE_CONFLICTS)
+		pgstat_report_subscription_conflict(MySubscription->oid, type);
 
 	ereport(elevel,
 			errcode_apply_conflict(type),
@@ -123,9 +155,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 				   get_namespace_name(RelationGetNamespace(localrel)),
 				   RelationGetRelationName(localrel),
 				   ConflictTypeNames[type]),
-			errdetail_apply_conflict(estate, relinfo, type, searchslot,
-									 localslot, remoteslot, indexoid,
-									 localxmin, localorigin, localts));
+			errdetail_internal("%s", err_detail.data));
 }
 
 /*
@@ -169,6 +199,7 @@ errcode_apply_conflict(ConflictType type)
 	{
 		case CT_INSERT_EXISTS:
 		case CT_UPDATE_EXISTS:
+		case CT_MULTIPLE_UNIQUE_CONFLICTS:
 			return errcode(ERRCODE_UNIQUE_VIOLATION);
 		case CT_UPDATE_ORIGIN_DIFFERS:
 		case CT_UPDATE_MISSING:
@@ -191,12 +222,13 @@ errcode_apply_conflict(ConflictType type)
  *    replica identity columns, if any. The remote old tuple is excluded as its
  *    information is covered in the replica identity columns.
  */
-static int
+static void
 errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
 						 ConflictType type, TupleTableSlot *searchslot,
 						 TupleTableSlot *localslot, TupleTableSlot *remoteslot,
 						 Oid indexoid, TransactionId localxmin,
-						 RepOriginId localorigin, TimestampTz localts)
+						 RepOriginId localorigin, TimestampTz localts,
+						 StringInfo err_msg)
 {
 	StringInfoData err_detail;
 	char	   *val_desc;
@@ -209,6 +241,7 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
 	{
 		case CT_INSERT_EXISTS:
 		case CT_UPDATE_EXISTS:
+		case CT_MULTIPLE_UNIQUE_CONFLICTS:
 			Assert(OidIsValid(indexoid));
 
 			if (localts)
@@ -291,7 +324,7 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
 	if (val_desc)
 		appendStringInfo(&err_detail, "\n%s", val_desc);
 
-	return errdetail_internal("%s", err_detail.data);
+	appendStringInfo(err_msg, "%s", err_detail.data);
 }
 
 /*
@@ -323,7 +356,8 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
 	 * Report the conflicting key values in the case of a unique constraint
 	 * violation.
 	 */
-	if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS)
+	if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS ||
+		type == CT_MULTIPLE_UNIQUE_CONFLICTS)
 	{
 		Assert(OidIsValid(indexoid) && localslot);
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 31ab69ea13a..6a8d8843b61 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2711,8 +2711,8 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 			slot_store_data(newslot, relmapentry, newtup);
 
 			ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
-								remoteslot, localslot, newslot,
-								InvalidOid, localxmin, localorigin, localts);
+								remoteslot, list_make1(localslot), newslot,
+								list_make1(InvalidOid), localxmin, localorigin, localts);
 		}
 
 		/* Process and store remote tuple in the slot */
@@ -2742,7 +2742,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 		 */
 		ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
 							remoteslot, NULL, newslot,
-							InvalidOid, InvalidTransactionId,
+							NULL, InvalidTransactionId,
 							InvalidRepOriginId, 0);
 	}
 
@@ -2887,8 +2887,8 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 		if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
 			localorigin != replorigin_session_origin)
 			ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
-								remoteslot, localslot, NULL,
-								InvalidOid, localxmin, localorigin, localts);
+								remoteslot, list_make1(localslot), NULL,
+								list_make1(InvalidOid), localxmin, localorigin, localts);
 
 		EvalPlanQualSetSlot(&epqstate, localslot);
 
@@ -2904,7 +2904,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 		 */
 		ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
 							remoteslot, NULL, NULL,
-							InvalidOid, InvalidTransactionId,
+							NULL, InvalidTransactionId,
 							InvalidRepOriginId, 0);
 	}
 
@@ -3096,7 +3096,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 					ReportApplyConflict(estate, partrelinfo,
 										LOG, CT_UPDATE_MISSING,
 										remoteslot_part, NULL, newslot,
-										InvalidOid, InvalidTransactionId,
+										NULL, InvalidTransactionId,
 										InvalidRepOriginId, 0);
 
 					return;
@@ -3116,8 +3116,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 					slot_store_data(newslot, part_entry, newtup);
 
 					ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
-										remoteslot_part, localslot, newslot,
-										InvalidOid, localxmin, localorigin,
+										remoteslot_part, list_make1(localslot), newslot,
+										list_make1(InvalidOid), localxmin, localorigin,
 										localts);
 				}
 
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index 37454dc9513..2c5a129959d 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -41,6 +41,9 @@ typedef enum
 	/* The row to be deleted is missing */
 	CT_DELETE_MISSING,
 
+	/* The row to be inserted/updated violates multiple unique constraints */
+	CT_MULTIPLE_UNIQUE_CONFLICTS,
+
 	/*
 	 * Other conflicts, such as exclusion constraint violations, involve more
 	 * complex rules than simple equality checks. These conflicts are left for
@@ -48,6 +51,11 @@ typedef enum
 	 */
 } ConflictType;
 
+/*
+ * XXX: Don't increment CONFLICT_NUM_TYPES, as it is used by the subscription
+ * stats module and this patch does not support stats for
+ * multiple_unique_conflicts.
+ */
 #define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)
 
 extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
@@ -57,10 +65,9 @@ extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
 extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
 								int elevel, ConflictType type,
 								TupleTableSlot *searchslot,
-								TupleTableSlot *localslot,
+								List *conflictSlots,
 								TupleTableSlot *remoteslot,
-								Oid indexoid, TransactionId localxmin,
+								List *conflictIndexes, TransactionId localxmin,
 								RepOriginId localorigin, TimestampTz localts);
 extern void InitConflictIndexes(ResultRelInfo *relInfo);
-
 #endif
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index d40b49714f6..05fcdd08f57 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -41,6 +41,7 @@ tests += {
       't/032_subscribe_use_index.pl',
       't/033_run_as_table_owner.pl',
       't/034_temporal.pl',
+      't/035_multiple_unique_conflicts.pl',
       't/100_bugs.pl',
     ],
   },
diff --git a/src/test/subscription/t/035_multiple_unique_conflicts.pl b/src/test/subscription/t/035_multiple_unique_conflicts.pl
new file mode 100644
index 00000000000..f1417e313db
--- /dev/null
+++ b/src/test/subscription/t/035_multiple_unique_conflicts.pl
@@ -0,0 +1,133 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test the conflict detection of conflict type 'multiple_unique_conflicts'.
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+###############################
+# Setup
+###############################
+
+# Create a publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create a subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+# Create a table on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE conf_tab (a int PRIMARY KEY, b int UNIQUE, c int UNIQUE);");
+
+# Create same table on subscriber
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE conf_tab (a int PRIMARY key, b int unique, c int unique);");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub_tab FOR TABLE conf_tab");
+
+# Create the subscription
+my $appname = 'sub_tab';
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE SUBSCRIPTION sub_tab
+	 CONNECTION '$publisher_connstr application_name=$appname'
+	 PUBLICATION pub_tab;");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+##################################################
+# INSERT data on Pub and Sub
+##################################################
+
+# Insert data in the publisher table
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO conf_tab VALUES (1,1,1);");
+
+# Insert data in the subscriber table
+$node_subscriber->safe_psql('postgres',
+	"INSERT INTO conf_tab VALUES (2,2,2), (3,3,3), (4,4,4);");
+
+##################################################
+# Test multiple_unique_conflicts due to INSERT
+##################################################
+my $log_offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO conf_tab VALUES (2,3,4);");
+
+# Confirm that this causes an error on the subscriber
+$node_subscriber->wait_for_log(
+	qr/ERROR:  conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts/,
+	$log_offset);
+
+ok( $node_subscriber->log_contains(
+		qr/Key already exists in unique index \"conf_tab_pkey\".*\n.*Key \(a\)=\(2\); existing local tuple \(2, 2, 2\); remote tuple \(2, 3, 4\)./,
+		$log_offset),
+	'multiple_unique_conflicts detected during insertion for conf_tab_pkey (a) = (2)'
+);
+
+ok( $node_subscriber->log_contains(
+		qr/Key already exists in unique index \"conf_tab_b_key\".*\n.*Key \(b\)=\(3\); existing local tuple \(3, 3, 3\); remote tuple \(2, 3, 4\)./,
+		$log_offset),
+	'multiple_unique_conflicts detected during insertion for conf_tab_b_key (b) = (3)'
+);
+
+ok( $node_subscriber->log_contains(
+		qr/Key already exists in unique index \"conf_tab_c_key\".*\n.*Key \(c\)=\(4\); existing local tuple \(4, 4, 4\); remote tuple \(2, 3, 4\)./,
+		$log_offset),
+	'multiple_unique_conflicts detected during insertion for conf_tab_c_key (c) = (4)'
+);
+
+# Truncate table to get rid of the error
+$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;");
+
+##################################################
+# Test multiple_unique_conflicts due to UPDATE
+##################################################
+$log_offset = -s $node_subscriber->logfile;
+
+# Insert data in the publisher table
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO conf_tab VALUES (5,5,5);");
+
+# Insert data in the subscriber table
+$node_subscriber->safe_psql('postgres',
+	"INSERT INTO conf_tab VALUES (6,6,6), (7,7,7), (8,8,8);");
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE conf_tab set a=6, b=7, c=8 where a=5;");
+
+# Confirm that this causes an error on the subscriber
+$node_subscriber->wait_for_log(
+	qr/ERROR:  conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts/,
+	$log_offset);
+
+ok( $node_subscriber->log_contains(
+		qr/Key already exists in unique index \"conf_tab_pkey\".*\n.*Key \(a\)=\(6\); existing local tuple \(6, 6, 6\); remote tuple \(6, 7, 8\)./,
+		$log_offset),
+	'multiple_unique_conflicts detected during update for conf_tab_pkey (a) = (6)'
+);
+
+ok( $node_subscriber->log_contains(
+		qr/Key already exists in unique index \"conf_tab_b_key\".*\n.*Key \(b\)=\(7\); existing local tuple \(7, 7, 7\); remote tuple \(6, 7, 8\)./,
+		$log_offset),
+	'multiple_unique_conflicts detected during update for conf_tab_b_key (b) = (7)'
+);
+
+ok( $node_subscriber->log_contains(
+		qr/Key already exists in unique index \"conf_tab_c_key\".*\n.*Key \(c\)=\(8\); existing local tuple \(8, 8, 8\); remote tuple \(6, 7, 8\)./,
+		$log_offset),
+	'multiple_unique_conflicts detected during update for conf_tab_c_key (c) = (8)'
+);
+
+done_testing();
-- 
2.34.1

