This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new 94c191702 [CELEBORN-2052] Fix unexpected warning logs in Flink caused
by duplicate BufferStreamEnd messages
94c191702 is described below
commit 94c191702ff41dbd6903cbd08dcb126d7ffeeb33
Author: codenohup <[email protected]>
AuthorDate: Mon Jul 7 17:56:58 2025 +0800
[CELEBORN-2052] Fix unexpected warning logs in Flink caused by duplicate
BufferStreamEnd messages
### What changes were proposed in this pull request?
In the Flink shuffle service plugin, _RemoteBufferStreamReader_ closes itself
when it receives an _EndOfPartition_ event.
When the Celeborn worker releases the corresponding stream, it also sends a
_BufferStreamEnd_ message to _RemoteBufferStreamReader_. As a result,
_ReadClientHandler_ receives a message for an already-closed stream and logs
an unexpected warning.
This change skips the warning when the message is a _BufferStreamEnd_.
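
The dispatch behaviour described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual Celeborn code: the class `StreamDispatchSketch`, the `dispatch` method, the `warnings` list (standing in for `logger.warn`), and the stub message types are all hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class StreamDispatchSketch {
  // Hypothetical stubs for the real message types; only the shape matters here.
  interface Message {}

  static final class ReadData implements Message {
    boolean released = false;

    // Stands in for releasing the underlying Flink buffer.
    void release() {
      released = true;
    }
  }

  static final class BufferStreamEnd implements Message {}

  // Registered stream handlers; a closed reader has already removed its entry.
  final Map<Long, Consumer<Message>> handlers = new ConcurrentHashMap<>();

  // Collected warnings, used here in place of a real logger (assumption).
  final List<String> warnings = new ArrayList<>();

  void dispatch(long streamId, Message msg) {
    Consumer<Message> handler = handlers.get(streamId);
    if (handler != null) {
      handler.accept(msg);
      return;
    }
    // instanceof is already false for null, so no separate null check is needed.
    if (msg instanceof ReadData) {
      ((ReadData) msg).release();
    }
    // A late BufferStreamEnd for a closed stream is expected, so it is not reported.
    if (!(msg instanceof BufferStreamEnd)) {
      warnings.add("Unexpected streamId received: " + streamId + ", msg: " + msg);
    }
  }
}
```

In this sketch, a `BufferStreamEnd` for an unregistered stream is dropped silently, while any other message for an unregistered stream still produces a warning, and a `ReadData` payload is released either way.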

### Why are the changes needed?
Spurious warning logs can easily confuse users.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Covered by existing test cases.
Closes #3357 from codenohup/fix-bufferstreamend.
Authored-by: codenohup <[email protected]>
Signed-off-by: mingji <[email protected]>
(cherry picked from commit cd5d9cd93d1bf1a9ac9250edf1bf62aee0eb0ae2)
Signed-off-by: mingji <[email protected]>
---
.../org/apache/celeborn/plugin/flink/network/ReadClientHandler.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/ReadClientHandler.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/ReadClientHandler.java
index 1ef2b7781..75e3bfe7b 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/ReadClientHandler.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/ReadClientHandler.java
@@ -64,11 +64,13 @@ public class ReadClientHandler extends BaseMessageHandler {
logger.debug("received streamId: {}, msg :{}", streamId, msg);
handler.accept(msg);
} else {
- if (msg != null && msg instanceof ReadData) {
+ if (msg instanceof ReadData) {
((ReadData) msg).getFlinkBuffer().release();
}
- logger.warn("Unexpected streamId received: {}", streamId);
+ if (!(msg instanceof BufferStreamEnd)) {
+ logger.warn("Unexpected streamId received: {}, msg: {}", streamId, msg);
+ }
}
}