This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 5ea1d31ca6 Add sort integration benchmark (#13306)
5ea1d31ca6 is described below

commit 5ea1d31ca6da7136ee5e9786f817c6d5baff5f13
Author: Yongting You <[email protected]>
AuthorDate: Sat Nov 16 04:06:39 2024 +0800

    Add sort integration benchmark (#13306)
    
    * Add sort integration benchmark
    
    * clippy
    
    * review
---
 benchmarks/README.md          |  24 ++++
 benchmarks/bench.sh           |  18 +++
 benchmarks/src/bin/dfbench.rs |   4 +-
 benchmarks/src/lib.rs         |   1 +
 benchmarks/src/sort_tpch.rs   | 320 ++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 366 insertions(+), 1 deletion(-)

diff --git a/benchmarks/README.md b/benchmarks/README.md
index a9aa1afb97..cccd7f44f5 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -330,6 +330,30 @@ steps.
 The tests sort the entire dataset using several different sort
 orders.
 
+## Sort TPCH
+
+Test performance of end-to-end sort SQL queries. (While the `Sort` benchmark focuses on a single sort executor, this benchmark measures how sorting is executed across multiple CPU cores by sorting the whole relational table.)
+
+The sort integration benchmark runs whole-table sort queries on the TPCH `lineitem` table with different characteristics: for example, different numbers of sort keys, different sort key cardinalities, and different numbers of payload columns.
+
+See [`sort_tpch.rs`](src/sort_tpch.rs) for more details.
+
+### Sort TPCH Benchmark Example Runs
+1. Run all queries with default settings:
+```bash
+ cargo run --release --bin dfbench -- sort-tpch -p '....../datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_tpch.json'
+```
+
+2. Run a specific query:
+```bash
+ cargo run --release --bin dfbench -- sort-tpch -p '....../datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_tpch.json' --query 2
+```
+
+3. Run all queries with `bench.sh` script:
+```bash
+./bench.sh run sort_tpch
+```
+
 ## IMDB
 
 Run Join Order Benchmark (JOB) on IMDB dataset.
diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh
index 47c5d12616..b02bfee245 100755
--- a/benchmarks/bench.sh
+++ b/benchmarks/bench.sh
@@ -75,6 +75,7 @@ tpch10:                 TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB),
 tpch_mem10:             TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), query from memory
 parquet:                Benchmark of parquet reader's filtering speed
 sort:                   Benchmark of sorting speed
+sort_tpch:              Benchmark of sorting speed for end-to-end sort queries on the TPCH dataset
 clickbench_1:           ClickBench queries against a single parquet file
 clickbench_partitioned: ClickBench queries against a partitioned (100 files) parquet
 clickbench_extended:    ClickBench \"inspired\" queries against a single parquet (DataFusion specific)
@@ -175,6 +176,10 @@ main() {
                     # same data as for tpch
                     data_tpch "1"
                     ;;
+                sort_tpch)
+                    # same data as for tpch
+                    data_tpch "1"
+                    ;;
                 *)
                     echo "Error: unknown benchmark '$BENCHMARK' for data 
generation"
                     usage
@@ -252,6 +257,9 @@ main() {
                 external_aggr)
                     run_external_aggr
                     ;;
+                sort_tpch)
+                    run_sort_tpch
+                    ;;
                 *)
                     echo "Error: unknown benchmark '$BENCHMARK' for run"
                     usage
@@ -549,6 +557,16 @@ run_external_aggr() {
     $CARGO_COMMAND --bin external_aggr -- benchmark --partitions 4 --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}"
 }
 
+# Runs the sort integration benchmark
+run_sort_tpch() {
+    TPCH_DIR="${DATA_DIR}/tpch_sf1"
+    RESULTS_FILE="${RESULTS_DIR}/sort_tpch.json"
+    echo "RESULTS_FILE: ${RESULTS_FILE}"
+    echo "Running sort tpch benchmark..."
+
+    $CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}"
+}
+
 
 compare_benchmarks() {
     BASE_RESULTS_DIR="${SCRIPT_DIR}/results"
diff --git a/benchmarks/src/bin/dfbench.rs b/benchmarks/src/bin/dfbench.rs
index f7b84116e7..81aa5437dd 100644
--- a/benchmarks/src/bin/dfbench.rs
+++ b/benchmarks/src/bin/dfbench.rs
@@ -33,7 +33,7 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
 #[global_allocator]
 static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
-use datafusion_benchmarks::{clickbench, imdb, parquet_filter, sort, tpch};
+use datafusion_benchmarks::{clickbench, imdb, parquet_filter, sort, sort_tpch, tpch};
 
 #[derive(Debug, StructOpt)]
 #[structopt(about = "benchmark command")]
@@ -43,6 +43,7 @@ enum Options {
     Clickbench(clickbench::RunOpt),
     ParquetFilter(parquet_filter::RunOpt),
     Sort(sort::RunOpt),
+    SortTpch(sort_tpch::RunOpt),
     Imdb(imdb::RunOpt),
 }
 
@@ -57,6 +58,7 @@ pub async fn main() -> Result<()> {
         Options::Clickbench(opt) => opt.run().await,
         Options::ParquetFilter(opt) => opt.run().await,
         Options::Sort(opt) => opt.run().await,
+        Options::SortTpch(opt) => opt.run().await,
         Options::Imdb(opt) => opt.run().await,
     }
 }
diff --git a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs
index 02410e0cfa..2d37d78764 100644
--- a/benchmarks/src/lib.rs
+++ b/benchmarks/src/lib.rs
@@ -20,5 +20,6 @@ pub mod clickbench;
 pub mod imdb;
 pub mod parquet_filter;
 pub mod sort;
+pub mod sort_tpch;
 pub mod tpch;
 pub mod util;
diff --git a/benchmarks/src/sort_tpch.rs b/benchmarks/src/sort_tpch.rs
new file mode 100644
index 0000000000..4b83b3b888
--- /dev/null
+++ b/benchmarks/src/sort_tpch.rs
@@ -0,0 +1,320 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides an integration benchmark for the sort operation.
+//! It runs different sort SQL queries on the TPCH `lineitem` parquet dataset.
+//!
+//! The separate `Sort` benchmark focuses on single-core execution; this benchmark
+//! runs end-to-end sort queries and tests performance on multiple CPU cores.
+
+use futures::StreamExt;
+use std::path::PathBuf;
+use std::sync::Arc;
+use structopt::StructOpt;
+
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::listing::{
+    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
+};
+use datafusion::datasource::{MemTable, TableProvider};
+use datafusion::error::Result;
+use datafusion::execution::runtime_env::RuntimeConfig;
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use datafusion::physical_plan::{displayable, execute_stream};
+use datafusion::prelude::*;
+use datafusion_common::instant::Instant;
+use datafusion_common::DEFAULT_PARQUET_EXTENSION;
+
+use crate::util::{BenchmarkRun, CommonOpt};
+
+#[derive(Debug, StructOpt)]
+pub struct RunOpt {
+    /// Common options
+    #[structopt(flatten)]
+    common: CommonOpt,
+
+    /// Sort query number. If not specified, runs all queries
+    #[structopt(short, long)]
+    query: Option<usize>,
+
+    /// Path to data files (lineitem). Only parquet format is supported
+    #[structopt(parse(from_os_str), required = true, short = "p", long = "path")]
+    path: PathBuf,
+
+    /// Path to JSON benchmark result to be compared using `compare.py`
+    #[structopt(parse(from_os_str), short = "o", long = "output")]
+    output_path: Option<PathBuf>,
+
+    /// Load the data into a MemTable before executing the query
+    #[structopt(short = "m", long = "mem-table")]
+    mem_table: bool,
+}
+
+struct QueryResult {
+    elapsed: std::time::Duration,
+    row_count: usize,
+}
+
+impl RunOpt {
+    const SORT_TABLES: [&'static str; 1] = ["lineitem"];
+
+    /// Sort queries with different characteristics:
+    /// - Sort key with fixed length or variable length (VARCHAR)
+    /// - Sort key with different cardinality
+    /// - Different number of sort keys
+    /// - Different number of payload columns (thin: 1 additional column other
+    ///   than sort keys; wide: all columns except sort keys)
+    ///
+    /// The dataset is the `lineitem` table from TPCH (16 columns, 6M rows at
+    /// scale factor 1.0; cardinalities below are counted from the SF1 dataset)
+    ///
+    /// Key Columns:
+    /// - Column `l_linenumber`, type: `INTEGER`, cardinality: 7
+    /// - Column `l_suppkey`, type: `BIGINT`, cardinality: 10k
+    /// - Column `l_orderkey`, type: `BIGINT`, cardinality: 1.5M
+    /// - Column `l_comment`, type: `VARCHAR`, cardinality: 4.5M (len is ~26 chars)
+    ///
+    /// Payload Columns:
+    /// - Thin variant: `l_partkey` column with `BIGINT` type (1 column)
+    /// - Wide variant: all columns except for possible key columns (12 columns)
+    const SORT_QUERIES: [&'static str; 10] = [
+        // Q1: 1 sort key (type: INTEGER, cardinality: 7) + 1 payload column
+        r#"
+        SELECT l_linenumber, l_partkey
+        FROM lineitem
+        ORDER BY l_linenumber
+        "#,
+        // Q2: 1 sort key (type: BIGINT, cardinality: 1.5M) + 1 payload column
+        r#"
+        SELECT l_orderkey, l_partkey
+        FROM lineitem
+        ORDER BY l_orderkey
+        "#,
+        // Q3: 1 sort key (type: VARCHAR, cardinality: 4.5M) + 1 payload column
+        r#"
+        SELECT l_comment, l_partkey
+        FROM lineitem
+        ORDER BY l_comment
+        "#,
+        // Q4: 2 sort keys {(BIGINT, 1.5M), (INTEGER, 7)} + 1 payload column
+        r#"
+        SELECT l_orderkey, l_linenumber, l_partkey
+        FROM lineitem
+        ORDER BY l_orderkey, l_linenumber
+        "#,
+        // Q5: 3 sort keys {(INTEGER, 7), (BIGINT, 10k), (BIGINT, 1.5M)} + no payload column
+        r#"
+        SELECT l_linenumber, l_suppkey, l_orderkey
+        FROM lineitem
+        ORDER BY l_linenumber, l_suppkey, l_orderkey
+        "#,
+        // Q6: 3 sort keys {(INTEGER, 7), (BIGINT, 10k), (BIGINT, 1.5M)} + 1 payload column
+        r#"
+        SELECT l_linenumber, l_suppkey, l_orderkey, l_partkey
+        FROM lineitem
+        ORDER BY l_linenumber, l_suppkey, l_orderkey
+        "#,
+        // Q7: 3 sort keys {(INTEGER, 7), (BIGINT, 10k), (BIGINT, 1.5M)} + all 12 other columns
+        r#"
+        SELECT l_linenumber, l_suppkey, l_orderkey, 
+               l_partkey, l_quantity, l_extendedprice, l_discount, l_tax,
+               l_returnflag, l_linestatus, l_shipdate, l_commitdate,
+               l_receiptdate, l_shipinstruct, l_shipmode
+        FROM lineitem
+        ORDER BY l_linenumber, l_suppkey, l_orderkey
+        "#,
+        // Q8: 4 sort keys {(BIGINT, 1.5M), (BIGINT, 10k), (INTEGER, 7), (VARCHAR, 4.5M)} + no payload column
+        r#"
+        SELECT l_orderkey, l_suppkey, l_linenumber, l_comment
+        FROM lineitem
+        ORDER BY l_orderkey, l_suppkey, l_linenumber, l_comment
+        "#,
+        // Q9: 4 sort keys {(BIGINT, 1.5M), (BIGINT, 10k), (INTEGER, 7), (VARCHAR, 4.5M)} + 1 payload column
+        r#"
+        SELECT l_orderkey, l_suppkey, l_linenumber, l_comment, l_partkey
+        FROM lineitem
+        ORDER BY l_orderkey, l_suppkey, l_linenumber, l_comment
+        "#,
+        // Q10: 4 sort keys {(BIGINT, 1.5M), (BIGINT, 10k), (INTEGER, 7), (VARCHAR, 4.5M)} + all 12 other columns
+        r#"
+        SELECT l_orderkey, l_suppkey, l_linenumber, l_comment,
+               l_partkey, l_quantity, l_extendedprice, l_discount, l_tax,
+               l_returnflag, l_linestatus, l_shipdate, l_commitdate,
+               l_receiptdate, l_shipinstruct, l_shipmode
+        FROM lineitem
+        ORDER BY l_orderkey, l_suppkey, l_linenumber, l_comment
+        "#,
+    ];
+
+    /// If a query is specified on the command line, run only that query.
+    /// Otherwise, run all queries.
+    pub async fn run(&self) -> Result<()> {
+        let mut benchmark_run = BenchmarkRun::new();
+
+        let query_range = match self.query {
+            Some(query_id) => query_id..=query_id,
+            None => 1..=Self::SORT_QUERIES.len(),
+        };
+
+        for query_id in query_range {
+            benchmark_run.start_new_case(&format!("{query_id}"));
+
+            let query_results = self.benchmark_query(query_id).await?;
+            for iter in query_results {
+                benchmark_run.write_iter(iter.elapsed, iter.row_count);
+            }
+        }
+
+        benchmark_run.maybe_write_json(self.output_path.as_ref())?;
+
+        Ok(())
+    }
+
+    /// Benchmark query `query_id` in `SORT_QUERIES`
+    async fn benchmark_query(&self, query_id: usize) -> Result<Vec<QueryResult>> {
+        let config = self.common.config();
+
+        let runtime_config = RuntimeConfig::new().build_arc()?;
+        let ctx = SessionContext::new_with_config_rt(config, runtime_config);
+
+        // register tables
+        self.register_tables(&ctx).await?;
+
+        let mut millis = vec![];
+        // run benchmark
+        let mut query_results = vec![];
+        for i in 0..self.iterations() {
+            let start = Instant::now();
+
+            let query_idx = query_id - 1; // 1-indexed -> 0-indexed
+            let sql = Self::SORT_QUERIES[query_idx];
+
+            let row_count = self.execute_query(&ctx, sql).await?;
+
+            let elapsed = start.elapsed();
+            let ms = elapsed.as_secs_f64() * 1000.0;
+            millis.push(ms);
+
+            println!(
+                "Q{query_id} iteration {i} took {ms:.1} ms and returned 
{row_count} rows"
+            );
+            query_results.push(QueryResult { elapsed, row_count });
+        }
+
+        let avg = millis.iter().sum::<f64>() / millis.len() as f64;
+        println!("Q{query_id} avg time: {avg:.2} ms");
+
+        Ok(query_results)
+    }
+
+    async fn register_tables(&self, ctx: &SessionContext) -> Result<()> {
+        for table in Self::SORT_TABLES {
+            let table_provider = { self.get_table(ctx, table).await? };
+
+            if self.mem_table {
+                println!("Loading table '{table}' into memory");
+                let start = Instant::now();
+                let memtable =
+                    MemTable::load(table_provider, Some(self.partitions()), &ctx.state())
+                        .await?;
+                println!(
+                    "Loaded table '{}' into memory in {} ms",
+                    table,
+                    start.elapsed().as_millis()
+                );
+                ctx.register_table(table, Arc::new(memtable))?;
+            } else {
+                ctx.register_table(table, table_provider)?;
+            }
+        }
+        Ok(())
+    }
+
+    async fn execute_query(&self, ctx: &SessionContext, sql: &str) -> Result<usize> {
+        let debug = self.common.debug;
+        let plan = ctx.sql(sql).await?;
+        let (state, plan) = plan.into_parts();
+
+        if debug {
+            println!("=== Logical plan ===\n{plan}\n");
+        }
+
+        let plan = state.optimize(&plan)?;
+        if debug {
+            println!("=== Optimized logical plan ===\n{plan}\n");
+        }
+        let physical_plan = state.create_physical_plan(&plan).await?;
+        if debug {
+            println!(
+                "=== Physical plan ===\n{}\n",
+                displayable(physical_plan.as_ref()).indent(true)
+            );
+        }
+
+        let mut row_count = 0;
+
+        let mut stream = execute_stream(physical_plan.clone(), state.task_ctx())?;
+        while let Some(batch) = stream.next().await {
+            row_count += batch.unwrap().num_rows();
+        }
+
+        if debug {
+            println!(
+                "=== Physical plan with metrics ===\n{}\n",
+                DisplayableExecutionPlan::with_metrics(physical_plan.as_ref())
+                    .indent(true)
+            );
+        }
+
+        Ok(row_count)
+    }
+
+    async fn get_table(
+        &self,
+        ctx: &SessionContext,
+        table: &str,
+    ) -> Result<Arc<dyn TableProvider>> {
+        let path = self.path.to_str().unwrap();
+
+        // Obtain a snapshot of the SessionState
+        let state = ctx.state();
+        let path = format!("{path}/{table}");
+        let format = Arc::new(
+            ParquetFormat::default()
+                .with_options(ctx.state().table_options().parquet.clone()),
+        );
+        let extension = DEFAULT_PARQUET_EXTENSION;
+
+        let options = ListingOptions::new(format)
+            .with_file_extension(extension)
+            .with_collect_stat(state.config().collect_statistics());
+
+        let table_path = ListingTableUrl::parse(path)?;
+        let config = ListingTableConfig::new(table_path).with_listing_options(options);
+        let config = config.infer_schema(&state).await?;
+
+        Ok(Arc::new(ListingTable::try_new(config)?))
+    }
+
+    fn iterations(&self) -> usize {
+        self.common.iterations
+    }
+
+    fn partitions(&self) -> usize {
+        self.common.partitions.unwrap_or(num_cpus::get())
+    }
+}
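For reference, an end-to-end use of the new benchmark through `bench.sh` might look roughly like the sketch below. It is based on the `bench.sh` additions in this commit; the branch names passed to `compare` are purely illustrative.

```bash
# Generate TPCH SF1 data (sort_tpch reuses the same dataset as the tpch benchmark)
./bench.sh data sort_tpch

# Run all ten sort queries; results are written to sort_tpch.json under the results directory
./bench.sh run sort_tpch

# Compare results gathered on two branches (branch names are illustrative)
./bench.sh compare main sort-bench-branch
```

Individual queries can also be run directly through `dfbench` with the `--query` flag, as shown in the README examples above.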


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
