I tried setting wal_retrieve_retry_interval to 1ms for all TAP tests
(similar to what was done in 2710ccd), and I noticed that the recovery
tests consistently took much longer. Upon further inspection, this looks
like the same (or a very similar) race condition as the one described in
e5d494d's commit message [0]. With some added debug logs, I see that all of the
callers of MaybeStartWalReceiver() complete before SIGCHLD is processed, so
ServerLoop() waits for a minute before starting the WAL receiver.
A simple fix is to have DetermineSleepTime() take the WalReceiverRequested
flag into consideration. The attached 0002 patch shortens the sleep time
to 100ms if it looks like we are waiting on a SIGCHLD. I'm not certain
this is the best approach, but it seems to fix the tests.
On my machine, I see the following improvements in the tests (all units in
seconds):
                   HEAD   patched (v9)
check-world -j8     165            138
subscription        120             75
recovery            111            108
[0] https://postgr.es/m/21344.1498494720%40sss.pgh.pa.us
--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
>From a80c2b3b2a80d024b72be9fca6b5d4136b8b8272 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <[email protected]>
Date: Mon, 21 Nov 2022 16:01:01 -0800
Subject: [PATCH v9 1/3] wake up logical workers as needed instead of relying
on periodic wakeups
---
src/backend/access/transam/xact.c | 3 ++
src/backend/commands/alter.c | 7 ++++
src/backend/commands/subscriptioncmds.c | 4 ++
src/backend/replication/logical/tablesync.c | 10 +++++
src/backend/replication/logical/worker.c | 46 +++++++++++++++++++++
src/include/replication/logicalworker.h | 3 ++
6 files changed, 73 insertions(+)
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b7c7fd9f00..70ad51c591 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -47,6 +47,7 @@
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
@@ -2360,6 +2361,7 @@ CommitTransaction(void)
AtEOXact_PgStat(true, is_parallel_worker);
AtEOXact_Snapshot(true, false);
AtEOXact_ApplyLauncher(true);
+ AtEOXact_LogicalRepWorkers(true);
pgstat_report_xact_timestamp(0);
CurrentResourceOwner = NULL;
@@ -2860,6 +2862,7 @@ AbortTransaction(void)
AtEOXact_HashTables(false);
AtEOXact_PgStat(false, is_parallel_worker);
AtEOXact_ApplyLauncher(false);
+ AtEOXact_LogicalRepWorkers(false);
pgstat_report_xact_timestamp(0);
}
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 10b6fe19a2..d095cd3ced 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -59,6 +59,7 @@
#include "commands/user.h"
#include "miscadmin.h"
#include "parser/parse_func.h"
+#include "replication/logicalworker.h"
#include "rewrite/rewriteDefine.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
@@ -279,6 +280,12 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
if (strncmp(new_name, "regress_", 8) != 0)
elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
#endif
+
+ /*
+ * Wake up the logical replication workers to handle this change
+ * quickly.
+ */
+ LogicalRepWorkersWakeupAtCommit(objectId);
}
else if (nameCacheId >= 0)
{
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index d673557ea4..d6993c26e5 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
#include "nodes/makefuncs.h"
#include "pgstat.h"
#include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
@@ -1362,6 +1363,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
+ /* Wake up the logical replication workers to handle this change quickly. */
+ LogicalRepWorkersWakeupAtCommit(subid);
+
return myself;
}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 94e813ac53..509fe2eb19 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -105,6 +105,7 @@
#include "pgstat.h"
#include "replication/logicallauncher.h"
#include "replication/logicalrelation.h"
+#include "replication/logicalworker.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "replication/slot.h"
@@ -619,6 +620,15 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
if (started_tx)
{
+ /*
+ * If we are ready to enable two_phase mode, wake up the logical
+ * replication workers to handle this change quickly.
+ */
+ CommandCounterIncrement();
+ if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+ AllTablesyncsReady())
+ LogicalRepWorkersWakeupAtCommit(MyLogicalRepWorker->subid);
+
CommitTransactionCommand();
pgstat_report_stat(true);
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 96772e4d73..722f796c7a 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -254,6 +254,8 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
Subscription *MySubscription = NULL;
static bool MySubscriptionValid = false;
+static List *on_commit_wakeup_workers_subids = NIL;
+
bool in_remote_transaction = false;
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
@@ -4097,3 +4099,47 @@ reset_apply_error_context_info(void)
apply_error_callback_arg.remote_attnum = -1;
set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
}
+
+/*
+ * Wake up the stored subscriptions' workers on commit if requested.
+ */
+void
+AtEOXact_LogicalRepWorkers(bool isCommit)
+{
+ if (isCommit && on_commit_wakeup_workers_subids != NIL)
+ {
+ ListCell *subid;
+
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+ foreach(subid, on_commit_wakeup_workers_subids)
+ {
+ List *workers;
+ ListCell *worker;
+
+ workers = logicalrep_workers_find(lfirst_oid(subid), true);
+ foreach(worker, workers)
+ logicalrep_worker_wakeup_ptr((LogicalRepWorker *) lfirst(worker));
+ }
+ LWLockRelease(LogicalRepWorkerLock);
+ }
+
+ on_commit_wakeup_workers_subids = NIL;
+}
+
+/*
+ * Request wakeup of the workers for the given subscription ID on commit of the
+ * transaction.
+ *
+ * This is used to ensure that the workers process assorted changes as soon as
+ * possible.
+ */
+void
+LogicalRepWorkersWakeupAtCommit(Oid subid)
+{
+ MemoryContext oldcxt;
+
+ oldcxt = MemoryContextSwitchTo(TopTransactionContext);
+ on_commit_wakeup_workers_subids = list_append_unique_oid(on_commit_wakeup_workers_subids,
+ subid);
+ MemoryContextSwitchTo(oldcxt);
+}
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index cd1b6e8afc..2c2340d758 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -16,4 +16,7 @@ extern void ApplyWorkerMain(Datum main_arg);
extern bool IsLogicalWorker(void);
+extern void LogicalRepWorkersWakeupAtCommit(Oid subid);
+extern void AtEOXact_LogicalRepWorkers(bool isCommit);
+
#endif /* LOGICALWORKER_H */
--
2.25.1
>From 0b40b322652f5f7f538381a4f299deb94f465c94 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <[email protected]>
Date: Thu, 15 Dec 2022 14:11:43 -0800
Subject: [PATCH v9 2/3] handle race condition when restarting wal receivers
---
src/backend/postmaster/postmaster.c | 24 ++++++++++++++++++------
1 file changed, 18 insertions(+), 6 deletions(-)
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index f459dab360..4107a85b51 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -1601,9 +1601,9 @@ checkControlFile(void)
*
* In normal conditions we wait at most one minute, to ensure that the other
* background tasks handled by ServerLoop get done even when no requests are
- * arriving. However, if there are background workers waiting to be started,
- * we don't actually sleep so that they are quickly serviced. Other exception
- * cases are as shown in the code.
+ * arriving. However, if there are background workers or a WAL receiver
+ * waiting to be started, we make sure they are quickly serviced. Other
+ * exception cases are as shown in the code.
*/
static void
DetermineSleepTime(struct timeval *timeout)
@@ -1611,11 +1611,12 @@ DetermineSleepTime(struct timeval *timeout)
TimestampTz next_wakeup = 0;
/*
- * Normal case: either there are no background workers at all, or we're in
- * a shutdown sequence (during which we ignore bgworkers altogether).
+ * Normal case: either there are no background workers and no WAL receiver
+ * at all, or we're in a shutdown sequence (during which we ignore
+ * bgworkers altogether).
*/
if (Shutdown > NoShutdown ||
- (!StartWorkerNeeded && !HaveCrashedWorker))
+ (!StartWorkerNeeded && !HaveCrashedWorker && !WalReceiverRequested))
{
if (AbortStartTime != 0)
{
@@ -1640,6 +1641,17 @@ DetermineSleepTime(struct timeval *timeout)
return;
}
+ /*
+ * We're probably waiting for SIGCHLD before starting the WAL receiver. We
+ * don't expect that to take long.
+ */
+ if (WalReceiverRequested)
+ {
+ timeout->tv_sec = 0;
+ timeout->tv_usec = 100000; /* 100ms */
+ return;
+ }
+
if (HaveCrashedWorker)
{
slist_mutable_iter siter;
--
2.25.1
>From 5fd191a08e4b47b393f5a049dc9e683d1a2df16f Mon Sep 17 00:00:00 2001
From: Nathan Bossart <[email protected]>
Date: Thu, 15 Dec 2022 09:16:22 -0800
Subject: [PATCH v9 3/3] set wal_retrieve_retry_interval to 1ms in tests
---
src/test/perl/PostgreSQL/Test/Cluster.pm | 2 +-
src/test/recovery/t/002_archiving.pl | 2 --
src/test/subscription/t/004_sync.pl | 2 --
3 files changed, 1 insertion(+), 5 deletions(-)
diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm
index 7411188dc8..c6aeee07fc 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -532,7 +532,7 @@ sub init
print $conf "log_line_prefix = '%m [%p] %q%a '\n";
print $conf "log_statement = all\n";
print $conf "log_replication_commands = on\n";
- print $conf "wal_retrieve_retry_interval = '500ms'\n";
+ print $conf "wal_retrieve_retry_interval = '1ms'\n";
# If a setting tends to affect whether tests pass or fail, print it after
# TEMP_CONFIG. Otherwise, print it before TEMP_CONFIG, thereby permitting
diff --git a/src/test/recovery/t/002_archiving.pl b/src/test/recovery/t/002_archiving.pl
index d69da4e5ef..8e269671b2 100644
--- a/src/test/recovery/t/002_archiving.pl
+++ b/src/test/recovery/t/002_archiving.pl
@@ -28,8 +28,6 @@ my $node_standby = PostgreSQL::Test::Cluster->new('standby');
# of the primary.
$node_standby->init_from_backup($node_primary, $backup_name,
has_restoring => 1);
-$node_standby->append_conf('postgresql.conf',
- "wal_retrieve_retry_interval = '100ms'");
# Set archive_cleanup_command and recovery_end_command, checking their
# execution by the backend with dummy commands.
diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl
index fd4bf7bacd..edb6fbf3a6 100644
--- a/src/test/subscription/t/004_sync.pl
+++ b/src/test/subscription/t/004_sync.pl
@@ -16,8 +16,6 @@ $node_publisher->start;
# Create subscriber node
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
-$node_subscriber->append_conf('postgresql.conf',
- "wal_retrieve_retry_interval = 1ms");
$node_subscriber->start;
# Create some preexisting content on publisher
--
2.25.1