Repository: storm Updated Branches: refs/heads/master 18f68f7a9 -> ce2d49b92
Revert "Merge branch 'STORM-756-v2' of https://github.com/HeartSaVioR/storm into STORM-756" This reverts commit cd00dde37d09ca9ff94bb7f3cad08f66cfe74b81, reversing changes made to a55b0503297f7cd7a4ba3f67ef1a6b2e2b33e4c2. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d2090d79 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d2090d79 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d2090d79 Branch: refs/heads/master Commit: d2090d799f8b5bb06a17094545d39c43f12ba6c7 Parents: 18f68f7 Author: Derek Dagit <[email protected]> Authored: Thu Dec 3 13:07:33 2015 -0600 Committer: Derek Dagit <[email protected]> Committed: Thu Dec 3 13:07:33 2015 -0600 ---------------------------------------------------------------------- conf/defaults.yaml | 1 - storm-core/src/jvm/backtype/storm/Config.java | 1 - .../src/jvm/backtype/storm/task/ShellBolt.java | 13 +- .../storm/utils/ShellBoltMessageQueue.java | 121 ------------------- .../test/clj/backtype/storm/multilang_test.clj | 2 +- .../storm/utils/ShellBoltMessageQueueTest.java | 84 ------------- 6 files changed, 8 insertions(+), 214 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/d2090d79/conf/defaults.yaml ---------------------------------------------------------------------- diff --git a/conf/defaults.yaml b/conf/defaults.yaml index c1124bd..295ac7c 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -206,7 +206,6 @@ topology.tasks: null # maximum amount of time a message has to complete before it's considered failed topology.message.timeout.secs: 30 topology.multilang.serializer: "backtype.storm.multilang.JsonSerializer" -topology.shellbolt.max.pending: 100 topology.skip.missing.kryo.registrations: false topology.max.task.parallelism: null topology.max.spout.pending: null http://git-wip-us.apache.org/repos/asf/storm/blob/d2090d79/storm-core/src/jvm/backtype/storm/Config.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java index 01317ee..c30ffff 100644 --- a/storm-core/src/jvm/backtype/storm/Config.java +++ b/storm-core/src/jvm/backtype/storm/Config.java @@ -1707,7 +1707,6 @@ public class Config extends HashMap<String, Object> { /** * Max pending tuples in one ShellBolt */ - @NotNull @isInteger @isPositiveNumber public static final String TOPOLOGY_SHELLBOLT_MAX_PENDING="topology.shellbolt.max.pending"; http://git-wip-us.apache.org/repos/asf/storm/blob/d2090d79/storm-core/src/jvm/backtype/storm/task/ShellBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java index 84a2b8a..cf6a330 100644 --- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java +++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java @@ -26,7 +26,6 @@ import backtype.storm.multilang.BoltMsg; import backtype.storm.multilang.ShellMsg; import backtype.storm.topology.ReportedFailedException; import backtype.storm.tuple.Tuple; -import backtype.storm.utils.ShellBoltMessageQueue; import backtype.storm.utils.ShellProcess; import clojure.lang.RT; import com.google.common.util.concurrent.MoreExecutors; @@ -78,7 +77,7 @@ public class ShellBolt implements IBolt { private ShellProcess _process; private volatile boolean _running = true; private volatile Throwable _exception; - private ShellBoltMessageQueue _pendingWrites = new ShellBoltMessageQueue(); + private LinkedBlockingQueue _pendingWrites = new LinkedBlockingQueue(); private Random _rand; private Thread _readerThread; @@ -108,9 +107,8 @@ public class ShellBolt implements IBolt { final OutputCollector collector) { Object maxPending = stormConf.get(Config.TOPOLOGY_SHELLBOLT_MAX_PENDING); if (maxPending != null) { - this._pendingWrites = new ShellBoltMessageQueue(((Number)maxPending).intValue()); + this._pendingWrites = new LinkedBlockingQueue(((Number)maxPending).intValue()); } - _rand = new Random(); _collector = collector; @@ -156,7 +154,7 @@ public class ShellBolt implements IBolt { try { BoltMsg boltMsg = createBoltMessage(input, genId); - _pendingWrites.putBoltMsg(boltMsg); + _pendingWrites.put(boltMsg); } catch(InterruptedException e) { String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString(); throw new RuntimeException("Error during multilang processing " + processInfo, e); @@ -218,7 +216,7 @@ public class ShellBolt implements IBolt { if(shellMsg.getTask() == 0) { List<Integer> outtasks = _collector.emit(shellMsg.getStream(), anchors, shellMsg.getTuple()); if (shellMsg.areTaskIdsNeeded()) { - _pendingWrites.putTaskIds(outtasks); + _pendingWrites.put(outtasks); } } else { _collector.emitDirect((int) shellMsg.getTask(), @@ -324,6 +322,8 @@ public class ShellBolt implements IBolt { sendHeartbeatFlag.compareAndSet(false, true); } + + } private class BoltReaderRunnable implements Runnable { @@ -388,6 +388,7 @@ public class ShellBolt implements IBolt { } else if (write != null) { throw new RuntimeException("Unknown class type to write: " + write.getClass().getName()); } + } catch (InterruptedException e) { } catch (Throwable t) { die(t); } http://git-wip-us.apache.org/repos/asf/storm/blob/d2090d79/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java b/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java deleted file mode 100644 index b633bc5..0000000 --- a/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package backtype.storm.utils; - -import backtype.storm.multilang.BoltMsg; - -import java.io.Serializable; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -/** - * A data structure for ShellBolt which includes two queues (FIFO), - * which one is for task ids (unbounded), another one is for bolt msg (bounded). - */ -public class ShellBoltMessageQueue implements Serializable { - private final LinkedList<List<Integer>> taskIdsQueue = new LinkedList<>(); - private final LinkedBlockingQueue<BoltMsg> boltMsgQueue; - - private final ReentrantLock takeLock = new ReentrantLock(); - private final Condition notEmpty = takeLock.newCondition(); - - public ShellBoltMessageQueue(int boltMsgCapacity) { - if (boltMsgCapacity <= 0) { - throw new IllegalArgumentException(); - } - this.boltMsgQueue = new LinkedBlockingQueue<>(boltMsgCapacity); - } - - public ShellBoltMessageQueue() { - this(Integer.MAX_VALUE); - } - - /** - * put list of task id to its queue - * @param taskIds task ids that received the tuples - */ - public void putTaskIds(List<Integer> taskIds) { - taskIdsQueue.add(taskIds); - takeLock.lock(); - try { - notEmpty.signal(); - } finally { - takeLock.unlock(); - } - } - - /** - * put bolt message to its queue - * @param boltMsg BoltMsg to pass to subprocess - * @throws InterruptedException - */ - public void putBoltMsg(BoltMsg boltMsg) throws InterruptedException { - boltMsgQueue.put(boltMsg); - takeLock.lock(); - try { - notEmpty.signal(); - } finally { - takeLock.unlock(); - } - } - - /** - * poll() is a core feature of ShellBoltMessageQueue. - * It retrieves and removes the head of one queues, waiting up to the - * specified wait time if necessary for an element to become available. - * There's priority that what queue it retrieves first, taskIds is higher than boltMsgQueue. - * - * @param timeout how long to wait before giving up, in units of unit - * @param unit a TimeUnit determining how to interpret the timeout parameter - * @return List\<Integer\> if task id is available, - * BoltMsg if task id is not available but bolt message is available, - * null if the specified waiting time elapses before an element is available. - * @throws InterruptedException - */ - public Object poll(long timeout, TimeUnit unit) throws InterruptedException { - takeLock.lockInterruptibly(); - long nanos = unit.toNanos(timeout); - try { - // wait for available queue - while (taskIdsQueue.peek() == null && boltMsgQueue.peek() == null) { - if (nanos <= 0) { - return null; - } - nanos = notEmpty.awaitNanos(nanos); - } - - // taskIds first - List<Integer> taskIds = taskIdsQueue.peek(); - if (taskIds != null) { - taskIds = taskIdsQueue.poll(); - return taskIds; - } - - // boltMsgQueue should have at least one entry at the moment - return boltMsgQueue.poll(); - } finally { - takeLock.unlock(); - } - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d2090d79/storm-core/test/clj/backtype/storm/multilang_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/backtype/storm/multilang_test.clj b/storm-core/test/clj/backtype/storm/multilang_test.clj index b42a56f..ff8f2f1 100644 --- a/storm-core/test/clj/backtype/storm/multilang_test.clj +++ b/storm-core/test/clj/backtype/storm/multilang_test.clj @@ -47,7 +47,7 @@ "test" {TOPOLOGY-WORKERS 20 TOPOLOGY-MESSAGE-TIMEOUT-SECS 3 TOPOLOGY-DEBUG true} topology) - (Thread/sleep 31000) + (Thread/sleep 11000) (.killTopology nimbus "test") (Thread/sleep 11000) ))) http://git-wip-us.apache.org/repos/asf/storm/blob/d2090d79/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java b/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java deleted file mode 100644 index 229efa1..0000000 --- a/storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.utils; - -import backtype.storm.multilang.BoltMsg; -import com.google.common.collect.Lists; -import junit.framework.TestCase; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; - -public class ShellBoltMessageQueueTest extends TestCase { - @Test - public void testPollTaskIdsFirst() throws InterruptedException { - ShellBoltMessageQueue queue = new ShellBoltMessageQueue(); - - // put bolt message first, then put task ids - queue.putBoltMsg(new BoltMsg()); - ArrayList<Integer> taskIds = Lists.newArrayList(1, 2, 3); - queue.putTaskIds(taskIds); - - Object msg = queue.poll(10, TimeUnit.SECONDS); - - // task ids should be pulled first - assertTrue(msg instanceof List<?>); - assertEquals(msg, taskIds); - } - - @Test - public void testPollWhileThereAreNoDataAvailable() throws InterruptedException { - ShellBoltMessageQueue queue = new ShellBoltMessageQueue(); - - long start = System.currentTimeMillis(); - Object msg = queue.poll(1, TimeUnit.SECONDS); - long finish = System.currentTimeMillis(); - - assertNull(msg); - assertTrue(finish - start > 1000); - } - - @Test - public void testPollShouldReturnASAPWhenDataAvailable() throws InterruptedException { - final ShellBoltMessageQueue queue = new ShellBoltMessageQueue(); - final List<Integer> taskIds = Lists.newArrayList(1, 2, 3); - - Thread t = new Thread(new Runnable() { - @Override - public void run() { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // NOOP - } - - queue.putTaskIds(taskIds); - } - }); - t.start(); - - long start = System.currentTimeMillis(); - Object msg = queue.poll(10, TimeUnit.SECONDS); - long finish = System.currentTimeMillis(); - - assertEquals(msg, taskIds); - assertTrue(finish - start < (10 * 1000)); - } -}
