[ https://issues.apache.org/jira/browse/STORM-2229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hugo Louro closed STORM-2229.
-----------------------------

> KafkaSpout does not resend failed tuples
> ----------------------------------------
>
>                 Key: STORM-2229
>                 URL: https://issues.apache.org/jira/browse/STORM-2229
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.0.0, 1.0.1, 1.0.2
>            Reporter: Matthias Klein
>
> When the topology fails a tuple, it is never resent by the KafkaSpout. This
> can easily be shown by constructing a small topology failing every tuple.
> Apparent reason:
> {code}
> public class KafkaSpout<K, V> extends BaseRichSpout {
>     //...
>     private void doSeekRetriableTopicPartitions() {
>         final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
>         for (TopicPartition rtp : retriableTopicPartitions) {
>             final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
>             if (offsetAndMeta != null) {
>                 // seek to the next offset that is ready to commit in next commit cycle
>                 kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);
>             } else {
>                 // Seek to last committed offset <== Does seek to end of partition
>                 kafkaConsumer.seekToEnd(toArrayList(rtp));
>             }
>         }
>     }
>     //...
> }
> {code}
> The code seeks to the end of the partition instead of seeking to the first
> uncommitted offset.
> Preliminary fix (worked for me, but needs to be checked by an expert)
> {code}
>     private void doSeekRetriableTopicPartitions() {
>         final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
>         for (TopicPartition rtp : retriableTopicPartitions) {
>             final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
>             if (offsetAndMeta != null) {
>                 // seek to the next offset that is ready to commit in next commit cycle
>                 kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);
>             } else {
>                 OffsetAndMetadata committed = kafkaConsumer.committed(rtp);
>                 if (committed == null) {
>                     // No offsets committed yet for this partition - start from beginning
>                     kafkaConsumer.seekToBeginning(toArrayList(rtp));
>                 } else {
>                     // Seek to first uncommitted offset
>                     kafkaConsumer.seek(rtp, committed.offset() + 1);
>                 }
>             }
>         }
>     }
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
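
The difference between the reported behaviour and the preliminary fix can be sketched without a broker. The following is a minimal model only, not the spout's code: `SeekTarget`, `SeekDecision`, `reported` and `proposed` are hypothetical names invented for this illustration, and plain `Long` values stand in for `OffsetAndMetadata` (`null` meaning "no offset available"). The `+ 1` on the committed offset is copied from the snippet in the issue, which treats the stored offset as the last processed one; that convention is taken from the report and not verified here.

```java
// Simplified model of the seek decision in doSeekRetriableTopicPartitions().
// No Kafka or Storm classes are used; every name below is hypothetical.
final class SeekTarget {
    enum Kind { OFFSET, BEGINNING, END }

    final Kind kind;
    final long offset; // meaningful only when kind == OFFSET

    private SeekTarget(Kind kind, long offset) {
        this.kind = kind;
        this.offset = offset;
    }

    static SeekTarget at(long offset) { return new SeekTarget(Kind.OFFSET, offset); }
    static SeekTarget beginning()     { return new SeekTarget(Kind.BEGINNING, -1L); }
    static SeekTarget end()           { return new SeekTarget(Kind.END, -1L); }

    @Override
    public String toString() {
        return kind == Kind.OFFSET ? "OFFSET(" + offset + ")" : kind.name();
    }
}

final class SeekDecision {
    // Behaviour described in the issue: with no offset ready to commit,
    // the consumer jumps to the end of the partition, skipping failed tuples.
    static SeekTarget reported(Long nextCommitOffset) {
        if (nextCommitOffset != null) {
            return SeekTarget.at(nextCommitOffset + 1);
        }
        return SeekTarget.end();
    }

    // Behaviour of the preliminary fix: fall back to the committed offset,
    // or to the beginning of the partition if nothing was committed yet.
    static SeekTarget proposed(Long nextCommitOffset, Long committedOffset) {
        if (nextCommitOffset != null) {
            return SeekTarget.at(nextCommitOffset + 1);
        }
        if (committedOffset == null) {
            return SeekTarget.beginning();
        }
        return SeekTarget.at(committedOffset + 1);
    }

    public static void main(String[] args) {
        // Every tuple has failed, so no offset is ready to commit.
        System.out.println(reported(null));        // END
        System.out.println(proposed(null, 41L));   // OFFSET(42)
        System.out.println(proposed(null, null));  // BEGINNING
        // Once an offset is ready to commit, both variants agree.
        System.out.println(reported(50L));         // OFFSET(51)
        System.out.println(proposed(50L, 41L));    // OFFSET(51)
    }
}
```

In the "fail every tuple" scenario from the report, nothing is ever acked, so the first branch is never taken and only the fallback branch determines whether failed tuples are read again.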