This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 3b544ee938c55cca2619c10c93f6289f9c7432f1 Author: yiguolei <[email protected]> AuthorDate: Wed Apr 24 19:19:31 2024 +0800 [refactor](errormessage) step1: unify the status usage in FE (#34062) We should tell the user the correct error message when something goes wrong, but the error messages are currently in a mess. I will clean them up. This is the first step: unify the error code usage in FE. --- .../main/java/org/apache/doris/common/Status.java | 48 +++++++++++----------- .../main/java/org/apache/doris/qe/Coordinator.java | 19 ++++----- .../java/org/apache/doris/qe/PointQueryExec.java | 22 +++++----- .../java/org/apache/doris/qe/ResultReceiver.java | 26 ++++++------ .../org/apache/doris/qe/cache/CacheBeProxy.java | 18 ++++---- .../org/apache/doris/qe/cache/PartitionCache.java | 3 +- .../arrowflight/FlightSqlConnectProcessor.java | 8 ++-- 7 files changed, 69 insertions(+), 75 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java index 555a82751ee..18852d8c04c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java @@ -26,14 +26,6 @@ public class Status { public static final Status CANCELLED = new Status(TStatusCode.CANCELLED, "Cancelled"); public static final Status TIMEOUT = new Status(TStatusCode.TIMEOUT, "Timeout"); - public TStatusCode getErrorCode() { - return errorCode; - } - - public String getErrorMsg() { - return errorMsg; - } - private TStatusCode errorCode; // anything other than OK private String errorMsg; @@ -58,6 +50,25 @@ public class Status { } } + public Status(final PStatus status) { + TStatusCode mappingCode = TStatusCode.findByValue(status.getStatusCode()); + // Not all pstatus code could be mapped to TStatusCode, see BE status.h file + // For those not mapped, set it to internal error. 
+ if (mappingCode == null) { + this.errorCode = TStatusCode.INTERNAL_ERROR; + } else { + this.errorCode = mappingCode; + } + if (!status.getErrorMsgsList().isEmpty()) { + this.errorMsg = status.getErrorMsgs(0); + } + } + + public void updateStatus(TStatusCode code, String errorMessage) { + this.errorCode = code; + this.errorMsg = errorMessage; + } + public boolean ok() { return this.errorCode == TStatusCode.OK; } @@ -70,27 +81,14 @@ public class Status { return this.errorCode == TStatusCode.THRIFT_RPC_ERROR; } - public void setStatus(Status status) { - this.errorCode = status.errorCode; - this.errorMsg = status.getErrorMsg(); - } - - public void setStatus(String msg) { - this.errorCode = TStatusCode.INTERNAL_ERROR; - this.errorMsg = msg; + public TStatusCode getErrorCode() { + return errorCode; } - public void setPstatus(PStatus status) { - this.errorCode = TStatusCode.findByValue(status.getStatusCode()); - if (!status.getErrorMsgsList().isEmpty()) { - this.errorMsg = status.getErrorMsgs(0); - } + public String getErrorMsg() { + return errorMsg; } - public void setRpcStatus(String msg) { - this.errorCode = TStatusCode.THRIFT_RPC_ERROR; - this.errorMsg = msg; - } public void rewriteErrorMsg() { if (ok()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 49fa87e9ac9..1bc45bb358f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -490,7 +490,7 @@ public class Coordinator implements CoordInterface { try { this.backendExecStates.clear(); this.pipelineExecContexts.clear(); - this.queryStatus.setStatus(new Status()); + this.queryStatus.updateStatus(TStatusCode.OK, ""); if (this.exportFiles == null) { this.exportFiles = Lists.newArrayList(); } @@ -1101,7 +1101,7 @@ public class Coordinator implements CoordInterface { if (exception != null && errMsg == null) { errMsg = operation + " 
failed. " + exception.getMessage(); } - queryStatus.setStatus(errMsg); + queryStatus.updateStatus(TStatusCode.INTERNAL_ERROR, errMsg); cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR); switch (code) { case TIMEOUT: @@ -1182,7 +1182,7 @@ public class Coordinator implements CoordInterface { if (exception != null && errMsg == null) { errMsg = operation + " failed. " + exception.getMessage(); } - queryStatus.setStatus(errMsg); + queryStatus.updateStatus(TStatusCode.INTERNAL_ERROR, errMsg); cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR); switch (code) { case TIMEOUT: @@ -1306,7 +1306,7 @@ public class Coordinator implements CoordInterface { return; } - queryStatus.setStatus(status); + queryStatus.updateStatus(status.getErrorCode(), status.getErrorMsg()); if (status.getErrorCode() == TStatusCode.TIMEOUT) { cancelInternal(Types.PPlanFragmentCancelReason.TIMEOUT); } else { @@ -1470,7 +1470,7 @@ public class Coordinator implements CoordInterface { + "so that send cancel to BE again", DebugUtil.printId(queryId), queryStatus.toString(), new Exception()); } else { - queryStatus.setStatus(Status.CANCELLED); + queryStatus.updateStatus(TStatusCode.CANCELLED, "cancelled"); } LOG.warn("Cancel execution of query {}, this is a outside invoke, cancelReason {}", DebugUtil.printId(queryId), cancelReason.toString()); @@ -3137,8 +3137,7 @@ public class Coordinator implements CoordInterface { public void onSuccess(InternalService.PCancelPlanFragmentResult result) { cancelInProcess = false; if (result.hasStatus()) { - Status status = new Status(); - status.setPstatus(result.getStatus()); + Status status = new Status(result.getStatus()); if (status.getErrorCode() == TStatusCode.OK) { hasCancelled = true; } else { @@ -3323,8 +3322,7 @@ public class Coordinator implements CoordInterface { public void onSuccess(InternalService.PCancelPlanFragmentResult result) { cancelInProcess = false; if (result.hasStatus()) { - Status status = new Status(); - 
status.setPstatus(result.getStatus()); + Status status = new Status(result.getStatus()); if (status.getErrorCode() == TStatusCode.OK) { hasCancelled = true; } else { @@ -3388,8 +3386,7 @@ public class Coordinator implements CoordInterface { public void onSuccess(InternalService.PCancelPlanFragmentResult result) { cancelInProcess = false; if (result.hasStatus()) { - Status status = new Status(); - status.setPstatus(result.getStatus()); + Status status = new Status(result.getStatus()); if (status.getErrorCode() == TStatusCode.OK) { hasCancelled = true; } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java index f41e9a4d896..5742a99de2f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java @@ -196,7 +196,7 @@ public class PointQueryExec implements CoordInterface { if (tryCount >= maxTry) { break; } - status.setStatus(Status.OK); + status.updateStatus(TStatusCode.OK, ""); } while (true); // handle status code if (!status.ok()) { @@ -270,7 +270,7 @@ public class PointQueryExec implements CoordInterface { long currentTs = System.currentTimeMillis(); if (currentTs >= timeoutTs) { LOG.warn("fetch result timeout {}", backend.getBrpcAddress()); - status.setStatus("query timeout"); + status.updateStatus(TStatusCode.INTERNAL_ERROR, "query timeout"); return null; } try { @@ -279,35 +279,35 @@ public class PointQueryExec implements CoordInterface { // continue to get result LOG.info("future get interrupted Exception"); if (isCancel) { - status.setStatus(Status.CANCELLED); + status.updateStatus(TStatusCode.CANCELLED, "cancelled"); return null; } } catch (TimeoutException e) { futureResponse.cancel(true); LOG.warn("fetch result timeout {}, addr {}", timeoutTs - currentTs, backend.getBrpcAddress()); - status.setStatus("query timeout"); + status.updateStatus(TStatusCode.INTERNAL_ERROR, 
"query timeout"); return null; } } } catch (RpcException e) { LOG.warn("fetch result rpc exception {}, e {}", backend.getBrpcAddress(), e); - status.setRpcStatus(e.getMessage()); + status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage()); SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage()); return null; } catch (ExecutionException e) { LOG.warn("fetch result execution exception {}, addr {}", e, backend.getBrpcAddress()); if (e.getMessage().contains("time out")) { // if timeout, we set error code to TIMEOUT, and it will not retry querying. - status.setStatus(new Status(TStatusCode.TIMEOUT, e.getMessage())); + status.updateStatus(TStatusCode.TIMEOUT, e.getMessage()); } else { - status.setRpcStatus(e.getMessage()); + status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage()); SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage()); } return null; } - TStatusCode code = TStatusCode.findByValue(pResult.getStatus().getStatusCode()); - if (code != TStatusCode.OK) { - status.setPstatus(pResult.getStatus()); + Status resultStatus = new Status(pResult.getStatus()); + if (resultStatus.getErrorCode() != TStatusCode.OK) { + status.updateStatus(resultStatus.getErrorCode(), resultStatus.getErrorMsg()); return null; } @@ -335,7 +335,7 @@ public class PointQueryExec implements CoordInterface { } if (isCancel) { - status.setStatus(Status.CANCELLED); + status.updateStatus(TStatusCode.CANCELLED, "cancelled"); } return rowBatch; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java index 275ba0ffd78..981d720e8b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java @@ -59,7 +59,7 @@ public class ResultReceiver { int maxMsgSizeOfResultReceiver; private void setRunStatus(Status status) { - runStatus.setStatus(status); + runStatus.updateStatus(status.getErrorCode(), 
status.getErrorMsg()); } private boolean isCancel() { @@ -104,14 +104,14 @@ public class ResultReceiver { if (!isCancel()) { LOG.warn("ResultReceiver is not set to cancelled state, this should not happen"); } else { - status.setStatus(new Status(TStatusCode.CANCELLED, this.cancelReason)); + status.updateStatus(TStatusCode.CANCELLED, this.cancelReason); return null; } } catch (TimeoutException e) { LOG.warn("Query {} get result timeout, get result duration {} ms", DebugUtil.printId(this.queryId), (timeoutTs - currentTs) / 1000); setRunStatus(Status.TIMEOUT); - status.setStatus(Status.TIMEOUT); + status.updateStatus(TStatusCode.TIMEOUT, ""); updateCancelReason("fetch data timeout"); return null; } catch (InterruptedException e) { @@ -119,15 +119,15 @@ public class ResultReceiver { LOG.warn("Future of ResultReceiver of query {} got interrupted Exception", DebugUtil.printId(this.queryId), e); if (isCancel()) { - status.setStatus(Status.CANCELLED); + status.updateStatus(TStatusCode.CANCELLED, "cancelled"); return null; } } } - TStatusCode code = TStatusCode.findByValue(pResult.getStatus().getStatusCode()); - if (code != TStatusCode.OK) { - status.setPstatus(pResult.getStatus()); + Status resultStatus = new Status(pResult.getStatus()); + if (resultStatus.getErrorCode() != TStatusCode.OK) { + status.updateStatus(resultStatus.getErrorCode(), resultStatus.getErrorMsg()); return null; } @@ -136,7 +136,7 @@ public class ResultReceiver { if (packetIdx != pResult.getPacketSeq()) { LOG.warn("finistId={}, receive packet failed, expect={}, receive={}", DebugUtil.printId(finstId), packetIdx, pResult.getPacketSeq()); - status.setRpcStatus("receive error packet"); + status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, "receive error packet"); return null; } @@ -170,20 +170,20 @@ public class ResultReceiver { } } catch (RpcException e) { LOG.warn("fetch result rpc exception, finstId={}", DebugUtil.printId(finstId), e); - status.setRpcStatus(e.getMessage()); + 
status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage()); SimpleScheduler.addToBlacklist(backendId, e.getMessage()); } catch (ExecutionException e) { LOG.warn("fetch result execution exception, finstId={}", DebugUtil.printId(finstId), e); if (e.getMessage().contains("time out")) { // if timeout, we set error code to TIMEOUT, and it will not retry querying. - status.setStatus(new Status(TStatusCode.TIMEOUT, e.getMessage())); + status.updateStatus(TStatusCode.TIMEOUT, e.getMessage()); } else { - status.setRpcStatus(e.getMessage()); + status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage()); SimpleScheduler.addToBlacklist(backendId, e.getMessage()); } } catch (TimeoutException e) { LOG.warn("fetch result timeout, finstId={}", DebugUtil.printId(finstId), e); - status.setStatus(new Status(TStatusCode.TIMEOUT, "query timeout")); + status.updateStatus(TStatusCode.TIMEOUT, "query timeout"); } finally { synchronized (this) { currentThread = null; @@ -191,7 +191,7 @@ public class ResultReceiver { } if ((isCancel())) { - status.setStatus(runStatus); + status.updateStatus(runStatus.getErrorCode(), runStatus.getErrorMsg()); } return rowBatch; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java index 9be5aacef11..f27db75a10e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java @@ -55,13 +55,13 @@ public class CacheBeProxy extends CacheProxy { .updateCache(address, request); InternalService.PCacheResponse response = future.get(timeoutMs, TimeUnit.MILLISECONDS); if (response.getStatus() == InternalService.PCacheStatus.CACHE_OK) { - status.setStatus(new Status(TStatusCode.OK, "CACHE_OK")); + status.updateStatus(TStatusCode.OK, "CACHE_OK"); } else { - status.setStatus(response.getStatus().toString()); + status.updateStatus(TStatusCode.INTERNAL_ERROR, 
response.getStatus().toString()); } } catch (Exception e) { LOG.warn("update cache exception, sqlKey {}", sqlKey, e); - status.setRpcStatus(e.getMessage()); + status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage()); SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage()); } } @@ -80,17 +80,17 @@ public class CacheBeProxy extends CacheProxy { return future.get(timeoutMs, TimeUnit.MILLISECONDS); } catch (RpcException e) { LOG.warn("fetch catch rpc exception, sqlKey {}, backend {}", sqlKey, backend.getId(), e); - status.setRpcStatus(e.getMessage()); + status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage()); SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage()); } catch (InterruptedException e) { LOG.warn("future get interrupted exception, sqlKey {}, backend {}", sqlKey, backend.getId(), e); - status.setStatus("interrupted exception"); + status.updateStatus(TStatusCode.INTERNAL_ERROR, "interrupted exception"); } catch (ExecutionException e) { LOG.warn("future get execution exception, sqlKey {}, backend {}", sqlKey, backend.getId(), e); - status.setStatus("execution exception"); + status.updateStatus(TStatusCode.INTERNAL_ERROR, "execution exception"); } catch (TimeoutException e) { LOG.warn("fetch result timeout, sqlKey {}, backend {}", sqlKey, backend.getId(), e); - status.setStatus("query timeout"); + status.updateStatus(TStatusCode.TIMEOUT, "query timeout"); } return null; } @@ -130,10 +130,10 @@ public class CacheBeProxy extends CacheProxy { = BackendServiceProxy.getInstance().clearCache(address, request); InternalService.PCacheResponse response = future.get(timeoutMs, TimeUnit.MILLISECONDS); if (response.getStatus() == InternalService.PCacheStatus.CACHE_OK) { - status.setStatus(new Status(TStatusCode.OK, "CACHE_OK")); + status.updateStatus(TStatusCode.OK, "CACHE_OK"); return true; } else { - status.setStatus(response.getStatus().toString()); + status.updateStatus(TStatusCode.INTERNAL_ERROR, response.getStatus().toString()); 
return false; } } catch (Exception e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java index 0aedef7fc60..277e8f16087 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java @@ -31,6 +31,7 @@ import org.apache.doris.common.util.DebugUtil; import org.apache.doris.metric.MetricRepo; import org.apache.doris.proto.InternalService; import org.apache.doris.qe.RowBatch; +import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.Lists; @@ -86,7 +87,7 @@ public class PartitionCache extends Cache { range = new PartitionRange(this.partitionPredicate, this.olapTable, this.partitionInfo); if (!range.analytics()) { - status.setStatus("analytics range error"); + status.updateStatus(TStatusCode.INTERNAL_ERROR, "analytics range error"); return null; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java index 818bd1929c8..6f51c0391af 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java @@ -118,12 +118,10 @@ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoC throw new RuntimeException(String.format("fetch arrow flight schema timeout, finstId: %s", DebugUtil.printId(tid))); } - TStatusCode code = TStatusCode.findByValue(pResult.getStatus().getStatusCode()); - if (code != TStatusCode.OK) { - Status status = new Status(); - status.setPstatus(pResult.getStatus()); + Status resultStatus = new Status(pResult.getStatus()); + if (resultStatus.getErrorCode() != TStatusCode.OK) { throw new 
RuntimeException(String.format("fetch arrow flight schema failed, finstId: %s, errmsg: %s", - DebugUtil.printId(tid), status.getErrorMsg())); + DebugUtil.printId(tid), resultStatus.toString())); } if (pResult.hasBeArrowFlightIp()) { ctx.getResultFlightServerAddr().hostname = pResult.getBeArrowFlightIp().toStringUtf8(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
