mbutrovich commented on code in PR #4229:
URL: https://github.com/apache/datafusion-comet/pull/4229#discussion_r3252804933
##########
native/core/src/parquet/parquet_support.rs:
##########
@@ -93,6 +93,9 @@ pub struct SparkParquetOptions {
/// requested schema does carry ids raises a runtime error rather than silently
/// producing nulls (mirrors `spark.sql.parquet.fieldId.read.ignoreMissing`).
pub ignore_missing_field_id: bool,
+ /// Whether type promotion (schema evolution) is allowed, e.g. INT32 -> INT64,
+ /// FLOAT -> DOUBLE. Mirrors spark.comet.schemaEvolution.enabled.
Review Comment:
`spark.comet.schemaEvolution.enabled` was removed in this PR. Should this
comment point at the per-version `ShimCometConf.COMET_SCHEMA_EVOLUTION_ENABLED`
constant instead?
##########
native/core/src/parquet/schema_adapter.rs:
##########
@@ -750,12 +1030,119 @@ impl SparkPhysicalExprAdapter {
}
}
+/// Defers a Parquet type-promotion rejection to runtime: returns an empty array
+/// when the input batch has no rows, and raises `ParquetSchemaConvert` otherwise.
+///
+/// Mirrors Spark's vectorized reader, which only invokes
+/// `ParquetVectorUpdaterFactory.getUpdater` while decoding a row group. A
+/// Parquet file with no row groups (e.g. one written from an empty DataFrame)
+/// never triggers the per-row-group check, so a partition mixing such a file
+/// with another whose schema would otherwise fail the type-promotion check
+/// (SPARK-26709) is still readable.
+#[derive(Debug, Eq)]
+struct RejectOnNonEmpty {
+ child: Arc<dyn PhysicalExpr>,
+ target_field: FieldRef,
+ column: String,
+ physical_type: String,
Review Comment:
`parquet_primitive_name` returns `&'static str` (line 266), but the field
stores `String`, and every construction site (`690`, `725`, `768`, `801`,
`871`) calls `.to_string()` on the static. Any reason not to make the field
`&'static str` and skip the allocation? On the same struct, would `column` and
`spark_type` work as `Arc<str>` so `with_new_children` (line 1123) does not
have to clone three strings each time?
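
To make the suggestion concrete, here is a minimal sketch of the field shapes I have in mind. `RejectionInfo` and its `check` method are hypothetical stand-ins, not the PR's `RejectOnNonEmpty`, and the error string format is only illustrative. The `check` method follows the doc comment's rule (empty input passes, any rows raise) so the example does something observable.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for the metadata carried by `RejectOnNonEmpty`.
#[derive(Debug, Clone)]
struct RejectionInfo {
    /// Shared, immutable string: cloning bumps a refcount instead of allocating.
    column: Arc<str>,
    /// Static name, stored as-is with no `.to_string()` at the call site.
    physical_type: &'static str,
    spark_type: Arc<str>,
}

impl RejectionInfo {
    fn new(column: &str, physical_type: &'static str, spark_type: &str) -> Self {
        Self {
            column: Arc::from(column),
            physical_type,
            spark_type: Arc::from(spark_type),
        }
    }

    /// Empty input passes; any rows produce the conversion error.
    /// The message format here is an assumption for illustration only.
    fn check(&self, num_rows: usize) -> Result<(), String> {
        if num_rows == 0 {
            return Ok(());
        }
        Err(format!(
            "Column: {}, Expected: {}, Found: {}",
            self.column, self.spark_type, self.physical_type
        ))
    }
}
```

With this layout, `clone()` (as `with_new_children` would need) copies two pointers and a static reference rather than three heap strings.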
##########
native/core/src/parquet/schema_adapter.rs:
##########
@@ -523,77 +617,263 @@ impl SparkPhysicalExprAdapter {
let target_type = cast.target_field().data_type();
// Reject reading a string/binary Parquet column as anything other
- // than string, binary, or a binary-encoded decimal. This mirrors
- // Spark's TypeUtil.checkParquetType for the BINARY case (lines
- // 208-221): a BINARY (or UTF8-annotated BINARY) physical column is
- // only readable as StringType, BinaryType, or a binary-encoded
- // decimal; every other target type (numeric, boolean, date,
- // timestamp, ...) raises SchemaColumnConvertNotSupportedException.
+ // than string or binary. Mirrors Spark's
+ // `ParquetVectorUpdaterFactory.getUpdater` `BINARY` case
+ // (lines 199-205): a Parquet BINARY (or UTF8-annotated BINARY)
+ // column is readable as StringType / BinaryType, or as DecimalType
+ // only when the column carries a `DecimalLogicalTypeAnnotation`
+ // (`canReadAsDecimal` / `canReadAsBinaryDecimal`). Arrow already
+ // surfaces decimal-annotated BINARY as `DataType::Decimal128`, so
+ // observing `physical_type == Binary` here unambiguously means a
+ // non-decimal source — which Spark rejects with
+ // `SchemaColumnConvertNotSupportedException`.
//
// Without this guard, Spark's Cast below (in is_adapting_schema
// mode) falls through to DataFusion's cast, which silently parses
// the bytes (returning nulls for non-numeric strings, parsing
// date/timestamp/boolean strings, or in some paths reinterpreting
- // raw bytes). See issue #4088.
+ // raw bytes). For numeric / decimal targets the cast can't run at
+ // all and Arrow's `RecordBatch::try_new` raises a generic
+ // `column types must match schema types` error instead of the
+ // Spark-equivalent `SchemaColumnConvertNotSupportedException`.
+ // See issues #4088 and #4351.
if matches!(
physical_type,
DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary |
DataType::LargeBinary
) && !matches!(
target_type,
- DataType::Utf8
- | DataType::LargeUtf8
- | DataType::Binary
- | DataType::LargeBinary
- | DataType::Decimal128(_, _)
- | DataType::Decimal256(_, _)
+ DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary |
DataType::LargeBinary
) {
return Err(DataFusionError::External(Box::new(
SparkError::ParquetSchemaConvert {
file_path: String::new(),
- column: cast.input_field().name().to_string(),
- physical_type: physical_type.to_string(),
- spark_type: target_type.to_string(),
+ column: format!("[{}]", cast.input_field().name()),
+ physical_type:
parquet_primitive_name(physical_type).to_string(),
+ spark_type: spark_catalog_name(target_type),
},
)));
}
- // Decimal-to-decimal scale-narrowing check.
- // Reject reads where the read schema has a smaller scale than the
- // file's, because Spark's Cast below would silently truncate
- // fractional digits, producing wrong values. This matches the
- // unconditionally-lossy case in issue #4089 (e.g. Decimal(10,2)
read
- // as Decimal(5,0)).
- //
- // Other decimal mismatches are intentionally NOT rejected here,
- // even though Spark's vectorized reader would reject them via
- // `ParquetVectorUpdaterFactory#isDecimalTypeMatched` (which
requires
- // exact precision and scale):
+ // Reject reading a BOOLEAN/INT/FLOAT/DOUBLE Parquet column as
+ // StringType/BinaryType. Spark's
`ParquetVectorUpdaterFactory.getUpdater`
+ // has no `int -> string` etc. updater for these primitive cases,
+ // and without this guard Spark's Cast below silently produces
+ // string values from the numeric column. FIXED_LEN_BYTE_ARRAY,
+ // dictionary-encoded, and decimal-typed physical columns are NOT
+ // rejected here because Spark either allows the read or it falls
+ // outside the documented mismatch set.
//
- // - Precision-only changes with the same scale (e.g. Decimal(5,2)
- // read as Decimal(3,2)): Spark 4.0's parquet-mr fallback path
- // (PARQUET_VECTORIZED_READER_ENABLED=false) and the vectorized
- // type-widening path produce null on per-value overflow, which
- // DataFusion's cast already does in the adapting-schema path.
+ // Deferred to runtime via `RejectOnNonEmpty` so files with no row
+ // groups pass silently (matching Spark's per-row-group check) and
+ // the JVM shim translates the error to
+ // `SchemaColumnConvertNotSupportedException`.
+ let physical_is_primitive_numeric = matches!(
+ physical_type,
+ DataType::Boolean
+ | DataType::Int8
+ | DataType::Int16
+ | DataType::Int32
+ | DataType::Int64
+ | DataType::Float32
+ | DataType::Float64
+ );
+ if physical_is_primitive_numeric
+ && matches!(
+ target_type,
+ DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary |
DataType::LargeBinary
+ )
+ {
+ let rejection: Arc<dyn PhysicalExpr> =
Arc::new(RejectOnNonEmpty {
+ child,
+ target_field: Arc::clone(cast.target_field()),
+ column: format!("[{}]", cast.input_field().name()),
+ physical_type:
parquet_primitive_name(physical_type).to_string(),
+ spark_type: spark_catalog_name(target_type),
+ });
+ return Ok(Transformed::yes(rejection));
+ }
+
+ // Decimal-to-decimal narrowing check.
+ // Reject reads where the requested decimal type cannot represent
+ // every value in the file's decimal column. Spark's vectorized
+ // reader rejects this in
`ParquetVectorUpdaterFactory.canReadAsDecimal`
+ // -> `isDecimalTypeMatched` (the `DecimalLogicalTypeAnnotation`
+ // branch): the read is allowed only when `scaleIncrease >= 0` AND
+ // `precisionIncrease >= scaleIncrease`. Equivalently:
+ // - `dst_scale >= src_scale` (no scale narrowing)
+ // - `dst_precision - dst_scale >= src_precision - src_scale`
+ // (no integer-precision narrowing)
+ // Either condition's failure means the cast would silently drop
+ // fractional digits or overflow on integer-side magnitude. See
+ // issues #4089 and #4343.
//
- // - Scale widening (e.g. Decimal(10,2) read as Decimal(10,4)): the
- // cast is lossless (no truncation, no overflow), so allowing it
- // here is strictly more permissive than Spark's vectorized
reader
- // without risking wrong values.
- if let (DataType::Decimal128(_src_p, src_s),
DataType::Decimal128(_dst_p, dst_s)) =
+ // Scale widening that exceeds available precision (e.g.
+ // Decimal(5, 2) -> Decimal(5, 3): scaleIncrease=1,
+ // precisionIncrease=0) is also rejected by the second condition,
+ // because it would shift integer digits into the fractional part
+ // and lose the most-significant digit.
+ if let (DataType::Decimal128(src_p, src_s),
DataType::Decimal128(dst_p, dst_s)) =
(physical_type, target_type)
{
- if dst_s < src_s {
+ let src_int_precision = i32::from(*src_p) - i32::from(*src_s);
+ let dst_int_precision = i32::from(*dst_p) - i32::from(*dst_s);
+ if dst_s < src_s || dst_int_precision < src_int_precision {
return Err(DataFusionError::External(Box::new(
SparkError::ParquetSchemaConvert {
file_path: String::new(),
- column: cast.input_field().name().to_string(),
- physical_type: physical_type.to_string(),
- spark_type: target_type.to_string(),
+ column: format!("[{}]", cast.input_field().name()),
+ physical_type:
parquet_primitive_name(physical_type).to_string(),
+ spark_type: spark_catalog_name(target_type),
},
)));
}
}
+ // Integer-to-decimal narrowing check.
+ // Reject reads where the requested decimal type cannot represent
+ // every value of the integer Parquet column. Spark's vectorized
+ // reader rejects this in
`ParquetVectorUpdaterFactory.canReadAsDecimal`
+ // (via `isDecimalTypeMatched`'s integer branch): for an INT32
+ // column the requested decimal must satisfy
+ // `precision - scale >= 10` (matching `DecimalType.IntDecimal`),
+ // and for INT64 `precision - scale >= 20` (matching
+ // `DecimalType.LongDecimal`). Without this guard the cast silently
+ // truncates (e.g. INT32 -> Decimal(5, 0) for a value of 100000)
+ // or rescales values to fit, producing wrong answers. See #4344.
+ //
+ // Unlike the type-promotion check below, this rejection is
+ // unconditional in Spark across all supported versions, so we
+ // reject at plan time (no `allow_type_promotion` gating). Empty
+ // Parquet files do not exercise this path because the cast is
+ // only constructed when the file's column has a different type
+ // than the requested schema; there is no SPARK-26709-style
+ // empty-file scenario for integer-to-decimal that we know of.
+ let int_decimal_min_int_precision = match physical_type {
+ DataType::Int8 | DataType::Int16 | DataType::Int32 =>
Some(10i32),
+ DataType::Int64 => Some(20i32),
+ _ => None,
+ };
+ if let Some(min_int_precision) = int_decimal_min_int_precision {
+ let dst_precision_scale = match target_type {
+ DataType::Decimal128(p, s) | DataType::Decimal256(p, s) =>
Some((*p, *s)),
+ _ => None,
+ };
+ if let Some((dst_p, dst_s)) = dst_precision_scale {
+ let dst_int_precision = i32::from(dst_p) -
i32::from(dst_s);
+ if dst_int_precision < min_int_precision {
+ return Err(DataFusionError::External(Box::new(
+ SparkError::ParquetSchemaConvert {
+ file_path: String::new(),
+ column: format!("[{}]",
cast.input_field().name()),
+ physical_type:
parquet_primitive_name(physical_type).to_string(),
+ spark_type: spark_catalog_name(target_type),
+ },
+ )));
+ }
+ }
+ }
+
+ // Type promotion (widening) check.
+ // When allow_type_promotion is false, reject the three widenings
+ // (INT32→INT64, FLOAT→DOUBLE, INT32→DOUBLE) that Spark 3.x's
+ // vectorized reader rejects. The flag is set from Comet's
+ // per-Spark-version constant in ShimCometConf (false on 3.x,
+ // true on 4.x). The companion check below handles conversions
+ // Spark rejects on every supported version.
+ //
+ // The rejection is deferred to runtime via `RejectOnNonEmpty` so
+ // that files with no row groups (e.g. an empty DataFrame written
+ // to Parquet) pass through unaffected, matching Spark's
+ // per-row-group `ParquetVectorUpdaterFactory.getUpdater` check
+ // (SPARK-26709).
+ if !self.parquet_options.allow_type_promotion {
+ let is_disallowed_promotion = matches!(
+ (physical_type, target_type),
+ (DataType::Int32, DataType::Int64)
+ | (DataType::Float32, DataType::Float64)
+ | (DataType::Int32, DataType::Float64)
+ );
+ if is_disallowed_promotion {
+ let rejection: Arc<dyn PhysicalExpr> =
Arc::new(RejectOnNonEmpty {
+ child: Arc::clone(&child),
+ target_field: Arc::clone(cast.target_field()),
+ column: format!("[{}]", cast.input_field().name()),
+ physical_type:
parquet_primitive_name(physical_type).to_string(),
+ spark_type: spark_catalog_name(target_type),
+ });
+ return Ok(Transformed::yes(rejection));
+ }
+ }
+
+ // Reject primitive Parquet conversions that Spark's vectorized
+ // reader rejects on every supported version (i.e. cases that have
+ // no matching branch in `ParquetVectorUpdaterFactory.getUpdater`).
+ // Without this guard the `Cast` below silently truncates,
+ // overflows, or reinterprets values:
+ //
+ // - `INT64 -> Int*` truncates to the lower bits.
+ // - `INT64 -> Float*` and `INT32 -> Float32` lose precision.
+ // - `Float* -> Int*` and `Float64 -> Float32` truncate /
overflow.
+ // - `INT32 -> Timestamp` reinterprets the int as epoch seconds
+ // (only DATE-annotated INT32 reaches Arrow as `Date32`, so a
+ // raw `Int32 -> Timestamp` here means there was no DATE
+ // annotation and Spark would reject).
+ // - `INT64 -> Date32` / `INT64 -> Timestamp` similarly:
timestamp-
+ // and date-annotated INT64 columns surface as Arrow
+ // `Timestamp` / `Date32`, so a raw `Int64` source means no
+ // annotation.
+ // - `Date32 -> Timestamp(_, Some(_))` (LTZ): Spark only allows
+ // `Date -> TimestampNTZ` via `DateToTimestampNTZUpdater`.
+ // - `Timestamp -> Date32`: no Timestamp updater branches into
+ // Date target.
+ //
+ // Mirror the type-promotion check above and defer to runtime so
+ // empty Parquet files pass through (SPARK-26709). See #4297.
+ let is_spark_rejected_conversion = matches!(
+ (physical_type, target_type),
+ // Long -> narrower int.
+ (
+ DataType::Int64,
+ DataType::Int8 | DataType::Int16 | DataType::Int32,
+ )
+ // Long -> floating point.
Review Comment:
A lot of these inline comments say the same thing as the match arm directly
below them, e.g. `// Long -> narrower int.` above `(DataType::Int64,
DataType::Int8 | DataType::Int16 | DataType::Int32)`. The arms read fine on
their own. The ones that carry actual context (the `IntegerToDoubleUpdater`
note at 844, the "raw INT32; DATE-annotated columns surface as Date32"
parenthetical at 849, the SPARK-26709 references) seem worth keeping. Would
dropping the others make this block easier to read?
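
Roughly what I am picturing, as a sketch only: `Ty` is a hypothetical stand-in for the handful of Arrow `DataType` variants involved, and the arms cover just the integer and float cases listed in the block comment above. Only the comment that adds context survives.

```rust
/// Hypothetical stand-in for the subset of Arrow `DataType` used below.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Ty {
    Int8,
    Int16,
    Int32,
    Int64,
    Float32,
    Float64,
}

/// Returns true for conversions the block comment lists as rejected on
/// every supported Spark version (integer/float subset only).
fn is_spark_rejected_conversion(physical: Ty, target: Ty) -> bool {
    use Ty::*;
    matches!(
        (physical, target),
        (Int64, Int8 | Int16 | Int32)
            | (Int64, Float32 | Float64)
            // INT32 -> DOUBLE is deliberately absent: it is a type promotion
            // and belongs to the separate `allow_type_promotion` check.
            | (Int32, Float32)
            | (Float32 | Float64, Int8 | Int16 | Int32 | Int64)
            | (Float64, Float32)
    )
}
```

The arms are short enough that a reader can scan them as a table, and the one remaining comment explains an omission the code cannot express on its own.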
##########
native/core/src/parquet/schema_adapter.rs:
##########
@@ -523,77 +617,263 @@ impl SparkPhysicalExprAdapter {
let target_type = cast.target_field().data_type();
// Reject reading a string/binary Parquet column as anything other
- // than string, binary, or a binary-encoded decimal. This mirrors
- // Spark's TypeUtil.checkParquetType for the BINARY case (lines
- // 208-221): a BINARY (or UTF8-annotated BINARY) physical column is
- // only readable as StringType, BinaryType, or a binary-encoded
- // decimal; every other target type (numeric, boolean, date,
- // timestamp, ...) raises SchemaColumnConvertNotSupportedException.
+ // than string or binary. Mirrors Spark's
+ // `ParquetVectorUpdaterFactory.getUpdater` `BINARY` case
+ // (lines 199-205): a Parquet BINARY (or UTF8-annotated BINARY)
+ // column is readable as StringType / BinaryType, or as DecimalType
+ // only when the column carries a `DecimalLogicalTypeAnnotation`
+ // (`canReadAsDecimal` / `canReadAsBinaryDecimal`). Arrow already
+ // surfaces decimal-annotated BINARY as `DataType::Decimal128`, so
+ // observing `physical_type == Binary` here unambiguously means a
+ // non-decimal source — which Spark rejects with
+ // `SchemaColumnConvertNotSupportedException`.
//
// Without this guard, Spark's Cast below (in is_adapting_schema
// mode) falls through to DataFusion's cast, which silently parses
// the bytes (returning nulls for non-numeric strings, parsing
// date/timestamp/boolean strings, or in some paths reinterpreting
- // raw bytes). See issue #4088.
+ // raw bytes). For numeric / decimal targets the cast can't run at
+ // all and Arrow's `RecordBatch::try_new` raises a generic
+ // `column types must match schema types` error instead of the
+ // Spark-equivalent `SchemaColumnConvertNotSupportedException`.
+ // See issues #4088 and #4351.
if matches!(
physical_type,
DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary |
DataType::LargeBinary
) && !matches!(
target_type,
- DataType::Utf8
- | DataType::LargeUtf8
- | DataType::Binary
- | DataType::LargeBinary
- | DataType::Decimal128(_, _)
- | DataType::Decimal256(_, _)
+ DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary |
DataType::LargeBinary
) {
return Err(DataFusionError::External(Box::new(
SparkError::ParquetSchemaConvert {
file_path: String::new(),
- column: cast.input_field().name().to_string(),
- physical_type: physical_type.to_string(),
- spark_type: target_type.to_string(),
+ column: format!("[{}]", cast.input_field().name()),
Review Comment:
Seven call sites do `format!("[{}]", cast.input_field().name())`. Is there a
reason the bracket framing has to live at the call site rather than inside
`SparkError::ParquetSchemaConvert`'s `Display` impl (or a constructor on the
variant)?
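
A minimal sketch of the constructor option, in case it helps. `SchemaError` is a hypothetical, trimmed-down stand-in for `SparkError`, and the `Display` text is an assumption chosen only so the output contains the `Column: [[a]]` shape the new tests assert on; I have not checked it against the real message.

```rust
use std::fmt;

/// Hypothetical stand-in for `SparkError` with a single variant.
#[derive(Debug)]
enum SchemaError {
    ParquetSchemaConvert {
        file_path: String,
        column: String,
        physical_type: String,
        spark_type: String,
    },
}

impl SchemaError {
    /// Applies the `[name]` framing in one place, so call sites pass the
    /// bare field name instead of repeating `format!("[{}]", ...)`.
    fn parquet_schema_convert(column_name: &str, physical_type: &str, spark_type: &str) -> Self {
        SchemaError::ParquetSchemaConvert {
            file_path: String::new(),
            column: format!("[{column_name}]"),
            physical_type: physical_type.to_string(),
            spark_type: spark_type.to_string(),
        }
    }
}

impl fmt::Display for SchemaError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Assumed message layout, for illustration only.
            SchemaError::ParquetSchemaConvert {
                file_path,
                column,
                physical_type,
                spark_type,
            } => write!(
                f,
                "Parquet column cannot be converted in file {file_path}. \
                 Column: [{column}], Expected: {spark_type}, Found: {physical_type}"
            ),
        }
    }
}
```

A call site then reduces to `SchemaError::parquet_schema_convert(name, physical, target)`, and a change to the framing touches one function rather than seven literals.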
##########
native/core/src/parquet/schema_adapter.rs:
##########
@@ -523,77 +617,263 @@ impl SparkPhysicalExprAdapter {
let target_type = cast.target_field().data_type();
// Reject reading a string/binary Parquet column as anything other
- // than string, binary, or a binary-encoded decimal. This mirrors
- // Spark's TypeUtil.checkParquetType for the BINARY case (lines
- // 208-221): a BINARY (or UTF8-annotated BINARY) physical column is
- // only readable as StringType, BinaryType, or a binary-encoded
- // decimal; every other target type (numeric, boolean, date,
- // timestamp, ...) raises SchemaColumnConvertNotSupportedException.
+ // than string or binary. Mirrors Spark's
+ // `ParquetVectorUpdaterFactory.getUpdater` `BINARY` case
+ // (lines 199-205): a Parquet BINARY (or UTF8-annotated BINARY)
+ // column is readable as StringType / BinaryType, or as DecimalType
+ // only when the column carries a `DecimalLogicalTypeAnnotation`
+ // (`canReadAsDecimal` / `canReadAsBinaryDecimal`). Arrow already
+ // surfaces decimal-annotated BINARY as `DataType::Decimal128`, so
+ // observing `physical_type == Binary` here unambiguously means a
+ // non-decimal source — which Spark rejects with
+ // `SchemaColumnConvertNotSupportedException`.
//
// Without this guard, Spark's Cast below (in is_adapting_schema
// mode) falls through to DataFusion's cast, which silently parses
// the bytes (returning nulls for non-numeric strings, parsing
// date/timestamp/boolean strings, or in some paths reinterpreting
- // raw bytes). See issue #4088.
+ // raw bytes). For numeric / decimal targets the cast can't run at
+ // all and Arrow's `RecordBatch::try_new` raises a generic
+ // `column types must match schema types` error instead of the
+ // Spark-equivalent `SchemaColumnConvertNotSupportedException`.
+ // See issues #4088 and #4351.
if matches!(
physical_type,
DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary |
DataType::LargeBinary
) && !matches!(
target_type,
- DataType::Utf8
- | DataType::LargeUtf8
- | DataType::Binary
- | DataType::LargeBinary
- | DataType::Decimal128(_, _)
- | DataType::Decimal256(_, _)
+ DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary |
DataType::LargeBinary
) {
return Err(DataFusionError::External(Box::new(
SparkError::ParquetSchemaConvert {
file_path: String::new(),
- column: cast.input_field().name().to_string(),
- physical_type: physical_type.to_string(),
- spark_type: target_type.to_string(),
+ column: format!("[{}]", cast.input_field().name()),
+ physical_type:
parquet_primitive_name(physical_type).to_string(),
+ spark_type: spark_catalog_name(target_type),
},
)));
}
- // Decimal-to-decimal scale-narrowing check.
- // Reject reads where the read schema has a smaller scale than the
- // file's, because Spark's Cast below would silently truncate
- // fractional digits, producing wrong values. This matches the
- // unconditionally-lossy case in issue #4089 (e.g. Decimal(10,2)
read
- // as Decimal(5,0)).
- //
- // Other decimal mismatches are intentionally NOT rejected here,
- // even though Spark's vectorized reader would reject them via
- // `ParquetVectorUpdaterFactory#isDecimalTypeMatched` (which
requires
- // exact precision and scale):
+ // Reject reading a BOOLEAN/INT/FLOAT/DOUBLE Parquet column as
+ // StringType/BinaryType. Spark's
`ParquetVectorUpdaterFactory.getUpdater`
+ // has no `int -> string` etc. updater for these primitive cases,
+ // and without this guard Spark's Cast below silently produces
+ // string values from the numeric column. FIXED_LEN_BYTE_ARRAY,
+ // dictionary-encoded, and decimal-typed physical columns are NOT
+ // rejected here because Spark either allows the read or it falls
+ // outside the documented mismatch set.
//
- // - Precision-only changes with the same scale (e.g. Decimal(5,2)
- // read as Decimal(3,2)): Spark 4.0's parquet-mr fallback path
- // (PARQUET_VECTORIZED_READER_ENABLED=false) and the vectorized
- // type-widening path produce null on per-value overflow, which
- // DataFusion's cast already does in the adapting-schema path.
+ // Deferred to runtime via `RejectOnNonEmpty` so files with no row
+ // groups pass silently (matching Spark's per-row-group check) and
+ // the JVM shim translates the error to
+ // `SchemaColumnConvertNotSupportedException`.
+ let physical_is_primitive_numeric = matches!(
+ physical_type,
+ DataType::Boolean
+ | DataType::Int8
+ | DataType::Int16
+ | DataType::Int32
+ | DataType::Int64
+ | DataType::Float32
+ | DataType::Float64
+ );
+ if physical_is_primitive_numeric
+ && matches!(
+ target_type,
+ DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary |
DataType::LargeBinary
+ )
+ {
+ let rejection: Arc<dyn PhysicalExpr> =
Arc::new(RejectOnNonEmpty {
Review Comment:
All three sites build the same five-field struct literal with the same
`column: format!("[{}]", cast.input_field().name())` shape and the same
`parquet_primitive_name(...).to_string()` and `spark_catalog_name(...)` calls.
Would a small constructor like
```rust
fn make_rejection(
    child: Arc<dyn PhysicalExpr>,
    cast: &CastColumnExpr,
    physical: &DataType,
    target: &DataType,
) -> Arc<dyn PhysicalExpr>
```
pull the duplication out? The four `SparkError::ParquetSchemaConvert`
literals at `574`, `648`, `722`, `765` repeat the same shape and could share a
sibling helper.
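
For illustration, a self-contained sketch of how such a helper could look. Everything here is a simplified assumption: `Expr` stands in for DataFusion's `PhysicalExpr`, the helper takes plain strings instead of `&CastColumnExpr` and `&DataType`, and `describe` exists only so the result can be inspected.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for DataFusion's `PhysicalExpr` trait.
trait Expr {
    fn describe(&self) -> String;
}

/// Hypothetical leaf expression used as the child.
struct Column(&'static str);

impl Expr for Column {
    fn describe(&self) -> String {
        self.0.to_string()
    }
}

/// Simplified version of the rejection node (no `target_field`).
struct RejectOnNonEmpty {
    child: Arc<dyn Expr>,
    column: String,
    physical_type: &'static str,
    spark_type: String,
}

impl Expr for RejectOnNonEmpty {
    fn describe(&self) -> String {
        format!(
            "reject({}: {} -> {}, child={})",
            self.column,
            self.physical_type,
            self.spark_type,
            self.child.describe()
        )
    }
}

/// Single owner of the bracket framing and the struct literal.
fn make_rejection(
    child: Arc<dyn Expr>,
    field_name: &str,
    physical: &'static str,
    target: &str,
) -> Arc<dyn Expr> {
    Arc::new(RejectOnNonEmpty {
        child,
        column: format!("[{field_name}]"),
        physical_type: physical,
        spark_type: target.to_string(),
    })
}
```

Each of the three sites would then shrink to a one-line `return Ok(Transformed::yes(make_rejection(...)))`, with the real helper deriving the names from `cast`, `physical`, and `target` itself.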
##########
native/core/src/parquet/schema_adapter.rs:
##########
@@ -774,25 +1161,590 @@ mod test {
use std::fs::File;
use std::sync::Arc;
+ /// Reading a non-BINARY Parquet column as `StringType` must raise the same
+ /// `_LEGACY_ERROR_TEMP_2063`-shaped error as Spark's vectorized reader
+ /// (`ParquetVectorUpdaterFactory.getUpdater` has no INT32 -> string
updater).
#[tokio::test]
- async fn parquet_roundtrip_int_as_string() -> Result<(), DataFusionError> {
- let file_schema = Arc::new(Schema::new(vec![
- Field::new("id", DataType::Int32, false),
- Field::new("name", DataType::Utf8, false),
- ]));
+ async fn parquet_int_read_as_string_errors() -> Result<(),
DataFusionError> {
+ let file_schema = Arc::new(Schema::new(vec![Field::new("a",
DataType::Int32, false)]));
+ let values = Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc<dyn
arrow::array::Array>;
+ let batch = RecordBatch::try_new(Arc::clone(&file_schema),
vec![values])?;
- let ids = Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc<dyn
arrow::array::Array>;
- let names = Arc::new(StringArray::from(vec!["Alice", "Bob",
"Charlie"]))
- as Arc<dyn arrow::array::Array>;
- let batch = RecordBatch::try_new(Arc::clone(&file_schema), vec![ids,
names])?;
+ let required_schema = Arc::new(Schema::new(vec![Field::new("a",
DataType::Utf8, false)]));
- let required_schema = Arc::new(Schema::new(vec![
- Field::new("id", DataType::Utf8, false),
- Field::new("name", DataType::Utf8, false),
- ]));
+ let err = roundtrip(&batch, required_schema)
+ .await
+ .expect_err("expected ParquetSchemaConvert error for INT32 ->
string");
+ let msg = err.to_string();
+ assert!(
+ msg.contains("Column: [[a]]")
+ && msg.contains("Expected: string")
+ && msg.contains("Found: INT32"),
+ "unexpected error: {msg}"
+ );
+ Ok(())
+ }
+
+ /// Companion: BINARY (string physical) read as IntegerType must raise the
+ /// same Spark-compatible error.
+ #[tokio::test]
+ async fn parquet_string_read_as_int_errors() -> Result<(),
DataFusionError> {
+ let file_schema = Arc::new(Schema::new(vec![Field::new("a",
DataType::Utf8, false)]));
+ let values =
+ Arc::new(StringArray::from(vec!["bcd", "efg"])) as Arc<dyn
arrow::array::Array>;
+ let batch = RecordBatch::try_new(Arc::clone(&file_schema),
vec![values])?;
+
+ let required_schema = Arc::new(Schema::new(vec![Field::new("a",
DataType::Int32, false)]));
+
+ let err = roundtrip(&batch, required_schema)
+ .await
+ .expect_err("expected ParquetSchemaConvert error for BINARY ->
int");
+ let msg = err.to_string();
+ assert!(
+ msg.contains("Column: [[a]]")
+ && msg.contains("Expected: int")
+ && msg.contains("Found: BINARY"),
+ "unexpected error: {msg}"
+ );
+ Ok(())
+ }
+
+ /// Reading a plain BINARY Parquet column (no
`DecimalLogicalTypeAnnotation`)
+ /// as `DecimalType` must raise a Spark-compatible `ParquetSchemaConvert`
+ /// error. Spark's `ParquetVectorUpdaterFactory.getUpdater` BINARY case
+ /// only allows the read when `canReadAsDecimal` / `canReadAsBinaryDecimal`
+ /// returns true, both of which require the column to carry a
+ /// `DecimalLogicalTypeAnnotation`. See #4351.
+ #[tokio::test]
+ async fn parquet_binary_read_as_decimal_errors() -> Result<(),
DataFusionError> {
+ let file_schema = Arc::new(Schema::new(vec![Field::new("a",
DataType::Binary, false)]));
+ let values =
+ Arc::new(BinaryArray::from_vec(vec![b"1.2", b"3.4"])) as Arc<dyn
arrow::array::Array>;
+ let batch = RecordBatch::try_new(Arc::clone(&file_schema),
vec![values])?;
+
+ let required_schema = Arc::new(Schema::new(vec![Field::new(
+ "a",
+ DataType::Decimal128(37, 1),
+ false,
+ )]));
+
+ let err = roundtrip(&batch, required_schema)
+ .await
+ .expect_err("expected ParquetSchemaConvert error for BINARY ->
decimal");
+ let msg = err.to_string();
+ assert!(
+ msg.contains("Column: [[a]]")
+ && msg.contains("Expected: decimal(37,1)")
+ && msg.contains("Found: BINARY"),
+ "unexpected error: {msg}"
+ );
+ Ok(())
+ }
+
+ /// Reading an INT32 Parquet column as a DECIMAL whose integer-precision
+ /// (`precision - scale`) cannot represent the full INT32 range must raise
+ /// a Spark-compatible `ParquetSchemaConvert` error. Spark's
+ /// `ParquetVectorUpdaterFactory.canReadAsDecimal` requires
+ /// `precision - scale >= 10` (matching
`DecimalType.IntDecimal.precision`).
+ /// See #4344.
+ #[tokio::test]
+ async fn parquet_int32_read_as_narrow_decimal_errors() -> Result<(),
DataFusionError> {
+ let file_schema = Arc::new(Schema::new(vec![Field::new("a",
DataType::Int32, false)]));
+ let values = Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc<dyn
arrow::array::Array>;
+ let batch = RecordBatch::try_new(Arc::clone(&file_schema),
vec![values])?;
+
+ // Decimal(9, 0): integer-precision = 9, below the 10 required for
INT32.
+ let required_schema = Arc::new(Schema::new(vec![Field::new(
+ "a",
+ DataType::Decimal128(9, 0),
+ false,
+ )]));
+
+ let err = roundtrip(&batch, required_schema)
+ .await
+ .expect_err("expected ParquetSchemaConvert error for INT32 ->
Decimal(9, 0)");
+ let msg = err.to_string();
+ assert!(
+ msg.contains("Column: [[a]]")
+ && msg.contains("Expected: decimal")
+ && msg.contains("Found: INT32"),
+ "unexpected error: {msg}"
+ );
+ Ok(())
+ }
+
+ /// Reading an INT32 Parquet column as a DECIMAL whose integer-precision is
+ /// at least 10 must succeed.
+ #[tokio::test]
+ async fn parquet_int32_read_as_wide_decimal_succeeds() -> Result<(),
DataFusionError> {
+ let file_schema = Arc::new(Schema::new(vec![Field::new("a",
DataType::Int32, false)]));
+ let values = Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc<dyn
arrow::array::Array>;
+ let batch = RecordBatch::try_new(Arc::clone(&file_schema),
vec![values])?;
+
+ // Decimal(10, 0): integer-precision = 10, exactly the minimum for
INT32.
+ let required_schema = Arc::new(Schema::new(vec![Field::new(
+ "a",
+ DataType::Decimal128(10, 0),
+ false,
+ )]));
let _ = roundtrip(&batch, required_schema).await?;
+ Ok(())
+ }
+ /// Reading an INT64 Parquet column as a DECIMAL whose integer-precision
+ /// is below the 20 required for INT64 must raise the Spark-compatible
+ /// error. See #4344.
+ #[tokio::test]
+ async fn parquet_int64_read_as_narrow_decimal_errors() -> Result<(),
DataFusionError> {
+ let file_schema = Arc::new(Schema::new(vec![Field::new("a",
DataType::Int64, false)]));
+ let values = Arc::new(Int64Array::from(vec![1i64, 2, 3])) as Arc<dyn
arrow::array::Array>;
+ let batch = RecordBatch::try_new(Arc::clone(&file_schema),
vec![values])?;
+
+ // Decimal(19, 0): integer-precision = 19, below the 20 required for
INT64.
+ let required_schema = Arc::new(Schema::new(vec![Field::new(
+ "a",
+ DataType::Decimal128(19, 0),
+ false,
+ )]));
+
+ let err = roundtrip(&batch, required_schema)
+ .await
+ .expect_err("expected ParquetSchemaConvert error for INT64 ->
Decimal(19, 0)");
+ let msg = err.to_string();
+ assert!(
+ msg.contains("Column: [[a]]")
+ && msg.contains("Expected: decimal")
+ && msg.contains("Found: INT64"),
+ "unexpected error: {msg}"
+ );
+ Ok(())
+ }
+
+ /// Non-zero scale is rejected when `precision - scale` falls below the
+ /// integer minimum. INT32 -> Decimal(10, 1) has integer-precision 9.
+ #[tokio::test]
+ async fn parquet_int32_read_as_decimal_with_scale_errors() -> Result<(),
DataFusionError> {
+ let file_schema = Arc::new(Schema::new(vec![Field::new("a",
DataType::Int32, false)]));
+ let values = Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc<dyn
arrow::array::Array>;
+ let batch = RecordBatch::try_new(Arc::clone(&file_schema),
vec![values])?;
+
+ let required_schema = Arc::new(Schema::new(vec![Field::new(
+ "a",
+ DataType::Decimal128(10, 1),
+ false,
+ )]));
+
+ let err = roundtrip(&batch, required_schema)
+ .await
+ .expect_err("expected ParquetSchemaConvert error for INT32 ->
Decimal(10, 1)");
+ let msg = err.to_string();
+ assert!(
+ msg.contains("Column: [[a]]")
+ && msg.contains("Expected: decimal")
+ && msg.contains("Found: INT32"),
+ "unexpected error: {msg}"
+ );
+ Ok(())
+ }
+
+ /// Helper to build a tiny decimal Parquet batch for the
decimal-to-decimal tests.
+ fn decimal_batch(precision: u8, scale: i8) -> Result<RecordBatch,
DataFusionError> {
+ let file_schema = Arc::new(Schema::new(vec![Field::new(
+ "a",
+ DataType::Decimal128(precision, scale),
+ false,
+ )]));
+ let values = Arc::new(
+ Decimal128Array::from(vec![123i128, 456])
+ .with_precision_and_scale(precision, scale)
+ .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?,
+ ) as Arc<dyn arrow::array::Array>;
+ Ok(RecordBatch::try_new(file_schema, vec![values])?)
+ }
+
+ /// Reading Decimal(P, S) as Decimal(P', S) where P' < P (precision-only
+ /// narrowing, equal scale) must raise the Spark-compatible error. Spark's
+ /// `isDecimalTypeMatched` rejects this because `precisionIncrease < 0`
+ /// while `scaleIncrease == 0`. See #4343.
+ #[tokio::test]
+ async fn parquet_decimal_precision_narrowing_errors() -> Result<(),
DataFusionError> {
+ let batch = decimal_batch(10, 2)?;
+ let required_schema = Arc::new(Schema::new(vec![Field::new(
+ "a",
+ DataType::Decimal128(5, 2),
+ false,
+ )]));
+
+ let err = roundtrip(&batch, required_schema)
+ .await
+ .expect_err("expected ParquetSchemaConvert for Decimal(10, 2) ->
Decimal(5, 2)");
+ let msg = err.to_string();
+ assert!(
+ msg.contains("Column: [[a]]") && msg.contains("Expected:
decimal(5,2)"),
+ "unexpected error: {msg}"
+ );
+ Ok(())
+ }
+
+ /// Reading Decimal(P, S) as Decimal(P', S') where the integer-precision
+ /// `P - S` shrinks must raise the Spark-compatible error. Example:
+ /// Decimal(10, 4) (int-precision 6) -> Decimal(5, 2) (int-precision 3).
+ /// See #4343.
+ #[tokio::test]
+ async fn parquet_decimal_int_precision_narrowing_errors() -> Result<(),
DataFusionError> {
+ let batch = decimal_batch(10, 4)?;
+ let required_schema = Arc::new(Schema::new(vec![Field::new(
+ "a",
+ DataType::Decimal128(5, 2),
+ false,
+ )]));
+
+ let err = roundtrip(&batch, required_schema)
+ .await
+ .expect_err("expected ParquetSchemaConvert for Decimal(10, 4) ->
Decimal(5, 2)");
+ let msg = err.to_string();
+ assert!(msg.contains("Column: [[a]]"), "unexpected error: {msg}");
+ Ok(())
+ }
+
+ /// Reading Decimal(P, S) as Decimal(P, S') where S' > S but `P - S` did
+ /// not grow means the cast would shift integer digits into the fractional
+ /// part and lose the most-significant digit. Example: Decimal(5, 2) ->
+ /// Decimal(5, 3): scaleIncrease=1, precisionIncrease=0. See #4343.
+ #[tokio::test]
+ async fn parquet_decimal_scale_widening_without_precision_errors() ->
Result<(), DataFusionError>
+ {
+ let batch = decimal_batch(5, 2)?;
+ let required_schema = Arc::new(Schema::new(vec![Field::new(
+ "a",
+ DataType::Decimal128(5, 3),
+ false,
+ )]));
+
+ let err = roundtrip(&batch, required_schema)
+ .await
+ .expect_err("expected ParquetSchemaConvert for Decimal(5, 2) ->
Decimal(5, 3)");
+ let msg = err.to_string();
+ assert!(msg.contains("Column: [[a]]"), "unexpected error: {msg}");
+ Ok(())
+ }
+
+ /// Sanity check: widening both precision and scale by the same amount is
+ /// allowed (the cast is lossless). Decimal(5, 2) -> Decimal(7, 4) gives
+ /// scaleIncrease=2, precisionIncrease=2, so `precisionIncrease >=
scaleIncrease`.
+ #[tokio::test]
+ async fn parquet_decimal_widening_succeeds() -> Result<(),
DataFusionError> {
+ let batch = decimal_batch(5, 2)?;
+ let required_schema = Arc::new(Schema::new(vec![Field::new(
+ "a",
+ DataType::Decimal128(7, 4),
+ false,
+ )]));
+
+ let _ = roundtrip(&batch, required_schema).await?;
+ Ok(())
+ }
+
+ /// Helper for the #4297 rejection tests: write a 1-row batch and assert
+ /// that reading it under `read_type` raises `ParquetSchemaConvert`.
+ async fn assert_rejected_conversion(
+ file_field: Field,
+ values: Arc<dyn arrow::array::Array>,
+ read_type: DataType,
+ ) -> Result<String, DataFusionError> {
+ let file_schema = Arc::new(Schema::new(vec![file_field]));
+ let batch = RecordBatch::try_new(Arc::clone(&file_schema),
vec![values])?;
+ let read_field_name = file_schema.field(0).name();
+ let required_schema = Arc::new(Schema::new(vec![Field::new(
+ read_field_name,
+ read_type,
+ false,
+ )]));
+ let err = roundtrip(&batch, required_schema)
+ .await
+ .expect_err("expected ParquetSchemaConvert");
+ Ok(err.to_string())
+ }
+
+ /// `INT64 -> INT32` truncates to the lower 32 bits in DataFusion's cast.
+ /// Spark's vectorized reader rejects this. See #4297.
+ #[tokio::test]
+ async fn parquet_long_read_as_int_errors() -> Result<(), DataFusionError> {
+ let values =
+ Arc::new(Int64Array::from(vec![1i64, 1 << 33])) as Arc<dyn
arrow::array::Array>;
+ let msg = assert_rejected_conversion(
+ Field::new("a", DataType::Int64, false),
+ values,
+ DataType::Int32,
+ )
+ .await?;
+ assert!(
+ msg.contains("Found: INT64") && msg.contains("Expected: int"),
+ "unexpected error: {msg}"
+ );
+ Ok(())
+ }
+
+ /// `INT64 -> Float64` loses precision for large values; Spark rejects.
+ #[tokio::test]
+ async fn parquet_long_read_as_double_errors() -> Result<(),
DataFusionError> {
+ let values = Arc::new(Int64Array::from(vec![1i64, (1i64 << 54) + 1]))
+ as Arc<dyn arrow::array::Array>;
+ let msg = assert_rejected_conversion(
+ Field::new("a", DataType::Int64, false),
+ values,
+ DataType::Float64,
+ )
+ .await?;
+ assert!(
+ msg.contains("Found: INT64") && msg.contains("Expected: double"),
+ "unexpected error: {msg}"
+ );
+ Ok(())
+ }
+
+ /// `Float64 -> Float32` overflows / loses precision; Spark rejects.
+ #[tokio::test]
+ async fn parquet_double_read_as_float_errors() -> Result<(),
DataFusionError> {
+ let values =
+ Arc::new(Float64Array::from(vec![1.5_f64, 1e40])) as Arc<dyn
arrow::array::Array>;
+ let msg = assert_rejected_conversion(
+ Field::new("a", DataType::Float64, false),
+ values,
+ DataType::Float32,
+ )
+ .await?;
+ assert!(
+ msg.contains("Found: DOUBLE") && msg.contains("Expected: float"),
+ "unexpected error: {msg}"
+ );
+ Ok(())
+ }
+
+ /// `Float32 -> Int64` truncates the fractional part; Spark rejects.
+ #[tokio::test]
+ async fn parquet_float_read_as_long_errors() -> Result<(),
DataFusionError> {
+ let values =
+ Arc::new(Float32Array::from(vec![1.5_f32, 2.5])) as Arc<dyn
arrow::array::Array>;
+ let msg = assert_rejected_conversion(
+ Field::new("a", DataType::Float32, false),
+ values,
+ DataType::Int64,
+ )
+ .await?;
+ assert!(
+ msg.contains("Found: FLOAT") && msg.contains("Expected: bigint"),
+ "unexpected error: {msg}"
+ );
+ Ok(())
+ }
+
+ /// `Float64 -> Int64` similarly.
+ #[tokio::test]
+ async fn parquet_double_read_as_long_errors() -> Result<(),
DataFusionError> {
+ let values =
+ Arc::new(Float64Array::from(vec![1.5_f64, 2.5])) as Arc<dyn
arrow::array::Array>;
+ let msg = assert_rejected_conversion(
+ Field::new("a", DataType::Float64, false),
+ values,
+ DataType::Int64,
+ )
+ .await?;
+ assert!(
+ msg.contains("Found: DOUBLE") && msg.contains("Expected: bigint"),
+ "unexpected error: {msg}"
+ );
+ Ok(())
+ }
+
+ /// `Int32 -> Float32` loses precision for values past `2^24`. Spark
+ /// allows `Int32 -> Float64` but rejects `Int32 -> Float32`.
+ #[tokio::test]
+ async fn parquet_int_read_as_float_errors() -> Result<(), DataFusionError>
{
+ let values =
+ Arc::new(Int32Array::from(vec![1, (1 << 25) + 1])) as Arc<dyn
arrow::array::Array>;
+ let msg = assert_rejected_conversion(
+ Field::new("a", DataType::Int32, false),
+ values,
+ DataType::Float32,
+ )
+ .await?;
+ assert!(
+ msg.contains("Found: INT32") && msg.contains("Expected: float"),
+ "unexpected error: {msg}"
+ );
+ Ok(())
+ }
+
+ /// `Int32 -> Timestamp(_, None)`: raw INT32 reinterpreted as epoch seconds
+ /// produces dates near the Unix epoch. Only DATE-annotated INT32 columns
+ /// (which surface as `Date32`) are allowed to read as `TimestampNTZ`.
+ #[tokio::test]
+ async fn parquet_int_read_as_timestamp_ntz_errors() -> Result<(),
DataFusionError> {
+ let values = Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc<dyn
arrow::array::Array>;
+ let msg = assert_rejected_conversion(
+ Field::new("a", DataType::Int32, false),
+ values,
+ DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
+ )
+ .await?;
+ assert!(
+ msg.contains("Found: INT32") && msg.contains("Expected:
timestamp"),
+ "unexpected error: {msg}"
+ );
+ Ok(())
+ }
+
+ /// `Int64 -> Date32` similarly: raw INT64 (no DATE annotation, otherwise
+ /// the file would surface as `Date32`).
+ #[tokio::test]
+ async fn parquet_long_read_as_date_errors() -> Result<(), DataFusionError>
{
+ let values = Arc::new(Int64Array::from(vec![1i64, 2])) as Arc<dyn
arrow::array::Array>;
+ let msg = assert_rejected_conversion(
+ Field::new("a", DataType::Int64, false),
+ values,
+ DataType::Date32,
+ )
+ .await?;
+ assert!(
+ msg.contains("Found: INT64") && msg.contains("Expected: date"),
+ "unexpected error: {msg}"
+ );
+ Ok(())
+ }
+
+ /// `Date32 -> Timestamp(_, Some(_))` (LTZ). Spark's vectorized reader
+ /// allows `Date -> TimestampNTZ` but not `Date -> Timestamp(LTZ)`.
+ #[tokio::test]
+ async fn parquet_date_read_as_ltz_timestamp_errors() -> Result<(),
DataFusionError> {
+ let values =
+ Arc::new(Date32Array::from(vec![18262, 18263])) as Arc<dyn
arrow::array::Array>;
+ let msg = assert_rejected_conversion(
+ Field::new("a", DataType::Date32, false),
+ values,
+ DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond,
Some("UTC".into())),
+ )
+ .await?;
+ assert!(
+ msg.contains("Found: INT32") && msg.contains("Expected:
timestamp"),
+ "unexpected error: {msg}"
+ );
+ Ok(())
+ }
+
+ /// `Timestamp(_, _) -> Date32`: no Timestamp updater branches into
+ /// `DateType`, so Spark rejects.
+ #[tokio::test]
+ async fn parquet_timestamp_read_as_date_errors() -> Result<(),
DataFusionError> {
+ let values = Arc::new(TimestampMicrosecondArray::from(vec![0i64,
1_000_000]))
+ as Arc<dyn arrow::array::Array>;
+ let msg = assert_rejected_conversion(
+ Field::new(
+ "a",
+ DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond,
None),
+ false,
+ ),
+ values,
+ DataType::Date32,
+ )
+ .await?;
+ assert!(msg.contains("Expected: date"), "unexpected error: {msg}");
+ Ok(())
+ }
+
+ /// SPARK-26709: an empty Parquet file with a column that would otherwise
fail
+ /// the type-promotion check (INT32 read as INT64 when
allow_type_promotion is
+ /// false) must still be readable. Spark's vectorized reader only enforces
the
+ /// check per row group, so a file with no row groups passes silently. The
+ /// adapter's plan-time rejection must not fire for the empty-file case.
+ #[tokio::test]
+ async fn parquet_empty_file_disallowed_widening() -> Result<(),
DataFusionError> {
+ let file_schema = Arc::new(Schema::new(vec![Field::new("col",
DataType::Int32, false)]));
+ let filename = get_temp_filename();
+ let filename =
filename.as_path().as_os_str().to_str().unwrap().to_string();
+ let file = File::create(&filename)?;
+ let writer = ArrowWriter::try_new(file, Arc::clone(&file_schema),
None)?;
Review Comment:
Both tests build the `ArrowWriter`, `FileScanConfigBuilder`, and
`expr_adapter_factory` from scratch, even though `roundtrip` at
`schema_adapter.rs:815` already does that setup. Could we thread an
`expect_empty: bool` through `roundtrip` (or factor the shared setup out) so
that both tests go through it and drop the ~30 duplicated lines?
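
To make the "factor the shared setup out" option concrete, here is a rough,
std-only sketch of the shape I have in mind. Every name in it (`ColType`,
`write_and_scan`, the `Vec<i64>` batches, the simplified `roundtrip`
signature) is a hypothetical stand-in, not the PR's real types or helpers;
the real version would keep the `ArrowWriter` / `FileScanConfigBuilder` /
`expr_adapter_factory` calls inside the shared function.

```rust
// All names below are hypothetical stand-ins, not the PR's real API.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ColType {
    Int32,
    Int64,
}

/// Stand-in for the setup both tests currently repeat (writer, scan config,
/// adapter factory). It takes zero or more batches, so the empty-file case
/// is simply `&[]` rather than a second copy of the setup.
fn write_and_scan(
    file_type: ColType,
    batches: &[Vec<i64>],
    read_type: ColType,
    allow_type_promotion: bool,
) -> Result<Vec<i64>, String> {
    let rows: Vec<i64> = batches.iter().flatten().copied().collect();
    let needs_promotion = file_type != read_type;
    // Models the deferred rejection: fail only once there is a row to decode.
    if needs_promotion && !allow_type_promotion && !rows.is_empty() {
        return Err(format!("Expected: {read_type:?}, Found: {file_type:?}"));
    }
    Ok(rows)
}

/// The existing single-batch helper becomes a thin wrapper.
fn roundtrip(
    file_type: ColType,
    batch: Vec<i64>,
    read_type: ColType,
    allow_type_promotion: bool,
) -> Result<Vec<i64>, String> {
    write_and_scan(file_type, &[batch], read_type, allow_type_promotion)
}
```

With that split, the empty-file test would call `write_and_scan(.., &[], ..)`
directly and the non-empty one would keep calling `roundtrip`, so neither
needs its own writer or scan setup. Treat this as an illustration of the
structure only; the `expect_empty` flag would work just as well if it is
less churn.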
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]