This is an automated email from the ASF dual-hosted git repository.

bslim pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new e7f2fccd HIVE-21785: Add task queue/runtime stats per LLAP daemon to output (Oliver Draese via Slim Bouguerra) e7f2fccd is described below commit e7f2fccd72f325e7cb94a15c4d4b6733a72662a0 Author: Olli Draese <odra...@cloudera.com> AuthorDate: Wed Jun 5 14:19:31 2019 -0700 HIVE-21785: Add task queue/runtime stats per LLAP daemon to output (Oliver Draese via Slim Bouguerra) --- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 8 +++++--- .../hive/llap/counters/WmFragmentCounters.java | 21 ++++++++++++++++++++- .../hive/llap/daemon/impl/ContainerRunnerImpl.java | 15 +++++++++------ .../hive/llap/daemon/impl/TaskExecutorService.java | 4 +--- 4 files changed, 35 insertions(+), 13 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 1c0aa2a..148cf7e 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2755,7 +2755,7 @@ public class HiveConf extends Configuration { COMPACTOR_CRUD_QUERY_BASED("hive.compactor.crud.query.based", false, "Means Major compaction on full CRUD tables is done as a query, " + "and minor compaction will be disabled."), - SPLIT_GROUPING_MODE("hive.split.grouping.mode", "query", new StringSet("query", "compactor"), + SPLIT_GROUPING_MODE("hive.split.grouping.mode", "query", new StringSet("query", "compactor"), "This is set to compactor from within the query based compactor. 
This enables the Tez SplitGrouper " + "to group splits based on their bucket number, so that all rows from different bucket files " + " for the same bucket number can end up in the same bucket file after the compaction."), @@ -4407,7 +4407,7 @@ public class HiveConf extends Configuration { LLAP_CLIENT_CONSISTENT_SPLITS("hive.llap.client.consistent.splits", true, "Whether to setup split locations to match nodes on which llap daemons are running, " + "preferring one of the locations provided by the split itself. If there is no llap daemon " + - "running on any of those locations (or on the cloud), fall back to a cache affinity to" + + "running on any of those locations (or on the cloud), fall back to a cache affinity to" + " an LLAP node. This is effective only if hive.execution.mode is llap."), LLAP_VALIDATE_ACLS("hive.llap.validate.acls", true, "Whether LLAP should reject permissive ACLs in some cases (e.g. its own management\n" + @@ -4448,7 +4448,9 @@ public class HiveConf extends Configuration { LLAP_COLLECT_LOCK_METRICS("hive.llap.lockmetrics.collect", false, "Whether lock metrics (wait times, counts) are collected for LLAP " + "related locks"), - + LLAP_TASK_TIME_SUMMARY( + "hive.llap.task.time.print.summary", false, + "Display queue and runtime of tasks by host for every query executed by the shell."), HIVE_TRIGGER_VALIDATION_INTERVAL("hive.trigger.validation.interval", "500ms", new TimeValidator(TimeUnit.MILLISECONDS), "Interval for validating triggers during execution of a query. 
Triggers defined in resource plan will get\n" + diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java index e4dfe4e..231c962 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/WmFragmentCounters.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hive.llap.counters; import java.util.concurrent.atomic.AtomicLongArray; +import org.apache.hadoop.hive.llap.metrics.MetricsUtils; +import org.apache.tez.common.counters.CounterGroup; import org.apache.tez.common.counters.TezCounters; /** @@ -31,10 +33,12 @@ public class WmFragmentCounters { private LlapWmCounters currentCounter = null; private long currentCounterStartTime = 0; private final AtomicLongArray fixedCounters; + private final boolean addTaskTimeCounters; - public WmFragmentCounters() { + public WmFragmentCounters(boolean addTaskTimeCounters) { // Note: WmFragmentCounters are created before Tez counters are created. this.fixedCounters = new AtomicLongArray(LlapWmCounters.values().length); + this.addTaskTimeCounters = addTaskTimeCounters; } public void changeStateQueued(boolean isGuaranteed) { @@ -119,6 +123,21 @@ public class WmFragmentCounters { for (int i = 0; i < fixedCounters.length(); ++i) { tezCounters.findCounter(LlapWmCounters.values()[i]).setValue(fixedCounters.get(i)); } + + // add queue and runtime (together with task count) on a "per daemon" level + // to the Tez counters. 
+ if (addTaskTimeCounters) { + String hostName = MetricsUtils.getHostName(); + long queued = fixedCounters.get(LlapWmCounters.GUARANTEED_QUEUED_NS.ordinal()) + + fixedCounters.get(LlapWmCounters.SPECULATIVE_QUEUED_NS.ordinal()); + long running = fixedCounters.get(LlapWmCounters.GUARANTEED_RUNNING_NS.ordinal()) + + fixedCounters.get(LlapWmCounters.SPECULATIVE_RUNNING_NS.ordinal()); + + CounterGroup cg = tezCounters.getGroup("LlapTaskRuntimeAgg by daemon"); + cg.findCounter("QueueTime-" + hostName).setValue(queued); + cg.findCounter("RunTime-" + hostName).setValue(running); + cg.findCounter("Count-" + hostName).setValue(1); + } } @Override diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 7a3ca2f..44e2fa9 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -27,12 +27,11 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.UgiFactory; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.DaemonId; import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.llap.NotTezEventHelper; -import org.apache.hadoop.hive.llap.counters.FragmentCountersMap; -import org.apache.hadoop.hive.llap.counters.LlapWmCounters; import org.apache.hadoop.hive.llap.counters.WmFragmentCounters; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; @@ -65,7 +64,6 @@ import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.hive.llap.security.LlapSignerImpl; import org.apache.hadoop.hive.llap.tez.Converters; import 
org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils; -import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -73,7 +71,6 @@ import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper; import org.apache.log4j.NDC; -import org.apache.tez.common.counters.TezCounters; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; import org.apache.tez.dag.api.TezConfiguration; @@ -142,7 +139,6 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu this.executorService = executorService; completionListener = (SchedulerFragmentCompletingListener) executorService; - // Distribute the available memory between the tasks. this.memoryPerExecutor = (long)(totalMemoryAvailableBytes / (float) numExecutors); this.metrics = metrics; @@ -280,8 +276,15 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu Configuration callableConf = new Configuration(getConfig()); UserGroupInformation fsTaskUgi = fsUgiFactory == null ? null : fsUgiFactory.createUgi(); boolean isGuaranteed = request.hasIsGuaranteed() && request.getIsGuaranteed(); + + // enable the printing of (per daemon) LLAP task queue/run times via LLAP_TASK_TIME_SUMMARY + ConfVars tezSummary = ConfVars.TEZ_EXEC_SUMMARY; + ConfVars llapTasks = ConfVars.LLAP_TASK_TIME_SUMMARY; + boolean addTaskTimes = callableConf.getBoolean(tezSummary.varname, tezSummary.defaultBoolVal) + && callableConf.getBoolean(llapTasks.varname, llapTasks.defaultBoolVal); + // TODO: ideally we'd register TezCounters here, but it seems impossible before registerTask. 
- WmFragmentCounters wmCounters = new WmFragmentCounters(); + WmFragmentCounters wmCounters = new WmFragmentCounters(addTaskTimes); TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, callableConf, new ExecutionContextImpl(localAddress.get().getHostName()), env, credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler, diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index aaf9674..82bb06a 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -44,7 +44,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.hadoop.hive.llap.counters.FragmentCountersMap; import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler; import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener; import org.apache.hadoop.hive.llap.daemon.impl.comparator.LlapQueueComparatorBase; @@ -54,7 +53,6 @@ import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.util.Clock; -import org.apache.tez.common.counters.TezCounters; import org.apache.tez.runtime.task.EndReason; import org.apache.tez.runtime.task.TaskRunner2Result; import org.slf4j.Logger; @@ -680,8 +678,8 @@ public class TaskExecutorService extends AbstractService LOG.info("Attempting to execute {}", taskWrapper); } TaskRunnerCallable task = taskWrapper.getTaskRunnerCallable(); - task.setWmCountersRunning(); ListenableFuture<TaskRunner2Result> future = executorService.submit(task); + 
task.setWmCountersRunning(); runningFragmentCount.incrementAndGet(); taskWrapper.setIsInWaitQueue(false);