This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 88dc1e5ac  [#2209] feat(server): Support config buffer size for 
LocalFileWriter (#2210)
88dc1e5ac is described below

commit 88dc1e5ac548be639372b4efe9fb0410e2030a20
Author: leewish <[email protected]>
AuthorDate: Mon Oct 21 18:14:21 2024 +0800

     [#2209] feat(server): Support config buffer size for LocalFileWriter 
(#2210)
    
    ### What changes were proposed in this pull request?
    
    Support configuring the buffer size for LocalFileWriter.
    
    ### Why are the changes needed?
    
    Fix: #2209
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Tested locally.
    
    Co-authored-by: wenlongwlli <[email protected]>
---
 .../apache/uniffle/common/config/RssBaseConf.java  | 12 ++++++
 .../apache/uniffle/server/ShuffleFlushManager.java |  1 +
 .../uniffle/storage/common/LocalStorage.java       |  1 +
 .../handler/impl/LocalFileWriteHandler.java        | 43 ++++++++++++++++++++--
 .../storage/handler/impl/LocalFileWriter.java      |  9 ++++-
 .../request/CreateShuffleWriteHandlerRequest.java  | 12 ++++++
 6 files changed, 73 insertions(+), 5 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
index b2cd4e9d0..4e197204a 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
@@ -294,6 +294,18 @@ public class RssBaseConf extends RssConf {
                   + " first combining the username and the password with a 
colon (uniffle:uniffle123)"
                   + ", and then by encoding the resulting string in base64 
(dW5pZmZsZTp1bmlmZmxlMTIz).");
 
+  public static final ConfigOption<String> RSS_STORAGE_WRITE_DATA_BUFFER_SIZE =
+      ConfigOptions.key("rss.storage.write.dataBufferSize")
+          .stringType()
+          .defaultValue("8k")
+          .withDescription("The buffer size to cache the write data content.");
+
+  public static final ConfigOption<String> RSS_STORAGE_WRITE_INDEX_BUFFER_SIZE 
=
+      ConfigOptions.key("rss.storage.write.indexBufferSize")
+          .stringType()
+          .defaultValue("8k")
+          .withDescription("The buffer size to cache the write index 
content.");
+
   public boolean loadConfFromFile(String fileName, List<ConfigOption<Object>> 
configOptions) {
     Map<String, String> properties = RssUtils.getPropertiesFromFile(fileName);
     if (properties == null) {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 0d3d5ca0d..574b9ef0a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -158,6 +158,7 @@ public class ShuffleFlushManager {
       int maxConcurrencyPerPartitionToWrite = 
getMaxConcurrencyPerPartitionWrite(event);
       CreateShuffleWriteHandlerRequest request =
           new CreateShuffleWriteHandlerRequest(
+              this.shuffleServerConf,
               storageType,
               event.getAppId(),
               event.getShuffleId(),
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java 
b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
index f2c79ac1d..e748608de 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
@@ -132,6 +132,7 @@ public class LocalStorage extends AbstractStorage {
   @Override
   ShuffleWriteHandler newWriteHandler(CreateShuffleWriteHandlerRequest 
request) {
     return new LocalFileWriteHandler(
+        request.getRssBaseConf(),
         request.getAppId(),
         request.getShuffleId(),
         request.getStartPartition(),
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
index 9f9fd0c5d..2b398fd92 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.ByteBufUtils;
 import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
@@ -37,24 +38,57 @@ public class LocalFileWriteHandler implements 
ShuffleWriteHandler {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(LocalFileWriteHandler.class);
 
+  private final RssBaseConf rssBaseConf;
   private String fileNamePrefix;
   private String basePath;
+  private final int dataBufferSize;
+  private final int indexBufferSize;
 
   public LocalFileWriteHandler(
+      RssBaseConf rssBaseConf,
       String appId,
       int shuffleId,
       int startPartition,
       int endPartition,
       String storageBasePath,
       String fileNamePrefix) {
+    this.rssBaseConf = rssBaseConf;
     this.fileNamePrefix = fileNamePrefix;
     this.basePath =
         ShuffleStorageUtils.getFullShuffleDataFolder(
             storageBasePath,
             ShuffleStorageUtils.getShuffleDataPath(appId, shuffleId, 
startPartition, endPartition));
+    this.dataBufferSize =
+        (int)
+            this.rssBaseConf.getSizeAsBytes(
+                RssBaseConf.RSS_STORAGE_WRITE_DATA_BUFFER_SIZE.key(),
+                RssBaseConf.RSS_STORAGE_WRITE_DATA_BUFFER_SIZE.defaultValue());
+    this.indexBufferSize =
+        (int)
+            this.rssBaseConf.getSizeAsBytes(
+                RssBaseConf.RSS_STORAGE_WRITE_INDEX_BUFFER_SIZE.key(),
+                
RssBaseConf.RSS_STORAGE_WRITE_INDEX_BUFFER_SIZE.defaultValue());
     createBasePath();
   }
 
+  @VisibleForTesting
+  public LocalFileWriteHandler(
+      String appId,
+      int shuffleId,
+      int startPartition,
+      int endPartition,
+      String storageBasePath,
+      String fileNamePrefix) {
+    this(
+        new RssBaseConf(),
+        appId,
+        shuffleId,
+        startPartition,
+        endPartition,
+        storageBasePath,
+        fileNamePrefix);
+  }
+
   private void createBasePath() {
     File baseFolder = new File(basePath);
     if (baseFolder.isDirectory()) {
@@ -96,8 +130,8 @@ public class LocalFileWriteHandler implements 
ShuffleWriteHandler {
     String dataFileName = 
ShuffleStorageUtils.generateDataFileName(fileNamePrefix);
     String indexFileName = 
ShuffleStorageUtils.generateIndexFileName(fileNamePrefix);
 
-    try (LocalFileWriter dataWriter = createWriter(dataFileName);
-        LocalFileWriter indexWriter = createWriter(indexFileName)) {
+    try (LocalFileWriter dataWriter = createWriter(dataFileName, 
dataBufferSize);
+        LocalFileWriter indexWriter = createWriter(indexFileName, 
indexBufferSize); ) {
 
       long startTime = System.currentTimeMillis();
       for (ShufflePartitionedBlock block : shuffleBlocks) {
@@ -131,9 +165,10 @@ public class LocalFileWriteHandler implements 
ShuffleWriteHandler {
     }
   }
 
-  private LocalFileWriter createWriter(String fileName) throws IOException, 
IllegalStateException {
+  private LocalFileWriter createWriter(String fileName, int bufferSize)
+      throws IOException, IllegalStateException {
     File file = new File(basePath, fileName);
-    return new LocalFileWriter(file);
+    return new LocalFileWriter(file, bufferSize);
   }
 
   @VisibleForTesting
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriter.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriter.java
index 01c188f3f..5d5aae7b9 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriter.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriter.java
@@ -24,6 +24,8 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.uniffle.storage.api.FileWriter;
 import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
 
@@ -33,10 +35,15 @@ public class LocalFileWriter implements FileWriter, 
Closeable {
   private FileOutputStream fileOutputStream;
   private long nextOffset;
 
+  @VisibleForTesting
   public LocalFileWriter(File file) throws IOException {
+    this(file, 8 * 1024);
+  }
+
+  public LocalFileWriter(File file, int bufferSize) throws IOException {
     fileOutputStream = new FileOutputStream(file, true);
     // init fsDataOutputStream
-    dataOutputStream = new DataOutputStream(new 
BufferedOutputStream(fileOutputStream));
+    dataOutputStream = new DataOutputStream(new 
BufferedOutputStream(fileOutputStream, bufferSize));
     nextOffset = file.length();
   }
 
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleWriteHandlerRequest.java
 
b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleWriteHandlerRequest.java
index 0d9c21f49..7ed7e6f50 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleWriteHandlerRequest.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleWriteHandlerRequest.java
@@ -17,10 +17,14 @@
 
 package org.apache.uniffle.storage.request;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 
+import org.apache.uniffle.common.config.RssBaseConf;
+
 public class CreateShuffleWriteHandlerRequest {
 
+  private RssBaseConf rssBaseConf;
   private String storageType;
   private String appId;
   private int shuffleId;
@@ -33,6 +37,7 @@ public class CreateShuffleWriteHandlerRequest {
   private String user;
   private int maxFileNumber;
 
+  @VisibleForTesting
   public CreateShuffleWriteHandlerRequest(
       String storageType,
       String appId,
@@ -45,6 +50,7 @@ public class CreateShuffleWriteHandlerRequest {
       int storageDataReplica,
       String user) {
     this(
+        new RssBaseConf(),
         storageType,
         appId,
         shuffleId,
@@ -59,6 +65,7 @@ public class CreateShuffleWriteHandlerRequest {
   }
 
   public CreateShuffleWriteHandlerRequest(
+      RssBaseConf rssBaseConf,
       String storageType,
       String appId,
       int shuffleId,
@@ -70,6 +77,7 @@ public class CreateShuffleWriteHandlerRequest {
       int storageDataReplica,
       String user,
       int maxFileNumber) {
+    this.rssBaseConf = rssBaseConf;
     this.storageType = storageType;
     this.appId = appId;
     this.shuffleId = shuffleId;
@@ -83,6 +91,10 @@ public class CreateShuffleWriteHandlerRequest {
     this.maxFileNumber = maxFileNumber;
   }
 
+  public RssBaseConf getRssBaseConf() {
+    return rssBaseConf;
+  }
+
   public String getStorageType() {
     return storageType;
   }

Reply via email to