Issue:
I have a random forest model that I am trying to load during streaming using the
following code. The code works fine when I run it from Eclipse, but I get an
NPE when running it with spark-submit.
JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(duration));
System.out.println("&&&&&&&&&&&&&&&&&&&&& trying to get the context &&&&&&&&&&&&&&&&&&& ");
// Line 116: this is the call that throws the NPE.
final RandomForestModel model = RandomForestModel.load(jssc.sparkContext().sc(), MODEL_DIRECTORY);
System.out.println("&&&&&&&&&&&&&&&&&&&&& model debug &&&&&&&&&&&&&&&&&&&&&&& " + model.toDebugString());
Exception Details:
INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 2.0, whose tasks have all completed, from pool
Exception in thread "main" java.lang.NullPointerException
    at org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$SplitData.toSplit(DecisionTreeModel.scala:144)
    at org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$$anonfun$16.apply(DecisionTreeModel.scala:291)
    at org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$$anonfun$16.apply(DecisionTreeModel.scala:291)
    at scala.Option.map(Option.scala:145)
    at org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$.constructNode(DecisionTreeModel.scala:291)
    at org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$.constructNode(DecisionTreeModel.scala:286)
    at org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$.constructNode(DecisionTreeModel.scala:287)
    at org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$.constructNode(DecisionTreeModel.scala:286)
    at org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$.constructTree(DecisionTreeModel.scala:268)
    at org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$$anonfun$12.apply(DecisionTreeModel.scala:251)
    at org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$$anonfun$12.apply(DecisionTreeModel.scala:250)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.mllib.tree.model.DecisionTreeModel$SaveLoadV1_0$.constructTrees(DecisionTreeModel.scala:250)
    at org.apache.spark.mllib.tree.model.TreeEnsembleModel$SaveLoadV1_0$.loadTrees(treeEnsembleModels.scala:340)
    at org.apache.spark.mllib.tree.model.RandomForestModel$.load(treeEnsembleModels.scala:72)
    at org.apache.spark.mllib.tree.model.RandomForestModel.load(treeEnsembleModels.scala)
    at com.markmonitor.antifraud.ce.KafkaURLStreaming.main(KafkaURLStreaming.java:116)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Nov 19, 2015 1:10:56 PM WARNING: parquet.hadoop.ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
Spark Source Code:
case class PredictData(predict: Double, prob: Double) {
def toPredict: Predict = new Predict(predict, prob)
}
Thanks,
Rachana