I am writing a Spark application that uses Java and Spring Boot to process rows. For every row it performs some logic and saves data into the database.
The logic is performed using some services defined in my application and some external libraries. While running the job I am getting a `NotSerializableException`. My Spark job looks like this:

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.springframework.beans.factory.annotation.Autowired;

// It is a temporary job, which would be removed after testing
public class HelloWorld implements Runnable, Serializable {

    @Autowired
    GraphRequestProcessor graphRequestProcessor;

    @Override
    public void run() {
        String sparkAppName = "hello-job";
        JavaSparkContext sparkCtx = new JavaSparkContext(getSparkConf(sparkAppName));
        JavaRDD<Integer> rdd = sparkCtx.parallelize(Arrays.asList(1, 2, 3, 4));
        rdd.map(new Function<Integer, Object>() {
            @Override
            public Object call(Integer integer) throws Exception {
                System.out.println(integer);
                return integer;
            }
        });
        System.out.println("Done");
    }

    public static SparkConf getSparkConf(String appName) {
        Map<String, String> env = System.getenv();
        String master = env.get("SPARK_MASTER");
        SparkConf sparkConfig = new SparkConf()
                .setAppName(appName)
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .set("spark.kryo.registrator", "com.example.sparktest.AppKryoRegistrar");
        if (master != null) {
            sparkConfig.setMaster(master);
        } else {
            sparkConfig.setMaster("local[*]");
        }
        return sparkConfig;
    }
}
```

The `graphRequestProcessor` is actually used in the task; I am skipping the call to it here to keep the example simple. I have registered `GraphRequestProcessor` and `GraphRequestProcessorImpl` in the Kryo registrar. Since I am using Kryo, I think the serialization would be handled by Kryo itself.

After running my Spark application, I am getting this error:

```
Serialization stack:
	- object not serializable (class: com.example.app.dolos.impl.GraphRequestProcessorImpl$$EnhancerBySpringCGLIB$$6fc280f7, value: com.example.app.dolos.impl.GraphRequestProcessorImpl@4a225014)
	- field (class: com.example.app.dolos.job.HelloWorld, name: graphRequestProcessor, type: interface com.booking.app.dolos.service.GraphRequestProcessor)
	- object (class com.example.app.dolos.job.HelloWorld, com.example.app.dolos.job.HelloWorld@65448932)
	- field (class: com.example.app.dolos.job.HelloWorld$1, name: this$0, type: class com.example.app.dolos.job.HelloWorld)
	- object (class com.example.app.dolos.job.HelloWorld$1, com.booking.app.dolos.job.HelloWorld$1@1a2b23f2)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.api.java.JavaPairRDD$, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/api/java/JavaPairRDD$.$anonfun$toScalaFunction$1:(Lorg/apache/spark/api/java/function/Function;Ljava/lang/Object;)Ljava/lang/Object;, instantiatedMethodType=(Ljava/lang/Object;)Ljava/lang/Object;, numCaptured=1])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class org.apache.spark.api.java.JavaPairRDD$$$Lambda$1942/0x0000000800d1e840, org.apache.spark.api.java.JavaPairRDD$$$Lambda$1942/0x0000000800d1e840@1666e021)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
	... 13 more
```
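If I am reading the serialization stack right, the anonymous `Function` is an inner class, so it holds a hidden `this$0` reference to `HelloWorld`, and Spark's closure cleaner then tries to Java-serialize `HelloWorld` together with the autowired CGLIB proxy. Here is a minimal sketch of what I think would break that capture chain (the field marked `transient`, the function as a static nested class; `PrintFunction` is just my own illustrative name):

```java
import java.io.Serializable;
import java.util.Arrays;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.springframework.beans.factory.annotation.Autowired;

public class HelloWorld implements Runnable, Serializable {

    // transient: Java serialization skips this field, so the
    // non-serializable CGLIB proxy never travels with the job
    @Autowired
    transient GraphRequestProcessor graphRequestProcessor;

    // A static nested class has no this$0 back-reference to HelloWorld,
    // so nothing from the enclosing job is captured.
    // (Spark's Function interface already extends java.io.Serializable.)
    static class PrintFunction implements Function<Integer, Object> {
        @Override
        public Object call(Integer integer) {
            System.out.println(integer);
            return integer;
        }
    }

    @Override
    public void run() {
        JavaSparkContext sparkCtx = new JavaSparkContext(getSparkConf("hello-job"));
        sparkCtx.parallelize(Arrays.asList(1, 2, 3, 4)).map(new PrintFunction());
    }
}
```

As far as I can tell this should stop the closure from dragging the proxy along, but then the bean is simply unavailable inside the task, which is what my questions below are really about.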
I read the answer at <https://stackoverflow.com/questions/40259196/understanding-sparks-closures-and-their-serialization> and understood that Java serialization might not be able to handle my class. But how do I solve the above issue without making all my classes implement `Serializable`? How do we manage serialization for big Java projects? Is there a way to use dependency injection frameworks on the Spark executors?
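To make the last question concrete, here is roughly the kind of thing I have in mind: a static holder that re-fetches the bean inside the executor JVM instead of serializing it. `SpringBeanHolder` is a hypothetical helper of my own (not something from Spring or Spark), and the sketch assumes each executor JVM bootstraps its own Spring context:

```java
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

// Hypothetical helper (my naming): captures the ApplicationContext when the
// local Spring context starts. On an executor this only works if the executor
// JVM also starts a Spring context; otherwise getBean() returns null.
@Component
public class SpringBeanHolder implements ApplicationContextAware {

    private static volatile ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext ctx) {
        context = ctx;
    }

    public static <T> T getBean(Class<T> type) {
        return context == null ? null : context.getBean(type);
    }
}
```

The task would then look the bean up lazily, e.g. `GraphRequestProcessor p = SpringBeanHolder.getBean(GraphRequestProcessor.class);` inside `call()`, so nothing Spring-managed is ever serialized. Whether bootstrapping a context per executor is a sane approach for a big project is exactly what I am unsure about.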