Repository: hive
Updated Branches:
  refs/heads/branch-3 604178d91 -> 59f8aae2c
HIVE-19382 : Acquire locks before generating valid transaction list for some operations (Jesus Camacho Rodriguez via Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1cd543f0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1cd543f0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1cd543f0

Branch: refs/heads/branch-3
Commit: 1cd543f046fa8d73b9b975eaa530730d6a4f7f13
Parents: 604178d
Author: Jesus Camacho Rodriguez <jcama...@apache.org>
Authored: Tue Jun 12 16:09:33 2018 -0700
Committer: Jesus Camacho Rodriguez <jcama...@apache.org>
Committed: Tue Jun 12 16:09:33 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hive/ql/Context.java |  43 ++++
 .../java/org/apache/hadoop/hive/ql/Driver.java  | 201 ++++++++++++++++---
 .../apache/hadoop/hive/ql/exec/FetchTask.java   |   1 +
 3 files changed, 222 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1cd543f0/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 1921ea7..9eda4ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -291,6 +291,7 @@ public class Context {
   public DestClausePrefix addDestNamePrefix(int pos, DestClausePrefix prefix) {
     return insertBranchToNamePrefix.put(pos, prefix);
   }
+
   public Context(Configuration conf) throws IOException {
     this(conf, generateExecutionId());
   }
@@ -315,6 +316,48 @@ public class Context {
     viewsTokenRewriteStreams = new HashMap<>();
   }
 
+  protected Context(Context ctx) {
+    // This method creates a deep copy of context, but the copy is partial,
+    // hence it needs to be used carefully. In particular, following objects
+    // are ignored:
+    // opContext, pathToCS, cboInfo, cboSucceeded, tokenRewriteStream, viewsTokenRewriteStreams,
+    // rewrittenStatementContexts, cteTables, loadTableOutputMap, planMapper, insertBranchToNamePrefix
+    this.isHDFSCleanup = ctx.isHDFSCleanup;
+    this.resFile = ctx.resFile;
+    this.resDir = ctx.resDir;
+    this.resFs = ctx.resFs;
+    this.resDirPaths = ctx.resDirPaths;
+    this.resDirFilesNum = ctx.resDirFilesNum;
+    this.initialized = ctx.initialized;
+    this.originalTracker = ctx.originalTracker;
+    this.nonLocalScratchPath = ctx.nonLocalScratchPath;
+    this.localScratchDir = ctx.localScratchDir;
+    this.scratchDirPermission = ctx.scratchDirPermission;
+    this.fsScratchDirs.putAll(ctx.fsScratchDirs);
+    this.conf = ctx.conf;
+    this.pathid = ctx.pathid;
+    this.explainConfig = ctx.explainConfig;
+    this.cmd = ctx.cmd;
+    this.executionId = ctx.executionId;
+    this.hiveLocks = ctx.hiveLocks;
+    this.hiveTxnManager = ctx.hiveTxnManager;
+    this.needLockMgr = ctx.needLockMgr;
+    this.sequencer = ctx.sequencer;
+    this.outputLockObjects.putAll(ctx.outputLockObjects);
+    this.stagingDir = ctx.stagingDir;
+    this.heartbeater = ctx.heartbeater;
+    this.skipTableMasking = ctx.skipTableMasking;
+    this.isUpdateDeleteMerge = ctx.isUpdateDeleteMerge;
+    this.isLoadingMaterializedView = ctx.isLoadingMaterializedView;
+    this.operation = ctx.operation;
+    this.wmContext = ctx.wmContext;
+    this.isExplainPlan = ctx.isExplainPlan;
+    this.statsSource = ctx.statsSource;
+    this.executionIndex = ctx.executionIndex;
+    this.viewsTokenRewriteStreams = new HashMap<>();
+    this.rewrittenStatementContexts = new HashSet<>();
+  }
+
   public Map<String, Path> getFsScratchDirs() {
     return fsScratchDirs;
   }
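This copy constructor is what lets Driver keep a pristine backup of the Context around for a possible recompilation: retry-safe state (conf, locks, txn manager) is carried over, while per-compilation structures such as viewsTokenRewriteStreams are re-initialized. A minimal sketch of the same pattern, with illustrative names rather than Hive APIs:

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative only: a partial copy constructor that carries over
    // retry-safe state and deliberately resets per-compilation state.
    class QueryContext {
      private final String command;          // shared across retries
      private List<String> acquiredLocks;    // must survive a retry
      private List<String> compilationCache; // rebuilt on every compilation

      QueryContext(String command) {
        this.command = command;
        this.acquiredLocks = new ArrayList<>();
        this.compilationCache = new ArrayList<>();
      }

      QueryContext(QueryContext other) {
        this.command = other.command;
        this.acquiredLocks = other.acquiredLocks;
        this.compilationCache = new ArrayList<>(); // fresh, not copied
      }
    }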

http://git-wip-us.apache.org/repos/asf/hive/blob/1cd543f0/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 08f9a67..8b5262a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -39,15 +39,16 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
-import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -60,11 +61,11 @@ import org.apache.hadoop.hive.conf.HiveVariableSource;
 import org.apache.hadoop.hive.conf.VariableSubstitution;
 import org.apache.hadoop.hive.metastore.ColumnType;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Schema;
-import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
 import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
 import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry;
@@ -92,6 +93,7 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -141,6 +143,7 @@ import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hive.common.util.ShutdownHookManager;
+import org.apache.hive.common.util.TxnIdUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -215,6 +218,9 @@ public class Driver implements IDriver {
   private CacheEntry usedCacheEntry;
   private ValidWriteIdList compactionWriteIds = null;
 
+  private Context backupContext = null;
+  private boolean retrial = false;
+
   private enum DriverState {
     INITIALIZED,
     COMPILING,
@@ -625,10 +631,11 @@ public class Driver implements IDriver {
       // because at that point we need access to the objects.
       Hive.get().getMSC().flushCache();
 
-      BaseSemanticAnalyzer sem;
-      // Do semantic analysis and plan generation
-      if (hookRunner.hasPreAnalyzeHooks()) {
-        HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
+      backupContext = new Context(ctx);
+      boolean executeHooks = hookRunner.hasPreAnalyzeHooks();
+
+      HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
+      if (executeHooks) {
         hookCtx.setConf(conf);
         hookCtx.setUserName(userName);
         hookCtx.setIpAddress(SessionState.get().getUserIpAddress());
@@ -636,24 +643,24 @@ public class Driver implements IDriver {
         hookCtx.setHiveOperation(queryState.getHiveOperation());
 
         tree = hookRunner.runPreAnalyzeHooks(hookCtx, tree);
-        sem = SemanticAnalyzerFactory.get(queryState, tree);
+      }
+
+      // Do semantic analysis and plan generation
+      BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
+
+      if (!retrial) {
         openTransaction();
-        // TODO: Lock acquisition should be moved before this method call
-        // when we want to implement lock-based concurrency control
         generateValidTxnList();
-        sem.analyze(tree, ctx);
-        hookCtx.update(sem);
+      }
+
+      sem.analyze(tree, ctx);
+
+      if (executeHooks) {
+        hookCtx.update(sem);
         hookRunner.runPostAnalyzeHooks(hookCtx, sem.getAllRootTasks());
-      } else {
-        sem = SemanticAnalyzerFactory.get(queryState, tree);
-        openTransaction();
-        // TODO: Lock acquisition should be moved before this method call
-        // when we want to implement lock-based concurrency control
-        generateValidTxnList();
-        sem.analyze(tree, ctx);
       }
-      LOG.info("Semantic Analysis Completed");
+
+      LOG.info("Semantic Analysis Completed (retrial = {})", retrial);
 
       // Retrieve information about cache usage for the query.
       if (conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)) {
@@ -671,7 +678,6 @@ public class Driver implements IDriver {
 
       plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,
         queryState.getHiveOperation(), schema);
-
       conf.set("mapreduce.workflow.id", "hive_" + queryId);
       conf.set("mapreduce.workflow.name", queryStr);
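The net effect of the compile() changes above: openTransaction() and generateValidTxnList() run only on the first compilation, so a recompilation reuses the already-open transaction plus the snapshot refreshed by the caller. A rough sketch of the gating, with illustrative names rather than Hive code:

    // Illustrative only: the retrial flag decides whether a fresh
    // transaction and txn snapshot are needed before analysis.
    class CompileFlow {
      private boolean retrial = false;
      private String txnSnapshot;

      void compile(String query) {
        if (!retrial) {
          openTransaction();
          txnSnapshot = captureTxnSnapshot();
        }
        analyze(query, txnSnapshot);
      }

      void prepareRetrial(String refreshedSnapshot) {
        retrial = true;
        txnSnapshot = refreshedSnapshot;
      }

      private void openTransaction() { /* open a txn in the metastore */ }
      private String captureTxnSnapshot() { return "snapshot"; }
      private void analyze(String q, String s) { /* plan against snapshot s */ }
    }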
@@ -774,6 +780,86 @@ public class Driver implements IDriver {
     }
   }
 
+  // Checks whether txn list has been invalidated while planning the query.
+  // This would happen if query requires exclusive/semi-shared lock, and there
+  // has been a committed transaction on the table over which the lock is
+  // required.
+  private boolean isValidTxnListState() throws LockException {
+    // 1) Get valid txn list.
+    String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
+    if (txnString == null) {
+      // Not a transactional op, nothing more to do
+      return true;
+    }
+    ValidTxnList currentTxnList = queryTxnMgr.getValidTxns();
+    String currentTxnString = currentTxnList.toString();
+    if (currentTxnString.equals(txnString)) {
+      // Still valid, nothing more to do
+      return true;
+    }
+    // 2) Get locks that are relevant:
+    // - Exclusive for INSERT OVERWRITE.
+    // - Semi-shared for UPDATE/DELETE.
+    if (ctx.getHiveLocks() == null || ctx.getHiveLocks().isEmpty()) {
+      // Nothing to check
+      return true;
+    }
+    Set<String> nonSharedLocks = new HashSet<>();
+    for (HiveLock lock : ctx.getHiveLocks()) {
+      if (lock.getHiveLockMode() == HiveLockMode.EXCLUSIVE ||
+          lock.getHiveLockMode() == HiveLockMode.SEMI_SHARED) {
+        if (lock.getHiveLockObject().getPaths().length == 2) {
+          // Pos 0 of lock paths array contains dbname, pos 1 contains tblname
+          nonSharedLocks.add(
+              Warehouse.getQualifiedName(
+                  lock.getHiveLockObject().getPaths()[0], lock.getHiveLockObject().getPaths()[1]));
+        }
+      }
+    }
+    // 3) Get txn tables that are being written
+    ValidTxnWriteIdList txnWriteIdList =
+        new ValidTxnWriteIdList(conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
+    if (txnWriteIdList == null) {
+      // Nothing to check
+      return true;
+    }
+    List<Pair<String, Table>> writtenTables = getWrittenTableList(plan);
+    ValidTxnWriteIdList currentTxnWriteIds =
+        queryTxnMgr.getValidWriteIds(
+            writtenTables.stream()
+                .filter(e -> AcidUtils.isTransactionalTable(e.getRight()))
+                .map(e -> e.getLeft())
+                .collect(Collectors.toList()),
+            currentTxnString);
+    for (Pair<String, Table> tableInfo : writtenTables) {
+      String fullQNameForLock = Warehouse.getQualifiedName(
+          tableInfo.getRight().getDbName(),
+          MetaStoreUtils.encodeTableName(tableInfo.getRight().getTableName()));
+      if (nonSharedLocks.contains(fullQNameForLock)) {
+        // Check if table is transactional
+        if (AcidUtils.isTransactionalTable(tableInfo.getRight())) {
+          // Check that write id is still valid
+          if (!TxnIdUtils.checkEquivalentWriteIds(
+              txnWriteIdList.getTableValidWriteIdList(tableInfo.getLeft()),
+              currentTxnWriteIds.getTableValidWriteIdList(tableInfo.getLeft()))) {
+            // Write id has changed, it is not valid anymore,
+            // we need to recompile
+            return false;
+          }
+        }
+        nonSharedLocks.remove(fullQNameForLock);
+      }
+    }
+    if (!nonSharedLocks.isEmpty()) {
+      throw new LockException("Wrong state: non-shared locks contain information for tables that have not" +
+          " been visited when trying to validate the locks from query tables.\n" +
+          "Tables: " + writtenTables.stream().map(e -> e.getLeft()).collect(Collectors.toList()) + "\n" +
+          "Remaining locks after check: " + nonSharedLocks);
+    }
+    // It passes the test, it is valid
+    return true;
+  }
+
   private void setTriggerContext(final String queryId) {
     final long queryStartTime;
     // query info is created by SQLOperation which will have start time of the operation. When JDBC Statement is not
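In short, the snapshot is declared stale only when a table held under an exclusive or semi-shared lock has had write ids committed between compilation and lock acquisition; purely shared reads never trigger a recompile. A reduced sketch of that decision (plain string equality stands in here for TxnIdUtils.checkEquivalentWriteIds, and the names are illustrative, not Hive APIs):

    import java.util.Map;
    import java.util.Set;

    // Illustrative only: stale iff a non-shared-locked table's write-id
    // fingerprint changed since the compile-time snapshot was taken.
    class SnapshotCheck {
      static boolean isStillValid(String compileTxns, String currentTxns,
          Set<String> nonSharedLockedTables,
          Map<String, String> compileWriteIds,
          Map<String, String> currentWriteIds) {
        if (compileTxns.equals(currentTxns)) {
          return true; // nothing committed in between
        }
        for (String table : nonSharedLockedTables) {
          String before = compileWriteIds.get(table);
          String after = currentWriteIds.get(table);
          if (before != null && !before.equals(after)) {
            return false; // locked transactional table changed: recompile
          }
        }
        return true; // concurrent commits did not touch our locked tables
      }
    }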
@@ -1392,6 +1478,34 @@ public class Driver implements IDriver {
     tableList.add(fullTableName);
   }
 
+  // Make the list of transactional tables list which are getting written by current txn
+  private List<Pair<String, Table>> getWrittenTableList(QueryPlan plan) {
+    List<Pair<String, Table>> result = new ArrayList<>();
+    Set<String> tableList = new HashSet<>();
+    for (WriteEntity output : plan.getOutputs()) {
+      Table tbl;
+      switch (output.getType()) {
+        case TABLE: {
+          tbl = output.getTable();
+          break;
+        }
+        case PARTITION:
+        case DUMMYPARTITION: {
+          tbl = output.getPartition().getTable();
+          break;
+        }
+        default: {
+          continue;
+        }
+      }
+      String fullTableName = AcidUtils.getFullTableName(tbl.getDbName(), tbl.getTableName());
+      if (tableList.add(fullTableName)) {
+        result.add(new ImmutablePair(fullTableName, tbl));
+      }
+    }
+    return result;
+  }
+
   private String getUserFromUGI() {
     // Don't use the userName member, as it may or may not have been set. Get the value from
     // conf, which calls into getUGI to figure out who the process is running as.
@@ -1462,7 +1576,6 @@ public class Driver implements IDriver {
         acidDdlDesc.setWriteId(writeId);
       }
 
-
     /*It's imperative that {@code acquireLocks()} is called for all commands so that
       HiveTxnManager can transition its state machine correctly*/
     queryTxnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState);
@@ -1823,6 +1936,48 @@ public class Driver implements IDriver {
 
       lockAndRespond();
 
       try {
+        if (!isValidTxnListState()) {
+          // Snapshot was outdated when locks were acquired, hence regenerate context,
+          // txn list and retry
+          // TODO: Lock acquisition should be moved before analyze, this is a bit hackish.
+          // Currently, we acquire a snapshot, we compile the query wrt that snapshot,
+          // and then, we acquire locks. If snapshot is still valid, we continue as usual.
+          // But if snapshot is not valid, we recompile the query.
+          retrial = true;
+          backupContext.addRewrittenStatementContext(ctx);
+          backupContext.setHiveLocks(ctx.getHiveLocks());
+          ctx = backupContext;
+          conf.set(ValidTxnList.VALID_TXNS_KEY, queryTxnMgr.getValidTxns().toString());
+          if (plan.hasAcidResourcesInQuery()) {
+            recordValidWriteIds(queryTxnMgr);
+          }
+
+          if (!alreadyCompiled) {
+            // compile internal will automatically reset the perf logger
+            compileInternal(command, true);
+          } else {
+            // Since we're reusing the compiled plan, we need to update its start time for current run
+            plan.setQueryStartTime(queryDisplay.getQueryStartTime());
+          }
+
+          if (!isValidTxnListState()) {
+            // Throw exception
+            throw handleHiveException(new HiveException("Operation could not be executed"), 14);
+          }
+
+          //Reset the PerfLogger
+          perfLogger = SessionState.getPerfLogger(true);
+
+          // the reason that we set the txn manager for the cxt here is because each
+          // query has its own ctx object. The txn mgr is shared across the
+          // same instance of Driver, which can run multiple queries.
+          ctx.setHiveTxnManager(queryTxnMgr);
+        }
+      } catch (LockException e) {
+        throw handleHiveException(e, 13);
+      }
+
+      try {
         execute();
       } catch (CommandProcessorResponse cpr) {
         rollback(cpr);
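Putting the Driver pieces together, runInternal() now follows a compile / lock / validate / recompile-once sequence. An illustrative outline of that control flow (not Hive code; the single retry mirrors the retrial flag above):

    // Illustrative only: compile against a snapshot, acquire locks, and
    // recompile at most once if the snapshot went stale while waiting.
    class RetryFlow {
      void run(String command) throws Exception {
        compile(command);
        acquireLocks();
        if (!snapshotStillValid()) {
          refreshSnapshot(); // conf.set(ValidTxnList.VALID_TXNS_KEY, ...) in the patch
          compile(command);  // second and last compilation
          if (!snapshotStillValid()) {
            throw new IllegalStateException("Operation could not be executed");
          }
        }
        execute();
      }

      private void compile(String c) { /* plan against the current snapshot */ }
      private void acquireLocks() { /* blocking lock acquisition */ }
      private boolean snapshotStillValid() { return true; /* cf. isValidTxnListState() */ }
      private void refreshSnapshot() { /* re-read valid txns from the txn manager */ }
      private void execute() { /* run the compiled plan */ }
    }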

http://git-wip-us.apache.org/repos/asf/hive/blob/1cd543f0/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index e555aec..caa9d83 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -59,6 +59,7 @@ public class FetchTask extends Task<FetchWork> implements Serializable {
   public void setValidWriteIdList(String writeIdStr) {
     fetch.setValidWriteIdList(writeIdStr);
   }
+
   @Override
   public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext ctx,
       CompilationOpContext opContext) {