This is an automated email from the ASF dual-hosted git repository.

wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0fbe3dc  Add DriverExecutor (#12681)
0fbe3dc is described below

commit 0fbe3dcfb3c963968d7c3f122dcfea2dcedbfb4d
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Sep 24 15:41:38 2021 +0800

    Add DriverExecutor (#12681)
    
    * Refactor AbstractStatementAdapter
    
    * Add DriverExecutor
    
    * Refactor ProxySQLExecutor
---
 .../sql/federate/execute/FederationExecutor.java   |   8 +-
 .../driver/executor/DriverExecutor.java            |  59 +++++++++
 .../jdbc/adapter/AbstractStatementAdapter.java     | 140 ++++++++++-----------
 .../statement/ShardingSpherePreparedStatement.java |  37 +++---
 .../core/statement/ShardingSphereStatement.java    |  45 +++----
 .../statement/CircuitBreakerPreparedStatement.java |   6 +-
 .../backend/communication/ProxySQLExecutor.java    |   2 +-
 .../jdbc/connection/BackendConnection.java         |   6 +-
 .../frontend/command/CommandExecutorTask.java      |   2 +-
 .../netty/FrontendChannelInboundHandler.java       |   2 +-
 .../frontend/command/CommandExecutorTaskTest.java  |   8 +-
 11 files changed, 169 insertions(+), 146 deletions(-)

diff --git 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederationExecutor.java
 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederationExecutor.java
index 053efd5..6511af8 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederationExecutor.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederationExecutor.java
@@ -32,7 +32,7 @@ import java.util.List;
 /**
  * Federation executor.
  */
-public interface FederationExecutor {
+public interface FederationExecutor extends AutoCloseable {
     
     /**
      * Execute query.
@@ -54,10 +54,6 @@ public interface FederationExecutor {
      */
     ResultSet getResultSet() throws SQLException;
     
-    /**
-     * Close.
-     *
-     * @throws SQLException SQL exception
-     */
+    @Override
     void close() throws SQLException;
 }
diff --git 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
new file mode 100644
index 0000000..67e88a9
--- /dev/null
+++ 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.driver.executor;
+
+import lombok.Getter;
+import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
+import 
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederationExecutor;
+import 
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederationExecutorFactory;
+import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+
+import java.sql.SQLException;
+
+/**
+ * Driver executor.
+ */
+@Getter
+public final class DriverExecutor implements AutoCloseable {
+    
+    private final DriverJDBCExecutor regularExecutor;
+    
+    private final RawExecutor rawExecutor;
+    
+    private final FederationExecutor federationExecutor;
+    
+    public DriverExecutor(final ShardingSphereConnection connection) {
+        MetaDataContexts metaDataContexts = 
connection.getContextManager().getMetaDataContexts();
+        JDBCExecutor jdbcExecutor = new 
JDBCExecutor(metaDataContexts.getExecutorEngine(), 
connection.isHoldTransaction());
+        regularExecutor = new DriverJDBCExecutor(connection.getSchemaName(), 
metaDataContexts, jdbcExecutor);
+        rawExecutor = new RawExecutor(metaDataContexts.getExecutorEngine(), 
connection.isHoldTransaction(), metaDataContexts.getProps());
+        federationExecutor = 
FederationExecutorFactory.newInstance(connection.getSchemaName(), 
metaDataContexts.getOptimizerContext(), metaDataContexts.getProps(), 
jdbcExecutor);
+    }
+    
+    /**
+     * Close.
+     *
+     * @throws SQLException SQL exception
+     */
+    @Override
+    public void close() throws SQLException {
+        federationExecutor.close();
+    }
+}
diff --git 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
index b50c171..14a0090 100644
--- 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
+++ 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
@@ -17,10 +17,11 @@
 
 package org.apache.shardingsphere.driver.jdbc.adapter;
 
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.driver.executor.DriverExecutor;
 import 
org.apache.shardingsphere.driver.jdbc.adapter.executor.ForceExecuteTemplate;
 import 
org.apache.shardingsphere.driver.jdbc.unsupported.AbstractUnsupportedOperationStatement;
-import 
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederationExecutor;
 
 import java.sql.SQLException;
 import java.sql.SQLWarning;
@@ -35,82 +36,86 @@ public abstract class AbstractStatementAdapter extends 
AbstractUnsupportedOperat
     
     private final Class<? extends Statement> targetClass;
     
-    private boolean closed;
+    private final ForceExecuteTemplate<Statement> forceExecuteTemplate = new 
ForceExecuteTemplate<>();
     
+    @Getter
     private boolean poolable;
     
+    @Getter
     private int fetchSize;
     
+    @Getter
     private int fetchDirection;
     
-    private final ForceExecuteTemplate<Statement> forceExecuteTemplate = new 
ForceExecuteTemplate<>();
+    @Getter
+    private boolean closed;
     
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
-    public final void close() throws SQLException {
-        closed = true;
-        try {
-            forceExecuteTemplate.execute((Collection) getRoutedStatements(), 
Statement::close);
-            getFederationExecutor().close();
-        } finally {
-            getRoutedStatements().clear();
-        }
+    public final void setPoolable(final boolean poolable) throws SQLException {
+        this.poolable = poolable;
+        recordMethodInvocation(targetClass, "setPoolable", new Class[] 
{boolean.class}, new Object[] {poolable});
+        forceExecuteTemplate.execute((Collection) getRoutedStatements(), 
statement -> statement.setPoolable(poolable));
     }
     
+    @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
-    public final boolean isClosed() {
-        return closed;
+    public final void setFetchSize(final int rows) throws SQLException {
+        fetchSize = rows;
+        recordMethodInvocation(targetClass, "setFetchSize", new Class[] 
{int.class}, new Object[] {rows});
+        forceExecuteTemplate.execute((Collection) getRoutedStatements(), 
statement -> statement.setFetchSize(rows));
     }
     
+    @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
-    public final boolean isPoolable() {
-        return poolable;
+    public final void setFetchDirection(final int direction) throws 
SQLException {
+        fetchDirection = direction;
+        recordMethodInvocation(targetClass, "setFetchDirection", new Class[] 
{int.class}, new Object[] {direction});
+        forceExecuteTemplate.execute((Collection) getRoutedStatements(), 
statement -> statement.setFetchDirection(direction));
     }
     
-    @SuppressWarnings("unchecked")
     @Override
-    public final void setPoolable(final boolean poolable) throws SQLException {
-        this.poolable = poolable;
-        recordMethodInvocation(targetClass, "setPoolable", new Class[] 
{boolean.class}, new Object[] {poolable});
-        forceExecuteTemplate.execute((Collection) getRoutedStatements(), 
statement -> statement.setPoolable(poolable));
+    public final int getMaxFieldSize() throws SQLException {
+        return getRoutedStatements().isEmpty() ? 0 : 
getRoutedStatements().iterator().next().getMaxFieldSize();
     }
     
+    @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
-    public final int getFetchSize() {
-        return fetchSize;
+    public final void setMaxFieldSize(final int max) throws SQLException {
+        recordMethodInvocation(targetClass, "setMaxFieldSize", new Class[] 
{int.class}, new Object[] {max});
+        forceExecuteTemplate.execute((Collection) getRoutedStatements(), 
statement -> statement.setMaxFieldSize(max));
     }
     
-    @SuppressWarnings("unchecked")
+    // TODO Confirm MaxRows for multiple databases is need special handle. eg: 
10 statements maybe MaxRows / 10
     @Override
-    public final void setFetchSize(final int rows) throws SQLException {
-        fetchSize = rows;
-        recordMethodInvocation(targetClass, "setFetchSize", new Class[] 
{int.class}, new Object[] {rows});
-        forceExecuteTemplate.execute((Collection) getRoutedStatements(), 
statement -> statement.setFetchSize(rows));
+    public final int getMaxRows() throws SQLException {
+        return getRoutedStatements().isEmpty() ? -1 : 
getRoutedStatements().iterator().next().getMaxRows();
     }
     
+    @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
-    public int getFetchDirection() {
-        return fetchDirection;
+    public final void setMaxRows(final int max) throws SQLException {
+        recordMethodInvocation(targetClass, "setMaxRows", new Class[] 
{int.class}, new Object[] {max});
+        forceExecuteTemplate.execute((Collection) getRoutedStatements(), 
statement -> statement.setMaxRows(max));
     }
     
     @Override
-    public void setFetchDirection(final int direction) throws SQLException {
-        fetchDirection = direction;
-        recordMethodInvocation(targetClass, "setFetchDirection", new Class[] 
{int.class}, new Object[] {direction});
-        forceExecuteTemplate.execute((Collection) getRoutedStatements(), 
statement -> statement.setFetchDirection(direction));
+    public final int getQueryTimeout() throws SQLException {
+        return getRoutedStatements().isEmpty() ? 0 : 
getRoutedStatements().iterator().next().getQueryTimeout();
     }
     
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
-    public final void setEscapeProcessing(final boolean enable) throws 
SQLException {
-        recordMethodInvocation(targetClass, "setEscapeProcessing", new Class[] 
{boolean.class}, new Object[] {enable});
-        forceExecuteTemplate.execute((Collection) getRoutedStatements(), 
statement -> statement.setEscapeProcessing(enable));
+    public final void setQueryTimeout(final int seconds) throws SQLException {
+        recordMethodInvocation(targetClass, "setQueryTimeout", new Class[] 
{int.class}, new Object[] {seconds});
+        forceExecuteTemplate.execute((Collection) getRoutedStatements(), 
statement -> statement.setQueryTimeout(seconds));
     }
     
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
-    public final void cancel() throws SQLException {
-        forceExecuteTemplate.execute((Collection) getRoutedStatements(), 
Statement::cancel);
+    public final void setEscapeProcessing(final boolean enable) throws 
SQLException {
+        recordMethodInvocation(targetClass, "setEscapeProcessing", new Class[] 
{boolean.class}, new Object[] {enable});
+        forceExecuteTemplate.execute((Collection) getRoutedStatements(), 
statement -> statement.setEscapeProcessing(enable));
     }
     
     @Override
@@ -142,15 +147,6 @@ public abstract class AbstractStatementAdapter extends 
AbstractUnsupportedOperat
     }
     
     @Override
-    public final SQLWarning getWarnings() {
-        return null;
-    }
-    
-    @Override
-    public final void clearWarnings() {
-    }
-    
-    @Override
     public final boolean getMoreResults() throws SQLException {
         boolean result = false;
         for (Statement each : getRoutedStatements()) {
@@ -165,45 +161,37 @@ public abstract class AbstractStatementAdapter extends 
AbstractUnsupportedOperat
     }
     
     @Override
-    public final int getMaxFieldSize() throws SQLException {
-        return getRoutedStatements().isEmpty() ? 0 : 
getRoutedStatements().iterator().next().getMaxFieldSize();
-    }
-    
-    @SuppressWarnings("unchecked")
-    @Override
-    public final void setMaxFieldSize(final int max) throws SQLException {
-        recordMethodInvocation(targetClass, "setMaxFieldSize", new Class[] 
{int.class}, new Object[] {max});
-        forceExecuteTemplate.execute((Collection) getRoutedStatements(), 
statement -> statement.setMaxFieldSize(max));
-    }
-    
-    // TODO Confirm MaxRows for multiple databases is need special handle. eg: 
10 statements maybe MaxRows / 10
-    @Override
-    public final int getMaxRows() throws SQLException {
-        return getRoutedStatements().isEmpty() ? -1 : 
getRoutedStatements().iterator().next().getMaxRows();
+    public final SQLWarning getWarnings() {
+        return null;
     }
     
-    @SuppressWarnings("unchecked")
     @Override
-    public final void setMaxRows(final int max) throws SQLException {
-        recordMethodInvocation(targetClass, "setMaxRows", new Class[] 
{int.class}, new Object[] {max});
-        forceExecuteTemplate.execute((Collection) getRoutedStatements(), 
statement -> statement.setMaxRows(max));
+    public final void clearWarnings() {
     }
     
+    @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
-    public final int getQueryTimeout() throws SQLException {
-        return getRoutedStatements().isEmpty() ? 0 : 
getRoutedStatements().iterator().next().getQueryTimeout();
+    public final void cancel() throws SQLException {
+        forceExecuteTemplate.execute((Collection) getRoutedStatements(), 
Statement::cancel);
     }
     
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
-    public final void setQueryTimeout(final int seconds) throws SQLException {
-        recordMethodInvocation(targetClass, "setQueryTimeout", new Class[] 
{int.class}, new Object[] {seconds});
-        forceExecuteTemplate.execute((Collection) getRoutedStatements(), 
statement -> statement.setQueryTimeout(seconds));
+    public final void close() throws SQLException {
+        closed = true;
+        try {
+            forceExecuteTemplate.execute((Collection) getRoutedStatements(), 
Statement::close);
+            if (null != getExecutor()) {
+                getExecutor().close();
+            }
+        } finally {
+            getRoutedStatements().clear();
+        }
     }
     
     protected abstract boolean isAccumulate();
     
     protected abstract Collection<? extends Statement> getRoutedStatements();
     
-    protected abstract FederationExecutor getFederationExecutor();
+    protected abstract DriverExecutor getExecutor();
 }
diff --git 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index c468d86..195b60b 100644
--- 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.driver.jdbc.core.statement;
 import com.google.common.base.Strings;
 import lombok.AccessLevel;
 import lombok.Getter;
-import org.apache.shardingsphere.driver.executor.DriverJDBCExecutor;
+import org.apache.shardingsphere.driver.executor.DriverExecutor;
 import org.apache.shardingsphere.driver.executor.batch.BatchExecutionUnit;
 import 
org.apache.shardingsphere.driver.executor.batch.BatchPreparedStatementExecutor;
 import 
org.apache.shardingsphere.driver.executor.callback.impl.PreparedStatementExecuteQueryCallback;
@@ -50,15 +50,12 @@ import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorEx
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
-import 
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederationExecutor;
-import 
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederationExecutorFactory;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
@@ -112,12 +109,8 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
     @Getter
     private final ParameterMetaData parameterMetaData;
     
-    private final DriverJDBCExecutor driverJDBCExecutor;
-    
-    private final RawExecutor rawExecutor;
-    
     @Getter(AccessLevel.PROTECTED)
-    private final FederationExecutor federationExecutor;
+    private final DriverExecutor executor;
     
     private final BatchPreparedStatementExecutor 
batchPreparedStatementExecutor;
     
@@ -164,10 +157,8 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
         sqlStatement = sqlParserEngine.parse(sql, true);
         parameterMetaData = new ShardingSphereParameterMetaData(sqlStatement);
         statementOption = returnGeneratedKeys ? new StatementOption(true) : 
new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability);
+        executor = new DriverExecutor(connection);
         JDBCExecutor jdbcExecutor = new 
JDBCExecutor(metaDataContexts.getExecutorEngine(), 
connection.isHoldTransaction());
-        driverJDBCExecutor = new 
DriverJDBCExecutor(connection.getSchemaName(), metaDataContexts, jdbcExecutor);
-        rawExecutor = new RawExecutor(metaDataContexts.getExecutorEngine(), 
connection.isHoldTransaction(), metaDataContexts.getProps());
-        federationExecutor = 
FederationExecutorFactory.newInstance(connection.getSchemaName(), 
metaDataContexts.getOptimizerContext(), metaDataContexts.getProps(), 
jdbcExecutor);
         batchPreparedStatementExecutor = new 
BatchPreparedStatementExecutor(metaDataContexts, jdbcExecutor, 
connection.getSchemaName());
         kernelProcessor = new KernelProcessor();
         statementsCacheable = 
isStatementsCacheable(metaDataContexts.getMetaData(connection.getSchemaName()).getRuleMetaData().getConfigurations());
@@ -189,7 +180,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
             executionContext = createExecutionContext();
             List<QueryResult> queryResults = executeQuery0();
             MergedResult mergedResult = mergeQuery(queryResults);
-            return new 
ShardingSphereResultSet(getResultSetsForShardingSphereResultSet(), 
mergedResult, this, executionContext);
+            return new ShardingSphereResultSet(getShardingSphereResultSet(), 
mergedResult, this, executionContext);
         } finally {
             clearBatch();
         }
@@ -201,16 +192,16 @@ public final class ShardingSpherePreparedStatement 
extends AbstractPreparedState
         replaySetParameter();
     }
     
-    private List<ResultSet> getResultSetsForShardingSphereResultSet() throws 
SQLException {
+    private List<ResultSet> getShardingSphereResultSet() throws SQLException {
         if (executionContext.getRouteContext().isFederated()) {
-            return 
Collections.singletonList(federationExecutor.getResultSet());
+            return 
Collections.singletonList(executor.getFederationExecutor().getResultSet());
         }
         return 
statements.stream().map(this::getResultSet).collect(Collectors.toList());
     }
     
     private List<QueryResult> executeQuery0() throws SQLException {
         if 
(metaDataContexts.getMetaData(connection.getSchemaName()).getRuleMetaData().getRules().stream().anyMatch(each
 -> each instanceof RawExecutionRule)) {
-            return rawExecutor.execute(createRawExecutionGroupContext(), 
executionContext.getLogicSQL(),
+            return 
executor.getRawExecutor().execute(createRawExecutionGroupContext(), 
executionContext.getLogicSQL(),
                     new RawSQLExecutorCallback()).stream().map(each -> 
(QueryResult) each).collect(Collectors.toList());
         }
         if (executionContext.getRouteContext().isFederated()) {
@@ -218,7 +209,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
         }
         ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
createExecutionGroupContext();
         cacheStatements(executionGroupContext.getInputGroups());
-        return driverJDBCExecutor.executeQuery(executionGroupContext, 
executionContext.getLogicSQL(),
+        return 
executor.getRegularExecutor().executeQuery(executionGroupContext, 
executionContext.getLogicSQL(),
                 new 
PreparedStatementExecuteQueryCallback(metaDataContexts.getMetaData(connection.getSchemaName()).getResource().getDatabaseType(),
 sqlStatement,
                         SQLExecutorExceptionHandler.isExceptionThrown()));
     }
@@ -229,7 +220,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
         }
         PreparedStatementExecuteQueryCallback callback = new 
PreparedStatementExecuteQueryCallback(metaDataContexts.getMetaData(connection.getSchemaName()).getResource().getDatabaseType(),
                  sqlStatement, 
SQLExecutorExceptionHandler.isExceptionThrown());
-        return 
federationExecutor.executeQuery(createDriverExecutionPrepareEngine(), callback, 
executionContext);
+        return 
executor.getFederationExecutor().executeQuery(createDriverExecutionPrepareEngine(),
 callback, executionContext);
     }
     
     private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
createDriverExecutionPrepareEngine() {
@@ -248,12 +239,12 @@ public final class ShardingSpherePreparedStatement 
extends AbstractPreparedState
             clearPrevious();
             executionContext = createExecutionContext();
             if 
(metaDataContexts.getMetaData(connection.getSchemaName()).getRuleMetaData().getRules().stream().anyMatch(each
 -> each instanceof RawExecutionRule)) {
-                Collection<ExecuteResult> executeResults = 
rawExecutor.execute(createRawExecutionGroupContext(), 
executionContext.getLogicSQL(), new RawSQLExecutorCallback());
+                Collection<ExecuteResult> executeResults = 
executor.getRawExecutor().execute(createRawExecutionGroupContext(), 
executionContext.getLogicSQL(), new RawSQLExecutorCallback());
                 return accumulate(executeResults);
             }
             ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
createExecutionGroupContext();
             cacheStatements(executionGroupContext.getInputGroups());
-            return driverJDBCExecutor.executeUpdate(executionGroupContext,
+            return 
executor.getRegularExecutor().executeUpdate(executionGroupContext,
                     executionContext.getLogicSQL(), 
executionContext.getRouteContext().getRouteUnits(), 
createExecuteUpdateCallback());
         } finally {
             clearBatch();
@@ -295,7 +286,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
             executionContext = createExecutionContext();
             if 
(metaDataContexts.getMetaData(connection.getSchemaName()).getRuleMetaData().getRules().stream().anyMatch(each
 -> each instanceof RawExecutionRule)) {
                 // TODO process getStatement
-                Collection<ExecuteResult> executeResults = 
rawExecutor.execute(createRawExecutionGroupContext(), 
executionContext.getLogicSQL(), new RawSQLExecutorCallback());
+                Collection<ExecuteResult> executeResults = 
executor.getRawExecutor().execute(createRawExecutionGroupContext(), 
executionContext.getLogicSQL(), new RawSQLExecutorCallback());
                 return executeResults.iterator().next() instanceof QueryResult;
             }
             if (executionContext.getRouteContext().isFederated()) {
@@ -304,7 +295,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
             }
             ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
createExecutionGroupContext();
             cacheStatements(executionGroupContext.getInputGroups());
-            return driverJDBCExecutor.execute(executionGroupContext,
+            return executor.getRegularExecutor().execute(executionGroupContext,
                     executionContext.getLogicSQL(), 
executionContext.getRouteContext().getRouteUnits(), createExecuteCallback());
         } finally {
             clearBatch();
@@ -365,7 +356,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
             result.add(each.getResultSet());
         }
         if (executionContext.getRouteContext().isFederated()) {
-            result.add(federationExecutor.getResultSet());
+            result.add(executor.getFederationExecutor().getResultSet());
         }
         return result;
     }
diff --git 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index d84e7a7..62a694d 100644
--- 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.driver.jdbc.core.statement;
 import com.google.common.base.Strings;
 import lombok.AccessLevel;
 import lombok.Getter;
-import org.apache.shardingsphere.driver.executor.DriverJDBCExecutor;
+import org.apache.shardingsphere.driver.executor.DriverExecutor;
 import org.apache.shardingsphere.driver.executor.callback.ExecuteCallback;
 import 
org.apache.shardingsphere.driver.executor.callback.ExecuteUpdateCallback;
 import 
org.apache.shardingsphere.driver.executor.callback.impl.StatementExecuteQueryCallback;
@@ -47,17 +47,13 @@ import 
org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
-import 
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederationExecutor;
-import 
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederationExecutorFactory;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
@@ -99,12 +95,8 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
     
     private final StatementOption statementOption;
     
-    private final DriverJDBCExecutor driverJDBCExecutor;
-    
-    private final RawExecutor rawExecutor;
-    
     @Getter(AccessLevel.PROTECTED)
-    private final FederationExecutor federationExecutor;
+    private final DriverExecutor executor;
     
     private final KernelProcessor kernelProcessor;
     
@@ -128,10 +120,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         metaDataContexts = 
connection.getContextManager().getMetaDataContexts();
         statements = new LinkedList<>();
         statementOption = new StatementOption(resultSetType, 
resultSetConcurrency, resultSetHoldability);
-        JDBCExecutor jdbcExecutor = new 
JDBCExecutor(metaDataContexts.getExecutorEngine(), 
connection.isHoldTransaction());
-        driverJDBCExecutor = new 
DriverJDBCExecutor(connection.getSchemaName(), metaDataContexts, jdbcExecutor);
-        rawExecutor = new RawExecutor(metaDataContexts.getExecutorEngine(), 
connection.isHoldTransaction(), metaDataContexts.getProps());
-        federationExecutor = 
FederationExecutorFactory.newInstance(connection.getSchemaName(), 
metaDataContexts.getOptimizerContext(), metaDataContexts.getProps(), 
jdbcExecutor);
+        executor = new DriverExecutor(connection);
         kernelProcessor = new KernelProcessor();
     }
     
@@ -156,12 +145,12 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
     
     private List<ResultSet> getShardingSphereResultSets() throws SQLException {
         return executionContext.getRouteContext().isFederated()
-                ? Collections.singletonList(federationExecutor.getResultSet()) 
: statements.stream().map(this::getResultSet).collect(Collectors.toList());
+                ? 
Collections.singletonList(executor.getFederationExecutor().getResultSet()) : 
statements.stream().map(this::getResultSet).collect(Collectors.toList());
     }
     
     private List<QueryResult> executeQuery0() throws SQLException {
         if 
(metaDataContexts.getMetaData(connection.getSchemaName()).getRuleMetaData().getRules().stream().anyMatch(each
 -> each instanceof RawExecutionRule)) {
-            return rawExecutor.execute(createRawExecutionContext(), 
executionContext.getLogicSQL(),
+            return 
executor.getRawExecutor().execute(createRawExecutionContext(), 
executionContext.getLogicSQL(),
                     new RawSQLExecutorCallback()).stream().map(each -> 
(QueryResult) each).collect(Collectors.toList());
         }
         if (executionContext.getRouteContext().isFederated()) {
@@ -171,7 +160,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         cacheStatements(executionGroupContext.getInputGroups());
         StatementExecuteQueryCallback callback = new 
StatementExecuteQueryCallback(metaDataContexts.getMetaData(connection.getSchemaName()).getResource().getDatabaseType(),
                 executionContext.getSqlStatementContext().getSqlStatement(), 
SQLExecutorExceptionHandler.isExceptionThrown());
-        return driverJDBCExecutor.executeQuery(executionGroupContext, 
executionContext.getLogicSQL(), callback);
+        return 
executor.getRegularExecutor().executeQuery(executionGroupContext, 
executionContext.getLogicSQL(), callback);
     }
     
     private List<QueryResult> executeFederationQuery() throws SQLException {
@@ -180,7 +169,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         }
         StatementExecuteQueryCallback callback = new 
StatementExecuteQueryCallback(metaDataContexts.getMetaData(connection.getSchemaName()).getResource().getDatabaseType(),
                 executionContext.getSqlStatementContext().getSqlStatement(), 
SQLExecutorExceptionHandler.isExceptionThrown());
-        return 
federationExecutor.executeQuery(createDriverExecutionPrepareEngine(), callback, 
executionContext);
+        return 
executor.getFederationExecutor().executeQuery(createDriverExecutionPrepareEngine(),
 callback, executionContext);
     }
     
     private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
createDriverExecutionPrepareEngine() {
@@ -194,7 +183,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         try {
             executionContext = createExecutionContext(sql);
             if 
(metaDataContexts.getMetaData(connection.getSchemaName()).getRuleMetaData().getRules().stream().anyMatch(each
 -> each instanceof RawExecutionRule)) {
-                return 
accumulate(rawExecutor.execute(createRawExecutionContext(), 
executionContext.getLogicSQL(), new RawSQLExecutorCallback()));
+                return 
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(), 
executionContext.getLogicSQL(), new RawSQLExecutorCallback()));
             }
             ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
createExecutionContext();
             cacheStatements(executionGroupContext.getInputGroups());
@@ -213,7 +202,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         try {
             executionContext = createExecutionContext(sql);
             if 
(metaDataContexts.getMetaData(connection.getSchemaName()).getRuleMetaData().getRules().stream().anyMatch(each
 -> each instanceof RawExecutionRule)) {
-                return 
accumulate(rawExecutor.execute(createRawExecutionContext(), 
executionContext.getLogicSQL(), new RawSQLExecutorCallback()));
+                return 
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(), 
executionContext.getLogicSQL(), new RawSQLExecutorCallback()));
             }
             ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
createExecutionContext();
             cacheStatements(executionGroupContext.getInputGroups());
@@ -230,7 +219,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         try {
             executionContext = createExecutionContext(sql);
             if 
(metaDataContexts.getMetaData(connection.getSchemaName()).getRuleMetaData().getRules().stream().anyMatch(each
 -> each instanceof RawExecutionRule)) {
-                return 
accumulate(rawExecutor.execute(createRawExecutionContext(), 
executionContext.getLogicSQL(), new RawSQLExecutorCallback()));
+                return 
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(), 
executionContext.getLogicSQL(), new RawSQLExecutorCallback()));
             }
             ExecutionGroupContext<JDBCExecutionUnit> executionGroups = 
createExecutionContext();
             cacheStatements(executionGroups.getInputGroups());
@@ -247,7 +236,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         try {
             executionContext = createExecutionContext(sql);
             if 
(metaDataContexts.getMetaData(connection.getSchemaName()).getRuleMetaData().getRules().stream().anyMatch(each
 -> each instanceof RawExecutionRule)) {
-                return 
accumulate(rawExecutor.execute(createRawExecutionContext(), 
executionContext.getLogicSQL(), new RawSQLExecutorCallback()));
+                return 
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(), 
executionContext.getLogicSQL(), new RawSQLExecutorCallback()));
             }
             ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
createExecutionContext();
             cacheStatements(executionGroupContext.getInputGroups());
@@ -274,7 +263,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
                 return Optional.empty();
             }
         };
-        return driverJDBCExecutor.executeUpdate(executionGroupContext, 
executionContext.getLogicSQL(), routeUnits, callback);
+        return 
executor.getRegularExecutor().executeUpdate(executionGroupContext, 
executionContext.getLogicSQL(), routeUnits, callback);
     }
     
     private int accumulate(final Collection<ExecuteResult> results) {
@@ -310,7 +299,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         return execute0(sql, (actualSQL, statement) -> 
statement.execute(actualSQL, columnNames));
     }
     
-    private boolean execute(final ExecutionGroupContext<JDBCExecutionUnit> 
executionGroupContext, final ExecuteCallback executor,
+    private boolean execute(final ExecutionGroupContext<JDBCExecutionUnit> 
executionGroupContext, final ExecuteCallback executeCallback,
                             final SQLStatement sqlStatement, final 
Collection<RouteUnit> routeUnits) throws SQLException {
         boolean isExceptionThrown = 
SQLExecutorExceptionHandler.isExceptionThrown();
         JDBCExecutorCallback<Boolean> jdbcExecutorCallback = new 
JDBCExecutorCallback<Boolean>(
@@ -318,7 +307,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
             
             @Override
             protected Boolean executeSQL(final String sql, final Statement 
statement, final ConnectionMode connectionMode) throws SQLException {
-                return executor.execute(sql, statement);
+                return executeCallback.execute(sql, statement);
             }
             
             @Override
@@ -326,7 +315,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
                 return Optional.empty();
             }
         };
-        return driverJDBCExecutor.execute(executionGroupContext, 
executionContext.getLogicSQL(), routeUnits, jdbcExecutorCallback);
+        return executor.getRegularExecutor().execute(executionGroupContext, 
executionContext.getLogicSQL(), routeUnits, jdbcExecutorCallback);
     }
     
     private boolean execute0(final String sql, final ExecuteCallback callback) 
throws SQLException {
@@ -334,7 +323,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
             executionContext = createExecutionContext(sql);
             if 
(metaDataContexts.getMetaData(connection.getSchemaName()).getRuleMetaData().getRules().stream().anyMatch(each
 -> each instanceof RawExecutionRule)) {
                 // TODO process getStatement
-                Collection<ExecuteResult> results = 
rawExecutor.execute(createRawExecutionContext(), 
executionContext.getLogicSQL(), new RawSQLExecutorCallback());
+                Collection<ExecuteResult> results = 
executor.getRawExecutor().execute(createRawExecutionContext(), 
executionContext.getLogicSQL(), new RawSQLExecutorCallback());
                 return results.iterator().next() instanceof QueryResult;
             }
             if (executionContext.getRouteContext().isFederated()) {
@@ -422,7 +411,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
             result.add(each.getResultSet());
         }
         if (executionContext.getRouteContext().isFederated()) {
-            result.add(federationExecutor.getResultSet());
+            result.add(executor.getFederationExecutor().getResultSet());
         }
         return result;
     }
diff --git 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
index 2be17b5..86e5ee7 100644
--- 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
+++ 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
@@ -18,10 +18,10 @@
 package org.apache.shardingsphere.driver.state.circuit.statement;
 
 import lombok.Getter;
+import org.apache.shardingsphere.driver.executor.DriverExecutor;
+import 
org.apache.shardingsphere.driver.jdbc.unsupported.AbstractUnsupportedOperationPreparedStatement;
 import 
org.apache.shardingsphere.driver.state.circuit.connection.CircuitBreakerConnection;
 import 
org.apache.shardingsphere.driver.state.circuit.resultset.CircuitBreakerResultSet;
-import 
org.apache.shardingsphere.driver.jdbc.unsupported.AbstractUnsupportedOperationPreparedStatement;
-import 
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederationExecutor;
 
 import java.io.InputStream;
 import java.io.Reader;
@@ -276,7 +276,7 @@ public final class CircuitBreakerPreparedStatement extends 
AbstractUnsupportedOp
     }
     
     @Override
-    protected FederationExecutor getFederationExecutor() {
+    protected DriverExecutor getExecutor() {
         return null;
     }
     
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index e738321..083737d 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -82,8 +82,8 @@ public final class ProxySQLExecutor {
         this.databaseCommunicationEngine = databaseCommunicationEngine;
         ExecutorEngine executorEngine = 
BackendExecutorContext.getInstance().getExecutorEngine();
         boolean isSerialExecute = backendConnection.isSerialExecute();
-        jdbcExecutor = new ProxyJDBCExecutor(type, backendConnection, 
databaseCommunicationEngine, new JDBCExecutor(executorEngine, isSerialExecute));
         MetaDataContexts metaDataContexts = 
ProxyContext.getInstance().getContextManager().getMetaDataContexts();
+        jdbcExecutor = new ProxyJDBCExecutor(type, backendConnection, 
databaseCommunicationEngine, new JDBCExecutor(executorEngine, isSerialExecute));
         rawExecutor = new RawExecutor(executorEngine, isSerialExecute, 
metaDataContexts.getProps());
         federationExecutor = FederationExecutorFactory.newInstance(
                 backendConnection.getSchemaName(), 
metaDataContexts.getOptimizerContext(), metaDataContexts.getProps(), new 
JDBCExecutor(executorEngine, isSerialExecute));
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
index fffa777..b118dad 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
@@ -314,11 +314,11 @@ public final class BackendConnection implements 
ExecutorJDBCManager {
     }
     
     /**
-     * Close federate executor.
+     * Close federation executor.
      * 
-     * @return SQL exception when federate executor close
+     * @return SQL exception when federation executor close
      */
-    public synchronized Collection<SQLException> closeFederateExecutor() {
+    public synchronized Collection<SQLException> closeFederationExecutor() {
         Collection<SQLException> result = new LinkedList<>();
         if (null != federationExecutor) {
             try {
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
index 0dbf4ff..b3c59d2 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
@@ -123,7 +123,7 @@ public final class CommandExecutorTask implements Runnable {
         Collection<SQLException> result = new LinkedList<>();
         PrimaryVisitedManager.clear();
         
result.addAll(backendConnection.closeDatabaseCommunicationEngines(false));
-        result.addAll(backendConnection.closeFederateExecutor());
+        result.addAll(backendConnection.closeFederationExecutor());
         return result;
     }
     
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
index d1afccb..c5a9ffd 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
@@ -103,7 +103,7 @@ public final class FrontendChannelInboundHandler extends 
ChannelInboundHandlerAd
         PrimaryVisitedManager.clear();
         backendConnection.closeDatabaseCommunicationEngines(true);
         backendConnection.closeConnections(true);
-        backendConnection.closeFederateExecutor();
+        backendConnection.closeFederationExecutor();
         databaseProtocolFrontendEngine.release(backendConnection);
     }
     
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
index 1e2f063..a55edd4 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
@@ -101,7 +101,7 @@ public final class CommandExecutorTaskTest {
         
when(backendConnection.getConnectionStatus()).thenReturn(connectionStatus);
         
when(engine.getCodecEngine().createPacketPayload(message)).thenReturn(payload);
         
when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
-        
when(backendConnection.closeFederateExecutor()).thenReturn(Collections.emptyList());
+        
when(backendConnection.closeFederationExecutor()).thenReturn(Collections.emptyList());
         CommandExecutorTask actual = new CommandExecutorTask(engine, 
backendConnection, handlerContext, message);
         actual.run();
         verify(connectionStatus).waitUntilConnectionRelease();
@@ -121,7 +121,7 @@ public final class CommandExecutorTaskTest {
         
when(backendConnection.getConnectionStatus()).thenReturn(connectionStatus);
         
when(engine.getCodecEngine().createPacketPayload(message)).thenReturn(payload);
         
when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
-        
when(backendConnection.closeFederateExecutor()).thenReturn(Collections.emptyList());
+        
when(backendConnection.closeFederationExecutor()).thenReturn(Collections.emptyList());
         CommandExecutorTask actual = new CommandExecutorTask(engine, 
backendConnection, handlerContext, message);
         actual.run();
         verify(connectionStatus).waitUntilConnectionRelease();
@@ -145,7 +145,7 @@ public final class CommandExecutorTaskTest {
         
when(backendConnection.getConnectionStatus()).thenReturn(connectionStatus);
         
when(engine.getCodecEngine().createPacketPayload(message)).thenReturn(payload);
         
when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
-        
when(backendConnection.closeFederateExecutor()).thenReturn(Collections.emptyList());
+        
when(backendConnection.closeFederationExecutor()).thenReturn(Collections.emptyList());
         CommandExecutorTask actual = new CommandExecutorTask(engine, 
backendConnection, handlerContext, message);
         actual.run();
         verify(connectionStatus).waitUntilConnectionRelease();
@@ -166,7 +166,7 @@ public final class CommandExecutorTaskTest {
         when(engine.getCommandExecuteEngine().getErrorPacket(mockException, 
backendConnection)).thenReturn(databasePacket);
         
when(engine.getCommandExecuteEngine().getOtherPacket(backendConnection)).thenReturn(Optional.of(databasePacket));
         
when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
-        
when(backendConnection.closeFederateExecutor()).thenReturn(Collections.emptyList());
+        
when(backendConnection.closeFederationExecutor()).thenReturn(Collections.emptyList());
         CommandExecutorTask actual = new CommandExecutorTask(engine, 
backendConnection, handlerContext, message);
         actual.run();
         verify(handlerContext, atLeast(2)).writeAndFlush(databasePacket);

Reply via email to