This is an automated email from the ASF dual-hosted git repository.

quapaw pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e6ecc4f3e44 YARN-11188. Only files belong to the first file controller 
are removed even if multiple log aggregation file controllers are configured. 
Contributed by Szilard Nemeth.
e6ecc4f3e44 is described below

commit e6ecc4f3e4433ae23fd745f6e0c641a019664253
Author: 9uapaw <gyora...@gmail.com>
AuthorDate: Wed Jun 22 14:40:00 2022 +0200

    YARN-11188. Only files belong to the first file controller are removed even 
if multiple log aggregation file controllers are configured. Contributed by 
Szilard Nemeth.
---
 .../AggregatedLogDeletionService.java              | 55 ++++++++++++------
 .../TestAggregatedLogDeletionService.java          | 67 ++++++++++++++++++++--
 .../AggregatedLogDeletionServiceForTest.java       | 13 +++--
 .../testutils/LogAggregationTestcase.java          | 35 +++++++++--
 .../testutils/MockRMClientUtils.java               |  4 +-
 5 files changed, 141 insertions(+), 33 deletions(-)

diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
index eb6466a3a02..4f10b2fd4ce 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.logaggregation;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
 
@@ -57,7 +59,7 @@ public class AggregatedLogDeletionService extends 
AbstractService {
   
   private Timer timer = null;
   private long checkIntervalMsecs;
-  private LogDeletionTask task;
+  private List<LogDeletionTask> tasks;
   
   public static class LogDeletionTask extends TimerTask {
     private Configuration conf;
@@ -66,14 +68,12 @@ public class AggregatedLogDeletionService extends 
AbstractService {
     private Path remoteRootLogDir = null;
     private ApplicationClientProtocol rmClient = null;
     
-    public LogDeletionTask(Configuration conf, long retentionSecs, 
ApplicationClientProtocol rmClient) {
+    public LogDeletionTask(Configuration conf, long retentionSecs,
+                           ApplicationClientProtocol rmClient,
+                           LogAggregationFileController fileController) {
       this.conf = conf;
       this.retentionMillis = retentionSecs * 1000;
       this.suffix = LogAggregationUtils.getBucketSuffix();
-      LogAggregationFileControllerFactory factory =
-          new LogAggregationFileControllerFactory(conf);
-      LogAggregationFileController fileController =
-          factory.getFileControllerForWrite();
       this.remoteRootLogDir = fileController.getRemoteRootLogDir();
       this.rmClient = rmClient;
     }
@@ -220,7 +220,7 @@ public class AggregatedLogDeletionService extends 
AbstractService {
 
   @Override
   protected void serviceStart() throws Exception {
-    scheduleLogDeletionTask();
+    scheduleLogDeletionTasks();
     super.serviceStart();
   }
 
@@ -249,13 +249,13 @@ public class AggregatedLogDeletionService extends 
AbstractService {
       setConfig(conf);
       stopRMClient();
       stopTimer();
-      scheduleLogDeletionTask();
+      scheduleLogDeletionTasks();
     } else {
       LOG.warn("Failed to execute refreshLogRetentionSettings : Aggregated Log 
Deletion Service is not started");
     }
   }
   
-  private void scheduleLogDeletionTask() throws IOException {
+  private void scheduleLogDeletionTasks() throws IOException {
     Configuration conf = getConfig();
     if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
         YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
@@ -271,9 +271,28 @@ public class AggregatedLogDeletionService extends 
AbstractService {
       return;
     }
     setLogAggCheckIntervalMsecs(retentionSecs);
-    task = new LogDeletionTask(conf, retentionSecs, createRMClient());
-    timer = new Timer();
-    timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs);
+    tasks = createLogDeletionTasks(conf, retentionSecs, createRMClient());
+    // Share one Timer: the single `timer` field can then cancel every task.
+    timer = new Timer();
+    for (LogDeletionTask task : tasks) {
+      timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs);
+    }
+  }
+
+  @VisibleForTesting
+  public List<LogDeletionTask> createLogDeletionTasks(Configuration conf, long 
retentionSecs,
+                                                      
ApplicationClientProtocol rmClient)
+          throws IOException {
+    List<LogDeletionTask> tasks = new ArrayList<>();
+    LogAggregationFileControllerFactory factory = new 
LogAggregationFileControllerFactory(conf);
+    List<LogAggregationFileController> fileControllers =
+            factory.getConfiguredLogAggregationFileControllerList();
+    for (LogAggregationFileController fileController : fileControllers) {
+      LogDeletionTask task = new LogDeletionTask(conf, retentionSecs, rmClient,
+              fileController);
+      tasks.add(task);
+    }
+    return tasks;
   }
 
   private void stopTimer() {
@@ -295,14 +314,18 @@ public class AggregatedLogDeletionService extends 
AbstractService {
   // as @Idempotent, it will automatically take care of RM restart/failover.
   @VisibleForTesting
   protected ApplicationClientProtocol createRMClient() throws IOException {
-    return ClientRMProxy.createRMProxy(getConfig(),
-      ApplicationClientProtocol.class);
+    return ClientRMProxy.createRMProxy(getConfig(), 
ApplicationClientProtocol.class);
   }
 
   @VisibleForTesting
   protected void stopRMClient() {
-    if (task != null && task.getRMClient() != null) {
-      RPC.stopProxy(task.getRMClient());
+    // tasks is null until tasks are scheduled; all tasks share one RM client.
+    List<LogDeletionTask> toStop = tasks != null ? tasks : new ArrayList<>();
+    for (LogDeletionTask task : toStop) {
+      if (task != null && task.getRMClient() != null) {
+        RPC.stopProxy(task.getRMClient());
+        break;
+      }
     }
   }
 }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
index 13a9afa84e0..285ac43322a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
@@ -42,6 +42,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import static 
org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT;
+import static 
org.apache.hadoop.yarn.logaggregation.LogAggregationTestUtils.enableFileControllers;
 import static 
org.apache.hadoop.yarn.logaggregation.testutils.LogAggregationTestcaseBuilder.NO_TIMEOUT;
 import static org.mockito.Mockito.mock;
 
@@ -118,12 +119,12 @@ public class TestAggregatedLogDeletionService {
             .withRunningApps(4)
             .injectExceptionForAppDirDeletion(3)
             .build()
-            .setupAndRunDeletionService()
+            .startDeletionService()
             .verifyAppDirsDeleted(timeout, 1, 3)
             .verifyAppDirsNotDeleted(timeout, 2, 4)
             .verifyAppFileDeleted(4, 1, timeout)
             .verifyAppFileNotDeleted(4, 2, timeout)
-            .teardown();
+            .teardown(1);
   }
 
   @Test
@@ -155,7 +156,7 @@ public class TestAggregatedLogDeletionService {
             .build();
     
     testcase
-            .setupAndRunDeletionService()
+            .startDeletionService()
             //app1Dir would be deleted since it is done above log retention 
period
             .verifyAppDirDeleted(1, 10000L)
             //app2Dir is not expected to be deleted since it is below the 
threshold
@@ -176,7 +177,8 @@ public class TestAggregatedLogDeletionService {
             .verifyCheckIntervalMilliSecondsEqualTo(checkIntervalMilliSeconds)
             //app2Dir should be deleted since it falls above the threshold
             .verifyAppDirDeleted(2, 10000L)
-            .teardown();
+            //Close expected 2 times: once for refresh and once for stopping
+            .teardown(2);
   }
   
   @Test
@@ -202,7 +204,7 @@ public class TestAggregatedLogDeletionService {
             .withFinishedApps(1)
             .withRunningApps()
             .build()
-            .setupAndRunDeletionService()
+            .startDeletionService()
             .verifyAnyPathListedAtLeast(4, 10000L)
             .verifyAppDirNotDeleted(1, NO_TIMEOUT)
             // modify the timestamp of the logs and verify if it is picked up 
quickly
@@ -211,7 +213,7 @@ public class TestAggregatedLogDeletionService {
             .changeModTimeOfBucketDir(toDeleteTime)
             .reinitAllPaths()
             .verifyAppDirDeleted(1, 10000L)
-            .teardown();
+            .teardown(1);
   }
 
   @Test
@@ -241,6 +243,59 @@ public class TestAggregatedLogDeletionService {
             .verifyAppDirDeleted(3, NO_TIMEOUT);
   }
 
+  @Test
+  public void testDeletionTwoControllers() throws IOException {
+    long now = System.currentTimeMillis();
+    long toDeleteTime = now - (2000 * 1000);
+    long toKeepTime = now - (1500 * 1000);
+
+
+    Configuration conf = setupConfiguration(1800, -1);
+    enableFileControllers(conf, REMOTE_ROOT_LOG_DIR, ALL_FILE_CONTROLLERS,
+            ALL_FILE_CONTROLLER_NAMES);
+    long timeout = 2000L;
+    LogAggregationTestcaseBuilder.create(conf)
+            .withRootPath(ROOT)
+            .withRemoteRootLogPath(REMOTE_ROOT_LOG_DIR)
+            .withBothFileControllers()
+            .withUserDir(USER_ME, toKeepTime)
+            .withSuffixDir(SUFFIX, toDeleteTime)
+            .withBucketDir(toDeleteTime)
+            .withApps(//Apps for TFile
+                    Lists.newArrayList(
+                            new AppDescriptor(T_FILE, toDeleteTime, 
Lists.newArrayList()),
+                    new AppDescriptor(T_FILE, toDeleteTime, Lists.newArrayList(
+                            Pair.of(DIR_HOST1, toDeleteTime),
+                            Pair.of(DIR_HOST2, toKeepTime))),
+                    new AppDescriptor(T_FILE, toDeleteTime, Lists.newArrayList(
+                            Pair.of(DIR_HOST1, toDeleteTime),
+                            Pair.of(DIR_HOST2, toDeleteTime))),
+                    new AppDescriptor(T_FILE, toDeleteTime, Lists.newArrayList(
+                            Pair.of(DIR_HOST1, toDeleteTime),
+                            Pair.of(DIR_HOST2, toKeepTime))),
+                    //Apps for IFile
+                    new AppDescriptor(I_FILE, toDeleteTime, 
Lists.newArrayList()),
+                    new AppDescriptor(I_FILE, toDeleteTime, Lists.newArrayList(
+                            Pair.of(DIR_HOST1, toDeleteTime),
+                            Pair.of(DIR_HOST2, toKeepTime))),
+                    new AppDescriptor(I_FILE, toDeleteTime, Lists.newArrayList(
+                            Pair.of(DIR_HOST1, toDeleteTime),
+                            Pair.of(DIR_HOST2, toDeleteTime))),
+                    new AppDescriptor(I_FILE, toDeleteTime, Lists.newArrayList(
+                            Pair.of(DIR_HOST1, toDeleteTime),
+                            Pair.of(DIR_HOST2, toKeepTime)))))
+            .withFinishedApps(1, 2, 3, 5, 6, 7)
+            .withRunningApps(4, 8)
+            .injectExceptionForAppDirDeletion(3, 6)
+            .build()
+            .startDeletionService()
+            .verifyAppDirsDeleted(timeout, 1, 3, 5, 7)
+            .verifyAppDirsNotDeleted(timeout, 2, 4, 6, 8)
+            .verifyAppFilesDeleted(timeout, Lists.newArrayList(Pair.of(4, 1), 
Pair.of(8, 1)))
+            .verifyAppFilesNotDeleted(timeout, Lists.newArrayList(Pair.of(4, 
2), Pair.of(8, 2)))
+            .teardown(1);
+  }
+
   static class MockFileSystem extends FilterFileSystem {
     MockFileSystem() {
       super(mock(FileSystem.class));
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/AggregatedLogDeletionServiceForTest.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/AggregatedLogDeletionServiceForTest.java
index 49042cf458b..76ec8aab537 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/AggregatedLogDeletionServiceForTest.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/AggregatedLogDeletionServiceForTest.java
@@ -32,6 +32,7 @@ public class AggregatedLogDeletionServiceForTest extends 
AggregatedLogDeletionSe
   private final List<ApplicationId> finishedApplications;
   private final List<ApplicationId> runningApplications;
   private final Configuration conf;
+  private ApplicationClientProtocol mockRMClient;
 
   public AggregatedLogDeletionServiceForTest(List<ApplicationId> 
runningApplications,
                                              List<ApplicationId> 
finishedApplications) {
@@ -48,11 +49,16 @@ public class AggregatedLogDeletionServiceForTest extends 
AggregatedLogDeletionSe
 
   @Override
   protected ApplicationClientProtocol createRMClient() throws IOException {
+    if (mockRMClient != null) {
+      return mockRMClient;
+    }
     try {
-      return createMockRMClient(finishedApplications, runningApplications);
+      mockRMClient =
+          createMockRMClient(finishedApplications, runningApplications);
     } catch (Exception e) {
       throw new IOException(e);
     }
+    return mockRMClient;
   }
 
   @Override
@@ -60,8 +66,7 @@ public class AggregatedLogDeletionServiceForTest extends 
AggregatedLogDeletionSe
     return conf;
   }
 
-  @Override
-  protected void stopRMClient() {
-    // DO NOTHING
+  public ApplicationClientProtocol getMockRMClient() {
+    return mockRMClient;
   }
 }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/LogAggregationTestcase.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/LogAggregationTestcase.java
index 8f535d40714..f2074f8c8e6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/LogAggregationTestcase.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/LogAggregationTestcase.java
@@ -28,10 +28,12 @@ import org.apache.hadoop.util.Sets;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService;
+import 
org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService.LogDeletionTask;
 import 
org.apache.hadoop.yarn.logaggregation.testutils.LogAggregationTestcaseBuilder.AppDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -77,6 +79,7 @@ public class LogAggregationTestcase {
   private List<PathWithFileStatus> appDirs;
   private final List<AppDescriptor> appDescriptors;
   private AggregatedLogDeletionServiceForTest deletionService;
+  private ApplicationClientProtocol rmClient;
 
   public LogAggregationTestcase(LogAggregationTestcaseBuilder builder) throws 
IOException {
     conf = builder.conf;
@@ -102,6 +105,8 @@ public class LogAggregationTestcase {
     mockFs = ((FilterFileSystem) builder.rootFs).getRawFileSystem();
     validateAppControllers();
     setupMocks();
+
+    setupDeletionService();
   }
 
   private void validateAppControllers() {
@@ -241,10 +246,13 @@ public class LogAggregationTestcase {
     when(mockFs.listStatus(dir.path)).thenReturn(fileStatuses);
   }
 
-  public LogAggregationTestcase setupAndRunDeletionService() {
+  private void setupDeletionService() {
     List<ApplicationId> finishedApps = createFinishedAppsList();
     List<ApplicationId> runningApps = createRunningAppsList();
     deletionService = new AggregatedLogDeletionServiceForTest(runningApps, 
finishedApps, conf);
+  }
+
+  public LogAggregationTestcase startDeletionService() {
     deletionService.init(conf);
     deletionService.start();
     return this;
@@ -271,10 +279,13 @@ public class LogAggregationTestcase {
   public LogAggregationTestcase runDeletionTask(long retentionSeconds) throws 
Exception {
     List<ApplicationId> finishedApps = createFinishedAppsList();
     List<ApplicationId> runningApps = createRunningAppsList();
-    ApplicationClientProtocol rmClient = createMockRMClient(finishedApps, 
runningApps);
-    AggregatedLogDeletionService.LogDeletionTask deletionTask =
-            new AggregatedLogDeletionService.LogDeletionTask(conf, 
retentionSeconds, rmClient);
-    deletionTask.run();
+    rmClient = createMockRMClient(finishedApps, runningApps);
+    List<LogDeletionTask> tasks = deletionService.createLogDeletionTasks(conf, 
retentionSeconds,
+            rmClient);
+    for (LogDeletionTask deletionTask : tasks) {
+      deletionTask.run();
+    }
+
     return this;
   }
 
@@ -359,8 +370,20 @@ public class LogAggregationTestcase {
     verify(mockFs, timeout(timeout).times(times)).delete(file.path, true);
   }
 
-  public void teardown() {
+  private void verifyMockRmClientWasClosedNTimes(int expectedRmClientCloses)
+      throws IOException {
+    ApplicationClientProtocol mockRMClient;
+    if (deletionService != null) {
+      mockRMClient = deletionService.getMockRMClient();
+    } else {
+      mockRMClient = rmClient;
+    }
+    verify((Closeable)mockRMClient, times(expectedRmClientCloses)).close();
+  }
+
+  public void teardown(int expectedRmClientCloses) throws IOException {
     deletionService.stop();
+    verifyMockRmClientWasClosedNTimes(expectedRmClientCloses);
   }
 
   public LogAggregationTestcase refreshLogRetentionSettings() throws 
IOException {
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/MockRMClientUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/MockRMClientUtils.java
index c3f69c2a67d..6eb1eb1ecbe 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/MockRMClientUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/MockRMClientUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.logaggregation.testutils;
 
+import org.apache.hadoop.test.MockitoUtil;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
@@ -34,7 +35,8 @@ public class MockRMClientUtils {
   public static ApplicationClientProtocol createMockRMClient(
           List<ApplicationId> finishedApplications,
           List<ApplicationId> runningApplications) throws Exception {
-    final ApplicationClientProtocol mockProtocol = 
mock(ApplicationClientProtocol.class);
+    final ApplicationClientProtocol mockProtocol =
+        MockitoUtil.mockProtocol(ApplicationClientProtocol.class);
     if (finishedApplications != null && !finishedApplications.isEmpty()) {
       for (ApplicationId appId : finishedApplications) {
         GetApplicationReportRequest request = 
GetApplicationReportRequest.newInstance(appId);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to