[ https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15430872#comment-15430872 ]
Cody Koeninger commented on SPARK-17147:
----------------------------------------

My point is more that this probably isn't just two lines in CachedKafkaConsumer. There's other code, both within the Spark Streaming connector and in users of the connector, that assumes an offset range from..until contains exactly (until - from) messages.

I haven't seen what Databricks is coming up with for the Structured Streaming connector, but I'd imagine that assuming offsets are contiguous would certainly simplify that implementation, and might actually be necessary depending on how recovery works.

This might be as simple as your change plus logging a warning when a stream starts on a compacted topic, but we need to think through the issues here.

> Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets
> ------------------------------------------------------------------------
>
>                 Key: SPARK-17147
>                 URL: https://issues.apache.org/jira/browse/SPARK-17147
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 2.0.0
>            Reporter: Robert Conrad
>
> When Kafka does log compaction, offsets often end up with gaps, meaning the
> next requested offset will frequently not be offset+1. The logic in
> KafkaRDD & CachedKafkaConsumer has a baked-in assumption that the next offset
> will always be just an increment of 1 above the previous offset.
>
> I have worked around this problem by changing CachedKafkaConsumer to use the
> returned record's offset, from:
> {{nextOffset = offset + 1}}
> to:
> {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
> {{requestOffset += 1}}
> to:
> {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer.)
>
> There's a strong possibility that I have misconstrued how to use the
> streaming Kafka consumer, and I'm happy to close this out if that's the case.
> If, however, it is supposed to support non-consecutive offsets (e.g. due to
> log compaction), I am also happy to contribute a PR.
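To make the contiguity assumption concrete, here is a minimal Scala sketch that simulates reading an offset range from a compacted partition without any Kafka dependency. It is not Spark's code: `Record`, `fetchNext`, `readRange`, `assumedCount` and `offsetMismatches` are hypothetical names for illustration, and `fetchNext` assumes, as a simplification, that asking for an offset removed by compaction yields the next surviving record. It shows that (until - from) overstates the record count, that requesting every offset in turn hits offsets that no longer exist, and that advancing to `record.offset + 1`, as in the workaround quoted above, returns exactly the surviving records.

```scala
// Hypothetical model of one record in a partition (not a Kafka or Spark type).
final case class Record(offset: Long, value: String)

object CompactedRangeSketch {
  // A compacted partition: offsets 2, 3, 5 and 6 were removed by compaction.
  val log: Vector[Record] =
    Vector(Record(0L, "a"), Record(1L, "b"), Record(4L, "c"), Record(7L, "d"))

  // Simplifying assumption: fetching an offset that was compacted away returns
  // the next surviving record with a greater offset, or None past the log end.
  def fetchNext(requested: Long): Option[Record] =
    log.find(_.offset >= requested)

  // What code relying on contiguous offsets believes the range contains.
  def assumedCount(fromOffset: Long, untilOffset: Long): Long =
    untilOffset - fromOffset

  // Contiguous loop: request every offset and expect exactly that offset back.
  // Returns the requested offsets where that expectation fails.
  def offsetMismatches(fromOffset: Long, untilOffset: Long): Vector[Long] =
    (fromOffset until untilOffset).toVector
      .filter(o => !fetchNext(o).exists(_.offset == o))

  // Workaround from the issue: advance to record.offset + 1 rather than
  // requestOffset + 1, so gaps left by compaction are skipped.
  def readRange(fromOffset: Long, untilOffset: Long): Vector[Record] = {
    @annotation.tailrec
    def loop(requestOffset: Long, acc: Vector[Record]): Vector[Record] =
      fetchNext(requestOffset) match {
        case Some(r) if r.offset < untilOffset => loop(r.offset + 1, acc :+ r)
        case _                                 => acc
      }
    loop(fromOffset, Vector.empty)
  }

  def main(args: Array[String]): Unit = {
    val records = readRange(0L, 8L)
    println(records.map(_.offset).mkString(","))                         // 0,1,4,7
    println(s"assumed=${assumedCount(0L, 8L)} actual=${records.size}")   // assumed=8 actual=4
    println(offsetMismatches(0L, 8L).mkString(","))                      // 2,3,5,6
  }
}
```

The workaround only fixes iteration. As the comment points out, any code that sizes or validates a range by (until - from) would still report 8 messages where only 4 exist.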