Author: jlowe
Date: Thu Jan 31 22:46:05 2013
New Revision: 1441239

URL: http://svn.apache.org/viewvc?rev=1441239&view=rev
Log:
YARN-364. AggregatedLogDeletionService can take too long to delete logs. Contributed by Jason Lowe
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1441239&r1=1441238&r2=1441239&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Thu Jan 31 22:46:05 2013 @@ -284,6 +284,9 @@ Release 0.23.7 - UNRELEASED YARN-343. Capacity Scheduler maximum-capacity value -1 is invalid (Xuan Gong via tgraves) + YARN-364. 
AggregatedLogDeletionService can take too long to delete logs + (jlowe) + Release 0.23.6 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1441239&r1=1441238&r2=1441239&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Thu Jan 31 22:46:05 2013 @@ -380,6 +380,15 @@ public class YarnConfiguration extends C public static final long DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS = -1; /** + * How long to wait between aggregated log retention checks. If set to + * a value <= 0 then the value is computed as one-tenth of the log retention + * setting. Be careful set this too small and you will spam the name node. + */ + public static final String LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS = + YARN_PREFIX + "log-aggregation.retain-check-interval-seconds"; + public static final long DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS = -1; + + /** * Number of seconds to retain logs on the NodeManager. 
Only applicable if Log * aggregation is disabled */ Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java?rev=1441239&r1=1441238&r2=1441239&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java Thu Jan 31 22:46:05 2013 @@ -140,9 +140,16 @@ public class AggregatedLogDeletionServic " too small (" + retentionSecs + ")"); return; } + long checkIntervalMsecs = 1000 * conf.getLong( + YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS); + if (checkIntervalMsecs <= 0) { + // when unspecified compute check interval as 1/10th of retention + checkIntervalMsecs = (retentionSecs * 1000) / 10; + } TimerTask task = new LogDeletionTask(conf, retentionSecs); timer = new Timer(); - timer.scheduleAtFixedRate(task, 0, retentionSecs * 1000); + timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs); super.start(); } Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1441239&r1=1441238&r2=1441239&view=diff ============================================================================== --- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Thu Jan 31 22:46:05 2013 @@ -411,6 +411,15 @@ </property> <property> + <description>How long to wait between aggregated log retention checks. + If set to 0 or a negative value then the value is computed as one-tenth + of the aggregated log retention time. Be careful set this too small and + you will spam the name node.</description> + <name>yarn.log-aggregation.retain-check-interval-seconds</name> + <value>-1</value> + </property> + + <property> <description>Time in seconds to retain user logs. Only applicable if log aggregation is disabled </description> Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java?rev=1441239&r1=1441238&r2=1441239&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java Thu Jan 31 22:46:05 2013 @@ -28,12 +28,19 @@ import org.apache.hadoop.fs.FilterFileSy import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.Before; import org.junit.Test; import static org.mockito.Mockito.*; public class TestAggregatedLogDeletionService { + @Before + 
public void closeFilesystems() throws IOException { + // prevent the same mockfs instance from being reused due to FS cache + FileSystem.closeAll(); + } + @Test public void testDeletion() throws Exception { long now = System.currentTimeMillis(); @@ -121,6 +128,70 @@ public class TestAggregatedLogDeletionSe verify(mockFs).delete(app4Dir, true); } + @Test + public void testCheckInterval() throws Exception { + long RETENTION_SECS = 10 * 24 * 3600; + long now = System.currentTimeMillis(); + long toDeleteTime = now - RETENTION_SECS*1000; + + String root = "mockfs://foo/"; + String remoteRootLogDir = root+"tmp/logs"; + String suffix = "logs"; + Configuration conf = new Configuration(); + conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); + conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true"); + conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "864000"); + conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, "1"); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); + + // prevent us from picking up the same mockfs instance from another test + FileSystem.closeAll(); + Path rootPath = new Path(root); + FileSystem rootFs = rootPath.getFileSystem(conf); + FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem(); + + Path remoteRootLogPath = new Path(remoteRootLogDir); + + Path userDir = new Path(remoteRootLogPath, "me"); + FileStatus userDirStatus = new FileStatus(0, true, 0, 0, now, userDir); + + when(mockFs.listStatus(remoteRootLogPath)).thenReturn( + new FileStatus[]{userDirStatus}); + + Path userLogDir = new Path(userDir, suffix); + Path app1Dir = new Path(userLogDir, "application_1_1"); + FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, now, app1Dir); + + when(mockFs.listStatus(userLogDir)).thenReturn( + new FileStatus[]{app1DirStatus}); + + Path app1Log1 = new Path(app1Dir, "host1"); + FileStatus 
app1Log1Status = new FileStatus(10, false, 1, 1, now, app1Log1); + + when(mockFs.listStatus(app1Dir)).thenReturn( + new FileStatus[]{app1Log1Status}); + + AggregatedLogDeletionService deletionSvc = + new AggregatedLogDeletionService(); + deletionSvc.init(conf); + deletionSvc.start(); + + verify(mockFs, timeout(10000).atLeast(4)).listStatus(any(Path.class)); + verify(mockFs, never()).delete(app1Dir, true); + + // modify the timestamp of the logs and verify it's picked up quickly + app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir); + app1Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app1Log1); + when(mockFs.listStatus(userLogDir)).thenReturn( + new FileStatus[]{app1DirStatus}); + when(mockFs.listStatus(app1Dir)).thenReturn( + new FileStatus[]{app1Log1Status}); + + verify(mockFs, timeout(10000)).delete(app1Dir, true); + + deletionSvc.stop(); + } static class MockFileSystem extends FilterFileSystem { MockFileSystem() {