Merge branch 'STORM-929' into STORM-935

Conflicts:
        storm-core/src/jvm/backtype/storm/Config.java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/05948253
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/05948253
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/05948253

Branch: refs/heads/0.10.x-branch
Commit: 0594825303397289eee258915b05d0c0a697a717
Parents: da76088
Author: errordaiwa <xingy...@outlook.com>
Authored: Tue Jul 14 12:40:45 2015 +0800
Committer: Jungtaek Lim <kabh...@gmail.com>
Committed: Fri Jul 17 08:01:01 2015 +0900

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/05948253/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index c4c936a..4d3f18b 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -98,8 +98,11 @@ public class DisruptorQueue implements IStatefulObject {
     public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
         try {
             final long nextSequence = _consumer.get() + 1;
-            final long availableSequence = _barrier.waitFor(nextSequence);
-            if(availableSequence >= nextSequence) {
+            final long availableSequence =
+                    _waitTimeout == 0L ? _barrier.waitFor(nextSequence) : _barrier.waitFor(nextSequence, _waitTimeout,
+                            TimeUnit.MILLISECONDS);
+
+            if (availableSequence >= nextSequence) {
                 consumeBatchToCursor(availableSequence, handler);
             }
         } catch (AlertException e) {

Reply via email to