This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new efd9587b2f fix: coerce int96 resolution inside of list, struct, and 
map types (#16058)
efd9587b2f is described below

commit efd9587b2f271dd54c31e19416f04810f42675ca
Author: Matt Butrovich <[email protected]>
AuthorDate: Tue May 20 06:51:07 2025 -0400

    fix: coerce int96 resolution inside of list, struct, and map types (#16058)
    
    * Add test generated from schema in Comet.
    
    * Checkpoint DFS.
    
    * Checkpoint with working transformation.
    
    * fmt, clippy fixes.
    
    * Remove maximum stack depth.
    
    * More testing.
    
    * Improve tests.
    
    * Improve docs.
    
    * Use a smaller HashSet instead of HashMap with every field in it. More 
docs.
    
    * Use a smaller HashSet instead of HashMap with every field in it. More 
docs.
    
    * More docs.
    
    * More docs.
    
    * Fix typo.
    
    * Refactor match with nested if lets to make it more readable.
    
    * Address some PR feedback.
    
    * Rename variables in struct processing to address PR feedback. Do List 
next.
    
    * Rename variables in list processing to address PR feedback.
    
    * Update docs.
    
    * Simplify list parquet path generation.
    
    * Map support.
    
    * Remove old TODO.
    
    * Reduce redundant docs be referring to docs above.
    
    * Reduce redundant docs be referring to docs above.
    
    * Add parquet file generated from CometFuzzTestSuite ParquetGenerator 
(similar to schema in file_format tests) to exercise end-to-end support.
    
    * Fix clippy.
---
 .../core/src/datasource/physical_plan/parquet.rs   | 120 +++++-
 datafusion/core/tests/data/int96_nested.parquet    | Bin 0 -> 4004 bytes
 datafusion/datasource-parquet/src/file_format.rs   | 415 +++++++++++++++++++--
 3 files changed, 510 insertions(+), 25 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs 
b/datafusion/core/src/datasource/physical_plan/parquet.rs
index e4d5060e06..0da230682b 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -44,7 +44,7 @@ mod tests {
     use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder};
     use arrow::record_batch::RecordBatch;
     use arrow::util::pretty::pretty_format_batches;
-    use arrow_schema::SchemaRef;
+    use arrow_schema::{SchemaRef, TimeUnit};
     use bytes::{BufMut, BytesMut};
     use datafusion_common::config::TableParquetOptions;
     use datafusion_common::test_util::{batches_to_sort_string, 
batches_to_string};
@@ -1229,6 +1229,124 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn parquet_exec_with_int96_nested() -> Result<()> {
+        // This test ensures that we maintain compatibility with coercing 
int96 to the desired
+        // resolution when they're within a nested type (e.g., struct, map, 
list). This file
+        // originates from a modified CometFuzzTestSuite ParquetGenerator to 
generate combinations
+        // of primitive and complex columns using int96. Other tests cover 
reading the data
+        // correctly with this coercion. Here we're only checking the coerced 
schema is correct.
+        let testdata = "../../datafusion/core/tests/data";
+        let filename = "int96_nested.parquet";
+        let session_ctx = SessionContext::new();
+        let state = session_ctx.state();
+        let task_ctx = state.task_ctx();
+
+        let parquet_exec = scan_format(
+            &state,
+            
&ParquetFormat::default().with_coerce_int96(Some("us".to_string())),
+            None,
+            testdata,
+            filename,
+            None,
+            None,
+        )
+        .await
+        .unwrap();
+        assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
+
+        let mut results = parquet_exec.execute(0, task_ctx.clone())?;
+        let batch = results.next().await.unwrap()?;
+
+        let expected_schema = Arc::new(Schema::new(vec![
+            Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), 
true),
+            Field::new_struct(
+                "c1",
+                vec![Field::new(
+                    "c0",
+                    DataType::Timestamp(TimeUnit::Microsecond, None),
+                    true,
+                )],
+                true,
+            ),
+            Field::new_struct(
+                "c2",
+                vec![Field::new_list(
+                    "c0",
+                    Field::new(
+                        "element",
+                        DataType::Timestamp(TimeUnit::Microsecond, None),
+                        true,
+                    ),
+                    true,
+                )],
+                true,
+            ),
+            Field::new_map(
+                "c3",
+                "key_value",
+                Field::new(
+                    "key",
+                    DataType::Timestamp(TimeUnit::Microsecond, None),
+                    false,
+                ),
+                Field::new(
+                    "value",
+                    DataType::Timestamp(TimeUnit::Microsecond, None),
+                    true,
+                ),
+                false,
+                true,
+            ),
+            Field::new_list(
+                "c4",
+                Field::new(
+                    "element",
+                    DataType::Timestamp(TimeUnit::Microsecond, None),
+                    true,
+                ),
+                true,
+            ),
+            Field::new_list(
+                "c5",
+                Field::new_struct(
+                    "element",
+                    vec![Field::new(
+                        "c0",
+                        DataType::Timestamp(TimeUnit::Microsecond, None),
+                        true,
+                    )],
+                    true,
+                ),
+                true,
+            ),
+            Field::new_list(
+                "c6",
+                Field::new_map(
+                    "element",
+                    "key_value",
+                    Field::new(
+                        "key",
+                        DataType::Timestamp(TimeUnit::Microsecond, None),
+                        false,
+                    ),
+                    Field::new(
+                        "value",
+                        DataType::Timestamp(TimeUnit::Microsecond, None),
+                        true,
+                    ),
+                    false,
+                    true,
+                ),
+                true,
+            ),
+        ]));
+
+        assert_eq!(batch.schema(), expected_schema);
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn parquet_exec_with_range() -> Result<()> {
         fn file_range(meta: &ObjectMeta, start: i64, end: i64) -> 
PartitionedFile {
diff --git a/datafusion/core/tests/data/int96_nested.parquet 
b/datafusion/core/tests/data/int96_nested.parquet
new file mode 100644
index 0000000000..708823ded6
Binary files /dev/null and b/datafusion/core/tests/data/int96_nested.parquet 
differ
diff --git a/datafusion/datasource-parquet/src/file_format.rs 
b/datafusion/datasource-parquet/src/file_format.rs
index 83013e5c97..253bd8872d 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -18,9 +18,11 @@
 //! [`ParquetFormat`]: Parquet [`FileFormat`] abstractions
 
 use std::any::Any;
+use std::cell::RefCell;
 use std::fmt;
 use std::fmt::Debug;
 use std::ops::Range;
+use std::rc::Rc;
 use std::sync::Arc;
 
 use arrow::array::RecordBatch;
@@ -41,7 +43,7 @@ use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::stats::Precision;
 use datafusion_common::{
     internal_datafusion_err, internal_err, not_impl_err, ColumnStatistics,
-    DataFusionError, GetExt, Result, DEFAULT_PARQUET_EXTENSION,
+    DataFusionError, GetExt, HashSet, Result, DEFAULT_PARQUET_EXTENSION,
 };
 use datafusion_common::{HashMap, Statistics};
 use datafusion_common_runtime::{JoinSet, SpawnedTask};
@@ -557,38 +559,186 @@ pub fn coerce_int96_to_resolution(
     file_schema: &Schema,
     time_unit: &TimeUnit,
 ) -> Option<Schema> {
-    let mut transform = false;
-    let parquet_fields: HashMap<_, _> = parquet_schema
+    // Traverse the parquet_schema columns looking for int96 physical types. 
If encountered, insert
+    // the field's full path into a set.
+    let int96_fields: HashSet<_> = parquet_schema
         .columns()
         .iter()
-        .map(|f| {
-            let dt = f.physical_type();
-            if dt.eq(&Type::INT96) {
-                transform = true;
-            }
-            (f.name(), dt)
-        })
+        .filter(|f| f.physical_type() == Type::INT96)
+        .map(|f| f.path().string())
         .collect();
 
-    if !transform {
+    if int96_fields.is_empty() {
+        // The schema doesn't contain any int96 fields, so skip the remaining 
logic.
         return None;
     }
 
-    let transformed_fields: Vec<Arc<Field>> = file_schema
-        .fields
-        .iter()
-        .map(|field| match parquet_fields.get(field.name().as_str()) {
-            Some(Type::INT96) => {
-                field_with_new_type(field, DataType::Timestamp(*time_unit, 
None))
+    // Do a DFS into the schema using a stack, looking for timestamp(nanos) 
fields that originated
+    // as int96 to coerce to the provided time_unit.
+
+    type NestedFields = Rc<RefCell<Vec<FieldRef>>>;
+    type StackContext<'a> = (
+        Vec<&'a str>, // The Parquet column path (e.g., "c0.list.element.c1") 
for the current field.
+        &'a FieldRef, // The current field to be processed.
+        NestedFields, // The parent's fields that this field will be 
(possibly) type-coerced and
+        // inserted into. All fields have a parent, so this is not an Option 
type.
+        Option<NestedFields>, // Nested types need to create their own vector 
of fields for their
+                              // children. For primitive types this will 
remain None. For nested
+                              // types it is None the first time they are 
processed. Then, we
+                              // instantiate a vector for its children, push 
the field back onto the
+                              // stack to be processed again, and DFS into its 
children. The next
+                              // time we process the field, we know we have 
DFS'd into the children
+                              // because this field is Some.
+    );
+
+    // This is our top-level fields from which we will construct our schema. 
We pass this into our
+    // initial stack context as the parent fields, and the DFS populates it.
+    let fields = 
Rc::new(RefCell::new(Vec::with_capacity(file_schema.fields.len())));
+
+    // TODO: It might be possible to only DFS into nested fields that we know 
contain an int96 if we
+    // use some sort of LPM data structure to check if we're currently DFS'ing 
nested types that are
+    // in a column path that contains an int96. That can be a future 
optimization for large schemas.
+    let transformed_schema = {
+        // Populate the stack with our top-level fields.
+        let mut stack: Vec<StackContext> = file_schema
+            .fields()
+            .iter()
+            .rev()
+            .map(|f| (vec![f.name().as_str()], f, Rc::clone(&fields), None))
+            .collect();
+
+        // Pop fields to DFS into until we have exhausted the stack.
+        while let Some((parquet_path, current_field, parent_fields, 
child_fields)) =
+            stack.pop()
+        {
+            match (current_field.data_type(), child_fields) {
+                (DataType::Struct(unprocessed_children), None) => {
+                    // This is the first time popping off this struct. We 
don't yet know the
+                    // correct types of its children (i.e., if they need 
coercing) so we create
+                    // a vector for child_fields, push the struct node back 
onto the stack to be
+                    // processed again (see below) after processing all its 
children.
+                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(
+                        unprocessed_children.len(),
+                    )));
+                    // Note that here we push the struct back onto the stack 
with its
+                    // parent_fields in the same position, now with 
Some(child_fields).
+                    stack.push((
+                        parquet_path.clone(),
+                        current_field,
+                        parent_fields,
+                        Some(Rc::clone(&child_fields)),
+                    ));
+                    // Push all the children in reverse to maintain original 
schema order due to
+                    // stack processing.
+                    for child in unprocessed_children.into_iter().rev() {
+                        let mut child_path = parquet_path.clone();
+                        // Build up a normalized path that we'll use as a key 
into the original
+                        // int96_fields set above to test if this originated 
as int96.
+                        child_path.push(".");
+                        child_path.push(child.name());
+                        // Note that here we push the field onto the stack 
using the struct's
+                        // new child_fields vector as the field's 
parent_fields.
+                        stack.push((child_path, child, 
Rc::clone(&child_fields), None));
+                    }
+                }
+                (DataType::Struct(unprocessed_children), 
Some(processed_children)) => {
+                    // This is the second time popping off this struct. The 
child_fields vector
+                    // now contains each field that has been DFS'd into, and 
we can construct
+                    // the resulting struct with correct child types.
+                    let processed_children = processed_children.borrow();
+                    assert_eq!(processed_children.len(), 
unprocessed_children.len());
+                    let processed_struct = Field::new_struct(
+                        current_field.name(),
+                        processed_children.as_slice(),
+                        current_field.is_nullable(),
+                    );
+                    
parent_fields.borrow_mut().push(Arc::new(processed_struct));
+                }
+                (DataType::List(unprocessed_child), None) => {
+                    // This is the first time popping off this list. See 
struct docs above.
+                    let child_fields = 
Rc::new(RefCell::new(Vec::with_capacity(1)));
+                    stack.push((
+                        parquet_path.clone(),
+                        current_field,
+                        parent_fields,
+                        Some(Rc::clone(&child_fields)),
+                    ));
+                    let mut child_path = parquet_path.clone();
+                    // Spark uses a definition for arrays/lists that results 
in a group
+                    // named "list" that is not maintained when parsing to 
Arrow. We just push
+                    // this name into the path.
+                    child_path.push(".list.");
+                    child_path.push(unprocessed_child.name());
+                    stack.push((
+                        child_path.clone(),
+                        unprocessed_child,
+                        Rc::clone(&child_fields),
+                        None,
+                    ));
+                }
+                (DataType::List(_), Some(processed_children)) => {
+                    // This is the second time popping off this list. See 
struct docs above.
+                    let processed_children = processed_children.borrow();
+                    assert_eq!(processed_children.len(), 1);
+                    let processed_list = Field::new_list(
+                        current_field.name(),
+                        Arc::clone(&processed_children[0]),
+                        current_field.is_nullable(),
+                    );
+                    parent_fields.borrow_mut().push(Arc::new(processed_list));
+                }
+                (DataType::Map(unprocessed_child, _), None) => {
+                    // This is the first time popping off this map. See struct 
docs above.
+                    let child_fields = 
Rc::new(RefCell::new(Vec::with_capacity(1)));
+                    stack.push((
+                        parquet_path.clone(),
+                        current_field,
+                        parent_fields,
+                        Some(Rc::clone(&child_fields)),
+                    ));
+                    let mut child_path = parquet_path.clone();
+                    child_path.push(".");
+                    child_path.push(unprocessed_child.name());
+                    stack.push((
+                        child_path.clone(),
+                        unprocessed_child,
+                        Rc::clone(&child_fields),
+                        None,
+                    ));
+                }
+                (DataType::Map(_, sorted), Some(processed_children)) => {
+                    // This is the second time popping off this map. See 
struct docs above.
+                    let processed_children = processed_children.borrow();
+                    assert_eq!(processed_children.len(), 1);
+                    let processed_map = Field::new(
+                        current_field.name(),
+                        DataType::Map(Arc::clone(&processed_children[0]), 
*sorted),
+                        current_field.is_nullable(),
+                    );
+                    parent_fields.borrow_mut().push(Arc::new(processed_map));
+                }
+                (DataType::Timestamp(TimeUnit::Nanosecond, None), None)
+                    if int96_fields.contains(parquet_path.concat().as_str()) =>
+                // We found a timestamp(nanos) and it originated as int96. 
Coerce it to the correct
+                // time_unit.
+                {
+                    parent_fields.borrow_mut().push(field_with_new_type(
+                        current_field,
+                        DataType::Timestamp(*time_unit, None),
+                    ));
+                }
+                // Other types can be cloned as they are.
+                _ => 
parent_fields.borrow_mut().push(Arc::clone(current_field)),
             }
-            _ => Arc::clone(field),
-        })
-        .collect();
+        }
+        assert_eq!(fields.borrow().len(), file_schema.fields.len());
+        Schema::new_with_metadata(
+            fields.borrow_mut().clone(),
+            file_schema.metadata.clone(),
+        )
+    };
 
-    Some(Schema::new_with_metadata(
-        transformed_fields,
-        file_schema.metadata.clone(),
-    ))
+    Some(transformed_schema)
 }
 
 /// Coerces the file schema if the table schema uses a view type.
@@ -1576,3 +1726,220 @@ fn create_max_min_accs(
         .collect();
     (max_values, min_values)
 }
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use super::*;
+
+    use arrow::datatypes::DataType;
+    use parquet::schema::parser::parse_message_type;
+
+    #[test]
+    fn coerce_int96_to_resolution_with_mixed_timestamps() {
+        // Unclear if Spark (or other writer) could generate a file with mixed 
timestamps like this,
+        // but we want to test the scenario just in case since it's at least a 
valid schema as far
+        // as the Parquet spec is concerned.
+        let spark_schema = "
+        message spark_schema {
+          optional int96 c0;
+          optional int64 c1 (TIMESTAMP(NANOS,true));
+          optional int64 c2 (TIMESTAMP(NANOS,false));
+          optional int64 c3 (TIMESTAMP(MILLIS,true));
+          optional int64 c4 (TIMESTAMP(MILLIS,false));
+          optional int64 c5 (TIMESTAMP(MICROS,true));
+          optional int64 c6 (TIMESTAMP(MICROS,false));
+        }
+        ";
+
+        let schema = parse_message_type(spark_schema).expect("should parse 
schema");
+        let descr = SchemaDescriptor::new(Arc::new(schema));
+
+        let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap();
+
+        let result =
+            coerce_int96_to_resolution(&descr, &arrow_schema, 
&TimeUnit::Microsecond)
+                .unwrap();
+
+        // Only the first field (c0) should be converted to a microsecond 
timestamp because it's the
+        // only timestamp that originated from an INT96.
+        let expected_schema = Schema::new(vec![
+            Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), 
true),
+            Field::new(
+                "c1",
+                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
+                true,
+            ),
+            Field::new("c2", DataType::Timestamp(TimeUnit::Nanosecond, None), 
true),
+            Field::new(
+                "c3",
+                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
+                true,
+            ),
+            Field::new("c4", DataType::Timestamp(TimeUnit::Millisecond, None), 
true),
+            Field::new(
+                "c5",
+                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
+                true,
+            ),
+            Field::new("c6", DataType::Timestamp(TimeUnit::Microsecond, None), 
true),
+        ]);
+
+        assert_eq!(result, expected_schema);
+    }
+
+    #[test]
+    fn coerce_int96_to_resolution_with_nested_types() {
+        // This schema is derived from Comet's CometFuzzTestSuite 
ParquetGenerator only using int96
+        // primitive types with generateStruct, generateArray, and generateMap 
set to true, with one
+        // additional field added to c4's struct to make sure all fields in a 
struct get modified.
+        // 
https://github.com/apache/datafusion-comet/blob/main/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala
+        let spark_schema = "
+        message spark_schema {
+          optional int96 c0;
+          optional group c1 {
+            optional int96 c0;
+          }
+          optional group c2 {
+            optional group c0 (LIST) {
+              repeated group list {
+                optional int96 element;
+              }
+            }
+          }
+          optional group c3 (LIST) {
+            repeated group list {
+              optional int96 element;
+            }
+          }
+          optional group c4 (LIST) {
+            repeated group list {
+              optional group element {
+                optional int96 c0;
+                optional int96 c1;
+              }
+            }
+          }
+          optional group c5 (MAP) {
+            repeated group key_value {
+              required int96 key;
+              optional int96 value;
+            }
+          }
+          optional group c6 (LIST) {
+            repeated group list {
+              optional group element (MAP) {
+                repeated group key_value {
+                  required int96 key;
+                  optional int96 value;
+                }
+              }
+            }
+          }
+        }
+        ";
+
+        let schema = parse_message_type(spark_schema).expect("should parse 
schema");
+        let descr = SchemaDescriptor::new(Arc::new(schema));
+
+        let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap();
+
+        let result =
+            coerce_int96_to_resolution(&descr, &arrow_schema, 
&TimeUnit::Microsecond)
+                .unwrap();
+
+        let expected_schema = Schema::new(vec![
+            Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), 
true),
+            Field::new_struct(
+                "c1",
+                vec![Field::new(
+                    "c0",
+                    DataType::Timestamp(TimeUnit::Microsecond, None),
+                    true,
+                )],
+                true,
+            ),
+            Field::new_struct(
+                "c2",
+                vec![Field::new_list(
+                    "c0",
+                    Field::new(
+                        "element",
+                        DataType::Timestamp(TimeUnit::Microsecond, None),
+                        true,
+                    ),
+                    true,
+                )],
+                true,
+            ),
+            Field::new_list(
+                "c3",
+                Field::new(
+                    "element",
+                    DataType::Timestamp(TimeUnit::Microsecond, None),
+                    true,
+                ),
+                true,
+            ),
+            Field::new_list(
+                "c4",
+                Field::new_struct(
+                    "element",
+                    vec![
+                        Field::new(
+                            "c0",
+                            DataType::Timestamp(TimeUnit::Microsecond, None),
+                            true,
+                        ),
+                        Field::new(
+                            "c1",
+                            DataType::Timestamp(TimeUnit::Microsecond, None),
+                            true,
+                        ),
+                    ],
+                    true,
+                ),
+                true,
+            ),
+            Field::new_map(
+                "c5",
+                "key_value",
+                Field::new(
+                    "key",
+                    DataType::Timestamp(TimeUnit::Microsecond, None),
+                    false,
+                ),
+                Field::new(
+                    "value",
+                    DataType::Timestamp(TimeUnit::Microsecond, None),
+                    true,
+                ),
+                false,
+                true,
+            ),
+            Field::new_list(
+                "c6",
+                Field::new_map(
+                    "element",
+                    "key_value",
+                    Field::new(
+                        "key",
+                        DataType::Timestamp(TimeUnit::Microsecond, None),
+                        false,
+                    ),
+                    Field::new(
+                        "value",
+                        DataType::Timestamp(TimeUnit::Microsecond, None),
+                        true,
+                    ),
+                    false,
+                    true,
+                ),
+                true,
+            ),
+        ]);
+
+        assert_eq!(result, expected_schema);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to