GitHub user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2465#discussion_r157375809

--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -225,6 +243,25 @@ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
             }
         }
 
+    /**
+     * Checks If {@link OffsetAndMetadata} was committed by an instance of {@link KafkaSpout} in this topology.
+     * This info is used to decide if {@link FirstPollOffsetStrategy} should be applied
+     *
+     * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
+     * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
+     */
+    private boolean isOffsetCommittedByThisTopology(OffsetAndMetadata committedOffset) {
+        try {
+            final KafkaSpout.Info info = JSON_MAPPER.readValue(committedOffset.metadata(), KafkaSpout.Info.class);
+            return info.getTopologyId().equals(context.getStormId());
+        } catch (IOException e) {
+            LOG.trace("Failed to deserialize {}. Error likely occurred because the last commit " +
--- End diff --

Good point, I hadn't thought about this code being hit on every emit.

The reason I think WARN level is appropriate is that this indicates a potential error, and we want it to show up in the log if an exception is caught here. Putting it at TRACE level means no one will see it. It's not solely for debugging purposes; it's informational, telling the user that the spout is falling back to a certain behavior because a feature failed to work. TRACE and DEBUG are IMO for logs that happen "normally", not for special cases like this.

I'm wondering if we should instead cache the committed offset and metadata in a field between commits, i.e. add a partition -> lastCommitedOffsetAndMetadata map that we update in the commit method. It seems a little inefficient to run this code on every single emit, in addition to the issue with lots of logging.
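A rough sketch of the caching idea, in Java to match the spout. It is hypothetical and self-contained: `CommittedOffsetCache`, `CachedCommit`, `onCommit`, and `lastCommit` are illustrative names, not existing storm-kafka-client API. The type parameter `P` stands in for Kafka's `TopicPartition`, and the injected predicate stands in for the `KafkaSpout.Info` JSON check (which would still log at WARN when deserialization fails). The point is that the metadata check, and any WARN it logs, runs once per commit rather than on every emit.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical sketch: P stands in for Kafka's TopicPartition, and CachedCommit stands in
// for OffsetAndMetadata plus the precomputed "committed by this topology" result.
final class CommittedOffsetCache<P> {

    // Immutable snapshot of the last commit for one partition.
    static final class CachedCommit {
        final long offset;
        final String metadata;
        final boolean committedByThisTopology;

        CachedCommit(long offset, String metadata, boolean committedByThisTopology) {
            this.offset = offset;
            this.metadata = metadata;
            this.committedByThisTopology = committedByThisTopology;
        }
    }

    private final Map<P, CachedCommit> lastCommitted = new HashMap<>();
    // Assumed hook: in the spout this would deserialize the metadata JSON, compare the
    // topology id, and log at WARN if deserialization fails.
    private final Predicate<String> isOwnMetadata;
    private int metadataChecks = 0;

    CommittedOffsetCache(Predicate<String> isOwnMetadata) {
        this.isOwnMetadata = isOwnMetadata;
    }

    // Called from the commit path: the metadata check (and any logging it does)
    // runs once per commit instead of once per emitted tuple.
    void onCommit(P partition, long offset, String metadata) {
        metadataChecks++;
        boolean own = isOwnMetadata.test(metadata);
        lastCommitted.put(partition, new CachedCommit(offset, metadata, own));
    }

    // Called from the hot path (e.g. per emit): a plain map lookup, no parsing or logging.
    Optional<CachedCommit> lastCommit(P partition) {
        return Optional.ofNullable(lastCommitted.get(partition));
    }

    // Number of times the metadata check has actually run.
    int metadataChecks() {
        return metadataChecks;
    }
}
```

With this shape, a thousand emits against the same partition cost a thousand map lookups but only one metadata check per commit, so a WARN for unparseable metadata would appear at most once per commit instead of flooding the log.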
---