From 6c6df5edda0c93d6b58f409dbab9d45289abac4d Mon Sep 17 00:00:00 2001
From: Rui Zhao <zhaorui126@gmail.com>
Date: Mon, 11 May 2026 23:52:51 +0800
Subject: [PATCH v1 1/2] Lazy snapshot distribution in logical decoding

When a long-running write transaction in logical decoding coexists with
many concurrent catalog-modifying commits (e.g., autovacuum updating
pg_class stats for many tables, or batch DDL), the reorder buffer's
spill files can grow quadratically.  In measurements, master spills
247 MB at N=5000 and 813 MB at N=10000 catalog-modifying commits
during a single long write transaction with K=1 INSERT; extrapolating
the quadratic trend from N=10000 to N=100K yields ~80 GB.

The trigger requires two conditions:

  (i)  Something prevents xmin from advancing.  This can be the long
       write transaction itself, or an unrelated xmin-holder such as
       pg_dump's REPEATABLE READ session or a slow logical-replication
       consumer.
  (ii) A transaction with at least one data change is present in the
       reorder buffer.  Read-only sessions do not appear in the reorder
       buffer and are not victims themselves; they only enable (i).

The root cause is that on every catalog-modifying commit, the snapshot
builder calls SnapBuildDistributeSnapshotAndInval() which appends a new
catalog snapshot to every in-progress transaction's change list.  Since
xmin cannot advance, SnapBuildPurgeOlderTxn() cannot purge entries from
builder->committed.xip, and each successive snapshot is larger than the
last.  Total disk usage attributed to snapshot distribution is O(N^2)
where N is the number of catalog-modifying commits.
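
As a rough illustration of the quadratic term, here is a minimal C
sketch under simplifying assumptions that are not taken from the patch:
the i-th distributed snapshot lists i committed xids, and any fixed
per-snapshot overhead is ignored.  The function name is hypothetical.

```c
#include <stdint.h>

/*
 * Toy model (assumption): while xmin cannot advance, the snapshot
 * distributed at the i-th catalog-modifying commit lists i xids.
 * Returns the total number of xid entries queued to ONE in-progress
 * transaction after n such commits: 1 + 2 + ... + n.
 */
uint64_t
eager_xid_entries(uint64_t n)
{
	uint64_t	total = 0;

	for (uint64_t i = 1; i <= n; i++)
		total += i;
	return total;
}
```

Under this model, doubling n roughly quadruples the result:
eager_xid_entries(5000) is 12502500 and eager_xid_entries(10000) is
50005000.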

Beyond disk usage, these accumulated snapshots also degrade decoding
performance: memory pressure forces earlier spill, every INTERNAL_SNAPSHOT
change incurs serialize/deserialize I/O, ReorderBufferProcessTXN()
performs an extra binary-heap pop and snapshot_now switch for each one,
and replication lag grows accordingly.

Defer snapshot distribution from commit time to data-change time.  A
generation counter (SnapBuild.snapshot_generation) is incremented each
time a new catalog snapshot is built.  Each ReorderBufferTXN tracks the
last generation it received.  When SnapBuildProcessChange() is called for
a data change, it distributes the current snapshot only if the txn's
generation is behind the builder's.
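
The generation check can be sketched in isolation as follows.  This is
a simplified model, not the patch's code: ToyBuilder and ToyTxn are
hypothetical stand-ins for SnapBuild and ReorderBufferTXN, and base
snapshots, refcounts, and subtransactions are left out.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for SnapBuild. */
typedef struct ToyBuilder
{
	uint64_t	snapshot_generation;	/* bumped per catalog commit */
} ToyBuilder;

/* Hypothetical stand-in for ReorderBufferTXN. */
typedef struct ToyTxn
{
	uint64_t	last_snapshot_generation;	/* last generation received */
	int			snapshots_queued;	/* lazily distributed snapshots */
} ToyTxn;

/* A catalog-modifying commit only bumps the counter; nothing is queued. */
void
toy_catalog_commit(ToyBuilder *builder)
{
	builder->snapshot_generation++;
}

/* A data change queues the current snapshot only if the txn is behind. */
void
toy_data_change(const ToyBuilder *builder, ToyTxn *txn)
{
	assert(txn->last_snapshot_generation <= builder->snapshot_generation);

	if (builder->snapshot_generation > 0 &&
		txn->last_snapshot_generation < builder->snapshot_generation)
	{
		txn->snapshots_queued++;
		txn->last_snapshot_generation = builder->snapshot_generation;
	}
}

/* Replays: one change, ncommits catalog commits, then two more changes. */
int
toy_replay(int ncommits)
{
	ToyBuilder	builder = {0};
	ToyTxn		txn = {0, 0};

	toy_data_change(&builder, &txn);
	for (int i = 0; i < ncommits; i++)
		toy_catalog_commit(&builder);
	toy_data_change(&builder, &txn);
	toy_data_change(&builder, &txn);
	return txn.snapshots_queued;
}
```

In this model toy_replay(200) queues a single snapshot: the first
change sees generation 0, the second catches up to generation 200, and
the third finds the transaction already current.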

Invalidation messages are still distributed eagerly because each one
carries unique information that cannot be deduplicated; missing an
invalidation can cause pgoutput to silently drop changes for newly
published tables.  Snapshots, by contrast, represent absolute state, so
the latest one supersedes all earlier ones and intermediate ones can be
skipped.

A long-running write transaction with K data changes now accumulates at
most min(K, G) snapshots, where G is the number of catalog-modifying
commits during its lifetime: at most one snapshot is queued per data
change, and consecutive data changes with no intervening
catalog-modifying commit share one snapshot because the generation has
not advanced.  Disk usage attributed to snapshots drops from O(N^2) to
O(min(K, G) * N).
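
To make the new bound concrete, here is a small C sketch of the
worst-case arithmetic.  It assumes a toy model that is not taken from
the patch: each snapshot lists at most g xids, and the function name is
hypothetical.

```c
#include <stdint.h>

/*
 * Toy model (assumption): every snapshot lists at most g xids, where g
 * is the number of catalog-modifying commits (N in the text above).
 * Returns the upper bound min(k, g) * g on xid entries queued to a
 * transaction with k data changes.
 */
uint64_t
lazy_xid_entries_bound(uint64_t k, uint64_t g)
{
	uint64_t	nsnapshots = (k < g) ? k : g;

	return nsnapshots * g;
}
```

For K=1 and G=N=10000 the bound is 10000 xid entries, compared with
1 + 2 + ... + N = 50005000 entries if a snapshot were queued at every
commit under the same model.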

The hot path (SnapBuildProcessChange for each data change) now performs a
single ReorderBufferTXNByXid() lookup that is reused for both the base
snapshot check and lazy distribution, eliminating redundant hash lookups
that existed in the original code.

The lazy snapshot is distributed to the toplevel transaction (using
txn->xid), consistent with the original eager distribution.  When a data
change belongs to a subtransaction, the snapshot is queued in the
toplevel's change list at the same LSN as the data change in the
subtransaction's list.  Eager invalidation distribution still places an
invalidation change entry in the toplevel's list at the DDL commit LSN
(strictly less than the data change LSN), which ensures the toplevel is
at the binary heap root when equal-LSN comparisons occur, so the
snapshot is processed before the data change.

SNAPBUILD_VERSION is bumped from 6 to 7 because snapshot_generation is
added to the serialized SnapBuild struct.

Tests:
  - contrib/test_decoding/specs/lazy_snapshot_distribution.spec: three
    isolation test scenarios covering long-tx + DDL + INSERT,
    long-tx + many catalog changes + INSERT, and subtransaction + DDL +
    INSERT.
  - contrib/test_decoding/t/002_lazy_snapshot_spill.pl: verifies that a
    long transaction with 200 catalog-modifying DDLs between its two
    INSERTs results in spill_bytes = 0.
  - All existing test_decoding, subscription, recovery, and
    pg_logicalinspect tests pass without modification.
---
 contrib/test_decoding/Makefile                |   3 +-
 .../expected/lazy_snapshot_distribution.out   |  70 ++++++++++
 contrib/test_decoding/meson.build             |   2 +
 .../specs/lazy_snapshot_distribution.spec     |  81 +++++++++++
 .../t/002_lazy_snapshot_spill.pl              | 120 ++++++++++++++++
 .../replication/logical/reorderbuffer.c       |   4 +-
 src/backend/replication/logical/snapbuild.c   | 130 ++++++++++--------
 src/include/replication/reorderbuffer.h       |  12 ++
 src/include/replication/snapbuild_internal.h  |  11 ++
 9 files changed, 372 insertions(+), 61 deletions(-)
 create mode 100644 contrib/test_decoding/expected/lazy_snapshot_distribution.out
 create mode 100644 contrib/test_decoding/specs/lazy_snapshot_distribution.spec
 create mode 100644 contrib/test_decoding/t/002_lazy_snapshot_spill.pl

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 0111124399..17e342cba4 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -9,7 +9,8 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 	oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
 	twophase_snapshot slot_creation_error catalog_change_snapshot \
-	skip_snapshot_restore invalidation_distribution parallel_session_origin
+	skip_snapshot_restore invalidation_distribution parallel_session_origin \
+	lazy_snapshot_distribution
 
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/lazy_snapshot_distribution.out b/contrib/test_decoding/expected/lazy_snapshot_distribution.out
new file mode 100644
index 0000000000..49b85bfa3a
--- /dev/null
+++ b/contrib/test_decoding/expected/lazy_snapshot_distribution.out
@@ -0,0 +1,70 @@
+Parsed test spec with 3 sessions
+
+starting permutation: s1_begin s1_insert_tbl1 s2_add_col_c1 s2_add_col_c2 s1_insert_tbl2_3col s1_commit s3_get_changes
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 VALUES (1);
+step s2_add_col_c1: ALTER TABLE tbl2 ADD COLUMN c1 integer;
+step s2_add_col_c2: ALTER TABLE tbl2 ADD COLUMN c2 integer;
+step s1_insert_tbl2_3col: INSERT INTO tbl2 VALUES (1, 10, 100);
+step s1_commit: COMMIT;
+step s3_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+data                                                                     
+-------------------------------------------------------------------------
+BEGIN                                                                    
+table public.tbl1: INSERT: val1[integer]:1                               
+table public.tbl2: INSERT: val1[integer]:1 c1[integer]:10 c2[integer]:100
+COMMIT                                                                   
+(4 rows)
+
+?column?
+--------
+stop    
+(1 row)
+
+
+starting permutation: s1_begin s1_insert_tbl1 s2_create_dummy1 s2_create_dummy2 s2_create_dummy3 s1_insert_tbl1 s1_commit s3_get_changes
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 VALUES (1);
+step s2_create_dummy1: CREATE TABLE dummy1 (id int);
+step s2_create_dummy2: CREATE TABLE dummy2 (id int);
+step s2_create_dummy3: CREATE TABLE dummy3 (id int);
+step s1_insert_tbl1: INSERT INTO tbl1 VALUES (1);
+step s1_commit: COMMIT;
+step s3_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+data                                      
+------------------------------------------
+BEGIN                                     
+table public.tbl1: INSERT: val1[integer]:1
+table public.tbl1: INSERT: val1[integer]:1
+COMMIT                                    
+(4 rows)
+
+?column?
+--------
+stop    
+(1 row)
+
+
+starting permutation: s1_begin s1_savepoint s1_insert_tbl1 s2_add_col_c1 s2_add_col_c2 s1_insert_tbl2_3col s1_release s1_commit s3_get_changes
+step s1_begin: BEGIN;
+step s1_savepoint: SAVEPOINT sp1;
+step s1_insert_tbl1: INSERT INTO tbl1 VALUES (1);
+step s2_add_col_c1: ALTER TABLE tbl2 ADD COLUMN c1 integer;
+step s2_add_col_c2: ALTER TABLE tbl2 ADD COLUMN c2 integer;
+step s1_insert_tbl2_3col: INSERT INTO tbl2 VALUES (1, 10, 100);
+step s1_release: RELEASE SAVEPOINT sp1;
+step s1_commit: COMMIT;
+step s3_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+data                                                                     
+-------------------------------------------------------------------------
+BEGIN                                                                    
+table public.tbl1: INSERT: val1[integer]:1                               
+table public.tbl2: INSERT: val1[integer]:1 c1[integer]:10 c2[integer]:100
+COMMIT                                                                   
+(4 rows)
+
+?column?
+--------
+stop    
+(1 row)
+
diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build
index ac655853d2..564d82bd4e 100644
--- a/contrib/test_decoding/meson.build
+++ b/contrib/test_decoding/meson.build
@@ -66,6 +66,7 @@ tests += {
       'skip_snapshot_restore',
       'invalidation_distribution',
       'parallel_session_origin',
+      'lazy_snapshot_distribution',
     ],
     'regress_args': [
       '--temp-config', files('logical.conf'),
@@ -76,6 +77,7 @@ tests += {
   'tap': {
     'tests': [
       't/001_repl_stats.pl',
+      't/002_lazy_snapshot_spill.pl',
     ],
   },
 }
diff --git a/contrib/test_decoding/specs/lazy_snapshot_distribution.spec b/contrib/test_decoding/specs/lazy_snapshot_distribution.spec
new file mode 100644
index 0000000000..217836a019
--- /dev/null
+++ b/contrib/test_decoding/specs/lazy_snapshot_distribution.spec
@@ -0,0 +1,81 @@
+# Test lazy snapshot distribution: snapshots are no longer eagerly distributed
+# to all in-progress transactions on each catalog-modifying commit. Instead,
+# they are lazily distributed when a transaction actually decodes a data change.
+# This avoids O(N^2) snapshot disk usage when a long-running transaction
+# coexists with many catalog-modifying commits (e.g. autovacuum).
+
+setup
+{
+    SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+    DROP TABLE IF EXISTS tbl1;
+    DROP TABLE IF EXISTS tbl2;
+    DROP TABLE IF EXISTS dummy1;
+    DROP TABLE IF EXISTS dummy2;
+    DROP TABLE IF EXISTS dummy3;
+    CREATE TABLE tbl1 (val1 integer);
+    CREATE TABLE tbl2 (val1 integer);
+}
+
+teardown
+{
+    DROP TABLE IF EXISTS tbl1;
+    DROP TABLE IF EXISTS tbl2;
+    DROP TABLE IF EXISTS dummy1;
+    DROP TABLE IF EXISTS dummy2;
+    DROP TABLE IF EXISTS dummy3;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+# Session s1: long-running transaction
+session "s1"
+setup { SET synchronous_commit=on; }
+step "s1_begin" { BEGIN; }
+step "s1_insert_tbl1" { INSERT INTO tbl1 VALUES (1); }
+step "s1_insert_tbl2_3col" { INSERT INTO tbl2 VALUES (1, 10, 100); }
+step "s1_savepoint" { SAVEPOINT sp1; }
+step "s1_release" { RELEASE SAVEPOINT sp1; }
+step "s1_commit" { COMMIT; }
+
+# Session s2: performs catalog-modifying operations
+session "s2"
+setup { SET synchronous_commit=on; }
+step "s2_add_col_c1" { ALTER TABLE tbl2 ADD COLUMN c1 integer; }
+step "s2_add_col_c2" { ALTER TABLE tbl2 ADD COLUMN c2 integer; }
+step "s2_create_dummy1" { CREATE TABLE dummy1 (id int); }
+step "s2_create_dummy2" { CREATE TABLE dummy2 (id int); }
+step "s2_create_dummy3" { CREATE TABLE dummy3 (id int); }
+
+# Session s3: consumes changes
+session "s3"
+setup { SET synchronous_commit=on; }
+step "s3_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); }
+
+# Scenario 1: Long transaction + multiple DDLs + correct decoding
+#
+# s1 starts a long transaction and inserts into tbl1 (gets base snapshot).
+# s2 performs two ALTER TABLE ADD COLUMN on tbl2.
+# s1 then inserts into tbl2 using the new schema (3 columns).
+# The lazily distributed snapshot at s1's second insert must reflect both
+# ALTER TABLEs so the new columns are visible during decoding.
+permutation "s1_begin" "s1_insert_tbl1" "s2_add_col_c1" "s2_add_col_c2" "s1_insert_tbl2_3col" "s1_commit" "s3_get_changes"
+
+# Scenario 2: Long transaction + many catalog changes + correct decoding
+#
+# s1 starts a long transaction and inserts into tbl1.
+# s2 performs multiple catalog-modifying DDLs (CREATE TABLE), simulating
+# the pattern of autovacuum generating many catalog change commits.
+# s1 inserts again into tbl1 after all the DDLs.
+# Both inserts must be correctly decoded despite many catalog-modifying
+# commits in between.  With eager distribution, each CREATE TABLE would
+# have distributed a snapshot to s1; with lazy distribution, only the
+# last snapshot is distributed when s1 does its second insert.
+permutation "s1_begin" "s1_insert_tbl1" "s2_create_dummy1" "s2_create_dummy2" "s2_create_dummy3" "s1_insert_tbl1" "s1_commit" "s3_get_changes"
+
+# Scenario 3: Subtransaction + catalog change + correct decoding
+#
+# s1 starts a transaction with a savepoint, inserts into tbl1 inside the
+# subtransaction.  s2 performs two ALTER TABLE ADD COLUMN on tbl2.  s1 then
+# inserts into tbl2 with the new columns, still inside the subtransaction.
+# Both inserts must decode correctly, verifying lazy distribution works
+# with subtransactions.
+permutation "s1_begin" "s1_savepoint" "s1_insert_tbl1" "s2_add_col_c1" "s2_add_col_c2" "s1_insert_tbl2_3col" "s1_release" "s1_commit" "s3_get_changes"
diff --git a/contrib/test_decoding/t/002_lazy_snapshot_spill.pl b/contrib/test_decoding/t/002_lazy_snapshot_spill.pl
new file mode 100644
index 0000000000..1f4551b0a6
--- /dev/null
+++ b/contrib/test_decoding/t/002_lazy_snapshot_spill.pl
@@ -0,0 +1,120 @@
+
+# Copyright (c) 2024-2026, PostgreSQL Global Development Group
+
+# Test that lazy snapshot distribution reduces spill file usage.
+#
+# With the old eager distribution, each catalog-modifying commit (e.g. vacuum,
+# DDL) would distribute a snapshot to every in-progress transaction in the
+# reorder buffer.  When a long-running transaction coexists with many such
+# commits, the snapshots accumulate with O(N^2) total size and spill to disk.
+#
+# With lazy distribution, snapshots are only distributed when a transaction
+# actually decodes a data change, so a long-running transaction with few or
+# no data changes receives at most one snapshot regardless of how many
+# catalog-modifying commits happen.
+#
+# Note: invalidation messages are still eagerly distributed and they produce
+# REORDER_BUFFER_CHANGE_INVALIDATION entries that count toward memory usage.
+# We set logical_decoding_work_mem high enough to accommodate invalidation
+# messages while verifying that snapshot distribution does not cause spilling.
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node = PostgreSQL::Test::Cluster->new('test');
+$node->init(allows_streaming => 'logical');
+$node->append_conf('postgresql.conf', qq(
+synchronous_commit = on
+logical_decoding_work_mem = 1MB
+autovacuum = off
+));
+$node->start;
+
+# Setup
+$node->safe_psql('postgres', qq[
+    CREATE TABLE test_data (id int);
+    SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');
+]);
+
+# Consume any setup-related changes
+$node->safe_psql('postgres',
+    "SELECT count(*) FROM pg_logical_slot_get_changes('test_slot', NULL, NULL)");
+
+# Reset stats
+$node->safe_psql('postgres',
+    "SELECT pg_stat_reset_replication_slot('test_slot')");
+$node->safe_psql('postgres', "SELECT pg_stat_force_next_flush()");
+
+# Start a long-running transaction in a background session.
+# This transaction will be in-progress while many catalog changes happen.
+my $long_txn = $node->background_psql('postgres', on_error_stop => 1);
+$long_txn->query_safe("BEGIN");
+$long_txn->query_safe("INSERT INTO test_data VALUES (1)");
+
+# In the main session, perform many catalog-modifying DDLs.
+# Each DDL commit increments snapshot_generation and would have eagerly
+# distributed a growing snapshot under the old code.
+#
+# With 200 DDLs (each CREATE TABLE is a catalog-modifying commit), the old
+# eager approach would accumulate ~200 snapshots with O(N^2) total size in
+# the long transaction's reorder buffer changes.
+# With lazy distribution, only 1 snapshot is distributed when the long
+# transaction does its next data change.
+my $num_ddls = 200;
+for my $i (1 .. $num_ddls) {
+    $node->safe_psql('postgres', "CREATE TABLE dummy_$i (id int)");
+}
+
+# Insert one more row in the long transaction and commit.
+$long_txn->query_safe("INSERT INTO test_data VALUES (2)");
+$long_txn->query_safe("COMMIT");
+$long_txn->quit;
+
+# Consume the changes to trigger decoding
+my $result = $node->safe_psql('postgres',
+    "SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, " .
+    "'include-xids', '0', 'skip-empty-xacts', '1')");
+
+# Verify the decoded data is correct
+like($result, qr/INSERT: id\[integer\]:1/, 'first INSERT decoded correctly');
+like($result, qr/INSERT: id\[integer\]:2/, 'second INSERT decoded correctly');
+
+# Check spill statistics.
+# With lazy snapshot distribution and logical_decoding_work_mem=1MB, the long
+# transaction should NOT spill.  It only contains 2 small INSERTs plus 1
+# lazily-distributed snapshot, well under 1MB.
+#
+# Without the optimization, the long transaction would have accumulated ~200
+# internal snapshots with a total size of ~O(N^2) bytes (~80KB for N=200),
+# plus ~200 invalidation change entries.  The combined size could push the
+# transaction over 1MB with larger N.
+$node->safe_psql('postgres', "SELECT pg_stat_force_next_flush()");
+$node->poll_query_until('postgres', qq[
+    SELECT spill_bytes IS NOT NULL
+    FROM pg_stat_replication_slots
+    WHERE slot_name = 'test_slot'
+]) or die "Timed out waiting for stats";
+
+my $spill_bytes = $node->safe_psql('postgres', qq[
+    SELECT spill_bytes
+    FROM pg_stat_replication_slots
+    WHERE slot_name = 'test_slot'
+]);
+
+is($spill_bytes, '0',
+    "lazy snapshot distribution prevents spilling (spill_bytes=$spill_bytes)");
+
+# Cleanup
+for my $i (1 .. $num_ddls) {
+    $node->safe_psql('postgres', "DROP TABLE IF EXISTS dummy_$i");
+}
+$node->safe_psql('postgres', qq[
+    SELECT pg_drop_replication_slot('test_slot');
+    DROP TABLE test_data;
+]);
+
+$node->stop;
+done_testing();
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 682d13c9f2..c63d4e17d3 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -235,7 +235,7 @@ int			debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED
  */
 static ReorderBufferTXN *ReorderBufferAllocTXN(ReorderBuffer *rb);
 static void ReorderBufferFreeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
-static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
+ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
 											   TransactionId xid, bool create, bool *is_new,
 											   XLogRecPtr lsn, bool create_as_top);
 static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
@@ -650,7 +650,7 @@ ReorderBufferFreeRelids(ReorderBuffer *rb, Oid *relids)
  * (with the given LSN, and as top transaction if that's specified);
  * when this happens, is_new is set to true.
  */
-static ReorderBufferTXN *
+ReorderBufferTXN *
 ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
 					  bool *is_new, XLogRecPtr lsn, bool create_as_top)
 {
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index c8309b96ed..1c6a94917a 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -172,7 +172,7 @@ static void SnapBuildFreeSnapshot(Snapshot snap);
 
 static void SnapBuildSnapIncRefcount(Snapshot snap);
 
-static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
+static void SnapBuildDistributeInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
 
 static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
 												 uint32 xinfo);
@@ -655,6 +655,8 @@ SnapBuildResetExportedSnapshotState(void)
 bool
 SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
 {
+	ReorderBufferTXN *txn;
+
 	/*
 	 * We can't handle data in transactions if we haven't built a snapshot
 	 * yet, so don't store them.
@@ -672,10 +674,23 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
 		return false;
 
 	/*
-	 * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
-	 * be needed to decode the change we're currently processing.
+	 * Look up the transaction once and reuse it for both the base snapshot
+	 * check and the lazy catalog snapshot distribution below.  This avoids
+	 * repeated hash lookups that were previously done separately by
+	 * ReorderBufferXidHasBaseSnapshot(), ReorderBufferSetBaseSnapshot(),
+	 * and the lazy distribution logic.
 	 */
-	if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
+	txn = ReorderBufferTXNByXid(builder->reorder, xid, true,
+								NULL, lsn, true);
+	if (rbtxn_is_known_subxact(txn))
+		txn = ReorderBufferTXNByXid(builder->reorder, txn->toplevel_xid,
+									false, NULL, InvalidXLogRecPtr, false);
+
+	/*
+	 * If the reorderbuffer doesn't yet have a base snapshot, add one now,
+	 * it will be needed to decode the change we're currently processing.
+	 */
+	if (txn->base_snapshot == NULL)
 	{
 		/* only build a new snapshot if we don't have a prebuilt one */
 		if (builder->snapshot == NULL)
@@ -694,6 +709,28 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
 									 builder->snapshot);
 	}
 
+	/*
+	 * Lazily distribute the catalog snapshot to this transaction if it hasn't
+	 * received the latest one yet.  This replaces the previous eager
+	 * distribution in SnapBuildDistributeSnapshotAndInval(), avoiding O(N^2)
+	 * disk usage when many catalog-modifying transactions (e.g. autovacuum)
+	 * commit while a long-running transaction is in progress.
+	 *
+	 * We only add a snapshot when the transaction actually has a data change
+	 * to decode, so idle long-running transactions won't accumulate any
+	 * snapshots at all.
+	 */
+	if (builder->snapshot_generation > 0 &&
+		txn->last_snapshot_generation < builder->snapshot_generation)
+	{
+		Assert(builder->snapshot != NULL);
+
+		SnapBuildSnapIncRefcount(builder->snapshot);
+		ReorderBufferAddSnapshot(builder->reorder, txn->xid,
+								 lsn, builder->snapshot);
+		txn->last_snapshot_generation = builder->snapshot_generation;
+	}
+
 	return true;
 }
 
@@ -737,15 +774,20 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
 }
 
 /*
- * Add a new Snapshot and invalidation messages to all transactions we're
- * decoding that currently are in-progress so they can see new catalog contents
- * made by the transaction that just committed. This is necessary because those
- * in-progress transactions will use the new catalog's contents from here on
- * (at the very least everything they do needs to be compatible with newer
- * catalog contents).
+ * Distribute invalidation messages to all in-progress transactions so they
+ * can see new catalog contents made by the transaction that just committed.
+ *
+ * Note: we no longer eagerly distribute snapshots here.  Instead, snapshots
+ * are lazily distributed in SnapBuildProcessChange() when a transaction
+ * actually needs to decode a data change.  This avoids O(N^2) snapshot disk
+ * usage when a long-running transaction coexists with many catalog-modifying
+ * commits (e.g. autovacuum of many tables).  The snapshot_generation counter
+ * in SnapBuild tracks when a new snapshot is available, and each transaction's
+ * last_snapshot_generation in ReorderBufferTXN tracks whether it has received
+ * the latest snapshot.
  */
 static void
-SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
+SnapBuildDistributeInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
 {
 	dlist_iter	txn_i;
 	ReorderBufferTXN *txn;
@@ -753,8 +795,7 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
 	/*
 	 * Iterate through all toplevel transactions. This can include
 	 * subtransactions which we just don't yet know to be that, but that's
-	 * fine, they will just get an unnecessary snapshot and invalidations
-	 * queued.
+	 * fine, they will just get unnecessary invalidations queued.
 	 */
 	dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
 	{
@@ -764,60 +805,25 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
 
 		/*
 		 * If we don't have a base snapshot yet, there are no changes in this
-		 * transaction which in turn implies we don't yet need a snapshot at
-		 * all. We'll add a snapshot when the first change gets queued.
-		 *
-		 * Similarly, we don't need to add invalidations to a transaction
-		 * whose base snapshot is not yet set. Once a base snapshot is built,
-		 * it will include the xids of committed transactions that have
-		 * modified the catalog, thus reflecting the new catalog contents. The
-		 * existing catalog cache will have already been invalidated after
-		 * processing the invalidations in the transaction that modified
-		 * catalogs, ensuring that a fresh cache is constructed during
-		 * decoding.
-		 *
-		 * NB: This works correctly even for subtransactions because
-		 * ReorderBufferAssignChild() takes care to transfer the base snapshot
-		 * to the top-level transaction, and while iterating the changequeue
-		 * we'll get the change from the subtxn.
+		 * transaction which in turn implies we don't yet need invalidations.
+		 * Once a base snapshot is built, it will include the xids of committed
+		 * transactions that have modified the catalog, thus reflecting the new
+		 * catalog contents.
 		 */
 		if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
 			continue;
 
 		/*
-		 * We don't need to add snapshot or invalidations to prepared
-		 * transactions as they should not see the new catalog contents.
+		 * We don't need to add invalidations to prepared transactions as they
+		 * should not see the new catalog contents.
 		 */
 		if (rbtxn_is_prepared(txn))
 			continue;
 
-		elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%08X",
-			 txn->xid, LSN_FORMAT_ARGS(lsn));
-
-		/*
-		 * increase the snapshot's refcount for the transaction we are handing
-		 * it out to
-		 */
-		SnapBuildSnapIncRefcount(builder->snapshot);
-		ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
-								 builder->snapshot);
-
 		/*
 		 * Add invalidation messages to the reorder buffer of in-progress
 		 * transactions except the current committed transaction, for which we
 		 * will execute invalidations at the end.
-		 *
-		 * It is required, otherwise, we will end up using the stale catcache
-		 * contents built by the current transaction even after its decoding,
-		 * which should have been invalidated due to concurrent catalog
-		 * changing transaction.
-		 *
-		 * Distribute only the invalidation messages generated by the current
-		 * committed transaction. Invalidation messages received from other
-		 * transactions would have already been propagated to the relevant
-		 * in-progress transactions. This transaction would have processed
-		 * those invalidations, ensuring that subsequent transactions observe
-		 * a consistent cache state.
 		 */
 		if (txn->xid != xid)
 		{
@@ -831,6 +837,9 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
 			{
 				Assert(msgs != NULL);
 
+				elog(DEBUG2, "adding invalidations to %u at %X/%08X",
+					 txn->xid, LSN_FORMAT_ARGS(lsn));
+
 				ReorderBufferAddDistributedInvalidations(builder->reorder,
 														 txn->xid, lsn,
 														 ninvalidations, msgs);
@@ -1097,6 +1106,9 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 
 		builder->snapshot = SnapBuildBuildSnapshot(builder);
 
+		/* Track that the catalog snapshot changed */
+		builder->snapshot_generation++;
+
 		/* we might need to execute invalidations, add snapshot */
 		if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
 		{
@@ -1109,10 +1121,12 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 		SnapBuildSnapIncRefcount(builder->snapshot);
 
 		/*
-		 * Add a new catalog snapshot and invalidations messages to all
-		 * currently running transactions.
+		 * Distribute invalidation messages to all currently running
+		 * transactions.  Snapshots are distributed lazily in
+		 * SnapBuildProcessChange() when a transaction decodes a data
+		 * change, avoiding O(N^2) disk usage from snapshot accumulation.
 		 */
-		SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
+		SnapBuildDistributeInval(builder, lsn, xid);
 	}
 }
 
@@ -1532,7 +1546,7 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
 	offsetof(SnapBuildOnDisk, version)
 
 #define SNAPBUILD_MAGIC 0x51A1E001
-#define SNAPBUILD_VERSION 6
+#define SNAPBUILD_VERSION 7
 
 /*
  * Store/Load a snapshot from disk, depending on the snapshot builder's state.
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index ff825e4b7b..c0ee7ca8ae 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -461,6 +461,14 @@ typedef struct ReorderBufferTXN
 	/* Size of top-transaction including sub-transactions. */
 	Size		total_size;
 
+	/*
+	 * Tracks the snapshot generation at which this transaction last received
+	 * a catalog snapshot via lazy distribution.  Compared against
+	 * SnapBuild.snapshot_generation to decide if a new snapshot is needed
+	 * before decoding a data change.
+	 */
+	uint64		last_snapshot_generation;
+
 	/*
 	 * Private data pointer of the output plugin.
 	 */
@@ -742,6 +750,10 @@ extern void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunning
 extern void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);
 extern void ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);
 
+extern ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
+											   TransactionId xid, bool create,
+											   bool *is_new, XLogRecPtr lsn,
+											   bool create_as_top);
 extern void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
 										 XLogRecPtr lsn, Snapshot snap);
 extern void ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
diff --git a/src/include/replication/snapbuild_internal.h b/src/include/replication/snapbuild_internal.h
index 363f7f5977..f8d20dd1f2 100644
--- a/src/include/replication/snapbuild_internal.h
+++ b/src/include/replication/snapbuild_internal.h
@@ -129,6 +129,17 @@ struct SnapBuild
 		TransactionId *xip;
 	}			committed;
 
+	/*
+	 * Generation counter, incremented each time a new catalog snapshot is
+	 * built due to a catalog-modifying transaction commit.  Used for lazy
+	 * snapshot distribution: instead of distributing a snapshot to every
+	 * in-progress transaction on each catalog-modifying commit, we only
+	 * distribute when the transaction actually needs to decode a data change.
+	 * This avoids O(N^2) snapshot disk usage when a long-running transaction
+	 * coexists with many vacuum commits.
+	 */
+	uint64		snapshot_generation;
+
 	/*
 	 * Array of transactions and subtransactions that had modified catalogs
 	 * and were running when the snapshot was serialized.
-- 
2.43.7

