This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new de114d56444 Refactor PipelineDataSourceFactory (#29503)
de114d56444 is described below

commit de114d56444dd1c2b1e122d7e79a4ea723477fe1
Author: Liang Zhang <zhangli...@apache.org>
AuthorDate: Fri Dec 22 15:59:28 2023 +0800

    Refactor PipelineDataSourceFactory (#29503)
---
 .../core/datasource/PipelineDataSourceFactory.java | 48 ----------------------
 .../core/datasource/PipelineDataSourceManager.java |  2 +-
 .../core/datasource/PipelineDataSourceWrapper.java | 10 +++++
 .../core/metadata/loader/PipelineSchemaUtils.java  |  3 +-
 .../datasource/PipelineDataSourceFactoryTest.java  | 41 ------------------
 .../scenario/migration/api/MigrationJobAPI.java    | 15 ++++---
 .../pipeline/cases/PipelineContainerComposer.java  |  4 +-
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 11 +++--
 .../core/util/JobConfigurationBuilder.java         |  3 +-
 9 files changed, 27 insertions(+), 110 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactory.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactory.java
deleted file mode 100644
index c08946e05c7..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactory.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.datasource;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.spi.PipelineDataSourceCreator;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-
-import javax.sql.DataSource;
-import java.sql.SQLException;
-
-/**
- * Pipeline data source factory.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class PipelineDataSourceFactory {
-    
-    /**
-     * New instance data source wrapper.
-     *
-     * @param pipelineDataSourceConfig pipeline data source configuration
-     * @return new data source wrapper
-     */
-    @SneakyThrows(SQLException.class)
-    public static PipelineDataSourceWrapper newInstance(final PipelineDataSourceConfiguration pipelineDataSourceConfig) {
-        DataSource dataSource = TypedSPILoader.getService(
-                PipelineDataSourceCreator.class, pipelineDataSourceConfig.getType()).create(pipelineDataSourceConfig.getDataSourceConfiguration());
-        return new PipelineDataSourceWrapper(dataSource, pipelineDataSourceConfig.getDatabaseType());
-    }
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
index 6ef42f14e6b..ad731b33cbc 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
@@ -51,7 +51,7 @@ public final class PipelineDataSourceManager implements AutoCloseable {
                 }
                 log.warn("{} is already closed, create again.", result);
             }
-            result = PipelineDataSourceFactory.newInstance(dataSourceConfig);
+            result = new PipelineDataSourceWrapper(dataSourceConfig);
             cachedDataSources.put(dataSourceConfig, result);
             return result;
         }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceWrapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceWrapper.java
index 253e9a9a16a..162ba91a4ef 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceWrapper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceWrapper.java
@@ -19,9 +19,13 @@ package org.apache.shardingsphere.data.pipeline.core.datasource;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.spi.PipelineDataSourceCreator;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.datasource.pool.destroyer.DataSourcePoolDestroyer;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 
 import javax.sql.DataSource;
 import java.io.PrintWriter;
@@ -45,6 +49,12 @@ public final class PipelineDataSourceWrapper implements DataSource, AutoCloseabl
     
     private final AtomicBoolean closed = new AtomicBoolean(false);
     
+    @SneakyThrows(SQLException.class)
+    public PipelineDataSourceWrapper(final PipelineDataSourceConfiguration pipelineDataSourceConfig) {
+        this(TypedSPILoader.getService(PipelineDataSourceCreator.class, pipelineDataSourceConfig.getType()).create(pipelineDataSourceConfig.getDataSourceConfiguration()),
+                pipelineDataSourceConfig.getDatabaseType());
+    }
+    
     /**
      * Whether underlying data source is closed or not.
      *
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java
index 25dbf133134..f1e78d620f8 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java
@@ -22,7 +22,6 @@ import lombok.NoArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
 
 import java.sql.Connection;
@@ -43,7 +42,7 @@ public final class PipelineSchemaUtils {
      */
     @SneakyThrows(SQLException.class)
     public static String getDefaultSchema(final 
PipelineDataSourceConfiguration dataSourceConfig) {
-        try (PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(dataSourceConfig)) {
+        try (PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(dataSourceConfig)) {
             try (Connection connection = dataSource.getConnection()) {
                 String result = connection.getSchema();
                 log.info("get default schema {}", result);
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactoryTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactoryTest.java
deleted file mode 100644
index 829aa34d922..00000000000
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactoryTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.datasource;
-
-import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-class PipelineDataSourceFactoryTest {
-    
-    @Test
-    void assertNewInstance() {
-        Map<String, Object> yamlDataSourceConfig = new HashMap<>(3, 1F);
-        yamlDataSourceConfig.put("url", "jdbc:mysql://localhost:3306/database");
-        yamlDataSourceConfig.put("username", "username");
-        yamlDataSourceConfig.put("password", "password");
-        PipelineDataSourceConfiguration pipelineDataSourceConfig = new StandardPipelineDataSourceConfiguration(yamlDataSourceConfig);
-        assertThat(PipelineDataSourceFactory.newInstance(pipelineDataSourceConfig).getDatabaseType(), is(pipelineDataSourceConfig.getDatabaseType()));
-    }
-}
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index df1dc31201e..3e16d0e239f 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -27,23 +27,24 @@ import 
org.apache.shardingsphere.data.pipeline.core.datanode.DataNodeUtils;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
 import 
org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.yaml.config.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSchemaUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.connection.RegisterMigrationSourceStorageUnitException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.connection.UnregisterMigrationSourceStorageUnitException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.metadata.NoAnyRuleExistsException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
-import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSchemaUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.migration.distsql.statement.MigrateTableStatement;
+import 
org.apache.shardingsphere.data.pipeline.migration.distsql.statement.pojo.SourceTargetEntry;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
@@ -67,8 +68,6 @@ import org.apache.shardingsphere.infra.util.json.JsonUtils;
 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
 import 
org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
-import 
org.apache.shardingsphere.data.pipeline.migration.distsql.statement.MigrateTableStatement;
-import 
org.apache.shardingsphere.data.pipeline.migration.distsql.statement.pojo.SourceTargetEntry;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 
 import java.sql.Connection;
@@ -322,7 +321,7 @@ public final class MigrationJobAPI implements 
TransmissionJobAPI {
         PipelinePrepareSQLBuilder pipelineSQLBuilder = new 
PipelinePrepareSQLBuilder(jobConfig.getTargetDatabaseType());
         TableAndSchemaNameMapper mapping = new 
TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
         try (
-                PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(jobConfig.getTarget());
+                PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(jobConfig.getTarget());
                 Connection connection = dataSource.getConnection()) {
             for (String each : jobConfig.getTargetTableNames()) {
                 String targetSchemaName = mapping.getSchemaName(each);
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index f0cf4df32cd..d3ba2ec2a00 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -24,7 +24,7 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.infra.database.core.connector.url.JdbcUrlAppender;
@@ -580,7 +580,7 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
         YamlSingleRuleConfiguration singleRuleConfig = new 
YamlSingleRuleConfiguration();
         singleRuleConfig.setTables(Collections.singletonList("*.*"));
         rootConfig.getRules().add(singleRuleConfig);
-        return PipelineDataSourceFactory.newInstance(new ShardingSpherePipelineDataSourceConfiguration(rootConfig));
+        return new PipelineDataSourceWrapper(new ShardingSpherePipelineDataSourceConfiguration(rootConfig));
     }
     
     private YamlRootConfiguration getYamlRootConfig() {
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index a3e2df5d89e..8720812aad8 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -27,16 +27,15 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.client.handler.RetryStreaming
 import 
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter;
 import 
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
 import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
@@ -175,7 +174,7 @@ class CDCE2EIT {
     }
     
     private DataSource createStandardDataSource(final 
PipelineContainerComposer containerComposer, final String storageUnitName) {
-        return PipelineDataSourceFactory.newInstance(new StandardPipelineDataSourceConfiguration(containerComposer.getActualJdbcUrlTemplate(storageUnitName, false),
+        return new PipelineDataSourceWrapper(new StandardPipelineDataSourceConfiguration(containerComposer.getActualJdbcUrlTemplate(storageUnitName, false),
                 containerComposer.getUsername(), 
containerComposer.getPassword()));
     }
     
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
index 04af7e5121e..6e88c183f8b 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -24,7 +24,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfigurati
 import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.yaml.config.YamlPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
@@ -83,7 +82,7 @@ public final class JobConfigurationBuilder {
         PipelineDataSourceConfiguration sourceDataSourceConfig = new 
StandardPipelineDataSourceConfiguration(
                 
ConfigurationFileUtils.readFile("migration_standard_jdbc_source.yaml").replace("${databaseNameSuffix}",
 databaseNameSuffix));
         try (
-                PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
+                PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(sourceDataSourceConfig);
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
             
statement.execute(PipelineContextUtils.getCreateOrderTableSchema());

Reply via email to