This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new af96795f05c Fix pipeline e2e for MySQL (#34108)
af96795f05c is described below
commit af96795f05c8d7c646bc5df666d4fa7ba446dd04
Author: Haoran Meng <[email protected]>
AuthorDate: Fri Dec 20 11:20:53 2024 +0800
Fix pipeline e2e for MySQL (#34108)
---
.../metadata/generator/PipelineDDLGenerator.java | 19 +++++++--------
.../datasource/PipelineJobDataSourcePreparer.java | 27 ++++++++++++++++------
.../param/PrepareTargetTablesParameter.java | 5 +++-
.../PipelineJobDataSourcePreparerTest.java | 13 ++++++++++-
.../migration/preparer/MigrationJobPreparer.java | 7 +++---
5 files changed, 48 insertions(+), 23 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
index a0510de5440..95b8b3b85ef 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
@@ -43,7 +43,6 @@ import
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table
import
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.TableNameSegment;
import javax.sql.DataSource;
-import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
@@ -74,15 +73,17 @@ public final class PipelineDDLGenerator {
* @param sourceTableName source table name
* @param targetTableName target table name
* @param parserEngine parser engine
+ * @param targetDatabaseName target database name
* @return DDL SQL
* @throws SQLException SQL exception
*/
public List<String> generateLogicDDL(final DatabaseType databaseType,
final DataSource sourceDataSource,
- final String schemaName, final String
sourceTableName, final String targetTableName, final SQLParserEngine
parserEngine) throws SQLException {
+ final String schemaName, final String
sourceTableName, final String targetTableName,
+ final SQLParserEngine parserEngine,
final String targetDatabaseName) throws SQLException {
long startTimeMillis = System.currentTimeMillis();
List<String> result = new ArrayList<>();
for (String each :
DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class,
databaseType).buildCreateTableSQLs(sourceDataSource, schemaName,
sourceTableName)) {
- Optional<String> queryContext = decorate(databaseType,
sourceDataSource, schemaName, targetTableName, parserEngine, each);
+ Optional<String> queryContext = decorate(databaseType,
targetDatabaseName, schemaName, targetTableName, parserEngine, each);
queryContext.ifPresent(sql -> {
String trimmedSql = sql.trim();
if (!Strings.isNullOrEmpty(trimmedSql)) {
@@ -95,19 +96,15 @@ public final class PipelineDDLGenerator {
return result;
}
- private Optional<String> decorate(final DatabaseType databaseType, final
DataSource dataSource, final String schemaName, final String targetTableName,
- final SQLParserEngine parserEngine,
final String sql) throws SQLException {
+ private Optional<String> decorate(final DatabaseType databaseType, final
String targetDatabaseName, final String schemaName, final String
targetTableName,
+ final SQLParserEngine parserEngine,
final String sql) {
if (Strings.isNullOrEmpty(sql)) {
return Optional.empty();
}
- String databaseName;
- try (Connection connection = dataSource.getConnection()) {
- databaseName = connection.getCatalog();
- }
- String result = decorateActualSQL(databaseName, targetTableName,
parserEngine, sql.trim());
+ String result = decorateActualSQL(targetDatabaseName, targetTableName,
parserEngine, sql.trim());
// TODO remove it after set search_path is supported.
if ("openGauss".equals(databaseType.getType())) {
- return decorateOpenGauss(databaseName, schemaName, result,
parserEngine);
+ return decorateOpenGauss(targetDatabaseName, schemaName, result,
parserEngine);
}
return Optional.of(result);
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
index 2901c616442..c256c37ae2a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
@@ -27,6 +27,7 @@ import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.Cr
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
+import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -40,8 +41,10 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
@@ -62,14 +65,16 @@ public final class PipelineJobDataSourcePreparer {
* Prepare target schemas.
*
* @param param prepare target schemas parameter
+ * @return target schemas
* @throws SQLException if prepare target schema fail
*/
- public void prepareTargetSchemas(final PrepareTargetSchemasParameter
param) throws SQLException {
+ public Map<String, ShardingSphereMetaData> prepareTargetSchemas(final
PrepareTargetSchemasParameter param) throws SQLException {
DatabaseType targetDatabaseType = param.getTargetDatabaseType();
DialectDatabaseMetaData dialectDatabaseMetaData = new
DatabaseTypeRegistry(targetDatabaseType).getDialectDatabaseMetaData();
if (!dialectDatabaseMetaData.isSchemaAvailable()) {
- return;
+ return Collections.emptyMap();
}
+ Map<String, ShardingSphereMetaData> result = new
HashMap<>(param.getCreateTableConfigurations().size(), 1F);
String defaultSchema =
dialectDatabaseMetaData.getDefaultSchema().orElse(null);
PipelinePrepareSQLBuilder pipelineSQLBuilder = new
PipelinePrepareSQLBuilder(targetDatabaseType);
Collection<String> createdSchemaNames = new
HashSet<>(param.getCreateTableConfigurations().size(), 1F);
@@ -80,18 +85,21 @@ public final class PipelineJobDataSourcePreparer {
}
Optional<String> sql =
pipelineSQLBuilder.buildCreateSchemaSQL(targetSchemaName);
if (sql.isPresent()) {
- executeCreateSchema(param.getDataSourceManager(),
each.getTargetDataSourceConfig(), sql.get());
+ executeCreateSchema(param.getDataSourceManager(),
each.getTargetDataSourceConfig(), sql.get()).ifPresent(metaData ->
result.put(targetSchemaName, metaData));
createdSchemaNames.add(targetSchemaName);
}
}
+ return result;
}
- private void executeCreateSchema(final PipelineDataSourceManager
dataSourceManager, final PipelineDataSourceConfiguration
targetDataSourceConfig, final String sql) throws SQLException {
+ private Optional<ShardingSphereMetaData> executeCreateSchema(final
PipelineDataSourceManager dataSourceManager,
+ final
PipelineDataSourceConfiguration targetDataSourceConfig, final String sql)
throws SQLException {
log.info("Prepare target schemas SQL: {}", sql);
try (
Connection connection =
dataSourceManager.getDataSource(targetDataSourceConfig).getConnection();
Statement statement = connection.createStatement()) {
statement.execute(sql);
+ return Optional.of(((ShardingSphereConnection)
connection).getContextManager().getMetaDataContexts().getMetaData());
} catch (final SQLException ex) {
if
(DatabaseTypedSPILoader.findService(DialectPipelineJobDataSourcePrepareOption.class,
databaseType)
.map(DialectPipelineJobDataSourcePrepareOption::isSupportIfNotExistsOnCreateSchema).orElse(true))
{
@@ -99,6 +107,7 @@ public final class PipelineJobDataSourcePreparer {
}
log.warn("Create schema failed", ex);
}
+ return Optional.empty();
}
/**
@@ -111,8 +120,12 @@ public final class PipelineJobDataSourcePreparer {
final long startTimeMillis = System.currentTimeMillis();
PipelineDataSourceManager dataSourceManager =
param.getDataSourceManager();
for (CreateTableConfiguration each :
param.getCreateTableConfigurations()) {
- List<String> createTargetTableSQL = getCreateTargetTableSQL(each,
dataSourceManager, param.getSqlParserEngine(), param.getMetaData());
try (Connection targetConnection =
dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection())
{
+ ShardingSphereMetaData metaData =
param.getTargetSchemaMetaData().get(each.getTargetName().getSchemaName());
+ if (null == metaData) {
+ metaData = ((ShardingSphereConnection)
targetConnection).getContextManager().getMetaDataContexts().getMetaData();
+ }
+ List<String> createTargetTableSQL =
getCreateTargetTableSQL(each, dataSourceManager, param.getSqlParserEngine(),
metaData, param.getTargetDatabaseName());
for (String sql : createTargetTableSQL) {
executeTargetTableSQL(targetConnection,
addIfNotExistsForCreateTableSQL(sql));
}
@@ -122,13 +135,13 @@ public final class PipelineJobDataSourcePreparer {
}
private List<String> getCreateTargetTableSQL(final
CreateTableConfiguration createTableConfig, final PipelineDataSourceManager
dataSourceManager,
- final SQLParserEngine
sqlParserEngine, final ShardingSphereMetaData metaData) throws SQLException {
+ final SQLParserEngine
sqlParserEngine, final ShardingSphereMetaData metaData, final String
targetDatabaseName) throws SQLException {
DatabaseType databaseType =
createTableConfig.getSourceDataSourceConfig().getDatabaseType();
DataSource sourceDataSource =
dataSourceManager.getDataSource(createTableConfig.getSourceDataSourceConfig());
String schemaName = createTableConfig.getSourceName().getSchemaName();
String sourceTableName =
createTableConfig.getSourceName().getTableName();
String targetTableName =
createTableConfig.getTargetName().getTableName();
- return new
PipelineDDLGenerator(metaData).generateLogicDDL(databaseType, sourceDataSource,
schemaName, sourceTableName, targetTableName, sqlParserEngine);
+ return new
PipelineDDLGenerator(metaData).generateLogicDDL(databaseType, sourceDataSource,
schemaName, sourceTableName, targetTableName, sqlParserEngine,
targetDatabaseName);
}
private void executeTargetTableSQL(final Connection targetConnection,
final String sql) throws SQLException {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java
index d9be8f69bdb..f14bb707ff0 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java
@@ -24,6 +24,7 @@ import
org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
import java.util.Collection;
+import java.util.Map;
/**
* Prepare target tables parameter.
@@ -38,5 +39,7 @@ public final class PrepareTargetTablesParameter {
private final SQLParserEngine sqlParserEngine;
- private final ShardingSphereMetaData metaData;
+ private final Map<String, ShardingSphereMetaData> targetSchemaMetaData;
+
+ private final String targetDatabaseName;
}
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java
index 06d26a8cf5e..73013470e88 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java
@@ -17,19 +17,24 @@
package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
+import lombok.SneakyThrows;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
+import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.junit.jupiter.api.Test;
+import java.sql.SQLException;
import java.util.Collections;
+import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -45,11 +50,17 @@ class PipelineJobDataSourcePreparerTest {
}
@Test
+ @SneakyThrows(SQLException.class)
void assertPrepareTargetTables() {
CreateTableConfiguration createTableConfig =
mock(CreateTableConfiguration.class, RETURNS_DEEP_STUBS);
when(createTableConfig.getSourceDataSourceConfig().getDatabaseType()).thenReturn(databaseType);
+ PipelineDataSourceManager pipelineDataSourceManager =
mock(PipelineDataSourceManager.class, RETURNS_DEEP_STUBS);
+ ShardingSphereConnection connection =
mock(ShardingSphereConnection.class, RETURNS_DEEP_STUBS);
+
when(pipelineDataSourceManager.getDataSource(any()).getConnection()).thenReturn(connection);
+
when(connection.getContextManager().getMetaDataContexts().getMetaData()).thenReturn(mock(ShardingSphereMetaData.class));
PrepareTargetTablesParameter parameter = new
PrepareTargetTablesParameter(
- Collections.singleton(createTableConfig),
mock(PipelineDataSourceManager.class, RETURNS_DEEP_STUBS),
mock(SQLParserEngine.class), mock(ShardingSphereMetaData.class));
+ Collections.singleton(createTableConfig),
pipelineDataSourceManager,
+ mock(SQLParserEngine.class), mock(Map.class), "foo_db");
assertDoesNotThrow(() -> new
PipelineJobDataSourcePreparer(databaseType).prepareTargetTables(parameter));
}
}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 110ff5ac616..ecb2df8815d 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -24,8 +24,8 @@ import
org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.core.checker.PipelineDataSourceCheckEngine;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
import
org.apache.shardingsphere.data.pipeline.core.execute.PipelineExecuteEngine;
@@ -77,6 +77,7 @@ import org.apache.shardingsphere.parser.rule.SQLParserRule;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
+import java.util.Map;
/**
* Migration job preparer.
@@ -157,11 +158,11 @@ public final class MigrationJobPreparer implements
PipelineJobPreparer<Migration
Collection<CreateTableConfiguration> createTableConfigs =
jobItemContext.getTaskConfig().getCreateTableConfigurations();
PipelineDataSourceManager dataSourceManager =
jobItemContext.getDataSourceManager();
PipelineJobDataSourcePreparer preparer = new
PipelineJobDataSourcePreparer(targetDatabaseType);
- preparer.prepareTargetSchemas(new
PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs,
dataSourceManager));
+ Map<String, ShardingSphereMetaData> targetSchemaMetaData =
preparer.prepareTargetSchemas(new
PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs,
dataSourceManager));
ShardingSphereMetaData metaData =
contextManager.getMetaDataContexts().getMetaData();
SQLParserEngine sqlParserEngine =
metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class)
.getSQLParserEngine(metaData.getDatabase(jobConfig.getTargetDatabaseName()).getProtocolType());
- preparer.prepareTargetTables(new
PrepareTargetTablesParameter(createTableConfigs, dataSourceManager,
sqlParserEngine, metaData));
+ preparer.prepareTargetTables(new
PrepareTargetTablesParameter(createTableConfigs, dataSourceManager,
sqlParserEngine, targetSchemaMetaData, jobConfig.getTargetDatabaseName()));
}
private void prepareIncremental(final MigrationJobItemContext
jobItemContext) {