Hi,

On 3/3/23 5:26 PM, Drouvot, Bertrand wrote:
Hi,

On 3/3/23 8:58 AM, Jeff Davis wrote:
On Thu, 2023-03-02 at 11:45 -0800, Jeff Davis wrote:
In this case it looks easier to add the right API than to be sure
about
whether it's needed or not.

I attached a sketch of one approach.

Oh, that's very cool, thanks a lot!

I'm not very confident that it's
the right API or even that it works as I intended it, but if others
like the approach I can work on it some more.


I'll look at it early next week.


So, I took your patch and as an example I tried a quick integration in 0004,
(see 0004_new_API.txt attached) to put it in the logical decoding on standby 
context.

Based on this, I have 3 comments:

- Maybe ConditionVariableEventSleep() should take care of the “WaitEventSetWait 
returns 1 and cvEvent.event == WL_POSTMASTER_DEATH” case?

- Maybe ConditionVariableEventSleep() could accept and deal with the CV being
NULL?
I used that in the attached POC to handle the logical decoding on the primary
server case.
Another option would be to create a dedicated CV for that case, though.

- In the attached POC I had to add this extra condition “(cv &&
!RecoveryInProgress())” to avoid waiting for the timeout when there is a promotion.
That makes me think that we may want to add 2 extra parameters (as 2 functions
returning a bool?) to ConditionVariableEventSleep()
to check whether or not we still want to test the socket or the CV wake-up in
each loop iteration.
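
To make the last point a bit more concrete, here is a minimal standalone sketch (not the POC code) of how two caller-supplied predicates could decide, on each loop iteration, which wake-up sources are still of interest. Everything in it is hypothetical and only for illustration: DemoCV, DemoState, DemoSleepOpts, the DEMO_SRC_* flags and demo_sources_to_wait_on() are stand-ins, not PostgreSQL APIs, and a NULL CV is simply treated as "no CV wake-up", as suggested in the second point.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins; none of these are PostgreSQL types or flags. */
#define DEMO_SRC_CV     0x1		/* keep waiting for the CV wake-up */
#define DEMO_SRC_SOCKET 0x2		/* keep waiting for socket events */

typedef struct DemoCV { int unused; } DemoCV;
typedef bool (*demo_check_fn) (void *arg);

typedef struct DemoSleepOpts
{
	demo_check_fn want_cv;		/* NULL means "always interested" */
	demo_check_fn want_socket;	/* NULL means "always interested" */
	void	   *arg;			/* passed to both predicates */
} DemoSleepOpts;

/* Model of the caller's state (think: walsender on a standby). */
typedef struct DemoState
{
	bool		in_recovery;
	bool		send_pending;
} DemoState;

bool
demo_still_in_recovery(void *arg)
{
	return ((DemoState *) arg)->in_recovery;
}

bool
demo_send_pending(void *arg)
{
	return ((DemoState *) arg)->send_pending;
}

/*
 * Would be evaluated at the top of each loop iteration: returns the set of
 * sources the sleep should still pay attention to.
 */
int
demo_sources_to_wait_on(const DemoCV *cv, const DemoSleepOpts *opts)
{
	int			sources = 0;

	/* A NULL CV can never be signaled, so never wait on it. */
	if (cv != NULL && (opts->want_cv == NULL || opts->want_cv(opts->arg)))
		sources |= DEMO_SRC_CV;

	if (opts->want_socket == NULL || opts->want_socket(opts->arg))
		sources |= DEMO_SRC_SOCKET;

	return sources;
}
```

In this model, once in_recovery is cleared (promotion) the CV source drops out, so the loop would not keep waiting for a broadcast that may never come.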

Also 3 additional remarks:

1) About InitializeConditionVariableWaitSet() and ConditionVariableWaitSetCreate(): I'm 
not sure about the naming as there is no CV yet (they "just" deal with 
WaitEventSet).

So, what about renaming?

+static WaitEventSet *ConditionVariableWaitSet = NULL;

to say, "LocalWaitSet" and then rename ConditionVariableWaitSetLatchPos, 
InitializeConditionVariableWaitSet() and ConditionVariableWaitSetCreate() accordingly?

But that might not be needed (see 3) below).

2)

 /*
  * Prepare to wait on a given condition variable.
  *
@@ -97,7 +162,8 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv)
 void
 ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
 {
-       (void) ConditionVariableTimedSleep(cv, -1 /* no timeout */ ,
+       (void) ConditionVariableEventSleep(cv, ConditionVariableWaitSet,
+                                                                          -1 /* no timeout */ ,
                                                                           wait_event_info);
 }

@@ -111,11 +177,27 @@ ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
 bool
 ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
                                                        uint32 wait_event_info)
+{
+       return ConditionVariableEventSleep(cv, ConditionVariableWaitSet, timeout,
+                                                                          wait_event_info);
+}
+

I like the idea of making use of the new ConditionVariableEventSleep() here, 
but on the other hand...

3)

I wonder whether there could be race conditions: ConditionVariableWaitSet is
initialized with a WL_LATCH_SET event (on PGINVALID_SOCKET) and, if
IsUnderPostmaster, also with a WL_EXIT_ON_PM_DEATH event (on PGINVALID_SOCKET).

So IIUC, the patch is introducing 2 new possible sources of wake-up.

Then, what about:

- not creating ConditionVariableWaitSet, ConditionVariableWaitSetLatchPos,
InitializeConditionVariableWaitSet() and ConditionVariableWaitSetCreate() at
all?
- calling ConditionVariableEventSleep() with a NULL WaitEventSet parameter in
ConditionVariableSleep() and ConditionVariableTimedSleep()?
- handling the case where the WaitEventSet parameter is NULL in
ConditionVariableEventSleep()? (That could also make sense if we handle the
case of the CV being NULL as proposed above)
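
As a rough standalone illustration of the last bullet (again, not the actual patch), the NULL-waitset case could fall back to the event mask that ConditionVariableTimedSleep() used before the patch, i.e. latch plus exit-on-postmaster-death, with the timeout flag added only when a timeout is given. The DEMO_WL_* bit values, DemoWaitSet and demo_effective_wait_events() are made-up names for this sketch; the real WL_* flags and WaitEventSet are declared in storage/latch.h and are not modelled here.

```c
#include <stddef.h>

/* Hypothetical bit values; these are NOT PostgreSQL's WL_* definitions. */
#define DEMO_WL_LATCH_SET        (1 << 0)
#define DEMO_WL_TIMEOUT          (1 << 1)
#define DEMO_WL_EXIT_ON_PM_DEATH (1 << 2)
#define DEMO_WL_SOCKET           (1 << 3)

/* Stand-in for a caller-built wait set: just the events it contains. */
typedef struct DemoWaitSet
{
	int			events;
} DemoWaitSet;

/*
 * Decide what a sleep would wait on.  With a caller-supplied set, use it
 * unchanged (the timeout would be passed separately to the wait call).
 * With a NULL set, behave like the old latch-only wait.
 */
int
demo_effective_wait_events(const DemoWaitSet *waitset, long timeout)
{
	int			events;

	if (waitset != NULL)
		return waitset->events;

	events = DEMO_WL_LATCH_SET | DEMO_WL_EXIT_ON_PM_DEATH;
	if (timeout >= 0)
		events |= DEMO_WL_TIMEOUT;

	return events;
}
```

With that shape, ConditionVariableSleep() and ConditionVariableTimedSleep() would keep their current wake-up sources without needing a process-local wait set.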

Regards,

--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From d4423857bd73c4d87b17a0dac74388f664421e18 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Mon, 6 Mar 2023 08:17:52 +0000
Subject: [PATCH v99 4/6] Fixing Walsender corner case with logical decoding on
 standby.

The problem is that WalSndWaitForWal() waits for the *replay* LSN to
increase, but gets woken up by walreceiver when new WAL has been
flushed. Which means that typically walsenders will get woken up at the
same time that the startup process will be - which means that by the
time the logical walsender checks GetXLogReplayRecPtr() it's unlikely
that the startup process already replayed the record and updated
XLogCtl->lastReplayedEndRecPtr.

Introducing a new condition variable and a new API ConditionVariableEventSleep()
to fix this corner case.
---
 doc/src/sgml/monitoring.sgml                  |   4 +
 src/backend/access/transam/xlogrecovery.c     |  28 ++++
 src/backend/libpq/pqcomm.c                    |  14 +-
 src/backend/replication/walsender.c           |  17 ++-
 src/backend/storage/lmgr/condition_variable.c | 124 +++++++++++++++---
 src/backend/storage/lmgr/proc.c               |   6 +
 src/backend/utils/activity/wait_event.c       |   3 +
 src/backend/utils/init/miscinit.c             |   1 +
 src/include/access/xlogrecovery.h             |   3 +
 src/include/libpq/libpq.h                     |   6 +-
 src/include/replication/walsender.h           |   1 +
 src/include/storage/condition_variable.h      |  10 ++
 src/include/utils/wait_event.h                |   1 +
 13 files changed, 189 insertions(+), 29 deletions(-)
  13.5% src/backend/access/transam/
   8.5% src/backend/libpq/
   6.5% src/backend/replication/
  58.6% src/backend/storage/lmgr/
   4.4% src/include/storage/
   4.4% src/include/

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index cdf7c09b4b..9af8d58da2 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1857,6 +1857,10 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       <entry>Waiting for startup process to send initial data for streaming
        replication.</entry>
      </row>
+     <row>
+      <entry><literal>WalSenderWaitReplay</literal></entry>
+      <entry>Waiting for startup process to replay write-ahead log.</entry>
+     </row>
      <row>
       <entry><literal>XactGroupUpdate</literal></entry>
       <entry>Waiting for the group leader to update transaction status at
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index dbe9394762..8a9505a52d 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -358,6 +358,9 @@ typedef struct XLogRecoveryCtlData
        RecoveryPauseState recoveryPauseState;
        ConditionVariable recoveryNotPausedCV;
 
+       /* Replay state (see check_for_replay() for more explanation) */
+       ConditionVariable replayedCV;
+
        slock_t         info_lck;               /* locks shared variables shown above */
 } XLogRecoveryCtlData;
 
@@ -468,6 +471,7 @@ XLogRecoveryShmemInit(void)
        SpinLockInit(&XLogRecoveryCtl->info_lck);
        InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
        ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV);
+       ConditionVariableInit(&XLogRecoveryCtl->replayedCV);
 }
 
 /*
@@ -1935,6 +1939,11 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
        XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
        SpinLockRelease(&XLogRecoveryCtl->info_lck);
 
+       /*
+        * wake up walsender(s) used by logical decoding on standby.
+        */
+       ConditionVariableBroadcast(&XLogRecoveryCtl->replayedCV);
+
        /*
         * If rm_redo called XLogRequestWalReceiverReply, then we wake up the
         * receiver so that it notices the updated lastReplayedEndRecPtr and sends
@@ -4942,3 +4951,22 @@ assign_recovery_target_xid(const char *newval, void *extra)
        else
                recoveryTarget = RECOVERY_TARGET_UNSET;
 }
+
+/*
+ * Return the ConditionVariable indicating that a replay has been done.
+ *
+ * This is needed for logical decoding on standby. Indeed the "problem" is that
+ * WalSndWaitForWal() waits for the *replay* LSN to increase, but gets woken up
+ * by walreceiver when new WAL has been flushed. Which means that typically
+ * walsenders will get woken up at the same time that the startup process
+ * will be - which means that by the time the logical walsender checks
+ * GetXLogReplayRecPtr() it's unlikely that the startup process already replayed
+ * the record and updated XLogCtl->lastReplayedEndRecPtr.
+ *
+ * The ConditionVariable XLogRecoveryCtl->replayedCV solves this corner case.
+ */
+ConditionVariable *
+check_for_replay(void)
+{
+       return &XLogRecoveryCtl->replayedCV;
+}
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index da5bb5fc5d..babd0b6c4e 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -80,6 +80,7 @@
 #include "storage/ipc.h"
 #include "utils/guc_hooks.h"
 #include "utils/memutils.h"
+#include "storage/condition_variable.h"
 
 /*
  * Cope with the various platform-specific ways to spell TCP keepalive socket
@@ -172,7 +173,6 @@ void
 pq_init(void)
 {
        int                     socket_pos PG_USED_FOR_ASSERTS_ONLY;
-       int                     latch_pos PG_USED_FOR_ASSERTS_ONLY;
 
        /* initialize state variables */
        PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
@@ -207,20 +207,14 @@ pq_init(void)
                elog(FATAL, "fcntl(F_SETFD) failed on socket: %m");
 #endif
 
-       FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, FeBeWaitSetNEvents);
+       FeBeWaitSet = ConditionVariableWaitSetCreate(TopMemoryContext, FeBeWaitSetNEvents);
        socket_pos = AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE,
                                                                   MyProcPort->sock, NULL, NULL);
-       latch_pos = AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, PGINVALID_SOCKET,
-                                                                 MyLatch, NULL);
-       AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET,
-                                         NULL, NULL);
-
        /*
-        * The event positions match the order we added them, but let's sanity
-        * check them to be sure.
+        * The socket_pos matches the order we added it, but let's sanity
+        * check it to be sure.
         */
        Assert(socket_pos == FeBeWaitSetSocketPos);
-       Assert(latch_pos == FeBeWaitSetLatchPos);
 }
 
 /* --------------------------------
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3042e5bd64..89d1a36e6a 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1551,7 +1551,9 @@ static XLogRecPtr
 WalSndWaitForWal(XLogRecPtr loc)
 {
        int                     wakeEvents;
+       uint32          wait_event;
        static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
+       ConditionVariable *cv = NULL;
 
        /*
         * Fast path to avoid acquiring the spinlock in case we already know we
@@ -1564,9 +1566,20 @@ WalSndWaitForWal(XLogRecPtr loc)
 
        /* Get a more recent flush pointer. */
        if (!RecoveryInProgress())
+       {
                RecentFlushPtr = GetFlushRecPtr(NULL);
+               wait_event = WAIT_EVENT_WAL_SENDER_WAIT_WAL;
+       }
        else
+       {
                RecentFlushPtr = GetXLogReplayRecPtr(NULL);
+               wait_event = WAIT_EVENT_WAL_SENDER_WAIT_REPLAY;
+               cv = check_for_replay();
+       }
+
+       /* Prepare the cv to sleep */
+       if (cv)
+               ConditionVariablePrepareToSleep(cv);
 
        for (;;)
        {
@@ -1667,9 +1680,11 @@ WalSndWaitForWal(XLogRecPtr loc)
                if (pq_is_send_pending())
                        wakeEvents |= WL_SOCKET_WRITEABLE;
 
-               WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
+               ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, wakeEvents, NULL);
+               ConditionVariableEventSleep(cv, FeBeWaitSet, sleeptime, wait_event);
        }
 
+       ConditionVariableCancelSleep();
        /* reactivate latch so WalSndLoop knows to continue */
        SetLatch(MyLatch);
        return RecentFlushPtr;
diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index 7e2bbf46d9..766f1bd7b2 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -27,9 +27,29 @@
 #include "storage/spin.h"
 #include "utils/memutils.h"
 
+#define ConditionVariableWaitSetLatchPos       0
+
 /* Initially, we are not prepared to sleep on any condition variable. */
 static ConditionVariable *cv_sleep_target = NULL;
 
+/* Used by ConditionVariableSleep() and ConditionVariableTimedSleep(). */
+static WaitEventSet *ConditionVariableWaitSet = NULL;
+
+/*
+ * Initialize the process-local condition variable WaitEventSet.
+ *
+ * This must be called once during startup of any process that can wait on
+ * condition variables, before it issues any ConditionVariableInit() calls.
+ */
+void
+InitializeConditionVariableWaitSet(void)
+{
+       Assert(ConditionVariableWaitSet == NULL);
+
+       ConditionVariableWaitSet = ConditionVariableWaitSetCreate(
+               TopMemoryContext, 0);
+}
+
 /*
  * Initialize a condition variable.
  */
@@ -40,6 +60,51 @@ ConditionVariableInit(ConditionVariable *cv)
        proclist_init(&cv->wakeup);
 }
 
+/*
+ * Create a WaitEventSet for ConditionVariableEventSleep(). This should be
+ * used when the caller of ConditionVariableEventSleep() would like to wake up
+ * on either the condition variable signal or a socket event. For example:
+ *
+ *   ConditionVariableInit(&cv);
+ *   waitset = ConditionVariableWaitSetCreate(mcxt, 1);
+ *   event_pos = AddWaitEventToSet(waitset, 0, sock, NULL, NULL);
+ *   ...
+ *   ConditionVariablePrepareToSleep(&cv);
+ *   while (...condition not met...)
+ *   {
+ *       socket_wait_events = ...
+ *       ModifyWaitEvent(waitset, event_pos, socket_wait_events, NULL);
+ *       ConditionVariableEventSleep(&cv, waitset, ...);
+ *   }
+ *   ConditionVariableCancelSleep();
+ *
+ * The waitset is created with the standard events for a condition variable,
+ * and room for adding n_socket_events additional socket events. The
+ * initially-filled event positions should not be modified, but added socket
+ * events can be modified. The same waitset can be used for multiple condition
+ * variables as long as the callers of ConditionVariableEventSleep() are
+ * interested in the same sockets.
+ */
+WaitEventSet *
+ConditionVariableWaitSetCreate(MemoryContext mcxt, int n_socket_events)
+{
+       int                              latch_pos   PG_USED_FOR_ASSERTS_ONLY;
+       int                              n_cv_events = IsUnderPostmaster ? 2 : 1;
+       int                              nevents         = n_cv_events + n_socket_events;
+       WaitEventSet    *waitset         = CreateWaitEventSet(mcxt, nevents);
+
+       latch_pos = AddWaitEventToSet(waitset, WL_LATCH_SET, PGINVALID_SOCKET,
+                                                                 MyLatch, NULL);
+
+       if (IsUnderPostmaster)
+               AddWaitEventToSet(waitset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
+                                                 NULL, NULL);
+
+       Assert(latch_pos == ConditionVariableWaitSetLatchPos);
+
+       return waitset;
+}
+
 /*
  * Prepare to wait on a given condition variable.
  *
@@ -97,7 +162,8 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv)
 void
 ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
 {
-       (void) ConditionVariableTimedSleep(cv, -1 /* no timeout */ ,
+       (void) ConditionVariableEventSleep(cv, ConditionVariableWaitSet,
+                                                                          -1 /* no timeout */ ,
                                                                           wait_event_info);
 }
 
@@ -111,11 +177,27 @@ ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
 bool
 ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
                                                        uint32 wait_event_info)
+{
+       return ConditionVariableEventSleep(cv, ConditionVariableWaitSet, timeout,
+                                                                          wait_event_info);
+}
+
+/*
+ * Wait for a condition variable to be signaled, a timeout to be reached, or a
+ * socket event in the given waitset. The waitset must have been created by
+ * ConditionVariableWaitSetCreate().
+ *
+ * Returns true when timeout expires, otherwise returns false.
+ *
+ * See ConditionVariableSleep() for general usage.
+ */
+bool
+ConditionVariableEventSleep(ConditionVariable *cv, WaitEventSet *waitset,
+                                                       long timeout, uint32 wait_event_info)
 {
        long            cur_timeout = -1;
        instr_time      start_time;
        instr_time      cur_time;
-       int                     wait_events;
 
        /*
         * If the caller didn't prepare to sleep explicitly, then do so now and
@@ -132,7 +214,7 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
         * If we are currently prepared to sleep on some other CV, we just cancel
         * that and prepare this one; see ConditionVariablePrepareToSleep.
         */
-       if (cv_sleep_target != cv)
+       if (cv && cv_sleep_target != cv)
        {
                ConditionVariablePrepareToSleep(cv);
                return false;
@@ -147,24 +229,28 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
                INSTR_TIME_SET_CURRENT(start_time);
                Assert(timeout >= 0 && timeout <= INT_MAX);
                cur_timeout = timeout;
-               wait_events = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
        }
-       else
-               wait_events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
 
        while (true)
        {
                bool            done = false;
+               WaitEvent       cvEvent;
+               int                     nevents;
 
                /*
-                * Wait for latch to be set.  (If we're awakened for some other
-                * reason, the code below will cope anyway.)
+                * Wait for latch to be set, or other events which will be handled
+                * below.
                 */
-               (void) WaitLatch(MyLatch, wait_events, cur_timeout, wait_event_info);
+               nevents = WaitEventSetWait(waitset, cur_timeout, &cvEvent,
+                                                                  1, wait_event_info);
 
                /* Reset latch before examining the state of the wait list. */
                ResetLatch(MyLatch);
 
+               /* If a socket event occurred, no need to check wait list. */
+               if (nevents == 1 && (cvEvent.events & WL_SOCKET_MASK) != 0)
+                       return true;
+
                /*
                 * If this process has been taken out of the wait list, then we know
                 * that it has been signaled by ConditionVariableSignal (or
@@ -180,13 +266,21 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
                 * by something other than ConditionVariableSignal; though we don't
                 * guarantee not to return spuriously, we'll avoid this obvious case.
                 */
-               SpinLockAcquire(&cv->mutex);
-               if (!proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink))
+
+               if (cv)
                {
-                       done = true;
-                       proclist_push_tail(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
+                       SpinLockAcquire(&cv->mutex);
+                       if (!proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink))
+                       {
+                               done = true;
+                               proclist_push_tail(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
+                       }
+                       SpinLockRelease(&cv->mutex);
                }
-               SpinLockRelease(&cv->mutex);
+
+               /* Note for the POC: If we are not waiting on a CV or have just been promoted. */
+               if (!cv || (cv && !RecoveryInProgress()))
+                       done = true;
 
                /*
                 * Check for interrupts, and return spuriously if that caused the
@@ -194,7 +288,7 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
                 * waited for a different condition variable).
                 */
                CHECK_FOR_INTERRUPTS();
-               if (cv != cv_sleep_target)
+               if (cv && cv != cv_sleep_target)
                        done = true;
 
                /* We were signaled, so return */
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 22b4278610..ae4a7aecd4 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -440,6 +440,9 @@ InitProcess(void)
        OwnLatch(&MyProc->procLatch);
        SwitchToSharedLatch();
 
+       /* Initialize process-local condition variable support */
+       InitializeConditionVariableWaitSet();
+
        /* now that we have a proc, report wait events to shared memory */
        pgstat_set_wait_event_storage(&MyProc->wait_event_info);
 
@@ -596,6 +599,9 @@ InitAuxiliaryProcess(void)
        OwnLatch(&MyProc->procLatch);
        SwitchToSharedLatch();
 
+       /* Initialize process-local condition variable support */
+       InitializeConditionVariableWaitSet();
+
        /* now that we have a proc, report wait events to shared memory */
        pgstat_set_wait_event_storage(&MyProc->wait_event_info);
 
diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c
index cb99cc6339..a10dcd4e61 100644
--- a/src/backend/utils/activity/wait_event.c
+++ b/src/backend/utils/activity/wait_event.c
@@ -466,6 +466,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
                case WAIT_EVENT_WAL_RECEIVER_WAIT_START:
                        event_name = "WalReceiverWaitStart";
                        break;
+               case WAIT_EVENT_WAL_SENDER_WAIT_REPLAY:
+                       event_name = "WalSenderWaitReplay";
+                       break;
                case WAIT_EVENT_XACT_GROUP_UPDATE:
                        event_name = "XactGroupUpdate";
                        break;
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index 7eb7fe87f6..d07d24bc45 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -40,6 +40,7 @@
 #include "postmaster/interrupt.h"
 #include "postmaster/pgarch.h"
 #include "postmaster/postmaster.h"
+#include "storage/condition_variable.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/latch.h"
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index 47c29350f5..2bfeaaa00f 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -15,6 +15,7 @@
 #include "catalog/pg_control.h"
 #include "lib/stringinfo.h"
 #include "utils/timestamp.h"
+#include "storage/condition_variable.h"
 
 /*
  * Recovery target type.
@@ -155,4 +156,6 @@ extern void RecoveryRequiresIntParameter(const char *param_name, int currValue,
 
 extern void xlog_outdesc(StringInfo buf, XLogReaderState *record);
 
+extern ConditionVariable *check_for_replay(void);
+
 #endif                                                 /* XLOGRECOVERY_H */
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 50fc781f47..33eddc7d40 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -60,9 +60,9 @@ extern const PGDLLIMPORT PQcommMethods *PqCommMethods;
  */
 extern PGDLLIMPORT WaitEventSet *FeBeWaitSet;
 
-#define FeBeWaitSetSocketPos 0
-#define FeBeWaitSetLatchPos 1
-#define FeBeWaitSetNEvents 3
+#define FeBeWaitSetLatchPos 0
+#define FeBeWaitSetSocketPos 2
+#define FeBeWaitSetNEvents 1
 
 extern int     StreamServerPort(int family, const char *hostName,
                                                         unsigned short portNumber, const char *unixSocketDir,
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 52bb3e2aae..2fd745fe72 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -13,6 +13,7 @@
 #define _WALSENDER_H
 
 #include <signal.h>
+#include "storage/condition_variable.h"
 
 /*
  * What to do with a snapshot in create replication slot command.
diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h
index 589bdd323c..94adb54b91 100644
--- a/src/include/storage/condition_variable.h
+++ b/src/include/storage/condition_variable.h
@@ -22,6 +22,7 @@
 #ifndef CONDITION_VARIABLE_H
 #define CONDITION_VARIABLE_H
 
+#include "storage/latch.h"
 #include "storage/proclist_types.h"
 #include "storage/spin.h"
 
@@ -42,9 +43,14 @@ typedef union ConditionVariableMinimallyPadded
        char            pad[CV_MINIMAL_SIZE];
 } ConditionVariableMinimallyPadded;
 
+extern void InitializeConditionVariableWaitSet(void);
+
 /* Initialize a condition variable. */
 extern void ConditionVariableInit(ConditionVariable *cv);
 
+extern WaitEventSet *ConditionVariableWaitSetCreate(MemoryContext mcxt,
+                                                                                                       int n_socket_events);
+
 /*
  * To sleep on a condition variable, a process should use a loop which first
  * checks the condition, exiting the loop if it is met, and then calls
@@ -56,6 +62,10 @@ extern void ConditionVariableInit(ConditionVariable *cv);
 extern void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info);
 extern bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
                                                                                uint32 wait_event_info);
+extern bool ConditionVariableEventSleep(ConditionVariable *cv,
+                                                                               WaitEventSet *cvEventSet,
+                                                                               long timeout,
+                                                                               uint32 wait_event_info);
 extern void ConditionVariableCancelSleep(void);
 
 /*
diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h
index 9ab23e1c4a..548ef41dca 100644
--- a/src/include/utils/wait_event.h
+++ b/src/include/utils/wait_event.h
@@ -131,6 +131,7 @@ typedef enum
        WAIT_EVENT_SYNC_REP,
        WAIT_EVENT_WAL_RECEIVER_EXIT,
        WAIT_EVENT_WAL_RECEIVER_WAIT_START,
+       WAIT_EVENT_WAL_SENDER_WAIT_REPLAY,
        WAIT_EVENT_XACT_GROUP_UPDATE
 } WaitEventIPC;
 
-- 
2.34.1
