[jira] [Updated] (SPARK-40876) Spark's Vectorized ParquetReader should support type promotions

2023-12-15 Thread ASF GitHub Bot (Jira)


 [ https://issues.apache.org/jira/browse/SPARK-40876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated SPARK-40876:
---
Labels: pull-request-available  (was: )

> Spark's Vectorized ParquetReader should support type promotions
> ---
>
> Key: SPARK-40876
> URL: https://issues.apache.org/jira/browse/SPARK-40876
> Project: Spark
>  Issue Type: Improvement
>  Components: Input/Output
>Affects Versions: 3.3.0
>Reporter: Alexey Kudinkin
>Priority: Major
>  Labels: pull-request-available
>

[jira] [Updated] (SPARK-40876) Spark's Vectorized ParquetReader should support type promotions

2022-10-21 Thread Alexey Kudinkin (Jira)


 [ https://issues.apache.org/jira/browse/SPARK-40876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Kudinkin updated SPARK-40876:

Description: 
Currently, when reading a Parquet table using Spark's `VectorizedColumnReader`, we hit an issue when the requested (projection) schema widens one of the fields' types from int32 to long.

The expectation is that, since this is a perfectly legitimate primitive type promotion, we should be able to read Ints into Longs without any problems (Avro, for example, handles this just fine).

However, the `ParquetVectorUpdaterFactory.getUpdater` method fails with the exception listed below.

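For context, a minimal reproduction sketch looks roughly like the following (hypothetical scratch path and column name; it assumes a local SparkSession with the vectorized Parquet reader left at its default, enabled, setting):
{code:java}
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class IntToLongPromotionRepro {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("int-to-long-promotion")
        .getOrCreate();

    // Write a single int32 column to Parquet.
    spark.range(10)
        .selectExpr("CAST(id AS INT) AS value")
        .write().mode("overwrite").parquet("/tmp/spark_40876_repro");

    // Read it back with a requested schema that widens the column to LONG;
    // with the vectorized reader this is the combination that getUpdater() rejects.
    StructType readSchema = new StructType().add("value", DataTypes.LongType);
    spark.read().schema(readSchema).parquet("/tmp/spark_40876_repro").show();

    spark.stop();
  }
}
{code}
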
Looking at the code, it actually seems to allow the opposite: it permits "down-sizing" Int32s persisted in Parquet to be read as Bytes or Shorts, for example. I'm not sure what the rationale for this behavior is, and it actually looks like a bug to me (since it will essentially lead to data truncation):
{code:java}
case INT32:
  if (sparkType == DataTypes.IntegerType || canReadAsIntDecimal(descriptor, sparkType)) {
    return new IntegerUpdater();
  } else if (sparkType == DataTypes.LongType && isUnsignedIntTypeMatched(32)) {
    // In `ParquetToSparkSchemaConverter`, we map parquet UINT32 to our LongType.
    // For unsigned int32, it stores as plain signed int32 in Parquet when dictionary
    // fallbacks. We read them as long values.
    return new UnsignedIntegerUpdater();
  } else if (sparkType == DataTypes.ByteType) {
    return new ByteUpdater();
  } else if (sparkType == DataTypes.ShortType) {
    return new ShortUpdater();
  } else if (sparkType == DataTypes.DateType) {
    if ("CORRECTED".equals(datetimeRebaseMode)) {
      return new IntegerUpdater();
    } else {
      boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
      return new IntegerWithRebaseUpdater(failIfRebase);
    }
  }
  break;
{code}
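To make the truncation concern concrete, here is a small self-contained illustration in plain Java (not Spark code): widening a signed int32 to a long is always exact, whereas narrowing it to a byte or short keeps only the low-order bits:
{code:java}
public class PromotionVsTruncation {
  public static void main(String[] args) {
    int stored = 40_876;            // a value physically stored as int32 in Parquet

    long widened = stored;          // implicit int -> long widening, always lossless
    byte downsized = (byte) stored; // int -> byte narrowing keeps only the low 8 bits

    System.out.println(widened);    // 40876
    System.out.println(downsized);  // -84 (silently truncated)
  }
}
{code}
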
Exception:
{code:java}
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
    at org.apache.spark.RangePartitioner$.sketch(Partitioner.scala:304)
    at org.apache.spark.RangePartitioner.<init>(Partitioner.scala:171)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:293)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency$lzycompute(ShuffleExchangeExec.scala:173)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency(ShuffleExchangeExec.scala:167)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:143)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:139)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$submitShuffleJob$1(ShuffleExchangeExec.scala:68)
    at org.apache.spark