On Tue, Nov 22, 2022 at 07:25:36AM +0000, houzj.f...@fujitsu.com wrote: > On Tuesday, November 22, 2022 2:49 PM Hayato Kuroda (Fujitsu) > <kuroda.hay...@fujitsu.com> >> Thanks for updating! It becomes better. Further comments: >> >> 01. AlterSubscription() >> >> ``` >> + LogicalRepWorkersWakeupAtCommit(subid); >> + >> ``` >> >> Currently subids will be recorded even if the subscription is not modified. >> I think LogicalRepWorkersWakeupAtCommit() should be called inside the if >> (update_tuple). > > I think an exception would be REFRESH PULLICATION in which case update_tuple > is > false, but it seems better to wake up apply worker in this case as well, > because the apply worker is also responsible to start table sync workers for > newly subscribed tables(in process_syncing_tables()). > > Besides, it seems not a must to wake up apply worker for ALTER SKIP > TRANSACTION, > Although there might be no harm for waking up in this case.
In v3, I moved the call to LogicalRepWorkersWakeupAtCommit() to the end of the function. This should avoid waking up workers in some cases where it's unnecessary (e.g., if ALTER SUBSCRIPTION ERRORs in a subtransaction), but there are still cases where we'll wake up the workers unnecessarily. I think this is unlikely to cause any real problems in practice. >> 02. LogicalRepWorkersWakeupAtCommit() >> >> ``` >> + oldcxt = MemoryContextSwitchTo(TopTransactionContext); >> + on_commit_wakeup_workers_subids = >> lappend_oid(on_commit_wakeup_workers_subids, >> + >> subid); >> ``` >> >> If the subscription is altered twice in the same transaction, the same subid >> will >> be recorded twice. >> I'm not sure whether it may be caused some issued, but list_member_oid() can >> be used to avoid that. > > +1, list_append_unique_oid might be better. Done in v3. -- Nathan Bossart Amazon Web Services: https://aws.amazon.com
>From b80c5934b40034dfbbde411721db1d4171c86d5c Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nathandboss...@gmail.com> Date: Mon, 21 Nov 2022 16:01:01 -0800 Subject: [PATCH v3 1/1] wake up logical workers after ALTER SUBSCRIPTION --- src/backend/access/transam/xact.c | 3 ++ src/backend/commands/subscriptioncmds.c | 3 ++ src/backend/replication/logical/worker.c | 46 ++++++++++++++++++++++++ src/include/replication/logicalworker.h | 3 ++ 4 files changed, 55 insertions(+) diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 8086b857b9..dc00e66cfb 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -47,6 +47,7 @@ #include "pgstat.h" #include "replication/logical.h" #include "replication/logicallauncher.h" +#include "replication/logicalworker.h" #include "replication/origin.h" #include "replication/snapbuild.h" #include "replication/syncrep.h" @@ -2360,6 +2361,7 @@ CommitTransaction(void) AtEOXact_PgStat(true, is_parallel_worker); AtEOXact_Snapshot(true, false); AtEOXact_ApplyLauncher(true); + AtEOXact_LogicalRepWorkers(true); pgstat_report_xact_timestamp(0); CurrentResourceOwner = NULL; @@ -2860,6 +2862,7 @@ AbortTransaction(void) AtEOXact_HashTables(false); AtEOXact_PgStat(false, is_parallel_worker); AtEOXact_ApplyLauncher(false); + AtEOXact_LogicalRepWorkers(false); pgstat_report_xact_timestamp(0); } diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index d673557ea4..c761785947 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -34,6 +34,7 @@ #include "nodes/makefuncs.h" #include "pgstat.h" #include "replication/logicallauncher.h" +#include "replication/logicalworker.h" #include "replication/origin.h" #include "replication/slot.h" #include "replication/walreceiver.h" @@ -1358,6 +1359,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, table_close(rel, RowExclusiveLock); + LogicalRepWorkersWakeupAtCommit(subid); + ObjectAddressSet(myself, SubscriptionRelationId, subid); InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index e48a3f589a..d101cddf6c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -253,6 +253,8 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL; Subscription *MySubscription = NULL; static bool MySubscriptionValid = false; +static List *on_commit_wakeup_workers_subids = NIL; + bool in_remote_transaction = false; static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; @@ -4092,3 +4094,47 @@ reset_apply_error_context_info(void) apply_error_callback_arg.remote_attnum = -1; set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr); } + +/* + * Wakeup the stored subscriptions' workers on commit if requested. + */ +void +AtEOXact_LogicalRepWorkers(bool isCommit) +{ + if (isCommit && on_commit_wakeup_workers_subids != NIL) + { + ListCell *subid; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + foreach(subid, on_commit_wakeup_workers_subids) + { + List *workers; + ListCell *worker; + + workers = logicalrep_workers_find(lfirst_oid(subid), true); + foreach(worker, workers) + logicalrep_worker_wakeup_ptr((LogicalRepWorker *) lfirst(worker)); + } + LWLockRelease(LogicalRepWorkerLock); + } + + on_commit_wakeup_workers_subids = NIL; +} + +/* + * Request wakeup of the workers for the given subscription ID on commit of the + * transaction. + * + * This is used to ensure that the workers reread the subscription info as soon + * as possible. + */ +void +LogicalRepWorkersWakeupAtCommit(Oid subid) +{ + MemoryContext oldcxt; + + oldcxt = MemoryContextSwitchTo(TopTransactionContext); + on_commit_wakeup_workers_subids = list_append_unique_oid(on_commit_wakeup_workers_subids, + subid); + MemoryContextSwitchTo(oldcxt); +} diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index cd1b6e8afc..2c2340d758 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -16,4 +16,7 @@ extern void ApplyWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); +extern void LogicalRepWorkersWakeupAtCommit(Oid subid); +extern void AtEOXact_LogicalRepWorkers(bool isCommit); + #endif /* LOGICALWORKER_H */ -- 2.25.1