This is an automated email from the ASF dual-hosted git repository.

sankarh pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/branch-3 by this push: new b1503b5123f HIVE-27784: Backport of HIVE-20364, HIVE-20549 to branch-3 (#4789) b1503b5123f is described below commit b1503b5123fde96cf7a7583e41a70083c704b3cd Author: Aman Raj <104416558+amanraj2...@users.noreply.github.com> AuthorDate: Mon Oct 16 13:31:43 2023 +0530 HIVE-27784: Backport of HIVE-20364, HIVE-20549 to branch-3 (#4789) * HIVE-20364: Update default for hive.map.aggr.hash.min.reduction * HIVE-20549: Allow user set query tag, and kill query with tag (Daniel Dai, reviewed by Thejas Nair, Sergey Shelukhin) * Removed explainanalyze_2.q test to fix in HIVE-27795 --------- Co-authored-by: Ashutosh Chauhan <hashut...@apache.org> Co-authored-by: Mahesh Kumar Behera <mbeh...@hortonworks.com> Co-authored-by: Daniel Dai <dai...@gmail.com> Signed-off-by: Sankar Hariappan <sank...@apache.org> Closes (#4789) --- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 7 +- .../hive/jdbc/TestJdbcWithMiniLlapArrow.java | 153 +++++++++++++++------ .../test/resources/testconfiguration.properties | 5 +- .../java/org/apache/hive/jdbc/HiveStatement.java | 6 +- ql/src/java/org/apache/hadoop/hive/ql/Driver.java | 7 +- .../java/org/apache/hadoop/hive/ql/QueryState.java | 23 +++- .../hive/ql/exec/tez/KillTriggerActionHandler.java | 5 + .../hadoop/hive/ql/exec/tez/WorkloadManager.java | 3 + .../hive/ql/parse/ReplicationSemanticAnalyzer.java | 2 +- .../clientnegative/authorization_kill_query.q | 15 -- .../service/cli/operation/OperationManager.java | 29 ++-- .../apache/hive/service/server/KillQueryImpl.java | 112 +++++++++++---- 12 files changed, 257 insertions(+), 110 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index bf20a78b588..6bd226c442f 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1519,6 +1519,10 @@ public class 
HiveConf extends Configuration { HIVEQUERYID("hive.query.id", "", "ID for query being executed (might be multiple per a session)"), + HIVEQUERYTAG("hive.query.tag", null, "Tag for the queries in the session. User can kill the queries with the tag " + + "in another session. Currently there is no tag duplication check, user need to make sure his tag is unique. " + + "Also 'kill query' needs to be issued to all HiveServer2 instances to proper kill the queries"), + HIVEJOBNAMELENGTH("hive.jobname.length", 50, "max jobname length"), // hive jar @@ -1688,7 +1692,7 @@ public class HiveConf extends Configuration { "How many rows with the same key value should be cached in memory per smb joined table."), HIVEGROUPBYMAPINTERVAL("hive.groupby.mapaggr.checkinterval", 100000, "Number of rows after which size of the grouping keys/aggregation classes is performed"), - HIVEMAPAGGRHASHMEMORY("hive.map.aggr.hash.percentmemory", (float) 0.5, + HIVEMAPAGGRHASHMEMORY("hive.map.aggr.hash.percentmemory", (float) 0.99, "Portion of total memory to be used by map-side group aggregation hash table"), HIVEMAPJOINFOLLOWEDBYMAPAGGRHASHMEMORY("hive.mapjoin.followby.map.aggr.hash.percentmemory", (float) 0.3, "Portion of total memory to be used by map-side group aggregation hash table, when this group by is followed by map join"), @@ -5451,6 +5455,7 @@ public class HiveConf extends Configuration { ConfVars.SHOW_JOB_FAIL_DEBUG_INFO.varname, ConfVars.TASKLOG_DEBUG_TIMEOUT.varname, ConfVars.HIVEQUERYID.varname, + ConfVars.HIVEQUERYTAG.varname, }; /** diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java index 3dcc4928b1a..dcb8701e696 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java @@ -43,9 +43,12 @@ import 
org.apache.hadoop.hive.llap.LlapArrowRowInputFormat; import org.apache.hive.jdbc.miniHS2.MiniHS2; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * TestJdbcWithMiniLlap for Arrow format @@ -57,6 +60,7 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap { private static final String tableName = "testJdbcMinihs2Tbl"; private static String dataFileDir; private static final String testDbName = "testJdbcMinihs2"; + private static final String tag = "mytag"; private static class ExceptionHolder { Throwable throwable; @@ -66,6 +70,12 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap { public static void beforeTest() throws Exception { HiveConf conf = defaultConf(); conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true); + conf.setVar(ConfVars.HIVE_AUTHENTICATOR_MANAGER, "org.apache.hadoop.hive.ql.security" + + ".SessionStateUserAuthenticator"); + conf.setVar(ConfVars.USERS_IN_ADMIN_ROLE, System.getProperty("user.name")); + conf.setBoolVar(ConfVars.HIVE_AUTHORIZATION_ENABLED, true); + conf.setVar(ConfVars.HIVE_AUTHORIZATION_SQL_STD_AUTH_CONFIG_WHITELIST_APPEND, ConfVars.HIVE_SUPPORT_CONCURRENCY + .varname + "|" + ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname); MiniHS2.cleanupLocalDir(); miniHS2 = BaseJdbcWithMiniLlap.beforeTest(conf); dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); @@ -73,8 +83,19 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap { Connection conDefault = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar"); Statement stmt = conDefault.createStatement(); + String tblName = testDbName + "." 
+ tableName; + Path dataFilePath = new Path(dataFileDir, "kv1.txt"); + String udfName = SleepMsUDF.class.getName(); stmt.execute("drop database if exists " + testDbName + " cascade"); stmt.execute("create database " + testDbName); + stmt.execute("set role admin"); + stmt.execute("dfs -put " + dataFilePath.toString() + " " + "kv1.txt"); + stmt.execute("use " + testDbName); + stmt.execute("create table " + tblName + " (int_col int, value string) "); + stmt.execute("load data inpath 'kv1.txt' into table " + tblName); + stmt.execute("create function sleepMsUDF as '" + udfName + "'"); + stmt.execute("grant select on table " + tblName + " to role public"); + stmt.close(); conDefault.close(); } @@ -291,29 +312,16 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap { * that runs for a sufficiently long time. * @throws Exception */ - @Test - public void testKillQuery() throws Exception { - Connection con = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName), - System.getProperty("user.name"), "bar"); + private void testKillQueryInternal(String user, String killUser, boolean useTag, final + ExceptionHolder stmtHolder, final ExceptionHolder tKillHolder) throws Exception { + Connection con1 = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName), + user, "bar"); Connection con2 = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName), - System.getProperty("user.name"), "bar"); + killUser, "bar"); - String udfName = SleepMsUDF.class.getName(); - Statement stmt1 = con.createStatement(); final Statement stmt2 = con2.createStatement(); - Path dataFilePath = new Path(dataFileDir, "kv1.txt"); - - String tblName = testDbName + "." 
+ tableName; - - stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'"); - stmt1.execute("create table " + tblName + " (int_col int, value string) "); - stmt1.execute("load data local inpath '" + dataFilePath.toString() + "' into table " + tblName); - - - stmt1.close(); - final Statement stmt = con.createStatement(); - final ExceptionHolder tExecuteHolder = new ExceptionHolder(); - final ExceptionHolder tKillHolder = new ExceptionHolder(); + final HiveStatement stmt1 = (HiveStatement)con1.createStatement(); + final StringBuffer stmtQueryId = new StringBuffer(); // Thread executing the query Thread tExecute = new Thread(new Runnable() { @@ -321,43 +329,104 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap { public void run() { try { System.out.println("Executing query: "); - stmt.execute("set hive.llap.execution.mode = none"); + stmt1.execute("set hive.llap.execution.mode = none"); + if (useTag) { + stmt1.execute("set hive.query.tag = " + tag); + } // The test table has 500 rows, so total query time should be ~ 500*500ms - stmt.executeQuery("select sleepMsUDF(t1.int_col, 100), t1.int_col, t2.int_col " + + stmt1.executeAsync("select sleepMsUDF(t1.int_col, 100), t1.int_col, t2.int_col " + "from " + tableName + " t1 join " + tableName + " t2 on t1.int_col = t2.int_col"); + stmtQueryId.append(stmt1.getQueryId()); + stmt1.getUpdateCount(); } catch (SQLException e) { - tExecuteHolder.throwable = e; + stmtHolder.throwable = e; } } }); - // Thread killing the query - Thread tKill = new Thread(new Runnable() { - @Override - public void run() { - try { - Thread.sleep(5000); - String queryId = ((HiveStatement) stmt).getQueryId(); - System.out.println("Killing query: " + queryId); - stmt2.execute("kill query '" + queryId + "'"); - stmt2.close(); - } catch (Exception e) { - tKillHolder.throwable = e; + + tExecute.start(); + + // wait for other thread to create the stmt handle + int count = 0; + while (count < 15) { + try { + 
tKillHolder.throwable = null; + Thread.sleep(2000); + String queryId; + if (useTag) { + queryId = tag; + } else { + if (stmtQueryId.length() != 0) { + queryId = stmtQueryId.toString(); + } else { + count++; + continue; + } } + System.out.println("Killing query: " + queryId); + if (killUser.equals(System.getProperty("user.name"))) { + stmt2.execute("set role admin"); + } + stmt2.execute("kill query '" + queryId + "'"); + stmt2.close(); + break; + } catch (SQLException e) { + count++; + tKillHolder.throwable = e; } - }); + } - tExecute.start(); - tKill.start(); tExecute.join(); - tKill.join(); - stmt.close(); - con2.close(); - con.close(); + try { + stmt1.close(); + con1.close(); + con2.close(); + } catch (Exception e) { + // ignore error + } + } + @Test + @Override + public void testKillQuery() throws Exception { + testKillQueryById(); + testKillQueryByTagNegative(); + testKillQueryByTagAdmin(); + testKillQueryByTagOwner(); + } + + public void testKillQueryById() throws Exception { + ExceptionHolder tExecuteHolder = new ExceptionHolder(); + ExceptionHolder tKillHolder = new ExceptionHolder(); + testKillQueryInternal(System.getProperty("user.name"), System.getProperty("user.name"), false, + tExecuteHolder, tKillHolder); + assertNotNull("tExecute", tExecuteHolder.throwable); + assertNull("tCancel", tKillHolder.throwable); + } + + public void testKillQueryByTagNegative() throws Exception { + ExceptionHolder tExecuteHolder = new ExceptionHolder(); + ExceptionHolder tKillHolder = new ExceptionHolder(); + testKillQueryInternal("user1", "user2", true, tExecuteHolder, tKillHolder); + assertNotNull("tCancel", tKillHolder.throwable); + assertTrue(tKillHolder.throwable.getMessage(), tKillHolder.throwable.getMessage().contains("No privilege")); + } + + public void testKillQueryByTagAdmin() throws Exception { + ExceptionHolder tExecuteHolder = new ExceptionHolder(); + ExceptionHolder tKillHolder = new ExceptionHolder(); + testKillQueryInternal("user1", 
System.getProperty("user.name"), true, tExecuteHolder, tKillHolder); assertNotNull("tExecute", tExecuteHolder.throwable); assertNull("tCancel", tKillHolder.throwable); } + public void testKillQueryByTagOwner() throws Exception { + ExceptionHolder tExecuteHolder = new ExceptionHolder(); + ExceptionHolder tKillHolder = new ExceptionHolder(); + testKillQueryInternal("user1", "user1", true, tExecuteHolder, tKillHolder); + assertNotNull("tExecute", tExecuteHolder.throwable); + assertNull("tCancel", tKillHolder.throwable); + } } diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 144a5a8ad48..16a3e082d99 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -36,7 +36,8 @@ disabled.query.files=ql_rewrite_gbtoidx.q,\ union_stats.q,\ sample2.q,\ sample4.q,\ - sample6.q + sample6.q, \ + explainanalyze_2.q # NOTE: Add tests to minitez only if it is very @@ -437,6 +438,7 @@ minillap.query.files=acid_bucket_pruning.q,\ multi_count_distinct_null.q,\ cttl.q +# explainanalyze_2.q to be fixed in HIVE-27795 minillaplocal.query.files=\ dec_str.q,\ dp_counter_non_mm.q,\ @@ -527,7 +529,6 @@ minillaplocal.query.files=\ escape1.q,\ escape2.q,\ exchgpartition2lel.q,\ - explainanalyze_2.q,\ explainuser_1.q,\ explainuser_4.q,\ external_jdbc_auth.q,\ diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index 06542cee02e..be5079fd7bb 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -997,8 +997,12 @@ public class HiveStatement implements java.sql.Statement { @VisibleForTesting public String getQueryId() throws SQLException { + TOperationHandle stmtHandleTemp = stmtHandle; // cache it, as it might get modified by other thread. 
+ if (stmtHandleTemp == null) { + throw new SQLException("stmtHandle is null"); + } try { - return client.GetQueryId(new TGetQueryIdReq(stmtHandle)).getQueryId(); + return client.GetQueryId(new TGetQueryIdReq(stmtHandleTemp)).getQueryId(); } catch (TException e) { throw new SQLException(e); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 270ab6e9030..ae38aa4ef22 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -701,7 +701,12 @@ public class Driver implements IDriver { try { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DO_AUTHORIZATION); - doAuthorization(queryState.getHiveOperation(), sem, command); + // Authorization check for kill query will be in KillQueryImpl + // As both admin or operation owner can perform the operation. + // Which is not directly supported in authorizer + if (queryState.getHiveOperation() != HiveOperation.KILL_QUERY) { + doAuthorization(queryState.getHiveOperation(), sem, command); + } } catch (AuthorizationException authExp) { console.printError("Authorization failed:" + authExp.getMessage() + ". Use SHOW GRANT to get more details."); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java index b6f069966e6..a06dd485cda 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java @@ -23,7 +23,9 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.hive.ql.session.LineageState; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.tez.dag.api.TezConfiguration; /** * The class to store query level info such as queryId. 
Multiple queries can run @@ -54,6 +56,11 @@ public class QueryState { // id cannot be queried for some reason like hive server restart. private String queryTag = null; + static public final String USERID_TAG = "userid"; + /** + * Holds the number of rows affected for insert queries. + */ + private long numModifiedRows = 0; /** * Private constructor, use QueryState.Builder instead. * @param conf The query specific configuration object @@ -107,21 +114,25 @@ public class QueryState { } public String getQueryTag() { - return queryTag; + return HiveConf.getVar(this.queryConf, HiveConf.ConfVars.HIVEQUERYTAG); } public void setQueryTag(String queryTag) { - this.queryTag = queryTag; + HiveConf.setVar(this.queryConf, HiveConf.ConfVars.HIVEQUERYTAG, queryTag); } - public static void setMapReduceJobTag(HiveConf queryConf, String queryTag) { - String jobTag = queryConf.get(MRJobConfig.JOB_TAGS); - if (jobTag == null) { + public static void setApplicationTag(HiveConf queryConf, String queryTag) { + String jobTag = HiveConf.getVar(queryConf, HiveConf.ConfVars.HIVEQUERYTAG); + if (jobTag == null || jobTag.isEmpty()) { jobTag = queryTag; } else { jobTag = jobTag.concat("," + queryTag); } + if (SessionState.get() != null) { + jobTag = jobTag.concat("," + USERID_TAG + "=" + SessionState.get().getUserName()); + } queryConf.set(MRJobConfig.JOB_TAGS, jobTag); + queryConf.set(TezConfiguration.TEZ_APPLICATION_TAGS, jobTag); } /** @@ -233,7 +244,7 @@ public class QueryState { if (generateNewQueryId) { String queryId = QueryPlan.makeQueryId(); queryConf.setVar(HiveConf.ConfVars.HIVEQUERYID, queryId); - setMapReduceJobTag(queryConf, queryId); + setApplicationTag(queryConf, queryId); // FIXME: druid storage handler relies on query.id to maintain some staging directories // expose queryid to session level diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java index 
f357775c866..ee539ba1763 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java @@ -18,8 +18,10 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.util.Map; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.session.KillQuery; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.wm.Trigger; import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; import org.slf4j.Logger; @@ -39,6 +41,9 @@ public class KillTriggerActionHandler implements TriggerActionHandler<TezSession TezSessionState sessionState = entry.getKey(); String queryId = sessionState.getWmContext().getQueryId(); try { + SessionState ss = new SessionState(new HiveConf()); + ss.setIsHiveServerQuery(true); + SessionState.start(ss); KillQuery killQuery = sessionState.getKillQuery(); // if kill query is null then session might have been released to pool or closed already if (killQuery != null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java index 9029285835c..2478ab9fc67 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -469,6 +469,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida final String reason = killCtx.reason; LOG.info("Killing query for {}", toKill); workPool.submit(() -> { + SessionState ss = new SessionState(new HiveConf()); + ss.setIsHiveServerQuery(true); + SessionState.start(ss); // Note: we get query ID here, rather than in the caller, where it would be more correct // because we know which exact query we intend to kill. 
This is valid because we // are not expecting query ID to change - we never reuse the session for which a diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 42e0339d422..f83146125f3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -359,7 +359,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { if (key.equalsIgnoreCase(HIVEQUERYID.varname)) { String queryTag = config.getValue(); if (!StringUtils.isEmpty(queryTag)) { - QueryState.setMapReduceJobTag(conf, queryTag); + QueryState.setApplicationTag(conf, queryTag); } queryState.setQueryTag(queryTag); } else { diff --git a/ql/src/test/queries/clientnegative/authorization_kill_query.q b/ql/src/test/queries/clientnegative/authorization_kill_query.q deleted file mode 100644 index 5379f877644..00000000000 --- a/ql/src/test/queries/clientnegative/authorization_kill_query.q +++ /dev/null @@ -1,15 +0,0 @@ -set hive.security.authorization.enabled=true; -set hive.test.authz.sstd.hs2.mode=true; -set hive.security.authorization.manager=org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactoryForTest; -set hive.security.authenticator.manager=org.apache.hadoop.hive.ql.security.SessionStateConfigUserAuthenticator; - -set user.name=hive_admin_user; -set role ADMIN; -explain authorization kill query 'dummyqueryid'; -kill query 'dummyqueryid'; - -set user.name=ruser1; - --- kill query as non-admin should fail -explain authorization kill query 'dummyqueryid'; -kill query 'dummyqueryid'; diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java index 2a776057b72..0f6864a8b66 100644 --- 
a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java +++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -22,12 +22,18 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import com.google.common.collect.Multimap; +import com.google.common.collect.MultimapBuilder; +import com.google.common.collect.Multimaps; +import com.google.common.collect.SetMultimap; import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; @@ -62,7 +68,8 @@ public class OperationManager extends AbstractService { new ConcurrentHashMap<OperationHandle, Operation>(); private final ConcurrentHashMap<String, Operation> queryIdOperation = new ConcurrentHashMap<String, Operation>(); - private final ConcurrentHashMap<String, String> queryTagToIdMap = new ConcurrentHashMap<>(); + private final SetMultimap<String, String> queryTagToIdMap = + Multimaps.synchronizedSetMultimap(MultimapBuilder.hashKeys().hashSetValues().build()); //Following fields for displaying queries on WebUI private Object webuiLock = new Object(); @@ -205,12 +212,7 @@ public class OperationManager extends AbstractService { public void updateQueryTag(String queryId, String queryTag) { Operation operation = queryIdOperation.get(queryId); if (operation != null) { - String queryIdTemp = queryTagToIdMap.get(queryTag); - if (queryIdTemp != null) { - throw new RuntimeException("tag " + queryTag + " is already applied for query " + queryIdTemp); - } queryTagToIdMap.put(queryTag, queryId); - LOG.info("Query " + queryId + " is updated with tag " + queryTag); return; } 
LOG.info("Query id is missing during query tag updation"); @@ -225,7 +227,7 @@ public class OperationManager extends AbstractService { queryIdOperation.remove(queryId); String queryTag = operation.getQueryTag(); if (queryTag != null) { - queryTagToIdMap.remove(queryTag); + queryTagToIdMap.remove(queryTag, queryId); } LOG.info("Removed queryId: {} corresponding to operation: {} with tag: {}", queryId, opHandle, queryTag); if (operation instanceof SQLOperation) { @@ -442,11 +444,14 @@ public class OperationManager extends AbstractService { return queryIdOperation.get(queryId); } - public Operation getOperationByQueryTag(String queryTag) { - String queryId = queryTagToIdMap.get(queryTag); - if (queryId != null) { - return getOperationByQueryId(queryId); + public Set<Operation> getOperationsByQueryTag(String queryTag) { + Set<String> queryIds = queryTagToIdMap.get(queryTag); + Set<Operation> result = new HashSet<Operation>(); + for (String queryId : queryIds) { + if (queryId != null && getOperationByQueryId(queryId) != null) { + result.add(getOperationByQueryId(queryId)); + } } - return null; + return result; } } diff --git a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java index 490a04da675..c7f2c9117b9 100644 --- a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java +++ b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java @@ -21,8 +21,13 @@ package org.apache.hive.service.server; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext; +import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType; +import 
org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject; import org.apache.hadoop.hive.ql.session.KillQuery; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; @@ -40,6 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -49,6 +55,7 @@ public class KillQueryImpl implements KillQuery { private final static Logger LOG = LoggerFactory.getLogger(KillQueryImpl.class); private final OperationManager operationManager; + private enum TagOrId {TAG, ID, UNKNOWN}; public KillQueryImpl(OperationManager operationManager) { this.operationManager = operationManager; @@ -64,7 +71,10 @@ public class KillQueryImpl implements KillQuery { GetApplicationsResponse apps = proxy.getApplications(gar); List<ApplicationReport> appsList = apps.getApplicationList(); for(ApplicationReport appReport : appsList) { - childYarnJobs.add(appReport.getApplicationId()); + if (isAdmin() || appReport.getApplicationTags().contains(QueryState.USERID_TAG + "=" + SessionState.get() + .getUserName())) { + childYarnJobs.add(appReport.getApplicationId()); + } } if (childYarnJobs.isEmpty()) { @@ -81,6 +91,7 @@ public class KillQueryImpl implements KillQuery { if (tag == null) { return; } + LOG.info("Killing yarn jobs using query tag:" + tag); Set<ApplicationId> childYarnJobs = getChildYarnJobs(conf, tag); if (!childYarnJobs.isEmpty()) { YarnClient yarnClient = YarnClient.createYarnClient(); @@ -91,44 +102,87 @@ public class KillQueryImpl implements KillQuery { } } } catch (IOException | YarnException ye) { - throw new RuntimeException("Exception occurred while killing child job(s)", ye); + LOG.warn("Exception occurred while 
killing child job({})", ye); + } + } + + private static boolean isAdmin() { + boolean isAdmin = false; + if (SessionState.get().getAuthorizerV2() != null) { + try { + SessionState.get().getAuthorizerV2().checkPrivileges(HiveOperationType.KILL_QUERY, + new ArrayList<HivePrivilegeObject>(), new ArrayList<HivePrivilegeObject>(), + new HiveAuthzContext.Builder().build()); + isAdmin = true; + } catch (Exception e) { + } + } + return isAdmin; + } + + private boolean cancelOperation(Operation operation, boolean isAdmin, String errMsg) throws + HiveSQLException { + if (isAdmin || operation.getParentSession().getUserName().equals(SessionState.get() + .getAuthenticator().getUserName())) { + OperationHandle handle = operation.getHandle(); + operationManager.cancelOperation(handle, errMsg); + return true; + } else { + return false; } } @Override - public void killQuery(String queryId, String errMsg, HiveConf conf) throws HiveException { + public void killQuery(String queryIdOrTag, String errMsg, HiveConf conf) throws HiveException { try { - String queryTag = null; - - Operation operation = operationManager.getOperationByQueryId(queryId); - if (operation == null) { - // Check if user has passed the query tag to kill the operation. This is possible if the application - // restarts and it does not have the proper query id. The tag can be used in that case to kill the query. - operation = operationManager.getOperationByQueryTag(queryId); - if (operation == null) { - LOG.info("Query not found: " + queryId); - } + TagOrId tagOrId = TagOrId.UNKNOWN; + Set<Operation> operationsToKill = new HashSet<Operation>(); + if (operationManager.getOperationByQueryId(queryIdOrTag) != null) { + operationsToKill.add(operationManager.getOperationByQueryId(queryIdOrTag)); + tagOrId = TagOrId.ID; } else { - // This is the normal flow, where the query is tagged and user wants to kill the query using the query id. 
- queryTag = operation.getQueryTag(); + operationsToKill.addAll(operationManager.getOperationsByQueryTag(queryIdOrTag)); + if (!operationsToKill.isEmpty()) { + tagOrId = TagOrId.TAG; + } } - - if (queryTag == null) { - //use query id as tag if user wanted to kill only the yarn jobs after hive server restart. The yarn jobs are - //tagged with query id by default. This will cover the case where the application after restarts wants to kill - //the yarn jobs with query tag. The query tag can be passed as query id. - queryTag = queryId; + if (operationsToKill.isEmpty()) { + LOG.info("Query not found: " + queryIdOrTag); } - - LOG.info("Killing yarn jobs for query id : " + queryId + " using tag :" + queryTag); - killChildYarnJobs(conf, queryTag); - - if (operation != null) { - OperationHandle handle = operation.getHandle(); - operationManager.cancelOperation(handle, errMsg); + boolean admin = isAdmin(); + switch(tagOrId) { + case ID: + Operation operation = operationsToKill.iterator().next(); + boolean canceled = cancelOperation(operation, admin, errMsg); + if (canceled) { + String queryTag = operation.getQueryTag(); + if (queryTag == null) { + queryTag = queryIdOrTag; + } + killChildYarnJobs(conf, queryTag); + } else { + // no privilege to cancel + throw new HiveSQLException("No privilege"); + } + break; + case TAG: + int numCanceled = 0; + for (Operation operationToKill : operationsToKill) { + if (cancelOperation(operationToKill, admin, errMsg)) { + numCanceled++; + } + } + killChildYarnJobs(conf, queryIdOrTag); + if (numCanceled == 0) { + throw new HiveSQLException("No privilege"); + } + break; + case UNKNOWN: + killChildYarnJobs(conf, queryIdOrTag); + break; } } catch (HiveSQLException e) { - LOG.error("Kill query failed for query " + queryId, e); + LOG.error("Kill query failed for query " + queryIdOrTag, e); throw new HiveException(e.getMessage(), e); } }