[ https://issues.apache.org/jira/browse/STORM-2229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hugo Louro closed STORM-2229.
-----------------------------

> KafkaSpout does not resend failed tuples
> ----------------------------------------
>
>                 Key: STORM-2229
>                 URL: https://issues.apache.org/jira/browse/STORM-2229
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.0.0, 1.0.1, 1.0.2
>            Reporter: Matthias Klein
>
> When the topology fails a tuple, it is never resent by the KafkaSpout. This can easily be shown by constructing a small topology that fails every tuple (a sketch of such a bolt is included below the code).
> Apparent reason:
> {code}
> public class KafkaSpout<K, V> extends BaseRichSpout {
>     //...
>     private void doSeekRetriableTopicPartitions() {
>         final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
>         for (TopicPartition rtp : retriableTopicPartitions) {
>             final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
>             if (offsetAndMeta != null) {
>                 kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
>             } else {
>                 kafkaConsumer.seekToEnd(toArrayList(rtp));  // "Seek to last committed offset" <== actually seeks to the end of the partition
>             }
>         }
>     }
> {code}
> The code seeks to the end of the partition instead of seeking to the first uncommitted offset.
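> For reference, here is a minimal sketch of the kind of reproduction bolt mentioned above: it fails every tuple it receives, so every tuple emitted by the KafkaSpout should eventually be replayed. The class name is hypothetical and not part of the original report.
> {code}
> import java.util.Map;
>
> import org.apache.storm.task.OutputCollector;
> import org.apache.storm.task.TopologyContext;
> import org.apache.storm.topology.OutputFieldsDeclarer;
> import org.apache.storm.topology.base.BaseRichBolt;
> import org.apache.storm.tuple.Tuple;
>
> // Hypothetical reproduction bolt: fails every incoming tuple. Wired after the KafkaSpout, e.g.
> //   builder.setBolt("fail", new FailEveryTupleBolt()).shuffleGrouping("kafka-spout");
> // it should cause every tuple to be resent, but with the code above the tuples are skipped.
> public class FailEveryTupleBolt extends BaseRichBolt {
>     private OutputCollector collector;
>
>     @Override
>     public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
>         this.collector = collector;
>     }
>
>     @Override
>     public void execute(Tuple tuple) {
>         collector.fail(tuple);  // fail unconditionally instead of acking
>     }
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         // this bolt emits nothing
>     }
> }
> {code}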
> Preliminary fix (it worked for me, but needs to be checked by an expert):
> {code}
>     private void doSeekRetriableTopicPartitions() {
>         final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
>         for (TopicPartition rtp : retriableTopicPartitions) {
>             final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
>             if (offsetAndMeta != null) {
>                 kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
>             } else {
>                 final OffsetAndMetadata committed = kafkaConsumer.committed(rtp);
>                 if (committed == null) {
>                     // No offsets committed yet for this partition - start from the beginning
>                     kafkaConsumer.seekToBeginning(toArrayList(rtp));
>                 } else {
>                     // Seek to the first uncommitted offset
>                     kafkaConsumer.seek(rtp, committed.offset() + 1);
>                 }
>             }
>         }
>     }
> {code}
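> With this change, a retriable partition that currently has no acked offsets is repositioned at the first uncommitted offset (or at the beginning of the partition when nothing has been committed yet), so the failed tuples are polled and emitted again instead of being skipped by the seek to the end of the partition.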



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
