On Sun, Dec 18, 2022 at 03:36:07PM -0800, Nathan Bossart wrote: > This seems to have somehow broken the archiving tests on Windows, so > obviously I owe some better analysis here. I didn't see anything obvious > in the logs, but I will continue to dig.
On Windows, WaitForWALToBecomeAvailable() seems to depend on the call to WaitLatch() for wal_retrieve_retry_interval to ensure that signals are dispatched (i.e., pgwin32_dispatch_queued_signals()). My first instinct is to just always call WaitLatch() in this code path, even if wal_retrieve_retry_interval milliseconds have already elapsed. The attached 0003 does this. -- Nathan Bossart Amazon Web Services: https://aws.amazon.com
>From d1025a5d7ad9a966f7ec8bee4bb8127e8a0e1d8b Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nathandboss...@gmail.com> Date: Mon, 21 Nov 2022 16:01:01 -0800 Subject: [PATCH v10 1/4] wake up logical workers as needed instead of relying on periodic wakeups --- src/backend/access/transam/xact.c | 3 ++ src/backend/commands/alter.c | 7 ++++ src/backend/commands/subscriptioncmds.c | 4 ++ src/backend/replication/logical/tablesync.c | 10 +++++ src/backend/replication/logical/worker.c | 46 +++++++++++++++++++++ src/include/replication/logicalworker.h | 3 ++ 6 files changed, 73 insertions(+) diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index b7c7fd9f00..70ad51c591 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -47,6 +47,7 @@ #include "pgstat.h" #include "replication/logical.h" #include "replication/logicallauncher.h" +#include "replication/logicalworker.h" #include "replication/origin.h" #include "replication/snapbuild.h" #include "replication/syncrep.h" @@ -2360,6 +2361,7 @@ CommitTransaction(void) AtEOXact_PgStat(true, is_parallel_worker); AtEOXact_Snapshot(true, false); AtEOXact_ApplyLauncher(true); + AtEOXact_LogicalRepWorkers(true); pgstat_report_xact_timestamp(0); CurrentResourceOwner = NULL; @@ -2860,6 +2862,7 @@ AbortTransaction(void) AtEOXact_HashTables(false); AtEOXact_PgStat(false, is_parallel_worker); AtEOXact_ApplyLauncher(false); + AtEOXact_LogicalRepWorkers(false); pgstat_report_xact_timestamp(0); } diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c index 10b6fe19a2..d095cd3ced 100644 --- a/src/backend/commands/alter.c +++ b/src/backend/commands/alter.c @@ -59,6 +59,7 @@ #include "commands/user.h" #include "miscadmin.h" #include "parser/parse_func.h" +#include "replication/logicalworker.h" #include "rewrite/rewriteDefine.h" #include "tcop/utility.h" #include "utils/builtins.h" @@ -279,6 +280,12 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const 
char *new_name) if (strncmp(new_name, "regress_", 8) != 0) elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\""); #endif + + /* + * Wake up the logical replication workers to handle this change + * quickly. + */ + LogicalRepWorkersWakeupAtCommit(objectId); } else if (nameCacheId >= 0) { diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index d673557ea4..d6993c26e5 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -34,6 +34,7 @@ #include "nodes/makefuncs.h" #include "pgstat.h" #include "replication/logicallauncher.h" +#include "replication/logicalworker.h" #include "replication/origin.h" #include "replication/slot.h" #include "replication/walreceiver.h" @@ -1362,6 +1363,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0); + /* Wake up the logical replication workers to handle this change quickly. */ + LogicalRepWorkersWakeupAtCommit(subid); + return myself; } diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 94e813ac53..509fe2eb19 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -105,6 +105,7 @@ #include "pgstat.h" #include "replication/logicallauncher.h" #include "replication/logicalrelation.h" +#include "replication/logicalworker.h" #include "replication/walreceiver.h" #include "replication/worker_internal.h" #include "replication/slot.h" @@ -619,6 +620,15 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) if (started_tx) { + /* + * If we are ready to enable two_phase mode, wake up the logical + * replication workers to handle this change quickly. 
+ */ + CommandCounterIncrement(); + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && + AllTablesyncsReady()) + LogicalRepWorkersWakeupAtCommit(MyLogicalRepWorker->subid); + CommitTransactionCommand(); pgstat_report_stat(true); } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 96772e4d73..722f796c7a 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -254,6 +254,8 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL; Subscription *MySubscription = NULL; static bool MySubscriptionValid = false; +static List *on_commit_wakeup_workers_subids = NIL; + bool in_remote_transaction = false; static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; @@ -4097,3 +4099,47 @@ reset_apply_error_context_info(void) apply_error_callback_arg.remote_attnum = -1; set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr); } + +/* + * Wakeup the stored subscriptions' workers on commit if requested. + */ +void +AtEOXact_LogicalRepWorkers(bool isCommit) +{ + if (isCommit && on_commit_wakeup_workers_subids != NIL) + { + ListCell *subid; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + foreach(subid, on_commit_wakeup_workers_subids) + { + List *workers; + ListCell *worker; + + workers = logicalrep_workers_find(lfirst_oid(subid), true); + foreach(worker, workers) + logicalrep_worker_wakeup_ptr((LogicalRepWorker *) lfirst(worker)); + } + LWLockRelease(LogicalRepWorkerLock); + } + + on_commit_wakeup_workers_subids = NIL; +} + +/* + * Request wakeup of the workers for the given subscription ID on commit of the + * transaction. + * + * This is used to ensure that the workers process assorted changes as soon as + * possible. 
+ */ +void +LogicalRepWorkersWakeupAtCommit(Oid subid) +{ + MemoryContext oldcxt; + + oldcxt = MemoryContextSwitchTo(TopTransactionContext); + on_commit_wakeup_workers_subids = list_append_unique_oid(on_commit_wakeup_workers_subids, + subid); + MemoryContextSwitchTo(oldcxt); +} diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index cd1b6e8afc..2c2340d758 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -16,4 +16,7 @@ extern void ApplyWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); +extern void LogicalRepWorkersWakeupAtCommit(Oid subid); +extern void AtEOXact_LogicalRepWorkers(bool isCommit); + #endif /* LOGICALWORKER_H */ -- 2.25.1
>From b16d2c4b8b32b609f5dc5c5ff3527b8f986ab0d8 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nathandboss...@gmail.com> Date: Thu, 15 Dec 2022 14:11:43 -0800 Subject: [PATCH v10 2/4] handle race condition when restarting wal receivers --- src/backend/postmaster/postmaster.c | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index f459dab360..4107a85b51 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -1601,9 +1601,9 @@ checkControlFile(void) * * In normal conditions we wait at most one minute, to ensure that the other * background tasks handled by ServerLoop get done even when no requests are - * arriving. However, if there are background workers waiting to be started, - * we don't actually sleep so that they are quickly serviced. Other exception - * cases are as shown in the code. + * arriving. However, if there are background workers or a WAL receiver + * waiting to be started, we make sure they are quickly serviced. Other + * exception cases are as shown in the code. */ static void DetermineSleepTime(struct timeval *timeout) @@ -1611,11 +1611,12 @@ DetermineSleepTime(struct timeval *timeout) TimestampTz next_wakeup = 0; /* - * Normal case: either there are no background workers at all, or we're in - * a shutdown sequence (during which we ignore bgworkers altogether). + * Normal case: either there are no background workers and no WAL receiver + * at all, or we're in a shutdown sequence (during which we ignore + * bgworkers altogether). */ if (Shutdown > NoShutdown || - (!StartWorkerNeeded && !HaveCrashedWorker)) + (!StartWorkerNeeded && !HaveCrashedWorker && !WalReceiverRequested)) { if (AbortStartTime != 0) { @@ -1640,6 +1641,17 @@ DetermineSleepTime(struct timeval *timeout) return; } + /* + * We're probably waiting for SIGCHLD before starting the WAL receiver. We + * don't expect that to take long. 
+ */ + if (WalReceiverRequested) + { + timeout->tv_sec = 0; + timeout->tv_usec = 100000; /* 100ms */ + return; + } + if (HaveCrashedWorker) { slist_mutable_iter siter; -- 2.25.1
>From 34a892e3d7f234f84914aaa5c46579eb3fabb291 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nathandboss...@gmail.com> Date: Sat, 31 Dec 2022 15:16:54 -0800 Subject: [PATCH v10 3/4] ensure signals are dispatched in startup process on Windows --- src/backend/access/transam/xlogrecovery.c | 36 ++++++++++++++--------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index d5a81f9d83..6c4c5a18a1 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -3466,6 +3466,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, */ if (lastSourceFailed) { + long wait_time; + /* * Don't allow any retry loops to occur during nonblocking * readahead. Let the caller process everything that has been @@ -3556,33 +3558,39 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, * and retry from the archive, but if it hasn't been long * since last attempt, sleep wal_retrieve_retry_interval * milliseconds to avoid busy-waiting. + * + * NB: Even if it's already been wal_retrieve_retry_interval + * milliseconds since the last attempt, we still call + * WaitLatch() with the timeout set to 0 to make sure any + * queued signals are dispatched on Windows builds. */ now = GetCurrentTimestamp(); if (!TimestampDifferenceExceeds(last_fail_time, now, wal_retrieve_retry_interval)) { - long wait_time; - wait_time = wal_retrieve_retry_interval - TimestampDifferenceMilliseconds(last_fail_time, now); elog(LOG, "waiting for WAL to become available at %X/%X", LSN_FORMAT_ARGS(RecPtr)); + } + else + wait_time = 0; - /* Do background tasks that might benefit us later. */ - KnownAssignedTransactionIdsIdleMaintenance(); + /* Do background tasks that might benefit us later.
*/ + KnownAssignedTransactionIdsIdleMaintenance(); - (void) WaitLatch(&XLogRecoveryCtl->recoveryWakeupLatch, - WL_LATCH_SET | WL_TIMEOUT | - WL_EXIT_ON_PM_DEATH, - wait_time, - WAIT_EVENT_RECOVERY_RETRIEVE_RETRY_INTERVAL); - ResetLatch(&XLogRecoveryCtl->recoveryWakeupLatch); - now = GetCurrentTimestamp(); + (void) WaitLatch(&XLogRecoveryCtl->recoveryWakeupLatch, + WL_LATCH_SET | WL_TIMEOUT | + WL_EXIT_ON_PM_DEATH, + wait_time, + WAIT_EVENT_RECOVERY_RETRIEVE_RETRY_INTERVAL); + ResetLatch(&XLogRecoveryCtl->recoveryWakeupLatch); + now = GetCurrentTimestamp(); + + /* Handle interrupt signals of startup process */ + HandleStartupProcInterrupts(); - /* Handle interrupt signals of startup process */ - HandleStartupProcInterrupts(); - } last_fail_time = now; currentSource = XLOG_FROM_ARCHIVE; break; -- 2.25.1
>From ea9ff25e584348cabc30d46778ebb7a538cef991 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nathandboss...@gmail.com> Date: Thu, 15 Dec 2022 09:16:22 -0800 Subject: [PATCH v10 4/4] set wal_retrieve_retry_interval to 1ms in tests --- src/test/perl/PostgreSQL/Test/Cluster.pm | 2 +- src/test/recovery/t/002_archiving.pl | 2 -- src/test/subscription/t/004_sync.pl | 2 -- 3 files changed, 1 insertion(+), 5 deletions(-) diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm index 7411188dc8..c6aeee07fc 100644 --- a/src/test/perl/PostgreSQL/Test/Cluster.pm +++ b/src/test/perl/PostgreSQL/Test/Cluster.pm @@ -532,7 +532,7 @@ sub init print $conf "log_line_prefix = '%m [%p] %q%a '\n"; print $conf "log_statement = all\n"; print $conf "log_replication_commands = on\n"; - print $conf "wal_retrieve_retry_interval = '500ms'\n"; + print $conf "wal_retrieve_retry_interval = '1ms'\n"; # If a setting tends to affect whether tests pass or fail, print it after # TEMP_CONFIG. Otherwise, print it before TEMP_CONFIG, thereby permitting diff --git a/src/test/recovery/t/002_archiving.pl b/src/test/recovery/t/002_archiving.pl index d69da4e5ef..8e269671b2 100644 --- a/src/test/recovery/t/002_archiving.pl +++ b/src/test/recovery/t/002_archiving.pl @@ -28,8 +28,6 @@ my $node_standby = PostgreSQL::Test::Cluster->new('standby'); # of the primary. $node_standby->init_from_backup($node_primary, $backup_name, has_restoring => 1); -$node_standby->append_conf('postgresql.conf', - "wal_retrieve_retry_interval = '100ms'"); # Set archive_cleanup_command and recovery_end_command, checking their # execution by the backend with dummy commands. 
diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl index fd4bf7bacd..edb6fbf3a6 100644 --- a/src/test/subscription/t/004_sync.pl +++ b/src/test/subscription/t/004_sync.pl @@ -16,8 +16,6 @@ $node_publisher->start; # Create subscriber node my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); $node_subscriber->init(allows_streaming => 'logical'); -$node_subscriber->append_conf('postgresql.conf', - "wal_retrieve_retry_interval = 1ms"); $node_subscriber->start; # Create some preexisting content on publisher -- 2.25.1