This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b70eb4de feat(server) Support config buffer size for 
HadoopShuffleWriteHandler (#2216)
2b70eb4de is described below

commit 2b70eb4de01464aee79fe96c5bc734b97b5cbcf5
Author: leewish <[email protected]>
AuthorDate: Tue Oct 22 19:19:55 2024 +0800

    feat(server) Support config buffer size for HadoopShuffleWriteHandler 
(#2216)
    
    ### What changes were proposed in this pull request?
    
    Support configuring the buffer size for HadoopShuffleWriteHandler
    
    ### Why are the changes needed?
    
    Fix: #2209
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Tested locally.
    
    Co-authored-by: wenlongwlli <[email protected]>
---
 .../apache/uniffle/common/config/RssBaseConf.java  | 24 ++++++--
 .../uniffle/storage/common/HadoopStorage.java      |  2 +
 .../storage/handler/impl/HadoopFileWriter.java     | 16 ++++--
 .../handler/impl/HadoopShuffleWriteHandler.java    | 67 +++++++++++++++++++---
 .../handler/impl/LocalFileWriteHandler.java        |  8 +--
 .../impl/PooledHadoopShuffleWriteHandler.java      |  3 +
 6 files changed, 97 insertions(+), 23 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
index 4e197204a..cba44578a 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
@@ -294,17 +294,29 @@ public class RssBaseConf extends RssConf {
                   + " first combining the username and the password with a 
colon (uniffle:uniffle123)"
                   + ", and then by encoding the resulting string in base64 
(dW5pZmZsZTp1bmlmZmxlMTIz).");
 
-  public static final ConfigOption<String> RSS_STORAGE_WRITE_DATA_BUFFER_SIZE =
-      ConfigOptions.key("rss.storage.write.dataBufferSize")
+  public static final ConfigOption<String> 
RSS_STORAGE_LOCALFILE_WRITE_DATA_BUFFER_SIZE =
+      ConfigOptions.key("rss.storage.localfile.write.dataBufferSize")
           .stringType()
           .defaultValue("8k")
-          .withDescription("The buffer size to cache the write data content.");
+          .withDescription("The buffer size to cache the write data content 
for LOCALFILE.");
 
-  public static final ConfigOption<String> RSS_STORAGE_WRITE_INDEX_BUFFER_SIZE 
=
-      ConfigOptions.key("rss.storage.write.indexBufferSize")
+  public static final ConfigOption<String> 
RSS_STORAGE_LOCALFILE_WRITE_INDEX_BUFFER_SIZE =
+      ConfigOptions.key("rss.storage.localfile.write.indexBufferSize")
           .stringType()
           .defaultValue("8k")
-          .withDescription("The buffer size to cache the write index 
content.");
+          .withDescription("The buffer size to cache the write index content 
for LOCALFILE.");
+
+  public static final ConfigOption<String> 
RSS_STORAGE_HDFS_WRITE_DATA_BUFFER_SIZE =
+      ConfigOptions.key("rss.storage.hdfs.write.dataBufferSize")
+          .stringType()
+          .defaultValue("8k")
+          .withDescription("The buffer size to cache the write data content 
for HDFS.");
+
+  public static final ConfigOption<String> 
RSS_STORAGE_HDFS_WRITE_INDEX_BUFFER_SIZE =
+      ConfigOptions.key("rss.storage.hdfs.write.indexBufferSize")
+          .stringType()
+          .defaultValue("8k")
+          .withDescription("The buffer size to cache the write index content 
for HDFS.");
 
   public boolean loadConfFromFile(String fileName, List<ConfigOption<Object>> 
configOptions) {
     Map<String, String> properties = RssUtils.getPropertiesFromFile(fileName);
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/common/HadoopStorage.java 
b/storage/src/main/java/org/apache/uniffle/storage/common/HadoopStorage.java
index ab5186288..7665beac2 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/HadoopStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/HadoopStorage.java
@@ -83,6 +83,7 @@ public class HadoopStorage extends AbstractStorage {
       String user = request.getUser();
       if (request.getMaxFileNumber() == 1) {
         return new HadoopShuffleWriteHandler(
+            request.getRssBaseConf(),
             request.getAppId(),
             request.getShuffleId(),
             request.getStartPartition(),
@@ -93,6 +94,7 @@ public class HadoopStorage extends AbstractStorage {
             user);
       } else {
         return new PooledHadoopShuffleWriteHandler(
+            request.getRssBaseConf(),
             request.getAppId(),
             request.getShuffleId(),
             request.getStartPartition(),
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopFileWriter.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopFileWriter.java
index e6074f09d..d946f091c 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopFileWriter.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopFileWriter.java
@@ -22,6 +22,7 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -44,19 +45,26 @@ public class HadoopFileWriter implements FileWriter, 
Closeable {
   private FSDataOutputStream fsDataOutputStream;
   private long nextOffset;
 
+  @VisibleForTesting
   public HadoopFileWriter(FileSystem fileSystem, Path path, Configuration 
hadoopConf)
       throws IOException {
+    this(fileSystem, path, hadoopConf, 8 * 1024);
+  }
+
+  public HadoopFileWriter(
+      FileSystem fileSystem, Path path, Configuration hadoopConf, int 
bufferSize)
+      throws IOException {
     this.path = path;
     this.hadoopConf = hadoopConf;
     this.fileSystem = fileSystem;
-    initStream();
+    initStream(bufferSize);
   }
 
-  private void initStream() throws IOException, IllegalStateException {
+  private void initStream(int bufferSize) throws IOException, 
IllegalStateException {
     final FileSystem writerFs = fileSystem;
     if (writerFs.isFile(path)) {
       if (hadoopConf.getBoolean("dfs.support.append", true)) {
-        fsDataOutputStream = writerFs.append(path);
+        fsDataOutputStream = writerFs.append(path, bufferSize);
         nextOffset = fsDataOutputStream.getPos();
       } else {
         String msg = path + " exists but append mode is not support!";
@@ -68,7 +76,7 @@ public class HadoopFileWriter implements FileWriter, 
Closeable {
       LOG.error(msg);
       throw new IllegalStateException(msg);
     } else {
-      fsDataOutputStream = writerFs.create(path);
+      fsDataOutputStream = writerFs.create(path, true, bufferSize);
       nextOffset = fsDataOutputStream.getPos();
     }
   }
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
index b5deb78b1..54f44bc98 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
 import org.apache.uniffle.common.util.ByteBufUtils;
@@ -41,6 +42,7 @@ public class HadoopShuffleWriteHandler implements 
ShuffleWriteHandler {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HadoopShuffleWriteHandler.class);
 
+  private RssBaseConf rssBaseConf;
   private Configuration hadoopConf;
   private String basePath;
   private String fileNamePrefix;
@@ -48,6 +50,8 @@ public class HadoopShuffleWriteHandler implements 
ShuffleWriteHandler {
   private int failTimes = 0;
   private String user;
   private FileSystem fileSystem;
+  private final int dataBufferSize;
+  private final int indexBufferSize;
 
   // Only for test cases when using non-kerberized dfs cluster.
   @VisibleForTesting
@@ -60,16 +64,43 @@ public class HadoopShuffleWriteHandler implements 
ShuffleWriteHandler {
       String fileNamePrefix,
       Configuration hadoopConf)
       throws Exception {
-    this.hadoopConf = hadoopConf;
-    this.fileNamePrefix = fileNamePrefix;
-    this.basePath =
-        ShuffleStorageUtils.getFullShuffleDataFolder(
-            storageBasePath,
-            ShuffleStorageUtils.getShuffleDataPath(appId, shuffleId, 
startPartition, endPartition));
-    initialize();
+    this(
+        new RssBaseConf(),
+        appId,
+        shuffleId,
+        startPartition,
+        endPartition,
+        storageBasePath,
+        fileNamePrefix,
+        hadoopConf,
+        "");
+  }
+
+  @VisibleForTesting
+  public HadoopShuffleWriteHandler(
+      String appId,
+      int shuffleId,
+      int startPartition,
+      int endPartition,
+      String storageBasePath,
+      String fileNamePrefix,
+      Configuration hadoopConf,
+      String user)
+      throws Exception {
+    this(
+        new RssBaseConf(),
+        appId,
+        shuffleId,
+        startPartition,
+        endPartition,
+        storageBasePath,
+        fileNamePrefix,
+        hadoopConf,
+        user);
   }
 
   public HadoopShuffleWriteHandler(
+      RssBaseConf rssBaseConf,
       String appId,
       int shuffleId,
       int startPartition,
@@ -79,6 +110,7 @@ public class HadoopShuffleWriteHandler implements 
ShuffleWriteHandler {
       Configuration hadoopConf,
       String user)
       throws Exception {
+    this.rssBaseConf = rssBaseConf;
     this.hadoopConf = hadoopConf;
     this.fileNamePrefix = fileNamePrefix;
     this.basePath =
@@ -86,6 +118,16 @@ public class HadoopShuffleWriteHandler implements 
ShuffleWriteHandler {
             storageBasePath,
             ShuffleStorageUtils.getShuffleDataPath(appId, shuffleId, 
startPartition, endPartition));
     this.user = user;
+    this.dataBufferSize =
+        (int)
+            this.rssBaseConf.getSizeAsBytes(
+                RssBaseConf.RSS_STORAGE_HDFS_WRITE_DATA_BUFFER_SIZE.key(),
+                
RssBaseConf.RSS_STORAGE_HDFS_WRITE_DATA_BUFFER_SIZE.defaultValue());
+    this.indexBufferSize =
+        (int)
+            this.rssBaseConf.getSizeAsBytes(
+                RssBaseConf.RSS_STORAGE_HDFS_WRITE_INDEX_BUFFER_SIZE.key(),
+                
RssBaseConf.RSS_STORAGE_HDFS_WRITE_INDEX_BUFFER_SIZE.defaultValue());
     initialize();
   }
 
@@ -120,8 +162,8 @@ public class HadoopShuffleWriteHandler implements 
ShuffleWriteHandler {
           ShuffleStorageUtils.generateDataFileName(fileNamePrefix + "_" + 
failTimes);
       String indexFileName =
           ShuffleStorageUtils.generateIndexFileName(fileNamePrefix + "_" + 
failTimes);
-      try (HadoopFileWriter dataWriter = createWriter(dataFileName);
-          HadoopFileWriter indexWriter = createWriter(indexFileName)) {
+      try (HadoopFileWriter dataWriter = createWriter(dataFileName, 
dataBufferSize);
+          HadoopFileWriter indexWriter = createWriter(indexFileName, 
indexBufferSize)) {
         for (ShufflePartitionedBlock block : shuffleBlocks) {
           long blockId = block.getBlockId();
           long crc = block.getCrc();
@@ -175,6 +217,13 @@ public class HadoopShuffleWriteHandler implements 
ShuffleWriteHandler {
     return writer;
   }
 
+  public HadoopFileWriter createWriter(String fileName, int bufferSize)
+      throws IOException, IllegalStateException {
+    Path path = new Path(basePath, fileName);
+    HadoopFileWriter writer = new HadoopFileWriter(fileSystem, path, 
hadoopConf, bufferSize);
+    return writer;
+  }
+
   @VisibleForTesting
   public void setFailTimes(int failTimes) {
     this.failTimes = failTimes;
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
index 2b398fd92..8b927a7a9 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
@@ -61,13 +61,13 @@ public class LocalFileWriteHandler implements 
ShuffleWriteHandler {
     this.dataBufferSize =
         (int)
             this.rssBaseConf.getSizeAsBytes(
-                RssBaseConf.RSS_STORAGE_WRITE_DATA_BUFFER_SIZE.key(),
-                RssBaseConf.RSS_STORAGE_WRITE_DATA_BUFFER_SIZE.defaultValue());
+                RssBaseConf.RSS_STORAGE_LOCALFILE_WRITE_DATA_BUFFER_SIZE.key(),
+                
RssBaseConf.RSS_STORAGE_LOCALFILE_WRITE_DATA_BUFFER_SIZE.defaultValue());
     this.indexBufferSize =
         (int)
             this.rssBaseConf.getSizeAsBytes(
-                RssBaseConf.RSS_STORAGE_WRITE_INDEX_BUFFER_SIZE.key(),
-                
RssBaseConf.RSS_STORAGE_WRITE_INDEX_BUFFER_SIZE.defaultValue());
+                
RssBaseConf.RSS_STORAGE_LOCALFILE_WRITE_INDEX_BUFFER_SIZE.key(),
+                
RssBaseConf.RSS_STORAGE_LOCALFILE_WRITE_INDEX_BUFFER_SIZE.defaultValue());
     createBasePath();
   }
 
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java
index f47bd2bce..f46ebfdb2 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
 import org.apache.uniffle.storage.util.ShuffleStorageUtils;
@@ -70,6 +71,7 @@ public class PooledHadoopShuffleWriteHandler implements 
ShuffleWriteHandler {
   }
 
   public PooledHadoopShuffleWriteHandler(
+      RssBaseConf rssBaseConf,
       String appId,
       int shuffleId,
       int startPartition,
@@ -90,6 +92,7 @@ public class PooledHadoopShuffleWriteHandler implements 
ShuffleWriteHandler {
         index -> {
           try {
             return new HadoopShuffleWriteHandler(
+                rssBaseConf,
                 appId,
                 shuffleId,
                 startPartition,

Reply via email to