I am trying to load a PMML file in a Spark job, instantiate the evaluator only
once on the driver, and pass it to the executors. But I get a
NotSerializableException for org.xml.sax.helpers.LocatorImpl, which is used
inside JPMML.

I have this class Prediction.java:
public class Prediction implements Serializable {
    private RegressionModelEvaluator rme;

    public Prediction() throws Exception {
        InputStream is = .....getResourceAsStream("model.pmml");
        Source source = ImportFilter.apply(new InputSource(is));
        PMML model = JAXBUtil.unmarshalPMML(source);
        rme = new RegressionModelEvaluator(model);
        is.close();
    }

    public Map predict(params) {
       ......
        return rme.evaluate(params);
    }
}


Now I want to instantiate it only once, since the
JAXBUtil.unmarshalPMML(source) step takes about 2-3 seconds. It works fine if
I instantiate it inside the map{}, but then I pay that cost for every record.

So I do this in my driver:

Prediction prediction = new Prediction();
JavaRDD<String> result = rdd1
                .cartesian(rdd2)
                .map(t -> {...<need to use "prediction" here ...});

But when I do that, I get this error:
Caused by: java.io.NotSerializableException: org.xml.sax.helpers.LocatorImpl
Serialization stack:
    - object not serializable (class: org.xml.sax.helpers.LocatorImpl, value: org.xml.sax.helpers.LocatorImpl@6bce4140)
    - field (class: org.dmg.pmml2_7.PMMLObject, name: locator, type: interface org.xml.sax.Locator)
    - object (class org.dmg.pmml2_7.PMML, org.dmg.pmml2_7.PMML@722531ab)
    - field (class: org.jpmml.evaluator.PMMLManager, name: pmml, type: class org.dmg.pmml2_7.PMML)
    - object (class org.jpmml.evaluator.RegressionModelEvaluator, org.jpmml.evaluator.RegressionModelEvaluator@4c24f3a2)
    - field (class: com.x.ds.util.Predict, name: rme, type: class org.jpmml.evaluator.RegressionModelEvaluator)
    - object (class com.x.ds.util.Predict, com.x.ds.util.Predict@28f154cc)
    - element of array (index: 1)
    - array (class [Ljava.lang.Object;, size 2)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class runner.SparkSimRunner, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeSpecial runner/SparkSimRunner.lambda$runSimulations$ad93f7a8$1:(Lcom/x/ds/util/Predict;Lscala/Tuple2;)Ljava/util/List;, instantiatedMethodType=(Lscala/Tuple2;)Ljava/util/List;, numCaptured=2])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class runner.SparkSimRunner$$Lambda$31/1961138094, runner.SparkSimRunner$$Lambda$31/1961138094@782fd504)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
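If I read the trace correctly, the root cause isn't really Spark-specific:
Java serialization walks the entire object graph, so the one non-serializable
Locator field inside PMMLObject sinks the whole closure. Here is a minimal
self-contained repro of the same failure mode, with a plain Object standing
in for the locator:

```java
import java.io.*;

// Minimal repro: serializing an object whose field references a
// non-serializable value fails for the whole object graph.
public class SerDemo {
    static class Holder implements Serializable {
        Object locator = new Object(); // java.lang.Object is not Serializable
    }

    public static void main(String[] args) throws IOException {
        try {
            new ObjectOutputStream(new ByteArrayOutputStream())
                    .writeObject(new Holder());
        } catch (NotSerializableException e) {
            System.out.println("NotSerializableException: " + e.getMessage());
            // prints NotSerializableException: java.lang.Object
        }
    }
}
```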

Am I doing this right?
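One workaround I'm considering (sketch only — the Evaluator class below is a
dummy standing in for the real RegressionModelEvaluator, and the names are
made up): mark the field transient and rebuild it lazily on first use, so the
heavy object is constructed once per deserialized copy on the executor side
instead of being shipped from the driver.

```java
import java.io.*;

// Sketch: ship only the lightweight wrapper; rebuild the heavy,
// non-serializable resource lazily after deserialization.
public class LazyPrediction implements Serializable {
    // Dummy stand-in for RegressionModelEvaluator; deliberately NOT Serializable.
    static class Evaluator {
        String evaluate(String params) { return "score(" + params + ")"; }
    }

    // transient: skipped by Java serialization, so the evaluator
    // never has to cross the wire with the closure
    private transient Evaluator rme;

    private Evaluator evaluator() {
        if (rme == null) {
            rme = new Evaluator(); // expensive init, once per deserialized copy
        }
        return rme;
    }

    public String predict(String params) {
        return evaluator().evaluate(params);
    }

    public static void main(String[] args) throws Exception {
        LazyPrediction p = new LazyPrediction();
        // Round-trip through Java serialization to simulate Spark
        // shipping the captured object to an executor.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(p);
        LazyPrediction copy = (LazyPrediction) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();
        System.out.println(copy.predict("x=1")); // prints score(x=1)
    }
}
```

The serialize/deserialize round trip in main() mimics what Spark's closure
serializer does; the evaluator is rebuilt on the receiving side. Does this
look like a reasonable pattern here?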

-- 
Thanks,
-Utkarsh
