Repository: hive Updated Branches: refs/heads/master 9a02aa86b -> 5ddd5851f
HIVE-18665: LLAP: Ignore cache-affinity if the LLAP IO elevator is disabled (Gopal V, reviewed by Sergey Shelukhin) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5ddd5851 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5ddd5851 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5ddd5851 Branch: refs/heads/master Commit: 5ddd5851f179f265a7bf912656e1cc4c87a1a7a0 Parents: 9a02aa8 Author: Gopal V <gop...@apache.org> Authored: Tue Feb 13 10:23:01 2018 -0800 Committer: Gopal V <gop...@apache.org> Committed: Tue Feb 13 10:23:08 2018 -0800 ---------------------------------------------------------------------- .../hive/ql/exec/tez/HiveSplitGenerator.java | 10 ++-- .../apache/hadoop/hive/ql/exec/tez/Utils.java | 12 ++++- .../org/apache/hadoop/hive/ql/plan/MapWork.java | 49 ++++++++++++++------ 3 files changed, 52 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/5ddd5851/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java index f3aa151..57f6c66 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java @@ -97,7 +97,8 @@ public class HiveSplitGenerator extends InputInitializer { // Assuming grouping enabled always. userPayloadProto = MRInputUserPayloadProto.newBuilder().setGroupingEnabled(true).build(); - this.splitLocationProvider = Utils.getSplitLocationProvider(conf, LOG); + this.splitLocationProvider = + Utils.getSplitLocationProvider(conf, work.getCacheAffinity(), LOG); LOG.info("SplitLocationProvider: " + splitLocationProvider); // Read all credentials into the credentials instance stored in JobConf. @@ -123,14 +124,15 @@ public class HiveSplitGenerator extends InputInitializer { this.jobConf = new JobConf(conf); - this.splitLocationProvider = Utils.getSplitLocationProvider(conf, LOG); - LOG.info("SplitLocationProvider: " + splitLocationProvider); - // Read all credentials into the credentials instance stored in JobConf. ShimLoader.getHadoopShims().getMergedCredentials(jobConf); this.work = Utilities.getMapWork(jobConf); + this.splitLocationProvider = + Utils.getSplitLocationProvider(conf, work.getCacheAffinity(), LOG); + LOG.info("SplitLocationProvider: " + splitLocationProvider); + // Events can start coming in the moment the InputInitializer is created. The pruner // must be setup and initialized here so that it sets up it's structures to start accepting events. // Setting it up in initialize leads to a window where events may come in before the pruner is http://git-wip-us.apache.org/repos/asf/hive/blob/5ddd5851/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java index b33f027..bc438bb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java @@ -32,11 +32,19 @@ import org.apache.hadoop.mapred.split.SplitLocationProvider; import org.slf4j.Logger; public class Utils { - public static SplitLocationProvider getSplitLocationProvider(Configuration conf, Logger LOG) throws + + public static SplitLocationProvider getSplitLocationProvider(Configuration conf, Logger LOG) + throws IOException { + // fall back to checking confs + return getSplitLocationProvider(conf, true, LOG); + } + + public static SplitLocationProvider getSplitLocationProvider(Configuration conf, boolean useCacheAffinity, Logger LOG) throws IOException { boolean useCustomLocations = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE).equals("llap") - && HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS); + && HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS) + && useCacheAffinity; SplitLocationProvider splitLocationProvider; LOG.info("SplitGenerator using llap affinitized locations: " + useCustomLocations); if (useCustomLocations) { http://git-wip-us.apache.org/repos/asf/hive/blob/5ddd5851/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index fa7a8a3..9298630 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -73,6 +73,25 @@ import com.google.common.collect.Interner; @SuppressWarnings({"serial"}) public class MapWork extends BaseWork { + public enum LlapIODescriptor { + DISABLED(null, false), + NO_INPUTS("no inputs", false), + UNKNOWN("unknown", false), + SOME_INPUTS("some inputs", false), + ACID("may be used (ACID table)", true), + ALL_INPUTS("all inputs", true), + CACHE_ONLY("all inputs (cache only)", true); + + final String desc; + final boolean cached; + + LlapIODescriptor(String desc, boolean cached) { + this.desc = desc; + this.cached = cached; + } + + } + // use LinkedHashMap to make sure the iteration order is // deterministic, to ease testing private LinkedHashMap<Path, ArrayList<String>> pathToAliases = new LinkedHashMap<>(); @@ -153,7 +172,7 @@ public class MapWork extends BaseWork { private byte[] includedBuckets; /** Whether LLAP IO will be used for inputs. */ - private String llapIoDesc; + private LlapIODescriptor llapIoDesc; private boolean isMergeFromResolver; @@ -295,32 +314,32 @@ public class MapWork extends BaseWork { isLlapOn, canWrapAny, hasPathToPartInfo, hasLlap, hasNonLlap, hasAcid, hasCacheOnly); } - private static String deriveLlapIoDescString(boolean isLlapOn, boolean canWrapAny, + private static LlapIODescriptor deriveLlapIoDescString(boolean isLlapOn, boolean canWrapAny, boolean hasPathToPartInfo, boolean hasLlap, boolean hasNonLlap, boolean hasAcid, boolean hasCacheOnly) { if (!isLlapOn) { - return null; // LLAP IO is off, don't output. + return LlapIODescriptor.DISABLED; // LLAP IO is off, don't output. } if (!canWrapAny && !hasCacheOnly) { - return "no inputs"; // Cannot use with input formats. + return LlapIODescriptor.NO_INPUTS; //"no inputs"; // Cannot use with input formats. } if (!hasPathToPartInfo) { - return "unknown"; // No information to judge. + return LlapIODescriptor.UNKNOWN; //"unknown"; // No information to judge. } int varieties = (hasAcid ? 1 : 0) + (hasLlap ? 1 : 0) + (hasCacheOnly ? 1 : 0) + (hasNonLlap ? 1 : 0); if (varieties > 1) { - return "some inputs"; // Will probably never actually happen. + return LlapIODescriptor.SOME_INPUTS; //"some inputs"; // Will probably never actually happen. } if (hasAcid) { - return "may be used (ACID table)"; + return LlapIODescriptor.ACID; //"may be used (ACID table)"; } if (hasLlap) { - return "all inputs"; + return LlapIODescriptor.ALL_INPUTS; } if (hasCacheOnly) { - return "all inputs (cache only)"; + return LlapIODescriptor.CACHE_ONLY; } - return "no inputs"; + return LlapIODescriptor.NO_INPUTS; } public void internTable(Interner<TableDesc> interner) { @@ -370,11 +389,15 @@ public class MapWork extends BaseWork { } @Explain(displayName = "LLAP IO", vectorization = Vectorization.SUMMARY_PATH) - public String getLlapIoDesc() { - return llapIoDesc; + public String getLlapIoDescString() { + return llapIoDesc.desc; + } + + public boolean getCacheAffinity() { + return llapIoDesc.cached; } - public void setNameToSplitSample(HashMap<String, SplitSample> nameToSplitSample) { + public void setNameToSplitSample(HashMap<String, SplitSample> nameToSplitSample) { this.nameToSplitSample = nameToSplitSample; }