On Wed, 2023-03-08 at 11:25 +0100, Drouvot, Bertrand wrote:
> - Maybe ConditionVariableEventSleep() should take care of the
> “WaitEventSetWait returns 1 and cvEvent.event == WL_POSTMASTER_DEATH”
> case?
Thank you, done. I think the nearby line was also wrong, returning true
when there was no timeout. I combined the lines and got rid of the
early return so it can check the list and timeout condition like
normal. Attached.
> - Maybe ConditionVariableEventSleep() could accept and deal with the
> CV being NULL?
> I used it in the POC attached to handle logical decoding on the
> primary server case.
> One option should be to create a dedicated CV for that case though.
I don't think it's a good idea to have a CV-based API that doesn't need
a CV. Wouldn't that just be a normal WaitEventSet?
> - In the POC attached I had to add this extra condition “(cv &&
> !RecoveryInProgress())” to avoid waiting on the timeout when there is
> a promotion.
> That makes me think that we may want to add 2 extra parameters (as 2
> functions returning a bool?) to ConditionVariableEventSleep()
> to check whether or not we still want to test the socket or the CV
> wake up in each loop iteration.
That seems like a complex API. Would it work to signal the CV during
promotion instead?
> Also 3 additional remarks:
>
> 1) About InitializeConditionVariableWaitSet() and
> ConditionVariableWaitSetCreate(): I'm not sure about the naming as
> there is no CV yet (they "just" deal with WaitEventSet).
It's a WaitEventSet that contains the conditions always required for
any CV, and allows you to add in more.
> 3)
>
> I wonder whether there could be race conditions: ConditionVariableWaitSet is
> being initialized with PGINVALID_SOCKET
> as WL_LATCH_SET and might also (if IsUnderPostmaster) be
> initialized with PGINVALID_SOCKET as WL_EXIT_ON_PM_DEATH.
>
> So IIUC, the patch is introducing 2 new possible sources of wakeup.
Those should be the same conditions already required by
ConditionVariableTimedSleep() in master, right?
Regards,
Jeff Davis
From 2f05cab9012950ae9290fccbb6366d50fc01553e Mon Sep 17 00:00:00 2001
From: Jeff Davis <[email protected]>
Date: Wed, 1 Mar 2023 20:02:42 -0800
Subject: [PATCH v2] Introduce ConditionVariableEventSleep().
The new API takes a WaitEventSet which can include socket events. The
WaitEventSet must have been created by
ConditionVariableWaitSetCreate(), another new function, so that it
includes the wait events necessary for a condition variable.
---
src/backend/storage/lmgr/condition_variable.c | 106 ++++++++++++++++--
src/backend/storage/lmgr/proc.c | 6 +
src/backend/utils/init/miscinit.c | 1 +
src/include/storage/condition_variable.h | 10 ++
4 files changed, 115 insertions(+), 8 deletions(-)
diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index 7e2bbf46d9..3dbfa7468b 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -27,9 +27,29 @@
#include "storage/spin.h"
#include "utils/memutils.h"
+#define ConditionVariableWaitSetLatchPos 0
+
/* Initially, we are not prepared to sleep on any condition variable. */
static ConditionVariable *cv_sleep_target = NULL;
+/* Used by ConditionVariableSleep() and ConditionVariableTimedSleep(). */
+static WaitEventSet *ConditionVariableWaitSet = NULL;
+
+/*
+ * Initialize the process-local condition variable WaitEventSet.
+ *
+ * This must be called once during startup of any process that can wait on
+ * condition variables, before its first ConditionVariable[Timed]Sleep() call.
+ */
+void
+InitializeConditionVariableWaitSet(void)
+{
+ Assert(ConditionVariableWaitSet == NULL);
+
+ ConditionVariableWaitSet = ConditionVariableWaitSetCreate(
+ TopMemoryContext, 0);
+}
+
/*
* Initialize a condition variable.
*/
@@ -40,6 +60,51 @@ ConditionVariableInit(ConditionVariable *cv)
proclist_init(&cv->wakeup);
}
+/*
+ * Create a WaitEventSet for ConditionVariableEventSleep(), for callers that
+ * want to wake up on either the condition variable signal or a socket event:
+ *
+ *     ConditionVariableInit(&cv);
+ *     waitset = ConditionVariableWaitSetCreate(mcxt, 1);
+ *     event_pos = AddWaitEventToSet(waitset, 0, sock, NULL, NULL);
+ *     ...
+ *     ConditionVariablePrepareToSleep(&cv);
+ *     while (...condition not met...)
+ *     {
+ *         socket_wait_events = ...
+ *         ModifyWaitEvent(waitset, event_pos, socket_wait_events, NULL);
+ *         ConditionVariableEventSleep(&cv, waitset, ...);
+ *     }
+ *     ConditionVariableCancelSleep();
+ *
+ * The waitset is built by CreateWaitEventSet(mcxt, ...) and starts out with
+ * the standard condition variable events: WL_LATCH_SET on MyLatch (asserted
+ * to be at ConditionVariableWaitSetLatchPos) and, when IsUnderPostmaster,
+ * WL_EXIT_ON_PM_DEATH. It has room for n_socket_events additional socket
+ * events. The initially-filled event positions should not be modified, but
+ * added socket events can be. The same waitset can serve multiple condition
+ * variables as long as all callers are interested in the same sockets.
+ */
+WaitEventSet *
+ConditionVariableWaitSetCreate(MemoryContext mcxt, int n_socket_events)
+{
+ int latch_pos PG_USED_FOR_ASSERTS_ONLY;
+ int n_cv_events = IsUnderPostmaster ? 2 : 1;
+ int nevents = n_cv_events + n_socket_events;
+ WaitEventSet *waitset = CreateWaitEventSet(mcxt, nevents);
+
+ latch_pos = AddWaitEventToSet(waitset, WL_LATCH_SET, PGINVALID_SOCKET,
+ MyLatch, NULL);
+
+ if (IsUnderPostmaster)
+ AddWaitEventToSet(waitset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
+ NULL, NULL);
+
+ Assert(latch_pos == ConditionVariableWaitSetLatchPos);
+
+ return waitset;
+}
+
/*
* Prepare to wait on a given condition variable.
*
@@ -97,7 +162,8 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv)
void
ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
{
- (void) ConditionVariableTimedSleep(cv, -1 /* no timeout */ ,
+ (void) ConditionVariableEventSleep(cv, ConditionVariableWaitSet,
+ -1 /* no timeout */ ,
wait_event_info);
}
@@ -111,11 +177,27 @@ ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
bool
ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
uint32 wait_event_info)
+{
+ return ConditionVariableEventSleep(cv, ConditionVariableWaitSet, timeout,
+ wait_event_info);
+}
+
+/*
+ * Wait for a condition variable to be signaled, a timeout to be reached, or a
+ * socket event in the given waitset. The waitset must have been created by
+ * ConditionVariableWaitSetCreate().
+ *
+ * Returns true when timeout expires, otherwise returns false.
+ *
+ * See ConditionVariableSleep() for general usage.
+ */
+bool
+ConditionVariableEventSleep(ConditionVariable *cv, WaitEventSet *waitset,
+ long timeout, uint32 wait_event_info)
{
long cur_timeout = -1;
instr_time start_time;
instr_time cur_time;
- int wait_events;
/*
* If the caller didn't prepare to sleep explicitly, then do so now and
@@ -147,24 +229,32 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
INSTR_TIME_SET_CURRENT(start_time);
Assert(timeout >= 0 && timeout <= INT_MAX);
cur_timeout = timeout;
- wait_events = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
}
- else
- wait_events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
while (true)
{
bool done = false;
+ WaitEvent cvEvent;
+ int nevents;
/*
- * Wait for latch to be set. (If we're awakened for some other
- * reason, the code below will cope anyway.)
+ * Wait for latch to be set, or other events which will be handled
+ * below.
*/
- (void) WaitLatch(MyLatch, wait_events, cur_timeout, wait_event_info);
+ nevents = WaitEventSetWait(waitset, cur_timeout, &cvEvent,
+ 1, wait_event_info);
/* Reset latch before examining the state of the wait list. */
ResetLatch(MyLatch);
+ /*
+ * If the wakeup was due to a socket event or postmaster death, then
+ * we must return to the caller.
+ */
+ if (nevents == 1 &&
+ (cvEvent.events & (WL_SOCKET_MASK | WL_POSTMASTER_DEATH)) != 0)
+ done = true;
+
/*
* If this process has been taken out of the wait list, then we know
* that it has been signaled by ConditionVariableSignal (or
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 22b4278610..ae4a7aecd4 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -440,6 +440,9 @@ InitProcess(void)
OwnLatch(&MyProc->procLatch);
SwitchToSharedLatch();
+ /* Initialize process-local condition variable support */
+ InitializeConditionVariableWaitSet();
+
/* now that we have a proc, report wait events to shared memory */
pgstat_set_wait_event_storage(&MyProc->wait_event_info);
@@ -596,6 +599,9 @@ InitAuxiliaryProcess(void)
OwnLatch(&MyProc->procLatch);
SwitchToSharedLatch();
+ /* Initialize process-local condition variable support */
+ InitializeConditionVariableWaitSet();
+
/* now that we have a proc, report wait events to shared memory */
pgstat_set_wait_event_storage(&MyProc->wait_event_info);
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index a604432126..0545044225 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -40,6 +40,7 @@
#include "postmaster/interrupt.h"
#include "postmaster/pgarch.h"
#include "postmaster/postmaster.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/latch.h"
diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h
index 589bdd323c..94adb54b91 100644
--- a/src/include/storage/condition_variable.h
+++ b/src/include/storage/condition_variable.h
@@ -22,6 +22,7 @@
#ifndef CONDITION_VARIABLE_H
#define CONDITION_VARIABLE_H
+#include "storage/latch.h"
#include "storage/proclist_types.h"
#include "storage/spin.h"
@@ -42,9 +43,14 @@ typedef union ConditionVariableMinimallyPadded
char pad[CV_MINIMAL_SIZE];
} ConditionVariableMinimallyPadded;
+extern void InitializeConditionVariableWaitSet(void);
+
/* Initialize a condition variable. */
extern void ConditionVariableInit(ConditionVariable *cv);
+extern WaitEventSet *ConditionVariableWaitSetCreate(MemoryContext mcxt,
+ int n_socket_events);
+
/*
* To sleep on a condition variable, a process should use a loop which first
* checks the condition, exiting the loop if it is met, and then calls
@@ -56,6 +62,10 @@ extern void ConditionVariableInit(ConditionVariable *cv);
extern void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info);
extern bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
uint32 wait_event_info);
+extern bool ConditionVariableEventSleep(ConditionVariable *cv,
+ WaitEventSet *waitset,
+ long timeout,
+ uint32 wait_event_info);
extern void ConditionVariableCancelSleep(void);
/*
--
2.34.1