Hi,

Thank you for looking into this!

On Wed, 3 Apr 2024 at 20:17, Heikki Linnakangas <hlinn...@iki.fi> wrote:
>
> On 03/04/2024 13:31, Nazir Bilal Yavuz wrote:
> > Streaming API has been committed but the committed version has a minor
> > change, the read_stream_begin_relation function takes Relation instead
> > of BufferManagerRelation now. So, here is a v5 which addresses this
> > change.
>
> I'm getting a repeatable segfault / assertion failure with this:
>
> postgres=# CREATE TABLE tengiga (i int, filler text) with (fillfactor=10);
> CREATE TABLE
> postgres=# insert into tengiga select g, repeat('x', 900) from
> generate_series(1, 1400000) g;
> INSERT 0 1400000
> postgres=# set default_statistics_target = 10; ANALYZE tengiga;
> SET
> ANALYZE
> postgres=# set default_statistics_target = 100; ANALYZE tengiga;
> SET
> ANALYZE
> postgres=# set default_statistics_target =1000; ANALYZE tengiga;
> SET
> server closed the connection unexpectedly
>         This probably means the server terminated abnormally
>         before or while processing the request.
>
> TRAP: failed Assert("BufferIsValid(hscan->rs_cbuf)"), File:
> "heapam_handler.c", Line: 1079, PID: 262232
> postgres: heikki postgres [local]
> ANALYZE(ExceptionalCondition+0xa8)[0x56488a0de9d8]
> postgres: heikki postgres [local]
> ANALYZE(heapam_scan_analyze_next_block+0x63)[0x5648899ece34]
> postgres: heikki postgres [local] ANALYZE(+0x2d3f34)[0x564889b6af34]
> postgres: heikki postgres [local] ANALYZE(+0x2d2a3a)[0x564889b69a3a]
> postgres: heikki postgres [local] ANALYZE(analyze_rel+0x33e)[0x564889b68fa9]
> postgres: heikki postgres [local] ANALYZE(vacuum+0x4b3)[0x564889c2dcc0]
> postgres: heikki postgres [local] ANALYZE(ExecVacuum+0xd6f)[0x564889c2d7fe]
> postgres: heikki postgres [local]
> ANALYZE(standard_ProcessUtility+0x901)[0x564889f0b8b9]
> postgres: heikki postgres [local]
> ANALYZE(ProcessUtility+0x136)[0x564889f0afb1]
> postgres: heikki postgres [local] ANALYZE(+0x6728c8)[0x564889f098c8]
> postgres: heikki postgres [local] ANALYZE(+0x672b3b)[0x564889f09b3b]
> postgres: heikki postgres [local] ANALYZE(PortalRun+0x320)[0x564889f09015]
> postgres: heikki postgres [local] ANALYZE(+0x66b2c6)[0x564889f022c6]
> postgres: heikki postgres [local]
> ANALYZE(PostgresMain+0x80c)[0x564889f06fd7]
> postgres: heikki postgres [local] ANALYZE(+0x667876)[0x564889efe876]
> postgres: heikki postgres [local]
> ANALYZE(postmaster_child_launch+0xe6)[0x564889e1f4b3]
> postgres: heikki postgres [local] ANALYZE(+0x58e68e)[0x564889e2568e]
> postgres: heikki postgres [local] ANALYZE(+0x58b7f0)[0x564889e227f0]
> postgres: heikki postgres [local]
> ANALYZE(PostmasterMain+0x152b)[0x564889e2214d]
> postgres: heikki postgres [local] ANALYZE(+0x4444b4)[0x564889cdb4b4]
> /lib/x86_64-linux-gnu/libc.so.6(+0x2724a)[0x7f7d83b6724a]
> /lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0x85)[0x7f7d83b67305]
> postgres: heikki postgres [local] ANALYZE(_start+0x21)[0x564889971a61]
> 2024-04-03 20:15:49.157 EEST [262101] LOG:  server process (PID 262232)
> was terminated by signal 6: Aborted

I noticed the same error while working on Jakub's benchmarking results.

Cause: I was using the nblocks variable to determine how many blocks
the streaming API will return. But the number returned from
BlockSampler_Init() is sometimes not equal to the number of blocks
that BlockSampler_Next() actually returns, because the BlockSampling
algorithm decides how many blocks to return on the fly by using random
seeds. When fewer blocks come back than nblocks, the loop asks for a
buffer after the stream is exhausted, which trips the assertion above.

I thought of three possible solutions:

1- Use BlockSampler_HasMore() instead of nblocks in the main loop of
acquire_sample_rows():

The streaming API uses this function to prefetch block numbers.
Because it is called during prefetching, BlockSampler_HasMore()
reaches the end first and starts to return false while there are still
buffers to return from the streaming API. That would leave some
buffers at the end unprocessed.

2- Expose something (a function, a variable, etc.) from the streaming
API to tell whether the read is finished and there are no more buffers
to return:

I think this would work, but I am not sure whether the streaming API
allows something like that.

3- Check every buffer returned from the streaming API and, if it is
invalid, stop the main loop in acquire_sample_rows():

This solves the problem, but there will be two if checks for each
buffer returned:
- one in heapam_scan_analyze_next_block() to check whether the returned
buffer is invalid
- one to break the main loop in acquire_sample_rows() if
heapam_scan_analyze_next_block() returns false
One of these checks can be avoided by moving
heapam_scan_analyze_next_block()'s code into the main loop in
acquire_sample_rows().

I implemented the third solution, here is v6.
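
To illustrate why option 1 drops buffers at the end while option 3
does not, here is a minimal standalone sketch. It does not use the real
streaming API or BlockSampler code: ToySampler, ToyStream, LOOKAHEAD,
INVALID_BLOCK and all toy_* / consume_* functions are hypothetical
stand-ins, and the fixed look-ahead queue is only an assumption about
how a prefetching stream might behave.

```c
#include <assert.h>
#include <stdbool.h>

#define INVALID_BLOCK (-1)	/* hypothetical stand-in for an invalid block/buffer */
#define LOOKAHEAD 4			/* assumed number of blocks queued ahead of the consumer */

/* Hypothetical sampler: hands out `remaining` consecutive block numbers. */
typedef struct ToySampler
{
	int			next;
	int			remaining;
} ToySampler;

static bool
toy_has_more(const ToySampler *s)
{
	return s->remaining > 0;
}

static int
toy_next(ToySampler *s)
{
	s->remaining--;
	return s->next++;
}

/* Callback shaped like the one in the patch: sentinel once exhausted. */
static int
toy_next_block(void *user_data)
{
	ToySampler *s = user_data;

	return toy_has_more(s) ? toy_next(s) : INVALID_BLOCK;
}

/* Toy stream that keeps up to LOOKAHEAD blocks queued ahead of the consumer. */
typedef struct ToyStream
{
	int			(*cb) (void *);
	void	   *user_data;
	int			queue[LOOKAHEAD];
	int			head;
	int			count;
	bool		done;
} ToyStream;

/* Return the next queued block, refilling the look-ahead queue first. */
static int
toy_stream_next(ToyStream *st)
{
	int			blk;

	while (!st->done && st->count < LOOKAHEAD)
	{
		blk = st->cb(st->user_data);
		if (blk == INVALID_BLOCK)
			st->done = true;
		else
			st->queue[(st->head + st->count++) % LOOKAHEAD] = blk;
	}
	if (st->count == 0)
		return INVALID_BLOCK;
	blk = st->queue[st->head];
	st->head = (st->head + 1) % LOOKAHEAD;
	st->count--;
	return blk;
}

/* Option 1: drive the loop with the sampler the stream also consumes. */
static int
consume_with_has_more(int nblocks)
{
	ToySampler	s = {0, nblocks};
	ToyStream	st = {.cb = toy_next_block, .user_data = &s};
	int			processed = 0;

	while (toy_has_more(&s) && toy_stream_next(&st) != INVALID_BLOCK)
		processed++;
	return processed;			/* queued blocks are left unprocessed */
}

/* Option 3: keep reading until the stream itself returns the sentinel. */
static int
consume_until_invalid(int nblocks)
{
	ToySampler	s = {0, nblocks};
	ToyStream	st = {.cb = toy_next_block, .user_data = &s};
	int			processed = 0;

	while (toy_stream_next(&st) != INVALID_BLOCK)
		processed++;
	return processed;
}
```

With 10 sampled blocks and a look-ahead of 4, consume_with_has_more(10)
processes only 7 blocks because the last 3 are still queued when the
sampler runs dry, while consume_until_invalid(10) processes all 10.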

-- 
Regards,
Nazir Bilal Yavuz
Microsoft
From 8d396a42186325f920d5a05e7092d8e1b66f3cdf Mon Sep 17 00:00:00 2001
From: Nazir Bilal Yavuz <byavu...@gmail.com>
Date: Wed, 3 Apr 2024 15:14:15 +0300
Subject: [PATCH v6] Use streaming read API in ANALYZE

ANALYZE command gets random tuples using BlockSampler algorithm. Use
streaming reads to get these tuples by using BlockSampler algorithm in
streaming read API prefetch logic.
---
 src/include/access/heapam.h              |  6 +-
 src/backend/access/heap/heapam_handler.c | 22 +++---
 src/backend/commands/analyze.c           | 85 ++++++++----------------
 3 files changed, 42 insertions(+), 71 deletions(-)

diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index a307fb5f245..633caee9d95 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -25,6 +25,7 @@
 #include "storage/bufpage.h"
 #include "storage/dsm.h"
 #include "storage/lockdefs.h"
+#include "storage/read_stream.h"
 #include "storage/shm_toc.h"
 #include "utils/relcache.h"
 #include "utils/snapshot.h"
@@ -388,9 +389,8 @@ extern bool HeapTupleIsSurelyDead(HeapTuple htup,
 								  struct GlobalVisState *vistest);
 
 /* in heap/heapam_handler.c*/
-extern void heapam_scan_analyze_next_block(TableScanDesc scan,
-										   BlockNumber blockno,
-										   BufferAccessStrategy bstrategy);
+extern bool heapam_scan_analyze_next_block(TableScanDesc scan,
+										   ReadStream *stream);
 extern bool heapam_scan_analyze_next_tuple(TableScanDesc scan,
 										   TransactionId OldestXmin,
 										   double *liverows, double *deadrows,
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 0952d4a98eb..d83fbbe6af3 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -1054,16 +1054,16 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
 }
 
 /*
- * Prepare to analyze block `blockno` of `scan`.  The scan has been started
- * with SO_TYPE_ANALYZE option.
+ * Prepare to analyze block returned from streaming object.  If the block returned
+ * from streaming object is valid, true is returned; otherwise false is returned.
+ * The scan has been started with SO_TYPE_ANALYZE option.
  *
  * This routine holds a buffer pin and lock on the heap page.  They are held
  * until heapam_scan_analyze_next_tuple() returns false.  That is until all the
  * items of the heap page are analyzed.
  */
-void
-heapam_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno,
-							   BufferAccessStrategy bstrategy)
+bool
+heapam_scan_analyze_next_block(TableScanDesc scan, ReadStream *stream)
 {
 	HeapScanDesc hscan = (HeapScanDesc) scan;
 
@@ -1076,11 +1076,15 @@ heapam_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno,
 	 * doing much work per tuple, the extra lock traffic is probably better
 	 * avoided.
 	 */
-	hscan->rs_cblock = blockno;
-	hscan->rs_cindex = FirstOffsetNumber;
-	hscan->rs_cbuf = ReadBufferExtended(scan->rs_rd, MAIN_FORKNUM,
-										blockno, RBM_NORMAL, bstrategy);
+	hscan->rs_cbuf = read_stream_next_buffer(stream, NULL);
+	if (hscan->rs_cbuf == InvalidBuffer)
+		return false;
+
 	LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_SHARE);
+
+	hscan->rs_cblock = BufferGetBlockNumber(hscan->rs_cbuf);
+	hscan->rs_cindex = FirstOffsetNumber;
+	return true;
 }
 
 /*
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index 2fb39f3ede1..764520d5aa2 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -1102,6 +1102,20 @@ examine_attribute(Relation onerel, int attnum, Node *index_expr)
 	return stats;
 }
 
+/*
+ * Prefetch callback function to get next block number while using
+ * BlockSampling algorithm
+ */
+static BlockNumber
+block_sampling_streaming_read_next(ReadStream *stream,
+								   void *user_data,
+								   void *per_buffer_data)
+{
+	BlockSamplerData *bs = user_data;
+
+	return BlockSampler_HasMore(bs) ? BlockSampler_Next(bs) : InvalidBlockNumber;
+}
+
 /*
  * acquire_sample_rows -- acquire a random sample of rows from the heap
  *
@@ -1154,10 +1168,7 @@ acquire_sample_rows(Relation onerel, int elevel,
 	TableScanDesc scan;
 	BlockNumber nblocks;
 	BlockNumber blksdone = 0;
-#ifdef USE_PREFETCH
-	int			prefetch_maximum = 0;	/* blocks to prefetch if enabled */
-	BlockSamplerData prefetch_bs;
-#endif
+	ReadStream *stream;
 
 	Assert(targrows > 0);
 
@@ -1170,13 +1181,6 @@ acquire_sample_rows(Relation onerel, int elevel,
 	randseed = pg_prng_uint32(&pg_global_prng_state);
 	nblocks = BlockSampler_Init(&bs, totalblocks, targrows, randseed);
 
-#ifdef USE_PREFETCH
-	prefetch_maximum = get_tablespace_maintenance_io_concurrency(onerel->rd_rel->reltablespace);
-	/* Create another BlockSampler, using the same seed, for prefetching */
-	if (prefetch_maximum)
-		(void) BlockSampler_Init(&prefetch_bs, totalblocks, targrows, randseed);
-#endif
-
 	/* Report sampling block numbers */
 	pgstat_progress_update_param(PROGRESS_ANALYZE_BLOCKS_TOTAL,
 								 nblocks);
@@ -1187,59 +1191,21 @@ acquire_sample_rows(Relation onerel, int elevel,
 	scan = heap_beginscan(onerel, NULL, 0, NULL, NULL, SO_TYPE_ANALYZE);
 	slot = table_slot_create(onerel, NULL);
 
-#ifdef USE_PREFETCH
-
-	/*
-	 * If we are doing prefetching, then go ahead and tell the kernel about
-	 * the first set of pages we are going to want.  This also moves our
-	 * iterator out ahead of the main one being used, where we will keep it so
-	 * that we're always pre-fetching out prefetch_maximum number of blocks
-	 * ahead.
-	 */
-	if (prefetch_maximum)
-	{
-		for (int i = 0; i < prefetch_maximum; i++)
-		{
-			BlockNumber prefetch_block;
-
-			if (!BlockSampler_HasMore(&prefetch_bs))
-				break;
-
-			prefetch_block = BlockSampler_Next(&prefetch_bs);
-			PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, prefetch_block);
-		}
-	}
-#endif
+	stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE,
+										vac_strategy,
+										scan->rs_rd,
+										MAIN_FORKNUM,
+										block_sampling_streaming_read_next,
+										&bs,
+										0);
 
 	/* Outer loop over blocks to sample */
-	while (BlockSampler_HasMore(&bs))
+	while (true)
 	{
-		BlockNumber targblock = BlockSampler_Next(&bs);
-#ifdef USE_PREFETCH
-		BlockNumber prefetch_targblock = InvalidBlockNumber;
-
-		/*
-		 * Make sure that every time the main BlockSampler is moved forward
-		 * that our prefetch BlockSampler also gets moved forward, so that we
-		 * always stay out ahead.
-		 */
-		if (prefetch_maximum && BlockSampler_HasMore(&prefetch_bs))
-			prefetch_targblock = BlockSampler_Next(&prefetch_bs);
-#endif
-
 		vacuum_delay_point();
 
-		heapam_scan_analyze_next_block(scan, targblock, vac_strategy);
-
-#ifdef USE_PREFETCH
-
-		/*
-		 * When pre-fetching, after we get a block, tell the kernel about the
-		 * next one we will want, if there's any left.
-		 */
-		if (prefetch_maximum && prefetch_targblock != InvalidBlockNumber)
-			PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, prefetch_targblock);
-#endif
+		if (!heapam_scan_analyze_next_block(scan, stream))
+			break;
 
 		while (heapam_scan_analyze_next_tuple(scan, OldestXmin, &liverows, &deadrows, slot))
 		{
@@ -1289,6 +1255,7 @@ acquire_sample_rows(Relation onerel, int elevel,
 		pgstat_progress_update_param(PROGRESS_ANALYZE_BLOCKS_DONE,
 									 ++blksdone);
 	}
+	read_stream_end(stream);
 
 	ExecDropSingleTupleTableSlot(slot);
 	heap_endscan(scan);
-- 
2.43.0
