Repository: hive Updated Branches: refs/heads/master c4b2549be -> 1d7665683
HIVE-11550 ACID queries pollute HiveConf (Eugene Koifman, reviewed by Alan Gates) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1d766568 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1d766568 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1d766568 Branch: refs/heads/master Commit: 1d76656833080a5cb1ea3717079945213e8d48cb Parents: c4b2549 Author: Eugene Koifman <ekoif...@hortonworks.com> Authored: Fri May 13 12:21:54 2016 -0700 Committer: Eugene Koifman <ekoif...@hortonworks.com> Committed: Fri May 13 12:21:54 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hive/ql/optimizer/Optimizer.java | 2 +- .../correlation/AbstractCorrelationProcCtx.java | 21 +++++++++++++++++++- .../hadoop/hive/ql/parse/ParseContext.java | 18 +++++++++++++++-- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 10 ++++++++-- .../hadoop/hive/ql/parse/TaskCompiler.java | 3 ++- 5 files changed, 47 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/1d766568/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java index 55c71dd..bf9a0a3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java @@ -195,7 +195,7 @@ public class Optimizer { transformations.add(new FixedBucketPruningOptimizer(compatMode)); } - if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) { + if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION) || pctx.hasAcidWrite()) { transformations.add(new ReduceSinkDeDuplication()); } transformations.add(new 
NonBlockingOpDeDupProc()); http://git-wip-us.apache.org/repos/asf/hive/blob/1d766568/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java index 5b673df..bb4ec55 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java @@ -28,8 +28,12 @@ import java.util.Set; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; abstract class AbstractCorrelationProcCtx implements NodeProcessorCtx { + private static final Logger LOG = LoggerFactory.getLogger(AbstractCorrelationProcCtx.class); private ParseContext pctx; // For queries using script, the optimization cannot be applied without user's confirmation // If script preserves alias and value for columns related to keys, user can set this true @@ -45,7 +49,22 @@ abstract class AbstractCorrelationProcCtx implements NodeProcessorCtx { public AbstractCorrelationProcCtx(ParseContext pctx) { removedOps = new HashSet<Operator<?>>(); trustScript = pctx.getConf().getBoolVar(HIVESCRIPTOPERATORTRUST); - minReducer = pctx.getConf().getIntVar(HIVEOPTREDUCEDEDUPLICATIONMINREDUCER); + if(pctx.hasAcidWrite()) { + StringBuilder tblNames = new StringBuilder(); + for(FileSinkDesc fsd : pctx.getAcidSinks()) { + if(fsd.getTable() != null) { + tblNames.append(fsd.getTable().getDbName()).append('.').append(fsd.getTable().getTableName()).append(','); + } + } + if(tblNames.length() > 0) { + 
tblNames.setLength(tblNames.length() - 1);//trailing ',' + } + LOG.info("Overriding " + HIVEOPTREDUCEDEDUPLICATIONMINREDUCER + " to 1 due to a write to transactional table(s) " + tblNames); + minReducer = 1; + } + else { + minReducer = pctx.getConf().getIntVar(HIVEOPTREDUCEDEDUPLICATIONMINREDUCER); + } isMapAggr = pctx.getConf().getBoolVar(HIVEMAPSIDEAGGREGATE); this.pctx = pctx; } http://git-wip-us.apache.org/repos/asf/hive/blob/1d766568/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java index 5d0f905..96ef20d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.parse; import java.io.Serializable; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -48,6 +49,7 @@ import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.AnalyzeRewriteContext; import org.apache.hadoop.hive.ql.plan.CreateTableDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.FilterDesc.SampleDesc; import org.apache.hadoop.hive.ql.plan.LoadFileDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; @@ -115,6 +117,8 @@ public class ParseContext { private Map<SelectOperator, Table> viewProjectToViewSchema; private ColumnAccessInfo columnAccessInfo; private boolean needViewColumnAuthorization; + private Set<FileSinkDesc> acidFileSinks = Collections.emptySet(); + public ParseContext() { } @@ -174,7 +178,8 @@ public class ParseContext { Map<String, ReadEntity> viewAliasToInput, List<ReduceSinkOperator> 
reduceSinkOperatorsAddedByEnforceBucketingSorting, AnalyzeRewriteContext analyzeRewrite, CreateTableDesc createTableDesc, - QueryProperties queryProperties, Map<SelectOperator, Table> viewProjectToTableSchema) { + QueryProperties queryProperties, Map<SelectOperator, Table> viewProjectToTableSchema, + Set<FileSinkDesc> acidFileSinks) { this.queryState = queryState; this.conf = queryState.getConf(); this.opToPartPruner = opToPartPruner; @@ -211,8 +216,17 @@ public class ParseContext { // authorization info. this.columnAccessInfo = new ColumnAccessInfo(); } + if(acidFileSinks != null && !acidFileSinks.isEmpty()) { + this.acidFileSinks = new HashSet<>(); + this.acidFileSinks.addAll(acidFileSinks); + } + } + public Set<FileSinkDesc> getAcidSinks() { + return acidFileSinks; + } + public boolean hasAcidWrite() { + return !acidFileSinks.isEmpty(); } - /** * @return the context */ http://git-wip-us.apache.org/repos/asf/hive/blob/1d766568/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index f79a525..7162c08 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -452,7 +452,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { listMapJoinOpsNoReducer, prunedPartitions, tabNameToTabObject, opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner, viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting, - analyzeRewrite, tableDesc, queryProperties, viewProjectToTableSchema); + analyzeRewrite, tableDesc, queryProperties, viewProjectToTableSchema, acidFileSinks); } public CompilationOpContext getOpContext() { @@ -7026,9 +7026,15 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer 
{ LOG.debug("Couldn't find table " + tableName + " in insertIntoTable"); throw new SemanticException(ErrorMsg.NO_INSERT_OVERWRITE_WITH_ACID.getMsg()); } + /* LOG.info("Modifying config values for ACID write"); conf.setBoolVar(ConfVars.HIVEOPTREDUCEDEDUPLICATION, true); conf.setIntVar(ConfVars.HIVEOPTREDUCEDEDUPLICATIONMINREDUCER, 1); + These props are now enabled elsewhere (see commit diffs). It would be better instead to throw + if they are not set. For example, if user has set hive.optimize.reducededuplication=false for + some reason, we'll run a query contrary to what they wanted... But throwing now would be + backwards incompatible. + */ conf.set(AcidUtils.CONF_ACID_KEY, "true"); if (table.getNumBuckets() < 1) { @@ -10687,7 +10693,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { listMapJoinOpsNoReducer, prunedPartitions, tabNameToTabObject, opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner, viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting, - analyzeRewrite, tableDesc, queryProperties, viewProjectToTableSchema, acidFileSinks); // 5. 
Take care of view creation if (createVwDesc != null) { http://git-wip-us.apache.org/repos/asf/hive/blob/1d766568/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java index 7efc987..4049f40 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java @@ -460,7 +460,8 @@ public abstract class TaskCompiler { pCtx.getNameToSplitSample(), pCtx.getSemanticInputs(), rootTasks, pCtx.getOpToPartToSkewedPruner(), pCtx.getViewAliasToInput(), pCtx.getReduceSinkOperatorsAddedByEnforceBucketingSorting(), - pCtx.getAnalyzeRewrite(), pCtx.getCreateTable(), pCtx.getQueryProperties(), pCtx.getViewProjectToTableSchema()); + pCtx.getAnalyzeRewrite(), pCtx.getCreateTable(), pCtx.getQueryProperties(), pCtx.getViewProjectToTableSchema(), + pCtx.getAcidSinks()); clone.setFetchTask(pCtx.getFetchTask()); clone.setLineageInfo(pCtx.getLineageInfo()); clone.setMapJoinOps(pCtx.getMapJoinOps());