HIVE-11077 Add support in parser and wire up to txn manager (Eugene Koifman, reviewed by Alan Gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/012c99ff Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/012c99ff Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/012c99ff Branch: refs/heads/branch-1 Commit: 012c99ff22f3f6978bd4f520716cb6d26ab1138a Parents: 8e8e391 Author: Eugene Koifman <ekoif...@hortonworks.com> Authored: Wed Jul 22 12:55:09 2015 -0700 Committer: Eugene Koifman <ekoif...@hortonworks.com> Committed: Wed Jul 22 12:55:09 2015 -0700 ---------------------------------------------------------------------- .../hadoop/hive/cli/TestOptionsProcessor.java | 1 - .../hadoop/hive/common/ValidReadTxnList.java | 2 +- .../hadoop/hive/metastore/txn/TxnHandler.java | 9 +- .../metastore/txn/ValidCompactorTxnList.java | 2 +- .../java/org/apache/hadoop/hive/ql/Context.java | 1 - .../java/org/apache/hadoop/hive/ql/Driver.java | 196 +++++--- .../org/apache/hadoop/hive/ql/ErrorMsg.java | 2 +- .../org/apache/hadoop/hive/ql/QueryPlan.java | 18 +- .../apache/hadoop/hive/ql/exec/MoveTask.java | 2 +- .../hadoop/hive/ql/lockmgr/DbTxnManager.java | 36 +- .../hadoop/hive/ql/lockmgr/DummyTxnManager.java | 8 + .../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 21 + .../hive/ql/lockmgr/HiveTxnManagerImpl.java | 10 + .../hadoop/hive/ql/lockmgr/LockException.java | 8 +- .../hadoop/hive/ql/metadata/HiveException.java | 3 + .../hive/ql/parse/BaseSemanticAnalyzer.java | 13 + .../org/apache/hadoop/hive/ql/parse/HiveLexer.g | 11 + .../apache/hadoop/hive/ql/parse/HiveParser.g | 70 +++ .../hadoop/hive/ql/parse/IdentifiersParser.g | 19 +- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 25 +- .../hive/ql/parse/SemanticAnalyzerFactory.java | 12 + .../hadoop/hive/ql/plan/HiveOperation.java | 32 +- .../ql/processors/CommandProcessorResponse.java | 21 +- .../hadoop/hive/ql/processors/HiveCommand.java | 3 + .../authorization/plugin/HiveOperationType.java | 5 + .../plugin/sqlstd/Operation2Privilege.java | 11 + 
.../hadoop/hive/ql/session/SessionState.java | 34 +- .../apache/hadoop/hive/ql/TestTxnCommands.java | 473 +++++++++++++++++++ .../positive/TestTransactionStatement.java | 102 ++++ .../hive/ql/session/TestSessionState.java | 2 +- .../clientnegative/exchange_partition.q.out | 2 +- .../clientpositive/exchange_partition.q.out | 4 +- .../clientpositive/exchange_partition2.q.out | 4 +- .../clientpositive/exchange_partition3.q.out | 4 +- 34 files changed, 1020 insertions(+), 146 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/cli/src/test/org/apache/hadoop/hive/cli/TestOptionsProcessor.java ---------------------------------------------------------------------- diff --git a/cli/src/test/org/apache/hadoop/hive/cli/TestOptionsProcessor.java b/cli/src/test/org/apache/hadoop/hive/cli/TestOptionsProcessor.java index 9d0399a..ac22ab1 100644 --- a/cli/src/test/org/apache/hadoop/hive/cli/TestOptionsProcessor.java +++ b/cli/src/test/org/apache/hadoop/hive/cli/TestOptionsProcessor.java @@ -56,7 +56,6 @@ public class TestOptionsProcessor { assertEquals("execString", sessionState.execString); assertEquals(0, sessionState.initFiles.size()); assertTrue(sessionState.getIsVerbose()); - sessionState.setConf(null); assertTrue(sessionState.getIsSilent()); } http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java b/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java index 479e0df..fda242d 100644 --- a/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java +++ b/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java @@ -105,7 +105,7 @@ public class ValidReadTxnList implements ValidTxnList { @Override public void readFromString(String src) { - 
if (src == null) { + if (src == null || src.length() == 0) { highWatermark = Long.MAX_VALUE; exceptions = new long[0]; } else { http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index fd9c275..c0e83c6 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -380,8 +380,9 @@ public class TxnHandler { "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid; LOG.debug("Going to execute insert <" + s + ">"); if (stmt.executeUpdate(s) < 1) { - LOG.warn("Expected to move at least one record from txn_components to " + - "completed_txn_components when committing txn!"); + //this can be reasonable for an empty txn START/COMMIT + LOG.info("Expected to move at least one record from txn_components to " + + "completed_txn_components when committing txn! txnid:" + txnid); } // Always access TXN_COMPONENTS before HIVE_LOCKS; @@ -1351,7 +1352,7 @@ public class TxnHandler { throws NoSuchTxnException, TxnAbortedException, MetaException, SQLException { // We want to minimize the number of concurrent lock requests being issued. If we do not we // get a large number of deadlocks in the database, since this method has to both clean - // timedout locks and insert new locks. This synchronization barrier will not eliminiate all + // timedout locks and insert new locks. This synchronization barrier will not eliminate all // deadlocks, and the code is still resilient in the face of a database deadlock. But it // will reduce the number. This could have been done via a lock table command in the // underlying database, but was not for two reasons. 
One, different databases have different @@ -1452,7 +1453,7 @@ public class TxnHandler { long extLockId, boolean alwaysCommit) throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException { - List<LockInfo> locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId); + List<LockInfo> locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId);//being acquired now LockResponse response = new LockResponse(); response.setLockid(extLockId); http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java index 71f14e5..67631ba 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java @@ -88,7 +88,7 @@ public class ValidCompactorTxnList extends ValidReadTxnList { @Override public void readFromString(String src) { - if (src == null) { + if (src == null || src.length() == 0) { highWatermark = Long.MAX_VALUE; exceptions = new long[0]; } else { http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/ql/src/java/org/apache/hadoop/hive/ql/Context.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index a74bbbe..ca0d487 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -96,7 +96,6 @@ public class Context { // List of Locks for this query protected List<HiveLock> hiveLocks; - protected HiveLockManager hiveLockMgr; // Transaction manager for this query protected HiveTxnManager 
hiveTxnManager; http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index d161503..f501b37 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -385,7 +385,10 @@ public class Driver implements CommandProcessor { SessionState.get().setupQueryCurrentTimestamp(); try { - command = new VariableSubstitution().substitute(conf,command); + // Initialize the transaction manager. This must be done before analyze is called. + SessionState.get().initTxnMgr(conf); + + command = new VariableSubstitution().substitute(conf, command); ctx = new Context(conf); ctx.setTryCount(getTryCount()); ctx.setCmd(command); @@ -397,13 +400,6 @@ public class Driver implements CommandProcessor { tree = ParseUtils.findRootNonNullToken(tree); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARSE); - // Initialize the transaction manager. This must be done before analyze is called. Also - // record the valid transactions for this query. We have to do this at compile time - // because we use the information in planning the query. Also, - // we want to record it at this point so that users see data valid at the point that they - // submit the query. - SessionState.get().initTxnMgr(conf); - recordValidTxns(); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ANALYZE); BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree); @@ -443,10 +439,8 @@ public class Driver implements CommandProcessor { // to avoid returning sensitive data String queryStr = HookUtils.redactLogString(conf, command); - String operationName = ctx.getExplain() ? 
- HiveOperation.EXPLAIN.getOperationName() : SessionState.get().getCommandType(); plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId, - operationName, getSchema(sem, conf)); + SessionState.get().getHiveOperation(), getSchema(sem, conf)); conf.setVar(HiveConf.ConfVars.HIVEQUERYSTRING, queryStr); @@ -505,7 +499,8 @@ public class Driver implements CommandProcessor { downstreamError = e; console.printError(errorMessage, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); - return error.getErrorCode(); + return error.getErrorCode();//todo: this is bad if returned as cmd shell exit + // since it exceeds valid range of shell return values } finally { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.COMPILE); dumpMetaCallTimingWithoutEx("compilation"); @@ -935,30 +930,32 @@ public class Driver implements CommandProcessor { // Write the current set of valid transactions into the conf file so that it can be read by // the input format. private void recordValidTxns() throws LockException { - ValidTxnList txns = SessionState.get().getTxnMgr().getValidTxns(); + HiveTxnManager txnMgr = SessionState.get().getTxnMgr(); + ValidTxnList txns = txnMgr.getValidTxns(); String txnStr = txns.toString(); conf.set(ValidTxnList.VALID_TXNS_KEY, txnStr); - LOG.debug("Encoding valid txns info " + txnStr); - // TODO I think when we switch to cross query transactions we need to keep this list in - // session state rather than agressively encoding it in the conf like this. We can let the - // TableScanOperators then encode it in the conf before calling the input formats. + LOG.debug("Encoding valid txns info " + txnStr + " txnid:" + txnMgr.getCurrentTxnId()); } /** * Acquire read and write locks needed by the statement. The list of objects to be locked are - * obtained from the inputs and outputs populated by the compiler. The lock acuisition scheme is + * obtained from the inputs and outputs populated by the compiler. 
The lock acquisition scheme is * pretty simple. If all the locks cannot be obtained, error out. Deadlock is avoided by making * sure that the locks are lexicographically sorted. * * This method also records the list of valid transactions. This must be done after any * transactions have been opened and locks acquired. + * @param startTxnImplicitly in AC=false, the 1st DML starts a txn **/ - private int acquireLocksAndOpenTxn() { + private int acquireLocksAndOpenTxn(boolean startTxnImplicitly) { PerfLogger perfLogger = PerfLogger.getPerfLogger(); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ACQUIRE_READ_WRITE_LOCKS); SessionState ss = SessionState.get(); HiveTxnManager txnMgr = ss.getTxnMgr(); + if(startTxnImplicitly) { + assert !txnMgr.getAutoCommit(); + } try { // Don't use the userName member, as it may or may not have been set. Get the value from @@ -974,27 +971,34 @@ public class Driver implements CommandProcessor { "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); return 10; } - if (acidSinks != null && acidSinks.size() > 0) { + + boolean existingTxn = txnMgr.isTxnOpen(); + if((txnMgr.getAutoCommit() && haveAcidWrite()) || plan.getOperation() == HiveOperation.START_TRANSACTION || + (!txnMgr.getAutoCommit() && startTxnImplicitly)) { // We are writing to tables in an ACID compliant way, so we need to open a transaction - long txnId = ss.getCurrentTxn(); - if (txnId == SessionState.NO_CURRENT_TXN) { - txnId = txnMgr.openTxn(userFromUGI); - ss.setCurrentTxn(txnId); - LOG.debug("Setting current transaction to " + txnId); - } - // Set the transaction id in all of the acid file sinks - if (acidSinks != null) { - for (FileSinkDesc desc : acidSinks) { - desc.setTransactionId(txnId); - desc.setStatementId(txnMgr.getStatementId()); - } + txnMgr.openTxn(userFromUGI); + } + // Set the transaction id in all of the acid file sinks + if (haveAcidWrite()) { + for (FileSinkDesc desc : acidSinks) { + desc.setTransactionId(txnMgr.getCurrentTxnId()); + 
desc.setStatementId(txnMgr.getStatementId()); } - - // TODO Once we move to cross query transactions we need to add the open transaction to - // our list of valid transactions. We don't have a way to do that right now. } - + /*Note, we have to record snapshot after lock acquisition to prevent lost update problem + consider 2 concurrent "update table T set x = x + 1". 1st will get the locks and the + 2nd will block until 1st one commits and only then lock in the snapshot, i.e. it will + see the changes made by 1st one. This takes care of autoCommit=true case. + For multi-stmt txns this is not sufficient and will be managed via WriteSet tracking + in the lock manager.*/ txnMgr.acquireLocks(plan, ctx, userFromUGI); + if(!existingTxn) { + //For multi-stmt txns we should record the snapshot when txn starts but + // don't update it after that until txn completes. Thus the check for {@code existingTxn} + //For autoCommit=true, Read-only statements, txn is implicit, i.e. lock in the snapshot + //for each statement. + recordValidTxns(); + } return 0; } catch (LockException e) { @@ -1009,6 +1013,9 @@ public class Driver implements CommandProcessor { } } + private boolean haveAcidWrite() { + return acidSinks != null && !acidSinks.isEmpty(); + } /** * @param hiveLocks * list of hive locks to be released Release all the locks specified. If some of the @@ -1026,17 +1033,14 @@ public class Driver implements CommandProcessor { HiveTxnManager txnMgr = ss.getTxnMgr(); // If we've opened a transaction we need to commit or rollback rather than explicitly // releasing the locks. 
- if (ss.getCurrentTxn() != SessionState.NO_CURRENT_TXN && ss.isAutoCommit()) { - try { - if (commit) { - txnMgr.commitTxn(); - } else { - txnMgr.rollbackTxn(); - } - } finally { - ss.setCurrentTxn(SessionState.NO_CURRENT_TXN); + if (txnMgr.isTxnOpen()) { + if (commit) { + txnMgr.commitTxn();//both commit & rollback clear ALL locks for this tx + } else { + txnMgr.rollbackTxn(); } } else { + //since there is no tx, we only have locks for current query (if any) if (hiveLocks != null) { txnMgr.getLockManager().releaseLocks(hiveLocks); } @@ -1178,44 +1182,77 @@ public class Driver implements CommandProcessor { // Since we're reusing the compiled plan, we need to update its start time for current run plan.setQueryStartTime(perfLogger.getStartTime(PerfLogger.DRIVER_RUN)); } - // the reason that we set the txn manager for the cxt here is because each // query has its own ctx object. The txn mgr is shared across the // same instance of Driver, which can run multiple queries. - ctx.setHiveTxnManager(SessionState.get().getTxnMgr()); + HiveTxnManager txnManager = SessionState.get().getTxnMgr(); + ctx.setHiveTxnManager(txnManager); + + boolean startTxnImplicitly = false; + { + //this block ensures op makes sense in given context, e.g. COMMIT is valid only if txn is open + //DDL is not allowed in a txn, etc. 
+ //an error in an open txn does a rollback of the txn + if (txnManager.isTxnOpen() && !plan.getOperation().isAllowedInTransaction()) { + assert !txnManager.getAutoCommit() : "didn't expect AC=true"; + return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, null, + plan.getOperationName(), Long.toString(txnManager.getCurrentTxnId()))); + } + if(!txnManager.isTxnOpen() && plan.getOperation().isRequiresOpenTransaction()) { + return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, null, plan.getOperationName())); + } + if(!txnManager.isTxnOpen() && plan.getOperation() == HiveOperation.QUERY && !txnManager.getAutoCommit()) { + //this effectively makes START TRANSACTION optional and supports JDBC setAutoCommit(false) semantics + //also, indirectly allows DDL to be executed outside a txn context + startTxnImplicitly = true; + } + if(txnManager.getAutoCommit() && plan.getOperation() == HiveOperation.START_TRANSACTION) { + return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT, null, plan.getOperationName())); + } + } + if(plan.getOperation() == HiveOperation.SET_AUTOCOMMIT) { + try { + if(plan.getAutoCommitValue() && !txnManager.getAutoCommit()) { + /*here, if there is an open txn, we want to commit it; this behavior matches + * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean)*/ + releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), true); + txnManager.setAutoCommit(true); + } + else if(!plan.getAutoCommitValue() && txnManager.getAutoCommit()) { + txnManager.setAutoCommit(false); + } + else {/*didn't change autoCommit value - no-op*/} + } + catch(LockException e) { + return handleHiveException(e, 12); + } + } if (requiresLock()) { - ret = acquireLocksAndOpenTxn(); + ret = acquireLocksAndOpenTxn(startTxnImplicitly); if (ret != 0) { - try { - releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false); - } catch (LockException e) { - // Not much to 
do here - } - return createProcessorResponse(ret); + return rollback(createProcessorResponse(ret)); } } ret = execute(); if (ret != 0) { //if needRequireLock is false, the release here will do nothing because there is no lock - try { - releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false); - } catch (LockException e) { - // Nothing to do here - } - return createProcessorResponse(ret); + return rollback(createProcessorResponse(ret)); } //if needRequireLock is false, the release here will do nothing because there is no lock try { - releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), true); + if(txnManager.getAutoCommit() || plan.getOperation() == HiveOperation.COMMIT) { + releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), true); + } + else if(plan.getOperation() == HiveOperation.ROLLBACK) { + releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false); + } + else { + //txn (if there is one started) is not finished + } } catch (LockException e) { - errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e); - SQLState = ErrorMsg.findSQLState(e.getMessage()); - downstreamError = e; - console.printError(errorMessage + "\n" - + org.apache.hadoop.util.StringUtils.stringifyException(e)); - return createProcessorResponse(12); + return handleHiveException(e, 12); } perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_RUN); @@ -1238,6 +1275,31 @@ public class Driver implements CommandProcessor { return createProcessorResponse(ret); } + private CommandProcessorResponse rollback(CommandProcessorResponse cpr) { + try { + releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false); + } + catch (LockException e) { + LOG.error("rollback() FAILED: " + cpr);//make sure not to lose + handleHiveException(e, 12, "Additional info in hive.log at \"rollback() FAILED\""); + } + return cpr; + } + private CommandProcessorResponse handleHiveException(HiveException e, int ret) { + return handleHiveException(e, ret, null); + } + private CommandProcessorResponse 
handleHiveException(HiveException e, int ret, String rootMsg) { + errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e); + if(rootMsg != null) { + errorMessage += "\n" + rootMsg; + } + SQLState = e.getCanonicalErrorMsg() != null ? + e.getCanonicalErrorMsg().getSQLState() : ErrorMsg.findSQLState(e.getMessage()); + downstreamError = e; + console.printError(errorMessage + "\n" + + org.apache.hadoop.util.StringUtils.stringifyException(e)); + return createProcessorResponse(ret); + } private boolean requiresLock() { if (!checkConcurrency()) { return false; http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 20509ce..cef72b4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -447,7 +447,7 @@ public enum ErrorMsg { " (={2}). This is controlled by hive.limit.query.max.table.partition.", true), OP_NOT_ALLOWED_IN_AUTOCOMMIT(20006, "Operation {0} is not allowed when autoCommit=true.", true),//todo: better SQLState? OP_NOT_ALLOWED_IN_TXN(20007, "Operation {0} is not allowed in a transaction. TransactionID={1}.", true), - OP_NOT_ALLOWED_WITHOUT_TXN(2008, "Operation {0} is not allowed since autoCommit=false and there is no active transaction", true), + OP_NOT_ALLOWED_WITHOUT_TXN(20008, "Operation {0} is not allowed since autoCommit=false and there is no active transaction", true), //========================== 30000 range starts here ========================// STATSPUBLISHER_NOT_OBTAINED(30000, "StatsPublisher cannot be obtained. 
" + http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java index 29a3939..b9776ea 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo; import org.apache.hadoop.hive.ql.parse.TableAccessInfo; +import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob; import org.apache.hadoop.hive.ql.plan.api.AdjacencyType; @@ -106,14 +107,16 @@ public class QueryPlan implements Serializable { private QueryProperties queryProperties; private transient Long queryStartTime; - private String operationName; + private final HiveOperation operation; + private Boolean autoCommitValue; public QueryPlan() { this.reducerTimeStatsPerJobList = new ArrayList<ReducerTimeStatsPerJob>(); + operation = null; } public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId, - String operationName, Schema resultSchema) { + HiveOperation operation, Schema resultSchema) { this.queryString = queryString; rootTasks = new ArrayList<Task<? 
extends Serializable>>(); @@ -134,7 +137,8 @@ public class QueryPlan implements Serializable { query.putToQueryAttributes("queryString", this.queryString); queryProperties = sem.getQueryProperties(); queryStartTime = startTime; - this.operationName = operationName; + this.operation = operation; + this.autoCommitValue = sem.getAutoCommitValue(); this.resultSchema = resultSchema; } @@ -794,6 +798,12 @@ public class QueryPlan implements Serializable { } public String getOperationName() { - return operationName; + return operation == null ? null : operation.getOperationName(); + } + public HiveOperation getOperation() { + return operation; + } + public Boolean getAutoCommitValue() { + return autoCommitValue; } } http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index b07a37a..0a466e4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -380,7 +380,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable { tbd.getHoldDDLTime(), isSkewedStoredAsDirs(tbd), work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, - SessionState.get().getCurrentTxn()); + SessionState.get().getTxnMgr().getCurrentTxnId()); console.printInfo("\t Time taken for load dynamic partitions : " + (System.currentTimeMillis() - startTime)); http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 445f606..4813d5b 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -21,7 +21,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.LockComponentBuilder; import org.apache.hadoop.hive.metastore.LockRequestBuilder; @@ -51,6 +50,10 @@ public class DbTxnManager extends HiveTxnManagerImpl { private DbLockManager lockMgr = null; private IMetaStoreClient client = null; + /** + * The Metastore NEXT_TXN_ID.NTXN_NEXT is initialized to 1; it contains the next available + * transaction id. Thus 1 is the first transaction id. + */ private long txnId = 0; /** * assigns a unique monotonically increasing ID to each statement @@ -75,14 +78,16 @@ public class DbTxnManager extends HiveTxnManagerImpl { @Override public long openTxn(String user) throws LockException { init(); + if(isTxnOpen()) { + throw new LockException("Transaction already opened. 
txnId=" + txnId);//ToDo: ErrorMsg + } try { txnId = client.openTxn(user); statementId = 0; LOG.debug("Opened txn " + txnId); return txnId; } catch (TException e) { - throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), - e); + throw new LockException(e, ErrorMsg.METASTORE_COMMUNICATION_FAILED); } } @@ -232,7 +237,7 @@ public class DbTxnManager extends HiveTxnManagerImpl { } List<HiveLock> locks = new ArrayList<HiveLock>(1); - if(txnId > 0) { + if(isTxnOpen()) { statementId++; } LockState lockState = lockMgr.lock(rqstBuilder.build(), plan.getQueryId(), isBlocking, locks); @@ -242,9 +247,8 @@ public class DbTxnManager extends HiveTxnManagerImpl { @Override public void commitTxn() throws LockException { - if (txnId == 0) { - throw new RuntimeException("Attempt to commit before opening a " + - "transaction"); + if (!isTxnOpen()) { + throw new RuntimeException("Attempt to commit before opening a transaction"); } try { lockMgr.clearLocalLockRecords(); @@ -267,9 +271,8 @@ public class DbTxnManager extends HiveTxnManagerImpl { @Override public void rollbackTxn() throws LockException { - if (txnId == 0) { - throw new RuntimeException("Attempt to rollback before opening a " + - "transaction"); + if (!isTxnOpen()) { + throw new RuntimeException("Attempt to rollback before opening a transaction"); } try { lockMgr.clearLocalLockRecords(); @@ -292,7 +295,7 @@ public class DbTxnManager extends HiveTxnManagerImpl { LOG.debug("Heartbeating lock and transaction " + txnId); List<HiveLock> locks = lockMgr.getLocks(false, false); if (locks.size() == 0) { - if (txnId == 0) { + if (!isTxnOpen()) { // No locks, no txn, we outta here. 
return; } else { @@ -350,7 +353,7 @@ public class DbTxnManager extends HiveTxnManagerImpl { @Override protected void destruct() { try { - if (txnId > 0) rollbackTxn(); + if (isTxnOpen()) rollbackTxn(); if (lockMgr != null) lockMgr.close(); } catch (Exception e) { LOG.error("Caught exception " + e.getClass().getName() + " with message <" + e.getMessage() @@ -376,8 +379,15 @@ public class DbTxnManager extends HiveTxnManagerImpl { } } @Override + public boolean isTxnOpen() { + return txnId > 0; + } + @Override + public long getCurrentTxnId() { + return txnId; + } + @Override public int getStatementId() { return statementId; } - } http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java index 1906982..be5a593 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java @@ -52,6 +52,14 @@ class DummyTxnManager extends HiveTxnManagerImpl { // No-op return 0L; } + @Override + public boolean isTxnOpen() { + return false; + } + @Override + public long getCurrentTxnId() { + return 0L; + } @Override public int getStatementId() { http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java index c900548..74512d7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java @@ -173,6 +173,27 @@ public interface HiveTxnManager { */ boolean 
supportsAcid(); + /** + * This behaves exactly as + * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean) + */ + void setAutoCommit(boolean autoCommit) throws LockException; + + /** + * This behaves exactly as + * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#getAutoCommit() + */ + boolean getAutoCommit(); + + boolean isTxnOpen(); + /** + * if {@code isTxnOpen()}, returns the currently active transaction ID + */ + long getCurrentTxnId(); + + /** + * 0..N Id of current statement within currently opened transaction + */ int getStatementId(); } http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java index ceeae68..ed022d9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hive.ql.plan.UnlockTableDesc; abstract class HiveTxnManagerImpl implements HiveTxnManager { protected HiveConf conf; + private boolean isAutoCommit = true;//true by default; matches JDBC spec void setHiveConf(HiveConf c) { conf = c; @@ -58,6 +59,15 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager { protected void finalize() throws Throwable { destruct(); } + @Override + public void setAutoCommit(boolean autoCommit) throws LockException { + isAutoCommit = autoCommit; + } + + @Override + public boolean getAutoCommit() { + return isAutoCommit; + } @Override public int lockTable(Hive db, LockTableDesc lockTbl) throws HiveException { http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java 
---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java index 9894a70..8ea457e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.lockmgr; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.metadata.HiveException; /** @@ -43,5 +44,10 @@ public class LockException extends HiveException { public LockException(String message, Throwable cause) { super(message, cause); } - + public LockException(Throwable cause, ErrorMsg errorMsg, String... msgArgs) { + super(cause, errorMsg, msgArgs); + } + public LockException(Throwable cause, ErrorMsg errorMsg) { + super(cause, errorMsg); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java index 1d895ca..d017705 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java @@ -60,6 +60,9 @@ public class HiveException extends Exception { canonicalErrorMsg = errorMsg; } + public HiveException(Throwable cause, ErrorMsg errorMsg) { + this(cause, errorMsg, new String[0]); + } /** * @return {@link ErrorMsg#GENERIC_ERROR} by default */ http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java index d72991f..fbe93f9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java @@ -125,6 +125,19 @@ public abstract class BaseSemanticAnalyzer { * Columns accessed by updates */ protected ColumnAccessInfo updateColumnAccessInfo; + /** + * the value of set autocommit true|false + * It's an object to make sure it's {@code null} if the parsed statement is + * not 'set autocommit...' + */ + private Boolean autoCommitValue; + + public Boolean getAutoCommitValue() { + return autoCommitValue; + } + void setAutoCommitValue(Boolean autoCommit) { + autoCommitValue = autoCommit; + } public boolean skipAuthorization() { http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g index bdd7cb7..3ec1e34 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g @@ -306,6 +306,17 @@ KW_DAY: 'DAY'; KW_HOUR: 'HOUR'; KW_MINUTE: 'MINUTE'; KW_SECOND: 'SECOND'; +KW_START: 'START'; +KW_TRANSACTION: 'TRANSACTION'; +KW_COMMIT: 'COMMIT'; +KW_ROLLBACK: 'ROLLBACK'; +KW_WORK: 'WORK'; +KW_ONLY: 'ONLY'; +KW_WRITE: 'WRITE'; +KW_ISOLATION: 'ISOLATION'; +KW_LEVEL: 'LEVEL'; +KW_SNAPSHOT: 'SNAPSHOT'; +KW_AUTOCOMMIT: 'AUTOCOMMIT'; // Operators // NOTE: if you add a new function/operator, add it to sysFuncNames so that describe function _FUNC_ will work. 
http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g index b267bd2..6c373dd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g @@ -356,6 +356,15 @@ TOK_ANONYMOUS; TOK_COL_NAME; TOK_URI_TYPE; TOK_SERVER_TYPE; +TOK_START_TRANSACTION; +TOK_ISOLATION_LEVEL; +TOK_ISOLATION_SNAPSHOT; +TOK_TXN_ACCESS_MODE; +TOK_TXN_READ_ONLY; +TOK_TXN_READ_WRITE; +TOK_COMMIT; +TOK_ROLLBACK; +TOK_SET_AUTOCOMMIT; } @@ -377,6 +386,7 @@ import org.apache.hadoop.hive.conf.HiveConf; private static HashMap<String, String> xlateMap; static { + //this is used to support auto completion in CLI xlateMap = new HashMap<String, String>(); // Keywords @@ -695,6 +705,7 @@ execStatement | ddlStatement | deleteStatement | updateStatement + | sqlTransactionStatement ; loadStatement @@ -2395,3 +2406,62 @@ updateStatement : KW_UPDATE tableName setColumnsClause whereClause? -> ^(TOK_UPDATE_TABLE tableName setColumnsClause whereClause?) ; + +/* +BEGIN user defined transaction boundaries; follows SQL 2003 standard exactly except for addition of +"setAutoCommitStatement" which is not in the standard doc but is supported by most SQL engines. +*/ +sqlTransactionStatement +@init { pushMsg("transaction statement", state); } +@after { popMsg(state); } + : + startTransactionStatement + | commitStatement + | rollbackStatement + | setAutoCommitStatement + ; + +startTransactionStatement + : + KW_START KW_TRANSACTION ( transactionMode ( COMMA transactionMode )* )? 
-> ^(TOK_START_TRANSACTION transactionMode*) + ; + +transactionMode + : + isolationLevel + | transactionAccessMode -> ^(TOK_TXN_ACCESS_MODE transactionAccessMode) + ; + +transactionAccessMode + : + KW_READ KW_ONLY -> TOK_TXN_READ_ONLY + | KW_READ KW_WRITE -> TOK_TXN_READ_WRITE + ; + +isolationLevel + : + KW_ISOLATION KW_LEVEL levelOfIsolation -> ^(TOK_ISOLATION_LEVEL levelOfIsolation) + ; + +/*READ UNCOMMITTED | READ COMMITTED | REPEATABLE READ | SERIALIZABLE may be supported later*/ +levelOfIsolation + : + KW_SNAPSHOT -> TOK_ISOLATION_SNAPSHOT + ; + +commitStatement + : + KW_COMMIT ( KW_WORK )? -> TOK_COMMIT + ; + +rollbackStatement + : + KW_ROLLBACK ( KW_WORK )? -> TOK_ROLLBACK + ; +setAutoCommitStatement + : + KW_SET KW_AUTOCOMMIT booleanValueTok -> ^(TOK_SET_AUTOCOMMIT booleanValueTok) + ; +/* +END user defined transaction boundaries +*/ http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g index 4f8be52..501287d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g @@ -499,6 +499,12 @@ booleanValue KW_TRUE^ | KW_FALSE^ ; +booleanValueTok + : + KW_TRUE -> TOK_TRUE + | KW_FALSE -> TOK_FALSE + ; + tableOrPartition : tableName partitionSpec? -> ^(TOK_TAB tableName partitionSpec?) 
@@ -629,7 +635,18 @@ nonReserved | KW_STREAMTABLE | KW_STRING | KW_STRUCT | KW_TABLES | KW_TBLPROPERTIES | KW_TEMPORARY | KW_TERMINATED | KW_TINYINT | KW_TOUCH | KW_TRANSACTIONS | KW_UNARCHIVE | KW_UNDO | KW_UNIONTYPE | KW_UNLOCK | KW_UNSET | KW_UNSIGNED | KW_URI | KW_USE | KW_UTC | KW_UTCTIMESTAMP | KW_VALUE_TYPE | KW_VIEW | KW_WHILE | KW_YEAR - ; + | KW_WORK + | KW_START + | KW_TRANSACTION + | KW_COMMIT + | KW_ROLLBACK + | KW_ONLY + | KW_WRITE + | KW_ISOLATION + | KW_LEVEL + | KW_SNAPSHOT + | KW_AUTOCOMMIT +; //The following SQL2011 reserved keywords are used as cast function name only, it is a subset of the sql11ReservedKeywordsUsedAsIdentifier. sql11ReservedKeywordsUsedAsCastFunctionName http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 5719cf4..2ae6309 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -10056,6 +10056,25 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { viewsExpanded.add(createVwDesc.getViewName()); } + switch(ast.getToken().getType()) { + case HiveParser.TOK_SET_AUTOCOMMIT: + assert ast.getChildCount() == 1; + if(ast.getChild(0).getType() == HiveParser.TOK_TRUE) { + setAutoCommitValue(true); + } + else if(ast.getChild(0).getType() == HiveParser.TOK_FALSE) { + setAutoCommitValue(false); + } + else { + assert false : "Unexpected child of TOK_SET_AUTOCOMMIT: " + ast.getChild(0).getType(); + } + //fall through + case HiveParser.TOK_START_TRANSACTION: + case HiveParser.TOK_COMMIT: + case HiveParser.TOK_ROLLBACK: + SessionState.get().setCommandType(SemanticAnalyzerFactory.getOperation(ast.getToken().getType())); + return false; + } // 4. 
continue analyzing from the child ASTNode. Phase1Ctx ctx_1 = initPhase1Ctx(); preProcessForInsert(child, qb); @@ -10178,7 +10197,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } // 6. Generate table access stats if required - if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_TABLEKEYS) == true) { + if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_TABLEKEYS)) { TableAccessAnalyzer tableAccessAnalyzer = new TableAccessAnalyzer(pCtx); setTableAccessInfo(tableAccessAnalyzer.analyzeTableAccess()); } @@ -10201,7 +10220,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { boolean isColumnInfoNeedForAuth = SessionState.get().isAuthorizationModeV2() && HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED); if (isColumnInfoNeedForAuth - || HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_SCANCOLS) == true) { + || HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_SCANCOLS)) { ColumnAccessAnalyzer columnAccessAnalyzer = new ColumnAccessAnalyzer(pCtx); setColumnAccessInfo(columnAccessAnalyzer.analyzeColumnAccess()); } @@ -10691,7 +10710,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { * Add default properties for table property. If a default parameter exists * in the tblProp, the value in tblProp will be kept. 
* - * @param table + * @param tblProp * property map * @return Modified table property map */ http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java index 97d02ea..4a3802d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java @@ -59,6 +59,7 @@ public final class SemanticAnalyzerFactory { commandType.put(HiveParser.TOK_ALTERTABLE_UNARCHIVE, HiveOperation.ALTERTABLE_UNARCHIVE); commandType.put(HiveParser.TOK_ALTERTABLE_PROPERTIES, HiveOperation.ALTERTABLE_PROPERTIES); commandType.put(HiveParser.TOK_ALTERTABLE_DROPPROPERTIES, HiveOperation.ALTERTABLE_PROPERTIES); + commandType.put(HiveParser.TOK_ALTERTABLE_EXCHANGEPARTITION, HiveOperation.ALTERTABLE_EXCHANGEPARTITION); commandType.put(HiveParser.TOK_SHOWDATABASES, HiveOperation.SHOWDATABASES); commandType.put(HiveParser.TOK_SHOWTABLES, HiveOperation.SHOWTABLES); commandType.put(HiveParser.TOK_SHOWCOLUMNS, HiveOperation.SHOWCOLUMNS); @@ -111,6 +112,10 @@ public final class SemanticAnalyzerFactory { commandType.put(HiveParser.TOK_ALTERTABLE_PARTCOLTYPE, HiveOperation.ALTERTABLE_PARTCOLTYPE); commandType.put(HiveParser.TOK_SHOW_COMPACTIONS, HiveOperation.SHOW_COMPACTIONS); commandType.put(HiveParser.TOK_SHOW_TRANSACTIONS, HiveOperation.SHOW_TRANSACTIONS); + commandType.put(HiveParser.TOK_START_TRANSACTION, HiveOperation.START_TRANSACTION); + commandType.put(HiveParser.TOK_COMMIT, HiveOperation.COMMIT); + commandType.put(HiveParser.TOK_ROLLBACK, HiveOperation.ROLLBACK); + commandType.put(HiveParser.TOK_SET_AUTOCOMMIT, HiveOperation.SET_AUTOCOMMIT); } static { @@ -274,6 +279,10 @@ public final class 
SemanticAnalyzerFactory { case HiveParser.TOK_DELETE_FROM: return new UpdateDeleteSemanticAnalyzer(conf); + case HiveParser.TOK_START_TRANSACTION: + case HiveParser.TOK_COMMIT: + case HiveParser.TOK_ROLLBACK: + case HiveParser.TOK_SET_AUTOCOMMIT: default: { SemanticAnalyzer semAnalyzer = HiveConf .getBoolVar(conf, HiveConf.ConfVars.HIVE_CBO_ENABLED) ? new CalcitePlanner(conf) @@ -293,4 +302,7 @@ public final class SemanticAnalyzerFactory { private SemanticAnalyzerFactory() { // prevent instantiation } + static HiveOperation getOperation(int hiveParserToken) { + return commandType.get(hiveParserToken); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java index 75cdf16..fc6be2b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java @@ -102,7 +102,7 @@ public enum HiveOperation { CREATETABLE("CREATETABLE", null, new Privilege[]{Privilege.CREATE}), TRUNCATETABLE("TRUNCATETABLE", null, new Privilege[]{Privilege.DROP}), CREATETABLE_AS_SELECT("CREATETABLE_AS_SELECT", new Privilege[]{Privilege.SELECT}, new Privilege[]{Privilege.CREATE}), - QUERY("QUERY", new Privilege[]{Privilege.SELECT}, new Privilege[]{Privilege.ALTER_DATA, Privilege.CREATE}), + QUERY("QUERY", new Privilege[]{Privilege.SELECT}, new Privilege[]{Privilege.ALTER_DATA, Privilege.CREATE}, true, false), ALTERINDEX_PROPS("ALTERINDEX_PROPS",null, null), ALTERDATABASE("ALTERDATABASE", null, null), ALTERDATABASE_OWNER("ALTERDATABASE_OWNER", null, null), @@ -113,11 +113,16 @@ public enum HiveOperation { ALTERTBLPART_SKEWED_LOCATION("ALTERTBLPART_SKEWED_LOCATION", new Privilege[] {Privilege.ALTER_DATA}, null), 
ALTERTABLE_PARTCOLTYPE("ALTERTABLE_PARTCOLTYPE", new Privilege[] { Privilege.SELECT }, new Privilege[] { Privilege.ALTER_DATA }), + ALTERTABLE_EXCHANGEPARTITION("ALTERTABLE_EXCHANGEPARTITION", null, null), ALTERVIEW_RENAME("ALTERVIEW_RENAME", new Privilege[] {Privilege.ALTER_METADATA}, null), ALTERVIEW_AS("ALTERVIEW_AS", new Privilege[] {Privilege.ALTER_METADATA}, null), ALTERTABLE_COMPACT("ALTERTABLE_COMPACT", new Privilege[]{Privilege.SELECT}, new Privilege[]{Privilege.ALTER_DATA}), SHOW_COMPACTIONS("SHOW COMPACTIONS", null, null), - SHOW_TRANSACTIONS("SHOW TRANSACTIONS", null, null); + SHOW_TRANSACTIONS("SHOW TRANSACTIONS", null, null), + START_TRANSACTION("START TRANSACTION", null, null, false, false), + COMMIT("COMMIT", null, null, true, true), + ROLLBACK("ROLLBACK", null, null, true, true), + SET_AUTOCOMMIT("SET AUTOCOMMIT", null, null, true, false); ; private String operationName; @@ -126,6 +131,12 @@ public enum HiveOperation { private Privilege[] outputRequiredPrivileges; + /** + * Only a small set of operations is allowed inside an open transactions, e.g. 
DML + */ + private final boolean allowedInTransaction; + private final boolean requiresOpenTransaction; + public Privilege[] getInputRequiredPrivileges() { return inputRequiredPrivileges; } @@ -138,11 +149,26 @@ public enum HiveOperation { return operationName; } + public boolean isAllowedInTransaction() { + return allowedInTransaction; + } + public boolean isRequiresOpenTransaction() { return requiresOpenTransaction; } + private HiveOperation(String operationName, - Privilege[] inputRequiredPrivileges, Privilege[] outputRequiredPrivileges) { + Privilege[] inputRequiredPrivileges, Privilege[] outputRequiredPrivileges) { + this(operationName, inputRequiredPrivileges, outputRequiredPrivileges, false, false); + } + private HiveOperation(String operationName, + Privilege[] inputRequiredPrivileges, Privilege[] outputRequiredPrivileges, + boolean allowedInTransaction, boolean requiresOpenTransaction) { this.operationName = operationName; this.inputRequiredPrivileges = inputRequiredPrivileges; this.outputRequiredPrivileges = outputRequiredPrivileges; + this.requiresOpenTransaction = requiresOpenTransaction; + if(requiresOpenTransaction) { + allowedInTransaction = true; + } + this.allowedInTransaction = allowedInTransaction; } public static class PrivilegeAgreement { http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java b/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java index 4584517..21b7457 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java @@ -19,16 +19,19 @@ package org.apache.hadoop.hive.ql.processors; import org.apache.hadoop.hive.metastore.api.Schema; +import 
org.apache.hadoop.hive.ql.ErrorMsg; /** * Encapsulates the basic response info returned by classes the implement the * <code>CommandProcessor</code> interface. Typically <code>errorMessage</code> * and <code>SQLState</code> will only be set if the <code>responseCode</code> - * is not 0. + * is not 0. Note that often {@code responseCode} ends up the exit value of + * command shell process so should keep it to < 127. */ public class CommandProcessorResponse { private final int responseCode; private final String errorMessage; + private final int hiveErrorCode; private final String SQLState; private final Schema resSchema; @@ -49,6 +52,10 @@ public class CommandProcessorResponse { public CommandProcessorResponse(int responseCode, String errorMessage, String SQLState, Schema schema) { this(responseCode, errorMessage, SQLState, schema, null); } + public CommandProcessorResponse(int responseCode, ErrorMsg canonicalErrMsg, Throwable t, String ... msgArgs) { + this(responseCode, canonicalErrMsg.format(msgArgs), + canonicalErrMsg.getSQLState(), null, t, canonicalErrMsg.getErrorCode()); + } /** * Create CommandProcessorResponse object indicating an error. 
@@ -63,12 +70,17 @@ public class CommandProcessorResponse { } public CommandProcessorResponse(int responseCode, String errorMessage, String SQLState, - Schema schema, Throwable exception) { + Schema schema, Throwable exception) { + this(responseCode, errorMessage, SQLState, schema, exception, -1); + } + public CommandProcessorResponse(int responseCode, String errorMessage, String SQLState, + Schema schema, Throwable exception, int hiveErrorCode) { this.responseCode = responseCode; this.errorMessage = errorMessage; this.SQLState = SQLState; this.resSchema = schema; this.exception = exception; + this.hiveErrorCode = hiveErrorCode; } public int getResponseCode() { return responseCode; } @@ -76,8 +88,11 @@ public class CommandProcessorResponse { public String getSQLState() { return SQLState; } public Schema getSchema() { return resSchema; } public Throwable getException() { return exception; } + public int getErrorCode() { return hiveErrorCode; } public String toString() { - return "(" + responseCode + "," + errorMessage + "," + SQLState + + return "(" + responseCode + "," + errorMessage + "," + + (hiveErrorCode > 0 ? hiveErrorCode + "," : "" ) + + SQLState + (resSchema == null ? "" : ",") + (exception == null ? "" : exception.getMessage()) + ")"; } http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java b/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java index 319a79b..c8c9831 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java @@ -75,6 +75,9 @@ public enum HiveCommand { } else if(command.length > 1 && "from".equalsIgnoreCase(command[1])) { //special handling for SQL "delete from <table> where..." 
return null; + } + else if(command.length > 1 && "set".equalsIgnoreCase(command[0]) && "autocommit".equalsIgnoreCase(command[1])) { + return null;//don't want set autocommit true|false to get mixed with set hive.foo.bar... } else if (COMMANDS.contains(cmd)) { HiveCommand hiveCommand = HiveCommand.valueOf(cmd); http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HiveOperationType.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HiveOperationType.java b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HiveOperationType.java index b974b59..71be469 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HiveOperationType.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HiveOperationType.java @@ -125,6 +125,11 @@ public enum HiveOperationType { ADD, DELETE, COMPILE, + START_TRANSACTION, + COMMIT, + ROLLBACK, + SET_AUTOCOMMIT, + ALTERTABLE_EXCHANGEPARTITION, // ==== Hive command operations ends here ==== // // ==== HiveServer2 metadata api types start here ==== // http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/Operation2Privilege.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/Operation2Privilege.java b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/Operation2Privilege.java index a6226b6..8e61d57 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/Operation2Privilege.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/Operation2Privilege.java @@ -400,6 +400,17 @@ public class Operation2Privilege { 
op2Priv.put(HiveOperationType.GET_COLUMNS, PrivRequirement.newIOPrivRequirement(SEL_NOGRANT_AR, null)); + op2Priv.put(HiveOperationType.START_TRANSACTION, PrivRequirement.newIOPrivRequirement + (null, null)); + op2Priv.put(HiveOperationType.COMMIT, PrivRequirement.newIOPrivRequirement + (null, null)); + op2Priv.put(HiveOperationType.ROLLBACK, PrivRequirement.newIOPrivRequirement + (null, null)); + op2Priv.put(HiveOperationType.SET_AUTOCOMMIT, PrivRequirement.newIOPrivRequirement + (null, null)); + op2Priv.put(HiveOperationType.ALTERTABLE_EXCHANGEPARTITION, + PrivRequirement.newIOPrivRequirement(null, null)); + } /** http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index cbca280..9caf27e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -112,7 +112,7 @@ public class SessionState { /** * current configuration. */ - protected HiveConf conf; + private final HiveConf conf; /** * silent mode. @@ -245,23 +245,6 @@ public class SessionState { private HiveTxnManager txnMgr = null; /** - * When {@link #setCurrentTxn(long)} is set to this or {@link #getCurrentTxn()}} returns this it - * indicates that there is not a current transaction in this session. - */ - public static final long NO_CURRENT_TXN = -1L; - - /** - * Transaction currently open - */ - private long currentTxn = NO_CURRENT_TXN; - - /** - * Whether we are in auto-commit state or not. Currently we are always in auto-commit, - * so there are not setters for this yet. 
- */ - private final boolean txnAutoCommit = true; - - /** * store the jars loaded last time */ private final Set<String> preReloadableAuxJars = new HashSet<String>(); @@ -289,9 +272,6 @@ public class SessionState { return conf; } - public void setConf(HiveConf conf) { - this.conf = conf; - } public File getTmpOutputFile() { return tmpOutputFile; @@ -402,18 +382,6 @@ public class SessionState { return txnMgr; } - public long getCurrentTxn() { - return currentTxn; - } - - public void setCurrentTxn(long currTxn) { - currentTxn = currTxn; - } - - public boolean isAutoCommit() { - return txnAutoCommit; - } - public HadoopShims.HdfsEncryptionShim getHdfsEncryptionShim() throws HiveException { if (hdfsEncryptionShim == null) { try { http://git-wip-us.apache.org/repos/asf/hive/blob/012c99ff/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java new file mode 100644 index 0000000..c73621f --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -0,0 +1,473 @@ +package org.apache.hadoop.hive.ql; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.orc.FileDump; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.util.ArrayList; +import 
java.util.Arrays; +import java.util.Comparator; +import java.util.List; + +/** + * The LockManager is not ready, but for no-concurrency straight-line path we can + * test AC=true, and AC=false with commit/rollback/exception and test resulting data. + * + * Can also test, calling commit in AC=true mode, etc, toggling AC... + */ +public class TestTxnCommands { + private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + + File.separator + TestTxnCommands.class.getCanonicalName() + + "-" + System.currentTimeMillis() + ).getPath().replaceAll("\\\\", "/"); + private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse"; + //bucket count for test tables; set it to 1 for easier debugging + private static int BUCKET_COUNT = 2; + @Rule + public TestName testName = new TestName(); + private HiveConf hiveConf; + private Driver d; + private static enum Table { + ACIDTBL("acidTbl"), + ACIDTBL2("acidTbl2"), + NONACIDORCTBL("nonAcidOrcTbl"), + NONACIDORCTBL2("nonAcidOrcTbl2"); + + private final String name; + @Override + public String toString() { + return name; + } + Table(String name) { + this.name = name; + } + } + + @Before + public void setUp() throws Exception { + tearDown(); + hiveConf = new HiveConf(this.getClass()); + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR); + TxnDbUtil.setConfValues(hiveConf); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVEENFORCEBUCKETING, true); + TxnDbUtil.prepDb(); + File f = new File(TEST_WAREHOUSE_DIR); + if (f.exists()) { + FileUtil.fullyDelete(f); + } + if (!(new File(TEST_WAREHOUSE_DIR).mkdirs())) { + throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR); + } + SessionState.start(new SessionState(hiveConf)); + d = new Driver(hiveConf); + dropTables(); 
+    runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
+    runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
+    runStatementOnDriver("create temporary table " + Table.ACIDTBL2 + "(a int, b int, c int) clustered by (c) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+  }
+  /**
+   * Drops every table named by the {@link Table} enum; tables that do not exist are ignored.
+   */
+  private void dropTables() throws Exception {
+    for(Table t : Table.values()) {
+      runStatementOnDriver("drop table if exists " + t);
+    }
+  }
+  /**
+   * Restores autocommit, drops the test tables and releases the driver, then cleans the
+   * transaction database and deletes the test data directory.
+   * The driver is destroyed/closed even if one of the cleanup statements fails, and the
+   * database/directory cleanup runs regardless of how the driver cleanup ended.
+   */
+  @After
+  public void tearDown() throws Exception {
+    try {
+      if (d != null) {
+        try {
+          runStatementOnDriver("set autocommit true");
+          dropTables();
+        } finally {
+          //release the driver even when a cleanup statement above threw
+          d.destroy();
+          d.close();
+          d = null;
+        }
+      }
+    } finally {
+      TxnDbUtil.cleanDb();
+      FileUtils.deleteDirectory(new File(TEST_DATA_DIR));
+    }
+  }
+  /**
+   * Runs an INSERT OVERWRITE between the two non-ACID tables followed by a CREATE TABLE;
+   * passes if neither statement fails.
+   */
+  @Test
+  public void testInsertOverwrite() throws Exception {
+    runStatementOnDriver("insert overwrite table " + Table.NONACIDORCTBL + " select a,b from " + Table.NONACIDORCTBL2);
+    //NOTE(review): this table is not a member of Table, so dropTables() does not drop it - confirm cleanup happens elsewhere
+    runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + "3(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
+
+  }
+  /**
+   * Inserts into and reads back a non-ACID table.  Makes no assertions; the result lists are
+   * only there to be inspected (e.g. in a debugger).
+   */
+  @Ignore("not needed but useful for testing")
+  @Test
+  public void testNonAcidInsert() throws Exception {
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(2,3)");
+    List<String> rs1 = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+  }
+
+  /**
+   * Useful for debugging.  Dumps ORC file in JSON to CWD.
+   * Currently disabled: the method returns immediately, and the actual dump call is commented out.
+   *
+   * @param table table whose bucket file is addressed
+   * @param txnId transaction id used (as both min and max) to build the delta directory name
+   * @param stmtId statement id used to build the delta directory name
+   * @param bucketNum bucket number of the file inside the delta directory
+   */
+  private void dumpBucketData(Table table, long txnId, int stmtId, int bucketNum) throws Exception {
+    if(true) {
+      //debugging aid is switched off; remove this guard (and restore the dump call below) to use it
+      return;
+    }
+    Path bucket = AcidUtils.createBucketFile(new Path(new Path(TEST_WAREHOUSE_DIR, table.toString().toLowerCase()), AcidUtils.deltaSubdir(txnId, txnId, stmtId)), bucketNum);
+    FileOutputStream delta = new FileOutputStream(testName.getMethodName() + "_" + bucket.getParent().getName() + "_" + bucket.getName());
+//    try {
+//      FileDump.printJsonData(hiveConf, bucket.toString(), delta);
+//    }
+//    catch(FileNotFoundException ex) {
+      ;//this happens if you change BUCKET_COUNT
+//    }
+    delta.close();
+  }
+  /**
+   * Dump all data in the table by bucket in JSON format
+   *
+   * @param table table to dump
+   * @param txnId transaction id identifying the delta to dump
+   * @param stmtId statement id identifying the delta to dump
+   */
+  private void dumpTableData(Table table, long txnId, int stmtId) throws Exception {
+    for(int bucketNum = 0; bucketNum < BUCKET_COUNT; bucketNum++) {
+      dumpBucketData(table, txnId, stmtId, bucketNum);
+    }
+  }
+  /**
+   * Inserts once before and once inside an explicit transaction and checks that all rows are
+   * visible both inside the transaction and after it has been committed.
+   */
+  @Test
+  public void testSimpleAcidInsert() throws Exception {
+    int[][] rows1 = {{1,2},{3,4}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
+    //List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    //Assert.assertEquals("Data didn't match in autocommit=true (rs)", stringifyValues(rows1), rs);
+    runStatementOnDriver("set autocommit false");
+    runStatementOnDriver("START TRANSACTION");
+    int[][] rows2 = {{5,6},{7,8}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2));
+    List<String> allData = stringifyValues(rows1);
+    allData.addAll(stringifyValues(rows2));
+    List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals("Data didn't match inside tx (rs0)", allData, rs0);
+    runStatementOnDriver("COMMIT WORK");
+    dumpTableData(Table.ACIDTBL, 1, 0);
+    dumpTableData(Table.ACIDTBL, 2, 0);
+    runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    runStatementOnDriver("COMMIT");//txn started implicitly by previous statement
+    runStatementOnDriver("set autocommit true");
+    List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    //message previously repeated the "inside tx (rs0)" text of the earlier assertion
+    Assert.assertEquals("Data didn't match after commit (rs1)", allData, rs1);
+  }
+
+  /**
+   * Checks that transaction-control statements and DDL are rejected in the states where they
+   * are not allowed, and that switching autocommit back on commits the open transaction.
+   * TODO: add tests for all transitions - AC=t, AC=t, AC=f, commit (for example)
+   * @throws Exception
+   */
+  @Test
+  public void testErrors() throws Exception {
+    runStatementOnDriver("set autocommit true");
+    CommandProcessorResponse cpr = runStatementOnDriverNegative("start transaction");
+    Assert.assertEquals("Error didn't match: " + cpr, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT.getErrorCode(), cpr.getErrorCode());
+    runStatementOnDriver("set autocommit false");
+    runStatementOnDriver("start transaction");
+    CommandProcessorResponse cpr2 = runStatementOnDriverNegative("create table foo(x int, y int)");
+    Assert.assertEquals("Expected DDL to fail in an open txn", ErrorMsg.OP_NOT_ALLOWED_IN_TXN.getErrorCode(), cpr2.getErrorCode());
+    runStatementOnDriver("set autocommit true");
+    CommandProcessorResponse cpr3 = runStatementOnDriverNegative("update " + Table.ACIDTBL + " set a = 1 where b != 1");
+    Assert.assertEquals("Expected update of bucket column to fail",
+      "FAILED: SemanticException [Error 10302]: Updating values of bucketing columns is not supported.  Column a.",
+      cpr3.getErrorMessage());
+    //line below should in principle work but Driver doesn't propagate errorCode properly
+    //Assert.assertEquals("Expected update of bucket column to fail", ErrorMsg.UPDATE_CANNOT_UPDATE_BUCKET_VALUE.getErrorCode(), cpr3.getErrorCode());
+    cpr3 = runStatementOnDriverNegative("commit work");//not allowed in AC=true
+    //check the response of THIS statement (cpr3); the stale cpr made this assertion always pass
+    Assert.assertEquals("Error didn't match: " + cpr3, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT.getErrorCode(), cpr3.getErrorCode());
+    cpr3 = runStatementOnDriverNegative("rollback work");//not allowed in AC=true
+    Assert.assertEquals("Error didn't match: " + cpr3, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT.getErrorCode(), cpr3.getErrorCode());
+    runStatementOnDriver("set autocommit false");
+    //NOTE(review): the two assertions below still compare the error code of the stale 'cpr' response, so
+    //they cannot fail.  Autocommit is off here, so OP_NOT_ALLOWED_IN_AUTOCOMMIT is presumably not the code
+    //actually returned - confirm the expected ErrorMsg for commit/rollback without an open txn and assert on cpr3.
+    cpr3 = runStatementOnDriverNegative("commit");//not allowed in w/o tx
+    Assert.assertEquals("Error didn't match: " + cpr3, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT.getErrorCode(), cpr.getErrorCode());
+    cpr3 = runStatementOnDriverNegative("rollback");//not allowed in w/o tx
+    Assert.assertEquals("Error didn't match: " + cpr3, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT.getErrorCode(), cpr.getErrorCode());
+    runStatementOnDriver("start transaction");
+    cpr3 = runStatementOnDriverNegative("start transaction");//not allowed in a tx
+    Assert.assertEquals("Expected start transaction to fail", ErrorMsg.OP_NOT_ALLOWED_IN_TXN.getErrorCode(), cpr3.getErrorCode());
+    runStatementOnDriver("start transaction");//ok since previously opened txn was killed
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
+    List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals("Can't see my own write", 1, rs0.size());
+    runStatementOnDriver("set autocommit true");//this should commit previous txn
+    rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals("Can't see write after autocommit was turned on", 1, rs0.size());
+  }
+  /**
+   * Checks that a row inserted inside a transaction is visible to a later select in the same
+   * transaction, and is still visible from a new transaction after commit.
+   */
+  @Test
+  public void testReadMyOwnInsert() throws Exception {
+    runStatementOnDriver("set autocommit false");
+    runStatementOnDriver("START TRANSACTION");
+    List<String> rs = runStatementOnDriver("select * from " + Table.ACIDTBL);
+    Assert.assertEquals("Expected empty " + Table.ACIDTBL, 0, rs.size());
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
+    List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals("Can't see my own write", 1, rs0.size());
+    runStatementOnDriver("commit");
+    runStatementOnDriver("START TRANSACTION");
+    List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    runStatementOnDriver("rollback work");
+    Assert.assertEquals("Can't see write after commit", 1, rs1.size());
+  }
+  /**
+   * Checks that a failing statement inside a transaction discards the transaction's earlier
+   * insert: a select in a new transaction afterwards is expected to return no rows.
+   */
+  @Test
+  public void testImplicitRollback() throws Exception {
+    runStatementOnDriver("set autocommit false");
+    runStatementOnDriver("START TRANSACTION");
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
+    List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals("Can't see my own write", 1, rs0.size());
+    //next command should produce an error
+    CommandProcessorResponse cpr = runStatementOnDriverNegative("select * from no_such_table");
+    Assert.assertEquals("Txn didn't fail?",
+      "FAILED: SemanticException [Error 10001]: Line 1:14 Table not found 'no_such_table'",
+      cpr.getErrorMessage());
+    runStatementOnDriver("start transaction");
+    List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    runStatementOnDriver("commit");
+    Assert.assertEquals("Didn't rollback as expected", 0, rs1.size());
+  }
+  /**
+   * Checks that ROLLBACK discards an insert made inside the transaction.
+   */
+  @Test
+  public void testExplicitRollback() throws Exception {
+    runStatementOnDriver("set autocommit false");
+    runStatementOnDriver("START TRANSACTION");
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
+    runStatementOnDriver("ROLLBACK");
+    runStatementOnDriver("set autocommit true");
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals("Rollback didn't rollback", 0, rs.size());
+  }
+
+  /**
+   * Runs two inserts in one transaction and checks that the rows of both are visible before
+   * and after commit.
+   */
+  @Test
+  public void testMultipleInserts() throws Exception {
+    runStatementOnDriver("set autocommit false");
+    runStatementOnDriver("START TRANSACTION");
+    int[][] rows1 = {{1,2},{3,4}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
+    int[][] rows2 = {{5,6},{7,8}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2));
+    List<String> allData = stringifyValues(rows1);
+    allData.addAll(stringifyValues(rows2));
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals("Content didn't match before commit rs", allData, rs);
+    runStatementOnDriver("commit");
+    dumpTableData(Table.ACIDTBL, 1, 0);
+    dumpTableData(Table.ACIDTBL, 1, 1);
+    runStatementOnDriver("set autocommit true");
+    List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals("Content didn't match after commit rs1", allData, rs1);
+  }
+  /**
+   * Deletes one previously inserted row inside a transaction and checks the remaining data
+   * before and after commit.
+   */
+  @Test
+  public void testDelete() throws Exception {
+    int[][] rows1 = {{1,2},{3,4}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
+    List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0);
+    runStatementOnDriver("set autocommit false");
+    runStatementOnDriver("START TRANSACTION");
+    runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 4");
+    int[][] updatedData2 = {{1,2}};
+    List<String> rs3 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals("Wrong data after delete", stringifyValues(updatedData2), rs3);
+    runStatementOnDriver("commit");
+    runStatementOnDriver("set autocommit true");
+    List<String> rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData2), rs4);
+  }
+
+  /**
+   * Updates, inside one transaction, both rows inserted earlier and rows inserted by the same
+   * transaction, and checks the data before and after commit.
+   */
+  @Test
+  public void testUpdateOfInserts() throws Exception {
+    int[][] rows1 = {{1,2},{3,4}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
+    List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0);
+    runStatementOnDriver("set autocommit false");
+    runStatementOnDriver("START TRANSACTION");
+    int[][] rows2 = {{5,6},{7,8}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2));
+    List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    List<String> allData = stringifyValues(rows1);
+    allData.addAll(stringifyValues(rows2));
+    Assert.assertEquals("Content didn't match rs1", allData, rs1);
+    runStatementOnDriver("update " + Table.ACIDTBL + " set b = 1 where b != 1");
+    int[][] updatedData = {{1,1},{3,1},{5,1},{7,1}};
+    List<String> rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals("Wrong data after update", stringifyValues(updatedData), rs2);
+    runStatementOnDriver("commit");
+    runStatementOnDriver("set autocommit true");
+    List<String> rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData), rs4);
+  }
+  /**
+   * Same as {@link #testUpdateOfInserts()} but additionally deletes one of the updated rows
+   * inside the transaction before committing.
+   */
+  @Test
+  public void testUpdateDeleteOfInserts() throws Exception {
+    int[][] rows1 = {{1,2},{3,4}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
+    List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0);
+    runStatementOnDriver("set autocommit false");
+    runStatementOnDriver("START TRANSACTION");
+    int[][] rows2 = {{5,6},{7,8}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2));
+    List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    List<String> allData = stringifyValues(rows1);
+    allData.addAll(stringifyValues(rows2));
+    Assert.assertEquals("Content didn't match rs1", allData, rs1);
+    runStatementOnDriver("update " + Table.ACIDTBL + " set b = 1 where b != 1");
+    int[][] updatedData = {{1,1},{3,1},{5,1},{7,1}};
+    List<String> rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals("Wrong data after update", stringifyValues(updatedData), rs2);
+    runStatementOnDriver("delete from " + Table.ACIDTBL + " where a = 7 and b = 1");
+    dumpTableData(Table.ACIDTBL, 1, 0);
+    dumpTableData(Table.ACIDTBL, 2, 0);
+    dumpTableData(Table.ACIDTBL, 2, 2);
+    dumpTableData(Table.ACIDTBL, 2, 4);
+    int[][] updatedData2 = {{1,1},{3,1},{5,1}};
+    List<String> rs3 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals("Wrong data after delete", stringifyValues(updatedData2), rs3);
+    runStatementOnDriver("commit");
+    runStatementOnDriver("set autocommit true");
+    List<String> rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData2), rs4);
+  }
+  /**
+   * Runs two deletes followed by an update of all remaining rows in one transaction and checks
+   * the data after each step and after commit.
+   */
+  @Test
+  public void testMultipleDelete() throws Exception {
+    int[][] rows1 = {{1,2},{3,4},{5,6},{7,8}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
+    List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0);
+    runStatementOnDriver("set autocommit false");
+    runStatementOnDriver("START TRANSACTION");
+    runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 8");
+    int[][] updatedData2 = {{1,2},{3,4},{5,6}};
+    List<String> rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals("Wrong data after delete", stringifyValues(updatedData2), rs2);
+    runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 4");
+    int[][] updatedData3 = {{1, 2}, {5, 6}};
+    List<String> rs3 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals("Wrong data after delete2", stringifyValues(updatedData3), rs3);
+    runStatementOnDriver("update " + Table.ACIDTBL + " set b=3");
+    dumpTableData(Table.ACIDTBL, 1, 0);
+    //nothing actually hashes to bucket0, so update/delete deltas don't have it
+    dumpTableData(Table.ACIDTBL, 2, 0);
+    dumpTableData(Table.ACIDTBL, 2, 2);
+    dumpTableData(Table.ACIDTBL, 2, 4);
+    List<String> rs5 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    int [][] updatedData4 = {{1,3},{5,3}};
+    //this checks the result of the update above, not of a delete
+    Assert.assertEquals("Wrong data after update", stringifyValues(updatedData4), rs5);
+    runStatementOnDriver("commit");
+    runStatementOnDriver("set autocommit true");
+    List<String> rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData4), rs4);
+  }
+  /**
+   * Exercises DELETE with an IN sub-query (against the same table and against a second ACID
+   * table) followed by an INSERT ... SELECT, and checks the resulting table contents.
+   */
+  @Test
+  public void testDeleteIn() throws Exception {
+    runStatementOnDriver("delete from " + Table.ACIDTBL + " where a IN (SELECT A.a from " +
+      Table.ACIDTBL + " A)");
+    int[][] tableData = {{1,2},{3,2},{5,2},{1,3},{3,3},{5,3}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
+    runStatementOnDriver("insert into " + Table.ACIDTBL2 + "(a,b,c) values(1,7,17),(3,7,17)");
+//    runStatementOnDriver("select b from " + Table.ACIDTBL + " where a in (select b from " + Table.NONACIDORCTBL + ")");
+    runStatementOnDriver("delete from " + Table.ACIDTBL + " where a in(select a from " + Table.ACIDTBL2 + ")");
+//    runStatementOnDriver("delete from " + Table.ACIDTBL + " where a in(select a from " + Table.NONACIDORCTBL + ")");
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) select a,b from " + Table.ACIDTBL2);
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    int[][] updatedData = {{1,7},{3,7},{5,2},{5,3}};
+    Assert.assertEquals("Bulk update failed", stringifyValues(updatedData), rs);
+  }
+
+  /**
+   * takes raw data and turns it into a string as if from Driver.getResults()
+   * sorts rows column by column in ascending numeric order (see {@link RowComp})
+   *
+   * @param rowsIn rows to convert; the array itself is not reordered (a shallow copy is sorted)
+   * @return one tab-separated string per row, in sorted order
+   */
+  private List<String> stringifyValues(int[][] rowsIn) {
+    assert rowsIn.length > 0;
+    int[][] rows = rowsIn.clone();
+    Arrays.sort(rows, new RowComp());
+    List<String> rs = new ArrayList<String>();
+    for(int[] row : rows) {
+      assert row.length > 0;
+      StringBuilder sb = new StringBuilder();
+      for(int value : row) {
+        sb.append(value).append("\t");
+      }
+      sb.setLength(sb.length() - 1);//remove trailing tab
+      rs.add(sb.toString());
+    }
+    return rs;
+  }
+  /**
+   * Orders rows by comparing their values position by position; the first differing position
+   * decides.  Rows are expected to be non-null and of equal length.
+   */
+  private static final class RowComp implements Comparator<int[]> {
+    @Override
+    public int compare(int[] row1, int[] row2) {
+      assert row1 != null && row2 != null && row1.length == row2.length;
+      for(int i = 0; i < row1.length; i++) {
+        int comp = Integer.compare(row1[i], row2[i]);
+        if(comp != 0) {
+          return comp;
+        }
+      }
+      return 0;
+    }
+  }
+  /**
+   * Builds the VALUES clause of an insert statement, e.g. {{1,2},{3,4}} becomes
+   * "values(1,2),(3,4)".  Rows with a single value are emitted without parentheses.
+   *
+   * @param rows rows to insert; must contain at least one non-empty row
+   * @return the clause text starting with "values"
+   */
+  private String makeValuesClause(int[][] rows) {
+    assert rows.length > 0;
+    StringBuilder sb = new StringBuilder("values");
+    for(int[] row : rows) {
+      assert row.length > 0;
+      if(row.length > 1) {
+        sb.append("(");
+      }
+      for(int value : row) {
+        sb.append(value).append(",");
+      }
+      sb.setLength(sb.length() - 1);//remove trailing comma
+      if(row.length > 1) {
+        sb.append(")");
+      }
+      sb.append(",");
+    }
+    sb.setLength(sb.length() - 1);//remove trailing comma
+    return sb.toString();
+  }
+
+  /**
+   * Runs a statement that is expected to succeed and returns its result rows.
+   *
+   * @param stmt statement to run on the driver
+   * @return rows fetched from the driver after the statement ran
+   * @throws RuntimeException if the driver reports a non-zero response code
+   */
+  private List<String> runStatementOnDriver(String stmt) throws Exception {
+    CommandProcessorResponse cpr = d.run(stmt);
+    if(cpr.getResponseCode() != 0) {
+      throw new RuntimeException(stmt + " failed: " + cpr);
+    }
+    List<String> rs = new ArrayList<String>();
+    d.getResults(rs);
+    return rs;
+  }
+  /**
+   * Runs a statement that is expected to fail and returns the failure response.
+   *
+   * @param stmt statement to run on the driver
+   * @return the response carrying the non-zero response code
+   * @throws RuntimeException if the statement unexpectedly succeeds
+   */
+  private CommandProcessorResponse runStatementOnDriverNegative(String stmt) throws Exception {
+    CommandProcessorResponse cpr = d.run(stmt);
+    if(cpr.getResponseCode() != 0) {
+      return cpr;
+    }
+    throw new RuntimeException("Didn't get expected failure! Statement: " + stmt);
+  }
+
+  /**
+   * Runs ALTER TABLE ... EXCHANGE PARTITION between tables in two different databases;
+   * passes if no statement fails.
+   */
+//  @Ignore
+  @Test
+  public void exchangePartition() throws Exception {
+    //NOTE(review): databases ex1/ex2 and their tables are not dropped by dropTables() - confirm cleanup happens elsewhere
+    runStatementOnDriver("create database ex1");
+    runStatementOnDriver("create database ex2");
+
+    runStatementOnDriver("CREATE TABLE ex1.exchange_part_test1 (f1 string) PARTITIONED BY (ds STRING)");
+    runStatementOnDriver("CREATE TABLE ex2.exchange_part_test2 (f1 string) PARTITIONED BY (ds STRING)");
+    runStatementOnDriver("ALTER TABLE ex2.exchange_part_test2 ADD PARTITION (ds='2013-04-05')");
+    runStatementOnDriver("ALTER TABLE ex1.exchange_part_test1 EXCHANGE PARTITION (ds='2013-04-05') WITH TABLE ex2.exchange_part_test2");
+  }
+}