This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new af0ba1a5e [CELEBORN-2281] Improve error logging and null checks in
CreditStreamManager
af0ba1a5e is described below
commit af0ba1a5ec0e1faf3d4a0d189058c755aeb6b18c
Author: sychen <[email protected]>
AuthorDate: Wed Mar 18 15:55:32 2026 +0800
[CELEBORN-2281] Improve error logging and null checks in CreditStreamManager
### What changes were proposed in this pull request?
- Initialize `AtomicReference<IOException>` with proper syntax.
- Add exception to `logger.error` for better error context.
- Simplify and improve null checks and logging in `addCredit` and
`cleanResource` methods.
### Why are the changes needed?
nit.
### Does this PR resolve a correctness bug?
No.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GHA.
Closes #3626 from cxzl25/CELEBORN-2281.
Lead-authored-by: sychen <[email protected]>
Co-authored-by: cxzl25 <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../deploy/worker/storage/CreditStreamManager.java | 25 +++++++++-------------
1 file changed, 10 insertions(+), 15 deletions(-)
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
index e8f408afe..1272776d3 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java
@@ -93,7 +93,7 @@ public class CreditStreamManager {
streamId,
fileInfo);
- AtomicReference<IOException> exception = new AtomicReference();
+ AtomicReference<IOException> exception = new AtomicReference<>();
MapPartitionData mapPartitionData =
activeMapPartitions.compute(
fileInfo,
@@ -166,7 +166,7 @@ public class CreditStreamManager {
mapPartitionData.addReaderCredit(numCredit, streamId);
}
} catch (Throwable e) {
- logger.error("streamId: {}, add credit end: {}", streamId, numCredit);
+ logger.error("streamId: {}, failed to add credit: {}", streamId,
numCredit, e);
}
}
@@ -185,23 +185,21 @@ public class CreditStreamManager {
logger.warn("Only non-null SegmentMapPartitionData is expected for
notifyRequiredSegment.");
}
} catch (Throwable e) {
- logger.error(
- String.format("Fail to notify segmentId %s for stream %s.",
requiredSegmentId, streamId),
- e);
+ logger.error("Failed to notify segmentId {} for stream {}.",
requiredSegmentId, streamId, e);
throw e;
}
}
public void addCredit(int numCredit, long streamId) {
- if (!streams.containsKey(streamId)) {
+ StreamState streamState = streams.get(streamId);
+ if (streamState == null) {
// In flink hybrid shuffle integration strategy, the stream may release
in worker before
// client receive bufferStreamEnd,
// and the client may send request with old streamId, so ignore
non-exist streams.
logger.warn("Ignore AddCredit from stream {}, numCredit {}.", streamId,
numCredit);
return;
}
- MapPartitionData mapPartitionData =
streams.get(streamId).getMapPartitionData();
- addCredit(mapPartitionData, numCredit, streamId);
+ addCredit(streamState.getMapPartitionData(), numCredit, streamId);
}
public void notifyRequiredSegment(int requiredSegmentId, long streamId, int
subPartitionId) {
@@ -278,8 +276,9 @@ public class CreditStreamManager {
public void cleanResource(Long streamId) {
logger.debug("received clean stream: {}", streamId);
- if (streams.containsKey(streamId)) {
- MapPartitionData mapPartitionData =
streams.get(streamId).getMapPartitionData();
+ StreamState streamState = streams.get(streamId);
+ if (streamState != null) {
+ MapPartitionData mapPartitionData = streamState.getMapPartitionData();
if (mapPartitionData != null) {
if (mapPartitionData.releaseReader(streamId)) {
streams.remove(streamId);
@@ -380,11 +379,7 @@ public class CreditStreamManager {
@Override
public String toString() {
- final StringBuilder sb = new StringBuilder("DelayedStreamId{");
- sb.append("createMillis=").append(createMillis);
- sb.append(", streamId=").append(streamId);
- sb.append('}');
- return sb.toString();
+ return "DelayedStreamId{" + "createMillis=" + createMillis + ",
streamId=" + streamId + '}';
}
}
}