This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 493bf19 [Improvement] LocalStorage init use multi thread (#72)
493bf19 is described below
commit 493bf19e5b6f9bcfc4fc742e2356d259f8200ee7
Author: xianjingfeng <[email protected]>
AuthorDate: Fri Jul 29 15:56:59 2022 +0800
[Improvement] LocalStorage init use multi thread (#72)
### **What changes were proposed in this pull request?**
solve issue #71, use multi thread to clean local storage
### **Why are the changes needed?**
If shuffle server exit abnormally, there will be many files need to be
clear when shuffle server start again and this operation will cost a lot of time
### **Does this PR introduce any user-facing change?**
No
### **How was this patch tested?**
Add new ut
---
.../server/storage/LocalStorageManager.java | 57 ++++++++++++++++----
.../server/storage/LocalStorageManagerTest.java | 63 ++++++++++++++++++++++
2 files changed, 111 insertions(+), 9 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 4dde90f..ee42b58 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -17,13 +17,20 @@
package org.apache.uniffle.server.storage;
+import java.util.Arrays;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.util.RssUtils;
@@ -42,8 +49,9 @@ import org.apache.uniffle.storage.util.ShuffleStorageUtils;
import org.apache.uniffle.storage.util.StorageType;
public class LocalStorageManager extends SingleStorageManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(LocalStorageManager.class);
- private final List<LocalStorage> localStorages = Lists.newArrayList();
+ private final List<LocalStorage> localStorages;
private final String[] storageBasePaths;
private final LocalStorageChecker checker;
private List<LocalStorage> unCorruptedStorages = Lists.newArrayList();
@@ -63,15 +71,46 @@ public class LocalStorageManager extends
SingleStorageManager {
if (highWaterMarkOfWrite < lowWaterMarkOfWrite) {
throw new IllegalArgumentException("highWaterMarkOfWrite must be larger
than lowWaterMarkOfWrite");
}
- for (String storagePath : storageBasePaths) {
- localStorages.add(LocalStorage.newBuilder()
- .basePath(storagePath)
- .capacity(capacity)
- .lowWaterMarkOfWrite(lowWaterMarkOfWrite)
- .highWaterMarkOfWrite(highWaterMarkOfWrite)
- .shuffleExpiredTimeoutMs(shuffleExpiredTimeoutMs)
- .build());
+
+ // We must make sure the order of `storageBasePaths` and `localStorages`
is same, or some unit test may be fail
+ CountDownLatch countDownLatch = new
CountDownLatch(storageBasePaths.length);
+ AtomicInteger successCount = new AtomicInteger();
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ LocalStorage[] localStorageArray = new
LocalStorage[storageBasePaths.length];
+ for (int i = 0; i < storageBasePaths.length; i++) {
+ final int idx = i;
+ String storagePath = storageBasePaths[i];
+ executorService.submit(() -> {
+ try {
+ localStorageArray[idx] = LocalStorage.newBuilder()
+ .basePath(storagePath)
+ .capacity(capacity)
+ .lowWaterMarkOfWrite(lowWaterMarkOfWrite)
+ .highWaterMarkOfWrite(highWaterMarkOfWrite)
+ .shuffleExpiredTimeoutMs(shuffleExpiredTimeoutMs)
+ .build();
+ successCount.incrementAndGet();
+ } catch (Exception e) {
+ LOG.error("LocalStorage init failed!", e);
+ } finally {
+ countDownLatch.countDown();
+ }
+ });
+ }
+
+ try {
+ countDownLatch.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
}
+
+ executorService.shutdown();
+ int failedCount = storageBasePaths.length - successCount.get();
+ if (failedCount > 0) {
+ throw new RuntimeException(String.format("[%s] local storage init
failed!", failedCount));
+ }
+
+ localStorages = Arrays.asList(localStorageArray);
this.checker = new LocalStorageChecker(conf, localStorages);
}
diff --git
a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
new file mode 100644
index 0000000..477b607
--- /dev/null
+++
b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.storage;
+
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.server.ShuffleServerMetrics;
+import org.apache.uniffle.storage.common.LocalStorage;
+import org.apache.uniffle.storage.util.StorageType;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class LocalStorageManagerTest {
+
+ private static LocalStorageManager localStorageManager;
+ private static String[] storagePaths = {"/tmp/rssdata", "/tmp/rssdata2"};
+
+ @BeforeAll
+ public static void prepare() {
+ ShuffleServerMetrics.register();
+ ShuffleServerConf conf = new ShuffleServerConf();
+ conf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, String.join(",",
storagePaths));
+ conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L);
+ conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE,
StorageType.LOCALFILE.name());
+ localStorageManager = new LocalStorageManager(conf);
+ }
+
+ @AfterAll
+ public static void clear() {
+ ShuffleServerMetrics.clear();
+ }
+
+ @Test
+ public void testInitLocalStorageManager() {
+ List<LocalStorage> storages = localStorageManager.getStorages();
+ assertNotNull(storages);
+ assertTrue(storages.size() == storagePaths.length);
+ for (int i = 0; i < storagePaths.length; i++) {
+ assertTrue(storagePaths[i].equals(storages.get(i).getBasePath()));
+ }
+ }
+}
+