This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 957dddc perf: setup tpch benchmark infra (#538)
957dddc is described below
commit 957dddcea23a7e8ec564ffb2ab0d90d420d4e8d9
Author: Shiyan Xu <[email protected]>
AuthorDate: Mon Mar 9 10:29:59 2026 -0500
perf: setup tpch benchmark infra (#538)
---
.gitignore => .dockerignore | 26 +-
.gitignore | 2 +
.licenserc.yaml | 1 +
Cargo.toml | 1 +
Makefile | 40 +-
.gitignore => benchmark/tpch/Cargo.toml | 44 +-
benchmark/tpch/README.md | 63 +
.gitignore => benchmark/tpch/config/sf1.yaml | 42 +-
.gitignore => benchmark/tpch/config/sf10.yaml | 42 +-
.gitignore => benchmark/tpch/config/sf100.yaml | 42 +-
.gitignore => benchmark/tpch/config/sf1000.yaml | 42 +-
benchmark/tpch/config/tables.yaml | 53 +
benchmark/tpch/infra/Dockerfile | 56 +
benchmark/tpch/infra/spark/bench.py | 118 ++
.../tpch/infra/spark/hudi-defaults.conf | 25 +-
.../tpch/infra/spark/log4j2.properties | 27 +-
.../tpch/infra/spark/spark-defaults.conf | 26 +-
benchmark/tpch/queries/q1.sql | 23 +
benchmark/tpch/queries/q10.sql | 33 +
benchmark/tpch/queries/q11.sql | 29 +
benchmark/tpch/queries/q12.sql | 30 +
benchmark/tpch/queries/q13.sql | 22 +
benchmark/tpch/queries/q14.sql | 15 +
benchmark/tpch/queries/q15.sql | 33 +
benchmark/tpch/queries/q16.sql | 32 +
benchmark/tpch/queries/q17.sql | 19 +
benchmark/tpch/queries/q18.sql | 34 +
benchmark/tpch/queries/q19.sql | 37 +
benchmark/tpch/queries/q2.sql | 45 +
benchmark/tpch/queries/q20.sql | 39 +
benchmark/tpch/queries/q21.sql | 41 +
benchmark/tpch/queries/q22.sql | 39 +
benchmark/tpch/queries/q3.sql | 24 +
benchmark/tpch/queries/q4.sql | 23 +
benchmark/tpch/queries/q5.sql | 26 +
benchmark/tpch/queries/q6.sql | 11 +
benchmark/tpch/queries/q7.sql | 41 +
benchmark/tpch/queries/q8.sql | 39 +
benchmark/tpch/queries/q9.sql | 34 +
benchmark/tpch/run.sh | 458 +++++++
benchmark/tpch/src/config.rs | 248 ++++
benchmark/tpch/src/datagen.rs | 192 +++
benchmark/tpch/src/main.rs | 1268 ++++++++++++++++++++
43 files changed, 3310 insertions(+), 175 deletions(-)
diff --git a/.gitignore b/.dockerignore
similarity index 81%
copy from .gitignore
copy to .dockerignore
index b6e2fa9..70e0351 100644
--- a/.gitignore
+++ b/.dockerignore
@@ -15,24 +15,8 @@
# specific language governing permissions and limitations
# under the License.
-/Cargo.lock
-/target
-**/target
-
-/.idea
-.vscode
-
-# python
-.venv
-venv
-**/.python-version
-__pycache__
-uv.lock
-
-# macOS
-**/.DS_Store
-
-# coverage files
-*.profraw
-cobertura.xml
-/cov-reports/
+target
+.git
+benchmark/tpch/data
+benchmark/tpch/results
+python/.venv
diff --git a/.gitignore b/.gitignore
index b6e2fa9..7373a8d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,6 +18,8 @@
/Cargo.lock
/target
**/target
+**/data
+**/results
/.idea
.vscode
diff --git a/.licenserc.yaml b/.licenserc.yaml
index e79c793..57569a6 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -26,6 +26,7 @@ header:
- '**/data/**'
- '.github/PULL_REQUEST_TEMPLATE.md'
- 'crates/core/schemas/**'
+ - 'benchmark/tpch/queries/**'
comment: on-failure
diff --git a/Cargo.toml b/Cargo.toml
index be59e70..b9badbb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,6 +20,7 @@ members = [
"crates/*",
"cpp",
"python",
+ "benchmark/tpch",
]
resolver = "2"
diff --git a/Makefile b/Makefile
index 23198fd..3979066 100644
--- a/Makefile
+++ b/Makefile
@@ -49,7 +49,8 @@ COV_OUTPUT_DIR := ./cov-reports
COV_THRESHOLD ?= 60
COV_EXCLUDE := \
--exclude-files 'cpp/src/*' \
- --exclude-files 'crates/core/src/avro_to_arrow/*'
+ --exclude-files 'crates/core/src/avro_to_arrow/*' \
+ --exclude-files 'benchmark/*'
TARPAULIN_COMMON := --engine llvm --no-dead-code --no-fail-fast \
--all-features --workspace $(COV_EXCLUDE) --skip-clean
@@ -158,3 +159,40 @@ coverage-check: ## Fail if coverage is below threshold
(COV_THRESHOLD=60)
.PHONY: clean-coverage
clean-coverage: ## Remove coverage reports
rm -rf $(COV_OUTPUT_DIR)
+
+# =============================================================================
+# TPC-H Benchmark
+# =============================================================================
+SF ?= 0.001
+ENGINE ?= datafusion
+FORMAT ?= hudi
+MODE ?= native
+QUERIES ?=
+TPCH_DIR := benchmark/tpch
+TPCH_DATA_DIR := $(TPCH_DIR)/data
+TPCH_RESULTS_DIR := $(TPCH_DIR)/results
+
+.PHONY: tpch-generate
+tpch-generate: ## Generate TPC-H parquet tables (SF=0.001)
+ $(info --- Generate TPC-H parquet tables at SF=$(SF) ---)
+ $(TPCH_DIR)/run.sh generate --scale-factor $(SF)
+
+.PHONY: tpch-create-tables
+tpch-create-tables: ## Create Hudi COW tables from parquet (SF=0.001, requires
Docker)
+ $(info --- Create Hudi tables at SF=$(SF) ---)
+ $(TPCH_DIR)/run.sh create-tables --scale-factor $(SF)
+
+.PHONY: bench-tpch
+bench-tpch: ## Run TPC-H benchmark (ENGINE=datafusion|spark SF=0.001
MODE=native|docker QUERIES=1,3,6)
+ $(info --- Benchmark at SF=$(SF) MODE=$(MODE) ---)
+ifeq ($(ENGINE),spark)
+ MODE=$(MODE) $(TPCH_DIR)/run.sh bench-spark --scale-factor $(SF)
--format $(FORMAT) $(if $(QUERIES),--queries $(QUERIES)) --output-dir
$(TPCH_RESULTS_DIR)
+else ifeq ($(ENGINE),datafusion)
+ MODE=$(MODE) $(TPCH_DIR)/run.sh bench-datafusion --scale-factor $(SF)
--format $(FORMAT) $(if $(QUERIES),--queries $(QUERIES)) --output-dir
$(TPCH_RESULTS_DIR)
+else
+ $(error Unknown ENGINE=$(ENGINE). Use datafusion or spark)
+endif
+
+.PHONY: tpch-compare
+tpch-compare: ## Compare persisted TPC-H benchmark results
(ENGINES=datafusion,spark SF=0.001)
+ $(TPCH_DIR)/run.sh compare --scale-factor $(SF) --engines $(ENGINES)
--format $(FORMAT)
diff --git a/.gitignore b/benchmark/tpch/Cargo.toml
similarity index 54%
copy from .gitignore
copy to benchmark/tpch/Cargo.toml
index b6e2fa9..218b899 100644
--- a/.gitignore
+++ b/benchmark/tpch/Cargo.toml
@@ -15,24 +15,30 @@
# specific language governing permissions and limitations
# under the License.
-/Cargo.lock
-/target
-**/target
+[package]
+name = "tpch"
+version.workspace = true
+edition.workspace = true
-/.idea
-.vscode
+[[bin]]
+name = "tpch"
+path = "src/main.rs"
-# python
-.venv
-venv
-**/.python-version
-__pycache__
-uv.lock
-
-# macOS
-**/.DS_Store
-
-# coverage files
-*.profraw
-cobertura.xml
-/cov-reports/
+[dependencies]
+arrow = { workspace = true }
+arrow-array = { workspace = true }
+arrow-cast = { workspace = true }
+clap = { version = "4", features = ["derive"] }
+env_logger = "0.11"
+comfy-table = "7"
+datafusion = { workspace = true }
+hudi = { path = "../../crates/hudi", features = ["datafusion"] }
+object_store = { workspace = true }
+parquet = { workspace = true }
+serde = { workspace = true }
+serde_json = { workspace = true }
+tokio = { workspace = true }
+tpchgen = "2"
+tpchgen-arrow = "2"
+serde_yaml = "0.9"
+url = { workspace = true }
diff --git a/benchmark/tpch/README.md b/benchmark/tpch/README.md
new file mode 100644
index 0000000..ae87196
--- /dev/null
+++ b/benchmark/tpch/README.md
@@ -0,0 +1,63 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# TPC-H Benchmark
+
+## Prerequisites
+
+- Rust toolchain
+- Docker (for Hudi table creation)
+
+## Quick Start
+
+```bash
+# 1. Generate parquet data
+make tpch-generate SF=1
+
+# 2. Create Hudi COW tables from parquet (requires Docker)
+make tpch-create-tables SF=1
+
+# 3. Benchmark (results are automatically persisted)
+make bench-tpch ENGINE=datafusion SF=1
+make bench-tpch ENGINE=spark SF=1
+
+# 4. Compare results
+make tpch-compare ENGINES=datafusion,spark SF=1
+```
+
+## Options
+
+| Variable | Values |
Default |
+|-----------|---------------------------------------------------------|--------------|
+| `ENGINE` | `datafusion`, `spark` |
`datafusion` |
+| `SF` | TPC-H scale factor |
`0.001` |
+| `QUERIES` | Comma-separated query numbers | all 22
|
+| `MODE` | `native`, `docker` |
`native` |
+| `ENGINES` | Comma-separated engine names (for `tpch-compare`) |
|
+
+## Examples
+
+```bash
+# Run only Q1, Q6, Q17
+make bench-tpch QUERIES=1,6,17 SF=10
+
+# Run inside Docker (same apache/spark:3.5.8 base image for fair comparison)
+make bench-tpch ENGINE=datafusion SF=1 MODE=docker
+make bench-tpch ENGINE=spark SF=1 MODE=docker
+```
diff --git a/.gitignore b/benchmark/tpch/config/sf1.yaml
similarity index 57%
copy from .gitignore
copy to benchmark/tpch/config/sf1.yaml
index b6e2fa9..32fc71f 100644
--- a/.gitignore
+++ b/benchmark/tpch/config/sf1.yaml
@@ -15,24 +15,30 @@
# specific language governing permissions and limitations
# under the License.
-/Cargo.lock
-/target
-**/target
+# TPC-H scale factor 1 configuration.
+# Per-scale-factor shuffle parallelism and Spark/Hudi tuning parameters.
-/.idea
-.vscode
+shuffle_parallelism:
+ customer: 1
+ lineitem: 1
+ nation: 1
+ orders: 1
+ part: 1
+ partsupp: 1
+ region: 1
+ supplier: 1
-# python
-.venv
-venv
-**/.python-version
-__pycache__
-uv.lock
+# Spark configs for 'create-tables' command
+create_tables:
+ spark_conf:
+ spark.driver.memory: 3g
-# macOS
-**/.DS_Store
-
-# coverage files
-*.profraw
-cobertura.xml
-/cov-reports/
+# Benchmark settings (shared across DataFusion and Spark)
+bench:
+ warmup: 1
+ iterations: 2
+ memory_limit: 3g
+ spark_conf:
+ spark.driver.memory: 3g
+ spark.sql.shuffle.partitions: "4"
+ spark.sql.autoBroadcastJoinThreshold: "33554432"
diff --git a/.gitignore b/benchmark/tpch/config/sf10.yaml
similarity index 57%
copy from .gitignore
copy to benchmark/tpch/config/sf10.yaml
index b6e2fa9..75d75d4 100644
--- a/.gitignore
+++ b/benchmark/tpch/config/sf10.yaml
@@ -15,24 +15,30 @@
# specific language governing permissions and limitations
# under the License.
-/Cargo.lock
-/target
-**/target
+# TPC-H scale factor 10 configuration.
+# Per-scale-factor shuffle parallelism and Spark/Hudi tuning parameters.
-/.idea
-.vscode
+shuffle_parallelism:
+ customer: 1
+ lineitem: 8
+ nation: 1
+ orders: 2
+ part: 1
+ partsupp: 2
+ region: 1
+ supplier: 1
-# python
-.venv
-venv
-**/.python-version
-__pycache__
-uv.lock
+# Spark configs for 'create-tables' command
+create_tables:
+ spark_conf:
+ spark.driver.memory: 12g
-# macOS
-**/.DS_Store
-
-# coverage files
-*.profraw
-cobertura.xml
-/cov-reports/
+# Benchmark settings (shared across DataFusion and Spark)
+bench:
+ warmup: 1
+ iterations: 3
+ memory_limit: 12g
+ spark_conf:
+ spark.driver.memory: 12g
+ spark.sql.shuffle.partitions: "16"
+ spark.sql.autoBroadcastJoinThreshold: "67108864"
diff --git a/.gitignore b/benchmark/tpch/config/sf100.yaml
similarity index 57%
copy from .gitignore
copy to benchmark/tpch/config/sf100.yaml
index b6e2fa9..6c243aa 100644
--- a/.gitignore
+++ b/benchmark/tpch/config/sf100.yaml
@@ -15,24 +15,30 @@
# specific language governing permissions and limitations
# under the License.
-/Cargo.lock
-/target
-**/target
+# TPC-H scale factor 100 configuration.
+# Per-scale-factor shuffle parallelism and Spark/Hudi tuning parameters.
-/.idea
-.vscode
+shuffle_parallelism:
+ customer: 3
+ lineitem: 73
+ nation: 1
+ orders: 17
+ part: 3
+ partsupp: 12
+ region: 1
+ supplier: 1
-# python
-.venv
-venv
-**/.python-version
-__pycache__
-uv.lock
+# Spark configs for 'create-tables' command
+create_tables:
+ spark_conf:
+ spark.driver.memory: 60g
-# macOS
-**/.DS_Store
-
-# coverage files
-*.profraw
-cobertura.xml
-/cov-reports/
+# Benchmark settings (shared across DataFusion and Spark)
+bench:
+ warmup: 2
+ iterations: 3
+ memory_limit: 60g
+ spark_conf:
+ spark.driver.memory: 60g
+ spark.sql.shuffle.partitions: "200"
+ spark.sql.autoBroadcastJoinThreshold: "134217728"
diff --git a/.gitignore b/benchmark/tpch/config/sf1000.yaml
similarity index 56%
copy from .gitignore
copy to benchmark/tpch/config/sf1000.yaml
index b6e2fa9..d600e55 100644
--- a/.gitignore
+++ b/benchmark/tpch/config/sf1000.yaml
@@ -15,24 +15,30 @@
# specific language governing permissions and limitations
# under the License.
-/Cargo.lock
-/target
-**/target
+# TPC-H scale factor 1000 configuration.
+# Per-scale-factor shuffle parallelism and Spark/Hudi tuning parameters.
-/.idea
-.vscode
+shuffle_parallelism:
+ customer: 12
+ lineitem: 292
+ nation: 1
+ orders: 68
+ part: 12
+ partsupp: 48
+ region: 1
+ supplier: 4
-# python
-.venv
-venv
-**/.python-version
-__pycache__
-uv.lock
+# Spark configs for 'create-tables' command
+create_tables:
+ spark_conf:
+ spark.driver.memory: 120g
-# macOS
-**/.DS_Store
-
-# coverage files
-*.profraw
-cobertura.xml
-/cov-reports/
+# Benchmark settings (shared across DataFusion and Spark)
+bench:
+ warmup: 2
+ iterations: 3
+ memory_limit: 120g
+ spark_conf:
+ spark.driver.memory: 120g
+ spark.sql.shuffle.partitions: "1600"
+ spark.sql.autoBroadcastJoinThreshold: "536870912"
diff --git a/benchmark/tpch/config/tables.yaml
b/benchmark/tpch/config/tables.yaml
new file mode 100644
index 0000000..cefb465
--- /dev/null
+++ b/benchmark/tpch/config/tables.yaml
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Common TPC-H table definitions shared across all scale factors.
+# Per-scale-factor configs (sf*.yaml) override shuffle_parallelism.
+
+tables:
+ customer:
+ primary_key: c_custkey
+ pre_combine_field: c_custkey
+ record_size_estimate: 87
+ lineitem:
+ primary_key: l_orderkey,l_linenumber
+ pre_combine_field: l_shipdate
+ record_size_estimate: 36
+ nation:
+ primary_key: n_nationkey
+ pre_combine_field: n_nationkey
+ record_size_estimate: 160
+ orders:
+ primary_key: o_orderkey
+ pre_combine_field: o_orderdate
+ record_size_estimate: 35
+ part:
+ primary_key: p_partkey
+ pre_combine_field: p_partkey
+ record_size_estimate: 32
+ partsupp:
+ primary_key: ps_partkey,ps_suppkey
+ pre_combine_field: ps_partkey
+ record_size_estimate: 48
+ region:
+ primary_key: r_regionkey
+ pre_combine_field: r_regionkey
+ record_size_estimate: 800
+ supplier:
+ primary_key: s_suppkey
+ pre_combine_field: s_suppkey
+ record_size_estimate: 88
diff --git a/benchmark/tpch/infra/Dockerfile b/benchmark/tpch/infra/Dockerfile
new file mode 100644
index 0000000..9875293
--- /dev/null
+++ b/benchmark/tpch/infra/Dockerfile
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# ---------- Stage 1: build the tpch binary ----------
+FROM rust:1.88-bookworm AS builder
+
+WORKDIR /build
+
+# Copy the full workspace (Cargo.toml, crates, benchmark, etc.)
+COPY Cargo.toml Cargo.lock ./
+COPY crates crates
+COPY benchmark/tpch benchmark/tpch
+# Stub out workspace members that aren't needed for the tpch build
+RUN mkdir -p cpp/src && echo "fn main() {}" > cpp/src/main.rs \
+ && printf '[package]\nname = "hudi-cpp"\nversion = "0.0.0"\nedition =
"2024"\n' > cpp/Cargo.toml \
+ && mkdir -p python/src && touch python/src/lib.rs \
+ && printf '[package]\nname = "hudi-python"\nversion = "0.0.0"\nedition =
"2024"\n' > python/Cargo.toml
+
+RUN cargo build -p tpch --release \
+ && strip target/release/tpch
+
+# ---------- Stage 2: runtime with Spark + Hudi + tpch ----------
+FROM apache/spark:3.5.8
+
+USER root
+
+# Download Hudi Spark bundle jar
+RUN cd /opt/spark/jars && \
+ wget -q \
+
https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.5-bundle_2.12/1.1.1/hudi-spark3.5-bundle_2.12-1.1.1.jar
+
+# Bake in Spark and Hudi defaults, and logging config
+COPY benchmark/tpch/infra/spark/spark-defaults.conf
/opt/spark/conf/spark-defaults.conf
+COPY benchmark/tpch/infra/spark/log4j2.properties
/opt/spark/conf/log4j2.properties
+COPY benchmark/tpch/infra/spark/hudi-defaults.conf
/etc/hudi/conf/hudi-defaults.conf
+
+# Install tpch binary
+COPY --from=builder /build/target/release/tpch /usr/local/bin/tpch
+
+USER spark
+
+WORKDIR /opt/spark/work-dir
diff --git a/benchmark/tpch/infra/spark/bench.py
b/benchmark/tpch/infra/spark/bench.py
new file mode 100644
index 0000000..e473c18
--- /dev/null
+++ b/benchmark/tpch/infra/spark/bench.py
@@ -0,0 +1,118 @@
+#!/usr/bin/env python3
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""TPC-H benchmark runner for Spark SQL with Hudi tables.
+
+Measures wall-clock time around spark.sql().collect() for each query iteration,
+mirroring how DataFusion benchmarks are measured.
+"""
+
+import argparse
+import json
+import os
+import time
+
+from pyspark.sql import SparkSession
+
+TPCH_TABLES = [
+ "customer", "lineitem", "nation", "orders",
+ "part", "partsupp", "region", "supplier",
+]
+
+
+def load_query(query_dir, query_num, scale_factor):
+ path = os.path.join(query_dir, f"q{query_num}.sql")
+ with open(path) as f:
+ sql = f.read()
+ q11_fraction = f"{0.0001 / scale_factor:.10f}"
+ return sql.replace("${Q11_FRACTION}", q11_fraction)
+
+
+def main():
+ parser = argparse.ArgumentParser(description="TPC-H Spark SQL benchmark")
+ parser.add_argument("--hudi-base", default=None)
+ parser.add_argument("--parquet-base", default=None)
+ parser.add_argument("--query-dir", required=True)
+ parser.add_argument("--scale-factor", type=float, default=1.0)
+ parser.add_argument("--queries", default=None, help="Comma-separated query
numbers")
+ parser.add_argument("--warmup", type=int, required=True)
+ parser.add_argument("--iterations", type=int, required=True)
+ parser.add_argument("--output", required=True, help="Output file for JSON
results")
+ args = parser.parse_args()
+
+ if not args.hudi_base and not args.parquet_base:
+ parser.error("at least one of --hudi-base or --parquet-base is
required")
+
+ query_nums = list(range(1, 23))
+ if args.queries:
+ query_nums = [int(q.strip()) for q in args.queries.split(",")]
+
+ total_runs = args.warmup + args.iterations
+
+ print("Initializing Spark session...", flush=True)
+ spark = SparkSession.builder.getOrCreate()
+
+ # Register tables
+ if args.hudi_base:
+ for table in TPCH_TABLES:
+ print(f" Registering Hudi table: {table}", flush=True)
+ spark.sql(
+ f"CREATE TABLE {table} USING hudi LOCATION
'{args.hudi_base}/{table}'"
+ )
+ elif args.parquet_base:
+ for table in TPCH_TABLES:
+ print(f" Registering Parquet table: {table}", flush=True)
+
spark.read.parquet(f"{args.parquet_base}/{table}").createOrReplaceTempView(table)
+
+ print(
+ f"Warmup: {args.warmup} iteration(s), Measured: {args.iterations}
iteration(s)",
+ flush=True,
+ )
+
+ result_file = open(args.output, "w")
+
+ for qn in query_nums:
+ sql = load_query(args.query_dir, qn, args.scale_factor)
+ statements = [s.strip() for s in sql.split(";") if s.strip()]
+
+ for i in range(total_runs):
+ is_warmup = i < args.warmup
+ if is_warmup:
+ label = f"warmup {i + 1}/{args.warmup}"
+ else:
+ label = f"iter {i - args.warmup + 1}/{args.iterations}"
+ print(f" Q{qn:02d} {label}...", end="", flush=True)
+
+ start = time.time()
+ for stmt in statements:
+ spark.sql(stmt).collect()
+ elapsed_ms = (time.time() - start) * 1000.0
+
+ print(f" {elapsed_ms:.1f}ms", flush=True)
+
+ if not is_warmup:
+ result_file.write(json.dumps({"query": qn, "elapsed_ms":
elapsed_ms}) + "\n")
+ result_file.flush()
+
+ result_file.close()
+ spark.stop()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/.gitignore b/benchmark/tpch/infra/spark/hudi-defaults.conf
similarity index 81%
copy from .gitignore
copy to benchmark/tpch/infra/spark/hudi-defaults.conf
index b6e2fa9..9d42b24 100644
--- a/.gitignore
+++ b/benchmark/tpch/infra/spark/hudi-defaults.conf
@@ -15,24 +15,7 @@
# specific language governing permissions and limitations
# under the License.
-/Cargo.lock
-/target
-**/target
-
-/.idea
-.vscode
-
-# python
-.venv
-venv
-**/.python-version
-__pycache__
-uv.lock
-
-# macOS
-**/.DS_Store
-
-# coverage files
-*.profraw
-cobertura.xml
-/cov-reports/
+hoodie.bulkinsert.sort.mode=GLOBAL_SORT
+hoodie.parquet.max.file.size=1073741824
+hoodie.parquet.small.file.limit=0
+hoodie.bulkinsert.shuffle.parallelism=1
diff --git a/.gitignore b/benchmark/tpch/infra/spark/log4j2.properties
similarity index 73%
copy from .gitignore
copy to benchmark/tpch/infra/spark/log4j2.properties
index b6e2fa9..f3de794 100644
--- a/.gitignore
+++ b/benchmark/tpch/infra/spark/log4j2.properties
@@ -15,24 +15,11 @@
# specific language governing permissions and limitations
# under the License.
-/Cargo.lock
-/target
-**/target
+rootLogger.level = error
+rootLogger.appenderRef.stdout.ref = console
-/.idea
-.vscode
-
-# python
-.venv
-venv
-**/.python-version
-__pycache__
-uv.lock
-
-# macOS
-**/.DS_Store
-
-# coverage files
-*.profraw
-cobertura.xml
-/cov-reports/
+appender.console.type = Console
+appender.console.name = console
+appender.console.target = SYSTEM_ERR
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %-5p %c{1}: %m%n
diff --git a/.gitignore b/benchmark/tpch/infra/spark/spark-defaults.conf
similarity index 67%
copy from .gitignore
copy to benchmark/tpch/infra/spark/spark-defaults.conf
index b6e2fa9..ceac52c 100644
--- a/.gitignore
+++ b/benchmark/tpch/infra/spark/spark-defaults.conf
@@ -15,24 +15,8 @@
# specific language governing permissions and limitations
# under the License.
-/Cargo.lock
-/target
-**/target
-
-/.idea
-.vscode
-
-# python
-.venv
-venv
-**/.python-version
-__pycache__
-uv.lock
-
-# macOS
-**/.DS_Store
-
-# coverage files
-*.profraw
-cobertura.xml
-/cov-reports/
+spark.serializer
org.apache.spark.serializer.KryoSerializer
+spark.sql.catalog.spark_catalog
org.apache.spark.sql.hudi.catalog.HoodieCatalog
+spark.sql.extensions
org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+spark.sql.files.maxPartitionBytes 1073741824
+spark.sql.parquet.aggregatePushdown true
diff --git a/benchmark/tpch/queries/q1.sql b/benchmark/tpch/queries/q1.sql
new file mode 100644
index 0000000..0dc4c3e
--- /dev/null
+++ b/benchmark/tpch/queries/q1.sql
@@ -0,0 +1,23 @@
+-- SQLBench-H query 1 derived from TPC-H query 1 under the terms of the TPC
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance
Council.
+select
+ l_returnflag,
+ l_linestatus,
+ sum(l_quantity) as sum_qty,
+ sum(l_extendedprice) as sum_base_price,
+ sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
+ sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
+ avg(l_quantity) as avg_qty,
+ avg(l_extendedprice) as avg_price,
+ avg(l_discount) as avg_disc,
+ count(*) as count_order
+from
+ lineitem
+where
+ l_shipdate <= date '1998-12-01' - interval '68 days'
+group by
+ l_returnflag,
+ l_linestatus
+order by
+ l_returnflag,
+ l_linestatus;
diff --git a/benchmark/tpch/queries/q10.sql b/benchmark/tpch/queries/q10.sql
new file mode 100644
index 0000000..576338f
--- /dev/null
+++ b/benchmark/tpch/queries/q10.sql
@@ -0,0 +1,33 @@
+-- SQLBench-H query 10 derived from TPC-H query 10 under the terms of the TPC
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance
Council.
+select
+ c_custkey,
+ c_name,
+ sum(l_extendedprice * (1 - l_discount)) as revenue,
+ c_acctbal,
+ n_name,
+ c_address,
+ c_phone,
+ c_comment
+from
+ customer,
+ orders,
+ lineitem,
+ nation
+where
+ c_custkey = o_custkey
+ and l_orderkey = o_orderkey
+ and o_orderdate >= date '1993-07-01'
+ and o_orderdate < date '1993-07-01' + interval '3' month
+ and l_returnflag = 'R'
+ and c_nationkey = n_nationkey
+group by
+ c_custkey,
+ c_name,
+ c_acctbal,
+ c_phone,
+ n_name,
+ c_address,
+ c_comment
+order by
+ revenue desc limit 20;
diff --git a/benchmark/tpch/queries/q11.sql b/benchmark/tpch/queries/q11.sql
new file mode 100644
index 0000000..5e31dad
--- /dev/null
+++ b/benchmark/tpch/queries/q11.sql
@@ -0,0 +1,29 @@
+-- SQLBench-H query 11 derived from TPC-H query 11 under the terms of the TPC
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance
Council.
+select
+ ps_partkey,
+ sum(ps_supplycost * ps_availqty) as value
+from
+ partsupp,
+ supplier,
+ nation
+where
+ ps_suppkey = s_suppkey
+ and s_nationkey = n_nationkey
+ and n_name = 'ALGERIA'
+group by
+ ps_partkey having
+ sum(ps_supplycost * ps_availqty) > (
+ select
+ sum(ps_supplycost * ps_availqty) *
${Q11_FRACTION}
+ from
+ partsupp,
+ supplier,
+ nation
+ where
+ ps_suppkey = s_suppkey
+ and s_nationkey = n_nationkey
+ and n_name = 'ALGERIA'
+ )
+order by
+ value desc;
diff --git a/benchmark/tpch/queries/q12.sql b/benchmark/tpch/queries/q12.sql
new file mode 100644
index 0000000..4ab4ea6
--- /dev/null
+++ b/benchmark/tpch/queries/q12.sql
@@ -0,0 +1,30 @@
+-- SQLBench-H query 12 derived from TPC-H query 12 under the terms of the TPC
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance
Council.
+select
+ l_shipmode,
+ sum(case
+ when o_orderpriority = '1-URGENT'
+ or o_orderpriority = '2-HIGH'
+ then 1
+ else 0
+ end) as high_line_count,
+ sum(case
+ when o_orderpriority <> '1-URGENT'
+ and o_orderpriority <> '2-HIGH'
+ then 1
+ else 0
+ end) as low_line_count
+from
+ orders,
+ lineitem
+where
+ o_orderkey = l_orderkey
+ and l_shipmode in ('FOB', 'SHIP')
+ and l_commitdate < l_receiptdate
+ and l_shipdate < l_commitdate
+ and l_receiptdate >= date '1995-01-01'
+ and l_receiptdate < date '1995-01-01' + interval '1' year
+group by
+ l_shipmode
+order by
+ l_shipmode;
diff --git a/benchmark/tpch/queries/q13.sql b/benchmark/tpch/queries/q13.sql
new file mode 100644
index 0000000..301e35d
--- /dev/null
+++ b/benchmark/tpch/queries/q13.sql
@@ -0,0 +1,22 @@
+-- SQLBench-H query 13 derived from TPC-H query 13 under the terms of the TPC
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance
Council.
+select
+ c_count,
+ count(*) as custdist
+from
+ (
+ select
+ c_custkey,
+ count(o_orderkey)
+ from
+ customer left outer join orders on
+ c_custkey = o_custkey
+ and o_comment not like '%express%requests%'
+ group by
+ c_custkey
+ ) as c_orders (c_custkey, c_count)
+group by
+ c_count
+order by
+ custdist desc,
+ c_count desc;
diff --git a/benchmark/tpch/queries/q14.sql b/benchmark/tpch/queries/q14.sql
new file mode 100644
index 0000000..6040ac7
--- /dev/null
+++ b/benchmark/tpch/queries/q14.sql
@@ -0,0 +1,15 @@
+-- SQLBench-H query 14 derived from TPC-H query 14 under the terms of the TPC
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance
Council.
+select
+ 100.00 * sum(case
+ when p_type like 'PROMO%'
+ then l_extendedprice * (1 - l_discount)
+ else 0
+ end) / sum(l_extendedprice * (1 - l_discount)) as promo_revenue
+from
+ lineitem,
+ part
+where
+ l_partkey = p_partkey
+ and l_shipdate >= date '1995-02-01'
+ and l_shipdate < date '1995-02-01' + interval '1' month;
diff --git a/benchmark/tpch/queries/q15.sql b/benchmark/tpch/queries/q15.sql
new file mode 100644
index 0000000..c656dc3
--- /dev/null
+++ b/benchmark/tpch/queries/q15.sql
@@ -0,0 +1,33 @@
+-- SQLBench-H query 15 derived from TPC-H query 15 under the terms of the TPC
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance
Council.
+create view revenue0 (supplier_no, total_revenue) as
+ select
+ l_suppkey,
+ sum(l_extendedprice * (1 - l_discount)) as total_revenue
+ from
+ lineitem
+ where
+ l_shipdate >= date '1996-08-01'
+ and l_shipdate < date '1996-08-01' + interval '3' month
+ group by
+ l_suppkey;
+select
+ s_suppkey,
+ s_name,
+ s_address,
+ s_phone,
+ total_revenue
+from
+ supplier,
+ revenue0
+where
+ s_suppkey = supplier_no
+ and total_revenue = (
+ select
+ max(total_revenue)
+ from
+ revenue0
+ )
+order by
+ s_suppkey;
+drop view revenue0;
diff --git a/benchmark/tpch/queries/q16.sql b/benchmark/tpch/queries/q16.sql
new file mode 100644
index 0000000..7fdf365
--- /dev/null
+++ b/benchmark/tpch/queries/q16.sql
@@ -0,0 +1,32 @@
+-- SQLBench-H query 16 derived from TPC-H query 16 under the terms of the TPC
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance
Council.
+select
+ p_brand,
+ p_type,
+ p_size,
+ count(distinct ps_suppkey) as supplier_cnt
+from
+ partsupp,
+ part
+where
+ p_partkey = ps_partkey
+ and p_brand <> 'Brand#14'
+ and p_type not like 'SMALL PLATED%'
+ and p_size in (14, 6, 5, 31, 49, 15, 41, 47)
+ and ps_suppkey not in (
+ select
+ s_suppkey
+ from
+ supplier
+ where
+ s_comment like '%Customer%Complaints%'
+ )
+group by
+ p_brand,
+ p_type,
+ p_size
+order by
+ supplier_cnt desc,
+ p_brand,
+ p_type,
+ p_size;
diff --git a/benchmark/tpch/queries/q17.sql b/benchmark/tpch/queries/q17.sql
new file mode 100644
index 0000000..ffa0f15
--- /dev/null
+++ b/benchmark/tpch/queries/q17.sql
@@ -0,0 +1,19 @@
+-- SQLBench-H query 17 derived from TPC-H query 17 under the terms of the TPC
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance
Council.
+select
+ sum(l_extendedprice) / 7.0 as avg_yearly
+from
+ lineitem,
+ part
+where
+ p_partkey = l_partkey
+ and p_brand = 'Brand#42'
+ and p_container = 'LG BAG'
+ and l_quantity < (
+ select
+ 0.2 * avg(l_quantity)
+ from
+ lineitem
+ where
+ l_partkey = p_partkey
+ );
diff --git a/benchmark/tpch/queries/q18.sql b/benchmark/tpch/queries/q18.sql
new file mode 100644
index 0000000..f4ab194
--- /dev/null
+++ b/benchmark/tpch/queries/q18.sql
@@ -0,0 +1,34 @@
+-- SQLBench-H query 18 derived from TPC-H query 18 under the terms of the TPC
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance
Council.
-- SQLBench-H query 18 derived from TPC-H query 18 under the terms of the TPC Fair Use Policy.
-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council.
-- Large-volume orders: customers and orders whose total lineitem quantity
-- exceeds 313, top 100 by order total price.
select
	c_name,
	c_custkey,
	o_orderkey,
	o_orderdate,
	o_totalprice,
	sum(l_quantity)
from
	customer,
	orders,
	lineitem
where
	-- keep only orders whose summed lineitem quantity passes the threshold
	o_orderkey in (
		select
			l_orderkey
		from
			lineitem
		group by
			l_orderkey having
				sum(l_quantity) > 313
	)
	and c_custkey = o_custkey
	and o_orderkey = l_orderkey
group by
	c_name,
	c_custkey,
	o_orderkey,
	o_orderdate,
	o_totalprice
order by
	o_totalprice desc,
	o_orderdate limit 100;
diff --git a/benchmark/tpch/queries/q19.sql b/benchmark/tpch/queries/q19.sql
new file mode 100644
index 0000000..ad5fb7d
--- /dev/null
+++ b/benchmark/tpch/queries/q19.sql
@@ -0,0 +1,37 @@
+-- SQLBench-H query 19 derived from TPC-H query 19 under the terms of the TPC
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance
Council.
-- SQLBench-H query 19 derived from TPC-H query 19 under the terms of the TPC Fair Use Policy.
-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council.
-- Discounted revenue for air-shipped, delivered-in-person lineitems matching
-- one of three brand/container/quantity/size predicate groups.
select
	sum(l_extendedprice* (1 - l_discount)) as revenue
from
	lineitem,
	part
where
	(
		p_partkey = l_partkey
		and p_brand = 'Brand#21'
		and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')
		and l_quantity >= 8 and l_quantity <= 8 + 10
		and p_size between 1 and 5
		and l_shipmode in ('AIR', 'AIR REG')
		and l_shipinstruct = 'DELIVER IN PERSON'
	)
	or
	(
		p_partkey = l_partkey
		and p_brand = 'Brand#13'
		and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK')
		and l_quantity >= 20 and l_quantity <= 20 + 10
		and p_size between 1 and 10
		and l_shipmode in ('AIR', 'AIR REG')
		and l_shipinstruct = 'DELIVER IN PERSON'
	)
	or
	(
		p_partkey = l_partkey
		and p_brand = 'Brand#52'
		and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')
		and l_quantity >= 30 and l_quantity <= 30 + 10
		and p_size between 1 and 15
		and l_shipmode in ('AIR', 'AIR REG')
		and l_shipinstruct = 'DELIVER IN PERSON'
	);
diff --git a/benchmark/tpch/queries/q2.sql b/benchmark/tpch/queries/q2.sql
new file mode 100644
index 0000000..2936532
--- /dev/null
+++ b/benchmark/tpch/queries/q2.sql
@@ -0,0 +1,45 @@
+-- SQLBench-H query 2 derived from TPC-H query 2 under the terms of the TPC
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance
Council.
-- SQLBench-H query 2 derived from TPC-H query 2 under the terms of the TPC Fair Use Policy.
-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council.
-- Minimum-cost supplier: for each qualifying part (size 48, type ending in
-- TIN) find ASIA suppliers offering it at the region-wide minimum supply
-- cost; top 100 by account balance.
select
	s_acctbal,
	s_name,
	n_name,
	p_partkey,
	p_mfgr,
	s_address,
	s_phone,
	s_comment
from
	part,
	supplier,
	partsupp,
	nation,
	region
where
	p_partkey = ps_partkey
	and s_suppkey = ps_suppkey
	and p_size = 48
	and p_type like '%TIN'
	and s_nationkey = n_nationkey
	and n_regionkey = r_regionkey
	and r_name = 'ASIA'
	-- correlated subquery: minimum supply cost for this part within ASIA
	and ps_supplycost = (
		select
			min(ps_supplycost)
		from
			partsupp,
			supplier,
			nation,
			region
		where
			p_partkey = ps_partkey
			and s_suppkey = ps_suppkey
			and s_nationkey = n_nationkey
			and n_regionkey = r_regionkey
			and r_name = 'ASIA'
	)
order by
	s_acctbal desc,
	n_name,
	s_name,
	p_partkey limit 100;
diff --git a/benchmark/tpch/queries/q20.sql b/benchmark/tpch/queries/q20.sql
new file mode 100644
index 0000000..3136ca3
--- /dev/null
+++ b/benchmark/tpch/queries/q20.sql
@@ -0,0 +1,39 @@
+-- SQLBench-H query 20 derived from TPC-H query 20 under the terms of the TPC
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance
Council.
-- SQLBench-H query 20 derived from TPC-H query 20 under the terms of the TPC Fair Use Policy.
-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council.
-- Suppliers in KENYA holding excess stock (available qty above half the
-- shipped quantity in 1993) of parts whose name starts with 'blanched'.
select
	s_name,
	s_address
from
	supplier,
	nation
where
	s_suppkey in (
		select
			ps_suppkey
		from
			partsupp
		where
			ps_partkey in (
				select
					p_partkey
				from
					part
				where
					p_name like 'blanched%'
			)
			-- correlated: half the quantity this supplier shipped
			-- for this part during 1993
			and ps_availqty > (
				select
					0.5 * sum(l_quantity)
				from
					lineitem
				where
					l_partkey = ps_partkey
					and l_suppkey = ps_suppkey
					and l_shipdate >= date '1993-01-01'
					and l_shipdate < date '1993-01-01' + interval '1' year
			)
	)
	and s_nationkey = n_nationkey
	and n_name = 'KENYA'
order by
	s_name;
diff --git a/benchmark/tpch/queries/q21.sql b/benchmark/tpch/queries/q21.sql
new file mode 100644
index 0000000..0170469
--- /dev/null
+++ b/benchmark/tpch/queries/q21.sql
@@ -0,0 +1,41 @@
+-- SQLBench-H query 21 derived from TPC-H query 21 under the terms of the TPC
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance
Council.
-- SQLBench-H query 21 derived from TPC-H query 21 under the terms of the TPC Fair Use Policy.
-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council.
-- For 'F'-status multi-supplier orders, counts per ARGENTINA supplier how
-- often it was the only supplier that delivered late (receipt > commit).
select
	s_name,
	count(*) as numwait
from
	supplier,
	lineitem l1,
	orders,
	nation
where
	s_suppkey = l1.l_suppkey
	and o_orderkey = l1.l_orderkey
	and o_orderstatus = 'F'
	and l1.l_receiptdate > l1.l_commitdate
	-- another supplier participated in the same order ...
	and exists (
		select
			*
		from
			lineitem l2
		where
			l2.l_orderkey = l1.l_orderkey
			and l2.l_suppkey <> l1.l_suppkey
	)
	-- ... but no other supplier on the order was also late
	and not exists (
		select
			*
		from
			lineitem l3
		where
			l3.l_orderkey = l1.l_orderkey
			and l3.l_suppkey <> l1.l_suppkey
			and l3.l_receiptdate > l3.l_commitdate
	)
	and s_nationkey = n_nationkey
	and n_name = 'ARGENTINA'
group by
	s_name
order by
	numwait desc,
	s_name limit 100;
diff --git a/benchmark/tpch/queries/q22.sql b/benchmark/tpch/queries/q22.sql
new file mode 100644
index 0000000..8d528ef
--- /dev/null
+++ b/benchmark/tpch/queries/q22.sql
@@ -0,0 +1,39 @@
+-- SQLBench-H query 22 derived from TPC-H query 22 under the terms of the TPC
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance
Council.
-- SQLBench-H query 22 derived from TPC-H query 22 under the terms of the TPC Fair Use Policy.
-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council.
-- Counts customers (and sums their balances) per phone country code, for
-- customers with above-average positive balance and no orders at all.
select
	cntrycode,
	count(*) as numcust,
	sum(c_acctbal) as totacctbal
from
	(
		select
			substring(c_phone from 1 for 2) as cntrycode,
			c_acctbal
		from
			customer
		where
			substring(c_phone from 1 for 2) in
				('24', '34', '16', '30', '33', '14', '13')
			-- balance above the average positive balance of
			-- customers in the same set of country codes
			and c_acctbal > (
				select
					avg(c_acctbal)
				from
					customer
				where
					c_acctbal > 0.00
					and substring(c_phone from 1 for 2) in
						('24', '34', '16', '30', '33', '14', '13')
			)
			-- customer has never placed an order
			and not exists (
				select
					*
				from
					orders
				where
					o_custkey = c_custkey
			)
	) as custsale
group by
	cntrycode
order by
	cntrycode;
diff --git a/benchmark/tpch/queries/q3.sql b/benchmark/tpch/queries/q3.sql
new file mode 100644
index 0000000..b60be7f
--- /dev/null
+++ b/benchmark/tpch/queries/q3.sql
@@ -0,0 +1,24 @@
+-- SQLBench-H query 3 derived from TPC-H query 3 under the terms of the TPC
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance
Council.
-- SQLBench-H query 3 derived from TPC-H query 3 under the terms of the TPC Fair Use Policy.
-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council.
-- Shipping priority: top 10 orders by discounted revenue for the BUILDING
-- segment, ordered before 1995-03-15 with lineitems shipped after it.
select
	l_orderkey,
	sum(l_extendedprice * (1 - l_discount)) as revenue,
	o_orderdate,
	o_shippriority
from
	customer,
	orders,
	lineitem
where
	c_mktsegment = 'BUILDING'
	and c_custkey = o_custkey
	and l_orderkey = o_orderkey
	and o_orderdate < date '1995-03-15'
	and l_shipdate > date '1995-03-15'
group by
	l_orderkey,
	o_orderdate,
	o_shippriority
order by
	revenue desc,
	o_orderdate limit 10;
diff --git a/benchmark/tpch/queries/q4.sql b/benchmark/tpch/queries/q4.sql
new file mode 100644
index 0000000..05fae97
--- /dev/null
+++ b/benchmark/tpch/queries/q4.sql
@@ -0,0 +1,23 @@
+-- SQLBench-H query 4 derived from TPC-H query 4 under the terms of the TPC
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance
Council.
-- SQLBench-H query 4 derived from TPC-H query 4 under the terms of the TPC Fair Use Policy.
-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council.
-- Order-priority checking: counts orders per priority placed in a 3-month
-- window where at least one lineitem was received after its commit date.
select
	o_orderpriority,
	count(*) as order_count
from
	orders
where
	o_orderdate >= date '1995-04-01'
	and o_orderdate < date '1995-04-01' + interval '3' month
	and exists (
		select
			*
		from
			lineitem
		where
			l_orderkey = o_orderkey
			and l_commitdate < l_receiptdate
	)
group by
	o_orderpriority
order by
	o_orderpriority;
diff --git a/benchmark/tpch/queries/q5.sql b/benchmark/tpch/queries/q5.sql
new file mode 100644
index 0000000..4b97ef0
--- /dev/null
+++ b/benchmark/tpch/queries/q5.sql
@@ -0,0 +1,26 @@
+-- SQLBench-H query 5 derived from TPC-H query 5 under the terms of the TPC
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance
Council.
-- SQLBench-H query 5 derived from TPC-H query 5 under the terms of the TPC Fair Use Policy.
-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council.
-- Local supplier volume: discounted revenue per AFRICA nation for 1994
-- orders where customer and supplier are in the same nation.
select
	n_name,
	sum(l_extendedprice * (1 - l_discount)) as revenue
from
	customer,
	orders,
	lineitem,
	supplier,
	nation,
	region
where
	c_custkey = o_custkey
	and l_orderkey = o_orderkey
	and l_suppkey = s_suppkey
	and c_nationkey = s_nationkey
	and s_nationkey = n_nationkey
	and n_regionkey = r_regionkey
	and r_name = 'AFRICA'
	and o_orderdate >= date '1994-01-01'
	and o_orderdate < date '1994-01-01' + interval '1' year
group by
	n_name
order by
	revenue desc;
diff --git a/benchmark/tpch/queries/q6.sql b/benchmark/tpch/queries/q6.sql
new file mode 100644
index 0000000..f5b4bae
--- /dev/null
+++ b/benchmark/tpch/queries/q6.sql
@@ -0,0 +1,11 @@
+-- SQLBench-H query 6 derived from TPC-H query 6 under the terms of the TPC
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance
Council.
-- SQLBench-H query 6 derived from TPC-H query 6 under the terms of the TPC Fair Use Policy.
-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council.
-- Forecasting revenue change: revenue from 1994 lineitems with discount in
-- [0.03, 0.05] and quantity below 24 if those discounts were removed.
select
	sum(l_extendedprice * l_discount) as revenue
from
	lineitem
where
	l_shipdate >= date '1994-01-01'
	and l_shipdate < date '1994-01-01' + interval '1' year
	and l_discount between 0.04 - 0.01 and 0.04 + 0.01
	and l_quantity < 24;
diff --git a/benchmark/tpch/queries/q7.sql b/benchmark/tpch/queries/q7.sql
new file mode 100644
index 0000000..f3919be
--- /dev/null
+++ b/benchmark/tpch/queries/q7.sql
@@ -0,0 +1,41 @@
+-- SQLBench-H query 7 derived from TPC-H query 7 under the terms of the TPC
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance
Council.
-- SQLBench-H query 7 derived from TPC-H query 7 under the terms of the TPC Fair Use Policy.
-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council.
-- Volume shipping: yearly discounted revenue of goods shipped in either
-- direction between GERMANY and IRAQ during 1995-1996.
select
	supp_nation,
	cust_nation,
	l_year,
	sum(volume) as revenue
from
	(
		select
			n1.n_name as supp_nation,
			n2.n_name as cust_nation,
			extract(year from l_shipdate) as l_year,
			l_extendedprice * (1 - l_discount) as volume
		from
			supplier,
			lineitem,
			orders,
			customer,
			nation n1,
			nation n2
		where
			s_suppkey = l_suppkey
			and o_orderkey = l_orderkey
			and c_custkey = o_custkey
			and s_nationkey = n1.n_nationkey
			and c_nationkey = n2.n_nationkey
			-- either trade direction between the two nations
			and (
				(n1.n_name = 'GERMANY' and n2.n_name = 'IRAQ')
				or (n1.n_name = 'IRAQ' and n2.n_name = 'GERMANY')
			)
			and l_shipdate between date '1995-01-01' and date '1996-12-31'
	) as shipping
group by
	supp_nation,
	cust_nation,
	l_year
order by
	supp_nation,
	cust_nation,
	l_year;
diff --git a/benchmark/tpch/queries/q8.sql b/benchmark/tpch/queries/q8.sql
new file mode 100644
index 0000000..7c53e14
--- /dev/null
+++ b/benchmark/tpch/queries/q8.sql
@@ -0,0 +1,39 @@
+-- SQLBench-H query 8 derived from TPC-H query 8 under the terms of the TPC
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance
Council.
-- SQLBench-H query 8 derived from TPC-H query 8 under the terms of the TPC Fair Use Policy.
-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council.
-- National market share: IRAQ's yearly share of revenue for LARGE PLATED
-- STEEL parts sold to MIDDLE EAST customers during 1995-1996.
select
	o_year,
	sum(case
		when nation = 'IRAQ' then volume
		else 0
	end) / sum(volume) as mkt_share
from
	(
		select
			extract(year from o_orderdate) as o_year,
			l_extendedprice * (1 - l_discount) as volume,
			n2.n_name as nation
		from
			part,
			supplier,
			lineitem,
			orders,
			customer,
			nation n1,
			nation n2,
			region
		where
			p_partkey = l_partkey
			and s_suppkey = l_suppkey
			and l_orderkey = o_orderkey
			and o_custkey = c_custkey
			-- n1: customer's nation (filters region); n2: supplier's nation
			and c_nationkey = n1.n_nationkey
			and n1.n_regionkey = r_regionkey
			and r_name = 'MIDDLE EAST'
			and s_nationkey = n2.n_nationkey
			and o_orderdate between date '1995-01-01' and date '1996-12-31'
			and p_type = 'LARGE PLATED STEEL'
	) as all_nations
group by
	o_year
order by
	o_year;
diff --git a/benchmark/tpch/queries/q9.sql b/benchmark/tpch/queries/q9.sql
new file mode 100644
index 0000000..2455695
--- /dev/null
+++ b/benchmark/tpch/queries/q9.sql
@@ -0,0 +1,34 @@
+-- SQLBench-H query 9 derived from TPC-H query 9 under the terms of the TPC
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance
Council.
-- SQLBench-H query 9 derived from TPC-H query 9 under the terms of the TPC Fair Use Policy.
-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council.
-- Product-type profit: profit (discounted revenue minus supply cost) per
-- supplier nation and order year for parts whose name contains 'moccasin'.
select
	nation,
	o_year,
	sum(amount) as sum_profit
from
	(
		select
			n_name as nation,
			extract(year from o_orderdate) as o_year,
			l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount
		from
			part,
			supplier,
			lineitem,
			partsupp,
			orders,
			nation
		where
			s_suppkey = l_suppkey
			and ps_suppkey = l_suppkey
			and ps_partkey = l_partkey
			and p_partkey = l_partkey
			and o_orderkey = l_orderkey
			and s_nationkey = n_nationkey
			and p_name like '%moccasin%'
	) as profit
group by
	nation,
	o_year
order by
	nation,
	o_year desc;
diff --git a/benchmark/tpch/run.sh b/benchmark/tpch/run.sh
new file mode 100755
index 0000000..714000c
--- /dev/null
+++ b/benchmark/tpch/run.sh
@@ -0,0 +1,458 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
set -euo pipefail

# Resolve the directory containing this script and the repository root so the
# script works regardless of the caller's working directory.
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
REPO_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)"

DEFAULT_SCALE_FACTOR=1                      # used when --scale-factor is omitted
DOCKER_IMAGE="tpch-bench"                   # tag built by build_docker
TPCH_BIN="$REPO_ROOT/target/release/tpch"   # release binary produced by build_tpch
MODE="${MODE:-native}"                      # execution mode: native or docker
+
# Build the TPC-H helper binary ($TPCH_BIN) in release mode.
build_tpch() {
  echo "Building TPC-H tool..."
  cargo build -p tpch --release --manifest-path "$REPO_ROOT/Cargo.toml"
}
+
# Build the benchmark Docker image tagged $DOCKER_IMAGE from the repo root,
# using the Dockerfile under infra/.
build_docker() {
  echo "Building TPC-H Docker image..."
  docker build -t "$DOCKER_IMAGE" -f "$SCRIPT_DIR/infra/Dockerfile" "$REPO_ROOT"
}
+
# Read spark-submit args from the tpch binary (one token per line) into the
# SPARK_ARGS array. Any arguments given here are forwarded to `tpch spark-args`.
# Uses a while-read loop (not mapfile) so it also works on bash 3.2 (macOS).
read_spark_args() {
  SPARK_ARGS=()
  while IFS= read -r line; do
    SPARK_ARGS+=("$line")
  done < <("$TPCH_BIN" spark-args "$@")
}
+
# Setup Spark config files for native mode. Requires SPARK_HOME to point at a
# local Spark installation; overwrites its spark-defaults.conf and
# log4j2.properties with the benchmark's versions.
setup_spark_native() {
  if [ -z "${SPARK_HOME:-}" ]; then
    echo "Error: SPARK_HOME is not set. Set it to your Spark installation directory." >&2
    exit 1
  fi
  if [ ! -x "$SPARK_HOME/bin/spark-submit" ]; then
    echo "Error: $SPARK_HOME/bin/spark-submit not found or not executable." >&2
    exit 1
  fi

  echo "Configuring Spark at $SPARK_HOME..."
  cp "$SCRIPT_DIR/infra/spark/spark-defaults.conf" "$SPARK_HOME/conf/spark-defaults.conf"
  cp "$SCRIPT_DIR/infra/spark/log4j2.properties" "$SPARK_HOME/conf/log4j2.properties"
}
+
# Print usage help to stdout.
# Fix: the Environment section previously claimed MODE defaults to "docker",
# but the script initializes MODE="${MODE:-native}" — native is the default.
usage() {
  cat <<EOF
Usage: $0 <command> [options]

Commands:
  generate          Generate TPC-H parquet data
  create-tables     Create Hudi COW tables from parquet via Spark (Docker)
  bench-spark       Run TPC-H queries against Hudi tables via Spark SQL
  bench-datafusion  Run TPC-H queries against Hudi tables via DataFusion
  compare           Compare persisted benchmark results with bar charts

Environment:
  MODE              Execution mode: native (default) or docker

Options:
  --scale-factor N  TPC-H scale factor (default: $DEFAULT_SCALE_FACTOR)
  --format F        Table format: hudi or parquet (default: auto)
  --queries Q       Comma-separated query numbers (default: all 22)
  --iterations N    Number of measured iterations per query (from config)
  --warmup N        Number of unmeasured warmup iterations per query (from config)
  --output-dir D    Directory to persist results as JSON (bench commands only)
  --engines E       Comma-separated engine names to compare (compare command only)

Examples:
  $0 generate --scale-factor 1
  $0 create-tables --scale-factor 1
  MODE=native $0 bench-spark --scale-factor 1 --queries 1,3,6
  MODE=native $0 bench-datafusion --scale-factor 1 --queries 1,3,6
  $0 bench-datafusion --scale-factor 1 --output-dir results
  $0 compare --scale-factor 1 --engines datafusion,spark --format hudi
EOF
}
+
+# --- Commands ---
+
# generate: produce TPC-H parquet data for a scale factor via the tpch binary.
# Removes any existing parquet dataset for that scale factor first.
cmd_generate() {
  local sf="$DEFAULT_SCALE_FACTOR"
  while [[ $# -gt 0 ]]; do
    case "$1" in
      --scale-factor) sf="$2"; shift 2 ;;
      *) echo "Unknown option: $1" >&2; usage; exit 1 ;;
    esac
  done

  local parquet_dir="$SCRIPT_DIR/data/sf$sf-parquet"
  if [ -d "$parquet_dir" ]; then
    echo "Removing existing parquet data at $parquet_dir..."
    rm -rf "$parquet_dir"
  fi

  build_tpch
  "$TPCH_BIN" generate --scale-factor "$sf"
}
+
# create-tables: bootstrap Hudi COW tables from the generated parquet data by
# running rendered CTAS SQL through spark-sql inside Docker. Always runs in
# Docker regardless of MODE. Requires `generate` to have been run first.
cmd_create_tables() {
  local sf="$DEFAULT_SCALE_FACTOR"
  while [[ $# -gt 0 ]]; do
    case "$1" in
      --scale-factor) sf="$2"; shift 2 ;;
      *) echo "Unknown option: $1" >&2; usage; exit 1 ;;
    esac
  done

  local parquet_dir="$SCRIPT_DIR/data/sf$sf-parquet"
  if [ ! -d "$parquet_dir" ]; then
    echo "Error: parquet data not found at $parquet_dir. Run 'generate' first." >&2
    exit 1
  fi

  # Recreate the Hudi output directory from scratch.
  local hudi_dir="$SCRIPT_DIR/data/sf$sf-hudi"
  if [ -d "$hudi_dir" ]; then
    echo "Removing existing Hudi data at $hudi_dir..."
    rm -rf "$hudi_dir"
  fi

  build_tpch
  build_docker
  mkdir -p "$hudi_dir"

  # Render the CTAS statements with container-side paths (/opt/parquet,
  # /opt/hudi) matching the volume mounts below.
  local sql_file
  sql_file="$(mktemp)"
  "$TPCH_BIN" render-ctas --scale-factor "$sf" \
    --parquet-base /opt/parquet --hudi-base /opt/hudi > "$sql_file"

  read_spark_args --scale-factor "$sf" --command create-tables

  echo "Creating Hudi COW tables from parquet (sf$sf)..."
  # Capture the exit code instead of letting `set -e` abort, so the temp SQL
  # file is always cleaned up.
  local docker_exit=0
  docker run --rm \
    -v "$parquet_dir:/opt/parquet:ro" \
    -v "$hudi_dir:/opt/hudi" \
    -v "$sql_file:/opt/spark/work-dir/create_hudi_tables.sql:ro" \
    "$DOCKER_IMAGE" \
    /opt/spark/bin/spark-sql "${SPARK_ARGS[@]}" \
    -f /opt/spark/work-dir/create_hudi_tables.sql \
    || docker_exit=$?

  rm -f "$sql_file"
  if [ $docker_exit -ne 0 ]; then
    echo "Error: Spark SQL failed with exit code $docker_exit" >&2
    return $docker_exit
  fi
  echo "Hudi COW tables created at: $hudi_dir"
}
+
# bench-spark: run TPC-H queries through Spark SQL (bench.py) against Hudi or
# parquet data, natively (requires SPARK_HOME) or in Docker, then parse the
# JSONL results via the tpch binary and optionally persist them.
cmd_bench_spark() {
  local sf="$DEFAULT_SCALE_FACTOR"
  local queries=""
  local iterations=""
  local warmup=""
  local output_dir=""
  local format="hudi"
  while [[ $# -gt 0 ]]; do
    case "$1" in
      --scale-factor) sf="$2"; shift 2 ;;
      --queries) queries="$2"; shift 2 ;;
      --iterations) iterations="$2"; shift 2 ;;
      --warmup) warmup="$2"; shift 2 ;;
      --output-dir) output_dir="$2"; shift 2 ;;
      --format) format="$2"; shift 2 ;;
      *) echo "Unknown option: $1" >&2; usage; exit 1 ;;
    esac
  done

  build_tpch

  # Read defaults from config via the tpch binary; CLI flags take precedence.
  # `bench-defaults` prints "<warmup> <iterations>" on one line.
  if [ -z "$warmup" ] || [ -z "$iterations" ]; then
    local defaults
    defaults=$("$TPCH_BIN" bench-defaults --scale-factor "$sf")
    local cfg_warmup cfg_iterations
    cfg_warmup=$(echo "$defaults" | awk '{print $1}')
    cfg_iterations=$(echo "$defaults" | awk '{print $2}')
    warmup="${warmup:-$cfg_warmup}"
    iterations="${iterations:-$cfg_iterations}"
  fi

  local hudi_dir="$SCRIPT_DIR/data/sf$sf-hudi"
  local parquet_dir="$SCRIPT_DIR/data/sf$sf-parquet"

  # Select the data directory and the flag name bench.py expects for it.
  local data_dir=""
  local bench_data_arg=""
  case "$format" in
    hudi)
      data_dir="$hudi_dir"
      bench_data_arg="--hudi-base"
      ;;
    parquet)
      data_dir="$parquet_dir"
      bench_data_arg="--parquet-base"
      ;;
    *) echo "Error: unknown format '$format'. Use 'hudi' or 'parquet'." >&2; exit 1 ;;
  esac

  if [ ! -d "$data_dir" ]; then
    echo "Error: $format data not found at $data_dir." >&2
    exit 1
  fi

  read_spark_args --scale-factor "$sf" --command bench

  # Results land in a temp dir as JSONL; parsed and deleted at the end.
  local tmp_dir
  tmp_dir="$(mktemp -d)"
  local output_file="$tmp_dir/results.jsonl"

  if [ "$MODE" = "native" ]; then
    setup_spark_native

    # $bench_data_arg is intentionally unquoted: it is a single flag token.
    local bench_args=(
      $bench_data_arg "$data_dir"
      --query-dir "$SCRIPT_DIR/queries"
      --scale-factor "$sf"
      --warmup "$warmup"
      --iterations "$iterations"
      --output "$output_file"
    )
    if [ -n "$queries" ]; then
      bench_args+=(--queries "$queries")
    fi

    echo "Running Spark SQL benchmark ($format, native)..."
    "$SPARK_HOME/bin/spark-submit" \
      --packages org.apache.hudi:hudi-spark3.5-bundle_2.12:1.1.1 \
      "${SPARK_ARGS[@]}" \
      "$SCRIPT_DIR/infra/spark/bench.py" \
      "${bench_args[@]}"
  else
    build_docker

    # Container-side paths must match the -v mounts below.
    local bench_args=(
      $bench_data_arg /opt/data
      --query-dir /opt/queries
      --scale-factor "$sf"
      --warmup "$warmup"
      --iterations "$iterations"
      --output /opt/output/results.jsonl
    )
    if [ -n "$queries" ]; then
      bench_args+=(--queries "$queries")
    fi

    echo "Running Spark SQL benchmark ($format, docker)..."
    # Capture the exit code so the temp dir is cleaned up on failure.
    local docker_exit=0
    docker run --rm \
      -e PYTHONUNBUFFERED=1 \
      -v "$data_dir:/opt/data:ro" \
      -v "$SCRIPT_DIR/queries:/opt/queries:ro" \
      -v "$SCRIPT_DIR/infra/spark/bench.py:/opt/spark/work-dir/bench.py:ro" \
      -v "$tmp_dir:/opt/output" \
      "$DOCKER_IMAGE" \
      /opt/spark/bin/spark-submit "${SPARK_ARGS[@]}" \
      /opt/spark/work-dir/bench.py \
      "${bench_args[@]}" \
      || docker_exit=$?

    if [ $docker_exit -ne 0 ]; then
      echo "Error: Spark SQL benchmark failed with exit code $docker_exit" >&2
      rm -rf "$tmp_dir"
      return $docker_exit
    fi
  fi

  echo ""
  # Parse the raw JSONL and, when requested, persist labeled results as JSON.
  local parse_args=(parse-spark-output --input "$output_file")
  if [ -n "$output_dir" ]; then
    mkdir -p "$output_dir"
    parse_args+=(--output-dir "$output_dir" --engine-label spark --format-label "$format" --display-name "spark+hudi" --scale-factor "$sf")
  fi
  "$TPCH_BIN" "${parse_args[@]}"
  rm -rf "$tmp_dir"
}
+
# bench-datafusion: run TPC-H queries through the tpch binary's DataFusion
# engine, natively or in Docker. Without --format, benches every data format
# (hudi/parquet) whose data directory already exists.
cmd_bench_datafusion() {
  local sf="$DEFAULT_SCALE_FACTOR"
  local format=""
  local queries=""
  local iterations=""
  local warmup=""
  local output_dir=""
  while [[ $# -gt 0 ]]; do
    case "$1" in
      --scale-factor) sf="$2"; shift 2 ;;
      --format) format="$2"; shift 2 ;;
      --queries) queries="$2"; shift 2 ;;
      --iterations) iterations="$2"; shift 2 ;;
      --warmup) warmup="$2"; shift 2 ;;
      --output-dir) output_dir="$2"; shift 2 ;;
      *) echo "Unknown option: $1" >&2; usage; exit 1 ;;
    esac
  done

  local hudi_dir="$SCRIPT_DIR/data/sf$sf-hudi"
  local parquet_dir="$SCRIPT_DIR/data/sf$sf-parquet"

  # Determine which formats to bench
  local use_hudi=false
  local use_parquet=false
  case "$format" in
    hudi) use_hudi=true ;;
    parquet) use_parquet=true ;;
    "")
      # Default: use whichever data dirs exist
      [ -d "$hudi_dir" ] && use_hudi=true
      [ -d "$parquet_dir" ] && use_parquet=true
      ;;
    *) echo "Error: unknown format '$format'. Use 'hudi' or 'parquet'." >&2; exit 1 ;;
  esac

  # Fail early with an actionable message if requested data is missing.
  if [ "$use_hudi" = false ] && [ "$use_parquet" = false ]; then
    echo "Error: no data found for sf$sf. Run 'generate' and/or 'create-tables' first." >&2
    exit 1
  fi
  if [ "$use_hudi" = true ] && [ ! -d "$hudi_dir" ]; then
    echo "Error: Hudi data not found at $hudi_dir. Run 'create-tables' first." >&2
    exit 1
  fi
  if [ "$use_parquet" = true ] && [ ! -d "$parquet_dir" ]; then
    echo "Error: Parquet data not found at $parquet_dir. Run 'generate' first." >&2
    exit 1
  fi

  if [ "$MODE" = "native" ]; then
    build_tpch

    local bench_args=(bench --scale-factor "$sf")
    [ "$use_hudi" = true ] && bench_args+=(--hudi-dir "$hudi_dir")
    [ "$use_parquet" = true ] && bench_args+=(--parquet-dir "$parquet_dir")
    [ -n "$queries" ] && bench_args+=(--queries "$queries")
    [ -n "$iterations" ] && bench_args+=(--iterations "$iterations")
    [ -n "$warmup" ] && bench_args+=(--warmup "$warmup")

    if [ -n "$output_dir" ]; then
      mkdir -p "$output_dir"
      output_dir="$(cd "$output_dir" && pwd)"
      bench_args+=(--output-dir "$output_dir" --engine-label datafusion --format-label "${format:-hudi}" --display-name "datafusion+hudi-rs")
    fi

    echo "Running DataFusion benchmark (native)..."
    # Point the binary at the in-repo config/query dirs via env vars.
    TPCH_CONFIG_DIR="$SCRIPT_DIR/config" \
    TPCH_QUERY_DIR="$SCRIPT_DIR/queries" \
    RUST_LOG="${RUST_LOG:-warn}" \
    "$TPCH_BIN" "${bench_args[@]}"
  else
    build_docker

    # Resolve output_dir to absolute path (Docker requires it)
    if [ -n "$output_dir" ]; then
      mkdir -p "$output_dir"
      output_dir="$(cd "$output_dir" && pwd)"
    fi

    # Container-side paths must match the volume mounts assembled below.
    local bench_args=(bench --scale-factor "$sf")
    [ "$use_hudi" = true ] && bench_args+=(--hudi-dir /opt/hudi)
    [ "$use_parquet" = true ] && bench_args+=(--parquet-dir /opt/parquet)
    [ -n "$queries" ] && bench_args+=(--queries "$queries")
    [ -n "$iterations" ] && bench_args+=(--iterations "$iterations")
    [ -n "$warmup" ] && bench_args+=(--warmup "$warmup")

    if [ -n "$output_dir" ]; then
      bench_args+=(--output-dir /opt/results --engine-label datafusion --format-label "${format:-hudi}" --display-name "datafusion+hudi-rs")
    fi

    echo "Running DataFusion benchmark (docker)..."
    local volumes=()
    [ "$use_hudi" = true ] && volumes+=(-v "$hudi_dir:/opt/hudi:ro")
    [ "$use_parquet" = true ] && volumes+=(-v "$parquet_dir:/opt/parquet:ro")
    volumes+=(-v "$SCRIPT_DIR/queries:/opt/queries:ro")
    volumes+=(-v "$SCRIPT_DIR/config:/opt/config:ro")
    [ -n "$output_dir" ] && volumes+=(-v "$output_dir:/opt/results")

    docker run --rm \
      "${volumes[@]}" \
      -e TPCH_CONFIG_DIR=/opt/config \
      -e TPCH_QUERY_DIR=/opt/queries \
      -e RUST_LOG="${RUST_LOG:-warn}" \
      "$DOCKER_IMAGE" \
      tpch "${bench_args[@]}"
  fi
}
+
# compare: load persisted run results from $SCRIPT_DIR/results and render a
# comparison. Run names are derived as <engine>_<format>_sf<sf>, matching the
# labels written by the bench commands.
cmd_compare() {
  local sf="$DEFAULT_SCALE_FACTOR"
  local engines=""
  local format="hudi"
  while [[ $# -gt 0 ]]; do
    case "$1" in
      --scale-factor) sf="$2"; shift 2 ;;
      --engines) engines="$2"; shift 2 ;;
      --format) format="$2"; shift 2 ;;
      *) echo "Unknown option: $1" >&2; usage; exit 1 ;;
    esac
  done

  if [ -z "$engines" ]; then
    echo "Error: --engines is required (e.g., --engines datafusion,spark)" >&2
    exit 1
  fi

  # Convert "datafusion,spark" → "datafusion_hudi_sf1,spark_hudi_sf1"
  local runs=""
  IFS=',' read -ra engine_arr <<< "$engines"
  for e in "${engine_arr[@]}"; do
    [ -n "$runs" ] && runs+=","
    runs+="${e}_${format}_sf${sf}"
  done

  build_tpch
  "$TPCH_BIN" compare \
    --results-dir "$SCRIPT_DIR/results" \
    --runs "$runs"
}
+
# --- Main ---

# Require at least a command; the remaining args belong to that command.
if [[ $# -lt 1 ]]; then
  usage
  exit 1
fi

COMMAND="$1"
shift

# Dispatch to the per-command handler.
case "$COMMAND" in
  generate) cmd_generate "$@" ;;
  create-tables) cmd_create_tables "$@" ;;
  bench-spark) cmd_bench_spark "$@" ;;
  bench-datafusion) cmd_bench_datafusion "$@" ;;
  compare) cmd_compare "$@" ;;
  *)
    echo "Unknown command: $COMMAND" >&2
    usage
    exit 1
    ;;
esac
diff --git a/benchmark/tpch/src/config.rs b/benchmark/tpch/src/config.rs
new file mode 100644
index 0000000..c375e7c
--- /dev/null
+++ b/benchmark/tpch/src/config.rs
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::collections::BTreeMap;
+use std::fmt::Write;
+use std::path::{Path, PathBuf};
+
+use serde::Deserialize;
+
/// Canonical table ordering for SQL output (matches TPC-H dependency order).
/// Drives the iteration order in `render_ctas_sql` and `render_bench_sql`.
const TABLE_ORDER: &[&str] = &[
    "nation", "region", "part", "supplier", "partsupp", "customer", "orders", "lineitem",
];
+
/// Common table definition shared across all scale factors (from tables.yaml).
#[derive(Deserialize)]
struct CommonTableConfig {
    /// Hudi record key field, emitted as the `primaryKey` table property.
    primary_key: String,
    /// Ordering field, emitted as the `preCombineField` table property.
    pre_combine_field: String,
    /// Average record size in bytes, emitted as
    /// `hoodie.copyonwrite.record.size.estimate`.
    record_size_estimate: u32,
}
+
/// Common tables file (tables.yaml): table name -> shared table definition.
#[derive(Deserialize)]
struct CommonConfig {
    tables: BTreeMap<String, CommonTableConfig>,
}
+
/// Per-scale-factor overrides (sf*.yaml).
#[derive(Deserialize)]
struct ScaleFactorOverrides {
    /// Table name -> `hoodie.bulkinsert.shuffle.parallelism` value; tables
    /// absent from this map fall back to 1 when merged in `load`.
    shuffle_parallelism: BTreeMap<String, u32>,
    /// Spark settings for the create-tables command.
    create_tables: SparkCommandConfig,
    /// Benchmark settings (warmup, iterations, spark conf, ...).
    bench: BenchConfig,
}
+
/// Merged table config used at runtime: common definition from tables.yaml
/// combined with the per-scale-factor shuffle parallelism.
pub struct TableConfig {
    pub primary_key: String,
    pub pre_combine_field: String,
    pub record_size_estimate: u32,
    pub shuffle_parallelism: u32,
}
+
/// Fully merged configuration for one scale factor; built by
/// `ScaleFactorConfig::load`.
pub struct ScaleFactorConfig {
    /// Table name -> merged runtime table config.
    pub tables: BTreeMap<String, TableConfig>,
    /// Spark settings used when creating Hudi tables.
    pub create_tables: SparkCommandConfig,
    /// Benchmark settings.
    pub bench: BenchConfig,
}
+
/// Spark settings for a single command: extra `--conf key=value` pairs
/// passed to spark-submit (see `render_spark_args`).
#[derive(Deserialize)]
pub struct SparkCommandConfig {
    #[serde(default)]
    pub spark_conf: BTreeMap<String, String>,
}
+
/// Benchmark run settings from the per-scale-factor config.
#[derive(Deserialize)]
pub struct BenchConfig {
    /// Unmeasured warmup iterations per query (defaults to 0).
    #[serde(default)]
    pub warmup: usize,
    /// Measured iterations per query (defaults to 1 via `default_iterations`).
    #[serde(default = "default_iterations")]
    pub iterations: usize,
    /// Optional engine memory limit — consumer and unit not visible here;
    /// presumably parsed by the bench runner. TODO(review): confirm format.
    #[serde(default)]
    pub memory_limit: Option<String>,
    /// Extra `--conf key=value` pairs for spark-submit during benchmarking.
    #[serde(default)]
    pub spark_conf: BTreeMap<String, String>,
}
+
/// Serde default for `BenchConfig::iterations`: a single measured run per
/// query unless the per-scale-factor config overrides it.
fn default_iterations() -> usize {
    1usize
}
+
+impl ScaleFactorConfig {
+ /// Supported scale factors that have config files.
+ const SUPPORTED: &[u64] = &[1, 10, 100, 1000];
+
+ /// Load common table definitions and per-SF overrides, then merge them.
+ pub fn load(scale_factor: f64) -> Result<Self, Box<dyn std::error::Error>>
{
+ let effective_sf = if scale_factor < 1.0 {
+ 1u64
+ } else {
+ let sf = scale_factor as u64;
+ if !Self::SUPPORTED.contains(&sf) {
+ return Err(format!(
+ "Unsupported scale factor {scale_factor}. Supported: {:?}",
+ Self::SUPPORTED
+ )
+ .into());
+ }
+ sf
+ };
+
+ let config_dir = std::env::var("TPCH_CONFIG_DIR")
+ .map(PathBuf::from)
+ .unwrap_or_else(|_|
Path::new(env!("CARGO_MANIFEST_DIR")).join("config"));
+
+ // Load common table definitions
+ let common_path = config_dir.join("tables.yaml");
+ let common_content = std::fs::read_to_string(&common_path)
+ .map_err(|e| format!("Failed to read {}: {e}",
common_path.display()))?;
+ let common: CommonConfig = serde_yaml::from_str(&common_content)
+ .map_err(|e| format!("Failed to parse tables.yaml: {e}"))?;
+
+ // Load per-SF overrides
+ let sf_filename = format!("sf{effective_sf}.yaml");
+ let sf_path = config_dir.join(&sf_filename);
+ let sf_content = std::fs::read_to_string(&sf_path)
+ .map_err(|e| format!("Failed to read config {}: {e}",
sf_path.display()))?;
+ let overrides: ScaleFactorOverrides = serde_yaml::from_str(&sf_content)
+ .map_err(|e| format!("Failed to parse config {sf_filename}:
{e}"))?;
+
+ // Merge: common tables + per-SF shuffle_parallelism
+ let mut tables = BTreeMap::new();
+ for (name, common_table) in common.tables {
+ let shuffle_parallelism = overrides
+ .shuffle_parallelism
+ .get(&name)
+ .copied()
+ .unwrap_or(1);
+ tables.insert(
+ name,
+ TableConfig {
+ primary_key: common_table.primary_key,
+ pre_combine_field: common_table.pre_combine_field,
+ record_size_estimate: common_table.record_size_estimate,
+ shuffle_parallelism,
+ },
+ );
+ }
+
+ Ok(Self {
+ tables,
+ create_tables: overrides.create_tables,
+ bench: overrides.bench,
+ })
+ }
+
+ /// Generate CTAS SQL for creating Hudi tables from parquet sources.
+ pub fn render_ctas_sql(&self, parquet_base: &str, hudi_base: &str) ->
String {
+ let mut sql = String::new();
+ for &name in TABLE_ORDER {
+ let Some(table) = self.tables.get(name) else {
+ continue;
+ };
+ writeln!(sql, "CREATE TABLE {name} USING hudi").unwrap();
+ writeln!(sql, "LOCATION '{hudi_base}/{name}'").unwrap();
+ writeln!(sql, "TBLPROPERTIES (").unwrap();
+ writeln!(sql, " type = 'cow',").unwrap();
+ writeln!(sql, " primaryKey = '{}',", table.primary_key).unwrap();
+ writeln!(sql, " preCombineField = '{}',",
table.pre_combine_field).unwrap();
+ writeln!(sql, " 'hoodie.table.name' = '{name}',").unwrap();
+ writeln!(
+ sql,
+ " 'hoodie.bulkinsert.shuffle.parallelism' = '{}',",
+ table.shuffle_parallelism
+ )
+ .unwrap();
+ writeln!(
+ sql,
+ " 'hoodie.copyonwrite.record.size.estimate' = '{}'",
+ table.record_size_estimate
+ )
+ .unwrap();
+ writeln!(sql, ") AS SELECT * FROM
parquet.`{parquet_base}/{name}/`;").unwrap();
+ writeln!(sql).unwrap();
+ }
+ sql
+ }
+
+ /// Generate benchmark SQL: table registrations followed by query
iterations.
+ pub fn render_bench_sql(
+ &self,
+ hudi_base: &str,
+ query_nums: &[usize],
+ iterations: usize,
+ scale_factor: f64,
+ ) -> Result<String, Box<dyn std::error::Error>> {
+ let mut sql = String::new();
+
+ // Register Hudi tables
+ for &name in TABLE_ORDER {
+ if self.tables.contains_key(name) {
+ writeln!(
+ sql,
+ "CREATE TABLE {name} USING hudi LOCATION
'{hudi_base}/{name}';"
+ )
+ .unwrap();
+ }
+ }
+ writeln!(sql).unwrap();
+
+ // Per-SF substitution values (TPC-H spec Section 2.4.11.3: FRACTION =
0.0001 / SF)
+ let q11_fraction = format!("{:.10}", 0.0001 / scale_factor);
+
+ // Add queries with bench markers
+ let queries_dir =
Path::new(env!("CARGO_MANIFEST_DIR")).join("queries");
+ for &qn in query_nums {
+ let qfile = queries_dir.join(format!("q{qn}.sql"));
+ let query_sql = std::fs::read_to_string(&qfile)
+ .map_err(|e| format!("Failed to read q{qn}.sql: {e}"))?;
+ let query_sql = query_sql.replace("${Q11_FRACTION}",
&q11_fraction);
+ for i in 1..=iterations {
+ writeln!(sql).unwrap();
+ writeln!(sql, "SELECT 'BENCH_MARKER q{qn} iter{i}' as
marker;").unwrap();
+ write!(sql, "{query_sql}").unwrap();
+ if !query_sql.ends_with('\n') {
+ writeln!(sql).unwrap();
+ }
+ }
+ }
+
+ Ok(sql)
+ }
+
+ /// Generate spark-submit arguments for a given command, one per line.
+ pub fn render_spark_args(&self, command: &str) -> Result<Vec<String>,
String> {
+ let spark_conf = match command {
+ "create-tables" => &self.create_tables.spark_conf,
+ "bench" => &self.bench.spark_conf,
+ _ => return Err(format!("Unknown command: {command}")),
+ };
+
+ let mut args = vec!["--master".to_string(), "local[*]".to_string()];
+
+ for (key, value) in spark_conf {
+ args.push("--conf".to_string());
+ args.push(format!("{key}={value}"));
+ }
+
+ Ok(args)
+ }
+}
diff --git a/benchmark/tpch/src/datagen.rs b/benchmark/tpch/src/datagen.rs
new file mode 100644
index 0000000..fb4f675
--- /dev/null
+++ b/benchmark/tpch/src/datagen.rs
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use object_store::buffered::BufWriter;
+use object_store::path::Path as ObjectPath;
+use object_store::{ObjectStore, parse_url_opts};
+use parquet::arrow::ArrowWriter;
+use parquet::arrow::async_writer::AsyncArrowWriter;
+use parquet::basic::Compression;
+use parquet::file::properties::WriterProperties;
+use tpchgen::generators::{
+ CustomerGenerator, LineItemGenerator, NationGenerator, OrderGenerator,
PartGenerator,
+ PartSuppGenerator, RegionGenerator, SupplierGenerator,
+};
+use tpchgen_arrow::{
+ CustomerArrow, LineItemArrow, NationArrow, OrderArrow, PartArrow,
PartSuppArrow,
+ RecordBatchIterator, RegionArrow, SupplierArrow,
+};
+use url::Url;
+
+use crate::{collect_cloud_env_vars, is_cloud_url};
+
+fn writer_props() -> WriterProperties {
+ WriterProperties::builder()
+ .set_compression(Compression::SNAPPY)
+ .build()
+}
+
+fn write_parquet_local(
+ batches: &mut dyn Iterator<Item = RecordBatch>,
+ schema: &SchemaRef,
+ path: &std::path::Path,
+) -> Result<(), Box<dyn std::error::Error>> {
+ let file = std::fs::File::create(path)?;
+ let mut writer = ArrowWriter::try_new(file, schema.clone(),
Some(writer_props()))?;
+ for batch in batches {
+ writer.write(&batch)?;
+ }
+ writer.close()?;
+ Ok(())
+}
+
+async fn write_parquet_cloud(
+ batches: &mut dyn Iterator<Item = RecordBatch>,
+ schema: &SchemaRef,
+ store: &Arc<dyn ObjectStore>,
+ path: &ObjectPath,
+) -> Result<(), Box<dyn std::error::Error>> {
+ let buf_writer = BufWriter::new(Arc::clone(store), path.clone());
+ let mut writer = AsyncArrowWriter::try_new(buf_writer, schema.clone(),
Some(writer_props()))?;
+ for batch in batches {
+ writer.write(&batch).await?;
+ }
+ writer.close().await?;
+ Ok(())
+}
+
+fn generate_table_local(
+ name: &str,
+ output_dir: &std::path::Path,
+ batches: &mut dyn Iterator<Item = RecordBatch>,
+ schema: &SchemaRef,
+) -> Result<(), Box<dyn std::error::Error>> {
+ let table_dir = output_dir.join(name);
+ std::fs::create_dir_all(&table_dir)?;
+ println!(" Generating table: {name} ...");
+ write_parquet_local(batches, schema, &table_dir.join("data.parquet"))?;
+ println!(" Done: {name}");
+ Ok(())
+}
+
+async fn generate_table_cloud(
+ name: &str,
+ store: &Arc<dyn ObjectStore>,
+ base_path: &ObjectPath,
+ batches: &mut dyn Iterator<Item = RecordBatch>,
+ schema: &SchemaRef,
+) -> Result<(), Box<dyn std::error::Error>> {
+ let file_path =
ObjectPath::from(format!("{base_path}/{name}/data.parquet"));
+ println!(" Generating table: {name} ...");
+ write_parquet_cloud(batches, schema, store, &file_path).await?;
+ println!(" Done: {name}");
+ Ok(())
+}
+
// Drive generation of all 8 TPC-H tables by calling `$gen_fn` once per table.
//
// Each entry pairs a table name with a factory closure that, given the scale
// factor, builds the tpchgen generator wrapped in its Arrow adapter and
// returns (boxed batch iterator, schema). The factory indirection exists
// because every generator/adapter pair is a distinct concrete type; boxing
// behind `dyn Iterator` lets a single loop drive them all. Any trailing
// `$ctx` arguments (output dir, or (store, base path) tuple) are forwarded
// to `$gen_fn`, which must be an async fn returning a Result.
//
// NOTE(review): the `(sf, 1, 1)` generator arguments appear to be
// (scale factor, part number, part count) — i.e. a single partition per
// table — TODO confirm against the tpchgen API docs.
macro_rules! gen_all_tables {
    ($sf:expr, $gen_fn:ident $(, $ctx:expr)*) => {{
        let tables: Vec<(&str, Box<dyn FnOnce(f64) -> (Box<dyn Iterator<Item = RecordBatch>>, SchemaRef)>)> = vec![
            ("nation", Box::new(|sf| {
                let it = NationArrow::new(NationGenerator::new(sf, 1, 1));
                let s = it.schema().clone(); (Box::new(it) as Box<dyn Iterator<Item = RecordBatch>>, s)
            })),
            ("region", Box::new(|sf| {
                let it = RegionArrow::new(RegionGenerator::new(sf, 1, 1));
                let s = it.schema().clone(); (Box::new(it) as _, s)
            })),
            ("part", Box::new(|sf| {
                let it = PartArrow::new(PartGenerator::new(sf, 1, 1));
                let s = it.schema().clone(); (Box::new(it) as _, s)
            })),
            ("supplier", Box::new(|sf| {
                let it = SupplierArrow::new(SupplierGenerator::new(sf, 1, 1));
                let s = it.schema().clone(); (Box::new(it) as _, s)
            })),
            ("partsupp", Box::new(|sf| {
                let it = PartSuppArrow::new(PartSuppGenerator::new(sf, 1, 1));
                let s = it.schema().clone(); (Box::new(it) as _, s)
            })),
            ("customer", Box::new(|sf| {
                let it = CustomerArrow::new(CustomerGenerator::new(sf, 1, 1));
                let s = it.schema().clone(); (Box::new(it) as _, s)
            })),
            ("orders", Box::new(|sf| {
                let it = OrderArrow::new(OrderGenerator::new(sf, 1, 1));
                let s = it.schema().clone(); (Box::new(it) as _, s)
            })),
            ("lineitem", Box::new(|sf| {
                let it = LineItemArrow::new(LineItemGenerator::new(sf, 1, 1));
                let s = it.schema().clone(); (Box::new(it) as _, s)
            })),
        ];
        // Tables are generated sequentially, smallest first (nation/region).
        for (name, factory) in tables {
            let (mut batches, schema) = factory($sf);
            $gen_fn(name, &mut *batches, &schema $(, $ctx)*).await?;
        }
    }};
}
+
/// Adapter giving `generate_table_local` the uniform async call shape that
/// `gen_all_tables!` expects: (name, batches, schema, then context args).
async fn do_generate_local(
    name: &str,
    batches: &mut dyn Iterator<Item = RecordBatch>,
    schema: &SchemaRef,
    output_dir: &std::path::Path,
) -> Result<(), Box<dyn std::error::Error>> {
    generate_table_local(name, output_dir, batches, schema)
}
+
/// Adapter giving `generate_table_cloud` the uniform async call shape that
/// `gen_all_tables!` expects; `ctx` bundles the object store and base path.
async fn do_generate_cloud(
    name: &str,
    batches: &mut dyn Iterator<Item = RecordBatch>,
    schema: &SchemaRef,
    ctx: &(Arc<dyn ObjectStore>, ObjectPath),
) -> Result<(), Box<dyn std::error::Error>> {
    generate_table_cloud(name, &ctx.0, &ctx.1, batches, schema).await
}
+
+pub async fn run_generate(
+ scale_factor: f64,
+ output_dir: &str,
+) -> Result<(), Box<dyn std::error::Error>> {
+ let sf = scale_factor;
+
+ if is_cloud_url(output_dir) {
+ println!("Generating TPC-H data (scale factor {sf}) to {output_dir}");
+
+ let url = Url::parse(output_dir)?;
+ let env_vars = collect_cloud_env_vars();
+ let (store, object_path) = parse_url_opts(&url, env_vars)?;
+ let ctx = (Arc::from(store) as Arc<dyn ObjectStore>, object_path);
+
+ gen_all_tables!(sf, do_generate_cloud, &ctx);
+ } else {
+ let out_path = std::path::Path::new(output_dir);
+ println!(
+ "Generating TPC-H data (scale factor {sf}) into {}",
+ out_path.display()
+ );
+
+ gen_all_tables!(sf, do_generate_local, out_path);
+ }
+
+ println!("All TPC-H tables generated successfully.");
+ Ok(())
+}
diff --git a/benchmark/tpch/src/main.rs b/benchmark/tpch/src/main.rs
new file mode 100644
index 0000000..9c38663
--- /dev/null
+++ b/benchmark/tpch/src/main.rs
@@ -0,0 +1,1268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+mod config;
+mod datagen;
+
+use std::collections::{BTreeMap, BTreeSet, HashMap};
+use std::fs;
+use std::io::BufRead;
+use std::path::Path;
+use std::sync::Arc;
+use std::time::{Instant, SystemTime, UNIX_EPOCH};
+
+use arrow::datatypes::DataType;
+use arrow_array::RecordBatch;
+use arrow_cast::display::{ArrayFormatter, FormatOptions};
+use clap::{Parser, Subcommand};
+use comfy_table::{Cell, Table};
+use datafusion::dataframe::DataFrame;
+use datafusion::error::Result;
+use datafusion::execution::context::SessionContext;
+use datafusion::execution::memory_pool::FairSpillPool;
+use datafusion::execution::runtime_env::RuntimeEnvBuilder;
+use datafusion::prelude::SessionConfig;
+use hudi::HudiDataSource;
+use serde::{Deserialize, Serialize};
+
/// The 8 TPC-H tables, in alphabetical order; used for table registration.
const TPCH_TABLES: &[&str] = &[
    "customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier",
];

/// Total number of TPC-H queries (Q1..Q22).
const NUM_QUERIES: usize = 22;

/// Cloud URL scheme prefixes recognized by `is_cloud_url` (S3, GCS, Azure).
const CLOUD_SCHEMES: &[&str] = &["s3://", "s3a://", "gs://", "wasb://", "wasbs://", "az://"];
+
// Command-line entry point; `main` dispatches on `cli.command`.
// (Plain `//` comments here on purpose: clap turns `///` into help text.)
#[derive(Parser)]
#[command(name = "tpch", about = "TPC-H benchmark tool for Apache Hudi")]
struct Cli {
    #[command(subcommand)]
    command: Commands,
}
+
// Subcommands of the tpch tool. The `///` comments below double as clap
// help text — they are user-facing strings, so edit them with care.
#[derive(Subcommand)]
enum Commands {
    /// Generate TPC-H parquet data
    Generate {
        /// TPC-H scale factor
        #[arg(long, default_value_t = 1.0)]
        scale_factor: f64,

        /// Output directory (local path or cloud URL); defaults to data/sf{N}-parquet
        #[arg(long)]
        output_dir: Option<String>,
    },
    /// Render CTAS SQL from scale-factor config
    RenderCtas {
        /// TPC-H scale factor (loads config/sf{N}.yaml)
        #[arg(long)]
        scale_factor: f64,

        /// Parquet source base path (e.g., /opt/parquet or gs://bucket/path)
        #[arg(long)]
        parquet_base: String,

        /// Hudi output base path (e.g., /opt/hudi or gs://bucket/path)
        #[arg(long)]
        hudi_base: String,
    },
    /// Render benchmark SQL (table registrations + query iterations)
    RenderBenchSql {
        /// TPC-H scale factor (loads config/sf{N}.yaml)
        #[arg(long)]
        scale_factor: f64,

        /// Hudi tables base path (e.g., /opt/hudi)
        #[arg(long)]
        hudi_base: String,

        /// Comma-separated query numbers (default: all 22)
        #[arg(long)]
        queries: Option<String>,

        /// Number of iterations per query (overrides config)
        #[arg(long)]
        iterations: Option<usize>,
    },
    /// Output spark-submit arguments from scale-factor config (one per line)
    SparkArgs {
        /// TPC-H scale factor (loads config/sf{N}.yaml)
        #[arg(long)]
        scale_factor: f64,

        /// Command profile to use: "create-tables" or "bench"
        #[arg(long)]
        command: String,
    },
    /// Print bench defaults from config (warmup and iterations)
    BenchDefaults {
        /// TPC-H scale factor (loads config/sf{N}.yaml)
        #[arg(long, default_value_t = 1.0)]
        scale_factor: f64,
    },
    /// Run TPC-H benchmark queries via DataFusion (Hudi, Parquet, or both)
    Bench {
        /// Hudi tables location (local path or cloud URL)
        #[arg(long)]
        hudi_dir: Option<String>,

        /// Parquet tables location (local path or cloud URL)
        #[arg(long)]
        parquet_dir: Option<String>,

        /// TPC-H scale factor (used for query parameter substitution and config loading)
        #[arg(long, default_value_t = 1.0)]
        scale_factor: f64,

        /// Comma-separated query numbers to run (e.g., "1,3,6"); defaults to all 22
        #[arg(long)]
        queries: Option<String>,

        /// Number of measured iterations per query (overrides config)
        #[arg(long)]
        iterations: Option<usize>,

        /// Number of unmeasured warmup iterations per query (overrides config)
        #[arg(long)]
        warmup: Option<usize>,

        /// DataFusion memory limit (e.g., "3g", "512m"); unlimited if not set
        #[arg(long)]
        memory_limit: Option<String>,

        /// Directory to persist results as JSON (enables result saving)
        #[arg(long)]
        output_dir: Option<String>,

        /// Engine label for persisted results (e.g., "datafusion")
        #[arg(long)]
        engine_label: Option<String>,

        /// Format label for persisted results (e.g., "hudi"); auto-detected if omitted
        #[arg(long)]
        format_label: Option<String>,

        /// Display name for charts (e.g., "datafusion+hudi-rs"); defaults to engine_label
        #[arg(long)]
        display_name: Option<String>,
    },
    /// Validate Hudi query results against Parquet (runs each query once, compares output)
    Validate {
        /// Hudi tables location (local path or cloud URL)
        #[arg(long)]
        hudi_dir: String,

        /// Parquet tables location (local path or cloud URL)
        #[arg(long)]
        parquet_dir: String,

        /// TPC-H scale factor (used for query parameter substitution)
        #[arg(long, default_value_t = 1.0)]
        scale_factor: f64,

        /// Comma-separated query numbers to run (e.g., "1,3,6"); defaults to all 22
        #[arg(long)]
        queries: Option<String>,

        /// DataFusion memory limit (e.g., "3g", "512m"); unlimited if not set
        #[arg(long)]
        memory_limit: Option<String>,
    },
    /// Parse Spark benchmark JSON output into a timing table
    ParseSparkOutput {
        /// Input file (reads from stdin if omitted)
        #[arg(long)]
        input: Option<String>,

        /// Directory to persist results as JSON
        #[arg(long)]
        output_dir: Option<String>,

        /// Engine label for persisted results (default: "spark")
        #[arg(long)]
        engine_label: Option<String>,

        /// Format label for persisted results (e.g., "hudi")
        #[arg(long)]
        format_label: Option<String>,

        /// Display name for charts (e.g., "spark+hudi"); defaults to engine_label
        #[arg(long)]
        display_name: Option<String>,

        /// TPC-H scale factor (used for result file naming)
        #[arg(long, default_value_t = 1.0)]
        scale_factor: f64,
    },
    /// Compare persisted benchmark results with terminal bar charts
    Compare {
        /// Directory containing result JSON files
        #[arg(long)]
        results_dir: String,

        /// Comma-separated result file stems (e.g., "datafusion_hudi_sf1,spark_hudi_sf1")
        #[arg(long)]
        runs: String,
    },
}
+
+/// Check if a path string is a cloud URL.
+fn is_cloud_url(path: &str) -> bool {
+ CLOUD_SCHEMES.iter().any(|s| path.starts_with(s))
+}
+
+/// Resolve a local path to an absolute path string, or return cloud URL as-is.
+fn resolve_path(path: &str) -> std::result::Result<String, String> {
+ if is_cloud_url(path) {
+ Ok(path.to_string())
+ } else {
+ fs::canonicalize(path)
+ .map(|p| p.to_string_lossy().to_string())
+ .map_err(|e| format!("Failed to resolve path {path}: {e}"))
+ }
+}
+
/// Collect cloud storage env vars as options for object_store.
///
/// Picks up AWS_*, GOOGLE_*, AZURE_*, and OBJECT_STORE_* variables so
/// credentials flow through to the object-store builders.
fn collect_cloud_env_vars() -> Vec<(String, String)> {
    const PREFIXES: [&str; 4] = ["AWS_", "GOOGLE_", "AZURE_", "OBJECT_STORE_"];
    std::env::vars()
        .filter(|(key, _)| PREFIXES.iter().any(|p| key.starts_with(p)))
        .collect()
}
+
/// Parse a memory size string (e.g., "3g", "512m", "1024k") into bytes.
///
/// Suffixes are case-insensitive; a bare number means bytes; fractional
/// values like "1.5g" are accepted and truncated to whole bytes.
fn parse_memory_size(s: &str) -> std::result::Result<usize, String> {
    let normalized = s.trim().to_lowercase();
    // Map a recognized unit suffix to its byte multiplier; default is bytes.
    let (digits, unit): (&str, usize) = match normalized.as_bytes().last() {
        Some(b'g') => (&normalized[..normalized.len() - 1], 1024 * 1024 * 1024),
        Some(b'm') => (&normalized[..normalized.len() - 1], 1024 * 1024),
        Some(b'k') => (&normalized[..normalized.len() - 1], 1024),
        _ => (normalized.as_str(), 1),
    };
    let value: f64 = digits
        .parse()
        .map_err(|_| format!("Invalid memory size: {normalized}"))?;
    Ok((value * unit as f64) as usize)
}
+
+/// Create a SessionContext, optionally bounded by a memory pool.
+fn create_session_context(
+ memory_limit: Option<&str>,
+) -> std::result::Result<SessionContext, String> {
+ match memory_limit {
+ Some(limit) => {
+ let pool_size = parse_memory_size(limit)?;
+ let pool = FairSpillPool::new(pool_size);
+ let runtime = RuntimeEnvBuilder::default()
+ .with_memory_pool(Arc::new(pool))
+ .build_arc()
+ .map_err(|e| format!("Failed to build runtime: {e}"))?;
+ Ok(SessionContext::new_with_config_rt(
+ SessionConfig::new(),
+ runtime,
+ ))
+ }
+ None => Ok(SessionContext::new()),
+ }
+}
+
+/// Register a cloud object store on the SessionContext's RuntimeEnv.
+fn register_cloud_store(ctx: &SessionContext, base_url: &str) -> Result<()> {
+ let url = url::Url::parse(base_url).map_err(|e| {
+ datafusion::error::DataFusionError::Plan(format!("Invalid URL
{base_url}: {e}"))
+ })?;
+ let cloud_opts: HashMap<String, String> =
collect_cloud_env_vars().into_iter().collect();
+ let (store, _) = object_store::parse_url_opts(&url,
&cloud_opts).map_err(|e| {
+ datafusion::error::DataFusionError::Plan(format!(
+ "Failed to create object store for {base_url}: {e}"
+ ))
+ })?;
+ ctx.runtime_env()
+ .register_object_store(&url, Arc::new(store));
+ Ok(())
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ env_logger::init();
+ let cli = Cli::parse();
+
+ match cli.command {
+ Commands::Generate {
+ scale_factor,
+ output_dir,
+ } => {
+ let dir = output_dir.unwrap_or_else(|| {
+ let sf_label = if scale_factor == scale_factor.floor() &&
scale_factor >= 1.0 {
+ format!("sf{}", scale_factor as u64)
+ } else {
+ format!("sf{scale_factor}")
+ };
+ let default = Path::new(env!("CARGO_MANIFEST_DIR"))
+ .join("data")
+ .join(format!("{sf_label}-parquet"));
+ default.to_string_lossy().to_string()
+ });
+ if !is_cloud_url(&dir) {
+ std::fs::create_dir_all(&dir).map_err(|e| {
+ datafusion::error::DataFusionError::Plan(format!(
+ "Failed to create output dir {dir}: {e}"
+ ))
+ })?;
+ }
+ datagen::run_generate(scale_factor, &dir)
+ .await
+ .map_err(|e| {
+
datafusion::error::DataFusionError::Plan(format!("Generation failed: {e}"))
+ })
+ }
+ Commands::RenderCtas {
+ scale_factor,
+ parquet_base,
+ hudi_base,
+ } => {
+ let cfg = config::ScaleFactorConfig::load(scale_factor)
+ .map_err(|e|
datafusion::error::DataFusionError::Plan(format!("{e}")))?;
+ print!("{}", cfg.render_ctas_sql(&parquet_base, &hudi_base));
+ Ok(())
+ }
+ Commands::RenderBenchSql {
+ scale_factor,
+ hudi_base,
+ queries,
+ iterations,
+ } => {
+ let cfg = config::ScaleFactorConfig::load(scale_factor)
+ .map_err(|e|
datafusion::error::DataFusionError::Plan(format!("{e}")))?;
+ let iterations = iterations.unwrap_or(cfg.bench.iterations);
+ let query_nums = parse_query_numbers(queries);
+ let sql = cfg
+ .render_bench_sql(&hudi_base, &query_nums, iterations,
scale_factor)
+ .map_err(|e|
datafusion::error::DataFusionError::Plan(format!("{e}")))?;
+ print!("{sql}");
+ Ok(())
+ }
+ Commands::SparkArgs {
+ scale_factor,
+ command,
+ } => {
+ let cfg = config::ScaleFactorConfig::load(scale_factor)
+ .map_err(|e|
datafusion::error::DataFusionError::Plan(format!("{e}")))?;
+ let args = cfg
+ .render_spark_args(&command)
+ .map_err(datafusion::error::DataFusionError::Plan)?;
+ for arg in args {
+ println!("{arg}");
+ }
+ Ok(())
+ }
+ Commands::BenchDefaults { scale_factor } => {
+ let cfg = config::ScaleFactorConfig::load(scale_factor)
+ .map_err(|e|
datafusion::error::DataFusionError::Plan(format!("{e}")))?;
+ println!("{} {}", cfg.bench.warmup, cfg.bench.iterations);
+ Ok(())
+ }
+ Commands::Bench {
+ hudi_dir,
+ parquet_dir,
+ scale_factor,
+ queries,
+ iterations,
+ warmup,
+ memory_limit,
+ output_dir,
+ engine_label,
+ format_label,
+ display_name,
+ } => {
+ let cfg = config::ScaleFactorConfig::load(scale_factor)
+ .map_err(|e|
datafusion::error::DataFusionError::Plan(format!("{e}")))?;
+ let warmup = warmup.unwrap_or(cfg.bench.warmup);
+ let iterations = iterations.unwrap_or(cfg.bench.iterations);
+ let memory_limit = memory_limit.or(cfg.bench.memory_limit);
+ run_bench(
+ hudi_dir.as_deref(),
+ parquet_dir.as_deref(),
+ scale_factor,
+ queries,
+ warmup,
+ iterations,
+ memory_limit.as_deref(),
+ output_dir.as_deref(),
+ engine_label.as_deref(),
+ format_label.as_deref(),
+ display_name.as_deref(),
+ )
+ .await
+ }
+ Commands::Validate {
+ hudi_dir,
+ parquet_dir,
+ scale_factor,
+ queries,
+ memory_limit,
+ } => {
+ run_validate(
+ &hudi_dir,
+ &parquet_dir,
+ scale_factor,
+ queries,
+ memory_limit.as_deref(),
+ )
+ .await
+ }
+ Commands::ParseSparkOutput {
+ input,
+ output_dir,
+ engine_label,
+ format_label,
+ display_name,
+ scale_factor,
+ } => run_parse_spark_output(
+ input.as_deref(),
+ output_dir.as_deref(),
+ engine_label.as_deref(),
+ format_label.as_deref(),
+ display_name.as_deref(),
+ scale_factor,
+ ),
+ Commands::Compare { results_dir, runs } => run_compare(&results_dir,
&runs),
+ }
+}
+
+/// Parse query numbers from the user-provided comma-separated string, or
return all 22.
+fn parse_query_numbers(queries: Option<String>) -> Vec<usize> {
+ match queries {
+ Some(s) => s
+ .split(',')
+ .filter_map(|q| q.trim().parse::<usize>().ok())
+ .filter(|&q| (1..=NUM_QUERIES).contains(&q))
+ .collect(),
+ None => (1..=NUM_QUERIES).collect(),
+ }
+}
+
+/// Load a SQL query file, applying scale-factor-dependent substitutions.
+fn load_query(query_num: usize, scale_factor: f64) ->
std::result::Result<String, String> {
+ let cache_dir = std::env::var("TPCH_QUERY_DIR")
+ .map(std::path::PathBuf::from)
+ .unwrap_or_else(|_|
Path::new(env!("CARGO_MANIFEST_DIR")).join("queries"));
+ let file_name = format!("q{query_num}.sql");
+ let path = cache_dir.join(&file_name);
+ let sql = fs::read_to_string(&path).map_err(|e| format!("Failed to read
{file_name}: {e}"))?;
+ let q11_fraction = format!("{:.10}", 0.0001 / scale_factor);
+ Ok(sql.replace("${Q11_FRACTION}", &q11_fraction))
+}
+
+/// Register all 8 TPC-H Hudi tables. Supports local paths and cloud URLs.
+async fn register_hudi_tables(ctx: &SessionContext, base_dir: &str) ->
Result<()> {
+ let resolved =
resolve_path(base_dir).map_err(datafusion::error::DataFusionError::Plan)?;
+
+ for table_name in TPCH_TABLES {
+ let table_uri = if is_cloud_url(&resolved) {
+ format!("{}/{table_name}", resolved.trim_end_matches('/'))
+ } else {
+ let table_path = Path::new(&resolved).join(table_name);
+ url::Url::from_file_path(&table_path)
+ .map_err(|_| {
+ datafusion::error::DataFusionError::Plan(format!(
+ "Failed to create file URL for {}",
+ table_path.display()
+ ))
+ })?
+ .to_string()
+ };
+ let hudi = HudiDataSource::new(&table_uri).await?;
+ ctx.register_table(*table_name, Arc::new(hudi))?;
+ }
+ Ok(())
+}
+
+/// Register all 8 TPC-H parquet tables. Supports local paths and cloud URLs.
+async fn register_parquet_tables(ctx: &SessionContext, base_dir: &str) ->
Result<()> {
+ let resolved =
resolve_path(base_dir).map_err(datafusion::error::DataFusionError::Plan)?;
+
+ if is_cloud_url(&resolved) {
+ register_cloud_store(ctx, &resolved)?;
+ }
+
+ for table_name in TPCH_TABLES {
+ let table_path = if is_cloud_url(&resolved) {
+ format!("{}/{table_name}", resolved.trim_end_matches('/'))
+ } else {
+ Path::new(&resolved)
+ .join(table_name)
+ .to_string_lossy()
+ .to_string()
+ };
+ ctx.register_parquet(*table_name, &table_path, Default::default())
+ .await?;
+ }
+ Ok(())
+}
+
/// Collect a DataFrame into a Vec of RecordBatches.
///
/// Thin wrapper so `bench_source` can surface execution errors through a
/// single awaited call site.
async fn collect_results(df: DataFrame) -> Result<Vec<RecordBatch>> {
    df.collect().await
}
+
/// Benchmark a single source (hudi or parquet) and return per-query timings and last batches.
///
/// Each query runs `warmup` unmeasured iterations followed by `iterations`
/// measured ones. An iteration's timing covers all statements of a
/// multi-statement query. On the first failing iteration the query is
/// abandoned and the error recorded; timings gathered so far are kept.
async fn bench_source(
    ctx: &SessionContext,
    query_nums: &[usize],
    warmup: usize,
    iterations: usize,
    scale_factor: f64,
) -> Vec<QueryResult> {
    let total_runs = warmup + iterations;
    let mut results = Vec::new();

    for query_num in query_nums {
        // A query file that fails to load becomes an errored result rather
        // than aborting the whole benchmark.
        let sql = match load_query(*query_num, scale_factor) {
            Ok(s) => s,
            Err(e) => {
                results.push(QueryResult {
                    query_num: *query_num,
                    timings_ms: vec![],
                    last_batches: vec![],
                    error: Some(e),
                });
                continue;
            }
        };

        let mut timings_ms: Vec<f64> = Vec::with_capacity(iterations);
        let mut last_batches: Vec<RecordBatch> = Vec::new();
        let mut error = None;

        // Strip SQL comment lines before splitting, so semicolons inside
        // comments (e.g., license headers) don't produce spurious empty statements.
        let sql_no_comments: String = sql
            .lines()
            .filter(|line| !line.trim_start().starts_with("--"))
            .collect::<Vec<_>>()
            .join("\n");

        // Split multi-statement queries (e.g., Q15: CREATE VIEW; SELECT; DROP VIEW)
        let statements: Vec<&str> = sql_no_comments
            .split(';')
            .map(|s| s.trim())
            .filter(|s| !s.is_empty())
            .collect();

        for i in 0..total_runs {
            if i < warmup {
                print!(" Q{:02} warmup {}/{}...", query_num, i + 1, warmup);
            } else {
                print!(
                    " Q{:02} iter {}/{}...",
                    query_num,
                    i - warmup + 1,
                    iterations
                );
            }

            let start = Instant::now();
            let mut iter_error = None;
            let mut iter_batches = Vec::new();

            // Execute every statement; keep only the last non-empty result
            // set (e.g. the SELECT in a CREATE VIEW / SELECT / DROP VIEW run).
            for stmt in &statements {
                match ctx.sql(stmt).await {
                    Ok(df) => match collect_results(df).await {
                        Ok(batches) => {
                            if !batches.is_empty() {
                                iter_batches = batches;
                            }
                        }
                        Err(e) => {
                            iter_error = Some(format!("{e}"));
                            break;
                        }
                    },
                    Err(e) => {
                        iter_error = Some(format!("{e}"));
                        break;
                    }
                }
            }

            let elapsed = start.elapsed().as_secs_f64() * 1000.0;

            if let Some(e) = iter_error {
                println!(" ERROR");
                error = Some(e);
                break;
            }

            println!(" {elapsed:.1}ms");

            // Warmup runs are printed but never recorded.
            if i >= warmup {
                timings_ms.push(elapsed);
            }
            // Keep the final iteration's output for validation/comparison.
            if i == total_runs - 1 {
                last_batches = iter_batches;
            }
        }

        results.push(QueryResult {
            query_num: *query_num,
            timings_ms,
            last_batches,
            error,
        });
    }

    results
}
+
/// Outcome of benchmarking one TPC-H query against one source.
struct QueryResult {
    /// 1-based TPC-H query number (1..=22).
    query_num: usize,
    /// Measured (non-warmup) wall-clock timings, in milliseconds.
    timings_ms: Vec<f64>,
    /// Result batches from the final iteration, kept for validation.
    last_batches: Vec<RecordBatch>,
    /// First error encountered, if the query failed to load or run.
    error: Option<String>,
}
+
/// Timing summary (milliseconds) over one query's measured iterations.
struct TimingStats {
    min: f64,
    median: f64,
    mean: f64,
    max: f64,
}

/// Compute min/median/mean/max over `timings`.
///
/// Returns `None` for an empty slice. The even-length median averages the
/// two middle values.
fn compute_stats(timings: &[f64]) -> Option<TimingStats> {
    if timings.is_empty() {
        return None;
    }
    let mut sorted = timings.to_vec();
    // total_cmp gives a total order over f64: unlike partial_cmp(..).unwrap(),
    // it cannot panic if a NaN timing ever slips in (NaN sorts last instead).
    sorted.sort_unstable_by(|a, b| a.total_cmp(b));
    let min = sorted[0];
    let max = sorted[sorted.len() - 1];
    let mean = sorted.iter().sum::<f64>() / sorted.len() as f64;
    let median = if sorted.len() % 2 == 0 {
        let mid = sorted.len() / 2;
        (sorted[mid - 1] + sorted[mid]) / 2.0
    } else {
        sorted[sorted.len() / 2]
    };
    Some(TimingStats {
        min,
        median,
        mean,
        max,
    })
}
+
/// Per-query timing statistics persisted to JSON, all in milliseconds.
#[derive(Serialize, Deserialize)]
struct PersistedQueryStats {
    /// Arithmetic mean over measured iterations.
    avg_ms: f64,
    /// Fastest iteration.
    min_ms: f64,
    /// Median iteration.
    median_ms: f64,
    /// Slowest iteration.
    max_ms: f64,
}
+
/// One benchmark run's results as persisted to / loaded from JSON.
#[derive(Serialize, Deserialize)]
struct PersistedResults {
    /// Engine label, e.g. "datafusion" or "spark".
    engine: String,
    /// Optional chart-friendly name; `#[serde(default)]` keeps files written
    /// before this field existed loadable.
    #[serde(default)]
    display_name: Option<String>,
    /// Table format label, e.g. "hudi" or "parquet".
    format: String,
    /// TPC-H scale factor of the run.
    scale_factor: f64,
    /// Unix epoch seconds at save time.
    timestamp: u64,
    /// Per-query stats keyed by the query number rendered as a string.
    queries: BTreeMap<String, PersistedQueryStats>,
}
+
+impl PersistedResults {
+ fn label(&self) -> &str {
+ self.display_name.as_deref().unwrap_or(&self.engine)
+ }
+}
+
/// Format a scale factor into a compact label ("sf1", "sf100", "sf0.1").
///
/// Whole-number scale factors >= 1 render without a decimal point; anything
/// else uses the default float formatting.
fn format_sf_label(sf: f64) -> String {
    let whole = sf == sf.floor() && sf >= 1.0;
    match whole {
        true => format!("sf{}", sf as u64),
        false => format!("sf{sf}"),
    }
}
+
/// Persist per-query statistics to `<output_dir>/<engine>_<format>_<sf>.json`.
///
/// Queries that errored are omitted; the output directory is created if
/// missing.
fn save_results(
    results: &[QueryResult],
    engine: &str,
    display_name: Option<&str>,
    format_name: &str,
    scale_factor: f64,
    output_dir: &str,
) -> std::result::Result<(), String> {
    let mut queries = BTreeMap::new();
    for r in results {
        // Failed queries have no meaningful timings to persist.
        if r.error.is_some() {
            continue;
        }
        if let Some(stats) = compute_stats(&r.timings_ms) {
            queries.insert(
                r.query_num.to_string(),
                PersistedQueryStats {
                    avg_ms: stats.mean,
                    min_ms: stats.min,
                    median_ms: stats.median,
                    max_ms: stats.max,
                },
            );
        }
    }

    // Wall-clock stamp recording when the run was saved.
    let timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();

    let persisted = PersistedResults {
        engine: engine.to_string(),
        display_name: display_name.map(|s| s.to_string()),
        format: format_name.to_string(),
        scale_factor,
        timestamp,
        queries,
    };

    fs::create_dir_all(output_dir)
        .map_err(|e| format!("Failed to create output dir {output_dir}: {e}"))?;

    // File stem doubles as the run identifier consumed by `compare --runs`.
    let sf_label = format_sf_label(scale_factor);
    let filename = format!("{engine}_{format_name}_{sf_label}.json");
    let path = Path::new(output_dir).join(&filename);

    let json = serde_json::to_string_pretty(&persisted)
        .map_err(|e| format!("Failed to serialize results: {e}"))?;
    fs::write(&path, json).map_err(|e| format!("Failed to write {}: {e}", path.display()))?;

    println!("Results saved to {}", path.display());
    Ok(())
}
+
+fn load_results(path: &str) -> std::result::Result<PersistedResults, String> {
+ let content = fs::read_to_string(path).map_err(|e| format!("Failed to read
{path}: {e}"))?;
+ serde_json::from_str(&content).map_err(|e| format!("Failed to parse
{path}: {e}"))
+}
+
/// Run the benchmark against Hudi, Parquet, or both.
///
/// Each enabled source gets its own fresh SessionContext (and memory pool)
/// so cached state from one run cannot influence the other. When
/// `output_dir` is set, per-query statistics are persisted as JSON for
/// later use by the `compare` subcommand.
#[allow(clippy::too_many_arguments)]
async fn run_bench(
    hudi_dir: Option<&str>,
    parquet_dir: Option<&str>,
    scale_factor: f64,
    queries: Option<String>,
    warmup: usize,
    iterations: usize,
    memory_limit: Option<&str>,
    output_dir: Option<&str>,
    engine_label: Option<&str>,
    format_label: Option<&str>,
    display_name: Option<&str>,
) -> Result<()> {
    if hudi_dir.is_none() && parquet_dir.is_none() {
        return Err(datafusion::error::DataFusionError::Plan(
            "At least one of --hudi-dir or --parquet-dir must be provided".to_string(),
        ));
    }

    let query_nums = parse_query_numbers(queries);

    if let Some(limit) = memory_limit {
        println!("DataFusion memory limit: {limit}");
    }
    println!("Warmup: {warmup} iteration(s), Measured: {iterations} iteration(s)");

    if let Some(hudi_dir) = hudi_dir {
        let ctx = create_session_context(memory_limit)
            .map_err(datafusion::error::DataFusionError::Plan)?;
        println!("Registering Hudi tables from {hudi_dir}");
        register_hudi_tables(&ctx, hudi_dir).await?;
        println!("Benchmarking Hudi...");
        let results = bench_source(&ctx, &query_nums, warmup, iterations, scale_factor).await;
        print_single_table("Hudi", &results);
        if let Some(dir) = output_dir {
            // Labels default to engine=datafusion / format=hudi unless overridden.
            let engine = engine_label.unwrap_or("datafusion");
            let fmt = format_label.unwrap_or("hudi");
            save_results(&results, engine, display_name, fmt, scale_factor, dir)
                .map_err(datafusion::error::DataFusionError::Plan)?;
        }
    }

    if let Some(parquet_dir) = parquet_dir {
        // A fresh context keeps the Parquet run independent of the Hudi run.
        let ctx = create_session_context(memory_limit)
            .map_err(datafusion::error::DataFusionError::Plan)?;
        println!("Registering Parquet tables from {parquet_dir}");
        register_parquet_tables(&ctx, parquet_dir).await?;
        println!("Benchmarking Parquet...");
        let results = bench_source(&ctx, &query_nums, warmup, iterations, scale_factor).await;
        print_single_table("Parquet", &results);
        if let Some(dir) = output_dir {
            let engine = engine_label.unwrap_or("datafusion");
            let fmt = format_label.unwrap_or("parquet");
            save_results(&results, engine, display_name, fmt, scale_factor, dir)
                .map_err(datafusion::error::DataFusionError::Plan)?;
        }
    }

    Ok(())
}
+
/// Run validation: query both Hudi and Parquet once, compare results.
///
/// Each selected query runs exactly once per source (no warmup, one
/// iteration); the final result batches are then compared side by side.
async fn run_validate(
    hudi_dir: &str,
    parquet_dir: &str,
    scale_factor: f64,
    queries: Option<String>,
    memory_limit: Option<&str>,
) -> Result<()> {
    let query_nums = parse_query_numbers(queries);

    if let Some(limit) = memory_limit {
        println!("DataFusion memory limit: {limit}");
    }

    println!("Registering Hudi tables from {hudi_dir}");
    // Separate contexts keep the two sources fully isolated.
    let hudi_ctx =
        create_session_context(memory_limit).map_err(datafusion::error::DataFusionError::Plan)?;
    register_hudi_tables(&hudi_ctx, hudi_dir).await?;

    println!("Registering Parquet tables from {parquet_dir}");
    let parquet_ctx =
        create_session_context(memory_limit).map_err(datafusion::error::DataFusionError::Plan)?;
    register_parquet_tables(&parquet_ctx, parquet_dir).await?;

    println!("Running Hudi queries...");
    // warmup = 0, iterations = 1: a single measured run per query.
    let hudi_results = bench_source(&hudi_ctx, &query_nums, 0, 1, scale_factor).await;

    println!("Running Parquet queries...");
    let parquet_results = bench_source(&parquet_ctx, &query_nums, 0, 1, scale_factor).await;

    print_validation_table(&query_nums, &hudi_results, &parquet_results);

    Ok(())
}
+
/// Parse Spark benchmark JSON output into a timing table.
///
/// Reads JSON-lines timing records from `input` (or stdin when omitted),
/// prints a summary table, and optionally persists the statistics in the
/// same format as a native bench run.
fn run_parse_spark_output(
    input: Option<&str>,
    output_dir: Option<&str>,
    engine_label: Option<&str>,
    format_label: Option<&str>,
    display_name: Option<&str>,
    scale_factor: f64,
) -> Result<()> {
    // Source is either a file or stdin (for piping spark-submit output).
    let reader: Box<dyn BufRead> = match input {
        Some(path) => {
            let file = fs::File::open(path).map_err(|e| {
                datafusion::error::DataFusionError::Plan(format!("Failed to open {path}: {e}"))
            })?;
            Box::new(std::io::BufReader::new(file))
        }
        None => Box::new(std::io::BufReader::new(std::io::stdin())),
    };

    let results = parse_spark_timings(reader);
    if results.is_empty() {
        println!("No benchmark data found in input.");
    } else {
        print_single_table("Spark", &results);
        if let Some(dir) = output_dir {
            // Default labels for Spark-driven runs over Hudi tables.
            let engine = engine_label.unwrap_or("spark");
            let fmt = format_label.unwrap_or("hudi");
            save_results(&results, engine, display_name, fmt, scale_factor, dir)
                .map_err(datafusion::error::DataFusionError::Plan)?;
        }
    }
    Ok(())
}
+
+/// Parse JSON lines from the PySpark bench script.
+///
+/// Each line is: {"query": N, "elapsed_ms": X.X}
+/// Warmup iterations are already excluded by the Python script.
+fn parse_spark_timings(reader: Box<dyn BufRead>) -> Vec<QueryResult> {
+ let mut all_timings: BTreeMap<usize, Vec<f64>> = BTreeMap::new();
+
+ for line in reader.lines().map_while(|l| l.ok()) {
+ if let Ok(v) = serde_json::from_str::<serde_json::Value>(&line) {
+ if let (Some(q), Some(ms)) = (v["query"].as_u64(),
v["elapsed_ms"].as_f64()) {
+ all_timings.entry(q as usize).or_default().push(ms);
+ }
+ }
+ }
+
+ all_timings
+ .into_iter()
+ .map(|(q, times)| QueryResult {
+ query_num: q,
+ timings_ms: times,
+ last_batches: vec![],
+ error: None,
+ })
+ .collect()
+}
+
+/// Compare persisted benchmark results and render terminal bar charts.
+fn run_compare(results_dir: &str, runs: &str) -> Result<()> {
+ let stems: Vec<&str> = runs.split(',').map(|s| s.trim()).collect();
+ if stems.is_empty() {
+ return Err(datafusion::error::DataFusionError::Plan(
+ "No runs specified".to_string(),
+ ));
+ }
+
+ let mut loaded: Vec<PersistedResults> = Vec::new();
+ for stem in &stems {
+ let path = format!("{results_dir}/{stem}.json");
+ let r =
load_results(&path).map_err(datafusion::error::DataFusionError::Plan)?;
+ loaded.push(r);
+ }
+
+ // Collect all query numbers across all runs
+ let mut all_queries = BTreeSet::new();
+ for r in &loaded {
+ for q in r.queries.keys() {
+ if let Ok(n) = q.parse::<usize>() {
+ all_queries.insert(n);
+ }
+ }
+ }
+
+ if all_queries.is_empty() {
+ println!("No query data found in the provided result files.");
+ return Ok(());
+ }
+
+ // Find global max avg_ms for bar scaling
+ let global_max = loaded
+ .iter()
+ .flat_map(|r| r.queries.values().map(|s| s.avg_ms))
+ .fold(0.0_f64, f64::max);
+
+ if global_max == 0.0 {
+ println!("All query timings are zero.");
+ return Ok(());
+ }
+
+ let bar_width: usize = 40;
+ let engine_names: Vec<&str> = loaded.iter().map(|r| r.label()).collect();
+ let max_name_len = engine_names.iter().map(|n| n.len()).max().unwrap_or(0);
+
+ println!();
+ println!("TPC-H Query Runtime Comparison");
+ println!("{}", "=".repeat(max_name_len + 6 + bar_width + 16));
+ println!();
+
+ for q in &all_queries {
+ let q_str = q.to_string();
+ for (i, r) in loaded.iter().enumerate() {
+ let label = if i == 0 {
+ format!("Q{q:02} {:<width$}", r.label(), width = max_name_len)
+ } else {
+ format!(" {:<width$}", r.label(), width = max_name_len)
+ };
+
+ if let Some(stats) = r.queries.get(&q_str) {
+ let filled = ((stats.avg_ms / global_max) * bar_width as
f64).round() as usize;
+ let filled = filled.min(bar_width);
+ let empty = bar_width - filled;
+ println!(
+ "{label} |{}{} | {:>9.1} ms",
+ "\u{2588}".repeat(filled),
+ " ".repeat(empty),
+ stats.avg_ms,
+ );
+ } else {
+ println!("{label} |{} | N/A", " ".repeat(bar_width),);
+ }
+ }
+ println!();
+ }
+
+ // Summary: Total and Geometric Mean as bar charts
+ let mut totals: Vec<(String, f64)> = Vec::new();
+ let mut geomeans: Vec<(String, f64)> = Vec::new();
+ for r in &loaded {
+ let total: f64 = r.queries.values().map(|s| s.avg_ms).sum();
+ totals.push((r.label().to_string(), total));
+
+ let values: Vec<f64> = r.queries.values().map(|s| s.avg_ms).collect();
+ if !values.is_empty() && values.iter().all(|v| *v > 0.0) {
+ let log_sum: f64 = values.iter().map(|v| v.ln()).sum::<f64>();
+ let geomean = (log_sum / values.len() as f64).exp();
+ geomeans.push((r.label().to_string(), geomean));
+ }
+ }
+
+ println!("Summary");
+ println!("{}", "-".repeat(max_name_len + 6 + bar_width + 16));
+ println!();
+
+ // Total runtime bars
+ let total_max = totals.iter().map(|(_, v)| *v).fold(0.0_f64, f64::max);
+ if total_max > 0.0 {
+ for (i, (engine, total)) in totals.iter().enumerate() {
+ let label = if i == 0 {
+ format!("Tot {engine:<max_name_len$}")
+ } else {
+ format!(" {engine:<max_name_len$}")
+ };
+ let filled = ((total / total_max) * bar_width as f64).round() as
usize;
+ let filled = filled.min(bar_width);
+ let empty = bar_width - filled;
+ println!(
+ "{label} |{}{} | {:>9.1} ms",
+ "\u{2588}".repeat(filled),
+ " ".repeat(empty),
+ total,
+ );
+ }
+ println!();
+ }
+
+ // Geometric mean bars
+ if !geomeans.is_empty() {
+ let geomean_max = geomeans.iter().map(|(_, v)| *v).fold(0.0_f64,
f64::max);
+ if geomean_max > 0.0 {
+ for (i, (engine, geomean)) in geomeans.iter().enumerate() {
+ let label = if i == 0 {
+ format!("Geo {engine:<max_name_len$}")
+ } else {
+ format!(" {engine:<max_name_len$}")
+ };
+ let filled = ((geomean / geomean_max) * bar_width as
f64).round() as usize;
+ let filled = filled.min(bar_width);
+ let empty = bar_width - filled;
+ println!(
+ "{label} |{}{} | {:>9.1} ms",
+ "\u{2588}".repeat(filled),
+ " ".repeat(empty),
+ geomean,
+ );
+ }
+ println!();
+ }
+ }
+
+ Ok(())
+}
+
+fn print_single_table(label: &str, results: &[QueryResult]) {
+ let mut table = Table::new();
+ table.set_header(vec![
+ Cell::new("Query"),
+ Cell::new(format!("{label} Min (ms)")),
+ Cell::new(format!("{label} Median (ms)")),
+ Cell::new(format!("{label} Mean (ms)")),
+ Cell::new(format!("{label} Max (ms)")),
+ Cell::new("Status"),
+ ]);
+
+ for r in results {
+ if let Some(ref e) = r.error {
+ table.add_row(vec![
+ Cell::new(format!("Q{:02}", r.query_num)),
+ Cell::new("-"),
+ Cell::new("-"),
+ Cell::new("-"),
+ Cell::new("-"),
+ Cell::new(format!("ERROR: {e}")),
+ ]);
+ } else if let Some(stats) = compute_stats(&r.timings_ms) {
+ table.add_row(vec![
+ Cell::new(format!("Q{:02}", r.query_num)),
+ Cell::new(format!("{:.1}", stats.min)),
+ Cell::new(format!("{:.1}", stats.median)),
+ Cell::new(format!("{:.1}", stats.mean)),
+ Cell::new(format!("{:.1}", stats.max)),
+ Cell::new("OK"),
+ ]);
+ }
+ }
+
+ println!("{table}");
+}
+
+fn print_validation_table(
+ query_nums: &[usize],
+ hudi_results: &[QueryResult],
+ parquet_results: &[QueryResult],
+) {
+ let mut table = Table::new();
+ table.set_header(vec![
+ Cell::new("Query"),
+ Cell::new("Hudi (ms)"),
+ Cell::new("Parquet (ms)"),
+ Cell::new("Result"),
+ ]);
+
+ for (i, qn) in query_nums.iter().enumerate() {
+ let hr = &hudi_results[i];
+ let pr = &parquet_results[i];
+
+ let h_err = hr.error.as_deref();
+ let p_err = pr.error.as_deref();
+
+ if h_err.is_some() || p_err.is_some() {
+ let err_msg = h_err.or(p_err).unwrap_or("unknown error");
+ table.add_row(vec![
+ Cell::new(format!("Q{qn:02}")),
+ Cell::new(if h_err.is_some() { "-" } else { "OK" }),
+ Cell::new(if p_err.is_some() { "-" } else { "OK" }),
+ Cell::new(format!("ERROR: {err_msg}")),
+ ]);
+ continue;
+ }
+
+ let h_ms = hr
+ .timings_ms
+ .first()
+ .map(|t| format!("{t:.1}"))
+ .unwrap_or("-".into());
+ let p_ms = pr
+ .timings_ms
+ .first()
+ .map(|t| format!("{t:.1}"))
+ .unwrap_or("-".into());
+ let validation = compare_batches(&hr.last_batches, &pr.last_batches);
+
+ table.add_row(vec![
+ Cell::new(format!("Q{qn:02}")),
+ Cell::new(h_ms),
+ Cell::new(p_ms),
+ Cell::new(validation),
+ ]);
+ }
+
+ println!("{table}");
+}
+
+/// Compare two sets of record batches for correctness validation.
+fn compare_batches(actual: &[RecordBatch], expected: &[RecordBatch]) -> String
{
+ let actual_rows = match batches_to_csv_rows(actual) {
+ Ok(r) => r,
+ Err(e) => return format!("ERROR: {e}"),
+ };
+ let expected_rows = match batches_to_csv_rows(expected) {
+ Ok(r) => r,
+ Err(e) => return format!("ERROR: {e}"),
+ };
+
+ if actual_rows.len() != expected_rows.len() {
+ return format!(
+ "FAIL (rows: {} vs {})",
+ actual_rows.len(),
+ expected_rows.len()
+ );
+ }
+
+ let mut actual_sorted = actual_rows;
+ actual_sorted.sort();
+ let mut expected_sorted = expected_rows;
+ expected_sorted.sort();
+
+ for (i, (a, e)) in
actual_sorted.iter().zip(expected_sorted.iter()).enumerate() {
+ if !rows_match(a, e) {
+ return format!("FAIL (row {i} mismatch)");
+ }
+ }
+
+ "PASS".to_string()
+}
+
/// Compare two CSV row strings, using tolerance for floating-point values.
///
/// Cells match when they are string-equal, or when both parse as f64 and
/// differ by at most a relative tolerance of 1e-6 (absolute 1e-10 when both
/// are zero). Rows with different column counts never match.
fn rows_match(actual: &str, expected: &str) -> bool {
    let lhs: Vec<&str> = actual.split(',').collect();
    let rhs: Vec<&str> = expected.split(',').collect();

    lhs.len() == rhs.len()
        && lhs.iter().zip(&rhs).all(|(a, e)| {
            if a == e {
                return true;
            }
            match (a.parse::<f64>(), e.parse::<f64>()) {
                (Ok(av), Ok(ev)) => {
                    let diff = (av - ev).abs();
                    let scale = av.abs().max(ev.abs());
                    if scale == 0.0 {
                        diff <= 1e-10
                    } else {
                        diff / scale <= 1e-6
                    }
                }
                // Non-numeric cells must be exactly equal (handled above).
                _ => false,
            }
        })
}
+
+/// Convert record batches to CSV-like row strings for comparison.
+fn batches_to_csv_rows(batches: &[RecordBatch]) ->
std::result::Result<Vec<String>, String> {
+ let mut rows = Vec::new();
+ let fmt_opts = FormatOptions::default();
+
+ for batch in batches {
+ let formatters: Vec<ArrayFormatter> = batch
+ .columns()
+ .iter()
+ .map(|col| ArrayFormatter::try_new(col.as_ref(), &fmt_opts))
+ .collect::<std::result::Result<Vec<_>, _>>()
+ .map_err(|e| format!("Failed to create formatter: {e}"))?;
+
+ for row_idx in 0..batch.num_rows() {
+ let cols: Vec<String> = batch
+ .schema()
+ .fields()
+ .iter()
+ .enumerate()
+ .map(|(col_idx, field)| {
+ if batch.column(col_idx).is_null(row_idx) {
+ return "".to_string();
+ }
+ match field.data_type() {
+ DataType::Float32 | DataType::Float64 |
DataType::Decimal128(_, _) => {
+ formatters[col_idx].value(row_idx).to_string()
+ }
+ _ => formatters[col_idx].value(row_idx).to_string(),
+ }
+ })
+ .collect();
+ rows.push(cols.join(","));
+ }
+ }
+
+ Ok(rows)
+}