This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 6514ec7b67 Adds Partitioned CSV test to object store access tests (#18370)
6514ec7b67 is described below
commit 6514ec7b6757995cf9fad73f6aca3b8c42a9ff7b
Author: Blake Orth <[email protected]>
AuthorDate: Thu Oct 30 12:51:21 2025 -0600
Adds Partitioned CSV test to object store access tests (#18370)
## Which issue does this PR close?
N/A -- This PR is a supporting effort for:
- https://github.com/apache/datafusion/pull/18146
- https://github.com/apache/datafusion/issues/17211
## Rationale for this change
Adding these tests not only improves test coverage and expected-output
validation, but also gives us a common way to test and talk about object
store access for specific query scenarios.
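For context on how the harness observes this: the tests register an instrumented in-memory object store with the `SessionContext` under the `mem://` scheme, then snapshot the requests each query issues. Below is a minimal sketch of that registration, with a plain `InMemory` store standing in for the harness's `RequestCountingObjectStore` wrapper:

```rust
use std::sync::Arc;

use datafusion::prelude::SessionContext;
use object_store::memory::InMemory;
use url::Url;

fn main() {
    let ctx = SessionContext::new();

    // The harness wraps its store so every LIST/GET request is recorded;
    // a plain in-memory store stands in for that wrapper here.
    let store = Arc::new(InMemory::new());
    ctx.register_object_store(&Url::parse("mem://").unwrap(), store);

    // Tables registered under the mem:// scheme are now read through this
    // store, so the requests a query issues can be observed and counted.
}
```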
## What changes are included in this PR?
- Adds a new test to the object store access integration tests that
selects all rows from a set of CSV files under a hive-partitioned
directory structure (layout sketched below)
- Adds a new test harness method to build a partitioned ListingTable
backed by CSV data
- Adds a new helper method to build partitioned CSV data and register
the table
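For reference, the helper lays the files out in the following hive-style tree (paths taken from the diff below); the partition-pruning assertions in the snapshots walk exactly these prefixes:

```
data/a=1/b=10/c=100/file_1.csv
data/a=2/b=20/c=200/file_2.csv
data/a=3/b=30/c=300/file_3.csv
```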
## Are these changes tested?
The changes are tests!
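(To run just these assertions locally, something like `cargo test -p datafusion --test core_integration -- object_store_access` should work, assuming this file is compiled into the `core_integration` test binary like the other files under `datafusion/core/tests/datasource/`.)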
## Are there any user-facing changes?
No
cc @alamb
---
.../core/tests/datasource/object_store_access.rs | 211 ++++++++++++++++++++-
1 file changed, 208 insertions(+), 3 deletions(-)
diff --git a/datafusion/core/tests/datasource/object_store_access.rs b/datafusion/core/tests/datasource/object_store_access.rs
index d1592c2147..f89ca9e049 100644
--- a/datafusion/core/tests/datasource/object_store_access.rs
+++ b/datafusion/core/tests/datasource/object_store_access.rs
@@ -28,6 +28,9 @@ use arrow::array::{ArrayRef, Int32Array, RecordBatch};
use async_trait::async_trait;
use bytes::Bytes;
use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, SessionContext};
+use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig};
+use datafusion_datasource::ListingTableUrl;
+use datafusion_datasource_csv::CsvFormat;
use futures::stream::BoxStream;
use insta::assert_snapshot;
use object_store::memory::InMemory;
@@ -123,6 +126,163 @@ async fn query_multi_csv_file() {
);
}
+#[tokio::test]
+async fn query_partitioned_csv_file() {
+ let test = Test::new().with_partitioned_csv().await;
+ assert_snapshot!(
+ test.query("select * from csv_table_partitioned").await,
+ @r"
+ ------- Query Output (6 rows) -------
+ +---------+-------+-------+---+----+-----+
+ | d1 | d2 | d3 | a | b | c |
+ +---------+-------+-------+---+----+-----+
+ | 0.00001 | 1e-12 | true | 1 | 10 | 100 |
+ | 0.00003 | 5e-12 | false | 1 | 10 | 100 |
+ | 0.00002 | 2e-12 | true | 2 | 20 | 200 |
+ | 0.00003 | 5e-12 | false | 2 | 20 | 200 |
+ | 0.00003 | 3e-12 | true | 3 | 30 | 300 |
+ | 0.00003 | 5e-12 | false | 3 | 30 | 300 |
+ +---------+-------+-------+---+----+-----+
+ ------- Object Store Request Summary -------
+ RequestCountingObjectStore()
+ Total Requests: 13
+ - LIST (with delimiter) prefix=data
+ - LIST (with delimiter) prefix=data/a=1
+ - LIST (with delimiter) prefix=data/a=2
+ - LIST (with delimiter) prefix=data/a=3
+ - LIST (with delimiter) prefix=data/a=1/b=10
+ - LIST (with delimiter) prefix=data/a=2/b=20
+ - LIST (with delimiter) prefix=data/a=3/b=30
+ - LIST (with delimiter) prefix=data/a=1/b=10/c=100
+ - LIST (with delimiter) prefix=data/a=2/b=20/c=200
+ - LIST (with delimiter) prefix=data/a=3/b=30/c=300
+ - GET (opts) path=data/a=1/b=10/c=100/file_1.csv
+ - GET (opts) path=data/a=2/b=20/c=200/file_2.csv
+ - GET (opts) path=data/a=3/b=30/c=300/file_3.csv
+ "
+ );
+
+ assert_snapshot!(
+ test.query("select * from csv_table_partitioned WHERE a=2").await,
+ @r"
+ ------- Query Output (2 rows) -------
+ +---------+-------+-------+---+----+-----+
+ | d1 | d2 | d3 | a | b | c |
+ +---------+-------+-------+---+----+-----+
+ | 0.00002 | 2e-12 | true | 2 | 20 | 200 |
+ | 0.00003 | 5e-12 | false | 2 | 20 | 200 |
+ +---------+-------+-------+---+----+-----+
+ ------- Object Store Request Summary -------
+ RequestCountingObjectStore()
+ Total Requests: 4
+ - LIST (with delimiter) prefix=data/a=2
+ - LIST (with delimiter) prefix=data/a=2/b=20
+ - LIST (with delimiter) prefix=data/a=2/b=20/c=200
+ - GET (opts) path=data/a=2/b=20/c=200/file_2.csv
+ "
+ );
+
+ assert_snapshot!(
+ test.query("select * from csv_table_partitioned WHERE b=20").await,
+ @r"
+ ------- Query Output (2 rows) -------
+ +---------+-------+-------+---+----+-----+
+ | d1 | d2 | d3 | a | b | c |
+ +---------+-------+-------+---+----+-----+
+ | 0.00002 | 2e-12 | true | 2 | 20 | 200 |
+ | 0.00003 | 5e-12 | false | 2 | 20 | 200 |
+ +---------+-------+-------+---+----+-----+
+ ------- Object Store Request Summary -------
+ RequestCountingObjectStore()
+ Total Requests: 11
+ - LIST (with delimiter) prefix=data
+ - LIST (with delimiter) prefix=data/a=1
+ - LIST (with delimiter) prefix=data/a=2
+ - LIST (with delimiter) prefix=data/a=3
+ - LIST (with delimiter) prefix=data/a=1/b=10
+ - LIST (with delimiter) prefix=data/a=2/b=20
+ - LIST (with delimiter) prefix=data/a=3/b=30
+ - LIST (with delimiter) prefix=data/a=1/b=10/c=100
+ - LIST (with delimiter) prefix=data/a=2/b=20/c=200
+ - LIST (with delimiter) prefix=data/a=3/b=30/c=300
+ - GET (opts) path=data/a=2/b=20/c=200/file_2.csv
+ "
+ );
+
+ assert_snapshot!(
+ test.query("select * from csv_table_partitioned WHERE c=200").await,
+ @r"
+ ------- Query Output (2 rows) -------
+ +---------+-------+-------+---+----+-----+
+ | d1 | d2 | d3 | a | b | c |
+ +---------+-------+-------+---+----+-----+
+ | 0.00002 | 2e-12 | true | 2 | 20 | 200 |
+ | 0.00003 | 5e-12 | false | 2 | 20 | 200 |
+ +---------+-------+-------+---+----+-----+
+ ------- Object Store Request Summary -------
+ RequestCountingObjectStore()
+ Total Requests: 11
+ - LIST (with delimiter) prefix=data
+ - LIST (with delimiter) prefix=data/a=1
+ - LIST (with delimiter) prefix=data/a=2
+ - LIST (with delimiter) prefix=data/a=3
+ - LIST (with delimiter) prefix=data/a=1/b=10
+ - LIST (with delimiter) prefix=data/a=2/b=20
+ - LIST (with delimiter) prefix=data/a=3/b=30
+ - LIST (with delimiter) prefix=data/a=1/b=10/c=100
+ - LIST (with delimiter) prefix=data/a=2/b=20/c=200
+ - LIST (with delimiter) prefix=data/a=3/b=30/c=300
+ - GET (opts) path=data/a=2/b=20/c=200/file_2.csv
+ "
+ );
+
+ assert_snapshot!(
+ test.query("select * from csv_table_partitioned WHERE a=2 AND
b=20").await,
+ @r"
+ ------- Query Output (2 rows) -------
+ +---------+-------+-------+---+----+-----+
+ | d1 | d2 | d3 | a | b | c |
+ +---------+-------+-------+---+----+-----+
+ | 0.00002 | 2e-12 | true | 2 | 20 | 200 |
+ | 0.00003 | 5e-12 | false | 2 | 20 | 200 |
+ +---------+-------+-------+---+----+-----+
+ ------- Object Store Request Summary -------
+ RequestCountingObjectStore()
+ Total Requests: 3
+ - LIST (with delimiter) prefix=data/a=2/b=20
+ - LIST (with delimiter) prefix=data/a=2/b=20/c=200
+ - GET (opts) path=data/a=2/b=20/c=200/file_2.csv
+ "
+ );
+
+ assert_snapshot!(
+ test.query("select * from csv_table_partitioned WHERE a<2 AND b=10 AND
c=100").await,
+ @r"
+ ------- Query Output (2 rows) -------
+ +---------+-------+-------+---+----+-----+
+ | d1 | d2 | d3 | a | b | c |
+ +---------+-------+-------+---+----+-----+
+ | 0.00001 | 1e-12 | true | 1 | 10 | 100 |
+ | 0.00003 | 5e-12 | false | 1 | 10 | 100 |
+ +---------+-------+-------+---+----+-----+
+ ------- Object Store Request Summary -------
+ RequestCountingObjectStore()
+ Total Requests: 11
+ - LIST (with delimiter) prefix=data
+ - LIST (with delimiter) prefix=data/a=1
+ - LIST (with delimiter) prefix=data/a=2
+ - LIST (with delimiter) prefix=data/a=3
+ - LIST (with delimiter) prefix=data/a=1/b=10
+ - LIST (with delimiter) prefix=data/a=2/b=20
+ - LIST (with delimiter) prefix=data/a=3/b=30
+ - LIST (with delimiter) prefix=data/a=1/b=10/c=100
+ - LIST (with delimiter) prefix=data/a=2/b=20/c=200
+ - LIST (with delimiter) prefix=data/a=3/b=30/c=300
+ - GET (opts) path=data/a=1/b=10/c=100/file_1.csv
+ "
+ );
+}
+
#[tokio::test]
async fn create_single_parquet_file_default() {
// The default metadata size hint is 512KB
@@ -363,7 +523,7 @@ impl Test {
self
}
- /// Register a CSV file at the given path relative to the [`datafusion_test_data`] directory
+ /// Register a CSV file at the given path
async fn register_csv(self, table_name: &str, path: &str) -> Self {
let mut options = CsvReadOptions::new();
options.has_header = true;
@@ -375,8 +535,30 @@ impl Test {
self
}
- /// Register a Parquet file at the given path relative to the
- /// [`datafusion_test_data`] directory
+ /// Register a partitioned CSV table at the given path
+ async fn register_partitioned_csv(self, table_name: &str, path: &str) -> Self {
+ let file_format = Arc::new(CsvFormat::default().with_has_header(true));
+ let options = ListingOptions::new(file_format);
+
+ let url = format!("mem://{path}").parse().unwrap();
+ let table_url = ListingTableUrl::try_new(url, None).unwrap();
+
+ let session_state = self.session_context.state();
+ let mut config = ListingTableConfig::new(table_url).with_listing_options(options);
+ config = config
+ .infer_partitions_from_path(&session_state)
+ .await
+ .unwrap();
+ config = config.infer_schema(&session_state).await.unwrap();
+
+ let table = Arc::new(ListingTable::try_new(config).unwrap());
+ self.session_context
+ .register_table(table_name, table)
+ .unwrap();
+ self
+ }
+
+ /// Register a Parquet file at the given path
async fn register_parquet(self, table_name: &str, path: &str) -> Self {
let path = format!("mem://{path}");
let mut options: ParquetReadOptions<'_> = ParquetReadOptions::new();
@@ -425,6 +607,29 @@ impl Test {
self.register_csv("csv_table", "/data/").await
}
+ /// Register three CSV files in a partitioned directory structure, called
+ /// `csv_table_partitioned`
+ async fn with_partitioned_csv(mut self) -> Test {
+ for i in 1..4 {
+ // upload CSV data to object store
+ let csv_data1 = format!(
+ r#"d1,d2,d3
+0.0000{i},{i}e-12,true
+0.00003,5e-12,false
+"#
+ );
+ self = self
+ .with_bytes(
+ &format!("/data/a={i}/b={}/c={}/file_{i}.csv", i * 10, i *
100,),
+ csv_data1,
+ )
+ .await;
+ }
+ // register table
+ self.register_partitioned_csv("csv_table_partitioned", "/data/")
+ .await
+ }
+
/// Add a single parquet file that has two columns and two row groups named `parquet_table`
///
/// Column "a": Int32 with values 0-100] in row group 1