This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c900c17c667 Decouple MigrationJobAPI and MigrateTableStatement (#36099)
c900c17c667 is described below

commit c900c17c667c9a998cbeb7349b13c2b8377b660c
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Jul 29 10:48:04 2025 +0800

    Decouple MigrationJobAPI and MigrateTableStatement (#36099)
---
 .../scenario/migration/api/MigrationJobAPI.java       | 19 +++++++++----------
 .../distsql/handler/update/MigrateTableExecutor.java  |  2 +-
 .../scenario/migration/api/MigrationJobAPITest.java   |  7 +++----
 3 files changed, 13 insertions(+), 15 deletions(-)
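
In short, the public API now takes plain data instead of a DistSQL statement object. A before/after sketch of the signature, reconstructed from the hunks below (parameter names copied from the diff; modifiers omitted for brevity):

    // before: the kernel API depended on the DistSQL statement type
    public String schedule(PipelineContextKey contextKey, MigrateTableStatement param)

    // after: callers pass the entries and the target database name directly
    public String schedule(PipelineContextKey contextKey, List<SourceTargetEntry> sourceTargetEntries, String targetDatabaseName)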

diff --git a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index c5eaecc5f63..5126418cd35 100644
--- a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -45,7 +45,6 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.config.Migrati
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.config.YamlMigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.pojo.SourceTargetEntry;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.updatable.MigrateTableStatement;
 import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
 import org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
 import org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser;
@@ -112,25 +111,25 @@ public final class MigrationJobAPI implements TransmissionJobAPI {
      * Schedule migration job.
      *
      * @param contextKey context key
-     * @param param create migration job parameter
+     * @param sourceTargetEntries source target entries
+     * @param targetDatabaseName target database name
      * @return job id
      */
-    public String schedule(final PipelineContextKey contextKey, final MigrateTableStatement param) {
-        MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey, param));
+    public String schedule(final PipelineContextKey contextKey, final List<SourceTargetEntry> sourceTargetEntries, final String targetDatabaseName) {
+        MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey, sourceTargetEntries, targetDatabaseName));
         jobManager.start(jobConfig);
         return jobConfig.getJobId();
     }
     
-    private YamlMigrationJobConfiguration buildYamlJobConfiguration(final PipelineContextKey contextKey, final MigrateTableStatement param) {
+    private YamlMigrationJobConfiguration buildYamlJobConfiguration(final PipelineContextKey contextKey, final List<SourceTargetEntry> sourceTargetEntries, final String targetDatabaseName) {
         YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
-        result.setTargetDatabaseName(param.getTargetDatabaseName());
+        result.setTargetDatabaseName(targetDatabaseName);
         Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, "MIGRATION");
-        List<SourceTargetEntry> sourceTargetEntries = new ArrayList<>(new HashSet<>(param.getSourceTargetEntries())).stream().sorted(Comparator.comparing(SourceTargetEntry::getTargetTableName)
-                .thenComparing(each -> each.getSource().format())).collect(Collectors.toList());
         Map<String, List<DataNode>> sourceDataNodes = new LinkedHashMap<>(sourceTargetEntries.size(), 1F);
         Map<String, YamlPipelineDataSourceConfiguration> configSources = new LinkedHashMap<>(sourceTargetEntries.size(), 1F);
         YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
-        for (SourceTargetEntry each : sourceTargetEntries) {
+        for (SourceTargetEntry each : new HashSet<>(sourceTargetEntries).stream()
+                .sorted(Comparator.comparing(SourceTargetEntry::getTargetTableName).thenComparing(each -> each.getSource().format())).collect(Collectors.toList())) {
             sourceDataNodes.computeIfAbsent(each.getTargetTableName(), key -> new LinkedList<>()).add(each.getSource());
             ShardingSpherePreconditions.checkState(1 == sourceDataNodes.get(each.getTargetTableName()).size(),
                     () -> new PipelineInvalidParameterException("more than one source table for " + each.getTargetTableName()));
@@ -155,7 +154,7 @@ public final class MigrationJobAPI implements TransmissionJobAPI {
             }
         }
         result.setSources(configSources);
-        ShardingSphereDatabase targetDatabase = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getTargetDatabaseName());
+        ShardingSphereDatabase targetDatabase = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(targetDatabaseName);
         PipelineDataSourceConfiguration targetPipelineDataSourceConfig = buildTargetPipelineDataSourceConfiguration(targetDatabase);
         result.setTarget(buildYamlPipelineDataSourceConfiguration(targetPipelineDataSourceConfig.getType(), targetPipelineDataSourceConfig.getParameter()));
         result.setTargetDatabaseType(targetPipelineDataSourceConfig.getDatabaseType().getType());
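
The dedup-and-sort step that previously ran on the removed MigrateTableStatement parameter is now inlined in the for loop above. A minimal, self-contained sketch of just that step, using a local Entry record as a stand-in for SourceTargetEntry (the record, its accessors, and the sample values are assumptions for illustration; Java 16+ for records):

    import java.util.Comparator;
    import java.util.HashSet;
    import java.util.List;
    import java.util.stream.Collectors;

    public final class DedupSortSketch {

        // Stand-in for SourceTargetEntry: a target table name plus the source data node rendered as text.
        record Entry(String targetTableName, String sourceFormat) {
        }

        public static void main(final String[] args) {
            List<Entry> entries = List.of(
                    new Entry("t_order", "ds_1.t_order_1"),
                    new Entry("t_order", "ds_0.t_order_0"),
                    new Entry("t_order", "ds_0.t_order_0"));
            // Same pipeline shape as the diff: dedup via HashSet (record equality),
            // then sort by target table name and source format so the iteration
            // order of the job configuration is deterministic.
            List<Entry> sorted = new HashSet<>(entries).stream()
                    .sorted(Comparator.comparing(Entry::targetTableName).thenComparing(Entry::sourceFormat))
                    .collect(Collectors.toList());
            // Prints the two distinct entries; the duplicate is dropped.
            sorted.forEach(each -> System.out.println(each.targetTableName() + " <- " + each.sourceFormat()));
        }
    }

Note that after dedup, the ShardingSpherePreconditions check above still rejects two distinct sources mapped to the same target table with a PipelineInvalidParameterException.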
diff --git a/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java b/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java
index 7f70c8eaae3..9f968afc826 100644
--- a/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java
+++ b/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java
@@ -53,7 +53,7 @@ public final class MigrateTableExecutor implements DistSQLUpdateExecutor<Migrate
             targetDatabaseName = database.getName();
         }
         MigrationJobAPI jobAPI = (MigrationJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");
-        jobAPI.schedule(new PipelineContextKey(InstanceType.PROXY), new MigrateTableStatement(sqlStatement.getSourceTargetEntries(), targetDatabaseName));
+        jobAPI.schedule(new PipelineContextKey(InstanceType.PROXY), sqlStatement.getSourceTargetEntries(), targetDatabaseName);
     }
     
     @Override
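
On the handler side, the effect is that MigrateTableExecutor no longer builds a throwaway statement object just to carry two values across the API boundary. A sketch of the call site before and after (identifiers taken from the hunk above):

    // before: data re-wrapped in a MigrateTableStatement
    jobAPI.schedule(new PipelineContextKey(InstanceType.PROXY),
            new MigrateTableStatement(sqlStatement.getSourceTargetEntries(), targetDatabaseName));

    // after: plain data, so the kernel API no longer references the DistSQL statement class
    jobAPI.schedule(new PipelineContextKey(InstanceType.PROXY),
            sqlStatement.getSourceTargetEntries(), targetDatabaseName);

This is the decoupling the commit title names: the dependency on the distsql statement package now stops at the handler layer.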
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
index ebcd896549c..e2aee3506b1 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
@@ -49,7 +49,6 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobTy
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.pojo.SourceTargetEntry;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.updatable.MigrateTableStatement;
 import org.apache.shardingsphere.data.pipeline.spi.PipelineDataSourceCreator;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -261,20 +260,20 @@ class MigrationJobAPITest {
     void assertCreateJobConfigFailedOnMoreThanOneSourceTable() {
         List<SourceTargetEntry> sourceTargetEntries = Stream.of("t_order_0", "t_order_1")
                 .map(each -> new SourceTargetEntry("logic_db", new DataNode("ds_0", each), "t_order")).collect(Collectors.toList());
-        assertThrows(PipelineInvalidParameterException.class, () -> jobAPI.schedule(PipelineContextUtils.getContextKey(), new MigrateTableStatement(sourceTargetEntries, "logic_db")));
+        assertThrows(PipelineInvalidParameterException.class, () -> jobAPI.schedule(PipelineContextUtils.getContextKey(), sourceTargetEntries, "logic_db"));
     }
     
     @Test
     void assertCreateJobConfigFailedOnDataSourceNotExist() {
         List<SourceTargetEntry> sourceTargetEntries = Collections.singletonList(new SourceTargetEntry("logic_db", new DataNode("ds_not_exists", "t_order"), "t_order"));
-        assertThrows(PipelineInvalidParameterException.class, () -> jobAPI.schedule(PipelineContextUtils.getContextKey(), new MigrateTableStatement(sourceTargetEntries, "logic_db")));
+        assertThrows(PipelineInvalidParameterException.class, () -> jobAPI.schedule(PipelineContextUtils.getContextKey(), sourceTargetEntries, "logic_db"));
     }
     
     @Test
     void assertCreateJobConfig() throws SQLException {
         initIntPrimaryEnvironment();
         SourceTargetEntry sourceTargetEntry = new SourceTargetEntry("logic_db", new DataNode("ds_0", "t_order"), "t_order");
-        String jobId = jobAPI.schedule(PipelineContextUtils.getContextKey(), new MigrateTableStatement(Collections.singletonList(sourceTargetEntry), "logic_db"));
+        String jobId = jobAPI.schedule(PipelineContextUtils.getContextKey(), Collections.singletonList(sourceTargetEntry), "logic_db");
         MigrationJobConfiguration actual = jobConfigManager.getJobConfiguration(jobId);
         assertThat(actual.getTargetDatabaseName(), is("logic_db"));
         List<JobDataNodeLine> dataNodeLines = actual.getJobShardingDataNodes();
