Repository: hive Updated Branches: refs/heads/master a4a0ae1ff -> 91d25b48a
http://git-wip-us.apache.org/repos/asf/hive/blob/91d25b48/service/src/java/org/apache/hive/service/cli/operation/Operation.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java index 11a820f..0b27608 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -27,6 +27,7 @@ import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hive.common.LogUtils; import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; @@ -35,7 +36,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.OperationLog; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.service.cli.FetchOrientation; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.OperationHandle; @@ -46,17 +46,12 @@ import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.TableSchema; import org.apache.hive.service.cli.session.HiveSession; import org.apache.hive.service.rpc.thrift.TProtocolVersion; -import org.apache.logging.log4j.ThreadContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Sets; public abstract class Operation { - // Constants of the key strings for the log4j ThreadContext. - public static final String SESSIONID_LOG_KEY = "sessionId"; - public static final String QUERYID_LOG_KEY = "queryId"; - protected final HiveSession parentSession; private volatile OperationState state = OperationState.INITIALIZED; private volatile MetricsScope currentStateScope; @@ -212,44 +207,9 @@ public abstract class Operation { protected void createOperationLog() { if (parentSession.isOperationLogEnabled()) { - File operationLogFile = new File(parentSession.getOperationLogSessionDir(), - opHandle.getHandleIdentifier().toString()); + File operationLogFile = new File(parentSession.getOperationLogSessionDir(), queryState.getQueryId()); isOperationLogEnabled = true; - // create log file - try { - if (operationLogFile.exists()) { - LOG.warn("The operation log file should not exist, but it is already there: " + - operationLogFile.getAbsolutePath()); - operationLogFile.delete(); - } - if (!operationLogFile.getParentFile().exists()) { - LOG.warn("Operations log directory for this session does not exist, it could have been deleted " + - "externally. Recreating the directory for future queries in this session but the older operation " + - "logs for this session are no longer available"); - if (!operationLogFile.getParentFile().mkdir()) { - LOG.warn("Log directory for this session could not be created, disabling " + - "operation logs: " + operationLogFile.getParentFile().getAbsolutePath()); - isOperationLogEnabled = false; - return; - } - } - if (!operationLogFile.createNewFile()) { - // the log file already exists and cannot be deleted. - // If it can be read/written, keep its contents and use it. - if (!operationLogFile.canRead() || !operationLogFile.canWrite()) { - LOG.warn("The already existed operation log file cannot be recreated, " + - "and it cannot be read or written: " + operationLogFile.getAbsolutePath()); - isOperationLogEnabled = false; - return; - } - } - } catch (Exception e) { - LOG.warn("Unable to create operation log file: " + operationLogFile.getAbsolutePath(), e); - isOperationLogEnabled = false; - return; - } - // create OperationLog object with above log file try { operationLog = new OperationLog(opHandle.toString(), operationLogFile, parentSession.getHiveConf()); @@ -259,15 +219,6 @@ public abstract class Operation { isOperationLogEnabled = false; return; } - - // register this operationLog to current thread - OperationLog.setCurrentOperationLog(operationLog); - } - } - - protected void unregisterOperationLog() { - if (isOperationLogEnabled) { - OperationLog.removeCurrentOperationLog(); } } @@ -277,22 +228,7 @@ public abstract class Operation { */ protected void beforeRun() { createOperationLog(); - registerLoggingContext(); - } - - /** - * Register logging context so that Log4J can print QueryId and/or SessionId for each message - */ - protected void registerLoggingContext() { - ThreadContext.put(SESSIONID_LOG_KEY, SessionState.get().getSessionId()); - ThreadContext.put(QUERYID_LOG_KEY, confOverlay.get(HiveConf.ConfVars.HIVEQUERYID.varname)); - } - - /** - * Unregister logging context - */ - protected void unregisterLoggingContext() { - ThreadContext.clearAll(); + LogUtils.registerLoggingContext(queryState.getConf()); } /** @@ -300,8 +236,7 @@ public abstract class Operation { * Clean up resources, which was set up in beforeRun(). */ protected void afterRun() { - unregisterLoggingContext(); - unregisterOperationLog(); + LogUtils.unregisterLoggingContext(); } /** http://git-wip-us.apache.org/repos/asf/hive/blob/91d25b48/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java index 3f8f68e..f62ee4e 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java +++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.hadoop.hive.ql.log.LogDivertAppender; import org.apache.hadoop.hive.ql.session.OperationLog; import org.apache.hive.service.AbstractService; import org.apache.hive.service.cli.FetchOrientation; @@ -46,11 +47,6 @@ import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.RowSetFactory; import org.apache.hive.service.cli.TableSchema; import org.apache.hive.service.cli.session.HiveSession; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.core.Appender; -import org.apache.logging.log4j.core.LoggerContext; -import org.apache.logging.log4j.core.config.Configuration; -import org.apache.logging.log4j.core.config.LoggerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,12 +70,8 @@ public class OperationManager extends AbstractService { @Override public synchronized void init(HiveConf hiveConf) { - if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) { - initOperationLogCapture(hiveConf.getVar( - HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL)); - } else { - LOG.debug("Operation level logging is turned off"); - } + LogDivertAppender.registerRoutingAppender(hiveConf); + if (hiveConf.isWebUiQueryInfoCacheEnabled()) { historicSqlOperations = new SQLOperationDisplayCache( hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_MAX_HISTORIC_QUERIES)); @@ -97,17 +89,6 @@ public class OperationManager extends AbstractService { super.stop(); } - private void initOperationLogCapture(String loggingMode) { - // Register another Appender (with the same layout) that talks to us. - Appender ap = LogDivertAppender.createInstance(this, OperationLog.getLoggingLevel(loggingMode)); - LoggerContext context = (LoggerContext) LogManager.getContext(false); - Configuration configuration = context.getConfiguration(); - LoggerConfig loggerConfig = configuration.getLoggerConfig(LoggerFactory.getLogger(getClass()).getName()); - loggerConfig.addAppender(ap, null, null); - context.updateLoggers(); - ap.start(); - } - public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession, String statement, Map<String, String> confOverlay, boolean runAsync, long queryTimeout) throws HiveSQLException { @@ -361,11 +342,6 @@ public class OperationManager extends AbstractService { return Collections.unmodifiableCollection(handleToOperation.values()); } - - public OperationLog getOperationLogByThread() { - return OperationLog.getCurrentOperationLog(); - } - public List<Operation> removeExpiredOperations(OperationHandle[] handles) { List<Operation> removed = new ArrayList<Operation>(); for (OperationHandle handle : handles) { http://git-wip-us.apache.org/repos/asf/hive/blob/91d25b48/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index f41092e..04fc0a1 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -40,6 +40,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang3.CharEncoding; +import org.apache.hadoop.hive.common.LogUtils; import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; @@ -338,9 +339,7 @@ public class SQLOperation extends ExecuteStatementOperation { // TODO: can this result in cross-thread reuse of session state? SessionState.setCurrentSessionState(parentSessionState); PerfLogger.setPerfLogger(parentPerfLogger); - // Set current OperationLog in this async thread for keeping on saving query log. - registerCurrentOperationLog(); - registerLoggingContext(); + LogUtils.registerLoggingContext(queryState.getConf()); try { if (asyncPrepare) { prepare(queryState); @@ -351,8 +350,7 @@ public class SQLOperation extends ExecuteStatementOperation { setOperationException(e); LOG.error("Error running hive query: ", e); } finally { - unregisterLoggingContext(); - unregisterOperationLog(); + LogUtils.unregisterLoggingContext(); } return null; } @@ -393,18 +391,6 @@ public class SQLOperation extends ExecuteStatementOperation { } } - private void registerCurrentOperationLog() { - if (isOperationLogEnabled) { - if (operationLog == null) { - LOG.warn("Failed to get current OperationLog object of Operation: " + - getHandle().getHandleIdentifier()); - isOperationLogEnabled = false; - return; - } - OperationLog.setCurrentOperationLog(operationLog); - } - } - private synchronized void cleanup(OperationState state) throws HiveSQLException { setState(state);