[ https://issues.apache.org/jira/browse/SPARK-21395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16089106#comment-16089106 ]
Apache Spark commented on SPARK-21395:
--------------------------------------

User 'debugger87' has created a pull request for this issue:
https://github.com/apache/spark/pull/18649

> Spark SQL hive-thriftserver doesn't register operation log before execute sql statement
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-21395
>                 URL: https://issues.apache.org/jira/browse/SPARK-21395
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0, 2.1.1
>            Reporter: Chaozhong Yang
>
> In HiveServer2, TFetchResultsReq has a member named `fetchType`. If `fetchType`
> is `1`, the thrift server should return the operation log to the client.
> However, we found that Spark SQL's thrift server always returns nothing to the
> client for a TFetchResultsReq with `fetchType` = 1. We checked the
> ${HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION}/${session-id} directory carefully
> and found that the operation log files did exist, but they were empty (zero bytes).
> Why? Let's take a look at SQLOperation.java in Hive:
> {code:java}
> @Override
> public void runInternal() throws HiveSQLException {
>   setState(OperationState.PENDING);
>   final HiveConf opConfig = getConfigForOperation();
>   prepare(opConfig);
>   if (!shouldRunAsync()) {
>     runQuery(opConfig);
>   } else {
>     // We'll pass ThreadLocals in the background thread from the foreground (handler) thread
>     final SessionState parentSessionState = SessionState.get();
>     // ThreadLocal Hive object needs to be set in background thread.
>     // The metastore client in Hive is associated with right user.
>     final Hive parentHive = getSessionHive();
>     // Current UGI will get used by metastore when metastore is in embedded mode
>     // So this needs to get passed to the new background thread
>     final UserGroupInformation currentUGI = getCurrentUGI(opConfig);
>     // Runnable impl to call runInternal asynchronously,
>     // from a different thread
>     Runnable backgroundOperation = new Runnable() {
>       @Override
>       public void run() {
>         PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
>           @Override
>           public Object run() throws HiveSQLException {
>             Hive.set(parentHive);
>             SessionState.setCurrentSessionState(parentSessionState);
>             // Set current OperationLog in this async thread for keeping on saving query log.
>             registerCurrentOperationLog();
>             try {
>               runQuery(opConfig);
>             } catch (HiveSQLException e) {
>               setOperationException(e);
>               LOG.error("Error running hive query: ", e);
>             } finally {
>               unregisterOperationLog();
>             }
>             return null;
>           }
>         };
>         try {
>           currentUGI.doAs(doAsAction);
>         } catch (Exception e) {
>           setOperationException(new HiveSQLException(e));
>           LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e);
>         }
>         finally {
>           /**
>            * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
>            * when this thread is garbage collected later.
>            * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
>            */
>           if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
>             ThreadWithGarbageCleanup currentThread =
>                 (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
>             currentThread.cacheThreadLocalRawStore();
>           }
>         }
>       }
>     };
>     try {
>       // This submit blocks if no background threads are available to run this operation
>       Future<?> backgroundHandle =
>           getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation);
>       setBackgroundHandle(backgroundHandle);
>     } catch (RejectedExecutionException rejected) {
>       setState(OperationState.ERROR);
>       throw new HiveSQLException("The background threadpool cannot accept" +
>           " new task for execution, please retry the operation", rejected);
>     }
>   }
> }
> {code}
> Calling registerCurrentOperationLog() inside the background thread is the key step
> that lets Hive produce the operation log and return it to the client.
> However, in Spark SQL, SparkExecuteStatementOperation does not register the
> operation log before executing the SQL statement:
> {code:scala}
> override def runInternal(): Unit = {
>   setState(OperationState.PENDING)
>   setHasResultSet(true) // avoid no resultset for async run
>   if (!runInBackground) {
>     execute()
>   } else {
>     val sparkServiceUGI = Utils.getUGI()
>     // Runnable impl to call runInternal asynchronously,
>     // from a different thread
>     val backgroundOperation = new Runnable() {
>       override def run(): Unit = {
>         val doAsAction = new PrivilegedExceptionAction[Unit]() {
>           override def run(): Unit = {
>             try {
>               execute()
>             } catch {
>               case e: HiveSQLException =>
>                 setOperationException(e)
>                 log.error("Error running hive query: ", e)
>             }
>           }
>         }
>         try {
>           sparkServiceUGI.doAs(doAsAction)
>         } catch {
>           case e: Exception =>
>             setOperationException(new HiveSQLException(e))
>             logError("Error running hive query as user : " +
>               sparkServiceUGI.getShortUserName(), e)
>         }
>       }
>     }
>     try {
>       // This submit blocks if no background threads are available to run this operation
>       val backgroundHandle =
>         parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
>       setBackgroundHandle(backgroundHandle)
>     } catch {
>       case rejected: RejectedExecutionException =>
>         setState(OperationState.ERROR)
>         throw new HiveSQLException("The background threadpool cannot accept" +
>           " new task for execution, please retry the operation", rejected)
>       case NonFatal(e) =>
>         logError(s"Error executing query in background", e)
>         setState(OperationState.ERROR)
>         throw e
>     }
>   }
> }
> {code}
> LogDivertAppender appends logOutput to the operation log file only when an
> OperationLog is registered for the current thread (a thread-local lookup);
> otherwise the event is dropped. Because Spark's background thread never
> registers one, log events emitted on that thread while the statement runs are
> dropped, which is why the log files stay empty:
> {code:java}
> @Override
> protected void subAppend(LoggingEvent event) {
>   super.subAppend(event);
>   // That should've gone into our writer. Notify the LogContext.
>   String logOutput = writer.toString();
>   writer.reset();
>   OperationLog log = operationManager.getOperationLogByThread();
>   if (log == null) {
>     LOG.debug(" ---+++=== Dropped log event from thread " + event.getThreadName());
>     return;
>   }
>   log.writeOperationLog(logOutput);
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
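The following is a minimal, self-contained Java sketch of the thread-local mechanism described in the issue. It is not Hive or Spark code. `OperationLogSketch`, `OperationLog`, `divert`, `execute` and `runInBackground` are hypothetical names:

* `divert` only mimics the null check in `LogDivertAppender.subAppend`.
* `CURRENT.set(...)` plays the role of `registerCurrentOperationLog()`.

The sketch assumes a plain `ThreadLocal` registry. Under that assumption, a log registered on the caller (handler) thread is invisible to a pool thread unless the background task registers it again.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch (not Hive/Spark code): log events reach an operation log
// only if that log is registered in a ThreadLocal of the thread emitting them.
public class OperationLogSketch {

    // Hypothetical stand-in for an operation log: it just collects lines.
    static final class OperationLog {
        private final List<String> lines = Collections.synchronizedList(new ArrayList<>());
        void write(String line) { lines.add(line); }
        List<String> lines() { return lines; }
    }

    // Per-thread registration, analogous in spirit to registerCurrentOperationLog().
    private static final ThreadLocal<OperationLog> CURRENT = new ThreadLocal<>();

    // Mimics the appender's null check: drop the event if no log is registered.
    static void divert(String message) {
        OperationLog log = CURRENT.get();
        if (log == null) {
            return; // dropped, like the "Dropped log event from thread" branch
        }
        log.write(message);
    }

    // Simulated statement execution that emits log events on the running thread.
    static void execute() {
        divert("Parsing command");
        divert("Executing query");
    }

    // Runs execute() on a pool thread; optionally registers opLog inside that thread.
    static List<String> runInBackground(OperationLog opLog, boolean registerInBackground)
            throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> handle = pool.submit(() -> {
                if (registerInBackground) {
                    CURRENT.set(opLog); // register on the background thread itself
                }
                try {
                    execute();
                } finally {
                    CURRENT.remove(); // unregister so a pooled thread keeps no stale log
                }
            });
            handle.get(); // wait for the background operation to finish
        } finally {
            pool.shutdown();
        }
        return opLog.lines();
    }

    public static void main(String[] args) throws Exception {
        // 1. Registered only on the caller (handler) thread: the pool thread never sees it.
        OperationLog callerOnly = new OperationLog();
        CURRENT.set(callerOnly);
        System.out.println("caller thread only: " + runInBackground(callerOnly, false));
        CURRENT.remove();

        // 2. Registered inside the background task, as Hive's SQLOperation does.
        OperationLog registered = new OperationLog();
        System.out.println("background thread:  " + runInBackground(registered, true));
    }
}
```

Running `main` prints `caller thread only: []` followed by `background thread:  [Parsing command, Executing query]`.

The register-then-unregister-in-`finally` shape follows the quoted SQLOperation code. Unregistering matters because the pool reuses threads, and a later operation must not inherit a stale log. The sketch does not reproduce the change made in the linked pull request.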