This is an automated email from the ASF dual-hosted git repository. roryqi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push: new 0f9d9bcb5 [#1972] fix(server): Fix clear leak shuffle data accidentally remove data of new coming appId issue (#1971) 0f9d9bcb5 is described below commit 0f9d9bcb5648c95051c9f8c4831943f126d463b9 Author: maobaolong <baoloong...@tencent.com> AuthorDate: Thu Aug 29 17:33:48 2024 +0800 [#1972] fix(server): Fix clear leak shuffle data accidentally remove data of new coming appId issue (#1971) ### What changes were proposed in this pull request? Fix the leaked-shuffle-data cleanup so that it no longer deletes the data of an app that registers while the cleanup is running. `StorageManager#checkAndClearLeakedShuffleData` now takes a `Supplier<Collection<String>>` instead of a pre-computed `Collection<String>` of app IDs. `LocalStorageManager` calls the supplier only after it has collected the app IDs present on its storages, so an app registered during the scan is no longer mistaken for leaked data. ### Why are the changes needed? Fix: #1972 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added `LocalStorageManagerTest#testNewAppWhileCheckLeak`, which checks the cleanup's log output captured by the new `TestLoggerExtension`. --- .../uniffle/common/log/TestLoggerExtension.java | 141 +++++++++++++++++++++ .../common/log/TestLoggerParamResolver.java | 40 ++++++ .../apache/uniffle/server/ShuffleTaskManager.java | 4 +- .../server/storage/HadoopStorageManager.java | 3 +- .../server/storage/HybridStorageManager.java | 5 +- .../server/storage/LocalStorageManager.java | 4 +- .../uniffle/server/storage/StorageManager.java | 3 +- .../server/storage/LocalStorageManagerTest.java | 46 +++++++ 8 files changed, 239 insertions(+), 7 deletions(-) diff --git a/common/src/test/java/org/apache/uniffle/common/log/TestLoggerExtension.java b/common/src/test/java/org/apache/uniffle/common/log/TestLoggerExtension.java new file mode 100644 index 000000000..df0e815eb --- /dev/null +++ b/common/src/test/java/org/apache/uniffle/common/log/TestLoggerExtension.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.log; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; +import javax.annotation.concurrent.GuardedBy; + +import org.apache.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.Logger; +import org.apache.logging.log4j.core.appender.AbstractAppender; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +public class TestLoggerExtension implements BeforeEachCallback, AfterEachCallback { + private static final ExtensionContext.Namespace NAMESPACE = + ExtensionContext.Namespace.create(TestLoggerExtension.class); + private static final String LOG_COLLECTOR_KEY = "TestLogAppender"; + private TestAppender appender; + + @Override + public void beforeEach(ExtensionContext context) { + appender = new TestAppender(); + if (LogManager.getLogger(LogManager.ROOT_LOGGER_NAME) instanceof Logger) { + org.apache.logging.log4j.core.Logger log4jlogger = + (org.apache.logging.log4j.core.Logger) + org.apache.logging.log4j.LogManager.getLogger(LogManager.ROOT_LOGGER_NAME); + appender.start(); + log4jlogger.addAppender(appender); + } + context.getStore(NAMESPACE).put(LOG_COLLECTOR_KEY, this); + } + + @Override + public void afterEach(ExtensionContext context) { + if (LogManager.getLogger(LogManager.ROOT_LOGGER_NAME) instanceof Logger) { + Logger log4jlogger = (Logger) 
LogManager.getLogger(LogManager.ROOT_LOGGER_NAME); + appender.stop(); + log4jlogger.removeAppender(appender); + } + } + + public static TestLoggerExtension getTestLogger(ExtensionContext context) { + return context.getStore(NAMESPACE).get(LOG_COLLECTOR_KEY, TestLoggerExtension.class); + } + + /** + * Determine if a specific pattern appears in log output. + * + * @param pattern a pattern text to search for in log events + * @return true if a log message containing the pattern exists, false otherwise + */ + public boolean wasLogged(String pattern) { + return appender.wasLogged(Pattern.compile(".*" + pattern + ".*")); + } + + /** + * Determine if a specific pattern appears in log output with the specified level. + * + * @param pattern a pattern text to search for in log events + * @return true if a log message containing the pattern exists, false otherwise + */ + public boolean wasLoggedWithLevel(String pattern, Level level) { + return appender.wasLoggedWithLevel(Pattern.compile(".*" + pattern + ".*"), level); + } + + /** + * Count the number of times a specific pattern appears in log messages. + * + * @param pattern Pattern to search for in log events + * @return The number of log messages which match the pattern + */ + public int logCount(String pattern) { + // [\s\S] will match all character include line break + return appender.logCount(Pattern.compile("[\\s\\S]*" + pattern + "[\\s\\S]*")); + } + + public class TestAppender extends AbstractAppender { + @GuardedBy("this") private final List<LogEvent> events = new ArrayList<>(); + + protected TestAppender() { + super("", null, null, false, null); + } + + /** Determines whether a message with the given pattern was logged. */ + public synchronized boolean wasLogged(Pattern pattern) { + for (LogEvent e : events) { + if (pattern.matcher(e.getMessage().getFormattedMessage()).matches()) { + return true; + } + } + return false; + } + + /** Determines whether a message with the given pattern was logged. 
*/ + public synchronized boolean wasLoggedWithLevel(Pattern pattern, Level level) { + for (LogEvent e : events) { + if (e.getLevel().equals(level) + && pattern.matcher(e.getMessage().getFormattedMessage()).matches()) { + return true; + } + } + return false; + } + + /** Counts the number of log message with a given pattern. */ + public synchronized int logCount(Pattern pattern) { + int logCount = 0; + for (LogEvent e : events) { + if (pattern.matcher(e.getMessage().getFormattedMessage()).matches()) { + logCount++; + } + } + return logCount; + } + + @Override + public void append(LogEvent event) { + events.add(event); + } + } +} diff --git a/common/src/test/java/org/apache/uniffle/common/log/TestLoggerParamResolver.java b/common/src/test/java/org/apache/uniffle/common/log/TestLoggerParamResolver.java new file mode 100644 index 000000000..10fba0021 --- /dev/null +++ b/common/src/test/java/org/apache/uniffle/common/log/TestLoggerParamResolver.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.uniffle.common.log; + +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ParameterContext; +import org.junit.jupiter.api.extension.ParameterResolutionException; +import org.junit.jupiter.api.extension.ParameterResolver; + +public class TestLoggerParamResolver implements ParameterResolver { + @Override + public boolean supportsParameter( + final ParameterContext parameterContext, final ExtensionContext extensionContext) + throws ParameterResolutionException { + return ExtensionContext.class.isAssignableFrom(parameterContext.getParameter().getType()) + && parameterContext.getIndex() == 0; + } + + @Override + public Object resolveParameter( + final ParameterContext parameterContext, final ExtensionContext extensionContext) + throws ParameterResolutionException { + return extensionContext; + } +} diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java index 226682e63..b48258423 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java @@ -832,8 +832,8 @@ public class ShuffleTaskManager { public void checkLeakShuffleData() { LOG.info("Start check leak shuffle data"); try { - Set<String> appIds = Sets.newHashSet(shuffleTaskInfos.keySet()); - storageManager.checkAndClearLeakedShuffleData(appIds); + storageManager.checkAndClearLeakedShuffleData( + () -> Sets.newHashSet(shuffleTaskInfos.keySet())); LOG.info("Finish check leak shuffle data"); } catch (Exception e) { LOG.warn("Error happened in checkLeakShuffleData", e); diff --git a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java index 33d9b820b..99c055e5f 100644 --- 
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.function.Supplier; import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; @@ -181,7 +182,7 @@ public class HadoopStorageManager extends SingleStorageManager { } @Override - public void checkAndClearLeakedShuffleData(Collection<String> appIds) {} + public void checkAndClearLeakedShuffleData(Supplier<Collection<String>> appIdsSupplier) {} @Override public Map<String, StorageInfo> getStorageInfo() { diff --git a/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java index c1169f42f..05b83e558 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java @@ -20,6 +20,7 @@ package org.apache.uniffle.server.storage; import java.lang.reflect.Constructor; import java.util.Collection; import java.util.Map; +import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -145,8 +146,8 @@ public class HybridStorageManager implements StorageManager { } @Override - public void checkAndClearLeakedShuffleData(Collection<String> appIds) { - warmStorageManager.checkAndClearLeakedShuffleData(appIds); + public void checkAndClearLeakedShuffleData(Supplier<Collection<String>> appIdsSupplier) { + warmStorageManager.checkAndClearLeakedShuffleData(appIdsSupplier); } @Override diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java index 10f831ad1..6be2af740 100644 --- 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java @@ -34,6 +34,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -375,7 +376,7 @@ public class LocalStorageManager extends SingleStorageManager { } @Override - public void checkAndClearLeakedShuffleData(Collection<String> appIds) { + public void checkAndClearLeakedShuffleData(Supplier<Collection<String>> appIdsSupplier) { Set<String> appIdsOnStorages = new HashSet<>(); for (LocalStorage localStorage : localStorages) { if (!localStorage.isCorrupted()) { @@ -384,6 +385,7 @@ public class LocalStorageManager extends SingleStorageManager { } } + Collection<String> appIds = appIdsSupplier.get(); for (String appId : appIdsOnStorages) { if (!appIds.contains(appId)) { ShuffleDeleteHandler deleteHandler = diff --git a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java index 402edc2cd..70425a22d 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java @@ -19,6 +19,7 @@ package org.apache.uniffle.server.storage; import java.util.Collection; import java.util.Map; +import java.util.function.Supplier; import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.storage.StorageInfo; @@ -55,7 +56,7 @@ public interface StorageManager { // todo: add an interface that check storage isHealthy - void checkAndClearLeakedShuffleData(Collection<String> appIds); + void checkAndClearLeakedShuffleData(Supplier<Collection<String>> appIdsSupplier); /** * Report a 
map of storage mount point -> storage info mapping. For local storages, the mount diff --git a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java index 5584e2609..63c5c12dd 100644 --- a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java @@ -24,6 +24,7 @@ import java.io.InputStreamReader; import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -32,29 +33,40 @@ import org.apache.commons.lang3.SystemUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.ExtensionContext; import org.apache.uniffle.common.ShufflePartitionedBlock; +import org.apache.uniffle.common.log.TestLoggerExtension; +import org.apache.uniffle.common.log.TestLoggerParamResolver; import org.apache.uniffle.common.storage.StorageInfo; import org.apache.uniffle.common.storage.StorageMedia; import org.apache.uniffle.common.storage.StorageStatus; +import org.apache.uniffle.common.util.JavaUtils; import org.apache.uniffle.server.ShuffleDataFlushEvent; import org.apache.uniffle.server.ShuffleDataReadEvent; import org.apache.uniffle.server.ShuffleServerConf; import org.apache.uniffle.server.ShuffleServerMetrics; +import org.apache.uniffle.server.ShuffleTaskInfo; import org.apache.uniffle.storage.common.LocalStorage; import org.apache.uniffle.storage.common.Storage; import org.apache.uniffle.storage.util.StorageType; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static 
org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import static uk.org.webcompere.systemstubs.SystemStubs.withEnvironmentVariables; /** The class is to test the {@link LocalStorageManager} */ +@ExtendWith(TestLoggerExtension.class) +@ExtendWith(TestLoggerParamResolver.class) public class LocalStorageManagerTest { @BeforeAll @@ -332,4 +344,38 @@ public class LocalStorageManagerTest { assertEquals(StorageMedia.SSD, storageInfo.get(mountPoint).getType()); }); } + + @Test + public void testNewAppWhileCheckLeak(ExtensionContext context) { + String[] storagePaths = {"/tmp/rss-data1"}; + + ShuffleServerConf conf = new ShuffleServerConf(); + conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storagePaths)); + conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L); + conf.setString( + ShuffleServerConf.RSS_STORAGE_TYPE.key(), + org.apache.uniffle.storage.util.StorageType.LOCALFILE.name()); + LocalStorageManager localStorageManager = new LocalStorageManager(conf); + + List<LocalStorage> storages = localStorageManager.getStorages(); + assertNotNull(storages); + + // test normal case + Map<String, ShuffleTaskInfo> shuffleTaskInfos = JavaUtils.newConcurrentMap(); + shuffleTaskInfos.put("app0", new ShuffleTaskInfo("app0")); + shuffleTaskInfos.put("app1", new ShuffleTaskInfo("app1")); + shuffleTaskInfos.put("app2", new ShuffleTaskInfo("app2")); + localStorageManager.checkAndClearLeakedShuffleData(shuffleTaskInfos::keySet); + TestLoggerExtension testLogger = TestLoggerExtension.getTestLogger(context); + assertFalse(testLogger.wasLogged("app")); + + // test race condition case, app 3 is new app + shuffleTaskInfos.put("3", new ShuffleTaskInfo("app3")); + LocalStorage mockLocalStorage = 
mock(LocalStorage.class); + when(mockLocalStorage.getAppIds()).thenReturn(Collections.singleton("app3")); + storages.add(mockLocalStorage); + localStorageManager.checkAndClearLeakedShuffleData(shuffleTaskInfos::keySet); + assertTrue(testLogger.wasLogged("Delete shuffle data for appId\\[app3\\]")); + System.out.println(); + } }