[ 
https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15453884#comment-15453884
 ] 

Sean McKibben commented on SPARK-17147:
---------------------------------------

I think Kafka's log compaction's design is still intended for sequential 
reading, even if the offsets are not consecutive for a compacted topic. Kafka's 
internal log cleaner process copies log segments to new files which have been 
compacted, so the messages are still stored sequentially even if the offset 
metadata for them increases by more than one. The typical consumer just does a 
poll() to get the next records, regardless of their offsets, but this Spark's 
CachedKafkaConsumer checks the offset of each record before calling poll(), and 
if that offset isn't the previous record's offset +1, it's going to call 
consumer.seek() before the next poll(), which I think is producing the dramatic 
slowdown I've seen.
It is certainly possible, using a non-Spark Kafka consumer, to get equivalent 
read speeds regardless of whether a topic is compacted.
I think the interplay between the CachedKafkaConsumer and the KafkaRDD might 
need to be adjusted. I haven't looked to see if more than one KafkaRDD will 
ever be asking for records from a single CachedKafkaConsumer instance, but 
since CachedKafkaConsumer was inspecting each offset to see if it was exactly 
the offset requested, and not just >= the requested offset, I'm guessing there 
was a reason. 

The main issue here is that it's becoming apparent that Kafka consumers can't 
assume monotonically incrementing offsets. Unfortunately that is an assumption 
that Spark-Kafka was making, and I think that assumption will need to be 
removed.

> Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets
> ------------------------------------------------------------------------
>
>                 Key: SPARK-17147
>                 URL: https://issues.apache.org/jira/browse/SPARK-17147
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 2.0.0
>            Reporter: Robert Conrad
>
> When Kafka does log compaction offsets often end up with gaps, meaning the 
> next requested offset will be frequently not be offset+1. The logic in 
> KafkaRDD & CachedKafkaConsumer has a baked in assumption that the next offset 
> will always be just an increment of 1 above the previous offset. 
> I have worked around this problem by changing CachedKafkaConsumer to use the 
> returned record's offset, from:
> {{nextOffset = offset + 1}}
> to:
> {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
> {{requestOffset += 1}}
> to:
> {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer).
> There's a strong possibility that I have misconstrued how to use the 
> streaming kafka consumer, and I'm happy to close this out if that's the case. 
> If, however, it is supposed to support non-consecutive offsets (e.g. due to 
> log compaction) I am also happy to contribute a PR.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to