This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 3022ab5cf1d Rename MigrateSourceTargetEntry (#36104)
3022ab5cf1d is described below
commit 3022ab5cf1d0ba6389ce772233faa593b77647f7
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Jul 29 15:12:19 2025 +0800
Rename MigrateSourceTargetEntry (#36104)
* Rename MigrateSourceTargetEntry
* Rename MigrateSourceTargetEntry
---
.../data/pipeline/scenario/migration/api/MigrationJobAPI.java | 11 ++++++-----
.../distsql/handler/update/MigrateTableExecutor.java | 8 ++++----
.../{SourceTargetEntry.java => MigrateSourceTargetEntry.java} | 8 ++++----
.../pipeline/scenario/migration/api/MigrationJobAPITest.java | 10 +++++-----
4 files changed, 19 insertions(+), 18 deletions(-)
diff --git
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index 9e7d586864b..992b42e8f7f 100644
---
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -44,7 +44,7 @@ import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.config.YamlMigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.pojo.SourceTargetEntry;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.pojo.MigrateSourceTargetEntry;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import
org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
import
org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser;
@@ -115,21 +115,22 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
* @param targetDatabaseName target database name
* @return job id
*/
- public String schedule(final PipelineContextKey contextKey, final
Collection<SourceTargetEntry> sourceTargetEntries, final String
targetDatabaseName) {
+ public String schedule(final PipelineContextKey contextKey, final
Collection<MigrateSourceTargetEntry> sourceTargetEntries, final String
targetDatabaseName) {
MigrationJobConfiguration jobConfig = new
YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey,
sourceTargetEntries, targetDatabaseName));
jobManager.start(jobConfig);
return jobConfig.getJobId();
}
- private YamlMigrationJobConfiguration buildYamlJobConfiguration(final
PipelineContextKey contextKey, final Collection<SourceTargetEntry>
sourceTargetEntries, final String targetDatabaseName) {
+ private YamlMigrationJobConfiguration buildYamlJobConfiguration(final
PipelineContextKey contextKey,
+ final
Collection<MigrateSourceTargetEntry> sourceTargetEntries, final String
targetDatabaseName) {
YamlMigrationJobConfiguration result = new
YamlMigrationJobConfiguration();
result.setTargetDatabaseName(targetDatabaseName);
Map<String, DataSourcePoolProperties> metaDataDataSource =
dataSourcePersistService.load(contextKey, "MIGRATION");
Map<String, List<DataNode>> sourceDataNodes = new
LinkedHashMap<>(sourceTargetEntries.size(), 1F);
Map<String, YamlPipelineDataSourceConfiguration> configSources = new
LinkedHashMap<>(sourceTargetEntries.size(), 1F);
YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new
YamlDataSourceConfigurationSwapper();
- for (SourceTargetEntry each : new
HashSet<>(sourceTargetEntries).stream()
-
.sorted(Comparator.comparing(SourceTargetEntry::getTargetTableName).thenComparing(each
-> each.getSource().format())).collect(Collectors.toList())) {
+ for (MigrateSourceTargetEntry each : new
HashSet<>(sourceTargetEntries).stream()
+
.sorted(Comparator.comparing(MigrateSourceTargetEntry::getTargetTableName).thenComparing(each
-> each.getSource().format())).collect(Collectors.toList())) {
sourceDataNodes.computeIfAbsent(each.getTargetTableName(), key ->
new LinkedList<>()).add(each.getSource());
ShardingSpherePreconditions.checkState(1 ==
sourceDataNodes.get(each.getTargetTableName()).size(),
() -> new PipelineInvalidParameterException("more than one
source table for " + each.getTargetTableName()));
diff --git
a/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java
b/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java
index 4c3aa415a80..ccc84101e56 100644
---
a/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java
+++
b/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java
@@ -23,7 +23,7 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.job.MissingRequire
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.segment.MigrationSourceTargetSegment;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.pojo.SourceTargetEntry;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.pojo.MigrateSourceTargetEntry;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.updatable.MigrateTableStatement;
import
org.apache.shardingsphere.distsql.handler.aware.DistSQLExecutorDatabaseAware;
import
org.apache.shardingsphere.distsql.handler.engine.update.DistSQLUpdateExecutor;
@@ -56,12 +56,12 @@ public final class MigrateTableExecutor implements
DistSQLUpdateExecutor<MigrateTableStatement>
jobAPI.schedule(new PipelineContextKey(InstanceType.PROXY),
getSourceTargetEntries(sqlStatement), targetDatabaseName);
}
- private Collection<SourceTargetEntry> getSourceTargetEntries(final
MigrateTableStatement sqlStatement) {
- Collection<SourceTargetEntry> result = new LinkedList<>();
+ private Collection<MigrateSourceTargetEntry> getSourceTargetEntries(final
MigrateTableStatement sqlStatement) {
+ Collection<MigrateSourceTargetEntry> result = new LinkedList<>();
for (MigrationSourceTargetSegment each :
sqlStatement.getSourceTargetEntries()) {
DataNode dataNode = new DataNode(each.getSourceDatabaseName(),
each.getSourceTableName());
dataNode.setSchemaName(each.getSourceSchemaName());
- result.add(new SourceTargetEntry(each.getTargetDatabaseName(),
dataNode, each.getTargetTableName()));
+ result.add(new MigrateSourceTargetEntry(dataNode,
each.getTargetDatabaseName(), each.getTargetTableName()));
}
return result;
}
diff --git
a/kernel/data-pipeline/scenario/migration/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/statement/pojo/SourceTargetEntry.java
b/kernel/data-pipeline/scenario/migration/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/statement/pojo/MigrateSourceTargetEntry.java
similarity index 94%
rename from
kernel/data-pipeline/scenario/migration/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/statement/pojo/SourceTargetEntry.java
rename to
kernel/data-pipeline/scenario/migration/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/statement/pojo/MigrateSourceTargetEntry.java
index a0e16ccaaf7..a8511f2c069 100644
---
a/kernel/data-pipeline/scenario/migration/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/statement/pojo/SourceTargetEntry.java
+++
b/kernel/data-pipeline/scenario/migration/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/statement/pojo/MigrateSourceTargetEntry.java
@@ -23,16 +23,16 @@ import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.datanode.DataNode;
/**
- * Source target entry.
+ * Migrate source target entry.
*/
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode(of = {"source", "targetTableName"})
-public final class SourceTargetEntry {
-
- private final String targetDatabaseName;
+public final class MigrateSourceTargetEntry {
private final DataNode source;
+ private final String targetDatabaseName;
+
private final String targetTableName;
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
index 2a6d87d9107..9cbe3cd6e7f 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
@@ -48,7 +48,7 @@ import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.pojo.SourceTargetEntry;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.pojo.MigrateSourceTargetEntry;
import org.apache.shardingsphere.data.pipeline.spi.PipelineDataSourceCreator;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -258,21 +258,21 @@ class MigrationJobAPITest {
@Test
void assertCreateJobConfigFailedOnMoreThanOneSourceTable() {
- Collection<SourceTargetEntry> sourceTargetEntries =
Stream.of("t_order_0", "t_order_1")
- .map(each -> new SourceTargetEntry("logic_db", new
DataNode("ds_0", each), "t_order")).collect(Collectors.toList());
+ Collection<MigrateSourceTargetEntry> sourceTargetEntries =
Stream.of("t_order_0", "t_order_1")
+ .map(each -> new MigrateSourceTargetEntry(new DataNode("ds_0",
each), "logic_db", "t_order")).collect(Collectors.toList());
assertThrows(PipelineInvalidParameterException.class, () ->
jobAPI.schedule(PipelineContextUtils.getContextKey(), sourceTargetEntries,
"logic_db"));
}
@Test
void assertCreateJobConfigFailedOnDataSourceNotExist() {
- Collection<SourceTargetEntry> sourceTargetEntries =
Collections.singleton(new SourceTargetEntry("logic_db", new
DataNode("ds_not_exists", "t_order"), "t_order"));
+ Collection<MigrateSourceTargetEntry> sourceTargetEntries =
Collections.singleton(new MigrateSourceTargetEntry(new
DataNode("ds_not_exists", "t_order"), "logic_db", "t_order"));
assertThrows(PipelineInvalidParameterException.class, () ->
jobAPI.schedule(PipelineContextUtils.getContextKey(), sourceTargetEntries,
"logic_db"));
}
@Test
void assertCreateJobConfig() throws SQLException {
initIntPrimaryEnvironment();
- SourceTargetEntry sourceTargetEntry = new
SourceTargetEntry("logic_db", new DataNode("ds_0", "t_order"), "t_order");
+ MigrateSourceTargetEntry sourceTargetEntry = new
MigrateSourceTargetEntry(new DataNode("ds_0", "t_order"), "logic_db",
"t_order");
String jobId = jobAPI.schedule(PipelineContextUtils.getContextKey(),
Collections.singleton(sourceTargetEntry), "logic_db");
MigrationJobConfiguration actual =
jobConfigManager.getJobConfiguration(jobId);
assertThat(actual.getTargetDatabaseName(), is("logic_db"));