This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f246711e6c2 Fix pipeline e2e for PostgreSQL (#34125)
f246711e6c2 is described below

commit f246711e6c21a973a70574031016dccad45f5cdc
Author: Haoran Meng <[email protected]>
AuthorDate: Mon Dec 23 13:04:58 2024 +0800

    Fix pipeline e2e for PostgreSQL (#34125)
    
    * Fix pipeline e2e for PostgreSQL
    
    * Fix checkstyle
---
 ...DDLGenerator.java => PipelineDDLDecorator.java} |  49 ++-----
 .../metadata/generator/PipelineDDLGenerator.java   | 156 +--------------------
 .../datasource/PipelineJobDataSourcePreparer.java  |  44 +++---
 .../param/PrepareTargetTablesParameter.java        |   4 -
 .../PipelineJobDataSourcePreparerTest.java         |   3 +-
 .../migration/preparer/MigrationJobPreparer.java   |   7 +-
 6 files changed, 41 insertions(+), 222 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLDecorator.java
similarity index 81%
copy from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
copy to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLDecorator.java
index 95b8b3b85ef..d66f840447c 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLDecorator.java
@@ -18,9 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.metadata.generator;
 
 import com.google.common.base.Strings;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
+import lombok.AllArgsConstructor;
 import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
 import 
org.apache.shardingsphere.infra.binder.context.statement.ddl.AlterTableStatementContext;
 import 
org.apache.shardingsphere.infra.binder.context.statement.ddl.CommentStatementContext;
@@ -30,7 +28,6 @@ import 
org.apache.shardingsphere.infra.binder.context.type.ConstraintAvailable;
 import org.apache.shardingsphere.infra.binder.context.type.IndexAvailable;
 import org.apache.shardingsphere.infra.binder.context.type.TableAvailable;
 import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine;
-import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.hint.HintValueContext;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
@@ -42,62 +39,36 @@ import 
org.apache.shardingsphere.sql.parser.statement.core.segment.ddl.index.Ind
 import 
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.SimpleTableSegment;
 import 
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.TableNameSegment;
 
-import javax.sql.DataSource;
-import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.TreeMap;
 
 /**
- * Pipeline DDL generator.
+ * Pipeline DDL decorator.
  */
-@RequiredArgsConstructor
-@Slf4j
-public final class PipelineDDLGenerator {
+@AllArgsConstructor
+public final class PipelineDDLDecorator {
     
     private static final String SET_SEARCH_PATH_PREFIX = "set search_path";
     
     private final ShardingSphereMetaData metaData;
     
     /**
-     * Generate logic DDL.
+     * Decorate SQL.
      *
      * @param databaseType database type
-     * @param sourceDataSource source data source
+     * @param targetDatabaseName target database name
      * @param schemaName schema name
-     * @param sourceTableName source table name
      * @param targetTableName target table name
      * @param parserEngine parser engine
-     * @param targetDatabaseName target database name
-     * @return DDL SQL
-     * @throws SQLException SQL exception 
+     * @param sql SQL
+     * @return decorated SQL
      */
-    public List<String> generateLogicDDL(final DatabaseType databaseType, 
final DataSource sourceDataSource,
-                                         final String schemaName, final String 
sourceTableName, final String targetTableName,
-                                         final SQLParserEngine parserEngine, 
final String targetDatabaseName) throws SQLException {
-        long startTimeMillis = System.currentTimeMillis();
-        List<String> result = new ArrayList<>();
-        for (String each : 
DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, 
databaseType).buildCreateTableSQLs(sourceDataSource, schemaName, 
sourceTableName)) {
-            Optional<String> queryContext = decorate(databaseType, 
targetDatabaseName, schemaName, targetTableName, parserEngine, each);
-            queryContext.ifPresent(sql -> {
-                String trimmedSql = sql.trim();
-                if (!Strings.isNullOrEmpty(trimmedSql)) {
-                    result.add(trimmedSql);
-                }
-            });
-        }
-        log.info("generateLogicDDL, databaseType={}, schemaName={}, 
sourceTableName={}, targetTableName={}, cost {} ms",
-                databaseType.getType(), schemaName, sourceTableName, 
targetTableName, System.currentTimeMillis() - startTimeMillis);
-        return result;
-    }
-    
-    private Optional<String> decorate(final DatabaseType databaseType, final 
String targetDatabaseName, final String schemaName, final String 
targetTableName,
-                                      final SQLParserEngine parserEngine, 
final String sql) {
+    public Optional<String> decorate(final DatabaseType databaseType, final 
String targetDatabaseName, final String schemaName, final String 
targetTableName,
+                                     final SQLParserEngine parserEngine, final 
String sql) {
         if (Strings.isNullOrEmpty(sql)) {
             return Optional.empty();
         }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
index 95b8b3b85ef..37d281e4da6 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
@@ -17,41 +17,16 @@
 
 package org.apache.shardingsphere.data.pipeline.core.metadata.generator;
 
-import com.google.common.base.Strings;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
-import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
-import 
org.apache.shardingsphere.infra.binder.context.statement.ddl.AlterTableStatementContext;
-import 
org.apache.shardingsphere.infra.binder.context.statement.ddl.CommentStatementContext;
-import 
org.apache.shardingsphere.infra.binder.context.statement.ddl.CreateIndexStatementContext;
-import 
org.apache.shardingsphere.infra.binder.context.statement.ddl.CreateTableStatementContext;
-import org.apache.shardingsphere.infra.binder.context.type.ConstraintAvailable;
-import org.apache.shardingsphere.infra.binder.context.type.IndexAvailable;
-import org.apache.shardingsphere.infra.binder.context.type.TableAvailable;
-import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine;
 import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import org.apache.shardingsphere.infra.hint.HintValueContext;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.util.IndexMetaDataUtils;
-import org.apache.shardingsphere.infra.parser.SQLParserEngine;
-import org.apache.shardingsphere.sql.parser.statement.core.segment.SQLSegment;
-import 
org.apache.shardingsphere.sql.parser.statement.core.segment.ddl.constraint.ConstraintSegment;
-import 
org.apache.shardingsphere.sql.parser.statement.core.segment.ddl.index.IndexSegment;
-import 
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.SimpleTableSegment;
-import 
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.TableNameSegment;
 
 import javax.sql.DataSource;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.TreeMap;
 
 /**
  * Pipeline DDL generator.
@@ -60,10 +35,6 @@ import java.util.TreeMap;
 @Slf4j
 public final class PipelineDDLGenerator {
     
-    private static final String SET_SEARCH_PATH_PREFIX = "set search_path";
-    
-    private final ShardingSphereMetaData metaData;
-    
     /**
      * Generate logic DDL.
      *
@@ -72,135 +43,16 @@ public final class PipelineDDLGenerator {
      * @param schemaName schema name
      * @param sourceTableName source table name
      * @param targetTableName target table name
-     * @param parserEngine parser engine
-     * @param targetDatabaseName target database name
      * @return DDL SQL
      * @throws SQLException SQL exception 
      */
-    public List<String> generateLogicDDL(final DatabaseType databaseType, 
final DataSource sourceDataSource,
-                                         final String schemaName, final String 
sourceTableName, final String targetTableName,
-                                         final SQLParserEngine parserEngine, 
final String targetDatabaseName) throws SQLException {
+    public static List<String> generateLogicDDL(final DatabaseType 
databaseType, final DataSource sourceDataSource,
+                                                final String schemaName, final 
String sourceTableName, final String targetTableName) throws SQLException {
         long startTimeMillis = System.currentTimeMillis();
-        List<String> result = new ArrayList<>();
-        for (String each : 
DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, 
databaseType).buildCreateTableSQLs(sourceDataSource, schemaName, 
sourceTableName)) {
-            Optional<String> queryContext = decorate(databaseType, 
targetDatabaseName, schemaName, targetTableName, parserEngine, each);
-            queryContext.ifPresent(sql -> {
-                String trimmedSql = sql.trim();
-                if (!Strings.isNullOrEmpty(trimmedSql)) {
-                    result.add(trimmedSql);
-                }
-            });
-        }
+        List<String> result = new 
ArrayList<>(DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, 
databaseType)
+                .buildCreateTableSQLs(sourceDataSource, schemaName, 
sourceTableName));
         log.info("generateLogicDDL, databaseType={}, schemaName={}, 
sourceTableName={}, targetTableName={}, cost {} ms",
                 databaseType.getType(), schemaName, sourceTableName, 
targetTableName, System.currentTimeMillis() - startTimeMillis);
         return result;
     }
-    
-    private Optional<String> decorate(final DatabaseType databaseType, final 
String targetDatabaseName, final String schemaName, final String 
targetTableName,
-                                      final SQLParserEngine parserEngine, 
final String sql) {
-        if (Strings.isNullOrEmpty(sql)) {
-            return Optional.empty();
-        }
-        String result = decorateActualSQL(targetDatabaseName, targetTableName, 
parserEngine, sql.trim());
-        // TODO remove it after set search_path is supported.
-        if ("openGauss".equals(databaseType.getType())) {
-            return decorateOpenGauss(targetDatabaseName, schemaName, result, 
parserEngine);
-        }
-        return Optional.of(result);
-    }
-    
-    private String decorateActualSQL(final String databaseName, final String 
targetTableName, final SQLParserEngine parserEngine, final String sql) {
-        SQLStatementContext sqlStatementContext = parseSQL(databaseName, 
parserEngine, sql);
-        Map<SQLSegment, String> replaceMap = new 
TreeMap<>(Comparator.comparing(SQLSegment::getStartIndex));
-        if (sqlStatementContext instanceof CreateTableStatementContext) {
-            appendFromIndexAndConstraint(replaceMap, targetTableName, 
sqlStatementContext);
-            appendFromTable(replaceMap, targetTableName, (TableAvailable) 
sqlStatementContext);
-        }
-        if (sqlStatementContext instanceof CommentStatementContext) {
-            appendFromTable(replaceMap, targetTableName, (TableAvailable) 
sqlStatementContext);
-        }
-        if (sqlStatementContext instanceof CreateIndexStatementContext) {
-            appendFromTable(replaceMap, targetTableName, (TableAvailable) 
sqlStatementContext);
-            appendFromIndexAndConstraint(replaceMap, targetTableName, 
sqlStatementContext);
-        }
-        if (sqlStatementContext instanceof AlterTableStatementContext) {
-            appendFromIndexAndConstraint(replaceMap, targetTableName, 
sqlStatementContext);
-            appendFromTable(replaceMap, targetTableName, (TableAvailable) 
sqlStatementContext);
-        }
-        return doDecorateActualTable(replaceMap, sql);
-    }
-    
-    private SQLStatementContext parseSQL(final String currentDatabaseName, 
final SQLParserEngine parserEngine, final String sql) {
-        return new SQLBindEngine(metaData, currentDatabaseName, new 
HintValueContext()).bind(parserEngine.parse(sql, true), 
Collections.emptyList());
-    }
-    
-    private void appendFromIndexAndConstraint(final Map<SQLSegment, String> 
replaceMap, final String targetTableName, final SQLStatementContext 
sqlStatementContext) {
-        if (!(sqlStatementContext instanceof TableAvailable) || 
((TableAvailable) 
sqlStatementContext).getTablesContext().getSimpleTables().isEmpty()) {
-            return;
-        }
-        TableNameSegment tableNameSegment = ((TableAvailable) 
sqlStatementContext).getTablesContext().getSimpleTables().iterator().next().getTableName();
-        if 
(!tableNameSegment.getIdentifier().getValue().equals(targetTableName)) {
-            if (sqlStatementContext instanceof IndexAvailable) {
-                for (IndexSegment each : ((IndexAvailable) 
sqlStatementContext).getIndexes()) {
-                    String logicIndexName = 
IndexMetaDataUtils.getLogicIndexName(each.getIndexName().getIdentifier().getValue(),
 tableNameSegment.getIdentifier().getValue());
-                    replaceMap.put(each.getIndexName(), logicIndexName);
-                }
-            }
-            if (sqlStatementContext instanceof ConstraintAvailable) {
-                for (ConstraintSegment each : ((ConstraintAvailable) 
sqlStatementContext).getConstraints()) {
-                    String logicConstraint = 
IndexMetaDataUtils.getLogicIndexName(each.getIdentifier().getValue(), 
tableNameSegment.getIdentifier().getValue());
-                    replaceMap.put(each, logicConstraint);
-                }
-            }
-        }
-    }
-    
-    private void appendFromTable(final Map<SQLSegment, String> replaceMap, 
final String targetTableName, final TableAvailable sqlStatementContext) {
-        for (SimpleTableSegment each : 
sqlStatementContext.getTablesContext().getSimpleTables()) {
-            if 
(!targetTableName.equals(each.getTableName().getIdentifier().getValue())) {
-                replaceMap.put(each.getTableName(), targetTableName);
-            }
-        }
-    }
-    
-    private String doDecorateActualTable(final Map<SQLSegment, String> 
replaceMap, final String sql) {
-        StringBuilder result = new StringBuilder();
-        int lastStopIndex = 0;
-        for (Entry<SQLSegment, String> entry : replaceMap.entrySet()) {
-            result.append(sql, lastStopIndex, entry.getKey().getStartIndex());
-            result.append(entry.getValue());
-            lastStopIndex = entry.getKey().getStopIndex() + 1;
-        }
-        if (lastStopIndex < sql.length()) {
-            result.append(sql, lastStopIndex, sql.length());
-        }
-        return result.toString();
-    }
-    
-    // TODO remove it after set search_path is supported.
-    private Optional<String> decorateOpenGauss(final String databaseName, 
final String schemaName, final String queryContext,
-                                               final SQLParserEngine 
parserEngine) {
-        if (queryContext.toLowerCase().startsWith(SET_SEARCH_PATH_PREFIX)) {
-            return Optional.empty();
-        }
-        return Optional.of(replaceTableNameWithPrefix(queryContext, schemaName 
+ ".", databaseName, parserEngine));
-    }
-    
-    private String replaceTableNameWithPrefix(final String sql, final String 
prefix, final String databaseName, final SQLParserEngine parserEngine) {
-        SQLStatementContext sqlStatementContext = parseSQL(databaseName, 
parserEngine, sql);
-        if (sqlStatementContext instanceof CreateTableStatementContext || 
sqlStatementContext instanceof CommentStatementContext
-                || sqlStatementContext instanceof CreateIndexStatementContext 
|| sqlStatementContext instanceof AlterTableStatementContext) {
-            if (((TableAvailable) 
sqlStatementContext).getTablesContext().getSimpleTables().isEmpty()) {
-                return sql;
-            }
-            if (((TableAvailable) 
sqlStatementContext).getTablesContext().getSchemaName().isPresent()) {
-                return sql;
-            }
-            Map<SQLSegment, String> replaceMap = new 
TreeMap<>(Comparator.comparing(SQLSegment::getStartIndex));
-            TableNameSegment tableNameSegment = ((TableAvailable) 
sqlStatementContext).getTablesContext().getSimpleTables().iterator().next().getTableName();
-            replaceMap.put(tableNameSegment, prefix + 
tableNameSegment.getIdentifier().getValue());
-            return doDecorateActualTable(replaceMap, sql);
-        }
-        return sql;
-    }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
index c256c37ae2a..9d127680312 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
@@ -17,10 +17,12 @@
 
 package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
 
+import com.google.common.base.Strings;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLDecorator;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
@@ -41,10 +43,8 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.regex.Pattern;
 
@@ -65,16 +65,14 @@ public final class PipelineJobDataSourcePreparer {
      * Prepare target schemas.
      *
      * @param param prepare target schemas parameter
-     * @return target schemas
      * @throws SQLException if prepare target schema fail
      */
-    public Map<String, ShardingSphereMetaData> prepareTargetSchemas(final 
PrepareTargetSchemasParameter param) throws SQLException {
+    public void prepareTargetSchemas(final PrepareTargetSchemasParameter 
param) throws SQLException {
         DatabaseType targetDatabaseType = param.getTargetDatabaseType();
         DialectDatabaseMetaData dialectDatabaseMetaData = new 
DatabaseTypeRegistry(targetDatabaseType).getDialectDatabaseMetaData();
         if (!dialectDatabaseMetaData.isSchemaAvailable()) {
-            return Collections.emptyMap();
+            return;
         }
-        Map<String, ShardingSphereMetaData> result = new 
HashMap<>(param.getCreateTableConfigurations().size(), 1F);
         String defaultSchema = 
dialectDatabaseMetaData.getDefaultSchema().orElse(null);
         PipelinePrepareSQLBuilder pipelineSQLBuilder = new 
PipelinePrepareSQLBuilder(targetDatabaseType);
         Collection<String> createdSchemaNames = new 
HashSet<>(param.getCreateTableConfigurations().size(), 1F);
@@ -85,21 +83,19 @@ public final class PipelineJobDataSourcePreparer {
             }
             Optional<String> sql = 
pipelineSQLBuilder.buildCreateSchemaSQL(targetSchemaName);
             if (sql.isPresent()) {
-                executeCreateSchema(param.getDataSourceManager(), 
each.getTargetDataSourceConfig(), sql.get()).ifPresent(metaData -> 
result.put(targetSchemaName, metaData));
+                executeCreateSchema(param.getDataSourceManager(), 
each.getTargetDataSourceConfig(), sql.get());
                 createdSchemaNames.add(targetSchemaName);
             }
         }
-        return result;
     }
     
-    private Optional<ShardingSphereMetaData> executeCreateSchema(final 
PipelineDataSourceManager dataSourceManager,
-                                                                 final 
PipelineDataSourceConfiguration targetDataSourceConfig, final String sql) 
throws SQLException {
+    private void executeCreateSchema(final PipelineDataSourceManager 
dataSourceManager,
+                                     final PipelineDataSourceConfiguration 
targetDataSourceConfig, final String sql) throws SQLException {
         log.info("Prepare target schemas SQL: {}", sql);
         try (
                 Connection connection = 
dataSourceManager.getDataSource(targetDataSourceConfig).getConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute(sql);
-            return Optional.of(((ShardingSphereConnection) 
connection).getContextManager().getMetaDataContexts().getMetaData());
         } catch (final SQLException ex) {
             if 
(DatabaseTypedSPILoader.findService(DialectPipelineJobDataSourcePrepareOption.class,
 databaseType)
                     
.map(DialectPipelineJobDataSourcePrepareOption::isSupportIfNotExistsOnCreateSchema).orElse(true))
 {
@@ -107,7 +103,6 @@ public final class PipelineJobDataSourcePreparer {
             }
             log.warn("Create schema failed", ex);
         }
-        return Optional.empty();
     }
     
     /**
@@ -121,27 +116,34 @@ public final class PipelineJobDataSourcePreparer {
         PipelineDataSourceManager dataSourceManager = 
param.getDataSourceManager();
         for (CreateTableConfiguration each : 
param.getCreateTableConfigurations()) {
             try (Connection targetConnection = 
dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection())
 {
-                ShardingSphereMetaData metaData = 
param.getTargetSchemaMetaData().get(each.getTargetName().getSchemaName());
-                if (null == metaData) {
-                    metaData = ((ShardingSphereConnection) 
targetConnection).getContextManager().getMetaDataContexts().getMetaData();
-                }
-                List<String> createTargetTableSQL = 
getCreateTargetTableSQL(each, dataSourceManager, param.getSqlParserEngine(), 
metaData, param.getTargetDatabaseName());
+                List<String> createTargetTableSQL = 
getCreateTargetTableSQL(each, dataSourceManager);
                 for (String sql : createTargetTableSQL) {
-                    executeTargetTableSQL(targetConnection, 
addIfNotExistsForCreateTableSQL(sql));
+                    ShardingSphereMetaData metaData = 
((ShardingSphereConnection) 
targetConnection).getContextManager().getMetaDataContexts().getMetaData();
+                    Optional<String> decoratedSQL = 
decorateTargetTableSQL(each, param.getSqlParserEngine(), metaData, 
param.getTargetDatabaseName(), sql);
+                    if (decoratedSQL.isPresent()) {
+                        executeTargetTableSQL(targetConnection, 
addIfNotExistsForCreateTableSQL(decoratedSQL.get()));
+                    }
                 }
             }
         }
         log.info("prepareTargetTables cost {} ms", System.currentTimeMillis() 
- startTimeMillis);
     }
     
-    private List<String> getCreateTargetTableSQL(final 
CreateTableConfiguration createTableConfig, final PipelineDataSourceManager 
dataSourceManager,
-                                                 final SQLParserEngine 
sqlParserEngine, final ShardingSphereMetaData metaData, final String 
targetDatabaseName) throws SQLException {
+    private List<String> getCreateTargetTableSQL(final 
CreateTableConfiguration createTableConfig, final PipelineDataSourceManager 
dataSourceManager) throws SQLException {
         DatabaseType databaseType = 
createTableConfig.getSourceDataSourceConfig().getDatabaseType();
         DataSource sourceDataSource = 
dataSourceManager.getDataSource(createTableConfig.getSourceDataSourceConfig());
         String schemaName = createTableConfig.getSourceName().getSchemaName();
         String sourceTableName = 
createTableConfig.getSourceName().getTableName();
         String targetTableName = 
createTableConfig.getTargetName().getTableName();
-        return new 
PipelineDDLGenerator(metaData).generateLogicDDL(databaseType, sourceDataSource, 
schemaName, sourceTableName, targetTableName, sqlParserEngine, 
targetDatabaseName);
+        return PipelineDDLGenerator.generateLogicDDL(databaseType, 
sourceDataSource, schemaName, sourceTableName, targetTableName);
+    }
+    
+    private Optional<String> decorateTargetTableSQL(final 
CreateTableConfiguration createTableConfig, final SQLParserEngine 
sqlParserEngine,
+                                                    final 
ShardingSphereMetaData metaData, final String targetDatabaseName, final String 
sql) {
+        String schemaName = createTableConfig.getSourceName().getSchemaName();
+        String targetTableName = 
createTableConfig.getTargetName().getTableName();
+        Optional<String> decoratedSQL = new 
PipelineDDLDecorator(metaData).decorate(databaseType, targetDatabaseName, 
schemaName, targetTableName, sqlParserEngine, sql);
+        return decoratedSQL.map(String::trim).filter(trimmedSql -> 
!Strings.isNullOrEmpty(trimmedSql));
     }
     
     private void executeTargetTableSQL(final Connection targetConnection, 
final String sql) throws SQLException {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java
index f14bb707ff0..e0c6cdccfd3 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java
@@ -20,11 +20,9 @@ package 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.parser.SQLParserEngine;
 
 import java.util.Collection;
-import java.util.Map;
 
 /**
  * Prepare target tables parameter.
@@ -39,7 +37,5 @@ public final class PrepareTargetTablesParameter {
     
     private final SQLParserEngine sqlParserEngine;
     
-    private final Map<String, ShardingSphereMetaData> targetSchemaMetaData;
-    
     private final String targetDatabaseName;
 }
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java
index 73013470e88..6eec79b803b 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java
@@ -31,7 +31,6 @@ import org.junit.jupiter.api.Test;
 
 import java.sql.SQLException;
 import java.util.Collections;
-import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.mockito.ArgumentMatchers.any;
@@ -60,7 +59,7 @@ class PipelineJobDataSourcePreparerTest {
         
when(connection.getContextManager().getMetaDataContexts().getMetaData()).thenReturn(mock(ShardingSphereMetaData.class));
         PrepareTargetTablesParameter parameter = new 
PrepareTargetTablesParameter(
                 Collections.singleton(createTableConfig), 
pipelineDataSourceManager,
-                mock(SQLParserEngine.class), mock(Map.class), "foo_db");
+                mock(SQLParserEngine.class), "foo_db");
         assertDoesNotThrow(() -> new 
PipelineJobDataSourcePreparer(databaseType).prepareTargetTables(parameter));
     }
 }
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index e8d29bf89e2..6f88bce5f3f 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -65,19 +65,18 @@ import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import 
org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
-import org.apache.shardingsphere.mode.lock.global.GlobalLockNames;
 import org.apache.shardingsphere.infra.lock.LockContext;
 import org.apache.shardingsphere.infra.lock.LockDefinition;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.parser.SQLParserEngine;
 import org.apache.shardingsphere.mode.lock.global.GlobalLockDefinition;
+import org.apache.shardingsphere.mode.lock.global.GlobalLockNames;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.parser.rule.SQLParserRule;
 
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Map;
 
 /**
  * Migration job preparer.
@@ -157,11 +156,11 @@ public final class MigrationJobPreparer implements 
PipelineJobPreparer<Migration
         Collection<CreateTableConfiguration> createTableConfigs = 
jobItemContext.getTaskConfig().getCreateTableConfigurations();
         PipelineDataSourceManager dataSourceManager = 
jobItemContext.getDataSourceManager();
         PipelineJobDataSourcePreparer preparer = new 
PipelineJobDataSourcePreparer(targetDatabaseType);
-        Map<String, ShardingSphereMetaData> targetSchemaMetaData = 
preparer.prepareTargetSchemas(new 
PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs, 
dataSourceManager));
+        preparer.prepareTargetSchemas(new 
PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs, 
dataSourceManager));
         ShardingSphereMetaData metaData = 
contextManager.getMetaDataContexts().getMetaData();
         SQLParserEngine sqlParserEngine = 
metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class)
                 
.getSQLParserEngine(metaData.getDatabase(jobConfig.getTargetDatabaseName()).getProtocolType());
-        preparer.prepareTargetTables(new 
PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, 
sqlParserEngine, targetSchemaMetaData, jobConfig.getTargetDatabaseName()));
+        preparer.prepareTargetTables(new 
PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, 
sqlParserEngine, jobConfig.getTargetDatabaseName()));
     }
     
     private void prepareIncremental(final MigrationJobItemContext 
jobItemContext) {

Reply via email to