This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new 5b4b31086 [CELEBORN-2122] Avoiding multiple accesses to HDFS when
retrieving indexes in DfsPartitionReader
5b4b31086 is described below
commit 5b4b310861888b51f07655ad7e5401ea14a56abe
Author: xxx <[email protected]>
AuthorDate: Wed Aug 27 14:16:59 2025 +0800
[CELEBORN-2122] Avoiding multiple accesses to HDFS when retrieving indexes
in DfsPartitionReader
### What changes were proposed in this pull request?
Avoiding multiple accesses to HDFS when retrieving indexes in
DfsPartitionReader
### Why are the changes needed?
This optimization improves read performance by reducing the number of
interactions with HDFS: it merges multiple small I/O operations into a single
large I/O operation. Especially when index files are small, reading the
entire file at once can significantly reduce the number of I/O operations.
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
CI
Closes #3443 from xy2953396112/CELEBORN-2122.
Authored-by: xxx <[email protected]>
Signed-off-by: mingji <[email protected]>
(cherry picked from commit f590fb275d3b5a3630b0e734ada43417e0bf71a3)
Signed-off-by: mingji <[email protected]>
---
.../celeborn/client/read/DfsPartitionReader.java | 20 ++++++++++++--------
1 file changed, 12 insertions(+), 8 deletions(-)
diff --git
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
index e39fb7ce8..63136f00d 100644
---
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
+++
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
@@ -144,7 +144,7 @@ public class DfsPartitionReader implements PartitionReader {
} else {
dataFilePath = new Path(location.getStorageInfo().getFilePath());
dfsInputStream = hadoopFs.open(dataFilePath);
- chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(conf, location));
+ chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(location));
}
this.startChunkIndex = startChunkIndex == -1 ? 0 : startChunkIndex;
this.endChunkIndex =
@@ -181,15 +181,19 @@ public class DfsPartitionReader implements
PartitionReader {
}
}
- private List<Long> getChunkOffsetsFromUnsortedIndex(CelebornConf conf,
PartitionLocation location)
+ private List<Long> getChunkOffsetsFromUnsortedIndex(PartitionLocation
location)
throws IOException {
List<Long> offsets;
- try (FSDataInputStream indexInputStream =
- hadoopFs.open(new
Path(Utils.getIndexFilePath(location.getStorageInfo().getFilePath())))) {
- offsets = new ArrayList<>();
- int offsetCount = indexInputStream.readInt();
- for (int i = 0; i < offsetCount; i++) {
- offsets.add(indexInputStream.readLong());
+ String indexPath =
Utils.getIndexFilePath(location.getStorageInfo().getFilePath());
+ try (FSDataInputStream indexInputStream = hadoopFs.open(new
Path(indexPath))) {
+ long indexSize = hadoopFs.getFileStatus(new Path(indexPath)).getLen();
+ byte[] indexBuffer = new byte[(int) indexSize];
+ indexInputStream.readFully(0L, indexBuffer);
+ ByteBuffer buffer = ByteBuffer.wrap(indexBuffer);
+ int offsetSize = buffer.getInt();
+ offsets = new ArrayList<>(offsetSize);
+ for (int i = 0; i < offsetSize; i++) {
+ offsets.add(buffer.getLong());
}
}
return offsets;