On Thu, Feb 22, 2024 at 08:00:24AM +0000, Bertrand Drouvot wrote:
> +/* Maximum number of wait usable in injection points at once */
> 
> s/Maximum number of wait/Maximum number of waits/ ?

Thanks.  I've edited a few more places while scanning the whole.

> 
> 2 ===
> 
> +# Check the logs that the restart point has started on standby.  This is
> +# optional, but let's be sure.
> +my $log = slurp_file($node_standby->logfile, $logstart);
> +my $checkpoint_start = 0;
> +if ($log =~ m/restartpoint starting: immediate wait/)
> +{
> +       $checkpoint_start = 1;
> +}
> +is($checkpoint_start, 1, 'restartpoint has started');
> 
> what about?
> 
> ok( $node_standby->log_contains( "restartpoint starting: immediate wait", 
> $logstart),
>     "restartpoint has started");

And I'm behind the commit that introduced it (392ea0c78fdb).  It is
possible to remove the dependency to slurp_file() entirely by
switching the second location checking the logs for the checkpoint
completion.

> Except for the above, v3 looks good to me.

Thanks.  I'm looking at applying that at the beginning of next week if
there are no more comments, to get something by the feature freeze.
We could be more flexible for all that as that's related to testing,
but let's be in the clear.

I've also renamed the test to 041_checkpoint_at_promote.pl, as now
that the original is fixed, the checkpoint is not invalid.  That's
cleaner this way.
--
Michael
From 1da8e77398bdb2e3a2c8a1d14a88c991fcae7919 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Mon, 26 Feb 2024 12:44:36 +0900
Subject: [PATCH v4 1/2] injection_points: Add routines to wait and wake
 processes

This commit is made of two parts:
- A new callback that can be attached to a process to make it wait on a
condition variable.  The condition checked is registered in shared
memory by the module injection_points.
- A new SQL function to update the shared state and broadcast the update
using a condition variable.

The shared state used by the module is registered using the DSM
registry, and is optional.
---
 .../injection_points--1.0.sql                 |  10 ++
 .../injection_points/injection_points.c       | 155 ++++++++++++++++++
 src/tools/pgindent/typedefs.list              |   1 +
 3 files changed, 166 insertions(+)

diff --git a/src/test/modules/injection_points/injection_points--1.0.sql b/src/test/modules/injection_points/injection_points--1.0.sql
index 5944c41716..0a2e59aba7 100644
--- a/src/test/modules/injection_points/injection_points--1.0.sql
+++ b/src/test/modules/injection_points/injection_points--1.0.sql
@@ -24,6 +24,16 @@ RETURNS void
 AS 'MODULE_PATHNAME', 'injection_points_run'
 LANGUAGE C STRICT PARALLEL UNSAFE;
 
+--
+-- injection_points_wakeup()
+--
+-- Wakes up a waiting injection point.
+--
+CREATE FUNCTION injection_points_wakeup(IN point_name TEXT)
+RETURNS void
+AS 'MODULE_PATHNAME', 'injection_points_wakeup'
+LANGUAGE C STRICT PARALLEL UNSAFE;
+
 --
 -- injection_points_detach()
 --
diff --git a/src/test/modules/injection_points/injection_points.c b/src/test/modules/injection_points/injection_points.c
index e843e6594f..255941d105 100644
--- a/src/test/modules/injection_points/injection_points.c
+++ b/src/test/modules/injection_points/injection_points.c
@@ -18,18 +18,75 @@
 #include "postgres.h"
 
 #include "fmgr.h"
+#include "storage/condition_variable.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
+#include "storage/dsm_registry.h"
 #include "utils/builtins.h"
 #include "utils/injection_point.h"
 #include "utils/wait_event.h"
 
 PG_MODULE_MAGIC;
 
+/* Maximum number of waits usable in injection points at once */
+#define INJ_MAX_WAIT	32
+#define INJ_NAME_MAXLEN	64
+
+/* Shared state information for injection points. */
+typedef struct InjectionPointSharedState
+{
+	/* Protects access to other fields */
+	slock_t		lock;
+
+	/* Counters advancing when injection_points_wakeup() is called */
+	uint32		wait_counts[INJ_MAX_WAIT];
+
+	/* Names of injection points attached to wait counters */
+	char		name[INJ_MAX_WAIT][INJ_NAME_MAXLEN];
+
+	/* Condition variable used for waits and wakeups */
+	ConditionVariable wait_point;
+} InjectionPointSharedState;
+
+/* Pointer to shared-memory state. */
+static InjectionPointSharedState *inj_state = NULL;
+
 extern PGDLLEXPORT void injection_error(const char *name);
 extern PGDLLEXPORT void injection_notice(const char *name);
+extern PGDLLEXPORT void injection_wait(const char *name);
 
 
+/*
+ * Callback for shared memory area initialization.
+ */
+static void
+injection_point_init_state(void *ptr)
+{
+	InjectionPointSharedState *state = (InjectionPointSharedState *) ptr;
+
+	SpinLockInit(&state->lock);
+	memset(state->wait_counts, 0, sizeof(state->wait_counts));
+	memset(state->name, 0, sizeof(state->name));
+	ConditionVariableInit(&state->wait_point);
+}
+
+/*
+ * Initialize shared memory area for this module.
+ */
+static void
+injection_init_shmem(void)
+{
+	bool		found;
+
+	if (inj_state != NULL)
+		return;
+
+	inj_state = GetNamedDSMSegment("injection_points",
+								   sizeof(InjectionPointSharedState),
+								   injection_point_init_state,
+								   &found);
+}
+
 /* Set of callbacks available to be attached to an injection point. */
 void
 injection_error(const char *name)
@@ -43,6 +100,66 @@ injection_notice(const char *name)
 	elog(NOTICE, "notice triggered for injection point %s", name);
 }
 
+/* Wait on a condition variable, awaken by injection_points_wakeup() */
+void
+injection_wait(const char *name)
+{
+	uint32		old_wait_counts = 0;
+	int			index = -1;
+	uint32		injection_wait_event = 0;
+
+	if (inj_state == NULL)
+		injection_init_shmem();
+
+	/*
+	 * Use the injection point name for this custom wait event.  Note that
+	 * this custom wait event name is not released, but we don't care much for
+	 * testing as this should be short-lived.
+	 */
+	injection_wait_event = WaitEventExtensionNew(name);
+
+	/*
+	 * Find a free slot to wait for, and register this injection point's name.
+	 */
+	SpinLockAcquire(&inj_state->lock);
+	for (int i = 0; i < INJ_MAX_WAIT; i++)
+	{
+		if (inj_state->name[i][0] == '\0')
+		{
+			index = i;
+			strlcpy(inj_state->name[i], name, INJ_NAME_MAXLEN);
+			old_wait_counts = inj_state->wait_counts[i];
+			break;
+		}
+	}
+	SpinLockRelease(&inj_state->lock);
+
+	if (index < 0)
+		elog(ERROR, "could not find free slot for wait of injection point %s ",
+			 name);
+
+	/* And sleep.. */
+	ConditionVariablePrepareToSleep(&inj_state->wait_point);
+	for (;;)
+	{
+		uint32		new_wait_counts;
+
+		SpinLockAcquire(&inj_state->lock);
+		new_wait_counts = inj_state->wait_counts[index];
+		SpinLockRelease(&inj_state->lock);
+
+		if (old_wait_counts != new_wait_counts)
+			break;
+		ConditionVariableSleep(&inj_state->wait_point, injection_wait_event);
+	}
+	ConditionVariableCancelSleep();
+
+	/* Remove this injection point from the waiters. */
+	SpinLockAcquire(&inj_state->lock);
+	inj_state->name[index][0] = '\0';
+	SpinLockRelease(&inj_state->lock);
+}
+
 /*
  * SQL function for creating an injection point.
  */
@@ -58,6 +175,8 @@ injection_points_attach(PG_FUNCTION_ARGS)
 		function = "injection_error";
 	else if (strcmp(action, "notice") == 0)
 		function = "injection_notice";
+	else if (strcmp(action, "wait") == 0)
+		function = "injection_wait";
 	else
 		elog(ERROR, "incorrect action \"%s\" for injection point creation", action);
 
@@ -80,6 +199,42 @@ injection_points_run(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+/*
+ * SQL function for waking up an injection point waiting in injection_wait().
+ */
+PG_FUNCTION_INFO_V1(injection_points_wakeup);
+Datum
+injection_points_wakeup(PG_FUNCTION_ARGS)
+{
+	char	   *name = text_to_cstring(PG_GETARG_TEXT_PP(0));
+	int			index = -1;
+
+	if (inj_state == NULL)
+		injection_init_shmem();
+
+	/* First bump the wait counter for the injection point to wake up */
+	SpinLockAcquire(&inj_state->lock);
+	for (int i = 0; i < INJ_MAX_WAIT; i++)
+	{
+		if (strcmp(name, inj_state->name[i]) == 0)
+		{
+			index = i;
+			break;
+		}
+	}
+	if (index < 0)
+	{
+		SpinLockRelease(&inj_state->lock);
+		elog(ERROR, "could not find injection point %s to wake up", name);
+	}
+	inj_state->wait_counts[index]++;
+	SpinLockRelease(&inj_state->lock);
+
+	/* And broadcast the change to the waiters */
+	ConditionVariableBroadcast(&inj_state->wait_point);
+	PG_RETURN_VOID();
+}
+
 /*
  * SQL function for dropping an injection point.
  */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index fc8b15d0cf..4742784ea3 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1210,6 +1210,7 @@ InitializeDSMForeignScan_function
 InitializeWorkerForeignScan_function
 InjectionPointCacheEntry
 InjectionPointEntry
+InjectionPointSharedState
 InlineCodeBlock
 InsertStmt
 Instrumentation
-- 
2.43.0

From a86fd5268953372ada45b393cffc426ff3d6f12e Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Mon, 26 Feb 2024 12:55:36 +0900
Subject: [PATCH v4 2/2] Add regression test for restart points during
 promotion

This test fails when 7863ee4def65 is reverted, checking that a node is
able to properly restart following a crash when a restart point was
finishing across a promotion.

This is an old bug that had no coverage, and injection points make that
cheap to achieve.
---
 src/backend/access/transam/xlog.c             |   7 +
 src/test/recovery/Makefile                    |   7 +-
 src/test/recovery/meson.build                 |   4 +
 .../recovery/t/041_checkpoint_at_promote.pl   | 171 ++++++++++++++++++
 4 files changed, 188 insertions(+), 1 deletion(-)
 create mode 100644 src/test/recovery/t/041_checkpoint_at_promote.pl

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index c1162d55bf..dbf820f988 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -100,6 +100,7 @@
 #include "storage/sync.h"
 #include "utils/guc_hooks.h"
 #include "utils/guc_tables.h"
+#include "utils/injection_point.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
 #include "utils/relmapper.h"
@@ -7536,6 +7537,12 @@ CreateRestartPoint(int flags)
 
 	CheckPointGuts(lastCheckPoint.redo, flags);
 
+	/*
+	 * This location needs to be after CheckPointGuts() to ensure that some
+	 * work has already happened during this checkpoint.
+	 */
+	INJECTION_POINT("CreateRestartPoint");
+
 	/*
 	 * Remember the prior checkpoint's redo ptr for
 	 * UpdateCheckPointDistanceEstimate()
diff --git a/src/test/recovery/Makefile b/src/test/recovery/Makefile
index 17ee353735..f57baba5e8 100644
--- a/src/test/recovery/Makefile
+++ b/src/test/recovery/Makefile
@@ -9,12 +9,17 @@
 #
 #-------------------------------------------------------------------------
 
-EXTRA_INSTALL=contrib/pg_prewarm contrib/pg_stat_statements contrib/test_decoding
+EXTRA_INSTALL=contrib/pg_prewarm \
+	contrib/pg_stat_statements \
+	contrib/test_decoding \
+	src/test/modules/injection_points
 
 subdir = src/test/recovery
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
+export enable_injection_points enable_injection_points
+
 # required for 017_shm.pl and 027_stream_regress.pl
 REGRESS_SHLIB=$(abs_top_builddir)/src/test/regress/regress$(DLSUFFIX)
 export REGRESS_SHLIB
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index bf087ac2a9..c67249500e 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -6,6 +6,9 @@ tests += {
   'bd': meson.current_build_dir(),
   'tap': {
     'test_kwargs': {'priority': 40}, # recovery tests are slow, start early
+    'env': {
+       'enable_injection_points': get_option('injection_points') ? 'yes' : 'no',
+    },
     'tests': [
       't/001_stream_rep.pl',
       't/002_archiving.pl',
@@ -46,6 +49,7 @@ tests += {
       't/038_save_logical_slots_shutdown.pl',
       't/039_end_of_wal.pl',
       't/040_standby_failover_slots_sync.pl',
+      't/041_checkpoint_at_promote.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/041_checkpoint_at_promote.pl b/src/test/recovery/t/041_checkpoint_at_promote.pl
new file mode 100644
index 0000000000..47381a2c82
--- /dev/null
+++ b/src/test/recovery/t/041_checkpoint_at_promote.pl
@@ -0,0 +1,171 @@
+
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Time::HiRes qw(usleep);
+use Test::More;
+
+##################################################
+# Test race condition when a restart point is running during a promotion,
+# checking that WAL segments are correctly removed in the restart point
+# while the promotion finishes.
+#
+# This test relies on an injection point that causes the checkpointer to
+# wait in the middle of a restart point on a standby.  The checkpointer
+# is awaken to finish its restart point only once the promotion of the
+# standby is completed, and the node should be able to restart properly.
+##################################################
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+	plan skip_all => 'Injection points not supported by this build';
+}
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('master');
+$node_primary->init(allows_streaming => 1);
+$node_primary->append_conf(
+	'postgresql.conf', q[
+checkpoint_timeout = 30s
+log_checkpoints = on
+restart_after_crash = on
+]);
+$node_primary->start;
+
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Setup a standby
+my $node_standby = PostgreSQL::Test::Cluster->new('standby1');
+$node_standby->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_standby->start;
+
+# Dummy table for the upcoming tests.
+$node_primary->safe_psql('postgres', 'checkpoint');
+$node_primary->safe_psql('postgres', 'CREATE TABLE prim_tab (a int);');
+
+# Register an injection point on the standby so as the follow-up
+# restart point will wait on it.
+$node_primary->safe_psql('postgres', 'CREATE EXTENSION injection_points;');
+# Wait until the extension has been created on the standby
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# Note that from this point the checkpointer will wait in the middle of
+# a restart point on the standby.
+$node_standby->safe_psql('postgres',
+	"SELECT injection_points_attach('CreateRestartPoint', 'wait');");
+
+# Execute a restart point on the standby, that we will now be waiting on.
+# This needs to be in the background.
+my $logstart = -s $node_standby->logfile;
+my $psql_session =
+  $node_standby->background_psql('postgres', on_error_stop => 0);
+$psql_session->query_until(
+	qr/starting_checkpoint/, q(
+   \echo starting_checkpoint
+   CHECKPOINT;
+));
+
+# Switch one WAL segment to make the previous restart point remove the
+# segment once the restart point completes.
+$node_primary->safe_psql('postgres', 'INSERT INTO prim_tab VALUES (1);');
+$node_primary->safe_psql('postgres', 'SELECT pg_switch_wal();');
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# Wait until the checkpointer is in the middle of the restart point
+# processing, relying on the custom wait event generated in the
+# wait callback used in the injection point previously attached.
+ok( $node_standby->poll_query_until(
+		'postgres',
+		qq[SELECT count(*) FROM pg_stat_activity
+           WHERE backend_type = 'checkpointer' AND wait_event = 'CreateRestartPoint' ;],
+		'1'),
+	'checkpointer is waiting in restart point'
+) or die "Timed out while waiting for checkpointer to run restart point";
+
+# Check the logs that the restart point has started on standby.  This is
+# optional, but let's be sure.
+ok( $node_standby->log_contains(
+		"restartpoint starting: immediate wait", $logstart),
+	"restartpoint has started");
+
+# Trigger promotion during the restart point.
+$node_primary->stop;
+$node_standby->promote;
+
+# Update the start position before waking up the checkpointer!
+$logstart = -s $node_standby->logfile;
+
+# Now wake up the checkpointer.
+$node_standby->safe_psql('postgres',
+	"SELECT injection_points_wakeup('CreateRestartPoint');");
+
+# Wait until the previous restart point completes on the newly-promoted
+# standby, checking the logs for that.
+my $checkpoint_complete = 0;
+foreach my $i (0 .. 10 * $PostgreSQL::Test::Utils::timeout_default)
+{
+	if ($node_standby->log_contains("restartpoint complete"), $logstart)
+	{
+		$checkpoint_complete = 1;
+		last;
+	}
+	usleep(100_000);
+}
+is($checkpoint_complete, 1, 'restart point has completed');
+
+# Kill with SIGKILL, forcing all the backends to restart.
+my $psql_timeout = IPC::Run::timer(3600);
+my ($killme_stdin, $killme_stdout, $killme_stderr) = ('', '', '');
+my $killme = IPC::Run::start(
+	[
+		'psql', '-XAtq', '-v', 'ON_ERROR_STOP=1', '-f', '-', '-d',
+		$node_standby->connstr('postgres')
+	],
+	'<',
+	\$killme_stdin,
+	'>',
+	\$killme_stdout,
+	'2>',
+	\$killme_stderr,
+	$psql_timeout);
+$killme_stdin .= q[
+SELECT pg_backend_pid();
+];
+$killme->pump until $killme_stdout =~ /[[:digit:]]+[\r\n]$/;
+my $pid = $killme_stdout;
+chomp($pid);
+$killme_stdout = '';
+$killme_stderr = '';
+
+my $ret = PostgreSQL::Test::Utils::system_log('pg_ctl', 'kill', 'KILL', $pid);
+is($ret, 0, 'killed process with KILL');
+
+# Wait until the server restarts, finish consuming output.
+$killme_stdin .= q[
+SELECT 1;
+];
+ok( pump_until(
+		$killme,
+		$psql_timeout,
+		\$killme_stderr,
+		qr/server closed the connection unexpectedly|connection to server was lost|could not send data to server/m
+	),
+	"psql query died successfully after SIGKILL");
+$killme->finish;
+
+# Wait till server finishes restarting
+$node_standby->poll_query_until('postgres', undef, '');
+
+# After recovery, the server should be able to start.
+my $stdout;
+my $stderr;
+($ret, $stdout, $stderr) = $node_standby->psql('postgres', 'select 1');
+is($ret, 0, "psql connect success");
+is($stdout, 1, "psql select 1");
+
+done_testing();
-- 
2.43.0

Attachment: signature.asc
Description: PGP signature

Reply via email to