Hi,

On 2026-03-17 18:51:15 +0100, Tomas Vondra wrote:
> Subject: [PATCH v2 2/2] explain: show prefetch stats in EXPLAIN (ANALYZE,
>  VERBOSE)
>
> This adds details about AIO / prefetch for a number of executor nodes
> using the ReadStream, notably:
>
> - SeqScan
> - BitmapHeapScan
>
> The statistics is tracked by the ReadStream, and then propagated up
> through the table AM interface.
>
> The ReadStream tracks the statistics unconditionally, i.e. even outside
> EXPLAIN ANALYZE etc. The amount of statistics is trivial (a handful of
> integer counters), it's not worth gating this by a flag.
>
> The TAM gets one new callback "scan_stats", to collect stats from the
> scan (which fetch tuples from the TAM). There is also a new struct
> TableScanStatsData/TableScanStats to separate the statistics from the
> actual TAM implementation.

It's not really clear to me that this needs to go through the tableam
interface.  All the accesses already happen within AM code, so it seems we
could just populate fields in TableScanDesc if TableScanDesc->rs_flags
indicates that that is desired.

That seems like it might be nicer, because

a) there could be multiple read stream instances within a scan
b) we will need one such callback for each different type of scan - so we
   could just as well do that within the scan datastructure


Showing any such detail in explain will always pierce the abstraction layers
a bit, so I'm not particularly concerned about having optionally fillable
information in *ScanDesc that some AM might not want in that shape. It seems
unlikely that many other architectural decisions would be based on that, so
we can just evolve this in the future.


> +             case T_SeqScan:
> +                     {
> +                             SharedSeqScanInstrumentation *sinstrument
> +                                     = ((SeqScanState *) planstate)->sinstrument;
> +
> +                             /* get the sum of the counters set within each and every process */
> +                             if (sinstrument)
> +                             {
> +                                     for (int i = 0; i < sinstrument->num_workers; ++i)
> +                                     {
> +                                             SeqScanInstrumentation *winstrument = &sinstrument->sinstrument[i];
> +
> +                                             stats.prefetch_count += winstrument->stream.prefetch_count;
> +                                             stats.distance_sum += winstrument->stream.distance_sum;
> +                                             if (winstrument->stream.distance_max > stats.distance_max)
> +                                                     stats.distance_max = winstrument->stream.distance_max;
> +                                             if (winstrument->stream.distance_capacity > stats.distance_capacity)
> +                                                     stats.distance_capacity = winstrument->stream.distance_capacity;
> +                                             stats.stall_count += winstrument->stream.stall_count;
> +                                             stats.io_count += winstrument->stream.io_count;
> +                                             stats.io_nblocks += winstrument->stream.io_nblocks;
> +                                             stats.io_in_progress += winstrument->stream.io_in_progress;
> +                                     }
> +                             }
> +
> +                             break;
> +                     }
> +             case T_BitmapHeapScan:
> +                     {
> +                             SharedBitmapHeapInstrumentation *sinstrument
> +                                     = ((BitmapHeapScanState *) planstate)->sinstrument;
> +
> +                             /* get the sum of the counters set within each and every process */
> +                             if (sinstrument)
> +                             {
> +                                     for (int i = 0; i < sinstrument->num_workers; ++i)
> +                                     {
> +                                             BitmapHeapScanInstrumentation *winstrument = &sinstrument->sinstrument[i];
> +
> +                                             stats.prefetch_count += winstrument->stream.prefetch_count;
> +                                             stats.distance_sum += winstrument->stream.distance_sum;
> +                                             if (winstrument->stream.distance_max > stats.distance_max)
> +                                                     stats.distance_max = winstrument->stream.distance_max;
> +                                             if (winstrument->stream.distance_capacity > stats.distance_capacity)
> +                                                     stats.distance_capacity = winstrument->stream.distance_capacity;
> +                                             stats.stall_count += winstrument->stream.stall_count;
> +                                             stats.io_count += winstrument->stream.io_count;
> +                                             stats.io_nblocks += winstrument->stream.io_nblocks;
> +                                             stats.io_in_progress += winstrument->stream.io_in_progress;

It doesn't seem good to duplicate these fairly large blocks.  Can't that be in
a helper?





> +             if (es->format == EXPLAIN_FORMAT_TEXT)
> +             {
> +                     /* prefetch distance info */
> +                     ExplainIndentText(es);
> +                     appendStringInfo(es->str, "Prefetch: avg=%.3f max=%d capacity=%d",
> +                                                      (stats.distance_sum * 1.0 / stats.prefetch_count),
> +                                                      stats.distance_max,
> +                                                      stats.distance_capacity);
> +                     appendStringInfoChar(es->str, '\n');
> +
> +                     /* prefetch I/O info (only if there were actual I/Os) */
> +                     if (stats.stall_count > 0 || stats.io_count > 0)
> +                     {

Can there be stalls without I/O? Why check both?


> +                             ExplainIndentText(es);
> +                             appendStringInfo(es->str, "I/O: stalls=%" PRIu64,
> +                                                              stats.stall_count);
> +
> +                             if (stats.io_count > 0)
> +                             {
> +                                     appendStringInfo(es->str, " size=%.3f inprogress=%.3f",
> +                                                                      (stats.io_nblocks * 1.0 / stats.io_count),
> +                                                                      (stats.io_in_progress * 1.0 / stats.io_count));
> +                             }
> +
> +                             appendStringInfoChar(es->str, '\n');
> +                     }
> +             }
> +             else
> +             {
> +                     ExplainOpenGroup("Prefetch", "I/O", true, es);
> +
> +                     ExplainPropertyFloat("Average Distance", NULL,
> +                                                              (stats.distance_sum * 1.0 / stats.prefetch_count), 3, es);
> +                     ExplainPropertyInteger("Max Distance", NULL,
> +                                                                stats.distance_max, es);
> +                     ExplainPropertyInteger("Capacity", NULL,
> +                                                                stats.distance_capacity, es);
> +                     ExplainPropertyUInteger("Stalls", NULL,
> +                                                                     stats.stall_count, es);
> +
> +                     if (stats.io_count > 0)
> +                     {
> +                             ExplainPropertyFloat("Average IO Size", NULL,
> +                                                                      (stats.io_nblocks * 1.0 / stats.io_count), 3, es);
> +                             ExplainPropertyFloat("Average IOs In Progress", NULL,
> +                                                                      (stats.io_in_progress * 1.0 / stats.io_count), 3, es);
> +                     }
> +
> +                     ExplainCloseGroup("Prefetch", "I/O", true, es);
> +             }
> +     }
> +}
> +
> +/*
> + * show_io_worker_info
> + *           show info about prefetching for a single worker
> + *
> + * Shows prefetching stats for a parallel scan worker.
> + */
> +static void
> +show_worker_io_info(PlanState *planstate, ExplainState *es, int worker)
> +{
> +     Plan       *plan = planstate->plan;
> +     ReadStreamInstrumentation *stats = NULL;
> +
> +     if (!es->io)
> +             return;
> +
> +     /* get instrumentation for the given worker */
> +     switch (nodeTag(plan))
> +     {
> +             case T_BitmapHeapScan:
> +                     {
> +                             BitmapHeapScanState *state = ((BitmapHeapScanState *) planstate);
> +                             SharedBitmapHeapInstrumentation *sinstrument = state->sinstrument;
> +                             BitmapHeapScanInstrumentation *instrument = &sinstrument->sinstrument[worker];
> +
> +                             stats = &instrument->stream;
> +
> +                             break;
> +                     }
> +             case T_SeqScan:
> +                     {
> +                             SeqScanState *state = ((SeqScanState *) planstate);
> +                             SharedSeqScanInstrumentation *sinstrument = state->sinstrument;
> +                             SeqScanInstrumentation *instrument = &sinstrument->sinstrument[worker];
> +
> +                             stats = &instrument->stream;
> +
> +                             break;
> +                     }
> +             default:
> +                     /* ignore other plans */
> +                     return;
> +     }
> +
> +     /* don't print stats if there's nothing to report */
> +     if (stats->prefetch_count > 0)
> +     {
> +             if (es->format == EXPLAIN_FORMAT_TEXT)
> +             {
> +                     /* prefetch distance info */
> +                     ExplainIndentText(es);
> +                     appendStringInfo(es->str, "Prefetch: avg=%.3f max=%d capacity=%d",
> +                                                      (stats->distance_sum * 1.0 / stats->prefetch_count),
> +                                                      stats->distance_max,
> +                                                      stats->distance_capacity);
> +                     appendStringInfoChar(es->str, '\n');
> +
> +                     /* prefetch I/O info (only if there were actual I/Os) */
> +                     if (stats->stall_count > 0 || stats->io_count > 0)
> +                     {
> +                             ExplainIndentText(es);
> +                             appendStringInfo(es->str, "I/O: stalls=%" PRIu64,
> +                                                              stats->stall_count);
> +
> +                             if (stats->io_count > 0)
> +                             {
> +                                     appendStringInfo(es->str, " size=%.3f inprogress=%.3f",
> +                                                                      (stats->io_nblocks * 1.0 / stats->io_count),
> +                                                                      (stats->io_in_progress * 1.0 / stats->io_count));
> +                             }
> +
> +                             appendStringInfoChar(es->str, '\n');
> +                     }
> +             }
> +             else
> +             {
> +                     ExplainOpenGroup("Prefetch", "I/O", true, es);
> +
> +                     ExplainPropertyFloat("Average Distance", NULL,
> +                                                              (stats->distance_sum * 1.0 / stats->prefetch_count), 3, es);
> +                     ExplainPropertyInteger("Max Distance", NULL,
> +                                                                stats->distance_max, es);
> +                     ExplainPropertyInteger("Capacity", NULL,
> +                                                                stats->distance_capacity, es);
> +                     ExplainPropertyUInteger("Stalls", NULL,
> +                                                                     stats->stall_count, es);
> +
> +                     if (stats->io_count > 0)
> +                     {
> +                             ExplainPropertyFloat("Average IO Size", NULL,
> +                                                                      (stats->io_nblocks * 1.0 / stats->io_count), 3, es);
> +                             ExplainPropertyFloat("Average IOs In Progress", NULL,
> +                                                                      (stats->io_in_progress * 1.0 / stats->io_count), 3, es);
> +                     }
> +
> +                     ExplainCloseGroup("Prefetch", "I/O", true, es);
> +             }
> +     }
> +}

Lots of duplication between show_scan_io_info() and show_worker_io_info(). I
assume that's just because this is still a prototype?
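
A sketch of how the text-format part could be shared, assuming both callers first look up a ReadStreamInstrumentation pointer (summed or per-worker) and then hand it to one printer. show_read_stream_stats_text() and the OutBuf type are hypothetical; OutBuf only stands in for StringInfo so the example compiles on its own, and the format strings are copied from the patch.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdarg.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Same fields as the patch's ReadStreamInstrumentation. */
typedef struct ReadStreamInstrumentation
{
	uint64_t	prefetch_count;
	uint64_t	distance_sum;
	int16_t		distance_max;
	int16_t		distance_capacity;
	uint64_t	stall_count;
	uint64_t	io_count;
	uint64_t	io_nblocks;
	uint64_t	io_in_progress;
} ReadStreamInstrumentation;

/* Hypothetical stand-in for StringInfo, so this compiles on its own. */
typedef struct OutBuf
{
	char		data[256];
	size_t		len;
} OutBuf;

/* Stand-in for appendStringInfo(); truncates at the end of the buffer. */
static void
out_append(OutBuf *out, const char *fmt, ...)
{
	size_t		avail = sizeof(out->data) - out->len;
	va_list		args;
	int			n;

	va_start(args, fmt);
	n = vsnprintf(out->data + out->len, avail, fmt, args);
	va_end(args);

	if (n > 0)
		out->len += ((size_t) n < avail) ? (size_t) n : avail - 1;
}

/*
 * Hypothetical shared printer: show_scan_io_info() would pass the summed
 * stats, show_worker_io_info() the per-worker ones.  Text format only.
 */
static void
show_read_stream_stats_text(OutBuf *out, const ReadStreamInstrumentation *stats)
{
	assert(stats != NULL);

	/* nothing to report (and nothing to divide by) */
	if (stats->prefetch_count == 0)
		return;

	out_append(out, "Prefetch: avg=%.3f max=%d capacity=%d\n",
			   (double) stats->distance_sum / stats->prefetch_count,
			   stats->distance_max, stats->distance_capacity);

	if (stats->stall_count > 0 || stats->io_count > 0)
	{
		out_append(out, "I/O: stalls=%" PRIu64, stats->stall_count);
		if (stats->io_count > 0)
			out_append(out, " size=%.3f inprogress=%.3f",
					   (double) stats->io_nblocks / stats->io_count,
					   (double) stats->io_in_progress / stats->io_count);
		out_append(out, "\n");
	}
}
```

The non-text branch (ExplainOpenGroup() and the ExplainProperty*() calls) could move into the same helper in the same way, leaving each caller with only the lookup.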


> +/*
> + * read_stream_update_stats_prefetch
> + *           update read_stream stats with current pinned buffer depth

Other code in read_stream.c doesn't do this absurd thing of restating the
function name in a comment and indenting the "title" of the function. Don't do
it here.


> + *
> + * Called once per buffer returned to the consumer in read_stream_next_buffer().
> + * Records the number of pinned buffers at that moment, so we can compute the
> + * average look-ahead depth.
> + */
> +static inline void
> +read_stream_update_stats_prefetch(ReadStream *stream)
> +{
> +     stream->stats.prefetch_count++;
> +     stream->stats.distance_sum += stream->pinned_buffers;
> +     if (stream->pinned_buffers > stream->stats.distance_max)
> +             stream->stats.distance_max = stream->pinned_buffers;
> +}


> +/*
> + * read_stream_update_stats_io
> + *           update read_stream stats about size of I/O requests
> + *
> + * We count the number of I/O requests, size of requests (counted in blocks)
> + * and number of in-progress I/Os.
> + */
> +static inline void
> +read_stream_update_stats_io(ReadStream *stream, int nblocks, int in_progress)
> +{
> +     stream->stats.io_count++;
> +     stream->stats.io_nblocks += nblocks;
> +     stream->stats.io_in_progress += in_progress;
> +}

What's the point of the caller passing in stream->ios_in_progress?  Are you
expecting some callers to pass in different values? If so, why?

Seems a bit confusing to have both stream->stats.io_in_progress and
stream->ios_in_progress.  It's also not clear from the name that
stats.io_in_progress is a sum.
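
One possible shape, as a sketch: the helper reads the in-flight count from the stream itself, and the accumulated fields carry a _sum suffix so they can't be confused with stream->ios_in_progress. ReadStreamSketch, read_stream_count_io() and the field names io_nblocks_sum / ios_in_progress_sum are all hypothetical.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical cut-down stand-in for ReadStream; only the fields used here. */
typedef struct ReadStreamSketch
{
	int16_t		ios_in_progress;	/* I/Os currently in flight */
	struct
	{
		uint64_t	io_count;				/* number of I/Os started */
		uint64_t	io_nblocks_sum;			/* sum of blocks over all I/Os */
		uint64_t	ios_in_progress_sum;	/* ios_in_progress, summed per I/O */
	}			stats;
} ReadStreamSketch;

/*
 * Record one started I/O of "nblocks" blocks.  The in-flight count is taken
 * from the stream rather than passed in, so callers can't disagree with it.
 * Expected to be called after ios_in_progress was bumped for this I/O.
 */
static inline void
read_stream_count_io(ReadStreamSketch *stream, int nblocks)
{
	assert(nblocks > 0);
	assert(stream->ios_in_progress > 0);

	stream->stats.io_count++;
	stream->stats.io_nblocks_sum += nblocks;
	stream->stats.ios_in_progress_sum += stream->ios_in_progress;
}
```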



> @@ -851,6 +894,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
>                                                                               flags)))
>                       {
>                               /* Fast return. */
> +                             read_stream_update_stats_prefetch(stream);
>                               return buffer;
>                       }
>
> @@ -860,6 +904,9 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
>                       stream->ios_in_progress = 1;
>                       stream->ios[0].buffer_index = oldest_buffer_index;
>                       stream->seq_blocknum = next_blocknum + 1;
> +
> +                     /* update I/O stats */
> +                     read_stream_update_stats_io(stream, 1, stream->ios_in_progress);
>               }
>               else
>               {
> @@ -871,6 +918,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
>               }
>
>               stream->fast_path = false;
> +             read_stream_update_stats_prefetch(stream);
>               return buffer;
>       }
>  #endif

Should read_stream_update_stats_prefetch() actually be called when we have
reached the end of the stream?



> @@ -916,12 +964,17 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
>       {
>               int16           io_index = stream->oldest_io_index;
>               int32           distance;       /* wider temporary value, clamped below */
> +             bool            needed_wait;
>
>               /* Sanity check that we still agree on the buffers. */
>               Assert(stream->ios[io_index].op.buffers ==
>                          &stream->buffers[oldest_buffer_index]);
>
> -             WaitReadBuffers(&stream->ios[io_index].op);
> +             needed_wait = WaitReadBuffers(&stream->ios[io_index].op);
> +
> +             /* Count it as a stall if we need to wait for IO */
> +             if (needed_wait)
> +                     stream->stats.stall_count += 1;
>
>               Assert(stream->ios_in_progress > 0);
>               stream->ios_in_progress--;

I'd probably put the stall_count++ in a helper too; that makes it easier to
test compiling out the stats support and similar things.



> @@ -328,6 +329,21 @@ ExecEndBitmapHeapScan(BitmapHeapScanState *node)
>                */
>               si->exact_pages += node->stats.exact_pages;
>               si->lossy_pages += node->stats.lossy_pages;
> +
> +             /* collect prefetch info for this process from the read_stream */
> +             if ((stats = table_scan_stats(node->ss.ss_currentScanDesc)) != NULL)
> +             {
> +                     si->stream.prefetch_count += stats->prefetch_count;
> +                     si->stream.distance_sum += stats->distance_sum;
> +                     if (stats->distance_max > si->stream.distance_max)
> +                             si->stream.distance_max = stats->distance_max;
> +                     if (stats->distance_capacity > si->stream.distance_capacity)
> +                             si->stream.distance_capacity = stats->distance_capacity;
> +                     si->stream.stall_count += stats->stall_count;
> +                     si->stream.io_count += stats->io_count;
> +                     si->stream.io_nblocks += stats->io_nblocks;
> +                     si->stream.io_in_progress += stats->io_in_progress;
> +             }
>       }

Could this be wrapped in a helper function?


> +/*
> + * Generic prefetch stats for table scans.
> + */
> +typedef struct TableScanStatsData
> +{
> +     /* number of buffers returned to consumer (for averaging distance) */
> +     uint64          prefetch_count;
> +
> +     /* sum of pinned_buffers sampled at each buffer return */
> +     uint64          distance_sum;
> +
> +     /* maximum actual pinned_buffers observed during the scan */
> +     int16           distance_max;
> +
> +     /* maximum possible look-ahead distance (max_pinned_buffers) */
> +     int16           distance_capacity;
> +
> +     /* number of stalled reads (waiting for I/O) */
> +     uint64          stall_count;
> +
> +     /* I/O stats */
> +     uint64          io_count;               /* number of I/Os */
> +     uint64          io_nblocks;             /* sum of blocks for all I/Os */
> +     uint64          io_in_progress; /* sum of in-progress I/Os */
> +} TableScanStatsData;
> +typedef struct TableScanStatsData *TableScanStats;


> --- a/src/include/executor/instrument_node.h
> +++ b/src/include/executor/instrument_node.h
> @@ -41,9 +41,51 @@ typedef struct SharedAggInfo
>
>
>  /* ---------------------
> - *   Instrumentation information for indexscans (amgettuple and amgetbitmap)
> + *   Instrumentation information about read streams
>   * ---------------------
>   */
> +typedef struct ReadStreamInstrumentation
> +{
> +     /* number of buffers returned to consumer (for averaging distance) */
> +     uint64          prefetch_count;
> +
> +     /* sum of pinned_buffers sampled at each buffer return */
> +     uint64          distance_sum;
> +
> +     /* maximum actual pinned_buffers observed during the scan */
> +     int16           distance_max;
> +
> +     /* maximum possible look-ahead distance (max_pinned_buffers) */
> +     int16           distance_capacity;
> +
> +     /* number of stalled reads (waiting for I/O) */
> +     uint64          stall_count;
> +
> +     /* I/O stats */
> +     uint64          io_count;               /* number of I/Os */
> +     uint64          io_nblocks;             /* sum of blocks for all I/Os */
> +     uint64          io_in_progress; /* sum of in-progress I/Os */
> +} ReadStreamInstrumentation;

I don't get what we gain by having this twice, once as TableScanStatsData and
once as ReadStreamInstrumentation.


Greetings,

Andres Freund

