Author: sseth
Date: Mon Mar  9 17:03:59 2015
New Revision: 1665305

URL: http://svn.apache.org/r1665305
Log:
HIVE-9807. LLAP: Add event logging for execution elements. (Siddharth Seth)

Added:
    
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java
Modified:
    
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
    
hive/branches/llap/llap-server/src/test/resources/llap-daemon-log4j.properties

Added: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java?rev=1665305&view=auto
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java
 (added)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java
 Mon Mar  9 17:03:59 2015
@@ -0,0 +1,148 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon;
+
+import org.apache.log4j.Logger;
+
+public class HistoryLogger {
+
+
+  private static final String HISTORY_EVENT_TYPE = "Event";
+  private static final String HISTORY_APPLICATION_ID = "ApplicationId";
+  private static final String HISTORY_CONTAINER_ID = "ContainerId";
+  private static final String HISTORY_SUBMIT_TIME = "SubmitTime";
+  private static final String HISTORY_START_TIME = "StartTime";
+  private static final String HISTORY_END_TIME = "EndTime";
+  private static final String HISTORY_DAG_NAME = "DagName";
+  private static final String HISTORY_VERTEX_NAME = "VertexName";
+  private static final String HISTORY_TASK_ID = "TaskId";
+  private static final String HISTORY_ATTEMPT_ID = "TaskAttemptId";
+  private static final String HISTORY_HOSTNAME = "HostName";
+  private static final String HISTORY_SUCCEEDED = "Succeeded";
+
+  private static final String EVENT_TYPE_FRAGMENT_START = "FRAGMENT_START";
+  private static final String EVENT_TYPE_FRAGMENT_END = "FRAGMENT_END";
+
+  private static final Logger HISTORY_LOGGER = 
Logger.getLogger(HistoryLogger.class);
+
+  public static void logFragmentStart(String applicationIdStr, String 
containerIdStr,
+                                      String hostname,
+                                      String dagName, String vertexName, int 
taskId,
+                                      int attemptId) {
+    HISTORY_LOGGER.info(
+        constructFragmentStartString(applicationIdStr, containerIdStr, 
hostname, dagName,
+            vertexName, taskId, attemptId));
+  }
+
+  public static void logFragmentEnd(String applicationIdStr, String 
containerIdStr, String hostname,
+                                    String dagName, String vertexName, int 
taskId, int attemptId,
+                                    long startTime, boolean failed) {
+    HISTORY_LOGGER.info(constructFragmentEndString(applicationIdStr, 
containerIdStr, hostname,
+        dagName, vertexName, taskId, attemptId, startTime, failed));
+  }
+
+
+  private static String constructFragmentStartString(String applicationIdStr, 
String containerIdStr,
+                                                     String hostname, String 
dagName,
+                                                     String vertexName, int 
taskId, int attemptId) {
+    HistoryLineBuilder lb = new HistoryLineBuilder(EVENT_TYPE_FRAGMENT_START);
+    lb.addHostName(hostname);
+    lb.addAppid(applicationIdStr);
+    lb.addContainerId(containerIdStr);
+    lb.addDagName(dagName);
+    lb.addVertexName(vertexName);
+    lb.addTaskId(taskId);
+    lb.addTaskAttemptId(attemptId);
+    lb.addTime(HISTORY_SUBMIT_TIME);
+    return lb.toString();
+  }
+
+  private static String constructFragmentEndString(String applicationIdStr, 
String containerIdStr,
+                                                   String hostname, String 
dagName,
+                                                   String vertexName, int 
taskId, int attemptId,
+                                                   long startTime, boolean 
succeeded) {
+    HistoryLineBuilder lb = new HistoryLineBuilder(EVENT_TYPE_FRAGMENT_END);
+    lb.addHostName(hostname);
+    lb.addAppid(applicationIdStr);
+    lb.addContainerId(containerIdStr);
+    lb.addDagName(dagName);
+    lb.addVertexName(vertexName);
+    lb.addTaskId(taskId);
+    lb.addTaskAttemptId(attemptId);
+    lb.addSuccessStatus(succeeded);
+    lb.addTime(HISTORY_START_TIME, startTime);
+    lb.addTime(HISTORY_END_TIME);
+    return lb.toString();
+  }
+
+  private static class HistoryLineBuilder {
+    private final StringBuilder sb = new StringBuilder();
+
+    HistoryLineBuilder(String eventType) {
+      sb.append(HISTORY_EVENT_TYPE).append("=").append(eventType);
+    }
+
+    HistoryLineBuilder addHostName(String hostname) {
+      return setKeyValue(HISTORY_HOSTNAME, hostname);
+    }
+
+    HistoryLineBuilder addAppid(String appId) {
+      return setKeyValue(HISTORY_APPLICATION_ID, appId);
+    }
+
+    HistoryLineBuilder addContainerId(String containerId) {
+      return setKeyValue(HISTORY_CONTAINER_ID, containerId);
+    }
+
+    HistoryLineBuilder addDagName(String dagName) {
+      return setKeyValue(HISTORY_DAG_NAME, dagName);
+    }
+
+    HistoryLineBuilder addVertexName(String vertexName) {
+      return setKeyValue(HISTORY_VERTEX_NAME, vertexName);
+    }
+
+    HistoryLineBuilder addTaskId(int taskId) {
+      return setKeyValue(HISTORY_TASK_ID, String.valueOf(taskId));
+    }
+
+    HistoryLineBuilder addTaskAttemptId(int attemptId) {
+      return setKeyValue(HISTORY_ATTEMPT_ID, String.valueOf(attemptId));
+    }
+
+    HistoryLineBuilder addTime(String timeParam, long millis) {
+      return setKeyValue(timeParam, String.valueOf(millis));
+    }
+
+    HistoryLineBuilder addTime(String timeParam) {
+      return setKeyValue(timeParam, 
String.valueOf(System.currentTimeMillis()));
+    }
+
+    HistoryLineBuilder addSuccessStatus(boolean status) {
+      return setKeyValue(HISTORY_SUCCEEDED, String.valueOf(status));
+    }
+
+
+    private HistoryLineBuilder setKeyValue(String key, String value) {
+      sb.append(", ").append(key).append("=").append(value);
+      return this;
+    }
+
+    @Override
+    public String toString() {
+      return sb.toString();
+    }
+  }
+}

Modified: 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java?rev=1665305&r1=1665304&r2=1665305&view=diff
==============================================================================
--- 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
 (original)
+++ 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
 Mon Mar  9 17:03:59 2015
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.CallableWithNdc;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
+import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RunContainerRequestProto;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
@@ -129,7 +130,9 @@ public class ContainerRunnerImpl extends
 
   @Override
   public void queueContainer(RunContainerRequestProto request) throws 
IOException {
-    LOG.info("Queing container for execution: " + request);
+    HistoryLogger.logFragmentStart(request.getApplicationIdString(), 
request.getContainerIdString(),
+        localAddress.get().getHostName(), null, null, -1, -1);
+    LOG.info("Queuing container for execution: " + request);
     // This is the start of container-annotated logging.
     NDC.push(request.getContainerIdString());
     try {
@@ -169,7 +172,7 @@ public class ContainerRunnerImpl extends
 
       ContainerRunnerCallable callable = new ContainerRunnerCallable(request, 
new Configuration(getConfig()),
           new ExecutionContextImpl(localAddress.get().getHostName()), env, 
localDirs,
-          workingDir, credentials, memoryPerExecutor);
+          workingDir, credentials, memoryPerExecutor, 
localAddress.get().getHostName());
       ListenableFuture<ContainerExecutionResult> future = executorService
           .submit(callable);
       Futures.addCallback(future, new ContainerRunnerCallback(request, 
callable));
@@ -194,12 +197,14 @@ public class ContainerRunnerImpl extends
     private final Credentials credentials;
     private final long memoryAvailable;
     private volatile TezChild tezChild;
+    private final String localHostname;
+    private volatile long startTime;
 
 
     ContainerRunnerCallable(RunContainerRequestProto request, Configuration 
conf,
                             ExecutionContext executionContext, Map<String, 
String> envMap,
                             String[] localDirs, String workingDir, Credentials 
credentials,
-                            long memoryAvailable) {
+                            long memoryAvailable, String localHostName) {
       this.request = request;
       this.conf = conf;
       this.executionContext = executionContext;
@@ -209,11 +214,13 @@ public class ContainerRunnerImpl extends
       this.objectRegistry = new ObjectRegistryImpl();
       this.credentials = credentials;
       this.memoryAvailable = memoryAvailable;
+      this.localHostname = localHostName;
 
     }
 
     @Override
     protected ContainerExecutionResult callInternal() throws Exception {
+      this.startTime = System.currentTimeMillis();
       Stopwatch sw = new Stopwatch().start();
       tezChild =
           new TezChild(conf, request.getAmHost(), request.getAmPort(),
@@ -270,6 +277,11 @@ public class ContainerRunnerImpl extends
           metrics.incrExecutorTotalAskedToDie();
           break;
       }
+      HistoryLogger
+          .logFragmentEnd(request.getApplicationIdString(),
+              request.getContainerIdString(),
+              localAddress.get().getHostName(), null, null, -1, -1,
+              containerRunnerCallable.startTime, true);
       metrics.decrExecutorNumQueuedRequests();
     }
 
@@ -282,6 +294,11 @@ public class ContainerRunnerImpl extends
       if (tezChild != null) {
         tezChild.shutdown();
       }
+      HistoryLogger
+          .logFragmentEnd(request.getApplicationIdString(),
+              request.getContainerIdString(),
+              localAddress.get().getHostName(), null, null, -1, -1,
+              containerRunnerCallable.startTime, false);
       metrics.decrExecutorNumQueuedRequests();
     }
   }

Modified: 
hive/branches/llap/llap-server/src/test/resources/llap-daemon-log4j.properties
URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/resources/llap-daemon-log4j.properties?rev=1665305&r1=1665304&r2=1665305&view=diff
==============================================================================
--- 
hive/branches/llap/llap-server/src/test/resources/llap-daemon-log4j.properties 
(original)
+++ 
hive/branches/llap/llap-server/src/test/resources/llap-daemon-log4j.properties 
Mon Mar  9 17:03:59 2015
@@ -19,27 +19,43 @@ llap.daemon.root.logger=INFO,console
 llap.daemon.log.dir=.
 llap.daemon.log.file=llapdaemon.log
 
+llap.daemon.historylog.file=llapdaemon_history.log
+log4j.logger.org.apache.hadoop.hive.llap.daemon.HistoryLogger=INFO,HISTORYAPPENDER
+
 # Define the root logger to the system property "llap.daemon.root.logger".
 log4j.rootLogger=${llap.daemon.root.logger}
 
 # Logging Threshold
 log4j.threshold=ALL
 
+
 # Null Appender
 log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender
 
-#
+
+
+# History Events appender
+log4j.appender.HISTORYAPPENDER=org.apache.log4j.RollingFileAppender
+log4j.appender.HISTORYAPPENDER.File=${llap.daemon.log.dir}/${llap.daemon.historylog.file}
+log4j.appender.HISTORYAPPENDER.MaxFileSize=${llap.daemon.log.maxfilesize}
+log4j.appender.HISTORYAPPENDER.MaxBackupIndex=${llap.daemon.log.maxbackupindex}
+log4j.appender.HISTORYAPPENDER.layout=org.apache.log4j.EnhancedPatternLayout
+log4j.appender.HISTORYAPPENDER.layout.ConversionPattern=%m%n
+
+
+
 # Rolling File Appender - cap space usage at 5gb.
 #
 llap.daemon.log.maxfilesize=256MB
 llap.daemon.log.maxbackupindex=20
 log4j.appender.RFA=org.apache.log4j.RollingFileAppender
 log4j.appender.RFA.File=${llap.daemon.log.dir}/${llap.daemon.log.file}
+log4j.appender.RFA.Append=true
 
 log4j.appender.RFA.MaxFileSize=${llap.daemon.log.maxfilesize}
 log4j.appender.RFA.MaxBackupIndex=${llap.daemon.log.maxbackupindex}
 
-log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout=org.apache.log4j.EnhancedPatternLayout
 
 # Pattern format: Date LogLevel LoggerName LogMessage
 log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t(%x)] %p %c: %m%n
@@ -59,7 +75,7 @@ log4j.appender.DRFA.DatePattern=.yyyy-MM
 
 # 30-day backup
 #log4j.appender.DRFA.MaxBackupIndex=30
-log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.DRFA.layout=org.apache.log4j.EnhancedPatternLayout
 
 # Pattern format: Date LogLevel LoggerName LogMessage
 log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} [%t(%x)] %p %c: %m%n
@@ -74,5 +90,5 @@ log4j.appender.DRFA.layout.ConversionPat
 
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout=org.apache.log4j.EnhancedPatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} [%t(%x)] 
%p %c{2} : %m%n


Reply via email to