This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 6514ec7b67 Adds Partitioned CSV test to object store access tests 
(#18370)
6514ec7b67 is described below

commit 6514ec7b6757995cf9fad73f6aca3b8c42a9ff7b
Author: Blake Orth <[email protected]>
AuthorDate: Thu Oct 30 12:51:21 2025 -0600

    Adds Partitioned CSV test to object store access tests (#18370)
    
    ## Which issue does this PR close?
    
    N/A -- This PR is a supporting effort to:
     - https://github.com/apache/datafusion/pull/18146
     - https://github.com/apache/datafusion/issues/17211
    
    ## Rationale for this change
    
    Adding these tests not only improves test coverage/expected output
    validation, but also gives us a common way to test and talk about object
    store access for specific query scenarios.
    
    ## What changes are included in this PR?
    
    - Adds a new test to the object store access integration tests that
    selects all rows from a set of CSV files under a hive partitioned
    directory structure
    - Adds a new test harness method to build a partitioned ListingTable
    backed by CSV data
    - Adds a new helper method to build partitioned CSV data and register
    the table
    
    ## Are these changes tested?
    
    The changes are tests!
    
    ## Are there any user-facing changes?
    
    No
    
    cc @alamb
---
 .../core/tests/datasource/object_store_access.rs   | 211 ++++++++++++++++++++-
 1 file changed, 208 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/tests/datasource/object_store_access.rs 
b/datafusion/core/tests/datasource/object_store_access.rs
index d1592c2147..f89ca9e049 100644
--- a/datafusion/core/tests/datasource/object_store_access.rs
+++ b/datafusion/core/tests/datasource/object_store_access.rs
@@ -28,6 +28,9 @@ use arrow::array::{ArrayRef, Int32Array, RecordBatch};
 use async_trait::async_trait;
 use bytes::Bytes;
 use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, SessionContext};
+use datafusion_catalog_listing::{ListingOptions, ListingTable, 
ListingTableConfig};
+use datafusion_datasource::ListingTableUrl;
+use datafusion_datasource_csv::CsvFormat;
 use futures::stream::BoxStream;
 use insta::assert_snapshot;
 use object_store::memory::InMemory;
@@ -123,6 +126,163 @@ async fn query_multi_csv_file() {
     );
 }
 
+#[tokio::test]
+async fn query_partitioned_csv_file() {
+    let test = Test::new().with_partitioned_csv().await;
+    assert_snapshot!(
+        test.query("select * from csv_table_partitioned").await,
+        @r"
+    ------- Query Output (6 rows) -------
+    +---------+-------+-------+---+----+-----+
+    | d1      | d2    | d3    | a | b  | c   |
+    +---------+-------+-------+---+----+-----+
+    | 0.00001 | 1e-12 | true  | 1 | 10 | 100 |
+    | 0.00003 | 5e-12 | false | 1 | 10 | 100 |
+    | 0.00002 | 2e-12 | true  | 2 | 20 | 200 |
+    | 0.00003 | 5e-12 | false | 2 | 20 | 200 |
+    | 0.00003 | 3e-12 | true  | 3 | 30 | 300 |
+    | 0.00003 | 5e-12 | false | 3 | 30 | 300 |
+    +---------+-------+-------+---+----+-----+
+    ------- Object Store Request Summary -------
+    RequestCountingObjectStore()
+    Total Requests: 13
+    - LIST (with delimiter) prefix=data
+    - LIST (with delimiter) prefix=data/a=1
+    - LIST (with delimiter) prefix=data/a=2
+    - LIST (with delimiter) prefix=data/a=3
+    - LIST (with delimiter) prefix=data/a=1/b=10
+    - LIST (with delimiter) prefix=data/a=2/b=20
+    - LIST (with delimiter) prefix=data/a=3/b=30
+    - LIST (with delimiter) prefix=data/a=1/b=10/c=100
+    - LIST (with delimiter) prefix=data/a=2/b=20/c=200
+    - LIST (with delimiter) prefix=data/a=3/b=30/c=300
+    - GET  (opts) path=data/a=1/b=10/c=100/file_1.csv
+    - GET  (opts) path=data/a=2/b=20/c=200/file_2.csv
+    - GET  (opts) path=data/a=3/b=30/c=300/file_3.csv
+    "
+    );
+
+    assert_snapshot!(
+        test.query("select * from csv_table_partitioned WHERE a=2").await,
+        @r"
+    ------- Query Output (2 rows) -------
+    +---------+-------+-------+---+----+-----+
+    | d1      | d2    | d3    | a | b  | c   |
+    +---------+-------+-------+---+----+-----+
+    | 0.00002 | 2e-12 | true  | 2 | 20 | 200 |
+    | 0.00003 | 5e-12 | false | 2 | 20 | 200 |
+    +---------+-------+-------+---+----+-----+
+    ------- Object Store Request Summary -------
+    RequestCountingObjectStore()
+    Total Requests: 4
+    - LIST (with delimiter) prefix=data/a=2
+    - LIST (with delimiter) prefix=data/a=2/b=20
+    - LIST (with delimiter) prefix=data/a=2/b=20/c=200
+    - GET  (opts) path=data/a=2/b=20/c=200/file_2.csv
+    "
+    );
+
+    assert_snapshot!(
+        test.query("select * from csv_table_partitioned WHERE b=20").await,
+        @r"
+    ------- Query Output (2 rows) -------
+    +---------+-------+-------+---+----+-----+
+    | d1      | d2    | d3    | a | b  | c   |
+    +---------+-------+-------+---+----+-----+
+    | 0.00002 | 2e-12 | true  | 2 | 20 | 200 |
+    | 0.00003 | 5e-12 | false | 2 | 20 | 200 |
+    +---------+-------+-------+---+----+-----+
+    ------- Object Store Request Summary -------
+    RequestCountingObjectStore()
+    Total Requests: 11
+    - LIST (with delimiter) prefix=data
+    - LIST (with delimiter) prefix=data/a=1
+    - LIST (with delimiter) prefix=data/a=2
+    - LIST (with delimiter) prefix=data/a=3
+    - LIST (with delimiter) prefix=data/a=1/b=10
+    - LIST (with delimiter) prefix=data/a=2/b=20
+    - LIST (with delimiter) prefix=data/a=3/b=30
+    - LIST (with delimiter) prefix=data/a=1/b=10/c=100
+    - LIST (with delimiter) prefix=data/a=2/b=20/c=200
+    - LIST (with delimiter) prefix=data/a=3/b=30/c=300
+    - GET  (opts) path=data/a=2/b=20/c=200/file_2.csv
+    "
+    );
+
+    assert_snapshot!(
+        test.query("select * from csv_table_partitioned WHERE c=200").await,
+        @r"
+    ------- Query Output (2 rows) -------
+    +---------+-------+-------+---+----+-----+
+    | d1      | d2    | d3    | a | b  | c   |
+    +---------+-------+-------+---+----+-----+
+    | 0.00002 | 2e-12 | true  | 2 | 20 | 200 |
+    | 0.00003 | 5e-12 | false | 2 | 20 | 200 |
+    +---------+-------+-------+---+----+-----+
+    ------- Object Store Request Summary -------
+    RequestCountingObjectStore()
+    Total Requests: 11
+    - LIST (with delimiter) prefix=data
+    - LIST (with delimiter) prefix=data/a=1
+    - LIST (with delimiter) prefix=data/a=2
+    - LIST (with delimiter) prefix=data/a=3
+    - LIST (with delimiter) prefix=data/a=1/b=10
+    - LIST (with delimiter) prefix=data/a=2/b=20
+    - LIST (with delimiter) prefix=data/a=3/b=30
+    - LIST (with delimiter) prefix=data/a=1/b=10/c=100
+    - LIST (with delimiter) prefix=data/a=2/b=20/c=200
+    - LIST (with delimiter) prefix=data/a=3/b=30/c=300
+    - GET  (opts) path=data/a=2/b=20/c=200/file_2.csv
+    "
+    );
+
+    assert_snapshot!(
+        test.query("select * from csv_table_partitioned WHERE a=2 AND 
b=20").await,
+        @r"
+    ------- Query Output (2 rows) -------
+    +---------+-------+-------+---+----+-----+
+    | d1      | d2    | d3    | a | b  | c   |
+    +---------+-------+-------+---+----+-----+
+    | 0.00002 | 2e-12 | true  | 2 | 20 | 200 |
+    | 0.00003 | 5e-12 | false | 2 | 20 | 200 |
+    +---------+-------+-------+---+----+-----+
+    ------- Object Store Request Summary -------
+    RequestCountingObjectStore()
+    Total Requests: 3
+    - LIST (with delimiter) prefix=data/a=2/b=20
+    - LIST (with delimiter) prefix=data/a=2/b=20/c=200
+    - GET  (opts) path=data/a=2/b=20/c=200/file_2.csv
+    "
+    );
+
+    assert_snapshot!(
+        test.query("select * from csv_table_partitioned WHERE a<2 AND b=10 AND 
c=100").await,
+        @r"
+    ------- Query Output (2 rows) -------
+    +---------+-------+-------+---+----+-----+
+    | d1      | d2    | d3    | a | b  | c   |
+    +---------+-------+-------+---+----+-----+
+    | 0.00001 | 1e-12 | true  | 1 | 10 | 100 |
+    | 0.00003 | 5e-12 | false | 1 | 10 | 100 |
+    +---------+-------+-------+---+----+-----+
+    ------- Object Store Request Summary -------
+    RequestCountingObjectStore()
+    Total Requests: 11
+    - LIST (with delimiter) prefix=data
+    - LIST (with delimiter) prefix=data/a=1
+    - LIST (with delimiter) prefix=data/a=2
+    - LIST (with delimiter) prefix=data/a=3
+    - LIST (with delimiter) prefix=data/a=1/b=10
+    - LIST (with delimiter) prefix=data/a=2/b=20
+    - LIST (with delimiter) prefix=data/a=3/b=30
+    - LIST (with delimiter) prefix=data/a=1/b=10/c=100
+    - LIST (with delimiter) prefix=data/a=2/b=20/c=200
+    - LIST (with delimiter) prefix=data/a=3/b=30/c=300
+    - GET  (opts) path=data/a=1/b=10/c=100/file_1.csv
+    "
+    );
+}
+
 #[tokio::test]
 async fn create_single_parquet_file_default() {
     // The default metadata size hint is 512KB
@@ -363,7 +523,7 @@ impl Test {
         self
     }
 
-    /// Register a CSV file at the given path relative to the 
[`datafusion_test_data`] directory
+    /// Register a CSV file at the given path
     async fn register_csv(self, table_name: &str, path: &str) -> Self {
         let mut options = CsvReadOptions::new();
         options.has_header = true;
@@ -375,8 +535,30 @@ impl Test {
         self
     }
 
-    /// Register a Parquet file at the given path relative to the
-    /// [`datafusion_test_data`] directory
+    /// Register a partitioned CSV table at the given path
+    async fn register_partitioned_csv(self, table_name: &str, path: &str) -> 
Self {
+        let file_format = Arc::new(CsvFormat::default().with_has_header(true));
+        let options = ListingOptions::new(file_format);
+
+        let url = format!("mem://{path}").parse().unwrap();
+        let table_url = ListingTableUrl::try_new(url, None).unwrap();
+
+        let session_state = self.session_context.state();
+        let mut config = 
ListingTableConfig::new(table_url).with_listing_options(options);
+        config = config
+            .infer_partitions_from_path(&session_state)
+            .await
+            .unwrap();
+        config = config.infer_schema(&session_state).await.unwrap();
+
+        let table = Arc::new(ListingTable::try_new(config).unwrap());
+        self.session_context
+            .register_table(table_name, table)
+            .unwrap();
+        self
+    }
+
+    /// Register a Parquet file at the given path
     async fn register_parquet(self, table_name: &str, path: &str) -> Self {
         let path = format!("mem://{path}");
         let mut options: ParquetReadOptions<'_> = ParquetReadOptions::new();
@@ -425,6 +607,29 @@ impl Test {
         self.register_csv("csv_table", "/data/").await
     }
 
+    /// Register three CSV files in a partitioned directory structure, called
+    /// `csv_table_partitioned`
+    async fn with_partitioned_csv(mut self) -> Test {
+        for i in 1..4 {
+            // upload CSV data to object store
+            let csv_data1 = format!(
+                r#"d1,d2,d3
+0.0000{i},{i}e-12,true
+0.00003,5e-12,false
+"#
+            );
+            self = self
+                .with_bytes(
+                    &format!("/data/a={i}/b={}/c={}/file_{i}.csv", i * 10, i * 
100,),
+                    csv_data1,
+                )
+                .await;
+        }
+        // register table
+        self.register_partitioned_csv("csv_table_partitioned", "/data/")
+            .await
+    }
+
     /// Add a single parquet file that has two columns and two row groups 
named `parquet_table`
     ///
     /// Column "a": Int32 with values 0-100] in row group 1


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to