This is an automated email from the ASF dual-hosted git repository.
chengzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 94962c312fb Remove PipelineJobPreparer.prepareTargetSchema() (#29441)
94962c312fb is described below
commit 94962c312fb7b286bf55214c5a84ef69941db287
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Dec 18 19:48:34 2023 +0800
Remove PipelineJobPreparer.prepareTargetSchema() (#29441)
* Refactor PipelineJobPreparer
* Remove PipelineJobPreparer.prepareTargetSchema()
---
.../pipeline/core/preparer/PipelineJobPreparer.java | 17 +----------------
.../migration/preparer/MigrationJobPreparer.java | 11 +++++++----
2 files changed, 8 insertions(+), 20 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java
index 51d3a5e17d1..dc2475fdaec 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java
@@ -31,7 +31,6 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPositi
import
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
import
org.apache.shardingsphere.data.pipeline.core.spi.ingest.dumper.IncrementalDumperCreator;
import
org.apache.shardingsphere.data.pipeline.core.spi.ingest.position.PositionInitializer;
@@ -67,18 +66,6 @@ public final class PipelineJobPreparer {
return
DatabaseTypedSPILoader.findService(IncrementalDumperCreator.class,
databaseType).map(IncrementalDumperCreator::isSupportIncrementalDump).orElse(false);
}
- /**
- * Prepare target schema.
- *
- * @param param prepare target schemas parameter
- * @throws SQLException if prepare target schema fail
- */
- public void prepareTargetSchema(final PrepareTargetSchemasParameter param)
throws SQLException {
- DialectPipelineJobDataSourcePrepareOption option =
DatabaseTypedSPILoader.findService(DialectPipelineJobDataSourcePrepareOption.class,
databaseType)
- .orElseGet(() ->
DatabaseTypedSPILoader.getService(DialectPipelineJobDataSourcePrepareOption.class,
null));
- new PipelineJobDataSourcePreparer(option).prepareTargetSchemas(param);
- }
-
/**
* Get SQL parser engine.
*
@@ -96,10 +83,8 @@ public final class PipelineJobPreparer {
* @throws SQLException SQL exception
*/
public void prepareTargetTables(final PrepareTargetTablesParameter
prepareTargetTablesParam) throws SQLException {
- DialectPipelineJobDataSourcePrepareOption option =
DatabaseTypedSPILoader.findService(DialectPipelineJobDataSourcePrepareOption.class,
databaseType)
- .orElseGet(() ->
DatabaseTypedSPILoader.getService(DialectPipelineJobDataSourcePrepareOption.class,
null));
long startTimeMillis = System.currentTimeMillis();
- new
PipelineJobDataSourcePreparer(option).prepareTargetTables(prepareTargetTablesParam);
+ new
PipelineJobDataSourcePreparer(DatabaseTypedSPILoader.getService(DialectPipelineJobDataSourcePrepareOption.class,
databaseType)).prepareTargetTables(prepareTargetTablesParam);
log.info("prepareTargetTables cost {} ms", System.currentTimeMillis()
- startTimeMillis);
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index cf01823a977..299f9d35b68 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -20,6 +20,8 @@ package
org.apache.shardingsphere.data.pipeline.scenario.migration.preparer;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
@@ -59,6 +61,7 @@ import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.Migrati
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import
org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnsupportedSQLOperationException;
import org.apache.shardingsphere.infra.lock.GlobalLockNames;
@@ -166,15 +169,15 @@ public final class MigrationJobPreparer {
private void prepareTarget(final MigrationJobItemContext jobItemContext)
throws SQLException {
MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
Collection<CreateTableConfiguration> createTableConfigs =
jobItemContext.getTaskConfig().getCreateTableConfigurations();
+ DatabaseType targetDatabaseType =
jobItemContext.getJobConfig().getTargetDatabaseType();
PipelineDataSourceManager dataSourceManager =
jobItemContext.getDataSourceManager();
- PrepareTargetSchemasParameter prepareTargetSchemasParam = new
PrepareTargetSchemasParameter(jobItemContext.getJobConfig().getTargetDatabaseType(),
createTableConfigs, dataSourceManager);
- PipelineJobPreparer targetDataSourcePreparer = new
PipelineJobPreparer(jobItemContext.getJobConfig().getTargetDatabaseType());
-
targetDataSourcePreparer.prepareTargetSchema(prepareTargetSchemasParam);
+ PrepareTargetSchemasParameter prepareTargetSchemasParam = new
PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs,
dataSourceManager);
+ new
PipelineJobDataSourcePreparer(DatabaseTypedSPILoader.getService(DialectPipelineJobDataSourcePrepareOption.class,
targetDatabaseType)).prepareTargetSchemas(prepareTargetSchemasParam);
ShardingSphereMetaData metaData =
PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())).getContextManager().getMetaDataContexts().getMetaData();
SQLParserEngine sqlParserEngine = new PipelineJobPreparer(
metaData.getDatabase(jobConfig.getTargetDatabaseName()).getProtocolType().getTrunkDatabaseType().orElse(metaData.getDatabase(jobConfig.getTargetDatabaseName()).getProtocolType()))
.getSQLParserEngine(metaData);
- targetDataSourcePreparer.prepareTargetTables(new
PrepareTargetTablesParameter(createTableConfigs, dataSourceManager,
sqlParserEngine));
+ new PipelineJobPreparer(targetDatabaseType).prepareTargetTables(new
PrepareTargetTablesParameter(createTableConfigs, dataSourceManager,
sqlParserEngine));
}
private void prepareIncremental(final MigrationJobItemContext
jobItemContext) {