Dependency injection for spark executors

2023-04-20 Thread Deepak Patankar
I am writing a Spark application which uses Java and Spring Boot to process
rows. For every row it performs some logic and saves data into the database.


The logic is performed using some services defined in my application and
some external libraries.

While running my service I am getting a NotSerializableException.

My Spark job and service are:

```
// It is a temporary job, which would be removed after testing
public class HelloWorld implements Runnable, Serializable {

    @Autowired
    GraphRequestProcessor graphProcessor;

    @Override
    public void run() {
        String sparkAppName = "hello-job";
        JavaSparkContext sparkCtx = new JavaSparkContext(getSparkConf(sparkAppName));
        JavaRDD<Integer> rdd = sparkCtx.parallelize(Arrays.asList(1, 2, 3, 4));
        rdd.map(new Function<Integer, Object>() {
            @Override
            public Object call(Integer integer) throws Exception {
                System.out.println(integer);
                return integer;
            }
        });
        System.out.println("Done");
    }

    public static SparkConf getSparkConf(String appName) {
        Map<String, String> env = System.getenv();
        String master = env.get("SPARK_MASTER");
        SparkConf sparkConfig = new SparkConf()
                .setAppName(appName)
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .set("spark.kryo.registrator", "com.example.sparktest.AppKryoRegistrar");
        if (master != null) {
            sparkConfig.setMaster(master);
        } else {
            sparkConfig.setMaster("local[*]");
        }
        return sparkConfig;
    }
}
```

The graphRequestProcessor is actually used in the task; I am skipping
calling it here to keep the example simple. I have registered
GraphRequestProcessor and GraphRequestProcessorImpl in the Kryo registrar.
Since I am using Kryo, I think the serialization would be handled by Kryo
itself.
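
For context, a minimal sketch of what such a registrar might look like (the exact registrations are an assumption based on the class names above, not the real contents of AppKryoRegistrar):

```
import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;

// Hypothetical sketch of the registrar referenced by spark.kryo.registrator;
// the real AppKryoRegistrar may register more classes.
// Imports for GraphRequestProcessor / GraphRequestProcessorImpl are omitted.
public class AppKryoRegistrar implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        kryo.register(GraphRequestProcessor.class);
        kryo.register(GraphRequestProcessorImpl.class);
    }
}
```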

After running my Spark application, I get the following error:

Serialization stack:
- object not serializable (class:
com.example.app.dolos.impl.GraphRequestProcessorImpl$$EnhancerBySpringCGLIB$$6fc280f7,
value: com.example.app.dolos.impl.GraphRequestProcessorImpl@4a225014)
- field (class: com.example.app.dolos.job.HelloWorld, name:
graphRequestProcessor, type: interface
com.booking.app.dolos.service.GraphRequestProcessor)
- object (class com.example.app.dolos.job.HelloWorld,
com.example.app.dolos.job.HelloWorld@65448932)
- field (class: com.example.app.dolos.job.HelloWorld$1, name:
this$0, type: class com.example.app.dolos.job.HelloWorld)
- object (class com.example.app.dolos.job.HelloWorld$1,
com.booking.app.dolos.job.HelloWorld$1@1a2b23f2)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name:
capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda,
SerializedLambda[capturingClass=class
org.apache.spark.api.java.JavaPairRDD$,
functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;,
implementation=invokeStatic
org/apache/spark/api/java/JavaPairRDD$.$anonfun$toScalaFunction$1:(Lorg/apache/spark/api/java/function/Function;Ljava/lang/Object;)Ljava/lang/Object;,
instantiatedMethodType=(Ljava/lang/Object;)Ljava/lang/Object;,
numCaptured=1])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class
org.apache.spark.api.java.JavaPairRDD$$$Lambda$1942/0x000800d1e840,
org.apache.spark.api.java.JavaPairRDD$$$Lambda$1942/0x000800d1e840@1666e021)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
... 13 more

I read the answer here and understood that Java might not be able to
serialise my class. But how do I solve the above issue without making all my
classes implement Serializable? How do we manage serialisation for big Java
projects? Is there a way to use dependency injection frameworks on the Spark
executors?
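
One approach I am considering is to mark the injected field transient and resolve the bean lazily on each executor from a per-JVM Spring context, so the CGLIB-proxied bean is never serialized into the task closure. A rough sketch of that idea (ExecutorSpringContext and AppConfig are placeholders, not existing classes in my project):

```
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

// Rough sketch: each executor JVM builds (or reuses) its own Spring context,
// so beans are looked up locally instead of being shipped from the driver.
// AppConfig is a placeholder for the application's @Configuration class.
public final class ExecutorSpringContext {
    private static volatile ApplicationContext context;

    private ExecutorSpringContext() {}

    public static <T> T getBean(Class<T> type) {
        if (context == null) {
            synchronized (ExecutorSpringContext.class) {
                if (context == null) {
                    context = new AnnotationConfigApplicationContext(AppConfig.class);
                }
            }
        }
        return context.getBean(type);
    }
}
```

The task would then call ExecutorSpringContext.getBean(GraphRequestProcessor.class) inside call(...) instead of reading the autowired field, so nothing Spring-managed ends up in the serialized closure.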


Map Partition is called Multiple Times

2023-07-25 Thread Deepak Patankar
I am trying to run a Spark job which performs some database operations
and saves the records that pass in one table and the failed ones in another.

Here is the code for the same:

```
log.info("Starting the spark job {}");

String sparkAppName = generateSparkAppName("reading-graph");
SparkConf sparkConf = getSparkConf(sparkAppName);
SparkSession sparkSession =
SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate();

LocalDate startDate = LocalDate.of(2022, 1, 1);
final LongAccumulator accumulator =
sparkSession.sparkContext().longAccumulator("Successful Processed
Count");
LocalDate endDate = startDate.plusDays(90);

Dataset rows = sparkSession.table("db.graph_details")
.select(new Column("_mm_dd"), new Column("timestamp"), new
Column("email_id")))
.filter("_mm_dd >= '" + startDate + "' AND _mm_dd < '" + endDate);

Dataset> tuple2Dataset =
rows.mapPartitions(new GetPaymentsGraphFeatures(accumulator),
Encoders.tuple(Encoders.BOOLEAN(), Encoders.STRING()));
tuple2Dataset.persist();

Dataset successfulRows =
tuple2Dataset.filter((FilterFunction>)
booleanRowTuple2 -> booleanRowTuple2._1).map(
(MapFunction, Row>) booleanRowTuple2 ->
mapToRow(booleanRowTuple2._2), RowEncoder.apply(getSchema()));

Dataset failedRows =
tuple2Dataset.filter((FilterFunction>)
booleanRowTuple2 -> !booleanRowTuple2._1).map(
(MapFunction, Row>) booleanRowTuple2 ->
mapToRow(booleanRowTuple2._2), RowEncoder.apply(getFailureSchema()));

successfulRows.write().mode("overwrite").saveAsTable("db.deepak_jan_result");
failedRows.write().mode("overwrite").saveAsTable("db.deepak_jan_result_failures");
tuple2Dataset.unpersist();
log.info("Completed the spark job");
```

The Spark job is running mapPartitions twice, once to compute
successfulRows and once to compute failedRows. But ideally mapPartitions
should run only once, right?

My job to process the successful rows takes more than an hour. Can that
be causing this behaviour?

How can I ensure that mapPartitions runs only once?
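
One thing I am thinking of trying is forcing the cached Dataset to materialize with a cheap action before the two writes, so both filters read from the cache instead of re-running the partition function. A rough sketch (whether this actually avoids the recomputation presumably depends on the cached blocks staying in memory or on disk):

```
import org.apache.spark.storage.StorageLevel;

// Cache to memory and spill to disk so evicted blocks are not recomputed.
tuple2Dataset.persist(StorageLevel.MEMORY_AND_DISK());

// A cheap action to run mapPartitions once and fill the cache up front.
long processedCount = tuple2Dataset.count();
log.info("Materialized {} processed rows", processedCount);

// The two filter + write actions below should then read from the cache
// instead of re-running GetPaymentsGraphFeatures.
successfulRows.write().mode("overwrite").saveAsTable("db.deepak_jan_result");
failedRows.write().mode("overwrite").saveAsTable("db.deepak_jan_result_failures");
```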
