This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 88dc1e5ac [#2209] feat(server): Support config buffer size for
LocalFileWriter (#2210)
88dc1e5ac is described below
commit 88dc1e5ac548be639372b4efe9fb0410e2030a20
Author: leewish <[email protected]>
AuthorDate: Mon Oct 21 18:14:21 2024 +0800
[#2209] feat(server): Support config buffer size for LocalFileWriter
(#2210)
### What changes were proposed in this pull request?
Support configuring the buffer size used by LocalFileWriter.
### Why are the changes needed?
Fix: #2209
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested locally.
Co-authored-by: wenlongwlli <[email protected]>
---
.../apache/uniffle/common/config/RssBaseConf.java | 12 ++++++
.../apache/uniffle/server/ShuffleFlushManager.java | 1 +
.../uniffle/storage/common/LocalStorage.java | 1 +
.../handler/impl/LocalFileWriteHandler.java | 43 ++++++++++++++++++++--
.../storage/handler/impl/LocalFileWriter.java | 9 ++++-
.../request/CreateShuffleWriteHandlerRequest.java | 12 ++++++
6 files changed, 73 insertions(+), 5 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
index b2cd4e9d0..4e197204a 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
@@ -294,6 +294,18 @@ public class RssBaseConf extends RssConf {
+ " first combining the username and the password with a
colon (uniffle:uniffle123)"
+ ", and then by encoding the resulting string in base64
(dW5pZmZsZTp1bmlmZmxlMTIz).");
+ public static final ConfigOption<String> RSS_STORAGE_WRITE_DATA_BUFFER_SIZE =
+ ConfigOptions.key("rss.storage.write.dataBufferSize")
+ .stringType()
+ .defaultValue("8k")
+ .withDescription("The buffer size to cache the write data content.");
+
+ public static final ConfigOption<String> RSS_STORAGE_WRITE_INDEX_BUFFER_SIZE
=
+ ConfigOptions.key("rss.storage.write.indexBufferSize")
+ .stringType()
+ .defaultValue("8k")
+ .withDescription("The buffer size to cache the write index
content.");
+
public boolean loadConfFromFile(String fileName, List<ConfigOption<Object>>
configOptions) {
Map<String, String> properties = RssUtils.getPropertiesFromFile(fileName);
if (properties == null) {
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 0d3d5ca0d..574b9ef0a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -158,6 +158,7 @@ public class ShuffleFlushManager {
int maxConcurrencyPerPartitionToWrite =
getMaxConcurrencyPerPartitionWrite(event);
CreateShuffleWriteHandlerRequest request =
new CreateShuffleWriteHandlerRequest(
+ this.shuffleServerConf,
storageType,
event.getAppId(),
event.getShuffleId(),
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
index f2c79ac1d..e748608de 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
@@ -132,6 +132,7 @@ public class LocalStorage extends AbstractStorage {
@Override
ShuffleWriteHandler newWriteHandler(CreateShuffleWriteHandlerRequest
request) {
return new LocalFileWriteHandler(
+ request.getRssBaseConf(),
request.getAppId(),
request.getShuffleId(),
request.getStartPartition(),
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
index 9f9fd0c5d..2b398fd92 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.ByteBufUtils;
import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
@@ -37,24 +38,57 @@ public class LocalFileWriteHandler implements
ShuffleWriteHandler {
private static final Logger LOG =
LoggerFactory.getLogger(LocalFileWriteHandler.class);
+ private final RssBaseConf rssBaseConf;
private String fileNamePrefix;
private String basePath;
+ private final int dataBufferSize;
+ private final int indexBufferSize;
public LocalFileWriteHandler(
+ RssBaseConf rssBaseConf,
String appId,
int shuffleId,
int startPartition,
int endPartition,
String storageBasePath,
String fileNamePrefix) {
+ this.rssBaseConf = rssBaseConf;
this.fileNamePrefix = fileNamePrefix;
this.basePath =
ShuffleStorageUtils.getFullShuffleDataFolder(
storageBasePath,
ShuffleStorageUtils.getShuffleDataPath(appId, shuffleId,
startPartition, endPartition));
+ this.dataBufferSize =
+ (int)
+ this.rssBaseConf.getSizeAsBytes(
+ RssBaseConf.RSS_STORAGE_WRITE_DATA_BUFFER_SIZE.key(),
+ RssBaseConf.RSS_STORAGE_WRITE_DATA_BUFFER_SIZE.defaultValue());
+ this.indexBufferSize =
+ (int)
+ this.rssBaseConf.getSizeAsBytes(
+ RssBaseConf.RSS_STORAGE_WRITE_INDEX_BUFFER_SIZE.key(),
+
RssBaseConf.RSS_STORAGE_WRITE_INDEX_BUFFER_SIZE.defaultValue());
createBasePath();
}
+ @VisibleForTesting
+ public LocalFileWriteHandler(
+ String appId,
+ int shuffleId,
+ int startPartition,
+ int endPartition,
+ String storageBasePath,
+ String fileNamePrefix) {
+ this(
+ new RssBaseConf(),
+ appId,
+ shuffleId,
+ startPartition,
+ endPartition,
+ storageBasePath,
+ fileNamePrefix);
+ }
+
private void createBasePath() {
File baseFolder = new File(basePath);
if (baseFolder.isDirectory()) {
@@ -96,8 +130,8 @@ public class LocalFileWriteHandler implements
ShuffleWriteHandler {
String dataFileName =
ShuffleStorageUtils.generateDataFileName(fileNamePrefix);
String indexFileName =
ShuffleStorageUtils.generateIndexFileName(fileNamePrefix);
- try (LocalFileWriter dataWriter = createWriter(dataFileName);
- LocalFileWriter indexWriter = createWriter(indexFileName)) {
+ try (LocalFileWriter dataWriter = createWriter(dataFileName,
dataBufferSize);
+ LocalFileWriter indexWriter = createWriter(indexFileName,
indexBufferSize); ) {
long startTime = System.currentTimeMillis();
for (ShufflePartitionedBlock block : shuffleBlocks) {
@@ -131,9 +165,10 @@ public class LocalFileWriteHandler implements
ShuffleWriteHandler {
}
}
- private LocalFileWriter createWriter(String fileName) throws IOException,
IllegalStateException {
+ private LocalFileWriter createWriter(String fileName, int bufferSize)
+ throws IOException, IllegalStateException {
File file = new File(basePath, fileName);
- return new LocalFileWriter(file);
+ return new LocalFileWriter(file, bufferSize);
}
@VisibleForTesting
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriter.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriter.java
index 01c188f3f..5d5aae7b9 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriter.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriter.java
@@ -24,6 +24,8 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.uniffle.storage.api.FileWriter;
import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
@@ -33,10 +35,15 @@ public class LocalFileWriter implements FileWriter,
Closeable {
private FileOutputStream fileOutputStream;
private long nextOffset;
+ @VisibleForTesting
public LocalFileWriter(File file) throws IOException {
+ this(file, 8 * 1024);
+ }
+
+ public LocalFileWriter(File file, int bufferSize) throws IOException {
fileOutputStream = new FileOutputStream(file, true);
// init fsDataOutputStream
- dataOutputStream = new DataOutputStream(new
BufferedOutputStream(fileOutputStream));
+ dataOutputStream = new DataOutputStream(new
BufferedOutputStream(fileOutputStream, bufferSize));
nextOffset = file.length();
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleWriteHandlerRequest.java
b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleWriteHandlerRequest.java
index 0d9c21f49..7ed7e6f50 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleWriteHandlerRequest.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleWriteHandlerRequest.java
@@ -17,10 +17,14 @@
package org.apache.uniffle.storage.request;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
+import org.apache.uniffle.common.config.RssBaseConf;
+
public class CreateShuffleWriteHandlerRequest {
+ private RssBaseConf rssBaseConf;
private String storageType;
private String appId;
private int shuffleId;
@@ -33,6 +37,7 @@ public class CreateShuffleWriteHandlerRequest {
private String user;
private int maxFileNumber;
+ @VisibleForTesting
public CreateShuffleWriteHandlerRequest(
String storageType,
String appId,
@@ -45,6 +50,7 @@ public class CreateShuffleWriteHandlerRequest {
int storageDataReplica,
String user) {
this(
+ new RssBaseConf(),
storageType,
appId,
shuffleId,
@@ -59,6 +65,7 @@ public class CreateShuffleWriteHandlerRequest {
}
public CreateShuffleWriteHandlerRequest(
+ RssBaseConf rssBaseConf,
String storageType,
String appId,
int shuffleId,
@@ -70,6 +77,7 @@ public class CreateShuffleWriteHandlerRequest {
int storageDataReplica,
String user,
int maxFileNumber) {
+ this.rssBaseConf = rssBaseConf;
this.storageType = storageType;
this.appId = appId;
this.shuffleId = shuffleId;
@@ -83,6 +91,10 @@ public class CreateShuffleWriteHandlerRequest {
this.maxFileNumber = maxFileNumber;
}
+ public RssBaseConf getRssBaseConf() {
+ return rssBaseConf;
+ }
+
public String getStorageType() {
return storageType;
}