This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 45d8398a018 Move schemaName setting logic from MigrationJobAPI to
MigrateTableExecutor (#36944)
45d8398a018 is described below
commit 45d8398a018a32167d1bc149cc6e4be35f75a9cb
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Oct 27 00:13:58 2025 +0800
Move schemaName setting logic from MigrationJobAPI to MigrateTableExecutor
(#36944)
* Move schemaName setting logic from MigrationJobAPI to MigrateTableExecutor
- Move setSchemaName logic from MigrationJobAPI line 147 to
MigrateTableExecutor
- Add default schema detection and setting in DataNode creation phase
- Add necessary imports for database metadata and schema utilities
- Remove redundant schema setting logic from MigrationJobAPI
- Maintain exception handling with debug logging for schema detection
failures
- Ensure schemaName is set immediately after DataNode creation
This change centralizes schema name setting logic and reduces redundant
database metadata lookups during migration job configuration.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <[email protected]>
* Refactor MigrateTableExecutor
* Move schemaName setting logic from MigrationJobAPI to MigrateTableExecutor
* Move schemaName setting logic from MigrationJobAPI to MigrateTableExecutor
* Move schemaName setting logic from MigrationJobAPI to MigrateTableExecutor
---------
Co-authored-by: Claude <[email protected]>
---
.../core/metadata/loader/PipelineSchemaUtils.java | 10 +++-----
.../scenario/migration/api/MigrationJobAPI.java | 7 ------
.../handler/update/MigrateTableExecutor.java | 28 +++++++++++++++++++---
3 files changed, 28 insertions(+), 17 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java
index b136fdd94d5..a34eb05c8fd 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java
@@ -20,7 +20,6 @@ package
org.apache.shardingsphere.data.pipeline.core.metadata.loader;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
@@ -31,22 +30,19 @@ import java.sql.SQLException;
* Pipeline schema utility class.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
-@Slf4j
public final class PipelineSchemaUtils {
/**
- * Get default schema by connection.getSchema().
+ * Get default schema.
*
* @param dataSourceConfig pipeline data source configuration
- * @return schema
+ * @return default schema
*/
@SneakyThrows(SQLException.class)
public static String getDefaultSchema(final
PipelineDataSourceConfiguration dataSourceConfig) {
try (PipelineDataSource dataSource = new
PipelineDataSource(dataSourceConfig)) {
try (Connection connection = dataSource.getConnection()) {
- String result = connection.getSchema();
- log.info("get default schema {}", result);
- return result;
+ return connection.getSchema();
}
}
}
diff --git
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index 56fa46b90e5..7745861d7a7 100644
---
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -37,7 +37,6 @@ import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfi
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSchemaUtils;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
@@ -46,11 +45,9 @@ import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.co
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
import
org.apache.shardingsphere.database.connector.core.jdbcurl.parser.ConnectionProperties;
import
org.apache.shardingsphere.database.connector.core.jdbcurl.parser.ConnectionPropertiesParser;
-import
org.apache.shardingsphere.database.connector.core.metadata.database.metadata.DialectDatabaseMetaData;
import
org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import
org.apache.shardingsphere.database.connector.core.type.DatabaseTypeFactory;
-import
org.apache.shardingsphere.database.connector.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import org.apache.shardingsphere.infra.datanode.DataNode;
import
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
@@ -142,10 +139,6 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
Map<String, Object> sourceDataSourcePoolProps =
dataSourceConfigSwapper.swapToMap(metaDataDataSource.get(dataSourceName));
StandardPipelineDataSourceConfiguration sourceDataSourceConfig =
new StandardPipelineDataSourceConfiguration(sourceDataSourcePoolProps);
configSources.put(dataSourceName,
buildYamlPipelineDataSourceConfiguration(sourceDataSourceConfig.getType(),
sourceDataSourceConfig.getParameter()));
- DialectDatabaseMetaData dialectDatabaseMetaData = new
DatabaseTypeRegistry(sourceDataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData();
- if (null == each.getSource().getSchemaName() &&
dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable()) {
-
each.getSource().setSchemaName(PipelineSchemaUtils.getDefaultSchema(sourceDataSourceConfig));
- }
DatabaseType sourceDatabaseType =
sourceDataSourceConfig.getDatabaseType();
if (null == result.getSourceDatabaseType()) {
result.setSourceDatabaseType(sourceDatabaseType.getType());
diff --git
a/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java
b/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java
index aa26404ad9f..5420c2affa7 100644
---
a/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java
+++
b/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java
@@ -18,25 +18,33 @@
package
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.handler.update;
import lombok.Setter;
+import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.MissingRequiredTargetDatabaseException;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSchemaUtils;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationSourceTargetEntry;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.segment.MigrationSourceTargetSegment;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.updatable.MigrateTableStatement;
+import
org.apache.shardingsphere.database.connector.core.type.DatabaseTypeRegistry;
import
org.apache.shardingsphere.distsql.handler.aware.DistSQLExecutorDatabaseAware;
import
org.apache.shardingsphere.distsql.handler.engine.update.DistSQLUpdateExecutor;
import
org.apache.shardingsphere.distsql.handler.required.DistSQLExecutorClusterModeRequired;
import org.apache.shardingsphere.infra.datanode.DataNode;
+import
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
import org.apache.shardingsphere.mode.manager.ContextManager;
import java.util.Collection;
import java.util.LinkedList;
+import java.util.Map;
+import java.util.Optional;
/**
* Migrate table executor.
@@ -53,19 +61,33 @@ public final class MigrateTableExecutor implements
DistSQLUpdateExecutor<Migrate
ShardingSpherePreconditions.checkState(contextManager.getMetaDataContexts().getMetaData().containsDatabase(targetDatabaseName),
() -> new
MissingRequiredTargetDatabaseException(targetDatabaseName));
MigrationJobAPI jobAPI = (MigrationJobAPI)
TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");
- jobAPI.schedule(new PipelineContextKey(InstanceType.PROXY),
getMigrationSourceTargetEntries(sqlStatement), targetDatabaseName);
+ PipelineContextKey contextKey = new
PipelineContextKey(InstanceType.PROXY);
+ jobAPI.schedule(contextKey,
getMigrationSourceTargetEntries(contextKey, sqlStatement), targetDatabaseName);
}
- private Collection<MigrationSourceTargetEntry>
getMigrationSourceTargetEntries(final MigrateTableStatement sqlStatement) {
+ private Collection<MigrationSourceTargetEntry>
getMigrationSourceTargetEntries(final PipelineContextKey contextKey, final
MigrateTableStatement sqlStatement) {
Collection<MigrationSourceTargetEntry> result = new LinkedList<>();
for (MigrationSourceTargetSegment each :
sqlStatement.getSourceTargetEntries()) {
DataNode dataNode = new DataNode(each.getSourceDatabaseName(),
each.getSourceTableName());
- dataNode.setSchemaName(each.getSourceSchemaName());
+ if (null == each.getSourceSchemaName()) {
+ getDefaultSchemaName(contextKey,
each.getSourceDatabaseName()).ifPresent(dataNode::setSchemaName);
+ } else {
+ dataNode.setSchemaName(each.getSourceSchemaName());
+ }
result.add(new MigrationSourceTargetEntry(dataNode,
each.getTargetTableName()));
}
return result;
}
+ private Optional<String> getDefaultSchemaName(final PipelineContextKey
contextKey, final String sourceDatabaseName) {
+ if (new
DatabaseTypeRegistry(database.getProtocolType()).getDialectDatabaseMetaData().getSchemaOption().isSchemaAvailable())
{
+ Map<String, DataSourcePoolProperties> metaDataDataSource = new
PipelineDataSourcePersistService().load(contextKey, "MIGRATION");
+ Map<String, Object> sourceDataSourcePoolProps = new
YamlDataSourceConfigurationSwapper().swapToMap(metaDataDataSource.get(sourceDatabaseName));
+ return Optional.of(PipelineSchemaUtils.getDefaultSchema(new
StandardPipelineDataSourceConfiguration(sourceDataSourcePoolProps)));
+ }
+ return Optional.empty();
+ }
+
@Override
public Class<MigrateTableStatement> getType() {
return MigrateTableStatement.class;