Author: sseth Date: Mon Mar 9 17:03:59 2015 New Revision: 1665305 URL: http://svn.apache.org/r1665305 Log: HIVE-9807. LLAP: Add event logging for execution elements. (Siddharth Seth)
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java hive/branches/llap/llap-server/src/test/resources/llap-daemon-log4j.properties Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java?rev=1665305&view=auto ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java (added) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java Mon Mar 9 17:03:59 2015 @@ -0,0 +1,148 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.daemon; + +import org.apache.log4j.Logger; + +public class HistoryLogger { + + + private static final String HISTORY_EVENT_TYPE = "Event"; + private static final String HISTORY_APPLICATION_ID = "ApplicationId"; + private static final String HISTORY_CONTAINER_ID = "ContainerId"; + private static final String HISTORY_SUBMIT_TIME = "SubmitTime"; + private static final String HISTORY_START_TIME = "StartTime"; + private static final String HISTORY_END_TIME = "EndTime"; + private static final String HISTORY_DAG_NAME = "DagName"; + private static final String HISTORY_VERTEX_NAME = "VertexName"; + private static final String HISTORY_TASK_ID = "TaskId"; + private static final String HISTORY_ATTEMPT_ID = "TaskAttemptId"; + private static final String HISTORY_HOSTNAME = "HostName"; + private static final String HISTORY_SUCCEEDED = "Succeeded"; + + private static final String EVENT_TYPE_FRAGMENT_START = "FRAGMENT_START"; + private static final String EVENT_TYPE_FRAGMENT_END = "FRAGMENT_END"; + + private static final Logger HISTORY_LOGGER = Logger.getLogger(HistoryLogger.class); + + public static void logFragmentStart(String applicationIdStr, String containerIdStr, + String hostname, + String dagName, String vertexName, int taskId, + int attemptId) { + HISTORY_LOGGER.info( + constructFragmentStartString(applicationIdStr, containerIdStr, hostname, dagName, + vertexName, taskId, attemptId)); + } + + public static void logFragmentEnd(String applicationIdStr, String containerIdStr, String hostname, + String dagName, String vertexName, int taskId, int attemptId, + long startTime, boolean failed) { + HISTORY_LOGGER.info(constructFragmentEndString(applicationIdStr, containerIdStr, hostname, + dagName, vertexName, taskId, attemptId, startTime, failed)); + } + + + private static String constructFragmentStartString(String applicationIdStr, String containerIdStr, + String hostname, String dagName, + String vertexName, int taskId, int attemptId) { + HistoryLineBuilder lb = new HistoryLineBuilder(EVENT_TYPE_FRAGMENT_START); + lb.addHostName(hostname); + lb.addAppid(applicationIdStr); + lb.addContainerId(containerIdStr); + lb.addDagName(dagName); + lb.addVertexName(vertexName); + lb.addTaskId(taskId); + lb.addTaskAttemptId(attemptId); + lb.addTime(HISTORY_SUBMIT_TIME); + return lb.toString(); + } + + private static String constructFragmentEndString(String applicationIdStr, String containerIdStr, + String hostname, String dagName, + String vertexName, int taskId, int attemptId, + long startTime, boolean succeeded) { + HistoryLineBuilder lb = new HistoryLineBuilder(EVENT_TYPE_FRAGMENT_END); + lb.addHostName(hostname); + lb.addAppid(applicationIdStr); + lb.addContainerId(containerIdStr); + lb.addDagName(dagName); + lb.addVertexName(vertexName); + lb.addTaskId(taskId); + lb.addTaskAttemptId(attemptId); + lb.addSuccessStatus(succeeded); + lb.addTime(HISTORY_START_TIME, startTime); + lb.addTime(HISTORY_END_TIME); + return lb.toString(); + } + + private static class HistoryLineBuilder { + private final StringBuilder sb = new StringBuilder(); + + HistoryLineBuilder(String eventType) { + sb.append(HISTORY_EVENT_TYPE).append("=").append(eventType); + } + + HistoryLineBuilder addHostName(String hostname) { + return setKeyValue(HISTORY_HOSTNAME, hostname); + } + + HistoryLineBuilder addAppid(String appId) { + return setKeyValue(HISTORY_APPLICATION_ID, appId); + } + + HistoryLineBuilder addContainerId(String containerId) { + return setKeyValue(HISTORY_CONTAINER_ID, containerId); + } + + HistoryLineBuilder addDagName(String dagName) { + return setKeyValue(HISTORY_DAG_NAME, dagName); + } + + HistoryLineBuilder addVertexName(String vertexName) { + return setKeyValue(HISTORY_VERTEX_NAME, vertexName); + } + + HistoryLineBuilder addTaskId(int taskId) { + return setKeyValue(HISTORY_TASK_ID, String.valueOf(taskId)); + } + + HistoryLineBuilder addTaskAttemptId(int attemptId) { + return setKeyValue(HISTORY_ATTEMPT_ID, String.valueOf(attemptId)); + } + + HistoryLineBuilder addTime(String timeParam, long millis) { + return setKeyValue(timeParam, String.valueOf(millis)); + } + + HistoryLineBuilder addTime(String timeParam) { + return setKeyValue(timeParam, String.valueOf(System.currentTimeMillis())); + } + + HistoryLineBuilder addSuccessStatus(boolean status) { + return setKeyValue(HISTORY_SUCCEEDED, String.valueOf(status)); + } + + + private HistoryLineBuilder setKeyValue(String key, String value) { + sb.append(", ").append(key).append("=").append(value); + return this; + } + + @Override + public String toString() { + return sb.toString(); + } + } +} Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java?rev=1665305&r1=1665304&r2=1665305&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java Mon Mar 9 17:03:59 2015 @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.CallableWithNdc; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; +import org.apache.hadoop.hive.llap.daemon.HistoryLogger; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RunContainerRequestProto; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; @@ -129,7 +130,9 @@ public class ContainerRunnerImpl extends @Override public void queueContainer(RunContainerRequestProto request) throws IOException { - LOG.info("Queing container for execution: " + request); + HistoryLogger.logFragmentStart(request.getApplicationIdString(), request.getContainerIdString(), + localAddress.get().getHostName(), null, null, -1, -1); + LOG.info("Queuing container for execution: " + request); // This is the start of container-annotated logging. NDC.push(request.getContainerIdString()); try { @@ -169,7 +172,7 @@ public class ContainerRunnerImpl extends ContainerRunnerCallable callable = new ContainerRunnerCallable(request, new Configuration(getConfig()), new ExecutionContextImpl(localAddress.get().getHostName()), env, localDirs, - workingDir, credentials, memoryPerExecutor); + workingDir, credentials, memoryPerExecutor, localAddress.get().getHostName()); ListenableFuture<ContainerExecutionResult> future = executorService .submit(callable); Futures.addCallback(future, new ContainerRunnerCallback(request, callable)); @@ -194,12 +197,14 @@ public class ContainerRunnerImpl extends private final Credentials credentials; private final long memoryAvailable; private volatile TezChild tezChild; + private final String localHostname; + private volatile long startTime; ContainerRunnerCallable(RunContainerRequestProto request, Configuration conf, ExecutionContext executionContext, Map<String, String> envMap, String[] localDirs, String workingDir, Credentials credentials, - long memoryAvailable) { + long memoryAvailable, String localHostName) { this.request = request; this.conf = conf; this.executionContext = executionContext; @@ -209,11 +214,13 @@ public class ContainerRunnerImpl extends this.objectRegistry = new ObjectRegistryImpl(); this.credentials = credentials; this.memoryAvailable = memoryAvailable; + this.localHostname = localHostName; } @Override protected ContainerExecutionResult callInternal() throws Exception { + this.startTime = System.currentTimeMillis(); Stopwatch sw = new Stopwatch().start(); tezChild = new TezChild(conf, request.getAmHost(), request.getAmPort(), @@ -270,6 +277,11 @@ public class ContainerRunnerImpl extends metrics.incrExecutorTotalAskedToDie(); break; } + HistoryLogger + .logFragmentEnd(request.getApplicationIdString(), + request.getContainerIdString(), + localAddress.get().getHostName(), null, null, -1, -1, + containerRunnerCallable.startTime, true); metrics.decrExecutorNumQueuedRequests(); } @@ -282,6 +294,11 @@ public class ContainerRunnerImpl extends if (tezChild != null) { tezChild.shutdown(); } + HistoryLogger + .logFragmentEnd(request.getApplicationIdString(), + request.getContainerIdString(), + localAddress.get().getHostName(), null, null, -1, -1, + containerRunnerCallable.startTime, false); metrics.decrExecutorNumQueuedRequests(); } } Modified: hive/branches/llap/llap-server/src/test/resources/llap-daemon-log4j.properties URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/resources/llap-daemon-log4j.properties?rev=1665305&r1=1665304&r2=1665305&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/test/resources/llap-daemon-log4j.properties (original) +++ hive/branches/llap/llap-server/src/test/resources/llap-daemon-log4j.properties Mon Mar 9 17:03:59 2015 @@ -19,27 +19,43 @@ llap.daemon.root.logger=INFO,console llap.daemon.log.dir=. llap.daemon.log.file=llapdaemon.log +llap.daemon.historylog.file=llapdaemon_history.log +log4j.logger.org.apache.hadoop.hive.llap.daemon.HistoryLogger=INFO,HISTORYAPPENDER + # Define the root logger to the system property "llap.daemon.root.logger". log4j.rootLogger=${llap.daemon.root.logger} # Logging Threshold log4j.threshold=ALL + # Null Appender log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender -# + + +# History Events appender +log4j.appender.HISTORYAPPENDER=org.apache.log4j.RollingFileAppender +log4j.appender.HISTORYAPPENDER.File=${llap.daemon.log.dir}/${llap.daemon.historylog.file} +log4j.appender.HISTORYAPPENDER.MaxFileSize=${llap.daemon.log.maxfilesize} +log4j.appender.HISTORYAPPENDER.MaxBackupIndex=${llap.daemon.log.maxbackupindex} +log4j.appender.HISTORYAPPENDER.layout=org.apache.log4j.EnhancedPatternLayout +log4j.appender.HISTORYAPPENDER.layout.ConversionPattern=%m%n + + + # Rolling File Appender - cap space usage at 5gb. # llap.daemon.log.maxfilesize=256MB llap.daemon.log.maxbackupindex=20 log4j.appender.RFA=org.apache.log4j.RollingFileAppender log4j.appender.RFA.File=${llap.daemon.log.dir}/${llap.daemon.log.file} +log4j.appender.RFA.Append=true log4j.appender.RFA.MaxFileSize=${llap.daemon.log.maxfilesize} log4j.appender.RFA.MaxBackupIndex=${llap.daemon.log.maxbackupindex} -log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout=org.apache.log4j.EnhancedPatternLayout # Pattern format: Date LogLevel LoggerName LogMessage log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t(%x)] %p %c: %m%n @@ -59,7 +75,7 @@ log4j.appender.DRFA.DatePattern=.yyyy-MM # 30-day backup #log4j.appender.DRFA.MaxBackupIndex=30 -log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout +log4j.appender.DRFA.layout=org.apache.log4j.EnhancedPatternLayout # Pattern format: Date LogLevel LoggerName LogMessage log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} [%t(%x)] %p %c: %m%n @@ -74,5 +90,5 @@ log4j.appender.DRFA.layout.ConversionPat log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err -log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout=org.apache.log4j.EnhancedPatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} [%t(%x)] %p %c{2} : %m%n