[ https://issues.apache.org/jira/browse/STORM-645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xin Wang closed STORM-645.
--------------------------
    Resolution: Fixed

> KafkaUtils repeat fetch messages which offset is out of range
> -------------------------------------------------------------
>
>                 Key: STORM-645
>                 URL: https://issues.apache.org/jira/browse/STORM-645
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka
>    Affects Versions: 0.9.2-incubating, 0.9.3
>            Reporter: Xin Wang
>            Assignee: Xin Wang
>
> KafkaUtils repeatedly fetches messages whose offset is out of range.
> This happens when the failed list (SortedSet<Long> failed) is not empty and
> some offset in it is out of range (OutOfRange).
> [FIX]
> storm.kafka.PartitionManager.fill():
> ...
> try {
>     msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
> } catch (UpdateOffsetException e) {
>     _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig);
>     LOG.warn("Using new offset: {}", _emittedToOffset);
>     // fetch failed, so don't update the metrics
>     // fix bug: remove this offset from the failed list when it is OutOfRange
>     if (had_failed) {
>         failed.remove(offset);
>     }
>     return;
> }
> ...

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
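
The retry loop described in the issue can be reproduced in isolation. The following is only a minimal sketch, not the storm-kafka code: the class `FailedOffsetSketch`, the exception `OffsetOutOfRangeSignal` (a stand-in for `UpdateOffsetException`), the constant `EARLIEST_AVAILABLE`, and the `applyFix` switch are all hypothetical names introduced for illustration. It also assumes that `fill()` retries the smallest failed offset first and that a successful retry simply clears the offset, which is a simplification.

```java
import java.util.SortedSet;
import java.util.TreeSet;

class FailedOffsetSketch {
    /** Hypothetical stand-in for storm-kafka's UpdateOffsetException. */
    static class OffsetOutOfRangeSignal extends RuntimeException {}

    /** Hypothetical: smallest offset the broker still retains. */
    static final long EARLIEST_AVAILABLE = 100L;

    private final SortedSet<Long> failed = new TreeSet<>();
    private final boolean applyFix;
    private long emittedToOffset;
    int fetchAttempts = 0;

    FailedOffsetSketch(boolean applyFix, long emittedToOffset) {
        this.applyFix = applyFix;
        this.emittedToOffset = emittedToOffset;
    }

    /** Records an offset whose tuple failed and should be replayed. */
    void fail(long offset) {
        failed.add(offset);
    }

    /** Hypothetical fetch: rejects offsets that are no longer retained. */
    private void fetch(long offset) {
        fetchAttempts++;
        if (offset < EARLIEST_AVAILABLE) {
            throw new OffsetOutOfRangeSignal();
        }
    }

    /** Simplified fill(): retry the smallest failed offset, else read on. */
    void fill() {
        boolean hadFailed = !failed.isEmpty();
        long offset = hadFailed ? failed.first() : emittedToOffset;
        try {
            fetch(offset);
        } catch (OffsetOutOfRangeSignal e) {
            // Jump to an offset the broker can still serve.
            emittedToOffset = EARLIEST_AVAILABLE;
            // The fix: forget the unreachable offset so it is not retried.
            if (applyFix && hadFailed) {
                failed.remove(offset);
            }
            return;
        }
        if (hadFailed) {
            failed.remove(offset); // simplification: retry succeeded
        } else {
            emittedToOffset = offset + 1;
        }
    }

    SortedSet<Long> failedOffsets() {
        return failed;
    }

    public static void main(String[] args) {
        for (boolean fix : new boolean[] {false, true}) {
            FailedOffsetSketch pm = new FailedOffsetSketch(fix, 150L);
            pm.fail(42L); // 42 is below EARLIEST_AVAILABLE
            for (int i = 0; i < 3; i++) {
                pm.fill();
            }
            System.out.println("fix=" + fix + " failed=" + pm.failedOffsets());
        }
        // prints "fix=false failed=[42]" then "fix=true failed=[]"
    }
}
```

Without the removal, every call to `fill()` picks offset 42 again and fails the same way, so the sketch never advances. With it, the first call drops the offset and later calls continue from the reset position.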