This is an automated email from the ASF dual-hosted git repository.

rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push: new f984efb0c [#1894] fix(server): Fix NPE caused by app not found issue (#1915) f984efb0c is described below commit f984efb0c4681619182dc777c1cd365c98238406 Author: maobaolong <baoloong...@tencent.com> AuthorDate: Wed Jul 17 14:13:43 2024 +0800 [#1894] fix(server): Fix NPE caused by app not found issue (#1915) ### What changes were proposed in this pull request? Fix the NPE issue within shuffle server. ### Why are the changes needed? Fix: #1894 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Tested on our test cluster. --- .../org/apache/uniffle/common/rpc/StatusCode.java | 2 ++ .../client/impl/grpc/ShuffleServerGrpcClient.java | 7 +++++-- .../impl/grpc/ShuffleServerGrpcNettyClient.java | 4 ++-- proto/src/main/proto/Rss.proto | 2 ++ .../uniffle/server/ShuffleServerGrpcService.java | 20 ++++++++++++++++++++ .../apache/uniffle/server/ShuffleServerMetrics.java | 4 ++++ .../server/netty/ShuffleServerNettyHandler.java | 17 +++++++++++++++++ 7 files changed, 52 insertions(+), 4 deletions(-) diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java b/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java index ff8ac231c..4b891440d 100644 --- a/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java +++ b/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java @@ -36,6 +36,8 @@ public enum StatusCode { INVALID_REQUEST(9), NO_BUFFER_FOR_HUGE_PARTITION(10), STAGE_RETRY_IGNORE(11), + APP_NOT_FOUND(13), + INTERNAL_NOT_RETRY_ERROR(14), UNKNOWN(-1); static final Map<Integer, StatusCode> VALUE_MAP = diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java index 14dbf2f60..988b7a7f0 100644 --- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java @@ -130,6 +130,9 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer protected Random random = new Random(); protected static final int BACK_OFF_BASE = 2000; + static final List<StatusCode> NOT_RETRY_STATUS_CODES = + Lists.newArrayList( + StatusCode.NO_REGISTER, StatusCode.APP_NOT_FOUND, StatusCode.INTERNAL_NOT_RETRY_ERROR); @VisibleForTesting public ShuffleServerGrpcClient(String host, int port) { @@ -595,7 +598,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer + ", errorMsg:" + response.getRetMsg(); failedStatusCode.set(StatusCode.fromCode(response.getStatus().getNumber())); - if (response.getStatus() == RssProtos.StatusCode.NO_REGISTER) { + if (NOT_RETRY_STATUS_CODES.contains(failedStatusCode.get())) { throw new NotRetryException(msg); } else { throw new RssException(msg); @@ -606,7 +609,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer null, request.getRetryIntervalMax(), maxRetryAttempts, - t -> !(t instanceof OutOfMemoryError)); + t -> !(t instanceof OutOfMemoryError) && !(t instanceof NotRetryException)); } catch (Throwable throwable) { LOG.warn("Failed to send shuffle data due to ", throwable); isSuccessful = false; diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java index 26e53851d..6da8788d8 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java @@ -217,7 +217,7 @@ public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient 
{ + rpcResponse.getStatusCode() + ", errorMsg:" + rpcResponse.getRetMessage(); - if (rpcResponse.getStatusCode() == StatusCode.NO_REGISTER) { + if (NOT_RETRY_STATUS_CODES.contains(rpcResponse.getStatusCode())) { throw new NotRetryException(msg); } else { throw new RssException(msg); @@ -228,7 +228,7 @@ public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient { null, request.getRetryIntervalMax(), maxRetryAttempts, - t -> !(t instanceof OutOfMemoryError)); + t -> !(t instanceof OutOfMemoryError) && !(t instanceof NotRetryException)); } catch (Throwable throwable) { LOG.warn("Failed to send shuffle data due to ", throwable); isSuccessful = false; diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto index 97928bf20..5ce36df7e 100644 --- a/proto/src/main/proto/Rss.proto +++ b/proto/src/main/proto/Rss.proto @@ -308,6 +308,8 @@ enum StatusCode { INVALID_REQUEST = 9; NO_BUFFER_FOR_HUGE_PARTITION = 10; STAGE_RETRY_IGNORE = 11; + APP_NOT_FOUND = 13; + INTERNAL_NOT_RETRY_ERROR = 14; // add more status } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java index b6e37029f..aea43d24e 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java @@ -254,6 +254,26 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase { long timestamp = req.getTimestamp(); int stageAttemptNumber = req.getStageAttemptNumber(); ShuffleTaskInfo taskInfo = shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(appId); + if (taskInfo == null) { + String errorMsg = + "APP_NOT_FOUND error, requireBufferId[" + + requireBufferId + + "] for appId[" + + appId + + "], shuffleId[" + + shuffleId + + "]"; + LOG.error(errorMsg); + ShuffleServerMetrics.counterAppNotFound.inc(); + reply = + SendShuffleDataResponse.newBuilder() + 
.setStatus(StatusCode.APP_NOT_FOUND.toProto()) + .setRetMsg(errorMsg) + .build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + return; + } Integer latestStageAttemptNumber = taskInfo.getLatestStageAttemptNumber(shuffleId); // The Stage retry occurred, and the task before StageNumber was simply ignored and not // processed if the task was being sent. diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java index f97978c0f..8eada73c5 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java @@ -134,6 +134,7 @@ public class ShuffleServerMetrics { private static final String TOTAL_EXPIRED_PRE_ALLOCATED_BUFFER_NUM = "total_expired_preAllocated_buffer_num"; + private static final String TOTAL_APP_NOT_FOUND_NUM = "total_app_not_found_num"; private static final String TOTAL_REMOVE_RESOURCE_TIME = "total_remove_resource_time"; private static final String TOTAL_REMOVE_RESOURCE_BY_SHUFFLE_IDS_TIME = @@ -237,6 +238,7 @@ public class ShuffleServerMetrics { public static Counter counterLocalFileEventFlush; public static Counter counterHadoopEventFlush; public static Counter counterPreAllocatedBufferExpired; + public static Counter counterAppNotFound; private static MetricsManager metricsManager; private static boolean isRegister = false; @@ -462,6 +464,8 @@ public class ShuffleServerMetrics { counterPreAllocatedBufferExpired = metricsManager.addCounter(TOTAL_EXPIRED_PRE_ALLOCATED_BUFFER_NUM); + counterAppNotFound = metricsManager.addCounter(TOTAL_APP_NOT_FOUND_NUM); + summaryTotalRemoveResourceTime = metricsManager.addSummary(TOTAL_REMOVE_RESOURCE_TIME); summaryTotalRemoveResourceByShuffleIdsTime = metricsManager.addSummary(TOTAL_REMOVE_RESOURCE_BY_SHUFFLE_IDS_TIME); diff --git 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java index 448c12a23..d297603b9 100644 --- a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java +++ b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java @@ -105,6 +105,23 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler { long timestamp = req.getTimestamp(); int stageAttemptNumber = req.getStageAttemptNumber(); ShuffleTaskInfo taskInfo = shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(appId); + if (taskInfo == null) { + rpcResponse = + new RpcResponse( + req.getRequestId(), StatusCode.APP_NOT_FOUND, "appId: " + appId + " not found"); + String errorMsg = + "APP_NOT_FOUND error, requireBufferId[" + + requireBufferId + + "] for appId[" + + appId + + "], shuffleId[" + + shuffleId + + "]"; + LOG.error(errorMsg); + ShuffleServerMetrics.counterAppNotFound.inc(); + client.getChannel().writeAndFlush(rpcResponse); + return; + } Integer latestStageAttemptNumber = taskInfo.getLatestStageAttemptNumber(shuffleId); // The Stage retry occurred, and the task before StageNumber was simply ignored and not // processed if the task was being sent.