HIVE-18513: Query results caching (Jason Dere, reviewed by Jesus Camacho Rodriguez)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1733a371 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1733a371 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1733a371 Branch: refs/heads/master Commit: 1733a3712f0c548f9181e6e5b5298f466cad755e Parents: 075077d Author: Jason Dere <jd...@hortonworks.com> Authored: Wed Feb 7 13:39:41 2018 -0800 Committer: Jason Dere <jd...@hortonworks.com> Committed: Wed Feb 7 13:39:41 2018 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 21 + .../apache/hadoop/hive/ql/log/PerfLogger.java | 1 + data/conf/hive-site.xml | 5 + data/conf/llap/hive-site.xml | 5 + data/conf/perf-reg/spark/hive-site.xml | 5 + data/conf/perf-reg/tez/hive-site.xml | 5 + data/conf/rlist/hive-site.xml | 5 + data/conf/spark/local/hive-site.xml | 5 + data/conf/spark/standalone/hive-site.xml | 5 + data/conf/spark/yarn-client/hive-site.xml | 5 + data/conf/tez/hive-site.xml | 5 + .../src/test/resources/hive-site.xml | 5 + .../test/resources/testconfiguration.properties | 1 + .../org/apache/hadoop/hive/ql/QTestUtil.java | 4 + .../java/org/apache/hadoop/hive/ql/Driver.java | 64 ++ .../hive/ql/cache/results/CacheUsage.java | 73 ++ .../ql/cache/results/QueryResultsCache.java | 666 +++++++++++++++++++ .../apache/hadoop/hive/ql/exec/Utilities.java | 33 + .../ql/optimizer/calcite/HiveCalciteUtil.java | 38 ++ .../HiveRelOpMaterializationValidator.java | 285 ++++++++ .../hive/ql/parse/BaseSemanticAnalyzer.java | 13 + .../hadoop/hive/ql/parse/CalcitePlanner.java | 11 + .../hadoop/hive/ql/parse/QBParseInfo.java | 3 + .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 273 ++++++++ .../apache/hadoop/hive/ql/plan/FetchWork.java | 13 + .../hadoop/hive/ql/session/SessionState.java | 22 +- .../queries/clientpositive/results_cache_1.q | 96 +++ .../queries/clientpositive/results_cache_2.q | 41 ++ 
.../clientpositive/results_cache_capacity.q | 52 ++ .../clientpositive/results_cache_lifetime.q | 14 + .../clientpositive/results_cache_temptable.q | 42 ++ .../clientpositive/results_cache_with_masking.q | 17 + .../clientpositive/llap/results_cache_1.q.out | 584 ++++++++++++++++ .../clientpositive/results_cache_1.q.out | 579 ++++++++++++++++ .../clientpositive/results_cache_2.q.out | 176 +++++ .../clientpositive/results_cache_capacity.q.out | 238 +++++++ .../clientpositive/results_cache_lifetime.q.out | 112 ++++ .../results_cache_temptable.q.out | 293 ++++++++ .../results_cache_with_masking.q.out | 106 +++ .../apache/hive/service/server/HiveServer2.java | 10 + 40 files changed, 3910 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 26e08e4..67e22f6 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3707,6 +3707,27 @@ public class HiveConf extends Configuration { HIVE_EXEC_INPUT_LISTING_MAX_THREADS("hive.exec.input.listing.max.threads", 0, new SizeValidator(0L, true, 1024L, true), "Maximum number of threads that Hive uses to list file information from file systems (recommended > 1 for blobstore)."), + HIVE_QUERY_RESULTS_CACHE_ENABLED("hive.query.results.cache.enabled", true, + "If the query results cache is enabled. This will keep results of previously executed queries " + + "to be reused if the same query is executed again."), + + HIVE_QUERY_RESULTS_CACHE_DIRECTORY("hive.query.results.cache.directory", + "/tmp/hive/_resultscache_", + "Location of the query results cache directory. 
Temporary results from queries " + + "will be moved to this location."), + + HIVE_QUERY_RESULTS_CACHE_MAX_ENTRY_LIFETIME("hive.query.results.cache.max.entry.lifetime", "3600s", + new TimeValidator(TimeUnit.SECONDS), + "Maximum lifetime in seconds for an entry in the query results cache. A nonpositive value means infinite."), + + HIVE_QUERY_RESULTS_CACHE_MAX_SIZE("hive.query.results.cache.max.size", + (long) 2 * 1024 * 1024 * 1024, + "Maximum total size in bytes that the query results cache directory is allowed to use on the filesystem."), + + HIVE_QUERY_RESULTS_CACHE_MAX_ENTRY_SIZE("hive.query.results.cache.max.entry.size", + (long) 10 * 1024 * 1024, + "Maximum size in bytes that a single query result is allowed to use in the results cache directory"), + /* BLOBSTORE section */ HIVE_BLOBSTORE_SUPPORTED_SCHEMES("hive.blobstore.supported.schemes", "s3,s3a,s3n", http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java index 2767bca..764a832 100644 --- a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java +++ b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java @@ -71,6 +71,7 @@ public class PerfLogger { public static final String TEZ_INIT_OPERATORS = "TezInitializeOperators"; public static final String LOAD_HASHTABLE = "LoadHashtable"; public static final String TEZ_GET_SESSION = "TezGetSession"; + public static final String SAVE_TO_RESULTS_CACHE = "saveToResultsCache"; public static final String SPARK_SUBMIT_TO_RUNNING = "SparkSubmitToRunning"; public static final String SPARK_BUILD_PLAN = "SparkBuildPlan"; http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/data/conf/hive-site.xml ---------------------------------------------------------------------- diff --git 
a/data/conf/hive-site.xml b/data/conf/hive-site.xml index 01f83d1..b56cbd2 100644 --- a/data/conf/hive-site.xml +++ b/data/conf/hive-site.xml @@ -328,4 +328,9 @@ <value>99</value> </property> +<property> + <name>hive.query.results.cache.enabled</name> + <value>false</value> +</property> + </configuration> http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/data/conf/llap/hive-site.xml ---------------------------------------------------------------------- diff --git a/data/conf/llap/hive-site.xml b/data/conf/llap/hive-site.xml index cdda875..c4c299c 100644 --- a/data/conf/llap/hive-site.xml +++ b/data/conf/llap/hive-site.xml @@ -348,4 +348,9 @@ <value>99</value> </property> +<property> + <name>hive.query.results.cache.enabled</name> + <value>false</value> +</property> + </configuration> http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/data/conf/perf-reg/spark/hive-site.xml ---------------------------------------------------------------------- diff --git a/data/conf/perf-reg/spark/hive-site.xml b/data/conf/perf-reg/spark/hive-site.xml index 497a61f..5ca660d 100644 --- a/data/conf/perf-reg/spark/hive-site.xml +++ b/data/conf/perf-reg/spark/hive-site.xml @@ -265,4 +265,9 @@ <value>org.apache.hadoop.hive.metastore.ObjectStore</value> </property> +<property> + <name>hive.query.results.cache.enabled</name> + <value>false</value> +</property> + </configuration> http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/data/conf/perf-reg/tez/hive-site.xml ---------------------------------------------------------------------- diff --git a/data/conf/perf-reg/tez/hive-site.xml b/data/conf/perf-reg/tez/hive-site.xml index 012369f..62ecb74 100644 --- a/data/conf/perf-reg/tez/hive-site.xml +++ b/data/conf/perf-reg/tez/hive-site.xml @@ -282,4 +282,9 @@ <value>true</value> </property> +<property> + <name>hive.query.results.cache.enabled</name> + <value>false</value> +</property> + </configuration> 
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/data/conf/rlist/hive-site.xml ---------------------------------------------------------------------- diff --git a/data/conf/rlist/hive-site.xml b/data/conf/rlist/hive-site.xml index 9de00e5..630e481 100644 --- a/data/conf/rlist/hive-site.xml +++ b/data/conf/rlist/hive-site.xml @@ -319,4 +319,9 @@ <value>99</value> </property> +<property> + <name>hive.query.results.cache.enabled</name> + <value>false</value> +</property> + </configuration> http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/data/conf/spark/local/hive-site.xml ---------------------------------------------------------------------- diff --git a/data/conf/spark/local/hive-site.xml b/data/conf/spark/local/hive-site.xml index fd0e6a0..8ff6256 100644 --- a/data/conf/spark/local/hive-site.xml +++ b/data/conf/spark/local/hive-site.xml @@ -261,4 +261,9 @@ <value>false</value> </property> +<property> + <name>hive.query.results.cache.enabled</name> + <value>false</value> +</property> + </configuration> http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/data/conf/spark/standalone/hive-site.xml ---------------------------------------------------------------------- diff --git a/data/conf/spark/standalone/hive-site.xml b/data/conf/spark/standalone/hive-site.xml index 7095979..84851c7 100644 --- a/data/conf/spark/standalone/hive-site.xml +++ b/data/conf/spark/standalone/hive-site.xml @@ -266,4 +266,9 @@ <value>false</value> </property> +<property> + <name>hive.query.results.cache.enabled</name> + <value>false</value> +</property> + </configuration> http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/data/conf/spark/yarn-client/hive-site.xml ---------------------------------------------------------------------- diff --git a/data/conf/spark/yarn-client/hive-site.xml b/data/conf/spark/yarn-client/hive-site.xml index a9a788b..6c63362 100644 --- a/data/conf/spark/yarn-client/hive-site.xml +++ b/data/conf/spark/yarn-client/hive-site.xml @@ -306,4 
+306,9 @@ <value>false</value> </property> +<property> + <name>hive.query.results.cache.enabled</name> + <value>false</value> +</property> + </configuration> http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/data/conf/tez/hive-site.xml ---------------------------------------------------------------------- diff --git a/data/conf/tez/hive-site.xml b/data/conf/tez/hive-site.xml index 4519678..236adc7 100644 --- a/data/conf/tez/hive-site.xml +++ b/data/conf/tez/hive-site.xml @@ -293,4 +293,9 @@ <value>99</value> </property> +<property> + <name>hive.query.results.cache.enabled</name> + <value>false</value> +</property> + </configuration> http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/itests/hive-blobstore/src/test/resources/hive-site.xml ---------------------------------------------------------------------- diff --git a/itests/hive-blobstore/src/test/resources/hive-site.xml b/itests/hive-blobstore/src/test/resources/hive-site.xml index 038db0d..775c559 100644 --- a/itests/hive-blobstore/src/test/resources/hive-site.xml +++ b/itests/hive-blobstore/src/test/resources/hive-site.xml @@ -284,6 +284,11 @@ <value>hdfs,pfile,file,s3,s3a,pblob</value> </property> + <property> + <name>hive.query.results.cache.enabled</name> + <value>false</value> + </property> + <!-- To run these tests: # Create a file blobstore-conf.xml - DO NOT ADD TO REVISION CONTROL http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 9a76b85..2a22db9 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -254,6 +254,7 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\ ptf.q,\ ptf_matchpath.q,\ ptf_streaming.q,\ + results_cache_1.q,\ sample1.q,\ 
selectDistinctStar.q,\ select_dummy_source.q,\ http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java ---------------------------------------------------------------------- diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index 2d0aca0..fcce531 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -101,6 +101,7 @@ import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster; import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Index; +import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -1013,6 +1014,9 @@ public class QTestUtil { return; } + // Remove any cached results from the previous test. 
+ QueryResultsCache.cleanupInstance(); + // allocate and initialize a new conf since a test can // modify conf by using 'set' commands conf = new HiveConf(IDriver.class); http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 2d7e459..c6f7d64 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -55,6 +55,9 @@ import org.apache.hadoop.hive.metastore.ColumnType; import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.hadoop.hive.ql.cache.results.CacheUsage; +import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache; +import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry; import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.DagUtils; import org.apache.hadoop.hive.ql.exec.ExplainTask; @@ -188,6 +191,9 @@ public class Driver implements IDriver { // either initTxnMgr or from the SessionState, in that order. private HiveTxnManager queryTxnMgr; + private CacheUsage cacheUsage; + private CacheEntry usedCacheEntry; + private enum DriverState { INITIALIZED, COMPILING, @@ -638,6 +644,11 @@ public class Driver implements IDriver { } LOG.info("Semantic Analysis Completed"); + // Retrieve information about cache usage for the query. 
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)) { + cacheUsage = sem.getCacheUsage(); + } + // validate the plan sem.validate(); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE); @@ -1788,6 +1799,45 @@ public class Driver implements IDriver { return new CommandProcessorResponse(ret, errorMessage, SQLState, downstreamError); } + private void useFetchFromCache(CacheEntry cacheEntry) { + // Change query FetchTask to use new location specified in results cache. + FetchTask fetchTaskFromCache = (FetchTask) TaskFactory.get(cacheEntry.getFetchWork(), conf); + fetchTaskFromCache.initialize(queryState, plan, null, ctx.getOpContext()); + plan.setFetchTask(fetchTaskFromCache); + cacheUsage = new CacheUsage(CacheUsage.CacheStatus.QUERY_USING_CACHE, cacheEntry); + } + + private void checkCacheUsage() throws Exception { + if (cacheUsage != null) { + if (cacheUsage.getStatus() == CacheUsage.CacheStatus.QUERY_USING_CACHE) { + // Using a previously cached result. + CacheEntry cacheEntry = cacheUsage.getCacheEntry(); + + // Reader count already incremented during cache lookup. + // Save to usedCacheEntry to ensure reader is released after query. + usedCacheEntry = cacheEntry; + } else if (cacheUsage.getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS && + plan.getFetchTask() != null) { + // The query could not be resolved using the cache, but the query results + // can be added to the cache for future queries to use. + PerfLogger perfLogger = SessionState.getPerfLogger(); + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE); + + CacheEntry savedCacheEntry = + QueryResultsCache.getInstance().addToCache( + cacheUsage.getQueryInfo(), + plan.getFetchTask().getWork()); + if (savedCacheEntry != null) { + useFetchFromCache(savedCacheEntry); + // addToCache() already increments the reader count. Set usedCacheEntry so it gets released. 
+ usedCacheEntry = savedCacheEntry; + } + + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE); + } + } + } + private void execute() throws CommandProcessorResponse { PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_EXECUTE); @@ -2002,6 +2052,8 @@ public class Driver implements IDriver { } perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.RUN_TASKS); + checkCacheUsage(); + // in case we decided to run everything in local mode, restore the // the jobtracker setting to its initial value ctx.restoreOriginalTracker(); @@ -2410,12 +2462,23 @@ public class Driver implements IDriver { LOG.debug(" Exception while clearing the FetchTask ", e); } } + + private void releaseCachedResult() { + // Assumes the reader count has been incremented automatically by the results cache by either + // lookup or creating the cache entry. + if (usedCacheEntry != null) { + usedCacheEntry.releaseReader(); + usedCacheEntry = null; + } + } + // Close and release resources within a running query process. Since it runs under // driver state COMPILING, EXECUTING or INTERRUPT, it would not have race condition // with the releases probably running in the other closing thread. 
private int closeInProcess(boolean destroyed) { releaseDriverContext(); releasePlan(); + releaseCachedResult(); releaseFetchTask(); releaseResStream(); releaseContext(); @@ -2445,6 +2508,7 @@ public class Driver implements IDriver { return 0; } releasePlan(); + releaseCachedResult(); releaseFetchTask(); releaseResStream(); releaseContext(); http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/cache/results/CacheUsage.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/CacheUsage.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/CacheUsage.java new file mode 100644 index 0000000..08b791a --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/CacheUsage.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.cache.results; + +import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry; +import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.QueryInfo; + +/** + * Helper class during semantic analysis that indicates if the query can use the cache, + * or if the results from the query can be added to the results cache. + */ +public class CacheUsage { + + public enum CacheStatus { + CACHE_NOT_USED, + QUERY_USING_CACHE, + CAN_CACHE_QUERY_RESULTS, + }; + + private CacheUsage.CacheStatus status; + private CacheEntry cacheEntry; + private QueryInfo queryInfo; + + public CacheUsage(CacheStatus status, CacheEntry cacheEntry) { + this.status = status; + this.cacheEntry = cacheEntry; + } + + public CacheUsage(CacheStatus status, QueryInfo queryInfo) { + this.status = status; + this.queryInfo = queryInfo; + } + + public CacheUsage.CacheStatus getStatus() { + return status; + } + + public void setStatus(CacheUsage.CacheStatus status) { + this.status = status; + } + + public CacheEntry getCacheEntry() { + return cacheEntry; + } + + public void setCacheEntry(CacheEntry cacheEntry) { + this.cacheEntry = cacheEntry; + } + + public QueryInfo getQueryInfo() { + return queryInfo; + } + + public void setQueryInfo(QueryInfo queryInfo) { + this.queryInfo = queryInfo; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java new file mode 100644 index 0000000..131127e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java @@ -0,0 +1,666 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.cache.results; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hive.conf.HiveConf; +import 
org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.hooks.Entity.Type; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo; +import org.apache.hadoop.hive.ql.parse.TableAccessInfo; +import org.apache.hadoop.hive.ql.plan.FetchWork; +import org.apache.hadoop.hive.ql.plan.HiveOperation; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A class to handle management and lookup of cached Hive query results. + */ +public final class QueryResultsCache { + + private static final Logger LOG = LoggerFactory.getLogger(QueryResultsCache.class); + + public static class LookupInfo { + private String queryText; + + public LookupInfo(String queryText) { + super(); + this.queryText = queryText; + } + + public String getQueryText() { + return queryText; + } + } + + public static class QueryInfo { + private LookupInfo lookupInfo; + private HiveOperation hiveOperation; + private List<FieldSchema> resultSchema; + private TableAccessInfo tableAccessInfo; + private ColumnAccessInfo columnAccessInfo; + private Set<ReadEntity> inputs; + + public QueryInfo( + LookupInfo lookupInfo, + HiveOperation hiveOperation, + List<FieldSchema> resultSchema, + TableAccessInfo tableAccessInfo, + ColumnAccessInfo columnAccessInfo, + Set<ReadEntity> inputs) { + this.lookupInfo = lookupInfo; + this.hiveOperation = hiveOperation; + this.resultSchema = resultSchema; + this.tableAccessInfo = tableAccessInfo; + this.columnAccessInfo = columnAccessInfo; + this.inputs = inputs; + } + + public LookupInfo getLookupInfo() { + return lookupInfo; + } + + public void setLookupInfo(LookupInfo lookupInfo) { + this.lookupInfo = lookupInfo; + } + + public HiveOperation getHiveOperation() { + return hiveOperation; + } + + public void 
setHiveOperation(HiveOperation hiveOperation) { + this.hiveOperation = hiveOperation; + } + + public List<FieldSchema> getResultSchema() { + return resultSchema; + } + + public void setResultSchema(List<FieldSchema> resultSchema) { + this.resultSchema = resultSchema; + } + + public TableAccessInfo getTableAccessInfo() { + return tableAccessInfo; + } + + public void setTableAccessInfo(TableAccessInfo tableAccessInfo) { + this.tableAccessInfo = tableAccessInfo; + } + + public ColumnAccessInfo getColumnAccessInfo() { + return columnAccessInfo; + } + + public void setColumnAccessInfo(ColumnAccessInfo columnAccessInfo) { + this.columnAccessInfo = columnAccessInfo; + } + + public Set<ReadEntity> getInputs() { + return inputs; + } + + public void setInputs(Set<ReadEntity> inputs) { + this.inputs = inputs; + } + } + + public static class CacheEntry { + private QueryInfo queryInfo; + private FetchWork fetchWork; + private Path cachedResultsPath; + + // Cache administration + private long createTime; + private long size; + private AtomicBoolean valid = new AtomicBoolean(false); + private AtomicInteger readers = new AtomicInteger(0); + private ScheduledFuture<?> invalidationFuture = null; + + public boolean isValid() { + return valid.get(); + } + + public void releaseReader() { + int readerCount = 0; + synchronized (this) { + readerCount = readers.decrementAndGet(); + } + LOG.debug("releaseReader: entry: {}, readerCount: {}", this, readerCount); + Preconditions.checkState(readerCount >= 0); + + cleanupIfNeeded(); + } + + public String toString() { + return "CacheEntry query: [" + getQueryInfo().getLookupInfo().getQueryText() + + "], location: " + cachedResultsPath + + ", size: " + size; + } + + public boolean addReader() { + boolean added = false; + int readerCount = 0; + synchronized (this) { + if (valid.get()) { + readerCount = readers.incrementAndGet(); + added = true; + } + } + Preconditions.checkState(readerCount > 0); + LOG.debug("addReader: entry: {}, readerCount: {}", 
this, readerCount); + return added; + } + + private int numReaders() { + return readers.get(); + } + + private void invalidate() { + boolean wasValid = setValidity(false); + + if (wasValid) { + LOG.info("Invalidated cache entry: {}", this); + + if (invalidationFuture != null) { + // The cache entry has just been invalidated, no need for the scheduled invalidation. + invalidationFuture.cancel(false); + } + cleanupIfNeeded(); + } + } + + /** + * Set the validity, returning the previous validity value. + * @param valid + * @return + */ + private boolean setValidity(boolean valid) { + synchronized(this) { + return this.valid.getAndSet(valid); + } + } + + private void cleanupIfNeeded() { + if (!isValid() && readers.get() <= 0) { + QueryResultsCache.cleanupEntry(this); + } + } + + private String getQueryText() { + return getQueryInfo().getLookupInfo().getQueryText(); + } + + public FetchWork getFetchWork() { + return fetchWork; + } + + public QueryInfo getQueryInfo() { + return queryInfo; + } + + public Path getCachedResultsPath() { + return cachedResultsPath; + } + } + + // Allow lookup by query string + private final Map<String, Set<CacheEntry>> queryMap = new HashMap<String, Set<CacheEntry>>(); + + // LRU. Could also implement LRU as a doubly linked list if CacheEntry keeps its node. + // Use synchronized map since even read actions cause the lru to get updated. 
+ private final Map<CacheEntry, CacheEntry> lru = Collections.synchronizedMap( + new LinkedHashMap<CacheEntry, CacheEntry>(INITIAL_LRU_SIZE, LRU_LOAD_FACTOR, true)); + + private final HiveConf conf; + private Path cacheDirPath; + private long cacheSize = 0; + private long maxCacheSize; + private long maxEntrySize; + private long maxEntryLifetime; + private ReadWriteLock rwLock = new ReentrantReadWriteLock(); + + private QueryResultsCache(HiveConf configuration) throws IOException { + this.conf = configuration; + + // Set up cache directory + Path rootCacheDir = new Path(conf.getVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_DIRECTORY)); + LOG.info("Initializing query results cache at {}", rootCacheDir); + Utilities.ensurePathIsWritable(rootCacheDir, conf); + + String currentCacheDirName = "results-" + UUID.randomUUID().toString(); + cacheDirPath = new Path(rootCacheDir, currentCacheDirName); + FileSystem fs = cacheDirPath.getFileSystem(conf); + FsPermission fsPermission = new FsPermission("700"); + fs.mkdirs(cacheDirPath, fsPermission); + + // Results cache directory should be cleaned up at process termination. 
+ fs.deleteOnExit(cacheDirPath); + + maxCacheSize = conf.getLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_SIZE); + maxEntrySize = conf.getLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_ENTRY_SIZE); + maxEntryLifetime = conf.getTimeVar( + HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_ENTRY_LIFETIME, + TimeUnit.MILLISECONDS); + + LOG.info("Query results cache: cacheDirectory {}, maxCacheSize {}, maxEntrySize {}, maxEntryLifetime {}", + cacheDirPath, maxCacheSize, maxEntrySize, maxEntryLifetime); + } + + private static final AtomicBoolean inited = new AtomicBoolean(false); + private static QueryResultsCache instance; + + public static void initialize(HiveConf conf) throws IOException { + if (!inited.getAndSet(true)) { + try { + instance = new QueryResultsCache(conf); + } catch (IOException err) { + inited.set(false); + throw err; + } + } + } + + public static QueryResultsCache getInstance() { + return instance; + } + + /** + * Check if the cache contains an entry for the requested LookupInfo. + * @param request + * @param addReader Should the reader count be incremented during the lookup. + * This will ensure the returned entry can be used after the lookup. + * If true, the caller will be responsible for decrementing the reader count + * using CacheEntry.releaseReader(). + * @return The cached result if there is a match in the cache, or null if no match is found. + */ + public CacheEntry lookup(LookupInfo request, boolean addReader) { + CacheEntry result = null; + + LOG.debug("QueryResultsCache lookup for query: {}", request.queryText); + + Lock readLock = rwLock.readLock(); + try { + readLock.lock(); + Set<CacheEntry> candidates = queryMap.get(request.queryText); + if (candidates != null) { + for (CacheEntry candidate : candidates) { + if (entryMatches(request, candidate)) { + result = candidate; + break; + } + } + + if (result != null) { + lru.get(result); // Update LRU + + if (!result.isValid()) { + // Entry is in the cache, but not valid. 
+ // This can happen when the entry is first added, before the data has been moved + // to the results cache directory. We cannot use this entry yet. + result = null; + } else { + if (addReader) { + // Caller will need to be responsible for releasing the reader count. + result.addReader(); + } + } + } + } + } finally { + readLock.unlock(); + } + + LOG.debug("QueryResultsCache lookup result: {}", result); + + return result; + } + + /** + * Add an entry to the query results cache. + * Important: Adding the entry to the cache will increment the reader count for the cache entry. + * CacheEntry.releaseReader() should be called when the caller is done with the cache entry. + * + * @param queryInfo + * @param fetchWork + * @return The entry if added to the cache. null if the entry is not added. + */ + public CacheEntry addToCache(QueryInfo queryInfo, FetchWork fetchWork) { + + CacheEntry addedEntry = null; + boolean dataDirMoved = false; + Path queryResultsPath = null; + Path cachedResultsPath = null; + String queryText = queryInfo.getLookupInfo().getQueryText(); + + // Should we remove other candidate entries if they are equivalent to these query results? + try { + CacheEntry potentialEntry = new CacheEntry(); + potentialEntry.queryInfo = queryInfo; + queryResultsPath = fetchWork.getTblDir(); + FileSystem resultsFs = queryResultsPath.getFileSystem(conf); + ContentSummary cs = resultsFs.getContentSummary(queryResultsPath); + potentialEntry.size = cs.getLength(); + + Lock writeLock = rwLock.writeLock(); + try { + writeLock.lock(); + + if (!shouldEntryBeAdded(potentialEntry)) { + return null; + } + if (!clearSpaceForCacheEntry(potentialEntry)) { + return null; + } + + LOG.info("Adding cache entry for query '{}'", queryText); + + // Add the entry to the cache structures while under write lock. Do not mark the entry + // as valid yet, since the query results have not yet been moved to the cache directory. + // Do the data move after unlocking since it might take time. 
+ // Mark the entry as valid once the data has been moved to the cache directory. + Set<CacheEntry> entriesForQuery = queryMap.get(queryText); + if (entriesForQuery == null) { + entriesForQuery = new HashSet<CacheEntry>(); + queryMap.put(queryText, entriesForQuery); + } + entriesForQuery.add(potentialEntry); + lru.put(potentialEntry, potentialEntry); + cacheSize += potentialEntry.size; + addedEntry = potentialEntry; + + } finally { + writeLock.unlock(); + } + + // Move the query results to the query cache directory. + cachedResultsPath = moveResultsToCacheDirectory(queryResultsPath); + dataDirMoved = true; + LOG.info("Moved query results from {} to {} (size {}) for query '{}'", + queryResultsPath, cachedResultsPath, cs.getLength(), queryText); + + // Create a new FetchWork to reference the new cache location. + FetchWork fetchWorkForCache = + new FetchWork(cachedResultsPath, fetchWork.getTblDesc(), fetchWork.getLimit()); + fetchWorkForCache.setCachedResult(true); + addedEntry.fetchWork = fetchWorkForCache; + addedEntry.cachedResultsPath = cachedResultsPath; + addedEntry.createTime = System.currentTimeMillis(); + addedEntry.setValidity(true); + + // Mark this entry as being in use. Caller will need to release later. + addedEntry.addReader(); + + scheduleEntryInvalidation(addedEntry); + } catch (Exception err) { + LOG.error("Failed to create cache entry for query results for query: " + queryText, err); + + if (addedEntry != null) { + // If the entry was already added to the cache when we hit error, clean up properly. + + if (dataDirMoved) { + // If data was moved from original location to cache directory, we need to move it back! 
+ LOG.info("Restoring query results from {} back to {}", cachedResultsPath, queryResultsPath); + try { + FileSystem fs = cachedResultsPath.getFileSystem(conf); + fs.rename(cachedResultsPath, queryResultsPath); + addedEntry.cachedResultsPath = null; + } catch (Exception err2) { + String errMsg = "Failed cleanup during failed attempt to cache query: " + queryText; + LOG.error(errMsg); + throw new RuntimeException(errMsg); + } + } + + addedEntry.invalidate(); + if (addedEntry.numReaders() > 0) { + addedEntry.releaseReader(); + } + } + + return null; + } + + return addedEntry; + } + + public void clear() { + Lock writeLock = rwLock.writeLock(); + try { + writeLock.lock(); + LOG.info("Clearing the results cache"); + for (CacheEntry entry : lru.keySet().toArray(EMPTY_CACHEENTRY_ARRAY)) { + try { + removeEntry(entry); + } catch (Exception err) { + LOG.error("Error removing cache entry " + entry, err); + } + } + } finally { + writeLock.unlock(); + } + } + + public long getSize() { + Lock readLock = rwLock.readLock(); + try { + readLock.lock(); + return cacheSize; + } finally { + readLock.unlock(); + } + } + + private static final int INITIAL_LRU_SIZE = 16; + private static final float LRU_LOAD_FACTOR = 0.75f; + private static final CacheEntry[] EMPTY_CACHEENTRY_ARRAY = {}; + + private boolean entryMatches(LookupInfo lookupInfo, CacheEntry entry) { + QueryInfo queryInfo = entry.getQueryInfo(); + for (ReadEntity readEntity : queryInfo.getInputs()) { + // Check that the tables used do not resolve to temp tables. + if (readEntity.getType() == Type.TABLE) { + Table tableUsed = readEntity.getTable(); + Map<String, Table> tempTables = + SessionHiveMetaStoreClient.getTempTablesForDatabase(tableUsed.getDbName()); + if (tempTables != null && tempTables.containsKey(tableUsed.getTableName())) { + LOG.info("{} resolves to a temporary table in the current session. 
This query cannot use the cache.", + tableUsed.getTableName()); + return false; + } + } + } + + return true; + } + + private void removeEntry(CacheEntry entry) { + entry.invalidate(); + removeFromLookup(entry); + lru.remove(entry); + // Should the cache size be updated here, or after the result data has actually been deleted? + cacheSize -= entry.size; + } + + private void removeFromLookup(CacheEntry entry) { + String queryString = entry.getQueryText(); + Set<CacheEntry> entries = queryMap.get(queryString); + Preconditions.checkState(entries != null); + boolean deleted = entries.remove(entry); + Preconditions.checkState(deleted); + if (entries.isEmpty()) { + queryMap.remove(queryString); + } + } + + private void calculateEntrySize(CacheEntry entry, FetchWork fetchWork) throws IOException { + Path queryResultsPath = fetchWork.getTblDir(); + FileSystem resultsFs = queryResultsPath.getFileSystem(conf); + ContentSummary cs = resultsFs.getContentSummary(queryResultsPath); + entry.size = cs.getLength(); + } + + /** + * Determines if the cache entry should be added to the results cache. + */ + private boolean shouldEntryBeAdded(CacheEntry entry) { + // Assumes the cache lock has already been taken. + if (maxEntrySize >= 0 && entry.size > maxEntrySize) { + LOG.debug("Cache entry size {} larger than max entry size ({})", entry.size, maxEntrySize); + return false; + } + + return true; + } + + private Path moveResultsToCacheDirectory(Path queryResultsPath) throws IOException { + String dirName = UUID.randomUUID().toString(); + Path cachedResultsPath = new Path(cacheDirPath, dirName); + FileSystem fs = cachedResultsPath.getFileSystem(conf); + fs.rename(queryResultsPath, cachedResultsPath); + return cachedResultsPath; + } + + private boolean hasSpaceForCacheEntry(CacheEntry entry) { + if (maxCacheSize >= 0) { + return (cacheSize + entry.size) <= maxCacheSize; + } + // Negative max cache size means unbounded. 
+ return true; + } + + private boolean clearSpaceForCacheEntry(CacheEntry entry) { + if (hasSpaceForCacheEntry(entry)) { + return true; + } + + LOG.info("Clearing space for cache entry for query: [{}] with size {}", + entry.getQueryText(), entry.size); + + // Entries should be in LRU order in the keyset iterator. + CacheEntry[] entries = lru.keySet().toArray(EMPTY_CACHEENTRY_ARRAY); + for (CacheEntry removalCandidate : entries) { + if (!removalCandidate.isValid()) { + // Likely an entry which is still getting its results moved to the cache directory. + continue; + } + // Only delete the entry if it has no readers. + if (!(removalCandidate.numReaders() > 0)) { + LOG.info("Removing entry: {}", removalCandidate); + removeEntry(removalCandidate); + if (hasSpaceForCacheEntry(entry)) { + return true; + } + } + } + + LOG.info("Could not free enough space for cache entry for query: [{}] with size {}", + entry.getQueryText(), entry.size); + return false; + } + + + @VisibleForTesting + public static void cleanupInstance() { + // This should only ever be called in testing scenarios. + // There should not be any other users of the cache or its entries or this may mess up cleanup. + if (inited.get()) { + getInstance().clear(); + instance = null; + inited.set(false); + } + } + + private static ScheduledExecutorService invalidationExecutor = null; + private static ExecutorService deletionExecutor = null; + + static { + ThreadFactory threadFactory = + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("QueryResultsCache %d").build(); + invalidationExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory); + deletionExecutor = Executors.newSingleThreadExecutor(threadFactory); + } + + private void scheduleEntryInvalidation(final CacheEntry entry) { + if (maxEntryLifetime >= 0) { + // Schedule task to invalidate cache entry. 
+ ScheduledFuture<?> future = invalidationExecutor.schedule(new Runnable() { + @Override + public void run() { + entry.invalidate(); + } + }, maxEntryLifetime, TimeUnit.MILLISECONDS); + entry.invalidationFuture = future; + } + } + + private static void cleanupEntry(final CacheEntry entry) { + Preconditions.checkState(!entry.isValid()); + + if (entry.cachedResultsPath != null) { + deletionExecutor.execute(new Runnable() { + @Override + public void run() { + Path path = entry.cachedResultsPath; + LOG.info("Cache directory cleanup: deleting {}", path); + try { + FileSystem fs = entry.cachedResultsPath.getFileSystem(getInstance().conf); + fs.delete(entry.cachedResultsPath, true); + } catch (Exception err) { + LOG.error("Error while trying to delete " + path, err); + } + } + }); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 675ca12..8f44c94 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -2626,6 +2626,10 @@ public final class Utilities { return getTasks(tasks, new TaskFilterFunction<>(ExecDriver.class)); } + public static int getNumClusterJobs(List<Task<? extends Serializable>> tasks) { + return getMRTasks(tasks).size() + getTezTasks(tasks).size() + getSparkTasks(tasks).size(); + } + static class TaskFilterFunction<T> implements DAGTraversal.Function { private Set<Task<? 
extends Serializable>> visited = new HashSet<>(); private Class<T> requiredType; @@ -4445,4 +4449,33 @@ public final class Utilities { return AcidUtils.ORIGINAL_PATTERN.matcher(path.getName()).matches() || AcidUtils.ORIGINAL_PATTERN_COPY.matcher(path.getName()).matches(); } + + /** + * Checks if path passed in exists and has writable permissions. + * The path will be created if it does not exist. + * @param rootHDFSDirPath + * @param conf + */ + public static void ensurePathIsWritable(Path rootHDFSDirPath, HiveConf conf) throws IOException { + FsPermission writableHDFSDirPermission = new FsPermission((short)00733); + FileSystem fs = rootHDFSDirPath.getFileSystem(conf); + if (!fs.exists(rootHDFSDirPath)) { + Utilities.createDirsWithPermission(conf, rootHDFSDirPath, writableHDFSDirPermission, true); + } + FsPermission currentHDFSDirPermission = fs.getFileStatus(rootHDFSDirPath).getPermission(); + if (rootHDFSDirPath != null && rootHDFSDirPath.toUri() != null) { + String schema = rootHDFSDirPath.toUri().getScheme(); + LOG.debug("HDFS dir: " + rootHDFSDirPath + " with schema " + schema + ", permission: " + + currentHDFSDirPermission); + } else { + LOG.debug( + "HDFS dir: " + rootHDFSDirPath + ", permission: " + currentHDFSDirPermission); + } + // If the root HDFS scratch dir already exists, make sure it is writeable. + if (!((currentHDFSDirPermission.toShort() & writableHDFSDirPermission + .toShort()) == writableHDFSDirPermission.toShort())) { + throw new RuntimeException("The dir: " + rootHDFSDirPath + + " on HDFS should be writable. 
Current permissions are: " + currentHDFSDirPermission); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java index f0dd167..1f8a48c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java @@ -967,6 +967,44 @@ public class HiveCalciteUtil { return AggregateCall.create(aggFunction, false, argList, -1, aggFnRetType, null); } + /** + * Is the expression usable for query materialization. + */ + public static boolean isMaterializable(RexNode expr) { + return (checkMaterializable(expr) == null); + } + + /** + * Check if the expression is usable for query materialization, returning the first failing expression. + */ + public static RexCall checkMaterializable(RexNode expr) { + boolean deterministic = true; + RexCall failingCall = null; + + if (expr == null) { + return null; + } + + RexVisitor<Void> visitor = new RexVisitorImpl<Void>(true) { + @Override + public Void visitCall(org.apache.calcite.rex.RexCall call) { + // non-deterministic functions as well as runtime constants are not materializable. 
+ if (!call.getOperator().isDeterministic() || call.getOperator().isDynamicFunction()) { + throw new Util.FoundOne(call); + } + return super.visitCall(call); + } + }; + + try { + expr.accept(visitor); + } catch (Util.FoundOne e) { + failingCall = (RexCall) e.getNode(); + } + + return failingCall; + } + public static HiveTableFunctionScan createUDTFForSetOp(RelOptCluster cluster, RelNode input) throws SemanticException { RelTraitSet traitSet = TraitsUtil.getDefaultTraitSet(cluster); http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelOpMaterializationValidator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelOpMaterializationValidator.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelOpMaterializationValidator.java new file mode 100644 index 0000000..8c1bcb3 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelOpMaterializationValidator.java @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.optimizer.calcite; + +import java.util.List; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.core.TableFunctionScan; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.logical.LogicalAggregate; +import org.apache.calcite.rel.logical.LogicalCorrelate; +import org.apache.calcite.rel.logical.LogicalExchange; +import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rel.logical.LogicalIntersect; +import org.apache.calcite.rel.logical.LogicalJoin; +import org.apache.calcite.rel.logical.LogicalMatch; +import org.apache.calcite.rel.logical.LogicalMinus; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rel.logical.LogicalSort; +import org.apache.calcite.rel.logical.LogicalUnion; +import org.apache.calcite.rel.logical.LogicalValues; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.Util; + +import org.apache.hadoop.hive.metastore.TableType; + +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveExcept; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveIntersect; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSemiJoin; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortLimit; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan; +import 
org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Checks the query plan for conditions that would make the plan unsuitable for + * materialized views or query caching: + * - References to temporary or external tables + * - References to non-deterministic functions. + */ +public class HiveRelOpMaterializationValidator extends HiveRelShuttleImpl { + static final Logger LOG = LoggerFactory.getLogger(HiveRelOpMaterializationValidator.class); + + protected String invalidMaterializationReason; + + public void validateQueryMaterialization(RelNode relNode) { + try { + relNode.accept(this); + } catch (Util.FoundOne e) { + // Can ignore - the check failed. + } + } + + @Override + public RelNode visit(TableScan scan) { + if (scan instanceof HiveTableScan) { + HiveTableScan hiveScan = (HiveTableScan) scan; + RelOptHiveTable relOptHiveTable = (RelOptHiveTable) hiveScan.getTable(); + Table tab = relOptHiveTable.getHiveTableMD(); + if (tab.isTemporary()) { + fail(tab.getTableName() + " is a temporary table"); + } + TableType tt = tab.getTableType(); + if (tab.getTableType() == TableType.EXTERNAL_TABLE) { + fail(tab.getFullyQualifiedName() + " is an external table"); + } + return scan; + } + + // TableScan of a non-Hive table - not supported for materializations. + fail(scan.getTable().getQualifiedName() + " is a table scan of a non-Hive table."); + return scan; + } + + @Override + public RelNode visit(HiveProject project) { + for (RexNode expr : project.getProjects()) { + checkExpr(expr); + } + return super.visit(project); + } + + @Override + public RelNode visit(HiveFilter filter) { + checkExpr(filter.getCondition()); + return super.visit(filter); + } + + @Override + public RelNode visit(HiveJoin join) { + checkExpr(join.getCondition()); + return super.visit(join); + } + + @Override + public RelNode visit(HiveAggregate aggregate) { + // Is there anything to check here? 
+ return super.visit(aggregate); + } + + @Override + public RelNode visit(RelNode node) { + // There are several Hive RelNode types which do not have their own visit() method + // defined in the HiveRelShuttle interface, which need to be handled appropriately here. + // Per jcamachorodriguez we should not encounter HiveMultiJoin/HiveSortExchange + // during these checks, so no need to add those here. + if (node instanceof HiveUnion) { + return visit((HiveUnion) node); + } else if (node instanceof HiveSortLimit) { + return visit((HiveSortLimit) node); + } else if (node instanceof HiveSemiJoin) { + return visit((HiveSemiJoin) node); + } else if (node instanceof HiveExcept) { + return visit((HiveExcept) node); + } else if (node instanceof HiveIntersect) { + return visit((HiveIntersect) node); + } + + // Fall-back for an unexpected RelNode type + return fail(node); + } + + @Override + public RelNode visit(TableFunctionScan scan) { + checkExpr(scan.getCall()); + return super.visit(scan); + } + + @Override + public RelNode visit(LogicalValues values) { + // Not expected to be encountered for Hive - fail + return fail(values); + } + + @Override + public RelNode visit(LogicalFilter filter) { + // Not expected to be encountered for Hive - fail + return fail(filter); + } + + @Override + public RelNode visit(LogicalProject project) { + // Not expected to be encountered for Hive - fail + return fail(project); + } + + @Override + public RelNode visit(LogicalJoin join) { + // Not expected to be encountered for Hive - fail + return fail(join); + } + + @Override + public RelNode visit(LogicalCorrelate correlate) { + // Not expected to be encountered for Hive - fail + return fail(correlate); + } + + @Override + public RelNode visit(LogicalUnion union) { + // Not expected to be encountered for Hive - fail + return fail(union); + } + + @Override + public RelNode visit(LogicalIntersect intersect) { + // Not expected to be encountered for Hive - fail + return fail(intersect); + } + + 
@Override + public RelNode visit(LogicalMinus minus) { + // Not expected to be encountered for Hive - fail + return fail(minus); + } + + @Override + public RelNode visit(LogicalAggregate aggregate) { + // Not expected to be encountered for Hive - fail + return fail(aggregate); + } + + @Override + public RelNode visit(LogicalMatch match) { + // Not expected to be encountered for Hive - fail + return fail(match); + } + + @Override + public RelNode visit(LogicalSort sort) { + // Not expected to be encountered for Hive - fail + return fail(sort); + } + + @Override + public RelNode visit(LogicalExchange exchange) { + // Not expected to be encountered for Hive - fail + return fail(exchange); + } + + // Note: Not currently part of the HiveRelNode interface + private RelNode visit(HiveUnion union) { + return visitChildren(union); + } + + // Note: Not currently part of the HiveRelNode interface + private RelNode visit(HiveSortLimit sort) { + checkExpr(sort.getFetchExpr()); + checkExpr(sort.getOffsetExpr()); + return visitChildren(sort); + } + + // Note: Not currently part of the HiveRelNode interface + private RelNode visit(HiveSemiJoin semiJoin) { + checkExpr(semiJoin.getCondition()); + checkExpr(semiJoin.getJoinFilter()); + return visitChildren(semiJoin); + } + + // Note: Not currently part of the HiveRelNode interface + private RelNode visit(HiveExcept except) { + return visitChildren(except); + } + + // Note: Not currently part of the HiveRelNode interface + private RelNode visit(HiveIntersect intersect) { + return visitChildren(intersect); + } + + private void fail(String reason) { + setInvalidMaterializationReason(reason); + throw Util.FoundOne.NULL; + } + + private RelNode fail(RelNode node) { + setInvalidMaterializationReason("Unsupported RelNode type " + node.getRelTypeName() + + " encountered in the query plan"); + throw Util.FoundOne.NULL; + } + + private void checkExpr(RexNode expr) { + RexCall invalidCall = HiveCalciteUtil.checkMaterializable(expr); + if 
(invalidCall != null) { + fail(invalidCall.getOperator().getName() + " is not a deterministic function"); + } + } + + public String getInvalidMaterializationReason() { + return invalidMaterializationReason; + } + + public void setInvalidMaterializationReason(String invalidMaterializationReason) { + this.invalidMaterializationReason = invalidMaterializationReason; + } + + public boolean isValidMaterialization() { + return invalidMaterializationReason == null; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java index e553a81..8a1bfd2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java @@ -55,6 +55,8 @@ import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryProperties; import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.cache.results.CacheUsage; +import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache; import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -148,6 +150,9 @@ public abstract class BaseSemanticAnalyzer { protected LineageInfo linfo; protected TableAccessInfo tableAccessInfo; protected ColumnAccessInfo columnAccessInfo; + + protected CacheUsage cacheUsage; + /** * Columns accessed by updates */ @@ -1945,4 +1950,12 @@ public abstract class BaseSemanticAnalyzer { } return SessionState.get().getTxnMgr(); } + + public CacheUsage getCacheUsage() { + return cacheUsage; + } + + public void setCacheUsage(CacheUsage cacheUsage) { + this.cacheUsage = cacheUsage; + } 
} http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index 85a1f34..cf2bc13 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -154,6 +154,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.HiveConfPlannerContext; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveDefaultRelMetadataProvider; import org.apache.hadoop.hive.ql.optimizer.calcite.HivePlannerContext; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories; +import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelOpMaterializationValidator; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRexExecutorImpl; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveTypeSystemImpl; import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable; @@ -1442,6 +1443,16 @@ public class CalcitePlanner extends SemanticAnalyzer { } perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Plan generation"); + // Validate query materialization (materialized views, query results caching). + // This check needs to occur before constant folding, which may remove some + // function calls from the query plan. 
+ HiveRelOpMaterializationValidator matValidator = new HiveRelOpMaterializationValidator(); + matValidator.validateQueryMaterialization(calciteGenPlan); + if (!matValidator.isValidMaterialization()) { + String reason = matValidator.getInvalidMaterializationReason(); + setInvalidQueryMaterializationReason(reason); + } + // Create executor RexExecutor executorProvider = new HiveRexExecutorImpl(optCluster); calciteGenPlan.getCluster().getPlanner().setExecutor(executorProvider); http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java index ae2ec3d..5789ee0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java @@ -675,6 +675,9 @@ public class QBParseInfo { return insertOverwriteTables; } + public boolean hasInsertTables() { + return this.insertIntoTables.size() > 0 || this.insertOverwriteTables.size() > 0; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index e6ee968..8e587f1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -84,10 +84,13 @@ import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryProperties; import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.cache.results.CacheUsage; +import 
org.apache.hadoop.hive.ql.cache.results.QueryResultsCache; import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; import org.apache.hadoop.hive.ql.exec.ArchiveUtils; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory; +import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.FilterOperator; import org.apache.hadoop.hive.ql.exec.FunctionInfo; @@ -361,6 +364,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { HiveParser.TOK_ORDERBY, HiveParser.TOK_WINDOWSPEC, HiveParser.TOK_CLUSTERBY, HiveParser.TOK_DISTRIBUTEBY, HiveParser.TOK_SORTBY); + private String invalidQueryMaterializationReason; + static class Phase1Ctx { String dest; int nextNum; @@ -11211,6 +11216,76 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { return getTableObjectByName(tableName, true); } + private static void walkASTAndQualifyNames(ASTNode ast, + Set<String> cteAlias, Context ctx, Hive db, Set<Integer> ignoredTokens, UnparseTranslator unparseTranslator) { + Queue<Node> queue = new LinkedList<>(); + queue.add(ast); + while (!queue.isEmpty()) { + ASTNode astNode = (ASTNode) queue.poll(); + if (astNode.getToken().getType() == HiveParser.TOK_TABNAME) { + // Check if this table name is qualified or not + String tabIdName = getUnescapedName(astNode).toLowerCase(); + // if alias to CTE contains the table name, we do not do the translation because + // cte is actually a subquery. + if (!cteAlias.contains(tabIdName)) { + unparseTranslator.addTableNameTranslation(astNode, SessionState.get().getCurrentDatabase()); + } + } + + if (astNode.getChildCount() > 0 && !ignoredTokens.contains(astNode.getToken().getType())) { + for (Node child : astNode.getChildren()) { + queue.offer(child); + } + } + } + } + + // Walk through the AST. + // Replace all TOK_TABREF with fully qualified table name, if it is not already fully qualified. 
+ protected String rewriteQueryWithQualifiedNames(ASTNode ast, TokenRewriteStream tokenRewriteStream) + throws SemanticException { + UnparseTranslator unparseTranslator = new UnparseTranslator(conf); + unparseTranslator.enable(); + + // 1. collect information about CTE if there is any. + // The base table of CTE should be masked. + // The CTE itself should not be masked in the references in the following main query. + Set<String> cteAlias = new HashSet<>(); + if (ast.getChildCount() > 0 + && HiveParser.TOK_CTE == ((ASTNode) ast.getChild(0)).getToken().getType()) { + // the structure inside CTE is like this + // TOK_CTE + // TOK_SUBQUERY + // sq1 (may refer to sq2) + // ... + // TOK_SUBQUERY + // sq2 + ASTNode cte = (ASTNode) ast.getChild(0); + // we start from sq2, end up with sq1. + for (int index = cte.getChildCount() - 1; index >= 0; index--) { + ASTNode subq = (ASTNode) cte.getChild(index); + String alias = unescapeIdentifier(subq.getChild(1).getText()); + if (cteAlias.contains(alias)) { + throw new SemanticException("Duplicate definition of " + alias); + } else { + cteAlias.add(alias); + walkASTAndQualifyNames(ast, cteAlias, ctx, db, ignoredTokens, unparseTranslator); + } + } + // walk the other part of ast + for (int index = 1; index < ast.getChildCount(); index++) { + walkASTAndQualifyNames(ast, cteAlias, ctx, db, ignoredTokens, unparseTranslator); + } + } else { // there is no CTE, walk the whole AST + walkASTAndQualifyNames(ast, cteAlias, ctx, db, ignoredTokens, unparseTranslator); + } + + unparseTranslator.applyTranslations(tokenRewriteStream); + String rewrittenQuery = tokenRewriteStream.toString( + ast.getTokenStartIndex(), ast.getTokenStopIndex()); + return rewrittenQuery; + } + private static void walkASTMarkTABREF(TableMask tableMask, ASTNode ast, Set<String> cteAlias, Context ctx, Hive db, Map<String, Table> tabNameToTabObject, Set<Integer> ignoredTokens) throws SemanticException { @@ -11549,6 +11624,20 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer { } } + // Check query results cache. + // If no masking/filtering required, then we can check the cache now, before + // generating the operator tree and going through CBO. + // Otherwise we have to wait until after the masking/filtering step. + boolean isCacheEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED); + QueryResultsCache.LookupInfo lookupInfo = null; + boolean needsTransform = needsTransform(); + if (isCacheEnabled && !needsTransform && queryTypeCanUseCache()) { + lookupInfo = createLookupInfoForQuery(ast); + if (checkResultsCache(lookupInfo)) { + return; + } + } + // 2. Gen OP Tree from resolved Parse Tree Operator sinkOp = genOPTree(ast, plannerCtx); @@ -11571,6 +11660,16 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } } + // Check query results cache + // In the case that row or column masking/filtering was required, the cache must be checked + // here, after applying the masking/filtering rewrite rules to the AST. + if (isCacheEnabled && needsTransform && queryTypeCanUseCache()) { + lookupInfo = createLookupInfoForQuery(ast); + if (checkResultsCache(lookupInfo)) { + return; + } + } + // 3. Deduce Resultset Schema if (createVwDesc != null && !this.ctx.isCboSucceeded()) { resultSchema = convertRowSchemaToViewSchema(opParseCtx.get(sinkOp).getRowResolver()); @@ -11705,6 +11804,15 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { putAccessedColumnsToReadEntity(inputs, columnAccessInfo); } + if (isCacheEnabled && lookupInfo != null) { + if (queryCanBeCached()) { + QueryResultsCache.QueryInfo queryInfo = createCacheQueryInfoForQuery(lookupInfo); + + // Specify that the results of this query can be cached. 
+ setCacheUsage(new CacheUsage( + CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS, queryInfo)); + } + } } private void putAccessedColumnsToReadEntity(HashSet<ReadEntity> inputs, ColumnAccessInfo columnAccessInfo) { @@ -13913,6 +14021,158 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { this.loadFileWork = loadFileWork; } + /** + * Generate the query string for this query (with fully resolved table references). + * @return The query string with resolved references. NULL if an error occurred. + */ + private String getQueryStringForCache(ASTNode ast) { + // Use the UnparseTranslator to resolve unqualified table names. + String queryString = ctx.getTokenRewriteStream().toString(ast.getTokenStartIndex(), ast.getTokenStopIndex()); + + // Re-using the TokenRewriteStream map for views so we do not overwrite the current TokenRewriteStream + String rewriteStreamName = "__qualified_query_string__"; + ASTNode astNode; + try { + astNode = ParseUtils.parse(queryString, ctx, rewriteStreamName); + TokenRewriteStream tokenRewriteStream = ctx.getViewTokenRewriteStream(rewriteStreamName); + String fullyQualifiedQuery = rewriteQueryWithQualifiedNames(astNode, tokenRewriteStream); + return fullyQualifiedQuery; + } catch (Exception err) { + LOG.error("Unexpected error while reparsing the query string [" + queryString + "]", err); + // Don't fail the query - just return null (caller should skip cache lookup). + return null; + } + } + + private QueryResultsCache.LookupInfo createLookupInfoForQuery(ASTNode astNode) { + QueryResultsCache.LookupInfo lookupInfo = null; + String queryString = getQueryStringForCache(astNode); + if (queryString != null) { + lookupInfo = new QueryResultsCache.LookupInfo(queryString); + } + return lookupInfo; + } + + /** + * Set the query plan to use cache entry passed in to return the query results. + * @param cacheEntry The results cache entry that will be used to resolve the query. 
+ */ + private void useCachedResult(QueryResultsCache.CacheEntry cacheEntry) { + // Change query FetchTask to use new location specified in results cache. + FetchTask fetchTask = (FetchTask) TaskFactory.get(cacheEntry.getFetchWork(), conf); + setFetchTask(fetchTask); + + queryState.setCommandType(cacheEntry.getQueryInfo().getHiveOperation()); + resultSchema = cacheEntry.getQueryInfo().getResultSchema(); + setTableAccessInfo(cacheEntry.getQueryInfo().getTableAccessInfo()); + setColumnAccessInfo(cacheEntry.getQueryInfo().getColumnAccessInfo()); + inputs.addAll(cacheEntry.getQueryInfo().getInputs()); + + // Indicate that the query will use a cached result. + setCacheUsage(new CacheUsage( + CacheUsage.CacheStatus.QUERY_USING_CACHE, cacheEntry)); + } + + private QueryResultsCache.QueryInfo createCacheQueryInfoForQuery(QueryResultsCache.LookupInfo lookupInfo) { + return new QueryResultsCache.QueryInfo(lookupInfo, queryState.getHiveOperation(), + resultSchema, getTableAccessInfo(), getColumnAccessInfo(), inputs); + } + + /** + * Some initial checks for a query to see if we can look this query up in the results cache. + */ + private boolean queryTypeCanUseCache() { + if (this instanceof ColumnStatsSemanticAnalyzer) { + // Column stats generates "select compute_stats() .." queries. + // Disable caching for these. + return false; + } + + if (queryState.getHiveOperation() != HiveOperation.QUERY) { + return false; + } + + if (qb.getParseInfo().isAnalyzeCommand()) { + return false; + } + + if (qb.getParseInfo().hasInsertTables()) { + return false; + } + + return true; + } + + private boolean needsTransform() { + return SessionState.get().getAuthorizerV2() != null && + SessionState.get().getAuthorizerV2().needTransform(); + } + + /** + * Called after a query plan has been generated, to determine if the results of this query + * can be added to the results cache. 
+ */ + private boolean queryCanBeCached() { + if (!queryTypeCanUseCache()) { + LOG.info("Not eligible for results caching - wrong query type"); + return false; + } + + // Query should have a fetch task. + if (getFetchTask() == null) { + LOG.info("Not eligible for results caching - no fetch task"); + return false; + } + + // At least one mr/tez/spark job + if (Utilities.getNumClusterJobs(getRootTasks()) == 0) { + LOG.info("Not eligible for results caching - no mr/tez/spark jobs"); + return false; + } + + // The query materialization validation check only occurs in CBO. Thus only cache results if CBO was used. + if (!ctx.isCboSucceeded()) { + LOG.info("Caching of query results is disabled if CBO was not run."); + return false; + } + + if (!isValidQueryMaterialization()) { + LOG.info("Not eligible for results caching - {}", getInvalidQueryMaterializationReason()); + return false; + } + + return true; + } + + /** + * Check the query results cache to see if the query represented by the lookupInfo can be + * answered using the results cache. If the cache contains a suitable entry, the semantic analyzer + * will be configured to use the found cache entry to answer the query. + */ + private boolean checkResultsCache(QueryResultsCache.LookupInfo lookupInfo) { + if (lookupInfo == null) { + return false; + } + try { + // In case this has not been initialized elsewhere. + QueryResultsCache.initialize(conf); + } catch (Exception err) { + throw new IllegalStateException(err); + } + // Don't increment the reader count for explain queries. + boolean isExplainQuery = (ctx.getExplainConfig() != null); + QueryResultsCache.CacheEntry cacheEntry = + QueryResultsCache.getInstance().lookup(lookupInfo, !isExplainQuery); + if (cacheEntry != null) { + // Use the cache rather than full query execution. + useCachedResult(cacheEntry); + + // At this point the caller should return from semantic analysis. 
+ return true; + } + return false; + } + private static final class ColsAndTypes { public ColsAndTypes(String cols, String colTypes) { this.cols = cols; @@ -13921,4 +14181,17 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { public String cols; public String colTypes; } + + public String getInvalidQueryMaterializationReason() { + return invalidQueryMaterializationReason; + } + + public void setInvalidQueryMaterializationReason( + String invalidQueryMaterializationReason) { + this.invalidQueryMaterializationReason = invalidQueryMaterializationReason; + } + + public boolean isValidQueryMaterialization() { + return (invalidQueryMaterializationReason == null); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java index 7243dc7..1f139c8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java @@ -75,6 +75,11 @@ public class FetchWork implements Serializable { */ private boolean isUsingThriftJDBCBinarySerDe = false; + /** + * Whether this FetchWork is returning a cached query result + */ + private boolean isCachedResult = false; + public boolean isHiveServerQuery() { return isHiveServerQuery; } @@ -364,4 +369,12 @@ public class FetchWork implements Serializable { } return new FetchExplainVectorization(this); } + @Explain(displayName = "Cached Query Result", displayOnlyOnTrue = true, explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + public boolean isCachedResult() { + return isCachedResult; + } + + public void setCachedResult(boolean isCachedResult) { + this.isCachedResult = isCachedResult; + } } 
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 3946d4a..dfc2dfa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -725,27 +725,7 @@ public class SessionState { */ private Path createRootHDFSDir(HiveConf conf) throws IOException { Path rootHDFSDirPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR)); - FsPermission writableHDFSDirPermission = new FsPermission((short)00733); - FileSystem fs = rootHDFSDirPath.getFileSystem(conf); - if (!fs.exists(rootHDFSDirPath)) { - Utilities.createDirsWithPermission(conf, rootHDFSDirPath, writableHDFSDirPermission, true); - } - FsPermission currentHDFSDirPermission = fs.getFileStatus(rootHDFSDirPath).getPermission(); - if (rootHDFSDirPath != null && rootHDFSDirPath.toUri() != null) { - String schema = rootHDFSDirPath.toUri().getScheme(); - LOG.debug( - "HDFS root scratch dir: " + rootHDFSDirPath + " with schema " + schema + ", permission: " + - currentHDFSDirPermission); - } else { - LOG.debug( - "HDFS root scratch dir: " + rootHDFSDirPath + ", permission: " + currentHDFSDirPermission); - } - // If the root HDFS scratch dir already exists, make sure it is writeable. - if (!((currentHDFSDirPermission.toShort() & writableHDFSDirPermission - .toShort()) == writableHDFSDirPermission.toShort())) { - throw new RuntimeException("The root scratch dir: " + rootHDFSDirPath - + " on HDFS should be writable. 
Current permissions are: " + currentHDFSDirPermission); - } + Utilities.ensurePathIsWritable(rootHDFSDirPath, conf); return rootHDFSDirPath; } http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/test/queries/clientpositive/results_cache_1.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/results_cache_1.q b/ql/src/test/queries/clientpositive/results_cache_1.q new file mode 100644 index 0000000..4aea60e --- /dev/null +++ b/ql/src/test/queries/clientpositive/results_cache_1.q @@ -0,0 +1,96 @@ + +set hive.query.results.cache.enabled=true; + +explain +select count(*) from src a join src b on (a.key = b.key); +select count(*) from src a join src b on (a.key = b.key); + +set test.comment="Cache should be used for this query"; +set test.comment; +explain +select count(*) from src a join src b on (a.key = b.key); +select count(*) from src a join src b on (a.key = b.key); + +set hive.query.results.cache.enabled=false; +set test.comment="Cache is disabled, should not be used here."; +set test.comment; +explain +select count(*) from src a join src b on (a.key = b.key); + +create database db1; +use db1; +create table src as select key, value from default.src; + +set hive.query.results.cache.enabled=true; +set test.comment="Same query string, but different current database. Cache should not be used since unqualified tablenames resolve to different tables"; +set test.comment; +explain +select count(*) from src a join src b on (a.key = b.key); + +use default; + +-- Union +select * from src where key = 0 +union all +select * from src where key = 2; + +set test.comment="Union all. 
Cache should be used now"; +set test.comment; +explain +select * from src where key = 0 +union all +select * from src where key = 2; + +select * from src where key = 0 +union all +select * from src where key = 2; + + +-- CTE +with q1 as ( select distinct key from q2 ), +q2 as ( select key, value from src where key < 10 ) +select * from q1 a, q1 b where a.key = b.key; + +set test.comment="CTE. Cache should be used now"; +set test.comment; +explain +with q1 as ( select distinct key from q2 ), +q2 as ( select key, value from src where key < 10 ) +select * from q1 a, q1 b where a.key = b.key; + +with q1 as ( select distinct key from q2 ), +q2 as ( select key, value from src where key < 10 ) +select * from q1 a, q1 b where a.key = b.key; + +-- Intersect/Except +with q1 as ( select distinct key, value from src ), +q2 as ( select key, value from src where key < 10 ), +q3 as ( select key, value from src where key = 0 ) +select * from q1 intersect all select * from q2 except all select * from q3; + +set test.comment="Intersect/Except. Cache should be used now"; +set test.comment; +explain +with q1 as ( select distinct key, value from src ), +q2 as ( select key, value from src where key < 10 ), +q3 as ( select key, value from src where key = 0 ) +select * from q1 intersect all select * from q2 except all select * from q3; + +with q1 as ( select distinct key, value from src ), +q2 as ( select key, value from src where key < 10 ), +q3 as ( select key, value from src where key = 0 ) +select * from q1 intersect all select * from q2 except all select * from q3; + +-- Semijoin. 
Use settings from cbo_semijoin +set hive.mapred.mode=nonstrict; +set hive.exec.check.crossproducts=false; +set hive.stats.fetch.column.stats=true; +set hive.auto.convert.join=false; + +select a, c, count(*) from (select key as a, c_int+1 as b, sum(c_int) as c from cbo_t1 where (cbo_t1.c_int + 1 >= 0) and (cbo_t1.c_int > 0 or cbo_t1.c_float >= 0) group by c_float, cbo_t1.c_int, key having cbo_t1.c_float > 0 and (c_int >=1 or c_float >= 1) and (c_int + c_float) >= 0 order by a+b desc, c asc limit 5) cbo_t1 left semi join (select key as p, c_int+1 as q, sum(c_int) as r from cbo_t2 where (cbo_t2.c_int + 1 >= 0) and (cbo_t2.c_int > 0 or cbo_t2.c_float >= 0) group by c_float, cbo_t2.c_int, key having cbo_t2.c_float > 0 and (c_int >=1 or c_float >= 1) and (c_int + c_float) >= 0 order by q+r/10 desc, p limit 5) cbo_t2 on cbo_t1.a=p left semi join cbo_t3 on cbo_t1.a=key where (b + 1 >= 0) and (b > 0 or a >= 0) group by a, c having a > 0 and (a >=1 or c >= 1) and (a + c) >= 0 order by c, a; + +set test.comment="Semijoin. 
Cache should be used now"; +set test.comment; +explain +select a, c, count(*) from (select key as a, c_int+1 as b, sum(c_int) as c from cbo_t1 where (cbo_t1.c_int + 1 >= 0) and (cbo_t1.c_int > 0 or cbo_t1.c_float >= 0) group by c_float, cbo_t1.c_int, key having cbo_t1.c_float > 0 and (c_int >=1 or c_float >= 1) and (c_int + c_float) >= 0 order by a+b desc, c asc limit 5) cbo_t1 left semi join (select key as p, c_int+1 as q, sum(c_int) as r from cbo_t2 where (cbo_t2.c_int + 1 >= 0) and (cbo_t2.c_int > 0 or cbo_t2.c_float >= 0) group by c_float, cbo_t2.c_int, key having cbo_t2.c_float > 0 and (c_int >=1 or c_float >= 1) and (c_int + c_float) >= 0 order by q+r/10 desc, p limit 5) cbo_t2 on cbo_t1.a=p left semi join cbo_t3 on cbo_t1.a=key where (b + 1 >= 0) and (b > 0 or a >= 0) group by a, c having a > 0 and (a >=1 or c >= 1) and (a + c) >= 0 order by c, a; +select a, c, count(*) from (select key as a, c_int+1 as b, sum(c_int) as c from cbo_t1 where (cbo_t1.c_int + 1 >= 0) and (cbo_t1.c_int > 0 or cbo_t1.c_float >= 0) group by c_float, cbo_t1.c_int, key having cbo_t1.c_float > 0 and (c_int >=1 or c_float >= 1) and (c_int + c_float) >= 0 order by a+b desc, c asc limit 5) cbo_t1 left semi join (select key as p, c_int+1 as q, sum(c_int) as r from cbo_t2 where (cbo_t2.c_int + 1 >= 0) and (cbo_t2.c_int > 0 or cbo_t2.c_float >= 0) group by c_float, cbo_t2.c_int, key having cbo_t2.c_float > 0 and (c_int >=1 or c_float >= 1) and (c_int + c_float) >= 0 order by q+r/10 desc, p limit 5) cbo_t2 on cbo_t1.a=p left semi join cbo_t3 on cbo_t1.a=key where (b + 1 >= 0) and (b > 0 or a >= 0) group by a, c having a > 0 and (a >=1 or c >= 1) and (a + c) >= 0 order by c, a; http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/test/queries/clientpositive/results_cache_2.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/results_cache_2.q b/ql/src/test/queries/clientpositive/results_cache_2.q new file mode 
100644 index 0000000..96a9092 --- /dev/null +++ b/ql/src/test/queries/clientpositive/results_cache_2.q @@ -0,0 +1,41 @@ + +set hive.query.results.cache.enabled=true; +set hive.fetch.task.conversion=more; + +-- Test 1: fetch task +explain +select key, value from src where key=0; +select key, value from src where key=0; + +set test.comment=Query only requires fetch task - should not use results cache; +set test.comment; +explain +select key, value from src where key=0; + + +-- Test 2: deterministic function should use cache. +select c1, count(*) +from (select sign(value) c1, value from src where key < 10) q +group by c1; + +set test.comment=This query should use the cache; +set test.comment; +explain +select c1, count(*) +from (select sign(value) c1, value from src where key < 10) q +group by c1; + +-- Test 3: non-deterministic functions should not be cached +-- Set current timestamp config to get repeatable result. +set hive.test.currenttimestamp=2012-01-01 01:02:03; + +select c1, count(*) +from (select current_timestamp c1, value from src where key < 10) q +group by c1; + +set test.comment=Queries using non-deterministic functions should not use results cache; +set test.comment; +explain +select c1, count(*) +from (select current_timestamp c1, value from src where key < 10) q +group by c1;