[jira] [Comment Edited] (SPARK-42068) Implicit conversion is not working with parallelization in scala with java 11 and spark3

2023-01-31 Thread Attila Zsolt Piros (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-42068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17682768#comment-17682768
 ] 

Attila Zsolt Piros edited comment on SPARK-42068 at 1/31/23 9:42 PM:
-

Resolving this issue, as a workaround exists: use a custom 
ForkJoinWorkerThreadFactory which does not change the context class loader. 
Moreover, there is no other way to replace the thread factory for a directly 
constructed ForkJoinPool, because the default thread factory is a final field, 
see 
https://github.com/openjdk/jdk/blame/jdk-11%2B28/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java#L1184-L1185
 
which is always set as
https://github.com/openjdk/jdk/blame/jdk-11%2B28/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java#L3195-L3196
and passed to a newly constructed ForkJoinPool when no other thread factory is 
passed:
https://github.com/openjdk/jdk/blame/jdk-11%2B28/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java#L2165-L2166
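
The point about the default factory can be checked from a Scala REPL or 
spark-shell. The following is only a minimal sketch: InheritingWorkerThread, 
inheritingFactory, usesDefault and usesCustom are hypothetical names chosen 
for illustration, and the snippet is meant to be pasted into a REPL rather 
than compiled as a standalone file. It compares what getFactory reports for a 
pool built without a factory argument and for a pool built with an explicit 
one.

```scala
import java.util.concurrent.{ForkJoinPool, ForkJoinWorkerThread}
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory

// Hypothetical names for illustration; they mirror the shape of the workaround.
class InheritingWorkerThread(pool: ForkJoinPool) extends ForkJoinWorkerThread(pool)

object inheritingFactory extends ForkJoinWorkerThreadFactory {
  override def newThread(pool: ForkJoinPool): ForkJoinWorkerThread =
    new InheritingWorkerThread(pool)
}

// No factory argument: the pool falls back to the JDK default factory.
val defaultPool = new ForkJoinPool(2)

// Explicit factory: parallelism, thread factory, uncaught exception handler, asyncMode.
val customPool = new ForkJoinPool(2, inheritingFactory, null, false)

// getFactory returns the factory the pool was constructed with.
val usesDefault = defaultPool.getFactory eq ForkJoinPool.defaultForkJoinWorkerThreadFactory
val usesCustom = customPool.getFactory eq inheritingFactory

defaultPool.shutdown()
customPool.shutdown()
```

Since the default factory is a static final field, the only way to get 
different worker threads for a directly constructed pool is to pass a factory 
to the constructor, as customPool does here.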


> Implicit conversion is not working with parallelization in scala with java 11 
> and spark3
> 
>
> Key: SPARK-42068
> URL: https://issues.apache.org/jira/browse/SPARK-42068
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.3.1, 3.2.3, 3.4.0
> Environment: spark version 3.3.1 Using Scala version 2.12.15 (OpenJDK 
> 64-Bit Server VM, Java 11.0.17)
> Reporter: Srinivas Rishindra Pothireddi
> Assignee: Attila Zsolt Piros
> Priority: Major
>
> The following code snippet fails with java 11 with spark3, but works with 
> java 8. It also works with spark2 and java 11. 
> {code:java}
> import scala.collection.mutable
> import scala.collection.parallel.{ExecutionContextTaskSupport, ForkJoinTaskSupport}
> case class Person(name: String, age: Int)
> val pc = List(1, 2, 3).par
> val forkJoinPool = new java.util.concurrent.ForkJoinPool(2)
> pc.tasksupport = new ForkJoinTaskSupport(forkJoinPool)
> pc.map { x =>
>     val personList: Array[Person] = (1 to 999).map(value => Person("p" + value, value)).toArray
>     //creating RDD of Person
>     val rddPerson = spark.sparkContext.parallelize(personList, 5)
>     val evenAgePerson = rddPerson.filter(_.age % 2 == 0)
>     import spark.implicits._
>     val evenAgePersonDF = evenAgePerson.toDF("Name", "Age")
> } {code}
> The error is as follows.
> {code:java}
> scala.ScalaReflectionException: object $read not found.
>   at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:185)
>   at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:29)
>   at $typecreator6$1.apply(<console>:37)
>   at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:237)
>   at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:237)
>   at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:52)
>   at org.apache.spark.sql.Encoders$.product(Encoders.scala:300)
>   at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder(SQLImplicits.scala:261)
>   at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder$(SQLImplicits.scala:261)
>   at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:32)
>   at $anonfun$res0$1(<console>:37)
>   at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
>   at scala.collection.parallel.AugmentedIterableIterator.map2combiner(RemainsIterator.scala:116)
>   at scala.collection.parallel.AugmentedIterableIterator.map2combiner$(RemainsIterator.scala:113)
>   at scala.collection.parallel.immutable.ParVector$ParVectorIterator.map2combiner(ParVector.scala:66)
>   at scala.collection.parallel.ParIterableLike$Map.leaf(ParIterableLike.scala:1064)
>   at scala.collection.parallel.Task.$anonfun$tryLeaf$1(Tasks.scala:53)
>   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   at scala.util.control.Breaks$$anon$1.catchBreak(Breaks.scala:67)
>   at scala.collection.parallel.Task.tryLeaf(Tasks.scala:56)
>   at scala.collection.parallel.Task.tryLeaf$(Tasks.scala:50)
>   at 
> 

[jira] [Comment Edited] (SPARK-42068) Implicit conversion is not working with parallelization in scala with java 11 and spark3

2023-01-31 Thread Attila Zsolt Piros (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-42068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17682756#comment-17682756
 ] 

Attila Zsolt Piros edited comment on SPARK-42068 at 1/31/23 9:10 PM:
-

This issue is caused by a JDK bugfix 
([https://bugs.openjdk.org/browse/JDK-8172726]) which sets the context class 
loader to the system class loader for fork join pools; for details see [this 
code|https://github.com/openjdk/jdk/blame/jdk-11%2B28/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java#L721].

The JDK bugfix is only available from JDK 9 onwards, which is why this is not 
an issue on Java 8.

On Java 8 (in a Scala REPL/Spark shell) the context class loader is the same 
as the caller's class loader: 
{*}scala.tools.nsc.interpreter.IMain$TranslatingClassLoader{*}. This is needed 
because in a REPL new code is generated and compiled for each line, and that 
code can only be resolved by this TranslatingClassLoader. 
You can inspect the generated code by passing *-Xprint:typer* to the Scala 
REPL/spark-shell. Passing this argument reveals where the *$read* object 
mentioned in the exception ("scala.ScalaReflectionException: object $read not 
found") comes from.

*The workaround*

We just need to use a ForkJoinWorkerThreadFactory which does not change the 
context class loader:
{noformat}
import java.util.concurrent._
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory
...
// Worker thread built through the protected ForkJoinWorkerThread constructor,
// which does not reset the context class loader.
class CustomForkJoinWorkerThread(pool: ForkJoinPool) extends ForkJoinWorkerThread(pool)

object customForkJoinWorkerThreadFactory extends ForkJoinWorkerThreadFactory {
  override def newThread(pool: ForkJoinPool): ForkJoinWorkerThread =
    new CustomForkJoinWorkerThread(pool)
}

// Arguments: parallelism, thread factory, uncaught exception handler, asyncMode.
val forkJoinPool = new ForkJoinPool(2, customForkJoinWorkerThreadFactory, null, false)
{noformat}

For the common pool (*ForkJoinPool.commonPool()*) one needs to set the 
"java.util.concurrent.ForkJoinPool.common.threadFactory" system property to the 
fully-qualified class name of the custom ForkJoinWorkerThreadFactory, but this 
only affects the common pool, not pools created directly by calling a 
constructor of ForkJoinPool.
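
The effect of the workaround can be observed directly by asking a worker 
thread for its context class loader. The sketch below is an illustration 
only: workerContextClassLoader, callerLoader, defaultPoolLoader and 
customPoolLoader are hypothetical names, and the code is written to be pasted 
into a Scala REPL or spark-shell rather than compiled as a standalone file. 
It relies on the assumption that a thread created through the protected 
ForkJoinWorkerThread constructor inherits the context class loader of the 
thread that creates it.

```scala
import java.util.concurrent.{Callable, ForkJoinPool, ForkJoinWorkerThread}
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory

class CustomForkJoinWorkerThread(pool: ForkJoinPool) extends ForkJoinWorkerThread(pool)

object customForkJoinWorkerThreadFactory extends ForkJoinWorkerThreadFactory {
  override def newThread(pool: ForkJoinPool): ForkJoinWorkerThread =
    new CustomForkJoinWorkerThread(pool)
}

// Hypothetical helper: runs one task on the pool, returns the context class
// loader seen by the worker thread, and shuts the pool down afterwards.
def workerContextClassLoader(pool: ForkJoinPool): ClassLoader =
  try {
    pool.submit(new Callable[ClassLoader] {
      override def call(): ClassLoader = Thread.currentThread().getContextClassLoader
    }).get()
  } finally {
    pool.shutdown()
  }

// Loader of the submitting thread (the REPL's loader when run in spark-shell).
val callerLoader = Thread.currentThread().getContextClassLoader

// Pool with the JDK default factory versus pool with the custom factory.
val defaultPoolLoader = workerContextClassLoader(new ForkJoinPool(2))
val customPoolLoader =
  workerContextClassLoader(new ForkJoinPool(2, customForkJoinWorkerThreadFactory, null, false))

println(s"caller:       $callerLoader")
println(s"default pool: $defaultPoolLoader")
println(s"custom pool:  $customPoolLoader")
```

In a Java 11 spark-shell the default pool would be expected to report the 
system class loader, while the custom pool should report the same loader as 
the caller. Passing such a pool to ForkJoinTaskSupport, as the reproducer in 
the issue description does with its forkJoinPool, is then the whole 
workaround.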



