[ https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15453884#comment-15453884 ]
Sean McKibben edited comment on SPARK-17147 at 9/1/16 1:08 AM:
---------------------------------------------------------------

I think Kafka's log compaction is still designed for sequential reading, even if the offsets are not consecutive in a compacted topic. Kafka's internal log cleaner copies log segments to new, compacted files, so the messages are still stored sequentially even if the offset metadata increases by more than one between records. The typical consumer just calls poll() to get the next records, regardless of their offsets, but Spark's CachedKafkaConsumer checks each record's offset, and if it isn't the previous record's offset + 1, it calls consumer.seek() before the next poll(). I think that is what produces the dramatic slowdown I've seen. It is certainly possible, using a non-Spark Kafka consumer, to get equivalent read speeds regardless of whether a topic is compacted.

I think the interplay between CachedKafkaConsumer and KafkaRDD might need to be adjusted. I haven't checked whether more than one KafkaRDD will ever ask for records from a single CachedKafkaConsumer instance, but since CachedKafkaConsumer was inspecting each offset to see whether it was exactly the offset requested, and not just >= the requested offset, I'm guessing there was a reason.

The main issue is that it's becoming apparent that Kafka consumers can't assume consecutively increasing offsets. Unfortunately that is an assumption Spark-Kafka was making, and I think it will need to be removed. (Edit: changed "monotonically" to "consecutively" above, since consumers _can_ assume an ever-increasing set of offsets, just not consecutively increasing ones.)
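To make the comparison concrete, here is a minimal sketch of the kind of plain (non-Spark) consumer loop described above, reading a compacted topic with the Kafka 0.10 client. It never seeks between polls, so the offset gaps left by compaction cost nothing. The broker address, topic name, and group id are illustrative placeholders, not anything from this issue:

{code:scala}
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

object PlainCompactedReader {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")   // placeholder broker
    props.put("group.id", "compacted-reader")          // placeholder group id
    props.put("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("compacted-topic"))
    try {
      while (true) {
        // poll() just returns the next batch; on a compacted topic the
        // offsets may jump (e.g. 5, 9, 23), and this loop never notices.
        for (record <- consumer.poll(500).asScala) {
          println(s"offset=${record.offset} key=${record.key}")
        }
      }
    } finally {
      consumer.close()
    }
  }
}
{code}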
> Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets
> ------------------------------------------------------------------------
>
>                 Key: SPARK-17147
>                 URL: https://issues.apache.org/jira/browse/SPARK-17147
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 2.0.0
>            Reporter: Robert Conrad
>
> When Kafka does log compaction, offsets often end up with gaps, meaning the
> next requested offset will frequently not be offset + 1. The logic in
> KafkaRDD & CachedKafkaConsumer has a baked-in assumption that the next offset
> will always be just an increment of 1 above the previous offset.
> I have worked around this problem by changing CachedKafkaConsumer to use the
> returned record's offset, from:
> {{nextOffset = offset + 1}}
> to:
> {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
> {{requestOffset += 1}}
> to:
> {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer.)
> There's a strong possibility that I have misconstrued how to use the
> streaming Kafka consumer, and I'm happy to close this out if that's the case.
> If, however, it is supposed to support non-consecutive offsets (e.g. due to
> log compaction) I am also happy to contribute a PR.
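To sketch what the described workaround looks like in context, here is an illustrative reconstruction, not the actual CachedKafkaConsumer source; the class name and helper are made up:

{code:scala}
import java.util.Collections
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

// Hypothetical buffered per-partition reader in the style of
// CachedKafkaConsumer, adjusted to tolerate compaction gaps.
class GapTolerantConsumer[K, V](consumer: KafkaConsumer[K, V], tp: TopicPartition) {
  private var buffer: java.util.Iterator[ConsumerRecord[K, V]] =
    Collections.emptyIterator()
  private var nextOffset: Long = -2L

  private def seekAndPoll(offset: Long, timeoutMs: Long): Unit = {
    consumer.seek(tp, offset)
    buffer = consumer.poll(timeoutMs).records(tp).iterator()
  }

  def get(offset: Long, timeoutMs: Long): ConsumerRecord[K, V] = {
    // Only seek when the caller asks for something other than what we
    // expect next; with a gap-tolerant caller this stays on the fast path.
    if (offset != nextOffset || !buffer.hasNext) seekAndPoll(offset, timeoutMs)
    val record = buffer.next()
    // Relaxed assert: requiring ==, as the original did, fails on every gap.
    assert(record.offset >= offset,
      s"got offset ${record.offset} before requested offset $offset")
    nextOffset = record.offset + 1 // the fix: record.offset + 1, not offset + 1
    record
  }
}
{code}

A caller in the style of KafkaRDD would then advance with {{requestOffset = record.offset + 1}} rather than {{requestOffset += 1}}, so its next request matches nextOffset and no extra seek-and-poll round trip is issued.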