This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 8719afa HIVE-23837: Configure StorageHandlers if FileSinkOperator is child of MergeJoinWork (Peter Varga, reviewed by Denys Kuzmenko) 8719afa is described below commit 8719afaec7f30fa7bb4c9c1fea974f8228ea1e03 Author: Peter Varga <pva...@cloudera.com> AuthorDate: Fri Jul 17 09:36:33 2020 +0200 HIVE-23837: Configure StorageHandlers if FileSinkOperator is child of MergeJoinWork (Peter Varga, reviewed by Denys Kuzmenko) Closes (#1244) --- .../java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java | 1 + ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java | 10 +++++++++- ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java | 4 +--- ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java | 7 ++----- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java | 9 --------- 5 files changed, 13 insertions(+), 18 deletions(-) diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java index 16658d0..1e772dd 100644 --- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java @@ -330,6 +330,7 @@ public class HBaseStorageHandler extends DefaultStorageHandler } @Override public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { + LOG.debug("Configuring JobConf for table {}.{}", tableDesc.getDbName(), tableDesc.getTableName()); try { HBaseSerDe.configureJobConf(tableDesc, jobConf); /* diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java index 31f54c3..35e02e5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java @@ -28,6 +28,8 @@ import java.util.Map; import java.util.Set; import java.util.Stack; +import 
org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.OperatorUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation; @@ -510,7 +512,13 @@ public abstract class BaseWork extends AbstractOperatorDesc { reservedMemoryMB = memoryMB; } - public abstract void configureJobConf(JobConf job); + public void configureJobConf(JobConf job) { + OperatorUtils.findOperators(getAllRootOperators(), FileSinkOperator.class).forEach(fs -> { + LOG.debug("Configuring JobConf for table {}.{}", fs.getConf().getTableInfo().getDbName(), + fs.getConf().getTableInfo().getTableName()); + PlanUtils.configureJobConf(fs.getConf().getTableInfo(), job); + }); + } public void setTag(int tag) { this.tag = tag; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index 045e06b..2ae6727 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -657,13 +657,11 @@ public class MapWork extends BaseWork { @Override public void configureJobConf(JobConf job) { + super.configureJobConf(job); for (PartitionDesc partition : aliasToPartnInfo.values()) { PlanUtils.configureJobConf(partition.getTableDesc(), job); } Collection<Operator<?>> mappers = aliasToWork.values(); - for (FileSinkOperator fs : OperatorUtils.findOperators(mappers, FileSinkOperator.class)) { - PlanUtils.configureJobConf(fs.getConf().getTableInfo(), job); - } for (IConfigureJobConf icjc : OperatorUtils.findOperators(mappers, IConfigureJobConf.class)) { icjc.configureJobConf(job); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java index dd907ef..fe7f428 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java @@ 
-26,10 +26,11 @@ import java.util.Map.Entry; import java.util.Set; import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorUtils; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; -import org.apache.hadoop.hive.ql.plan.BaseWork.BaseExplainVectorization; import org.apache.hadoop.hive.ql.plan.Explain.Level; import org.apache.hadoop.hive.ql.plan.Explain.Vectorization; import org.apache.hadoop.mapred.JobConf; @@ -64,10 +65,6 @@ public class MergeJoinWork extends BaseWork { return getMainWork().getAnyRootOperator(); } - @Override - public void configureJobConf(JobConf job) { - } - public CommonMergeJoinOperator getMergeJoinOperator() { return this.mergeJoinOp; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java index 51298ce..66c3b56 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java @@ -205,15 +205,6 @@ public class ReduceWork extends BaseWork { this.numReduceTasks = numReduceTasks; } - @Override - public void configureJobConf(JobConf job) { - if (reducer != null) { - for (FileSinkOperator fs : OperatorUtils.findOperators(reducer, FileSinkOperator.class)) { - PlanUtils.configureJobConf(fs.getConf().getTableInfo(), job); - } - } - } - public void setAutoReduceParallelism(boolean isAutoReduceParallelism) { this.isAutoReduceParallelism = isAutoReduceParallelism; }