This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 11b7b5c215 Add ClickBench queries to DataFusion benchmark runner (#7060)
11b7b5c215 is described below

commit 11b7b5c215012231e5768fc5be3445c0254d0169
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Jul 27 06:30:17 2023 -0500

    Add ClickBench queries to DataFusion benchmark runner (#7060)
    
    * Add clickbench query runner to benchmarks, update docs
    
    * Fix numbering so it goes from 0 to 42
---
 benchmarks/README.md                      |  91 +++++++++++--------
 benchmarks/bench.sh                       |  12 ++-
 benchmarks/queries/clickbench/README.txt  |   1 +
 benchmarks/queries/clickbench/queries.sql |  43 +++++++++
 benchmarks/src/bin/dfbench.rs             |   4 +-
 benchmarks/src/bin/tpch.rs                |   5 +-
 benchmarks/src/clickbench.rs              | 141 ++++++++++++++++++++++++++++++
 benchmarks/src/lib.rs                     |  25 +-----
 benchmarks/src/options.rs                 |  53 +++++++++++
 benchmarks/src/tpch/convert.rs            |   2 +-
 benchmarks/src/tpch/run.rs                |  67 ++++++++------
 11 files changed, 354 insertions(+), 90 deletions(-)

diff --git a/benchmarks/README.md b/benchmarks/README.md
index cf8a20a823..b182311977 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -20,11 +20,14 @@
 # DataFusion Benchmarks
 
 This crate contains benchmarks based on popular public data sets and
-open source benchmark suites, making it easy to run more realistic
-benchmarks to help with performance and scalability testing of DataFusion.
+open source benchmark suites, to help with performance and scalability
+testing of DataFusion.
 
-# Benchmarks Against Other Engines
 
+## Other engines
+
+The benchmarks measure changes to DataFusion itself, rather than
+its performance against other engines. For competitive benchmarking,
 DataFusion is included in the benchmark setups for several popular
 benchmarks that compare performance with other engines. For example:
 
@@ -36,30 +39,35 @@ benchmarks that compare performance with other engines. For example:
 
 # Running the benchmarks
 
-## Running Benchmarks
+## `bench.sh`
 
-The easiest way to run benchmarks from DataFusion source checkouts is
-to use the [bench.sh](bench.sh) script. Usage instructions can be
-found with:
+The easiest way to run benchmarks is the [bench.sh](bench.sh)
+script. Usage instructions can be found with:
 
 ```shell
 # show usage
 ./bench.sh
 ```
 
-## Generating Data
+## Generating data
+
+You can create / download the data for these benchmarks using the [bench.sh](bench.sh) script:
 
-You can create data for all these benchmarks using the [bench.sh](bench.sh) script:
+Create / download all datasets
 
 ```shell
 ./bench.sh data
 ```
 
-Data is generated in the `data` subdirectory and will not be checked in because this directory has been added to the `.gitignore` file.
+Create / download a specific dataset (TPCH)
 
+```shell
+./bench.sh data tpch
+```
 
-## Example to compare peformance on main to a branch
+Data is placed in the `data` subdirectory.
+
+## Comparing performance of main and a branch
 
 ```shell
 git checkout main
@@ -143,27 +151,17 @@ Benchmark tpch_mem.json
 ```
 
 
-# Benchmark Descriptions:
-
-## `tpch` Benchmark derived from TPC-H
-
-These benchmarks are derived from the [TPC-H][1] benchmark. And we use this repo as the source of tpch-gen and answers:
-https://github.com/databricks/tpch-dbgen.git, based on [2.17.1](https://www.tpc.org/tpc_documents_current_versions/pdf/tpc-h_v2.17.1.pdf) version of TPC-H.
+### Running Benchmarks Manually
 
-
-### Running the DataFusion Benchmarks Manually
-
-The benchmark can then be run (assuming the data created from `dbgen` is in `./data`) with a command such as:
+Assuming data in the `data` directory, the `tpch` benchmark can be run with a command like this
 
 ```bash
-cargo run --release --bin tpch -- benchmark datafusion --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096
+cargo run --release --bin dfbench -- tpch --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096
 ```
 
-If you omit `--query=<query_id>` argument, then all benchmarks will be run one by one (from query 1 to query 22).
+See the help for more details
 
-```bash
-cargo run --release --bin tpch -- benchmark datafusion --iterations 1 --path ./data --format tbl --batch-size 4096
-```
+### Different features
 
 You can enable the features `simd` (to use SIMD instructions, `cargo nightly` is required.) and/or `mimalloc` or `snmalloc` (to use either the mimalloc or snmalloc allocator) as features by passing them in as `--features`:
 
@@ -171,12 +169,6 @@ You can enable the features `simd` (to use SIMD instructions, `cargo nightly` is
 cargo run --release --features "simd mimalloc" --bin tpch -- benchmark datafusion --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096
 ```
 
-If you want to disable collection of statistics (and thus cost based optimizers), you can pass `--disable-statistics` flag.
-
-```bash
-cargo run --release --bin tpch -- benchmark datafusion --iterations 3 --path /mnt/tpch-parquet --format parquet --query 17 --disable-statistics
-```
-
 The benchmark program also supports CSV and Parquet input file formats and a utility is provided to convert from `tbl`
 (generated by the `dbgen` utility) to CSV and Parquet.
 
@@ -188,9 +180,10 @@ Or if you want to verify and run all the queries in the benchmark, you can just
 
 ### Comparing results between runs
 
-Any `tpch` execution with `-o <dir>` argument will produce a summary file right under the `<dir>`
-directory. It is a JSON serialized form of all the runs that happened as well as the runtime metadata
-(number of cores, DataFusion version, etc.).
+Any `dfbench` execution with `-o <dir>` argument will produce a
+summary JSON in the specified directory. This file contains a
+serialized form of all the runs that happened and runtime
+metadata (number of cores, DataFusion version, etc.).
 
 ```shell
 $ git checkout main
@@ -253,6 +246,32 @@ Query 1 iteration 0 took 1956.1 ms
 Query 1 avg time: 1956.11 ms
 ```
 
+# Benchmark Descriptions
+
+## `dfbench`
+
+The `dfbench` program contains subcommands to run various benchmarks.
+
+Full help can be found in the relevant subcommand. For example, to get help for `tpch`,
+run `cargo run --release --bin dfbench -- tpch --help`
+
+```shell
+cargo run --release --bin dfbench -- --help
+...
+datafusion-benchmarks 27.0.0
+benchmark command
+
+USAGE:
+    dfbench <SUBCOMMAND>
+
+SUBCOMMANDS:
+    clickbench      Run the clickbench benchmark
+    help            Prints this message or the help of the given subcommand(s)
+    tpch            Run the tpch benchmark.
+    tpch-convert    Convert tpch .slt files to .parquet or .csv files
+
+```
+
 ## NYC Taxi Benchmark
 
 These benchmarks are based on the [New York Taxi and Limousine Commission][2] data set.
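[Editor's note] The commit message mentions fixing the numbering so it runs from 0 to 42. That range falls directly out of the one-query-per-line layout of `queries.sql`; a small self-contained sketch (the three-line file here is a stand-in, not the real ClickBench queries):

```shell
# The runner treats queries.sql as one query per line, numbered from 0,
# so a 43-line file yields valid query ids 0..42. Shown with a 3-line stand-in:
QUERIES_FILE=$(mktemp)
printf 'SELECT 1;\nSELECT 2;\nSELECT 3;\n' > "$QUERIES_FILE"
N=$(wc -l < "$QUERIES_FILE")
echo "valid query ids: 0 to $((N - 1))"
```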
diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh
index f71094a425..ca58e49f60 100755
--- a/benchmarks/bench.sh
+++ b/benchmarks/bench.sh
@@ -35,7 +35,7 @@ BENCHMARK=all
 DATAFUSION_DIR=${DATAFUSION_DIR:-$SCRIPT_DIR/..}
 DATA_DIR=${DATA_DIR:-$SCRIPT_DIR/data}
 #CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --release"}
-CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --profile release-nonlto"}  # TEMP: for faster iterations
+CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --profile release-nonlto"}  # for faster iterations
 
 usage() {
     echo "
@@ -386,12 +386,18 @@ data_clickbench_partitioned() {
 
 # Runs the clickbench benchmark with a single large parquet file
 run_clickbench_1() {
-    echo "NOTICE: ClickBench (1 parquet file) is not yet supported"
+    RESULTS_FILE="${RESULTS_DIR}/clickbench_1.json"
+    echo "RESULTS_FILE: ${RESULTS_FILE}"
+    echo "Running clickbench (1 file) benchmark..."
+    $CARGO_COMMAND --bin dfbench -- clickbench --iterations 10 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o "${RESULTS_FILE}"
 }
 
 # Runs the clickbench benchmark with the partitioned (100 parquet files) data set
 run_clickbench_partitioned() {
-    echo "NOTICE: ClickBench (1 parquet file) is not yet supported"
+    RESULTS_FILE="${RESULTS_DIR}/clickbench_partitioned.json"
+    echo "RESULTS_FILE: ${RESULTS_FILE}"
+    echo "Running clickbench (partitioned, 100 files) benchmark..."
+    $CARGO_COMMAND --bin dfbench -- clickbench --iterations 10 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o "${RESULTS_FILE}"
 }
 
 compare_benchmarks() {
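[Editor's note] Both runner functions above follow `bench.sh`'s convention of writing one JSON summary per benchmark under `${RESULTS_DIR}`. A sketch of that naming scheme (the temp directory stands in for the script's real results directory):

```shell
# Each benchmark writes its summary to ${RESULTS_DIR}/<benchmark>.json,
# e.g. clickbench_1.json and clickbench_partitioned.json.
RESULTS_DIR=$(mktemp -d)
for BENCH in clickbench_1 clickbench_partitioned; do
    RESULTS_FILE="${RESULTS_DIR}/${BENCH}.json"
    echo "RESULTS_FILE: ${RESULTS_FILE}"
done
```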
diff --git a/benchmarks/queries/clickbench/README.txt b/benchmarks/queries/clickbench/README.txt
new file mode 100644
index 0000000000..b46900956e
--- /dev/null
+++ b/benchmarks/queries/clickbench/README.txt
@@ -0,0 +1 @@
+Downloaded from https://github.com/ClickHouse/ClickBench/blob/main/datafusion/queries.sql
diff --git a/benchmarks/queries/clickbench/queries.sql b/benchmarks/queries/clickbench/queries.sql
new file mode 100644
index 0000000000..52e72e02e1
--- /dev/null
+++ b/benchmarks/queries/clickbench/queries.sql
@@ -0,0 +1,43 @@
+SELECT COUNT(*) FROM hits;
+SELECT COUNT(*) FROM hits WHERE "AdvEngineID" <> 0;
+SELECT SUM("AdvEngineID"), COUNT(*), AVG("ResolutionWidth") FROM hits;
+SELECT AVG("UserID") FROM hits;
+SELECT COUNT(DISTINCT "UserID") FROM hits;
+SELECT COUNT(DISTINCT "SearchPhrase") FROM hits;
+SELECT MIN("EventDate"::INT::DATE), MAX("EventDate"::INT::DATE) FROM hits;
+SELECT "AdvEngineID", COUNT(*) FROM hits WHERE "AdvEngineID" <> 0 GROUP BY "AdvEngineID" ORDER BY COUNT(*) DESC;
+SELECT "RegionID", COUNT(DISTINCT "UserID") AS u FROM hits GROUP BY "RegionID" ORDER BY u DESC LIMIT 10;
+SELECT "RegionID", SUM("AdvEngineID"), COUNT(*) AS c, AVG("ResolutionWidth"), COUNT(DISTINCT "UserID") FROM hits GROUP BY "RegionID" ORDER BY c DESC LIMIT 10;
+SELECT "MobilePhoneModel", COUNT(DISTINCT "UserID") AS u FROM hits WHERE "MobilePhoneModel" <> '' GROUP BY "MobilePhoneModel" ORDER BY u DESC LIMIT 10;
+SELECT "MobilePhone", "MobilePhoneModel", COUNT(DISTINCT "UserID") AS u FROM hits WHERE "MobilePhoneModel" <> '' GROUP BY "MobilePhone", "MobilePhoneModel" ORDER BY u DESC LIMIT 10;
+SELECT "SearchPhrase", COUNT(*) AS c FROM hits WHERE "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10;
+SELECT "SearchPhrase", COUNT(DISTINCT "UserID") AS u FROM hits WHERE "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY u DESC LIMIT 10;
+SELECT "SearchEngineID", "SearchPhrase", COUNT(*) AS c FROM hits WHERE "SearchPhrase" <> '' GROUP BY "SearchEngineID", "SearchPhrase" ORDER BY c DESC LIMIT 10;
+SELECT "UserID", COUNT(*) FROM hits GROUP BY "UserID" ORDER BY COUNT(*) DESC LIMIT 10;
+SELECT "UserID", "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10;
+SELECT "UserID", "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", "SearchPhrase" LIMIT 10;
+SELECT "UserID", extract(minute FROM to_timestamp_seconds("EventTime")) AS m, "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", m, "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10;
+SELECT "UserID" FROM hits WHERE "UserID" = 435090932899640449;
+SELECT COUNT(*) FROM hits WHERE "URL" LIKE '%google%';
+SELECT "SearchPhrase", MIN("URL"), COUNT(*) AS c FROM hits WHERE "URL" LIKE '%google%' AND "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10;
+SELECT "SearchPhrase", MIN("URL"), MIN("Title"), COUNT(*) AS c, COUNT(DISTINCT "UserID") FROM hits WHERE "Title" LIKE '%Google%' AND "URL" NOT LIKE '%.google.%' AND "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10;
+SELECT * FROM hits WHERE "URL" LIKE '%google%' ORDER BY to_timestamp_seconds("EventTime") LIMIT 10;
+SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY to_timestamp_seconds("EventTime") LIMIT 10;
+SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY "SearchPhrase" LIMIT 10;
+SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY to_timestamp_seconds("EventTime"), "SearchPhrase" LIMIT 10;
+SELECT "CounterID", AVG(length("URL")) AS l, COUNT(*) AS c FROM hits WHERE "URL" <> '' GROUP BY "CounterID" HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;
+SELECT REGEXP_REPLACE("Referer", '^https?://(?:www\.)?([^/]+)/.*$', '\1') AS k, AVG(length("Referer")) AS l, COUNT(*) AS c, MIN("Referer") FROM hits WHERE "Referer" <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;
+SELECT SUM("ResolutionWidth"), SUM("ResolutionWidth" + 1), SUM("ResolutionWidth" + 2), SUM("ResolutionWidth" + 3), SUM("ResolutionWidth" + 4), SUM("ResolutionWidth" + 5), SUM("ResolutionWidth" + 6), SUM("ResolutionWidth" + 7), SUM("ResolutionWidth" + 8), SUM("ResolutionWidth" + 9), SUM("ResolutionWidth" + 10), SUM("ResolutionWidth" + 11), SUM("ResolutionWidth" + 12), SUM("ResolutionWidth" + 13), SUM("ResolutionWidth" + 14), SUM("ResolutionWidth" + 15), SUM("ResolutionWidth" + 16), SUM("R [...]
+SELECT "SearchEngineID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits WHERE "SearchPhrase" <> '' GROUP BY "SearchEngineID", "ClientIP" ORDER BY c DESC LIMIT 10;
+SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits WHERE "SearchPhrase" <> '' GROUP BY "WatchID", "ClientIP" ORDER BY c DESC LIMIT 10;
+SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits GROUP BY "WatchID", "ClientIP" ORDER BY c DESC LIMIT 10;
+SELECT "URL", COUNT(*) AS c FROM hits GROUP BY "URL" ORDER BY c DESC LIMIT 10;
+SELECT 1, "URL", COUNT(*) AS c FROM hits GROUP BY 1, "URL" ORDER BY c DESC LIMIT 10;
+SELECT "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3, COUNT(*) AS c FROM hits GROUP BY "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3 ORDER BY c DESC LIMIT 10;
+SELECT "URL", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "DontCountHits" = 0 AND "IsRefresh" = 0 AND "URL" <> '' GROUP BY "URL" ORDER BY PageViews DESC LIMIT 10;
+SELECT "Title", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "DontCountHits" = 0 AND "IsRefresh" = 0 AND "Title" <> '' GROUP BY "Title" ORDER BY PageViews DESC LIMIT 10;
+SELECT "URL", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 AND "IsLink" <> 0 AND "IsDownload" = 0 GROUP BY "URL" ORDER BY PageViews DESC LIMIT 10 OFFSET 1000;
+SELECT "TraficSourceID", "SearchEngineID", "AdvEngineID", CASE WHEN ("SearchEngineID" = 0 AND "AdvEngineID" = 0) THEN "Referer" ELSE '' END AS Src, "URL" AS Dst, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 GROUP BY "TraficSourceID", "SearchEngineID", "AdvEngineID", Src, Dst ORDER BY PageViews DESC LIMIT 10 OFFSET 1000;
+SELECT "URLHash", "EventDate"::INT::DATE, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 AND "TraficSourceID" IN (-1, 6) AND "RefererHash" = 3594120000172545465 GROUP BY "URLHash", "EventDate"::INT::DATE ORDER BY PageViews DESC LIMIT 10 OFFSET 100;
+SELECT "WindowClientWidth", "WindowClientHeight", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 AND "DontCountHits" = 0 AND "URLHash" = 2868770270353813622 GROUP BY "WindowClientWidth", "WindowClientHeight" ORDER BY PageViews DESC LIMIT 10 OFFSET 10000;
+SELECT DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) AS M, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-14' AND "EventDate"::INT::DATE <= '2013-07-15' AND "IsRefresh" = 0 AND "DontCountHits" = 0 GROUP BY DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) ORDER BY DATE_TRUNC('minute', M) LIMIT 10 OFFSET 1000;
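[Editor's note] Since the runner identifies each query by its line number, a query can also be pulled out of a `queries.sql`-style file directly with `sed`. A self-contained sketch (the inline two-query file is a stand-in for the real 43-query file):

```shell
# Query id N (0-based, as used by --query) lives on line N+1 of the file.
QUERIES_FILE=$(mktemp)
cat > "$QUERIES_FILE" <<'EOF'
SELECT COUNT(*) FROM hits;
SELECT COUNT(*) FROM hits WHERE "AdvEngineID" <> 0;
EOF
QUERY_ID=1
sed -n "$((QUERY_ID + 1))p" "$QUERIES_FILE"
```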
diff --git a/benchmarks/src/bin/dfbench.rs b/benchmarks/src/bin/dfbench.rs
index f4ba8bc975..d5a17a2ab5 100644
--- a/benchmarks/src/bin/dfbench.rs
+++ b/benchmarks/src/bin/dfbench.rs
@@ -28,13 +28,14 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
 #[global_allocator]
 static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
-use datafusion_benchmarks::tpch;
+use datafusion_benchmarks::{clickbench, tpch};
 
 #[derive(Debug, StructOpt)]
 #[structopt(about = "benchmark command")]
 enum Options {
     Tpch(tpch::RunOpt),
     TpchConvert(tpch::ConvertOpt),
+    Clickbench(clickbench::RunOpt),
 }
 
 // Main benchmark runner entrypoint
@@ -45,5 +46,6 @@ pub async fn main() -> Result<()> {
     match Options::from_args() {
         Options::Tpch(opt) => opt.run().await,
         Options::TpchConvert(opt) => opt.run().await,
+        Options::Clickbench(opt) => opt.run().await,
     }
 }
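[Editor's note] With the new `Clickbench` variant, `dfbench` dispatches on three subcommand names. A simplified shell picture of that dispatch (the real binary uses structopt for parsing; the names match the enum above):

```shell
# Toy version of dfbench's subcommand dispatch as a case statement.
dispatch() {
    case "$1" in
        tpch)         echo "run tpch" ;;
        tpch-convert) echo "run tpch-convert" ;;
        clickbench)   echo "run clickbench" ;;
        *)            echo "unknown subcommand: $1"; return 1 ;;
    esac
}
dispatch clickbench
```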
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 757136d231..9548093570 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -43,9 +43,10 @@ enum TpchOpt {
     Convert(tpch::ConvertOpt),
 }
 
-/// 'tpch' entry point, with tortured command line arguments
+/// 'tpch' entry point, with tortured command line arguments.  Please
+/// use `dfbench` instead.
 ///
-/// This is kept to be backwards compatible with the benchmark names prior to
+/// Note: this is kept to be backwards compatible with the benchmark names prior to
 /// <https://github.com/apache/arrow-datafusion/issues/6994>
 #[tokio::main]
 async fn main() -> Result<()> {
diff --git a/benchmarks/src/clickbench.rs b/benchmarks/src/clickbench.rs
new file mode 100644
index 0000000000..c884af8094
--- /dev/null
+++ b/benchmarks/src/clickbench.rs
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{path::PathBuf, time::Instant};
+
+use datafusion::{
+    error::{DataFusionError, Result},
+    prelude::SessionContext,
+};
+use structopt::StructOpt;
+
+use crate::{BenchmarkRun, CommonOpt};
+
+/// Run the clickbench benchmark
+///
+/// The ClickBench[1] benchmarks are widely cited in the industry and
+/// focus on grouping / aggregation / filtering. This runner uses the
+/// scripts and queries from [2].
+///
+/// [1]: https://github.com/ClickHouse/ClickBench
+/// [2]: https://github.com/ClickHouse/ClickBench/tree/main/datafusion
+#[derive(Debug, StructOpt, Clone)]
+#[structopt(verbatim_doc_comment)]
+pub struct RunOpt {
+    /// Query number (between 0 and 42). If not specified, runs all queries
+    #[structopt(short, long)]
+    query: Option<usize>,
+
+    /// Common options
+    #[structopt(flatten)]
+    common: CommonOpt,
+
+    /// Path to hits.parquet (single file) or `hits_partitioned`
+    /// (partitioned, 100 files)
+    #[structopt(
+        parse(from_os_str),
+        short = "p",
+        long = "path",
+        default_value = "benchmarks/data/hits.parquet"
+    )]
+    path: PathBuf,
+
+    /// Path to queries.sql (single file)
+    #[structopt(
+        parse(from_os_str),
+        short = "r",
+        long = "queries-path",
+        default_value = "benchmarks/queries/clickbench/queries.sql"
+    )]
+    queries_path: PathBuf,
+
+    /// If present, write results json here
+    #[structopt(parse(from_os_str), short = "o", long = "output")]
+    output_path: Option<PathBuf>,
+}
+
+const CLICKBENCH_QUERY_START_ID: usize = 0;
+const CLICKBENCH_QUERY_END_ID: usize = 42;
+
+impl RunOpt {
+    pub async fn run(self) -> Result<()> {
+        println!("Running benchmarks with the following options: {self:?}");
+        let query_range = match self.query {
+            Some(query_id) => query_id..=query_id,
+            None => CLICKBENCH_QUERY_START_ID..=CLICKBENCH_QUERY_END_ID,
+        };
+
+        let config = self.common.config();
+        let ctx = SessionContext::with_config(config);
+        self.register_hits(&ctx).await?;
+
+        let iterations = self.common.iterations();
+        let mut benchmark_run = BenchmarkRun::new();
+        for query_id in query_range {
+            benchmark_run.start_new_case(&format!("Query {query_id}"));
+            let sql = self.get_query(query_id)?;
+            println!("Q{query_id}: {sql}");
+
+            for i in 0..iterations {
+                let start = Instant::now();
+                let results = ctx.sql(&sql).await?.collect().await?;
+                let elapsed = start.elapsed();
+                let ms = elapsed.as_secs_f64() * 1000.0;
+                let row_count: usize = results.iter().map(|b| b.num_rows()).sum();
+                println!(
+                    "Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows"
+                );
+                benchmark_run.write_iter(elapsed, row_count);
+            }
+        }
+        benchmark_run.maybe_write_json(self.output_path.as_ref())?;
+        Ok(())
+    }
+
+    /// Registers `hits.parquet` as a table named `hits`
+    async fn register_hits(&self, ctx: &SessionContext) -> Result<()> {
+        let options = Default::default();
+        let path = self.path.as_os_str().to_str().unwrap();
+        ctx.register_parquet("hits", path, options)
+            .await
+            .map_err(|e| {
+                DataFusionError::Context(
+                    format!("Registering 'hits' as {path}"),
+                    Box::new(e),
+                )
+            })
+    }
+
+    /// Returns the text of query `query_id`
+    fn get_query(&self, query_id: usize) -> Result<String> {
+        if query_id > CLICKBENCH_QUERY_END_ID {
+            return Err(DataFusionError::Execution(format!(
+                "Invalid query id {query_id}. Must be between {CLICKBENCH_QUERY_START_ID} and {CLICKBENCH_QUERY_END_ID}"
+            )));
+        }
+
+        let path = self.queries_path.as_path();
+
+        // ClickBench has all queries in a single file identified by line number
+        let all_queries = std::fs::read_to_string(path).map_err(|e| {
+            DataFusionError::Execution(format!("Could not open {path:?}: {e}"))
+        })?;
+        let all_queries: Vec<_> = all_queries.lines().collect();
+
+        Ok(all_queries.get(query_id).map(|s| s.to_string()).unwrap())
+    }
+}
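[Editor's note] `get_query` rejects ids outside `CLICKBENCH_QUERY_START_ID..=CLICKBENCH_QUERY_END_ID` before indexing into the file. The same guard, sketched as a shell function:

```shell
# Mirror of the runner's bounds check: valid ClickBench query ids are 0..42.
CLICKBENCH_QUERY_START_ID=0
CLICKBENCH_QUERY_END_ID=42
check_query_id() {
    if [ "$1" -lt "$CLICKBENCH_QUERY_START_ID" ] || [ "$1" -gt "$CLICKBENCH_QUERY_END_ID" ]; then
        echo "Invalid query id $1. Must be between $CLICKBENCH_QUERY_START_ID and $CLICKBENCH_QUERY_END_ID"
        return 1
    fi
    echo "query id $1 ok"
}
check_query_id 42
check_query_id 43 || true
```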
diff --git a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs
index 9d5530d31f..8cab151115 100644
--- a/benchmarks/src/lib.rs
+++ b/benchmarks/src/lib.rs
@@ -16,27 +16,10 @@
 // under the License.
 
 //! DataFusion benchmark runner
-use datafusion::error::Result;
-use structopt::StructOpt;
-
-pub mod run;
+pub mod clickbench;
 pub mod tpch;
 
+mod options;
+mod run;
+pub use options::CommonOpt;
 pub use run::{BenchQuery, BenchmarkRun};
-
-#[derive(Debug, StructOpt)]
-#[structopt(about = "benchmark command")]
-enum Options {
-    Tpch(tpch::RunOpt),
-    TpchConvert(tpch::ConvertOpt),
-}
-
-// Main benchmark runner entrypoint
-pub async fn main() -> Result<()> {
-    env_logger::init();
-
-    match Options::from_args() {
-        Options::Tpch(opt) => opt.run().await,
-        Options::TpchConvert(opt) => opt.run().await,
-    }
-}
diff --git a/benchmarks/src/options.rs b/benchmarks/src/options.rs
new file mode 100644
index 0000000000..d285741e23
--- /dev/null
+++ b/benchmarks/src/options.rs
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::prelude::SessionConfig;
+use structopt::StructOpt;
+
+// Common benchmark options (not doc comments, otherwise this text
+// shows up in the generated help output)
+#[derive(Debug, StructOpt, Clone)]
+pub struct CommonOpt {
+    /// Number of iterations of each test run
+    #[structopt(short = "i", long = "iterations", default_value = "3")]
+    pub iterations: usize,
+
+    /// Number of partitions to process in parallel
+    #[structopt(short = "n", long = "partitions", default_value = "2")]
+    pub partitions: usize,
+
+    /// Batch size when reading CSV or Parquet files
+    #[structopt(short = "s", long = "batch-size", default_value = "8192")]
+    pub batch_size: usize,
+}
+
+impl CommonOpt {
+    /// Return an appropriately configured `SessionConfig`
+    pub fn config(&self) -> SessionConfig {
+        SessionConfig::new()
+            .with_target_partitions(self.partitions)
+            .with_batch_size(self.batch_size)
+    }
+
+    pub fn iterations(&self) -> usize {
+        self.iterations
+    }
+
+    pub fn partitions(&self) -> usize {
+        self.partitions
+    }
+}
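[Editor's note] `CommonOpt` gives every subcommand the same three flags with the same defaults. Mimicked with shell parameter expansion (structopt's `default_value` plays this role in the binary):

```shell
# The defaults CommonOpt supplies to every subcommand.
ITERATIONS=${ITERATIONS:-3}      # -i / --iterations
PARTITIONS=${PARTITIONS:-2}      # -n / --partitions
BATCH_SIZE=${BATCH_SIZE:-8192}   # -s / --batch-size
echo "iterations=$ITERATIONS partitions=$PARTITIONS batch_size=$BATCH_SIZE"
```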
diff --git a/benchmarks/src/tpch/convert.rs b/benchmarks/src/tpch/convert.rs
index c65ced4c7b..f5fcbb0547 100644
--- a/benchmarks/src/tpch/convert.rs
+++ b/benchmarks/src/tpch/convert.rs
@@ -58,7 +58,7 @@ pub struct ConvertOpt {
 }
 
 impl ConvertOpt {
-    pub async fn run(&self) -> Result<()> {
+    pub async fn run(self) -> Result<()> {
         let compression = self.compression()?;
 
         let input_path = self.input_path.to_str().unwrap();
diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs
index 6aada30bc7..5b43f2b93f 100644
--- a/benchmarks/src/tpch/run.rs
+++ b/benchmarks/src/tpch/run.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use super::get_query_sql;
-use crate::BenchmarkRun;
+use crate::{BenchmarkRun, CommonOpt};
 use arrow::record_batch::RecordBatch;
 use arrow::util::pretty::{self, pretty_format_batches};
 use datafusion::datasource::file_format::csv::{CsvFormat, DEFAULT_CSV_EXTENSION};
@@ -42,8 +42,17 @@ use structopt::StructOpt;
 
 use super::{get_tbl_tpch_table_schema, get_tpch_table_schema, TPCH_TABLES};
 
-/// Run the tpch benchmark
+/// Run the tpch benchmark.
+///
+/// This benchmark is derived from the [TPC-H][1] version
+/// [2.17.1]. The data and answers are generated using `tpch-gen` from
+/// [2].
+///
+/// [1]: http://www.tpc.org/tpch/
+/// [2]: https://github.com/databricks/tpch-dbgen.git
+/// [2.17.1]: https://www.tpc.org/tpc_documents_current_versions/pdf/tpc-h_v2.17.1.pdf
 #[derive(Debug, StructOpt, Clone)]
+#[structopt(verbatim_doc_comment)]
 pub struct RunOpt {
     /// Query number. If not specified, runs all queries
     #[structopt(short, long)]
@@ -53,17 +62,9 @@ pub struct RunOpt {
     #[structopt(short, long)]
     debug: bool,
 
-    /// Number of iterations of each test run
-    #[structopt(short = "i", long = "iterations", default_value = "3")]
-    iterations: usize,
-
-    /// Number of partitions to process in parallel
-    #[structopt(short = "n", long = "partitions", default_value = "2")]
-    partitions: usize,
-
-    /// Batch size when reading CSV or Parquet files
-    #[structopt(short = "s", long = "batch-size", default_value = "8192")]
-    batch_size: usize,
+    /// Common options
+    #[structopt(flatten)]
+    common: CommonOpt,
 
     /// Path to data files
     #[structopt(parse(from_os_str), required = true, short = "p", long = "path")]
@@ -90,7 +91,7 @@ const TPCH_QUERY_START_ID: usize = 1;
 const TPCH_QUERY_END_ID: usize = 22;
 
 impl RunOpt {
-    pub async fn run(&self) -> Result<()> {
+    pub async fn run(self) -> Result<()> {
         println!("Running benchmarks with the following options: {self:?}");
         let query_range = match self.query {
             Some(query_id) => query_id..=query_id,
@@ -110,9 +111,9 @@ impl RunOpt {
     }
 
     async fn benchmark_query(&self, query_id: usize) -> Result<Vec<QueryResult>> {
-        let config = SessionConfig::new()
-            .with_target_partitions(self.partitions)
-            .with_batch_size(self.batch_size)
+        let config = self
+            .common
+            .config()
             .with_collect_statistics(!self.disable_statistics);
         let ctx = SessionContext::with_config(config);
 
@@ -122,7 +123,7 @@ impl RunOpt {
         let mut millis = vec![];
         // run benchmark
         let mut query_results = vec![];
-        for i in 0..self.iterations {
+        for i in 0..self.iterations() {
             let start = Instant::now();
 
             let sql = &get_query_sql(query_id)?;
@@ -169,7 +170,7 @@ impl RunOpt {
                 println!("Loading table '{table}' into memory");
                 let start = Instant::now();
                 let memtable =
-                    MemTable::load(table_provider, Some(self.partitions), &ctx.state())
+                    MemTable::load(table_provider, Some(self.partitions()), &ctx.state())
                         .await?;
                 println!(
                     "Loaded table '{}' into memory in {} ms",
@@ -231,7 +232,7 @@ impl RunOpt {
     ) -> Result<Arc<dyn TableProvider>> {
         let path = self.path.to_str().unwrap();
         let table_format = self.file_format.as_str();
-        let target_partitions = self.partitions;
+        let target_partitions = self.partitions();
 
         // Obtain a snapshot of the SessionState
         let state = ctx.state();
@@ -283,6 +284,14 @@ impl RunOpt {
 
         Ok(Arc::new(ListingTable::try_new(config)?))
     }
+
+    fn iterations(&self) -> usize {
+        self.common.iterations()
+    }
+
+    fn partitions(&self) -> usize {
+        self.common.partitions()
+    }
 }
 
 struct QueryResult {
@@ -318,12 +327,15 @@ mod tests {
     async fn round_trip_logical_plan(query: usize) -> Result<()> {
         let ctx = SessionContext::default();
         let path = get_tpch_data_path()?;
-        let opt = RunOpt {
-            query: Some(query),
-            debug: false,
+        let common = CommonOpt {
             iterations: 1,
             partitions: 2,
             batch_size: 8192,
+        };
+        let opt = RunOpt {
+            query: Some(query),
+            debug: false,
+            common,
             path: PathBuf::from(path.to_string()),
             file_format: "tbl".to_string(),
             mem_table: false,
@@ -347,12 +359,15 @@ mod tests {
     async fn round_trip_physical_plan(query: usize) -> Result<()> {
         let ctx = SessionContext::default();
         let path = get_tpch_data_path()?;
-        let opt = RunOpt {
-            query: Some(query),
-            debug: false,
+        let common = CommonOpt {
             iterations: 1,
             partitions: 2,
             batch_size: 8192,
+        };
+        let opt = RunOpt {
+            query: Some(query),
+            debug: false,
+            common,
             path: PathBuf::from(path.to_string()),
             file_format: "tbl".to_string(),
             mem_table: false,
