This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 8719afa  HIVE-23837: Configure StorageHandlers if FileSinkOperator is child of MergeJoinWork (Peter Varga, reviewed by Denys Kuzmenko)
8719afa is described below

commit 8719afaec7f30fa7bb4c9c1fea974f8228ea1e03
Author: Peter Varga <pva...@cloudera.com>
AuthorDate: Fri Jul 17 09:36:33 2020 +0200

    HIVE-23837: Configure StorageHandlers if FileSinkOperator is child of MergeJoinWork (Peter Varga, reviewed by Denys Kuzmenko)
    
    Closes (#1244)
---
 .../java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java |  1 +
 ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java       | 10 +++++++++-
 ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java        |  4 +---
 ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java  |  7 ++-----
 ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java     |  9 ---------
 5 files changed, 13 insertions(+), 18 deletions(-)

diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
index 16658d0..1e772dd 100644
--- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
@@ -330,6 +330,7 @@ public class HBaseStorageHandler extends DefaultStorageHandler
   }
   @Override
   public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
+    LOG.debug("Configuring JobConf for table {}.{}", tableDesc.getDbName(), tableDesc.getTableName());
     try {
       HBaseSerDe.configureJobConf(tableDesc, jobConf);
       /*
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
index 31f54c3..35e02e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
@@ -28,6 +28,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
 
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation;
@@ -510,7 +512,13 @@ public abstract class BaseWork extends AbstractOperatorDesc {
     reservedMemoryMB = memoryMB;
   }
 
-  public abstract void configureJobConf(JobConf job);
+  public void configureJobConf(JobConf job) {
+    OperatorUtils.findOperators(getAllRootOperators(), FileSinkOperator.class).forEach(fs -> {
+      LOG.debug("Configuring JobConf for table {}.{}", fs.getConf().getTableInfo().getDbName(),
+          fs.getConf().getTableInfo().getTableName());
+      PlanUtils.configureJobConf(fs.getConf().getTableInfo(), job);
+    });
+  }
 
   public void setTag(int tag) {
     this.tag = tag;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 045e06b..2ae6727 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -657,13 +657,11 @@ public class MapWork extends BaseWork {
 
   @Override
   public void configureJobConf(JobConf job) {
+    super.configureJobConf(job);
     for (PartitionDesc partition : aliasToPartnInfo.values()) {
       PlanUtils.configureJobConf(partition.getTableDesc(), job);
     }
     Collection<Operator<?>> mappers = aliasToWork.values();
-    for (FileSinkOperator fs : OperatorUtils.findOperators(mappers, FileSinkOperator.class)) {
-      PlanUtils.configureJobConf(fs.getConf().getTableInfo(), job);
-    }
     for (IConfigureJobConf icjc : OperatorUtils.findOperators(mappers, IConfigureJobConf.class)) {
       icjc.configureJobConf(job);
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java
index dd907ef..fe7f428 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java
@@ -26,10 +26,11 @@ import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.plan.BaseWork.BaseExplainVectorization;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
 import org.apache.hadoop.mapred.JobConf;
@@ -64,10 +65,6 @@ public class MergeJoinWork extends BaseWork {
     return getMainWork().getAnyRootOperator();
   }
 
-  @Override
-  public void configureJobConf(JobConf job) {
-  }
-
   public CommonMergeJoinOperator getMergeJoinOperator() {
     return this.mergeJoinOp;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
index 51298ce..66c3b56 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
@@ -205,15 +205,6 @@ public class ReduceWork extends BaseWork {
     this.numReduceTasks = numReduceTasks;
   }
 
-  @Override
-  public void configureJobConf(JobConf job) {
-    if (reducer != null) {
-      for (FileSinkOperator fs : OperatorUtils.findOperators(reducer, FileSinkOperator.class)) {
-        PlanUtils.configureJobConf(fs.getConf().getTableInfo(), job);
-      }
-    }
-  }
-
   public void setAutoReduceParallelism(boolean isAutoReduceParallelism) {
     this.isAutoReduceParallelism = isAutoReduceParallelism;
   }

Reply via email to