Repository: storm
Updated Branches:
  refs/heads/master d8384f43b -> 29a44e57a


STORM-2231 Fix multi-threads issue on executor send queue

* we have use cases which launches threads and emit/ack concurrently
* launched threads will create ThreadLocalBatcher for each
  * hence 'synchronized' in add() wouldn't help unlike background flushes


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2b15fc4f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2b15fc4f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2b15fc4f

Branch: refs/heads/master
Commit: 2b15fc4fc7189f2f42a0fea13f2ca00a6675d6c7
Parents: 0bf7e70
Author: Jungtaek Lim <[email protected]>
Authored: Thu Aug 24 20:11:41 2017 +0900
Committer: Jungtaek Lim <[email protected]>
Committed: Thu Aug 24 20:23:43 2017 +0900

----------------------------------------------------------------------
 storm-client/src/jvm/org/apache/storm/executor/Executor.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2b15fc4f/storm-client/src/jvm/org/apache/storm/executor/Executor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java 
b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index c1c6350..842141c 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -387,7 +387,7 @@ public abstract class Executor implements Callable, 
EventHandler<Object> {
         int waitTimeOutMs = 
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS));
         int batchSize = 
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_SIZE));
         int batchTimeOutMs = 
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS));
-        return new DisruptorQueue("executor" + executorId + "-send-queue", 
ProducerType.SINGLE,
+        return new DisruptorQueue("executor" + executorId + "-send-queue", 
ProducerType.MULTI,
                 sendSize, waitTimeOutMs, batchSize, batchTimeOutMs);
     }
 

Reply via email to