On Sun, Apr 7, 2024 at 3:57 PM Melanie Plageman
<melanieplage...@gmail.com> wrote:
>
> On Thu, Apr 04, 2024 at 02:03:30PM +0300, Nazir Bilal Yavuz wrote:
> >
> > On Wed, 3 Apr 2024 at 23:44, Melanie Plageman <melanieplage...@gmail.com> 
> > wrote:
> > >
> > > I don't see the point of BlockSampler_HasMore() anymore. I removed it in
> > > the attached and made BlockSampler_Next() return InvalidBlockNumber
> > > under the same conditions. Is there a reason not to do this? There
> > > aren't other callers. If the BlockSampler_Next() wasn't part of an API,
> > > we could just make it the streaming read callback, but that might be
> > > weird as it is now.
> >
> > I agree. There is no reason to have BlockSampler_HasMore() after
> > streaming read API changes.
> >
> > > That and my other ideas in attached. Let me know what you think.
> >
> > I agree with your changes but I am not sure if others agree with all
> > the changes you have proposed. So, I didn't merge 0001 and your ideas
> > yet. Instead, I wrote a commit message, added some comments, changed
> > 'if (bs->t >= bs->N || bs->m >= bs->n)' to 'if (K <= 0 || k <= 0)', and
> > attached it as 0002.
>
> I couldn't quite let go of those changes to acquire_sample_rows(), so
> attached v9 0001 implements them as a preliminary patch before your
> analyze streaming read user. I inlined heapam_scan_analyze_next_block()
> entirely and made heapam_scan_analyze_next_tuple() a static function in
> commands/analyze.c (and tweaked the name).
>
> I made a few tweaks to your patch since it is on top of those changes
> instead of preceding them. Then 0003 is removing BlockSampler_HasMore()
> since it doesn't make sense to remove it before the streaming read user
> was added.
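
For anyone following along, here is a minimal standalone sketch of the
contract discussed above: a sampler whose "next" function returns an
invalid sentinel once it is exhausted, so a pull-style callback needs no
separate HasMore() check. This is not PostgreSQL code. All demo_* names,
INVALID_BLOCK, and the LCG random source are made up for illustration,
and the selection step is plain Knuth Algorithm S, not necessarily the
exact loop in sampling.c.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for InvalidBlockNumber. */
#define INVALID_BLOCK UINT32_MAX

/* Hypothetical sampler state, loosely mirroring the N/n/t/m naming. */
typedef struct DemoSampler
{
	uint32_t	N;		/* total number of blocks */
	uint32_t	n;		/* desired sample size (in blocks) */
	uint32_t	t;		/* blocks scanned so far */
	uint32_t	m;		/* blocks selected so far */
	uint32_t	state;	/* state of the toy random generator */
} DemoSampler;

static void
demo_sampler_init(DemoSampler *bs, uint32_t nblocks, uint32_t samplesize,
				  uint32_t seed)
{
	assert(bs != 0);
	bs->N = nblocks;
	bs->n = samplesize;
	bs->t = 0;
	bs->m = 0;
	bs->state = seed;
}

/* Toy LCG returning a double in [0, 1); only for this sketch. */
static double
demo_uniform(DemoSampler *bs)
{
	bs->state = bs->state * 1664525u + 1013904223u;
	return (bs->state >> 8) / 16777216.0;
}

/*
 * Return the next sampled block in ascending order, or INVALID_BLOCK when
 * there are no blocks left to scan or none left to sample.
 */
static uint32_t
demo_sampler_next(DemoSampler *bs)
{
	uint32_t	K = bs->N - bs->t;	/* remaining blocks */
	uint32_t	k = bs->n - bs->m;	/* blocks still to sample */

	if (K == 0 || k == 0)
		return INVALID_BLOCK;

	if (k >= K)
	{
		/* Need all the remaining blocks. */
		bs->m++;
		return bs->t++;
	}

	/* Algorithm S: select the current block with probability k/K. */
	for (;;)
	{
		if (demo_uniform(bs) * K < k)
		{
			bs->m++;
			return bs->t++;
		}
		bs->t++;
		K--;
	}
}

/* Pull-style callback in the spirit of a streaming read callback. */
typedef uint32_t (*demo_next_block_cb) (void *user_data);

static uint32_t
demo_next_block(void *user_data)
{
	return demo_sampler_next((DemoSampler *) user_data);
}

/* Drain the callback until the sentinel; return how many blocks it gave. */
static uint32_t
demo_consume(demo_next_block_cb cb, void *user_data,
			 uint32_t *out, uint32_t cap)
{
	uint32_t	count = 0;
	uint32_t	blk;

	while ((blk = cb(user_data)) != INVALID_BLOCK)
	{
		if (count < cap)
			out[count] = blk;
		count++;
	}
	return count;
}
```

The consumer loop only tests for the sentinel, which is the same shape
the streaming read callback ends up with once HasMore() is gone.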

I realized there were a few outdated comments. Fixed in attached v10.

- Melanie
From 1dc2343661f3edb3b1bc4307afb0e956397eb76c Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Sun, 7 Apr 2024 14:55:22 -0400
Subject: [PATCH v10 1/3] Make heapam_scan_analyze_next_[tuple|block] static.

27bc1772fc81 removed the table AM callbacks scan_analyze_next_block and
scan_analyze_next_tuple -- leaving their heap AM implementations only
called by acquire_sample_rows().

Move heapam_scan_analyze_next_tuple() to analyze.c as a static helper for
acquire_sample_rows() and inline heapam_scan_analyze_next_block().

Author: Melanie Plageman
Discussion: https://postgr.es/m/flat/CAN55FZ0UhXqk9v3y-zW_fp4-WCp43V8y0A72xPmLkOM%2B6M%2BmJg%40mail.gmail.com
---
 src/backend/access/heap/heapam_handler.c | 193 +----------------------
 src/backend/commands/analyze.c           | 181 ++++++++++++++++++++-
 src/include/access/heapam.h              |   9 --
 3 files changed, 179 insertions(+), 204 deletions(-)

diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 58de2c82a70..364dd0b165b 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -1054,189 +1054,6 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
 	pfree(isnull);
 }
 
-/*
- * Prepare to analyze block `blockno` of `scan`.  The scan has been started
- * with SO_TYPE_ANALYZE option.
- *
- * This routine holds a buffer pin and lock on the heap page.  They are held
- * until heapam_scan_analyze_next_tuple() returns false.  That is until all the
- * items of the heap page are analyzed.
- */
-void
-heapam_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno,
-							   BufferAccessStrategy bstrategy)
-{
-	HeapScanDesc hscan = (HeapScanDesc) scan;
-
-	/*
-	 * We must maintain a pin on the target page's buffer to ensure that
-	 * concurrent activity - e.g. HOT pruning - doesn't delete tuples out from
-	 * under us.  Hence, pin the page until we are done looking at it.  We
-	 * also choose to hold sharelock on the buffer throughout --- we could
-	 * release and re-acquire sharelock for each tuple, but since we aren't
-	 * doing much work per tuple, the extra lock traffic is probably better
-	 * avoided.
-	 */
-	hscan->rs_cblock = blockno;
-	hscan->rs_cindex = FirstOffsetNumber;
-	hscan->rs_cbuf = ReadBufferExtended(scan->rs_rd, MAIN_FORKNUM,
-										blockno, RBM_NORMAL, bstrategy);
-	LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_SHARE);
-}
-
-/*
- * Iterate over tuples in the block selected with
- * heapam_scan_analyze_next_block().  If a tuple that's suitable for sampling
- * is found, true is returned and a tuple is stored in `slot`.  When no more
- * tuples for sampling, false is returned and the pin and lock acquired by
- * heapam_scan_analyze_next_block() are released.
- *
- * *liverows and *deadrows are incremented according to the encountered
- * tuples.
- */
-bool
-heapam_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin,
-							   double *liverows, double *deadrows,
-							   TupleTableSlot *slot)
-{
-	HeapScanDesc hscan = (HeapScanDesc) scan;
-	Page		targpage;
-	OffsetNumber maxoffset;
-	BufferHeapTupleTableSlot *hslot;
-
-	Assert(TTS_IS_BUFFERTUPLE(slot));
-
-	hslot = (BufferHeapTupleTableSlot *) slot;
-	targpage = BufferGetPage(hscan->rs_cbuf);
-	maxoffset = PageGetMaxOffsetNumber(targpage);
-
-	/* Inner loop over all tuples on the selected page */
-	for (; hscan->rs_cindex <= maxoffset; hscan->rs_cindex++)
-	{
-		ItemId		itemid;
-		HeapTuple	targtuple = &hslot->base.tupdata;
-		bool		sample_it = false;
-
-		itemid = PageGetItemId(targpage, hscan->rs_cindex);
-
-		/*
-		 * We ignore unused and redirect line pointers.  DEAD line pointers
-		 * should be counted as dead, because we need vacuum to run to get rid
-		 * of them.  Note that this rule agrees with the way that
-		 * heap_page_prune_and_freeze() counts things.
-		 */
-		if (!ItemIdIsNormal(itemid))
-		{
-			if (ItemIdIsDead(itemid))
-				*deadrows += 1;
-			continue;
-		}
-
-		ItemPointerSet(&targtuple->t_self, hscan->rs_cblock, hscan->rs_cindex);
-
-		targtuple->t_tableOid = RelationGetRelid(scan->rs_rd);
-		targtuple->t_data = (HeapTupleHeader) PageGetItem(targpage, itemid);
-		targtuple->t_len = ItemIdGetLength(itemid);
-
-		switch (HeapTupleSatisfiesVacuum(targtuple, OldestXmin,
-										 hscan->rs_cbuf))
-		{
-			case HEAPTUPLE_LIVE:
-				sample_it = true;
-				*liverows += 1;
-				break;
-
-			case HEAPTUPLE_DEAD:
-			case HEAPTUPLE_RECENTLY_DEAD:
-				/* Count dead and recently-dead rows */
-				*deadrows += 1;
-				break;
-
-			case HEAPTUPLE_INSERT_IN_PROGRESS:
-
-				/*
-				 * Insert-in-progress rows are not counted.  We assume that
-				 * when the inserting transaction commits or aborts, it will
-				 * send a stats message to increment the proper count.  This
-				 * works right only if that transaction ends after we finish
-				 * analyzing the table; if things happen in the other order,
-				 * its stats update will be overwritten by ours.  However, the
-				 * error will be large only if the other transaction runs long
-				 * enough to insert many tuples, so assuming it will finish
-				 * after us is the safer option.
-				 *
-				 * A special case is that the inserting transaction might be
-				 * our own.  In this case we should count and sample the row,
-				 * to accommodate users who load a table and analyze it in one
-				 * transaction.  (pgstat_report_analyze has to adjust the
-				 * numbers we report to the cumulative stats system to make
-				 * this come out right.)
-				 */
-				if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(targtuple->t_data)))
-				{
-					sample_it = true;
-					*liverows += 1;
-				}
-				break;
-
-			case HEAPTUPLE_DELETE_IN_PROGRESS:
-
-				/*
-				 * We count and sample delete-in-progress rows the same as
-				 * live ones, so that the stats counters come out right if the
-				 * deleting transaction commits after us, per the same
-				 * reasoning given above.
-				 *
-				 * If the delete was done by our own transaction, however, we
-				 * must count the row as dead to make pgstat_report_analyze's
-				 * stats adjustments come out right.  (Note: this works out
-				 * properly when the row was both inserted and deleted in our
-				 * xact.)
-				 *
-				 * The net effect of these choices is that we act as though an
-				 * IN_PROGRESS transaction hasn't happened yet, except if it
-				 * is our own transaction, which we assume has happened.
-				 *
-				 * This approach ensures that we behave sanely if we see both
-				 * the pre-image and post-image rows for a row being updated
-				 * by a concurrent transaction: we will sample the pre-image
-				 * but not the post-image.  We also get sane results if the
-				 * concurrent transaction never commits.
-				 */
-				if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetUpdateXid(targtuple->t_data)))
-					*deadrows += 1;
-				else
-				{
-					sample_it = true;
-					*liverows += 1;
-				}
-				break;
-
-			default:
-				elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
-				break;
-		}
-
-		if (sample_it)
-		{
-			ExecStoreBufferHeapTuple(targtuple, slot, hscan->rs_cbuf);
-			hscan->rs_cindex++;
-
-			/* note that we leave the buffer locked here! */
-			return true;
-		}
-	}
-
-	/* Now release the lock and pin on the page */
-	UnlockReleaseBuffer(hscan->rs_cbuf);
-	hscan->rs_cbuf = InvalidBuffer;
-
-	/* also prevent old slot contents from having pin on page */
-	ExecClearTuple(slot);
-
-	return false;
-}
-
 static double
 heapam_index_build_range_scan(Relation heapRelation,
 							  Relation indexRelation,
@@ -1476,7 +1293,7 @@ heapam_index_build_range_scan(Relation heapRelation,
 
 			/*
 			 * The criteria for counting a tuple as live in this block need to
-			 * match what analyze.c's heapam_scan_analyze_next_tuple() does,
+			 * match what analyze.c's heap_scan_analyze_next_tuple() does,
 			 * otherwise CREATE INDEX and ANALYZE may produce wildly different
 			 * reltuples values, e.g. when there are many recently-dead
 			 * tuples.
@@ -1510,7 +1327,7 @@ heapam_index_build_range_scan(Relation heapRelation,
 					 * index as unusable for them.
 					 *
 					 * We don't count recently-dead tuples in reltuples, even
-					 * if we index them; see heapam_scan_analyze_next_tuple().
+					 * if we index them; see heap_scan_analyze_next_tuple().
 					 */
 					if (HeapTupleIsHotUpdated(heapTuple))
 					{
@@ -1575,7 +1392,7 @@ heapam_index_build_range_scan(Relation heapRelation,
 					{
 						/*
 						 * For consistency with
-						 * heapam_scan_analyze_next_tuple(), count
+						 * heap_scan_analyze_next_tuple(), count
 						 * HEAPTUPLE_INSERT_IN_PROGRESS tuples as live only
 						 * when inserted by our own transaction.
 						 */
@@ -1648,8 +1465,8 @@ heapam_index_build_range_scan(Relation heapRelation,
 						/*
 						 * Count HEAPTUPLE_DELETE_IN_PROGRESS tuples as live,
 						 * if they were not deleted by the current
-						 * transaction.  That's what
-						 * heapam_scan_analyze_next_tuple() does, and we want
+						 * transaction. That's what
+						 * heap_scan_analyze_next_tuple() does, and we want
 						 * the behavior to be consistent.
 						 */
 						reltuples += 1;
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index 2fb39f3ede1..335ffb24302 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -1102,6 +1102,157 @@ examine_attribute(Relation onerel, int attnum, Node *index_expr)
 	return stats;
 }
 
+/*
+ * Iterate over tuples in the block pinned and locked in scan->rs_cbuf by
+ * acquire_sample_rows(). If a tuple that's suitable for sampling is found,
+ * store the tuple in `slot` and return true. When there are no more tuples for
+ * sampling, return false.
+ *
+ * *liverows and *deadrows are incremented according to the encountered
+ * tuples.
+ */
+static bool
+heap_scan_analyze_next_tuple(HeapScanDesc scan, TransactionId OldestXmin,
+							 double *liverows, double *deadrows,
+							 TupleTableSlot *slot)
+{
+	Page		targpage;
+	OffsetNumber maxoffset;
+	BufferHeapTupleTableSlot *hslot;
+
+	Assert(TTS_IS_BUFFERTUPLE(slot));
+
+	hslot = (BufferHeapTupleTableSlot *) slot;
+	targpage = BufferGetPage(scan->rs_cbuf);
+	maxoffset = PageGetMaxOffsetNumber(targpage);
+
+	/* Inner loop over all tuples on the selected page */
+	for (; scan->rs_cindex <= maxoffset; scan->rs_cindex++)
+	{
+		ItemId		itemid;
+		HeapTuple	targtuple = &hslot->base.tupdata;
+		bool		sample_it = false;
+
+		itemid = PageGetItemId(targpage, scan->rs_cindex);
+
+		/*
+		 * We ignore unused and redirect line pointers.  DEAD line pointers
+		 * should be counted as dead, because we need vacuum to run to get rid
+		 * of them.  Note that this rule agrees with the way that
+		 * heap_page_prune_and_freeze() counts things.
+		 */
+		if (!ItemIdIsNormal(itemid))
+		{
+			if (ItemIdIsDead(itemid))
+				*deadrows += 1;
+			continue;
+		}
+
+		ItemPointerSet(&targtuple->t_self, scan->rs_cblock, scan->rs_cindex);
+
+		targtuple->t_tableOid = RelationGetRelid(scan->rs_base.rs_rd);
+		targtuple->t_data = (HeapTupleHeader) PageGetItem(targpage, itemid);
+		targtuple->t_len = ItemIdGetLength(itemid);
+
+		switch (HeapTupleSatisfiesVacuum(targtuple, OldestXmin,
+										 scan->rs_cbuf))
+		{
+			case HEAPTUPLE_LIVE:
+				sample_it = true;
+				*liverows += 1;
+				break;
+
+			case HEAPTUPLE_DEAD:
+			case HEAPTUPLE_RECENTLY_DEAD:
+				/* Count dead and recently-dead rows */
+				*deadrows += 1;
+				break;
+
+			case HEAPTUPLE_INSERT_IN_PROGRESS:
+
+				/*
+				 * Insert-in-progress rows are not counted.  We assume that
+				 * when the inserting transaction commits or aborts, it will
+				 * send a stats message to increment the proper count.  This
+				 * works right only if that transaction ends after we finish
+				 * analyzing the table; if things happen in the other order,
+				 * its stats update will be overwritten by ours.  However, the
+				 * error will be large only if the other transaction runs long
+				 * enough to insert many tuples, so assuming it will finish
+				 * after us is the safer option.
+				 *
+				 * A special case is that the inserting transaction might be
+				 * our own.  In this case we should count and sample the row,
+				 * to accommodate users who load a table and analyze it in one
+				 * transaction.  (pgstat_report_analyze has to adjust the
+				 * numbers we report to the cumulative stats system to make
+				 * this come out right.)
+				 */
+				if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(targtuple->t_data)))
+				{
+					sample_it = true;
+					*liverows += 1;
+				}
+				break;
+
+			case HEAPTUPLE_DELETE_IN_PROGRESS:
+
+				/*
+				 * We count and sample delete-in-progress rows the same as
+				 * live ones, so that the stats counters come out right if the
+				 * deleting transaction commits after us, per the same
+				 * reasoning given above.
+				 *
+				 * If the delete was done by our own transaction, however, we
+				 * must count the row as dead to make pgstat_report_analyze's
+				 * stats adjustments come out right.  (Note: this works out
+				 * properly when the row was both inserted and deleted in our
+				 * xact.)
+				 *
+				 * The net effect of these choices is that we act as though an
+				 * IN_PROGRESS transaction hasn't happened yet, except if it
+				 * is our own transaction, which we assume has happened.
+				 *
+				 * This approach ensures that we behave sanely if we see both
+				 * the pre-image and post-image rows for a row being updated
+				 * by a concurrent transaction: we will sample the pre-image
+				 * but not the post-image.  We also get sane results if the
+				 * concurrent transaction never commits.
+				 */
+				if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetUpdateXid(targtuple->t_data)))
+					*deadrows += 1;
+				else
+				{
+					sample_it = true;
+					*liverows += 1;
+				}
+				break;
+
+			default:
+				elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
+				break;
+		}
+
+		if (sample_it)
+		{
+			ExecStoreBufferHeapTuple(targtuple, slot, scan->rs_cbuf);
+			scan->rs_cindex++;
+
+			/* note that we leave the buffer locked here! */
+			return true;
+		}
+	}
+
+	/* Now release the lock and pin on the page */
+	UnlockReleaseBuffer(scan->rs_cbuf);
+	scan->rs_cbuf = InvalidBuffer;
+
+	/* Prevent old slot contents from having pin on page */
+	ExecClearTuple(slot);
+
+	return false;
+}
+
 /*
  * acquire_sample_rows -- acquire a random sample of rows from the heap
  *
@@ -1151,7 +1302,7 @@ acquire_sample_rows(Relation onerel, int elevel,
 	BlockSamplerData bs;
 	ReservoirStateData rstate;
 	TupleTableSlot *slot;
-	TableScanDesc scan;
+	HeapScanDesc scan;
 	BlockNumber nblocks;
 	BlockNumber blksdone = 0;
 #ifdef USE_PREFETCH
@@ -1184,7 +1335,7 @@ acquire_sample_rows(Relation onerel, int elevel,
 	/* Prepare for sampling rows */
 	reservoir_init_selection_state(&rstate, targrows);
 
-	scan = heap_beginscan(onerel, NULL, 0, NULL, NULL, SO_TYPE_ANALYZE);
+	scan = (HeapScanDesc) heap_beginscan(onerel, NULL, 0, NULL, NULL, SO_TYPE_ANALYZE);
 	slot = table_slot_create(onerel, NULL);
 
 #ifdef USE_PREFETCH
@@ -1206,11 +1357,13 @@ acquire_sample_rows(Relation onerel, int elevel,
 				break;
 
 			prefetch_block = BlockSampler_Next(&prefetch_bs);
-			PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, prefetch_block);
+			PrefetchBuffer(scan->rs_base.rs_rd, MAIN_FORKNUM, prefetch_block);
 		}
 	}
 #endif
 
+	scan->rs_cbuf = InvalidBuffer;
+
 	/* Outer loop over blocks to sample */
 	while (BlockSampler_HasMore(&bs))
 	{
@@ -1229,7 +1382,21 @@ acquire_sample_rows(Relation onerel, int elevel,
 
 		vacuum_delay_point();
 
-		heapam_scan_analyze_next_block(scan, targblock, vac_strategy);
+		scan->rs_cblock = targblock;
+		scan->rs_cindex = FirstOffsetNumber;
+
+		/*
+		 * We must maintain a pin on the target page's buffer to ensure that
+		 * concurrent activity - e.g. HOT pruning - doesn't delete tuples out
+		 * from under us.  Hence, pin the page until we are done looking at
+		 * it.  We also choose to hold sharelock on the buffer throughout ---
+		 * we could release and re-acquire sharelock for each tuple, but since
+		 * we aren't doing much work per tuple, the extra lock traffic is
+		 * probably better avoided.
+		 */
+		scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM,
+										   targblock, RBM_NORMAL, vac_strategy);
+		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
 
 #ifdef USE_PREFETCH
 
@@ -1238,10 +1405,10 @@ acquire_sample_rows(Relation onerel, int elevel,
 		 * next one we will want, if there's any left.
 		 */
 		if (prefetch_maximum && prefetch_targblock != InvalidBlockNumber)
-			PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, prefetch_targblock);
+			PrefetchBuffer(scan->rs_base.rs_rd, MAIN_FORKNUM, prefetch_targblock);
 #endif
 
-		while (heapam_scan_analyze_next_tuple(scan, OldestXmin, &liverows, &deadrows, slot))
+		while (heap_scan_analyze_next_tuple(scan, OldestXmin, &liverows, &deadrows, slot))
 		{
 			/*
 			 * The first targrows sample rows are simply copied into the
@@ -1291,7 +1458,7 @@ acquire_sample_rows(Relation onerel, int elevel,
 	}
 
 	ExecDropSingleTupleTableSlot(slot);
-	heap_endscan(scan);
+	heap_endscan((TableScanDesc) scan);
 
 	/*
 	 * If we didn't find as many tuples as we wanted then we're done. No sort
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 48936826bcc..be630620d0d 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -412,15 +412,6 @@ extern bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple);
 extern bool HeapTupleIsSurelyDead(HeapTuple htup,
 								  struct GlobalVisState *vistest);
 
-/* in heap/heapam_handler.c*/
-extern void heapam_scan_analyze_next_block(TableScanDesc scan,
-										   BlockNumber blockno,
-										   BufferAccessStrategy bstrategy);
-extern bool heapam_scan_analyze_next_tuple(TableScanDesc scan,
-										   TransactionId OldestXmin,
-										   double *liverows, double *deadrows,
-										   TupleTableSlot *slot);
-
 /*
  * To avoid leaking too much knowledge about reorderbuffer implementation
  * details this is implemented in reorderbuffer.c not heapam_visibility.c
-- 
2.40.1

From 90d115c2401567be65bcf64393a6d3b39286779e Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Sun, 7 Apr 2024 15:28:32 -0400
Subject: [PATCH v10 2/3] Use streaming read API in ANALYZE

The ANALYZE command prefetches and reads sample blocks chosen by a
BlockSampler algorithm. Instead of calling Prefetch|ReadBuffer() for
each block, ANALYZE now uses the streaming API introduced in b5a9b18cd0.

Author: Nazir Bilal Yavuz
Reviewed-by: Melanie Plageman
Discussion: https://postgr.es/m/flat/CAN55FZ0UhXqk9v3y-zW_fp4-WCp43V8y0A72xPmLkOM%2B6M%2BmJg%40mail.gmail.com
---
 src/backend/commands/analyze.c | 89 ++++++++++------------------------
 1 file changed, 26 insertions(+), 63 deletions(-)

diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index 335ffb24302..3cfad92390d 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -1253,6 +1253,20 @@ heap_scan_analyze_next_tuple(HeapScanDesc scan, TransactionId OldestXmin,
 	return false;
 }
 
+/*
+ * Streaming read callback returning the next block number chosen by the
+ * BlockSampler algorithm.
+ */
+static BlockNumber
+block_sampling_streaming_read_next(ReadStream *stream,
+								   void *user_data,
+								   void *per_buffer_data)
+{
+	BlockSamplerData *bs = user_data;
+
+	return BlockSampler_HasMore(bs) ? BlockSampler_Next(bs) : InvalidBlockNumber;
+}
+
 /*
  * acquire_sample_rows -- acquire a random sample of rows from the heap
  *
@@ -1305,10 +1319,7 @@ acquire_sample_rows(Relation onerel, int elevel,
 	HeapScanDesc scan;
 	BlockNumber nblocks;
 	BlockNumber blksdone = 0;
-#ifdef USE_PREFETCH
-	int			prefetch_maximum = 0;	/* blocks to prefetch if enabled */
-	BlockSamplerData prefetch_bs;
-#endif
+	ReadStream *stream;
 
 	Assert(targrows > 0);
 
@@ -1321,13 +1332,6 @@ acquire_sample_rows(Relation onerel, int elevel,
 	randseed = pg_prng_uint32(&pg_global_prng_state);
 	nblocks = BlockSampler_Init(&bs, totalblocks, targrows, randseed);
 
-#ifdef USE_PREFETCH
-	prefetch_maximum = get_tablespace_maintenance_io_concurrency(onerel->rd_rel->reltablespace);
-	/* Create another BlockSampler, using the same seed, for prefetching */
-	if (prefetch_maximum)
-		(void) BlockSampler_Init(&prefetch_bs, totalblocks, targrows, randseed);
-#endif
-
 	/* Report sampling block numbers */
 	pgstat_progress_update_param(PROGRESS_ANALYZE_BLOCKS_TOTAL,
 								 nblocks);
@@ -1337,54 +1341,21 @@ acquire_sample_rows(Relation onerel, int elevel,
 
 	scan = (HeapScanDesc) heap_beginscan(onerel, NULL, 0, NULL, NULL, SO_TYPE_ANALYZE);
 	slot = table_slot_create(onerel, NULL);
-
-#ifdef USE_PREFETCH
-
-	/*
-	 * If we are doing prefetching, then go ahead and tell the kernel about
-	 * the first set of pages we are going to want.  This also moves our
-	 * iterator out ahead of the main one being used, where we will keep it so
-	 * that we're always pre-fetching out prefetch_maximum number of blocks
-	 * ahead.
-	 */
-	if (prefetch_maximum)
-	{
-		for (int i = 0; i < prefetch_maximum; i++)
-		{
-			BlockNumber prefetch_block;
-
-			if (!BlockSampler_HasMore(&prefetch_bs))
-				break;
-
-			prefetch_block = BlockSampler_Next(&prefetch_bs);
-			PrefetchBuffer(scan->rs_base.rs_rd, MAIN_FORKNUM, prefetch_block);
-		}
-	}
-#endif
+	stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE,
+										vac_strategy,
+										scan->rs_base.rs_rd,
+										MAIN_FORKNUM,
+										block_sampling_streaming_read_next,
+										&bs,
+										0);
 
 	scan->rs_cbuf = InvalidBuffer;
 
 	/* Outer loop over blocks to sample */
-	while (BlockSampler_HasMore(&bs))
+	while (BufferIsValid(scan->rs_cbuf = read_stream_next_buffer(stream, NULL)))
 	{
-		BlockNumber targblock = BlockSampler_Next(&bs);
-#ifdef USE_PREFETCH
-		BlockNumber prefetch_targblock = InvalidBlockNumber;
-
-		/*
-		 * Make sure that every time the main BlockSampler is moved forward
-		 * that our prefetch BlockSampler also gets moved forward, so that we
-		 * always stay out ahead.
-		 */
-		if (prefetch_maximum && BlockSampler_HasMore(&prefetch_bs))
-			prefetch_targblock = BlockSampler_Next(&prefetch_bs);
-#endif
-
 		vacuum_delay_point();
 
-		scan->rs_cblock = targblock;
-		scan->rs_cindex = FirstOffsetNumber;
-
 		/*
 		 * We must maintain a pin on the target page's buffer to ensure that
 		 * concurrent activity - e.g. HOT pruning - doesn't delete tuples out
@@ -1394,19 +1365,10 @@ acquire_sample_rows(Relation onerel, int elevel,
 		 * we aren't doing much work per tuple, the extra lock traffic is
 		 * probably better avoided.
 		 */
-		scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM,
-										   targblock, RBM_NORMAL, vac_strategy);
 		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
 
-#ifdef USE_PREFETCH
-
-		/*
-		 * When pre-fetching, after we get a block, tell the kernel about the
-		 * next one we will want, if there's any left.
-		 */
-		if (prefetch_maximum && prefetch_targblock != InvalidBlockNumber)
-			PrefetchBuffer(scan->rs_base.rs_rd, MAIN_FORKNUM, prefetch_targblock);
-#endif
+		scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf);
+		scan->rs_cindex = FirstOffsetNumber;
 
 		while (heap_scan_analyze_next_tuple(scan, OldestXmin, &liverows, &deadrows, slot))
 		{
@@ -1456,6 +1418,7 @@ acquire_sample_rows(Relation onerel, int elevel,
 		pgstat_progress_update_param(PROGRESS_ANALYZE_BLOCKS_DONE,
 									 ++blksdone);
 	}
+	read_stream_end(stream);
 
 	ExecDropSingleTupleTableSlot(slot);
 	heap_endscan((TableScanDesc) scan);
-- 
2.40.1

From 862b7ac81cdafcda7b525e02721da14e46265509 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Sun, 7 Apr 2024 15:38:41 -0400
Subject: [PATCH v10 3/3] Obsolete BlockSampler_HasMore()

A previous commit stopped using BlockSampler_HasMore() for flow control
in acquire_sample_rows(). There seems little use now for
BlockSampler_HasMore(). It should be sufficient to return
InvalidBlockNumber from BlockSampler_Next() when BlockSampler_HasMore()
would have returned false. Remove BlockSampler_HasMore().

Author: Melanie Plageman, Nazir Bilal Yavuz
Discussion: https://postgr.es/m/flat/CAN55FZ0UhXqk9v3y-zW_fp4-WCp43V8y0A72xPmLkOM%2B6M%2BmJg%40mail.gmail.com
---
 src/backend/commands/analyze.c    |  4 +---
 src/backend/utils/misc/sampling.c | 11 ++++-------
 src/include/utils/sampling.h      |  1 -
 3 files changed, 5 insertions(+), 11 deletions(-)

diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index 3cfad92390d..95d113bb907 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -1262,9 +1262,7 @@ block_sampling_streaming_read_next(ReadStream *stream,
 								   void *user_data,
 								   void *per_buffer_data)
 {
-	BlockSamplerData *bs = user_data;
-
-	return BlockSampler_HasMore(bs) ? BlockSampler_Next(bs) : InvalidBlockNumber;
+	return BlockSampler_Next(user_data);
 }
 
 /*
diff --git a/src/backend/utils/misc/sampling.c b/src/backend/utils/misc/sampling.c
index 933db06702c..245d826affe 100644
--- a/src/backend/utils/misc/sampling.c
+++ b/src/backend/utils/misc/sampling.c
@@ -54,12 +54,6 @@ BlockSampler_Init(BlockSampler bs, BlockNumber nblocks, int samplesize,
 	return Min(bs->n, bs->N);
 }
 
-bool
-BlockSampler_HasMore(BlockSampler bs)
-{
-	return (bs->t < bs->N) && (bs->m < bs->n);
-}
-
 BlockNumber
 BlockSampler_Next(BlockSampler bs)
 {
@@ -68,7 +62,10 @@ BlockSampler_Next(BlockSampler bs)
 	double		p;				/* probability to skip block */
 	double		V;				/* random */
 
-	Assert(BlockSampler_HasMore(bs));	/* hence K > 0 and k > 0 */
+
+	/* Return if no remaining blocks or no blocks to sample */
+	if (K <= 0 || k <= 0)
+		return InvalidBlockNumber;
 
 	if ((BlockNumber) k >= K)
 	{
diff --git a/src/include/utils/sampling.h b/src/include/utils/sampling.h
index be48ee52bac..fb5d6820a24 100644
--- a/src/include/utils/sampling.h
+++ b/src/include/utils/sampling.h
@@ -38,7 +38,6 @@ typedef BlockSamplerData *BlockSampler;
 
 extern BlockNumber BlockSampler_Init(BlockSampler bs, BlockNumber nblocks,
 									 int samplesize, uint32 randseed);
-extern bool BlockSampler_HasMore(BlockSampler bs);
 extern BlockNumber BlockSampler_Next(BlockSampler bs);
 
 /* Reservoir sampling methods */
-- 
2.40.1
