Hi,

I'm currently implementing an exactly once mechanism based on the following
example:

https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala

The pseudocode is as follows:

dstream.transform  (store the batch's offset ranges in a driver-side variable)
dstream.map        (business logic)
dstream.foreachRDD (action + save the offsets in the DB)
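
In real code, I believe that pattern looks roughly like the sketch below, assuming the same direct stream API as in those examples (ssc, kafkaParams, topics, saveResults and saveOffsets are placeholder names of mine):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

    // driver-side variable meant to carry the current batch's offsets
    var offsetRanges = Array.empty[OffsetRange]

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    val mapped = stream.transform { rdd =>
      // runs on the driver each time a batch's job is generated
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.map { case (_, value) => value }

    mapped.foreachRDD { rdd =>
      saveResults(rdd)          // the action (placeholder)
      saveOffsets(offsetRanges) // may belong to a newer batch than rdd does
    }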

This code doesn't work if the processing time is greater than the batch
interval (the same problem as the windowed example:
https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/Windowed.scala).

Indeed, at each batch interval a new RDD is generated and queued, so
transform is called again and overwrites the global variable; by the time
a delayed batch finally saves its offsets, the variable no longer holds
the offset range that batch actually processed.

1) Do I need to work at the RDD level (inside one big foreachRDD, as in
the first example) instead of at the DStream level?
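
If so, I guess it would look something like this (writePartition and saveOffsets are placeholders; TransactionalPerBatch actually commits results and offsets together in one DB transaction):

    stream.foreachRDD { rdd =>
      // offsets come from the same RDD as the data, so they always match,
      // even when several batches are queued behind a slow one
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      rdd.foreachPartition { iter =>
        writePartition(iter) // write this partition's results (placeholder)
      }

      // commit the matching offsets once the writes succeed (placeholder)
      saveOffsets(offsetRanges)
    }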

2) I could use a Map[Time, OffsetRange] keyed by batch time as a global
variable (see the sketch after question 2.3), but after a crash this map
would no longer reflect generatedRDDs (the RDDs restored from the
checkpoint, prepared but not yet executed).
  2.1) Do I need to store this map somewhere external (e.g. Cassandra)?
  2.2) Is there a way to retrieve the restored offset ranges? (transform
is no longer called for the checkpointed RDDs)
  2.3) Is it possible to store some context along with the RDD so that it
gets serialized too?
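
For reference, here's roughly what I have in mind for the map-based workaround in 2); it keys the driver-side state by batch time so queued batches don't clobber each other, but as described above the map would be empty for batches re-generated from the checkpoint (saveResults and saveOffsets as before):

    import scala.collection.concurrent.TrieMap
    import org.apache.spark.streaming.Time

    // keyed by batch time so queued batches don't overwrite each other
    val offsetsByBatch = TrieMap.empty[Time, Array[OffsetRange]]

    val transformed = stream.transform { (rdd, batchTime) =>
      offsetsByBatch(batchTime) = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }

    transformed.foreachRDD { (rdd, batchTime) =>
      saveResults(rdd) // the action (placeholder)
      // look up and drop the entry for exactly this batch
      offsetsByBatch.remove(batchTime).foreach(saveOffsets)
    }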

Lots of questions, let me know if anything is unclear!
