This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 8bf1a0591 [CELEBORN-1217] Improve exception message of loadFileGroup for ShuffleClientImpl
8bf1a0591 is described below
commit 8bf1a059104824ced8151668895f6e69406b3682
Author: SteNicholas <[email protected]>
AuthorDate: Thu Jan 11 11:04:53 2024 +0800
[CELEBORN-1217] Improve exception message of loadFileGroup for ShuffleClientImpl
### What changes were proposed in this pull request?
Improve the exception message of `loadFileGroup` for `ShuffleClientImpl`.
### Why are the changes needed?
The exception message of `ShuffleClientImpl#loadFileGroup`, `org.apache.celeborn.common.exception.CelebornIOException: Shuffle data lost for shuffle %s partitionId %s!`, is confusing to users: it is thrown not only when shuffle data is actually lost but also in other failure situations, such as a stage-end timeout. It's therefore recommended to improve the exception message of `loadFileGroup` in `ShuffleClientImpl` so that it reports the underlying cause. An example of the misleading message:
```
Caused by: org.apache.kyuubi.jdbc.hive.KyuubiSQLException: org.apache.kyuubi.KyuubiSQLException: org.apache.kyuubi.KyuubiSQLException: Error operating ExecuteStatement: org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 60.0 failed 4 times, most recent failure: Lost task 15.3 in stage 60.0 (TID 170802) (xxx executor 60): org.apache.celeborn.common.exception.CelebornIOException: Shuffle data lost for shuffle 1 partitionId 15!
    at org.apache.celeborn.client.ShuffleClientImpl.loadFileGroup(ShuffleClientImpl.java:1591)
    at org.apache.celeborn.client.ShuffleClientImpl.readPartition(ShuffleClientImpl.java:1600)
    at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.$anonfun$read$1(CelebornShuffleReader.scala:88)
    at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.$anonfun$read$1$adapted(CelebornShuffleReader.scala:80)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage37.sort_addToSorter_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage37.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:753)
    at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
    at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedBufferedToRowWithNullFreeJoinKey(SortMergeJoinExec.scala:822)
    at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.<init>(SortMergeJoinExec.scala:686)
    at org.apache.spark.sql.execution.joins.SortMergeJoinExec.$anonfun$doExecute$1(SortMergeJoinExec.scala:185)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:398)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:362)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:398)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:362)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:398)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:362)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:398)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:362)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:398)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:362)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:91)
    at org.apache.spark.scheduler.Task.run(Task.scala:143)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:591)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1545)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:596)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
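After this patch, `updateFileGroup` throws a `CelebornIOException` built by a new `loadFileGroupException` helper, which appends the cause reported by `loadFileGroupInternal` (driver endpoint missing, stage-end timeout, shuffle data lost, or the message of an RPC exception). Below is a minimal standalone sketch of the resulting message, assuming `commons-lang3` on the classpath; the demo class and `main` harness are hypothetical, and only `loadFileGroupException` mirrors the helper added by this patch:
```java
import org.apache.commons.lang3.StringUtils;

// Hypothetical demo class; only loadFileGroupException mirrors the helper added by this patch.
public class LoadFileGroupMessageDemo {

  // Formats the improved message, appending the underlying cause when one is known.
  static String loadFileGroupException(int shuffleId, int partitionId, String exceptionMsg) {
    return String.format(
        "Failed to load file group of shuffle %d partition %d! %s",
        shuffleId,
        partitionId,
        StringUtils.isEmpty(exceptionMsg) ? StringUtils.EMPTY : exceptionMsg);
  }

  public static void main(String[] args) {
    // Assumed cause string for the STAGE_END_TIME_OUT case that previously
    // surfaced as the blanket "Shuffle data lost" message.
    String cause = "Request GetReducerFileGroup(1) return STAGE_END_TIME_OUT for 1.";
    System.out.println(loadFileGroupException(1, 15, cause));
    // Prints: Failed to load file group of shuffle 1 partition 15!
    // Request GetReducerFileGroup(1) return STAGE_END_TIME_OUT for 1.
  }
}
```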
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Internal test.
Closes #2219 from SteNicholas/CELEBORN-1217.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../flink/readclient/FlinkShuffleClientImpl.java | 19 ++--
.../apache/celeborn/client/ShuffleClientImpl.java | 110 ++++++++++++---------
2 files changed, 77 insertions(+), 52 deletions(-)
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
index 5e2974547..6ced153c4 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import scala.Tuple2;
 import scala.reflect.ClassTag$;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -145,7 +146,7 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl {
       int shuffleId, int partitionId, int subPartitionIndexStart, int subPartitionIndexEnd)
       throws IOException {
     String shuffleKey = Utils.makeShuffleKey(appUniqueId, shuffleId);
-    ReduceFileGroups fileGroups = loadFileGroup(shuffleId, partitionId);
+    ReduceFileGroups fileGroups = updateFileGroup(shuffleId, partitionId);
     if (fileGroups.partitionGroups.size() == 0
         || !fileGroups.partitionGroups.containsKey(partitionId)) {
       logger.error("Shuffle data is empty for shuffle {} partitionId {}.", shuffleId, partitionId);
@@ -170,7 +171,8 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl {
   }
 
   @Override
-  protected ReduceFileGroups updateFileGroup(int shuffleId, int partitionId) throws IOException {
+  protected ReduceFileGroups updateFileGroup(int shuffleId, int partitionId)
+      throws CelebornIOException {
     ReduceFileGroups reduceFileGroups =
         reduceFileGroupsMap.computeIfAbsent(shuffleId, (id) -> new ReduceFileGroups());
     if (reduceFileGroups.partitionIds != null
@@ -186,14 +188,15 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl {
           Utils.makeReducerKey(shuffleId, partitionId));
     } else {
       // refresh file groups
-      ReduceFileGroups newGroups = loadFileGroupInternal(shuffleId);
+      Tuple2<ReduceFileGroups, String> fileGroups = loadFileGroupInternal(shuffleId);
+      ReduceFileGroups newGroups = fileGroups._1;
       if (newGroups == null) {
-        throw new IOException(
-            "Load file group from lifecycle manager failed: "
-                + Utils.makeReducerKey(shuffleId, partitionId));
+        throw new CelebornIOException(
+            loadFileGroupException(shuffleId, partitionId, fileGroups._2));
       } else if (!newGroups.partitionIds.contains(partitionId)) {
-        throw new IOException(
-            "shuffle data lost for partition: " + Utils.makeReducerKey(shuffleId, partitionId));
+        throw new CelebornIOException(
+            String.format(
+                "Shuffle data lost for shuffle %d partition %d.", shuffleId, partitionId));
       }
       reduceFileGroups.update(newGroups);
     }
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index a7b4e7083..b995bc13a 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -25,11 +25,13 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import scala.Tuple2;
 import scala.reflect.ClassTag$;
 
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.Unpooled;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1567,64 +1569,84 @@ public class ShuffleClientImpl extends ShuffleClient {
     return true;
   }
 
-  protected ReduceFileGroups loadFileGroupInternal(int shuffleId) {
+  protected Tuple2<ReduceFileGroups, String> loadFileGroupInternal(int shuffleId) {
     {
       long getReducerFileGroupStartTime = System.nanoTime();
+      String exceptionMsg = null;
       try {
         if (lifecycleManagerRef == null) {
-          logger.warn("Driver endpoint is null!");
-          return null;
-        }
+          exceptionMsg = "Driver endpoint is null!";
+          logger.warn(exceptionMsg);
+        } else {
+          GetReducerFileGroup getReducerFileGroup = new GetReducerFileGroup(shuffleId);
 
-        GetReducerFileGroup getReducerFileGroup = new GetReducerFileGroup(shuffleId);
+          GetReducerFileGroupResponse response =
+              lifecycleManagerRef.askSync(
+                  getReducerFileGroup,
+                  conf.clientRpcGetReducerFileGroupRpcAskTimeout(),
+                  ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class));
 
-        GetReducerFileGroupResponse response =
-            lifecycleManagerRef.askSync(
-                getReducerFileGroup,
-                conf.clientRpcGetReducerFileGroupRpcAskTimeout(),
-                ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class));
-
-        switch (response.status()) {
-          case SUCCESS:
-            logger.info(
-                "Shuffle {} request reducer file group success using {} ms, result partition size {}.",
-                shuffleId,
-                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - getReducerFileGroupStartTime),
-                response.fileGroup().size());
-            return new ReduceFileGroups(
-                response.fileGroup(), response.attempts(), response.partitionIds());
-          case STAGE_END_TIME_OUT:
-          case SHUFFLE_DATA_LOST:
-            logger.warn(
-                "Request {} return {} for {}.", getReducerFileGroup, response.status(), shuffleId);
-            return null;
-          case SHUFFLE_NOT_REGISTERED:
-            logger.warn(
-                "Request {} return {} for {}.", getReducerFileGroup, response.status(), shuffleId);
-            // return empty result
-            return new ReduceFileGroups(
-                response.fileGroup(), response.attempts(), response.partitionIds());
+          switch (response.status()) {
+            case SUCCESS:
+              logger.info(
+                  "Shuffle {} request reducer file group success using {} ms, result partition size {}.",
+                  shuffleId,
+                  TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - getReducerFileGroupStartTime),
+                  response.fileGroup().size());
+              return Tuple2.apply(
+                  new ReduceFileGroups(
+                      response.fileGroup(), response.attempts(), response.partitionIds()),
+                  null);
+            case SHUFFLE_NOT_REGISTERED:
+              logger.warn(
+                  "Request {} return {} for {}.",
+                  getReducerFileGroup,
+                  response.status(),
+                  shuffleId);
+              // return empty result
+              return Tuple2.apply(
+                  new ReduceFileGroups(
+                      response.fileGroup(), response.attempts(), response.partitionIds()),
+                  null);
+            case STAGE_END_TIME_OUT:
+            case SHUFFLE_DATA_LOST:
+              exceptionMsg =
+                  String.format(
+                      "Request %s return %s for %s.",
+                      getReducerFileGroup, response.status(), shuffleId);
+              logger.warn(exceptionMsg);
+          }
         }
       } catch (Exception e) {
         logger.error("Exception raised while call GetReducerFileGroup for {}.", shuffleId, e);
+        exceptionMsg = e.getMessage();
       }
-      return null;
+      return Tuple2.apply(null, exceptionMsg);
     }
   }
 
-  protected ReduceFileGroups updateFileGroup(int shuffleId, int partitionId) throws IOException {
-    return reduceFileGroupsMap.computeIfAbsent(shuffleId, (id) -> loadFileGroupInternal(shuffleId));
+  protected ReduceFileGroups updateFileGroup(int shuffleId, int partitionId)
+      throws CelebornIOException {
+    if (reduceFileGroupsMap.containsKey(shuffleId)) {
+      return reduceFileGroupsMap.get(shuffleId);
+    } else {
+      Tuple2<ReduceFileGroups, String> fileGroups = loadFileGroupInternal(shuffleId);
+      ReduceFileGroups newGroups = fileGroups._1;
+      if (newGroups == null) {
+        throw new CelebornIOException(
+            loadFileGroupException(shuffleId, partitionId, fileGroups._2));
+      }
+      reduceFileGroupsMap.put(shuffleId, newGroups);
+      return newGroups;
+    }
   }
 
-  protected ReduceFileGroups loadFileGroup(int shuffleId, int partitionId) throws IOException {
-    ReduceFileGroups reduceFileGroups = updateFileGroup(shuffleId, partitionId);
-    if (reduceFileGroups == null) {
-      String msg =
-          "Shuffle data lost for shuffle " + shuffleId + " partitionId " + partitionId + "!";
-      logger.error(msg);
-      throw new CelebornIOException(msg);
-    }
-    return reduceFileGroups;
+  protected String loadFileGroupException(int shuffleId, int partitionId, String exceptionMsg) {
+    return String.format(
+        "Failed to load file group of shuffle %d partition %d! %s",
+        shuffleId,
+        partitionId,
+        StringUtils.isEmpty(exceptionMsg) ? StringUtils.EMPTY : exceptionMsg);
   }
 
   @Override
@@ -1640,7 +1662,7 @@ public class ShuffleClientImpl extends ShuffleClient {
       logger.warn("Shuffle data is empty for shuffle {}: UNKNOWN_APP_SHUFFLE_ID.", shuffleId);
       return CelebornInputStream.empty();
     }
-    ReduceFileGroups fileGroups = loadFileGroup(shuffleId, partitionId);
+    ReduceFileGroups fileGroups = updateFileGroup(shuffleId, partitionId);
     if (fileGroups.partitionGroups.isEmpty()
         || !fileGroups.partitionGroups.containsKey(partitionId)) {