Repository: lens
Updated Branches:
  refs/heads/master a1445f3d3 -> aeca66ad4


LENS-993 : Fix thread pool creations on server


Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/aeca66ad
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/aeca66ad
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/aeca66ad

Branch: refs/heads/master
Commit: aeca66ad432d7717b9d7651d7a3c9277da25bb70
Parents: a1445f3
Author: Amareshwari Sriramadasu <amareshw...@apache.org>
Authored: Mon Mar 28 15:37:11 2016 +0530
Committer: Amareshwari Sriramadasu <amareshw...@apache.org>
Committed: Mon Mar 28 15:37:11 2016 +0530

----------------------------------------------------------------------
 .../server/api/events/AsyncEventListener.java   | 22 +++++++-------------
 .../lens/server/query/QueryEndNotifier.java     |  5 ++---
 .../server/query/QueryExecutionServiceImpl.java |  3 ++-
 .../lens/server/query/ResultFormatter.java      |  5 ++---
 .../lens/server/query/TestEventService.java     |  2 +-
 5 files changed, 15 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lens/blob/aeca66ad/lens-server-api/src/main/java/org/apache/lens/server/api/events/AsyncEventListener.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/events/AsyncEventListener.java b/lens-server-api/src/main/java/org/apache/lens/server/api/events/AsyncEventListener.java
index 84728e5..9473465 100644
--- a/lens-server-api/src/main/java/org/apache/lens/server/api/events/AsyncEventListener.java
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/events/AsyncEventListener.java
@@ -60,44 +60,38 @@ public abstract class AsyncEventListener<T extends LensEvent> implements LensEve
    * Create a single threaded event listener with an unbounded queue, with daemon threads.
    */
   public AsyncEventListener() {
-    this(1, 1);
+    this(1);
   }
 
   /**
    * Create a event listener with poolSize threads with an unbounded queue and daemon threads.
    *
    * @param poolSize the pool size
-   * @param maxPoolSize the max pool size
    */
-  public AsyncEventListener(int poolSize, int maxPoolSize) {
-    this(poolSize, maxPoolSize, -1, 10, true);
+  public AsyncEventListener(int poolSize) {
+    this(poolSize, 60, true);
   }
 
   /**
    * Create an asynchronous event listener which uses a thread poool to process events.
    *
    * @param poolSize       size of the event processing pool
-   * @param maxPoolSize    the max pool size
-   * @param maxQueueSize   max size of the event queue, if this is non positive, then the queue is unbounded
    * @param timeOutSeconds time out in seconds when an idle thread is destroyed
    * @param isDaemon       if the threads used to process should be daemon threads,
    *                       if false, then implementation should call stop()
    *                       to stop the thread pool
    */
-  public AsyncEventListener(int poolSize, int maxPoolSize, int maxQueueSize, long timeOutSeconds,
-      final boolean isDaemon) {
-    if (maxQueueSize <= 0) {
-      eventQueue = new LinkedBlockingQueue<Runnable>();
-    } else {
-      eventQueue = new ArrayBlockingQueue<Runnable>(maxQueueSize);
-    }
+  public AsyncEventListener(int poolSize, long timeOutSeconds, final boolean isDaemon) {
+    eventQueue = new LinkedBlockingQueue<>();
 
     ThreadFactory factory = new BasicThreadFactory.Builder()
       .namingPattern(getName()+"_AsyncThread-%d")
       .daemon(isDaemon)
       .priority(Thread.NORM_PRIORITY)
       .build();
-    processor = new ThreadPoolExecutor(poolSize, maxPoolSize, timeOutSeconds, TimeUnit.SECONDS, eventQueue, factory);
+    // fixed pool with min and max equal to poolSize
+    processor = new ThreadPoolExecutor(poolSize, poolSize, timeOutSeconds, TimeUnit.SECONDS, eventQueue, factory);
+    processor.allowCoreThreadTimeOut(true);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/lens/blob/aeca66ad/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java
index 63c38d9..91fddc9 100644
--- a/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java
+++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java
@@ -95,8 +95,7 @@ public class QueryEndNotifier extends AsyncEventListener<QueryEnded> {
   private final LogSegregationContext logSegregationContext;
 
   /** QueryEndNotifier core and max pool size */
-  private static final int CORE_POOL_SIZE = 2;
-  private static final int MAX_POOL_SIZE = 5;
+  private static final int CORE_POOL_SIZE = 5;
 
   /** Instantiates a new query end notifier.
    *
@@ -104,7 +103,7 @@ public class QueryEndNotifier extends AsyncEventListener<QueryEnded> {
    * @param hiveConf     the hive conf */
   public QueryEndNotifier(QueryExecutionServiceImpl queryService, HiveConf hiveConf,
     @NonNull final LogSegregationContext logSegregationContext) {
-    super(CORE_POOL_SIZE, MAX_POOL_SIZE);
+    super(CORE_POOL_SIZE);
     this.queryService = queryService;
     HiveConf conf = hiveConf;
     from = conf.get(MAIL_FROM_ADDRESS);

http://git-wip-us.apache.org/repos/asf/lens/blob/aeca66ad/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
index 90c25e4..581530f 100644
--- a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
+++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
@@ -1274,8 +1274,9 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
     };
 
     log.debug("starting estimate pool");
+
     ThreadPoolExecutor estimatePool = new ThreadPoolExecutor(minPoolSize, maxPoolSize, keepAlive, TimeUnit.MILLISECONDS,
-      new LinkedBlockingQueue<Runnable>(), threadFactory);
+      new SynchronousQueue<Runnable>(), threadFactory);
     estimatePool.allowCoreThreadTimeOut(true);
     estimatePool.prestartCoreThread();
     this.estimatePool = estimatePool;

http://git-wip-us.apache.org/repos/asf/lens/blob/aeca66ad/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java b/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java
index 9955278..c8bfa56 100644
--- a/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java
+++ b/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java
@@ -47,8 +47,7 @@ public class ResultFormatter extends AsyncEventListener<QueryExecuted> {
   QueryExecutionServiceImpl queryService;
 
   /** ResultFormatter core and max pool size */
-  private static final int CORE_POOL_SIZE = 5;
-  private static final int MAX_POOL_SIZE = 10;
+  private static final int CORE_POOL_SIZE = 10;
 
   private final LogSegregationContext logSegregationContext;
 
@@ -58,7 +57,7 @@ public class ResultFormatter extends AsyncEventListener<QueryExecuted> {
    * @param queryService the query service
    */
   public ResultFormatter(QueryExecutionServiceImpl queryService, @NonNull LogSegregationContext logSegregationContext) {
-    super(CORE_POOL_SIZE, MAX_POOL_SIZE);
+    super(CORE_POOL_SIZE);
     this.queryService = queryService;
     this.logSegregationContext = logSegregationContext;
   }

http://git-wip-us.apache.org/repos/asf/lens/blob/aeca66ad/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java
----------------------------------------------------------------------
diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java b/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java
index a2ca17f..573d388 100644
--- a/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java
+++ b/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java
@@ -540,7 +540,7 @@ public class TestEventService {
 
   private static class DummyAsncEventListener extends AsyncEventListener<QuerySuccess> {
     public DummyAsncEventListener(){
-      super(5, 10); //core pool = 5 and max Pool size =10
+      super(5); //core pool = 5
     }
     @Override
     public void process(QuerySuccess event) {

Reply via email to