This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 499cb457f31 Add MigrationSourceTargetSegment (#36100)
499cb457f31 is described below
commit 499cb457f31e61fb168ee0d1e7765f37f592aeed
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Jul 29 13:22:26 2025 +0800
Add MigrationSourceTargetSegment (#36100)
---
.../distsql/handler/update/MigrateTableExecutor.java | 17 ++++++++++++++++-
.../parser/core/MigrationDistSQLStatementVisitor.java | 15 ++++++---------
.../MigrationSourceTargetSegment.java} | 19 +++++++++++--------
.../statement/updatable/MigrateTableStatement.java | 4 ++--
.../migration/update/MigrateTableStatementAssert.java | 9 +++++----
5 files changed, 40 insertions(+), 24 deletions(-)
diff --git
a/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java
b/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java
index 9f968afc826..74622b9e488 100644
---
a/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java
+++
b/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java
@@ -22,16 +22,21 @@ import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.MissingRequiredTargetDatabaseException;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.segment.MigrationSourceTargetSegment;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.pojo.SourceTargetEntry;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.updatable.MigrateTableStatement;
import
org.apache.shardingsphere.distsql.handler.aware.DistSQLExecutorDatabaseAware;
import
org.apache.shardingsphere.distsql.handler.engine.update.DistSQLUpdateExecutor;
import
org.apache.shardingsphere.distsql.handler.required.DistSQLExecutorClusterModeRequired;
+import org.apache.shardingsphere.infra.datanode.DataNode;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.mode.manager.ContextManager;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Objects;
/**
@@ -53,7 +58,17 @@ public final class MigrateTableExecutor implements
DistSQLUpdateExecutor<Migrate
targetDatabaseName = database.getName();
}
MigrationJobAPI jobAPI = (MigrationJobAPI)
TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");
- jobAPI.schedule(new PipelineContextKey(InstanceType.PROXY),
sqlStatement.getSourceTargetEntries(), targetDatabaseName);
+ jobAPI.schedule(new PipelineContextKey(InstanceType.PROXY),
getSourceTargetEntries(sqlStatement), targetDatabaseName);
+ }
+
+ private List<SourceTargetEntry> getSourceTargetEntries(final
MigrateTableStatement sqlStatement) {
+ List<SourceTargetEntry> result = new
ArrayList<>(sqlStatement.getSourceTargetEntries().size());
+ for (MigrationSourceTargetSegment each :
sqlStatement.getSourceTargetEntries()) {
+ DataNode dataNode = new DataNode(each.getSourceDatabaseName(),
each.getSourceTableName());
+ dataNode.setSchemaName(each.getSourceSchemaName());
+ result.add(new SourceTargetEntry(each.getTargetDatabaseName(),
dataNode, each.getTargetTableName()));
+ }
+ return result;
}
@Override
diff --git
a/kernel/data-pipeline/scenario/migration/distsql/parser/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java
b/kernel/data-pipeline/scenario/migration/distsql/parser/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java
index 6313649fd81..a1bc74b21a9 100644
---
a/kernel/data-pipeline/scenario/migration/distsql/parser/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java
+++
b/kernel/data-pipeline/scenario/migration/distsql/parser/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/parser/core/MigrationDistSQLStatementVisitor.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.parse
import com.google.common.base.Splitter;
import org.antlr.v4.runtime.tree.ParseTree;
import
org.apache.shardingsphere.data.pipeline.distsql.statement.updatable.AlterTransmissionRuleStatement;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.pojo.SourceTargetEntry;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.segment.MigrationSourceTargetSegment;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.queryable.ShowMigrationCheckStatusStatement;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.queryable.ShowMigrationListStatement;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.queryable.ShowMigrationRuleStatement;
@@ -80,7 +80,6 @@ import
org.apache.shardingsphere.distsql.segment.TransmissionRuleSegment;
import org.apache.shardingsphere.distsql.segment.URLBasedDataSourceSegment;
import
org.apache.shardingsphere.distsql.statement.ral.queryable.show.ShowPluginsStatement;
import
org.apache.shardingsphere.infra.database.core.metadata.database.enums.QuoteCharacter;
-import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.sql.parser.api.ASTNode;
import org.apache.shardingsphere.sql.parser.api.visitor.SQLVisitor;
import
org.apache.shardingsphere.sql.parser.statement.core.value.identifier.IdentifierValue;
@@ -160,21 +159,19 @@ public final class MigrationDistSQLStatementVisitor
extends MigrationDistSQLStat
@Override
public ASTNode visitMigrateTable(final MigrateTableContext ctx) {
- SourceTargetEntry sourceTargetEntry =
buildSourceTargetEntry(ctx.sourceTableName(), ctx.targetTableName());
- return new
MigrateTableStatement(Collections.singletonList(sourceTargetEntry),
sourceTargetEntry.getTargetDatabaseName());
+ MigrationSourceTargetSegment migrationSourceTargetSegment =
buildMigrationSourceTargetSegment(ctx.sourceTableName(), ctx.targetTableName());
+ return new
MigrateTableStatement(Collections.singletonList(migrationSourceTargetSegment),
migrationSourceTargetSegment.getTargetDatabaseName());
}
- private SourceTargetEntry buildSourceTargetEntry(final
SourceTableNameContext sourceContext, final TargetTableNameContext
targetContext) {
+ private MigrationSourceTargetSegment
buildMigrationSourceTargetSegment(final SourceTableNameContext sourceContext,
final TargetTableNameContext targetContext) {
List<String> source =
Splitter.on('.').splitToList(getRequiredIdentifierValue(sourceContext));
List<String> target =
Splitter.on('.').splitToList(getRequiredIdentifierValue(targetContext));
- String sourceResourceName = source.get(0);
+ String sourceDatabaseName = source.get(0);
String sourceSchemaName = 3 == source.size() ? source.get(1) : null;
String sourceTableName = source.get(source.size() - 1);
String targetDatabaseName = target.size() > 1 ? target.get(0) : null;
String targetTableName = target.get(target.size() - 1);
- SourceTargetEntry result = new SourceTargetEntry(targetDatabaseName,
new DataNode(sourceResourceName, sourceTableName), targetTableName);
- result.getSource().setSchemaName(sourceSchemaName);
- return result;
+ return new MigrationSourceTargetSegment(sourceDatabaseName,
sourceSchemaName, sourceTableName, targetDatabaseName, targetTableName);
}
private String getRequiredIdentifierValue(final ParseTree context) {
diff --git
a/kernel/data-pipeline/scenario/migration/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/statement/updatable/MigrateTableStatement.java
b/kernel/data-pipeline/scenario/migration/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/segment/MigrationSourceTargetSegment.java
similarity index 70%
copy from
kernel/data-pipeline/scenario/migration/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/statement/updatable/MigrateTableStatement.java
copy to
kernel/data-pipeline/scenario/migration/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/segment/MigrationSourceTargetSegment.java
index 19ef08fed81..011c17bb7cb 100644
---
a/kernel/data-pipeline/scenario/migration/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/statement/updatable/MigrateTableStatement.java
+++
b/kernel/data-pipeline/scenario/migration/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/segment/MigrationSourceTargetSegment.java
@@ -15,23 +15,26 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.updatable;
+package
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.segment;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.data.pipeline.distsql.statement.updatable.UpdatablePipelineRALStatement;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.pojo.SourceTargetEntry;
-
-import java.util.List;
+import org.apache.shardingsphere.distsql.segment.DistSQLSegment;
/**
- * Migrate table statement.
+ * Migration source target segment.
*/
@RequiredArgsConstructor
@Getter
-public final class MigrateTableStatement extends UpdatablePipelineRALStatement
{
+public final class MigrationSourceTargetSegment implements DistSQLSegment {
+
+ private final String sourceDatabaseName;
- private final List<SourceTargetEntry> sourceTargetEntries;
+ private final String sourceSchemaName;
+
+ private final String sourceTableName;
private final String targetDatabaseName;
+
+ private final String targetTableName;
}
diff --git
a/kernel/data-pipeline/scenario/migration/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/statement/updatable/MigrateTableStatement.java
b/kernel/data-pipeline/scenario/migration/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/statement/updatable/MigrateTableStatement.java
index 19ef08fed81..fb325c648ba 100644
---
a/kernel/data-pipeline/scenario/migration/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/statement/updatable/MigrateTableStatement.java
+++
b/kernel/data-pipeline/scenario/migration/distsql/statement/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/statement/updatable/MigrateTableStatement.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.state
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.distsql.statement.updatable.UpdatablePipelineRALStatement;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.pojo.SourceTargetEntry;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.segment.MigrationSourceTargetSegment;
import java.util.List;
@@ -31,7 +31,7 @@ import java.util.List;
@Getter
public final class MigrateTableStatement extends UpdatablePipelineRALStatement
{
- private final List<SourceTargetEntry> sourceTargetEntries;
+ private final List<MigrationSourceTargetSegment> sourceTargetEntries;
private final String targetDatabaseName;
}
diff --git
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/type/pipeline/migration/update/MigrateTableStatementAssert.java
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/type/pipeline/migration/update/MigrateTableStatementAssert.java
index 6d9d2a6e7f3..6835aea7cfc 100644
---
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/type/pipeline/migration/update/MigrateTableStatementAssert.java
+++
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/type/pipeline/migration/update/MigrateTableStatementAssert.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.pojo.SourceTargetEntry;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.segment.MigrationSourceTargetSegment;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.updatable.MigrateTableStatement;
import org.apache.shardingsphere.infra.datanode.DataNode;
import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
@@ -44,11 +44,12 @@ public final class MigrateTableStatementAssert {
public static void assertIs(final SQLCaseAssertContext assertContext,
final MigrateTableStatement actual, final MigrateTableStatementTestCase
expected) {
assertThat(assertContext.getText("target database name does not
match"), actual.getTargetDatabaseName(), is(expected.getTargetDatabaseName()));
assertThat(actual.getSourceTargetEntries().size(), is(1));
- SourceTargetEntry entry = actual.getSourceTargetEntries().get(0);
- DataNode dataNode = entry.getSource();
+ MigrationSourceTargetSegment segment =
actual.getSourceTargetEntries().get(0);
+ DataNode dataNode = new DataNode(segment.getSourceDatabaseName(),
segment.getSourceTableName());
+ dataNode.setSchemaName(segment.getSourceSchemaName());
assertThat(assertContext.getText("source database name does not
match"), dataNode.getDataSourceName(), is(expected.getSourceResourceName()));
assertThat(assertContext.getText("source schema name does not match"),
dataNode.getSchemaName(), is(expected.getSourceSchemaName()));
assertThat(assertContext.getText("source table name does not match"),
dataNode.getTableName(), is(expected.getSourceTableName()));
- assertThat(assertContext.getText("target table name does not match"),
entry.getTargetTableName(), is(expected.getTargetTableName()));
+ assertThat(assertContext.getText("target table name does not match"),
segment.getTargetTableName(), is(expected.getTargetTableName()));
}
}