I'm doing a POC to test recovery with Spark Streaming from Kafka.  I'm
using the technique of storing the offsets in Kafka itself, as described
at:

https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself

That is, I grab the offset ranges for each batch RDD before processing
it, and commit them back to Kafka when the batch is done.  The process
mostly works: when I shut down my streaming job and then start it up
again, it picks up roughly where it left off.

However, it looks like there is some overlap: a few messages are being
processed by both the old and the new job runs.  For example, see the
following log messages:

End of old job run:
17/04/27 20:04:40 INFO KafkaRecoveryTester$: Committing rdd offsets:
OffsetRange(topic: 'Datalake', partition: 0, range: [250959 ->
250962]);OffsetRange(topic: 'Datalake', partition: 1, range: [15 ->
18]);OffsetRange(topic: 'Datalake', partition: 3, range: [15 ->
18]);OffsetRange(topic: 'Datalake', partition: 2, range: [14 -> 17])

Start of new job run:
17/04/27 20:56:50 INFO KafkaRecoveryTester$: Processing rdd with offsets:
OffsetRange(topic: 'Datalake', partition: 3, range: [15 ->
100]);OffsetRange(topic: 'Datalake', partition: 1, range: [15 ->
100]);OffsetRange(topic: 'Datalake', partition: 2, range: [14 ->
100]);OffsetRange(topic: 'Datalake', partition: 0, range: [250959 ->
251044])


Notice that in partition 0, for example, the three messages with offsets
250959 through 250961 are processed twice: once by the old job and once
by the new one.  I would have expected the new run to start partition 0
at the range 250962 -> 251044, which would give exactly-once semantics.

Am I misunderstanding how this should work?  (That is, is exactly-once
semantics simply not possible here?)

Thanks,

DR
