Copilot commented on code in PR #6376: URL: https://github.com/apache/hive/pull/6376#discussion_r3079630679
########## ql/src/test/org/apache/hadoop/hive/ql/TestCachedResults.java: ########## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hive.testutils.HiveTestEnvSetup; + +import org.junit.After; +import org.junit.Assert; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; + +import org.junit.rules.TestRule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class TestCachedResults { + + private static final Logger LOG = LoggerFactory.getLogger(TestCachedResults.class); + + @ClassRule + public static HiveTestEnvSetup envSetup = new HiveTestEnvSetup(); + + @Rule + public TestRule 
methodRule = envSetup.getMethodRule(); + + private static HiveConf conf; + private static String cacheDir; + + private ScheduledExecutorService scheduler; + private long maxCacheSize = 0; + private static long maxAllowedCacheSize = 1000000; + + + @BeforeClass + public static void setUp() throws Exception { + conf = envSetup.getTestCtx().hiveConf; + + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED, true); + HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_DIRECTORY, "/tmp/hive/cache"); + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_NONTRANSACTIONAL_TABLES_ENABLED, true); + HiveConf.setLongVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_SIZE, maxAllowedCacheSize); + LOG.info("max allowed cache size : {}", conf.getLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_SIZE)); + + cacheDir = conf.getVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_DIRECTORY); + + HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, + "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + + SessionState.start(conf); + createAndPopulateTables(); + } + + public static void createAndPopulateTables() throws Exception { + IDriver driver = DriverFactory.newDriver(conf); + runQuery(driver, "DROP TABLE IF EXISTS tab"); + runQuery(driver, "CREATE TABLE tab (id INT)"); + runQuery(driver, + "INSERT INTO TABLE tab " + + "SELECT pos + 1 AS id FROM (" + + " SELECT posexplode(split(space(999), ' ')) AS (pos, val)" + + ") t" + ); + } + + @AfterClass + public static void afterClass() throws Exception { + IDriver driver = DriverFactory.newDriver(conf); + driver.run("DROP TABLE IF EXISTS tab"); + } + + @Before + public void beforeEach() { + scheduler = Executors.newSingleThreadScheduledExecutor(); + maxCacheSize = 0; + } + + @After + public void afterEach() throws InterruptedException { + 
QueryResultsCache.cleanupInstance(); + Path cacheDirPath = new Path(cacheDir); + try { + FileSystem fs = cacheDirPath.getFileSystem(conf); + fs.delete(cacheDirPath, true); + } catch (IOException e) { + LOG.warn("Failed to clean up cache directory: {}", cacheDir, e); + } + scheduler.shutdownNow(); + scheduler.awaitTermination(1, TimeUnit.SECONDS); + } + + private void executeQueries(IDriver driver) throws Exception { + + runQuery(driver, + "SELECT t1.id, " + + " SUM(t2.id) OVER (PARTITION BY t1.id % 10 ORDER BY t1.id) AS running_sum, " + + " AVG(t2.id) OVER (PARTITION BY t1.id % 10) AS window_avg, " + + " COUNT(t2.id) OVER (PARTITION BY t1.id % 10 ORDER BY t1.id) AS running_cnt " + + "FROM tab t1 " + + "JOIN tab t2 ON t1.id % 10 = t2.id % 10 " + + "WHERE t1.id <= 300" + ); + // running same query to check for cache hit while debugging + runQuery(driver, + "SELECT t1.id, " + + " SUM(t2.id) OVER (PARTITION BY t1.id % 10 ORDER BY t1.id) AS running_sum, " + + " AVG(t2.id) OVER (PARTITION BY t1.id % 10) AS window_avg, " + + " COUNT(t2.id) OVER (PARTITION BY t1.id % 10 ORDER BY t1.id) AS running_cnt " + + "FROM tab t1 " + + "JOIN tab t2 ON t1.id % 10 = t2.id % 10 " + + "WHERE t1.id <= 300" + ); + runQuery(driver, + "SELECT base.id, base.bucket, agg.bucket_avg " + + "FROM ( " + + " SELECT id, id % 10 AS bucket FROM tab WHERE id <= 500 " + + ") base " + + "JOIN ( " + + " SELECT id % 10 AS bucket, AVG(id) AS bucket_avg, COUNT(*) AS cnt " + + " FROM tab " + + " GROUP BY id % 10 " + + ") agg ON base.bucket = agg.bucket " + + "ORDER BY base.id" + ); + // running same query to check for cache hit while debugging + runQuery(driver, + "SELECT base.id, base.bucket, agg.bucket_avg " + + "FROM ( " + + " SELECT id, id % 10 AS bucket FROM tab WHERE id <= 500 " + + ") base " + + "JOIN ( " + + " SELECT id % 10 AS bucket, AVG(id) AS bucket_avg, COUNT(*) AS cnt " + + " FROM tab " + + " GROUP BY id % 10 " + + ") agg ON base.bucket = agg.bucket " + + "ORDER BY base.id" + ); + 
runQuery(driver, + "WITH base AS ( " + + " SELECT id, id % 2 AS is_even, id % 5 AS mod5, id % 10 AS mod10 " + + " FROM tab " + + "), " + + "joined AS ( " + + " SELECT a.id AS a_id, b.id AS b_id, " + + " a.mod5, a.mod10, " + + " (a.id * b.id) AS product " + + " FROM base a " + + " JOIN base b ON a.mod5 = b.mod5 AND a.is_even = b.is_even " + + " WHERE a.id <= 200 AND b.id <= 200 " + + ") " + + "SELECT mod5, mod10, " + + " COUNT(*) AS cnt, " + + " SUM(product) AS total_product, " + + " MAX(product) AS max_product, " + + " MIN(a_id) AS min_a " + + "FROM joined " + + "GROUP BY mod5, mod10 " + + "ORDER BY mod5, mod10" + ); + // running same query to check for cache hit while debugging + runQuery(driver, + "WITH base AS ( " + + " SELECT id, id % 2 AS is_even, id % 5 AS mod5, id % 10 AS mod10 " + + " FROM tab " + + "), " + + "joined AS ( " + + " SELECT a.id AS a_id, b.id AS b_id, " + + " a.mod5, a.mod10, " + + " (a.id * b.id) AS product " + + " FROM base a " + + " JOIN base b ON a.mod5 = b.mod5 AND a.is_even = b.is_even " + + " WHERE a.id <= 200 AND b.id <= 200 " + + ") " + + "SELECT mod5, mod10, " + + " COUNT(*) AS cnt, " + + " SUM(product) AS total_product, " + + " MAX(product) AS max_product, " + + " MIN(a_id) AS min_a " + + "FROM joined " + + "GROUP BY mod5, mod10 " + + "ORDER BY mod5, mod10" + ); + } + + @Test + public void testSafeCacheWrite() throws Exception { + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED, true); Review Comment: The tests mutate a shared static `conf` but `testUnsafeCacheWrite` never explicitly sets `HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED` back to `false`. If `testSafeCacheWrite` runs first (test order is not guaranteed), `testUnsafeCacheWrite` will execute in safe mode and assert the opposite behavior. Make each test explicitly set the flag to the intended value (or reset it in `@Before` / `@After`). 
########## ql/src/test/org/apache/hadoop/hive/ql/TestCachedResults.java: ########## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hive.testutils.HiveTestEnvSetup; + +import org.junit.After; +import org.junit.Assert; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; + +import org.junit.rules.TestRule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class TestCachedResults { + + private static final Logger LOG = LoggerFactory.getLogger(TestCachedResults.class); + + @ClassRule + public static HiveTestEnvSetup envSetup = new HiveTestEnvSetup(); + + @Rule + public TestRule 
methodRule = envSetup.getMethodRule(); + + private static HiveConf conf; + private static String cacheDir; + + private ScheduledExecutorService scheduler; + private long maxCacheSize = 0; Review Comment: `maxCacheSize` is written from the scheduler thread and read from the test thread without any memory-visibility guarantees, which can make assertions flaky (e.g., reading a stale `0`). Use an `AtomicLong` (or make it `volatile` and update carefully) to ensure cross-thread visibility. ```suggestion private volatile long maxCacheSize = 0; ``` ########## ql/src/test/org/apache/hadoop/hive/ql/TestCachedResults.java: ########## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hive.testutils.HiveTestEnvSetup; + +import org.junit.After; +import org.junit.Assert; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; + +import org.junit.rules.TestRule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class TestCachedResults { + + private static final Logger LOG = LoggerFactory.getLogger(TestCachedResults.class); + + @ClassRule + public static HiveTestEnvSetup envSetup = new HiveTestEnvSetup(); + + @Rule + public TestRule methodRule = envSetup.getMethodRule(); + + private static HiveConf conf; + private static String cacheDir; + + private ScheduledExecutorService scheduler; + private long maxCacheSize = 0; + private static long maxAllowedCacheSize = 1000000; + + + @BeforeClass + public static void setUp() throws Exception { + conf = envSetup.getTestCtx().hiveConf; + + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED, true); + HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_DIRECTORY, "/tmp/hive/cache"); + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_NONTRANSACTIONAL_TABLES_ENABLED, true); + HiveConf.setLongVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_SIZE, maxAllowedCacheSize); + LOG.info("max allowed cache size : {}", conf.getLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_SIZE)); + + cacheDir = 
conf.getVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_DIRECTORY); + + HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, + "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + + SessionState.start(conf); + createAndPopulateTables(); + } + + public static void createAndPopulateTables() throws Exception { + IDriver driver = DriverFactory.newDriver(conf); + runQuery(driver, "DROP TABLE IF EXISTS tab"); + runQuery(driver, "CREATE TABLE tab (id INT)"); + runQuery(driver, + "INSERT INTO TABLE tab " + + "SELECT pos + 1 AS id FROM (" + + " SELECT posexplode(split(space(999), ' ')) AS (pos, val)" + + ") t" + ); + } + + @AfterClass + public static void afterClass() throws Exception { + IDriver driver = DriverFactory.newDriver(conf); + driver.run("DROP TABLE IF EXISTS tab"); + } + + @Before + public void beforeEach() { + scheduler = Executors.newSingleThreadScheduledExecutor(); + maxCacheSize = 0; + } + + @After + public void afterEach() throws InterruptedException { + QueryResultsCache.cleanupInstance(); + Path cacheDirPath = new Path(cacheDir); + try { + FileSystem fs = cacheDirPath.getFileSystem(conf); + fs.delete(cacheDirPath, true); + } catch (IOException e) { + LOG.warn("Failed to clean up cache directory: {}", cacheDir, e); + } + scheduler.shutdownNow(); + scheduler.awaitTermination(1, TimeUnit.SECONDS); + } + + private void executeQueries(IDriver driver) throws Exception { + + runQuery(driver, + "SELECT t1.id, " + + " SUM(t2.id) OVER (PARTITION BY t1.id % 10 ORDER BY t1.id) AS running_sum, " + + " AVG(t2.id) OVER (PARTITION BY t1.id % 10) AS window_avg, " + + " COUNT(t2.id) OVER (PARTITION BY t1.id % 10 ORDER BY t1.id) AS running_cnt " + + "FROM tab t1 " + + "JOIN tab t2 ON t1.id % 10 = t2.id % 10 " + + "WHERE t1.id <= 300" + ); + // running same query to check for cache hit while debugging + runQuery(driver, + "SELECT t1.id, " 
+ + " SUM(t2.id) OVER (PARTITION BY t1.id % 10 ORDER BY t1.id) AS running_sum, " + + " AVG(t2.id) OVER (PARTITION BY t1.id % 10) AS window_avg, " + + " COUNT(t2.id) OVER (PARTITION BY t1.id % 10 ORDER BY t1.id) AS running_cnt " + + "FROM tab t1 " + + "JOIN tab t2 ON t1.id % 10 = t2.id % 10 " + + "WHERE t1.id <= 300" + ); + runQuery(driver, + "SELECT base.id, base.bucket, agg.bucket_avg " + + "FROM ( " + + " SELECT id, id % 10 AS bucket FROM tab WHERE id <= 500 " + + ") base " + + "JOIN ( " + + " SELECT id % 10 AS bucket, AVG(id) AS bucket_avg, COUNT(*) AS cnt " + + " FROM tab " + + " GROUP BY id % 10 " + + ") agg ON base.bucket = agg.bucket " + + "ORDER BY base.id" + ); + // running same query to check for cache hit while debugging + runQuery(driver, + "SELECT base.id, base.bucket, agg.bucket_avg " + + "FROM ( " + + " SELECT id, id % 10 AS bucket FROM tab WHERE id <= 500 " + + ") base " + + "JOIN ( " + + " SELECT id % 10 AS bucket, AVG(id) AS bucket_avg, COUNT(*) AS cnt " + + " FROM tab " + + " GROUP BY id % 10 " + + ") agg ON base.bucket = agg.bucket " + + "ORDER BY base.id" + ); + runQuery(driver, + "WITH base AS ( " + + " SELECT id, id % 2 AS is_even, id % 5 AS mod5, id % 10 AS mod10 " + + " FROM tab " + + "), " + + "joined AS ( " + + " SELECT a.id AS a_id, b.id AS b_id, " + + " a.mod5, a.mod10, " + + " (a.id * b.id) AS product " + + " FROM base a " + + " JOIN base b ON a.mod5 = b.mod5 AND a.is_even = b.is_even " + + " WHERE a.id <= 200 AND b.id <= 200 " + + ") " + + "SELECT mod5, mod10, " + + " COUNT(*) AS cnt, " + + " SUM(product) AS total_product, " + + " MAX(product) AS max_product, " + + " MIN(a_id) AS min_a " + + "FROM joined " + + "GROUP BY mod5, mod10 " + + "ORDER BY mod5, mod10" + ); + // running same query to check for cache hit while debugging + runQuery(driver, + "WITH base AS ( " + + " SELECT id, id % 2 AS is_even, id % 5 AS mod5, id % 10 AS mod10 " + + " FROM tab " + + "), " + + "joined AS ( " + + " SELECT a.id AS a_id, b.id AS b_id, " + + " 
a.mod5, a.mod10, " + + " (a.id * b.id) AS product " + + " FROM base a " + + " JOIN base b ON a.mod5 = b.mod5 AND a.is_even = b.is_even " + + " WHERE a.id <= 200 AND b.id <= 200 " + + ") " + + "SELECT mod5, mod10, " + + " COUNT(*) AS cnt, " + + " SUM(product) AS total_product, " + + " MAX(product) AS max_product, " + + " MIN(a_id) AS min_a " + + "FROM joined " + + "GROUP BY mod5, mod10 " + + "ORDER BY mod5, mod10" + ); + } + + @Test + public void testSafeCacheWrite() throws Exception { + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED, true); + startCacheMonitor(1); + IDriver driver = DriverFactory.newDriver(conf); + executeQueries(driver); + stopCacheMonitor(); + Assert.assertNotEquals("cache size should have grown", 0, maxCacheSize); + LOG.info("Maximum cache size in safe mode went upto : {}", maxCacheSize); + Assert.assertFalse("max cache size recorded should be smaller than max allowed cache size", + maxCacheSize > maxAllowedCacheSize); + } + + @Test + public void testUnsafeCacheWrite() throws Exception { + startCacheMonitor(1); + IDriver driver = DriverFactory.newDriver(conf); + executeQueries(driver); + stopCacheMonitor(); + Assert.assertNotEquals("cache size should have grown", 0, maxCacheSize); + LOG.info("Maximum cache size in non safe mode went upto : {}", maxCacheSize); + Assert.assertFalse("max cache size recorded should be greater than max allowed cache size", + maxCacheSize < maxAllowedCacheSize); + } + + private void startCacheMonitor(long intervalMs) { + scheduler.scheduleAtFixedRate(() -> { + long size = getFolderSize(new File(cacheDir)); + maxCacheSize = Math.max(maxCacheSize, size); + }, 0, intervalMs, TimeUnit.MILLISECONDS); + } Review Comment: `maxCacheSize` is written from the scheduler thread and read from the test thread without any memory-visibility guarantees, which can make assertions flaky (e.g., reading a stale `0`). 
Use an `AtomicLong` (e.g., `accumulateAndGet(size, Math::max)`) or declare the field `volatile` to ensure cross-thread visibility. `volatile` alone is sufficient here because the single-threaded scheduler is the only writer while the monitor runs (`scheduleAtFixedRate` executions never overlap). ########## ql/src/test/org/apache/hadoop/hive/ql/TestCachedResults.java: ########## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.hive.ql; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hive.testutils.HiveTestEnvSetup; + +import org.junit.After; +import org.junit.Assert; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; + +import org.junit.rules.TestRule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class TestCachedResults { + + private static final Logger LOG = LoggerFactory.getLogger(TestCachedResults.class); + + @ClassRule + public static HiveTestEnvSetup envSetup = new HiveTestEnvSetup(); + + @Rule + public TestRule methodRule = envSetup.getMethodRule(); + + private static HiveConf conf; + private static String cacheDir; + + private ScheduledExecutorService scheduler; + private long maxCacheSize = 0; + private static long maxAllowedCacheSize = 1000000; + + + @BeforeClass + public static void setUp() throws Exception { + conf = envSetup.getTestCtx().hiveConf; + + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED, true); + HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_DIRECTORY, "/tmp/hive/cache"); + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_NONTRANSACTIONAL_TABLES_ENABLED, true); + HiveConf.setLongVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_SIZE, maxAllowedCacheSize); + LOG.info("max allowed cache size : {}", conf.getLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_SIZE)); + + cacheDir = 
conf.getVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_DIRECTORY); + + HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, + "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + + SessionState.start(conf); + createAndPopulateTables(); + } + + public static void createAndPopulateTables() throws Exception { + IDriver driver = DriverFactory.newDriver(conf); + runQuery(driver, "DROP TABLE IF EXISTS tab"); + runQuery(driver, "CREATE TABLE tab (id INT)"); + runQuery(driver, + "INSERT INTO TABLE tab " + + "SELECT pos + 1 AS id FROM (" + + " SELECT posexplode(split(space(999), ' ')) AS (pos, val)" + + ") t" + ); + } + + @AfterClass + public static void afterClass() throws Exception { + IDriver driver = DriverFactory.newDriver(conf); + driver.run("DROP TABLE IF EXISTS tab"); + } + + @Before + public void beforeEach() { + scheduler = Executors.newSingleThreadScheduledExecutor(); + maxCacheSize = 0; + } + + @After + public void afterEach() throws InterruptedException { + QueryResultsCache.cleanupInstance(); + Path cacheDirPath = new Path(cacheDir); + try { + FileSystem fs = cacheDirPath.getFileSystem(conf); + fs.delete(cacheDirPath, true); + } catch (IOException e) { + LOG.warn("Failed to clean up cache directory: {}", cacheDir, e); + } + scheduler.shutdownNow(); + scheduler.awaitTermination(1, TimeUnit.SECONDS); + } + + private void executeQueries(IDriver driver) throws Exception { + + runQuery(driver, + "SELECT t1.id, " + + " SUM(t2.id) OVER (PARTITION BY t1.id % 10 ORDER BY t1.id) AS running_sum, " + + " AVG(t2.id) OVER (PARTITION BY t1.id % 10) AS window_avg, " + + " COUNT(t2.id) OVER (PARTITION BY t1.id % 10 ORDER BY t1.id) AS running_cnt " + + "FROM tab t1 " + + "JOIN tab t2 ON t1.id % 10 = t2.id % 10 " + + "WHERE t1.id <= 300" + ); + // running same query to check for cache hit while debugging + runQuery(driver, + "SELECT t1.id, " 
+ + " SUM(t2.id) OVER (PARTITION BY t1.id % 10 ORDER BY t1.id) AS running_sum, " + + " AVG(t2.id) OVER (PARTITION BY t1.id % 10) AS window_avg, " + + " COUNT(t2.id) OVER (PARTITION BY t1.id % 10 ORDER BY t1.id) AS running_cnt " + + "FROM tab t1 " + + "JOIN tab t2 ON t1.id % 10 = t2.id % 10 " + + "WHERE t1.id <= 300" + ); + runQuery(driver, + "SELECT base.id, base.bucket, agg.bucket_avg " + + "FROM ( " + + " SELECT id, id % 10 AS bucket FROM tab WHERE id <= 500 " + + ") base " + + "JOIN ( " + + " SELECT id % 10 AS bucket, AVG(id) AS bucket_avg, COUNT(*) AS cnt " + + " FROM tab " + + " GROUP BY id % 10 " + + ") agg ON base.bucket = agg.bucket " + + "ORDER BY base.id" + ); + // running same query to check for cache hit while debugging + runQuery(driver, + "SELECT base.id, base.bucket, agg.bucket_avg " + + "FROM ( " + + " SELECT id, id % 10 AS bucket FROM tab WHERE id <= 500 " + + ") base " + + "JOIN ( " + + " SELECT id % 10 AS bucket, AVG(id) AS bucket_avg, COUNT(*) AS cnt " + + " FROM tab " + + " GROUP BY id % 10 " + + ") agg ON base.bucket = agg.bucket " + + "ORDER BY base.id" + ); + runQuery(driver, + "WITH base AS ( " + + " SELECT id, id % 2 AS is_even, id % 5 AS mod5, id % 10 AS mod10 " + + " FROM tab " + + "), " + + "joined AS ( " + + " SELECT a.id AS a_id, b.id AS b_id, " + + " a.mod5, a.mod10, " + + " (a.id * b.id) AS product " + + " FROM base a " + + " JOIN base b ON a.mod5 = b.mod5 AND a.is_even = b.is_even " + + " WHERE a.id <= 200 AND b.id <= 200 " + + ") " + + "SELECT mod5, mod10, " + + " COUNT(*) AS cnt, " + + " SUM(product) AS total_product, " + + " MAX(product) AS max_product, " + + " MIN(a_id) AS min_a " + + "FROM joined " + + "GROUP BY mod5, mod10 " + + "ORDER BY mod5, mod10" + ); + // running same query to check for cache hit while debugging + runQuery(driver, + "WITH base AS ( " + + " SELECT id, id % 2 AS is_even, id % 5 AS mod5, id % 10 AS mod10 " + + " FROM tab " + + "), " + + "joined AS ( " + + " SELECT a.id AS a_id, b.id AS b_id, " + + " 
a.mod5, a.mod10, " + + " (a.id * b.id) AS product " + + " FROM base a " + + " JOIN base b ON a.mod5 = b.mod5 AND a.is_even = b.is_even " + + " WHERE a.id <= 200 AND b.id <= 200 " + + ") " + + "SELECT mod5, mod10, " + + " COUNT(*) AS cnt, " + + " SUM(product) AS total_product, " + + " MAX(product) AS max_product, " + + " MIN(a_id) AS min_a " + + "FROM joined " + + "GROUP BY mod5, mod10 " + + "ORDER BY mod5, mod10" + ); + } + + @Test + public void testSafeCacheWrite() throws Exception { + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED, true); + startCacheMonitor(1); + IDriver driver = DriverFactory.newDriver(conf); + executeQueries(driver); + stopCacheMonitor(); + Assert.assertNotEquals("cache size should have grown", 0, maxCacheSize); + LOG.info("Maximum cache size in safe mode went upto : {}", maxCacheSize); + Assert.assertFalse("max cache size recorded should be smaller than max allowed cache size", + maxCacheSize > maxAllowedCacheSize); + } + + @Test + public void testUnsafeCacheWrite() throws Exception { Review Comment: The tests mutate a shared static `conf` but `testUnsafeCacheWrite` never explicitly sets `HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED` back to `false`. If `testSafeCacheWrite` runs first (test order is not guaranteed), `testUnsafeCacheWrite` will execute in safe mode and assert the opposite behavior. Make each test explicitly set the flag to the intended value (or reset it in `@Before` / `@After`). ```suggestion public void testUnsafeCacheWrite() throws Exception { HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED, false); ``` ########## ql/src/test/org/apache/hadoop/hive/ql/TestCachedResults.java: ########## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hive.testutils.HiveTestEnvSetup; + +import org.junit.After; +import org.junit.Assert; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; + +import org.junit.rules.TestRule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class TestCachedResults { + + private static final Logger LOG = LoggerFactory.getLogger(TestCachedResults.class); + + @ClassRule + public static HiveTestEnvSetup envSetup = new HiveTestEnvSetup(); + + @Rule + public TestRule methodRule = envSetup.getMethodRule(); + + private static HiveConf conf; + private static String cacheDir; + + private ScheduledExecutorService scheduler; + private long maxCacheSize = 0; + private static long maxAllowedCacheSize = 1000000; + + + @BeforeClass + public static void setUp() throws Exception { + conf = 
envSetup.getTestCtx().hiveConf; + + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED, true); + HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_DIRECTORY, "/tmp/hive/cache"); Review Comment: Hard-coding `/tmp/hive/cache` in a test can cause collisions across concurrent test runs and may fail on environments without a writable `/tmp` (or on non-Unix setups). Prefer using a per-test temporary directory (e.g., test framework temp dirs / `java.nio.file.Files#createTempDirectory`) and set the conf to that path. ########## ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java: ########## @@ -549,6 +578,39 @@ public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) { return false; } + if (isSafeCacheWriteEnabled) { + Path resultDir = new Path(cacheDirPath, UUID.randomUUID().toString()); + FileSystem cacheFs = resultDir.getFileSystem(conf); + cacheFs.mkdirs(resultDir); + + Set<FileStatus> cacheFilesToFetch = new HashSet<>(); + rwLock.writeLock().lock(); + boolean succeeded = true; + try { + for (FileStatus fs : fetchWork.getFilesToFetch()) { + FileSystem srcFs = fs.getPath().getFileSystem(conf); + Path srcFile = fs.getPath(); + Path destFile = new Path(resultDir, + new Path(fs.getPath().toString().substring(safeDir.length() + 1))); Review Comment: Building relative paths via `substring(safeDir.length() + 1)` is fragile: it assumes the string form of each path always starts with `safeDir` (same scheme/authority, same qualification, no trailing slash differences). If that assumption breaks, it can produce incorrect paths or throw `StringIndexOutOfBoundsException`. Prefer computing a proper relative path using qualified `Path`/`URI` comparison (e.g., qualify both paths and derive the relative suffix from URI paths) and validate that the source is inside `safeDir` before deriving the destination. 
########## ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java: ########## @@ -549,6 +578,39 @@ public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) { return false; } + if (isSafeCacheWriteEnabled) { + Path resultDir = new Path(cacheDirPath, UUID.randomUUID().toString()); + FileSystem cacheFs = resultDir.getFileSystem(conf); + cacheFs.mkdirs(resultDir); + + Set<FileStatus> cacheFilesToFetch = new HashSet<>(); + rwLock.writeLock().lock(); + boolean succeeded = true; + try { + for (FileStatus fs : fetchWork.getFilesToFetch()) { + FileSystem srcFs = fs.getPath().getFileSystem(conf); + Path srcFile = fs.getPath(); + Path destFile = new Path(resultDir, + new Path(fs.getPath().toString().substring(safeDir.length() + 1))); + succeeded = FileUtil.copy(srcFs, srcFile, cacheFs, destFile, false, conf); + if (!succeeded) { + throw new IOException("File copy failed for " + srcFile + " -> " + destFile); + } + cacheFilesToFetch.add(cacheFs.getFileStatus(destFile)); + } + } catch (IOException e) { + LOG.warn("Failed to write cache entry to {}", resultDir, e); + } finally { + rwLock.writeLock().unlock(); + } + if (!succeeded) { + removeInvalidStaleFiles(cacheFs, cacheFilesToFetch); + return false; + } + fetchWork.setFilesToFetch(cacheFilesToFetch); + fetchWork.setTblDir(new Path(resultDir, fetchWork.getTblDir().toString().substring(safeDir.length() + 1))); + } Review Comment: If an `IOException` is thrown in the try-block for reasons other than `FileUtil.copy` returning `false`, the `catch` logs but does not set `succeeded = false`. That can allow the method to proceed with a partial/inconsistent `cacheFilesToFetch` set and rewritten `tblDir`. Set `succeeded = false` in the catch (and/or rethrow) and ensure the failure path executes. 
########## ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java: ########## @@ -549,6 +578,39 @@ public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) { return false; } + if (isSafeCacheWriteEnabled) { + Path resultDir = new Path(cacheDirPath, UUID.randomUUID().toString()); + FileSystem cacheFs = resultDir.getFileSystem(conf); + cacheFs.mkdirs(resultDir); + + Set<FileStatus> cacheFilesToFetch = new HashSet<>(); + rwLock.writeLock().lock(); + boolean succeeded = true; + try { + for (FileStatus fs : fetchWork.getFilesToFetch()) { Review Comment: The cache write lock is held while performing potentially slow filesystem copies. This can block unrelated cache operations for the duration of the copy and hurt concurrency under load. Consider copying files outside the cache rwLock, and only acquiring the lock for the minimal critical section where shared cache state is updated. ########## ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java: ########## @@ -549,6 +578,39 @@ public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) { return false; } + if (isSafeCacheWriteEnabled) { + Path resultDir = new Path(cacheDirPath, UUID.randomUUID().toString()); + FileSystem cacheFs = resultDir.getFileSystem(conf); + cacheFs.mkdirs(resultDir); + + Set<FileStatus> cacheFilesToFetch = new HashSet<>(); + rwLock.writeLock().lock(); + boolean succeeded = true; + try { + for (FileStatus fs : fetchWork.getFilesToFetch()) { + FileSystem srcFs = fs.getPath().getFileSystem(conf); + Path srcFile = fs.getPath(); + Path destFile = new Path(resultDir, + new Path(fs.getPath().toString().substring(safeDir.length() + 1))); + succeeded = FileUtil.copy(srcFs, srcFile, cacheFs, destFile, false, conf); Review Comment: The cache write lock is held while performing potentially slow filesystem copies. This can block unrelated cache operations for the duration of the copy and hurt concurrency under load. 
Consider copying files outside the cache rwLock, and only acquiring the lock for the minimal critical section where shared cache state is updated. ########## common/src/java/org/apache/hadoop/hive/conf/HiveConf.java: ########## @@ -5670,7 +5670,9 @@ public static enum ConfVars { HIVE_QUERY_RESULTS_CACHE_ENABLED("hive.query.results.cache.enabled", true, "If the query results cache is enabled. This will keep results of previously executed queries " + "to be reused if the same query is executed again."), - + HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED("hive.query.results.safe.cache.write.enabled", false, + "If the query results safe cache is enabled. This will safely write to cache directory by first evaluating " + + "the cache entry is not overspilling the the cache directory before writing it to cache directory "), Review Comment: The new conf var description has grammatical issues (e.g., duplicated 'the', awkward phrasing) and it doesn’t clearly describe the actual behavior implemented (write results to a non-cache destination first, then copy into cache only after the entry is deemed valid/acceptable). Please reword to be precise and user-facing. 
```suggestion "If enabled, query results are written to a temporary non-cache location first and copied into the " + "cache directory only after the cache entry is accepted as valid, for example after size checks."), ``` ########## ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java: ########## @@ -7500,19 +7500,24 @@ protected Table getTargetTable(QB qb, String dest) throws SemanticException { } private Path getDestinationFilePath(QB qb, final String destinationFile, boolean isMmTable) { + Path defaultPath = new Path(destinationFile); if (this.isResultsCacheEnabled() && this.queryTypeCanUseCache(qb)) { assert (!isMmTable); QueryResultsCache instance = QueryResultsCache.getInstance(); // QueryResultsCache should have been initialized by now if (instance != null) { - Path resultCacheTopDir = instance.getCacheDirPath(); - String dirName = UUID.randomUUID().toString(); - Path resultDir = new Path(resultCacheTopDir, dirName); - this.ctx.setFsResultCacheDirs(resultDir); - return resultDir; + if (!conf.getBoolVar(ConfVars.HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED)) { + Path resultCacheTopDir = instance.getCacheDirPath(); + String dirName = UUID.randomUUID().toString(); + Path resultDir = new Path(resultCacheTopDir, dirName); + this.ctx.setFsResultCacheDirs(resultDir); + return resultDir; + } else { + instance.setSafeDir(defaultPath.toString()); Review Comment: `setSafeDir` stores request-specific state on the singleton `QueryResultsCache` instance. In a multi-session / multi-query JVM, concurrent queries can overwrite `safeDir`, causing incorrect path computations and potentially copying/deleting the wrong files. Move this state to something query-scoped (e.g., store the safe source root on the `CacheEntry`/`QueryInfo`/`FetchWork`, or pass it as a parameter into `setEntryValid`) instead of keeping it on the shared cache singleton. 
```suggestion ``` ########## ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java: ########## @@ -549,6 +578,39 @@ public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) { return false; } + if (isSafeCacheWriteEnabled) { + Path resultDir = new Path(cacheDirPath, UUID.randomUUID().toString()); + FileSystem cacheFs = resultDir.getFileSystem(conf); + cacheFs.mkdirs(resultDir); + + Set<FileStatus> cacheFilesToFetch = new HashSet<>(); + rwLock.writeLock().lock(); + boolean succeeded = true; + try { + for (FileStatus fs : fetchWork.getFilesToFetch()) { + FileSystem srcFs = fs.getPath().getFileSystem(conf); + Path srcFile = fs.getPath(); + Path destFile = new Path(resultDir, + new Path(fs.getPath().toString().substring(safeDir.length() + 1))); + succeeded = FileUtil.copy(srcFs, srcFile, cacheFs, destFile, false, conf); + if (!succeeded) { + throw new IOException("File copy failed for " + srcFile + " -> " + destFile); + } + cacheFilesToFetch.add(cacheFs.getFileStatus(destFile)); + } + } catch (IOException e) { + LOG.warn("Failed to write cache entry to {}", resultDir, e); + } finally { + rwLock.writeLock().unlock(); + } + if (!succeeded) { + removeInvalidStaleFiles(cacheFs, cacheFilesToFetch); + return false; + } + fetchWork.setFilesToFetch(cacheFilesToFetch); + fetchWork.setTblDir(new Path(resultDir, fetchWork.getTblDir().toString().substring(safeDir.length() + 1))); Review Comment: Building relative paths via `substring(safeDir.length() + 1)` is fragile: it assumes the string form of each path always starts with `safeDir` (same scheme/authority, same qualification, no trailing slash differences). If that assumption breaks, it can produce incorrect paths or throw `StringIndexOutOfBoundsException`. Prefer computing a proper relative path using qualified `Path`/`URI` comparison (e.g., qualify both paths and derive the relative suffix from URI paths) and validate that the source is inside `safeDir` before deriving the destination. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
