This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 309d6f31df7 Add PipelineJobConfigurationManager (#29198)
309d6f31df7 is described below

commit 309d6f31df767394d0a21d2f4dc4fd085e885236
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Nov 24 18:37:31 2023 +0800

    Add PipelineJobConfigurationManager (#29198)
    
    * Add PipelineJobConfigurationLoader
    
    * Refactor PipelineJobConfigurationLoader
    
    * Add PipelineJobConfigurationLoader
    
    * Add PipelineJobConfigurationManager
---
 .../config/job/PipelineJobConfiguration.java       | 35 -------------
 .../pipeline/core/job/service/PipelineJobAPI.java  | 15 +++++-
 .../service/PipelineJobConfigurationManager.java}  | 58 +++++++++-------------
 .../core/job/service/PipelineJobManager.java       | 14 +-----
 .../core/job/service/TransmissionJobManager.java   |  2 +-
 .../handler/update/CheckMigrationJobUpdater.java   |  4 +-
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      | 26 +++++++---
 .../cdc/config/job/CDCJobConfiguration.java        | 15 ------
 .../pipeline/cdc/handler/CDCBackendHandler.java    |  8 +--
 .../api/impl/ConsistencyCheckJobAPI.java           |  4 +-
 .../config/ConsistencyCheckJobConfiguration.java   |  7 ---
 .../task/ConsistencyCheckTasksRunner.java          |  3 +-
 .../migration/api/impl/MigrationJobAPI.java        |  8 +--
 .../config/MigrationJobConfiguration.java          |  7 ---
 .../api/impl/ConsistencyCheckJobAPITest.java       |  4 +-
 .../migration/api/impl/MigrationJobAPITest.java    | 10 ++--
 16 files changed, 83 insertions(+), 137 deletions(-)
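
The diff that follows moves job configuration handling into the new PipelineJobConfigurationManager: loading a typed job configuration by job id and converting it to an ElasticJob JobConfigurationPOJO now go through that class instead of default methods on PipelineJobConfiguration and PipelineJobManager. A minimal usage sketch, not part of this commit, assuming the migration scenario and a jobId variable that refers to an existing job:

    // Sketch only: jobId is assumed to reference an existing migration job.
    MigrationJobAPI jobAPI = new MigrationJobAPI();
    PipelineJobConfigurationManager jobConfigManager = new PipelineJobConfigurationManager(jobAPI);
    // Load the typed job configuration from the registry center by job id.
    MigrationJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId);
    // Convert back to an ElasticJob POJO; sharding count honors isForceNoShardingWhenConvertToJobConfigurationPOJO().
    JobConfigurationPOJO jobConfigPOJO = jobConfigManager.convertToJobConfigurationPOJO(jobConfig);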

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
index 511c10d2fb9..e6f6516b7f4 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
@@ -17,23 +17,13 @@
 
 package org.apache.shardingsphere.data.pipeline.common.config.job;
 
-import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.Collections;
 
 /**
  * Pipeline job configuration.
  */
 public interface PipelineJobConfiguration {
     
-    DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
-    
     /**
      * Get job id.
      *
@@ -54,29 +44,4 @@ public interface PipelineJobConfiguration {
      * @return source database type
      */
     DatabaseType getSourceDatabaseType();
-    
-    /**
-     * Convert to job configuration POJO.
-     * 
-     * @return converted job configuration POJO
-     */
-    default JobConfigurationPOJO convertToJobConfigurationPOJO() {
-        JobConfigurationPOJO result = new JobConfigurationPOJO();
-        result.setJobName(getJobId());
-        result.setShardingTotalCount(getJobShardingCount());
-        result.setJobParameter(YamlEngine.marshal(swapToYamlJobConfiguration()));
-        String createTimeFormat = LocalDateTime.now().format(DATE_TIME_FORMATTER);
-        result.getProps().setProperty("create_time", createTimeFormat);
-        result.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
-        result.getProps().setProperty("run_count", "1");
-        result.setJobListenerTypes(Collections.singletonList(PipelineElasticJobListener.class.getName()));
-        return result;
-    }
-    
-    /**
-     * Swap to YAML pipeline job configuration.
-     * 
-     * @return swapped YAML pipeline job configuration
-     */
-    YamlPipelineJobConfiguration swapToYamlJobConfiguration();
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
index 87ae49520dc..5e4c86b125f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.job.service;
 
+import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper;
@@ -24,6 +25,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItem
 import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 
 import java.util.Optional;
 
@@ -36,9 +38,11 @@ public interface PipelineJobAPI extends TypedSPI {
     /**
      * Get YAML pipeline job configuration swapper.
      * 
+     * @param <Y> type of YAML configuration
+     * @param <T> type of pipeline job configuration
      * @return YAML pipeline job configuration swapper
      */
-    YamlPipelineJobConfigurationSwapper<?, ?> getYamlJobConfigurationSwapper();
+    <Y extends YamlConfiguration, T extends PipelineJobConfiguration> YamlPipelineJobConfigurationSwapper<Y, T> getYamlJobConfigurationSwapper();
     
     /**
      * Get YAML pipeline job item progress swapper.
@@ -75,6 +79,15 @@ public interface PipelineJobAPI extends TypedSPI {
         return Optional.empty();
     }
     
+    /**
+     * Whether to force no sharding when converting to job configuration POJO.
+     * 
+     * @return force no sharding or not
+     */
+    default boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() {
+        return false;
+    }
+    
     /**
      * Get pipeline job class.
      * 
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
similarity index 56%
copy from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
copy to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
index 511c10d2fb9..3e5d3577660 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
@@ -15,12 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.common.config.job;
+package org.apache.shardingsphere.data.pipeline.core.job.service;
 
-import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
 import java.time.LocalDateTime;
@@ -28,43 +29,39 @@ import java.time.format.DateTimeFormatter;
 import java.util.Collections;
 
 /**
- * Pipeline job configuration.
+ * Pipeline job configuration manager.
  */
-public interface PipelineJobConfiguration {
+@RequiredArgsConstructor
+public final class PipelineJobConfigurationManager {
     
-    DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+    private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
     
-    /**
-     * Get job id.
-     *
-     * @return job id
-     */
-    String getJobId();
-    
-    /**
-     * Get job sharding count.
-     *
-     * @return job sharding count
-     */
-    int getJobShardingCount();
+    private final PipelineJobAPI jobAPI;
     
     /**
-     * Get source database type.
+     * Get job configuration.
      *
-     * @return source database type
+     * @param jobId job ID
+     * @param <T> type of pipeline job configuration
+     * @return pipeline job configuration
      */
-    DatabaseType getSourceDatabaseType();
+    @SuppressWarnings("unchecked")
+    public <T extends PipelineJobConfiguration> T getJobConfiguration(final String jobId) {
+        return (T) jobAPI.getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter());
+    }
     
     /**
      * Convert to job configuration POJO.
-     * 
+     *
+     * @param jobConfig pipeline job configuration
      * @return converted job configuration POJO
      */
-    default JobConfigurationPOJO convertToJobConfigurationPOJO() {
+    public JobConfigurationPOJO convertToJobConfigurationPOJO(final PipelineJobConfiguration jobConfig) {
         JobConfigurationPOJO result = new JobConfigurationPOJO();
-        result.setJobName(getJobId());
-        result.setShardingTotalCount(getJobShardingCount());
-        result.setJobParameter(YamlEngine.marshal(swapToYamlJobConfiguration()));
+        result.setJobName(jobConfig.getJobId());
+        int shardingTotalCount = jobAPI.isForceNoShardingWhenConvertToJobConfigurationPOJO() ? 1 : jobConfig.getJobShardingCount();
+        result.setShardingTotalCount(shardingTotalCount);
+        result.setJobParameter(YamlEngine.marshal(jobAPI.getYamlJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
         String createTimeFormat = LocalDateTime.now().format(DATE_TIME_FORMATTER);
         result.getProps().setProperty("create_time", createTimeFormat);
         result.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
@@ -72,11 +69,4 @@ public interface PipelineJobConfiguration {
         result.setJobListenerTypes(Collections.singletonList(PipelineElasticJobListener.class.getName()));
         return result;
     }
-    
-    /**
-     * Swap to YAML pipeline job configuration.
-     * 
-     * @return swapped YAML pipeline job configuration
-     */
-    YamlPipelineJobConfiguration swapToYamlJobConfiguration();
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index b5414e71b54..80890cc2d46 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -53,18 +53,6 @@ public final class PipelineJobManager {
     
     private final PipelineJobAPI jobAPI;
     
-    /**
-     * Get job configuration.
-     *
-     * @param jobId job ID
-     * @param <T> type of pipeline job configuration
-     * @return pipeline job configuration
-     */
-    @SuppressWarnings("unchecked")
-    public <T extends PipelineJobConfiguration> T getJobConfiguration(final String jobId) {
-        return (T) jobAPI.getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter());
-    }
-    
     /**
      * Start job.
      *
@@ -80,7 +68,7 @@ public final class PipelineJobManager {
             return Optional.of(jobId);
         }
         governanceFacade.getJobFacade().getJob().create(jobId, jobAPI.getJobClass());
-        governanceFacade.getJobFacade().getConfiguration().persist(jobId, jobConfig.convertToJobConfigurationPOJO());
+        governanceFacade.getJobFacade().getConfiguration().persist(jobId, new PipelineJobConfigurationManager(jobAPI).convertToJobConfigurationPOJO(jobConfig));
         return Optional.of(jobId);
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
index cc3f9c46443..e6b8e980379 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
@@ -76,7 +76,7 @@ public final class TransmissionJobManager {
      * @return job item infos
      */
     public List<TransmissionJobItemInfo> getJobItemInfos(final String jobId) {
-        PipelineJobConfiguration jobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(jobId);
+        PipelineJobConfiguration jobConfig = new PipelineJobConfigurationManager(jobAPI).getJobConfiguration(jobId);
         long startTimeMillis = Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0"));
         Map<Integer, TransmissionJobItemProgress> jobProgress = getJobProgress(jobConfig);
         List<TransmissionJobItemInfo> result = new LinkedList<>();
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
index 6f72afb054f..dfccb6f9405 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.migration.distsql.handler.update;
 
 import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
@@ -48,7 +48,7 @@ public final class CheckMigrationJobUpdater implements RALUpdater<CheckMigration
         String algorithmTypeName = null == typeStrategy ? null : typeStrategy.getName();
         Properties algorithmProps = null == typeStrategy ? null : typeStrategy.getProps();
         String jobId = sqlStatement.getJobId();
-        MigrationJobConfiguration jobConfig = new PipelineJobManager(migrationJobAPI).getJobConfiguration(jobId);
+        MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(migrationJobAPI).getJobConfiguration(jobId);
         verifyInventoryFinished(jobConfig);
         checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(jobId, algorithmTypeName, algorithmProps, jobConfig.getSourceDatabaseType(), jobConfig.getTargetDatabaseType()));
     }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 839f61bf204..416d4ee4204 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -36,9 +36,9 @@ import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfigurati
 import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
+import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
@@ -46,8 +46,8 @@ import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipeline
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
+import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
@@ -66,11 +66,12 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.Increm
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
-import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
@@ -85,6 +86,7 @@ import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigur
 
 import java.sql.SQLException;
 import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -100,6 +102,8 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class CDCJobAPI implements TransmissionJobAPI {
     
+    private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+    
     private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
     
     private final YamlRuleConfigurationSwapperEngine ruleConfigSwapperEngine = new YamlRuleConfigurationSwapperEngine();
@@ -125,7 +129,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
             log.warn("CDC job already exists in registry center, ignore, job id is `{}`", jobConfig.getJobId());
         } else {
             governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(), getJobClass());
-            JobConfigurationPOJO jobConfigPOJO = jobConfig.convertToJobConfigurationPOJO();
+            JobConfigurationPOJO jobConfigPOJO = new PipelineJobConfigurationManager(this).convertToJobConfigurationPOJO(jobConfig);
             jobConfigPOJO.setDisabled(true);
             governanceFacade.getJobFacade().getConfiguration().persist(jobConfig.getJobId(), jobConfigPOJO);
             if (!param.isFull()) {
@@ -222,7 +226,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
         JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
         jobConfigPOJO.setDisabled(disabled);
         if (disabled) {
-            jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(PipelineJobConfiguration.DATE_TIME_FORMATTER));
+            jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(DATE_TIME_FORMATTER));
             jobConfigPOJO.getProps().setProperty("stop_time_millis", String.valueOf(System.currentTimeMillis()));
         } else {
             jobConfigPOJO.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
@@ -282,6 +286,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
         return new CDCProcessContext(jobConfig.getJobId(), jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())));
     }
     
+    @SuppressWarnings("unchecked")
     @Override
     public YamlCDCJobConfigurationSwapper getYamlJobConfigurationSwapper() {
         return new YamlCDCJobConfigurationSwapper();
@@ -290,10 +295,15 @@ public final class CDCJobAPI implements TransmissionJobAPI {
     @Override
     public PipelineJobInfo getJobInfo(final String jobId) {
         PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
-        CDCJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobId);
+        CDCJobConfiguration jobConfig = new PipelineJobConfigurationManager(this).getJobConfiguration(jobId);
         return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames()));
     }
     
+    @Override
+    public boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() {
+        return true;
+    }
+    
     @Override
     public void commit(final String jobId) {
     }
@@ -304,7 +314,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
      * @param jobId job id
      */
     public void dropStreaming(final String jobId) {
-        CDCJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobId);
+        CDCJobConfiguration jobConfig = new PipelineJobConfigurationManager(this).getJobConfiguration(jobId);
         ShardingSpherePreconditions.checkState(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).isDisabled(), () -> new PipelineInternalException("Can't drop streaming job which is active"));
         new PipelineJobManager(this).drop(jobId);
         cleanup(jobConfig);
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
index c15c38e6243..8e0aa41737f 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
@@ -21,11 +21,8 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
-import org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 
 import java.util.List;
@@ -67,18 +64,6 @@ public final class CDCJobConfiguration implements PipelineJobConfiguration {
         return jobShardingDataNodes.size();
     }
     
-    @Override
-    public JobConfigurationPOJO convertToJobConfigurationPOJO() {
-        JobConfigurationPOJO result = PipelineJobConfiguration.super.convertToJobConfigurationPOJO();
-        result.setShardingTotalCount(1);
-        return result;
-    }
-    
-    @Override
-    public YamlPipelineJobConfiguration swapToYamlJobConfiguration() {
-        return new YamlCDCJobConfigurationSwapper().swapToYamlConfiguration(this);
-    }
-    
     @RequiredArgsConstructor
     @Getter
     public static class SinkConfiguration {
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 05070a7a901..cc0faf0536c 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -47,7 +47,7 @@ import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextMan
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
 import org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType;
@@ -72,7 +72,7 @@ public final class CDCBackendHandler {
     
     private final CDCJobAPI jobAPI = new CDCJobAPI();
     
-    private final PipelineJobManager jobManager = new PipelineJobManager(jobAPI);
+    private final PipelineJobConfigurationManager jobConfigManager = new PipelineJobConfigurationManager(jobAPI);
     
     /**
      * Get database name by job ID.
@@ -81,7 +81,7 @@ public final class CDCBackendHandler {
      * @return database
      */
     public String getDatabaseNameByJobId(final String jobId) {
-        return jobManager.<CDCJobConfiguration>getJobConfiguration(jobId).getDatabaseName();
+        return jobConfigManager.<CDCJobConfiguration>getJobConfiguration(jobId).getDatabaseName();
     }
     
     /**
@@ -129,7 +129,7 @@ public final class CDCBackendHandler {
      * @param connectionContext connection context
      */
     public void startStreaming(final String jobId, final CDCConnectionContext connectionContext, final Channel channel) {
-        CDCJobConfiguration cdcJobConfig = jobManager.getJobConfiguration(jobId);
+        CDCJobConfiguration cdcJobConfig = jobConfigManager.getJobConfiguration(jobId);
         ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new PipelineJobNotFoundException(jobId));
         if (PipelineJobCenter.isJobExisting(jobId)) {
             PipelineJobCenter.stop(jobId);
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 367b7e19b17..c02766b6033 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -33,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedCon
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
@@ -265,13 +266,14 @@ public final class ConsistencyCheckJobAPI implements PipelineJobAPI {
     }
     
     private void fillInJobItemInfoWithCheckAlgorithm(final ConsistencyCheckJobItemInfo result, final String checkJobId) {
-        ConsistencyCheckJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(checkJobId);
+        ConsistencyCheckJobConfiguration jobConfig = new PipelineJobConfigurationManager(this).getJobConfiguration(checkJobId);
         result.setAlgorithmType(jobConfig.getAlgorithmTypeName());
         if (null != jobConfig.getAlgorithmProps()) {
             result.setAlgorithmProps(jobConfig.getAlgorithmProps().entrySet().stream().map(entry -> String.format("'%s'='%s'", entry.getKey(), entry.getValue())).collect(Collectors.joining(",")));
         }
     }
     
+    @SuppressWarnings("unchecked")
     @Override
     public YamlConsistencyCheckJobConfigurationSwapper getYamlJobConfigurationSwapper() {
         return new YamlConsistencyCheckJobConfigurationSwapper();
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java
index 578eeeed0e0..04d647d5971 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java
@@ -21,8 +21,6 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfigurationSwapper;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 
 import java.util.Properties;
@@ -49,9 +47,4 @@ public final class ConsistencyCheckJobConfiguration implements PipelineJobConfig
     public int getJobShardingCount() {
         return 1;
     }
-    
-    @Override
-    public YamlPipelineJobConfiguration swapToYamlJobConfiguration() {
-        return new YamlConsistencyCheckJobConfigurationSwapper().swapToYamlConfiguration(this);
-    }
 }
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 291d724dfdb..291ce0108df 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -30,6 +30,7 @@ import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
@@ -103,7 +104,7 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
             jobItemManager.persistProgress(jobItemContext);
             JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
             TransmissionJobAPI jobAPI = (TransmissionJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType());
-            PipelineJobConfiguration parentJobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(parentJobId);
+            PipelineJobConfiguration parentJobConfig = new PipelineJobConfigurationManager(jobAPI).getJobConfiguration(parentJobId);
             try {
                 PipelineDataConsistencyChecker checker = jobAPI.buildDataConsistencyChecker(
                         parentJobConfig, jobAPI.buildProcessContext(parentJobConfig), jobItemContext.getProgressContext());
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 331211fdf2d..8b1918c218a 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -53,6 +53,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInva
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
@@ -212,7 +213,7 @@ public final class MigrationJobAPI implements TransmissionJobAPI {
     public PipelineJobInfo getJobInfo(final String jobId) {
         PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
         List<String> sourceTables = new LinkedList<>();
-        new PipelineJobManager(this).<MigrationJobConfiguration>getJobConfiguration(jobId).getJobShardingDataNodes()
+        new PipelineJobConfigurationManager(this).<MigrationJobConfiguration>getJobConfiguration(jobId).getJobShardingDataNodes()
                 .forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes().forEach(dataNode -> sourceTables.add(DataNodeUtils.formatWithSchema(dataNode)))));
         return new PipelineJobInfo(jobMetaData, null, String.join(",", sourceTables));
     }
@@ -225,6 +226,7 @@ public final class MigrationJobAPI implements TransmissionJobAPI {
         }
     }
     
+    @SuppressWarnings("unchecked")
     @Override
     public YamlMigrationJobConfigurationSwapper getYamlJobConfigurationSwapper() {
         return new YamlMigrationJobConfigurationSwapper();
@@ -322,7 +324,7 @@ public final class MigrationJobAPI implements TransmissionJobAPI {
     }
     
     private void cleanTempTableOnRollback(final String jobId) throws SQLException {
-        MigrationJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobId);
+        MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(this).getJobConfiguration(jobId);
         PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType());
         TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
         try (
@@ -346,7 +348,7 @@ public final class MigrationJobAPI implements TransmissionJobAPI {
         PipelineJobManager jobManager = new PipelineJobManager(this);
         jobManager.stop(jobId);
         dropCheckJobs(jobId);
-        MigrationJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobId);
+        MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(this).getJobConfiguration(jobId);
         refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName());
         jobManager.drop(jobId);
         log.info("Commit cost {} ms", System.currentTimeMillis() - startTimeMillis);
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
index f45a30d512c..9ef3c648b07 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
@@ -22,9 +22,7 @@ import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
-import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 
 import java.util.List;
@@ -69,9 +67,4 @@ public final class MigrationJobConfiguration implements PipelineJobConfiguration
     public int getJobShardingCount() {
         return jobShardingDataNodes.size();
     }
-    
-    @Override
-    public YamlPipelineJobConfiguration swapToYamlJobConfiguration() {
-        return new YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration(this);
-    }
 }
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index 1fde78da7f5..5409c4cd9e2 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -23,8 +23,8 @@ import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter;
@@ -69,7 +69,7 @@ class ConsistencyCheckJobAPITest {
         String parentJobId = parentJobConfig.getJobId();
         String checkJobId = jobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null,
                 parentJobConfig.getSourceDatabaseType(), parentJobConfig.getTargetDatabaseType()));
-        ConsistencyCheckJobConfiguration checkJobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(checkJobId);
+        ConsistencyCheckJobConfiguration checkJobConfig = new PipelineJobConfigurationManager(jobAPI).getJobConfiguration(checkJobId);
         int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
         String expectCheckJobId = new ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), parentJobId, expectedSequence).marshal();
         assertThat(checkJobConfig.getJobId(), is(expectCheckJobId));
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 13c3f46d689..e38bf513ad4 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.data.pipeline.core.consistencycheck.Consistency
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
@@ -90,6 +91,8 @@ class MigrationJobAPITest {
     
     private static MigrationJobAPI jobAPI;
     
+    private static PipelineJobConfigurationManager jobConfigManager;
+    
     private static PipelineJobManager jobManager;
     
     private static TransmissionJobManager transmissionJobManager;
@@ -102,6 +105,7 @@ class MigrationJobAPITest {
     static void beforeClass() {
         PipelineContextUtils.mockModeConfigAndContextManager();
         jobAPI = new MigrationJobAPI();
+        jobConfigManager = new PipelineJobConfigurationManager(jobAPI);
         jobManager = new PipelineJobManager(jobAPI);
         transmissionJobManager = new TransmissionJobManager(jobAPI);
         jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
@@ -149,7 +153,7 @@ class MigrationJobAPITest {
     void assertRollback() throws SQLException {
         Optional<String> jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration());
         assertTrue(jobId.isPresent());
-        MigrationJobConfiguration jobConfig = jobManager.getJobConfiguration(jobId.get());
+        MigrationJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId.get());
         initTableData(jobConfig);
         PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class);
         when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
@@ -161,7 +165,7 @@ class MigrationJobAPITest {
     void assertCommit() {
         Optional<String> jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration());
         assertTrue(jobId.isPresent());
-        MigrationJobConfiguration jobConfig = jobManager.getJobConfiguration(jobId.get());
+        MigrationJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId.get());
         initTableData(jobConfig);
         PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class);
         when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
@@ -263,7 +267,7 @@ class MigrationJobAPITest {
         initIntPrimaryEnvironment();
         SourceTargetEntry sourceTargetEntry = new SourceTargetEntry("logic_db", new DataNode("ds_0", "t_order"), "t_order");
         String jobId = jobAPI.createJobAndStart(PipelineContextUtils.getContextKey(), new MigrateTableStatement(Collections.singletonList(sourceTargetEntry), "logic_db"));
-        MigrationJobConfiguration actual = jobManager.getJobConfiguration(jobId);
+        MigrationJobConfiguration actual = jobConfigManager.getJobConfiguration(jobId);
         assertThat(actual.getTargetDatabaseName(), is("logic_db"));
         List<JobDataNodeLine> dataNodeLines = actual.getJobShardingDataNodes();
         assertThat(dataNodeLines.size(), is(1));

