This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new efd9587b2f fix: coerce int96 resolution inside of list, struct, and
map types (#16058)
efd9587b2f is described below
commit efd9587b2f271dd54c31e19416f04810f42675ca
Author: Matt Butrovich <[email protected]>
AuthorDate: Tue May 20 06:51:07 2025 -0400
fix: coerce int96 resolution inside of list, struct, and map types (#16058)
* Add test generated from schema in Comet.
* Checkpoint DFS.
* Checkpoint with working transformation.
* fmt, clippy fixes.
* Remove maximum stack depth.
* More testing.
* Improve tests.
* Improve docs.
* Use a smaller HashSet instead of HashMap with every field in it. More
docs.
* Use a smaller HashSet instead of HashMap with every field in it. More
docs.
* More docs.
* More docs.
* Fix typo.
* Refactor match with nested if lets to make it more readable.
* Address some PR feedback.
* Rename variables in struct processing to address PR feedback. Do List
next.
* Rename variables in list processing to address PR feedback.
* Update docs.
* Simplify list parquet path generation.
* Map support.
* Remove old TODO.
* Reduce redundant docs by referring to docs above.
* Reduce redundant docs by referring to docs above.
* Add parquet file generated from CometFuzzTestSuite ParquetGenerator
(similar to schema in file_format tests) to exercise end-to-end support.
* Fix clippy.
---
.../core/src/datasource/physical_plan/parquet.rs | 120 +++++-
datafusion/core/tests/data/int96_nested.parquet | Bin 0 -> 4004 bytes
datafusion/datasource-parquet/src/file_format.rs | 415 +++++++++++++++++++--
3 files changed, 510 insertions(+), 25 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs
b/datafusion/core/src/datasource/physical_plan/parquet.rs
index e4d5060e06..0da230682b 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -44,7 +44,7 @@ mod tests {
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
- use arrow_schema::SchemaRef;
+ use arrow_schema::{SchemaRef, TimeUnit};
use bytes::{BufMut, BytesMut};
use datafusion_common::config::TableParquetOptions;
use datafusion_common::test_util::{batches_to_sort_string,
batches_to_string};
@@ -1229,6 +1229,124 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn parquet_exec_with_int96_nested() -> Result<()> {
+ // This test ensures that we maintain compatibility with coercing
int96 to the desired
+ // resolution when they're within a nested type (e.g., struct, map,
list). This file
+ // originates from a modified CometFuzzTestSuite ParquetGenerator to
generate combinations
+ // of primitive and complex columns using int96. Other tests cover
reading the data
+ // correctly with this coercion. Here we're only checking the coerced
schema is correct.
+ let testdata = "../../datafusion/core/tests/data";
+ let filename = "int96_nested.parquet";
+ let session_ctx = SessionContext::new();
+ let state = session_ctx.state();
+ let task_ctx = state.task_ctx();
+
+ let parquet_exec = scan_format(
+ &state,
+
&ParquetFormat::default().with_coerce_int96(Some("us".to_string())),
+ None,
+ testdata,
+ filename,
+ None,
+ None,
+ )
+ .await
+ .unwrap();
+ assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
+
+ let mut results = parquet_exec.execute(0, task_ctx.clone())?;
+ let batch = results.next().await.unwrap()?;
+
+ let expected_schema = Arc::new(Schema::new(vec![
+ Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None),
true),
+ Field::new_struct(
+ "c1",
+ vec![Field::new(
+ "c0",
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ true,
+ )],
+ true,
+ ),
+ Field::new_struct(
+ "c2",
+ vec![Field::new_list(
+ "c0",
+ Field::new(
+ "element",
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ true,
+ ),
+ true,
+ )],
+ true,
+ ),
+ Field::new_map(
+ "c3",
+ "key_value",
+ Field::new(
+ "key",
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ false,
+ ),
+ Field::new(
+ "value",
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ true,
+ ),
+ false,
+ true,
+ ),
+ Field::new_list(
+ "c4",
+ Field::new(
+ "element",
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ true,
+ ),
+ true,
+ ),
+ Field::new_list(
+ "c5",
+ Field::new_struct(
+ "element",
+ vec![Field::new(
+ "c0",
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ true,
+ )],
+ true,
+ ),
+ true,
+ ),
+ Field::new_list(
+ "c6",
+ Field::new_map(
+ "element",
+ "key_value",
+ Field::new(
+ "key",
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ false,
+ ),
+ Field::new(
+ "value",
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ true,
+ ),
+ false,
+ true,
+ ),
+ true,
+ ),
+ ]));
+
+ assert_eq!(batch.schema(), expected_schema);
+
+ Ok(())
+ }
+
#[tokio::test]
async fn parquet_exec_with_range() -> Result<()> {
fn file_range(meta: &ObjectMeta, start: i64, end: i64) ->
PartitionedFile {
diff --git a/datafusion/core/tests/data/int96_nested.parquet
b/datafusion/core/tests/data/int96_nested.parquet
new file mode 100644
index 0000000000..708823ded6
Binary files /dev/null and b/datafusion/core/tests/data/int96_nested.parquet
differ
diff --git a/datafusion/datasource-parquet/src/file_format.rs
b/datafusion/datasource-parquet/src/file_format.rs
index 83013e5c97..253bd8872d 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -18,9 +18,11 @@
//! [`ParquetFormat`]: Parquet [`FileFormat`] abstractions
use std::any::Any;
+use std::cell::RefCell;
use std::fmt;
use std::fmt::Debug;
use std::ops::Range;
+use std::rc::Rc;
use std::sync::Arc;
use arrow::array::RecordBatch;
@@ -41,7 +43,7 @@ use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::stats::Precision;
use datafusion_common::{
internal_datafusion_err, internal_err, not_impl_err, ColumnStatistics,
- DataFusionError, GetExt, Result, DEFAULT_PARQUET_EXTENSION,
+ DataFusionError, GetExt, HashSet, Result, DEFAULT_PARQUET_EXTENSION,
};
use datafusion_common::{HashMap, Statistics};
use datafusion_common_runtime::{JoinSet, SpawnedTask};
@@ -557,38 +559,186 @@ pub fn coerce_int96_to_resolution(
file_schema: &Schema,
time_unit: &TimeUnit,
) -> Option<Schema> {
- let mut transform = false;
- let parquet_fields: HashMap<_, _> = parquet_schema
+ // Traverse the parquet_schema columns looking for int96 physical types.
If encountered, insert
+ // the field's full path into a set.
+ let int96_fields: HashSet<_> = parquet_schema
.columns()
.iter()
- .map(|f| {
- let dt = f.physical_type();
- if dt.eq(&Type::INT96) {
- transform = true;
- }
- (f.name(), dt)
- })
+ .filter(|f| f.physical_type() == Type::INT96)
+ .map(|f| f.path().string())
.collect();
- if !transform {
+ if int96_fields.is_empty() {
+ // The schema doesn't contain any int96 fields, so skip the remaining
logic.
return None;
}
- let transformed_fields: Vec<Arc<Field>> = file_schema
- .fields
- .iter()
- .map(|field| match parquet_fields.get(field.name().as_str()) {
- Some(Type::INT96) => {
- field_with_new_type(field, DataType::Timestamp(*time_unit,
None))
+ // Do a DFS into the schema using a stack, looking for timestamp(nanos)
fields that originated
+ // as int96 to coerce to the provided time_unit.
+
+ type NestedFields = Rc<RefCell<Vec<FieldRef>>>;
+ type StackContext<'a> = (
+ Vec<&'a str>, // The Parquet column path (e.g., "c0.list.element.c1")
for the current field.
+ &'a FieldRef, // The current field to be processed.
+ NestedFields, // The parent's fields that this field will be
(possibly) type-coerced and
+ // inserted into. All fields have a parent, so this is not an Option
type.
+ Option<NestedFields>, // Nested types need to create their own vector
of fields for their
+ // children. For primitive types this will
remain None. For nested
+ // types it is None the first time they are
processed. Then, we
+ // instantiate a vector for its children, push
the field back onto the
+ // stack to be processed again, and DFS into its
children. The next
+ // time we process the field, we know we have
DFS'd into the children
+ // because this field is Some.
+ );
+
+ // This is our top-level fields from which we will construct our schema.
We pass this into our
+ // initial stack context as the parent fields, and the DFS populates it.
+ let fields =
Rc::new(RefCell::new(Vec::with_capacity(file_schema.fields.len())));
+
+ // TODO: It might be possible to only DFS into nested fields that we know
contain an int96 if we
+ // use some sort of LPM data structure to check if we're currently DFS'ing
nested types that are
+ // in a column path that contains an int96. That can be a future
optimization for large schemas.
+ let transformed_schema = {
+ // Populate the stack with our top-level fields.
+ let mut stack: Vec<StackContext> = file_schema
+ .fields()
+ .iter()
+ .rev()
+ .map(|f| (vec![f.name().as_str()], f, Rc::clone(&fields), None))
+ .collect();
+
+ // Pop fields to DFS into until we have exhausted the stack.
+ while let Some((parquet_path, current_field, parent_fields,
child_fields)) =
+ stack.pop()
+ {
+ match (current_field.data_type(), child_fields) {
+ (DataType::Struct(unprocessed_children), None) => {
+ // This is the first time popping off this struct. We
don't yet know the
+ // correct types of its children (i.e., if they need
coercing) so we create
+ // a vector for child_fields, push the struct node back
onto the stack to be
+ // processed again (see below) after processing all its
children.
+ let child_fields = Rc::new(RefCell::new(Vec::with_capacity(
+ unprocessed_children.len(),
+ )));
+ // Note that here we push the struct back onto the stack
with its
+ // parent_fields in the same position, now with
Some(child_fields).
+ stack.push((
+ parquet_path.clone(),
+ current_field,
+ parent_fields,
+ Some(Rc::clone(&child_fields)),
+ ));
+ // Push all the children in reverse to maintain original
schema order due to
+ // stack processing.
+ for child in unprocessed_children.into_iter().rev() {
+ let mut child_path = parquet_path.clone();
+ // Build up a normalized path that we'll use as a key
into the original
+ // int96_fields set above to test if this originated
as int96.
+ child_path.push(".");
+ child_path.push(child.name());
+ // Note that here we push the field onto the stack
using the struct's
+ // new child_fields vector as the field's
parent_fields.
+ stack.push((child_path, child,
Rc::clone(&child_fields), None));
+ }
+ }
+ (DataType::Struct(unprocessed_children),
Some(processed_children)) => {
+ // This is the second time popping off this struct. The
child_fields vector
+ // now contains each field that has been DFS'd into, and
we can construct
+ // the resulting struct with correct child types.
+ let processed_children = processed_children.borrow();
+ assert_eq!(processed_children.len(),
unprocessed_children.len());
+ let processed_struct = Field::new_struct(
+ current_field.name(),
+ processed_children.as_slice(),
+ current_field.is_nullable(),
+ );
+
parent_fields.borrow_mut().push(Arc::new(processed_struct));
+ }
+ (DataType::List(unprocessed_child), None) => {
+ // This is the first time popping off this list. See
struct docs above.
+ let child_fields =
Rc::new(RefCell::new(Vec::with_capacity(1)));
+ stack.push((
+ parquet_path.clone(),
+ current_field,
+ parent_fields,
+ Some(Rc::clone(&child_fields)),
+ ));
+ let mut child_path = parquet_path.clone();
+ // Spark uses a definition for arrays/lists that results
in a group
+ // named "list" that is not maintained when parsing to
Arrow. We just push
+ // this name into the path.
+ child_path.push(".list.");
+ child_path.push(unprocessed_child.name());
+ stack.push((
+ child_path.clone(),
+ unprocessed_child,
+ Rc::clone(&child_fields),
+ None,
+ ));
+ }
+ (DataType::List(_), Some(processed_children)) => {
+ // This is the second time popping off this list. See
struct docs above.
+ let processed_children = processed_children.borrow();
+ assert_eq!(processed_children.len(), 1);
+ let processed_list = Field::new_list(
+ current_field.name(),
+ Arc::clone(&processed_children[0]),
+ current_field.is_nullable(),
+ );
+ parent_fields.borrow_mut().push(Arc::new(processed_list));
+ }
+ (DataType::Map(unprocessed_child, _), None) => {
+ // This is the first time popping off this map. See struct
docs above.
+ let child_fields =
Rc::new(RefCell::new(Vec::with_capacity(1)));
+ stack.push((
+ parquet_path.clone(),
+ current_field,
+ parent_fields,
+ Some(Rc::clone(&child_fields)),
+ ));
+ let mut child_path = parquet_path.clone();
+ child_path.push(".");
+ child_path.push(unprocessed_child.name());
+ stack.push((
+ child_path.clone(),
+ unprocessed_child,
+ Rc::clone(&child_fields),
+ None,
+ ));
+ }
+ (DataType::Map(_, sorted), Some(processed_children)) => {
+ // This is the second time popping off this map. See
struct docs above.
+ let processed_children = processed_children.borrow();
+ assert_eq!(processed_children.len(), 1);
+ let processed_map = Field::new(
+ current_field.name(),
+ DataType::Map(Arc::clone(&processed_children[0]),
*sorted),
+ current_field.is_nullable(),
+ );
+ parent_fields.borrow_mut().push(Arc::new(processed_map));
+ }
+ (DataType::Timestamp(TimeUnit::Nanosecond, None), None)
+ if int96_fields.contains(parquet_path.concat().as_str()) =>
+ // We found a timestamp(nanos) and it originated as int96.
Coerce it to the correct
+ // time_unit.
+ {
+ parent_fields.borrow_mut().push(field_with_new_type(
+ current_field,
+ DataType::Timestamp(*time_unit, None),
+ ));
+ }
+ // Other types can be cloned as they are.
+ _ =>
parent_fields.borrow_mut().push(Arc::clone(current_field)),
}
- _ => Arc::clone(field),
- })
- .collect();
+ }
+ assert_eq!(fields.borrow().len(), file_schema.fields.len());
+ Schema::new_with_metadata(
+ fields.borrow_mut().clone(),
+ file_schema.metadata.clone(),
+ )
+ };
- Some(Schema::new_with_metadata(
- transformed_fields,
- file_schema.metadata.clone(),
- ))
+ Some(transformed_schema)
}
/// Coerces the file schema if the table schema uses a view type.
@@ -1576,3 +1726,220 @@ fn create_max_min_accs(
.collect();
(max_values, min_values)
}
+
+#[cfg(test)]
+mod tests {
+ use std::sync::Arc;
+
+ use super::*;
+
+ use arrow::datatypes::DataType;
+ use parquet::schema::parser::parse_message_type;
+
+ #[test]
+ fn coerce_int96_to_resolution_with_mixed_timestamps() {
+ // Unclear if Spark (or other writer) could generate a file with mixed
timestamps like this,
+ // but we want to test the scenario just in case since it's at least a
valid schema as far
+ // as the Parquet spec is concerned.
+ let spark_schema = "
+ message spark_schema {
+ optional int96 c0;
+ optional int64 c1 (TIMESTAMP(NANOS,true));
+ optional int64 c2 (TIMESTAMP(NANOS,false));
+ optional int64 c3 (TIMESTAMP(MILLIS,true));
+ optional int64 c4 (TIMESTAMP(MILLIS,false));
+ optional int64 c5 (TIMESTAMP(MICROS,true));
+ optional int64 c6 (TIMESTAMP(MICROS,false));
+ }
+ ";
+
+ let schema = parse_message_type(spark_schema).expect("should parse
schema");
+ let descr = SchemaDescriptor::new(Arc::new(schema));
+
+ let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap();
+
+ let result =
+ coerce_int96_to_resolution(&descr, &arrow_schema,
&TimeUnit::Microsecond)
+ .unwrap();
+
+ // Only the first field (c0) should be converted to a microsecond
timestamp because it's the
+ // only timestamp that originated from an INT96.
+ let expected_schema = Schema::new(vec![
+ Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None),
true),
+ Field::new(
+ "c1",
+ DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
+ true,
+ ),
+ Field::new("c2", DataType::Timestamp(TimeUnit::Nanosecond, None),
true),
+ Field::new(
+ "c3",
+ DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
+ true,
+ ),
+ Field::new("c4", DataType::Timestamp(TimeUnit::Millisecond, None),
true),
+ Field::new(
+ "c5",
+ DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
+ true,
+ ),
+ Field::new("c6", DataType::Timestamp(TimeUnit::Microsecond, None),
true),
+ ]);
+
+ assert_eq!(result, expected_schema);
+ }
+
+ #[test]
+ fn coerce_int96_to_resolution_with_nested_types() {
+ // This schema is derived from Comet's CometFuzzTestSuite
ParquetGenerator only using int96
+ // primitive types with generateStruct, generateArray, and generateMap
set to true, with one
+ // additional field added to c4's struct to make sure all fields in a
struct get modified.
+ //
https://github.com/apache/datafusion-comet/blob/main/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala
+ let spark_schema = "
+ message spark_schema {
+ optional int96 c0;
+ optional group c1 {
+ optional int96 c0;
+ }
+ optional group c2 {
+ optional group c0 (LIST) {
+ repeated group list {
+ optional int96 element;
+ }
+ }
+ }
+ optional group c3 (LIST) {
+ repeated group list {
+ optional int96 element;
+ }
+ }
+ optional group c4 (LIST) {
+ repeated group list {
+ optional group element {
+ optional int96 c0;
+ optional int96 c1;
+ }
+ }
+ }
+ optional group c5 (MAP) {
+ repeated group key_value {
+ required int96 key;
+ optional int96 value;
+ }
+ }
+ optional group c6 (LIST) {
+ repeated group list {
+ optional group element (MAP) {
+ repeated group key_value {
+ required int96 key;
+ optional int96 value;
+ }
+ }
+ }
+ }
+ }
+ ";
+
+ let schema = parse_message_type(spark_schema).expect("should parse
schema");
+ let descr = SchemaDescriptor::new(Arc::new(schema));
+
+ let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap();
+
+ let result =
+ coerce_int96_to_resolution(&descr, &arrow_schema,
&TimeUnit::Microsecond)
+ .unwrap();
+
+ let expected_schema = Schema::new(vec![
+ Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None),
true),
+ Field::new_struct(
+ "c1",
+ vec![Field::new(
+ "c0",
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ true,
+ )],
+ true,
+ ),
+ Field::new_struct(
+ "c2",
+ vec![Field::new_list(
+ "c0",
+ Field::new(
+ "element",
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ true,
+ ),
+ true,
+ )],
+ true,
+ ),
+ Field::new_list(
+ "c3",
+ Field::new(
+ "element",
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ true,
+ ),
+ true,
+ ),
+ Field::new_list(
+ "c4",
+ Field::new_struct(
+ "element",
+ vec![
+ Field::new(
+ "c0",
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ true,
+ ),
+ Field::new(
+ "c1",
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ true,
+ ),
+ ],
+ true,
+ ),
+ true,
+ ),
+ Field::new_map(
+ "c5",
+ "key_value",
+ Field::new(
+ "key",
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ false,
+ ),
+ Field::new(
+ "value",
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ true,
+ ),
+ false,
+ true,
+ ),
+ Field::new_list(
+ "c6",
+ Field::new_map(
+ "element",
+ "key_value",
+ Field::new(
+ "key",
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ false,
+ ),
+ Field::new(
+ "value",
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ true,
+ ),
+ false,
+ true,
+ ),
+ true,
+ ),
+ ]);
+
+ assert_eq!(result, expected_schema);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]