This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 76ba8a14e06 Merge DefaultPipelineDataSourceManager and PipelineDataSourceManager (#29348)
76ba8a14e06 is described below

commit 76ba8a14e06d38152c5c802f16ff347505fdc0b8
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 10 14:58:05 2023 +0800

    Merge DefaultPipelineDataSourceManager and PipelineDataSourceManager (#29348)
---
 .../DefaultPipelineDataSourceManager.java          | 70 ----------------------
 .../core/datasource/PipelineDataSourceManager.java | 44 +++++++++++++-
 .../core/job/engine/PipelineJobRunnerManager.java  |  3 +-
 .../mysql/ingest/MySQLIncrementalDumperTest.java   | 11 ++--
 .../postgresql/ingest/PostgreSQLWALDumperTest.java |  9 ++-
 .../ingest/wal/WALEventConverterTest.java          | 21 ++++---
 .../data/pipeline/cdc/api/CDCJobAPI.java           | 13 ++--
 .../MigrationDataConsistencyChecker.java           | 21 ++++---
 ...est.java => PipelineDataSourceManagerTest.java} |  9 ++-
 .../data/pipeline/core/task/InventoryTaskTest.java | 13 ++--
 .../pipeline/core/util/PipelineContextUtils.java   | 24 ++++----
 .../MigrationDataConsistencyCheckerTest.java       |  9 ++-
 12 files changed, 107 insertions(+), 140 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManager.java
deleted file mode 100644
index ddc807d7c9f..00000000000
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManager.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.datasource;
-
-import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-
-import java.sql.SQLException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Default pipeline data source manager.
- */
-@Slf4j
-public final class DefaultPipelineDataSourceManager implements 
PipelineDataSourceManager {
-    
-    private final Map<PipelineDataSourceConfiguration, 
PipelineDataSourceWrapper> cachedDataSources = new ConcurrentHashMap<>();
-    
-    @Override
-    public PipelineDataSourceWrapper getDataSource(final 
PipelineDataSourceConfiguration dataSourceConfig) {
-        PipelineDataSourceWrapper result = 
cachedDataSources.get(dataSourceConfig);
-        if (null != result) {
-            return result;
-        }
-        synchronized (cachedDataSources) {
-            result = cachedDataSources.get(dataSourceConfig);
-            if (null != result) {
-                if (!result.isClosed()) {
-                    return result;
-                } else {
-                    log.warn("{} is already closed, create again", result);
-                }
-            }
-            result = PipelineDataSourceFactory.newInstance(dataSourceConfig);
-            cachedDataSources.put(dataSourceConfig, result);
-            return result;
-        }
-    }
-    
-    @Override
-    public void close() {
-        for (PipelineDataSourceWrapper each : cachedDataSources.values()) {
-            if (each.isClosed()) {
-                continue;
-            }
-            try {
-                each.close();
-            } catch (final SQLException ex) {
-                log.error("An exception occurred while closing the data 
source", ex);
-            }
-        }
-        cachedDataSources.clear();
-    }
-}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
index f7f1359db7b..db31eefe755 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
@@ -17,12 +17,20 @@
 
 package org.apache.shardingsphere.data.pipeline.core.datasource;
 
+import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 /**
  * Pipeline data source manager.
  */
-public interface PipelineDataSourceManager extends AutoCloseable {
+@Slf4j
+public final class PipelineDataSourceManager implements AutoCloseable {
+    
+    private final Map<PipelineDataSourceConfiguration, 
PipelineDataSourceWrapper> cachedDataSources = new ConcurrentHashMap<>();
     
     /**
      * Get cached data source.
@@ -30,8 +38,38 @@ public interface PipelineDataSourceManager extends 
AutoCloseable {
      * @param dataSourceConfig data source configuration
      * @return data source
      */
-    PipelineDataSourceWrapper getDataSource(PipelineDataSourceConfiguration 
dataSourceConfig);
+    public PipelineDataSourceWrapper getDataSource(final 
PipelineDataSourceConfiguration dataSourceConfig) {
+        PipelineDataSourceWrapper result = 
cachedDataSources.get(dataSourceConfig);
+        if (null != result) {
+            return result;
+        }
+        synchronized (cachedDataSources) {
+            result = cachedDataSources.get(dataSourceConfig);
+            if (null != result) {
+                if (!result.isClosed()) {
+                    return result;
+                } else {
+                    log.warn("{} is already closed, create again", result);
+                }
+            }
+            result = PipelineDataSourceFactory.newInstance(dataSourceConfig);
+            cachedDataSources.put(dataSourceConfig, result);
+            return result;
+        }
+    }
     
     @Override
-    void close();
+    public void close() {
+        for (PipelineDataSourceWrapper each : cachedDataSources.values()) {
+            if (each.isClosed()) {
+                continue;
+            }
+            try {
+                each.close();
+            } catch (final SQLException ex) {
+                log.error("An exception occurred while closing the data 
source", ex);
+            }
+        }
+        cachedDataSources.clear();
+    }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java
index 6f51ba89525..f39c70ffb46 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java
@@ -20,7 +20,6 @@ package 
org.apache.shardingsphere.data.pipeline.core.job.engine;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.engine.cleaner.PipelineJobRunnerCleaner;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
@@ -58,7 +57,7 @@ public final class PipelineJobRunnerManager {
     private final Map<Integer, PipelineTasksRunner> tasksRunners = new 
ConcurrentHashMap<>();
     
     @Getter
-    private final PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
+    private final PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager();
     
     private final PipelineJobRunnerCleaner cleaner;
     
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 6b77ba07ed5..cdfb39e1b56 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -18,22 +18,21 @@
 package org.apache.shardingsphere.data.pipeline.mysql.ingest;
 
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.EmptyAckCallback;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteRowsEvent;
@@ -105,7 +104,7 @@ class MySQLIncrementalDumperTest {
     
     private void initTableData(final IncrementalDumperContext dumperContext) 
throws SQLException {
         try (
-                PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
+                PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager();
                 PipelineDataSourceWrapper dataSource = 
dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index 5d52b2815c6..eff5fbc0d23 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -18,17 +18,16 @@
 package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
 
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.EmptyAckCallback;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.PostgreSQLLogicalReplication;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
@@ -79,7 +78,7 @@ class PostgreSQLWALDumperTest {
     
     private SimpleMemoryPipelineChannel channel;
     
-    private final PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
+    private final PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager();
     
     @BeforeEach
     void setUp() {
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index 66824896827..b399d2112d2 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -18,14 +18,9 @@
 package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;
 
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
@@ -33,6 +28,10 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
@@ -42,6 +41,7 @@ import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.Place
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.WriteRowEvent;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnsupportedSQLOperationException;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.internal.configuration.plugins.Plugins;
@@ -75,10 +75,12 @@ class WALEventConverterTest {
     
     private PipelineTableMetaData pipelineTableMetaData;
     
+    private PipelineDataSourceManager dataSourceManager;
+    
     @BeforeEach
     void setUp() throws SQLException {
         IncrementalDumperContext dumperContext = mockDumperContext();
-        PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
+        dataSourceManager = new PipelineDataSourceManager();
         walEventConverter = new WALEventConverter(dumperContext, new 
StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig())));
         initTableData(dumperContext);
         pipelineTableMetaData = new PipelineTableMetaData("t_order", 
mockOrderColumnsMetaDataMap(), Collections.emptyList());
@@ -94,7 +96,7 @@ class WALEventConverterTest {
     
     private void initTableData(final IncrementalDumperContext dumperContext) 
throws SQLException {
         try (
-                PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
+                PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager();
                 PipelineDataSourceWrapper dataSource = 
dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
@@ -116,6 +118,11 @@ class WALEventConverterTest {
         return result;
     }
     
+    @AfterEach
+    void clean() {
+        dataSourceManager.close();
+    }
+    
     @Test
     void assertWriteRowEvent() throws ReflectiveOperationException {
         DataRecord actual = getDataRecord(createWriteRowEvent());
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 0398be69743..7567053f893 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -33,14 +33,9 @@ import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManag
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
 import 
org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceConfigurationFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.yaml.YamlPipelineDataSourceConfigurationSwapper;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
-import 
org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
@@ -49,13 +44,17 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.Dumper
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
-import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
+import 
org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import 
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
@@ -170,7 +169,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
     private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
         String jobId = jobConfig.getJobId();
         PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = 
new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
-        try (PipelineDataSourceManager pipelineDataSourceManager = new 
DefaultPipelineDataSourceManager()) {
+        try (PipelineDataSourceManager pipelineDataSourceManager = new 
PipelineDataSourceManager()) {
             for (int i = 0; i < jobConfig.getJobShardingCount(); i++) {
                 if (jobItemManager.getProgress(jobId, i).isPresent()) {
                     continue;
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index 82d06118847..5914737803b 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -18,15 +18,23 @@
 package 
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency;
 
 import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryChecker;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.datanode.DataNodeUtils;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
@@ -34,15 +42,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipe
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryChecker;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import org.apache.shardingsphere.infra.datanode.DataNode;
@@ -89,7 +88,7 @@ public final class MigrationDataConsistencyChecker implements 
PipelineDataConsis
         progressContext.onProgressUpdated(new 
PipelineJobProgressUpdatedParameter(0));
         Map<CaseInsensitiveQualifiedTable, TableDataConsistencyCheckResult> 
result = new LinkedHashMap<>();
         try (
-                PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
+                PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager();
                 TableDataConsistencyChecker tableChecker = 
TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps)) {
             for (JobDataNodeLine each : jobConfig.getJobShardingDataNodes()) {
                 checkTableInventoryData(each, tableChecker, result, 
dataSourceManager);
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/DefaultPipelineDataSourceManagerTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/PipelineDataSourceManagerTest.java
similarity index 88%
rename from 
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/DefaultPipelineDataSourceManagerTest.java
rename to 
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/PipelineDataSourceManagerTest.java
index 7b256a4fcd9..ac4fa1cc98a 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/DefaultPipelineDataSourceManagerTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/PipelineDataSourceManagerTest.java
@@ -19,7 +19,6 @@ package 
org.apache.shardingsphere.test.it.data.pipeline.common;
 
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceConfigurationFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
@@ -38,7 +37,7 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-class DefaultPipelineDataSourceManagerTest {
+class PipelineDataSourceManagerTest {
     
     private MigrationJobConfiguration jobConfig;
     
@@ -54,7 +53,7 @@ class DefaultPipelineDataSourceManagerTest {
     
     @Test
     void assertGetDataSource() {
-        try (PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager()) {
+        try (PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager()) {
             PipelineDataSourceConfiguration source = 
jobConfig.getSources().values().iterator().next();
             DataSource actual = 
dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(source.getType(),
 source.getParameter()));
             assertThat(actual, instanceOf(PipelineDataSourceWrapper.class));
@@ -64,10 +63,10 @@ class DefaultPipelineDataSourceManagerTest {
     @Test
     void assertClose() throws ReflectiveOperationException {
         PipelineDataSourceConfiguration source = 
jobConfig.getSources().values().iterator().next();
-        try (PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager()) {
+        try (PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager()) {
             
dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(source.getType(),
 source.getParameter()));
             
dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(),
 jobConfig.getTarget().getParameter()));
-            Map<?, ?> cachedDataSources = (Map<?, ?>) 
Plugins.getMemberAccessor().get(DefaultPipelineDataSourceManager.class.getDeclaredField("cachedDataSources"),
 dataSourceManager);
+            Map<?, ?> cachedDataSources = (Map<?, ?>) 
Plugins.getMemberAccessor().get(PipelineDataSourceManager.class.getDeclaredField("cachedDataSources"),
 dataSourceManager);
             assertThat(cachedDataSources.size(), is(2));
             dataSourceManager.close();
             assertTrue(cachedDataSources.isEmpty());
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
index e47946375dc..c15047698d4 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
@@ -17,15 +17,14 @@
 
 package org.apache.shardingsphere.test.it.data.pipeline.core.task;
 
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.pk.type.IntegerPrimaryKeyPosition;
-import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
@@ -52,7 +51,7 @@ import static org.mockito.Mockito.mock;
 
 class InventoryTaskTest {
     
-    private static final PipelineDataSourceManager DATA_SOURCE_MANAGER = new 
DefaultPipelineDataSourceManager();
+    private static final PipelineDataSourceManager DATA_SOURCE_MANAGER = new 
PipelineDataSourceManager();
     
     private MigrationTaskConfiguration taskConfig;
     
@@ -85,7 +84,7 @@ class InventoryTaskTest {
     }
     
     private void initTableData(final IncrementalDumperContext dumperContext) 
throws SQLException {
-        PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
+        PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager();
         try (
                 PipelineDataSourceWrapper dataSource = 
dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
                 Connection connection = dataSource.getConnection();
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
index d9f7b8018c0..939ff8881a7 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
@@ -20,31 +20,31 @@ package 
org.apache.shardingsphere.test.it.data.pipeline.core.util;
 import lombok.SneakyThrows;
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.yaml.config.YamlPipelineProcessConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.yaml.config.YamlPipelineReadConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.yaml.swapper.YamlPipelineProcessConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.yaml.config.YamlPipelineProcessConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.yaml.config.YamlPipelineReadConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.yaml.swapper.YamlPipelineProcessConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.ingest.dumper.MigrationIncrementalDumperContextCreator;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.ingest.dumper.MigrationIncrementalDumperContextCreator;
 import 
org.apache.shardingsphere.driver.api.yaml.YamlShardingSphereDataSourceFactory;
 import 
org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
@@ -184,7 +184,7 @@ public final class PipelineContextUtils {
         TransmissionProcessContext processContext = new 
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
         int jobShardingItem = 0;
         MigrationTaskConfiguration taskConfig = 
buildTaskConfiguration(jobConfig, jobShardingItem, processConfig);
-        return new MigrationJobItemContext(jobConfig, jobShardingItem, null, 
processContext, taskConfig, new DefaultPipelineDataSourceManager());
+        return new MigrationJobItemContext(jobConfig, jobShardingItem, null, 
processContext, taskConfig, new PipelineDataSourceManager());
     }
     
     private static PipelineProcessConfiguration 
mockPipelineProcessConfiguration() {
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index fd9f1e22031..02d6722a8a1 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -18,15 +18,14 @@
 package 
org.apache.shardingsphere.test.it.data.pipeline.scenario.migration.check.consistency;
 
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
-import 
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
@@ -89,7 +88,7 @@ class MigrationDataConsistencyCheckerTest {
     
     private void initTableData(final PipelineDataSourceConfiguration 
dataSourceConfig) throws SQLException {
         try (
-                PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
+                PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager();
                 PipelineDataSourceWrapper dataSource = 
dataSourceManager.getDataSource(dataSourceConfig);
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {


Reply via email to