This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new d3aa5dc8 Support writing multi files of single partition to improve 
speed in HDFS storage (#396)
d3aa5dc8 is described below

commit d3aa5dc81bfb9bc63a5a93e2d31f0871cbf573b6
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Dec 13 14:11:44 2022 +0800

    Support writing multi files of single partition to improve speed in HDFS 
storage (#396)
    
    ### What changes were proposed in this pull request?
    1. Introduce the `PooledHdfsShuffleWriteHandler` to support writing a single 
partition to multiple HDFS files concurrently.
    
    ### Why are the changes needed?
    As the problem mentioned by 
https://github.com/apache/incubator-uniffle/issues/378#issuecomment-1342637715, 
the writing speed of HDFS is too slow and it can't write concurrently. 
Especially when a huge partition exists, this problem will cause other apps to 
slow down due to the limited memory.
    
    So improving the writing speed is an important factor in flushing a huge 
partition to HDFS quickly.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes.
    
    ### How was this patch tested?
    1. UTs
---
 docs/server_guide.md                               |   1 +
 .../ShuffleServerConcurrentWriteOfHdfsTest.java    | 152 +++++++++++++++++++++
 .../uniffle/test/ShuffleServerWithHdfsTest.java    |   2 +-
 .../apache/uniffle/server/ShuffleFlushManager.java |   6 +-
 .../apache/uniffle/server/ShuffleServerConf.java   |   7 +
 .../uniffle/server/ShuffleFlushManagerTest.java    |  30 ++++
 .../apache/uniffle/storage/common/HdfsStorage.java |  35 +++--
 .../storage/handler/api/ShuffleWriteHandler.java   |   6 +-
 .../handler/impl/HdfsShuffleWriteHandler.java      |   2 +-
 .../handler/impl/LocalFileWriteHandler.java        |   2 +-
 .../impl/PooledHdfsShuffleWriteHandler.java        |  88 ++++++++++++
 .../request/CreateShuffleWriteHandlerRequest.java  |  33 +++++
 12 files changed, 346 insertions(+), 18 deletions(-)

diff --git a/docs/server_guide.md b/docs/server_guide.md
index 871aa237..17cb7875 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -84,3 +84,4 @@ This document will introduce how to deploy Uniffle shuffle 
servers.
 |rss.server.disk.capacity|-1|Disk capacity that shuffle server can use. If 
it's negative, it will use the default disk whole space|
 |rss.server.multistorage.fallback.strategy.class|-|The fallback strategy for 
`MEMORY_LOCALFILE_HDFS`. Support 
`org.apache.uniffle.server.storage.RotateStorageManagerFallbackStrategy`,`org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy`
 and `org.apache.uniffle.server.storage.HdfsStorageManagerFallbackStrategy`. If 
not set, `org.apache.uniffle.server.storage.HdfsStorageManagerFallbackStrategy` 
will be used.|
 |rss.server.leak.shuffledata.check.interval|3600000|The interval of leak 
shuffle data check (ms)|
+|rss.server.max.concurrency.of.single.partition.writer|1|The max concurrency 
of single partition writer, the data partition file number is equal to this 
value. Default value is 1. This config could improve the writing speed, 
especially for huge partition.|
\ No newline at end of file
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHdfsTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHdfsTest.java
new file mode 100644
index 00000000..caa1663d
--- /dev/null
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHdfsTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+
+import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
+import org.apache.uniffle.client.request.RssFinishShuffleRequest;
+import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
+import org.apache.uniffle.client.request.RssSendCommitRequest;
+import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.client.util.DefaultIdHelper;
+import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static 
org.apache.uniffle.common.util.Constants.SHUFFLE_DATA_FILE_SUFFIX;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ShuffleServerConcurrentWriteOfHdfsTest extends 
ShuffleServerWithHdfsTest {
+  private static final int MAX_CONCURRENCY = 3;
+
+  @BeforeAll
+  public static void setupServers() throws Exception {
+    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    createCoordinatorServer(coordinatorConf);
+    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+    shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, 
StorageType.HDFS.name());
+    
shuffleServerConf.setInteger(ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION,
 MAX_CONCURRENCY);
+    
shuffleServerConf.setBoolean(shuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED, 
true);
+    shuffleServerConf.setLong(shuffleServerConf.SINGLE_BUFFER_FLUSH_THRESHOLD, 
1024 * 1024L);
+    createShuffleServer(shuffleServerConf);
+    startServers();
+  }
+
+  @Test
+  public void testConcurrentWrite2Hdfs() throws Exception {
+    String appId = "testConcurrentWrite2Hdfs";
+    String dataBasePath = HDFS_URI + "rss/test";
+    RssRegisterShuffleRequest rrsr = new RssRegisterShuffleRequest(
+        appId,
+        0,
+        Lists.newArrayList(new PartitionRange(0, 1)),
+        dataBasePath
+    );
+    shuffleServerClient.registerShuffle(rrsr);
+
+    List<Roaring64NavigableMap> bitmaps = new ArrayList<>();
+    Map<Long, byte[]> expectedDataList = new HashMap<>();
+    IntStream.range(0, 20).forEach(x -> {
+      Roaring64NavigableMap bitmap = Roaring64NavigableMap.bitmapOf();
+      bitmaps.add(bitmap);
+
+      Map<Long, byte[]> expectedData = Maps.newHashMap();
+
+      List<ShuffleBlockInfo> blocks = createShuffleBlockList(
+          0,
+          0,
+          0,
+          1,
+          1024 * 1025,
+          bitmap,
+          expectedData,
+          mockSSI
+      );
+      expectedDataList.putAll(expectedData);
+
+      Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = 
Maps.newHashMap();
+      partitionToBlocks.put(0, blocks);
+      Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks = 
Maps.newHashMap();
+      shuffleToBlocks.put(0, partitionToBlocks);
+      RssSendShuffleDataRequest rssdr = new RssSendShuffleDataRequest(appId, 
3, 1000, shuffleToBlocks);
+      shuffleServerClient.sendShuffleData(rssdr);
+    });
+
+    RssSendCommitRequest rscr = new RssSendCommitRequest(appId, 0);
+    shuffleServerClient.sendCommit(rscr);
+
+    RssFinishShuffleRequest rfsr = new RssFinishShuffleRequest(appId, 0);
+    shuffleServerClient.finishShuffle(rfsr);
+
+    // Check the concurrent hdfs file creation
+    FileStatus[] fileStatuses = fs.listStatus(new Path(dataBasePath + "/" + 
appId + "/0/0-1"));
+    long actual = Arrays
+        .stream(fileStatuses)
+        .filter(x -> x.getPath().getName().endsWith(SHUFFLE_DATA_FILE_SUFFIX))
+        .count();
+    assertEquals(MAX_CONCURRENCY, actual);
+
+    ShuffleServerInfo ssi = new ShuffleServerInfo(LOCALHOST, 
SHUFFLE_SERVER_PORT);
+    Roaring64NavigableMap blocksBitmap = Roaring64NavigableMap.bitmapOf();
+    bitmaps.stream().forEach(x -> {
+      Iterator<Long> iterator = x.iterator();
+      while (iterator.hasNext()) {
+        blocksBitmap.add(iterator.next());
+      }
+    });
+
+    ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(
+        StorageType.HDFS.name(),
+        appId,
+        0,
+        0,
+        100,
+        2,
+        10,
+        1000,
+        dataBasePath,
+        blocksBitmap,
+        Roaring64NavigableMap.bitmapOf(0),
+        Lists.newArrayList(ssi),
+        new Configuration(),
+        new DefaultIdHelper()
+    );
+
+    validateResult(readClient, expectedDataList, blocksBitmap);
+  }
+}
+
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHdfsTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHdfsTest.java
index 90a4fef6..c27968c0 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHdfsTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHdfsTest.java
@@ -51,7 +51,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ShuffleServerWithHdfsTest extends ShuffleReadWriteBase {
 
-  private ShuffleServerGrpcClient shuffleServerClient;
+  protected ShuffleServerGrpcClient shuffleServerClient;
 
   @BeforeAll
   public static void setupServers() throws Exception {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 367fdaff..edddf1a6 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -69,6 +69,7 @@ public class ShuffleFlushManager {
   private final BlockingQueue<PendingShuffleFlushEvent> pendingEvents = 
Queues.newLinkedBlockingQueue();
   private final long pendingEventTimeoutSec;
   private int processPendingEventIndex = 0;
+  private final int maxConcurrencyOfSingleOnePartition;
 
   public ShuffleFlushManager(ShuffleServerConf shuffleServerConf, String 
shuffleServerId, ShuffleServer shuffleServer,
                              StorageManager storageManager) {
@@ -80,6 +81,8 @@ public class ShuffleFlushManager {
     retryMax = 
shuffleServerConf.getInteger(ShuffleServerConf.SERVER_WRITE_RETRY_MAX);
     storageType = shuffleServerConf.get(RssBaseConf.RSS_STORAGE_TYPE);
     storageDataReplica = 
shuffleServerConf.get(RssBaseConf.RSS_STORAGE_DATA_REPLICA);
+    this.maxConcurrencyOfSingleOnePartition =
+        
shuffleServerConf.get(ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION);
 
     int waitQueueSize = shuffleServerConf.getInteger(
         ShuffleServerConf.SERVER_FLUSH_THREAD_POOL_QUEUE_SIZE);
@@ -200,7 +203,8 @@ public class ShuffleFlushManager {
             shuffleServerId,
             hadoopConf,
             storageDataReplica,
-            user);
+            user,
+            maxConcurrencyOfSingleOnePartition);
         ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
         writeSuccess = storageManager.write(storage, handler, event);
         if (writeSuccess) {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 52acc549..38454598 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -314,6 +314,13 @@ public class ShuffleServerConf extends RssBaseConf {
           .defaultValue(3600 * 1000L)
           .withDescription("the interval of leak shuffle data check");
 
+  public static final ConfigOption<Integer> 
SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION = ConfigOptions
+      .key("rss.server.max.concurrency.of.single.partition.writer")
+      .intType()
+      .defaultValue(1)
+      .withDescription("The max concurrency of single partition writer, the 
data partition file number is "
+          + "equal to this value. Default value is 1.");
+
   public ShuffleServerConf() {
   }
 
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 3e8afaa9..ea0e236e 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
+import java.util.stream.IntStream;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -117,6 +118,35 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     assertEquals("value", manager.getHadoopConf().get("a.b"));
   }
 
+  @Test
+  public void concurrentWrite2HdfsWriteOfSinglePartition() throws Exception {
+    ShuffleServerConf shuffleServerConf = new ShuffleServerConf();
+    shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, 
Collections.emptyList());
+    shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, 
StorageType.HDFS.name());
+    int maxConcurrency = 3;
+    
shuffleServerConf.setInteger(ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION,
 maxConcurrency);
+
+    String appId = "concurrentWrite2HdfsWriteOfSinglePartition_appId";
+    StorageManager storageManager =
+        
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
+    storageManager.registerRemoteStorage(appId, remoteStorage);
+    ShuffleFlushManager manager =
+        new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", 
mockShuffleServer, storageManager);
+
+    IntStream.range(0, 20).forEach(x -> {
+      ShuffleDataFlushEvent event = createShuffleDataFlushEvent(appId, 1, 1, 
1, null);
+      manager.addToFlushQueue(event);
+    });
+    waitForFlush(manager, appId, 1, 10 * 5);
+
+    FileStatus[] fileStatuses = fs.listStatus(new Path(HDFS_URI + "/rss/test/" 
+ appId + "/1/1-1"));
+    long actual = Arrays.stream(fileStatuses).filter(x -> 
x.getPath().getName().endsWith("data")).count();
+
+    assertEquals(maxConcurrency, actual);
+    actual = Arrays.stream(fileStatuses).filter(x -> 
x.getPath().getName().endsWith("index")).count();
+    assertEquals(maxConcurrency, actual);
+  }
+
   @Test
   public void writeTest() throws Exception {
     String appId = "writeTest_appId";
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/common/HdfsStorage.java 
b/storage/src/main/java/org/apache/uniffle/storage/common/HdfsStorage.java
index 55d0898b..46ae771f 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/HdfsStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/HdfsStorage.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.uniffle.storage.handler.api.ServerReadHandler;
 import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
 import org.apache.uniffle.storage.handler.impl.HdfsShuffleWriteHandler;
+import org.apache.uniffle.storage.handler.impl.PooledHdfsShuffleWriteHandler;
 import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest;
 import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest;
 
@@ -99,16 +100,30 @@ public class HdfsStorage extends AbstractStorage {
   ShuffleWriteHandler newWriteHandler(CreateShuffleWriteHandlerRequest 
request) {
     try {
       String user = request.getUser();
-      return new HdfsShuffleWriteHandler(
-          request.getAppId(),
-          request.getShuffleId(),
-          request.getStartPartition(),
-          request.getEndPartition(),
-          storagePath,
-          request.getFileNamePrefix(),
-          conf,
-          user
-      );
+      if (request.getMaxFileNumber() == 1) {
+        return new HdfsShuffleWriteHandler(
+            request.getAppId(),
+            request.getShuffleId(),
+            request.getStartPartition(),
+            request.getEndPartition(),
+            storagePath,
+            request.getFileNamePrefix(),
+            conf,
+            user
+        );
+      } else {
+        return new PooledHdfsShuffleWriteHandler(
+            request.getAppId(),
+            request.getShuffleId(),
+            request.getStartPartition(),
+            request.getEndPartition(),
+            storagePath,
+            request.getFileNamePrefix(),
+            conf,
+            user,
+            request.getMaxFileNumber()
+        );
+      }
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleWriteHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleWriteHandler.java
index 0a127012..b5b242e9 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleWriteHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleWriteHandler.java
@@ -17,7 +17,6 @@
 
 package org.apache.uniffle.storage.handler.api;
 
-import java.io.IOException;
 import java.util.List;
 
 import org.apache.uniffle.common.ShufflePartitionedBlock;
@@ -28,8 +27,7 @@ public interface ShuffleWriteHandler {
    * Write the blocks to storage
    *
    * @param shuffleBlocks blocks to storage
-   * @throws IOException
-   * @throws IllegalStateException
+   * @throws Exception
    */
-  void write(List<ShufflePartitionedBlock> shuffleBlocks) throws IOException, 
IllegalStateException;
+  void write(List<ShufflePartitionedBlock> shuffleBlocks) throws Exception;
 }
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleWriteHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleWriteHandler.java
index 36240cae..8ac30e13 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleWriteHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleWriteHandler.java
@@ -102,7 +102,7 @@ public class HdfsShuffleWriteHandler implements 
ShuffleWriteHandler {
 
   @Override
   public void write(
-      List<ShufflePartitionedBlock> shuffleBlocks) throws IOException, 
IllegalStateException {
+      List<ShufflePartitionedBlock> shuffleBlocks) throws Exception {
     final long start = System.currentTimeMillis();
     writeLock.lock();
     try {
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
index 1b24e7d0..881982d2 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
@@ -87,7 +87,7 @@ public class LocalFileWriteHandler implements 
ShuffleWriteHandler {
 
   @Override
   public synchronized void write(
-      List<ShufflePartitionedBlock> shuffleBlocks) throws IOException, 
IllegalStateException {
+      List<ShufflePartitionedBlock> shuffleBlocks) throws Exception {
 
     // Ignore this write, if the shuffle directory is deleted after being 
uploaded in multi mode
     // or after its app heartbeat times out.
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java
new file mode 100644
index 00000000..e6ee9ea8
--- /dev/null
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.handler.impl;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
+import org.apache.uniffle.storage.util.ShuffleStorageUtils;
+
+public class PooledHdfsShuffleWriteHandler implements ShuffleWriteHandler {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PooledHdfsShuffleWriteHandler.class);
+
+  private final BlockingQueue<HdfsShuffleWriteHandler> queue;
+  private final int maxConcurrency;
+  private final String basePath;
+
+  public PooledHdfsShuffleWriteHandler(
+      String appId,
+      int shuffleId,
+      int startPartition,
+      int endPartition,
+      String storageBasePath,
+      String fileNamePrefix,
+      Configuration hadoopConf,
+      String user,
+      int concurrency) {
+    // todo: support max concurrency specified by client side
+    this.maxConcurrency = concurrency;
+    this.queue = new LinkedBlockingQueue<>(maxConcurrency);
+    this.basePath = 
ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePath,
+        ShuffleStorageUtils.getShuffleDataPath(appId, shuffleId, 
startPartition, endPartition));
+
+    // todo: support init lazily
+    try {
+      for (int i = 0; i < maxConcurrency; i++) {
+        queue.offer(
+            new HdfsShuffleWriteHandler(
+                appId,
+                shuffleId,
+                startPartition,
+                endPartition,
+                storageBasePath,
+                fileNamePrefix + "_" + i,
+                hadoopConf,
+                user
+            )
+        );
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Errors on initializing Hdfs writer 
handler.", e);
+    }
+  }
+
+  @Override
+  public void write(List<ShufflePartitionedBlock> shuffleBlocks) throws 
Exception {
+    if (queue.isEmpty()) {
+      LOGGER.warn("No free hdfs writer handler, it will wait. storage path: 
{}", basePath);
+    }
+    HdfsShuffleWriteHandler writeHandler = queue.take();
+    try {
+      writeHandler.write(shuffleBlocks);
+    } finally {
+      queue.offer(writeHandler);
+    }
+  }
+}
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleWriteHandlerRequest.java
 
b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleWriteHandlerRequest.java
index 2328f7a6..6855e907 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleWriteHandlerRequest.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleWriteHandlerRequest.java
@@ -31,6 +31,7 @@ public class CreateShuffleWriteHandlerRequest {
   private Configuration conf;
   private int storageDataReplica;
   private String user;
+  private int maxFileNumber;
 
   public CreateShuffleWriteHandlerRequest(
       String storageType,
@@ -43,6 +44,33 @@ public class CreateShuffleWriteHandlerRequest {
       Configuration conf,
       int storageDataReplica,
       String user) {
+    this(
+        storageType,
+        appId,
+        shuffleId,
+        startPartition,
+        endPartition,
+        storageBasePaths,
+        fileNamePrefix,
+        conf,
+        storageDataReplica,
+        user,
+        1
+    );
+  }
+
+  public CreateShuffleWriteHandlerRequest(
+      String storageType,
+      String appId,
+      int shuffleId,
+      int startPartition,
+      int endPartition,
+      String[] storageBasePaths,
+      String fileNamePrefix,
+      Configuration conf,
+      int storageDataReplica,
+      String user,
+      int maxFileNumber) {
     this.storageType = storageType;
     this.appId = appId;
     this.shuffleId = shuffleId;
@@ -53,6 +81,7 @@ public class CreateShuffleWriteHandlerRequest {
     this.conf = conf;
     this.storageDataReplica = storageDataReplica;
     this.user = user;
+    this.maxFileNumber = maxFileNumber;
   }
 
   public String getStorageType() {
@@ -98,4 +127,8 @@ public class CreateShuffleWriteHandlerRequest {
   public void setUser(String user) {
     this.user = user;
   }
+
+  public int getMaxFileNumber() {
+    return maxFileNumber;
+  }
 }

Reply via email to