Repository: hive
Updated Branches:
  refs/heads/branch-1 9259ff86a -> 387c11401


HIVE-15068: Run ClearDanglingScratchDir periodically inside HS2 (Daniel Dai, reviewed by Thejas Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/387c1140
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/387c1140
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/387c1140

Branch: refs/heads/branch-1
Commit: 387c11401f30ec76f2e176688fc149ccafb4ab7e
Parents: 9259ff8
Author: Daniel Dai <da...@hortonworks.com>
Authored: Tue Nov 1 17:41:01 2016 -0700
Committer: Daniel Dai <da...@hortonworks.com>
Committed: Tue Nov 1 17:41:01 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   5 +
 .../server/TestHS2ClearDanglingScratchDir.java  |  82 +++++++++
 .../ql/session/ClearDanglingScratchDir.java     | 181 +++++++++++--------
 .../apache/hive/service/server/HiveServer2.java |  26 +++
 4 files changed, 219 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/387c1140/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 13335b8..1051369 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1848,6 +1848,13 @@ public class HiveConf extends Configuration {
         "SSL Versions to disable for all Hive Servers"),
 
     // HiveServer2 specific configs
+    HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR("hive.server2.clear.dangling.scratchdir", false,
+        "Whether HiveServer2 periodically removes dangling scratch directories, i.e. those " +
+        "left behind by sessions whose process is no longer alive."),
+    HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR_INTERVAL("hive.server2.clear.dangling.scratchdir.interval",
+        "1800s", new TimeValidator(TimeUnit.SECONDS),
+        "Interval between two runs of the dangling scratch directory cleanup in HiveServer2. " +
+        "Only used when hive.server2.clear.dangling.scratchdir is true."),
     HIVE_SERVER2_MAX_START_ATTEMPTS("hive.server2.max.start.attempts", 30L, new RangeValidator(0L, null),
         "Number of times HiveServer2 will attempt to start before exiting, sleeping 60 seconds " +
         "between retries. \n The default of 30 will keep trying for 30 minutes."),

http://git-wip-us.apache.org/repos/asf/hive/blob/387c1140/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestHS2ClearDanglingScratchDir.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestHS2ClearDanglingScratchDir.java b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestHS2ClearDanglingScratchDir.java
new file mode 100644
index 0000000..081ac96
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestHS2ClearDanglingScratchDir.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.server;
+
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.WindowsPathUtil;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.util.Shell;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verifies that the dangling scratch dir cleaner scheduled by HiveServer2 removes the
+ * scratch directory of a dead session and keeps the scratch directories of live sessions.
+ */
+public class TestHS2ClearDanglingScratchDir {
+  @Test
+  public void testScratchDirCleared() throws Exception {
+    MiniDFSCluster dfsCluster = new MiniDFSCluster.Builder(new Configuration())
+        .numDataNodes(1).format(true).build();
+    try {
+      HiveConf conf = new HiveConf();
+      conf.addResource(dfsCluster.getConfiguration(0));
+      if (Shell.WINDOWS) {
+        WindowsPathUtil.convertPathsFromWindowsToHdfs(conf);
+      }
+      conf.set(HiveConf.ConfVars.HIVE_SCRATCH_DIR_LOCK.toString(), "true");
+      conf.set(HiveConf.ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR.toString(), "true");
+
+      FileSystem fs = dfsCluster.getFileSystem();
+      Path scratchDir = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR));
+      fs.mkdirs(scratchDir);
+      fs.setPermission(scratchDir, new FsPermission("777"));
+      Path userScratchDir = new Path(scratchDir, Utils.getUGI().getShortUserName());
+
+      // Fake two live sessions
+      SessionState.start(conf);
+      conf.setVar(HiveConf.ConfVars.HIVESESSIONID, UUID.randomUUID().toString());
+      SessionState.start(conf);
+
+      // Fake a dead session: its lock file exists but nobody keeps it open
+      Path fakeSessionPath = new Path(userScratchDir, UUID.randomUUID().toString());
+      fs.mkdirs(fakeSessionPath);
+      fs.create(new Path(fakeSessionPath, "inuse.lck")).close();
+
+      Assert.assertEquals(3, fs.listStatus(userScratchDir).length);
+
+      HiveServer2.scheduleClearDanglingScratchDir(conf, 0);
+
+      // Poll until the dead session's scratch dir is gone, failing after 5 seconds
+      long deadline = System.currentTimeMillis() + 5000;
+      while (fs.listStatus(userScratchDir).length != 2) {
+        if (System.currentTimeMillis() > deadline) {
+          Assert.fail("timeout, scratch dir has not been cleared");
+        }
+        Thread.sleep(200);
+      }
+    } finally {
+      dfsCluster.shutdown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/387c1140/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java 
b/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java
index 725f954..2fff92e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.session;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -35,6 +36,8 @@ import 
org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A tool to remove dangling scratch directory. A scratch directory could be 
left behind
@@ -51,7 +54,13 @@ import org.apache.hadoop.ipc.RemoteException;
  *    again after 10 min. Once it become writable, cleardanglingscratchDir will be able to
  *    remove it
  */
-public class ClearDanglingScratchDir {
+public class ClearDanglingScratchDir implements Runnable {
+  private static final Logger LOG = LoggerFactory.getLogger(ClearDanglingScratchDir.class);
+  private final boolean dryRun;      // only print the directories that would be removed
+  private final boolean verbose;     // also report per-directory details
+  private final boolean useConsole;  // print to the SessionState console instead of LOG
+  private final String rootHDFSDir;  // scratch root whose children are per-user directories
+  private final HiveConf conf;       // used to resolve the FileSystem of rootHDFSDir
 
   public static void main(String[] args) throws Exception {
     try {
@@ -82,97 +91,169 @@ public class ClearDanglingScratchDir {

     HiveConf conf = new HiveConf();

-    Path rootHDFSDirPath;
+    String rootHDFSDir;
     if (cli.hasOption("s")) {
-      rootHDFSDirPath = new Path(cli.getOptionValue("s"));
+      rootHDFSDir = cli.getOptionValue("s");
     } else {
-      rootHDFSDirPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR));
+      rootHDFSDir = HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR);
     }
+    ClearDanglingScratchDir clearDanglingScratchDirMain = new ClearDanglingScratchDir(dryRun,
+        verbose, true, rootHDFSDir, conf);
+    clearDanglingScratchDirMain.run();
+  }

-    FileSystem fs = FileSystem.get(rootHDFSDirPath.toUri(), conf);
-    FileStatus[] userHDFSDirList = fs.listStatus(rootHDFSDirPath);
-
-    List<Path> scratchDirToRemove = new ArrayList<Path>();
-    for (FileStatus userHDFSDir : userHDFSDirList) {
-      FileStatus[] scratchDirList = fs.listStatus(userHDFSDir.getPath());
-      for (FileStatus scratchDir : scratchDirList) {
-        Path lockFilePath = new Path(scratchDir.getPath(), SessionState.LOCK_FILE_NAME);
-        if (!fs.exists(lockFilePath)) {
-          String message = "Skipping " + scratchDir.getPath() + " since it does not contain " +
-              SessionState.LOCK_FILE_NAME;
-          if (verbose) {
-            SessionState.getConsole().printInfo(message);
-          } else {
-            SessionState.getConsole().logInfo(message);
-          }
-          continue;
-        }
-        boolean removable = false;
-        boolean inuse = false;
-        try {
-          IOUtils.closeStream(fs.append(lockFilePath));
-          removable = true;
-        } catch (RemoteException eAppend) {
-          // RemoteException with AlreadyBeingCreatedException will be thrown
-          // if the file is currently held by a writer
-          if(AlreadyBeingCreatedException.class.getName().equals(eAppend.getClassName())){
-            inuse = true;
-          } else if (UnsupportedOperationException.class.getName().equals(eAppend.getClassName())) {
-            // Append is not supported in the cluster, try to use create
-            try {
-              IOUtils.closeStream(fs.create(lockFilePath, false));
-            } catch (RemoteException eCreate) {
-              if (AlreadyBeingCreatedException.class.getName().equals(eCreate.getClassName())){
-                // If the file is held by a writer, will throw AlreadyBeingCreatedException
-                inuse = true;
-              }  else {
-                SessionState.getConsole().printInfo("Unexpected error:" + eCreate.getMessage());
-              }
-            } catch (FileAlreadyExistsException eCreateNormal) {
-                // Otherwise, throw FileAlreadyExistsException, which means the file owner is
-                // dead
-                removable = true;
-            }
-          } else {
-            SessionState.getConsole().printInfo("Unexpected error:" + eAppend.getMessage());
-          }
-        }
-        if (inuse) {
-          // Cannot open the lock file for writing, must be held by a live process
-          String message = scratchDir.getPath() + " is being used by live process";
-          if (verbose) {
-            SessionState.getConsole().printInfo(message);
-          } else {
-            SessionState.getConsole().logInfo(message);
-          }
-        }
-        if (removable) {
-          scratchDirToRemove.add(scratchDir.getPath());
-        }
-      }
-    }
-
-    if (scratchDirToRemove.size()==0) {
-      SessionState.getConsole().printInfo("Cannot find any scratch directory to clear");
-      return;
-    }
-    SessionState.getConsole().printInfo("Removing " + scratchDirToRemove.size() + " scratch directories");
-    for (Path scratchDir : scratchDirToRemove) {
-      if (dryRun) {
-        System.out.println(scratchDir);
-      } else {
-        boolean succ = fs.delete(scratchDir, true);
-        if (!succ) {
-          SessionState.getConsole().printInfo("Cannot remove " + scratchDir);
-        } else {
-          String message = scratchDir + " removed";
-          if (verbose) {
-            SessionState.getConsole().printInfo(message);
-          } else {
-            SessionState.getConsole().logInfo(message);
-          }
-        }
-      }
+  /**
+   * Creates a cleaner for the scratch directories under {@code rootHDFSDir}.
+   *
+   * @param dryRun only print the directories that would be removed instead of deleting them
+   * @param verbose also report per-directory details (skipped, in use, removed)
+   * @param useConsole print messages to the SessionState console instead of this class's log
+   * @param rootHDFSDir scratch root; each of its children is treated as a per-user directory
+   * @param conf configuration used to resolve the FileSystem of {@code rootHDFSDir}
+   */
+  public ClearDanglingScratchDir(boolean dryRun, boolean verbose, boolean useConsole,
+      String rootHDFSDir, HiveConf conf) {
+    this.dryRun = dryRun;
+    this.verbose = verbose;
+    this.useConsole = useConsole;
+    this.rootHDFSDir = rootHDFSDir;
+    this.conf = conf;
+  }
+
+  /**
+   * Removes every session scratch directory under {@code rootHDFSDir} whose lock file exists
+   * but is not held open by a live writer. I/O failures are reported instead of thrown, and a
+   * failure on one user directory or one scratch directory does not stop the others from
+   * being processed.
+   */
+  @Override
+  public void run() {
+    try {
+      Path rootHDFSDirPath = new Path(rootHDFSDir);
+      FileSystem fs = FileSystem.get(rootHDFSDirPath.toUri(), conf);
+      FileStatus[] userHDFSDirList = fs.listStatus(rootHDFSDirPath);
+
+      List<Path> scratchDirToRemove = new ArrayList<Path>();
+      for (FileStatus userHDFSDir : userHDFSDirList) {
+        try {
+          collectDanglingScratchDirs(fs, userHDFSDir.getPath(), scratchDirToRemove);
+        } catch (IOException e) {
+          // e.g. a user directory this process is not allowed to read; keep scanning the rest
+          consoleMessage("Cannot scan " + userHDFSDir.getPath() + ": " + e.getMessage());
+        }
+      }
+
+      if (scratchDirToRemove.isEmpty()) {
+        consoleMessage("Cannot find any scratch directory to clear");
+        return;
+      }
+      consoleMessage("Removing " + scratchDirToRemove.size() + " scratch directories");
+      for (Path scratchDir : scratchDirToRemove) {
+        if (dryRun) {
+          System.out.println(scratchDir);
+        } else {
+          removeScratchDir(fs, scratchDir);
+        }
+      }
+    } catch (IOException e) {
+      if (useConsole) {
+        consoleMessage("Unexpected exception " + e.getMessage());
+      } else {
+        // Keep the stack trace when reporting through the log
+        LOG.warn("Unexpected exception " + e.getMessage(), e);
+      }
+    }
+  }
+
+  /**
+   * Adds to {@code result} each scratch directory directly under {@code userDir} whose lock
+   * file exists but is not held open by a writer, i.e. whose owning session is dead.
+   *
+   * @throws IOException if {@code userDir} cannot be listed or a lock file cannot be probed
+   */
+  private void collectDanglingScratchDirs(FileSystem fs, Path userDir, List<Path> result)
+      throws IOException {
+    for (FileStatus scratchDir : fs.listStatus(userDir)) {
+      Path lockFilePath = new Path(scratchDir.getPath(), SessionState.LOCK_FILE_NAME);
+      if (!fs.exists(lockFilePath)) {
+        verboseMessage("Skipping " + scratchDir.getPath() + " since it does not contain " +
+            SessionState.LOCK_FILE_NAME);
+        continue;
+      }
+      boolean removable = false;
+      boolean inuse = false;
+      try {
+        IOUtils.closeStream(fs.append(lockFilePath));
+        removable = true;
+      } catch (RemoteException eAppend) {
+        // RemoteException with AlreadyBeingCreatedException will be thrown
+        // if the file is currently held by a writer
+        if (AlreadyBeingCreatedException.class.getName().equals(eAppend.getClassName())) {
+          inuse = true;
+        } else if (UnsupportedOperationException.class.getName().equals(eAppend.getClassName())) {
+          // Append is not supported in the cluster, try to use create
+          try {
+            IOUtils.closeStream(fs.create(lockFilePath, false));
+          } catch (RemoteException eCreate) {
+            if (AlreadyBeingCreatedException.class.getName().equals(eCreate.getClassName())) {
+              // If the file is held by a writer, will throw AlreadyBeingCreatedException
+              inuse = true;
+            } else {
+              consoleMessage("Unexpected error:" + eCreate.getMessage());
+            }
+          } catch (FileAlreadyExistsException eCreateNormal) {
+            // Otherwise, throw FileAlreadyExistsException, which means the file owner is
+            // dead
+            removable = true;
+          }
+        } else {
+          consoleMessage("Unexpected error:" + eAppend.getMessage());
+        }
+      }
+      if (inuse) {
+        // Cannot open the lock file for writing, must be held by a live process
+        verboseMessage(scratchDir.getPath() + " is being used by live process");
+      }
+      if (removable) {
+        result.add(scratchDir.getPath());
+      }
+    }
+  }
+
+  /** Recursively deletes one scratch directory, reporting (not throwing) any failure. */
+  private void removeScratchDir(FileSystem fs, Path scratchDir) {
+    try {
+      if (fs.delete(scratchDir, true)) {
+        verboseMessage(scratchDir + " removed");
+      } else {
+        consoleMessage("Cannot remove " + scratchDir);
+      }
+    } catch (IOException e) {
+      consoleMessage("Cannot remove " + scratchDir + ": " + e.getMessage());
+    }
+  }
+
+  /**
+   * Reports a per-directory detail: through {@link #consoleMessage} when verbose, otherwise
+   * only at debug level in this class's log so that periodic runs stay quiet.
+   */
+  private void verboseMessage(String message) {
+    if (verbose) {
+      consoleMessage(message);
+    } else {
+      LOG.debug(message);
+    }
+  }
+
+  /**
+   * Prints {@code message} to the SessionState console when {@code useConsole} is set (the
+   * command line tool), otherwise logs it at info level (e.g. when scheduled by HiveServer2).
+   */
+  private void consoleMessage(String message) {
+    if (useConsole) {
+      SessionState.getConsole().printInfo(message);
+    } else {
+      LOG.info(message);
     }
   }


http://git-wip-us.apache.org/repos/asf/hive/blob/387c1140/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java 
b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 2ca7ff7..c939932 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -25,7 +25,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Random;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.cli.GnuParser;
@@ -34,6 +37,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.curator.framework.CuratorFramework;
@@ -53,6 +57,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
+import org.apache.hadoop.hive.ql.session.ClearDanglingScratchDir;
 import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -71,6 +76,7 @@ import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooDefs.Perms;
 import org.apache.zookeeper.data.ACL;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 
 /**
@@ -410,6 +416,42 @@ public class HiveServer2 extends CompositeService {
     }
   }
 
+  /**
+   * Schedules {@link ClearDanglingScratchDir} to run periodically on a daemon thread when
+   * {@code hive.server2.clear.dangling.scratchdir} is enabled; does nothing otherwise.
+   *
+   * @param hiveConf configuration providing the scratch dir root and the run interval
+   * @param initialWaitInSec delay, in seconds, before the first run
+   */
+  @VisibleForTesting
+  public static void scheduleClearDanglingScratchDir(HiveConf hiveConf, int initialWaitInSec) {
+    if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR)) {
+      return;
+    }
+    final ClearDanglingScratchDir cleaner = new ClearDanglingScratchDir(false, false, false,
+        HiveConf.getVar(hiveConf, HiveConf.ConfVars.SCRATCHDIR), hiveConf);
+    // A periodic task that throws is never run again by the executor, so one bad run
+    // must not silently end the cleanup for the lifetime of the server.
+    Runnable guardedCleaner = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          cleaner.run();
+        } catch (RuntimeException e) {
+          LogFactory.getLog(HiveServer2.class).warn("Failed to clear dangling scratch dir", e);
+        }
+      }
+    };
+    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
+        new BasicThreadFactory.Builder()
+        .namingPattern("cleardanglingscratchdir-%d")
+        .daemon(true)
+        .build());
+    executor.scheduleAtFixedRate(guardedCleaner, initialWaitInSec,
+        HiveConf.getTimeVar(hiveConf, ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR_INTERVAL,
+        TimeUnit.SECONDS), TimeUnit.SECONDS);
+  }
+
   private static void startHiveServer2() throws Throwable {
     long attempts = 0, maxAttempts = 1;
     while (true) {
@@ -420,6 +462,13 @@ public class HiveServer2 extends CompositeService {
       try {
         // Cleanup the scratch dir before starting
         ServerUtils.cleanUpScratchDir(hiveConf);
+        // Schedule task to cleanup dangling scratch dir periodically,
+        // initial wait for a random time between 0-10 min to
+        // avoid initial spike when using multiple HS2
+        // NOTE(review): this call is inside the start-retry loop, so every retried start
+        // schedules one more cleaner executor; consider scheduling it only once.
+        scheduleClearDanglingScratchDir(hiveConf, new Random().nextInt(600));
+
         server = new HiveServer2();
         server.init(hiveConf);
         server.start();

Reply via email to