This is an automated email from the ASF dual-hosted git repository.

bslim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new e7f2fccd HIVE-21785: Add task queue/runtime stats per LLAP daemon to output (Oliver Draese via Slim Bouguerra)
e7f2fccd is described below

commit e7f2fccd72f325e7cb94a15c4d4b6733a72662a0
Author: Olli Draese <odra...@cloudera.com>
AuthorDate: Wed Jun 5 14:19:31 2019 -0700

    HIVE-21785: Add task queue/runtime stats per LLAP daemon to output (Oliver Draese via Slim Bouguerra)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java  |  8 +++++---
 .../hive/llap/counters/WmFragmentCounters.java      | 21 ++++++++++++++++++++-
 .../hive/llap/daemon/impl/ContainerRunnerImpl.java  | 15 +++++++++------
 .../hive/llap/daemon/impl/TaskExecutorService.java  |  4 +---
 4 files changed, 35 insertions(+), 13 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 1c0aa2a..148cf7e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2755,7 +2755,7 @@ public class HiveConf extends Configuration {
     COMPACTOR_CRUD_QUERY_BASED("hive.compactor.crud.query.based", false,
         "Means Major compaction on full CRUD tables is done as a query, "
         + "and minor compaction will be disabled."),
-    SPLIT_GROUPING_MODE("hive.split.grouping.mode", "query", new 
StringSet("query", "compactor"), 
+    SPLIT_GROUPING_MODE("hive.split.grouping.mode", "query", new 
StringSet("query", "compactor"),
         "This is set to compactor from within the query based compactor. This 
enables the Tez SplitGrouper "
         + "to group splits based on their bucket number, so that all rows from 
different bucket files "
         + " for the same bucket number can end up in the same bucket file 
after the compaction."),
@@ -4407,7 +4407,7 @@ public class HiveConf extends Configuration {
     LLAP_CLIENT_CONSISTENT_SPLITS("hive.llap.client.consistent.splits", true,
         "Whether to setup split locations to match nodes on which llap daemons 
are running, " +
         "preferring one of the locations provided by the split itself. If 
there is no llap daemon " +
-        "running on any of those locations (or on the cloud), fall back to a 
cache affinity to" + 
+        "running on any of those locations (or on the cloud), fall back to a 
cache affinity to" +
         " an LLAP node. This is effective only if hive.execution.mode is 
llap."),
     LLAP_VALIDATE_ACLS("hive.llap.validate.acls", true,
         "Whether LLAP should reject permissive ACLs in some cases (e.g. its 
own management\n" +
@@ -4448,7 +4448,9 @@ public class HiveConf extends Configuration {
     LLAP_COLLECT_LOCK_METRICS("hive.llap.lockmetrics.collect", false,
         "Whether lock metrics (wait times, counts) are collected for LLAP "
         + "related locks"),
-
+    LLAP_TASK_TIME_SUMMARY(
+        "hive.llap.task.time.print.summary", false,
+        "Display queue and runtime of tasks by host for every query executed 
by the shell."),
     HIVE_TRIGGER_VALIDATION_INTERVAL("hive.trigger.validation.interval", 
"500ms",
       new TimeValidator(TimeUnit.MILLISECONDS),
       "Interval for validating triggers during execution of a query. Triggers 
defined in resource plan will get\n" +
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java
index e4dfe4e..231c962 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.llap.counters;
 
 import java.util.concurrent.atomic.AtomicLongArray;
 
+import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
+import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TezCounters;
 
 /**
@@ -31,10 +33,12 @@ public class WmFragmentCounters {
   private LlapWmCounters currentCounter = null;
   private long currentCounterStartTime = 0;
   private final AtomicLongArray fixedCounters;
+  private final boolean addTaskTimeCounters;
 
-  public WmFragmentCounters() {
+  public WmFragmentCounters(boolean addTaskTimeCounters) {
     // Note: WmFragmentCounters are created before Tez counters are created.
     this.fixedCounters = new AtomicLongArray(LlapWmCounters.values().length);
+    this.addTaskTimeCounters = addTaskTimeCounters;
   }
 
   public void changeStateQueued(boolean isGuaranteed) {
@@ -119,6 +123,21 @@ public class WmFragmentCounters {
     for (int i = 0; i < fixedCounters.length(); ++i) {
       
tezCounters.findCounter(LlapWmCounters.values()[i]).setValue(fixedCounters.get(i));
     }
+
+    // add queue and runtime (together with task count) on a "per daemon" level
+    // to the Tez counters.
+    if (addTaskTimeCounters) {
+      String hostName = MetricsUtils.getHostName();
+      long queued = 
fixedCounters.get(LlapWmCounters.GUARANTEED_QUEUED_NS.ordinal())
+                    + 
fixedCounters.get(LlapWmCounters.SPECULATIVE_QUEUED_NS.ordinal());
+      long running = 
fixedCounters.get(LlapWmCounters.GUARANTEED_RUNNING_NS.ordinal())
+                     + 
fixedCounters.get(LlapWmCounters.SPECULATIVE_RUNNING_NS.ordinal());
+
+      CounterGroup cg = tezCounters.getGroup("LlapTaskRuntimeAgg by daemon");
+      cg.findCounter("QueueTime-" + hostName).setValue(queued);
+      cg.findCounter("RunTime-" + hostName).setValue(running);
+      cg.findCounter("Count-" + hostName).setValue(1);
+    }
   }
 
   @Override
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 7a3ca2f..44e2fa9 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -27,12 +27,11 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.UgiFactory;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.DaemonId;
 import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.NotTezEventHelper;
-import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
-import org.apache.hadoop.hive.llap.counters.LlapWmCounters;
 import org.apache.hadoop.hive.llap.counters.WmFragmentCounters;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
@@ -65,7 +64,6 @@ import 
org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.hive.llap.security.LlapSignerImpl;
 import org.apache.hadoop.hive.llap.tez.Converters;
 import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
-import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -73,7 +71,6 @@ import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
 import org.apache.log4j.NDC;
-import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -142,7 +139,6 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
     this.executorService = executorService;
     completionListener = (SchedulerFragmentCompletingListener) executorService;
 
-
     // Distribute the available memory between the tasks.
     this.memoryPerExecutor = (long)(totalMemoryAvailableBytes / (float) 
numExecutors);
     this.metrics = metrics;
@@ -280,8 +276,15 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
       Configuration callableConf = new Configuration(getConfig());
       UserGroupInformation fsTaskUgi = fsUgiFactory == null ? null : 
fsUgiFactory.createUgi();
       boolean isGuaranteed = request.hasIsGuaranteed() && 
request.getIsGuaranteed();
+
+      // enable the printing of (per daemon) LLAP task queue/run times via 
LLAP_TASK_TIME_SUMMARY
+      ConfVars tezSummary = ConfVars.TEZ_EXEC_SUMMARY;
+      ConfVars llapTasks = ConfVars.LLAP_TASK_TIME_SUMMARY;
+      boolean addTaskTimes = callableConf.getBoolean(tezSummary.varname, 
tezSummary.defaultBoolVal)
+                             && callableConf.getBoolean(llapTasks.varname, 
llapTasks.defaultBoolVal);
+
       // TODO: ideally we'd register TezCounters here, but it seems impossible 
before registerTask.
-      WmFragmentCounters wmCounters = new WmFragmentCounters();
+      WmFragmentCounters wmCounters = new WmFragmentCounters(addTaskTimes);
       TaskRunnerCallable callable = new TaskRunnerCallable(request, 
fragmentInfo, callableConf,
           new ExecutionContextImpl(localAddress.get().getHostName()), env,
           credentials, memoryPerExecutor, amReporter, confParams, metrics, 
killedTaskHandler,
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index aaf9674..82bb06a 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -44,7 +44,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
 import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
 import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener;
 import 
org.apache.hadoop.hive.llap.daemon.impl.comparator.LlapQueueComparatorBase;
@@ -54,7 +53,6 @@ import 
org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.util.Clock;
-import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.runtime.task.EndReason;
 import org.apache.tez.runtime.task.TaskRunner2Result;
 import org.slf4j.Logger;
@@ -680,8 +678,8 @@ public class TaskExecutorService extends AbstractService
       LOG.info("Attempting to execute {}", taskWrapper);
     }
     TaskRunnerCallable task = taskWrapper.getTaskRunnerCallable();
-    task.setWmCountersRunning();
     ListenableFuture<TaskRunner2Result> future = executorService.submit(task);
+    task.setWmCountersRunning();
     runningFragmentCount.incrementAndGet();
     taskWrapper.setIsInWaitQueue(false);
 

Reply via email to