This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a4d494c54f fix: serialize listing table without partition column (#15737)
a4d494c54f is described below

commit a4d494c54f2c69e87ec14ed4a123839d8733c0d9
Author: Chen Chongchen <[email protected]>
AuthorDate: Fri Apr 18 03:05:34 2025 +0800

    fix: serialize listing table without partition column (#15737)
    
    * fix: serialize listing table without partition column
    
    * remove unwrap
    
    * format
    
    * clippy
---
 datafusion/proto/proto/datafusion.proto            |  2 +-
 datafusion/proto/src/generated/prost.rs            |  4 +-
 datafusion/proto/src/logical_plan/mod.rs           | 76 +++++++++++++++-------
 .../proto/tests/cases/roundtrip_logical_plan.rs    | 35 +++++++++-
 4 files changed, 91 insertions(+), 26 deletions(-)

diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 908b95ab56..39236da3b9 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -90,7 +90,7 @@ message ListingTableScanNode {
   ProjectionColumns projection = 4;
   datafusion_common.Schema schema = 5;
   repeated LogicalExprNode filters = 6;
-  repeated string table_partition_cols = 7;
+  repeated PartitionColumn table_partition_cols = 7;
   bool collect_stat = 8;
   uint32 target_partitions = 9;
   oneof FileFormatType {
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index d2165dad48..41c60b22e3 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -115,8 +115,8 @@ pub struct ListingTableScanNode {
     pub schema: ::core::option::Option<super::datafusion_common::Schema>,
     #[prost(message, repeated, tag = "6")]
     pub filters: ::prost::alloc::vec::Vec<LogicalExprNode>,
-    #[prost(string, repeated, tag = "7")]
-    pub table_partition_cols: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
+    #[prost(message, repeated, tag = "7")]
+    pub table_partition_cols: ::prost::alloc::vec::Vec<PartitionColumn>,
     #[prost(bool, tag = "8")]
     pub collect_stat: bool,
     #[prost(uint32, tag = "9")]
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index c65569ef1c..806f604ccc 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -33,7 +33,7 @@ use crate::{
 };
 
 use crate::protobuf::{proto_error, ToProtoError};
-use arrow::datatypes::{DataType, Schema, SchemaRef};
+use arrow::datatypes::{DataType, Schema, SchemaBuilder, SchemaRef};
 use datafusion::datasource::cte_worktable::CteWorkTable;
 #[cfg(feature = "avro")]
 use datafusion::datasource::file_format::avro::AvroFormat;
@@ -458,23 +458,25 @@ impl AsLogicalPlan for LogicalPlanNode {
                     .map(ListingTableUrl::parse)
                     .collect::<Result<Vec<_>, _>>()?;
 
+                let partition_columns = scan
+                    .table_partition_cols
+                    .iter()
+                    .map(|col| {
+                        let Some(arrow_type) = col.arrow_type.as_ref() else {
+                            return Err(proto_error(
+                                "Missing Arrow type in partition columns".to_string(),
+                            ));
+                        };
+                        let arrow_type = DataType::try_from(arrow_type).map_err(|e| {
+                            proto_error(format!("Received an unknown ArrowType: {}", e))
+                        })?;
+                        Ok((col.name.clone(), arrow_type))
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
                 let options = ListingOptions::new(file_format)
                     .with_file_extension(&scan.file_extension)
-                    .with_table_partition_cols(
-                        scan.table_partition_cols
-                            .iter()
-                            .map(|col| {
-                                (
-                                    col.clone(),
-                                    schema
-                                        .field_with_name(col)
-                                        .unwrap()
-                                        .data_type()
-                                        .clone(),
-                                )
-                            })
-                            .collect(),
-                    )
+                    .with_table_partition_cols(partition_columns)
                     .with_collect_stat(scan.collect_stat)
                     .with_target_partitions(scan.target_partitions as usize)
                     .with_file_sort_order(all_sort_orders);
@@ -1046,7 +1048,6 @@ impl AsLogicalPlan for LogicalPlanNode {
                         })
                     }
                 };
-                let schema: protobuf::Schema = schema.as_ref().try_into()?;
 
                 let filters: Vec<protobuf::LogicalExprNode> =
                     serialize_exprs(filters, extension_codec)?;
@@ -1099,6 +1100,21 @@ impl AsLogicalPlan for LogicalPlanNode {
 
                     let options = listing_table.options();
 
+                    let mut builder = SchemaBuilder::from(schema.as_ref());
+                    for (idx, field) in schema.fields().iter().enumerate().rev() {
+                        if options
+                            .table_partition_cols
+                            .iter()
+                            .any(|(name, _)| name == field.name())
+                        {
+                            builder.remove(idx);
+                        }
+                    }
+
+                    let schema = builder.finish();
+
+                    let schema: protobuf::Schema = (&schema).try_into()?;
+
                     let mut exprs_vec: Vec<SortExprNodeCollection> = vec![];
                     for order in &options.file_sort_order {
                         let expr_vec = SortExprNodeCollection {
@@ -1107,6 +1123,24 @@ impl AsLogicalPlan for LogicalPlanNode {
                         exprs_vec.push(expr_vec);
                     }
 
+                    let partition_columns = options
+                        .table_partition_cols
+                        .iter()
+                        .map(|(name, arrow_type)| {
+                            let arrow_type = protobuf::ArrowType::try_from(arrow_type)
+                                .map_err(|e| {
+                                    proto_error(format!(
+                                        "Received an unknown ArrowType: {}",
+                                        e
+                                    ))
+                                })?;
+                            Ok(protobuf::PartitionColumn {
+                                name: name.clone(),
+                                arrow_type: Some(arrow_type),
+                            })
+                        })
+                        .collect::<Result<Vec<_>>>()?;
+
                     Ok(LogicalPlanNode {
                         logical_plan_type: Some(LogicalPlanType::ListingScan(
                             protobuf::ListingTableScanNode {
@@ -1114,11 +1148,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                                 table_name: Some(table_name.clone().into()),
                                 collect_stat: options.collect_stat,
                                 file_extension: options.file_extension.clone(),
-                                table_partition_cols: options
-                                    .table_partition_cols
-                                    .iter()
-                                    .map(|x| x.0.clone())
-                                    .collect::<Vec<_>>(),
+                                table_partition_cols: partition_columns,
                                 paths: listing_table
                                     .table_paths()
                                     .iter()
@@ -1133,6 +1163,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                         )),
                     })
                 } else if let Some(view_table) = source.downcast_ref::<ViewTable>() {
+                    let schema: protobuf::Schema = schema.as_ref().try_into()?;
                     Ok(LogicalPlanNode {
                         logical_plan_type: Some(LogicalPlanType::ViewScan(Box::new(
                             protobuf::ViewTableScanNode {
@@ -1167,6 +1198,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                         )),
                     })
                 } else {
+                    let schema: protobuf::Schema = schema.as_ref().try_into()?;
                     let mut bytes = vec![];
                     extension_codec
                         .try_encode_table_provider(table_name, provider, &mut bytes)
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 9fa1f74ae1..bc57ac7c4d 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -24,7 +24,10 @@ use arrow::datatypes::{
     DECIMAL256_MAX_PRECISION,
 };
 use arrow::util::pretty::pretty_format_batches;
-use datafusion::datasource::file_format::json::JsonFormatFactory;
+use datafusion::datasource::file_format::json::{JsonFormat, JsonFormatFactory};
+use datafusion::datasource::listing::{
+    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
+};
 use datafusion::optimizer::eliminate_nested_union::EliminateNestedUnion;
 use datafusion::optimizer::Optimizer;
 use datafusion_common::parsers::CompressionTypeVariant;
@@ -2559,3 +2562,33 @@ async fn roundtrip_union_query() -> Result<()> {
     );
     Ok(())
 }
+
+#[tokio::test]
+async fn roundtrip_custom_listing_tables_schema() -> Result<()> {
+    let ctx = SessionContext::new();
+    // Make sure a listing table with partition columns is preserved during round-trip
+    let file_format = JsonFormat::default();
+    let table_partition_cols = vec![("part".to_owned(), DataType::Int64)];
+    let data = "../core/tests/data/partitioned_table_json";
+    let listing_table_url = ListingTableUrl::parse(data)?;
+    let listing_options = ListingOptions::new(Arc::new(file_format))
+        .with_table_partition_cols(table_partition_cols);
+
+    let config = ListingTableConfig::new(listing_table_url)
+        .with_listing_options(listing_options)
+        .infer_schema(&ctx.state())
+        .await?;
+
+    ctx.register_table("hive_style", Arc::new(ListingTable::try_new(config)?))?;
+
+    let plan = ctx
+        .sql("SELECT part, value FROM hive_style LIMIT 1")
+        .await?
+        .logical_plan()
+        .clone();
+
+    let bytes = logical_plan_to_bytes(&plan)?;
+    let new_plan = logical_plan_from_bytes(&bytes, &ctx)?;
+    assert_eq!(plan, new_plan);
+    Ok(())
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to