[1/2] storm git commit: [STORM-2607] Offset consumer + 1

2017-10-16 Thread srdo
Repository: storm
Updated Branches:
  refs/heads/1.1.x-branch cb331d454 -> c9bbd5444


[STORM-2607] Offset consumer + 1


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a1ffcb93
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a1ffcb93
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a1ffcb93

Branch: refs/heads/1.1.x-branch
Commit: a1ffcb936336d71bdf3693108e33e92c8d2a88ef
Parents: cb331d4
Author: Rodolfo Ribeiro 
Authored: Thu Jun 29 12:06:20 2017 -0300
Committer: Stig Rohde Døssing 
Committed: Mon Oct 16 20:12:33 2017 +0200

--
 .../kafka/spout/internal/OffsetManager.java | 20 
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  |  2 +-
 2 files changed, 13 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/storm/blob/a1ffcb93/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
--
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
index b6d36d8..1c474e3 100755
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -71,16 +71,18 @@ public class OffsetManager {
 boolean found = false;
 long currOffset;
 long nextCommitOffset = committedOffset;
+long lastOffMessageOffset = committedOffset;
 KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience 
variable to make it faster to create OffsetAndMetadata
 
 for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is 
that of a linear scan on a TreeMap
 currOffset = currAckedMsg.offset();
-if (currOffset == nextCommitOffset + 1) {// found the 
next offset to commit
+if (currOffset == lastOffMessageOffset + 1) {// found 
the next offset to commit
 found = true;
 nextCommitMsg = currAckedMsg;
-nextCommitOffset = currOffset;
-} else if (currOffset > nextCommitOffset + 1) {
-if (emittedOffsets.contains(nextCommitOffset + 1)) {
+lastOffMessageOffset = currOffset;
+nextCommitOffset = lastOffMessageOffset + 1;
+} else if (currOffset > lastOffMessageOffset + 1) {
+if (emittedOffsets.contains(lastOffMessageOffset + 1)) {
 LOG.debug("topic-partition [{}] has non-continuous offset 
[{}]. It will be processed in a subsequent batch.", tp, currOffset);
 break;
 } else {
@@ -92,11 +94,12 @@ public class OffsetManager {
 first element after committedOffset in the ascending 
ordered emitted set.
  */
 LOG.debug("Processed non contiguous offset. 
(committedOffset+1) is no longer part of the topic. Committed: [{}], Processed: 
[{}]", committedOffset, currOffset);
-final Long nextEmittedOffset = 
emittedOffsets.ceiling(nextCommitOffset + 1);
+final Long nextEmittedOffset = 
emittedOffsets.ceiling(lastOffMessageOffset);
 if (nextEmittedOffset != null && currOffset == 
nextEmittedOffset) {
 found = true;
 nextCommitMsg = currAckedMsg;
-nextCommitOffset = currOffset;
+lastOffMessageOffset = currOffset;
+nextCommitOffset = lastOffMessageOffset + 1;
 } else {
 LOG.debug("topic-partition [{}] has non-continuous 
offset [{}]. Next Offset to commit should be [{}]", tp, currOffset, 
nextEmittedOffset);
 break;
@@ -112,7 +115,8 @@ public class OffsetManager {
 OffsetAndMetadata nextCommitOffsetAndMetadata = null;
 if (found) {
 nextCommitOffsetAndMetadata = new 
OffsetAndMetadata(nextCommitOffset, 
nextCommitMsg.getMetadata(Thread.currentThread()));
-LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be 
committed", tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset());
+LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be 
committed",
+tp, earliestUncommittedOffset, 
nextCommitOffsetAndMetadata.offset() - 1);
 } else {
 LOG.debug("topic-partition [{}] has NO offsets ready to be 
committed", tp);
 }

[1/2] storm git commit: [STORM-2607] Offset consumer + 1

2017-10-16 Thread srdo
Repository: storm
Updated Branches:
  refs/heads/1.x-branch 39e12aa22 -> 148ee6609


[STORM-2607] Offset consumer + 1


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4fc47b2a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4fc47b2a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4fc47b2a

Branch: refs/heads/1.x-branch
Commit: 4fc47b2a841bf05ec89c4ccbf089d95c28c965a1
Parents: 39e12aa
Author: Rodolfo Ribeiro 
Authored: Thu Jun 29 12:06:20 2017 -0300
Committer: Stig Rohde Døssing 
Committed: Tue Oct 17 07:20:38 2017 +0200

--
 .../kafka/spout/internal/OffsetManager.java | 20 
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  |  2 +-
 2 files changed, 13 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/storm/blob/4fc47b2a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
--
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
index b6d36d8..1c474e3 100755
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -71,16 +71,18 @@ public class OffsetManager {
 boolean found = false;
 long currOffset;
 long nextCommitOffset = committedOffset;
+long lastOffMessageOffset = committedOffset;
 KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience 
variable to make it faster to create OffsetAndMetadata
 
 for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is 
that of a linear scan on a TreeMap
 currOffset = currAckedMsg.offset();
-if (currOffset == nextCommitOffset + 1) {// found the 
next offset to commit
+if (currOffset == lastOffMessageOffset + 1) {// found 
the next offset to commit
 found = true;
 nextCommitMsg = currAckedMsg;
-nextCommitOffset = currOffset;
-} else if (currOffset > nextCommitOffset + 1) {
-if (emittedOffsets.contains(nextCommitOffset + 1)) {
+lastOffMessageOffset = currOffset;
+nextCommitOffset = lastOffMessageOffset + 1;
+} else if (currOffset > lastOffMessageOffset + 1) {
+if (emittedOffsets.contains(lastOffMessageOffset + 1)) {
 LOG.debug("topic-partition [{}] has non-continuous offset 
[{}]. It will be processed in a subsequent batch.", tp, currOffset);
 break;
 } else {
@@ -92,11 +94,12 @@ public class OffsetManager {
 first element after committedOffset in the ascending 
ordered emitted set.
  */
 LOG.debug("Processed non contiguous offset. 
(committedOffset+1) is no longer part of the topic. Committed: [{}], Processed: 
[{}]", committedOffset, currOffset);
-final Long nextEmittedOffset = 
emittedOffsets.ceiling(nextCommitOffset + 1);
+final Long nextEmittedOffset = 
emittedOffsets.ceiling(lastOffMessageOffset);
 if (nextEmittedOffset != null && currOffset == 
nextEmittedOffset) {
 found = true;
 nextCommitMsg = currAckedMsg;
-nextCommitOffset = currOffset;
+lastOffMessageOffset = currOffset;
+nextCommitOffset = lastOffMessageOffset + 1;
 } else {
 LOG.debug("topic-partition [{}] has non-continuous 
offset [{}]. Next Offset to commit should be [{}]", tp, currOffset, 
nextEmittedOffset);
 break;
@@ -112,7 +115,8 @@ public class OffsetManager {
 OffsetAndMetadata nextCommitOffsetAndMetadata = null;
 if (found) {
 nextCommitOffsetAndMetadata = new 
OffsetAndMetadata(nextCommitOffset, 
nextCommitMsg.getMetadata(Thread.currentThread()));
-LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be 
committed", tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset());
+LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be 
committed",
+tp, earliestUncommittedOffset, 
nextCommitOffsetAndMetadata.offset() - 1);
 } else {
 LOG.debug("topic-partition [{}] has NO offsets ready to be 
committed", tp);
 }
@@