Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2465#discussion_r158115939
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
---
@@ -185,29 +188,25 @@ public long commit(OffsetAndMetadata committedOffset)
{
LOG.trace("{}", this);
LOG.debug("Committed [{}] offsets in the range [{}-{}] for topic-partition [{}]."
- + " Processing will resume at [{}] if the spout restarts.",
+ + " Processing will resume at [{}] upon spout restart",
numCommittedOffsets, preCommitCommittedOffset,
this.committedOffset - 1, tp, this.committedOffset);
return numCommittedOffsets;
}
- public long getCommittedOffset() {
- return committedOffset;
- }
-
- public boolean isEmpty() {
- return ackedMsgs.isEmpty();
- }
-
- public boolean contains(ConsumerRecord record) {
- return contains(new KafkaSpoutMessageId(record));
+ /**
+ * Checks if this OffsetManager has committed to Kafka.
+ *
+ * @return true if this OffsetManager has made at least one commit to Kafka, false otherwise
+ */
+ public boolean hasCommitted() {
+ return committed;
}
public boolean contains(KafkaSpoutMessageId msgId) {
return ackedMsgs.contains(msgId);
}
- //VisibleForTesting
--- End diff --
Didn't you replace this with the annotation earlier?
---