On Wed, Feb 21, 2024 at 07:08:03AM +0900, Michael Paquier wrote:
> Well, both you and Andrey are asking for it now, so let's do it.  The
> implementation is simple:
> - Store in InjectionPointSharedState an array of wait_counts and an
> array of names.  There is only one condition variable.
> - When a point wants to wait, it takes the spinlock and looks within
> the array of names until it finds a free slot, adds its name into the
> array to reserve a wait counter at the same position, releases the
> spinlock.  Then it loops on the condition variable for an update of
> the counter it has reserved.  It is possible to make something more
> efficient, but at a small size it would not really matter.
> - The wakeup takes a point name in argument, acquires the spinlock,
> and checks if it can find the point into the array, pinpoints the
> location of the counter to update and updates it.  Then it broadcasts
> the change.
> - The wait loop checks its counter, leaves its loop, cancels the
> sleep, takes the spinlock to unregister from the array, and leaves.
> 
> I would just hardcode the number of points that can wait, say 5 of
> them tracked in shmem?  Does that look like what you are looking at?

I have been looking at that, and it works OK: you can do things like
waits and wakeups for multiple processes in a controlled manner.  The
attached patch allows up to 32 waiters at once.  I have also switched
things so that the wait event reported in pg_stat_activity is the name
of the injection point itself.

Comments and ideas are welcome.
--
Michael
From 75302cba302b83ce2a6d6eaf30b163f473b87276 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Wed, 21 Feb 2024 16:36:25 +0900
Subject: [PATCH v2 1/2] injection_points: Add routines to wait and wake
 processes

This commit is made of two parts:
- A new callback, "wait", that can be attached to an injection point to
make the process running it wait on a condition variable.  The condition
checked is registered in shared memory by the module injection_points.
- A new SQL function, injection_points_wakeup(), to update the shared
state and broadcast the update using the condition variable, waking up
the process waiting on the given injection point.

The shared state used by the module is registered using the DSM
registry, and is only set up when a wait or a wakeup first needs it.
---
 .../injection_points--1.0.sql                 |  10 ++
 .../injection_points/injection_points.c       | 151 ++++++++++++++++++
 src/tools/pgindent/typedefs.list              |   1 +
 3 files changed, 162 insertions(+)

diff --git a/src/test/modules/injection_points/injection_points--1.0.sql b/src/test/modules/injection_points/injection_points--1.0.sql
index 5944c41716..eed0310cf6 100644
--- a/src/test/modules/injection_points/injection_points--1.0.sql
+++ b/src/test/modules/injection_points/injection_points--1.0.sql
@@ -24,6 +24,16 @@ RETURNS void
 AS 'MODULE_PATHNAME', 'injection_points_run'
 LANGUAGE C STRICT PARALLEL UNSAFE;
 
+--
+-- injection_points_wakeup()
+--
+-- Wakes up the process waiting on the given injection point.
+--
+CREATE FUNCTION injection_points_wakeup(IN point_name TEXT)
+RETURNS void
+AS 'MODULE_PATHNAME', 'injection_points_wakeup'
+LANGUAGE C STRICT PARALLEL UNSAFE;
+
 --
 -- injection_points_detach()
 --
diff --git a/src/test/modules/injection_points/injection_points.c b/src/test/modules/injection_points/injection_points.c
index e843e6594f..052b20f9c8 100644
--- a/src/test/modules/injection_points/injection_points.c
+++ b/src/test/modules/injection_points/injection_points.c
@@ -18,18 +18,81 @@
 #include "postgres.h"
 
 #include "fmgr.h"
+#include "storage/condition_variable.h"
+#include "storage/dsm_registry.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "utils/builtins.h"
 #include "utils/injection_point.h"
 #include "utils/wait_event.h"
 
 PG_MODULE_MAGIC;
 
+/* Maximum number of waits usable in injection points at once */
+#define INJ_MAX_WAIT	32
+/* Maximum length of a stored injection point name, terminator included */
+#define INJ_NAME_MAXLEN	64
+
+/* Shared state information for injection points. */
+typedef struct InjectionPointSharedState
+{
+	/* protects accesses to wait_counts and name */
+	slock_t		lock;
+
+	/* Counters advancing when injection_points_wakeup() is called */
+	uint32		wait_counts[INJ_MAX_WAIT];
+
+	/* Names of injection points attached to wait counters */
+	char		name[INJ_MAX_WAIT][INJ_NAME_MAXLEN];
+
+	/*
+	 * Condition variable used for waits and wakeups, checking upon the set of
+	 * wait_counts when waiting.
+	 */
+	ConditionVariable wait_point;
+} InjectionPointSharedState;
+
+/* Pointer to shared-memory state. */
+static InjectionPointSharedState *inj_state = NULL;
+
 extern PGDLLEXPORT void injection_error(const char *name);
 extern PGDLLEXPORT void injection_notice(const char *name);
+extern PGDLLEXPORT void injection_wait(const char *name);
 
 
+/*
+ * Initialize the shared state of the module.  This is the initialization
+ * callback given to GetNamedDSMSegment() by injection_init_shmem().
+ */
+static void
+injection_point_init_state(void *ptr)
+{
+	InjectionPointSharedState *state = (InjectionPointSharedState *) ptr;
+
+	SpinLockInit(&state->lock);
+	memset(state->wait_counts, 0, sizeof(state->wait_counts));
+	memset(state->name, 0, sizeof(state->name));
+	ConditionVariableInit(&state->wait_point);
+}
+
+/*
+ * Attach to the shared state of the module, registered in the DSM registry
+ * under the name "injection_points".  This is a no-op if already attached.
+ */
+static void
+injection_init_shmem(void)
+{
+	bool		found;
+
+	if (inj_state != NULL)
+		return;
+
+	inj_state = GetNamedDSMSegment("injection_points",
+								   sizeof(InjectionPointSharedState),
+								   injection_point_init_state,
+								   &found);
+}
+
 /* Set of callbacks available to be attached to an injection point. */
 void
 injection_error(const char *name)
@@ -43,6 +106,86 @@ injection_notice(const char *name)
 	elog(NOTICE, "notice triggered for injection point %s", name);
 }
 
+/*
+ * Wait on a condition variable, awakened by injection_points_wakeup().
+ *
+ * A free slot of the shared state is reserved under this injection point's
+ * name, then the process sleeps until the wait counter of that slot moves.
+ * The slot is released once the wait is over.
+ */
+void
+injection_wait(const char *name)
+{
+	uint32		old_wait_counts = 0;
+	int			index = -1;
+	uint32		injection_wait_event = 0;
+
+	if (inj_state == NULL)
+		injection_init_shmem();
+
+	/*
+	 * Use the injection point name for this custom wait event.  Note that
+	 * this custom wait event name is not released, but we don't care much
+	 * for testing as this will be short-lived.
+	 */
+	injection_wait_event = WaitEventExtensionNew(name);
+
+	/*
+	 * Find a free slot to wait for, and register this injection point's name.
+	 */
+	SpinLockAcquire(&inj_state->lock);
+	for (int i = 0; i < INJ_MAX_WAIT; i++)
+	{
+		if (inj_state->name[i][0] == '\0')
+		{
+			index = i;
+			strlcpy(inj_state->name[i], name, INJ_NAME_MAXLEN);
+			old_wait_counts = inj_state->wait_counts[i];
+			break;
+		}
+	}
+	SpinLockRelease(&inj_state->lock);
+
+	if (index < 0)
+		elog(ERROR, "could not find free slot for wait of injection point %s",
+			 name);
+
+	/*
+	 * And sleep until the counter of our slot moves.  The slot has to be
+	 * released even if an error is thrown while waiting (on a query
+	 * cancellation, for example): injection_points_wakeup() picks the first
+	 * slot matching a name, so a stale entry could be bumped instead of the
+	 * slot of a process still waiting on the same point.
+	 */
+	PG_TRY();
+	{
+		ConditionVariablePrepareToSleep(&inj_state->wait_point);
+		for (;;)
+		{
+			uint32		new_wait_counts;
+
+			SpinLockAcquire(&inj_state->lock);
+			new_wait_counts = inj_state->wait_counts[index];
+			SpinLockRelease(&inj_state->lock);
+
+			if (old_wait_counts != new_wait_counts)
+				break;
+			ConditionVariableSleep(&inj_state->wait_point,
+								   injection_wait_event);
+		}
+	}
+	PG_FINALLY();
+	{
+		ConditionVariableCancelSleep();
+
+		/* Remove us from the waiting list */
+		SpinLockAcquire(&inj_state->lock);
+		inj_state->name[index][0] = '\0';
+		SpinLockRelease(&inj_state->lock);
+	}
+	PG_END_TRY();
+}
+
 /*
  * SQL function for creating an injection point.
  */
@@ -58,6 +201,8 @@ injection_points_attach(PG_FUNCTION_ARGS)
 		function = "injection_error";
 	else if (strcmp(action, "notice") == 0)
 		function = "injection_notice";
+	else if (strcmp(action, "wait") == 0)
+		function = "injection_wait";
 	else
 		elog(ERROR, "incorrect action \"%s\" for injection point creation", action);
 
@@ -80,6 +225,49 @@ injection_points_run(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+/*
+ * SQL function for waking up a process waiting on an injection point.
+ *
+ * The wait counter of the first slot registered under the given name is
+ * bumped, then the change is broadcast on the shared condition variable so
+ * that the waiter owning that slot can leave its sleep.  An error is raised
+ * if no waiter is registered under this name.
+ */
+PG_FUNCTION_INFO_V1(injection_points_wakeup);
+Datum
+injection_points_wakeup(PG_FUNCTION_ARGS)
+{
+	char	   *name = text_to_cstring(PG_GETARG_TEXT_PP(0));
+	int			index = -1;
+
+	if (inj_state == NULL)
+		injection_init_shmem();
+
+	/* First bump the wait counter for the injection point to wake */
+	SpinLockAcquire(&inj_state->lock);
+	for (int i = 0; i < INJ_MAX_WAIT; i++)
+	{
+		/* Free slots are skipped, so that an empty name matches nothing */
+		if (inj_state->name[i][0] != '\0' &&
+			strcmp(name, inj_state->name[i]) == 0)
+		{
+			index = i;
+			break;
+		}
+	}
+	if (index < 0)
+	{
+		SpinLockRelease(&inj_state->lock);
+		elog(ERROR, "could not find injection point %s to wake", name);
+	}
+	inj_state->wait_counts[index]++;
+	SpinLockRelease(&inj_state->lock);
+
+	/* And broadcast the change for the waiters */
+	ConditionVariableBroadcast(&inj_state->wait_point);
+	PG_RETURN_VOID();
+}
+
 /*
  * SQL function for dropping an injection point.
  */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index d808aad8b0..d7eca00502 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1208,6 +1208,7 @@ InitializeDSMForeignScan_function
 InitializeWorkerForeignScan_function
 InjectionPointCacheEntry
 InjectionPointEntry
+InjectionPointSharedState
 InlineCodeBlock
 InsertStmt
 Instrumentation
-- 
2.43.0

From 79cc3e309c15bec19fec8c3cb2ac2e48bd42d7a2 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Wed, 21 Feb 2024 16:37:17 +0900
Subject: [PATCH v2 2/2] Add regression test for restart points during
 promotion

This test fails when 7863ee4def65 is reverted.  It checks that a node is
able to restart properly after a crash when a restart point was still
running while the node was promoted.

This is an old bug that had no test coverage, and injection points make
such coverage cheap to achieve.
---
 src/backend/access/transam/xlog.c             |   7 +
 src/test/recovery/Makefile                    |   7 +-
 src/test/recovery/meson.build                 |   4 +
 .../t/041_invalid_checkpoint_after_promote.pl | 176 ++++++++++++++++++
 4 files changed, 193 insertions(+), 1 deletion(-)
 create mode 100644 src/test/recovery/t/041_invalid_checkpoint_after_promote.pl

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 50c347a679..50b045ff08 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -100,6 +100,7 @@
 #include "storage/sync.h"
 #include "utils/guc_hooks.h"
 #include "utils/guc_tables.h"
+#include "utils/injection_point.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
 #include "utils/relmapper.h"
@@ -7536,6 +7537,12 @@ CreateRestartPoint(int flags)
 
 	CheckPointGuts(lastCheckPoint.redo, flags);
 
+	/*
+	 * This location needs to be after CheckPointGuts() to ensure that some
+	 * work has already happened during this checkpoint.
+	 */
+	INJECTION_POINT("CreateRestartPoint");
+
 	/*
 	 * Remember the prior checkpoint's redo ptr for
 	 * UpdateCheckPointDistanceEstimate()
diff --git a/src/test/recovery/Makefile b/src/test/recovery/Makefile
index 17ee353735..f57baba5e8 100644
--- a/src/test/recovery/Makefile
+++ b/src/test/recovery/Makefile
@@ -9,12 +9,17 @@
 #
 #-------------------------------------------------------------------------
 
-EXTRA_INSTALL=contrib/pg_prewarm contrib/pg_stat_statements contrib/test_decoding
+EXTRA_INSTALL=contrib/pg_prewarm \
+	contrib/pg_stat_statements \
+	contrib/test_decoding \
+	src/test/modules/injection_points
 
 subdir = src/test/recovery
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
+export enable_injection_points
+
 # required for 017_shm.pl and 027_stream_regress.pl
 REGRESS_SHLIB=$(abs_top_builddir)/src/test/regress/regress$(DLSUFFIX)
 export REGRESS_SHLIB
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index bf087ac2a9..e4e0e2b4cc 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -6,6 +6,9 @@ tests += {
   'bd': meson.current_build_dir(),
   'tap': {
     'test_kwargs': {'priority': 40}, # recovery tests are slow, start early
+    'env': {
+       'enable_injection_points': get_option('injection_points') ? 'yes' : 'no',
+    },
     'tests': [
       't/001_stream_rep.pl',
       't/002_archiving.pl',
@@ -46,6 +49,7 @@ tests += {
       't/038_save_logical_slots_shutdown.pl',
       't/039_end_of_wal.pl',
       't/040_standby_failover_slots_sync.pl',
+      't/041_invalid_checkpoint_after_promote.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/041_invalid_checkpoint_after_promote.pl b/src/test/recovery/t/041_invalid_checkpoint_after_promote.pl
new file mode 100644
index 0000000000..e91f360d12
--- /dev/null
+++ b/src/test/recovery/t/041_invalid_checkpoint_after_promote.pl
@@ -0,0 +1,176 @@
+
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Time::HiRes qw(usleep);
+use Test::More;
+
+##################################################
+# Test race condition when a restart point is running during a promotion,
+# checking that WAL segments are correctly removed in the restart point
+# while the promotion finishes.
+#
+# This test relies on an injection point that causes the checkpointer to
+# wait in the middle of a restart point on a standby.  The checkpointer
+# is awakened to finish its restart point only once the promotion of the
+# standby is completed, and the node should be able to restart properly.
+##################################################
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+	plan skip_all => 'Injection points not supported by this build';
+}
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->append_conf(
+	'postgresql.conf', q[
+checkpoint_timeout = 30s
+log_checkpoints = on
+restart_after_crash = on
+]);
+$node_primary->start;
+
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Setup a standby
+my $node_standby = PostgreSQL::Test::Cluster->new('standby1');
+$node_standby->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_standby->start;
+
+# Dummy table for the upcoming tests.
+$node_primary->safe_psql('postgres', 'checkpoint');
+$node_primary->safe_psql('postgres', 'CREATE TABLE prim_tab (a int);');
+
+# Create the extension on the primary, so that it gets replicated to the
+# standby where the injection point is attached.
+$node_primary->safe_psql('postgres', 'CREATE EXTENSION injection_points;');
+# Wait until the extension has been created on the standby
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# Note that from this point the checkpointer will wait in the middle of
+# a restart point on the standby.
+$node_standby->safe_psql('postgres',
+	"SELECT injection_points_attach('CreateRestartPoint', 'wait');");
+
+# Execute a restart point on the standby, which will now wait on the
+# injection point.  This needs to be in the background.
+my $logstart = -s $node_standby->logfile;
+my $psql_session =
+  $node_standby->background_psql('postgres', on_error_stop => 0);
+$psql_session->query_until(
+	qr/starting_checkpoint/, q(
+   \echo starting_checkpoint
+   CHECKPOINT;
+));
+
+# Switch one WAL segment to make the previous restart point remove the
+# segment once the restart point completes.
+$node_primary->safe_psql('postgres', 'INSERT INTO prim_tab VALUES (1);');
+$node_primary->safe_psql('postgres', 'SELECT pg_switch_wal();');
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# Wait until the checkpointer is in the middle of the restart point
+# processing, relying on the custom wait event generated in the
+# wait callback used in the injection point previously attached.
+ok( $node_standby->poll_query_until(
+		'postgres',
+		qq[SELECT count(*) FROM pg_stat_activity
+           WHERE backend_type = 'checkpointer' AND wait_event = 'CreateRestartPoint' ;],
+		'1'),
+	'checkpointer is waiting in restart point'
+) or die "Timed out while waiting for checkpointer to run restart point";
+
+# Check the logs that the restart point has started on standby.  This is
+# optional, but let's be sure.
+my $log = slurp_file($node_standby->logfile, $logstart);
+my $checkpoint_start = 0;
+if ($log =~ m/restartpoint starting: immediate wait/)
+{
+	$checkpoint_start = 1;
+}
+is($checkpoint_start, 1, 'restartpoint has started');
+
+# Trigger promotion during the restart point.
+$node_primary->stop;
+$node_standby->promote;
+
+# Update the start position before waking up the checkpointer!
+$logstart = -s $node_standby->logfile;
+
+# Now wake up the checkpointer.
+$node_standby->safe_psql('postgres',
+	"SELECT injection_points_wakeup('CreateRestartPoint');");
+
+# Wait until the previous restart point completes on the newly-promoted
+# standby, checking the logs for that.
+my $checkpoint_complete = 0;
+foreach my $i (0 .. 10 * $PostgreSQL::Test::Utils::timeout_default)
+{
+	my $log = slurp_file($node_standby->logfile, $logstart);
+	if ($log =~ m/restartpoint complete/)
+	{
+		$checkpoint_complete = 1;
+		last;
+	}
+	usleep(100_000);
+}
+is($checkpoint_complete, 1, 'restart point has completed');
+
+# Kill with SIGKILL, forcing all the backends to restart.
+my $psql_timeout = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default);
+my ($killme_stdin, $killme_stdout, $killme_stderr) = ('', '', '');
+my $killme = IPC::Run::start(
+	[
+		'psql', '-XAtq', '-v', 'ON_ERROR_STOP=1', '-f', '-', '-d',
+		$node_standby->connstr('postgres')
+	],
+	'<',
+	\$killme_stdin,
+	'>',
+	\$killme_stdout,
+	'2>',
+	\$killme_stderr,
+	$psql_timeout);
+$killme_stdin .= q[
+SELECT pg_backend_pid();
+];
+$killme->pump until $killme_stdout =~ /[[:digit:]]+[\r\n]$/;
+my $pid = $killme_stdout;
+chomp($pid);
+$killme_stdout = '';
+$killme_stderr = '';
+
+my $ret = PostgreSQL::Test::Utils::system_log('pg_ctl', 'kill', 'KILL', $pid);
+is($ret, 0, 'killed process with KILL');
+
+# Wait until the server restarts, finish consuming output.
+$killme_stdin .= q[
+SELECT 1;
+];
+ok( pump_until(
+		$killme,
+		$psql_timeout,
+		\$killme_stderr,
+		qr/server closed the connection unexpectedly|connection to server was lost|could not send data to server/m
+	),
+	"psql query died successfully after SIGKILL");
+$killme->finish;
+
+# Wait till server finishes restarting
+$node_standby->poll_query_until('postgres', undef, '');
+
+# After recovery, the server should be able to start.
+my $stdout;
+my $stderr;
+($ret, $stdout, $stderr) = $node_standby->psql('postgres', 'select 1');
+is($ret, 0, "psql connect success");
+is($stdout, 1, "psql select 1");
+
+done_testing();
-- 
2.43.0

Attachment: signature.asc
Description: PGP signature

Reply via email to