This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 64c0055f955 HIVE-29464: Rethink MapWork.aliasToPartnInfo (#6344)
64c0055f955 is described below
commit 64c0055f9550d4f18de179ab2817739054fe7c37
Author: Hemanth Umashankar <[email protected]>
AuthorDate: Mon Mar 9 15:57:55 2026 +0530
HIVE-29464: Rethink MapWork.aliasToPartnInfo (#6344)
* HIVE-29464: Introduce getDistinctTableDescs() in MapWork to retrieve
unique TableDesc objects without iterating partitions
* Refactor MapWork partition access to purpose-built APIs and remove raw
aliasToPartnInfo exposure
* Return Iterator from getPartitionDescs() and remove redundant null checks
by initializing aliasToPartnInfo at field declaration
---------
Co-authored-by: Hemanth Umashankar <[email protected]>
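
For illustration, a minimal sketch of how a call site migrates from raw aliasToPartnInfo access to the new accessors. The `work` and `alias` values are placeholders; the sketch assumes nothing beyond the MapWork methods introduced in the diff below.

    import java.util.Iterator;

    import org.apache.hadoop.hive.ql.plan.MapWork;
    import org.apache.hadoop.hive.ql.plan.PartitionDesc;
    import org.apache.hadoop.hive.ql.plan.TableDesc;

    class MapWorkMigrationSketch {
      // Hypothetical caller; every MapWork method used here is added in this commit.
      static void migrate(MapWork work, String alias) {
        // Was: for (PartitionDesc pd : work.getAliasToPartnInfo().values()) { ... }
        Iterator<PartitionDesc> parts = work.getPartitionDescs(); // unmodifiable iterator
        while (parts.hasNext()) {
          PartitionDesc part = parts.next();
          part.getTableDesc(); // per-partition processing goes here
        }

        // Was: getAliasToPartnInfo().get / containsKey / size / remove / put
        PartitionDesc pd = work.getPartitionDesc(alias);
        int count = work.getPartitionCount();
        if (work.hasPartitionDesc(alias)) {
          work.removeAlias(alias);
          work.putPartitionDesc(alias, pd); // re-register under the same alias
        }

        // Was: deduplicating TableDescs by hand while iterating every partition.
        for (TableDesc td : work.getDistinctTableDescs()) {
          td.getTableName(); // e.g. configure each table exactly once
        }
      }
    }

Because getPartitionDescs() now returns an unmodifiable iterator, callers that previously mutated the raw map must go through putPartitionDesc/removeAlias instead.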
---
.../hive/druid/io/DruidVectorizedWrapper.java | 13 +++--
.../hive/kafka/KafkaDagCredentialSupplier.java | 16 ++----
.../hive/kafka/VectorizedKafkaRecordReader.java | 12 ++---
.../apache/hadoop/hive/ql/exec/MapOperator.java | 7 ++-
.../org/apache/hadoop/hive/ql/exec/Utilities.java | 4 +-
.../apache/hadoop/hive/ql/exec/mr/ExecDriver.java | 9 +++-
.../apache/hadoop/hive/ql/exec/mr/ExecMapper.java | 5 +-
.../hive/ql/exec/tez/HiveSplitGenerator.java | 4 +-
.../hive/ql/exec/tez/MapRecordProcessor.java | 5 +-
.../hadoop/hive/ql/exec/tez/SplitGrouper.java | 2 +-
.../hive/ql/io/TeradataBinaryRecordReader.java | 13 ++---
.../hadoop/hive/ql/optimizer/GenMapRedUtils.java | 49 ++++++++---------
.../hadoop/hive/ql/optimizer/MapJoinProcessor.java | 2 +-
.../optimizer/physical/GenMRSkewJoinProcessor.java | 2 +-
.../physical/SortMergeJoinTaskDispatcher.java | 7 ++-
.../org/apache/hadoop/hive/ql/plan/MapWork.java | 63 ++++++++++++----------
16 files changed, 111 insertions(+), 102 deletions(-)
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidVectorizedWrapper.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidVectorizedWrapper.java
index 586631d54a0..2345e13ab07 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidVectorizedWrapper.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidVectorizedWrapper.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.mapred.RecordReader;
import java.io.IOException;
+import java.util.Iterator;
import java.util.Properties;
/**
@@ -123,13 +124,11 @@ public DruidVectorizedWrapper(DruidQueryRecordReader reader, Configuration jobCo
private static DruidSerDe createAndInitializeSerde(Configuration jobConf) {
DruidSerDe serDe = new DruidSerDe();
MapWork mapWork = Preconditions.checkNotNull(Utilities.getMapWork(jobConf), "Map work is null");
- Properties
- properties =
- mapWork.getPartitionDescs()
- .stream()
- .map(partitionDesc -> partitionDesc.getTableDesc().getProperties())
- .findAny()
- .orElseThrow(() -> new RuntimeException("Can not find table property at the map work"));
+ Iterator<org.apache.hadoop.hive.ql.plan.PartitionDesc> partitionIterator = mapWork.getPartitionDescs();
+ if (!partitionIterator.hasNext()) {
+ throw new RuntimeException("Can not find table property at the map work");
+ }
+ Properties properties = partitionIterator.next().getTableDesc().getProperties();
try {
serDe.initialize(jobConf, properties, null);
} catch (SerDeException e) {
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplier.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplier.java
index 2e36ecfb198..91932153660 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplier.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplier.java
@@ -22,7 +22,6 @@
import org.apache.hadoop.hive.ql.exec.tez.DagCredentialSupplier;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SecurityUtil;
@@ -40,7 +39,6 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@@ -56,22 +54,18 @@ public Token<?> obtainToken(BaseWork work, Set<TableDesc> fileSinkTableDescs, Co
if(!(work instanceof MapWork)){
return null;
}
- Map<String, PartitionDesc> partitions = ((MapWork) work).getAliasToPartnInfo();
-
- // We don't need to iterate on all partitions, and check the same TableDesc.
- PartitionDesc partition = partitions.values().stream().findFirst().orElse(null);
- if (partition != null) {
- TableDesc tableDesc = partition.getTableDesc();
+ TableDesc tableDesc = ((MapWork) work).getDistinctTableDescs().stream().findFirst().orElse(null);
+ if (tableDesc != null) {
if (isTokenRequired(tableDesc)) {
// don't collect delegation token again, if it was already successful
return getKafkaDelegationTokenForBrokers(conf, tableDesc);
}
}
- for (TableDesc tableDesc : fileSinkTableDescs) {
- if (isTokenRequired(tableDesc)) {
+ for (TableDesc fileSinkTableDesc : fileSinkTableDescs) {
+ if (isTokenRequired(fileSinkTableDesc)) {
// don't collect delegation token again, if it was already successful
- return getKafkaDelegationTokenForBrokers(conf, tableDesc);
+ return getKafkaDelegationTokenForBrokers(conf, fileSinkTableDesc);
}
}
return null;
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java
index bb379f4be05..d3927c098cb 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java
@@ -179,13 +179,11 @@ private int readNextBatch(VectorizedRowBatch vectorizedRowBatch,
@SuppressWarnings("Duplicates") private static KafkaSerDe createAndInitializeSerde(Configuration jobConf) {
KafkaSerDe serDe = new KafkaSerDe();
MapWork mapWork = Preconditions.checkNotNull(Utilities.getMapWork(jobConf), "Map work is null");
- Properties
- properties =
- mapWork.getPartitionDescs()
- .stream()
- .map(partitionDesc -> partitionDesc.getTableDesc().getProperties())
- .findAny()
- .orElseThrow(() -> new RuntimeException("Can not find table property at the map work"));
+ Iterator<org.apache.hadoop.hive.ql.plan.PartitionDesc> partitionIterator = mapWork.getPartitionDescs();
+ if (!partitionIterator.hasNext()) {
+ throw new RuntimeException("Can not find table property at the map work");
+ }
+ Properties properties = partitionIterator.next().getTableDesc().getProperties();
try {
serDe.initialize(jobConf, properties, null);
} catch (SerDeException e) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index 0cf1ab809f7..7aa9c41a220 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -22,6 +22,7 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -388,7 +389,9 @@ private Map<String, Configuration> cloneConfsForColPruning(Configuration hconf)
}
}
- for (PartitionDesc pd: conf.getAliasToPartnInfo().values()) {
+ Iterator<PartitionDesc> partitionIterator = conf.getPartitionDescs();
+ while (partitionIterator.hasNext()) {
+ PartitionDesc pd = partitionIterator.next();
if (!tableNameToConf.containsKey(pd.getTableName())) {
tableNameToConf.put(pd.getTableName(), hconf);
}
@@ -413,7 +416,7 @@ public void initEmptyInputChildren(List<Operator<?>> children, Configuration hco
for (Operator<?> child : children) {
TableScanOperator tsOp = (TableScanOperator) child;
StructObjectInspector soi = null;
- PartitionDesc partDesc = conf.getAliasToPartnInfo().get(tsOp.getConf().getAlias());
+ PartitionDesc partDesc = conf.getPartitionDesc(tsOp.getConf().getAlias());
Configuration newConf = tableNameToConf.get(partDesc.getTableDesc().getTableName());
AbstractSerDe serde = partDesc.getTableDesc().getSerDe();
partDesc.setProperties(partDesc.getProperties());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 3f87b123f17..37e91652fb8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -3772,7 +3772,7 @@ private static Path createDummyFileForEmptyTable(JobConf job, MapWork work,
Path hiveScratchDir, String alias)
throws Exception {
- TableDesc tableDesc = work.getAliasToPartnInfo().get(alias).getTableDesc();
+ TableDesc tableDesc = work.getPartitionDesc(alias).getTableDesc();
if (tableDesc.isNonNative()) {
// if it does not need native storage, we can't create an empty file for it.
return null;
@@ -3794,7 +3794,7 @@ private static Path createDummyFileForEmptyTable(JobConf job, MapWork work,
work.setPathToAliases(pathToAliases);
- PartitionDesc pDesc = work.getAliasToPartnInfo().get(alias).clone();
+ PartitionDesc pDesc = work.getPartitionDesc(alias).clone();
work.addPathToPartitionInfo(newPath, pDesc);
return newPath;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index d724d46cba3..e96ca2c0697 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -29,6 +29,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;
@@ -533,9 +534,13 @@ private void handleSampling(Context context, MapWork mWork, JobConf job)
String alias = mWork.getAliases().get(0);
Operator<?> topOp = mWork.getAliasToWork().get(alias);
- PartitionDesc partDesc = mWork.getAliasToPartnInfo().get(alias);
+ PartitionDesc partDesc = mWork.getPartitionDesc(alias);
- ArrayList<PartitionDesc> parts = mWork.getPartitionDescs();
+ List<PartitionDesc> parts = new ArrayList<>();
+ Iterator<PartitionDesc> partitionIterator = mWork.getPartitionDescs();
+ while (partitionIterator.hasNext()) {
+ parts.add(partitionIterator.next());
+ }
List<Path> inputPaths = mWork.getPaths();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
index f7a658ea924..4cf3522db38 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.exec.mr;
import java.io.IOException;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -83,7 +84,9 @@ public void configure(JobConf job) {
// create map and fetch operators
MapWork mrwork = Utilities.getMapWork(job);
- for (PartitionDesc part : mrwork.getAliasToPartnInfo().values()) {
+ Iterator<PartitionDesc> partitionIterator = mrwork.getPartitionDescs();
+ while (partitionIterator.hasNext()) {
+ PartitionDesc part = partitionIterator.next();
TableDesc tableDesc = part.getTableDesc();
Utilities.copyJobSecretToTableProperties(tableDesc);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index 5222e3b868d..cf7b78beff0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -341,7 +341,7 @@ public List<Event> initialize() throws Exception {
// if files exists in input path then it has to be 1 as this code path gets triggered only
// of order by queries which is expected to write only one file (written by one reducer)
Preconditions.checkState(paths.size() == 1 && fileStatuses.length == 1 &&
- mapWork.getAliasToPartnInfo().size() == 1,
+ mapWork.getPartitionCount() == 1,
"Requested to generate single split. Paths and fileStatuses are
expected to be 1. " +
"Got paths: " + paths.size() + " fileStatuses: " +
fileStatuses.length);
splits = new InputSplit[1];
@@ -354,7 +354,7 @@ public List<Event> initialize() throws Exception {
String[] hosts = hostsSet.toArray(new String[0]);
FileSplit fileSplit = new FileSplit(fileStatus.getPath(), 0, fileStatus.getLen(), hosts);
String alias = mapWork.getAliases().get(0);
- PartitionDesc partDesc = mapWork.getAliasToPartnInfo().get(alias);
+ PartitionDesc partDesc = mapWork.getPartitionDesc(alias);
String partIF = partDesc.getInputFileFormatClassName();
splits[0] = new HiveInputFormat.HiveInputSplit(fileSplit, partIF);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 5a31f22b200..996a9fc7e5f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -22,6 +22,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -141,7 +142,9 @@ void init(MRTaskReporter mrReporter,
// TODO HIVE-14042. Cleanup may be required if exiting early.
Utilities.setMapWork(jconf, mapWork);
- for (PartitionDesc part : mapWork.getAliasToPartnInfo().values()) {
+ Iterator<PartitionDesc> partitionIterator = mapWork.getPartitionDescs();
+ while (partitionIterator.hasNext()) {
+ PartitionDesc part = partitionIterator.next();
TableDesc tableDesc = part.getTableDesc();
Utilities.copyJobSecretToTableProperties(tableDesc);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
index d091f0390a7..201d7332346 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
@@ -190,7 +190,7 @@ public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf, Conf
if ((aliases != null) && (aliases.size() == 1)) {
Operator<? extends OperatorDesc> op = mapWork.getAliasToWork().get(aliases.getFirst());
if (op instanceof TableScanOperator tableScan) {
- PartitionDesc partitionDesc = mapWork.getAliasToPartnInfo().get(aliases.getFirst());
+ PartitionDesc partitionDesc = mapWork.getPartitionDesc(aliases.getFirst());
isMinorCompaction &= AcidUtils.isCompactionTable(partitionDesc.getTableDesc().getProperties());
if (!tableScan.getConf().isTranscationalTable() && !isMinorCompaction) {
String splitPath = getFirstSplitPath(splits);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryRecordReader.java
index be0a672bdb7..12906a1021c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryRecordReader.java
@@ -21,8 +21,8 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Iterator;
import java.util.Map;
-
import com.google.common.collect.ImmutableMap;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.io.EndianUtils;
@@ -31,7 +31,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -85,10 +84,12 @@ public TeradataBinaryRecordReader(JobConf job, FileSplit fileSplit) throws IOExc
String rowLength = job.get(TD_ROW_LENGTH);
if (rowLength == null) {
LOG.debug("No table property in JobConf. Try to recover the table
directly");
- Map<String, PartitionDesc> partitionDescMap =
Utilities.getMapRedWork(job).getMapWork().getAliasToPartnInfo();
- for (String alias :
Utilities.getMapRedWork(job).getMapWork().getAliasToPartnInfo().keySet()) {
- LOG.debug(format("the current alias: %s", alias));
- rowLength =
partitionDescMap.get(alias).getTableDesc().getProperties().getProperty(TD_ROW_LENGTH);
+ Iterator<org.apache.hadoop.hive.ql.plan.PartitionDesc> partitionIterator
=
+ Utilities.getMapRedWork(job).getMapWork().getPartitionDescs();
+ while (partitionIterator.hasNext()) {
+ org.apache.hadoop.hive.ql.plan.PartitionDesc partitionDesc = partitionIterator.next();
+ LOG.debug("Checking partition metadata for row length");
+ rowLength = partitionDesc.getTableDesc().getProperties().getProperty(TD_ROW_LENGTH);
if (rowLength != null) {
break;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 5af9ff1af4c..bd1be003a51 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -436,7 +436,7 @@ static void splitPlan(ReduceSinkOperator cRS, GenMRProcContext opProcCtx)
/**
* set the current task in the mapredWork.
*
- * @param alias_id
+ * @param aliasId
* current alias
* @param topOp
* the top operator of the stack
@@ -447,16 +447,16 @@ static void splitPlan(ReduceSinkOperator cRS, GenMRProcContext opProcCtx)
* @param opProcCtx
* processing context
*/
- public static void setTaskPlan(String alias_id,
+ public static void setTaskPlan(String aliasId,
TableScanOperator topOp, Task<?> task, boolean local,
GenMRProcContext opProcCtx) throws SemanticException {
- setTaskPlan(alias_id, topOp, task, local, opProcCtx, null);
+ setTaskPlan(aliasId, topOp, task, local, opProcCtx, null);
}
/**
* set the current task in the mapredWork.
*
- * @param alias_id
+ * @param aliasId
* current alias
* @param topOp
* the top operator of the stack
@@ -469,18 +469,18 @@ public static void setTaskPlan(String alias_id,
* @param pList
* pruned partition list. If it is null it will be computed on-the-fly.
*/
- public static void setTaskPlan(String alias_id,
+ public static void setTaskPlan(String aliasId,
TableScanOperator topOp, Task<?> task, boolean local,
GenMRProcContext opProcCtx, PrunedPartitionList pList) throws SemanticException {
setMapWork(((MapredWork) task.getWork()).getMapWork(), opProcCtx.getParseCtx(),
- opProcCtx.getInputs(), pList, topOp, alias_id, opProcCtx.getConf(), local);
+ opProcCtx.getInputs(), pList, topOp, aliasId, opProcCtx.getConf(), local);
opProcCtx.addSeenOp(task, topOp);
}
/**
* initialize MapWork
*
- * @param alias_id
+ * @param aliasId
* current alias
* @param plan
* map work to initialize
@@ -494,7 +494,7 @@ public static void setTaskPlan(String alias_id,
* current instance of hive conf
*/
public static void setMapWork(MapWork plan, ParseContext parseCtx, Set<ReadEntity> inputs,
- PrunedPartitionList partsList, TableScanOperator tsOp, String alias_id,
+ PrunedPartitionList partsList, TableScanOperator tsOp, String aliasId,
HiveConf conf, boolean local) throws SemanticException {
ArrayList<Path> partDir = new ArrayList<Path>();
ArrayList<PartitionDesc> partDesc = new ArrayList<PartitionDesc>();
@@ -513,12 +513,12 @@ public static void setMapWork(MapWork plan, ParseContext parseCtx, Set<ReadEntit
partsList = new PrunedPartitionList(tab, null, Sets.newHashSet(new DummyPartition(tab)),
Collections.emptyList(), false);
} else {
- partsList = PartitionPruner.prune(tsOp, parseCtx, alias_id);
+ partsList = PartitionPruner.prune(tsOp, parseCtx, aliasId);
isFullAcidTable = tsOp.getConf().isFullAcidTable();
}
}
- // Generate the map work for this alias_id
+ // Generate the map work for this aliasId
// pass both confirmed and unknown partitions through the map-reduce
// framework
Set<Partition> parts = partsList.getPartitions();
@@ -544,7 +544,7 @@ public static void setMapWork(MapWork plan, ParseContext parseCtx, Set<ReadEntit
target.putAll(props);
}
- plan.getAliasToPartnInfo().put(alias_id, aliasPartnDesc);
+ plan.putPartitionDesc(aliasId, aliasPartnDesc);
long sizeNeeded = Integer.MAX_VALUE;
int fileLimit = -1;
@@ -585,7 +585,7 @@ public static void setMapWork(MapWork plan, ParseContext parseCtx, Set<ReadEntit
// where V is a view of the form: select * from T
// The dependencies should include V at depth 0, and T at depth 1 (inferred).
Map<String, ReadEntity> viewToInput = parseCtx.getViewAliasToInput();
- ReadEntity parentViewInfo = PlanUtils.getParentViewInfo(alias_id, viewToInput);
+ ReadEntity parentViewInfo = PlanUtils.getParentViewInfo(aliasId, viewToInput);
// The table should also be considered a part of inputs, even if the table is a
// partitioned table and whether any partition is selected or not
@@ -686,7 +686,7 @@ public static void setMapWork(MapWork plan, ParseContext parseCtx, Set<ReadEntit
if (p == null) {
continue;
}
- LOG.debug("Adding {} of table {}", p, alias_id);
+ LOG.debug("Adding {} of table {}", p, aliasId);
partDir.add(p);
try {
@@ -720,15 +720,15 @@ public static void setMapWork(MapWork plan, ParseContext parseCtx, Set<ReadEntit
PartitionDesc prtDesc = iterPartnDesc.next();
// Add the path to alias mapping
- plan.addPathToAlias(path,alias_id);
+ plan.addPathToAlias(path,aliasId);
plan.addPathToPartitionInfo(path, prtDesc);
if (LOG.isDebugEnabled()) {
LOG.debug("Information added for path " + path);
}
}
- assert plan.getAliasToWork().get(alias_id) == null;
- plan.getAliasToWork().put(alias_id, tsOp);
+ assert plan.getAliasToWork().get(aliasId) == null;
+ plan.getAliasToWork().put(aliasId, tsOp);
} else {
// populate local work if needed
MapredLocalWork localPlan = plan.getMapRedLocalWork();
@@ -738,16 +738,16 @@ public static void setMapWork(MapWork plan, ParseContext parseCtx, Set<ReadEntit
new LinkedHashMap<String, FetchWork>());
}
- assert localPlan.getAliasToWork().get(alias_id) == null;
- assert localPlan.getAliasToFetchWork().get(alias_id) == null;
- localPlan.getAliasToWork().put(alias_id, tsOp);
+ assert localPlan.getAliasToWork().get(aliasId) == null;
+ assert localPlan.getAliasToFetchWork().get(aliasId) == null;
+ localPlan.getAliasToWork().put(aliasId, tsOp);
if (tblDir == null) {
tblDesc = Utilities.getTableDesc(partsList.getSourceTable());
localPlan.getAliasToFetchWork().put(
- alias_id,
+ aliasId,
new FetchWork(partDir, partDesc, tblDesc));
} else {
- localPlan.getAliasToFetchWork().put(alias_id,
+ localPlan.getAliasToFetchWork().put(aliasId,
new FetchWork(tblDir, tblDesc));
}
plan.setMapRedLocalWork(localPlan);
@@ -1151,12 +1151,9 @@ public static void replaceMapWork(String sourceAlias, String targetAlias,
Map<Path, List<String>> sourcePathToAliases = source.getPathToAliases();
Map<Path, PartitionDesc> sourcePathToPartitionInfo = source.getPathToPartitionInfo();
Map<String, Operator<? extends OperatorDesc>> sourceAliasToWork = source.getAliasToWork();
- Map<String, PartitionDesc> sourceAliasToPartnInfo = source.getAliasToPartnInfo();
-
Map<Path, List<String>> targetPathToAliases = target.getPathToAliases();
Map<Path, PartitionDesc> targetPathToPartitionInfo = target.getPathToPartitionInfo();
Map<String, Operator<? extends OperatorDesc>> targetAliasToWork = target.getAliasToWork();
- Map<String, PartitionDesc> targetAliasToPartnInfo = target.getAliasToPartnInfo();
if (!sourceAliasToWork.containsKey(sourceAlias) ||
!targetAliasToWork.containsKey(targetAlias)) {
@@ -1174,7 +1171,7 @@ public static void replaceMapWork(String sourceAlias, String targetAlias,
// Remove unnecessary information from target
targetAliasToWork.remove(targetAlias);
- targetAliasToPartnInfo.remove(targetAlias);
+ target.removeAlias(targetAlias);
List<Path> pathsToRemove = new ArrayList<>();
for (Entry<Path, List<String>> entry: targetPathToAliases.entrySet()) {
List<String> aliases = entry.getValue();
@@ -1190,7 +1187,7 @@ public static void replaceMapWork(String sourceAlias, String targetAlias,
// Add new information from source to target
targetAliasToWork.put(sourceAlias, sourceAliasToWork.get(sourceAlias));
- targetAliasToPartnInfo.putAll(sourceAliasToPartnInfo);
+ target.putPartitionDesc(sourceAlias, source.getPartitionDesc(sourceAlias));
targetPathToPartitionInfo.putAll(sourcePathToPartitionInfo);
List<Path> pathsToAdd = new ArrayList<>();
for (Entry<Path, List<String>> entry: sourcePathToAliases.entrySet()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
index b130a802ff4..ea5a40cb684 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
@@ -188,7 +188,7 @@ private static void genMapJoinLocalWork(MapredWork newWork, MapJoinOperator mapJ
}
// create fetchwork for partitioned table
if (fetchWork == null) {
- TableDesc table = newWork.getMapWork().getAliasToPartnInfo().get(alias).getTableDesc();
+ TableDesc table = newWork.getMapWork().getPartitionDesc(alias).getTableDesc();
fetchWork = new FetchWork(partDir, partDesc, table);
}
// set alias to fetch work
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
index f17d111baef..ca9c8c707a3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
@@ -271,7 +271,7 @@ public static void processSkewJoin(JoinOperator joinOp,
PartitionDesc part = new PartitionDesc(tableDescList.get(src), null);
newPlan.addPathToPartitionInfo(bigKeyDirPath, part);
- newPlan.getAliasToPartnInfo().put(alias, part);
+ newPlan.putPartitionDesc(alias, part);
Operator<? extends OperatorDesc> reducer = clonePlan.getReduceWork().getReducer();
assert reducer instanceof JoinOperator;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
index d67c5d72038..e92f88f8e7e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
@@ -73,13 +73,12 @@ public SortMergeJoinTaskDispatcher(PhysicalContext context) {
// plan are fixed. The operator tree will still contain the SMBJoinOperator
private void genSMBJoinWork(MapWork currWork, SMBMapJoinOperator smbJoinOp) {
// Remove the paths which are not part of aliasToPartitionInfo
- Map<String, PartitionDesc> aliasToPartitionInfo = currWork.getAliasToPartnInfo();
List<Path> removePaths = new ArrayList<>();
for (Map.Entry<Path, List<String>> entry : currWork.getPathToAliases().entrySet()) {
boolean keepPath = false;
for (String alias : entry.getValue()) {
- if (aliasToPartitionInfo.containsKey(alias)) {
+ if (currWork.hasPartitionDesc(alias)) {
keepPath = true;
break;
}
@@ -99,7 +98,7 @@ private void genSMBJoinWork(MapWork currWork, SMBMapJoinOperator smbJoinOp) {
}
for (String alias : removeAliases) {
- currWork.getAliasToPartnInfo().remove(alias);
+ currWork.removeAlias(alias);
currWork.getAliasToWork().remove(alias);
}
@@ -115,7 +114,7 @@ private void genSMBJoinWork(MapWork currWork, SMBMapJoinOperator smbJoinOp) {
// Add the entry in mapredwork
currWork.getAliasToWork().put(alias, op);
- PartitionDesc partitionInfo = currWork.getAliasToPartnInfo().get(alias);
+ PartitionDesc partitionInfo = currWork.getPartitionDesc(alias);
if (fetchWork.getTblDir() != null) {
currWork.mergeAliasedInput(alias, fetchWork.getTblDir(), partitionInfo);
} else {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index fa0f1e7b7f5..a18508a8cce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -47,6 +47,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Interner;
+import com.google.common.collect.Iterators;
/**
* MapWork represents all the information used to run a map task on the cluster.
@@ -92,7 +93,7 @@ public enum LlapIODescriptor {
private Map<String, Operator<? extends OperatorDesc>> aliasToWork =
new LinkedHashMap<String, Operator<? extends OperatorDesc>>();
- private Map<String, PartitionDesc> aliasToPartnInfo = new LinkedHashMap<String, PartitionDesc>();
+ private Map<String, PartitionDesc> aliasToPartnInfo = new LinkedHashMap<>();
private Map<String, SplitSample> nameToSplitSample = new LinkedHashMap<String, SplitSample>();
@@ -364,20 +365,28 @@ public void internTable(Interner<TableDesc> interner) {
}
}
- /**
- * @return the aliasToPartnInfo
- */
- public Map<String, PartitionDesc> getAliasToPartnInfo() {
- return aliasToPartnInfo;
+ public Iterator<PartitionDesc> getPartitionDescs() {
+ return Iterators.unmodifiableIterator(aliasToPartnInfo.values().iterator());
}
- /**
- * @param aliasToPartnInfo
- * the aliasToPartnInfo to set
- */
- public void setAliasToPartnInfo(
- LinkedHashMap<String, PartitionDesc> aliasToPartnInfo) {
- this.aliasToPartnInfo = aliasToPartnInfo;
+ public PartitionDesc getPartitionDesc(String alias) {
+ return aliasToPartnInfo.get(alias);
+ }
+
+ public int getPartitionCount() {
+ return aliasToPartnInfo.size();
+ }
+
+ public boolean hasPartitionDesc(String alias) {
+ return aliasToPartnInfo.containsKey(alias);
+ }
+
+ public void putPartitionDesc(String alias, PartitionDesc partitionDesc) {
+ aliasToPartnInfo.put(alias, partitionDesc);
+ }
+
+ public void removeAlias(String alias) {
+ aliasToPartnInfo.remove(alias);
}
public Map<String, Operator<? extends OperatorDesc>> getAliasToWork() {
@@ -606,10 +615,6 @@ public ArrayList<Path> getPaths() {
return new ArrayList<Path>(pathToAliases.keySet());
}
- public ArrayList<PartitionDesc> getPartitionDescs() {
- return new ArrayList<PartitionDesc>(aliasToPartnInfo.values());
- }
-
public Path getTmpHDFSPath() {
return tmpHDFSPath;
}
@@ -655,22 +660,24 @@ public String getSamplingTypeString() {
samplingType == 2 ? "SAMPLING_ON_START" : null;
}
+ public Collection<TableDesc> getDistinctTableDescs() {
+ Map<String, TableDesc> tables = new LinkedHashMap<>();
+ for (PartitionDesc partition : aliasToPartnInfo.values()) {
+ TableDesc tableDesc = partition.getTableDesc();
+ if (tableDesc != null) {
+ tables.putIfAbsent(tableDesc.getTableName(), tableDesc);
+ }
+ }
+ return Collections.unmodifiableCollection(tables.values());
+ }
+
@Override
public void configureJobConf(JobConf job) {
super.configureJobConf(job);
// Configure each table only once, even if we read thousands of its partitions.
// This avoids repeating expensive work (like loading storage drivers) for every single partition.
- Set<String> processedTables = new HashSet<>();
-
- for (PartitionDesc partition : aliasToPartnInfo.values()) {
- TableDesc tableDesc = partition.getTableDesc();
-
- // If we haven't seen this table before, configure it and remember it.
- // If we have seen it, skip it.
- if (tableDesc != null && !processedTables.contains(tableDesc.getTableName())) {
- processedTables.add(tableDesc.getTableName());
- PlanUtils.configureJobConf(tableDesc, job);
- }
+ for (TableDesc tableDesc : getDistinctTableDescs()) {
+ PlanUtils.configureJobConf(tableDesc, job);
}
Collection<Operator<?>> mappers = aliasToWork.values();
for (IConfigureJobConf icjc : OperatorUtils.findOperators(mappers, IConfigureJobConf.class)) {