From 8d308dc76f160b5c64063dabfd27f61b29dd88c1 Mon Sep 17 00:00:00 2001
From: Anthonin Bonnefoy <anthonin.bonnefoy@datadoghq.com>
Date: Mon, 13 Jan 2025 09:41:43 +0100
Subject: Accept recovery conflict interrupt on blocked writing

Previously, all interrupts except process dying were ignored while a
process was blocked writing to a socket. If the connection to the client
was broken (no clean FIN nor RST), a process sending results to the
client could be stuck for 924s until the TCP retransmission timeout is
reached. During this time, it was possible for the process to conflict
with recovery: For example, the process returning results can have a
conflicting buffer pin.

To avoid blocking recovery for an extended period of time, this patch
changes client write interrupts by handling recovery conflict interrupts
instead of ignoring them. Since the interrupt happens while we're likely
to have partially written results on the socket, there's no easy way to
keep protocol sync so the session needs to be terminated.
---
 src/backend/tcop/postgres.c                  |  41 +++-
 src/test/recovery/t/031_recovery_conflict.pl | 202 ++++++++++++++++---
 2 files changed, 216 insertions(+), 27 deletions(-)

diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 5655348a2e2..95bad579d62 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -134,6 +134,12 @@ static bool xact_started = false;
  */
 static bool DoingCommandRead = false;
 
+/*
+ * Flag to indicate that we are doing a blocking write as the socket doesn't have
+ * the necessary space
+ */
+static bool DoingBlockingWrite = false;
+
 /*
  * Flags to implement skip-till-Sync-after-error behavior for messages of
  * the extended query protocol.
@@ -184,6 +190,7 @@ static void drop_unnamed_stmt(void);
 static void log_disconnections(int code, Datum arg);
 static void enable_statement_timeout(void);
 static void disable_statement_timeout(void);
+static void ProcessRecoveryConflictInterrupts(void);
 
 
 /* ----------------------------------------------------------------
@@ -581,6 +588,27 @@ ProcessClientWriteInterrupt(bool blocked)
 		else
 			SetLatch(MyLatch);
 	}
+	else if (blocked && RecoveryConflictPending)
+	{
+		/*
+		 * We're conflicting with recovery while being blocked writing. This
+		 * can happen when the process is returning results and no ACK is
+		 * received (broken connection, client overloaded...), eventually
+		 * saturating the socket buffer while the process holds a page pin
+		 * that eventually conflict with recovery.
+		 */
+		if (InterruptHoldoffCount == 0 && CritSectionCount == 0)
+		{
+			if (whereToSendOutput == DestRemote)
+				whereToSendOutput = DestNone;
+
+			/*
+			 * Set blocking write flag to terminate the session as it is not
+			 * possible to keep protocol sync
+			 */
+			ProcessRecoveryConflictInterrupts();
+		}
+	}
 
 	errno = save_errno;
 }
@@ -3167,10 +3195,17 @@ ProcessRecoveryConflictInterrupt(ProcSignalReason reason)
 				 * If a recovery conflict happens while we are waiting for
 				 * input from the client, the client is presumably just
 				 * sitting idle in a transaction, preventing recovery from
-				 * making progress.  We'll drop through to the FATAL case
-				 * below to dislodge it, in that case.
+				 * making progress.
+				 *
+				 * Similarly, if we are in a blocking write, we could be
+				 * waiting for a TCP ACK from the client to make the socket
+				 * writable but the client may not be here anymore, leaving us
+				 * stuck in TCP retransmission.
+				 *
+				 * In those cases, we'll drop through to the FATAL case below
+				 * to dislodge it.
 				 */
-				if (!DoingCommandRead)
+				if (!DoingCommandRead && !DoingBlockingWrite)
 				{
 					/* Avoid losing sync in the FE/BE protocol. */
 					if (QueryCancelHoldoffCount != 0)
diff --git a/src/test/recovery/t/031_recovery_conflict.pl b/src/test/recovery/t/031_recovery_conflict.pl
index 028b0b5f0e1..55a28196b22 100644
--- a/src/test/recovery/t/031_recovery_conflict.pl
+++ b/src/test/recovery/t/031_recovery_conflict.pl
@@ -74,25 +74,10 @@ my $expected_conflicts = 0;
 
 ## RECOVERY CONFLICT 1: Buffer pin conflict
 my $sect = "buffer pin conflict";
-$expected_conflicts++;
-
-# Aborted INSERT on primary that will be cleaned up by vacuum. Has to be old
-# enough so that there's not a snapshot conflict before the buffer pin
-# conflict.
-
-$node_primary->safe_psql(
-	$test_db,
-	qq[
-	BEGIN;
-	INSERT INTO $table1 VALUES (1,0);
-	ROLLBACK;
-	-- ensure flush, rollback doesn't do so
-	BEGIN; LOCK $table1; COMMIT;
-	]);
-
-$node_primary->wait_for_replay_catchup($node_standby);
-
 my $cursor1 = "test_recovery_conflict_cursor";
+$expected_conflicts++;
+
+setup_bufferpin_conflict();
 
 # DECLARE and use a cursor on standby, causing buffer with the only block of
 # the relation to be pinned on the standby
@@ -120,8 +105,48 @@ $node_primary->wait_for_replay_catchup($node_standby);
 
 check_conflict_log("User was holding shared buffer pin for too long");
 $psql_standby->reconnect_and_clear();
-check_conflict_stat("bufferpin");
+check_conflict_stat("bufferpin", 1);
 
+SKIP:
+{
+	skip "this test requires working raw_connect()"
+		unless $node_standby->raw_connect_works();
+
+	## RECOVERY CONFLICT 1 bis: Buffer pin conflict with query in ClientWrite
+	$sect = "buffer pin conflict (ClientWrite)";
+	$expected_conflicts++;
+
+	setup_bufferpin_conflict();
+
+	# Start the conflicting session
+	my $sock = $node_standby->raw_connect();
+	send_startup_packet($sock, $ENV{"USER"}, $test_db, "raw_tcp");
+	# Read until Ready For Query message
+	read_message_payload($sock, 'Z');
+	my $pid = read_session_pid($sock);
+
+	# We want the session to pin table1's block while staying in a ClientWrite
+	# state. To achieve that, we ask the server enough rows to saturate the
+	# buffer with the client not acknowledging those results.
+	send_query($sock, qq[
+			BEGIN;
+			DECLARE $cursor1 CURSOR FOR SELECT b FROM $table1;
+			FETCH FORWARD FROM $cursor1;
+			SELECT generate_series(1, 100000);
+		]);
+
+	# Check that our session is in ClientWrite
+	check_session_wait_event($pid, 'ClientWrite');
+
+	# VACUUM FREEZE on the primary
+	$node_primary->safe_psql($test_db, qq[VACUUM FREEZE $table1;]);
+
+	check_conflict_log("User was holding shared buffer pin for too long");
+	check_conflict_stat("bufferpin", 2);
+
+	# The conflicting session was triggered, consume everything until the socket is closed.
+	consume_until_closed($sock);
+}
 
 ## RECOVERY CONFLICT 2: Snapshot conflict
 $sect = "snapshot conflict";
@@ -153,7 +178,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 check_conflict_log(
 	"User query might have needed to see row versions that must be removed");
 $psql_standby->reconnect_and_clear();
-check_conflict_stat("snapshot");
+check_conflict_stat("snapshot", 1);
 
 
 ## RECOVERY CONFLICT 3: Lock conflict
@@ -176,7 +201,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 
 check_conflict_log("User was holding a relation lock for too long");
 $psql_standby->reconnect_and_clear();
-check_conflict_stat("lock");
+check_conflict_stat("lock", 1);
 
 
 ## RECOVERY CONFLICT 4: Tablespace conflict
@@ -206,7 +231,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 check_conflict_log(
 	"User was or might have been using tablespace that must be dropped");
 $psql_standby->reconnect_and_clear();
-check_conflict_stat("tablespace");
+check_conflict_stat("tablespace", 1);
 
 
 ## RECOVERY CONFLICT 5: Deadlock
@@ -271,7 +296,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 
 check_conflict_log("User transaction caused buffer deadlock with recovery.");
 $psql_standby->reconnect_and_clear();
-check_conflict_stat("deadlock");
+check_conflict_stat("deadlock", 1);
 
 # clean up for next tests
 $node_primary->safe_psql($test_db, qq[ROLLBACK PREPARED 'lock';]);
@@ -310,6 +335,22 @@ $node_primary->stop();
 
 done_testing();
 
+sub setup_bufferpin_conflict
+{
+	# Aborted INSERT on primary that will be cleaned up by vacuum. Has to be old
+	# enough so that there's not a snapshot conflict before the buffer pin
+	# conflict.
+	$node_primary->safe_psql($test_db,
+		qq[BEGIN;
+			INSERT INTO $table1 VALUES (1,0);
+			ROLLBACK;
+			-- ensure flush, rollback doesn't do so
+			BEGIN; LOCK $table1; COMMIT;]
+	);
+
+	$node_primary->wait_for_replay_catchup($node_standby);
+}
+
 sub check_conflict_log
 {
 	my $message = shift;
@@ -325,9 +366,122 @@ sub check_conflict_log
 sub check_conflict_stat
 {
 	my $conflict_type = shift;
+	my $expected_count = shift;
 	my $count = $node_standby->safe_psql($test_db,
 		qq[SELECT confl_$conflict_type FROM pg_stat_database_conflicts WHERE datname='$test_db';]
 	);
 
-	is($count, 1, "$sect: stats show conflict on standby");
+	is($count, $expected_count, "$sect: stats show $count conflicts on standby (expected $expected_count)");
+}
+
+sub check_session_wait_event
+{
+	my $pid = shift;
+	my $expected_wait_event = shift;
+	my $wait_event = $node_standby->safe_psql($test_db,
+		qq[SELECT wait_event FROM pg_stat_activity WHERE pid=$pid;]
+	);
+
+	is($wait_event, $expected_wait_event, "$sect: session with pid $pid has $wait_event wait_event");
+}
+
+sub send_startup_packet
+{
+	my ($sock, $user, $db, $app_name) = @_;
+	my %attributes = ( user => $user,
+		database => $db,
+		application_name => $app_name);
+	# Startup packet contains:
+	# Packet length: 4 bytes
+	# Major proto version: 2 bytes
+	# Minor proto version: 2 bytes
+	# Multiple Parameters:
+	#     Parameter Name (null terminated string)
+	#     Parameter Value (null terminated string)
+	# Ending null character
+	my $pack_template = "Nnn(Z*Z*)" . keys(%attributes) . 'x';
+	# Packet length, proto and final null character
+	my $total_length = 9;
+
+	for(keys %attributes){
+		my $key_length = length($_) + 1;
+		my $value_length = length($attributes{$_}) + 1;
+		$total_length += $key_length + $value_length;
+	}
+
+	my $startup_packet = pack($pack_template, $total_length, 3, 0, %attributes);
+	$sock->send($startup_packet);
+}
+
+sub send_query
+{
+	my ($sock, $query) = @_;
+
+	# Query message contains:
+	# Message type 'Q' (1 byte)
+	# Message length not including message type (4 bytes)
+	# Null terminated string
+	my $query_packet = pack("CNZ*", ord('Q'), length($query) + 5, $query);
+	note "Sending following query through raw_tcp: $query";
+	$sock->send($query_packet);
+}
+
+sub read_session_pid
+{
+	my ($sock) = @_;
+
+	send_query($sock, "select pg_backend_pid()");
+	my $data_row = read_message_payload($sock, 'D');
+	# We should have only one field and one column with the
+	# pid representing the rest of the payload outside of the
+	# data row header
+	my ($field_count, $column_length, $pid) = unpack("nNA*", $data_row);
+	note "raw_tcp session found pid: $pid";
+	# Consume until Ready for query is reached
+	read_message_payload($sock, 'Z');
+	return $pid;
+}
+
+sub consume_until_closed
+{
+	my ($sock) = @_;
+	my $received = "";
+
+	while (1) {
+		$sock->recv($received, 64*1024);
+		if ($received eq "") {
+			# Closed socket was detected
+			return;
+		}
+	}
+}
+
+sub read_message_payload
+{
+	my ($sock, $message_type) = @_;
+
+	note "Reading until message of type $message_type is found";
+	while (1)
+	{
+		my $header = "";
+		$sock->recv($header, 5);
+		my ($type, $length) = unpack("CN", $header);
+		my $type_char = chr($type);
+		note "Reading message of type $type_char and length $length";
+
+		# Need to remove message's length from the payload's length
+		$length -= 4;
+		if ($length < 0) {
+			diag("read_message_payload Unexpected payload length $length");
+			return;
+		}
+		my $payload = "";
+		$sock->recv($payload, $length);
+
+		if ($type_char eq $message_type) {
+			# We've found the desired message type
+			note "Found expected message type";
+			return $payload;
+		}
+	}
 }
-- 
2.39.5 (Apple Git-154)

