This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 4ce95d325e4 [refactor](fe) Merge MasterOpExecutor and FEOpExecutor (#50776) 4ce95d325e4 is described below commit 4ce95d325e4a60113b45fdf68e8b6d5eb6e5fd5b Author: Mingyu Chen (Rayner) <morning...@163.com> AuthorDate: Mon May 12 23:13:38 2025 +0800 [refactor](fe) Merge MasterOpExecutor and FEOpExecutor (#50776) ### What problem does this PR solve? Related PR: #34685 Problem Summary: PR #34685 introduced `FEOpExecutor`, which is just a copy of `MasterOpExecutor`. Both classes are used to forward requests to a specific FE node, and there is a lot of duplicated code. So I refactored the code to let `MasterOpExecutor` extend `FEOpExecutor`. No logic is changed in this PR. --- .../main/java/org/apache/doris/catalog/Env.java | 4 +- .../apache/doris/httpv2/rest/SetConfigAction.java | 2 +- .../java/org/apache/doris/qe/FEOpExecutor.java | 139 +++++++-- .../java/org/apache/doris/qe/MasterOpExecutor.java | 324 ++------------------- .../java/org/apache/doris/qe/StmtExecutor.java | 6 +- 5 files changed, 143 insertions(+), 332 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index ad35b25572c..25a00b0f8da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -6173,7 +6173,7 @@ public class Env { * we can't set callback which is in fe-core to config items which are in fe-common. so wrap them here. it's not so * good but is best for us now. 
*/ - public void setMutableConfigwithCallback(String key, String value) throws ConfigException { + public void setMutableConfigWithCallback(String key, String value) throws ConfigException { ConfigBase.setMutableConfig(key, value); if (configtoThreads.get(key) != null) { try { @@ -6193,7 +6193,7 @@ public class Env { for (Map.Entry<String, String> entry : configs.entrySet()) { try { - setMutableConfigwithCallback(entry.getKey(), entry.getValue()); + setMutableConfigWithCallback(entry.getKey(), entry.getValue()); } catch (ConfigException e) { throw new DdlException(e.getMessage()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/SetConfigAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/SetConfigAction.java index d9351ec5978..8d0cc12a235 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/SetConfigAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/SetConfigAction.java @@ -94,7 +94,7 @@ public class SetConfigAction extends RestBaseController { try { if (confValue != null && confValue.length == 1) { try { - Env.getCurrentEnv().setMutableConfigwithCallback(confKey, confValue[0]); + Env.getCurrentEnv().setMutableConfigWithCallback(confKey, confValue[0]); } catch (ConfigException e) { throw new DdlException(e.getMessage()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java index 703f0b64e0b..02b106a98be 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java @@ -19,9 +19,11 @@ package org.apache.doris.qe; import org.apache.doris.analysis.LiteralExpr; import org.apache.doris.catalog.Env; -import org.apache.doris.cloud.qe.ComputeGroupException; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ClientPool; +import org.apache.doris.common.Config; import org.apache.doris.common.ErrorCode; 
+import org.apache.doris.mysql.MysqlCommand; import org.apache.doris.thrift.FrontendService; import org.apache.doris.thrift.TExpr; import org.apache.doris.thrift.TExprNode; @@ -39,22 +41,28 @@ import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; import java.util.Map; +/** + * FEOpExecutor is used to send request to specific FE + */ public class FEOpExecutor { private static final Logger LOG = LogManager.getLogger(FEOpExecutor.class); - private static final float RPC_TIMEOUT_COEFFICIENT = 1.2f; + protected static final float RPC_TIMEOUT_COEFFICIENT = 1.2f; - private final OriginStatement originStmt; - private final ConnectContext ctx; - private TMasterOpResult result; - private TNetworkAddress feAddr; + protected final OriginStatement originStmt; + protected final ConnectContext ctx; + protected TMasterOpResult result; + protected TNetworkAddress feAddr; // the total time of thrift connectTime, readTime and writeTime - private int thriftTimeoutMs; + protected int thriftTimeoutMs; - private boolean shouldNotRetry; + protected boolean shouldNotRetry; public FEOpExecutor(TNetworkAddress feAddress, OriginStatement originStmt, ConnectContext ctx, boolean isQuery) { this.feAddr = feAddress; @@ -66,7 +74,15 @@ public class FEOpExecutor { } public void execute() throws Exception { - result = forward(feAddr, buildStmtForwardParams()); + result = forward(buildStmtForwardParams()); + if (ctx.isTxnModel()) { + if (result.isSetTxnLoadInfo()) { + ctx.getTxnEntry().setTxnLoadInfoInObserver(result.getTxnLoadInfo()); + } else { + ctx.setTxnEntry(null); + LOG.info("set txn entry to null"); + } + } } public void cancel() throws Exception { @@ -84,22 +100,24 @@ public class FEOpExecutor { request.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort()); // just make the protocol happy request.setSql(""); - result = 
forward(feAddr, request); + result = forward(request); } // Send request to specific fe - private TMasterOpResult forward(TNetworkAddress thriftAddress, TMasterOpRequest params) throws Exception { + protected TMasterOpResult forward(TMasterOpRequest params) throws Exception { ctx.getEnv().checkReadyOrThrow(); FrontendService.Client client; try { - client = ClientPool.frontendPool.borrowObject(thriftAddress, thriftTimeoutMs); + client = ClientPool.frontendPool.borrowObject(feAddr, thriftTimeoutMs); } catch (Exception e) { // may throw NullPointerException. add err msg - throw new Exception("Failed to get fe client: " + thriftAddress.toString(), e); + throw new Exception("Failed to get master client.", e); + } + final StringBuilder forwardMsg = new StringBuilder("forward to master FE " + feAddr.toString()); + if (!params.isSyncJournalOnly()) { + forwardMsg.append(", statement id: ").append(ctx.getStmtId()); } - final StringBuilder forwardMsg = new StringBuilder("forward to FE " + thriftAddress.toString()); - forwardMsg.append(", statement id: ").append(ctx.getStmtId()); LOG.info(forwardMsg.toString()); boolean isReturnToPool = false; @@ -110,7 +128,7 @@ public class FEOpExecutor { } catch (TTransportException e) { // wrap the raw exception. 
forwardMsg.append(" : failed"); - Exception exception = new ForwardToFEException(forwardMsg.toString(), e); + Exception exception = new ForwardToMasterException(forwardMsg.toString(), e); boolean ok = ClientPool.frontendPool.reopen(client, thriftTimeoutMs); if (!ok) { @@ -130,14 +148,14 @@ public class FEOpExecutor { } } finally { if (isReturnToPool) { - ClientPool.frontendPool.returnObject(thriftAddress, client); + ClientPool.frontendPool.returnObject(feAddr, client); } else { - ClientPool.frontendPool.invalidateObject(thriftAddress, client); + ClientPool.frontendPool.invalidateObject(feAddr, client); } } } - private TMasterOpRequest buildStmtForwardParams() { + protected TMasterOpRequest buildStmtForwardParams() throws AnalysisException { TMasterOpRequest params = new TMasterOpRequest(); // node ident params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost()); @@ -151,22 +169,38 @@ public class FEOpExecutor { params.setUserIp(ctx.getRemoteIP()); params.setStmtId(ctx.getStmtId()); params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); + params.setSessionId(ctx.getSessionId()); - String cluster = ""; - try { - ctx.getCloudCluster(false); - } catch (ComputeGroupException e) { - LOG.warn("failed to get cloud cluster", e); - } - if (!Strings.isNullOrEmpty(cluster)) { - params.setCloudCluster(cluster); + if (Config.isCloudMode()) { + String cluster = ""; + try { + cluster = ctx.getCloudCluster(false); + } catch (Exception e) { + LOG.warn("failed to get cloud compute group", e); + } + if (!Strings.isNullOrEmpty(cluster)) { + params.setCloudCluster(cluster); + } } + // session variables params.setSessionVariables(ctx.getSessionVariable().getForwardVariables()); params.setUserVariables(getForwardUserVariables(ctx.getUserVars())); if (null != ctx.queryId()) { params.setQueryId(ctx.queryId()); } + + // set transaction load info + if (ctx.isTxnModel()) { + params.setTxnLoadInfo(ctx.getTxnEntry().getTxnLoadInfoInObserver()); + } + + if 
(ctx.getCommand() == MysqlCommand.COM_STMT_EXECUTE) { + if (null != ctx.getPrepareExecuteBuffer()) { + params.setPrepareExecuteBuffer(ctx.getPrepareExecuteBuffer()); + } + } + return params; } @@ -187,6 +221,48 @@ public class FEOpExecutor { return result.getErrMessage(); } + + public ByteBuffer getOutputPacket() { + if (result == null) { + return null; + } + return result.packet; + } + + public TUniqueId getQueryId() { + if (result != null && result.isSetQueryId()) { + return result.getQueryId(); + } else { + return null; + } + } + + public String getProxyStatus() { + if (result == null) { + return QueryState.MysqlStateType.UNKNOWN.name(); + } + if (!result.isSetStatus()) { + return QueryState.MysqlStateType.UNKNOWN.name(); + } else { + return result.getStatus(); + } + } + + public ShowResultSet getProxyResultSet() { + if (result == null) { + return null; + } + if (result.isSetResultSet()) { + return new ShowResultSet(result.resultSet); + } else { + return null; + } + } + + public List<ByteBuffer> getQueryResultBufList() { + return result.isSetQueryResultBufList() ? 
result.getQueryResultBufList() : Collections.emptyList(); + } + private Map<String, TExprNode> getForwardUserVariables(Map<String, LiteralExpr> userVariables) { Map<String, TExprNode> forwardVariables = Maps.newHashMap(); for (Map.Entry<String, LiteralExpr> entry : userVariables.entrySet()) { @@ -198,21 +274,22 @@ public class FEOpExecutor { return forwardVariables; } - public static class ForwardToFEException extends RuntimeException { - + protected static class ForwardToMasterException extends RuntimeException { private static final Map<Integer, String> TYPE_MSG_MAP = ImmutableMap.<Integer, String>builder() .put(TTransportException.UNKNOWN, "Unknown exception") .put(TTransportException.NOT_OPEN, "Connection is not open") .put(TTransportException.ALREADY_OPEN, "Connection has already opened up") - .put(TTransportException.TIMED_OUT, "Connection timeout") + .put(TTransportException.TIMED_OUT, + "Connection timeout, please check network state or enlarge session variable:" + + "`query_timeout`/`insert_timeout`") .put(TTransportException.END_OF_FILE, "EOF") .put(TTransportException.CORRUPTED_DATA, "Corrupted data") .build(); private final String msg; - public ForwardToFEException(String msg, TTransportException exception) { + public ForwardToMasterException(String msg, TTransportException exception) { this.msg = msg + ", cause: " + TYPE_MSG_MAP.get(exception.getType()) + ", " + exception.getMessage(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java index f78b3c7bc9e..87e16626e82 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java @@ -17,66 +17,33 @@ package org.apache.doris.qe; -import org.apache.doris.analysis.LiteralExpr; import org.apache.doris.analysis.RedirectStatus; import org.apache.doris.catalog.Env; -import org.apache.doris.common.AnalysisException; -import 
org.apache.doris.common.ClientPool; -import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; -import org.apache.doris.common.ErrorCode; -import org.apache.doris.mysql.MysqlCommand; -import org.apache.doris.thrift.FrontendService; -import org.apache.doris.thrift.TExpr; -import org.apache.doris.thrift.TExprNode; import org.apache.doris.thrift.TGroupCommitInfo; import org.apache.doris.thrift.TMasterOpRequest; -import org.apache.doris.thrift.TMasterOpResult; import org.apache.doris.thrift.TNetworkAddress; -import org.apache.doris.thrift.TUniqueId; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.thrift.TException; -import org.apache.thrift.transport.TTransportException; -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -public class MasterOpExecutor { +/** + * MasterOpExecutor is used to send request to Master FE. + * It is inherited from FEOpExecutor. The difference is that MasterOpExecutor may need to wait the journal being + * synced before returning. 
+ */ +public class MasterOpExecutor extends FEOpExecutor { private static final Logger LOG = LogManager.getLogger(MasterOpExecutor.class); - - private static final float RPC_TIMEOUT_COEFFICIENT = 1.2f; - - private final OriginStatement originStmt; - private final ConnectContext ctx; - private TMasterOpResult result; - - private TNetworkAddress masterAddr; - - private int waitTimeoutMs; - // the total time of thrift connectTime add readTime and writeTime - private int thriftTimeoutMs; - - private boolean shouldNotRetry; + private final int journalWaitTimeoutMs; public MasterOpExecutor(OriginStatement originStmt, ConnectContext ctx, RedirectStatus status, boolean isQuery) { - this.originStmt = originStmt; - this.ctx = ctx; + super(new TNetworkAddress(ctx.getEnv().getMasterHost(), ctx.getEnv().getMasterRpcPort()), + originStmt, ctx, isQuery); if (status.isNeedToWaitJournalSync()) { - this.waitTimeoutMs = (int) (ctx.getExecTimeout() * 1000 * RPC_TIMEOUT_COEFFICIENT); + this.journalWaitTimeoutMs = (int) (ctx.getExecTimeout() * 1000 * RPC_TIMEOUT_COEFFICIENT); } else { - this.waitTimeoutMs = 0; + this.journalWaitTimeoutMs = 0; } - this.thriftTimeoutMs = (int) (ctx.getExecTimeout() * 1000 * RPC_TIMEOUT_COEFFICIENT); - // if isQuery=false, we shouldn't retry twice when catch exception because of Idempotency - this.shouldNotRetry = !isQuery; } /** @@ -86,21 +53,25 @@ public class MasterOpExecutor { this(null, ctx, RedirectStatus.FORWARD_WITH_SYNC, true); } + @Override public void execute() throws Exception { - result = forward(buildStmtForwardParams()); - if (ctx.isTxnModel()) { - if (result.isSetTxnLoadInfo()) { - ctx.getTxnEntry().setTxnLoadInfoInObserver(result.getTxnLoadInfo()); - } else { - ctx.setTxnEntry(null); - LOG.info("set txn entry to null"); - } - } + super.execute(); waitOnReplaying(); } + @Override + public void cancel() throws Exception { + super.cancel(); + waitOnReplaying(); + } + + private void waitOnReplaying() throws DdlException { + 
LOG.info("forwarding to master get result max journal id: {}", result.maxJournalId); + ctx.getEnv().getJournalObservable().waitOn(result.maxJournalId, journalWaitTimeoutMs); + } + public void syncJournal() throws Exception { - result = forward(buildSyncJournalParmas()); + result = forward(buildSyncJournalParams()); waitOnReplaying(); } @@ -115,137 +86,7 @@ public class MasterOpExecutor { waitOnReplaying(); } - public void cancel() throws Exception { - TUniqueId queryId = ctx.queryId(); - if (queryId == null) { - return; - } - Preconditions.checkNotNull(masterAddr, "query with id %s is not forwarded to master", queryId); - TMasterOpRequest request = new TMasterOpRequest(); - request.setCancelQeury(true); - request.setQueryId(queryId); - request.setDb(ctx.getDatabase()); - request.setUser(ctx.getQualifiedUser()); - request.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost()); - request.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort()); - // just make the protocol happy - request.setSql(""); - result = forward(masterAddr, request); - waitOnReplaying(); - } - - private void waitOnReplaying() throws DdlException { - LOG.info("forwarding to master get result max journal id: {}", result.maxJournalId); - ctx.getEnv().getJournalObservable().waitOn(result.maxJournalId, waitTimeoutMs); - } - - private TMasterOpResult forward(TMasterOpRequest params) throws Exception { - String masterHost = ctx.getEnv().getMasterHost(); - int masterRpcPort = ctx.getEnv().getMasterRpcPort(); - masterAddr = new TNetworkAddress(masterHost, masterRpcPort); - return forward(masterAddr, params); - } - - // Send request to Master - private TMasterOpResult forward(TNetworkAddress thriftAddress, TMasterOpRequest params) throws Exception { - ctx.getEnv().checkReadyOrThrow(); - - FrontendService.Client client; - try { - client = ClientPool.frontendPool.borrowObject(thriftAddress, thriftTimeoutMs); - } catch (Exception e) { - // may throw NullPointerException. 
add err msg - throw new Exception("Failed to get master client.", e); - } - final StringBuilder forwardMsg = new StringBuilder("forward to master FE " + thriftAddress.toString()); - if (!params.isSyncJournalOnly()) { - forwardMsg.append(", statement id: ").append(ctx.getStmtId()); - } - LOG.info(forwardMsg.toString()); - - boolean isReturnToPool = false; - try { - final TMasterOpResult result = client.forward(params); - isReturnToPool = true; - return result; - } catch (TTransportException e) { - // wrap the raw exception. - forwardMsg.append(" : failed"); - Exception exception = new ForwardToMasterException(forwardMsg.toString(), e); - - boolean ok = ClientPool.frontendPool.reopen(client, thriftTimeoutMs); - if (!ok) { - throw exception; - } - if (shouldNotRetry || e.getType() == TTransportException.TIMED_OUT) { - throw exception; - } else { - LOG.warn(forwardMsg.append(" twice").toString(), e); - try { - TMasterOpResult result = client.forward(params); - isReturnToPool = true; - return result; - } catch (TException ex) { - throw exception; - } - } - } finally { - if (isReturnToPool) { - ClientPool.frontendPool.returnObject(thriftAddress, client); - } else { - ClientPool.frontendPool.invalidateObject(thriftAddress, client); - } - } - } - - private TMasterOpRequest buildStmtForwardParams() throws AnalysisException { - TMasterOpRequest params = new TMasterOpRequest(); - // node ident - params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost()); - params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort()); - params.setSql(originStmt.originStmt); - params.setStmtIdx(originStmt.idx); - params.setUser(ctx.getQualifiedUser()); - params.setDefaultCatalog(ctx.getDefaultCatalog()); - params.setDefaultDatabase(ctx.getDatabase()); - params.setDb(ctx.getDatabase()); - params.setUserIp(ctx.getRemoteIP()); - params.setStmtId(ctx.getStmtId()); - params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); - params.setSessionId(ctx.getSessionId()); - 
- if (Config.isCloudMode()) { - String cluster = ""; - try { - cluster = ctx.getCloudCluster(false); - } catch (Exception e) { - LOG.warn("failed to get cloud compute group", e); - } - if (!Strings.isNullOrEmpty(cluster)) { - params.setCloudCluster(cluster); - } - } - - // session variables - params.setSessionVariables(ctx.getSessionVariable().getForwardVariables()); - params.setUserVariables(getForwardUserVariables(ctx.getUserVars())); - if (null != ctx.queryId()) { - params.setQueryId(ctx.queryId()); - } - // set transaction load info - if (ctx.isTxnModel()) { - params.setTxnLoadInfo(ctx.getTxnEntry().getTxnLoadInfoInObserver()); - } - - if (ctx.getCommand() == MysqlCommand.COM_STMT_EXECUTE) { - if (null != ctx.getPrepareExecuteBuffer()) { - params.setPrepareExecuteBuffer(ctx.getPrepareExecuteBuffer()); - } - } - return params; - } - - private TMasterOpRequest buildSyncJournalParmas() { + private TMasterOpRequest buildSyncJournalParams() { final TMasterOpRequest params = new TMasterOpRequest(); // node ident params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost()); @@ -263,17 +104,7 @@ public class MasterOpExecutor { groupCommitParams.setGetGroupCommitLoadBeId(true); groupCommitParams.setGroupCommitLoadTableId(tableId); groupCommitParams.setCluster(cluster); - - final TMasterOpRequest params = new TMasterOpRequest(); - // node ident - params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost()); - params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort()); - params.setGroupCommitInfo(groupCommitParams); - params.setDb(ctx.getDatabase()); - params.setUser(ctx.getQualifiedUser()); - // just make the protocol happy - params.setSql(""); - return params; + return getMasterOpRequestForGroupCommit(groupCommitParams); } private TMasterOpRequest buildUpdateLoadDataParams(long tableId, long receiveData) { @@ -281,7 +112,10 @@ public class MasterOpExecutor { groupCommitParams.setUpdateLoadData(true); groupCommitParams.setTableId(tableId); 
groupCommitParams.setReceiveData(receiveData); + return getMasterOpRequestForGroupCommit(groupCommitParams); + } + private TMasterOpRequest getMasterOpRequestForGroupCommit(TGroupCommitInfo groupCommitParams) { final TMasterOpRequest params = new TMasterOpRequest(); // node ident params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost()); @@ -294,102 +128,4 @@ public class MasterOpExecutor { return params; } - public ByteBuffer getOutputPacket() { - if (result == null) { - return null; - } - return result.packet; - } - - public TUniqueId getQueryId() { - if (result != null && result.isSetQueryId()) { - return result.getQueryId(); - } else { - return null; - } - } - - public String getProxyStatus() { - if (result == null) { - return QueryState.MysqlStateType.UNKNOWN.name(); - } - if (!result.isSetStatus()) { - return QueryState.MysqlStateType.UNKNOWN.name(); - } else { - return result.getStatus(); - } - } - - public int getProxyStatusCode() { - if (result == null || !result.isSetStatusCode()) { - return ErrorCode.ERR_UNKNOWN_ERROR.getCode(); - } - return result.getStatusCode(); - } - - public String getProxyErrMsg() { - if (result == null) { - return ErrorCode.ERR_UNKNOWN_ERROR.getErrorMsg(); - } - if (!result.isSetErrMessage()) { - return ""; - } - return result.getErrMessage(); - } - - public ShowResultSet getProxyResultSet() { - if (result == null) { - return null; - } - if (result.isSetResultSet()) { - return new ShowResultSet(result.resultSet); - } else { - return null; - } - } - - public List<ByteBuffer> getQueryResultBufList() { - return result.isSetQueryResultBufList() ? 
result.getQueryResultBufList() : Collections.emptyList(); - } - - public void setResult(TMasterOpResult result) { - this.result = result; - } - - public static class ForwardToMasterException extends RuntimeException { - - private static final Map<Integer, String> TYPE_MSG_MAP = - ImmutableMap.<Integer, String>builder() - .put(TTransportException.UNKNOWN, "Unknown exception") - .put(TTransportException.NOT_OPEN, "Connection is not open") - .put(TTransportException.ALREADY_OPEN, "Connection has already opened up") - .put(TTransportException.TIMED_OUT, - "Connection timeout, please check network state or enlarge session variable:" - + "`query_timeout`/`insert_timeout`") - .put(TTransportException.END_OF_FILE, "EOF") - .put(TTransportException.CORRUPTED_DATA, "Corrupted data") - .build(); - - private final String msg; - - public ForwardToMasterException(String msg, TTransportException exception) { - this.msg = msg + ", cause: " + TYPE_MSG_MAP.get(exception.getType()) + ", " + exception.getMessage(); - } - - @Override - public String getMessage() { - return msg; - } - } - - private Map<String, TExprNode> getForwardUserVariables(Map<String, LiteralExpr> userVariables) { - Map<String, TExprNode> forwardVariables = Maps.newHashMap(); - for (Map.Entry<String, LiteralExpr> entry : userVariables.entrySet()) { - LiteralExpr literalExpr = entry.getValue(); - TExpr tExpr = literalExpr.treeToThrift(); - TExprNode tExprNode = tExpr.nodes.get(0); - forwardVariables.put(entry.getKey(), tExprNode); - } - return forwardVariables; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index baedb5caebd..69db459bbf7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -493,14 +493,14 @@ public class StmtExecutor { if (masterOpExecutor == null) { return MysqlStateType.UNKNOWN.ordinal(); } - return 
masterOpExecutor.getProxyStatusCode(); + return masterOpExecutor.getStatusCode(); } public String getProxyErrMsg() { if (masterOpExecutor == null) { return MysqlStateType.UNKNOWN.name(); } - return masterOpExecutor.getProxyErrMsg(); + return masterOpExecutor.getErrMsg(); } public boolean isSyncLoadKindStmt() { @@ -1202,8 +1202,6 @@ public class StmtExecutor { /** * get variables in stmt. - * - * @throws DdlException */ private void analyzeVariablesInStmt() throws DdlException { analyzeVariablesInStmt(parsedStmt); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org