This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 309d6f31df7 Add PipelineJobConfigurationManager (#29198)
309d6f31df7 is described below
commit 309d6f31df767394d0a21d2f4dc4fd085e885236
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Nov 24 18:37:31 2023 +0800
Add PipelineJobConfigurationManager (#29198)
* Add PipelineJobConfigurationLoader
* Refactor PipelineJobConfigurationLoader
* Add PipelineJobConfigurationLoader
* Add PipelineJobConfigurationManager
---
.../config/job/PipelineJobConfiguration.java | 35 -------------
.../pipeline/core/job/service/PipelineJobAPI.java | 15 +++++-
.../service/PipelineJobConfigurationManager.java} | 58 +++++++++-------------
.../core/job/service/PipelineJobManager.java | 14 +-----
.../core/job/service/TransmissionJobManager.java | 2 +-
.../handler/update/CheckMigrationJobUpdater.java | 4 +-
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 26 +++++++---
.../cdc/config/job/CDCJobConfiguration.java | 15 ------
.../pipeline/cdc/handler/CDCBackendHandler.java | 8 +--
.../api/impl/ConsistencyCheckJobAPI.java | 4 +-
.../config/ConsistencyCheckJobConfiguration.java | 7 ---
.../task/ConsistencyCheckTasksRunner.java | 3 +-
.../migration/api/impl/MigrationJobAPI.java | 8 +--
.../config/MigrationJobConfiguration.java | 7 ---
.../api/impl/ConsistencyCheckJobAPITest.java | 4 +-
.../migration/api/impl/MigrationJobAPITest.java | 10 ++--
16 files changed, 83 insertions(+), 137 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
index 511c10d2fb9..e6f6516b7f4 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
@@ -17,23 +17,13 @@
package org.apache.shardingsphere.data.pipeline.common.config.job;
-import
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.Collections;
/**
* Pipeline job configuration.
*/
public interface PipelineJobConfiguration {
- DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
-
/**
* Get job id.
*
@@ -54,29 +44,4 @@ public interface PipelineJobConfiguration {
* @return source database type
*/
DatabaseType getSourceDatabaseType();
-
- /**
- * Convert to job configuration POJO.
- *
- * @return converted job configuration POJO
- */
- default JobConfigurationPOJO convertToJobConfigurationPOJO() {
- JobConfigurationPOJO result = new JobConfigurationPOJO();
- result.setJobName(getJobId());
- result.setShardingTotalCount(getJobShardingCount());
-
result.setJobParameter(YamlEngine.marshal(swapToYamlJobConfiguration()));
- String createTimeFormat =
LocalDateTime.now().format(DATE_TIME_FORMATTER);
- result.getProps().setProperty("create_time", createTimeFormat);
- result.getProps().setProperty("start_time_millis",
String.valueOf(System.currentTimeMillis()));
- result.getProps().setProperty("run_count", "1");
-
result.setJobListenerTypes(Collections.singletonList(PipelineElasticJobListener.class.getName()));
- return result;
- }
-
- /**
- * Swap to YAML pipeline job configuration.
- *
- * @return swapped YAML pipeline job configuration
- */
- YamlPipelineJobConfiguration swapToYamlJobConfiguration();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
index 87ae49520dc..5e4c86b125f 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.job.service;
+import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper;
@@ -24,6 +25,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItem
import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
import java.util.Optional;
@@ -36,9 +38,11 @@ public interface PipelineJobAPI extends TypedSPI {
/**
* Get YAML pipeline job configuration swapper.
*
+ * @param <T> type of pipeline job configuration
+ * @param <Y> type of YAML configuration
* @return YAML pipeline job configuration swapper
*/
- YamlPipelineJobConfigurationSwapper<?, ?> getYamlJobConfigurationSwapper();
+ <Y extends YamlConfiguration, T extends PipelineJobConfiguration>
YamlPipelineJobConfigurationSwapper<Y, T> getYamlJobConfigurationSwapper();
/**
* Get YAML pipeline job item progress swapper.
@@ -75,6 +79,15 @@ public interface PipelineJobAPI extends TypedSPI {
return Optional.empty();
}
+ /**
+ * Whether to force no sharding when converting to job configuration POJO.
+ *
+ * @return true to force a sharding total count of 1, false to use the job configuration's sharding count
+ */
+ default boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() {
+ return false;
+ }
+
/**
* Get pipeline job class.
*
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
similarity index 56%
copy from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
copy to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
index 511c10d2fb9..3e5d3577660 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
@@ -15,12 +15,13 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.common.config.job;
+package org.apache.shardingsphere.data.pipeline.core.job.service;
-import
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import java.time.LocalDateTime;
@@ -28,43 +29,39 @@ import java.time.format.DateTimeFormatter;
import java.util.Collections;
/**
- * Pipeline job configuration.
+ * Pipeline job configuration manager.
*/
-public interface PipelineJobConfiguration {
+@RequiredArgsConstructor
+public final class PipelineJobConfigurationManager {
- DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+ private static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- /**
- * Get job id.
- *
- * @return job id
- */
- String getJobId();
-
- /**
- * Get job sharding count.
- *
- * @return job sharding count
- */
- int getJobShardingCount();
+ private final PipelineJobAPI jobAPI;
/**
- * Get source database type.
+ * Get job configuration.
*
- * @return source database type
+ * @param jobId job ID
+ * @param <T> type of pipeline job configuration
+ * @return pipeline job configuration
*/
- DatabaseType getSourceDatabaseType();
+ @SuppressWarnings("unchecked")
+ public <T extends PipelineJobConfiguration> T getJobConfiguration(final
String jobId) {
+ return (T)
jobAPI.getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter());
+ }
/**
* Convert to job configuration POJO.
- *
+ *
+ * @param jobConfig pipeline job configuration
* @return converted job configuration POJO
*/
- default JobConfigurationPOJO convertToJobConfigurationPOJO() {
+ public JobConfigurationPOJO convertToJobConfigurationPOJO(final
PipelineJobConfiguration jobConfig) {
JobConfigurationPOJO result = new JobConfigurationPOJO();
- result.setJobName(getJobId());
- result.setShardingTotalCount(getJobShardingCount());
-
result.setJobParameter(YamlEngine.marshal(swapToYamlJobConfiguration()));
+ result.setJobName(jobConfig.getJobId());
+ int shardingTotalCount =
jobAPI.isForceNoShardingWhenConvertToJobConfigurationPOJO() ? 1 :
jobConfig.getJobShardingCount();
+ result.setShardingTotalCount(shardingTotalCount);
+
result.setJobParameter(YamlEngine.marshal(jobAPI.getYamlJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
String createTimeFormat =
LocalDateTime.now().format(DATE_TIME_FORMATTER);
result.getProps().setProperty("create_time", createTimeFormat);
result.getProps().setProperty("start_time_millis",
String.valueOf(System.currentTimeMillis()));
@@ -72,11 +69,4 @@ public interface PipelineJobConfiguration {
result.setJobListenerTypes(Collections.singletonList(PipelineElasticJobListener.class.getName()));
return result;
}
-
- /**
- * Swap to YAML pipeline job configuration.
- *
- * @return swapped YAML pipeline job configuration
- */
- YamlPipelineJobConfiguration swapToYamlJobConfiguration();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index b5414e71b54..80890cc2d46 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -53,18 +53,6 @@ public final class PipelineJobManager {
private final PipelineJobAPI jobAPI;
- /**
- * Get job configuration.
- *
- * @param jobId job ID
- * @param <T> type of pipeline job configuration
- * @return pipeline job configuration
- */
- @SuppressWarnings("unchecked")
- public <T extends PipelineJobConfiguration> T getJobConfiguration(final
String jobId) {
- return (T)
jobAPI.getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter());
- }
-
/**
* Start job.
*
@@ -80,7 +68,7 @@ public final class PipelineJobManager {
return Optional.of(jobId);
}
governanceFacade.getJobFacade().getJob().create(jobId,
jobAPI.getJobClass());
- governanceFacade.getJobFacade().getConfiguration().persist(jobId,
jobConfig.convertToJobConfigurationPOJO());
+ governanceFacade.getJobFacade().getConfiguration().persist(jobId, new
PipelineJobConfigurationManager(jobAPI).convertToJobConfigurationPOJO(jobConfig));
return Optional.of(jobId);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
index cc3f9c46443..e6b8e980379 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
@@ -76,7 +76,7 @@ public final class TransmissionJobManager {
* @return job item infos
*/
public List<TransmissionJobItemInfo> getJobItemInfos(final String jobId) {
- PipelineJobConfiguration jobConfig = new
PipelineJobManager(jobAPI).getJobConfiguration(jobId);
+ PipelineJobConfiguration jobConfig = new
PipelineJobConfigurationManager(jobAPI).getJobConfiguration(jobId);
long startTimeMillis =
Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0"));
Map<Integer, TransmissionJobItemProgress> jobProgress =
getJobProgress(jobConfig);
List<TransmissionJobItemInfo> result = new LinkedList<>();
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
index 6f72afb054f..dfccb6f9405 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
@@ -19,8 +19,8 @@ package
org.apache.shardingsphere.migration.distsql.handler.update;
import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
@@ -48,7 +48,7 @@ public final class CheckMigrationJobUpdater implements
RALUpdater<CheckMigration
String algorithmTypeName = null == typeStrategy ? null :
typeStrategy.getName();
Properties algorithmProps = null == typeStrategy ? null :
typeStrategy.getProps();
String jobId = sqlStatement.getJobId();
- MigrationJobConfiguration jobConfig = new
PipelineJobManager(migrationJobAPI).getJobConfiguration(jobId);
+ MigrationJobConfiguration jobConfig = new
PipelineJobConfigurationManager(migrationJobAPI).getJobConfiguration(jobId);
verifyInventoryFinished(jobConfig);
checkJobAPI.createJobAndStart(new
CreateConsistencyCheckJobParameter(jobId, algorithmTypeName, algorithmProps,
jobConfig.getSourceDatabaseType(), jobConfig.getTargetDatabaseType()));
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 839f61bf204..416d4ee4204 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -36,9 +36,9 @@ import
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfigurati
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
-import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
+import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
import
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
@@ -46,8 +46,8 @@ import
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipeline
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfigurationSwapper;
-import
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
+import
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
@@ -66,11 +66,12 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.Increm
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
@@ -85,6 +86,7 @@ import
org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigur
import java.sql.SQLException;
import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -100,6 +102,8 @@ import java.util.stream.Collectors;
@Slf4j
public final class CDCJobAPI implements TransmissionJobAPI {
+ private static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper =
new YamlDataSourceConfigurationSwapper();
private final YamlRuleConfigurationSwapperEngine ruleConfigSwapperEngine =
new YamlRuleConfigurationSwapperEngine();
@@ -125,7 +129,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
log.warn("CDC job already exists in registry center, ignore, job
id is `{}`", jobConfig.getJobId());
} else {
governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(),
getJobClass());
- JobConfigurationPOJO jobConfigPOJO =
jobConfig.convertToJobConfigurationPOJO();
+ JobConfigurationPOJO jobConfigPOJO = new
PipelineJobConfigurationManager(this).convertToJobConfigurationPOJO(jobConfig);
jobConfigPOJO.setDisabled(true);
governanceFacade.getJobFacade().getConfiguration().persist(jobConfig.getJobId(),
jobConfigPOJO);
if (!param.isFull()) {
@@ -222,7 +226,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
jobConfigPOJO.setDisabled(disabled);
if (disabled) {
- jobConfigPOJO.getProps().setProperty("stop_time",
LocalDateTime.now().format(PipelineJobConfiguration.DATE_TIME_FORMATTER));
+ jobConfigPOJO.getProps().setProperty("stop_time",
LocalDateTime.now().format(DATE_TIME_FORMATTER));
jobConfigPOJO.getProps().setProperty("stop_time_millis",
String.valueOf(System.currentTimeMillis()));
} else {
jobConfigPOJO.getProps().setProperty("start_time_millis",
String.valueOf(System.currentTimeMillis()));
@@ -282,6 +286,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
return new CDCProcessContext(jobConfig.getJobId(),
jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())));
}
+ @SuppressWarnings("unchecked")
@Override
public YamlCDCJobConfigurationSwapper getYamlJobConfigurationSwapper() {
return new YamlCDCJobConfigurationSwapper();
@@ -290,10 +295,15 @@ public final class CDCJobAPI implements
TransmissionJobAPI {
@Override
public PipelineJobInfo getJobInfo(final String jobId) {
PipelineJobMetaData jobMetaData = new
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
- CDCJobConfiguration jobConfig = new
PipelineJobManager(this).getJobConfiguration(jobId);
+ CDCJobConfiguration jobConfig = new
PipelineJobConfigurationManager(this).getJobConfiguration(jobId);
return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(),
String.join(", ", jobConfig.getSchemaTableNames()));
}
+ @Override
+ public boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() {
+ return true;
+ }
+
@Override
public void commit(final String jobId) {
}
@@ -304,7 +314,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
* @param jobId job id
*/
public void dropStreaming(final String jobId) {
- CDCJobConfiguration jobConfig = new
PipelineJobManager(this).getJobConfiguration(jobId);
+ CDCJobConfiguration jobConfig = new
PipelineJobConfigurationManager(this).getJobConfiguration(jobId);
ShardingSpherePreconditions.checkState(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).isDisabled(),
() -> new PipelineInternalException("Can't drop streaming job which is
active"));
new PipelineJobManager(this).drop(jobId);
cleanup(jobConfig);
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
index c15c38e6243..8e0aa41737f 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
@@ -21,11 +21,8 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
-import
org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import java.util.List;
@@ -67,18 +64,6 @@ public final class CDCJobConfiguration implements
PipelineJobConfiguration {
return jobShardingDataNodes.size();
}
- @Override
- public JobConfigurationPOJO convertToJobConfigurationPOJO() {
- JobConfigurationPOJO result =
PipelineJobConfiguration.super.convertToJobConfigurationPOJO();
- result.setShardingTotalCount(1);
- return result;
- }
-
- @Override
- public YamlPipelineJobConfiguration swapToYamlJobConfiguration() {
- return new
YamlCDCJobConfigurationSwapper().swapToYamlConfiguration(this);
- }
-
@RequiredArgsConstructor
@Getter
public static class SinkConfiguration {
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 05070a7a901..cc0faf0536c 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -47,7 +47,7 @@ import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextMan
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import
org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType;
@@ -72,7 +72,7 @@ public final class CDCBackendHandler {
private final CDCJobAPI jobAPI = new CDCJobAPI();
- private final PipelineJobManager jobManager = new
PipelineJobManager(jobAPI);
+ private final PipelineJobConfigurationManager jobConfigManager = new
PipelineJobConfigurationManager(jobAPI);
/**
* Get database name by job ID.
@@ -81,7 +81,7 @@ public final class CDCBackendHandler {
* @return database
*/
public String getDatabaseNameByJobId(final String jobId) {
- return
jobManager.<CDCJobConfiguration>getJobConfiguration(jobId).getDatabaseName();
+ return
jobConfigManager.<CDCJobConfiguration>getJobConfiguration(jobId).getDatabaseName();
}
/**
@@ -129,7 +129,7 @@ public final class CDCBackendHandler {
* @param connectionContext connection context
*/
public void startStreaming(final String jobId, final CDCConnectionContext
connectionContext, final Channel channel) {
- CDCJobConfiguration cdcJobConfig =
jobManager.getJobConfiguration(jobId);
+ CDCJobConfiguration cdcJobConfig =
jobConfigManager.getJobConfiguration(jobId);
ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new
PipelineJobNotFoundException(jobId));
if (PipelineJobCenter.isJobExisting(jobId)) {
PipelineJobCenter.stop(jobId);
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 367b7e19b17..c02766b6033 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -33,6 +33,7 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedCon
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
@@ -265,13 +266,14 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
}
private void fillInJobItemInfoWithCheckAlgorithm(final
ConsistencyCheckJobItemInfo result, final String checkJobId) {
- ConsistencyCheckJobConfiguration jobConfig = new
PipelineJobManager(this).getJobConfiguration(checkJobId);
+ ConsistencyCheckJobConfiguration jobConfig = new
PipelineJobConfigurationManager(this).getJobConfiguration(checkJobId);
result.setAlgorithmType(jobConfig.getAlgorithmTypeName());
if (null != jobConfig.getAlgorithmProps()) {
result.setAlgorithmProps(jobConfig.getAlgorithmProps().entrySet().stream().map(entry
-> String.format("'%s'='%s'", entry.getKey(),
entry.getValue())).collect(Collectors.joining(",")));
}
}
+ @SuppressWarnings("unchecked")
@Override
public YamlConsistencyCheckJobConfigurationSwapper
getYamlJobConfigurationSwapper() {
return new YamlConsistencyCheckJobConfigurationSwapper();
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java
index 578eeeed0e0..04d647d5971 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java
@@ -21,8 +21,6 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfigurationSwapper;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import java.util.Properties;
@@ -49,9 +47,4 @@ public final class ConsistencyCheckJobConfiguration
implements PipelineJobConfig
public int getJobShardingCount() {
return 1;
}
-
- @Override
- public YamlPipelineJobConfiguration swapToYamlJobConfiguration() {
- return new
YamlConsistencyCheckJobConfigurationSwapper().swapToYamlConfiguration(this);
- }
}
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 291d724dfdb..291ce0108df 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -30,6 +30,7 @@ import
org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
@@ -103,7 +104,7 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
jobItemManager.persistProgress(jobItemContext);
JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
TransmissionJobAPI jobAPI = (TransmissionJobAPI)
TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType());
- PipelineJobConfiguration parentJobConfig = new
PipelineJobManager(jobAPI).getJobConfiguration(parentJobId);
+ PipelineJobConfiguration parentJobConfig = new
PipelineJobConfigurationManager(jobAPI).getJobConfiguration(parentJobId);
try {
PipelineDataConsistencyChecker checker =
jobAPI.buildDataConsistencyChecker(
parentJobConfig,
jobAPI.buildProcessContext(parentJobConfig),
jobItemContext.getProgressContext());
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 331211fdf2d..8b1918c218a 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -53,6 +53,7 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInva
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
@@ -212,7 +213,7 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
public PipelineJobInfo getJobInfo(final String jobId) {
PipelineJobMetaData jobMetaData = new
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
List<String> sourceTables = new LinkedList<>();
- new
PipelineJobManager(this).<MigrationJobConfiguration>getJobConfiguration(jobId).getJobShardingDataNodes()
+ new
PipelineJobConfigurationManager(this).<MigrationJobConfiguration>getJobConfiguration(jobId).getJobShardingDataNodes()
.forEach(each -> each.getEntries().forEach(entry ->
entry.getDataNodes().forEach(dataNode ->
sourceTables.add(DataNodeUtils.formatWithSchema(dataNode)))));
return new PipelineJobInfo(jobMetaData, null, String.join(",",
sourceTables));
}
@@ -225,6 +226,7 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
}
}
+ @SuppressWarnings("unchecked")
@Override
public YamlMigrationJobConfigurationSwapper
getYamlJobConfigurationSwapper() {
return new YamlMigrationJobConfigurationSwapper();
@@ -322,7 +324,7 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
}
private void cleanTempTableOnRollback(final String jobId) throws
SQLException {
- MigrationJobConfiguration jobConfig = new
PipelineJobManager(this).getJobConfiguration(jobId);
+ MigrationJobConfiguration jobConfig = new
PipelineJobConfigurationManager(this).getJobConfiguration(jobId);
PipelineCommonSQLBuilder pipelineSQLBuilder = new
PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType());
TableAndSchemaNameMapper mapping = new
TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
try (
@@ -346,7 +348,7 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
PipelineJobManager jobManager = new PipelineJobManager(this);
jobManager.stop(jobId);
dropCheckJobs(jobId);
- MigrationJobConfiguration jobConfig = new
PipelineJobManager(this).getJobConfiguration(jobId);
+ MigrationJobConfiguration jobConfig = new
PipelineJobConfigurationManager(this).getJobConfiguration(jobId);
refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName());
jobManager.drop(jobId);
log.info("Commit cost {} ms", System.currentTimeMillis() -
startTimeMillis);
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
index f45a30d512c..9ef3c648b07 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
@@ -22,9 +22,7 @@ import lombok.RequiredArgsConstructor;
import lombok.ToString;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
-import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import java.util.List;
@@ -69,9 +67,4 @@ public final class MigrationJobConfiguration implements
PipelineJobConfiguration
public int getJobShardingCount() {
return jobShardingDataNodes.size();
}
-
- @Override
- public YamlPipelineJobConfiguration swapToYamlJobConfiguration() {
- return new
YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration(this);
- }
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index 1fde78da7f5..5409c4cd9e2 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -23,8 +23,8 @@ import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter;
@@ -69,7 +69,7 @@ class ConsistencyCheckJobAPITest {
String parentJobId = parentJobConfig.getJobId();
String checkJobId = jobAPI.createJobAndStart(new
CreateConsistencyCheckJobParameter(parentJobId, null, null,
parentJobConfig.getSourceDatabaseType(),
parentJobConfig.getTargetDatabaseType()));
- ConsistencyCheckJobConfiguration checkJobConfig = new
PipelineJobManager(jobAPI).getJobConfiguration(checkJobId);
+ ConsistencyCheckJobConfiguration checkJobConfig = new
PipelineJobConfigurationManager(jobAPI).getJobConfiguration(checkJobId);
int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
String expectCheckJobId = new
ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId),
parentJobId, expectedSequence).marshal();
assertThat(checkJobConfig.getJobId(), is(expectCheckJobId));
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 13c3f46d689..e38bf513ad4 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -32,6 +32,7 @@ import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.Consistency
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
@@ -90,6 +91,8 @@ class MigrationJobAPITest {
private static MigrationJobAPI jobAPI;
+ private static PipelineJobConfigurationManager jobConfigManager;
+
private static PipelineJobManager jobManager;
private static TransmissionJobManager transmissionJobManager;
@@ -102,6 +105,7 @@ class MigrationJobAPITest {
static void beforeClass() {
PipelineContextUtils.mockModeConfigAndContextManager();
jobAPI = new MigrationJobAPI();
+ jobConfigManager = new PipelineJobConfigurationManager(jobAPI);
jobManager = new PipelineJobManager(jobAPI);
transmissionJobManager = new TransmissionJobManager(jobAPI);
jobItemManager = new
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
@@ -149,7 +153,7 @@ class MigrationJobAPITest {
void assertRollback() throws SQLException {
Optional<String> jobId =
jobManager.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
- MigrationJobConfiguration jobConfig =
jobManager.getJobConfiguration(jobId.get());
+ MigrationJobConfiguration jobConfig =
jobConfigManager.getJobConfiguration(jobId.get());
initTableData(jobConfig);
PipelineDistributedBarrier mockBarrier =
mock(PipelineDistributedBarrier.class);
when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
@@ -161,7 +165,7 @@ class MigrationJobAPITest {
void assertCommit() {
Optional<String> jobId =
jobManager.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
- MigrationJobConfiguration jobConfig =
jobManager.getJobConfiguration(jobId.get());
+ MigrationJobConfiguration jobConfig =
jobConfigManager.getJobConfiguration(jobId.get());
initTableData(jobConfig);
PipelineDistributedBarrier mockBarrier =
mock(PipelineDistributedBarrier.class);
when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
@@ -263,7 +267,7 @@ class MigrationJobAPITest {
initIntPrimaryEnvironment();
SourceTargetEntry sourceTargetEntry = new
SourceTargetEntry("logic_db", new DataNode("ds_0", "t_order"), "t_order");
String jobId =
jobAPI.createJobAndStart(PipelineContextUtils.getContextKey(), new
MigrateTableStatement(Collections.singletonList(sourceTargetEntry),
"logic_db"));
- MigrationJobConfiguration actual =
jobManager.getJobConfiguration(jobId);
+ MigrationJobConfiguration actual =
jobConfigManager.getJobConfiguration(jobId);
assertThat(actual.getTargetDatabaseName(), is("logic_db"));
List<JobDataNodeLine> dataNodeLines = actual.getJobShardingDataNodes();
assertThat(dataNodeLines.size(), is(1));