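Hi hackers, While reading the logical replication apply code in execReplication.c, I noticed that the retry paths in RelationFindReplTupleByIndex and RelationFindReplTupleSeq for concurrent updates and deletes have no test coverage [1]. These paths are taken when the same tuple is being updated or deleted on the publisher and the subscriber at the same time. In one case, the dirty snapshot finds the tuple being modified by another transaction, so the apply worker waits for that transaction and then retries the index/sequential scan. In the other, the tuple is modified after the scan finds it but before the apply worker locks it, so table_tuple_lock returns TM_Updated or TM_Deleted and the apply worker retries the scan.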
The attached patch adds an injection point before table_tuple_lock and a TAP test that exercises these retry paths, hitting both TM_Updated and TM_Deleted. While working on this, I also noticed some minor issues in the conflict handling code: 1/ In RelationFindReplTupleByIndex, ExecMaterializeSlot was called before the tuple was locked and should_refetch_tuple was checked. If the tuple needs to be refetched due to a concurrent modification, this materialization is wasted work. I moved it after the retry check, so it only runs once we've successfully locked the tuple. 2/ In RelationFindReplTupleSeq, the separate TupleTableSlot allocation and the ExecCopySlot call were unnecessary. I made this function consistent with RelationFindReplTupleByIndex by scanning the table directly into outslot, which avoids the extra slot allocation and the copy overhead. I'm aware that these are not major performance issues in practice, but the change keeps the two functions consistent and avoids unnecessary slot materialization and copy costs. I also think we could deduplicate these two functions, since their code is mostly the same, but that would be overkill IMHO. Please find the attached patch. I'd appreciate any feedback. Thank you! [1] https://coverage.postgresql.org/src/backend/executor/execReplication.c.gcov.html -- Bharath Rupireddy Amazon Web Services: https://aws.amazon.com
From bcf3cb77bd1ee923f128bc4607b66a6287762743 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <[email protected]> Date: Tue, 21 Apr 2026 20:24:02 +0000 Subject: [PATCH v1] Add tests for concurrent DML retry paths in logical replication apply. The retry paths in RelationFindReplTupleByIndex and RelationFindReplTupleSeq for concurrent updates and deletes had no test coverage. When a tuple is concurrently modified on the subscriber while the apply worker is trying to lock it, table_tuple_lock returns TM_Updated or TM_Deleted, and the worker retries the scan. This commit adds an injection point and a TAP test that exercises these retry paths for both index scan and sequential scan. While here, fix minor inefficiencies in the retry handling. In RelationFindReplTupleByIndex, avoid calling ExecMaterializeSlot before the retry check. In RelationFindReplTupleSeq, remove the unnecessary separate TupleTableSlot allocation and ExecCopySlot call by using outslot directly for scanning, keeping it consistent with RelationFindReplTupleByIndex. --- src/backend/executor/execReplication.c | 25 +-- src/test/subscription/meson.build | 1 + .../t/039_concurrent_dml_retry.pl | 152 ++++++++++++++++++ 3 files changed, 168 insertions(+), 10 deletions(-) create mode 100644 src/test/subscription/t/039_concurrent_dml_retry.pl diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index b2ca5cbf117..0d219e35daa 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -37,6 +37,8 @@ #include "utils/syscache.h" #include "utils/typcache.h" +#include "utils/injection_point.h" + static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, TypeCacheEntry **eq, Bitmapset *columns); @@ -229,8 +231,6 @@ retry: continue; } - ExecMaterializeSlot(outslot); - xwait = TransactionIdIsValid(snap.xmin) ? 
snap.xmin : snap.xmax; @@ -255,6 +255,8 @@ retry: TM_FailureData tmfd; TM_Result res; + INJECTION_POINT("find-repl-tuple-before-lock", NULL); + PushActiveSnapshot(GetLatestSnapshot()); res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(), @@ -269,6 +271,9 @@ retry: if (should_refetch_tuple(res, &tmfd)) goto retry; + + /* Materialize the slot so it preserves pass-by-ref values. */ + ExecMaterializeSlot(outslot); } index_endscan(scan); @@ -370,7 +375,6 @@ bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot) { - TupleTableSlot *scanslot; TableScanDesc scan; SnapshotData snap; TypeCacheEntry **eq; @@ -386,7 +390,6 @@ RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, InitDirtySnapshot(snap); scan = table_beginscan(rel, &snap, 0, NULL, SO_NONE); - scanslot = table_slot_create(rel, NULL); retry: found = false; @@ -394,14 +397,11 @@ retry: table_rescan(scan, NULL); /* Try to find the tuple */ - while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot)) + while (table_scan_getnextslot(scan, ForwardScanDirection, outslot)) { - if (!tuples_equal(scanslot, searchslot, eq, NULL)) + if (!tuples_equal(outslot, searchslot, eq, NULL)) continue; - found = true; - ExecCopySlot(outslot, scanslot); - xwait = TransactionIdIsValid(snap.xmin) ? snap.xmin : snap.xmax; @@ -416,6 +416,7 @@ retry: } /* Found our tuple and it's not locked */ + found = true; break; } @@ -425,6 +426,8 @@ retry: TM_FailureData tmfd; TM_Result res; + INJECTION_POINT("find-repl-tuple-before-lock", NULL); + PushActiveSnapshot(GetLatestSnapshot()); res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(), @@ -439,10 +442,12 @@ retry: if (should_refetch_tuple(res, &tmfd)) goto retry; + + /* Materialize the slot so it preserves pass-by-ref values. 
*/ + ExecMaterializeSlot(outslot); } table_endscan(scan); - ExecDropSingleTupleTableSlot(scanslot); return found; } diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build index e71e95c6297..5df7f143721 100644 --- a/src/test/subscription/meson.build +++ b/src/test/subscription/meson.build @@ -48,6 +48,7 @@ tests += { 't/036_sequences.pl', 't/037_except.pl', 't/038_walsnd_shutdown_timeout.pl', + 't/039_concurrent_dml_retry.pl', 't/100_bugs.pl', ], }, diff --git a/src/test/subscription/t/039_concurrent_dml_retry.pl b/src/test/subscription/t/039_concurrent_dml_retry.pl new file mode 100644 index 00000000000..074aeafbcc4 --- /dev/null +++ b/src/test/subscription/t/039_concurrent_dml_retry.pl @@ -0,0 +1,152 @@ +# Copyright (c) 2026, PostgreSQL Global Development Group + +# Test concurrent update/delete retry paths in logical replication apply. +# +# Uses injection points to pause the apply worker after finding a tuple but +# before locking it, allowing a concurrent session to modify or delete the +# same row. +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Skip the test if the injection_points extension is not installed. +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +if ($node_publisher->check_extension('injection_points') == 0) +{ + $node_publisher->stop; + plan skip_all => 'Extension injection_points not installed'; +} + +# Subscriber needs injection_points loaded. +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->append_conf('postgresql.conf', + "shared_preload_libraries = 'injection_points'"); +$node_subscriber->start; + +# Create tables on both publisher and subscriber, and set up replication. 
+$node_publisher->safe_psql('postgres', qq( + CREATE TABLE test_tab (a int PRIMARY KEY, b int); + CREATE TABLE test_tab_full (a int, b int); + ALTER TABLE test_tab_full REPLICA IDENTITY FULL; +)); + +$node_subscriber->safe_psql('postgres', qq( + CREATE TABLE test_tab (a int PRIMARY KEY, b int); + CREATE TABLE test_tab_full (a int, b int); + ALTER TABLE test_tab_full REPLICA IDENTITY FULL; + CREATE EXTENSION injection_points; +)); + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION test_pub FOR TABLE test_tab, test_tab_full;"); + +my $appname = 'test_sub'; +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION test_sub + CONNECTION '$publisher_connstr application_name=$appname' + PUBLICATION test_pub;"); + +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname); + +# Pre-insert all test data in a single batch to avoid multiple +# wait_for_catchup round trips. +$node_publisher->safe_psql('postgres', qq( + INSERT INTO test_tab VALUES (1, 10), (2, 30); + INSERT INTO test_tab_full VALUES (1, 100), (2, 300); +)); +$node_publisher->wait_for_catchup($appname); + +# Helper to run a single concurrent DML retry test using injection points. +sub test_concurrent_retry +{ + my (%args) = @_; + my $test_name = $args{name}; + my $pub_dml = $args{pub_dml}; + my $sub_dml = $args{sub_dml}; + my $expected_log = $args{expected_log}; + my $verify_query = $args{verify_query}; + my $verify_result = $args{verify_result}; + + # Attach injection point. + $node_subscriber->safe_psql('postgres', + "SELECT injection_points_attach('find-repl-tuple-before-lock', 'wait');"); + + # Publish the DML that will trigger the apply worker to find + lock. + $node_publisher->safe_psql('postgres', $pub_dml); + + # Wait for the apply worker to hit the injection point. 
+ $node_subscriber->wait_for_event('logical replication apply worker', + 'find-repl-tuple-before-lock'); + + # Execute concurrent DML on subscriber while apply worker is paused. + $node_subscriber->safe_psql('postgres', $sub_dml); + + my $log_offset = -s $node_subscriber->logfile; + + # Detach first so the retry loop doesn't hit the injection point again, + # then wake up the apply worker. + $node_subscriber->safe_psql('postgres', + "SELECT injection_points_detach('find-repl-tuple-before-lock'); + SELECT injection_points_wakeup('find-repl-tuple-before-lock');"); + + # Confirm the expected log message. + $node_subscriber->wait_for_log($expected_log, $log_offset); + pass("$test_name: concurrent modification detected and retried"); + + # Wait for apply to finish and verify result. + $node_publisher->wait_for_catchup($appname); + + my $result = $node_subscriber->safe_psql('postgres', $verify_query); + is($result, $verify_result, "$test_name: data correct after retry"); +} + +# TM_Updated via index scan (PK): concurrent update on subscriber. +test_concurrent_retry( + name => 'index scan TM_Updated', + pub_dml => "UPDATE test_tab SET b = 20 WHERE a = 1;", + sub_dml => "UPDATE test_tab SET b = 99 WHERE a = 1;", + expected_log => qr/concurrent update, retrying/, + verify_query => "SELECT b FROM test_tab WHERE a = 1;", + verify_result => '20', +); + +# TM_Deleted via index scan (PK): concurrent delete on subscriber. +test_concurrent_retry( + name => 'index scan TM_Deleted', + pub_dml => "UPDATE test_tab SET b = 40 WHERE a = 2;", + sub_dml => "DELETE FROM test_tab WHERE a = 2;", + expected_log => qr/concurrent delete, retrying/, + verify_query => "SELECT count(*) FROM test_tab WHERE a = 2;", + verify_result => '0', +); + +# TM_Updated via seq scan (REPLICA IDENTITY FULL): concurrent update on +# subscriber. The rescan no longer finds a full-row match, so b stays 999. 
+test_concurrent_retry( + name => 'seq scan TM_Updated', + pub_dml => "UPDATE test_tab_full SET b = 200 WHERE a = 1;", + sub_dml => "UPDATE test_tab_full SET b = 999 WHERE a = 1;", + expected_log => qr/concurrent update, retrying/, + verify_query => "SELECT b FROM test_tab_full WHERE a = 1;", + verify_result => '999', +); + +# TM_Deleted via seq scan (REPLICA IDENTITY FULL): concurrent delete on +# subscriber. +test_concurrent_retry( + name => 'seq scan TM_Deleted', + pub_dml => "UPDATE test_tab_full SET b = 400 WHERE a = 2;", + sub_dml => "DELETE FROM test_tab_full WHERE a = 2;", + expected_log => qr/concurrent delete, retrying/, + verify_query => "SELECT count(*) FROM test_tab_full WHERE a = 2;", + verify_result => '0', +); + +done_testing(); -- 2.47.3
