This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ce95d325e4 [refactor](fe) Merge MasterOpExecutor and FEOpExecutor 
(#50776)
4ce95d325e4 is described below

commit 4ce95d325e4a60113b45fdf68e8b6d5eb6e5fd5b
Author: Mingyu Chen (Rayner) <morning...@163.com>
AuthorDate: Mon May 12 23:13:38 2025 +0800

    [refactor](fe) Merge MasterOpExecutor and FEOpExecutor (#50776)
    
    ### What problem does this PR solve?
    
    Related PR: #34685
    
    Problem Summary:
    
    PR #34685 introduced `FEOpExecutor`, which is just a copy of
    `MasterOpExecutor`.
    Both classes are used to forward requests to a specific FE node, and
    there is a lot of duplicated code.
    So I refactored the code to let `MasterOpExecutor` extend
    `FEOpExecutor`.
    
    No logic is changed in this PR.
---
 .../main/java/org/apache/doris/catalog/Env.java    |   4 +-
 .../apache/doris/httpv2/rest/SetConfigAction.java  |   2 +-
 .../java/org/apache/doris/qe/FEOpExecutor.java     | 139 +++++++--
 .../java/org/apache/doris/qe/MasterOpExecutor.java | 324 ++-------------------
 .../java/org/apache/doris/qe/StmtExecutor.java     |   6 +-
 5 files changed, 143 insertions(+), 332 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index ad35b25572c..25a00b0f8da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -6173,7 +6173,7 @@ public class Env {
      * we can't set callback which is in fe-core to config items which are in 
fe-common. so wrap them here. it's not so
      * good but is best for us now.
      */
-    public void setMutableConfigwithCallback(String key, String value) throws 
ConfigException {
+    public void setMutableConfigWithCallback(String key, String value) throws 
ConfigException {
         ConfigBase.setMutableConfig(key, value);
         if (configtoThreads.get(key) != null) {
             try {
@@ -6193,7 +6193,7 @@ public class Env {
 
         for (Map.Entry<String, String> entry : configs.entrySet()) {
             try {
-                setMutableConfigwithCallback(entry.getKey(), entry.getValue());
+                setMutableConfigWithCallback(entry.getKey(), entry.getValue());
             } catch (ConfigException e) {
                 throw new DdlException(e.getMessage());
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/SetConfigAction.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/SetConfigAction.java
index d9351ec5978..8d0cc12a235 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/SetConfigAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/SetConfigAction.java
@@ -94,7 +94,7 @@ public class SetConfigAction extends RestBaseController {
             try {
                 if (confValue != null && confValue.length == 1) {
                     try {
-                        
Env.getCurrentEnv().setMutableConfigwithCallback(confKey, confValue[0]);
+                        
Env.getCurrentEnv().setMutableConfigWithCallback(confKey, confValue[0]);
                     } catch (ConfigException e) {
                         throw new DdlException(e.getMessage());
                     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
index 703f0b64e0b..02b106a98be 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
@@ -19,9 +19,11 @@ package org.apache.doris.qe;
 
 import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.catalog.Env;
-import org.apache.doris.cloud.qe.ComputeGroupException;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.ErrorCode;
+import org.apache.doris.mysql.MysqlCommand;
 import org.apache.doris.thrift.FrontendService;
 import org.apache.doris.thrift.TExpr;
 import org.apache.doris.thrift.TExprNode;
@@ -39,22 +41,28 @@ import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransportException;
 
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
+/**
+ * FEOpExecutor is used to send request to specific FE
+ */
 public class FEOpExecutor {
     private static final Logger LOG = LogManager.getLogger(FEOpExecutor.class);
 
-    private static final float RPC_TIMEOUT_COEFFICIENT = 1.2f;
+    protected static final float RPC_TIMEOUT_COEFFICIENT = 1.2f;
 
-    private final OriginStatement originStmt;
-    private final ConnectContext ctx;
-    private TMasterOpResult result;
-    private TNetworkAddress feAddr;
+    protected final OriginStatement originStmt;
+    protected final ConnectContext ctx;
+    protected TMasterOpResult result;
+    protected TNetworkAddress feAddr;
 
     // the total time of thrift connectTime, readTime and writeTime
-    private int thriftTimeoutMs;
+    protected int thriftTimeoutMs;
 
-    private boolean shouldNotRetry;
+    protected boolean shouldNotRetry;
 
     public FEOpExecutor(TNetworkAddress feAddress, OriginStatement originStmt, 
ConnectContext ctx, boolean isQuery) {
         this.feAddr = feAddress;
@@ -66,7 +74,15 @@ public class FEOpExecutor {
     }
 
     public void execute() throws Exception {
-        result = forward(feAddr, buildStmtForwardParams());
+        result = forward(buildStmtForwardParams());
+        if (ctx.isTxnModel()) {
+            if (result.isSetTxnLoadInfo()) {
+                
ctx.getTxnEntry().setTxnLoadInfoInObserver(result.getTxnLoadInfo());
+            } else {
+                ctx.setTxnEntry(null);
+                LOG.info("set txn entry to null");
+            }
+        }
     }
 
     public void cancel() throws Exception {
@@ -84,22 +100,24 @@ public class FEOpExecutor {
         request.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
         // just make the protocol happy
         request.setSql("");
-        result = forward(feAddr, request);
+        result = forward(request);
     }
 
     // Send request to specific fe
-    private TMasterOpResult forward(TNetworkAddress thriftAddress, 
TMasterOpRequest params) throws Exception {
+    protected TMasterOpResult forward(TMasterOpRequest params) throws 
Exception {
         ctx.getEnv().checkReadyOrThrow();
 
         FrontendService.Client client;
         try {
-            client = ClientPool.frontendPool.borrowObject(thriftAddress, 
thriftTimeoutMs);
+            client = ClientPool.frontendPool.borrowObject(feAddr, 
thriftTimeoutMs);
         } catch (Exception e) {
             // may throw NullPointerException. add err msg
-            throw new Exception("Failed to get fe client: " + 
thriftAddress.toString(), e);
+            throw new Exception("Failed to get master client.", e);
+        }
+        final StringBuilder forwardMsg = new StringBuilder("forward to master 
FE " + feAddr.toString());
+        if (!params.isSyncJournalOnly()) {
+            forwardMsg.append(", statement id: ").append(ctx.getStmtId());
         }
-        final StringBuilder forwardMsg = new StringBuilder("forward to FE " + 
thriftAddress.toString());
-        forwardMsg.append(", statement id: ").append(ctx.getStmtId());
         LOG.info(forwardMsg.toString());
 
         boolean isReturnToPool = false;
@@ -110,7 +128,7 @@ public class FEOpExecutor {
         } catch (TTransportException e) {
             // wrap the raw exception.
             forwardMsg.append(" : failed");
-            Exception exception = new 
ForwardToFEException(forwardMsg.toString(), e);
+            Exception exception = new 
ForwardToMasterException(forwardMsg.toString(), e);
 
             boolean ok = ClientPool.frontendPool.reopen(client, 
thriftTimeoutMs);
             if (!ok) {
@@ -130,14 +148,14 @@ public class FEOpExecutor {
             }
         } finally {
             if (isReturnToPool) {
-                ClientPool.frontendPool.returnObject(thriftAddress, client);
+                ClientPool.frontendPool.returnObject(feAddr, client);
             } else {
-                ClientPool.frontendPool.invalidateObject(thriftAddress, 
client);
+                ClientPool.frontendPool.invalidateObject(feAddr, client);
             }
         }
     }
 
-    private TMasterOpRequest buildStmtForwardParams() {
+    protected TMasterOpRequest buildStmtForwardParams() throws 
AnalysisException {
         TMasterOpRequest params = new TMasterOpRequest();
         // node ident
         params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
@@ -151,22 +169,38 @@ public class FEOpExecutor {
         params.setUserIp(ctx.getRemoteIP());
         params.setStmtId(ctx.getStmtId());
         params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
+        params.setSessionId(ctx.getSessionId());
 
-        String cluster = "";
-        try {
-            ctx.getCloudCluster(false);
-        } catch (ComputeGroupException e) {
-            LOG.warn("failed to get cloud cluster", e);
-        }
-        if (!Strings.isNullOrEmpty(cluster)) {
-            params.setCloudCluster(cluster);
+        if (Config.isCloudMode()) {
+            String cluster = "";
+            try {
+                cluster = ctx.getCloudCluster(false);
+            } catch (Exception e) {
+                LOG.warn("failed to get cloud compute group", e);
+            }
+            if (!Strings.isNullOrEmpty(cluster)) {
+                params.setCloudCluster(cluster);
+            }
         }
+
         // session variables
         
params.setSessionVariables(ctx.getSessionVariable().getForwardVariables());
         params.setUserVariables(getForwardUserVariables(ctx.getUserVars()));
         if (null != ctx.queryId()) {
             params.setQueryId(ctx.queryId());
         }
+
+        // set transaction load info
+        if (ctx.isTxnModel()) {
+            
params.setTxnLoadInfo(ctx.getTxnEntry().getTxnLoadInfoInObserver());
+        }
+
+        if (ctx.getCommand() == MysqlCommand.COM_STMT_EXECUTE) {
+            if (null != ctx.getPrepareExecuteBuffer()) {
+                params.setPrepareExecuteBuffer(ctx.getPrepareExecuteBuffer());
+            }
+        }
+
         return params;
     }
 
@@ -187,6 +221,48 @@ public class FEOpExecutor {
         return result.getErrMessage();
     }
 
+
+    public ByteBuffer getOutputPacket() {
+        if (result == null) {
+            return null;
+        }
+        return result.packet;
+    }
+
+    public TUniqueId getQueryId() {
+        if (result != null && result.isSetQueryId()) {
+            return result.getQueryId();
+        } else {
+            return null;
+        }
+    }
+
+    public String getProxyStatus() {
+        if (result == null) {
+            return QueryState.MysqlStateType.UNKNOWN.name();
+        }
+        if (!result.isSetStatus()) {
+            return QueryState.MysqlStateType.UNKNOWN.name();
+        } else {
+            return result.getStatus();
+        }
+    }
+
+    public ShowResultSet getProxyResultSet() {
+        if (result == null) {
+            return null;
+        }
+        if (result.isSetResultSet()) {
+            return new ShowResultSet(result.resultSet);
+        } else {
+            return null;
+        }
+    }
+
+    public List<ByteBuffer> getQueryResultBufList() {
+        return result.isSetQueryResultBufList() ? 
result.getQueryResultBufList() : Collections.emptyList();
+    }
+
     private Map<String, TExprNode> getForwardUserVariables(Map<String, 
LiteralExpr> userVariables) {
         Map<String, TExprNode> forwardVariables = Maps.newHashMap();
         for (Map.Entry<String, LiteralExpr> entry : userVariables.entrySet()) {
@@ -198,21 +274,22 @@ public class FEOpExecutor {
         return forwardVariables;
     }
 
-    public static class ForwardToFEException extends RuntimeException {
-
+    protected static class ForwardToMasterException extends RuntimeException {
         private static final Map<Integer, String> TYPE_MSG_MAP =
                 ImmutableMap.<Integer, String>builder()
                         .put(TTransportException.UNKNOWN, "Unknown exception")
                         .put(TTransportException.NOT_OPEN, "Connection is not 
open")
                         .put(TTransportException.ALREADY_OPEN, "Connection has 
already opened up")
-                        .put(TTransportException.TIMED_OUT, "Connection 
timeout")
+                        .put(TTransportException.TIMED_OUT,
+                                "Connection timeout, please check network 
state or enlarge session variable:"
+                                        + "`query_timeout`/`insert_timeout`")
                         .put(TTransportException.END_OF_FILE, "EOF")
                         .put(TTransportException.CORRUPTED_DATA, "Corrupted 
data")
                         .build();
 
         private final String msg;
 
-        public ForwardToFEException(String msg, TTransportException exception) 
{
+        public ForwardToMasterException(String msg, TTransportException 
exception) {
             this.msg = msg + ", cause: " + 
TYPE_MSG_MAP.get(exception.getType()) + ", " + exception.getMessage();
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
index f78b3c7bc9e..87e16626e82 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
@@ -17,66 +17,33 @@
 
 package org.apache.doris.qe;
 
-import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.analysis.RedirectStatus;
 import org.apache.doris.catalog.Env;
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.ClientPool;
-import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
-import org.apache.doris.common.ErrorCode;
-import org.apache.doris.mysql.MysqlCommand;
-import org.apache.doris.thrift.FrontendService;
-import org.apache.doris.thrift.TExpr;
-import org.apache.doris.thrift.TExprNode;
 import org.apache.doris.thrift.TGroupCommitInfo;
 import org.apache.doris.thrift.TMasterOpRequest;
-import org.apache.doris.thrift.TMasterOpResult;
 import org.apache.doris.thrift.TNetworkAddress;
-import org.apache.doris.thrift.TUniqueId;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransportException;
 
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public class MasterOpExecutor {
+/**
+ * MasterOpExecutor is used to send request to Master FE.
+ * It is inherited from FEOpExecutor. The difference is that MasterOpExecutor 
may need to wait the journal being
+ * synced before returning.
+ */
+public class MasterOpExecutor extends FEOpExecutor {
     private static final Logger LOG = 
LogManager.getLogger(MasterOpExecutor.class);
-
-    private static final float RPC_TIMEOUT_COEFFICIENT = 1.2f;
-
-    private final OriginStatement originStmt;
-    private final ConnectContext ctx;
-    private TMasterOpResult result;
-
-    private TNetworkAddress masterAddr;
-
-    private int waitTimeoutMs;
-    // the total time of thrift connectTime add readTime and writeTime
-    private int thriftTimeoutMs;
-
-    private boolean shouldNotRetry;
+    private final int journalWaitTimeoutMs;
 
     public MasterOpExecutor(OriginStatement originStmt, ConnectContext ctx, 
RedirectStatus status, boolean isQuery) {
-        this.originStmt = originStmt;
-        this.ctx = ctx;
+        super(new TNetworkAddress(ctx.getEnv().getMasterHost(), 
ctx.getEnv().getMasterRpcPort()),
+                originStmt, ctx, isQuery);
         if (status.isNeedToWaitJournalSync()) {
-            this.waitTimeoutMs = (int) (ctx.getExecTimeout() * 1000 * 
RPC_TIMEOUT_COEFFICIENT);
+            this.journalWaitTimeoutMs = (int) (ctx.getExecTimeout() * 1000 * 
RPC_TIMEOUT_COEFFICIENT);
         } else {
-            this.waitTimeoutMs = 0;
+            this.journalWaitTimeoutMs = 0;
         }
-        this.thriftTimeoutMs = (int) (ctx.getExecTimeout() * 1000 * 
RPC_TIMEOUT_COEFFICIENT);
-        // if isQuery=false, we shouldn't retry twice when catch exception 
because of Idempotency
-        this.shouldNotRetry = !isQuery;
     }
 
     /**
@@ -86,21 +53,25 @@ public class MasterOpExecutor {
         this(null, ctx, RedirectStatus.FORWARD_WITH_SYNC, true);
     }
 
+    @Override
     public void execute() throws Exception {
-        result = forward(buildStmtForwardParams());
-        if (ctx.isTxnModel()) {
-            if (result.isSetTxnLoadInfo()) {
-                
ctx.getTxnEntry().setTxnLoadInfoInObserver(result.getTxnLoadInfo());
-            } else {
-                ctx.setTxnEntry(null);
-                LOG.info("set txn entry to null");
-            }
-        }
+        super.execute();
         waitOnReplaying();
     }
 
+    @Override
+    public void cancel() throws Exception {
+        super.cancel();
+        waitOnReplaying();
+    }
+
+    private void waitOnReplaying() throws DdlException {
+        LOG.info("forwarding to master get result max journal id: {}", 
result.maxJournalId);
+        ctx.getEnv().getJournalObservable().waitOn(result.maxJournalId, 
journalWaitTimeoutMs);
+    }
+
     public void syncJournal() throws Exception {
-        result = forward(buildSyncJournalParmas());
+        result = forward(buildSyncJournalParams());
         waitOnReplaying();
     }
 
@@ -115,137 +86,7 @@ public class MasterOpExecutor {
         waitOnReplaying();
     }
 
-    public void cancel() throws Exception {
-        TUniqueId queryId = ctx.queryId();
-        if (queryId == null) {
-            return;
-        }
-        Preconditions.checkNotNull(masterAddr, "query with id %s is not 
forwarded to master", queryId);
-        TMasterOpRequest request = new TMasterOpRequest();
-        request.setCancelQeury(true);
-        request.setQueryId(queryId);
-        request.setDb(ctx.getDatabase());
-        request.setUser(ctx.getQualifiedUser());
-        request.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
-        request.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
-        // just make the protocol happy
-        request.setSql("");
-        result = forward(masterAddr, request);
-        waitOnReplaying();
-    }
-
-    private void waitOnReplaying() throws DdlException {
-        LOG.info("forwarding to master get result max journal id: {}", 
result.maxJournalId);
-        ctx.getEnv().getJournalObservable().waitOn(result.maxJournalId, 
waitTimeoutMs);
-    }
-
-    private TMasterOpResult forward(TMasterOpRequest params) throws Exception {
-        String masterHost = ctx.getEnv().getMasterHost();
-        int masterRpcPort = ctx.getEnv().getMasterRpcPort();
-        masterAddr = new TNetworkAddress(masterHost, masterRpcPort);
-        return forward(masterAddr, params);
-    }
-
-    // Send request to Master
-    private TMasterOpResult forward(TNetworkAddress thriftAddress, 
TMasterOpRequest params) throws Exception {
-        ctx.getEnv().checkReadyOrThrow();
-
-        FrontendService.Client client;
-        try {
-            client = ClientPool.frontendPool.borrowObject(thriftAddress, 
thriftTimeoutMs);
-        } catch (Exception e) {
-            // may throw NullPointerException. add err msg
-            throw new Exception("Failed to get master client.", e);
-        }
-        final StringBuilder forwardMsg = new StringBuilder("forward to master 
FE " + thriftAddress.toString());
-        if (!params.isSyncJournalOnly()) {
-            forwardMsg.append(", statement id: ").append(ctx.getStmtId());
-        }
-        LOG.info(forwardMsg.toString());
-
-        boolean isReturnToPool = false;
-        try {
-            final TMasterOpResult result = client.forward(params);
-            isReturnToPool = true;
-            return result;
-        } catch (TTransportException e) {
-            // wrap the raw exception.
-            forwardMsg.append(" : failed");
-            Exception exception = new 
ForwardToMasterException(forwardMsg.toString(), e);
-
-            boolean ok = ClientPool.frontendPool.reopen(client, 
thriftTimeoutMs);
-            if (!ok) {
-                throw exception;
-            }
-            if (shouldNotRetry || e.getType() == 
TTransportException.TIMED_OUT) {
-                throw exception;
-            } else {
-                LOG.warn(forwardMsg.append(" twice").toString(), e);
-                try {
-                    TMasterOpResult result = client.forward(params);
-                    isReturnToPool = true;
-                    return result;
-                } catch (TException ex) {
-                    throw exception;
-                }
-            }
-        } finally {
-            if (isReturnToPool) {
-                ClientPool.frontendPool.returnObject(thriftAddress, client);
-            } else {
-                ClientPool.frontendPool.invalidateObject(thriftAddress, 
client);
-            }
-        }
-    }
-
-    private TMasterOpRequest buildStmtForwardParams() throws AnalysisException 
{
-        TMasterOpRequest params = new TMasterOpRequest();
-        // node ident
-        params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
-        params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
-        params.setSql(originStmt.originStmt);
-        params.setStmtIdx(originStmt.idx);
-        params.setUser(ctx.getQualifiedUser());
-        params.setDefaultCatalog(ctx.getDefaultCatalog());
-        params.setDefaultDatabase(ctx.getDatabase());
-        params.setDb(ctx.getDatabase());
-        params.setUserIp(ctx.getRemoteIP());
-        params.setStmtId(ctx.getStmtId());
-        params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
-        params.setSessionId(ctx.getSessionId());
-
-        if (Config.isCloudMode()) {
-            String cluster = "";
-            try {
-                cluster = ctx.getCloudCluster(false);
-            } catch (Exception e) {
-                LOG.warn("failed to get cloud compute group", e);
-            }
-            if (!Strings.isNullOrEmpty(cluster)) {
-                params.setCloudCluster(cluster);
-            }
-        }
-
-        // session variables
-        
params.setSessionVariables(ctx.getSessionVariable().getForwardVariables());
-        params.setUserVariables(getForwardUserVariables(ctx.getUserVars()));
-        if (null != ctx.queryId()) {
-            params.setQueryId(ctx.queryId());
-        }
-        // set transaction load info
-        if (ctx.isTxnModel()) {
-            
params.setTxnLoadInfo(ctx.getTxnEntry().getTxnLoadInfoInObserver());
-        }
-
-        if (ctx.getCommand() == MysqlCommand.COM_STMT_EXECUTE) {
-            if (null != ctx.getPrepareExecuteBuffer()) {
-                params.setPrepareExecuteBuffer(ctx.getPrepareExecuteBuffer());
-            }
-        }
-        return params;
-    }
-
-    private TMasterOpRequest buildSyncJournalParmas() {
+    private TMasterOpRequest buildSyncJournalParams() {
         final TMasterOpRequest params = new TMasterOpRequest();
         // node ident
         params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
@@ -263,17 +104,7 @@ public class MasterOpExecutor {
         groupCommitParams.setGetGroupCommitLoadBeId(true);
         groupCommitParams.setGroupCommitLoadTableId(tableId);
         groupCommitParams.setCluster(cluster);
-
-        final TMasterOpRequest params = new TMasterOpRequest();
-        // node ident
-        params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
-        params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
-        params.setGroupCommitInfo(groupCommitParams);
-        params.setDb(ctx.getDatabase());
-        params.setUser(ctx.getQualifiedUser());
-        // just make the protocol happy
-        params.setSql("");
-        return params;
+        return getMasterOpRequestForGroupCommit(groupCommitParams);
     }
 
     private TMasterOpRequest buildUpdateLoadDataParams(long tableId, long 
receiveData) {
@@ -281,7 +112,10 @@ public class MasterOpExecutor {
         groupCommitParams.setUpdateLoadData(true);
         groupCommitParams.setTableId(tableId);
         groupCommitParams.setReceiveData(receiveData);
+        return getMasterOpRequestForGroupCommit(groupCommitParams);
+    }
 
+    private TMasterOpRequest getMasterOpRequestForGroupCommit(TGroupCommitInfo 
groupCommitParams) {
         final TMasterOpRequest params = new TMasterOpRequest();
         // node ident
         params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
@@ -294,102 +128,4 @@ public class MasterOpExecutor {
         return params;
     }
 
-    public ByteBuffer getOutputPacket() {
-        if (result == null) {
-            return null;
-        }
-        return result.packet;
-    }
-
-    public TUniqueId getQueryId() {
-        if (result != null && result.isSetQueryId()) {
-            return result.getQueryId();
-        } else {
-            return null;
-        }
-    }
-
-    public String getProxyStatus() {
-        if (result == null) {
-            return QueryState.MysqlStateType.UNKNOWN.name();
-        }
-        if (!result.isSetStatus()) {
-            return QueryState.MysqlStateType.UNKNOWN.name();
-        } else {
-            return result.getStatus();
-        }
-    }
-
-    public int getProxyStatusCode() {
-        if (result == null || !result.isSetStatusCode()) {
-            return ErrorCode.ERR_UNKNOWN_ERROR.getCode();
-        }
-        return result.getStatusCode();
-    }
-
-    public String getProxyErrMsg() {
-        if (result == null) {
-            return ErrorCode.ERR_UNKNOWN_ERROR.getErrorMsg();
-        }
-        if (!result.isSetErrMessage()) {
-            return "";
-        }
-        return result.getErrMessage();
-    }
-
-    public ShowResultSet getProxyResultSet() {
-        if (result == null) {
-            return null;
-        }
-        if (result.isSetResultSet()) {
-            return new ShowResultSet(result.resultSet);
-        } else {
-            return null;
-        }
-    }
-
-    public List<ByteBuffer> getQueryResultBufList() {
-        return result.isSetQueryResultBufList() ? 
result.getQueryResultBufList() : Collections.emptyList();
-    }
-
-    public void setResult(TMasterOpResult result) {
-        this.result = result;
-    }
-
-    public static class ForwardToMasterException extends RuntimeException {
-
-        private static final Map<Integer, String> TYPE_MSG_MAP =
-                ImmutableMap.<Integer, String>builder()
-                        .put(TTransportException.UNKNOWN, "Unknown exception")
-                        .put(TTransportException.NOT_OPEN, "Connection is not 
open")
-                        .put(TTransportException.ALREADY_OPEN, "Connection has 
already opened up")
-                        .put(TTransportException.TIMED_OUT,
-                                "Connection timeout, please check network 
state or enlarge session variable:"
-                                        + "`query_timeout`/`insert_timeout`")
-                        .put(TTransportException.END_OF_FILE, "EOF")
-                        .put(TTransportException.CORRUPTED_DATA, "Corrupted 
data")
-                        .build();
-
-        private final String msg;
-
-        public ForwardToMasterException(String msg, TTransportException 
exception) {
-            this.msg = msg + ", cause: " + 
TYPE_MSG_MAP.get(exception.getType()) + ", " + exception.getMessage();
-        }
-
-        @Override
-        public String getMessage() {
-            return msg;
-        }
-    }
-
-    private Map<String, TExprNode> getForwardUserVariables(Map<String, 
LiteralExpr> userVariables) {
-        Map<String, TExprNode> forwardVariables = Maps.newHashMap();
-        for (Map.Entry<String, LiteralExpr> entry : userVariables.entrySet()) {
-            LiteralExpr literalExpr = entry.getValue();
-            TExpr tExpr = literalExpr.treeToThrift();
-            TExprNode tExprNode = tExpr.nodes.get(0);
-            forwardVariables.put(entry.getKey(), tExprNode);
-        }
-        return forwardVariables;
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index baedb5caebd..69db459bbf7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -493,14 +493,14 @@ public class StmtExecutor {
         if (masterOpExecutor == null) {
             return MysqlStateType.UNKNOWN.ordinal();
         }
-        return masterOpExecutor.getProxyStatusCode();
+        return masterOpExecutor.getStatusCode();
     }
 
     public String getProxyErrMsg() {
         if (masterOpExecutor == null) {
             return MysqlStateType.UNKNOWN.name();
         }
-        return masterOpExecutor.getProxyErrMsg();
+        return masterOpExecutor.getErrMsg();
     }
 
     public boolean isSyncLoadKindStmt() {
@@ -1202,8 +1202,6 @@ public class StmtExecutor {
 
     /**
      * get variables in stmt.
-     *
-     * @throws DdlException
      */
     private void analyzeVariablesInStmt() throws DdlException {
         analyzeVariablesInStmt(parsedStmt);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to