On Wed, 2023-03-08 at 11:25 +0100, Drouvot, Bertrand wrote:
> - Maybe ConditionVariableEventSleep() should take care of the
> “WaitEventSetWait returns 1 and cvEvent.event == WL_POSTMASTER_DEATH”
> case?

Thank you, done. I think the nearby line was also wrong: it returned
true when there was no timeout. I combined the two checks and removed
the early return, so the function now checks the wait list and the
timeout condition as usual. Patch attached.
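
To make the combined check concrete, here is a minimal standalone sketch
of the predicate the patch uses to decide that a wakeup must be reported
to the caller. The WL_* values below are illustrative stand-ins for the
flags in storage/latch.h (the socket mask is simplified to two flags), and
wakeup_must_return() is a hypothetical helper name, not something in the
patch:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Illustrative stand-ins for the flags declared in storage/latch.h.
 * The numeric values and the reduced socket mask are assumptions made
 * so that this sketch compiles on its own.
 */
#define WL_LATCH_SET         (1 << 0)
#define WL_SOCKET_READABLE   (1 << 1)
#define WL_SOCKET_WRITEABLE  (1 << 2)
#define WL_POSTMASTER_DEATH  (1 << 4)
#define WL_SOCKET_MASK       (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)

/*
 * Hypothetical helper: true when the wakeup came from a socket event or
 * from postmaster death, i.e. the cases where the sleep loop sets
 * "done" instead of returning early.  A plain latch wakeup or a timeout
 * (nevents == 0) falls through to the usual wait-list and timeout checks.
 */
static bool
wakeup_must_return(int nevents, uint32_t events)
{
	/* The patch asks WaitEventSetWait() for at most one event. */
	assert(nevents == 0 || nevents == 1);

	return nevents == 1 &&
		(events & (WL_SOCKET_MASK | WL_POSTMASTER_DEATH)) != 0;
}
```

With this shape, a latch-only wakeup still goes through the wait-list
check, and a timeout is detected by the existing elapsed-time logic.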

> - Maybe ConditionVariableEventSleep() could accept and deal with the
> CV being NULL?
> I used it in the POC attached to handle logical decoding on the
> primary server case.
> One option would be to create a dedicated CV for that case though.

I don't think it's a good idea to have a CV-based API that doesn't need
a CV. Wouldn't that just be a normal WaitEventSet?

> - In the POC attached I had to add this extra condition “(cv &&
> !RecoveryInProgress())” to avoid waiting on the timeout when there is
> a promotion.
> That makes me think that we may want to add 2 extra parameters (as 2
> functions returning a bool?) to ConditionVariableEventSleep()
> to check whether or not we still want to test the socket or the CV
> wake up in each loop iteration.

That seems like a complex API. Would it work to signal the CV during
promotion instead?
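
As a rough analogy only, here is the pattern sketched with POSIX threads
rather than PostgreSQL's ConditionVariable API: the waiter loops on its
predicate, and the promoting side flips the state and broadcasts, so no
timeout or extra callback is needed. DemoState, waiter(), promote() and
demo_promotion_wakes_waiter() are all hypothetical names for illustration:

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical shared state; stands in for recovery status plus the CV. */
typedef struct DemoState
{
	pthread_mutex_t lock;
	pthread_cond_t	cv;
	bool			in_recovery;	/* cleared by promote() */
	bool			data_ready;		/* the condition normally waited for */
} DemoState;

/* Waiter: sleeps until data is ready or recovery has ended. */
static void *
waiter(void *arg)
{
	DemoState  *s = arg;

	pthread_mutex_lock(&s->lock);
	while (s->in_recovery && !s->data_ready)
		pthread_cond_wait(&s->cv, &s->lock);
	pthread_mutex_unlock(&s->lock);
	return NULL;
}

/* Promotion: change the state, then wake every waiter to re-check it. */
static void
promote(DemoState *s)
{
	pthread_mutex_lock(&s->lock);
	s->in_recovery = false;
	pthread_cond_broadcast(&s->cv);
	pthread_mutex_unlock(&s->lock);
}

/* Returns 1 once the waiter has exited because of the promotion. */
static int
demo_promotion_wakes_waiter(void)
{
	DemoState	s = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER,
					 true, false};
	pthread_t	t;
	int			rc = pthread_create(&t, NULL, waiter, &s);

	assert(rc == 0);
	promote(&s);
	pthread_join(t, NULL);		/* would hang if the wakeup were lost */
	return s.in_recovery ? 0 : 1;
}
```

Because the predicate is re-checked under the lock, it does not matter
whether the promotion happens before or after the waiter goes to sleep.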

> Also 3 additional remarks:
> 
> 1) About InitializeConditionVariableWaitSet() and
> ConditionVariableWaitSetCreate(): I'm not sure about the naming as
> there is no CV yet (they "just" deal with WaitEventSet).

It's a WaitEventSet that contains the conditions always required for
any CV, and allows you to add in more.

> 3)
> 
> I wonder whether there are race conditions: ConditionVariableWaitSet is
> being initialized with PGINVALID_SOCKET
> as WL_LATCH_SET and might also (if IsUnderPostmaster) be
> initialized with PGINVALID_SOCKET as WL_EXIT_ON_PM_DEATH.
> 
> So IIUC, the patch is introducing 2 new possible sources of wakeups.

Those should be the same conditions already required by
ConditionVariableTimedSleep() in master, right?


Regards,
        Jeff Davis
From 2f05cab9012950ae9290fccbb6366d50fc01553e Mon Sep 17 00:00:00 2001
From: Jeff Davis <j...@j-davis.com>
Date: Wed, 1 Mar 2023 20:02:42 -0800
Subject: [PATCH v2] Introduce ConditionVariableEventSleep().

The new API takes a WaitEventSet which can include socket events. The
WaitEventSet must have been created by
ConditionVariableWaitSetCreate(), another new function, so that it
includes the wait events necessary for a condition variable.
---
 src/backend/storage/lmgr/condition_variable.c | 106 ++++++++++++++++--
 src/backend/storage/lmgr/proc.c               |   6 +
 src/backend/utils/init/miscinit.c             |   1 +
 src/include/storage/condition_variable.h      |  10 ++
 4 files changed, 115 insertions(+), 8 deletions(-)

diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index 7e2bbf46d9..3dbfa7468b 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -27,9 +27,29 @@
 #include "storage/spin.h"
 #include "utils/memutils.h"
 
+#define ConditionVariableWaitSetLatchPos	0
+
 /* Initially, we are not prepared to sleep on any condition variable. */
 static ConditionVariable *cv_sleep_target = NULL;
 
+/* Used by ConditionVariableSleep() and ConditionVariableTimedSleep(). */
+static WaitEventSet *ConditionVariableWaitSet = NULL;
+
+/*
+ * Initialize the process-local condition variable WaitEventSet.
+ *
+ * This must be called once during startup of any process that can wait on
+ * condition variables, before it issues any ConditionVariableInit() calls.
+ */
+void
+InitializeConditionVariableWaitSet(void)
+{
+	Assert(ConditionVariableWaitSet == NULL);
+
+	ConditionVariableWaitSet = ConditionVariableWaitSetCreate(
+		TopMemoryContext, 0);
+}
+
 /*
  * Initialize a condition variable.
  */
@@ -40,6 +60,51 @@ ConditionVariableInit(ConditionVariable *cv)
 	proclist_init(&cv->wakeup);
 }
 
+/*
+ * Create a WaitEventSet for ConditionVariableEventSleep(). This should be
+ * used when the caller of ConditionVariableEventSleep() would like to wake up
+ * on either the condition variable signal or a socket event. For example:
+ *
+ *   ConditionVariableInit(&cv);
+ *   waitset = ConditionVariableWaitSetCreate(mcxt, 1);
+ *   event_pos = AddWaitEventToSet(waitset, 0, sock, NULL, NULL);
+ *   ...
+ *   ConditionVariablePrepareToSleep(&cv);
+ *   while (...condition not met...)
+ *   {
+ *       socket_wait_events = ...
+ *       ModifyWaitEvent(waitset, event_pos, socket_wait_events, NULL);
+ *       ConditionVariableEventSleep(&cv, waitset, ...);
+ *   }
+ *   ConditionVariableCancelSleep();
+ *
+ * The waitset is created with the standard events for a condition variable,
+ * and room for adding n_socket_events additional socket events. The
+ * initially-filled event positions should not be modified, but added socket
+ * events can be modified. The same waitset can be used for multiple condition
+ * variables as long as the callers of ConditionVariableEventSleep() are
+ * interested in the same sockets.
+ */
+WaitEventSet *
+ConditionVariableWaitSetCreate(MemoryContext mcxt, int n_socket_events)
+{
+	int				 latch_pos   PG_USED_FOR_ASSERTS_ONLY;
+	int				 n_cv_events = IsUnderPostmaster ? 2 : 1;
+	int				 nevents	 = n_cv_events + n_socket_events;
+	WaitEventSet    *waitset	 = CreateWaitEventSet(mcxt, nevents);
+
+	latch_pos = AddWaitEventToSet(waitset, WL_LATCH_SET, PGINVALID_SOCKET,
+								  MyLatch, NULL);
+
+	if (IsUnderPostmaster)
+		AddWaitEventToSet(waitset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
+						  NULL, NULL);
+
+	Assert(latch_pos == ConditionVariableWaitSetLatchPos);
+
+	return waitset;
+}
+
 /*
  * Prepare to wait on a given condition variable.
  *
@@ -97,7 +162,8 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv)
 void
 ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
 {
-	(void) ConditionVariableTimedSleep(cv, -1 /* no timeout */ ,
+	(void) ConditionVariableEventSleep(cv, ConditionVariableWaitSet,
+									   -1 /* no timeout */ ,
 									   wait_event_info);
 }
 
@@ -111,11 +177,27 @@ ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
 bool
 ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
 							uint32 wait_event_info)
+{
+	return ConditionVariableEventSleep(cv, ConditionVariableWaitSet, timeout,
+									   wait_event_info);
+}
+
+/*
+ * Wait for a condition variable to be signaled, a timeout to be reached, or a
+ * socket event in the given waitset. The waitset must have been created by
+ * ConditionVariableWaitSetCreate().
+ *
+ * Returns true when timeout expires, otherwise returns false.
+ *
+ * See ConditionVariableSleep() for general usage.
+ */
+bool
+ConditionVariableEventSleep(ConditionVariable *cv, WaitEventSet *waitset,
+							long timeout, uint32 wait_event_info)
 {
 	long		cur_timeout = -1;
 	instr_time	start_time;
 	instr_time	cur_time;
-	int			wait_events;
 
 	/*
 	 * If the caller didn't prepare to sleep explicitly, then do so now and
@@ -147,24 +229,32 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
 		INSTR_TIME_SET_CURRENT(start_time);
 		Assert(timeout >= 0 && timeout <= INT_MAX);
 		cur_timeout = timeout;
-		wait_events = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
 	}
-	else
-		wait_events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
 
 	while (true)
 	{
 		bool		done = false;
+		WaitEvent	cvEvent;
+		int			nevents;
 
 		/*
-		 * Wait for latch to be set.  (If we're awakened for some other
-		 * reason, the code below will cope anyway.)
+		 * Wait for latch to be set, or other events which will be handled
+		 * below.
 		 */
-		(void) WaitLatch(MyLatch, wait_events, cur_timeout, wait_event_info);
+		nevents = WaitEventSetWait(waitset, cur_timeout, &cvEvent,
+								   1, wait_event_info);
 
 		/* Reset latch before examining the state of the wait list. */
 		ResetLatch(MyLatch);
 
+		/*
+		 * If the wakeup was due to a socket event or postmaster death, then
+		 * we must return to the caller.
+		 */
+		if (nevents == 1 &&
+			(cvEvent.events & (WL_SOCKET_MASK | WL_POSTMASTER_DEATH)) != 0)
+			done = true;
+
 		/*
 		 * If this process has been taken out of the wait list, then we know
 		 * that it has been signaled by ConditionVariableSignal (or
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 22b4278610..ae4a7aecd4 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -440,6 +440,9 @@ InitProcess(void)
 	OwnLatch(&MyProc->procLatch);
 	SwitchToSharedLatch();
 
+	/* Initialize process-local condition variable support */
+	InitializeConditionVariableWaitSet();
+
 	/* now that we have a proc, report wait events to shared memory */
 	pgstat_set_wait_event_storage(&MyProc->wait_event_info);
 
@@ -596,6 +599,9 @@ InitAuxiliaryProcess(void)
 	OwnLatch(&MyProc->procLatch);
 	SwitchToSharedLatch();
 
+	/* Initialize process-local condition variable support */
+	InitializeConditionVariableWaitSet();
+
 	/* now that we have a proc, report wait events to shared memory */
 	pgstat_set_wait_event_storage(&MyProc->wait_event_info);
 
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index a604432126..0545044225 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -40,6 +40,7 @@
 #include "postmaster/interrupt.h"
 #include "postmaster/pgarch.h"
 #include "postmaster/postmaster.h"
+#include "storage/condition_variable.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/latch.h"
diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h
index 589bdd323c..94adb54b91 100644
--- a/src/include/storage/condition_variable.h
+++ b/src/include/storage/condition_variable.h
@@ -22,6 +22,7 @@
 #ifndef CONDITION_VARIABLE_H
 #define CONDITION_VARIABLE_H
 
+#include "storage/latch.h"
 #include "storage/proclist_types.h"
 #include "storage/spin.h"
 
@@ -42,9 +43,14 @@ typedef union ConditionVariableMinimallyPadded
 	char		pad[CV_MINIMAL_SIZE];
 } ConditionVariableMinimallyPadded;
 
+extern void InitializeConditionVariableWaitSet(void);
+
 /* Initialize a condition variable. */
 extern void ConditionVariableInit(ConditionVariable *cv);
 
+extern WaitEventSet *ConditionVariableWaitSetCreate(MemoryContext mcxt,
+													int n_socket_events);
+
 /*
  * To sleep on a condition variable, a process should use a loop which first
  * checks the condition, exiting the loop if it is met, and then calls
@@ -56,6 +62,10 @@ extern void ConditionVariableInit(ConditionVariable *cv);
 extern void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info);
 extern bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
 										uint32 wait_event_info);
+extern bool ConditionVariableEventSleep(ConditionVariable *cv,
+										WaitEventSet *cvEventSet,
+										long timeout,
+										uint32 wait_event_info);
 extern void ConditionVariableCancelSleep(void);
 
 /*
-- 
2.34.1
