This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new d10080238 [CELEBORN-2029][FLINK] Some minor optimizations in the Flink 
integration
d10080238 is described below

commit d10080238902ae1ee9f2859021d5ac01567c59f5
Author: codenohup <[email protected]>
AuthorDate: Mon Jun 9 16:37:40 2025 +0800

    [CELEBORN-2029][FLINK] Some minor optimizations in the Flink integration
    
    ### What changes were proposed in this pull request?
    Some minor performance optimizations in the internal implementation
    
    ### Why are the changes needed?
    During our use of Flink with Celeborn, we identified several minor 
optimizations that can be made:
    
    1. In the client side, the Flink-Celeborn client parses the 
`pushDataTimeout` configuration too frequently, which is unnecessary and 
cpu-intensive.
    
    2. On the worker side, Celeborn needs to filter readers that are able for 
reading. However, using Java Stream's collection operations is costly in terms 
of performance.
    
    3. Also on the worker side, Celeborn currently checks whether a reader can 
continue reading by comparing the current read offset with the total file size. 
This check involves retrieving the total file size, which is an expensive 
operation. Since this value is constant, it should be cached in memory instead 
of being fetched multiple times.
    
    4. In the Flink’s hybrid shuffle integration, the `EndOfSegment` event 
should not be bundled with data buffers. If it is, there is a risk of data 
corruption or misinterpretation.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Manual test.
    
    Closes #3318 from codenohup/CELEBORN-2029.
    
    Authored-by: codenohup <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
    (cherry picked from commit feba7baec67f29330dc43a7d6382c3b9066194e8)
    Signed-off-by: SteNicholas <[email protected]>
---
 .../plugin/flink/client/FlinkShuffleClientImpl.java     | 10 +++++-----
 .../plugin/flink/tiered/CelebornTierProducerAgent.java  |  3 +++
 .../plugin/flink/tiered/CelebornTierProducerAgent.java  |  3 +++
 .../service/deploy/worker/storage/MapPartitionData.java | 17 ++++++++++-------
 .../deploy/worker/storage/MapPartitionDataReader.java   | 13 +++++++++----
 5 files changed, 30 insertions(+), 16 deletions(-)

diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java
index 9886620c3..2cb9df565 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java
@@ -238,7 +238,7 @@ public class FlinkShuffleClientImpl extends 
ShuffleClientImpl {
           partitionLocations,
           subPartitionIndexStart,
           subPartitionIndexEnd,
-          conf.pushDataTimeoutMs());
+          pushDataTimeout);
     }
   }
 
@@ -448,7 +448,7 @@ public class FlinkShuffleClientImpl extends 
ShuffleClientImpl {
                                 .build()
                                 .toByteArray())
                         .toByteBuffer(),
-                    conf.pushDataTimeoutMs());
+                    pushDataTimeout);
           } catch (IOException e) {
             // ioexeption revive
             return revive(shuffleId, mapId, attemptId, location);
@@ -499,7 +499,7 @@ public class FlinkShuffleClientImpl extends 
ShuffleClientImpl {
                                 .build()
                                 .toByteArray())
                         .toByteBuffer(),
-                    conf.pushDataTimeoutMs());
+                    pushDataTimeout);
           } catch (IOException e) {
             // ioexeption revive
             return revive(shuffleId, mapId, attemptId, location);
@@ -580,7 +580,7 @@ public class FlinkShuffleClientImpl extends 
ShuffleClientImpl {
                           .build()
                           .toByteArray())
                   .toByteBuffer(),
-              conf.pushDataTimeoutMs());
+              pushDataTimeout);
           return null;
         });
   }
@@ -620,7 +620,7 @@ public class FlinkShuffleClientImpl extends 
ShuffleClientImpl {
                           .build()
                           .toByteArray())
                   .toByteBuffer(),
-              conf.pushDataTimeoutMs());
+              pushDataTimeout);
           return null;
         });
   }
diff --git 
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java
 
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java
index 8cf9edebd..8ddb40f18 100644
--- 
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java
+++ 
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java
@@ -441,6 +441,9 @@ public class CelebornTierProducerAgent implements 
TierProducerAgent {
               END_OF_SEGMENT,
               endSegmentMemorySegment.size());
       processBuffer(endOfSegmentBuffer, subPartitionId);
+      // drain the bufferPacker to ensure that the EndOfSegment event is not 
bundled with the data
+      // buffer
+      bufferPacker.drain();
     } catch (Exception e) {
       ExceptionUtils.rethrow(e, "Failed to append end of segment event.");
     }
diff --git 
a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java
 
b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java
index 8cf9edebd..8ddb40f18 100644
--- 
a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java
+++ 
b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java
@@ -441,6 +441,9 @@ public class CelebornTierProducerAgent implements 
TierProducerAgent {
               END_OF_SEGMENT,
               endSegmentMemorySegment.size());
       processBuffer(endOfSegmentBuffer, subPartitionId);
+      // drain the bufferPacker to ensure that the EndOfSegment event is not 
bundled with the data
+      // buffer
+      bufferPacker.drain();
     } catch (Exception e) {
       ExceptionUtils.rethrow(e, "Failed to append end of segment event.");
     }
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
index b1372037d..46577e049 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
@@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
-import java.util.stream.Collectors;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
@@ -186,13 +185,17 @@ public class MapPartitionData implements 
MemoryManager.ReadBufferTargetChangeLis
     }
 
     try {
-      PriorityQueue<MapPartitionDataReader> sortedReaders =
-          new PriorityQueue<>(
-              readers.values().stream()
-                  .filter(MapPartitionDataReader::shouldReadData)
-                  .collect(Collectors.toList()));
-      for (MapPartitionDataReader reader : sortedReaders) {
+      // Find all readers that can read data.
+      // Avoid using Java Stream's collect() operation in this case, as the 
internal array used
+      // by Stream.collect() may be resized and copied multiple times if the 
exact size of the
+      // final result is not known in advance
+      PriorityQueue<MapPartitionDataReader> sortedReaders = new 
PriorityQueue<>();
+      for (MapPartitionDataReader reader : readers.values()) {
+        if (!reader.shouldReadData()) {
+          continue;
+        }
         openReader(reader);
+        sortedReaders.add(reader);
       }
       while (bufferQueue.bufferAvailable() && !sortedReaders.isEmpty()) {
         BufferRecycler bufferRecycler = new 
BufferRecycler(MapPartitionData.this::recycle);
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
index 96d723bde..8f8a2c353 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
@@ -96,6 +96,10 @@ public class MapPartitionDataReader implements 
Comparable<MapPartitionDataReader
   protected boolean errorNotified;
 
   private FileChannel dataFileChannel;
+
+  // The size of the data file, it is initialized in the open method and 
remains unchanged
+  // afterward.
+  private long dataFileChannelSize;
   private FileChannel indexFileChannel;
 
   private Channel associatedChannel;
@@ -132,6 +136,7 @@ public class MapPartitionDataReader implements 
Comparable<MapPartitionDataReader
       throws IOException {
     if (!isOpen) {
       this.dataFileChannel = dataFileChannel;
+      this.dataFileChannelSize = dataFileChannel.size();
       this.indexFileChannel = indexFileChannel;
       // index is (offset,length)
       long indexRegionSize = mapFileMeta.getNumSubpartitions() * (long) 
INDEX_ENTRY_SIZE;
@@ -340,13 +345,13 @@ public class MapPartitionDataReader implements 
Comparable<MapPartitionDataReader
       logger.debug(
           "readBuffer updateConsumingOffset, {},  {}, {}, {}",
           streamId,
-          dataFileChannel.size(),
+          dataFileChannelSize,
           dataConsumingOffset,
           currentPartitionRemainingBytes);
 
       // if these checks fail, the partition file must be corrupted
       if (dataConsumingOffset < 0
-          || dataConsumingOffset + currentPartitionRemainingBytes > 
dataFileChannel.size()
+          || dataConsumingOffset + currentPartitionRemainingBytes > 
dataFileChannelSize
           || currentPartitionRemainingBytes < 0) {
         throw new FileCorruptedException("File " + fileInfo.getFilePath() + " 
is corrupted");
       }
@@ -383,7 +388,7 @@ public class MapPartitionDataReader implements 
Comparable<MapPartitionDataReader
         logger.debug(
             "readBuffer end, {},  {}, {}, {}",
             streamId,
-            dataFileChannel.size(),
+            dataFileChannelSize,
             dataConsumingOffset,
             currentPartitionRemainingBytes);
         int prevDataRegion = currentDataRegion;
@@ -394,7 +399,7 @@ public class MapPartitionDataReader implements 
Comparable<MapPartitionDataReader
       logger.debug(
           "readBuffer run: {}, {}, {}, {}",
           streamId,
-          dataFileChannel.size(),
+          dataFileChannelSize,
           dataConsumingOffset,
           currentPartitionRemainingBytes);
       return true;

Reply via email to