This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9445aa0ee5 Create `dfbench`, split up `tpch` benchmark runner into 
modules (#7054)
9445aa0ee5 is described below

commit 9445aa0ee52be9c08981d4a8ff7568f520926046
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Jul 24 06:30:25 2023 -0500

    Create `dfbench`, split up `tpch` benchmark runner into modules (#7054)
---
 benchmarks/src/bin/dfbench.rs           |  49 ++++
 benchmarks/src/bin/tpch.rs              | 503 +-------------------------------
 benchmarks/src/lib.rs                   | 147 +---------
 benchmarks/src/{lib.rs => run.rs}       |   2 -
 benchmarks/src/tpch/convert.rs          | 147 ++++++++++
 benchmarks/src/{tpch.rs => tpch/mod.rs} |  88 +-----
 benchmarks/src/tpch/run.rs              | 442 ++++++++++++++++++++++++++++
 7 files changed, 675 insertions(+), 703 deletions(-)

diff --git a/benchmarks/src/bin/dfbench.rs b/benchmarks/src/bin/dfbench.rs
new file mode 100644
index 0000000000..f4ba8bc975
--- /dev/null
+++ b/benchmarks/src/bin/dfbench.rs
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! DataFusion benchmark runner
+use datafusion::error::Result;
+
+use structopt::StructOpt;
+
+#[cfg(feature = "snmalloc")]
+#[global_allocator]
+static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
+
+#[cfg(feature = "mimalloc")]
+#[global_allocator]
+static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
+
+use datafusion_benchmarks::tpch;
+
+#[derive(Debug, StructOpt)]
+#[structopt(about = "benchmark command")]
+enum Options {
+    Tpch(tpch::RunOpt),
+    TpchConvert(tpch::ConvertOpt),
+}
+
+// Main benchmark runner entrypoint
+#[tokio::main]
+pub async fn main() -> Result<()> {
+    env_logger::init();
+
+    match Options::from_args() {
+        Options::Tpch(opt) => opt.run().await,
+        Options::TpchConvert(opt) => opt.run().await,
+    }
+}
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index b5f535b38d..757136d231 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -15,30 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Benchmark derived from TPC-H. This is not an official TPC-H benchmark.
-use log::info;
+//! tpch binary only entrypoint
 
-use arrow::util::pretty::pretty_format_batches;
-use datafusion::datasource::file_format::{csv::CsvFormat, FileFormat};
-use datafusion::datasource::{MemTable, TableProvider};
-use datafusion::error::{DataFusionError, Result};
-use datafusion::parquet::basic::Compression;
-use datafusion::physical_plan::display::DisplayableExecutionPlan;
-use datafusion::physical_plan::{collect, displayable};
-use datafusion::prelude::*;
-use datafusion::{
-    arrow::record_batch::RecordBatch, 
datasource::file_format::parquet::ParquetFormat,
-};
-use datafusion::{
-    arrow::util::pretty,
-    datasource::listing::{ListingOptions, ListingTable, ListingTableConfig},
-};
-use datafusion_benchmarks::{tpch::*, BenchmarkRun};
-use std::{iter::Iterator, path::PathBuf, sync::Arc, time::Instant};
-
-use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
-use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
-use datafusion::datasource::listing::ListingTableUrl;
+use datafusion::error::Result;
+use datafusion_benchmarks::tpch;
 use structopt::StructOpt;
 
 #[cfg(feature = "snmalloc")]
@@ -49,490 +29,31 @@ static ALLOC: snmalloc_rs::SnMalloc = 
snmalloc_rs::SnMalloc;
 #[global_allocator]
 static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
-#[derive(Debug, StructOpt, Clone)]
-struct DataFusionBenchmarkOpt {
-    /// Query number. If not specified, runs all queries
-    #[structopt(short, long)]
-    query: Option<usize>,
-
-    /// Activate debug mode to see query results
-    #[structopt(short, long)]
-    debug: bool,
-
-    /// Number of iterations of each test run
-    #[structopt(short = "i", long = "iterations", default_value = "3")]
-    iterations: usize,
-
-    /// Number of partitions to process in parallel
-    #[structopt(short = "n", long = "partitions", default_value = "2")]
-    partitions: usize,
-
-    /// Batch size when reading CSV or Parquet files
-    #[structopt(short = "s", long = "batch-size", default_value = "8192")]
-    batch_size: usize,
-
-    /// Path to data files
-    #[structopt(parse(from_os_str), required = true, short = "p", long = 
"path")]
-    path: PathBuf,
-
-    /// File format: `csv` or `parquet`
-    #[structopt(short = "f", long = "format", default_value = "csv")]
-    file_format: String,
-
-    /// Load the data into a MemTable before executing the query
-    #[structopt(short = "m", long = "mem-table")]
-    mem_table: bool,
-
-    /// Path to machine readable output file
-    #[structopt(parse(from_os_str), short = "o", long = "output")]
-    output_path: Option<PathBuf>,
-
-    /// Whether to disable collection of statistics (and cost based 
optimizations) or not.
-    #[structopt(short = "S", long = "disable-statistics")]
-    disable_statistics: bool,
-}
-
-#[derive(Debug, StructOpt)]
-struct ConvertOpt {
-    /// Path to csv files
-    #[structopt(parse(from_os_str), required = true, short = "i", long = 
"input")]
-    input_path: PathBuf,
-
-    /// Output path
-    #[structopt(parse(from_os_str), required = true, short = "o", long = 
"output")]
-    output_path: PathBuf,
-
-    /// Output file format: `csv` or `parquet`
-    #[structopt(short = "f", long = "format")]
-    file_format: String,
-
-    /// Compression to use when writing Parquet files
-    #[structopt(short = "c", long = "compression", default_value = "zstd")]
-    compression: String,
-
-    /// Number of partitions to produce
-    #[structopt(short = "n", long = "partitions", default_value = "1")]
-    partitions: usize,
-
-    /// Batch size when reading CSV or Parquet files
-    #[structopt(short = "s", long = "batch-size", default_value = "8192")]
-    batch_size: usize,
-}
-
 #[derive(Debug, StructOpt)]
 #[structopt(about = "benchmark command")]
 enum BenchmarkSubCommandOpt {
     #[structopt(name = "datafusion")]
-    DataFusionBenchmark(DataFusionBenchmarkOpt),
+    DataFusionBenchmark(tpch::RunOpt),
 }
 
 #[derive(Debug, StructOpt)]
 #[structopt(name = "TPC-H", about = "TPC-H Benchmarks.")]
 enum TpchOpt {
     Benchmark(BenchmarkSubCommandOpt),
-    Convert(ConvertOpt),
+    Convert(tpch::ConvertOpt),
 }
 
+/// 'tpch' entry point, with tortured command line arguments
+///
+/// This is kept to be backwards compatible with the benchmark names prior to
+/// <https://github.com/apache/arrow-datafusion/issues/6994>
 #[tokio::main]
 async fn main() -> Result<()> {
-    use BenchmarkSubCommandOpt::*;
-
     env_logger::init();
     match TpchOpt::from_args() {
-        TpchOpt::Benchmark(DataFusionBenchmark(opt)) => {
-            benchmark_datafusion(opt).await.map(|_| ())
-        }
-        TpchOpt::Convert(opt) => {
-            let compression = match opt.compression.as_str() {
-                "none" => Compression::UNCOMPRESSED,
-                "snappy" => Compression::SNAPPY,
-                "brotli" => Compression::BROTLI(Default::default()),
-                "gzip" => Compression::GZIP(Default::default()),
-                "lz4" => Compression::LZ4,
-                "lz0" => Compression::LZO,
-                "zstd" => Compression::ZSTD(Default::default()),
-                other => {
-                    return Err(DataFusionError::NotImplemented(format!(
-                        "Invalid compression format: {other}"
-                    )));
-                }
-            };
-            convert_tbl(
-                opt.input_path.to_str().unwrap(),
-                opt.output_path.to_str().unwrap(),
-                &opt.file_format,
-                opt.partitions,
-                opt.batch_size,
-                compression,
-            )
-            .await
+        TpchOpt::Benchmark(BenchmarkSubCommandOpt::DataFusionBenchmark(opt)) 
=> {
+            opt.run().await
         }
+        TpchOpt::Convert(opt) => opt.run().await,
     }
 }
-
-const TPCH_QUERY_START_ID: usize = 1;
-const TPCH_QUERY_END_ID: usize = 22;
-
-async fn benchmark_datafusion(
-    opt: DataFusionBenchmarkOpt,
-) -> Result<Vec<Vec<RecordBatch>>> {
-    println!("Running benchmarks with the following options: {opt:?}");
-    let query_range = match opt.query {
-        Some(query_id) => query_id..=query_id,
-        None => TPCH_QUERY_START_ID..=TPCH_QUERY_END_ID,
-    };
-
-    let mut benchmark_run = BenchmarkRun::new();
-    let mut results = vec![];
-    for query_id in query_range {
-        benchmark_run.start_new_case(&format!("Query {query_id}"));
-        let (query_run, result) = benchmark_query(&opt, query_id).await?;
-        results.push(result);
-        for iter in query_run {
-            benchmark_run.write_iter(iter.elapsed, iter.row_count);
-        }
-    }
-    benchmark_run.maybe_write_json(opt.output_path.as_ref())?;
-    Ok(results)
-}
-
-async fn benchmark_query(
-    opt: &DataFusionBenchmarkOpt,
-    query_id: usize,
-) -> Result<(Vec<QueryResult>, Vec<RecordBatch>)> {
-    let mut query_results = vec![];
-    let config = SessionConfig::new()
-        .with_target_partitions(opt.partitions)
-        .with_batch_size(opt.batch_size)
-        .with_collect_statistics(!opt.disable_statistics);
-    let ctx = SessionContext::with_config(config);
-
-    // register tables
-    register_tables(opt, &ctx).await?;
-
-    let mut millis = vec![];
-    // run benchmark
-    let mut result: Vec<RecordBatch> = Vec::with_capacity(1);
-    for i in 0..opt.iterations {
-        let start = Instant::now();
-
-        let sql = &get_query_sql(query_id)?;
-
-        // query 15 is special, with 3 statements. the second statement is the 
one from which we
-        // want to capture the results
-        if query_id == 15 {
-            for (n, query) in sql.iter().enumerate() {
-                if n == 1 {
-                    result = execute_query(&ctx, query, opt.debug).await?;
-                } else {
-                    execute_query(&ctx, query, opt.debug).await?;
-                }
-            }
-        } else {
-            for query in sql {
-                result = execute_query(&ctx, query, opt.debug).await?;
-            }
-        }
-
-        let elapsed = start.elapsed(); //.as_secs_f64() * 1000.0;
-        let ms = elapsed.as_secs_f64() * 1000.0;
-        millis.push(ms);
-        info!("output:\n\n{}\n\n", pretty_format_batches(&result)?);
-        let row_count = result.iter().map(|b| b.num_rows()).sum();
-        println!(
-            "Query {query_id} iteration {i} took {ms:.1} ms and returned 
{row_count} rows"
-        );
-        query_results.push(QueryResult { elapsed, row_count });
-    }
-
-    let avg = millis.iter().sum::<f64>() / millis.len() as f64;
-    println!("Query {query_id} avg time: {avg:.2} ms");
-
-    Ok((query_results, result))
-}
-
-async fn register_tables(
-    opt: &DataFusionBenchmarkOpt,
-    ctx: &SessionContext,
-) -> Result<()> {
-    for table in TPCH_TABLES {
-        let table_provider = {
-            get_table(
-                ctx,
-                opt.path.to_str().unwrap(),
-                table,
-                opt.file_format.as_str(),
-                opt.partitions,
-            )
-            .await?
-        };
-
-        if opt.mem_table {
-            println!("Loading table '{table}' into memory");
-            let start = Instant::now();
-            let memtable =
-                MemTable::load(table_provider, Some(opt.partitions), 
&ctx.state())
-                    .await?;
-            println!(
-                "Loaded table '{}' into memory in {} ms",
-                table,
-                start.elapsed().as_millis()
-            );
-            ctx.register_table(*table, Arc::new(memtable))?;
-        } else {
-            ctx.register_table(*table, table_provider)?;
-        }
-    }
-    Ok(())
-}
-
-async fn execute_query(
-    ctx: &SessionContext,
-    sql: &str,
-    debug: bool,
-) -> Result<Vec<RecordBatch>> {
-    let plan = ctx.sql(sql).await?;
-    let (state, plan) = plan.into_parts();
-
-    if debug {
-        println!("=== Logical plan ===\n{plan:?}\n");
-    }
-
-    let plan = state.optimize(&plan)?;
-    if debug {
-        println!("=== Optimized logical plan ===\n{plan:?}\n");
-    }
-    let physical_plan = state.create_physical_plan(&plan).await?;
-    if debug {
-        println!(
-            "=== Physical plan ===\n{}\n",
-            displayable(physical_plan.as_ref()).indent(true)
-        );
-    }
-    let result = collect(physical_plan.clone(), state.task_ctx()).await?;
-    if debug {
-        println!(
-            "=== Physical plan with metrics ===\n{}\n",
-            
DisplayableExecutionPlan::with_metrics(physical_plan.as_ref()).indent(true)
-        );
-        if !result.is_empty() {
-            // do not call print_batches if there are no batches as the result 
is confusing
-            // and makes it look like there is a batch with no columns
-            pretty::print_batches(&result)?;
-        }
-    }
-    Ok(result)
-}
-
-async fn get_table(
-    ctx: &SessionContext,
-    path: &str,
-    table: &str,
-    table_format: &str,
-    target_partitions: usize,
-) -> Result<Arc<dyn TableProvider>> {
-    // Obtain a snapshot of the SessionState
-    let state = ctx.state();
-    let (format, path, extension): (Arc<dyn FileFormat>, String, &'static str) 
=
-        match table_format {
-            // dbgen creates .tbl ('|' delimited) files without header
-            "tbl" => {
-                let path = format!("{path}/{table}.tbl");
-
-                let format = CsvFormat::default()
-                    .with_delimiter(b'|')
-                    .with_has_header(false);
-
-                (Arc::new(format), path, ".tbl")
-            }
-            "csv" => {
-                let path = format!("{path}/{table}");
-                let format = CsvFormat::default()
-                    .with_delimiter(b',')
-                    .with_has_header(true);
-
-                (Arc::new(format), path, DEFAULT_CSV_EXTENSION)
-            }
-            "parquet" => {
-                let path = format!("{path}/{table}");
-                let format = 
ParquetFormat::default().with_enable_pruning(Some(true));
-
-                (Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
-            }
-            other => {
-                unimplemented!("Invalid file format '{}'", other);
-            }
-        };
-
-    let options = ListingOptions::new(format)
-        .with_file_extension(extension)
-        .with_target_partitions(target_partitions)
-        .with_collect_stat(state.config().collect_statistics());
-
-    let table_path = ListingTableUrl::parse(path)?;
-    let config = 
ListingTableConfig::new(table_path).with_listing_options(options);
-
-    let config = match table_format {
-        "parquet" => config.infer_schema(&state).await?,
-        "tbl" => 
config.with_schema(Arc::new(get_tbl_tpch_table_schema(table))),
-        "csv" => config.with_schema(Arc::new(get_tpch_table_schema(table))),
-        _ => unreachable!(),
-    };
-
-    Ok(Arc::new(ListingTable::try_new(config)?))
-}
-
-struct QueryResult {
-    elapsed: std::time::Duration,
-    row_count: usize,
-}
-
-#[cfg(test)]
-#[cfg(feature = "ci")]
-/// CI checks
-mod tests {
-    use std::path::Path;
-
-    use super::*;
-    use datafusion_proto::bytes::{
-        logical_plan_from_bytes, logical_plan_to_bytes, 
physical_plan_from_bytes,
-        physical_plan_to_bytes,
-    };
-
-    fn get_tpch_data_path() -> Result<String> {
-        let path =
-            std::env::var("TPCH_DATA").unwrap_or_else(|_| 
"benchmarks/data".to_string());
-        if !Path::new(&path).exists() {
-            return Err(DataFusionError::Execution(format!(
-                "Benchmark data not found (set TPCH_DATA env var to override): 
{}",
-                path
-            )));
-        }
-        Ok(path)
-    }
-
-    async fn round_trip_logical_plan(query: usize) -> Result<()> {
-        let ctx = SessionContext::default();
-        let path = get_tpch_data_path()?;
-        let opt = DataFusionBenchmarkOpt {
-            query: Some(query),
-            debug: false,
-            iterations: 1,
-            partitions: 2,
-            batch_size: 8192,
-            path: PathBuf::from(path.to_string()),
-            file_format: "tbl".to_string(),
-            mem_table: false,
-            output_path: None,
-            disable_statistics: false,
-        };
-        register_tables(&opt, &ctx).await?;
-        let queries = get_query_sql(query)?;
-        for query in queries {
-            let plan = ctx.sql(&query).await?;
-            let plan = plan.into_optimized_plan()?;
-            let bytes = logical_plan_to_bytes(&plan)?;
-            let plan2 = logical_plan_from_bytes(&bytes, &ctx)?;
-            let plan_formatted = format!("{}", plan.display_indent());
-            let plan2_formatted = format!("{}", plan2.display_indent());
-            assert_eq!(plan_formatted, plan2_formatted);
-        }
-        Ok(())
-    }
-
-    async fn round_trip_physical_plan(query: usize) -> Result<()> {
-        let ctx = SessionContext::default();
-        let path = get_tpch_data_path()?;
-        let opt = DataFusionBenchmarkOpt {
-            query: Some(query),
-            debug: false,
-            iterations: 1,
-            partitions: 2,
-            batch_size: 8192,
-            path: PathBuf::from(path.to_string()),
-            file_format: "tbl".to_string(),
-            mem_table: false,
-            output_path: None,
-            disable_statistics: false,
-        };
-        register_tables(&opt, &ctx).await?;
-        let queries = get_query_sql(query)?;
-        for query in queries {
-            let plan = ctx.sql(&query).await?;
-            let plan = plan.create_physical_plan().await?;
-            let bytes = physical_plan_to_bytes(plan.clone())?;
-            let plan2 = physical_plan_from_bytes(&bytes, &ctx)?;
-            let plan_formatted = format!("{}", 
displayable(plan.as_ref()).indent(false));
-            let plan2_formatted =
-                format!("{}", displayable(plan2.as_ref()).indent(false));
-            assert_eq!(plan_formatted, plan2_formatted);
-        }
-        Ok(())
-    }
-
-    macro_rules! test_round_trip_logical {
-        ($tn:ident, $query:expr) => {
-            #[tokio::test]
-            async fn $tn() -> Result<()> {
-                round_trip_logical_plan($query).await
-            }
-        };
-    }
-
-    macro_rules! test_round_trip_physical {
-        ($tn:ident, $query:expr) => {
-            #[tokio::test]
-            async fn $tn() -> Result<()> {
-                round_trip_physical_plan($query).await
-            }
-        };
-    }
-
-    // logical plan tests
-    test_round_trip_logical!(round_trip_logical_plan_q1, 1);
-    test_round_trip_logical!(round_trip_logical_plan_q2, 2);
-    test_round_trip_logical!(round_trip_logical_plan_q3, 3);
-    test_round_trip_logical!(round_trip_logical_plan_q4, 4);
-    test_round_trip_logical!(round_trip_logical_plan_q5, 5);
-    test_round_trip_logical!(round_trip_logical_plan_q6, 6);
-    test_round_trip_logical!(round_trip_logical_plan_q7, 7);
-    test_round_trip_logical!(round_trip_logical_plan_q8, 8);
-    test_round_trip_logical!(round_trip_logical_plan_q9, 9);
-    test_round_trip_logical!(round_trip_logical_plan_q10, 10);
-    test_round_trip_logical!(round_trip_logical_plan_q11, 11);
-    test_round_trip_logical!(round_trip_logical_plan_q12, 12);
-    test_round_trip_logical!(round_trip_logical_plan_q13, 13);
-    test_round_trip_logical!(round_trip_logical_plan_q14, 14);
-    test_round_trip_logical!(round_trip_logical_plan_q15, 15);
-    test_round_trip_logical!(round_trip_logical_plan_q16, 16);
-    test_round_trip_logical!(round_trip_logical_plan_q17, 17);
-    test_round_trip_logical!(round_trip_logical_plan_q18, 18);
-    test_round_trip_logical!(round_trip_logical_plan_q19, 19);
-    test_round_trip_logical!(round_trip_logical_plan_q20, 20);
-    test_round_trip_logical!(round_trip_logical_plan_q21, 21);
-    test_round_trip_logical!(round_trip_logical_plan_q22, 22);
-
-    // physical plan tests
-    test_round_trip_physical!(round_trip_physical_plan_q1, 1);
-    test_round_trip_physical!(round_trip_physical_plan_q2, 2);
-    test_round_trip_physical!(round_trip_physical_plan_q3, 3);
-    test_round_trip_physical!(round_trip_physical_plan_q4, 4);
-    test_round_trip_physical!(round_trip_physical_plan_q5, 5);
-    test_round_trip_physical!(round_trip_physical_plan_q6, 6);
-    test_round_trip_physical!(round_trip_physical_plan_q7, 7);
-    test_round_trip_physical!(round_trip_physical_plan_q8, 8);
-    test_round_trip_physical!(round_trip_physical_plan_q9, 9);
-    test_round_trip_physical!(round_trip_physical_plan_q10, 10);
-    test_round_trip_physical!(round_trip_physical_plan_q11, 11);
-    test_round_trip_physical!(round_trip_physical_plan_q12, 12);
-    test_round_trip_physical!(round_trip_physical_plan_q13, 13);
-    test_round_trip_physical!(round_trip_physical_plan_q14, 14);
-    test_round_trip_physical!(round_trip_physical_plan_q15, 15);
-    test_round_trip_physical!(round_trip_physical_plan_q16, 16);
-    test_round_trip_physical!(round_trip_physical_plan_q17, 17);
-    test_round_trip_physical!(round_trip_physical_plan_q18, 18);
-    test_round_trip_physical!(round_trip_physical_plan_q19, 19);
-    test_round_trip_physical!(round_trip_physical_plan_q20, 20);
-    test_round_trip_physical!(round_trip_physical_plan_q21, 21);
-    test_round_trip_physical!(round_trip_physical_plan_q22, 22);
-}
diff --git a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs
index c2f4e876ce..9d5530d31f 100644
--- a/benchmarks/src/lib.rs
+++ b/benchmarks/src/lib.rs
@@ -15,143 +15,28 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion::{error::Result, DATAFUSION_VERSION};
-use serde::{Serialize, Serializer};
-use serde_json::Value;
-use std::{
-    collections::HashMap,
-    path::Path,
-    time::{Duration, SystemTime},
-};
+//! DataFusion benchmark runner
+use datafusion::error::Result;
+use structopt::StructOpt;
 
+pub mod run;
 pub mod tpch;
 
-fn serialize_start_time<S>(start_time: &SystemTime, ser: S) -> Result<S::Ok, 
S::Error>
-where
-    S: Serializer,
-{
-    ser.serialize_u64(
-        start_time
-            .duration_since(SystemTime::UNIX_EPOCH)
-            .expect("current time is later than UNIX_EPOCH")
-            .as_secs(),
-    )
-}
-fn serialize_elapsed<S>(elapsed: &Duration, ser: S) -> Result<S::Ok, S::Error>
-where
-    S: Serializer,
-{
-    let ms = elapsed.as_secs_f64() * 1000.0;
-    ser.serialize_f64(ms)
-}
-#[derive(Debug, Serialize)]
-pub struct RunContext {
-    /// Benchmark crate version
-    pub benchmark_version: String,
-    /// DataFusion crate version
-    pub datafusion_version: String,
-    /// Number of CPU cores
-    pub num_cpus: usize,
-    /// Start time
-    #[serde(serialize_with = "serialize_start_time")]
-    pub start_time: SystemTime,
-    /// CLI arguments
-    pub arguments: Vec<String>,
-}
-
-impl Default for RunContext {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl RunContext {
-    pub fn new() -> Self {
-        Self {
-            benchmark_version: env!("CARGO_PKG_VERSION").to_owned(),
-            datafusion_version: DATAFUSION_VERSION.to_owned(),
-            num_cpus: num_cpus::get(),
-            start_time: SystemTime::now(),
-            arguments: std::env::args().skip(1).collect::<Vec<String>>(),
-        }
-    }
-}
-
-/// A single iteration of a benchmark query
-#[derive(Debug, Serialize)]
-struct QueryIter {
-    #[serde(serialize_with = "serialize_elapsed")]
-    elapsed: Duration,
-    row_count: usize,
-}
-/// A single benchmark case
-#[derive(Debug, Serialize)]
-pub struct BenchQuery {
-    query: String,
-    iterations: Vec<QueryIter>,
-    #[serde(serialize_with = "serialize_start_time")]
-    start_time: SystemTime,
-}
+pub use run::{BenchQuery, BenchmarkRun};
 
-/// collects benchmark run data and then serializes it at the end
-pub struct BenchmarkRun {
-    context: RunContext,
-    queries: Vec<BenchQuery>,
-    current_case: Option<usize>,
-}
-
-impl Default for BenchmarkRun {
-    fn default() -> Self {
-        Self::new()
-    }
+#[derive(Debug, StructOpt)]
+#[structopt(about = "benchmark command")]
+enum Options {
+    Tpch(tpch::RunOpt),
+    TpchConvert(tpch::ConvertOpt),
 }
 
-impl BenchmarkRun {
-    // create new
-    pub fn new() -> Self {
-        Self {
-            context: RunContext::new(),
-            queries: vec![],
-            current_case: None,
-        }
-    }
-    /// begin a new case. iterations added after this will be included in the 
new case
-    pub fn start_new_case(&mut self, id: &str) {
-        self.queries.push(BenchQuery {
-            query: id.to_owned(),
-            iterations: vec![],
-            start_time: SystemTime::now(),
-        });
-        if let Some(c) = self.current_case.as_mut() {
-            *c += 1;
-        } else {
-            self.current_case = Some(0);
-        }
-    }
-    /// Write a new iteration to the current case
-    pub fn write_iter(&mut self, elapsed: Duration, row_count: usize) {
-        if let Some(idx) = self.current_case {
-            self.queries[idx]
-                .iterations
-                .push(QueryIter { elapsed, row_count })
-        } else {
-            panic!("no cases existed yet");
-        }
-    }
-
-    /// Stringify data into formatted json
-    pub fn to_json(&self) -> String {
-        let mut output = HashMap::<&str, Value>::new();
-        output.insert("context", serde_json::to_value(&self.context).unwrap());
-        output.insert("queries", serde_json::to_value(&self.queries).unwrap());
-        serde_json::to_string_pretty(&output).unwrap()
-    }
+// Main benchmark runner entrypoint
+pub async fn main() -> Result<()> {
+    env_logger::init();
 
-    /// Write data as json into output path if it exists.
-    pub fn maybe_write_json(&self, maybe_path: Option<impl AsRef<Path>>) -> 
Result<()> {
-        if let Some(path) = maybe_path {
-            std::fs::write(path, self.to_json())?;
-        };
-        Ok(())
+    match Options::from_args() {
+        Options::Tpch(opt) => opt.run().await,
+        Options::TpchConvert(opt) => opt.run().await,
     }
 }
diff --git a/benchmarks/src/lib.rs b/benchmarks/src/run.rs
similarity index 99%
copy from benchmarks/src/lib.rs
copy to benchmarks/src/run.rs
index c2f4e876ce..5ee6691576 100644
--- a/benchmarks/src/lib.rs
+++ b/benchmarks/src/run.rs
@@ -24,8 +24,6 @@ use std::{
     time::{Duration, SystemTime},
 };
 
-pub mod tpch;
-
 fn serialize_start_time<S>(start_time: &SystemTime, ser: S) -> Result<S::Ok, 
S::Error>
 where
     S: Serializer,
diff --git a/benchmarks/src/tpch/convert.rs b/benchmarks/src/tpch/convert.rs
new file mode 100644
index 0000000000..e076097ad7
--- /dev/null
+++ b/benchmarks/src/tpch/convert.rs
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::path::{Path, PathBuf};
+use std::time::Instant;
+
+use datafusion::error::DataFusionError;
+use datafusion::error::Result;
+use datafusion::prelude::*;
+use parquet::basic::Compression;
+use parquet::file::properties::WriterProperties;
+use structopt::StructOpt;
+
+use super::get_tbl_tpch_table_schema;
+use super::TPCH_TABLES;
+
+/// Convert tpch .slt files to .parquet or .csv files
+#[derive(Debug, StructOpt)]
+pub struct ConvertOpt {
+    /// Path to csv files
+    #[structopt(parse(from_os_str), required = true, short = "i", long = 
"input")]
+    input_path: PathBuf,
+
+    /// Output path
+    #[structopt(parse(from_os_str), required = true, short = "o", long = 
"output")]
+    output_path: PathBuf,
+
+    /// Output file format: `csv` or `parquet`
+    #[structopt(short = "f", long = "format")]
+    file_format: String,
+
+    /// Compression to use when writing Parquet files
+    #[structopt(short = "c", long = "compression", default_value = "zstd")]
+    compression: String,
+
+    /// Number of partitions to produce
+    #[structopt(short = "n", long = "partitions", default_value = "1")]
+    partitions: usize,
+
+    /// Batch size when reading CSV or Parquet files
+    #[structopt(short = "s", long = "batch-size", default_value = "8192")]
+    batch_size: usize,
+}
+
+impl ConvertOpt {
+    pub async fn run(&self) -> Result<()> {
+        let compression = self.compression()?;
+
+        let input_path = self.input_path.to_str().unwrap();
+        let output_path = self.output_path.to_str().unwrap();
+
+        let output_root_path = Path::new(output_path);
+        for table in TPCH_TABLES {
+            let start = Instant::now();
+            let schema = get_tbl_tpch_table_schema(table);
+
+            let input_path = format!("{input_path}/{table}.tbl");
+            let options = CsvReadOptions::new()
+                .schema(&schema)
+                .has_header(false)
+                .delimiter(b'|')
+                .file_extension(".tbl");
+
+            let config = SessionConfig::new().with_batch_size(self.batch_size);
+            let ctx = SessionContext::with_config(config);
+
+            // build plan to read the TBL file
+            let mut csv = ctx.read_csv(&input_path, options).await?;
+
+            // Select all apart from the padding column
+            let selection = csv
+                .schema()
+                .fields()
+                .iter()
+                .take(schema.fields.len() - 1)
+                .map(|d| Expr::Column(d.qualified_column()))
+                .collect();
+
+            csv = csv.select(selection)?;
+            // optionally, repartition the file
+            let partitions = self.partitions;
+            if partitions > 1 {
+                csv = 
csv.repartition(Partitioning::RoundRobinBatch(partitions))?
+            }
+
+            // create the physical plan
+            let csv = csv.create_physical_plan().await?;
+
+            let output_path = output_root_path.join(table);
+            let output_path = output_path.to_str().unwrap().to_owned();
+
+            println!(
+                "Converting '{}' to {} files in directory '{}'",
+                &input_path, self.file_format, &output_path
+            );
+            match self.file_format.as_str() {
+                "csv" => ctx.write_csv(csv, output_path).await?,
+                "parquet" => {
+                    let props = WriterProperties::builder()
+                        .set_compression(compression)
+                        .build();
+                    ctx.write_parquet(csv, output_path, Some(props)).await?
+                }
+                other => {
+                    return Err(DataFusionError::NotImplemented(format!(
+                        "Invalid output format: {other}"
+                    )));
+                }
+            }
+            println!("Conversion completed in {} ms", 
start.elapsed().as_millis());
+        }
+
+        Ok(())
+    }
+
+    /// return the compression method to use when writing parquet
+    fn compression(&self) -> Result<Compression> {
+        Ok(match self.compression.as_str() {
+            "none" => Compression::UNCOMPRESSED,
+            "snappy" => Compression::SNAPPY,
+            "brotli" => Compression::BROTLI(Default::default()),
+            "gzip" => Compression::GZIP(Default::default()),
+            "lz4" => Compression::LZ4,
+            "lz0" => Compression::LZO,
+            "zstd" => Compression::ZSTD(Default::default()),
+            other => {
+                return Err(DataFusionError::NotImplemented(format!(
+                    "Invalid compression format: {other}"
+                )));
+            }
+        })
+    }
+}
diff --git a/benchmarks/src/tpch.rs b/benchmarks/src/tpch/mod.rs
similarity index 74%
rename from benchmarks/src/tpch.rs
rename to benchmarks/src/tpch/mod.rs
index 58b9c3637c..1b6350278d 100644
--- a/benchmarks/src/tpch.rs
+++ b/benchmarks/src/tpch/mod.rs
@@ -15,18 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::datatypes::SchemaBuilder;
-use std::fs;
-use std::path::Path;
-use std::time::Instant;
+//! Benchmark derived from TPC-H. This is not an official TPC-H benchmark.
 
-use datafusion::prelude::*;
+use arrow::datatypes::SchemaBuilder;
 use datafusion::{
     arrow::datatypes::{DataType, Field, Schema},
     error::{DataFusionError, Result},
 };
-use parquet::basic::Compression;
-use parquet::file::properties::WriterProperties;
+use std::fs;
+
+mod run;
+pub use run::RunOpt;
+
+mod convert;
+pub use convert::ConvertOpt;
 
 pub const TPCH_TABLES: &[&str] = &[
     "part", "supplier", "partsupp", "customer", "orders", "lineitem", 
"nation", "region",
@@ -166,78 +168,6 @@ pub fn get_query_sql(query: usize) -> Result<Vec<String>> {
     }
 }
 
-/// Conver tbl (csv) file to parquet
-pub async fn convert_tbl(
-    input_path: &str,
-    output_path: &str,
-    file_format: &str,
-    partitions: usize,
-    batch_size: usize,
-    compression: Compression,
-) -> Result<()> {
-    let output_root_path = Path::new(output_path);
-    for table in TPCH_TABLES {
-        let start = Instant::now();
-        let schema = get_tbl_tpch_table_schema(table);
-
-        let input_path = format!("{input_path}/{table}.tbl");
-        let options = CsvReadOptions::new()
-            .schema(&schema)
-            .has_header(false)
-            .delimiter(b'|')
-            .file_extension(".tbl");
-
-        let config = SessionConfig::new().with_batch_size(batch_size);
-        let ctx = SessionContext::with_config(config);
-
-        // build plan to read the TBL file
-        let mut csv = ctx.read_csv(&input_path, options).await?;
-
-        // Select all apart from the padding column
-        let selection = csv
-            .schema()
-            .fields()
-            .iter()
-            .take(schema.fields.len() - 1)
-            .map(|d| Expr::Column(d.qualified_column()))
-            .collect();
-
-        csv = csv.select(selection)?;
-        // optionally, repartition the file
-        if partitions > 1 {
-            csv = csv.repartition(Partitioning::RoundRobinBatch(partitions))?
-        }
-
-        // create the physical plan
-        let csv = csv.create_physical_plan().await?;
-
-        let output_path = output_root_path.join(table);
-        let output_path = output_path.to_str().unwrap().to_owned();
-
-        println!(
-            "Converting '{}' to {} files in directory '{}'",
-            &input_path, &file_format, &output_path
-        );
-        match file_format {
-            "csv" => ctx.write_csv(csv, output_path).await?,
-            "parquet" => {
-                let props = WriterProperties::builder()
-                    .set_compression(compression)
-                    .build();
-                ctx.write_parquet(csv, output_path, Some(props)).await?
-            }
-            other => {
-                return Err(DataFusionError::NotImplemented(format!(
-                    "Invalid output format: {other}"
-                )));
-            }
-        }
-        println!("Conversion completed in {} ms", start.elapsed().as_millis());
-    }
-
-    Ok(())
-}
-
 pub const QUERY_LIMIT: [Option<usize>; 22] = [
     None,
     Some(100),
diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs
new file mode 100644
index 0000000000..6aada30bc7
--- /dev/null
+++ b/benchmarks/src/tpch/run.rs
@@ -0,0 +1,442 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::get_query_sql;
+use crate::BenchmarkRun;
+use arrow::record_batch::RecordBatch;
+use arrow::util::pretty::{self, pretty_format_batches};
+use datafusion::datasource::file_format::csv::{CsvFormat, 
DEFAULT_CSV_EXTENSION};
+use datafusion::datasource::file_format::parquet::{
+    ParquetFormat, DEFAULT_PARQUET_EXTENSION,
+};
+use datafusion::datasource::file_format::FileFormat;
+use datafusion::datasource::listing::{
+    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
+};
+use datafusion::datasource::{MemTable, TableProvider};
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use datafusion::physical_plan::{collect, displayable};
+use log::info;
+
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::Instant;
+
+use datafusion::error::Result;
+use datafusion::prelude::*;
+use structopt::StructOpt;
+
+use super::{get_tbl_tpch_table_schema, get_tpch_table_schema, TPCH_TABLES};
+
+/// Run the tpch benchmark
+#[derive(Debug, StructOpt, Clone)]
+pub struct RunOpt {
+    /// Query number. If not specified, runs all queries
+    #[structopt(short, long)]
+    query: Option<usize>,
+
+    /// Activate debug mode to see query results
+    #[structopt(short, long)]
+    debug: bool,
+
+    /// Number of iterations of each test run
+    #[structopt(short = "i", long = "iterations", default_value = "3")]
+    iterations: usize,
+
+    /// Number of partitions to process in parallel
+    #[structopt(short = "n", long = "partitions", default_value = "2")]
+    partitions: usize,
+
+    /// Batch size when reading CSV or Parquet files
+    #[structopt(short = "s", long = "batch-size", default_value = "8192")]
+    batch_size: usize,
+
+    /// Path to data files
+    #[structopt(parse(from_os_str), required = true, short = "p", long = 
"path")]
+    path: PathBuf,
+
+    /// File format: `csv` or `parquet`
+    #[structopt(short = "f", long = "format", default_value = "csv")]
+    file_format: String,
+
+    /// Load the data into a MemTable before executing the query
+    #[structopt(short = "m", long = "mem-table")]
+    mem_table: bool,
+
+    /// Path to machine readable output file
+    #[structopt(parse(from_os_str), short = "o", long = "output")]
+    output_path: Option<PathBuf>,
+
+    /// Whether to disable collection of statistics (and cost based 
optimizations) or not.
+    #[structopt(short = "S", long = "disable-statistics")]
+    disable_statistics: bool,
+}
+
+const TPCH_QUERY_START_ID: usize = 1;
+const TPCH_QUERY_END_ID: usize = 22;
+
+impl RunOpt {
+    pub async fn run(&self) -> Result<()> {
+        println!("Running benchmarks with the following options: {self:?}");
+        let query_range = match self.query {
+            Some(query_id) => query_id..=query_id,
+            None => TPCH_QUERY_START_ID..=TPCH_QUERY_END_ID,
+        };
+
+        let mut benchmark_run = BenchmarkRun::new();
+        for query_id in query_range {
+            benchmark_run.start_new_case(&format!("Query {query_id}"));
+            let query_run = self.benchmark_query(query_id).await?;
+            for iter in query_run {
+                benchmark_run.write_iter(iter.elapsed, iter.row_count);
+            }
+        }
+        benchmark_run.maybe_write_json(self.output_path.as_ref())?;
+        Ok(())
+    }
+
+    async fn benchmark_query(&self, query_id: usize) -> 
Result<Vec<QueryResult>> {
+        let config = SessionConfig::new()
+            .with_target_partitions(self.partitions)
+            .with_batch_size(self.batch_size)
+            .with_collect_statistics(!self.disable_statistics);
+        let ctx = SessionContext::with_config(config);
+
+        // register tables
+        self.register_tables(&ctx).await?;
+
+        let mut millis = vec![];
+        // run benchmark
+        let mut query_results = vec![];
+        for i in 0..self.iterations {
+            let start = Instant::now();
+
+            let sql = &get_query_sql(query_id)?;
+
+            // query 15 is special, with 3 statements. the second statement is 
the one from which we
+            // want to capture the results
+            let mut result = vec![];
+            if query_id == 15 {
+                for (n, query) in sql.iter().enumerate() {
+                    if n == 1 {
+                        result = self.execute_query(&ctx, query).await?;
+                    } else {
+                        self.execute_query(&ctx, query).await?;
+                    }
+                }
+            } else {
+                for query in sql {
+                    result = self.execute_query(&ctx, query).await?;
+                }
+            }
+
+            let elapsed = start.elapsed(); //.as_secs_f64() * 1000.0;
+            let ms = elapsed.as_secs_f64() * 1000.0;
+            millis.push(ms);
+            info!("output:\n\n{}\n\n", pretty_format_batches(&result)?);
+            let row_count = result.iter().map(|b| b.num_rows()).sum();
+            println!(
+                "Query {query_id} iteration {i} took {ms:.1} ms and returned 
{row_count} rows"
+            );
+            query_results.push(QueryResult { elapsed, row_count });
+        }
+
+        let avg = millis.iter().sum::<f64>() / millis.len() as f64;
+        println!("Query {query_id} avg time: {avg:.2} ms");
+
+        Ok(query_results)
+    }
+
+    async fn register_tables(&self, ctx: &SessionContext) -> Result<()> {
+        for table in TPCH_TABLES {
+            let table_provider = { self.get_table(ctx, table).await? };
+
+            if self.mem_table {
+                println!("Loading table '{table}' into memory");
+                let start = Instant::now();
+                let memtable =
+                    MemTable::load(table_provider, Some(self.partitions), 
&ctx.state())
+                        .await?;
+                println!(
+                    "Loaded table '{}' into memory in {} ms",
+                    table,
+                    start.elapsed().as_millis()
+                );
+                ctx.register_table(*table, Arc::new(memtable))?;
+            } else {
+                ctx.register_table(*table, table_provider)?;
+            }
+        }
+        Ok(())
+    }
+
+    async fn execute_query(
+        &self,
+        ctx: &SessionContext,
+        sql: &str,
+    ) -> Result<Vec<RecordBatch>> {
+        let debug = self.debug;
+        let plan = ctx.sql(sql).await?;
+        let (state, plan) = plan.into_parts();
+
+        if debug {
+            println!("=== Logical plan ===\n{plan:?}\n");
+        }
+
+        let plan = state.optimize(&plan)?;
+        if debug {
+            println!("=== Optimized logical plan ===\n{plan:?}\n");
+        }
+        let physical_plan = state.create_physical_plan(&plan).await?;
+        if debug {
+            println!(
+                "=== Physical plan ===\n{}\n",
+                displayable(physical_plan.as_ref()).indent(true)
+            );
+        }
+        let result = collect(physical_plan.clone(), state.task_ctx()).await?;
+        if debug {
+            println!(
+                "=== Physical plan with metrics ===\n{}\n",
+                DisplayableExecutionPlan::with_metrics(physical_plan.as_ref())
+                    .indent(true)
+            );
+            if !result.is_empty() {
+                // do not call print_batches if there are no batches as the 
result is confusing
+                // and makes it look like there is a batch with no columns
+                pretty::print_batches(&result)?;
+            }
+        }
+        Ok(result)
+    }
+
+    async fn get_table(
+        &self,
+        ctx: &SessionContext,
+        table: &str,
+    ) -> Result<Arc<dyn TableProvider>> {
+        let path = self.path.to_str().unwrap();
+        let table_format = self.file_format.as_str();
+        let target_partitions = self.partitions;
+
+        // Obtain a snapshot of the SessionState
+        let state = ctx.state();
+        let (format, path, extension): (Arc<dyn FileFormat>, String, &'static 
str) =
+            match table_format {
+                // dbgen creates .tbl ('|' delimited) files without header
+                "tbl" => {
+                    let path = format!("{path}/{table}.tbl");
+
+                    let format = CsvFormat::default()
+                        .with_delimiter(b'|')
+                        .with_has_header(false);
+
+                    (Arc::new(format), path, ".tbl")
+                }
+                "csv" => {
+                    let path = format!("{path}/{table}");
+                    let format = CsvFormat::default()
+                        .with_delimiter(b',')
+                        .with_has_header(true);
+
+                    (Arc::new(format), path, DEFAULT_CSV_EXTENSION)
+                }
+                "parquet" => {
+                    let path = format!("{path}/{table}");
+                    let format = 
ParquetFormat::default().with_enable_pruning(Some(true));
+
+                    (Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
+                }
+                other => {
+                    unimplemented!("Invalid file format '{}'", other);
+                }
+            };
+
+        let options = ListingOptions::new(format)
+            .with_file_extension(extension)
+            .with_target_partitions(target_partitions)
+            .with_collect_stat(state.config().collect_statistics());
+
+        let table_path = ListingTableUrl::parse(path)?;
+        let config = 
ListingTableConfig::new(table_path).with_listing_options(options);
+
+        let config = match table_format {
+            "parquet" => config.infer_schema(&state).await?,
+            "tbl" => 
config.with_schema(Arc::new(get_tbl_tpch_table_schema(table))),
+            "csv" => 
config.with_schema(Arc::new(get_tpch_table_schema(table))),
+            _ => unreachable!(),
+        };
+
+        Ok(Arc::new(ListingTable::try_new(config)?))
+    }
+}
+
+struct QueryResult {
+    elapsed: std::time::Duration,
+    row_count: usize,
+}
+
+#[cfg(test)]
+// Only run with "ci" mode when we have the data
+#[cfg(feature = "ci")]
+mod tests {
+    use super::*;
+    use datafusion::error::{DataFusionError, Result};
+    use std::path::Path;
+
+    use datafusion_proto::bytes::{
+        logical_plan_from_bytes, logical_plan_to_bytes, 
physical_plan_from_bytes,
+        physical_plan_to_bytes,
+    };
+
+    fn get_tpch_data_path() -> Result<String> {
+        let path =
+            std::env::var("TPCH_DATA").unwrap_or_else(|_| 
"benchmarks/data".to_string());
+        if !Path::new(&path).exists() {
+            return Err(DataFusionError::Execution(format!(
+                "Benchmark data not found (set TPCH_DATA env var to override): 
{}",
+                path
+            )));
+        }
+        Ok(path)
+    }
+
+    async fn round_trip_logical_plan(query: usize) -> Result<()> {
+        let ctx = SessionContext::default();
+        let path = get_tpch_data_path()?;
+        let opt = RunOpt {
+            query: Some(query),
+            debug: false,
+            iterations: 1,
+            partitions: 2,
+            batch_size: 8192,
+            path: PathBuf::from(path.to_string()),
+            file_format: "tbl".to_string(),
+            mem_table: false,
+            output_path: None,
+            disable_statistics: false,
+        };
+        opt.register_tables(&ctx).await?;
+        let queries = get_query_sql(query)?;
+        for query in queries {
+            let plan = ctx.sql(&query).await?;
+            let plan = plan.into_optimized_plan()?;
+            let bytes = logical_plan_to_bytes(&plan)?;
+            let plan2 = logical_plan_from_bytes(&bytes, &ctx)?;
+            let plan_formatted = format!("{}", plan.display_indent());
+            let plan2_formatted = format!("{}", plan2.display_indent());
+            assert_eq!(plan_formatted, plan2_formatted);
+        }
+        Ok(())
+    }
+
+    async fn round_trip_physical_plan(query: usize) -> Result<()> {
+        let ctx = SessionContext::default();
+        let path = get_tpch_data_path()?;
+        let opt = RunOpt {
+            query: Some(query),
+            debug: false,
+            iterations: 1,
+            partitions: 2,
+            batch_size: 8192,
+            path: PathBuf::from(path.to_string()),
+            file_format: "tbl".to_string(),
+            mem_table: false,
+            output_path: None,
+            disable_statistics: false,
+        };
+        opt.register_tables(&ctx).await?;
+        let queries = get_query_sql(query)?;
+        for query in queries {
+            let plan = ctx.sql(&query).await?;
+            let plan = plan.create_physical_plan().await?;
+            let bytes = physical_plan_to_bytes(plan.clone())?;
+            let plan2 = physical_plan_from_bytes(&bytes, &ctx)?;
+            let plan_formatted = format!("{}", 
displayable(plan.as_ref()).indent(false));
+            let plan2_formatted =
+                format!("{}", displayable(plan2.as_ref()).indent(false));
+            assert_eq!(plan_formatted, plan2_formatted);
+        }
+        Ok(())
+    }
+
+    macro_rules! test_round_trip_logical {
+        ($tn:ident, $query:expr) => {
+            #[tokio::test]
+            async fn $tn() -> Result<()> {
+                round_trip_logical_plan($query).await
+            }
+        };
+    }
+
+    macro_rules! test_round_trip_physical {
+        ($tn:ident, $query:expr) => {
+            #[tokio::test]
+            async fn $tn() -> Result<()> {
+                round_trip_physical_plan($query).await
+            }
+        };
+    }
+
+    // logical plan tests
+    test_round_trip_logical!(round_trip_logical_plan_q1, 1);
+    test_round_trip_logical!(round_trip_logical_plan_q2, 2);
+    test_round_trip_logical!(round_trip_logical_plan_q3, 3);
+    test_round_trip_logical!(round_trip_logical_plan_q4, 4);
+    test_round_trip_logical!(round_trip_logical_plan_q5, 5);
+    test_round_trip_logical!(round_trip_logical_plan_q6, 6);
+    test_round_trip_logical!(round_trip_logical_plan_q7, 7);
+    test_round_trip_logical!(round_trip_logical_plan_q8, 8);
+    test_round_trip_logical!(round_trip_logical_plan_q9, 9);
+    test_round_trip_logical!(round_trip_logical_plan_q10, 10);
+    test_round_trip_logical!(round_trip_logical_plan_q11, 11);
+    test_round_trip_logical!(round_trip_logical_plan_q12, 12);
+    test_round_trip_logical!(round_trip_logical_plan_q13, 13);
+    test_round_trip_logical!(round_trip_logical_plan_q14, 14);
+    test_round_trip_logical!(round_trip_logical_plan_q15, 15);
+    test_round_trip_logical!(round_trip_logical_plan_q16, 16);
+    test_round_trip_logical!(round_trip_logical_plan_q17, 17);
+    test_round_trip_logical!(round_trip_logical_plan_q18, 18);
+    test_round_trip_logical!(round_trip_logical_plan_q19, 19);
+    test_round_trip_logical!(round_trip_logical_plan_q20, 20);
+    test_round_trip_logical!(round_trip_logical_plan_q21, 21);
+    test_round_trip_logical!(round_trip_logical_plan_q22, 22);
+
+    // physical plan tests
+    test_round_trip_physical!(round_trip_physical_plan_q1, 1);
+    test_round_trip_physical!(round_trip_physical_plan_q2, 2);
+    test_round_trip_physical!(round_trip_physical_plan_q3, 3);
+    test_round_trip_physical!(round_trip_physical_plan_q4, 4);
+    test_round_trip_physical!(round_trip_physical_plan_q5, 5);
+    test_round_trip_physical!(round_trip_physical_plan_q6, 6);
+    test_round_trip_physical!(round_trip_physical_plan_q7, 7);
+    test_round_trip_physical!(round_trip_physical_plan_q8, 8);
+    test_round_trip_physical!(round_trip_physical_plan_q9, 9);
+    test_round_trip_physical!(round_trip_physical_plan_q10, 10);
+    test_round_trip_physical!(round_trip_physical_plan_q11, 11);
+    test_round_trip_physical!(round_trip_physical_plan_q12, 12);
+    test_round_trip_physical!(round_trip_physical_plan_q13, 13);
+    test_round_trip_physical!(round_trip_physical_plan_q14, 14);
+    test_round_trip_physical!(round_trip_physical_plan_q15, 15);
+    test_round_trip_physical!(round_trip_physical_plan_q16, 16);
+    test_round_trip_physical!(round_trip_physical_plan_q17, 17);
+    test_round_trip_physical!(round_trip_physical_plan_q18, 18);
+    test_round_trip_physical!(round_trip_physical_plan_q19, 19);
+    test_round_trip_physical!(round_trip_physical_plan_q20, 20);
+    test_round_trip_physical!(round_trip_physical_plan_q21, 21);
+    test_round_trip_physical!(round_trip_physical_plan_q22, 22);
+}

Reply via email to