[ https://issues.apache.org/jira/browse/STORM-2624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16088106#comment-16088106 ]
Stig Rohde Døssing commented on STORM-2624: ------------------------------------------- Hi [~sergiyk]. Please open a PR at https://github.com/apache/storm, the code is more likely to get reviewed there, and the code can be validated via the CI setup. Thanks :) > Kafka Storm Spout: Got fetch request with offset out of range > ------------------------------------------------------------- > > Key: STORM-2624 > URL: https://issues.apache.org/jira/browse/STORM-2624 > Project: Apache Storm > Issue Type: Bug > Components: storm-kafka > Affects Versions: 1.0.1, 1.0.2, 1.1.0 > Reporter: Sergiy Kharytesku > > If partition offset is out of range then kafka spout stops emitting new > messages and keeps logging following warning: > 2016-10-26 11:11:31.070 o.a.s.k.KafkaUtils [WARN] > Partition{host=somehost.org:9092, topic=my-topic, partition=0} Got fetch > request with offset out of range: [3] > 2016-10-26 11:11:31.078 o.a.s.k.KafkaUtils [WARN] > Partition{host=somehost.org:9092, topic=my-topic, partition=0} Got fetch > request with offset out of range: [3] > ... > I believe the trivial fix is in PartitonManager.java in fill method > line 237: > {code:java} > long partitionLatestOffset = KafkaUtils.getOffset(_consumer, > _partition.topic, _partition.partition, kafka.api.OffsetRequest.LatestTime()); > if (partitionLatestOffset < offset) { > offset = partitionLatestOffset; > } else { > offset = KafkaUtils.getOffset(_consumer, _partition.topic, > _partition.partition, kafka.api.OffsetRequest.EarliestTime()); > } > {code} > change to: > {code:java} > offset = KafkaUtils.getOffset(_consumer, _partition.topic, > _partition.partition, _spoutConfig.startOffsetTime); > {code} > line 259: > {code:java} > if (offset > _emittedToOffset) { > _lostMessageCount.incrBy(offset - _emittedToOffset); > _emittedToOffset = offset; > LOG.warn("{} Using new offset: {}", _partition, > _emittedToOffset); > } > {code} > change to: > {code:java} > if (offset > _emittedToOffset) { > _lostMessageCount.incrBy(offset - _emittedToOffset); > } > _emittedToOffset = offset; > LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset); > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)