Hi,
I tried to replicate the example of joining DStream with lookup RDD from
http://spark.apache.org/docs/latest/streaming-programming-guide.html#transform-operation.
It works fine, but when I enable checkpointing for the StreamingContext
and let the application recover from a previously created checkpoint,
I always get an exception during startup and the whole application fails.
I tried various types of lookup RDD, but the result is the same.
The exception in the case of an HBase RDD is:
Exception in thread "main" java.lang.NullPointerException
    at org.apache.hadoop.hbase.mapreduce.TableInputFormat.setConf(TableInputFormat.java:119)
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:109)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.Partitioner$$anonfun$2.apply(Partitioner.scala:58)
    at org.apache.spark.Partitioner$$anonfun$2.apply(Partitioner.scala:58)
    at scala.math.Ordering$$anon$5.compare(Ordering.scala:122)
    at java.util.TimSort.countRunAndMakeAscending(TimSort.java:351)
    at java.util.TimSort.sort(TimSort.java:216)
    at java.util.Arrays.sort(Arrays.java:1438)
    at scala.collection.SeqLike$class.sorted(SeqLike.scala:615)
    at scala.collection.AbstractSeq.sorted(Seq.scala:40)
    at scala.collection.SeqLike$class.sortBy(SeqLike.scala:594)
    at scala.collection.AbstractSeq.sortBy(Seq.scala:40)
    at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:58)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$join$2.apply(PairRDDFunctions.scala:651)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$join$2.apply(PairRDDFunctions.scala:651)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.PairRDDFunctions.join(PairRDDFunctions.scala:650)
    at org.apache.spark.api.java.JavaPairRDD.join(JavaPairRDD.scala:546)
I tried Spark 1.5.2 and 1.6.0 without success. The problem seems to be
that RDDs use some transient fields which are not restored when they are
recovered from checkpoint files. In some RDD implementations it is the
SparkContext, but it can also be an implementation-specific
Configuration object, etc. I see in the sources that in the case of
DStream recovery, the DStreamGraph takes care of restoring the
StreamingContext in all its DStreams. But I haven't found any similar
mechanism for RDDs.
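
To illustrate what I mean by transient fields, here is a minimal plain-Java
sketch. It is not Spark code: LookupSource and FakeConf are made-up names
standing in for an RDD and its Configuration object, and I am only assuming
that checkpoint recovery behaves like ordinary Java serialization in this
respect. After a serialize/deserialize round trip the regular field
survives, but the transient one comes back as null.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {

    // Hypothetical stand-in for a non-serializable configuration object.
    static class FakeConf {
        final String quorum = "localhost";
    }

    // Hypothetical stand-in for an RDD that keeps its configuration
    // in a transient field, so the field is skipped during serialization.
    static class LookupSource implements Serializable {
        private static final long serialVersionUID = 1L;

        final String tableName;
        transient FakeConf conf;

        LookupSource(String tableName, FakeConf conf) {
            this.tableName = tableName;
            this.conf = conf;
        }
    }

    // Serialize the object to a byte array and read it back, similar in
    // spirit to writing a checkpoint and recovering from it.
    static LookupSource roundTrip(LookupSource source)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(source);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (LookupSource) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        LookupSource original = new LookupSource("lookup_table", new FakeConf());
        LookupSource restored = roundTrip(original);

        // The regular field is restored, the transient one is not.
        System.out.println("tableName = " + restored.tableName);
        System.out.println("conf is null = " + (restored.conf == null));
    }
}
```

Any code that dereferences conf on the restored object without rebuilding
it first would fail with a NullPointerException, which looks similar to
what I see in TableInputFormat.setConf above.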
So my question is whether I am doing something wrong or whether this is a
bug in Spark. If the latter, is there a workaround other than implementing
a custom DStream which returns the same RDD every batch interval and
joining at the DStream level instead of the RDD level in transform?
I apologize if this has been discussed in the past and I missed it when
searching the archive.
Thanks,
Lubo