Hi all,
(Adding Andrey in CC, as I'm sure he is interested in that.)

While looking at the test proposed in the thread about ProcKill(),
I was reminded that relying on latches and a condition variable
for waits and wakeups has its limits:
https://www.postgresql.org/message-id/aheVjCHmcbXBtiy0%40paquier.xyz

In this case, we are trying to synchronize backends once they no
longer have a latch assigned, which defeats the purpose of wait/wakeup
because the condition variable used by injection_points while waiting
expects a latch to be set for the processes we are waiting on.

Folks have complained about this limitation a couple of times in the
past, and I never got around to doing something about it.  While
looking at it, I ended up with the attached patch, which turned out to
be surprisingly simpler than I thought was needed.  It replaces the
condition variable with a set of atomic counters.  The counters are
incremented at wakeup, and the waiting process checks them
periodically.  The wait loop uses a delay that increases over time,
capped at 100ms.  This gives good responsiveness on fast machines
without burning CPU in a tight loop of counter checks for tests that
need to wait longer.

One thing worth noting is the CHECK_FOR_INTERRUPTS() in the wait
loop.  We need it for the autovacuum test in test_misc, because that
test requires some signaling and interrupt processing.

It may make sense to be conservative and limit this change to HEAD,
but I'd like to suggest a backpatch down to v17 so that future tests
relying on such a change can be backpatched.  I need this change for
the other test; still, consistency of the facility across branches is
what takes priority for me here.

Note: The CI seems happy with the patch.

Thoughts or comments?
--
Michael
From 038b0f55dfe80f168fdc1b01b8cdadbf38fedfa2 Mon Sep 17 00:00:00 2001
From: Michael Paquier <[email protected]>
Date: Thu, 28 May 2026 11:15:33 +0900
Subject: [PATCH] injection_points: Switch wait/wakeup to rely on atomics

This removes the dependency on a condition variable (and hence on
latches), replacing the condition-variable sleep with a wait on an
atomic counter.  The counter is polled with a delay that increases
exponentially over time (starting at 10us, up to 100ms).
---
 .../injection_points/injection_points.c       | 53 ++++++++++---------
 1 file changed, 27 insertions(+), 26 deletions(-)

diff --git a/src/test/modules/injection_points/injection_points.c 
b/src/test/modules/injection_points/injection_points.c
index ba282e3dcabf..9b8e1aaad0b0 100644
--- a/src/test/modules/injection_points/injection_points.c
+++ b/src/test/modules/injection_points/injection_points.c
@@ -23,11 +23,11 @@
 #include "miscadmin.h"
 #include "nodes/pg_list.h"
 #include "nodes/value.h"
-#include "storage/condition_variable.h"
 #include "storage/dsm_registry.h"
 #include "storage/ipc.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
+#include "storage/spin.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
 #include "utils/injection_point.h"
@@ -59,13 +59,10 @@ typedef struct InjectionPointSharedState
        slock_t         lock;
 
        /* Counters advancing when injection_points_wakeup() is called */
-       uint32          wait_counts[INJ_MAX_WAIT];
+       pg_atomic_uint32 wait_counts[INJ_MAX_WAIT];
 
        /* Names of injection points attached to wait counters */
        char            name[INJ_MAX_WAIT][INJ_NAME_MAXLEN];
-
-       /* Condition variable used for waits and wakeups */
-       ConditionVariable wait_point;
 } InjectionPointSharedState;
 
 /* Pointer to shared-memory state. */
@@ -102,9 +99,9 @@ injection_point_init_state(void *ptr, void *arg)
        InjectionPointSharedState *state = (InjectionPointSharedState *) ptr;
 
        SpinLockInit(&state->lock);
-       memset(state->wait_counts, 0, sizeof(state->wait_counts));
        memset(state->name, 0, sizeof(state->name));
-       ConditionVariableInit(&state->wait_point);
+       for (int i = 0; i < INJ_MAX_WAIT; i++)
+               pg_atomic_init_u32(&state->wait_counts[i], 0);
 }
 
 static void
@@ -222,7 +219,7 @@ injection_notice(const char *name, const void 
*private_data, void *arg)
                elog(NOTICE, "notice triggered for injection point %s", name);
 }
 
-/* Wait on a condition variable, awaken by injection_points_wakeup() */
+/* Wait until injection_points_wakeup() is called */
 void
 injection_wait(const char *name, const void *private_data, void *arg)
 {
@@ -254,31 +251,37 @@ injection_wait(const char *name, const void 
*private_data, void *arg)
                {
                        index = i;
                        strlcpy(inj_state->name[i], name, INJ_NAME_MAXLEN);
-                       old_wait_counts = inj_state->wait_counts[i];
+                       old_wait_counts = 
pg_atomic_read_u32(&inj_state->wait_counts[i]);
                        break;
                }
        }
        SpinLockRelease(&inj_state->lock);
 
        if (index < 0)
-               elog(ERROR, "could not find free slot for wait of injection 
point %s ",
+               elog(ERROR, "could not find free slot for wait of injection 
point %s",
                         name);
 
-       /* And sleep.. */
-       ConditionVariablePrepareToSleep(&inj_state->wait_point);
-       for (;;)
+       /*
+        * Wait until the counter is bumped by injection_points_wakeup().
+        *
+        * This loop starts with a short delay for responsiveness, enlarged to
+        * ease the CPU workload in slower environments.
+        */
+#define INJ_WAIT_INITIAL_US            10      /* 10us */
+#define INJ_WAIT_MAX_US                        100000  /* 100ms */
+       pgstat_report_wait_start(injection_wait_event);
        {
-               uint32          new_wait_counts;
+               int                     delay_us = INJ_WAIT_INITIAL_US;
 
-               SpinLockAcquire(&inj_state->lock);
-               new_wait_counts = inj_state->wait_counts[index];
-               SpinLockRelease(&inj_state->lock);
-
-               if (old_wait_counts != new_wait_counts)
-                       break;
-               ConditionVariableSleep(&inj_state->wait_point, 
injection_wait_event);
+               while (pg_atomic_read_u32(&inj_state->wait_counts[index]) == 
old_wait_counts)
+               {
+                       CHECK_FOR_INTERRUPTS();
+                       pg_usleep(delay_us);
+                       if (delay_us < INJ_WAIT_MAX_US)
+                               delay_us *= 2;
+               }
        }
-       ConditionVariableCancelSleep();
+       pgstat_report_wait_end();
 
        /* Remove this injection point from the waiters. */
        SpinLockAcquire(&inj_state->lock);
@@ -443,7 +446,7 @@ injection_points_wakeup(PG_FUNCTION_ARGS)
        if (inj_state == NULL)
                injection_init_shmem();
 
-       /* First bump the wait counter for the injection point to wake up */
+       /* Find the injection point then bump its wait counter */
        SpinLockAcquire(&inj_state->lock);
        for (int i = 0; i < INJ_MAX_WAIT; i++)
        {
@@ -458,11 +461,9 @@ injection_points_wakeup(PG_FUNCTION_ARGS)
                SpinLockRelease(&inj_state->lock);
                elog(ERROR, "could not find injection point %s to wake up", 
name);
        }
-       inj_state->wait_counts[index]++;
        SpinLockRelease(&inj_state->lock);
 
-       /* And broadcast the change to the waiters */
-       ConditionVariableBroadcast(&inj_state->wait_point);
+       pg_atomic_fetch_add_u32(&inj_state->wait_counts[index], 1);
        PG_RETURN_VOID();
 }
 
-- 
2.54.0

Attachment: signature.asc
Description: PGP signature

Reply via email to