This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new fe59f4cb279 Remove SPI for ExecuteProcessReporter (#23701)
fe59f4cb279 is described below
commit fe59f4cb279573af72b50110a17349eff2cd8dcb
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Jan 22 14:59:16 2023 +0800
Remove SPI for ExecuteProcessReporter (#23701)
* Remove SPI for ExecuteProcessReporter
* Remove SPI for ExecuteProcessReporter
* Remove SPI for ExecuteProcessReporter
---
.../engine/driver/jdbc/JDBCExecutorCallback.java | 2 +-
.../raw/callback/RawSQLExecutorCallback.java | 8 +--
.../executor/sql/process/ExecuteProcessEngine.java | 28 +++-----
.../sql/process/ExecuteProcessReporter.java | 53 ++++++++++----
.../sql}/process/ShowProcessListManager.java | 4 +-
.../process/lock/ShowProcessListSimpleLock.java | 2 +-
.../sql/process/spi/ExecuteProcessReporter.java | 83 ----------------------
.../sql/process/ExecuteProcessEngineTest.java | 81 ---------------------
.../sql/process/ExecuteProcessReporterTest.java | 10 +--
.../fixture}/EventBusContextHolderFixture.java | 2 +-
.../fixture/ExecuteProcessReporterFixture.java | 61 ----------------
...executor.sql.process.spi.ExecuteProcessReporter | 18 -----
.../statement/ShardingSpherePreparedStatement.java | 10 ++-
.../core/statement/ShardingSphereStatement.java | 14 ++--
...executor.sql.process.spi.ExecuteProcessReporter | 18 -----
.../subscriber/ProcessListChangedSubscriber.java | 4 +-
.../subscriber/ProcessRegistrySubscriber.java | 4 +-
.../ProcessListChangedSubscriberTest.java | 4 +-
.../subscriber/ProcessStandaloneSubscriber.java | 2 +-
.../ProcessStandaloneSubscriberTest.java | 2 +-
.../backend/communication/ProxySQLExecutor.java | 3 +-
21 files changed, 83 insertions(+), 330 deletions(-)
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
index b74af5500a6..b86cd8f6ef2 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
@@ -116,7 +116,7 @@ public abstract class JDBCExecutorCallback<T> implements
ExecutorCallback<JDBCEx
private void finishReport(final Map<String, Object> dataMap, final
SQLExecutionUnit executionUnit) {
if (dataMap.containsKey(ExecuteProcessConstants.EXECUTE_ID.name())) {
-
ExecuteProcessEngine.finishExecution(dataMap.get(ExecuteProcessConstants.EXECUTE_ID.name()).toString(),
executionUnit, eventBusContext);
+
ExecuteProcessEngine.finishExecution(dataMap.get(ExecuteProcessConstants.EXECUTE_ID.name()).toString(),
executionUnit);
}
}
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
index 86b7a3a42d3..43f00cc9233 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
@@ -23,7 +23,6 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExe
import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import
org.apache.shardingsphere.infra.executor.sql.process.ExecuteProcessEngine;
import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
import java.sql.SQLException;
@@ -38,10 +37,7 @@ public final class RawSQLExecutorCallback implements
ExecutorCallback<RawSQLExec
@SuppressWarnings("rawtypes")
private final Collection<RawExecutorCallback> callbacks;
- private final EventBusContext eventBusContext;
-
- public RawSQLExecutorCallback(final EventBusContext eventBusContext) {
- this.eventBusContext = eventBusContext;
+ public RawSQLExecutorCallback() {
callbacks =
ShardingSphereServiceLoader.getServiceInstances(RawExecutorCallback.class);
Preconditions.checkState(!callbacks.isEmpty(), "No raw executor
callback implementation found.");
}
@@ -53,7 +49,7 @@ public final class RawSQLExecutorCallback implements
ExecutorCallback<RawSQLExec
if (dataMap.containsKey(ExecuteProcessConstants.EXECUTE_ID.name())) {
String executionID =
dataMap.get(ExecuteProcessConstants.EXECUTE_ID.name()).toString();
for (RawSQLExecutionUnit each : inputs) {
- ExecuteProcessEngine.finishExecution(executionID, each,
eventBusContext);
+ ExecuteProcessEngine.finishExecution(executionID, each);
}
}
return result;
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessEngine.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessEngine.java
index 91b8fd9a97f..b3a56e7f760 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessEngine.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessEngine.java
@@ -25,17 +25,14 @@ import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupConte
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorDataMap;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
-import
org.apache.shardingsphere.infra.executor.sql.process.spi.ExecuteProcessReporter;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
-import
org.apache.shardingsphere.infra.util.spi.type.optional.OptionalSPIRegistry;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DDLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.MySQLStatement;
import java.util.Collections;
-import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
@@ -54,7 +51,7 @@ public final class ExecuteProcessEngine {
*/
public static String initializeConnection(final Grantee grantee, final
String databaseName) {
ExecutionGroupContext<SQLExecutionUnit> executionGroupContext =
createExecutionGroupContext(grantee, databaseName);
-
OptionalSPIRegistry.findRegisteredService(ExecuteProcessReporter.class).ifPresent(optional
-> optional.report(executionGroupContext));
+ new ExecuteProcessReporter().report(executionGroupContext);
return executionGroupContext.getExecutionID();
}
@@ -72,7 +69,7 @@ public final class ExecuteProcessEngine {
* @param executionID execution ID
*/
public static void finishConnection(final String executionID) {
-
OptionalSPIRegistry.findRegisteredService(ExecuteProcessReporter.class).ifPresent(optional
-> optional.reportRemove(executionID));
+ new ExecuteProcessReporter().reportRemove(executionID);
}
/**
@@ -83,13 +80,12 @@ public final class ExecuteProcessEngine {
* @param eventBusContext event bus context
*/
public static void initializeExecution(final QueryContext queryContext,
final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext,
final EventBusContext eventBusContext) {
- Optional<ExecuteProcessReporter> reporter =
OptionalSPIRegistry.findRegisteredService(ExecuteProcessReporter.class);
if (Strings.isNullOrEmpty(executionGroupContext.getExecutionID())) {
executionGroupContext.setExecutionID(new
UUID(ThreadLocalRandom.current().nextLong(),
ThreadLocalRandom.current().nextLong()).toString().replace("-", ""));
}
- if (reporter.isPresent() &&
isMySQLDDLOrDMLStatement(queryContext.getSqlStatementContext().getSqlStatement()))
{
+ if
(isMySQLDDLOrDMLStatement(queryContext.getSqlStatementContext().getSqlStatement()))
{
ExecutorDataMap.getValue().put(ExecuteProcessConstants.EXECUTE_ID.name(),
executionGroupContext.getExecutionID());
- reporter.get().report(queryContext, executionGroupContext,
ExecuteProcessConstants.EXECUTE_STATUS_START, eventBusContext);
+ new ExecuteProcessReporter().report(queryContext,
executionGroupContext, ExecuteProcessConstants.EXECUTE_STATUS_START);
}
}
@@ -98,11 +94,9 @@ public final class ExecuteProcessEngine {
*
* @param executionID execution ID
* @param executionUnit execution unit
- * @param eventBusContext event bus context
*/
- public static void finishExecution(final String executionID, final
SQLExecutionUnit executionUnit, final EventBusContext eventBusContext) {
- OptionalSPIRegistry.findRegisteredService(ExecuteProcessReporter.class)
- .ifPresent(optional -> optional.report(executionID,
executionUnit, ExecuteProcessConstants.EXECUTE_STATUS_DONE, eventBusContext));
+ public static void finishExecution(final String executionID, final
SQLExecutionUnit executionUnit) {
+ new ExecuteProcessReporter().report(executionID, executionUnit,
ExecuteProcessConstants.EXECUTE_STATUS_DONE);
}
/**
@@ -112,9 +106,8 @@ public final class ExecuteProcessEngine {
* @param eventBusContext event bus context
*/
public static void finishExecution(final String executionID, final
EventBusContext eventBusContext) {
- Optional<ExecuteProcessReporter> reporter =
OptionalSPIRegistry.findRegisteredService(ExecuteProcessReporter.class);
- if (reporter.isPresent() &&
ExecutorDataMap.getValue().containsKey(ExecuteProcessConstants.EXECUTE_ID.name()))
{
- reporter.get().report(executionID,
ExecuteProcessConstants.EXECUTE_STATUS_DONE, eventBusContext);
+ if
(ExecutorDataMap.getValue().containsKey(ExecuteProcessConstants.EXECUTE_ID.name()))
{
+ new ExecuteProcessReporter().report(executionID,
ExecuteProcessConstants.EXECUTE_STATUS_DONE, eventBusContext);
}
}
@@ -122,9 +115,8 @@ public final class ExecuteProcessEngine {
* Clean execution.
*/
public static void cleanExecution() {
- Optional<ExecuteProcessReporter> reporter =
OptionalSPIRegistry.findRegisteredService(ExecuteProcessReporter.class);
- if (reporter.isPresent() &&
ExecutorDataMap.getValue().containsKey(ExecuteProcessConstants.EXECUTE_ID.name()))
{
-
reporter.get().reportClean(ExecutorDataMap.getValue().get(ExecuteProcessConstants.EXECUTE_ID.name()).toString());
+ if
(ExecutorDataMap.getValue().containsKey(ExecuteProcessConstants.EXECUTE_ID.name()))
{
+ new
ExecuteProcessReporter().reportClean(ExecutorDataMap.getValue().get(ExecuteProcessConstants.EXECUTE_ID.name()).toString());
}
ExecutorDataMap.getValue().remove(ExecuteProcessConstants.EXECUTE_ID.name());
}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/GovernanceExecuteProcessReporter.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessReporter.java
similarity index 78%
rename from
mode/core/src/main/java/org/apache/shardingsphere/mode/process/GovernanceExecuteProcessReporter.java
rename to
infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessReporter.java
index b2c95250108..c3c6b86b629 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/GovernanceExecuteProcessReporter.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessReporter.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.process;
+package org.apache.shardingsphere.infra.executor.sql.process;
import org.apache.shardingsphere.infra.binder.QueryContext;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
@@ -23,25 +23,34 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionU
import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessUnit;
-import
org.apache.shardingsphere.infra.executor.sql.process.spi.ExecuteProcessReporter;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import java.util.Optional;
/**
- * Governance execute process reporter.
+ * Execute process report.
*/
-public final class GovernanceExecuteProcessReporter implements
ExecuteProcessReporter {
+public final class ExecuteProcessReporter {
- @Override
+ /**
+ * Report this connection for proxy.
+ *
+ * @param executionGroupContext execution group context
+ */
public void report(final ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext) {
ExecuteProcessContext executeProcessContext = new
ExecuteProcessContext("", executionGroupContext,
ExecuteProcessConstants.EXECUTE_STATUS_SLEEP, true);
ShowProcessListManager.getInstance().putProcessContext(executeProcessContext.getExecutionID(),
executeProcessContext);
}
- @Override
+ /**
+ * Report the summary of this task.
+ *
+ * @param queryContext query context
+ * @param executionGroupContext execution group context
+ * @param constants constants
+ */
public void report(final QueryContext queryContext, final
ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext,
- final ExecuteProcessConstants constants, final
EventBusContext eventBusContext) {
+ final ExecuteProcessConstants constants) {
ExecuteProcessContext originExecuteProcessContext =
ShowProcessListManager.getInstance().getProcessContext(executionGroupContext.getExecutionID());
boolean isProxyContext = null != originExecuteProcessContext &&
originExecuteProcessContext.isProxyContext();
ExecuteProcessContext executeProcessContext = new
ExecuteProcessContext(queryContext.getSql(), executionGroupContext, constants,
isProxyContext);
@@ -49,18 +58,34 @@ public final class GovernanceExecuteProcessReporter
implements ExecuteProcessRep
ShowProcessListManager.getInstance().putProcessStatement(executeProcessContext.getExecutionID(),
executeProcessContext.getProcessStatements());
}
- @Override
- public void report(final String executionID, final SQLExecutionUnit
executionUnit, final ExecuteProcessConstants constants, final EventBusContext
eventBusContext) {
+ /**
+ * Report a unit of this task.
+ *
+ * @param executionID execution ID
+ * @param executionUnit execution unit
+ * @param constants constants
+ */
+ public void report(final String executionID, final SQLExecutionUnit
executionUnit, final ExecuteProcessConstants constants) {
ExecuteProcessUnit executeProcessUnit = new
ExecuteProcessUnit(executionUnit.getExecutionUnit(), constants);
ExecuteProcessContext executeProcessContext =
ShowProcessListManager.getInstance().getProcessContext(executionID);
Optional.ofNullable(executeProcessContext.getProcessUnits().get(executeProcessUnit.getUnitID())).ifPresent(optional
-> optional.setStatus(executeProcessUnit.getStatus()));
}
- @Override
+ /**
+ * Report this task on completion.
+ *
+ * @param executionID execution ID
+ * @param constants constants
+ * @param eventBusContext event bus context
+ */
public void report(final String executionID, final ExecuteProcessConstants
constants, final EventBusContext eventBusContext) {
}
- @Override
+ /**
+ * Report clean the task.
+ *
+ * @param executionID execution ID
+ */
public void reportClean(final String executionID) {
ShowProcessListManager.getInstance().removeProcessStatement(executionID);
ExecuteProcessContext executeProcessContext =
ShowProcessListManager.getInstance().getProcessContext(executionID);
@@ -74,7 +99,11 @@ public final class GovernanceExecuteProcessReporter
implements ExecuteProcessRep
}
}
- @Override
+ /**
+ * Report remove process context.
+ *
+ * @param executionID execution ID
+ */
public void reportRemove(final String executionID) {
ShowProcessListManager.getInstance().removeProcessStatement(executionID);
ShowProcessListManager.getInstance().removeProcessContext(executionID);
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/ShowProcessListManager.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ShowProcessListManager.java
similarity index 96%
rename from
mode/core/src/main/java/org/apache/shardingsphere/mode/process/ShowProcessListManager.java
rename to
infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ShowProcessListManager.java
index 6cfece88c34..7587e4d24d9 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/ShowProcessListManager.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ShowProcessListManager.java
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.process;
+package org.apache.shardingsphere.infra.executor.sql.process;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
+import
org.apache.shardingsphere.infra.executor.sql.process.lock.ShowProcessListSimpleLock;
import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
-import org.apache.shardingsphere.mode.process.lock.ShowProcessListSimpleLock;
import java.sql.Statement;
import java.util.Collection;
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/lock/ShowProcessListSimpleLock.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ShowProcessListSimpleLock.java
similarity index 96%
rename from
mode/core/src/main/java/org/apache/shardingsphere/mode/process/lock/ShowProcessListSimpleLock.java
rename to
infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ShowProcessListSimpleLock.java
index d5c63e40090..412d7268790 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/process/lock/ShowProcessListSimpleLock.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ShowProcessListSimpleLock.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.process.lock;
+package org.apache.shardingsphere.infra.executor.sql.process.lock;
import lombok.SneakyThrows;
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/spi/ExecuteProcessReporter.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/spi/ExecuteProcessReporter.java
deleted file mode 100644
index a08fa318180..00000000000
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/spi/ExecuteProcessReporter.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.executor.sql.process.spi;
-
-import org.apache.shardingsphere.infra.binder.QueryContext;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
-import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
-import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.infra.util.spi.type.optional.OptionalSPI;
-
-/**
- * Execute process report.
- */
-@SingletonSPI
-public interface ExecuteProcessReporter extends OptionalSPI {
-
- /**
- * Report this connection for proxy.
- *
- * @param executionGroupContext execution group context
- */
- void report(ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext);
-
- /**
- * Report the summary of this task.
- *
- * @param queryContext query context
- * @param executionGroupContext execution group context
- * @param constants constants
- * @param eventBusContext event bus context
- */
- void report(QueryContext queryContext, ExecutionGroupContext<? extends
SQLExecutionUnit> executionGroupContext, ExecuteProcessConstants constants,
EventBusContext eventBusContext);
-
- /**
- * Report a unit of this task.
- *
- * @param executionID execution ID
- * @param executionUnit execution unit
- * @param constants constants
- * @param eventBusContext event bus context
- */
- void report(String executionID, SQLExecutionUnit executionUnit,
ExecuteProcessConstants constants, EventBusContext eventBusContext);
-
- /**
- * Report this task on completion.
- *
- * @param executionID execution ID
- * @param constants constants
- * @param eventBusContext event bus context
- */
- void report(String executionID, ExecuteProcessConstants constants,
EventBusContext eventBusContext);
-
- /**
- * Report clean the task.
- *
- * @param executionID execution ID
- */
- void reportClean(String executionID);
-
- /**
- * Report remove process context.
- *
- * @param executionID execution ID
- */
- void reportRemove(String executionID);
-}
diff --git
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessEngineTest.java
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessEngineTest.java
deleted file mode 100644
index 104d8655afc..00000000000
---
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessEngineTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.executor.sql.process;
-
-import org.apache.shardingsphere.infra.binder.QueryContext;
-import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
-import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorDataMap;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.process.fixture.ExecuteProcessReporterFixture;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
-import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.ddl.MySQLCreateTableStatement;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.UUID;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public final class ExecuteProcessEngineTest {
-
- private ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext;
-
- private final EventBusContext eventBusContext = new EventBusContext();
-
- @Before
- public void setUp() {
- executionGroupContext = createMockedExecutionGroups();
- ExecuteProcessEngine.initializeExecution(createQueryContext(),
executionGroupContext, eventBusContext);
- assertThat(ExecutorDataMap.getValue().get("EXECUTE_ID"),
is(executionGroupContext.getExecutionID()));
- assertThat(ExecuteProcessReporterFixture.ACTIONS.get(0), is("Report
the summary of this task."));
- }
-
- @Test
- public void assertFinish() {
-
ExecuteProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
mock(RawSQLExecutionUnit.class), eventBusContext);
- assertThat(ExecuteProcessReporterFixture.ACTIONS.get(1), is("Report a
unit of this task."));
-
ExecuteProcessEngine.finishExecution(executionGroupContext.getExecutionID(),
eventBusContext);
- assertThat(ExecuteProcessReporterFixture.ACTIONS.get(2), is("Report
this task on completion."));
- }
-
- @Test
- public void assertClean() {
- ExecuteProcessEngine.cleanExecution();
- assertTrue(ExecutorDataMap.getValue().isEmpty());
- }
-
- private QueryContext createQueryContext() {
- SQLStatementContext sqlStatementContext =
mock(SQLStatementContext.class);
-
when(sqlStatementContext.getSqlStatement()).thenReturn(mock(MySQLCreateTableStatement.class));
- QueryContext result = mock(QueryContext.class);
- when(result.getSqlStatementContext()).thenReturn(sqlStatementContext);
- return result;
- }
-
- private ExecutionGroupContext<? extends SQLExecutionUnit>
createMockedExecutionGroups() {
- ExecutionGroupContext<? extends SQLExecutionUnit> result =
mock(ExecutionGroupContext.class);
- when(result.getExecutionID()).thenReturn(UUID.randomUUID().toString());
- return result;
- }
-}
diff --git
a/mode/core/src/test/java/org/apache/shardingsphere/mode/process/GovernanceExecuteProcessReporterTest.java
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessReporterTest.java
similarity index 91%
rename from
mode/core/src/test/java/org/apache/shardingsphere/mode/process/GovernanceExecuteProcessReporterTest.java
rename to
infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessReporterTest.java
index 43360267220..132bb71fa4b 100644
---
a/mode/core/src/test/java/org/apache/shardingsphere/mode/process/GovernanceExecuteProcessReporterTest.java
+++
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessReporterTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.process;
+package org.apache.shardingsphere.infra.executor.sql.process;
import org.apache.shardingsphere.infra.binder.QueryContext;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
@@ -39,13 +39,13 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-public final class GovernanceExecuteProcessReporterTest {
+public class ExecuteProcessReporterTest {
private MockedStatic<ShowProcessListManager> mockedStatic;
private ShowProcessListManager showProcessListManager;
- private final GovernanceExecuteProcessReporter reporter = new
GovernanceExecuteProcessReporter();
+ private final ExecuteProcessReporter reporter = new
ExecuteProcessReporter();
@Before
public void setUp() {
@@ -58,7 +58,7 @@ public final class GovernanceExecuteProcessReporterTest {
public void assertReport() {
QueryContext queryContext = new QueryContext(null, null, null);
ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext = mockExecutionGroupContext();
- reporter.report(queryContext, executionGroupContext,
ExecuteProcessConstants.EXECUTE_ID,
EventBusContextHolderFixture.EVENT_BUS_CONTEXT);
+ reporter.report(queryContext, executionGroupContext,
ExecuteProcessConstants.EXECUTE_ID);
verify(showProcessListManager,
times(1)).putProcessContext(eq(executionGroupContext.getExecutionID()), any());
}
@@ -77,7 +77,7 @@ public final class GovernanceExecuteProcessReporterTest {
ExecuteProcessContext executeProcessContext =
mock(ExecuteProcessContext.class);
when(executeProcessContext.getProcessUnits()).thenReturn(Collections.emptyMap());
when(showProcessListManager.getProcessContext("foo_id")).thenReturn(executeProcessContext);
- reporter.report("foo_id", sqlExecutionUnit,
ExecuteProcessConstants.EXECUTE_ID,
EventBusContextHolderFixture.EVENT_BUS_CONTEXT);
+ reporter.report("foo_id", sqlExecutionUnit,
ExecuteProcessConstants.EXECUTE_ID);
verify(showProcessListManager,
times(1)).getProcessContext(eq("foo_id"));
}
diff --git
a/mode/core/src/test/java/org/apache/shardingsphere/mode/process/EventBusContextHolderFixture.java
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/fixture/EventBusContextHolderFixture.java
similarity index 93%
rename from
mode/core/src/test/java/org/apache/shardingsphere/mode/process/EventBusContextHolderFixture.java
rename to
infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/fixture/EventBusContextHolderFixture.java
index 7a914709524..a82ebc0b08d 100644
---
a/mode/core/src/test/java/org/apache/shardingsphere/mode/process/EventBusContextHolderFixture.java
+++
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/fixture/EventBusContextHolderFixture.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.process;
+package org.apache.shardingsphere.infra.executor.sql.process.fixture;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
diff --git
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/fixture/ExecuteProcessReporterFixture.java
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/fixture/ExecuteProcessReporterFixture.java
deleted file mode 100644
index c9ffcb69235..00000000000
---
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/fixture/ExecuteProcessReporterFixture.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.executor.sql.process.fixture;
-
-import org.apache.shardingsphere.infra.binder.QueryContext;
-import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
-import
org.apache.shardingsphere.infra.executor.sql.process.spi.ExecuteProcessReporter;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
-
-import java.util.LinkedList;
-import java.util.List;
-
-public final class ExecuteProcessReporterFixture implements
ExecuteProcessReporter {
-
- public static final List<String> ACTIONS = new LinkedList<>();
-
- @Override
- public void report(final ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext) {
- }
-
- @Override
- public void report(final QueryContext queryContext, final
ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext, final
ExecuteProcessConstants constants,
- final EventBusContext eventBusContext) {
- ACTIONS.add("Report the summary of this task.");
- }
-
- @Override
- public void report(final String executionID, final SQLExecutionUnit
executionUnit, final ExecuteProcessConstants constants, final EventBusContext
eventBusContext) {
- ACTIONS.add("Report a unit of this task.");
- }
-
- @Override
- public void report(final String executionID, final ExecuteProcessConstants
constants, final EventBusContext eventBusContext) {
- ACTIONS.add("Report this task on completion.");
- }
-
- @Override
- public void reportClean(final String executionID) {
- }
-
- @Override
- public void reportRemove(final String executionID) {
- }
-}
diff --git
a/infra/executor/src/test/resources/META-INF/services/org.apache.shardingsphere.infra.executor.sql.process.spi.ExecuteProcessReporter
b/infra/executor/src/test/resources/META-INF/services/org.apache.shardingsphere.infra.executor.sql.process.spi.ExecuteProcessReporter
deleted file mode 100644
index dd35d61f702..00000000000
---
a/infra/executor/src/test/resources/META-INF/services/org.apache.shardingsphere.infra.executor.sql.process.spi.ExecuteProcessReporter
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.infra.executor.sql.process.fixture.ExecuteProcessReporterFixture
diff --git
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index b4581757752..594d954f7f8 100644
---
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -296,8 +296,8 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
private List<QueryResult> executeQuery0() throws SQLException {
if (hasRawExecutionRule()) {
- return
executor.getRawExecutor().execute(createRawExecutionGroupContext(),
executionContext.getQueryContext(),
- new
RawSQLExecutorCallback(eventBusContext)).stream().map(each -> (QueryResult)
each).collect(Collectors.toList());
+ return
executor.getRawExecutor().execute(createRawExecutionGroupContext(),
+ executionContext.getQueryContext(), new
RawSQLExecutorCallback()).stream().map(each -> (QueryResult)
each).collect(Collectors.toList());
}
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionGroupContext();
cacheStatements(executionGroupContext.getInputGroups());
@@ -338,8 +338,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
}
executionContext = createExecutionContext(queryContext);
if (hasRawExecutionRule()) {
- Collection<ExecuteResult> executeResults =
executor.getRawExecutor().execute(createRawExecutionGroupContext(),
executionContext.getQueryContext(),
- new RawSQLExecutorCallback(eventBusContext));
+ Collection<ExecuteResult> executeResults =
executor.getRawExecutor().execute(createRawExecutionGroupContext(),
executionContext.getQueryContext(), new RawSQLExecutorCallback());
return accumulate(executeResults);
}
return isNeedImplicitCommitTransaction(executionContext) ?
executeUpdateWithImplicitCommitTransaction() : useDriverToExecuteUpdate();
@@ -408,8 +407,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
executionContext = createExecutionContext(queryContext);
if (hasRawExecutionRule()) {
// TODO process getStatement
- Collection<ExecuteResult> executeResults =
executor.getRawExecutor().execute(createRawExecutionGroupContext(),
executionContext.getQueryContext(),
- new RawSQLExecutorCallback(eventBusContext));
+ Collection<ExecuteResult> executeResults =
executor.getRawExecutor().execute(createRawExecutionGroupContext(),
executionContext.getQueryContext(), new RawSQLExecutorCallback());
return executeResults.iterator().next() instanceof QueryResult;
}
return isNeedImplicitCommitTransaction(executionContext) ?
executeWithImplicitCommitTransaction() : useDriverToExecute();
diff --git
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index bbbbcf988dc..c67a37ba61b 100644
---
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -216,8 +216,8 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
private List<QueryResult> executeQuery0() throws SQLException {
if
(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
- return
executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getQueryContext(),
- new
RawSQLExecutorCallback(eventBusContext)).stream().map(each -> (QueryResult)
each).collect(Collectors.toList());
+ return executor.getRawExecutor().execute(
+ createRawExecutionContext(),
executionContext.getQueryContext(), new
RawSQLExecutorCallback()).stream().map(each -> (QueryResult)
each).collect(Collectors.toList());
}
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionGroupContext();
cacheStatements(executionGroupContext.getInputGroups());
@@ -254,7 +254,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
}
executionContext = createExecutionContext(queryContext);
if
(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
- return
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getQueryContext(), new
RawSQLExecutorCallback(eventBusContext)));
+ return
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getQueryContext(), new RawSQLExecutorCallback()));
}
return executeUpdate((actualSQL, statement) ->
statement.executeUpdate(actualSQL), executionContext.getSqlStatementContext());
// CHECKSTYLE:OFF
@@ -282,7 +282,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
}
executionContext = createExecutionContext(queryContext);
if
(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
- return
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getQueryContext(), new
RawSQLExecutorCallback(eventBusContext)));
+ return
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getQueryContext(), new RawSQLExecutorCallback()));
}
return executeUpdate((actualSQL, statement) ->
statement.executeUpdate(actualSQL, autoGeneratedKeys),
executionContext.getSqlStatementContext());
// CHECKSTYLE:OFF
@@ -308,7 +308,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
}
executionContext = createExecutionContext(queryContext);
if
(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
- return
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getQueryContext(), new
RawSQLExecutorCallback(eventBusContext)));
+ return
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getQueryContext(), new RawSQLExecutorCallback()));
}
return executeUpdate((actualSQL, statement) ->
statement.executeUpdate(actualSQL, columnIndexes),
executionContext.getSqlStatementContext());
// CHECKSTYLE:OFF
@@ -334,7 +334,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
}
executionContext = createExecutionContext(queryContext);
if
(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
- return
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getQueryContext(), new
RawSQLExecutorCallback(eventBusContext)));
+ return
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getQueryContext(), new RawSQLExecutorCallback()));
}
return executeUpdate((actualSQL, statement) ->
statement.executeUpdate(actualSQL, columnNames),
executionContext.getSqlStatementContext());
// CHECKSTYLE:OFF
@@ -470,7 +470,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
executionContext = createExecutionContext(queryContext);
if
(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
// TODO process getStatement
- Collection<ExecuteResult> results =
executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getQueryContext(), new
RawSQLExecutorCallback(eventBusContext));
+ Collection<ExecuteResult> results =
executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getQueryContext(), new RawSQLExecutorCallback());
return results.iterator().next() instanceof QueryResult;
}
return isNeedImplicitCommitTransaction(executionContext) ?
executeWithImplicitCommitTransaction(callback) : useDriverToExecute(callback);
diff --git
a/mode/core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.executor.sql.process.spi.ExecuteProcessReporter
b/mode/core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.executor.sql.process.spi.ExecuteProcessReporter
deleted file mode 100644
index c9761f91b35..00000000000
---
a/mode/core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.executor.sql.process.spi.ExecuteProcessReporter
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.mode.process.GovernanceExecuteProcessReporter
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
index 9f82191f82e..280ad10065e 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
@@ -18,6 +18,8 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
import com.google.common.eventbus.Subscribe;
+import
org.apache.shardingsphere.infra.executor.sql.process.ShowProcessListManager;
+import
org.apache.shardingsphere.infra.executor.sql.process.lock.ShowProcessListSimpleLock;
import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
import
org.apache.shardingsphere.infra.executor.sql.process.model.yaml.BatchYamlExecuteProcessContext;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -28,8 +30,6 @@ import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
-import org.apache.shardingsphere.mode.process.ShowProcessListManager;
-import org.apache.shardingsphere.mode.process.lock.ShowProcessListSimpleLock;
import org.apache.shardingsphere.mode.process.node.ProcessNode;
import java.sql.SQLException;
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/process/subscriber/ProcessRegistrySubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/process/subscriber/ProcessRegistrySubscriber.java
index 9006bf3cccb..ba8b5dfceb9 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/process/subscriber/ProcessRegistrySubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/process/subscriber/ProcessRegistrySubscriber.java
@@ -18,15 +18,15 @@
package org.apache.shardingsphere.mode.manager.cluster.process.subscriber;
import com.google.common.eventbus.Subscribe;
+import
org.apache.shardingsphere.infra.executor.sql.process.ShowProcessListManager;
+import
org.apache.shardingsphere.infra.executor.sql.process.lock.ShowProcessListSimpleLock;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.persist.PersistRepository;
-import org.apache.shardingsphere.mode.process.ShowProcessListManager;
import
org.apache.shardingsphere.mode.process.event.KillProcessListIdRequestEvent;
import
org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
import
org.apache.shardingsphere.mode.process.event.ShowProcessListResponseEvent;
-import org.apache.shardingsphere.mode.process.lock.ShowProcessListSimpleLock;
import org.apache.shardingsphere.mode.process.node.ProcessNode;
import java.util.Collection;
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriberTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriberTest.java
index 2d800cbae39..29c0f9eeda9 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriberTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriberTest.java
@@ -20,6 +20,8 @@ package
org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import
org.apache.shardingsphere.infra.executor.sql.process.ShowProcessListManager;
+import
org.apache.shardingsphere.infra.executor.sql.process.lock.ShowProcessListSimpleLock;
import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import
org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
@@ -36,8 +38,6 @@ import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
-import org.apache.shardingsphere.mode.process.ShowProcessListManager;
-import org.apache.shardingsphere.mode.process.lock.ShowProcessListSimpleLock;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.junit.Before;
diff --git
a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/ProcessStandaloneSubscriber.java
b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/ProcessStandaloneSubscriber.java
index 9eaf487d252..ca7f3c6b5d9 100644
---
a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/ProcessStandaloneSubscriber.java
+++
b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/ProcessStandaloneSubscriber.java
@@ -19,10 +19,10 @@ package
org.apache.shardingsphere.mode.manager.standalone.subscriber;
import com.google.common.eventbus.Subscribe;
import lombok.SneakyThrows;
+import
org.apache.shardingsphere.infra.executor.sql.process.ShowProcessListManager;
import
org.apache.shardingsphere.infra.executor.sql.process.model.yaml.BatchYamlExecuteProcessContext;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import org.apache.shardingsphere.mode.process.ShowProcessListManager;
import
org.apache.shardingsphere.mode.process.event.KillProcessListIdRequestEvent;
import
org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
import
org.apache.shardingsphere.mode.process.event.ShowProcessListResponseEvent;
diff --git
a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/ProcessStandaloneSubscriberTest.java
b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/ProcessStandaloneSubscriberTest.java
index 9ae4097e130..b10af07b7e7 100644
---
a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/ProcessStandaloneSubscriberTest.java
+++
b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/ProcessStandaloneSubscriberTest.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.mode.manager.standalone.subscriber;
+import
org.apache.shardingsphere.infra.executor.sql.process.ShowProcessListManager;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
-import org.apache.shardingsphere.mode.process.ShowProcessListManager;
import
org.apache.shardingsphere.mode.process.event.ShowProcessListRequestEvent;
import org.junit.After;
import org.junit.Before;
diff --git
a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index 4707bce23dc..ba99bec52ae 100644
---
a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++
b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -207,8 +207,7 @@ public final class ProxySQLExecutor {
executionGroupContext.setGrantee(backendConnection.getConnectionSession().getGrantee());
executionGroupContext.setExecutionID(backendConnection.getConnectionSession().getExecutionId());
// TODO handle query header
- return rawExecutor.execute(executionGroupContext,
executionContext.getQueryContext(), new
RawSQLExecutorCallback(ProxyContext.getInstance().getContextManager().getInstanceContext()
- .getEventBusContext()));
+ return rawExecutor.execute(executionGroupContext,
executionContext.getQueryContext(), new RawSQLExecutorCallback());
}
private List<ExecuteResult> useDriverToExecute(final ExecutionContext
executionContext, final Collection<ShardingSphereRule> rules,