Repository: storm Updated Branches: refs/heads/1.1.x-branch cb331d454 -> c9bbd5444
[STORM-2607] Offset consumer + 1 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a1ffcb93 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a1ffcb93 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a1ffcb93 Branch: refs/heads/1.1.x-branch Commit: a1ffcb936336d71bdf3693108e33e92c8d2a88ef Parents: cb331d4 Author: Rodolfo Ribeiro <rodolfo.ribe...@b2wdigital.com> Authored: Thu Jun 29 12:06:20 2017 -0300 Committer: Stig Rohde Døssing <s...@apache.org> Committed: Mon Oct 16 20:12:33 2017 +0200 ---------------------------------------------------------------------- .../kafka/spout/internal/OffsetManager.java | 20 ++++++++++++-------- .../kafka/spout/SingleTopicKafkaSpoutTest.java | 2 +- 2 files changed, 13 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/a1ffcb93/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java index b6d36d8..1c474e3 100755 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java @@ -71,16 +71,18 @@ public class OffsetManager { boolean found = false; long currOffset; long nextCommitOffset = committedOffset; + long lastOffMessageOffset = committedOffset; KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap currOffset = currAckedMsg.offset(); - if (currOffset == nextCommitOffset + 1) { // found the next offset to commit + if (currOffset == lastOffMessageOffset + 1) { // found the next offset to commit found = true; nextCommitMsg = currAckedMsg; - nextCommitOffset = currOffset; - } else if (currOffset > nextCommitOffset + 1) { - if (emittedOffsets.contains(nextCommitOffset + 1)) { + lastOffMessageOffset = currOffset; + nextCommitOffset = lastOffMessageOffset + 1; + } else if (currOffset > lastOffMessageOffset + 1) { + if (emittedOffsets.contains(lastOffMessageOffset + 1)) { LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset); break; } else { @@ -92,11 +94,12 @@ public class OffsetManager { first element after committedOffset in the ascending ordered emitted set. */ LOG.debug("Processed non contiguous offset. (committedOffset+1) is no longer part of the topic. Committed: [{}], Processed: [{}]", committedOffset, currOffset); - final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset + 1); + final Long nextEmittedOffset = emittedOffsets.ceiling(lastOffMessageOffset); if (nextEmittedOffset != null && currOffset == nextEmittedOffset) { found = true; nextCommitMsg = currAckedMsg; - nextCommitOffset = currOffset; + lastOffMessageOffset = currOffset; + nextCommitOffset = lastOffMessageOffset + 1; } else { LOG.debug("topic-partition [{}] has non-continuous offset [{}]. Next Offset to commit should be [{}]", tp, currOffset, nextEmittedOffset); break; @@ -112,7 +115,8 @@ public class OffsetManager { OffsetAndMetadata nextCommitOffsetAndMetadata = null; if (found) { nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread())); - LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed", tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset()); + LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed", + tp, earliestUncommittedOffset, nextCommitOffsetAndMetadata.offset() - 1); } else { LOG.debug("topic-partition [{}] has NO offsets ready to be committed", tp); } @@ -131,7 +135,7 @@ public class OffsetManager { */ public long commit(OffsetAndMetadata committedOffset) { final long preCommitCommittedOffsets = this.committedOffset; - long numCommittedOffsets = 0; + final long numCommittedOffsets = committedOffset.offset() - this.committedOffset - 1; this.committedOffset = committedOffset.offset(); for (Iterator<KafkaSpoutMessageId> iterator = ackedMsgs.iterator(); iterator.hasNext();) { if (iterator.next().offset() <= committedOffset.offset()) { http://git-wip-us.apache.org/repos/asf/storm/blob/a1ffcb93/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java index fff6902..5141a8f 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java @@ -129,7 +129,7 @@ public class SingleTopicKafkaSpoutTest { Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue(); assertThat("Expected commits for only one topic partition", commits.entrySet().size(), is(1)); OffsetAndMetadata offset = commits.entrySet().iterator().next().getValue(); - assertThat("Expected committed offset to cover all emitted messages", offset.offset(), is(messageCount - 1)); + assertThat("Expected committed offset to cover all emitted messages", offset.offset(), is(messageCount)); } @Test