Xin Wang created STORM-645:
------------------------------

             Summary: KafkaUtils repeat fetch messages which offset is out of range
                 Key: STORM-645
                 URL: https://issues.apache.org/jira/browse/STORM-645
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-kafka
    Affects Versions: 0.9.2-incubating, 0.9.3
            Reporter: Xin Wang
            Assignee: Xin Wang


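KafkaUtils repeatedly fetches messages whose offset is out of range.
This happens when the failed list (SortedSet<Long> failed) is not empty and
some offset in it is out of range (OffsetOutOfRange). Because that offset is
never removed from the list, each call to fill() tries to fetch it again.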

[FIX]
storm.kafka.PartitionManager.fill():
...
try {
    msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
} catch (UpdateOffsetException e) {
    _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig);
    LOG.warn("Using new offset: {}", _emittedToOffset);
    // fetch failed, so don't update the metrics

    // fix bug: remove this offset from the failed list when it is OutOfRange
    if (had_failed) {
        failed.remove(offset);
    }

    return;
}
...
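
The effect of the change can be illustrated with a small standalone model.
This is only a sketch and not the storm-kafka code: the class
FailedOffsetSketch, the EARLIEST/LATEST bounds, the simulated fetch() and
OutOfRangeException are all hypothetical stand-ins, and the success path is
heavily simplified. It assumes that fill() retries the first entry of the
failed set before fetching new offsets.

```java
import java.util.SortedSet;
import java.util.TreeSet;

// Simplified, hypothetical model of the retry loop; not the storm-kafka code.
class FailedOffsetSketch {
    // Assumed range of offsets the broker can still serve.
    static final long EARLIEST = 100L;
    static final long LATEST = 200L;

    // Stand-in for the out-of-range error (UpdateOffsetException in the report).
    static class OutOfRangeException extends RuntimeException {}

    final SortedSet<Long> failed = new TreeSet<>();
    long emittedToOffset = EARLIEST;
    final boolean applyFix;

    FailedOffsetSketch(boolean applyFix) {
        this.applyFix = applyFix;
    }

    // Simulated fetch: throws when the offset is outside [EARLIEST, LATEST].
    static void fetch(long offset) {
        if (offset < EARLIEST || offset > LATEST) {
            throw new OutOfRangeException();
        }
    }

    // One fill() pass; returns the offset it attempted to fetch.
    long fill() {
        boolean hadFailed = !failed.isEmpty();
        // Failed offsets are retried before new ones.
        long offset = hadFailed ? failed.first() : emittedToOffset;
        try {
            fetch(offset);
            if (hadFailed) {
                failed.remove(offset);        // retry succeeded
            } else {
                emittedToOffset = offset + 1; // advance past the fetched offset
            }
        } catch (OutOfRangeException e) {
            emittedToOffset = EARLIEST;       // reset to a valid offset
            if (applyFix && hadFailed) {
                failed.remove(offset);        // the proposed fix
            }
        }
        return offset;
    }

    public static void main(String[] args) {
        for (boolean fix : new boolean[] {false, true}) {
            FailedOffsetSketch pm = new FailedOffsetSketch(fix);
            pm.failed.add(5L); // offset 5 is below EARLIEST, so it is out of range
            System.out.println("fix=" + fix + ": "
                    + pm.fill() + ", " + pm.fill() + ", " + pm.fill());
        }
    }
}
```

Without the fix, every pass in this model attempts offset 5 again; with the
fix, the stale offset is dropped after the first failure and later passes
continue from the reset offset.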



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
