This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 66aead7f51 Allow DynamicFileCatalog support to query partitioned file (#12683)
66aead7f51 is described below

commit 66aead7f51624d532ed9b3d011c5378d3643f624
Author: Jax Liu <[email protected]>
AuthorDate: Thu Oct 3 06:04:50 2024 +0800

    Allow DynamicFileCatalog support to query partitioned file (#12683)
    
    * support to query partitioned table for dynamic file catalog
    
    * cargo clippy
    
    * split partitions inferring to another function
---
 datafusion/core/src/datasource/dynamic_file.rs     |  13 +-
 datafusion/core/src/datasource/listing/table.rs    |  36 ++++-
 .../sqllogictest/test_files/dynamic_file.slt       | 167 ++++++++++++++++++++-
 3 files changed, 208 insertions(+), 8 deletions(-)

diff --git a/datafusion/core/src/datasource/dynamic_file.rs b/datafusion/core/src/datasource/dynamic_file.rs
index 3c409af297..6654d0871c 100644
--- a/datafusion/core/src/datasource/dynamic_file.rs
+++ b/datafusion/core/src/datasource/dynamic_file.rs
@@ -69,11 +69,18 @@ impl UrlTableFactory for DynamicListTableFactory {
             .ok_or_else(|| plan_datafusion_err!("get current SessionStore error"))?;
 
         match ListingTableConfig::new(table_url.clone())
-            .infer(state)
+            .infer_options(state)
             .await
         {
-            Ok(cfg) => ListingTable::try_new(cfg)
-                .map(|table| Some(Arc::new(table) as Arc<dyn TableProvider>)),
+            Ok(cfg) => {
+                let cfg = cfg
+                    .infer_partitions_from_path(state)
+                    .await?
+                    .infer_schema(state)
+                    .await?;
+                ListingTable::try_new(cfg)
+                    .map(|table| Some(Arc::new(table) as Arc<dyn TableProvider>))
+            }
             Err(_) => Ok(None),
         }
     }
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 3eb8eed9de..a9c6aec175 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -33,7 +33,7 @@ use crate::datasource::{
 };
 use crate::execution::context::SessionState;
 use datafusion_catalog::TableProvider;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{config_err, DataFusionError, Result};
 use datafusion_expr::dml::InsertOp;
 use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
 use datafusion_expr::{SortExpr, TableType};
@@ -192,6 +192,38 @@ impl ListingTableConfig {
     pub async fn infer(self, state: &SessionState) -> Result<Self> {
         self.infer_options(state).await?.infer_schema(state).await
     }
+
+    /// Infer the partition columns from the path. Requires `self.options` to be set prior to using.
+    pub async fn infer_partitions_from_path(self, state: &SessionState) -> Result<Self> {
+        match self.options {
+            Some(options) => {
+                let Some(url) = self.table_paths.first() else {
+                    return config_err!("No table path found");
+                };
+                let partitions = options
+                    .infer_partitions(state, url)
+                    .await?
+                    .into_iter()
+                    .map(|col_name| {
+                        (
+                            col_name,
+                            DataType::Dictionary(
+                                Box::new(DataType::UInt16),
+                                Box::new(DataType::Utf8),
+                            ),
+                        )
+                    })
+                    .collect::<Vec<_>>();
+                let options = options.with_table_partition_cols(partitions);
+                Ok(Self {
+                    table_paths: self.table_paths,
+                    file_schema: self.file_schema,
+                    options: Some(options),
+                })
+            }
+            None => config_err!("No `ListingOptions` set for inferring schema"),
+        }
+    }
 }
 
 /// Options for creating a [`ListingTable`]
@@ -505,7 +537,7 @@ impl ListingOptions {
     /// Infer the partitioning at the given path on the provided object store.
     /// For performance reasons, it doesn't read all the files on disk
     /// and therefore may fail to detect invalid partitioning.
-    async fn infer_partitions(
+    pub(crate) async fn infer_partitions(
         &self,
         state: &SessionState,
         table_path: &ListingTableUrl,
diff --git a/datafusion/sqllogictest/test_files/dynamic_file.slt b/datafusion/sqllogictest/test_files/dynamic_file.slt
index e177fd3de2..69f9a43ad4 100644
--- a/datafusion/sqllogictest/test_files/dynamic_file.slt
+++ b/datafusion/sqllogictest/test_files/dynamic_file.slt
@@ -25,9 +25,170 @@ SELECT * FROM '../core/tests/data/partitioned_table_arrow/part=123' ORDER BY f0;
 1 foo true
 2 bar false
 
-# dynamic file query doesn't support partitioned table
-statement error DataFusion error: Error during planning: table 'datafusion.public.../core/tests/data/partitioned_table_arrow' not found
-SELECT * FROM '../core/tests/data/partitioned_table_arrow' ORDER BY f0;
+# Read partitioned file
+statement ok
+CREATE TABLE src_table_1 (
+  int_col INT,
+  string_col TEXT,
+  bigint_col BIGINT,
+  partition_col INT
+) AS VALUES
+(1, 'aaa', 100, 1),
+(2, 'bbb', 200, 1),
+(3, 'ccc', 300, 1),
+(4, 'ddd', 400, 1);
+
+statement ok
+CREATE TABLE src_table_2 (
+  int_col INT,
+  string_col TEXT,
+  bigint_col BIGINT,
+  partition_col INT
+) AS VALUES
+(5, 'eee', 500, 2),
+(6, 'fff', 600, 2),
+(7, 'ggg', 700, 2),
+(8, 'hhh', 800, 2);
+
+# Read partitioned csv file
+
+query I
+COPY  src_table_1 TO 'test_files/scratch/dynamic_file/csv_partitions'
+STORED AS CSV
+PARTITIONED BY (partition_col);
+----
+4
+
+query I
+COPY  src_table_2 TO 'test_files/scratch/dynamic_file/csv_partitions'
+STORED AS CSV
+PARTITIONED BY (partition_col);
+----
+4
+
+query ITIT rowsort
+SELECT int_col, string_col, bigint_col, partition_col FROM 'test_files/scratch/dynamic_file/csv_partitions';
+----
+1 aaa 100 1
+2 bbb 200 1
+3 ccc 300 1
+4 ddd 400 1
+5 eee 500 2
+6 fff 600 2
+7 ggg 700 2
+8 hhh 800 2
+
+# Read partitioned json file
+
+query I
+COPY  src_table_1 TO 'test_files/scratch/dynamic_file/json_partitions'
+STORED AS JSON
+PARTITIONED BY (partition_col);
+----
+4
+
+query I
+COPY  src_table_2 TO 'test_files/scratch/dynamic_file/json_partitions'
+STORED AS JSON
+PARTITIONED BY (partition_col);
+----
+4
+
+query ITIT rowsort
+SELECT int_col, string_col, bigint_col, partition_col FROM 'test_files/scratch/dynamic_file/json_partitions';
+----
+1 aaa 100 1
+2 bbb 200 1
+3 ccc 300 1
+4 ddd 400 1
+5 eee 500 2
+6 fff 600 2
+7 ggg 700 2
+8 hhh 800 2
+
+# Read partitioned arrow file
+
+query I
+COPY  src_table_1 TO 'test_files/scratch/dynamic_file/arrow_partitions'
+STORED AS ARROW
+PARTITIONED BY (partition_col);
+----
+4
+
+query I
+COPY  src_table_2 TO 'test_files/scratch/dynamic_file/arrow_partitions'
+STORED AS ARROW
+PARTITIONED BY (partition_col);
+----
+4
+
+query ITIT rowsort
+SELECT int_col, string_col, bigint_col, partition_col FROM 'test_files/scratch/dynamic_file/arrow_partitions';
+----
+1 aaa 100 1
+2 bbb 200 1
+3 ccc 300 1
+4 ddd 400 1
+5 eee 500 2
+6 fff 600 2
+7 ggg 700 2
+8 hhh 800 2
+
+# Read partitioned parquet file
+
+query I
+COPY  src_table_1 TO 'test_files/scratch/dynamic_file/parquet_partitions'
+STORED AS PARQUET
+PARTITIONED BY (partition_col);
+----
+4
+
+query I
+COPY  src_table_2 TO 'test_files/scratch/dynamic_file/parquet_partitions'
+STORED AS PARQUET
+PARTITIONED BY (partition_col);
+----
+4
+
+query ITIT rowsort
+select * from 'test_files/scratch/dynamic_file/parquet_partitions';
+----
+1 aaa 100 1
+2 bbb 200 1
+3 ccc 300 1
+4 ddd 400 1
+5 eee 500 2
+6 fff 600 2
+7 ggg 700 2
+8 hhh 800 2
+
+# Read partitioned parquet file with multiple partition columns
+
+query I
+COPY  src_table_1 TO 'test_files/scratch/dynamic_file/nested_partition'
+STORED AS PARQUET
+PARTITIONED BY (partition_col, string_col);
+----
+4
+
+query I
+COPY  src_table_2 TO 'test_files/scratch/dynamic_file/nested_partition'
+STORED AS PARQUET
+PARTITIONED BY (partition_col, string_col);
+----
+4
+
+query IITT rowsort
+select * from 'test_files/scratch/dynamic_file/nested_partition';
+----
+1 100 1 aaa
+2 200 1 bbb
+3 300 1 ccc
+4 400 1 ddd
+5 500 2 eee
+6 600 2 fff
+7 700 2 ggg
+8 800 2 hhh
 
 # read avro file
 query IT


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to