This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a7e71a71fb perf: Add TopK benchmarks as variation over the `sort_tpch` 
benchmarks (#15560)
a7e71a71fb is described below

commit a7e71a71fba59a3a20c619b51b5fbec565bf0dcc
Author: Geoffrey Claude <[email protected]>
AuthorDate: Fri Apr 4 18:25:39 2025 +0200

    perf: Add TopK benchmarks as variation over the `sort_tpch` benchmarks 
(#15560)
    
    * perf: Add TopK benchmarks as variation over the `sort_tpch` benchmarks
    
    * doc: Document TopK benchmark options: `--sorted` and `--limit`
---
 benchmarks/README.md           | 25 ++++++++++++++++++++++---
 benchmarks/src/sort_tpch.rs    | 34 +++++++++++++++++++++++++++++-----
 benchmarks/src/tpch/convert.rs | 22 +++++++++++++++++++---
 benchmarks/src/tpch/run.rs     | 31 +++++++++++++++++++++++--------
 4 files changed, 93 insertions(+), 19 deletions(-)

diff --git a/benchmarks/README.md b/benchmarks/README.md
index 8acaa298bd..86b2e1b3b9 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -200,6 +200,16 @@ cargo run --release --bin tpch -- convert --input ./data 
--output /mnt/tpch-parq
 
 Or if you want to verify and run all the queries in the benchmark, you can 
just run `cargo test`.
 
+#### Sorted Conversion
+
+The TPCH tables generated by the dbgen utility are sorted by their first 
column (their primary key for most tables, and the `l_orderkey` column for the 
`lineitem` table).
+
+To preserve this sorted order information during conversion (useful for 
benchmarking execution on pre-sorted data), include the `--sort` flag:
+
+```bash
+cargo run --release --bin tpch -- convert --input ./data --output 
/mnt/tpch-sorted-parquet --format parquet --sort
+```
+
 ### Comparing results between runs
 
 Any `dfbench` execution with `-o <dir>` argument will produce a
@@ -445,20 +455,29 @@ Test performance of end-to-end sort SQL queries. (While 
the `Sort` benchmark foc
 
 Sort integration benchmark runs whole table sort queries on TPCH `lineitem` 
table, with different characteristics. For example, different number of sort 
keys, different sort key cardinality, different number of payload columns, etc.
 
+If the TPCH tables have been converted as sorted on their first column (see 
[Sorted Conversion](#sorted-conversion)), you can use the `--sorted` flag to 
indicate that the input data is pre-sorted, allowing DataFusion to leverage 
that order during query execution.
+
+Additionally, an optional `--limit` flag is available for the sort benchmark. 
When specified, this flag appends a `LIMIT n` clause to the SQL query, 
effectively converting the query into a TopK query. Combining the `--sorted` 
and `--limit` options enables benchmarking of TopK queries on pre-sorted inputs.
+
 See [`sort_tpch.rs`](src/sort_tpch.rs) for more details.
 
 ### Sort TPCH Benchmark Example Runs
 1. Run all queries with default setting:
 ```bash
- cargo run --release --bin dfbench -- sort-tpch -p 
'....../datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_tpch.json'
+ cargo run --release --bin dfbench -- sort-tpch -p 
'./datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_tpch.json'
 ```
 
 2. Run a specific query:
 ```bash
- cargo run --release --bin dfbench -- sort-tpch -p 
'....../datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_tpch.json' --query 2
+ cargo run --release --bin dfbench -- sort-tpch -p 
'./datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_tpch.json' --query 2
 ```
 
-3. Run all queries with `bench.sh` script:
+3. Run all queries as TopK queries on presorted data:
+```bash
+ cargo run --release --bin dfbench -- sort-tpch --sorted --limit 10 -p 
'./datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_tpch.json'
+```
+
+4. Run all queries with `bench.sh` script:
 ```bash
 ./bench.sh run sort_tpch
 ```
diff --git a/benchmarks/src/sort_tpch.rs b/benchmarks/src/sort_tpch.rs
index 956bb92b6c..176234eca5 100644
--- a/benchmarks/src/sort_tpch.rs
+++ b/benchmarks/src/sort_tpch.rs
@@ -63,6 +63,15 @@ pub struct RunOpt {
     /// Load the data into a MemTable before executing the query
     #[structopt(short = "m", long = "mem-table")]
     mem_table: bool,
+
+    /// Mark the first column of each table as sorted in ascending order.
+    /// The tables should have been created with the `--sort` option for this 
to have any effect.
+    #[structopt(short = "t", long = "sorted")]
+    sorted: bool,
+
+    /// Append a `LIMIT n` clause to the query
+    #[structopt(short = "l", long = "limit")]
+    limit: Option<usize>,
 }
 
 struct QueryResult {
@@ -163,7 +172,7 @@ impl RunOpt {
         r#"
         SELECT l_shipmode, l_comment, l_partkey
         FROM lineitem
-        ORDER BY l_shipmode;
+        ORDER BY l_shipmode
         "#,
     ];
 
@@ -212,9 +221,14 @@ impl RunOpt {
             let start = Instant::now();
 
             let query_idx = query_id - 1; // 1-indexed -> 0-indexed
-            let sql = Self::SORT_QUERIES[query_idx];
+            let base_sql = Self::SORT_QUERIES[query_idx].to_string();
+            let sql = if let Some(limit) = self.limit {
+                format!("{base_sql} LIMIT {limit}")
+            } else {
+                base_sql
+            };
 
-            let row_count = self.execute_query(&ctx, sql).await?;
+            let row_count = self.execute_query(&ctx, sql.as_str()).await?;
 
             let elapsed = start.elapsed(); //.as_secs_f64() * 1000.0;
             let ms = elapsed.as_secs_f64() * 1000.0;
@@ -315,8 +329,18 @@ impl RunOpt {
             .with_collect_stat(state.config().collect_statistics());
 
         let table_path = ListingTableUrl::parse(path)?;
-        let config = 
ListingTableConfig::new(table_path).with_listing_options(options);
-        let config = config.infer_schema(&state).await?;
+        let schema = options.infer_schema(&state, &table_path).await?;
+        let options = if self.sorted {
+            let key_column_name = schema.fields()[0].name();
+            options
+                
.with_file_sort_order(vec![vec![col(key_column_name).sort(true, false)]])
+        } else {
+            options
+        };
+
+        let config = ListingTableConfig::new(table_path)
+            .with_listing_options(options)
+            .with_schema(schema);
 
         Ok(Arc::new(ListingTable::try_new(config)?))
     }
diff --git a/benchmarks/src/tpch/convert.rs b/benchmarks/src/tpch/convert.rs
index 7f391d9300..5219e09cd3 100644
--- a/benchmarks/src/tpch/convert.rs
+++ b/benchmarks/src/tpch/convert.rs
@@ -22,15 +22,14 @@ use std::path::{Path, PathBuf};
 
 use datafusion::common::not_impl_err;
 
+use super::get_tbl_tpch_table_schema;
+use super::TPCH_TABLES;
 use datafusion::error::Result;
 use datafusion::prelude::*;
 use parquet::basic::Compression;
 use parquet::file::properties::WriterProperties;
 use structopt::StructOpt;
 
-use super::get_tbl_tpch_table_schema;
-use super::TPCH_TABLES;
-
 /// Convert tpch .slt files to .parquet or .csv files
 #[derive(Debug, StructOpt)]
 pub struct ConvertOpt {
@@ -57,6 +56,10 @@ pub struct ConvertOpt {
     /// Batch size when reading CSV or Parquet files
     #[structopt(short = "s", long = "batch-size", default_value = "8192")]
     batch_size: usize,
+
+    /// Sort each table by its first column in ascending order.
+    #[structopt(short = "t", long = "sort")]
+    sort: bool,
 }
 
 impl ConvertOpt {
@@ -70,6 +73,7 @@ impl ConvertOpt {
         for table in TPCH_TABLES {
             let start = Instant::now();
             let schema = get_tbl_tpch_table_schema(table);
+            let key_column_name = schema.fields()[0].name();
 
             let input_path = format!("{input_path}/{table}.tbl");
             let options = CsvReadOptions::new()
@@ -77,6 +81,13 @@ impl ConvertOpt {
                 .has_header(false)
                 .delimiter(b'|')
                 .file_extension(".tbl");
+            let options = if self.sort {
+                // indicate that the file is already sorted by its first 
column to speed up the conversion
+                options
+                    .file_sort_order(vec![vec![col(key_column_name).sort(true, 
false)]])
+            } else {
+                options
+            };
 
             let config = SessionConfig::new().with_batch_size(self.batch_size);
             let ctx = SessionContext::new_with_config(config);
@@ -99,6 +110,11 @@ impl ConvertOpt {
             if partitions > 1 {
                 csv = 
csv.repartition(Partitioning::RoundRobinBatch(partitions))?
             }
+            let csv = if self.sort {
+                csv.sort_by(vec![col(key_column_name)])?
+            } else {
+                csv
+            };
 
             // create the physical plan
             let csv = csv.create_physical_plan().await?;
diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs
index eb9db821db..752a5a1a6b 100644
--- a/benchmarks/src/tpch/run.rs
+++ b/benchmarks/src/tpch/run.rs
@@ -90,6 +90,11 @@ pub struct RunOpt {
     /// True by default.
     #[structopt(short = "j", long = "prefer_hash_join", default_value = 
"true")]
     prefer_hash_join: BoolDefaultTrue,
+
+    /// Mark the first column of each table as sorted in ascending order.
+    /// The tables should have been created with the `--sort` option for this 
to have any effect.
+    #[structopt(short = "t", long = "sorted")]
+    sorted: bool,
 }
 
 const TPCH_QUERY_START_ID: usize = 1;
@@ -275,20 +280,28 @@ impl RunOpt {
                 }
             };
 
+        let table_path = ListingTableUrl::parse(path)?;
         let options = ListingOptions::new(format)
             .with_file_extension(extension)
             .with_target_partitions(target_partitions)
             .with_collect_stat(state.config().collect_statistics());
-
-        let table_path = ListingTableUrl::parse(path)?;
-        let config = 
ListingTableConfig::new(table_path).with_listing_options(options);
-
-        let config = match table_format {
-            "parquet" => config.infer_schema(&state).await?,
-            "tbl" => 
config.with_schema(Arc::new(get_tbl_tpch_table_schema(table))),
-            "csv" => 
config.with_schema(Arc::new(get_tpch_table_schema(table))),
+        let schema = match table_format {
+            "parquet" => options.infer_schema(&state, &table_path).await?,
+            "tbl" => Arc::new(get_tbl_tpch_table_schema(table)),
+            "csv" => Arc::new(get_tpch_table_schema(table)),
             _ => unreachable!(),
         };
+        let options = if self.sorted {
+            let key_column_name = schema.fields()[0].name();
+            options
+                
.with_file_sort_order(vec![vec![col(key_column_name).sort(true, false)]])
+        } else {
+            options
+        };
+
+        let config = ListingTableConfig::new(table_path)
+            .with_listing_options(options)
+            .with_schema(schema);
 
         Ok(Arc::new(ListingTable::try_new(config)?))
     }
@@ -357,6 +370,7 @@ mod tests {
             output_path: None,
             disable_statistics: false,
             prefer_hash_join: true,
+            sorted: false,
         };
         opt.register_tables(&ctx).await?;
         let queries = get_query_sql(query)?;
@@ -393,6 +407,7 @@ mod tests {
             output_path: None,
             disable_statistics: false,
             prefer_hash_join: true,
+            sorted: false,
         };
         opt.register_tables(&ctx).await?;
         let queries = get_query_sql(query)?;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to