This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a9eadde066a Refactor PipelineJobManager.getJobConfiguration() (#29083)
a9eadde066a is described below
commit a9eadde066ad14e86d336bdaca04b8d3838c4db5
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Nov 19 12:53:13 2023 +0800
Refactor PipelineJobManager.getJobConfiguration() (#29083)
---
.../core/job/service/InventoryIncrementalJobManager.java | 5 ++---
.../data/pipeline/core/job/service/PipelineJobManager.java | 6 +++---
.../distsql/handler/update/CheckMigrationJobUpdater.java | 3 +--
.../shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java | 10 ++++------
.../data/pipeline/cdc/handler/CDCBackendHandler.java | 5 ++---
.../consistencycheck/api/impl/ConsistencyCheckJobAPI.java | 2 +-
.../consistencycheck/task/ConsistencyCheckTasksRunner.java | 2 +-
.../scenario/migration/api/impl/MigrationJobAPI.java | 12 +++++-------
.../api/impl/ConsistencyCheckJobAPITest.java | 2 +-
.../scenario/migration/api/impl/MigrationJobAPITest.java | 6 +++---
10 files changed, 23 insertions(+), 30 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
index e31ee4796b2..764866c717e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
@@ -90,9 +90,8 @@ public final class InventoryIncrementalJobManager {
*/
public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String
jobId) {
PipelineJobManager jobManager = new PipelineJobManager(jobAPI);
- JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
- PipelineJobConfiguration jobConfig =
jobManager.getJobConfiguration(jobConfigPOJO);
- long startTimeMillis =
Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0"));
+ PipelineJobConfiguration jobConfig =
jobManager.getJobConfiguration(jobId);
+ long startTimeMillis =
Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0"));
InventoryIncrementalJobManager inventoryIncrementalJobManager = new
InventoryIncrementalJobManager(jobAPI);
Map<Integer, InventoryIncrementalJobItemProgress> jobProgress =
inventoryIncrementalJobManager.getJobProgress(jobConfig);
List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 0e8d2a2d682..b01ce97c1f2 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -57,13 +57,13 @@ public final class PipelineJobManager {
/**
* Get job configuration.
*
- * @param jobConfigPOJO job configuration POJO
+ * @param jobId job ID
* @param <T> type of pipeline job configuration
* @return pipeline job configuration
*/
@SuppressWarnings("unchecked")
- public <T extends PipelineJobConfiguration> T getJobConfiguration(final
JobConfigurationPOJO jobConfigPOJO) {
- return (T)
jobAPI.getYamlJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
+ public <T extends PipelineJobConfiguration> T getJobConfiguration(final
String jobId) {
+ return (T)
jobAPI.getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter());
}
/**
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
index f59ed72f6a6..1bf18932ecc 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.migration.distsql.handler.update;
import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
@@ -49,7 +48,7 @@ public final class CheckMigrationJobUpdater implements
RALUpdater<CheckMigration
String algorithmTypeName = null == typeStrategy ? null :
typeStrategy.getName();
Properties algorithmProps = null == typeStrategy ? null :
typeStrategy.getProps();
String jobId = sqlStatement.getJobId();
- MigrationJobConfiguration jobConfig = new
PipelineJobManager(migrationJobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+ MigrationJobConfiguration jobConfig = new
PipelineJobManager(migrationJobAPI).getJobConfiguration(jobId);
verifyInventoryFinished(jobConfig);
checkJobAPI.createJobAndStart(new
CreateConsistencyCheckJobParameter(jobId, algorithmTypeName, algorithmProps,
jobConfig.getSourceDatabaseType(), jobConfig.getTargetDatabaseType()));
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 6f711ab1992..0e41d2f8cac 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -289,9 +289,8 @@ public final class CDCJobAPI implements
InventoryIncrementalJobAPI {
@Override
public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
- JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
- PipelineJobMetaData jobMetaData = new
PipelineJobMetaData(jobConfigPOJO);
- CDCJobConfiguration jobConfig = new
PipelineJobManager(this).getJobConfiguration(jobConfigPOJO);
+ PipelineJobMetaData jobMetaData = new
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+ CDCJobConfiguration jobConfig = new
PipelineJobManager(this).getJobConfiguration(jobId);
return new TableBasedPipelineJobInfo(jobMetaData,
jobConfig.getDatabaseName(), String.join(", ",
jobConfig.getSchemaTableNames()));
}
@@ -305,9 +304,8 @@ public final class CDCJobAPI implements
InventoryIncrementalJobAPI {
* @param jobId job id
*/
public void dropStreaming(final String jobId) {
- JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
- CDCJobConfiguration jobConfig = new
PipelineJobManager(this).getJobConfiguration(jobConfigPOJO);
- ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), ()
-> new PipelineInternalException("Can't drop streaming job which is active"));
+ CDCJobConfiguration jobConfig = new
PipelineJobManager(this).getJobConfiguration(jobId);
+
ShardingSpherePreconditions.checkState(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).isDisabled(),
() -> new PipelineInternalException("Can't drop streaming job which is
active"));
new PipelineJobManager(this).drop(jobId);
cleanup(jobConfig);
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 1f1a9473cd4..05070a7a901 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -47,7 +47,6 @@ import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextMan
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
@@ -82,7 +81,7 @@ public final class CDCBackendHandler {
* @return database
*/
public String getDatabaseNameByJobId(final String jobId) {
- return
jobManager.<CDCJobConfiguration>getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)).getDatabaseName();
+ return
jobManager.<CDCJobConfiguration>getJobConfiguration(jobId).getDatabaseName();
}
/**
@@ -130,7 +129,7 @@ public final class CDCBackendHandler {
* @param connectionContext connection context
*/
public void startStreaming(final String jobId, final CDCConnectionContext
connectionContext, final Channel channel) {
- CDCJobConfiguration cdcJobConfig =
jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+ CDCJobConfiguration cdcJobConfig =
jobManager.getJobConfiguration(jobId);
ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new
PipelineJobNotFoundException(jobId));
if (PipelineJobCenter.isJobExisting(jobId)) {
PipelineJobCenter.stop(jobId);
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 51f8e5edc11..63e6ff42436 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -268,7 +268,7 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
}
private void fillInJobItemInfoWithCheckAlgorithm(final
ConsistencyCheckJobItemInfo result, final String checkJobId) {
- ConsistencyCheckJobConfiguration jobConfig = new
PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId));
+ ConsistencyCheckJobConfiguration jobConfig = new
PipelineJobManager(this).getJobConfiguration(checkJobId);
result.setAlgorithmType(jobConfig.getAlgorithmTypeName());
if (null != jobConfig.getAlgorithmProps()) {
result.setAlgorithmProps(jobConfig.getAlgorithmProps().entrySet().stream().map(entry
-> String.format("'%s'='%s'", entry.getKey(),
entry.getValue())).collect(Collectors.joining(",")));
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 8259022cf89..136e92b3dc5 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -103,7 +103,7 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
jobItemManager.persistProgress(jobItemContext);
JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI)
TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType());
- PipelineJobConfiguration parentJobConfig = new
PipelineJobManager(jobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(parentJobId));
+ PipelineJobConfiguration parentJobConfig = new
PipelineJobManager(jobAPI).getJobConfiguration(parentJobId);
try {
PipelineDataConsistencyChecker checker =
jobAPI.buildPipelineDataConsistencyChecker(
parentJobConfig,
jobAPI.buildPipelineProcessContext(parentJobConfig),
jobItemContext.getProgressContext());
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index c5d551a55ce..6ffa6371715 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -67,7 +67,6 @@ import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.
import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import
org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
import
org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser;
@@ -211,11 +210,10 @@ public final class MigrationJobAPI implements
InventoryIncrementalJobAPI {
@Override
public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
- JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
- PipelineJobMetaData jobMetaData = new
PipelineJobMetaData(jobConfigPOJO);
+ PipelineJobMetaData jobMetaData = new
PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
List<String> sourceTables = new LinkedList<>();
- new
PipelineJobManager(this).<MigrationJobConfiguration>getJobConfiguration(jobConfigPOJO).getJobShardingDataNodes().forEach(each
-> each.getEntries().forEach(entry -> entry.getDataNodes()
- .forEach(dataNode ->
sourceTables.add(DataNodeUtils.formatWithSchema(dataNode)))));
+ new
PipelineJobManager(this).<MigrationJobConfiguration>getJobConfiguration(jobId).getJobShardingDataNodes()
+ .forEach(each -> each.getEntries().forEach(entry ->
entry.getDataNodes().forEach(dataNode ->
sourceTables.add(DataNodeUtils.formatWithSchema(dataNode)))));
return new TableBasedPipelineJobInfo(jobMetaData, String.join(",",
sourceTables));
}
@@ -324,7 +322,7 @@ public final class MigrationJobAPI implements
InventoryIncrementalJobAPI {
}
private void cleanTempTableOnRollback(final String jobId) throws
SQLException {
- MigrationJobConfiguration jobConfig = new
PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+ MigrationJobConfiguration jobConfig = new
PipelineJobManager(this).getJobConfiguration(jobId);
PipelineCommonSQLBuilder pipelineSQLBuilder = new
PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType());
TableAndSchemaNameMapper mapping = new
TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
try (
@@ -348,7 +346,7 @@ public final class MigrationJobAPI implements
InventoryIncrementalJobAPI {
PipelineJobManager jobManager = new PipelineJobManager(this);
jobManager.stop(jobId);
dropCheckJobs(jobId);
- MigrationJobConfiguration jobConfig = new
PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+ MigrationJobConfiguration jobConfig = new
PipelineJobManager(this).getJobConfiguration(jobId);
refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName());
jobManager.drop(jobId);
log.info("Commit cost {} ms", System.currentTimeMillis() -
startTimeMillis);
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index 7762178b079..fe9ea2db83c 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -69,7 +69,7 @@ class ConsistencyCheckJobAPITest {
String parentJobId = parentJobConfig.getJobId();
String checkJobId = jobAPI.createJobAndStart(new
CreateConsistencyCheckJobParameter(parentJobId, null, null,
parentJobConfig.getSourceDatabaseType(),
parentJobConfig.getTargetDatabaseType()));
- ConsistencyCheckJobConfiguration checkJobConfig = new
PipelineJobManager(jobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId));
+ ConsistencyCheckJobConfiguration checkJobConfig = new
PipelineJobManager(jobAPI).getJobConfiguration(checkJobId);
int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
String expectCheckJobId = new
ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId),
parentJobId, expectedSequence).marshal();
assertThat(checkJobConfig.getJobId(), is(expectCheckJobId));
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 3e67089680a..84a4e55572e 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -150,7 +150,7 @@ class MigrationJobAPITest {
void assertRollback() throws SQLException {
Optional<String> jobId =
jobManager.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
- MigrationJobConfiguration jobConfig =
jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get()));
+ MigrationJobConfiguration jobConfig =
jobManager.getJobConfiguration(jobId.get());
initTableData(jobConfig);
PipelineDistributedBarrier mockBarrier =
mock(PipelineDistributedBarrier.class);
when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
@@ -162,7 +162,7 @@ class MigrationJobAPITest {
void assertCommit() {
Optional<String> jobId =
jobManager.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
- MigrationJobConfiguration jobConfig =
jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get()));
+ MigrationJobConfiguration jobConfig =
jobManager.getJobConfiguration(jobId.get());
initTableData(jobConfig);
PipelineDistributedBarrier mockBarrier =
mock(PipelineDistributedBarrier.class);
when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
@@ -285,7 +285,7 @@ class MigrationJobAPITest {
initIntPrimaryEnvironment();
SourceTargetEntry sourceTargetEntry = new
SourceTargetEntry("logic_db", new DataNode("ds_0", "t_order"), "t_order");
String jobId =
jobAPI.createJobAndStart(PipelineContextUtils.getContextKey(), new
MigrateTableStatement(Collections.singletonList(sourceTargetEntry),
"logic_db"));
- MigrationJobConfiguration actual =
jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+ MigrationJobConfiguration actual =
jobManager.getJobConfiguration(jobId);
assertThat(actual.getTargetDatabaseName(), is("logic_db"));
List<JobDataNodeLine> dataNodeLines = actual.getJobShardingDataNodes();
assertThat(dataNodeLines.size(), is(1));