Repository: storm
Updated Branches:
  refs/heads/master a55b05032 -> 041fbe349


STORM-756 Handle taskids response ASAP

* create new queue which stores only taskids responses
** BoltReaderRunnable thread is no longer blocked by _pendingWrites.put()
* let BoltWriterRunnable sends messages with respecting priorities
** heartbeat > taskids > tuple
* set sleep time from multilang_test long enough
** that topology is activate and it processes some tuples for a while


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a16eca7e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a16eca7e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a16eca7e

Branch: refs/heads/master
Commit: a16eca7e0b35c35be064a742f834add1e4f20879
Parents: a8d253a
Author: Jungtaek Lim <[email protected]>
Authored: Sun Nov 22 18:25:51 2015 +0900
Committer: Jungtaek Lim <[email protected]>
Committed: Sun Nov 22 23:09:57 2015 +0900

----------------------------------------------------------------------
 .../src/jvm/backtype/storm/task/ShellBolt.java  | 32 +++++++++++---------
 .../test/clj/backtype/storm/multilang_test.clj  |  2 +-
 2 files changed, 19 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a16eca7e/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java 
b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index dda99ca..8baf2c6 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -37,8 +37,6 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
-
 /**
  * A bolt that shells out to another process to process tuples. ShellBolt
  * communicates with that process over stdio using a special protocol. An ~100
@@ -77,7 +75,8 @@ public class ShellBolt implements IBolt {
     private ShellProcess _process;
     private volatile boolean _running = true;
     private volatile Throwable _exception;
-    private LinkedBlockingQueue _pendingWrites = new LinkedBlockingQueue();
+    private LinkedBlockingQueue<BoltMsg> _pendingWrites = new 
LinkedBlockingQueue<>();
+    private LinkedBlockingQueue<List<Integer>> _pendingTaskIds = new 
LinkedBlockingQueue<>();
     private Random _rand;
 
     private Thread _readerThread;
@@ -107,7 +106,7 @@ public class ShellBolt implements IBolt {
                         final OutputCollector collector) {
         Object maxPending = 
stormConf.get(Config.TOPOLOGY_SHELLBOLT_MAX_PENDING);
         if (maxPending != null) {
-           this._pendingWrites = new 
LinkedBlockingQueue(((Number)maxPending).intValue());
+           this._pendingWrites = new 
LinkedBlockingQueue<>(((Number)maxPending).intValue());
         }
         _rand = new Random();
         _collector = collector;
@@ -212,7 +211,7 @@ public class ShellBolt implements IBolt {
         if(shellMsg.getTask() == 0) {
             List<Integer> outtasks = _collector.emit(shellMsg.getStream(), 
anchors, shellMsg.getTuple());
             if (shellMsg.areTaskIdsNeeded()) {
-                _pendingWrites.put(outtasks);
+                _pendingTaskIds.put(outtasks);
             }
         } else {
             _collector.emitDirect((int) shellMsg.getTask(),
@@ -318,8 +317,6 @@ public class ShellBolt implements IBolt {
 
             sendHeartbeatFlag.compareAndSet(false, true);
         }
-
-
     }
 
     private class BoltReaderRunnable implements Runnable {
@@ -376,15 +373,22 @@ public class ShellBolt implements IBolt {
                         sendHeartbeatFlag.compareAndSet(true, false);
                     }
 
-                    Object write = _pendingWrites.poll(1, SECONDS);
-                    if (write instanceof BoltMsg) {
-                        _process.writeBoltMsg((BoltMsg) write);
-                    } else if (write instanceof List<?>) {
-                        _process.writeTaskIds((List<Integer>)write);
-                    } else if (write != null) {
-                        throw new RuntimeException("Unknown class type to 
write: " + write.getClass().getName());
+                    List<Integer> taskIds = _pendingTaskIds.peek();
+                    if (taskIds != null) {
+                        taskIds = _pendingTaskIds.poll();
+                        _process.writeTaskIds(taskIds);
+                        continue;
+                    }
+
+                    BoltMsg write = _pendingWrites.peek();
+                    if (write != null) {
+                        write = _pendingWrites.poll();
+                        _process.writeBoltMsg(write);
                     }
+                    /*
                 } catch (InterruptedException e) {
+                    // NOOP
+                    */
                 } catch (Throwable t) {
                     die(t);
                 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a16eca7e/storm-core/test/clj/backtype/storm/multilang_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/multilang_test.clj 
b/storm-core/test/clj/backtype/storm/multilang_test.clj
index ff8f2f1..b42a56f 100644
--- a/storm-core/test/clj/backtype/storm/multilang_test.clj
+++ b/storm-core/test/clj/backtype/storm/multilang_test.clj
@@ -47,7 +47,7 @@
                "test"
                {TOPOLOGY-WORKERS 20 TOPOLOGY-MESSAGE-TIMEOUT-SECS 3 
TOPOLOGY-DEBUG true}
                topology)
-       (Thread/sleep 11000)
+       (Thread/sleep 31000)
        (.killTopology nimbus "test")
        (Thread/sleep 11000)
        )))

Reply via email to