[1/2] storm git commit: [STORM-2607] Offset consumer + 1

2017-10-16 Thread srdo
Repository: storm
Updated Branches:
  refs/heads/1.1.x-branch cb331d454 -> c9bbd5444


[STORM-2607] Offset consumer + 1


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a1ffcb93
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a1ffcb93
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a1ffcb93

Branch: refs/heads/1.1.x-branch
Commit: a1ffcb936336d71bdf3693108e33e92c8d2a88ef
Parents: cb331d4
Author: Rodolfo Ribeiro 
Authored: Thu Jun 29 12:06:20 2017 -0300
Committer: Stig Rohde Døssing 
Committed: Mon Oct 16 20:12:33 2017 +0200

--
 .../kafka/spout/internal/OffsetManager.java | 20 
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  |  2 +-
 2 files changed, 13 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/storm/blob/a1ffcb93/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
--
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
index b6d36d8..1c474e3 100755
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -71,16 +71,18 @@ public class OffsetManager {
 boolean found = false;
 long currOffset;
 long nextCommitOffset = committedOffset;
+long lastOffMessageOffset = committedOffset;
 KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience 
variable to make it faster to create OffsetAndMetadata
 
 for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is 
that of a linear scan on a TreeMap
 currOffset = currAckedMsg.offset();
-if (currOffset == nextCommitOffset + 1) {// found the 
next offset to commit
+if (currOffset == lastOffMessageOffset + 1) {// found 
the next offset to commit
 found = true;
 nextCommitMsg = currAckedMsg;
-nextCommitOffset = currOffset;
-} else if (currOffset > nextCommitOffset + 1) {
-if (emittedOffsets.contains(nextCommitOffset + 1)) {
+lastOffMessageOffset = currOffset;
+nextCommitOffset = lastOffMessageOffset + 1;
+} else if (currOffset > lastOffMessageOffset + 1) {
+if (emittedOffsets.contains(lastOffMessageOffset + 1)) {
 LOG.debug("topic-partition [{}] has non-continuous offset 
[{}]. It will be processed in a subsequent batch.", tp, currOffset);
 break;
 } else {
@@ -92,11 +94,12 @@ public class OffsetManager {
 first element after committedOffset in the ascending 
ordered emitted set.
  */
 LOG.debug("Processed non contiguous offset. 
(committedOffset+1) is no longer part of the topic. Committed: [{}], Processed: 
[{}]", committedOffset, currOffset);
-final Long nextEmittedOffset = 
emittedOffsets.ceiling(nextCommitOffset + 1);
+final Long nextEmittedOffset = 
emittedOffsets.ceiling(lastOffMessageOffset);
 if (nextEmittedOffset != null && currOffset == 
nextEmittedOffset) {
 found = true;
 nextCommitMsg = currAckedMsg;
-nextCommitOffset = currOffset;
+lastOffMessageOffset = currOffset;
+nextCommitOffset = lastOffMessageOffset + 1;
 } else {
 LOG.debug("topic-partition [{}] has non-continuous 
offset [{}]. Next Offset to commit should be [{}]", tp, currOffset, 
nextEmittedOffset);
 break;
@@ -112,7 +115,8 @@ public class OffsetManager {
 OffsetAndMetadata nextCommitOffsetAndMetadata = null;
 if (found) {
 nextCommitOffsetAndMetadata = new 
OffsetAndMetadata(nextCommitOffset, 
nextCommitMsg.getMetadata(Thread.currentThread()));
-LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be 
committed", tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset());
+LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be 
committed",
+tp, earliestUncommittedOffset, 
nextCommitOffsetAndMetadata.offset() - 1);
 } else {
 LOG.debug("topic-partition [{}] has NO offsets ready to be 
committed", tp);
 }
@@ -131,7 +135,7 @@ public class OffsetManager {
 

[2/2] storm git commit: STORM-2607: Switch OffsetManager to track earliest uncommitted offset instead of last committed offset for compatibility with commitSync consumer API

2017-10-16 Thread srdo
STORM-2607: Switch OffsetManager to track earliest uncommitted offset instead 
of last committed offset for compatibility with commitSync consumer API


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c9bbd544
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c9bbd544
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c9bbd544

Branch: refs/heads/1.1.x-branch
Commit: c9bbd59eee98d0427c115ff13a6878706179
Parents: a1ffcb9
Author: Stig Rohde Døssing 
Authored: Sat Jul 1 17:29:56 2017 +0200
Committer: Stig Rohde Døssing 
Committed: Tue Oct 17 07:22:27 2017 +0200

--
 .../apache/storm/kafka/spout/KafkaSpout.java|   6 +-
 .../kafka/spout/internal/OffsetManager.java |  86 +++-
 .../storm/kafka/spout/KafkaSpoutCommitTest.java |   6 +-
 .../kafka/spout/KafkaSpoutRebalanceTest.java|  14 +-
 .../kafka/spout/KafkaSpoutRetryLimitTest.java   |   9 +-
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  |   2 +-
 .../kafka/spout/internal/OffsetManagerTest.java | 135 +++
 7 files changed, 180 insertions(+), 78 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/storm/blob/c9bbd544/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
--
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 4d88205..207e9b4 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -181,7 +181,7 @@ public class KafkaSpout extends BaseRichSpout {
 }
 
 /**
- * sets the cursor to the location dictated by the first poll strategy 
and returns the fetch offset
+ * Sets the cursor to the location dictated by the first poll strategy 
and returns the fetch offset.
  */
 private long doSeek(TopicPartition tp, OffsetAndMetadata 
committedOffset) {
 if (committedOffset != null) { // offset was committed 
for this TopicPartition
@@ -190,8 +190,8 @@ public class KafkaSpout extends BaseRichSpout {
 } else if (firstPollOffsetStrategy.equals(LATEST)) {
 kafkaConsumer.seekToEnd(Collections.singleton(tp));
 } else {
-// By default polling starts at the last committed offset. 
+1 to point fetch to the first uncommitted offset.
-kafkaConsumer.seek(tp, committedOffset.offset() + 1);
+// By default polling starts at the last committed offset, 
i.e. the first offset that was not marked as processed.
+kafkaConsumer.seek(tp, committedOffset.offset());
 }
 } else {// no commits have ever been done, so start at the 
beginning or end depending on the strategy
 if (firstPollOffsetStrategy.equals(EARLIEST) || 
firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {

http://git-wip-us.apache.org/repos/asf/storm/blob/c9bbd544/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
--
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
index 1c474e3..7dfe7f6 100755
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -37,7 +37,7 @@ public class OffsetManager {
 /* First offset to be fetched. It is either set to the beginning, end, or 
to the first uncommitted offset.
 * Initial value depends on offset strategy. See 
KafkaSpoutConsumerRebalanceListener */
 private final long initialFetchOffset;
-// Last offset committed to Kafka. Initially it is set to fetchOffset - 1
+// Committed offset, i.e. the offset where processing will resume if the 
spout restarts. Initially it is set to fetchOffset.
 private long committedOffset;
 // Emitted Offsets List
 private final NavigableSet emittedOffsets = new TreeSet<>();
@@ -47,8 +47,8 @@ public class OffsetManager {
 public OffsetManager(TopicPartition tp, long initialFetchOffset) {
 this.tp = tp;
 this.initialFetchOffset = initialFetchOffset;
-this.committedOffset = initialFetchOffset - 1;
-LOG.debug("Instantiated {}", this);
+this.committedOffset = i

[2/2] storm git commit: STORM-2607: Switch OffsetManager to track earliest uncommitted offset instead of last committed offset for compatibility with commitSync consumer API

2017-10-16 Thread srdo
STORM-2607: Switch OffsetManager to track earliest uncommitted offset instead 
of last committed offset for compatibility with commitSync consumer API


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/148ee660
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/148ee660
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/148ee660

Branch: refs/heads/1.x-branch
Commit: 148ee6609557d7436ce826da29ff3ec301458586
Parents: 4fc47b2
Author: Stig Rohde Døssing 
Authored: Sat Jul 1 17:29:56 2017 +0200
Committer: Stig Rohde Døssing 
Committed: Tue Oct 17 07:20:39 2017 +0200

--
 .../apache/storm/kafka/spout/KafkaSpout.java|   6 +-
 .../kafka/spout/internal/OffsetManager.java |  86 +++-
 .../storm/kafka/spout/KafkaSpoutCommitTest.java |   6 +-
 .../kafka/spout/KafkaSpoutRebalanceTest.java|   4 +-
 .../kafka/spout/KafkaSpoutRetryLimitTest.java   |   8 +-
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  |   2 +-
 .../kafka/spout/internal/OffsetManagerTest.java | 135 +++
 7 files changed, 174 insertions(+), 73 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/storm/blob/148ee660/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
--
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 3582bdb..68bce11 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -202,7 +202,7 @@ public class KafkaSpout extends BaseRichSpout {
 }
 
 /**
- * sets the cursor to the location dictated by the first poll strategy 
and returns the fetch offset
+ * Sets the cursor to the location dictated by the first poll strategy 
and returns the fetch offset.
  */
 private long doSeek(TopicPartition tp, OffsetAndMetadata 
committedOffset) {
 if (committedOffset != null) { // offset was committed 
for this TopicPartition
@@ -211,8 +211,8 @@ public class KafkaSpout extends BaseRichSpout {
 } else if (firstPollOffsetStrategy.equals(LATEST)) {
 kafkaConsumer.seekToEnd(Collections.singleton(tp));
 } else {
-// By default polling starts at the last committed offset. 
+1 to point fetch to the first uncommitted offset.
-kafkaConsumer.seek(tp, committedOffset.offset() + 1);
+// By default polling starts at the last committed offset, 
i.e. the first offset that was not marked as processed.
+kafkaConsumer.seek(tp, committedOffset.offset());
 }
 } else {// no commits have ever been done, so start at the 
beginning or end depending on the strategy
 if (firstPollOffsetStrategy.equals(EARLIEST) || 
firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {

http://git-wip-us.apache.org/repos/asf/storm/blob/148ee660/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
--
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
index 1c474e3..7dfe7f6 100755
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -37,7 +37,7 @@ public class OffsetManager {
 /* First offset to be fetched. It is either set to the beginning, end, or 
to the first uncommitted offset.
 * Initial value depends on offset strategy. See 
KafkaSpoutConsumerRebalanceListener */
 private final long initialFetchOffset;
-// Last offset committed to Kafka. Initially it is set to fetchOffset - 1
+// Committed offset, i.e. the offset where processing will resume if the 
spout restarts. Initially it is set to fetchOffset.
 private long committedOffset;
 // Emitted Offsets List
 private final NavigableSet emittedOffsets = new TreeSet<>();
@@ -47,8 +47,8 @@ public class OffsetManager {
 public OffsetManager(TopicPartition tp, long initialFetchOffset) {
 this.tp = tp;
 this.initialFetchOffset = initialFetchOffset;
-this.committedOffset = initialFetchOffset - 1;
-LOG.debug("Instantiated {}", this);
+this.committedOffset = ini

[1/2] storm git commit: [STORM-2607] Offset consumer + 1

2017-10-16 Thread srdo
Repository: storm
Updated Branches:
  refs/heads/1.x-branch 39e12aa22 -> 148ee6609


[STORM-2607] Offset consumer + 1


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4fc47b2a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4fc47b2a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4fc47b2a

Branch: refs/heads/1.x-branch
Commit: 4fc47b2a841bf05ec89c4ccbf089d95c28c965a1
Parents: 39e12aa
Author: Rodolfo Ribeiro 
Authored: Thu Jun 29 12:06:20 2017 -0300
Committer: Stig Rohde Døssing 
Committed: Tue Oct 17 07:20:38 2017 +0200

--
 .../kafka/spout/internal/OffsetManager.java | 20 
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  |  2 +-
 2 files changed, 13 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/storm/blob/4fc47b2a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
--
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
index b6d36d8..1c474e3 100755
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -71,16 +71,18 @@ public class OffsetManager {
 boolean found = false;
 long currOffset;
 long nextCommitOffset = committedOffset;
+long lastOffMessageOffset = committedOffset;
 KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience 
variable to make it faster to create OffsetAndMetadata
 
 for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is 
that of a linear scan on a TreeMap
 currOffset = currAckedMsg.offset();
-if (currOffset == nextCommitOffset + 1) {// found the 
next offset to commit
+if (currOffset == lastOffMessageOffset + 1) {// found 
the next offset to commit
 found = true;
 nextCommitMsg = currAckedMsg;
-nextCommitOffset = currOffset;
-} else if (currOffset > nextCommitOffset + 1) {
-if (emittedOffsets.contains(nextCommitOffset + 1)) {
+lastOffMessageOffset = currOffset;
+nextCommitOffset = lastOffMessageOffset + 1;
+} else if (currOffset > lastOffMessageOffset + 1) {
+if (emittedOffsets.contains(lastOffMessageOffset + 1)) {
 LOG.debug("topic-partition [{}] has non-continuous offset 
[{}]. It will be processed in a subsequent batch.", tp, currOffset);
 break;
 } else {
@@ -92,11 +94,12 @@ public class OffsetManager {
 first element after committedOffset in the ascending 
ordered emitted set.
  */
 LOG.debug("Processed non contiguous offset. 
(committedOffset+1) is no longer part of the topic. Committed: [{}], Processed: 
[{}]", committedOffset, currOffset);
-final Long nextEmittedOffset = 
emittedOffsets.ceiling(nextCommitOffset + 1);
+final Long nextEmittedOffset = 
emittedOffsets.ceiling(lastOffMessageOffset);
 if (nextEmittedOffset != null && currOffset == 
nextEmittedOffset) {
 found = true;
 nextCommitMsg = currAckedMsg;
-nextCommitOffset = currOffset;
+lastOffMessageOffset = currOffset;
+nextCommitOffset = lastOffMessageOffset + 1;
 } else {
 LOG.debug("topic-partition [{}] has non-continuous 
offset [{}]. Next Offset to commit should be [{}]", tp, currOffset, 
nextEmittedOffset);
 break;
@@ -112,7 +115,8 @@ public class OffsetManager {
 OffsetAndMetadata nextCommitOffsetAndMetadata = null;
 if (found) {
 nextCommitOffsetAndMetadata = new 
OffsetAndMetadata(nextCommitOffset, 
nextCommitMsg.getMetadata(Thread.currentThread()));
-LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be 
committed", tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset());
+LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be 
committed",
+tp, earliestUncommittedOffset, 
nextCommitOffsetAndMetadata.offset() - 1);
 } else {
 LOG.debug("topic-partition [{}] has NO offsets ready to be 
committed", tp);
 }
@@ -131,7 +135,7 @@ public class OffsetManager {
 

[2/2] storm git commit: Merge branch 'STORM-2686-redesign' of https://github.com/Ethanlm/storm into STORM-2686

2017-10-16 Thread bobby
Merge branch 'STORM-2686-redesign' of https://github.com/Ethanlm/storm into 
STORM-2686

STORM-2686: Add locality awareness to LoadAwareShuffleGrouping

This closes #2366


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c8947c2f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c8947c2f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c8947c2f

Branch: refs/heads/master
Commit: c8947c2fede62036c20472c9e0335ef90a06b536
Parents: 37403d1 01bd4f8
Author: Robert Evans 
Authored: Mon Oct 16 10:24:39 2017 -0500
Committer: Robert Evans 
Committed: Mon Oct 16 10:24:39 2017 -0500

--
 conf/defaults.yaml  |   2 +
 .../src/jvm/org/apache/storm/Config.java|  18 ++
 .../apache/storm/daemon/worker/WorkerState.java |   2 +-
 .../grouping/LoadAwareShuffleGrouping.java  | 178 ++-
 .../storm/task/GeneralTopologyContext.java  |   4 +
 .../storm/task/WorkerTopologyContext.java   |  40 -
 .../grouping/LoadAwareShuffleGroupingTest.java  |  37 +++-
 7 files changed, 267 insertions(+), 14 deletions(-)
--




[1/2] storm git commit: [STORM-2686] Add locality awareness to LoadAwareShuffleGrouping

2017-10-16 Thread bobby
Repository: storm
Updated Branches:
  refs/heads/master 37403d17d -> c8947c2fe


[STORM-2686] Add locality awareness to LoadAwareShuffleGrouping


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/01bd4f82
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/01bd4f82
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/01bd4f82

Branch: refs/heads/master
Commit: 01bd4f821c940e979c360a4667c27c0477fde9a7
Parents: 352cd46
Author: Ethan Li 
Authored: Mon Oct 9 18:41:39 2017 -0500
Committer: Ethan Li 
Committed: Tue Oct 10 17:06:31 2017 -0500

--
 conf/defaults.yaml  |   2 +
 .../src/jvm/org/apache/storm/Config.java|  18 ++
 .../apache/storm/daemon/worker/WorkerState.java |   2 +-
 .../grouping/LoadAwareShuffleGrouping.java  | 178 ++-
 .../storm/task/GeneralTopologyContext.java  |   4 +
 .../storm/task/WorkerTopologyContext.java   |  40 -
 .../grouping/LoadAwareShuffleGroupingTest.java  |  37 +++-
 7 files changed, 267 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/storm/blob/01bd4f82/conf/defaults.yaml
--
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 103b04f..ad34054 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -261,6 +261,8 @@ topology.disruptor.batch.size: 100
 topology.disruptor.batch.timeout.millis: 1
 topology.disable.loadaware.messaging: false
 topology.state.checkpoint.interval.ms: 1000
+topology.localityaware.higher.bound.percent: 0.8
+topology.localityaware.lower.bound.percent: 0.2
 
 # Configs for Resource Aware Scheduler
 # topology priority describing the importance of the topology in decreasing 
importance starting from 0 (i.e. 0 is the highest priority and the priority 
importance decreases as the priority number increases).

http://git-wip-us.apache.org/repos/asf/storm/blob/01bd4f82/storm-client/src/jvm/org/apache/storm/Config.java
--
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java 
b/storm-client/src/jvm/org/apache/storm/Config.java
index e296e8f..6be0c21 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -65,6 +65,24 @@ public class Config extends HashMap {
 public static final String TOPOLOGY_DISABLE_LOADAWARE_MESSAGING = 
"topology.disable.loadaware.messaging";
 
 /**
+ * This signifies the load congestion among target tasks in scope. 
Currently it's only used in LoadAwareShuffleGrouping.
+ * When the average load is higher than the higher bound, the executor 
should choose target tasks in a higher scope,
+ * The scopes and their orders are: EVERYTHING > RACK_LOCAL > HOST_LOCAL > 
WORKER_LOCAL
+ */
+@isPositiveNumber
+@NotNull
+public static final String TOPOLOGY_LOCALITYAWARE_HIGHER_BOUND_PERCENT = 
"topology.localityaware.higher.bound.percent";
+
+/**
+ * This signifies the load congestion among target tasks in scope. 
Currently it's only used in LoadAwareShuffleGrouping.
+ * When the average load is lower than the lower bound, the executor 
should choose target tasks in a lower scope.
+ * The scopes and their orders are: EVERYTHING > RACK_LOCAL > HOST_LOCAL > 
WORKER_LOCAL
+ */
+@isPositiveNumber
+@NotNull
+public static final String TOPOLOGY_LOCALITYAWARE_LOWER_BOUND_PERCENT = 
"topology.localityaware.lower.bound.percent";
+
+/**
  * Try to serialize all tuples, even for local transfers.  This should 
only be used
  * for testing, as a sanity check that all of your tuples are setup 
properly.
  */

http://git-wip-us.apache.org/repos/asf/storm/blob/01bd4f82/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
--
diff --git 
a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java 
b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index 825de4b..ec2ff59 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -585,7 +585,7 @@ public class WorkerState {
 return new WorkerTopologyContext(systemTopology, topologyConf, 
taskToComponent, componentToSortedTasks,
 componentToStreamToFields, topologyId, codeDir, pidDir, port, 
taskIds,
 defaultSharedResources,
-userSharedResources);
+userSharedResources, cachedTaskToNodePort, assignmentId);
 } catch (IOException e) {
 throw Utils.wrapInRuntime(e);
 }

http://git-wip-us.apache.org/repos/asf/storm/blo

[2/2] storm git commit: Merge branch 'branch_4' of https://github.com/httfighter/storm into STORM-2772

2017-10-16 Thread bobby
Merge branch 'branch_4' of https://github.com/httfighter/storm into STORM-2772

STORM-2772: In the DRPCSpout class, when the fetch from the DRPC server
fails, the log should report that fetching the DRPC request failed instead
of reporting that fetching the DRPC result failed

This closes #2364


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/37403d17
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/37403d17
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/37403d17

Branch: refs/heads/master
Commit: 37403d17ddf4f762fa61aac9feb93143ae854645
Parents: ca02289 cf91236
Author: Robert Evans 
Authored: Mon Oct 16 08:48:56 2017 -0500
Committer: Robert Evans 
Committed: Mon Oct 16 08:48:56 2017 -0500

--
 storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
--




[1/2] storm git commit: In the DRPCSpout class, when the fetch from the DRPC server fails, the log should return to get the DRPC request failed instead of getting the DRPC result failed

2017-10-16 Thread bobby
Repository: storm
Updated Branches:
  refs/heads/master ca022899b -> 37403d17d


In the DRPCSpout class, when the fetch from the DRPC server fails, the log 
should report that fetching the DRPC request failed instead of reporting that 
fetching the DRPC result failed


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cf91236b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cf91236b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cf91236b

Branch: refs/heads/master
Commit: cf91236b385065209cf50d86d2d3071a402e5cd8
Parents: 352cd46
Author: httfighter 
Authored: Tue Oct 10 10:51:06 2017 +0800
Committer: httfighter 
Committed: Tue Oct 10 10:51:06 2017 +0800

--
 storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/storm/blob/cf91236b/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
--
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java 
b/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
index 8605c05..00ae469 100644
--- a/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
+++ b/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
@@ -213,12 +213,12 @@ public class DRPCSpout extends BaseRichSpout {
 }
 } catch (AuthorizationException aze) {
 reconnectAsync(client);
-LOG.error("Not authorized to fetch DRPC result from DRPC 
server", aze);
+LOG.error("Not authorized to fetch DRPC request from DRPC 
server", aze);
 } catch (TException e) {
 reconnectAsync(client);
-LOG.error("Failed to fetch DRPC result from DRPC server", 
e);
+LOG.error("Failed to fetch DRPC request from DRPC server", 
e);
 } catch (Exception e) {
-LOG.error("Failed to fetch DRPC result from DRPC server", 
e);
+LOG.error("Failed to fetch DRPC request from DRPC server", 
e);
 }
 }
 checkFutures();