This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ee94f6cdcbc YARN-11277. Trigger log-dir deletion by size for NonAggregatingLogHandler. (#4797)
ee94f6cdcbc is described below

commit ee94f6cdcbc4914cbdf1a2132f318083c4d38494
Author: Xianming Lei <31424839+le...@users.noreply.github.com>
AuthorDate: Mon Jun 5 11:08:06 2023 +0800

    YARN-11277. Trigger log-dir deletion by size for NonAggregatingLogHandler. (#4797)
    
    Reviewed-by: Akira Ajisaka <aajis...@apache.org>
    Reviewed-by: Ashutosh Gupta <ashu...@amazon.com>
    Reviewed-by: Shilun Fan <slfan1...@apache.org>
    Signed-off-by: Shilun Fan <slfan1...@apache.org>
---
 .../apache/hadoop/yarn/conf/YarnConfiguration.java | 11 +++
 .../src/main/resources/yarn-default.xml            | 23 +++++
 .../loghandler/NonAggregatingLogHandler.java       | 49 +++++++++--
 .../loghandler/TestNonAggregatingLogHandler.java   | 99 ++++++++++++++++++++++
 4 files changed, 174 insertions(+), 8 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index b6601b835d6..ff531cdc2a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -4960,6 +4960,17 @@ public class YarnConfiguration extends Configuration {
   public static final String APPS_CACHE_EXPIRE = YARN_PREFIX + 
"apps.cache.expire";
   public static final String DEFAULT_APPS_CACHE_EXPIRE = "30s";
 
+  /** Enabled trigger log-dir deletion by size for NonAggregatingLogHandler. */
+  public static final String NM_LOG_TRIGGER_DELETE_BY_SIZE_ENABLED = NM_PREFIX 
+
+      "log.trigger.delete.by-size.enabled";
+  public static final boolean DEFAULT_NM_LOG_TRIGGER_DELETE_BY_SIZE_ENABLED = 
false;
+
+  /** Trigger log-dir deletion when the total log size of an app is greater than
+   *  yarn.nodemanager.log.delete.threshold.
+   *  Depends on yarn.nodemanager.log.trigger.delete.by-size.enabled = true. */
+  public static final String NM_LOG_DELETE_THRESHOLD = NM_PREFIX + 
"log.delete.threshold";
+  public static final long DEFAULT_NM_LOG_DELETE_THRESHOLD = 100L * 1024 * 
1024 * 1024;
+
   public YarnConfiguration() {
     super();
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 1f0982aede0..aea92260013 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -5293,4 +5293,27 @@
     </description>
   </property>
 
+  <property>
+    <name>yarn.nodemanager.log.trigger.delete.by-size.enabled</name>
+    <value>false</value>
+    <description>
+      Optional.
+      Enabled trigger log-dir deletion by size for NonAggregatingLogHandler
+    </description>
+  </property>
+
+  <property>
+    <name>yarn.nodemanager.log.delete.threshold</name>
+    <value>100g</value>
+    <description>
+      Optional.
+      Trigger log-dir deletion when the total log size of an app is greater than
+      yarn.nodemanager.log.delete.threshold and
+      yarn.nodemanager.log.trigger.delete.by-size.enabled = true.
+      You can use the following suffix (case insensitive): k(kilo), m(mega), g(giga), t(tera), p(peta),
+      e(exa) to specify the size (such as 128k, 512m, 1g, etc.),
+      Or provide complete size in bytes (such as 134217728 for 128 MB).
+    </description>
+  </property>
+
 </configuration>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
index e8b2f472fb4..1721756f056 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
@@ -71,6 +71,8 @@ public class NonAggregatingLogHandler extends AbstractService implements
   private final LocalDirsHandlerService dirsHandler;
   private final NMStateStoreService stateStore;
   private long deleteDelaySeconds;
+  private boolean enableTriggerDeleteBySize;
+  private long deleteThreshold;
   private ScheduledThreadPoolExecutor sched;
 
   public NonAggregatingLogHandler(Dispatcher dispatcher,
@@ -90,6 +92,12 @@ public class NonAggregatingLogHandler extends AbstractService implements
     this.deleteDelaySeconds =
         conf.getLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS,
                 YarnConfiguration.DEFAULT_NM_LOG_RETAIN_SECONDS);
+    this.enableTriggerDeleteBySize =
+        
conf.getBoolean(YarnConfiguration.NM_LOG_TRIGGER_DELETE_BY_SIZE_ENABLED,
+        YarnConfiguration.DEFAULT_NM_LOG_TRIGGER_DELETE_BY_SIZE_ENABLED);
+    this.deleteThreshold =
+        conf.getLongBytes(YarnConfiguration.NM_LOG_DELETE_THRESHOLD,
+        YarnConfiguration.DEFAULT_NM_LOG_DELETE_THRESHOLD);
     sched = createScheduledThreadPoolExecutor(conf);
     super.serviceInit(conf);
     recover();
@@ -165,13 +173,9 @@ public class NonAggregatingLogHandler extends AbstractService implements
         LogHandlerAppFinishedEvent appFinishedEvent =
             (LogHandlerAppFinishedEvent) event;
         ApplicationId appId = appFinishedEvent.getApplicationId();
-        // Schedule - so that logs are available on the UI till they're 
deleted.
-        LOG.info("Scheduling Log Deletion for application: "
-            + appId + ", with delay of "
-            + this.deleteDelaySeconds + " seconds");
         String user = appOwners.remove(appId);
         if (user == null) {
-          LOG.error("Unable to locate user for " + appId);
+          LOG.error("Unable to locate user for {}", appId);
           // send LOG_HANDLING_FAILED out
           NonAggregatingLogHandler.this.dispatcher.getEventHandler().handle(
               new ApplicationEvent(appId,
@@ -191,8 +195,20 @@ public class NonAggregatingLogHandler extends AbstractService implements
           LOG.error("Unable to record log deleter state", e);
         }
         try {
-          sched.schedule(logDeleter, this.deleteDelaySeconds,
-              TimeUnit.SECONDS);
+          boolean logDeleterStarted = false;
+          if (enableTriggerDeleteBySize) {
+            final long appLogSize = calculateSizeOfAppLogs(user, appId);
+            if (appLogSize >= deleteThreshold) {
+              LOG.info("Log Deletion for application: {}, with no delay, 
size={}", appId, appLogSize);
+              sched.schedule(logDeleter, 0, TimeUnit.SECONDS);
+              logDeleterStarted = true;
+            }
+          }
+          if (!logDeleterStarted) {
+            LOG.info("Scheduling Log Deletion for application: {}, with delay 
of {} seconds",
+                appId, this.deleteDelaySeconds);
+            sched.schedule(logDeleter, this.deleteDelaySeconds, 
TimeUnit.SECONDS);
+          }
         } catch (RejectedExecutionException e) {
           // Handling this event in local thread before starting threads
           // or after calling sched.shutdownNow().
@@ -200,7 +216,6 @@ public class NonAggregatingLogHandler extends AbstractService implements
         }
         break;
       default:
-        ; // Ignore
     }
   }
 
@@ -220,6 +235,24 @@ public class NonAggregatingLogHandler extends AbstractService implements
     return sched;
   }
 
+  private long calculateSizeOfAppLogs(String user, ApplicationId 
applicationId) {
+    FileContext lfs = getLocalFileContext(getConfig());
+    long appLogsSize = 0L;
+    for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) {
+      Path logDir = new Path(rootLogDir, applicationId.toString());
+      try {
+        appLogsSize += lfs.getFileStatus(logDir).getLen();
+      } catch (UnsupportedFileSystemException ue) {
+        LOG.warn("Unsupported file system used for log dir {}", logDir, ue);
+        continue;
+      } catch (IOException ie) {
+        LOG.error("Unable to getFileStatus for {}", logDir, ie);
+        continue;
+      }
+    }
+    return appLogsSize;
+  }
+
   class LogDeleterRunnable implements Runnable {
     private String user;
     private ApplicationId applicationId;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
index 8d5adf64321..55a201b7fc2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
@@ -596,4 +596,103 @@ public class TestNonAggregatingLogHandler {
     }
   }
 
+  @Test
+  public void testLogSizeThresholdDeletion() throws IOException {
+    ApplicationId anotherAppId = BuilderUtils.newApplicationId(4567, 1);
+    ContainerId container22 = BuilderUtils.newContainerId(appAttemptId, 2);
+    String user2 = "test_user2";
+    File[] localLogDirs = getLocalLogDirFiles(this.getClass().getName(), 2);
+    String localLogDirsString = localLogDirs[0].getAbsolutePath() + ","
+                                    + localLogDirs[1].getAbsolutePath();
+
+    conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
+    conf.setBoolean(YarnConfiguration.NM_LOG_TRIGGER_DELETE_BY_SIZE_ENABLED, 
true);
+    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
+    conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 60 * 1000);
+    conf.set(YarnConfiguration.NM_LOG_DELETE_THRESHOLD, "15g");
+
+    dirsHandler.init(conf);
+
+    NonAggregatingLogHandler rawLogHandler =
+        new NonAggregatingLogHandler(dispatcher, mockDelService, dirsHandler,
+            new NMNullStateStoreService());
+    NonAggregatingLogHandler logHandler = spy(rawLogHandler);
+    AbstractFileSystem spylfs =
+        spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+    FileContext lfs = FileContext.getFileContext(spylfs, conf);
+    doReturn(lfs).when(logHandler)
+        .getLocalFileContext(isA(Configuration.class));
+    FsPermission defaultPermission =
+        FsPermission.getDirDefault().applyUMask(lfs.getUMask());
+    FileStatus fs1 =
+        new FileStatus(10 * 1024 * 1024 * 1024L, true, 1, 0,
+            System.currentTimeMillis(), 0, defaultPermission, "", "",
+            new Path(localLogDirs[0].getAbsolutePath()));
+    FileStatus fs2 =
+        new FileStatus(5 * 1024 * 1024 * 1024L, true, 1, 0,
+            System.currentTimeMillis(), 0, defaultPermission, "", "",
+            new Path(localLogDirs[0].getAbsolutePath()));
+    Path path1 = new Path(localLogDirs[0].getAbsolutePath(), appId.toString());
+    Path path2 = new Path(localLogDirs[1].getAbsolutePath(), appId.toString());
+    Path path3 = new Path(localLogDirs[0].getAbsolutePath(), 
anotherAppId.toString());
+    Path path4 = new Path(localLogDirs[1].getAbsolutePath(), 
anotherAppId.toString());
+
+    doReturn(fs1).when(spylfs).getFileStatus(eq(path1));
+    doReturn(fs1).when(spylfs).getFileStatus(eq(path2));
+    doReturn(fs2).when(spylfs).getFileStatus(eq(path3));
+    doReturn(fs2).when(spylfs).getFileStatus(eq(path4));
+
+    logHandler.init(conf);
+    logHandler.start();
+
+    logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, null));
+
+    logHandler.handle(new LogHandlerContainerFinishedEvent(container11,
+        ContainerType.APPLICATION_MASTER, 0));
+
+    logHandler.handle(new LogHandlerAppFinishedEvent(appId));
+
+    logHandler.handle(new LogHandlerAppStartedEvent(anotherAppId, user2,
+        null, null));
+
+    logHandler.handle(new LogHandlerContainerFinishedEvent(container22,
+        ContainerType.APPLICATION_MASTER, 0));
+
+    logHandler.handle(new LogHandlerAppFinishedEvent(anotherAppId));
+
+    Path[] localAppLogDirs = new Path[]{path1, path2};
+    Path[] anotherLocalAppLogDirs = new Path[]{path3, path4};
+
+    testDeletionServiceCall(mockDelService, user, 5000, localAppLogDirs);
+    testDeletionServiceNeverCall(mockDelService, user2, 5000, 
anotherLocalAppLogDirs);
+
+    logHandler.close();
+    for (int i = 0; i < localLogDirs.length; i++) {
+      FileUtils.deleteDirectory(localLogDirs[i]);
+    }
+  }
+
+  static void testDeletionServiceNeverCall(DeletionService delService, String 
user,
+      long timeout, Path... matchPaths) {
+    long verifyStartTime = System.currentTimeMillis();
+    WantedButNotInvoked notInvokedException = null;
+    boolean matched = false;
+    while (!matched && System.currentTimeMillis() < verifyStartTime + timeout) 
{
+      try {
+        verify(delService, never()).delete(argThat(new FileDeletionMatcher(
+            delService, user, null, Arrays.asList(matchPaths))));
+        matched = true;
+      } catch (WantedButNotInvoked e) {
+        notInvokedException = e;
+        try {
+          Thread.sleep(50l);
+        } catch (InterruptedException i) {
+        }
+      }
+    }
+    if (!matched) {
+      throw notInvokedException;
+    }
+    return;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to