HIVE-15947: Enhance Templeton service job operations reliability (Subramanyam 
Pattipaka, reviewed by Daniel Dai)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2f1d4b11
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2f1d4b11
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2f1d4b11

Branch: refs/heads/master
Commit: 2f1d4b11ec5384a6995875f7fb2e560d325d6564
Parents: 74ca2ee
Author: Daniel Dai <da...@hortonworks.com>
Authored: Thu Mar 16 10:38:57 2017 -0700
Committer: Daniel Dai <da...@hortonworks.com>
Committed: Thu Mar 16 10:38:57 2017 -0700

----------------------------------------------------------------------
 .../hive/hcatalog/templeton/AppConfig.java      |  37 ++
 .../hive/hcatalog/templeton/HiveDelegator.java  |   2 +-
 .../hive/hcatalog/templeton/JarDelegator.java   |   2 +-
 .../hive/hcatalog/templeton/JobCallable.java    | 115 ++++++
 .../hcatalog/templeton/JobRequestExecutor.java  | 341 +++++++++++++++++
 .../hcatalog/templeton/LauncherDelegator.java   | 184 ++++++++-
 .../hive/hcatalog/templeton/ListDelegator.java  | 142 ++++++-
 .../hive/hcatalog/templeton/PigDelegator.java   |   2 +-
 .../apache/hive/hcatalog/templeton/Server.java  |  82 +---
 .../hive/hcatalog/templeton/SqoopDelegator.java |   2 +-
 .../hcatalog/templeton/StatusDelegator.java     |  63 +++-
 .../hcatalog/templeton/StreamingDelegator.java  |   2 +-
 .../templeton/TooManyRequestsException.java     |  35 ++
 .../templeton/tool/TempletonControllerJob.java  |  11 +-
 .../ConcurrentJobRequestsTestBase.java          | 231 ++++++++++++
 .../templeton/MockAnswerTestHelper.java         |  56 +++
 .../templeton/TestConcurrentJobRequests.java    |  79 ++++
 .../TestConcurrentJobRequestsThreads.java       | 134 +++++++
 ...tConcurrentJobRequestsThreadsAndTimeout.java | 374 +++++++++++++++++++
 19 files changed, 1804 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
index 54d0907..0ea7d88 100644
--- 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
+++ 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
@@ -111,6 +111,43 @@ public class AppConfig extends Configuration {
   public static final String MR_AM_MEMORY_MB     = "templeton.mr.am.memory.mb";
   public static final String TEMPLETON_JOBSLIST_ORDER = 
"templeton.jobs.listorder";
 
+  /*
+   * These parameters controls the maximum number of concurrent job 
submit/status/list
+   * operations in templeton service. If more number of concurrent requests 
comes then
+   * they will be rejected with BusyException.
+   */
+  public static final String JOB_SUBMIT_MAX_THREADS = 
"templeton.parallellism.job.submit";
+  public static final String JOB_STATUS_MAX_THREADS = 
"templeton.parallellism.job.status";
+  public static final String JOB_LIST_MAX_THREADS = 
"templeton.parallellism.job.list";
+
+  /*
+   * These parameters controls the maximum time job submit/status/list 
operation is
+   * executed in templeton service. On time out, the execution is interrupted 
and
+   * TimeoutException is returned to client. On time out
+   *   For list and status operation, there is no action needed as they are 
read requests.
+   *   For submit operation, we do best effort to kill the job if its 
generated. Enabling
+   *     this parameter may have following side effects
+   *     1) There is a possibility for having active job for some time when 
the client gets
+   *        response for submit operation and a list operation from client 
could potential
+   *        show the newly created job which may eventually be killed with no 
guarantees.
+   *     2) If submit operation retried by client then there is a possibility 
of duplicate
+   *        jobs triggered.
+   *
+   * Time out configs should be configured in seconds.
+   *
+   */
+  public static final String JOB_SUBMIT_TIMEOUT   = 
"templeton.job.submit.timeout";
+  public static final String JOB_STATUS_TIMEOUT   = 
"templeton.job.status.timeout";
+  public static final String JOB_LIST_TIMEOUT   = "templeton.job.list.timeout";
+
+  /*
+   * If task execution time out is configured for submit operation then job 
may need to
+   * be killed on execution time out. These parameters controls the maximum 
number of
+   * retries and retry wait time in seconds for executing the time out task.
+   */
+  public static final String JOB_TIMEOUT_TASK_RETRY_COUNT = 
"templeton.job.timeout.task.retry.count";
+  public static final String JOB_TIMEOUT_TASK_RETRY_INTERVAL = 
"templeton.job.timeout.task.retry.interval";
+
   /**
    * see webhcat-default.xml
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
index f0296cb..1953028 100644
--- 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
+++ 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
@@ -49,7 +49,7 @@ public class HiveDelegator extends LauncherDelegator {
                String statusdir, String callback, String completedUrl, boolean 
enablelog,
                Boolean enableJobReconnect)
     throws NotAuthorizedException, BadParam, BusyException, QueueException,
-    ExecuteException, IOException, InterruptedException
+    ExecuteException, IOException, InterruptedException, 
TooManyRequestsException
   {
     runAs = user;
     List<String> args = makeArgs(execute, srcFile, defines, hiveArgs, 
otherFiles, statusdir,

http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
index 84cd5b9..1246b40 100644
--- 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
+++ 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
@@ -46,7 +46,7 @@ public class JarDelegator extends LauncherDelegator {
                boolean usesHcatalog, String completedUrl,
                boolean enablelog, Boolean enableJobReconnect, JobType jobType)
     throws NotAuthorizedException, BadParam, BusyException, QueueException,
-    ExecuteException, IOException, InterruptedException {
+    ExecuteException, IOException, InterruptedException, 
TooManyRequestsException {
     runAs = user;
     List<String> args = makeArgs(jar, mainClass,
       libjars, files, jarArgs, defines,

http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java
 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java
new file mode 100644
index 0000000..e703eff
--- /dev/null
+++ 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton;
+
+import java.util.concurrent.Callable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class JobCallable<T> implements Callable<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(JobCallable.class);
+
+  static public enum JobState {
+    STARTED,
+    FAILED,
+    COMPLETED
+  }
+
+  /*
+   * Job state of job request. Changes to the state are synchronized using
+   * setStateAndResult. This is required due to two different threads,
+   * main thread and job execute thread, tries to change state and organize
+   * clean up tasks.
+   */
+  private JobState jobState = JobState.STARTED;
+
+  /*
+   * Result of JobCallable task after successful task completion. This is
+   * expected to be set by the thread which executes JobCallable task.
+   */
+  public T returnResult = null;
+
+  /*
+   * Sets the job state to FAILED. Returns true if FAILED status is set.
+   * Otherwise, it returns false.
+   */
+  public boolean setJobStateFailed() {
+    return setStateAndResult(JobState.FAILED, null);
+  }
+
+  /*
+   * Sets the job state to COMPLETED and also sets the results value. Returns 
true
+   * if COMPLETED status is set. Otherwise, it returns false.
+   */
+  public boolean setJobStateCompleted(T result) {
+    return setStateAndResult(JobState.COMPLETED, result);
+  }
+
+  /*
+   * Sets the job state and result. Returns true if status and result are set.
+   * Otherwise, it returns false.
+   */
+  private synchronized boolean setStateAndResult(JobState jobState, T result) {
+    if (this.jobState == JobState.STARTED) {
+      this.jobState = jobState;
+      this.returnResult = result;
+      return true;
+    } else {
+      LOG.info("Failed to set job state to " + jobState + " due to job state "
+                  + this.jobState + ". Expected state is " + JobState.STARTED);
+    }
+
+    return false;
+  }
+
+  /*
+   * Executes the callable task with help of execute() call and gets the result
+   * of the task. It also sets job status as COMPLETED if state is not already
+   * set to FAILED and returns result to future.
+   */
+  public T call() throws Exception {
+
+    /*
+     * Don't catch any execution exceptions here and let the caller catch it.
+     */
+    T result = this.execute();
+
+    if (!this.setJobStateCompleted(result)) {
+     /*
+      * Failed to set job status as COMPLETED which mean the main thread would 
have
+      * exited and not waiting for the result. Call cleanup() to execute any 
cleanup.
+      */
+      cleanup();
+      return null;
+    }
+
+    return this.returnResult;
+  }
+
+  /*
+   * Abstract method to be overridden for task execution.
+   */
+  public abstract T execute() throws Exception;
+
+  /*
+   * Cleanup method called to run cleanup tasks if job state is FAILED. By 
default,
+   * no cleanup is provided.
+   */
+  public void cleanup() {}
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java
 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java
new file mode 100644
index 0000000..9ac4588
--- /dev/null
+++ 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JobRequestExecutor<T> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(JobRequestExecutor.class);
+  private static AppConfig appConf = Main.getAppConfigInstance();
+
+  /*
+   * Thread pool to execute job requests.
+   */
+  private ThreadPoolExecutor jobExecutePool = null;
+
+  /*
+   * Type of job request.
+   */
+  private JobRequestType requestType;
+
+  /*
+   * Config name used to find the number of concurrent requests.
+   */
+  private String concurrentRequestsConfigName;
+
+  /*
+   * Config name used to find the maximum time job request can be executed.
+   */
+  private String jobTimeoutConfigName;
+
+  /*
+   * Job request execution time out in seconds. If it is 0 then request
+   * will not be timed out.
+   */
+  private int requestExecutionTimeoutInSec = 0;
+
+  /*
+   * Amount of time a thread can be alive in thread pool before cleaning this 
up. Core threads
+   * will not be cleanup from thread pool.
+   */
+  private int threadKeepAliveTimeInHours = 1;
+
+  /*
+   * Maximum number of times a cancel request is sent to job request execution
+   * thread. Future.cancel may not be able to interrupt the thread if it is
+   * blocked on network calls.
+   */
+  private int maxTaskCancelRetryCount = 10;
+
+  /*
+   * Wait time in milliseconds before another cancel request is made.
+   */
+  private int maxTaskCancelRetryWaitTimeInMs = 1000;
+
+  /*
+   * A flag to indicate whether to cancel the task when exception 
TimeoutException or
+   * InterruptedException or CancellationException raised. The default is 
cancel thread.
+   */
+  private boolean enableCancelTask = true;
+
+  /*
+   * Job Request type.
+   */
+  public enum JobRequestType {
+    Submit,
+    Status,
+    List
+  }
+
+  /*
+   * Creates a job request object and sets up execution environment. Creates a 
thread pool
+   * to execute job requests.
+   *
+   * @param requestType
+   *          Job request type
+   *
+   * @param concurrentRequestsConfigName
+   *          Config name to be used to extract number of concurrent requests 
to be serviced.
+   *
+   * @param jobTimeoutConfigName
+   *          Config name to be used to extract maximum time a task can 
execute a request.
+   *
+   * @param enableCancelTask
+   *          A flag to indicate whether to cancel the task when exception 
TimeoutException
+   *          or InterruptedException or CancellationException raised.
+   *
+   */
+  public JobRequestExecutor(JobRequestType requestType, String 
concurrentRequestsConfigName,
+                            String jobTimeoutConfigName, boolean 
enableCancelTask) {
+
+    this.concurrentRequestsConfigName = concurrentRequestsConfigName;
+    this.jobTimeoutConfigName = jobTimeoutConfigName;
+    this.requestType = requestType;
+    this.enableCancelTask = enableCancelTask;
+
+    /*
+     * The default number of threads will be 0. That means thread pool is not 
used and
+     * operation is executed with the current thread.
+     */
+    int threads = !StringUtils.isEmpty(concurrentRequestsConfigName) ?
+                                appConf.getInt(concurrentRequestsConfigName, 
0) : 0;
+
+    if (threads > 0) {
+      /*
+       * Create a thread pool with no queue wait time to execute the 
operation. This will ensure
+       * that job requests are rejected if there are already maximum number of 
threads busy.
+       */
+      this.jobExecutePool = new ThreadPoolExecutor(threads, threads,
+                             threadKeepAliveTimeInHours, TimeUnit.HOURS,
+                             new SynchronousQueue<Runnable>());
+       this.jobExecutePool.allowCoreThreadTimeOut(true);
+
+      /*
+       * Get the job request time out value. If this configuration value is 
set to 0
+       * then job request will wait until it finishes.
+       */
+      if (!StringUtils.isEmpty(jobTimeoutConfigName)) {
+        this.requestExecutionTimeoutInSec = 
appConf.getInt(jobTimeoutConfigName, 0);
+      }
+
+      LOG.info("Configured " + threads + " threads for job request type " + 
this.requestType
+                 + " with time out " + this.requestExecutionTimeoutInSec + " 
s.");
+    } else {
+      /*
+       * If threads are not configured then they will be executed in current 
thread itself.
+       */
+      LOG.info("No thread pool configured for job request type " + 
this.requestType);
+    }
+  }
+
+  /*
+   * Creates a job request object and sets up execution environment. Creates a 
thread pool
+   * to execute job requests.
+   *
+   * @param requestType
+   *          Job request type
+   *
+   * @param concurrentRequestsConfigName
+   *          Config name to be used to extract number of concurrent requests 
to be serviced.
+   *
+   * @param jobTimeoutConfigName
+   *          Config name to be used to extract maximum time a task can 
execute a request.
+   *
+   */
+  public JobRequestExecutor(JobRequestType requestType, String 
concurrentRequestsConfigName,
+                            String jobTimeoutConfigName) {
+    this(requestType, concurrentRequestsConfigName, jobTimeoutConfigName, 
true);
+  }
+
+  /*
+   * Returns true of thread pool is created and can be used for executing a 
job request.
+   * Otherwise, returns false.
+   */
+  public boolean isThreadPoolEnabled() {
+    return this.jobExecutePool != null;
+  }
+
+  /*
+   * Executes job request operation. If thread pool is not created then job 
request is
+   * executed in current thread itself.
+   *
+   * @param jobExecuteCallable
+   *          Callable object to run the job request task.
+   *
+   */
+  public T execute(JobCallable<T> jobExecuteCallable) throws 
InterruptedException,
+                 TimeoutException, TooManyRequestsException, 
ExecutionException {
+    /*
+     * The callable shouldn't be null to execute. The thread pool also should 
be configured
+     * to execute requests.
+     */
+    assert (jobExecuteCallable != null);
+    assert (this.jobExecutePool != null);
+
+    String type = this.requestType.toString().toLowerCase();
+
+    String retryMessageForConcurrentRequests = "Please wait for some time 
before retrying "
+                  + "the operation. Please refer to the config " + 
concurrentRequestsConfigName
+                  + " to configure concurrent requests.";
+
+    LOG.debug("Starting new " + type + " job request with time out " + 
this.requestExecutionTimeoutInSec
+              + "seconds.");
+    Future<T> future = null;
+
+    try {
+      future = this.jobExecutePool.submit(jobExecuteCallable);
+    } catch (RejectedExecutionException rejectedException) {
+      /*
+       * Not able to find thread to execute the job request. Raise Busy 
exception and client
+       * can retry the operation.
+       */
+      String tooManyRequestsExceptionMessage = "Unable to service the " + type 
+ " job request as "
+                        + "templeton service is busy with too many " + type + 
" job requests. "
+                        + retryMessageForConcurrentRequests;
+
+      LOG.warn(tooManyRequestsExceptionMessage);
+      throw new TooManyRequestsException(tooManyRequestsExceptionMessage);
+    }
+
+    T result = null;
+
+    try {
+      result = this.requestExecutionTimeoutInSec > 0
+                ? future.get(this.requestExecutionTimeoutInSec, 
TimeUnit.SECONDS) : future.get();
+    } catch (TimeoutException e) {
+      /*
+       * See if the execution thread has just completed operation and result 
is available.
+       * If result is available then return the result. Otherwise, raise 
exception.
+       */
+      if ((result = tryGetJobResultOrSetJobStateFailed(jobExecuteCallable)) == 
null) {
+        String message = this.requestType + " job request got timed out. 
Please wait for some time "
+                       + "before retrying the operation. Please refer to the 
config "
+                       + jobTimeoutConfigName + " to configure job request 
time out.";
+        LOG.warn(message);
+
+        /*
+         * Throw TimeoutException to caller.
+         */
+        throw new TimeoutException(message);
+      }
+    } catch (InterruptedException e) {
+      /*
+       * See if the execution thread has just completed operation and result 
is available.
+       * If result is available then return the result. Otherwise, raise 
exception.
+       */
+      if ((result = tryGetJobResultOrSetJobStateFailed(jobExecuteCallable)) == 
null) {
+        String message = this.requestType + " job request got interrupted. 
Please wait for some time "
+                       + "before retrying the operation.";
+        LOG.warn(message);
+
+        /*
+         * Throw TimeoutException to caller.
+         */
+        throw new InterruptedException(message);
+      }
+    } catch (CancellationException e) {
+      /*
+       * See if the execution thread has just completed operation and result 
is available.
+       * If result is available then return the result. Otherwise, raise 
exception.
+       */
+      if ((result = tryGetJobResultOrSetJobStateFailed(jobExecuteCallable)) == 
null) {
+        String message = this.requestType + " job request got cancelled and 
thread got interrupted. "
+                       + "Please wait for some time before retrying the 
operation.";
+        LOG.warn(message);
+
+        throw new InterruptedException(message);
+      }
+    } finally {
+      /*
+       * If the thread is still active and needs to be cancelled then cancel 
it. This may
+       * happen in case task got interrupted, or timed out.
+       */
+      if (enableCancelTask) {
+        cancelExecutePoolThread(future);
+      }
+    }
+
+    LOG.debug("Completed " + type + " job request.");
+
+    return result;
+  }
+
+  /*
+   * Initiate cancel request to cancel the thread execution and interrupt the 
thread.
+   * If thread interruption is not handled by jobExecuteCallable then thread 
may continue
+   * running to completion. The cancel call may fail for some scenarios. In 
that case,
+   * retry the cancel call until it returns true or max retry count is reached.
+   *
+   * @param future
+   *          Future object which has handle to cancel the thread.
+   *
+   */
+  private void cancelExecutePoolThread(Future<T> future) {
+    int retryCount = 0;
+    while(retryCount < this.maxTaskCancelRetryCount && !future.isDone()) {
+      LOG.info("Task is still executing the job request. Cancelling it with 
retry count: "
+               + retryCount);
+      if (future.cancel(true)) {
+        /*
+         * Cancelled the job request and return to client.
+         */
+        LOG.info("Cancel job request issued successfully.");
+        return;
+      }
+
+      retryCount++;
+      try {
+        Thread.sleep(this.maxTaskCancelRetryWaitTimeInMs);
+      } catch (InterruptedException e) {
+        /*
+         * Nothing to do. Just retry.
+         */
+      }
+    }
+
+    LOG.warn("Failed to cancel the job. isCancelled: " + future.isCancelled()
+                    + " Retry count: " + retryCount);
+  }
+
+  /*
+   * Tries to get the job result if job request is completed. Otherwise it 
sets job status
+   * to FAILED such that execute thread can do necessary clean up based on 
FAILED state.
+   */
+  private T tryGetJobResultOrSetJobStateFailed(JobCallable<T> 
jobExecuteCallable) {
+    if (!jobExecuteCallable.setJobStateFailed()) {
+      LOG.info("Job is already COMPLETED. Returning the result.");
+      return jobExecuteCallable.returnResult;
+    } else {
+      LOG.info("Job status set to FAILED. Job clean up to be done by execute 
thread "
+              + "after job request is executed.");
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
index b3f44a2..1455316 100644
--- 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
+++ 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
@@ -23,6 +23,8 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,7 +34,7 @@ import org.apache.hadoop.hive.shims.HadoopShimsSecure;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hive.hcatalog.templeton.tool.JobState;
 import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob;
@@ -50,9 +52,26 @@ public class LauncherDelegator extends TempletonDelegator {
   static public enum JobType {JAR, STREAMING, PIG, HIVE, SQOOP}
   private boolean secureMeatastoreAccess = false;
   private final String HIVE_SHIMS_FILENAME_PATTERN = ".*hive-shims.*";
+  private final String JOB_SUBMIT_EXECUTE_THREAD_PREFIX = "JobSubmitExecute";
+  private final int jobTimeoutTaskRetryCount;
+  private final int jobTimeoutTaskRetryIntervalInSec;
+
+  /**
+   * Current thread used to set in execution threads.
+   */
+  private final String submitThreadId = Thread.currentThread().getName();
+
+  /**
+   * Job request executor to submit job requests.
+   */
+  private static JobRequestExecutor<EnqueueBean> jobRequest =
+                   new 
JobRequestExecutor<EnqueueBean>(JobRequestExecutor.JobRequestType.Submit,
+                   AppConfig.JOB_SUBMIT_MAX_THREADS, 
AppConfig.JOB_SUBMIT_TIMEOUT, false);
 
   public LauncherDelegator(AppConfig appConf) {
     super(appConf);
+    jobTimeoutTaskRetryCount = 
appConf.getInt(AppConfig.JOB_TIMEOUT_TASK_RETRY_COUNT, 0);
+    jobTimeoutTaskRetryIntervalInSec = 
appConf.getInt(AppConfig.JOB_TIMEOUT_TASK_RETRY_INTERVAL, 0);
   }
 
   public void registerJob(String id, String user, String callback,
@@ -70,11 +89,93 @@ public class LauncherDelegator extends TempletonDelegator {
     }
   }
 
+  /*
+   * Submit job request. If maximum concurrent job submit requests are 
configured then submit
+   * request will be executed on a thread from thread pool. If job submit 
request time out is
+   * configured then request execution thread will be interrupted if thread 
times out. Also
+   * does best efforts to identify if job is submitted and kill it quietly.
+   */
+  public EnqueueBean enqueueController(final String user, final Map<String, 
Object> userArgs,
+                     final String callback, final List<String> args)
+    throws NotAuthorizedException, BusyException, IOException, QueueException, 
TooManyRequestsException {
+
+    EnqueueBean bean = null;
+    final TempletonControllerJob controllerJob = getTempletonController();
+
+    if (jobRequest.isThreadPoolEnabled()) {
+      JobCallable<EnqueueBean> jobExecuteCallable = getJobSubmitTask(user, 
userArgs, callback,
+                                                                     args, 
controllerJob);
+      try {
+        bean = jobRequest.execute(jobExecuteCallable);
+      } catch (TimeoutException ex) {
+       /*
+        * Job request got timed out. Job kill should have started. Return to 
client with
+        * QueueException.
+        */
+        throw new QueueException(ex.getMessage());
+      } catch (InterruptedException ex) {
+       /*
+        * Job request got interrupted. Job kill should have started. Return to 
client with
+        * with QueueException.
+        */
+        throw new QueueException(ex.getMessage());
+      } catch (ExecutionException ex) {
+        /*
+         * ExecutionException is raised if job execution gets an exception. 
Return to client
+         * with the exception.
+         */
+        throw new QueueException(ex.getMessage());
+      }
+    } else {
+      LOG.info("No thread pool configured for submit job request. Executing "
+                      + "the job request in current thread.");
+
+      bean = enqueueJob(user, userArgs, callback, args, controllerJob);
+    }
+
+    return bean;
+  }
+
+  /*
+   * Job callable task for job submit operation. Overrides behavior of 
execute()
+   * to submit job. Also, overrides the behavior of cleanup() to kill the job 
in case
+   * job submission request is timed out or interrupted.
+   */
+  private JobCallable<EnqueueBean> getJobSubmitTask(final String user,
+                     final Map<String, Object> userArgs, final String callback,
+                     final List<String> args, final TempletonControllerJob 
controllerJob) {
+      return new JobCallable<EnqueueBean>() {
+        @Override
+        public EnqueueBean execute() throws NotAuthorizedException, 
BusyException, IOException,
+                                       QueueException {
+         /*
+          * Change the current thread name to include parent thread Id if it 
is executed
+          * in thread pool. Useful to extract logs specific to a job request 
and helpful
+          * to debug job issues.
+          */
+          Thread.currentThread().setName(String.format("%s-%s-%s", 
JOB_SUBMIT_EXECUTE_THREAD_PREFIX,
+                                       submitThreadId, 
Thread.currentThread().getId()));
+
+          return enqueueJob(user, userArgs, callback, args, controllerJob);
+        }
+
+        @Override
+        public void cleanup() {
+          /*
+           * Failed to set job status as COMPLETED which mean the main thread 
would have
+           * exited and not waiting for the result. Kill the submitted job.
+           */
+          LOG.info("Job kill not done by main thread. Trying to kill now.");
+          killTempletonJobWithRetry(user, controllerJob.getSubmittedId());
+        }
+      };
+  }
+
   /**
    * Enqueue the TempletonControllerJob directly calling doAs.
    */
-  public EnqueueBean enqueueController(String user, Map<String, Object> 
userArgs, String callback,
-                     List<String> args)
+  public EnqueueBean enqueueJob(String user, Map<String, Object> userArgs, 
String callback,
+                     List<String> args, TempletonControllerJob controllerJob)
     throws NotAuthorizedException, BusyException,
     IOException, QueueException {
     try {
@@ -82,7 +183,7 @@ public class LauncherDelegator extends TempletonDelegator {
 
       final long startTime = System.nanoTime();
 
-      String id = queueAsUser(ugi, args);
+      String id = queueAsUser(ugi, args, controllerJob);
 
       long elapsed = ((System.nanoTime() - startTime) / ((int) 1e6));
       LOG.debug("queued job " + id + " in " + elapsed + " ms");
@@ -99,21 +200,84 @@ public class LauncherDelegator extends TempletonDelegator {
     }
   }
 
-  private String queueAsUser(UserGroupInformation ugi, final List<String> args)
+  private String queueAsUser(UserGroupInformation ugi, final List<String> args,
+                            final TempletonControllerJob controllerJob)
     throws IOException, InterruptedException {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Launching job: " + args);
     }
     return ugi.doAs(new PrivilegedExceptionAction<String>() {
       public String run() throws Exception {
-        String[] array = new String[args.size()];
-        TempletonControllerJob ctrl = new 
TempletonControllerJob(secureMeatastoreAccess, appConf);
-        ToolRunner.run(ctrl, args.toArray(array));
-        return ctrl.getSubmittedId();
+        runTempletonControllerJob(controllerJob, args);
+        return controllerJob.getSubmittedId();
       }
     });
   }
 
+  /*
+   * Kills templeton job with multiple retries if job exists. Returns true if 
kill job
+   * attempt is success. Otherwise returns false.
+   */
+  private boolean killTempletonJobWithRetry(String user, String jobId) {
+    /*
+     * Make null safe Check if the job submission has gone through and if job 
is valid.
+     */
+    if (StringUtils.startsWith(jobId, "job_")) {
+      LOG.info("Started killing the job " + jobId);
+
+      boolean success = false;
+      int count = 0;
+      do {
+        try {
+          count++;
+          killJob(user, jobId);
+          success = true;
+          LOG.info("Kill job attempt succeeded.");
+         } catch (Exception e) {
+          LOG.info("Failed to kill the job due to exception: " + 
e.getMessage());
+          LOG.info("Waiting for " + jobTimeoutTaskRetryIntervalInSec + "s 
before retrying "
+                       + "the operation. Iteration: " + count);
+          try {
+            Thread.sleep(jobTimeoutTaskRetryIntervalInSec * 1000);
+          } catch (InterruptedException ex) {
+            LOG.info("Got interrupted while waiting for next retry.");
+          }
+        }
+      } while (!success && count < jobTimeoutTaskRetryCount);
+
+      return success;
+    } else {
+      LOG.info("Couldn't find a valid job id after job request is timed out.");
+      return false;
+    }
+  }
+
+  /*
+   * Gets new templeton controller objects.
+   */
+  protected TempletonControllerJob getTempletonController() {
+    return new TempletonControllerJob(secureMeatastoreAccess, appConf);
+  }
+
+  /*
+   * Runs the templeton controller job with 'args'. Utilizes ToolRunner to run
+   * the actual job.
+   */
+  protected int runTempletonControllerJob(TempletonControllerJob 
controllerJob, List<String> args)
+    throws IOException, InterruptedException, TimeoutException, Exception {
+    String[] array = new String[args.size()];
+    return ToolRunner.run(controllerJob, args.toArray(array));
+  }
+
+  /*
+   * Uses DeleteDelegator to kill a job and ignores all exceptions.
+   */
+  protected void killJob(String user, String jobId)
+  throws NotAuthorizedException, BadParam, IOException, InterruptedException {
+    DeleteDelegator d = new DeleteDelegator(appConf);
+    d.run(user, jobId);
+  }
+
   public List<String> makeLauncherArgs(AppConfig appConf, String statusdir,
                      String completedUrl,
                      List<String> copyFiles,
@@ -263,7 +427,7 @@ public class LauncherDelegator extends TempletonDelegator {
   }
   /**
    * This is called by subclasses when they determined that the sumbmitted job 
requires
-   * metastore access (e.g. Pig job that uses HCatalog).  This then determines 
if 
+   * metastore access (e.g. Pig job that uses HCatalog).  This then determines 
if
    * secure access is required and causes TempletonControllerJob to set up a 
delegation token.
    * @see TempletonControllerJob
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java
 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java
index a30ecd1..ed4cea9 100644
--- 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java
+++ 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java
@@ -19,9 +19,14 @@
 package org.apache.hive.hcatalog.templeton;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.JobStatus;
@@ -31,20 +36,81 @@ import org.apache.hadoop.security.UserGroupInformation;
  * List jobs owned by a user.
  */
 public class ListDelegator extends TempletonDelegator {
+  private static final Log LOG = LogFactory.getLog(ListDelegator.class);
+  private final String JOB_LIST_EXECUTE_THREAD_PREFIX = "JobListExecute";
+
+  /**
+   * Current thread id used to set in execution threads.
+   */
+  private final String listThreadId = Thread.currentThread().getName();
+
+  /*
+   * Job request executor to list job status requests.
+   */
+  private static JobRequestExecutor<List<JobItemBean>> jobRequest =
+                   new 
JobRequestExecutor<List<JobItemBean>>(JobRequestExecutor.JobRequestType.List,
+                   AppConfig.JOB_LIST_MAX_THREADS, AppConfig.JOB_LIST_TIMEOUT);
+
   public ListDelegator(AppConfig appConf) {
     super(appConf);
   }
 
-  public List<String> run(String user, boolean showall)
+  /*
+   * List status jobs request. If maximum concurrent job list requests are 
configured then
+   * list request will be executed on a thread from thread pool. If job list 
request time out
+   * is configured then request execution thread will be interrupted if thread 
times out and
+   * does no action.
+   */
+  public List<JobItemBean> run(final String user, final boolean showall, final 
String jobId,
+                               final int numRecords, final boolean showDetails)
+    throws NotAuthorizedException, BadParam, IOException, 
InterruptedException, BusyException,
+           TimeoutException, ExecutionException, TooManyRequestsException {
+
+    if (jobRequest.isThreadPoolEnabled()) {
+      return jobRequest.execute(getJobListTask(user, showall, 
jobId,numRecords, showDetails));
+    } else {
+      return listJobs(user, showall, jobId, numRecords, showDetails);
+    }
+  }
+
+  /*
+   * Job callable task for job list operation. Overrides behavior of execute() 
to list jobs.
+   * No need to override behavior of cleanup() as there is nothing to be done 
if list jobs
+   * operation is timed out or interrupted.
+   */
+  private JobCallable<List<JobItemBean>> getJobListTask(final String user, 
final boolean showall,
+                      final String jobId, final int numRecords, final boolean 
showDetails) {
+    return new JobCallable<List<JobItemBean>>() {
+      @Override
+      public List<JobItemBean> execute() throws NotAuthorizedException, 
BadParam, IOException,
+                                             InterruptedException {
+       /*
+        * Change the current thread name to include parent thread Id if it is 
executed
+        * in thread pool. Useful to extract logs specific to a job request and 
helpful
+        * to debug job issues.
+        */
+        Thread.currentThread().setName(String.format("%s-%s-%s", 
JOB_LIST_EXECUTE_THREAD_PREFIX,
+                                       listThreadId, 
Thread.currentThread().getId()));
+
+        return listJobs(user, showall, jobId, numRecords, showDetails);
+      }
+    };
+  }
+
+  /*
+   * Gets list of job ids and calls getJobStatus to get status for each job id.
+   */
+  public List<JobItemBean> listJobs(String user, boolean showall, String jobId,
+                                    int numRecords, boolean showDetails)
     throws NotAuthorizedException, BadParam, IOException, InterruptedException 
{
 
     UserGroupInformation ugi = UgiFactory.getUgi(user);
     WebHCatJTShim tracker = null;
+    ArrayList<String> ids = new ArrayList<String>();
+
     try {
       tracker = ShimLoader.getHadoopShims().getWebHCatShim(appConf, ugi);
 
-      ArrayList<String> ids = new ArrayList<String>();
-
       JobStatus[] jobs = tracker.getAllJobs();
 
       if (jobs != null) {
@@ -54,13 +120,79 @@ public class ListDelegator extends TempletonDelegator {
             ids.add(id);
         }
       }
-
-      return ids;
     } catch (IllegalStateException e) {
       throw new BadParam(e.getMessage());
     } finally {
       if (tracker != null)
         tracker.close();
     }
+
+    return getJobStatus(ids, user, showall, jobId, numRecords, showDetails);
+  }
+
+  /*
+   * Returns job status for list of input jobs as a list.
+   */
+  public List<JobItemBean> getJobStatus(ArrayList<String> jobIds, String user, 
boolean showall,
+                                       String jobId, int numRecords, boolean 
showDetails)
+                                       throws IOException, 
InterruptedException {
+
+    List<JobItemBean> detailList = new ArrayList<JobItemBean>();
+    int currRecord = 0;
+
+    // Sort the list as requested
+    boolean isAscendingOrder = true;
+    switch (appConf.getListJobsOrder()) {
+    case lexicographicaldesc:
+      Collections.sort(jobIds, Collections.reverseOrder());
+      isAscendingOrder = false;
+      break;
+    case lexicographicalasc:
+    default:
+      Collections.sort(jobIds);
+      break;
+    }
+
+    for (String job : jobIds) {
+      // If numRecords = -1, fetch all records.
+      // Hence skip all the below checks when numRecords = -1.
+      if (numRecords != -1) {
+        // If currRecord >= numRecords, we have already fetched the top 
#numRecords
+        if (currRecord >= numRecords) {
+          break;
+        }
+        else if (jobId == null || jobId.trim().length() == 0) {
+            currRecord++;
+        }
+        // If the current record needs to be returned based on the
+        // filter conditions specified by the user, increment the counter
+        else if (isAscendingOrder && job.compareTo(jobId) > 0 || 
!isAscendingOrder && job.compareTo(jobId) < 0) {
+          currRecord++;
+        }
+        // The current record should not be included in the output detailList.
+        else {
+          continue;
+        }
+      }
+      JobItemBean jobItem = new JobItemBean();
+      jobItem.id = job;
+      if (showDetails) {
+        StatusDelegator sd = new StatusDelegator(appConf);
+        try {
+          jobItem.detail = sd.run(user, job, false);
+        }
+        catch(Exception ex) {
+          /*
+           * if we could not get status for some reason, log it, and send 
empty status back with
+           * just the ID so that caller knows to even look in the log file
+           */
+          LOG.info("Failed to get status detail for jobId='" + job + "'", ex);
+          jobItem.detail = new QueueStatusBean(job, "Failed to retrieve 
status; see WebHCat logs");
+        }
+      }
+      detailList.add(jobItem);
+    }
+
+    return detailList;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
index aeb89df..663e4b6 100644
--- 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
+++ 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
@@ -51,7 +51,7 @@ public class PigDelegator extends LauncherDelegator {
                boolean usesHcatalog, String completedUrl, boolean enablelog,
                Boolean enableJobReconnect)
     throws NotAuthorizedException, BadParam, BusyException, QueueException,
-    ExecuteException, IOException, InterruptedException {
+    ExecuteException, IOException, InterruptedException, 
TooManyRequestsException {
     runAs = user;
     List<String> args = makeArgs(execute,
       srcFile, pigArgs,

http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
index 2da0204..43a7d57 100644
--- 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
+++ 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
@@ -27,6 +27,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.servlet.http.HttpServletRequest;
@@ -650,7 +652,7 @@ public class Server {
                       @FormParam("enablelog") boolean enablelog,
                       @FormParam("enablejobreconnect") Boolean 
enablejobreconnect)
     throws NotAuthorizedException, BusyException, BadParam, QueueException,
-    ExecuteException, IOException, InterruptedException {
+    ExecuteException, IOException, InterruptedException, 
TooManyRequestsException {
     verifyUser();
     verifyParam(inputs, "input");
     verifyParam(mapper, "mapper");
@@ -704,7 +706,7 @@ public class Server {
                   @FormParam("enablelog") boolean enablelog,
                   @FormParam("enablejobreconnect") Boolean enablejobreconnect)
     throws NotAuthorizedException, BusyException, BadParam, QueueException,
-    ExecuteException, IOException, InterruptedException {
+    ExecuteException, IOException, InterruptedException, 
TooManyRequestsException {
     verifyUser();
     verifyParam(jar, "jar");
     verifyParam(mainClass, "class");
@@ -754,7 +756,7 @@ public class Server {
                @FormParam("enablelog") boolean enablelog,
                @FormParam("enablejobreconnect") Boolean enablejobreconnect)
     throws NotAuthorizedException, BusyException, BadParam, QueueException,
-    ExecuteException, IOException, InterruptedException {
+    ExecuteException, IOException, InterruptedException, 
TooManyRequestsException {
     verifyUser();
     if (execute == null && srcFile == null) {
       throw new BadParam("Either execute or file parameter required");
@@ -805,7 +807,7 @@ public class Server {
               @FormParam("enablelog") boolean enablelog,
               @FormParam("enablejobreconnect") Boolean enablejobreconnect)
     throws NotAuthorizedException, BusyException, BadParam, QueueException,
-    IOException, InterruptedException {
+    IOException, InterruptedException, TooManyRequestsException {
     verifyUser();
     if (command == null && optionsFile == null)
       throw new BadParam("Must define Sqoop command or a optionsfile contains 
Sqoop command to run Sqoop job.");
@@ -859,7 +861,7 @@ public class Server {
               @FormParam("enablelog") boolean enablelog,
               @FormParam("enablejobreconnect") Boolean enablejobreconnect)
     throws NotAuthorizedException, BusyException, BadParam, QueueException,
-    ExecuteException, IOException, InterruptedException {
+    ExecuteException, IOException, InterruptedException, 
TooManyRequestsException {
     verifyUser();
     if (execute == null && srcFile == null) {
       throw new BadParam("Either execute or file parameter required");
@@ -891,7 +893,8 @@ public class Server {
   @Path("jobs/{jobid}")
   @Produces({MediaType.APPLICATION_JSON})
   public QueueStatusBean showJobId(@PathParam("jobid") String jobid)
-    throws NotAuthorizedException, BadParam, IOException, InterruptedException 
{
+    throws NotAuthorizedException, BadParam, IOException, InterruptedException,
+           BusyException, TimeoutException, ExecutionException, 
TooManyRequestsException {
 
     verifyUser();
     verifyParam(jobid, ":jobid");
@@ -968,7 +971,8 @@ public class Server {
                                        @QueryParam("showall") boolean showall,
                                        @QueryParam("jobid") String jobid,
                                        @QueryParam("numrecords") String 
numrecords)
-    throws NotAuthorizedException, BadParam, IOException, InterruptedException 
{
+    throws NotAuthorizedException, BadParam, IOException, InterruptedException,
+    BusyException, TimeoutException, ExecutionException, 
TooManyRequestsException {
 
     verifyUser();
 
@@ -980,19 +984,14 @@ public class Server {
       showDetails = true;
     }
 
-    ListDelegator ld = new ListDelegator(appConf);
-    List<String> list = ld.run(getDoAsUser(), showall);
-    List<JobItemBean> detailList = new ArrayList<JobItemBean>();
-    int currRecord = 0;
     int numRecords;
-
     // Parse numrecords to an integer
     try {
       if (numrecords != null) {
         numRecords = Integer.parseInt(numrecords);
-  if (numRecords <= 0) {
-    throw new BadParam("numrecords should be an integer > 0");
-  }
+        if (numRecords <= 0) {
+          throw new BadParam("numrecords should be an integer > 0");
+        }
       }
       else {
         numRecords = -1;
@@ -1002,57 +1001,8 @@ public class Server {
       throw new BadParam("Invalid numrecords format: numrecords should be an 
integer > 0");
     }
 
-    // Sort the list as requested
-    boolean isAscendingOrder = true;
-    switch (appConf.getListJobsOrder()) {
-    case lexicographicaldesc:
-      Collections.sort(list, Collections.reverseOrder());
-      isAscendingOrder = false;
-      break;
-    case lexicographicalasc:
-    default:
-      Collections.sort(list);
-      break;
-    }
-
-    for (String job : list) {
-      // If numRecords = -1, fetch all records.
-      // Hence skip all the below checks when numRecords = -1.
-      if (numRecords != -1) {
-        // If currRecord >= numRecords, we have already fetched the top 
#numRecords
-        if (currRecord >= numRecords) {
-          break;
-        }
-        else if (jobid == null || jobid.trim().length() == 0) {
-            currRecord++;
-        }
-        // If the current record needs to be returned based on the
-        // filter conditions specified by the user, increment the counter
-        else if (isAscendingOrder && job.compareTo(jobid) > 0 || 
!isAscendingOrder && job.compareTo(jobid) < 0) {
-          currRecord++;
-        }
-        // The current record should not be included in the output detailList.
-        else {
-          continue;
-        }
-      }
-      JobItemBean jobItem = new JobItemBean();
-      jobItem.id = job;
-      if (showDetails) {
-        StatusDelegator sd = new StatusDelegator(appConf);
-        try {
-          jobItem.detail = sd.run(getDoAsUser(), job);
-        }
-        catch(Exception ex) {
-          /*if we could not get status for some reason, log it, and send empty 
status back with
-          * just the ID so that caller knows to even look in the log file*/
-          LOG.info("Failed to get status detail for jobId='" + job + "'", ex);
-          jobItem.detail = new QueueStatusBean(job, "Failed to retrieve 
status; see WebHCat logs");
-        }
-      }
-      detailList.add(jobItem);
-    }
-    return detailList;
+    ListDelegator ld = new ListDelegator(appConf);
+    return ld.run(getDoAsUser(), showall, jobid, numRecords, showDetails);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
index fde5f60..eb84fb2 100644
--- 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
+++ 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
@@ -50,7 +50,7 @@ public class SqoopDelegator extends LauncherDelegator {
                String callback, String completedUrl, boolean enablelog,
                Boolean enableJobReconnect, String libdir)
   throws NotAuthorizedException, BadParam, BusyException, QueueException,
-  IOException, InterruptedException
+  IOException, InterruptedException, TooManyRequestsException
   {
     if(TempletonUtils.isset(appConf.sqoopArchive())) {
       if(!TempletonUtils.isset(appConf.sqoopPath()) && 
!TempletonUtils.isset(appConf.sqoopHome())) {

http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java
 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java
index fac0170..4112eef 100644
--- 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java
+++ 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java
@@ -19,6 +19,8 @@
 package org.apache.hive.hcatalog.templeton;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,12 +43,71 @@ import org.apache.hive.hcatalog.templeton.tool.JobState;
  */
 public class StatusDelegator extends TempletonDelegator {
   private static final Logger LOG = 
LoggerFactory.getLogger(StatusDelegator.class);
+  private final String JOB_STATUS_EXECUTE_THREAD_PREFIX = "JobStatusExecute";
+
+  /**
+   * Current thread id used to set in execution threads.
+   */
+  private final String statusThreadId = Thread.currentThread().getName();
+
+  /*
+   * Job status request executor to get status of a job.
+   */
+  private static JobRequestExecutor<QueueStatusBean> jobRequest =
+                   new 
JobRequestExecutor<QueueStatusBean>(JobRequestExecutor.JobRequestType.Status,
+                   AppConfig.JOB_STATUS_MAX_THREADS, 
AppConfig.JOB_STATUS_TIMEOUT);
 
   public StatusDelegator(AppConfig appConf) {
     super(appConf);
   }
 
-  public QueueStatusBean run(String user, String id)
+  /*
+   * Gets status of job form job id. If maximum concurrent job status requests 
are configured
+   * then status request will be executed on a thread from thread pool. If job 
status request
+   * time out is configured then request execution thread will be interrupted 
if thread
+   * times out and does no action.
+   */
+  public QueueStatusBean run(final String user, final String id, boolean 
enableThreadPool)
+    throws NotAuthorizedException, BadParam, IOException, InterruptedException,
+           BusyException, TimeoutException, ExecutionException, 
TooManyRequestsException {
+    if (jobRequest.isThreadPoolEnabled() && enableThreadPool) {
+      return jobRequest.execute(getJobStatusCallableTask(user, id));
+    } else {
+      return getJobStatus(user, id);
+    }
+  }
+
+  /*
+   * Job callable task for job status operation. Overrides behavior of 
execute() to get
+   * status of a job. No need to override behavior of cleanup() as there is 
nothing to be
+   * done if job sttaus operation is timed out or interrupted.
+   */
+  private JobCallable<QueueStatusBean> getJobStatusCallableTask(final String 
user,
+                                 final String id) {
+    return new JobCallable<QueueStatusBean>() {
+      @Override
+      public QueueStatusBean execute() throws NotAuthorizedException, 
BadParam, IOException,
+                                    InterruptedException, BusyException {
+       /*
+        * Change the current thread name to include parent thread Id if it is 
executed
+        * in thread pool. Useful to extract logs specific to a job request and 
helpful
+        * to debug job issues.
+        */
+        Thread.currentThread().setName(String.format("%s-%s-%s", 
JOB_STATUS_EXECUTE_THREAD_PREFIX,
+                                       statusThreadId, 
Thread.currentThread().getId()));
+
+        return getJobStatus(user, id);
+      }
+    };
+  }
+
+  public QueueStatusBean run(final String user, final String id)
+    throws NotAuthorizedException, BadParam, IOException, InterruptedException,
+           BusyException, TimeoutException, ExecutionException, 
TooManyRequestsException {
+    return run(user, id, true);
+  }
+
+  public QueueStatusBean getJobStatus(String user, String id)
     throws NotAuthorizedException, BadParam, IOException, InterruptedException
   {
     WebHCatJTShim tracker = null;

http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
index 839b56a..590e49f 100644
--- 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
+++ 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
@@ -51,7 +51,7 @@ public class StreamingDelegator extends LauncherDelegator {
                Boolean enableJobReconnect,
                JobType jobType)
     throws NotAuthorizedException, BadParam, BusyException, QueueException,
-    ExecuteException, IOException, InterruptedException {
+    ExecuteException, IOException, InterruptedException, 
TooManyRequestsException {
       List<String> args = makeArgs(inputs, inputreader, output, mapper, 
reducer, combiner,
       fileList, cmdenvs, jarArgs);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TooManyRequestsException.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TooManyRequestsException.java
 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TooManyRequestsException.java
new file mode 100644
index 0000000..9d55ad4
--- /dev/null
+++ 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TooManyRequestsException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton;
+
+/**
+ * Raise this exception if web service is busy with existing requests and not 
able
+ * service new requests.
+ */
+public class TooManyRequestsException extends SimpleWebException {
+  /*
+   * The current version of jetty server doesn't have the status
+   * HttpStatus.TOO_MANY_REQUESTS_429. Hence, passing this as constant.
+   */
+  public static int TOO_MANY_REQUESTS_429 = 429;
+
+  public TooManyRequestsException(String msg) {
+    super(TOO_MANY_REQUESTS_429, msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
index 15ab8b9..f4c4b76 100644
--- 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
+++ 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
@@ -81,9 +81,14 @@ public class TempletonControllerJob extends Configured 
implements Tool, JobSubmi
     this.appConf = conf;
   }
 
-  private JobID submittedJobId;
+  private Job job = null;
 
   public String getSubmittedId() {
+    if (job == null ) {
+      return null;
+    }
+
+    JobID submittedJobId = job.getJobID();
     if (submittedJobId == null) {
       return null;
     } else {
@@ -119,7 +124,7 @@ public class TempletonControllerJob extends Configured 
implements Tool, JobSubmi
 
     String user = UserGroupInformation.getCurrentUser().getShortUserName();
     conf.set("user.name", user);
-    Job job = new Job(conf);
+    job = new Job(conf);
     job.setJarByClass(LaunchMapper.class);
     job.setJobName(TempletonControllerJob.class.getSimpleName());
     job.setMapperClass(LaunchMapper.class);
@@ -141,7 +146,7 @@ public class TempletonControllerJob extends Configured 
implements Tool, JobSubmi
 
     job.submit();
 
-    submittedJobId = job.getJobID();
+    JobID submittedJobId = job.getJobID();
     if(metastoreTokenStrForm != null) {
       //so that it can be cancelled later from CompleteDelegator
       DelegationTokenCache.getStringFormTokenCache().storeDelegationToken(

http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/ConcurrentJobRequestsTestBase.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/ConcurrentJobRequestsTestBase.java
 
b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/ConcurrentJobRequestsTestBase.java
new file mode 100644
index 0000000..5fcae46
--- /dev/null
+++ 
b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/ConcurrentJobRequestsTestBase.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.Future;
+
+import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+
+/*
+ * Base class for mocking job operations with concurrent requests.
+ */
+public class ConcurrentJobRequestsTestBase {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ConcurrentJobRequestsTestBase.class);
+  private boolean started = false;
+  private Object lock = new Object();
+
+  MockAnswerTestHelper<QueueStatusBean> statusJobHelper = new 
MockAnswerTestHelper<QueueStatusBean>();
+  MockAnswerTestHelper<QueueStatusBean> killJobHelper = new 
MockAnswerTestHelper<QueueStatusBean>();
+  MockAnswerTestHelper<List<JobItemBean>> listJobHelper = new 
MockAnswerTestHelper<List<JobItemBean>>();
+  MockAnswerTestHelper<Integer> submitJobHelper = new 
MockAnswerTestHelper<Integer>();
+
+  /*
+   * Waits for other threads to join and returns with its Id.
+   */
+  private int waitForAllThreadsToStart(JobRunnable jobRunnable, int 
poolThreadCount) {
+    int currentId = jobRunnable.threadStartCount.incrementAndGet();
+    LOG.info("Waiting for other threads with thread id: " + currentId);
+    synchronized(lock) {
+      /*
+       * We need a total of poolThreadCount + 1 threads to start at same. 
There are
+       * poolThreadCount threads in thread pool and another one which has 
started them.
+       * The thread which sees atomic counter as poolThreadCount+1 is the last 
thread`
+       * to join and wake up all threads to start all at once.
+       */
+      if (currentId > poolThreadCount) {
+        LOG.info("Waking up all threads: " + currentId);
+        started = true;
+        this.lock.notifyAll();
+      } else {
+        while (!started) {
+          try {
+            this.lock.wait();
+          } catch (InterruptedException ignore) {
+          }
+        }
+      }
+    }
+
+    return currentId;
+  }
+
+  public JobRunnable ConcurrentJobsStatus(final int threadCount, AppConfig 
appConfig,
+         final boolean killThreads, boolean interruptThreads, final 
Answer<QueueStatusBean> answer)
+         throws IOException, InterruptedException, QueueException, 
NotAuthorizedException,
+         BadParam, BusyException {
+
+    StatusDelegator delegator = new StatusDelegator(appConfig);
+    final StatusDelegator mockDelegator = Mockito.spy(delegator);
+
+    
Mockito.doAnswer(answer).when(mockDelegator).getJobStatus(Mockito.any(String.class),
+                             Mockito.any(String.class));
+
+    JobRunnable statusJobRunnable = new JobRunnable() {
+      @Override
+      public void run() {
+        try {
+          int threadId = waitForAllThreadsToStart(this, threadCount);
+          LOG.info("Started executing Job Status operation. ThreadId : " + 
threadId);
+          mockDelegator.run("admin", "job_1000" + threadId);
+        } catch (Exception ex) {
+          exception = ex;
+        }
+      }
+    };
+
+    executeJobOperations(statusJobRunnable, threadCount, killThreads, 
interruptThreads);
+    return statusJobRunnable;
+  }
+
+  public JobRunnable ConcurrentListJobs(final int threadCount, AppConfig 
config,
+         final boolean killThreads, boolean interruptThreads, final 
Answer<List<JobItemBean>> answer)
+         throws IOException, InterruptedException, QueueException, 
NotAuthorizedException,
+         BadParam, BusyException {
+
+    ListDelegator delegator = new ListDelegator(config);
+    final ListDelegator mockDelegator = Mockito.spy(delegator);
+
+    
Mockito.doAnswer(answer).when(mockDelegator).listJobs(Mockito.any(String.class),
+                             Mockito.any(boolean.class), 
Mockito.any(String.class),
+                             Mockito.any(int.class), 
Mockito.any(boolean.class));
+
+    JobRunnable listJobRunnable = new JobRunnable() {
+      @Override
+      public void run() {
+        try {
+          int threadId = waitForAllThreadsToStart(this, threadCount);
+          LOG.info("Started executing Job List operation. ThreadId : " + 
threadId);
+          mockDelegator.run("admin", true, "", 10, true);
+        } catch (Exception ex) {
+          exception = ex;
+        }
+      }
+    };
+
+    executeJobOperations(listJobRunnable, threadCount, killThreads, 
interruptThreads);
+    return listJobRunnable;
+  }
+
+  public JobRunnable SubmitConcurrentJobs(final int threadCount, AppConfig 
config,
+         final boolean killThreads, boolean interruptThreads, final 
Answer<Integer> responseAnswer,
+         final Answer<QueueStatusBean> timeoutResponseAnswer, final String 
jobIdResponse)
+         throws IOException, InterruptedException, QueueException, 
NotAuthorizedException,
+         BusyException, TimeoutException, Exception {
+
+    LauncherDelegator delegator = new LauncherDelegator(config);
+    final LauncherDelegator mockDelegator = Mockito.spy(delegator);
+    final List<String> listArgs = new ArrayList<String>();
+
+    TempletonControllerJob mockCtrl = 
Mockito.mock(TempletonControllerJob.class);
+
+    Mockito.doReturn(jobIdResponse).when(mockCtrl).getSubmittedId();
+
+    Mockito.doReturn(mockCtrl).when(mockDelegator).getTempletonController();
+
+    
Mockito.doAnswer(responseAnswer).when(mockDelegator).runTempletonControllerJob(
+              Mockito.any(TempletonControllerJob.class), 
Mockito.any(List.class));
+
+    Mockito.doAnswer(timeoutResponseAnswer).when(mockDelegator).killJob(
+              Mockito.any(String.class), Mockito.any(String.class));
+
+    
Mockito.doNothing().when(mockDelegator).registerJob(Mockito.any(String.class),
+           Mockito.any(String.class), Mockito.any(String.class), 
Mockito.any(Map.class));
+
+    JobRunnable submitJobRunnable = new JobRunnable() {
+      @Override
+      public void run() {
+        try {
+          int threadId = waitForAllThreadsToStart(this, threadCount);
+          LOG.info("Started executing Job Submit operation. ThreadId : " + 
threadId);
+          mockDelegator.enqueueController("admin", null, "", listArgs);
+        } catch (Throwable ex) {
+          exception = ex;
+        }
+      }
+    };
+
+    executeJobOperations(submitJobRunnable, threadCount, killThreads, 
interruptThreads);
+    return submitJobRunnable;
+  }
+
+  public void executeJobOperations(JobRunnable jobRunnable, int threadCount, 
boolean killThreads,
+                                   boolean interruptThreads)
+    throws IOException, InterruptedException, QueueException, 
NotAuthorizedException {
+
+    started = false;
+
+    ExecutorService executorService = new ThreadPoolExecutor(threadCount, 
threadCount, 0L,
+        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());;
+
+    ArrayList<Future<?>> futures = new ArrayList<Future<?>>();
+    for (int i = 0; i < threadCount; i++) {
+      futures.add(executorService.submit(jobRunnable));
+    }
+
+    waitForAllThreadsToStart(jobRunnable, threadCount);
+    LOG.info("Started all threads ");
+
+    if (killThreads) {
+      executorService.shutdownNow();
+    } else {
+      if (interruptThreads){
+        for (Future<?> future : futures) {
+          LOG.info("Cancelling the thread");
+          future.cancel(true);
+        }
+      }
+
+      executorService.shutdown();
+    }
+
+    /*
+     * For both graceful or forceful shutdown, wait for tasks to terminate 
such that
+     * appropriate exceptions are raised and stored in JobRunnable.exception.
+     */
+    if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+      LOG.info("Force Shutting down the pool\n");
+      if (!killThreads) {
+        /*
+         * killThreads option has already done force shutdown. No need to do 
again.
+         */
+        executorService.shutdownNow();
+      }
+    }
+  }
+
+  public abstract class JobRunnable implements Runnable {
+    public volatile Throwable exception = null;
+    public AtomicInteger threadStartCount = new AtomicInteger(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/MockAnswerTestHelper.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/MockAnswerTestHelper.java
 
b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/MockAnswerTestHelper.java
new file mode 100644
index 0000000..9f1744e
--- /dev/null
+++ 
b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/MockAnswerTestHelper.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton;
+
+import java.io.IOException;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/*
+ * Helper class to generate mocked response.
+ */
+public class MockAnswerTestHelper<T> {
+  public Answer<T> getIOExceptionAnswer() {
+    return new Answer<T>() {
+      @Override
+      public T answer(InvocationOnMock invocation) throws Exception {
+        throw new IOException("IOException raised manually.");
+      }
+    };
+  }
+
+  public Answer<T> getOutOfMemoryErrorAnswer() {
+    return new Answer<T>() {
+      @Override
+      public T answer(InvocationOnMock invocation) throws OutOfMemoryError {
+        throw new OutOfMemoryError("OutOfMemoryError raised manually.");
+      }
+    };
+  }
+
+  public Answer<T> getDelayedResonseAnswer(final int delayInSeconds, final T 
response) {
+    return new Answer<T>() {
+      @Override
+      public T answer(InvocationOnMock invocation) throws InterruptedException 
{
+        Thread.sleep(1000 * delayInSeconds);
+        return response;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java
 
b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java
new file mode 100644
index 0000000..695dcc6
--- /dev/null
+++ 
b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton;
+
+import java.util.ArrayList;
+
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import static org.junit.Assert.assertTrue;
+
+/*
+ * Test submission of concurrent job requests.
+ */
+public class TestConcurrentJobRequests extends ConcurrentJobRequestsTestBase {
+
+  private static AppConfig config;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @BeforeClass
+  public static void setUp() {
+    final String[] args = new String[] {};
+    Main main = new Main(args);
+    config = main.getAppConfigInstance();
+  }
+
+  @Test
+  public void ConcurrentJobsStatusSuccess() {
+    try {
+      JobRunnable jobRunnable = ConcurrentJobsStatus(6, config, false, false,
+                statusJobHelper.getDelayedResonseAnswer(4, new 
QueueStatusBean("job_1000", "Job not found")));
+      assertTrue(jobRunnable.exception == null);
+    } catch (Exception e) {
+      assertTrue(false);
+    }
+  }
+
+  @Test
+  public void ConcurrentListJobsSuccess() {
+    try {
+      JobRunnable jobRunnable = ConcurrentListJobs(6, config, false, false,
+                listJobHelper.getDelayedResonseAnswer(4, new 
ArrayList<JobItemBean>()));
+      assertTrue(jobRunnable.exception == null);
+    } catch (Exception e) {
+      assertTrue(false);
+    }
+  }
+
+  @Test
+  public void ConcurrentSubmitJobsSuccess() {
+    try {
+      JobRunnable jobRunnable = SubmitConcurrentJobs(6, config, false, false,
+                submitJobHelper.getDelayedResonseAnswer(4, 0),
+                killJobHelper.getDelayedResonseAnswer(4, null), "job_1000");
+      assertTrue(jobRunnable.exception == null);
+    } catch (Exception e) {
+      assertTrue(false);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreads.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreads.java
 
b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreads.java
new file mode 100644
index 0000000..6f8da40
--- /dev/null
+++ 
b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreads.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.TimeoutException;
+import org.eclipse.jetty.http.HttpStatus;
+
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import static org.junit.Assert.assertTrue;
+
+/*
+ * Test submission of concurrent job requests with the controlled number of 
concurrent
+ * Requests. Verify that we get busy exception and appropriate message.
+ */
+public class TestConcurrentJobRequestsThreads extends 
ConcurrentJobRequestsTestBase {
+
+  private static AppConfig config;
+  private static QueueStatusBean statusBean;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @BeforeClass
+  public static void setUp() {
+    final String[] args = new String[] {};
+    Main main = new Main(args);
+    config = main.getAppConfigInstance();
+    config.setInt(AppConfig.JOB_STATUS_MAX_THREADS, 5);
+    config.setInt(AppConfig.JOB_LIST_MAX_THREADS, 5);
+    config.setInt(AppConfig.JOB_SUBMIT_MAX_THREADS, 5);
+    statusBean = new QueueStatusBean("job_1000", "Job not found");
+  }
+
+  @Test
+  public void ConcurrentJobsStatusTooManyRequestsException() {
+    try {
+      JobRunnable jobRunnable = ConcurrentJobsStatus(6, config, false, false,
+                statusJobHelper.getDelayedResonseAnswer(4, statusBean));
+      assertTrue(jobRunnable.exception != null);
+      assertTrue(jobRunnable.exception instanceof TooManyRequestsException);
+      TooManyRequestsException ex = 
(TooManyRequestsException)jobRunnable.exception;
+      assertTrue(ex.httpCode == 
TooManyRequestsException.TOO_MANY_REQUESTS_429);
+      String expectedMessage = "Unable to service the status job request as 
templeton service is busy "
+                                 + "with too many status job requests. Please 
wait for some time before "
+                                 + "retrying the operation. Please refer to 
the config "
+                                 + "templeton.parallellism.job.status to 
configure concurrent requests.";
+      assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
+
+      /*
+       * Verify that new job requests have no issues.
+       */
+      jobRunnable = ConcurrentJobsStatus(5, config, false, false,
+                statusJobHelper.getDelayedResonseAnswer(4, statusBean));
+      assertTrue(jobRunnable.exception == null);
+    } catch (Exception e) {
+      assertTrue(false);
+    }
+  }
+
+  @Test
+  public void ConcurrentListJobsTooManyRequestsException() {
+    try {
+      JobRunnable jobRunnable = ConcurrentListJobs(6, config, false, false,
+                listJobHelper.getDelayedResonseAnswer(4, new 
ArrayList<JobItemBean>()));
+      assertTrue(jobRunnable.exception != null);
+      assertTrue(jobRunnable.exception instanceof TooManyRequestsException);
+      TooManyRequestsException ex = 
(TooManyRequestsException)jobRunnable.exception;
+      assertTrue(ex.httpCode == 
TooManyRequestsException.TOO_MANY_REQUESTS_429);
+      String expectedMessage = "Unable to service the list job request as 
templeton service is busy "
+                               + "with too many list job requests. Please wait 
for some time before "
+                               + "retrying the operation. Please refer to the 
config "
+                               + "templeton.parallellism.job.list to configure 
concurrent requests.";
+      assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
+
+      /*
+       * Verify that new job requests have no issues.
+       */
+      jobRunnable = ConcurrentListJobs(5, config, false, false,
+                listJobHelper.getDelayedResonseAnswer(4, new 
ArrayList<JobItemBean>()));
+      assertTrue(jobRunnable.exception == null);
+    } catch (Exception e) {
+      assertTrue(false);
+    }
+  }
+
+  @Test
+  public void ConcurrentSubmitJobsTooManyRequestsException() {
+    try {
+      JobRunnable jobRunnable = SubmitConcurrentJobs(6, config, false, false,
+                submitJobHelper.getDelayedResonseAnswer(4, 0),
+                killJobHelper.getDelayedResonseAnswer(0, statusBean), 
"job_1000");
+      assertTrue(jobRunnable.exception != null);
+      assertTrue(jobRunnable.exception instanceof TooManyRequestsException);
+      TooManyRequestsException ex = 
(TooManyRequestsException)jobRunnable.exception;
+      assertTrue(ex.httpCode == 
TooManyRequestsException.TOO_MANY_REQUESTS_429);
+      String expectedMessage = "Unable to service the submit job request as 
templeton service is busy "
+                                + "with too many submit job requests. Please 
wait for some time before "
+                                + "retrying the operation. Please refer to the 
config "
+                                + "templeton.parallellism.job.submit to 
configure concurrent requests.";
+      assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
+
+      /*
+       * Verify that new job requests have no issues.
+       */
+      jobRunnable = SubmitConcurrentJobs(5, config, false, false,
+                submitJobHelper.getDelayedResonseAnswer(4, 0),
+                killJobHelper.getDelayedResonseAnswer(0, statusBean), 
"job_1000");
+      assertTrue(jobRunnable.exception == null);
+    } catch (Exception e) {
+      assertTrue(false);
+    }
+  }
+}

Reply via email to