This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 2b70eb4de feat(server) Support config buffer size for
HadoopShuffleWriteHandler (#2216)
2b70eb4de is described below
commit 2b70eb4de01464aee79fe96c5bc734b97b5cbcf5
Author: leewish <[email protected]>
AuthorDate: Tue Oct 22 19:19:55 2024 +0800
feat(server) Support config buffer size for HadoopShuffleWriteHandler
(#2216)
### What changes were proposed in this pull request?
Support config buffer size for HadoopShuffleWriteHandler
### Why are the changes needed?
Fix: #2209
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested locally.
Co-authored-by: wenlongwlli <[email protected]>
---
.../apache/uniffle/common/config/RssBaseConf.java | 24 ++++++--
.../uniffle/storage/common/HadoopStorage.java | 2 +
.../storage/handler/impl/HadoopFileWriter.java | 16 ++++--
.../handler/impl/HadoopShuffleWriteHandler.java | 67 +++++++++++++++++++---
.../handler/impl/LocalFileWriteHandler.java | 8 +--
.../impl/PooledHadoopShuffleWriteHandler.java | 3 +
6 files changed, 97 insertions(+), 23 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
index 4e197204a..cba44578a 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
@@ -294,17 +294,29 @@ public class RssBaseConf extends RssConf {
+ " first combining the username and the password with a
colon (uniffle:uniffle123)"
+ ", and then by encoding the resulting string in base64
(dW5pZmZsZTp1bmlmZmxlMTIz).");
- public static final ConfigOption<String> RSS_STORAGE_WRITE_DATA_BUFFER_SIZE =
- ConfigOptions.key("rss.storage.write.dataBufferSize")
+ public static final ConfigOption<String>
RSS_STORAGE_LOCALFILE_WRITE_DATA_BUFFER_SIZE =
+ ConfigOptions.key("rss.storage.localfile.write.dataBufferSize")
.stringType()
.defaultValue("8k")
- .withDescription("The buffer size to cache the write data content.");
+ .withDescription("The buffer size to cache the write data content
for LOCALFILE.");
- public static final ConfigOption<String> RSS_STORAGE_WRITE_INDEX_BUFFER_SIZE
=
- ConfigOptions.key("rss.storage.write.indexBufferSize")
+ public static final ConfigOption<String>
RSS_STORAGE_LOCALFILE_WRITE_INDEX_BUFFER_SIZE =
+ ConfigOptions.key("rss.storage.localfile.write.indexBufferSize")
.stringType()
.defaultValue("8k")
- .withDescription("The buffer size to cache the write index
content.");
+ .withDescription("The buffer size to cache the write index content
for LOCALFILE.");
+
+ public static final ConfigOption<String>
RSS_STORAGE_HDFS_WRITE_DATA_BUFFER_SIZE =
+ ConfigOptions.key("rss.storage.hdfs.write.dataBufferSize")
+ .stringType()
+ .defaultValue("8k")
+ .withDescription("The buffer size to cache the write data content
for HDFS.");
+
+ public static final ConfigOption<String>
RSS_STORAGE_HDFS_WRITE_INDEX_BUFFER_SIZE =
+ ConfigOptions.key("rss.storage.hdfs.write.indexBufferSize")
+ .stringType()
+ .defaultValue("8k")
+ .withDescription("The buffer size to cache the write index content
for HDFS.");
public boolean loadConfFromFile(String fileName, List<ConfigOption<Object>>
configOptions) {
Map<String, String> properties = RssUtils.getPropertiesFromFile(fileName);
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/common/HadoopStorage.java
b/storage/src/main/java/org/apache/uniffle/storage/common/HadoopStorage.java
index ab5186288..7665beac2 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/HadoopStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/HadoopStorage.java
@@ -83,6 +83,7 @@ public class HadoopStorage extends AbstractStorage {
String user = request.getUser();
if (request.getMaxFileNumber() == 1) {
return new HadoopShuffleWriteHandler(
+ request.getRssBaseConf(),
request.getAppId(),
request.getShuffleId(),
request.getStartPartition(),
@@ -93,6 +94,7 @@ public class HadoopStorage extends AbstractStorage {
user);
} else {
return new PooledHadoopShuffleWriteHandler(
+ request.getRssBaseConf(),
request.getAppId(),
request.getShuffleId(),
request.getStartPartition(),
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopFileWriter.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopFileWriter.java
index e6074f09d..d946f091c 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopFileWriter.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopFileWriter.java
@@ -22,6 +22,7 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -44,19 +45,26 @@ public class HadoopFileWriter implements FileWriter,
Closeable {
private FSDataOutputStream fsDataOutputStream;
private long nextOffset;
+ @VisibleForTesting
public HadoopFileWriter(FileSystem fileSystem, Path path, Configuration
hadoopConf)
throws IOException {
+ this(fileSystem, path, hadoopConf, 8 * 1024);
+ }
+
+ public HadoopFileWriter(
+ FileSystem fileSystem, Path path, Configuration hadoopConf, int
bufferSize)
+ throws IOException {
this.path = path;
this.hadoopConf = hadoopConf;
this.fileSystem = fileSystem;
- initStream();
+ initStream(bufferSize);
}
- private void initStream() throws IOException, IllegalStateException {
+ private void initStream(int bufferSize) throws IOException,
IllegalStateException {
final FileSystem writerFs = fileSystem;
if (writerFs.isFile(path)) {
if (hadoopConf.getBoolean("dfs.support.append", true)) {
- fsDataOutputStream = writerFs.append(path);
+ fsDataOutputStream = writerFs.append(path, bufferSize);
nextOffset = fsDataOutputStream.getPos();
} else {
String msg = path + " exists but append mode is not support!";
@@ -68,7 +76,7 @@ public class HadoopFileWriter implements FileWriter,
Closeable {
LOG.error(msg);
throw new IllegalStateException(msg);
} else {
- fsDataOutputStream = writerFs.create(path);
+ fsDataOutputStream = writerFs.create(path, true, bufferSize);
nextOffset = fsDataOutputStream.getPos();
}
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
index b5deb78b1..54f44bc98 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleWriteHandler.java
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
import org.apache.uniffle.common.util.ByteBufUtils;
@@ -41,6 +42,7 @@ public class HadoopShuffleWriteHandler implements
ShuffleWriteHandler {
private static final Logger LOG =
LoggerFactory.getLogger(HadoopShuffleWriteHandler.class);
+ private RssBaseConf rssBaseConf;
private Configuration hadoopConf;
private String basePath;
private String fileNamePrefix;
@@ -48,6 +50,8 @@ public class HadoopShuffleWriteHandler implements
ShuffleWriteHandler {
private int failTimes = 0;
private String user;
private FileSystem fileSystem;
+ private final int dataBufferSize;
+ private final int indexBufferSize;
// Only for test cases when using non-kerberized dfs cluster.
@VisibleForTesting
@@ -60,16 +64,43 @@ public class HadoopShuffleWriteHandler implements
ShuffleWriteHandler {
String fileNamePrefix,
Configuration hadoopConf)
throws Exception {
- this.hadoopConf = hadoopConf;
- this.fileNamePrefix = fileNamePrefix;
- this.basePath =
- ShuffleStorageUtils.getFullShuffleDataFolder(
- storageBasePath,
- ShuffleStorageUtils.getShuffleDataPath(appId, shuffleId,
startPartition, endPartition));
- initialize();
+ this(
+ new RssBaseConf(),
+ appId,
+ shuffleId,
+ startPartition,
+ endPartition,
+ storageBasePath,
+ fileNamePrefix,
+ hadoopConf,
+ "");
+ }
+
+ @VisibleForTesting
+ public HadoopShuffleWriteHandler(
+ String appId,
+ int shuffleId,
+ int startPartition,
+ int endPartition,
+ String storageBasePath,
+ String fileNamePrefix,
+ Configuration hadoopConf,
+ String user)
+ throws Exception {
+ this(
+ new RssBaseConf(),
+ appId,
+ shuffleId,
+ startPartition,
+ endPartition,
+ storageBasePath,
+ fileNamePrefix,
+ hadoopConf,
+ user);
}
public HadoopShuffleWriteHandler(
+ RssBaseConf rssBaseConf,
String appId,
int shuffleId,
int startPartition,
@@ -79,6 +110,7 @@ public class HadoopShuffleWriteHandler implements
ShuffleWriteHandler {
Configuration hadoopConf,
String user)
throws Exception {
+ this.rssBaseConf = rssBaseConf;
this.hadoopConf = hadoopConf;
this.fileNamePrefix = fileNamePrefix;
this.basePath =
@@ -86,6 +118,16 @@ public class HadoopShuffleWriteHandler implements
ShuffleWriteHandler {
storageBasePath,
ShuffleStorageUtils.getShuffleDataPath(appId, shuffleId,
startPartition, endPartition));
this.user = user;
+ this.dataBufferSize =
+ (int)
+ this.rssBaseConf.getSizeAsBytes(
+ RssBaseConf.RSS_STORAGE_HDFS_WRITE_DATA_BUFFER_SIZE.key(),
+
RssBaseConf.RSS_STORAGE_HDFS_WRITE_DATA_BUFFER_SIZE.defaultValue());
+ this.indexBufferSize =
+ (int)
+ this.rssBaseConf.getSizeAsBytes(
+ RssBaseConf.RSS_STORAGE_HDFS_WRITE_INDEX_BUFFER_SIZE.key(),
+
RssBaseConf.RSS_STORAGE_HDFS_WRITE_INDEX_BUFFER_SIZE.defaultValue());
initialize();
}
@@ -120,8 +162,8 @@ public class HadoopShuffleWriteHandler implements
ShuffleWriteHandler {
ShuffleStorageUtils.generateDataFileName(fileNamePrefix + "_" +
failTimes);
String indexFileName =
ShuffleStorageUtils.generateIndexFileName(fileNamePrefix + "_" +
failTimes);
- try (HadoopFileWriter dataWriter = createWriter(dataFileName);
- HadoopFileWriter indexWriter = createWriter(indexFileName)) {
+ try (HadoopFileWriter dataWriter = createWriter(dataFileName,
dataBufferSize);
+ HadoopFileWriter indexWriter = createWriter(indexFileName,
indexBufferSize)) {
for (ShufflePartitionedBlock block : shuffleBlocks) {
long blockId = block.getBlockId();
long crc = block.getCrc();
@@ -175,6 +217,13 @@ public class HadoopShuffleWriteHandler implements
ShuffleWriteHandler {
return writer;
}
+ public HadoopFileWriter createWriter(String fileName, int bufferSize)
+ throws IOException, IllegalStateException {
+ Path path = new Path(basePath, fileName);
+ HadoopFileWriter writer = new HadoopFileWriter(fileSystem, path,
hadoopConf, bufferSize);
+ return writer;
+ }
+
@VisibleForTesting
public void setFailTimes(int failTimes) {
this.failTimes = failTimes;
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
index 2b398fd92..8b927a7a9 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java
@@ -61,13 +61,13 @@ public class LocalFileWriteHandler implements
ShuffleWriteHandler {
this.dataBufferSize =
(int)
this.rssBaseConf.getSizeAsBytes(
- RssBaseConf.RSS_STORAGE_WRITE_DATA_BUFFER_SIZE.key(),
- RssBaseConf.RSS_STORAGE_WRITE_DATA_BUFFER_SIZE.defaultValue());
+ RssBaseConf.RSS_STORAGE_LOCALFILE_WRITE_DATA_BUFFER_SIZE.key(),
+
RssBaseConf.RSS_STORAGE_LOCALFILE_WRITE_DATA_BUFFER_SIZE.defaultValue());
this.indexBufferSize =
(int)
this.rssBaseConf.getSizeAsBytes(
- RssBaseConf.RSS_STORAGE_WRITE_INDEX_BUFFER_SIZE.key(),
-
RssBaseConf.RSS_STORAGE_WRITE_INDEX_BUFFER_SIZE.defaultValue());
+
RssBaseConf.RSS_STORAGE_LOCALFILE_WRITE_INDEX_BUFFER_SIZE.key(),
+
RssBaseConf.RSS_STORAGE_LOCALFILE_WRITE_INDEX_BUFFER_SIZE.defaultValue());
createBasePath();
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java
index f47bd2bce..f46ebfdb2 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHadoopShuffleWriteHandler.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
import org.apache.uniffle.storage.util.ShuffleStorageUtils;
@@ -70,6 +71,7 @@ public class PooledHadoopShuffleWriteHandler implements
ShuffleWriteHandler {
}
public PooledHadoopShuffleWriteHandler(
+ RssBaseConf rssBaseConf,
String appId,
int shuffleId,
int startPartition,
@@ -90,6 +92,7 @@ public class PooledHadoopShuffleWriteHandler implements
ShuffleWriteHandler {
index -> {
try {
return new HadoopShuffleWriteHandler(
+ rssBaseConf,
appId,
shuffleId,
startPartition,