I had an incorrect variable name in line 70 while sanitizing the code for this email. Here is the actual code:

45  val windowedEventCounts = events.reduceByKeyAndWindow(_ + _, _ - _, 30, 5, filterFunc = filterFunction)
    val usefulEvents = windowedEventCounts.filter { case (event, count) => { count > requestThreshold } }
70  usefulEvents.foreachRDD(filteredEvents => { ... })

Thanks,
RK

From: RK <prk...@yahoo.com>
To: User <user@spark.apache.org>
Sent: Tuesday, March 1, 2016 3:17 PM
Subject: Checkpoint RDD ReliableCheckpointRDD at foreachRDD has different number of partitions from original RDD MapPartitionsRDD at reduceByKeyAndWindow

Here is a code snippet from my Spark job. I added numbers at the start of some code lines to show the line numbers referenced in the exception.

45  val windowedEventCounts = events.reduceByKeyAndWindow(_ + _, _ - _, 30, 5, filterFunc = filterFunction)
    val usefulEvents = windowedEventCounts.filter { case (event, count) => { count > requestThreshold } }
70  usefulEvents.foreachRDD(events => { ... })

Every once in a while, I see this error in my log files:

org.apache.spark.SparkException: Checkpoint RDD ReliableCheckpointRDD[28052] at foreachRDD at EventProcessor.scala:70(103) has different number of partitions from original RDD MapPartitionsRDD[28050] at reduceByKeyAndWindow at EventProcessor.scala:45(108)

Has anyone seen this issue, and under what circumstances does this exception occur?

Thanks,
RK
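For readers less familiar with the inverse-function form of `reduceByKeyAndWindow` used on line 45, here is a minimal, Spark-free sketch of what it computes for counts. It is an illustration under simplifying assumptions, not Spark's implementation: the object and function names (`IncrementalWindowSketch`, `windows`, `add`, `subtract`) are hypothetical, the slide interval is assumed to equal the batch interval, and the `keep` predicate stands in for the role `filterFunc` plays. Each window is derived from the previous one by adding the batch that entered (the `_ + _` function) and subtracting the batch that left (the `_ - _` function). Because every window depends on the previous window in this way, the Spark Streaming documentation requires checkpointing to be enabled for this variant, which is presumably why a checkpoint RDD shows up in the exception at all.

```scala
// Hypothetical, Spark-free sketch of incremental windowed counting with an
// inverse reduce function, as in reduceByKeyAndWindow(_ + _, _ - _, ...).
// Assumes the slide interval equals the batch interval.
object IncrementalWindowSketch {
  type Counts = Map[String, Int]

  // Analogue of the reduce function (_ + _): fold a batch's counts into the window.
  def add(window: Counts, batch: Counts): Counts =
    batch.foldLeft(window) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0) + v) }

  // Analogue of the inverse reduce function (_ - _): remove a batch that left the window.
  def subtract(window: Counts, batch: Counts): Counts =
    batch.foldLeft(window) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0) - v) }

  // Computes one windowed result per batch. windowLen is the number of batches
  // per window; keep plays the role of filterFunc and drops pairs that fail it.
  def windows(batches: Seq[Counts], windowLen: Int, keep: ((String, Int)) => Boolean): Seq[Counts] =
    batches.indices.scanLeft(Map.empty[String, Int]) { (prev, i) =>
      val withNew = add(prev, batches(i))
      // Once the window is full, subtract the batch that just slid out.
      val withoutOld =
        if (i >= windowLen) subtract(withNew, batches(i - windowLen)) else withNew
      withoutOld.filter(keep)
    }.tail // drop the empty starting window

  def main(args: Array[String]): Unit = {
    val batches: Seq[Counts] = Seq(
      Map("a" -> 2),
      Map("a" -> 1, "b" -> 3),
      Map("b" -> 1),
      Map("c" -> 5)
    )
    // Window spans 2 batches; drop keys whose windowed count fell to 0.
    val ws = windows(batches, windowLen = 2, keep = { case (_, c) => c > 0 })
    ws.foreach(println)
    // Analogue of the usefulEvents filter, with a hypothetical threshold of 2.
    val requestThreshold = 2
    println(ws.last.filter { case (_, count) => count > requestThreshold })
  }
}
```

The incremental form avoids re-reducing every batch in the window on each slide, at the cost of making each window's result depend on the previous one; a filter like the one above is commonly used to drop keys whose count has dropped back to zero so the windowed state does not keep growing.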