HIVE-10302 Load small tables (for map join) in executor memory only once [Spark Branch] (added new file missed in rebasing)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9bc8ea2a Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9bc8ea2a Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9bc8ea2a Branch: refs/heads/spark Commit: 9bc8ea2ae25e2ef766604068ac143eeeaa376929 Parents: 3817bb5 Author: Jimmy Xiang <jxi...@cloudera.com> Authored: Mon Jun 1 15:59:32 2015 -0700 Committer: Jimmy Xiang <jxi...@cloudera.com> Committed: Mon Jun 1 15:59:32 2015 -0700 ---------------------------------------------------------------------- .../hive/ql/exec/spark/SmallTableCache.java | 73 ++++++++++++++++++++ 1 file changed, 73 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/9bc8ea2a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SmallTableCache.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SmallTableCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SmallTableCache.java new file mode 100644 index 0000000..1992d16 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SmallTableCache.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark; + +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; + +public class SmallTableCache { + private static final Log LOG = LogFactory.getLog(SmallTableCache.class.getName()); + + private static final ConcurrentHashMap<Path, MapJoinTableContainer> + tableContainerMap = new ConcurrentHashMap<Path, MapJoinTableContainer>(); + private static volatile String queryId; + + /** + * Check if this is a new query. If so, clean up the cache + * that is for the previous query, and reset the current query id. + */ + public static void initialize(Configuration conf) { + String currentQueryId = conf.get(HiveConf.ConfVars.HIVEQUERYID.varname); + if (!currentQueryId.equals(queryId)) { + if (!tableContainerMap.isEmpty()) { + synchronized (tableContainerMap) { + if (!currentQueryId.equals(queryId) && !tableContainerMap.isEmpty()) { + for (MapJoinTableContainer tableContainer: tableContainerMap.values()) { + tableContainer.clear(); + } + tableContainerMap.clear(); + if (LOG.isDebugEnabled()) { + LOG.debug("Cleaned up small table cache for query " + queryId); + } + } + } + } + queryId = currentQueryId; + } + } + + public static void cache(Path path, MapJoinTableContainer tableContainer) { + if (tableContainerMap.putIfAbsent(path, tableContainer) == null && LOG.isDebugEnabled()) { + LOG.debug("Cached small table file " + path + " for query " + queryId); + } + } + + public static MapJoinTableContainer get(Path path) { + MapJoinTableContainer tableContainer = tableContainerMap.get(path); + if (tableContainer != null && LOG.isDebugEnabled()) { + LOG.debug("Loaded small table file " + path + " from cache for query " + queryId); + } + return tableContainer; + } +}