[1/2] storm git commit: [STORM-2607] Offset consumer + 1
Repository: storm Updated Branches: refs/heads/1.1.x-branch cb331d454 -> c9bbd5444 [STORM-2607] Offset consumer + 1 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a1ffcb93 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a1ffcb93 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a1ffcb93 Branch: refs/heads/1.1.x-branch Commit: a1ffcb936336d71bdf3693108e33e92c8d2a88ef Parents: cb331d4 Author: Rodolfo Ribeiro Authored: Thu Jun 29 12:06:20 2017 -0300 Committer: Stig Rohde Døssing Committed: Mon Oct 16 20:12:33 2017 +0200 -- .../kafka/spout/internal/OffsetManager.java | 20 .../kafka/spout/SingleTopicKafkaSpoutTest.java | 2 +- 2 files changed, 13 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/a1ffcb93/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java -- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java index b6d36d8..1c474e3 100755 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java @@ -71,16 +71,18 @@ public class OffsetManager { boolean found = false; long currOffset; long nextCommitOffset = committedOffset; +long lastOffMessageOffset = committedOffset; KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap currOffset = currAckedMsg.offset(); -if (currOffset == nextCommitOffset + 1) {// found the next offset to commit +if (currOffset == lastOffMessageOffset + 1) {// found the next offset to commit found = true; 
nextCommitMsg = currAckedMsg; -nextCommitOffset = currOffset; -} else if (currOffset > nextCommitOffset + 1) { -if (emittedOffsets.contains(nextCommitOffset + 1)) { +lastOffMessageOffset = currOffset; +nextCommitOffset = lastOffMessageOffset + 1; +} else if (currOffset > lastOffMessageOffset + 1) { +if (emittedOffsets.contains(lastOffMessageOffset + 1)) { LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset); break; } else { @@ -92,11 +94,12 @@ public class OffsetManager { first element after committedOffset in the ascending ordered emitted set. */ LOG.debug("Processed non contiguous offset. (committedOffset+1) is no longer part of the topic. Committed: [{}], Processed: [{}]", committedOffset, currOffset); -final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset + 1); +final Long nextEmittedOffset = emittedOffsets.ceiling(lastOffMessageOffset); if (nextEmittedOffset != null && currOffset == nextEmittedOffset) { found = true; nextCommitMsg = currAckedMsg; -nextCommitOffset = currOffset; +lastOffMessageOffset = currOffset; +nextCommitOffset = lastOffMessageOffset + 1; } else { LOG.debug("topic-partition [{}] has non-continuous offset [{}]. Next Offset to commit should be [{}]", tp, currOffset, nextEmittedOffset); break; @@ -112,7 +115,8 @@ public class OffsetManager { OffsetAndMetadata nextCommitOffsetAndMetadata = null; if (found) { nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread())); -LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed", tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset()); +LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed", +tp, earliestUncommittedOffset, nextCommitOffsetAndMetadata.offset() - 1); } else { LOG.debug("topic-partition [{}] has NO offsets ready to be committed", tp); }
[1/2] storm git commit: [STORM-2607] Offset consumer + 1
Repository: storm Updated Branches: refs/heads/1.x-branch 39e12aa22 -> 148ee6609 [STORM-2607] Offset consumer + 1 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4fc47b2a Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4fc47b2a Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4fc47b2a Branch: refs/heads/1.x-branch Commit: 4fc47b2a841bf05ec89c4ccbf089d95c28c965a1 Parents: 39e12aa Author: Rodolfo Ribeiro Authored: Thu Jun 29 12:06:20 2017 -0300 Committer: Stig Rohde Døssing Committed: Tue Oct 17 07:20:38 2017 +0200 -- .../kafka/spout/internal/OffsetManager.java | 20 .../kafka/spout/SingleTopicKafkaSpoutTest.java | 2 +- 2 files changed, 13 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/4fc47b2a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java -- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java index b6d36d8..1c474e3 100755 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java @@ -71,16 +71,18 @@ public class OffsetManager { boolean found = false; long currOffset; long nextCommitOffset = committedOffset; +long lastOffMessageOffset = committedOffset; KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap currOffset = currAckedMsg.offset(); -if (currOffset == nextCommitOffset + 1) {// found the next offset to commit +if (currOffset == lastOffMessageOffset + 1) {// found the next offset to commit found = true; 
nextCommitMsg = currAckedMsg; -nextCommitOffset = currOffset; -} else if (currOffset > nextCommitOffset + 1) { -if (emittedOffsets.contains(nextCommitOffset + 1)) { +lastOffMessageOffset = currOffset; +nextCommitOffset = lastOffMessageOffset + 1; +} else if (currOffset > lastOffMessageOffset + 1) { +if (emittedOffsets.contains(lastOffMessageOffset + 1)) { LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset); break; } else { @@ -92,11 +94,12 @@ public class OffsetManager { first element after committedOffset in the ascending ordered emitted set. */ LOG.debug("Processed non contiguous offset. (committedOffset+1) is no longer part of the topic. Committed: [{}], Processed: [{}]", committedOffset, currOffset); -final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset + 1); +final Long nextEmittedOffset = emittedOffsets.ceiling(lastOffMessageOffset); if (nextEmittedOffset != null && currOffset == nextEmittedOffset) { found = true; nextCommitMsg = currAckedMsg; -nextCommitOffset = currOffset; +lastOffMessageOffset = currOffset; +nextCommitOffset = lastOffMessageOffset + 1; } else { LOG.debug("topic-partition [{}] has non-continuous offset [{}]. Next Offset to commit should be [{}]", tp, currOffset, nextEmittedOffset); break; @@ -112,7 +115,8 @@ public class OffsetManager { OffsetAndMetadata nextCommitOffsetAndMetadata = null; if (found) { nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread())); -LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed", tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset()); +LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed", +tp, earliestUncommittedOffset, nextCommitOffsetAndMetadata.offset() - 1); } else { LOG.debug("topic-partition [{}] has NO offsets ready to be committed", tp); } @@