This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new dd190349365 Rename process id to task id in
ClusterProcessListSubscriber.postShowProcessListData() (#25437)
dd190349365 is described below
commit dd19034936585c0d6c828972a3fa462bd488fbda
Author: Liang Zhang <[email protected]>
AuthorDate: Wed May 3 12:33:56 2023 +0800
Rename process id to task id in
ClusterProcessListSubscriber.postShowProcessListData() (#25437)
---
.../metadata/persist/node/ComputeNode.java | 8 ++++----
.../metadata/persist/node/ComputeNodeTest.java | 4 ++--
.../subscriber/ClusterProcessListSubscriber.java | 24 +++++++++++-----------
.../subscriber/ProcessListChangedSubscriber.java | 2 +-
4 files changed, 19 insertions(+), 19 deletions(-)
diff --git
a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNode.java
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNode.java
index 0636b850d0e..25dfd4dbd7f 100644
---
a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNode.java
+++
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNode.java
@@ -95,14 +95,14 @@ public final class ComputeNode {
}
/**
- * Get process trigger instance process id node path.
+ * Get process trigger instance node path.
*
* @param instanceId instance id
- * @param processId process id
+ * @param taskId show process list task id
* @return path of process trigger instance node path
*/
- public static String getProcessTriggerInstanceIdNodePath(final String
instanceId, final String processId) {
- return String.join("/", "", ROOT_NODE, COMPUTE_NODE, PROCESS_TRIGGER,
String.join(":", instanceId, processId));
+ public static String getProcessTriggerInstanceNodePath(final String
instanceId, final String taskId) {
+ return String.join("/", "", ROOT_NODE, COMPUTE_NODE, PROCESS_TRIGGER,
String.join(":", instanceId, taskId));
}
/**
diff --git
a/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodeTest.java
b/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodeTest.java
index 51a7e7ecfe8..725fc8eb35b 100644
---
a/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodeTest.java
+++
b/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodeTest.java
@@ -44,9 +44,9 @@ class ComputeNodeTest {
@Test
void assertGetProcessTriggerInstanceIdNodePath() {
-
assertThat(ComputeNode.getProcessTriggerInstanceIdNodePath("foo_instance",
"foo_process_id"),
+
assertThat(ComputeNode.getProcessTriggerInstanceNodePath("foo_instance",
"foo_process_id"),
is("/nodes/compute_nodes/process_trigger/foo_instance:foo_process_id"));
-
assertThat(ComputeNode.getProcessTriggerInstanceIdNodePath("foo_instance",
"foo_process_id"),
+
assertThat(ComputeNode.getProcessTriggerInstanceNodePath("foo_instance",
"foo_process_id"),
is("/nodes/compute_nodes/process_trigger/foo_instance:foo_process_id"));
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriber.java
index 31bfa25c42e..5062c63a68c 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriber.java
@@ -55,30 +55,30 @@ public final class ClusterProcessListSubscriber implements
ProcessListSubscriber
@Override
@Subscribe
public void postShowProcessListData(final ShowProcessListRequestEvent
event) {
- String processId = new UUID(ThreadLocalRandom.current().nextLong(),
ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
- boolean triggerIsComplete = false;
- Collection<String> triggerPaths = getTriggerPaths(processId);
+ String taskId = new UUID(ThreadLocalRandom.current().nextLong(),
ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
+ boolean isTriggerCompleted = false;
+ Collection<String> triggerPaths = getTriggerPaths(taskId);
try {
triggerPaths.forEach(each -> repository.persist(each, ""));
- triggerIsComplete = waitAllNodeDataReady(processId, triggerPaths);
- postShowProcessListData(processId);
+ isTriggerCompleted = waitAllNodeDataReady(taskId, triggerPaths);
+ postShowProcessListData(taskId);
} finally {
- repository.delete(ProcessNode.getProcessIdPath(processId));
- if (!triggerIsComplete) {
+ repository.delete(ProcessNode.getProcessIdPath(taskId));
+ if (!isTriggerCompleted) {
triggerPaths.forEach(repository::delete);
}
}
}
- private void postShowProcessListData(final String processId) {
- Collection<String> yamlProcessListContexts =
repository.getChildrenKeys(ProcessNode.getProcessIdPath(processId)).stream()
- .map(each ->
repository.getDirectly(ProcessNode.getProcessListInstancePath(processId,
each))).collect(Collectors.toList());
+ private void postShowProcessListData(final String taskId) {
+ Collection<String> yamlProcessListContexts =
repository.getChildrenKeys(ProcessNode.getProcessIdPath(taskId)).stream()
+ .map(each ->
repository.getDirectly(ProcessNode.getProcessListInstancePath(taskId,
each))).collect(Collectors.toList());
eventBusContext.post(new
ShowProcessListResponseEvent(yamlProcessListContexts));
}
- private Collection<String> getTriggerPaths(final String processId) {
+ private Collection<String> getTriggerPaths(final String taskId) {
return Stream.of(InstanceType.values())
- .flatMap(each ->
repository.getChildrenKeys(ComputeNode.getOnlineNodePath(each)).stream().map(onlinePath
-> ComputeNode.getProcessTriggerInstanceIdNodePath(onlinePath, processId)))
+ .flatMap(each ->
repository.getChildrenKeys(ComputeNode.getOnlineNodePath(each)).stream().map(onlinePath
-> ComputeNode.getProcessTriggerInstanceNodePath(onlinePath, taskId)))
.collect(Collectors.toList());
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
index 2db936cb11c..707bbae56d0 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
@@ -69,7 +69,7 @@ public final class ProcessListChangedSubscriber {
registryCenter.getRepository().persist(
ProcessNode.getProcessListInstancePath(event.getProcessId(),
event.getInstanceId()),
YamlEngine.marshal(swapper.swapToYamlConfiguration(processContexts)));
}
-
registryCenter.getRepository().delete(ComputeNode.getProcessTriggerInstanceIdNodePath(event.getInstanceId(),
event.getProcessId()));
+
registryCenter.getRepository().delete(ComputeNode.getProcessTriggerInstanceNodePath(event.getInstanceId(),
event.getProcessId()));
}
/**