This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d2ab6bb90e1 Refactor ExecuteProcessEngine (#23704)
d2ab6bb90e1 is described below
commit d2ab6bb90e18b8a4d02287616cabb883d3faa218
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Jan 22 17:26:47 2023 +0800
Refactor ExecuteProcessEngine (#23704)
---
.../engine/driver/jdbc/JDBCExecutorCallback.java | 2 +-
.../sql/execute/engine/raw/RawExecutor.java | 7 +++--
.../raw/callback/RawSQLExecutorCallback.java | 3 +-
.../executor/sql/process/ExecuteProcessEngine.java | 33 +++++++++++-----------
.../driver/executor/DriverJDBCExecutor.java | 21 ++++++++------
.../executor/FilterableTableScanExecutor.java | 9 +++---
.../executor/TranslatableTableScanExecutor.java | 20 +++++++------
.../jdbc/executor/ProxyJDBCExecutor.java | 7 +++--
.../netty/FrontendChannelInboundHandler.java | 4 +--
9 files changed, 57 insertions(+), 49 deletions(-)
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
index b86cd8f6ef2..99df0e83ca2 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
@@ -116,7 +116,7 @@ public abstract class JDBCExecutorCallback<T> implements
ExecutorCallback<JDBCEx
private void finishReport(final Map<String, Object> dataMap, final
SQLExecutionUnit executionUnit) {
if (dataMap.containsKey(ExecuteProcessConstants.EXECUTE_ID.name())) {
-
ExecuteProcessEngine.finishExecution(dataMap.get(ExecuteProcessConstants.EXECUTE_ID.name()).toString(),
executionUnit);
+ new
ExecuteProcessEngine().finishExecution(dataMap.get(ExecuteProcessConstants.EXECUTE_ID.name()).toString(),
executionUnit);
}
}
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
index df48b64d605..fb024c6d7e5 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
@@ -60,15 +60,16 @@ public final class RawExecutor {
*/
public List<ExecuteResult> execute(final
ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext, final
QueryContext queryContext,
final RawSQLExecutorCallback callback)
throws SQLException {
+ ExecuteProcessEngine executeProcessEngine = new ExecuteProcessEngine();
try {
- ExecuteProcessEngine.initializeExecution(queryContext,
executionGroupContext, eventBusContext);
+ executeProcessEngine.initializeExecution(queryContext,
executionGroupContext, eventBusContext);
// TODO Load query header for first query
List<ExecuteResult> results = execute(executionGroupContext,
(RawSQLExecutorCallback) null, callback);
-
ExecuteProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
eventBusContext);
+
executeProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
eventBusContext);
return results.isEmpty() || Objects.isNull(results.get(0)) ?
Collections
.singletonList(new UpdateResult(0, 0L)) : results;
} finally {
- ExecuteProcessEngine.cleanExecution();
+ executeProcessEngine.cleanExecution();
}
}
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
index 43f00cc9233..ba3e0238d04 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
@@ -47,9 +47,10 @@ public final class RawSQLExecutorCallback implements
ExecutorCallback<RawSQLExec
public Collection<ExecuteResult> execute(final
Collection<RawSQLExecutionUnit> inputs, final boolean isTrunkThread, final
Map<String, Object> dataMap) throws SQLException {
Collection<ExecuteResult> result =
callbacks.iterator().next().execute(inputs, isTrunkThread, dataMap);
if (dataMap.containsKey(ExecuteProcessConstants.EXECUTE_ID.name())) {
+ ExecuteProcessEngine executeProcessEngine = new
ExecuteProcessEngine();
String executionID =
dataMap.get(ExecuteProcessConstants.EXECUTE_ID.name()).toString();
for (RawSQLExecutionUnit each : inputs) {
- ExecuteProcessEngine.finishExecution(executionID, each);
+ executeProcessEngine.finishExecution(executionID, each);
}
}
return result;
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessEngine.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessEngine.java
index b3a56e7f760..a0aa5ea2277 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessEngine.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessEngine.java
@@ -18,8 +18,6 @@
package org.apache.shardingsphere.infra.executor.sql.process;
import com.google.common.base.Strings;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
import org.apache.shardingsphere.infra.binder.QueryContext;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorDataMap;
@@ -39,9 +37,10 @@ import java.util.concurrent.ThreadLocalRandom;
/**
* Execute process engine.
*/
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ExecuteProcessEngine {
+ private final ExecuteProcessReporter reporter = new
ExecuteProcessReporter();
+
/**
* Initialize connection.
*
@@ -49,13 +48,13 @@ public final class ExecuteProcessEngine {
* @param databaseName database name
* @return execution ID
*/
- public static String initializeConnection(final Grantee grantee, final
String databaseName) {
+ public String initializeConnection(final Grantee grantee, final String
databaseName) {
ExecutionGroupContext<SQLExecutionUnit> executionGroupContext =
createExecutionGroupContext(grantee, databaseName);
- new ExecuteProcessReporter().report(executionGroupContext);
+ reporter.report(executionGroupContext);
return executionGroupContext.getExecutionID();
}
- private static ExecutionGroupContext<SQLExecutionUnit>
createExecutionGroupContext(final Grantee grantee, final String databaseName) {
+ private ExecutionGroupContext<SQLExecutionUnit>
createExecutionGroupContext(final Grantee grantee, final String databaseName) {
ExecutionGroupContext<SQLExecutionUnit> result = new
ExecutionGroupContext<>(Collections.emptyList());
result.setExecutionID(new UUID(ThreadLocalRandom.current().nextLong(),
ThreadLocalRandom.current().nextLong()).toString().replace("-", ""));
result.setGrantee(grantee);
@@ -68,8 +67,8 @@ public final class ExecuteProcessEngine {
*
* @param executionID execution ID
*/
- public static void finishConnection(final String executionID) {
- new ExecuteProcessReporter().reportRemove(executionID);
+ public void finishConnection(final String executionID) {
+ reporter.reportRemove(executionID);
}
/**
@@ -79,13 +78,13 @@ public final class ExecuteProcessEngine {
* @param executionGroupContext execution group context
* @param eventBusContext event bus context
*/
- public static void initializeExecution(final QueryContext queryContext,
final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext,
final EventBusContext eventBusContext) {
+ public void initializeExecution(final QueryContext queryContext, final
ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext, final
EventBusContext eventBusContext) {
if (Strings.isNullOrEmpty(executionGroupContext.getExecutionID())) {
executionGroupContext.setExecutionID(new
UUID(ThreadLocalRandom.current().nextLong(),
ThreadLocalRandom.current().nextLong()).toString().replace("-", ""));
}
if
(isMySQLDDLOrDMLStatement(queryContext.getSqlStatementContext().getSqlStatement()))
{
ExecutorDataMap.getValue().put(ExecuteProcessConstants.EXECUTE_ID.name(),
executionGroupContext.getExecutionID());
- new ExecuteProcessReporter().report(queryContext,
executionGroupContext, ExecuteProcessConstants.EXECUTE_STATUS_START);
+ reporter.report(queryContext, executionGroupContext,
ExecuteProcessConstants.EXECUTE_STATUS_START);
}
}
@@ -95,8 +94,8 @@ public final class ExecuteProcessEngine {
* @param executionID execution ID
* @param executionUnit execution unit
*/
- public static void finishExecution(final String executionID, final
SQLExecutionUnit executionUnit) {
- new ExecuteProcessReporter().report(executionID, executionUnit,
ExecuteProcessConstants.EXECUTE_STATUS_DONE);
+ public void finishExecution(final String executionID, final
SQLExecutionUnit executionUnit) {
+ reporter.report(executionID, executionUnit,
ExecuteProcessConstants.EXECUTE_STATUS_DONE);
}
/**
@@ -105,23 +104,23 @@ public final class ExecuteProcessEngine {
* @param executionID execution ID
* @param eventBusContext event bus context
*/
- public static void finishExecution(final String executionID, final
EventBusContext eventBusContext) {
+ public void finishExecution(final String executionID, final
EventBusContext eventBusContext) {
if
(ExecutorDataMap.getValue().containsKey(ExecuteProcessConstants.EXECUTE_ID.name()))
{
- new ExecuteProcessReporter().report(executionID,
ExecuteProcessConstants.EXECUTE_STATUS_DONE, eventBusContext);
+ reporter.report(executionID,
ExecuteProcessConstants.EXECUTE_STATUS_DONE, eventBusContext);
}
}
/**
* Clean execution.
*/
- public static void cleanExecution() {
+ public void cleanExecution() {
if
(ExecutorDataMap.getValue().containsKey(ExecuteProcessConstants.EXECUTE_ID.name()))
{
- new
ExecuteProcessReporter().reportClean(ExecutorDataMap.getValue().get(ExecuteProcessConstants.EXECUTE_ID.name()).toString());
+
reporter.reportClean(ExecutorDataMap.getValue().get(ExecuteProcessConstants.EXECUTE_ID.name()).toString());
}
ExecutorDataMap.getValue().remove(ExecuteProcessConstants.EXECUTE_ID.name());
}
- private static boolean isMySQLDDLOrDMLStatement(final SQLStatement
sqlStatement) {
+ private boolean isMySQLDDLOrDMLStatement(final SQLStatement sqlStatement) {
return sqlStatement instanceof MySQLStatement && (sqlStatement
instanceof DDLStatement || sqlStatement instanceof DMLStatement);
}
}
diff --git
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
index 0b1d59c48ed..d84cd871f39 100644
---
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
+++
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
@@ -73,13 +73,14 @@ public final class DriverJDBCExecutor {
*/
public List<QueryResult> executeQuery(final
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
final QueryContext queryContext,
final ExecuteQueryCallback callback) throws SQLException {
+ ExecuteProcessEngine executeProcessEngine = new ExecuteProcessEngine();
try {
- ExecuteProcessEngine.initializeExecution(queryContext,
executionGroupContext, eventBusContext);
+ executeProcessEngine.initializeExecution(queryContext,
executionGroupContext, eventBusContext);
List<QueryResult> result =
jdbcExecutor.execute(executionGroupContext, callback);
-
ExecuteProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
eventBusContext);
+
executeProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
eventBusContext);
return result;
} finally {
- ExecuteProcessEngine.cleanExecution();
+ executeProcessEngine.cleanExecution();
}
}
@@ -95,15 +96,16 @@ public final class DriverJDBCExecutor {
*/
public int executeUpdate(final ExecutionGroupContext<JDBCExecutionUnit>
executionGroupContext,
final QueryContext queryContext, final
Collection<RouteUnit> routeUnits, final JDBCExecutorCallback<Integer> callback)
throws SQLException {
+ ExecuteProcessEngine executeProcessEngine = new ExecuteProcessEngine();
try {
- ExecuteProcessEngine.initializeExecution(queryContext,
executionGroupContext, eventBusContext);
+ executeProcessEngine.initializeExecution(queryContext,
executionGroupContext, eventBusContext);
SQLStatementContext<?> sqlStatementContext =
queryContext.getSqlStatementContext();
List<Integer> results = doExecute(executionGroupContext,
sqlStatementContext, routeUnits, callback);
int result =
isNeedAccumulate(metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules(),
sqlStatementContext) ? accumulate(results) : results.get(0);
-
ExecuteProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
eventBusContext);
+
executeProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
eventBusContext);
return result;
} finally {
- ExecuteProcessEngine.cleanExecution();
+ executeProcessEngine.cleanExecution();
}
}
@@ -136,14 +138,15 @@ public final class DriverJDBCExecutor {
*/
public boolean execute(final ExecutionGroupContext<JDBCExecutionUnit>
executionGroupContext, final QueryContext queryContext,
final Collection<RouteUnit> routeUnits, final
JDBCExecutorCallback<Boolean> callback) throws SQLException {
+ ExecuteProcessEngine executeProcessEngine = new ExecuteProcessEngine();
try {
- ExecuteProcessEngine.initializeExecution(queryContext,
executionGroupContext, eventBusContext);
+ executeProcessEngine.initializeExecution(queryContext,
executionGroupContext, eventBusContext);
List<Boolean> results = doExecute(executionGroupContext,
queryContext.getSqlStatementContext(), routeUnits, callback);
boolean result = null != results && !results.isEmpty() && null !=
results.get(0) && results.get(0);
-
ExecuteProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
eventBusContext);
+
executeProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
eventBusContext);
return result;
} finally {
- ExecuteProcessEngine.cleanExecution();
+ executeProcessEngine.cleanExecution();
}
}
diff --git
a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java
b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java
index 624233e4cbb..40b525bfe97 100644
---
a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java
+++
b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java
@@ -123,7 +123,7 @@ public final class FilterableTableScanExecutor implements
TableScanExecutor {
@Override
public Enumerator<Object> enumerator() {
- return new EmptyRowEnumerator();
+ return new EmptyRowEnumerator<>();
}
};
}
@@ -150,12 +150,13 @@ public final class FilterableTableScanExecutor implements
TableScanExecutor {
}
private AbstractEnumerable<Object[]> execute(final DatabaseType
databaseType, final QueryContext queryContext, final ShardingSphereDatabase
database, final ExecutionContext context) {
+ ExecuteProcessEngine executeProcessEngine = new ExecuteProcessEngine();
try {
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
prepareEngine.prepare(context.getRouteContext(), context.getExecutionUnits());
setParameters(executionGroupContext.getInputGroups());
-
ExecuteProcessEngine.initializeExecution(context.getQueryContext(),
executionGroupContext, eventBusContext);
+
executeProcessEngine.initializeExecution(context.getQueryContext(),
executionGroupContext, eventBusContext);
List<QueryResult> queryResults = execute(executionGroupContext,
databaseType);
-
ExecuteProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
eventBusContext);
+
executeProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
eventBusContext);
// TODO need to get session context
MergeEngine mergeEngine = new MergeEngine(database,
executorContext.getProps(), new ConnectionContext());
MergedResult mergedResult = mergeEngine.merge(queryResults,
queryContext.getSqlStatementContext());
@@ -164,7 +165,7 @@ public final class FilterableTableScanExecutor implements
TableScanExecutor {
} catch (final SQLException ex) {
throw new SQLWrapperException(ex);
} finally {
- ExecuteProcessEngine.cleanExecution();
+ executeProcessEngine.cleanExecution();
}
}
diff --git
a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java
b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java
index d3fcdffd22f..86376bb0d0a 100644
---
a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java
+++
b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java
@@ -154,7 +154,7 @@ public final class TranslatableTableScanExecutor implements
TableScanExecutor {
@Override
public Enumerator<Object> enumerator() {
- return new EmptyRowEnumerator();
+ return new EmptyRowEnumerator<>();
}
};
}
@@ -170,19 +170,20 @@ public final class TranslatableTableScanExecutor
implements TableScanExecutor {
@Override
public Enumerator<Object> enumerator() {
- return new MemoryEnumerator(tableData.getRows());
+ return new MemoryEnumerator<>(tableData.getRows());
}
};
}
private AbstractEnumerable<Object> executeScalarEnumerable(final
DatabaseType databaseType, final QueryContext queryContext,
final
ShardingSphereDatabase database, final ExecutionContext context) {
+ ExecuteProcessEngine executeProcessEngine = new ExecuteProcessEngine();
try {
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
prepareEngine.prepare(context.getRouteContext(), context.getExecutionUnits());
setParameters(executionGroupContext.getInputGroups());
-
ExecuteProcessEngine.initializeExecution(context.getQueryContext(),
executionGroupContext, eventBusContext);
+
executeProcessEngine.initializeExecution(context.getQueryContext(),
executionGroupContext, eventBusContext);
List<QueryResult> queryResults = execute(executionGroupContext,
databaseType);
-
ExecuteProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
eventBusContext);
+
executeProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
eventBusContext);
MergeEngine mergeEngine = new MergeEngine(database,
executorContext.getProps(), new ConnectionContext());
MergedResult mergedResult = mergeEngine.merge(queryResults,
queryContext.getSqlStatementContext());
Collection<Statement> statements =
getStatements(executionGroupContext.getInputGroups());
@@ -190,7 +191,7 @@ public final class TranslatableTableScanExecutor implements
TableScanExecutor {
} catch (final SQLException ex) {
throw new SQLWrapperException(ex);
} finally {
- ExecuteProcessEngine.cleanExecution();
+ executeProcessEngine.cleanExecution();
}
}
@@ -201,7 +202,7 @@ public final class TranslatableTableScanExecutor implements
TableScanExecutor {
@Override
public Enumerator<Object> enumerator() {
- return new SQLFederationRowEnumerator(rows, statements);
+ return new SQLFederationRowEnumerator<>(rows, statements);
}
};
}
@@ -237,12 +238,13 @@ public final class TranslatableTableScanExecutor
implements TableScanExecutor {
}
private AbstractEnumerable<Object[]> execute(final DatabaseType
databaseType, final QueryContext queryContext, final ShardingSphereDatabase
database, final ExecutionContext context) {
+ ExecuteProcessEngine executeProcessEngine = new ExecuteProcessEngine();
try {
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
prepareEngine.prepare(context.getRouteContext(), context.getExecutionUnits());
setParameters(executionGroupContext.getInputGroups());
-
ExecuteProcessEngine.initializeExecution(context.getQueryContext(),
executionGroupContext, eventBusContext);
+
executeProcessEngine.initializeExecution(context.getQueryContext(),
executionGroupContext, eventBusContext);
List<QueryResult> queryResults = execute(executionGroupContext,
databaseType);
-
ExecuteProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
eventBusContext);
+
executeProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
eventBusContext);
MergeEngine mergeEngine = new MergeEngine(database,
executorContext.getProps(), new ConnectionContext());
MergedResult mergedResult = mergeEngine.merge(queryResults,
queryContext.getSqlStatementContext());
Collection<Statement> statements =
getStatements(executionGroupContext.getInputGroups());
@@ -250,7 +252,7 @@ public final class TranslatableTableScanExecutor implements
TableScanExecutor {
} catch (final SQLException ex) {
throw new SQLWrapperException(ex);
} finally {
- ExecuteProcessEngine.cleanExecution();
+ executeProcessEngine.cleanExecution();
}
}
diff --git
a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
index 61d715f38cb..7a75c6a5ffe 100644
---
a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
+++
b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
@@ -64,23 +64,24 @@ public final class ProxyJDBCExecutor {
*/
public List<ExecuteResult> execute(final QueryContext queryContext, final
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
final boolean isReturnGeneratedKeys,
final boolean isExceptionThrown) throws SQLException {
+ ExecuteProcessEngine executeProcessEngine = new ExecuteProcessEngine();
try {
MetaDataContexts metaDataContexts =
ProxyContext.getInstance().getContextManager().getMetaDataContexts();
EventBusContext eventBusContext =
ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext();
ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName());
DatabaseType protocolType = database.getProtocolType();
Map<String, DatabaseType> storageTypes =
database.getResourceMetaData().getStorageTypes();
- ExecuteProcessEngine.initializeExecution(queryContext,
executionGroupContext, eventBusContext);
+ executeProcessEngine.initializeExecution(queryContext,
executionGroupContext, eventBusContext);
SQLStatementContext<?> context =
queryContext.getSqlStatementContext();
List<ExecuteResult> result =
jdbcExecutor.execute(executionGroupContext,
ProxyJDBCExecutorCallbackFactory.newInstance(type,
protocolType, storageTypes, context.getSqlStatement(),
databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown,
true),
ProxyJDBCExecutorCallbackFactory.newInstance(type,
protocolType, storageTypes, context.getSqlStatement(),
databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown,
false));
-
ExecuteProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
eventBusContext);
+
executeProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
eventBusContext);
return result;
} finally {
- ExecuteProcessEngine.cleanExecution();
+ executeProcessEngine.cleanExecution();
}
}
}
diff --git
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
index 73e0820597d..f547bb07ca9 100644
---
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
+++
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
@@ -79,7 +79,7 @@ public final class FrontendChannelInboundHandler extends
ChannelInboundHandlerAd
if (authResult.isFinished()) {
connectionSession.setGrantee(new
Grantee(authResult.getUsername(), authResult.getHostname()));
connectionSession.setCurrentDatabase(authResult.getDatabase());
-
connectionSession.setExecutionId(ExecuteProcessEngine.initializeConnection(connectionSession.getGrantee(),
connectionSession.getDatabaseName()));
+ connectionSession.setExecutionId(new
ExecuteProcessEngine().initializeConnection(connectionSession.getGrantee(),
connectionSession.getDatabaseName()));
}
return authResult.isFinished();
// CHECKSTYLE:OFF
@@ -101,7 +101,7 @@ public final class FrontendChannelInboundHandler extends
ChannelInboundHandlerAd
private void closeAllResources() {
ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(connectionSession.getConnectionId());
connectionSession.getBackendConnection().closeAllResources();
-
Optional.ofNullable(connectionSession.getExecutionId()).ifPresent(ExecuteProcessEngine::finishConnection);
+ Optional.ofNullable(connectionSession.getExecutionId()).ifPresent(new
ExecuteProcessEngine()::finishConnection);
databaseProtocolFrontendEngine.release(connectionSession);
}