From 0640e2e4a9ffb286e713189e37b104f442047a3b Mon Sep 17 00:00:00 2001
From: ed <eduard.stepanov@tantorlabs.ru>
Date: Tue, 10 Mar 2026 23:41:58 +0300
Subject: [PATCH] Throttle read stream look-ahead against local buffer pin
 limit

---
 src/backend/storage/aio/read_stream.c | 37 +++++++++++-
 src/test/regress/expected/temp.out    | 87 +++++++++++++++++++++++++++
 src/test/regress/sql/temp.sql         | 79 ++++++++++++++++++++++++
 3 files changed, 201 insertions(+), 2 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 031fde9f4c..0ac575e1a6 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -428,6 +428,8 @@ read_stream_start_pending_read(ReadStream *stream)
 static void
 read_stream_look_ahead(ReadStream *stream)
 {
+	int			buffer_limit;
+
 	/*
 	 * Allow amortizing the cost of submitting IO over multiple IOs. This
 	 * requires that we don't do any operations that could lead to a deadlock
@@ -437,8 +439,24 @@ read_stream_look_ahead(ReadStream *stream)
 	if (stream->batch_mode)
 		pgaio_enter_batchmode();
 
+	/*
+	 * Compute how many more buffers this backend is allowed to pin.
+	 * This is checked before each StartReadBuffers() call and also used
+	 * to throttle the outer loop so we don't queue more pending blocks
+	 * than we'll be able to pin.
+	 */
+	if (stream->temporary)
+		buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
+	else
+		buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
+	/* Always allow at least 1 if we hold no pins yet, for progress */
+	if (buffer_limit == 0 && stream->pinned_buffers == 0)
+		buffer_limit = 1;
+
+
 	while (stream->ios_in_progress < stream->max_ios &&
-		   stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
+		   stream->pinned_buffers + stream->pending_read_nblocks < stream->distance &&
+		   stream->pinned_buffers + stream->pending_read_nblocks < buffer_limit)
 	{
 		BlockNumber blocknum;
 		int16		buffer_index;
@@ -447,6 +465,13 @@ read_stream_look_ahead(ReadStream *stream)
 		if (stream->pending_read_nblocks == stream->io_combine_limit)
 		{
 			read_stream_start_pending_read(stream);
+			/* Re-check limit after pinning */
+			if (stream->temporary)
+				buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
+			else
+				buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
+			if (buffer_limit == 0 && stream->pinned_buffers == 0)
+				buffer_limit = 1;
 			continue;
 		}
 
@@ -488,6 +513,13 @@ read_stream_look_ahead(ReadStream *stream)
 					pgaio_exit_batchmode();
 				return;
 			}
+			/* Re-check limit after each pin */
+			if (stream->temporary)
+				buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
+			else
+				buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
+			if (buffer_limit == 0 && stream->pinned_buffers == 0)
+				buffer_limit = 1;
 		}
 
 		/* This is the start of a new pending read. */
@@ -509,7 +541,8 @@ read_stream_look_ahead(ReadStream *stream)
 		(stream->pending_read_nblocks == stream->io_combine_limit ||
 		 (stream->pending_read_nblocks >= stream->distance &&
 		  stream->pinned_buffers == 0) ||
-		 stream->distance == 0) &&
+		 stream->distance == 0 ||
+		 stream->pinned_buffers + stream->pending_read_nblocks >= buffer_limit) &&
 		stream->ios_in_progress < stream->max_ios)
 		read_stream_start_pending_read(stream);
 
diff --git a/src/test/regress/expected/temp.out b/src/test/regress/expected/temp.out
index 370361543b..be7f426fbe 100644
--- a/src/test/regress/expected/temp.out
+++ b/src/test/regress/expected/temp.out
@@ -566,3 +566,90 @@ SELECT count(*), max(a) max_a, min(a) min_a, max(cnt) max_cnt FROM test_temp;
 
 -- cleanup
 DROP FUNCTION test_temp_pin(int, int);
+-- Tests to verify the correct recovery from exhausting buffer pins
+\c
+SET temp_buffers = 100;
+CREATE TEMP TABLE tt1(val text);
+INSERT INTO tt1 SELECT repeat('x', 2000) FROM generate_series(1, 600);
+CREATE INDEX ON tt1(val);
+CREATE INDEX ON tt1((length(val)));
+CREATE TEMP TABLE tt2(val text);
+INSERT INTO tt2 SELECT val FROM tt1;
+SELECT count(*) FROM tt2;
+ count 
+-------
+   600
+(1 row)
+
+SELECT count(*) FROM tt2 WHERE val = repeat('x', 2000);
+ count 
+-------
+   600
+(1 row)
+
+DROP TABLE tt2;
+DROP TABLE tt1;
+-- Exhaust local buffers via many correlated subplans (row IN with correlated condition)
+-- verifies no "no empty local buffer available" error during heavy subquery execution
+\c
+SET effective_io_concurrency = 128;
+SET enable_mergejoin = off;
+CREATE TEMP TABLE tt_big(
+    id serial, f01 bytea, f02 bytea, f03 bytea,
+    f04 bytea, f05 boolean, val text
+);
+INSERT INTO tt_big
+SELECT g,
+    decode(md5(g::text), 'hex'),
+    decode(md5((g+1)::text), 'hex'),
+    decode(md5((g+2)::text), 'hex'),
+    decode(md5((g+3)::text), 'hex'),
+    (g % 2 = 0),
+    repeat('x', 500)
+FROM generate_series(1, 100000) g;
+CREATE TEMP TABLE tt_small(
+    s01 bytea, s02 bytea, s03 bytea, s04 boolean
+);
+INSERT INTO tt_small
+SELECT
+    decode(md5(g::text), 'hex'),
+    decode(md5((g+1)::text), 'hex'),
+    decode(md5((g+2)::text), 'hex'),
+    (g % 3 = 0)
+FROM generate_series(1, 500) g;
+CREATE TEMP TABLE tt_out(id int, f01 bytea, val numeric);
+-- Force correlated subplans (not semi-joins) by using row-value IN
+-- with correlated conditions, mimicking 1C query pattern
+INSERT INTO tt_out
+SELECT DISTINCT t1.id, t1.f01,
+    CAST(CASE
+        WHEN (t1.f02, t1.f03) IN
+            (SELECT t2.s02, t2.s03 FROM tt_small t2
+             WHERE t1.f01 = t2.s01 AND t2.s04 = true)
+        THEN 0
+        WHEN (t1.f03, t1.f04) IN
+            (SELECT t3.s02, t3.s03 FROM tt_small t3
+             WHERE t1.f01 = t3.s01 AND t3.s04 = false)
+        THEN 1
+        ELSE 2
+    END AS numeric)
+FROM tt_big t1
+LEFT JOIN tt_small s ON t1.f01 = s.s01
+WHERE CASE
+    WHEN (t1.f02, t1.f03) IN
+        (SELECT t4.s02, t4.s03 FROM tt_small t4
+         WHERE t1.f01 = t4.s01 AND t4.s04 = true)
+    THEN 0
+    WHEN (t1.f03, t1.f04) IN
+        (SELECT t5.s02, t5.s03 FROM tt_small t5
+         WHERE t1.f01 = t5.s01 AND t5.s04 = false)
+    THEN 1
+    ELSE 2
+END <> 0;
+SELECT count(*) FROM tt_out;
+ count 
+-------
+ 99834
+(1 row)
+
+DROP TABLE tt_out;
diff --git a/src/test/regress/sql/temp.sql b/src/test/regress/sql/temp.sql
index d50472ddce..e3063bab9a 100644
--- a/src/test/regress/sql/temp.sql
+++ b/src/test/regress/sql/temp.sql
@@ -418,3 +418,82 @@ SELECT count(*), max(a) max_a, min(a) min_a, max(cnt) max_cnt FROM test_temp;
 
 -- cleanup
 DROP FUNCTION test_temp_pin(int, int);
+
+-- Tests to verify the correct recovery from exhausting buffer pins
+\c
+SET temp_buffers = 100;
+CREATE TEMP TABLE tt1(val text);
+INSERT INTO tt1 SELECT repeat('x', 2000) FROM generate_series(1, 600);
+CREATE INDEX ON tt1(val);
+CREATE INDEX ON tt1((length(val)));
+CREATE TEMP TABLE tt2(val text);
+INSERT INTO tt2 SELECT val FROM tt1;
+SELECT count(*) FROM tt2;
+SELECT count(*) FROM tt2 WHERE val = repeat('x', 2000);
+DROP TABLE tt2;
+DROP TABLE tt1;
+
+-- Exhaust local buffers via many correlated subplans (row IN with correlated condition)
+-- verifies no "no empty local buffer available" error during heavy subquery execution
+\c
+SET effective_io_concurrency = 128;
+SET enable_mergejoin = off;
+
+CREATE TEMP TABLE tt_big(
+    id serial, f01 bytea, f02 bytea, f03 bytea,
+    f04 bytea, f05 boolean, val text
+);
+INSERT INTO tt_big
+SELECT g,
+    decode(md5(g::text), 'hex'),
+    decode(md5((g+1)::text), 'hex'),
+    decode(md5((g+2)::text), 'hex'),
+    decode(md5((g+3)::text), 'hex'),
+    (g % 2 = 0),
+    repeat('x', 500)
+FROM generate_series(1, 100000) g;
+
+CREATE TEMP TABLE tt_small(
+    s01 bytea, s02 bytea, s03 bytea, s04 boolean
+);
+INSERT INTO tt_small
+SELECT
+    decode(md5(g::text), 'hex'),
+    decode(md5((g+1)::text), 'hex'),
+    decode(md5((g+2)::text), 'hex'),
+    (g % 3 = 0)
+FROM generate_series(1, 500) g;
+
+CREATE TEMP TABLE tt_out(id int, f01 bytea, val numeric);
+
+-- Force correlated subplans (not semi-joins) by using row-value IN
+-- with correlated conditions, mimicking 1C query pattern
+INSERT INTO tt_out
+SELECT DISTINCT t1.id, t1.f01,
+    CAST(CASE
+        WHEN (t1.f02, t1.f03) IN
+            (SELECT t2.s02, t2.s03 FROM tt_small t2
+             WHERE t1.f01 = t2.s01 AND t2.s04 = true)
+        THEN 0
+        WHEN (t1.f03, t1.f04) IN
+            (SELECT t3.s02, t3.s03 FROM tt_small t3
+             WHERE t1.f01 = t3.s01 AND t3.s04 = false)
+        THEN 1
+        ELSE 2
+    END AS numeric)
+FROM tt_big t1
+LEFT JOIN tt_small s ON t1.f01 = s.s01
+WHERE CASE
+    WHEN (t1.f02, t1.f03) IN
+        (SELECT t4.s02, t4.s03 FROM tt_small t4
+         WHERE t1.f01 = t4.s01 AND t4.s04 = true)
+    THEN 0
+    WHEN (t1.f03, t1.f04) IN
+        (SELECT t5.s02, t5.s03 FROM tt_small t5
+         WHERE t1.f01 = t5.s01 AND t5.s04 = false)
+    THEN 1
+    ELSE 2
+END <> 0;
+
+SELECT count(*) FROM tt_out;
+DROP TABLE tt_out;
-- 
2.43.0

