This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 88c8e1bf6a1 Add database name for PipelineContextUtils (#36868)
88c8e1bf6a1 is described below
commit 88c8e1bf6a1bccd8a54f2e07ac14deff56a9cd63
Author: Haoran Meng <[email protected]>
AuthorDate: Mon Oct 13 13:22:44 2025 +0800
Add database name for PipelineContextUtils (#36868)
---
.../datasource/PipelineDataSourceManagerTest.java | 2 +-
...lineProcessConfigurationPersistServiceTest.java | 9 +++++---
.../splitter/InventoryTaskSplitterTest.java | 2 +-
.../repository/PipelineGovernanceFacadeTest.java | 8 ++++---
.../pipeline/core/task/IncrementalTaskTest.java | 2 +-
.../data/pipeline/core/task/InventoryTaskTest.java | 2 +-
.../core/util/PipelineDistributedBarrierTest.java | 8 ++++---
.../ConsistencyCheckJobExecutorCallbackTest.java | 6 +++--
.../api/ConsistencyCheckJobAPITest.java | 10 ++++----
.../migration/api/MigrationJobAPITest.java | 27 +++++++++++++---------
.../MigrationDataConsistencyCheckerTest.java | 8 ++++---
.../pipeline/core/util/PipelineContextUtils.java | 13 ++++++-----
12 files changed, 58 insertions(+), 39 deletions(-)
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
index 37f1b64ccbc..e27ac430ae8 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
@@ -41,7 +41,7 @@ class PipelineDataSourceManagerTest {
@BeforeAll
static void beforeClass() {
- PipelineContextUtils.initPipelineContextManager();
+
PipelineContextUtils.initPipelineContextManager(PipelineDataSourceManagerTest.class.getSimpleName());
}
@BeforeEach
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
index 3a19079d14f..37ad37954ac 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
@@ -33,9 +33,11 @@ import static org.hamcrest.MatcherAssert.assertThat;
class PipelineProcessConfigurationPersistServiceTest {
+ private static final String TEST_DATABASE_NAME =
PipelineProcessConfigurationPersistServiceTest.class.getSimpleName();
+
@BeforeAll
static void beforeClass() {
- PipelineContextUtils.initPipelineContextManager();
+ PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME);
}
@Test
@@ -52,8 +54,9 @@ class PipelineProcessConfigurationPersistServiceTest {
String expectedYamlText = YamlEngine.marshal(yamlProcessConfig);
PipelineProcessConfiguration processConfig = new
YamlPipelineProcessConfigurationSwapper().swapToObject(yamlProcessConfig);
PipelineProcessConfigurationPersistService persistService = new
PipelineProcessConfigurationPersistService();
- persistService.persist(PipelineContextUtils.getContextKey(),
"MIGRATION", processConfig);
- String actualYamlText = YamlEngine.marshal(new
YamlPipelineProcessConfigurationSwapper().swapToYamlConfiguration(persistService.load(PipelineContextUtils.getContextKey(),
"MIGRATION")));
+
persistService.persist(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME),
"MIGRATION", processConfig);
+ String actualYamlText = YamlEngine.marshal(new
YamlPipelineProcessConfigurationSwapper()
+
.swapToYamlConfiguration(persistService.load(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME),
"MIGRATION")));
assertThat(actualYamlText, is(expectedYamlText));
}
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
index 36def006535..0fc5ad1fff6 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
@@ -59,7 +59,7 @@ class InventoryTaskSplitterTest {
@BeforeAll
static void beforeClass() {
- PipelineContextUtils.initPipelineContextManager();
+
PipelineContextUtils.initPipelineContextManager(InventoryTaskSplitterTest.class.getSimpleName());
}
@BeforeEach
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/PipelineGovernanceFacadeTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/PipelineGovernanceFacadeTest.java
index 06052dd6b68..af3590328e4 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/PipelineGovernanceFacadeTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/PipelineGovernanceFacadeTest.java
@@ -67,10 +67,12 @@ class PipelineGovernanceFacadeTest {
private static final CountDownLatch COUNT_DOWN_LATCH = new
CountDownLatch(1);
+ private static final String TEST_DATABASE_NAME =
PipelineGovernanceFacadeTest.class.getSimpleName();
+
@BeforeAll
static void beforeClass() {
- PipelineContextUtils.initPipelineContextManager();
- governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
+ PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME);
+ governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME));
watch();
}
@@ -188,7 +190,7 @@ class PipelineGovernanceFacadeTest {
}
private ClusterPersistRepository getClusterPersistRepository() {
- return (ClusterPersistRepository)
PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getPersistServiceFacade().getRepository();
+ return (ClusterPersistRepository)
PipelineContextManager.getContext(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME)).getPersistServiceFacade().getRepository();
}
private MigrationJobItemContext mockJobItemContext() {
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index de7d2a158dd..0eae6053cd0 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -46,7 +46,7 @@ class IncrementalTaskTest {
@BeforeAll
static void beforeClass() {
- PipelineContextUtils.initPipelineContextManager();
+
PipelineContextUtils.initPipelineContextManager(IncrementalTaskTest.class.getSimpleName());
}
@BeforeEach
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index 368f3dc1852..b080c126684 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -56,7 +56,7 @@ class InventoryTaskTest {
@BeforeAll
static void beforeClass() {
- PipelineContextUtils.initPipelineContextManager();
+
PipelineContextUtils.initPipelineContextManager(InventoryTaskTest.class.getSimpleName());
}
@AfterAll
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
index 5422ce2f32c..2fb96e733bb 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
@@ -35,15 +35,17 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
class PipelineDistributedBarrierTest {
+ private static final String TEST_DATABASE_NAME =
PipelineDistributedBarrierTest.class.getSimpleName();
+
@BeforeAll
static void setUp() {
- PipelineContextUtils.initPipelineContextManager();
+ PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME);
}
@Test
void assertRegisterAndRemove() throws ReflectiveOperationException {
String jobId =
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId();
- PipelineContextKey contextKey = PipelineContextUtils.getContextKey();
+ PipelineContextKey contextKey =
PipelineContextUtils.getContextKey(TEST_DATABASE_NAME);
PipelineContextManager.getContext(contextKey).getPersistServiceFacade().getRepository().persist(PipelineMetaDataNode.getJobRootPath(jobId),
"");
PipelineDistributedBarrier instance =
PipelineDistributedBarrier.getInstance(contextKey);
String parentPath = "/barrier";
@@ -58,7 +60,7 @@ class PipelineDistributedBarrierTest {
@Test
void assertAwait() {
String jobId =
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId();
- PipelineContextKey contextKey = PipelineContextUtils.getContextKey();
+ PipelineContextKey contextKey =
PipelineContextUtils.getContextKey(TEST_DATABASE_NAME);
PipelineContextManager.getContext(contextKey).getPersistServiceFacade().getRepository().persist(PipelineMetaDataNode.getJobRootPath(jobId),
"");
PipelineDistributedBarrier instance =
PipelineDistributedBarrier.getInstance(contextKey);
String barrierEnablePath =
PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
index 32c4db66fd4..4c742d1608c 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
@@ -48,9 +48,11 @@ import static org.hamcrest.Matchers.is;
class ConsistencyCheckJobExecutorCallbackTest {
+ private static final String TEST_DATABASE_NAME =
ConsistencyCheckJobExecutorCallbackTest.class.getSimpleName();
+
@BeforeAll
static void beforeClass() {
- PipelineContextUtils.initPipelineContextManager();
+ PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME);
}
@Test
@@ -58,7 +60,7 @@ class ConsistencyCheckJobExecutorCallbackTest {
ConsistencyCheckJobId pipelineJobId = new ConsistencyCheckJobId(new
PipelineContextKey(InstanceType.PROXY),
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId());
String checkJobId = PipelineJobIdUtils.marshal(pipelineJobId);
List<YamlTableCheckRangePosition> expectedYamlTableCheckRangePositions
= Collections.singletonList(createYamlTableCheckRangePosition());
-
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(checkJobId,
0,
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME)).getJobItemFacade().getProcess().persist(checkJobId,
0,
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectedYamlTableCheckRangePositions)));
ConsistencyCheckJobExecutorCallback callback = new
ConsistencyCheckJobExecutorCallback();
ConsistencyCheckJobConfiguration jobConfig = new
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(createYamlConsistencyCheckJobConfiguration(checkJobId));
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java
index bb8ba7c779c..f0faac443af 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java
@@ -55,6 +55,8 @@ import static org.junit.jupiter.api.Assertions.assertNull;
class ConsistencyCheckJobAPITest {
+ private static final String TEST_DATABASE_NAME =
ConsistencyCheckJobAPITest.class.getSimpleName();
+
private final ConsistencyCheckJobType jobType = new
ConsistencyCheckJobType();
private final ConsistencyCheckJobAPI jobAPI = new
ConsistencyCheckJobAPI(jobType);
@@ -65,7 +67,7 @@ class ConsistencyCheckJobAPITest {
@BeforeAll
static void beforeClass() {
- PipelineContextUtils.initPipelineContextManager();
+ PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME);
}
@Test
@@ -81,7 +83,7 @@ class ConsistencyCheckJobAPITest {
assertNull(checkJobConfig.getAlgorithmTypeName());
int sequence = ConsistencyCheckJobId.parseSequence(expectCheckJobId);
assertThat(sequence, is(expectedSequence));
- PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
+ PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME));
Collection<String> actualCheckJobIds =
governanceFacade.getJobFacade().getCheck().listCheckJobIds(parentJobId);
assertThat(actualCheckJobIds.size(), is(1));
assertThat(actualCheckJobIds.iterator().next(), is(expectCheckJobId));
@@ -93,7 +95,7 @@ class ConsistencyCheckJobAPITest {
void assertDropByParentJobId() {
MigrationJobConfiguration parentJobConfig =
jobConfigSwapper.swapToObject(JobConfigurationBuilder.createYamlMigrationJobConfiguration());
String parentJobId = parentJobConfig.getJobId();
- PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
+ PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME));
int expectedSequence = 1;
for (int i = 0; i < 3; i++) {
String checkJobId = jobAPI.start(new
CreateConsistencyCheckJobParameter(parentJobId, null, null,
@@ -235,7 +237,7 @@ class ConsistencyCheckJobAPITest {
private void persistCheckJobResult(final String parentJobId, final String
checkJobId) {
Map<String, TableDataConsistencyCheckResult>
dataConsistencyCheckResult = Collections.singletonMap("t_order", new
TableDataConsistencyCheckResult(true));
- PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
+ PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME));
governanceFacade.getJobFacade().getCheck().persistCheckJobResult(parentJobId,
checkJobId, dataConsistencyCheckResult);
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
index 91b60d1e063..83fb82cbe9e 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
@@ -108,9 +108,11 @@ class MigrationJobAPITest {
private static DatabaseType databaseType;
+ private static final String TEST_DATABASE_NAME =
MigrationJobAPITest.class.getSimpleName();
+
@BeforeAll
static void beforeClass() {
- PipelineContextUtils.initPipelineContextManager();
+ PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME);
jobType = new MigrationJobType();
jobAPI = (MigrationJobAPI)
TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");
jobConfigManager = new PipelineJobConfigurationManager(jobType);
@@ -123,12 +125,13 @@ class MigrationJobAPITest {
props.put("jdbcUrl", jdbcUrl);
props.put("username", "root");
props.put("password", "root");
-
jobAPI.registerMigrationSourceStorageUnits(PipelineContextUtils.getContextKey(),
Collections.singletonMap("ds_0", new
DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", props)));
+
jobAPI.registerMigrationSourceStorageUnits(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME),
Collections.singletonMap("ds_0",
+ new
DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", props)));
}
@AfterAll
static void afterClass() {
-
jobAPI.dropMigrationSourceResources(PipelineContextUtils.getContextKey(),
Collections.singletonList("ds_0"));
+
jobAPI.dropMigrationSourceResources(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME),
Collections.singletonList("ds_0"));
}
@Test
@@ -251,7 +254,7 @@ class MigrationJobAPITest {
@Test
void assertAddMigrationSourceResources() {
PipelineDataSourcePersistService persistService = new
PipelineDataSourcePersistService();
- Map<String, DataSourcePoolProperties> actual =
persistService.load(PipelineContextUtils.getContextKey(), "MIGRATION");
+ Map<String, DataSourcePoolProperties> actual =
persistService.load(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME),
"MIGRATION");
assertTrue(actual.containsKey("ds_0"));
}
@@ -259,20 +262,20 @@ class MigrationJobAPITest {
void assertCreateJobConfigFailedOnMoreThanOneSourceTable() {
Collection<MigrationSourceTargetEntry> sourceTargetEntries =
Stream.of("t_order_0", "t_order_1")
.map(each -> new MigrationSourceTargetEntry(new
DataNode("ds_0", each), "t_order")).collect(Collectors.toList());
- assertThrows(PipelineInvalidParameterException.class, () ->
jobAPI.schedule(PipelineContextUtils.getContextKey(), sourceTargetEntries,
"logic_db"));
+ assertThrows(PipelineInvalidParameterException.class, () ->
jobAPI.schedule(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME),
sourceTargetEntries, "logic_db"));
}
@Test
void assertCreateJobConfigFailedOnDataSourceNotExist() {
Collection<MigrationSourceTargetEntry> sourceTargetEntries =
Collections.singleton(new MigrationSourceTargetEntry(new
DataNode("ds_not_exists", "t_order"), "t_order"));
- assertThrows(PipelineInvalidParameterException.class, () ->
jobAPI.schedule(PipelineContextUtils.getContextKey(), sourceTargetEntries,
"logic_db"));
+ assertThrows(PipelineInvalidParameterException.class, () ->
jobAPI.schedule(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME),
sourceTargetEntries, "logic_db"));
}
@Test
void assertCreateJobConfig() throws SQLException {
initIntPrimaryEnvironment();
MigrationSourceTargetEntry sourceTargetEntry = new
MigrationSourceTargetEntry(new DataNode("ds_0", "t_order"), "t_order");
- String jobId = jobAPI.schedule(PipelineContextUtils.getContextKey(),
Collections.singleton(sourceTargetEntry), "logic_db");
+ String jobId =
jobAPI.schedule(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME),
Collections.singleton(sourceTargetEntry), "logic_db");
MigrationJobConfiguration actual =
jobConfigManager.getJobConfiguration(jobId);
assertThat(actual.getTargetDatabaseName(), is("logic_db"));
List<JobDataNodeLine> dataNodeLines = actual.getJobShardingDataNodes();
@@ -287,7 +290,7 @@ class MigrationJobAPITest {
}
private void initIntPrimaryEnvironment() throws SQLException {
- Map<String, DataSourcePoolProperties> metaDataDataSource = new
PipelineDataSourcePersistService().load(PipelineContextUtils.getContextKey(),
"MIGRATION");
+ Map<String, DataSourcePoolProperties> metaDataDataSource = new
PipelineDataSourcePersistService().load(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME),
"MIGRATION");
DataSourcePoolProperties props = metaDataDataSource.get("ds_0");
try (
PipelineDataSource dataSource = new
PipelineDataSource(DataSourcePoolCreator.create(props), databaseType);
@@ -300,7 +303,7 @@ class MigrationJobAPITest {
@Test
void assertShowMigrationSourceResources() {
- Collection<Collection<Object>> actual =
jobAPI.listMigrationSourceResources(PipelineContextUtils.getContextKey());
+ Collection<Collection<Object>> actual =
jobAPI.listMigrationSourceResources(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME));
assertThat(actual.size(), is(1));
Collection<Object> objects = actual.iterator().next();
assertThat(objects.toArray()[0], is("ds_0"));
@@ -313,7 +316,8 @@ class MigrationJobAPITest {
YamlTransmissionJobItemProgress yamlJobItemProgress = new
YamlTransmissionJobItemProgress();
yamlJobItemProgress.setStatus(JobStatus.RUNNING.name());
yamlJobItemProgress.setSourceDatabaseType("MySQL");
-
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(jobConfig.getJobId(),
0, YamlEngine.marshal(yamlJobItemProgress));
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME)).getJobItemFacade().getProcess().persist(jobConfig.getJobId(),
0,
+ YamlEngine.marshal(yamlJobItemProgress));
Collection<TransmissionJobItemInfo> jobItemInfos =
transmissionJobManager.getJobItemInfos(jobConfig.getJobId());
assertThat(jobItemInfos.size(), is(1));
TransmissionJobItemInfo jobItemInfo = jobItemInfos.iterator().next();
@@ -330,7 +334,8 @@ class MigrationJobAPITest {
yamlJobItemProgress.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK.name());
yamlJobItemProgress.setProcessedRecordsCount(100L);
yamlJobItemProgress.setInventoryRecordsCount(50L);
-
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(jobConfig.getJobId(),
0, YamlEngine.marshal(yamlJobItemProgress));
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME)).getJobItemFacade().getProcess().persist(jobConfig.getJobId(),
0,
+ YamlEngine.marshal(yamlJobItemProgress));
Collection<TransmissionJobItemInfo> jobItemInfos =
transmissionJobManager.getJobItemInfos(jobConfig.getJobId());
TransmissionJobItemInfo jobItemInfo =
jobItemInfos.stream().iterator().next();
assertThat(jobItemInfo.getJobItemProgress().getStatus(),
is(JobStatus.EXECUTE_INCREMENTAL_TASK));
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index 04f4574ad55..3720397f7e0 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -50,9 +50,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
class MigrationDataConsistencyCheckerTest {
+ private static final String TEST_DATABASE_NAME =
MigrationDataConsistencyCheckerTest.class.getSimpleName();
+
@BeforeAll
static void beforeClass() {
- PipelineContextUtils.initPipelineContextManager();
+ PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME);
}
@Test
@@ -79,7 +81,7 @@ class MigrationDataConsistencyCheckerTest {
jobConfigurationPOJO.setJobParameter(YamlEngine.marshal(new
YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
jobConfigurationPOJO.setJobName(jobConfig.getJobId());
jobConfigurationPOJO.setShardingTotalCount(1);
- PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
+ PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME));
getClusterPersistRepository().persist(String.format("/pipeline/jobs/%s/config",
jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
governanceFacade.getJobItemFacade().getProcess().persist(jobConfig.getJobId(),
0, "");
return new MigrationDataConsistencyChecker(jobConfig, new
TransmissionProcessContext(jobConfig.getJobId(), null),
@@ -87,7 +89,7 @@ class MigrationDataConsistencyCheckerTest {
}
private ClusterPersistRepository getClusterPersistRepository() {
- return (ClusterPersistRepository)
PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getPersistServiceFacade().getRepository();
+ return (ClusterPersistRepository)
PipelineContextManager.getContext(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME)).getPersistServiceFacade().getRepository();
}
private ConsistencyCheckJobItemProgressContext
createConsistencyCheckJobItemProgressContext(final String jobId) {
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
index e58fb2b4122..ded82dca6a9 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
@@ -92,16 +92,16 @@ import static org.mockito.Mockito.mock;
*/
public final class PipelineContextUtils {
- private static final PipelineContextKey CONTEXT_KEY = new
PipelineContextKey(InstanceType.PROXY);
-
private static final PipelineExecuteEngine EXECUTE_ENGINE =
PipelineExecuteEngine.newCachedThreadInstance(PipelineContextUtils.class.getSimpleName());
/**
* Init pipeline context manager.
+ *
+ * @param databaseName database name
*/
- public static void initPipelineContextManager() {
+ public static void initPipelineContextManager(final String databaseName) {
EmbedTestingServer.start();
- PipelineContextKey contextKey = getContextKey();
+ PipelineContextKey contextKey = getContextKey(databaseName);
if (null != PipelineContextManager.getContext(contextKey)) {
return;
}
@@ -171,10 +171,11 @@ public final class PipelineContextUtils {
/**
* Get context key.
*
+ * @param databaseName database name
* @return context key
*/
- public static PipelineContextKey getContextKey() {
- return CONTEXT_KEY;
+ public static PipelineContextKey getContextKey(final String databaseName) {
+ return new PipelineContextKey(databaseName, InstanceType.PROXY);
}
/**