Repository: hive Updated Branches: refs/heads/master f7dea1060 -> acc62e3d5
HIVE-18628: Make tez dag status check interval configurable (Prasanth Jayachandran reviewed by Sergey Shelukhin) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/acc62e3d Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/acc62e3d Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/acc62e3d Branch: refs/heads/master Commit: acc62e3d53c03bbc1f2d72362b0d4661c9f419fb Parents: f7dea10 Author: Prasanth Jayachandran <prasan...@apache.org> Authored: Wed Feb 7 00:57:12 2018 -0800 Committer: Prasanth Jayachandran <prasan...@apache.org> Committed: Wed Feb 7 00:57:12 2018 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 4 +- .../hive/jdbc/AbstractJdbcTriggersTest.java | 2 +- .../jdbc/TestTriggersMoveWorkloadManager.java | 39 +++++++++++++++++++- .../hive/jdbc/TestTriggersWorkloadManager.java | 2 +- .../hive/ql/exec/tez/TezSessionPoolManager.java | 2 +- .../ql/exec/tez/TriggerValidatorRunnable.java | 3 ++ .../hive/ql/exec/tez/WorkloadManager.java | 2 +- .../ql/exec/tez/monitoring/TezJobMonitor.java | 11 +++--- 8 files changed, 54 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/acc62e3d/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 99e8457..eca3573 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3120,6 +3120,8 @@ public class HiveConf extends Configuration { true, "Allows hive server 2 to send progress bar update information. This is currently available" + " only if the execution engine is tez."), + TEZ_DAG_STATUS_CHECK_INTERVAL("hive.tez.dag.status.check.interval", "500ms", + new TimeValidator(TimeUnit.MILLISECONDS), "Interval between subsequent DAG status invocation."), SPARK_EXEC_INPLACE_PROGRESS("hive.spark.exec.inplace.progress", true, "Updates spark job execution progress in-place in the terminal."), TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION("hive.tez.container.max.java.heap.fraction", 0.8f, @@ -3532,7 +3534,7 @@ public class HiveConf extends Configuration { Constants.LLAP_LOGGER_NAME_CONSOLE), "logger used for llap-daemons."), - HIVE_TRIGGER_VALIDATION_INTERVAL_MS("hive.trigger.validation.interval.ms", "500ms", + HIVE_TRIGGER_VALIDATION_INTERVAL("hive.trigger.validation.interval", "500ms", new TimeValidator(TimeUnit.MILLISECONDS), "Interval for validating triggers during execution of a query. Triggers defined in resource plan will get\n" + "validated for all SQL operations after every defined interval (default: 500ms) and corresponding action\n" + http://git-wip-us.apache.org/repos/asf/hive/blob/acc62e3d/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java index 62ee66f..e1d8ab2 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java @@ -69,7 +69,7 @@ public abstract class AbstractJdbcTriggersTest { conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); conf.setVar(ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "default"); - conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, 100, TimeUnit.MILLISECONDS); + conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL, 100, TimeUnit.MILLISECONDS); conf.setBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS, true); conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true); conf.setBoolVar(ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false); http://git-wip-us.apache.org/repos/asf/hive/blob/acc62e3d/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java index 74ca958..8aca2a6 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java @@ -59,7 +59,8 @@ public class TestTriggersMoveWorkloadManager extends AbstractJdbcTriggersTest { conf = new HiveConf(); conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); - conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, 100, TimeUnit.MILLISECONDS); + conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL, 50, TimeUnit.MILLISECONDS); + conf.setTimeVar(ConfVars.TEZ_DAG_STATUS_CHECK_INTERVAL, 50, TimeUnit.MILLISECONDS); conf.setVar(ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE, "default"); conf.setBoolean("hive.test.workload.management", true); conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true); @@ -185,6 +186,42 @@ public class TestTriggersMoveWorkloadManager extends AbstractJdbcTriggersTest { runQueryWithTrigger(query, setCmds, killTrigger + " violated", errCaptureExpect); } + // TODO: disabling this test as tez publishes counters only after task completion which will cause write side counters + // to be not validated correctly (DAG will be completed before validation) +// @Test(timeout = 60000) +// public void testTriggerMoveKill() throws Exception { +// Expression moveExpression1 = ExpressionFactory.fromString("HDFS_BYTES_READ > 100"); +// Expression moveExpression2 = ExpressionFactory.fromString("HDFS_BYTES_WRITTEN > 200"); +// Trigger moveTrigger1 = new ExecutionTrigger("move_big_read", moveExpression1, +// new Action(Action.Type.MOVE_TO_POOL, "ETL")); +// Trigger killTrigger = new ExecutionTrigger("big_write_kill", moveExpression2, +// new Action(Action.Type.KILL_QUERY)); +// setupTriggers(Lists.newArrayList(moveTrigger1), Lists.newArrayList(killTrigger)); +// String query = "select t1.under_col, t1.value from " + tableName + " t1 join " + tableName + +// " t2 on t1.under_col>=t2.under_col order by t1.under_col, t1.value"; +// List<String> setCmds = new ArrayList<>(); +// setCmds.add("set hive.tez.session.events.print.summary=json"); +// setCmds.add("set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); +// setCmds.add("set hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.PostExecWMEventsSummaryPrinter"); +// List<String> errCaptureExpect = new ArrayList<>(); +// errCaptureExpect.add("Workload Manager Events Summary"); +// errCaptureExpect.add("Event: GET Pool: BI Cluster %: 80.00"); +// errCaptureExpect.add("Event: MOVE Pool: ETL Cluster %: 20.00"); +// errCaptureExpect.add("Event: KILL Pool: null Cluster %: 0.00"); +// errCaptureExpect.add("Event: RETURN Pool: null Cluster %: 0.00"); +// errCaptureExpect.add("\"eventType\" : \"GET\""); +// errCaptureExpect.add("\"eventType\" : \"MOVE\""); +// errCaptureExpect.add("\"eventType\" : \"KILL\""); +// errCaptureExpect.add("\"eventType\" : \"RETURN\""); +// errCaptureExpect.add("\"name\" : \"move_big_read\""); +// errCaptureExpect.add("\"name\" : \"big_write_kill\""); +// // violation in BI queue +// errCaptureExpect.add("\"violationMsg\" : \"Trigger " + moveTrigger1 + " violated"); +// // violation in ETL queue +// errCaptureExpect.add("\"violationMsg\" : \"Trigger " + killTrigger + " violated"); +// runQueryWithTrigger(query, setCmds, killTrigger + " violated", errCaptureExpect); +// } + @Test(timeout = 60000) public void testTriggerMoveConflictKill() throws Exception { Expression moveExpression = ExpressionFactory.fromString("HDFS_BYTES_READ > 100"); http://git-wip-us.apache.org/repos/asf/hive/blob/acc62e3d/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java index 285e533..85391ac 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java @@ -51,7 +51,7 @@ public class TestTriggersWorkloadManager extends TestTriggersTezSessionPoolManag conf = new HiveConf(); conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); - conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, 100, TimeUnit.MILLISECONDS); + conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL, 100, TimeUnit.MILLISECONDS); conf.setVar(ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE, "default"); conf.setBoolean("hive.test.workload.management", true); conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true); http://git-wip-us.apache.org/repos/asf/hive/blob/acc62e3d/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 82fdf6c..d0b32b8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -182,7 +182,7 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger public void initTriggers(final HiveConf conf) throws HiveException { if (triggerValidatorRunnable == null) { final long triggerValidationIntervalMs = HiveConf.getTimeVar(conf, ConfVars - .HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS); + .HIVE_TRIGGER_VALIDATION_INTERVAL, TimeUnit.MILLISECONDS); sessionTriggerProvider = new SessionTriggerProvider(openSessions, new LinkedList<>()); triggerActionHandler = new KillTriggerActionHandler(); triggerValidatorRunnable = new TriggerValidatorRunnable( http://git-wip-us.apache.org/repos/asf/hive/blob/acc62e3d/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java index 6414f05..9ccaa1f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java @@ -53,6 +53,9 @@ public class TriggerValidatorRunnable implements Runnable { for (Trigger currentTrigger : triggers) { String desiredCounter = currentTrigger.getExpression().getCounterLimit().getName(); // there could be interval where desired counter value is not populated by the time we make this check + if (LOG.isDebugEnabled()) { + LOG.debug("Validating trigger: {} against currentCounters: {}", currentTrigger, currentCounters); + } if (currentCounters.containsKey(desiredCounter)) { long currentCounterValue = currentCounters.get(desiredCounter); if (currentTrigger.apply(currentCounterValue)) { http://git-wip-us.apache.org/repos/asf/hive/blob/acc62e3d/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java index 98505b6..915b016 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -251,7 +251,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida allocationManager.start(); final long triggerValidationIntervalMs = HiveConf.getTimeVar(conf, - HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS); + HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL, TimeUnit.MILLISECONDS); TriggerActionHandler<?> triggerActionHandler = new KillMoveTriggerActionHandler(this); triggerValidatorRunnable = new PerPoolTriggerValidatorRunnable(perPoolProviders, triggerActionHandler, triggerValidationIntervalMs); http://git-wip-us.apache.org/repos/asf/hive/blob/acc62e3d/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java index 3558475..166ecfc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java @@ -69,10 +69,9 @@ import com.google.common.base.Preconditions; * completion. */ public class TezJobMonitor { + private static final Logger LOG = LoggerFactory.getLogger(TezJobMonitor.class); static final String CLASS_NAME = TezJobMonitor.class.getName(); - private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); - private static final int MIN_CHECK_INTERVAL = 200; private static final int MAX_CHECK_INTERVAL = 1000; private static final int MAX_RETRY_INTERVAL = 2500; private static final int MAX_RETRY_FAILURES = (MAX_RETRY_INTERVAL / MAX_CHECK_INTERVAL) + 1; @@ -157,7 +156,8 @@ public class TezJobMonitor { DAGStatus.State lastState = null; boolean running = false; - long checkInterval = MIN_CHECK_INTERVAL; + long checkInterval = HiveConf.getTimeVar(hiveConf, HiveConf.ConfVars.TEZ_DAG_STATUS_CHECK_INTERVAL, + TimeUnit.MILLISECONDS); WmContext wmContext = null; while (true) { @@ -179,6 +179,9 @@ public class TezJobMonitor { if (desiredCounters != null && !desiredCounters.isEmpty()) { Map<String, Long> currentCounters = getCounterValues(dagCounters, vertexNames, vertexProgressMap, desiredCounters, done); + if (LOG.isDebugEnabled()) { + LOG.debug("Requested DAG status. checkInterval: {}. currentCounters: {}", checkInterval, currentCounters); + } wmContext.setCurrentCounters(currentCounters); } } @@ -204,8 +207,6 @@ public class TezJobMonitor { console.printInfo("Status: Running (" + dagClient.getExecutionContext() + ")\n"); this.executionStartTime = System.currentTimeMillis(); running = true; - // from running -> failed/succeeded, the AM breaks out of timeouts - checkInterval = MAX_CHECK_INTERVAL; } updateFunction.update(status, vertexProgressMap); break;