[ https://issues.apache.org/jira/browse/SPARK-35096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-35096: ------------------------------------ Assignee: Apache Spark > foreachBatch throws ArrayIndexOutOfBoundsException if schema is case > Insensitive > -------------------------------------------------------------------------------- > > Key: SPARK-35096 > URL: https://issues.apache.org/jira/browse/SPARK-35096 > Project: Spark > Issue Type: Bug > Components: Spark Core > Affects Versions: 3.0.0 > Reporter: Sandeep Katta > Assignee: Apache Spark > Priority: Major > > The code below works fine before Spark 3, but running it on Spark 3 throws > java.lang.ArrayIndexOutOfBoundsException > {code:java} > val inputPath = "/Users/xyz/data/testcaseInsensitivity" > val output_path = "/Users/xyz/output" > spark.range(10).write.format("parquet").save(inputPath) > def process_row(microBatch: DataFrame, batchId: Long): Unit = { > val df = microBatch.select($"ID".alias("other")) // Doesn't work > df.write.format("parquet").mode("append").save(output_path) > } > val schema = new StructType().add("id", LongType) > val stream_df = > spark.readStream.schema(schema).format("parquet").load(inputPath) > stream_df.writeStream.trigger(Trigger.Once).foreachBatch(process_row _) > .start().awaitTermination() > {code} > Stack Trace: > {code:java} > Caused by: java.lang.ArrayIndexOutOfBoundsException: 0 > at org.apache.spark.sql.types.StructType.apply(StructType.scala:414) > at > org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$4.$anonfun$applyOrElse$3(objects.scala:216) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) > at scala.collection.immutable.List.foreach(List.scala:392) > at scala.collection.TraversableLike.map(TraversableLike.scala:238) > at scala.collection.TraversableLike.map$(TraversableLike.scala:231) > at scala.collection.immutable.List.map(List.scala:298) > at > org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$4.applyOrElse(objects.scala:215) > at > 
org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$4.applyOrElse(objects.scala:203) > at > org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309) > at > org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309) > at > org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29) > at > org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149) > at > org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147) > at > org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29) > at > org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29) > at > org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314) > at > org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399) > at > org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237) > at > org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397) > at > org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314) > at > org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29) > at > org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149) > at > org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147) > at > org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29) > at > 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:298) > at > org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$.apply(objects.scala:203) > at > org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$.apply(objects.scala:121) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:149) > at > scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60) > at > scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68) > at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:146) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:138) > at scala.collection.immutable.List.foreach(List.scala:392) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:138) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:116) > at > org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:116) > at > org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:82) > at > org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111) > at > org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133) > at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763) > at > org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133) > at > org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:82) > at > 
org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:79) > at > org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$4(QueryExecution.scala:197) > at > org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:381) > at > org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$writePlans(QueryExecution.scala:197) > at > org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:207) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:95) > at > org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87) > at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) > at > org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:944) > at > org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:396) > at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:380) > at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:269) > at process_row(<console>:32) > at $anonfun$res4$1(<console>:30) > at $anonfun$res4$1$adapted(<console>:30) > at > org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:36) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:573) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100) > at > org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160) > at > org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87) > at 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:571) > at > org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352) > at > org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:571) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223) > at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) > at > org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352) > at > org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191) > at > org.apache.spark.sql.execution.streaming.OneTimeExecutor.execute(TriggerExecutor.scala:39) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185) > at > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334) > ... 
1 more > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org