This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e5da743656a Rename ShowProcessInstanceCompleteEvent to
ReportLocalProcessesCompletedEvent (#25441)
e5da743656a is described below
commit e5da743656ac974b2145112686e2cc1c6746dede
Author: Liang Zhang <[email protected]>
AuthorDate: Wed May 3 22:09:44 2023 +0800
Rename ShowProcessInstanceCompleteEvent to
ReportLocalProcessesCompletedEvent (#25441)
---
.../subscriber/ProcessListChangedSubscriber.java | 30 +++++++++----------
...ava => ReportLocalProcessesCompletedEvent.java} | 4 +--
.../watcher/ComputeNodeStateChangedWatcher.java | 4 +--
.../ProcessListChangedSubscriberTest.java | 34 +++++++++++-----------
4 files changed, 36 insertions(+), 36 deletions(-)
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
index 282cc8f628c..42551d17114 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
@@ -29,7 +29,7 @@ import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessInstanceCompleteEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessInstanceCompleteEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesCompletedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
import java.sql.SQLException;
@@ -72,6 +72,19 @@ public final class ProcessListChangedSubscriber {
registryCenter.getRepository().delete(ComputeNode.getProcessTriggerInstanceNodePath(event.getInstanceId(),
event.getTaskId()));
}
+ /**
+ * Complete to report local processes.
+ *
+ * @param event report local processes completed event
+ */
+ @Subscribe
+ public synchronized void completeToReportLocalProcesses(final
ReportLocalProcessesCompletedEvent event) {
+ ShowProcessListLock lock =
ProcessRegistry.getInstance().getLocks().get(event.getTaskId());
+ if (null != lock) {
+ lock.doNotify();
+ }
+ }
+
/**
* Kill process.
*
@@ -92,26 +105,13 @@ public final class ProcessListChangedSubscriber {
registryCenter.getRepository().delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(),
event.getProcessId()));
}
- /**
- * Complete show process instance.
- *
- * @param event show process instance complete event
- */
- @Subscribe
- public synchronized void completeShowProcessInstance(final
ShowProcessInstanceCompleteEvent event) {
- ShowProcessListLock lock =
ProcessRegistry.getInstance().getLocks().get(event.getTaskId());
- if (null != lock) {
- lock.doNotify();
- }
- }
-
/**
* Complete to kill process instance.
*
* @param event kill process instance complete event
*/
@Subscribe
- public synchronized void completeKillProcessInstance(final
KillProcessInstanceCompleteEvent event) {
+ public synchronized void completeToKillProcessInstance(final
KillProcessInstanceCompleteEvent event) {
ShowProcessListLock lock =
ProcessRegistry.getInstance().getLocks().get(event.getProcessId());
if (null != lock) {
lock.doNotify();
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ShowProcessInstanceCompleteEvent.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ReportLocalProcessesCompletedEvent.java
similarity index 90%
rename from
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ShowProcessInstanceCompleteEvent.java
rename to
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ReportLocalProcessesCompletedEvent.java
index 85b560a7bb7..c56cabcf1fb 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ShowProcessInstanceCompleteEvent.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ReportLocalProcessesCompletedEvent.java
@@ -22,11 +22,11 @@ import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
/**
- * Show process instance complete event.
+ * Report local processes completed event.
*/
@RequiredArgsConstructor
@Getter
-public final class ShowProcessInstanceCompleteEvent implements GovernanceEvent
{
+public final class ReportLocalProcessesCompletedEvent implements
GovernanceEvent {
private final String taskId;
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
index c091abfa5ce..6e8b1b6d463 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -31,7 +31,7 @@ import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessInstanceCompleteEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessInstanceCompleteEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesCompletedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
@@ -117,7 +117,7 @@ public final class ComputeNodeStateChangedWatcher
implements GovernanceWatcher<G
return Optional.of(new
ShowProcessListTriggerEvent(matcher.group(1), matcher.group(2)));
}
if (Type.DELETED == event.getType()) {
- return Optional.of(new
ShowProcessInstanceCompleteEvent(matcher.group(2)));
+ return Optional.of(new
ReportLocalProcessesCompletedEvent(matcher.group(2)));
}
return Optional.empty();
}
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
index 2fda62e8fae..62df5687bce 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
@@ -36,7 +36,7 @@ import
org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuild
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessInstanceCompleteEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessInstanceCompleteEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesCompletedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -112,19 +112,10 @@ class ProcessListChangedSubscriberTest {
}
@Test
- void assertKillProcess() throws SQLException, ReflectiveOperationException
{
- String instanceId =
contextManager.getInstanceContext().getInstance().getMetaData().getId();
- String processId = "foo_process_id";
- subscriber.killProcess(new KillProcessEvent(instanceId, processId));
- ClusterPersistRepository repository = ((RegistryCenter)
Plugins.getMemberAccessor().get(ProcessListChangedSubscriber.class.getDeclaredField("registryCenter"),
subscriber)).getRepository();
- verify(repository).delete("/nodes/compute_nodes/process_kill/" +
instanceId + ":foo_process_id");
- }
-
- @Test
- void assertCompleteShowProcessUnit() {
- String processId = "foo_process_id";
+ void assertCompleteToReportLocalProcesses() {
+ String taskId = "foo_process_id";
ShowProcessListLock lock = new ShowProcessListLock();
- ProcessRegistry.getInstance().getLocks().put(processId, lock);
+ ProcessRegistry.getInstance().getLocks().put(taskId, lock);
long startMillis = System.currentTimeMillis();
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.submit(() -> {
@@ -132,17 +123,26 @@ class ProcessListChangedSubscriberTest {
Thread.sleep(50L);
} catch (final InterruptedException ignored) {
}
- subscriber.completeShowProcessInstance(new
ShowProcessInstanceCompleteEvent(processId));
+ subscriber.completeToReportLocalProcesses(new
ReportLocalProcessesCompletedEvent(taskId));
});
lockAndAwaitDefaultTime(lock);
long currentTime = System.currentTimeMillis();
assertTrue(currentTime >= startMillis + 50L);
assertTrue(currentTime <= startMillis + 5000L);
- ProcessRegistry.getInstance().getLocks().remove(processId);
+ ProcessRegistry.getInstance().getLocks().remove(taskId);
+ }
+
+ @Test
+ void assertKillProcess() throws SQLException, ReflectiveOperationException
{
+ String instanceId =
contextManager.getInstanceContext().getInstance().getMetaData().getId();
+ String processId = "foo_process_id";
+ subscriber.killProcess(new KillProcessEvent(instanceId, processId));
+ ClusterPersistRepository repository = ((RegistryCenter)
Plugins.getMemberAccessor().get(ProcessListChangedSubscriber.class.getDeclaredField("registryCenter"),
subscriber)).getRepository();
+ verify(repository).delete("/nodes/compute_nodes/process_kill/" +
instanceId + ":foo_process_id");
}
@Test
- void assertCompleteKillProcessUnit() {
+ void assertCompleteToKillProcessInstance() {
String processId = "foo_process_id";
ShowProcessListLock lock = new ShowProcessListLock();
ProcessRegistry.getInstance().getLocks().put(processId, lock);
@@ -153,7 +153,7 @@ class ProcessListChangedSubscriberTest {
Thread.sleep(50L);
} catch (final InterruptedException ignored) {
}
- subscriber.completeKillProcessInstance(new
KillProcessInstanceCompleteEvent(processId));
+ subscriber.completeToKillProcessInstance(new
KillProcessInstanceCompleteEvent(processId));
});
lockAndAwaitDefaultTime(lock);
long currentTime = System.currentTimeMillis();