From 8c310e291ad80370fb55ca55515b31d284f2b2aa Mon Sep 17 00:00:00 2001
From: Anthonin Bonnefoy <anthonin.bonnefoy@datadoghq.com>
Date: Fri, 17 Jan 2025 15:20:52 +0100
Subject: Accept recovery conflict interrupt on blocked writing

Previously, all interrupts except process dying were ignored while a
process was blocked writing to a socket. If the connection to the client
was broken (no clean FIN nor RST), a process sending results to the
client could be stuck for 924s until the TCP retransmission timeout is
reached. During this time, it was possible for the process to conflict
with recovery: For example, the process returning results can have a
conflicting buffer pin.

To avoid blocking recovery for an extended period of time, this patch
changes client write interrupts by handling recovery conflict interrupts
instead of ignoring them. Since the interrupt happens while we're likely
to have partially written results on the socket, there's no easy way to
keep protocol sync so the session needs to be terminated.

In addition, a blocked write could happen while interrupts are blocked.
For example, we're sending an error message to the client and eventually
saturate the socket buffer. In this case, we can't process the interrupt
in ProcessClientWriteInterrupt. Instead, we stop retrying writes and
trigger a write error, allowing to close the connection and termiate the
session.
---
 src/backend/libpq/be-secure.c                |  29 +++-
 src/backend/tcop/postgres.c                  | 163 +++++++++++++++---
 src/include/tcop/tcopprot.h                  |   2 +-
 src/test/recovery/t/031_recovery_conflict.pl | 168 ++++++++++++++++---
 4 files changed, 303 insertions(+), 59 deletions(-)

diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c
index 3b4f80146be..3b6089464a1 100644
--- a/src/backend/libpq/be-secure.c
+++ b/src/backend/libpq/be-secure.c
@@ -306,6 +306,7 @@ secure_write(Port *port, void *ptr, size_t len)
 {
 	ssize_t		n;
 	int			waitfor;
+	bool		retryable = true;
 
 	/* Deal with any already-pending interrupt condition. */
 	ProcessClientWriteInterrupt(false);
@@ -353,14 +354,28 @@ retry:
 		if (event.events & WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
-			ProcessClientWriteInterrupt(true);
-
-			/*
-			 * We'll retry the write. Most likely it will return immediately
-			 * because there's still no buffer space available, and we'll wait
-			 * for the socket to become ready again.
-			 */
+			retryable = ProcessClientWriteInterrupt(true);
+			if (!retryable)
+			{
+				/*
+				 * The blocking write is interfering with recovery but
+				 * ProcessClientWriteInterrupt can't process interrupts. This
+				 * can happen when trying to send error messages to the client
+				 * and saturating the buffer. To dislodge ourself, we give up
+				 * retrying with a socket error. This will close the
+				 * connection with a "connection to client lost" error during
+				 * the next CHECK_FOR_INTERRUPTS.
+				 */
+				errno = ENOBUFS;
+				return -1;
+			}
 		}
+
+		/*
+		 * We'll retry the write. Most likely it will return immediately
+		 * because there's still no buffer space available, and we'll wait for
+		 * the socket to become ready again.
+		 */
 		goto retry;
 	}
 
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 5655348a2e2..24dfb638b5d 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -184,6 +184,9 @@ static void drop_unnamed_stmt(void);
 static void log_disconnections(int code, Datum arg);
 static void enable_statement_timeout(void);
 static void disable_statement_timeout(void);
+static void ProcessRecoveryConflictInterrupts(void);
+static bool CheckBlockedWriteConflictInterrupts(ProcSignalReason *triggered_reason);
+static int	errdetail_recovery_conflict(ProcSignalReason reason);
 
 
 /* ----------------------------------------------------------------
@@ -540,49 +543,97 @@ ProcessClientReadInterrupt(bool blocked)
  * 'blocked' is true if no data could be written and we plan to retry,
  * false if about to write or done writing.
  *
+ * Return true if write is retryable.
+ *
  * Must preserve errno!
  */
-void
+bool
 ProcessClientWriteInterrupt(bool blocked)
 {
 	int			save_errno = errno;
+	bool		retryable = true;
+	bool		can_process_interrupts = InterruptHoldoffCount == 0 && CritSectionCount == 0;
+	bool		conflict_with_recovery = false;
+	ProcSignalReason reason;
 
-	if (ProcDiePending)
+	if (blocked && RecoveryConflictPending)
+	{
+		conflict_with_recovery = CheckBlockedWriteConflictInterrupts(&reason);
+	}
+
+	if (!blocked && ProcDiePending)
+	{
+		/*
+		 * We're dying and we haven't tried to write yet, make sure the
+		 * process latch is set, so that if the write would block then we'll
+		 * come back here and die.  If we're done writing, also make sure the
+		 * process latch is set, as we might've undesirably cleared it while
+		 * writing.
+		 */
+		SetLatch(MyLatch);
+	}
+	else if (blocked && !can_process_interrupts && conflict_with_recovery)
+	{
+		/*
+		 * We have a blocking write conflicting with recovery but interrupts
+		 * can't be processed. This can happen when sending errors to the
+		 * client and saturating the socket buffer. This will make the
+		 * blocking write fail, triggering a "could not send data to client"
+		 * error and closing the socket. Since the socket error will supersede
+		 * the recovery conflict, we need to log the recovery conflict now.
+		 */
+		pgstat_report_recovery_conflict(reason);
+		ereport(COMMERROR,
+				(errcode(reason == PROCSIG_RECOVERY_CONFLICT_DATABASE ?
+						 ERRCODE_DATABASE_DROPPED :
+						 ERRCODE_T_R_SERIALIZATION_FAILURE),
+				 errmsg("Interrupting blocking write due to conflict with recovery"),
+				 errdetail_recovery_conflict(reason)));
+
+		retryable = false;
+	}
+	else if (blocked && can_process_interrupts && (ProcDiePending || conflict_with_recovery))
 	{
 		/*
-		 * We're dying.  If it's not possible to write, then we should handle
-		 * that immediately, else a stuck client could indefinitely delay our
-		 * response to the signal.  If we haven't tried to write yet, make
-		 * sure the process latch is set, so that if the write would block
-		 * then we'll come back here and die.  If we're done writing, also
-		 * make sure the process latch is set, as we might've undesirably
-		 * cleared it while writing.
+		 * Don't mess with whereToSendOutput if ProcessInterrupts wouldn't
+		 * service ProcDiePending.
+		 *
+		 * We don't want to send the client the error message, as a) that
+		 * would possibly block again, and b) it would likely lead to loss of
+		 * protocol sync because we may have already sent a partial protocol
+		 * message.
 		 */
-		if (blocked)
+		if (whereToSendOutput == DestRemote)
+			whereToSendOutput = DestNone;
+		if (ProcDiePending)
 		{
 			/*
-			 * Don't mess with whereToSendOutput if ProcessInterrupts wouldn't
-			 * service ProcDiePending.
+			 * We're dying and it's not possible to write so we should handle
+			 * that immediately, else a stuck client could indefinitely delay
+			 * our response to the signal.
 			 */
-			if (InterruptHoldoffCount == 0 && CritSectionCount == 0)
-			{
-				/*
-				 * We don't want to send the client the error message, as a)
-				 * that would possibly block again, and b) it would likely
-				 * lead to loss of protocol sync because we may have already
-				 * sent a partial protocol message.
-				 */
-				if (whereToSendOutput == DestRemote)
-					whereToSendOutput = DestNone;
-
-				CHECK_FOR_INTERRUPTS();
-			}
+			CHECK_FOR_INTERRUPTS();
 		}
 		else
-			SetLatch(MyLatch);
+		{
+			/*
+			 * As we're in a blocked write, we can't keep protocol sync.
+			 * Upgrade
+			 */
+			ExitOnAnyError = true;
+			ProcessRecoveryConflictInterrupts();
+		}
+
+		/*
+		 * Both ProcDiePending and ProcessRecoveryConflictInterrupts should
+		 * exit as interrupts can be processed and we checked that we did
+		 * conflict with recovery
+		 */
+		pg_unreachable();
 	}
 
 	errno = save_errno;
+	return retryable;
 }
 
 /*
@@ -3227,6 +3278,66 @@ ProcessRecoveryConflictInterrupt(ProcSignalReason reason)
 	}
 }
 
+/*
+ * Check if a blocked write conflict with a specific conflict reason.
+ */
+static bool
+CheckBlockedWriteConflictInterrupt(ProcSignalReason reason)
+{
+	switch (reason)
+	{
+		case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
+			/* Blocked writes can't be waiting for a lock */
+			return false;
+		case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN:
+			/* Do we block the startup process? */
+			return HoldingBufferPinThatDelaysRecovery();
+		case PROCSIG_RECOVERY_CONFLICT_LOCK:
+		case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
+		case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
+			/* We can only block if we have an existing transaction */
+			return IsTransactionOrTransactionBlock();
+		case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+
+			/*
+			 * Aborted subtransaction and top level aborted transaction can be
+			 * ignored
+			 */
+			return IsSubTransaction() || !IsAbortedTransactionBlockState();
+		case PROCSIG_RECOVERY_CONFLICT_DATABASE:
+			/* This will always conflict */
+			return true;
+		default:
+			return true;
+	}
+}
+
+/*
+ * Check if the blocked write conflict with any conflict reason.
+ *
+ * If log_conflict is true, the recovery conflict will be logged and reported in the stats.
+ */
+static bool
+CheckBlockedWriteConflictInterrupts(ProcSignalReason *triggered_reason)
+{
+	Assert(RecoveryConflictPending);
+
+	for (ProcSignalReason reason = PROCSIG_RECOVERY_CONFLICT_FIRST;
+		 reason <= PROCSIG_RECOVERY_CONFLICT_LAST;
+		 reason++)
+	{
+		if (RecoveryConflictPendingReasons[reason])
+		{
+			if (CheckBlockedWriteConflictInterrupt(reason))
+			{
+				*triggered_reason = reason;
+				return true;
+			}
+		}
+	}
+	return false;
+}
+
 /*
  * Check each possible recovery conflict reason.
  */
diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h
index a62367f7793..63b0442903d 100644
--- a/src/include/tcop/tcopprot.h
+++ b/src/include/tcop/tcopprot.h
@@ -74,7 +74,7 @@ extern void StatementCancelHandler(SIGNAL_ARGS);
 extern void FloatExceptionHandler(SIGNAL_ARGS) pg_attribute_noreturn();
 extern void HandleRecoveryConflictInterrupt(ProcSignalReason reason);
 extern void ProcessClientReadInterrupt(bool blocked);
-extern void ProcessClientWriteInterrupt(bool blocked);
+extern bool ProcessClientWriteInterrupt(bool blocked);
 
 extern void process_postgres_switches(int argc, char *argv[],
 									  GucContext ctx, const char **dbname);
diff --git a/src/test/recovery/t/031_recovery_conflict.pl b/src/test/recovery/t/031_recovery_conflict.pl
index 028b0b5f0e1..0f272f149b9 100644
--- a/src/test/recovery/t/031_recovery_conflict.pl
+++ b/src/test/recovery/t/031_recovery_conflict.pl
@@ -7,8 +7,10 @@
 use strict;
 use warnings FATAL => 'all';
 use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::PgProto;
 use PostgreSQL::Test::Utils;
 use Test::More;
+use Time::HiRes qw(usleep);
 
 
 # Set up nodes
@@ -74,25 +76,10 @@ my $expected_conflicts = 0;
 
 ## RECOVERY CONFLICT 1: Buffer pin conflict
 my $sect = "buffer pin conflict";
-$expected_conflicts++;
-
-# Aborted INSERT on primary that will be cleaned up by vacuum. Has to be old
-# enough so that there's not a snapshot conflict before the buffer pin
-# conflict.
-
-$node_primary->safe_psql(
-	$test_db,
-	qq[
-	BEGIN;
-	INSERT INTO $table1 VALUES (1,0);
-	ROLLBACK;
-	-- ensure flush, rollback doesn't do so
-	BEGIN; LOCK $table1; COMMIT;
-	]);
-
-$node_primary->wait_for_replay_catchup($node_standby);
-
 my $cursor1 = "test_recovery_conflict_cursor";
+$expected_conflicts++;
+
+setup_bufferpin_conflict();
 
 # DECLARE and use a cursor on standby, causing buffer with the only block of
 # the relation to be pinned on the standby
@@ -120,8 +107,94 @@ $node_primary->wait_for_replay_catchup($node_standby);
 
 check_conflict_log("User was holding shared buffer pin for too long");
 $psql_standby->reconnect_and_clear();
-check_conflict_stat("bufferpin");
+check_conflict_stat("bufferpin", 1);
 
+SKIP:
+{
+	skip "those tests require working raw_connect()"
+		unless $node_standby->raw_connect_works();
+
+	## RECOVERY CONFLICT 1 bis: Buffer pin conflict with conflicting query in ClientWrite
+	$sect = "buffer pin conflict (ClientWrite)";
+	$expected_conflicts++;
+
+	setup_bufferpin_conflict();
+
+	# The simplest way to get the user to use for the startup packet
+	# is to grab it from an existing psql session
+	my $user = $psql_standby->query_safe("SELECT current_user");
+
+	# Start the conflicting session
+	my $sock = $node_standby->raw_connect();
+	my $pgproto = PostgreSQL::Test::PgProto->new($sock);
+	my %parameters = ( user => $user, database => $test_db, application_name => $ENV{PGAPPNAME} );
+
+	$pgproto->send_startup_message(\%parameters);
+	# Read until Ready For Query message
+	$pgproto->read_until_message('Z');
+	my $pid = $pgproto->read_session_pid();
+
+	# We want the session to pin table1's block while staying in a ClientWrite
+	# state. To achieve that, we ask the server enough rows to saturate the
+	# buffer with the client not read those results.
+	$pgproto->send_simple_query(qq[
+			BEGIN;
+			DECLARE $cursor1 CURSOR FOR SELECT b FROM $table1;
+			FETCH FORWARD FROM $cursor1;
+			SELECT generate_series(1, 100000);
+		]);
+
+	# Check that our session is in ClientWrite
+	wait_for_wait_event($pid, 'ClientWrite');
+
+	# VACUUM FREEZE on the primary
+	$node_primary->safe_psql($test_db, qq[VACUUM FREEZE $table1;]);
+
+	check_conflict_log("User was holding shared buffer pin for too long");
+
+	# The conflicting session should be terminated, consume everything until the socket is closed.
+	$pgproto->wait_until_closed();
+
+	check_conflict_stat("bufferpin", 2);
+
+	## RECOVERY CONFLICT 1 ter: Buffer pin conflict with conflicting query reports
+	## an error and saturate socket buffer
+	$sect = "buffer pin conflict (Error report)";
+	$expected_conflicts++;
+
+	setup_bufferpin_conflict();
+
+	# Start the conflicting session
+	$sock = $node_standby->raw_connect();
+	$pgproto = PostgreSQL::Test::PgProto->new($sock);
+
+	$pgproto->send_startup_message(\%parameters);
+	# Read until Ready For Query message
+	$pgproto->read_until_message('Z');
+	$pid = $pgproto->read_session_pid();
+
+	# We want the session to pin table1's block while staying in a ClientWrite
+	# state and reporting an error to the client.
+	$pgproto->send_simple_query(qq[
+			BEGIN;
+			DECLARE $cursor1 CURSOR FOR SELECT b FROM $table1;
+			FETCH FORWARD FROM $cursor1;
+			DO \$\$BEGIN RAISE 'endless scream: %', repeat('a', 2000000); END;\$\$;
+		]);
+
+	# Check that our session is in ClientWrite
+	wait_for_wait_event($pid, 'ClientWrite');
+
+	# VACUUM FREEZE on the primary
+	$node_primary->safe_psql($test_db, qq[VACUUM FREEZE $table1;]);
+
+	check_conflict_log("User was holding shared buffer pin for too long");
+
+	# The conflicting session should be terminated, consume everything until the socket is closed.
+	$pgproto->wait_until_closed();
+
+	check_conflict_stat("bufferpin", 3);
+}
 
 ## RECOVERY CONFLICT 2: Snapshot conflict
 $sect = "snapshot conflict";
@@ -153,7 +226,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 check_conflict_log(
 	"User query might have needed to see row versions that must be removed");
 $psql_standby->reconnect_and_clear();
-check_conflict_stat("snapshot");
+check_conflict_stat("snapshot", 1);
 
 
 ## RECOVERY CONFLICT 3: Lock conflict
@@ -176,7 +249,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 
 check_conflict_log("User was holding a relation lock for too long");
 $psql_standby->reconnect_and_clear();
-check_conflict_stat("lock");
+check_conflict_stat("lock", 1);
 
 
 ## RECOVERY CONFLICT 4: Tablespace conflict
@@ -206,7 +279,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 check_conflict_log(
 	"User was or might have been using tablespace that must be dropped");
 $psql_standby->reconnect_and_clear();
-check_conflict_stat("tablespace");
+check_conflict_stat("tablespace", 1);
 
 
 ## RECOVERY CONFLICT 5: Deadlock
@@ -271,7 +344,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 
 check_conflict_log("User transaction caused buffer deadlock with recovery.");
 $psql_standby->reconnect_and_clear();
-check_conflict_stat("deadlock");
+check_conflict_stat("deadlock", 1);
 
 # clean up for next tests
 $node_primary->safe_psql($test_db, qq[ROLLBACK PREPARED 'lock';]);
@@ -310,6 +383,22 @@ $node_primary->stop();
 
 done_testing();
 
+sub setup_bufferpin_conflict
+{
+	# Aborted INSERT on primary that will be cleaned up by vacuum. Has to be old
+	# enough so that there's not a snapshot conflict before the buffer pin
+	# conflict.
+	$node_primary->safe_psql($test_db,
+		qq[BEGIN;
+			INSERT INTO $table1 VALUES (1,0);
+			ROLLBACK;
+			-- ensure flush, rollback doesn't do so
+			BEGIN; LOCK $table1; COMMIT;]
+	);
+
+	$node_primary->wait_for_replay_catchup($node_standby);
+}
+
 sub check_conflict_log
 {
 	my $message = shift;
@@ -318,16 +407,45 @@ sub check_conflict_log
 	$log_location = $node_standby->wait_for_log(qr/$message/, $log_location);
 
 	cmp_ok($log_location, '>', $old_log_location,
-		"$sect: logfile contains terminated connection due to recovery conflict"
+		"$sect: logfile contains '$message'"
 	);
 }
 
 sub check_conflict_stat
 {
 	my $conflict_type = shift;
+	my $expected_count = shift;
 	my $count = $node_standby->safe_psql($test_db,
 		qq[SELECT confl_$conflict_type FROM pg_stat_database_conflicts WHERE datname='$test_db';]
 	);
 
-	is($count, 1, "$sect: stats show conflict on standby");
+	is($count, $expected_count, "$sect: stats show $count conflicts on standby (expected $expected_count)");
+}
+
+sub wait_for_wait_event
+{
+	my $pid = shift;
+	my $expected_wait_event = shift;
+
+	my $max_attempts = 10 * $PostgreSQL::Test::Utils::timeout_default;
+	my $attempts = 0;
+	my $wait_event = "";
+
+	while ($attempts < $max_attempts)
+	{
+		$wait_event = $node_standby->safe_psql($test_db,
+			qq[SELECT wait_event FROM pg_stat_activity WHERE pid=$pid;]
+		);
+
+		if ($wait_event eq $expected_wait_event) {
+			last;
+		}
+
+		# Wait 0.1 second before retrying.
+		usleep(100_000);
+
+		$attempts++;
+	}
+
+	is($wait_event, $expected_wait_event, "$sect: session with pid $pid has wait_event $wait_event");
 }
-- 
2.39.5 (Apple Git-154)

