This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 3b544ee938c55cca2619c10c93f6289f9c7432f1
Author: yiguolei <[email protected]>
AuthorDate: Wed Apr 24 19:19:31 2024 +0800

    [refactor](errormessage) step1: unify the status usage in FE (#34062)
    
    We should show users the correct error message when something goes wrong,
    but the error messages are currently a mess. I will clean them up.
    
    This is the first step: unify the error code usage in FE.
---
 .../main/java/org/apache/doris/common/Status.java  | 48 +++++++++++-----------
 .../main/java/org/apache/doris/qe/Coordinator.java | 19 ++++-----
 .../java/org/apache/doris/qe/PointQueryExec.java   | 22 +++++-----
 .../java/org/apache/doris/qe/ResultReceiver.java   | 26 ++++++------
 .../org/apache/doris/qe/cache/CacheBeProxy.java    | 18 ++++----
 .../org/apache/doris/qe/cache/PartitionCache.java  |  3 +-
 .../arrowflight/FlightSqlConnectProcessor.java     |  8 ++--
 7 files changed, 69 insertions(+), 75 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
index 555a82751ee..18852d8c04c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
@@ -26,14 +26,6 @@ public class Status {
     public static final Status CANCELLED = new Status(TStatusCode.CANCELLED, 
"Cancelled");
     public static final Status TIMEOUT = new Status(TStatusCode.TIMEOUT, 
"Timeout");
 
-    public TStatusCode getErrorCode() {
-        return errorCode;
-    }
-
-    public String getErrorMsg() {
-        return errorMsg;
-    }
-
     private TStatusCode  errorCode; // anything other than OK
     private String errorMsg;
 
@@ -58,6 +50,25 @@ public class Status {
         }
     }
 
+    public Status(final PStatus status) {
+        TStatusCode mappingCode = 
TStatusCode.findByValue(status.getStatusCode());
+        // Not all pstatus code could be mapped to TStatusCode, see BE 
status.h file
+        // For those not mapped, set it to internal error.
+        if (mappingCode == null) {
+            this.errorCode = TStatusCode.INTERNAL_ERROR;
+        } else {
+            this.errorCode = mappingCode;
+        }
+        if (!status.getErrorMsgsList().isEmpty()) {
+            this.errorMsg = status.getErrorMsgs(0);
+        }
+    }
+
+    public void updateStatus(TStatusCode code, String errorMessage) {
+        this.errorCode = code;
+        this.errorMsg = errorMessage;
+    }
+
     public boolean ok() {
         return this.errorCode == TStatusCode.OK;
     }
@@ -70,27 +81,14 @@ public class Status {
         return this.errorCode == TStatusCode.THRIFT_RPC_ERROR;
     }
 
-    public void setStatus(Status status) {
-        this.errorCode = status.errorCode;
-        this.errorMsg = status.getErrorMsg();
-    }
-
-    public void setStatus(String msg) {
-        this.errorCode = TStatusCode.INTERNAL_ERROR;
-        this.errorMsg = msg;
+    public TStatusCode getErrorCode() {
+        return errorCode;
     }
 
-    public void setPstatus(PStatus status) {
-        this.errorCode = TStatusCode.findByValue(status.getStatusCode());
-        if (!status.getErrorMsgsList().isEmpty()) {
-            this.errorMsg = status.getErrorMsgs(0);
-        }
+    public String getErrorMsg() {
+        return errorMsg;
     }
 
-    public void setRpcStatus(String msg) {
-        this.errorCode = TStatusCode.THRIFT_RPC_ERROR;
-        this.errorMsg = msg;
-    }
 
     public void rewriteErrorMsg() {
         if (ok()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 49fa87e9ac9..1bc45bb358f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -490,7 +490,7 @@ public class Coordinator implements CoordInterface {
         try {
             this.backendExecStates.clear();
             this.pipelineExecContexts.clear();
-            this.queryStatus.setStatus(new Status());
+            this.queryStatus.updateStatus(TStatusCode.OK, "");
             if (this.exportFiles == null) {
                 this.exportFiles = Lists.newArrayList();
             }
@@ -1101,7 +1101,7 @@ public class Coordinator implements CoordInterface {
                 if (exception != null && errMsg == null) {
                     errMsg = operation + " failed. " + exception.getMessage();
                 }
-                queryStatus.setStatus(errMsg);
+                queryStatus.updateStatus(TStatusCode.INTERNAL_ERROR, errMsg);
                 cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
                 switch (code) {
                     case TIMEOUT:
@@ -1182,7 +1182,7 @@ public class Coordinator implements CoordInterface {
                 if (exception != null && errMsg == null) {
                     errMsg = operation + " failed. " + exception.getMessage();
                 }
-                queryStatus.setStatus(errMsg);
+                queryStatus.updateStatus(TStatusCode.INTERNAL_ERROR, errMsg);
                 cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
                 switch (code) {
                     case TIMEOUT:
@@ -1306,7 +1306,7 @@ public class Coordinator implements CoordInterface {
                 return;
             }
 
-            queryStatus.setStatus(status);
+            queryStatus.updateStatus(status.getErrorCode(), 
status.getErrorMsg());
             if (status.getErrorCode() == TStatusCode.TIMEOUT) {
                 cancelInternal(Types.PPlanFragmentCancelReason.TIMEOUT);
             } else {
@@ -1470,7 +1470,7 @@ public class Coordinator implements CoordInterface {
                         + "so that send cancel to BE again",
                         DebugUtil.printId(queryId), queryStatus.toString(), 
new Exception());
             } else {
-                queryStatus.setStatus(Status.CANCELLED);
+                queryStatus.updateStatus(TStatusCode.CANCELLED, "cancelled");
             }
             LOG.warn("Cancel execution of query {}, this is a outside invoke, 
cancelReason {}",
                     DebugUtil.printId(queryId), cancelReason.toString());
@@ -3137,8 +3137,7 @@ public class Coordinator implements CoordInterface {
                         public void 
onSuccess(InternalService.PCancelPlanFragmentResult result) {
                             cancelInProcess = false;
                             if (result.hasStatus()) {
-                                Status status = new Status();
-                                status.setPstatus(result.getStatus());
+                                Status status = new Status(result.getStatus());
                                 if (status.getErrorCode() == TStatusCode.OK) {
                                     hasCancelled = true;
                                 } else {
@@ -3323,8 +3322,7 @@ public class Coordinator implements CoordInterface {
                         public void 
onSuccess(InternalService.PCancelPlanFragmentResult result) {
                             cancelInProcess = false;
                             if (result.hasStatus()) {
-                                Status status = new Status();
-                                status.setPstatus(result.getStatus());
+                                Status status = new Status(result.getStatus());
                                 if (status.getErrorCode() == TStatusCode.OK) {
                                     hasCancelled = true;
                                 } else {
@@ -3388,8 +3386,7 @@ public class Coordinator implements CoordInterface {
                         public void 
onSuccess(InternalService.PCancelPlanFragmentResult result) {
                             cancelInProcess = false;
                             if (result.hasStatus()) {
-                                Status status = new Status();
-                                status.setPstatus(result.getStatus());
+                                Status status = new Status(result.getStatus());
                                 if (status.getErrorCode() == TStatusCode.OK) {
                                     hasCancelled = true;
                                 } else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
index f41e9a4d896..5742a99de2f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
@@ -196,7 +196,7 @@ public class PointQueryExec implements CoordInterface {
             if (tryCount >= maxTry) {
                 break;
             }
-            status.setStatus(Status.OK);
+            status.updateStatus(TStatusCode.OK, "");
         } while (true);
         // handle status code
         if (!status.ok()) {
@@ -270,7 +270,7 @@ public class PointQueryExec implements CoordInterface {
                 long currentTs = System.currentTimeMillis();
                 if (currentTs >= timeoutTs) {
                     LOG.warn("fetch result timeout {}", 
backend.getBrpcAddress());
-                    status.setStatus("query timeout");
+                    status.updateStatus(TStatusCode.INTERNAL_ERROR, "query 
timeout");
                     return null;
                 }
                 try {
@@ -279,35 +279,35 @@ public class PointQueryExec implements CoordInterface {
                     // continue to get result
                     LOG.info("future get interrupted Exception");
                     if (isCancel) {
-                        status.setStatus(Status.CANCELLED);
+                        status.updateStatus(TStatusCode.CANCELLED, 
"cancelled");
                         return null;
                     }
                 } catch (TimeoutException e) {
                     futureResponse.cancel(true);
                     LOG.warn("fetch result timeout {}, addr {}", timeoutTs - 
currentTs, backend.getBrpcAddress());
-                    status.setStatus("query timeout");
+                    status.updateStatus(TStatusCode.INTERNAL_ERROR, "query 
timeout");
                     return null;
                 }
             }
         } catch (RpcException e) {
             LOG.warn("fetch result rpc exception {}, e {}", 
backend.getBrpcAddress(), e);
-            status.setRpcStatus(e.getMessage());
+            status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage());
             SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage());
             return null;
         } catch (ExecutionException e) {
             LOG.warn("fetch result execution exception {}, addr {}", e, 
backend.getBrpcAddress());
             if (e.getMessage().contains("time out")) {
                 // if timeout, we set error code to TIMEOUT, and it will not 
retry querying.
-                status.setStatus(new Status(TStatusCode.TIMEOUT, 
e.getMessage()));
+                status.updateStatus(TStatusCode.TIMEOUT, e.getMessage());
             } else {
-                status.setRpcStatus(e.getMessage());
+                status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, 
e.getMessage());
                 SimpleScheduler.addToBlacklist(backend.getId(), 
e.getMessage());
             }
             return null;
         }
-        TStatusCode code = 
TStatusCode.findByValue(pResult.getStatus().getStatusCode());
-        if (code != TStatusCode.OK) {
-            status.setPstatus(pResult.getStatus());
+        Status resultStatus = new Status(pResult.getStatus());
+        if (resultStatus.getErrorCode() != TStatusCode.OK) {
+            status.updateStatus(resultStatus.getErrorCode(), 
resultStatus.getErrorMsg());
             return null;
         }
 
@@ -335,7 +335,7 @@ public class PointQueryExec implements CoordInterface {
         }
 
         if (isCancel) {
-            status.setStatus(Status.CANCELLED);
+            status.updateStatus(TStatusCode.CANCELLED, "cancelled");
         }
         return rowBatch;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
index 275ba0ffd78..981d720e8b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
@@ -59,7 +59,7 @@ public class ResultReceiver {
     int maxMsgSizeOfResultReceiver;
 
     private void setRunStatus(Status status) {
-        runStatus.setStatus(status);
+        runStatus.updateStatus(status.getErrorCode(), status.getErrorMsg());
     }
 
     private boolean isCancel() {
@@ -104,14 +104,14 @@ public class ResultReceiver {
                         if (!isCancel()) {
                             LOG.warn("ResultReceiver is not set to cancelled 
state, this should not happen");
                         } else {
-                            status.setStatus(new Status(TStatusCode.CANCELLED, 
this.cancelReason));
+                            status.updateStatus(TStatusCode.CANCELLED, 
this.cancelReason);
                             return null;
                         }
                     } catch (TimeoutException e) {
                         LOG.warn("Query {} get result timeout, get result 
duration {} ms",
                                 DebugUtil.printId(this.queryId), (timeoutTs - 
currentTs) / 1000);
                         setRunStatus(Status.TIMEOUT);
-                        status.setStatus(Status.TIMEOUT);
+                        status.updateStatus(TStatusCode.TIMEOUT, "");
                         updateCancelReason("fetch data timeout");
                         return null;
                     } catch (InterruptedException e) {
@@ -119,15 +119,15 @@ public class ResultReceiver {
                         LOG.warn("Future of ResultReceiver of query {} got 
interrupted Exception",
                                 DebugUtil.printId(this.queryId), e);
                         if (isCancel()) {
-                            status.setStatus(Status.CANCELLED);
+                            status.updateStatus(TStatusCode.CANCELLED, 
"cancelled");
                             return null;
                         }
                     }
                 }
 
-                TStatusCode code = 
TStatusCode.findByValue(pResult.getStatus().getStatusCode());
-                if (code != TStatusCode.OK) {
-                    status.setPstatus(pResult.getStatus());
+                Status resultStatus = new Status(pResult.getStatus());
+                if (resultStatus.getErrorCode() != TStatusCode.OK) {
+                    status.updateStatus(resultStatus.getErrorCode(), 
resultStatus.getErrorMsg());
                     return null;
                 }
 
@@ -136,7 +136,7 @@ public class ResultReceiver {
                 if (packetIdx != pResult.getPacketSeq()) {
                     LOG.warn("finistId={}, receive packet failed, expect={}, 
receive={}",
                             DebugUtil.printId(finstId), packetIdx, 
pResult.getPacketSeq());
-                    status.setRpcStatus("receive error packet");
+                    status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, "receive 
error packet");
                     return null;
                 }
 
@@ -170,20 +170,20 @@ public class ResultReceiver {
             }
         } catch (RpcException e) {
             LOG.warn("fetch result rpc exception, finstId={}", 
DebugUtil.printId(finstId), e);
-            status.setRpcStatus(e.getMessage());
+            status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage());
             SimpleScheduler.addToBlacklist(backendId, e.getMessage());
         } catch (ExecutionException e) {
             LOG.warn("fetch result execution exception, finstId={}", 
DebugUtil.printId(finstId), e);
             if (e.getMessage().contains("time out")) {
                 // if timeout, we set error code to TIMEOUT, and it will not 
retry querying.
-                status.setStatus(new Status(TStatusCode.TIMEOUT, 
e.getMessage()));
+                status.updateStatus(TStatusCode.TIMEOUT, e.getMessage());
             } else {
-                status.setRpcStatus(e.getMessage());
+                status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, 
e.getMessage());
                 SimpleScheduler.addToBlacklist(backendId, e.getMessage());
             }
         } catch (TimeoutException e) {
             LOG.warn("fetch result timeout, finstId={}", 
DebugUtil.printId(finstId), e);
-            status.setStatus(new Status(TStatusCode.TIMEOUT, "query timeout"));
+            status.updateStatus(TStatusCode.TIMEOUT, "query timeout");
         } finally {
             synchronized (this) {
                 currentThread = null;
@@ -191,7 +191,7 @@ public class ResultReceiver {
         }
 
         if ((isCancel())) {
-            status.setStatus(runStatus);
+            status.updateStatus(runStatus.getErrorCode(), 
runStatus.getErrorMsg());
         }
         return rowBatch;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java
index 9be5aacef11..f27db75a10e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java
@@ -55,13 +55,13 @@ public class CacheBeProxy extends CacheProxy {
                     .updateCache(address, request);
             InternalService.PCacheResponse response = future.get(timeoutMs, 
TimeUnit.MILLISECONDS);
             if (response.getStatus() == InternalService.PCacheStatus.CACHE_OK) 
{
-                status.setStatus(new Status(TStatusCode.OK, "CACHE_OK"));
+                status.updateStatus(TStatusCode.OK, "CACHE_OK");
             } else {
-                status.setStatus(response.getStatus().toString());
+                status.updateStatus(TStatusCode.INTERNAL_ERROR, 
response.getStatus().toString());
             }
         } catch (Exception e) {
             LOG.warn("update cache exception, sqlKey {}", sqlKey, e);
-            status.setRpcStatus(e.getMessage());
+            status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage());
             SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage());
         }
     }
@@ -80,17 +80,17 @@ public class CacheBeProxy extends CacheProxy {
             return future.get(timeoutMs, TimeUnit.MILLISECONDS);
         } catch (RpcException e) {
             LOG.warn("fetch catch rpc exception, sqlKey {}, backend {}", 
sqlKey, backend.getId(), e);
-            status.setRpcStatus(e.getMessage());
+            status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage());
             SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage());
         } catch (InterruptedException e) {
             LOG.warn("future get interrupted exception, sqlKey {}, backend 
{}", sqlKey, backend.getId(), e);
-            status.setStatus("interrupted exception");
+            status.updateStatus(TStatusCode.INTERNAL_ERROR, "interrupted 
exception");
         } catch (ExecutionException e) {
             LOG.warn("future get execution exception, sqlKey {}, backend {}", 
sqlKey, backend.getId(), e);
-            status.setStatus("execution exception");
+            status.updateStatus(TStatusCode.INTERNAL_ERROR, "execution 
exception");
         } catch (TimeoutException e) {
             LOG.warn("fetch result timeout, sqlKey {}, backend {}", sqlKey, 
backend.getId(), e);
-            status.setStatus("query timeout");
+            status.updateStatus(TStatusCode.TIMEOUT, "query timeout");
         }
         return null;
     }
@@ -130,10 +130,10 @@ public class CacheBeProxy extends CacheProxy {
                     = BackendServiceProxy.getInstance().clearCache(address, 
request);
             InternalService.PCacheResponse response = future.get(timeoutMs, 
TimeUnit.MILLISECONDS);
             if (response.getStatus() == InternalService.PCacheStatus.CACHE_OK) 
{
-                status.setStatus(new Status(TStatusCode.OK, "CACHE_OK"));
+                status.updateStatus(TStatusCode.OK, "CACHE_OK");
                 return true;
             } else {
-                status.setStatus(response.getStatus().toString());
+                status.updateStatus(TStatusCode.INTERNAL_ERROR, 
response.getStatus().toString());
                 return false;
             }
         } catch (Exception e) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java
index 0aedef7fc60..277e8f16087 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java
@@ -31,6 +31,7 @@ import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.qe.RowBatch;
+import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.collect.Lists;
@@ -86,7 +87,7 @@ public class PartitionCache extends Cache {
         range = new PartitionRange(this.partitionPredicate, this.olapTable,
                 this.partitionInfo);
         if (!range.analytics()) {
-            status.setStatus("analytics range error");
+            status.updateStatus(TStatusCode.INTERNAL_ERROR, "analytics range 
error");
             return null;
         }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
index 818bd1929c8..6f51c0391af 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
@@ -118,12 +118,10 @@ public class FlightSqlConnectProcessor extends 
ConnectProcessor implements AutoC
                 throw new RuntimeException(String.format("fetch arrow flight 
schema timeout, finstId: %s",
                         DebugUtil.printId(tid)));
             }
-            TStatusCode code = 
TStatusCode.findByValue(pResult.getStatus().getStatusCode());
-            if (code != TStatusCode.OK) {
-                Status status = new Status();
-                status.setPstatus(pResult.getStatus());
+            Status resultStatus = new Status(pResult.getStatus());
+            if (resultStatus.getErrorCode() != TStatusCode.OK) {
                 throw new RuntimeException(String.format("fetch arrow flight 
schema failed, finstId: %s, errmsg: %s",
-                        DebugUtil.printId(tid), status.getErrorMsg()));
+                        DebugUtil.printId(tid), resultStatus.toString()));
             }
             if (pResult.hasBeArrowFlightIp()) {
                 ctx.getResultFlightServerAddr().hostname = 
pResult.getBeArrowFlightIp().toStringUtf8();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to