This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 88c8e1bf6a1 Add database name for PipelineContextUtils (#36868)
88c8e1bf6a1 is described below

commit 88c8e1bf6a1bccd8a54f2e07ac14deff56a9cd63
Author: Haoran Meng <[email protected]>
AuthorDate: Mon Oct 13 13:22:44 2025 +0800

    Add database name for PipelineContextUtils (#36868)
---
 .../datasource/PipelineDataSourceManagerTest.java  |  2 +-
 ...lineProcessConfigurationPersistServiceTest.java |  9 +++++---
 .../splitter/InventoryTaskSplitterTest.java        |  2 +-
 .../repository/PipelineGovernanceFacadeTest.java   |  8 ++++---
 .../pipeline/core/task/IncrementalTaskTest.java    |  2 +-
 .../data/pipeline/core/task/InventoryTaskTest.java |  2 +-
 .../core/util/PipelineDistributedBarrierTest.java  |  8 ++++---
 .../ConsistencyCheckJobExecutorCallbackTest.java   |  6 +++--
 .../api/ConsistencyCheckJobAPITest.java            | 10 ++++----
 .../migration/api/MigrationJobAPITest.java         | 27 +++++++++++++---------
 .../MigrationDataConsistencyCheckerTest.java       |  8 ++++---
 .../pipeline/core/util/PipelineContextUtils.java   | 13 ++++++-----
 12 files changed, 58 insertions(+), 39 deletions(-)

diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
index 37f1b64ccbc..e27ac430ae8 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
@@ -41,7 +41,7 @@ class PipelineDataSourceManagerTest {
     
     @BeforeAll
     static void beforeClass() {
-        PipelineContextUtils.initPipelineContextManager();
+        
PipelineContextUtils.initPipelineContextManager(PipelineDataSourceManagerTest.class.getSimpleName());
     }
     
     @BeforeEach
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
index 3a19079d14f..37ad37954ac 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
@@ -33,9 +33,11 @@ import static org.hamcrest.MatcherAssert.assertThat;
 
 class PipelineProcessConfigurationPersistServiceTest {
     
+    private static final String TEST_DATABASE_NAME = 
PipelineProcessConfigurationPersistServiceTest.class.getSimpleName();
+    
     @BeforeAll
     static void beforeClass() {
-        PipelineContextUtils.initPipelineContextManager();
+        PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME);
     }
     
     @Test
@@ -52,8 +54,9 @@ class PipelineProcessConfigurationPersistServiceTest {
         String expectedYamlText = YamlEngine.marshal(yamlProcessConfig);
         PipelineProcessConfiguration processConfig = new 
YamlPipelineProcessConfigurationSwapper().swapToObject(yamlProcessConfig);
         PipelineProcessConfigurationPersistService persistService = new 
PipelineProcessConfigurationPersistService();
-        persistService.persist(PipelineContextUtils.getContextKey(), 
"MIGRATION", processConfig);
-        String actualYamlText = YamlEngine.marshal(new 
YamlPipelineProcessConfigurationSwapper().swapToYamlConfiguration(persistService.load(PipelineContextUtils.getContextKey(),
 "MIGRATION")));
+        
persistService.persist(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME), 
"MIGRATION", processConfig);
+        String actualYamlText = YamlEngine.marshal(new 
YamlPipelineProcessConfigurationSwapper()
+                
.swapToYamlConfiguration(persistService.load(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME),
 "MIGRATION")));
         assertThat(actualYamlText, is(expectedYamlText));
     }
 }
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
index 36def006535..0fc5ad1fff6 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
@@ -59,7 +59,7 @@ class InventoryTaskSplitterTest {
     
     @BeforeAll
     static void beforeClass() {
-        PipelineContextUtils.initPipelineContextManager();
+        
PipelineContextUtils.initPipelineContextManager(InventoryTaskSplitterTest.class.getSimpleName());
     }
     
     @BeforeEach
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/PipelineGovernanceFacadeTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/PipelineGovernanceFacadeTest.java
index 06052dd6b68..af3590328e4 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/PipelineGovernanceFacadeTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/PipelineGovernanceFacadeTest.java
@@ -67,10 +67,12 @@ class PipelineGovernanceFacadeTest {
     
     private static final CountDownLatch COUNT_DOWN_LATCH = new 
CountDownLatch(1);
     
+    private static final String TEST_DATABASE_NAME = 
PipelineGovernanceFacadeTest.class.getSimpleName();
+    
     @BeforeAll
     static void beforeClass() {
-        PipelineContextUtils.initPipelineContextManager();
-        governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
+        PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME);
+        governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME));
         watch();
     }
     
@@ -188,7 +190,7 @@ class PipelineGovernanceFacadeTest {
     }
     
     private ClusterPersistRepository getClusterPersistRepository() {
-        return (ClusterPersistRepository) 
PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getPersistServiceFacade().getRepository();
+        return (ClusterPersistRepository) 
PipelineContextManager.getContext(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME)).getPersistServiceFacade().getRepository();
     }
     
     private MigrationJobItemContext mockJobItemContext() {
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index de7d2a158dd..0eae6053cd0 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -46,7 +46,7 @@ class IncrementalTaskTest {
     
     @BeforeAll
     static void beforeClass() {
-        PipelineContextUtils.initPipelineContextManager();
+        
PipelineContextUtils.initPipelineContextManager(IncrementalTaskTest.class.getSimpleName());
     }
     
     @BeforeEach
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index 368f3dc1852..b080c126684 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -56,7 +56,7 @@ class InventoryTaskTest {
     
     @BeforeAll
     static void beforeClass() {
-        PipelineContextUtils.initPipelineContextManager();
+        
PipelineContextUtils.initPipelineContextManager(InventoryTaskTest.class.getSimpleName());
     }
     
     @AfterAll
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
index 5422ce2f32c..2fb96e733bb 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
@@ -35,15 +35,17 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class PipelineDistributedBarrierTest {
     
+    private static final String TEST_DATABASE_NAME = 
PipelineDistributedBarrierTest.class.getSimpleName();
+    
     @BeforeAll
     static void setUp() {
-        PipelineContextUtils.initPipelineContextManager();
+        PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME);
     }
     
     @Test
     void assertRegisterAndRemove() throws ReflectiveOperationException {
         String jobId = 
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId();
-        PipelineContextKey contextKey = PipelineContextUtils.getContextKey();
+        PipelineContextKey contextKey = 
PipelineContextUtils.getContextKey(TEST_DATABASE_NAME);
         
PipelineContextManager.getContext(contextKey).getPersistServiceFacade().getRepository().persist(PipelineMetaDataNode.getJobRootPath(jobId),
 "");
         PipelineDistributedBarrier instance = 
PipelineDistributedBarrier.getInstance(contextKey);
         String parentPath = "/barrier";
@@ -58,7 +60,7 @@ class PipelineDistributedBarrierTest {
     @Test
     void assertAwait() {
         String jobId = 
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId();
-        PipelineContextKey contextKey = PipelineContextUtils.getContextKey();
+        PipelineContextKey contextKey = 
PipelineContextUtils.getContextKey(TEST_DATABASE_NAME);
         
PipelineContextManager.getContext(contextKey).getPersistServiceFacade().getRepository().persist(PipelineMetaDataNode.getJobRootPath(jobId),
 "");
         PipelineDistributedBarrier instance = 
PipelineDistributedBarrier.getInstance(contextKey);
         String barrierEnablePath = 
PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
index 32c4db66fd4..4c742d1608c 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
@@ -48,9 +48,11 @@ import static org.hamcrest.Matchers.is;
 
 class ConsistencyCheckJobExecutorCallbackTest {
     
+    private static final String TEST_DATABASE_NAME = 
ConsistencyCheckJobExecutorCallbackTest.class.getSimpleName();
+    
     @BeforeAll
     static void beforeClass() {
-        PipelineContextUtils.initPipelineContextManager();
+        PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME);
     }
     
     @Test
@@ -58,7 +60,7 @@ class ConsistencyCheckJobExecutorCallbackTest {
         ConsistencyCheckJobId pipelineJobId = new ConsistencyCheckJobId(new 
PipelineContextKey(InstanceType.PROXY), 
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId());
         String checkJobId = PipelineJobIdUtils.marshal(pipelineJobId);
         List<YamlTableCheckRangePosition> expectedYamlTableCheckRangePositions 
= Collections.singletonList(createYamlTableCheckRangePosition());
-        
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(checkJobId,
 0,
+        
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME)).getJobItemFacade().getProcess().persist(checkJobId,
 0,
                 
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectedYamlTableCheckRangePositions)));
         ConsistencyCheckJobExecutorCallback callback = new 
ConsistencyCheckJobExecutorCallback();
         ConsistencyCheckJobConfiguration jobConfig = new 
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(createYamlConsistencyCheckJobConfiguration(checkJobId));
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java
index bb8ba7c779c..f0faac443af 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java
@@ -55,6 +55,8 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 
 class ConsistencyCheckJobAPITest {
     
+    private static final String TEST_DATABASE_NAME = 
ConsistencyCheckJobAPITest.class.getSimpleName();
+    
     private final ConsistencyCheckJobType jobType = new 
ConsistencyCheckJobType();
     
     private final ConsistencyCheckJobAPI jobAPI = new 
ConsistencyCheckJobAPI(jobType);
@@ -65,7 +67,7 @@ class ConsistencyCheckJobAPITest {
     
     @BeforeAll
     static void beforeClass() {
-        PipelineContextUtils.initPipelineContextManager();
+        PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME);
     }
     
     @Test
@@ -81,7 +83,7 @@ class ConsistencyCheckJobAPITest {
         assertNull(checkJobConfig.getAlgorithmTypeName());
         int sequence = ConsistencyCheckJobId.parseSequence(expectCheckJobId);
         assertThat(sequence, is(expectedSequence));
-        PipelineGovernanceFacade governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
+        PipelineGovernanceFacade governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME));
         Collection<String> actualCheckJobIds = 
governanceFacade.getJobFacade().getCheck().listCheckJobIds(parentJobId);
         assertThat(actualCheckJobIds.size(), is(1));
         assertThat(actualCheckJobIds.iterator().next(), is(expectCheckJobId));
@@ -93,7 +95,7 @@ class ConsistencyCheckJobAPITest {
     void assertDropByParentJobId() {
         MigrationJobConfiguration parentJobConfig = 
jobConfigSwapper.swapToObject(JobConfigurationBuilder.createYamlMigrationJobConfiguration());
         String parentJobId = parentJobConfig.getJobId();
-        PipelineGovernanceFacade governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
+        PipelineGovernanceFacade governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME));
         int expectedSequence = 1;
         for (int i = 0; i < 3; i++) {
             String checkJobId = jobAPI.start(new 
CreateConsistencyCheckJobParameter(parentJobId, null, null,
@@ -235,7 +237,7 @@ class ConsistencyCheckJobAPITest {
     
     private void persistCheckJobResult(final String parentJobId, final String 
checkJobId) {
         Map<String, TableDataConsistencyCheckResult> 
dataConsistencyCheckResult = Collections.singletonMap("t_order", new 
TableDataConsistencyCheckResult(true));
-        PipelineGovernanceFacade governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
+        PipelineGovernanceFacade governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME));
         
governanceFacade.getJobFacade().getCheck().persistCheckJobResult(parentJobId, 
checkJobId, dataConsistencyCheckResult);
     }
     
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
index 91b60d1e063..83fb82cbe9e 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
@@ -108,9 +108,11 @@ class MigrationJobAPITest {
     
     private static DatabaseType databaseType;
     
+    private static final String TEST_DATABASE_NAME = 
MigrationJobAPITest.class.getSimpleName();
+    
     @BeforeAll
     static void beforeClass() {
-        PipelineContextUtils.initPipelineContextManager();
+        PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME);
         jobType = new MigrationJobType();
         jobAPI = (MigrationJobAPI) 
TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");
         jobConfigManager = new PipelineJobConfigurationManager(jobType);
@@ -123,12 +125,13 @@ class MigrationJobAPITest {
         props.put("jdbcUrl", jdbcUrl);
         props.put("username", "root");
         props.put("password", "root");
-        
jobAPI.registerMigrationSourceStorageUnits(PipelineContextUtils.getContextKey(),
 Collections.singletonMap("ds_0", new 
DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", props)));
+        
jobAPI.registerMigrationSourceStorageUnits(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME),
 Collections.singletonMap("ds_0",
+                new 
DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", props)));
     }
     
     @AfterAll
     static void afterClass() {
-        
jobAPI.dropMigrationSourceResources(PipelineContextUtils.getContextKey(), 
Collections.singletonList("ds_0"));
+        
jobAPI.dropMigrationSourceResources(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME),
 Collections.singletonList("ds_0"));
     }
     
     @Test
@@ -251,7 +254,7 @@ class MigrationJobAPITest {
     @Test
     void assertAddMigrationSourceResources() {
         PipelineDataSourcePersistService persistService = new 
PipelineDataSourcePersistService();
-        Map<String, DataSourcePoolProperties> actual = 
persistService.load(PipelineContextUtils.getContextKey(), "MIGRATION");
+        Map<String, DataSourcePoolProperties> actual = 
persistService.load(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME), 
"MIGRATION");
         assertTrue(actual.containsKey("ds_0"));
     }
     
@@ -259,20 +262,20 @@ class MigrationJobAPITest {
     void assertCreateJobConfigFailedOnMoreThanOneSourceTable() {
         Collection<MigrationSourceTargetEntry> sourceTargetEntries = 
Stream.of("t_order_0", "t_order_1")
                 .map(each -> new MigrationSourceTargetEntry(new 
DataNode("ds_0", each), "t_order")).collect(Collectors.toList());
-        assertThrows(PipelineInvalidParameterException.class, () -> 
jobAPI.schedule(PipelineContextUtils.getContextKey(), sourceTargetEntries, 
"logic_db"));
+        assertThrows(PipelineInvalidParameterException.class, () -> 
jobAPI.schedule(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME), 
sourceTargetEntries, "logic_db"));
     }
     
     @Test
     void assertCreateJobConfigFailedOnDataSourceNotExist() {
         Collection<MigrationSourceTargetEntry> sourceTargetEntries = 
Collections.singleton(new MigrationSourceTargetEntry(new 
DataNode("ds_not_exists", "t_order"), "t_order"));
-        assertThrows(PipelineInvalidParameterException.class, () -> 
jobAPI.schedule(PipelineContextUtils.getContextKey(), sourceTargetEntries, 
"logic_db"));
+        assertThrows(PipelineInvalidParameterException.class, () -> 
jobAPI.schedule(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME), 
sourceTargetEntries, "logic_db"));
     }
     
     @Test
     void assertCreateJobConfig() throws SQLException {
         initIntPrimaryEnvironment();
         MigrationSourceTargetEntry sourceTargetEntry = new 
MigrationSourceTargetEntry(new DataNode("ds_0", "t_order"), "t_order");
-        String jobId = jobAPI.schedule(PipelineContextUtils.getContextKey(), 
Collections.singleton(sourceTargetEntry), "logic_db");
+        String jobId = 
jobAPI.schedule(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME), 
Collections.singleton(sourceTargetEntry), "logic_db");
         MigrationJobConfiguration actual = 
jobConfigManager.getJobConfiguration(jobId);
         assertThat(actual.getTargetDatabaseName(), is("logic_db"));
         List<JobDataNodeLine> dataNodeLines = actual.getJobShardingDataNodes();
@@ -287,7 +290,7 @@ class MigrationJobAPITest {
     }
     
     private void initIntPrimaryEnvironment() throws SQLException {
-        Map<String, DataSourcePoolProperties> metaDataDataSource = new 
PipelineDataSourcePersistService().load(PipelineContextUtils.getContextKey(), 
"MIGRATION");
+        Map<String, DataSourcePoolProperties> metaDataDataSource = new 
PipelineDataSourcePersistService().load(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME),
 "MIGRATION");
         DataSourcePoolProperties props = metaDataDataSource.get("ds_0");
         try (
                 PipelineDataSource dataSource = new 
PipelineDataSource(DataSourcePoolCreator.create(props), databaseType);
@@ -300,7 +303,7 @@ class MigrationJobAPITest {
     
     @Test
     void assertShowMigrationSourceResources() {
-        Collection<Collection<Object>> actual = 
jobAPI.listMigrationSourceResources(PipelineContextUtils.getContextKey());
+        Collection<Collection<Object>> actual = 
jobAPI.listMigrationSourceResources(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME));
         assertThat(actual.size(), is(1));
         Collection<Object> objects = actual.iterator().next();
         assertThat(objects.toArray()[0], is("ds_0"));
@@ -313,7 +316,8 @@ class MigrationJobAPITest {
         YamlTransmissionJobItemProgress yamlJobItemProgress = new 
YamlTransmissionJobItemProgress();
         yamlJobItemProgress.setStatus(JobStatus.RUNNING.name());
         yamlJobItemProgress.setSourceDatabaseType("MySQL");
-        
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(jobConfig.getJobId(),
 0, YamlEngine.marshal(yamlJobItemProgress));
+        
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME)).getJobItemFacade().getProcess().persist(jobConfig.getJobId(),
 0,
+                YamlEngine.marshal(yamlJobItemProgress));
         Collection<TransmissionJobItemInfo> jobItemInfos = 
transmissionJobManager.getJobItemInfos(jobConfig.getJobId());
         assertThat(jobItemInfos.size(), is(1));
         TransmissionJobItemInfo jobItemInfo = jobItemInfos.iterator().next();
@@ -330,7 +334,8 @@ class MigrationJobAPITest {
         
yamlJobItemProgress.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK.name());
         yamlJobItemProgress.setProcessedRecordsCount(100L);
         yamlJobItemProgress.setInventoryRecordsCount(50L);
-        
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(jobConfig.getJobId(),
 0, YamlEngine.marshal(yamlJobItemProgress));
+        
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME)).getJobItemFacade().getProcess().persist(jobConfig.getJobId(),
 0,
+                YamlEngine.marshal(yamlJobItemProgress));
         Collection<TransmissionJobItemInfo> jobItemInfos = 
transmissionJobManager.getJobItemInfos(jobConfig.getJobId());
         TransmissionJobItemInfo jobItemInfo = 
jobItemInfos.stream().iterator().next();
         assertThat(jobItemInfo.getJobItemProgress().getStatus(), 
is(JobStatus.EXECUTE_INCREMENTAL_TASK));
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index 04f4574ad55..3720397f7e0 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -50,9 +50,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class MigrationDataConsistencyCheckerTest {
     
+    private static final String TEST_DATABASE_NAME = 
MigrationDataConsistencyCheckerTest.class.getSimpleName();
+    
     @BeforeAll
     static void beforeClass() {
-        PipelineContextUtils.initPipelineContextManager();
+        PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME);
     }
     
     @Test
@@ -79,7 +81,7 @@ class MigrationDataConsistencyCheckerTest {
         jobConfigurationPOJO.setJobParameter(YamlEngine.marshal(new 
YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
         jobConfigurationPOJO.setJobName(jobConfig.getJobId());
         jobConfigurationPOJO.setShardingTotalCount(1);
-        PipelineGovernanceFacade governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
+        PipelineGovernanceFacade governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME));
         
getClusterPersistRepository().persist(String.format("/pipeline/jobs/%s/config", 
jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
         
governanceFacade.getJobItemFacade().getProcess().persist(jobConfig.getJobId(), 
0, "");
         return new MigrationDataConsistencyChecker(jobConfig, new 
TransmissionProcessContext(jobConfig.getJobId(), null),
@@ -87,7 +89,7 @@ class MigrationDataConsistencyCheckerTest {
     }
     
     private ClusterPersistRepository getClusterPersistRepository() {
-        return (ClusterPersistRepository) 
PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getPersistServiceFacade().getRepository();
+        return (ClusterPersistRepository) 
PipelineContextManager.getContext(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME)).getPersistServiceFacade().getRepository();
     }
     
     private ConsistencyCheckJobItemProgressContext 
createConsistencyCheckJobItemProgressContext(final String jobId) {
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
index e58fb2b4122..ded82dca6a9 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
@@ -92,16 +92,16 @@ import static org.mockito.Mockito.mock;
  */
 public final class PipelineContextUtils {
     
-    private static final PipelineContextKey CONTEXT_KEY = new 
PipelineContextKey(InstanceType.PROXY);
-    
     private static final PipelineExecuteEngine EXECUTE_ENGINE = 
PipelineExecuteEngine.newCachedThreadInstance(PipelineContextUtils.class.getSimpleName());
     
     /**
      * Init pipeline context manager.
+     *
+     * @param databaseName database name
      */
-    public static void initPipelineContextManager() {
+    public static void initPipelineContextManager(final String databaseName) {
         EmbedTestingServer.start();
-        PipelineContextKey contextKey = getContextKey();
+        PipelineContextKey contextKey = getContextKey(databaseName);
         if (null != PipelineContextManager.getContext(contextKey)) {
             return;
         }
@@ -171,10 +171,11 @@ public final class PipelineContextUtils {
     /**
      * Get context key.
      *
+     * @param databaseName database name
      * @return context key
      */
-    public static PipelineContextKey getContextKey() {
-        return CONTEXT_KEY;
+    public static PipelineContextKey getContextKey(final String databaseName) {
+        return new PipelineContextKey(databaseName, InstanceType.PROXY);
     }
     
     /**

Reply via email to