Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2465#discussion_r158336381

    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java ---
    @@ -185,29 +188,25 @@ public long commit(OffsetAndMetadata committedOffset) {
             LOG.trace("{}", this);

             LOG.debug("Committed [{}] offsets in the range [{}-{}] for topic-partition [{}]."
    -            + " Processing will resume at [{}] if the spout restarts.",
    +            + " Processing will resume at [{}] upon spout restart",
                     numCommittedOffsets, preCommitCommittedOffset, this.committedOffset - 1, tp, this.committedOffset);

             return numCommittedOffsets;
         }

    -    public long getCommittedOffset() {
    -        return committedOffset;
    -    }
    -
    -    public boolean isEmpty() {
    -        return ackedMsgs.isEmpty();
    -    }
    -
    -    public boolean contains(ConsumerRecord record) {
    -        return contains(new KafkaSpoutMessageId(record));
    +    /**
    +     * Checks if this OffsetManager has committed to Kafka.
    +     *
    +     * @return true if this OffsetManager has made at least one commit to Kafka, false otherwise
    +     */
    +    public boolean hasCommitted() {
    +        return committed;
         }

         public boolean contains(KafkaSpoutMessageId msgId) {
             return ackedMsgs.contains(msgId);
         }

    -    //VisibleForTesting
    --- End diff --

    I did when I cherry-picked the code from master. However, it was like this before, and the code with the annotation does not compile. I guess I would have to add the library dependencies for that. I just left it unchanged.
---