From 62b52d0011f08d472f3d60b67110453b07f1017d Mon Sep 17 00:00:00 2001
From: Nikolay Samokhvalov <nik@postgres.ai>
Date: Thu, 16 Apr 2026 19:25:55 -0700
Subject: [PATCH] Do not count logical decoding cleanup aborts in xact_rollback

ReorderBufferProcessTXN() aborts the current transaction after each
decoded commit to release locks and clean up catalog access.  In a
logical walsender that is a top-level abort, so every decoded commit
bumps pg_stat_database.xact_rollback; the counts surface as a spike on
walsender exit (e.g. when a subscription is disabled).

Add AbortCurrentTransactionWithoutXactStats(), a wrapper that suppresses
only the DB-level xact_commit/xact_rollback counter and leaves
per-relation and subxact stat handling intact.  Use it from the
top-level cleanup paths in ReorderBufferProcessTXN() (gated on
!using_subtxn) and SnapBuildClearExportedSnapshot().

A TAP test asserts a publisher xact_rollback delta of 0 across five
decoded transactions (delta is 5 without the fix).

Reported-by: Rafael Thofehrn Castro
Discussion: https://postgr.es/m/CAM527d_EbU5Li4a5FdKQjYsdF-4Lqr_i3jXmZOm7Wbb%3DQ2KzTw%40mail.gmail.com
---
 src/backend/access/transam/twophase.c         |  2 +-
 src/backend/access/transam/xact.c             | 39 ++++++++++-
 .../replication/logical/reorderbuffer.c       | 16 +++--
 src/backend/replication/logical/snapbuild.c   |  9 ++-
 src/backend/utils/activity/pgstat_xact.c      | 11 ++-
 src/include/access/xact.h                     |  1 +
 src/include/pgstat.h                          |  2 +-
 src/test/subscription/meson.build             |  1 +
 .../t/039_publisher_xact_rollback.pl          | 70 +++++++++++++++++++
 9 files changed, 139 insertions(+), 12 deletions(-)
 create mode 100644 src/test/subscription/t/039_publisher_xact_rollback.pl

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 1035e8b3fc7..3aa31e7f805 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1676,7 +1676,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	LWLockRelease(TwoPhaseStateLock);
 
 	/* Count the prepared xact as committed or aborted */
-	AtEOXact_PgStat(isCommit, false);
+	AtEOXact_PgStat(isCommit, false, true);
 
 	/*
 	 * And now we can clean up any files we may have left.
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 48bc90c9673..35d5e174011 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -316,6 +316,15 @@ typedef struct XactCallbackItem
 
 static XactCallbackItem *Xact_callbacks = NULL;
 
+/*
+ * When true, suppress the pg_stat_database xact_commit/xact_rollback bump
+ * for the current transaction end.  Must only be set via
+ * AbortCurrentTransactionWithoutXactStats(); assertions in
+ * StartTransaction() and in the wrapper itself guard against the flag
+ * leaking across transactions.
+ */
+static bool xactSkipXactStats = false;
+
 /*
  * List of add-on start- and end-of-subxact callbacks
  */
@@ -2118,6 +2127,7 @@ StartTransaction(void)
 
 	/* check the current transaction state */
 	Assert(s->state == TRANS_DEFAULT);
+	Assert(!xactSkipXactStats);
 
 	/*
 	 * Set the current transaction state information appropriately during
@@ -2514,7 +2524,7 @@ CommitTransaction(void)
 	AtEOXact_Files(true);
 	AtEOXact_ComboCid();
 	AtEOXact_HashTables(true);
-	AtEOXact_PgStat(true, is_parallel_worker);
+	AtEOXact_PgStat(true, is_parallel_worker, true);
 	AtEOXact_Snapshot(true, false);
 	AtEOXact_ApplyLauncher(true);
 	AtEOXact_LogicalRepWorkers(true);
@@ -3039,7 +3049,7 @@ AbortTransaction(void)
 		AtEOXact_Files(false);
 		AtEOXact_ComboCid();
 		AtEOXact_HashTables(false);
-		AtEOXact_PgStat(false, is_parallel_worker);
+		AtEOXact_PgStat(false, is_parallel_worker, !xactSkipXactStats);
 		AtEOXact_ApplyLauncher(false);
 		AtEOXact_LogicalRepWorkers(false);
 		AtEOXact_LogicalCtl();
@@ -3509,6 +3519,31 @@ AbortCurrentTransaction(void)
 	}
 }
 
+/*
+ * AbortCurrentTransactionWithoutXactStats
+ *
+ * Like AbortCurrentTransaction(), but do not count the transaction abort in
+ * pg_stat_database.xact_rollback.  This is for internal cleanup aborts that
+ * release transaction-local resources but do not represent a user-visible
+ * transaction rollback.
+ */
+void
+AbortCurrentTransactionWithoutXactStats(void)
+{
+	Assert(!xactSkipXactStats);
+
+	xactSkipXactStats = true;
+	PG_TRY();
+	{
+		AbortCurrentTransaction();
+	}
+	PG_FINALLY();
+	{
+		xactSkipXactStats = false;
+	}
+	PG_END_TRY();
+}
+
 /*
  *	AbortCurrentTransactionInternal - a function doing an iteration of work
  *		regarding handling the current transaction abort.  In the case of
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 682d13c9f22..f953676bfe1 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2674,9 +2674,13 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		 * Aborting the current (sub-)transaction as a whole has the right
 		 * semantics. We want all locks acquired in here to be released, not
 		 * reassigned to the parent and we do not want any database access
-		 * have persistent effects.
+		 * have persistent effects.  In the !using_subtxn case this is a
+		 * top-level abort; keep it out of pg_stat_database.xact_rollback.
 		 */
-		AbortCurrentTransaction();
+		if (using_subtxn)
+			AbortCurrentTransaction();
+		else
+			AbortCurrentTransactionWithoutXactStats();
 
 		/* make sure there's no cache pollution */
 		if (rbtxn_distr_inval_overflowed(txn))
@@ -2737,9 +2741,13 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 
 		/*
 		 * Force cache invalidation to happen outside of a valid transaction
-		 * to prevent catalog access as we just caught an error.
+		 * to prevent catalog access as we just caught an error.  As above,
+		 * keep the top-level abort out of pg_stat_database.xact_rollback.
 		 */
-		AbortCurrentTransaction();
+		if (using_subtxn)
+			AbortCurrentTransaction();
+		else
+			AbortCurrentTransactionWithoutXactStats();
 
 		/* make sure there's no cache pollution */
 		if (rbtxn_distr_inval_overflowed(txn))
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index c8309b96ed4..ba9af88c505 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -631,8 +631,13 @@ SnapBuildClearExportedSnapshot(void)
 	 */
 	tmpResOwner = SavedResourceOwnerDuringExport;
 
-	/* make sure nothing could have ever happened */
-	AbortCurrentTransaction();
+	/*
+	 * Make sure nothing could have ever happened.  Keep this cleanup abort
+	 * out of pg_stat_database.xact_rollback; we must be at top level so
+	 * the abort reaches AtEOXact_PgStat_Database.
+	 */
+	Assert(!IsSubTransaction());
+	AbortCurrentTransactionWithoutXactStats();
 
 	CurrentResourceOwner = tmpResOwner;
 }
diff --git a/src/backend/utils/activity/pgstat_xact.c b/src/backend/utils/activity/pgstat_xact.c
index 5e2d69e6297..ea9f703c088 100644
--- a/src/backend/utils/activity/pgstat_xact.c
+++ b/src/backend/utils/activity/pgstat_xact.c
@@ -37,11 +37,18 @@ static PgStat_SubXactStatus *pgStatXactStack = NULL;
  * Called from access/transam/xact.c at top-level transaction commit/abort.
  */
 void
-AtEOXact_PgStat(bool isCommit, bool parallel)
+AtEOXact_PgStat(bool isCommit, bool parallel, bool count_xact_stats)
 {
 	PgStat_SubXactStatus *xact_state;
 
-	AtEOXact_PgStat_Database(isCommit, parallel);
+	/*
+	 * Only the database-level xact_commit/xact_rollback counter is gated
+	 * here.  Per-relation and subxact stat handling below must still run
+	 * unconditionally so any stats accumulated during the transaction are
+	 * not lost.
+	 */
+	if (count_xact_stats)
+		AtEOXact_PgStat_Database(isCommit, parallel);
 
 	/* handle transactional stats information */
 	xact_state = pgStatXactStack;
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index a8cbdf247c8..a5ec9a027d4 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -467,6 +467,7 @@ extern void SaveTransactionCharacteristics(SavedTransactionCharacteristics *s);
 extern void RestoreTransactionCharacteristics(const SavedTransactionCharacteristics *s);
 extern void CommitTransactionCommand(void);
 extern void AbortCurrentTransaction(void);
+extern void AbortCurrentTransactionWithoutXactStats(void);
 extern void BeginTransactionBlock(void);
 extern bool EndTransactionBlock(bool chain);
 extern bool PrepareTransactionBlock(const char *gid);
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index dfa2e837638..8509a590bbf 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -814,7 +814,7 @@ extern PgStat_StatSubEntry *pgstat_fetch_stat_subscription(Oid subid);
  * Functions in pgstat_xact.c
  */
 
-extern void AtEOXact_PgStat(bool isCommit, bool parallel);
+extern void AtEOXact_PgStat(bool isCommit, bool parallel, bool count_xact_stats);
 extern void AtEOSubXact_PgStat(bool isCommit, int nestDepth);
 extern void AtPrepare_PgStat(void);
 extern void PostPrepare_PgStat(void);
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index e71e95c6297..268fa8c3e9c 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -48,6 +48,7 @@ tests += {
       't/036_sequences.pl',
       't/037_except.pl',
       't/038_walsnd_shutdown_timeout.pl',
+      't/039_publisher_xact_rollback.pl',
       't/100_bugs.pl',
     ],
   },
diff --git a/src/test/subscription/t/039_publisher_xact_rollback.pl b/src/test/subscription/t/039_publisher_xact_rollback.pl
new file mode 100644
index 00000000000..1fa72a19712
--- /dev/null
+++ b/src/test/subscription/t/039_publisher_xact_rollback.pl
@@ -0,0 +1,70 @@
+# Copyright (c) 2026, PostgreSQL Global Development Group
+
+# Check that pg_stat_database.xact_rollback on a logical-replication
+# publisher is not inflated by the walsender's internal catalog-cleanup
+# aborts.  ReorderBufferProcessTXN() ends each decoded transaction with
+# AbortCurrentTransaction(); in the walsender that is a top-level abort
+# whose counter increment flushes to shared stats on walsender exit.
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+$node_publisher->safe_psql('postgres',
+	'CREATE TABLE t (id int PRIMARY KEY)');
+$node_subscriber->safe_psql('postgres',
+	'CREATE TABLE t (id int PRIMARY KEY)');
+
+$node_publisher->safe_psql('postgres', 'CREATE PUBLICATION p FOR TABLE t');
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION s CONNECTION '$publisher_connstr' PUBLICATION p");
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 's');
+
+# Use a baseline-delta rather than pg_stat_reset() to tolerate ambient
+# rollback activity.
+my $base = $node_publisher->safe_psql('postgres',
+	"SELECT xact_rollback FROM pg_stat_database WHERE datname = 'postgres'");
+chomp $base;
+
+# Five autocommit INSERTs: each becomes one decoded committed txn on the
+# walsender.  Without the fix, that's five spurious rollbacks after DISABLE.
+my $n = 5;
+$node_publisher->safe_psql('postgres',
+	join('', map { "INSERT INTO t VALUES ($_);\n" } 1 .. $n));
+
+$node_publisher->wait_for_catchup('s');
+
+# Disabling the subscription terminates the walsender; its shutdown hook
+# flushes pgstat counters to shared stats.
+$node_subscriber->safe_psql('postgres', 'ALTER SUBSCRIPTION s DISABLE');
+
+# Wait for this subscription's walsender (filter by application_name).
+$node_publisher->poll_query_until(
+	'postgres', q{
+	SELECT count(*) = 0 FROM pg_stat_activity
+	WHERE backend_type = 'walsender' AND application_name = 's'
+})
+  or die 's walsender did not exit';
+
+my $final = $node_publisher->safe_psql('postgres',
+	"SELECT xact_rollback FROM pg_stat_database WHERE datname = 'postgres'");
+chomp $final;
+
+cmp_ok(
+	$final - $base, '==', 0,
+	'walsender does not inflate publisher xact_rollback for decoded transactions'
+);
+
+done_testing();
-- 
2.50.1 (Apple Git-155)

