This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.4 by this push:
     new c6af7ccbf [CELEBORN-1217] Improve exception message of loadFileGroup for ShuffleClientImpl
c6af7ccbf is described below

commit c6af7ccbfc8130e1a7ed87c4a638c98ded5b9273
Author: SteNicholas <programg...@163.com>
AuthorDate: Thu Jan 11 11:04:53 2024 +0800

    [CELEBORN-1217] Improve exception message of loadFileGroup for ShuffleClientImpl

    ### What changes were proposed in this pull request?

    Improve the exception message of `loadFileGroup` for `ShuffleClientImpl`.

    ### Why are the changes needed?

    The exception message thrown by `ShuffleClientImpl#loadFileGroup`, `org.apache.celeborn.common.exception.CelebornIOException: Shuffle data lost for shuffle %s partitionId %s!`, is confusing to users: it is raised not only when shuffle data is actually lost, but also in other failure situations such as a stage-end timeout. The exception message of `loadFileGroup` for `ShuffleClientImpl` should therefore be improved.

    ```
    Caused by: org.apache.kyuubi.jdbc.hive.KyuubiSQLException: org.apache.kyuubi.KyuubiSQLException: org.apache.kyuubi.KyuubiSQLException: Error operating ExecuteStatement: org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 60.0 failed 4 times, most recent failure: Lost task 15.3 in stage 60.0 (TID 170802) (xxx executor 60): org.apache.celeborn.common.exception.CelebornIOException: Shuffle data lost for shuffle 1 partitionId 15!
        at org.apache.celeborn.client.ShuffleClientImpl.loadFileGroup(ShuffleClientImpl.java:1591)
        at org.apache.celeborn.client.ShuffleClientImpl.readPartition(ShuffleClientImpl.java:1600)
        at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.$anonfun$read$1(CelebornShuffleReader.scala:88)
        at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.$anonfun$read$1$adapted(CelebornShuffleReader.scala:80)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage37.sort_addToSorter_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage37.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:753)
        at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
        at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedBufferedToRowWithNullFreeJoinKey(SortMergeJoinExec.scala:822)
        at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.<init>(SortMergeJoinExec.scala:686)
        at org.apache.spark.sql.execution.joins.SortMergeJoinExec.$anonfun$doExecute$1(SortMergeJoinExec.scala:185)
        at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:398)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:362)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:398)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:362)
        at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:398)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:362)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:398)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:362)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:398)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:362)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:91)
        at org.apache.spark.scheduler.Task.run(Task.scala:143)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:591)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1545)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:596)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    ```

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Internal test.

    Closes #2219 from SteNicholas/CELEBORN-1217.

    Authored-by: SteNicholas <programg...@163.com>
    Signed-off-by: mingji <fengmingxiao....@alibaba-inc.com>
    (cherry picked from commit 8bf1a059104824ced8151668895f6e69406b3682)
    Signed-off-by: mingji <fengmingxiao....@alibaba-inc.com>
---
 .../flink/readclient/FlinkShuffleClientImpl.java  |  19 ++--
 .../apache/celeborn/client/ShuffleClientImpl.java | 110 ++++++++++++---------
 2 files changed, 77 insertions(+), 52 deletions(-)

diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
index 5e2974547..6ced153c4 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import scala.Tuple2;
 import scala.reflect.ClassTag$;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -145,7 +146,7 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl {
       int shuffleId, int partitionId, int subPartitionIndexStart, int subPartitionIndexEnd)
       throws IOException {
     String shuffleKey = Utils.makeShuffleKey(appUniqueId, shuffleId);
-    ReduceFileGroups fileGroups = loadFileGroup(shuffleId, partitionId);
+    ReduceFileGroups fileGroups = updateFileGroup(shuffleId, partitionId);
     if (fileGroups.partitionGroups.size() == 0
         || !fileGroups.partitionGroups.containsKey(partitionId)) {
       logger.error("Shuffle data is empty for shuffle {} partitionId {}.", shuffleId, partitionId);
@@ -170,7 +171,8 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl {
   }
 
   @Override
-  protected ReduceFileGroups updateFileGroup(int shuffleId, int partitionId) throws IOException {
+  protected ReduceFileGroups updateFileGroup(int shuffleId, int partitionId)
+      throws CelebornIOException {
     ReduceFileGroups reduceFileGroups =
         reduceFileGroupsMap.computeIfAbsent(shuffleId, (id) -> new ReduceFileGroups());
     if (reduceFileGroups.partitionIds != null
@@ -186,14 +188,15 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl {
             Utils.makeReducerKey(shuffleId, partitionId));
       } else {
         // refresh file groups
-        ReduceFileGroups newGroups = loadFileGroupInternal(shuffleId);
+        Tuple2<ReduceFileGroups, String> fileGroups = loadFileGroupInternal(shuffleId);
+        ReduceFileGroups newGroups = fileGroups._1;
         if (newGroups == null) {
-          throw new IOException(
-              "Load file group from lifecycle manager failed: "
-                  + Utils.makeReducerKey(shuffleId, partitionId));
+          throw new CelebornIOException(
+              loadFileGroupException(shuffleId, partitionId, fileGroups._2));
         } else if (!newGroups.partitionIds.contains(partitionId)) {
-          throw new IOException(
-              "shuffle data lost for partition: " + Utils.makeReducerKey(shuffleId, partitionId));
+          throw new CelebornIOException(
+              String.format(
+                  "Shuffle data lost for shuffle %d partition %d.", shuffleId, partitionId));
         }
         reduceFileGroups.update(newGroups);
       }
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index a7b4e7083..b995bc13a 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -25,11 +25,13 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import scala.Tuple2;
 import scala.reflect.ClassTag$;
 
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.Unpooled;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1567,64 +1569,84 @@ public class ShuffleClientImpl extends ShuffleClient {
     return true;
   }
 
-  protected ReduceFileGroups loadFileGroupInternal(int shuffleId) {
+  protected Tuple2<ReduceFileGroups, String> loadFileGroupInternal(int shuffleId) {
     {
       long getReducerFileGroupStartTime = System.nanoTime();
+      String exceptionMsg = null;
       try {
         if (lifecycleManagerRef == null) {
-          logger.warn("Driver endpoint is null!");
-          return null;
-        }
+          exceptionMsg = "Driver endpoint is null!";
+          logger.warn(exceptionMsg);
+        } else {
+          GetReducerFileGroup getReducerFileGroup = new GetReducerFileGroup(shuffleId);
 
-        GetReducerFileGroup getReducerFileGroup = new GetReducerFileGroup(shuffleId);
+          GetReducerFileGroupResponse response =
+              lifecycleManagerRef.askSync(
+                  getReducerFileGroup,
+                  conf.clientRpcGetReducerFileGroupRpcAskTimeout(),
+                  ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class));
 
-        GetReducerFileGroupResponse response =
-            lifecycleManagerRef.askSync(
-                getReducerFileGroup,
-                conf.clientRpcGetReducerFileGroupRpcAskTimeout(),
-                ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class));
-
-        switch (response.status()) {
-          case SUCCESS:
-            logger.info(
-                "Shuffle {} request reducer file group success using {} ms, result partition size {}.",
-                shuffleId,
-                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - getReducerFileGroupStartTime),
-                response.fileGroup().size());
-            return new ReduceFileGroups(
-                response.fileGroup(), response.attempts(), response.partitionIds());
-          case STAGE_END_TIME_OUT:
-          case SHUFFLE_DATA_LOST:
-            logger.warn(
-                "Request {} return {} for {}.", getReducerFileGroup, response.status(), shuffleId);
-            return null;
-          case SHUFFLE_NOT_REGISTERED:
-            logger.warn(
-                "Request {} return {} for {}.", getReducerFileGroup, response.status(), shuffleId);
-            // return empty result
-            return new ReduceFileGroups(
-                response.fileGroup(), response.attempts(), response.partitionIds());
+          switch (response.status()) {
+            case SUCCESS:
+              logger.info(
+                  "Shuffle {} request reducer file group success using {} ms, result partition size {}.",
+                  shuffleId,
+                  TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - getReducerFileGroupStartTime),
+                  response.fileGroup().size());
+              return Tuple2.apply(
+                  new ReduceFileGroups(
+                      response.fileGroup(), response.attempts(), response.partitionIds()),
+                  null);
+            case SHUFFLE_NOT_REGISTERED:
+              logger.warn(
+                  "Request {} return {} for {}.",
+                  getReducerFileGroup,
+                  response.status(),
+                  shuffleId);
+              // return empty result
+              return Tuple2.apply(
+                  new ReduceFileGroups(
+                      response.fileGroup(), response.attempts(), response.partitionIds()),
+                  null);
+            case STAGE_END_TIME_OUT:
+            case SHUFFLE_DATA_LOST:
+              exceptionMsg =
+                  String.format(
+                      "Request %s return %s for %s.",
+                      getReducerFileGroup, response.status(), shuffleId);
+              logger.warn(exceptionMsg);
+          }
         }
       } catch (Exception e) {
         logger.error("Exception raised while call GetReducerFileGroup for {}.", shuffleId, e);
+        exceptionMsg = e.getMessage();
       }
-      return null;
+      return Tuple2.apply(null, exceptionMsg);
     }
   }
 
-  protected ReduceFileGroups updateFileGroup(int shuffleId, int partitionId) throws IOException {
-    return reduceFileGroupsMap.computeIfAbsent(shuffleId, (id) -> loadFileGroupInternal(shuffleId));
+  protected ReduceFileGroups updateFileGroup(int shuffleId, int partitionId)
+      throws CelebornIOException {
+    if (reduceFileGroupsMap.containsKey(shuffleId)) {
+      return reduceFileGroupsMap.get(shuffleId);
+    } else {
+      Tuple2<ReduceFileGroups, String> fileGroups = loadFileGroupInternal(shuffleId);
+      ReduceFileGroups newGroups = fileGroups._1;
+      if (newGroups == null) {
+        throw new CelebornIOException(
+            loadFileGroupException(shuffleId, partitionId, fileGroups._2));
+      }
+      reduceFileGroupsMap.put(shuffleId, newGroups);
+      return newGroups;
+    }
   }
 
-  protected ReduceFileGroups loadFileGroup(int shuffleId, int partitionId) throws IOException {
-    ReduceFileGroups reduceFileGroups = updateFileGroup(shuffleId, partitionId);
-    if (reduceFileGroups == null) {
-      String msg =
-          "Shuffle data lost for shuffle " + shuffleId + " partitionId " + partitionId + "!";
-      logger.error(msg);
-      throw new CelebornIOException(msg);
-    }
-    return reduceFileGroups;
+  protected String loadFileGroupException(int shuffleId, int partitionId, String exceptionMsg) {
+    return String.format(
+        "Failed to load file group of shuffle %d partition %d! %s",
+        shuffleId,
+        partitionId,
+        StringUtils.isEmpty(exceptionMsg) ? StringUtils.EMPTY : exceptionMsg);
   }
 
   @Override
@@ -1640,7 +1662,7 @@ public class ShuffleClientImpl extends ShuffleClient {
       logger.warn("Shuffle data is empty for shuffle {}: UNKNOWN_APP_SHUFFLE_ID.", shuffleId);
       return CelebornInputStream.empty();
     }
-    ReduceFileGroups fileGroups = loadFileGroup(shuffleId, partitionId);
+    ReduceFileGroups fileGroups = updateFileGroup(shuffleId, partitionId);
     if (fileGroups.partitionGroups.isEmpty()
         || !fileGroups.partitionGroups.containsKey(partitionId)) {
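
For context, the message produced by the new `loadFileGroupException` helper looks like the output of the following minimal sketch. The class name, the `main` method, and the sample cause string are hypothetical and only illustrate the change; the format string and the `StringUtils` handling follow the diff above.

```
import org.apache.commons.lang3.StringUtils;

// Hypothetical demo class, not part of this commit.
public class LoadFileGroupMessageDemo {

  // Same format as the new ShuffleClientImpl#loadFileGroupException in the diff above.
  static String loadFileGroupException(int shuffleId, int partitionId, String exceptionMsg) {
    return String.format(
        "Failed to load file group of shuffle %d partition %d! %s",
        shuffleId,
        partitionId,
        StringUtils.isEmpty(exceptionMsg) ? StringUtils.EMPTY : exceptionMsg);
  }

  public static void main(String[] args) {
    // Hypothetical cause string, e.g. what loadFileGroupInternal records for a stage-end timeout.
    String cause = "Request GetReducerFileGroup(1) return STAGE_END_TIME_OUT for 1.";
    // Prints: Failed to load file group of shuffle 1 partition 15! Request GetReducerFileGroup(1) return STAGE_END_TIME_OUT for 1.
    System.out.println(loadFileGroupException(1, 15, cause));
    // With no recorded cause the message still names the shuffle and partition.
    System.out.println(loadFileGroupException(1, 15, null));
  }
}
```

Compared with the old message from the stack trace above, `Shuffle data lost for shuffle 1 partitionId 15!`, the new message carries the actual cause instead of always claiming data loss.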