Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2465#discussion_r157346366
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
---
@@ -225,6 +237,23 @@ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
}
}
+ /**
+  * Checks If {@link OffsetAndMetadata} was committed by this topology, either by this or another spout instance.
+  * This info is used to decide if {@link FirstPollOffsetStrategy} should be applied
+  *
+  * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
+  * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
+  */
+ private boolean isOffsetCommittedByThisTopology(OffsetAndMetadata committedOffset) {
+     try {
+         return committedOffset != null && JSON_MAPPER.readValue(committedOffset.metadata(), KafkaSpoutMessageId.class)
--- End diff --
Also, I think this will crash the spout when it starts up on a topic that
already has commits from before this change. If there's no message id in the
metadata, we should probably log a warning and pick some reasonable default
instead of erroring out.
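
To illustrate the kind of fallback I mean, here is a minimal, dependency-free sketch. It is not the PR's code: the class name `CommitMetadataCheck`, the helper `parseTopologyId`, and the `topologyId=<id>` metadata format are all hypothetical stand-ins for the JSON deserialization in the diff, and returning `false` for unreadable metadata is only one possible default.

```java
import java.util.logging.Logger;

public class CommitMetadataCheck {
    private static final Logger LOG = Logger.getLogger(CommitMetadataCheck.class.getName());

    // Hypothetical metadata format for this sketch: "topologyId=<id>".
    // The PR deserializes JSON instead; this stand-in keeps the example self-contained.
    static String parseTopologyId(String metadata) {
        String prefix = "topologyId=";
        if (metadata == null || !metadata.startsWith(prefix) || metadata.length() == prefix.length()) {
            throw new IllegalArgumentException("Unrecognized commit metadata: " + metadata);
        }
        return metadata.substring(prefix.length());
    }

    // Returns true only if the metadata parses and names this topology.
    // Unparseable metadata (e.g. written before this change) is logged and
    // treated as "not committed by this topology" (an assumed default).
    static boolean isCommittedByThisTopology(String metadata, String topologyId) {
        if (metadata == null) {
            return false;
        }
        try {
            return topologyId.equals(parseTopologyId(metadata));
        } catch (IllegalArgumentException e) {
            LOG.warning("Could not read commit metadata, assuming it was not committed by this topology: "
                + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isCommittedByThisTopology("topologyId=topo-1", "topo-1"));
        System.out.println(isCommittedByThisTopology("legacy metadata", "topo-1"));
    }
}
```

With this shape, a commit whose metadata cannot be read results in a log warning and a defined answer rather than an exception during startup. Whether `false` is the right default depends on how the result feeds into the first poll offset strategy.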
---