[jira] [Updated] (SPARK-41952) Upgrade Parquet to fix off-heap memory leaks in Zstd codec

2023-01-09 Thread Alexey Kudinkin (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Kudinkin updated SPARK-41952:

Shepherd: Dongjoon Hyun

> Upgrade Parquet to fix off-heap memory leaks in Zstd codec
> --
>
> Key: SPARK-41952
> URL: https://issues.apache.org/jira/browse/SPARK-41952
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 3.1.3, 3.3.1, 3.2.3
>Reporter: Alexey Kudinkin
>Priority: Critical
>
> Recently, a native memory leak has been discovered in Parquet in conjunction 
> with its use of the Zstd decompressor from the luben/zstd-jni library 
> (PARQUET-2160).
> This is problematic to the point where we can't use Parquet with Zstd at all, 
> due to pervasive OOMs taking down our executors and disrupting our jobs.
> Luckily, a fix addressing this has already landed in Parquet:
> [https://github.com/apache/parquet-mr/pull/982]
>  
> Now we just need to:
>  # Release an updated version of Parquet in a timely manner
>  # Upgrade Spark to this new version in the upcoming release
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-41952) Upgrade Parquet to fix off-heap memory leaks in Zstd codec

2023-01-09 Thread Alexey Kudinkin (Jira)
Alexey Kudinkin created SPARK-41952:
---

 Summary: Upgrade Parquet to fix off-heap memory leaks in Zstd codec
 Key: SPARK-41952
 URL: https://issues.apache.org/jira/browse/SPARK-41952
 Project: Spark
  Issue Type: Bug
  Components: Input/Output
Affects Versions: 3.2.3, 3.3.1, 3.1.3
Reporter: Alexey Kudinkin


Recently, a native memory leak has been discovered in Parquet in conjunction 
with its use of the Zstd decompressor from the luben/zstd-jni library 
(PARQUET-2160).

This is problematic to the point where we can't use Parquet with Zstd at all, 
due to pervasive OOMs taking down our executors and disrupting our jobs.

Luckily, a fix addressing this has already landed in Parquet:
[https://github.com/apache/parquet-mr/pull/982]

 

Now we just need to:
 # Release an updated version of Parquet in a timely manner
 # Upgrade Spark to this new version in the upcoming release
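
For context, here is a minimal reproduction sketch of the pattern under which we hit 
this (my own illustration, not taken from the ticket; the path, data volume, and object 
name are made up): repeatedly scanning a zstd-compressed Parquet table keeps allocating 
native zstd buffers that are never freed on the affected Parquet versions, so executor 
off-heap memory grows until the container is killed.
{code:scala}
// Hypothetical reproduction sketch (not from this ticket): with the leaking zstd-jni
// buffers in the affected Parquet versions (PARQUET-2160), repeatedly scanning a
// zstd-compressed Parquet table grows executor native (off-heap) memory.
import org.apache.spark.sql.SparkSession

object ZstdLeakRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("zstd-parquet-leak-repro").getOrCreate()
    val path = "/tmp/zstd_parquet_repro"

    // Write a moderately sized table compressed with zstd.
    spark.range(0, 10L * 1000 * 1000)
      .selectExpr("id", "id % 1000 AS bucket", "uuid() AS payload")
      .write
      .option("compression", "zstd")
      .mode("overwrite")
      .parquet(path)

    // Each scan decompresses every page; the native buffers backing the zstd
    // decompressor are not released, so off-heap usage climbs with every iteration.
    (1 to 200).foreach { _ =>
      spark.read.parquet(path).selectExpr("sum(bucket)").collect()
    }

    spark.stop()
  }
}
{code}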

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-40876) Spark's Vectorized ParquetReader should support type promotions

2022-10-21 Thread Alexey Kudinkin (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622518#comment-17622518
 ] 

Alexey Kudinkin commented on SPARK-40876:
-

> This is because Parquet Int32, which is a physical type in Parquet, can 
> represent byte/short etc, using additional logical types. See 
> [here|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#numeric-types]
>  for more info. Based on the logical types, Spark converts Parquet type to 
> different Spark types, such as {{{}ByteType{}}}, {{{}ShortType{}}}, etc.

Right, I totally forgot that Parquet doesn't have integers smaller than Int32.
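
To make that concrete, here is a small illustration (my own, not from the thread) of how 
byte/short columns sit on top of the INT32 physical type; the schema string and object 
name are made up, and I'm using parquet-mr's converted-type names (INT_8/INT_16), which 
correspond to the INTEGER(8)/INTEGER(16) logical types in the linked spec:
{code:scala}
// Illustrative only: parse a Parquet message type where "tiny" and "small" are stored
// with the INT32 physical type but annotated so readers map them to 8/16-bit integers
// (Spark maps these to ByteType/ShortType respectively).
import org.apache.parquet.schema.MessageTypeParser

object ParquetLogicalTypesDemo {
  def main(args: Array[String]): Unit = {
    val schema = MessageTypeParser.parseMessageType(
      """
        |message example {
        |  required int32 tiny (INT_8);
        |  required int32 small (INT_16);
        |  required int32 regular;
        |}
        |""".stripMargin)

    // All three columns share the INT32 physical type; only the annotation differs.
    println(schema)
  }
}
{code}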

> Spark's Vectorized ParquetReader should support type promotions
> ---
>
> Key: SPARK-40876
> URL: https://issues.apache.org/jira/browse/SPARK-40876
> Project: Spark
>  Issue Type: Improvement
>  Components: Input/Output
>Affects Versions: 3.3.0
>Reporter: Alexey Kudinkin
>Priority: Major
>
> Currently, when reading a Parquet table using Spark's `VectorizedColumnReader`, 
> we hit an issue when we specify a requested (projection) schema in which one of 
> the fields' types is widened from int32 to long.
> The expectation is that, since this is a perfectly legitimate primitive type 
> promotion, we should be able to read Ints into Longs with no problems (for 
> example, Avro handles this promotion fine).
> However, the `ParquetVectorUpdaterFactory.getUpdater` method fails with the 
> exception listed below.
> Looking at the code, it actually seems to allow the opposite: it allows Int32s 
> persisted in Parquet to be "down-sized" and read as, for example, Bytes or 
> Shorts. I'm not sure what the rationale for this behavior is, and it seems 
> like a bug to me (as it will essentially lead to data truncation):
> {code:java}
> case INT32:
>   if (sparkType == DataTypes.IntegerType || canReadAsIntDecimal(descriptor, sparkType)) {
>     return new IntegerUpdater();
>   } else if (sparkType == DataTypes.LongType && isUnsignedIntTypeMatched(32)) {
>     // In `ParquetToSparkSchemaConverter`, we map parquet UINT32 to our LongType.
>     // For unsigned int32, it stores as plain signed int32 in Parquet when dictionary
>     // fallbacks. We read them as long values.
>     return new UnsignedIntegerUpdater();
>   } else if (sparkType == DataTypes.ByteType) {
>     return new ByteUpdater();
>   } else if (sparkType == DataTypes.ShortType) {
>     return new ShortUpdater();
>   } else if (sparkType == DataTypes.DateType) {
>     if ("CORRECTED".equals(datetimeRebaseMode)) {
>       return new IntegerUpdater();
>     } else {
>       boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
>       return new IntegerWithRebaseUpdater(failIfRebase);
>     }
>   }
>   break; {code}
> Exception:
> {code:java}
> at 
> org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
>     at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
>     at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
>     at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>     at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>     at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
>     at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
>     at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
>     at scala.Option.foreach(Option.scala:407)
>     at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
>     at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
>     at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
>     at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
>     at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
>     at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at 
> 

[jira] [Comment Edited] (SPARK-40876) Spark's Vectorized ParquetReader should support type promotions

2022-10-21 Thread Alexey Kudinkin (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622518#comment-17622518
 ] 

Alexey Kudinkin edited comment on SPARK-40876 at 10/21/22 11:53 PM:


> This is because Parquet Int32, which is a physical type in Parquet, can 
> represent byte/short etc, using additional logical types. See 
> [here|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#numeric-types]
>  for more info. Based on the logical types, Spark converts Parquet type to 
> different Spark types, such as {{{}ByteType{}}}, {{{}ShortType{}}}, etc.

Right, I totally forgot that Parquet doesn't have integers smaller than Int32.


was (Author: alexey.kudinkin):
> This is because Parquet Int32, which is a physical type in Parquet, can 
> represent byte/short etc, using additional logical types. See 
> [here|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#numeric-types]
>  for more info. Based on the logical types, Spark converts Parquet type to 
> different Spark types, such as {{{}ByteType{}}}, {{{}ShortType{}}}, etc.

Right, i totally forgot that Parquet doesn't have smaller than Int32.

> Spark's Vectorized ParquetReader should support type promotions
> ---
>
> Key: SPARK-40876
> URL: https://issues.apache.org/jira/browse/SPARK-40876
> Project: Spark
>  Issue Type: Improvement
>  Components: Input/Output
>Affects Versions: 3.3.0
>Reporter: Alexey Kudinkin
>Priority: Major
>
> Currently, when reading a Parquet table using Spark's `VectorizedColumnReader`, 
> we hit an issue when we specify a requested (projection) schema in which one of 
> the fields' types is widened from int32 to long.
> The expectation is that, since this is a perfectly legitimate primitive type 
> promotion, we should be able to read Ints into Longs with no problems (for 
> example, Avro handles this promotion fine).
> However, the `ParquetVectorUpdaterFactory.getUpdater` method fails with the 
> exception listed below.
> Looking at the code, it actually seems to allow the opposite: it allows Int32s 
> persisted in Parquet to be "down-sized" and read as, for example, Bytes or 
> Shorts. I'm not sure what the rationale for this behavior is, and it seems 
> like a bug to me (as it will essentially lead to data truncation):
> {code:java}
> case INT32:
>   if (sparkType == DataTypes.IntegerType || canReadAsIntDecimal(descriptor, sparkType)) {
>     return new IntegerUpdater();
>   } else if (sparkType == DataTypes.LongType && isUnsignedIntTypeMatched(32)) {
>     // In `ParquetToSparkSchemaConverter`, we map parquet UINT32 to our LongType.
>     // For unsigned int32, it stores as plain signed int32 in Parquet when dictionary
>     // fallbacks. We read them as long values.
>     return new UnsignedIntegerUpdater();
>   } else if (sparkType == DataTypes.ByteType) {
>     return new ByteUpdater();
>   } else if (sparkType == DataTypes.ShortType) {
>     return new ShortUpdater();
>   } else if (sparkType == DataTypes.DateType) {
>     if ("CORRECTED".equals(datetimeRebaseMode)) {
>       return new IntegerUpdater();
>     } else {
>       boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
>       return new IntegerWithRebaseUpdater(failIfRebase);
>     }
>   }
>   break; {code}
> Exception:
> {code:java}
> at 
> org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
>     at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
>     at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
>     at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>     at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>     at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
>     at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
>     at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
>     at scala.Option.foreach(Option.scala:407)
>     at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
>     at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
>     at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
>     at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>     at 

[jira] [Commented] (SPARK-40876) Spark's Vectorized ParquetReader should support type promotions

2022-10-21 Thread Alexey Kudinkin (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622517#comment-17622517
 ] 

Alexey Kudinkin commented on SPARK-40876:
-

Alright, troubleshooting this further, the problem seems to be in the 
`ParquetReadSupport.init` method: 
{code:java}
val parquetClippedSchema = ParquetReadSupport.clipParquetSchema(parquetFileSchema,
  catalystRequestedSchema, caseSensitive)

// We pass two schema to ParquetRecordMaterializer:
// - parquetRequestedSchema: the schema of the file data we want to read
// - catalystRequestedSchema: the schema of the rows we want to return
// The reader is responsible for reconciling the differences between the two.
val parquetRequestedSchema = if (schemaPruningEnabled && !enableVectorizedReader) {
  // Parquet-MR reader requires that parquetRequestedSchema include only those fields present
  // in the underlying parquetFileSchema. Therefore, we intersect the parquetClippedSchema
  // with the parquetFileSchema
  ParquetReadSupport.intersectParquetGroups(parquetClippedSchema, parquetFileSchema)
    .map(groupType => new MessageType(groupType.getName, groupType.getFields))
    .getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE)
} else {
  // Spark's vectorized reader only support atomic types currently. It also skip fields
  // in parquetRequestedSchema which are not present in the file.
  parquetClippedSchema
} {code}
Even though we clearly request the type to be promoted from Int32 to Int64 in the 
requested schema, this code composes the requested schema passed to the reader by 
essentially just projecting the file schema. As a result, the requested schema that 
`VectorizedColumnReader` receives is effectively the file's schema (i.e., the one 
with Int32), while the column vectors are allocated based on the Catalyst schema 
(which is Int64), causing a misalignment.
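
To make the mismatch concrete, here is a minimal sketch of the scenario from the 
description (my own illustration; the path and object name are made up): write a 
Parquet file whose column is int32, then read it back with a requested schema that 
widens that column to LongType while the vectorized reader is enabled.
{code:scala}
// Illustrative reproduction sketch: column `a` is written as int32, then requested as
// LongType on read. With spark.sql.parquet.enableVectorizedReader=true this trips the
// updater lookup described above; the same promotion is accepted by Avro-style readers.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object Int32ToLongRead {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("int32-to-long-repro").getOrCreate()
    val path = "/tmp/int32_parquet_repro"

    // Persist `a` with the INT32 physical type.
    spark.range(0, 1000)
      .selectExpr("CAST(id AS INT) AS a")
      .write.mode("overwrite").parquet(path)

    // Requested (projection) schema widens `a` from int to long.
    val widened = StructType(Seq(StructField("a", LongType)))

    // Forcing an actual scan materializes the failure in ParquetVectorUpdaterFactory.
    spark.read.schema(widened).parquet(path).selectExpr("sum(a)").show()

    spark.stop()
  }
}
{code}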

> Spark's Vectorized ParquetReader should support type promotions
> ---
>
> Key: SPARK-40876
> URL: https://issues.apache.org/jira/browse/SPARK-40876
> Project: Spark
>  Issue Type: Improvement
>  Components: Input/Output
>Affects Versions: 3.3.0
>Reporter: Alexey Kudinkin
>Priority: Major
>
> Currently, when reading a Parquet table using Spark's `VectorizedColumnReader`, 
> we hit an issue when we specify a requested (projection) schema in which one of 
> the fields' types is widened from int32 to long.
> The expectation is that, since this is a perfectly legitimate primitive type 
> promotion, we should be able to read Ints into Longs with no problems (for 
> example, Avro handles this promotion fine).
> However, the `ParquetVectorUpdaterFactory.getUpdater` method fails with the 
> exception listed below.
> Looking at the code, it actually seems to allow the opposite: it allows Int32s 
> persisted in Parquet to be "down-sized" and read as, for example, Bytes or 
> Shorts. I'm not sure what the rationale for this behavior is, and it seems 
> like a bug to me (as it will essentially lead to data truncation):
> {code:java}
> case INT32:
>   if (sparkType == DataTypes.IntegerType || canReadAsIntDecimal(descriptor, sparkType)) {
>     return new IntegerUpdater();
>   } else if (sparkType == DataTypes.LongType && isUnsignedIntTypeMatched(32)) {
>     // In `ParquetToSparkSchemaConverter`, we map parquet UINT32 to our LongType.
>     // For unsigned int32, it stores as plain signed int32 in Parquet when dictionary
>     // fallbacks. We read them as long values.
>     return new UnsignedIntegerUpdater();
>   } else if (sparkType == DataTypes.ByteType) {
>     return new ByteUpdater();
>   } else if (sparkType == DataTypes.ShortType) {
>     return new ShortUpdater();
>   } else if (sparkType == DataTypes.DateType) {
>     if ("CORRECTED".equals(datetimeRebaseMode)) {
>       return new IntegerUpdater();
>     } else {
>       boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
>       return new IntegerWithRebaseUpdater(failIfRebase);
>     }
>   }
>   break; {code}
> Exception:
> {code:java}
> at 
> org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
>     at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
>     at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
>     at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>     at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>     at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
>     at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
>   

[jira] [Updated] (SPARK-40876) Spark's Vectorized ParquetReader should support type promotions

2022-10-21 Thread Alexey Kudinkin (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Kudinkin updated SPARK-40876:

Description: 
Currently, when reading a Parquet table using Spark's `VectorizedColumnReader`, 
we hit an issue when we specify a requested (projection) schema in which one of 
the fields' types is widened from int32 to long.

The expectation is that, since this is a perfectly legitimate primitive type 
promotion, we should be able to read Ints into Longs with no problems (for 
example, Avro handles this promotion fine).

However, the `ParquetVectorUpdaterFactory.getUpdater` method fails with the 
exception listed below.

Looking at the code, it actually seems to allow the opposite: it allows Int32s 
persisted in Parquet to be "down-sized" and read as, for example, Bytes or 
Shorts. I'm not sure what the rationale for this behavior is, and it seems like 
a bug to me (as it will essentially lead to data truncation):
{code:java}
case INT32:
  if (sparkType == DataTypes.IntegerType || canReadAsIntDecimal(descriptor, sparkType)) {
    return new IntegerUpdater();
  } else if (sparkType == DataTypes.LongType && isUnsignedIntTypeMatched(32)) {
    // In `ParquetToSparkSchemaConverter`, we map parquet UINT32 to our LongType.
    // For unsigned int32, it stores as plain signed int32 in Parquet when dictionary
    // fallbacks. We read them as long values.
    return new UnsignedIntegerUpdater();
  } else if (sparkType == DataTypes.ByteType) {
    return new ByteUpdater();
  } else if (sparkType == DataTypes.ShortType) {
    return new ShortUpdater();
  } else if (sparkType == DataTypes.DateType) {
    if ("CORRECTED".equals(datetimeRebaseMode)) {
      return new IntegerUpdater();
    } else {
      boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
      return new IntegerWithRebaseUpdater(failIfRebase);
    }
  }
  break; {code}
Exception:
{code:java}
at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
    at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
    at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
    at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
    at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
    at scala.Option.foreach(Option.scala:407)
    at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
    at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
    at org.apache.spark.RangePartitioner$.sketch(Partitioner.scala:304)
    at org.apache.spark.RangePartitioner.(Partitioner.scala:171)
    at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:293)
    at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency$lzycompute(ShuffleExchangeExec.scala:173)
    at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency(ShuffleExchangeExec.scala:167)
    at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:143)
    at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:139)
    at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$submitShuffleJob$1(ShuffleExchangeExec.scala:68)
    at 

[jira] [Created] (SPARK-40876) Spark's Vectorized ParquetReader should support type promotions

2022-10-21 Thread Alexey Kudinkin (Jira)
Alexey Kudinkin created SPARK-40876:
---

 Summary: Spark's Vectorized ParquetReader should support type 
promotions
 Key: SPARK-40876
 URL: https://issues.apache.org/jira/browse/SPARK-40876
 Project: Spark
  Issue Type: Improvement
  Components: Input/Output
Affects Versions: 3.3.0
Reporter: Alexey Kudinkin


Currently, when reading a Parquet table using Spark's `VectorizedColumnReader`, 
we hit an issue when we specify a requested (projection) schema in which one of 
the fields' types is widened from int32 to long.

The expectation is that, since this is a perfectly legitimate primitive type 
promotion, we should be able to read Ints into Longs with no problems (for 
example, Avro handles this promotion fine).

However, the `ParquetVectorUpdaterFactory.getUpdater` method fails with the 
exception listed below.

Looking at the code, it actually seems to allow the opposite: it allows Int32s 
persisted in Parquet to be "down-sized" and read as, for example, Bytes or 
Shorts. I'm not sure what the rationale for this behavior is, and it seems like 
a bug to me (as it will essentially lead to data truncation):
{code:java}
case INT32:
  if (sparkType == DataTypes.IntegerType || canReadAsIntDecimal(descriptor, sparkType)) {
    return new IntegerUpdater();
  } else if (sparkType == DataTypes.LongType && isUnsignedIntTypeMatched(32)) {
    // In `ParquetToSparkSchemaConverter`, we map parquet UINT32 to our LongType.
    // For unsigned int32, it stores as plain signed int32 in Parquet when dictionary
    // fallbacks. We read them as long values.
    return new UnsignedIntegerUpdater();
  } else if (sparkType == DataTypes.ByteType) {
    return new ByteUpdater();
  } else if (sparkType == DataTypes.ShortType) {
    return new ShortUpdater();
  } else if (sparkType == DataTypes.DateType) {
    if ("CORRECTED".equals(datetimeRebaseMode)) {
      return new IntegerUpdater();
    } else {
      boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
      return new IntegerWithRebaseUpdater(failIfRebase);
    }
  }
  break; {code}
Exception:
{code:java}
at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
    at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
    at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
    at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
    at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
    at scala.Option.foreach(Option.scala:407)
    at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
    at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
    at org.apache.spark.RangePartitioner$.sketch(Partitioner.scala:304)
    at org.apache.spark.RangePartitioner.(Partitioner.scala:171)
    at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:293)
    at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency$lzycompute(ShuffleExchangeExec.scala:173)
    at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency(ShuffleExchangeExec.scala:167)
    at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:143)
    at 

[jira] [Created] (SPARK-38512) ResolveFunctions implemented incorrectly requiring multiple passes to Resolve Nested Expressions

2022-03-10 Thread Alexey Kudinkin (Jira)
Alexey Kudinkin created SPARK-38512:
---

 Summary: ResolveFunctions implemented incorrectly requiring 
multiple passes to Resolve Nested Expressions 
 Key: SPARK-38512
 URL: https://issues.apache.org/jira/browse/SPARK-38512
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.2.1, 3.2.0
Reporter: Alexey Kudinkin


The ResolveFunctions rule is implemented incorrectly, requiring multiple passes to 
resolve nested expressions:

While the plan itself is traversed correctly in post-order (bottom-up, via 
`plan.resolveOperatorsUpWithPruning`), the expressions within each plan node are 
traversed incorrectly in pre-order (top-down, using 
`transformExpressionsWithPruning`):

 
{code:java}
case q: LogicalPlan =>
  q.transformExpressionsWithPruning(...) { ... } {code}
 

Traversing in pre-order means an attempt is made to resolve the current node before 
its children are resolved, which is incorrect, since the node itself cannot be 
resolved until its children are.

While this is not leading to failures yet, it is taxing on performance: most 
expressions in Spark should be resolvable in a *single pass* (if resolved bottom-up; 
see the reproducible sample at the bottom and the toy traversal sketch after it). 
Instead, it currently takes Spark at least *N* iterations to resolve such 
expressions, where N is proportional to the depth of the expression tree.

 

Example to reproduce: 

 
{code:java}
def resolveExpr(spark: SparkSession, exprStr: String, tableSchema: StructType): Expression = {
  val expr = spark.sessionState.sqlParser.parseExpression(exprStr)
  val analyzer = spark.sessionState.analyzer
  val schemaFields = tableSchema.fields

  val resolvedExpr = {
    val plan: LogicalPlan = Filter(expr, LocalRelation(schemaFields.head, schemaFields.drop(1): _*))
    val rules: Seq[Rule[LogicalPlan]] = {
      analyzer.ResolveFunctions ::
      analyzer.ResolveReferences ::
      Nil
    }

    rules.foldRight(plan)((rule, plan) => rule.apply(plan))
      .asInstanceOf[Filter]
      .condition
  }
  resolvedExpr
}

// Invoke with
resolveExpr(spark, "date_format(to_timestamp(B, '-MM-dd'), 'MM/dd/')",
  StructType(Seq(StructField("B", StringType)))){code}
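
For intuition on the single-pass vs. N-pass behavior, here is a self-contained toy 
model (my own sketch, not Catalyst code) in which a node can only be "resolved" once 
all of its children are, mirroring how expression resolution works: a post-order pass 
finishes in one sweep, while a pre-order pass needs as many sweeps as the tree is deep.
{code:scala}
// Toy model only: "resolution" of a node succeeds only once all children are resolved,
// which is the property that makes pre-order (top-down) traversal require repeated passes.
sealed trait Node { def resolved: Boolean; def children: List[Node] }
case class Unresolved(children: List[Node]) extends Node { val resolved = false }
case class Resolved(children: List[Node]) extends Node { val resolved = true }

object TraversalOrderDemo {
  // A node resolves only if every child is already resolved (cf. Expression.resolved).
  private def tryResolve(n: Node): Node = n match {
    case Unresolved(cs) if cs.forall(_.resolved) => Resolved(cs)
    case other => other
  }

  // Pre-order: visit the parent first, then recurse into children (like transformDown).
  def preOrderPass(n: Node): Node = tryResolve(n) match {
    case Unresolved(cs) => Unresolved(cs.map(preOrderPass))
    case Resolved(cs)   => Resolved(cs.map(preOrderPass))
  }

  // Post-order: recurse into children first, then visit the parent (like transformUp).
  def postOrderPass(n: Node): Node = n match {
    case Unresolved(cs) => tryResolve(Unresolved(cs.map(postOrderPass)))
    case Resolved(cs)   => Resolved(cs.map(postOrderPass))
  }

  def fullyResolved(n: Node): Boolean = n.resolved && n.children.forall(fullyResolved)

  def main(args: Array[String]): Unit = {
    // Shape of date_format(to_timestamp(B, ...), ...): three nested unresolved nodes.
    val tree: Node = Unresolved(List(Unresolved(List(Unresolved(Nil)))))

    println(s"post-order, one pass: ${fullyResolved(postOrderPass(tree))}")  // true

    var current = tree
    var passes = 0
    while (!fullyResolved(current)) { current = preOrderPass(current); passes += 1 }
    println(s"pre-order needed $passes passes")                              // 3 (tree depth)
  }
}
{code}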
 

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org