[ 
https://issues.apache.org/jira/browse/SPARK-35096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-35096:
------------------------------------

    Assignee: Apache Spark

> foreachBatch throws ArrayIndexOutOfBoundsException if schema is case 
> Insensitive
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-35096
>                 URL: https://issues.apache.org/jira/browse/SPARK-35096
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Sandeep Katta
>            Assignee: Apache Spark
>            Priority: Major
>
> Below code works fine before spark3, running on spark3 throws 
> java.lang.ArrayIndexOutOfBoundsException
> {code:java}
> val inputPath = "/Users/xyz/data/testcaseInsensitivity"
> val output_path = "/Users/xyz/output"
> spark.range(10).write.format("parquet").save(inputPath)
> def process_row(microBatch: DataFrame, batchId: Long): Unit = {
>   val df = microBatch.select($"ID".alias("other")) // Doesn't work
>   df.write.format("parquet").mode("append").save(output_path)
> }
> val schema = new StructType().add("id", LongType)
> val stream_df = 
> spark.readStream.schema(schema).format("parquet").load(inputPath)
> stream_df.writeStream.trigger(Trigger.Once).foreachBatch(process_row _)
>   .start().awaitTermination()
> {code}
> Stack Trace:
> {code:java}
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
>   at org.apache.spark.sql.types.StructType.apply(StructType.scala:414)
>   at 
> org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$4.$anonfun$applyOrElse$3(objects.scala:216)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>   at scala.collection.immutable.List.map(List.scala:298)
>   at 
> org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$4.applyOrElse(objects.scala:215)
>   at 
> org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$4.applyOrElse(objects.scala:203)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:298)
>   at 
> org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$.apply(objects.scala:203)
>   at 
> org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$.apply(objects.scala:121)
>   at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:149)
>   at 
> scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
>   at 
> scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
>   at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
>   at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:146)
>   at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:138)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:138)
>   at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:116)
>   at 
> org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
>   at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:116)
>   at 
> org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:82)
>   at 
> org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
>   at 
> org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
>   at 
> org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
>   at 
> org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:82)
>   at 
> org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:79)
>   at 
> org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$4(QueryExecution.scala:197)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:381)
>   at 
> org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$writePlans(QueryExecution.scala:197)
>   at 
> org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:207)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:95)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>   at 
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:944)
>   at 
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:396)
>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:380)
>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:269)
>   at process_row(<console>:32)
>   at $anonfun$res4$1(<console>:30)
>   at $anonfun$res4$1$adapted(<console>:30)
>   at 
> org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:36)
>   at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:573)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>   at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:571)
>   at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
>   at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
>   at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>   at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:571)
>   at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
>   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
>   at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
>   at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>   at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
>   at 
> org.apache.spark.sql.execution.streaming.OneTimeExecutor.execute(TriggerExecutor.scala:39)
>   at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
>   at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
>   ... 1 more
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to