On 2023-Aug-23, Zhijie Hou (Fujitsu) wrote:

> [1]--
>               LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
> 
>               workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true);
>               foreach(lc, workers)
>               {
>                       LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
> 
> **                    if (isParallelApplyWorker(w))
>                               logicalrep_worker_stop_internal(w, SIGTERM);
>               }

Hmm, I think if worker->in_use is false, we shouldn't consult the rest
of the struct at all, so I propose to add the attached 0001 as a minimal
fix.

In fact, I'd go further and propose that if we do take that stance, then
we don't need to clear out the contents of this struct at all, so let's
not.  That's 0002.

And the reason 0002 does not remove the zeroing of ->proc is that the
tests get stuck when I do that, and the reason for that looks to be
some shoddy coding in WaitForReplicationWorkerAttach, so I propose we
change that too, as in 0003.

Thoughts?

-- 
Álvaro Herrera         PostgreSQL Developer  —  https://www.EnterpriseDB.com/
From a28775fc5900cd740556a864e1826ceda268b794 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Wed, 23 Aug 2023 09:25:35 +0200
Subject: [PATCH 1/3] Don't use LogicalRepWorker until ->in_use is verified

---
 src/backend/replication/logical/launcher.c | 4 ++--
 src/include/replication/worker_internal.h  | 8 ++++++--
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 72e44d5a02..ec5156aa41 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -862,7 +862,7 @@ logicalrep_sync_worker_count(Oid subid)
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (w->subid == subid && isTablesyncWorker(w))
+		if (isTablesyncWorker(w) && w->subid == subid)
 			res++;
 	}
 
@@ -889,7 +889,7 @@ logicalrep_pa_worker_count(Oid subid)
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-		if (w->subid == subid && isParallelApplyWorker(w))
+		if (isParallelApplyWorker(w) && w->subid == subid)
 			res++;
 	}
 
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index a428663859..8f4bed0958 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -327,8 +327,10 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
 
-#define isParallelApplyWorker(worker) ((worker)->type == WORKERTYPE_PARALLEL_APPLY)
-#define isTablesyncWorker(worker) ((worker)->type == WORKERTYPE_TABLESYNC)
+#define isParallelApplyWorker(worker) ((worker)->in_use && \
+									   (worker)->type == WORKERTYPE_PARALLEL_APPLY)
+#define isTablesyncWorker(worker) ((worker)->in_use && \
+								   (worker)->type == WORKERTYPE_TABLESYNC)
 
 static inline bool
 am_tablesync_worker(void)
@@ -339,12 +341,14 @@ am_tablesync_worker(void)
 static inline bool
 am_leader_apply_worker(void)
 {
+	Assert(MyLogicalRepWorker->in_use);
 	return (MyLogicalRepWorker->type == WORKERTYPE_APPLY);
 }
 
 static inline bool
 am_parallel_apply_worker(void)
 {
+	Assert(MyLogicalRepWorker->in_use);
 	return isParallelApplyWorker(MyLogicalRepWorker);
 }
 
-- 
2.30.2

From 36336e6d8ef2913def59a53b6bc083a40181a85a Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Wed, 23 Aug 2023 09:53:51 +0200
Subject: [PATCH 2/3] don't clean up unnecessarily

---
 src/backend/replication/logical/launcher.c | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index ec5156aa41..f9d6ebf2d8 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -795,12 +795,6 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
 
 	worker->in_use = false;
 	worker->proc = NULL;
-	worker->dbid = InvalidOid;
-	worker->userid = InvalidOid;
-	worker->subid = InvalidOid;
-	worker->relid = InvalidOid;
-	worker->leader_pid = InvalidPid;
-	worker->parallel_apply = false;
 }
 
 /*
-- 
2.30.2

From 9dced5b0898be22004d442cb2893848663f967ba Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Wed, 23 Aug 2023 09:55:13 +0200
Subject: [PATCH 3/3] Don't rely on proc being NULL either

---
 src/backend/replication/logical/launcher.c | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index f9d6ebf2d8..8bfceb5c27 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -201,11 +201,18 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
 
 		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
-		/* Worker either died or has started. Return false if died. */
-		if (!worker->in_use || worker->proc)
+		/* Not started? */
+		if (!worker->in_use)
 		{
 			LWLockRelease(LogicalRepWorkerLock);
-			return worker->in_use;
+			return false;
+		}
+
+		/* Already going? */
+		if (worker->proc)
+		{
+			LWLockRelease(LogicalRepWorkerLock);
+			return true;
 		}
 
 		LWLockRelease(LogicalRepWorkerLock);
@@ -794,7 +801,6 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
 	Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
 
 	worker->in_use = false;
-	worker->proc = NULL;
 }
 
 /*
-- 
2.30.2

Reply via email to