Repository: hive Updated Branches: refs/heads/master 74ca2eedf -> 2f1d4b11e
http://git-wip-us.apache.org/repos/asf/hive/blob/2f1d4b11/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreadsAndTimeout.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreadsAndTimeout.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreadsAndTimeout.java new file mode 100644 index 0000000..ef49cbd --- /dev/null +++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreadsAndTimeout.java @@ -0,0 +1,374 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.concurrent.TimeoutException; +import org.eclipse.jetty.http.HttpStatus; + +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import static org.junit.Assert.assertTrue; + +/* + * Test submission of concurrent job requests with the controlled number of concurrent + * Requests and job request execution time outs. Verify that we get appropriate exceptions + * and exception message. + */ +public class TestConcurrentJobRequestsThreadsAndTimeout extends ConcurrentJobRequestsTestBase { + + private static AppConfig config; + private static QueueStatusBean statusBean; + private static String statusTooManyRequestsExceptionMessage; + private static String listTooManyRequestsExceptionMessage; + private static String submitTooManyRequestsExceptionMessage; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @BeforeClass + public static void setUp() { + final String[] args = new String[] {}; + Main main = new Main(args); + config = main.getAppConfigInstance(); + config.setInt(AppConfig.JOB_STATUS_MAX_THREADS, 5); + config.setInt(AppConfig.JOB_LIST_MAX_THREADS, 5); + config.setInt(AppConfig.JOB_SUBMIT_MAX_THREADS, 5); + config.setInt(AppConfig.JOB_SUBMIT_TIMEOUT, 5); + config.setInt(AppConfig.JOB_STATUS_TIMEOUT, 5); + config.setInt(AppConfig.JOB_LIST_TIMEOUT, 5); + config.setInt(AppConfig.JOB_TIMEOUT_TASK_RETRY_COUNT, 4); + config.setInt(AppConfig.JOB_TIMEOUT_TASK_RETRY_INTERVAL, 1); + statusBean = new QueueStatusBean("job_1000", "Job not found"); + + statusTooManyRequestsExceptionMessage = "Unable to service the status job request as " + + "templeton service is busy with too many status job requests. " + + "Please wait for some time before retrying the operation. " + + "Please refer to the config templeton.parallellism.job.status " + + "to configure concurrent requests."; + listTooManyRequestsExceptionMessage = "Unable to service the list job request as " + + "templeton service is busy with too many list job requests. " + + "Please wait for some time before retrying the operation. " + + "Please refer to the config templeton.parallellism.job.list " + + "to configure concurrent requests."; + submitTooManyRequestsExceptionMessage = "Unable to service the submit job request as " + + "templeton service is busy with too many submit job requests. " + + "Please wait for some time before retrying the operation. " + + "Please refer to the config templeton.parallellism.job.submit " + + "to configure concurrent requests."; + } + + @Test + public void ConcurrentJobsStatusTooManyRequestsException() { + try { + JobRunnable jobRunnable = ConcurrentJobsStatus(6, config, false, false, + statusJobHelper.getDelayedResonseAnswer(4, statusBean)); + verifyTooManyRequestsException(jobRunnable.exception, this.statusTooManyRequestsExceptionMessage); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentListJobsTooManyRequestsException() { + try { + JobRunnable jobRunnable = ConcurrentListJobs(6, config, false, false, + listJobHelper.getDelayedResonseAnswer(4, new ArrayList<JobItemBean>())); + verifyTooManyRequestsException(jobRunnable.exception, this.listTooManyRequestsExceptionMessage); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentSubmitJobsTooManyRequestsException() { + try { + JobRunnable jobRunnable = SubmitConcurrentJobs(6, config, false, false, + submitJobHelper.getDelayedResonseAnswer(4, 0), + killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000"); + verifyTooManyRequestsException(jobRunnable.exception, this.submitTooManyRequestsExceptionMessage); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentJobsStatusTimeOutException() { + try { + JobRunnable jobRunnable = ConcurrentJobsStatus(5, config, false, false, + statusJobHelper.getDelayedResonseAnswer(6, statusBean)); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof TimeoutException); + String expectedMessage = "Status job request got timed out. Please wait for some time before " + + "retrying the operation. Please refer to the config " + + "templeton.job.status.timeout to configure job request time out."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + + /* + * Verify that new job requests should succeed with no issues. + */ + jobRunnable = ConcurrentJobsStatus(5, config, false, false, + statusJobHelper.getDelayedResonseAnswer(0, statusBean)); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentListJobsTimeOutException() { + try { + JobRunnable jobRunnable = ConcurrentListJobs(5, config, false, false, + listJobHelper.getDelayedResonseAnswer(6, new ArrayList<JobItemBean>())); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof TimeoutException); + String expectedMessage = "List job request got timed out. Please wait for some time before " + + "retrying the operation. Please refer to the config " + + "templeton.job.list.timeout to configure job request time out."; + + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + + /* + * Verify that new job requests should succeed with no issues. + */ + jobRunnable = ConcurrentListJobs(5, config, false, false, + listJobHelper.getDelayedResonseAnswer(1, new ArrayList<JobItemBean>())); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentSubmitJobsTimeOutException() { + try { + JobRunnable jobRunnable = SubmitConcurrentJobs(5, config, false, false, + submitJobHelper.getDelayedResonseAnswer(6, 0), + killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000"); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof QueueException); + String expectedMessage = "Submit job request got timed out. Please wait for some time before " + + "retrying the operation. Please refer to the config " + + "templeton.job.submit.timeout to configure job request time out."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + + /* + * For submit operation, tasks are not cancelled. Verify that new job request + * should fail with TooManyRequestsException. + */ + jobRunnable = SubmitConcurrentJobs(1, config, false, false, + submitJobHelper.getDelayedResonseAnswer(0, 0), + killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000"); + verifyTooManyRequestsException(jobRunnable.exception, this.submitTooManyRequestsExceptionMessage); + + /* + * Sleep until all threads with clean up tasks are completed. + */ + Thread.sleep(2000); + + /* + * Now, tasks would have passed. Verify that new job requests should succeed with no issues. + */ + jobRunnable = SubmitConcurrentJobs(5, config, false, false, + submitJobHelper.getDelayedResonseAnswer(0, 0), + killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000"); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentStatusJobsVerifyExceptions() { + try { + /* + * Trigger kill threads and verify we get InterruptedException and expected Message. + */ + int timeoutTaskDelay = 4; + JobRunnable jobRunnable = ConcurrentJobsStatus(5, config, true, false, + statusJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, statusBean)); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof InterruptedException); + String expectedMessage = "Status job request got interrupted. Please wait for some time before " + + "retrying the operation."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + + /* + * Interrupt all thread and verify we get InterruptedException and expected Message. + */ + jobRunnable = ConcurrentJobsStatus(5, config, false, true, + statusJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, statusBean)); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof InterruptedException); + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + + /* + * Raise custom exception like IOException and verify expected Message. + */ + jobRunnable = ConcurrentJobsStatus(5, config, false, false, + statusJobHelper.getIOExceptionAnswer()); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception.getCause() instanceof IOException); + + /* + * Now new job requests should succeed as status operation has no cancel threads. + */ + jobRunnable = ConcurrentJobsStatus(5, config, false, false, + statusJobHelper.getDelayedResonseAnswer(0, statusBean)); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentListJobsVerifyExceptions() { + try { + /* + * Trigger kill threads and verify we get InterruptedException and expected Message. + */ + int timeoutTaskDelay = 4; + JobRunnable jobRunnable = ConcurrentListJobs(5, config, true, false, + listJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, new ArrayList<JobItemBean>())); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof InterruptedException); + String expectedMessage = "List job request got interrupted. Please wait for some time before " + + "retrying the operation."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + + /* + * Interrupt all thread and verify we get InterruptedException and expected Message. + */ + jobRunnable = ConcurrentListJobs(5, config, false, true, + listJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, new ArrayList<JobItemBean>())); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof InterruptedException); + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + + /* + * Raise custom exception like IOException and verify expected Message. + */ + jobRunnable = ConcurrentListJobs(5, config, false, false, + listJobHelper.getIOExceptionAnswer()); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception.getCause() instanceof IOException); + + /* + * Now new job requests should succeed as list operation has no cancel threads. + */ + jobRunnable = ConcurrentListJobs(5, config, false, false, + listJobHelper.getDelayedResonseAnswer(0, new ArrayList<JobItemBean>())); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentSubmitJobsVerifyExceptions() { + try { + int timeoutTaskDelay = 4; + + /* + * Raise custom exception like IOException and verify expected Message. + * This should not invoke cancel operation. + */ + JobRunnable jobRunnable = SubmitConcurrentJobs(1, config, false, false, + submitJobHelper.getIOExceptionAnswer(), + killJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, statusBean), "job_1002"); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof QueueException); + assertTrue(jobRunnable.exception.getMessage().contains("IOException raised manually.")); + + /* + * Raise custom exception like IOException and verify expected Message. + * This should not invoke cancel operation. + */ + jobRunnable = SubmitConcurrentJobs(1, config, false, false, + submitJobHelper.getOutOfMemoryErrorAnswer(), + killJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, statusBean), "job_1003"); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof QueueException); + assertTrue(jobRunnable.exception.getMessage().contains("OutOfMemoryError raised manually.")); + + /* + * Trigger kill threads and verify that we get InterruptedException and expected + * Message. This should raise 3 kill operations and ensure that retries keep the time out + * occupied for 4 sec. + */ + jobRunnable = SubmitConcurrentJobs(3, config, true, false, + submitJobHelper.getDelayedResonseAnswer(2, 0), + killJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, statusBean), "job_1000"); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof QueueException); + String expectedMessage = "Submit job request got interrupted. Please wait for some time " + + "before retrying the operation."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + + /* + * Interrupt all threads and verify we get InterruptedException and expected + * Message. Also raise 2 kill operations and ensure that retries keep the time out + * occupied for 4 sec. + */ + jobRunnable = SubmitConcurrentJobs(2, config, false, true, + submitJobHelper.getDelayedResonseAnswer(2, 0), + killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1001"); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof QueueException); + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + + /* + * For submit operation, tasks are not cancelled. Verify that new job request + * should fail with TooManyRequestsException. + */ + jobRunnable = SubmitConcurrentJobs(1, config, false, false, + submitJobHelper.getDelayedResonseAnswer(0, 0), + killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1002"); + verifyTooManyRequestsException(jobRunnable.exception, this.submitTooManyRequestsExceptionMessage); + + /* + * Sleep until all threads with clean up tasks are completed. 2 seconds completing task + * and 1 sec grace period. + */ + Thread.sleep((timeoutTaskDelay + 2 + 1) * 1000); + + /* + * Now new job requests should succeed as all cancel threads would have completed. + */ + jobRunnable = SubmitConcurrentJobs(5, config, false, false, + submitJobHelper.getDelayedResonseAnswer(0, 0), + killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1004"); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + private void verifyTooManyRequestsException(Throwable exception, String expectedMessage) { + assertTrue(exception != null); + assertTrue(exception instanceof TooManyRequestsException); + TooManyRequestsException ex = (TooManyRequestsException)exception; + assertTrue(ex.httpCode == TooManyRequestsException.TOO_MANY_REQUESTS_429); + assertTrue(exception.getMessage().contains(expectedMessage)); + } + +}