Repository: hive
Updated Branches:
  refs/heads/llap b49c8dea7 -> 52b5703dd


HIVE-10942 : LLAP: expose what's running on the daemon thru JMX (Sergey 
Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/52b5703d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/52b5703d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/52b5703d

Branch: refs/heads/llap
Commit: 52b5703ddcfe5e98a4f89c66367ec8f0d0f2cda2
Parents: b49c8de
Author: Sergey Shelukhin <ser...@apache.org>
Authored: Thu Jun 4 18:52:45 2015 -0700
Committer: Sergey Shelukhin <ser...@apache.org>
Committed: Thu Jun 4 18:52:45 2015 -0700

----------------------------------------------------------------------
 .../llap/daemon/impl/ContainerRunnerImpl.java   | 11 ++++-
 .../hive/llap/daemon/impl/LlapDaemon.java       |  6 +++
 .../hive/llap/daemon/impl/LlapDaemonMXBean.java |  8 ++++
 .../hadoop/hive/llap/daemon/impl/Scheduler.java |  3 ++
 .../llap/daemon/impl/TaskExecutorService.java   | 50 ++++++++++++++++++++
 .../llap/daemon/impl/TaskRunnerCallable.java    |  5 ++
 6 files changed, 81 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/52b5703d/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 46ec074..6f9f429 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -20,6 +20,7 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -145,7 +146,9 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
         localAddress.get().getHostName(), 
request.getFragmentSpec().getDagName(),
         request.getFragmentSpec().getVertexName(), 
request.getFragmentSpec().getFragmentNumber(),
         request.getFragmentSpec().getAttemptNumber());
-    LOG.info("Queueing container for execution: " + 
stringifySubmitRequest(request));
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Queueing container for execution: " + 
stringifySubmitRequest(request));
+    }
     // This is the start of container-annotated logging.
     // TODO Reduce the length of this string. Way too verbose at the moment.
     String ndcContextString = 
request.getFragmentSpec().getFragmentIdentifierString();
@@ -167,7 +170,7 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
               fragmentSpec.getVertexName(), fragmentSpec.getFragmentNumber(),
               fragmentSpec.getAttemptNumber(), request.getUser(), 
request.getFragmentSpec());
 
-      String []localDirs = fragmentInfo.getLocalDirs();
+      String[] localDirs = fragmentInfo.getLocalDirs();
       Preconditions.checkNotNull(localDirs);
 
       if (LOG.isDebugEnabled()) {
@@ -301,4 +304,8 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
       amReporter.taskKilled(amLocation, port, user, jobToken, taskAttemptId);
     }
   }
+
+  public Set<String> getExecutorStatus() {
+    return executorService.getExecutorsStatus();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/52b5703d/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 5574483..7b53e63 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -17,6 +17,7 @@ package org.apache.hadoop.hive.llap.daemon.impl;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -314,6 +315,11 @@ public class LlapDaemon extends CompositeService 
implements ContainerRunner, Lla
   }
 
   @Override
+  public Set<String> getExecutorsStatus() {
+    return containerRunner.getExecutorStatus();
+  }
+
+  @Override
   public long getExecutorMemoryPerInstance() {
     return executorMemoryPerInstance;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/52b5703d/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java
index 50be5c7..d6449db 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonMXBean.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.llap.daemon.impl;
 
+import java.util.Set;
+
 import javax.management.MXBean;
 
 /**
@@ -50,6 +52,12 @@ public interface LlapDaemonMXBean {
   public String getLocalDirs();
 
   /**
+   * Executor states.
+   * @return Executor states.
+   */
+  public Set<String> getExecutorsStatus();
+
+  /**
    * Gets llap daemon configured executor memory per instance.
    * @return memory per instance
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/52b5703d/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
index eb06a2f..1d35b10 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.llap.daemon.impl;
 
+import java.util.Set;
 import java.util.concurrent.RejectedExecutionException;
 
 /**
@@ -36,4 +37,6 @@ public interface Scheduler<T> {
    * @param fragmentId
    */
   void killFragment(String fragmentId);
+
+  Set<String> getExecutorsStatus();
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/52b5703d/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index ff7fb29..4c0fb8e 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -17,7 +17,12 @@
  */
 package org.apache.hadoop.hive.llap.daemon.impl;
 
+import java.text.SimpleDateFormat;
 import java.util.Comparator;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -33,6 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.tez.runtime.task.EndReason;
 import org.apache.tez.runtime.task.TaskRunner2Result;
@@ -132,6 +138,50 @@ public class TaskExecutorService extends AbstractService 
implements Scheduler<Ta
     shutDown(false);
   }
 
+  private static final ThreadLocal<SimpleDateFormat> sdf = new 
ThreadLocal<SimpleDateFormat>() {
+    @Override
+    protected SimpleDateFormat initialValue() {
+      return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    }
+  };
+
+  @Override
+  public Set<String> getExecutorsStatus() {
+    Set<String> result = new HashSet<>();
+    StringBuilder value = new StringBuilder();
+    for (Map.Entry<String, TaskWrapper> e : knownTasks.entrySet()) {
+      value.setLength(0);
+      value.append(e.getKey());
+      TaskWrapper task = e.getValue();
+      boolean isFirst = true;
+      TaskRunnerCallable c = task.getTaskRunnerCallable();
+      if (c != null && c.getRequest() != null && 
c.getRequest().getFragmentSpec() != null) {
+        FragmentSpecProto fs = c.getRequest().getFragmentSpec();
+        value.append(isFirst ? " (" : ", ").append(fs.getDagName())
+          .append("/").append(fs.getVertexName());
+        isFirst = false;
+      }
+      value.append(isFirst ? " (" : ", ");
+      if (task.isInWaitQueue()) {
+        value.append("in queue");
+      } else if (c != null) {
+        long startTime = c.getStartTime();
+        if (startTime != 0) {
+          value.append("started at ").append(sdf.get().format(new 
Date(startTime)));
+        } else {
+          value.append("not started");
+        }
+      } else {
+        value.append("has no callable");
+      }
+      if (task.isInPreemptionQueue()) {
+        value.append(", ").append("preemptable");
+      }
+      value.append(")");
+      result.add(value.toString());
+    }
+    return result;
+  }
 
   /**
    * Worker that takes tasks from wait queue and schedule it for execution.

http://git-wip-us.apache.org/repos/asf/hive/blob/52b5703d/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 1c12e12..cd6a0da 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -133,6 +134,10 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
     this.fragmentCompletionHanler = fragmentCompleteHandler;
   }
 
+  public long getStartTime() {
+    return startTime;
+  }
+
   @Override
   protected TaskRunner2Result callInternal() throws Exception {
     isStarted.set(true);

Reply via email to