The difference is that Java and Scala generate different code here. In
Java, your anonymous subclass of Ops, defined in (a method of)
AbstractTest, captures a reference to the enclosing AbstractTest
instance. That much is 'correct', in that it's how Java is supposed to
work, and AbstractTest is indeed not serializable since you didn't
declare it so.
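
You can see the hidden reference directly. Here's a minimal standalone
sketch (CaptureDemo is just an illustrative name) that reflects over
such an anonymous class and prints the synthetic field javac adds:

import java.io.Serializable;
import java.lang.reflect.Field;

public class CaptureDemo {

  abstract static class Ops implements Serializable {
    public abstract Integer apply(Integer x);
  }

  public void run() {
    // Anonymous subclass created inside an instance method: the compiler
    // gives it a synthetic field holding the enclosing CaptureDemo.
    Ops ops = new Ops() {
      @Override
      public Integer apply(Integer x) { return x + 1; }
    };
    for (Field f : ops.getClass().getDeclaredFields()) {
      // Prints something like: this$0 : CaptureDemo
      System.out.println(f.getName() + " : " + f.getType().getName());
    }
  }

  public static void main(String[] args) {
    new CaptureDemo().run();
  }
}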

However, the reference isn't actually used, and Spark tries to remove
such references for you where possible. It can't always do so IIRC
(e.g. nulling some fields would mutate objects in unpredictable ways),
and I think that's what happens here.

In the first place, you want to avoid creating this hidden reference,
for instance by making the implementation a static nested class. There
are probably a lot of ways to rewrite this.
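
Reusing your Ops class unchanged, something along these lines (an
untested sketch; AddOne is just an illustrative name) should avoid the
capture, since a static nested class holds no pointer to an enclosing
instance:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class AbstractTest {

  // Static nested class: no hidden this$0 field, so serializing an
  // AddOne instance doesn't drag the enclosing AbstractTest along.
  static class AddOne extends Ops {
    @Override
    public Integer apply(Integer x) {
      return x + 1;
    }
  }

  public void job() {
    SparkConf conf = new SparkConf()
      .setAppName(AbstractTest.class.getName())
      .setMaster("local[*]");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));

    Ops ops = new AddOne();  // captures nothing from AbstractTest
    ops.doSomething(rdd);
  }

  public static void main(String[] args) {
    new AbstractTest().job();
  }
}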

Scala just works differently in the code that's generated: its anonymous
classes only retain a reference to the enclosing instance when they
actually use it, which is why your Scala version serializes fine.

On Mon, Aug 10, 2015 at 4:32 PM, Hao Ren <inv...@gmail.com> wrote:
> Consider two code snippets as the following:
>
> // Java code:
>
> import java.io.Serializable;
> import java.util.Arrays;
>
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
>
> abstract class Ops implements Serializable {
>
>   public abstract Integer apply(Integer x);
>
>   public void doSomething(JavaRDD<Integer> rdd) {
>     rdd.map(x -> x + apply(x))
>        .collect()
>        .forEach(System.out::println);
>   }
> }
>
> public class AbstractTest {
>
>   public static void main(String[] args) {
>     new AbstractTest().job();
>   }
>
>   public void job() {
>     SparkConf conf = new SparkConf()
>       .setAppName(AbstractTest.class.getName())
>       .setMaster("local[*]");
>     JavaSparkContext jsc = new JavaSparkContext(conf);
>
>     JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));
>
>     Ops ops = new Ops() {
>       @Override
>       public Integer apply(Integer x) {
>         return x + 1;
>       }
>     };
>
>     ops.doSomething(rdd);
>
>   }
> }
>
>
> // Scala code:
>
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.rdd.RDD
>
> abstract class Ops extends Serializable {
>
>   def apply(x: Int): Int
>
>   def doSomething(rdd: RDD[Int]): Unit = {
>     rdd.map(x => apply(x)).collect foreach println
>   }
> }
>
> class AbstractTest {
>   def job(): Unit = {
>     val conf = new SparkConf()
>       .setAppName(this.getClass.getName)
>       .setMaster("local[*]")
>     val sc = new SparkContext(conf)
>
>     val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7))
>
>     val ops = new Ops() {
>       override def apply(x: Int): Int = x + 1
>     }
>
>     ops.doSomething(rdd)
>   }
> }
>
> object AbstractTest {
>   def main(args: Array[String]): Unit = {
>     new AbstractTest().job()
>   }
> }
>
> They are actually very similar and do the same thing, yet the Scala one
> works fine and the Java one does not: a Task not serializable exception is
> thrown when the Java code is executed. Here is the stack trace:
>
> Exception in thread "main" org.apache.spark.SparkException: Task not serializable
> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
> at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
> at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:294)
> at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:293)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
> at org.apache.spark.rdd.RDD.map(RDD.scala:293)
> at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:90)
> at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:47)
> at fr.leboncoin.etl.jobs.test.Ops.doSomething(AbstractTest.java:24)
> at fr.leboncoin.etl.jobs.test.AbstractTest.job(AbstractTest.java:52)
> at fr.leboncoin.etl.jobs.test.AbstractTest.main(AbstractTest.java:33)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> Caused by: java.io.NotSerializableException: fr.leboncoin.etl.jobs.test.AbstractTest
>
> Serialization stack:
> - object not serializable (class: test.AbstractTest, value: test.AbstractTest@61d84e08)
> - field (class: test.AbstractTest$1, name: this$0, type: class test.AbstractTest)
> - object (class test.AbstractTest$1, test.AbstractTest$1@476e8796)
> - element of array (index: 0)
> - array (class [Ljava.lang.Object;, size 1)
> - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
> - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class fr.leboncoin.etl.jobs.test.Ops, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeSpecial fr/leboncoin/etl/jobs/test/Ops.lambda$doSomething$6d6228b6$1:(Ljava/lang/Integer;)Ljava/lang/Integer;, instantiatedMethodType=(Ljava/lang/Integer;)Ljava/lang/Integer;, numCaptured=1])
> - writeReplace data (class: java.lang.invoke.SerializedLambda)
> - object (class fr.leboncoin.etl.jobs.test.Ops$$Lambda$8/476868388, fr.leboncoin.etl.jobs.test.Ops$$Lambda$8/476868388@65753040)
> - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
> - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
> at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
> at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
> at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
> ... 19 more
>
> It seems that, in order to serialize the anonymous class
> `test.AbstractTest$1` (ops), it serializes `test.AbstractTest` first, which
> should not need to be serialized.
>
> The difference is in the type of RDD: the Java code uses JavaRDD. I am
> wondering whether the ClosureCleaner does not work well with JavaRDD.
> According to the Spark source, JavaRDD apparently delegates to the Scala API:
>
> def map[R](f: JFunction[T, R]): JavaRDD[R] =
>   new JavaRDD(rdd.map(f)(fakeClassTag))(fakeClassTag)
>
> You can reproduce this issue easily; any help is appreciated.
>
> --
> Hao Ren
>
> Data Engineer @ leboncoin
>
> Paris, France

