Re: Checkpoint of DStream joined with RDD
Hi Ted, all,

do you have any advice regarding my questions in my initial email? I tried Spark 1.5.2 and 1.6.0 without success. The problem seems to be that RDDs use some transient fields which are not restored when they are recovered from checkpoint files. In the case of some RDD implementations it is the SparkContext, but it can also be an implementation-specific Configuration object, etc.

I see in the sources that in the case of DStream recovery, the DStreamGraph takes care of restoring the StreamingContext in all its DStreams. But I haven't found any similar mechanism for RDDs. So my question is whether I am doing something wrong or whether this is a bug in Spark. If the latter, is there some workaround other than implementing a custom DStream which returns the same RDD every batch interval and joining at the DStream level instead of at the RDD level in transform?

Thanks,
Lubo

On 18.3.2016 18:36, Ted Yu wrote:

This is the line where the NPE came from:

    if (conf.get(SCAN) != null) {

So the Configuration instance was null.
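Regarding the workaround question above: one alternative to a custom DStream, not discussed in this thread and offered only as a hedged sketch, is to stop capturing the lookup RDD in the checkpointed DStream graph at all and instead rebuild it lazily inside the transform function from the batch RDD's live SparkContext. Only plain configuration values (here a table name) end up in the closure, so nothing that depends on transient state has to be restored from the checkpoint. All names below are hypothetical, including the HBaseLookupRdd.build helper (a sketch of it appears further down in this thread):

import java.util.concurrent.atomic.AtomicReference;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.api.java.JavaPairDStream;

import scala.Tuple2;

public final class LazyLookupJoin {

    // Driver-side cache of the lookup RDD so it is not rebuilt for every batch.
    // Transform functions run on the driver, one batch at a time, so the
    // AtomicReference is only a convenience here.
    private static final AtomicReference<JavaPairRDD<String, String>> LOOKUP =
        new AtomicReference<>();

    public static JavaPairDStream<String, Tuple2<String, String>> joinWithLookup(
            JavaPairDStream<String, String> events, String lookupTable) {
        // Only the table name (a plain String) is captured by the closure, so the
        // checkpointed graph never holds a reference to the lookup RDD itself.
        return events.transformToPair(batch -> {
            JavaPairRDD<String, String> lookup = LOOKUP.get();
            if (lookup == null) {
                // Rebuild from the batch RDD's live SparkContext after a (re)start;
                // HBaseLookupRdd.build is a hypothetical helper sketched below.
                JavaSparkContext jsc = JavaSparkContext.fromSparkContext(batch.context());
                lookup = HBaseLookupRdd.build(jsc, lookupTable).cache();
                LOOKUP.set(lookup);
            }
            return batch.join(lookup);
        });
    }
}

This only sidesteps the restored-RDD problem; it does not answer whether the recovery behaviour itself is a bug.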
Re: Checkpoint of DStream joined with RDD
The HBase version is 1.0.1.1.

Thanks,
Lubo

On 18.3.2016 17:29, Ted Yu wrote:

I looked at the places in SparkContext.scala where NewHadoopRDD is constructed. It seems the Configuration object shouldn't be null.

Which HBase release are you using (so that I can see which line the NPE came from)?

Thanks
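The thread does not show how the HBase-backed lookup RDD is constructed, but the TableInputFormat.setConf(TableInputFormat.java:119) frame in the trace is the point where the input format reads its Configuration. Below is a rough sketch of the usual construction via newAPIHadoopRDD; the table, column family, and qualifier names are placeholders, and this is an illustration rather than the reporter's actual code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public final class HBaseLookupRdd {

    // Builds a (rowKey -> value) lookup RDD over an HBase table. Table, column
    // family, and qualifier are placeholders; this mirrors the common
    // newAPIHadoopRDD + TableInputFormat pattern, not code from this thread.
    public static JavaPairRDD<String, String> build(JavaSparkContext jsc, String table) {
        Configuration hbaseConf = HBaseConfiguration.create();
        hbaseConf.set(TableInputFormat.INPUT_TABLE, table);

        // newAPIHadoopRDD yields a NewHadoopRDD under the hood; when partitions are
        // computed, TableInputFormat.setConf is called with this Configuration --
        // the frame that apparently receives a null conf once the RDD has been
        // restored from a checkpoint without its transient state.
        JavaPairRDD<ImmutableBytesWritable, Result> raw = jsc.newAPIHadoopRDD(
            hbaseConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

        return raw.mapToPair(kv -> new Tuple2<>(
            Bytes.toString(kv._1().copyBytes()),
            Bytes.toString(kv._2().getValue(Bytes.toBytes("cf"), Bytes.toBytes("col")))));
    }
}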
Checkpoint of DStream joined with RDD
Hi,

I tried to replicate the example of joining a DStream with a lookup RDD from http://spark.apache.org/docs/latest/streaming-programming-guide.html#transform-operation. It works fine, but when I enable checkpointing for the StreamingContext and let the application recover from a previously created checkpoint, I always get an exception during start and the whole application fails. I tried various types of lookup RDD, but the result is the same.

The exception in the case of an HBase RDD is:

Exception in thread "main" java.lang.NullPointerException
        at org.apache.hadoop.hbase.mapreduce.TableInputFormat.setConf(TableInputFormat.java:119)
        at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:109)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.Partitioner$$anonfun$2.apply(Partitioner.scala:58)
        at org.apache.spark.Partitioner$$anonfun$2.apply(Partitioner.scala:58)
        at scala.math.Ordering$$anon$5.compare(Ordering.scala:122)
        at java.util.TimSort.countRunAndMakeAscending(TimSort.java:351)
        at java.util.TimSort.sort(TimSort.java:216)
        at java.util.Arrays.sort(Arrays.java:1438)
        at scala.collection.SeqLike$class.sorted(SeqLike.scala:615)
        at scala.collection.AbstractSeq.sorted(Seq.scala:40)
        at scala.collection.SeqLike$class.sortBy(SeqLike.scala:594)
        at scala.collection.AbstractSeq.sortBy(Seq.scala:40)
        at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:58)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$join$2.apply(PairRDDFunctions.scala:651)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$join$2.apply(PairRDDFunctions.scala:651)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.PairRDDFunctions.join(PairRDDFunctions.scala:650)
        at org.apache.spark.api.java.JavaPairRDD.join(JavaPairRDD.scala:546)

I tried Spark 1.5.2 and 1.6.0 without success. The problem seems to be that RDDs use some transient fields which are not restored when they are recovered from checkpoint files. In the case of some RDD implementations it is the SparkContext, but it can also be an implementation-specific Configuration object, etc.

I see in the sources that in the case of DStream recovery, the DStreamGraph takes care of restoring the StreamingContext in all its DStreams. But I haven't found any similar mechanism for RDDs. So my question is whether I am doing something wrong or whether this is a bug in Spark. If the latter, is there some workaround other than implementing a custom DStream which returns the same RDD every batch interval and joining at the DStream level instead of at the RDD level in transform?
I apologize if this has been discussed in the past and I missed it when looking into the archive.

Thanks,
Lubo
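For reference, a minimal sketch of the overall pattern being described, assuming a socket source, a placeholder checkpoint directory, and an in-memory lookup RDD in place of the HBase one; none of this is the reporter's actual code. The lookup RDD is created when the context is first built and captured by the transform closure, so it becomes part of the checkpointed DStream graph; after recovery via JavaStreamingContext.getOrCreate, the join against the restored lookup reference is where a failure like the one above would surface:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class TransformJoinCheckpointSketch {

    public static void main(String[] args) throws Exception {
        final String checkpointDir = "/tmp/stream-checkpoint"; // placeholder path

        // getOrCreate either builds a fresh context or restores one from the checkpoint.
        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir, () -> {
            SparkConf conf = new SparkConf().setAppName("transform-join-sketch");
            JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(10));
            ssc.checkpoint(checkpointDir);

            // Lookup RDD created once when the context is first built; the transform
            // closure below captures this reference, so it is serialized as part of
            // the checkpointed DStream graph.
            JavaPairRDD<String, String> lookup = ssc.sparkContext().parallelizePairs(
                Arrays.asList(new Tuple2<>("k1", "meta1"), new Tuple2<>("k2", "meta2")));

            JavaPairDStream<String, String> events = ssc
                .socketTextStream("localhost", 9999)            // placeholder source
                .mapToPair(line -> new Tuple2<>(line.split(",")[0], line));

            // Join each batch RDD with the lookup RDD, as in the streaming guide's
            // transform example. After recovery, this join runs against the restored
            // lookup RDD, which has lost its transient state.
            events.transformToPair(batch -> batch.join(lookup)
                    .mapToPair(kv -> new Tuple2<>(kv._1(), kv._2()._1() + "|" + kv._2()._2())))
                .print();

            return ssc;
        });

        jssc.start();
        jssc.awaitTermination();
    }
}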