Merge branch 'STORM-929' into STORM-935. Conflicts: storm-core/src/jvm/backtype/storm/Config.java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/05948253 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/05948253 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/05948253 Branch: refs/heads/0.10.x-branch Commit: 0594825303397289eee258915b05d0c0a697a717 Parents: da76088 Author: errordaiwa <xingy...@outlook.com> Authored: Tue Jul 14 12:40:45 2015 +0800 Committer: Jungtaek Lim <kabh...@gmail.com> Committed: Fri Jul 17 08:01:01 2015 +0900 ---------------------------------------------------------------------- storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/05948253/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java index c4c936a..4d3f18b 100644 --- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java +++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java @@ -98,8 +98,11 @@ public class DisruptorQueue implements IStatefulObject { public void consumeBatchWhenAvailable(EventHandler<Object> handler) { try { final long nextSequence = _consumer.get() + 1; - final long availableSequence = _barrier.waitFor(nextSequence); - if(availableSequence >= nextSequence) { + final long availableSequence = + _waitTimeout == 0L ? _barrier.waitFor(nextSequence) : _barrier.waitFor(nextSequence, _waitTimeout, + TimeUnit.MILLISECONDS); + + if (availableSequence >= nextSequence) { consumeBatchToCursor(availableSequence, handler); } } catch (AlertException e) {