HIVE-15947: Enhance Templeton service job operations reliability (Subramanyam Pattipaka, reviewed by Daniel Dai)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2f1d4b11 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2f1d4b11 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2f1d4b11 Branch: refs/heads/master Commit: 2f1d4b11ec5384a6995875f7fb2e560d325d6564 Parents: 74ca2ee Author: Daniel Dai <da...@hortonworks.com> Authored: Thu Mar 16 10:38:57 2017 -0700 Committer: Daniel Dai <da...@hortonworks.com> Committed: Thu Mar 16 10:38:57 2017 -0700 ---------------------------------------------------------------------- .../hive/hcatalog/templeton/AppConfig.java | 37 ++ .../hive/hcatalog/templeton/HiveDelegator.java | 2 +- .../hive/hcatalog/templeton/JarDelegator.java | 2 +- .../hive/hcatalog/templeton/JobCallable.java | 115 ++++++ .../hcatalog/templeton/JobRequestExecutor.java | 341 +++++++++++++++++ .../hcatalog/templeton/LauncherDelegator.java | 184 ++++++++- .../hive/hcatalog/templeton/ListDelegator.java | 142 ++++++- .../hive/hcatalog/templeton/PigDelegator.java | 2 +- .../apache/hive/hcatalog/templeton/Server.java | 82 +--- .../hive/hcatalog/templeton/SqoopDelegator.java | 2 +- .../hcatalog/templeton/StatusDelegator.java | 63 +++- .../hcatalog/templeton/StreamingDelegator.java | 2 +- .../templeton/TooManyRequestsException.java | 35 ++ .../templeton/tool/TempletonControllerJob.java | 11 +- .../ConcurrentJobRequestsTestBase.java | 231 ++++++++++++ .../templeton/MockAnswerTestHelper.java | 56 +++ .../templeton/TestConcurrentJobRequests.java | 79 ++++ .../TestConcurrentJobRequestsThreads.java | 134 +++++++ ...tConcurrentJobRequestsThreadsAndTimeout.java | 374 +++++++++++++++++++ 19 files changed, 1804 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java 
---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java index 54d0907..0ea7d88 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java @@ -111,6 +111,43 @@ public class AppConfig extends Configuration { public static final String MR_AM_MEMORY_MB = "templeton.mr.am.memory.mb"; public static final String TEMPLETON_JOBSLIST_ORDER = "templeton.jobs.listorder"; + /* + * These parameters control the maximum number of concurrent job submit/status/list + * operations in templeton service. If more concurrent requests arrive than configured, + * they will be rejected with TooManyRequestsException. + */ + public static final String JOB_SUBMIT_MAX_THREADS = "templeton.parallellism.job.submit"; + public static final String JOB_STATUS_MAX_THREADS = "templeton.parallellism.job.status"; + public static final String JOB_LIST_MAX_THREADS = "templeton.parallellism.job.list"; + + /* + * These parameters control the maximum time a job submit/status/list operation is + * executed in templeton service. On time out, the execution is interrupted and + * TimeoutException is returned to client. On time out: + * For list and status operation, there is no action needed as they are read requests. + * For submit operation, we do best effort to kill the job if it was generated. Enabling + * this parameter may have the following side effects: + * 1) There is a possibility of having an active job for some time when the client gets + * the response for submit operation, and a list operation from client could potentially + * show the newly created job which may eventually be killed with no guarantees. + * 2) If submit operation is retried by client then there is a possibility of duplicate + * jobs triggered. 
+ * + * Time out configs should be configured in seconds. + * + */ + public static final String JOB_SUBMIT_TIMEOUT = "templeton.job.submit.timeout"; + public static final String JOB_STATUS_TIMEOUT = "templeton.job.status.timeout"; + public static final String JOB_LIST_TIMEOUT = "templeton.job.list.timeout"; + + /* + * If task execution time out is configured for submit operation then job may need to + * be killed on execution time out. These parameters control the maximum number of + * retries and retry wait time in seconds for executing the time out task. + */ + public static final String JOB_TIMEOUT_TASK_RETRY_COUNT = "templeton.job.timeout.task.retry.count"; + public static final String JOB_TIMEOUT_TASK_RETRY_INTERVAL = "templeton.job.timeout.task.retry.interval"; + /** * see webhcat-default.xml */ http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java index f0296cb..1953028 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java @@ -49,7 +49,7 @@ public class HiveDelegator extends LauncherDelegator { String statusdir, String callback, String completedUrl, boolean enablelog, Boolean enableJobReconnect) throws NotAuthorizedException, BadParam, BusyException, QueueException, - ExecuteException, IOException, InterruptedException + ExecuteException, IOException, InterruptedException, TooManyRequestsException { runAs = user; List<String> args = makeArgs(execute, srcFile, defines, hiveArgs, otherFiles, statusdir, 
http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java index 84cd5b9..1246b40 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java @@ -46,7 +46,7 @@ public class JarDelegator extends LauncherDelegator { boolean usesHcatalog, String completedUrl, boolean enablelog, Boolean enableJobReconnect, JobType jobType) throws NotAuthorizedException, BadParam, BusyException, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, TooManyRequestsException { runAs = user; List<String> args = makeArgs(jar, mainClass, libjars, files, jarArgs, defines, http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java new file mode 100644 index 0000000..e703eff --- /dev/null +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobCallable.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton; + +import java.util.concurrent.Callable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class JobCallable<T> implements Callable<T> { + private static final Logger LOG = LoggerFactory.getLogger(JobCallable.class); + + static public enum JobState { + STARTED, + FAILED, + COMPLETED + } + + /* + * Job state of job request. Changes to the state are synchronized using + * setStateAndResult. This is required due to two different threads, + * main thread and job execute thread, tries to change state and organize + * clean up tasks. + */ + private JobState jobState = JobState.STARTED; + + /* + * Result of JobCallable task after successful task completion. This is + * expected to be set by the thread which executes JobCallable task. + */ + public T returnResult = null; + + /* + * Sets the job state to FAILED. Returns true if FAILED status is set. + * Otherwise, it returns false. + */ + public boolean setJobStateFailed() { + return setStateAndResult(JobState.FAILED, null); + } + + /* + * Sets the job state to COMPLETED and also sets the results value. Returns true + * if COMPLETED status is set. Otherwise, it returns false. + */ + public boolean setJobStateCompleted(T result) { + return setStateAndResult(JobState.COMPLETED, result); + } + + /* + * Sets the job state and result. 
Returns true if status and result are set. + * Otherwise, it returns false. + */ + private synchronized boolean setStateAndResult(JobState jobState, T result) { + if (this.jobState == JobState.STARTED) { + this.jobState = jobState; + this.returnResult = result; + return true; + } else { + LOG.info("Failed to set job state to " + jobState + " due to job state " + + this.jobState + ". Expected state is " + JobState.STARTED); + } + + return false; + } + + /* + * Executes the callable task with help of execute() call and gets the result + * of the task. It also sets job status as COMPLETED if state is not already + * set to FAILED and returns result to future. + */ + public T call() throws Exception { + + /* + * Don't catch any execution exceptions here and let the caller catch it. + */ + T result = this.execute(); + + if (!this.setJobStateCompleted(result)) { + /* + * Failed to set job status as COMPLETED which mean the main thread would have + * exited and not waiting for the result. Call cleanup() to execute any cleanup. + */ + cleanup(); + return null; + } + + return this.returnResult; + } + + /* + * Abstract method to be overridden for task execution. + */ + public abstract T execute() throws Exception; + + /* + * Cleanup method called to run cleanup tasks if job state is FAILED. By default, + * no cleanup is provided. 
+ */ + public void cleanup() {} +} http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java new file mode 100644 index 0000000..9ac4588 --- /dev/null +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobRequestExecutor.java @@ -0,0 +1,341 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hive.hcatalog.templeton; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.Future; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JobRequestExecutor<T> { + private static final Logger LOG = LoggerFactory.getLogger(JobRequestExecutor.class); + private static AppConfig appConf = Main.getAppConfigInstance(); + + /* + * Thread pool to execute job requests. + */ + private ThreadPoolExecutor jobExecutePool = null; + + /* + * Type of job request. + */ + private JobRequestType requestType; + + /* + * Config name used to find the number of concurrent requests. + */ + private String concurrentRequestsConfigName; + + /* + * Config name used to find the maximum time job request can be executed. + */ + private String jobTimeoutConfigName; + + /* + * Job request execution time out in seconds. If it is 0 then request + * will not be timed out. + */ + private int requestExecutionTimeoutInSec = 0; + + /* + * Amount of time a thread can be alive in thread pool before cleaning this up. Core threads + * will not be cleanup from thread pool. + */ + private int threadKeepAliveTimeInHours = 1; + + /* + * Maximum number of times a cancel request is sent to job request execution + * thread. Future.cancel may not be able to interrupt the thread if it is + * blocked on network calls. + */ + private int maxTaskCancelRetryCount = 10; + + /* + * Wait time in milliseconds before another cancel request is made. 
+ */ + private int maxTaskCancelRetryWaitTimeInMs = 1000; + + /* + * A flag to indicate whether to cancel the task when exception TimeoutException or + * InterruptedException or CancellationException raised. The default is cancel thread. + */ + private boolean enableCancelTask = true; + + /* + * Job Request type. + */ + public enum JobRequestType { + Submit, + Status, + List + } + + /* + * Creates a job request object and sets up execution environment. Creates a thread pool + * to execute job requests. + * + * @param requestType + * Job request type + * + * @param concurrentRequestsConfigName + * Config name to be used to extract number of concurrent requests to be serviced. + * + * @param jobTimeoutConfigName + * Config name to be used to extract maximum time a task can execute a request. + * + * @param enableCancelTask + * A flag to indicate whether to cancel the task when exception TimeoutException + * or InterruptedException or CancellationException raised. + * + */ + public JobRequestExecutor(JobRequestType requestType, String concurrentRequestsConfigName, + String jobTimeoutConfigName, boolean enableCancelTask) { + + this.concurrentRequestsConfigName = concurrentRequestsConfigName; + this.jobTimeoutConfigName = jobTimeoutConfigName; + this.requestType = requestType; + this.enableCancelTask = enableCancelTask; + + /* + * The default number of threads will be 0. That means thread pool is not used and + * operation is executed with the current thread. + */ + int threads = !StringUtils.isEmpty(concurrentRequestsConfigName) ? + appConf.getInt(concurrentRequestsConfigName, 0) : 0; + + if (threads > 0) { + /* + * Create a thread pool with no queue wait time to execute the operation. This will ensure + * that job requests are rejected if there are already maximum number of threads busy. 
+ */ + this.jobExecutePool = new ThreadPoolExecutor(threads, threads, + threadKeepAliveTimeInHours, TimeUnit.HOURS, + new SynchronousQueue<Runnable>()); + this.jobExecutePool.allowCoreThreadTimeOut(true); + + /* + * Get the job request time out value. If this configuration value is set to 0 + * then job request will wait until it finishes. + */ + if (!StringUtils.isEmpty(jobTimeoutConfigName)) { + this.requestExecutionTimeoutInSec = appConf.getInt(jobTimeoutConfigName, 0); + } + + LOG.info("Configured " + threads + " threads for job request type " + this.requestType + + " with time out " + this.requestExecutionTimeoutInSec + " s."); + } else { + /* + * If threads are not configured then they will be executed in current thread itself. + */ + LOG.info("No thread pool configured for job request type " + this.requestType); + } + } + + /* + * Creates a job request object and sets up execution environment. Creates a thread pool + * to execute job requests. + * + * @param requestType + * Job request type + * + * @param concurrentRequestsConfigName + * Config name to be used to extract number of concurrent requests to be serviced. + * + * @param jobTimeoutConfigName + * Config name to be used to extract maximum time a task can execute a request. + * + */ + public JobRequestExecutor(JobRequestType requestType, String concurrentRequestsConfigName, + String jobTimeoutConfigName) { + this(requestType, concurrentRequestsConfigName, jobTimeoutConfigName, true); + } + + /* + * Returns true of thread pool is created and can be used for executing a job request. + * Otherwise, returns false. + */ + public boolean isThreadPoolEnabled() { + return this.jobExecutePool != null; + } + + /* + * Executes job request operation. If thread pool is not created then job request is + * executed in current thread itself. + * + * @param jobExecuteCallable + * Callable object to run the job request task. 
+ * + */ + public T execute(JobCallable<T> jobExecuteCallable) throws InterruptedException, + TimeoutException, TooManyRequestsException, ExecutionException { + /* + * The callable shouldn't be null to execute. The thread pool also should be configured + * to execute requests. + */ + assert (jobExecuteCallable != null); + assert (this.jobExecutePool != null); + + String type = this.requestType.toString().toLowerCase(); + + String retryMessageForConcurrentRequests = "Please wait for some time before retrying " + + "the operation. Please refer to the config " + concurrentRequestsConfigName + + " to configure concurrent requests."; + + LOG.debug("Starting new " + type + " job request with time out " + this.requestExecutionTimeoutInSec + + "seconds."); + Future<T> future = null; + + try { + future = this.jobExecutePool.submit(jobExecuteCallable); + } catch (RejectedExecutionException rejectedException) { + /* + * Not able to find thread to execute the job request. Raise Busy exception and client + * can retry the operation. + */ + String tooManyRequestsExceptionMessage = "Unable to service the " + type + " job request as " + + "templeton service is busy with too many " + type + " job requests. " + + retryMessageForConcurrentRequests; + + LOG.warn(tooManyRequestsExceptionMessage); + throw new TooManyRequestsException(tooManyRequestsExceptionMessage); + } + + T result = null; + + try { + result = this.requestExecutionTimeoutInSec > 0 + ? future.get(this.requestExecutionTimeoutInSec, TimeUnit.SECONDS) : future.get(); + } catch (TimeoutException e) { + /* + * See if the execution thread has just completed operation and result is available. + * If result is available then return the result. Otherwise, raise exception. + */ + if ((result = tryGetJobResultOrSetJobStateFailed(jobExecuteCallable)) == null) { + String message = this.requestType + " job request got timed out. Please wait for some time " + + "before retrying the operation. 
Please refer to the config " + + jobTimeoutConfigName + " to configure job request time out."; + LOG.warn(message); + + /* + * Throw TimeoutException to caller. + */ + throw new TimeoutException(message); + } + } catch (InterruptedException e) { + /* + * See if the execution thread has just completed operation and result is available. + * If result is available then return the result. Otherwise, raise exception. + */ + if ((result = tryGetJobResultOrSetJobStateFailed(jobExecuteCallable)) == null) { + String message = this.requestType + " job request got interrupted. Please wait for some time " + + "before retrying the operation."; + LOG.warn(message); + + /* + * Throw InterruptedException to caller. + */ + throw new InterruptedException(message); + } + } catch (CancellationException e) { + /* + * See if the execution thread has just completed operation and result is available. + * If result is available then return the result. Otherwise, raise exception. + */ + if ((result = tryGetJobResultOrSetJobStateFailed(jobExecuteCallable)) == null) { + String message = this.requestType + " job request got cancelled and thread got interrupted. " + + "Please wait for some time before retrying the operation."; + LOG.warn(message); + + throw new InterruptedException(message); + } + } finally { + /* + * If the thread is still active and needs to be cancelled then cancel it. This may + * happen in case task got interrupted, or timed out. + */ + if (enableCancelTask) { + cancelExecutePoolThread(future); + } + } + + LOG.debug("Completed " + type + " job request."); + + return result; + } + + /* + * Initiate cancel request to cancel the thread execution and interrupt the thread. + * If thread interruption is not handled by jobExecuteCallable then thread may continue + * running to completion. The cancel call may fail for some scenarios. In that case, + * retry the cancel call until it returns true or max retry count is reached. 
+ * + * @param future + * Future object which has handle to cancel the thread. + * + */ + private void cancelExecutePoolThread(Future<T> future) { + int retryCount = 0; + while(retryCount < this.maxTaskCancelRetryCount && !future.isDone()) { + LOG.info("Task is still executing the job request. Cancelling it with retry count: " + + retryCount); + if (future.cancel(true)) { + /* + * Cancelled the job request and return to client. + */ + LOG.info("Cancel job request issued successfully."); + return; + } + + retryCount++; + try { + Thread.sleep(this.maxTaskCancelRetryWaitTimeInMs); + } catch (InterruptedException e) { + /* + * Nothing to do. Just retry. + */ + } + } + + LOG.warn("Failed to cancel the job. isCancelled: " + future.isCancelled() + + " Retry count: " + retryCount); + } + + /* + * Tries to get the job result if job request is completed. Otherwise it sets job status + * to FAILED such that execute thread can do necessary clean up based on FAILED state. + */ + private T tryGetJobResultOrSetJobStateFailed(JobCallable<T> jobExecuteCallable) { + if (!jobExecuteCallable.setJobStateFailed()) { + LOG.info("Job is already COMPLETED. Returning the result."); + return jobExecuteCallable.returnResult; + } else { + LOG.info("Job status set to FAILED. 
Job clean up to be done by execute thread " + + "after job request is executed."); + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java index b3f44a2..1455316 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java @@ -23,6 +23,8 @@ import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +34,7 @@ import org.apache.hadoop.hive.shims.HadoopShimsSecure; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.util.ToolRunner; import org.apache.hive.hcatalog.templeton.tool.JobState; import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob; @@ -50,9 +52,26 @@ public class LauncherDelegator extends TempletonDelegator { static public enum JobType {JAR, STREAMING, PIG, HIVE, SQOOP} private boolean secureMeatastoreAccess = false; private final String HIVE_SHIMS_FILENAME_PATTERN = ".*hive-shims.*"; + private final String JOB_SUBMIT_EXECUTE_THREAD_PREFIX = "JobSubmitExecute"; + private final int jobTimeoutTaskRetryCount; + private final int jobTimeoutTaskRetryIntervalInSec; + + /** + * 
Current thread used to set in execution threads. + */ + private final String submitThreadId = Thread.currentThread().getName(); + + /** + * Job request executor to submit job requests. + */ + private static JobRequestExecutor<EnqueueBean> jobRequest = + new JobRequestExecutor<EnqueueBean>(JobRequestExecutor.JobRequestType.Submit, + AppConfig.JOB_SUBMIT_MAX_THREADS, AppConfig.JOB_SUBMIT_TIMEOUT, false); public LauncherDelegator(AppConfig appConf) { super(appConf); + jobTimeoutTaskRetryCount = appConf.getInt(AppConfig.JOB_TIMEOUT_TASK_RETRY_COUNT, 0); + jobTimeoutTaskRetryIntervalInSec = appConf.getInt(AppConfig.JOB_TIMEOUT_TASK_RETRY_INTERVAL, 0); } public void registerJob(String id, String user, String callback, @@ -70,11 +89,93 @@ public class LauncherDelegator extends TempletonDelegator { } } + /* + * Submit job request. If maximum concurrent job submit requests are configured then submit + * request will be executed on a thread from thread pool. If job submit request time out is + * configured then request execution thread will be interrupted if thread times out. Also + * does best efforts to identify if job is submitted and kill it quietly. + */ + public EnqueueBean enqueueController(final String user, final Map<String, Object> userArgs, + final String callback, final List<String> args) + throws NotAuthorizedException, BusyException, IOException, QueueException, TooManyRequestsException { + + EnqueueBean bean = null; + final TempletonControllerJob controllerJob = getTempletonController(); + + if (jobRequest.isThreadPoolEnabled()) { + JobCallable<EnqueueBean> jobExecuteCallable = getJobSubmitTask(user, userArgs, callback, + args, controllerJob); + try { + bean = jobRequest.execute(jobExecuteCallable); + } catch (TimeoutException ex) { + /* + * Job request got timed out. Job kill should have started. Return to client with + * QueueException. 
+ */ + throw new QueueException(ex.getMessage()); + } catch (InterruptedException ex) { + /* + * Job request got interrupted. Job kill should have started. Return to client with + * with QueueException. + */ + throw new QueueException(ex.getMessage()); + } catch (ExecutionException ex) { + /* + * ExecutionException is raised if job execution gets an exception. Return to client + * with the exception. + */ + throw new QueueException(ex.getMessage()); + } + } else { + LOG.info("No thread pool configured for submit job request. Executing " + + "the job request in current thread."); + + bean = enqueueJob(user, userArgs, callback, args, controllerJob); + } + + return bean; + } + + /* + * Job callable task for job submit operation. Overrides behavior of execute() + * to submit job. Also, overrides the behavior of cleanup() to kill the job in case + * job submission request is timed out or interrupted. + */ + private JobCallable<EnqueueBean> getJobSubmitTask(final String user, + final Map<String, Object> userArgs, final String callback, + final List<String> args, final TempletonControllerJob controllerJob) { + return new JobCallable<EnqueueBean>() { + @Override + public EnqueueBean execute() throws NotAuthorizedException, BusyException, IOException, + QueueException { + /* + * Change the current thread name to include parent thread Id if it is executed + * in thread pool. Useful to extract logs specific to a job request and helpful + * to debug job issues. + */ + Thread.currentThread().setName(String.format("%s-%s-%s", JOB_SUBMIT_EXECUTE_THREAD_PREFIX, + submitThreadId, Thread.currentThread().getId())); + + return enqueueJob(user, userArgs, callback, args, controllerJob); + } + + @Override + public void cleanup() { + /* + * Failed to set job status as COMPLETED which mean the main thread would have + * exited and not waiting for the result. Kill the submitted job. + */ + LOG.info("Job kill not done by main thread. 
Trying to kill now."); + killTempletonJobWithRetry(user, controllerJob.getSubmittedId()); + } + }; + } + /** * Enqueue the TempletonControllerJob directly calling doAs. */ - public EnqueueBean enqueueController(String user, Map<String, Object> userArgs, String callback, - List<String> args) + public EnqueueBean enqueueJob(String user, Map<String, Object> userArgs, String callback, + List<String> args, TempletonControllerJob controllerJob) throws NotAuthorizedException, BusyException, IOException, QueueException { try { @@ -82,7 +183,7 @@ public class LauncherDelegator extends TempletonDelegator { final long startTime = System.nanoTime(); - String id = queueAsUser(ugi, args); + String id = queueAsUser(ugi, args, controllerJob); long elapsed = ((System.nanoTime() - startTime) / ((int) 1e6)); LOG.debug("queued job " + id + " in " + elapsed + " ms"); @@ -99,21 +200,84 @@ public class LauncherDelegator extends TempletonDelegator { } } - private String queueAsUser(UserGroupInformation ugi, final List<String> args) + private String queueAsUser(UserGroupInformation ugi, final List<String> args, + final TempletonControllerJob controllerJob) throws IOException, InterruptedException { if(LOG.isDebugEnabled()) { LOG.debug("Launching job: " + args); } return ugi.doAs(new PrivilegedExceptionAction<String>() { public String run() throws Exception { - String[] array = new String[args.size()]; - TempletonControllerJob ctrl = new TempletonControllerJob(secureMeatastoreAccess, appConf); - ToolRunner.run(ctrl, args.toArray(array)); - return ctrl.getSubmittedId(); + runTempletonControllerJob(controllerJob, args); + return controllerJob.getSubmittedId(); } }); } + /* + * Kills templeton job with multiple retries if job exists. Returns true if kill job + * attempt is success. Otherwise returns false. + */ + private boolean killTempletonJobWithRetry(String user, String jobId) { + /* + * Make null safe Check if the job submission has gone through and if job is valid. 
+ */ + if (StringUtils.startsWith(jobId, "job_")) { + LOG.info("Started killing the job " + jobId); + + boolean success = false; + int count = 0; + do { + try { + count++; + killJob(user, jobId); + success = true; + LOG.info("Kill job attempt succeeded."); + } catch (Exception e) { + LOG.info("Failed to kill the job due to exception: " + e.getMessage()); + LOG.info("Waiting for " + jobTimeoutTaskRetryIntervalInSec + "s before retrying " + + "the operation. Iteration: " + count); + try { + Thread.sleep(jobTimeoutTaskRetryIntervalInSec * 1000); + } catch (InterruptedException ex) { + LOG.info("Got interrupted while waiting for next retry."); + } + } + } while (!success && count < jobTimeoutTaskRetryCount); + + return success; + } else { + LOG.info("Couldn't find a valid job id after job request is timed out."); + return false; + } + } + + /* + * Creates a new templeton controller job object. + */ + protected TempletonControllerJob getTempletonController() { + return new TempletonControllerJob(secureMeatastoreAccess, appConf); + } + + /* + * Runs the templeton controller job with 'args'. Utilizes ToolRunner to run + * the actual job. + */ + protected int runTempletonControllerJob(TempletonControllerJob controllerJob, List<String> args) + throws IOException, InterruptedException, TimeoutException, Exception { + String[] array = new String[args.size()]; + return ToolRunner.run(controllerJob, args.toArray(array)); + } + + /* + * Uses DeleteDelegator to kill a job; any exception is propagated to the caller. 
+ */ + protected void killJob(String user, String jobId) + throws NotAuthorizedException, BadParam, IOException, InterruptedException { + DeleteDelegator d = new DeleteDelegator(appConf); + d.run(user, jobId); + } + public List<String> makeLauncherArgs(AppConfig appConf, String statusdir, String completedUrl, List<String> copyFiles, @@ -263,7 +427,7 @@ public class LauncherDelegator extends TempletonDelegator { } /** * This is called by subclasses when they determined that the sumbmitted job requires - * metastore access (e.g. Pig job that uses HCatalog). This then determines if + * metastore access (e.g. Pig job that uses HCatalog). This then determines if * secure access is required and causes TempletonControllerJob to set up a delegation token. * @see TempletonControllerJob */ http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java index a30ecd1..ed4cea9 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ListDelegator.java @@ -19,9 +19,14 @@ package org.apache.hive.hcatalog.templeton; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.ArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.JobStatus; @@ -31,20 +36,81 @@ import 
org.apache.hadoop.security.UserGroupInformation; * List jobs owned by a user. */ public class ListDelegator extends TempletonDelegator { + private static final Log LOG = LogFactory.getLog(ListDelegator.class); + private final String JOB_LIST_EXECUTE_THREAD_PREFIX = "JobListExecute"; + + /** + * Current thread id used to set in execution threads. + */ + private final String listThreadId = Thread.currentThread().getName(); + + /* + * Job request executor to list job status requests. + */ + private static JobRequestExecutor<List<JobItemBean>> jobRequest = + new JobRequestExecutor<List<JobItemBean>>(JobRequestExecutor.JobRequestType.List, + AppConfig.JOB_LIST_MAX_THREADS, AppConfig.JOB_LIST_TIMEOUT); + public ListDelegator(AppConfig appConf) { super(appConf); } - public List<String> run(String user, boolean showall) + /* + * List status jobs request. If maximum concurrent job list requests are configured then + * list request will be executed on a thread from thread pool. If job list request time out + * is configured then request execution thread will be interrupted if thread times out and + * does no action. + */ + public List<JobItemBean> run(final String user, final boolean showall, final String jobId, + final int numRecords, final boolean showDetails) + throws NotAuthorizedException, BadParam, IOException, InterruptedException, BusyException, + TimeoutException, ExecutionException, TooManyRequestsException { + + if (jobRequest.isThreadPoolEnabled()) { + return jobRequest.execute(getJobListTask(user, showall, jobId,numRecords, showDetails)); + } else { + return listJobs(user, showall, jobId, numRecords, showDetails); + } + } + + /* + * Job callable task for job list operation. Overrides behavior of execute() to list jobs. + * No need to override behavior of cleanup() as there is nothing to be done if list jobs + * operation is timed out or interrupted. 
+ */ + private JobCallable<List<JobItemBean>> getJobListTask(final String user, final boolean showall, + final String jobId, final int numRecords, final boolean showDetails) { + return new JobCallable<List<JobItemBean>>() { + @Override + public List<JobItemBean> execute() throws NotAuthorizedException, BadParam, IOException, + InterruptedException { + /* + * Change the current thread name to include parent thread Id if it is executed + * in thread pool. Useful to extract logs specific to a job request and helpful + * to debug job issues. + */ + Thread.currentThread().setName(String.format("%s-%s-%s", JOB_LIST_EXECUTE_THREAD_PREFIX, + listThreadId, Thread.currentThread().getId())); + + return listJobs(user, showall, jobId, numRecords, showDetails); + } + }; + } + + /* + * Gets list of job ids and calls getJobStatus to get status for each job id. + */ + public List<JobItemBean> listJobs(String user, boolean showall, String jobId, + int numRecords, boolean showDetails) throws NotAuthorizedException, BadParam, IOException, InterruptedException { UserGroupInformation ugi = UgiFactory.getUgi(user); WebHCatJTShim tracker = null; + ArrayList<String> ids = new ArrayList<String>(); + try { tracker = ShimLoader.getHadoopShims().getWebHCatShim(appConf, ugi); - ArrayList<String> ids = new ArrayList<String>(); - JobStatus[] jobs = tracker.getAllJobs(); if (jobs != null) { @@ -54,13 +120,79 @@ public class ListDelegator extends TempletonDelegator { ids.add(id); } } - - return ids; } catch (IllegalStateException e) { throw new BadParam(e.getMessage()); } finally { if (tracker != null) tracker.close(); } + + return getJobStatus(ids, user, showall, jobId, numRecords, showDetails); + } + + /* + * Returns job status for list of input jobs as a list. 
+ */ + public List<JobItemBean> getJobStatus(ArrayList<String> jobIds, String user, boolean showall, + String jobId, int numRecords, boolean showDetails) + throws IOException, InterruptedException { + + List<JobItemBean> detailList = new ArrayList<JobItemBean>(); + int currRecord = 0; + + // Sort the list as requested + boolean isAscendingOrder = true; + switch (appConf.getListJobsOrder()) { + case lexicographicaldesc: + Collections.sort(jobIds, Collections.reverseOrder()); + isAscendingOrder = false; + break; + case lexicographicalasc: + default: + Collections.sort(jobIds); + break; + } + + for (String job : jobIds) { + // If numRecords = -1, fetch all records. + // Hence skip all the below checks when numRecords = -1. + if (numRecords != -1) { + // If currRecord >= numRecords, we have already fetched the top #numRecords + if (currRecord >= numRecords) { + break; + } + else if (jobId == null || jobId.trim().length() == 0) { + currRecord++; + } + // If the current record needs to be returned based on the + // filter conditions specified by the user, increment the counter + else if (isAscendingOrder && job.compareTo(jobId) > 0 || !isAscendingOrder && job.compareTo(jobId) < 0) { + currRecord++; + } + // The current record should not be included in the output detailList. 
+ else { + continue; + } + } + JobItemBean jobItem = new JobItemBean(); + jobItem.id = job; + if (showDetails) { + StatusDelegator sd = new StatusDelegator(appConf); + try { + jobItem.detail = sd.run(user, job, false); + } + catch(Exception ex) { + /* + * if we could not get status for some reason, log it, and send empty status back with + * just the ID so that caller knows to even look in the log file + */ + LOG.info("Failed to get status detail for jobId='" + job + "'", ex); + jobItem.detail = new QueueStatusBean(job, "Failed to retrieve status; see WebHCat logs"); + } + } + detailList.add(jobItem); + } + + return detailList; } } http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java index aeb89df..663e4b6 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java @@ -51,7 +51,7 @@ public class PigDelegator extends LauncherDelegator { boolean usesHcatalog, String completedUrl, boolean enablelog, Boolean enableJobReconnect) throws NotAuthorizedException, BadParam, BusyException, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, TooManyRequestsException { runAs = user; List<String> args = makeArgs(execute, srcFile, pigArgs, http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java ---------------------------------------------------------------------- diff --git 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java index 2da0204..43a7d57 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java @@ -27,6 +27,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.servlet.http.HttpServletRequest; @@ -650,7 +652,7 @@ public class Server { @FormParam("enablelog") boolean enablelog, @FormParam("enablejobreconnect") Boolean enablejobreconnect) throws NotAuthorizedException, BusyException, BadParam, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, TooManyRequestsException { verifyUser(); verifyParam(inputs, "input"); verifyParam(mapper, "mapper"); @@ -704,7 +706,7 @@ public class Server { @FormParam("enablelog") boolean enablelog, @FormParam("enablejobreconnect") Boolean enablejobreconnect) throws NotAuthorizedException, BusyException, BadParam, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, TooManyRequestsException { verifyUser(); verifyParam(jar, "jar"); verifyParam(mainClass, "class"); @@ -754,7 +756,7 @@ public class Server { @FormParam("enablelog") boolean enablelog, @FormParam("enablejobreconnect") Boolean enablejobreconnect) throws NotAuthorizedException, BusyException, BadParam, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, TooManyRequestsException { verifyUser(); if (execute == null && srcFile == null) { throw new BadParam("Either 
execute or file parameter required"); @@ -805,7 +807,7 @@ public class Server { @FormParam("enablelog") boolean enablelog, @FormParam("enablejobreconnect") Boolean enablejobreconnect) throws NotAuthorizedException, BusyException, BadParam, QueueException, - IOException, InterruptedException { + IOException, InterruptedException, TooManyRequestsException { verifyUser(); if (command == null && optionsFile == null) throw new BadParam("Must define Sqoop command or a optionsfile contains Sqoop command to run Sqoop job."); @@ -859,7 +861,7 @@ public class Server { @FormParam("enablelog") boolean enablelog, @FormParam("enablejobreconnect") Boolean enablejobreconnect) throws NotAuthorizedException, BusyException, BadParam, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, TooManyRequestsException { verifyUser(); if (execute == null && srcFile == null) { throw new BadParam("Either execute or file parameter required"); @@ -891,7 +893,8 @@ public class Server { @Path("jobs/{jobid}") @Produces({MediaType.APPLICATION_JSON}) public QueueStatusBean showJobId(@PathParam("jobid") String jobid) - throws NotAuthorizedException, BadParam, IOException, InterruptedException { + throws NotAuthorizedException, BadParam, IOException, InterruptedException, + BusyException, TimeoutException, ExecutionException, TooManyRequestsException { verifyUser(); verifyParam(jobid, ":jobid"); @@ -968,7 +971,8 @@ public class Server { @QueryParam("showall") boolean showall, @QueryParam("jobid") String jobid, @QueryParam("numrecords") String numrecords) - throws NotAuthorizedException, BadParam, IOException, InterruptedException { + throws NotAuthorizedException, BadParam, IOException, InterruptedException, + BusyException, TimeoutException, ExecutionException, TooManyRequestsException { verifyUser(); @@ -980,19 +984,14 @@ public class Server { showDetails = true; } - ListDelegator ld = new ListDelegator(appConf); - List<String> 
list = ld.run(getDoAsUser(), showall); - List<JobItemBean> detailList = new ArrayList<JobItemBean>(); - int currRecord = 0; int numRecords; - // Parse numrecords to an integer try { if (numrecords != null) { numRecords = Integer.parseInt(numrecords); - if (numRecords <= 0) { - throw new BadParam("numrecords should be an integer > 0"); - } + if (numRecords <= 0) { + throw new BadParam("numrecords should be an integer > 0"); + } } else { numRecords = -1; @@ -1002,57 +1001,8 @@ public class Server { throw new BadParam("Invalid numrecords format: numrecords should be an integer > 0"); } - // Sort the list as requested - boolean isAscendingOrder = true; - switch (appConf.getListJobsOrder()) { - case lexicographicaldesc: - Collections.sort(list, Collections.reverseOrder()); - isAscendingOrder = false; - break; - case lexicographicalasc: - default: - Collections.sort(list); - break; - } - - for (String job : list) { - // If numRecords = -1, fetch all records. - // Hence skip all the below checks when numRecords = -1. - if (numRecords != -1) { - // If currRecord >= numRecords, we have already fetched the top #numRecords - if (currRecord >= numRecords) { - break; - } - else if (jobid == null || jobid.trim().length() == 0) { - currRecord++; - } - // If the current record needs to be returned based on the - // filter conditions specified by the user, increment the counter - else if (isAscendingOrder && job.compareTo(jobid) > 0 || !isAscendingOrder && job.compareTo(jobid) < 0) { - currRecord++; - } - // The current record should not be included in the output detailList. 
- else { - continue; - } - } - JobItemBean jobItem = new JobItemBean(); - jobItem.id = job; - if (showDetails) { - StatusDelegator sd = new StatusDelegator(appConf); - try { - jobItem.detail = sd.run(getDoAsUser(), job); - } - catch(Exception ex) { - /*if we could not get status for some reason, log it, and send empty status back with - * just the ID so that caller knows to even look in the log file*/ - LOG.info("Failed to get status detail for jobId='" + job + "'", ex); - jobItem.detail = new QueueStatusBean(job, "Failed to retrieve status; see WebHCat logs"); - } - } - detailList.add(jobItem); - } - return detailList; + ListDelegator ld = new ListDelegator(appConf); + return ld.run(getDoAsUser(), showall, jobid, numRecords, showDetails); } /** http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java index fde5f60..eb84fb2 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java @@ -50,7 +50,7 @@ public class SqoopDelegator extends LauncherDelegator { String callback, String completedUrl, boolean enablelog, Boolean enableJobReconnect, String libdir) throws NotAuthorizedException, BadParam, BusyException, QueueException, - IOException, InterruptedException + IOException, InterruptedException, TooManyRequestsException { if(TempletonUtils.isset(appConf.sqoopArchive())) { if(!TempletonUtils.isset(appConf.sqoopPath()) && !TempletonUtils.isset(appConf.sqoopHome())) { 
http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java index fac0170..4112eef 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java @@ -19,6 +19,8 @@ package org.apache.hive.hcatalog.templeton; import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,12 +43,71 @@ import org.apache.hive.hcatalog.templeton.tool.JobState; */ public class StatusDelegator extends TempletonDelegator { private static final Logger LOG = LoggerFactory.getLogger(StatusDelegator.class); + private final String JOB_STATUS_EXECUTE_THREAD_PREFIX = "JobStatusExecute"; + + /** + * Current thread id used to set in execution threads. + */ + private final String statusThreadId = Thread.currentThread().getName(); + + /* + * Job status request executor to get status of a job. + */ + private static JobRequestExecutor<QueueStatusBean> jobRequest = + new JobRequestExecutor<QueueStatusBean>(JobRequestExecutor.JobRequestType.Status, + AppConfig.JOB_STATUS_MAX_THREADS, AppConfig.JOB_STATUS_TIMEOUT); public StatusDelegator(AppConfig appConf) { super(appConf); } - public QueueStatusBean run(String user, String id) + /* + * Gets status of job form job id. If maximum concurrent job status requests are configured + * then status request will be executed on a thread from thread pool. 
If job status request + * time out is configured then request execution thread will be interrupted if thread + * times out and does no action. + */ + public QueueStatusBean run(final String user, final String id, boolean enableThreadPool) + throws NotAuthorizedException, BadParam, IOException, InterruptedException, + BusyException, TimeoutException, ExecutionException, TooManyRequestsException { + if (jobRequest.isThreadPoolEnabled() && enableThreadPool) { + return jobRequest.execute(getJobStatusCallableTask(user, id)); + } else { + return getJobStatus(user, id); + } + } + + /* + * Job callable task for job status operation. Overrides behavior of execute() to get + * status of a job. No need to override behavior of cleanup() as there is nothing to be + * done if job status operation is timed out or interrupted. + */ + private JobCallable<QueueStatusBean> getJobStatusCallableTask(final String user, + final String id) { + return new JobCallable<QueueStatusBean>() { + @Override + public QueueStatusBean execute() throws NotAuthorizedException, BadParam, IOException, + InterruptedException, BusyException { + /* + * Change the current thread name to include parent thread Id if it is executed + * in thread pool. Useful to extract logs specific to a job request and helpful + * to debug job issues. 
+ */ + Thread.currentThread().setName(String.format("%s-%s-%s", JOB_STATUS_EXECUTE_THREAD_PREFIX, + statusThreadId, Thread.currentThread().getId())); + + return getJobStatus(user, id); + } + }; + } + + public QueueStatusBean run(final String user, final String id) + throws NotAuthorizedException, BadParam, IOException, InterruptedException, + BusyException, TimeoutException, ExecutionException, TooManyRequestsException { + return run(user, id, true); + } + + public QueueStatusBean getJobStatus(String user, String id) throws NotAuthorizedException, BadParam, IOException, InterruptedException { WebHCatJTShim tracker = null; http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java index 839b56a..590e49f 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java @@ -51,7 +51,7 @@ public class StreamingDelegator extends LauncherDelegator { Boolean enableJobReconnect, JobType jobType) throws NotAuthorizedException, BadParam, BusyException, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, TooManyRequestsException { List<String> args = makeArgs(inputs, inputreader, output, mapper, reducer, combiner, fileList, cmdenvs, jarArgs); http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TooManyRequestsException.java ---------------------------------------------------------------------- diff --git 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TooManyRequestsException.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TooManyRequestsException.java new file mode 100644 index 0000000..9d55ad4 --- /dev/null +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TooManyRequestsException.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton; + +/** + * Raise this exception if web service is busy with existing requests and not able + * to service new requests. + */ +public class TooManyRequestsException extends SimpleWebException { + /* + * The current version of jetty server doesn't have the status + * HttpStatus.TOO_MANY_REQUESTS_429. Hence, passing this as constant. 
+ */ + public static int TOO_MANY_REQUESTS_429 = 429; + + public TooManyRequestsException(String msg) { + super(TOO_MANY_REQUESTS_429, msg); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java index 15ab8b9..f4c4b76 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java @@ -81,9 +81,14 @@ public class TempletonControllerJob extends Configured implements Tool, JobSubmi this.appConf = conf; } - private JobID submittedJobId; + private Job job = null; public String getSubmittedId() { + if (job == null ) { + return null; + } + + JobID submittedJobId = job.getJobID(); if (submittedJobId == null) { return null; } else { @@ -119,7 +124,7 @@ public class TempletonControllerJob extends Configured implements Tool, JobSubmi String user = UserGroupInformation.getCurrentUser().getShortUserName(); conf.set("user.name", user); - Job job = new Job(conf); + job = new Job(conf); job.setJarByClass(LaunchMapper.class); job.setJobName(TempletonControllerJob.class.getSimpleName()); job.setMapperClass(LaunchMapper.class); @@ -141,7 +146,7 @@ public class TempletonControllerJob extends Configured implements Tool, JobSubmi job.submit(); - submittedJobId = job.getJobID(); + JobID submittedJobId = job.getJobID(); if(metastoreTokenStrForm != null) { //so that it can be cancelled later from CompleteDelegator DelegationTokenCache.getStringFormTokenCache().storeDelegationToken( 
http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/ConcurrentJobRequestsTestBase.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/ConcurrentJobRequestsTestBase.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/ConcurrentJobRequestsTestBase.java new file mode 100644 index 0000000..5fcae46 --- /dev/null +++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/ConcurrentJobRequestsTestBase.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hive.hcatalog.templeton; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.Future; + +import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.mockito.Mockito; +import org.mockito.stubbing.Answer; + +/* + * Base class for mocking job operations with concurrent requests. + */ +public class ConcurrentJobRequestsTestBase { + private static final Logger LOG = LoggerFactory.getLogger(ConcurrentJobRequestsTestBase.class); + private boolean started = false; + private Object lock = new Object(); + + MockAnswerTestHelper<QueueStatusBean> statusJobHelper = new MockAnswerTestHelper<QueueStatusBean>(); + MockAnswerTestHelper<QueueStatusBean> killJobHelper = new MockAnswerTestHelper<QueueStatusBean>(); + MockAnswerTestHelper<List<JobItemBean>> listJobHelper = new MockAnswerTestHelper<List<JobItemBean>>(); + MockAnswerTestHelper<Integer> submitJobHelper = new MockAnswerTestHelper<Integer>(); + + /* + * Waits for other threads to join and returns with its Id. + */ + private int waitForAllThreadsToStart(JobRunnable jobRunnable, int poolThreadCount) { + int currentId = jobRunnable.threadStartCount.incrementAndGet(); + LOG.info("Waiting for other threads with thread id: " + currentId); + synchronized(lock) { + /* + * We need a total of poolThreadCount + 1 threads to start at the same time. There are + * poolThreadCount threads in thread pool and another one which has started them. + * The thread which sees atomic counter as poolThreadCount+1 is the last thread + * to join and wake up all threads to start all at once. 
+ */ + if (currentId > poolThreadCount) { + LOG.info("Waking up all threads: " + currentId); + started = true; + this.lock.notifyAll(); + } else { + while (!started) { + try { + this.lock.wait(); + } catch (InterruptedException ignore) { + } + } + } + } + + return currentId; + } + + public JobRunnable ConcurrentJobsStatus(final int threadCount, AppConfig appConfig, + final boolean killThreads, boolean interruptThreads, final Answer<QueueStatusBean> answer) + throws IOException, InterruptedException, QueueException, NotAuthorizedException, + BadParam, BusyException { + + StatusDelegator delegator = new StatusDelegator(appConfig); + final StatusDelegator mockDelegator = Mockito.spy(delegator); + + Mockito.doAnswer(answer).when(mockDelegator).getJobStatus(Mockito.any(String.class), + Mockito.any(String.class)); + + JobRunnable statusJobRunnable = new JobRunnable() { + @Override + public void run() { + try { + int threadId = waitForAllThreadsToStart(this, threadCount); + LOG.info("Started executing Job Status operation. 
ThreadId : " + threadId); + mockDelegator.run("admin", "job_1000" + threadId); + } catch (Exception ex) { + exception = ex; + } + } + }; + + executeJobOperations(statusJobRunnable, threadCount, killThreads, interruptThreads); + return statusJobRunnable; + } + + public JobRunnable ConcurrentListJobs(final int threadCount, AppConfig config, + final boolean killThreads, boolean interruptThreads, final Answer<List<JobItemBean>> answer) + throws IOException, InterruptedException, QueueException, NotAuthorizedException, + BadParam, BusyException { + + ListDelegator delegator = new ListDelegator(config); + final ListDelegator mockDelegator = Mockito.spy(delegator); + + Mockito.doAnswer(answer).when(mockDelegator).listJobs(Mockito.any(String.class), + Mockito.any(boolean.class), Mockito.any(String.class), + Mockito.any(int.class), Mockito.any(boolean.class)); + + JobRunnable listJobRunnable = new JobRunnable() { + @Override + public void run() { + try { + int threadId = waitForAllThreadsToStart(this, threadCount); + LOG.info("Started executing Job List operation. 
ThreadId : " + threadId); + mockDelegator.run("admin", true, "", 10, true); + } catch (Exception ex) { + exception = ex; + } + } + }; + + executeJobOperations(listJobRunnable, threadCount, killThreads, interruptThreads); + return listJobRunnable; + } + + public JobRunnable SubmitConcurrentJobs(final int threadCount, AppConfig config, + final boolean killThreads, boolean interruptThreads, final Answer<Integer> responseAnswer, + final Answer<QueueStatusBean> timeoutResponseAnswer, final String jobIdResponse) + throws IOException, InterruptedException, QueueException, NotAuthorizedException, + BusyException, TimeoutException, Exception { + + LauncherDelegator delegator = new LauncherDelegator(config); + final LauncherDelegator mockDelegator = Mockito.spy(delegator); + final List<String> listArgs = new ArrayList<String>(); + + TempletonControllerJob mockCtrl = Mockito.mock(TempletonControllerJob.class); + + Mockito.doReturn(jobIdResponse).when(mockCtrl).getSubmittedId(); + + Mockito.doReturn(mockCtrl).when(mockDelegator).getTempletonController(); + + Mockito.doAnswer(responseAnswer).when(mockDelegator).runTempletonControllerJob( + Mockito.any(TempletonControllerJob.class), Mockito.any(List.class)); + + Mockito.doAnswer(timeoutResponseAnswer).when(mockDelegator).killJob( + Mockito.any(String.class), Mockito.any(String.class)); + + Mockito.doNothing().when(mockDelegator).registerJob(Mockito.any(String.class), + Mockito.any(String.class), Mockito.any(String.class), Mockito.any(Map.class)); + + JobRunnable submitJobRunnable = new JobRunnable() { + @Override + public void run() { + try { + int threadId = waitForAllThreadsToStart(this, threadCount); + LOG.info("Started executing Job Submit operation. 
ThreadId : " + threadId); + mockDelegator.enqueueController("admin", null, "", listArgs); + } catch (Throwable ex) { + exception = ex; + } + } + }; + + executeJobOperations(submitJobRunnable, threadCount, killThreads, interruptThreads); + return submitJobRunnable; + } + + public void executeJobOperations(JobRunnable jobRunnable, int threadCount, boolean killThreads, + boolean interruptThreads) + throws IOException, InterruptedException, QueueException, NotAuthorizedException { + + started = false; + + ExecutorService executorService = new ThreadPoolExecutor(threadCount, threadCount, 0L, + TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());; + + ArrayList<Future<?>> futures = new ArrayList<Future<?>>(); + for (int i = 0; i < threadCount; i++) { + futures.add(executorService.submit(jobRunnable)); + } + + waitForAllThreadsToStart(jobRunnable, threadCount); + LOG.info("Started all threads "); + + if (killThreads) { + executorService.shutdownNow(); + } else { + if (interruptThreads){ + for (Future<?> future : futures) { + LOG.info("Cancelling the thread"); + future.cancel(true); + } + } + + executorService.shutdown(); + } + + /* + * For both graceful or forceful shutdown, wait for tasks to terminate such that + * appropriate exceptions are raised and stored in JobRunnable.exception. + */ + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + LOG.info("Force Shutting down the pool\n"); + if (!killThreads) { + /* + * killThreads option has already done force shutdown. No need to do again. 
+ */ + executorService.shutdownNow(); + } + } + } + + public abstract class JobRunnable implements Runnable { + public volatile Throwable exception = null; + public AtomicInteger threadStartCount = new AtomicInteger(0); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/MockAnswerTestHelper.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/MockAnswerTestHelper.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/MockAnswerTestHelper.java new file mode 100644 index 0000000..9f1744e --- /dev/null +++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/MockAnswerTestHelper.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton; + +import java.io.IOException; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/* + * Helper class to generate mocked response. 
+ */ +public class MockAnswerTestHelper<T> { + public Answer<T> getIOExceptionAnswer() { + return new Answer<T>() { + @Override + public T answer(InvocationOnMock invocation) throws Exception { + throw new IOException("IOException raised manually."); + } + }; + } + + public Answer<T> getOutOfMemoryErrorAnswer() { + return new Answer<T>() { + @Override + public T answer(InvocationOnMock invocation) throws OutOfMemoryError { + throw new OutOfMemoryError("OutOfMemoryError raised manually."); + } + }; + } + + public Answer<T> getDelayedResonseAnswer(final int delayInSeconds, final T response) { + return new Answer<T>() { + @Override + public T answer(InvocationOnMock invocation) throws InterruptedException { + Thread.sleep(1000 * delayInSeconds); + return response; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java new file mode 100644 index 0000000..695dcc6 --- /dev/null +++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton; + +import java.util.ArrayList; + +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import static org.junit.Assert.assertTrue; + +/* + * Test submission of concurrent job requests. + */ +public class TestConcurrentJobRequests extends ConcurrentJobRequestsTestBase { + + private static AppConfig config; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @BeforeClass + public static void setUp() { + final String[] args = new String[] {}; + Main main = new Main(args); + config = main.getAppConfigInstance(); + } + + @Test + public void ConcurrentJobsStatusSuccess() { + try { + JobRunnable jobRunnable = ConcurrentJobsStatus(6, config, false, false, + statusJobHelper.getDelayedResonseAnswer(4, new QueueStatusBean("job_1000", "Job not found"))); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentListJobsSuccess() { + try { + JobRunnable jobRunnable = ConcurrentListJobs(6, config, false, false, + listJobHelper.getDelayedResonseAnswer(4, new ArrayList<JobItemBean>())); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentSubmitJobsSuccess() { + try { + JobRunnable jobRunnable = SubmitConcurrentJobs(6, config, false, false, + submitJobHelper.getDelayedResonseAnswer(4, 0), + killJobHelper.getDelayedResonseAnswer(4, null), "job_1000"); + 
assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreads.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreads.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreads.java new file mode 100644 index 0000000..6f8da40 --- /dev/null +++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreads.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hive.hcatalog.templeton; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.concurrent.TimeoutException; +import org.eclipse.jetty.http.HttpStatus; + +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import static org.junit.Assert.assertTrue; + +/* + * Test submission of concurrent job requests with the controlled number of concurrent + * Requests. Verify that we get busy exception and appropriate message. + */ +public class TestConcurrentJobRequestsThreads extends ConcurrentJobRequestsTestBase { + + private static AppConfig config; + private static QueueStatusBean statusBean; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @BeforeClass + public static void setUp() { + final String[] args = new String[] {}; + Main main = new Main(args); + config = main.getAppConfigInstance(); + config.setInt(AppConfig.JOB_STATUS_MAX_THREADS, 5); + config.setInt(AppConfig.JOB_LIST_MAX_THREADS, 5); + config.setInt(AppConfig.JOB_SUBMIT_MAX_THREADS, 5); + statusBean = new QueueStatusBean("job_1000", "Job not found"); + } + + @Test + public void ConcurrentJobsStatusTooManyRequestsException() { + try { + JobRunnable jobRunnable = ConcurrentJobsStatus(6, config, false, false, + statusJobHelper.getDelayedResonseAnswer(4, statusBean)); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof TooManyRequestsException); + TooManyRequestsException ex = (TooManyRequestsException)jobRunnable.exception; + assertTrue(ex.httpCode == TooManyRequestsException.TOO_MANY_REQUESTS_429); + String expectedMessage = "Unable to service the status job request as templeton service is busy " + + "with too many status job requests. Please wait for some time before " + + "retrying the operation. 
Please refer to the config " + + "templeton.parallellism.job.status to configure concurrent requests."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + + /* + * Verify that new job requests have no issues. + */ + jobRunnable = ConcurrentJobsStatus(5, config, false, false, + statusJobHelper.getDelayedResonseAnswer(4, statusBean)); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentListJobsTooManyRequestsException() { + try { + JobRunnable jobRunnable = ConcurrentListJobs(6, config, false, false, + listJobHelper.getDelayedResonseAnswer(4, new ArrayList<JobItemBean>())); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof TooManyRequestsException); + TooManyRequestsException ex = (TooManyRequestsException)jobRunnable.exception; + assertTrue(ex.httpCode == TooManyRequestsException.TOO_MANY_REQUESTS_429); + String expectedMessage = "Unable to service the list job request as templeton service is busy " + + "with too many list job requests. Please wait for some time before " + + "retrying the operation. Please refer to the config " + + "templeton.parallellism.job.list to configure concurrent requests."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + + /* + * Verify that new job requests have no issues. 
+ */ + jobRunnable = ConcurrentListJobs(5, config, false, false, + listJobHelper.getDelayedResonseAnswer(4, new ArrayList<JobItemBean>())); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentSubmitJobsTooManyRequestsException() { + try { + JobRunnable jobRunnable = SubmitConcurrentJobs(6, config, false, false, + submitJobHelper.getDelayedResonseAnswer(4, 0), + killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000"); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof TooManyRequestsException); + TooManyRequestsException ex = (TooManyRequestsException)jobRunnable.exception; + assertTrue(ex.httpCode == TooManyRequestsException.TOO_MANY_REQUESTS_429); + String expectedMessage = "Unable to service the submit job request as templeton service is busy " + + "with too many submit job requests. Please wait for some time before " + + "retrying the operation. Please refer to the config " + + "templeton.parallellism.job.submit to configure concurrent requests."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + + /* + * Verify that new job requests have no issues. + */ + jobRunnable = SubmitConcurrentJobs(5, config, false, false, + submitJobHelper.getDelayedResonseAnswer(4, 0), + killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000"); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } +}