This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new d10080238 [CELEBORN-2029][FLINK] Some minor optimizations in the Flink
integration
d10080238 is described below
commit d10080238902ae1ee9f2859021d5ac01567c59f5
Author: codenohup <[email protected]>
AuthorDate: Mon Jun 9 16:37:40 2025 +0800
[CELEBORN-2029][FLINK] Some minor optimizations in the Flink integration
### What changes were proposed in this pull request?
Some minor performance optimizations in the internal implementation
### Why are the changes needed?
During our use of Flink with Celeborn, we identified several minor
optimizations that can be made:
1. On the client side, the Flink-Celeborn client parses the
`pushDataTimeout` configuration too frequently, which is unnecessary and
CPU-intensive.
2. On the worker side, Celeborn needs to filter readers that are eligible for
reading. However, using Java Stream's collection operations is costly in terms
of performance.
3. Also on the worker side, Celeborn currently checks whether a reader can
continue reading by comparing the current read offset with the total file size.
This check involves retrieving the total file size, which is an expensive
operation. Since this value is constant, it should be cached in memory instead
of being fetched multiple times.
4. In Flink's hybrid shuffle integration, the `EndOfSegment` event
should not be bundled with data buffers. If it is, there is a risk of data
corruption or misinterpretation.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manual test.
Closes #3318 from codenohup/CELEBORN-2029.
Authored-by: codenohup <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
(cherry picked from commit feba7baec67f29330dc43a7d6382c3b9066194e8)
Signed-off-by: SteNicholas <[email protected]>
---
.../plugin/flink/client/FlinkShuffleClientImpl.java | 10 +++++-----
.../plugin/flink/tiered/CelebornTierProducerAgent.java | 3 +++
.../plugin/flink/tiered/CelebornTierProducerAgent.java | 3 +++
.../service/deploy/worker/storage/MapPartitionData.java | 17 ++++++++++-------
.../deploy/worker/storage/MapPartitionDataReader.java | 13 +++++++++----
5 files changed, 30 insertions(+), 16 deletions(-)
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java
index 9886620c3..2cb9df565 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java
@@ -238,7 +238,7 @@ public class FlinkShuffleClientImpl extends
ShuffleClientImpl {
partitionLocations,
subPartitionIndexStart,
subPartitionIndexEnd,
- conf.pushDataTimeoutMs());
+ pushDataTimeout);
}
}
@@ -448,7 +448,7 @@ public class FlinkShuffleClientImpl extends
ShuffleClientImpl {
.build()
.toByteArray())
.toByteBuffer(),
- conf.pushDataTimeoutMs());
+ pushDataTimeout);
} catch (IOException e) {
// ioexeption revive
return revive(shuffleId, mapId, attemptId, location);
@@ -499,7 +499,7 @@ public class FlinkShuffleClientImpl extends
ShuffleClientImpl {
.build()
.toByteArray())
.toByteBuffer(),
- conf.pushDataTimeoutMs());
+ pushDataTimeout);
} catch (IOException e) {
// ioexeption revive
return revive(shuffleId, mapId, attemptId, location);
@@ -580,7 +580,7 @@ public class FlinkShuffleClientImpl extends
ShuffleClientImpl {
.build()
.toByteArray())
.toByteBuffer(),
- conf.pushDataTimeoutMs());
+ pushDataTimeout);
return null;
});
}
@@ -620,7 +620,7 @@ public class FlinkShuffleClientImpl extends
ShuffleClientImpl {
.build()
.toByteArray())
.toByteBuffer(),
- conf.pushDataTimeoutMs());
+ pushDataTimeout);
return null;
});
}
diff --git
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java
index 8cf9edebd..8ddb40f18 100644
---
a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java
+++
b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java
@@ -441,6 +441,9 @@ public class CelebornTierProducerAgent implements
TierProducerAgent {
END_OF_SEGMENT,
endSegmentMemorySegment.size());
processBuffer(endOfSegmentBuffer, subPartitionId);
+ // drain the bufferPacker to ensure that the EndOfSegment event is not
bundled with the data
+ // buffer
+ bufferPacker.drain();
} catch (Exception e) {
ExceptionUtils.rethrow(e, "Failed to append end of segment event.");
}
diff --git
a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java
b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java
index 8cf9edebd..8ddb40f18 100644
---
a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java
+++
b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierProducerAgent.java
@@ -441,6 +441,9 @@ public class CelebornTierProducerAgent implements
TierProducerAgent {
END_OF_SEGMENT,
endSegmentMemorySegment.size());
processBuffer(endOfSegmentBuffer, subPartitionId);
+ // drain the bufferPacker to ensure that the EndOfSegment event is not
bundled with the data
+ // buffer
+ bufferPacker.drain();
} catch (Exception e) {
ExceptionUtils.rethrow(e, "Failed to append end of segment event.");
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
index b1372037d..46577e049 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
@@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
-import java.util.stream.Collectors;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
@@ -186,13 +185,17 @@ public class MapPartitionData implements
MemoryManager.ReadBufferTargetChangeLis
}
try {
- PriorityQueue<MapPartitionDataReader> sortedReaders =
- new PriorityQueue<>(
- readers.values().stream()
- .filter(MapPartitionDataReader::shouldReadData)
- .collect(Collectors.toList()));
- for (MapPartitionDataReader reader : sortedReaders) {
+ // Find all readers that can read data.
+ // Avoid using Java Stream's collect() operation in this case, as the
internal array used
+ // by Stream.collect() may be resized and copied multiple times if the
exact size of the
+ // final result is not known in advance
+ PriorityQueue<MapPartitionDataReader> sortedReaders = new
PriorityQueue<>();
+ for (MapPartitionDataReader reader : readers.values()) {
+ if (!reader.shouldReadData()) {
+ continue;
+ }
openReader(reader);
+ sortedReaders.add(reader);
}
while (bufferQueue.bufferAvailable() && !sortedReaders.isEmpty()) {
BufferRecycler bufferRecycler = new
BufferRecycler(MapPartitionData.this::recycle);
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
index 96d723bde..8f8a2c353 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
@@ -96,6 +96,10 @@ public class MapPartitionDataReader implements
Comparable<MapPartitionDataReader
protected boolean errorNotified;
private FileChannel dataFileChannel;
+
+ // The size of the data file, it is initialized in the open method and
remains unchanged
+ // afterward.
+ private long dataFileChannelSize;
private FileChannel indexFileChannel;
private Channel associatedChannel;
@@ -132,6 +136,7 @@ public class MapPartitionDataReader implements
Comparable<MapPartitionDataReader
throws IOException {
if (!isOpen) {
this.dataFileChannel = dataFileChannel;
+ this.dataFileChannelSize = dataFileChannel.size();
this.indexFileChannel = indexFileChannel;
// index is (offset,length)
long indexRegionSize = mapFileMeta.getNumSubpartitions() * (long)
INDEX_ENTRY_SIZE;
@@ -340,13 +345,13 @@ public class MapPartitionDataReader implements
Comparable<MapPartitionDataReader
logger.debug(
"readBuffer updateConsumingOffset, {}, {}, {}, {}",
streamId,
- dataFileChannel.size(),
+ dataFileChannelSize,
dataConsumingOffset,
currentPartitionRemainingBytes);
// if these checks fail, the partition file must be corrupted
if (dataConsumingOffset < 0
- || dataConsumingOffset + currentPartitionRemainingBytes >
dataFileChannel.size()
+ || dataConsumingOffset + currentPartitionRemainingBytes >
dataFileChannelSize
|| currentPartitionRemainingBytes < 0) {
throw new FileCorruptedException("File " + fileInfo.getFilePath() + "
is corrupted");
}
@@ -383,7 +388,7 @@ public class MapPartitionDataReader implements
Comparable<MapPartitionDataReader
logger.debug(
"readBuffer end, {}, {}, {}, {}",
streamId,
- dataFileChannel.size(),
+ dataFileChannelSize,
dataConsumingOffset,
currentPartitionRemainingBytes);
int prevDataRegion = currentDataRegion;
@@ -394,7 +399,7 @@ public class MapPartitionDataReader implements
Comparable<MapPartitionDataReader
logger.debug(
"readBuffer run: {}, {}, {}, {}",
streamId,
- dataFileChannel.size(),
+ dataFileChannelSize,
dataConsumingOffset,
currentPartitionRemainingBytes);
return true;