Hi,

On Tue, Dec 30, 2025 at 10:43 AM Xuneng Zhou <[email protected]> wrote:
>
> Hi,
>
> On Tue, Dec 30, 2025 at 9:51 AM Xuneng Zhou <[email protected]> wrote:
> >
> > Hi,
> >
> > Thanks for looking into this.
> >
> > On Mon, Dec 29, 2025 at 6:58 PM Nazir Bilal Yavuz <[email protected]> 
> > wrote:
> > >
> > > Hi,
> > >
> > > On Sun, 28 Dec 2025 at 14:46, Xuneng Zhou <[email protected]> wrote:
> > > >
> > > > Hi,
> > > > >
> > > > > Two more to go:
> > > > > patch 5: Streamify log_newpage_range() WAL logging path
> > > > > patch 6: Streamify hash index VACUUM primary bucket page reads
> > > > >
> > > > > Benchmarks will be conducted soon.
> > > > >
> > > >
> > > > v6 in the last message has a problem and has not been updated. Attach
> > > > the right one again. Sorry for the noise.
> > >
> > > 0003 and 0006:
> > >
> > > You need to add 'StatApproxReadStreamPrivate' and
> > > 'HashBulkDeleteStreamPrivate' to the typedefs.list.
> >
> > Done.
> >
> > > 0005:
> > >
> > > @@ -1321,8 +1341,10 @@ log_newpage_range(Relation rel, ForkNumber forknum,
> > >          nbufs = 0;
> > >          while (nbufs < XLR_MAX_BLOCK_ID && blkno < endblk)
> > >          {
> > > -            Buffer        buf = ReadBufferExtended(rel, forknum, blkno,
> > > -                                                 RBM_NORMAL, NULL);
> > > +            Buffer        buf = read_stream_next_buffer(stream, NULL);
> > > +
> > > +            if (!BufferIsValid(buf))
> > > +                break;
> > >
> > > We are loosening a check here, there should not be a invalid buffer in
> > > the stream until the endblk. I think you can remove this
> > > BufferIsValid() check, then we can learn if something goes wrong.
> >
> > My concern before for not adding assert at the end of streaming is the
> > potential early break in here:
> >
> > /* Nothing more to do if all remaining blocks were empty. */
> > if (nbufs == 0)
> >     break;
> >
> > After looking more closely, it turns out to be a misunderstanding of the 
> > logic.
> >
> > > 0006:
> > >
> > > You can use read_stream_reset() instead of read_stream_end(), then you
> > > can use the same stream with different variables, I believe this is
> > > the preferred way.
> > >
> > > Rest LGTM!
> > >
> >
> > Yeah, reset seems a more proper way here.
> >
>
> Run pgindent using the updated typedefs.list.
>

I've completed benchmarking of the v4 streaming read patches across
three I/O methods (io_uring, sync, worker). Tests were run with cold
cache on large datasets.

--- Settings ---

shared_buffers = '8GB'
effective_io_concurrency = 200
io_method = $IO_METHOD
io_workers = $IO_WORKERS
io_max_concurrency = $IO_MAX_CONCURRENCY
track_io_timing = on
autovacuum = off
checkpoint_timeout = 1h
max_wal_size = 10GB
max_parallel_workers_per_gather = 0

--- Machine ---
CPU: 48-core
RAM: 256 GB DDR5
Disk: 2 x 1.92 TB NVMe SSD

--- Executive Summary ---

The patches provide significant benefits for I/O-bound sequential
operations, with the greatest improvements seen when using
asynchronous I/O methods (io_uring and worker). The synchronous I/O
mode shows reduced but still meaningful gains.

--- Results by I/O Method

Best Results: io_method=worker

bloom_scan: 4.14x (75.9% faster); 93% fewer reads
pgstattuple: 1.59x (37.1% faster); 94% fewer reads
hash_vacuum: 1.05x (4.4% faster); 80% fewer reads
gin_vacuum: 1.06x (5.6% faster); 15% fewer reads
bloom_vacuum: 1.04x (3.9% faster); 76% fewer reads
wal_logging: 0.98x (-2.5%, neutral/slightly slower); no change in reads

io_method=io_uring

bloom_scan: 3.12x (68.0% faster); 93% fewer reads
pgstattuple: 1.50x (33.2% faster); 94% fewer reads
hash_vacuum: 1.03x (3.3% faster); 80% fewer reads
gin_vacuum: 1.02x (2.1% faster); 15% fewer reads
bloom_vacuum: 1.03x (3.4% faster); 76% fewer reads
wal_logging: 1.00x (-0.5%, neutral); no change in reads

io_method=sync (baseline comparison)

bloom_scan: 1.20x (16.4% faster); 93% fewer reads
pgstattuple: 1.10x (9.0% faster); 94% fewer reads
hash_vacuum: 1.01x (0.8% faster); 80% fewer reads
gin_vacuum: 1.02x (1.7% faster); 15% fewer reads
bloom_vacuum: 1.03x (2.8% faster); 76% fewer reads
wal_logging: 0.99x (-0.7%, neutral); no change in reads

--- Observations ---

Async I/O amplifies streaming benefits: The same patches show 3-4x
improvement with worker/io_uring vs 1.2x with sync.

I/O operation reduction is consistent: All modes show the same ~93-94%
reduction in I/O operations for bloom_scan and pgstattuple.

VACUUM operations show modest gains: Despite large I/O reductions
(76-80%), wall-clock improvements are smaller (3-15%) since VACUUM has
larger CPU overhead (tuple processing, index maintenance, WAL
logging).

log_newpage_range shows no benefit: The patch provides no improvement (~0.97x).

--
Best,
Xuneng
#!/usr/bin/env bash
set -euo pipefail

###############################################################################
# Streaming Read Patches Benchmark
#
# Usage: ./run_streaming_bench.sh [OPTIONS] <patch>
#
# Options:
#   --clean           Remove existing builds and start fresh
#   --baseline        Also build and test vanilla PostgreSQL for comparison
#   --test TEST       Run specific test (bloom_scan, bloom_vacuum, pgstattuple,
#                     gin_vacuum, wal_logging, hash_vacuum, or "all")
#   --io-method MODE  I/O method: io_uring, worker, or sync (default: io_uring)
#   --io-workers N    Number of I/O workers for worker mode (default: 3)
#   --io-concurrency N  Max concurrent I/Os per process (default: 64)
#
# Environment:
#   WORKROOT     Base directory (default: $HOME/pg_bench)
#   REPS         Repetitions per test (default: 5)
#   SIZES        Table sizes to test (default: "large")
###############################################################################

log() { printf '\033[1;34m==>\033[0m %s\n' "$*"; }
die() { printf '\033[1;31mERROR:\033[0m %s\n' "$*" >&2; exit 1; }

# --- CLI parsing ---
CLEAN=0
BASELINE=0
TEST="all"
IO_METHOD="${IO_METHOD:-io_uring}"
IO_WORKERS="${IO_WORKERS:-3}"
IO_MAX_CONCURRENCY="${IO_MAX_CONCURRENCY:-64}"
PATCH=""

while [[ $# -gt 0 ]]; do
  case "$1" in
    --clean)          CLEAN=1 ;;
    --baseline)       BASELINE=1 ;;
    --test)           TEST="$2"; shift ;;
    --io-method)      IO_METHOD="$2"; shift ;;
    --io-workers)     IO_WORKERS="$2"; shift ;;
    --io-concurrency) IO_MAX_CONCURRENCY="$2"; shift ;;
    -h|--help)        sed -n '3,18p' "$0" | sed 's/^# \?//'; exit 0 ;;
    -*)               die "Unknown option: $1" ;;
    *)                PATCH="$1" ;;
  esac
  shift
done

# Validate io_method
case "$IO_METHOD" in
  io_uring|worker|sync) ;;
  *) die "Invalid --io-method: $IO_METHOD (must be io_uring, worker, or sync)" ;;
esac

[[ -z "$PATCH" ]] && die "Usage: $0 [--clean] [--baseline] [--test TEST] <patch>"
[[ ! -f "$PATCH" ]] && die "Patch not found: $PATCH"
[[ "$PATCH" != /* ]] && PATCH="$PWD/$PATCH"

# --- Configuration ---
WORKROOT="${WORKROOT:-$HOME/pg_bench}"
REPS="${REPS:-5}"
SIZES="${SIZES:-large}"

ROOT_BASE="$WORKROOT/vanilla"
ROOT_PATCH="$WORKROOT/patched"

# --- Helpers ---
pg() { echo "$1/pg/bin/$2"; }

pick_port() {
  for p in $(seq "${1:-5432}" 60000); do
    lsof -iTCP:"$p" -sTCP:LISTEN >/dev/null 2>&1 || { echo "$p"; return; }
  done
  die "No free port found"
}

# --- Build PostgreSQL ---
build_pg() {
  local ROOT="$1" PATCH_FILE="${2:-}"
  
  [[ $CLEAN -eq 1 ]] && rm -rf "$ROOT"
  
  if [[ -x "$(pg "$ROOT" initdb)" ]]; then
    log "Reusing build: $ROOT"
    return
  fi
  
  log "Building PostgreSQL: $ROOT"
  mkdir -p "$ROOT"
  
  git clone --depth 1 https://github.com/postgres/postgres "$ROOT/src" 2>/dev/null
  cd "$ROOT/src"
  
  [[ -n "$PATCH_FILE" ]] && { log "Applying patch"; git apply "$PATCH_FILE"; }
  
  ./configure --prefix="$ROOT/pg" --with-liburing \
    CFLAGS='-O2 -ggdb3 -fno-omit-frame-pointer' >/dev/null 2>&1
  
  make -j"$(nproc)" install >/dev/null 2>&1
  make -C contrib/bloom install >/dev/null 2>&1
  make -C contrib/pgstattuple install >/dev/null 2>&1
}

# --- Cluster management ---
init_cluster() {
  local ROOT="$1" PORT="$2"
  
  rm -rf "$ROOT/data"
  "$(pg "$ROOT" initdb)" -D "$ROOT/data" --no-locale >/dev/null 2>&1
  
  cat >> "$ROOT/data/postgresql.conf" <<EOF
port = $PORT
listen_addresses = '127.0.0.1'
shared_buffers = '8GB'
effective_io_concurrency = 200
io_method = $IO_METHOD
io_workers = $IO_WORKERS
io_max_concurrency = $IO_MAX_CONCURRENCY
track_io_timing = on
autovacuum = off
checkpoint_timeout = 1h
max_wal_size = 10GB
max_parallel_workers_per_gather = 0
EOF
  
  "$(pg "$ROOT" pg_ctl)" -D "$ROOT/data" -l "$ROOT/server.log" start -w >/dev/null
}

stop_cluster() {
  local ROOT="$1"
  "$(pg "$ROOT" pg_ctl)" -D "$ROOT/data" stop -m fast 2>/dev/null || true
}

drop_caches() {
  local ROOT="$1"
  stop_cluster "$ROOT"
  sync
  echo 3 | sudo tee /proc/sys/vm/drop_caches >/dev/null 2>&1 || true
  sleep 2
  "$(pg "$ROOT" pg_ctl)" -D "$ROOT/data" -l "$ROOT/server.log" start -w >/dev/null
}

psql_run() {
  local ROOT="$1" PORT="$2"
  shift 2
  "$(pg "$ROOT" psql)" -h 127.0.0.1 -p "$PORT" -d postgres -v ON_ERROR_STOP=1 -Atq "$@"
}

# --- Timing ---
run_timed() {
  local ROOT="$1" PORT="$2" SQL="$3"
  local ms
  # -X: ignore .psqlrc, -v ON_ERROR_STOP=1: fail on SQL errors
  # Parse last Time: line, handle both "ms" and "s" units
  ms=$("$(pg "$ROOT" psql)" -h 127.0.0.1 -p "$PORT" -d postgres -X -v ON_ERROR_STOP=1 -At \
    -c '\timing on' -c "$SQL" 2>&1 | \
    awk '
      /Time:/ {
        val=$2; unit=$3;
        if (unit=="ms") ms=val;
        else if (unit=="s") ms=val*1000;
      }
      END { if (ms=="") exit 1; printf "%.3f\n", ms; }
    ')
  # Validate numeric output
  [[ "$ms" =~ ^[0-9]+(\.[0-9]+)?$ ]] || { echo "ERROR: Non-numeric timing: $ms" >&2; return 1; }
  echo "$ms"
}

# --- I/O Stats ---
# Run SQL and capture timing + I/O stats from pg_stat_io
# Resets stats before query, waits for flush, then reads absolute values
# Note: pg_stat_io has PGSTAT_MIN_INTERVAL=1000ms flush delay, so we wait 1.5s
#       after the query to ensure stats are flushed to shared memory.
# Note: pg_stat_io counts I/O operations, not pages (with io_combine_limit=128kB,
#       up to 16 pages per operation). This is expected behavior.
# Returns: ms,reads,read_time
run_timed_with_io() {
  local ROOT="$1" PORT="$2" SQL="$3"
  local result
  
  # Reset stats, run query, wait for flush, read absolute values
  # - Filter by client backend and io worker (excludes bgwriter/checkpointer)
  # - 1.5s delay allows stats to flush (PGSTAT_MIN_INTERVAL=1000ms)
  result=$("$(pg "$ROOT" psql)" -h 127.0.0.1 -p "$PORT" -d postgres -X -v ON_ERROR_STOP=1 <<EOSQL
SELECT pg_stat_reset_shared('io');
\\timing on
$SQL
\\timing off
SELECT pg_sleep(1.5);
\\t on
SELECT 
  COALESCE(SUM(reads),0)::bigint,
  COALESCE(SUM(read_time),0)::numeric(12,2)
FROM pg_stat_io 
WHERE object = 'relation' AND backend_type IN ('client backend', 'io worker');
EOSQL
  2>&1)
  
  # Parse timing (last Time: line)
  local ms
  ms=$(echo "$result" | awk '
    /Time:/ {
      val=$2; unit=$3;
      if (unit=="ms") ms=val;
      else if (unit=="s") ms=val*1000;
    }
    END { if (ms=="") exit 1; printf "%.3f\n", ms; }
  ')
  
  # Parse I/O stats (last non-empty line with pipe separator: reads|read_time)
  local reads read_time
  local io_line
  io_line=$(echo "$result" | grep '|' | tail -1)
  reads=$(echo "$io_line" | cut -d'|' -f1 | tr -d ' ')
  read_time=$(echo "$io_line" | cut -d'|' -f2 | tr -d ' ')
  
  # Default to 0 if not found
  [[ "$reads" =~ ^-?[0-9]+$ ]] || reads=0
  [[ "$read_time" =~ ^-?[0-9]+(\.[0-9]+)?$ ]] || read_time=0
  
  echo "$ms,$reads,$read_time"
}

# --- Statistics ---
calc_median() {
  awk -F, 'NR>1{a[++n]=$2}END{
    if(n==0){print 0; exit}
    for(i=1;i<=n;i++)for(j=i+1;j<=n;j++)if(a[i]>a[j]){t=a[i];a[i]=a[j];a[j]=t}
    print (n%2)?a[int(n/2)+1]:(a[n/2]+a[n/2+1])/2
  }' "$1"
}

calc_median_col() {
  local file="$1" col="$2"
  awk -F, -v col="$col" 'NR>1{a[++n]=$col}END{
    if(n==0){print 0; exit}
    for(i=1;i<=n;i++)for(j=i+1;j<=n;j++)if(a[i]>a[j]){t=a[i];a[i]=a[j];a[j]=t}
    print (n%2)?a[int(n/2)+1]:(a[n/2]+a[n/2+1])/2
  }' "$file"
}

calc_stats() {
  local csv="$1"
  awk -F, 'NR>1{a[++n]=$2;s+=$2}END{
    if(n==0)exit
    for(i=1;i<=n;i++)for(j=i+1;j<=n;j++)if(a[i]>a[j]){t=a[i];a[i]=a[j];a[j]=t}
    med=(n%2)?a[int(n/2)+1]:(a[n/2]+a[n/2+1])/2
    avg=s/n; for(i=1;i<=n;i++)ss+=(a[i]-avg)^2; sd=sqrt(ss/n)
    printf "median=%.1fms mean=%.1f±%.1fms n=%d", med, avg, sd, n
  }' "$csv"
}

# --- Benchmark runner ---
benchmark() {
  local ROOT="$1" PORT="$2" NAME="$3" SQL="$4"
  local OUT="$ROOT/results/${NAME}.csv"
  
  mkdir -p "$ROOT/results"
  echo "run,ms,reads,read_time_ms" > "$OUT"
  
  for i in $(seq 1 "$REPS"); do
    drop_caches "$ROOT"
    local result ms reads read_time
    result=$(run_timed_with_io "$ROOT" "$PORT" "$SQL")
    IFS=',' read -r ms reads read_time <<<"$result"
    echo "$i,$ms,$reads,$read_time" >> "$OUT"
    log "$NAME [$i/$REPS]: ${ms}ms (reads=$reads, io_time=${read_time}ms)"
  done
}

# --- Data setup functions ---
setup_bloom() {
  local ROOT="$1" PORT="$2" SIZE="$3"
  local NROWS
  case "$SIZE" in
    small)  NROWS=100000 ;;
    medium) NROWS=1000000 ;;
    large)  NROWS=10000000 ;;
  esac
  
  log "Creating Bloom test data ($SIZE: $NROWS rows)"
  psql_run "$ROOT" "$PORT" <<SQL
CREATE EXTENSION IF NOT EXISTS bloom;
DROP TABLE IF EXISTS bloom_test;
CREATE TABLE bloom_test (id INT, data TEXT, val1 INT, val2 INT);
INSERT INTO bloom_test SELECT i, 'data_'||i, i%1000, i%100 FROM generate_series(1,$NROWS) i;
CREATE INDEX bloom_idx ON bloom_test USING bloom (val1, val2);
VACUUM ANALYZE bloom_test;
CHECKPOINT;
SQL
}

setup_pgstattuple() {
  local ROOT="$1" PORT="$2" SIZE="$3"
  local NROWS
  case "$SIZE" in
    small)  NROWS=100000 ;;
    medium) NROWS=1000000 ;;
    large)  NROWS=10000000 ;;
  esac
  
  log "Creating pgstattuple test data ($SIZE: $NROWS rows)"
  psql_run "$ROOT" "$PORT" <<SQL
CREATE EXTENSION IF NOT EXISTS pgstattuple;
DROP TABLE IF EXISTS heap_test;
CREATE TABLE heap_test (id SERIAL PRIMARY KEY, data TEXT);
INSERT INTO heap_test (data) SELECT repeat('x',100) FROM generate_series(1,$NROWS);
VACUUM ANALYZE heap_test;
CHECKPOINT;
SQL
}

setup_gin() {
  local ROOT="$1" PORT="$2" SIZE="$3"
  local NROWS
  case "$SIZE" in
    small)  NROWS=100000 ;;
    medium) NROWS=1000000 ;;
    large)  NROWS=5000000 ;;
  esac
  
  log "Creating GIN test data ($SIZE: $NROWS rows)"
  psql_run "$ROOT" "$PORT" <<SQL
DROP TABLE IF EXISTS gin_test;
-- No PRIMARY KEY: isolate GIN index vacuum from btree overhead
CREATE TABLE gin_test (id INT, tags TEXT[]);
INSERT INTO gin_test (id, tags)
SELECT i, ARRAY(SELECT 'tag_'||(random()*100)::int FROM generate_series(1,5))
FROM generate_series(1,$NROWS) i;
CREATE INDEX gin_idx ON gin_test USING gin (tags);
VACUUM ANALYZE gin_test;
CHECKPOINT;
SQL
}

setup_hash() {
  local ROOT="$1" PORT="$2" SIZE="$3"
  local NROWS
  case "$SIZE" in
    small)  NROWS=500000 ;;
    medium) NROWS=5000000 ;;
    large)  NROWS=20000000 ;;
  esac
  
  log "Creating Hash test data ($SIZE: $NROWS unique values)"
  psql_run "$ROOT" "$PORT" <<SQL
DROP TABLE IF EXISTS hash_test;
-- No PRIMARY KEY: isolate hash index vacuum from btree overhead
CREATE TABLE hash_test (id INT, data TEXT);
INSERT INTO hash_test SELECT i, 'x' FROM generate_series(1,$NROWS) i;
CREATE INDEX hash_idx ON hash_test USING hash (id);
VACUUM ANALYZE hash_test;
CHECKPOINT;
SQL
}

setup_wal() {
  local ROOT="$1" PORT="$2" SIZE="$3"
  local NROWS
  case "$SIZE" in
    small)  NROWS=1000000 ;;
    medium) NROWS=5000000 ;;
    large)  NROWS=20000000 ;;
  esac
  
  log "Creating UNLOGGED table for log_newpage_range test ($SIZE: $NROWS rows)"
  psql_run "$ROOT" "$PORT" <<SQL
DROP TABLE IF EXISTS wal_test;
-- UNLOGGED table: pages written to disk but NO WAL generated
-- ALTER TABLE SET LOGGED will call log_newpage_range() to read all pages
-- and write them to WAL - this is where streaming read helps
CREATE UNLOGGED TABLE wal_test (id INT, data TEXT);
INSERT INTO wal_test SELECT i, repeat('x', 100) FROM generate_series(1,$NROWS) i;
-- Checkpoint ensures all pages are on disk before we drop caches
CHECKPOINT;
SQL
}

# --- Test functions ---
test_bloom_scan() {
  local ROOT="$1" PORT="$2" LABEL="$3" SIZE="$4"
  setup_bloom "$ROOT" "$PORT" "$SIZE"
  benchmark "$ROOT" "$PORT" "${LABEL}_bloom_scan_${SIZE}" \
    "SET enable_seqscan=off; SELECT COUNT(*) FROM bloom_test WHERE val1=42 AND val2=7;"
}

test_bloom_vacuum() {
  local ROOT="$1" PORT="$2" LABEL="$3" SIZE="$4"
  local OUT="$ROOT/results/${LABEL}_bloom_vacuum_${SIZE}.csv"
  mkdir -p "$ROOT/results"
  echo "run,ms,reads,read_time_ms" > "$OUT"
  
  for i in $(seq 1 "$REPS"); do
    # Fresh table each run for consistent state
    setup_bloom "$ROOT" "$PORT" "$SIZE"
    # Create 10% dead tuples
    psql_run "$ROOT" "$PORT" -c "DELETE FROM bloom_test WHERE id % 10 = 0;"
    
    drop_caches "$ROOT"
    local result ms reads read_time
    result=$(run_timed_with_io "$ROOT" "$PORT" "VACUUM bloom_test;")
    IFS=',' read -r ms reads read_time <<<"$result"
    echo "$i,$ms,$reads,$read_time" >> "$OUT"
    log "${LABEL}_bloom_vacuum_${SIZE} [$i/$REPS]: ${ms}ms (reads=$reads, io_time=${read_time}ms)"
  done
}

test_pgstattuple() {
  local ROOT="$1" PORT="$2" LABEL="$3" SIZE="$4"
  local OUT="$ROOT/results/${LABEL}_pgstattuple_${SIZE}.csv"
  mkdir -p "$ROOT/results"
  echo "run,ms,reads,read_time_ms" > "$OUT"
  
  for i in $(seq 1 "$REPS"); do
    # Fresh table each run
    setup_pgstattuple "$ROOT" "$PORT" "$SIZE"
    # UPDATE clears the all-visible bit in the Visibility Map for affected pages.
    # pgstattuple_approx skips all-visible pages (estimates from VM without reading).
    # This ensures the function must actually read pages, exercising the streaming path.
    psql_run "$ROOT" "$PORT" -c "UPDATE heap_test SET data = data || '!' WHERE id % 5 = 0;"
    
    drop_caches "$ROOT"
    local result ms reads read_time
    result=$(run_timed_with_io "$ROOT" "$PORT" "SELECT * FROM pgstattuple_approx('heap_test');")
    IFS=',' read -r ms reads read_time <<<"$result"
    echo "$i,$ms,$reads,$read_time" >> "$OUT"
    log "${LABEL}_pgstattuple_${SIZE} [$i/$REPS]: ${ms}ms (reads=$reads, io_time=${read_time}ms)"
  done
}

test_gin_vacuum() {
  local ROOT="$1" PORT="$2" LABEL="$3" SIZE="$4"
  local OUT="$ROOT/results/${LABEL}_gin_vacuum_${SIZE}.csv"
  mkdir -p "$ROOT/results"
  echo "run,ms,reads,read_time_ms" > "$OUT"
  
  for i in $(seq 1 "$REPS"); do
    # Fresh table each run for consistent state
    setup_gin "$ROOT" "$PORT" "$SIZE"
    
    drop_caches "$ROOT"
    local result ms reads read_time
    # VACUUM ANALYZE forces ginvacuumcleanup() to run and scan all pages
    result=$(run_timed_with_io "$ROOT" "$PORT" "VACUUM ANALYZE gin_test;")
    IFS=',' read -r ms reads read_time <<<"$result"
    echo "$i,$ms,$reads,$read_time" >> "$OUT"
    log "${LABEL}_gin_vacuum_${SIZE} [$i/$REPS]: ${ms}ms (reads=$reads, io_time=${read_time}ms)"
  done
}

test_hash_vacuum() {
  local ROOT="$1" PORT="$2" LABEL="$3" SIZE="$4"
  local OUT="$ROOT/results/${LABEL}_hash_vacuum_${SIZE}.csv"
  mkdir -p "$ROOT/results"
  echo "run,ms,reads,read_time_ms" > "$OUT"
  
  for i in $(seq 1 "$REPS"); do
    # Fresh table each run for consistent state
    setup_hash "$ROOT" "$PORT" "$SIZE"
    # Create 10% dead tuples
    psql_run "$ROOT" "$PORT" -c "DELETE FROM hash_test WHERE id % 10 = 0;"
    
    drop_caches "$ROOT"
    local result ms reads read_time
    result=$(run_timed_with_io "$ROOT" "$PORT" "VACUUM hash_test;")
    IFS=',' read -r ms reads read_time <<<"$result"
    echo "$i,$ms,$reads,$read_time" >> "$OUT"
    log "${LABEL}_hash_vacuum_${SIZE} [$i/$REPS]: ${ms}ms (reads=$reads, io_time=${read_time}ms)"
  done
}

test_wal_logging() {
  local ROOT="$1" PORT="$2" LABEL="$3" SIZE="$4"
  local OUT="$ROOT/results/${LABEL}_wal_logging_${SIZE}.csv"
  mkdir -p "$ROOT/results"
  echo "run,ms,reads,read_time_ms" > "$OUT"
  
  for i in $(seq 1 "$REPS"); do
    # Create UNLOGGED table with data (pages on disk, no WAL)
    setup_wal "$ROOT" "$PORT" "$SIZE"
    
    # Drop OS caches - pages are now COLD on disk
    drop_caches "$ROOT"
    
    # ALTER TABLE SET LOGGED calls log_newpage_range() to:
    # 1. Read ALL table pages from disk (COLD - streaming helps here!)
    # 2. Write them to WAL for crash recovery
    # This directly exercises the patched code path in log_newpage_range()
    local result ms reads read_time
    result=$(run_timed_with_io "$ROOT" "$PORT" "ALTER TABLE wal_test SET LOGGED;")
    IFS=',' read -r ms reads read_time <<<"$result"
    echo "$i,$ms,$reads,$read_time" >> "$OUT"
    log "${LABEL}_wal_logging_${SIZE} [$i/$REPS]: ${ms}ms (reads=$reads, io_time=${read_time}ms)"
  done
}

# --- Run tests for a build ---
run_tests() {
  local ROOT="$1" LABEL="$2"
  local PORT
  PORT=$(pick_port)
  
  log "[$LABEL] Starting cluster on port $PORT"
  init_cluster "$ROOT" "$PORT"
  
  trap "stop_cluster '$ROOT'" EXIT
  
  for SIZE in $SIZES; do
    case "$TEST" in
      bloom_scan)   test_bloom_scan "$ROOT" "$PORT" "$LABEL" "$SIZE" ;;
      bloom_vacuum) test_bloom_vacuum "$ROOT" "$PORT" "$LABEL" "$SIZE" ;;
      pgstattuple)  test_pgstattuple "$ROOT" "$PORT" "$LABEL" "$SIZE" ;;
      gin_vacuum)   test_gin_vacuum "$ROOT" "$PORT" "$LABEL" "$SIZE" ;;
      hash_vacuum)  test_hash_vacuum "$ROOT" "$PORT" "$LABEL" "$SIZE" ;;
      wal_logging)  test_wal_logging "$ROOT" "$PORT" "$LABEL" "$SIZE" ;;
      all)
        test_bloom_scan "$ROOT" "$PORT" "$LABEL" "$SIZE"
        test_bloom_vacuum "$ROOT" "$PORT" "$LABEL" "$SIZE"
        test_pgstattuple "$ROOT" "$PORT" "$LABEL" "$SIZE"
        test_gin_vacuum "$ROOT" "$PORT" "$LABEL" "$SIZE"
        test_hash_vacuum "$ROOT" "$PORT" "$LABEL" "$SIZE"
        test_wal_logging "$ROOT" "$PORT" "$LABEL" "$SIZE"
        ;;
      *) die "Unknown test: $TEST" ;;
    esac
  done
  
  stop_cluster "$ROOT"
  trap - EXIT
}

# --- Compare results ---
compare_results() {
  local base_csv="$1" patch_csv="$2" label="$3"
  
  [[ ! -f "$base_csv" || ! -f "$patch_csv" ]] && return
  
  local base_med patch_med
  base_med=$(calc_median "$base_csv")
  patch_med=$(calc_median "$patch_csv")
  
  # Guard against empty or zero values to prevent division by zero
  [[ -z "$base_med" || "$base_med" == "0" ]] && base_med="0.001"
  [[ -z "$patch_med" || "$patch_med" == "0" ]] && patch_med="0.001"
  
  local speedup pct
  speedup=$(awk "BEGIN { printf \"%.2f\", $base_med / $patch_med }")
  pct=$(awk "BEGIN { printf \"%.1f\", ($base_med - $patch_med) / $base_med * 100 }")
  
  # Check I/O stats (reads and read_time columns)
  local base_reads patch_reads base_io patch_io io_info=""
  if head -1 "$base_csv" | grep -q "reads"; then
    base_reads=$(calc_median_col "$base_csv" 3)
    patch_reads=$(calc_median_col "$patch_csv" 3)
    base_io=$(calc_median_col "$base_csv" 4)
    patch_io=$(calc_median_col "$patch_csv" 4)
    # Default to 0 if empty
    [[ -z "$base_reads" ]] && base_reads=0
    [[ -z "$patch_reads" ]] && patch_reads=0
    [[ -z "$base_io" ]] && base_io=0
    [[ -z "$patch_io" ]] && patch_io=0
    io_info="  (reads=${base_reads}→${patch_reads}, io_time=${base_io}→${patch_io}ms)"
  fi
  
  printf "%-26s base=%8.1fms  patch=%8.1fms  %5.2fx  (%5.1f%%)%s\n" \
    "$label" "$base_med" "$patch_med" "$speedup" "$pct" "$io_info"
}

print_summary() {
  echo ""
  echo "═══════════════════════════════════════════════════════════════════════"
  echo "                     STREAMING READ BENCHMARK RESULTS                   "
  echo "═══════════════════════════════════════════════════════════════════════"
  echo ""
  
  if [[ $BASELINE -eq 1 ]]; then
    printf "%-26s %-17s %-17s %-7s %-7s %s\n" "TEST" "BASELINE" "PATCHED" "SPEEDUP" "CHANGE" "I/O TIME"
    echo "─────────────────────────────────────────────────────────────────────────────────────────────────"
    
    for SIZE in $SIZES; do
      for test_name in bloom_scan bloom_vacuum pgstattuple gin_vacuum hash_vacuum wal_logging; do
        [[ "$TEST" != "all" && "$TEST" != "$test_name" ]] && continue
        compare_results \
          "$ROOT_BASE/results/base_${test_name}_${SIZE}.csv" \
          "$ROOT_PATCH/results/patched_${test_name}_${SIZE}.csv" \
          "${test_name}_${SIZE}"
      done
    done
  else
    echo "Results (patched only):"
    echo ""
    for f in "$ROOT_PATCH/results/"*.csv; do
      [[ -f "$f" ]] || continue
      printf "%-40s %s\n" "$(basename "$f" .csv):" "$(calc_stats "$f")"
    done
  fi
  
  echo ""
  echo "═══════════════════════════════════════════════════════════════════════"
  echo "CSV files: $ROOT_PATCH/results/"
  [[ $BASELINE -eq 1 ]] && echo "Baseline:  $ROOT_BASE/results/"
  echo "═══════════════════════════════════════════════════════════════════════"
}

# --- Main ---
main() {
  log "Streaming Read Benchmark"
  log "Patch: $PATCH"
  log "Tests: $TEST"
  log "Sizes: $SIZES"
  log "Reps:  $REPS"
  log "I/O:   $IO_METHOD (workers=$IO_WORKERS, concurrency=$IO_MAX_CONCURRENCY)"
  
  # Build
  if [[ $BASELINE -eq 1 ]]; then
    build_pg "$ROOT_BASE" ""
  fi
  build_pg "$ROOT_PATCH" "$PATCH"
  
  # Run tests
  if [[ $BASELINE -eq 1 ]]; then
    log "Running baseline tests"
    run_tests "$ROOT_BASE" "base"
  fi
  
  log "Running patched tests"
  run_tests "$ROOT_PATCH" "patched"
  
  # Summary
  print_summary
}

main

Reply via email to