From 53e17680f404bc65b1fb63e6244903c6ac1bf4bc Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Thu, 22 Aug 2024 17:52:50 +0800
Subject: [PATCH v3] Collect statistics about conflicts in logical replication

This commit adds columns in view pg_stat_subscription_stats to show
information about the conflict which occur during the application of
logical replication changes. Currently, the following columns are added.

insert_exists_count:
	Number of times a row insertion violated a NOT DEFERRABLE unique constraint.
update_differ_count:
	Number of times an update was performed on a row that was previously modified by another origin.
update_exists_count:
	Number of times that the updated value of a row violates a NOT DEFERRABLE unique constraint.
update_missing_count:
	Number of times that the tuple to be updated is missing.
delete_differ_count:
	Number of times a delete was performed on a row that was previously modified by another origin.
delete_missing_count:
	Number of times that the tuple to be deleted is missing.

The update_differ and delete_differ conflicts can be detected only when
track_commit_timestamp is enabled.
---
 doc/src/sgml/logical-replication.sgml         |   5 +-
 doc/src/sgml/monitoring.sgml                  |  74 +++++++-
 src/backend/catalog/system_views.sql          |   6 +
 src/backend/replication/logical/conflict.c    |   5 +-
 .../utils/activity/pgstat_subscription.c      |  17 ++
 src/backend/utils/adt/pgstatfuncs.c           |  33 +++-
 src/include/catalog/pg_proc.dat               |   6 +-
 src/include/pgstat.h                          |   4 +
 src/include/replication/conflict.h            |   7 +
 src/test/regress/expected/rules.out           |   8 +-
 src/test/subscription/t/026_stats.pl          | 166 ++++++++++++++++--
 11 files changed, 300 insertions(+), 31 deletions(-)

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index bee7e02983..f6823694c9 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1585,8 +1585,9 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
   </para>
 
   <para>
-   Additional logging is triggered in the following <firstterm>conflict</firstterm>
-   cases:
+   Additional logging is triggered, and the conflict statistics are collected (displayed in the
+   <link linkend="monitoring-pg-stat-subscription-stats"><structname>pg_stat_subscription_stats</structname></link> view)
+   in the following <firstterm>conflict</firstterm> cases:
    <variablelist>
     <varlistentry>
      <term><literal>insert_exists</literal></term>
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 55417a6fa9..ac3c773ea1 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -507,7 +507,7 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
 
      <row>
       <entry><structname>pg_stat_subscription_stats</structname><indexterm><primary>pg_stat_subscription_stats</primary></indexterm></entry>
-      <entry>One row per subscription, showing statistics about errors.
+      <entry>One row per subscription, showing statistics about errors and conflicts.
       See <link linkend="monitoring-pg-stat-subscription-stats">
       <structname>pg_stat_subscription_stats</structname></link> for details.
       </entry>
@@ -2157,7 +2157,9 @@ description | Waiting for a newly initialized WAL file to reach durable storage
        <structfield>apply_error_count</structfield> <type>bigint</type>
       </para>
       <para>
-       Number of times an error occurred while applying changes
+       Number of times an error occurred while applying changes. Note that any
+       conflict resulting in an apply error will be counted in both
+       <literal>apply_error_count</literal> and the corresponding conflict count.
       </para></entry>
      </row>
 
@@ -2171,6 +2173,74 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>insert_exists_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times a row insertion violated a
+       <literal>NOT DEFERRABLE</literal> unique constraint during the
+       application of changes
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>update_differ_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times an update was applied to a row that had been previously
+       modified by another source during the application of changes. This conflict is
+       counted only when the
+       <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+       option is enabled on the subscriber.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>update_exists_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times that an updated row value violated a
+       <literal>NOT DEFERRABLE</literal> unique constraint during the
+       application of changes
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>update_missing_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times the tuple to be updated was not found during the
+       application of changes
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>delete_differ_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times a delete operation was applied to row that had been
+       previously modified by another source during the application of changes.
+       This conflict is counted only when the
+       <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+       option is enabled on the subscriber.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>delete_missing_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times the tuple to be deleted was not found during the application
+       of changes
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 19cabc9a47..fcdd199117 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1365,6 +1365,12 @@ CREATE VIEW pg_stat_subscription_stats AS
         s.subname,
         ss.apply_error_count,
         ss.sync_error_count,
+        ss.insert_exists_count,
+        ss.update_differ_count,
+        ss.update_exists_count,
+        ss.update_missing_count,
+        ss.delete_differ_count,
+        ss.delete_missing_count,
         ss.stats_reset
     FROM pg_subscription as s,
          pg_stat_get_subscription_stats(s.oid) as ss;
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 0bc7959980..02f7892cb2 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -17,8 +17,9 @@
 #include "access/commit_ts.h"
 #include "access/tableam.h"
 #include "executor/executor.h"
+#include "pgstat.h"
 #include "replication/conflict.h"
-#include "replication/logicalrelation.h"
+#include "replication/worker_internal.h"
 #include "storage/lmgr.h"
 #include "utils/lsyscache.h"
 
@@ -114,6 +115,8 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 	Assert(!OidIsValid(indexoid) ||
 		   CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
 
+	pgstat_report_subscription_conflict(MySubscription->oid, type);
+
 	ereport(elevel,
 			errcode_apply_conflict(type),
 			errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
diff --git a/src/backend/utils/activity/pgstat_subscription.c b/src/backend/utils/activity/pgstat_subscription.c
index d9af8de658..e06c92727e 100644
--- a/src/backend/utils/activity/pgstat_subscription.c
+++ b/src/backend/utils/activity/pgstat_subscription.c
@@ -39,6 +39,21 @@ pgstat_report_subscription_error(Oid subid, bool is_apply_error)
 		pending->sync_error_count++;
 }
 
+/*
+ * Report a subscription conflict.
+ */
+void
+pgstat_report_subscription_conflict(Oid subid, ConflictType type)
+{
+	PgStat_EntryRef *entry_ref;
+	PgStat_BackendSubEntry *pending;
+
+	entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_SUBSCRIPTION,
+										  InvalidOid, subid, NULL);
+	pending = entry_ref->pending;
+	pending->conflict_count[type]++;
+}
+
 /*
  * Report creating the subscription.
  */
@@ -101,6 +116,8 @@ pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
 #define SUB_ACC(fld) shsubent->stats.fld += localent->fld
 	SUB_ACC(apply_error_count);
 	SUB_ACC(sync_error_count);
+	for (int i = 0; i < CONFLICT_NUM_TYPES; i++)
+		SUB_ACC(conflict_count[i]);
 #undef SUB_ACC
 
 	pgstat_unlock_entry(entry_ref);
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 3221137123..870aee8e7b 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1966,13 +1966,14 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS	4
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS	10
 	Oid			subid = PG_GETARG_OID(0);
 	TupleDesc	tupdesc;
 	Datum		values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
 	bool		nulls[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
 	PgStat_StatSubEntry *subentry;
 	PgStat_StatSubEntry allzero;
+	int			i = 0;
 
 	/* Get subscription stats */
 	subentry = pgstat_fetch_stat_subscription(subid);
@@ -1985,7 +1986,19 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 					   INT8OID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "stats_reset",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "insert_exists_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "update_differ_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "update_exists_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "update_missing_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "delete_differ_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "delete_missing_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
 					   TIMESTAMPTZOID, -1, 0);
 	BlessTupleDesc(tupdesc);
 
@@ -1997,19 +2010,25 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 	}
 
 	/* subid */
-	values[0] = ObjectIdGetDatum(subid);
+	values[i++] = ObjectIdGetDatum(subid);
 
 	/* apply_error_count */
-	values[1] = Int64GetDatum(subentry->apply_error_count);
+	values[i++] = Int64GetDatum(subentry->apply_error_count);
 
 	/* sync_error_count */
-	values[2] = Int64GetDatum(subentry->sync_error_count);
+	values[i++] = Int64GetDatum(subentry->sync_error_count);
+
+	/* conflict count */
+	for (int nconflict = 0; nconflict < CONFLICT_NUM_TYPES; nconflict++)
+		values[i++] = Int64GetDatum(subentry->conflict_count[nconflict]);
 
 	/* stats_reset */
 	if (subentry->stat_reset_timestamp == 0)
-		nulls[3] = true;
+		nulls[i] = true;
 	else
-		values[3] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
+		values[i] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
+
+	Assert(i + 1 == PG_STAT_GET_SUBSCRIPTION_STATS_COLS);
 
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 4abc6d9526..3d5c2957c9 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5538,9 +5538,9 @@
 { oid => '6231', descr => 'statistics: information about subscription stats',
   proname => 'pg_stat_get_subscription_stats', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,int8,int8,timestamptz}',
-  proargmodes => '{i,o,o,o,o}',
-  proargnames => '{subid,subid,apply_error_count,sync_error_count,stats_reset}',
+  proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,apply_error_count,sync_error_count,insert_exists_count,update_differ_count,update_exists_count,update_missing_count,delete_differ_count,delete_missing_count,stats_reset}',
   prosrc => 'pg_stat_get_subscription_stats' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index f63159c55c..be2c91168a 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -15,6 +15,7 @@
 #include "datatype/timestamp.h"
 #include "portability/instr_time.h"
 #include "postmaster/pgarch.h"	/* for MAX_XFN_CHARS */
+#include "replication/conflict.h"
 #include "utils/backend_progress.h" /* for backward compatibility */
 #include "utils/backend_status.h"	/* for backward compatibility */
 #include "utils/relcache.h"
@@ -165,6 +166,7 @@ typedef struct PgStat_BackendSubEntry
 {
 	PgStat_Counter apply_error_count;
 	PgStat_Counter sync_error_count;
+	PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
 } PgStat_BackendSubEntry;
 
 /* ----------
@@ -423,6 +425,7 @@ typedef struct PgStat_StatSubEntry
 {
 	PgStat_Counter apply_error_count;
 	PgStat_Counter sync_error_count;
+	PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
 	TimestampTz stat_reset_timestamp;
 } PgStat_StatSubEntry;
 
@@ -725,6 +728,7 @@ extern PgStat_SLRUStats *pgstat_fetch_slru(void);
  */
 
 extern void pgstat_report_subscription_error(Oid subid, bool is_apply_error);
+extern void pgstat_report_subscription_conflict(Oid subid, ConflictType type);
 extern void pgstat_create_subscription(Oid subid);
 extern void pgstat_drop_subscription(Oid subid);
 extern PgStat_StatSubEntry *pgstat_fetch_stat_subscription(Oid subid);
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index 02cb84da7e..7232c8889b 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -14,6 +14,11 @@
 
 /*
  * Conflict types that could occur while applying remote changes.
+ *
+ * This enum is used in statistics collection (see
+ * PgStat_StatSubEntry::conflict_count) as well, therefore, when adding new
+ * values or reordering existing ones, ensure to review and potentially adjust
+ * the corresponding statistics collection codes.
  */
 typedef enum
 {
@@ -42,6 +47,8 @@ typedef enum
 	 */
 } ConflictType;
 
+#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)
+
 extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
 									TransactionId *xmin,
 									RepOriginId *localorigin,
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 862433ee52..1985d2ffad 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2139,9 +2139,15 @@ pg_stat_subscription_stats| SELECT ss.subid,
     s.subname,
     ss.apply_error_count,
     ss.sync_error_count,
+    ss.insert_exists_count,
+    ss.update_differ_count,
+    ss.update_exists_count,
+    ss.update_missing_count,
+    ss.delete_differ_count,
+    ss.delete_missing_count,
     ss.stats_reset
    FROM pg_subscription s,
-    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, stats_reset);
+    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, insert_exists_count, update_differ_count, update_exists_count, update_missing_count, delete_differ_count, delete_missing_count, stats_reset);
 pg_stat_sys_indexes| SELECT relid,
     indexrelid,
     schemaname,
diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl
index fb3e5629b3..44a7f7fec1 100644
--- a/src/test/subscription/t/026_stats.pl
+++ b/src/test/subscription/t/026_stats.pl
@@ -16,6 +16,15 @@ $node_publisher->start;
 # Create subscriber node.
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init;
+
+# Enable track_commit_timestamp to detect origin-differ conflicts in logical
+# replication. Reduce wal_retrieve_retry_interval to 1ms to accelerate the
+# restart of the logical replication worker after encountering a conflict.
+$node_subscriber->append_conf(
+	'postgresql.conf', q{
+track_commit_timestamp = on
+wal_retrieve_retry_interval = 1ms
+});
 $node_subscriber->start;
 
 
@@ -30,6 +39,7 @@ sub create_sub_pub_w_errors
 		qq[
 	BEGIN;
 	CREATE TABLE $table_name(a int);
+	ALTER TABLE $table_name REPLICA IDENTITY FULL;
 	INSERT INTO $table_name VALUES (1);
 	COMMIT;
 	]);
@@ -95,7 +105,7 @@ sub create_sub_pub_w_errors
 	$node_subscriber->poll_query_until(
 		$db,
 		qq[
-	SELECT apply_error_count > 0
+	SELECT apply_error_count > 0 AND insert_exists_count > 0
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub_name'
 	])
@@ -105,6 +115,102 @@ sub create_sub_pub_w_errors
 	# Truncate test table so that apply worker can continue.
 	$node_subscriber->safe_psql($db, qq(TRUNCATE $table_name));
 
+	# Insert a row on the subscriber.
+	$node_subscriber->safe_psql($db, qq(INSERT INTO $table_name VALUES (2)));
+
+	# Update the test table on the publisher. This operation will raise an
+	# error on the subscriber due to a violation of the unique constraint on
+	# the test table.
+	$node_publisher->safe_psql($db, qq(UPDATE $table_name SET a = 2;));
+
+	# Wait for the subscriber to report an update_exists conflict.
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT update_exists_count > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for update_exists conflict for subscription '$sub_name');
+
+	# Truncate the subscriber side test table. Now that the table is empty, the
+	# update conflict (update_existing) ERRORs will stop happening. A single
+	# update_missing conflict will be reported, but the update will be skipped
+	# on the subscriber, allowing the test to continue.
+    $node_subscriber->safe_psql($db, qq(TRUNCATE $table_name));
+
+	# Wait for the subscriber to report update_missing conflict.
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT update_missing_count > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for update_missing conflict for subscription '$sub_name');
+
+	# Delete data from the test table on the publisher. This delete operation
+	# should be skipped on the subscriber since the table is already empty.
+	$node_publisher->safe_psql($db, qq(DELETE FROM $table_name;));
+
+	# Wait for the subscriber to report delete_missing conflict.
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT delete_missing_count > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for delete_missing conflict for subscription '$sub_name');
+
+	# Prepare data for further tests.
+	$node_publisher->safe_psql($db, qq(INSERT INTO $table_name VALUES (1)));
+	$node_publisher->wait_for_catchup($sub_name);
+	$node_subscriber->safe_psql($db, qq(
+		TRUNCATE $table_name;
+		INSERT INTO $table_name VALUES (1);
+	));
+
+	# Update the data in the test table on the publisher. This should generate
+	# a conflict because it causes subscriber to attempt to update a row that has
+	# been modified by a different origin.
+	$node_publisher->safe_psql($db, qq(UPDATE $table_name SET a = 2;));
+
+	# Wait for the subscriber to report an update_differ conflict.
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT update_differ_count > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for update_differ conflict for subscription '$sub_name');
+
+	# Prepare data for further tests.
+	$node_subscriber->safe_psql($db, qq(
+		TRUNCATE $table_name;
+		INSERT INTO $table_name VALUES (2);
+	));
+
+	# Delete data from the test table on the publisher. This should generate a
+	# conflict because it causes subscriber to attempt to delete a row that has
+	# been modified by a different origin.
+	$node_publisher->safe_psql($db, qq(DELETE FROM $table_name;));
+
+	$node_subscriber->poll_query_until(
+		$db,
+		qq[
+	SELECT delete_differ_count > 0
+	FROM pg_stat_subscription_stats
+	WHERE subname = '$sub_name'
+	])
+	  or die
+	  qq(Timed out while waiting for delete_differ conflict for subscription '$sub_name');
+
 	return ($pub_name, $sub_name);
 }
 
@@ -123,17 +229,23 @@ my ($pub1_name, $sub1_name) =
   create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
 	$table1_name);
 
-# Apply and Sync errors are > 0 and reset timestamp is NULL
+# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count > 0,
 	sync_error_count > 0,
+	insert_exists_count > 0,
+	update_differ_count > 0,
+	update_exists_count > 0,
+	update_missing_count > 0,
+	delete_differ_count > 0,
+	delete_missing_count > 0,
 	stats_reset IS NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub1_name')
 	),
-	qq(t|t|t),
-	qq(Check that apply errors and sync errors are both > 0 and stats_reset is NULL for subscription '$sub1_name'.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Check that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for subscription '$sub1_name'.)
 );
 
 # Reset a single subscription
@@ -141,17 +253,23 @@ $node_subscriber->safe_psql($db,
 	qq(SELECT pg_stat_reset_subscription_stats((SELECT subid FROM pg_stat_subscription_stats WHERE subname = '$sub1_name')))
 );
 
-# Apply and Sync errors are 0 and stats reset is not NULL
+# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
 	sync_error_count = 0,
+	insert_exists_count = 0,
+	update_differ_count = 0,
+	update_exists_count = 0,
+	update_missing_count = 0,
+	delete_differ_count = 0,
+	delete_missing_count = 0,
 	stats_reset IS NOT NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub1_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
 );
 
 # Get reset timestamp
@@ -181,46 +299,64 @@ my ($pub2_name, $sub2_name) =
   create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
 	$table2_name);
 
-# Apply and Sync errors are > 0 and reset timestamp is NULL
+# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count > 0,
 	sync_error_count > 0,
+	insert_exists_count > 0,
+	update_differ_count > 0,
+	update_exists_count > 0,
+	update_missing_count > 0,
+	delete_differ_count > 0,
+	delete_missing_count > 0,
 	stats_reset IS NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub2_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both > 0 and stats_reset is NULL for sub '$sub2_name'.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for sub '$sub2_name'.)
 );
 
 # Reset all subscriptions
 $node_subscriber->safe_psql($db,
 	qq(SELECT pg_stat_reset_subscription_stats(NULL)));
 
-# Apply and Sync errors are 0 and stats reset is not NULL
+# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
 	sync_error_count = 0,
+	insert_exists_count = 0,
+	update_differ_count = 0,
+	update_exists_count = 0,
+	update_missing_count = 0,
+	delete_differ_count = 0,
+	delete_missing_count = 0,
 	stats_reset IS NOT NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub1_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
 );
 
 is( $node_subscriber->safe_psql(
 		$db,
 		qq(SELECT apply_error_count = 0,
 	sync_error_count = 0,
+	insert_exists_count = 0,
+	update_differ_count = 0,
+	update_exists_count = 0,
+	update_missing_count = 0,
+	delete_differ_count = 0,
+	delete_missing_count = 0,
 	stats_reset IS NOT NULL
 	FROM pg_stat_subscription_stats
 	WHERE subname = '$sub2_name')
 	),
-	qq(t|t|t),
-	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
+	qq(t|t|t|t|t|t|t|t|t),
+	qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
 );
 
 $reset_time1 = $node_subscriber->safe_psql($db,
-- 
2.30.0.windows.2

