Github user hmcl commented on a diff in the pull request:
https://github.com/apache/storm/pull/2465#discussion_r157374387
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -225,6 +243,25 @@ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
}
}
+ /**
+ * Checks If {@link OffsetAndMetadata} was committed by an instance of {@link KafkaSpout} in this topology.
+ * This info is used to decide if {@link FirstPollOffsetStrategy} should be applied
+ *
+ * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
+ * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
+ */
+ private boolean isOffsetCommittedByThisTopology(OffsetAndMetadata committedOffset) {
+ try {
+ final KafkaSpout.Info info = JSON_MAPPER.readValue(committedOffset.metadata(), KafkaSpout.Info.class);
+ return info.getTopologyId().equals(context.getStormId());
+ } catch (IOException e) {
+ LOG.trace("Failed to deserialize {}. Error likely occurred because the last commit " +
--- End diff --
If we put it at WARN level, it could write this message thousands of times
(once per polled record) until the first commit is done. I put it at TRACE
because it is really for debugging purposes, and I am printing the exception
stack trace as well. I can put it at DEBUG, but I don't think it should be
WARN. Alternatively, I can log the message at DEBUG and the stack trace at
TRACE.
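A rough sketch of the DEBUG/TRACE split suggested here. To stay self-contained it uses java.util.logging instead of the SLF4J logger the spout uses, with FINE and FINEST standing in for DEBUG and TRACE. The class name, the helper methods, and the sample metadata string are hypothetical and not part of the PR.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical sketch: short message at FINE (~DEBUG), stack trace only at FINEST (~TRACE).
// java.util.logging is used here only so the example runs without SLF4J on the classpath.
public class SplitLevelLogging {

    // Logs a one-line summary at FINE and attaches the exception only at FINEST.
    static void logDeserializationFailure(Logger log, String metadata, IOException e) {
        // Concise message, no stack trace: visible once debug-level logging is enabled.
        log.log(Level.FINE, "Failed to deserialize {0}", metadata);
        // Full exception detail: emitted only when the finest level is enabled.
        log.log(Level.FINEST, "Stack trace for failed deserialization", e);
    }

    // Collects published records so the emitted levels can be inspected.
    static final class CapturingHandler extends Handler {
        final List<LogRecord> records = new ArrayList<>();
        @Override public void publish(LogRecord record) { records.add(record); }
        @Override public void flush() { }
        @Override public void close() { }
    }

    // Runs the helper with the logger at the given level and returns what was captured.
    static List<LogRecord> run(Level loggerLevel) {
        Logger log = Logger.getAnonymousLogger();
        log.setUseParentHandlers(false); // keep the sample output off the console
        log.setLevel(loggerLevel);
        CapturingHandler handler = new CapturingHandler();
        handler.setLevel(Level.ALL);
        log.addHandler(handler);
        logDeserializationFailure(log, "not-json-metadata", new IOException("Unexpected character"));
        return handler.records;
    }

    public static void main(String[] args) {
        System.out.println("Logger at FINE:   " + run(Level.FINE).size() + " record(s)");
        System.out.println("Logger at FINEST: " + run(Level.FINEST).size() + " record(s)");
    }
}
```

With the logger at FINE only the one-line message is emitted; the stack trace shows up only once FINEST is enabled. With SLF4J the same split would be a LOG.debug(...) call followed by a LOG.trace(..., e) call, since SLF4J treats a trailing Throwable argument as the exception to print.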
On a side note, I don't think the TRACE level is used enough, but I find it
very useful for printing more detail once a problem has been identified.
---