Github user apiwoni commented on the issue:
https://github.com/apache/storm/pull/1679
@srdo
> I'm not sure why you expect the poll to return records after
> seekToEnd? seekToEnd seeks to the end of the log in Kafka, and it doesn't
> actually do the seek until poll or position is called on the consumer.
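The lazy behaviour described in the quote can be illustrated with a toy model. This is a hypothetical sketch, not the Kafka client API: the class LazySeekModel and its methods are invented for illustration, and partitions are plain strings instead of TopicPartition. seekToEnd only marks the partition, and the offset is resolved against the log end as it stands when position() is called (in the real client, when poll or position is called).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical toy model (NOT the Kafka client API) of a consumer whose
 * seekToEnd is evaluated lazily, as described for KafkaConsumer#seekToEnd.
 */
public class LazySeekModel {
    private final Map<String, Long> logEndOffsets = new HashMap<>();
    private final Map<String, Long> positions = new HashMap<>();
    private final Set<String> pendingLatestReset = new HashSet<>();

    /** Appends records to a partition, advancing its log end offset. */
    public void append(String partition, long records) {
        logEndOffsets.merge(partition, records, Long::sum);
    }

    /** Eager seek: the position is set now and any pending reset is cancelled. */
    public void seek(String partition, long offset) {
        positions.put(partition, offset);
        pendingLatestReset.remove(partition);
    }

    /** Lazy seek: only records that the partition must be reset to the latest offset. */
    public void seekToEnd(String partition) {
        pendingLatestReset.add(partition);
    }

    /** Resolves a pending reset against the log end offset as it is at this moment. */
    public long position(String partition) {
        if (pendingLatestReset.remove(partition)) {
            positions.put(partition, logEndOffsets.getOrDefault(partition, 0L));
        }
        return positions.getOrDefault(partition, 0L);
    }
}
```

In this model, after seekToEnd the position lands on the log end offset, so a subsequent read would only see records appended after that point, which is the behaviour the quote describes.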
Based on the KafkaSpout loop and the logs, here's what I see and expect:
org.apache.storm.kafka.spout.KafkaSpout#pollKafkaBroker
org.apache.storm.kafka.spout.KafkaSpout#doSeekRetriableTopicPartitions
org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff#retriableTopicPartitions
// Log message indicates messageId is ready to be retried
Topic partitions with entries ready to be retried [[TOPIC-0]]
org.apache.storm.kafka.spout.KafkaSpout.OffsetEntry#findNextCommitOffset(TOPIC-0)
// Returns null because of non-continuous offset
topic-partition [TOPIC-0] has non-continuous offset [219355]. It will be processed in a subsequent batch.
org.apache.kafka.clients.consumer.KafkaConsumer#seekToEnd(TOPIC-0)
// This should seek to the last committed offset for the retriable partition, if one exists, else to the last offset
org.apache.kafka.clients.consumer.internals.SubscriptionState#needOffsetReset(OffsetResetStrategy.LATEST)
org.apache.kafka.clients.consumer.KafkaConsumer#poll
// I think poll should return the records to be retried here, even though the committed offset is 219355
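The two spout steps in the trace (findNextCommitOffset returning null on a gap, then falling back to seekToEnd) can be sketched with a simplified model. This is a hypothetical illustration, not Storm's actual OffsetEntry code: OffsetEntryModel, seekDecision, and the "committed offset + 1" starting point are assumptions, and the non-null branch (seeking just past the returned offset) is an assumed counterpart to the seekToEnd fallback shown in the trace.

```java
import java.util.TreeSet;

/**
 * Hypothetical, simplified model (NOT Storm's OffsetEntry) of finding the
 * next commit offset for one partition and choosing how to seek.
 */
public class OffsetEntryModel {
    private final TreeSet<Long> acked = new TreeSet<>();
    private final long committedOffset; // assumed: last offset already committed

    public OffsetEntryModel(long committedOffset) {
        this.committedOffset = committedOffset;
    }

    public void ack(long offset) {
        acked.add(offset);
    }

    /**
     * Returns the last offset of the unbroken run of acked offsets that starts
     * right after the committed offset, or null if that run is empty
     * (the "non-continuous offset" case from the log).
     */
    public Long findNextCommitOffset() {
        Long next = null;
        long expected = committedOffset + 1;
        for (long offset : acked.tailSet(expected)) {
            if (offset != expected) {
                break; // gap: leave it for a subsequent batch
            }
            next = offset;
            expected++;
        }
        return next;
    }

    /** Describes the seek this model would perform for a retriable partition. */
    public String seekDecision() {
        Long next = findNextCommitOffset();
        return next != null ? "seek(" + (next + 1) + ")" : "seekToEnd";
    }
}
```

With a committed offset of 4 and only offset 6 acked, findNextCommitOffset returns null and the model falls back to seekToEnd, mirroring the trace; once 5 is acked, the run 5..6 is found instead.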
Are you sure that, after seekToEnd was called, another call was made to
commit the offset, and that the subsequent call to seekToEnd went to the last
offset rather than to the last committed offset, as the comment in KafkaSpout states?
seekToEnd calls
org.apache.kafka.clients.consumer.internals.SubscriptionState#needOffsetReset(OffsetResetStrategy.LATEST),
and my understanding of OffsetResetStrategy.LATEST is that if no offset
exists on the broker (the __consumer_offsets topic or ZooKeeper), it should
indeed be set to the last offset, but if a committed offset already exists,
that is where seekToEnd should go.
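The behaviour expected here (resume from the committed offset if there is one, otherwise go to the end of the log) can also be written out explicitly instead of relying on how the LATEST reset is interpreted. A minimal sketch under stated assumptions: OffsetView, seekToCommittedOrEnd, and FakeConsumer are hypothetical names, partitions are plain strings, and the fake resolves seekToEnd eagerly for simplicity. With the real 0.10.x client the same shape would roughly use KafkaConsumer#committed(TopicPartition), #seek(TopicPartition, long), and #seekToEnd(Collection<TopicPartition>).

```java
import java.util.HashMap;
import java.util.Map;

public class SeekToCommittedOrEnd {

    /** Hypothetical minimal consumer view; the real client takes TopicPartition arguments. */
    public interface OffsetView {
        Long committed(String partition); // null if nothing has been committed
        void seek(String partition, long offset);
        void seekToEnd(String partition);
    }

    /** Seeks to the last committed offset if one exists, otherwise to the end of the log. */
    public static void seekToCommittedOrEnd(OffsetView consumer, String partition) {
        Long committed = consumer.committed(partition);
        if (committed != null) {
            consumer.seek(partition, committed);
        } else {
            consumer.seekToEnd(partition);
        }
    }

    /** In-memory fake used only to exercise the helper (seekToEnd is eager here). */
    public static class FakeConsumer implements OffsetView {
        public final Map<String, Long> committedOffsets = new HashMap<>();
        public final Map<String, Long> logEnd = new HashMap<>();
        public final Map<String, Long> positions = new HashMap<>();

        public Long committed(String p) { return committedOffsets.get(p); }
        public void seek(String p, long offset) { positions.put(p, offset); }
        public void seekToEnd(String p) { positions.put(p, logEnd.getOrDefault(p, 0L)); }
    }
}
```

Making the committed-offset lookup explicit keeps the fallback to the log end as a separate, visible branch, so the result no longer depends on which offset a LATEST reset resolves to.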
---