This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f9d9bcb5 [#1972] fix(server): Fix clearing of leaked shuffle data accidentally removing data of newly arriving appIds (#1971)
0f9d9bcb5 is described below

commit 0f9d9bcb5648c95051c9f8c4831943f126d463b9
Author: maobaolong <baoloong...@tencent.com>
AuthorDate: Thu Aug 29 17:33:48 2024 +0800

    [#1972] fix(server): Fix clearing of leaked shuffle data accidentally removing data of newly arriving appIds (#1971)
    
    ### What changes were proposed in this pull request?
    
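    Fix clearing of leaked shuffle data accidentally removing the data of a newly
    arriving appId. The set of registered appIds is now obtained through a supplier
    that is evaluated only after the appIds present on local storage have been
    collected, so an app registered during the scan is no longer treated as leaked.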
    
    ### Why are the changes needed?
    
    Fix: #1972
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added unit test `LocalStorageManagerTest#testNewAppWhileCheckLeak`.
---
 .../uniffle/common/log/TestLoggerExtension.java    | 141 +++++++++++++++++++++
 .../common/log/TestLoggerParamResolver.java        |  40 ++++++
 .../apache/uniffle/server/ShuffleTaskManager.java  |   4 +-
 .../server/storage/HadoopStorageManager.java       |   3 +-
 .../server/storage/HybridStorageManager.java       |   5 +-
 .../server/storage/LocalStorageManager.java        |   4 +-
 .../uniffle/server/storage/StorageManager.java     |   3 +-
 .../server/storage/LocalStorageManagerTest.java    |  46 +++++++
 8 files changed, 239 insertions(+), 7 deletions(-)

diff --git 
a/common/src/test/java/org/apache/uniffle/common/log/TestLoggerExtension.java 
b/common/src/test/java/org/apache/uniffle/common/log/TestLoggerExtension.java
new file mode 100644
index 000000000..df0e815eb
--- /dev/null
+++ 
b/common/src/test/java/org/apache/uniffle/common/log/TestLoggerExtension.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.log;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+import javax.annotation.concurrent.GuardedBy;
+
+import org.apache.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.Logger;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+public class TestLoggerExtension implements BeforeEachCallback, 
AfterEachCallback {
+  private static final ExtensionContext.Namespace NAMESPACE =
+      ExtensionContext.Namespace.create(TestLoggerExtension.class);
+  private static final String LOG_COLLECTOR_KEY = "TestLogAppender";
+  private TestAppender appender;
+
+  @Override
+  public void beforeEach(ExtensionContext context) {
+    appender = new TestAppender();
+    if (LogManager.getLogger(LogManager.ROOT_LOGGER_NAME) instanceof Logger) {
+      org.apache.logging.log4j.core.Logger log4jlogger =
+          (org.apache.logging.log4j.core.Logger)
+              
org.apache.logging.log4j.LogManager.getLogger(LogManager.ROOT_LOGGER_NAME);
+      appender.start();
+      log4jlogger.addAppender(appender);
+    }
+    context.getStore(NAMESPACE).put(LOG_COLLECTOR_KEY, this);
+  }
+
+  @Override
+  public void afterEach(ExtensionContext context) {
+    if (LogManager.getLogger(LogManager.ROOT_LOGGER_NAME) instanceof Logger) {
+      Logger log4jlogger = (Logger) 
LogManager.getLogger(LogManager.ROOT_LOGGER_NAME);
+      appender.stop();
+      log4jlogger.removeAppender(appender);
+    }
+  }
+
+  public static TestLoggerExtension getTestLogger(ExtensionContext context) {
+    return context.getStore(NAMESPACE).get(LOG_COLLECTOR_KEY, 
TestLoggerExtension.class);
+  }
+
+  /**
+   * Determine if a specific pattern appears in log output.
+   *
+   * @param pattern a pattern text to search for in log events
+   * @return true if a log message containing the pattern exists, false 
otherwise
+   */
+  public boolean wasLogged(String pattern) {
+    return appender.wasLogged(Pattern.compile(".*" + pattern + ".*"));
+  }
+
+  /**
+   * Determine if a specific pattern appears in log output with the specified 
level.
+   *
+   * @param pattern a pattern text to search for in log events
+   * @return true if a log message containing the pattern exists, false 
otherwise
+   */
+  public boolean wasLoggedWithLevel(String pattern, Level level) {
+    return appender.wasLoggedWithLevel(Pattern.compile(".*" + pattern + ".*"), 
level);
+  }
+
+  /**
+   * Count the number of times a specific pattern appears in log messages.
+   *
+   * @param pattern Pattern to search for in log events
+   * @return The number of log messages which match the pattern
+   */
+  public int logCount(String pattern) {
+    // [\s\S] will match all character include line break
+    return appender.logCount(Pattern.compile("[\\s\\S]*" + pattern + 
"[\\s\\S]*"));
+  }
+
+  public class TestAppender extends AbstractAppender {
+    @GuardedBy("this") private final List<LogEvent> events = new ArrayList<>();
+
+    protected TestAppender() {
+      super("", null, null, false, null);
+    }
+
+    /** Determines whether a message with the given pattern was logged. */
+    public synchronized boolean wasLogged(Pattern pattern) {
+      for (LogEvent e : events) {
+        if (pattern.matcher(e.getMessage().getFormattedMessage()).matches()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    /** Determines whether a message with the given pattern was logged. */
+    public synchronized boolean wasLoggedWithLevel(Pattern pattern, Level 
level) {
+      for (LogEvent e : events) {
+        if (e.getLevel().equals(level)
+            && 
pattern.matcher(e.getMessage().getFormattedMessage()).matches()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    /** Counts the number of log message with a given pattern. */
+    public synchronized int logCount(Pattern pattern) {
+      int logCount = 0;
+      for (LogEvent e : events) {
+        if (pattern.matcher(e.getMessage().getFormattedMessage()).matches()) {
+          logCount++;
+        }
+      }
+      return logCount;
+    }
+
+    @Override
+    public void append(LogEvent event) {
+      events.add(event);
+    }
+  }
+}
diff --git 
a/common/src/test/java/org/apache/uniffle/common/log/TestLoggerParamResolver.java
 
b/common/src/test/java/org/apache/uniffle/common/log/TestLoggerParamResolver.java
new file mode 100644
index 000000000..10fba0021
--- /dev/null
+++ 
b/common/src/test/java/org/apache/uniffle/common/log/TestLoggerParamResolver.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.log;
+
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolutionException;
+import org.junit.jupiter.api.extension.ParameterResolver;
+
+/** Injects the running test's {@link ExtensionContext} into its first method parameter. */
+public class TestLoggerParamResolver implements ParameterResolver {
+  @Override
+  public boolean supportsParameter(ParameterContext paramCtx, ExtensionContext extCtx)
+      throws ParameterResolutionException {
+    return paramCtx.getIndex() == 0
+        && ExtensionContext.class.isAssignableFrom(paramCtx.getParameter().getType());
+  }
+
+  @Override
+  public Object resolveParameter(ParameterContext paramCtx, ExtensionContext extCtx)
+      throws ParameterResolutionException {
+    // Hand back the very context JUnit passed to this resolver.
+    return extCtx;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 226682e63..b48258423 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -832,8 +832,8 @@ public class ShuffleTaskManager {
   public void checkLeakShuffleData() {
     LOG.info("Start check leak shuffle data");
     try {
-      Set<String> appIds = Sets.newHashSet(shuffleTaskInfos.keySet());
-      storageManager.checkAndClearLeakedShuffleData(appIds);
+      storageManager.checkAndClearLeakedShuffleData(
+          () -> Sets.newHashSet(shuffleTaskInfos.keySet()));
       LOG.info("Finish check leak shuffle data");
     } catch (Exception e) {
       LOG.warn("Error happened in checkLeakShuffleData", e);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
index 33d9b820b..99c055e5f 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -181,7 +182,7 @@ public class HadoopStorageManager extends 
SingleStorageManager {
   }
 
   @Override
-  public void checkAndClearLeakedShuffleData(Collection<String> appIds) {}
+  public void checkAndClearLeakedShuffleData(Supplier<Collection<String>> 
appIdsSupplier) {}
 
   @Override
   public Map<String, StorageInfo> getStorageInfo() {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java
index c1169f42f..05b83e558 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.server.storage;
 import java.lang.reflect.Constructor;
 import java.util.Collection;
 import java.util.Map;
+import java.util.function.Supplier;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -145,8 +146,8 @@ public class HybridStorageManager implements StorageManager 
{
   }
 
   @Override
-  public void checkAndClearLeakedShuffleData(Collection<String> appIds) {
-    warmStorageManager.checkAndClearLeakedShuffleData(appIds);
+  public void checkAndClearLeakedShuffleData(Supplier<Collection<String>> 
appIdsSupplier) {
+    warmStorageManager.checkAndClearLeakedShuffleData(appIdsSupplier);
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 10f831ad1..6be2af740 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -34,6 +34,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -375,7 +376,7 @@ public class LocalStorageManager extends 
SingleStorageManager {
   }
 
   @Override
-  public void checkAndClearLeakedShuffleData(Collection<String> appIds) {
+  public void checkAndClearLeakedShuffleData(Supplier<Collection<String>> 
appIdsSupplier) {
     Set<String> appIdsOnStorages = new HashSet<>();
     for (LocalStorage localStorage : localStorages) {
       if (!localStorage.isCorrupted()) {
@@ -384,6 +385,7 @@ public class LocalStorageManager extends 
SingleStorageManager {
       }
     }
 
+    Collection<String> appIds = appIdsSupplier.get();
     for (String appId : appIdsOnStorages) {
       if (!appIds.contains(appId)) {
         ShuffleDeleteHandler deleteHandler =
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java 
b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
index 402edc2cd..70425a22d 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.server.storage;
 
 import java.util.Collection;
 import java.util.Map;
+import java.util.function.Supplier;
 
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.storage.StorageInfo;
@@ -55,7 +56,7 @@ public interface StorageManager {
 
   // todo: add an interface that check storage isHealthy
 
-  void checkAndClearLeakedShuffleData(Collection<String> appIds);
+  void checkAndClearLeakedShuffleData(Supplier<Collection<String>> 
appIdsSupplier);
 
   /**
    * Report a map of storage mount point -> storage info mapping. For local 
storages, the mount
diff --git 
a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
 
b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
index 5584e2609..63c5c12dd 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
@@ -24,6 +24,7 @@ import java.io.InputStreamReader;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -32,29 +33,40 @@ import org.apache.commons.lang3.SystemUtils;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.ExtensionContext;
 
 import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.log.TestLoggerExtension;
+import org.apache.uniffle.common.log.TestLoggerParamResolver;
 import org.apache.uniffle.common.storage.StorageInfo;
 import org.apache.uniffle.common.storage.StorageMedia;
 import org.apache.uniffle.common.storage.StorageStatus;
+import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.server.ShuffleDataFlushEvent;
 import org.apache.uniffle.server.ShuffleDataReadEvent;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.server.ShuffleServerMetrics;
+import org.apache.uniffle.server.ShuffleTaskInfo;
 import org.apache.uniffle.storage.common.LocalStorage;
 import org.apache.uniffle.storage.common.Storage;
 import org.apache.uniffle.storage.util.StorageType;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 import static 
uk.org.webcompere.systemstubs.SystemStubs.withEnvironmentVariables;
 
 /** The class is to test the {@link LocalStorageManager} */
+@ExtendWith(TestLoggerExtension.class)
+@ExtendWith(TestLoggerParamResolver.class)
 public class LocalStorageManagerTest {
 
   @BeforeAll
@@ -332,4 +344,38 @@ public class LocalStorageManagerTest {
               assertEquals(StorageMedia.SSD, 
storageInfo.get(mountPoint).getType());
             });
   }
+
+  @Test
+  public void testNewAppWhileCheckLeak(ExtensionContext context) {
+    ShuffleServerConf conf = new ShuffleServerConf();
+    conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Collections.singletonList("/tmp/rss-data1"));
+    conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L);
+    conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name());
+    LocalStorageManager localStorageManager = new LocalStorageManager(conf);
+    List<LocalStorage> storages = localStorageManager.getStorages();
+    assertNotNull(storages);
+
+    // normal case: the registered apps must not trigger any deletion
+    Map<String, ShuffleTaskInfo> shuffleTaskInfos = JavaUtils.newConcurrentMap();
+    shuffleTaskInfos.put("app0", new ShuffleTaskInfo("app0"));
+    shuffleTaskInfos.put("app1", new ShuffleTaskInfo("app1"));
+    shuffleTaskInfos.put("app2", new ShuffleTaskInfo("app2"));
+    localStorageManager.checkAndClearLeakedShuffleData(shuffleTaskInfos::keySet);
+    TestLoggerExtension testLogger = TestLoggerExtension.getTestLogger(context);
+    assertFalse(testLogger.wasLogged("app"));
+
+    // race case: app3 registers mid-scan; a snapshot supplier (as in ShuffleTaskManager) sees it
+    LocalStorage mockLocalStorage = mock(LocalStorage.class);
+    when(mockLocalStorage.getAppIds())
+        .thenAnswer(
+            invocation -> {
+              shuffleTaskInfos.put("app3", new ShuffleTaskInfo("app3"));
+              return Collections.singleton("app3");
+            });
+    storages.add(mockLocalStorage);
+    localStorageManager.checkAndClearLeakedShuffleData(() -> new ArrayList<>(shuffleTaskInfos.keySet()));
+    // the scan must have triggered the registration, otherwise the next check is vacuous
+    assertTrue(shuffleTaskInfos.containsKey("app3"));
+    assertFalse(testLogger.wasLogged("Delete shuffle data for appId\\[app3\\]"));
+  }
 }

Reply via email to