This is an automated email from the ASF dual-hosted git repository. hashutosh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new f7e9d9b HIVE-24270: Move scratchdir cleanup to background f7e9d9b is described below commit f7e9d9b14e9f1fb266aefa9cad73d509d9d614af Author: Mustafa Iman <mustafai...@gmail.com> AuthorDate: Tue Oct 13 14:14:10 2020 -0700 HIVE-24270: Move scratchdir cleanup to background Signed-off-by: Ashutosh Chauhan <hashut...@apache.org> --- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 5 + ql/src/java/org/apache/hadoop/hive/ql/Context.java | 13 +- .../hadoop/hive/ql/cleanup/CleanupService.java | 38 ++++++ .../hive/ql/cleanup/EventualCleanupService.java | 145 ++++++++++++++++++++ .../hadoop/hive/ql/cleanup/SyncCleanupService.java | 68 +++++++++ .../hadoop/hive/ql/session/SessionState.java | 17 ++- .../hadoop/hive/ql/cleanup/TestCleanupService.java | 152 +++++++++++++++++++++ .../hive/service/cli/session/HiveSessionImpl.java | 7 +- .../hive/service/cli/session/SessionManager.java | 18 +++ 9 files changed, 451 insertions(+), 12 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index edaa75b..45a44e9 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -5238,6 +5238,11 @@ public class HiveConf extends Configuration { HIVE_SCHEDULED_QUERIES_MAX_EXECUTORS("hive.scheduled.queries.max.executors", 4, new RangeValidator(1, null), "Maximal number of scheduled query executors to allow."), + HIVE_ASYNC_CLEANUP_SERVICE_THREAD_COUNT("hive.async.cleanup.service.thread.count", 10, new RangeValidator(0, null), + "Number of threads that run some eventual cleanup operations after queries/sessions close. 0 means cleanup is sync."), + HIVE_ASYNC_CLEANUP_SERVICE_QUEUE_SIZE("hive.async.cleanup.service.queue.size", 10000, new RangeValidator(10, Integer.MAX_VALUE), + "Size of the async cleanup queue. If cleanup queue is full, cleanup operations become synchronous. " + + "Applicable only when number of async cleanup is turned on."), HIVE_QUERY_RESULTS_CACHE_ENABLED("hive.query.results.cache.enabled", true, "If the query results cache is enabled. This will keep results of previously executed queries " + "to be reused if the same query is executed again."), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index a41c5c8..e4141fe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -673,22 +673,21 @@ public class Context { if(this.fsResultCacheDirs != null) { resultCacheDir = this.fsResultCacheDirs.toUri().getPath(); } - for (Map.Entry<String, Path> entry : fsScratchDirs.entrySet()) { + SessionState sessionState = SessionState.get(); + for (Path p: fsScratchDirs.values()) { try { - Path p = entry.getValue(); if (p.toUri().getPath().contains(stagingDir) && subDirOf(p, fsScratchDirs.values()) ) { LOG.debug("Skip deleting stagingDir: " + p); FileSystem fs = p.getFileSystem(conf); fs.cancelDeleteOnExit(p); continue; // staging dir is deleted when deleting the scratch dir } - if(resultCacheDir == null || !p.toUri().getPath().contains(resultCacheDir)) { + if (resultCacheDir == null || !p.toUri().getPath().contains(resultCacheDir)) { // delete only the paths which aren't result cache dir path // because that will be taken care by removeResultCacheDir - FileSystem fs = p.getFileSystem(conf); - LOG.debug("Deleting scratch dir: {}", p); - fs.delete(p, true); - fs.cancelDeleteOnExit(p); + FileSystem fs = p.getFileSystem(conf); + LOG.info("Deleting scratch dir: {}", p); + sessionState.getCleanupService().deleteRecursive(p, fs); } } catch (Exception e) { LOG.warn("Error Removing Scratch: " diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cleanup/CleanupService.java b/ql/src/java/org/apache/hadoop/hive/ql/cleanup/CleanupService.java new file mode 100644 index 0000000..919298e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/cleanup/CleanupService.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.cleanup; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; + +/** + * A service to remove temporary files at the end of a query/session + */ +public interface CleanupService { + + void start(); + + void shutdown(); + + void shutdownNow(); + + boolean deleteRecursive(Path path, FileSystem fileSystem) throws IOException; +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cleanup/EventualCleanupService.java b/ql/src/java/org/apache/hadoop/hive/ql/cleanup/EventualCleanupService.java new file mode 100644 index 0000000..fcfde42 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/cleanup/EventualCleanupService.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.cleanup; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +public class EventualCleanupService implements CleanupService { + private final int threadCount; + private final int queueSize; + private final ThreadFactory factory; + private final Logger LOG = LoggerFactory.getLogger(EventualCleanupService.class.getName()); + private final AtomicBoolean isRunning = new AtomicBoolean(true); + private final BlockingQueue<AsyncDeleteAction> deleteActions; + private ExecutorService cleanerExecutorService; + + public EventualCleanupService(int threadCount, int queueSize) { + if (queueSize < threadCount) { + throw new IllegalArgumentException("Queue size should be greater or equal to thread count. Queue size: " + + queueSize + ", thread count: " + threadCount); + } + this.factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("EventualCleanupService thread %d").build(); + this.threadCount = threadCount; + this.queueSize = queueSize; + this.deleteActions = new LinkedBlockingQueue<>(queueSize); + } + + @Override + public synchronized void start() { + if (cleanerExecutorService != null) { + LOG.debug("EventualCleanupService is already running."); + return; + } + cleanerExecutorService = Executors.newFixedThreadPool(threadCount, factory); + for (int i = 0; i < threadCount; i++) { + cleanerExecutorService.submit(new CleanupRunnable()); + } + LOG.info("EventualCleanupService started with {} threads and queue of size {}", threadCount, queueSize); + } + + @Override + public boolean deleteRecursive(Path path, FileSystem fileSystem) { + if (isRunning.get()) { + if (deleteActions.offer(new AsyncDeleteAction(path, fileSystem))) { + LOG.info("Delete {} operation was queued", path); + } else { + try { + fileSystem.cancelDeleteOnExit(path); + fileSystem.delete(path, true); + LOG.info("Deleted {} synchronously as the async queue was full", path); + } catch (IOException e) { + LOG.warn("Error removing path {}: {}", path, e); + } + } + + return true; + } else { + LOG.warn("Delete request {} was ignored as cleanup service is shutting down", path); + return false; + } + } + + @Override + public void shutdown() { + isRunning.set(false); + cleanerExecutorService.shutdown(); + } + + @Override + public void shutdownNow() { + isRunning.set(false); + cleanerExecutorService.shutdownNow(); + } + + @VisibleForTesting + public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException { + return cleanerExecutorService.awaitTermination(timeout, timeUnit); + } + + + private static class AsyncDeleteAction { + Path path; + FileSystem fileSystem; + + public AsyncDeleteAction(Path path, FileSystem fileSystem) { + this.path = path; + this.fileSystem = fileSystem; + } + } + + private class CleanupRunnable implements Runnable { + @Override + public void run() { + while (isRunning.get() || deleteActions.size() > 0) { + try { + AsyncDeleteAction deleteAction = deleteActions.poll(1, TimeUnit.MINUTES); + if (deleteAction != null) { + Path path = null; + try { + FileSystem fs = deleteAction.fileSystem; + path = deleteAction.path; + fs.delete(path, true); + fs.cancelDeleteOnExit(path); + LOG.info("Deleted {}", path); + } catch (IOException e) { + LOG.warn("Error removing path {}: {}", path, e); + } + } + } catch (InterruptedException e) { + LOG.debug("PathCleaner was interrupted"); + } + } + LOG.info("Cleanup thread shutdown shutdown"); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cleanup/SyncCleanupService.java b/ql/src/java/org/apache/hadoop/hive/ql/cleanup/SyncCleanupService.java new file mode 100644 index 0000000..597f6a2 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/cleanup/SyncCleanupService.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.cleanup; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Dummy cleanup service that just synchronously deletes files. + * This is created for use in background threads such as compactor + * or scheduled query runners. + */ +public class SyncCleanupService implements CleanupService { + + private static final Logger LOG = LoggerFactory.getLogger(SyncCleanupService.class.getName()); + + public static SyncCleanupService INSTANCE = new SyncCleanupService(); + + private SyncCleanupService() { + //no-op + } + + @Override + public void start() { + //no-op + } + + @Override + public boolean deleteRecursive(Path path, FileSystem fileSystem) throws IOException { + fileSystem.cancelDeleteOnExit(path); + if (fileSystem.delete(path, true)) { + LOG.info("Deleted directory: {} on fs with scheme {}", path, fileSystem.getScheme()); + return true; + } else { + return false; + } + } + + @Override + public void shutdown() { + //no-op + } + + @Override + public void shutdownNow() { + //no-op + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 1c3537f..658843a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -74,7 +74,9 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.cache.CachedStore; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.ql.cleanup.CleanupService; import org.apache.hadoop.hive.ql.MapRedStats; +import org.apache.hadoop.hive.ql.cleanup.SyncCleanupService; import org.apache.hadoop.hive.ql.exec.AddToClassPathAction; import org.apache.hadoop.hive.ql.exec.FunctionInfo; import org.apache.hadoop.hive.ql.exec.Registry; @@ -320,6 +322,8 @@ public class SessionState implements ISessionAuthState{ private final Registry registry; + private final CleanupService cleanupService; + /** * Used to cache functions in use for a query, during query planning * and is later used for function usage authorization. @@ -432,6 +436,10 @@ public class SessionState implements ISessionAuthState{ } public SessionState(HiveConf conf, String userName) { + this(conf, userName, SyncCleanupService.INSTANCE); + } + + public SessionState(HiveConf conf, String userName, CleanupService cleanupService) { this.sessionConf = conf; this.userName = userName; this.registry = new Registry(false); @@ -458,6 +466,7 @@ public class SessionState implements ISessionAuthState{ resourceDownloader = new ResourceDownloader(conf, HiveConf.getVar(conf, ConfVars.DOWNLOADED_RESOURCES_DIR)); killQuery = new NullKillQuery(); + this.cleanupService = cleanupService; ShimLoader.getHadoopShims().setHadoopSessionContext(getSessionId()); } @@ -909,6 +918,10 @@ public class SessionState implements ISessionAuthState{ } } + public CleanupService getCleanupService() { + return cleanupService; + } + private void dropSessionPaths(Configuration conf) throws IOException { if (hdfsSessionPath != null) { if (hdfsSessionPathLockFile != null) { @@ -935,9 +948,7 @@ public class SessionState implements ISessionAuthState{ } else { fs = path.getFileSystem(conf); } - fs.cancelDeleteOnExit(path); - fs.delete(path, true); - LOG.info("Deleted directory: {} on fs with scheme {}", path, fs.getScheme()); + cleanupService.deleteRecursive(path, fs); } catch (IllegalArgumentException | UnsupportedOperationException | IOException e) { LOG.error("Failed to delete path at {} on fs with scheme {}", path, (fs == null ? "Unknown-null" : fs.getScheme()), e); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/cleanup/TestCleanupService.java b/ql/src/test/org/apache/hadoop/hive/ql/cleanup/TestCleanupService.java new file mode 100644 index 0000000..9538a72 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/cleanup/TestCleanupService.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.cleanup; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.After; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.TimeUnit; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class TestCleanupService { + + private static final String TEMP_DIR = TestCleanupService.class.getName() + "-tempdir"; + private CleanupService cleanupService; + + @After + public void tearDown() throws IOException { + if (cleanupService != null) { + cleanupService.shutdownNow(); + } + Path p = new Path(TEMP_DIR); + FileSystem fs = p.getFileSystem(new Configuration()); + fs.delete(p, true); + } + + @Test(expected = IllegalArgumentException.class) + public void testEventualCleanupService_throwsWhenMisconfigured() { + cleanupService = new EventualCleanupService(10, 4); + } + + @Test + public void testEventualCleanupService_deletesManyFiles() throws IOException, InterruptedException { + testDeleteManyFiles(new EventualCleanupService(4, 1000), 1000); + } + + @Test + public void testEventualCleanupService_deletesManyFilesWithQueueSize4() throws IOException, InterruptedException { + testDeleteManyFiles(new EventualCleanupService(4, 4), 100); + } + + @Test + public void testSyncCleanupService_deletesManyFiles() throws IOException, InterruptedException { + testDeleteManyFiles(SyncCleanupService.INSTANCE, 10); + } + + @Test + public void testEventualCleanupService_finishesCleanupBeforeExit() throws IOException, InterruptedException { + EventualCleanupService cleanupService = new EventualCleanupService(4, 1000); + testDeleteManyFiles(cleanupService, 1000, true); + assertTrue(cleanupService.await(1, TimeUnit.SECONDS)); + } + + private void testDeleteManyFiles(CleanupService cleanupService, int n) throws IOException, InterruptedException { + testDeleteManyFiles(cleanupService, n, false); + } + + private void testDeleteManyFiles(CleanupService cleanupService, int n, + boolean shutdownAfterQueueing) throws IOException, InterruptedException { + this.cleanupService = cleanupService; + Configuration conf = new Configuration(); + cleanupService.start(); + + Collection<Path> files = createManyFiles(n); + + for (Path p: files) { + cleanupService.deleteRecursive(p, p.getFileSystem(conf)); + } + + if (shutdownAfterQueueing) { + cleanupService.shutdown(); + } + + assertTrueEventually(() -> { + try { + for (Path p : files) { + FileSystem fs = p.getFileSystem(conf); + assertNotNull(fs); + assertFalse(p + " should not exist", fs.exists(p)); + } + } catch (Exception e) { + throw new AssertionError(e); + } + }); + } + + private Path createFile(String name) throws IOException { + Path p = new Path(TEMP_DIR, name); + FileSystem fs = p.getFileSystem(new Configuration()); + fs.create(p); + return p; + } + + private Collection<Path> createManyFiles(int n) throws IOException { + Collection<Path> files = new ArrayList<>(); + for (int i = 0; i < n; i++) { + files.add(createFile("many_" + i)); + } + return files; + } + + private void assertTrueEventually(AssertTask assertTask) throws InterruptedException { + assertTrueEventually(assertTask, 100000); + } + + private void assertTrueEventually(AssertTask assertTask, int timeoutMillis) throws InterruptedException { + long endTime = System.currentTimeMillis() + timeoutMillis; + AssertionError assertionError = null; + + while (System.currentTimeMillis() < endTime) { + try { + assertTask.call(); + return; + } catch (AssertionError e) { + assertionError = e; + sleep(50); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + throw assertionError; + } + + private static interface AssertTask { + void call() throws AssertionError; + } +} diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index b415a76..82d039b 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -51,7 +51,6 @@ import org.apache.hadoop.hive.ql.processors.SetProcessor; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter; -import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hive.common.util.HiveVersionInfo; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.FetchOrientation; @@ -157,7 +156,11 @@ public class HiveSessionImpl implements HiveSession { * That's why it is important to create SessionState here rather than in the constructor. */ public void open(Map<String, String> sessionConfMap) throws HiveSQLException { - sessionState = new SessionState(sessionConf, username); + if (sessionManager != null) { + sessionState = new SessionState(sessionConf, username, sessionManager.getCleanupService()); + } else { + sessionState = new SessionState(sessionConf, username); + } sessionState.setUserIpAddress(ipAddress); sessionState.setIsHiveServerQuery(true); sessionState.setForwardedAddresses(SessionManager.getForwardedAddresses()); diff --git a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java index ae2c17b..11f8efd 100644 --- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java +++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java @@ -45,7 +45,10 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; import org.apache.hadoop.hive.common.metrics.common.MetricsVariable; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.cleanup.SyncCleanupService; import org.apache.hadoop.hive.ql.hooks.HookContext; +import org.apache.hadoop.hive.ql.cleanup.CleanupService; +import org.apache.hadoop.hive.ql.cleanup.EventualCleanupService; import org.apache.hadoop.hive.ql.hooks.HookUtils; import org.apache.hive.service.CompositeService; import org.apache.hive.service.cli.HiveSQLException; @@ -101,6 +104,7 @@ public class SessionManager extends CompositeService { private final HiveServer2 hiveServer2; private String sessionImplWithUGIclassName; private String sessionImplclassName; + private CleanupService cleanupService; public SessionManager(HiveServer2 hiveServer2, boolean allowSessions) { super(SessionManager.class.getSimpleName()); @@ -135,6 +139,15 @@ public class SessionManager extends CompositeService { userIpAddressLimit = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS); LOG.info("Connections limit are user: {} ipaddress: {} user-ipaddress: {}", userLimit, ipAddressLimit, userIpAddressLimit); + + int cleanupThreadCount = hiveConf.getIntVar(ConfVars.HIVE_ASYNC_CLEANUP_SERVICE_THREAD_COUNT); + int cleanupQueueSize = hiveConf.getIntVar(ConfVars.HIVE_ASYNC_CLEANUP_SERVICE_QUEUE_SIZE); + if (cleanupThreadCount > 0) { + cleanupService = new EventualCleanupService(cleanupThreadCount, cleanupQueueSize); + } else { + cleanupService = SyncCleanupService.INSTANCE; + } + cleanupService.start(); super.init(hiveConf); } @@ -279,6 +292,10 @@ public class SessionManager extends CompositeService { } } + public CleanupService getCleanupService() { + return cleanupService; + } + private final Object timeoutCheckerLock = new Object(); private void startTimeoutChecker() { @@ -343,6 +360,7 @@ public class SessionManager extends CompositeService { shutdownTimeoutChecker(); if (backgroundOperationPool != null) { backgroundOperationPool.shutdown(); + cleanupService.shutdown(); long timeout = hiveConf.getTimeVar( ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS); try {