This is an automated email from the ASF dual-hosted git repository.

daijy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b0b9ae  HIVE-20550: Switch WebHCat to use beeline to submit Hive queries (Daniel Dai, reviewed by Thejas Nair)
7b0b9ae is described below

commit 7b0b9ae328757a57d1d88377ea84091487a25f8f
Author: Daniel Dai <dai...@gmail.com>
AuthorDate: Tue Feb 26 16:35:30 2019 -0800

    HIVE-20550: Switch WebHCat to use beeline to submit Hive queries (Daniel Dai, reviewed by Thejas Nair)
    
    Signed-off-by: Thejas M Nair <the...@hortonworks.com>
---
 .../test/e2e/templeton/drivers/TestDriverCurl.pm   |  6 +--
 .../test/e2e/templeton/tests/jobsubmission.conf    |  6 +--
 .../hive/hcatalog/templeton/DeleteDelegator.java   | 59 +++++++++++++++++++---
 .../hive/hcatalog/templeton/HiveDelegator.java     | 25 +++------
 .../hive/hcatalog/templeton/tool/JobState.java     | 13 +++++
 .../templeton/tool/JobSubmissionConstants.java     |  3 ++
 .../hive/hcatalog/templeton/tool/LaunchMapper.java | 23 ++++++---
 .../hcatalog/templeton/tool/TempletonUtils.java    |  6 +++
 .../templeton/tool/TestTempletonUtils.java         |  3 ++
 9 files changed, 107 insertions(+), 37 deletions(-)

diff --git a/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm 
b/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm
index 66a6ca1..e62269b 100644
--- a/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm
+++ b/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm
@@ -555,12 +555,15 @@ sub execCurlCmd(){
     my %result;
     my $out;
     my $err;
-    IPC::Run::run(\@curl_cmd, \undef, $out, $err)
-        or die "Failed running curl cmd " . join ' ', @curl_cmd;
+    my $ok = IPC::Run::run(\@curl_cmd, \undef, \$out, \$err);
+    # Copy curl's output into the test log (also when curl fails) and keep it in %result.
+    print $log $out if defined $out;
+    print $log $err if defined $err;
+    $ok or die "Failed running curl cmd " . join ' ', @curl_cmd;
 
     $result{'rc'} = $? >> 8;
     $result{'stderr'} = $err;
     $result{'stdout'} = $out;
     $result{'body'} = `cat $res_body`;
 
     my @full_header = `cat $res_header`;
diff --git a/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf 
b/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf
index a1b0284..824eb92 100644
--- a/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf
+++ b/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf
@@ -324,7 +324,7 @@ $cfg =
                                 #results
      'status_code' => 200,
      'check_job_created' => 1,
-     'check_job_exit_value' => 64,
+     'check_job_exit_value' => 1,
 
     },
  
@@ -443,7 +443,7 @@ $cfg =
      'num' => 9,
      'method' => 'POST',
      'url' => ':TEMPLETON_URL:/templeton/v1/hive?user.name=:UNAME:',
-     'post_options' => ['execute=add jar piggybank.jar', 
'files=:INPDIR_HDFS:/piggybank.jar',],
+     'post_options' => ['execute=add jar :INPDIR_HDFS:/piggybank.jar',],
      'json_field_substr_match' => { 'id' => '\d+'},
                                 #results
      'status_code' => 200,
@@ -499,7 +499,7 @@ $cfg =
     {
                                 #enable logs
      'num' => 13,
-     'ignore23' => 'Log collector does not work with Hadoop 2',
+     'ignore' => 'Log collector does not work with Hadoop 2/3',
      'method' => 'POST',
      'url' => ':TEMPLETON_URL:/templeton/v1/hive?user.name=:UNAME:',
      'post_options' => ['execute=select a,b from mynums', 
'statusdir=:OUTDIR:/status', 'enablelog=true'],
diff --git 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java
 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java
index 049c9a4..5afd1b9 100644
--- 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java
+++ 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/DeleteDelegator.java
@@ -19,8 +19,16 @@
 package org.apache.hive.hcatalog.templeton;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.hive.hcatalog.templeton.tool.TempletonUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
@@ -39,6 +47,36 @@ public class DeleteDelegator extends TempletonDelegator {
     super(appConf);
   }
 
+  /**
+   * Runs an external program with an empty environment and returns what it printed to
+   * standard output.
+   *
+   * @param cmd the program followed by its arguments
+   * @return the program's standard output, decoded as UTF-8
+   * @throws IOException if the program cannot be started or read from, or if it exits with
+   *     a non-zero code (the message then carries both its stdout and stderr)
+   * @throws InterruptedException if interrupted while waiting for the program
+   */
+  private String runProgram(String[] cmd) throws IOException, InterruptedException {
+    ProcessBuilder pb = new ProcessBuilder(cmd);
+    // Start the child from an empty environment: nothing from this process's own
+    // environment is inherited.
+    pb.environment().clear();
+    final Process p = pb.start();
+    try {
+      // Drain stderr on a separate thread. Reading stdout to EOF before stderr can
+      // deadlock: once the child fills the stderr pipe buffer it blocks and never
+      // closes stdout.
+      final StringBuilder stderr = new StringBuilder();
+      Thread stderrDrainer = new Thread("runProgram-stderr") {
+        @Override
+        public void run() {
+          try {
+            stderr.append(IOUtils.toString(p.getErrorStream(), "UTF-8"));
+          } catch (IOException e) {
+            stderr.append("<failed to read stderr: ").append(e).append('>');
+          }
+        }
+      };
+      stderrDrainer.setDaemon(true);
+      stderrDrainer.start();
+      String stdout = IOUtils.toString(p.getInputStream(), "UTF-8");
+      // join() also makes the drainer's writes to 'stderr' visible to this thread.
+      stderrDrainer.join();
+      int code = p.waitFor();
+      if (code != 0) {
+        throw new IOException("non-zero exit code " + code + " when running "
+            + Arrays.toString(cmd) + "\n" + "stdout: " + stdout + "\n" + "stderr: " + stderr
+            + "\n");
+      }
+      return stdout;
+    } finally {
+      // Kills the child if we leave early (e.g. interrupted) and releases its pipes.
+      p.destroy();
+    }
+  }
+
+  /**
+   * Issues {@code kill query '<tag>'} through beeline against every JDBC endpoint reported
+   * by {@code hive --getUrlsFromBeelineSite}.
+   *
+   * @param user user name passed to beeline via {@code -n}
+   * @param tag query tag to kill (the caller in {@code run} passes the launcher job id)
+   * @throws IOException if the endpoint list cannot be obtained, or if the kill command
+   *     fails on any endpoint; every endpoint is attempted before the first failure is
+   *     rethrown, with later failures attached as suppressed exceptions
+   * @throws InterruptedException if interrupted while waiting for a child process
+   */
+  private void killHiveQuery(String user, String tag) throws IOException, InterruptedException {
+    String[] listCmd = new String[] {appConf.hivePath(), "--getUrlsFromBeelineSite"};
+    String urlsString = runProgram(listCmd);
+    // The output is expected to start with a 6-character label that is skipped, followed
+    // by a comma separated list of entries that are appended to "jdbc:hive2://".
+    if (urlsString.length() < 6) {
+      throw new IOException("unexpected output from " + Arrays.toString(listCmd) + ": "
+          + urlsString);
+    }
+    IOException failure = null;
+    for (String rawUrl : urlsString.substring(6).split(",")) {
+      // Entries can carry surrounding whitespace, e.g. the trailing newline of the output;
+      // it must not end up inside the JDBC URL.
+      String url = rawUrl.trim();
+      if (url.isEmpty()) {
+        continue;
+      }
+      String[] killCmd = new String[] {appConf.hivePath(), "-u", "jdbc:hive2://" + url,
+          "-n", user, "-e", "kill query '" + tag + "'"};
+      try {
+        runProgram(killCmd);
+      } catch (IOException e) {
+        // Keep going so the remaining endpoints still receive the kill request.
+        LOG.warn("templeton: fail to kill hive query " + tag + " on " + url, e);
+        if (failure == null) {
+          failure = e;
+        } else {
+          failure.addSuppressed(e);
+        }
+      }
+    }
+    if (failure != null) {
+      throw failure;
+    }
+  }
+
   public QueueStatusBean run(String user, String id)
     throws NotAuthorizedException, BadParam, IOException, InterruptedException
   {
@@ -53,13 +91,20 @@ public class DeleteDelegator extends TempletonDelegator {
         throw new BadParam("Invalid jobid: " + id);
       tracker.killJob(jobid);
       state = new JobState(id, Main.getAppConfigInstance());
-      List<JobState> children = state.getChildren();
-      if (children != null) {
-        for (JobState child : children) {
-          try {
-            tracker.killJob(StatusDelegator.StringToJobID(child.getId()));
-          } catch (IOException e) {
-            LOG.warn("templeton: fail to kill job " + child.getId());
+      String jobTypeName = state.getJobType();
+      // No recorded type (e.g. jobs launched before it was stored): kill child jobs as before.
+      if (jobTypeName != null
+          && LauncherDelegator.JobType.valueOf(jobTypeName) == LauncherDelegator.JobType.HIVE) {
+        killHiveQuery(user, jobid.toString());
+      } else {
+        List<JobState> children = state.getChildren();
+        if (children != null) {
+          for (JobState child : children) {
+            try {
+              tracker.killJob(StatusDelegator.StringToJobID(child.getId()));
+            } catch (IOException e) {
+              LOG.warn("templeton: fail to kill job " + child.getId());
+            }
           }
         }
       }
diff --git 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
index a3f57df..3f1968d 100644
--- 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
+++ 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
@@ -52,13 +52,13 @@ public class HiveDelegator extends LauncherDelegator {
     ExecuteException, IOException, InterruptedException, 
TooManyRequestsException
   {
     runAs = user;
-    List<String> args = makeArgs(execute, srcFile, defines, hiveArgs, 
otherFiles, statusdir,
+    List<String> args = makeArgs(user, execute, srcFile, defines, hiveArgs, 
otherFiles, statusdir,
                    completedUrl, enablelog, enableJobReconnect);
 
     return enqueueController(user, userArgs, callback, args);
   }
 
-  private List<String> makeArgs(String execute, String srcFile,
+  private List<String> makeArgs(String user, String execute, String srcFile,
              List<String> defines, List<String> hiveArgs, String otherFiles,
              String statusdir, String completedUrl, boolean enablelog,
              Boolean enableJobReconnect)
@@ -73,26 +73,15 @@ public class HiveDelegator extends LauncherDelegator {
       
       args.add(appConf.hivePath());
 
-      args.add("--service");
-      args.add("cli");
-
-      //the token file location as initial hiveconf arg
-      args.add("--hiveconf");
-      args.add(TempletonControllerJob.TOKEN_FILE_ARG_PLACEHOLDER);
-
-      //this is needed specifcally for Hive on Tez (in addition to
-      //JobSubmissionConstants.TOKEN_FILE_ARG_PLACEHOLDER)
-      args.add("--hiveconf");
-      args.add(JobSubmissionConstants.TOKEN_FILE_ARG_PLACEHOLDER_TEZ);
+      args.add("-n");
+      args.add(user);
+      args.add("-p");
+      args.add("default");
 
       //add mapreduce job tag placeholder
       args.add("--hiveconf");
-      args.add(TempletonControllerJob.MAPREDUCE_JOB_TAGS_ARG_PLACEHOLDER);
+      args.add(TempletonControllerJob.HIVE_QUERY_TAG_ARG_PLACEHOLDER);
 
-      for (String prop : appConf.hiveProps()) {
-        args.add("--hiveconf");
-        args.add(prop);
-      }
       for (String prop : defines) {
         args.add("--hiveconf");
         args.add(prop);
diff --git 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobState.java
 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobState.java
index 74cf1e5..52738b7 100644
--- 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobState.java
+++ 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobState.java
@@ -127,6 +127,19 @@ public class JobState {
   }
 
   /**
+   * Reads the "jobType" field of this job's state. The value may be {@code null}
+   * (the kill path checks for that).
+   *
+   * @return the recorded job type name, possibly {@code null}
+   * @throws IOException if reading the field fails
+   */
+  public String getJobType() throws IOException {
+    return getField("jobType");
+  }
+
+  /**
+   * Writes the "jobType" field of this job's state.
+   *
+   * @param jobType the job type name to record
+   * @throws IOException if writing the field fails
+   */
+  public void setJobType(String jobType) throws IOException {
+    setField("jobType", jobType);
+  }
+
+  /**
    * Add a jobid to the list of children of this job.
    *
    * @param jobid
diff --git 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java
 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java
index f3a79f6..9e90b8d 100644
--- 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java
+++ 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java
@@ -54,8 +54,11 @@ public interface JobSubmissionConstants {
   // previously running child jobs can be killed before the child job is 
launched
   // again.
   public static final String MAPREDUCE_JOB_TAGS = "mapreduce.job.tags";
+  public static final String HIVE_QUERY_TAG = "hive.query.tag";
   public static final String MAPREDUCE_JOB_TAGS_ARG_PLACEHOLDER =
     "__MR_JOB_TAGS_OPTION=MR_JOB_TAGS_JOBID__";
+  public static final String HIVE_QUERY_TAG_ARG_PLACEHOLDER =
+          "__HIVE_QUERY_TAG_OPTION=HIVE_QUERY_TAG_JOBID__";
 
   public static final String HADOOP_CLASSPATH = "HADOOP_CLASSPATH";
   /**
diff --git 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java
 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java
index 5de04e3..b1f4a6a 100644
--- 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java
+++ 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java
@@ -153,7 +153,8 @@ public class LaunchMapper extends Mapper<NullWritable, 
NullWritable, Text, Text>
       env.put(pathVarName, paths);
     }
   }
-  protected Process startJob(Configuration conf, String jobId, String user, 
String overrideClasspath)
+  protected Process startJob(Configuration conf, String jobId, String user, 
String overrideClasspath,
+                             LauncherDelegator.JobType jobType)
     throws IOException, InterruptedException {
 
     copyLocal(COPY_NAME, conf);
@@ -172,8 +173,14 @@ public class LaunchMapper extends Mapper<NullWritable, 
NullWritable, Text, Text>
     List<String> jarArgsList = new LinkedList<String>(Arrays.asList(jarArgs));
     handleTokenFile(jarArgsList, 
JobSubmissionConstants.TOKEN_FILE_ARG_PLACEHOLDER, 
"mapreduce.job.credentials.binary");
     handleTokenFile(jarArgsList, 
JobSubmissionConstants.TOKEN_FILE_ARG_PLACEHOLDER_TEZ, "tez.credentials.path");
-    handleMapReduceJobTag(jarArgsList, 
JobSubmissionConstants.MAPREDUCE_JOB_TAGS_ARG_PLACEHOLDER,
-        JobSubmissionConstants.MAPREDUCE_JOB_TAGS, jobId);
+    if (jobType == LauncherDelegator.JobType.HIVE) {
+      replaceJobTag(jarArgsList, 
JobSubmissionConstants.HIVE_QUERY_TAG_ARG_PLACEHOLDER,
+              JobSubmissionConstants.HIVE_QUERY_TAG, jobId);
+    } else {
+      replaceJobTag(jarArgsList, 
JobSubmissionConstants.MAPREDUCE_JOB_TAGS_ARG_PLACEHOLDER,
+              JobSubmissionConstants.MAPREDUCE_JOB_TAGS, jobId);
+    }
+
     return TrivialExecService.getInstance().run(jarArgsList, removeEnv, env);
   }
 
@@ -245,11 +252,11 @@ public class LaunchMapper extends Mapper<NullWritable, 
NullWritable, Text, Text>
   }
 
   /**
-   * Replace the placeholder mapreduce tags with our MR jobid so that all 
child jobs
+   * Replace the placeholder tags with our MR jobid so that all child jobs or 
hive queries are
    * get tagged with it. This is used on launcher task restart to prevent from 
having
    * same jobs running in parallel.
    */
-  private static void handleMapReduceJobTag(List<String> jarArgsList, String 
placeholder,
+  private static void replaceJobTag(List<String> jarArgsList, String 
placeholder,
       String mapReduceJobTagsProp, String currentJobId) throws IOException {
     String arg = String.format("%s=%s", mapReduceJobTagsProp, currentJobId);
     for(int i = 0; i < jarArgsList.size(); i++) {
@@ -401,8 +408,12 @@ public class LaunchMapper extends Mapper<NullWritable, 
NullWritable, Text, Text>
     Process proc = startJob(conf,
             context.getJobID().toString(),
             conf.get("user.name"),
-            conf.get(OVERRIDE_CLASSPATH));
+            conf.get(OVERRIDE_CLASSPATH),
+            jobType);
 
+    JobState state = new JobState(context.getJobID().toString(), conf);
+    state.setJobType(jobType.toString());
+    state.close();
     ExecutorService pool = Executors.newCachedThreadPool();
     executeWatcher(pool, conf, context.getJobID(),
             proc.getInputStream(), statusdir, STDOUT_FNAME);
diff --git 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
index df8c32e..29499a2 100644
--- 
a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
+++ 
b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
@@ -115,6 +115,7 @@ public class TempletonUtils {
    * groups that don't add information such as "Map 1: -/-"
    */
   public static final Pattern HIVE_TEZ_COMPLETE = 
Pattern.compile("(Map|Reducer) (\\d+:) (\\d+(\\(\\+\\d+\\))?/\\d+)");
+  public static final Pattern HIVE_BEELINE_COMPLETE = 
Pattern.compile("VERTICES: .* (\\d+%)");
   /**
    * Pig on Tez produces progress report that looks like this
    * DAG Status: status=RUNNING, progress=TotalTasks: 3 Succeeded: 0 Running: 
0 Failed: 0 Killed: 0
@@ -139,6 +140,11 @@ public class TempletonUtils {
     if (pig.find())
       return pig.group().trim();
 
+    Matcher beeline = HIVE_BEELINE_COMPLETE.matcher(line);
+    if (beeline.find()) {
+      return beeline.group(1).trim() + " complete";
+    }
+
     Matcher hive = HIVE_COMPLETE.matcher(line);
     if(hive.find()) {
       return "map " + hive.group(1) + " reduce " + hive.group(2);
diff --git 
a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java
 
b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java
index 51ed867..cf48221 100644
--- 
a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java
+++ 
b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java
@@ -90,6 +90,9 @@ public class TestTempletonUtils {
 
     String fifty = "2011-12-15 18:12:36,333 [main] INFO  
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher 
- 50% complete";
     Assert.assertEquals("50% complete", 
TempletonUtils.extractPercentComplete(fifty));
+
+    String beeline = "VERTICES: 01/02  [==========================>>] 70%  
ELAPSED TIME: 3.79 s";
+    Assert.assertEquals("70% complete", 
TempletonUtils.extractPercentComplete(beeline));
   }
 
   @Test

Reply via email to