This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new fa763c5a33b Isolate ShardingSphereDataSource Standalone repository in
pipeline (#26881)
fa763c5a33b is described below
commit fa763c5a33b0f492ee790794d0faa303f0de4c1c
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Mon Jul 10 17:32:04 2023 +0800
Isolate ShardingSphereDataSource Standalone repository in pipeline (#26881)
* Use Standalone mode with the JDBC repository type in ShardingSpherePipelineDataSourceCreator.createPipelineDataSource
* Enable PostgreSQL for CDCE2EIT
---
.../ShardingSpherePipelineDataSourceCreator.java | 19 ++++++++++++++++++-
.../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 4 +++-
.../data/pipeline/core/util/PipelineContextUtils.java | 4 ++--
3 files changed, 23 insertions(+), 4 deletions(-)
diff --git
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
index c1e894e6c18..627d009e55d 100644
---
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
+++
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
@@ -24,6 +24,9 @@ import
org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.mode.YamlModeConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.mode.YamlPersistRepositoryConfiguration;
+import
org.apache.shardingsphere.mode.repository.standalone.jdbc.props.JDBCRepositoryPropertyKey;
import
org.apache.shardingsphere.sharding.yaml.config.YamlShardingRuleConfiguration;
import
org.apache.shardingsphere.sharding.yaml.swapper.ShardingRuleConfigurationConverter;
import org.apache.shardingsphere.single.api.constant.SingleTableConstants;
@@ -33,6 +36,7 @@ import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Optional;
+import java.util.Properties;
/**
* ShardingSphere pipeline data source creator.
@@ -43,6 +47,7 @@ public final class ShardingSpherePipelineDataSourceCreator
implements PipelineDa
public DataSource createPipelineDataSource(final Object dataSourceConfig)
throws SQLException {
YamlRootConfiguration rootConfig =
YamlEngine.unmarshal(YamlEngine.marshal(dataSourceConfig),
YamlRootConfiguration.class);
enableStreamingQuery(rootConfig);
+ updateSingleRuleConfiguration(rootConfig);
Optional<YamlShardingRuleConfiguration> yamlShardingRuleConfig =
ShardingRuleConfigurationConverter.findYamlShardingRuleConfiguration(rootConfig.getRules());
if (yamlShardingRuleConfig.isPresent()) {
enableRangeQueryForInline(yamlShardingRuleConfig.get());
@@ -50,7 +55,7 @@ public final class ShardingSpherePipelineDataSourceCreator
implements PipelineDa
}
rootConfig.setDatabaseName(rootConfig.getDatabaseName());
rootConfig.setSchemaName(rootConfig.getSchemaName());
- updateSingleRuleConfiguration(rootConfig);
+
rootConfig.setMode(createStandaloneModeConfiguration(rootConfig.getDatabaseName()));
return
YamlShardingSphereDataSourceFactory.createDataSourceWithoutCache(rootConfig);
}
@@ -86,6 +91,18 @@ public final class ShardingSpherePipelineDataSourceCreator
implements PipelineDa
}
}
+ private YamlModeConfiguration createStandaloneModeConfiguration(final
String databaseName) {
+ YamlModeConfiguration result = new YamlModeConfiguration();
+ result.setType("Standalone");
+ YamlPersistRepositoryConfiguration repository = new
YamlPersistRepositoryConfiguration();
+ result.setRepository(repository);
+ repository.setType("JDBC");
+ Properties props = new Properties();
+ repository.setProps(props);
+ props.setProperty(JDBCRepositoryPropertyKey.JDBC_URL.getKey(),
String.format("jdbc:h2:mem:config_%s;DB_CLOSE_DELAY=0;DATABASE_TO_UPPER=false;MODE=MYSQL",
databaseName));
+ return result;
+ }
+
@Override
public String getType() {
return ShardingSpherePipelineDataSourceConfiguration.TYPE;
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index f36d0029600..8b968bc083e 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -38,6 +38,7 @@ import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.algorithm.D
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.DataConsistencyCheckResult;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import
org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
import
org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
import
org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
import
org.apache.shardingsphere.test.e2e.data.pipeline.cases.task.E2EIncrementalTask;
@@ -78,6 +79,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
*/
@PipelineE2ESettings(database = {
@PipelineE2EDatabaseSettings(type = "MySQL", scenarioFiles =
"env/scenario/general/mysql.xml"),
+ @PipelineE2EDatabaseSettings(type = "PostgreSQL", scenarioFiles =
"env/scenario/general/postgresql.xml"),
@PipelineE2EDatabaseSettings(type = "openGauss", scenarioFiles =
"env/scenario/general/opengauss.xml")})
@Slf4j
class CDCE2EIT {
@@ -213,6 +215,6 @@ class CDCE2EIT {
}
private static boolean isEnabled() {
- return PipelineE2ECondition.isEnabled(new MySQLDatabaseType(), new
OpenGaussDatabaseType());
+ return PipelineE2ECondition.isEnabled(new MySQLDatabaseType(), new
PostgreSQLDatabaseType(), new OpenGaussDatabaseType());
}
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
index 1ed17586e66..8c496e3777a 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
@@ -29,7 +29,6 @@ import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContext;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
import
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
-import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
import
org.apache.shardingsphere.data.pipeline.common.ingest.channel.PipelineChannelCreator;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
@@ -37,6 +36,7 @@ import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.Migrati
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
+import
org.apache.shardingsphere.driver.api.yaml.YamlShardingSphereDataSourceFactory;
import
org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereColumn;
@@ -86,7 +86,7 @@ public final class PipelineContextUtils {
ConfigurationFileUtils.readFileAndIgnoreComments("config_sharding_sphere_jdbc_source.yaml"));
YamlRootConfiguration rootConfig = (YamlRootConfiguration)
pipelineDataSourceConfig.getDataSourceConfiguration();
ModeConfiguration modeConfig = new
YamlModeConfigurationSwapper().swapToObject(rootConfig.getMode());
- ShardingSphereDataSource dataSource = (ShardingSphereDataSource)
PipelineDataSourceFactory.newInstance(pipelineDataSourceConfig).getDataSource();
+ ShardingSphereDataSource dataSource = (ShardingSphereDataSource)
YamlShardingSphereDataSourceFactory.createDataSourceWithoutCache(rootConfig);
ContextManager contextManager = getContextManager(dataSource);
ClusterPersistRepository persistRepository =
getClusterPersistRepository((ClusterPersistRepositoryConfiguration)
modeConfig.getRepository());
MetaDataBasedPersistService persistService =
"Cluster".equals(modeConfig.getType()) ? new
NewMetaDataPersistService(persistRepository) : new
MetaDataPersistService(persistRepository);