This is an automated email from the ASF dual-hosted git repository.

rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new f984efb0c [#1894] fix(server): Fix NPE caused by app not found issue (#1915)
f984efb0c is described below

commit f984efb0c4681619182dc777c1cd365c98238406
Author: maobaolong <baoloong...@tencent.com>
AuthorDate: Wed Jul 17 14:13:43 2024 +0800

    [#1894] fix(server): Fix NPE caused by app not found issue (#1915)
    
    ### What changes were proposed in this pull request?
    
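    Fix a NullPointerException in the shuffle server when it receives shuffle data for an app whose ShuffleTaskInfo cannot be found. Both the gRPC and Netty handlers now reply with a new APP_NOT_FOUND status code instead of dereferencing the null task info, and they increment a new total_app_not_found_num metric. On the client side, APP_NOT_FOUND and the new INTERNAL_NOT_RETRY_ERROR are treated like NO_REGISTER: they raise a NotRetryException, and the send loop no longer retries it.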
    
    ### Why are the changes needed?
    
    Fix: #1894
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Tested on our test cluster.
---
 .../org/apache/uniffle/common/rpc/StatusCode.java    |  2 ++
 .../client/impl/grpc/ShuffleServerGrpcClient.java    |  7 +++++--
 .../impl/grpc/ShuffleServerGrpcNettyClient.java      |  4 ++--
 proto/src/main/proto/Rss.proto                       |  2 ++
 .../uniffle/server/ShuffleServerGrpcService.java     | 20 ++++++++++++++++++++
 .../apache/uniffle/server/ShuffleServerMetrics.java  |  4 ++++
 .../server/netty/ShuffleServerNettyHandler.java      | 17 +++++++++++++++++
 7 files changed, 52 insertions(+), 4 deletions(-)

diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java 
b/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java
index ff8ac231c..4b891440d 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java
@@ -36,6 +36,8 @@ public enum StatusCode {
   INVALID_REQUEST(9),
   NO_BUFFER_FOR_HUGE_PARTITION(10),
   STAGE_RETRY_IGNORE(11),
+  APP_NOT_FOUND(13),
+  INTERNAL_NOT_RETRY_ERROR(14),
   UNKNOWN(-1);
 
   static final Map<Integer, StatusCode> VALUE_MAP =
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 14dbf2f60..988b7a7f0 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -130,6 +130,9 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
   protected Random random = new Random();
 
   protected static final int BACK_OFF_BASE = 2000;
+  static final List<StatusCode> NOT_RETRY_STATUS_CODES =
+      Lists.newArrayList(
+          StatusCode.NO_REGISTER, StatusCode.APP_NOT_FOUND, 
StatusCode.INTERNAL_NOT_RETRY_ERROR);
 
   @VisibleForTesting
   public ShuffleServerGrpcClient(String host, int port) {
@@ -595,7 +598,7 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
                         + ", errorMsg:"
                         + response.getRetMsg();
                 
failedStatusCode.set(StatusCode.fromCode(response.getStatus().getNumber()));
-                if (response.getStatus() == RssProtos.StatusCode.NO_REGISTER) {
+                if (NOT_RETRY_STATUS_CODES.contains(failedStatusCode.get())) {
                   throw new NotRetryException(msg);
                 } else {
                   throw new RssException(msg);
@@ -606,7 +609,7 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
             null,
             request.getRetryIntervalMax(),
             maxRetryAttempts,
-            t -> !(t instanceof OutOfMemoryError));
+            t -> !(t instanceof OutOfMemoryError) && !(t instanceof 
NotRetryException));
       } catch (Throwable throwable) {
         LOG.warn("Failed to send shuffle data due to ", throwable);
         isSuccessful = false;
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index 26e53851d..6da8788d8 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -217,7 +217,7 @@ public class ShuffleServerGrpcNettyClient extends 
ShuffleServerGrpcClient {
                         + rpcResponse.getStatusCode()
                         + ", errorMsg:"
                         + rpcResponse.getRetMessage();
-                if (rpcResponse.getStatusCode() == StatusCode.NO_REGISTER) {
+                if 
(NOT_RETRY_STATUS_CODES.contains(rpcResponse.getStatusCode())) {
                   throw new NotRetryException(msg);
                 } else {
                   throw new RssException(msg);
@@ -228,7 +228,7 @@ public class ShuffleServerGrpcNettyClient extends 
ShuffleServerGrpcClient {
             null,
             request.getRetryIntervalMax(),
             maxRetryAttempts,
-            t -> !(t instanceof OutOfMemoryError));
+            t -> !(t instanceof OutOfMemoryError) && !(t instanceof 
NotRetryException));
       } catch (Throwable throwable) {
         LOG.warn("Failed to send shuffle data due to ", throwable);
         isSuccessful = false;
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 97928bf20..5ce36df7e 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -308,6 +308,8 @@ enum StatusCode {
   INVALID_REQUEST = 9;
   NO_BUFFER_FOR_HUGE_PARTITION = 10;
   STAGE_RETRY_IGNORE = 11;
+  APP_NOT_FOUND = 13;
+  INTERNAL_NOT_RETRY_ERROR = 14;
   // add more status
 }
 
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index b6e37029f..aea43d24e 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -254,6 +254,26 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
     long timestamp = req.getTimestamp();
     int stageAttemptNumber = req.getStageAttemptNumber();
     ShuffleTaskInfo taskInfo = 
shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(appId);
+    if (taskInfo == null) {
+      String errorMsg =
+          "APP_NOT_FOUND error, requireBufferId["
+              + requireBufferId
+              + "] for appId["
+              + appId
+              + "], shuffleId["
+              + shuffleId
+              + "]";
+      LOG.error(errorMsg);
+      ShuffleServerMetrics.counterAppNotFound.inc();
+      reply =
+          SendShuffleDataResponse.newBuilder()
+              .setStatus(StatusCode.APP_NOT_FOUND.toProto())
+              .setRetMsg(errorMsg)
+              .build();
+      responseObserver.onNext(reply);
+      responseObserver.onCompleted();
+      return;
+    }
     Integer latestStageAttemptNumber = 
taskInfo.getLatestStageAttemptNumber(shuffleId);
     // The Stage retry occurred, and the task before StageNumber was simply 
ignored and not
     // processed if the task was being sent.
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index f97978c0f..8eada73c5 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -134,6 +134,7 @@ public class ShuffleServerMetrics {
 
   private static final String TOTAL_EXPIRED_PRE_ALLOCATED_BUFFER_NUM =
       "total_expired_preAllocated_buffer_num";
+  private static final String TOTAL_APP_NOT_FOUND_NUM = 
"total_app_not_found_num";
 
   private static final String TOTAL_REMOVE_RESOURCE_TIME = 
"total_remove_resource_time";
   private static final String TOTAL_REMOVE_RESOURCE_BY_SHUFFLE_IDS_TIME =
@@ -237,6 +238,7 @@ public class ShuffleServerMetrics {
   public static Counter counterLocalFileEventFlush;
   public static Counter counterHadoopEventFlush;
   public static Counter counterPreAllocatedBufferExpired;
+  public static Counter counterAppNotFound;
 
   private static MetricsManager metricsManager;
   private static boolean isRegister = false;
@@ -462,6 +464,8 @@ public class ShuffleServerMetrics {
     counterPreAllocatedBufferExpired =
         metricsManager.addCounter(TOTAL_EXPIRED_PRE_ALLOCATED_BUFFER_NUM);
 
+    counterAppNotFound = metricsManager.addCounter(TOTAL_APP_NOT_FOUND_NUM);
+
     summaryTotalRemoveResourceTime = 
metricsManager.addSummary(TOTAL_REMOVE_RESOURCE_TIME);
     summaryTotalRemoveResourceByShuffleIdsTime =
         metricsManager.addSummary(TOTAL_REMOVE_RESOURCE_BY_SHUFFLE_IDS_TIME);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index 448c12a23..d297603b9 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -105,6 +105,23 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
     long timestamp = req.getTimestamp();
     int stageAttemptNumber = req.getStageAttemptNumber();
     ShuffleTaskInfo taskInfo = 
shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(appId);
+    if (taskInfo == null) {
+      rpcResponse =
+          new RpcResponse(
+              req.getRequestId(), StatusCode.APP_NOT_FOUND, "appId: " + appId 
+ " not found");
+      String errorMsg =
+          "APP_NOT_FOUND error, requireBufferId["
+              + requireBufferId
+              + "] for appId["
+              + appId
+              + "], shuffleId["
+              + shuffleId
+              + "]";
+      LOG.error(errorMsg);
+      ShuffleServerMetrics.counterAppNotFound.inc();
+      client.getChannel().writeAndFlush(rpcResponse);
+      return;
+    }
     Integer latestStageAttemptNumber = 
taskInfo.getLatestStageAttemptNumber(shuffleId);
     // The Stage retry occurred, and the task before StageNumber was simply 
ignored and not
     // processed if the task was being sent.

Reply via email to