From 701249c2abb86b33ca0576dc799301840cb7622d Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@enterprisedb.com>
Date: Fri, 29 Dec 2017 11:35:40 +1300
Subject: [PATCH] Fix condition variable livelock.

Limit the number of wakeups that ConditionVariableBroadcast() delivers using
the queue size at the beginning of the operation, so that it can't get stuck
looping forever if backends immediately rejoin the queue.  Without this limit,
resource-constrained systems could burn many seconds of CPU time stuck in this
loop when executing the new parallel-aware hash joins in join.sql, depending
on the vagaries of scheduling.

Author: Thomas Munro
Discussion: https://postgr.es/m/CAEepm%3D0NWKehYw7NDoUSf8juuKOPRnCyY3vuaSvhrEWsOTAa3w%40mail.gmail.com
---
 src/backend/storage/lmgr/condition_variable.c | 23 +++++++++++++++++++++--
 src/include/storage/condition_variable.h      |  1 +
 2 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index b4b7d28dd5..46d1d2ffac 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -40,6 +40,7 @@ ConditionVariableInit(ConditionVariable *cv)
 {
 	SpinLockInit(&cv->mutex);
 	proclist_init(&cv->wakeup);
+	cv->wakeup_length = 0;
 }
 
 /*
@@ -80,7 +81,10 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv)
 	/* Add myself to the wait queue. */
 	SpinLockAcquire(&cv->mutex);
 	if (!proclist_contains(&cv->wakeup, pgprocno, cvWaitLink))
+	{
 		proclist_push_tail(&cv->wakeup, pgprocno, cvWaitLink);
+		++cv->wakeup_length;
+	}
 	SpinLockRelease(&cv->mutex);
 }
 
@@ -152,6 +156,7 @@ ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
 		{
 			done = true;
 			proclist_push_tail(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
+			++cv->wakeup_length;
 		}
 		SpinLockRelease(&cv->mutex);
 	}
@@ -172,7 +177,11 @@ ConditionVariableCancelSleep(void)
 
 	SpinLockAcquire(&cv->mutex);
 	if (proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink))
+	{
 		proclist_delete(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
+		Assert(cv->wakeup_length > 0);
+		--cv->wakeup_length;
+	}
 	SpinLockRelease(&cv->mutex);
 
 	cv_sleep_target = NULL;
@@ -191,7 +200,11 @@ ConditionVariableSignal(ConditionVariable *cv)
 	/* Remove the first process from the wakeup queue (if any). */
 	SpinLockAcquire(&cv->mutex);
 	if (!proclist_is_empty(&cv->wakeup))
+	{
 		proc = proclist_pop_head_node(&cv->wakeup, cvWaitLink);
+		Assert(cv->wakeup_length > 0);
+		--cv->wakeup_length;
+	}
 	SpinLockRelease(&cv->mutex);
 
 	/* If we found someone sleeping, set their latch to wake them up. */
@@ -213,15 +226,21 @@ ConditionVariableSignal(ConditionVariable *cv)
 int
 ConditionVariableBroadcast(ConditionVariable *cv)
 {
+	int			wakeup_length;
 	int			nwoken = 0;
 
 	/*
 	 * Let's just do this the dumbest way possible.  We could try to dequeue
 	 * all the sleepers at once to save spinlock cycles, but it's a bit hard
 	 * to get that right in the face of possible sleep cancelations, and we
-	 * don't want to loop holding the mutex.
+	 * don't want to loop holding the mutex.  Limit the number of wakeups to
+	 * the queue size observed before we begin so that we can't loop forever
+	 * if a waker adds itself to the end of the queue again immediately.
 	 */
-	while (ConditionVariableSignal(cv))
+	SpinLockAcquire(&cv->mutex);
+	wakeup_length = cv->wakeup_length;
+	SpinLockRelease(&cv->mutex);
+	while (nwoken < wakeup_length && ConditionVariableSignal(cv))
 		++nwoken;
 
 	return nwoken;
diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h
index f77c0b22ad..dd4ea592a1 100644
--- a/src/include/storage/condition_variable.h
+++ b/src/include/storage/condition_variable.h
@@ -29,6 +29,7 @@ typedef struct
 {
 	slock_t		mutex;
 	proclist_head wakeup;
+	int			wakeup_length;
 } ConditionVariable;
 
 /* Initialize a condition variable. */
-- 
2.14.1

