This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new a7e71a71fb perf: Add TopK benchmarks as variation over the `sort_tpch`
benchmarks (#15560)
a7e71a71fb is described below
commit a7e71a71fba59a3a20c619b51b5fbec565bf0dcc
Author: Geoffrey Claude <[email protected]>
AuthorDate: Fri Apr 4 18:25:39 2025 +0200
perf: Add TopK benchmarks as variation over the `sort_tpch` benchmarks
(#15560)
* perf: Add TopK benchmarks as variation over the `sort_tpch` benchmarks
* doc: Document TopK benchmark options: `--sorted` and `--limit`
---
benchmarks/README.md | 25 ++++++++++++++++++++++---
benchmarks/src/sort_tpch.rs | 34 +++++++++++++++++++++++++++++-----
benchmarks/src/tpch/convert.rs | 22 +++++++++++++++++++---
benchmarks/src/tpch/run.rs | 31 +++++++++++++++++++++++--------
4 files changed, 93 insertions(+), 19 deletions(-)
diff --git a/benchmarks/README.md b/benchmarks/README.md
index 8acaa298bd..86b2e1b3b9 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -200,6 +200,16 @@ cargo run --release --bin tpch -- convert --input ./data
--output /mnt/tpch-parq
Or if you want to verify and run all the queries in the benchmark, you can
just run `cargo test`.
+#### Sorted Conversion
+
+The TPCH tables generated by the dbgen utility are sorted by their first
column (their primary key for most tables, the `l_orderkey` column for the
`lineitem` table).
+
+To preserve this sorted order information during conversion (useful for
benchmarking execution on pre-sorted data), include the `--sort` flag:
+
+```bash
+cargo run --release --bin tpch -- convert --input ./data --output
/mnt/tpch-sorted-parquet --format parquet --sort
+```
+
### Comparing results between runs
Any `dfbench` execution with `-o <dir>` argument will produce a
@@ -445,20 +455,29 @@ Test performance of end-to-end sort SQL queries. (While
the `Sort` benchmark foc
Sort integration benchmark runs whole table sort queries on TPCH `lineitem`
table, with different characteristics. For example, different number of sort
keys, different sort key cardinality, different number of payload columns, etc.
+If the TPCH tables have been converted as sorted on their first column (see
[Sorted Conversion](#sorted-conversion)), you can use the `--sorted` flag to
indicate that the input data is pre-sorted, allowing DataFusion to leverage
that order during query execution.
+
+Additionally, an optional `--limit` flag is available for the sort benchmark.
When specified, this flag appends a `LIMIT n` clause to the SQL query,
effectively converting the query into a TopK query. Combining the `--sorted`
and `--limit` options enables benchmarking of TopK queries on pre-sorted inputs.
+
See [`sort_tpch.rs`](src/sort_tpch.rs) for more details.
### Sort TPCH Benchmark Example Runs
1. Run all queries with default setting:
```bash
- cargo run --release --bin dfbench -- sort-tpch -p
'....../datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_tpch.json'
+ cargo run --release --bin dfbench -- sort-tpch -p
'./datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_tpch.json'
```
2. Run a specific query:
```bash
- cargo run --release --bin dfbench -- sort-tpch -p
'....../datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_tpch.json' --query 2
+ cargo run --release --bin dfbench -- sort-tpch -p
'./datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_tpch.json' --query 2
```
-3. Run all queries with `bench.sh` script:
+3. Run all queries as TopK queries on presorted data:
+```bash
+ cargo run --release --bin dfbench -- sort-tpch --sorted --limit 10 -p
'./datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_tpch.json'
+```
+
+4. Run all queries with `bench.sh` script:
```bash
./bench.sh run sort_tpch
```
diff --git a/benchmarks/src/sort_tpch.rs b/benchmarks/src/sort_tpch.rs
index 956bb92b6c..176234eca5 100644
--- a/benchmarks/src/sort_tpch.rs
+++ b/benchmarks/src/sort_tpch.rs
@@ -63,6 +63,15 @@ pub struct RunOpt {
/// Load the data into a MemTable before executing the query
#[structopt(short = "m", long = "mem-table")]
mem_table: bool,
+
+ /// Mark the first column of each table as sorted in ascending order.
+ /// The tables should have been created with the `--sort` option for this
to have any effect.
+ #[structopt(short = "t", long = "sorted")]
+ sorted: bool,
+
+ /// Append a `LIMIT n` clause to the query
+ #[structopt(short = "l", long = "limit")]
+ limit: Option<usize>,
}
struct QueryResult {
@@ -163,7 +172,7 @@ impl RunOpt {
r#"
SELECT l_shipmode, l_comment, l_partkey
FROM lineitem
- ORDER BY l_shipmode;
+ ORDER BY l_shipmode
"#,
];
@@ -212,9 +221,14 @@ impl RunOpt {
let start = Instant::now();
let query_idx = query_id - 1; // 1-indexed -> 0-indexed
- let sql = Self::SORT_QUERIES[query_idx];
+ let base_sql = Self::SORT_QUERIES[query_idx].to_string();
+ let sql = if let Some(limit) = self.limit {
+ format!("{base_sql} LIMIT {limit}")
+ } else {
+ base_sql
+ };
- let row_count = self.execute_query(&ctx, sql).await?;
+ let row_count = self.execute_query(&ctx, sql.as_str()).await?;
let elapsed = start.elapsed(); //.as_secs_f64() * 1000.0;
let ms = elapsed.as_secs_f64() * 1000.0;
@@ -315,8 +329,18 @@ impl RunOpt {
.with_collect_stat(state.config().collect_statistics());
let table_path = ListingTableUrl::parse(path)?;
- let config =
ListingTableConfig::new(table_path).with_listing_options(options);
- let config = config.infer_schema(&state).await?;
+ let schema = options.infer_schema(&state, &table_path).await?;
+ let options = if self.sorted {
+ let key_column_name = schema.fields()[0].name();
+ options
+
.with_file_sort_order(vec![vec![col(key_column_name).sort(true, false)]])
+ } else {
+ options
+ };
+
+ let config = ListingTableConfig::new(table_path)
+ .with_listing_options(options)
+ .with_schema(schema);
Ok(Arc::new(ListingTable::try_new(config)?))
}
diff --git a/benchmarks/src/tpch/convert.rs b/benchmarks/src/tpch/convert.rs
index 7f391d9300..5219e09cd3 100644
--- a/benchmarks/src/tpch/convert.rs
+++ b/benchmarks/src/tpch/convert.rs
@@ -22,15 +22,14 @@ use std::path::{Path, PathBuf};
use datafusion::common::not_impl_err;
+use super::get_tbl_tpch_table_schema;
+use super::TPCH_TABLES;
use datafusion::error::Result;
use datafusion::prelude::*;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use structopt::StructOpt;
-use super::get_tbl_tpch_table_schema;
-use super::TPCH_TABLES;
-
/// Convert tpch .tbl files to .parquet or .csv files
#[derive(Debug, StructOpt)]
pub struct ConvertOpt {
@@ -57,6 +56,10 @@ pub struct ConvertOpt {
/// Batch size when reading CSV or Parquet files
#[structopt(short = "s", long = "batch-size", default_value = "8192")]
batch_size: usize,
+
+ /// Sort each table by its first column in ascending order.
+ #[structopt(short = "t", long = "sort")]
+ sort: bool,
}
impl ConvertOpt {
@@ -70,6 +73,7 @@ impl ConvertOpt {
for table in TPCH_TABLES {
let start = Instant::now();
let schema = get_tbl_tpch_table_schema(table);
+ let key_column_name = schema.fields()[0].name();
let input_path = format!("{input_path}/{table}.tbl");
let options = CsvReadOptions::new()
@@ -77,6 +81,13 @@ impl ConvertOpt {
.has_header(false)
.delimiter(b'|')
.file_extension(".tbl");
+ let options = if self.sort {
+                // indicate that the file is already sorted by its first
column to speed up the conversion
+ options
+ .file_sort_order(vec![vec![col(key_column_name).sort(true,
false)]])
+ } else {
+ options
+ };
let config = SessionConfig::new().with_batch_size(self.batch_size);
let ctx = SessionContext::new_with_config(config);
@@ -99,6 +110,11 @@ impl ConvertOpt {
if partitions > 1 {
csv =
csv.repartition(Partitioning::RoundRobinBatch(partitions))?
}
+ let csv = if self.sort {
+ csv.sort_by(vec![col(key_column_name)])?
+ } else {
+ csv
+ };
// create the physical plan
let csv = csv.create_physical_plan().await?;
diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs
index eb9db821db..752a5a1a6b 100644
--- a/benchmarks/src/tpch/run.rs
+++ b/benchmarks/src/tpch/run.rs
@@ -90,6 +90,11 @@ pub struct RunOpt {
/// True by default.
#[structopt(short = "j", long = "prefer_hash_join", default_value =
"true")]
prefer_hash_join: BoolDefaultTrue,
+
+ /// Mark the first column of each table as sorted in ascending order.
+ /// The tables should have been created with the `--sort` option for this
to have any effect.
+ #[structopt(short = "t", long = "sorted")]
+ sorted: bool,
}
const TPCH_QUERY_START_ID: usize = 1;
@@ -275,20 +280,28 @@ impl RunOpt {
}
};
+ let table_path = ListingTableUrl::parse(path)?;
let options = ListingOptions::new(format)
.with_file_extension(extension)
.with_target_partitions(target_partitions)
.with_collect_stat(state.config().collect_statistics());
-
- let table_path = ListingTableUrl::parse(path)?;
- let config =
ListingTableConfig::new(table_path).with_listing_options(options);
-
- let config = match table_format {
- "parquet" => config.infer_schema(&state).await?,
- "tbl" =>
config.with_schema(Arc::new(get_tbl_tpch_table_schema(table))),
- "csv" =>
config.with_schema(Arc::new(get_tpch_table_schema(table))),
+ let schema = match table_format {
+ "parquet" => options.infer_schema(&state, &table_path).await?,
+ "tbl" => Arc::new(get_tbl_tpch_table_schema(table)),
+ "csv" => Arc::new(get_tpch_table_schema(table)),
_ => unreachable!(),
};
+ let options = if self.sorted {
+ let key_column_name = schema.fields()[0].name();
+ options
+
.with_file_sort_order(vec![vec![col(key_column_name).sort(true, false)]])
+ } else {
+ options
+ };
+
+ let config = ListingTableConfig::new(table_path)
+ .with_listing_options(options)
+ .with_schema(schema);
Ok(Arc::new(ListingTable::try_new(config)?))
}
@@ -357,6 +370,7 @@ mod tests {
output_path: None,
disable_statistics: false,
prefer_hash_join: true,
+ sorted: false,
};
opt.register_tables(&ctx).await?;
let queries = get_query_sql(query)?;
@@ -393,6 +407,7 @@ mod tests {
output_path: None,
disable_statistics: false,
prefer_hash_join: true,
+ sorted: false,
};
opt.register_tables(&ctx).await?;
let queries = get_query_sql(query)?;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]