This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 5b4b31086 [CELEBORN-2122] Avoiding multiple accesses to HDFS when 
retrieving in…
5b4b31086 is described below

commit 5b4b310861888b51f07655ad7e5401ea14a56abe
Author: xxx <[email protected]>
AuthorDate: Wed Aug 27 14:16:59 2025 +0800

    [CELEBORN-2122] Avoiding multiple accesses to HDFS when retrieving in…
    
    …dexes in DfsPartitionReader
    
    ### What changes were proposed in this pull request?
    
    Avoiding multiple accesses to HDFS when retrieving indexes in 
DfsPartitionReader
    
    ### Why are the changes needed?
    
    This optimization method improves read performance by reducing the number 
of interactions with HDFS, merging multiple small I/O operations into a single 
large I/O operation. Especially when index files are small, the strategy of 
reading the entire file at once can significantly reduce the number of I/O 
operations.
    
    ### Does this PR introduce _any_ user-facing change?
    
    NO
    
    ### How was this patch tested?
    
    CI
    
    Closes #3443 from xy2953396112/CELEBORN-2122.
    
    Authored-by: xxx <[email protected]>
    Signed-off-by: mingji <[email protected]>
    (cherry picked from commit f590fb275d3b5a3630b0e734ada43417e0bf71a3)
    Signed-off-by: mingji <[email protected]>
---
 .../celeborn/client/read/DfsPartitionReader.java     | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java 
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
index e39fb7ce8..63136f00d 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
@@ -144,7 +144,7 @@ public class DfsPartitionReader implements PartitionReader {
     } else {
       dataFilePath = new Path(location.getStorageInfo().getFilePath());
       dfsInputStream = hadoopFs.open(dataFilePath);
-      chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(conf, location));
+      chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(location));
     }
     this.startChunkIndex = startChunkIndex == -1 ? 0 : startChunkIndex;
     this.endChunkIndex =
@@ -181,15 +181,19 @@ public class DfsPartitionReader implements 
PartitionReader {
     }
   }
 
-  private List<Long> getChunkOffsetsFromUnsortedIndex(CelebornConf conf, 
PartitionLocation location)
+  private List<Long> getChunkOffsetsFromUnsortedIndex(PartitionLocation 
location)
       throws IOException {
     List<Long> offsets;
-    try (FSDataInputStream indexInputStream =
-        hadoopFs.open(new 
Path(Utils.getIndexFilePath(location.getStorageInfo().getFilePath())))) {
-      offsets = new ArrayList<>();
-      int offsetCount = indexInputStream.readInt();
-      for (int i = 0; i < offsetCount; i++) {
-        offsets.add(indexInputStream.readLong());
+    String indexPath = 
Utils.getIndexFilePath(location.getStorageInfo().getFilePath());
+    try (FSDataInputStream indexInputStream = hadoopFs.open(new 
Path(indexPath))) {
+      long indexSize = hadoopFs.getFileStatus(new Path(indexPath)).getLen();
+      byte[] indexBuffer = new byte[(int) indexSize];
+      indexInputStream.readFully(0L, indexBuffer);
+      ByteBuffer buffer = ByteBuffer.wrap(indexBuffer);
+      int offsetSize = buffer.getInt();
+      offsets = new ArrayList<>(offsetSize);
+      for (int i = 0; i < offsetSize; i++) {
+        offsets.add(buffer.getLong());
       }
     }
     return offsets;

Reply via email to