Repository: hadoop
Updated Branches:
  refs/heads/branch-2 9bd149978 -> acc6f622e


YARN-2183. [YARN-1492] Cleaner service for cache manager. (Chris Trezzo and 
Sangjin Lee via kasha)
(cherry picked from commit c51e53d7aad46059f52d4046a5fedfdfd3c37955)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/acc6f622
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/acc6f622
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/acc6f622

Branch: refs/heads/branch-2
Commit: acc6f622e0937f92ce8c3bec04b92247787e5904
Parents: 9bd1499
Author: Karthik Kambatla <ka...@apache.org>
Authored: Sat Oct 25 10:31:06 2014 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Sat Oct 25 11:50:06 2014 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  47 ++-
 .../src/main/resources/yarn-default.xml         |  27 +-
 .../server/sharedcache/SharedCacheUtil.java     |  10 +
 .../sharedcachemanager/CleanerService.java      | 218 +++++++++++++
 .../server/sharedcachemanager/CleanerTask.java  | 308 +++++++++++++++++++
 .../sharedcachemanager/SharedCacheManager.java  |   7 +
 .../metrics/CleanerMetrics.java                 | 172 +++++++++++
 .../store/InMemorySCMStore.java                 |  45 ++-
 .../sharedcachemanager/store/SCMStore.java      |  36 ++-
 .../sharedcachemanager/TestCleanerTask.java     | 152 +++++++++
 .../metrics/TestCleanerMetrics.java             |  65 ++++
 .../store/TestInMemorySCMStore.java             |   4 +-
 13 files changed, 1054 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/acc6f622/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 89b2750..53f8e44 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -12,6 +12,9 @@ Release 2.7.0 - UNRELEASED
     YARN-2180. [YARN-1492] In-memory backing store for cache manager. 
     (Chris Trezzo via kasha)
 
+    YARN-2183. [YARN-1492] Cleaner service for cache manager.
+    (Chris Trezzo and Sangjin Lee via kasha)
+
   IMPROVEMENTS
 
     YARN-1979. TestDirectoryCollection fails when the umask is unusual.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/acc6f622/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 76a3818..c545e44 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1393,25 +1393,54 @@ public class YarnConfiguration extends Configuration {
    * the last reference exceeds the staleness period. This value is specified 
in
    * minutes.
    */
-  public static final String IN_MEMORY_STALENESS_PERIOD =
-      IN_MEMORY_STORE_PREFIX + "staleness-period";
-  public static final int DEFAULT_IN_MEMORY_STALENESS_PERIOD = 7 * 24 * 60;
+  public static final String IN_MEMORY_STALENESS_PERIOD_MINS =
+      IN_MEMORY_STORE_PREFIX + "staleness-period-mins";
+  public static final int DEFAULT_IN_MEMORY_STALENESS_PERIOD_MINS =
+      7 * 24 * 60;
 
   /**
    * Initial delay before the in-memory store runs its first check to remove
    * dead initial applications. Specified in minutes.
    */
-  public static final String IN_MEMORY_INITIAL_DELAY =
-      IN_MEMORY_STORE_PREFIX + "initial-delay";
-  public static final int DEFAULT_IN_MEMORY_INITIAL_DELAY = 10;
+  public static final String IN_MEMORY_INITIAL_DELAY_MINS =
+      IN_MEMORY_STORE_PREFIX + "initial-delay-mins";
+  public static final int DEFAULT_IN_MEMORY_INITIAL_DELAY_MINS = 10;
   
   /**
    * The frequency at which the in-memory store checks to remove dead initial
    * applications. Specified in minutes.
    */
-  public static final String IN_MEMORY_CHECK_PERIOD =
-      IN_MEMORY_STORE_PREFIX + "check-period";
-  public static final int DEFAULT_IN_MEMORY_CHECK_PERIOD = 12 * 60;
+  public static final String IN_MEMORY_CHECK_PERIOD_MINS =
+      IN_MEMORY_STORE_PREFIX + "check-period-mins";
+  public static final int DEFAULT_IN_MEMORY_CHECK_PERIOD_MINS = 12 * 60;
+
+  // SCM Cleaner service configuration
+
+  private static final String SCM_CLEANER_PREFIX = SHARED_CACHE_PREFIX
+      + "cleaner.";
+
+  /**
+   * The frequency at which a cleaner task runs. Specified in minutes.
+   */
+  public static final String SCM_CLEANER_PERIOD_MINS =
+      SCM_CLEANER_PREFIX + "period-mins";
+  public static final int DEFAULT_SCM_CLEANER_PERIOD_MINS = 24 * 60;
+
+  /**
+   * Initial delay before the first cleaner task is scheduled. Specified in
+   * minutes.
+   */
+  public static final String SCM_CLEANER_INITIAL_DELAY_MINS =
+      SCM_CLEANER_PREFIX + "initial-delay-mins";
+  public static final int DEFAULT_SCM_CLEANER_INITIAL_DELAY_MINS = 10;
+
+  /**
+   * The time to sleep between processing each shared cache resource. Specified
+   * in milliseconds.
+   */
+  public static final String SCM_CLEANER_RESOURCE_SLEEP_MS =
+      SCM_CLEANER_PREFIX + "resource-sleep-ms";
+  public static final long DEFAULT_SCM_CLEANER_RESOURCE_SLEEP_MS = 0L;
 
   ////////////////////////////////
   // Other Configs

http://git-wip-us.apache.org/repos/asf/hadoop/blob/acc6f622/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 922d068..ee7e232 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1379,24 +1379,45 @@
     <description>A resource in the in-memory store is considered stale
     if the time since the last reference exceeds the staleness period.
     This value is specified in minutes.</description>
-    <name>yarn.sharedcache.store.in-memory.staleness-period</name>
+    <name>yarn.sharedcache.store.in-memory.staleness-period-mins</name>
     <value>10080</value>
   </property>
   
   <property>
     <description>Initial delay before the in-memory store runs its first check
     to remove dead initial applications. Specified in minutes.</description>
-    <name>yarn.sharedcache.store.in-memory.initial-delay</name>
+    <name>yarn.sharedcache.store.in-memory.initial-delay-mins</name>
     <value>10</value>
   </property>
   
   <property>
     <description>The frequency at which the in-memory store checks to remove
     dead initial applications. Specified in minutes.</description>
-    <name>yarn.sharedcache.store.in-memory.check-period</name>
+    <name>yarn.sharedcache.store.in-memory.check-period-mins</name>
     <value>720</value>
   </property>
 
+  <property>
+    <description>The frequency at which a cleaner task runs.
+    Specified in minutes.</description>
+    <name>yarn.sharedcache.cleaner.period-mins</name>
+    <value>1440</value>
+  </property>
+
+  <property>
+    <description>Initial delay before the first cleaner task is scheduled.
+    Specified in minutes.</description>
+    <name>yarn.sharedcache.cleaner.initial-delay-mins</name>
+    <value>10</value>
+  </property>
+
+  <property>
+    <description>The time to sleep between processing each shared cache
+    resource. Specified in milliseconds.</description>
+    <name>yarn.sharedcache.cleaner.resource-sleep-ms</name>
+    <value>0</value>
+  </property>
+
   <!-- Other configuration -->
   <property>
     <description>The interval that the yarn client library uses to poll the

http://git-wip-us.apache.org/repos/asf/hadoop/blob/acc6f622/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java
index 4b933ac..d3cf379 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java
@@ -78,4 +78,14 @@ public class SharedCacheUtil {
 
     return sb.toString();
   }
+
+  @Private
+  public static String getCacheEntryGlobPattern(int depth) {
+    StringBuilder pattern = new StringBuilder();
+    for (int i = 0; i < depth; i++) {
+      pattern.append("*/");
+    }
+    pattern.append("*");
+    return pattern.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/acc6f622/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java
new file mode 100644
index 0000000..593be4a
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * The cleaner service that maintains the shared cache area, and cleans up 
stale
+ * entries on a regular basis.
+ */
+@Private
+@Evolving
+public class CleanerService extends CompositeService {
+  /**
+   * The name of the global cleaner lock that the cleaner creates to indicate
+   * that a cleaning process is in progress.
+   */
+  public static final String GLOBAL_CLEANER_PID = ".cleaner_pid";
+
+  private static final Log LOG = LogFactory.getLog(CleanerService.class);
+
+  private Configuration conf;
+  private CleanerMetrics metrics;
+  private ScheduledExecutorService scheduledExecutor;
+  private final SCMStore store;
+  private final Lock cleanerTaskLock;
+
+  public CleanerService(SCMStore store) {
+    super("CleanerService");
+    this.store = store;
+    this.cleanerTaskLock = new ReentrantLock();
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    this.conf = conf;
+
+    // create scheduler executor service that services the cleaner tasks
+    // use 2 threads to accommodate the on-demand tasks and reduce the chance 
of
+    // back-to-back runs
+    ThreadFactory tf =
+        new ThreadFactoryBuilder().setNameFormat("Shared cache 
cleaner").build();
+    scheduledExecutor = Executors.newScheduledThreadPool(2, tf);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    if (!writeGlobalCleanerPidFile()) {
+      throw new YarnException("The global cleaner pid file already exists! " +
+          "It appears there is another CleanerService running in the cluster");
+    }
+
+    this.metrics = CleanerMetrics.initSingleton(conf);
+
+    // Start dependent services (i.e. AppChecker)
+    super.serviceStart();
+
+    Runnable task =
+        CleanerTask.create(conf, store, metrics, cleanerTaskLock);
+    long periodInMinutes = getPeriod(conf);
+    scheduledExecutor.scheduleAtFixedRate(task, getInitialDelay(conf),
+        periodInMinutes, TimeUnit.MINUTES);
+    LOG.info("Scheduled the shared cache cleaner task to run every "
+        + periodInMinutes + " minutes.");
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    LOG.info("Shutting down the background thread.");
+    scheduledExecutor.shutdownNow();
+    try {
+      if (scheduledExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+        LOG.info("The background thread stopped.");
+      } else {
+        LOG.warn("Gave up waiting for the cleaner task to shutdown.");
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("The cleaner service was interrupted while shutting down the 
task.",
+          e);
+    }
+
+    removeGlobalCleanerPidFile();
+
+    super.serviceStop();
+  }
+
+  /**
+   * Execute an on-demand cleaner task.
+   */
+  protected void runCleanerTask() {
+    Runnable task =
+        CleanerTask.create(conf, store, metrics, cleanerTaskLock);
+    // this is a non-blocking call (it simply submits the task to the executor
+    // queue and returns)
+    this.scheduledExecutor.execute(task);
+  }
+
+  /**
+   * To ensure there are not multiple instances of the SCM running on a given
+   * cluster, a global pid file is used. This file contains the hostname of the
+   * machine that owns the pid file.
+   *
+   * @return true if the pid file was written, false otherwise
+   * @throws YarnException
+   */
+  private boolean writeGlobalCleanerPidFile() throws YarnException {
+    String root =
+        conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
+            YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
+    Path pidPath = new Path(root, GLOBAL_CLEANER_PID);
+    try {
+      FileSystem fs = FileSystem.get(this.conf);
+
+      if (fs.exists(pidPath)) {
+        return false;
+      }
+
+      FSDataOutputStream os = fs.create(pidPath, false);
+      // write the hostname and the process id in the global cleaner pid file
+      final String ID = ManagementFactory.getRuntimeMXBean().getName();
+      os.writeUTF(ID);
+      os.close();
+      // add it to the delete-on-exit to ensure it gets deleted when the JVM
+      // exits
+      fs.deleteOnExit(pidPath);
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+    LOG.info("Created the global cleaner pid file at " + pidPath.toString());
+    return true;
+  }
+
+  private void removeGlobalCleanerPidFile() {
+    try {
+      FileSystem fs = FileSystem.get(this.conf);
+      String root =
+          conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
+              YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
+
+      Path pidPath = new Path(root, GLOBAL_CLEANER_PID);
+
+
+      fs.delete(pidPath, false);
+      LOG.info("Removed the global cleaner pid file at " + pidPath.toString());
+    } catch (IOException e) {
+      LOG.error(
+          "Unable to remove the global cleaner pid file! The file may need "
+              + "to be removed manually.", e);
+    }
+  }
+
+  private static int getInitialDelay(Configuration conf) {
+    int initialDelayInMinutes =
+        conf.getInt(YarnConfiguration.SCM_CLEANER_INITIAL_DELAY_MINS,
+            YarnConfiguration.DEFAULT_SCM_CLEANER_INITIAL_DELAY_MINS);
+    // negative value is invalid; use the default
+    if (initialDelayInMinutes < 0) {
+      throw new HadoopIllegalArgumentException("Negative initial delay value: "
+          + initialDelayInMinutes
+          + ". The initial delay must be greater than zero.");
+    }
+    return initialDelayInMinutes;
+  }
+
+  private static int getPeriod(Configuration conf) {
+    int periodInMinutes =
+        conf.getInt(YarnConfiguration.SCM_CLEANER_PERIOD_MINS,
+            YarnConfiguration.DEFAULT_SCM_CLEANER_PERIOD_MINS);
+    // non-positive value is invalid; use the default
+    if (periodInMinutes <= 0) {
+      throw new HadoopIllegalArgumentException("Non-positive period value: "
+          + periodInMinutes
+          + ". The cleaner period must be greater than or equal to zero.");
+    }
+    return periodInMinutes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/acc6f622/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java
new file mode 100644
index 0000000..a7fdcbd
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import java.io.IOException;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil;
+import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+
+/**
+ * The task that runs and cleans up the shared cache area for stale entries and
+ * orphaned files. It is expected that only one cleaner task runs at any given
+ * point in time.
+ */
+@Private
+@Evolving
+class CleanerTask implements Runnable {
+  private static final String RENAMED_SUFFIX = "-renamed";
+  private static final Log LOG = LogFactory.getLog(CleanerTask.class);
+
+  private final String location;
+  private final long sleepTime;
+  private final int nestedLevel;
+  private final Path root;
+  private final FileSystem fs;
+  private final SCMStore store;
+  private final CleanerMetrics metrics;
+  private final Lock cleanerTaskLock;
+
+  /**
+   * Creates a cleaner task based on the configuration. This is provided for
+   * convenience.
+   *
+   * @param conf
+   * @param store
+   * @param metrics
+   * @param cleanerTaskLock lock that ensures a serial execution of cleaner
+   *                        task
+   * @return an instance of a CleanerTask
+   */
+  public static CleanerTask create(Configuration conf, SCMStore store,
+      CleanerMetrics metrics, Lock cleanerTaskLock) {
+    try {
+      // get the root directory for the shared cache
+      String location =
+          conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
+              YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
+
+      long sleepTime =
+          conf.getLong(YarnConfiguration.SCM_CLEANER_RESOURCE_SLEEP_MS,
+              YarnConfiguration.DEFAULT_SCM_CLEANER_RESOURCE_SLEEP_MS);
+      int nestedLevel = SharedCacheUtil.getCacheDepth(conf);
+      FileSystem fs = FileSystem.get(conf);
+
+      return new CleanerTask(location, sleepTime, nestedLevel, fs, store,
+          metrics, cleanerTaskLock);
+    } catch (IOException e) {
+      LOG.error("Unable to obtain the filesystem for the cleaner service", e);
+      throw new ExceptionInInitializerError(e);
+    }
+  }
+
+  /**
+   * Creates a cleaner task based on the root directory location and the
+   * filesystem.
+   */
+  CleanerTask(String location, long sleepTime, int nestedLevel, FileSystem fs,
+      SCMStore store, CleanerMetrics metrics, Lock cleanerTaskLock) {
+    this.location = location;
+    this.sleepTime = sleepTime;
+    this.nestedLevel = nestedLevel;
+    this.root = new Path(location);
+    this.fs = fs;
+    this.store = store;
+    this.metrics = metrics;
+    this.cleanerTaskLock = cleanerTaskLock;
+  }
+
+  @Override
+  public void run() {
+    if (!this.cleanerTaskLock.tryLock()) {
+      // there is already another task running
+      LOG.warn("A cleaner task is already running. "
+          + "This scheduled cleaner task will do nothing.");
+      return;
+    }
+
+    try {
+      if (!fs.exists(root)) {
+        LOG.error("The shared cache root " + location + " was not found. "
+            + "The cleaner task will do nothing.");
+        return;
+      }
+
+      // we're now ready to process the shared cache area
+      process();
+    } catch (Throwable e) {
+      LOG.error("Unexpected exception while initializing the cleaner task. "
+          + "This task will do nothing,", e);
+    } finally {
+      // this is set to false regardless of if it is a scheduled or on-demand
+      // task
+      this.cleanerTaskLock.unlock();
+    }
+  }
+
+  /**
+   * Sweeps and processes the shared cache area to clean up stale and orphaned
+   * files.
+   */
+  void process() {
+    // mark the beginning of the run in the metrics
+    metrics.reportCleaningStart();
+    try {
+      // now traverse individual directories and process them
+      // the directory structure is specified by the nested level parameter
+      // (e.g. 9/c/d/<checksum>)
+      String pattern = SharedCacheUtil.getCacheEntryGlobPattern(nestedLevel);
+      FileStatus[] resources =
+          fs.globStatus(new Path(root, pattern));
+      int numResources = resources == null ? 0 : resources.length;
+      LOG.info("Processing " + numResources + " resources in the shared 
cache");
+      long beginMs = System.currentTimeMillis();
+      if (resources != null) {
+        for (FileStatus resource : resources) {
+          // check for interruption so it can abort in a timely manner in case
+          // of shutdown
+          if (Thread.currentThread().isInterrupted()) {
+            LOG.warn("The cleaner task was interrupted. Aborting.");
+            break;
+          }
+
+          if (resource.isDirectory()) {
+            processSingleResource(resource);
+          } else {
+            LOG.warn("Invalid file at path " + resource.getPath().toString()
+                +
+                " when a directory was expected");
+          }
+          // add sleep time between cleaning each directory if it is non-zero
+          if (sleepTime > 0) {
+            Thread.sleep(sleepTime);
+          }
+        }
+      }
+      long endMs = System.currentTimeMillis();
+      long durationMs = endMs - beginMs;
+      LOG.info("Processed " + numResources + " resource(s) in " + durationMs +
+          " ms.");
+    } catch (IOException e1) {
+      LOG.error("Unable to complete the cleaner task", e1);
+    } catch (InterruptedException e2) {
+      Thread.currentThread().interrupt(); // restore the interrupt
+    }
+  }
+
+  /**
+   * Returns a path for the root directory for the shared cache.
+   */
+  Path getRootPath() {
+    return root;
+  }
+
+  /**
+   * Processes a single shared cache resource directory.
+   */
+  void processSingleResource(FileStatus resource) {
+    Path path = resource.getPath();
+    // indicates the processing status of the resource
+    ResourceStatus resourceStatus = ResourceStatus.INIT;
+
+    // first, if the path ends with the renamed suffix, it indicates the
+    // directory was moved (as stale) but somehow not deleted (probably due to
+    // SCM failure); delete the directory
+    if (path.toString().endsWith(RENAMED_SUFFIX)) {
+      LOG.info("Found a renamed directory that was left undeleted at " +
+          path.toString() + ". Deleting.");
+      try {
+        if (fs.delete(path, true)) {
+          resourceStatus = ResourceStatus.DELETED;
+        }
+      } catch (IOException e) {
+        LOG.error("Error while processing a shared cache resource: " + path, 
e);
+      }
+    } else {
+      // this is the path to the cache resource directory
+      // the directory name is the resource key (i.e. a unique identifier)
+      String key = path.getName();
+
+      try {
+        store.cleanResourceReferences(key);
+      } catch (YarnException e) {
+        LOG.error("Exception thrown while removing dead appIds.", e);
+      }
+
+      if (store.isResourceEvictable(key, resource)) {
+        try {
+          /*
+           * TODO See YARN-2663: There is a race condition between
+           * store.removeResource(key) and
+           * removeResourceFromCacheFileSystem(path) operations because they do
+           * not happen atomically and resources can be uploaded with different
+           * file names by the node managers.
+           */
+          // remove the resource from scm (checks for appIds as well)
+          if (store.removeResource(key)) {
+            // remove the resource from the file system
+            boolean deleted = removeResourceFromCacheFileSystem(path);
+            if (deleted) {
+              resourceStatus = ResourceStatus.DELETED;
+            } else {
+              LOG.error("Failed to remove path from the file system."
+                  + " Skipping this resource: " + path);
+              resourceStatus = ResourceStatus.ERROR;
+            }
+          } else {
+            // we did not delete the resource because it contained application
+            // ids
+            resourceStatus = ResourceStatus.PROCESSED;
+          }
+        } catch (IOException e) {
+          LOG.error(
+              "Failed to remove path from the file system. Skipping this 
resource: "
+                  + path, e);
+          resourceStatus = ResourceStatus.ERROR;
+        }
+      } else {
+        resourceStatus = ResourceStatus.PROCESSED;
+      }
+    }
+
+    // record the processing
+    switch (resourceStatus) {
+    case DELETED:
+      metrics.reportAFileDelete();
+      break;
+    case PROCESSED:
+      metrics.reportAFileProcess();
+      break;
+    case ERROR:
+      metrics.reportAFileError();
+      break;
+    default:
+      LOG.error("Cleaner encountered an invalid status (" + resourceStatus
+          + ") while processing resource: " + path.getName());
+    }
+  }
+
+  private boolean removeResourceFromCacheFileSystem(Path path)
+      throws IOException {
+    // rename the directory to make the delete atomic
+    Path renamedPath = new Path(path.toString() + RENAMED_SUFFIX);
+    if (fs.rename(path, renamedPath)) {
+      // the directory can be removed safely now
+      // log the original path
+      LOG.info("Deleting " + path.toString());
+      return fs.delete(renamedPath, true);
+    } else {
+      // we were unable to remove it for some reason: it's best to leave
+      // it at that
+      LOG.error("We were not able to rename the directory to "
+          + renamedPath.toString() + ". We will leave it intact.");
+    }
+    return false;
+  }
+
+  /**
+   * A status indicating what happened with the processing of a given cache
+   * resource.
+   */
+  private enum ResourceStatus {
+    INIT,
+    /** Resource was successfully processed, but not deleted **/
+    PROCESSED,
+    /** Resource was successfully deleted **/
+    DELETED,
+    /** The cleaner task ran into an error while processing the resource **/
+    ERROR
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/acc6f622/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
index 2f3ddb1..3fdb588 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
@@ -64,6 +64,9 @@ public class SharedCacheManager extends CompositeService {
     this.store = createSCMStoreService(conf);
     addService(store);
 
+    CleanerService cs = createCleanerService(store);
+    addService(cs);
+
     // init metrics
     DefaultMetricsSystem.initialize("SharedCacheManager");
     JvmMetrics.initSingleton("SharedCacheManager", null);
@@ -90,6 +93,10 @@ public class SharedCacheManager extends CompositeService {
     return store;
   }
 
+  private CleanerService createCleanerService(SCMStore store) {
+    return new CleanerService(store);
+  }
+
   @Override
   protected void serviceStop() throws Exception {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/acc6f622/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java
new file mode 100644
index 0000000..5c8ea3d
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.sharedcachemanager.metrics;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsAnnotations;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MetricsSourceBuilder;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+
+/**
+ * This class is for maintaining the various Cleaner activity statistics and
+ * publishing them through the metrics interfaces.
+ */
+@Private
+@Evolving
+@Metrics(name = "CleanerActivity", about = "Cleaner service metrics", context 
= "yarn")
+public class CleanerMetrics {
+  public static final Log LOG = LogFactory.getLog(CleanerMetrics.class);
+  private final MetricsRegistry registry = new MetricsRegistry("cleaner");
+
+  enum Singleton {
+    INSTANCE;
+
+    CleanerMetrics impl;
+
+    synchronized CleanerMetrics init(Configuration conf) {
+      if (impl == null) {
+        impl = create(conf);
+      }
+      return impl;
+    }
+  }
+
+  public static CleanerMetrics initSingleton(Configuration conf) {
+    return Singleton.INSTANCE.init(conf);
+  }
+
+  public static CleanerMetrics getInstance() {
+    CleanerMetrics topMetrics = Singleton.INSTANCE.impl;
+    if (topMetrics == null)
+      throw new IllegalStateException(
+          "The CleanerMetics singlton instance is not initialized."
+              + " Have you called init first?");
+    return topMetrics;
+  }
+
+  @Metric("number of deleted files over all runs")
+  private MutableCounterLong totalDeletedFiles;
+
+  public long getTotalDeletedFiles() {
+    return totalDeletedFiles.value();
+  }
+
+  private @Metric("number of deleted files in the last run")
+  MutableGaugeLong deletedFiles;
+
+  public long getDeletedFiles() {
+    return deletedFiles.value();
+  }
+
+  private @Metric("number of processed files over all runs")
+  MutableCounterLong totalProcessedFiles;
+
+  public long getTotalProcessedFiles() {
+    return totalProcessedFiles.value();
+  }
+
+  private @Metric("number of processed files in the last run")
+  MutableGaugeLong processedFiles;
+
+  public long getProcessedFiles() {
+    return processedFiles.value();
+  }
+
+  @Metric("number of file errors over all runs")
+  private MutableCounterLong totalFileErrors;
+
+  public long getTotalFileErrors() {
+    return totalFileErrors.value();
+  }
+
+  private @Metric("number of file errors in the last run")
+  MutableGaugeLong fileErrors;
+
+  public long getFileErrors() {
+    return fileErrors.value();
+  }
+
+  private CleanerMetrics() {
+  }
+
+  /**
+   * The metric source obtained after parsing the annotations
+   */
+  MetricsSource metricSource;
+
+  static CleanerMetrics create(Configuration conf) {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+
+    CleanerMetrics metricObject = new CleanerMetrics();
+    MetricsSourceBuilder sb = 
MetricsAnnotations.newSourceBuilder(metricObject);
+    final MetricsSource s = sb.build();
+    ms.register("cleaner", "The cleaner service of truly shared cache", s);
+    metricObject.metricSource = s;
+    return metricObject;
+  }
+
+  /**
+   * Report a delete operation at the current system time
+   */
+  public void reportAFileDelete() {
+    totalProcessedFiles.incr();
+    processedFiles.incr();
+    totalDeletedFiles.incr();
+    deletedFiles.incr();
+  }
+
+  /**
+   * Report a process operation at the current system time
+   */
+  public void reportAFileProcess() {
+    totalProcessedFiles.incr();
+    processedFiles.incr();
+  }
+
+  /**
+   * Report a process operation error at the current system time
+   */
+  public void reportAFileError() {
+    totalProcessedFiles.incr();
+    processedFiles.incr();
+    totalFileErrors.incr();
+    fileErrors.incr();
+  }
+
+  /**
+   * Report the start a new run of the cleaner.
+   *
+   */
+  public void reportCleaningStart() {
+    processedFiles.set(0);
+    deletedFiles.set(0);
+    fileErrors.set(0);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/acc6f622/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java
index 79369d8..b8fe14f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil;
 import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker;
-import org.apache.hadoop.yarn.server.sharedcachemanager.SharedCacheManager;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -83,13 +82,12 @@ public class InMemorySCMStore extends SCMStore {
   private final Object initialAppsLock = new Object();
   private long startTime;
   private int stalenessMinutes;
-  private AppChecker appChecker;
   private ScheduledExecutorService scheduler;
   private int initialDelayMin;
   private int checkPeriodMin;
 
-  public InMemorySCMStore() {
-    super(InMemorySCMStore.class.getName());
+  public InMemorySCMStore(AppChecker appChecker) {
+    super(InMemorySCMStore.class.getName(), appChecker);
   }
 
   private String intern(String key) {
@@ -108,9 +106,6 @@ public class InMemorySCMStore extends SCMStore {
     this.checkPeriodMin = getCheckPeriod(conf);
     this.stalenessMinutes = getStalenessPeriod(conf);
 
-    appChecker = createAppCheckerService(conf);
-    addService(appChecker);
-
     bootstrap(conf);
 
     ThreadFactory tf =
@@ -157,11 +152,6 @@ public class InMemorySCMStore extends SCMStore {
     super.serviceStop();
   }
 
-  @VisibleForTesting
-  AppChecker createAppCheckerService(Configuration conf) {
-    return SharedCacheManager.createAppCheckerService(conf);
-  }
-
   private void bootstrap(Configuration conf) throws IOException {
     Map<String, String> initialCachedResources =
         getInitialCachedResources(FileSystem.get(conf), conf);
@@ -201,14 +191,10 @@ public class InMemorySCMStore extends SCMStore {
     // now traverse individual directories and process them
     // the directory structure is specified by the nested level parameter
     // (e.g. 9/c/d/<checksum>/file)
-    StringBuilder pattern = new StringBuilder();
-    for (int i = 0; i < nestedLevel + 1; i++) {
-      pattern.append("*/");
-    }
-    pattern.append("*");
+    String pattern = SharedCacheUtil.getCacheEntryGlobPattern(nestedLevel+1);
 
     LOG.info("Querying for all individual cached resource files");
-    FileStatus[] entries = fs.globStatus(new Path(root, pattern.toString()));
+    FileStatus[] entries = fs.globStatus(new Path(root, pattern));
     int numEntries = entries == null ? 0 : entries.length;
     LOG.info("Found " + numEntries + " files: processing for one resource per "
         + "key");
@@ -360,6 +346,17 @@ public class InMemorySCMStore extends SCMStore {
   }
 
   /**
+   * Provides atomicity for the method.
+   */
+  @Override
+  public void cleanResourceReferences(String key) throws YarnException {
+    String interned = intern(key);
+    synchronized (interned) {
+      super.cleanResourceReferences(key);
+    }
+  }
+
+  /**
    * Removes the given resource from the store. Returns true if the resource is
    * found and removed or if the resource is not found. Returns false if it was
    * unable to remove the resource because the resource reference list was not
@@ -427,8 +424,8 @@ public class InMemorySCMStore extends SCMStore {
 
   private static int getStalenessPeriod(Configuration conf) {
     int stalenessMinutes =
-        conf.getInt(YarnConfiguration.IN_MEMORY_STALENESS_PERIOD,
-            YarnConfiguration.DEFAULT_IN_MEMORY_STALENESS_PERIOD);
+        conf.getInt(YarnConfiguration.IN_MEMORY_STALENESS_PERIOD_MINS,
+            YarnConfiguration.DEFAULT_IN_MEMORY_STALENESS_PERIOD_MINS);
     // non-positive value is invalid; use the default
     if (stalenessMinutes <= 0) {
       throw new HadoopIllegalArgumentException("Non-positive staleness value: "
@@ -440,8 +437,8 @@ public class InMemorySCMStore extends SCMStore {
 
   private static int getInitialDelay(Configuration conf) {
     int initialMinutes =
-        conf.getInt(YarnConfiguration.IN_MEMORY_INITIAL_DELAY,
-            YarnConfiguration.DEFAULT_IN_MEMORY_INITIAL_DELAY);
+        conf.getInt(YarnConfiguration.IN_MEMORY_INITIAL_DELAY_MINS,
+            YarnConfiguration.DEFAULT_IN_MEMORY_INITIAL_DELAY_MINS);
     // non-positive value is invalid; use the default
     if (initialMinutes <= 0) {
       throw new HadoopIllegalArgumentException(
@@ -453,8 +450,8 @@ public class InMemorySCMStore extends SCMStore {
 
   private static int getCheckPeriod(Configuration conf) {
     int checkMinutes =
-        conf.getInt(YarnConfiguration.IN_MEMORY_CHECK_PERIOD,
-            YarnConfiguration.DEFAULT_IN_MEMORY_CHECK_PERIOD);
+        conf.getInt(YarnConfiguration.IN_MEMORY_CHECK_PERIOD_MINS,
+            YarnConfiguration.DEFAULT_IN_MEMORY_CHECK_PERIOD_MINS);
     // non-positive value is invalid; use the default
     if (checkMinutes <= 0) {
       throw new HadoopIllegalArgumentException(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/acc6f622/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java
index 397d904..6be00b9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java
@@ -19,11 +19,15 @@
 package org.apache.hadoop.yarn.server.sharedcachemanager.store;
 
 import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker;
 
 
 /**
@@ -35,8 +39,11 @@ import org.apache.hadoop.service.CompositeService;
 @Evolving
 public abstract class SCMStore extends CompositeService {
 
-  protected SCMStore(String name) {
+  protected final AppChecker appChecker;
+
+  protected SCMStore(String name, AppChecker appChecker) {
     super(name);
+    this.appChecker = appChecker;
   }
 
   /**
@@ -119,6 +126,33 @@ public abstract class SCMStore extends CompositeService {
       Collection<SharedCacheResourceReference> refs, boolean updateAccessTime);
 
   /**
+   * Clean all resource references to a cache resource that contain application
+   * ids pointing to finished applications. If the resource key does not exist,
+   * do nothing.
+   *
+   * @param key a unique identifier for a resource
+   * @throws YarnException
+   */
+  @Private
+  public void cleanResourceReferences(String key) throws YarnException {
+    Collection<SharedCacheResourceReference> refs = getResourceReferences(key);
+    if (!refs.isEmpty()) {
+      Set<SharedCacheResourceReference> refsToRemove =
+          new HashSet<SharedCacheResourceReference>();
+      for (SharedCacheResourceReference r : refs) {
+        if (!appChecker.isApplicationActive(r.getAppId())) {
+          // application in resource reference is dead, it is safe to remove 
the
+          // reference
+          refsToRemove.add(r);
+        }
+      }
+      if (refsToRemove.size() > 0) {
+        removeResourceReferences(key, refsToRemove, false);
+      }
+    }
+  }
+
+  /**
    * Check if a specific resource is evictable according to the store's enabled
    * cache eviction policies.
    * 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/acc6f622/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java
new file mode 100644
index 0000000..421b5bb
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+import org.junit.Test;
+
+public class TestCleanerTask {
+  private static final String ROOT =
+      YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT;
+  private static final long SLEEP_TIME =
+      YarnConfiguration.DEFAULT_SCM_CLEANER_RESOURCE_SLEEP_MS;
+  private static final int NESTED_LEVEL =
+      YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL;
+
+  @Test
+  public void testNonExistentRoot() throws Exception {
+    FileSystem fs = mock(FileSystem.class);
+    CleanerMetrics metrics = mock(CleanerMetrics.class);
+    SCMStore store = mock(SCMStore.class);
+
+    CleanerTask task =
+        createSpiedTask(fs, store, metrics, new ReentrantLock());
+    // the shared cache root does not exist
+    when(fs.exists(task.getRootPath())).thenReturn(false);
+
+    task.run();
+
+    // process() should not be called
+    verify(task, never()).process();
+  }
+
+  @Test
+  public void testProcessFreshResource() throws Exception {
+    FileSystem fs = mock(FileSystem.class);
+    CleanerMetrics metrics = mock(CleanerMetrics.class);
+    SCMStore store = mock(SCMStore.class);
+
+    CleanerTask task =
+        createSpiedTask(fs, store, metrics, new ReentrantLock());
+
+    // mock a resource that is not evictable
+    when(store.isResourceEvictable(isA(String.class), isA(FileStatus.class)))
+        .thenReturn(false);
+    FileStatus status = mock(FileStatus.class);
+    when(status.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc"));
+
+    // process the resource
+    task.processSingleResource(status);
+
+    // the directory should not be renamed
+    verify(fs, never()).rename(eq(status.getPath()), isA(Path.class));
+    // metrics should record a processed file (but not delete)
+    verify(metrics).reportAFileProcess();
+    verify(metrics, never()).reportAFileDelete();
+  }
+
+  @Test
+  public void testProcessEvictableResource() throws Exception {
+    FileSystem fs = mock(FileSystem.class);
+    CleanerMetrics metrics = mock(CleanerMetrics.class);
+    SCMStore store = mock(SCMStore.class);
+
+    CleanerTask task =
+        createSpiedTask(fs, store, metrics, new ReentrantLock());
+
+    // mock an evictable resource
+    when(store.isResourceEvictable(isA(String.class), isA(FileStatus.class)))
+        .thenReturn(true);
+    FileStatus status = mock(FileStatus.class);
+    when(status.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc"));
+    when(store.removeResource(isA(String.class))).thenReturn(true);
+    // rename succeeds
+    when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(true);
+    // delete returns true
+    when(fs.delete(isA(Path.class), anyBoolean())).thenReturn(true);
+
+    // process the resource
+    task.processSingleResource(status);
+
+    // the directory should be renamed
+    verify(fs).rename(eq(status.getPath()), isA(Path.class));
+    // metrics should record a deleted file
+    verify(metrics).reportAFileDelete();
+    verify(metrics, never()).reportAFileProcess();
+  }
+
+  private CleanerTask createSpiedTask(FileSystem fs, SCMStore store,
+      CleanerMetrics metrics, Lock isCleanerRunning) {
+    return spy(new CleanerTask(ROOT, SLEEP_TIME, NESTED_LEVEL, fs, store,
+        metrics, isCleanerRunning));
+  }
+
+  @Test
+  public void testResourceIsInUseHasAnActiveApp() throws Exception {
+    FileSystem fs = mock(FileSystem.class);
+    CleanerMetrics metrics = mock(CleanerMetrics.class);
+    SCMStore store = mock(SCMStore.class);
+
+    FileStatus resource = mock(FileStatus.class);
+    when(resource.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc"));
+    // resource is stale
+    when(store.isResourceEvictable(isA(String.class), isA(FileStatus.class)))
+        .thenReturn(true);
+    // but still has appIds
+    when(store.removeResource(isA(String.class))).thenReturn(false);
+
+    CleanerTask task =
+        createSpiedTask(fs, store, metrics, new ReentrantLock());
+
+    // process the resource
+    task.processSingleResource(resource);
+
+    // metrics should record a processed file (but not delete)
+    verify(metrics).reportAFileProcess();
+    verify(metrics, never()).reportAFileDelete();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/acc6f622/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java
new file mode 100644
index 0000000..26ab179
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.sharedcachemanager.metrics;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCleanerMetrics {
+
+  Configuration conf = new Configuration();
+  CleanerMetrics cleanerMetrics;
+
+  @Before
+  public void init() {
+    CleanerMetrics.initSingleton(conf);
+    cleanerMetrics = CleanerMetrics.getInstance();
+  }
+
+  @Test
+  public void testMetricsOverMultiplePeriods() {
+    simulateACleanerRun();
+    assertMetrics(4, 4, 1, 1);
+    simulateACleanerRun();
+    assertMetrics(4, 8, 1, 2);
+  }
+
+  public void simulateACleanerRun() {
+    cleanerMetrics.reportCleaningStart();
+    cleanerMetrics.reportAFileProcess();
+    cleanerMetrics.reportAFileDelete();
+    cleanerMetrics.reportAFileProcess();
+    cleanerMetrics.reportAFileProcess();
+  }
+
+  void assertMetrics(int proc, int totalProc, int del, int totalDel) {
+    assertEquals(
+        "Processed files in the last period are not measured correctly", proc,
+        cleanerMetrics.getProcessedFiles());
+    assertEquals("Total processed files are not measured correctly",
+        totalProc, cleanerMetrics.getTotalProcessedFiles());
+    assertEquals(
+        "Deleted files in the last period are not measured correctly", del,
+        cleanerMetrics.getDeletedFiles());
+    assertEquals("Total deleted files are not measured correctly",
+        totalDel, cleanerMetrics.getTotalDeletedFiles());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/acc6f622/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java
index 891703e..831ef6e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java
@@ -60,10 +60,8 @@ public class TestInMemorySCMStore {
 
   @Before
   public void setup() {
-    this.store = spy(new InMemorySCMStore());
     this.checker = spy(new DummyAppChecker());
-    doReturn(checker).when(store).createAppCheckerService(
-        isA(Configuration.class));
+    this.store = spy(new InMemorySCMStore(checker));
   }
 
   @After

Reply via email to