This is an automated email from the ASF dual-hosted git repository.
jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 2bd70383791 Rename ProcessContext to Process (#25443)
2bd70383791 is described below
commit 2bd70383791a1c32d9accd61ba664cb23b75125d
Author: Liang Zhang <[email protected]>
AuthorDate: Wed May 3 23:26:55 2023 +0800
Rename ProcessContext to Process (#25443)
---
...42\200\231s_Show_processlist_&_Kill_Work.en.md" | 46 +++++++++++-----------
.../process/{ProcessContext.java => Process.java} | 10 ++---
.../infra/executor/sql/process/ProcessEngine.java | 20 +++++-----
.../executor/sql/process/ProcessRegistry.java | 32 +++++++--------
.../{YamlProcessContext.java => YamlProcess.java} | 14 +++----
...ocessListContexts.java => YamlProcessList.java} | 6 +--
...xtsSwapper.java => YamlProcessListSwapper.java} | 20 +++++-----
...ContextSwapper.java => YamlProcessSwapper.java} | 22 +++++------
.../executor/sql/process/ProcessEngineTest.java | 6 +--
.../fixture/EventBusContextHolderFixture.java | 25 ------------
...erTest.java => YamlProcessListSwapperTest.java} | 26 ++++++------
...wapperTest.java => YamlProcessSwapperTest.java} | 18 ++++-----
.../process/ShowProcessListResponseEvent.java | 2 +-
.../subscriber/ProcessListChangedSubscriber.java | 18 ++++-----
.../ProcessListChangedSubscriberTest.java | 20 +++++-----
.../StandaloneProcessListSubscriber.java | 18 ++++-----
.../StandaloneProcessListSubscriberTest.java | 2 +-
.../admin/executor/ShowProcessListExecutor.java | 40 +++++++++----------
.../executor/ShowProcessListExecutorTest.java | 8 ++--
19 files changed, 164 insertions(+), 189 deletions(-)
diff --git
"a/docs/blog/content/material/2022_09_22_How_does_ShardingSphere\342\200\231s_Show_processlist_&_Kill_Work.en.md"
"b/docs/blog/content/material/2022_09_22_How_does_ShardingSphere\342\200\231s_Show_processlist_&_Kill_Work.en.md"
index 1924974eca8..e58ddd8f2f1 100644
---
"a/docs/blog/content/material/2022_09_22_How_does_ShardingSphere\342\200\231s_Show_processlist_&_Kill_Work.en.md"
+++
"b/docs/blog/content/material/2022_09_22_How_does_ShardingSphere\342\200\231s_Show_processlist_&_Kill_Work.en.md"
@@ -55,9 +55,9 @@ public final class GovernanceExecuteProcessReporter
implements ExecuteProcessRep
@Override
public void report(final QueryContext queryContext, final
ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext,
final ExecuteProcessConstants constants, final
EventBusContext eventBusContext) {
- ExecuteProcessContext processContext = new
ExecuteProcessContext(queryContext.getSql(), executionGroupContext, constants);
-
ShowProcessListManager.getInstance().putProcessContext(processContext.getProcessID(),
processContext);
-
ShowProcessListManager.getInstance().putProcessStatement(processContext.getProcessID(),
processContext.getProcessStatements());
+ ExecuteProcessContext process = new
ExecuteProcessContext(queryContext.getSql(), executionGroupContext, constants);
+
ShowProcessListManager.getInstance().putProcessContext(process.getProcessID(),
process);
+
ShowProcessListManager.getInstance().putProcessStatement(process.getProcessID(),
process.getProcessStatements());
}
}@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ShowProcessListManager {
@@ -65,7 +65,7 @@ public final class ShowProcessListManager {
private static final ShowProcessListManager INSTANCE = new
ShowProcessListManager();
@Getter
- private final Map<String, ExecuteProcessContext> processContexts = new
ConcurrentHashMap<>();
+ private final Map<String, ExecuteProcessContext> processes = new
ConcurrentHashMap<>();
@Getter
private final Map<String, Collection<Statement>> processStatements = new
ConcurrentHashMap<>();
@@ -74,8 +74,8 @@ public final class ShowProcessListManager {
return INSTANCE;
}
- public void putProcessContext(final String processID, final
ExecuteProcessContext processContext) {
- processContexts.put(processID, processContext);
+ public void putProcessContext(final String processID, final
ExecuteProcessContext process) {
+ processes.put(processID, process);
}
public void putProcessStatement(final String processID, final
Collection<Statement> statements) {
@@ -87,7 +87,7 @@ public final class ShowProcessListManager {
}
```
-As shown above, the `ShowProcessListManager` class has two cache Maps, namely
`processContexts` and `processStatements`. The former stores the mapping
between `processID` and `ExecuteProcessContext`.
+As shown above, the `ShowProcessListManager` class has two cache Maps, namely
`processes` and `processStatements`. The former stores the mapping between
`processID` and `ExecuteProcessContext`.
The latter contains the mapping between `processID` and `Statement` objects,
since a single SQL may generate multiple statements after it is rewritten.
@@ -130,7 +130,7 @@ public final class ProxyJDBCExecutor {
As shown above, `ExecuteProcessEngine.initialize(queryContext,
executionGroupContext, eventBusContext);` will store the SQL information in the
two cache Maps. Finally, `ExecuteProcessEngine.clean();` in the code block will
clear up the Map in the cache.
-The SQL shown in the `Show processlist` was obtained from `processContexts`.
But this Map is just a local cache. If ShardingSphere is deployed in cluster
mode, how does `Show processlist` obtain SQL running on other machines in the
cluster? Let's see how ShardingSphere handles it.
+The SQL shown in the `Show processlist` was obtained from `processes`. But
this Map is just a local cache. If ShardingSphere is deployed in cluster mode,
how does `Show processlist` obtain SQL running on other machines in the
cluster? Let's see how ShardingSphere handles it.
## 2.2 How does `Show processlist` work?
@@ -167,22 +167,22 @@ public final class ShowProcessListExecutor implements
DatabaseAdminQueryExecutor
if (null == batchProcessContexts || batchProcessContexts.isEmpty()) {
return new RawMemoryQueryResult(queryResultMetaData,
Collections.emptyList());
}
- Collection<YamlExecuteProcessContext> processContexts = new
LinkedList<>();
+ Collection<YamlExecuteProcessContext> processes = new LinkedList<>();
for (String each : batchProcessContexts) {
- processContexts.addAll(YamlEngine.unmarshal(each,
BatchYamlExecuteProcessContext.class).getContexts());
+ processes.addAll(YamlEngine.unmarshal(each,
BatchYamlExecuteProcessContext.class).getContexts());
}
- List<MemoryQueryResultDataRow> rows =
processContexts.stream().map(processContext -> {
+ List<MemoryQueryResultDataRow> rows = processes.stream().map(process
-> {
List<Object> rowValues = new ArrayList<>(8);
- rowValues.add(processContext.getProcessIDID());
- rowValues.add(processContext.getUsername());
- rowValues.add(processContext.getHostname());
- rowValues.add(processContext.getDatabaseName());
+ rowValues.add(process.getProcessIDID());
+ rowValues.add(process.getUsername());
+ rowValues.add(process.getHostname());
+ rowValues.add(process.getDatabaseName());
rowValues.add("Execute");
-
rowValues.add(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() -
processContext.getStartTimeMillis()));
- int processDoneCount =
processContext.getUnitStatuses().stream().map(each ->
ExecuteProcessConstants.EXECUTE_STATUS_DONE == each.getStatus() ? 1 :
0).reduce(0, Integer::sum);
+
rowValues.add(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() -
process.getStartTimeMillis()));
+ int processDoneCount = process.getUnitStatuses().stream().map(each
-> ExecuteProcessConstants.EXECUTE_STATUS_DONE == each.getStatus() ? 1 :
0).reduce(0, Integer::sum);
String statePrefix = "Executing ";
- rowValues.add(statePrefix + processDoneCount + "/" +
processContext.getUnitStatuses().size());
- String sql = processContext.getSql();
+ rowValues.add(statePrefix + processDoneCount + "/" +
process.getUnitStatuses().size());
+ String sql = process.getSql();
if (null != sql && sql.length() > 100) {
sql = sql.substring(0, 100);
}
@@ -311,17 +311,17 @@ public final class ClusterContextManagerCoordinator {
@Subscribe
if
(!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId()))
{
return;
}
- Collection<ExecuteProcessContext> processContexts =
ShowProcessListManager.getInstance().getAllProcessContext();
- if (!processContexts.isEmpty()) {
+ Collection<ExecuteProcessContext> processes =
ShowProcessListManager.getInstance().getAllProcessContext();
+ if (!processes.isEmpty()) {
registryCenter.getRepository().persist(ProcessNode.getProcessListInstancePath(event.getProcessId(),
event.getInstanceId()),
- YamlEngine.marshal(new
BatchYamlExecuteProcessContext(processContexts)));
+ YamlEngine.marshal(new
BatchYamlExecuteProcessContext(processes)));
}
registryCenter.getRepository().delete(ComputeNode.getProcessTriggerInstanceIdNodePath(event.getInstanceId(),
event.getProcessId()));
}
}
```
-`ClusterContextManagerCoordinator#triggerShowProcessList` will subscribe to
`ShowProcessListTriggerEvent`, in which `processContext` data is processed by
itself. `ShowProcessListManager.getInstance().getAllProcessContext()` retrieves
the `processContext` that is currently running (here the data refers to the SQL
information that ShardingSphere stores in the Map before each SQL execution,
which is described at the beginning of the article) and transfers it to the
persistence layer. If the [...]
+`ClusterContextManagerCoordinator#triggerShowProcessList` will subscribe to
`ShowProcessListTriggerEvent`, in which `process` data is processed by itself.
`ShowProcessListManager.getInstance().getAllProcessContext()` retrieves the
`process` that is currently running (here the data refers to the SQL
information that ShardingSphere stores in the Map before each SQL execution,
which is described at the beginning of the article) and transfers it to the
persistence layer. If the `/nodes/compu [...]
When the node is deleted, the listener is triggered as well and
`ShowProcessListUnitCompleteEvent` is posted. This event eventually
wakes up the pending lock.
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessContext.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
similarity index 89%
rename from
infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessContext.java
rename to
infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
index b685744fef8..1d09b5ccf8e 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessContext.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
@@ -30,10 +30,10 @@ import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;
/**
- * Process context.
+ * Process.
*/
@Getter
-public final class ProcessContext {
+public final class Process {
private final String id;
@@ -55,15 +55,15 @@ public final class ProcessContext {
private final boolean idle;
- public ProcessContext(final ExecutionGroupContext<? extends
SQLExecutionUnit> executionGroupContext) {
+ public Process(final ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext) {
this("", executionGroupContext, true);
}
- public ProcessContext(final String sql, final ExecutionGroupContext<?
extends SQLExecutionUnit> executionGroupContext) {
+ public Process(final String sql, final ExecutionGroupContext<? extends
SQLExecutionUnit> executionGroupContext) {
this(sql, executionGroupContext, false);
}
- private ProcessContext(final String sql, final ExecutionGroupContext<?
extends SQLExecutionUnit> executionGroupContext, final boolean idle) {
+ private Process(final String sql, final ExecutionGroupContext<? extends
SQLExecutionUnit> executionGroupContext, final boolean idle) {
id = executionGroupContext.getReportContext().getProcessID();
startMillis = System.currentTimeMillis();
this.sql = sql;
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
index 00681c1f1ce..0dc47d491ff 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
@@ -43,8 +43,8 @@ public final class ProcessEngine {
*/
public String connect(final Grantee grantee, final String databaseName) {
ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext = new ExecutionGroupContext<>(Collections.emptyList(),
new ExecutionGroupReportContext(databaseName, grantee));
- ProcessContext processContext = new
ProcessContext(executionGroupContext);
-
ProcessRegistry.getInstance().putProcessContext(processContext.getId(),
processContext);
+ Process process = new Process(executionGroupContext);
+ ProcessRegistry.getInstance().putProcess(process.getId(), process);
return executionGroupContext.getReportContext().getProcessID();
}
@@ -54,7 +54,7 @@ public final class ProcessEngine {
* @param processID process ID
*/
public void disconnect(final String processID) {
- ProcessRegistry.getInstance().removeProcessContext(processID);
+ ProcessRegistry.getInstance().removeProcess(processID);
}
@@ -67,8 +67,8 @@ public final class ProcessEngine {
public void executeSQL(final ExecutionGroupContext<? extends
SQLExecutionUnit> executionGroupContext, final QueryContext queryContext) {
if
(isMySQLDDLOrDMLStatement(queryContext.getSqlStatementContext().getSqlStatement()))
{
ProcessIDContext.set(executionGroupContext.getReportContext().getProcessID());
- ProcessContext processContext = new
ProcessContext(queryContext.getSql(), executionGroupContext);
-
ProcessRegistry.getInstance().putProcessContext(processContext.getId(),
processContext);
+ Process process = new Process(queryContext.getSql(),
executionGroupContext);
+ ProcessRegistry.getInstance().putProcess(process.getId(), process);
}
}
@@ -79,7 +79,7 @@ public final class ProcessEngine {
if (ProcessIDContext.isEmpty()) {
return;
}
-
ProcessRegistry.getInstance().getProcessContext(ProcessIDContext.get()).completeExecutionUnit();
+
ProcessRegistry.getInstance().getProcess(ProcessIDContext.get()).completeExecutionUnit();
}
/**
@@ -89,13 +89,13 @@ public final class ProcessEngine {
if (ProcessIDContext.isEmpty()) {
return;
}
- ProcessContext processContext =
ProcessRegistry.getInstance().getProcessContext(ProcessIDContext.get());
- if (null == processContext) {
+ Process process =
ProcessRegistry.getInstance().getProcess(ProcessIDContext.get());
+ if (null == process) {
return;
}
ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext = new ExecutionGroupContext<>(
- Collections.emptyList(), new
ExecutionGroupReportContext(processContext.getDatabaseName(), new
Grantee(processContext.getUsername(), processContext.getHostname())));
-
ProcessRegistry.getInstance().putProcessContext(ProcessIDContext.get(), new
ProcessContext(executionGroupContext));
+ Collections.emptyList(), new
ExecutionGroupReportContext(process.getDatabaseName(), new
Grantee(process.getUsername(), process.getHostname())));
+ ProcessRegistry.getInstance().putProcess(ProcessIDContext.get(), new
Process(executionGroupContext));
ProcessIDContext.remove();
}
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java
index fc9a56ea638..fb0a49b99ea 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java
@@ -34,7 +34,7 @@ public final class ProcessRegistry {
private static final ProcessRegistry INSTANCE = new ProcessRegistry();
- private final Map<String, ProcessContext> processContexts = new
ConcurrentHashMap<>();
+ private final Map<String, Process> processes = new ConcurrentHashMap<>();
@Getter
private final Map<String, ShowProcessListLock> locks = new
ConcurrentHashMap<>();
@@ -49,40 +49,40 @@ public final class ProcessRegistry {
}
/**
- * Put process context.
+ * Put process.
*
* @param processID process ID
- * @param processContext process context
+ * @param process process
*/
- public void putProcessContext(final String processID, final ProcessContext
processContext) {
- processContexts.put(processID, processContext);
+ public void putProcess(final String processID, final Process process) {
+ processes.put(processID, process);
}
/**
- * Get process context.
+ * Get process.
*
* @param processID process ID
- * @return process context
+ * @return process
*/
- public ProcessContext getProcessContext(final String processID) {
- return processContexts.get(processID);
+ public Process getProcess(final String processID) {
+ return processes.get(processID);
}
/**
- * Remove process context.
+ * Remove process.
*
* @param processID process ID
*/
- public void removeProcessContext(final String processID) {
- processContexts.remove(processID);
+ public void removeProcess(final String processID) {
+ processes.remove(processID);
}
/**
- * Get all process contexts.
+ * Get all processes.
*
- * @return all process contexts
+ * @return all processes
*/
- public Collection<ProcessContext> getAllProcessContexts() {
- return processContexts.values();
+ public Collection<Process> getAllProcesses() {
+ return processes.values();
}
}
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcessContext.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcess.java
similarity index 88%
rename from
infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcessContext.java
rename to
infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcess.java
index dc14b80af4f..d72828dff96 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcessContext.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcess.java
@@ -22,13 +22,17 @@ import lombok.Setter;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
/**
- * Process context for YAML.
+ * YAML process.
*/
@Getter
@Setter
-public final class YamlProcessContext implements YamlConfiguration {
+public final class YamlProcess implements YamlConfiguration {
- private String processID;
+ private String id;
+
+ private Long startMillis;
+
+ private String sql;
private String databaseName;
@@ -36,13 +40,9 @@ public final class YamlProcessContext implements
YamlConfiguration {
private String hostname;
- private String sql;
-
private int totalUnitCount;
private int completedUnitCount;
- private Long startTimeMillis;
-
private boolean idle;
}
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcessListContexts.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcessList.java
similarity index 85%
rename from
infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcessListContexts.java
rename to
infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcessList.java
index a5cd7374d0c..2b7c8967398 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcessListContexts.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcessList.java
@@ -25,11 +25,11 @@ import java.util.Collection;
import java.util.LinkedList;
/**
- * Process list contexts for YAML.
+ * YAML process list.
*/
@Getter
@Setter
-public final class YamlProcessListContexts implements YamlConfiguration {
+public final class YamlProcessList implements YamlConfiguration {
- private Collection<YamlProcessContext> contexts = new LinkedList<>();
+ private Collection<YamlProcess> processes = new LinkedList<>();
}
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListContextsSwapper.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapper.java
similarity index 63%
rename from
infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListContextsSwapper.java
rename to
infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapper.java
index 47abd91412b..a4c10da1b46 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListContextsSwapper.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapper.java
@@ -17,29 +17,29 @@
package org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessContext;
-import
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessListContexts;
+import org.apache.shardingsphere.infra.executor.sql.process.Process;
+import
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList;
import
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
import java.util.Collection;
import java.util.stream.Collectors;
/**
- * YAML process list contexts swapper.
+ * YAML process list swapper.
*/
-public final class YamlProcessListContextsSwapper implements
YamlConfigurationSwapper<YamlProcessListContexts, Collection<ProcessContext>> {
+public final class YamlProcessListSwapper implements
YamlConfigurationSwapper<YamlProcessList, Collection<Process>> {
- private final YamlProcessContextSwapper yamlProcessContextSwapper = new
YamlProcessContextSwapper();
+ private final YamlProcessSwapper yamlProcessSwapper = new
YamlProcessSwapper();
@Override
- public YamlProcessListContexts swapToYamlConfiguration(final
Collection<ProcessContext> data) {
- YamlProcessListContexts result = new YamlProcessListContexts();
-
result.setContexts(data.stream().map(yamlProcessContextSwapper::swapToYamlConfiguration).collect(Collectors.toList()));
+ public YamlProcessList swapToYamlConfiguration(final Collection<Process>
data) {
+ YamlProcessList result = new YamlProcessList();
+
result.setProcesses(data.stream().map(yamlProcessSwapper::swapToYamlConfiguration).collect(Collectors.toList()));
return result;
}
@Override
- public Collection<ProcessContext> swapToObject(final
YamlProcessListContexts yamlConfig) {
- throw new
UnsupportedOperationException("YamlProcessListContextsSwapper.swapToObject");
+ public Collection<Process> swapToObject(final YamlProcessList yamlConfig) {
+ throw new
UnsupportedOperationException("YamlProcessListSwapper.swapToObject");
}
}
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessContextSwapper.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
similarity index 71%
rename from
infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessContextSwapper.java
rename to
infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
index b7ef881f211..f9800611dae 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessContextSwapper.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
@@ -17,32 +17,32 @@
package org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessContext;
-import
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessContext;
+import org.apache.shardingsphere.infra.executor.sql.process.Process;
+import org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcess;
import
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
/**
- * YAML process context swapper.
+ * YAML process swapper.
*/
-public final class YamlProcessContextSwapper implements
YamlConfigurationSwapper<YamlProcessContext, ProcessContext> {
+public final class YamlProcessSwapper implements
YamlConfigurationSwapper<YamlProcess, Process> {
@Override
- public YamlProcessContext swapToYamlConfiguration(final ProcessContext
data) {
- YamlProcessContext result = new YamlProcessContext();
- result.setProcessID(data.getId());
+ public YamlProcess swapToYamlConfiguration(final Process data) {
+ YamlProcess result = new YamlProcess();
+ result.setId(data.getId());
+ result.setStartMillis(data.getStartMillis());
+ result.setSql(data.getSql());
result.setDatabaseName(data.getDatabaseName());
result.setUsername(data.getUsername());
result.setHostname(data.getHostname());
- result.setSql(data.getSql());
result.setTotalUnitCount(data.getTotalUnitCount());
result.setCompletedUnitCount(data.getCompletedUnitCount());
- result.setStartTimeMillis(data.getStartMillis());
result.setIdle(data.isIdle());
return result;
}
@Override
- public ProcessContext swapToObject(final YamlProcessContext yamlConfig) {
- throw new
UnsupportedOperationException("YamlProcessContextSwapper.swapToObject");
+ public Process swapToObject(final YamlProcess yamlConfig) {
+ throw new
UnsupportedOperationException("YamlProcessSwapper.swapToObject");
}
}
diff --git
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java
index 2f8fdf05683..9ae5283c414 100644
---
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java
+++
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java
@@ -59,7 +59,7 @@ class ProcessEngineTest {
void assertExecuteSQL() {
ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext = mockExecutionGroupContext();
new ProcessEngine().executeSQL(executionGroupContext, new
QueryContext(new UpdateStatementContext(getSQLStatement()), null, null));
-
verify(processRegistry).putProcessContext(eq(executionGroupContext.getReportContext().getProcessID()),
any());
+
verify(processRegistry).putProcess(eq(executionGroupContext.getReportContext().getProcessID()),
any());
}
@SuppressWarnings("unchecked")
@@ -81,9 +81,9 @@ class ProcessEngineTest {
@Test
void assertCompleteSQLUnitExecution() {
ProcessIDContext.set("foo_id");
-
when(processRegistry.getProcessContext("foo_id")).thenReturn(mock(ProcessContext.class));
+
when(processRegistry.getProcess("foo_id")).thenReturn(mock(Process.class));
new ProcessEngine().completeSQLUnitExecution();
- verify(processRegistry).getProcessContext("foo_id");
+ verify(processRegistry).getProcess("foo_id");
ProcessIDContext.remove();
}
}
diff --git
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/fixture/EventBusContextHolderFixture.java
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/fixture/EventBusContextHolderFixture.java
deleted file mode 100644
index a82ebc0b08d..00000000000
---
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/fixture/EventBusContextHolderFixture.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.executor.sql.process.fixture;
-
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
-
-public final class EventBusContextHolderFixture {
-
- public static final EventBusContext EVENT_BUS_CONTEXT = new
EventBusContext();
-}
diff --git
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlAllProcessContextsSwapperTest.java
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapperTest.java
similarity index 79%
rename from
infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlAllProcessContextsSwapperTest.java
rename to
infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapperTest.java
index 86eab04922b..e510cbfdd81 100644
---
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlAllProcessContextsSwapperTest.java
+++
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapperTest.java
@@ -20,9 +20,9 @@ package
org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessContext;
-import
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessContext;
-import
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessListContexts;
+import org.apache.shardingsphere.infra.executor.sql.process.Process;
+import org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcess;
+import
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.junit.jupiter.api.Test;
@@ -35,32 +35,32 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
-class YamlAllProcessContextsSwapperTest {
+class YamlProcessListSwapperTest {
@Test
void assertSwapToYamlConfiguration() {
ExecutionGroupReportContext reportContext = new
ExecutionGroupReportContext("foo_db", new Grantee("root", "localhost"));
ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext = new ExecutionGroupContext<>(Collections.emptyList(),
reportContext);
- ProcessContext processContext = new ProcessContext("SELECT 1",
executionGroupContext);
- YamlProcessListContexts actual = new
YamlProcessListContextsSwapper().swapToYamlConfiguration(Collections.singleton(processContext));
- assertThat(actual.getContexts().size(), is(1));
- assertYamlProcessContext(actual.getContexts().iterator().next());
+ Process process = new Process("SELECT 1", executionGroupContext);
+ YamlProcessList actual = new
YamlProcessListSwapper().swapToYamlConfiguration(Collections.singleton(process));
+ assertThat(actual.getProcesses().size(), is(1));
+ assertYamlProcessContext(actual.getProcesses().iterator().next());
}
- private static void assertYamlProcessContext(final YamlProcessContext
actual) {
- assertNotNull(actual.getProcessID());
+ private static void assertYamlProcessContext(final YamlProcess actual) {
+ assertNotNull(actual.getId());
+ assertThat(actual.getStartMillis(),
lessThanOrEqualTo(System.currentTimeMillis()));
+ assertThat(actual.getSql(), is("SELECT 1"));
assertThat(actual.getDatabaseName(), is("foo_db"));
assertThat(actual.getUsername(), is("root"));
assertThat(actual.getHostname(), is("localhost"));
- assertThat(actual.getSql(), is("SELECT 1"));
assertThat(actual.getCompletedUnitCount(), is(0));
assertThat(actual.getTotalUnitCount(), is(0));
- assertThat(actual.getStartTimeMillis(),
lessThanOrEqualTo(System.currentTimeMillis()));
assertFalse(actual.isIdle());
}
@Test
void assertSwapToObject() {
- assertThrows(UnsupportedOperationException.class, () -> new
YamlProcessListContextsSwapper().swapToObject(new YamlProcessListContexts()));
+ assertThrows(UnsupportedOperationException.class, () -> new
YamlProcessListSwapper().swapToObject(new YamlProcessList()));
}
}
diff --git
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessContextSwapperTest.java
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapperTest.java
similarity index 83%
rename from
infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessContextSwapperTest.java
rename to
infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapperTest.java
index 376c1b614c7..871ede91e3e 100644
---
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessContextSwapperTest.java
+++
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapperTest.java
@@ -20,8 +20,8 @@ package
org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessContext;
-import
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessContext;
+import org.apache.shardingsphere.infra.executor.sql.process.Process;
+import org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcess;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.junit.jupiter.api.Test;
@@ -34,27 +34,27 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
-class YamlProcessContextSwapperTest {
+class YamlProcessSwapperTest {
@Test
void assertSwapToYamlConfiguration() {
ExecutionGroupReportContext reportContext = new
ExecutionGroupReportContext("foo_db", new Grantee("root", "localhost"));
ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext = new ExecutionGroupContext<>(Collections.emptyList(),
reportContext);
- ProcessContext processContext = new ProcessContext("SELECT 1",
executionGroupContext);
- YamlProcessContext actual = new
YamlProcessContextSwapper().swapToYamlConfiguration(processContext);
- assertNotNull(actual.getProcessID());
+ Process process = new Process("SELECT 1", executionGroupContext);
+ YamlProcess actual = new
YamlProcessSwapper().swapToYamlConfiguration(process);
+ assertNotNull(actual.getId());
+ assertThat(actual.getStartMillis(),
lessThanOrEqualTo(System.currentTimeMillis()));
+ assertThat(actual.getSql(), is("SELECT 1"));
assertThat(actual.getDatabaseName(), is("foo_db"));
assertThat(actual.getUsername(), is("root"));
assertThat(actual.getHostname(), is("localhost"));
- assertThat(actual.getSql(), is("SELECT 1"));
assertThat(actual.getCompletedUnitCount(), is(0));
assertThat(actual.getTotalUnitCount(), is(0));
- assertThat(actual.getStartTimeMillis(),
lessThanOrEqualTo(System.currentTimeMillis()));
assertFalse(actual.isIdle());
}
@Test
void assertSwapToObject() {
- assertThrows(UnsupportedOperationException.class, () -> new
YamlProcessContextSwapper().swapToObject(new YamlProcessContext()));
+ assertThrows(UnsupportedOperationException.class, () -> new
YamlProcessSwapper().swapToObject(new YamlProcess()));
}
}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/process/ShowProcessListResponseEvent.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/process/ShowProcessListResponseEvent.java
index 8250b24d0e3..d13ddd10ba1 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/process/ShowProcessListResponseEvent.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/process/ShowProcessListResponseEvent.java
@@ -29,5 +29,5 @@ import java.util.Collection;
@Getter
public final class ShowProcessListResponseEvent {
- private final Collection<String> processListContexts;
+ private final Collection<String> processes;
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
index 42551d17114..397d9b76cc9 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
@@ -18,10 +18,10 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber;
import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessContext;
+import org.apache.shardingsphere.infra.executor.sql.process.Process;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
import
org.apache.shardingsphere.infra.executor.sql.process.lock.ShowProcessListLock;
-import
org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListContextsSwapper;
+import
org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
@@ -46,7 +46,7 @@ public final class ProcessListChangedSubscriber {
private final ContextManager contextManager;
- private final YamlProcessListContextsSwapper swapper = new
YamlProcessListContextsSwapper();
+ private final YamlProcessListSwapper swapper = new
YamlProcessListSwapper();
public ProcessListChangedSubscriber(final RegistryCenter registryCenter,
final ContextManager contextManager) {
this.registryCenter = registryCenter;
@@ -64,10 +64,10 @@ public final class ProcessListChangedSubscriber {
if
(!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId()))
{
return;
}
- Collection<ProcessContext> processContexts =
ProcessRegistry.getInstance().getAllProcessContexts();
- if (!processContexts.isEmpty()) {
+ Collection<Process> processes =
ProcessRegistry.getInstance().getAllProcesses();
+ if (!processes.isEmpty()) {
registryCenter.getRepository().persist(
- ProcessNode.getProcessListInstancePath(event.getTaskId(),
event.getInstanceId()),
YamlEngine.marshal(swapper.swapToYamlConfiguration(processContexts)));
+ ProcessNode.getProcessListInstancePath(event.getTaskId(),
event.getInstanceId()),
YamlEngine.marshal(swapper.swapToYamlConfiguration(processes)));
}
registryCenter.getRepository().delete(ComputeNode.getProcessTriggerInstanceNodePath(event.getInstanceId(),
event.getTaskId()));
}
@@ -96,9 +96,9 @@ public final class ProcessListChangedSubscriber {
if
(!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId()))
{
return;
}
- ProcessContext processContext =
ProcessRegistry.getInstance().getProcessContext(event.getProcessId());
- if (null != processContext) {
- for (Statement each : processContext.getProcessStatements()) {
+ Process process =
ProcessRegistry.getInstance().getProcess(event.getProcessId());
+ if (null != process) {
+ for (Statement each : process.getProcessStatements()) {
each.cancel();
}
}
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
index 62df5687bce..fe5f4e1256a 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.proc
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessContext;
+import org.apache.shardingsphere.infra.executor.sql.process.Process;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
import
org.apache.shardingsphere.infra.executor.sql.process.lock.ShowProcessListLock;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
@@ -102,18 +102,18 @@ class ProcessListChangedSubscriberTest {
@Test
void assertReportLocalProcesses() throws ReflectiveOperationException {
String instanceId =
contextManager.getInstanceContext().getInstance().getMetaData().getId();
- ProcessRegistry.getInstance().putProcessContext("foo_execution_id",
mock(ProcessContext.class));
- String processId = "foo_process_id";
+ ProcessRegistry.getInstance().putProcess("foo_id",
mock(Process.class));
+ String processId = "foo_id";
subscriber.reportLocalProcesses(new
ShowProcessListTriggerEvent(instanceId, processId));
ClusterPersistRepository repository = ((RegistryCenter)
Plugins.getMemberAccessor().get(ProcessListChangedSubscriber.class.getDeclaredField("registryCenter"),
subscriber)).getRepository();
- verify(repository).persist("/execution_nodes/foo_process_id/" +
instanceId,
- "contexts:" + System.lineSeparator() + "- completedUnitCount: 0\n idle: false\n startTimeMillis: 0\n totalUnitCount: 0" + System.lineSeparator());
- verify(repository).delete("/nodes/compute_nodes/process_trigger/" +
instanceId + ":foo_process_id");
+ verify(repository).persist("/execution_nodes/foo_id/" + instanceId,
+ "processes:" + System.lineSeparator() + "- completedUnitCount: 0\n idle: false\n startMillis: 0\n totalUnitCount: 0" + System.lineSeparator());
+ verify(repository).delete("/nodes/compute_nodes/process_trigger/" +
instanceId + ":foo_id");
}
@Test
void assertCompleteToReportLocalProcesses() {
- String taskId = "foo_process_id";
+ String taskId = "foo_id";
ShowProcessListLock lock = new ShowProcessListLock();
ProcessRegistry.getInstance().getLocks().put(taskId, lock);
long startMillis = System.currentTimeMillis();
@@ -135,15 +135,15 @@ class ProcessListChangedSubscriberTest {
@Test
void assertKillProcess() throws SQLException, ReflectiveOperationException
{
String instanceId =
contextManager.getInstanceContext().getInstance().getMetaData().getId();
- String processId = "foo_process_id";
+ String processId = "foo_id";
subscriber.killProcess(new KillProcessEvent(instanceId, processId));
ClusterPersistRepository repository = ((RegistryCenter)
Plugins.getMemberAccessor().get(ProcessListChangedSubscriber.class.getDeclaredField("registryCenter"),
subscriber)).getRepository();
- verify(repository).delete("/nodes/compute_nodes/process_kill/" +
instanceId + ":foo_process_id");
+ verify(repository).delete("/nodes/compute_nodes/process_kill/" +
instanceId + ":foo_id");
}
@Test
void assertCompleteToKillProcessInstance() {
- String processId = "foo_process_id";
+ String processId = "foo_id";
ShowProcessListLock lock = new ShowProcessListLock();
ProcessRegistry.getInstance().getLocks().put(processId, lock);
long startMillis = System.currentTimeMillis();
diff --git
a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessListSubscriber.java
b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessListSubscriber.java
index c3ca49d42aa..8c5b6ec9213 100644
---
a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessListSubscriber.java
+++
b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessListSubscriber.java
@@ -18,10 +18,10 @@
package org.apache.shardingsphere.mode.manager.standalone.subscriber;
import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessContext;
+import org.apache.shardingsphere.infra.executor.sql.process.Process;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
-import
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessListContexts;
-import
org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListContextsSwapper;
+import
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList;
+import
org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.event.process.KillProcessRequestEvent;
@@ -41,7 +41,7 @@ public final class StandaloneProcessListSubscriber implements
ProcessListSubscri
private final EventBusContext eventBusContext;
- private final YamlProcessListContextsSwapper swapper = new
YamlProcessListContextsSwapper();
+ private final YamlProcessListSwapper swapper = new
YamlProcessListSwapper();
public StandaloneProcessListSubscriber(final EventBusContext
eventBusContext) {
this.eventBusContext = eventBusContext;
@@ -51,18 +51,18 @@ public final class StandaloneProcessListSubscriber
implements ProcessListSubscri
@Override
@Subscribe
public void postShowProcessListData(final ShowProcessListRequestEvent
event) {
- YamlProcessListContexts yamlProcessListContexts =
swapper.swapToYamlConfiguration(ProcessRegistry.getInstance().getAllProcessContexts());
- eventBusContext.post(new
ShowProcessListResponseEvent(Collections.singleton(YamlEngine.marshal(yamlProcessListContexts))));
+ YamlProcessList yamlProcessList =
swapper.swapToYamlConfiguration(ProcessRegistry.getInstance().getAllProcesses());
+ eventBusContext.post(new
ShowProcessListResponseEvent(Collections.singleton(YamlEngine.marshal(yamlProcessList))));
}
@Override
@Subscribe
public void killProcess(final KillProcessRequestEvent event) throws
SQLException {
- ProcessContext processContext =
ProcessRegistry.getInstance().getProcessContext(event.getId());
- if (null == processContext) {
+ Process process =
ProcessRegistry.getInstance().getProcess(event.getId());
+ if (null == process) {
return;
}
- for (Statement each : processContext.getProcessStatements()) {
+ for (Statement each : process.getProcessStatements()) {
each.cancel();
}
}
diff --git
a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessListSubscriberTest.java
b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessListSubscriberTest.java
index f677f688e2a..6e5abc86655 100644
---
a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessListSubscriberTest.java
+++
b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessListSubscriberTest.java
@@ -38,6 +38,6 @@ class StandaloneProcessListSubscriberTest {
ProcessRegistry processRegistry = mock(ProcessRegistry.class);
when(ProcessRegistry.getInstance()).thenReturn(processRegistry);
new StandaloneProcessListSubscriber(new
EventBusContext()).postShowProcessListData(new ShowProcessListRequestEvent());
- verify(processRegistry).getAllProcessContexts();
+ verify(processRegistry).getAllProcesses();
}
}
diff --git
a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
index 19a64fd65d4..5c13d1ffef3 100644
---
a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
+++
b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
@@ -25,8 +25,8 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.ra
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.raw.metadata.RawQueryResultMetaData;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.raw.type.RawMemoryQueryResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.type.memory.row.MemoryQueryResultDataRow;
-import
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessListContexts;
-import
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessContext;
+import
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList;
+import org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcess;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import
org.apache.shardingsphere.infra.merge.result.impl.transparent.TransparentMergedResult;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -51,7 +51,7 @@ import java.util.stream.Collectors;
@SuppressWarnings("UnstableApiUsage")
public final class ShowProcessListExecutor implements
DatabaseAdminQueryExecutor {
- private Collection<String> batchProcessContexts;
+ private Collection<String> processes;
@Getter
private QueryResultMetaData queryResultMetaData;
@@ -70,7 +70,7 @@ public final class ShowProcessListExecutor implements
DatabaseAdminQueryExecutor
*/
@Subscribe
public void receiveProcessListData(final ShowProcessListResponseEvent
event) {
- batchProcessContexts = event.getProcessListContexts();
+ processes = event.getProcesses();
}
@Override
@@ -81,33 +81,33 @@ public final class ShowProcessListExecutor implements
DatabaseAdminQueryExecutor
private QueryResult getQueryResult() {
ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext().post(new
ShowProcessListRequestEvent());
- if (null == batchProcessContexts || batchProcessContexts.isEmpty()) {
+ if (null == processes || processes.isEmpty()) {
return new RawMemoryQueryResult(queryResultMetaData,
Collections.emptyList());
}
- Collection<YamlProcessContext> processContexts = new LinkedList<>();
- for (String each : batchProcessContexts) {
- processContexts.addAll(YamlEngine.unmarshal(each,
YamlProcessListContexts.class).getContexts());
+ Collection<YamlProcess> processes = new LinkedList<>();
+ for (String each : this.processes) {
+ processes.addAll(YamlEngine.unmarshal(each,
YamlProcessList.class).getProcesses());
}
- List<MemoryQueryResultDataRow> rows =
processContexts.stream().map(ShowProcessListExecutor::getMemoryQueryResultDataRow).collect(Collectors.toList());
+ List<MemoryQueryResultDataRow> rows =
processes.stream().map(ShowProcessListExecutor::getMemoryQueryResultDataRow).collect(Collectors.toList());
return new RawMemoryQueryResult(queryResultMetaData, rows);
}
- private static MemoryQueryResultDataRow getMemoryQueryResultDataRow(final
YamlProcessContext yamlProcessContext) {
+ private static MemoryQueryResultDataRow getMemoryQueryResultDataRow(final
YamlProcess yamlProcess) {
List<Object> rowValues = new ArrayList<>(8);
- rowValues.add(yamlProcessContext.getProcessID());
- rowValues.add(yamlProcessContext.getUsername());
- rowValues.add(yamlProcessContext.getHostname());
- rowValues.add(yamlProcessContext.getDatabaseName());
- rowValues.add(yamlProcessContext.isIdle() ? "Sleep" : "Execute");
- rowValues.add(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - yamlProcessContext.getStartTimeMillis()));
+ rowValues.add(yamlProcess.getId());
+ rowValues.add(yamlProcess.getUsername());
+ rowValues.add(yamlProcess.getHostname());
+ rowValues.add(yamlProcess.getDatabaseName());
+ rowValues.add(yamlProcess.isIdle() ? "Sleep" : "Execute");
+ rowValues.add(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - yamlProcess.getStartMillis()));
String sql = null;
- if (yamlProcessContext.isIdle()) {
+ if (yamlProcess.isIdle()) {
rowValues.add("");
} else {
- int processDoneCount = yamlProcessContext.getCompletedUnitCount();
+ int processDoneCount = yamlProcess.getCompletedUnitCount();
String statePrefix = "Executing ";
- rowValues.add(statePrefix + processDoneCount + "/" +
yamlProcessContext.getTotalUnitCount());
- sql = yamlProcessContext.getSql();
+ rowValues.add(statePrefix + processDoneCount + "/" +
yamlProcess.getTotalUnitCount());
+ sql = yamlProcess.getSql();
}
if (null != sql && sql.length() > 100) {
sql = sql.substring(0, 100);
diff --git
a/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
b/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
index 0c5626ff02d..420f528a734 100644
---
a/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
+++
b/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
@@ -63,16 +63,16 @@ class ShowProcessListExecutorTest {
}
private void setupBatchProcessContexts(final ShowProcessListExecutor
showProcessListExecutor) throws ReflectiveOperationException {
- String executionNodeValue = "contexts:\n"
- + "- processID: f6c2336a-63ba-41bf-941e-2e3504eb2c80\n"
+ String executionNodeValue = "processes:\n"
+ + "- id: f6c2336a-63ba-41bf-941e-2e3504eb2c80\n"
+ " sql: alter table t_order add column a varchar(64) after
order_id\n"
- + " startTimeMillis: 1617939785160\n"
+ + " startMillis: 1617939785160\n"
+ " databaseName: sharding_db\n"
+ " username: sharding\n"
+ " hostname: 127.0.0.1\n"
+ " totalUnitCount: 2\n"
+ " completedUnitCount: 1\n"
+ " idle: false\n";
- Plugins.getMemberAccessor().set(showProcessListExecutor.getClass().getDeclaredField("batchProcessContexts"), showProcessListExecutor, Collections.singleton(executionNodeValue));
+ Plugins.getMemberAccessor().set(showProcessListExecutor.getClass().getDeclaredField("processes"), showProcessListExecutor, Collections.singleton(executionNodeValue));
}
}