This is an automated email from the ASF dual-hosted git repository.

chengzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 94962c312fb Remove PipelineJobPreparer.prepareTargetSchema() (#29441)
94962c312fb is described below

commit 94962c312fb7b286bf55214c5a84ef69941db287
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Dec 18 19:48:34 2023 +0800

    Remove PipelineJobPreparer.prepareTargetSchema() (#29441)
    
    * Refactor PipelineJobPreparer
    
    * Remove PipelineJobPreparer.prepareTargetSchema()
---
 .../pipeline/core/preparer/PipelineJobPreparer.java     | 17 +----------------
 .../migration/preparer/MigrationJobPreparer.java        | 11 +++++++----
 2 files changed, 8 insertions(+), 20 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java
index 51d3a5e17d1..dc2475fdaec 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java
@@ -31,7 +31,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPositi
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.spi.ingest.dumper.IncrementalDumperCreator;
 import 
org.apache.shardingsphere.data.pipeline.core.spi.ingest.position.PositionInitializer;
@@ -67,18 +66,6 @@ public final class PipelineJobPreparer {
         return 
DatabaseTypedSPILoader.findService(IncrementalDumperCreator.class, 
databaseType).map(IncrementalDumperCreator::isSupportIncrementalDump).orElse(false);
     }
     
-    /**
-     * Prepare target schema.
-     *
-     * @param param prepare target schemas parameter
-     * @throws SQLException if prepare target schema fail
-     */
-    public void prepareTargetSchema(final PrepareTargetSchemasParameter param) 
throws SQLException {
-        DialectPipelineJobDataSourcePrepareOption option = 
DatabaseTypedSPILoader.findService(DialectPipelineJobDataSourcePrepareOption.class,
 databaseType)
-                .orElseGet(() -> 
DatabaseTypedSPILoader.getService(DialectPipelineJobDataSourcePrepareOption.class,
 null));
-        new PipelineJobDataSourcePreparer(option).prepareTargetSchemas(param);
-    }
-    
     /**
      * Get SQL parser engine.
      *
@@ -96,10 +83,8 @@ public final class PipelineJobPreparer {
      * @throws SQLException SQL exception
      */
     public void prepareTargetTables(final PrepareTargetTablesParameter 
prepareTargetTablesParam) throws SQLException {
-        DialectPipelineJobDataSourcePrepareOption option = 
DatabaseTypedSPILoader.findService(DialectPipelineJobDataSourcePrepareOption.class,
 databaseType)
-                .orElseGet(() -> 
DatabaseTypedSPILoader.getService(DialectPipelineJobDataSourcePrepareOption.class,
 null));
         long startTimeMillis = System.currentTimeMillis();
-        new 
PipelineJobDataSourcePreparer(option).prepareTargetTables(prepareTargetTablesParam);
+        new 
PipelineJobDataSourcePreparer(DatabaseTypedSPILoader.getService(DialectPipelineJobDataSourcePrepareOption.class,
 databaseType)).prepareTargetTables(prepareTargetTablesParam);
         log.info("prepareTargetTables cost {} ms", System.currentTimeMillis() 
- startTimeMillis);
     }
     
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index cf01823a977..299f9d35b68 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -20,6 +20,8 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.migration.preparer;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer;
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
@@ -59,6 +61,7 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.Migrati
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
 import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnsupportedSQLOperationException;
 import org.apache.shardingsphere.infra.lock.GlobalLockNames;
@@ -166,15 +169,15 @@ public final class MigrationJobPreparer {
     private void prepareTarget(final MigrationJobItemContext jobItemContext) 
throws SQLException {
         MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
         Collection<CreateTableConfiguration> createTableConfigs = 
jobItemContext.getTaskConfig().getCreateTableConfigurations();
+        DatabaseType targetDatabaseType = 
jobItemContext.getJobConfig().getTargetDatabaseType();
         PipelineDataSourceManager dataSourceManager = 
jobItemContext.getDataSourceManager();
-        PrepareTargetSchemasParameter prepareTargetSchemasParam = new 
PrepareTargetSchemasParameter(jobItemContext.getJobConfig().getTargetDatabaseType(),
 createTableConfigs, dataSourceManager);
-        PipelineJobPreparer targetDataSourcePreparer = new 
PipelineJobPreparer(jobItemContext.getJobConfig().getTargetDatabaseType());
-        
targetDataSourcePreparer.prepareTargetSchema(prepareTargetSchemasParam);
+        PrepareTargetSchemasParameter prepareTargetSchemasParam = new 
PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs, 
dataSourceManager);
+        new 
PipelineJobDataSourcePreparer(DatabaseTypedSPILoader.getService(DialectPipelineJobDataSourcePrepareOption.class,
 targetDatabaseType)).prepareTargetSchemas(prepareTargetSchemasParam);
         ShardingSphereMetaData metaData = 
PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())).getContextManager().getMetaDataContexts().getMetaData();
         SQLParserEngine sqlParserEngine = new PipelineJobPreparer(
                 
metaData.getDatabase(jobConfig.getTargetDatabaseName()).getProtocolType().getTrunkDatabaseType().orElse(metaData.getDatabase(jobConfig.getTargetDatabaseName()).getProtocolType()))
                         .getSQLParserEngine(metaData);
-        targetDataSourcePreparer.prepareTargetTables(new 
PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, 
sqlParserEngine));
+        new PipelineJobPreparer(targetDatabaseType).prepareTargetTables(new 
PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, 
sqlParserEngine));
     }
     
     private void prepareIncremental(final MigrationJobItemContext 
jobItemContext) {

Reply via email to