This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a9eadde066a Refactor PipelineJobManager.getJobConfiguration() (#29083)
a9eadde066a is described below

commit a9eadde066ad14e86d336bdaca04b8d3838c4db5
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Nov 19 12:53:13 2023 +0800

    Refactor PipelineJobManager.getJobConfiguration() (#29083)
---
 .../core/job/service/InventoryIncrementalJobManager.java     |  5 ++---
 .../data/pipeline/core/job/service/PipelineJobManager.java   |  6 +++---
 .../distsql/handler/update/CheckMigrationJobUpdater.java     |  3 +--
 .../shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java | 10 ++++------
 .../data/pipeline/cdc/handler/CDCBackendHandler.java         |  5 ++---
 .../consistencycheck/api/impl/ConsistencyCheckJobAPI.java    |  2 +-
 .../consistencycheck/task/ConsistencyCheckTasksRunner.java   |  2 +-
 .../scenario/migration/api/impl/MigrationJobAPI.java         | 12 +++++-------
 .../api/impl/ConsistencyCheckJobAPITest.java                 |  2 +-
 .../scenario/migration/api/impl/MigrationJobAPITest.java     |  6 +++---
 10 files changed, 23 insertions(+), 30 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
index e31ee4796b2..764866c717e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
@@ -90,9 +90,8 @@ public final class InventoryIncrementalJobManager {
      */
     public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String 
jobId) {
         PipelineJobManager jobManager = new PipelineJobManager(jobAPI);
-        JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
-        PipelineJobConfiguration jobConfig = 
jobManager.getJobConfiguration(jobConfigPOJO);
-        long startTimeMillis = 
Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0"));
+        PipelineJobConfiguration jobConfig = 
jobManager.getJobConfiguration(jobId);
+        long startTimeMillis = 
Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0"));
         InventoryIncrementalJobManager inventoryIncrementalJobManager = new 
InventoryIncrementalJobManager(jobAPI);
         Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = 
inventoryIncrementalJobManager.getJobProgress(jobConfig);
         List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 0e8d2a2d682..b01ce97c1f2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -57,13 +57,13 @@ public final class PipelineJobManager {
     /**
      * Get job configuration.
      *
-     * @param jobConfigPOJO job configuration POJO
+     * @param jobId job ID
      * @param <T> type of pipeline job configuration
      * @return pipeline job configuration
      */
     @SuppressWarnings("unchecked")
-    public <T extends PipelineJobConfiguration> T getJobConfiguration(final 
JobConfigurationPOJO jobConfigPOJO) {
-        return (T) 
jobAPI.getYamlJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
+    public <T extends PipelineJobConfiguration> T getJobConfiguration(final 
String jobId) {
+        return (T) 
jobAPI.getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter());
     }
     
     /**
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
index f59ed72f6a6..1bf18932ecc 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.migration.distsql.handler.update;
 
 import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
@@ -49,7 +48,7 @@ public final class CheckMigrationJobUpdater implements 
RALUpdater<CheckMigration
         String algorithmTypeName = null == typeStrategy ? null : 
typeStrategy.getName();
         Properties algorithmProps = null == typeStrategy ? null : 
typeStrategy.getProps();
         String jobId = sqlStatement.getJobId();
-        MigrationJobConfiguration jobConfig = new 
PipelineJobManager(migrationJobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+        MigrationJobConfiguration jobConfig = new 
PipelineJobManager(migrationJobAPI).getJobConfiguration(jobId);
         verifyInventoryFinished(jobConfig);
         checkJobAPI.createJobAndStart(new 
CreateConsistencyCheckJobParameter(jobId, algorithmTypeName, algorithmProps, 
jobConfig.getSourceDatabaseType(), jobConfig.getTargetDatabaseType()));
     }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 6f711ab1992..0e41d2f8cac 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -289,9 +289,8 @@ public final class CDCJobAPI implements 
InventoryIncrementalJobAPI {
     
     @Override
     public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
-        JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
-        PipelineJobMetaData jobMetaData = new 
PipelineJobMetaData(jobConfigPOJO);
-        CDCJobConfiguration jobConfig = new 
PipelineJobManager(this).getJobConfiguration(jobConfigPOJO);
+        PipelineJobMetaData jobMetaData = new 
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+        CDCJobConfiguration jobConfig = new 
PipelineJobManager(this).getJobConfiguration(jobId);
         return new TableBasedPipelineJobInfo(jobMetaData, 
jobConfig.getDatabaseName(), String.join(", ", 
jobConfig.getSchemaTableNames()));
     }
     
@@ -305,9 +304,8 @@ public final class CDCJobAPI implements 
InventoryIncrementalJobAPI {
      * @param jobId job id
      */
     public void dropStreaming(final String jobId) {
-        JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
-        CDCJobConfiguration jobConfig = new 
PipelineJobManager(this).getJobConfiguration(jobConfigPOJO);
-        ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () 
-> new PipelineInternalException("Can't drop streaming job which is active"));
+        CDCJobConfiguration jobConfig = new 
PipelineJobManager(this).getJobConfiguration(jobId);
+        
ShardingSpherePreconditions.checkState(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).isDisabled(),
 () -> new PipelineInternalException("Can't drop streaming job which is 
active"));
         new PipelineJobManager(this).drop(jobId);
         cleanup(jobConfig);
     }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 1f1a9473cd4..05070a7a901 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -47,7 +47,6 @@ import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextMan
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
@@ -82,7 +81,7 @@ public final class CDCBackendHandler {
      * @return database
      */
     public String getDatabaseNameByJobId(final String jobId) {
-        return 
jobManager.<CDCJobConfiguration>getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)).getDatabaseName();
+        return 
jobManager.<CDCJobConfiguration>getJobConfiguration(jobId).getDatabaseName();
     }
     
     /**
@@ -130,7 +129,7 @@ public final class CDCBackendHandler {
      * @param connectionContext connection context
      */
     public void startStreaming(final String jobId, final CDCConnectionContext 
connectionContext, final Channel channel) {
-        CDCJobConfiguration cdcJobConfig = 
jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+        CDCJobConfiguration cdcJobConfig = 
jobManager.getJobConfiguration(jobId);
         ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new 
PipelineJobNotFoundException(jobId));
         if (PipelineJobCenter.isJobExisting(jobId)) {
             PipelineJobCenter.stop(jobId);
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 51f8e5edc11..63e6ff42436 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -268,7 +268,7 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
     }
     
     private void fillInJobItemInfoWithCheckAlgorithm(final 
ConsistencyCheckJobItemInfo result, final String checkJobId) {
-        ConsistencyCheckJobConfiguration jobConfig = new 
PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId));
+        ConsistencyCheckJobConfiguration jobConfig = new 
PipelineJobManager(this).getJobConfiguration(checkJobId);
         result.setAlgorithmType(jobConfig.getAlgorithmTypeName());
         if (null != jobConfig.getAlgorithmProps()) {
             
result.setAlgorithmProps(jobConfig.getAlgorithmProps().entrySet().stream().map(entry
 -> String.format("'%s'='%s'", entry.getKey(), 
entry.getValue())).collect(Collectors.joining(",")));
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 8259022cf89..136e92b3dc5 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -103,7 +103,7 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
             jobItemManager.persistProgress(jobItemContext);
             JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
             InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) 
TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType());
-            PipelineJobConfiguration parentJobConfig = new 
PipelineJobManager(jobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(parentJobId));
+            PipelineJobConfiguration parentJobConfig = new 
PipelineJobManager(jobAPI).getJobConfiguration(parentJobId);
             try {
                 PipelineDataConsistencyChecker checker = 
jobAPI.buildPipelineDataConsistencyChecker(
                         parentJobConfig, 
jobAPI.buildPipelineProcessContext(parentJobConfig), 
jobItemContext.getProgressContext());
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index c5d551a55ce..6ffa6371715 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -67,7 +67,6 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
 import 
org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
 import 
org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser;
@@ -211,11 +210,10 @@ public final class MigrationJobAPI implements 
InventoryIncrementalJobAPI {
     
     @Override
     public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
-        JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
-        PipelineJobMetaData jobMetaData = new 
PipelineJobMetaData(jobConfigPOJO);
+        PipelineJobMetaData jobMetaData = new 
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
         List<String> sourceTables = new LinkedList<>();
-        new 
PipelineJobManager(this).<MigrationJobConfiguration>getJobConfiguration(jobConfigPOJO).getJobShardingDataNodes().forEach(each
 -> each.getEntries().forEach(entry -> entry.getDataNodes()
-                .forEach(dataNode -> 
sourceTables.add(DataNodeUtils.formatWithSchema(dataNode)))));
+        new 
PipelineJobManager(this).<MigrationJobConfiguration>getJobConfiguration(jobId).getJobShardingDataNodes()
+                .forEach(each -> each.getEntries().forEach(entry -> 
entry.getDataNodes().forEach(dataNode -> 
sourceTables.add(DataNodeUtils.formatWithSchema(dataNode)))));
         return new TableBasedPipelineJobInfo(jobMetaData, String.join(",", 
sourceTables));
     }
     
@@ -324,7 +322,7 @@ public final class MigrationJobAPI implements 
InventoryIncrementalJobAPI {
     }
     
     private void cleanTempTableOnRollback(final String jobId) throws 
SQLException {
-        MigrationJobConfiguration jobConfig = new 
PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+        MigrationJobConfiguration jobConfig = new 
PipelineJobManager(this).getJobConfiguration(jobId);
         PipelineCommonSQLBuilder pipelineSQLBuilder = new 
PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType());
         TableAndSchemaNameMapper mapping = new 
TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
         try (
@@ -348,7 +346,7 @@ public final class MigrationJobAPI implements 
InventoryIncrementalJobAPI {
         PipelineJobManager jobManager = new PipelineJobManager(this);
         jobManager.stop(jobId);
         dropCheckJobs(jobId);
-        MigrationJobConfiguration jobConfig = new 
PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+        MigrationJobConfiguration jobConfig = new 
PipelineJobManager(this).getJobConfiguration(jobId);
         refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName());
         jobManager.drop(jobId);
         log.info("Commit cost {} ms", System.currentTimeMillis() - 
startTimeMillis);
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index 7762178b079..fe9ea2db83c 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -69,7 +69,7 @@ class ConsistencyCheckJobAPITest {
         String parentJobId = parentJobConfig.getJobId();
         String checkJobId = jobAPI.createJobAndStart(new 
CreateConsistencyCheckJobParameter(parentJobId, null, null,
                 parentJobConfig.getSourceDatabaseType(), 
parentJobConfig.getTargetDatabaseType()));
-        ConsistencyCheckJobConfiguration checkJobConfig = new 
PipelineJobManager(jobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId));
+        ConsistencyCheckJobConfiguration checkJobConfig = new 
PipelineJobManager(jobAPI).getJobConfiguration(checkJobId);
         int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
         String expectCheckJobId = new 
ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), 
parentJobId, expectedSequence).marshal();
         assertThat(checkJobConfig.getJobId(), is(expectCheckJobId));
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 3e67089680a..84a4e55572e 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -150,7 +150,7 @@ class MigrationJobAPITest {
     void assertRollback() throws SQLException {
         Optional<String> jobId = 
jobManager.start(JobConfigurationBuilder.createJobConfiguration());
         assertTrue(jobId.isPresent());
-        MigrationJobConfiguration jobConfig = 
jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get()));
+        MigrationJobConfiguration jobConfig = 
jobManager.getJobConfiguration(jobId.get());
         initTableData(jobConfig);
         PipelineDistributedBarrier mockBarrier = 
mock(PipelineDistributedBarrier.class);
         
when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
@@ -162,7 +162,7 @@ class MigrationJobAPITest {
     void assertCommit() {
         Optional<String> jobId = 
jobManager.start(JobConfigurationBuilder.createJobConfiguration());
         assertTrue(jobId.isPresent());
-        MigrationJobConfiguration jobConfig = 
jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get()));
+        MigrationJobConfiguration jobConfig = 
jobManager.getJobConfiguration(jobId.get());
         initTableData(jobConfig);
         PipelineDistributedBarrier mockBarrier = 
mock(PipelineDistributedBarrier.class);
         
when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
@@ -285,7 +285,7 @@ class MigrationJobAPITest {
         initIntPrimaryEnvironment();
         SourceTargetEntry sourceTargetEntry = new 
SourceTargetEntry("logic_db", new DataNode("ds_0", "t_order"), "t_order");
         String jobId = 
jobAPI.createJobAndStart(PipelineContextUtils.getContextKey(), new 
MigrateTableStatement(Collections.singletonList(sourceTargetEntry), 
"logic_db"));
-        MigrationJobConfiguration actual = 
jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+        MigrationJobConfiguration actual = 
jobManager.getJobConfiguration(jobId);
         assertThat(actual.getTargetDatabaseName(), is("logic_db"));
         List<JobDataNodeLine> dataNodeLines = actual.getJobShardingDataNodes();
         assertThat(dataNodeLines.size(), is(1));

Reply via email to