I am trying to load a PMML file in a Spark job, instantiate the evaluator only
once, and pass it to the executors. But I get a NotSerializableException for
org.xml.sax.helpers.LocatorImpl, which is used inside JPMML.
I have this class Prediction.java:

public class Prediction implements Serializable {

    private RegressionModelEvaluator rme;

    public Prediction() throws Exception {
        InputStream is = .....getResourceAsStream("model.pmml");
        Source source = ImportFilter.apply(new InputSource(is));
        PMML model = JAXBUtil.unmarshalPMML(source);
        rme = new RegressionModelEvaluator(model);
        is.close();
    }

    public Map predict(params) {
        ......
        return rme.evaluate(params);
    }
}
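To confirm that the problem is the SAX locator inside the PMML object graph rather than my class itself, the failure reproduces outside Spark with plain Java serialization. Here is a minimal stand-alone repro (Holder is a stand-in that mimics PMMLObject's non-serializable locator field; it is not JPMML code):

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.xml.sax.helpers.LocatorImpl;

// Holder mimics PMMLObject: it is Serializable itself, but holds a
// LocatorImpl field, and LocatorImpl does not implement Serializable.
class Holder implements Serializable {
    LocatorImpl locator = new LocatorImpl();
}

public class Repro {
    public static void main(String[] args) throws Exception {
        try {
            new ObjectOutputStream(new ByteArrayOutputStream())
                    .writeObject(new Holder());
            System.out.println("serialized OK");
        } catch (NotSerializableException e) {
            // -> NotSerializableException: org.xml.sax.helpers.LocatorImpl
            System.out.println("NotSerializableException: " + e.getMessage());
        }
    }
}
```

So any object graph that still contains a LocatorImpl will fail the same way, no matter how I wrap it.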
Now I want to instantiate it only once, since the
JAXBUtil.unmarshalPMML(source) step takes about 2-3 seconds. It works fine if
I instantiate it inside the map{}, but then every record pays that cost.
So I do this in my driver:
Prediction prediction = new Prediction();
JavaRDD<String> result = rdd1
    .cartesian(rdd2)
    .map(t -> { ...<need to use "prediction" here>... });
But when I do that, I get this error:
Caused by: java.io.NotSerializableException: org.xml.sax.helpers.LocatorImpl
Serialization stack:
	- object not serializable (class: org.xml.sax.helpers.LocatorImpl, value: org.xml.sax.helpers.LocatorImpl@6bce4140)
	- field (class: org.dmg.pmml2_7.PMMLObject, name: locator, type: interface org.xml.sax.Locator)
	- object (class org.dmg.pmml2_7.PMML, org.dmg.pmml2_7.PMML@722531ab)
	- field (class: org.jpmml.evaluator.PMMLManager, name: pmml, type: class org.dmg.pmml2_7.PMML)
	- object (class org.jpmml.evaluator.RegressionModelEvaluator, org.jpmml.evaluator.RegressionModelEvaluator@4c24f3a2)
	- field (class: com.x.ds.util.Predict, name: rme, type: class org.jpmml.evaluator.RegressionModelEvaluator)
	- object (class com.x.ds.util.Predict, com.x.ds.util.Predict@28f154cc)
	- element of array (index: 1)
	- array (class [Ljava.lang.Object;, size 2)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class runner.SparkSimRunner, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeSpecial runner/SparkSimRunner.lambda$runSimulations$ad93f7a8$1:(Lcom/x/ds/util/Predict;Lscala/Tuple2;)Ljava/util/List;, instantiatedMethodType=(Lscala/Tuple2;)Ljava/util/List;, numCaptured=2])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class runner.SparkSimRunner$$Lambda$31/1961138094, runner.SparkSimRunner$$Lambda$31/1961138094@782fd504)
	- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
	- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
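The only workaround I can think of is to stop capturing the evaluator in the lambda at all, and instead build one instance lazily per executor JVM with an initialization-on-demand holder. A rough sketch of the pattern (ExpensiveModel is a stand-in for my Prediction class, since the real one needs the PMML file and JPMML on the classpath):

```java
public class ModelHolder {

    // Stand-in for the real Prediction class; in the real version the
    // constructor would do the 2-3 s JAXBUtil.unmarshalPMML(...) work.
    public static class ExpensiveModel {
        public String predict(String params) {
            return "result:" + params;
        }
    }

    // Initialization-on-demand holder: the JVM creates INSTANCE lazily
    // and exactly once per JVM, the first time get() is called.
    private static class Lazy {
        static final ExpensiveModel INSTANCE = new ExpensiveModel();
    }

    public static ExpensiveModel get() {
        return Lazy.INSTANCE;
    }
}
```

The map lambda would then call ModelHolder.get().predict(...) and capture nothing, so nothing non-serializable is shipped from the driver; each executor pays the unmarshal cost once instead of once per record.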
Am I doing this right?
--
Thanks,
-Utkarsh