[jira] [Updated] (SPARK-41952) Upgrade Parquet to fix off-heap memory leaks in Zstd codec
[ https://issues.apache.org/jira/browse/SPARK-41952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Kudinkin updated SPARK-41952: Shepherd: Dongjoon Hyun
> Upgrade Parquet to fix off-heap memory leaks in Zstd codec
> --
>
> Key: SPARK-41952
> URL: https://issues.apache.org/jira/browse/SPARK-41952
> Project: Spark
> Issue Type: Bug
> Components: Input/Output
> Affects Versions: 3.1.3, 3.3.1, 3.2.3
> Reporter: Alexey Kudinkin
> Priority: Critical
>
> Recently, a native memory leak was discovered in Parquet in conjunction with its use of the Zstd decompressor from the luben/zstd-jni library (PARQUET-2160).
> This is very problematic, to the point where we can't use Parquet w/ Zstd due to pervasive OOMs taking down our executors and disrupting our jobs.
> Luckily, a fix addressing this has already landed in Parquet: [https://github.com/apache/parquet-mr/pull/982]
>
> Now, we just need to:
> # Get an updated version of Parquet released in a timely manner
> # Upgrade Spark onto this new version in the upcoming release
-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
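The failure mode behind PARQUET-2160 is the classic one for JNI-backed codecs: the compressor/decompressor state lives off-heap, so it is invisible to the JVM heap accounting and is only released by an explicit close/end call, not by garbage collection pressure. As a minimal illustration of the pattern (using the JDK's own natively backed `java.util.zip.Deflater` as a stand-in, since zstd-jni is not a stdlib dependency), the fix is always the same shape — release the native state in a `finally` block:

```java
import java.util.zip.Deflater;

public class NativeResourceDemo {
    // Compress a buffer, releasing the native zlib state explicitly.
    static byte[] compress(byte[] input) {
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(input);
            deflater.finish();
            byte[] buf = new byte[input.length * 2 + 64];
            int n = deflater.deflate(buf);
            byte[] out = new byte[n];
            System.arraycopy(buf, 0, out, 0, n);
            return out;
        } finally {
            // Without this, the native (off-heap) state lingers until
            // finalization -- the same failure mode as the Zstd leak:
            // the JVM heap looks fine while process RSS keeps growing.
            deflater.end();
        }
    }

    public static void main(String[] args) {
        System.out.println(compress("hello hello hello".getBytes()).length > 0);
    }
}
```

This is only an analogy for the leak pattern, not the actual parquet-mr fix; apache/parquet-mr#982 addresses it by making Parquet release the zstd-jni buffers it pools.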
[jira] [Created] (SPARK-41952) Upgrade Parquet to fix off-heap memory leaks in Zstd codec
Alexey Kudinkin created SPARK-41952: --- Summary: Upgrade Parquet to fix off-heap memory leaks in Zstd codec Key: SPARK-41952 URL: https://issues.apache.org/jira/browse/SPARK-41952 Project: Spark Issue Type: Bug Components: Input/Output Affects Versions: 3.2.3, 3.3.1, 3.1.3 Reporter: Alexey Kudinkin
[jira] [Commented] (SPARK-40876) Spark's Vectorized ParquetReader should support type promotions
[ https://issues.apache.org/jira/browse/SPARK-40876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622518#comment-17622518 ] Alexey Kudinkin commented on SPARK-40876: -
> This is because Parquet Int32, which is a physical type in Parquet, can represent byte/short etc. using additional logical types. See [here|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#numeric-types] for more info. Based on the logical types, Spark converts the Parquet type to different Spark types, such as {{ByteType}}, {{ShortType}}, etc.
Right, I totally forgot that Parquet doesn't have integer types smaller than Int32.
> Spark's Vectorized ParquetReader should support type promotions
> --
>
> Key: SPARK-40876
> URL: https://issues.apache.org/jira/browse/SPARK-40876
> Project: Spark
> Issue Type: Improvement
> Components: Input/Output
> Affects Versions: 3.3.0
> Reporter: Alexey Kudinkin
> Priority: Major
>
> Currently, when reading a Parquet table using Spark's `VectorizedColumnReader`, we hit an issue when we specify a requested (projection) schema in which one of the fields' types is widened from int32 to long.
> The expectation is that, since this is a totally legitimate primitive type promotion, we should be able to read Ints into Longs with no problems (Avro, for example, is able to do that perfectly fine).
> However, we're facing an issue where the `ParquetVectorUpdaterFactory.getUpdater` method fails with the exception listed below.
> Looking at the code, it actually seems to allow the opposite: it allows "down-sizing" Int32s persisted in Parquet to be read as Bytes or Shorts, for example. I'm actually not sure what the rationale for this behavior is, and it actually seems like a bug to me (as it will essentially lead to data truncation):
> {code:java}
> case INT32:
>   if (sparkType == DataTypes.IntegerType || canReadAsIntDecimal(descriptor, sparkType)) {
>     return new IntegerUpdater();
>   } else if (sparkType == DataTypes.LongType && isUnsignedIntTypeMatched(32)) {
>     // In `ParquetToSparkSchemaConverter`, we map parquet UINT32 to our LongType.
>     // For unsigned int32, it stores as plain signed int32 in Parquet when dictionary
>     // fallbacks. We read them as long values.
>     return new UnsignedIntegerUpdater();
>   } else if (sparkType == DataTypes.ByteType) {
>     return new ByteUpdater();
>   } else if (sparkType == DataTypes.ShortType) {
>     return new ShortUpdater();
>   } else if (sparkType == DataTypes.DateType) {
>     if ("CORRECTED".equals(datetimeRebaseMode)) {
>       return new IntegerUpdater();
>     } else {
>       boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
>       return new IntegerWithRebaseUpdater(failIfRebase);
>     }
>   }
>   break; {code}
> Exception:
> {code:java}
> at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
> at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
> at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
> at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
> at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
> at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
> at scala.Option.foreach(Option.scala:407)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
> at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
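A fix along the lines requested in this issue would add a widening branch to the `case INT32` chain quoted above. The following is a hypothetical, self-contained sketch of what such an updater does — the `IntegerToLongUpdater`-style logic and the minimal `LongVector` interface are illustrative stand-ins, not Spark's actual `WritableColumnVector`/`ParquetVectorUpdater` API. The key point is that int32-to-int64 promotion is always lossless, so the updater only needs a sign-extending widen on write:

```java
import java.util.Arrays;

public class WideningUpdaterSketch {
    // Minimal stand-in for Spark's writable column vector abstraction.
    interface LongVector { void putLong(int rowId, long value); }

    // Hypothetical updater body: read int32 values, widen each to int64.
    static void readIntsAsLongs(int total, int[] source, LongVector vector) {
        for (int i = 0; i < total; i++) {
            vector.putLong(i, (long) source[i]); // sign-extending widen, never truncates
        }
    }

    public static void main(String[] args) {
        long[] out = new long[3];
        readIntsAsLongs(3, new int[] {1, -2, Integer.MAX_VALUE},
                        (rowId, value) -> out[rowId] = value);
        System.out.println(Arrays.toString(out)); // [1, -2, 2147483647]
    }
}
```

Contrast this with the existing `ByteUpdater`/`ShortUpdater` branches, which narrow int32 data and can silently truncate.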
[jira] [Comment Edited] (SPARK-40876) Spark's Vectorized ParquetReader should support type promotions
[ https://issues.apache.org/jira/browse/SPARK-40876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622518#comment-17622518 ] Alexey Kudinkin edited comment on SPARK-40876 at 10/21/22 11:53 PM:
> This is because Parquet Int32, which is a physical type in Parquet, can represent byte/short etc. using additional logical types. See [here|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#numeric-types] for more info. Based on the logical types, Spark converts the Parquet type to different Spark types, such as {{ByteType}}, {{ShortType}}, etc.
Right, I totally forgot that Parquet doesn't have integers smaller than Int32.
was (Author: alexey.kudinkin):
> This is because Parquet Int32, which is a physical type in Parquet, can represent byte/short etc. using additional logical types. See [here|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#numeric-types] for more info. Based on the logical types, Spark converts the Parquet type to different Spark types, such as {{ByteType}}, {{ShortType}}, etc.
Right, i totally forgot that Parquet doesn't have smaller than Int32.
[jira] [Commented] (SPARK-40876) Spark's Vectorized ParquetReader should support type promotions
[ https://issues.apache.org/jira/browse/SPARK-40876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622517#comment-17622517 ] Alexey Kudinkin commented on SPARK-40876: -
Alright, troubleshooting this more, the problem seems to be in the `ParquetReadSupport.init` method:
{code:java}
val parquetClippedSchema = ParquetReadSupport.clipParquetSchema(parquetFileSchema,
  catalystRequestedSchema, caseSensitive)

// We pass two schema to ParquetRecordMaterializer:
// - parquetRequestedSchema: the schema of the file data we want to read
// - catalystRequestedSchema: the schema of the rows we want to return
// The reader is responsible for reconciling the differences between the two.
val parquetRequestedSchema = if (schemaPruningEnabled && !enableVectorizedReader) {
  // Parquet-MR reader requires that parquetRequestedSchema include only those fields present
  // in the underlying parquetFileSchema. Therefore, we intersect the parquetClippedSchema
  // with the parquetFileSchema
  ParquetReadSupport.intersectParquetGroups(parquetClippedSchema, parquetFileSchema)
    .map(groupType => new MessageType(groupType.getName, groupType.getFields))
    .getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE)
} else {
  // Spark's vectorized reader only support atomic types currently. It also skip fields
  // in parquetRequestedSchema which are not present in the file.
  parquetClippedSchema
} {code}
Even though we clearly request the types to be promoted from Int32 to Int64 in the requested schema, this code composes the requested schema passed to the reader by essentially just projecting the file schema. As a result, the requested schema that `VectorizedColumnReader` receives is actually the file's one (i.e., the one with Int32), but the columns are allocated based on the Catalyst one (which is Int64), causing misalignment.
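The misalignment described above can be illustrated outside of Spark: if a buffer holding int32 values is handed to a consumer that expects int64 slots, the raw bytes pair up into garbage values, whereas an explicit widening copy preserves them. A minimal stand-alone demonstration (plain `java.nio`, not Spark code):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class SchemaMisalignmentDemo {
    public static void main(String[] args) {
        // "File schema": two int32 values, 7 and 42.
        ByteBuffer file = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
        file.putInt(7).putInt(42).flip();

        // Wrong: reinterpret the int32 bytes as one int64 slot -- analogous to
        // allocating Int64 column vectors while the reader decodes Int32 data.
        long misread = file.asLongBuffer().get(0);

        // Right: widen each int32 explicitly while copying.
        long[] widened = { file.getInt(0), file.getInt(4) };

        System.out.println(misread);                       // 180388626439 -- neither 7 nor 42
        System.out.println(widened[0] + "," + widened[1]); // 7,42
    }
}
```

The two original values fuse into one meaningless 64-bit number (7 + 42 * 2^32), which is why the reader's requested schema and the vector allocation schema must agree on widths.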
[jira] [Updated] (SPARK-40876) Spark's Vectorized ParquetReader should support type promotions
[ https://issues.apache.org/jira/browse/SPARK-40876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexey Kudinkin updated SPARK-40876: Description: (the issue description quoted in full above)
[jira] [Created] (SPARK-40876) Spark's Vectorized ParquetReader should support type promotions
Alexey Kudinkin created SPARK-40876: --- Summary: Spark's Vectorized ParquetReader should support type promotions Key: SPARK-40876 URL: https://issues.apache.org/jira/browse/SPARK-40876 Project: Spark Issue Type: Improvement Components: Input/Output Affects Versions: 3.3.0 Reporter: Alexey Kudinkin
[jira] [Created] (SPARK-38512) ResolveFunctions implemented incorrectly requiring multiple passes to Resolve Nested Expressions
Alexey Kudinkin created SPARK-38512: --- Summary: ResolveFunctions implemented incorrectly requiring multiple passes to Resolve Nested Expressions Key: SPARK-38512 URL: https://issues.apache.org/jira/browse/SPARK-38512 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.2.1, 3.2.0 Reporter: Alexey Kudinkin
The ResolveFunctions rule is implemented incorrectly, requiring multiple passes to resolve nested expressions: while the Plan object is traversed correctly in post-order (bottom-up, via `plan.resolveOperatorsUpWithPruning`), internally each plan node's expressions are traversed incorrectly in pre-order (top-down, using `transformExpressionsWithPruning`):
{code:java}
case q: LogicalPlan =>
  q.transformExpressionsWithPruning(...) {
    ...
  } {code}
Traversing in pre-order means that an attempt is made to resolve the current node before its children are resolved, which is incorrect, since the node itself cannot be resolved before its children are. While this does not lead to failures yet, it is taxing on performance: most expressions in Spark should be resolvable in a *single pass* (if resolved bottom-up; see the reproducible sample at the bottom). Instead, it currently takes Spark at least *N* iterations to resolve such expressions, where N is proportional to the depth of the expression tree.
Example to reproduce:
{code:java}
def resolveExpr(spark: SparkSession, exprStr: String, tableSchema: StructType): Expression = {
  val expr = spark.sessionState.sqlParser.parseExpression(exprStr)
  val analyzer = spark.sessionState.analyzer
  val schemaFields = tableSchema.fields
  val resolvedExpr = {
    val plan: LogicalPlan = Filter(expr, LocalRelation(schemaFields.head, schemaFields.drop(1): _*))
    val rules: Seq[Rule[LogicalPlan]] = {
      analyzer.ResolveFunctions ::
        analyzer.ResolveReferences ::
        Nil
    }
    rules.foldRight(plan)((rule, plan) => rule.apply(plan))
      .asInstanceOf[Filter]
      .condition
  }
  resolvedExpr
}

// Invoke with
resolveExpr(spark, "date_format(to_timestamp(B, 'yyyy-MM-dd'), 'MM/dd/yyyy')",
  StructType(Seq(StructField("B", StringType)))) {code}
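The single-pass vs. N-pass behavior described in this issue can be demonstrated with a toy expression tree. This is not Spark's Analyzer — the `Node` class and pass-counting harness are illustrative stand-ins — but it captures the invariant exactly: a node can only be marked resolved once all its children are, so a top-down (pre-order) sweep needs one full pass per level of nesting, while a bottom-up (post-order) sweep finishes in one:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ResolutionOrderDemo {
    static class Node {
        final List<Node> children = new ArrayList<>();
        boolean resolved = false;
        Node(Node... kids) { for (Node k : kids) children.add(k); }
        boolean childrenResolved() { return children.stream().allMatch(c -> c.resolved); }
    }

    // Pre-order: try to resolve a node before visiting its children
    // (analogous to transformExpressionsWithPruning's top-down traversal).
    static void topDownPass(Node n) {
        if (!n.resolved && n.childrenResolved()) n.resolved = true;
        n.children.forEach(ResolutionOrderDemo::topDownPass);
    }

    // Post-order: resolve children first, then the node itself.
    static void bottomUpPass(Node n) {
        n.children.forEach(ResolutionOrderDemo::bottomUpPass);
        if (!n.resolved && n.childrenResolved()) n.resolved = true;
    }

    static int passesUntilResolved(Node root, Consumer<Node> pass) {
        int passes = 0;
        while (!root.resolved) { pass.accept(root); passes++; }
        return passes;
    }

    // Depth-3 nesting, like date_format(to_timestamp(B, ...), ...).
    static Node nested() { return new Node(new Node(new Node())); }

    public static void main(String[] args) {
        System.out.println(passesUntilResolved(nested(), ResolutionOrderDemo::topDownPass));  // 3
        System.out.println(passesUntilResolved(nested(), ResolutionOrderDemo::bottomUpPass)); // 1
    }
}
```

Each top-down pass only resolves the deepest still-unresolved level, which is exactly the "N iterations proportional to expression-tree depth" cost the issue describes.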