On Tue, Nov 22, 2022 at 07:25:36AM +0000, houzj.f...@fujitsu.com wrote:
> On Tuesday, November 22, 2022 2:49 PM Hayato Kuroda (Fujitsu) 
> <kuroda.hay...@fujitsu.com>
>> Thanks for updating! It becomes better. Further comments:
>> 
>> 01. AlterSubscription()
>> 
>> ```
>> +    LogicalRepWorkersWakeupAtCommit(subid);
>> +
>> ```
>> 
>> Currently subids will be recorded even if the subscription is not modified.
>> I think LogicalRepWorkersWakeupAtCommit() should be called inside the if
>> (update_tuple).
> 
> I think an exception would be REFRESH PULLICATION in which case update_tuple 
> is
> false, but it seems better to wake up apply worker in this case as well,
> because the apply worker is also responsible to start table sync workers for
> newly subscribed tables(in process_syncing_tables()).
> 
> Besides, it seems not a must to wake up apply worker for ALTER SKIP 
> TRANSACTION,
> Although there might be no harm for waking up in this case.

In v3, I moved the call to LogicalRepWorkersWakeupAtCommit() to the end of
the function.  This should avoid waking up workers in some cases where it's
unnecessary (e.g., if ALTER SUBSCRIPTION ERRORs in a subtransaction), but
there are still cases where we'll wake up the workers unnecessarily.  I
think this is unlikely to cause any real problems in practice.

>> 02. LogicalRepWorkersWakeupAtCommit()
>> 
>> ```
>> +    oldcxt = MemoryContextSwitchTo(TopTransactionContext);
>> +    on_commit_wakeup_workers_subids =
>> lappend_oid(on_commit_wakeup_workers_subids,
>> +
>>                subid);
>> ```
>> 
>> If the subscription is altered twice in the same transaction, the same subid 
>> will
>> be recorded twice.
>> I'm not sure whether it may be caused some issued, but list_member_oid() can
>> be used to avoid that.
> 
> +1, list_append_unique_oid might be better.

Done in v3.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
>From b80c5934b40034dfbbde411721db1d4171c86d5c Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Mon, 21 Nov 2022 16:01:01 -0800
Subject: [PATCH v3 1/1] wake up logical workers after ALTER SUBSCRIPTION

---
 src/backend/access/transam/xact.c        |  3 ++
 src/backend/commands/subscriptioncmds.c  |  3 ++
 src/backend/replication/logical/worker.c | 46 ++++++++++++++++++++++++
 src/include/replication/logicalworker.h  |  3 ++
 4 files changed, 55 insertions(+)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 8086b857b9..dc00e66cfb 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -47,6 +47,7 @@
 #include "pgstat.h"
 #include "replication/logical.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
 #include "replication/syncrep.h"
@@ -2360,6 +2361,7 @@ CommitTransaction(void)
 	AtEOXact_PgStat(true, is_parallel_worker);
 	AtEOXact_Snapshot(true, false);
 	AtEOXact_ApplyLauncher(true);
+	AtEOXact_LogicalRepWorkers(true);
 	pgstat_report_xact_timestamp(0);
 
 	CurrentResourceOwner = NULL;
@@ -2860,6 +2862,7 @@ AbortTransaction(void)
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false, is_parallel_worker);
 		AtEOXact_ApplyLauncher(false);
+		AtEOXact_LogicalRepWorkers(false);
 		pgstat_report_xact_timestamp(0);
 	}
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index d673557ea4..c761785947 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -1358,6 +1359,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 	table_close(rel, RowExclusiveLock);
 
+	LogicalRepWorkersWakeupAtCommit(subid);
+
 	ObjectAddressSet(myself, SubscriptionRelationId, subid);
 
 	InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index e48a3f589a..d101cddf6c 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -253,6 +253,8 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
 Subscription *MySubscription = NULL;
 static bool MySubscriptionValid = false;
 
+static List *on_commit_wakeup_workers_subids = NIL;
+
 bool		in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
 
@@ -4092,3 +4094,47 @@ reset_apply_error_context_info(void)
 	apply_error_callback_arg.remote_attnum = -1;
 	set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
 }
+
+/*
+ * Wakeup the stored subscriptions' workers on commit if requested.
+ */
+void
+AtEOXact_LogicalRepWorkers(bool isCommit)
+{
+	if (isCommit && on_commit_wakeup_workers_subids != NIL)
+	{
+		ListCell   *subid;
+
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		foreach(subid, on_commit_wakeup_workers_subids)
+		{
+			List	   *workers;
+			ListCell   *worker;
+
+			workers = logicalrep_workers_find(lfirst_oid(subid), true);
+			foreach(worker, workers)
+				logicalrep_worker_wakeup_ptr((LogicalRepWorker *) lfirst(worker));
+		}
+		LWLockRelease(LogicalRepWorkerLock);
+	}
+
+	on_commit_wakeup_workers_subids = NIL;
+}
+
+/*
+ * Request wakeup of the workers for the given subscription ID on commit of the
+ * transaction.
+ *
+ * This is used to ensure that the workers reread the subscription info as soon
+ * as possible.
+ */
+void
+LogicalRepWorkersWakeupAtCommit(Oid subid)
+{
+	MemoryContext oldcxt;
+
+	oldcxt = MemoryContextSwitchTo(TopTransactionContext);
+	on_commit_wakeup_workers_subids = list_append_unique_oid(on_commit_wakeup_workers_subids,
+															 subid);
+	MemoryContextSwitchTo(oldcxt);
+}
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index cd1b6e8afc..2c2340d758 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -16,4 +16,7 @@ extern void ApplyWorkerMain(Datum main_arg);
 
 extern bool IsLogicalWorker(void);
 
+extern void LogicalRepWorkersWakeupAtCommit(Oid subid);
+extern void AtEOXact_LogicalRepWorkers(bool isCommit);
+
 #endif							/* LOGICALWORKER_H */
-- 
2.25.1

Reply via email to