This is an automated email from the ASF dual-hosted git repository. daijy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 7b0b9ae HIVE-20550: Switch WebHCat to use beeline to submit Hive queries (Daniel Dai, reviewed by Thejas Nair) 7b0b9ae is described below commit 7b0b9ae328757a57d1d88377ea84091487a25f8f Author: Daniel Dai <dai...@gmail.com> AuthorDate: Tue Feb 26 16:35:30 2019 -0800 HIVE-20550: Switch WebHCat to use beeline to submit Hive queries (Daniel Dai, reviewed by Thejas Nair) Signed-off-by: Thejas M Nair <the...@hortonworks.com> --- .../test/e2e/templeton/drivers/TestDriverCurl.pm | 6 +-- .../test/e2e/templeton/tests/jobsubmission.conf | 6 +-- .../hive/hcatalog/templeton/DeleteDelegator.java | 59 +++++++++++++++++++--- .../hive/hcatalog/templeton/HiveDelegator.java | 25 +++------ .../hive/hcatalog/templeton/tool/JobState.java | 13 +++++ .../templeton/tool/JobSubmissionConstants.java | 3 ++ .../hive/hcatalog/templeton/tool/LaunchMapper.java | 23 ++++++--- .../hcatalog/templeton/tool/TempletonUtils.java | 6 +++ .../templeton/tool/TestTempletonUtils.java | 3 ++ 9 files changed, 107 insertions(+), 37 deletions(-) diff --git a/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm b/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm index 66a6ca1..e62269b 100644 --- a/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm +++ b/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm @@ -555,12 +555,12 @@ sub execCurlCmd(){ my %result; my $out; my $err; - IPC::Run::run(\@curl_cmd, \undef, $out, $err) + IPC::Run::run(\@curl_cmd, \undef, $log, $log) or die "Failed running curl cmd " . join ' ', @curl_cmd; $result{'rc'} = $? 
>> 8; - $result{'stderr'} = $err; - $result{'stdout'} = $out; + $result{'stderr'} = $log; + $result{'stdout'} = $log; $result{'body'} = `cat $res_body`; my @full_header = `cat $res_header`; diff --git a/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf b/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf index a1b0284..824eb92 100644 --- a/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf +++ b/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf @@ -324,7 +324,7 @@ $cfg = #results 'status_code' => 200, 'check_job_created' => 1, - 'check_job_exit_value' => 64, + 'check_job_exit_value' => 1, }, @@ -443,7 +443,7 @@ $cfg = 'num' => 9, 'method' => 'POST', 'url' => ':TEMPLETON_URL:/templeton/v1/hive?user.name=:UNAME:', - 'post_options' => ['execute=add jar piggybank.jar', 'files=:INPDIR_HDFS:/piggybank.jar',], + 'post_options' => ['execute=add jar :INPDIR_HDFS:/piggybank.jar',], 'json_field_substr_match' => { 'id' => '\d+'}, #results 'status_code' => 200, @@ -499,7 +499,7 @@ $cfg = { #enable logs 'num' => 13, - 'ignore23' => 'Log collector does not work with Hadoop 2', + 'ignore' => 'Log collector does not work with Hadoop 2/3', 'method' => 'POST', 'url' => ':TEMPLETON_URL:/templeton/v1/hive?user.name=:UNAME:', 'post_options' => ['execute=select a,b from mynums', 'statusdir=:OUTDIR:/status', 'enablelog=true'], diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java index 049c9a4..5afd1b9 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java @@ -19,8 +19,16 @@ package org.apache.hive.hcatalog.templeton; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import 
java.util.Set; +import java.util.regex.Pattern; +import org.apache.commons.io.IOUtils; +import org.apache.hive.hcatalog.templeton.tool.TempletonUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim; @@ -39,6 +47,36 @@ public class DeleteDelegator extends TempletonDelegator { super(appConf); } + private String runProgram(String[] cmd) throws IOException, InterruptedException { + ProcessBuilder pb = new ProcessBuilder(cmd); + Set<String> keys = new HashSet<String>(pb.environment().keySet()); + for (String key : keys) { + pb.environment().remove(key); + } + Process p = pb.start(); + String stdout = IOUtils.toString(p.getInputStream()); + String stderr = IOUtils.toString(p.getErrorStream()); + int code = p.waitFor(); + if (code != 0) { + throw new IOException("non-zero exit code " + code + " when running " + Arrays.toString(cmd) + "\n" + + "stdout: " + stdout + "\n" + "stderr: " + stderr + "\n"); + } + return stdout; + } + + private void killHiveQuery(String user, String tag) throws IOException, InterruptedException { + String[] cmd = new String[] {appConf.hivePath(), "--getUrlsFromBeelineSite"}; + String urlsString = runProgram(cmd); + String[] urls = urlsString.substring(6).split(","); + for (String url : urls) { + if (url != null && !url.trim().isEmpty()) { + cmd = new String[]{appConf.hivePath(), "-u", "jdbc:hive2://" + url, "-n", user, + "-e", "kill query '" + tag + "'"}; + runProgram(cmd); + } + } + } + public QueueStatusBean run(String user, String id) throws NotAuthorizedException, BadParam, IOException, InterruptedException { @@ -53,13 +91,20 @@ public class DeleteDelegator extends TempletonDelegator { throw new BadParam("Invalid jobid: " + id); tracker.killJob(jobid); state = new JobState(id, Main.getAppConfigInstance()); - List<JobState> children = state.getChildren(); - if (children != null) { - for (JobState child : children) { - try { - 
tracker.killJob(StatusDelegator.StringToJobID(child.getId())); - } catch (IOException e) { - LOG.warn("templeton: fail to kill job " + child.getId()); + if (state.getJobType() != null) { + LauncherDelegator.JobType jobType = LauncherDelegator.JobType.valueOf(state.getJobType()); + if (jobType == LauncherDelegator.JobType.HIVE) { + killHiveQuery(user, jobid.toString()); + } else { + List<JobState> children = state.getChildren(); + if (children != null) { + for (JobState child : children) { + try { + tracker.killJob(StatusDelegator.StringToJobID(child.getId())); + } catch (IOException e) { + LOG.warn("templeton: fail to kill job " + child.getId()); + } + } } } } diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java index a3f57df..3f1968d 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java @@ -52,13 +52,13 @@ public class HiveDelegator extends LauncherDelegator { ExecuteException, IOException, InterruptedException, TooManyRequestsException { runAs = user; - List<String> args = makeArgs(execute, srcFile, defines, hiveArgs, otherFiles, statusdir, + List<String> args = makeArgs(user, execute, srcFile, defines, hiveArgs, otherFiles, statusdir, completedUrl, enablelog, enableJobReconnect); return enqueueController(user, userArgs, callback, args); } - private List<String> makeArgs(String execute, String srcFile, + private List<String> makeArgs(String user, String execute, String srcFile, List<String> defines, List<String> hiveArgs, String otherFiles, String statusdir, String completedUrl, boolean enablelog, Boolean enableJobReconnect) @@ -73,26 +73,15 @@ public class HiveDelegator extends LauncherDelegator { args.add(appConf.hivePath()); - args.add("--service"); - args.add("cli"); - - //the 
token file location as initial hiveconf arg - args.add("--hiveconf"); - args.add(TempletonControllerJob.TOKEN_FILE_ARG_PLACEHOLDER); - - //this is needed specifcally for Hive on Tez (in addition to - //JobSubmissionConstants.TOKEN_FILE_ARG_PLACEHOLDER) - args.add("--hiveconf"); - args.add(JobSubmissionConstants.TOKEN_FILE_ARG_PLACEHOLDER_TEZ); + args.add("-n"); + args.add(user); + args.add("-p"); + args.add("default"); //add mapreduce job tag placeholder args.add("--hiveconf"); - args.add(TempletonControllerJob.MAPREDUCE_JOB_TAGS_ARG_PLACEHOLDER); + args.add(TempletonControllerJob.HIVE_QUERY_TAG_ARG_PLACEHOLDER); - for (String prop : appConf.hiveProps()) { - args.add("--hiveconf"); - args.add(prop); - } for (String prop : defines) { args.add("--hiveconf"); args.add(prop); diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobState.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobState.java index 74cf1e5..52738b7 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobState.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobState.java @@ -127,6 +127,19 @@ public class JobState { } /** + * The job type + */ + public String getJobType() + throws IOException { + return getField("jobType"); + } + + public void setJobType(String jobType) + throws IOException { + setField("jobType", jobType); + } + + /** * Add a jobid to the list of children of this job. 
* * @param jobid diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java index f3a79f6..9e90b8d 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java @@ -54,8 +54,11 @@ public interface JobSubmissionConstants { // previously running child jobs can be killed before the child job is launched // again. public static final String MAPREDUCE_JOB_TAGS = "mapreduce.job.tags"; + public static final String HIVE_QUERY_TAG = "hive.query.tag"; public static final String MAPREDUCE_JOB_TAGS_ARG_PLACEHOLDER = "__MR_JOB_TAGS_OPTION=MR_JOB_TAGS_JOBID__"; + public static final String HIVE_QUERY_TAG_ARG_PLACEHOLDER = + "__HIVE_QUERY_TAG_OPTION=HIVE_QUERY_TAG_JOBID__"; public static final String HADOOP_CLASSPATH = "HADOOP_CLASSPATH"; /** diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java index 5de04e3..b1f4a6a 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java @@ -153,7 +153,8 @@ public class LaunchMapper extends Mapper<NullWritable, NullWritable, Text, Text> env.put(pathVarName, paths); } } - protected Process startJob(Configuration conf, String jobId, String user, String overrideClasspath) + protected Process startJob(Configuration conf, String jobId, String user, String overrideClasspath, + LauncherDelegator.JobType jobType) throws IOException, InterruptedException { copyLocal(COPY_NAME, conf); @@ -172,8 +173,14 @@ public class LaunchMapper extends 
Mapper<NullWritable, NullWritable, Text, Text> List<String> jarArgsList = new LinkedList<String>(Arrays.asList(jarArgs)); handleTokenFile(jarArgsList, JobSubmissionConstants.TOKEN_FILE_ARG_PLACEHOLDER, "mapreduce.job.credentials.binary"); handleTokenFile(jarArgsList, JobSubmissionConstants.TOKEN_FILE_ARG_PLACEHOLDER_TEZ, "tez.credentials.path"); - handleMapReduceJobTag(jarArgsList, JobSubmissionConstants.MAPREDUCE_JOB_TAGS_ARG_PLACEHOLDER, - JobSubmissionConstants.MAPREDUCE_JOB_TAGS, jobId); + if (jobType == LauncherDelegator.JobType.HIVE) { + replaceJobTag(jarArgsList, JobSubmissionConstants.HIVE_QUERY_TAG_ARG_PLACEHOLDER, + JobSubmissionConstants.HIVE_QUERY_TAG, jobId); + } else { + replaceJobTag(jarArgsList, JobSubmissionConstants.MAPREDUCE_JOB_TAGS_ARG_PLACEHOLDER, + JobSubmissionConstants.MAPREDUCE_JOB_TAGS, jobId); + } + return TrivialExecService.getInstance().run(jarArgsList, removeEnv, env); } @@ -245,11 +252,11 @@ public class LaunchMapper extends Mapper<NullWritable, NullWritable, Text, Text> } /** - * Replace the placeholder mapreduce tags with our MR jobid so that all child jobs + * Replace the placeholder tags with our MR jobid so that all child jobs or hive queries * get tagged with it. This is used on launcher task restart to prevent from having * same jobs running in parallel. 
*/ - private static void handleMapReduceJobTag(List<String> jarArgsList, String placeholder, + private static void replaceJobTag(List<String> jarArgsList, String placeholder, String mapReduceJobTagsProp, String currentJobId) throws IOException { String arg = String.format("%s=%s", mapReduceJobTagsProp, currentJobId); for(int i = 0; i < jarArgsList.size(); i++) { @@ -401,8 +408,12 @@ public class LaunchMapper extends Mapper<NullWritable, NullWritable, Text, Text> Process proc = startJob(conf, context.getJobID().toString(), conf.get("user.name"), - conf.get(OVERRIDE_CLASSPATH)); + conf.get(OVERRIDE_CLASSPATH), + jobType); + JobState state = new JobState(context.getJobID().toString(), conf); + state.setJobType(jobType.toString()); + state.close(); ExecutorService pool = Executors.newCachedThreadPool(); executeWatcher(pool, conf, context.getJobID(), proc.getInputStream(), statusdir, STDOUT_FNAME); diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java index df8c32e..29499a2 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java @@ -115,6 +115,7 @@ public class TempletonUtils { * groups that don't add information such as "Map 1: -/-" */ public static final Pattern HIVE_TEZ_COMPLETE = Pattern.compile("(Map|Reducer) (\\d+:) (\\d+(\\(\\+\\d+\\))?/\\d+)"); + public static final Pattern HIVE_BEELINE_COMPLETE = Pattern.compile("VERTICES: .* (\\d+%)"); /** * Pig on Tez produces progress report that looks like this * DAG Status: status=RUNNING, progress=TotalTasks: 3 Succeeded: 0 Running: 0 Failed: 0 Killed: 0 @@ -139,6 +140,11 @@ public class TempletonUtils { if (pig.find()) return pig.group().trim(); + Matcher beeline = HIVE_BEELINE_COMPLETE.matcher(line); + if (beeline.find()) 
{ + return beeline.group(1).trim() + " complete"; + } + Matcher hive = HIVE_COMPLETE.matcher(line); if(hive.find()) { return "map " + hive.group(1) + " reduce " + hive.group(2); diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java index 51ed867..cf48221 100644 --- a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java +++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java @@ -90,6 +90,9 @@ public class TestTempletonUtils { String fifty = "2011-12-15 18:12:36,333 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 50% complete"; Assert.assertEquals("50% complete", TempletonUtils.extractPercentComplete(fifty)); + + String beeline = "VERTICES: 01/02 [==========================>>] 70% ELAPSED TIME: 3.79 s"; + Assert.assertEquals("70% complete", TempletonUtils.extractPercentComplete(beeline)); } @Test