[1/2] storm git commit: [STORM-2607] Offset consumer + 1
Repository: storm Updated Branches: refs/heads/1.1.x-branch cb331d454 -> c9bbd5444 [STORM-2607] Offset consumer + 1 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a1ffcb93 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a1ffcb93 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a1ffcb93 Branch: refs/heads/1.1.x-branch Commit: a1ffcb936336d71bdf3693108e33e92c8d2a88ef Parents: cb331d4 Author: Rodolfo Ribeiro Authored: Thu Jun 29 12:06:20 2017 -0300 Committer: Stig Rohde Døssing Committed: Mon Oct 16 20:12:33 2017 +0200 -- .../kafka/spout/internal/OffsetManager.java | 20 .../kafka/spout/SingleTopicKafkaSpoutTest.java | 2 +- 2 files changed, 13 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/a1ffcb93/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java -- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java index b6d36d8..1c474e3 100755 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java @@ -71,16 +71,18 @@ public class OffsetManager { boolean found = false; long currOffset; long nextCommitOffset = committedOffset; +long lastOffMessageOffset = committedOffset; KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap currOffset = currAckedMsg.offset(); -if (currOffset == nextCommitOffset + 1) {// found the next offset to commit +if (currOffset == lastOffMessageOffset + 1) {// found the next offset to commit found = true; 
nextCommitMsg = currAckedMsg; -nextCommitOffset = currOffset; -} else if (currOffset > nextCommitOffset + 1) { -if (emittedOffsets.contains(nextCommitOffset + 1)) { +lastOffMessageOffset = currOffset; +nextCommitOffset = lastOffMessageOffset + 1; +} else if (currOffset > lastOffMessageOffset + 1) { +if (emittedOffsets.contains(lastOffMessageOffset + 1)) { LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset); break; } else { @@ -92,11 +94,12 @@ public class OffsetManager { first element after committedOffset in the ascending ordered emitted set. */ LOG.debug("Processed non contiguous offset. (committedOffset+1) is no longer part of the topic. Committed: [{}], Processed: [{}]", committedOffset, currOffset); -final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset + 1); +final Long nextEmittedOffset = emittedOffsets.ceiling(lastOffMessageOffset); if (nextEmittedOffset != null && currOffset == nextEmittedOffset) { found = true; nextCommitMsg = currAckedMsg; -nextCommitOffset = currOffset; +lastOffMessageOffset = currOffset; +nextCommitOffset = lastOffMessageOffset + 1; } else { LOG.debug("topic-partition [{}] has non-continuous offset [{}]. Next Offset to commit should be [{}]", tp, currOffset, nextEmittedOffset); break; @@ -112,7 +115,8 @@ public class OffsetManager { OffsetAndMetadata nextCommitOffsetAndMetadata = null; if (found) { nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread())); -LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed", tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset()); +LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed", +tp, earliestUncommittedOffset, nextCommitOffsetAndMetadata.offset() - 1); } else { LOG.debug("topic-partition [{}] has NO offsets ready to be committed", tp); } @@ -131,7 +135,7 @@ public class OffsetManager {
[2/2] storm git commit: STORM-2607: Switch OffsetManager to track earliest uncommitted offset instead of last committed offset for compatibility with commitSync consumer API
STORM-2607: Switch OffsetManager to track earliest uncommitted offset instead of last committed offset for compatibility with commitSync consumer API Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c9bbd544 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c9bbd544 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c9bbd544 Branch: refs/heads/1.1.x-branch Commit: c9bbd59eee98d0427c115ff13a6878706179 Parents: a1ffcb9 Author: Stig Rohde Døssing Authored: Sat Jul 1 17:29:56 2017 +0200 Committer: Stig Rohde Døssing Committed: Tue Oct 17 07:22:27 2017 +0200 -- .../apache/storm/kafka/spout/KafkaSpout.java| 6 +- .../kafka/spout/internal/OffsetManager.java | 86 +++- .../storm/kafka/spout/KafkaSpoutCommitTest.java | 6 +- .../kafka/spout/KafkaSpoutRebalanceTest.java| 14 +- .../kafka/spout/KafkaSpoutRetryLimitTest.java | 9 +- .../kafka/spout/SingleTopicKafkaSpoutTest.java | 2 +- .../kafka/spout/internal/OffsetManagerTest.java | 135 +++ 7 files changed, 180 insertions(+), 78 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/c9bbd544/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java -- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 4d88205..207e9b4 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -181,7 +181,7 @@ public class KafkaSpout extends BaseRichSpout { } /** - * sets the cursor to the location dictated by the first poll strategy and returns the fetch offset + * Sets the cursor to the location dictated by the first poll strategy and returns the fetch offset. 
*/ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) { if (committedOffset != null) { // offset was committed for this TopicPartition @@ -190,8 +190,8 @@ public class KafkaSpout extends BaseRichSpout { } else if (firstPollOffsetStrategy.equals(LATEST)) { kafkaConsumer.seekToEnd(Collections.singleton(tp)); } else { -// By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset. -kafkaConsumer.seek(tp, committedOffset.offset() + 1); +// By default polling starts at the last committed offset, i.e. the first offset that was not marked as processed. +kafkaConsumer.seek(tp, committedOffset.offset()); } } else {// no commits have ever been done, so start at the beginning or end depending on the strategy if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) { http://git-wip-us.apache.org/repos/asf/storm/blob/c9bbd544/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java -- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java index 1c474e3..7dfe7f6 100755 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java @@ -37,7 +37,7 @@ public class OffsetManager { /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset. * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */ private final long initialFetchOffset; -// Last offset committed to Kafka. Initially it is set to fetchOffset - 1 +// Committed offset, i.e. the offset where processing will resume if the spout restarts. Initially it is set to fetchOffset. 
private long committedOffset; // Emitted Offsets List private final NavigableSet emittedOffsets = new TreeSet<>(); @@ -47,8 +47,8 @@ public class OffsetManager { public OffsetManager(TopicPartition tp, long initialFetchOffset) { this.tp = tp; this.initialFetchOffset = initialFetchOffset; -this.committedOffset = initialFetchOffset - 1; -LOG.debug("Instantiated {}", this); +this.committedOffset = i
[2/2] storm git commit: STORM-2607: Switch OffsetManager to track earliest uncommitted offset instead of last committed offset for compatibility with commitSync consumer API
STORM-2607: Switch OffsetManager to track earliest uncommitted offset instead of last committed offset for compatibility with commitSync consumer API Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/148ee660 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/148ee660 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/148ee660 Branch: refs/heads/1.x-branch Commit: 148ee6609557d7436ce826da29ff3ec301458586 Parents: 4fc47b2 Author: Stig Rohde Døssing Authored: Sat Jul 1 17:29:56 2017 +0200 Committer: Stig Rohde Døssing Committed: Tue Oct 17 07:20:39 2017 +0200 -- .../apache/storm/kafka/spout/KafkaSpout.java| 6 +- .../kafka/spout/internal/OffsetManager.java | 86 +++- .../storm/kafka/spout/KafkaSpoutCommitTest.java | 6 +- .../kafka/spout/KafkaSpoutRebalanceTest.java| 4 +- .../kafka/spout/KafkaSpoutRetryLimitTest.java | 8 +- .../kafka/spout/SingleTopicKafkaSpoutTest.java | 2 +- .../kafka/spout/internal/OffsetManagerTest.java | 135 +++ 7 files changed, 174 insertions(+), 73 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/148ee660/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java -- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 3582bdb..68bce11 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -202,7 +202,7 @@ public class KafkaSpout extends BaseRichSpout { } /** - * sets the cursor to the location dictated by the first poll strategy and returns the fetch offset + * Sets the cursor to the location dictated by the first poll strategy and returns the fetch offset. 
*/ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) { if (committedOffset != null) { // offset was committed for this TopicPartition @@ -211,8 +211,8 @@ public class KafkaSpout extends BaseRichSpout { } else if (firstPollOffsetStrategy.equals(LATEST)) { kafkaConsumer.seekToEnd(Collections.singleton(tp)); } else { -// By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset. -kafkaConsumer.seek(tp, committedOffset.offset() + 1); +// By default polling starts at the last committed offset, i.e. the first offset that was not marked as processed. +kafkaConsumer.seek(tp, committedOffset.offset()); } } else {// no commits have ever been done, so start at the beginning or end depending on the strategy if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) { http://git-wip-us.apache.org/repos/asf/storm/blob/148ee660/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java -- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java index 1c474e3..7dfe7f6 100755 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java @@ -37,7 +37,7 @@ public class OffsetManager { /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset. * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */ private final long initialFetchOffset; -// Last offset committed to Kafka. Initially it is set to fetchOffset - 1 +// Committed offset, i.e. the offset where processing will resume if the spout restarts. Initially it is set to fetchOffset. 
private long committedOffset; // Emitted Offsets List private final NavigableSet emittedOffsets = new TreeSet<>(); @@ -47,8 +47,8 @@ public class OffsetManager { public OffsetManager(TopicPartition tp, long initialFetchOffset) { this.tp = tp; this.initialFetchOffset = initialFetchOffset; -this.committedOffset = initialFetchOffset - 1; -LOG.debug("Instantiated {}", this); +this.committedOffset = ini
[1/2] storm git commit: [STORM-2607] Offset consumer + 1
Repository: storm Updated Branches: refs/heads/1.x-branch 39e12aa22 -> 148ee6609 [STORM-2607] Offset consumer + 1 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4fc47b2a Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4fc47b2a Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4fc47b2a Branch: refs/heads/1.x-branch Commit: 4fc47b2a841bf05ec89c4ccbf089d95c28c965a1 Parents: 39e12aa Author: Rodolfo Ribeiro Authored: Thu Jun 29 12:06:20 2017 -0300 Committer: Stig Rohde Døssing Committed: Tue Oct 17 07:20:38 2017 +0200 -- .../kafka/spout/internal/OffsetManager.java | 20 .../kafka/spout/SingleTopicKafkaSpoutTest.java | 2 +- 2 files changed, 13 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/4fc47b2a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java -- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java index b6d36d8..1c474e3 100755 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java @@ -71,16 +71,18 @@ public class OffsetManager { boolean found = false; long currOffset; long nextCommitOffset = committedOffset; +long lastOffMessageOffset = committedOffset; KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap currOffset = currAckedMsg.offset(); -if (currOffset == nextCommitOffset + 1) {// found the next offset to commit +if (currOffset == lastOffMessageOffset + 1) {// found the next offset to commit found = true; 
nextCommitMsg = currAckedMsg; -nextCommitOffset = currOffset; -} else if (currOffset > nextCommitOffset + 1) { -if (emittedOffsets.contains(nextCommitOffset + 1)) { +lastOffMessageOffset = currOffset; +nextCommitOffset = lastOffMessageOffset + 1; +} else if (currOffset > lastOffMessageOffset + 1) { +if (emittedOffsets.contains(lastOffMessageOffset + 1)) { LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset); break; } else { @@ -92,11 +94,12 @@ public class OffsetManager { first element after committedOffset in the ascending ordered emitted set. */ LOG.debug("Processed non contiguous offset. (committedOffset+1) is no longer part of the topic. Committed: [{}], Processed: [{}]", committedOffset, currOffset); -final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset + 1); +final Long nextEmittedOffset = emittedOffsets.ceiling(lastOffMessageOffset); if (nextEmittedOffset != null && currOffset == nextEmittedOffset) { found = true; nextCommitMsg = currAckedMsg; -nextCommitOffset = currOffset; +lastOffMessageOffset = currOffset; +nextCommitOffset = lastOffMessageOffset + 1; } else { LOG.debug("topic-partition [{}] has non-continuous offset [{}]. Next Offset to commit should be [{}]", tp, currOffset, nextEmittedOffset); break; @@ -112,7 +115,8 @@ public class OffsetManager { OffsetAndMetadata nextCommitOffsetAndMetadata = null; if (found) { nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread())); -LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed", tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset()); +LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed", +tp, earliestUncommittedOffset, nextCommitOffsetAndMetadata.offset() - 1); } else { LOG.debug("topic-partition [{}] has NO offsets ready to be committed", tp); } @@ -131,7 +135,7 @@ public class OffsetManager {
[2/2] storm git commit: Merge branch 'STORM-2686-redesign' of https://github.com/Ethanlm/storm into STORM-2686
Merge branch 'STORM-2686-redesign' of https://github.com/Ethanlm/storm into STORM-2686 STORM-2686: Add locality awareness to LoadAwareShuffleGrouping This closes #2366 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c8947c2f Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c8947c2f Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c8947c2f Branch: refs/heads/master Commit: c8947c2fede62036c20472c9e0335ef90a06b536 Parents: 37403d1 01bd4f8 Author: Robert Evans Authored: Mon Oct 16 10:24:39 2017 -0500 Committer: Robert Evans Committed: Mon Oct 16 10:24:39 2017 -0500 -- conf/defaults.yaml | 2 + .../src/jvm/org/apache/storm/Config.java| 18 ++ .../apache/storm/daemon/worker/WorkerState.java | 2 +- .../grouping/LoadAwareShuffleGrouping.java | 178 ++- .../storm/task/GeneralTopologyContext.java | 4 + .../storm/task/WorkerTopologyContext.java | 40 - .../grouping/LoadAwareShuffleGroupingTest.java | 37 +++- 7 files changed, 267 insertions(+), 14 deletions(-) --
[1/2] storm git commit: [STORM-2686] Add locality awareness to LoadAwareShuffleGrouping
Repository: storm Updated Branches: refs/heads/master 37403d17d -> c8947c2fe [STORM-2686] Add locality awareness to LoadAwareShuffleGrouping Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/01bd4f82 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/01bd4f82 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/01bd4f82 Branch: refs/heads/master Commit: 01bd4f821c940e979c360a4667c27c0477fde9a7 Parents: 352cd46 Author: Ethan Li Authored: Mon Oct 9 18:41:39 2017 -0500 Committer: Ethan Li Committed: Tue Oct 10 17:06:31 2017 -0500 -- conf/defaults.yaml | 2 + .../src/jvm/org/apache/storm/Config.java| 18 ++ .../apache/storm/daemon/worker/WorkerState.java | 2 +- .../grouping/LoadAwareShuffleGrouping.java | 178 ++- .../storm/task/GeneralTopologyContext.java | 4 + .../storm/task/WorkerTopologyContext.java | 40 - .../grouping/LoadAwareShuffleGroupingTest.java | 37 +++- 7 files changed, 267 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/01bd4f82/conf/defaults.yaml -- diff --git a/conf/defaults.yaml b/conf/defaults.yaml index 103b04f..ad34054 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -261,6 +261,8 @@ topology.disruptor.batch.size: 100 topology.disruptor.batch.timeout.millis: 1 topology.disable.loadaware.messaging: false topology.state.checkpoint.interval.ms: 1000 +topology.localityaware.higher.bound.percent: 0.8 +topology.localityaware.lower.bound.percent: 0.2 # Configs for Resource Aware Scheduler # topology priority describing the importance of the topology in decreasing importance starting from 0 (i.e. 0 is the highest priority and the priority importance decreases as the priority number increases). 
http://git-wip-us.apache.org/repos/asf/storm/blob/01bd4f82/storm-client/src/jvm/org/apache/storm/Config.java -- diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java index e296e8f..6be0c21 100644 --- a/storm-client/src/jvm/org/apache/storm/Config.java +++ b/storm-client/src/jvm/org/apache/storm/Config.java @@ -65,6 +65,24 @@ public class Config extends HashMap { public static final String TOPOLOGY_DISABLE_LOADAWARE_MESSAGING = "topology.disable.loadaware.messaging"; /** + * This signifies the load congestion among target tasks in scope. Currently it's only used in LoadAwareShuffleGrouping. + * When the average load is higher than the higher bound, the executor should choose target tasks in a higher scope, + * The scopes and their orders are: EVERYTHING > RACK_LOCAL > HOST_LOCAL > WORKER_LOCAL + */ +@isPositiveNumber +@NotNull +public static final String TOPOLOGY_LOCALITYAWARE_HIGHER_BOUND_PERCENT = "topology.localityaware.higher.bound.percent"; + +/** + * This signifies the load congestion among target tasks in scope. Currently it's only used in LoadAwareShuffleGrouping. + * When the average load is lower than the lower bound, the executor should choose target tasks in a lower scope. + * The scopes and their orders are: EVERYTHING > RACK_LOCAL > HOST_LOCAL > WORKER_LOCAL + */ +@isPositiveNumber +@NotNull +public static final String TOPOLOGY_LOCALITYAWARE_LOWER_BOUND_PERCENT = "topology.localityaware.lower.bound.percent"; + +/** * Try to serialize all tuples, even for local transfers. This should only be used * for testing, as a sanity check that all of your tuples are setup properly. 
*/ http://git-wip-us.apache.org/repos/asf/storm/blob/01bd4f82/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java -- diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java index 825de4b..ec2ff59 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java @@ -585,7 +585,7 @@ public class WorkerState { return new WorkerTopologyContext(systemTopology, topologyConf, taskToComponent, componentToSortedTasks, componentToStreamToFields, topologyId, codeDir, pidDir, port, taskIds, defaultSharedResources, -userSharedResources); +userSharedResources, cachedTaskToNodePort, assignmentId); } catch (IOException e) { throw Utils.wrapInRuntime(e); } http://git-wip-us.apache.org/repos/asf/storm/blo
[2/2] storm git commit: Merge branch 'branch_4' of https://github.com/httfighter/storm into STORM-2772
Merge branch 'branch_4' of https://github.com/httfighter/storm into STORM-2772 STORM-2772: In the DRPCSpout class, when the fetch from the DRPC server fails, the log should return to get the DRPC request failed instead of getting the DRPC result failed This closes #2364 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/37403d17 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/37403d17 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/37403d17 Branch: refs/heads/master Commit: 37403d17ddf4f762fa61aac9feb93143ae854645 Parents: ca02289 cf91236 Author: Robert Evans Authored: Mon Oct 16 08:48:56 2017 -0500 Committer: Robert Evans Committed: Mon Oct 16 08:48:56 2017 -0500 -- storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) --
[1/2] storm git commit: In the DRPCSpout class, when the fetch from the DRPC server fails, the log should return to get the DRPC request failed instead of getting the DRPC result failed
Repository: storm Updated Branches: refs/heads/master ca022899b -> 37403d17d In the DRPCSpout class, when the fetch from the DRPC server fails, the log should return to get the DRPC request failed instead of getting the DRPC result failed Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cf91236b Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cf91236b Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cf91236b Branch: refs/heads/master Commit: cf91236b385065209cf50d86d2d3071a402e5cd8 Parents: 352cd46 Author: httfighter Authored: Tue Oct 10 10:51:06 2017 +0800 Committer: httfighter Committed: Tue Oct 10 10:51:06 2017 +0800 -- storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/cf91236b/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java -- diff --git a/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java b/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java index 8605c05..00ae469 100644 --- a/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java +++ b/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java @@ -213,12 +213,12 @@ public class DRPCSpout extends BaseRichSpout { } } catch (AuthorizationException aze) { reconnectAsync(client); -LOG.error("Not authorized to fetch DRPC result from DRPC server", aze); +LOG.error("Not authorized to fetch DRPC request from DRPC server", aze); } catch (TException e) { reconnectAsync(client); -LOG.error("Failed to fetch DRPC result from DRPC server", e); +LOG.error("Failed to fetch DRPC request from DRPC server", e); } catch (Exception e) { -LOG.error("Failed to fetch DRPC result from DRPC server", e); +LOG.error("Failed to fetch DRPC request from DRPC server", e); } } checkFutures();