This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c900c17c667 Decouple MigrationJobAPI and MigrateTableStatement (#36099)
c900c17c667 is described below
commit c900c17c667c9a998cbeb7349b13c2b8377b660c
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Jul 29 10:48:04 2025 +0800
Decouple MigrationJobAPI and MigrateTableStatement (#36099)
---
.../scenario/migration/api/MigrationJobAPI.java | 19 +++++++++----------
.../distsql/handler/update/MigrateTableExecutor.java | 2 +-
.../scenario/migration/api/MigrationJobAPITest.java | 7 +++----
3 files changed, 13 insertions(+), 15 deletions(-)
diff --git a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index c5eaecc5f63..5126418cd35 100644
--- a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -45,7 +45,6 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.config.Migrati
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.config.YamlMigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.pojo.SourceTargetEntry;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.updatable.MigrateTableStatement;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import
org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
import
org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser;
@@ -112,25 +111,25 @@ public final class MigrationJobAPI implements TransmissionJobAPI {
* Schedule migration job.
*
* @param contextKey context key
- * @param param create migration job parameter
+ * @param sourceTargetEntries source target entries
+ * @param targetDatabaseName target database name
* @return job id
*/
- public String schedule(final PipelineContextKey contextKey, final
MigrateTableStatement param) {
- MigrationJobConfiguration jobConfig = new
YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey,
param));
+ public String schedule(final PipelineContextKey contextKey, final
List<SourceTargetEntry> sourceTargetEntries, final String targetDatabaseName) {
+ MigrationJobConfiguration jobConfig = new
YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey,
sourceTargetEntries, targetDatabaseName));
jobManager.start(jobConfig);
return jobConfig.getJobId();
}
- private YamlMigrationJobConfiguration buildYamlJobConfiguration(final
PipelineContextKey contextKey, final MigrateTableStatement param) {
+ private YamlMigrationJobConfiguration buildYamlJobConfiguration(final
PipelineContextKey contextKey, final List<SourceTargetEntry>
sourceTargetEntries, final String targetDatabaseName) {
YamlMigrationJobConfiguration result = new
YamlMigrationJobConfiguration();
- result.setTargetDatabaseName(param.getTargetDatabaseName());
+ result.setTargetDatabaseName(targetDatabaseName);
Map<String, DataSourcePoolProperties> metaDataDataSource =
dataSourcePersistService.load(contextKey, "MIGRATION");
- List<SourceTargetEntry> sourceTargetEntries = new ArrayList<>(new
HashSet<>(param.getSourceTargetEntries())).stream().sorted(Comparator.comparing(SourceTargetEntry::getTargetTableName)
- .thenComparing(each ->
each.getSource().format())).collect(Collectors.toList());
Map<String, List<DataNode>> sourceDataNodes = new
LinkedHashMap<>(sourceTargetEntries.size(), 1F);
Map<String, YamlPipelineDataSourceConfiguration> configSources = new
LinkedHashMap<>(sourceTargetEntries.size(), 1F);
YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new
YamlDataSourceConfigurationSwapper();
- for (SourceTargetEntry each : sourceTargetEntries) {
+ for (SourceTargetEntry each : new
HashSet<>(sourceTargetEntries).stream()
+
.sorted(Comparator.comparing(SourceTargetEntry::getTargetTableName).thenComparing(each
-> each.getSource().format())).collect(Collectors.toList())) {
sourceDataNodes.computeIfAbsent(each.getTargetTableName(), key ->
new LinkedList<>()).add(each.getSource());
ShardingSpherePreconditions.checkState(1 ==
sourceDataNodes.get(each.getTargetTableName()).size(),
() -> new PipelineInvalidParameterException("more than one
source table for " + each.getTargetTableName()));
@@ -155,7 +154,7 @@ public final class MigrationJobAPI implements TransmissionJobAPI {
}
}
result.setSources(configSources);
- ShardingSphereDatabase targetDatabase =
PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getTargetDatabaseName());
+ ShardingSphereDatabase targetDatabase =
PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(targetDatabaseName);
PipelineDataSourceConfiguration targetPipelineDataSourceConfig =
buildTargetPipelineDataSourceConfiguration(targetDatabase);
result.setTarget(buildYamlPipelineDataSourceConfiguration(targetPipelineDataSourceConfig.getType(),
targetPipelineDataSourceConfig.getParameter()));
result.setTargetDatabaseType(targetPipelineDataSourceConfig.getDatabaseType().getType());
diff --git a/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java b/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java
index 7f70c8eaae3..9f968afc826 100644
--- a/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java
+++ b/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java
@@ -53,7 +53,7 @@ public final class MigrateTableExecutor implements DistSQLUpdateExecutor<Migrate
targetDatabaseName = database.getName();
}
MigrationJobAPI jobAPI = (MigrationJobAPI)
TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");
- jobAPI.schedule(new PipelineContextKey(InstanceType.PROXY), new
MigrateTableStatement(sqlStatement.getSourceTargetEntries(),
targetDatabaseName));
+ jobAPI.schedule(new PipelineContextKey(InstanceType.PROXY),
sqlStatement.getSourceTargetEntries(), targetDatabaseName);
}
@Override
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
index ebcd896549c..e2aee3506b1 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
@@ -49,7 +49,6 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobTy
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.pojo.SourceTargetEntry;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.updatable.MigrateTableStatement;
import org.apache.shardingsphere.data.pipeline.spi.PipelineDataSourceCreator;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -261,20 +260,20 @@ class MigrationJobAPITest {
void assertCreateJobConfigFailedOnMoreThanOneSourceTable() {
List<SourceTargetEntry> sourceTargetEntries = Stream.of("t_order_0",
"t_order_1")
.map(each -> new SourceTargetEntry("logic_db", new
DataNode("ds_0", each), "t_order")).collect(Collectors.toList());
- assertThrows(PipelineInvalidParameterException.class, () ->
jobAPI.schedule(PipelineContextUtils.getContextKey(), new
MigrateTableStatement(sourceTargetEntries, "logic_db")));
+ assertThrows(PipelineInvalidParameterException.class, () ->
jobAPI.schedule(PipelineContextUtils.getContextKey(), sourceTargetEntries,
"logic_db"));
}
@Test
void assertCreateJobConfigFailedOnDataSourceNotExist() {
List<SourceTargetEntry> sourceTargetEntries =
Collections.singletonList(new SourceTargetEntry("logic_db", new
DataNode("ds_not_exists", "t_order"), "t_order"));
- assertThrows(PipelineInvalidParameterException.class, () ->
jobAPI.schedule(PipelineContextUtils.getContextKey(), new
MigrateTableStatement(sourceTargetEntries, "logic_db")));
+ assertThrows(PipelineInvalidParameterException.class, () ->
jobAPI.schedule(PipelineContextUtils.getContextKey(), sourceTargetEntries,
"logic_db"));
}
@Test
void assertCreateJobConfig() throws SQLException {
initIntPrimaryEnvironment();
SourceTargetEntry sourceTargetEntry = new
SourceTargetEntry("logic_db", new DataNode("ds_0", "t_order"), "t_order");
- String jobId = jobAPI.schedule(PipelineContextUtils.getContextKey(),
new MigrateTableStatement(Collections.singletonList(sourceTargetEntry),
"logic_db"));
+ String jobId = jobAPI.schedule(PipelineContextUtils.getContextKey(),
Collections.singletonList(sourceTargetEntry), "logic_db");
MigrationJobConfiguration actual =
jobConfigManager.getJobConfiguration(jobId);
assertThat(actual.getTargetDatabaseName(), is("logic_db"));
List<JobDataNodeLine> dataNodeLines = actual.getJobShardingDataNodes();