Author: jlowe
Date: Thu Jan 31 22:46:05 2013
New Revision: 1441239

URL: http://svn.apache.org/viewvc?rev=1441239&view=rev
Log:
YARN-364. AggregatedLogDeletionService can take too long to delete logs. 
Contributed by Jason Lowe

Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1441239&r1=1441238&r2=1441239&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Thu Jan 31 22:46:05 2013
@@ -284,6 +284,9 @@ Release 0.23.7 - UNRELEASED
     YARN-343. Capacity Scheduler maximum-capacity value -1 is invalid (Xuan 
     Gong via tgraves)
 
+    YARN-364. AggregatedLogDeletionService can take too long to delete logs
+    (jlowe)
+
 Release 0.23.6 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1441239&r1=1441238&r2=1441239&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 Thu Jan 31 22:46:05 2013
@@ -380,6 +380,15 @@ public class YarnConfiguration extends C
   public static final long DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS = -1;
   
   /**
+   * How long to wait between aggregated log retention checks. If set to
+   * a value <= 0 then the value is computed as one-tenth of the log retention
+   * setting. Be careful set this too small and you will spam the name node.
+   */
+  public static final String LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS =
+      YARN_PREFIX + "log-aggregation.retain-check-interval-seconds";
+  public static final long 
DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS = -1;
+
+  /**
    * Number of seconds to retain logs on the NodeManager. Only applicable if 
Log
    * aggregation is disabled
    */

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java?rev=1441239&r1=1441238&r2=1441239&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
 Thu Jan 31 22:46:05 2013
@@ -140,9 +140,16 @@ public class AggregatedLogDeletionServic
                " too small (" + retentionSecs + ")");
       return;
     }
+    long checkIntervalMsecs = 1000 * conf.getLong(
+        YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
+        
YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS);
+    if (checkIntervalMsecs <= 0) {
+      // when unspecified compute check interval as 1/10th of retention
+      checkIntervalMsecs = (retentionSecs * 1000) / 10;
+    }
     TimerTask task = new LogDeletionTask(conf, retentionSecs);
     timer = new Timer();
-    timer.scheduleAtFixedRate(task, 0, retentionSecs * 1000);
+    timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs);
     super.start();
   }
 

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1441239&r1=1441238&r2=1441239&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 Thu Jan 31 22:46:05 2013
@@ -411,6 +411,15 @@
   </property> 
   
   <property>
+    <description>How long to wait between aggregated log retention checks.
+    If set to 0 or a negative value then the value is computed as one-tenth
+    of the aggregated log retention time. Be careful set this too small and
+    you will spam the name node.</description>
+    <name>yarn.log-aggregation.retain-check-interval-seconds</name>
+    <value>-1</value>
+  </property>
+
+  <property>
     <description>Time in seconds to retain user logs. Only applicable if
     log aggregation is disabled
     </description>

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java?rev=1441239&r1=1441238&r2=1441239&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
 Thu Jan 31 22:46:05 2013
@@ -28,12 +28,19 @@ import org.apache.hadoop.fs.FilterFileSy
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Before;
 import org.junit.Test;
 
 import static org.mockito.Mockito.*;
 
 public class TestAggregatedLogDeletionService {
   
+  @Before
+  public void closeFilesystems() throws IOException {
+    // prevent the same mockfs instance from being reused due to FS cache
+    FileSystem.closeAll();
+  }
+
   @Test
   public void testDeletion() throws Exception {
     long now = System.currentTimeMillis();
@@ -121,6 +128,70 @@ public class TestAggregatedLogDeletionSe
     verify(mockFs).delete(app4Dir, true);
   }
 
+  @Test
+  public void testCheckInterval() throws Exception {
+    long RETENTION_SECS = 10 * 24 * 3600;
+    long now = System.currentTimeMillis();
+    long toDeleteTime = now - RETENTION_SECS*1000;
+
+    String root = "mockfs://foo/";
+    String remoteRootLogDir = root+"tmp/logs";
+    String suffix = "logs";
+    Configuration conf = new Configuration();
+    conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
+    conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
+    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "864000");
+    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, 
"1");
+    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
+    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
+
+    // prevent us from picking up the same mockfs instance from another test
+    FileSystem.closeAll();
+    Path rootPath = new Path(root);
+    FileSystem rootFs = rootPath.getFileSystem(conf);
+    FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
+
+    Path remoteRootLogPath = new Path(remoteRootLogDir);
+
+    Path userDir = new Path(remoteRootLogPath, "me");
+    FileStatus userDirStatus = new FileStatus(0, true, 0, 0, now, userDir);
+
+    when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
+        new FileStatus[]{userDirStatus});
+
+    Path userLogDir = new Path(userDir, suffix);
+    Path app1Dir = new Path(userLogDir, "application_1_1");
+    FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, now, app1Dir);
+
+    when(mockFs.listStatus(userLogDir)).thenReturn(
+        new FileStatus[]{app1DirStatus});
+
+    Path app1Log1 = new Path(app1Dir, "host1");
+    FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, now, app1Log1);
+
+    when(mockFs.listStatus(app1Dir)).thenReturn(
+        new FileStatus[]{app1Log1Status});
+
+    AggregatedLogDeletionService deletionSvc =
+        new AggregatedLogDeletionService();
+    deletionSvc.init(conf);
+    deletionSvc.start();
+
+    verify(mockFs, timeout(10000).atLeast(4)).listStatus(any(Path.class));
+    verify(mockFs, never()).delete(app1Dir, true);
+
+    // modify the timestamp of the logs and verify it's picked up quickly
+    app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir);
+    app1Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app1Log1);
+    when(mockFs.listStatus(userLogDir)).thenReturn(
+        new FileStatus[]{app1DirStatus});
+    when(mockFs.listStatus(app1Dir)).thenReturn(
+        new FileStatus[]{app1Log1Status});
+
+    verify(mockFs, timeout(10000)).delete(app1Dir, true);
+
+    deletionSvc.stop();
+  }
   
   static class MockFileSystem extends FilterFileSystem {
     MockFileSystem() {


Reply via email to