This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 64c0055f955 HIVE-29464: Rethink MapWork.aliasToPartnInfo (#6344)
64c0055f955 is described below

commit 64c0055f9550d4f18de179ab2817739054fe7c37
Author: Hemanth Umashankar 
<[email protected]>
AuthorDate: Mon Mar 9 15:57:55 2026 +0530

    HIVE-29464: Rethink MapWork.aliasToPartnInfo (#6344)
    
    * HIVE-29464: Introduce getDistinctTableDescs() in MapWork to retrieve 
unique TableDesc objects without iterating partitions
    
    * Refactor MapWork partition access to purpose-built APIs and remove raw 
aliasToPartnInfo exposure
    
    * Return Iterator from getPartitionDescs() and remove redundant null checks 
by initializing aliasToPartnInfo at field declaration
    
    ---------
    
    Co-authored-by: Hemanth Umashankar <[email protected]>
---
 .../hive/druid/io/DruidVectorizedWrapper.java      | 13 +++--
 .../hive/kafka/KafkaDagCredentialSupplier.java     | 16 ++----
 .../hive/kafka/VectorizedKafkaRecordReader.java    | 12 ++---
 .../apache/hadoop/hive/ql/exec/MapOperator.java    |  7 ++-
 .../org/apache/hadoop/hive/ql/exec/Utilities.java  |  4 +-
 .../apache/hadoop/hive/ql/exec/mr/ExecDriver.java  |  9 +++-
 .../apache/hadoop/hive/ql/exec/mr/ExecMapper.java  |  5 +-
 .../hive/ql/exec/tez/HiveSplitGenerator.java       |  4 +-
 .../hive/ql/exec/tez/MapRecordProcessor.java       |  5 +-
 .../hadoop/hive/ql/exec/tez/SplitGrouper.java      |  2 +-
 .../hive/ql/io/TeradataBinaryRecordReader.java     | 13 ++---
 .../hadoop/hive/ql/optimizer/GenMapRedUtils.java   | 49 ++++++++---------
 .../hadoop/hive/ql/optimizer/MapJoinProcessor.java |  2 +-
 .../optimizer/physical/GenMRSkewJoinProcessor.java |  2 +-
 .../physical/SortMergeJoinTaskDispatcher.java      |  7 ++-
 .../org/apache/hadoop/hive/ql/plan/MapWork.java    | 63 ++++++++++++----------
 16 files changed, 111 insertions(+), 102 deletions(-)

diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidVectorizedWrapper.java
 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidVectorizedWrapper.java
index 586631d54a0..2345e13ab07 100644
--- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidVectorizedWrapper.java
+++ 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidVectorizedWrapper.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.mapred.RecordReader;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.Properties;
 
 /**
@@ -123,13 +124,11 @@ public DruidVectorizedWrapper(DruidQueryRecordReader 
reader, Configuration jobCo
   private static DruidSerDe createAndInitializeSerde(Configuration jobConf) {
     DruidSerDe serDe = new DruidSerDe();
     MapWork mapWork = 
Preconditions.checkNotNull(Utilities.getMapWork(jobConf), "Map work is null");
-    Properties
-        properties =
-        mapWork.getPartitionDescs()
-            .stream()
-            .map(partitionDesc -> partitionDesc.getTableDesc().getProperties())
-            .findAny()
-            .orElseThrow(() -> new RuntimeException("Can not find table 
property at the map work"));
+    Iterator<org.apache.hadoop.hive.ql.plan.PartitionDesc> partitionIterator = 
mapWork.getPartitionDescs();
+    if (!partitionIterator.hasNext()) {
+      throw new RuntimeException("Can not find table property at the map 
work");
+    }
+    Properties properties = 
partitionIterator.next().getTableDesc().getProperties();
     try {
       serDe.initialize(jobConf, properties, null);
     } catch (SerDeException e) {
diff --git 
a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplier.java
 
b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplier.java
index 2e36ecfb198..91932153660 100644
--- 
a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplier.java
+++ 
b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplier.java
@@ -22,7 +22,6 @@
 import org.apache.hadoop.hive.ql.exec.tez.DagCredentialSupplier;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.SecurityUtil;
@@ -40,7 +39,6 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -56,22 +54,18 @@ public Token<?> obtainToken(BaseWork work, Set<TableDesc> 
fileSinkTableDescs, Co
     if(!(work instanceof MapWork)){
       return null;
     }
-    Map<String, PartitionDesc> partitions = ((MapWork) 
work).getAliasToPartnInfo();
-
-    // We don't need to iterate on all partitions, and check the same 
TableDesc.
-    PartitionDesc partition = 
partitions.values().stream().findFirst().orElse(null);
-    if (partition != null) {
-      TableDesc tableDesc = partition.getTableDesc();
+    TableDesc tableDesc = ((MapWork) 
work).getDistinctTableDescs().stream().findFirst().orElse(null);
+    if (tableDesc != null) {
       if (isTokenRequired(tableDesc)) {
         // don't collect delegation token again, if it was already successful
         return getKafkaDelegationTokenForBrokers(conf, tableDesc);
       }
     }
 
-    for (TableDesc tableDesc : fileSinkTableDescs) {
-      if (isTokenRequired(tableDesc)) {
+    for (TableDesc fileSinkTableDesc : fileSinkTableDescs) {
+      if (isTokenRequired(fileSinkTableDesc)) {
         // don't collect delegation token again, if it was already successful
-        return getKafkaDelegationTokenForBrokers(conf, tableDesc);
+        return getKafkaDelegationTokenForBrokers(conf, fileSinkTableDesc);
       }
     }
     return null;
diff --git 
a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java
 
b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java
index bb379f4be05..d3927c098cb 100644
--- 
a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java
+++ 
b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java
@@ -179,13 +179,11 @@ private int readNextBatch(VectorizedRowBatch 
vectorizedRowBatch,
   @SuppressWarnings("Duplicates") private static KafkaSerDe 
createAndInitializeSerde(Configuration jobConf) {
     KafkaSerDe serDe = new KafkaSerDe();
     MapWork mapWork = 
Preconditions.checkNotNull(Utilities.getMapWork(jobConf), "Map work is null");
-    Properties
-        properties =
-        mapWork.getPartitionDescs()
-            .stream()
-            .map(partitionDesc -> partitionDesc.getTableDesc().getProperties())
-            .findAny()
-            .orElseThrow(() -> new RuntimeException("Can not find table 
property at the map work"));
+    Iterator<org.apache.hadoop.hive.ql.plan.PartitionDesc> partitionIterator = 
mapWork.getPartitionDescs();
+    if (!partitionIterator.hasNext()) {
+      throw new RuntimeException("Can not find table property at the map 
work");
+    }
+    Properties properties = 
partitionIterator.next().getTableDesc().getProperties();
     try {
       serDe.initialize(jobConf, properties, null);
     } catch (SerDeException e) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index 0cf1ab809f7..7aa9c41a220 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -22,6 +22,7 @@
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -388,7 +389,9 @@ private Map<String, Configuration> 
cloneConfsForColPruning(Configuration hconf)
       }
     }
 
-    for (PartitionDesc pd: conf.getAliasToPartnInfo().values()) {
+    Iterator<PartitionDesc> partitionIterator = conf.getPartitionDescs();
+    while (partitionIterator.hasNext()) {
+      PartitionDesc pd = partitionIterator.next();
       if (!tableNameToConf.containsKey(pd.getTableName())) {
         tableNameToConf.put(pd.getTableName(), hconf);
       }
@@ -413,7 +416,7 @@ public void initEmptyInputChildren(List<Operator<?>> 
children, Configuration hco
     for (Operator<?> child : children) {
       TableScanOperator tsOp = (TableScanOperator) child;
       StructObjectInspector soi = null;
-      PartitionDesc partDesc = 
conf.getAliasToPartnInfo().get(tsOp.getConf().getAlias());
+      PartitionDesc partDesc = 
conf.getPartitionDesc(tsOp.getConf().getAlias());
       Configuration newConf = 
tableNameToConf.get(partDesc.getTableDesc().getTableName());
       AbstractSerDe serde = partDesc.getTableDesc().getSerDe();
       partDesc.setProperties(partDesc.getProperties());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 3f87b123f17..37e91652fb8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -3772,7 +3772,7 @@ private static Path createDummyFileForEmptyTable(JobConf 
job, MapWork work,
       Path hiveScratchDir, String alias)
           throws Exception {
 
-    TableDesc tableDesc = work.getAliasToPartnInfo().get(alias).getTableDesc();
+    TableDesc tableDesc = work.getPartitionDesc(alias).getTableDesc();
     if (tableDesc.isNonNative()) {
       // if it does not need native storage, we can't create an empty file for 
it.
       return null;
@@ -3794,7 +3794,7 @@ private static Path createDummyFileForEmptyTable(JobConf 
job, MapWork work,
 
     work.setPathToAliases(pathToAliases);
 
-    PartitionDesc pDesc = work.getAliasToPartnInfo().get(alias).clone();
+    PartitionDesc pDesc = work.getPartitionDesc(alias).clone();
     work.addPathToPartitionInfo(newPath, pDesc);
 
     return newPath;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index d724d46cba3..e96ca2c0697 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -29,6 +29,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ThreadLocalRandom;
@@ -533,9 +534,13 @@ private void handleSampling(Context context, MapWork 
mWork, JobConf job)
 
     String alias = mWork.getAliases().get(0);
     Operator<?> topOp = mWork.getAliasToWork().get(alias);
-    PartitionDesc partDesc = mWork.getAliasToPartnInfo().get(alias);
+    PartitionDesc partDesc = mWork.getPartitionDesc(alias);
 
-    ArrayList<PartitionDesc> parts = mWork.getPartitionDescs();
+    List<PartitionDesc> parts = new ArrayList<>();
+    Iterator<PartitionDesc> partitionIterator = mWork.getPartitionDescs();
+    while (partitionIterator.hasNext()) {
+      parts.add(partitionIterator.next());
+    }
 
     List<Path> inputPaths = mWork.getPaths();
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
index f7a658ea924..4cf3522db38 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.mr;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -83,7 +84,9 @@ public void configure(JobConf job) {
 
       // create map and fetch operators
       MapWork mrwork = Utilities.getMapWork(job);
-      for (PartitionDesc part : mrwork.getAliasToPartnInfo().values()) {
+      Iterator<PartitionDesc> partitionIterator = mrwork.getPartitionDescs();
+      while (partitionIterator.hasNext()) {
+        PartitionDesc part = partitionIterator.next();
         TableDesc tableDesc = part.getTableDesc();
         Utilities.copyJobSecretToTableProperties(tableDesc);
       }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index 5222e3b868d..cf7b78beff0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -341,7 +341,7 @@ public List<Event> initialize() throws Exception {
             // if files exists in input path then it has to be 1 as this code 
path gets triggered only
             // of order by queries which is expected to write only one file 
(written by one reducer)
             Preconditions.checkState(paths.size() == 1 && fileStatuses.length 
== 1 &&
-                mapWork.getAliasToPartnInfo().size() == 1,
+                mapWork.getPartitionCount() == 1,
               "Requested to generate single split. Paths and fileStatuses are 
expected to be 1. " +
                 "Got paths: " + paths.size() + " fileStatuses: " + 
fileStatuses.length);
             splits = new InputSplit[1];
@@ -354,7 +354,7 @@ public List<Event> initialize() throws Exception {
             String[] hosts = hostsSet.toArray(new String[0]);
             FileSplit fileSplit = new FileSplit(fileStatus.getPath(), 0, 
fileStatus.getLen(), hosts);
             String alias = mapWork.getAliases().get(0);
-            PartitionDesc partDesc = mapWork.getAliasToPartnInfo().get(alias);
+            PartitionDesc partDesc = mapWork.getPartitionDesc(alias);
             String partIF = partDesc.getInputFileFormatClassName();
             splits[0] = new HiveInputFormat.HiveInputSplit(fileSplit, partIF);
           }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 5a31f22b200..996a9fc7e5f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -22,6 +22,7 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -141,7 +142,9 @@ void init(MRTaskReporter mrReporter,
     // TODO HIVE-14042. Cleanup may be required if exiting early.
     Utilities.setMapWork(jconf, mapWork);
 
-    for (PartitionDesc part : mapWork.getAliasToPartnInfo().values()) {
+    Iterator<PartitionDesc> partitionIterator = mapWork.getPartitionDescs();
+    while (partitionIterator.hasNext()) {
+      PartitionDesc part = partitionIterator.next();
       TableDesc tableDesc = part.getTableDesc();
       Utilities.copyJobSecretToTableProperties(tableDesc);
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
index d091f0390a7..201d7332346 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
@@ -190,7 +190,7 @@ public Multimap<Integer, InputSplit> 
generateGroupedSplits(JobConf jobConf, Conf
         if ((aliases != null) && (aliases.size() == 1)) {
           Operator<? extends OperatorDesc> op = 
mapWork.getAliasToWork().get(aliases.getFirst());
           if (op instanceof TableScanOperator tableScan) {
-            PartitionDesc partitionDesc = 
mapWork.getAliasToPartnInfo().get(aliases.getFirst());
+            PartitionDesc partitionDesc = 
mapWork.getPartitionDesc(aliases.getFirst());
             isMinorCompaction &= 
AcidUtils.isCompactionTable(partitionDesc.getTableDesc().getProperties());
             if (!tableScan.getConf().isTranscationalTable() && 
!isMinorCompaction) {
               String splitPath = getFirstSplitPath(splits);
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryRecordReader.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryRecordReader.java
index be0a672bdb7..12906a1021c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/TeradataBinaryRecordReader.java
@@ -21,8 +21,8 @@
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Iterator;
 import java.util.Map;
-
 import com.google.common.collect.ImmutableMap;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.io.EndianUtils;
@@ -31,7 +31,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -85,10 +84,12 @@ public TeradataBinaryRecordReader(JobConf job, FileSplit 
fileSplit) throws IOExc
     String rowLength = job.get(TD_ROW_LENGTH);
     if (rowLength == null) {
       LOG.debug("No table property in JobConf. Try to recover the table 
directly");
-      Map<String, PartitionDesc> partitionDescMap = 
Utilities.getMapRedWork(job).getMapWork().getAliasToPartnInfo();
-      for (String alias : 
Utilities.getMapRedWork(job).getMapWork().getAliasToPartnInfo().keySet()) {
-        LOG.debug(format("the current alias: %s", alias));
-        rowLength = 
partitionDescMap.get(alias).getTableDesc().getProperties().getProperty(TD_ROW_LENGTH);
+      Iterator<org.apache.hadoop.hive.ql.plan.PartitionDesc> partitionIterator 
=
+          Utilities.getMapRedWork(job).getMapWork().getPartitionDescs();
+      while (partitionIterator.hasNext()) {
+        org.apache.hadoop.hive.ql.plan.PartitionDesc partitionDesc = 
partitionIterator.next();
+        LOG.debug("Checking partition metadata for row length");
+        rowLength = 
partitionDesc.getTableDesc().getProperties().getProperty(TD_ROW_LENGTH);
         if (rowLength != null) {
           break;
         }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 5af9ff1af4c..bd1be003a51 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -436,7 +436,7 @@ static void splitPlan(ReduceSinkOperator cRS, 
GenMRProcContext opProcCtx)
   /**
    * set the current task in the mapredWork.
    *
-   * @param alias_id
+   * @param aliasId
    *          current alias
    * @param topOp
    *          the top operator of the stack
@@ -447,16 +447,16 @@ static void splitPlan(ReduceSinkOperator cRS, 
GenMRProcContext opProcCtx)
    * @param opProcCtx
    *          processing context
    */
-  public static void setTaskPlan(String alias_id,
+  public static void setTaskPlan(String aliasId,
       TableScanOperator topOp, Task<?> task, boolean local,
       GenMRProcContext opProcCtx) throws SemanticException {
-    setTaskPlan(alias_id, topOp, task, local, opProcCtx, null);
+    setTaskPlan(aliasId, topOp, task, local, opProcCtx, null);
   }
 
   /**
    * set the current task in the mapredWork.
    *
-   * @param alias_id
+   * @param aliasId
    *          current alias
    * @param topOp
    *          the top operator of the stack
@@ -469,18 +469,18 @@ public static void setTaskPlan(String alias_id,
    * @param pList
    *          pruned partition list. If it is null it will be computed 
on-the-fly.
    */
-  public static void setTaskPlan(String alias_id,
+  public static void setTaskPlan(String aliasId,
       TableScanOperator topOp, Task<?> task, boolean local,
       GenMRProcContext opProcCtx, PrunedPartitionList pList) throws 
SemanticException {
     setMapWork(((MapredWork) task.getWork()).getMapWork(), 
opProcCtx.getParseCtx(),
-        opProcCtx.getInputs(), pList, topOp, alias_id, opProcCtx.getConf(), 
local);
+        opProcCtx.getInputs(), pList, topOp, aliasId, opProcCtx.getConf(), 
local);
     opProcCtx.addSeenOp(task, topOp);
   }
 
   /**
    * initialize MapWork
    *
-   * @param alias_id
+   * @param aliasId
    *          current alias
    * @param plan
    *          map work to initialize
@@ -494,7 +494,7 @@ public static void setTaskPlan(String alias_id,
    *          current instance of hive conf
    */
   public static void setMapWork(MapWork plan, ParseContext parseCtx, 
Set<ReadEntity> inputs,
-      PrunedPartitionList partsList, TableScanOperator tsOp, String alias_id,
+      PrunedPartitionList partsList, TableScanOperator tsOp, String aliasId,
       HiveConf conf, boolean local) throws SemanticException {
     ArrayList<Path> partDir = new ArrayList<Path>();
     ArrayList<PartitionDesc> partDesc = new ArrayList<PartitionDesc>();
@@ -513,12 +513,12 @@ public static void setMapWork(MapWork plan, ParseContext 
parseCtx, Set<ReadEntit
         partsList = new PrunedPartitionList(tab, null, Sets.newHashSet(new 
DummyPartition(tab)),
             Collections.emptyList(), false);
       } else {
-        partsList = PartitionPruner.prune(tsOp, parseCtx, alias_id);
+        partsList = PartitionPruner.prune(tsOp, parseCtx, aliasId);
         isFullAcidTable = tsOp.getConf().isFullAcidTable();
       }
     }
 
-    // Generate the map work for this alias_id
+    // Generate the map work for this aliasId
     // pass both confirmed and unknown partitions through the map-reduce
     // framework
     Set<Partition> parts = partsList.getPartitions();
@@ -544,7 +544,7 @@ public static void setMapWork(MapWork plan, ParseContext 
parseCtx, Set<ReadEntit
       target.putAll(props);
     }
 
-    plan.getAliasToPartnInfo().put(alias_id, aliasPartnDesc);
+    plan.putPartitionDesc(aliasId, aliasPartnDesc);
 
     long sizeNeeded = Integer.MAX_VALUE;
     int fileLimit = -1;
@@ -585,7 +585,7 @@ public static void setMapWork(MapWork plan, ParseContext 
parseCtx, Set<ReadEntit
     // where V is a view of the form: select * from T
     // The dependencies should include V at depth 0, and T at depth 1 
(inferred).
     Map<String, ReadEntity> viewToInput = parseCtx.getViewAliasToInput();
-    ReadEntity parentViewInfo = PlanUtils.getParentViewInfo(alias_id, 
viewToInput);
+    ReadEntity parentViewInfo = PlanUtils.getParentViewInfo(aliasId, 
viewToInput);
 
     // The table should also be considered a part of inputs, even if the table 
is a
     // partitioned table and whether any partition is selected or not
@@ -686,7 +686,7 @@ public static void setMapWork(MapWork plan, ParseContext 
parseCtx, Set<ReadEntit
         if (p == null) {
           continue;
         }
-        LOG.debug("Adding {} of table {}", p, alias_id);
+        LOG.debug("Adding {} of table {}", p, aliasId);
 
         partDir.add(p);
         try {
@@ -720,15 +720,15 @@ public static void setMapWork(MapWork plan, ParseContext 
parseCtx, Set<ReadEntit
         PartitionDesc prtDesc = iterPartnDesc.next();
 
         // Add the path to alias mapping
-        plan.addPathToAlias(path,alias_id);
+        plan.addPathToAlias(path,aliasId);
         plan.addPathToPartitionInfo(path, prtDesc);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Information added for path " + path);
         }
       }
 
-      assert plan.getAliasToWork().get(alias_id) == null;
-      plan.getAliasToWork().put(alias_id, tsOp);
+      assert plan.getAliasToWork().get(aliasId) == null;
+      plan.getAliasToWork().put(aliasId, tsOp);
     } else {
       // populate local work if needed
       MapredLocalWork localPlan = plan.getMapRedLocalWork();
@@ -738,16 +738,16 @@ public static void setMapWork(MapWork plan, ParseContext 
parseCtx, Set<ReadEntit
             new LinkedHashMap<String, FetchWork>());
       }
 
-      assert localPlan.getAliasToWork().get(alias_id) == null;
-      assert localPlan.getAliasToFetchWork().get(alias_id) == null;
-      localPlan.getAliasToWork().put(alias_id, tsOp);
+      assert localPlan.getAliasToWork().get(aliasId) == null;
+      assert localPlan.getAliasToFetchWork().get(aliasId) == null;
+      localPlan.getAliasToWork().put(aliasId, tsOp);
       if (tblDir == null) {
         tblDesc = Utilities.getTableDesc(partsList.getSourceTable());
         localPlan.getAliasToFetchWork().put(
-            alias_id,
+            aliasId,
             new FetchWork(partDir, partDesc, tblDesc));
       } else {
-        localPlan.getAliasToFetchWork().put(alias_id,
+        localPlan.getAliasToFetchWork().put(aliasId,
             new FetchWork(tblDir, tblDesc));
       }
       plan.setMapRedLocalWork(localPlan);
@@ -1151,12 +1151,9 @@ public static void replaceMapWork(String sourceAlias, 
String targetAlias,
     Map<Path, List<String>> sourcePathToAliases = source.getPathToAliases();
     Map<Path, PartitionDesc> sourcePathToPartitionInfo = 
source.getPathToPartitionInfo();
     Map<String, Operator<? extends OperatorDesc>> sourceAliasToWork = 
source.getAliasToWork();
-    Map<String, PartitionDesc> sourceAliasToPartnInfo = 
source.getAliasToPartnInfo();
-
     Map<Path, List<String>> targetPathToAliases = target.getPathToAliases();
     Map<Path, PartitionDesc> targetPathToPartitionInfo = 
target.getPathToPartitionInfo();
     Map<String, Operator<? extends OperatorDesc>> targetAliasToWork = 
target.getAliasToWork();
-    Map<String, PartitionDesc> targetAliasToPartnInfo = 
target.getAliasToPartnInfo();
 
     if (!sourceAliasToWork.containsKey(sourceAlias) ||
         !targetAliasToWork.containsKey(targetAlias)) {
@@ -1174,7 +1171,7 @@ public static void replaceMapWork(String sourceAlias, 
String targetAlias,
 
     // Remove unnecessary information from target
     targetAliasToWork.remove(targetAlias);
-    targetAliasToPartnInfo.remove(targetAlias);
+    target.removeAlias(targetAlias);
     List<Path> pathsToRemove = new ArrayList<>();
     for (Entry<Path, List<String>> entry: targetPathToAliases.entrySet()) {
       List<String> aliases = entry.getValue();
@@ -1190,7 +1187,7 @@ public static void replaceMapWork(String sourceAlias, 
String targetAlias,
 
     // Add new information from source to target
     targetAliasToWork.put(sourceAlias, sourceAliasToWork.get(sourceAlias));
-    targetAliasToPartnInfo.putAll(sourceAliasToPartnInfo);
+    target.putPartitionDesc(sourceAlias, source.getPartitionDesc(sourceAlias));
     targetPathToPartitionInfo.putAll(sourcePathToPartitionInfo);
     List<Path> pathsToAdd = new ArrayList<>();
     for (Entry<Path, List<String>> entry: sourcePathToAliases.entrySet()) {
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
index b130a802ff4..ea5a40cb684 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
@@ -188,7 +188,7 @@ private static void genMapJoinLocalWork(MapredWork newWork, 
MapJoinOperator mapJ
       }
       // create fetchwork for partitioned table
       if (fetchWork == null) {
-        TableDesc table = 
newWork.getMapWork().getAliasToPartnInfo().get(alias).getTableDesc();
+        TableDesc table = 
newWork.getMapWork().getPartitionDesc(alias).getTableDesc();
         fetchWork = new FetchWork(partDir, partDesc, table);
       }
       // set alias to fetch work
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
index f17d111baef..ca9c8c707a3 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
@@ -271,7 +271,7 @@ public static void processSkewJoin(JoinOperator joinOp,
       PartitionDesc part = new PartitionDesc(tableDescList.get(src), null);
 
       newPlan.addPathToPartitionInfo(bigKeyDirPath, part);
-      newPlan.getAliasToPartnInfo().put(alias, part);
+      newPlan.putPartitionDesc(alias, part);
 
       Operator<? extends OperatorDesc> reducer = 
clonePlan.getReduceWork().getReducer();
       assert reducer instanceof JoinOperator;
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
index d67c5d72038..e92f88f8e7e 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
@@ -73,13 +73,12 @@ public SortMergeJoinTaskDispatcher(PhysicalContext context) 
{
   // plan are fixed. The operator tree will still contain the SMBJoinOperator
   private void genSMBJoinWork(MapWork currWork, SMBMapJoinOperator smbJoinOp) {
     // Remove the paths which are not part of aliasToPartitionInfo
-    Map<String, PartitionDesc> aliasToPartitionInfo = 
currWork.getAliasToPartnInfo();
     List<Path> removePaths = new ArrayList<>();
 
     for (Map.Entry<Path, List<String>> entry : 
currWork.getPathToAliases().entrySet()) {
       boolean keepPath = false;
       for (String alias : entry.getValue()) {
-        if (aliasToPartitionInfo.containsKey(alias)) {
+        if (currWork.hasPartitionDesc(alias)) {
           keepPath = true;
           break;
         }
@@ -99,7 +98,7 @@ private void genSMBJoinWork(MapWork currWork, 
SMBMapJoinOperator smbJoinOp) {
     }
 
     for (String alias : removeAliases) {
-      currWork.getAliasToPartnInfo().remove(alias);
+      currWork.removeAlias(alias);
       currWork.getAliasToWork().remove(alias);
     }
 
@@ -115,7 +114,7 @@ private void genSMBJoinWork(MapWork currWork, 
SMBMapJoinOperator smbJoinOp) {
       // Add the entry in mapredwork
       currWork.getAliasToWork().put(alias, op);
 
-      PartitionDesc partitionInfo = currWork.getAliasToPartnInfo().get(alias);
+      PartitionDesc partitionInfo = currWork.getPartitionDesc(alias);
       if (fetchWork.getTblDir() != null) {
         currWork.mergeAliasedInput(alias, fetchWork.getTblDir(), 
partitionInfo);
       } else {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index fa0f1e7b7f5..a18508a8cce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -47,6 +47,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Interner;
+import com.google.common.collect.Iterators;
 
 /**
  * MapWork represents all the information used to run a map task on the 
cluster.
@@ -92,7 +93,7 @@ public enum LlapIODescriptor {
   private Map<String, Operator<? extends OperatorDesc>> aliasToWork =
       new LinkedHashMap<String, Operator<? extends OperatorDesc>>();
 
-  private Map<String, PartitionDesc> aliasToPartnInfo = new 
LinkedHashMap<String, PartitionDesc>();
+  private Map<String, PartitionDesc> aliasToPartnInfo = new LinkedHashMap<>();
 
   private Map<String, SplitSample> nameToSplitSample = new 
LinkedHashMap<String, SplitSample>();
 
@@ -364,20 +365,28 @@ public void internTable(Interner<TableDesc> interner) {
     }
   }
 
-  /**
-   * @return the aliasToPartnInfo
-   */
-  public Map<String, PartitionDesc> getAliasToPartnInfo() {
-    return aliasToPartnInfo;
+  public Iterator<PartitionDesc> getPartitionDescs() {
+    return 
Iterators.unmodifiableIterator(aliasToPartnInfo.values().iterator());
   }
 
-  /**
-   * @param aliasToPartnInfo
-   *          the aliasToPartnInfo to set
-   */
-  public void setAliasToPartnInfo(
-      LinkedHashMap<String, PartitionDesc> aliasToPartnInfo) {
-    this.aliasToPartnInfo = aliasToPartnInfo;
+  public PartitionDesc getPartitionDesc(String alias) {
+    return aliasToPartnInfo.get(alias);
+  }
+
+  public int getPartitionCount() {
+    return aliasToPartnInfo.size();
+  }
+
+  public boolean hasPartitionDesc(String alias) {
+    return aliasToPartnInfo.containsKey(alias);
+  }
+
+  public void putPartitionDesc(String alias, PartitionDesc partitionDesc) {
+    aliasToPartnInfo.put(alias, partitionDesc);
+  }
+
+  public void removeAlias(String alias) {
+    aliasToPartnInfo.remove(alias);
   }
 
   public Map<String, Operator<? extends OperatorDesc>> getAliasToWork() {
@@ -606,10 +615,6 @@ public ArrayList<Path> getPaths() {
     return new ArrayList<Path>(pathToAliases.keySet());
   }
 
-  public ArrayList<PartitionDesc> getPartitionDescs() {
-    return new ArrayList<PartitionDesc>(aliasToPartnInfo.values());
-  }
-
   public Path getTmpHDFSPath() {
     return tmpHDFSPath;
   }
@@ -655,22 +660,24 @@ public String getSamplingTypeString() {
         samplingType == 2 ? "SAMPLING_ON_START" : null;
   }
 
+  public Collection<TableDesc> getDistinctTableDescs() {
+    Map<String, TableDesc> tables = new LinkedHashMap<>();
+    for (PartitionDesc partition : aliasToPartnInfo.values()) {
+      TableDesc tableDesc = partition.getTableDesc();
+      if (tableDesc != null) {
+        tables.putIfAbsent(tableDesc.getTableName(), tableDesc);
+      }
+    }
+    return Collections.unmodifiableCollection(tables.values());
+  }
+
   @Override
   public void configureJobConf(JobConf job) {
     super.configureJobConf(job);
     // Configure each table only once, even if we read thousands of its 
partitions.
     // This avoids repeating expensive work (like loading storage drivers) for 
every single partition.
-    Set<String> processedTables = new HashSet<>();
-
-    for (PartitionDesc partition : aliasToPartnInfo.values()) {
-      TableDesc tableDesc = partition.getTableDesc();
-
-      // If we haven't seen this table before, configure it and remember it.
-      // If we have seen it, skip it.
-      if (tableDesc != null && 
!processedTables.contains(tableDesc.getTableName())) {
-        processedTables.add(tableDesc.getTableName());
-        PlanUtils.configureJobConf(tableDesc, job);
-      }
+    for (TableDesc tableDesc : getDistinctTableDescs()) {
+      PlanUtils.configureJobConf(tableDesc, job);
     }
     Collection<Operator<?>> mappers = aliasToWork.values();
     for (IConfigureJobConf icjc : OperatorUtils.findOperators(mappers, 
IConfigureJobConf.class)) {

Reply via email to