This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 45d8398a018 Move schemaName setting logic from MigrationJobAPI to 
MigrateTableExecutor (#36944)
45d8398a018 is described below

commit 45d8398a018a32167d1bc149cc6e4be35f75a9cb
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Oct 27 00:13:58 2025 +0800

    Move schemaName setting logic from MigrationJobAPI to MigrateTableExecutor 
(#36944)
    
    * Move schemaName setting logic from MigrationJobAPI to MigrateTableExecutor
    
    - Move setSchemaName logic from MigrationJobAPI line 147 to 
MigrateTableExecutor
    - Add default schema detection and setting in DataNode creation phase
    - Add necessary imports for database metadata and schema utilities
    - Remove redundant schema setting logic from MigrationJobAPI
    - Maintain exception handling with debug logging for schema detection 
failures
    - Ensure schemaName is set immediately after DataNode creation
    
    This change centralizes schema name setting logic and reduces redundant
    database metadata lookups during migration job configuration.
    
    🤖 Generated with [Claude Code](https://claude.com/claude-code)
    
    Co-Authored-By: Claude <[email protected]>
    
    * Refactor MigrateTableExecutor
    
    * Move schemaName setting logic from MigrationJobAPI to MigrateTableExecutor
    
    * Move schemaName setting logic from MigrationJobAPI to MigrateTableExecutor
    
    * Move schemaName setting logic from MigrationJobAPI to MigrateTableExecutor
    
    ---------
    
    Co-authored-by: Claude <[email protected]>
---
 .../core/metadata/loader/PipelineSchemaUtils.java  | 10 +++-----
 .../scenario/migration/api/MigrationJobAPI.java    |  7 ------
 .../handler/update/MigrateTableExecutor.java       | 28 +++++++++++++++++++---
 3 files changed, 28 insertions(+), 17 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java
index b136fdd94d5..a34eb05c8fd 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java
@@ -20,7 +20,6 @@ package 
org.apache.shardingsphere.data.pipeline.core.metadata.loader;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 
@@ -31,22 +30,19 @@ import java.sql.SQLException;
  * Pipeline schema utility class.
  */
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
-@Slf4j
 public final class PipelineSchemaUtils {
     
     /**
-     * Get default schema by connection.getSchema().
+     * Get default schema.
      *
      * @param dataSourceConfig pipeline data source configuration
-     * @return schema
+     * @return default schema
      */
     @SneakyThrows(SQLException.class)
     public static String getDefaultSchema(final 
PipelineDataSourceConfiguration dataSourceConfig) {
         try (PipelineDataSource dataSource = new 
PipelineDataSource(dataSourceConfig)) {
             try (Connection connection = dataSource.getConnection()) {
-                String result = connection.getSchema();
-                log.info("get default schema {}", result);
-                return result;
+                return connection.getSchema();
             }
         }
     }
diff --git 
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
 
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index 56fa46b90e5..7745861d7a7 100644
--- 
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -37,7 +37,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfi
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSchemaUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
@@ -46,11 +45,9 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.co
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
 import 
org.apache.shardingsphere.database.connector.core.jdbcurl.parser.ConnectionProperties;
 import 
org.apache.shardingsphere.database.connector.core.jdbcurl.parser.ConnectionPropertiesParser;
-import 
org.apache.shardingsphere.database.connector.core.metadata.database.metadata.DialectDatabaseMetaData;
 import 
org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
 import 
org.apache.shardingsphere.database.connector.core.type.DatabaseTypeFactory;
-import 
org.apache.shardingsphere.database.connector.core.type.DatabaseTypeRegistry;
 import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
 import org.apache.shardingsphere.infra.datanode.DataNode;
 import 
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
@@ -142,10 +139,6 @@ public final class MigrationJobAPI implements 
TransmissionJobAPI {
             Map<String, Object> sourceDataSourcePoolProps = 
dataSourceConfigSwapper.swapToMap(metaDataDataSource.get(dataSourceName));
             StandardPipelineDataSourceConfiguration sourceDataSourceConfig = 
new StandardPipelineDataSourceConfiguration(sourceDataSourcePoolProps);
             configSources.put(dataSourceName, 
buildYamlPipelineDataSourceConfiguration(sourceDataSourceConfig.getType(), 
sourceDataSourceConfig.getParameter()));
-            DialectDatabaseMetaData dialectDatabaseMetaData = new 
DatabaseTypeRegistry(sourceDataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData();
-            if (null == each.getSource().getSchemaName() && 
dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable()) {
-                
each.getSource().setSchemaName(PipelineSchemaUtils.getDefaultSchema(sourceDataSourceConfig));
-            }
             DatabaseType sourceDatabaseType = 
sourceDataSourceConfig.getDatabaseType();
             if (null == result.getSourceDatabaseType()) {
                 result.setSourceDatabaseType(sourceDatabaseType.getType());
diff --git 
a/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java
 
b/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java
index aa26404ad9f..5420c2affa7 100644
--- 
a/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java
+++ 
b/kernel/data-pipeline/scenario/migration/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/distsql/handler/update/MigrateTableExecutor.java
@@ -18,25 +18,33 @@
 package 
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.handler.update;
 
 import lombok.Setter;
+import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.MissingRequiredTargetDatabaseException;
 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSchemaUtils;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationSourceTargetEntry;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.segment.MigrationSourceTargetSegment;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.distsql.statement.updatable.MigrateTableStatement;
+import 
org.apache.shardingsphere.database.connector.core.type.DatabaseTypeRegistry;
 import 
org.apache.shardingsphere.distsql.handler.aware.DistSQLExecutorDatabaseAware;
 import 
org.apache.shardingsphere.distsql.handler.engine.update.DistSQLUpdateExecutor;
 import 
org.apache.shardingsphere.distsql.handler.required.DistSQLExecutorClusterModeRequired;
 import org.apache.shardingsphere.infra.datanode.DataNode;
+import 
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
 import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 
 import java.util.Collection;
 import java.util.LinkedList;
+import java.util.Map;
+import java.util.Optional;
 
 /**
  * Migrate table executor.
@@ -53,19 +61,33 @@ public final class MigrateTableExecutor implements 
DistSQLUpdateExecutor<Migrate
         
ShardingSpherePreconditions.checkState(contextManager.getMetaDataContexts().getMetaData().containsDatabase(targetDatabaseName),
                 () -> new 
MissingRequiredTargetDatabaseException(targetDatabaseName));
         MigrationJobAPI jobAPI = (MigrationJobAPI) 
TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");
-        jobAPI.schedule(new PipelineContextKey(InstanceType.PROXY), 
getMigrationSourceTargetEntries(sqlStatement), targetDatabaseName);
+        PipelineContextKey contextKey = new 
PipelineContextKey(InstanceType.PROXY);
+        jobAPI.schedule(contextKey, 
getMigrationSourceTargetEntries(contextKey, sqlStatement), targetDatabaseName);
     }
     
-    private Collection<MigrationSourceTargetEntry> 
getMigrationSourceTargetEntries(final MigrateTableStatement sqlStatement) {
+    private Collection<MigrationSourceTargetEntry> 
getMigrationSourceTargetEntries(final PipelineContextKey contextKey, final 
MigrateTableStatement sqlStatement) {
         Collection<MigrationSourceTargetEntry> result = new LinkedList<>();
         for (MigrationSourceTargetSegment each : 
sqlStatement.getSourceTargetEntries()) {
             DataNode dataNode = new DataNode(each.getSourceDatabaseName(), 
each.getSourceTableName());
-            dataNode.setSchemaName(each.getSourceSchemaName());
+            if (null == each.getSourceSchemaName()) {
+                getDefaultSchemaName(contextKey, 
each.getSourceDatabaseName()).ifPresent(dataNode::setSchemaName);
+            } else {
+                dataNode.setSchemaName(each.getSourceSchemaName());
+            }
             result.add(new MigrationSourceTargetEntry(dataNode, 
each.getTargetTableName()));
         }
         return result;
     }
     
+    private Optional<String> getDefaultSchemaName(final PipelineContextKey 
contextKey, final String sourceDatabaseName) {
+        if (new 
DatabaseTypeRegistry(database.getProtocolType()).getDialectDatabaseMetaData().getSchemaOption().isSchemaAvailable())
 {
+            Map<String, DataSourcePoolProperties> metaDataDataSource = new 
PipelineDataSourcePersistService().load(contextKey, "MIGRATION");
+            Map<String, Object> sourceDataSourcePoolProps = new 
YamlDataSourceConfigurationSwapper().swapToMap(metaDataDataSource.get(sourceDatabaseName));
+            return Optional.of(PipelineSchemaUtils.getDefaultSchema(new 
StandardPipelineDataSourceConfiguration(sourceDataSourcePoolProps)));
+        }
+        return Optional.empty();
+    }
+    
     @Override
     public Class<MigrateTableStatement> getType() {
         return MigrateTableStatement.class;

Reply via email to