Consider the following two code snippets:

// Java code:

import java.io.Serializable;
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

abstract class Ops implements Serializable {

  public abstract Integer apply(Integer x);

  public void doSomething(JavaRDD<Integer> rdd) {
    rdd.map(x -> x + apply(x))
       .collect()
       .forEach(System.out::println);
  }
}

public class AbstractTest {

  public static void main(String[] args) {
    new AbstractTest().job();
  }

  public void job() {
    SparkConf conf = new SparkConf()
      .setAppName(AbstractTest.class.getName())
      .setMaster("local[*]");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));

    Ops ops = new Ops() {
      @Override
      public Integer apply(Integer x) {
        return x + 1;
      }
    };

    ops.doSomething(rdd);

  }
}


// Scala code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

abstract class Ops extends Serializable {

  def apply(x: Int): Int

  def doSomething(rdd: RDD[Int]): Unit = {
    rdd.map(x => apply(x)).collect foreach println
  }
}

class AbstractTest {
  def job(): Unit = {
    val conf = new SparkConf()
      .setAppName(this.getClass.getName)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7))

    val ops = new Ops() {
      override def apply(x: Int): Int = x + 1
    }

    ops.doSomething(rdd)
  }
}

object AbstractTest {
  def main(args: Array[String]): Unit = {
    new AbstractTest().job()
  }
}

The two snippets are very similar and do the same thing, yet the Scala one
works fine while the Java one does not: a Task not serializable exception is
thrown when the Java code is executed. Here is the stack trace:

Exception in thread "main" org.apache.spark.SparkException: Task not
serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:294)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:293)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.map(RDD.scala:293)
at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:90)
at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:47)
at fr.leboncoin.etl.jobs.test.Ops.doSomething(AbstractTest.java:24)
at fr.leboncoin.etl.jobs.test.AbstractTest.job(AbstractTest.java:52)
at fr.leboncoin.etl.jobs.test.AbstractTest.main(AbstractTest.java:33)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.io.NotSerializableException:
fr.leboncoin.etl.jobs.test.AbstractTest

Serialization stack:
- object not serializable (class: test.AbstractTest, value: test.AbstractTest@61d84e08)
- field (class: test.AbstractTest$1, name: this$0, type: class test.AbstractTest)
- object (class test.AbstractTest$1, test.AbstractTest$1@476e8796)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs,
type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda,
SerializedLambda[capturingClass=class fr.leboncoin.etl.jobs.test.Ops,
functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;,
implementation=invokeSpecial
fr/leboncoin/etl/jobs/test/Ops.lambda$doSomething$6d6228b6$1:(Ljava/lang/Integer;)Ljava/lang/Integer;,
instantiatedMethodType=(Ljava/lang/Integer;)Ljava/lang/Integer;,
numCaptured=1])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class fr.leboncoin.etl.jobs.test.Ops$$Lambda$8/476868388,
fr.leboncoin.etl.jobs.test.Ops$$Lambda$8/476868388@65753040)
- field (class:
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name:
fun$1, type: interface org.apache.spark.api.java.function.Function)
- object (class
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1,
<function1>)
at
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
... 19 more

It seems that, in order to serialize the anonymous class
`test.AbstractTest$1` (ops), Spark first has to serialize the enclosing
`test.AbstractTest` instance, which should not need to be serialized at all.
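
If the implicit reference is indeed the problem, a static nested (or
top-level) subclass of Ops should avoid it, since such a class carries no
hidden this$0 field pointing at AbstractTest. A minimal sketch of what I mean
(the class name PlusOne is mine, purely for illustration):

// Hypothetical workaround: a static nested subclass of Ops holds no implicit
// reference to the enclosing AbstractTest instance, so only the Ops part of
// the closure has to be serializable.
static class PlusOne extends Ops {
  @Override
  public Integer apply(Integer x) {
    return x + 1;
  }
}

// In job(), instead of the anonymous subclass:
// Ops ops = new PlusOne();
// ops.doSomething(rdd);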

The only difference I see is the type of RDD: the Java code uses JavaRDD. I
am wondering whether the ClosureCleaner does not work well with JavaRDD,
although according to the Spark source, JavaRDD.map apparently just delegates
to the Scala API:

def map[R](f: JFunction[T, R]): JavaRDD[R] =
  new JavaRDD(rdd.map(f)(fakeClassTag))(fakeClassTag)
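
To check whether JavaRDD is really involved at all, I suppose one can try to
serialize the ops instance directly with plain Java serialization inside
job(), with no Spark in the picture. If the captured enclosing instance is
the culprit, this should already fail with the same NotSerializableException
(a rough diagnostic sketch, not part of the original job; it needs
java.io.ByteArrayOutputStream, java.io.IOException and
java.io.ObjectOutputStream imported):

// Hypothetical diagnostic: serialize the anonymous Ops instance with plain
// java.io serialization. If this throws
// java.io.NotSerializableException: AbstractTest, the problem is the
// implicit this$0 reference, not JavaRDD or the ClosureCleaner.
try (ObjectOutputStream out =
         new ObjectOutputStream(new ByteArrayOutputStream())) {
  out.writeObject(ops);
} catch (IOException e) {
  e.printStackTrace();
}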

The issue is easy to reproduce with the code above. Any help is appreciated.

-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France
