Github user apiwoni commented on the issue:

    https://github.com/apache/storm/pull/1679
  
    @srdo I'm not sure why you expect poll to return records after seekToEnd. seekToEnd seeks to the end of the log in Kafka, and it doesn't actually perform the seek until poll or position is called on the consumer.
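
    A toy model in Java shows the lazy behavior. `LazySeekModel` and `ToyConsumer` are hypothetical stand-ins, not the real `KafkaConsumer` API, and the offsets are made up. The model only shows that `seekToEnd` records a pending reset, and that the reset is resolved against the log end offset when `position` (or, in the real consumer, `poll`) runs.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model, NOT the real KafkaConsumer: seekToEnd() only marks a
// partition as needing a reset to the log end; the end offset is looked up
// later, when position() is called (the real consumer also resolves it in poll()).
public class LazySeekModel {

    static final class ToyConsumer {
        private final Map<String, Long> logEndOffsets;   // simulated broker state, shared with the caller
        private final Map<String, Long> positions = new HashMap<>();
        private final Set<String> pendingResets = new HashSet<>();

        ToyConsumer(Map<String, Long> logEndOffsets) {
            this.logEndOffsets = logEndOffsets;
        }

        // Eager: the position changes immediately and any pending reset is dropped.
        void seek(String partition, long offset) {
            positions.put(partition, offset);
            pendingResets.remove(partition);
        }

        // Lazy: only records the request; no end offset is looked up here.
        void seekToEnd(String partition) {
            pendingResets.add(partition);
        }

        boolean isResetPending(String partition) {
            return pendingResets.contains(partition);
        }

        // Resolves a pending reset against the log end offset as it is at call time.
        long position(String partition) {
            if (pendingResets.remove(partition)) {
                positions.put(partition, logEndOffsets.get(partition));
            }
            Long current = positions.get(partition);
            if (current == null) {
                throw new IllegalStateException("No position for " + partition);
            }
            return current;
        }
    }

    public static void main(String[] args) {
        Map<String, Long> logEnd = new HashMap<>();
        logEnd.put("TOPIC-0", 220000L);

        ToyConsumer consumer = new ToyConsumer(logEnd);
        consumer.seek("TOPIC-0", 219355L);
        consumer.seekToEnd("TOPIC-0");
        System.out.println(consumer.isResetPending("TOPIC-0")); // true

        logEnd.put("TOPIC-0", 220010L);                          // records appended before the reset is resolved
        System.out.println(consumer.position("TOPIC-0"));       // 220010
        System.out.println(consumer.isResetPending("TOPIC-0")); // false
    }
}
```

    In the model, the lookup is deferred. Records appended between `seekToEnd` and the next `position` call therefore also move the resolved position.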
    
    Based on the KafkaSpout loop and the logs, here's what I see and expect:
    
    org.apache.storm.kafka.spout.KafkaSpout#pollKafkaBroker
        org.apache.storm.kafka.spout.KafkaSpout#doSeekRetriableTopicPartitions
            org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff#retriableTopicPartitions
                // Log message indicates that the messageId is ready to be retried
                Topic partitions with entries ready to be retried [[TOPIC-0]]
            org.apache.storm.kafka.spout.KafkaSpout.OffsetEntry#findNextCommitOffset(TOPIC-0)
                // Returns null because of a non-continuous offset
                topic-partition [TOPIC-0] has non-continuous offset [219355]. It will be processed in a subsequent batch.
            org.apache.kafka.clients.consumer.KafkaConsumer#seekToEnd(TOPIC-0)
                // This should seek to the last committed offset for the retriable partition, if one exists, else to the last offset
                org.apache.kafka.clients.consumer.internals.SubscriptionState#needOffsetReset(OffsetResetStrategy.LATEST)
        org.apache.kafka.clients.consumer.KafkaConsumer#poll
            // I think poll should return the records to be retried here, even though the committed offset is 219355
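
    The retry path in the trace can be sketched as a simplified Java model. The model scans the acked offsets that follow the committed one and stops at the first gap, which is the "non-continuous offset" case in the log. It then chooses between `seek` and `seekToEnd`. `RetrySeekModel` and its method signatures are hypothetical. So are two conventions: `committedOffset` means the last committed offset, and the seek target is the offset after the contiguous run. This is not Storm's actual `OffsetEntry` code.

```java
import java.util.Arrays;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical, simplified model of the per-partition retry logic traced above.
// It is not Storm's source; names, types and conventions are illustrative only.
public class RetrySeekModel {

    /**
     * Walks acked offsets in ascending order, starting right after
     * committedOffset (assumed here to be the last committed offset), and
     * returns the end of the contiguous run. Returns null when the very next
     * offset is missing, i.e. the "non-continuous offset" case from the log.
     */
    static Long findNextCommitOffset(long committedOffset, NavigableSet<Long> ackedOffsets) {
        Long next = null;
        long expected = committedOffset + 1;
        for (long offset : ackedOffsets.tailSet(expected, true)) {
            if (offset != expected) {
                break; // gap: an earlier offset has not been acked yet (e.g. awaiting retry)
            }
            next = offset;
            expected = offset + 1;
        }
        return next;
    }

    /** Describes which consumer call this model would make for a retriable partition. */
    static String seekActionFor(String partition, long committedOffset, NavigableSet<Long> ackedOffsets) {
        Long next = findNextCommitOffset(committedOffset, ackedOffsets);
        if (next != null) {
            // Assumption: resume right after the contiguous run of acked offsets.
            return "seek(" + partition + ", " + (next + 1) + ")";
        }
        // No committable offset found, so fall back to seekToEnd.
        return "seekToEnd(" + partition + ")";
    }

    public static void main(String[] args) {
        // 101 is missing, so nothing after the committed offset is committable yet.
        NavigableSet<Long> withGap = new TreeSet<>(Arrays.asList(102L, 103L));
        System.out.println(seekActionFor("TOPIC-0", 100L, withGap)); // seekToEnd(TOPIC-0)

        // 101 and 102 are contiguous; 104 is not, so the run stops at 102.
        NavigableSet<Long> withRun = new TreeSet<>(Arrays.asList(101L, 102L, 104L));
        System.out.println(seekActionFor("TOPIC-0", 100L, withRun)); // seek(TOPIC-0, 103)
    }
}
```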
    
    Are you sure that, after seekToEnd was called, another call was made to commit the offset, and that the subsequent call to seekToEnd went not to the last committed offset, as the comment in KafkaSpout states, but to the last offset?
    seekToEnd calls org.apache.kafka.clients.consumer.internals.SubscriptionState#needOffsetReset(OffsetResetStrategy.LATEST), and my understanding of OffsetResetStrategy.LATEST is this: if no offset exists on the broker (the __consumer_offsets topic or ZooKeeper), the position should indeed be set to the last offset. If a committed offset already exists, that is where seekToEnd should go.
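
    A minimal sketch of that expectation, written as a hypothetical Java helper, gives a reference point to compare against what `position()` actually returns after `seekToEnd`. `ExpectedResetModel` and `expectedPositionAfterSeekToEnd` are illustrative names, and the offsets are made up. The helper models the expectation stated above, not Kafka's actual reset code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper modeling the expectation described above: prefer a stored
// committed offset, and fall back to the log end offset only when none exists.
// It is not derived from Kafka's SubscriptionState or Fetcher implementation.
public class ExpectedResetModel {

    static long expectedPositionAfterSeekToEnd(String partition,
                                               Map<String, Long> committedOffsets,
                                               long logEndOffset) {
        Long committed = committedOffsets.get(partition);
        if (committed == null) {
            // Nothing stored (neither in __consumer_offsets nor in ZooKeeper): use the log end.
            return logEndOffset;
        }
        // A committed offset exists, so the expectation is to resume from it.
        return committed;
    }

    public static void main(String[] args) {
        Map<String, Long> committed = new HashMap<>();
        System.out.println(expectedPositionAfterSeekToEnd("TOPIC-0", committed, 220000L)); // 220000

        committed.put("TOPIC-0", 219355L);
        System.out.println(expectedPositionAfterSeekToEnd("TOPIC-0", committed, 220000L)); // 219355
    }
}
```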


