Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2465#discussion_r157726005
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
---
@@ -204,33 +226,70 @@ private void initialize(Collection<TopicPartition>
partitions) {
/**
* Sets the cursor to the location dictated by the first poll
strategy and returns the fetch offset.
*/
- private long doSeek(TopicPartition tp, OffsetAndMetadata
committedOffset) {
- if (committedOffset != null) { // offset was
committed for this TopicPartition
- if (firstPollOffsetStrategy.equals(EARLIEST)) {
-
kafkaConsumer.seekToBeginning(Collections.singleton(tp));
- } else if (firstPollOffsetStrategy.equals(LATEST)) {
- kafkaConsumer.seekToEnd(Collections.singleton(tp));
+ private long doSeek(TopicPartition newTp, OffsetAndMetadata
committedOffset) {
+ LOG.trace("Seeking offset for topic-partition {} with {} and
{}", newTp, firstPollOffsetStrategy, committedOffset);
--- End diff --
Nit: Can you put [] around the inserted values here so it matches the other
logs?
---