Srinivas Rishindra Pothireddi created SPARK-42068:
-----------------------------------------------------

             Summary: Parallelization in Scala is not working with Java 11 and spark3
                 Key: SPARK-42068
                 URL: https://issues.apache.org/jira/browse/SPARK-42068
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.2.3, 3.3.1, 3.4.0
         Environment:
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.1
      /_/

Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.17)
            Reporter: Srinivas Rishindra Pothireddi


The following code snippet fails with Java 11 and Spark 3, but works with Java 8. It also works with Spark 2 and Java 11.

{code:java}
import scala.collection.mutable
import scala.collection.parallel.{ExecutionContextTaskSupport, ForkJoinTaskSupport}

case class Person(name: String, age: Int)

val pc = List(1, 2, 3).par
val forkJoinPool = new java.util.concurrent.ForkJoinPool(2)
pc.tasksupport = new ForkJoinTaskSupport(forkJoinPool)

pc.map { x =>
  val personList: Array[Person] = (1 to 999).map(value => Person("p" + value, value)).toArray
  // creating an RDD of Person
  val rddPerson = spark.sparkContext.parallelize(personList, 5)
  val evenAgePerson = rddPerson.filter(_.age % 2 == 0)
  import spark.implicits._
  val evenAgePersonDF = evenAgePerson.toDF("Name", "Age")
}
{code}

The error is as follows.

{code:java}
scala.ScalaReflectionException: object $read not found.
  at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:185)
  at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:29)
  at $typecreator6$1.apply(<console>:37)
  at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:237)
  at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:237)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:52)
  at org.apache.spark.sql.Encoders$.product(Encoders.scala:300)
  at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder(SQLImplicits.scala:261)
  at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder$(SQLImplicits.scala:261)
  at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:32)
  at $anonfun$res0$1(<console>:37)
  at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
  at scala.collection.parallel.AugmentedIterableIterator.map2combiner(RemainsIterator.scala:116)
  at scala.collection.parallel.AugmentedIterableIterator.map2combiner$(RemainsIterator.scala:113)
  at scala.collection.parallel.immutable.ParVector$ParVectorIterator.map2combiner(ParVector.scala:66)
  at scala.collection.parallel.ParIterableLike$Map.leaf(ParIterableLike.scala:1064)
  at scala.collection.parallel.Task.$anonfun$tryLeaf$1(Tasks.scala:53)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at scala.util.control.Breaks$$anon$1.catchBreak(Breaks.scala:67)
  at scala.collection.parallel.Task.tryLeaf(Tasks.scala:56)
  at scala.collection.parallel.Task.tryLeaf$(Tasks.scala:50)
  at scala.collection.parallel.ParIterableLike$Map.tryLeaf(ParIterableLike.scala:1061)
  at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.internal(Tasks.scala:160)
  at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.internal$(Tasks.scala:157)
  at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:440)
  at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute(Tasks.scala:150)
  at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute$(Tasks.scala:149)
  at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:440)
  at java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
  at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
  at java.base/java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:396)
  at java.base/java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:721)
  at scala.collection.parallel.ForkJoinTasks$WrappedTask.sync(Tasks.scala:379)
  at scala.collection.parallel.ForkJoinTasks$WrappedTask.sync$(Tasks.scala:379)
  at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:440)
  at scala.collection.parallel.ForkJoinTasks.executeAndWaitResult(Tasks.scala:423)
  at scala.collection.parallel.ForkJoinTasks.executeAndWaitResult$(Tasks.scala:416)
  at scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:60)
  at scala.collection.parallel.ParIterableLike$ResultMapping.leaf(ParIterableLike.scala:968)
  at scala.collection.parallel.Task.$anonfun$tryLeaf$1(Tasks.scala:53)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at scala.util.control.Breaks$$anon$1.catchBreak(Breaks.scala:67)
  at scala.collection.parallel.Task.tryLeaf(Tasks.scala:56)
  at scala.collection.parallel.Task.tryLeaf$(Tasks.scala:50)
  at scala.collection.parallel.ParIterableLike$ResultMapping.tryLeaf(ParIterableLike.scala:963)
  at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute(Tasks.scala:153)
  at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute$(Tasks.scala:149)
  at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:440)
  at java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
  at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
  at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
  at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
  at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
  at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
  Suppressed: scala.ScalaReflectionException: object $read not found.
    at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:185)
    at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:29)
    at $typecreator6$1.apply(<console>:37)
    at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:237)
    at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:237)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:52)
    at org.apache.spark.sql.Encoders$.product(Encoders.scala:300)
    at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder(SQLImplicits.scala:261)
    at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder$(SQLImplicits.scala:261)
    at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:32)
    at $anonfun$res0$1(<console>:37)
    at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
    at scala.collection.parallel.AugmentedIterableIterator.map2combiner(RemainsIterator.scala:116)
    at scala.collection.parallel.AugmentedIterableIterator.map2combiner$(RemainsIterator.scala:113)
    at scala.collection.parallel.immutable.ParVector$ParVectorIterator.map2combiner(ParVector.scala:66)
    at scala.collection.parallel.ParIterableLike$Map.leaf(ParIterableLike.scala:1064)
    at scala.collection.parallel.Task.$anonfun$tryLeaf$1(Tasks.scala:53)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.control.Breaks$$anon$1.catchBreak(Breaks.scala:67)
    at scala.collection.parallel.Task.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.Task.tryLeaf$(Tasks.scala:50)
    at scala.collection.parallel.ParIterableLike$Map.tryLeaf(ParIterableLike.scala:1061)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.internal(Tasks.scala:160)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.internal$(Tasks.scala:157)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:440)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute(Tasks.scala:150)
    ... 8 more
  Suppressed: scala.ScalaReflectionException: object $read not found.
    at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:185)
    at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:29)
    at $typecreator6$1.apply(<console>:37)
    at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:237)
    at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:237)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:52)
    at org.apache.spark.sql.Encoders$.product(Encoders.scala:300)
    at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder(SQLImplicits.scala:261)
    at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder$(SQLImplicits.scala:261)
    at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:32)
    at $anonfun$res0$1(<console>:37)
    at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
    at scala.collection.parallel.AugmentedIterableIterator.map2combiner(RemainsIterator.scala:116)
    at scala.collection.parallel.AugmentedIterableIterator.map2combiner$(RemainsIterator.scala:113)
    at scala.collection.parallel.immutable.ParVector$ParVectorIterator.map2combiner(ParVector.scala:66)
    at scala.collection.parallel.ParIterableLike$Map.leaf(ParIterableLike.scala:1064)
    at scala.collection.parallel.Task.$anonfun$tryLeaf$1(Tasks.scala:53)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.control.Breaks$$anon$1.catchBreak(Breaks.scala:67)
    at scala.collection.parallel.Task.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.Task.tryLeaf$(Tasks.scala:50)
    at scala.collection.parallel.ParIterableLike$Map.tryLeaf(ParIterableLike.scala:1061)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.internal(Tasks.scala:170)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.internal$(Tasks.scala:157)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:440)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute(Tasks.scala:150)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute$(Tasks.scala:149)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:440)
    at java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
{code}

I noticed the following snippet in the stack trace.

{code:java}
at scala.collection.parallel.Task.$anonfun$tryLeaf$1(Tasks.scala:53)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.control.Breaks$$anon$1.catchBreak(Breaks.scala:67)
{code}

It appears that Java 8 runtime classes are being invoked even though the environment is Java 11. I am not sure whether this is an issue with Scala 2.12.15 or with Spark itself.
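For what it's worth, one workaround often suggested for REPL classloader problems with custom ForkJoinPools is to build the pool with a thread factory that propagates the caller's context classloader, so that code running on worker threads can resolve REPL-defined classes such as $read. The sketch below illustrates that idea; the names ClassLoaderPropagation and poolWithCallerClassLoader are illustrative, and this rests on the assumption that the worker threads' context classloader is the root cause rather than being a confirmed fix for this ticket.

```scala
import java.util.concurrent.{Callable, ForkJoinPool, ForkJoinWorkerThread}

// Illustrative sketch: build a ForkJoinPool whose worker threads inherit the
// caller's context classloader. Worker threads created by the default factory
// may not carry the REPL's classloader, which can make REPL-defined classes
// invisible to tasks running on the pool.
object ClassLoaderPropagation {
  def poolWithCallerClassLoader(parallelism: Int): ForkJoinPool = {
    // Capture the classloader of the thread creating the pool (in spark-shell,
    // the REPL thread).
    val callerLoader = Thread.currentThread().getContextClassLoader
    val factory = new ForkJoinPool.ForkJoinWorkerThreadFactory {
      override def newThread(pool: ForkJoinPool): ForkJoinWorkerThread =
        new ForkJoinWorkerThread(pool) {
          // Make REPL/application classes resolvable from this worker thread.
          setContextClassLoader(callerLoader)
        }
    }
    new ForkJoinPool(parallelism, factory, null, false)
  }
}
```

In the reproduction above, `new java.util.concurrent.ForkJoinPool(2)` would then be replaced by `ClassLoaderPropagation.poolWithCallerClassLoader(2)` before wiring the pool into `ForkJoinTaskSupport`.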
--
This message was sent by Atlassian Jira
(v8.20.10#820010)