Repository: storm Updated Branches: refs/heads/master 19b8b7d13 -> 79a2a2a58
Revert "Merge branch 'revert-storm-756'" This reverts commit ce2d49b924b6690d7704eccbb91ebbbea0a601bb, reversing changes made to 18f68f7a967f7173b9c7c7444bedd53b13ad65fb. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/87f8fa1c Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/87f8fa1c Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/87f8fa1c Branch: refs/heads/master Commit: 87f8fa1c559b8764715fd6f82f8524565d8398aa Parents: ce2d49b Author: Jungtaek Lim <[email protected]> Authored: Fri Dec 4 08:03:46 2015 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Fri Dec 4 08:03:46 2015 +0900 ---------------------------------------------------------------------- CHANGELOG.md | 1 + conf/defaults.yaml | 1 + storm-core/src/jvm/backtype/storm/Config.java | 1 + .../src/jvm/backtype/storm/task/ShellBolt.java | 13 +- .../storm/utils/ShellBoltMessageQueue.java | 121 +++++++++++++++++++ .../test/clj/backtype/storm/multilang_test.clj | 2 +- .../storm/utils/ShellBoltMessageQueueTest.java | 84 +++++++++++++ 7 files changed, 215 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/87f8fa1c/CHANGELOG.md ---------------------------------------------------------------------- diff --git a/CHANGELOG.md b/CHANGELOG.md index 8420f1a..6684d7e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## 0.11.0 * STORM-1361: Apache License missing from two Cassandra files + * STORM-756: Handle taskids response as soon as possible * STORM-1218: Use markdown for JavaDoc. * STORM-1075: Storm Cassandra connector. * STORM-965: excessive logging in storm when non-kerberos client tries to connect http://git-wip-us.apache.org/repos/asf/storm/blob/87f8fa1c/conf/defaults.yaml ---------------------------------------------------------------------- diff --git a/conf/defaults.yaml b/conf/defaults.yaml index 295ac7c..c1124bd 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -206,6 +206,7 @@ topology.tasks: null # maximum amount of time a message has to complete before it's considered failed topology.message.timeout.secs: 30 topology.multilang.serializer: "backtype.storm.multilang.JsonSerializer" +topology.shellbolt.max.pending: 100 topology.skip.missing.kryo.registrations: false topology.max.task.parallelism: null topology.max.spout.pending: null http://git-wip-us.apache.org/repos/asf/storm/blob/87f8fa1c/storm-core/src/jvm/backtype/storm/Config.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java index c30ffff..01317ee 100644 --- a/storm-core/src/jvm/backtype/storm/Config.java +++ b/storm-core/src/jvm/backtype/storm/Config.java @@ -1707,6 +1707,7 @@ public class Config extends HashMap<String, Object> { /** * Max pending tuples in one ShellBolt */ + @NotNull @isInteger @isPositiveNumber public static final String TOPOLOGY_SHELLBOLT_MAX_PENDING="topology.shellbolt.max.pending"; http://git-wip-us.apache.org/repos/asf/storm/blob/87f8fa1c/storm-core/src/jvm/backtype/storm/task/ShellBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java index cf6a330..84a2b8a 100644 --- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java +++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java @@ -26,6 +26,7 @@ import backtype.storm.multilang.BoltMsg; import backtype.storm.multilang.ShellMsg; import backtype.storm.topology.ReportedFailedException; import backtype.storm.tuple.Tuple; +import backtype.storm.utils.ShellBoltMessageQueue; import backtype.storm.utils.ShellProcess; import clojure.lang.RT; import com.google.common.util.concurrent.MoreExecutors; @@ -77,7 +78,7 @@ public class ShellBolt implements IBolt { private ShellProcess _process; private volatile boolean _running = true; private volatile Throwable _exception; - private LinkedBlockingQueue _pendingWrites = new LinkedBlockingQueue(); + private ShellBoltMessageQueue _pendingWrites = new ShellBoltMessageQueue(); private Random _rand; private Thread _readerThread; @@ -107,8 +108,9 @@ public class ShellBolt implements IBolt { final OutputCollector collector) { Object maxPending = stormConf.get(Config.TOPOLOGY_SHELLBOLT_MAX_PENDING); if (maxPending != null) { - this._pendingWrites = new LinkedBlockingQueue(((Number)maxPending).intValue()); + this._pendingWrites = new ShellBoltMessageQueue(((Number)maxPending).intValue()); } + _rand = new Random(); _collector = collector; @@ -154,7 +156,7 @@ public class ShellBolt implements IBolt { try { BoltMsg boltMsg = createBoltMessage(input, genId); - _pendingWrites.put(boltMsg); + _pendingWrites.putBoltMsg(boltMsg); } catch(InterruptedException e) { String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString(); throw new RuntimeException("Error during multilang processing " + processInfo, e); @@ -216,7 +218,7 @@ public class ShellBolt implements IBolt { if(shellMsg.getTask() == 0) { List<Integer> outtasks = _collector.emit(shellMsg.getStream(), anchors, shellMsg.getTuple()); if (shellMsg.areTaskIdsNeeded()) { - _pendingWrites.put(outtasks); + _pendingWrites.putTaskIds(outtasks); } } else { _collector.emitDirect((int) shellMsg.getTask(), @@ -322,8 +324,6 @@ public class ShellBolt implements IBolt { sendHeartbeatFlag.compareAndSet(false, true); } - - } private class BoltReaderRunnable implements Runnable { @@ -388,7 +388,6 @@ public class ShellBolt implements IBolt { } else if (write != null) { throw new RuntimeException("Unknown class type to write: " + write.getClass().getName()); } - } catch (InterruptedException e) { } catch (Throwable t) { die(t); } http://git-wip-us.apache.org/repos/asf/storm/blob/87f8fa1c/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java b/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java new file mode 100644 index 0000000..b633bc5 --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package backtype.storm.utils; + +import backtype.storm.multilang.BoltMsg; + +import java.io.Serializable; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * A data structure for ShellBolt which includes two queues (FIFO), + * which one is for task ids (unbounded), another one is for bolt msg (bounded). + */ +public class ShellBoltMessageQueue implements Serializable { + private final LinkedList<List<Integer>> taskIdsQueue = new LinkedList<>(); + private final LinkedBlockingQueue<BoltMsg> boltMsgQueue; + + private final ReentrantLock takeLock = new ReentrantLock(); + private final Condition notEmpty = takeLock.newCondition(); + + public ShellBoltMessageQueue(int boltMsgCapacity) { + if (boltMsgCapacity <= 0) { + throw new IllegalArgumentException(); + } + this.boltMsgQueue = new LinkedBlockingQueue<>(boltMsgCapacity); + } + + public ShellBoltMessageQueue() { + this(Integer.MAX_VALUE); + } + + /** + * put list of task id to its queue + * @param taskIds task ids that received the tuples + */ + public void putTaskIds(List<Integer> taskIds) { + taskIdsQueue.add(taskIds); + takeLock.lock(); + try { + notEmpty.signal(); + } finally { + takeLock.unlock(); + } + } + + /** + * put bolt message to its queue + * @param boltMsg BoltMsg to pass to subprocess + * @throws InterruptedException + */ + public void putBoltMsg(BoltMsg boltMsg) throws InterruptedException { + boltMsgQueue.put(boltMsg); + takeLock.lock(); + try { + notEmpty.signal(); + } finally { + takeLock.unlock(); + } + } + + /** + * poll() is a core feature of ShellBoltMessageQueue. + * It retrieves and removes the head of one queues, waiting up to the + * specified wait time if necessary for an element to become available. + * There's priority that what queue it retrieves first, taskIds is higher than boltMsgQueue. + * + * @param timeout how long to wait before giving up, in units of unit + * @param unit a TimeUnit determining how to interpret the timeout parameter + * @return List\<Integer\> if task id is available, + * BoltMsg if task id is not available but bolt message is available, + * null if the specified waiting time elapses before an element is available. + * @throws InterruptedException + */ + public Object poll(long timeout, TimeUnit unit) throws InterruptedException { + takeLock.lockInterruptibly(); + long nanos = unit.toNanos(timeout); + try { + // wait for available queue + while (taskIdsQueue.peek() == null && boltMsgQueue.peek() == null) { + if (nanos <= 0) { + return null; + } + nanos = notEmpty.awaitNanos(nanos); + } + + // taskIds first + List<Integer> taskIds = taskIdsQueue.peek(); + if (taskIds != null) { + taskIds = taskIdsQueue.poll(); + return taskIds; + } + + // boltMsgQueue should have at least one entry at the moment + return boltMsgQueue.poll(); + } finally { + takeLock.unlock(); + } + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/87f8fa1c/storm-core/test/clj/backtype/storm/multilang_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/backtype/storm/multilang_test.clj b/storm-core/test/clj/backtype/storm/multilang_test.clj index ff8f2f1..b42a56f 100644 --- a/storm-core/test/clj/backtype/storm/multilang_test.clj +++ b/storm-core/test/clj/backtype/storm/multilang_test.clj @@ -47,7 +47,7 @@ "test" {TOPOLOGY-WORKERS 20 TOPOLOGY-MESSAGE-TIMEOUT-SECS 3 TOPOLOGY-DEBUG true} topology) - (Thread/sleep 11000) + (Thread/sleep 31000) (.killTopology nimbus "test") (Thread/sleep 11000) ))) http://git-wip-us.apache.org/repos/asf/storm/blob/87f8fa1c/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java b/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java new file mode 100644 index 0000000..229efa1 --- /dev/null +++ b/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.utils; + +import backtype.storm.multilang.BoltMsg; +import com.google.common.collect.Lists; +import junit.framework.TestCase; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class ShellBoltMessageQueueTest extends TestCase { + @Test + public void testPollTaskIdsFirst() throws InterruptedException { + ShellBoltMessageQueue queue = new ShellBoltMessageQueue(); + + // put bolt message first, then put task ids + queue.putBoltMsg(new BoltMsg()); + ArrayList<Integer> taskIds = Lists.newArrayList(1, 2, 3); + queue.putTaskIds(taskIds); + + Object msg = queue.poll(10, TimeUnit.SECONDS); + + // task ids should be pulled first + assertTrue(msg instanceof List<?>); + assertEquals(msg, taskIds); + } + + @Test + public void testPollWhileThereAreNoDataAvailable() throws InterruptedException { + ShellBoltMessageQueue queue = new ShellBoltMessageQueue(); + + long start = System.currentTimeMillis(); + Object msg = queue.poll(1, TimeUnit.SECONDS); + long finish = System.currentTimeMillis(); + + assertNull(msg); + assertTrue(finish - start > 1000); + } + + @Test + public void testPollShouldReturnASAPWhenDataAvailable() throws InterruptedException { + final ShellBoltMessageQueue queue = new ShellBoltMessageQueue(); + final List<Integer> taskIds = Lists.newArrayList(1, 2, 3); + + Thread t = new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // NOOP + } + + queue.putTaskIds(taskIds); + } + }); + t.start(); + + long start = System.currentTimeMillis(); + Object msg = queue.poll(10, TimeUnit.SECONDS); + long finish = System.currentTimeMillis(); + + assertEquals(msg, taskIds); + assertTrue(finish - start < (10 * 1000)); + } +}
