I have an app that uses Kafka Streams to pull data from an `input` topic and
push it to an `output` topic with `processing.guarantee=exactly_once`. Because
of `exactly_once`, transaction markers are written to the output topic; the
markers occupy offsets, which creates gaps in the offset sequence. Let's call
this app `kafka-streamer`.
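
For context, here is a minimal sketch of `kafka-streamer`, assuming a simple
pass-through topology with String serdes (the application id and bootstrap
servers are placeholders; the only setting that matters for this question is
`processing.guarantee`):

```scala
import java.util.Properties

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}

object KafkaStreamer extends App {
  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streamer")
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  // exactly_once is what makes the broker write transaction markers, and those
  // markers occupy offsets in the output topic, creating the gaps
  props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once")

  // the real topology does more; a pass-through is enough to show the setup
  val builder = new StreamsBuilder()
  builder.stream[String, String]("input").to("output")

  val streams = new KafkaStreams(builder.build(), props)
  streams.start()
}
```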

Now I have another app that listens to this output topic (actually multiple
topics, subscribed via a regex Pattern) and processes the data using the
Spark Streaming Kafka 0-10 integration
(https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html).
Let's call this app `spark-streamer`.
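
For context, a minimal sketch of how `spark-streamer` subscribes (the topic
pattern, group id, batch interval and bootstrap servers are placeholders, and
the real processing logic is omitted):

```scala
import java.util.regex.Pattern

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.SubscribePattern
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object SparkStreamer extends App {
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "spark-streamer",
    "auto.offset.reset" -> "earliest"
  )

  val conf = new SparkConf().setAppName("spark-streamer")
  val ssc = new StreamingContext(conf, Seconds(10))

  // subscribe to all topics matching the pattern
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    SubscribePattern[String, String](Pattern.compile("output.*"), kafkaParams)
  )

  // just print what comes in; the real processing is irrelevant here
  stream.foreachRDD { rdd =>
    rdd.foreach(record => println(s"${record.topic}-${record.partition}@${record.offset}"))
  }

  ssc.start()
  ssc.awaitTermination()
}
```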

Due to the gaps, the first thing that happens is that Spark Streaming fails.
To fix this I enabled `spark.streaming.kafka.allowNonConsecutiveOffsets=true`
in the Spark config before creating the StreamingContext, roughly as sketched
below.
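
Roughly how I set it (the app name and batch interval are placeholders):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("spark-streamer")
  // without this, the 0-10 integration assumes consecutive offsets and fails
  // when it hits the gaps left by the transaction markers
  .set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")

val ssc = new StreamingContext(conf, Seconds(10))
```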
Now let's look at the issues I faced when starting `spark-streamer` (I also
went through some of the spark-streaming-kafka code in the limited time I
had):

1. Once `spark-streamer` starts, if there are unconsumed offsets in a topic
partition, it polls them but does not process them (no RDDs are created)
until some new message is pushed to that partition after the app has started.
Code:
https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala#L160
- I can see that we poll the data, but I'm not sure where the code that
processes it lives. In any case, when I run the app I'm fairly sure the data
is polled in `compactedStart()` but not processed until `compactedNext()` is
called.
2. In `compactedNext()`, if no data is polled within 120s (the default
timeout), an exception is thrown and my app simply crashes. Code:
https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala#L178
- Why do we throw an exception instead of continuing to poll, the way a
normal KafkaConsumer would behave (see the plain-consumer sketch right after
this list)?
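
For comparison, this is the kind of plain-consumer loop I have in mind for
question 2: an empty poll is not an error, the loop just polls again (all
property values and the topic pattern here are placeholders):

```scala
import java.time.Duration
import java.util.Properties
import java.util.regex.Pattern

import org.apache.kafka.clients.consumer.KafkaConsumer

object PlainConsumer extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", "plain-consumer")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(Pattern.compile("output.*"))

  while (true) {
    // if nothing arrives within the timeout we simply poll again,
    // instead of throwing and crashing the app
    val records = consumer.poll(Duration.ofMillis(500))
    records.forEach { record =>
      println(s"${record.topic}-${record.partition}@${record.offset}: ${record.value}")
    }
  }
}
```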

It would be a great help if somebody could help me out with the two questions
listed above!

-- 
Thanks and Best Regards,
Rishabh
