TheBuilderJR opened a new issue, #14757:
URL: https://github.com/apache/datafusion/issues/14757
### Describe the bug
I'd expect that as I add fields to a struct, I should be able to cast the old struct into the new one. As the repro below shows, this doesn't seem to be allowed.
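At the schema level the widening itself seems well-defined; here's a minimal sketch of the merge I have in mind (assuming arrow-rs's `Schema::try_merge`, which merges struct subfields recursively; the field names are illustrative, not part of the repro):
```
use datafusion::arrow::datatypes::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // v1: a struct with one subfield.
    let v1 = Schema::new(vec![Field::new(
        "additionalInfo",
        DataType::Struct(vec![Field::new("location", DataType::Utf8, true)].into()),
        true,
    )]);
    // v2: the same struct with one subfield added.
    let v2 = Schema::new(vec![Field::new(
        "additionalInfo",
        DataType::Struct(
            vec![
                Field::new("location", DataType::Utf8, true),
                Field::new("reason", DataType::Utf8, true), // newly added subfield
            ]
            .into(),
        ),
        true,
    )]);
    // The merged schema carries both subfields; the failure in the repro
    // happens later, when per-file data is read against the merged schema.
    let merged = Schema::try_merge(vec![v1, v2])?;
    println!("{merged:#?}");
    Ok(())
}
```
The repro below exercises exactly this shape of evolution, via Parquet files behind a ListingTable.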
### To Reproduce
```
use std::fs;
use std::sync::Arc;

use datafusion::arrow::array::{
    Array, Float64Array, StringArray, StructArray, TimestampMillisecondArray,
};
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::prelude::*;

#[tokio::test]
async fn test_datafusion_schema_evolution_with_compaction() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();

    // First file: `additionalInfo` contains only `location` and `timestamp_utc`.
    let schema1 = Arc::new(Schema::new(vec![
        Field::new("component", DataType::Utf8, true),
        Field::new("message", DataType::Utf8, true),
        Field::new("stack", DataType::Utf8, true),
        Field::new("timestamp", DataType::Utf8, true),
        Field::new(
            "timestamp_utc",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        ),
        Field::new(
            "additionalInfo",
            DataType::Struct(
                vec![
                    Field::new("location", DataType::Utf8, true),
                    Field::new(
                        "timestamp_utc",
                        DataType::Timestamp(TimeUnit::Millisecond, None),
                        true,
                    ),
                ]
                .into(),
            ),
            true,
        ),
    ]));

    let batch1 = RecordBatch::try_new(
        schema1.clone(),
        vec![
            Arc::new(StringArray::from(vec![Some("component1")])),
            Arc::new(StringArray::from(vec![Some("message1")])),
            Arc::new(StringArray::from(vec![Some("stack_trace")])),
            Arc::new(StringArray::from(vec![Some("2025-02-18T00:00:00Z")])),
            Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
            Arc::new(StructArray::from(vec![
                (
                    Arc::new(Field::new("location", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec![Some("USA")])) as Arc<dyn Array>,
                ),
                (
                    Arc::new(Field::new(
                        "timestamp_utc",
                        DataType::Timestamp(TimeUnit::Millisecond, None),
                        true,
                    )),
                    Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
                ),
            ])),
        ],
    )?;

    let path1 = "test_data1.parquet";
    let _ = fs::remove_file(path1);

    let df1 = ctx.read_batch(batch1)?;
    df1.write_parquet(
        path1,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None,
    )
    .await?;

    // Second file: `additionalInfo` gains a nested `reason` struct.
    let schema2 = Arc::new(Schema::new(vec![
        Field::new("component", DataType::Utf8, true),
        Field::new("message", DataType::Utf8, true),
        Field::new("stack", DataType::Utf8, true),
        Field::new("timestamp", DataType::Utf8, true),
        Field::new(
            "timestamp_utc",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        ),
        Field::new(
            "additionalInfo",
            DataType::Struct(
                vec![
                    Field::new("location", DataType::Utf8, true),
                    Field::new(
                        "timestamp_utc",
                        DataType::Timestamp(TimeUnit::Millisecond, None),
                        true,
                    ),
                    Field::new(
                        "reason",
                        DataType::Struct(
                            vec![
                                Field::new("_level", DataType::Float64, true),
                                Field::new(
                                    "details",
                                    DataType::Struct(
                                        vec![
                                            Field::new("rurl", DataType::Utf8, true),
                                            Field::new("s", DataType::Float64, true),
                                            Field::new("t", DataType::Utf8, true),
                                        ]
                                        .into(),
                                    ),
                                    true,
                                ),
                            ]
                            .into(),
                        ),
                        true,
                    ),
                ]
                .into(),
            ),
            true,
        ),
    ]));

    let batch2 = RecordBatch::try_new(
        schema2.clone(),
        vec![
            Arc::new(StringArray::from(vec![Some("component1")])),
            Arc::new(StringArray::from(vec![Some("message1")])),
            Arc::new(StringArray::from(vec![Some("stack_trace")])),
            Arc::new(StringArray::from(vec![Some("2025-02-18T00:00:00Z")])),
            Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
            Arc::new(StructArray::from(vec![
                (
                    Arc::new(Field::new("location", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec![Some("USA")])) as Arc<dyn Array>,
                ),
                (
                    Arc::new(Field::new(
                        "timestamp_utc",
                        DataType::Timestamp(TimeUnit::Millisecond, None),
                        true,
                    )),
                    Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
                ),
                (
                    Arc::new(Field::new(
                        "reason",
                        DataType::Struct(
                            vec![
                                Field::new("_level", DataType::Float64, true),
                                Field::new(
                                    "details",
                                    DataType::Struct(
                                        vec![
                                            Field::new("rurl", DataType::Utf8, true),
                                            Field::new("s", DataType::Float64, true),
                                            Field::new("t", DataType::Utf8, true),
                                        ]
                                        .into(),
                                    ),
                                    true,
                                ),
                            ]
                            .into(),
                        ),
                        true,
                    )),
                    Arc::new(StructArray::from(vec![
                        (
                            Arc::new(Field::new("_level", DataType::Float64, true)),
                            Arc::new(Float64Array::from(vec![Some(1.5)])) as Arc<dyn Array>,
                        ),
                        (
                            Arc::new(Field::new(
                                "details",
                                DataType::Struct(
                                    vec![
                                        Field::new("rurl", DataType::Utf8, true),
                                        Field::new("s", DataType::Float64, true),
                                        Field::new("t", DataType::Utf8, true),
                                    ]
                                    .into(),
                                ),
                                true,
                            )),
                            Arc::new(StructArray::from(vec![
                                (
                                    Arc::new(Field::new("rurl", DataType::Utf8, true)),
                                    Arc::new(StringArray::from(vec![Some("https://example.com")]))
                                        as Arc<dyn Array>,
                                ),
                                (
                                    Arc::new(Field::new("s", DataType::Float64, true)),
                                    Arc::new(Float64Array::from(vec![Some(3.14)])) as Arc<dyn Array>,
                                ),
                                (
                                    Arc::new(Field::new("t", DataType::Utf8, true)),
                                    Arc::new(StringArray::from(vec![Some("data")])) as Arc<dyn Array>,
                                ),
                            ])),
                        ),
                    ])),
                ),
            ])),
        ],
    )?;

    let path2 = "test_data2.parquet";
    let _ = fs::remove_file(path2);

    let df2 = ctx.read_batch(batch2)?;
    df2.write_parquet(
        path2,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None,
    )
    .await?;

    // Register both files behind a single ListingTable, using the wider schema2.
    let paths_str = vec![path1.to_string(), path2.to_string()];
    let config = ListingTableConfig::new_with_multi_paths(
        paths_str
            .into_iter()
            .map(|p| ListingTableUrl::parse(&p))
            .collect::<Result<Vec<_>, _>>()?,
    )
    .with_schema(schema2.as_ref().clone().into())
    .infer(&ctx.state())
    .await?;

    let config = ListingTableConfig {
        options: Some(ListingOptions {
            file_sort_order: vec![vec![col("timestamp_utc").sort(true, true)]],
            ..config
                .options
                .unwrap_or_else(|| ListingOptions::new(Arc::new(ParquetFormat::default())))
        }),
        ..config
    };

    let listing_table = ListingTable::try_new(config)?;
    ctx.register_table("events", Arc::new(listing_table))?;

    let df = ctx.sql("SELECT * FROM events ORDER BY timestamp_utc").await?;
    let results = df.clone().collect().await?;
    assert_eq!(results[0].num_rows(), 2);

    // Compact the merged table into a single file, then re-read it.
    let compacted_path = "test_data_compacted.parquet";
    let _ = fs::remove_file(compacted_path);

    df.write_parquet(
        compacted_path,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None,
    )
    .await?;

    let new_ctx = SessionContext::new();
    let config =
        ListingTableConfig::new_with_multi_paths(vec![ListingTableUrl::parse(compacted_path)?])
            .with_schema(schema2.as_ref().clone().into())
            .infer(&new_ctx.state())
            .await?;

    let listing_table = ListingTable::try_new(config)?;
    new_ctx.register_table("events", Arc::new(listing_table))?;

    let df = new_ctx.sql("SELECT * FROM events ORDER BY timestamp_utc").await?;
    let compacted_results = df.collect().await?;
    assert_eq!(compacted_results[0].num_rows(), 2);
    assert_eq!(results, compacted_results);

    let _ = fs::remove_file(path1);
    let _ = fs::remove_file(path2);
    let _ = fs::remove_file(compacted_path);

    Ok(())
}
```
produces:
```
thread 'test_datafusion_schema_evolution_with_compaction' panicked at
/Users/bobren/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-array-54.2.0/src/array/struct_array.rs:91:46:
called `Result::unwrap()` on an `Err` value: InvalidArgumentError("Incorrect datatype for StructArray field \"timestamp_utc\", expected Timestamp(Millisecond, Some(\"UTC\")) got Timestamp(Millisecond, None)")
```
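Note the panic reports a timezone mismatch on the nested `timestamp_utc` field: the expected type carries `Some("UTC")` while the written data has `None`. A minimal sketch of that datatype difference (assuming arrow-rs's `with_timezone` on timestamp arrays; this is not part of the repro above):
```
use datafusion::arrow::array::{Array, TimestampMillisecondArray};
use datafusion::arrow::datatypes::{DataType, TimeUnit};

fn main() {
    // The arrays in the repro are built without a timezone, so their type is
    // Timestamp(Millisecond, None) ...
    let naive = TimestampMillisecondArray::from(vec![Some(1640995200000)]);
    assert_eq!(
        naive.data_type(),
        &DataType::Timestamp(TimeUnit::Millisecond, None)
    );

    // ... whereas the field in the panic expects Timestamp(Millisecond, Some("UTC")),
    // the type produced by attaching a timezone:
    let utc = naive.with_timezone("UTC");
    assert_eq!(
        utc.data_type(),
        &DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into()))
    );
}
```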
### Expected behavior
I expected the test to pass.
### Additional context
_No response_