Repository: hive Updated Branches: refs/heads/spark 77ecb5706 -> d64ca1fe6
HIVE-10302: Cache small tables in memory [Spark Branch] (Jimmy, reviewed by Xuefu) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d64ca1fe Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d64ca1fe Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d64ca1fe Branch: refs/heads/spark Commit: d64ca1fe68fb9a7d44f888b340dac8bbb4a50fcc Parents: 77ecb57 Author: Jimmy Xiang <[email protected]> Authored: Mon Apr 13 11:09:23 2015 -0700 Committer: Jimmy Xiang <[email protected]> Committed: Fri Apr 24 11:15:38 2015 -0700 ---------------------------------------------------------------------- .../hadoop/hive/ql/exec/MapJoinOperator.java | 6 +- .../hive/ql/exec/spark/HashTableLoader.java | 23 +++++- .../ql/exec/spark/HivePairFlatMapFunction.java | 1 + .../hive/ql/exec/spark/SmallTableCache.java | 73 ++++++++++++++++++++ .../hive/ql/exec/spark/SparkUtilities.java | 6 ++ 5 files changed, 105 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/d64ca1fe/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index 9c3ec8e..111fc7d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.HashTableLoaderFactory; import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; @@ -49,6 +50,7 @@ import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer; import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer; import org.apache.hadoop.hive.ql.exec.persistence.KeyValueContainer; import org.apache.hadoop.hive.ql.exec.persistence.ObjectContainer; +import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -457,7 +459,9 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem } if ((this.getExecContext() != null) && (this.getExecContext().getLocalWork() != null) && (this.getExecContext().getLocalWork().getInputFileChangeSensitive()) - && mapJoinTables != null) { + && mapJoinTables != null + && !(HiveConf.getVar(hconf, ConfVars.HIVE_EXECUTION_ENGINE).equals("spark") + && SparkUtilities.isDedicatedCluster(hconf))) { for (MapJoinTableContainer tableContainer : mapJoinTables) { if (tableContainer != null) { tableContainer.clear(); http://git-wip-us.apache.org/repos/asf/hive/blob/d64ca1fe/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java index fe108c4..6be4ab3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java @@ -113,15 +113,32 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable } String fileName = localWork.getBucketFileName(bigInputPath); Path path = Utilities.generatePath(baseDir, desc.getDumpFilePrefix(), (byte) pos, fileName); - LOG.info("\tLoad back all hashtable files from tmp folder uri:" + path); - mapJoinTables[pos] = mapJoinTableSerdes[pos].load(fs, path); + mapJoinTables[pos] = load(fs, path, mapJoinTableSerdes[pos]); } } catch (Exception e) { throw new HiveException(e); } } - @SuppressWarnings("unchecked") + private MapJoinTableContainer load(FileSystem fs, Path path, + MapJoinTableContainerSerDe mapJoinTableSerde) throws HiveException { + LOG.info("\tLoad back all hashtable files from tmp folder uri:" + path); + if (!SparkUtilities.isDedicatedCluster(hconf)) { + return mapJoinTableSerde.load(fs, path); + } + MapJoinTableContainer mapJoinTable = SmallTableCache.get(path); + if (mapJoinTable == null) { + synchronized (path.toString().intern()) { + mapJoinTable = SmallTableCache.get(path); + if (mapJoinTable == null) { + mapJoinTable = mapJoinTableSerde.load(fs, path); + SmallTableCache.cache(path, mapJoinTable); + } + } + } + return mapJoinTable; + } + private void loadDirectly(MapJoinTableContainer[] mapJoinTables, String inputFileName) throws Exception { MapredLocalWork localWork = context.getLocalWork(); http://git-wip-us.apache.org/repos/asf/hive/blob/d64ca1fe/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java index 2f137f9..7df626b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java @@ -48,6 +48,7 @@ public abstract class HivePairFlatMapFunction<T, K, V> implements PairFlatMapFun protected void initJobConf() { if (jobConf == null) { jobConf = KryoSerializer.deserializeJobConf(this.buffer); + SmallTableCache.initialize(jobConf); setupMRLegacyConfigs(); } } http://git-wip-us.apache.org/repos/asf/hive/blob/d64ca1fe/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SmallTableCache.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SmallTableCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SmallTableCache.java new file mode 100644 index 0000000..1992d16 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SmallTableCache.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark; + +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; + +public class SmallTableCache { + private static final Log LOG = LogFactory.getLog(SmallTableCache.class.getName()); + + private static final ConcurrentHashMap<Path, MapJoinTableContainer> + tableContainerMap = new ConcurrentHashMap<Path, MapJoinTableContainer>(); + private static volatile String queryId; + + /** + * Check if this is a new query. If so, clean up the cache + * that is for the previous query, and reset the current query id. + */ + public static void initialize(Configuration conf) { + String currentQueryId = conf.get(HiveConf.ConfVars.HIVEQUERYID.varname); + if (!currentQueryId.equals(queryId)) { + if (!tableContainerMap.isEmpty()) { + synchronized (tableContainerMap) { + if (!currentQueryId.equals(queryId) && !tableContainerMap.isEmpty()) { + for (MapJoinTableContainer tableContainer: tableContainerMap.values()) { + tableContainer.clear(); + } + tableContainerMap.clear(); + if (LOG.isDebugEnabled()) { + LOG.debug("Cleaned up small table cache for query " + queryId); + } + } + } + } + queryId = currentQueryId; + } + } + + public static void cache(Path path, MapJoinTableContainer tableContainer) { + if (tableContainerMap.putIfAbsent(path, tableContainer) == null && LOG.isDebugEnabled()) { + LOG.debug("Cached small table file " + path + " for query " + queryId); + } + } + + public static MapJoinTableContainer get(Path path) { + MapJoinTableContainer tableContainer = tableContainerMap.get(path); + if (tableContainer != null && LOG.isDebugEnabled()) { + LOG.debug("Loaded small table file " + path + " from cache for query " + queryId); + } + return tableContainer; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/d64ca1fe/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java index 72ab913..8499933 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java @@ -23,6 +23,7 @@ import java.net.URI; import java.net.URISyntaxException; import org.apache.commons.io.FilenameUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -99,6 +100,11 @@ public class SparkUtilities { return name; } + public static boolean isDedicatedCluster(Configuration conf) { + String master = conf.get("spark.master"); + return master.startsWith("yarn-") || master.startsWith("local"); + } + public static SparkSession getSparkSession(HiveConf conf, SparkSessionManager sparkSessionManager) throws HiveException { SparkSession sparkSession = SessionState.get().getSparkSession();
