This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 5ea1d31ca6 Add sort integration benchmark (#13306)
5ea1d31ca6 is described below
commit 5ea1d31ca6da7136ee5e9786f817c6d5baff5f13
Author: Yongting You <[email protected]>
AuthorDate: Sat Nov 16 04:06:39 2024 +0800
Add sort integration benchmark (#13306)
* Add sort integration benchmark
* clippy
* review
---
benchmarks/README.md | 24 ++++
benchmarks/bench.sh | 18 +++
benchmarks/src/bin/dfbench.rs | 4 +-
benchmarks/src/lib.rs | 1 +
benchmarks/src/sort_tpch.rs | 320 ++++++++++++++++++++++++++++++++++++++++++
5 files changed, 366 insertions(+), 1 deletion(-)
diff --git a/benchmarks/README.md b/benchmarks/README.md
index a9aa1afb97..cccd7f44f5 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -330,6 +330,30 @@ steps.
The tests sort the entire dataset using several different sort
orders.
+## Sort TPCH
+
+Test the performance of end-to-end sort SQL queries. (While the `Sort` benchmark focuses on a single sort executor, this benchmark tests how sorting is executed across multiple CPU cores by sorting an entire relational table.)
+
+The sort integration benchmark runs whole-table sort queries on the TPCH `lineitem` table with different characteristics, for example different numbers of sort keys, different sort key cardinalities, and different numbers of payload columns.
+
+See [`sort_tpch.rs`](src/sort_tpch.rs) for more details.
+
+### Sort TPCH Benchmark Example Runs
+1. Run all queries with default settings:
+```bash
+cargo run --release --bin dfbench -- sort-tpch -p '/path/to/datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_tpch.json'
+```
+
+2. Run a specific query (query numbers start at 1):
+```bash
+cargo run --release --bin dfbench -- sort-tpch -p '/path/to/datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_tpch.json' --query 2
+```
+
+3. Run all queries with the `bench.sh` script:
+```bash
+./bench.sh run sort_tpch
+```
+
## IMDB
Run Join Order Benchmark (JOB) on IMDB dataset.
diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh
index 47c5d12616..b02bfee245 100755
--- a/benchmarks/bench.sh
+++ b/benchmarks/bench.sh
@@ -75,6 +75,7 @@ tpch10: TPCH inspired benchmark on Scale
Factor (SF) 10 (~10GB),
tpch_mem10: TPCH inspired benchmark on Scale Factor (SF) 10
(~10GB), query from memory
parquet: Benchmark of parquet reader's filtering speed
sort: Benchmark of sorting speed
+sort_tpch: Benchmark of sorting speed for end-to-end sort queries
on TPCH dataset
clickbench_1: ClickBench queries against a single parquet file
clickbench_partitioned: ClickBench queries against a partitioned (100 files)
parquet
clickbench_extended: ClickBench \"inspired\" queries against a single
parquet (DataFusion specific)
@@ -175,6 +176,10 @@ main() {
# same data as for tpch
data_tpch "1"
;;
+ sort_tpch)
+ # same data as for tpch
+ data_tpch "1"
+ ;;
*)
echo "Error: unknown benchmark '$BENCHMARK' for data
generation"
usage
@@ -252,6 +257,9 @@ main() {
external_aggr)
run_external_aggr
;;
+ sort_tpch)
+ run_sort_tpch
+ ;;
*)
echo "Error: unknown benchmark '$BENCHMARK' for run"
usage
@@ -549,6 +557,16 @@ run_external_aggr() {
$CARGO_COMMAND --bin external_aggr -- benchmark --partitions 4
--iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}"
}
+# Runs the sort integration benchmark (`dfbench sort-tpch`) against the
+# TPCH SF1 data with 5 iterations per query; results go to sort_tpch.json
+# in RESULTS_DIR.
+run_sort_tpch() {
+    TPCH_DIR="${DATA_DIR}/tpch_sf1"
+    RESULTS_FILE="${RESULTS_DIR}/sort_tpch.json"
+    echo "RESULTS_FILE: ${RESULTS_FILE}"
+    echo "Running sort tpch benchmark..."
+
+    $CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}"
+}
+
compare_benchmarks() {
BASE_RESULTS_DIR="${SCRIPT_DIR}/results"
diff --git a/benchmarks/src/bin/dfbench.rs b/benchmarks/src/bin/dfbench.rs
index f7b84116e7..81aa5437dd 100644
--- a/benchmarks/src/bin/dfbench.rs
+++ b/benchmarks/src/bin/dfbench.rs
@@ -33,7 +33,7 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
#[global_allocator]
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
-use datafusion_benchmarks::{clickbench, imdb, parquet_filter, sort, tpch};
+use datafusion_benchmarks::{clickbench, imdb, parquet_filter, sort, sort_tpch,
tpch};
#[derive(Debug, StructOpt)]
#[structopt(about = "benchmark command")]
@@ -43,6 +43,7 @@ enum Options {
Clickbench(clickbench::RunOpt),
ParquetFilter(parquet_filter::RunOpt),
Sort(sort::RunOpt),
+ SortTpch(sort_tpch::RunOpt),
Imdb(imdb::RunOpt),
}
@@ -57,6 +58,7 @@ pub async fn main() -> Result<()> {
Options::Clickbench(opt) => opt.run().await,
Options::ParquetFilter(opt) => opt.run().await,
Options::Sort(opt) => opt.run().await,
+ Options::SortTpch(opt) => opt.run().await,
Options::Imdb(opt) => opt.run().await,
}
}
diff --git a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs
index 02410e0cfa..2d37d78764 100644
--- a/benchmarks/src/lib.rs
+++ b/benchmarks/src/lib.rs
@@ -20,5 +20,6 @@ pub mod clickbench;
pub mod imdb;
pub mod parquet_filter;
pub mod sort;
+pub mod sort_tpch;
pub mod tpch;
pub mod util;
diff --git a/benchmarks/src/sort_tpch.rs b/benchmarks/src/sort_tpch.rs
new file mode 100644
index 0000000000..4b83b3b888
--- /dev/null
+++ b/benchmarks/src/sort_tpch.rs
@@ -0,0 +1,320 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides an integration benchmark for sort operations.
+//! It runs different sort SQL queries on the TPCH `lineitem` parquet dataset.
+//!
+//! The separate `Sort` benchmark focuses on single-core execution. This
+//! benchmark runs end-to-end sort queries and tests the performance on
+//! multiple CPU cores.
+
+use futures::StreamExt;
+use std::path::PathBuf;
+use std::sync::Arc;
+use structopt::StructOpt;
+
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::listing::{
+ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
+};
+use datafusion::datasource::{MemTable, TableProvider};
+use datafusion::error::Result;
+use datafusion::execution::runtime_env::RuntimeConfig;
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use datafusion::physical_plan::{displayable, execute_stream};
+use datafusion::prelude::*;
+use datafusion_common::instant::Instant;
+use datafusion_common::DEFAULT_PARQUET_EXTENSION;
+
+use crate::util::{BenchmarkRun, CommonOpt};
+
+// Command-line options for the `sort-tpch` benchmark. Note: with StructOpt the
+// `///` comments on the fields below are the user-visible `--help` text; use
+// plain `//` comments for notes that must not appear in the CLI help.
+#[derive(Debug, StructOpt)]
+pub struct RunOpt {
+    /// Common options
+    #[structopt(flatten)]
+    common: CommonOpt,
+
+    // Query numbers are 1-based; `RunOpt::run` rejects ids outside
+    // 1..=SORT_QUERIES.len() with an error.
+    /// Sort query number. If not specified, runs all queries
+    #[structopt(short, long)]
+    query: Option<usize>,
+
+    /// Path to data files (lineitem). Only parquet format is supported
+    #[structopt(parse(from_os_str), required = true, short = "p", long = "path")]
+    path: PathBuf,
+
+    /// Path to JSON benchmark result to be compared using `compare.py`
+    #[structopt(parse(from_os_str), short = "o", long = "output")]
+    output_path: Option<PathBuf>,
+
+    /// Load the data into a MemTable before executing the query
+    #[structopt(short = "m", long = "mem-table")]
+    mem_table: bool,
+}
+
+/// Measurements collected for one iteration of one benchmark query.
+struct QueryResult {
+    /// Total number of rows returned by the query
+    row_count: usize,
+    /// Wall-clock time spent planning and executing the query
+    elapsed: std::time::Duration,
+}
+
+impl RunOpt {
+    /// Tables registered in the session before each query runs.
+    const SORT_TABLES: [&'static str; 1] = ["lineitem"];
+
+    /// Sort queries with different characteristics:
+    /// - Sort key with fixed length or variable length (VARCHAR)
+    /// - Sort key with different cardinality
+    /// - Different number of sort keys
+    /// - Different number of payload columns (thin: 1 additional column other
+    ///   than sort keys; wide: all columns except sort keys)
+    ///
+    /// Dataset is the `lineitem` table of the TPCH dataset (16 columns, 6M rows
+    /// for scale factor 1.0; cardinalities are counted from the SF1 dataset)
+    ///
+    /// Key Columns:
+    /// - Column `l_linenumber`, type: `INTEGER`, cardinality: 7
+    /// - Column `l_suppkey`, type: `BIGINT`, cardinality: 10k
+    /// - Column `l_orderkey`, type: `BIGINT`, cardinality: 1.5M
+    /// - Column `l_comment`, type: `VARCHAR`, cardinality: 4.5M (len is ~26 chars)
+    ///
+    /// Payload Columns:
+    /// - Thin variant: `l_partkey` column with `BIGINT` type (1 column)
+    /// - Wide variant: all columns except for possible key columns (12 columns)
+    ///
+    /// Queries are numbered from 1 on the command line: `--query 1` runs
+    /// `SORT_QUERIES[0]`.
+    const SORT_QUERIES: [&'static str; 10] = [
+        // Q1: 1 sort key (type: INTEGER, cardinality: 7) + 1 payload column
+        r#"
+            SELECT l_linenumber, l_partkey
+            FROM lineitem
+            ORDER BY l_linenumber
+        "#,
+        // Q2: 1 sort key (type: BIGINT, cardinality: 1.5M) + 1 payload column
+        r#"
+            SELECT l_orderkey, l_partkey
+            FROM lineitem
+            ORDER BY l_orderkey
+        "#,
+        // Q3: 1 sort key (type: VARCHAR, cardinality: 4.5M) + 1 payload column
+        r#"
+            SELECT l_comment, l_partkey
+            FROM lineitem
+            ORDER BY l_comment
+        "#,
+        // Q4: 2 sort keys {(BIGINT, 1.5M), (INTEGER, 7)} + 1 payload column
+        r#"
+            SELECT l_orderkey, l_linenumber, l_partkey
+            FROM lineitem
+            ORDER BY l_orderkey, l_linenumber
+        "#,
+        // Q5: 3 sort keys {(INTEGER, 7), (BIGINT, 10k), (BIGINT, 1.5M)} + no payload column
+        r#"
+            SELECT l_linenumber, l_suppkey, l_orderkey
+            FROM lineitem
+            ORDER BY l_linenumber, l_suppkey, l_orderkey
+        "#,
+        // Q6: 3 sort keys {(INTEGER, 7), (BIGINT, 10k), (BIGINT, 1.5M)} + 1 payload column
+        r#"
+            SELECT l_linenumber, l_suppkey, l_orderkey, l_partkey
+            FROM lineitem
+            ORDER BY l_linenumber, l_suppkey, l_orderkey
+        "#,
+        // Q7: 3 sort keys {(INTEGER, 7), (BIGINT, 10k), (BIGINT, 1.5M)} + 12 all other columns
+        r#"
+            SELECT l_linenumber, l_suppkey, l_orderkey,
+                   l_partkey, l_quantity, l_extendedprice, l_discount, l_tax,
+                   l_returnflag, l_linestatus, l_shipdate, l_commitdate,
+                   l_receiptdate, l_shipinstruct, l_shipmode
+            FROM lineitem
+            ORDER BY l_linenumber, l_suppkey, l_orderkey
+        "#,
+        // Q8: 4 sort keys {(BIGINT, 1.5M), (BIGINT, 10k), (INTEGER, 7), (VARCHAR, 4.5M)} + no payload column
+        r#"
+            SELECT l_orderkey, l_suppkey, l_linenumber, l_comment
+            FROM lineitem
+            ORDER BY l_orderkey, l_suppkey, l_linenumber, l_comment
+        "#,
+        // Q9: 4 sort keys {(BIGINT, 1.5M), (BIGINT, 10k), (INTEGER, 7), (VARCHAR, 4.5M)} + 1 payload column
+        r#"
+            SELECT l_orderkey, l_suppkey, l_linenumber, l_comment, l_partkey
+            FROM lineitem
+            ORDER BY l_orderkey, l_suppkey, l_linenumber, l_comment
+        "#,
+        // Q10: 4 sort keys {(BIGINT, 1.5M), (BIGINT, 10k), (INTEGER, 7), (VARCHAR, 4.5M)} + 12 all other columns
+        r#"
+            SELECT l_orderkey, l_suppkey, l_linenumber, l_comment,
+                   l_partkey, l_quantity, l_extendedprice, l_discount, l_tax,
+                   l_returnflag, l_linestatus, l_shipdate, l_commitdate,
+                   l_receiptdate, l_shipinstruct, l_shipmode
+            FROM lineitem
+            ORDER BY l_orderkey, l_suppkey, l_linenumber, l_comment
+        "#,
+    ];
+
+    /// Runs the query selected with `--query`, or every query in
+    /// `SORT_QUERIES` when none was given, then writes the collected timings
+    /// as JSON if an output path was set.
+    ///
+    /// # Errors
+    /// Returns an error if the query number is out of range, or if table
+    /// registration, query execution, or writing the JSON result fails.
+    pub async fn run(&self) -> Result<()> {
+        let mut benchmark_run = BenchmarkRun::new();
+
+        let query_range = match self.query {
+            Some(query_id) => query_id..=query_id,
+            None => 1..=Self::SORT_QUERIES.len(),
+        };
+
+        for query_id in query_range {
+            benchmark_run.start_new_case(&format!("{query_id}"));
+
+            let query_results = self.benchmark_query(query_id).await?;
+            for iter in query_results {
+                benchmark_run.write_iter(iter.elapsed, iter.row_count);
+            }
+        }
+
+        benchmark_run.maybe_write_json(self.output_path.as_ref())?;
+
+        Ok(())
+    }
+
+    /// Returns the SQL text of the 1-based query `query_id`.
+    ///
+    /// Returns a configuration error, instead of panicking on a `usize`
+    /// underflow or an out-of-bounds index, when `query_id` is 0 or greater
+    /// than the number of queries in `SORT_QUERIES`.
+    fn query_sql(query_id: usize) -> Result<&'static str> {
+        query_id
+            .checked_sub(1) // 1-indexed -> 0-indexed; 0 is not a valid id
+            .and_then(|idx| Self::SORT_QUERIES.get(idx).copied())
+            .ok_or_else(|| {
+                datafusion::error::DataFusionError::Configuration(format!(
+                    "Invalid sort query number {query_id}: expected a value in 1..={}",
+                    Self::SORT_QUERIES.len()
+                ))
+            })
+    }
+
+    /// Benchmarks the 1-based query `query_id` of `SORT_QUERIES`.
+    ///
+    /// A fresh `SessionContext` is created and the tables are registered,
+    /// then the query runs `iterations()` times. The time and row count of
+    /// each iteration and the average time are printed. Returns one
+    /// `QueryResult` per iteration.
+    async fn benchmark_query(&self, query_id: usize) -> Result<Vec<QueryResult>> {
+        // Validate the query number before doing any setup work.
+        let sql = Self::query_sql(query_id)?;
+
+        let config = self.common.config();
+        let runtime_config = RuntimeConfig::new().build_arc()?;
+        let ctx = SessionContext::new_with_config_rt(config, runtime_config);
+
+        self.register_tables(&ctx).await?;
+
+        let iterations = self.iterations();
+        let mut millis = Vec::with_capacity(iterations);
+        let mut query_results = Vec::with_capacity(iterations);
+        for i in 0..iterations {
+            let start = Instant::now();
+            let row_count = self.execute_query(&ctx, sql).await?;
+            let elapsed = start.elapsed();
+
+            let ms = elapsed.as_secs_f64() * 1000.0;
+            millis.push(ms);
+
+            println!(
+                "Q{query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows"
+            );
+            query_results.push(QueryResult { elapsed, row_count });
+        }
+
+        // With zero iterations the average would be 0/0 = NaN; skip it.
+        if !millis.is_empty() {
+            let avg = millis.iter().sum::<f64>() / millis.len() as f64;
+            println!("Q{query_id} avg time: {avg:.2} ms");
+        }
+
+        Ok(query_results)
+    }
+
+    /// Registers every table in `SORT_TABLES` with `ctx`.
+    ///
+    /// When `mem_table` is set, each table is first loaded into a `MemTable`
+    /// with `partitions()` partitions, and that in-memory copy is registered
+    /// instead of the parquet-backed table.
+    async fn register_tables(&self, ctx: &SessionContext) -> Result<()> {
+        for table in Self::SORT_TABLES {
+            let table_provider = self.get_table(ctx, table).await?;
+
+            if self.mem_table {
+                println!("Loading table '{table}' into memory");
+                let start = Instant::now();
+                let memtable =
+                    MemTable::load(table_provider, Some(self.partitions()), &ctx.state())
+                        .await?;
+                println!(
+                    "Loaded table '{}' into memory in {} ms",
+                    table,
+                    start.elapsed().as_millis()
+                );
+                ctx.register_table(table, Arc::new(memtable))?;
+            } else {
+                ctx.register_table(table, table_provider)?;
+            }
+        }
+        Ok(())
+    }
+
+    /// Plans and executes `sql`, drains the result stream, and returns the
+    /// total number of rows produced.
+    ///
+    /// When `self.common.debug` is set, prints the logical plan, the
+    /// optimized logical plan, the physical plan, and the physical plan
+    /// annotated with execution metrics.
+    ///
+    /// # Errors
+    /// Propagates planning errors and any error yielded by the result
+    /// stream, rather than panicking on a failed batch.
+    async fn execute_query(&self, ctx: &SessionContext, sql: &str) -> Result<usize> {
+        let debug = self.common.debug;
+        let plan = ctx.sql(sql).await?;
+        let (state, plan) = plan.into_parts();
+
+        if debug {
+            println!("=== Logical plan ===\n{plan}\n");
+        }
+
+        let plan = state.optimize(&plan)?;
+        if debug {
+            println!("=== Optimized logical plan ===\n{plan}\n");
+        }
+        let physical_plan = state.create_physical_plan(&plan).await?;
+        if debug {
+            println!(
+                "=== Physical plan ===\n{}\n",
+                displayable(physical_plan.as_ref()).indent(true)
+            );
+        }
+
+        let mut row_count = 0;
+        // Keep our own handle on the plan so its metrics can be printed afterwards.
+        let mut stream = execute_stream(Arc::clone(&physical_plan), state.task_ctx())?;
+        while let Some(batch) = stream.next().await {
+            row_count += batch?.num_rows();
+        }
+
+        if debug {
+            println!(
+                "=== Physical plan with metrics ===\n{}\n",
+                DisplayableExecutionPlan::with_metrics(physical_plan.as_ref())
+                    .indent(true)
+            );
+        }
+
+        Ok(row_count)
+    }
+
+    /// Builds a parquet-backed `ListingTable` for `table` from the files under
+    /// `<path>/<table>` that carry `DEFAULT_PARQUET_EXTENSION`.
+    ///
+    /// # Errors
+    /// Returns an error, instead of panicking, if `path` is not valid UTF-8.
+    /// Also returns an error if the table URL cannot be parsed, schema
+    /// inference fails, or the table cannot be created.
+    async fn get_table(
+        &self,
+        ctx: &SessionContext,
+        table: &str,
+    ) -> Result<Arc<dyn TableProvider>> {
+        let path = self.path.to_str().ok_or_else(|| {
+            datafusion::error::DataFusionError::Configuration(format!(
+                "Data path {} is not valid UTF-8",
+                self.path.display()
+            ))
+        })?;
+
+        // Obtain a snapshot of the SessionState
+        let state = ctx.state();
+        let path = format!("{path}/{table}");
+        let format = Arc::new(
+            ParquetFormat::default().with_options(state.table_options().parquet.clone()),
+        );
+
+        let options = ListingOptions::new(format)
+            .with_file_extension(DEFAULT_PARQUET_EXTENSION)
+            .with_collect_stat(state.config().collect_statistics());
+
+        let table_path = ListingTableUrl::parse(path)?;
+        let config = ListingTableConfig::new(table_path)
+            .with_listing_options(options)
+            .infer_schema(&state)
+            .await?;
+
+        Ok(Arc::new(ListingTable::try_new(config)?))
+    }
+
+    /// Number of times each query is executed.
+    fn iterations(&self) -> usize {
+        self.common.iterations
+    }
+
+    /// Partition count: the configured value if set, otherwise the CPU count.
+    fn partitions(&self) -> usize {
+        self.common.partitions.unwrap_or_else(num_cpus::get)
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]