This is an automated email from the ASF dual-hosted git repository. sankarh pushed a commit to branch branch-3 in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/branch-3 by this push: new e15bafb0b8d HIVE-27377: Backport of HIVE-24803: WorkloadManager doesn't update allocation and metrics after Kill Trigger action (Nikhil Gupta, reviewed by Ashish Sharma, Sankar Hariappan) e15bafb0b8d is described below commit e15bafb0b8d55093912c2939c764da7736942076 Author: Diksha628 <43694846+diksha...@users.noreply.github.com> AuthorDate: Tue Sep 26 10:24:10 2023 +0530 HIVE-27377: Backport of HIVE-24803: WorkloadManager doesn't update allocation and metrics after Kill Trigger action (Nikhil Gupta, reviewed by Ashish Sharma, Sankar Hariappan) Signed-off-by: Sankar Hariappan <sank...@apache.org> Closes (#4660) --- .../apache/hive/jdbc/TestWMMetricsWithTrigger.java | 227 +++++++++++++++++++++ .../hadoop/hive/ql/exec/tez/WorkloadManager.java | 38 +++- 2 files changed, 263 insertions(+), 2 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestWMMetricsWithTrigger.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestWMMetricsWithTrigger.java new file mode 100644 index 00000000000..0af905ea4b9 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestWMMetricsWithTrigger.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.jdbc; + +import com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.metrics.MetricsTestUtils; +import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; +import org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics; +import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.ql.exec.UDF; +import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager; +import org.apache.hadoop.hive.ql.wm.*; +import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hive.jdbc.miniHS2.MiniHS2; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.net.URL; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.*; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +public class TestWMMetricsWithTrigger { + + private final Logger LOG = LoggerFactory.getLogger(getClass().getName()); + private static MiniHS2 miniHS2 = null; + private static List<Iterable<AbstractMetric>> metricValues = new ArrayList<>(); + private static final String tableName = "testWmMetricsTriggerTbl"; + private static final String testDbName = "testWmMetricsTrigger"; + private static String wmPoolName = "llap"; + + public static class SleepMsUDF extends UDF { + private static final Logger LOG = LoggerFactory.getLogger(TestWMMetricsWithTrigger.class); + + public Integer evaluate(final Integer value, final Integer ms) { + try { + LOG.info("Sleeping for " + ms + " milliseconds"); + Thread.sleep(ms); + } catch (InterruptedException e) { + LOG.warn("Interrupted Exception"); + // No-op + } + return value; + } + } + + private static class ExceptionHolder { + Throwable throwable; + } + + static HiveConf defaultConf() throws Exception { + String confDir = "../../data/conf/llap/"; + if (StringUtils.isNotBlank(confDir)) { + HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml")); + System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation()); + } + HiveConf defaultConf = new HiveConf(); + defaultConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + defaultConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); + defaultConf.setBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED, false); + defaultConf.setVar(HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER, + "org.apache.hadoop.hive.ql.security.SessionStateUserAuthenticator"); + defaultConf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + "/tez-site.xml")); + defaultConf.setTimeVar(HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL, 100, TimeUnit.MILLISECONDS); + defaultConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE, "default"); + defaultConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED, true); + defaultConf.setVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER, MetricsReporting.JSON_FILE.name()); + // don't want cache hits from llap io for testing filesystem bytes read counters + defaultConf.setVar(HiveConf.ConfVars.LLAP_IO_MEMORY_MODE, "none"); + return defaultConf; + } + + @BeforeClass + public static void beforeTest() throws Exception { + HiveConf conf = defaultConf(); + + Class.forName(MiniHS2.getJdbcDriverName()); + miniHS2 = new MiniHS2(conf, MiniHS2.MiniClusterType.LLAP); + Map<String, String> confOverlay = new HashMap<>(); + miniHS2.start(confOverlay); + miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous")); + + Connection conDefault = + BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar"); + Statement stmt = conDefault.createStatement(); + String tblName = testDbName + "." + tableName; + String dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); + Path dataFilePath = new Path(dataFileDir, "kv1.txt"); + String udfName = TestWMMetricsWithTrigger.SleepMsUDF.class.getName(); + stmt.execute("drop database if exists " + testDbName + " cascade"); + stmt.execute("create database " + testDbName); + stmt.execute("dfs -put " + dataFilePath.toString() + " " + "kv1.txt"); + stmt.execute("use " + testDbName); + stmt.execute("create table " + tblName + " (int_col int, value string) "); + stmt.execute("load data inpath 'kv1.txt' into table " + tblName); + stmt.execute("create function sleep as '" + udfName + "'"); + stmt.close(); + conDefault.close(); + setupPlanAndTrigger(); + } + + private static void setupPlanAndTrigger() throws Exception { + WorkloadManager wm = WorkloadManager.getInstance(); + WMPool wmPool = new WMPool("test_plan", wmPoolName); + wmPool.setAllocFraction(1.0f); + wmPool.setQueryParallelism(1); + WMFullResourcePlan resourcePlan = new WMFullResourcePlan(new WMResourcePlan("rp"), Lists.newArrayList(wmPool)); + resourcePlan.getPlan().setDefaultPoolPath(wmPoolName); + Expression expression = ExpressionFactory.fromString("EXECUTION_TIME > 10000"); + Trigger trigger = new ExecutionTrigger("kill_query", expression, new Action(Action.Type.KILL_QUERY)); + WMTrigger wmTrigger = wmTriggerFromTrigger(trigger); + resourcePlan.addToTriggers(wmTrigger); + resourcePlan.addToPoolTriggers(new WMPoolTrigger("llap", trigger.getName())); + wm.updateResourcePlanAsync(resourcePlan).get(10, TimeUnit.SECONDS); + } + + @AfterClass + public static void afterTest() { + if (miniHS2.isStarted()) { + miniHS2.stop(); + } + metricValues.clear(); + metricValues = null; + } + + void runQueryWithTrigger(int queryTimeoutSecs) throws Exception { + LOG.info("Starting test"); + String query = "select sleep(t1.int_col + t2.int_col, 500), t1.value from " + tableName + " t1 join " + tableName + + " t2 on t1.int_col>=t2.int_col"; + long start = System.currentTimeMillis(); + Connection conn = + BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName), System.getProperty("user.name"), "bar"); + final Statement selStmt = conn.createStatement(); + Throwable throwable = null; + try { + if (queryTimeoutSecs > 0) { + selStmt.setQueryTimeout(queryTimeoutSecs); + } + selStmt.execute(query); + } catch (SQLException e) { + throwable = e; + } + selStmt.close(); + assertNotNull("Expected non-null throwable", throwable); + assertEquals(SQLException.class, throwable.getClass()); + assertTrue("Query was killed due to " + throwable.getMessage() + " and not because of trigger violation", + throwable.getMessage().contains("violated")); + long end = System.currentTimeMillis(); + LOG.info("time taken: {} ms", (end - start)); + } + + private static WMTrigger wmTriggerFromTrigger(Trigger trigger) { + WMTrigger result = new WMTrigger("rp", trigger.getName()); + result.setTriggerExpression(trigger.getExpression().toString()); + result.setActionExpression(trigger.getAction().toString()); + return result; + } + + @Test(timeout = 30000) + public void testWmPoolMetricsAfterKillTrigger() throws Exception { + verifyMetrics(0, 4, 1, 0); + + ExceptionHolder stmtHolder = new ExceptionHolder(); + // Run Query with Kill Trigger in place in a separate thread + Thread tExecute = new Thread(() -> { + try { + runQueryWithTrigger(10); + } catch (Exception e) { + LOG.error("Exception while executing runQueryWithTrigger", e); + stmtHolder.throwable = e; + } + }); + tExecute.start(); + + //Wait for Workload Manager main thread to update the metrics after query enters processing. + Thread.sleep(5000); + verifyMetrics(4, 4, 1, 1); + + tExecute.join(); + assertNull("Exception while executing statement", stmtHolder.throwable); + + //Wait for Workload Manager main thread to update the metrics after kill query succeeded. + Thread.sleep(10000); + + //Metrics should reset to original value after query is killed + verifyMetrics(0, 4, 1, 0); + + } + + private static void verifyMetrics(int numExecutors, int numExecutorsMax, int numParallelQueries, int numRunningQueries) + throws Exception { + CodahaleMetrics metrics = (CodahaleMetrics) MetricsFactory.getInstance(); + String json = metrics.dumpJson(); + MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.GAUGE, "WM_llap_numExecutors", numExecutors); + MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.GAUGE, "WM_llap_numExecutorsMax", numExecutorsMax); + MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.GAUGE, "WM_llap_numParallelQueries", numParallelQueries); + MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.GAUGE, "WM_llap_numRunningQueries", numRunningQueries); + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java index e3976aca1a2..9029285835c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -721,6 +721,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida } wmEvent.endEvent(ctx.session); } + + // Running query metrics needs to be updated for the pool + updatePoolMetricsAfterKillTrigger(poolsToRedistribute, ctx); break; } case RESTART_REQUIRED: { @@ -731,6 +734,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida // "in use". That is because all the user ops above like return, reopen, etc. // don't actually return/reopen/... when kill query is in progress. syncWork.toRestartInUse.add(ctx.session); + + // Running query metrics needs to be updated for the pool + updatePoolMetricsAfterKillTrigger(poolsToRedistribute, ctx); break; } default: throw new AssertionError("Unknown state " + kr); @@ -786,6 +792,19 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida } } + private void updatePoolMetricsAfterKillTrigger(HashSet<String> poolsToRedistribute, KillQueryContext ctx) { + String poolName = ctx.getPoolName(); + if (StringUtils.isNotBlank(poolName)) { + poolsToRedistribute.add(poolName); + PoolState pool = pools.get(poolName); + if ((pool != null) && (pool.metrics != null)) { + LOG.debug(String.format("Removing 1 query from pool %s, Current numRunningQueries: %s", pool.fullName, + pool.metrics.numRunningQueries.value())); + pool.metrics.removeRunningQueries(1); + } + } + } + private void dumpPoolState(PoolState ps, List<String> set) { StringBuilder sb = new StringBuilder(); sb.append("POOL ").append(ps.fullName).append(": qp ").append(ps.queryParallelism) @@ -2272,6 +2291,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida */ static final class KillQueryContext { private SettableFuture<Boolean> killSessionFuture; + private String poolName; private final String reason; private final WmTezSession session; // Note: all the fields are only modified by master thread. @@ -2318,6 +2338,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida return KillQueryResult.OK; } + String getPoolName() { + return poolName; + } + + void setPoolName(String poolName) { + this.poolName = poolName; + } + @Override public String toString() { return "KillQueryContext [isUserDone=" + isUserDone + ", isKillDone=" + isKillDone @@ -2340,13 +2368,19 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida KillQueryContext killQueryContext, Map<WmTezSession, GetRequest> toReuse) { WmTezSession toKill = killQueryContext.session; + String poolName = toKill.getPoolName(); + + boolean validPoolName = StringUtils.isNotBlank(poolName); + + if (validPoolName) { + killQueryContext.setPoolName(poolName); + } toKillQuery.put(toKill, killQueryContext); // The way this works is, a session in WM pool will move back to tez AM pool on a kill and will get // reassigned back to WM pool on GetRequest based on user pool mapping. Only if we remove the session from active // sessions list of its WM pool will the queue'd GetRequest be processed - String poolName = toKill.getPoolName(); - if (poolName != null) { + if (validPoolName) { PoolState poolState = pools.get(poolName); if (poolState != null) { poolState.getSessions().remove(toKill);