Hi,

On Thu, Jun 25, 2026 at 5:26 AM Hayato Kuroda (Fujitsu)
<[email protected]> wrote:
>
> Thanks for updating and sorry for the late reply. I reviewed your patch.
> Basically it looks good, but here are my comments.

Thank you for reviewing!

> ```
> @@ -439,10 +442,12 @@ retry:
>
>                 if (should_refetch_tuple(res, &tmfd))
>                         goto retry;
> +
> +               /* Materialize the slot so it preserves pass-by-ref values. */
> +               ExecMaterializeSlot(outslot);
> ```
>
> Can you tell me why ExecMaterializeSlot() needs to be added? Maybe you've
> added to make the code consistent with RelationFindReplTupleByIndex(), but
> till now any errors have not been reported due to the missing
> materialization. Is there a possibility that even
> RelationFindReplTupleByIndex() have done the unnecessary materialization?

The first part of the fix is this: RelationFindReplTupleSeq currently
allocates an extra tuple slot, scans into it, and then copies the
matching tuple into the output slot. That copy is redundant; its
sibling RelationFindReplTupleByIndex avoids it by storing the result
tuple directly into the output slot. I changed RelationFindReplTupleSeq
to use the same approach as RelationFindReplTupleByIndex.
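For readers less familiar with the slot plumbing, here is a toy model of
the reworked RelationFindReplTupleSeq control flow. It is a sketch under
stated assumptions, not PostgreSQL code: ToyTuple, ToySlot,
toy_tuple_lock(), toy_materialize() and toy_find_seq() are hypothetical
stand-ins for the heap tuple, TupleTableSlot, table_tuple_lock(),
ExecMaterializeSlot() and the real function, and the "wait" simply
clears the in-progress xid instead of calling XactLockTableWait(). It
shows the scan writing straight into the output slot, found being set
only after the in-progress check, both retry paths discarding the slot
contents, and a single materialization on the success path, as in v3.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for a heap tuple and a TupleTableSlot. */
typedef struct ToyTuple
{
	int			key;
	int			inprogress_xid;		/* nonzero: an in-progress xact touched it */
	bool		update_before_lock;	/* simulate one TM_Updated at lock time */
} ToyTuple;

typedef struct ToySlot
{
	ToyTuple   *tuple;			/* points into the "buffer" until materialized */
	ToyTuple	copy;			/* slot-owned storage after materialization */
	bool		materialized;
} ToySlot;

typedef enum
{
	TOY_TM_OK,
	TOY_TM_UPDATED
} ToyTMResult;

static int	toy_retries = 0;
static int	toy_materialize_calls = 0;

/* Stand-in for ExecMaterializeSlot(): copy the tuple into slot storage. */
static void
toy_materialize(ToySlot *slot)
{
	slot->copy = *slot->tuple;
	slot->tuple = &slot->copy;
	slot->materialized = true;
	toy_materialize_calls++;
}

/* Stand-in for table_tuple_lock(): reports a concurrent update once. */
static ToyTMResult
toy_tuple_lock(ToyTuple *tup)
{
	if (tup->update_before_lock)
	{
		tup->update_before_lock = false;
		return TOY_TM_UPDATED;
	}
	return TOY_TM_OK;
}

static bool
toy_find_seq(ToyTuple *table, int ntuples, int searchkey, ToySlot *outslot)
{
	bool		found;

retry:
	found = false;
	outslot->tuple = NULL;
	outslot->materialized = false;

	/* Scan directly into outslot: no separate scan slot, no copy. */
	for (int i = 0; i < ntuples; i++)
	{
		outslot->tuple = &table[i];
		if (table[i].key != searchkey)
			continue;

		if (table[i].inprogress_xid != 0)
		{
			/* Stand-in for XactLockTableWait(): the other xact ends. */
			table[i].inprogress_xid = 0;
			toy_retries++;
			goto retry;			/* slot contents are simply discarded */
		}

		found = true;			/* set only once the tuple is known unlocked */
		break;
	}

	if (!found)
		outslot->tuple = NULL;	/* end of scan leaves the slot empty */
	else
	{
		if (toy_tuple_lock(outslot->tuple) == TOY_TM_UPDATED)
		{
			toy_retries++;
			goto retry;			/* lock retry also restarts the lookup */
		}
		toy_materialize(outslot);	/* the only materialization */
	}

	assert(!found || outslot->materialized);
	return found;
}
```

With a three-row table where key 2 first carries an in-progress xid and
then a concurrent update, toy_find_seq(table, 3, 2, &slot) restarts the
lookup twice and still materializes exactly once.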

The second part of the fix is this: before waiting for the in-progress
transaction to finish, we currently materialize the tuple slot. To me,
this is unnecessary. The scan (sequential or index) already pins the
buffer referenced by the tuple slot (of type TTSOpsBufferHeapTuple),
and a pinned buffer is never evicted by the clock sweep
(StrategyGetBuffer skips any buffer with a non-zero refcount), so there
is no need to materialize the slot to guard against eviction. Moreover,
once the wait for the in-progress transaction is over, we use neither
the materialized slot nor the previous contents of the output slot,
because we restart the lookup anyway.

We don't even need the slot materialized before taking the tuple lock,
because the tuple lock relies only on the TID of the row found by the
scan loop above (sequential or index); it fills in the slot with the
locked tuple and transfers the buffer pin anyway. So in the attached v3
patch I materialize the slot just once, before returning it to the
caller; neither the wait for the in-progress transaction nor the
tuple-lock retry path needs it.
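To make the eviction argument concrete, the following is a simplified
model of the clock sweep. It is not PostgreSQL's StrategyGetBuffer: the
real code keeps refcount and usage_count in an atomic buffer state word,
handles buffer access strategy rings, and raises "no unpinned buffers
available" instead of returning -1. ToyBuffer, toy_buffers and
toy_clock_sweep() are hypothetical names. What it shares with the real
loop is the shape of the check: a buffer with a non-zero refcount is
skipped without touching its usage count, so a buffer pinned by the
scan can never be chosen as a victim.

```c
#include <assert.h>

#define TOY_NBUFFERS 4

/* Hypothetical buffer descriptor; the real one packs these atomically. */
typedef struct ToyBuffer
{
	int			refcount;		/* pins currently held on this buffer */
	int			usage_count;	/* bumped on access, decremented by sweep */
} ToyBuffer;

static ToyBuffer toy_buffers[TOY_NBUFFERS];
static int	toy_next_victim = 0;

/*
 * Return the index of an evictable buffer, or -1 if the sweep passes over
 * TOY_NBUFFERS pinned buffers in a row without finding a candidate.
 */
static int
toy_clock_sweep(void)
{
	int			trycounter = TOY_NBUFFERS;

	for (;;)
	{
		int			idx = toy_next_victim;
		ToyBuffer  *buf = &toy_buffers[idx];

		toy_next_victim = (toy_next_victim + 1) % TOY_NBUFFERS;

		if (buf->refcount == 0)
		{
			if (buf->usage_count > 0)
			{
				/* Recently used: age it and give the sweep more tries. */
				buf->usage_count--;
				trycounter = TOY_NBUFFERS;
			}
			else
				return idx;		/* unpinned and not recently used */
		}
		else if (--trycounter == 0)
			return -1;			/* pinned buffers are never evicted */
	}
}
```

With buffer 0 pinned, repeated calls keep cycling through buffers 1 to 3
and never return 0; only when every buffer is pinned does the sweep give
up.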

> ```
> +# Create subscriber.
> +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
> +$node_subscriber->init(allows_streaming => 'logical');
> +$node_subscriber->start;
> ```
>
> Minor point; allows_streaming is not set for the logical.

Fixed.

> ```
>         $node_subscriber->append_conf('postgresql.conf',
>                 "shared_preload_libraries = 'injection_points'");
>         $node_subscriber->restart;
> ```
>
> No need to set the shared_preload_libraries and restart.

Fixed.

Please find the attached v3 patch for further review. I would be happy
to split this patch into two if necessary.

--
Bharath Rupireddy
Amazon Web Services: https://aws.amazon.com
From 1676e3a8c93e94867d25df025785a59478ab7220 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Sat, 27 Jun 2026 08:47:39 +0000
Subject: [PATCH v3] Add tests for concurrent DML retry paths in logical
 replication apply

The retry paths in RelationFindReplTupleByIndex and
RelationFindReplTupleSeq for concurrent updates and deletes had no
test coverage. The apply worker uses a dirty snapshot to find the
target tuple on the subscriber (index scan when available,
sequential scan for REPLICA IDENTITY FULL), waits via
XactLockTableWait if an in-progress transaction is found, retries
the scan, then locks the tuple with table_tuple_lock and applies
the change.

This adds TAP tests for both retry paths: the XactLockTableWait
path via an in-progress transaction on the subscriber, and the
table_tuple_lock retry path (TM_Updated / TM_Deleted) via an
injection point that pauses the worker between finding and
locking the tuple.

While here, I noticed some redundant work in these functions and
tidied it up. First, RelationFindReplTupleSeq allocated a separate
scan slot and copied it into outslot; it now scans directly into
outslot, like RelationFindReplTupleByIndex. Second, both functions
materialized the slot before the wait and the lock. That is
unnecessary: the scan keeps the buffer pinned throughout the
lookup, the lock needs only the TID, and both the wait and the
lock retry path restart the lookup without using the slot's
contents. The slot is now materialized just once, before
returning it to the caller.

Author: Bharath Rupireddy <[email protected]>
Reviewed-by: Hayato Kuroda <[email protected]>
Discussion: https://www.postgresql.org/message-id/CALj2ACV6ESpggn2Az%3DOdZBzVx7jiBG9O_EdKdgbk2chAJusC2w%40mail.gmail.com
---
 src/backend/executor/execReplication.c        |  34 ++-
 src/test/subscription/meson.build             |   1 +
 .../t/039_concurrent_dml_retry.pl             | 221 ++++++++++++++++++
 3 files changed, 246 insertions(+), 10 deletions(-)
 create mode 100644 src/test/subscription/t/039_concurrent_dml_retry.pl

diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index b2ca5cbf117..e19e847c085 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -31,6 +31,7 @@
 #include "replication/logicalrelation.h"
 #include "storage/lmgr.h"
 #include "utils/builtins.h"
+#include "utils/injection_point.h"
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
@@ -229,8 +230,6 @@ retry:
 				continue;
 		}
 
-		ExecMaterializeSlot(outslot);
-
 		xwait = TransactionIdIsValid(snap.xmin) ?
 			snap.xmin : snap.xmax;
 
@@ -255,6 +254,8 @@ retry:
 		TM_FailureData tmfd;
 		TM_Result	res;
 
+		INJECTION_POINT("find-repl-tuple-by-index-before-lock", NULL);
+
 		PushActiveSnapshot(GetLatestSnapshot());
 
 		res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
@@ -269,6 +270,14 @@ retry:
 
 		if (should_refetch_tuple(res, &tmfd))
 			goto retry;
+
+		/*
+		 * Materialize the slot once before returning, so the tuple no longer
+		 * depends on the buffer.  Materializing earlier is unnecessary: the
+		 * scan keeps the buffer pinned, and neither the wait nor the lock
+		 * retry path uses the slot, as both restart the lookup.
+		 */
+		ExecMaterializeSlot(outslot);
 	}
 
 	index_endscan(scan);
@@ -370,7 +379,6 @@ bool
 RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
 						 TupleTableSlot *searchslot, TupleTableSlot *outslot)
 {
-	TupleTableSlot *scanslot;
 	TableScanDesc scan;
 	SnapshotData snap;
 	TypeCacheEntry **eq;
@@ -386,7 +394,6 @@ RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
 	InitDirtySnapshot(snap);
 	scan = table_beginscan(rel, &snap, 0, NULL,
 						   SO_NONE);
-	scanslot = table_slot_create(rel, NULL);
 
 retry:
 	found = false;
@@ -394,14 +401,11 @@ retry:
 	table_rescan(scan, NULL);
 
 	/* Try to find the tuple */
-	while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
+	while (table_scan_getnextslot(scan, ForwardScanDirection, outslot))
 	{
-		if (!tuples_equal(scanslot, searchslot, eq, NULL))
+		if (!tuples_equal(outslot, searchslot, eq, NULL))
 			continue;
 
-		found = true;
-		ExecCopySlot(outslot, scanslot);
-
 		xwait = TransactionIdIsValid(snap.xmin) ?
 			snap.xmin : snap.xmax;
 
@@ -416,6 +420,7 @@ retry:
 		}
 
 		/* Found our tuple and it's not locked */
+		found = true;
 		break;
 	}
 
@@ -425,6 +430,8 @@ retry:
 		TM_FailureData tmfd;
 		TM_Result	res;
 
+		INJECTION_POINT("find-repl-tuple-seq-before-lock", NULL);
+
 		PushActiveSnapshot(GetLatestSnapshot());
 
 		res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
@@ -439,10 +446,17 @@ retry:
 
 		if (should_refetch_tuple(res, &tmfd))
 			goto retry;
+
+		/*
+		 * Materialize the slot once before returning, so the tuple no longer
+		 * depends on the buffer.  Materializing earlier is unnecessary: the
+		 * scan keeps the buffer pinned, and neither the wait nor the lock
+		 * retry path uses the slot, as both restart the lookup.
+		 */
+		ExecMaterializeSlot(outslot);
 	}
 
 	table_endscan(scan);
-	ExecDropSingleTupleTableSlot(scanslot);
 
 	return found;
 }
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index e71e95c6297..5df7f143721 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -48,6 +48,7 @@ tests += {
       't/036_sequences.pl',
       't/037_except.pl',
       't/038_walsnd_shutdown_timeout.pl',
+      't/039_concurrent_dml_retry.pl',
       't/100_bugs.pl',
     ],
   },
diff --git a/src/test/subscription/t/039_concurrent_dml_retry.pl b/src/test/subscription/t/039_concurrent_dml_retry.pl
new file mode 100644
index 00000000000..3e983af5e67
--- /dev/null
+++ b/src/test/subscription/t/039_concurrent_dml_retry.pl
@@ -0,0 +1,221 @@
+# Copyright (c) 2026, PostgreSQL Global Development Group
+
+# Test concurrent DML retry paths in logical replication apply.
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Create publisher.
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber.
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+# Create tables on both sides and set up replication.
+$node_publisher->safe_psql('postgres', qq(
+	CREATE TABLE test_tab (a int PRIMARY KEY, b int, c text);
+	CREATE TABLE test_tab_full (a int, b int, c text);
+	ALTER TABLE test_tab_full REPLICA IDENTITY FULL;
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+	CREATE TABLE test_tab (a int PRIMARY KEY, b int, c text);
+	CREATE TABLE test_tab_full (a int, b int, c text);
+	ALTER TABLE test_tab_full REPLICA IDENTITY FULL;
+));
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION test_pub FOR TABLE test_tab, test_tab_full;");
+
+my $appname = 'test_sub';
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION test_sub
+	 CONNECTION '$publisher_connstr application_name=$appname'
+	 PUBLICATION test_pub;");
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+# Insert test data.  Rows 1-2 are used by XactLockTableWait tests,
+# rows 3-4 by injection-point tests.
+$node_publisher->safe_psql('postgres', qq(
+	INSERT INTO test_tab VALUES (1, 10, 'foo'), (2, 20, 'bar'),
+	                            (3, 30, 'baz'), (4, 40, 'qux');
+	INSERT INTO test_tab_full VALUES (1, 10, 'foo'), (2, 20, 'bar'),
+	                                 (3, 30, 'baz'), (4, 40, 'qux');
+));
+$node_publisher->wait_for_catchup($appname);
+
+# Test the apply worker retry when the dirty snapshot finds an in-progress
+# transaction on the target tuple, causing XactLockTableWait.
+
+my $sub_session = $node_subscriber->background_psql('postgres');
+
+sub test_retry_with_in_progress_xact
+{
+	my (%args) = @_;
+	my $test_name     = $args{name};
+	my $dml           = $args{dml};
+	my $verify_query  = $args{verify_query};
+	my $verify_result = $args{verify_result};
+
+	# Modify the target tuple in an in-progress transaction on the subscriber.
+	$sub_session->query_safe("BEGIN;");
+	$sub_session->query_safe($dml);
+
+	# Run DML on the publisher.
+	$node_publisher->safe_psql('postgres', $dml);
+
+	# Wait for the apply worker to block on XactLockTableWait.
+	$node_subscriber->poll_query_until('postgres', qq(
+		SELECT count(*) > 0 FROM pg_stat_activity
+		WHERE backend_type = 'logical replication apply worker'
+		  AND wait_event_type = 'Lock'
+		  AND wait_event = 'transactionid';
+	)) or die "Timed out waiting for apply worker to block on XactLockTableWait";
+
+	# Abort so the apply worker wakes up, retries, and applies successfully.
+	$sub_session->query_safe("ROLLBACK;");
+	$node_publisher->wait_for_catchup($appname);
+
+	# Verify the results.
+	my $result = $node_subscriber->safe_psql('postgres', $verify_query);
+	is($result, $verify_result, $test_name);
+}
+
+# Index scan (PK): publisher UPDATE with in-progress subscriber transaction.
+test_retry_with_in_progress_xact(
+	name          => 'XactLockTableWait index scan UPDATE',
+	dml           => "UPDATE test_tab SET c = 'foo_u' WHERE a = 1;",
+	verify_query  => "SELECT c FROM test_tab WHERE a = 1;",
+	verify_result => 'foo_u',
+);
+
+# Index scan (PK): publisher DELETE with in-progress subscriber transaction.
+test_retry_with_in_progress_xact(
+	name          => 'XactLockTableWait index scan DELETE',
+	dml           => "DELETE FROM test_tab WHERE a = 2;",
+	verify_query  => "SELECT count(*) FROM test_tab WHERE a = 2;",
+	verify_result => '0',
+);
+
+# Seq scan (REPLICA IDENTITY FULL): publisher UPDATE with in-progress
+# subscriber transaction.
+test_retry_with_in_progress_xact(
+	name          => 'XactLockTableWait seq scan UPDATE',
+	dml           => "UPDATE test_tab_full SET c = 'foo_u' WHERE a = 1;",
+	verify_query  => "SELECT c FROM test_tab_full WHERE a = 1;",
+	verify_result => 'foo_u',
+);
+
+# Seq scan (REPLICA IDENTITY FULL): publisher DELETE with in-progress
+# subscriber transaction.
+test_retry_with_in_progress_xact(
+	name          => 'XactLockTableWait seq scan DELETE',
+	dml           => "DELETE FROM test_tab_full WHERE a = 2;",
+	verify_query  => "SELECT count(*) FROM test_tab_full WHERE a = 2;",
+	verify_result => '0',
+);
+
+$sub_session->quit;
+
+# Test the apply worker retry when table_tuple_lock detects a concurrently
+# updated or deleted tuple (TM_Updated / TM_Deleted). An injection point pauses
+# the worker between finding the tuple and locking it, allowing concurrent DML
+# to intervene.
+
+sub test_retry_with_concurrent_dml_before_tuple_lock
+{
+	my (%args) = @_;
+	my $test_name     = $args{name};
+	my $inj_point     = $args{inj_point};
+	my $dml           = $args{dml};
+	my $expected_log  = $args{expected_log};
+	my $verify_query  = $args{verify_query};
+	my $verify_result = $args{verify_result};
+
+	$node_subscriber->safe_psql('postgres',
+		"SELECT injection_points_attach('$inj_point', 'wait');");
+
+	# Run DML on the publisher.
+	$node_publisher->safe_psql('postgres', $dml);
+
+	$node_subscriber->wait_for_event('logical replication apply worker',
+		$inj_point);
+
+	# Run DML on the subscriber.
+	$node_subscriber->safe_psql('postgres', $dml);
+
+	my $log_offset = -s $node_subscriber->logfile;
+
+	# Detach before wakeup so the retry doesn't hit the same injection point.
+	$node_subscriber->safe_psql('postgres',
+		"SELECT injection_points_detach('$inj_point');
+		 SELECT injection_points_wakeup('$inj_point');");
+
+	$node_subscriber->wait_for_log($expected_log, $log_offset);
+	pass("$test_name: concurrent modification detected and retried");
+
+	$node_publisher->wait_for_catchup($appname);
+
+	my $result = $node_subscriber->safe_psql('postgres', $verify_query);
+	is($result, $verify_result, "$test_name: data correct after retry");
+}
+
+# Check whether injection_points extension is available on the subscriber.
+my $injection_points_supported =
+	$node_subscriber->check_extension('injection_points');
+
+if ($injection_points_supported != 0)
+{
+	$node_subscriber->safe_psql('postgres',
+		"CREATE EXTENSION injection_points;");
+
+	# TM_Updated via index scan (PK).
+	test_retry_with_concurrent_dml_before_tuple_lock(
+		name          => 'index scan TM_Updated',
+		inj_point     => 'find-repl-tuple-by-index-before-lock',
+		dml           => "UPDATE test_tab SET c = 'baz_u' WHERE a = 3;",
+		expected_log  => qr/concurrent update, retrying/,
+		verify_query  => "SELECT c FROM test_tab WHERE a = 3;",
+		verify_result => 'baz_u',
+	);
+
+	# TM_Deleted via index scan (PK).
+	test_retry_with_concurrent_dml_before_tuple_lock(
+		name          => 'index scan TM_Deleted',
+		inj_point     => 'find-repl-tuple-by-index-before-lock',
+		dml           => "DELETE FROM test_tab WHERE a = 4;",
+		expected_log  => qr/concurrent delete, retrying/,
+		verify_query  => "SELECT count(*) FROM test_tab WHERE a = 4;",
+		verify_result => '0',
+	);
+
+	# TM_Updated via seq scan (REPLICA IDENTITY FULL).
+	test_retry_with_concurrent_dml_before_tuple_lock(
+		name          => 'seq scan TM_Updated',
+		inj_point     => 'find-repl-tuple-seq-before-lock',
+		dml           => "UPDATE test_tab_full SET c = 'baz_u' WHERE a = 3;",
+		expected_log  => qr/concurrent update, retrying/,
+		verify_query  => "SELECT c FROM test_tab_full WHERE a = 3;",
+		verify_result => 'baz_u',
+	);
+
+	# TM_Deleted via seq scan (REPLICA IDENTITY FULL).
+	test_retry_with_concurrent_dml_before_tuple_lock(
+		name          => 'seq scan TM_Deleted',
+		inj_point     => 'find-repl-tuple-seq-before-lock',
+		dml           => "DELETE FROM test_tab_full WHERE a = 4;",
+		expected_log  => qr/concurrent delete, retrying/,
+		verify_query  => "SELECT count(*) FROM test_tab_full WHERE a = 4;",
+		verify_result => '0',
+	);
+}
+
+done_testing();
-- 
2.47.3
