This is an automated email from the ASF dual-hosted git repository.

ckj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 7cb0b000 [#750] feat: add RssFetchFailedException (#751)
7cb0b000 is described below

commit 7cb0b000cdf7260db3ac77f9d1742b8496c24cb2
Author: advancedxy <[email protected]>
AuthorDate: Wed Mar 22 11:13:53 2023 +0800

    [#750] feat: add RssFetchFailedException (#751)
    
    ### What changes were proposed in this pull request?
    1. add a new exception called RssFetchFailedException
    2. it's used in various instance creation
    
    ### Why are the changes needed?
    Fix: #750
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing UTs.
---
 .../org/apache/uniffle/client/impl/ShuffleReadClientImpl.java |  4 ++--
 .../apache/uniffle/client/impl/ShuffleWriteClientImpl.java    |  6 ++++--
 .../apache/uniffle/common/exception/NotRetryException.java    |  1 -
 .../org/apache/uniffle/common/exception/RssException.java     |  1 -
 .../{NotRetryException.java => RssFetchFailedException.java}  | 11 ++++++-----
 .../uniffle/client/impl/grpc/ShuffleServerGrpcClient.java     | 11 ++++++-----
 .../storage/handler/impl/ComposedClientReadHandler.java       |  7 ++++++-
 .../storage/handler/impl/LocalFileClientReadHandler.java      |  5 ++++-
 .../uniffle/storage/handler/impl/MemoryClientReadHandler.java |  6 ++++--
 9 files changed, 32 insertions(+), 20 deletions(-)

diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index a7ff8ef0..852a253f 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -36,7 +36,7 @@ import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShuffleServerInfo;
-import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.exception.RssFetchFailedException;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.common.util.IdHelper;
 import org.apache.uniffle.common.util.RssUtils;
@@ -201,7 +201,7 @@ public class ShuffleReadClientImpl implements 
ShuffleReadClient {
             clientReadHandler.updateConsumedBlockInfo(bs, true);
             continue;
           } else {
-            throw new RssException(errMsg);
+            throw new RssFetchFailedException(errMsg);
           }
         }
 
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 45d8ed74..05b3be68 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -85,6 +85,7 @@ import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.exception.RssFetchFailedException;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.ThreadUtils;
 
@@ -584,7 +585,7 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
       }
     }
     if (!isSuccessful) {
-      throw new RssException("Get shuffle result is failed for appId["
+      throw new RssFetchFailedException("Get shuffle result is failed for 
appId["
           + appId + "], shuffleId[" + shuffleId + "]");
     }
     return blockIdBitmap;
@@ -625,7 +626,8 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
     }
     boolean isSuccessful = partitionReadSuccess.entrySet().stream().allMatch(x 
-> x.getValue() >= replicaRead);
     if (!isSuccessful) {
-      throw new RssException("Get shuffle result is failed for appId[" + appId 
+ "], shuffleId[" + shuffleId + "]");
+      throw new RssFetchFailedException(
+          "Get shuffle result is failed for appId[" + appId + "], shuffleId[" 
+ shuffleId + "]");
     }
     return blockIdBitmap;
   }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/exception/NotRetryException.java
 
b/common/src/main/java/org/apache/uniffle/common/exception/NotRetryException.java
index 49eaee64..5b230f6d 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/exception/NotRetryException.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/exception/NotRetryException.java
@@ -15,7 +15,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.uniffle.common.exception;
 
 public class NotRetryException extends RssException {
diff --git 
a/common/src/main/java/org/apache/uniffle/common/exception/RssException.java 
b/common/src/main/java/org/apache/uniffle/common/exception/RssException.java
index 93a73690..c48aac2d 100644
--- a/common/src/main/java/org/apache/uniffle/common/exception/RssException.java
+++ b/common/src/main/java/org/apache/uniffle/common/exception/RssException.java
@@ -15,7 +15,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.uniffle.common.exception;
 
 public class RssException extends RuntimeException {
diff --git 
a/common/src/main/java/org/apache/uniffle/common/exception/NotRetryException.java
 
b/common/src/main/java/org/apache/uniffle/common/exception/RssFetchFailedException.java
similarity index 77%
copy from 
common/src/main/java/org/apache/uniffle/common/exception/NotRetryException.java
copy to 
common/src/main/java/org/apache/uniffle/common/exception/RssFetchFailedException.java
index 49eaee64..7c696c55 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/exception/NotRetryException.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/exception/RssFetchFailedException.java
@@ -15,16 +15,17 @@
  * limitations under the License.
  */
 
-
 package org.apache.uniffle.common.exception;
 
-public class NotRetryException extends RssException {
-
-  public NotRetryException(String message) {
+/**
+ * Dedicated exception for rss client's shuffle failed related exception.
+ */
+public class RssFetchFailedException extends RssException {
+  public RssFetchFailedException(String message) {
     super(message);
   }
 
-  public NotRetryException(String message, Throwable e) {
+  public RssFetchFailedException(String message, Throwable e) {
     super(message, e);
   }
 }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index c9b6016b..d9208f02 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -62,6 +62,7 @@ import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.exception.NotRetryException;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.exception.RssFetchFailedException;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.RetryUtils;
 import org.apache.uniffle.common.util.RssUtils;
@@ -522,7 +523,7 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
             + " for [appId=" + request.getAppId() + ", shuffleId=" + 
request.getShuffleId()
             + ", errorMsg:" + rpcResponse.getRetMsg();
         LOG.error(msg);
-        throw new RssException(msg);
+        throw new RssFetchFailedException(msg);
     }
 
     return response;
@@ -554,7 +555,7 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
             + " for [appId=" + request.getAppId() + ", shuffleId=" + 
request.getShuffleId()
             + ", errorMsg:" + rpcResponse.getRetMsg();
         LOG.error(msg);
-        throw new RssException(msg);
+        throw new RssFetchFailedException(msg);
     }
 
     return response;
@@ -593,7 +594,7 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
         String msg = "Can't get shuffle data from " + host + ":" + port
             + " for " + requestInfo + ", errorMsg:" + rpcResponse.getRetMsg();
         LOG.error(msg);
-        throw new RssException(msg);
+        throw new RssFetchFailedException(msg);
     }
     return response;
   }
@@ -628,7 +629,7 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
         String msg = "Can't get shuffle index from " + host + ":" + port
             + " for " + requestInfo + ", errorMsg:" + rpcResponse.getRetMsg();
         LOG.error(msg);
-        throw new RssException(msg);
+        throw new RssFetchFailedException(msg);
     }
     return response;
   }
@@ -677,7 +678,7 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
         String msg = "Can't get shuffle in memory data from " + host + ":" + 
port
             + " for " + requestInfo + ", errorMsg:" + rpcResponse.getRetMsg();
         LOG.error(msg);
-        throw new RssException(msg);
+        throw new RssFetchFailedException(msg);
     }
     return response;
   }
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
index 1f00edbb..91f34951 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
@@ -33,6 +33,7 @@ import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.exception.RssFetchFailedException;
 import org.apache.uniffle.storage.handler.ClientReadHandlerMetric;
 import org.apache.uniffle.storage.handler.api.ClientReadHandler;
 
@@ -98,8 +99,12 @@ public class ComposedClientReadHandler extends 
AbstractClientReadHandler {
     ShuffleDataResult shuffleDataResult;
     try {
       shuffleDataResult = handler.readShuffleData();
+    } catch (RssFetchFailedException e) {
+      Throwable cause = e.getCause();
+      String message = "Failed to read shuffle data from " + 
currentTier.name() + "handler, error: " + e.getMessage();
+      throw new RssFetchFailedException(message, cause);
     } catch (Exception e) {
-      throw new RssException("Failed to read shuffle data from " + 
currentTier.name() + " handler", e);
+      throw new RssFetchFailedException("Failed to read shuffle data from " + 
currentTier.name() + " handler", e);
     }
     // when is no data for current handler, and the upmostLevel is not reached,
     // then try next one if there has
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
index a2d96940..61ecc3ce 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
@@ -30,6 +30,7 @@ import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShuffleDataSegment;
 import org.apache.uniffle.common.ShuffleIndexResult;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.exception.RssFetchFailedException;
 
 public class LocalFileClientReadHandler extends DataSkippableReadHandler {
   private static final Logger LOG = LoggerFactory.getLogger(
@@ -88,8 +89,10 @@ public class LocalFileClientReadHandler extends 
DataSkippableReadHandler {
         appId, shuffleId, partitionId, partitionNumPerRange, partitionNum);
     try {
       shuffleIndexResult = 
shuffleServerClient.getShuffleIndex(request).getShuffleIndexResult();
+    } catch (RssFetchFailedException e) {
+      throw e;
     } catch (Exception e) {
-      throw new RssException("Failed to read shuffle index for appId[" + appId 
+ "], shuffleId["
+      throw new RssFetchFailedException("Failed to read shuffle index for 
appId[" + appId + "], shuffleId["
         + shuffleId + "], partitionId[" + partitionId + "]", e);
     }
     return shuffleIndexResult;
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
index 49eaa0b0..b154261e 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
@@ -28,7 +28,7 @@ import 
org.apache.uniffle.client.request.RssGetInMemoryShuffleDataRequest;
 import org.apache.uniffle.client.response.RssGetInMemoryShuffleDataResponse;
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataResult;
-import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.exception.RssFetchFailedException;
 import org.apache.uniffle.common.util.Constants;
 
 
@@ -71,9 +71,11 @@ public class MemoryClientReadHandler extends 
AbstractClientReadHandler {
       RssGetInMemoryShuffleDataResponse response =
           shuffleServerClient.getInMemoryShuffleData(request);
       result = new ShuffleDataResult(response.getData(), 
response.getBufferSegments());
+    } catch (RssFetchFailedException e) {
+      throw e;
     } catch (Exception e) {
       // todo: fault tolerance solution should be added
-      throw new RssException("Failed to read in memory shuffle data with "
+      throw new RssFetchFailedException("Failed to read in memory shuffle data 
with "
           + shuffleServerClient.getClientInfo() + " due to " + e);
     }
 

Reply via email to