This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new fa763c5a33b Isolate ShardingSphereDataSource Standalone repository in 
pipeline (#26881)
fa763c5a33b is described below

commit fa763c5a33b0f492ee790794d0faa303f0de4c1c
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Mon Jul 10 17:32:04 2023 +0800

    Isolate ShardingSphereDataSource Standalone repository in pipeline (#26881)
    
    * Use Standalone mode JDBC type in 
ShardingSpherePipelineDataSourceCreator.createPipelineDataSource
    
    * Enable PostgreSQL for CDCE2EIT
---
 .../ShardingSpherePipelineDataSourceCreator.java      | 19 ++++++++++++++++++-
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java    |  4 +++-
 .../data/pipeline/core/util/PipelineContextUtils.java |  4 ++--
 3 files changed, 23 insertions(+), 4 deletions(-)

diff --git 
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
 
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
index c1e894e6c18..627d009e55d 100644
--- 
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
+++ 
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
@@ -24,6 +24,9 @@ import 
org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.mode.YamlModeConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.mode.YamlPersistRepositoryConfiguration;
+import 
org.apache.shardingsphere.mode.repository.standalone.jdbc.props.JDBCRepositoryPropertyKey;
 import 
org.apache.shardingsphere.sharding.yaml.config.YamlShardingRuleConfiguration;
 import 
org.apache.shardingsphere.sharding.yaml.swapper.ShardingRuleConfigurationConverter;
 import org.apache.shardingsphere.single.api.constant.SingleTableConstants;
@@ -33,6 +36,7 @@ import javax.sql.DataSource;
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.Optional;
+import java.util.Properties;
 
 /**
  * ShardingSphere pipeline data source creator.
@@ -43,6 +47,7 @@ public final class ShardingSpherePipelineDataSourceCreator 
implements PipelineDa
     public DataSource createPipelineDataSource(final Object dataSourceConfig) 
throws SQLException {
         YamlRootConfiguration rootConfig = 
YamlEngine.unmarshal(YamlEngine.marshal(dataSourceConfig), 
YamlRootConfiguration.class);
         enableStreamingQuery(rootConfig);
+        updateSingleRuleConfiguration(rootConfig);
         Optional<YamlShardingRuleConfiguration> yamlShardingRuleConfig = 
ShardingRuleConfigurationConverter.findYamlShardingRuleConfiguration(rootConfig.getRules());
         if (yamlShardingRuleConfig.isPresent()) {
             enableRangeQueryForInline(yamlShardingRuleConfig.get());
@@ -50,7 +55,7 @@ public final class ShardingSpherePipelineDataSourceCreator 
implements PipelineDa
         }
         rootConfig.setDatabaseName(rootConfig.getDatabaseName());
         rootConfig.setSchemaName(rootConfig.getSchemaName());
-        updateSingleRuleConfiguration(rootConfig);
+        
rootConfig.setMode(createStandaloneModeConfiguration(rootConfig.getDatabaseName()));
         return 
YamlShardingSphereDataSourceFactory.createDataSourceWithoutCache(rootConfig);
     }
     
@@ -86,6 +91,18 @@ public final class ShardingSpherePipelineDataSourceCreator 
implements PipelineDa
         }
     }
     
+    private YamlModeConfiguration createStandaloneModeConfiguration(final 
String databaseName) {
+        YamlModeConfiguration result = new YamlModeConfiguration();
+        result.setType("Standalone");
+        YamlPersistRepositoryConfiguration repository = new 
YamlPersistRepositoryConfiguration();
+        result.setRepository(repository);
+        repository.setType("JDBC");
+        Properties props = new Properties();
+        repository.setProps(props);
+        props.setProperty(JDBCRepositoryPropertyKey.JDBC_URL.getKey(), 
String.format("jdbc:h2:mem:config_%s;DB_CLOSE_DELAY=0;DATABASE_TO_UPPER=false;MODE=MYSQL",
 databaseName));
+        return result;
+    }
+    
     @Override
     public String getType() {
         return ShardingSpherePipelineDataSourceConfiguration.TYPE;
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index f36d0029600..8b968bc083e 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -38,6 +38,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.algorithm.D
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.DataConsistencyCheckResult;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import 
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import 
org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
 import 
org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.cases.task.E2EIncrementalTask;
@@ -78,6 +79,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  */
 @PipelineE2ESettings(database = {
         @PipelineE2EDatabaseSettings(type = "MySQL", scenarioFiles = 
"env/scenario/general/mysql.xml"),
+        @PipelineE2EDatabaseSettings(type = "PostgreSQL", scenarioFiles = 
"env/scenario/general/postgresql.xml"),
         @PipelineE2EDatabaseSettings(type = "openGauss", scenarioFiles = 
"env/scenario/general/opengauss.xml")})
 @Slf4j
 class CDCE2EIT {
@@ -213,6 +215,6 @@ class CDCE2EIT {
     }
     
     private static boolean isEnabled() {
-        return PipelineE2ECondition.isEnabled(new MySQLDatabaseType(), new 
OpenGaussDatabaseType());
+        return PipelineE2ECondition.isEnabled(new MySQLDatabaseType(), new 
PostgreSQLDatabaseType(), new OpenGaussDatabaseType());
     }
 }
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
index 1ed17586e66..8c496e3777a 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
@@ -29,7 +29,6 @@ import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
-import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.channel.PipelineChannelCreator;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
@@ -37,6 +36,7 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.Migrati
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
+import 
org.apache.shardingsphere.driver.api.yaml.YamlShardingSphereDataSourceFactory;
 import 
org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereColumn;
@@ -86,7 +86,7 @@ public final class PipelineContextUtils {
                 
ConfigurationFileUtils.readFileAndIgnoreComments("config_sharding_sphere_jdbc_source.yaml"));
         YamlRootConfiguration rootConfig = (YamlRootConfiguration) 
pipelineDataSourceConfig.getDataSourceConfiguration();
         ModeConfiguration modeConfig = new 
YamlModeConfigurationSwapper().swapToObject(rootConfig.getMode());
-        ShardingSphereDataSource dataSource = (ShardingSphereDataSource) 
PipelineDataSourceFactory.newInstance(pipelineDataSourceConfig).getDataSource();
+        ShardingSphereDataSource dataSource = (ShardingSphereDataSource) 
YamlShardingSphereDataSourceFactory.createDataSourceWithoutCache(rootConfig);
         ContextManager contextManager = getContextManager(dataSource);
         ClusterPersistRepository persistRepository = 
getClusterPersistRepository((ClusterPersistRepositoryConfiguration) 
modeConfig.getRepository());
         MetaDataBasedPersistService persistService = 
"Cluster".equals(modeConfig.getType()) ? new 
NewMetaDataPersistService(persistRepository) : new 
MetaDataPersistService(persistRepository);

Reply via email to