Repository: lens Updated Branches: refs/heads/master a1445f3d3 -> aeca66ad4
LENS-993 : Fix thread pool creations on server Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/aeca66ad Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/aeca66ad Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/aeca66ad Branch: refs/heads/master Commit: aeca66ad432d7717b9d7651d7a3c9277da25bb70 Parents: a1445f3 Author: Amareshwari Sriramadasu <amareshw...@apache.org> Authored: Mon Mar 28 15:37:11 2016 +0530 Committer: Amareshwari Sriramadasu <amareshw...@apache.org> Committed: Mon Mar 28 15:37:11 2016 +0530 ---------------------------------------------------------------------- .../server/api/events/AsyncEventListener.java | 22 +++++++------------- .../lens/server/query/QueryEndNotifier.java | 5 ++--- .../server/query/QueryExecutionServiceImpl.java | 3 ++- .../lens/server/query/ResultFormatter.java | 5 ++--- .../lens/server/query/TestEventService.java | 2 +- 5 files changed, 15 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/aeca66ad/lens-server-api/src/main/java/org/apache/lens/server/api/events/AsyncEventListener.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/events/AsyncEventListener.java b/lens-server-api/src/main/java/org/apache/lens/server/api/events/AsyncEventListener.java index 84728e5..9473465 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/events/AsyncEventListener.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/events/AsyncEventListener.java @@ -60,44 +60,38 @@ public abstract class AsyncEventListener<T extends LensEvent> implements LensEve * Create a single threaded event listener with an unbounded queue, with daemon threads. */ public AsyncEventListener() { - this(1, 1); + this(1); } /** * Create a event listener with poolSize threads with an unbounded queue and daemon threads. * * @param poolSize the pool size - * @param maxPoolSize the max pool size */ - public AsyncEventListener(int poolSize, int maxPoolSize) { - this(poolSize, maxPoolSize, -1, 10, true); + public AsyncEventListener(int poolSize) { + this(poolSize, 60, true); } /** * Create an asynchronous event listener which uses a thread poool to process events. * * @param poolSize size of the event processing pool - * @param maxPoolSize the max pool size - * @param maxQueueSize max size of the event queue, if this is non positive, then the queue is unbounded * @param timeOutSeconds time out in seconds when an idle thread is destroyed * @param isDaemon if the threads used to process should be daemon threads, * if false, then implementation should call stop() * to stop the thread pool */ - public AsyncEventListener(int poolSize, int maxPoolSize, int maxQueueSize, long timeOutSeconds, - final boolean isDaemon) { - if (maxQueueSize <= 0) { - eventQueue = new LinkedBlockingQueue<Runnable>(); - } else { - eventQueue = new ArrayBlockingQueue<Runnable>(maxQueueSize); - } + public AsyncEventListener(int poolSize, long timeOutSeconds, final boolean isDaemon) { + eventQueue = new LinkedBlockingQueue<>(); ThreadFactory factory = new BasicThreadFactory.Builder() .namingPattern(getName()+"_AsyncThread-%d") .daemon(isDaemon) .priority(Thread.NORM_PRIORITY) .build(); - processor = new ThreadPoolExecutor(poolSize, maxPoolSize, timeOutSeconds, TimeUnit.SECONDS, eventQueue, factory); + // fixed pool with min and max equal to poolSize + processor = new ThreadPoolExecutor(poolSize, poolSize, timeOutSeconds, TimeUnit.SECONDS, eventQueue, factory); + processor.allowCoreThreadTimeOut(true); } /** http://git-wip-us.apache.org/repos/asf/lens/blob/aeca66ad/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java index 63c38d9..91fddc9 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java @@ -95,8 +95,7 @@ public class QueryEndNotifier extends AsyncEventListener<QueryEnded> { private final LogSegregationContext logSegregationContext; /** QueryEndNotifier core and max pool size */ - private static final int CORE_POOL_SIZE = 2; - private static final int MAX_POOL_SIZE = 5; + private static final int CORE_POOL_SIZE = 5; /** Instantiates a new query end notifier. * @@ -104,7 +103,7 @@ public class QueryEndNotifier extends AsyncEventListener<QueryEnded> { * @param hiveConf the hive conf */ public QueryEndNotifier(QueryExecutionServiceImpl queryService, HiveConf hiveConf, @NonNull final LogSegregationContext logSegregationContext) { - super(CORE_POOL_SIZE, MAX_POOL_SIZE); + super(CORE_POOL_SIZE); this.queryService = queryService; HiveConf conf = hiveConf; from = conf.get(MAIL_FROM_ADDRESS); http://git-wip-us.apache.org/repos/asf/lens/blob/aeca66ad/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java index 90c25e4..581530f 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java @@ -1274,8 +1274,9 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE }; log.debug("starting estimate pool"); + ThreadPoolExecutor estimatePool = new ThreadPoolExecutor(minPoolSize, maxPoolSize, keepAlive, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>(), threadFactory); + new SynchronousQueue<Runnable>(), threadFactory); estimatePool.allowCoreThreadTimeOut(true); estimatePool.prestartCoreThread(); this.estimatePool = estimatePool; http://git-wip-us.apache.org/repos/asf/lens/blob/aeca66ad/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java b/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java index 9955278..c8bfa56 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java @@ -47,8 +47,7 @@ public class ResultFormatter extends AsyncEventListener<QueryExecuted> { QueryExecutionServiceImpl queryService; /** ResultFormatter core and max pool size */ - private static final int CORE_POOL_SIZE = 5; - private static final int MAX_POOL_SIZE = 10; + private static final int CORE_POOL_SIZE = 10; private final LogSegregationContext logSegregationContext; @@ -58,7 +57,7 @@ public class ResultFormatter extends AsyncEventListener<QueryExecuted> { * @param queryService the query service */ public ResultFormatter(QueryExecutionServiceImpl queryService, @NonNull LogSegregationContext logSegregationContext) { - super(CORE_POOL_SIZE, MAX_POOL_SIZE); + super(CORE_POOL_SIZE); this.queryService = queryService; this.logSegregationContext = logSegregationContext; } http://git-wip-us.apache.org/repos/asf/lens/blob/aeca66ad/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java b/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java index a2ca17f..573d388 100644 --- a/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java +++ b/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java @@ -540,7 +540,7 @@ public class TestEventService { private static class DummyAsncEventListener extends AsyncEventListener<QuerySuccess> { public DummyAsncEventListener(){ - super(5, 10); //core pool = 5 and max Pool size =10 + super(5); //core pool = 5 } @Override public void process(QuerySuccess event) {