This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f246711e6c2 Fix pipeline e2e for PostgreSQL (#34125)
f246711e6c2 is described below
commit f246711e6c21a973a70574031016dccad45f5cdc
Author: Haoran Meng <[email protected]>
AuthorDate: Mon Dec 23 13:04:58 2024 +0800
Fix pipeline e2e for PostgreSQL (#34125)
* Fix pipeline e2e for PostgreSQL
* Fix checkstyle
---
...DDLGenerator.java => PipelineDDLDecorator.java} | 49 ++-----
.../metadata/generator/PipelineDDLGenerator.java | 156 +--------------------
.../datasource/PipelineJobDataSourcePreparer.java | 44 +++---
.../param/PrepareTargetTablesParameter.java | 4 -
.../PipelineJobDataSourcePreparerTest.java | 3 +-
.../migration/preparer/MigrationJobPreparer.java | 7 +-
6 files changed, 41 insertions(+), 222 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLDecorator.java
similarity index 81%
copy from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
copy to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLDecorator.java
index 95b8b3b85ef..d66f840447c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLDecorator.java
@@ -18,9 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.metadata.generator;
import com.google.common.base.Strings;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
+import lombok.AllArgsConstructor;
import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
import
org.apache.shardingsphere.infra.binder.context.statement.ddl.AlterTableStatementContext;
import
org.apache.shardingsphere.infra.binder.context.statement.ddl.CommentStatementContext;
@@ -30,7 +28,6 @@ import
org.apache.shardingsphere.infra.binder.context.type.ConstraintAvailable;
import org.apache.shardingsphere.infra.binder.context.type.IndexAvailable;
import org.apache.shardingsphere.infra.binder.context.type.TableAvailable;
import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine;
-import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.hint.HintValueContext;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
@@ -42,62 +39,36 @@ import
org.apache.shardingsphere.sql.parser.statement.core.segment.ddl.index.Ind
import
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.SimpleTableSegment;
import
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.TableNameSegment;
-import javax.sql.DataSource;
-import java.sql.SQLException;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.TreeMap;
/**
- * Pipeline DDL generator.
+ * Pipeline DDL decorator.
*/
-@RequiredArgsConstructor
-@Slf4j
-public final class PipelineDDLGenerator {
+@AllArgsConstructor
+public final class PipelineDDLDecorator {
private static final String SET_SEARCH_PATH_PREFIX = "set search_path";
private final ShardingSphereMetaData metaData;
/**
- * Generate logic DDL.
+ * Decorate SQL.
*
* @param databaseType database type
- * @param sourceDataSource source data source
+ * @param targetDatabaseName target database name
* @param schemaName schema name
- * @param sourceTableName source table name
* @param targetTableName target table name
* @param parserEngine parser engine
- * @param targetDatabaseName target database name
- * @return DDL SQL
- * @throws SQLException SQL exception
+ * @param sql SQL
+ * @return decorated SQL
*/
- public List<String> generateLogicDDL(final DatabaseType databaseType,
final DataSource sourceDataSource,
- final String schemaName, final String
sourceTableName, final String targetTableName,
- final SQLParserEngine parserEngine,
final String targetDatabaseName) throws SQLException {
- long startTimeMillis = System.currentTimeMillis();
- List<String> result = new ArrayList<>();
- for (String each :
DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class,
databaseType).buildCreateTableSQLs(sourceDataSource, schemaName,
sourceTableName)) {
- Optional<String> queryContext = decorate(databaseType,
targetDatabaseName, schemaName, targetTableName, parserEngine, each);
- queryContext.ifPresent(sql -> {
- String trimmedSql = sql.trim();
- if (!Strings.isNullOrEmpty(trimmedSql)) {
- result.add(trimmedSql);
- }
- });
- }
- log.info("generateLogicDDL, databaseType={}, schemaName={},
sourceTableName={}, targetTableName={}, cost {} ms",
- databaseType.getType(), schemaName, sourceTableName,
targetTableName, System.currentTimeMillis() - startTimeMillis);
- return result;
- }
-
- private Optional<String> decorate(final DatabaseType databaseType, final
String targetDatabaseName, final String schemaName, final String
targetTableName,
- final SQLParserEngine parserEngine,
final String sql) {
+ public Optional<String> decorate(final DatabaseType databaseType, final
String targetDatabaseName, final String schemaName, final String
targetTableName,
+ final SQLParserEngine parserEngine, final
String sql) {
if (Strings.isNullOrEmpty(sql)) {
return Optional.empty();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
index 95b8b3b85ef..37d281e4da6 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
@@ -17,41 +17,16 @@
package org.apache.shardingsphere.data.pipeline.core.metadata.generator;
-import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
-import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
-import
org.apache.shardingsphere.infra.binder.context.statement.ddl.AlterTableStatementContext;
-import
org.apache.shardingsphere.infra.binder.context.statement.ddl.CommentStatementContext;
-import
org.apache.shardingsphere.infra.binder.context.statement.ddl.CreateIndexStatementContext;
-import
org.apache.shardingsphere.infra.binder.context.statement.ddl.CreateTableStatementContext;
-import org.apache.shardingsphere.infra.binder.context.type.ConstraintAvailable;
-import org.apache.shardingsphere.infra.binder.context.type.IndexAvailable;
-import org.apache.shardingsphere.infra.binder.context.type.TableAvailable;
-import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine;
import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import org.apache.shardingsphere.infra.hint.HintValueContext;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import
org.apache.shardingsphere.infra.metadata.database.schema.util.IndexMetaDataUtils;
-import org.apache.shardingsphere.infra.parser.SQLParserEngine;
-import org.apache.shardingsphere.sql.parser.statement.core.segment.SQLSegment;
-import
org.apache.shardingsphere.sql.parser.statement.core.segment.ddl.constraint.ConstraintSegment;
-import
org.apache.shardingsphere.sql.parser.statement.core.segment.ddl.index.IndexSegment;
-import
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.SimpleTableSegment;
-import
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.TableNameSegment;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.TreeMap;
/**
* Pipeline DDL generator.
@@ -60,10 +35,6 @@ import java.util.TreeMap;
@Slf4j
public final class PipelineDDLGenerator {
- private static final String SET_SEARCH_PATH_PREFIX = "set search_path";
-
- private final ShardingSphereMetaData metaData;
-
/**
* Generate logic DDL.
*
@@ -72,135 +43,16 @@ public final class PipelineDDLGenerator {
* @param schemaName schema name
* @param sourceTableName source table name
* @param targetTableName target table name
- * @param parserEngine parser engine
- * @param targetDatabaseName target database name
* @return DDL SQL
* @throws SQLException SQL exception
*/
- public List<String> generateLogicDDL(final DatabaseType databaseType,
final DataSource sourceDataSource,
- final String schemaName, final String
sourceTableName, final String targetTableName,
- final SQLParserEngine parserEngine,
final String targetDatabaseName) throws SQLException {
+ public static List<String> generateLogicDDL(final DatabaseType
databaseType, final DataSource sourceDataSource,
+ final String schemaName, final
String sourceTableName, final String targetTableName) throws SQLException {
long startTimeMillis = System.currentTimeMillis();
- List<String> result = new ArrayList<>();
- for (String each :
DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class,
databaseType).buildCreateTableSQLs(sourceDataSource, schemaName,
sourceTableName)) {
- Optional<String> queryContext = decorate(databaseType,
targetDatabaseName, schemaName, targetTableName, parserEngine, each);
- queryContext.ifPresent(sql -> {
- String trimmedSql = sql.trim();
- if (!Strings.isNullOrEmpty(trimmedSql)) {
- result.add(trimmedSql);
- }
- });
- }
+ List<String> result = new
ArrayList<>(DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class,
databaseType)
+ .buildCreateTableSQLs(sourceDataSource, schemaName,
sourceTableName));
log.info("generateLogicDDL, databaseType={}, schemaName={},
sourceTableName={}, targetTableName={}, cost {} ms",
databaseType.getType(), schemaName, sourceTableName,
targetTableName, System.currentTimeMillis() - startTimeMillis);
return result;
}
-
- private Optional<String> decorate(final DatabaseType databaseType, final
String targetDatabaseName, final String schemaName, final String
targetTableName,
- final SQLParserEngine parserEngine,
final String sql) {
- if (Strings.isNullOrEmpty(sql)) {
- return Optional.empty();
- }
- String result = decorateActualSQL(targetDatabaseName, targetTableName,
parserEngine, sql.trim());
- // TODO remove it after set search_path is supported.
- if ("openGauss".equals(databaseType.getType())) {
- return decorateOpenGauss(targetDatabaseName, schemaName, result,
parserEngine);
- }
- return Optional.of(result);
- }
-
- private String decorateActualSQL(final String databaseName, final String
targetTableName, final SQLParserEngine parserEngine, final String sql) {
- SQLStatementContext sqlStatementContext = parseSQL(databaseName,
parserEngine, sql);
- Map<SQLSegment, String> replaceMap = new
TreeMap<>(Comparator.comparing(SQLSegment::getStartIndex));
- if (sqlStatementContext instanceof CreateTableStatementContext) {
- appendFromIndexAndConstraint(replaceMap, targetTableName,
sqlStatementContext);
- appendFromTable(replaceMap, targetTableName, (TableAvailable)
sqlStatementContext);
- }
- if (sqlStatementContext instanceof CommentStatementContext) {
- appendFromTable(replaceMap, targetTableName, (TableAvailable)
sqlStatementContext);
- }
- if (sqlStatementContext instanceof CreateIndexStatementContext) {
- appendFromTable(replaceMap, targetTableName, (TableAvailable)
sqlStatementContext);
- appendFromIndexAndConstraint(replaceMap, targetTableName,
sqlStatementContext);
- }
- if (sqlStatementContext instanceof AlterTableStatementContext) {
- appendFromIndexAndConstraint(replaceMap, targetTableName,
sqlStatementContext);
- appendFromTable(replaceMap, targetTableName, (TableAvailable)
sqlStatementContext);
- }
- return doDecorateActualTable(replaceMap, sql);
- }
-
- private SQLStatementContext parseSQL(final String currentDatabaseName,
final SQLParserEngine parserEngine, final String sql) {
- return new SQLBindEngine(metaData, currentDatabaseName, new
HintValueContext()).bind(parserEngine.parse(sql, true),
Collections.emptyList());
- }
-
- private void appendFromIndexAndConstraint(final Map<SQLSegment, String>
replaceMap, final String targetTableName, final SQLStatementContext
sqlStatementContext) {
- if (!(sqlStatementContext instanceof TableAvailable) ||
((TableAvailable)
sqlStatementContext).getTablesContext().getSimpleTables().isEmpty()) {
- return;
- }
- TableNameSegment tableNameSegment = ((TableAvailable)
sqlStatementContext).getTablesContext().getSimpleTables().iterator().next().getTableName();
- if
(!tableNameSegment.getIdentifier().getValue().equals(targetTableName)) {
- if (sqlStatementContext instanceof IndexAvailable) {
- for (IndexSegment each : ((IndexAvailable)
sqlStatementContext).getIndexes()) {
- String logicIndexName =
IndexMetaDataUtils.getLogicIndexName(each.getIndexName().getIdentifier().getValue(),
tableNameSegment.getIdentifier().getValue());
- replaceMap.put(each.getIndexName(), logicIndexName);
- }
- }
- if (sqlStatementContext instanceof ConstraintAvailable) {
- for (ConstraintSegment each : ((ConstraintAvailable)
sqlStatementContext).getConstraints()) {
- String logicConstraint =
IndexMetaDataUtils.getLogicIndexName(each.getIdentifier().getValue(),
tableNameSegment.getIdentifier().getValue());
- replaceMap.put(each, logicConstraint);
- }
- }
- }
- }
-
- private void appendFromTable(final Map<SQLSegment, String> replaceMap,
final String targetTableName, final TableAvailable sqlStatementContext) {
- for (SimpleTableSegment each :
sqlStatementContext.getTablesContext().getSimpleTables()) {
- if
(!targetTableName.equals(each.getTableName().getIdentifier().getValue())) {
- replaceMap.put(each.getTableName(), targetTableName);
- }
- }
- }
-
- private String doDecorateActualTable(final Map<SQLSegment, String>
replaceMap, final String sql) {
- StringBuilder result = new StringBuilder();
- int lastStopIndex = 0;
- for (Entry<SQLSegment, String> entry : replaceMap.entrySet()) {
- result.append(sql, lastStopIndex, entry.getKey().getStartIndex());
- result.append(entry.getValue());
- lastStopIndex = entry.getKey().getStopIndex() + 1;
- }
- if (lastStopIndex < sql.length()) {
- result.append(sql, lastStopIndex, sql.length());
- }
- return result.toString();
- }
-
- // TODO remove it after set search_path is supported.
- private Optional<String> decorateOpenGauss(final String databaseName,
final String schemaName, final String queryContext,
- final SQLParserEngine
parserEngine) {
- if (queryContext.toLowerCase().startsWith(SET_SEARCH_PATH_PREFIX)) {
- return Optional.empty();
- }
- return Optional.of(replaceTableNameWithPrefix(queryContext, schemaName
+ ".", databaseName, parserEngine));
- }
-
- private String replaceTableNameWithPrefix(final String sql, final String
prefix, final String databaseName, final SQLParserEngine parserEngine) {
- SQLStatementContext sqlStatementContext = parseSQL(databaseName,
parserEngine, sql);
- if (sqlStatementContext instanceof CreateTableStatementContext ||
sqlStatementContext instanceof CommentStatementContext
- || sqlStatementContext instanceof CreateIndexStatementContext
|| sqlStatementContext instanceof AlterTableStatementContext) {
- if (((TableAvailable)
sqlStatementContext).getTablesContext().getSimpleTables().isEmpty()) {
- return sql;
- }
- if (((TableAvailable)
sqlStatementContext).getTablesContext().getSchemaName().isPresent()) {
- return sql;
- }
- Map<SQLSegment, String> replaceMap = new
TreeMap<>(Comparator.comparing(SQLSegment::getStartIndex));
- TableNameSegment tableNameSegment = ((TableAvailable)
sqlStatementContext).getTablesContext().getSimpleTables().iterator().next().getTableName();
- replaceMap.put(tableNameSegment, prefix +
tableNameSegment.getIdentifier().getValue());
- return doDecorateActualTable(replaceMap, sql);
- }
- return sql;
- }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
index c256c37ae2a..9d127680312 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
@@ -17,10 +17,12 @@
package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
+import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLDecorator;
import
org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
@@ -41,10 +43,8 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
@@ -65,16 +65,14 @@ public final class PipelineJobDataSourcePreparer {
* Prepare target schemas.
*
* @param param prepare target schemas parameter
- * @return target schemas
* @throws SQLException if prepare target schema fail
*/
- public Map<String, ShardingSphereMetaData> prepareTargetSchemas(final
PrepareTargetSchemasParameter param) throws SQLException {
+ public void prepareTargetSchemas(final PrepareTargetSchemasParameter
param) throws SQLException {
DatabaseType targetDatabaseType = param.getTargetDatabaseType();
DialectDatabaseMetaData dialectDatabaseMetaData = new
DatabaseTypeRegistry(targetDatabaseType).getDialectDatabaseMetaData();
if (!dialectDatabaseMetaData.isSchemaAvailable()) {
- return Collections.emptyMap();
+ return;
}
- Map<String, ShardingSphereMetaData> result = new
HashMap<>(param.getCreateTableConfigurations().size(), 1F);
String defaultSchema =
dialectDatabaseMetaData.getDefaultSchema().orElse(null);
PipelinePrepareSQLBuilder pipelineSQLBuilder = new
PipelinePrepareSQLBuilder(targetDatabaseType);
Collection<String> createdSchemaNames = new
HashSet<>(param.getCreateTableConfigurations().size(), 1F);
@@ -85,21 +83,19 @@ public final class PipelineJobDataSourcePreparer {
}
Optional<String> sql =
pipelineSQLBuilder.buildCreateSchemaSQL(targetSchemaName);
if (sql.isPresent()) {
- executeCreateSchema(param.getDataSourceManager(),
each.getTargetDataSourceConfig(), sql.get()).ifPresent(metaData ->
result.put(targetSchemaName, metaData));
+ executeCreateSchema(param.getDataSourceManager(),
each.getTargetDataSourceConfig(), sql.get());
createdSchemaNames.add(targetSchemaName);
}
}
- return result;
}
- private Optional<ShardingSphereMetaData> executeCreateSchema(final
PipelineDataSourceManager dataSourceManager,
- final
PipelineDataSourceConfiguration targetDataSourceConfig, final String sql)
throws SQLException {
+ private void executeCreateSchema(final PipelineDataSourceManager
dataSourceManager,
+ final PipelineDataSourceConfiguration
targetDataSourceConfig, final String sql) throws SQLException {
log.info("Prepare target schemas SQL: {}", sql);
try (
Connection connection =
dataSourceManager.getDataSource(targetDataSourceConfig).getConnection();
Statement statement = connection.createStatement()) {
statement.execute(sql);
- return Optional.of(((ShardingSphereConnection)
connection).getContextManager().getMetaDataContexts().getMetaData());
} catch (final SQLException ex) {
if
(DatabaseTypedSPILoader.findService(DialectPipelineJobDataSourcePrepareOption.class,
databaseType)
.map(DialectPipelineJobDataSourcePrepareOption::isSupportIfNotExistsOnCreateSchema).orElse(true))
{
@@ -107,7 +103,6 @@ public final class PipelineJobDataSourcePreparer {
}
log.warn("Create schema failed", ex);
}
- return Optional.empty();
}
/**
@@ -121,27 +116,34 @@ public final class PipelineJobDataSourcePreparer {
PipelineDataSourceManager dataSourceManager =
param.getDataSourceManager();
for (CreateTableConfiguration each :
param.getCreateTableConfigurations()) {
try (Connection targetConnection =
dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection())
{
- ShardingSphereMetaData metaData =
param.getTargetSchemaMetaData().get(each.getTargetName().getSchemaName());
- if (null == metaData) {
- metaData = ((ShardingSphereConnection)
targetConnection).getContextManager().getMetaDataContexts().getMetaData();
- }
- List<String> createTargetTableSQL =
getCreateTargetTableSQL(each, dataSourceManager, param.getSqlParserEngine(),
metaData, param.getTargetDatabaseName());
+ List<String> createTargetTableSQL =
getCreateTargetTableSQL(each, dataSourceManager);
for (String sql : createTargetTableSQL) {
- executeTargetTableSQL(targetConnection,
addIfNotExistsForCreateTableSQL(sql));
+ ShardingSphereMetaData metaData =
((ShardingSphereConnection)
targetConnection).getContextManager().getMetaDataContexts().getMetaData();
+ Optional<String> decoratedSQL =
decorateTargetTableSQL(each, param.getSqlParserEngine(), metaData,
param.getTargetDatabaseName(), sql);
+ if (decoratedSQL.isPresent()) {
+ executeTargetTableSQL(targetConnection,
addIfNotExistsForCreateTableSQL(decoratedSQL.get()));
+ }
}
}
}
log.info("prepareTargetTables cost {} ms", System.currentTimeMillis()
- startTimeMillis);
}
- private List<String> getCreateTargetTableSQL(final
CreateTableConfiguration createTableConfig, final PipelineDataSourceManager
dataSourceManager,
- final SQLParserEngine
sqlParserEngine, final ShardingSphereMetaData metaData, final String
targetDatabaseName) throws SQLException {
+ private List<String> getCreateTargetTableSQL(final
CreateTableConfiguration createTableConfig, final PipelineDataSourceManager
dataSourceManager) throws SQLException {
DatabaseType databaseType =
createTableConfig.getSourceDataSourceConfig().getDatabaseType();
DataSource sourceDataSource =
dataSourceManager.getDataSource(createTableConfig.getSourceDataSourceConfig());
String schemaName = createTableConfig.getSourceName().getSchemaName();
String sourceTableName =
createTableConfig.getSourceName().getTableName();
String targetTableName =
createTableConfig.getTargetName().getTableName();
- return new
PipelineDDLGenerator(metaData).generateLogicDDL(databaseType, sourceDataSource,
schemaName, sourceTableName, targetTableName, sqlParserEngine,
targetDatabaseName);
+ return PipelineDDLGenerator.generateLogicDDL(databaseType,
sourceDataSource, schemaName, sourceTableName, targetTableName);
+ }
+
+ private Optional<String> decorateTargetTableSQL(final
CreateTableConfiguration createTableConfig, final SQLParserEngine
sqlParserEngine,
+ final
ShardingSphereMetaData metaData, final String targetDatabaseName, final String
sql) {
+ String schemaName = createTableConfig.getSourceName().getSchemaName();
+ String targetTableName =
createTableConfig.getTargetName().getTableName();
+ Optional<String> decoratedSQL = new
PipelineDDLDecorator(metaData).decorate(databaseType, targetDatabaseName,
schemaName, targetTableName, sqlParserEngine, sql);
+ return decoratedSQL.map(String::trim).filter(trimmedSql ->
!Strings.isNullOrEmpty(trimmedSql));
}
private void executeTargetTableSQL(final Connection targetConnection,
final String sql) throws SQLException {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java
index f14bb707ff0..e0c6cdccfd3 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java
@@ -20,11 +20,9 @@ package
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
import java.util.Collection;
-import java.util.Map;
/**
* Prepare target tables parameter.
@@ -39,7 +37,5 @@ public final class PrepareTargetTablesParameter {
private final SQLParserEngine sqlParserEngine;
- private final Map<String, ShardingSphereMetaData> targetSchemaMetaData;
-
private final String targetDatabaseName;
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java
index 73013470e88..6eec79b803b 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java
@@ -31,7 +31,6 @@ import org.junit.jupiter.api.Test;
import java.sql.SQLException;
import java.util.Collections;
-import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.mockito.ArgumentMatchers.any;
@@ -60,7 +59,7 @@ class PipelineJobDataSourcePreparerTest {
when(connection.getContextManager().getMetaDataContexts().getMetaData()).thenReturn(mock(ShardingSphereMetaData.class));
PrepareTargetTablesParameter parameter = new
PrepareTargetTablesParameter(
Collections.singleton(createTableConfig),
pipelineDataSourceManager,
- mock(SQLParserEngine.class), mock(Map.class), "foo_db");
+ mock(SQLParserEngine.class), "foo_db");
assertDoesNotThrow(() -> new
PipelineJobDataSourcePreparer(databaseType).prepareTargetTables(parameter));
}
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index e8d29bf89e2..6f88bce5f3f 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -65,19 +65,18 @@ import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import
org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
-import org.apache.shardingsphere.mode.lock.global.GlobalLockNames;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.lock.LockDefinition;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
import org.apache.shardingsphere.mode.lock.global.GlobalLockDefinition;
+import org.apache.shardingsphere.mode.lock.global.GlobalLockNames;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
-import java.util.Map;
/**
* Migration job preparer.
@@ -157,11 +156,11 @@ public final class MigrationJobPreparer implements
PipelineJobPreparer<Migration
Collection<CreateTableConfiguration> createTableConfigs =
jobItemContext.getTaskConfig().getCreateTableConfigurations();
PipelineDataSourceManager dataSourceManager =
jobItemContext.getDataSourceManager();
PipelineJobDataSourcePreparer preparer = new
PipelineJobDataSourcePreparer(targetDatabaseType);
- Map<String, ShardingSphereMetaData> targetSchemaMetaData =
preparer.prepareTargetSchemas(new
PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs,
dataSourceManager));
+ preparer.prepareTargetSchemas(new
PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs,
dataSourceManager));
ShardingSphereMetaData metaData =
contextManager.getMetaDataContexts().getMetaData();
SQLParserEngine sqlParserEngine =
metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class)
.getSQLParserEngine(metaData.getDatabase(jobConfig.getTargetDatabaseName()).getProtocolType());
- preparer.prepareTargetTables(new
PrepareTargetTablesParameter(createTableConfigs, dataSourceManager,
sqlParserEngine, targetSchemaMetaData, jobConfig.getTargetDatabaseName()));
+ preparer.prepareTargetTables(new
PrepareTargetTablesParameter(createTableConfigs, dataSourceManager,
sqlParserEngine, jobConfig.getTargetDatabaseName()));
}
private void prepareIncremental(final MigrationJobItemContext
jobItemContext) {