This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new af96795f05c Fix pipeline e2e for MySQL (#34108)
af96795f05c is described below

commit af96795f05c8d7c646bc5df666d4fa7ba446dd04
Author: Haoran Meng <[email protected]>
AuthorDate: Fri Dec 20 11:20:53 2024 +0800

    Fix pipeline e2e for MySQL (#34108)
---
 .../metadata/generator/PipelineDDLGenerator.java   | 19 +++++++--------
 .../datasource/PipelineJobDataSourcePreparer.java  | 27 ++++++++++++++++------
 .../param/PrepareTargetTablesParameter.java        |  5 +++-
 .../PipelineJobDataSourcePreparerTest.java         | 13 ++++++++++-
 .../migration/preparer/MigrationJobPreparer.java   |  7 +++---
 5 files changed, 48 insertions(+), 23 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
index a0510de5440..95b8b3b85ef 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
@@ -43,7 +43,6 @@ import 
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table
 import 
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.TableNameSegment;
 
 import javax.sql.DataSource;
-import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -74,15 +73,17 @@ public final class PipelineDDLGenerator {
      * @param sourceTableName source table name
      * @param targetTableName target table name
      * @param parserEngine parser engine
+     * @param targetDatabaseName target database name
      * @return DDL SQL
      * @throws SQLException SQL exception 
      */
     public List<String> generateLogicDDL(final DatabaseType databaseType, 
final DataSource sourceDataSource,
-                                         final String schemaName, final String 
sourceTableName, final String targetTableName, final SQLParserEngine 
parserEngine) throws SQLException {
+                                         final String schemaName, final String 
sourceTableName, final String targetTableName,
+                                         final SQLParserEngine parserEngine, 
final String targetDatabaseName) throws SQLException {
         long startTimeMillis = System.currentTimeMillis();
         List<String> result = new ArrayList<>();
         for (String each : 
DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, 
databaseType).buildCreateTableSQLs(sourceDataSource, schemaName, 
sourceTableName)) {
-            Optional<String> queryContext = decorate(databaseType, 
sourceDataSource, schemaName, targetTableName, parserEngine, each);
+            Optional<String> queryContext = decorate(databaseType, 
targetDatabaseName, schemaName, targetTableName, parserEngine, each);
             queryContext.ifPresent(sql -> {
                 String trimmedSql = sql.trim();
                 if (!Strings.isNullOrEmpty(trimmedSql)) {
@@ -95,19 +96,15 @@ public final class PipelineDDLGenerator {
         return result;
     }
     
-    private Optional<String> decorate(final DatabaseType databaseType, final 
DataSource dataSource, final String schemaName, final String targetTableName,
-                                      final SQLParserEngine parserEngine, 
final String sql) throws SQLException {
+    private Optional<String> decorate(final DatabaseType databaseType, final 
String targetDatabaseName, final String schemaName, final String 
targetTableName,
+                                      final SQLParserEngine parserEngine, 
final String sql) {
         if (Strings.isNullOrEmpty(sql)) {
             return Optional.empty();
         }
-        String databaseName;
-        try (Connection connection = dataSource.getConnection()) {
-            databaseName = connection.getCatalog();
-        }
-        String result = decorateActualSQL(databaseName, targetTableName, 
parserEngine, sql.trim());
+        String result = decorateActualSQL(targetDatabaseName, targetTableName, 
parserEngine, sql.trim());
         // TODO remove it after set search_path is supported.
         if ("openGauss".equals(databaseType.getType())) {
-            return decorateOpenGauss(databaseName, schemaName, result, 
parserEngine);
+            return decorateOpenGauss(targetDatabaseName, schemaName, result, 
parserEngine);
         }
         return Optional.of(result);
     }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
index 2901c616442..c256c37ae2a 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
@@ -27,6 +27,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.Cr
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
+import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
 import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
 import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -40,8 +41,10 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.regex.Pattern;
 
@@ -62,14 +65,16 @@ public final class PipelineJobDataSourcePreparer {
      * Prepare target schemas.
      *
      * @param param prepare target schemas parameter
+     * @return target schemas
      * @throws SQLException if prepare target schema fail
      */
-    public void prepareTargetSchemas(final PrepareTargetSchemasParameter 
param) throws SQLException {
+    public Map<String, ShardingSphereMetaData> prepareTargetSchemas(final 
PrepareTargetSchemasParameter param) throws SQLException {
         DatabaseType targetDatabaseType = param.getTargetDatabaseType();
         DialectDatabaseMetaData dialectDatabaseMetaData = new 
DatabaseTypeRegistry(targetDatabaseType).getDialectDatabaseMetaData();
         if (!dialectDatabaseMetaData.isSchemaAvailable()) {
-            return;
+            return Collections.emptyMap();
         }
+        Map<String, ShardingSphereMetaData> result = new 
HashMap<>(param.getCreateTableConfigurations().size(), 1F);
         String defaultSchema = 
dialectDatabaseMetaData.getDefaultSchema().orElse(null);
         PipelinePrepareSQLBuilder pipelineSQLBuilder = new 
PipelinePrepareSQLBuilder(targetDatabaseType);
         Collection<String> createdSchemaNames = new 
HashSet<>(param.getCreateTableConfigurations().size(), 1F);
@@ -80,18 +85,21 @@ public final class PipelineJobDataSourcePreparer {
             }
             Optional<String> sql = 
pipelineSQLBuilder.buildCreateSchemaSQL(targetSchemaName);
             if (sql.isPresent()) {
-                executeCreateSchema(param.getDataSourceManager(), 
each.getTargetDataSourceConfig(), sql.get());
+                executeCreateSchema(param.getDataSourceManager(), 
each.getTargetDataSourceConfig(), sql.get()).ifPresent(metaData -> 
result.put(targetSchemaName, metaData));
                 createdSchemaNames.add(targetSchemaName);
             }
         }
+        return result;
     }
     
-    private void executeCreateSchema(final PipelineDataSourceManager 
dataSourceManager, final PipelineDataSourceConfiguration 
targetDataSourceConfig, final String sql) throws SQLException {
+    private Optional<ShardingSphereMetaData> executeCreateSchema(final 
PipelineDataSourceManager dataSourceManager,
+                                                                 final 
PipelineDataSourceConfiguration targetDataSourceConfig, final String sql) 
throws SQLException {
         log.info("Prepare target schemas SQL: {}", sql);
         try (
                 Connection connection = 
dataSourceManager.getDataSource(targetDataSourceConfig).getConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute(sql);
+            return Optional.of(((ShardingSphereConnection) 
connection).getContextManager().getMetaDataContexts().getMetaData());
         } catch (final SQLException ex) {
             if 
(DatabaseTypedSPILoader.findService(DialectPipelineJobDataSourcePrepareOption.class,
 databaseType)
                     
.map(DialectPipelineJobDataSourcePrepareOption::isSupportIfNotExistsOnCreateSchema).orElse(true))
 {
@@ -99,6 +107,7 @@ public final class PipelineJobDataSourcePreparer {
             }
             log.warn("Create schema failed", ex);
         }
+        return Optional.empty();
     }
     
     /**
@@ -111,8 +120,12 @@ public final class PipelineJobDataSourcePreparer {
         final long startTimeMillis = System.currentTimeMillis();
         PipelineDataSourceManager dataSourceManager = 
param.getDataSourceManager();
         for (CreateTableConfiguration each : 
param.getCreateTableConfigurations()) {
-            List<String> createTargetTableSQL = getCreateTargetTableSQL(each, 
dataSourceManager, param.getSqlParserEngine(), param.getMetaData());
             try (Connection targetConnection = 
dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection())
 {
+                ShardingSphereMetaData metaData = 
param.getTargetSchemaMetaData().get(each.getTargetName().getSchemaName());
+                if (null == metaData) {
+                    metaData = ((ShardingSphereConnection) 
targetConnection).getContextManager().getMetaDataContexts().getMetaData();
+                }
+                List<String> createTargetTableSQL = 
getCreateTargetTableSQL(each, dataSourceManager, param.getSqlParserEngine(), 
metaData, param.getTargetDatabaseName());
                 for (String sql : createTargetTableSQL) {
                     executeTargetTableSQL(targetConnection, 
addIfNotExistsForCreateTableSQL(sql));
                 }
@@ -122,13 +135,13 @@ public final class PipelineJobDataSourcePreparer {
     }
     
     private List<String> getCreateTargetTableSQL(final 
CreateTableConfiguration createTableConfig, final PipelineDataSourceManager 
dataSourceManager,
-                                                 final SQLParserEngine 
sqlParserEngine, final ShardingSphereMetaData metaData) throws SQLException {
+                                                 final SQLParserEngine 
sqlParserEngine, final ShardingSphereMetaData metaData, final String 
targetDatabaseName) throws SQLException {
         DatabaseType databaseType = 
createTableConfig.getSourceDataSourceConfig().getDatabaseType();
         DataSource sourceDataSource = 
dataSourceManager.getDataSource(createTableConfig.getSourceDataSourceConfig());
         String schemaName = createTableConfig.getSourceName().getSchemaName();
         String sourceTableName = 
createTableConfig.getSourceName().getTableName();
         String targetTableName = 
createTableConfig.getTargetName().getTableName();
-        return new 
PipelineDDLGenerator(metaData).generateLogicDDL(databaseType, sourceDataSource, 
schemaName, sourceTableName, targetTableName, sqlParserEngine);
+        return new 
PipelineDDLGenerator(metaData).generateLogicDDL(databaseType, sourceDataSource, 
schemaName, sourceTableName, targetTableName, sqlParserEngine, 
targetDatabaseName);
     }
     
     private void executeTargetTableSQL(final Connection targetConnection, 
final String sql) throws SQLException {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java
index d9be8f69bdb..f14bb707ff0 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java
@@ -24,6 +24,7 @@ import 
org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.parser.SQLParserEngine;
 
 import java.util.Collection;
+import java.util.Map;
 
 /**
  * Prepare target tables parameter.
@@ -38,5 +39,7 @@ public final class PrepareTargetTablesParameter {
     
     private final SQLParserEngine sqlParserEngine;
     
-    private final ShardingSphereMetaData metaData;
+    private final Map<String, ShardingSphereMetaData> targetSchemaMetaData;
+    
+    private final String targetDatabaseName;
 }
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java
index 06d26a8cf5e..73013470e88 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java
@@ -17,19 +17,24 @@
 
 package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
 
+import lombok.SneakyThrows;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
+import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.parser.SQLParserEngine;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.junit.jupiter.api.Test;
 
+import java.sql.SQLException;
 import java.util.Collections;
+import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -45,11 +50,17 @@ class PipelineJobDataSourcePreparerTest {
     }
     
     @Test
+    @SneakyThrows(SQLException.class)
     void assertPrepareTargetTables() {
         CreateTableConfiguration createTableConfig = 
mock(CreateTableConfiguration.class, RETURNS_DEEP_STUBS);
         
when(createTableConfig.getSourceDataSourceConfig().getDatabaseType()).thenReturn(databaseType);
+        PipelineDataSourceManager pipelineDataSourceManager = 
mock(PipelineDataSourceManager.class, RETURNS_DEEP_STUBS);
+        ShardingSphereConnection connection = 
mock(ShardingSphereConnection.class, RETURNS_DEEP_STUBS);
+        
when(pipelineDataSourceManager.getDataSource(any()).getConnection()).thenReturn(connection);
+        
when(connection.getContextManager().getMetaDataContexts().getMetaData()).thenReturn(mock(ShardingSphereMetaData.class));
         PrepareTargetTablesParameter parameter = new 
PrepareTargetTablesParameter(
-                Collections.singleton(createTableConfig), 
mock(PipelineDataSourceManager.class, RETURNS_DEEP_STUBS), 
mock(SQLParserEngine.class), mock(ShardingSphereMetaData.class));
+                Collections.singleton(createTableConfig), 
pipelineDataSourceManager,
+                mock(SQLParserEngine.class), mock(Map.class), "foo_db");
         assertDoesNotThrow(() -> new 
PipelineJobDataSourcePreparer(databaseType).prepareTargetTables(parameter));
     }
 }
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 110ff5ac616..ecb2df8815d 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -24,8 +24,8 @@ import 
org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.core.checker.PipelineDataSourceCheckEngine;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
 import 
org.apache.shardingsphere.data.pipeline.core.execute.PipelineExecuteEngine;
@@ -77,6 +77,7 @@ import org.apache.shardingsphere.parser.rule.SQLParserRule;
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Map;
 
 /**
  * Migration job preparer.
@@ -157,11 +158,11 @@ public final class MigrationJobPreparer implements 
PipelineJobPreparer<Migration
         Collection<CreateTableConfiguration> createTableConfigs = 
jobItemContext.getTaskConfig().getCreateTableConfigurations();
         PipelineDataSourceManager dataSourceManager = 
jobItemContext.getDataSourceManager();
         PipelineJobDataSourcePreparer preparer = new 
PipelineJobDataSourcePreparer(targetDatabaseType);
-        preparer.prepareTargetSchemas(new 
PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs, 
dataSourceManager));
+        Map<String, ShardingSphereMetaData> targetSchemaMetaData = 
preparer.prepareTargetSchemas(new 
PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs, 
dataSourceManager));
         ShardingSphereMetaData metaData = 
contextManager.getMetaDataContexts().getMetaData();
         SQLParserEngine sqlParserEngine = 
metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class)
                 
.getSQLParserEngine(metaData.getDatabase(jobConfig.getTargetDatabaseName()).getProtocolType());
-        preparer.prepareTargetTables(new 
PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, 
sqlParserEngine, metaData));
+        preparer.prepareTargetTables(new 
PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, 
sqlParserEngine, targetSchemaMetaData, jobConfig.getTargetDatabaseName()));
     }
     
     private void prepareIncremental(final MigrationJobItemContext 
jobItemContext) {

Reply via email to the mailing list.