[ https://issues.apache.org/jira/browse/SPARK-29497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17748963#comment-17748963 ]
Jay Sen commented on SPARK-29497:
---------------------------------

In Java, this looks like it is by design. I was able to make the lambda serializable, which resolved the problem. For example, if you are using `UnaryOperator`, you can mark the lambda as serializable with the following syntax:

{code:java}
UnaryOperator<Row> rowTransform = (UnaryOperator<Row> & Serializable) row -> { .... }
{code}

I found this at: [https://stackoverflow.com/questions/22807912/how-to-serialize-a-lambda]

There is also a good explanation of the serialization logic here: [https://stackoverflow.com/questions/28186607/java-lang-classcastexception-using-lambda-expressions-in-spark-job-on-remote-ser]

Hope this helps.

> Cannot assign instance of java.lang.invoke.SerializedLambda to field
> --------------------------------------------------------------------
>
>                 Key: SPARK-29497
>                 URL: https://issues.apache.org/jira/browse/SPARK-29497
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.3, 3.0.1, 3.2.0
>         Environment: Spark 2.4.3 Scala 2.12
> Spark 3.2.0 Scala 2.13.5 (Java 11.0.12)
>            Reporter: Rob Russo
>            Priority: Major
>
> Note this is for Scala 2.12:
> There seems to be an issue in Spark with serializing a udf that is created
> from a function assigned to a class member that references another function
> assigned to a class member. This is similar to
> https://issues.apache.org/jira/browse/SPARK-25047 but it looks like the
> resolution has an issue with this case. After trimming it down to the base
> issue I came up with the following to reproduce:
>
> {code:java}
> object TestLambdaShell extends Serializable {
>   val hello: String => String = s => s"hello $s!"
>   val lambdaTest: String => String = hello( _ )
>   def functionTest: String => String = hello( _ )
> }
> val hello = udf( TestLambdaShell.hello )
> val functionTest = udf( TestLambdaShell.functionTest )
> val lambdaTest = udf( TestLambdaShell.lambdaTest )
> sc.parallelize(Seq("world"),1).toDF("test").select(hello($"test")).show(1)
> sc.parallelize(Seq("world"),1).toDF("test").select(functionTest($"test")).show(1)
> sc.parallelize(Seq("world"),1).toDF("test").select(lambdaTest($"test")).show(1)
> {code}
>
> All of these work except the last line, which results in an exception on the
> executors:
>
> {code:java}
> Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field $$$82b5b23cea489b2712a1db46c77e458$$$$w$TestLambdaShell$.lambdaTest of type scala.Function1 in instance of $$$82b5b23cea489b2712a1db46c77e458$$$$w$TestLambdaShell$
>     at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
>     at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>     at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>     at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
>     at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>     at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>     at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>     at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>     at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
>     at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
>     at org.apache.spark.scheduler.Task.run(Task.scala:121)
>     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
>
> In Spark 2.2.x I used a class that had something like this that worked fine.
> Now that we've upgraded to 2.12 we ran into a few serialization issues in
> places, most of which were solved by extending Serializable, but this case was
> not fixed by that.
>
> Also, this happens regardless of whether it's done in the shell or in a jar.
>
> So after much more debugging, this turns out to be some weird mix of Scala
> 2.12.0 and Scala 2.12.8. Spark is compiled on 2.12.8 and so is our own code,
> but I noticed that the Maven-compiled class did not match the class compiled
> using 2.12.8 scalac directly. After a lot of digging we realized that
> scala-compiler actually indirectly depends on scala library 2.12.0 and only
> when the Spark dependency is added does it start using it for some reason.
> Without the Spark dependency and just direct Scala 2.12.8 dependencies, the
> code builds fine and compiles correctly as 2.12.8.
>
> We were able to fix this using:
>
> {code:java}
> <failOnMultipleScalaVersions>true</failOnMultipleScalaVersions>
> <scalaCompatVersion>2.12</scalaCompatVersion>
> <scalaVersion>2.12.8</scalaVersion>
> {code}
>
> And this resolves our issue for our own jars that we create and link to
> Spark. However, my original test case still seems to reproduce in the Spark
> shell and, for us, also in Apache Zeppelin, so it seems almost like somehow they
> are also compiling it on 2.12.0, but I'm not quite sure how. The Spark
> pom.xml seems to have the fail-on-multiple-versions setting and compiles fine, so
> I'm not quite sure how this is happening, but at least it's more isolated now.
> I'm also wondering if anything else could be affected by this.
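
The intersection-cast syntax from the comment at the top of this message can be checked outside Spark. What follows is a minimal sketch, assuming plain JDK serialization within a single JVM; the class and method names (`SerializableLambdaSketch`, `plainGreeting`, `serializableGreeting`, `canSerialize`, `roundTrip`) are hypothetical, and `String` stands in for Spark's `Row`. It illustrates only the Java lambda case and does not reproduce the Scala version mismatch described in the quoted issue.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.UnaryOperator;

// Hypothetical demo class; none of these names come from Spark or the issue.
final class SerializableLambdaSketch {

    // A plain lambda: UnaryOperator does not extend Serializable,
    // so the generated lambda object is not serializable.
    static UnaryOperator<String> plainGreeting() {
        return s -> "hello " + s + "!";
    }

    // The intersection cast asks the compiler for a lambda that
    // implements both UnaryOperator and Serializable.
    static UnaryOperator<String> serializableGreeting() {
        return (UnaryOperator<String> & Serializable) s -> "hello " + s + "!";
    }

    // Returns false when Java serialization rejects the object.
    static boolean canSerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Serializes the value to bytes and reads it back in the same JVM.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(plainGreeting()));        // false
        System.out.println(canSerialize(serializableGreeting())); // true
        System.out.println(roundTrip(serializableGreeting()).apply("world")); // hello world!
    }
}
```

A same-JVM round trip like this only shows that the cast makes the lambda serializable. On a cluster, the class that defines the lambda must also be on the executor's classpath when the lambda is deserialized.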