From ac8da6fa047037ba1c2f6514e3481ce2afce0fea Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@fujitsu.com>
Date: Fri, 31 May 2024 14:20:45 +0530
Subject: [PATCH v1] Detect update and delete conflict in logical replication

This patch adds a new parameter detect_conflict for CREATE and ALTER
subscription commands. This new parameter will decide if subscription will
go for confict detection. By default, conflict detection will be off for a
subscription.

When conflict detection is enabled, additional logging is triggered in the
following conflict scenarios:

* updating a row that was previously modified by another origin.
* The tuple to be updated is not found.
* The tuple to be deleted is not found.

While there exist other conflict types in logical replication, such as an
incoming insert conflicting with an existing row due to a primary key or
unique index, these cases already result in constraint violation errors.
Therefore, additional conflict detection for these cases is currently
omitted to minimize potential overhead. However, the pre-detection for
conflict in these error cases is still essential to support automatic
conflict resolution in the future.
---
 doc/src/sgml/catalogs.sgml                 |   9 ++
 doc/src/sgml/ref/alter_subscription.sgml   |   5 +-
 doc/src/sgml/ref/create_subscription.sgml  |  43 +++++
 src/backend/catalog/pg_subscription.c      |   1 +
 src/backend/catalog/system_views.sql       |   3 +-
 src/backend/commands/subscriptioncmds.c    |  31 +++-
 src/backend/replication/logical/worker.c   | 145 ++++++++++++++---
 src/bin/pg_dump/pg_dump.c                  |  17 +-
 src/bin/pg_dump/pg_dump.h                  |   1 +
 src/bin/psql/describe.c                    |   6 +-
 src/bin/psql/tab-complete.c                |  14 +-
 src/include/catalog/pg_subscription.h      |   4 +
 src/test/regress/expected/subscription.out | 176 ++++++++++++---------
 src/test/regress/sql/subscription.sql      |  15 ++
 src/test/subscription/t/001_rep_changes.pl |  15 +-
 src/test/subscription/t/013_partition.pl   |  48 +++---
 src/test/subscription/t/030_origin.pl      |  26 +++
 src/tools/pgindent/typedefs.list           |   1 +
 18 files changed, 419 insertions(+), 141 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 15f6255d86..495a6ea479 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -8038,6 +8038,15 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subdetectconflict</structfield> <type>bool</type>
+      </para>
+      <para>
+       If true, the subscription is enabled for conflict detection.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>subconninfo</structfield> <type>text</type>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 476f195622..5f6b83e415 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -228,8 +228,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       <link linkend="sql-createsubscription-params-with-disable-on-error"><literal>disable_on_error</literal></link>,
       <link linkend="sql-createsubscription-params-with-password-required"><literal>password_required</literal></link>,
       <link linkend="sql-createsubscription-params-with-run-as-owner"><literal>run_as_owner</literal></link>,
-      <link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>, and
-      <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>.
+      <link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>,
+      <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>, and
+      <link linkend="sql-createsubscription-params-with-detect-conflict"><literal>detect_conflict</literal></link>.
       Only a superuser can set <literal>password_required = false</literal>.
      </para>
 
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 740b7d9421..45f71d6386 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -428,6 +428,49 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
         </listitem>
        </varlistentry>
+
+      <varlistentry id="sql-createsubscription-params-with-detect-conflict">
+        <term><literal>detect_conflict</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          Specifies whether the subscription is enabled for conflict detection.
+          The default is <literal>false</literal>.
+         </para>
+         <para>
+          When conflict detection is enabled, additional logging is triggered
+          in the following scenarios:
+          <variablelist>
+           <varlistentry>
+            <term><literal>update_differ</literal></term>
+            <listitem>
+             <para>
+              Updating a row that was previously modified by another origin. Note that this
+              conflict only be detected when
+              <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+              is enabled.
+             </para>
+            </listitem>
+           </varlistentry>
+           <varlistentry>
+            <term><literal>update_missing</literal></term>
+            <listitem>
+             <para>
+              The tuple to be updated is not found.
+             </para>
+            </listitem>
+           </varlistentry>
+           <varlistentry>
+            <term><literal>delete_missing</literal></term>
+            <listitem>
+             <para>
+              The tuple to be deleted is not found.
+             </para>
+            </listitem>
+           </varlistentry>
+          </variablelist>
+         </para>
+        </listitem>
+       </varlistentry>
       </variablelist></para>
 
     </listitem>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 9efc9159f2..5a423f4fb0 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -72,6 +72,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->passwordrequired = subform->subpasswordrequired;
 	sub->runasowner = subform->subrunasowner;
 	sub->failover = subform->subfailover;
+	sub->detectconflict = subform->subdetectconflict;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 53047cab5f..49ac738f26 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1359,7 +1359,8 @@ REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
               subbinary, substream, subtwophasestate, subdisableonerr,
 			  subpasswordrequired, subrunasowner, subfailover,
-              subslotname, subsynccommit, subpublications, suborigin)
+			  subdetectconflict, subslotname, subsynccommit,
+			  subpublications, suborigin)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index e407428dbc..e670d72708 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -70,8 +70,9 @@
 #define SUBOPT_PASSWORD_REQUIRED	0x00000800
 #define SUBOPT_RUN_AS_OWNER			0x00001000
 #define SUBOPT_FAILOVER				0x00002000
-#define SUBOPT_LSN					0x00004000
-#define SUBOPT_ORIGIN				0x00008000
+#define SUBOPT_DETECT_CONFLICT		0x00004000
+#define SUBOPT_LSN					0x00008000
+#define SUBOPT_ORIGIN				0x00010000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -97,6 +98,7 @@ typedef struct SubOpts
 	bool		passwordrequired;
 	bool		runasowner;
 	bool		failover;
+	bool		detectconflict;
 	char	   *origin;
 	XLogRecPtr	lsn;
 } SubOpts;
@@ -159,6 +161,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->runasowner = false;
 	if (IsSet(supported_opts, SUBOPT_FAILOVER))
 		opts->failover = false;
+	if (IsSet(supported_opts, SUBOPT_DETECT_CONFLICT))
+		opts->detectconflict = false;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
 		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
 
@@ -316,6 +320,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_FAILOVER;
 			opts->failover = defGetBoolean(defel);
 		}
+		else if (IsSet(supported_opts, SUBOPT_DETECT_CONFLICT) &&
+				 strcmp(defel->defname, "detect_conflict") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_DETECT_CONFLICT))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_DETECT_CONFLICT;
+			opts->detectconflict = defGetBoolean(defel);
+		}
 		else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
 				 strcmp(defel->defname, "origin") == 0)
 		{
@@ -603,7 +616,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
 					  SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
-					  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN);
+					  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
+					  SUBOPT_DETECT_CONFLICT | SUBOPT_ORIGIN);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -710,6 +724,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
 	values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
 	values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
+	values[Anum_pg_subscription_subdetectconflict - 1] =
+		BoolGetDatum(opts.detectconflict);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (opts.slot_name)
@@ -1146,7 +1162,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
 								  SUBOPT_PASSWORD_REQUIRED |
 								  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
-								  SUBOPT_ORIGIN);
+								  SUBOPT_DETECT_CONFLICT | SUBOPT_ORIGIN);
 
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -1256,6 +1272,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_subfailover - 1] = true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_DETECT_CONFLICT))
+				{
+					values[Anum_pg_subscription_subdetectconflict - 1] =
+						BoolGetDatum(opts.detectconflict);
+					replaces[Anum_pg_subscription_subdetectconflict - 1] = true;
+				}
+
 				if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
 				{
 					values[Anum_pg_subscription_suborigin - 1] =
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b5a80fe3e8..80d6d02ecb 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -147,6 +147,7 @@
 #include <sys/stat.h>
 #include <unistd.h>
 
+#include "access/commit_ts.h"
 #include "access/table.h"
 #include "access/tableam.h"
 #include "access/twophase.h"
@@ -274,6 +275,31 @@ typedef enum
 	TRANS_PARALLEL_APPLY,
 } TransApplyAction;
 
+/*
+ * Conflict types that could be encountered when applying remote changes.
+ *
+ * For now, this list includes conflict types that will prompt additional logging
+ * only if conflict detection is turned on. Other conflicts that already
+ * lead to constraint violation errors are excluded from this enumeration.
+ */
+typedef enum
+{
+	/* The row to be updated was modified by a different origin */
+	CT_UPDATE_DIFFER,
+
+	/* The row to be updated is missing */
+	CT_UPDATE_MISSING,
+
+	/* The row to be deleted is missing */
+	CT_DELETE_MISSING,
+} ConflictType;
+
+const char *const ConflictTypeNames[] = {
+	[CT_UPDATE_DIFFER] = "update_differ",
+	[CT_UPDATE_MISSING] = "update_missing",
+	[CT_DELETE_MISSING] = "delete_missing"
+};
+
 /* errcontext tracker */
 ApplyErrorCallbackArg apply_error_callback_arg =
 {
@@ -416,6 +442,14 @@ static inline void reset_apply_error_context_info(void);
 static TransApplyAction get_transaction_apply_action(TransactionId xid,
 													 ParallelApplyWorkerInfo **winfo);
 
+static bool get_tuple_commit_ts(TupleTableSlot *localslot, TransactionId *xmin,
+								RepOriginId *localorigin,
+								TimestampTz *localts);
+static void report_apply_conflict(ConflictType type, Relation localrel,
+								  TransactionId localxmin,
+								  RepOriginId localorigin,
+								  TimestampTz localts);
+
 /*
  * Form the origin name for the subscription.
  *
@@ -2664,6 +2698,20 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 	 */
 	if (found)
 	{
+		RepOriginId localorigin;
+		TransactionId localxmin;
+		TimestampTz localts;
+
+		/*
+		 * If conflict detection is enabled, check whether the local tuple was
+		 * modified by a different origin. If detected, report the conflict.
+		 */
+		if (MySubscription->detectconflict &&
+			get_tuple_commit_ts(localslot, &localxmin, &localorigin, &localts) &&
+			localorigin != replorigin_session_origin)
+			report_apply_conflict(CT_UPDATE_DIFFER, localrel, localxmin,
+								  localorigin, localts);
+
 		/* Process and store remote tuple in the slot */
 		oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 		slot_modify_data(remoteslot, localslot, relmapentry, newtup);
@@ -2681,13 +2729,10 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 		/*
 		 * The tuple to be updated could not be found.  Do nothing except for
 		 * emitting a log message.
-		 *
-		 * XXX should this be promoted to ereport(LOG) perhaps?
 		 */
-		elog(DEBUG1,
-			 "logical replication did not find row to be updated "
-			 "in replication target relation \"%s\"",
-			 RelationGetRelationName(localrel));
+		if (MySubscription->detectconflict)
+			report_apply_conflict(CT_UPDATE_MISSING, localrel,
+								  InvalidTransactionId, InvalidRepOriginId, 0);
 	}
 
 	/* Cleanup. */
@@ -2821,13 +2866,10 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 		/*
 		 * The tuple to be deleted could not be found.  Do nothing except for
 		 * emitting a log message.
-		 *
-		 * XXX should this be promoted to ereport(LOG) perhaps?
 		 */
-		elog(DEBUG1,
-			 "logical replication did not find row to be deleted "
-			 "in replication target relation \"%s\"",
-			 RelationGetRelationName(localrel));
+		if (MySubscription->detectconflict)
+			report_apply_conflict(CT_DELETE_MISSING, localrel,
+								  InvalidTransactionId, InvalidRepOriginId, 0);
 	}
 
 	/* Cleanup. */
@@ -3005,13 +3047,13 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 					/*
 					 * The tuple to be updated could not be found.  Do nothing
 					 * except for emitting a log message.
-					 *
-					 * XXX should this be promoted to ereport(LOG) perhaps?
 					 */
-					elog(DEBUG1,
-						 "logical replication did not find row to be updated "
-						 "in replication target relation's partition \"%s\"",
-						 RelationGetRelationName(partrel));
+					if (MySubscription->detectconflict)
+						report_apply_conflict(CT_UPDATE_MISSING,
+											  partrel,
+											  InvalidTransactionId,
+											  InvalidRepOriginId, 0);
+
 					return;
 				}
 
@@ -5088,3 +5130,70 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
 		return TRANS_LEADER_APPLY;
 	}
 }
+
+/*
+ * Get the xmin and commit timestamp data (origin and timestamp) associated
+ * with the provided local tuple.
+ *
+ * Returns true if the commit timestamp data was found, false otherwise.
+ */
+static bool
+get_tuple_commit_ts(TupleTableSlot *localslot, TransactionId *xmin,
+					RepOriginId *localorigin, TimestampTz *localts)
+{
+	Datum		xminDatum;
+	bool		isnull;
+
+	xminDatum = slot_getsysattr(localslot, MinTransactionIdAttributeNumber,
+								&isnull);
+	*xmin = DatumGetTransactionId(xminDatum);
+	Assert(!isnull);
+
+	/*
+	 * The commit timestamp data is not available if track_commit_timestamp is
+	 * disabled.
+	 */
+	if (!track_commit_timestamp)
+	{
+		*localorigin = InvalidRepOriginId;
+		*localts = 0;
+		return false;
+	}
+
+	return TransactionIdGetCommitTsData(*xmin, localts, localorigin);
+}
+
+/*
+ * Report a conflict when applying remote changes.
+ */
+static void
+report_apply_conflict(ConflictType type, Relation localrel,
+					  TransactionId localxmin, RepOriginId localorigin,
+					  TimestampTz localts)
+{
+	switch (type)
+	{
+		case CT_UPDATE_DIFFER:
+			ereport(LOG,
+					errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
+					errmsg("conflict %s detected on relation \"%s\"",
+						   ConflictTypeNames[type], RelationGetRelationName(localrel)),
+					errdetail("Updating a row that was modified by a different origin %u in transaction %u at %s.",
+							  localorigin, localxmin, timestamptz_to_str(localts)));
+			break;
+		case CT_UPDATE_MISSING:
+			ereport(LOG,
+					errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
+					errmsg("conflict %s detected on relation \"%s\"",
+						   ConflictTypeNames[type], RelationGetRelationName(localrel)),
+					errdetail("Did not find the row to be updated."));
+			break;
+		case CT_DELETE_MISSING:
+			ereport(LOG,
+					errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
+					errmsg("conflict %s detected on relation \"%s\"",
+						   ConflictTypeNames[type], RelationGetRelationName(localrel)),
+					errdetail("Did not find the row to be deleted."));
+			break;
+	}
+}
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index e324070828..c6b67c692d 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4739,6 +4739,7 @@ getSubscriptions(Archive *fout)
 	int			i_suboriginremotelsn;
 	int			i_subenabled;
 	int			i_subfailover;
+	int			i_subdetectconflict;
 	int			i,
 				ntups;
 
@@ -4811,11 +4812,17 @@ getSubscriptions(Archive *fout)
 
 	if (fout->remoteVersion >= 170000)
 		appendPQExpBufferStr(query,
-							 " s.subfailover\n");
+							 " s.subfailover,\n");
 	else
 		appendPQExpBuffer(query,
-						  " false AS subfailover\n");
+						  " false AS subfailover,\n");
 
+	if (fout->remoteVersion >= 170000)
+		appendPQExpBufferStr(query,
+							 " s.subdetectconflict\n");
+	else
+		appendPQExpBuffer(query,
+						  " false AS subdetectconflict\n");
 	appendPQExpBufferStr(query,
 						 "FROM pg_subscription s\n");
 
@@ -4854,6 +4861,7 @@ getSubscriptions(Archive *fout)
 	i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn");
 	i_subenabled = PQfnumber(res, "subenabled");
 	i_subfailover = PQfnumber(res, "subfailover");
+	i_subdetectconflict = PQfnumber(res, "subdetectconflict");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4900,6 +4908,8 @@ getSubscriptions(Archive *fout)
 			pg_strdup(PQgetvalue(res, i, i_subenabled));
 		subinfo[i].subfailover =
 			pg_strdup(PQgetvalue(res, i, i_subfailover));
+		subinfo[i].subdetectconflict =
+			pg_strdup(PQgetvalue(res, i, i_subdetectconflict));
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -5140,6 +5150,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	if (strcmp(subinfo->subfailover, "t") == 0)
 		appendPQExpBufferStr(query, ", failover = true");
 
+	if (strcmp(subinfo->subdetectconflict, "t") == 0)
+		appendPQExpBufferStr(query, ", detect_conflict = true");
+
 	if (strcmp(subinfo->subsynccommit, "off") != 0)
 		appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
 
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 865823868f..02aa4a6f32 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -671,6 +671,7 @@ typedef struct _SubscriptionInfo
 	char	   *suborigin;
 	char	   *suboriginremotelsn;
 	char	   *subfailover;
+	char	   *subdetectconflict;
 } SubscriptionInfo;
 
 /*
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index f67bf0b892..0472fe2e87 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6529,7 +6529,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
 		false, false, false, false, false, false, false, false, false, false,
-	false};
+	false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -6597,6 +6597,10 @@ describeSubscriptions(const char *pattern, bool verbose)
 			appendPQExpBuffer(&buf,
 							  ", subfailover AS \"%s\"\n",
 							  gettext_noop("Failover"));
+		if (pset.sversion >= 170000)
+			appendPQExpBuffer(&buf,
+							  ", subdetectconflict AS \"%s\"\n",
+							  gettext_noop("Detect conflict"));
 
 		appendPQExpBuffer(&buf,
 						  ",  subsynccommit AS \"%s\"\n"
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index d453e224d9..219fac7e71 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1946,9 +1946,10 @@ psql_completion(const char *text, int start, int end)
 		COMPLETE_WITH("(", "PUBLICATION");
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
-		COMPLETE_WITH("binary", "disable_on_error", "failover", "origin",
-					  "password_required", "run_as_owner", "slot_name",
-					  "streaming", "synchronous_commit");
+		COMPLETE_WITH("binary", "detect_conflict", "disable_on_error",
+					  "failover", "origin", "password_required",
+					  "run_as_owner", "slot_name", "streaming",
+					  "synchronous_commit");
 	/* ALTER SUBSCRIPTION <name> SKIP ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
 		COMPLETE_WITH("lsn");
@@ -3363,9 +3364,10 @@ psql_completion(const char *text, int start, int end)
 	/* Complete "CREATE SUBSCRIPTION <name> ...  WITH ( <opt>" */
 	else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
 		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
-					  "disable_on_error", "enabled", "failover", "origin",
-					  "password_required", "run_as_owner", "slot_name",
-					  "streaming", "synchronous_commit", "two_phase");
+					  "detect_conflict", "disable_on_error", "enabled",
+					  "failover", "origin", "password_required",
+					  "run_as_owner", "slot_name", "streaming",
+					  "synchronous_commit", "two_phase");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
 
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 0aa14ec4a2..aad4907d43 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -98,6 +98,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 								 * slots) in the upstream database are enabled
 								 * to be synchronized to the standbys. */
 
+	bool		subdetectconflict;	/* True if replication should perform
+										 * conflict detection */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -151,6 +154,7 @@ typedef struct Subscription
 								 * (i.e. the main slot and the table sync
 								 * slots) in the upstream database are enabled
 								 * to be synchronized to the standbys. */
+	bool		detectconflict;	/* True if conflict detection is enabled */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 0f2a25cdc1..a8b0086dd9 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+ regress_testsub4
-                                                                                                                 List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | none   | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                          List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | none   | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
 \dRs+ regress_testsub4
-                                                                                                                 List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                          List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub3;
@@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname');
 ALTER SUBSCRIPTION regress_testsub SET (password_required = false);
 ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true);
 \dRs+
-                                                                                                                     List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | f                 | t             | f        | off                | dbname=regress_doesnotexist2 | 0/0
+                                                                                                                              List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | f                 | t             | f        | f               | off                | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (password_required = true);
@@ -176,10 +176,10 @@ ERROR:  unrecognized subscription parameter: "create_slot"
 -- ok
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
 \dRs+
-                                                                                                                     List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist2 | 0/12345
+                                                                                                                              List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist2 | 0/12345
 (1 row)
 
 -- ok - with lsn = NONE
@@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
 ERROR:  invalid WAL location (LSN): 0/0
 \dRs+
-                                                                                                                     List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist2 | 0/0
+                                                                                                                              List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 BEGIN;
@@ -223,10 +223,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                                                                       List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |           Conninfo           | Skip LSN 
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | t                 | f             | f        | local              | dbname=regress_doesnotexist2 | 0/0
+                                                                                                                                List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit |           Conninfo           | Skip LSN 
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+------------------------------+----------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | t                 | f             | f        | f               | local              | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 -- rename back to keep the rest simple
@@ -255,19 +255,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off       | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -279,27 +279,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication already exists
@@ -314,10 +314,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                                                                        List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication used more than once
@@ -332,10 +332,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -371,10 +371,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 --fail - alter of two_phase option not supported.
@@ -383,10 +383,10 @@ ERROR:  unrecognized subscription parameter: "two_phase"
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -396,10 +396,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -412,18 +412,42 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 \dRs+
-                                                                                                                List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | t                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | t                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+-- fail - detect_conflict must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = foo);
+ERROR:  detect_conflict requires a Boolean value
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = true);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+\dRs+
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | t               | off                | dbname=regress_doesnotexist | 0/0
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (detect_conflict = false);
+\dRs+
+                                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Detect conflict | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | f               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 3e5ba4cb8c..a77b196704 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -290,6 +290,21 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 
+-- fail - detect_conflict must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = foo);
+
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = true);
+
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET (detect_conflict = false);
+
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+
 -- let's do some tests with pg_create_subscription rather than superuser
 SET SESSION AUTHORIZATION regress_subscription_user3;
 
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 471e981962..d74f6bdabe 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -331,11 +331,12 @@ is( $result, qq(1|bar
 2|baz),
 	'update works with REPLICA IDENTITY FULL and a primary key');
 
-# Check that subscriber handles cases where update/delete target tuple
-# is missing.  We have to look for the DEBUG1 log messages about that,
-# so temporarily bump up the log verbosity.
-$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1");
-$node_subscriber->reload;
+# To check that subscriber handles cases where update/delete target tuple
+# is missing, detect_conflict is temporarily enabled to log conflicts
+# related to missing tuples.
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub SET (detect_conflict = true)"
+);
 
 $node_subscriber->safe_psql('postgres', "DELETE FROM tab_full_pk");
 
@@ -352,10 +353,10 @@ $node_publisher->wait_for_catchup('tap_sub');
 
 my $logfile = slurp_file($node_subscriber->logfile, $log_location);
 ok( $logfile =~
-	  qr/logical replication did not find row to be updated in replication target relation "tab_full_pk"/,
+	  qr/conflict update_missing detected on relation "tab_full_pk".*\n.*DETAIL:.* Did not find the row to be updated./m,
 	'update target row is missing');
 ok( $logfile =~
-	  qr/logical replication did not find row to be deleted in replication target relation "tab_full_pk"/,
+	  qr/conflict delete_missing detected on relation "tab_full_pk".*\n.*DETAIL:.* Did not find the row to be deleted./m,
 	'delete target row is missing');
 
 $node_subscriber->append_conf('postgresql.conf',
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index 29580525a9..02afbc2ed4 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -343,12 +343,12 @@ $result =
   $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1 ORDER BY 1");
 is($result, qq(), 'truncate of tab1 replicated');
 
-# Check that subscriber handles cases where update/delete target tuple
-# is missing.  We have to look for the DEBUG1 log messages about that,
-# so temporarily bump up the log verbosity.
-$node_subscriber1->append_conf('postgresql.conf',
-	"log_min_messages = debug1");
-$node_subscriber1->reload;
+# To check that subscriber handles cases where update/delete target tuple
+# is missing, detect_conflict is temporarily enabled to log conflicts
+# related to missing tuples.
+$node_subscriber1->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub1 SET (detect_conflict = true)"
+);
 
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab1 VALUES (1, 'foo'), (4, 'bar'), (10, 'baz')");
@@ -372,21 +372,21 @@ $node_publisher->wait_for_catchup('sub2');
 
 my $logfile = slurp_file($node_subscriber1->logfile(), $log_location);
 ok( $logfile =~
-	  qr/logical replication did not find row to be updated in replication target relation's partition "tab1_2_2"/,
+	  qr/conflict update_missing detected on relation "tab1_2_2".*\n.*DETAIL:.* Did not find the row to be updated./,
 	'update target row is missing in tab1_2_2');
 ok( $logfile =~
-	  qr/logical replication did not find row to be deleted in replication target relation "tab1_1"/,
+	  qr/conflict delete_missing detected on relation "tab1_1".*\n.*DETAIL:.* Did not find the row to be deleted./,
 	'delete target row is missing in tab1_1');
 ok( $logfile =~
-	  qr/logical replication did not find row to be deleted in replication target relation "tab1_2_2"/,
+	  qr/conflict delete_missing detected on relation "tab1_2_2".*\n.*DETAIL:.* Did not find the row to be deleted./,
 	'delete target row is missing in tab1_2_2');
 ok( $logfile =~
-	  qr/logical replication did not find row to be deleted in replication target relation "tab1_def"/,
+	  qr/conflict delete_missing detected on relation "tab1_def".*\n.*DETAIL:.* Did not find the row to be deleted./,
 	'delete target row is missing in tab1_def');
 
-$node_subscriber1->append_conf('postgresql.conf',
-	"log_min_messages = warning");
-$node_subscriber1->reload;
+$node_subscriber1->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub1 SET (detect_conflict = false)"
+);
 
 # Tests for replication using root table identity and schema
 
@@ -773,12 +773,12 @@ pub_tab2|3|yyy
 pub_tab2|5|zzz
 xxx_c|6|aaa), 'inserts into tab2 replicated');
 
-# Check that subscriber handles cases where update/delete target tuple
-# is missing.  We have to look for the DEBUG1 log messages about that,
-# so temporarily bump up the log verbosity.
-$node_subscriber1->append_conf('postgresql.conf',
-	"log_min_messages = debug1");
-$node_subscriber1->reload;
+# To check that subscriber handles cases where update/delete target tuple
+# is missing, detect_conflict is temporarily enabled to log conflicts
+# related to missing tuples.
+$node_subscriber1->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub_viaroot SET (detect_conflict = true)"
+);
 
 $node_subscriber1->safe_psql('postgres', "DELETE FROM tab2");
 
@@ -796,15 +796,15 @@ $node_publisher->wait_for_catchup('sub2');
 
 $logfile = slurp_file($node_subscriber1->logfile(), $log_location);
 ok( $logfile =~
-	  qr/logical replication did not find row to be updated in replication target relation's partition "tab2_1"/,
+	  qr/conflict update_missing detected on relation "tab2_1".*\n.*DETAIL:.* Did not find the row to be updated./,
 	'update target row is missing in tab2_1');
 ok( $logfile =~
-	  qr/logical replication did not find row to be deleted in replication target relation "tab2_1"/,
+	  qr/conflict delete_missing detected on relation "tab2_1".*\n.*DETAIL:.* Did not find the row to be deleted./,
 	'delete target row is missing in tab2_1');
 
-$node_subscriber1->append_conf('postgresql.conf',
-	"log_min_messages = warning");
-$node_subscriber1->reload;
+$node_subscriber1->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub_viaroot SET (detect_conflict = false)"
+);
 
 # Test that replication continues to work correctly after altering the
 # partition of a partitioned target table.
diff --git a/src/test/subscription/t/030_origin.pl b/src/test/subscription/t/030_origin.pl
index 056561f008..03dabfeb72 100644
--- a/src/test/subscription/t/030_origin.pl
+++ b/src/test/subscription/t/030_origin.pl
@@ -26,7 +26,12 @@ my $stderr;
 # node_A
 my $node_A = PostgreSQL::Test::Cluster->new('node_A');
 $node_A->init(allows_streaming => 'logical');
+
+# Enable the track_commit_timestamp to detect the conflict when attempting to
+# update a row that was previously modified by a different origin.
+$node_A->append_conf('postgresql.conf', 'track_commit_timestamp = on');
 $node_A->start;
+
 # node_B
 my $node_B = PostgreSQL::Test::Cluster->new('node_B');
 $node_B->init(allows_streaming => 'logical');
@@ -89,11 +94,32 @@ is( $result, qq(11
 	'Inserted successfully without leading to infinite recursion in bidirectional replication setup'
 );
 
+###############################################################################
+# Check that the conflict can be detected when attempting to update a row that
+# was previously modified by a different source.
+###############################################################################
+
+$node_A->safe_psql('postgres',
+	"ALTER SUBSCRIPTION $subname_AB SET (detect_conflict = true);");
+
+$node_B->safe_psql('postgres', "UPDATE tab SET a = 10 WHERE a = 11;");
+
+$node_A->wait_for_log(
+	qr/Updating a row that was modified by a different origin [0-9]+ in transaction [0-9]+ at .*/
+);
+
 $node_A->safe_psql('postgres', "DELETE FROM tab;");
 
 $node_A->wait_for_catchup($subname_BA);
 $node_B->wait_for_catchup($subname_AB);
 
+# The remaining tests no longer test conflict detection.
+$node_A->safe_psql('postgres',
+	"ALTER SUBSCRIPTION $subname_AB SET (detect_conflict = false);");
+
+$node_A->append_conf('postgresql.conf', 'track_commit_timestamp = off');
+$node_A->restart;
+
 ###############################################################################
 # Check that remote data of node_B (that originated from node_C) is not
 # published to node_A.
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index d427a1c16a..3b12be42ad 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -465,6 +465,7 @@ ConditionVariableMinimallyPadded
 ConditionalStack
 ConfigData
 ConfigVariable
+ConflictType
 ConnCacheEntry
 ConnCacheKey
 ConnParams
-- 
2.30.0.windows.2

