This is an automated email from the ASF dual-hosted git repository.

sankarh pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new b1503b5123f HIVE-27784: Backport of HIVE-20364, HIVE-20549 to branch-3 (#4789)
b1503b5123f is described below

commit b1503b5123fde96cf7a7583e41a70083c704b3cd
Author: Aman Raj <104416558+amanraj2...@users.noreply.github.com>
AuthorDate: Mon Oct 16 13:31:43 2023 +0530

    HIVE-27784: Backport of HIVE-20364, HIVE-20549 to branch-3 (#4789)
    
    * HIVE-20364: Update default for hive.map.aggr.hash.min.reduction
    * HIVE-20549: Allow user set query tag, and kill query with tag (Daniel Dai, reviewed by Thejas Nair, Sergey Shelukhin)
    * Removed explainanalyze_2.q test to fix in HIVE-27795
    
    ---------
    
    Co-authored-by: Ashutosh Chauhan <hashut...@apache.org>
    Co-authored-by: Mahesh Kumar Behera <mbeh...@hortonworks.com>
    Co-authored-by: Daniel Dai <dai...@gmail.com>
    
    Signed-off-by: Sankar Hariappan <sank...@apache.org>
    Closes (#4789)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   7 +-
 .../hive/jdbc/TestJdbcWithMiniLlapArrow.java       | 153 +++++++++++++++------
 .../test/resources/testconfiguration.properties    |   5 +-
 .../java/org/apache/hive/jdbc/HiveStatement.java   |   6 +-
 ql/src/java/org/apache/hadoop/hive/ql/Driver.java  |   7 +-
 .../java/org/apache/hadoop/hive/ql/QueryState.java |  23 +++-
 .../hive/ql/exec/tez/KillTriggerActionHandler.java |   5 +
 .../hadoop/hive/ql/exec/tez/WorkloadManager.java   |   3 +
 .../hive/ql/parse/ReplicationSemanticAnalyzer.java |   2 +-
 .../clientnegative/authorization_kill_query.q      |  15 --
 .../service/cli/operation/OperationManager.java    |  29 ++--
 .../apache/hive/service/server/KillQueryImpl.java  | 112 +++++++++++----
 12 files changed, 257 insertions(+), 110 deletions(-)
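
For anyone wanting to exercise the feature this backport enables, here is a minimal JDBC sketch (illustrative only, not part of the commit; the URL, user names, and table are assumptions): one session tags its queries via hive.query.tag, and an admin or the tag owner kills them from another session with kill query '<tag>'.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class KillByTagSketch {
      public static void main(String[] args) throws Exception {
        final String url = "jdbc:hive2://localhost:10000/default"; // assumed HS2 endpoint

        // Session 1: tag this session's queries, then run a long query.
        Thread worker = new Thread(() -> {
          try (Connection con = DriverManager.getConnection(url, "user1", "");
               Statement st = con.createStatement()) {
            st.execute("set hive.query.tag = mytag");
            st.execute("select count(*) from big_table"); // long-running query
          } catch (Exception e) {
            System.out.println("query ended: " + e.getMessage()); // expected once killed
          }
        });
        worker.start();
        Thread.sleep(5000); // give the query time to start

        // Session 2: kill every query carrying the tag. Admins may kill any
        // tagged query; non-admins may only kill queries they own.
        try (Connection con = DriverManager.getConnection(url, "admin", "");
             Statement st = con.createStatement()) {
          st.execute("set role admin");
          st.execute("kill query 'mytag'");
        }
        worker.join();
      }
    }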

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index bf20a78b588..6bd226c442f 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1519,6 +1519,10 @@ public class HiveConf extends Configuration {
     HIVEQUERYID("hive.query.id", "",
         "ID for query being executed (might be multiple per a session)"),
 
+    HIVEQUERYTAG("hive.query.tag", null, "Tag for the queries in the session. User can kill the queries with the tag " +
+        "in another session. Currently there is no tag duplication check, users need to make sure the tag is unique. " +
+        "Also 'kill query' needs to be issued to all HiveServer2 instances to properly kill the queries"),
+
     HIVEJOBNAMELENGTH("hive.jobname.length", 50, "max jobname length"),
 
     // hive jar
@@ -1688,7 +1692,7 @@ public class HiveConf extends Configuration {
         "How many rows with the same key value should be cached in memory per smb joined table."),
     HIVEGROUPBYMAPINTERVAL("hive.groupby.mapaggr.checkinterval", 100000,
         "Number of rows after which size of the grouping keys/aggregation classes is performed"),
-    HIVEMAPAGGRHASHMEMORY("hive.map.aggr.hash.percentmemory", (float) 0.5,
+    HIVEMAPAGGRHASHMEMORY("hive.map.aggr.hash.percentmemory", (float) 0.99,
         "Portion of total memory to be used by map-side group aggregation hash table"),
     HIVEMAPJOINFOLLOWEDBYMAPAGGRHASHMEMORY("hive.mapjoin.followby.map.aggr.hash.percentmemory", (float) 0.3,
         "Portion of total memory to be used by map-side group aggregation hash table, when this group by is followed by map join"),
@@ -5451,6 +5455,7 @@ public class HiveConf extends Configuration {
     ConfVars.SHOW_JOB_FAIL_DEBUG_INFO.varname,
     ConfVars.TASKLOG_DEBUG_TIMEOUT.varname,
     ConfVars.HIVEQUERYID.varname,
+    ConfVars.HIVEQUERYTAG.varname,
   };
 
   /**
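
For illustration (a sketch, not from the commit): the new variable reads and writes like any other ConfVar, and its null default means "no tag set". These are the same HiveConf calls the QueryState changes below use.

    import org.apache.hadoop.hive.conf.HiveConf;

    public class QueryTagConfSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Null by default: no tag is set for the session.
        System.out.println(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYTAG));
        // Tag all queries issued under this configuration.
        HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYTAG, "mytag");
        System.out.println(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYTAG)); // mytag
      }
    }
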
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
index 3dcc4928b1a..dcb8701e696 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
@@ -43,9 +43,12 @@ import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
 import org.apache.hive.jdbc.miniHS2.MiniHS2;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * TestJdbcWithMiniLlap for Arrow format
@@ -57,6 +60,7 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
   private static final String tableName = "testJdbcMinihs2Tbl";
   private static String dataFileDir;
   private static final String testDbName = "testJdbcMinihs2";
+  private static final String tag = "mytag";
 
   private static class ExceptionHolder {
     Throwable throwable;
@@ -66,6 +70,12 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
   public static void beforeTest() throws Exception {
     HiveConf conf = defaultConf();
     conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
+    conf.setVar(ConfVars.HIVE_AUTHENTICATOR_MANAGER, "org.apache.hadoop.hive.ql.security" +
+            ".SessionStateUserAuthenticator");
+    conf.setVar(ConfVars.USERS_IN_ADMIN_ROLE, System.getProperty("user.name"));
+    conf.setBoolVar(ConfVars.HIVE_AUTHORIZATION_ENABLED, true);
+    conf.setVar(ConfVars.HIVE_AUTHORIZATION_SQL_STD_AUTH_CONFIG_WHITELIST_APPEND, ConfVars.HIVE_SUPPORT_CONCURRENCY
+            .varname + "|" + ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname);
     MiniHS2.cleanupLocalDir();
     miniHS2 = BaseJdbcWithMiniLlap.beforeTest(conf);
     dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
@@ -73,8 +83,19 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
     Connection conDefault = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(),
             System.getProperty("user.name"), "bar");
     Statement stmt = conDefault.createStatement();
+    String tblName = testDbName + "." + tableName;
+    Path dataFilePath = new Path(dataFileDir, "kv1.txt");
+    String udfName = SleepMsUDF.class.getName();
     stmt.execute("drop database if exists " + testDbName + " cascade");
     stmt.execute("create database " + testDbName);
+    stmt.execute("set role admin");
+    stmt.execute("dfs -put " + dataFilePath.toString() + " " + "kv1.txt");
+    stmt.execute("use " + testDbName);
+    stmt.execute("create table " + tblName + " (int_col int, value string) ");
+    stmt.execute("load data inpath 'kv1.txt' into table " + tblName);
+    stmt.execute("create function sleepMsUDF as '" + udfName + "'");
+    stmt.execute("grant select on table " + tblName + " to role public");
+
     stmt.close();
     conDefault.close();
   }
@@ -291,29 +312,16 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
    * that runs for a sufficiently long time.
    * @throws Exception
    */
-  @Test
-  public void testKillQuery() throws Exception {
-    Connection con = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName),
-            System.getProperty("user.name"), "bar");
+  private void testKillQueryInternal(String user, String killUser, boolean useTag, final
+      ExceptionHolder stmtHolder, final ExceptionHolder tKillHolder) throws Exception {
+    Connection con1 = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName),
+            user, "bar");
     Connection con2 = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName),
-            System.getProperty("user.name"), "bar");
+            killUser, "bar");
 
-    String udfName = SleepMsUDF.class.getName();
-    Statement stmt1 = con.createStatement();
     final Statement stmt2 = con2.createStatement();
-    Path dataFilePath = new Path(dataFileDir, "kv1.txt");
-
-    String tblName = testDbName + "." + tableName;
-
-    stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'");
-    stmt1.execute("create table " + tblName + " (int_col int, value string) ");
-    stmt1.execute("load data local inpath '" + dataFilePath.toString() + "' 
into table " + tblName);
-
-
-    stmt1.close();
-    final Statement stmt = con.createStatement();
-    final ExceptionHolder tExecuteHolder = new ExceptionHolder();
-    final ExceptionHolder tKillHolder = new ExceptionHolder();
+    final HiveStatement stmt1 = (HiveStatement)con1.createStatement();
+    final StringBuffer stmtQueryId = new StringBuffer();
 
     // Thread executing the query
     Thread tExecute = new Thread(new Runnable() {
@@ -321,43 +329,104 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
       public void run() {
         try {
           System.out.println("Executing query: ");
-          stmt.execute("set hive.llap.execution.mode = none");
+          stmt1.execute("set hive.llap.execution.mode = none");
 
+          if (useTag) {
+            stmt1.execute("set hive.query.tag = " + tag);
+          }
           // The test table has 500 rows, so total query time should be ~ 500*500ms
-          stmt.executeQuery("select sleepMsUDF(t1.int_col, 100), t1.int_col, t2.int_col " +
+          stmt1.executeAsync("select sleepMsUDF(t1.int_col, 100), t1.int_col, t2.int_col " +
                   "from " + tableName + " t1 join " + tableName + " t2 on t1.int_col = t2.int_col");
+          stmtQueryId.append(stmt1.getQueryId());
+          stmt1.getUpdateCount();
         } catch (SQLException e) {
-          tExecuteHolder.throwable = e;
+          stmtHolder.throwable = e;
         }
       }
     });
-    // Thread killing the query
-    Thread tKill = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          Thread.sleep(5000);
-          String queryId = ((HiveStatement) stmt).getQueryId();
-          System.out.println("Killing query: " + queryId);
-          stmt2.execute("kill query '" + queryId + "'");
-          stmt2.close();
-        } catch (Exception e) {
-          tKillHolder.throwable = e;
+
+    tExecute.start();
+
+    // wait for other thread to create the stmt handle
+    int count = 0;
+    while (count < 15) {
+      try {
+        tKillHolder.throwable = null;
+        Thread.sleep(2000);
+        String queryId;
+        if (useTag) {
+          queryId = tag;
+        } else {
+          if (stmtQueryId.length() != 0) {
+            queryId = stmtQueryId.toString();
+          } else {
+            count++;
+            continue;
+          }
         }
+        System.out.println("Killing query: " + queryId);
+        if (killUser.equals(System.getProperty("user.name"))) {
+          stmt2.execute("set role admin");
+        }
+        stmt2.execute("kill query '" + queryId + "'");
+        stmt2.close();
+        break;
+      } catch (SQLException e) {
+        count++;
+        tKillHolder.throwable = e;
       }
-    });
+    }
 
-    tExecute.start();
-    tKill.start();
     tExecute.join();
-    tKill.join();
-    stmt.close();
-    con2.close();
-    con.close();
+    try {
+      stmt1.close();
+      con1.close();
+      con2.close();
+    } catch (Exception e) {
+      // ignore error
+    }
+  }
 
+  @Test
+  @Override
+  public void testKillQuery() throws Exception {
+    testKillQueryById();
+    testKillQueryByTagNegative();
+    testKillQueryByTagAdmin();
+    testKillQueryByTagOwner();
+  }
+
+  public void testKillQueryById() throws Exception {
+    ExceptionHolder tExecuteHolder = new ExceptionHolder();
+    ExceptionHolder tKillHolder = new ExceptionHolder();
+    testKillQueryInternal(System.getProperty("user.name"), System.getProperty("user.name"), false,
+            tExecuteHolder, tKillHolder);
+    assertNotNull("tExecute", tExecuteHolder.throwable);
+    assertNull("tCancel", tKillHolder.throwable);
+  }
+
+  public void testKillQueryByTagNegative() throws Exception {
+    ExceptionHolder tExecuteHolder = new ExceptionHolder();
+    ExceptionHolder tKillHolder = new ExceptionHolder();
+    testKillQueryInternal("user1", "user2", true, tExecuteHolder, tKillHolder);
+    assertNotNull("tCancel", tKillHolder.throwable);
+    assertTrue(tKillHolder.throwable.getMessage(), tKillHolder.throwable.getMessage().contains("No privilege"));
+  }
+
+  public void testKillQueryByTagAdmin() throws Exception {
+    ExceptionHolder tExecuteHolder = new ExceptionHolder();
+    ExceptionHolder tKillHolder = new ExceptionHolder();
+    testKillQueryInternal("user1", System.getProperty("user.name"), true, 
tExecuteHolder, tKillHolder);
     assertNotNull("tExecute", tExecuteHolder.throwable);
     assertNull("tCancel", tKillHolder.throwable);
   }
 
+  public void testKillQueryByTagOwner() throws Exception {
+    ExceptionHolder tExecuteHolder = new ExceptionHolder();
+    ExceptionHolder tKillHolder = new ExceptionHolder();
+    testKillQueryInternal("user1", "user1", true, tExecuteHolder, tKillHolder);
+    assertNotNull("tExecute", tExecuteHolder.throwable);
+    assertNull("tCancel", tKillHolder.throwable);
+  }
 }
 
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 144a5a8ad48..16a3e082d99 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -36,7 +36,8 @@ disabled.query.files=ql_rewrite_gbtoidx.q,\
   union_stats.q,\
   sample2.q,\
   sample4.q,\
-  sample6.q
+  sample6.q, \
+  explainanalyze_2.q
 
 
 # NOTE: Add tests to minitez only if it is very
@@ -437,6 +438,7 @@ minillap.query.files=acid_bucket_pruning.q,\
   multi_count_distinct_null.q,\
   cttl.q
 
+# explainanalyze_2.q to be fixed in HIVE-27795
 minillaplocal.query.files=\
   dec_str.q,\
   dp_counter_non_mm.q,\
@@ -527,7 +529,6 @@ minillaplocal.query.files=\
   escape1.q,\
   escape2.q,\
   exchgpartition2lel.q,\
-  explainanalyze_2.q,\
   explainuser_1.q,\
   explainuser_4.q,\
   external_jdbc_auth.q,\
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
index 06542cee02e..be5079fd7bb 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
@@ -997,8 +997,12 @@ public class HiveStatement implements java.sql.Statement {
 
   @VisibleForTesting
   public String getQueryId() throws SQLException {
+    TOperationHandle stmtHandleTemp = stmtHandle; // cache it, as it might get modified by another thread.
+    if (stmtHandleTemp == null) {
+      throw new SQLException("stmtHandle is null");
+    }
     try {
-      return client.GetQueryId(new TGetQueryIdReq(stmtHandle)).getQueryId();
+      return client.GetQueryId(new TGetQueryIdReq(stmtHandleTemp)).getQueryId();
     } catch (TException e) {
       throw new SQLException(e);
     }
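
The null check above closes a race where another thread clears stmtHandle while getQueryId() is running. A sketch of the calling pattern the updated test relies on (the connection URL and query are assumptions; executeAsync, getQueryId, and getUpdateCount are the methods exercised by the test changes above):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import org.apache.hive.jdbc.HiveStatement;

    public class QueryIdSketch {
      public static void main(String[] args) throws Exception {
        Connection con = DriverManager.getConnection(
            "jdbc:hive2://localhost:10000/default", "user1", "");
        HiveStatement stmt = (HiveStatement) con.createStatement();
        stmt.executeAsync("select count(*) from big_table"); // submits, returns early
        String queryId = stmt.getQueryId(); // fails fast with SQLException if the handle is gone
        System.out.println("query id: " + queryId);          // usable in: kill query '<id>'
        stmt.getUpdateCount(); // blocks until the query finishes (or is killed)
        con.close();
      }
    }
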
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 270ab6e9030..ae38aa4ef22 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -701,7 +701,12 @@ public class Driver implements IDriver {
 
         try {
           perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DO_AUTHORIZATION);
-          doAuthorization(queryState.getHiveOperation(), sem, command);
+          // Authorization check for kill query is done in KillQueryImpl instead,
+          // because both the admin and the operation owner may perform the kill,
+          // which is not directly supported by the authorizer.
+          if (queryState.getHiveOperation() != HiveOperation.KILL_QUERY) {
+            doAuthorization(queryState.getHiveOperation(), sem, command);
+          }
         } catch (AuthorizationException authExp) {
           console.printError("Authorization failed:" + authExp.getMessage()
               + ". Use SHOW GRANT to get more details.");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
index b6f069966e6..a06dd485cda 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
@@ -23,7 +23,9 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.hive.ql.session.LineageState;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.tez.dag.api.TezConfiguration;
 
 /**
  * The class to store query level info such as queryId. Multiple queries can run
@@ -54,6 +56,11 @@ public class QueryState {
   // id cannot be queried for some reason like hive server restart.
   private String queryTag = null;
 
+  static public final String USERID_TAG = "userid";
+  /**
+   * Holds the number of rows affected for insert queries.
+   */
+  private long numModifiedRows = 0;
   /**
    * Private constructor, use QueryState.Builder instead.
    * @param conf The query specific configuration object
@@ -107,21 +114,25 @@ public class QueryState {
   }
 
   public String getQueryTag() {
-    return queryTag;
+    return HiveConf.getVar(this.queryConf, HiveConf.ConfVars.HIVEQUERYTAG);
   }
 
   public void setQueryTag(String queryTag) {
-    this.queryTag = queryTag;
+    HiveConf.setVar(this.queryConf, HiveConf.ConfVars.HIVEQUERYTAG, queryTag);
   }
 
-  public static void setMapReduceJobTag(HiveConf queryConf, String queryTag) {
-    String jobTag = queryConf.get(MRJobConfig.JOB_TAGS);
-    if (jobTag == null) {
+  public static void setApplicationTag(HiveConf queryConf, String queryTag) {
+    String jobTag = HiveConf.getVar(queryConf, HiveConf.ConfVars.HIVEQUERYTAG);
+    if (jobTag == null || jobTag.isEmpty()) {
       jobTag = queryTag;
     } else {
       jobTag = jobTag.concat("," + queryTag);
     }
+    if (SessionState.get() != null) {
+      jobTag = jobTag.concat("," + USERID_TAG + "=" + SessionState.get().getUserName());
+    }
     queryConf.set(MRJobConfig.JOB_TAGS, jobTag);
+    queryConf.set(TezConfiguration.TEZ_APPLICATION_TAGS, jobTag);
   }
 
   /**
@@ -233,7 +244,7 @@ public class QueryState {
       if (generateNewQueryId) {
         String queryId = QueryPlan.makeQueryId();
         queryConf.setVar(HiveConf.ConfVars.HIVEQUERYID, queryId);
-        setMapReduceJobTag(queryConf, queryId);
+        setApplicationTag(queryConf, queryId);
 
         // FIXME: druid storage handler relies on query.id to maintain some staging directories
         // expose queryid to session level
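
To make the tagging concrete, here is a standalone restatement (an assumed-equivalent sketch, not the committed code) of what setApplicationTag() now attaches to both MRJobConfig.JOB_TAGS and TEZ_APPLICATION_TAGS: the session's hive.query.tag if present, the query id, and a userid marker that KillQueryImpl later checks against the caller.

    public class TagCompositionSketch {
      static String composeTags(String sessionTag, String queryId, String userName) {
        // Mirrors setApplicationTag(): session tag (if any), then the query id.
        String jobTag = (sessionTag == null || sessionTag.isEmpty())
            ? queryId
            : sessionTag + "," + queryId;
        if (userName != null) {
          jobTag = jobTag + ",userid=" + userName; // QueryState.USERID_TAG
        }
        return jobTag;
      }

      public static void main(String[] args) {
        // Prints: mytag,hive_20231016_0001,userid=alice (query id and user are hypothetical)
        System.out.println(composeTags("mytag", "hive_20231016_0001", "alice"));
      }
    }
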
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
index f357775c866..ee539ba1763 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
@@ -18,8 +18,10 @@ package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.util.Map;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.session.KillQuery;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.wm.Trigger;
 import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
 import org.slf4j.Logger;
@@ -39,6 +41,9 @@ public class KillTriggerActionHandler implements TriggerActionHandler<TezSession
           TezSessionState sessionState = entry.getKey();
           String queryId = sessionState.getWmContext().getQueryId();
           try {
+            SessionState ss = new SessionState(new HiveConf());
+            ss.setIsHiveServerQuery(true);
+            SessionState.start(ss);
             KillQuery killQuery = sessionState.getKillQuery();
             // if kill query is null then session might have been released to pool or closed already
             if (killQuery != null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 9029285835c..2478ab9fc67 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -469,6 +469,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       final String reason = killCtx.reason;
       LOG.info("Killing query for {}", toKill);
       workPool.submit(() -> {
+        SessionState ss = new SessionState(new HiveConf());
+        ss.setIsHiveServerQuery(true);
+        SessionState.start(ss);
         // Note: we get query ID here, rather than in the caller, where it would be more correct
         //       because we know which exact query we intend to kill. This is valid because we
         //       are not expecting query ID to change - we never reuse the session for which a
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 42e0339d422..f83146125f3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -359,7 +359,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         if (key.equalsIgnoreCase(HIVEQUERYID.varname)) {
           String queryTag = config.getValue();
           if (!StringUtils.isEmpty(queryTag)) {
-            QueryState.setMapReduceJobTag(conf, queryTag);
+            QueryState.setApplicationTag(conf, queryTag);
           }
           queryState.setQueryTag(queryTag);
         } else {
diff --git a/ql/src/test/queries/clientnegative/authorization_kill_query.q b/ql/src/test/queries/clientnegative/authorization_kill_query.q
deleted file mode 100644
index 5379f877644..00000000000
--- a/ql/src/test/queries/clientnegative/authorization_kill_query.q
+++ /dev/null
@@ -1,15 +0,0 @@
-set hive.security.authorization.enabled=true;
-set hive.test.authz.sstd.hs2.mode=true;
-set hive.security.authorization.manager=org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactoryForTest;
-set hive.security.authenticator.manager=org.apache.hadoop.hive.ql.security.SessionStateConfigUserAuthenticator;
-
-set user.name=hive_admin_user;
-set role ADMIN;
-explain authorization kill query 'dummyqueryid';
-kill query 'dummyqueryid';
-
-set user.name=ruser1;
-
--- kill query as non-admin should fail
-explain authorization kill query 'dummyqueryid';
-kill query 'dummyqueryid';
diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
index 2a776057b72..0f6864a8b66 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
@@ -22,12 +22,18 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.google.common.collect.Multimap;
+import com.google.common.collect.MultimapBuilder;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.SetMultimap;
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
@@ -62,7 +68,8 @@ public class OperationManager extends AbstractService {
       new ConcurrentHashMap<OperationHandle, Operation>();
   private final ConcurrentHashMap<String, Operation> queryIdOperation =
       new ConcurrentHashMap<String, Operation>();
-  private final ConcurrentHashMap<String, String> queryTagToIdMap = new ConcurrentHashMap<>();
+  private final SetMultimap<String, String> queryTagToIdMap =
+          Multimaps.synchronizedSetMultimap(MultimapBuilder.hashKeys().hashSetValues().build());
 
   //Following fields for displaying queries on WebUI
   private Object webuiLock = new Object();
@@ -205,12 +212,7 @@ public class OperationManager extends AbstractService {
   public void updateQueryTag(String queryId, String queryTag) {
     Operation operation = queryIdOperation.get(queryId);
     if (operation != null) {
-      String queryIdTemp = queryTagToIdMap.get(queryTag);
-      if (queryIdTemp != null) {
-        throw new RuntimeException("tag " + queryTag + " is already applied for query " + queryIdTemp);
-      }
       queryTagToIdMap.put(queryTag, queryId);
-      LOG.info("Query " + queryId + " is updated with tag " + queryTag);
       return;
     }
     LOG.info("Query id is missing during query tag updation");
@@ -225,7 +227,7 @@ public class OperationManager extends AbstractService {
     queryIdOperation.remove(queryId);
     String queryTag = operation.getQueryTag();
     if (queryTag != null) {
-      queryTagToIdMap.remove(queryTag);
+      queryTagToIdMap.remove(queryTag, queryId);
     }
     LOG.info("Removed queryId: {} corresponding to operation: {} with tag: 
{}", queryId, opHandle, queryTag);
     if (operation instanceof SQLOperation) {
@@ -442,11 +444,14 @@ public class OperationManager extends AbstractService {
     return queryIdOperation.get(queryId);
   }
 
-  public Operation getOperationByQueryTag(String queryTag) {
-    String queryId = queryTagToIdMap.get(queryTag);
-    if (queryId != null) {
-      return getOperationByQueryId(queryId);
+  public Set<Operation> getOperationsByQueryTag(String queryTag) {
+    Set<String> queryIds = queryTagToIdMap.get(queryTag);
+    Set<Operation> result = new HashSet<Operation>();
+    for (String queryId : queryIds) {
+      if (queryId != null && getOperationByQueryId(queryId) != null) {
+        result.add(getOperationByQueryId(queryId));
+      }
     }
-    return null;
+    return result;
   }
 }
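
The map above changes from a ConcurrentHashMap to a synchronized Guava SetMultimap so that several live queries can share one tag (duplicate tags are now allowed, per the new hive.query.tag description), while completion removes only the (tag, id) pair. A minimal sketch of that shape, assuming Guava on the classpath; the tag and ids are illustrative:

    import com.google.common.collect.MultimapBuilder;
    import com.google.common.collect.Multimaps;
    import com.google.common.collect.SetMultimap;

    public class TagMultimapSketch {
      public static void main(String[] args) {
        SetMultimap<String, String> tagToIds =
            Multimaps.synchronizedSetMultimap(MultimapBuilder.hashKeys().hashSetValues().build());
        tagToIds.put("mytag", "query-1");
        tagToIds.put("mytag", "query-2");    // same tag on a second live query: allowed
        tagToIds.remove("mytag", "query-1"); // a finished query detaches only itself
        System.out.println(tagToIds.get("mytag")); // [query-2]
      }
    }
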
diff --git a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
index 490a04da675..c7f2c9117b9 100644
--- a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
+++ b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
@@ -21,8 +21,13 @@ package org.apache.hive.service.server;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject;
 import org.apache.hadoop.hive.ql.session.KillQuery;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
@@ -40,6 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -49,6 +55,7 @@ public class KillQueryImpl implements KillQuery {
   private final static Logger LOG = LoggerFactory.getLogger(KillQueryImpl.class);
 
   private final OperationManager operationManager;
+  private enum TagOrId {TAG, ID, UNKNOWN};
 
   public KillQueryImpl(OperationManager operationManager) {
     this.operationManager = operationManager;
@@ -64,7 +71,10 @@ public class KillQueryImpl implements KillQuery {
     GetApplicationsResponse apps = proxy.getApplications(gar);
     List<ApplicationReport> appsList = apps.getApplicationList();
     for(ApplicationReport appReport : appsList) {
-      childYarnJobs.add(appReport.getApplicationId());
+      if (isAdmin() || appReport.getApplicationTags().contains(QueryState.USERID_TAG + "=" + SessionState.get()
+              .getUserName())) {
+        childYarnJobs.add(appReport.getApplicationId());
+      }
     }
 
     if (childYarnJobs.isEmpty()) {
@@ -81,6 +91,7 @@ public class KillQueryImpl implements KillQuery {
       if (tag == null) {
         return;
       }
+      LOG.info("Killing yarn jobs using query tag:" + tag);
       Set<ApplicationId> childYarnJobs = getChildYarnJobs(conf, tag);
       if (!childYarnJobs.isEmpty()) {
         YarnClient yarnClient = YarnClient.createYarnClient();
@@ -91,44 +102,87 @@ public class KillQueryImpl implements KillQuery {
         }
       }
     } catch (IOException | YarnException ye) {
-      throw new RuntimeException("Exception occurred while killing child job(s)", ye);
+      LOG.warn("Exception occurred while killing child job({})", ye);
+    }
+  }
+
+  private static boolean isAdmin() {
+    boolean isAdmin = false;
+    if (SessionState.get().getAuthorizerV2() != null) {
+      try {
+        SessionState.get().getAuthorizerV2().checkPrivileges(HiveOperationType.KILL_QUERY,
+                new ArrayList<HivePrivilegeObject>(), new ArrayList<HivePrivilegeObject>(),
+                new HiveAuthzContext.Builder().build());
+        isAdmin = true;
+      } catch (Exception e) {
+      }
+    }
+    return isAdmin;
+  }
+
+  private boolean cancelOperation(Operation operation, boolean isAdmin, String errMsg) throws
+          HiveSQLException {
+    if (isAdmin || operation.getParentSession().getUserName().equals(SessionState.get()
+            .getAuthenticator().getUserName())) {
+      OperationHandle handle = operation.getHandle();
+      operationManager.cancelOperation(handle, errMsg);
+      return true;
+    } else {
+      return false;
     }
   }
 
   @Override
-  public void killQuery(String queryId, String errMsg, HiveConf conf) throws HiveException {
+  public void killQuery(String queryIdOrTag, String errMsg, HiveConf conf) throws HiveException {
     try {
-      String queryTag = null;
-
-      Operation operation = operationManager.getOperationByQueryId(queryId);
-      if (operation == null) {
-        // Check if user has passed the query tag to kill the operation. This is possible if the application
-        // restarts and it does not have the proper query id. The tag can be used in that case to kill the query.
-        operation = operationManager.getOperationByQueryTag(queryId);
-        if (operation == null) {
-          LOG.info("Query not found: " + queryId);
-        }
+      TagOrId tagOrId = TagOrId.UNKNOWN;
+      Set<Operation> operationsToKill = new HashSet<Operation>();
+      if (operationManager.getOperationByQueryId(queryIdOrTag) != null) {
+        operationsToKill.add(operationManager.getOperationByQueryId(queryIdOrTag));
+        tagOrId = TagOrId.ID;
       } else {
-        // This is the normal flow, where the query is tagged and user wants to kill the query using the query id.
-        queryTag = operation.getQueryTag();
+        operationsToKill.addAll(operationManager.getOperationsByQueryTag(queryIdOrTag));
+        if (!operationsToKill.isEmpty()) {
+          tagOrId = TagOrId.TAG;
+        }
       }
-
-      if (queryTag == null) {
-        //use query id as tag if user wanted to kill only the yarn jobs after hive server restart. The yarn jobs are
-        //tagged with query id by default. This will cover the case where the application after restarts wants to kill
-        //the yarn jobs with query tag. The query tag can be passed as query id.
-        queryTag = queryId;
+      if (operationsToKill.isEmpty()) {
+        LOG.info("Query not found: " + queryIdOrTag);
       }
-
-      LOG.info("Killing yarn jobs for query id : " + queryId + " using tag :" 
+ queryTag);
-      killChildYarnJobs(conf, queryTag);
-
-      if (operation != null) {
-        OperationHandle handle = operation.getHandle();
-        operationManager.cancelOperation(handle, errMsg);
+      boolean admin = isAdmin();
+      switch(tagOrId) {
+        case ID:
+          Operation operation = operationsToKill.iterator().next();
+          boolean canceled = cancelOperation(operation, admin, errMsg);
+          if (canceled) {
+            String queryTag = operation.getQueryTag();
+            if (queryTag == null) {
+              queryTag = queryIdOrTag;
+            }
+            killChildYarnJobs(conf, queryTag);
+          } else {
+            // no privilege to cancel
+            throw new HiveSQLException("No privilege");
+          }
+          break;
+        case TAG:
+          int numCanceled = 0;
+          for (Operation operationToKill : operationsToKill) {
+            if (cancelOperation(operationToKill, admin, errMsg)) {
+              numCanceled++;
+            }
+          }
+          killChildYarnJobs(conf, queryIdOrTag);
+          if (numCanceled == 0) {
+            throw new HiveSQLException("No privilege");
+          }
+          break;
+        case UNKNOWN:
+          killChildYarnJobs(conf, queryIdOrTag);
+          break;
       }
     } catch (HiveSQLException e) {
-      LOG.error("Kill query failed for query " + queryId, e);
+      LOG.error("Kill query failed for query " + queryIdOrTag, e);
+      LOG.error("Kill query failed for query " + queryIdOrTag, e);
       throw new HiveException(e.getMessage(), e);
     }
   }
