I had an incorrect variable name on line 70 while sanitizing the code for this email.
Here is the actual code:

45    val windowedEventCounts = events.reduceByKeyAndWindow(_ + _, _ - _, 30, 5, filterFunc = filterFunction)
      val usefulEvents = windowedEventCounts.filter { case (event, count) => count > requestThreshold }
70    usefulEvents.foreachRDD(filteredEvents => { ... })
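To make the windowing step concrete: the Spark Streaming programming guide describes the inverse-function form of reduceByKeyAndWindow as computing each window incrementally from the previous one. It adds the data entering the window and "inverse reduces" the data leaving it, and this form requires checkpointing to be enabled. The plain-Scala sketch below has no Spark dependency and only mimics that update for per-event counts. It uses `_ + _` and `_ - _` as in the snippet above and applies a filter afterwards in the role of filterFunc. It is an illustration under simplifying assumptions, not Spark's implementation. Batches are plain `Seq[String]`, the slide is one batch, the window is a hypothetical `windowBatches` batches wide, and the object and method names are made up for this example.

```scala
// Hypothetical plain-Scala sketch (no Spark) of the incremental update done by
// reduceByKeyAndWindow(_ + _, _ - _, window, slide, filterFunc = ...).
object WindowedCountSketch {
  type Counts = Map[String, Int]

  // Count occurrences of each event within a single batch.
  def countBatch(batch: Seq[String]): Counts =
    batch.groupBy(identity).map { case (k, vs) => k -> vs.size }

  // Combine two count maps key by key, treating a missing key as 0.
  private def merge(a: Counts, b: Counts)(op: (Int, Int) => Int): Counts =
    (a.keySet ++ b.keySet).iterator.map { k =>
      k -> op(a.getOrElse(k, 0), b.getOrElse(k, 0))
    }.toMap

  // One slide step: add the batch entering the window (reduce, "_ + _"),
  // subtract the batch leaving it (inverse reduce, "_ - _"), then keep only
  // pairs that satisfy filterFunc (e.g. drop keys whose count fell to 0).
  def slide(prev: Counts, entering: Counts, leaving: Counts,
            filterFunc: ((String, Int)) => Boolean): Counts =
    merge(merge(prev, entering)(_ + _), leaving)(_ - _).filter(filterFunc)

  // Slide a window of `windowBatches` batches over the input, one batch at a
  // time, and return the window's counts after each batch.
  def run(batches: Seq[Seq[String]], windowBatches: Int,
          filterFunc: ((String, Int)) => Boolean): Seq[Counts] = {
    val perBatch = batches.map(countBatch)
    perBatch.indices.scanLeft(Map.empty[String, Int]) { (prev, i) =>
      // The batch that falls out of the window once it is full.
      val leaving =
        if (i >= windowBatches) perBatch(i - windowBatches)
        else Map.empty[String, Int]
      slide(prev, perBatch(i), leaving, filterFunc)
    }.tail
  }
}
```

For example, with batches (a, a, b), (b), (c), a two-batch window and a filter keeping counts above zero, the third window comes out as b -> 1, c -> 1. Without the filter, a -> 0 would stay in the map. Dropping stale keys like that is what the Spark documentation says filterFunc is for ("only pairs that satisfy the function are retained").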


Thanks, RK
From: RK <prk...@yahoo.com>
To: User <user@spark.apache.org>
Sent: Tuesday, March 1, 2016 3:17 PM
Subject: Checkpoint RDD ReliableCheckpointRDD at foreachRDD has different number of partitions from original RDD MapPartitionsRDD at reduceByKeyAndWindow
   
Here is a code snippet from my Spark job. I added numbers at the start of the code lines to show the line numbers referenced in the exception.


45    val windowedEventCounts = events.reduceByKeyAndWindow(_ + _, _ - _, 30, 5, filterFunc = filterFunction)
      val usefulEvents = windowedEventCounts.filter { case (event, count) => count > requestThreshold }
70    usefulEvents.foreachRDD(events => { ... })


Every once in a while, I see this error in my log files.

org.apache.spark.SparkException: Checkpoint RDD ReliableCheckpointRDD[28052] at foreachRDD at EventProcessor.scala:70(103) has different number of partitions from original RDD MapPartitionsRDD[28050] at reduceByKeyAndWindow at EventProcessor.scala:45(108)


Has anyone seen this issue, and under what circumstances does this exception occur?
Thanks, RK
