Repository: hive Updated Branches: refs/heads/llap b49c8dea7 -> 52b5703dd
HIVE-10942 : LLAP: expose what's running on the daemon thru JMX (Sergey Shelukhin) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/52b5703d Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/52b5703d Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/52b5703d Branch: refs/heads/llap Commit: 52b5703ddcfe5e98a4f89c66367ec8f0d0f2cda2 Parents: b49c8de Author: Sergey Shelukhin <ser...@apache.org> Authored: Thu Jun 4 18:52:45 2015 -0700 Committer: Sergey Shelukhin <ser...@apache.org> Committed: Thu Jun 4 18:52:45 2015 -0700 ---------------------------------------------------------------------- .../llap/daemon/impl/ContainerRunnerImpl.java | 11 ++++- .../hive/llap/daemon/impl/LlapDaemon.java | 6 +++ .../hive/llap/daemon/impl/LlapDaemonMXBean.java | 8 ++++ .../hadoop/hive/llap/daemon/impl/Scheduler.java | 3 ++ .../llap/daemon/impl/TaskExecutorService.java | 50 ++++++++++++++++++++ .../llap/daemon/impl/TaskRunnerCallable.java | 5 ++ 6 files changed, 81 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/52b5703d/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 46ec074..6f9f429 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -20,6 +20,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicReference; @@ -145,7 +146,9 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu localAddress.get().getHostName(), request.getFragmentSpec().getDagName(), request.getFragmentSpec().getVertexName(), request.getFragmentSpec().getFragmentNumber(), request.getFragmentSpec().getAttemptNumber()); - LOG.info("Queueing container for execution: " + stringifySubmitRequest(request)); + if (LOG.isInfoEnabled()) { + LOG.info("Queueing container for execution: " + stringifySubmitRequest(request)); + } // This is the start of container-annotated logging. // TODO Reduce the length of this string. Way too verbose at the moment. String ndcContextString = request.getFragmentSpec().getFragmentIdentifierString(); @@ -167,7 +170,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu fragmentSpec.getVertexName(), fragmentSpec.getFragmentNumber(), fragmentSpec.getAttemptNumber(), request.getUser(), request.getFragmentSpec()); - String []localDirs = fragmentInfo.getLocalDirs(); + String[] localDirs = fragmentInfo.getLocalDirs(); Preconditions.checkNotNull(localDirs); if (LOG.isDebugEnabled()) { @@ -301,4 +304,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu amReporter.taskKilled(amLocation, port, user, jobToken, taskAttemptId); } } + + public Set<String> getExecutorStatus() { + return executorService.getExecutorsStatus(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/52b5703d/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 5574483..7b53e63 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hive.llap.daemon.impl; import java.io.IOException; import java.net.InetSocketAddress; import java.util.Arrays; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -314,6 +315,11 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla } @Override + public Set<String> getExecutorsStatus() { + return containerRunner.getExecutorStatus(); + } + + @Override public long getExecutorMemoryPerInstance() { return executorMemoryPerInstance; } http://git-wip-us.apache.org/repos/asf/hive/blob/52b5703d/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java index 50be5c7..d6449db 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.llap.daemon.impl; +import java.util.Set; + import javax.management.MXBean; /** @@ -50,6 +52,12 @@ public interface LlapDaemonMXBean { public String getLocalDirs(); /** + * Executor states. + * @return Executor states. + */ + public Set<String> getExecutorsStatus(); + + /** * Gets llap daemon configured executor memory per instance. * @return memory per instance */ http://git-wip-us.apache.org/repos/asf/hive/blob/52b5703d/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java index eb06a2f..1d35b10 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.llap.daemon.impl; +import java.util.Set; import java.util.concurrent.RejectedExecutionException; /** @@ -36,4 +37,6 @@ public interface Scheduler<T> { * @param fragmentId */ void killFragment(String fragmentId); + + Set<String> getExecutorsStatus(); } http://git-wip-us.apache.org/repos/asf/hive/blob/52b5703d/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index ff7fb29..4c0fb8e 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -17,7 +17,12 @@ */ package org.apache.hadoop.hive.llap.daemon.impl; +import java.text.SimpleDateFormat; import java.util.Comparator; +import java.util.Date; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -33,6 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; import org.apache.hadoop.service.AbstractService; import org.apache.tez.runtime.task.EndReason; import org.apache.tez.runtime.task.TaskRunner2Result; @@ -132,6 +138,50 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta shutDown(false); } + private static final ThreadLocal<SimpleDateFormat> sdf = new ThreadLocal<SimpleDateFormat>() { + @Override + protected SimpleDateFormat initialValue() { + return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + } + }; + + @Override + public Set<String> getExecutorsStatus() { + Set<String> result = new HashSet<>(); + StringBuilder value = new StringBuilder(); + for (Map.Entry<String, TaskWrapper> e : knownTasks.entrySet()) { + value.setLength(0); + value.append(e.getKey()); + TaskWrapper task = e.getValue(); + boolean isFirst = true; + TaskRunnerCallable c = task.getTaskRunnerCallable(); + if (c != null && c.getRequest() != null && c.getRequest().getFragmentSpec() != null) { + FragmentSpecProto fs = c.getRequest().getFragmentSpec(); + value.append(isFirst ? " (" : ", ").append(fs.getDagName()) + .append("/").append(fs.getVertexName()); + isFirst = false; + } + value.append(isFirst ? " (" : ", "); + if (task.isInWaitQueue()) { + value.append("in queue"); + } else if (c != null) { + long startTime = c.getStartTime(); + if (startTime != 0) { + value.append("started at ").append(sdf.get().format(new Date(startTime))); + } else { + value.append("not started"); + } + } else { + value.append("has no callable"); + } + if (task.isInPreemptionQueue()) { + value.append(", ").append("preemptable"); + } + value.append(")"); + result.add(value.toString()); + } + return result; + } /** * Worker that takes tasks from wait queue and schedule it for execution. http://git-wip-us.apache.org/repos/asf/hive/blob/52b5703d/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 1c12e12..cd6a0da 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; @@ -133,6 +134,10 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { this.fragmentCompletionHanler = fragmentCompleteHandler; } + public long getStartTime() { + return startTime; + } + @Override protected TaskRunner2Result callInternal() throws Exception { isStarted.set(true);