On 3/16/26 18:49, Tomas Vondra wrote:
> On 3/16/26 18:14, Melanie Plageman wrote:
>> ...
>>
>> I see a couple more issues with the counting in read_stream.c.
>>
>> You are double-counting stalls for synchronous IO. You increment
>> stalls in read_stream_next_buffer() but we actually execute
>> synchronous IO in WaitReadBuffers and return needed_wait as true,
>> which will count a stall again.
>>
>> You are not counting fast path IOs because those don't go through
>> read_stream_start_pending_read() and instead are started directly by
>> StartReadBuffer() in read_stream_next_buffer(). Simple diff attached
>> should fix this.
>>
>> Also, per worker stats are not displayed when BUFFERS false is passed
>> even with IO true because of a small oversight. I fixed it in the
>> attached diff.
>>
> 
> Thanks!
> 

Here's a v2 with your fixes merged. No other changes.


regards

-- 
Tomas Vondra
From 0136bf1c9d5ba2563c306fb9ab5a8c7b8a0b0912 Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Tue, 3 Mar 2026 16:50:50 -0500
Subject: [PATCH v2 1/2] bufmgr: Return whether WaitReadBuffers() needed to
 wait

In a subsequent commit read_stream.c will use this as an input to the read
ahead distance.

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/storage/buffer/bufmgr.c | 18 +++++++++++++++++-
 src/include/storage/bufmgr.h        |  2 +-
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 00bc609529a..c8ec7fb4f53 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1738,12 +1738,20 @@ ProcessReadBuffersResult(ReadBuffersOperation *operation)
 	Assert(operation->nblocks_done <= operation->nblocks);
 }
 
-void
+/*
+ * Wait for the IO operation initiated by StartReadBuffers() et al to
+ * complete.
+ *
+ * Returns true if the caller had to wait for the IO to complete, false if it
+ * had already completed by the time of this call.
+ */
+bool
 WaitReadBuffers(ReadBuffersOperation *operation)
 {
 	PgAioReturn *aio_ret = &operation->io_return;
 	IOContext	io_context;
 	IOObject	io_object;
+	bool		needed_wait = false;
 
 	if (operation->persistence == RELPERSISTENCE_TEMP)
 	{
@@ -1805,6 +1813,7 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 				instr_time	io_start = pgstat_prepare_io_time(track_io_timing);
 
 				pgaio_wref_wait(&operation->io_wref);
+				needed_wait = true;
 
 				/*
 				 * The IO operation itself was already counted earlier, in
@@ -1835,6 +1844,12 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 
 		CHECK_FOR_INTERRUPTS();
 
+		/*
+		 * If the IO completed only partially, we need to perform additional
+		 * work, consider that a form of having had to wait.
+		 */
+		needed_wait = true;
+
 		/*
 		 * This may only complete the IO partially, either because some
 		 * buffers were already valid, or because of a partial read.
@@ -1851,6 +1866,7 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 	CheckReadBuffersOperation(operation, true);
 
 	/* NB: READ_DONE tracepoint was already executed in completion callback */
+	return needed_wait;
 }
 
 /*
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 4017896f951..a1ef04354dd 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -249,7 +249,7 @@ extern bool StartReadBuffers(ReadBuffersOperation *operation,
 							 BlockNumber blockNum,
 							 int *nblocks,
 							 int flags);
-extern void WaitReadBuffers(ReadBuffersOperation *operation);
+extern bool WaitReadBuffers(ReadBuffersOperation *operation);
 
 extern void ReleaseBuffer(Buffer buffer);
 extern void UnlockReleaseBuffer(Buffer buffer);
-- 
2.53.0

From 874af056687bb4a21f4b135c8fcb88a28de070b9 Mon Sep 17 00:00:00 2001
From: test <test>
Date: Thu, 12 Mar 2026 02:04:07 +0100
Subject: [PATCH v2 2/2] explain: show prefetch stats in EXPLAIN (ANALYZE,
 VERBOSE)

This adds details about AIO / prefetch for a number of executor nodes
using the ReadStream, notably:

- SeqScan
- BitmapHeapScan

The statistics are tracked by the ReadStream, and then propagated up
through the table AM interface.

The ReadStream tracks the statistics unconditionally, i.e. even outside
EXPLAIN ANALYZE etc. The cost is trivial (a handful of integer counters),
so it's not worth gating this behind a flag.

The TAM gets one new callback, "scan_stats", to collect stats from the
scan (which fetches tuples from the TAM). There is also a new struct
TableScanStatsData/TableScanStats to separate the statistics from the
actual TAM implementation.

This required improving SeqScan to properly set up shared instrumentation
between the leader and workers, collect it at the end of a scan, etc.
---
 doc/src/sgml/ref/explain.sgml             |  12 +
 src/backend/access/heap/heapam.c          |  32 +++
 src/backend/access/heap/heapam_handler.c  |   1 +
 src/backend/commands/explain.c            | 266 +++++++++++++++++++++-
 src/backend/commands/explain_state.c      |  10 +
 src/backend/executor/execParallel.c       |   3 +
 src/backend/executor/nodeBitmapHeapscan.c |  16 ++
 src/backend/executor/nodeSeqscan.c        | 123 +++++++++-
 src/backend/storage/aio/read_stream.c     |  64 +++++-
 src/include/access/heapam.h               |   1 +
 src/include/access/relscan.h              |  27 +++
 src/include/access/tableam.h              |  17 ++
 src/include/commands/explain_state.h      |   1 +
 src/include/executor/instrument_node.h    |  45 +++-
 src/include/executor/nodeSeqscan.h        |   1 +
 src/include/nodes/execnodes.h             |   2 +
 src/include/storage/read_stream.h         |   3 +
 17 files changed, 617 insertions(+), 7 deletions(-)

diff --git a/doc/src/sgml/ref/explain.sgml b/doc/src/sgml/ref/explain.sgml
index 7dee77fd366..9da16c77d73 100644
--- a/doc/src/sgml/ref/explain.sgml
+++ b/doc/src/sgml/ref/explain.sgml
@@ -46,6 +46,7 @@ EXPLAIN [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] <rep
     TIMING [ <replaceable class="parameter">boolean</replaceable> ]
     SUMMARY [ <replaceable class="parameter">boolean</replaceable> ]
     MEMORY [ <replaceable class="parameter">boolean</replaceable> ]
+    IO [ <replaceable class="parameter">boolean</replaceable> ]
     FORMAT { TEXT | XML | JSON | YAML }
 </synopsis>
  </refsynopsisdiv>
@@ -295,6 +296,17 @@ ROLLBACK;
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>IO</literal></term>
+    <listitem>
+     <para>
+      Include information on I/O performed by each node.
+      This parameter may only be used when <literal>ANALYZE</literal> is also
+      enabled.  It defaults to <literal>FALSE</literal>.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>FORMAT</literal></term>
     <listitem>
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index e5bd062de77..b13f843cfad 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1416,6 +1416,38 @@ heap_endscan(TableScanDesc sscan)
 	pfree(scan);
 }
 
+/*
+ * heap_scan_stats
+ *		return stats collected by the read stream
+ *
+ * Returns NULL if the scan is not using a read stream.
+ */
+TableScanStats
+heap_scan_stats(TableScanDesc sscan)
+{
+	HeapScanDesc scan = (HeapScanDesc) sscan;
+	ReadStreamInstrumentation stats;
+	TableScanStats res;
+
+	if (!scan->rs_read_stream)
+		return NULL;
+
+	stats = read_stream_prefetch_stats(scan->rs_read_stream);
+
+	res = palloc0(sizeof(TableScanStatsData));
+
+	res->prefetch_count = stats.prefetch_count;
+	res->distance_sum = stats.distance_sum;
+	res->distance_max = stats.distance_max;
+	res->distance_capacity = stats.distance_capacity;
+	res->stall_count = stats.stall_count;
+	res->io_count = stats.io_count;
+	res->io_nblocks = stats.io_nblocks;
+	res->io_in_progress = stats.io_in_progress;
+
+	return res;
+}
+
 HeapTuple
 heap_getnext(TableScanDesc sscan, ScanDirection direction)
 {
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 253a735b6c1..51ac2164afd 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2650,6 +2650,7 @@ static const TableAmRoutine heapam_methods = {
 
 	.scan_set_tidrange = heap_set_tidrange,
 	.scan_getnextslot_tidrange = heap_getnextslot_tidrange,
+	.scan_stats = heap_scan_stats,
 
 	.parallelscan_estimate = table_block_parallelscan_estimate,
 	.parallelscan_initialize = table_block_parallelscan_initialize,
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 296ea8a1ed2..ad86fb752ca 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -13,6 +13,8 @@
  */
 #include "postgres.h"
 
+#include "access/genam.h"
+#include "access/tableam.h"
 #include "access/xact.h"
 #include "catalog/pg_type.h"
 #include "commands/createas.h"
@@ -139,6 +141,11 @@ static void show_hashagg_info(AggState *aggstate, ExplainState *es);
 static void show_indexsearches_info(PlanState *planstate, ExplainState *es);
 static void show_tidbitmap_info(BitmapHeapScanState *planstate,
 								ExplainState *es);
+static void show_scan_io_info(ScanState *planstate,
+							  ExplainState *es);
+static void show_worker_io_info(PlanState *planstate,
+								ExplainState *es,
+								int worker);
 static void show_instrumentation_count(const char *qlabel, int which,
 									   PlanState *planstate, ExplainState *es);
 static void show_foreignscan_info(ForeignScanState *fsstate, ExplainState *es);
@@ -2009,6 +2016,7 @@ ExplainNode(PlanState *planstate, List *ancestors,
 				show_instrumentation_count("Rows Removed by Filter", 1,
 										   planstate, es);
 			show_tidbitmap_info((BitmapHeapScanState *) planstate, es);
+			show_scan_io_info((ScanState *) planstate, es);
 			break;
 		case T_SampleScan:
 			show_tablesample(((SampleScan *) plan)->tablesample,
@@ -2027,6 +2035,7 @@ ExplainNode(PlanState *planstate, List *ancestors,
 										   planstate, es);
 			if (IsA(plan, CteScan))
 				show_ctescan_info(castNode(CteScanState, planstate), es);
+			show_scan_io_info((ScanState *) planstate, es);
 			break;
 		case T_Gather:
 			{
@@ -2297,8 +2306,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
 	if (es->wal && planstate->instrument)
 		show_wal_usage(es, &planstate->instrument->walusage);
 
-	/* Prepare per-worker buffer/WAL usage */
-	if (es->workers_state && (es->buffers || es->wal) && es->verbose)
+	/* Prepare per-worker buffer/WAL/IO usage */
+	if (es->workers_state && (es->buffers || es->wal || es->io) && es->verbose)
 	{
 		WorkerInstrumentation *w = planstate->worker_instrument;
 
@@ -2315,6 +2324,10 @@ ExplainNode(PlanState *planstate, List *ancestors,
 				show_buffer_usage(es, &instrument->bufusage);
 			if (es->wal)
 				show_wal_usage(es, &instrument->walusage);
+
+			/* show prefetch info for the given worker */
+			show_worker_io_info(planstate, es, n);
+
 			ExplainCloseWorker(n, es);
 		}
 	}
@@ -3985,6 +3998,255 @@ show_tidbitmap_info(BitmapHeapScanState *planstate, ExplainState *es)
 	}
 }
 
+/*
+ * show_scan_io_info
+ *		show info about prefetching for a seq/bitmap scan
+ *
+ * Shows summary of stats for leader and workers (if any).
+ */
+static void
+show_scan_io_info(ScanState *planstate, ExplainState *es)
+{
+	Plan	   *plan = planstate->ps.plan;
+	TableScanStats	leader_stats;
+	TableScanStatsData	stats;
+
+	if (!es->io)
+		return;
+
+	/* scan not started, no prefetch stats */
+	if (!(planstate && planstate->ss_currentScanDesc))
+		return;
+
+	/* collect prefetch statistics from the read stream */
+	leader_stats = table_scan_stats(planstate->ss_currentScanDesc);
+
+	if (leader_stats)
+	{
+		memcpy(&stats, leader_stats, sizeof(TableScanStatsData));
+	}
+	else
+	{
+		memset(&stats, 0, sizeof(TableScanStatsData));
+	}
+
+	/* Initialize counters with stats from the local process first */
+	switch (nodeTag(plan))
+	{
+		case T_SeqScan:
+			{
+				SharedSeqScanInstrumentation *sinstrument
+					= ((SeqScanState *) planstate)->sinstrument;
+
+				/* get the sum of the counters set within each and every process */
+				if (sinstrument)
+				{
+					for (int i = 0; i < sinstrument->num_workers; ++i)
+					{
+						SeqScanInstrumentation *winstrument = &sinstrument->sinstrument[i];
+
+						stats.prefetch_count += winstrument->stream.prefetch_count;
+						stats.distance_sum += winstrument->stream.distance_sum;
+						if (winstrument->stream.distance_max > stats.distance_max)
+							stats.distance_max = winstrument->stream.distance_max;
+						if (winstrument->stream.distance_capacity > stats.distance_capacity)
+							stats.distance_capacity = winstrument->stream.distance_capacity;
+						stats.stall_count += winstrument->stream.stall_count;
+						stats.io_count += winstrument->stream.io_count;
+						stats.io_nblocks += winstrument->stream.io_nblocks;
+						stats.io_in_progress += winstrument->stream.io_in_progress;
+					}
+				}
+
+				break;
+			}
+		case T_BitmapHeapScan:
+			{
+				SharedBitmapHeapInstrumentation *sinstrument
+					= ((BitmapHeapScanState *) planstate)->sinstrument;
+
+				/* get the sum of the counters set within each and every process */
+				if (sinstrument)
+				{
+					for (int i = 0; i < sinstrument->num_workers; ++i)
+					{
+						BitmapHeapScanInstrumentation *winstrument = &sinstrument->sinstrument[i];
+
+						stats.prefetch_count += winstrument->stream.prefetch_count;
+						stats.distance_sum += winstrument->stream.distance_sum;
+						if (winstrument->stream.distance_max > stats.distance_max)
+							stats.distance_max = winstrument->stream.distance_max;
+						if (winstrument->stream.distance_capacity > stats.distance_capacity)
+							stats.distance_capacity = winstrument->stream.distance_capacity;
+						stats.stall_count += winstrument->stream.stall_count;
+						stats.io_count += winstrument->stream.io_count;
+						stats.io_nblocks += winstrument->stream.io_nblocks;
+						stats.io_in_progress += winstrument->stream.io_in_progress;
+					}
+				}
+
+				break;
+			}
+		default:
+			/* ignore other plans */
+			return;
+	}
+
+	/* don't print anything without prefetching */
+	if (stats.prefetch_count > 0)
+	{
+		if (es->format == EXPLAIN_FORMAT_TEXT)
+		{
+			/* prefetch distance info */
+			ExplainIndentText(es);
+			appendStringInfo(es->str, "Prefetch: avg=%.3f max=%d capacity=%d",
+							 (stats.distance_sum * 1.0 / stats.prefetch_count),
+							 stats.distance_max,
+							 stats.distance_capacity);
+			appendStringInfoChar(es->str, '\n');
+
+			/* prefetch I/O info (only if there were actual I/Os) */
+			if (stats.stall_count > 0 || stats.io_count > 0)
+			{
+				ExplainIndentText(es);
+				appendStringInfo(es->str, "I/O: stalls=%" PRIu64,
+								 stats.stall_count);
+
+				if (stats.io_count > 0)
+				{
+					appendStringInfo(es->str, " size=%.3f inprogress=%.3f",
+									 (stats.io_nblocks * 1.0 / stats.io_count),
+									 (stats.io_in_progress * 1.0 / stats.io_count));
+				}
+
+				appendStringInfoChar(es->str, '\n');
+			}
+		}
+		else
+		{
+			ExplainOpenGroup("Prefetch", "I/O", true, es);
+
+			ExplainPropertyFloat("Average Distance", NULL,
+								 (stats.distance_sum * 1.0 / stats.prefetch_count), 3, es);
+			ExplainPropertyInteger("Max Distance", NULL,
+								   stats.distance_max, es);
+			ExplainPropertyInteger("Capacity", NULL,
+								   stats.distance_capacity, es);
+			ExplainPropertyUInteger("Stalls", NULL,
+									stats.stall_count, es);
+
+			if (stats.io_count > 0)
+			{
+				ExplainPropertyFloat("Average IO Size", NULL,
+									 (stats.io_nblocks * 1.0 / stats.io_count), 3, es);
+				ExplainPropertyFloat("Average IOs In Progress", NULL,
+									 (stats.io_in_progress * 1.0 / stats.io_count), 3, es);
+			}
+
+			ExplainCloseGroup("Prefetch", "I/O", true, es);
+		}
+	}
+}
+
+/*
+ * show_worker_io_info
+ *		show info about prefetching for a single worker
+ *
+ * Shows prefetching stats for a parallel scan worker.
+ */
+static void
+show_worker_io_info(PlanState *planstate, ExplainState *es, int worker)
+{
+	Plan	   *plan = planstate->plan;
+	ReadStreamInstrumentation *stats = NULL;
+
+	if (!es->io)
+		return;
+
+	/* get instrumentation for the given worker */
+	switch (nodeTag(plan))
+	{
+		case T_BitmapHeapScan:
+			{
+				BitmapHeapScanState *state = ((BitmapHeapScanState *) planstate);
+				SharedBitmapHeapInstrumentation *sinstrument = state->sinstrument;
+				BitmapHeapScanInstrumentation *instrument = &sinstrument->sinstrument[worker];
+
+				stats = &instrument->stream;
+
+				break;
+			}
+		case T_SeqScan:
+			{
+				SeqScanState *state = ((SeqScanState *) planstate);
+				SharedSeqScanInstrumentation *sinstrument = state->sinstrument;
+				SeqScanInstrumentation *instrument = &sinstrument->sinstrument[worker];
+
+				stats = &instrument->stream;
+
+				break;
+			}
+		default:
+			/* ignore other plans */
+			return;
+	}
+
+	/* don't print stats if there's nothing to report */
+	if (stats->prefetch_count > 0)
+	{
+		if (es->format == EXPLAIN_FORMAT_TEXT)
+		{
+			/* prefetch distance info */
+			ExplainIndentText(es);
+			appendStringInfo(es->str, "Prefetch: avg=%.3f max=%d capacity=%d",
+							 (stats->distance_sum * 1.0 / stats->prefetch_count),
+							 stats->distance_max,
+							 stats->distance_capacity);
+			appendStringInfoChar(es->str, '\n');
+
+			/* prefetch I/O info (only if there were actual I/Os) */
+			if (stats->stall_count > 0 || stats->io_count > 0)
+			{
+				ExplainIndentText(es);
+				appendStringInfo(es->str, "I/O: stalls=%" PRIu64,
+								 stats->stall_count);
+
+				if (stats->io_count > 0)
+				{
+					appendStringInfo(es->str, " size=%.3f inprogress=%.3f",
+									 (stats->io_nblocks * 1.0 / stats->io_count),
+									 (stats->io_in_progress * 1.0 / stats->io_count));
+				}
+
+				appendStringInfoChar(es->str, '\n');
+			}
+		}
+		else
+		{
+			ExplainOpenGroup("Prefetch", "I/O", true, es);
+
+			ExplainPropertyFloat("Average Distance", NULL,
+								 (stats->distance_sum * 1.0 / stats->prefetch_count), 3, es);
+			ExplainPropertyInteger("Max Distance", NULL,
+								   stats->distance_max, es);
+			ExplainPropertyInteger("Capacity", NULL,
+								   stats->distance_capacity, es);
+			ExplainPropertyUInteger("Stalls", NULL,
+									stats->stall_count, es);
+
+			if (stats->io_count > 0)
+			{
+				ExplainPropertyFloat("Average IO Size", NULL,
+									 (stats->io_nblocks * 1.0 / stats->io_count), 3, es);
+				ExplainPropertyFloat("Average IOs In Progress", NULL,
+									 (stats->io_in_progress * 1.0 / stats->io_count), 3, es);
+			}
+
+			ExplainCloseGroup("Prefetch", "I/O", true, es);
+		}
+	}
+}
+
 /*
  * If it's EXPLAIN ANALYZE, show instrumentation information for a plan node
  *
diff --git a/src/backend/commands/explain_state.c b/src/backend/commands/explain_state.c
index 77f59b8e500..f5cbdd21aaa 100644
--- a/src/backend/commands/explain_state.c
+++ b/src/backend/commands/explain_state.c
@@ -159,6 +159,10 @@ ParseExplainOptionList(ExplainState *es, List *options, ParseState *pstate)
 								"EXPLAIN", opt->defname, p),
 						 parser_errposition(pstate, opt->location)));
 		}
+		else if (strcmp(opt->defname, "io") == 0)
+		{
+			es->io = defGetBoolean(opt);
+		}
 		else if (!ApplyExtensionExplainOption(es, opt, pstate))
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -185,6 +189,12 @@ ParseExplainOptionList(ExplainState *es, List *options, ParseState *pstate)
 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 				 errmsg("EXPLAIN option %s requires ANALYZE", "TIMING")));
 
+	/* check that IO is used with EXPLAIN ANALYZE */
+	if (es->io && !es->analyze)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("EXPLAIN option %s requires ANALYZE", "IO")));
+
 	/* check that serialize is used with EXPLAIN ANALYZE */
 	if (es->serialize != EXPLAIN_SERIALIZE_NONE && !es->analyze)
 		ereport(ERROR,
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index ac84af294c9..ffc708ed6be 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -1119,6 +1119,9 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
 		case T_BitmapHeapScanState:
 			ExecBitmapHeapRetrieveInstrumentation((BitmapHeapScanState *) planstate);
 			break;
+		case T_SeqScanState:
+			ExecSeqScanRetrieveInstrumentation((SeqScanState *) planstate);
+			break;
 		default:
 			break;
 	}
diff --git a/src/backend/executor/nodeBitmapHeapscan.c b/src/backend/executor/nodeBitmapHeapscan.c
index 7cf8d23c742..bf9af7596ce 100644
--- a/src/backend/executor/nodeBitmapHeapscan.c
+++ b/src/backend/executor/nodeBitmapHeapscan.c
@@ -315,6 +315,7 @@ ExecEndBitmapHeapScan(BitmapHeapScanState *node)
 	if (node->sinstrument != NULL && IsParallelWorker())
 	{
 		BitmapHeapScanInstrumentation *si;
+		TableScanStats stats;
 
 		Assert(ParallelWorkerNumber < node->sinstrument->num_workers);
 		si = &node->sinstrument->sinstrument[ParallelWorkerNumber];
@@ -328,6 +329,21 @@ ExecEndBitmapHeapScan(BitmapHeapScanState *node)
 		 */
 		si->exact_pages += node->stats.exact_pages;
 		si->lossy_pages += node->stats.lossy_pages;
+
+		/* collect prefetch info for this process from the read_stream */
+		if ((stats = table_scan_stats(node->ss.ss_currentScanDesc)) != NULL)
+		{
+			si->stream.prefetch_count += stats->prefetch_count;
+			si->stream.distance_sum += stats->distance_sum;
+			if (stats->distance_max > si->stream.distance_max)
+				si->stream.distance_max = stats->distance_max;
+			if (stats->distance_capacity > si->stream.distance_capacity)
+				si->stream.distance_capacity = stats->distance_capacity;
+			si->stream.stall_count += stats->stall_count;
+			si->stream.io_count += stats->io_count;
+			si->stream.io_nblocks += stats->io_nblocks;
+			si->stream.io_in_progress += stats->io_in_progress;
+		}
 	}
 
 	/*
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 8f219f60a93..7992946a5be 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -295,6 +295,43 @@ ExecEndSeqScan(SeqScanState *node)
 {
 	TableScanDesc scanDesc;
 
+	/*
+	 * When ending a parallel worker, copy the statistics gathered by the
+	 * worker back into shared memory so that it can be picked up by the main
+	 * process to report in EXPLAIN ANALYZE.
+	 */
+	if (node->sinstrument != NULL && IsParallelWorker())
+	{
+		SeqScanInstrumentation *si;
+		TableScanStats stats;
+
+		Assert(ParallelWorkerNumber < node->sinstrument->num_workers);
+		si = &node->sinstrument->sinstrument[ParallelWorkerNumber];
+
+		/*
+		 * Here we accumulate the stats rather than performing memcpy on
+		 * node->stats into si.  When a Gather/GatherMerge node finishes it
+		 * will perform planner shutdown on the workers.  On rescan it will
+		 * spin up new workers which will have a new SeqScanState and
+		 * zeroed stats.
+		 */
+
+		/* collect prefetch info for this process from the read_stream */
+		if ((stats = table_scan_stats(node->ss.ss_currentScanDesc)) != NULL)
+		{
+			si->stream.prefetch_count += stats->prefetch_count;
+			si->stream.distance_sum += stats->distance_sum;
+			if (stats->distance_max > si->stream.distance_max)
+				si->stream.distance_max = stats->distance_max;
+			if (stats->distance_capacity > si->stream.distance_capacity)
+				si->stream.distance_capacity = stats->distance_capacity;
+			si->stream.stall_count += stats->stall_count;
+			si->stream.io_count += stats->io_count;
+			si->stream.io_nblocks += stats->io_nblocks;
+			si->stream.io_in_progress += stats->io_in_progress;
+		}
+	}
+
 	/*
 	 * get information from node
 	 */
@@ -349,10 +386,23 @@ ExecSeqScanEstimate(SeqScanState *node,
 					ParallelContext *pcxt)
 {
 	EState	   *estate = node->ss.ps.state;
+	Size		size;
 
-	node->pscan_len = table_parallelscan_estimate(node->ss.ss_currentRelation,
+	size = table_parallelscan_estimate(node->ss.ss_currentRelation,
 												  estate->es_snapshot);
-	shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
+	node->pscan_len = size;
+
+	/* make sure the instrumentation is properly aligned */
+	size = MAXALIGN(size);
+
+	/* account for instrumentation, if required */
+	if (node->ss.ps.instrument && pcxt->nworkers > 0)
+	{
+		size = add_size(size, offsetof(SharedSeqScanInstrumentation, sinstrument));
+		size = add_size(size, mul_size(pcxt->nworkers, sizeof(SeqScanInstrumentation)));
+	}
+
+	shm_toc_estimate_chunk(&pcxt->estimator, size);
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
 }
 
@@ -368,14 +418,42 @@ ExecSeqScanInitializeDSM(SeqScanState *node,
 {
 	EState	   *estate = node->ss.ps.state;
 	ParallelTableScanDesc pscan;
+	SharedSeqScanInstrumentation *sinstrument = NULL;
+	Size		size;
+	char	   *ptr;
+
+	/* Recalculate the size. This needs to match ExecSeqScanEstimate. */
+	size = MAXALIGN(node->pscan_len);
+	if (node->ss.ps.instrument && pcxt->nworkers > 0)
+	{
+		size = add_size(size, offsetof(SharedSeqScanInstrumentation, sinstrument));
+		size = add_size(size, mul_size(pcxt->nworkers, sizeof(SeqScanInstrumentation)));
+	}
 
-	pscan = shm_toc_allocate(pcxt->toc, node->pscan_len);
+	pscan = shm_toc_allocate(pcxt->toc, size);
 	table_parallelscan_initialize(node->ss.ss_currentRelation,
 								  pscan,
 								  estate->es_snapshot);
 	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
 	node->ss.ss_currentScanDesc =
 		table_beginscan_parallel(node->ss.ss_currentRelation, pscan);
+
+	/* initialize the shared instrumentation (with correct alignment) */
+	ptr = (char *) pscan;
+	ptr += MAXALIGN(node->pscan_len);
+	if (node->ss.ps.instrument && pcxt->nworkers > 0)
+		sinstrument = (SharedSeqScanInstrumentation *) ptr;
+
+	if (sinstrument)
+	{
+		sinstrument->num_workers = pcxt->nworkers;
+
+		/* ensure any unfilled slots will contain zeroes */
+		memset(sinstrument->sinstrument, 0,
+			   pcxt->nworkers * sizeof(SeqScanInstrumentation));
+	}
+
+	node->sinstrument = sinstrument;
 }
 
 /* ----------------------------------------------------------------
@@ -404,9 +482,48 @@ void
 ExecSeqScanInitializeWorker(SeqScanState *node,
 							ParallelWorkerContext *pwcxt)
 {
+	EState	   *estate = node->ss.ps.state;
 	ParallelTableScanDesc pscan;
+	char	   *ptr;
+	Size		size;
 
 	pscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
 	node->ss.ss_currentScanDesc =
 		table_beginscan_parallel(node->ss.ss_currentRelation, pscan);
+
+	/*
+	 * Workers don't get the pscan_len value in scan descriptor, so use the
+	 * TAM callback again. The result has to match the earlier result in
+	 * ExecSeqScanEstimate.
+	 */
+	size = table_parallelscan_estimate(node->ss.ss_currentRelation,
+									   estate->es_snapshot);
+
+	ptr = (char *) pscan;
+	ptr += MAXALIGN(size);
+
+	if (node->ss.ps.instrument)
+		node->sinstrument = (SharedSeqScanInstrumentation *) ptr;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecSeqScanRetrieveInstrumentation
+ *
+ *		Transfer seq scan statistics from DSM to private memory.
+ * ----------------------------------------------------------------
+ */
+void
+ExecSeqScanRetrieveInstrumentation(SeqScanState *node)
+{
+	SharedSeqScanInstrumentation *sinstrument = node->sinstrument;
+	Size		size;
+
+	if (sinstrument == NULL)
+		return;
+
+	size = offsetof(SharedSeqScanInstrumentation, sinstrument)
+		+ sinstrument->num_workers * sizeof(SeqScanInstrumentation);
+
+	node->sinstrument = palloc(size);
+	memcpy(node->sinstrument, sinstrument, size);
 }
diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index cd54c1a74ac..f560047afd6 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -72,6 +72,7 @@
 #include "postgres.h"
 
 #include "miscadmin.h"
+#include "executor/instrument_node.h"
 #include "storage/aio.h"
 #include "storage/fd.h"
 #include "storage/smgr.h"
@@ -107,6 +108,9 @@ struct ReadStream
 	bool		advice_enabled;
 	bool		temporary;
 
+	/* stats counters */
+	ReadStreamInstrumentation stats;
+
 	/*
 	 * One-block buffer to support 'ungetting' a block number, to resolve flow
 	 * control problems when I/Os are split.
@@ -172,6 +176,38 @@ block_range_read_stream_cb(ReadStream *stream,
 	return InvalidBlockNumber;
 }
 
+/*
+ * read_stream_update_stats_prefetch
+ *		update read_stream stats with current pinned buffer depth
+ *
+ * Called once per buffer returned to the consumer in read_stream_next_buffer().
+ * Records the number of pinned buffers at that moment, so we can compute the
+ * average look-ahead depth.
+ */
+static inline void
+read_stream_update_stats_prefetch(ReadStream *stream)
+{
+	stream->stats.prefetch_count++;
+	stream->stats.distance_sum += stream->pinned_buffers;
+	if (stream->pinned_buffers > stream->stats.distance_max)
+		stream->stats.distance_max = stream->pinned_buffers;
+}
+
+/*
+ * read_stream_update_stats_io
+ *		update read_stream stats about size of I/O requests
+ *
+ * We count the number of I/O requests, size of requests (counted in blocks)
+ * and number of in-progress I/Os.
+ */
+static inline void
+read_stream_update_stats_io(ReadStream *stream, int nblocks, int in_progress)
+{
+	stream->stats.io_count++;
+	stream->stats.io_nblocks += nblocks;
+	stream->stats.io_in_progress += in_progress;
+}
+
 /*
  * Ask the callback which block it would like us to read next, with a one block
  * buffer in front to allow read_stream_unget_block() to work.
@@ -380,6 +416,9 @@ read_stream_start_pending_read(ReadStream *stream)
 		Assert(stream->ios_in_progress < stream->max_ios);
 		stream->ios_in_progress++;
 		stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
+
+		/* update I/O stats */
+		read_stream_update_stats_io(stream, nblocks, stream->ios_in_progress);
 	}
 
 	/*
@@ -703,6 +742,10 @@ read_stream_begin_impl(int flags,
 	stream->seq_until_processed = InvalidBlockNumber;
 	stream->temporary = SmgrIsTemp(smgr);
 
+	/* zero the stats, then set capacity */
+	memset(&stream->stats, 0, sizeof(ReadStreamInstrumentation));
+	stream->stats.distance_capacity = max_pinned_buffers;
+
 	/*
 	 * Skip the initial ramp-up phase if the caller says we're going to be
 	 * reading the whole relation.  This way we start out assuming we'll be
@@ -851,6 +894,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 										flags)))
 			{
 				/* Fast return. */
+				read_stream_update_stats_prefetch(stream);
 				return buffer;
 			}
 
@@ -860,6 +904,9 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 			stream->ios_in_progress = 1;
 			stream->ios[0].buffer_index = oldest_buffer_index;
 			stream->seq_blocknum = next_blocknum + 1;
+
+			/* update I/O stats */
+			read_stream_update_stats_io(stream, 1, stream->ios_in_progress);
 		}
 		else
 		{
@@ -871,6 +918,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		}
 
 		stream->fast_path = false;
+		read_stream_update_stats_prefetch(stream);
 		return buffer;
 	}
 #endif
@@ -916,12 +964,17 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 	{
 		int16		io_index = stream->oldest_io_index;
 		int32		distance;	/* wider temporary value, clamped below */
+		bool		needed_wait;
 
 		/* Sanity check that we still agree on the buffers. */
 		Assert(stream->ios[io_index].op.buffers ==
 			   &stream->buffers[oldest_buffer_index]);
 
-		WaitReadBuffers(&stream->ios[io_index].op);
+		needed_wait = WaitReadBuffers(&stream->ios[io_index].op);
+
+		/* Count it as a stall if we need to wait for IO */
+		if (needed_wait)
+			stream->stats.stall_count += 1;
 
 		Assert(stream->ios_in_progress > 0);
 		stream->ios_in_progress--;
@@ -981,6 +1034,8 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 	}
 #endif
 
+	read_stream_update_stats_prefetch(stream);
+
 	/* Pin transferred to caller. */
 	Assert(stream->pinned_buffers > 0);
 	stream->pinned_buffers--;
@@ -1118,3 +1173,10 @@ read_stream_end(ReadStream *stream)
 	read_stream_reset(stream);
 	pfree(stream);
 }
+
+/* return the prefetch stats for the read_stream */
+ReadStreamInstrumentation
+read_stream_prefetch_stats(ReadStream *stream)
+{
+	return stream->stats;
+}
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 2fdc50b865b..81dc0b7521d 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -359,6 +359,7 @@ extern void heap_prepare_pagescan(TableScanDesc sscan);
 extern void heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
 						bool allow_strat, bool allow_sync, bool allow_pagemode);
 extern void heap_endscan(TableScanDesc sscan);
+extern TableScanStats heap_scan_stats(TableScanDesc sscan);
 extern HeapTuple heap_getnext(TableScanDesc sscan, ScanDirection direction);
 extern bool heap_getnextslot(TableScanDesc sscan,
 							 ScanDirection direction, TupleTableSlot *slot);
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index ce340c076f8..cada9f28bd8 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -114,6 +114,33 @@ typedef struct ParallelBlockTableScanWorkerData
 } ParallelBlockTableScanWorkerData;
 typedef struct ParallelBlockTableScanWorkerData *ParallelBlockTableScanWorker;
 
+/*
+ * Generic prefetch stats for table scans.
+ */
+typedef struct TableScanStatsData
+{
+	/* number of buffers returned to consumer (for averaging distance) */
+	uint64		prefetch_count;
+
+	/* sum of pinned_buffers sampled at each buffer return */
+	uint64		distance_sum;
+
+	/* maximum actual pinned_buffers observed during the scan */
+	int16		distance_max;
+
+	/* maximum possible look-ahead distance (max_pinned_buffers) */
+	int16		distance_capacity;
+
+	/* number of stalled reads (waiting for I/O) */
+	uint64		stall_count;
+
+	/* I/O stats */
+	uint64		io_count;		/* number of I/Os */
+	uint64		io_nblocks;		/* sum of blocks for all I/Os */
+	uint64		io_in_progress;	/* sum of in-progress I/Os */
+} TableScanStatsData;
+typedef struct TableScanStatsData *TableScanStats;
+
 /*
  * Base class for fetches from a table via an index. This is the base-class
  * for such scans, which needs to be embedded in the respective struct for
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 06084752245..ab84b76443c 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -380,6 +380,11 @@ typedef struct TableAmRoutine
 											  ScanDirection direction,
 											  TupleTableSlot *slot);
 
+	/*
+	 * Collect statistics about a table scan (optional callback, may be NULL).
+	 */
+	TableScanStats		(*scan_stats) (TableScanDesc scan);
+
 	/* ------------------------------------------------------------------------
 	 * Parallel table scan related functions.
 	 * ------------------------------------------------------------------------
@@ -1006,6 +1011,18 @@ table_endscan(TableScanDesc scan)
 	scan->rs_rd->rd_tableam->scan_end(scan);
 }
 
+/*
+ * Fetch statistics about table scan.
+ */
+static inline TableScanStats
+table_scan_stats(TableScanDesc scan)
+{
+	if (scan->rs_rd->rd_tableam->scan_stats)
+		return scan->rs_rd->rd_tableam->scan_stats(scan);
+
+	return NULL;
+}
+
 /*
  * Restart a relation scan.
  */
diff --git a/src/include/commands/explain_state.h b/src/include/commands/explain_state.h
index 5a48bc6fbb1..b412f00f70c 100644
--- a/src/include/commands/explain_state.h
+++ b/src/include/commands/explain_state.h
@@ -55,6 +55,7 @@ typedef struct ExplainState
 	bool		summary;		/* print total planning and execution timing */
 	bool		memory;			/* print planner's memory usage information */
 	bool		settings;		/* print modified settings */
+	bool		io;				/* print info about IO (prefetch, ...) */
 	bool		generic;		/* generate a generic plan */
 	ExplainSerializeOption serialize;	/* serialize the query's output? */
 	ExplainFormat format;		/* output format */
diff --git a/src/include/executor/instrument_node.h b/src/include/executor/instrument_node.h
index 8847d7f94fa..d94dbbc917d 100644
--- a/src/include/executor/instrument_node.h
+++ b/src/include/executor/instrument_node.h
@@ -41,9 +41,51 @@ typedef struct SharedAggInfo
 
 
 /* ---------------------
- *	Instrumentation information for indexscans (amgettuple and amgetbitmap)
+ *	Instrumentation information about read streams
  * ---------------------
  */
+typedef struct ReadStreamInstrumentation
+{
+	/* number of buffers returned to consumer (for averaging distance) */
+	uint64		prefetch_count;
+
+	/* sum of pinned_buffers sampled at each buffer return */
+	uint64		distance_sum;
+
+	/* maximum actual pinned_buffers observed during the scan */
+	int16		distance_max;
+
+	/* maximum possible look-ahead distance (max_pinned_buffers) */
+	int16		distance_capacity;
+
+	/* number of stalled reads (waiting for I/O) */
+	uint64		stall_count;
+
+	/* I/O stats */
+	uint64		io_count;		/* number of I/Os */
+	uint64		io_nblocks;		/* sum of blocks for all I/Os */
+	uint64		io_in_progress;	/* sum of in-progress I/Os */
+} ReadStreamInstrumentation;
+
+
+/* ---------------------
+ *	Instrumentation information for sequential scans
+ * ---------------------
+ */
+typedef struct SeqScanInstrumentation
+{
+	ReadStreamInstrumentation	stream;
+} SeqScanInstrumentation;
+
+/*
+ * Shared memory container for per-worker information
+ */
+typedef struct SharedSeqScanInstrumentation
+{
+	int			num_workers;
+	SeqScanInstrumentation sinstrument[FLEXIBLE_ARRAY_MEMBER];
+} SharedSeqScanInstrumentation;
+
 typedef struct IndexScanInstrumentation
 {
 	/* Index search count (incremented with pgstat_count_index_scan call) */
@@ -71,6 +113,7 @@ typedef struct BitmapHeapScanInstrumentation
 {
 	uint64		exact_pages;
 	uint64		lossy_pages;
+	ReadStreamInstrumentation	stream;
 } BitmapHeapScanInstrumentation;
 
 /*
diff --git a/src/include/executor/nodeSeqscan.h b/src/include/executor/nodeSeqscan.h
index 7a1490596fb..e2122bfffe3 100644
--- a/src/include/executor/nodeSeqscan.h
+++ b/src/include/executor/nodeSeqscan.h
@@ -27,5 +27,6 @@ extern void ExecSeqScanInitializeDSM(SeqScanState *node, ParallelContext *pcxt);
 extern void ExecSeqScanReInitializeDSM(SeqScanState *node, ParallelContext *pcxt);
 extern void ExecSeqScanInitializeWorker(SeqScanState *node,
 										ParallelWorkerContext *pwcxt);
+extern void ExecSeqScanRetrieveInstrumentation(SeqScanState *node);
 
 #endif							/* NODESEQSCAN_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 0716c5a9aed..96a8a75031f 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1644,6 +1644,8 @@ typedef struct SeqScanState
 {
 	ScanState	ss;				/* its first field is NodeTag */
 	Size		pscan_len;		/* size of parallel heap scan descriptor */
+	SeqScanInstrumentation	stats;
+	SharedSeqScanInstrumentation *sinstrument;
 } SeqScanState;
 
 /* ----------------
diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h
index c9359b29b0f..ec396db5369 100644
--- a/src/include/storage/read_stream.h
+++ b/src/include/storage/read_stream.h
@@ -65,6 +65,7 @@
 
 struct ReadStream;
 typedef struct ReadStream ReadStream;
+typedef struct ReadStreamInstrumentation ReadStreamInstrumentation;
 
 /* for block_range_read_stream_cb */
 typedef struct BlockRangeReadStreamPrivate
@@ -104,4 +105,6 @@ extern void read_stream_resume(ReadStream *stream);
 extern void read_stream_reset(ReadStream *stream);
 extern void read_stream_end(ReadStream *stream);
 
+extern ReadStreamInstrumentation read_stream_prefetch_stats(ReadStream *stream);
+
 #endif							/* READ_STREAM_H */
-- 
2.53.0
