Hi,

On 3/8/23 11:25 AM, Drouvot, Bertrand wrote:
Hi,

On 3/3/23 5:26 PM, Drouvot, Bertrand wrote:
Hi,

On 3/3/23 8:58 AM, Jeff Davis wrote:
On Thu, 2023-03-02 at 11:45 -0800, Jeff Davis wrote:
In this case it looks easier to add the right API than to be sure
about
whether it's needed or not.

I attached a sketch of one approach.

Oh, that's very cool, thanks a lot!

I'm not very confident that it's
the right API or even that it works as I intended it, but if others
like the approach I can work on it some more.


I'll look at it early next week.


So, I took your patch and as an example I tried a quick integration in 0004,
(see 0004_new_API.txt attached) to put it in the logical decoding on standby 
context.

Based on this, I have 3 comments:

- Maybe ConditionVariableEventSleep() should take care of the “WaitEventSetWait 
returns 1 and cvEvent.event == WL_POSTMASTER_DEATH” case?

- Maybe ConditionVariableEventSleep() could accept and deal with the CV being 
NULL?
I used it in the attached POC to handle the logical decoding on the primary 
server case.
Another option would be to create a dedicated CV for that case, though.

- In the POC attached I had to add this extra condition “(cv && 
!RecoveryInProgress())” to avoid waiting on the timeout when there is a promotion.
That makes me think that we may want to add 2 extra parameters (as 2 functions 
returning a bool?) to ConditionVariableEventSleep()
to check whether or not we still want to test the socket or the CV wake up in 
each loop iteration.
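
To make that last point a bit more concrete, here is a rough standalone sketch of the loop control I have in mind. It is plain C and not PostgreSQL code: toy_event_sleep(), toy_recovery_in_progress() and the counter are hypothetical names made up for the example, and no real sleeping happens. It only shows two optional callbacks being consulted on each iteration to decide whether we still want to wait.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical callback type: returns true while the caller still wants to wait. */
typedef bool (*resume_waiting_fn) (void);

/* Toy stand-in for RecoveryInProgress(): turns false on the third call ("promotion"). */
static int	toy_calls_until_promotion = 3;

static bool
toy_recovery_in_progress(void)
{
	if (toy_calls_until_promotion > 0)
		toy_calls_until_promotion--;
	return toy_calls_until_promotion > 0;
}

/*
 * Toy model of the loop control only. Returns the number of iterations
 * performed before deciding to stop, capped at max_iterations.
 */
static int
toy_event_sleep(bool have_cv, resume_waiting_fn cv_resume_waiting,
				bool have_waitset, resume_waiting_fn waitset_resume_waiting,
				int max_iterations)
{
	int			iterations = 0;

	assert(max_iterations > 0);

	while (iterations < max_iterations)
	{
		bool		done = false;

		iterations++;

		/* (The real function would block on the wait set here.) */

		/* Stop if there is no CV, or the caller no longer wants to wait on it. */
		if (!have_cv || (cv_resume_waiting && !cv_resume_waiting()))
			done = true;

		/* Stop if the caller no longer wants to wait on the wait set. */
		if (have_waitset && waitset_resume_waiting && !waitset_resume_waiting())
			done = true;

		if (done)
			break;
	}

	return iterations;
}
```

With toy_recovery_in_progress passed as the CV callback, the loop stops on the iteration where the callback first returns false, instead of continuing to wait until the timeout.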

Also 3 additional remarks:

1) About InitializeConditionVariableWaitSet() and ConditionVariableWaitSetCreate(): I'm 
not sure about the naming as there is no CV yet (they "just" deal with 
WaitEventSet).

So, what about renaming?

+static WaitEventSet *ConditionVariableWaitSet = NULL;

to say, "LocalWaitSet" and then rename ConditionVariableWaitSetLatchPos, 
InitializeConditionVariableWaitSet() and ConditionVariableWaitSetCreate() accordingly?

But it might not be needed (see 3) below).

2)

  /*
   * Prepare to wait on a given condition variable.
   *
@@ -97,7 +162,8 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv)
  void
  ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
  {
-       (void) ConditionVariableTimedSleep(cv, -1 /* no timeout */ ,
+       (void) ConditionVariableEventSleep(cv, ConditionVariableWaitSet,
+                                                                          -1 /* no timeout */ ,
                                                                            wait_event_info);
  }

@@ -111,11 +177,27 @@ ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
  bool
  ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
                                                         uint32 wait_event_info)
+{
+       return ConditionVariableEventSleep(cv, ConditionVariableWaitSet, timeout,
+                                                                          wait_event_info);
+}
+

I like the idea of making use of the new ConditionVariableEventSleep() here, 
but on the other hand...

3)

I wonder whether there are race conditions: ConditionVariableWaitSet is 
initialized with a WL_LATCH_SET event (using PGINVALID_SOCKET) and, if 
IsUnderPostmaster, also with a WL_EXIT_ON_PM_DEATH event (again using 
PGINVALID_SOCKET).

So IIUC, the patch introduces 2 new possible sources of wakeup.

Then, what about:

- not creating ConditionVariableWaitSet, ConditionVariableWaitSetLatchPos, 
InitializeConditionVariableWaitSet() and ConditionVariableWaitSetCreate() at 
all?
- calling ConditionVariableEventSleep() with a NULL parameter in 
ConditionVariableSleep() and ConditionVariableTimedSleep()?
- handling the case where the WaitEventSet parameter is NULL in 
ConditionVariableEventSleep()? (That could also make sense if we handle the 
case of the CV being NULL as proposed above)
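
As a rough illustration of the NULL handling, here is a small standalone sketch. It is plain C, not PostgreSQL code, and every name in it (ToyWaitSet, ToyWaitPath, toy_choose_wait_path(), toy_cv_sleep(), toy_cv_timed_sleep()) is hypothetical. It assumes that a NULL wait set should simply mean "wait on the process latch only", which is what the existing WaitLatch() call does today.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for WaitEventSet; only its presence matters here. */
typedef struct ToyWaitSet
{
	int			unused;
} ToyWaitSet;

/* Which kind of wait the event-sleep function would perform. */
typedef enum ToyWaitPath
{
	TOY_WAIT_ON_LATCH,			/* no wait set given: latch-only wait */
	TOY_WAIT_ON_SET				/* caller-supplied wait set */
} ToyWaitPath;

/* Assumption: a NULL wait set falls back to the latch-only wait. */
static ToyWaitPath
toy_choose_wait_path(const ToyWaitSet *waitset)
{
	return (waitset != NULL) ? TOY_WAIT_ON_SET : TOY_WAIT_ON_LATCH;
}

/* Model of ConditionVariableSleep(): delegates with a NULL wait set. */
static ToyWaitPath
toy_cv_sleep(void)
{
	return toy_choose_wait_path(NULL);
}

/* Model of ConditionVariableTimedSleep(): same delegation, with a timeout. */
static ToyWaitPath
toy_cv_timed_sleep(long timeout)
{
	assert(timeout >= 0);
	return toy_choose_wait_path(NULL);
}
```

The point is that the two existing entry points never need a wait set of their own; only callers that pass one get the extra wakeup sources.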


I gave it a try, so please find attached 
v2-0001-Introduce-ConditionVariableEventSleep.txt (implementing the comments 
above) and 0004_new_API.txt to put the new API in the logical decoding on 
standby context.

There is no change in v2-0001-Introduce-ConditionVariableEventSleep.txt 
regarding the up-thread comment related to WL_POSTMASTER_DEATH.
What do you think?

Regards,

--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 9a820140b7356ab94479499a80fc4742403f3ca5 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Fri, 10 Mar 2023 10:58:23 +0000
Subject: [PATCH v99 5/7] Fixing Walsender corner case with logical decoding on
 standby.

The problem is that WalSndWaitForWal() waits for the *replay* LSN to
increase, but gets woken up by walreceiver when new WAL has been
flushed. Which means that typically walsenders will get woken up at the
same time that the startup process will be - which means that by the
time the logical walsender checks GetXLogReplayRecPtr() it's unlikely
that the startup process already replayed the record and updated
XLogCtl->lastReplayedEndRecPtr.

Introducing a new condition variable and a new API ConditionVariableEventSleep()
to fix this corner case.
---
 doc/src/sgml/monitoring.sgml              |  4 ++++
 src/backend/access/transam/xlogrecovery.c | 28 +++++++++++++++++++++++
 src/backend/replication/walsender.c       | 18 ++++++++++++++-
 src/backend/utils/activity/wait_event.c   |  3 +++
 src/include/access/xlogrecovery.h         |  3 +++
 src/include/replication/walsender.h       |  1 +
 src/include/utils/wait_event.h            |  1 +
 7 files changed, 57 insertions(+), 1 deletion(-)
   7.8% doc/src/sgml/
  52.1% src/backend/access/transam/
  27.1% src/backend/replication/
   4.5% src/backend/utils/activity/
   4.5% src/include/access/
   3.7% src/include/

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index cdf7c09b4b..9af8d58da2 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1857,6 +1857,10 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       <entry>Waiting for startup process to send initial data for streaming
        replication.</entry>
      </row>
+     <row>
+      <entry><literal>WalSenderWaitReplay</literal></entry>
+      <entry>Waiting for startup process to replay write-ahead log.</entry>
+     </row>
      <row>
       <entry><literal>XactGroupUpdate</literal></entry>
       <entry>Waiting for the group leader to update transaction status at
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index dbe9394762..8a9505a52d 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -358,6 +358,9 @@ typedef struct XLogRecoveryCtlData
        RecoveryPauseState recoveryPauseState;
        ConditionVariable recoveryNotPausedCV;
 
+       /* Replay state (see check_for_replay() for more explanation) */
+       ConditionVariable replayedCV;
+
        slock_t         info_lck;               /* locks shared variables shown above */
 } XLogRecoveryCtlData;
 
@@ -468,6 +471,7 @@ XLogRecoveryShmemInit(void)
        SpinLockInit(&XLogRecoveryCtl->info_lck);
        InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
        ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV);
+       ConditionVariableInit(&XLogRecoveryCtl->replayedCV);
 }
 
 /*
@@ -1935,6 +1939,11 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
        XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
        SpinLockRelease(&XLogRecoveryCtl->info_lck);
 
+       /*
+        * wake up walsender(s) used by logical decoding on standby.
+        */
+       ConditionVariableBroadcast(&XLogRecoveryCtl->replayedCV);
+
        /*
         * If rm_redo called XLogRequestWalReceiverReply, then we wake up the
         * receiver so that it notices the updated lastReplayedEndRecPtr and sends
@@ -4942,3 +4951,22 @@ assign_recovery_target_xid(const char *newval, void *extra)
        else
                recoveryTarget = RECOVERY_TARGET_UNSET;
 }
+
+/*
+ * Return the ConditionVariable indicating that a replay has been done.
+ *
+ * This is needed for logical decoding on standby. Indeed the "problem" is that
+ * WalSndWaitForWal() waits for the *replay* LSN to increase, but gets woken up
+ * by walreceiver when new WAL has been flushed. Which means that typically
+ * walsenders will get woken up at the same time that the startup process
+ * will be - which means that by the time the logical walsender checks
+ * GetXLogReplayRecPtr() it's unlikely that the startup process already replayed
+ * the record and updated XLogCtl->lastReplayedEndRecPtr.
+ *
+ * The ConditionVariable XLogRecoveryCtl->replayedCV solves this corner case.
+ */
+ConditionVariable *
+check_for_replay(void)
+{
+       return &XLogRecoveryCtl->replayedCV;
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3042e5bd64..8ef22616bb 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1551,7 +1551,9 @@ static XLogRecPtr
 WalSndWaitForWal(XLogRecPtr loc)
 {
        int                     wakeEvents;
+       uint32          wait_event;
        static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
+       ConditionVariable *cv = NULL;
 
        /*
         * Fast path to avoid acquiring the spinlock in case we already know we
@@ -1564,9 +1566,20 @@ WalSndWaitForWal(XLogRecPtr loc)
 
        /* Get a more recent flush pointer. */
        if (!RecoveryInProgress())
+       {
                RecentFlushPtr = GetFlushRecPtr(NULL);
+               wait_event = WAIT_EVENT_WAL_SENDER_WAIT_WAL;
+       }
        else
+       {
                RecentFlushPtr = GetXLogReplayRecPtr(NULL);
+               wait_event = WAIT_EVENT_WAL_SENDER_WAIT_REPLAY;
+               cv = check_for_replay();
+       }
+
+       /* Prepare the cv to sleep */
+       if (cv)
+               ConditionVariablePrepareToSleep(cv);
 
        for (;;)
        {
@@ -1667,9 +1680,12 @@ WalSndWaitForWal(XLogRecPtr loc)
                if (pq_is_send_pending())
                        wakeEvents |= WL_SOCKET_WRITEABLE;
 
-               WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
+               ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, wakeEvents, NULL);
+               ConditionVariableEventSleep(cv, RecoveryInProgress, FeBeWaitSet, NULL,
+                                                                       sleeptime, wait_event);
        }
 
+       ConditionVariableCancelSleep();
        /* reactivate latch so WalSndLoop knows to continue */
        SetLatch(MyLatch);
        return RecentFlushPtr;
diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c
index cb99cc6339..a10dcd4e61 100644
--- a/src/backend/utils/activity/wait_event.c
+++ b/src/backend/utils/activity/wait_event.c
@@ -466,6 +466,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
                case WAIT_EVENT_WAL_RECEIVER_WAIT_START:
                        event_name = "WalReceiverWaitStart";
                        break;
+               case WAIT_EVENT_WAL_SENDER_WAIT_REPLAY:
+                       event_name = "WalSenderWaitReplay";
+                       break;
                case WAIT_EVENT_XACT_GROUP_UPDATE:
                        event_name = "XactGroupUpdate";
                        break;
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index 47c29350f5..2bfeaaa00f 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -15,6 +15,7 @@
 #include "catalog/pg_control.h"
 #include "lib/stringinfo.h"
 #include "utils/timestamp.h"
+#include "storage/condition_variable.h"
 
 /*
  * Recovery target type.
@@ -155,4 +156,6 @@ extern void RecoveryRequiresIntParameter(const char *param_name, int currValue,
 
 extern void xlog_outdesc(StringInfo buf, XLogReaderState *record);
 
+extern ConditionVariable *check_for_replay(void);
+
 #endif                                                 /* XLOGRECOVERY_H */
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 52bb3e2aae..2fd745fe72 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -13,6 +13,7 @@
 #define _WALSENDER_H
 
 #include <signal.h>
+#include "storage/condition_variable.h"
 
 /*
  * What to do with a snapshot in create replication slot command.
diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h
index 9ab23e1c4a..548ef41dca 100644
--- a/src/include/utils/wait_event.h
+++ b/src/include/utils/wait_event.h
@@ -131,6 +131,7 @@ typedef enum
        WAIT_EVENT_SYNC_REP,
        WAIT_EVENT_WAL_RECEIVER_EXIT,
        WAIT_EVENT_WAL_RECEIVER_WAIT_START,
+       WAIT_EVENT_WAL_SENDER_WAIT_REPLAY,
        WAIT_EVENT_XACT_GROUP_UPDATE
 } WaitEventIPC;
 
-- 
2.34.1

From 0044078a540fcb2b5f5c728dcb7e4911b000d6d5 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Fri, 10 Mar 2023 10:57:22 +0000
Subject: [PATCH v99 4/7] Introduce-ConditionVariableEventSleep

---
 src/backend/storage/lmgr/condition_variable.c | 65 ++++++++++++++-----
 src/include/storage/condition_variable.h      |  7 ++
 2 files changed, 57 insertions(+), 15 deletions(-)
  89.0% src/backend/storage/lmgr/
  10.9% src/include/storage/

diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index 7e2bbf46d9..af241e7317 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -97,7 +97,8 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv)
 void
 ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
 {
-       (void) ConditionVariableTimedSleep(cv, -1 /* no timeout */ ,
+       (void) ConditionVariableEventSleep(cv, NULL, NULL, NULL,
+                                                                          -1 /* no timeout */ ,
                                                                           wait_event_info);
 }
 
@@ -111,11 +112,28 @@ ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
 bool
 ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
                                                        uint32 wait_event_info)
+{
+       return ConditionVariableEventSleep(cv, NULL, NULL, NULL, timeout,
+                                                                          wait_event_info);
+}
+
+/*
+ * Wait for a condition variable to be signaled, a timeout to be reached, or a
+ * socket event in the given waitset.
+ *
+ * Returns true when timeout expires, otherwise returns false.
+ *
+ * See ConditionVariableSleep() for general usage.
+ */
+bool
+ConditionVariableEventSleep(ConditionVariable *cv, bool (*cv_resume_waiting)(void),
+                                                       WaitEventSet *waitset,
+                                                       bool (*waitset_resume_waiting)(void),
+                                                       long timeout, uint32 wait_event_info)
 {
        long            cur_timeout = -1;
        instr_time      start_time;
        instr_time      cur_time;
-       int                     wait_events;
 
        /*
         * If the caller didn't prepare to sleep explicitly, then do so now and
@@ -132,7 +150,7 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
         * If we are currently prepared to sleep on some other CV, we just cancel
         * that and prepare this one; see ConditionVariablePrepareToSleep.
         */
-       if (cv_sleep_target != cv)
+       if (cv && cv_sleep_target != cv)
        {
                ConditionVariablePrepareToSleep(cv);
                return false;
@@ -147,24 +165,29 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
                INSTR_TIME_SET_CURRENT(start_time);
                Assert(timeout >= 0 && timeout <= INT_MAX);
                cur_timeout = timeout;
-               wait_events = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
        }
-       else
-               wait_events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
 
        while (true)
        {
                bool            done = false;
+               WaitEvent       cvEvent;
+               int                     nevents = 0;
 
                /*
-                * Wait for latch to be set.  (If we're awakened for some other
-                * reason, the code below will cope anyway.)
+                * Wait for latch to be set, or other events which will be handled
+                * below.
                 */
-               (void) WaitLatch(MyLatch, wait_events, cur_timeout, wait_event_info);
+               if (waitset)
+                       nevents = WaitEventSetWait(waitset, cur_timeout, &cvEvent,
+                                                                          1, wait_event_info);
 
                /* Reset latch before examining the state of the wait list. */
                ResetLatch(MyLatch);
 
+               /* If a socket event occurred, no need to check wait list. */
+               if (nevents == 1 && (cvEvent.events & WL_SOCKET_MASK) != 0)
+                       return true;
+
                /*
                 * If this process has been taken out of the wait list, then we know
                 * that it has been signaled by ConditionVariableSignal (or
@@ -180,13 +203,25 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
                 * by something other than ConditionVariableSignal; though we don't
                 * guarantee not to return spuriously, we'll avoid this obvious case.
                 */
-               SpinLockAcquire(&cv->mutex);
-               if (!proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink))
+
+               if (cv)
                {
-                       done = true;
-                       proclist_push_tail(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
+                       SpinLockAcquire(&cv->mutex);
+                       if (!proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink))
+                       {
+                               done = true;
+                               proclist_push_tail(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
+                       }
+                       SpinLockRelease(&cv->mutex);
                }
-               SpinLockRelease(&cv->mutex);
+
+               /* If we are not waiting on a CV or don't want to wait anymore */
+               if (!cv || (cv && cv_resume_waiting && !cv_resume_waiting()))
+                       done = true;
+
+               /* If we don't want to wait on the waitset anymore */
+               if (waitset && waitset_resume_waiting && !waitset_resume_waiting())
+                       done = true;
 
                /*
                 * Check for interrupts, and return spuriously if that caused the
@@ -194,7 +229,7 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
                 * waited for a different condition variable).
                 */
                CHECK_FOR_INTERRUPTS();
-               if (cv != cv_sleep_target)
+               if (cv && cv != cv_sleep_target)
                        done = true;
 
                /* We were signaled, so return */
diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h
index 589bdd323c..b9510caa17 100644
--- a/src/include/storage/condition_variable.h
+++ b/src/include/storage/condition_variable.h
@@ -22,6 +22,7 @@
 #ifndef CONDITION_VARIABLE_H
 #define CONDITION_VARIABLE_H
 
+#include "storage/latch.h"
 #include "storage/proclist_types.h"
 #include "storage/spin.h"
 
@@ -56,6 +57,12 @@ extern void ConditionVariableInit(ConditionVariable *cv);
 extern void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info);
 extern bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
                                                                                uint32 wait_event_info);
+extern bool ConditionVariableEventSleep(ConditionVariable *cv,
+                                                                               bool (*cv_resume_waiting)(void),
+                                                                               WaitEventSet *cvEventSet,
+                                                                               bool (*waitset_resume_waiting)(void),
+                                                                               long timeout,
+                                                                               uint32 wait_event_info);
 extern void ConditionVariableCancelSleep(void);
 
 /*
-- 
2.34.1
