This is an automated email from the ASF dual-hosted git repository.

sankarh pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new e15bafb0b8d HIVE-27377: Backport of HIVE-24803: WorkloadManager 
doesn't update allocation and metrics after Kill Trigger action (Nikhil Gupta, 
reviewed by Ashish Sharma, Sankar Hariappan)
e15bafb0b8d is described below

commit e15bafb0b8d55093912c2939c764da7736942076
Author: Diksha628 <43694846+diksha...@users.noreply.github.com>
AuthorDate: Tue Sep 26 10:24:10 2023 +0530

    HIVE-27377: Backport of HIVE-24803: WorkloadManager doesn't update 
allocation and metrics after Kill Trigger action (Nikhil Gupta, reviewed by 
Ashish Sharma, Sankar Hariappan)
    
    Signed-off-by: Sankar Hariappan <sank...@apache.org>
    Closes (#4660)
---
 .../apache/hive/jdbc/TestWMMetricsWithTrigger.java | 227 +++++++++++++++++++++
 .../hadoop/hive/ql/exec/tez/WorkloadManager.java   |  38 +++-
 2 files changed, 263 insertions(+), 2 deletions(-)

diff --git 
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestWMMetricsWithTrigger.java
 
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestWMMetricsWithTrigger.java
new file mode 100644
index 00000000000..0af905ea4b9
--- /dev/null
+++ 
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestWMMetricsWithTrigger.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.jdbc;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.metrics.MetricsTestUtils;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+import org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics;
+import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
+import org.apache.hadoop.hive.ql.wm.*;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+public class TestWMMetricsWithTrigger {
+
+  private final Logger LOG = LoggerFactory.getLogger(getClass().getName());
+  private static MiniHS2 miniHS2 = null;
+  private static List<Iterable<AbstractMetric>> metricValues = new 
ArrayList<>();
+  private static final String tableName = "testWmMetricsTriggerTbl";
+  private static final String testDbName = "testWmMetricsTrigger";
+  private static String wmPoolName = "llap";
+
+  public static class SleepMsUDF extends UDF {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TestWMMetricsWithTrigger.class);
+
+    public Integer evaluate(final Integer value, final Integer ms) {
+      try {
+        LOG.info("Sleeping for " + ms + " milliseconds");
+        Thread.sleep(ms);
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted Exception");
+        // No-op
+      }
+      return value;
+    }
+  }
+
+  private static class ExceptionHolder {
+    Throwable throwable;
+  }
+
+  static HiveConf defaultConf() throws Exception {
+    String confDir = "../../data/conf/llap/";
+    if (StringUtils.isNotBlank(confDir)) {
+      HiveConf.setHiveSiteLocation(new URL("file://" + new 
File(confDir).toURI().getPath() + "/hive-site.xml"));
+      System.out.println("Setting hive-site: " + 
HiveConf.getHiveSiteLocation());
+    }
+    HiveConf defaultConf = new HiveConf();
+    defaultConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    defaultConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+    defaultConf.setBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED, 
false);
+    defaultConf.setVar(HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER,
+        "org.apache.hadoop.hive.ql.security.SessionStateUserAuthenticator");
+    defaultConf.addResource(new URL("file://" + new 
File(confDir).toURI().getPath() + "/tez-site.xml"));
+    defaultConf.setTimeVar(HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL, 
100, TimeUnit.MILLISECONDS);
+    defaultConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE, 
"default");
+    defaultConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED, 
true);
+    defaultConf.setVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER, 
MetricsReporting.JSON_FILE.name());
+    // don't want cache hits from llap io for testing filesystem bytes read 
counters
+    defaultConf.setVar(HiveConf.ConfVars.LLAP_IO_MEMORY_MODE, "none");
+    return defaultConf;
+  }
+
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    HiveConf conf = defaultConf();
+
+    Class.forName(MiniHS2.getJdbcDriverName());
+    miniHS2 = new MiniHS2(conf, MiniHS2.MiniClusterType.LLAP);
+    Map<String, String> confOverlay = new HashMap<>();
+    miniHS2.start(confOverlay);
+    miniHS2.getDFS().getFileSystem().mkdirs(new 
Path("/apps_staging_dir/anonymous"));
+
+    Connection conDefault =
+        BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), 
System.getProperty("user.name"), "bar");
+    Statement stmt = conDefault.createStatement();
+    String tblName = testDbName + "." + tableName;
+    String dataFileDir = conf.get("test.data.files").replace('\\', 
'/').replace("c:", "");
+    Path dataFilePath = new Path(dataFileDir, "kv1.txt");
+    String udfName = TestWMMetricsWithTrigger.SleepMsUDF.class.getName();
+    stmt.execute("drop database if exists " + testDbName + " cascade");
+    stmt.execute("create database " + testDbName);
+    stmt.execute("dfs -put " + dataFilePath.toString() + " " + "kv1.txt");
+    stmt.execute("use " + testDbName);
+    stmt.execute("create table " + tblName + " (int_col int, value string) ");
+    stmt.execute("load data inpath 'kv1.txt' into table " + tblName);
+    stmt.execute("create function sleep as '" + udfName + "'");
+    stmt.close();
+    conDefault.close();
+    setupPlanAndTrigger();
+  }
+
+  private static void setupPlanAndTrigger() throws Exception {
+    WorkloadManager wm = WorkloadManager.getInstance();
+    WMPool wmPool = new WMPool("test_plan", wmPoolName);
+    wmPool.setAllocFraction(1.0f);
+    wmPool.setQueryParallelism(1);
+    WMFullResourcePlan resourcePlan = new WMFullResourcePlan(new 
WMResourcePlan("rp"), Lists.newArrayList(wmPool));
+    resourcePlan.getPlan().setDefaultPoolPath(wmPoolName);
+    Expression expression = ExpressionFactory.fromString("EXECUTION_TIME > 
10000");
+    Trigger trigger = new ExecutionTrigger("kill_query", expression, new 
Action(Action.Type.KILL_QUERY));
+    WMTrigger wmTrigger = wmTriggerFromTrigger(trigger);
+    resourcePlan.addToTriggers(wmTrigger);
+    resourcePlan.addToPoolTriggers(new WMPoolTrigger("llap", 
trigger.getName()));
+    wm.updateResourcePlanAsync(resourcePlan).get(10, TimeUnit.SECONDS);
+  }
+
+  @AfterClass
+  public static void afterTest() {
+    if (miniHS2.isStarted()) {
+      miniHS2.stop();
+    }
+    metricValues.clear();
+    metricValues = null;
+  }
+
+  void runQueryWithTrigger(int queryTimeoutSecs) throws Exception {
+    LOG.info("Starting test");
+    String query = "select sleep(t1.int_col + t2.int_col, 500), t1.value from 
" + tableName + " t1 join " + tableName
+        + " t2 on t1.int_col>=t2.int_col";
+    long start = System.currentTimeMillis();
+    Connection conn =
+        BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName), 
System.getProperty("user.name"), "bar");
+    final Statement selStmt = conn.createStatement();
+    Throwable throwable = null;
+    try {
+      if (queryTimeoutSecs > 0) {
+        selStmt.setQueryTimeout(queryTimeoutSecs);
+      }
+      selStmt.execute(query);
+    } catch (SQLException e) {
+      throwable = e;
+    }
+    selStmt.close();
+    assertNotNull("Expected non-null throwable", throwable);
+    assertEquals(SQLException.class, throwable.getClass());
+    assertTrue("Query was killed due to " + throwable.getMessage() + " and not 
because of trigger violation",
+        throwable.getMessage().contains("violated"));
+    long end = System.currentTimeMillis();
+    LOG.info("time taken: {} ms", (end - start));
+  }
+
+  private static WMTrigger wmTriggerFromTrigger(Trigger trigger) {
+    WMTrigger result = new WMTrigger("rp", trigger.getName());
+    result.setTriggerExpression(trigger.getExpression().toString());
+    result.setActionExpression(trigger.getAction().toString());
+    return result;
+  }
+
+  @Test(timeout = 30000)
+  public void testWmPoolMetricsAfterKillTrigger() throws Exception {
+    verifyMetrics(0, 4, 1, 0);
+
+    ExceptionHolder stmtHolder = new ExceptionHolder();
+    // Run Query with Kill Trigger in place in a separate thread
+    Thread tExecute = new Thread(() -> {
+      try {
+        runQueryWithTrigger(10);
+      } catch (Exception e) {
+        LOG.error("Exception while executing runQueryWithTrigger", e);
+        stmtHolder.throwable = e;
+      }
+    });
+    tExecute.start();
+
+    //Wait for Workload Manager main thread to update the metrics after query 
enters processing.
+    Thread.sleep(5000);
+    verifyMetrics(4, 4, 1, 1);
+
+    tExecute.join();
+    assertNull("Exception while executing statement", stmtHolder.throwable);
+
+    //Wait for Workload Manager main thread to update the metrics after kill 
query succeeded.
+    Thread.sleep(10000);
+
+    //Metrics should reset to original value after query is killed
+    verifyMetrics(0, 4, 1, 0);
+
+  }
+
+  private static void verifyMetrics(int numExecutors, int numExecutorsMax, int 
numParallelQueries, int numRunningQueries)
+      throws Exception {
+    CodahaleMetrics metrics = (CodahaleMetrics) MetricsFactory.getInstance();
+    String json = metrics.dumpJson();
+    MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.GAUGE, 
"WM_llap_numExecutors", numExecutors);
+    MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.GAUGE, 
"WM_llap_numExecutorsMax", numExecutorsMax);
+    MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.GAUGE, 
"WM_llap_numParallelQueries", numParallelQueries);
+    MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.GAUGE, 
"WM_llap_numRunningQueries", numRunningQueries);
+  }
+
+}
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index e3976aca1a2..9029285835c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -721,6 +721,9 @@ public class WorkloadManager extends 
TezSessionPoolSession.AbstractTriggerValida
           }
           wmEvent.endEvent(ctx.session);
         }
+
+        // Running-query metrics need to be updated for the pool
+        updatePoolMetricsAfterKillTrigger(poolsToRedistribute, ctx);
         break;
       }
       case RESTART_REQUIRED: {
@@ -731,6 +734,9 @@ public class WorkloadManager extends 
TezSessionPoolSession.AbstractTriggerValida
         //       "in use". That is because all the user ops above like return, 
reopen, etc.
         //       don't actually return/reopen/... when kill query is in 
progress.
         syncWork.toRestartInUse.add(ctx.session);
+
+        // Running-query metrics need to be updated for the pool
+        updatePoolMetricsAfterKillTrigger(poolsToRedistribute, ctx);
         break;
       }
       default: throw new AssertionError("Unknown state " + kr);
@@ -786,6 +792,19 @@ public class WorkloadManager extends 
TezSessionPoolSession.AbstractTriggerValida
     }
   }
 
+  /**
+   * Bookkeeping for a query killed by a trigger. It adds the pool name recorded in the kill context
+   * to {@code poolsToRedistribute} and removes one running query from that pool's metrics.
+   *
+   * @param poolsToRedistribute set of pool names to redistribute; the killed query's pool is added
+   * @param ctx kill context; nothing happens if its recorded pool name is blank
+   */
+  private void updatePoolMetricsAfterKillTrigger(HashSet<String> poolsToRedistribute, KillQueryContext ctx) {
+    String poolName = ctx.getPoolName();
+    if (StringUtils.isBlank(poolName)) {
+      return;
+    }
+    poolsToRedistribute.add(poolName);
+    PoolState pool = pools.get(poolName);
+    if (pool == null || pool.metrics == null) {
+      return;
+    }
+    // Guard so the message (and the gauge read) is only built when debug logging is enabled.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format("Removing 1 query from pool %s, Current numRunningQueries: %s", pool.fullName,
+          pool.metrics.numRunningQueries.value()));
+    }
+    pool.metrics.removeRunningQueries(1);
+  }
+
   private void dumpPoolState(PoolState ps, List<String> set) {
     StringBuilder sb = new StringBuilder();
     sb.append("POOL ").append(ps.fullName).append(": qp 
").append(ps.queryParallelism)
@@ -2272,6 +2291,7 @@ public class WorkloadManager extends 
TezSessionPoolSession.AbstractTriggerValida
    */
   static final class KillQueryContext {
     private SettableFuture<Boolean> killSessionFuture;
+    private String poolName;
     private final String reason;
     private final WmTezSession session;
     // Note: all the fields are only modified by master thread.
@@ -2318,6 +2338,14 @@ public class WorkloadManager extends 
TezSessionPoolSession.AbstractTriggerValida
       return KillQueryResult.OK;
     }
 
+    /**
+     * @return the WM pool name recorded for the session when this kill was scheduled, or
+     *         {@code null} if no non-blank name was recorded
+     */
+    String getPoolName() {
+      return this.poolName;
+    }
+
+    /**
+     * Records the WM pool name of the session being killed so it can be read back later.
+     *
+     * @param name pool name to remember
+     */
+    void setPoolName(final String name) {
+      this.poolName = name;
+    }
+
     @Override
     public String toString() {
       return "KillQueryContext [isUserDone=" + isUserDone + ", isKillDone=" + 
isKillDone
@@ -2340,13 +2368,19 @@ public class WorkloadManager extends 
TezSessionPoolSession.AbstractTriggerValida
     KillQueryContext killQueryContext, Map<WmTezSession, GetRequest> toReuse) {
 
     WmTezSession toKill = killQueryContext.session;
+    String poolName = toKill.getPoolName();
+
+    boolean validPoolName = StringUtils.isNotBlank(poolName);
+
+    if (validPoolName) {
+      killQueryContext.setPoolName(poolName);
+    }
     toKillQuery.put(toKill, killQueryContext);
 
     // The way this works is, a session in WM pool will move back to tez AM 
pool on a kill and will get
     // reassigned back to WM pool on GetRequest based on user pool mapping. 
Only if we remove the session from active
     // sessions list of its WM pool will the queue'd GetRequest be processed
-    String poolName = toKill.getPoolName();
-    if (poolName != null) {
+    if (validPoolName) {
       PoolState poolState = pools.get(poolName);
       if (poolState != null) {
         poolState.getSessions().remove(toKill);

Reply via email to