This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d8ddbdc0d87 Refactor pipeline job path in registry center (#20278)
d8ddbdc0d87 is described below
commit d8ddbdc0d87b02e3046276709ee143664ed10777
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Aug 18 22:25:36 2022 +0800
Refactor pipeline job path in registry center (#20278)
* Refactor pipeline job path in registry center
* Compatible with ejob namespace
---
.../data/pipeline/api/job/JobType.java | 5 ++
.../data/pipeline/core/api/PipelineAPIFactory.java | 6 +-
.../core/api/impl/AbstractPipelineJobAPIImpl.java | 12 ++--
.../core/api/impl/GovernanceRepositoryAPIImpl.java | 12 ++--
.../core/constant/DataPipelineConstants.java | 3 +-
.../pipeline/core/execute/PipelineJobExecutor.java | 10 +--
.../core/metadata/node/PipelineMetaDataNode.java | 80 +++++++++++++---------
.../pipeline/scenario/migration/MigrationJob.java | 4 +-
.../metadata/node/PipelineMetaDataNodeTest.java | 44 +++++++++---
9 files changed, 108 insertions(+), 68 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java
index 174550b90fe..553e720c936 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.api.job;
import com.google.common.base.Preconditions;
import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;
import java.util.Map;
@@ -40,10 +41,14 @@ public enum JobType {
private final String typeName;
+ private final String lowercaseTypeName;
+
private final String typeCode;
JobType(final String typeName, final String typeCode) {
+ Preconditions.checkArgument(StringUtils.isAlpha(typeName), "type name
must be character of [a-z]");
this.typeName = typeName;
+ this.lowercaseTypeName = typeName.toLowerCase();
Preconditions.checkArgument(typeCode.length() == 2, "code length is
not 2");
this.typeCode = typeCode;
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
index b9438115173..52972546a8d 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
@@ -25,8 +25,8 @@ import
org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import
org.apache.shardingsphere.data.pipeline.core.api.impl.GovernanceRepositoryAPIImpl;
-import
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import
org.apache.shardingsphere.data.pipeline.core.registry.CoordinatorRegistryCenterInitializer;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobAPIFactory;
import
org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
@@ -126,7 +126,7 @@ public final class PipelineAPIFactory {
private ElasticJobAPIHolder() {
ClusterPersistRepositoryConfiguration repositoryConfig =
(ClusterPersistRepositoryConfiguration)
PipelineContext.getModeConfig().getRepository();
- String namespace = repositoryConfig.getNamespace() +
DataPipelineConstants.DATA_PIPELINE_ROOT;
+ String namespace = repositoryConfig.getNamespace() +
PipelineMetaDataNode.getElasticJobNamespace();
jobStatisticsAPI =
JobAPIFactory.createJobStatisticsAPI(repositoryConfig.getServerLists(),
namespace, null);
jobConfigurationAPI =
JobAPIFactory.createJobConfigurationAPI(repositoryConfig.getServerLists(),
namespace, null);
jobOperateAPI =
JobAPIFactory.createJobOperateAPI(repositoryConfig.getServerLists(), namespace,
null);
@@ -162,7 +162,7 @@ public final class PipelineAPIFactory {
private static CoordinatorRegistryCenter createRegistryCenter() {
CoordinatorRegistryCenterInitializer registryCenterInitializer =
new CoordinatorRegistryCenterInitializer();
ModeConfiguration modeConfig = PipelineContext.getModeConfig();
- return registryCenterInitializer.createRegistryCenter(modeConfig,
DataPipelineConstants.DATA_PIPELINE_ROOT);
+ return registryCenterInitializer.createRegistryCenter(modeConfig,
PipelineMetaDataNode.getElasticJobNamespace());
}
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index a9d8f53cfb1..e07fb3035d6 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -65,12 +65,12 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
}
log.info("Start job by {}", jobConfig);
GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI();
- String jobConfigKey =
PipelineMetaDataNode.getScalingJobConfigPath(jobId);
+ String jobConfigKey = PipelineMetaDataNode.getJobConfigPath(jobId);
if (repositoryAPI.isExisted(jobConfigKey)) {
log.warn("jobId already exists in registry center, ignore,
jobConfigKey={}", jobConfigKey);
return Optional.of(jobId);
}
- repositoryAPI.persist(PipelineMetaDataNode.getScalingJobPath(jobId),
MigrationJob.class.getName());
+ repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId),
MigrationJob.class.getName());
repositoryAPI.persist(jobConfigKey,
convertJobConfigurationToText(jobConfig));
return Optional.of(jobId);
}
@@ -89,7 +89,7 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
@Override
public void startDisabledJob(final String jobId) {
log.info("Start disabled pipeline job {}", jobId);
-
pipelineDistributedBarrier.removeParentNode(PipelineMetaDataNode.getScalingJobBarrierDisablePath(jobId));
+
pipelineDistributedBarrier.removeParentNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId));
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
if (!jobConfigPOJO.isDisabled()) {
throw new PipelineVerifyFailedException("Job is already started.");
@@ -97,7 +97,7 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
jobConfigPOJO.setDisabled(false);
jobConfigPOJO.getProps().remove("stop_time");
PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
- String barrierPath =
PipelineMetaDataNode.getScalingJobBarrierEnablePath(jobId);
+ String barrierPath =
PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
pipelineDistributedBarrier.register(barrierPath,
jobConfigPOJO.getShardingTotalCount());
pipelineDistributedBarrier.await(barrierPath, 5, TimeUnit.SECONDS);
}
@@ -105,13 +105,13 @@ public abstract class AbstractPipelineJobAPIImpl
implements PipelineJobAPI {
@Override
public void stop(final String jobId) {
log.info("Stop pipeline job {}", jobId);
-
pipelineDistributedBarrier.removeParentNode(PipelineMetaDataNode.getScalingJobBarrierEnablePath(jobId));
+
pipelineDistributedBarrier.removeParentNode(PipelineMetaDataNode.getJobBarrierEnablePath(jobId));
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
jobConfigPOJO.setDisabled(true);
jobConfigPOJO.getProps().setProperty("stop_time",
LocalDateTime.now().format(DATE_TIME_FORMATTER));
// TODO updateJobConfiguration might doesn't work
PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
- String barrierPath =
PipelineMetaDataNode.getScalingJobBarrierDisablePath(jobId);
+ String barrierPath =
PipelineMetaDataNode.getJobBarrierDisablePath(jobId);
pipelineDistributedBarrier.register(barrierPath,
jobConfigPOJO.getShardingTotalCount());
pipelineDistributedBarrier.await(barrierPath, 5, TimeUnit.SECONDS);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index c58c65e24f9..62af844a34a 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -45,30 +45,30 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
@Override
public void persistJobItemProgress(final String jobId, final int
shardingItem, final String progressValue) {
- repository.persist(PipelineMetaDataNode.getScalingJobOffsetPath(jobId,
shardingItem), progressValue);
+ repository.persist(PipelineMetaDataNode.getJobOffsetItemPath(jobId,
shardingItem), progressValue);
}
@Override
public String getJobItemProgress(final String jobId, final int
shardingItem) {
- return
repository.get(PipelineMetaDataNode.getScalingJobOffsetPath(jobId,
shardingItem));
+ return repository.get(PipelineMetaDataNode.getJobOffsetItemPath(jobId,
shardingItem));
}
@Override
public void persistJobCheckResult(final String jobId, final boolean
checkSuccess) {
log.info("persist job check result '{}' for job {}", checkSuccess,
jobId);
-
repository.persist(PipelineMetaDataNode.getScalingCheckResultPath(jobId),
String.valueOf(checkSuccess));
+ repository.persist(PipelineMetaDataNode.getJobCheckResultPath(jobId),
String.valueOf(checkSuccess));
}
@Override
public Optional<Boolean> getJobCheckResult(final String jobId) {
- String data =
repository.get(PipelineMetaDataNode.getScalingCheckResultPath(jobId));
+ String data =
repository.get(PipelineMetaDataNode.getJobCheckResultPath(jobId));
return Strings.isNullOrEmpty(data) ? Optional.empty() :
Optional.of(Boolean.parseBoolean(data));
}
@Override
public void deleteJob(final String jobId) {
log.info("delete job {}", jobId);
- repository.delete(PipelineMetaDataNode.getScalingJobPath(jobId));
+ repository.delete(PipelineMetaDataNode.getJobRootPath(jobId));
}
@Override
@@ -88,7 +88,7 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
@Override
public List<Integer> getShardingItems(final String jobId) {
- List<String> result =
getChildrenKeys(PipelineMetaDataNode.getScalingJobOffsetPath(jobId));
+ List<String> result =
getChildrenKeys(PipelineMetaDataNode.getJobOffsetPath(jobId));
log.info("getShardingItems, jobId={}, offsetKeys={}", jobId, result);
return
result.stream().map(Integer::parseInt).collect(Collectors.toList());
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/constant/DataPipelineConstants.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/constant/DataPipelineConstants.java
index 8a7e550bf7d..0caf3b592a6 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/constant/DataPipelineConstants.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/constant/DataPipelineConstants.java
@@ -29,8 +29,7 @@ public final class DataPipelineConstants {
/**
* Data pipeline node name.
*/
- // TODO change to pipeline after job configuration structure completed
- public static final String DATA_PIPELINE_NODE_NAME = "scaling";
+ public static final String DATA_PIPELINE_NODE_NAME = "pipeline";
/**
* Data pipeline root path.
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index b8d93725514..3427ccc3d1a 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -25,6 +25,7 @@ import
org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
@@ -38,7 +39,6 @@ import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEve
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.regex.Pattern;
/**
* Pipeline job executor.
@@ -46,16 +46,12 @@ import java.util.regex.Pattern;
@Slf4j
public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
- private static final Pattern CONFIG_PATTERN =
Pattern.compile(DataPipelineConstants.DATA_PIPELINE_ROOT +
"/(j\\d{2}[0-9a-f]+)/config");
-
- private static final Pattern BARRIER_MATCH_PATTERN =
Pattern.compile(DataPipelineConstants.DATA_PIPELINE_ROOT +
"/(j\\d{2}[0-9a-f]+)/barrier/(enable|disable)/\\d+");
-
private final ExecutorService executor = Executors.newFixedThreadPool(20);
@Override
protected void doStart() {
PipelineAPIFactory.getGovernanceRepositoryAPI().watch(DataPipelineConstants.DATA_PIPELINE_ROOT,
event -> {
- if (BARRIER_MATCH_PATTERN.matcher(event.getKey()).matches() &&
event.getType() == Type.ADDED) {
+ if
(PipelineMetaDataNode.BARRIER_PATTERN.matcher(event.getKey()).matches() &&
event.getType() == Type.ADDED) {
PipelineDistributedBarrier.getInstance().checkChildrenNodeCount(event);
}
getJobConfigPOJO(event).ifPresent(optional -> processEvent(event,
optional));
@@ -64,7 +60,7 @@ public final class PipelineJobExecutor extends
AbstractLifecycleExecutor {
private Optional<JobConfigurationPOJO> getJobConfigPOJO(final
DataChangedEvent event) {
try {
- if (CONFIG_PATTERN.matcher(event.getKey()).matches()) {
+ if
(PipelineMetaDataNode.CONFIG_PATTERN.matcher(event.getKey()).matches()) {
log.info("{} job config: {}", event.getType(), event.getKey());
return Optional.of(YamlEngine.unmarshal(event.getValue(),
JobConfigurationPOJO.class, true));
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
index 9e75d46ef13..25a1f47d35f 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
@@ -21,90 +21,102 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
+import java.util.regex.Pattern;
+
/**
- * Scaling meta data node.
+ * Pipeline meta data node.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class PipelineMetaDataNode {
+ private static final String JOB_PATTERN_PREFIX =
DataPipelineConstants.DATA_PIPELINE_ROOT + "/jobs/(j\\d{2}[0-9a-f]+)";
+
+ public static final Pattern CONFIG_PATTERN =
Pattern.compile(JOB_PATTERN_PREFIX + "/config");
+
+ public static final Pattern BARRIER_PATTERN =
Pattern.compile(JOB_PATTERN_PREFIX + "/barrier/(enable|disable)/\\d+");
+
/**
- * Get job config path.
+ * Get ElasticJob namespace.
*
- * @param jobId job id.
- * @return job config path.
+ * @return namespace
*/
- public static String getJobConfigPath(final String jobId) {
- return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT,
jobId, "config");
+ public static String getElasticJobNamespace() {
+ // ElasticJob will persist job to namespace
+ return getJobsPath();
+ }
+
+ private static String getJobsPath() {
+ return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT,
"jobs");
}
/**
- * Get scaling root path.
+ * Get job root path.
*
* @param jobId job id.
* @return root path
*/
- public static String getScalingJobPath(final String jobId) {
- return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT,
jobId);
+ public static String getJobRootPath(final String jobId) {
+ return String.join("/", getJobsPath(), jobId);
}
/**
- * Get scaling job offset path, include job id and sharding item.
+ * Get job offset item path.
*
- * @param jobId job id.
- * @param shardingItem sharding item.
- * @return job offset path.
+ * @param jobId job id
+ * @param shardingItem sharding item
+ * @return job offset path
*/
- public static String getScalingJobOffsetPath(final String jobId, final int
shardingItem) {
- return String.join("/", getScalingJobOffsetPath(jobId),
Integer.toString(shardingItem));
+ public static String getJobOffsetItemPath(final String jobId, final int
shardingItem) {
+ return String.join("/", getJobOffsetPath(jobId),
Integer.toString(shardingItem));
}
/**
- * Get scaling job offset path.
+ * Get job offset path.
*
- * @param jobId job id.
- * @return job offset path.
+ * @param jobId job id
+ * @return job offset path
*/
- public static String getScalingJobOffsetPath(final String jobId) {
- return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT,
jobId, "offset");
+ public static String getJobOffsetPath(final String jobId) {
+ return String.join("/", getJobRootPath(jobId), "offset");
}
/**
- * Get scaling job config path.
+ * Get job config path.
*
* @param jobId job id.
* @return job config path.
*/
- public static String getScalingJobConfigPath(final String jobId) {
- return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT,
jobId, "config");
+ public static String getJobConfigPath(final String jobId) {
+ return String.join("/", getJobRootPath(jobId), "config");
}
/**
- * Get scaling job config path.
+ * Get job check result path.
*
* @param jobId job id.
* @return job config path.
*/
- public static String getScalingCheckResultPath(final String jobId) {
- return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT,
jobId, "check", "result");
+ public static String getJobCheckResultPath(final String jobId) {
+ return String.join("/", getJobRootPath(jobId), "check", "result");
}
/**
- * Get scaling job barrier enable path.
+ * Get job barrier enable path.
*
* @param jobId job id
- * @return job barrier path.
+ * @return job barrier enable path
*/
- public static String getScalingJobBarrierEnablePath(final String jobId) {
- return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT,
jobId, "barrier", "enable");
+ public static String getJobBarrierEnablePath(final String jobId) {
+ return String.join("/", getJobRootPath(jobId), "barrier", "enable");
}
/**
- * Get scaling job barrier disable path.
+ * Get job barrier disable path.
*
* @param jobId job id
- * @return job barrier path.
+ * @return job barrier disable path
*/
- public static String getScalingJobBarrierDisablePath(final String jobId) {
- return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT,
jobId, "barrier", "disable");
+ public static String getJobBarrierDisablePath(final String jobId) {
+ return String.join("/", getJobRootPath(jobId), "barrier", "disable");
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index f730c8f3758..b7bd288daac 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -76,7 +76,7 @@ public final class MigrationJob extends AbstractPipelineJob
implements SimpleJob
});
getTasksRunnerMap().put(shardingItem, tasksRunner);
PipelineJobProgressPersistService.addJobProgressPersistContext(getJobId(),
shardingItem);
-
pipelineDistributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getScalingJobBarrierEnablePath(getJobId()),
shardingItem);
+
pipelineDistributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierEnablePath(getJobId()),
shardingItem);
}
private void prepare(final MigrationJobItemContext jobItemContext) {
@@ -110,7 +110,7 @@ public final class MigrationJob extends AbstractPipelineJob
implements SimpleJob
return;
}
log.info("stop tasks runner, jobId={}", getJobId());
- String scalingJobBarrierDisablePath =
PipelineMetaDataNode.getScalingJobBarrierDisablePath(getJobId());
+ String scalingJobBarrierDisablePath =
PipelineMetaDataNode.getJobBarrierDisablePath(getJobId());
for (PipelineTasksRunner each : getTasksRunnerMap().values()) {
each.stop();
pipelineDistributedBarrier.persistEphemeralChildrenNode(scalingJobBarrierDisablePath,
each.getJobItemContext().getShardingItem());
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
index 387a86dfbf3..2619d560e31 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
@@ -24,21 +24,49 @@ import static org.junit.Assert.assertThat;
public final class PipelineMetaDataNodeTest {
+ private final String jobId = "j01001";
+
+ private final String jobsPath = "/pipeline/jobs";
+
+ private final String jobRootPath = jobsPath + "/j01001";
+
+ @Test
+ public void assertGetElasticJobNamespace() {
+ assertThat(PipelineMetaDataNode.getElasticJobNamespace(),
is(jobsPath));
+ }
+
+ @Test
+ public void assertGetJobRootPath() {
+ assertThat(PipelineMetaDataNode.getJobRootPath(jobId),
is(jobRootPath));
+ }
+
+ @Test
+ public void assertGetJobOffsetPath() {
+ assertThat(PipelineMetaDataNode.getJobOffsetPath(jobId),
is(jobRootPath + "/offset"));
+ }
+
+ @Test
+ public void assertGetJobOffsetItemPath() {
+ assertThat(PipelineMetaDataNode.getJobOffsetItemPath(jobId, 0),
is(jobRootPath + "/offset/0"));
+ }
+
@Test
public void assertGetJobConfigPath() {
- String actualOffsetPath =
PipelineMetaDataNode.getScalingJobOffsetPath("0130317c30317c3054317c7368617264696e675f6462");
- assertThat(actualOffsetPath,
is("/scaling/0130317c30317c3054317c7368617264696e675f6462/offset"));
- actualOffsetPath =
PipelineMetaDataNode.getScalingJobOffsetPath("0130317c30317c3054317c7368617264696e675f6462",
1);
- assertThat(actualOffsetPath,
is("/scaling/0130317c30317c3054317c7368617264696e675f6462/offset/1"));
+ assertThat(PipelineMetaDataNode.getJobConfigPath(jobId),
is(jobRootPath + "/config"));
+ }
+
+ @Test
+ public void assertGetCheckResultPath() {
+ assertThat(PipelineMetaDataNode.getJobCheckResultPath(jobId),
is(jobRootPath + "/check/result"));
}
@Test
- public void assertGetScalingJobConfigPath() {
-
assertThat(PipelineMetaDataNode.getScalingJobConfigPath("0130317c30317c3054317c7368617264696e675f6462"),
is("/scaling/0130317c30317c3054317c7368617264696e675f6462/config"));
+ public void assertGetJobBarrierEnablePath() {
+ assertThat(PipelineMetaDataNode.getJobBarrierEnablePath(jobId),
is(jobRootPath + "/barrier/enable"));
}
@Test
- public void assertGetScalingCheckResultPath() {
-
assertThat(PipelineMetaDataNode.getScalingCheckResultPath("0130317c30317c3054317c7368617264696e675f6462"),
is("/scaling/0130317c30317c3054317c7368617264696e675f6462/check/result"));
+ public void assertGetJobBarrierDisablePath() {
+ assertThat(PipelineMetaDataNode.getJobBarrierDisablePath(jobId),
is(jobRootPath + "/barrier/disable"));
}
}