Repository: hive Updated Branches: refs/heads/master 67499f41a -> c5539a1b1
HIVE-18063: Make CommandProcessorResponse an exception instead of a return class (Zoltan Haindrich, reviewed by Ashutosh Chauhan) Signed-off-by: Zoltan Haindrich <k...@rxd.hu> Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c5539a1b Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c5539a1b Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c5539a1b Branch: refs/heads/master Commit: c5539a1b19e603818011cb49d11def0e51f161ce Parents: 67499f4 Author: Zoltan Haindrich <k...@rxd.hu> Authored: Tue Nov 21 20:27:38 2017 +0100 Committer: Zoltan Haindrich <k...@rxd.hu> Committed: Wed Nov 22 09:00:33 2017 +0100 ---------------------------------------------------------------------- .../apache/hadoop/hive/ql/TestAcidOnTez.java | 4 +- .../java/org/apache/hadoop/hive/ql/Driver.java | 242 +++++++++---------- .../apache/hadoop/hive/ql/exec/Utilities.java | 11 +- .../hive/ql/io/CombineHiveInputFormat.java | 7 +- .../zookeeper/ZooKeeperHiveLockManager.java | 3 +- .../ql/processors/CommandProcessorResponse.java | 2 +- .../ql/udf/generic/GenericUDTFGetSplits.java | 13 +- .../apache/hadoop/hive/ql/TestTxnCommands2.java | 14 +- .../hadoop/hive/ql/TxnCommandsBaseForTests.java | 2 +- .../hive/ql/lockmgr/TestDummyTxnManager.java | 7 +- 10 files changed, 145 insertions(+), 160 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c5539a1b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java index 65a1ed1..26377e0 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java @@ -150,8 +150,8 @@ public class TestAcidOnTez { try { if (d != null) { dropTables(); - d.destroy(); d.close(); + d.destroy(); d = null; } TxnDbUtil.cleanDb(hiveConf); @@ -785,8 +785,8 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree ~/dev/hiverwgit/itests/h ss.close(); } if (d != null) { - d.destroy(); d.close(); + d.destroy(); } SessionState.start(conf); http://git-wip-us.apache.org/repos/asf/hive/blob/c5539a1b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index af9f193..389a1a6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -35,6 +35,7 @@ import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; import com.google.common.annotations.VisibleForTesting; @@ -188,15 +189,12 @@ public class Driver implements CommandProcessor { // either initTxnMgr or from the SessionState, in that order. private HiveTxnManager queryTxnMgr; - public enum DriverState { + private enum DriverState { INITIALIZED, COMPILING, COMPILED, EXECUTING, EXECUTED, - // a state that the driver enters after close() has been called to interrupt its running - // query in the query cancellation - INTERRUPT, // a state that the driver enters after close() has been called to clean the query results // and release the resources after the query has been executed CLOSED, @@ -210,6 +208,7 @@ public class Driver implements CommandProcessor { // resource releases public final ReentrantLock stateLock = new ReentrantLock(); public DriverState driverState = DriverState.INITIALIZED; + public AtomicBoolean aborted = new AtomicBoolean(); private static ThreadLocal<LockedDriverState> lds = new ThreadLocal<LockedDriverState>() { @Override protected LockedDriverState initialValue() { @@ -230,6 +229,19 @@ public class Driver implements CommandProcessor { lds.remove(); } } + + public boolean isAborted() { + return aborted.get(); + } + + public void abort() { + aborted.set(true); + } + + @Override + public String toString() { + return String.format("%s(aborted:%s)", driverState, aborted.get()); + } } private boolean checkConcurrency() { @@ -430,13 +442,18 @@ public class Driver implements CommandProcessor { * @return 0 for ok */ public int compile(String command, boolean resetTaskIds) { - return compile(command, resetTaskIds, false); + try { + compile(command, resetTaskIds, false); + return 0; + } catch (CommandProcessorResponse cpr) { + return cpr.getErrorCode(); + } } // deferClose indicates if the close/destroy should be deferred when the process has been // interrupted, it should be set to true if the compile is called within another method like // runInternal, which defers the close to the called in that method. - private int compile(String command, boolean resetTaskIds, boolean deferClose) { + private void compile(String command, boolean resetTaskIds, boolean deferClose) throws CommandProcessorResponse { PerfLogger perfLogger = SessionState.getPerfLogger(true); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_RUN); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE); @@ -463,9 +480,7 @@ public class Driver implements CommandProcessor { LOG.warn("WARNING! Query command could not be redacted." + e); } - if (isInterrupted()) { - return handleInterruption("at beginning of compilation."); //indicate if need clean resource - } + checkInterrupted("at beginning of compilation.", null, null); if (ctx != null && ctx.getExplainAnalyze() != AnalyzeState.RUNNING) { // close the existing ctx etc before compiling a new query, but does not destroy driver @@ -522,9 +537,8 @@ public class Driver implements CommandProcessor { }; ShutdownHookManager.addShutdownHook(shutdownRunner, SHUTDOWN_HOOK_PRIORITY); - if (isInterrupted()) { - return handleInterruption("before parsing and analysing the query"); - } + checkInterrupted("before parsing and analysing the query", null, null); + if (ctx == null) { ctx = new Context(conf); setTriggerContext(queryId); @@ -567,7 +581,7 @@ public class Driver implements CommandProcessor { String userFromUGI = getUserFromUGI(); if (!queryTxnMgr.isTxnOpen()) { if(userFromUGI == null) { - return 10; + throw createProcessorResponse(10); } long txnid = queryTxnMgr.openTxn(ctx, userFromUGI); } @@ -597,9 +611,7 @@ public class Driver implements CommandProcessor { sem.validate(); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE); - if (isInterrupted()) { - return handleInterruption("after analyzing query."); - } + checkInterrupted("after analyzing query.", null, null); // get the output schema schema = getSchema(sem, conf); @@ -628,7 +640,7 @@ public class Driver implements CommandProcessor { + ". Use SHOW GRANT to get more details."); errorMessage = authExp.getMessage(); SQLState = "42000"; - return 403; + throw createProcessorResponse(403); } finally { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DO_AUTHORIZATION); } @@ -644,11 +656,10 @@ public class Driver implements CommandProcessor { } } } - return 0; + } catch (CommandProcessorResponse cpr) { + throw cpr; } catch (Exception e) { - if (isInterrupted()) { - return handleInterruption("during query compilation: " + e.getMessage()); - } + checkInterrupted("during query compilation: " + e.getMessage(), null, null); compileError = true; ErrorMsg error = ErrorMsg.getErrorMsg(e.getMessage()); @@ -672,8 +683,7 @@ public class Driver implements CommandProcessor { downstreamError = e; console.printError(errorMessage, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); - return error.getErrorCode();//todo: this is bad if returned as cmd shell exit - // since it exceeds valid range of shell return values + throw createProcessorResponse(error.getErrorCode()); } finally { // Trigger post compilation hook. Note that if the compilation fails here then // before/after execution hook will never be executed. @@ -689,7 +699,7 @@ public class Driver implements CommandProcessor { ImmutableMap<String, Long> compileHMSTimings = dumpMetaCallTimingWithoutEx("compilation"); queryDisplay.setHmsTimings(QueryDisplay.Phase.COMPILATION, compileHMSTimings); - boolean isInterrupted = isInterrupted(); + boolean isInterrupted = lDrvState.isAborted(); if (isInterrupted && !deferClose) { closeInProcess(true); } @@ -760,9 +770,6 @@ public class Driver implements CommandProcessor { } return shouldOpenImplicitTxn; } - private int handleInterruption(String msg) { - return handleInterruptionWithHook(msg, null, null); - } private int handleInterruptionWithHook(String msg, HookContext hookContext, PerfLogger perfLogger) { @@ -779,16 +786,9 @@ public class Driver implements CommandProcessor { return 1000; } - private boolean isInterrupted() { - lDrvState.stateLock.lock(); - try { - if (lDrvState.driverState == DriverState.INTERRUPT) { - return true; - } else { - return false; - } - } finally { - lDrvState.stateLock.unlock(); + private void checkInterrupted(String msg, HookContext hookContext, PerfLogger perfLogger) throws CommandProcessorResponse { + if (lDrvState.isAborted()) { + throw createProcessorResponse(handleInterruptionWithHook(msg, hookContext, perfLogger)); } } @@ -1223,8 +1223,9 @@ public class Driver implements CommandProcessor { * * This method also records the list of valid transactions. This must be done after any * transactions have been opened. + * @throws CommandProcessorResponse **/ - private int acquireLocks() { + private void acquireLocks() throws CommandProcessorResponse { PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ACQUIRE_READ_WRITE_LOCKS); @@ -1233,12 +1234,12 @@ public class Driver implements CommandProcessor { acid txn manager requires all locks to be associated with a txn so if we end up here w/o an open txn it's because we are processing something like "use <database> which by definition needs no locks*/ - return 0; + return; } try { String userFromUGI = getUserFromUGI(); if(userFromUGI == null) { - return 10; + throw createProcessorResponse(10); } // Set the transaction id in all of the acid file sinks if (haveAcidWrite()) { @@ -1260,14 +1261,13 @@ public class Driver implements CommandProcessor { if(queryTxnMgr.recordSnapshot(plan)) { recordValidTxns(queryTxnMgr); } - return 0; } catch (Exception e) { errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage(); SQLState = ErrorMsg.findSQLState(e.getMessage()); downstreamError = e; console.printError(errorMessage, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); - return 10; + throw createProcessorResponse(10); } finally { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ACQUIRE_READ_WRITE_LOCKS); } @@ -1353,11 +1353,12 @@ public class Driver implements CommandProcessor { public CommandProcessorResponse run(String command, boolean alreadyCompiled) throws CommandNeedRetryException { - CommandProcessorResponse cpr = runInternal(command, alreadyCompiled); - if(cpr.getResponseCode() == 0) { - return cpr; - } + try { + runInternal(command, alreadyCompiled); + return createProcessorResponse(0); + } catch (CommandProcessorResponse cpr) { + SessionState ss = SessionState.get(); if(ss == null) { return cpr; @@ -1408,31 +1409,38 @@ public class Driver implements CommandProcessor { org.apache.hadoop.util.StringUtils.stringifyException(ex)); } return cpr; + } } public CommandProcessorResponse compileAndRespond(String command) { - return createProcessorResponse(compileInternal(command, false)); + try { + compileInternal(command, false); + return createProcessorResponse(0); + } catch (CommandProcessorResponse e) { + return e; + } } - public CommandProcessorResponse lockAndRespond() { + public void lockAndRespond() throws CommandProcessorResponse { // Assumes the query has already been compiled if (plan == null) { throw new IllegalStateException( "No previously compiled query for driver - queryId=" + queryState.getQueryId()); } - int ret = 0; if (requiresLock()) { - ret = acquireLocks(); - } - if (ret != 0) { - return rollback(createProcessorResponse(ret)); + try { + acquireLocks(); + } catch (CommandProcessorResponse cpr) { + rollback(cpr); + throw cpr; + } } - return createProcessorResponse(ret); } private static final ReentrantLock globalCompileLock = new ReentrantLock(); - private int compileInternal(String command, boolean deferClose) { + + private void compileInternal(String command, boolean deferClose) throws CommandProcessorResponse { int ret; Metrics metrics = MetricsFactory.getInstance(); @@ -1450,22 +1458,21 @@ public class Driver implements CommandProcessor { } if (compileLock == null) { - return ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode(); + throw createProcessorResponse(ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode()); } - try { - ret = compile(command, true, deferClose); - } finally { - compileLock.unlock(); - } - if (ret != 0) { + try { + compile(command, true, deferClose); + } catch (CommandProcessorResponse cpr) { try { releaseLocksAndCommitOrRollback(false); } catch (LockException e) { - LOG.warn("Exception in releasing locks. " - + org.apache.hadoop.util.StringUtils.stringifyException(e)); + LOG.warn("Exception in releasing locks. " + org.apache.hadoop.util.StringUtils.stringifyException(e)); } + throw cpr; + } finally { + compileLock.unlock(); } //Save compile-time PerfLogging for WebUI. @@ -1473,7 +1480,6 @@ public class Driver implements CommandProcessor { //or a reset PerfLogger. queryDisplay.setPerfLogStarts(QueryDisplay.Phase.COMPILATION, perfLogger.getStartTimes()); queryDisplay.setPerfLogEnds(QueryDisplay.Phase.COMPILATION, perfLogger.getEndTimes()); - return ret; } /** @@ -1535,8 +1541,8 @@ public class Driver implements CommandProcessor { return compileLock; } - private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled) - throws CommandNeedRetryException { + private void runInternal(String command, boolean alreadyCompiled) + throws CommandNeedRetryException, CommandProcessorResponse { errorMessage = null; SQLState = null; downstreamError = null; @@ -1550,7 +1556,7 @@ public class Driver implements CommandProcessor { } else { errorMessage = "FAILED: Precompiled query has been cancelled or closed."; console.printError(errorMessage); - return createProcessorResponse(12); + throw createProcessorResponse(12); } } else { lDrvState.driverState = DriverState.COMPILING; @@ -1578,20 +1584,16 @@ public class Driver implements CommandProcessor { downstreamError = e; console.printError(errorMessage + "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); - return createProcessorResponse(12); + throw createProcessorResponse(12); } PerfLogger perfLogger = null; - int ret; if (!alreadyCompiled) { // compile internal will automatically reset the perf logger - ret = compileInternal(command, true); + compileInternal(command, true); // then we continue to use this perf logger perfLogger = SessionState.getPerfLogger(); - if (ret != 0) { - return createProcessorResponse(ret); - } } else { // reuse existing perf logger. perfLogger = SessionState.getPerfLogger(); @@ -1603,21 +1605,18 @@ public class Driver implements CommandProcessor { // same instance of Driver, which can run multiple queries. ctx.setHiveTxnManager(queryTxnMgr); - if (isInterrupted()) { - return createProcessorResponse(handleInterruption("at acquiring the lock.")); - } + checkInterrupted("at acquiring the lock.", null, null); - CommandProcessorResponse resp = lockAndRespond(); - if (resp.failed()) { - return resp; - } + lockAndRespond(); - ret = execute(); - if (ret != 0) { - //if needRequireLock is false, the release here will do nothing because there is no lock - return rollback(createProcessorResponse(ret)); + try { + execute(); + } catch (CommandProcessorResponse cpr) { + rollback(cpr); + throw cpr; } + //if needRequireLock is false, the release here will do nothing because there is no lock try { //since set autocommit starts an implicit txn, close it @@ -1631,7 +1630,7 @@ public class Driver implements CommandProcessor { //txn (if there is one started) is not finished } } catch (LockException e) { - return handleHiveException(e, 12); + throw handleHiveException(e, 12); } perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_RUN); @@ -1649,31 +1648,27 @@ public class Driver implements CommandProcessor { downstreamError = e; console.printError(errorMessage + "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); - return createProcessorResponse(12); + throw createProcessorResponse(12); } isFinishedWithError = false; - return createProcessorResponse(ret); } finally { - if (isInterrupted()) { + if (lDrvState.isAborted()) { closeInProcess(true); } else { // only release the related resources ctx, driverContext as normal releaseResources(); } + lDrvState.stateLock.lock(); try { - if (lDrvState.driverState == DriverState.INTERRUPT) { - lDrvState.driverState = DriverState.ERROR; - } else { - lDrvState.driverState = isFinishedWithError ? DriverState.ERROR : DriverState.EXECUTED; - } + lDrvState.driverState = isFinishedWithError ? DriverState.ERROR : DriverState.EXECUTED; } finally { lDrvState.stateLock.unlock(); } } } - private CommandProcessorResponse rollback(CommandProcessorResponse cpr) { + private CommandProcessorResponse rollback(CommandProcessorResponse cpr) throws CommandProcessorResponse { //console.printError(cpr.toString()); try { @@ -1685,10 +1680,12 @@ public class Driver implements CommandProcessor { } return cpr; } - private CommandProcessorResponse handleHiveException(HiveException e, int ret) { + + private CommandProcessorResponse handleHiveException(HiveException e, int ret) throws CommandProcessorResponse { return handleHiveException(e, ret, null); } - private CommandProcessorResponse handleHiveException(HiveException e, int ret, String rootMsg) { + + private CommandProcessorResponse handleHiveException(HiveException e, int ret, String rootMsg) throws CommandProcessorResponse { errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e); if(rootMsg != null) { errorMessage += "\n" + rootMsg; @@ -1698,7 +1695,7 @@ public class Driver implements CommandProcessor { downstreamError = e; console.printError(errorMessage + "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); - return createProcessorResponse(ret); + throw createProcessorResponse(ret); } private boolean requiresLock() { if (!checkConcurrency()) { @@ -1759,7 +1756,7 @@ public class Driver implements CommandProcessor { return new CommandProcessorResponse(ret, errorMessage, SQLState, downstreamError); } - private int execute() throws CommandNeedRetryException { + private void execute() throws CommandNeedRetryException, CommandProcessorResponse { PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_EXECUTE); @@ -1779,10 +1776,9 @@ public class Driver implements CommandProcessor { if (lDrvState.driverState != DriverState.COMPILED && lDrvState.driverState != DriverState.EXECUTING) { SQLState = "HY008"; - errorMessage = "FAILED: query " + queryStr + " has " + - (lDrvState.driverState == DriverState.INTERRUPT ? "been cancelled" : "not been compiled."); + errorMessage = "FAILED: unexpected driverstate: " + lDrvState + ", for query " + queryStr; console.printError(errorMessage); - return 1000; + throw createProcessorResponse(1000); } else { lDrvState.driverState = DriverState.EXECUTING; } @@ -1852,9 +1848,8 @@ public class Driver implements CommandProcessor { // At any time, at most maxthreads tasks can be running // The main thread polls the TaskRunners to check if they have finished. - if (isInterrupted()) { - return handleInterruptionWithHook("before running tasks.", hookContext, perfLogger); - } + checkInterrupted("before running tasks.", hookContext, perfLogger); + DriverContext driverCxt = new DriverContext(ctx); driverCxt.prepare(plan); @@ -1914,9 +1909,8 @@ public class Driver implements CommandProcessor { TaskResult result = tskRun.getTaskResult(); int exitVal = result.getExitVal(); - if (isInterrupted()) { - return handleInterruptionWithHook("when checking the execution result.", hookContext, perfLogger); - } + checkInterrupted("when checking the execution result.", hookContext, perfLogger); + if (exitVal != 0) { if (tsk.ifRetryCmdWhenFail()) { driverCxt.shutdown(); @@ -1962,7 +1956,7 @@ public class Driver implements CommandProcessor { // in case we decided to run everything in local mode, restore the // the jobtracker setting to its initial value ctx.restoreOriginalTracker(); - return exitVal; + throw createProcessorResponse(exitVal); } } @@ -1993,7 +1987,7 @@ public class Driver implements CommandProcessor { errorMessage = "FAILED: Operation cancelled"; invokeFailureHooks(perfLogger, hookContext, errorMessage, null); console.printError(errorMessage); - return 1000; + throw createProcessorResponse(1000); } // remove incomplete outputs. @@ -2029,11 +2023,13 @@ public class Driver implements CommandProcessor { } catch (CommandNeedRetryException e) { executionError = true; throw e; + } catch (CommandProcessorResponse cpr) { + executionError = true; + throw cpr; } catch (Throwable e) { executionError = true; - if (isInterrupted()) { - return handleInterruptionWithHook("during query execution: \n" + e.getMessage(), hookContext, perfLogger); - } + + checkInterrupted("during query execution: \n" + e.getMessage(), hookContext, perfLogger); ctx.restoreOriginalTracker(); if (SessionState.get() != null) { @@ -2053,7 +2049,7 @@ public class Driver implements CommandProcessor { downstreamError = e; console.printError(errorMessage + "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); - return (12); + throw createProcessorResponse(12); } finally { // Trigger query hooks after query completes its execution. try { @@ -2083,17 +2079,13 @@ public class Driver implements CommandProcessor { } console.printInfo("Total MapReduce CPU Time Spent: " + Utilities.formatMsecToStr(totalCpu)); } - boolean isInterrupted = isInterrupted(); lDrvState.stateLock.lock(); try { - if (isInterrupted) { - } else { - lDrvState.driverState = executionError ? DriverState.ERROR : DriverState.EXECUTED; - } + lDrvState.driverState = executionError ? DriverState.ERROR : DriverState.EXECUTED; } finally { lDrvState.stateLock.unlock(); } - if (isInterrupted) { + if (lDrvState.isAborted()) { LOG.info("Executing command(queryId=" + queryId + ") has been interrupted after " + duration + " seconds"); } else { LOG.info("Completed executing command(queryId=" + queryId + "); Time taken: " + duration + " seconds"); @@ -2103,8 +2095,6 @@ public class Driver implements CommandProcessor { if (console != null) { console.printInfo("OK"); } - - return (0); } private void releasePlan(QueryPlan plan) { @@ -2436,9 +2426,8 @@ public class Driver implements CommandProcessor { try { releaseDriverContext(); if (lDrvState.driverState == DriverState.COMPILING || - lDrvState.driverState == DriverState.EXECUTING || - lDrvState.driverState == DriverState.INTERRUPT) { - lDrvState.driverState = DriverState.INTERRUPT; + lDrvState.driverState == DriverState.EXECUTING) { + lDrvState.abort(); return 0; } releasePlan(); @@ -2463,8 +2452,7 @@ public class Driver implements CommandProcessor { try { // in the cancel case where the driver state is INTERRUPTED, destroy will be deferred to // the query process - if (lDrvState.driverState == DriverState.DESTROYED || - lDrvState.driverState == DriverState.INTERRUPT) { + if (lDrvState.driverState == DriverState.DESTROYED) { return; } else { lDrvState.driverState = DriverState.DESTROYED; http://git-wip-us.apache.org/repos/asf/hive/blob/c5539a1b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 29c33f9..d7397e0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -110,7 +110,6 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.Context; -import org.apache.hadoop.hive.ql.Driver.DriverState; import org.apache.hadoop.hive.ql.Driver.LockedDriverState; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryPlan; @@ -3283,7 +3282,7 @@ public final class Utilities { boolean hasLogged = false; for (Map.Entry<Path, ArrayList<String>> e : pathToAliases) { - if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) { + if (lDrvStat != null && lDrvStat.isAborted()) { throw new IOException("Operation is Canceled."); } @@ -3339,7 +3338,7 @@ public final class Utilities { finalPathsToAdd.addAll(getInputPathsWithPool(job, work, hiveScratchDir, ctx, skipDummy, pathsToAdd, pool)); } else { for (final Path path : pathsToAdd) { - if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) { + if (lDrvStat != null && lDrvStat.isAborted()) { throw new IOException("Operation is Canceled."); } Path newPath = new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy).call(); @@ -3360,7 +3359,7 @@ public final class Utilities { try { Map<GetInputPathsCallable, Future<Path>> getPathsCallableToFuture = new LinkedHashMap<>(); for (final Path path : pathsToAdd) { - if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) { + if (lDrvStat != null && lDrvStat.isAborted()) { throw new IOException("Operation is Canceled."); } GetInputPathsCallable callable = new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy); @@ -3369,7 +3368,7 @@ public final class Utilities { pool.shutdown(); for (Map.Entry<GetInputPathsCallable, Future<Path>> future : getPathsCallableToFuture.entrySet()) { - if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) { + if (lDrvStat != null && lDrvStat.isAborted()) { throw new IOException("Operation is Canceled."); } @@ -3960,7 +3959,7 @@ public final class Utilities { String[] classNames = org.apache.hadoop.util.StringUtils.getStrings(HiveConf.getVar(hiveConf, confVar)); if (classNames == null) { - return Collections.emptyList(); + return new ArrayList<>(0); } Collection<Class<?>> classList = new ArrayList<Class<?>>(classNames.length); for (String className : classNames) { http://git-wip-us.apache.org/repos/asf/hive/blob/c5539a1b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java index 6a188ac..21238db 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hive.ql.io; -import org.apache.hadoop.hive.ql.io.merge.MergeFileWork; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -45,8 +43,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hive.metastore.MetaStoreUtils; -import org.apache.hadoop.hive.ql.Driver.DriverState; import org.apache.hadoop.hive.ql.Driver.LockedDriverState; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -367,8 +363,9 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ LockedDriverState lDrvStat = LockedDriverState.getLockedDriverState(); for (Path path : paths) { - if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) + if (lDrvStat != null && lDrvStat.isAborted()) { throw new IOException("Operation is Canceled. "); + } PartitionDesc part = HiveFileFormatUtils.getFromPathRecursively( pathToPartitionInfo, path, IOPrepareCache.get().allocatePartitionDescMap()); http://git-wip-us.apache.org/repos/asf/hive/blob/c5539a1b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java index 42c0042..cbb46df 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.Driver.DriverState; import org.apache.hadoop.hive.ql.Driver.LockedDriverState; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.lockmgr.*; @@ -198,7 +197,7 @@ public class ZooKeeperHiveLockManager implements HiveLockManager { boolean isInterrupted = false; if (lDrvState != null) { lDrvState.stateLock.lock(); - if (lDrvState.driverState == DriverState.INTERRUPT) { + if (lDrvState.isAborted()) { isInterrupted = true; } lDrvState.stateLock.unlock(); http://git-wip-us.apache.org/repos/asf/hive/blob/c5539a1b/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java b/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java index a4687f2..63b563e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java @@ -30,7 +30,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg; * is not 0. Note that often {@code responseCode} ends up the exit value of * command shell process so should keep it to < 127. */ -public class CommandProcessorResponse { +public class CommandProcessorResponse extends Exception { private final int responseCode; private final String errorMessage; private final int hiveErrorCode; http://git-wip-us.apache.org/repos/asf/hive/blob/c5539a1b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java index dc79283..4148a8a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java @@ -115,7 +115,7 @@ import com.google.common.base.Preconditions; /** * GenericUDTFGetSplits. - * + * */ @Description(name = "get_splits", value = "_FUNC_(string,int) - " + "Returns an array of length int serialized splits for the referenced tables string.") @@ -301,9 +301,10 @@ public class GenericUDTFGetSplits extends GenericUDTF { // Table will be queried directly by LLAP // Acquire locks if necessary - they will be released during session cleanup. // The read will have READ_COMMITTED level semantics. - cpr = driver.lockAndRespond(); - if (cpr.getResponseCode() != 0) { - throw new HiveException("Failed to acquire locks: " + cpr.getException()); + try { + driver.lockAndRespond(); + } catch (CommandProcessorResponse cpr1) { + throw new HiveException("Failed to acquire locks", cpr1); } // Attach the resources to the session cleanup. @@ -394,7 +395,7 @@ public class GenericUDTFGetSplits extends GenericUDTF { LlapSigner signer = null; if (UserGroupInformation.isSecurityEnabled()) { signer = coordinator.getLlapSigner(job); - + // 1. Generate the token for query user (applies to all splits). queryUser = SessionState.getUserFromAuthenticator(); if (queryUser == null) { @@ -563,7 +564,7 @@ public class GenericUDTFGetSplits extends GenericUDTF { /** * Returns a local resource representing a jar. This resource will be used to * execute the plan on the cluster. - * + * * @param localJarPath * Local path to the jar to be localized. * @return LocalResource corresponding to the localized hive exec resource. http://git-wip-us.apache.org/repos/asf/hive/blob/c5539a1b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index b877253..2e73e48 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -169,8 +169,8 @@ public class TestTxnCommands2 { try { if (d != null) { dropTables(); - d.destroy(); d.close(); + d.destroy(); d = null; } TxnDbUtil.cleanDb(hiveConf); @@ -364,7 +364,7 @@ public class TestTxnCommands2 { Assert.assertEquals(536870912, BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(0))); Assert.assertEquals(536936448, BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(1))); /* - * All ROW__IDs are unique on read after conversion to acid + * All ROW__IDs are unique on read after conversion to acid * ROW__IDs are exactly the same before and after compaction * Also check the file name (only) after compaction for completeness * Note: order of rows in a file ends up being the reverse of order in values clause (why?!) @@ -1523,7 +1523,7 @@ public class TestTxnCommands2 { runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals)); List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); Assert.assertEquals(stringifyValues(vals), r); - String query = "merge into " + Table.ACIDTBL + + String query = "merge into " + Table.ACIDTBL + " using " + Table.NONACIDPART2 + " source ON " + Table.ACIDTBL + ".a = a2 and b + 1 = source.b2 + 1 " + "WHEN MATCHED THEN UPDATE set b = source.b2 " + "WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2)"; @@ -1647,7 +1647,7 @@ public class TestTxnCommands2 { // 1 ReadEntity: default@values__tmp__table__1 // 1 WriteEntity: default@acidtblpart Type=TABLE WriteType=INSERT isDP=false runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1'),(3,3,'p1'),(4,4,'p2')"); - + List<String> r1 = runStatementOnDriver("select count(*) from " + Table.ACIDTBLPART); Assert.assertEquals("4", r1.get(0)); //In DbTxnManager.acquireLocks() we have @@ -1655,12 +1655,12 @@ public class TestTxnCommands2 { // 1 WriteEntity: default@acidtblpart Type=TABLE WriteType=INSERT isDP=false //todo: side note on the above: LockRequestBuilder combines the both default@acidtblpart entries to 1 runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) select * from " + Table.ACIDTBLPART + " where p='p1'"); - + //In DbTxnManager.acquireLocks() we have // 2 ReadEntity: [default@acidtblpart@p=p1, default@acidtblpart] // 1 WriteEntity: default@acidtblpart@p=p2 Type=PARTITION WriteType=INSERT isDP=false runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p='p2') select a,b from " + Table.ACIDTBLPART + " where p='p1'"); - + //In UpdateDeleteSemanticAnalyzer, after super analyze // 3 ReadEntity: [default@acidtblpart, default@acidtblpart@p=p1, default@acidtblpart@p=p2] // 1 WriteEntity: [default@acidtblpart TABLE/INSERT] @@ -1670,7 +1670,7 @@ public class TestTxnCommands2 { //todo: Why acquire per partition locks - if you have many partitions that's hugely inefficient. //could acquire 1 table level Shared_write intead runStatementOnDriver("update " + Table.ACIDTBLPART + " set b = 1"); - + //In UpdateDeleteSemanticAnalyzer, after super analyze // Read [default@acidtblpart, default@acidtblpart@p=p1] // Write default@acidtblpart TABLE/INSERT http://git-wip-us.apache.org/repos/asf/hive/blob/c5539a1b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java index 8737369..9f31eb1 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java @@ -111,8 +111,8 @@ public abstract class TxnCommandsBaseForTests { try { if (d != null) { dropTables(); - d.destroy(); d.close(); + d.destroy(); d = null; } } finally { http://git-wip-us.apache.org/repos/asf/hive/blob/c5539a1b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java index 9d27d21..913b60c 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java @@ -27,7 +27,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.ErrorMsg; -import org.apache.hadoop.hive.ql.Driver.DriverState; import org.apache.hadoop.hive.ql.Driver.LockedDriverState; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.hooks.ReadEntity; @@ -83,7 +82,9 @@ public class TestDummyTxnManager { @After public void tearDown() throws Exception { - if (txnMgr != null) txnMgr.closeTxnManager(); + if (txnMgr != null) { + txnMgr.closeTxnManager(); + } } /** @@ -100,7 +101,7 @@ public class TestDummyTxnManager { expectedLocks.add(new ZooKeeperHiveLock("default.table1", new HiveLockObject(), HiveLockMode.SHARED)); LockedDriverState lDrvState = new LockedDriverState(); LockedDriverState lDrvInp = new LockedDriverState(); - lDrvInp.driverState = DriverState.INTERRUPT; + lDrvInp.abort(); LockException lEx = new LockException(ErrorMsg.LOCK_ACQUIRE_CANCELLED.getMsg()); when(mockLockManager.lock(anyListOf(HiveLockObj.class), eq(false), eq(lDrvState))).thenReturn(expectedLocks); when(mockLockManager.lock(anyListOf(HiveLockObj.class), eq(false), eq(lDrvInp))).thenThrow(lEx);