Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2465#discussion_r157349402
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
---
@@ -225,6 +237,23 @@ private long doSeek(TopicPartition tp,
OffsetAndMetadata committedOffset) {
}
}
+ /**
+ * Checks If {@link OffsetAndMetadata} was committed by this topology,
either by this or another spout instance.
+ * This info is used to decide if {@link FirstPollOffsetStrategy}
should be applied
+ *
+ * @param committedOffset {@link OffsetAndMetadata} info committed to
Kafka
+ * @return true if this topology committed this {@link
OffsetAndMetadata}, false otherwise
+ */
+ private boolean isOffsetCommittedByThisTopology(OffsetAndMetadata
committedOffset) {
+ try {
+ return committedOffset != null &&
JSON_MAPPER.readValue(committedOffset.metadata(), KafkaSpoutMessageId.class)
--- End diff --
I'd probably just return false from this method. Since the topology id is
new for each deployment we know for certain that if there's no message id in
the meta, then it wasn't committed by this topology.
---