Hi,

On 3/2/23 1:40 AM, Jeff Davis wrote:
On Wed, 2023-03-01 at 11:51 +0100, Drouvot, Bertrand wrote:


Why not "simply" call ConditionVariablePrepareToSleep() without any
call to ConditionVariableTimedSleep() later?

ConditionVariableSleep() re-inserts itself into the queue if it was
previously removed. Without that, a single wakeup could remove it from
the wait queue, and the effects of ConditionVariablePrepareToSleep()
would be lost.

Right, but in our case, right after the wakeup (the one due to the CV broadcast,
aka the one that will remove it from the wait queue) we'll exit the loop due to:

"
        /* check whether we're done */
        if (loc <= RecentFlushPtr)
            break;
"

as the CV broadcast means that a flush/replay occurred.

So I don't see any issue in this particular case (we are removed from the
queue, but we no longer have to wait).


In that case the walsender will be put in the wait queue (thanks to
ConditionVariablePrepareToSleep()) and will be woken up by the event on
the socket, the timeout or the CV broadcast.

I believe it will only be awakened once, and if it enters WalSndWait()
again, future ConditionVariableBroadcast/Signal() calls won't wake it
up any more.

I don't think that's right, and that's not what my testing shows (please find
attached 0004-CV-POC.txt, a .txt file so as not to break the CF bot), as:

- If it is awakened due to the CV broadcast, then we'll exit the loop right
  after (see above).
- If it is awakened due to the timeout or the socket event, then we're still in
  the CV wait queue (as nothing removed it from the CV wait queue).
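
To make the two cases above concrete, here is a toy model in C. It is only a
sketch under stated assumptions, not PostgreSQL code: the names (ToyWake,
toy_prepare_only_wait, ...) are hypothetical, the "queue" is a single boolean,
and a broadcast is assumed to carry the replay position reached at that time.
The waiter registers once and never re-registers, which is the pattern used in
the attached POC.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Why the toy waiter's latch was set (hypothetical names, not PostgreSQL's). */
typedef enum { TOY_TIMEOUT, TOY_SOCKET, TOY_BROADCAST } ToyWakeKind;

typedef struct
{
    ToyWakeKind kind;
    long        replayed_to;    /* replay position after a TOY_BROADCAST; unused otherwise */
} ToyWake;

typedef struct
{
    int  wakeups;               /* times the waiter actually woke up */
    int  missed_broadcasts;     /* broadcasts sent while it was off the queue */
    bool done;                  /* did it observe loc <= replayed? */
} ToyResult;

/*
 * Replay a trace of events against a waiter that registered on the toy wait
 * queue exactly once, before the loop, and never re-registers.
 */
ToyResult
toy_prepare_only_wait(long loc, long replayed, const ToyWake *trace, size_t n)
{
    ToyResult r = {0, 0, false};
    bool      queued = true;    /* the single "prepare to sleep" */

    assert(trace != NULL || n == 0);

    /* fast path: nothing to wait for */
    if (loc <= replayed)
    {
        r.done = true;
        return r;
    }

    for (size_t i = 0; i < n && !r.done; i++)
    {
        if (trace[i].kind == TOY_BROADCAST)
        {
            /* replay advances whether or not anyone is listening */
            if (trace[i].replayed_to > replayed)
                replayed = trace[i].replayed_to;

            if (!queued)
            {
                r.missed_broadcasts++;  /* off the queue: nobody is woken */
                continue;
            }
            queued = false;             /* a broadcast pops the waiter off the queue */
        }

        /* timeout and socket events wake the waiter without touching the queue */
        r.wakeups++;
        r.done = (loc <= replayed);     /* "check whether we're done" */
    }
    return r;
}
```

With loc = 100, a starting position of 50 and the trace timeout, socket,
broadcast-to-100, the waiter wakes three times and misses nothing. With the
trace broadcast-to-80, broadcast-to-100, timeout, it misses the second
broadcast and only notices on the timeout. That second trace is only reachable
if a broadcast can arrive before replay reaches loc, which is exactly the
assumption being discussed here.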


  (since IIUC they all rely on the same latch).

Relying on that fact seems like too much action-at-a-distance to me. If
we change the implementation of condition variables, then it would stop
working.


I'm not sure about this one. I mean it would depend on what the implementation
changes are.
Also the related TAP test (0005) would probably fail or start taking a long
time due to the corner case we are trying to solve here coming back (like it
was detected in [1]).

Also, since they are using the same latch, that means we are still
waking up too frequently, right? We haven't really solved the problem.


I don't think so, as the first CV broadcast will make us exit the loop.
So, ISTM that we'll wake up as we currently do, except when there is a
flush/replay, which is what we want, right?

It looks weird to use ConditionVariablePrepareToSleep() without
actually using ConditionVariableTimedSleep(),
but it looks to me that it would achieve the same goal: having the
walsender woken up
by the event on the socket, the timeout or the CV broadcast.

I don't think it actually works, because something needs to keep
re-inserting it into the queue after it gets removed.

I think that's not needed as we'd exit the loop right after we are awakened by
a CV broadcast.

To use condition variables properly, I think we'd need an API like
ConditionVariableEventsSleep(), which takes a WaitEventSet and a
timeout. I think this is what Andres was suggesting and seems like a
good idea. I looked into it and I don't think it's too hard to
implement -- we just need to WaitEventSetWait instead of WaitLatch.
There are a few details to sort out, like how to enable callers to
easily create the right WaitEventSet (it obviously needs to include
MyLatch, for instance) and update it with the right socket events.
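
The re-registration idea behind such an API can be reduced to a toy model in
C. This is a sketch under stated assumptions and not the proposed
ConditionVariableEventsSleep(): all names are hypothetical, the wait queue is
a single boolean, and each broadcast is assumed to carry the replay position
reached at that time. The only point it shows is that a sleep call which
re-inserts the waiter before each wait cannot miss a later broadcast.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Why the toy waiter woke up (hypothetical names, not PostgreSQL's). */
typedef enum { SK_TIMEOUT, SK_SOCKET, SK_BROADCAST } SketchWakeKind;

typedef struct
{
    SketchWakeKind kind;
    long           replayed_to; /* replay position after an SK_BROADCAST; unused otherwise */
} SketchWake;

typedef struct
{
    int  wakeups;               /* events consumed before leaving the loop */
    int  reinserts;             /* times the waiter had to re-register */
    bool done;                  /* did it observe loc <= replayed? */
} SketchResult;

/*
 * Replay a trace of events against a waiter whose sleep call puts it back
 * on the toy wait queue whenever an earlier broadcast removed it.
 */
SketchResult
sketch_reinserting_wait(long loc, long replayed, const SketchWake *trace, size_t n)
{
    SketchResult r = {0, 0, false};
    bool         queued = true; /* the initial "prepare to sleep" */

    assert(trace != NULL || n == 0);

    for (size_t i = 0;; i++)
    {
        /* check whether we're done */
        if (loc <= replayed)
        {
            r.done = true;
            break;
        }
        if (i == n)
            break;              /* trace exhausted while still waiting */

        /* the sleep call re-registers first if a wakeup removed us */
        if (!queued)
        {
            queued = true;
            r.reinserts++;
        }

        if (trace[i].kind == SK_BROADCAST)
        {
            if (trace[i].replayed_to > replayed)
                replayed = trace[i].replayed_to;
            queued = false;     /* a broadcast pops the waiter off the queue */
        }
        r.wakeups++;            /* always queued here, so every event wakes us */
    }
    return r;
}
```

With loc = 100, a starting position of 50 and the trace broadcast-to-80,
broadcast-to-100, timeout, the waiter re-registers once, sees both broadcasts
and is done after two wakeups without needing the timeout.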


I agree that's a good idea and that it should/would work too. I just wanted to
highlight that in this particular case it might not be necessary to build this
new API.

[1]: https://www.postgresql.org/message-id/47606911-cf44-5a62-21d5-366d3bc6e445%40enterprisedb.com

Regards,

--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index dbe9394762..8a9505a52d 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -358,6 +358,9 @@ typedef struct XLogRecoveryCtlData
        RecoveryPauseState recoveryPauseState;
        ConditionVariable recoveryNotPausedCV;
 
+       /* Replay state (see check_for_replay() for more explanation) */
+       ConditionVariable replayedCV;
+
        slock_t         info_lck;               /* locks shared variables shown above */
 } XLogRecoveryCtlData;
 
@@ -468,6 +471,7 @@ XLogRecoveryShmemInit(void)
        SpinLockInit(&XLogRecoveryCtl->info_lck);
        InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
        ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV);
+       ConditionVariableInit(&XLogRecoveryCtl->replayedCV);
 }
 
 /*
@@ -1935,6 +1939,11 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
        XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
        SpinLockRelease(&XLogRecoveryCtl->info_lck);
 
+       /*
+        * Wake up the walsender(s) used by logical decoding on standby.
+        */
+       ConditionVariableBroadcast(&XLogRecoveryCtl->replayedCV);
+
        /*
         * If rm_redo called XLogRequestWalReceiverReply, then we wake up the
         * receiver so that it notices the updated lastReplayedEndRecPtr and sends
@@ -4942,3 +4951,22 @@ assign_recovery_target_xid(const char *newval, void *extra)
        else
                recoveryTarget = RECOVERY_TARGET_UNSET;
 }
+
+/*
+ * Return the ConditionVariable indicating that a replay has been done.
+ *
+ * This is needed for logical decoding on standby. Indeed the "problem" is that
+ * WalSndWaitForWal() waits for the *replay* LSN to increase, but gets woken up
+ * by walreceiver when new WAL has been flushed. Which means that typically
+ * walsenders will get woken up at the same time that the startup process
+ * will be - which means that by the time the logical walsender checks
+ * GetXLogReplayRecPtr() it's unlikely that the startup process already replayed
+ * the record and updated XLogRecoveryCtl->lastReplayedEndRecPtr.
+ *
+ * The ConditionVariable XLogRecoveryCtl->replayedCV solves this corner case.
+ */
+ConditionVariable *
+check_for_replay(void)
+{
+       return &XLogRecoveryCtl->replayedCV;
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3042e5bd64..05350bb535 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1552,6 +1552,17 @@ WalSndWaitForWal(XLogRecPtr loc)
 {
        int                     wakeEvents;
        static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
+       ConditionVariable *replayedCV = check_for_replay();
+
+       /*
+        * Prepare the replayedCV to sleep. Note that this is enough to be added
+        * to the wait queue and then woken up (while in WalSndWait() below)
+        * by ConditionVariableBroadcast() during the WAL replay. Also note that
+        * if awakened by the CV broadcast we'll exit the loop right after, due to
+        * the loc <= RecentFlushPtr test done in the loop. Indeed, a CV broadcast
+        * would mean that a replay occurred.
+        */
+       ConditionVariablePrepareToSleep(replayedCV);
 
        /*
         * Fast path to avoid acquiring the spinlock in case we already know we
@@ -1670,6 +1681,7 @@ WalSndWaitForWal(XLogRecPtr loc)
                WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
        }
 
+       ConditionVariableCancelSleep();
        /* reactivate latch so WalSndLoop knows to continue */
        SetLatch(MyLatch);
        return RecentFlushPtr;
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index 47c29350f5..2bfeaaa00f 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -15,6 +15,7 @@
 #include "catalog/pg_control.h"
 #include "lib/stringinfo.h"
 #include "utils/timestamp.h"
+#include "storage/condition_variable.h"
 
 /*
  * Recovery target type.
@@ -155,4 +156,6 @@ extern void RecoveryRequiresIntParameter(const char *param_name, int currValue,
 
 extern void xlog_outdesc(StringInfo buf, XLogReaderState *record);
 
+extern ConditionVariable *check_for_replay(void);
+
 #endif                                                 /* XLOGRECOVERY_H */
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 52bb3e2aae..2fd745fe72 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -13,6 +13,7 @@
 #define _WALSENDER_H
 
 #include <signal.h>
+#include "storage/condition_variable.h"
 
 /*
  * What to do with a snapshot in create replication slot command.
