Repository: hadoop Updated Branches: refs/heads/branch-2 9bd149978 -> acc6f622e
YARN-2183. [YARN-1492] Cleaner service for cache manager. (Chris Trezzo and Sangjin Lee via kasha) (cherry picked from commit c51e53d7aad46059f52d4046a5fedfdfd3c37955) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/acc6f622 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/acc6f622 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/acc6f622 Branch: refs/heads/branch-2 Commit: acc6f622e0937f92ce8c3bec04b92247787e5904 Parents: 9bd1499 Author: Karthik Kambatla <ka...@apache.org> Authored: Sat Oct 25 10:31:06 2014 -0700 Committer: Karthik Kambatla <ka...@apache.org> Committed: Sat Oct 25 11:50:06 2014 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop/yarn/conf/YarnConfiguration.java | 47 ++- .../src/main/resources/yarn-default.xml | 27 +- .../server/sharedcache/SharedCacheUtil.java | 10 + .../sharedcachemanager/CleanerService.java | 218 +++++++++++++ .../server/sharedcachemanager/CleanerTask.java | 308 +++++++++++++++++++ .../sharedcachemanager/SharedCacheManager.java | 7 + .../metrics/CleanerMetrics.java | 172 +++++++++++ .../store/InMemorySCMStore.java | 45 ++- .../sharedcachemanager/store/SCMStore.java | 36 ++- .../sharedcachemanager/TestCleanerTask.java | 152 +++++++++ .../metrics/TestCleanerMetrics.java | 65 ++++ .../store/TestInMemorySCMStore.java | 4 +- 13 files changed, 1054 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/acc6f622/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 89b2750..53f8e44 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -12,6 +12,9 @@ Release 2.7.0 - UNRELEASED YARN-2180. 
[YARN-1492] In-memory backing store for cache manager. (Chris Trezzo via kasha) + YARN-2183. [YARN-1492] Cleaner service for cache manager. + (Chris Trezzo and Sangjin Lee via kasha) + IMPROVEMENTS YARN-1979. TestDirectoryCollection fails when the umask is unusual. http://git-wip-us.apache.org/repos/asf/hadoop/blob/acc6f622/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 76a3818..c545e44 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1393,25 +1393,54 @@ public class YarnConfiguration extends Configuration { * the last reference exceeds the staleness period. This value is specified in * minutes. */ - public static final String IN_MEMORY_STALENESS_PERIOD = - IN_MEMORY_STORE_PREFIX + "staleness-period"; - public static final int DEFAULT_IN_MEMORY_STALENESS_PERIOD = 7 * 24 * 60; + public static final String IN_MEMORY_STALENESS_PERIOD_MINS = + IN_MEMORY_STORE_PREFIX + "staleness-period-mins"; + public static final int DEFAULT_IN_MEMORY_STALENESS_PERIOD_MINS = + 7 * 24 * 60; /** * Initial delay before the in-memory store runs its first check to remove * dead initial applications. Specified in minutes. 
*/ - public static final String IN_MEMORY_INITIAL_DELAY = - IN_MEMORY_STORE_PREFIX + "initial-delay"; - public static final int DEFAULT_IN_MEMORY_INITIAL_DELAY = 10; + public static final String IN_MEMORY_INITIAL_DELAY_MINS = + IN_MEMORY_STORE_PREFIX + "initial-delay-mins"; + public static final int DEFAULT_IN_MEMORY_INITIAL_DELAY_MINS = 10; /** * The frequency at which the in-memory store checks to remove dead initial * applications. Specified in minutes. */ - public static final String IN_MEMORY_CHECK_PERIOD = - IN_MEMORY_STORE_PREFIX + "check-period"; - public static final int DEFAULT_IN_MEMORY_CHECK_PERIOD = 12 * 60; + public static final String IN_MEMORY_CHECK_PERIOD_MINS = + IN_MEMORY_STORE_PREFIX + "check-period-mins"; + public static final int DEFAULT_IN_MEMORY_CHECK_PERIOD_MINS = 12 * 60; + + // SCM Cleaner service configuration + + private static final String SCM_CLEANER_PREFIX = SHARED_CACHE_PREFIX + + "cleaner."; + + /** + * The frequency at which a cleaner task runs. Specified in minutes. + */ + public static final String SCM_CLEANER_PERIOD_MINS = + SCM_CLEANER_PREFIX + "period-mins"; + public static final int DEFAULT_SCM_CLEANER_PERIOD_MINS = 24 * 60; + + /** + * Initial delay before the first cleaner task is scheduled. Specified in + * minutes. + */ + public static final String SCM_CLEANER_INITIAL_DELAY_MINS = + SCM_CLEANER_PREFIX + "initial-delay-mins"; + public static final int DEFAULT_SCM_CLEANER_INITIAL_DELAY_MINS = 10; + + /** + * The time to sleep between processing each shared cache resource. Specified + * in milliseconds. 
+ */ + public static final String SCM_CLEANER_RESOURCE_SLEEP_MS = + SCM_CLEANER_PREFIX + "resource-sleep-ms"; + public static final long DEFAULT_SCM_CLEANER_RESOURCE_SLEEP_MS = 0L; //////////////////////////////// // Other Configs http://git-wip-us.apache.org/repos/asf/hadoop/blob/acc6f622/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 922d068..ee7e232 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1379,24 +1379,45 @@ <description>A resource in the in-memory store is considered stale if the time since the last reference exceeds the staleness period. This value is specified in minutes.</description> - <name>yarn.sharedcache.store.in-memory.staleness-period</name> + <name>yarn.sharedcache.store.in-memory.staleness-period-mins</name> <value>10080</value> </property> <property> <description>Initial delay before the in-memory store runs its first check to remove dead initial applications. Specified in minutes.</description> - <name>yarn.sharedcache.store.in-memory.initial-delay</name> + <name>yarn.sharedcache.store.in-memory.initial-delay-mins</name> <value>10</value> </property> <property> <description>The frequency at which the in-memory store checks to remove dead initial applications. Specified in minutes.</description> - <name>yarn.sharedcache.store.in-memory.check-period</name> + <name>yarn.sharedcache.store.in-memory.check-period-mins</name> <value>720</value> </property> + <property> + <description>The frequency at which a cleaner task runs. 
+ Specified in minutes.</description> + <name>yarn.sharedcache.cleaner.period-mins</name> + <value>1440</value> + </property> + + <property> + <description>Initial delay before the first cleaner task is scheduled. + Specified in minutes.</description> + <name>yarn.sharedcache.cleaner.initial-delay-mins</name> + <value>10</value> + </property> + + <property> + <description>The time to sleep between processing each shared cache + resource. Specified in milliseconds.</description> + <name>yarn.sharedcache.cleaner.resource-sleep-ms</name> + <value>0</value> + </property> + <!-- Other configuration --> <property> <description>The interval that the yarn client library uses to poll the http://git-wip-us.apache.org/repos/asf/hadoop/blob/acc6f622/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java index 4b933ac..d3cf379 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java @@ -78,4 +78,14 @@ public class SharedCacheUtil { return sb.toString(); } + + @Private + public static String getCacheEntryGlobPattern(int depth) { + StringBuilder pattern = new StringBuilder(); + for (int i = 0; i < depth; i++) { + pattern.append("*/"); + } + pattern.append("*"); + return pattern.toString(); + } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/acc6f622/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java new file mode 100644 index 0000000..593be4a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * The cleaner service that maintains the shared cache area, and cleans up stale + * entries on a regular basis. + */ +@Private +@Evolving +public class CleanerService extends CompositeService { + /** + * The name of the global cleaner lock that the cleaner creates to indicate + * that a cleaning process is in progress. 
+ */ + public static final String GLOBAL_CLEANER_PID = ".cleaner_pid"; + + private static final Log LOG = LogFactory.getLog(CleanerService.class); + + private Configuration conf; + private CleanerMetrics metrics; + private ScheduledExecutorService scheduledExecutor; + private final SCMStore store; + private final Lock cleanerTaskLock; + + public CleanerService(SCMStore store) { + super("CleanerService"); + this.store = store; + this.cleanerTaskLock = new ReentrantLock(); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.conf = conf; + + // create scheduler executor service that services the cleaner tasks + // use 2 threads to accommodate the on-demand tasks and reduce the chance of + // back-to-back runs + ThreadFactory tf = + new ThreadFactoryBuilder().setNameFormat("Shared cache cleaner").build(); + scheduledExecutor = Executors.newScheduledThreadPool(2, tf); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + if (!writeGlobalCleanerPidFile()) { + throw new YarnException("The global cleaner pid file already exists! " + + "It appears there is another CleanerService running in the cluster"); + } + + this.metrics = CleanerMetrics.initSingleton(conf); + + // Start dependent services (i.e. 
AppChecker) + super.serviceStart(); + + Runnable task = + CleanerTask.create(conf, store, metrics, cleanerTaskLock); + long periodInMinutes = getPeriod(conf); + scheduledExecutor.scheduleAtFixedRate(task, getInitialDelay(conf), + periodInMinutes, TimeUnit.MINUTES); + LOG.info("Scheduled the shared cache cleaner task to run every " + + periodInMinutes + " minutes."); + } + + @Override + protected void serviceStop() throws Exception { + LOG.info("Shutting down the background thread."); + scheduledExecutor.shutdownNow(); + try { + if (scheduledExecutor.awaitTermination(10, TimeUnit.SECONDS)) { + LOG.info("The background thread stopped."); + } else { + LOG.warn("Gave up waiting for the cleaner task to shutdown."); + } + } catch (InterruptedException e) { + LOG.warn("The cleaner service was interrupted while shutting down the task.", + e); + } + + removeGlobalCleanerPidFile(); + + super.serviceStop(); + } + + /** + * Execute an on-demand cleaner task. + */ + protected void runCleanerTask() { + Runnable task = + CleanerTask.create(conf, store, metrics, cleanerTaskLock); + // this is a non-blocking call (it simply submits the task to the executor + // queue and returns) + this.scheduledExecutor.execute(task); + } + + /** + * To ensure there are not multiple instances of the SCM running on a given + * cluster, a global pid file is used. This file contains the hostname and + * process id of the process that owns the pid file. 
+ * + * @return true if the pid file was written, false otherwise + * @throws YarnException + */ + private boolean writeGlobalCleanerPidFile() throws YarnException { + String root = + conf.get(YarnConfiguration.SHARED_CACHE_ROOT, + YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT); + Path pidPath = new Path(root, GLOBAL_CLEANER_PID); + try { + FileSystem fs = FileSystem.get(this.conf); + + if (fs.exists(pidPath)) { + return false; + } + + FSDataOutputStream os = fs.create(pidPath, false); + // write the hostname and the process id in the global cleaner pid file + final String ID = ManagementFactory.getRuntimeMXBean().getName(); + os.writeUTF(ID); + os.close(); + // add it to the delete-on-exit to ensure it gets deleted when the JVM + // exits + fs.deleteOnExit(pidPath); + } catch (IOException e) { + throw new YarnException(e); + } + LOG.info("Created the global cleaner pid file at " + pidPath.toString()); + return true; + } + + private void removeGlobalCleanerPidFile() { + try { + FileSystem fs = FileSystem.get(this.conf); + String root = + conf.get(YarnConfiguration.SHARED_CACHE_ROOT, + YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT); + + Path pidPath = new Path(root, GLOBAL_CLEANER_PID); + + + fs.delete(pidPath, false); + LOG.info("Removed the global cleaner pid file at " + pidPath.toString()); + } catch (IOException e) { + LOG.error( + "Unable to remove the global cleaner pid file! The file may need " + + "to be removed manually.", e); + } + } + + private static int getInitialDelay(Configuration conf) { + int initialDelayInMinutes = + conf.getInt(YarnConfiguration.SCM_CLEANER_INITIAL_DELAY_MINS, + YarnConfiguration.DEFAULT_SCM_CLEANER_INITIAL_DELAY_MINS); + // negative value is invalid; use the default + if (initialDelayInMinutes < 0) { + throw new HadoopIllegalArgumentException("Negative initial delay value: " + + initialDelayInMinutes + + ". 
The initial delay must be greater than zero."); + } + return initialDelayInMinutes; + } + + private static int getPeriod(Configuration conf) { + int periodInMinutes = + conf.getInt(YarnConfiguration.SCM_CLEANER_PERIOD_MINS, + YarnConfiguration.DEFAULT_SCM_CLEANER_PERIOD_MINS); + // non-positive value is invalid; reject it instead of falling back to the default + if (periodInMinutes <= 0) { + throw new HadoopIllegalArgumentException("Non-positive period value: " + + periodInMinutes + + ". The cleaner period must be greater than or equal to zero."); + } + return periodInMinutes; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/acc6f622/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java new file mode 100644 index 0000000..a7fdcbd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerTask.java @@ -0,0 +1,308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import java.io.IOException; +import java.util.concurrent.locks.Lock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil; +import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore; + +/** + * The task that runs and cleans up the shared cache area for stale entries and + * orphaned files. It is expected that only one cleaner task runs at any given + * point in time. + */ +@Private +@Evolving +class CleanerTask implements Runnable { + private static final String RENAMED_SUFFIX = "-renamed"; + private static final Log LOG = LogFactory.getLog(CleanerTask.class); + + private final String location; + private final long sleepTime; + private final int nestedLevel; + private final Path root; + private final FileSystem fs; + private final SCMStore store; + private final CleanerMetrics metrics; + private final Lock cleanerTaskLock; + + /** + * Creates a cleaner task based on the configuration. 
This is provided for + * convenience. + * + * @param conf + * @param store + * @param metrics + * @param cleanerTaskLock lock that ensures a serial execution of cleaner + * task + * @return an instance of a CleanerTask + */ + public static CleanerTask create(Configuration conf, SCMStore store, + CleanerMetrics metrics, Lock cleanerTaskLock) { + try { + // get the root directory for the shared cache + String location = + conf.get(YarnConfiguration.SHARED_CACHE_ROOT, + YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT); + + long sleepTime = + conf.getLong(YarnConfiguration.SCM_CLEANER_RESOURCE_SLEEP_MS, + YarnConfiguration.DEFAULT_SCM_CLEANER_RESOURCE_SLEEP_MS); + int nestedLevel = SharedCacheUtil.getCacheDepth(conf); + FileSystem fs = FileSystem.get(conf); + + return new CleanerTask(location, sleepTime, nestedLevel, fs, store, + metrics, cleanerTaskLock); + } catch (IOException e) { + LOG.error("Unable to obtain the filesystem for the cleaner service", e); + throw new ExceptionInInitializerError(e); + } + } + + /** + * Creates a cleaner task based on the root directory location and the + * filesystem. + */ + CleanerTask(String location, long sleepTime, int nestedLevel, FileSystem fs, + SCMStore store, CleanerMetrics metrics, Lock cleanerTaskLock) { + this.location = location; + this.sleepTime = sleepTime; + this.nestedLevel = nestedLevel; + this.root = new Path(location); + this.fs = fs; + this.store = store; + this.metrics = metrics; + this.cleanerTaskLock = cleanerTaskLock; + } + + @Override + public void run() { + if (!this.cleanerTaskLock.tryLock()) { + // there is already another task running + LOG.warn("A cleaner task is already running. " + + "This scheduled cleaner task will do nothing."); + return; + } + + try { + if (!fs.exists(root)) { + LOG.error("The shared cache root " + location + " was not found. 
" + + "The cleaner task will do nothing."); + return; + } + + // we're now ready to process the shared cache area + process(); + } catch (Throwable e) { + LOG.error("Unexpected exception while initializing the cleaner task. " + + "This task will do nothing,", e); + } finally { + // this is set to false regardless of if it is a scheduled or on-demand + // task + this.cleanerTaskLock.unlock(); + } + } + + /** + * Sweeps and processes the shared cache area to clean up stale and orphaned + * files. + */ + void process() { + // mark the beginning of the run in the metrics + metrics.reportCleaningStart(); + try { + // now traverse individual directories and process them + // the directory structure is specified by the nested level parameter + // (e.g. 9/c/d/<checksum>) + String pattern = SharedCacheUtil.getCacheEntryGlobPattern(nestedLevel); + FileStatus[] resources = + fs.globStatus(new Path(root, pattern)); + int numResources = resources == null ? 0 : resources.length; + LOG.info("Processing " + numResources + " resources in the shared cache"); + long beginMs = System.currentTimeMillis(); + if (resources != null) { + for (FileStatus resource : resources) { + // check for interruption so it can abort in a timely manner in case + // of shutdown + if (Thread.currentThread().isInterrupted()) { + LOG.warn("The cleaner task was interrupted. 
Aborting."); + break; + } + + if (resource.isDirectory()) { + processSingleResource(resource); + } else { + LOG.warn("Invalid file at path " + resource.getPath().toString() + + + " when a directory was expected"); + } + // add sleep time between cleaning each directory if it is non-zero + if (sleepTime > 0) { + Thread.sleep(sleepTime); + } + } + } + long endMs = System.currentTimeMillis(); + long durationMs = endMs - beginMs; + LOG.info("Processed " + numResources + " resource(s) in " + durationMs + + " ms."); + } catch (IOException e1) { + LOG.error("Unable to complete the cleaner task", e1); + } catch (InterruptedException e2) { + Thread.currentThread().interrupt(); // restore the interrupt + } + } + + /** + * Returns a path for the root directory for the shared cache. + */ + Path getRootPath() { + return root; + } + + /** + * Processes a single shared cache resource directory. + */ + void processSingleResource(FileStatus resource) { + Path path = resource.getPath(); + // indicates the processing status of the resource + ResourceStatus resourceStatus = ResourceStatus.INIT; + + // first, if the path ends with the renamed suffix, it indicates the + // directory was moved (as stale) but somehow not deleted (probably due to + // SCM failure); delete the directory + if (path.toString().endsWith(RENAMED_SUFFIX)) { + LOG.info("Found a renamed directory that was left undeleted at " + + path.toString() + ". Deleting."); + try { + if (fs.delete(path, true)) { + resourceStatus = ResourceStatus.DELETED; + } + } catch (IOException e) { + LOG.error("Error while processing a shared cache resource: " + path, e); + } + } else { + // this is the path to the cache resource directory + // the directory name is the resource key (i.e. 
a unique identifier) + String key = path.getName(); + + try { + store.cleanResourceReferences(key); + } catch (YarnException e) { + LOG.error("Exception thrown while removing dead appIds.", e); + } + + if (store.isResourceEvictable(key, resource)) { + try { + /* + * TODO See YARN-2663: There is a race condition between + * store.removeResource(key) and + * removeResourceFromCacheFileSystem(path) operations because they do + * not happen atomically and resources can be uploaded with different + * file names by the node managers. + */ + // remove the resource from scm (checks for appIds as well) + if (store.removeResource(key)) { + // remove the resource from the file system + boolean deleted = removeResourceFromCacheFileSystem(path); + if (deleted) { + resourceStatus = ResourceStatus.DELETED; + } else { + LOG.error("Failed to remove path from the file system." + + " Skipping this resource: " + path); + resourceStatus = ResourceStatus.ERROR; + } + } else { + // we did not delete the resource because it contained application + // ids + resourceStatus = ResourceStatus.PROCESSED; + } + } catch (IOException e) { + LOG.error( + "Failed to remove path from the file system. 
Skipping this resource: " + + path, e); + resourceStatus = ResourceStatus.ERROR; + } + } else { + resourceStatus = ResourceStatus.PROCESSED; + } + } + + // record the processing + switch (resourceStatus) { + case DELETED: + metrics.reportAFileDelete(); + break; + case PROCESSED: + metrics.reportAFileProcess(); + break; + case ERROR: + metrics.reportAFileError(); + break; + default: + LOG.error("Cleaner encountered an invalid status (" + resourceStatus + + ") while processing resource: " + path.getName()); + } + } + + private boolean removeResourceFromCacheFileSystem(Path path) + throws IOException { + // rename the directory to make the delete atomic + Path renamedPath = new Path(path.toString() + RENAMED_SUFFIX); + if (fs.rename(path, renamedPath)) { + // the directory can be removed safely now + // log the original path + LOG.info("Deleting " + path.toString()); + return fs.delete(renamedPath, true); + } else { + // we were unable to remove it for some reason: it's best to leave + // it at that + LOG.error("We were not able to rename the directory to " + + renamedPath.toString() + ". We will leave it intact."); + } + return false; + } + + /** + * A status indicating what happened with the processing of a given cache + * resource. 
+ */ + private enum ResourceStatus { + INIT, + /** Resource was successfully processed, but not deleted **/ + PROCESSED, + /** Resource was successfully deleted **/ + DELETED, + /** The cleaner task ran into an error while processing the resource **/ + ERROR + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/acc6f622/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java index 2f3ddb1..3fdb588 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java @@ -64,6 +64,9 @@ public class SharedCacheManager extends CompositeService { this.store = createSCMStoreService(conf); addService(store); + CleanerService cs = createCleanerService(store); + addService(cs); + // init metrics DefaultMetricsSystem.initialize("SharedCacheManager"); JvmMetrics.initSingleton("SharedCacheManager", null); @@ -90,6 +93,10 @@ public class SharedCacheManager extends CompositeService { return store; } + private CleanerService createCleanerService(SCMStore store) { + return new CleanerService(store); + } + @Override protected void serviceStop() throws Exception { 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/acc6f622/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java new file mode 100644 index 0000000..5c8ea3d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/CleanerMetrics.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.sharedcachemanager.metrics; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsAnnotations; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MetricsSourceBuilder; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; + +/** + * This class is for maintaining the various Cleaner activity statistics and + * publishing them through the metrics interfaces. + */ +@Private +@Evolving +@Metrics(name = "CleanerActivity", about = "Cleaner service metrics", context = "yarn") +public class CleanerMetrics { + public static final Log LOG = LogFactory.getLog(CleanerMetrics.class); + private final MetricsRegistry registry = new MetricsRegistry("cleaner"); + + enum Singleton { + INSTANCE; + + CleanerMetrics impl; + + synchronized CleanerMetrics init(Configuration conf) { + if (impl == null) { + impl = create(conf); + } + return impl; + } + } + + public static CleanerMetrics initSingleton(Configuration conf) { + return Singleton.INSTANCE.init(conf); + } + + public static CleanerMetrics getInstance() { + CleanerMetrics topMetrics = Singleton.INSTANCE.impl; + if (topMetrics == null) + throw new IllegalStateException( + "The CleanerMetics singlton instance is not initialized." 
+ + " Have you called init first?"); + return topMetrics; + } + + @Metric("number of deleted files over all runs") + private MutableCounterLong totalDeletedFiles; + + public long getTotalDeletedFiles() { + return totalDeletedFiles.value(); + } + + private @Metric("number of deleted files in the last run") + MutableGaugeLong deletedFiles; + + public long getDeletedFiles() { + return deletedFiles.value(); + } + + private @Metric("number of processed files over all runs") + MutableCounterLong totalProcessedFiles; + + public long getTotalProcessedFiles() { + return totalProcessedFiles.value(); + } + + private @Metric("number of processed files in the last run") + MutableGaugeLong processedFiles; + + public long getProcessedFiles() { + return processedFiles.value(); + } + + @Metric("number of file errors over all runs") + private MutableCounterLong totalFileErrors; + + public long getTotalFileErrors() { + return totalFileErrors.value(); + } + + private @Metric("number of file errors in the last run") + MutableGaugeLong fileErrors; + + public long getFileErrors() { + return fileErrors.value(); + } + + private CleanerMetrics() { + } + + /** + * The metric source obtained after parsing the annotations + */ + MetricsSource metricSource; + + static CleanerMetrics create(Configuration conf) { + MetricsSystem ms = DefaultMetricsSystem.instance(); + + CleanerMetrics metricObject = new CleanerMetrics(); + MetricsSourceBuilder sb = MetricsAnnotations.newSourceBuilder(metricObject); + final MetricsSource s = sb.build(); + ms.register("cleaner", "The cleaner service of truly shared cache", s); + metricObject.metricSource = s; + return metricObject; + } + + /** + * Report a delete operation at the current system time + */ + public void reportAFileDelete() { + totalProcessedFiles.incr(); + processedFiles.incr(); + totalDeletedFiles.incr(); + deletedFiles.incr(); + } + + /** + * Report a process operation at the current system time + */ + public void reportAFileProcess() { + 
totalProcessedFiles.incr(); + processedFiles.incr(); + } + + /** + * Report a process operation error at the current system time + */ + public void reportAFileError() { + totalProcessedFiles.incr(); + processedFiles.incr(); + totalFileErrors.incr(); + fileErrors.incr(); + } + + /** + * Report the start of a new run of the cleaner. + * + */ + public void reportCleaningStart() { + processedFiles.set(0); + deletedFiles.set(0); + fileErrors.set(0); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/acc6f622/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java index 79369d8..b8fe14f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java @@ -48,7 +48,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil; import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker; -import org.apache.hadoop.yarn.server.sharedcachemanager.SharedCacheManager; import com.google.common.annotations.VisibleForTesting; import
com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -83,13 +82,12 @@ public class InMemorySCMStore extends SCMStore { private final Object initialAppsLock = new Object(); private long startTime; private int stalenessMinutes; - private AppChecker appChecker; private ScheduledExecutorService scheduler; private int initialDelayMin; private int checkPeriodMin; - public InMemorySCMStore() { - super(InMemorySCMStore.class.getName()); + public InMemorySCMStore(AppChecker appChecker) { + super(InMemorySCMStore.class.getName(), appChecker); } private String intern(String key) { @@ -108,9 +106,6 @@ public class InMemorySCMStore extends SCMStore { this.checkPeriodMin = getCheckPeriod(conf); this.stalenessMinutes = getStalenessPeriod(conf); - appChecker = createAppCheckerService(conf); - addService(appChecker); - bootstrap(conf); ThreadFactory tf = @@ -157,11 +152,6 @@ public class InMemorySCMStore extends SCMStore { super.serviceStop(); } - @VisibleForTesting - AppChecker createAppCheckerService(Configuration conf) { - return SharedCacheManager.createAppCheckerService(conf); - } - private void bootstrap(Configuration conf) throws IOException { Map<String, String> initialCachedResources = getInitialCachedResources(FileSystem.get(conf), conf); @@ -201,14 +191,10 @@ public class InMemorySCMStore extends SCMStore { // now traverse individual directories and process them // the directory structure is specified by the nested level parameter // (e.g. 9/c/d/<checksum>/file) - StringBuilder pattern = new StringBuilder(); - for (int i = 0; i < nestedLevel + 1; i++) { - pattern.append("*/"); - } - pattern.append("*"); + String pattern = SharedCacheUtil.getCacheEntryGlobPattern(nestedLevel+1); LOG.info("Querying for all individual cached resource files"); - FileStatus[] entries = fs.globStatus(new Path(root, pattern.toString())); + FileStatus[] entries = fs.globStatus(new Path(root, pattern)); int numEntries = entries == null ? 
0 : entries.length; LOG.info("Found " + numEntries + " files: processing for one resource per " + "key"); @@ -360,6 +346,17 @@ public class InMemorySCMStore extends SCMStore { } /** + * Provides atomicity for the method. + */ + @Override + public void cleanResourceReferences(String key) throws YarnException { + String interned = intern(key); + synchronized (interned) { + super.cleanResourceReferences(key); + } + } + + /** * Removes the given resource from the store. Returns true if the resource is * found and removed or if the resource is not found. Returns false if it was * unable to remove the resource because the resource reference list was not @@ -427,8 +424,8 @@ public class InMemorySCMStore extends SCMStore { private static int getStalenessPeriod(Configuration conf) { int stalenessMinutes = - conf.getInt(YarnConfiguration.IN_MEMORY_STALENESS_PERIOD, - YarnConfiguration.DEFAULT_IN_MEMORY_STALENESS_PERIOD); + conf.getInt(YarnConfiguration.IN_MEMORY_STALENESS_PERIOD_MINS, + YarnConfiguration.DEFAULT_IN_MEMORY_STALENESS_PERIOD_MINS); // non-positive value is invalid; use the default if (stalenessMinutes <= 0) { throw new HadoopIllegalArgumentException("Non-positive staleness value: " @@ -440,8 +437,8 @@ public class InMemorySCMStore extends SCMStore { private static int getInitialDelay(Configuration conf) { int initialMinutes = - conf.getInt(YarnConfiguration.IN_MEMORY_INITIAL_DELAY, - YarnConfiguration.DEFAULT_IN_MEMORY_INITIAL_DELAY); + conf.getInt(YarnConfiguration.IN_MEMORY_INITIAL_DELAY_MINS, + YarnConfiguration.DEFAULT_IN_MEMORY_INITIAL_DELAY_MINS); // non-positive value is invalid; use the default if (initialMinutes <= 0) { throw new HadoopIllegalArgumentException( @@ -453,8 +450,8 @@ public class InMemorySCMStore extends SCMStore { private static int getCheckPeriod(Configuration conf) { int checkMinutes = - conf.getInt(YarnConfiguration.IN_MEMORY_CHECK_PERIOD, - YarnConfiguration.DEFAULT_IN_MEMORY_CHECK_PERIOD); + 
conf.getInt(YarnConfiguration.IN_MEMORY_CHECK_PERIOD_MINS, + YarnConfiguration.DEFAULT_IN_MEMORY_CHECK_PERIOD_MINS); // non-positive value is invalid; use the default if (checkMinutes <= 0) { throw new HadoopIllegalArgumentException( http://git-wip-us.apache.org/repos/asf/hadoop/blob/acc6f622/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java index 397d904..6be00b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java @@ -19,11 +19,15 @@ package org.apache.hadoop.yarn.server.sharedcachemanager.store; import java.util.Collection; +import java.util.HashSet; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker; /** @@ -35,8 +39,11 @@ import org.apache.hadoop.service.CompositeService; @Evolving public abstract class SCMStore extends CompositeService { - protected SCMStore(String name) { + protected final 
AppChecker appChecker; + + protected SCMStore(String name, AppChecker appChecker) { super(name); + this.appChecker = appChecker; } /** @@ -119,6 +126,33 @@ public abstract class SCMStore extends CompositeService { Collection<SharedCacheResourceReference> refs, boolean updateAccessTime); /** + * Clean all resource references to a cache resource that contain application + * ids pointing to finished applications. If the resource key does not exist, + * do nothing. + * + * @param key a unique identifier for a resource + * @throws YarnException + */ + @Private + public void cleanResourceReferences(String key) throws YarnException { + Collection<SharedCacheResourceReference> refs = getResourceReferences(key); + if (!refs.isEmpty()) { + Set<SharedCacheResourceReference> refsToRemove = + new HashSet<SharedCacheResourceReference>(); + for (SharedCacheResourceReference r : refs) { + if (!appChecker.isApplicationActive(r.getAppId())) { + // application in resource reference is dead, it is safe to remove the + // reference + refsToRemove.add(r); + } + } + if (refsToRemove.size() > 0) { + removeResourceReferences(key, refsToRemove, false); + } + } + } + + /** * Check if a specific resource is evictable according to the store's enabled * cache eviction policies. 
* http://git-wip-us.apache.org/repos/asf/hadoop/blob/acc6f622/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java new file mode 100644 index 0000000..421b5bb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestCleanerTask.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore; +import org.junit.Test; + +public class TestCleanerTask { + private static final String ROOT = + YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT; + private static final long SLEEP_TIME = + YarnConfiguration.DEFAULT_SCM_CLEANER_RESOURCE_SLEEP_MS; + private static final int NESTED_LEVEL = + YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL; + + @Test + public void testNonExistentRoot() throws Exception { + FileSystem fs = mock(FileSystem.class); + CleanerMetrics metrics = mock(CleanerMetrics.class); + SCMStore store = mock(SCMStore.class); + + CleanerTask task = + createSpiedTask(fs, store, metrics, new ReentrantLock()); + // the shared cache root does not exist + when(fs.exists(task.getRootPath())).thenReturn(false); + + task.run(); + + // process() should not be called + verify(task, never()).process(); + } + + @Test + public void testProcessFreshResource() throws Exception { + FileSystem fs = mock(FileSystem.class); + CleanerMetrics metrics = mock(CleanerMetrics.class); + SCMStore store = mock(SCMStore.class); + + CleanerTask task = + createSpiedTask(fs, store, metrics, new ReentrantLock()); + + // 
mock a resource that is not evictable + when(store.isResourceEvictable(isA(String.class), isA(FileStatus.class))) + .thenReturn(false); + FileStatus status = mock(FileStatus.class); + when(status.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc")); + + // process the resource + task.processSingleResource(status); + + // the directory should not be renamed + verify(fs, never()).rename(eq(status.getPath()), isA(Path.class)); + // metrics should record a processed file (but not delete) + verify(metrics).reportAFileProcess(); + verify(metrics, never()).reportAFileDelete(); + } + + @Test + public void testProcessEvictableResource() throws Exception { + FileSystem fs = mock(FileSystem.class); + CleanerMetrics metrics = mock(CleanerMetrics.class); + SCMStore store = mock(SCMStore.class); + + CleanerTask task = + createSpiedTask(fs, store, metrics, new ReentrantLock()); + + // mock an evictable resource + when(store.isResourceEvictable(isA(String.class), isA(FileStatus.class))) + .thenReturn(true); + FileStatus status = mock(FileStatus.class); + when(status.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc")); + when(store.removeResource(isA(String.class))).thenReturn(true); + // rename succeeds + when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(true); + // delete returns true + when(fs.delete(isA(Path.class), anyBoolean())).thenReturn(true); + + // process the resource + task.processSingleResource(status); + + // the directory should be renamed + verify(fs).rename(eq(status.getPath()), isA(Path.class)); + // metrics should record a deleted file + verify(metrics).reportAFileDelete(); + verify(metrics, never()).reportAFileProcess(); + } + + private CleanerTask createSpiedTask(FileSystem fs, SCMStore store, + CleanerMetrics metrics, Lock isCleanerRunning) { + return spy(new CleanerTask(ROOT, SLEEP_TIME, NESTED_LEVEL, fs, store, + metrics, isCleanerRunning)); + } + + @Test + public void testResourceIsInUseHasAnActiveApp() throws Exception { + FileSystem fs = 
mock(FileSystem.class); + CleanerMetrics metrics = mock(CleanerMetrics.class); + SCMStore store = mock(SCMStore.class); + + FileStatus resource = mock(FileStatus.class); + when(resource.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc")); + // resource is stale + when(store.isResourceEvictable(isA(String.class), isA(FileStatus.class))) + .thenReturn(true); + // but still has appIds + when(store.removeResource(isA(String.class))).thenReturn(false); + + CleanerTask task = + createSpiedTask(fs, store, metrics, new ReentrantLock()); + + // process the resource + task.processSingleResource(resource); + + // metrics should record a processed file (but not delete) + verify(metrics).reportAFileProcess(); + verify(metrics, never()).reportAFileDelete(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/acc6f622/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java new file mode 100644 index 0000000..26ab179 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/TestCleanerMetrics.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.sharedcachemanager.metrics; + +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Before; +import org.junit.Test; + +public class TestCleanerMetrics { + + Configuration conf = new Configuration(); + CleanerMetrics cleanerMetrics; + + @Before + public void init() { + CleanerMetrics.initSingleton(conf); + cleanerMetrics = CleanerMetrics.getInstance(); + } + + @Test + public void testMetricsOverMultiplePeriods() { + simulateACleanerRun(); + assertMetrics(4, 4, 1, 1); + simulateACleanerRun(); + assertMetrics(4, 8, 1, 2); + } + + public void simulateACleanerRun() { + cleanerMetrics.reportCleaningStart(); + cleanerMetrics.reportAFileProcess(); + cleanerMetrics.reportAFileDelete(); + cleanerMetrics.reportAFileProcess(); + cleanerMetrics.reportAFileProcess(); + } + + void assertMetrics(int proc, int totalProc, int del, int totalDel) { + assertEquals( + "Processed files in the last period are not measured correctly", proc, + cleanerMetrics.getProcessedFiles()); + assertEquals("Total processed files are not measured correctly", + totalProc, cleanerMetrics.getTotalProcessedFiles()); + assertEquals( + "Deleted files in the last period are not measured correctly", del, + cleanerMetrics.getDeletedFiles()); + assertEquals("Total deleted files are not measured correctly", + totalDel, 
cleanerMetrics.getTotalDeletedFiles()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/acc6f622/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java index 891703e..831ef6e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java @@ -60,10 +60,8 @@ public class TestInMemorySCMStore { @Before public void setup() { - this.store = spy(new InMemorySCMStore()); this.checker = spy(new DummyAppChecker()); - doReturn(checker).when(store).createAppCheckerService( - isA(Configuration.class)); + this.store = spy(new InMemorySCMStore(checker)); } @After