Hi,

I am using Spark Streaming to join every RDD of a DStream with a standalone RDD, 
producing a new DStream, as follows:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

def joinWithBatchEvent(contentFeature: RDD[(String, String)],
                       batchEvent: DStream[((String, String), (Long, Double, Double))]) = {
  batchEvent
    // Re-key each event by the second element of its composite key.
    .map { case ((k1, k2), (v1, v2, v3)) => (k2, (k1, v1, v2, v3)) }
    .transform { eventRDD =>
      // Left outer join each batch with the standalone RDD, then re-key by k1.
      eventRDD.leftOuterJoin(contentFeature).map {
        case (k2, ((k1, v1, v2, v3), feature)) => (k1, (k2, v1, v2, v3, feature))
      }
    }
}

This works well when the job starts from a new StreamingContext.
But if the StreamingContext is restored from a checkpoint, the exception 
below is thrown and the graph cannot be set up.
Do you know how to solve this problem? Thanks very much!
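
For readers trying to reproduce this, here is a minimal sketch of what "restored 
from checkpoint" usually means, assuming the standard StreamingContext.getOrCreate 
pattern. The checkpoint directory, application name, batch interval and socket 
source are all hypothetical and are not taken from the job above.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointRestoreSketch {
  // Hypothetical checkpoint location, not taken from the original job.
  val checkpointDir = "hdfs:///tmp/streaming-checkpoint"

  // Called only when no checkpoint exists; the whole DStream graph is built here.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("CheckpointRestoreSketch")
    val ssc = new StreamingContext(conf, Seconds(60))
    ssc.checkpoint(checkpointDir)
    // Hypothetical source and output operation, standing in for the real graph.
    ssc.socketTextStream("localhost", 9999).print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // First run: createContext() is invoked. Later runs: the context and the
    // DStream graph are deserialized from the checkpoint data instead.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

On the restore path createContext() is not called, so anything the graph closes 
over has to survive serialization into the checkpoint.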

15/09/07 14:07:18 INFO spark.SparkContext: Starting job: saveAsTextFiles at CFBModel.scala:49
15/09/07 14:07:18 INFO scheduler.DAGScheduler: Registering RDD 12 (repartition at EventComponent.scala:64)
15/09/07 14:07:18 INFO scheduler.DAGScheduler: Registering RDD 17 (flatMap at CFBModel.scala:25)
15/09/07 14:07:18 INFO scheduler.DAGScheduler: Registering RDD 20 (map at ContentFeature.scala:100)
15/09/07 14:07:18 WARN scheduler.DAGScheduler: Creating new stage failed due to exception - job: 1
java.lang.IllegalArgumentException: Flat hash tables cannot contain null elements.
        at scala.collection.mutable.FlatHashTable$HashUtils$class.elemHashCode(FlatHashTable.scala:390)
        at scala.collection.mutable.HashSet.elemHashCode(HashSet.scala:41)
        at scala.collection.mutable.FlatHashTable$class.findEntryImpl(FlatHashTable.scala:123)
        at scala.collection.mutable.FlatHashTable$class.containsEntry(FlatHashTable.scala:119)
        at scala.collection.mutable.HashSet.containsEntry(HashSet.scala:41)
        at scala.collection.mutable.HashSet.contains(HashSet.scala:58)
        at scala.collection.GenSetLike$class.apply(GenSetLike.scala:43)
        at scala.collection.mutable.AbstractSet.apply(Set.scala:45)
        at org.apache.spark.scheduler.DAGScheduler.visit$2(DAGScheduler.scala:336)
        at org.apache.spark.scheduler.DAGScheduler.getAncestorShuffleDependencies(DAGScheduler.scala:355)
        at org.apache.spark.scheduler.DAGScheduler.registerShuffleDependencies(DAGScheduler.scala:317)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:218)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:301)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:298)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:298)
        at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:310)
        at org.apache.spark.scheduler.DAGScheduler.newStage(DAGScheduler.scala:244)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:731)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/09/07 14:07:18 INFO scheduler.DAGScheduler: Job 1 failed: saveAsTextFiles at CFBModel.scala:49, took 0.016406 s
15/09/07 14:07:18 ERROR scheduler.JobScheduler: Error running job streaming job 1441605900000 ms.0
java.lang.IllegalArgumentException: Flat hash tables cannot contain null elements.
        at scala.collection.mutable.FlatHashTable$HashUtils$class.elemHashCode(FlatHashTable.scala:390)
        at scala.collection.mutable.HashSet.elemHashCode(HashSet.scala:41)
        at scala.collection.mutable.FlatHashTable$class.findEntryImpl(FlatHashTable.scala:123)
        at scala.collection.mutable.FlatHashTable$class.containsEntry(FlatHashTable.scala:119)
        at scala.collection.mutable.HashSet.containsEntry(HashSet.scala:41)
        at scala.collection.mutable.HashSet.contains(HashSet.scala:58)
        at scala.collection.GenSetLike$class.apply(GenSetLike.scala:43)
        at scala.collection.mutable.AbstractSet.apply(Set.scala:45)
        at org.apache.spark.scheduler.DAGScheduler.visit$2(DAGScheduler.scala:336)
        at org.apache.spark.scheduler.DAGScheduler.getAncestorShuffleDependencies(DAGScheduler.scala:355)
        at org.apache.spark.scheduler.DAGScheduler.registerShuffleDependencies(DAGScheduler.scala:317)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:218)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:301)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:298)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:298)
        at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:310)
        at org.apache.spark.scheduler.DAGScheduler.newStage(DAGScheduler.scala:244)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:731)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Exception in thread "main" java.lang.IllegalArgumentException: Flat hash tables cannot contain null elements.
        at scala.collection.mutable.FlatHashTable$HashUtils$class.elemHashCode(FlatHashTable.scala:390)
        at scala.collection.mutable.HashSet.elemHashCode(HashSet.scala:41)
        at scala.collection.mutable.FlatHashTable$class.findEntryImpl(FlatHashTable.scala:123)
        at scala.collection.mutable.FlatHashTable$class.containsEntry(FlatHashTable.scala:119)
        at scala.collection.mutable.HashSet.containsEntry(HashSet.scala:41)
        at scala.collection.mutable.HashSet.contains(HashSet.scala:58)
        at scala.collection.GenSetLike$class.apply(GenSetLike.scala:43)
        at scala.collection.mutable.AbstractSet.apply(Set.scala:45)
        at org.apache.spark.scheduler.DAGScheduler.visit$2(DAGScheduler.scala:336)
        at org.apache.spark.scheduler.DAGScheduler.getAncestorShuffleDependencies(DAGScheduler.scala:355)
        at org.apache.spark.scheduler.DAGScheduler.registerShuffleDependencies(DAGScheduler.scala:317)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:218)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:301)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:298)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:298)
        at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:310)
        at org.apache.spark.scheduler.DAGScheduler.newStage(DAGScheduler.scala:244)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:731)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
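
One direction that might be worth trying is sketched below. It rests on an 
unconfirmed assumption: that the failure comes from the contentFeature RDD being 
captured in the transform closure and then deserialized from the checkpoint in an 
unusable state. Under that assumption, the sketch rebuilds the RDD lazily inside 
transform from the live SparkContext, in the style of the lazily instantiated 
singleton that the Spark Streaming guide describes for broadcast variables. 
ContentFeatureHolder, parseLine, featurePath and the tab-separated input format 
are all hypothetical names and choices made for illustration.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Hypothetical holder: lazily (re)builds the standalone RDD from whichever
// SparkContext is live, so the RDD itself is never part of the checkpointed closure.
object ContentFeatureHolder {
  @volatile private var instance: RDD[(String, String)] = _

  // Hypothetical input format: one "key<TAB>feature" record per line.
  def parseLine(line: String): (String, String) = {
    val fields = line.split("\t", 2)
    (fields(0), if (fields.length > 1) fields(1) else "")
  }

  def getInstance(sc: SparkContext, path: String): RDD[(String, String)] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.textFile(path).map(parseLine).cache()
        }
      }
    }
    instance
  }
}

object JoinSketch {
  // Same join as in the question, but only the path (a String) is captured.
  // On Spark versions before 1.3, leftOuterJoin additionally needs
  // `import org.apache.spark.SparkContext._` for the pair-RDD implicits.
  def joinWithBatchEvent(featurePath: String,
                         batchEvent: DStream[((String, String), (Long, Double, Double))])
      : DStream[(String, (String, Long, Double, Double, Option[String]))] =
    batchEvent
      .map { case ((k1, k2), (v1, v2, v3)) => (k2, (k1, v1, v2, v3)) }
      .transform { eventRDD =>
        // Obtain the RDD from the context that owns the current batch.
        val contentFeature =
          ContentFeatureHolder.getInstance(eventRDD.sparkContext, featurePath)
        eventRDD.leftOuterJoin(contentFeature).map {
          case (k2, ((k1, v1, v2, v3), feature)) => (k1, (k2, v1, v2, v3, feature))
        }
      }
}
```

With this shape the checkpoint only needs to store the path string, and the RDD 
is created again on the first batch after a restart.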


Thanks,
Hanbin Zheng
