From cc97393e11ed3027ae5b78623a2903e70134aade Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Wed, 26 Oct 2022 17:36:34 +1300
Subject: [PATCH v2 3/9] Use SetLatches() for condition variables.

Drain condition variable wait lists in larger batches, not one at a
time, while broadcasting.  This is an idea from Yura Sokolov, which I've
now combined with SetLatches() for slightly more efficiency.

Since we're now performing loops, change the internal spinlock to an
lwlock.

XXX There is probably a better data structure/arrangement that would
allow us to hold the lock for a shorter time.  This patch is not very
satisfying yet.

Discussion: https://postgr.es/m/CA%2BhUKGKmO7ze0Z6WXKdrLxmvYa%3DzVGGXOO30MMktufofVwEm1A%40mail.gmail.com
Discussion: https://postgr.es/m/1edbb61981fe1d99c3f20e3d56d6c88999f4227c.camel%40postgrespro.ru
---
 src/backend/storage/lmgr/condition_variable.c | 96 +++++++++----------
 src/backend/storage/lmgr/lwlock.c             |  2 +
 src/include/storage/condition_variable.h      |  4 +-
 src/include/storage/lwlock.h                  |  1 +
 4 files changed, 49 insertions(+), 54 deletions(-)

diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index de65dac3ae..b975a91172 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -36,7 +36,7 @@ static ConditionVariable *cv_sleep_target = NULL;
 void
 ConditionVariableInit(ConditionVariable *cv)
 {
-	SpinLockInit(&cv->mutex);
+	LWLockInitialize(&cv->mutex, LWTRANCHE_CONDITION_VARIABLE);
 	proclist_init(&cv->wakeup);
 }
 
@@ -74,9 +74,9 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv)
 	cv_sleep_target = cv;
 
 	/* Add myself to the wait queue. */
-	SpinLockAcquire(&cv->mutex);
+	LWLockAcquire(&cv->mutex, LW_EXCLUSIVE);
 	proclist_push_tail(&cv->wakeup, pgprocno, cvWaitLink);
-	SpinLockRelease(&cv->mutex);
+	LWLockRelease(&cv->mutex);
 }
 
 /*
@@ -180,13 +180,13 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
 		 * by something other than ConditionVariableSignal; though we don't
 		 * guarantee not to return spuriously, we'll avoid this obvious case.
 		 */
-		SpinLockAcquire(&cv->mutex);
+		LWLockAcquire(&cv->mutex, LW_EXCLUSIVE);
 		if (!proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink))
 		{
 			done = true;
 			proclist_push_tail(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
 		}
-		SpinLockRelease(&cv->mutex);
+		LWLockRelease(&cv->mutex);
 
 		/*
 		 * Check for interrupts, and return spuriously if that caused the
@@ -233,12 +233,12 @@ ConditionVariableCancelSleep(void)
 	if (cv == NULL)
 		return;
 
-	SpinLockAcquire(&cv->mutex);
+	LWLockAcquire(&cv->mutex, LW_EXCLUSIVE);
 	if (proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink))
 		proclist_delete(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
 	else
 		signaled = true;
-	SpinLockRelease(&cv->mutex);
+	LWLockRelease(&cv->mutex);
 
 	/*
 	 * If we've received a signal, pass it on to another waiting process, if
@@ -265,10 +265,10 @@ ConditionVariableSignal(ConditionVariable *cv)
 	PGPROC	   *proc = NULL;
 
 	/* Remove the first process from the wakeup queue (if any). */
-	SpinLockAcquire(&cv->mutex);
+	LWLockAcquire(&cv->mutex, LW_EXCLUSIVE);
 	if (!proclist_is_empty(&cv->wakeup))
 		proc = proclist_pop_head_node(&cv->wakeup, cvWaitLink);
-	SpinLockRelease(&cv->mutex);
+	LWLockRelease(&cv->mutex);
 
 	/* If we found someone sleeping, set their latch to wake them up. */
 	if (proc != NULL)
@@ -287,6 +287,7 @@ ConditionVariableBroadcast(ConditionVariable *cv)
 {
 	int			pgprocno = MyProc->pgprocno;
 	PGPROC	   *proc = NULL;
+	bool		inserted_sentinel = false;
 	bool		have_sentinel = false;
 
 	/*
@@ -313,52 +314,43 @@ ConditionVariableBroadcast(ConditionVariable *cv)
 	if (cv_sleep_target != NULL)
 		ConditionVariableCancelSleep();
 
-	/*
-	 * Inspect the state of the queue.  If it's empty, we have nothing to do.
-	 * If there's exactly one entry, we need only remove and signal that
-	 * entry.  Otherwise, remove the first entry and insert our sentinel.
-	 */
-	SpinLockAcquire(&cv->mutex);
-	/* While we're here, let's assert we're not in the list. */
-	Assert(!proclist_contains(&cv->wakeup, pgprocno, cvWaitLink));
-
-	if (!proclist_is_empty(&cv->wakeup))
+	do
 	{
-		proc = proclist_pop_head_node(&cv->wakeup, cvWaitLink);
-		if (!proclist_is_empty(&cv->wakeup))
-		{
-			proclist_push_tail(&cv->wakeup, pgprocno, cvWaitLink);
-			have_sentinel = true;
-		}
-	}
-	SpinLockRelease(&cv->mutex);
+		LatchGroup	wakeups = {0};
 
-	/* Awaken first waiter, if there was one. */
-	if (proc != NULL)
-		SetLatch(&proc->procLatch);
+		LWLockAcquire(&cv->mutex, LW_EXCLUSIVE);
+		while (!proclist_is_empty(&cv->wakeup))
+		{
+			/* Wake up in small sized batches.  XXX tune */
+			if (wakeups.size == 8)
+			{
+				if (!inserted_sentinel)
+				{
+					/* First batch of many to wake up.  Add sentinel. */
+					proclist_push_tail(&cv->wakeup, pgprocno, cvWaitLink);
+					inserted_sentinel = true;
+					have_sentinel = true;
+				}
+				break;
+			}
 
-	while (have_sentinel)
-	{
-		/*
-		 * Each time through the loop, remove the first wakeup list entry, and
-		 * signal it unless it's our sentinel.  Repeat as long as the sentinel
-		 * remains in the list.
-		 *
-		 * Notice that if someone else removes our sentinel, we will waken one
-		 * additional process before exiting.  That's intentional, because if
-		 * someone else signals the CV, they may be intending to waken some
-		 * third process that added itself to the list after we added the
-		 * sentinel.  Better to give a spurious wakeup (which should be
-		 * harmless beyond wasting some cycles) than to lose a wakeup.
-		 */
-		proc = NULL;
-		SpinLockAcquire(&cv->mutex);
-		if (!proclist_is_empty(&cv->wakeup))
 			proc = proclist_pop_head_node(&cv->wakeup, cvWaitLink);
-		have_sentinel = proclist_contains(&cv->wakeup, pgprocno, cvWaitLink);
-		SpinLockRelease(&cv->mutex);
+			if (proc == MyProc)
+			{
+				/* We hit our sentinel.  We're done. */
+				have_sentinel = false;
+				break;
+			}
+			else if (!proclist_contains(&cv->wakeup, pgprocno, cvWaitLink))
+			{
+				/* Someone else hit our sentinel.  We're done. */
+				have_sentinel = false;
+			}
+			AddLatch(&wakeups, &proc->procLatch);
+		}
+		LWLockRelease(&cv->mutex);
 
-		if (proc != NULL && proc != MyProc)
-			SetLatch(&proc->procLatch);
-	}
+		/* Awaken this batch of waiters, if there were some. */
+		SetLatches(&wakeups);
+	} while (have_sentinel);
 }
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 0fc0cf6ebb..77986bac7e 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -183,6 +183,8 @@ static const char *const BuiltinTrancheNames[] = {
 	"PgStatsHash",
 	/* LWTRANCHE_PGSTATS_DATA: */
 	"PgStatsData",
+	/* LWTRANCHE_CONDITION_VARIABLE: */
+	"ConditionVariable",
 };
 
 StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h
index e89175ebd5..82b8eeabf5 100644
--- a/src/include/storage/condition_variable.h
+++ b/src/include/storage/condition_variable.h
@@ -22,12 +22,12 @@
 #ifndef CONDITION_VARIABLE_H
 #define CONDITION_VARIABLE_H
 
+#include "storage/lwlock.h"
 #include "storage/proclist_types.h"
-#include "storage/spin.h"
 
 typedef struct
 {
-	slock_t		mutex;			/* spinlock protecting the wakeup list */
+	LWLock		mutex;			/* lock protecting the wakeup list */
 	proclist_head wakeup;		/* list of wake-able processes */
 } ConditionVariable;
 
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index ca4eca76f4..dcbffe11a2 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -193,6 +193,7 @@ typedef enum BuiltinTrancheIds
 	LWTRANCHE_PGSTATS_DSA,
 	LWTRANCHE_PGSTATS_HASH,
 	LWTRANCHE_PGSTATS_DATA,
+	LWTRANCHE_CONDITION_VARIABLE,
 	LWTRANCHE_FIRST_USER_DEFINED
 }			BuiltinTrancheIds;
 
-- 
2.37.0 (Apple Git-136)

