Repository: hive
Updated Branches:
  refs/heads/master c4b2549be -> 1d7665683


HIVE-11550 ACID queries pollute HiveConf (Eugene Koifman, reviewed by Alan 
Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1d766568
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1d766568
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1d766568

Branch: refs/heads/master
Commit: 1d76656833080a5cb1ea3717079945213e8d48cb
Parents: c4b2549
Author: Eugene Koifman <ekoif...@hortonworks.com>
Authored: Fri May 13 12:21:54 2016 -0700
Committer: Eugene Koifman <ekoif...@hortonworks.com>
Committed: Fri May 13 12:21:54 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/optimizer/Optimizer.java     |  2 +-
 .../correlation/AbstractCorrelationProcCtx.java | 21 +++++++++++++++++++-
 .../hadoop/hive/ql/parse/ParseContext.java      | 18 +++++++++++++++--
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  | 10 ++++++++--
 .../hadoop/hive/ql/parse/TaskCompiler.java      |  3 ++-
 5 files changed, 47 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1d766568/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index 55c71dd..bf9a0a3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -195,7 +195,7 @@ public class Optimizer {
       transformations.add(new FixedBucketPruningOptimizer(compatMode));
     }
 
-    if(HiveConf.getBoolVar(hiveConf, 
HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) {
+    if(HiveConf.getBoolVar(hiveConf, 
HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION) || pctx.hasAcidWrite()) {
       transformations.add(new ReduceSinkDeDuplication());
     }
     transformations.add(new NonBlockingOpDeDupProc());

http://git-wip-us.apache.org/repos/asf/hive/blob/1d766568/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java
index 5b673df..bb4ec55 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java
@@ -28,8 +28,12 @@ import java.util.Set;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 abstract class AbstractCorrelationProcCtx implements NodeProcessorCtx {
+  private static final Logger LOG = 
LoggerFactory.getLogger(AbstractCorrelationProcCtx.class);
   private ParseContext pctx;
   // For queries using script, the optimization cannot be applied without 
user's confirmation
   // If script preserves alias and value for columns related to keys, user can 
set this true
@@ -45,7 +49,22 @@ abstract class AbstractCorrelationProcCtx implements 
NodeProcessorCtx {
   public AbstractCorrelationProcCtx(ParseContext pctx) {
     removedOps = new HashSet<Operator<?>>();
     trustScript = pctx.getConf().getBoolVar(HIVESCRIPTOPERATORTRUST);
-    minReducer = 
pctx.getConf().getIntVar(HIVEOPTREDUCEDEDUPLICATIONMINREDUCER);
+    if(pctx.hasAcidWrite()) {
+      StringBuilder tblNames = new StringBuilder();
+      for(FileSinkDesc fsd : pctx.getAcidSinks()) {
+        if(fsd.getTable() != null) {
+          
tblNames.append(fsd.getTable().getDbName()).append('.').append(fsd.getTable().getTableName()).append(',');
+        }
+      }
+      if(tblNames.length() > 0) {
+        tblNames.setLength(tblNames.length() - 1);//trailing ','
+      }
+      LOG.info("Overriding " + HIVEOPTREDUCEDEDUPLICATIONMINREDUCER + " to 1 
due to a write to transactional table(s) " + tblNames);
+      minReducer = 1;
+    }
+    else {
+      minReducer = 
pctx.getConf().getIntVar(HIVEOPTREDUCEDEDUPLICATIONMINREDUCER);
+    }
     isMapAggr = pctx.getConf().getBoolVar(HIVEMAPSIDEAGGREGATE);
     this.pctx = pctx;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/1d766568/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index 5d0f905..96ef20d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -48,6 +49,7 @@ import 
org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
 import 
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.AnalyzeRewriteContext;
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc.SampleDesc;
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
@@ -115,6 +117,8 @@ public class ParseContext {
   private Map<SelectOperator, Table> viewProjectToViewSchema;  
   private ColumnAccessInfo columnAccessInfo;
   private boolean needViewColumnAuthorization;
+  private Set<FileSinkDesc> acidFileSinks = Collections.emptySet();
+
 
   public ParseContext() {
   }
@@ -174,7 +178,8 @@ public class ParseContext {
       Map<String, ReadEntity> viewAliasToInput,
       List<ReduceSinkOperator> 
reduceSinkOperatorsAddedByEnforceBucketingSorting,
       AnalyzeRewriteContext analyzeRewrite, CreateTableDesc createTableDesc,
-      QueryProperties queryProperties, Map<SelectOperator, Table> 
viewProjectToTableSchema) {
+      QueryProperties queryProperties, Map<SelectOperator, Table> 
viewProjectToTableSchema,
+      Set<FileSinkDesc> acidFileSinks) {
     this.queryState = queryState;
     this.conf = queryState.getConf();
     this.opToPartPruner = opToPartPruner;
@@ -211,8 +216,17 @@ public class ParseContext {
       // authorization info.
       this.columnAccessInfo = new ColumnAccessInfo();
     }
+    if(acidFileSinks != null && !acidFileSinks.isEmpty()) {
+      this.acidFileSinks = new HashSet<>();
+      this.acidFileSinks.addAll(acidFileSinks);
+    }
+  }
+  public Set<FileSinkDesc> getAcidSinks() {
+    return acidFileSinks;
+  }
+  public boolean hasAcidWrite() {
+    return !acidFileSinks.isEmpty();
   }
-
   /**
    * @return the context
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/1d766568/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index f79a525..7162c08 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -452,7 +452,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         listMapJoinOpsNoReducer, prunedPartitions, tabNameToTabObject,
         opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
         opToPartToSkewedPruner, viewAliasToInput, 
reduceSinkOperatorsAddedByEnforceBucketingSorting,
-        analyzeRewrite, tableDesc, queryProperties, viewProjectToTableSchema);
+        analyzeRewrite, tableDesc, queryProperties, viewProjectToTableSchema, 
acidFileSinks);
   }
 
   public CompilationOpContext getOpContext() {
@@ -7026,9 +7026,15 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
       LOG.debug("Couldn't find table " + tableName + " in insertIntoTable");
       throw new 
SemanticException(ErrorMsg.NO_INSERT_OVERWRITE_WITH_ACID.getMsg());
     }
+    /*
     LOG.info("Modifying config values for ACID write");
     conf.setBoolVar(ConfVars.HIVEOPTREDUCEDEDUPLICATION, true);
     conf.setIntVar(ConfVars.HIVEOPTREDUCEDEDUPLICATIONMINREDUCER, 1);
+    These props are now enabled elsewhere (see commit diffs).  It would be 
better instead to throw
+    if they are not set.  For example, if user has set 
hive.optimize.reducededuplication=false for
+    some reason, we'll run a query contrary to what they wanted...  But 
throwing now would be
+    backwards incompatible.
+    */
     conf.set(AcidUtils.CONF_ACID_KEY, "true");
 
     if (table.getNumBuckets() < 1) {
@@ -10687,7 +10693,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
         listMapJoinOpsNoReducer, prunedPartitions, tabNameToTabObject, 
opToSamplePruner,
         globalLimitCtx, nameToSplitSample, inputs, rootTasks, 
opToPartToSkewedPruner,
         viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting,
-        analyzeRewrite, tableDesc, queryProperties, viewProjectToTableSchema);
+        analyzeRewrite, tableDesc, queryProperties, viewProjectToTableSchema, 
acidFileSinks);
 
     // 5. Take care of view creation
     if (createVwDesc != null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/1d766568/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 7efc987..4049f40 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -460,7 +460,8 @@ public abstract class TaskCompiler {
         pCtx.getNameToSplitSample(), pCtx.getSemanticInputs(), rootTasks,
         pCtx.getOpToPartToSkewedPruner(), pCtx.getViewAliasToInput(),
         pCtx.getReduceSinkOperatorsAddedByEnforceBucketingSorting(),
-        pCtx.getAnalyzeRewrite(), pCtx.getCreateTable(), 
pCtx.getQueryProperties(), pCtx.getViewProjectToTableSchema());
+        pCtx.getAnalyzeRewrite(), pCtx.getCreateTable(), 
pCtx.getQueryProperties(), pCtx.getViewProjectToTableSchema(),
+        pCtx.getAcidSinks());
     clone.setFetchTask(pCtx.getFetchTask());
     clone.setLineageInfo(pCtx.getLineageInfo());
     clone.setMapJoinOps(pCtx.getMapJoinOps());

Reply via email to