Repository: storm Updated Branches: refs/heads/master a55b05032 -> 041fbe349
STORM-756 Handle taskids response ASAP * create a new queue which stores only taskids responses ** BoltReaderRunnable thread is no longer blocked by _pendingWrites.put() * let BoltWriterRunnable send messages respecting priorities ** heartbeat > taskids > tuple * set sleep time in multilang_test long enough ** that the topology is activated and processes some tuples for a while Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a16eca7e Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a16eca7e Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a16eca7e Branch: refs/heads/master Commit: a16eca7e0b35c35be064a742f834add1e4f20879 Parents: a8d253a Author: Jungtaek Lim <[email protected]> Authored: Sun Nov 22 18:25:51 2015 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Sun Nov 22 23:09:57 2015 +0900 ---------------------------------------------------------------------- .../src/jvm/backtype/storm/task/ShellBolt.java | 32 +++++++++++--------- .../test/clj/backtype/storm/multilang_test.clj | 2 +- 2 files changed, 19 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/a16eca7e/storm-core/src/jvm/backtype/storm/task/ShellBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java index dda99ca..8baf2c6 100644 --- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java +++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java @@ -37,8 +37,6 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import static java.util.concurrent.TimeUnit.SECONDS; - /** * A bolt that shells out to another process to process tuples. 
ShellBolt * communicates with that process over stdio using a special protocol. An ~100 @@ -77,7 +75,8 @@ public class ShellBolt implements IBolt { private ShellProcess _process; private volatile boolean _running = true; private volatile Throwable _exception; - private LinkedBlockingQueue _pendingWrites = new LinkedBlockingQueue(); + private LinkedBlockingQueue<BoltMsg> _pendingWrites = new LinkedBlockingQueue<>(); + private LinkedBlockingQueue<List<Integer>> _pendingTaskIds = new LinkedBlockingQueue<>(); private Random _rand; private Thread _readerThread; @@ -107,7 +106,7 @@ public class ShellBolt implements IBolt { final OutputCollector collector) { Object maxPending = stormConf.get(Config.TOPOLOGY_SHELLBOLT_MAX_PENDING); if (maxPending != null) { - this._pendingWrites = new LinkedBlockingQueue(((Number)maxPending).intValue()); + this._pendingWrites = new LinkedBlockingQueue<>(((Number)maxPending).intValue()); } _rand = new Random(); _collector = collector; @@ -212,7 +211,7 @@ public class ShellBolt implements IBolt { if(shellMsg.getTask() == 0) { List<Integer> outtasks = _collector.emit(shellMsg.getStream(), anchors, shellMsg.getTuple()); if (shellMsg.areTaskIdsNeeded()) { - _pendingWrites.put(outtasks); + _pendingTaskIds.put(outtasks); } } else { _collector.emitDirect((int) shellMsg.getTask(), @@ -318,8 +317,6 @@ public class ShellBolt implements IBolt { sendHeartbeatFlag.compareAndSet(false, true); } - - } private class BoltReaderRunnable implements Runnable { @@ -376,15 +373,22 @@ public class ShellBolt implements IBolt { sendHeartbeatFlag.compareAndSet(true, false); } - Object write = _pendingWrites.poll(1, SECONDS); - if (write instanceof BoltMsg) { - _process.writeBoltMsg((BoltMsg) write); - } else if (write instanceof List<?>) { - _process.writeTaskIds((List<Integer>)write); - } else if (write != null) { - throw new RuntimeException("Unknown class type to write: " + write.getClass().getName()); + List<Integer> taskIds = _pendingTaskIds.peek(); + if 
(taskIds != null) { + taskIds = _pendingTaskIds.poll(); + _process.writeTaskIds(taskIds); + continue; + } + + BoltMsg write = _pendingWrites.peek(); + if (write != null) { + write = _pendingWrites.poll(); + _process.writeBoltMsg(write); } + /* } catch (InterruptedException e) { + // NOOP + */ } catch (Throwable t) { die(t); } http://git-wip-us.apache.org/repos/asf/storm/blob/a16eca7e/storm-core/test/clj/backtype/storm/multilang_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/backtype/storm/multilang_test.clj b/storm-core/test/clj/backtype/storm/multilang_test.clj index ff8f2f1..b42a56f 100644 --- a/storm-core/test/clj/backtype/storm/multilang_test.clj +++ b/storm-core/test/clj/backtype/storm/multilang_test.clj @@ -47,7 +47,7 @@ "test" {TOPOLOGY-WORKERS 20 TOPOLOGY-MESSAGE-TIMEOUT-SECS 3 TOPOLOGY-DEBUG true} topology) - (Thread/sleep 11000) + (Thread/sleep 31000) (.killTopology nimbus "test") (Thread/sleep 11000) )))
