This is an automated email from the ASF dual-hosted git repository.

aajisaka pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 2532eca0133 YARN-11241. Add uncleaning option for local app log file 
with log-aggregation enabled (#4703)
2532eca0133 is described below

commit 2532eca0133ef6ffe452df4035c4176351a27491
Author: Ashutosh Gupta <ashutosh.gu...@st.niituniversity.in>
AuthorDate: Mon Sep 12 15:32:20 2022 +0100

    YARN-11241. Add uncleaning option for local app log file with 
log-aggregation enabled (#4703)
    
    Co-authored-by: Ashutosh Gupta <ashu...@amazon.com>
    Signed-off-by: Akira Ajisaka <aajis...@apache.org>
    (cherry picked from commit 65a027b11299ac2d57556406a614442d8fc9acd4)
---
 .../apache/hadoop/yarn/conf/YarnConfiguration.java |  7 ++
 .../src/main/resources/yarn-default.xml            |  9 +++
 .../logaggregation/AppLogAggregatorImpl.java       | 43 +++++++-----
 .../logaggregation/TestLogAggregationService.java  | 76 +++++++++++++++-------
 4 files changed, 96 insertions(+), 39 deletions(-)

diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index c1bb6aa68d2..213e5e640a8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1514,6 +1514,13 @@ public class YarnConfiguration extends Configuration {
   public static final long DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS
       = 10 * 60 * 1000;
 
+  /**
+   * Whether to clean up nodemanager logs when log aggregation is enabled.
+   */
+  public static final String LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP =
+      YARN_PREFIX + "log-aggregation.enable-local-cleanup";
+  public static final boolean DEFAULT_LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP = 
true;
+
   /**
    * Number of seconds to retain logs on the NodeManager. Only applicable if 
Log
    * aggregation is disabled
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 4be357b78a6..2d58a1cac2b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1516,6 +1516,15 @@
     <value>600000</value>
   </property>
 
+  <property>
+    <description>Whether to clean up nodemanager logs when log aggregation is 
enabled. Setting this to
+      false disables cleanup of the local nodemanager logs, which will fill the disk 
in the long run. Users
+      should set this to false for testing purposes only.
+    </description>
+    <name>yarn.log-aggregation.enable-local-cleanup</name>
+    <value>true</value>
+  </property>
+
   <property>
     <description>Time in seconds to retain user logs. Only applicable if
     log aggregation is disabled
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 4cc0dc3c713..0a8ddc17b1c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -86,6 +86,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator 
{
   private final Dispatcher dispatcher;
   private final ApplicationId appId;
   private final String applicationId;
+  private final boolean enableLocalCleanup;
   private boolean logAggregationDisabled = false;
   private final Configuration conf;
   private final DeletionService delService;
@@ -172,6 +173,13 @@ public class AppLogAggregatorImpl implements 
AppLogAggregator {
     this.logAggregationContext = logAggregationContext;
     this.context = context;
     this.nodeId = nodeId;
+    this.enableLocalCleanup =
+        conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP,
+            YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP);
+    if (!this.enableLocalCleanup) {
+      LOG.warn("{} is only for testing and not for any production system ",
+          YarnConfiguration.LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP);
+    }
     this.logAggPolicy = getLogAggPolicy(conf);
     this.recoveredLogInitedTime = recoveredLogInitedTime;
     this.logFileSizeThreshold =
@@ -337,26 +345,26 @@ public class AppLogAggregatorImpl implements 
AppLogAggregator {
             appFinished, finishedContainers.contains(container));
         if (uploadedFilePathsInThisCycle.size() > 0) {
           uploadedLogsInThisCycle = true;
-          LOG.trace("Uploaded the following files for {}: {}",
-              container, uploadedFilePathsInThisCycle.toString());
-          List<Path> uploadedFilePathsInThisCycleList = new ArrayList<>();
-          
uploadedFilePathsInThisCycleList.addAll(uploadedFilePathsInThisCycle);
-          if (LOG.isDebugEnabled()) {
-            for (Path uploadedFilePath : uploadedFilePathsInThisCycleList) {
-              try {
-                long fileSize = lfs.getFileStatus(uploadedFilePath).getLen();
-                if (fileSize >= logFileSizeThreshold) {
-                  LOG.debug("Log File " + uploadedFilePath
-                      + " size is " + fileSize + " bytes");
+          if (enableLocalCleanup) {
+            LOG.trace("Uploaded the following files for {}: {}", container,
+                uploadedFilePathsInThisCycle.toString());
+            List<Path> uploadedFilePathsInThisCycleList = new ArrayList<>();
+            
uploadedFilePathsInThisCycleList.addAll(uploadedFilePathsInThisCycle);
+            if (LOG.isDebugEnabled()) {
+              for (Path uploadedFilePath : uploadedFilePathsInThisCycleList) {
+                try {
+                  long fileSize = lfs.getFileStatus(uploadedFilePath).getLen();
+                  if (fileSize >= logFileSizeThreshold) {
+                    LOG.debug("Log File " + uploadedFilePath + " size is " + 
fileSize + " bytes");
+                  }
+                } catch (Exception e1) {
+                  LOG.error("Failed to get log file size " + e1);
                 }
-              } catch (Exception e1) {
-                LOG.error("Failed to get log file size " + e1);
               }
             }
+            deletionTask = new FileDeletionTask(delService, 
this.userUgi.getShortUserName(), null,
+                uploadedFilePathsInThisCycleList);
           }
-          deletionTask = new FileDeletionTask(delService,
-              this.userUgi.getShortUserName(), null,
-              uploadedFilePathsInThisCycleList);
         }
 
         // This container is finished, and all its logs have been uploaded,
@@ -528,6 +536,9 @@ public class AppLogAggregatorImpl implements 
AppLogAggregator {
   }
 
   private void doAppLogAggregationPostCleanUp() {
+    if (!enableLocalCleanup) {
+      return;
+    }
     // Remove the local app-log-dirs
     List<Path> localAppLogDirs = new ArrayList<Path>();
     for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) {
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 4cc9ac1f3a7..8185f5019c7 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -234,31 +234,47 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     // ensure filesystems were closed
     verify(logAggregationService).closeFileSystems(
         any(UserGroupInformation.class));
-    List<Path> dirList = new ArrayList<>();
-    dirList.add(new Path(app1LogDir.toURI()));
-    verify(delSrvc, times(2)).delete(argThat(new FileDeletionMatcher(
-        delSrvc, user, null, dirList)));
-    
-    String containerIdStr = container11.toString();
-    File containerLogDir = new File(app1LogDir, containerIdStr);
-    int count = 0;
-    int maxAttempts = 50;
-    for (String fileType : new String[] { "stdout", "stderr", "syslog" }) {
-      File f = new File(containerLogDir, fileType);
-      count = 0;
-      while ((f.exists()) && (count < maxAttempts)) {
-        count++;
-        Thread.sleep(100);
+    boolean filesShouldBeDeleted =
+        
this.conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP,
+            YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP);
+    if (filesShouldBeDeleted) {
+      List<Path> dirList = new ArrayList<>();
+      dirList.add(new Path(app1LogDir.toURI()));
+      verify(delSrvc, times(2)).delete(argThat(new FileDeletionMatcher(
+          delSrvc, user, null, dirList)));
+
+      String containerIdStr = container11.toString();
+      File containerLogDir = new File(app1LogDir, containerIdStr);
+      int count = 0;
+      int maxAttempts = 50;
+      for (String fileType : new String[]{"stdout", "stderr", "syslog"}) {
+        File f = new File(containerLogDir, fileType);
+        count = 0;
+        while ((f.exists()) && (count < maxAttempts)) {
+          count++;
+          Thread.sleep(100);
+        }
+        Assert.assertFalse("File [" + f + "] was not deleted", f.exists());
       }
-      Assert.assertFalse("File [" + f + "] was not deleted", f.exists());
-    }
-    count = 0;
-    while ((app1LogDir.exists()) && (count < maxAttempts)) {
-      count++;
-      Thread.sleep(100);
+      Assert.assertFalse("Directory [" + app1LogDir + "] was not deleted",
+          app1LogDir.exists());
+    } else {
+      List<Path> dirList = new ArrayList<>();
+      dirList.add(new Path(app1LogDir.toURI()));
+      verify(delSrvc, never()).delete(argThat(new FileDeletionMatcher(
+          delSrvc, user, null, dirList)));
+
+      String containerIdStr = container11.toString();
+      File containerLogDir = new File(app1LogDir, containerIdStr);
+      Thread.sleep(5000);
+      for (String fileType : new String[]{"stdout", "stderr", "syslog"}) {
+        File f = new File(containerLogDir, fileType);
+        Assert.assertTrue("File [" + f + "] was not deleted", f.exists());
+      }
+      Assert.assertTrue("Directory [" + app1LogDir + "] was not deleted",
+          app1LogDir.exists());
     }
-    Assert.assertFalse("Directory [" + app1LogDir + "] was not deleted",
-      app1LogDir.exists());
+    delSrvc.stop();
 
     Path logFilePath = logAggregationService
         .getLogAggregationFileController(conf)
@@ -297,6 +313,20 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     verifyLocalFileDeletion(logAggregationService);
   }
 
+  @Test
+  public void testLocalFileRemainsAfterUploadOnCleanupDisable() throws 
Exception {
+    this.delSrvc = new DeletionService(createContainerExecutor());
+    delSrvc = spy(delSrvc);
+    this.delSrvc.init(conf);
+    
this.conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP, 
false);
+    this.conf.set(YarnConfiguration.NM_LOG_DIRS, 
localLogDir.getAbsolutePath());
+    this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+        this.remoteRootLogDir.getAbsolutePath());
+    LogAggregationService logAggregationService = spy(
+        new LogAggregationService(dispatcher, this.context, this.delSrvc, 
super.dirsHandler));
+    verifyLocalFileDeletion(logAggregationService);
+  }
+
   @Test
   public void testLocalFileDeletionOnDiskFull() throws Exception {
     this.delSrvc = new DeletionService(createContainerExecutor());


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to