http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java index aeb89df..663e4b6 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java @@ -51,7 +51,7 @@ public class PigDelegator extends LauncherDelegator { boolean usesHcatalog, String completedUrl, boolean enablelog, Boolean enableJobReconnect) throws NotAuthorizedException, BadParam, BusyException, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, TooManyRequestsException { runAs = user; List<String> args = makeArgs(execute, srcFile, pigArgs,
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java index 5aed3b3..793881b 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java @@ -170,6 +170,7 @@ public class SecureProxySupport { return null; } }); + FileSystem.closeAllForUGI(ugi); return twrapper.tokens; } private static void collectTokens(FileSystem fs, TokenWrapper twrapper, Credentials creds, String userName) throws IOException { @@ -204,6 +205,7 @@ public class SecureProxySupport { return null; } }); + FileSystem.closeAllForUGI(ugi); } @@ -220,6 +222,7 @@ public class SecureProxySupport { return client.getDelegationToken(c.getUser(), u); } }); + FileSystem.closeAllForUGI(ugi); return s; } } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java index 2da0204..43a7d57 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java @@ -27,6 +27,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.servlet.http.HttpServletRequest; @@ -650,7 +652,7 @@ public class Server { @FormParam("enablelog") boolean enablelog, @FormParam("enablejobreconnect") Boolean enablejobreconnect) throws NotAuthorizedException, BusyException, BadParam, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, TooManyRequestsException { verifyUser(); verifyParam(inputs, "input"); verifyParam(mapper, "mapper"); @@ -704,7 +706,7 @@ public class Server { @FormParam("enablelog") boolean enablelog, @FormParam("enablejobreconnect") Boolean enablejobreconnect) throws NotAuthorizedException, BusyException, BadParam, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, TooManyRequestsException { verifyUser(); verifyParam(jar, "jar"); verifyParam(mainClass, "class"); @@ -754,7 +756,7 @@ public class Server { @FormParam("enablelog") boolean enablelog, @FormParam("enablejobreconnect") Boolean enablejobreconnect) throws NotAuthorizedException, BusyException, BadParam, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, TooManyRequestsException { verifyUser(); if (execute == null && srcFile == null) { throw new BadParam("Either execute or file parameter required"); @@ -805,7 +807,7 @@ public class Server { @FormParam("enablelog") boolean enablelog, @FormParam("enablejobreconnect") Boolean enablejobreconnect) throws NotAuthorizedException, BusyException, BadParam, QueueException, - IOException, InterruptedException { + IOException, InterruptedException, TooManyRequestsException { verifyUser(); if (command == null && optionsFile == null) throw new BadParam("Must define Sqoop command or a optionsfile contains Sqoop command to run Sqoop job."); @@ -859,7 +861,7 @@ public class Server { @FormParam("enablelog") boolean enablelog, @FormParam("enablejobreconnect") Boolean enablejobreconnect) throws NotAuthorizedException, BusyException, BadParam, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, TooManyRequestsException { verifyUser(); if (execute == null && srcFile == null) { throw new BadParam("Either execute or file parameter required"); @@ -891,7 +893,8 @@ public class Server { @Path("jobs/{jobid}") @Produces({MediaType.APPLICATION_JSON}) public QueueStatusBean showJobId(@PathParam("jobid") String jobid) - throws NotAuthorizedException, BadParam, IOException, InterruptedException { + throws NotAuthorizedException, BadParam, IOException, InterruptedException, + BusyException, TimeoutException, ExecutionException, TooManyRequestsException { verifyUser(); verifyParam(jobid, ":jobid"); @@ -968,7 +971,8 @@ public class Server { @QueryParam("showall") boolean showall, @QueryParam("jobid") String jobid, @QueryParam("numrecords") String numrecords) - throws NotAuthorizedException, BadParam, IOException, InterruptedException { + throws NotAuthorizedException, BadParam, IOException, InterruptedException, + BusyException, TimeoutException, ExecutionException, TooManyRequestsException { verifyUser(); @@ -980,19 +984,14 @@ public class Server { showDetails = true; } - ListDelegator ld = new ListDelegator(appConf); - List<String> list = ld.run(getDoAsUser(), showall); - List<JobItemBean> detailList = new ArrayList<JobItemBean>(); - int currRecord = 0; int numRecords; - // Parse numrecords to an integer try { if (numrecords != null) { numRecords = Integer.parseInt(numrecords); - if (numRecords <= 0) { - throw new BadParam("numrecords should be an integer > 0"); - } + if (numRecords <= 0) { + throw new BadParam("numrecords should be an integer > 0"); + } } else { numRecords = -1; @@ -1002,57 +1001,8 @@ public class Server { throw new BadParam("Invalid numrecords format: numrecords should be an integer > 0"); } - // Sort the list as requested - boolean isAscendingOrder = true; - switch (appConf.getListJobsOrder()) { - case lexicographicaldesc: - Collections.sort(list, Collections.reverseOrder()); - isAscendingOrder = false; - break; - case lexicographicalasc: - default: - Collections.sort(list); - break; - } - - for (String job : list) { - // If numRecords = -1, fetch all records. - // Hence skip all the below checks when numRecords = -1. - if (numRecords != -1) { - // If currRecord >= numRecords, we have already fetched the top #numRecords - if (currRecord >= numRecords) { - break; - } - else if (jobid == null || jobid.trim().length() == 0) { - currRecord++; - } - // If the current record needs to be returned based on the - // filter conditions specified by the user, increment the counter - else if (isAscendingOrder && job.compareTo(jobid) > 0 || !isAscendingOrder && job.compareTo(jobid) < 0) { - currRecord++; - } - // The current record should not be included in the output detailList. - else { - continue; - } - } - JobItemBean jobItem = new JobItemBean(); - jobItem.id = job; - if (showDetails) { - StatusDelegator sd = new StatusDelegator(appConf); - try { - jobItem.detail = sd.run(getDoAsUser(), job); - } - catch(Exception ex) { - /*if we could not get status for some reason, log it, and send empty status back with - * just the ID so that caller knows to even look in the log file*/ - LOG.info("Failed to get status detail for jobId='" + job + "'", ex); - jobItem.detail = new QueueStatusBean(job, "Failed to retrieve status; see WebHCat logs"); - } - } - detailList.add(jobItem); - } - return detailList; + ListDelegator ld = new ListDelegator(appConf); + return ld.run(getDoAsUser(), showall, jobid, numRecords, showDetails); } /** http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java index fde5f60..eb84fb2 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java @@ -50,7 +50,7 @@ public class SqoopDelegator extends LauncherDelegator { String callback, String completedUrl, boolean enablelog, Boolean enableJobReconnect, String libdir) throws NotAuthorizedException, BadParam, BusyException, QueueException, - IOException, InterruptedException + IOException, InterruptedException, TooManyRequestsException { if(TempletonUtils.isset(appConf.sqoopArchive())) { if(!TempletonUtils.isset(appConf.sqoopPath()) && !TempletonUtils.isset(appConf.sqoopHome())) { http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java index fac0170..c042ae8 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java @@ -19,10 +19,13 @@ package org.apache.hive.hcatalog.templeton; import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.mapred.JobID; @@ -41,18 +44,78 @@ import org.apache.hive.hcatalog.templeton.tool.JobState; */ public class StatusDelegator extends TempletonDelegator { private static final Logger LOG = LoggerFactory.getLogger(StatusDelegator.class); + private final String JOB_STATUS_EXECUTE_THREAD_PREFIX = "JobStatusExecute"; + + /** + * Current thread id used to set in execution threads. + */ + private final String statusThreadId = Thread.currentThread().getName(); + + /* + * Job status request executor to get status of a job. + */ + private static JobRequestExecutor<QueueStatusBean> jobRequest = + new JobRequestExecutor<QueueStatusBean>(JobRequestExecutor.JobRequestType.Status, + AppConfig.JOB_STATUS_MAX_THREADS, AppConfig.JOB_STATUS_TIMEOUT); public StatusDelegator(AppConfig appConf) { super(appConf); } - public QueueStatusBean run(String user, String id) + /* + * Gets status of job form job id. If maximum concurrent job status requests are configured + * then status request will be executed on a thread from thread pool. If job status request + * time out is configured then request execution thread will be interrupted if thread + * times out and does no action. + */ + public QueueStatusBean run(final String user, final String id, boolean enableThreadPool) + throws NotAuthorizedException, BadParam, IOException, InterruptedException, + BusyException, TimeoutException, ExecutionException, TooManyRequestsException { + if (jobRequest.isThreadPoolEnabled() && enableThreadPool) { + return jobRequest.execute(getJobStatusCallableTask(user, id)); + } else { + return getJobStatus(user, id); + } + } + + /* + * Job callable task for job status operation. Overrides behavior of execute() to get + * status of a job. No need to override behavior of cleanup() as there is nothing to be + * done if job sttaus operation is timed out or interrupted. + */ + private JobCallable<QueueStatusBean> getJobStatusCallableTask(final String user, + final String id) { + return new JobCallable<QueueStatusBean>() { + @Override + public QueueStatusBean execute() throws NotAuthorizedException, BadParam, IOException, + InterruptedException, BusyException { + /* + * Change the current thread name to include parent thread Id if it is executed + * in thread pool. Useful to extract logs specific to a job request and helpful + * to debug job issues. + */ + Thread.currentThread().setName(String.format("%s-%s-%s", JOB_STATUS_EXECUTE_THREAD_PREFIX, + statusThreadId, Thread.currentThread().getId())); + + return getJobStatus(user, id); + } + }; + } + + public QueueStatusBean run(final String user, final String id) + throws NotAuthorizedException, BadParam, IOException, InterruptedException, + BusyException, TimeoutException, ExecutionException, TooManyRequestsException { + return run(user, id, true); + } + + public QueueStatusBean getJobStatus(String user, String id) throws NotAuthorizedException, BadParam, IOException, InterruptedException { WebHCatJTShim tracker = null; JobState state = null; + UserGroupInformation ugi = null; try { - UserGroupInformation ugi = UgiFactory.getUgi(user); + ugi = UgiFactory.getUgi(user); tracker = ShimLoader.getHadoopShims().getWebHCatShim(appConf, ugi); JobID jobid = StatusDelegator.StringToJobID(id); if (jobid == null) @@ -66,6 +129,8 @@ public class StatusDelegator extends TempletonDelegator { tracker.close(); if (state != null) state.close(); + if (ugi != null) + FileSystem.closeAllForUGI(ugi); } } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java index 839b56a..590e49f 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java @@ -51,7 +51,7 @@ public class StreamingDelegator extends LauncherDelegator { Boolean enableJobReconnect, JobType jobType) throws NotAuthorizedException, BadParam, BusyException, QueueException, - ExecuteException, IOException, InterruptedException { + ExecuteException, IOException, InterruptedException, TooManyRequestsException { List<String> args = makeArgs(inputs, inputreader, output, mapper, reducer, combiner, fileList, cmdenvs, jarArgs); http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TooManyRequestsException.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TooManyRequestsException.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TooManyRequestsException.java new file mode 100644 index 0000000..9d55ad4 --- /dev/null +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TooManyRequestsException.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton; + +/** + * Raise this exception if web service is busy with existing requests and not able + * service new requests. + */ +public class TooManyRequestsException extends SimpleWebException { + /* + * The current version of jetty server doesn't have the status + * HttpStatus.TOO_MANY_REQUESTS_429. Hence, passing this as constant. + */ + public static int TOO_MANY_REQUESTS_429 = 429; + + public TooManyRequestsException(String msg) { + super(TOO_MANY_REQUESTS_429, msg); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java index 15ab8b9..f4c4b76 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java @@ -81,9 +81,14 @@ public class TempletonControllerJob extends Configured implements Tool, JobSubmi this.appConf = conf; } - private JobID submittedJobId; + private Job job = null; public String getSubmittedId() { + if (job == null ) { + return null; + } + + JobID submittedJobId = job.getJobID(); if (submittedJobId == null) { return null; } else { @@ -119,7 +124,7 @@ public class TempletonControllerJob extends Configured implements Tool, JobSubmi String user = UserGroupInformation.getCurrentUser().getShortUserName(); conf.set("user.name", user); - Job job = new Job(conf); + job = new Job(conf); job.setJarByClass(LaunchMapper.class); job.setJobName(TempletonControllerJob.class.getSimpleName()); job.setMapperClass(LaunchMapper.class); @@ -141,7 +146,7 @@ public class TempletonControllerJob extends Configured implements Tool, JobSubmi job.submit(); - submittedJobId = job.getJobID(); + JobID submittedJobId = job.getJobID(); if(metastoreTokenStrForm != null) { //so that it can be cancelled later from CompleteDelegator DelegationTokenCache.getStringFormTokenCache().storeDelegationToken( http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java index 07b005b..e0ccc70 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java @@ -362,6 +362,7 @@ public class TempletonUtils { if (hadoopFsIsMissing(defaultFs, p)) throw new FileNotFoundException("File " + fname + " does not exist."); + FileSystem.closeAllForUGI(ugi); return p; } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/ConcurrentJobRequestsTestBase.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/ConcurrentJobRequestsTestBase.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/ConcurrentJobRequestsTestBase.java new file mode 100644 index 0000000..5fcae46 --- /dev/null +++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/ConcurrentJobRequestsTestBase.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.Future; + +import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.mockito.Mockito; +import org.mockito.stubbing.Answer; + +/* + * Base class for mocking job operations with concurrent requests. + */ +public class ConcurrentJobRequestsTestBase { + private static final Logger LOG = LoggerFactory.getLogger(ConcurrentJobRequestsTestBase.class); + private boolean started = false; + private Object lock = new Object(); + + MockAnswerTestHelper<QueueStatusBean> statusJobHelper = new MockAnswerTestHelper<QueueStatusBean>(); + MockAnswerTestHelper<QueueStatusBean> killJobHelper = new MockAnswerTestHelper<QueueStatusBean>(); + MockAnswerTestHelper<List<JobItemBean>> listJobHelper = new MockAnswerTestHelper<List<JobItemBean>>(); + MockAnswerTestHelper<Integer> submitJobHelper = new MockAnswerTestHelper<Integer>(); + + /* + * Waits for other threads to join and returns with its Id. + */ + private int waitForAllThreadsToStart(JobRunnable jobRunnable, int poolThreadCount) { + int currentId = jobRunnable.threadStartCount.incrementAndGet(); + LOG.info("Waiting for other threads with thread id: " + currentId); + synchronized(lock) { + /* + * We need a total of poolThreadCount + 1 threads to start at same. There are + * poolThreadCount threads in thread pool and another one which has started them. + * The thread which sees atomic counter as poolThreadCount+1 is the last thread` + * to join and wake up all threads to start all at once. + */ + if (currentId > poolThreadCount) { + LOG.info("Waking up all threads: " + currentId); + started = true; + this.lock.notifyAll(); + } else { + while (!started) { + try { + this.lock.wait(); + } catch (InterruptedException ignore) { + } + } + } + } + + return currentId; + } + + public JobRunnable ConcurrentJobsStatus(final int threadCount, AppConfig appConfig, + final boolean killThreads, boolean interruptThreads, final Answer<QueueStatusBean> answer) + throws IOException, InterruptedException, QueueException, NotAuthorizedException, + BadParam, BusyException { + + StatusDelegator delegator = new StatusDelegator(appConfig); + final StatusDelegator mockDelegator = Mockito.spy(delegator); + + Mockito.doAnswer(answer).when(mockDelegator).getJobStatus(Mockito.any(String.class), + Mockito.any(String.class)); + + JobRunnable statusJobRunnable = new JobRunnable() { + @Override + public void run() { + try { + int threadId = waitForAllThreadsToStart(this, threadCount); + LOG.info("Started executing Job Status operation. ThreadId : " + threadId); + mockDelegator.run("admin", "job_1000" + threadId); + } catch (Exception ex) { + exception = ex; + } + } + }; + + executeJobOperations(statusJobRunnable, threadCount, killThreads, interruptThreads); + return statusJobRunnable; + } + + public JobRunnable ConcurrentListJobs(final int threadCount, AppConfig config, + final boolean killThreads, boolean interruptThreads, final Answer<List<JobItemBean>> answer) + throws IOException, InterruptedException, QueueException, NotAuthorizedException, + BadParam, BusyException { + + ListDelegator delegator = new ListDelegator(config); + final ListDelegator mockDelegator = Mockito.spy(delegator); + + Mockito.doAnswer(answer).when(mockDelegator).listJobs(Mockito.any(String.class), + Mockito.any(boolean.class), Mockito.any(String.class), + Mockito.any(int.class), Mockito.any(boolean.class)); + + JobRunnable listJobRunnable = new JobRunnable() { + @Override + public void run() { + try { + int threadId = waitForAllThreadsToStart(this, threadCount); + LOG.info("Started executing Job List operation. ThreadId : " + threadId); + mockDelegator.run("admin", true, "", 10, true); + } catch (Exception ex) { + exception = ex; + } + } + }; + + executeJobOperations(listJobRunnable, threadCount, killThreads, interruptThreads); + return listJobRunnable; + } + + public JobRunnable SubmitConcurrentJobs(final int threadCount, AppConfig config, + final boolean killThreads, boolean interruptThreads, final Answer<Integer> responseAnswer, + final Answer<QueueStatusBean> timeoutResponseAnswer, final String jobIdResponse) + throws IOException, InterruptedException, QueueException, NotAuthorizedException, + BusyException, TimeoutException, Exception { + + LauncherDelegator delegator = new LauncherDelegator(config); + final LauncherDelegator mockDelegator = Mockito.spy(delegator); + final List<String> listArgs = new ArrayList<String>(); + + TempletonControllerJob mockCtrl = Mockito.mock(TempletonControllerJob.class); + + Mockito.doReturn(jobIdResponse).when(mockCtrl).getSubmittedId(); + + Mockito.doReturn(mockCtrl).when(mockDelegator).getTempletonController(); + + Mockito.doAnswer(responseAnswer).when(mockDelegator).runTempletonControllerJob( + Mockito.any(TempletonControllerJob.class), Mockito.any(List.class)); + + Mockito.doAnswer(timeoutResponseAnswer).when(mockDelegator).killJob( + Mockito.any(String.class), Mockito.any(String.class)); + + Mockito.doNothing().when(mockDelegator).registerJob(Mockito.any(String.class), + Mockito.any(String.class), Mockito.any(String.class), Mockito.any(Map.class)); + + JobRunnable submitJobRunnable = new JobRunnable() { + @Override + public void run() { + try { + int threadId = waitForAllThreadsToStart(this, threadCount); + LOG.info("Started executing Job Submit operation. ThreadId : " + threadId); + mockDelegator.enqueueController("admin", null, "", listArgs); + } catch (Throwable ex) { + exception = ex; + } + } + }; + + executeJobOperations(submitJobRunnable, threadCount, killThreads, interruptThreads); + return submitJobRunnable; + } + + public void executeJobOperations(JobRunnable jobRunnable, int threadCount, boolean killThreads, + boolean interruptThreads) + throws IOException, InterruptedException, QueueException, NotAuthorizedException { + + started = false; + + ExecutorService executorService = new ThreadPoolExecutor(threadCount, threadCount, 0L, + TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());; + + ArrayList<Future<?>> futures = new ArrayList<Future<?>>(); + for (int i = 0; i < threadCount; i++) { + futures.add(executorService.submit(jobRunnable)); + } + + waitForAllThreadsToStart(jobRunnable, threadCount); + LOG.info("Started all threads "); + + if (killThreads) { + executorService.shutdownNow(); + } else { + if (interruptThreads){ + for (Future<?> future : futures) { + LOG.info("Cancelling the thread"); + future.cancel(true); + } + } + + executorService.shutdown(); + } + + /* + * For both graceful or forceful shutdown, wait for tasks to terminate such that + * appropriate exceptions are raised and stored in JobRunnable.exception. + */ + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + LOG.info("Force Shutting down the pool\n"); + if (!killThreads) { + /* + * killThreads option has already done force shutdown. No need to do again. + */ + executorService.shutdownNow(); + } + } + } + + public abstract class JobRunnable implements Runnable { + public volatile Throwable exception = null; + public AtomicInteger threadStartCount = new AtomicInteger(0); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/MockAnswerTestHelper.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/MockAnswerTestHelper.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/MockAnswerTestHelper.java new file mode 100644 index 0000000..9f1744e --- /dev/null +++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/MockAnswerTestHelper.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton; + +import java.io.IOException; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/* + * Helper class to generate mocked response. + */ +public class MockAnswerTestHelper<T> { + public Answer<T> getIOExceptionAnswer() { + return new Answer<T>() { + @Override + public T answer(InvocationOnMock invocation) throws Exception { + throw new IOException("IOException raised manually."); + } + }; + } + + public Answer<T> getOutOfMemoryErrorAnswer() { + return new Answer<T>() { + @Override + public T answer(InvocationOnMock invocation) throws OutOfMemoryError { + throw new OutOfMemoryError("OutOfMemoryError raised manually."); + } + }; + } + + public Answer<T> getDelayedResonseAnswer(final int delayInSeconds, final T response) { + return new Answer<T>() { + @Override + public T answer(InvocationOnMock invocation) throws InterruptedException { + Thread.sleep(1000 * delayInSeconds); + return response; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java new file mode 100644 index 0000000..695dcc6 --- /dev/null +++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton; + +import java.util.ArrayList; + +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import static org.junit.Assert.assertTrue; + +/* + * Test submission of concurrent job requests. + */ +public class TestConcurrentJobRequests extends ConcurrentJobRequestsTestBase { + + private static AppConfig config; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @BeforeClass + public static void setUp() { + final String[] args = new String[] {}; + Main main = new Main(args); + config = main.getAppConfigInstance(); + } + + @Test + public void ConcurrentJobsStatusSuccess() { + try { + JobRunnable jobRunnable = ConcurrentJobsStatus(6, config, false, false, + statusJobHelper.getDelayedResonseAnswer(4, new QueueStatusBean("job_1000", "Job not found"))); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentListJobsSuccess() { + try { + JobRunnable jobRunnable = ConcurrentListJobs(6, config, false, false, + listJobHelper.getDelayedResonseAnswer(4, new ArrayList<JobItemBean>())); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentSubmitJobsSuccess() { + try { + JobRunnable jobRunnable = SubmitConcurrentJobs(6, config, false, false, + submitJobHelper.getDelayedResonseAnswer(4, 0), + killJobHelper.getDelayedResonseAnswer(4, null), "job_1000"); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreads.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreads.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreads.java new file mode 100644 index 0000000..6f8da40 --- /dev/null +++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreads.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.concurrent.TimeoutException; +import org.eclipse.jetty.http.HttpStatus; + +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import static org.junit.Assert.assertTrue; + +/* + * Test submission of concurrent job requests with the controlled number of concurrent + * Requests. Verify that we get busy exception and appropriate message. + */ +public class TestConcurrentJobRequestsThreads extends ConcurrentJobRequestsTestBase { + + private static AppConfig config; + private static QueueStatusBean statusBean; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @BeforeClass + public static void setUp() { + final String[] args = new String[] {}; + Main main = new Main(args); + config = main.getAppConfigInstance(); + config.setInt(AppConfig.JOB_STATUS_MAX_THREADS, 5); + config.setInt(AppConfig.JOB_LIST_MAX_THREADS, 5); + config.setInt(AppConfig.JOB_SUBMIT_MAX_THREADS, 5); + statusBean = new QueueStatusBean("job_1000", "Job not found"); + } + + @Test + public void ConcurrentJobsStatusTooManyRequestsException() { + try { + JobRunnable jobRunnable = ConcurrentJobsStatus(6, config, false, false, + statusJobHelper.getDelayedResonseAnswer(4, statusBean)); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof TooManyRequestsException); + TooManyRequestsException ex = (TooManyRequestsException)jobRunnable.exception; + assertTrue(ex.httpCode == TooManyRequestsException.TOO_MANY_REQUESTS_429); + String expectedMessage = "Unable to service the status job request as templeton service is busy " + + "with too many status job requests. Please wait for some time before " + + "retrying the operation. Please refer to the config " + + "templeton.parallellism.job.status to configure concurrent requests."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + + /* + * Verify that new job requests have no issues. + */ + jobRunnable = ConcurrentJobsStatus(5, config, false, false, + statusJobHelper.getDelayedResonseAnswer(4, statusBean)); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentListJobsTooManyRequestsException() { + try { + JobRunnable jobRunnable = ConcurrentListJobs(6, config, false, false, + listJobHelper.getDelayedResonseAnswer(4, new ArrayList<JobItemBean>())); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof TooManyRequestsException); + TooManyRequestsException ex = (TooManyRequestsException)jobRunnable.exception; + assertTrue(ex.httpCode == TooManyRequestsException.TOO_MANY_REQUESTS_429); + String expectedMessage = "Unable to service the list job request as templeton service is busy " + + "with too many list job requests. Please wait for some time before " + + "retrying the operation. Please refer to the config " + + "templeton.parallellism.job.list to configure concurrent requests."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + + /* + * Verify that new job requests have no issues. + */ + jobRunnable = ConcurrentListJobs(5, config, false, false, + listJobHelper.getDelayedResonseAnswer(4, new ArrayList<JobItemBean>())); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentSubmitJobsTooManyRequestsException() { + try { + JobRunnable jobRunnable = SubmitConcurrentJobs(6, config, false, false, + submitJobHelper.getDelayedResonseAnswer(4, 0), + killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000"); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof TooManyRequestsException); + TooManyRequestsException ex = (TooManyRequestsException)jobRunnable.exception; + assertTrue(ex.httpCode == TooManyRequestsException.TOO_MANY_REQUESTS_429); + String expectedMessage = "Unable to service the submit job request as templeton service is busy " + + "with too many submit job requests. Please wait for some time before " + + "retrying the operation. Please refer to the config " + + "templeton.parallellism.job.submit to configure concurrent requests."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + + /* + * Verify that new job requests have no issues. + */ + jobRunnable = SubmitConcurrentJobs(5, config, false, false, + submitJobHelper.getDelayedResonseAnswer(4, 0), + killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000"); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreadsAndTimeout.java ---------------------------------------------------------------------- diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreadsAndTimeout.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreadsAndTimeout.java new file mode 100644 index 0000000..ef49cbd --- /dev/null +++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreadsAndTimeout.java @@ -0,0 +1,374 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.concurrent.TimeoutException; +import org.eclipse.jetty.http.HttpStatus; + +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import static org.junit.Assert.assertTrue; + +/* + * Test submission of concurrent job requests with the controlled number of concurrent + * Requests and job request execution time outs. Verify that we get appropriate exceptions + * and exception message. + */ +public class TestConcurrentJobRequestsThreadsAndTimeout extends ConcurrentJobRequestsTestBase { + + private static AppConfig config; + private static QueueStatusBean statusBean; + private static String statusTooManyRequestsExceptionMessage; + private static String listTooManyRequestsExceptionMessage; + private static String submitTooManyRequestsExceptionMessage; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @BeforeClass + public static void setUp() { + final String[] args = new String[] {}; + Main main = new Main(args); + config = main.getAppConfigInstance(); + config.setInt(AppConfig.JOB_STATUS_MAX_THREADS, 5); + config.setInt(AppConfig.JOB_LIST_MAX_THREADS, 5); + config.setInt(AppConfig.JOB_SUBMIT_MAX_THREADS, 5); + config.setInt(AppConfig.JOB_SUBMIT_TIMEOUT, 5); + config.setInt(AppConfig.JOB_STATUS_TIMEOUT, 5); + config.setInt(AppConfig.JOB_LIST_TIMEOUT, 5); + config.setInt(AppConfig.JOB_TIMEOUT_TASK_RETRY_COUNT, 4); + config.setInt(AppConfig.JOB_TIMEOUT_TASK_RETRY_INTERVAL, 1); + statusBean = new QueueStatusBean("job_1000", "Job not found"); + + statusTooManyRequestsExceptionMessage = "Unable to service the status job request as " + + "templeton service is busy with too many status job requests. " + + "Please wait for some time before retrying the operation. " + + "Please refer to the config templeton.parallellism.job.status " + + "to configure concurrent requests."; + listTooManyRequestsExceptionMessage = "Unable to service the list job request as " + + "templeton service is busy with too many list job requests. " + + "Please wait for some time before retrying the operation. " + + "Please refer to the config templeton.parallellism.job.list " + + "to configure concurrent requests."; + submitTooManyRequestsExceptionMessage = "Unable to service the submit job request as " + + "templeton service is busy with too many submit job requests. " + + "Please wait for some time before retrying the operation. " + + "Please refer to the config templeton.parallellism.job.submit " + + "to configure concurrent requests."; + } + + @Test + public void ConcurrentJobsStatusTooManyRequestsException() { + try { + JobRunnable jobRunnable = ConcurrentJobsStatus(6, config, false, false, + statusJobHelper.getDelayedResonseAnswer(4, statusBean)); + verifyTooManyRequestsException(jobRunnable.exception, this.statusTooManyRequestsExceptionMessage); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentListJobsTooManyRequestsException() { + try { + JobRunnable jobRunnable = ConcurrentListJobs(6, config, false, false, + listJobHelper.getDelayedResonseAnswer(4, new ArrayList<JobItemBean>())); + verifyTooManyRequestsException(jobRunnable.exception, this.listTooManyRequestsExceptionMessage); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentSubmitJobsTooManyRequestsException() { + try { + JobRunnable jobRunnable = SubmitConcurrentJobs(6, config, false, false, + submitJobHelper.getDelayedResonseAnswer(4, 0), + killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000"); + verifyTooManyRequestsException(jobRunnable.exception, this.submitTooManyRequestsExceptionMessage); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentJobsStatusTimeOutException() { + try { + JobRunnable jobRunnable = ConcurrentJobsStatus(5, config, false, false, + statusJobHelper.getDelayedResonseAnswer(6, statusBean)); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof TimeoutException); + String expectedMessage = "Status job request got timed out. Please wait for some time before " + + "retrying the operation. Please refer to the config " + + "templeton.job.status.timeout to configure job request time out."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + + /* + * Verify that new job requests should succeed with no issues. + */ + jobRunnable = ConcurrentJobsStatus(5, config, false, false, + statusJobHelper.getDelayedResonseAnswer(0, statusBean)); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentListJobsTimeOutException() { + try { + JobRunnable jobRunnable = ConcurrentListJobs(5, config, false, false, + listJobHelper.getDelayedResonseAnswer(6, new ArrayList<JobItemBean>())); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof TimeoutException); + String expectedMessage = "List job request got timed out. Please wait for some time before " + + "retrying the operation. Please refer to the config " + + "templeton.job.list.timeout to configure job request time out."; + + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + + /* + * Verify that new job requests should succeed with no issues. + */ + jobRunnable = ConcurrentListJobs(5, config, false, false, + listJobHelper.getDelayedResonseAnswer(1, new ArrayList<JobItemBean>())); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentSubmitJobsTimeOutException() { + try { + JobRunnable jobRunnable = SubmitConcurrentJobs(5, config, false, false, + submitJobHelper.getDelayedResonseAnswer(6, 0), + killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000"); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof QueueException); + String expectedMessage = "Submit job request got timed out. Please wait for some time before " + + "retrying the operation. Please refer to the config " + + "templeton.job.submit.timeout to configure job request time out."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + + /* + * For submit operation, tasks are not cancelled. Verify that new job request + * should fail with TooManyRequestsException. + */ + jobRunnable = SubmitConcurrentJobs(1, config, false, false, + submitJobHelper.getDelayedResonseAnswer(0, 0), + killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000"); + verifyTooManyRequestsException(jobRunnable.exception, this.submitTooManyRequestsExceptionMessage); + + /* + * Sleep until all threads with clean up tasks are completed. + */ + Thread.sleep(2000); + + /* + * Now, tasks would have passed. Verify that new job requests should succeed with no issues. + */ + jobRunnable = SubmitConcurrentJobs(5, config, false, false, + submitJobHelper.getDelayedResonseAnswer(0, 0), + killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000"); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentStatusJobsVerifyExceptions() { + try { + /* + * Trigger kill threads and verify we get InterruptedException and expected Message. + */ + int timeoutTaskDelay = 4; + JobRunnable jobRunnable = ConcurrentJobsStatus(5, config, true, false, + statusJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, statusBean)); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof InterruptedException); + String expectedMessage = "Status job request got interrupted. Please wait for some time before " + + "retrying the operation."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + + /* + * Interrupt all thread and verify we get InterruptedException and expected Message. + */ + jobRunnable = ConcurrentJobsStatus(5, config, false, true, + statusJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, statusBean)); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof InterruptedException); + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + + /* + * Raise custom exception like IOException and verify expected Message. + */ + jobRunnable = ConcurrentJobsStatus(5, config, false, false, + statusJobHelper.getIOExceptionAnswer()); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception.getCause() instanceof IOException); + + /* + * Now new job requests should succeed as status operation has no cancel threads. + */ + jobRunnable = ConcurrentJobsStatus(5, config, false, false, + statusJobHelper.getDelayedResonseAnswer(0, statusBean)); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentListJobsVerifyExceptions() { + try { + /* + * Trigger kill threads and verify we get InterruptedException and expected Message. + */ + int timeoutTaskDelay = 4; + JobRunnable jobRunnable = ConcurrentListJobs(5, config, true, false, + listJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, new ArrayList<JobItemBean>())); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof InterruptedException); + String expectedMessage = "List job request got interrupted. Please wait for some time before " + + "retrying the operation."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + + /* + * Interrupt all thread and verify we get InterruptedException and expected Message. + */ + jobRunnable = ConcurrentListJobs(5, config, false, true, + listJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, new ArrayList<JobItemBean>())); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof InterruptedException); + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + + /* + * Raise custom exception like IOException and verify expected Message. + */ + jobRunnable = ConcurrentListJobs(5, config, false, false, + listJobHelper.getIOExceptionAnswer()); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception.getCause() instanceof IOException); + + /* + * Now new job requests should succeed as list operation has no cancel threads. + */ + jobRunnable = ConcurrentListJobs(5, config, false, false, + listJobHelper.getDelayedResonseAnswer(0, new ArrayList<JobItemBean>())); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + @Test + public void ConcurrentSubmitJobsVerifyExceptions() { + try { + int timeoutTaskDelay = 4; + + /* + * Raise custom exception like IOException and verify expected Message. + * This should not invoke cancel operation. + */ + JobRunnable jobRunnable = SubmitConcurrentJobs(1, config, false, false, + submitJobHelper.getIOExceptionAnswer(), + killJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, statusBean), "job_1002"); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof QueueException); + assertTrue(jobRunnable.exception.getMessage().contains("IOException raised manually.")); + + /* + * Raise custom exception like IOException and verify expected Message. + * This should not invoke cancel operation. + */ + jobRunnable = SubmitConcurrentJobs(1, config, false, false, + submitJobHelper.getOutOfMemoryErrorAnswer(), + killJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, statusBean), "job_1003"); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof QueueException); + assertTrue(jobRunnable.exception.getMessage().contains("OutOfMemoryError raised manually.")); + + /* + * Trigger kill threads and verify that we get InterruptedException and expected + * Message. This should raise 3 kill operations and ensure that retries keep the time out + * occupied for 4 sec. + */ + jobRunnable = SubmitConcurrentJobs(3, config, true, false, + submitJobHelper.getDelayedResonseAnswer(2, 0), + killJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, statusBean), "job_1000"); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof QueueException); + String expectedMessage = "Submit job request got interrupted. Please wait for some time " + + "before retrying the operation."; + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + + /* + * Interrupt all threads and verify we get InterruptedException and expected + * Message. Also raise 2 kill operations and ensure that retries keep the time out + * occupied for 4 sec. + */ + jobRunnable = SubmitConcurrentJobs(2, config, false, true, + submitJobHelper.getDelayedResonseAnswer(2, 0), + killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1001"); + assertTrue(jobRunnable.exception != null); + assertTrue(jobRunnable.exception instanceof QueueException); + assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage)); + + /* + * For submit operation, tasks are not cancelled. Verify that new job request + * should fail with TooManyRequestsException. + */ + jobRunnable = SubmitConcurrentJobs(1, config, false, false, + submitJobHelper.getDelayedResonseAnswer(0, 0), + killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1002"); + verifyTooManyRequestsException(jobRunnable.exception, this.submitTooManyRequestsExceptionMessage); + + /* + * Sleep until all threads with clean up tasks are completed. 2 seconds completing task + * and 1 sec grace period. + */ + Thread.sleep((timeoutTaskDelay + 2 + 1) * 1000); + + /* + * Now new job requests should succeed as all cancel threads would have completed. + */ + jobRunnable = SubmitConcurrentJobs(5, config, false, false, + submitJobHelper.getDelayedResonseAnswer(0, 0), + killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1004"); + assertTrue(jobRunnable.exception == null); + } catch (Exception e) { + assertTrue(false); + } + } + + private void verifyTooManyRequestsException(Throwable exception, String expectedMessage) { + assertTrue(exception != null); + assertTrue(exception instanceof TooManyRequestsException); + TooManyRequestsException ex = (TooManyRequestsException)exception; + assertTrue(ex.httpCode == TooManyRequestsException.TOO_MANY_REQUESTS_429); + assertTrue(exception.getMessage().contains(expectedMessage)); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hplsql/pom.xml ---------------------------------------------------------------------- diff --git a/hplsql/pom.xml b/hplsql/pom.xml index d1337cb..44da8b2 100644 --- a/hplsql/pom.xml +++ b/hplsql/pom.xml @@ -19,7 +19,7 @@ <parent> <groupId>org.apache.hive</groupId> <artifactId>hive</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>3.0.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hplsql/src/main/java/org/apache/hive/hplsql/Udf.java ---------------------------------------------------------------------- diff --git a/hplsql/src/main/java/org/apache/hive/hplsql/Udf.java b/hplsql/src/main/java/org/apache/hive/hplsql/Udf.java index 9c29eeb..4901e89 100644 --- a/hplsql/src/main/java/org/apache/hive/hplsql/Udf.java +++ b/hplsql/src/main/java/org/apache/hive/hplsql/Udf.java @@ -60,15 +60,7 @@ public class Udf extends GenericUDF { @Override public Object evaluate(DeferredObject[] arguments) throws HiveException { if (exec == null) { - exec = new Exec(); - String query = queryOI.getPrimitiveJavaObject(arguments[0].get()); - String[] args = { "-e", query, "-trace" }; - try { - exec.setUdfRun(true); - exec.init(args); - } catch (Exception e) { - throw new HiveException(e.getMessage()); - } + initExec(arguments); } if (arguments.length > 1) { setParameters(arguments); @@ -79,6 +71,22 @@ public class Udf extends GenericUDF { } return null; } + + /** + * init exec + */ + public void initExec(DeferredObject[] arguments) throws HiveException { + exec = new Exec(); + exec.enterGlobalScope(); + String query = queryOI.getPrimitiveJavaObject(arguments[0].get()); + String[] args = { "-e", query, "-trace" }; + try { + exec.setUdfRun(true); + exec.init(args); + } catch (Exception e) { + throw new HiveException(e.getMessage()); + } + } /** * Set parameters for the current call http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hplsql/src/test/java/org/apache/hive/hplsql/TestHplsqlUdf.java ---------------------------------------------------------------------- diff --git a/hplsql/src/test/java/org/apache/hive/hplsql/TestHplsqlUdf.java b/hplsql/src/test/java/org/apache/hive/hplsql/TestHplsqlUdf.java new file mode 100644 index 0000000..3896229 --- /dev/null +++ b/hplsql/src/test/java/org/apache/hive/hplsql/TestHplsqlUdf.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.hplsql; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredJavaObject; + +public class TestHplsqlUdf { + StringObjectInspector queryOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector; + ObjectInspector argOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector; + + /** + * test evaluate for exec init and setParameters + */ + @Test + public void testEvaluateWithoutRun() throws HiveException { + // init udf + Udf udf = new Udf(); + ObjectInspector[] initArguments = {queryOI, argOI}; + udf.initialize(initArguments); + //set arguments + DeferredObject queryObj = new DeferredJavaObject("hello(:1)"); + DeferredObject argObj = new DeferredJavaObject("name"); + DeferredObject[] argumentsObj = {queryObj, argObj}; + + // init exec and set parameters, included + udf.initExec(argumentsObj); + udf.setParameters(argumentsObj); + + // checking var exists and its value is right + Var var = udf.exec.findVariable(":1"); + Assert.assertNotNull(var); + String val = (String) var.value; + Assert.assertEquals(val, "name"); + } +} + http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/custom-serde/pom.xml ---------------------------------------------------------------------- diff --git a/itests/custom-serde/pom.xml b/itests/custom-serde/pom.xml index 166ffde..78b68c5 100644 --- a/itests/custom-serde/pom.xml +++ b/itests/custom-serde/pom.xml @@ -19,7 +19,7 @@ <parent> <groupId>org.apache.hive</groupId> <artifactId>hive-it</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>3.0.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/custom-udfs/pom.xml ---------------------------------------------------------------------- diff --git a/itests/custom-udfs/pom.xml b/itests/custom-udfs/pom.xml index b230b41..de7df16 100644 --- a/itests/custom-udfs/pom.xml +++ b/itests/custom-udfs/pom.xml @@ -19,7 +19,7 @@ limitations under the License. <parent> <groupId>org.apache.hive</groupId> <artifactId>hive-it</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>3.0.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/custom-udfs/udf-classloader-udf1/pom.xml ---------------------------------------------------------------------- diff --git a/itests/custom-udfs/udf-classloader-udf1/pom.xml b/itests/custom-udfs/udf-classloader-udf1/pom.xml index 0a95c94..f863efd 100644 --- a/itests/custom-udfs/udf-classloader-udf1/pom.xml +++ b/itests/custom-udfs/udf-classloader-udf1/pom.xml @@ -19,7 +19,7 @@ <parent> <groupId>org.apache.hive</groupId> <artifactId>hive-it-custom-udfs</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>3.0.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/custom-udfs/udf-classloader-udf2/pom.xml ---------------------------------------------------------------------- diff --git a/itests/custom-udfs/udf-classloader-udf2/pom.xml b/itests/custom-udfs/udf-classloader-udf2/pom.xml index e3f30f1..2553f3e 100644 --- a/itests/custom-udfs/udf-classloader-udf2/pom.xml +++ b/itests/custom-udfs/udf-classloader-udf2/pom.xml @@ -19,7 +19,7 @@ <parent> <groupId>org.apache.hive</groupId> <artifactId>hive-it-custom-udfs</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>3.0.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/custom-udfs/udf-classloader-util/pom.xml ---------------------------------------------------------------------- diff --git a/itests/custom-udfs/udf-classloader-util/pom.xml b/itests/custom-udfs/udf-classloader-util/pom.xml index fe285d7..565a661 100644 --- a/itests/custom-udfs/udf-classloader-util/pom.xml +++ b/itests/custom-udfs/udf-classloader-util/pom.xml @@ -19,7 +19,7 @@ <parent> <groupId>org.apache.hive</groupId> <artifactId>hive-it-custom-udfs</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>3.0.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/custom-udfs/udf-vectorized-badexample/pom.xml ---------------------------------------------------------------------- diff --git a/itests/custom-udfs/udf-vectorized-badexample/pom.xml b/itests/custom-udfs/udf-vectorized-badexample/pom.xml index 35c1a2f..6dc923d 100644 --- a/itests/custom-udfs/udf-vectorized-badexample/pom.xml +++ b/itests/custom-udfs/udf-vectorized-badexample/pom.xml @@ -19,7 +19,7 @@ <parent> <groupId>org.apache.hive</groupId> <artifactId>hive-it-custom-udfs</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>3.0.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/hcatalog-unit/pom.xml ---------------------------------------------------------------------- diff --git a/itests/hcatalog-unit/pom.xml b/itests/hcatalog-unit/pom.xml index 3ef87f9..c157aed 100644 --- a/itests/hcatalog-unit/pom.xml +++ b/itests/hcatalog-unit/pom.xml @@ -25,7 +25,7 @@ <parent> <groupId>org.apache.hive</groupId> <artifactId>hive-it</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>3.0.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java ---------------------------------------------------------------------- diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java index 8468b84..93ff498 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.RawStore; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FileMetadataExprType; @@ -915,6 +916,12 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable { } @Override + public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName, + String tableName) throws MetaException, NoSuchObjectException { + return objectStore.getAggrColStatsForTablePartitions(dbName, tableName); + } + + @Override @CanNotRetry public Boolean commitTransactionExpectDeadlock() { return null;