This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f2d4b4170e8 Revert "Add database name for PipelineContextUtils
(#36868)" (#36871)
f2d4b4170e8 is described below
commit f2d4b4170e869ba0c8cd2fee0d14dbcbad16c90c
Author: Haoran Meng <[email protected]>
AuthorDate: Mon Oct 13 14:51:56 2025 +0800
Revert "Add database name for PipelineContextUtils (#36868)" (#36871)
This reverts commit 88c8e1bf6a1bccd8a54f2e07ac14deff56a9cd63.
---
.../datasource/PipelineDataSourceManagerTest.java | 2 +-
...lineProcessConfigurationPersistServiceTest.java | 9 +++-----
.../splitter/InventoryTaskSplitterTest.java | 2 +-
.../repository/PipelineGovernanceFacadeTest.java | 8 +++----
.../pipeline/core/task/IncrementalTaskTest.java | 2 +-
.../data/pipeline/core/task/InventoryTaskTest.java | 2 +-
.../core/util/PipelineDistributedBarrierTest.java | 8 +++----
.../ConsistencyCheckJobExecutorCallbackTest.java | 6 ++---
.../api/ConsistencyCheckJobAPITest.java | 10 ++++----
.../migration/api/MigrationJobAPITest.java | 27 +++++++++-------------
.../MigrationDataConsistencyCheckerTest.java | 8 +++----
.../pipeline/core/util/PipelineContextUtils.java | 13 +++++------
12 files changed, 39 insertions(+), 58 deletions(-)
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
index e27ac430ae8..37f1b64ccbc 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
@@ -41,7 +41,7 @@ class PipelineDataSourceManagerTest {
@BeforeAll
static void beforeClass() {
-
PipelineContextUtils.initPipelineContextManager(PipelineDataSourceManagerTest.class.getSimpleName());
+ PipelineContextUtils.initPipelineContextManager();
}
@BeforeEach
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
index 37ad37954ac..3a19079d14f 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
@@ -33,11 +33,9 @@ import static org.hamcrest.MatcherAssert.assertThat;
class PipelineProcessConfigurationPersistServiceTest {
- private static final String TEST_DATABASE_NAME =
PipelineProcessConfigurationPersistServiceTest.class.getSimpleName();
-
@BeforeAll
static void beforeClass() {
- PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME);
+ PipelineContextUtils.initPipelineContextManager();
}
@Test
@@ -54,9 +52,8 @@ class PipelineProcessConfigurationPersistServiceTest {
String expectedYamlText = YamlEngine.marshal(yamlProcessConfig);
PipelineProcessConfiguration processConfig = new
YamlPipelineProcessConfigurationSwapper().swapToObject(yamlProcessConfig);
PipelineProcessConfigurationPersistService persistService = new
PipelineProcessConfigurationPersistService();
-
persistService.persist(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME),
"MIGRATION", processConfig);
- String actualYamlText = YamlEngine.marshal(new
YamlPipelineProcessConfigurationSwapper()
-
.swapToYamlConfiguration(persistService.load(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME),
"MIGRATION")));
+ persistService.persist(PipelineContextUtils.getContextKey(),
"MIGRATION", processConfig);
+ String actualYamlText = YamlEngine.marshal(new
YamlPipelineProcessConfigurationSwapper().swapToYamlConfiguration(persistService.load(PipelineContextUtils.getContextKey(),
"MIGRATION")));
assertThat(actualYamlText, is(expectedYamlText));
}
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
index 0fc5ad1fff6..36def006535 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
@@ -59,7 +59,7 @@ class InventoryTaskSplitterTest {
@BeforeAll
static void beforeClass() {
-
PipelineContextUtils.initPipelineContextManager(InventoryTaskSplitterTest.class.getSimpleName());
+ PipelineContextUtils.initPipelineContextManager();
}
@BeforeEach
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/PipelineGovernanceFacadeTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/PipelineGovernanceFacadeTest.java
index af3590328e4..06052dd6b68 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/PipelineGovernanceFacadeTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/PipelineGovernanceFacadeTest.java
@@ -67,12 +67,10 @@ class PipelineGovernanceFacadeTest {
private static final CountDownLatch COUNT_DOWN_LATCH = new
CountDownLatch(1);
- private static final String TEST_DATABASE_NAME =
PipelineGovernanceFacadeTest.class.getSimpleName();
-
@BeforeAll
static void beforeClass() {
- PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME);
- governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME));
+ PipelineContextUtils.initPipelineContextManager();
+ governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
watch();
}
@@ -190,7 +188,7 @@ class PipelineGovernanceFacadeTest {
}
private ClusterPersistRepository getClusterPersistRepository() {
- return (ClusterPersistRepository)
PipelineContextManager.getContext(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME)).getPersistServiceFacade().getRepository();
+ return (ClusterPersistRepository)
PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getPersistServiceFacade().getRepository();
}
private MigrationJobItemContext mockJobItemContext() {
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index 0eae6053cd0..de7d2a158dd 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -46,7 +46,7 @@ class IncrementalTaskTest {
@BeforeAll
static void beforeClass() {
-
PipelineContextUtils.initPipelineContextManager(IncrementalTaskTest.class.getSimpleName());
+ PipelineContextUtils.initPipelineContextManager();
}
@BeforeEach
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index b080c126684..368f3dc1852 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -56,7 +56,7 @@ class InventoryTaskTest {
@BeforeAll
static void beforeClass() {
-
PipelineContextUtils.initPipelineContextManager(InventoryTaskTest.class.getSimpleName());
+ PipelineContextUtils.initPipelineContextManager();
}
@AfterAll
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
index 2fb96e733bb..5422ce2f32c 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
@@ -35,17 +35,15 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
class PipelineDistributedBarrierTest {
- private static final String TEST_DATABASE_NAME =
PipelineDistributedBarrierTest.class.getSimpleName();
-
@BeforeAll
static void setUp() {
- PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME);
+ PipelineContextUtils.initPipelineContextManager();
}
@Test
void assertRegisterAndRemove() throws ReflectiveOperationException {
String jobId =
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId();
- PipelineContextKey contextKey =
PipelineContextUtils.getContextKey(TEST_DATABASE_NAME);
+ PipelineContextKey contextKey = PipelineContextUtils.getContextKey();
PipelineContextManager.getContext(contextKey).getPersistServiceFacade().getRepository().persist(PipelineMetaDataNode.getJobRootPath(jobId),
"");
PipelineDistributedBarrier instance =
PipelineDistributedBarrier.getInstance(contextKey);
String parentPath = "/barrier";
@@ -60,7 +58,7 @@ class PipelineDistributedBarrierTest {
@Test
void assertAwait() {
String jobId =
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId();
- PipelineContextKey contextKey =
PipelineContextUtils.getContextKey(TEST_DATABASE_NAME);
+ PipelineContextKey contextKey = PipelineContextUtils.getContextKey();
PipelineContextManager.getContext(contextKey).getPersistServiceFacade().getRepository().persist(PipelineMetaDataNode.getJobRootPath(jobId),
"");
PipelineDistributedBarrier instance =
PipelineDistributedBarrier.getInstance(contextKey);
String barrierEnablePath =
PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
index 4c742d1608c..32c4db66fd4 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
@@ -48,11 +48,9 @@ import static org.hamcrest.Matchers.is;
class ConsistencyCheckJobExecutorCallbackTest {
- private static final String TEST_DATABASE_NAME =
ConsistencyCheckJobExecutorCallbackTest.class.getSimpleName();
-
@BeforeAll
static void beforeClass() {
- PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME);
+ PipelineContextUtils.initPipelineContextManager();
}
@Test
@@ -60,7 +58,7 @@ class ConsistencyCheckJobExecutorCallbackTest {
ConsistencyCheckJobId pipelineJobId = new ConsistencyCheckJobId(new
PipelineContextKey(InstanceType.PROXY),
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId());
String checkJobId = PipelineJobIdUtils.marshal(pipelineJobId);
List<YamlTableCheckRangePosition> expectedYamlTableCheckRangePositions
= Collections.singletonList(createYamlTableCheckRangePosition());
-
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME)).getJobItemFacade().getProcess().persist(checkJobId,
0,
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(checkJobId,
0,
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectedYamlTableCheckRangePositions)));
ConsistencyCheckJobExecutorCallback callback = new
ConsistencyCheckJobExecutorCallback();
ConsistencyCheckJobConfiguration jobConfig = new
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(createYamlConsistencyCheckJobConfiguration(checkJobId));
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java
index f0faac443af..bb8ba7c779c 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java
@@ -55,8 +55,6 @@ import static org.junit.jupiter.api.Assertions.assertNull;
class ConsistencyCheckJobAPITest {
- private static final String TEST_DATABASE_NAME =
ConsistencyCheckJobAPITest.class.getSimpleName();
-
private final ConsistencyCheckJobType jobType = new
ConsistencyCheckJobType();
private final ConsistencyCheckJobAPI jobAPI = new
ConsistencyCheckJobAPI(jobType);
@@ -67,7 +65,7 @@ class ConsistencyCheckJobAPITest {
@BeforeAll
static void beforeClass() {
- PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME);
+ PipelineContextUtils.initPipelineContextManager();
}
@Test
@@ -83,7 +81,7 @@ class ConsistencyCheckJobAPITest {
assertNull(checkJobConfig.getAlgorithmTypeName());
int sequence = ConsistencyCheckJobId.parseSequence(expectCheckJobId);
assertThat(sequence, is(expectedSequence));
- PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME));
+ PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
Collection<String> actualCheckJobIds =
governanceFacade.getJobFacade().getCheck().listCheckJobIds(parentJobId);
assertThat(actualCheckJobIds.size(), is(1));
assertThat(actualCheckJobIds.iterator().next(), is(expectCheckJobId));
@@ -95,7 +93,7 @@ class ConsistencyCheckJobAPITest {
void assertDropByParentJobId() {
MigrationJobConfiguration parentJobConfig =
jobConfigSwapper.swapToObject(JobConfigurationBuilder.createYamlMigrationJobConfiguration());
String parentJobId = parentJobConfig.getJobId();
- PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME));
+ PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
int expectedSequence = 1;
for (int i = 0; i < 3; i++) {
String checkJobId = jobAPI.start(new
CreateConsistencyCheckJobParameter(parentJobId, null, null,
@@ -237,7 +235,7 @@ class ConsistencyCheckJobAPITest {
private void persistCheckJobResult(final String parentJobId, final String
checkJobId) {
Map<String, TableDataConsistencyCheckResult>
dataConsistencyCheckResult = Collections.singletonMap("t_order", new
TableDataConsistencyCheckResult(true));
- PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME));
+ PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
governanceFacade.getJobFacade().getCheck().persistCheckJobResult(parentJobId,
checkJobId, dataConsistencyCheckResult);
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
index 83fb82cbe9e..91b60d1e063 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java
@@ -108,11 +108,9 @@ class MigrationJobAPITest {
private static DatabaseType databaseType;
- private static final String TEST_DATABASE_NAME =
MigrationJobAPITest.class.getSimpleName();
-
@BeforeAll
static void beforeClass() {
- PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME);
+ PipelineContextUtils.initPipelineContextManager();
jobType = new MigrationJobType();
jobAPI = (MigrationJobAPI)
TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION");
jobConfigManager = new PipelineJobConfigurationManager(jobType);
@@ -125,13 +123,12 @@ class MigrationJobAPITest {
props.put("jdbcUrl", jdbcUrl);
props.put("username", "root");
props.put("password", "root");
-
jobAPI.registerMigrationSourceStorageUnits(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME),
Collections.singletonMap("ds_0",
- new
DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", props)));
+
jobAPI.registerMigrationSourceStorageUnits(PipelineContextUtils.getContextKey(),
Collections.singletonMap("ds_0", new
DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", props)));
}
@AfterAll
static void afterClass() {
-
jobAPI.dropMigrationSourceResources(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME),
Collections.singletonList("ds_0"));
+
jobAPI.dropMigrationSourceResources(PipelineContextUtils.getContextKey(),
Collections.singletonList("ds_0"));
}
@Test
@@ -254,7 +251,7 @@ class MigrationJobAPITest {
@Test
void assertAddMigrationSourceResources() {
PipelineDataSourcePersistService persistService = new
PipelineDataSourcePersistService();
- Map<String, DataSourcePoolProperties> actual =
persistService.load(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME),
"MIGRATION");
+ Map<String, DataSourcePoolProperties> actual =
persistService.load(PipelineContextUtils.getContextKey(), "MIGRATION");
assertTrue(actual.containsKey("ds_0"));
}
@@ -262,20 +259,20 @@ class MigrationJobAPITest {
void assertCreateJobConfigFailedOnMoreThanOneSourceTable() {
Collection<MigrationSourceTargetEntry> sourceTargetEntries =
Stream.of("t_order_0", "t_order_1")
.map(each -> new MigrationSourceTargetEntry(new
DataNode("ds_0", each), "t_order")).collect(Collectors.toList());
- assertThrows(PipelineInvalidParameterException.class, () ->
jobAPI.schedule(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME),
sourceTargetEntries, "logic_db"));
+ assertThrows(PipelineInvalidParameterException.class, () ->
jobAPI.schedule(PipelineContextUtils.getContextKey(), sourceTargetEntries,
"logic_db"));
}
@Test
void assertCreateJobConfigFailedOnDataSourceNotExist() {
Collection<MigrationSourceTargetEntry> sourceTargetEntries =
Collections.singleton(new MigrationSourceTargetEntry(new
DataNode("ds_not_exists", "t_order"), "t_order"));
- assertThrows(PipelineInvalidParameterException.class, () ->
jobAPI.schedule(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME),
sourceTargetEntries, "logic_db"));
+ assertThrows(PipelineInvalidParameterException.class, () ->
jobAPI.schedule(PipelineContextUtils.getContextKey(), sourceTargetEntries,
"logic_db"));
}
@Test
void assertCreateJobConfig() throws SQLException {
initIntPrimaryEnvironment();
MigrationSourceTargetEntry sourceTargetEntry = new
MigrationSourceTargetEntry(new DataNode("ds_0", "t_order"), "t_order");
- String jobId =
jobAPI.schedule(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME),
Collections.singleton(sourceTargetEntry), "logic_db");
+ String jobId = jobAPI.schedule(PipelineContextUtils.getContextKey(),
Collections.singleton(sourceTargetEntry), "logic_db");
MigrationJobConfiguration actual =
jobConfigManager.getJobConfiguration(jobId);
assertThat(actual.getTargetDatabaseName(), is("logic_db"));
List<JobDataNodeLine> dataNodeLines = actual.getJobShardingDataNodes();
@@ -290,7 +287,7 @@ class MigrationJobAPITest {
}
private void initIntPrimaryEnvironment() throws SQLException {
- Map<String, DataSourcePoolProperties> metaDataDataSource = new
PipelineDataSourcePersistService().load(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME),
"MIGRATION");
+ Map<String, DataSourcePoolProperties> metaDataDataSource = new
PipelineDataSourcePersistService().load(PipelineContextUtils.getContextKey(),
"MIGRATION");
DataSourcePoolProperties props = metaDataDataSource.get("ds_0");
try (
PipelineDataSource dataSource = new
PipelineDataSource(DataSourcePoolCreator.create(props), databaseType);
@@ -303,7 +300,7 @@ class MigrationJobAPITest {
@Test
void assertShowMigrationSourceResources() {
- Collection<Collection<Object>> actual =
jobAPI.listMigrationSourceResources(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME));
+ Collection<Collection<Object>> actual =
jobAPI.listMigrationSourceResources(PipelineContextUtils.getContextKey());
assertThat(actual.size(), is(1));
Collection<Object> objects = actual.iterator().next();
assertThat(objects.toArray()[0], is("ds_0"));
@@ -316,8 +313,7 @@ class MigrationJobAPITest {
YamlTransmissionJobItemProgress yamlJobItemProgress = new
YamlTransmissionJobItemProgress();
yamlJobItemProgress.setStatus(JobStatus.RUNNING.name());
yamlJobItemProgress.setSourceDatabaseType("MySQL");
-
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME)).getJobItemFacade().getProcess().persist(jobConfig.getJobId(),
0,
- YamlEngine.marshal(yamlJobItemProgress));
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(jobConfig.getJobId(),
0, YamlEngine.marshal(yamlJobItemProgress));
Collection<TransmissionJobItemInfo> jobItemInfos =
transmissionJobManager.getJobItemInfos(jobConfig.getJobId());
assertThat(jobItemInfos.size(), is(1));
TransmissionJobItemInfo jobItemInfo = jobItemInfos.iterator().next();
@@ -334,8 +330,7 @@ class MigrationJobAPITest {
yamlJobItemProgress.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK.name());
yamlJobItemProgress.setProcessedRecordsCount(100L);
yamlJobItemProgress.setInventoryRecordsCount(50L);
-
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME)).getJobItemFacade().getProcess().persist(jobConfig.getJobId(),
0,
- YamlEngine.marshal(yamlJobItemProgress));
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(jobConfig.getJobId(),
0, YamlEngine.marshal(yamlJobItemProgress));
Collection<TransmissionJobItemInfo> jobItemInfos =
transmissionJobManager.getJobItemInfos(jobConfig.getJobId());
TransmissionJobItemInfo jobItemInfo =
jobItemInfos.stream().iterator().next();
assertThat(jobItemInfo.getJobItemProgress().getStatus(),
is(JobStatus.EXECUTE_INCREMENTAL_TASK));
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index 3720397f7e0..04f4574ad55 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -50,11 +50,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
class MigrationDataConsistencyCheckerTest {
- private static final String TEST_DATABASE_NAME =
MigrationDataConsistencyCheckerTest.class.getSimpleName();
-
@BeforeAll
static void beforeClass() {
- PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME);
+ PipelineContextUtils.initPipelineContextManager();
}
@Test
@@ -81,7 +79,7 @@ class MigrationDataConsistencyCheckerTest {
jobConfigurationPOJO.setJobParameter(YamlEngine.marshal(new
YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
jobConfigurationPOJO.setJobName(jobConfig.getJobId());
jobConfigurationPOJO.setShardingTotalCount(1);
- PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME));
+ PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
getClusterPersistRepository().persist(String.format("/pipeline/jobs/%s/config",
jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
governanceFacade.getJobItemFacade().getProcess().persist(jobConfig.getJobId(),
0, "");
return new MigrationDataConsistencyChecker(jobConfig, new
TransmissionProcessContext(jobConfig.getJobId(), null),
@@ -89,7 +87,7 @@ class MigrationDataConsistencyCheckerTest {
}
private ClusterPersistRepository getClusterPersistRepository() {
- return (ClusterPersistRepository)
PipelineContextManager.getContext(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME)).getPersistServiceFacade().getRepository();
+ return (ClusterPersistRepository)
PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getPersistServiceFacade().getRepository();
}
private ConsistencyCheckJobItemProgressContext
createConsistencyCheckJobItemProgressContext(final String jobId) {
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
index ded82dca6a9..e58fb2b4122 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
@@ -92,16 +92,16 @@ import static org.mockito.Mockito.mock;
*/
public final class PipelineContextUtils {
+ private static final PipelineContextKey CONTEXT_KEY = new
PipelineContextKey(InstanceType.PROXY);
+
private static final PipelineExecuteEngine EXECUTE_ENGINE =
PipelineExecuteEngine.newCachedThreadInstance(PipelineContextUtils.class.getSimpleName());
/**
* Init pipeline context manager.
- *
- * @param databaseName database name
*/
- public static void initPipelineContextManager(final String databaseName) {
+ public static void initPipelineContextManager() {
EmbedTestingServer.start();
- PipelineContextKey contextKey = getContextKey(databaseName);
+ PipelineContextKey contextKey = getContextKey();
if (null != PipelineContextManager.getContext(contextKey)) {
return;
}
@@ -171,11 +171,10 @@ public final class PipelineContextUtils {
/**
* Get context key.
*
- * @param databaseName database name
* @return context key
*/
- public static PipelineContextKey getContextKey(final String databaseName) {
- return new PipelineContextKey(databaseName, InstanceType.PROXY);
+ public static PipelineContextKey getContextKey() {
+ return CONTEXT_KEY;
}
/**