GitHub user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2465#discussion_r157346366
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -225,6 +237,23 @@ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
             }
         }
     
    +    /**
    +     * Checks If {@link OffsetAndMetadata} was committed by this topology, either by this or another spout instance.
    +     * This info is used to decide if {@link FirstPollOffsetStrategy} should be applied
    +     *
    +     * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
    +     * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
    +     */
    +    private boolean isOffsetCommittedByThisTopology(OffsetAndMetadata committedOffset) {
    +        try {
    +            return committedOffset != null && JSON_MAPPER.readValue(committedOffset.metadata(), KafkaSpoutMessageId.class)
    --- End diff --
    
    Also, I think this will crash the spout when it starts up on a topic that already has commits from before this change. We should probably log a warning and fall back to some reasonable default instead of erroring out if there's no message id in the metadata.
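
A minimal sketch of the suggested fallback, written without Storm, Kafka or Jackson so it runs on its own. The class `CommitMetadataCheck`, the method `isCommittedByTopology`, the `topologyIdParser` argument (a stand-in for the `JSON_MAPPER.readValue(...)` call in the diff) and the `topology=` metadata format are all hypothetical. Returning `false` on a parse failure is one possible default, not something settled in this thread.

```java
import java.util.function.Function;
import java.util.logging.Logger;

public class CommitMetadataCheck {
    private static final Logger LOG = Logger.getLogger(CommitMetadataCheck.class.getName());

    /**
     * Hypothetical helper: returns true only if the commit metadata parses and names this topology.
     * topologyIdParser stands in for the JSON deserialization step and may throw on
     * metadata written before the change.
     */
    static boolean isCommittedByTopology(String metadata, String topologyId,
                                         Function<String, String> topologyIdParser) {
        if (metadata == null) {
            return false;
        }
        try {
            return topologyId.equals(topologyIdParser.apply(metadata));
        } catch (RuntimeException e) {
            // Assumed default: treat unreadable metadata as "not committed by this topology"
            // and warn instead of failing spout startup.
            LOG.warning("Could not parse commit metadata [" + metadata
                + "]; assuming it was not committed by this topology");
            return false;
        }
    }

    public static void main(String[] args) {
        // Toy parser for an invented "topology=<id>" format; it rejects anything else.
        Function<String, String> parser = s -> {
            if (!s.startsWith("topology=")) {
                throw new IllegalArgumentException("unrecognized metadata");
            }
            return s.substring("topology=".length());
        };
        System.out.println(isCommittedByTopology("topology=topo-1", "topo-1", parser));
        System.out.println(isCommittedByTopology("legacy-metadata", "topo-1", parser));
    }
}
```

With a real `ObjectMapper`, the catch clause would need to cover the checked `IOException` that `readValue` declares; the sketch catches `RuntimeException` only because its toy parser throws one.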

