This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new b075297f5b0 Refactor usage of ProcessEngine (#25422)
b075297f5b0 is described below
commit b075297f5b0463d3939c238fc3496680b6736868
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Apr 30 22:43:53 2023 +0800
Refactor usage of ProcessEngine (#25422)
---
.../sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java | 4 +++-
.../infra/executor/sql/execute/engine/raw/RawExecutor.java | 3 ++-
.../sql/execute/engine/raw/callback/RawSQLExecutorCallback.java | 4 +++-
.../apache/shardingsphere/driver/executor/DriverJDBCExecutor.java | 5 ++---
.../sqlfederation/executor/FilterableTableScanExecutor.java | 3 ++-
.../sqlfederation/executor/TranslatableTableScanExecutor.java | 4 ++--
.../proxy/backend/connector/jdbc/executor/ProxyJDBCExecutor.java | 3 ++-
.../proxy/frontend/netty/FrontendChannelInboundHandler.java | 6 ++++--
8 files changed, 20 insertions(+), 12 deletions(-)
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
index ce41216f46a..2113466b73e 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
@@ -56,6 +56,8 @@ public abstract class JDBCExecutorCallback<T> implements
ExecutorCallback<JDBCEx
private final boolean isExceptionThrown;
+ private final ProcessEngine processEngine = new ProcessEngine();
+
@Override
public final Collection<T> execute(final Collection<JDBCExecutionUnit>
executionUnits, final boolean isTrunkThread) throws SQLException {
// TODO It is better to judge whether need sane result before execute,
can avoid exception thrown
@@ -84,7 +86,7 @@ public abstract class JDBCExecutorCallback<T> implements
ExecutorCallback<JDBCEx
sqlExecutionHook.start(jdbcExecutionUnit.getExecutionUnit().getDataSourceName(),
sqlUnit.getSql(), sqlUnit.getParameters(), dataSourceMetaData, isTrunkThread);
T result = executeSQL(sqlUnit.getSql(),
jdbcExecutionUnit.getStorageResource(), jdbcExecutionUnit.getConnectionMode(),
storageType);
sqlExecutionHook.finishSuccess();
- new ProcessEngine().finishExecution();
+ processEngine.finishExecution();
return result;
} catch (final SQLException ex) {
if (!storageType.equals(protocolType)) {
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
index c0d002dc040..b481e08d80c 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
@@ -43,6 +43,8 @@ public final class RawExecutor {
private final ConnectionContext connectionContext;
+ private final ProcessEngine processEngine = new ProcessEngine();
+
/**
* Execute.
*
@@ -54,7 +56,6 @@ public final class RawExecutor {
*/
public List<ExecuteResult> execute(final
ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext,
final QueryContext queryContext, final
RawSQLExecutorCallback callback) throws SQLException {
- ProcessEngine processEngine = new ProcessEngine();
try {
processEngine.initializeExecution(executionGroupContext,
queryContext);
// TODO Load query header for first query
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
index 3adfcde0ac5..01a3880d6c3 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
@@ -36,6 +36,8 @@ public final class RawSQLExecutorCallback implements
ExecutorCallback<RawSQLExec
@SuppressWarnings("rawtypes")
private final Collection<RawExecutorCallback> callbacks;
+ private final ProcessEngine processEngine = new ProcessEngine();
+
public RawSQLExecutorCallback() {
callbacks =
ShardingSphereServiceLoader.getServiceInstances(RawExecutorCallback.class);
Preconditions.checkState(!callbacks.isEmpty(), "No raw executor
callback implementation found.");
@@ -47,7 +49,7 @@ public final class RawSQLExecutorCallback implements
ExecutorCallback<RawSQLExec
Collection<ExecuteResult> result =
callbacks.iterator().next().execute(inputs, isTrunkThread);
if (!ExecuteIDContext.isEmpty()) {
for (int i = 0; i < inputs.size(); i++) {
- new ProcessEngine().finishExecution();
+ processEngine.finishExecution();
}
}
return result;
diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
index 6fa19d5d567..9633ac77d09 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
@@ -50,6 +50,8 @@ public final class DriverJDBCExecutor {
private final MetaDataRefreshEngine metaDataRefreshEngine;
+ private final ProcessEngine processEngine = new ProcessEngine();
+
public DriverJDBCExecutor(final String databaseName, final ContextManager
contextManager, final JDBCExecutor jdbcExecutor) {
this.databaseName = databaseName;
this.jdbcExecutor = jdbcExecutor;
@@ -69,7 +71,6 @@ public final class DriverJDBCExecutor {
*/
public List<QueryResult> executeQuery(final
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
final QueryContext queryContext,
final ExecuteQueryCallback callback) throws SQLException {
- ProcessEngine processEngine = new ProcessEngine();
try {
processEngine.initializeExecution(executionGroupContext,
queryContext);
return jdbcExecutor.execute(executionGroupContext, callback);
@@ -90,7 +91,6 @@ public final class DriverJDBCExecutor {
*/
public int executeUpdate(final ExecutionGroupContext<JDBCExecutionUnit>
executionGroupContext,
final QueryContext queryContext, final
Collection<RouteUnit> routeUnits, final JDBCExecutorCallback<Integer> callback)
throws SQLException {
- ProcessEngine processEngine = new ProcessEngine();
try {
processEngine.initializeExecution(executionGroupContext,
queryContext);
SQLStatementContext<?> sqlStatementContext =
queryContext.getSqlStatementContext();
@@ -130,7 +130,6 @@ public final class DriverJDBCExecutor {
*/
public boolean execute(final ExecutionGroupContext<JDBCExecutionUnit>
executionGroupContext, final QueryContext queryContext,
final Collection<RouteUnit> routeUnits, final
JDBCExecutorCallback<Boolean> callback) throws SQLException {
- ProcessEngine processEngine = new ProcessEngine();
try {
processEngine.initializeExecution(executionGroupContext,
queryContext);
List<Boolean> results = doExecute(executionGroupContext,
queryContext.getSqlStatementContext(), routeUnits, callback);
diff --git a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java
index 79277b57eb4..8b7948d8b6d 100644
--- a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java
+++ b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java
@@ -116,6 +116,8 @@ public final class FilterableTableScanExecutor implements
TableScanExecutor {
private final ShardingSphereData data;
+ private final ProcessEngine processEngine = new ProcessEngine();
+
@Override
public Enumerable<Object> executeScalar(final ShardingSphereTable table,
final ScanNodeExecutorContext scanContext) {
return new AbstractEnumerable<Object>() {
@@ -149,7 +151,6 @@ public final class FilterableTableScanExecutor implements
TableScanExecutor {
}
private AbstractEnumerable<Object[]> execute(final DatabaseType
databaseType, final QueryContext queryContext, final ShardingSphereDatabase
database, final ExecutionContext context) {
- ProcessEngine processEngine = new ProcessEngine();
try {
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
prepareEngine.prepare(context.getRouteContext(),
context.getExecutionUnits(), new
ExecutionGroupReportContext(database.getName()));
diff --git a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java
index abe138b8620..b30fb712a7b 100644
--- a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java
+++ b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java
@@ -127,6 +127,8 @@ public final class TranslatableTableScanExecutor implements
TableScanExecutor {
private final ShardingSphereData data;
+ private final ProcessEngine processEngine = new ProcessEngine();
+
@Override
public Enumerable<Object> executeScalar(final ShardingSphereTable table,
final ScanNodeExecutorContext scanContext) {
String databaseName = executorContext.getDatabaseName().toLowerCase();
@@ -175,7 +177,6 @@ public final class TranslatableTableScanExecutor implements
TableScanExecutor {
private AbstractEnumerable<Object> executeScalarEnumerable(final
DatabaseType databaseType, final QueryContext queryContext,
final
ShardingSphereDatabase database, final ExecutionContext context) {
- ProcessEngine processEngine = new ProcessEngine();
try {
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
prepareEngine.prepare(context.getRouteContext(),
context.getExecutionUnits(), new
ExecutionGroupReportContext(database.getName()));
@@ -235,7 +236,6 @@ public final class TranslatableTableScanExecutor implements
TableScanExecutor {
}
private AbstractEnumerable<Object[]> execute(final DatabaseType
databaseType, final QueryContext queryContext, final ShardingSphereDatabase
database, final ExecutionContext context) {
- ProcessEngine processEngine = new ProcessEngine();
try {
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
prepareEngine.prepare(context.getRouteContext(),
context.getExecutionUnits(), new
ExecutionGroupReportContext(database.getName()));
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/executor/ProxyJDBCExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/executor/ProxyJDBCExecutor.java
index f8d3a93044f..64eb74db2c7 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/executor/ProxyJDBCExecutor.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/executor/ProxyJDBCExecutor.java
@@ -51,6 +51,8 @@ public final class ProxyJDBCExecutor {
private final JDBCExecutor jdbcExecutor;
+ private final ProcessEngine processEngine = new ProcessEngine();
+
/**
* Execute.
*
@@ -63,7 +65,6 @@ public final class ProxyJDBCExecutor {
*/
public List<ExecuteResult> execute(final QueryContext queryContext, final
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
final boolean isReturnGeneratedKeys,
final boolean isExceptionThrown) throws SQLException {
- ProcessEngine processEngine = new ProcessEngine();
try {
MetaDataContexts metaDataContexts =
ProxyContext.getInstance().getContextManager().getMetaDataContexts();
ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName());
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
index 1d8efd9d9ec..75458694ab1 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
@@ -50,6 +50,8 @@ public final class FrontendChannelInboundHandler extends
ChannelInboundHandlerAd
private final ConnectionSession connectionSession;
+ private final ProcessEngine processEngine = new ProcessEngine();
+
private volatile boolean authenticated;
public FrontendChannelInboundHandler(final DatabaseProtocolFrontendEngine
databaseProtocolFrontendEngine, final Channel channel) {
@@ -80,7 +82,7 @@ public final class FrontendChannelInboundHandler extends
ChannelInboundHandlerAd
if (authResult.isFinished()) {
connectionSession.setGrantee(new
Grantee(authResult.getUsername(), authResult.getHostname()));
connectionSession.setCurrentDatabase(authResult.getDatabase());
- connectionSession.setExecutionId(new
ProcessEngine().initializeConnection(connectionSession.getGrantee(),
connectionSession.getDatabaseName()));
+
connectionSession.setExecutionId(processEngine.initializeConnection(connectionSession.getGrantee(),
connectionSession.getDatabaseName()));
}
return authResult.isFinished();
// CHECKSTYLE:OFF
@@ -106,7 +108,7 @@ public final class FrontendChannelInboundHandler extends
ChannelInboundHandlerAd
private void closeAllResources() {
ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(connectionSession.getConnectionId());
connectionSession.getBackendConnection().closeAllResources();
- Optional.ofNullable(connectionSession.getExecutionId()).ifPresent(new
ProcessEngine()::finishConnection);
+
Optional.ofNullable(connectionSession.getExecutionId()).ifPresent(processEngine::finishConnection);
databaseProtocolFrontendEngine.release(connectionSession);
}