This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new a61d6a517 [CELEBORN-2064] Fix the issue where reading replica
partition that returns zero chunk causes tasks to hang
a61d6a517 is described below
commit a61d6a517f180d7c86d97012e0314ab58589821c
Author: xinyuwang1 <[email protected]>
AuthorDate: Fri Aug 1 13:47:07 2025 -0700
[CELEBORN-2064] Fix the issue where reading replica partition that returns
zero chunk causes tasks to hang
### What changes were proposed in this pull request?
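Re-check `currentReader.hasNext()` inside `getNextChunk()` and return `null` when no chunk is available; callers now treat a `null` chunk as a signal to move on to the next reader.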
### Why are the changes needed?
This fixes an issue where reading a replica partition that returns zero chunks
causes tasks to hang.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manually tested.
Closes #3364 from littlexyw/fix_get_next_chunk.
Authored-by: xinyuwang1 <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
---
.../org/apache/celeborn/client/read/CelebornInputStream.java | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index fb5b626f8..032448188 100644
---
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -405,7 +405,7 @@ public abstract class CelebornInputStream extends
InputStream {
}
currentReader = createReaderWithRetry(currentLocation._1,
currentLocation._2);
fileIndex++;
- while (!currentReader.hasNext()) {
+ while (!currentReader.hasNext() || (fetchChunk && (currentChunk =
getNextChunk()) == null)) {
currentReader.close();
currentReader = null;
currentLocation = nextReadableLocation();
@@ -415,9 +415,6 @@ public abstract class CelebornInputStream extends
InputStream {
currentReader = createReaderWithRetry(currentLocation._1,
currentLocation._2);
fileIndex++;
}
- if (fetchChunk) {
- currentChunk = getNextChunk();
- }
}
private boolean isExcluded(PartitionLocation location) {
@@ -526,6 +523,9 @@ public abstract class CelebornInputStream extends
InputStream {
throw new CelebornIOException(
"Fetch data from excluded worker! " +
currentReader.getLocation());
}
+ if (!currentReader.hasNext()) {
+ return null;
+ }
return currentReader.next();
} catch (Exception e) {
shuffleClient.excludeFailedFetchLocation(
@@ -775,8 +775,7 @@ public abstract class CelebornInputStream extends
InputStream {
currentChunk.release();
}
currentChunk = null;
- if (currentReader.hasNext()) {
- currentChunk = getNextChunk();
+ if (currentReader.hasNext() && (currentChunk = getNextChunk()) != null) {
return true;
} else if (fileIndex < locations.size()) {
moveToNextReader(true);
@@ -806,6 +805,7 @@ public abstract class CelebornInputStream extends
InputStream {
if (firstChunk && currentReader != null) {
init();
currentChunk = getNextChunk();
+ while (currentChunk == null && moveToNextChunk()) ;
firstChunk = false;
}
if (currentChunk == null) {