Xin Wang created STORM-645:
------------------------------

             Summary: KafkaUtils repeat fetch messages which offset is out of range
                 Key: STORM-645
                 URL: https://issues.apache.org/jira/browse/STORM-645
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-kafka
    Affects Versions: 0.9.2-incubating, 0.9.3
            Reporter: Xin Wang
            Assignee: Xin Wang

KafkaUtils repeatedly fetches messages whose offset is out of range. This happens when the failed list (SortedSet<Long> failed) is not empty and one of the offsets in it is out of range. Because that offset stays in the failed list after the fetch fails, fill() requests it again on every call.

[FIX]
storm.kafka.PartitionManager.fill():

    ...
    try {
        msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
    } catch (UpdateOffsetException e) {
        _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig);
        LOG.warn("Using new offset: {}", _emittedToOffset);
        // fetch failed, so don't update the metrics
        // fix bug: remove this offset from the failed list when it is out of range
        if (had_failed) {
            failed.remove(offset);
        }
        return;
    }
    ...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
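
For illustration only, the retry loop described above can be reproduced with a small self-contained sketch that does not use storm-kafka at all. Everything in it is a hypothetical stand-in: the class FailedOffsetRetrySketch, the fetch() method (in place of KafkaUtils.fetchMessages), the exception type (in place of UpdateOffsetException), and the earliestAvailable field (in place of the offset returned by KafkaUtils.getOffset). It also assumes that fill() retries failed.first() whenever the failed set is non-empty, which the snippet in this issue does not show.

```java
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical model of the retry behaviour; not the storm-kafka implementation.
class FailedOffsetRetrySketch {
    // Stand-in for the exception thrown when the requested offset is out of range.
    static class OutOfRangeSketchException extends RuntimeException {}

    final SortedSet<Long> failed = new TreeSet<>();
    final long earliestAvailable;     // assumed oldest offset still held by the broker
    final boolean removeOnOutOfRange; // true models the proposed fix
    long emittedToOffset;
    int outOfRangeFetches = 0;

    FailedOffsetRetrySketch(long earliestAvailable, boolean removeOnOutOfRange) {
        this.earliestAvailable = earliestAvailable;
        this.removeOnOutOfRange = removeOnOutOfRange;
        this.emittedToOffset = earliestAvailable;
    }

    // Stand-in for the fetch call: rejects offsets older than earliestAvailable.
    void fetch(long offset) {
        if (offset < earliestAvailable) {
            outOfRangeFetches++;
            throw new OutOfRangeSketchException();
        }
    }

    void fill() {
        boolean hadFailed = !failed.isEmpty();
        // Assumption: failed offsets are retried before new ones are fetched.
        long offset = hadFailed ? failed.first() : emittedToOffset;
        try {
            fetch(offset);
        } catch (OutOfRangeSketchException e) {
            emittedToOffset = earliestAvailable;
            if (hadFailed && removeOnOutOfRange) {
                failed.remove(offset); // the proposed fix: stop retrying this offset
            }
            return;
        }
        if (hadFailed) {
            failed.remove(offset);     // simplification: a successful retry clears the entry
        } else {
            emittedToOffset = offset + 1;
        }
    }

    public static void main(String[] args) {
        for (boolean withFix : new boolean[] {false, true}) {
            FailedOffsetRetrySketch pm = new FailedOffsetRetrySketch(100L, withFix);
            pm.failed.add(5L); // offset 5 is older than anything still available
            for (int i = 0; i < 3; i++) {
                pm.fill();
            }
            System.out.println("fix=" + withFix
                    + " outOfRangeFetches=" + pm.outOfRangeFetches
                    + " failed=" + pm.failed
                    + " emittedToOffset=" + pm.emittedToOffset);
        }
    }
}
```

In this model, without the removal every call to fill() asks for the same stale offset and emittedToOffset never advances. With the removal, the stale offset is dropped after the first failed fetch and later calls continue from the reset offset.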