Repository: hive Updated Branches: refs/heads/branch-1 9259ff86a -> 387c11401
HIVE-15068: Run ClearDanglingScratchDir periodically inside HS2 (Daniel Dai, reviewed by Thejas Nair) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/387c1140 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/387c1140 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/387c1140 Branch: refs/heads/branch-1 Commit: 387c11401f30ec76f2e176688fc149ccafb4ab7e Parents: 9259ff8 Author: Daniel Dai <da...@hortonworks.com> Authored: Tue Nov 1 17:41:01 2016 -0700 Committer: Daniel Dai <da...@hortonworks.com> Committed: Tue Nov 1 17:41:01 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 5 + .../server/TestHS2ClearDanglingScratchDir.java | 82 +++++++++ .../ql/session/ClearDanglingScratchDir.java | 181 +++++++++++-------- .../apache/hive/service/server/HiveServer2.java | 26 +++ 4 files changed, 219 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/387c1140/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 13335b8..1051369 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1848,6 +1848,11 @@ public class HiveConf extends Configuration { "SSL Versions to disable for all Hive Servers"), // HiveServer2 specific configs + HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR("hive.server2.clear.dangling.scratchdir", false, + "Clear dangling scratch dir periodically in HS2"), + HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR_INTERVAL("hive.server2.clear.dangling.scratchdir.interval", + "1800s", new TimeValidator(TimeUnit.SECONDS), + "Interval to clear 
dangling scratch dir periodically in HS2"), HIVE_SERVER2_MAX_START_ATTEMPTS("hive.server2.max.start.attempts", 30L, new RangeValidator(0L, null), "Number of times HiveServer2 will attempt to start before exiting, sleeping 60 seconds " + "between retries. \n The default of 30 will keep trying for 30 minutes."), http://git-wip-us.apache.org/repos/asf/hive/blob/387c1140/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestHS2ClearDanglingScratchDir.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestHS2ClearDanglingScratchDir.java b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestHS2ClearDanglingScratchDir.java new file mode 100644 index 0000000..081ac96 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestHS2ClearDanglingScratchDir.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hive.service.server; + +import java.util.UUID; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.WindowsPathUtil; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.shims.Utils; +import org.apache.hadoop.util.Shell; +import org.junit.Assert; +import org.junit.Test; + +public class TestHS2ClearDanglingScratchDir { + @Test + public void testScratchDirCleared() throws Exception { + MiniDFSCluster m_dfs = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1).format(true).build(); + HiveConf conf = new HiveConf(); + conf.addResource(m_dfs.getConfiguration(0)); + if (Shell.WINDOWS) { + WindowsPathUtil.convertPathsFromWindowsToHdfs(conf); + } + conf.set(HiveConf.ConfVars.HIVE_SCRATCH_DIR_LOCK.toString(), "true"); + conf.set(HiveConf.ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR.toString(), "true"); + + Path scratchDir = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR)); + m_dfs.getFileSystem().mkdirs(scratchDir); + m_dfs.getFileSystem().setPermission(scratchDir, new FsPermission("777")); + + // Fake two live session + SessionState.start(conf); + conf.setVar(HiveConf.ConfVars.HIVESESSIONID, UUID.randomUUID().toString()); + SessionState.start(conf); + + // Fake dead session + Path fakeSessionPath = new Path(new Path(scratchDir, Utils.getUGI().getShortUserName()), + UUID.randomUUID().toString()); + m_dfs.getFileSystem().mkdirs(fakeSessionPath); + m_dfs.getFileSystem().create(new Path(fakeSessionPath, "inuse.lck")).close(); + + FileStatus[] scratchDirs = m_dfs.getFileSystem() + .listStatus(new Path(scratchDir, Utils.getUGI().getShortUserName())); + + Assert.assertEquals(scratchDirs.length, 3); + + 
HiveServer2.scheduleClearDanglingScratchDir(conf, 0); + + // Check dead session get cleared + long start = System.currentTimeMillis(); + long end; + do { + Thread.sleep(200); + end = System.currentTimeMillis(); + if (end - start > 5000) { + Assert.fail("timeout, scratch dir has not been cleared"); + } + scratchDirs = m_dfs.getFileSystem() + .listStatus(new Path(scratchDir, Utils.getUGI().getShortUserName())); + } while (scratchDirs.length != 2); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/387c1140/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java b/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java index 725f954..2fff92e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.session; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -35,6 +36,8 @@ import org.apache.hadoop.hive.common.LogUtils.LogInitializationException; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A tool to remove dangling scratch directory. A scratch directory could be left behind @@ -51,7 +54,13 @@ import org.apache.hadoop.ipc.RemoteException; * again after 10 min. 
Once it become writable, cleardanglingscratchDir will be able to * remove it */ -public class ClearDanglingScratchDir { +public class ClearDanglingScratchDir implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(ClearDanglingScratchDir.class); + boolean dryRun = false; + boolean verbose = false; + boolean useConsole = false; + String rootHDFSDir; + HiveConf conf; public static void main(String[] args) throws Exception { try { @@ -82,97 +91,119 @@ public class ClearDanglingScratchDir { HiveConf conf = new HiveConf(); - Path rootHDFSDirPath; + String rootHDFSDir; if (cli.hasOption("s")) { - rootHDFSDirPath = new Path(cli.getOptionValue("s")); + rootHDFSDir = cli.getOptionValue("s"); } else { - rootHDFSDirPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR)); + rootHDFSDir = HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR); } + ClearDanglingScratchDir clearDanglingScratchDirMain = new ClearDanglingScratchDir(dryRun, + verbose, true, rootHDFSDir, conf); + clearDanglingScratchDirMain.run(); + } - FileSystem fs = FileSystem.get(rootHDFSDirPath.toUri(), conf); - FileStatus[] userHDFSDirList = fs.listStatus(rootHDFSDirPath); - - List<Path> scratchDirToRemove = new ArrayList<Path>(); - for (FileStatus userHDFSDir : userHDFSDirList) { - FileStatus[] scratchDirList = fs.listStatus(userHDFSDir.getPath()); - for (FileStatus scratchDir : scratchDirList) { - Path lockFilePath = new Path(scratchDir.getPath(), SessionState.LOCK_FILE_NAME); - if (!fs.exists(lockFilePath)) { - String message = "Skipping " + scratchDir.getPath() + " since it does not contain " + - SessionState.LOCK_FILE_NAME; - if (verbose) { - SessionState.getConsole().printInfo(message); - } else { - SessionState.getConsole().logInfo(message); + public ClearDanglingScratchDir(boolean dryRun, boolean verbose, boolean useConsole, + String rootHDFSDir, HiveConf conf) { + this.dryRun = dryRun; + this.verbose = verbose; + this.useConsole = useConsole; + this.rootHDFSDir = 
rootHDFSDir; + this.conf = conf; + } + + @Override + public void run() { + try { + Path rootHDFSDirPath = new Path(rootHDFSDir); + FileSystem fs = FileSystem.get(rootHDFSDirPath.toUri(), conf); + FileStatus[] userHDFSDirList = fs.listStatus(rootHDFSDirPath); + + List<Path> scratchDirToRemove = new ArrayList<Path>(); + for (FileStatus userHDFSDir : userHDFSDirList) { + FileStatus[] scratchDirList = fs.listStatus(userHDFSDir.getPath()); + for (FileStatus scratchDir : scratchDirList) { + Path lockFilePath = new Path(scratchDir.getPath(), SessionState.LOCK_FILE_NAME); + if (!fs.exists(lockFilePath)) { + String message = "Skipping " + scratchDir.getPath() + " since it does not contain " + + SessionState.LOCK_FILE_NAME; + if (verbose) { + consoleMessage(message); + } + continue; } - continue; - } - boolean removable = false; - boolean inuse = false; - try { - IOUtils.closeStream(fs.append(lockFilePath)); - removable = true; - } catch (RemoteException eAppend) { - // RemoteException with AlreadyBeingCreatedException will be thrown - // if the file is currently held by a writer - if(AlreadyBeingCreatedException.class.getName().equals(eAppend.getClassName())){ - inuse = true; - } else if (UnsupportedOperationException.class.getName().equals(eAppend.getClassName())) { - // Append is not supported in the cluster, try to use create - try { - IOUtils.closeStream(fs.create(lockFilePath, false)); - } catch (RemoteException eCreate) { - if (AlreadyBeingCreatedException.class.getName().equals(eCreate.getClassName())){ - // If the file is held by a writer, will throw AlreadyBeingCreatedException - inuse = true; - } else { - SessionState.getConsole().printInfo("Unexpected error:" + eCreate.getMessage()); + boolean removable = false; + boolean inuse = false; + try { + IOUtils.closeStream(fs.append(lockFilePath)); + removable = true; + } catch (RemoteException eAppend) { + // RemoteException with AlreadyBeingCreatedException will be thrown + // if the file is currently held by a writer 
+ if(AlreadyBeingCreatedException.class.getName().equals(eAppend.getClassName())){ + inuse = true; + } else if (UnsupportedOperationException.class.getName().equals(eAppend.getClassName())) { + // Append is not supported in the cluster, try to use create + try { + IOUtils.closeStream(fs.create(lockFilePath, false)); + } catch (RemoteException eCreate) { + if (AlreadyBeingCreatedException.class.getName().equals(eCreate.getClassName())){ + // If the file is held by a writer, will throw AlreadyBeingCreatedException + inuse = true; + } else { + consoleMessage("Unexpected error:" + eCreate.getMessage()); + } + } catch (FileAlreadyExistsException eCreateNormal) { + // Otherwise, throw FileAlreadyExistsException, which means the file owner is + // dead + removable = true; } - } catch (FileAlreadyExistsException eCreateNormal) { - // Otherwise, throw FileAlreadyExistsException, which means the file owner is - // dead - removable = true; + } else { + consoleMessage("Unexpected error:" + eAppend.getMessage()); } - } else { - SessionState.getConsole().printInfo("Unexpected error:" + eAppend.getMessage()); } - } - if (inuse) { - // Cannot open the lock file for writing, must be held by a live process - String message = scratchDir.getPath() + " is being used by live process"; - if (verbose) { - SessionState.getConsole().printInfo(message); - } else { - SessionState.getConsole().logInfo(message); + if (inuse) { + // Cannot open the lock file for writing, must be held by a live process + String message = scratchDir.getPath() + " is being used by live process"; + if (verbose) { + consoleMessage(message); + } + } + if (removable) { + scratchDirToRemove.add(scratchDir.getPath()); } - } - if (removable) { - scratchDirToRemove.add(scratchDir.getPath()); } } - } - if (scratchDirToRemove.size()==0) { - SessionState.getConsole().printInfo("Cannot find any scratch directory to clear"); - return; - } - SessionState.getConsole().printInfo("Removing " + scratchDirToRemove.size() + " scratch 
directories"); - for (Path scratchDir : scratchDirToRemove) { - if (dryRun) { - System.out.println(scratchDir); - } else { - boolean succ = fs.delete(scratchDir, true); - if (!succ) { - SessionState.getConsole().printInfo("Cannot remove " + scratchDir); + if (scratchDirToRemove.size()==0) { + consoleMessage("Cannot find any scratch directory to clear"); + return; + } + consoleMessage("Removing " + scratchDirToRemove.size() + " scratch directories"); + for (Path scratchDir : scratchDirToRemove) { + if (dryRun) { + System.out.println(scratchDir); } else { - String message = scratchDir + " removed"; - if (verbose) { - SessionState.getConsole().printInfo(message); + boolean succ = fs.delete(scratchDir, true); + if (!succ) { + consoleMessage("Cannot remove " + scratchDir); } else { - SessionState.getConsole().logInfo(message); + String message = scratchDir + " removed"; + if (verbose) { + consoleMessage(message); + } } } } + } catch (IOException e) { + consoleMessage("Unexpected exception " + e.getMessage()); + } + } + + private void consoleMessage(String message) { + if (useConsole) { + SessionState.getConsole().printInfo(message); + } else { + LOG.info(message); } } http://git-wip-us.apache.org/repos/asf/hive/blob/387c1140/service/src/java/org/apache/hive/service/server/HiveServer2.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index 2ca7ff7..c939932 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -25,7 +25,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Random; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import 
java.util.concurrent.TimeUnit; import org.apache.commons.cli.GnuParser; @@ -34,6 +37,7 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.curator.framework.CuratorFramework; @@ -53,6 +57,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; +import org.apache.hadoop.hive.ql.session.ClearDanglingScratchDir; import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; @@ -71,6 +76,7 @@ import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooDefs.Perms; import org.apache.zookeeper.data.ACL; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; /** @@ -410,6 +416,21 @@ public class HiveServer2 extends CompositeService { } } + @VisibleForTesting + public static void scheduleClearDanglingScratchDir(HiveConf hiveConf, int initialWaitInSec) { + if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR)) { + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new BasicThreadFactory.Builder() + .namingPattern("cleardanglingscratchdir-%d") + .daemon(true) + .build()); + executor.scheduleAtFixedRate(new ClearDanglingScratchDir(false, false, false, + HiveConf.getVar(hiveConf, HiveConf.ConfVars.SCRATCHDIR), hiveConf), initialWaitInSec, + HiveConf.getTimeVar(hiveConf, ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR_INTERVAL, + TimeUnit.SECONDS), TimeUnit.SECONDS); + } + } + private static void startHiveServer2() 
throws Throwable { long attempts = 0, maxAttempts = 1; while (true) { @@ -420,6 +441,11 @@ public class HiveServer2 extends CompositeService { try { // Cleanup the scratch dir before starting ServerUtils.cleanUpScratchDir(hiveConf); + // Schedule task to cleanup dangling scratch dir periodically, + // initial wait for a random time between 0-10 min to + // avoid initial spike when using multiple HS2 + scheduleClearDanglingScratchDir(hiveConf, new Random().nextInt(600)); + server = new HiveServer2(); server.init(hiveConf); server.start();