Hi,

On Mon, 29 Apr 2024 at 18:41, Nazir Bilal Yavuz <byavu...@gmail.com> wrote:
>
> Hi,
>
> On Mon, 8 Apr 2024 at 04:21, Thomas Munro <thomas.mu...@gmail.com> wrote:
> >
> > Pushed.  Thanks Bilal and reviewers!
>
> I wanted to discuss what will happen to this patch now that
> 27bc1772fc8 is reverted. I am continuing this thread but I can create
> another thread if you prefer so.

041b96802ef is discussed in the 'Table AM Interface Enhancements'
thread [1]. The main problem raised about this commit is that the
read stream API is not pushed down into the heap-specific code and,
because of that, the other AM implementations are forced to use read
streams. Pushing the read stream API down into the heap-specific code
pretty much requires passing BufferAccessStrategy and
BlockSamplerData to initscan().

I am sharing an alternative version of this patch. The first patch
just reverts 041b96802ef and the second patch is the alternative
version.

In this alternative version, the read stream API is not pushed down
into the heap-specific code, but its use is controlled by the
heap-specific code. The SO_USE_READ_STREAMS_IN_ANALYZE flag is
introduced and set in the heap-specific code if the scan type is
'ANALYZE'. This flag decides whether ANALYZE uses the streaming API.
If the flag is set, the table uses the heap AM and the read stream
API is used. If it is not set, the table uses some other AM and the
code falls back to the version before read streams.

Pros of the alternative version:

* The existing AM implementations other than heap AM can continue to
use their AMs without any change.
* AM implementations other than heap do not need to use read streams.
* Upstream code uses the read stream API and benefits from that.

Cons of the alternative version:

* Six if statements are added to the acquire_sample_rows() function,
three of them inside the while loop.
* Because of these changes, the code looks messy.

Any kind of feedback would be appreciated.

[1] https://www.postgresql.org/message-id/flat/CAPpHfdurb9ycV8udYqM%3Do0sPS66PJ4RCBM1g-bBpvzUfogY0EA%40mail.gmail.com

-- 
Regards,
Nazir Bilal Yavuz
Microsoft
From 323f28ff979cde8e4dbde8b4654bded74abf1fbc Mon Sep 17 00:00:00 2001
From: Nazir Bilal Yavuz <byavu...@gmail.com>
Date: Wed, 15 May 2024 00:03:56 +0300
Subject: [PATCH v13 1/2] Revert "Use streaming I/O in ANALYZE."

This commit reverts 041b96802ef.

041b96802ef revised the changes in 27bc1772fc8, but 27bc1772fc8 and
dd1f6b0c172 were reverted together in 6377e12a5a5. So, with this commit,
27bc1772fc8, 041b96802ef and dd1f6b0c172 are all reverted.

Discussion: https://postgr.es/m/flat/CAN55FZ0UhXqk9v3y-zW_fp4-WCp43V8y0A72xPmLkOM%2B6M%2BmJg%40mail.gmail.com
---
 src/include/access/tableam.h             | 26 +++----
 src/backend/access/heap/heapam_handler.c | 38 +++++-----
 src/backend/commands/analyze.c           | 96 ++++++++++++++++++------
 3 files changed, 98 insertions(+), 62 deletions(-)

diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 8e583b45cd5..e08b9627f30 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -21,7 +21,6 @@
 #include "access/sdir.h"
 #include "access/xact.h"
 #include "executor/tuptable.h"
-#include "storage/read_stream.h"
 #include "utils/rel.h"
 #include "utils/snapshot.h"
 
@@ -655,16 +654,6 @@ typedef struct TableAmRoutine
 									struct VacuumParams *params,
 									BufferAccessStrategy bstrategy);
 
-	/*
-	 * Prepare to analyze the next block in the read stream.  Returns false if
-	 * the stream is exhausted and true otherwise. The scan must have been
-	 * started with SO_TYPE_ANALYZE option.
-	 *
-	 * This routine holds a buffer pin and lock on the heap page.  They are
-	 * held until heapam_scan_analyze_next_tuple() returns false.  That is
-	 * until all the items of the heap page are analyzed.
-	 */
-
 	/*
 	 * Prepare to analyze block `blockno` of `scan`. The scan has been started
 	 * with table_beginscan_analyze().  See also
@@ -683,7 +672,8 @@ typedef struct TableAmRoutine
 	 * isn't one yet.
 	 */
 	bool		(*scan_analyze_next_block) (TableScanDesc scan,
-											ReadStream *stream);
+											BlockNumber blockno,
+											BufferAccessStrategy bstrategy);
 
 	/*
 	 * See table_scan_analyze_next_tuple().
@@ -1721,17 +1711,19 @@ table_relation_vacuum(Relation rel, struct VacuumParams *params,
 }
 
 /*
- * Prepare to analyze the next block in the read stream. The scan needs to
- * have been  started with table_beginscan_analyze().  Note that this routine
- * might acquire resources like locks that are held until
+ * Prepare to analyze block `blockno` of `scan`. The scan needs to have been
+ * started with table_beginscan_analyze().  Note that this routine might
+ * acquire resources like locks that are held until
  * table_scan_analyze_next_tuple() returns false.
  *
  * Returns false if block is unsuitable for sampling, true otherwise.
  */
 static inline bool
-table_scan_analyze_next_block(TableScanDesc scan, ReadStream *stream)
+table_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno,
+							  BufferAccessStrategy bstrategy)
 {
-	return scan->rs_rd->rd_tableam->scan_analyze_next_block(scan, stream);
+	return scan->rs_rd->rd_tableam->scan_analyze_next_block(scan, blockno,
+															bstrategy);
 }
 
 /*
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 6f8b1b79298..a9c8cd4306c 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -46,6 +46,12 @@
 #include "utils/builtins.h"
 #include "utils/rel.h"
 
+static TM_Result heapam_tuple_lock(Relation relation, ItemPointer tid,
+								   Snapshot snapshot, TupleTableSlot *slot,
+								   CommandId cid, LockTupleMode mode,
+								   LockWaitPolicy wait_policy, uint8 flags,
+								   TM_FailureData *tmfd);
+
 static void reform_and_rewrite_tuple(HeapTuple tuple,
 									 Relation OldHeap, Relation NewHeap,
 									 Datum *values, bool *isnull, RewriteState rwstate);
@@ -993,36 +999,28 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
 	pfree(isnull);
 }
 
-/*
- * Prepare to analyze the next block in the read stream.  Returns false if
- * the stream is exhausted and true otherwise. The scan must have been started
- * with SO_TYPE_ANALYZE option.
- *
- * This routine holds a buffer pin and lock on the heap page.  They are held
- * until heapam_scan_analyze_next_tuple() returns false.  That is until all the
- * items of the heap page are analyzed.
- */
 static bool
-heapam_scan_analyze_next_block(TableScanDesc scan, ReadStream *stream)
+heapam_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno,
+							   BufferAccessStrategy bstrategy)
 {
 	HeapScanDesc hscan = (HeapScanDesc) scan;
 
 	/*
 	 * We must maintain a pin on the target page's buffer to ensure that
 	 * concurrent activity - e.g. HOT pruning - doesn't delete tuples out from
-	 * under us.  It comes from the stream already pinned.   We also choose to
-	 * hold sharelock on the buffer throughout --- we could release and
-	 * re-acquire sharelock for each tuple, but since we aren't doing much
-	 * work per tuple, the extra lock traffic is probably better avoided.
+	 * under us.  Hence, pin the page until we are done looking at it.  We
+	 * also choose to hold sharelock on the buffer throughout --- we could
+	 * release and re-acquire sharelock for each tuple, but since we aren't
+	 * doing much work per tuple, the extra lock traffic is probably better
+	 * avoided.
 	 */
-	hscan->rs_cbuf = read_stream_next_buffer(stream, NULL);
-	if (!BufferIsValid(hscan->rs_cbuf))
-		return false;
-
+	hscan->rs_cblock = blockno;
+	hscan->rs_cindex = FirstOffsetNumber;
+	hscan->rs_cbuf = ReadBufferExtended(scan->rs_rd, MAIN_FORKNUM,
+										blockno, RBM_NORMAL, bstrategy);
 	LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_SHARE);
 
-	hscan->rs_cblock = BufferGetBlockNumber(hscan->rs_cbuf);
-	hscan->rs_cindex = FirstOffsetNumber;
+	/* in heap all blocks can contain tuples, so always return true */
 	return true;
 }
 
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index 7d2cd249972..8a82af4a4ca 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -1102,20 +1102,6 @@ examine_attribute(Relation onerel, int attnum, Node *index_expr)
 	return stats;
 }
 
-/*
- * Read stream callback returning the next BlockNumber as chosen by the
- * BlockSampling algorithm.
- */
-static BlockNumber
-block_sampling_read_stream_next(ReadStream *stream,
-								void *callback_private_data,
-								void *per_buffer_data)
-{
-	BlockSamplerData *bs = callback_private_data;
-
-	return BlockSampler_HasMore(bs) ? BlockSampler_Next(bs) : InvalidBlockNumber;
-}
-
 /*
  * acquire_sample_rows -- acquire a random sample of rows from the table
  *
@@ -1168,7 +1154,10 @@ acquire_sample_rows(Relation onerel, int elevel,
 	TableScanDesc scan;
 	BlockNumber nblocks;
 	BlockNumber blksdone = 0;
-	ReadStream *stream;
+#ifdef USE_PREFETCH
+	int			prefetch_maximum = 0;	/* blocks to prefetch if enabled */
+	BlockSamplerData prefetch_bs;
+#endif
 
 	Assert(targrows > 0);
 
@@ -1181,6 +1170,13 @@ acquire_sample_rows(Relation onerel, int elevel,
 	randseed = pg_prng_uint32(&pg_global_prng_state);
 	nblocks = BlockSampler_Init(&bs, totalblocks, targrows, randseed);
 
+#ifdef USE_PREFETCH
+	prefetch_maximum = get_tablespace_maintenance_io_concurrency(onerel->rd_rel->reltablespace);
+	/* Create another BlockSampler, using the same seed, for prefetching */
+	if (prefetch_maximum)
+		(void) BlockSampler_Init(&prefetch_bs, totalblocks, targrows, randseed);
+#endif
+
 	/* Report sampling block numbers */
 	pgstat_progress_update_param(PROGRESS_ANALYZE_BLOCKS_TOTAL,
 								 nblocks);
@@ -1191,19 +1187,71 @@ acquire_sample_rows(Relation onerel, int elevel,
 	scan = table_beginscan_analyze(onerel);
 	slot = table_slot_create(onerel, NULL);
 
-	stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE,
-										vac_strategy,
-										scan->rs_rd,
-										MAIN_FORKNUM,
-										block_sampling_read_stream_next,
-										&bs,
-										0);
+#ifdef USE_PREFETCH
+
+	/*
+	 * If we are doing prefetching, then go ahead and tell the kernel about
+	 * the first set of pages we are going to want.  This also moves our
+	 * iterator out ahead of the main one being used, where we will keep it so
+	 * that we're always pre-fetching out prefetch_maximum number of blocks
+	 * ahead.
+	 */
+	if (prefetch_maximum)
+	{
+		for (int i = 0; i < prefetch_maximum; i++)
+		{
+			BlockNumber prefetch_block;
+
+			if (!BlockSampler_HasMore(&prefetch_bs))
+				break;
+
+			prefetch_block = BlockSampler_Next(&prefetch_bs);
+			PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, prefetch_block);
+		}
+	}
+#endif
 
 	/* Outer loop over blocks to sample */
-	while (table_scan_analyze_next_block(scan, stream))
+	while (BlockSampler_HasMore(&bs))
 	{
+		bool		block_accepted;
+		BlockNumber targblock = BlockSampler_Next(&bs);
+#ifdef USE_PREFETCH
+		BlockNumber prefetch_targblock = InvalidBlockNumber;
+
+		/*
+		 * Make sure that every time the main BlockSampler is moved forward
+		 * that our prefetch BlockSampler also gets moved forward, so that we
+		 * always stay out ahead.
+		 */
+		if (prefetch_maximum && BlockSampler_HasMore(&prefetch_bs))
+			prefetch_targblock = BlockSampler_Next(&prefetch_bs);
+#endif
+
 		vacuum_delay_point();
 
+		block_accepted = table_scan_analyze_next_block(scan, targblock, vac_strategy);
+
+#ifdef USE_PREFETCH
+
+		/*
+		 * When pre-fetching, after we get a block, tell the kernel about the
+		 * next one we will want, if there's any left.
+		 *
+		 * We want to do this even if the table_scan_analyze_next_block() call
+		 * above decides against analyzing the block it picked.
+		 */
+		if (prefetch_maximum && prefetch_targblock != InvalidBlockNumber)
+			PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, prefetch_targblock);
+#endif
+
+		/*
+		 * Don't analyze if table_scan_analyze_next_block() indicated this
+		 * block is unsuitable for analyzing.
+		 */
+		if (!block_accepted)
+			continue;
+
 		while (table_scan_analyze_next_tuple(scan, OldestXmin, &liverows, &deadrows, slot))
 		{
 			/*
@@ -1253,8 +1301,6 @@ acquire_sample_rows(Relation onerel, int elevel,
 									 ++blksdone);
 	}
 
-	read_stream_end(stream);
-
 	ExecDropSingleTupleTableSlot(slot);
 	table_endscan(scan);
 
-- 
2.43.0

From 923f0bd32e2d30bc2a2eec4a5b0e308fc45dc5cf Mon Sep 17 00:00:00 2001
From: Nazir Bilal Yavuz <byavu...@gmail.com>
Date: Wed, 15 May 2024 16:25:32 +0300
Subject: [PATCH v13 2/2] Use streaming I/O in ANALYZE (alternative).

The ANALYZE command prefetches and reads sample blocks chosen by a
BlockSampler algorithm. Instead of calling [Prefetch|Read]Buffer() for
each block, ANALYZE now uses the streaming API introduced in b5a9b18cd0.

SO_USE_READ_STREAMS_IN_ANALYZE is introduced and is used to decide
whether ANALYZE uses the streaming API. This flag is set in the heap AM
code. Other AMs do not set this flag. If this flag is not set, the
ANALYZE code falls back to the version before the read stream API.

Author: Nazir Bilal Yavuz <byavu...@gmail.com>
Author: Melanie Plageman <melanieplage...@gmail.com>
Reviewed-by: Andres Freund <and...@anarazel.de>
Reviewed-by: Jakub Wartak <jakub.war...@enterprisedb.com>
Reviewed-by: Heikki Linnakangas <hlinn...@iki.fi>
Reviewed-by: Thomas Munro <thomas.mu...@gmail.com>
Discussion: https://postgr.es/m/flat/CAN55FZ0UhXqk9v3y-zW_fp4-WCp43V8y0A72xPmLkOM%2B6M%2BmJg%40mail.gmail.com
---
 src/include/access/tableam.h             |   3 +
 src/backend/access/heap/heapam.c         |   8 ++
 src/backend/access/heap/heapam_handler.c |  21 ++--
 src/backend/commands/analyze.c           | 131 ++++++++++++++++-------
 4 files changed, 114 insertions(+), 49 deletions(-)

diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index e08b9627f30..e30ab077ade 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -69,6 +69,9 @@ typedef enum ScanOptions
 	 * needed. If table data may be needed, set SO_NEED_TUPLES.
 	 */
 	SO_NEED_TUPLES = 1 << 10,
+
+	/* use read streams in ANALYZE */
+	SO_USE_READ_STREAMS_IN_ANALYZE = 1 << 11,
 }			ScanOptions;
 
 /*
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 4be0dee4de0..41a5b19d0a2 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1143,6 +1143,14 @@ heap_beginscan(Relation relation, Snapshot snapshot,
 														  0);
 	}
 
+	/*
+	 * Set the SO_USE_READ_STREAMS_IN_ANALYZE flag here; acquire_sample_rows()
+	 * uses it to decide whether to use read streams.
+	 */
+	if (scan->rs_base.rs_flags & SO_TYPE_ANALYZE)
+	{
+		scan->rs_base.rs_flags |= SO_USE_READ_STREAMS_IN_ANALYZE;
+	}
 
 	return (TableScanDesc) scan;
 }
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index a9c8cd4306c..6200a9d9068 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -42,6 +42,7 @@
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
 #include "storage/procarray.h"
+#include "storage/read_stream.h"
 #include "storage/smgr.h"
 #include "utils/builtins.h"
 #include "utils/rel.h"
@@ -1008,19 +1009,19 @@ heapam_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno,
 	/*
 	 * We must maintain a pin on the target page's buffer to ensure that
 	 * concurrent activity - e.g. HOT pruning - doesn't delete tuples out from
-	 * under us.  Hence, pin the page until we are done looking at it.  We
-	 * also choose to hold sharelock on the buffer throughout --- we could
-	 * release and re-acquire sharelock for each tuple, but since we aren't
-	 * doing much work per tuple, the extra lock traffic is probably better
-	 * avoided.
+	 * under us.  It comes from the stream already pinned.   We also choose to
+	 * hold sharelock on the buffer throughout --- we could release and
+	 * re-acquire sharelock for each tuple, but since we aren't doing much
+	 * work per tuple, the extra lock traffic is probably better avoided.
 	 */
-	hscan->rs_cblock = blockno;
-	hscan->rs_cindex = FirstOffsetNumber;
-	hscan->rs_cbuf = ReadBufferExtended(scan->rs_rd, MAIN_FORKNUM,
-										blockno, RBM_NORMAL, bstrategy);
+	hscan->rs_cbuf = read_stream_next_buffer(hscan->rs_read_stream, NULL);
+	if (!BufferIsValid(hscan->rs_cbuf))
+		return false;
+
 	LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_SHARE);
 
-	/* in heap all blocks can contain tuples, so always return true */
+	hscan->rs_cblock = BufferGetBlockNumber(hscan->rs_cbuf);
+	hscan->rs_cindex = FirstOffsetNumber;
 	return true;
 }
 
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index 8a82af4a4ca..d858cc8160a 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -18,6 +18,7 @@
 
 #include "access/detoast.h"
 #include "access/genam.h"
+#include "access/heapam.h"
 #include "access/multixact.h"
 #include "access/relation.h"
 #include "access/table.h"
@@ -1102,6 +1103,20 @@ examine_attribute(Relation onerel, int attnum, Node *index_expr)
 	return stats;
 }
 
+/*
+ * Read stream callback returning the next BlockNumber as chosen by the
+ * BlockSampling algorithm.
+ */
+static BlockNumber
+block_sampling_read_stream_next(ReadStream *stream,
+								void *callback_private_data,
+								void *per_buffer_data)
+{
+	BlockSamplerData *bs = callback_private_data;
+
+	return BlockSampler_HasMore(bs) ? BlockSampler_Next(bs) : InvalidBlockNumber;
+}
+
 /*
  * acquire_sample_rows -- acquire a random sample of rows from the table
  *
@@ -1146,6 +1161,7 @@ acquire_sample_rows(Relation onerel, int elevel,
 	double		deadrows = 0;	/* # dead rows seen */
 	double		rowstoskip = -1;	/* -1 means not set yet */
 	uint32		randseed;		/* Seed for block sampler(s) */
+	bool		use_read_streams = false;
 	BlockNumber totalblocks;
 	TransactionId OldestXmin;
 	BlockSamplerData bs;
@@ -1170,13 +1186,6 @@ acquire_sample_rows(Relation onerel, int elevel,
 	randseed = pg_prng_uint32(&pg_global_prng_state);
 	nblocks = BlockSampler_Init(&bs, totalblocks, targrows, randseed);
 
-#ifdef USE_PREFETCH
-	prefetch_maximum = get_tablespace_maintenance_io_concurrency(onerel->rd_rel->reltablespace);
-	/* Create another BlockSampler, using the same seed, for prefetching */
-	if (prefetch_maximum)
-		(void) BlockSampler_Init(&prefetch_bs, totalblocks, targrows, randseed);
-#endif
-
 	/* Report sampling block numbers */
 	pgstat_progress_update_param(PROGRESS_ANALYZE_BLOCKS_TOTAL,
 								 nblocks);
@@ -1187,62 +1196,105 @@ acquire_sample_rows(Relation onerel, int elevel,
 	scan = table_beginscan_analyze(onerel);
 	slot = table_slot_create(onerel, NULL);
 
-#ifdef USE_PREFETCH
+	/*
+	 * The heap AM sets SO_USE_READ_STREAMS_IN_ANALYZE in heap_beginscan(), so
+	 * if the flag is set this is a heap scan and it is safe to use read
+	 * streams.
+	 */
+	use_read_streams = scan->rs_flags & SO_USE_READ_STREAMS_IN_ANALYZE;
+
+	if (use_read_streams)
+	{
+		((HeapScanDesc) scan)->rs_read_stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE,
+																		   vac_strategy,
+																		   scan->rs_rd,
+																		   MAIN_FORKNUM,
+																		   block_sampling_read_stream_next,
+																		   &bs,
+																		   0);
+	}
 
 	/*
-	 * If we are doing prefetching, then go ahead and tell the kernel about
-	 * the first set of pages we are going to want.  This also moves our
-	 * iterator out ahead of the main one being used, where we will keep it so
-	 * that we're always pre-fetching out prefetch_maximum number of blocks
-	 * ahead.
+	 * Read streams have their own prefetch mechanism, so do not prefetch when
+	 * the read streams are used. This applies for all of the prefetch code in
+	 * this function.
 	 */
-	if (prefetch_maximum)
+#ifdef USE_PREFETCH
+	if (!use_read_streams)
 	{
-		for (int i = 0; i < prefetch_maximum; i++)
+		prefetch_maximum = get_tablespace_maintenance_io_concurrency(onerel->rd_rel->reltablespace);
+		/* Create another BlockSampler, using the same seed, for prefetching */
+		if (prefetch_maximum)
+			(void) BlockSampler_Init(&prefetch_bs, totalblocks, targrows, randseed);
+	}
+#endif
+
+#ifdef USE_PREFETCH
+	if (!use_read_streams)
+	{
+		/*
+		 * If we are doing prefetching, then go ahead and tell the kernel
+		 * about the first set of pages we are going to want.  This also moves
+		 * our iterator out ahead of the main one being used, where we will
+		 * keep it so that we're always pre-fetching out prefetch_maximum
+		 * number of blocks ahead.
+		 */
+		if (prefetch_maximum)
 		{
-			BlockNumber prefetch_block;
+			for (int i = 0; i < prefetch_maximum; i++)
+			{
+				BlockNumber prefetch_block;
 
-			if (!BlockSampler_HasMore(&prefetch_bs))
-				break;
+				if (!BlockSampler_HasMore(&prefetch_bs))
+					break;
 
-			prefetch_block = BlockSampler_Next(&prefetch_bs);
-			PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, prefetch_block);
+				prefetch_block = BlockSampler_Next(&prefetch_bs);
+				PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, prefetch_block);
+			}
 		}
 	}
 #endif
 
 	/* Outer loop over blocks to sample */
-	while (BlockSampler_HasMore(&bs))
+	while (nblocks)
 	{
 		bool		block_accepted;
-		BlockNumber targblock = BlockSampler_Next(&bs);
+		BlockNumber targblock = InvalidBlockNumber;
 #ifdef USE_PREFETCH
 		BlockNumber prefetch_targblock = InvalidBlockNumber;
 
-		/*
-		 * Make sure that every time the main BlockSampler is moved forward
-		 * that our prefetch BlockSampler also gets moved forward, so that we
-		 * always stay out ahead.
-		 */
-		if (prefetch_maximum && BlockSampler_HasMore(&prefetch_bs))
-			prefetch_targblock = BlockSampler_Next(&prefetch_bs);
+		if (!use_read_streams)
+		{
+			/*
+			 * Make sure that every time the main BlockSampler is moved
+			 * forward that our prefetch BlockSampler also gets moved forward,
+			 * so that we always stay out ahead.
+			 */
+			if (prefetch_maximum && BlockSampler_HasMore(&prefetch_bs))
+				prefetch_targblock = BlockSampler_Next(&prefetch_bs);
+		}
 #endif
 
+		if (!use_read_streams)
+			targblock = BlockSampler_Next(&bs);
+
 		vacuum_delay_point();
 
 		block_accepted = table_scan_analyze_next_block(scan, targblock, vac_strategy);
 
 #ifdef USE_PREFETCH
-
-		/*
-		 * When pre-fetching, after we get a block, tell the kernel about the
-		 * next one we will want, if there's any left.
-		 *
-		 * We want to do this even if the table_scan_analyze_next_block() call
-		 * above decides against analyzing the block it picked.
-		 */
-		if (prefetch_maximum && prefetch_targblock != InvalidBlockNumber)
-			PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, prefetch_targblock);
+		if (!use_read_streams)
+		{
+			/*
+			 * When pre-fetching, after we get a block, tell the kernel about
+			 * the next one we will want, if there's any left.
+			 *
+			 * We want to do this even if the table_scan_analyze_next_block()
+			 * call above decides against analyzing the block it picked.
+			 */
+			if (prefetch_maximum && prefetch_targblock != InvalidBlockNumber)
+				PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, prefetch_targblock);
+		}
 #endif
 
 		/*
@@ -1299,6 +1351,7 @@ acquire_sample_rows(Relation onerel, int elevel,
 
 		pgstat_progress_update_param(PROGRESS_ANALYZE_BLOCKS_DONE,
 									 ++blksdone);
+		nblocks--;
 	}
 
 	ExecDropSingleTupleTableSlot(slot);
-- 
2.43.0
