Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2465#discussion_r157375809
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -225,6 +243,25 @@ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
             }
         }
     
    +    /**
    +     * Checks If {@link OffsetAndMetadata} was committed by an instance of {@link KafkaSpout} in this topology.
    +     * This info is used to decide if {@link FirstPollOffsetStrategy} should be applied
    +     *
    +     * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
    +     * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
    +     */
    +    private boolean isOffsetCommittedByThisTopology(OffsetAndMetadata committedOffset) {
    +        try {
    +            final KafkaSpout.Info info = JSON_MAPPER.readValue(committedOffset.metadata(), KafkaSpout.Info.class);
    +            return info.getTopologyId().equals(context.getStormId());
    +        } catch (IOException e) {
    +            LOG.trace("Failed to deserialize {}. Error likely occurred because the last commit " +
    --- End diff --
    
    Good point, I hadn't thought of this code being hit on every emit. The
    reason I think WARN level is appropriate is that this indicates a potential
    error, and we want it to show up in the log if an exception is caught here.
    Putting it at TRACE level means no one will see it, since TRACE logging is
    almost never enabled. It's not solely for debugging purposes; it tells the
    user that the spout is defaulting to a certain behavior because a feature
    failed to work. IMO, TRACE and DEBUG are for logs that happen "normally",
    not for special cases like this.
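    A minimal sketch of the ownership check with the parse failure logged at
    warning level, as argued above. To stay dependency-free it assumes
    java.util.logging instead of the spout's own logger (WARNING plays the role
    of WARN, FINEST of TRACE), and a hypothetical parseTopologyId helper that
    only accepts {"topologyId":"<id>"} stands in for the Jackson
    deserialization into KafkaSpout.Info. CommitOwnershipCheck and stormId are
    illustrative names, not storm-kafka-client API.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the check in the diff; not the actual KafkaSpout code.
class CommitOwnershipCheck {
    private static final Logger LOG = Logger.getLogger(CommitOwnershipCheck.class.getName());

    private final String stormId; // plays the role of context.getStormId()

    CommitOwnershipCheck(String stormId) {
        this.stormId = stormId;
    }

    boolean isOffsetCommittedByThisTopology(String metadata) {
        try {
            return parseTopologyId(metadata).equals(stormId);
        } catch (IllegalArgumentException e) {
            // WARNING rather than FINEST: the spout is falling back to the
            // FirstPollOffsetStrategy behavior because the metadata could not
            // be read, and users should be able to see that in the log.
            LOG.log(Level.WARNING, "Failed to deserialize commit metadata [" + metadata
                    + "]; treating the offset as not committed by this topology", e);
            return false;
        }
    }

    // Hypothetical parser standing in for JSON_MAPPER.readValue(...):
    // accepts only the exact shape {"topologyId":"<id>"}.
    static String parseTopologyId(String metadata) {
        String prefix = "{\"topologyId\":\"";
        String suffix = "\"}";
        if (metadata == null
                || metadata.length() < prefix.length() + suffix.length()
                || !metadata.startsWith(prefix)
                || !metadata.endsWith(suffix)) {
            throw new IllegalArgumentException("unrecognized commit metadata");
        }
        return metadata.substring(prefix.length(), metadata.length() - suffix.length());
    }
}
```

    The trade-off raised in the review still applies: because this path can run
    on every emit, a WARN here is visible but can also become noisy.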
    
    I'm wondering if we should instead cache the committed offset and metadata
    in a field between commits, i.e. add a partition ->
    lastCommittedOffsetAndMetadata map that we update in the commit method. It
    seems a little inefficient to run this code on every single emit, in
    addition to the issue with excessive logging.
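    A minimal sketch of the suggested cache, assuming the commit method hands
    over what it just committed and the emit path reads from the map instead of
    asking Kafka for the committed offset each time. Partitions are plain
    strings here and the hypothetical CommittedOffset class stands in for
    Kafka's OffsetAndMetadata; CommittedOffsetCache, onCommit and lastCommitted
    are illustrative names only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a partition -> lastCommittedOffsetAndMetadata cache.
class CommittedOffsetCache {
    // Stand-in for org.apache.kafka.clients.consumer.OffsetAndMetadata.
    static final class CommittedOffset {
        final long offset;
        final String metadata;

        CommittedOffset(long offset, String metadata) {
            this.offset = offset;
            this.metadata = metadata;
        }
    }

    private final Map<String, CommittedOffset> lastCommittedOffsetAndMetadata = new HashMap<>();

    // Called from the commit method after a successful commit; newer
    // commits overwrite older entries for the same partition.
    void onCommit(Map<String, CommittedOffset> committed) {
        lastCommittedOffsetAndMetadata.putAll(committed);
    }

    // Called on emit: a local map lookup instead of a round trip to the broker.
    // Returns null if nothing has been cached for this partition yet.
    CommittedOffset lastCommitted(String partition) {
        return lastCommittedOffsetAndMetadata.get(partition);
    }
}
```

    Entries would presumably also need to be seeded once per partition when it
    is assigned and dropped when it is revoked; both are assumptions beyond the
    original suggestion.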

