This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 9445aa0ee5 Create `dfbench`, split up `tpch` benchmark runner into
modules (#7054)
9445aa0ee5 is described below
commit 9445aa0ee52be9c08981d4a8ff7568f520926046
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Jul 24 06:30:25 2023 -0500
Create `dfbench`, split up `tpch` benchmark runner into modules (#7054)
---
benchmarks/src/bin/dfbench.rs | 49 ++++
benchmarks/src/bin/tpch.rs | 503 +-------------------------------
benchmarks/src/lib.rs | 147 +---------
benchmarks/src/{lib.rs => run.rs} | 2 -
benchmarks/src/tpch/convert.rs | 147 ++++++++++
benchmarks/src/{tpch.rs => tpch/mod.rs} | 88 +-----
benchmarks/src/tpch/run.rs | 442 ++++++++++++++++++++++++++++
7 files changed, 675 insertions(+), 703 deletions(-)
diff --git a/benchmarks/src/bin/dfbench.rs b/benchmarks/src/bin/dfbench.rs
new file mode 100644
index 0000000000..f4ba8bc975
--- /dev/null
+++ b/benchmarks/src/bin/dfbench.rs
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! DataFusion benchmark runner
+use datafusion::error::Result;
+
+use structopt::StructOpt;
+
+#[cfg(feature = "snmalloc")]
+#[global_allocator]
+static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
+
+#[cfg(feature = "mimalloc")]
+#[global_allocator]
+static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
+
+use datafusion_benchmarks::tpch;
+
+#[derive(Debug, StructOpt)]
+#[structopt(about = "benchmark command")]
+enum Options {
+ Tpch(tpch::RunOpt),
+ TpchConvert(tpch::ConvertOpt),
+}
+
+// Main benchmark runner entrypoint
+#[tokio::main]
+pub async fn main() -> Result<()> {
+ env_logger::init();
+
+ match Options::from_args() {
+ Options::Tpch(opt) => opt.run().await,
+ Options::TpchConvert(opt) => opt.run().await,
+ }
+}
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index b5f535b38d..757136d231 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -15,30 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-//! Benchmark derived from TPC-H. This is not an official TPC-H benchmark.
-use log::info;
+//! tpch binary only entrypoint
-use arrow::util::pretty::pretty_format_batches;
-use datafusion::datasource::file_format::{csv::CsvFormat, FileFormat};
-use datafusion::datasource::{MemTable, TableProvider};
-use datafusion::error::{DataFusionError, Result};
-use datafusion::parquet::basic::Compression;
-use datafusion::physical_plan::display::DisplayableExecutionPlan;
-use datafusion::physical_plan::{collect, displayable};
-use datafusion::prelude::*;
-use datafusion::{
- arrow::record_batch::RecordBatch,
datasource::file_format::parquet::ParquetFormat,
-};
-use datafusion::{
- arrow::util::pretty,
- datasource::listing::{ListingOptions, ListingTable, ListingTableConfig},
-};
-use datafusion_benchmarks::{tpch::*, BenchmarkRun};
-use std::{iter::Iterator, path::PathBuf, sync::Arc, time::Instant};
-
-use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
-use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
-use datafusion::datasource::listing::ListingTableUrl;
+use datafusion::error::Result;
+use datafusion_benchmarks::tpch;
use structopt::StructOpt;
#[cfg(feature = "snmalloc")]
@@ -49,490 +29,31 @@ static ALLOC: snmalloc_rs::SnMalloc =
snmalloc_rs::SnMalloc;
#[global_allocator]
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
-#[derive(Debug, StructOpt, Clone)]
-struct DataFusionBenchmarkOpt {
- /// Query number. If not specified, runs all queries
- #[structopt(short, long)]
- query: Option<usize>,
-
- /// Activate debug mode to see query results
- #[structopt(short, long)]
- debug: bool,
-
- /// Number of iterations of each test run
- #[structopt(short = "i", long = "iterations", default_value = "3")]
- iterations: usize,
-
- /// Number of partitions to process in parallel
- #[structopt(short = "n", long = "partitions", default_value = "2")]
- partitions: usize,
-
- /// Batch size when reading CSV or Parquet files
- #[structopt(short = "s", long = "batch-size", default_value = "8192")]
- batch_size: usize,
-
- /// Path to data files
- #[structopt(parse(from_os_str), required = true, short = "p", long =
"path")]
- path: PathBuf,
-
- /// File format: `csv` or `parquet`
- #[structopt(short = "f", long = "format", default_value = "csv")]
- file_format: String,
-
- /// Load the data into a MemTable before executing the query
- #[structopt(short = "m", long = "mem-table")]
- mem_table: bool,
-
- /// Path to machine readable output file
- #[structopt(parse(from_os_str), short = "o", long = "output")]
- output_path: Option<PathBuf>,
-
- /// Whether to disable collection of statistics (and cost based
optimizations) or not.
- #[structopt(short = "S", long = "disable-statistics")]
- disable_statistics: bool,
-}
-
-#[derive(Debug, StructOpt)]
-struct ConvertOpt {
- /// Path to csv files
- #[structopt(parse(from_os_str), required = true, short = "i", long =
"input")]
- input_path: PathBuf,
-
- /// Output path
- #[structopt(parse(from_os_str), required = true, short = "o", long =
"output")]
- output_path: PathBuf,
-
- /// Output file format: `csv` or `parquet`
- #[structopt(short = "f", long = "format")]
- file_format: String,
-
- /// Compression to use when writing Parquet files
- #[structopt(short = "c", long = "compression", default_value = "zstd")]
- compression: String,
-
- /// Number of partitions to produce
- #[structopt(short = "n", long = "partitions", default_value = "1")]
- partitions: usize,
-
- /// Batch size when reading CSV or Parquet files
- #[structopt(short = "s", long = "batch-size", default_value = "8192")]
- batch_size: usize,
-}
-
#[derive(Debug, StructOpt)]
#[structopt(about = "benchmark command")]
enum BenchmarkSubCommandOpt {
#[structopt(name = "datafusion")]
- DataFusionBenchmark(DataFusionBenchmarkOpt),
+ DataFusionBenchmark(tpch::RunOpt),
}
#[derive(Debug, StructOpt)]
#[structopt(name = "TPC-H", about = "TPC-H Benchmarks.")]
enum TpchOpt {
Benchmark(BenchmarkSubCommandOpt),
- Convert(ConvertOpt),
+ Convert(tpch::ConvertOpt),
}
+/// 'tpch' entry point, with tortured command line arguments
+///
+/// This is kept to be backwards compatible with the benchmark names prior to
+/// <https://github.com/apache/arrow-datafusion/issues/6994>
#[tokio::main]
async fn main() -> Result<()> {
- use BenchmarkSubCommandOpt::*;
-
env_logger::init();
match TpchOpt::from_args() {
- TpchOpt::Benchmark(DataFusionBenchmark(opt)) => {
- benchmark_datafusion(opt).await.map(|_| ())
- }
- TpchOpt::Convert(opt) => {
- let compression = match opt.compression.as_str() {
- "none" => Compression::UNCOMPRESSED,
- "snappy" => Compression::SNAPPY,
- "brotli" => Compression::BROTLI(Default::default()),
- "gzip" => Compression::GZIP(Default::default()),
- "lz4" => Compression::LZ4,
- "lz0" => Compression::LZO,
- "zstd" => Compression::ZSTD(Default::default()),
- other => {
- return Err(DataFusionError::NotImplemented(format!(
- "Invalid compression format: {other}"
- )));
- }
- };
- convert_tbl(
- opt.input_path.to_str().unwrap(),
- opt.output_path.to_str().unwrap(),
- &opt.file_format,
- opt.partitions,
- opt.batch_size,
- compression,
- )
- .await
+ TpchOpt::Benchmark(BenchmarkSubCommandOpt::DataFusionBenchmark(opt))
=> {
+ opt.run().await
}
+ TpchOpt::Convert(opt) => opt.run().await,
}
}
-
-const TPCH_QUERY_START_ID: usize = 1;
-const TPCH_QUERY_END_ID: usize = 22;
-
-async fn benchmark_datafusion(
- opt: DataFusionBenchmarkOpt,
-) -> Result<Vec<Vec<RecordBatch>>> {
- println!("Running benchmarks with the following options: {opt:?}");
- let query_range = match opt.query {
- Some(query_id) => query_id..=query_id,
- None => TPCH_QUERY_START_ID..=TPCH_QUERY_END_ID,
- };
-
- let mut benchmark_run = BenchmarkRun::new();
- let mut results = vec![];
- for query_id in query_range {
- benchmark_run.start_new_case(&format!("Query {query_id}"));
- let (query_run, result) = benchmark_query(&opt, query_id).await?;
- results.push(result);
- for iter in query_run {
- benchmark_run.write_iter(iter.elapsed, iter.row_count);
- }
- }
- benchmark_run.maybe_write_json(opt.output_path.as_ref())?;
- Ok(results)
-}
-
-async fn benchmark_query(
- opt: &DataFusionBenchmarkOpt,
- query_id: usize,
-) -> Result<(Vec<QueryResult>, Vec<RecordBatch>)> {
- let mut query_results = vec![];
- let config = SessionConfig::new()
- .with_target_partitions(opt.partitions)
- .with_batch_size(opt.batch_size)
- .with_collect_statistics(!opt.disable_statistics);
- let ctx = SessionContext::with_config(config);
-
- // register tables
- register_tables(opt, &ctx).await?;
-
- let mut millis = vec![];
- // run benchmark
- let mut result: Vec<RecordBatch> = Vec::with_capacity(1);
- for i in 0..opt.iterations {
- let start = Instant::now();
-
- let sql = &get_query_sql(query_id)?;
-
- // query 15 is special, with 3 statements. the second statement is the
one from which we
- // want to capture the results
- if query_id == 15 {
- for (n, query) in sql.iter().enumerate() {
- if n == 1 {
- result = execute_query(&ctx, query, opt.debug).await?;
- } else {
- execute_query(&ctx, query, opt.debug).await?;
- }
- }
- } else {
- for query in sql {
- result = execute_query(&ctx, query, opt.debug).await?;
- }
- }
-
- let elapsed = start.elapsed(); //.as_secs_f64() * 1000.0;
- let ms = elapsed.as_secs_f64() * 1000.0;
- millis.push(ms);
- info!("output:\n\n{}\n\n", pretty_format_batches(&result)?);
- let row_count = result.iter().map(|b| b.num_rows()).sum();
- println!(
- "Query {query_id} iteration {i} took {ms:.1} ms and returned
{row_count} rows"
- );
- query_results.push(QueryResult { elapsed, row_count });
- }
-
- let avg = millis.iter().sum::<f64>() / millis.len() as f64;
- println!("Query {query_id} avg time: {avg:.2} ms");
-
- Ok((query_results, result))
-}
-
-async fn register_tables(
- opt: &DataFusionBenchmarkOpt,
- ctx: &SessionContext,
-) -> Result<()> {
- for table in TPCH_TABLES {
- let table_provider = {
- get_table(
- ctx,
- opt.path.to_str().unwrap(),
- table,
- opt.file_format.as_str(),
- opt.partitions,
- )
- .await?
- };
-
- if opt.mem_table {
- println!("Loading table '{table}' into memory");
- let start = Instant::now();
- let memtable =
- MemTable::load(table_provider, Some(opt.partitions),
&ctx.state())
- .await?;
- println!(
- "Loaded table '{}' into memory in {} ms",
- table,
- start.elapsed().as_millis()
- );
- ctx.register_table(*table, Arc::new(memtable))?;
- } else {
- ctx.register_table(*table, table_provider)?;
- }
- }
- Ok(())
-}
-
-async fn execute_query(
- ctx: &SessionContext,
- sql: &str,
- debug: bool,
-) -> Result<Vec<RecordBatch>> {
- let plan = ctx.sql(sql).await?;
- let (state, plan) = plan.into_parts();
-
- if debug {
- println!("=== Logical plan ===\n{plan:?}\n");
- }
-
- let plan = state.optimize(&plan)?;
- if debug {
- println!("=== Optimized logical plan ===\n{plan:?}\n");
- }
- let physical_plan = state.create_physical_plan(&plan).await?;
- if debug {
- println!(
- "=== Physical plan ===\n{}\n",
- displayable(physical_plan.as_ref()).indent(true)
- );
- }
- let result = collect(physical_plan.clone(), state.task_ctx()).await?;
- if debug {
- println!(
- "=== Physical plan with metrics ===\n{}\n",
-
DisplayableExecutionPlan::with_metrics(physical_plan.as_ref()).indent(true)
- );
- if !result.is_empty() {
- // do not call print_batches if there are no batches as the result
is confusing
- // and makes it look like there is a batch with no columns
- pretty::print_batches(&result)?;
- }
- }
- Ok(result)
-}
-
-async fn get_table(
- ctx: &SessionContext,
- path: &str,
- table: &str,
- table_format: &str,
- target_partitions: usize,
-) -> Result<Arc<dyn TableProvider>> {
- // Obtain a snapshot of the SessionState
- let state = ctx.state();
- let (format, path, extension): (Arc<dyn FileFormat>, String, &'static str)
=
- match table_format {
- // dbgen creates .tbl ('|' delimited) files without header
- "tbl" => {
- let path = format!("{path}/{table}.tbl");
-
- let format = CsvFormat::default()
- .with_delimiter(b'|')
- .with_has_header(false);
-
- (Arc::new(format), path, ".tbl")
- }
- "csv" => {
- let path = format!("{path}/{table}");
- let format = CsvFormat::default()
- .with_delimiter(b',')
- .with_has_header(true);
-
- (Arc::new(format), path, DEFAULT_CSV_EXTENSION)
- }
- "parquet" => {
- let path = format!("{path}/{table}");
- let format =
ParquetFormat::default().with_enable_pruning(Some(true));
-
- (Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
- }
- other => {
- unimplemented!("Invalid file format '{}'", other);
- }
- };
-
- let options = ListingOptions::new(format)
- .with_file_extension(extension)
- .with_target_partitions(target_partitions)
- .with_collect_stat(state.config().collect_statistics());
-
- let table_path = ListingTableUrl::parse(path)?;
- let config =
ListingTableConfig::new(table_path).with_listing_options(options);
-
- let config = match table_format {
- "parquet" => config.infer_schema(&state).await?,
- "tbl" =>
config.with_schema(Arc::new(get_tbl_tpch_table_schema(table))),
- "csv" => config.with_schema(Arc::new(get_tpch_table_schema(table))),
- _ => unreachable!(),
- };
-
- Ok(Arc::new(ListingTable::try_new(config)?))
-}
-
-struct QueryResult {
- elapsed: std::time::Duration,
- row_count: usize,
-}
-
-#[cfg(test)]
-#[cfg(feature = "ci")]
-/// CI checks
-mod tests {
- use std::path::Path;
-
- use super::*;
- use datafusion_proto::bytes::{
- logical_plan_from_bytes, logical_plan_to_bytes,
physical_plan_from_bytes,
- physical_plan_to_bytes,
- };
-
- fn get_tpch_data_path() -> Result<String> {
- let path =
- std::env::var("TPCH_DATA").unwrap_or_else(|_|
"benchmarks/data".to_string());
- if !Path::new(&path).exists() {
- return Err(DataFusionError::Execution(format!(
- "Benchmark data not found (set TPCH_DATA env var to override):
{}",
- path
- )));
- }
- Ok(path)
- }
-
- async fn round_trip_logical_plan(query: usize) -> Result<()> {
- let ctx = SessionContext::default();
- let path = get_tpch_data_path()?;
- let opt = DataFusionBenchmarkOpt {
- query: Some(query),
- debug: false,
- iterations: 1,
- partitions: 2,
- batch_size: 8192,
- path: PathBuf::from(path.to_string()),
- file_format: "tbl".to_string(),
- mem_table: false,
- output_path: None,
- disable_statistics: false,
- };
- register_tables(&opt, &ctx).await?;
- let queries = get_query_sql(query)?;
- for query in queries {
- let plan = ctx.sql(&query).await?;
- let plan = plan.into_optimized_plan()?;
- let bytes = logical_plan_to_bytes(&plan)?;
- let plan2 = logical_plan_from_bytes(&bytes, &ctx)?;
- let plan_formatted = format!("{}", plan.display_indent());
- let plan2_formatted = format!("{}", plan2.display_indent());
- assert_eq!(plan_formatted, plan2_formatted);
- }
- Ok(())
- }
-
- async fn round_trip_physical_plan(query: usize) -> Result<()> {
- let ctx = SessionContext::default();
- let path = get_tpch_data_path()?;
- let opt = DataFusionBenchmarkOpt {
- query: Some(query),
- debug: false,
- iterations: 1,
- partitions: 2,
- batch_size: 8192,
- path: PathBuf::from(path.to_string()),
- file_format: "tbl".to_string(),
- mem_table: false,
- output_path: None,
- disable_statistics: false,
- };
- register_tables(&opt, &ctx).await?;
- let queries = get_query_sql(query)?;
- for query in queries {
- let plan = ctx.sql(&query).await?;
- let plan = plan.create_physical_plan().await?;
- let bytes = physical_plan_to_bytes(plan.clone())?;
- let plan2 = physical_plan_from_bytes(&bytes, &ctx)?;
- let plan_formatted = format!("{}",
displayable(plan.as_ref()).indent(false));
- let plan2_formatted =
- format!("{}", displayable(plan2.as_ref()).indent(false));
- assert_eq!(plan_formatted, plan2_formatted);
- }
- Ok(())
- }
-
- macro_rules! test_round_trip_logical {
- ($tn:ident, $query:expr) => {
- #[tokio::test]
- async fn $tn() -> Result<()> {
- round_trip_logical_plan($query).await
- }
- };
- }
-
- macro_rules! test_round_trip_physical {
- ($tn:ident, $query:expr) => {
- #[tokio::test]
- async fn $tn() -> Result<()> {
- round_trip_physical_plan($query).await
- }
- };
- }
-
- // logical plan tests
- test_round_trip_logical!(round_trip_logical_plan_q1, 1);
- test_round_trip_logical!(round_trip_logical_plan_q2, 2);
- test_round_trip_logical!(round_trip_logical_plan_q3, 3);
- test_round_trip_logical!(round_trip_logical_plan_q4, 4);
- test_round_trip_logical!(round_trip_logical_plan_q5, 5);
- test_round_trip_logical!(round_trip_logical_plan_q6, 6);
- test_round_trip_logical!(round_trip_logical_plan_q7, 7);
- test_round_trip_logical!(round_trip_logical_plan_q8, 8);
- test_round_trip_logical!(round_trip_logical_plan_q9, 9);
- test_round_trip_logical!(round_trip_logical_plan_q10, 10);
- test_round_trip_logical!(round_trip_logical_plan_q11, 11);
- test_round_trip_logical!(round_trip_logical_plan_q12, 12);
- test_round_trip_logical!(round_trip_logical_plan_q13, 13);
- test_round_trip_logical!(round_trip_logical_plan_q14, 14);
- test_round_trip_logical!(round_trip_logical_plan_q15, 15);
- test_round_trip_logical!(round_trip_logical_plan_q16, 16);
- test_round_trip_logical!(round_trip_logical_plan_q17, 17);
- test_round_trip_logical!(round_trip_logical_plan_q18, 18);
- test_round_trip_logical!(round_trip_logical_plan_q19, 19);
- test_round_trip_logical!(round_trip_logical_plan_q20, 20);
- test_round_trip_logical!(round_trip_logical_plan_q21, 21);
- test_round_trip_logical!(round_trip_logical_plan_q22, 22);
-
- // physical plan tests
- test_round_trip_physical!(round_trip_physical_plan_q1, 1);
- test_round_trip_physical!(round_trip_physical_plan_q2, 2);
- test_round_trip_physical!(round_trip_physical_plan_q3, 3);
- test_round_trip_physical!(round_trip_physical_plan_q4, 4);
- test_round_trip_physical!(round_trip_physical_plan_q5, 5);
- test_round_trip_physical!(round_trip_physical_plan_q6, 6);
- test_round_trip_physical!(round_trip_physical_plan_q7, 7);
- test_round_trip_physical!(round_trip_physical_plan_q8, 8);
- test_round_trip_physical!(round_trip_physical_plan_q9, 9);
- test_round_trip_physical!(round_trip_physical_plan_q10, 10);
- test_round_trip_physical!(round_trip_physical_plan_q11, 11);
- test_round_trip_physical!(round_trip_physical_plan_q12, 12);
- test_round_trip_physical!(round_trip_physical_plan_q13, 13);
- test_round_trip_physical!(round_trip_physical_plan_q14, 14);
- test_round_trip_physical!(round_trip_physical_plan_q15, 15);
- test_round_trip_physical!(round_trip_physical_plan_q16, 16);
- test_round_trip_physical!(round_trip_physical_plan_q17, 17);
- test_round_trip_physical!(round_trip_physical_plan_q18, 18);
- test_round_trip_physical!(round_trip_physical_plan_q19, 19);
- test_round_trip_physical!(round_trip_physical_plan_q20, 20);
- test_round_trip_physical!(round_trip_physical_plan_q21, 21);
- test_round_trip_physical!(round_trip_physical_plan_q22, 22);
-}
diff --git a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs
index c2f4e876ce..9d5530d31f 100644
--- a/benchmarks/src/lib.rs
+++ b/benchmarks/src/lib.rs
@@ -15,143 +15,28 @@
// specific language governing permissions and limitations
// under the License.
-use datafusion::{error::Result, DATAFUSION_VERSION};
-use serde::{Serialize, Serializer};
-use serde_json::Value;
-use std::{
- collections::HashMap,
- path::Path,
- time::{Duration, SystemTime},
-};
+//! DataFusion benchmark runner
+use datafusion::error::Result;
+use structopt::StructOpt;
+pub mod run;
pub mod tpch;
-fn serialize_start_time<S>(start_time: &SystemTime, ser: S) -> Result<S::Ok,
S::Error>
-where
- S: Serializer,
-{
- ser.serialize_u64(
- start_time
- .duration_since(SystemTime::UNIX_EPOCH)
- .expect("current time is later than UNIX_EPOCH")
- .as_secs(),
- )
-}
-fn serialize_elapsed<S>(elapsed: &Duration, ser: S) -> Result<S::Ok, S::Error>
-where
- S: Serializer,
-{
- let ms = elapsed.as_secs_f64() * 1000.0;
- ser.serialize_f64(ms)
-}
-#[derive(Debug, Serialize)]
-pub struct RunContext {
- /// Benchmark crate version
- pub benchmark_version: String,
- /// DataFusion crate version
- pub datafusion_version: String,
- /// Number of CPU cores
- pub num_cpus: usize,
- /// Start time
- #[serde(serialize_with = "serialize_start_time")]
- pub start_time: SystemTime,
- /// CLI arguments
- pub arguments: Vec<String>,
-}
-
-impl Default for RunContext {
- fn default() -> Self {
- Self::new()
- }
-}
-
-impl RunContext {
- pub fn new() -> Self {
- Self {
- benchmark_version: env!("CARGO_PKG_VERSION").to_owned(),
- datafusion_version: DATAFUSION_VERSION.to_owned(),
- num_cpus: num_cpus::get(),
- start_time: SystemTime::now(),
- arguments: std::env::args().skip(1).collect::<Vec<String>>(),
- }
- }
-}
-
-/// A single iteration of a benchmark query
-#[derive(Debug, Serialize)]
-struct QueryIter {
- #[serde(serialize_with = "serialize_elapsed")]
- elapsed: Duration,
- row_count: usize,
-}
-/// A single benchmark case
-#[derive(Debug, Serialize)]
-pub struct BenchQuery {
- query: String,
- iterations: Vec<QueryIter>,
- #[serde(serialize_with = "serialize_start_time")]
- start_time: SystemTime,
-}
+pub use run::{BenchQuery, BenchmarkRun};
-/// collects benchmark run data and then serializes it at the end
-pub struct BenchmarkRun {
- context: RunContext,
- queries: Vec<BenchQuery>,
- current_case: Option<usize>,
-}
-
-impl Default for BenchmarkRun {
- fn default() -> Self {
- Self::new()
- }
+#[derive(Debug, StructOpt)]
+#[structopt(about = "benchmark command")]
+enum Options {
+ Tpch(tpch::RunOpt),
+ TpchConvert(tpch::ConvertOpt),
}
-impl BenchmarkRun {
- // create new
- pub fn new() -> Self {
- Self {
- context: RunContext::new(),
- queries: vec![],
- current_case: None,
- }
- }
- /// begin a new case. iterations added after this will be included in the
new case
- pub fn start_new_case(&mut self, id: &str) {
- self.queries.push(BenchQuery {
- query: id.to_owned(),
- iterations: vec![],
- start_time: SystemTime::now(),
- });
- if let Some(c) = self.current_case.as_mut() {
- *c += 1;
- } else {
- self.current_case = Some(0);
- }
- }
- /// Write a new iteration to the current case
- pub fn write_iter(&mut self, elapsed: Duration, row_count: usize) {
- if let Some(idx) = self.current_case {
- self.queries[idx]
- .iterations
- .push(QueryIter { elapsed, row_count })
- } else {
- panic!("no cases existed yet");
- }
- }
-
- /// Stringify data into formatted json
- pub fn to_json(&self) -> String {
- let mut output = HashMap::<&str, Value>::new();
- output.insert("context", serde_json::to_value(&self.context).unwrap());
- output.insert("queries", serde_json::to_value(&self.queries).unwrap());
- serde_json::to_string_pretty(&output).unwrap()
- }
+// Main benchmark runner entrypoint
+pub async fn main() -> Result<()> {
+ env_logger::init();
- /// Write data as json into output path if it exists.
- pub fn maybe_write_json(&self, maybe_path: Option<impl AsRef<Path>>) ->
Result<()> {
- if let Some(path) = maybe_path {
- std::fs::write(path, self.to_json())?;
- };
- Ok(())
+ match Options::from_args() {
+ Options::Tpch(opt) => opt.run().await,
+ Options::TpchConvert(opt) => opt.run().await,
}
}
diff --git a/benchmarks/src/lib.rs b/benchmarks/src/run.rs
similarity index 99%
copy from benchmarks/src/lib.rs
copy to benchmarks/src/run.rs
index c2f4e876ce..5ee6691576 100644
--- a/benchmarks/src/lib.rs
+++ b/benchmarks/src/run.rs
@@ -24,8 +24,6 @@ use std::{
time::{Duration, SystemTime},
};
-pub mod tpch;
-
fn serialize_start_time<S>(start_time: &SystemTime, ser: S) -> Result<S::Ok,
S::Error>
where
S: Serializer,
diff --git a/benchmarks/src/tpch/convert.rs b/benchmarks/src/tpch/convert.rs
new file mode 100644
index 0000000000..e076097ad7
--- /dev/null
+++ b/benchmarks/src/tpch/convert.rs
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::path::{Path, PathBuf};
+use std::time::Instant;
+
+use datafusion::error::DataFusionError;
+use datafusion::error::Result;
+use datafusion::prelude::*;
+use parquet::basic::Compression;
+use parquet::file::properties::WriterProperties;
+use structopt::StructOpt;
+
+use super::get_tbl_tpch_table_schema;
+use super::TPCH_TABLES;
+
+/// Convert tpch .slt files to .parquet or .csv files
+#[derive(Debug, StructOpt)]
+pub struct ConvertOpt {
+ /// Path to csv files
+ #[structopt(parse(from_os_str), required = true, short = "i", long =
"input")]
+ input_path: PathBuf,
+
+ /// Output path
+ #[structopt(parse(from_os_str), required = true, short = "o", long =
"output")]
+ output_path: PathBuf,
+
+ /// Output file format: `csv` or `parquet`
+ #[structopt(short = "f", long = "format")]
+ file_format: String,
+
+ /// Compression to use when writing Parquet files
+ #[structopt(short = "c", long = "compression", default_value = "zstd")]
+ compression: String,
+
+ /// Number of partitions to produce
+ #[structopt(short = "n", long = "partitions", default_value = "1")]
+ partitions: usize,
+
+ /// Batch size when reading CSV or Parquet files
+ #[structopt(short = "s", long = "batch-size", default_value = "8192")]
+ batch_size: usize,
+}
+
+impl ConvertOpt {
+ pub async fn run(&self) -> Result<()> {
+ let compression = self.compression()?;
+
+ let input_path = self.input_path.to_str().unwrap();
+ let output_path = self.output_path.to_str().unwrap();
+
+ let output_root_path = Path::new(output_path);
+ for table in TPCH_TABLES {
+ let start = Instant::now();
+ let schema = get_tbl_tpch_table_schema(table);
+
+ let input_path = format!("{input_path}/{table}.tbl");
+ let options = CsvReadOptions::new()
+ .schema(&schema)
+ .has_header(false)
+ .delimiter(b'|')
+ .file_extension(".tbl");
+
+ let config = SessionConfig::new().with_batch_size(self.batch_size);
+ let ctx = SessionContext::with_config(config);
+
+ // build plan to read the TBL file
+ let mut csv = ctx.read_csv(&input_path, options).await?;
+
+ // Select all apart from the padding column
+ let selection = csv
+ .schema()
+ .fields()
+ .iter()
+ .take(schema.fields.len() - 1)
+ .map(|d| Expr::Column(d.qualified_column()))
+ .collect();
+
+ csv = csv.select(selection)?;
+ // optionally, repartition the file
+ let partitions = self.partitions;
+ if partitions > 1 {
+ csv =
csv.repartition(Partitioning::RoundRobinBatch(partitions))?
+ }
+
+ // create the physical plan
+ let csv = csv.create_physical_plan().await?;
+
+ let output_path = output_root_path.join(table);
+ let output_path = output_path.to_str().unwrap().to_owned();
+
+ println!(
+ "Converting '{}' to {} files in directory '{}'",
+ &input_path, self.file_format, &output_path
+ );
+ match self.file_format.as_str() {
+ "csv" => ctx.write_csv(csv, output_path).await?,
+ "parquet" => {
+ let props = WriterProperties::builder()
+ .set_compression(compression)
+ .build();
+ ctx.write_parquet(csv, output_path, Some(props)).await?
+ }
+ other => {
+ return Err(DataFusionError::NotImplemented(format!(
+ "Invalid output format: {other}"
+ )));
+ }
+ }
+ println!("Conversion completed in {} ms",
start.elapsed().as_millis());
+ }
+
+ Ok(())
+ }
+
+ /// return the compression method to use when writing parquet
+ fn compression(&self) -> Result<Compression> {
+ Ok(match self.compression.as_str() {
+ "none" => Compression::UNCOMPRESSED,
+ "snappy" => Compression::SNAPPY,
+ "brotli" => Compression::BROTLI(Default::default()),
+ "gzip" => Compression::GZIP(Default::default()),
+ "lz4" => Compression::LZ4,
+ "lz0" => Compression::LZO,
+ "zstd" => Compression::ZSTD(Default::default()),
+ other => {
+ return Err(DataFusionError::NotImplemented(format!(
+ "Invalid compression format: {other}"
+ )));
+ }
+ })
+ }
+}
diff --git a/benchmarks/src/tpch.rs b/benchmarks/src/tpch/mod.rs
similarity index 74%
rename from benchmarks/src/tpch.rs
rename to benchmarks/src/tpch/mod.rs
index 58b9c3637c..1b6350278d 100644
--- a/benchmarks/src/tpch.rs
+++ b/benchmarks/src/tpch/mod.rs
@@ -15,18 +15,20 @@
// specific language governing permissions and limitations
// under the License.
-use arrow::datatypes::SchemaBuilder;
-use std::fs;
-use std::path::Path;
-use std::time::Instant;
+//! Benchmark derived from TPC-H. This is not an official TPC-H benchmark.
-use datafusion::prelude::*;
+use arrow::datatypes::SchemaBuilder;
use datafusion::{
arrow::datatypes::{DataType, Field, Schema},
error::{DataFusionError, Result},
};
-use parquet::basic::Compression;
-use parquet::file::properties::WriterProperties;
+use std::fs;
+
+mod run;
+pub use run::RunOpt;
+
+mod convert;
+pub use convert::ConvertOpt;
pub const TPCH_TABLES: &[&str] = &[
"part", "supplier", "partsupp", "customer", "orders", "lineitem",
"nation", "region",
@@ -166,78 +168,6 @@ pub fn get_query_sql(query: usize) -> Result<Vec<String>> {
}
}
-/// Conver tbl (csv) file to parquet
-pub async fn convert_tbl(
- input_path: &str,
- output_path: &str,
- file_format: &str,
- partitions: usize,
- batch_size: usize,
- compression: Compression,
-) -> Result<()> {
- let output_root_path = Path::new(output_path);
- for table in TPCH_TABLES {
- let start = Instant::now();
- let schema = get_tbl_tpch_table_schema(table);
-
- let input_path = format!("{input_path}/{table}.tbl");
- let options = CsvReadOptions::new()
- .schema(&schema)
- .has_header(false)
- .delimiter(b'|')
- .file_extension(".tbl");
-
- let config = SessionConfig::new().with_batch_size(batch_size);
- let ctx = SessionContext::with_config(config);
-
- // build plan to read the TBL file
- let mut csv = ctx.read_csv(&input_path, options).await?;
-
- // Select all apart from the padding column
- let selection = csv
- .schema()
- .fields()
- .iter()
- .take(schema.fields.len() - 1)
- .map(|d| Expr::Column(d.qualified_column()))
- .collect();
-
- csv = csv.select(selection)?;
- // optionally, repartition the file
- if partitions > 1 {
- csv = csv.repartition(Partitioning::RoundRobinBatch(partitions))?
- }
-
- // create the physical plan
- let csv = csv.create_physical_plan().await?;
-
- let output_path = output_root_path.join(table);
- let output_path = output_path.to_str().unwrap().to_owned();
-
- println!(
- "Converting '{}' to {} files in directory '{}'",
- &input_path, &file_format, &output_path
- );
- match file_format {
- "csv" => ctx.write_csv(csv, output_path).await?,
- "parquet" => {
- let props = WriterProperties::builder()
- .set_compression(compression)
- .build();
- ctx.write_parquet(csv, output_path, Some(props)).await?
- }
- other => {
- return Err(DataFusionError::NotImplemented(format!(
- "Invalid output format: {other}"
- )));
- }
- }
- println!("Conversion completed in {} ms", start.elapsed().as_millis());
- }
-
- Ok(())
-}
-
pub const QUERY_LIMIT: [Option<usize>; 22] = [
None,
Some(100),
diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs
new file mode 100644
index 0000000000..6aada30bc7
--- /dev/null
+++ b/benchmarks/src/tpch/run.rs
@@ -0,0 +1,442 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::get_query_sql;
+use crate::BenchmarkRun;
+use arrow::record_batch::RecordBatch;
+use arrow::util::pretty::{self, pretty_format_batches};
+use datafusion::datasource::file_format::csv::{CsvFormat, DEFAULT_CSV_EXTENSION};
+use datafusion::datasource::file_format::parquet::{
+ ParquetFormat, DEFAULT_PARQUET_EXTENSION,
+};
+use datafusion::datasource::file_format::FileFormat;
+use datafusion::datasource::listing::{
+ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
+};
+use datafusion::datasource::{MemTable, TableProvider};
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use datafusion::physical_plan::{collect, displayable};
+use log::info;
+
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::Instant;
+
+use datafusion::error::Result;
+use datafusion::prelude::*;
+use structopt::StructOpt;
+
+use super::{get_tbl_tpch_table_schema, get_tpch_table_schema, TPCH_TABLES};
+
+/// Run the tpch benchmark
+#[derive(Debug, StructOpt, Clone)]
+pub struct RunOpt {
+ /// Query number. If not specified, runs all queries
+ #[structopt(short, long)]
+ query: Option<usize>,
+
+ /// Activate debug mode to see query results
+ #[structopt(short, long)]
+ debug: bool,
+
+ /// Number of iterations of each test run
+ #[structopt(short = "i", long = "iterations", default_value = "3")]
+ iterations: usize,
+
+ /// Number of partitions to process in parallel
+ #[structopt(short = "n", long = "partitions", default_value = "2")]
+ partitions: usize,
+
+ /// Batch size when reading CSV or Parquet files
+ #[structopt(short = "s", long = "batch-size", default_value = "8192")]
+ batch_size: usize,
+
+ /// Path to data files
+ #[structopt(parse(from_os_str), required = true, short = "p", long = "path")]
+ path: PathBuf,
+
+ /// File format: `csv` or `parquet`
+ #[structopt(short = "f", long = "format", default_value = "csv")]
+ file_format: String,
+
+ /// Load the data into a MemTable before executing the query
+ #[structopt(short = "m", long = "mem-table")]
+ mem_table: bool,
+
+ /// Path to machine readable output file
+ #[structopt(parse(from_os_str), short = "o", long = "output")]
+ output_path: Option<PathBuf>,
+
+ /// Whether to disable collection of statistics (and cost based optimizations) or not.
+ #[structopt(short = "S", long = "disable-statistics")]
+ disable_statistics: bool,
+}
+
+const TPCH_QUERY_START_ID: usize = 1;
+const TPCH_QUERY_END_ID: usize = 22;
+
+impl RunOpt {
+ pub async fn run(&self) -> Result<()> {
+ println!("Running benchmarks with the following options: {self:?}");
+ let query_range = match self.query {
+ Some(query_id) => query_id..=query_id,
+ None => TPCH_QUERY_START_ID..=TPCH_QUERY_END_ID,
+ };
+
+ let mut benchmark_run = BenchmarkRun::new();
+ for query_id in query_range {
+ benchmark_run.start_new_case(&format!("Query {query_id}"));
+ let query_run = self.benchmark_query(query_id).await?;
+ for iter in query_run {
+ benchmark_run.write_iter(iter.elapsed, iter.row_count);
+ }
+ }
+ benchmark_run.maybe_write_json(self.output_path.as_ref())?;
+ Ok(())
+ }
+
+ async fn benchmark_query(&self, query_id: usize) -> Result<Vec<QueryResult>> {
+ let config = SessionConfig::new()
+ .with_target_partitions(self.partitions)
+ .with_batch_size(self.batch_size)
+ .with_collect_statistics(!self.disable_statistics);
+ let ctx = SessionContext::with_config(config);
+
+ // register tables
+ self.register_tables(&ctx).await?;
+
+ let mut millis = vec![];
+ // run benchmark
+ let mut query_results = vec![];
+ for i in 0..self.iterations {
+ let start = Instant::now();
+
+ let sql = &get_query_sql(query_id)?;
+
+ // query 15 is special, with 3 statements. the second statement is the one from which we
+ // want to capture the results
+ let mut result = vec![];
+ if query_id == 15 {
+ for (n, query) in sql.iter().enumerate() {
+ if n == 1 {
+ result = self.execute_query(&ctx, query).await?;
+ } else {
+ self.execute_query(&ctx, query).await?;
+ }
+ }
+ } else {
+ for query in sql {
+ result = self.execute_query(&ctx, query).await?;
+ }
+ }
+
+ let elapsed = start.elapsed(); //.as_secs_f64() * 1000.0;
+ let ms = elapsed.as_secs_f64() * 1000.0;
+ millis.push(ms);
+ info!("output:\n\n{}\n\n", pretty_format_batches(&result)?);
+ let row_count = result.iter().map(|b| b.num_rows()).sum();
+ println!(
+ "Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows"
+ );
+ query_results.push(QueryResult { elapsed, row_count });
+ }
+
+ let avg = millis.iter().sum::<f64>() / millis.len() as f64;
+ println!("Query {query_id} avg time: {avg:.2} ms");
+
+ Ok(query_results)
+ }
+
+ async fn register_tables(&self, ctx: &SessionContext) -> Result<()> {
+ for table in TPCH_TABLES {
+ let table_provider = { self.get_table(ctx, table).await? };
+
+ if self.mem_table {
+ println!("Loading table '{table}' into memory");
+ let start = Instant::now();
+ let memtable =
+ MemTable::load(table_provider, Some(self.partitions), &ctx.state())
+ .await?;
+ println!(
+ "Loaded table '{}' into memory in {} ms",
+ table,
+ start.elapsed().as_millis()
+ );
+ ctx.register_table(*table, Arc::new(memtable))?;
+ } else {
+ ctx.register_table(*table, table_provider)?;
+ }
+ }
+ Ok(())
+ }
+
+ async fn execute_query(
+ &self,
+ ctx: &SessionContext,
+ sql: &str,
+ ) -> Result<Vec<RecordBatch>> {
+ let debug = self.debug;
+ let plan = ctx.sql(sql).await?;
+ let (state, plan) = plan.into_parts();
+
+ if debug {
+ println!("=== Logical plan ===\n{plan:?}\n");
+ }
+
+ let plan = state.optimize(&plan)?;
+ if debug {
+ println!("=== Optimized logical plan ===\n{plan:?}\n");
+ }
+ let physical_plan = state.create_physical_plan(&plan).await?;
+ if debug {
+ println!(
+ "=== Physical plan ===\n{}\n",
+ displayable(physical_plan.as_ref()).indent(true)
+ );
+ }
+ let result = collect(physical_plan.clone(), state.task_ctx()).await?;
+ if debug {
+ println!(
+ "=== Physical plan with metrics ===\n{}\n",
+ DisplayableExecutionPlan::with_metrics(physical_plan.as_ref())
+ .indent(true)
+ );
+ if !result.is_empty() {
+ // do not call print_batches if there are no batches as the result is confusing
+ // and makes it look like there is a batch with no columns
+ pretty::print_batches(&result)?;
+ }
+ }
+ Ok(result)
+ }
+
+ async fn get_table(
+ &self,
+ ctx: &SessionContext,
+ table: &str,
+ ) -> Result<Arc<dyn TableProvider>> {
+ let path = self.path.to_str().unwrap();
+ let table_format = self.file_format.as_str();
+ let target_partitions = self.partitions;
+
+ // Obtain a snapshot of the SessionState
+ let state = ctx.state();
+ let (format, path, extension): (Arc<dyn FileFormat>, String, &'static str) =
+ match table_format {
+ // dbgen creates .tbl ('|' delimited) files without header
+ "tbl" => {
+ let path = format!("{path}/{table}.tbl");
+
+ let format = CsvFormat::default()
+ .with_delimiter(b'|')
+ .with_has_header(false);
+
+ (Arc::new(format), path, ".tbl")
+ }
+ "csv" => {
+ let path = format!("{path}/{table}");
+ let format = CsvFormat::default()
+ .with_delimiter(b',')
+ .with_has_header(true);
+
+ (Arc::new(format), path, DEFAULT_CSV_EXTENSION)
+ }
+ "parquet" => {
+ let path = format!("{path}/{table}");
+ let format = ParquetFormat::default().with_enable_pruning(Some(true));
+
+ (Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
+ }
+ other => {
+ unimplemented!("Invalid file format '{}'", other);
+ }
+ };
+
+ let options = ListingOptions::new(format)
+ .with_file_extension(extension)
+ .with_target_partitions(target_partitions)
+ .with_collect_stat(state.config().collect_statistics());
+
+ let table_path = ListingTableUrl::parse(path)?;
+ let config = ListingTableConfig::new(table_path).with_listing_options(options);
+
+ let config = match table_format {
+ "parquet" => config.infer_schema(&state).await?,
+ "tbl" => config.with_schema(Arc::new(get_tbl_tpch_table_schema(table))),
+ "csv" => config.with_schema(Arc::new(get_tpch_table_schema(table))),
+ _ => unreachable!(),
+ };
+
+ Ok(Arc::new(ListingTable::try_new(config)?))
+ }
+}
+
+struct QueryResult {
+ elapsed: std::time::Duration,
+ row_count: usize,
+}
+
+#[cfg(test)]
+// Only run with "ci" mode when we have the data
+#[cfg(feature = "ci")]
+mod tests {
+ use super::*;
+ use datafusion::error::{DataFusionError, Result};
+ use std::path::Path;
+
+ use datafusion_proto::bytes::{
+ logical_plan_from_bytes, logical_plan_to_bytes, physical_plan_from_bytes,
+ physical_plan_to_bytes,
+ };
+
+ fn get_tpch_data_path() -> Result<String> {
+ let path =
+ std::env::var("TPCH_DATA").unwrap_or_else(|_| "benchmarks/data".to_string());
+ if !Path::new(&path).exists() {
+ return Err(DataFusionError::Execution(format!(
+ "Benchmark data not found (set TPCH_DATA env var to override): {}",
+ path
+ )));
+ }
+ Ok(path)
+ }
+
+ async fn round_trip_logical_plan(query: usize) -> Result<()> {
+ let ctx = SessionContext::default();
+ let path = get_tpch_data_path()?;
+ let opt = RunOpt {
+ query: Some(query),
+ debug: false,
+ iterations: 1,
+ partitions: 2,
+ batch_size: 8192,
+ path: PathBuf::from(path.to_string()),
+ file_format: "tbl".to_string(),
+ mem_table: false,
+ output_path: None,
+ disable_statistics: false,
+ };
+ opt.register_tables(&ctx).await?;
+ let queries = get_query_sql(query)?;
+ for query in queries {
+ let plan = ctx.sql(&query).await?;
+ let plan = plan.into_optimized_plan()?;
+ let bytes = logical_plan_to_bytes(&plan)?;
+ let plan2 = logical_plan_from_bytes(&bytes, &ctx)?;
+ let plan_formatted = format!("{}", plan.display_indent());
+ let plan2_formatted = format!("{}", plan2.display_indent());
+ assert_eq!(plan_formatted, plan2_formatted);
+ }
+ Ok(())
+ }
+
+ async fn round_trip_physical_plan(query: usize) -> Result<()> {
+ let ctx = SessionContext::default();
+ let path = get_tpch_data_path()?;
+ let opt = RunOpt {
+ query: Some(query),
+ debug: false,
+ iterations: 1,
+ partitions: 2,
+ batch_size: 8192,
+ path: PathBuf::from(path.to_string()),
+ file_format: "tbl".to_string(),
+ mem_table: false,
+ output_path: None,
+ disable_statistics: false,
+ };
+ opt.register_tables(&ctx).await?;
+ let queries = get_query_sql(query)?;
+ for query in queries {
+ let plan = ctx.sql(&query).await?;
+ let plan = plan.create_physical_plan().await?;
+ let bytes = physical_plan_to_bytes(plan.clone())?;
+ let plan2 = physical_plan_from_bytes(&bytes, &ctx)?;
+ let plan_formatted = format!("{}", displayable(plan.as_ref()).indent(false));
+ let plan2_formatted = format!("{}", displayable(plan2.as_ref()).indent(false));
+ assert_eq!(plan_formatted, plan2_formatted);
+ }
+ Ok(())
+ }
+
+ macro_rules! test_round_trip_logical {
+ ($tn:ident, $query:expr) => {
+ #[tokio::test]
+ async fn $tn() -> Result<()> {
+ round_trip_logical_plan($query).await
+ }
+ };
+ }
+
+ macro_rules! test_round_trip_physical {
+ ($tn:ident, $query:expr) => {
+ #[tokio::test]
+ async fn $tn() -> Result<()> {
+ round_trip_physical_plan($query).await
+ }
+ };
+ }
+
+ // logical plan tests
+ test_round_trip_logical!(round_trip_logical_plan_q1, 1);
+ test_round_trip_logical!(round_trip_logical_plan_q2, 2);
+ test_round_trip_logical!(round_trip_logical_plan_q3, 3);
+ test_round_trip_logical!(round_trip_logical_plan_q4, 4);
+ test_round_trip_logical!(round_trip_logical_plan_q5, 5);
+ test_round_trip_logical!(round_trip_logical_plan_q6, 6);
+ test_round_trip_logical!(round_trip_logical_plan_q7, 7);
+ test_round_trip_logical!(round_trip_logical_plan_q8, 8);
+ test_round_trip_logical!(round_trip_logical_plan_q9, 9);
+ test_round_trip_logical!(round_trip_logical_plan_q10, 10);
+ test_round_trip_logical!(round_trip_logical_plan_q11, 11);
+ test_round_trip_logical!(round_trip_logical_plan_q12, 12);
+ test_round_trip_logical!(round_trip_logical_plan_q13, 13);
+ test_round_trip_logical!(round_trip_logical_plan_q14, 14);
+ test_round_trip_logical!(round_trip_logical_plan_q15, 15);
+ test_round_trip_logical!(round_trip_logical_plan_q16, 16);
+ test_round_trip_logical!(round_trip_logical_plan_q17, 17);
+ test_round_trip_logical!(round_trip_logical_plan_q18, 18);
+ test_round_trip_logical!(round_trip_logical_plan_q19, 19);
+ test_round_trip_logical!(round_trip_logical_plan_q20, 20);
+ test_round_trip_logical!(round_trip_logical_plan_q21, 21);
+ test_round_trip_logical!(round_trip_logical_plan_q22, 22);
+
+ // physical plan tests
+ test_round_trip_physical!(round_trip_physical_plan_q1, 1);
+ test_round_trip_physical!(round_trip_physical_plan_q2, 2);
+ test_round_trip_physical!(round_trip_physical_plan_q3, 3);
+ test_round_trip_physical!(round_trip_physical_plan_q4, 4);
+ test_round_trip_physical!(round_trip_physical_plan_q5, 5);
+ test_round_trip_physical!(round_trip_physical_plan_q6, 6);
+ test_round_trip_physical!(round_trip_physical_plan_q7, 7);
+ test_round_trip_physical!(round_trip_physical_plan_q8, 8);
+ test_round_trip_physical!(round_trip_physical_plan_q9, 9);
+ test_round_trip_physical!(round_trip_physical_plan_q10, 10);
+ test_round_trip_physical!(round_trip_physical_plan_q11, 11);
+ test_round_trip_physical!(round_trip_physical_plan_q12, 12);
+ test_round_trip_physical!(round_trip_physical_plan_q13, 13);
+ test_round_trip_physical!(round_trip_physical_plan_q14, 14);
+ test_round_trip_physical!(round_trip_physical_plan_q15, 15);
+ test_round_trip_physical!(round_trip_physical_plan_q16, 16);
+ test_round_trip_physical!(round_trip_physical_plan_q17, 17);
+ test_round_trip_physical!(round_trip_physical_plan_q18, 18);
+ test_round_trip_physical!(round_trip_physical_plan_q19, 19);
+ test_round_trip_physical!(round_trip_physical_plan_q20, 20);
+ test_round_trip_physical!(round_trip_physical_plan_q21, 21);
+ test_round_trip_physical!(round_trip_physical_plan_q22, 22);
+}