From 11b70a070c40433389fec476464078a7038384e3 Mon Sep 17 00:00:00 2001
From: Nik Samokhvalov <nik@niks-mbp.lan>
Date: Thu, 16 Apr 2026 19:25:55 -0700
Subject: [PATCH] Do not attribute logical decoding aborts to xact_rollback

On a logical-replication publisher, every decoded committed
transaction bumps pg_stat_database.xact_rollback because the walsender
ends each decoded transaction with a top-level AbortCurrentTransaction()
for catalog cleanup.  The counts surface in shared stats on walsender
exit, producing a visible spike that has nothing to do with user
rollbacks.

Add pgStatXactSkipCounters, a backend-local flag consulted by
AtEOXact_PgStat_Database() alongside the existing parallel-worker
check.  ReorderBufferProcessTXN() (both abort sites, gated on
!using_subtxn) and SnapBuildClearExportedSnapshot() bracket their
AbortCurrentTransaction() with it.  StartTransaction() asserts the
flag is clear to catch leaks.

TAP test src/test/subscription/t/039_publisher_xact_rollback.pl:
5 autocommit INSERTs, DISABLE subscription, assert the xact_rollback
delta is 0.

Reported-by: Rafael Thofehrn Castro
Discussion: https://postgr.es/m/CAG0ozMo_xWQn%2BAvv8jzbbhePGp5OnhdO%2BYWTkdg4faWSXz0Jzg%40mail.gmail.com
---
 src/backend/access/transam/xact.c             |  1 +
 .../replication/logical/reorderbuffer.c       | 14 +++-
 src/backend/replication/logical/snapbuild.c   |  9 ++-
 src/backend/utils/activity/pgstat_database.c  | 31 ++++----
 src/include/pgstat.h                          |  3 +
 src/test/subscription/meson.build             |  1 +
 .../t/039_publisher_xact_rollback.pl          | 71 +++++++++++++++++++
 7 files changed, 115 insertions(+), 15 deletions(-)
 create mode 100644 src/test/subscription/t/039_publisher_xact_rollback.pl

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 48bc90c967353..18dff512000c7 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2118,6 +2118,7 @@ StartTransaction(void)
 
 	/* check the current transaction state */
 	Assert(s->state == TRANS_DEFAULT);
+	Assert(!pgStatXactSkipCounters);
 
 	/*
 	 * Set the current transaction state information appropriately during
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 682d13c9f22f0..c100506e259bd 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2674,9 +2674,14 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		 * Aborting the current (sub-)transaction as a whole has the right
 		 * semantics. We want all locks acquired in here to be released, not
 		 * reassigned to the parent and we do not want any database access
-		 * have persistent effects.
+		 * have persistent effects.  In the !using_subtxn case this is a
+		 * top-level abort; keep it out of pg_stat_database.xact_rollback.
 		 */
+		if (!using_subtxn)
+			pgStatXactSkipCounters = true;
 		AbortCurrentTransaction();
+		/* unconditional clear is fine; it was false coming in */
+		pgStatXactSkipCounters = false;
 
 		/* make sure there's no cache pollution */
 		if (rbtxn_distr_inval_overflowed(txn))
@@ -2737,9 +2742,14 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 
 		/*
 		 * Force cache invalidation to happen outside of a valid transaction
-		 * to prevent catalog access as we just caught an error.
+		 * to prevent catalog access as we just caught an error.  As above,
+		 * keep the top-level abort out of pg_stat_database.xact_rollback.
 		 */
+		if (!using_subtxn)
+			pgStatXactSkipCounters = true;
 		AbortCurrentTransaction();
+		/* unconditional clear is fine; it was false coming in */
+		pgStatXactSkipCounters = false;
 
 		/* make sure there's no cache pollution */
 		if (rbtxn_distr_inval_overflowed(txn))
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index c8309b96ed45c..d15db19abf3b6 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -631,8 +631,15 @@ SnapBuildClearExportedSnapshot(void)
 	 */
 	tmpResOwner = SavedResourceOwnerDuringExport;
 
-	/* make sure nothing could have ever happened */
+	/*
+	 * Make sure nothing could have ever happened.  Keep this cleanup abort
+	 * out of pg_stat_database.xact_rollback; the invariant that we are at
+	 * top level is required so the abort reaches AtEOXact_PgStat_Database.
+	 */
+	Assert(!IsSubTransaction());
+	pgStatXactSkipCounters = true;
 	AbortCurrentTransaction();
+	pgStatXactSkipCounters = false;
 
 	CurrentResourceOwner = tmpResOwner;
 }
diff --git a/src/backend/utils/activity/pgstat_database.c b/src/backend/utils/activity/pgstat_database.c
index 7f3bc0165931c..00c349631ca7b 100644
--- a/src/backend/utils/activity/pgstat_database.c
+++ b/src/backend/utils/activity/pgstat_database.c
@@ -291,21 +291,28 @@ pgstat_fetch_stat_dbentry(Oid dboid)
 		pgstat_fetch_entry(PGSTAT_KIND_DATABASE, dboid, InvalidOid, NULL);
 }
 
+/*
+ * When true, AtEOXact_PgStat_Database() skips the xact_commit /
+ * xact_rollback bump for the current (sub-)transaction end.  Logical
+ * decoding sets this around its catalog-cleanup AbortCurrentTransaction()
+ * so that abort is not attributed to pg_stat_database.
+ */
+bool		pgStatXactSkipCounters = false;
+
 void
 AtEOXact_PgStat_Database(bool isCommit, bool parallel)
 {
-	/* Don't count parallel worker transaction stats */
-	if (!parallel)
-	{
-		/*
-		 * Count transaction commit or abort.  (We use counters, not just
-		 * bools, in case the reporting message isn't sent right away.)
-		 */
-		if (isCommit)
-			pgStatXactCommit++;
-		else
-			pgStatXactRollback++;
-	}
+	if (parallel || pgStatXactSkipCounters)
+		return;
+
+	/*
+	 * Count transaction commit or abort.  (We use counters, not just bools,
+	 * in case the reporting message isn't sent right away.)
+	 */
+	if (isCommit)
+		pgStatXactCommit++;
+	else
+		pgStatXactRollback++;
 }
 
 /*
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index dfa2e8376382a..8e90903b56b7f 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -875,6 +875,9 @@ extern PGDLLIMPORT PgStat_Counter pgStatBlockWriteTime;
 extern PGDLLIMPORT PgStat_Counter pgStatActiveTime;
 extern PGDLLIMPORT PgStat_Counter pgStatTransactionIdleTime;
 
+/* see AtEOXact_PgStat_Database() */
+extern PGDLLIMPORT bool pgStatXactSkipCounters;
+
 /* updated by the traffic cop and in errfinish() */
 extern PGDLLIMPORT SessionEndType pgStatSessionEndCause;
 
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index e71e95c6297eb..268fa8c3e9c13 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -48,6 +48,7 @@ tests += {
       't/036_sequences.pl',
       't/037_except.pl',
       't/038_walsnd_shutdown_timeout.pl',
+      't/039_publisher_xact_rollback.pl',
       't/100_bugs.pl',
     ],
   },
diff --git a/src/test/subscription/t/039_publisher_xact_rollback.pl b/src/test/subscription/t/039_publisher_xact_rollback.pl
new file mode 100644
index 0000000000000..b7dc5b20d6176
--- /dev/null
+++ b/src/test/subscription/t/039_publisher_xact_rollback.pl
@@ -0,0 +1,71 @@
+
+# Copyright (c) 2026, PostgreSQL Global Development Group
+
+# Check that pg_stat_database.xact_rollback on a logical-replication
+# publisher is not inflated by the walsender's internal catalog-cleanup
+# aborts.  ReorderBufferProcessTXN() ends each decoded transaction with
+# AbortCurrentTransaction(); in the walsender that is a top-level abort
+# whose counter increment flushes to shared stats on walsender exit.
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+$node_publisher->safe_psql('postgres',
+	'CREATE TABLE t (id int PRIMARY KEY)');
+$node_subscriber->safe_psql('postgres',
+	'CREATE TABLE t (id int PRIMARY KEY)');
+
+$node_publisher->safe_psql('postgres', 'CREATE PUBLICATION p FOR TABLE t');
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION s CONNECTION '$publisher_connstr' PUBLICATION p");
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 's');
+
+# Use a baseline-delta rather than pg_stat_reset() to tolerate ambient
+# rollback activity.
+my $base = $node_publisher->safe_psql('postgres',
+	"SELECT xact_rollback FROM pg_stat_database WHERE datname = 'postgres'");
+chomp $base;
+
+# Five autocommit INSERTs: each becomes one decoded committed txn on the
+# walsender.  Without the fix, that's five spurious rollbacks after DISABLE.
+my $n = 5;
+$node_publisher->safe_psql('postgres',
+	join('', map { "INSERT INTO t VALUES ($_);\n" } 1 .. $n));
+
+$node_publisher->wait_for_catchup('s');
+
+# Disabling the subscription terminates the walsender; its shutdown hook
+# flushes pgstat counters to shared stats.
+$node_subscriber->safe_psql('postgres', 'ALTER SUBSCRIPTION s DISABLE');
+
+# Wait for this subscription's walsender (filter by application_name).
+$node_publisher->poll_query_until(
+	'postgres', q{
+	SELECT count(*) = 0 FROM pg_stat_activity
+	WHERE backend_type = 'walsender' AND application_name = 's'
+})
+  or die 's walsender did not exit';
+
+my $final = $node_publisher->safe_psql('postgres',
+	"SELECT xact_rollback FROM pg_stat_database WHERE datname = 'postgres'");
+chomp $final;
+
+cmp_ok(
+	$final - $base, '==', 0,
+	'walsender does not inflate publisher xact_rollback for decoded transactions'
+);
+
+done_testing();
