Add a missing space, fix a potential NPE, and add a Javadoc note that resetting the timeout is expensive
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bc79b4a8 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bc79b4a8 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bc79b4a8 Branch: refs/heads/master Commit: bc79b4a8d757a3191a85815877345d38710c73e2 Parents: d36be51 Author: Stig Døssing <[email protected]> Authored: Wed Mar 2 17:58:01 2016 +0100 Committer: Stig Døssing <[email protected]> Committed: Wed Mar 2 17:59:14 2016 +0100 ---------------------------------------------------------------------- storm-core/src/jvm/org/apache/storm/daemon/Acker.java | 5 ++++- storm-core/src/jvm/org/apache/storm/task/OutputCollector.java | 1 + .../jvm/org/apache/storm/topology/BasicOutputCollector.java | 6 ++++++ 3 files changed, 11 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/bc79b4a8/storm-core/src/jvm/org/apache/storm/daemon/Acker.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java index eb14af7..d7b9a2e 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java @@ -101,7 +101,10 @@ public class Acker implements IBolt { } curr.failed = true; pending.put(id, curr); - } else if(ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) { + } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) { + if (curr == null) { + curr = new AckObject(); + } pending.put(id, curr); } else { LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask()); http://git-wip-us.apache.org/repos/asf/storm/blob/bc79b4a8/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java ---------------------------------------------------------------------- diff --git 
a/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java b/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java index 071d8aa..4db87f0 100644 --- a/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java +++ b/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java @@ -221,6 +221,7 @@ public class OutputCollector implements IOutputCollector { /** * Resets the message timeout for any tuple trees to which the given tuple belongs. * The timeout is reset to Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS. + * Note that this is an expensive operation, and should be used sparingly. * @param input the tuple to reset timeout for */ @Override http://git-wip-us.apache.org/repos/asf/storm/blob/bc79b4a8/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java b/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java index 343c349..1d1e5ff 100644 --- a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java +++ b/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java @@ -52,6 +52,12 @@ public class BasicOutputCollector implements IBasicOutputCollector { emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple); } + /** + * Resets the message timeout for any tuple trees to which the given tuple belongs. + * The timeout is reset to Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS. + * Note that this is an expensive operation, and should be used sparingly. + * @param input the tuple to reset timeout for + */ public void resetTimeout(Tuple tuple){ out.resetTimeout(tuple); }
