This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.4 by this push:
     new c6af7ccbf [CELEBORN-1217] Improve exception message of loadFileGroup 
for ShuffleClientImpl
c6af7ccbf is described below

commit c6af7ccbfc8130e1a7ed87c4a638c98ded5b9273
Author: SteNicholas <programg...@163.com>
AuthorDate: Thu Jan 11 11:04:53 2024 +0800

    [CELEBORN-1217] Improve exception message of loadFileGroup for 
ShuffleClientImpl
    
    ### What changes were proposed in this pull request?
    
    Improve the exception message of `loadFileGroup` in `ShuffleClientImpl`.
    
    ### Why are the changes needed?
    
    The exception message of `ShuffleClientImpl#loadFileGroup`, 
`org.apache.celeborn.common.exception.CelebornIOException: Shuffle data lost 
for shuffle %s partitionId %s!`, is confusing to users: it is raised not only 
when shuffle data is lost but also in other failure situations, such as a 
stage end timeout. It's recommended to improve the exception message of 
`loadFileGroup` for `ShuffleClientImpl`.
    
    ```
    Caused by: org.apache.kyuubi.jdbc.hive.KyuubiSQLException: 
org.apache.kyuubi.KyuubiSQLException: org.apache.kyuubi.KyuubiSQLException: 
Error operating ExecuteStatement: org.apache.spark.SparkException: Job aborted 
due to stage failure: Task 15 in stage 60.0 failed 4 times, most recent 
failure: Lost task 15.3 in stage 60.0 (TID 170802) (xxx executor 60): 
org.apache.celeborn.common.exception.CelebornIOException: Shuffle data lost for 
shuffle 1 partitionId 15!
            at 
org.apache.celeborn.client.ShuffleClientImpl.loadFileGroup(ShuffleClientImpl.java:1591)
            at 
org.apache.celeborn.client.ShuffleClientImpl.readPartition(ShuffleClientImpl.java:1600)
            at 
org.apache.spark.shuffle.celeborn.CelebornShuffleReader.$anonfun$read$1(CelebornShuffleReader.scala:88)
            at 
org.apache.spark.shuffle.celeborn.CelebornShuffleReader.$anonfun$read$1$adapted(CelebornShuffleReader.scala:80)
            at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
            at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
            at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
            at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
            at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
            at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
            at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
            at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage37.sort_addToSorter_0$(Unknown
 Source)
            at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage37.processNext(Unknown
 Source)
            at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
            at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:753)
            at 
org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
            at 
org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedBufferedToRowWithNullFreeJoinKey(SortMergeJoinExec.scala:822)
            at 
org.apache.spark.sql.execution.joins.SortMergeJoinScanner.<init>(SortMergeJoinExec.scala:686)
            at 
org.apache.spark.sql.execution.joins.SortMergeJoinExec.$anonfun$doExecute$1(SortMergeJoinExec.scala:185)
            at 
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:398)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:362)
            at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:398)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:362)
            at 
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:398)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:362)
            at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:398)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:362)
            at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:398)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:362)
            at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:91)
            at org.apache.spark.scheduler.Task.run(Task.scala:143)
            at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:591)
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1545)
            at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:596)
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Internal test.
    
    Closes #2219 from SteNicholas/CELEBORN-1217.
    
    Authored-by: SteNicholas <programg...@163.com>
    Signed-off-by: mingji <fengmingxiao....@alibaba-inc.com>
    (cherry picked from commit 8bf1a059104824ced8151668895f6e69406b3682)
    Signed-off-by: mingji <fengmingxiao....@alibaba-inc.com>
---
 .../flink/readclient/FlinkShuffleClientImpl.java   |  19 ++--
 .../apache/celeborn/client/ShuffleClientImpl.java  | 110 ++++++++++++---------
 2 files changed, 77 insertions(+), 52 deletions(-)

diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
index 5e2974547..6ced153c4 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import scala.Tuple2;
 import scala.reflect.ClassTag$;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -145,7 +146,7 @@ public class FlinkShuffleClientImpl extends 
ShuffleClientImpl {
       int shuffleId, int partitionId, int subPartitionIndexStart, int 
subPartitionIndexEnd)
       throws IOException {
     String shuffleKey = Utils.makeShuffleKey(appUniqueId, shuffleId);
-    ReduceFileGroups fileGroups = loadFileGroup(shuffleId, partitionId);
+    ReduceFileGroups fileGroups = updateFileGroup(shuffleId, partitionId);
     if (fileGroups.partitionGroups.size() == 0
         || !fileGroups.partitionGroups.containsKey(partitionId)) {
       logger.error("Shuffle data is empty for shuffle {} partitionId {}.", 
shuffleId, partitionId);
@@ -170,7 +171,8 @@ public class FlinkShuffleClientImpl extends 
ShuffleClientImpl {
   }
 
   @Override
-  protected ReduceFileGroups updateFileGroup(int shuffleId, int partitionId) 
throws IOException {
+  protected ReduceFileGroups updateFileGroup(int shuffleId, int partitionId)
+      throws CelebornIOException {
     ReduceFileGroups reduceFileGroups =
         reduceFileGroupsMap.computeIfAbsent(shuffleId, (id) -> new 
ReduceFileGroups());
     if (reduceFileGroups.partitionIds != null
@@ -186,14 +188,15 @@ public class FlinkShuffleClientImpl extends 
ShuffleClientImpl {
               Utils.makeReducerKey(shuffleId, partitionId));
         } else {
           // refresh file groups
-          ReduceFileGroups newGroups = loadFileGroupInternal(shuffleId);
+          Tuple2<ReduceFileGroups, String> fileGroups = 
loadFileGroupInternal(shuffleId);
+          ReduceFileGroups newGroups = fileGroups._1;
           if (newGroups == null) {
-            throw new IOException(
-                "Load file group from lifecycle manager failed: "
-                    + Utils.makeReducerKey(shuffleId, partitionId));
+            throw new CelebornIOException(
+                loadFileGroupException(shuffleId, partitionId, fileGroups._2));
           } else if (!newGroups.partitionIds.contains(partitionId)) {
-            throw new IOException(
-                "shuffle data lost for partition: " + 
Utils.makeReducerKey(shuffleId, partitionId));
+            throw new CelebornIOException(
+                String.format(
+                    "Shuffle data lost for shuffle %d partition %d.", 
shuffleId, partitionId));
           }
           reduceFileGroups.update(newGroups);
         }
diff --git 
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index a7b4e7083..b995bc13a 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -25,11 +25,13 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import scala.Tuple2;
 import scala.reflect.ClassTag$;
 
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.Unpooled;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1567,64 +1569,84 @@ public class ShuffleClientImpl extends ShuffleClient {
     return true;
   }
 
-  protected ReduceFileGroups loadFileGroupInternal(int shuffleId) {
+  protected Tuple2<ReduceFileGroups, String> loadFileGroupInternal(int 
shuffleId) {
     {
       long getReducerFileGroupStartTime = System.nanoTime();
+      String exceptionMsg = null;
       try {
         if (lifecycleManagerRef == null) {
-          logger.warn("Driver endpoint is null!");
-          return null;
-        }
+          exceptionMsg = "Driver endpoint is null!";
+          logger.warn(exceptionMsg);
+        } else {
+          GetReducerFileGroup getReducerFileGroup = new 
GetReducerFileGroup(shuffleId);
 
-        GetReducerFileGroup getReducerFileGroup = new 
GetReducerFileGroup(shuffleId);
+          GetReducerFileGroupResponse response =
+              lifecycleManagerRef.askSync(
+                  getReducerFileGroup,
+                  conf.clientRpcGetReducerFileGroupRpcAskTimeout(),
+                  ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class));
 
-        GetReducerFileGroupResponse response =
-            lifecycleManagerRef.askSync(
-                getReducerFileGroup,
-                conf.clientRpcGetReducerFileGroupRpcAskTimeout(),
-                ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class));
-
-        switch (response.status()) {
-          case SUCCESS:
-            logger.info(
-                "Shuffle {} request reducer file group success using {} ms, 
result partition size {}.",
-                shuffleId,
-                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
getReducerFileGroupStartTime),
-                response.fileGroup().size());
-            return new ReduceFileGroups(
-                response.fileGroup(), response.attempts(), 
response.partitionIds());
-          case STAGE_END_TIME_OUT:
-          case SHUFFLE_DATA_LOST:
-            logger.warn(
-                "Request {} return {} for {}.", getReducerFileGroup, 
response.status(), shuffleId);
-            return null;
-          case SHUFFLE_NOT_REGISTERED:
-            logger.warn(
-                "Request {} return {} for {}.", getReducerFileGroup, 
response.status(), shuffleId);
-            // return empty result
-            return new ReduceFileGroups(
-                response.fileGroup(), response.attempts(), 
response.partitionIds());
+          switch (response.status()) {
+            case SUCCESS:
+              logger.info(
+                  "Shuffle {} request reducer file group success using {} ms, 
result partition size {}.",
+                  shuffleId,
+                  TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
getReducerFileGroupStartTime),
+                  response.fileGroup().size());
+              return Tuple2.apply(
+                  new ReduceFileGroups(
+                      response.fileGroup(), response.attempts(), 
response.partitionIds()),
+                  null);
+            case SHUFFLE_NOT_REGISTERED:
+              logger.warn(
+                  "Request {} return {} for {}.",
+                  getReducerFileGroup,
+                  response.status(),
+                  shuffleId);
+              // return empty result
+              return Tuple2.apply(
+                  new ReduceFileGroups(
+                      response.fileGroup(), response.attempts(), 
response.partitionIds()),
+                  null);
+            case STAGE_END_TIME_OUT:
+            case SHUFFLE_DATA_LOST:
+              exceptionMsg =
+                  String.format(
+                      "Request %s return %s for %s.",
+                      getReducerFileGroup, response.status(), shuffleId);
+              logger.warn(exceptionMsg);
+          }
         }
       } catch (Exception e) {
         logger.error("Exception raised while call GetReducerFileGroup for 
{}.", shuffleId, e);
+        exceptionMsg = e.getMessage();
       }
-      return null;
+      return Tuple2.apply(null, exceptionMsg);
     }
   }
 
-  protected ReduceFileGroups updateFileGroup(int shuffleId, int partitionId) 
throws IOException {
-    return reduceFileGroupsMap.computeIfAbsent(shuffleId, (id) -> 
loadFileGroupInternal(shuffleId));
+  protected ReduceFileGroups updateFileGroup(int shuffleId, int partitionId)
+      throws CelebornIOException {
+    if (reduceFileGroupsMap.containsKey(shuffleId)) {
+      return reduceFileGroupsMap.get(shuffleId);
+    } else {
+      Tuple2<ReduceFileGroups, String> fileGroups = 
loadFileGroupInternal(shuffleId);
+      ReduceFileGroups newGroups = fileGroups._1;
+      if (newGroups == null) {
+        throw new CelebornIOException(
+            loadFileGroupException(shuffleId, partitionId, fileGroups._2));
+      }
+      reduceFileGroupsMap.put(shuffleId, newGroups);
+      return newGroups;
+    }
   }
 
-  protected ReduceFileGroups loadFileGroup(int shuffleId, int partitionId) 
throws IOException {
-    ReduceFileGroups reduceFileGroups = updateFileGroup(shuffleId, 
partitionId);
-    if (reduceFileGroups == null) {
-      String msg =
-          "Shuffle data lost for shuffle " + shuffleId + " partitionId " + 
partitionId + "!";
-      logger.error(msg);
-      throw new CelebornIOException(msg);
-    }
-    return reduceFileGroups;
+  protected String loadFileGroupException(int shuffleId, int partitionId, 
String exceptionMsg) {
+    return String.format(
+        "Failed to load file group of shuffle %d partition %d! %s",
+        shuffleId,
+        partitionId,
+        StringUtils.isEmpty(exceptionMsg) ? StringUtils.EMPTY : exceptionMsg);
   }
 
   @Override
@@ -1640,7 +1662,7 @@ public class ShuffleClientImpl extends ShuffleClient {
       logger.warn("Shuffle data is empty for shuffle {}: 
UNKNOWN_APP_SHUFFLE_ID.", shuffleId);
       return CelebornInputStream.empty();
     }
-    ReduceFileGroups fileGroups = loadFileGroup(shuffleId, partitionId);
+    ReduceFileGroups fileGroups = updateFileGroup(shuffleId, partitionId);
 
     if (fileGroups.partitionGroups.isEmpty()
         || !fileGroups.partitionGroups.containsKey(partitionId)) {

Reply via email to