This is an automated email from the ASF dual-hosted git repository.
ckj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 7cb0b000 [#750] feat: add RssFetchFailedException (#751)
7cb0b000 is described below
commit 7cb0b000cdf7260db3ac77f9d1742b8496c24cb2
Author: advancedxy <[email protected]>
AuthorDate: Wed Mar 22 11:13:53 2023 +0800
[#750] feat: add RssFetchFailedException (#751)
### What changes were proposed in this pull request?
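1. Add a new exception, RssFetchFailedException, which extends RssException.
2. Throw it instead of RssException in the client-side code paths where fetching shuffle data, the shuffle index, or the shuffle result fails.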
### Why are the changes needed?
Fix: #750
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Covered by the existing unit tests.
---
.../org/apache/uniffle/client/impl/ShuffleReadClientImpl.java | 4 ++--
.../apache/uniffle/client/impl/ShuffleWriteClientImpl.java | 6 ++++--
.../apache/uniffle/common/exception/NotRetryException.java | 1 -
.../org/apache/uniffle/common/exception/RssException.java | 1 -
.../{NotRetryException.java => RssFetchFailedException.java} | 11 ++++++-----
.../uniffle/client/impl/grpc/ShuffleServerGrpcClient.java | 11 ++++++-----
.../storage/handler/impl/ComposedClientReadHandler.java | 7 ++++++-
.../storage/handler/impl/LocalFileClientReadHandler.java | 5 ++++-
.../uniffle/storage/handler/impl/MemoryClientReadHandler.java | 6 ++++--
9 files changed, 32 insertions(+), 20 deletions(-)
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index a7ff8ef0..852a253f 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -36,7 +36,7 @@ import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleServerInfo;
-import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.exception.RssFetchFailedException;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.IdHelper;
import org.apache.uniffle.common.util.RssUtils;
@@ -201,7 +201,7 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
clientReadHandler.updateConsumedBlockInfo(bs, true);
continue;
} else {
- throw new RssException(errMsg);
+ throw new RssFetchFailedException(errMsg);
}
}
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 45d8ed74..05b3be68 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -85,6 +85,7 @@ import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.exception.RssFetchFailedException;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.ThreadUtils;
@@ -584,7 +585,7 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
}
}
if (!isSuccessful) {
- throw new RssException("Get shuffle result is failed for appId["
+ throw new RssFetchFailedException("Get shuffle result is failed for
appId["
+ appId + "], shuffleId[" + shuffleId + "]");
}
return blockIdBitmap;
@@ -625,7 +626,8 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
}
boolean isSuccessful = partitionReadSuccess.entrySet().stream().allMatch(x
-> x.getValue() >= replicaRead);
if (!isSuccessful) {
- throw new RssException("Get shuffle result is failed for appId[" + appId
+ "], shuffleId[" + shuffleId + "]");
+ throw new RssFetchFailedException(
+ "Get shuffle result is failed for appId[" + appId + "], shuffleId["
+ shuffleId + "]");
}
return blockIdBitmap;
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/exception/NotRetryException.java
b/common/src/main/java/org/apache/uniffle/common/exception/NotRetryException.java
index 49eaee64..5b230f6d 100644
---
a/common/src/main/java/org/apache/uniffle/common/exception/NotRetryException.java
+++
b/common/src/main/java/org/apache/uniffle/common/exception/NotRetryException.java
@@ -15,7 +15,6 @@
* limitations under the License.
*/
-
package org.apache.uniffle.common.exception;
public class NotRetryException extends RssException {
diff --git
a/common/src/main/java/org/apache/uniffle/common/exception/RssException.java
b/common/src/main/java/org/apache/uniffle/common/exception/RssException.java
index 93a73690..c48aac2d 100644
--- a/common/src/main/java/org/apache/uniffle/common/exception/RssException.java
+++ b/common/src/main/java/org/apache/uniffle/common/exception/RssException.java
@@ -15,7 +15,6 @@
* limitations under the License.
*/
-
package org.apache.uniffle.common.exception;
public class RssException extends RuntimeException {
diff --git
a/common/src/main/java/org/apache/uniffle/common/exception/NotRetryException.java
b/common/src/main/java/org/apache/uniffle/common/exception/RssFetchFailedException.java
similarity index 77%
copy from
common/src/main/java/org/apache/uniffle/common/exception/NotRetryException.java
copy to
common/src/main/java/org/apache/uniffle/common/exception/RssFetchFailedException.java
index 49eaee64..7c696c55 100644
---
a/common/src/main/java/org/apache/uniffle/common/exception/NotRetryException.java
+++
b/common/src/main/java/org/apache/uniffle/common/exception/RssFetchFailedException.java
@@ -15,16 +15,17 @@
* limitations under the License.
*/
-
package org.apache.uniffle.common.exception;
-public class NotRetryException extends RssException {
-
- public NotRetryException(String message) {
+/**
+ * Dedicated exception for rss client's shuffle failed related exception.
+ */
+public class RssFetchFailedException extends RssException {
+ public RssFetchFailedException(String message) {
super(message);
}
- public NotRetryException(String message, Throwable e) {
+ public RssFetchFailedException(String message, Throwable e) {
super(message, e);
}
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index c9b6016b..d9208f02 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -62,6 +62,7 @@ import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.exception.NotRetryException;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.exception.RssFetchFailedException;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.common.util.RssUtils;
@@ -522,7 +523,7 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
+ " for [appId=" + request.getAppId() + ", shuffleId=" +
request.getShuffleId()
+ ", errorMsg:" + rpcResponse.getRetMsg();
LOG.error(msg);
- throw new RssException(msg);
+ throw new RssFetchFailedException(msg);
}
return response;
@@ -554,7 +555,7 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
+ " for [appId=" + request.getAppId() + ", shuffleId=" +
request.getShuffleId()
+ ", errorMsg:" + rpcResponse.getRetMsg();
LOG.error(msg);
- throw new RssException(msg);
+ throw new RssFetchFailedException(msg);
}
return response;
@@ -593,7 +594,7 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
String msg = "Can't get shuffle data from " + host + ":" + port
+ " for " + requestInfo + ", errorMsg:" + rpcResponse.getRetMsg();
LOG.error(msg);
- throw new RssException(msg);
+ throw new RssFetchFailedException(msg);
}
return response;
}
@@ -628,7 +629,7 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
String msg = "Can't get shuffle index from " + host + ":" + port
+ " for " + requestInfo + ", errorMsg:" + rpcResponse.getRetMsg();
LOG.error(msg);
- throw new RssException(msg);
+ throw new RssFetchFailedException(msg);
}
return response;
}
@@ -677,7 +678,7 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
String msg = "Can't get shuffle in memory data from " + host + ":" +
port
+ " for " + requestInfo + ", errorMsg:" + rpcResponse.getRetMsg();
LOG.error(msg);
- throw new RssException(msg);
+ throw new RssFetchFailedException(msg);
}
return response;
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
index 1f00edbb..91f34951 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
@@ -33,6 +33,7 @@ import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.exception.RssFetchFailedException;
import org.apache.uniffle.storage.handler.ClientReadHandlerMetric;
import org.apache.uniffle.storage.handler.api.ClientReadHandler;
@@ -98,8 +99,12 @@ public class ComposedClientReadHandler extends
AbstractClientReadHandler {
ShuffleDataResult shuffleDataResult;
try {
shuffleDataResult = handler.readShuffleData();
+ } catch (RssFetchFailedException e) {
+ Throwable cause = e.getCause();
+ String message = "Failed to read shuffle data from " +
currentTier.name() + "handler, error: " + e.getMessage();
+ throw new RssFetchFailedException(message, cause);
} catch (Exception e) {
- throw new RssException("Failed to read shuffle data from " +
currentTier.name() + " handler", e);
+ throw new RssFetchFailedException("Failed to read shuffle data from " +
currentTier.name() + " handler", e);
}
// when is no data for current handler, and the upmostLevel is not reached,
// then try next one if there has
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
index a2d96940..61ecc3ce 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
@@ -30,6 +30,7 @@ import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleDataSegment;
import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.exception.RssFetchFailedException;
public class LocalFileClientReadHandler extends DataSkippableReadHandler {
private static final Logger LOG = LoggerFactory.getLogger(
@@ -88,8 +89,10 @@ public class LocalFileClientReadHandler extends
DataSkippableReadHandler {
appId, shuffleId, partitionId, partitionNumPerRange, partitionNum);
try {
shuffleIndexResult =
shuffleServerClient.getShuffleIndex(request).getShuffleIndexResult();
+ } catch (RssFetchFailedException e) {
+ throw e;
} catch (Exception e) {
- throw new RssException("Failed to read shuffle index for appId[" + appId
+ "], shuffleId["
+ throw new RssFetchFailedException("Failed to read shuffle index for
appId[" + appId + "], shuffleId["
+ shuffleId + "], partitionId[" + partitionId + "]", e);
}
return shuffleIndexResult;
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
index 49eaa0b0..b154261e 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
@@ -28,7 +28,7 @@ import
org.apache.uniffle.client.request.RssGetInMemoryShuffleDataRequest;
import org.apache.uniffle.client.response.RssGetInMemoryShuffleDataResponse;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataResult;
-import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.exception.RssFetchFailedException;
import org.apache.uniffle.common.util.Constants;
@@ -71,9 +71,11 @@ public class MemoryClientReadHandler extends
AbstractClientReadHandler {
RssGetInMemoryShuffleDataResponse response =
shuffleServerClient.getInMemoryShuffleData(request);
result = new ShuffleDataResult(response.getData(),
response.getBufferSegments());
+ } catch (RssFetchFailedException e) {
+ throw e;
} catch (Exception e) {
// todo: fault tolerance solution should be added
- throw new RssException("Failed to read in memory shuffle data with "
+ throw new RssFetchFailedException("Failed to read in memory shuffle data
with "
+ shuffleServerClient.getClientInfo() + " due to " + e);
}