Hi, I'm currently implementing an exactly-once mechanism based on the following example:
https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala

The pseudocode is as follows (fleshed out into a short Scala sketch; identity stands in for my real record-level transformation):

```scala
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

var offsets = Array.empty[OffsetRange]  // driver-side variable
dstream.transform { rdd =>
  offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges  // store offsets on the driver
  rdd
}.map(identity)  // real per-record transformation goes here
 .foreachRDD { rdd => /* action + save `offsets` in the DB */ }
```

This code doesn't work when the processing time is greater than the batch interval (the same problem as the windowed example: https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/Windowed.scala). At each batch interval a new RDD is created and queued, so transform is called several times and keeps overwriting the global variable; when we finally save, the stored offset range no longer corresponds to the batch that was actually processed.

1) Do I need to work at the RDD level (inside one big foreachRDD, as in the first example) instead of at the DStream level? (See the first sketch at the end of this post.)

2) I could use a Map[BatchTime, OffsetRange] as a global variable (see the second sketch at the end of this post), but after a crash this map would no longer reflect the generatedRDDs restored from the checkpoint (RDDs prepared but not yet executed).

2.1) Should I store this map elsewhere (e.g. Cassandra)?

2.2) Is there a way to retrieve the offset ranges of the restored RDDs? (The transform closure is not called again for the checkpointed RDDs.)

2.3) Is it possible to store some context along with the RDD so that it gets serialized too?

Lots of questions, let me know if anything is unclear!
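
For question 1, here is a minimal sketch of what I mean by working at the RDD level, in the style of TransactionalPerBatch.scala. It assumes `dstream` is the direct Kafka stream of (String, String) records; `process` and `saveTransactionally` are hypothetical placeholders for the record-level transformation and for the transactional write of results plus offsets.

```scala
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Hypothetical placeholders: the real record transformation, and the
// transactional write that commits results and offsets together.
def process(record: (String, String)): String = record._2
def saveTransactionally(results: Array[String], offsets: Array[OffsetRange]): Unit = ???

dstream.foreachRDD { rdd =>
  // Offsets are read from the same RDD that is processed, so they always
  // match, even when jobs queue up because processing exceeds the interval.
  val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val results = rdd.map(process).collect()
  saveTransactionally(results, offsets)
}
```

As far as I can tell this avoids the shared driver-side variable entirely, at the cost of bringing the results back to the driver for the save, as the first example does.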
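And here is a sketch of the Map[BatchTime, OffsetRange] idea from question 2, assuming the (RDD, Time) overloads of transform and foreachRDD so that queued batches no longer overwrite each other's offsets:

```scala
import scala.collection.concurrent.TrieMap
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Driver-side map keyed by batch time; queued batches get separate entries.
val offsetsByBatch = TrieMap.empty[Time, Array[OffsetRange]]

dstream.transform { (rdd, time) =>
  offsetsByBatch(time) = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.map(identity)  // stand-in for the real record-level transformation
 .foreachRDD { (rdd, time) =>
   // After a restore from checkpoint this returns None: the map is rebuilt
   // empty and transform is not called again for the restored RDDs (see 2.2).
   val offsets = offsetsByBatch.remove(time)
   // action + save results together with `offsets` in the DB
 }
```

This fixes the overwriting, but not the recovery problem described in 2), which is what questions 2.1 to 2.3 are about.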