This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a7c5f98c0d Refactor GovernanceRepositoryAPI (#29117)
2a7c5f98c0d is described below

commit 2a7c5f98c0d0fb517d8f156f7c4239f9fd93bf35
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Nov 21 23:56:10 2023 +0800

    Refactor GovernanceRepositoryAPI (#29117)
    
    * Remove GovernanceRepositoryAPI.getChildrenKeys()
    
    * Remove GovernanceRepositoryAPI.watchPipeLineRootPath()
    
    * Rename GovernanceRepositoryAPI.watchPipeLineRootPath()
    
    * Rename GovernanceRepositoryAPI.updateJobItemErrorMessage()
    
    * Refactor GovernanceRepositoryAPI
---
 .../metadata/node/PipelineMetaDataNodeWatcher.java |  2 +-
 .../repository/GovernanceRepositoryAPI.java        | 44 +++++++++++-----------
 .../repository/GovernanceRepositoryAPIImpl.java    | 27 +++++++------
 .../service/PipelineJobIteErrorMessageManager.java |  5 +--
 .../core/job/service/PipelineJobManager.java       |  5 +--
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      |  5 +--
 .../service/GovernanceRepositoryAPIImplTest.java   | 24 ++++++------
 .../MigrationDataConsistencyCheckerTest.java       | 10 ++++-
 8 files changed, 66 insertions(+), 56 deletions(-)
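
A brief orientation before the per-file hunks: the refactoring replaces the generic key/value methods on GovernanceRepositoryAPI with intention-revealing ones, so registry path construction moves behind the interface. The caller-side sketch below is illustrative only; it uses the signatures visible in the hunks that follow and assumes jobId, jobClass, jobConfigPOJO, shardingItem, listener and contextKey are already in scope at the call site.

    GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey);
    // Before: repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), jobClass.getName());
    repositoryAPI.persistJobRootInfo(jobId, jobClass);
    // Before: repositoryAPI.persist(PipelineMetaDataNode.getJobConfigurationPath(jobId), YamlEngine.marshal(jobConfigPOJO));
    repositoryAPI.persistJobConfiguration(jobId, jobConfigPOJO);
    // Before: repositoryAPI.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem), errorMessage);
    repositoryAPI.updateJobItemErrorMessage(jobId, shardingItem, errorMessage);
    // Before: repositoryAPI.watch(PipelineNodePath.DATA_PIPELINE_ROOT, listener);
    repositoryAPI.watchPipeLineRootPath(listener);

Tests that previously persisted raw keys through the API now go through ClusterPersistRepository directly, as the two test hunks below show.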

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java
index 39f5c0a064b..42003ec1d66 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java
@@ -50,7 +50,7 @@ public final class PipelineMetaDataNodeWatcher {
     private PipelineMetaDataNodeWatcher(final PipelineContextKey contextKey) {
         listenerMap.putAll(ShardingSphereServiceLoader.getServiceInstances(PipelineMetaDataChangedEventHandler.class)
                 .stream().collect(Collectors.toMap(PipelineMetaDataChangedEventHandler::getKeyPattern, each -> each)));
-        PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).watch(PipelineNodePath.DATA_PIPELINE_ROOT, this::dispatchEvent);
+        PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).watchPipeLineRootPath(this::dispatchEvent);
     }
     
     private void dispatchEvent(final DataChangedEvent event) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
index 83904a361a9..87496931229 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
@@ -17,8 +17,10 @@
 
 package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
 
+import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
 
 import java.util.Collection;
@@ -31,6 +33,13 @@ import java.util.Optional;
  */
 public interface GovernanceRepositoryAPI {
     
+    /**
+     * Watch pipeLine root path.
+     *
+     * @param listener data changed event listener
+     */
+    void watchPipeLineRootPath(DataChangedEventListener listener);
+    
     /**
      * Whether job configuration existed.
      *
@@ -147,36 +156,29 @@ public interface GovernanceRepositoryAPI {
     void deleteJob(String jobId);
     
     /**
-     * Get node's sub-nodes list.
-     *
-     * @param key key of data
-     * @return sub-nodes name list
-     */
-    List<String> getChildrenKeys(String key);
-    
-    /**
-     * Watch key or path of governance server.
+     * Persist job root info.
      *
-     * @param key key of data
-     * @param listener data changed event listener
+     * @param jobId job ID
+     * @param jobClass job class
      */
-    void watch(String key, DataChangedEventListener listener);
+    void persistJobRootInfo(String jobId, Class<? extends PipelineJob> jobClass);
     
     /**
-     * Persist data.
-     *
-     * @param key key of data
-     * @param value value of data
+     * Persist job configuration.
+     * 
+     * @param jobId job ID
+     * @param jobConfigPOJO job configuration POJO
      */
-    void persist(String key, String value);
+    void persistJobConfiguration(String jobId, JobConfigurationPOJO jobConfigPOJO);
     
     /**
-     * Update data.
+     * Update job item error message.
      *
-     * @param key key of data
-     * @param value value of data
+     * @param jobId job ID
+     * @param shardingItem sharding item
+     * @param errorMessage error message
      */
-    void update(String key, String value);
+    void updateJobItemErrorMessage(String jobId, int shardingItem, String errorMessage);
     
     /**
      * Get sharding items of job.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
index 9e12ee8d9a6..1ea1dee6c94 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
@@ -20,13 +20,16 @@ package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository
 import com.google.common.base.Strings;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
 import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResultSwapper;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
@@ -50,6 +53,11 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
     
     private final ClusterPersistRepository repository;
     
+    @Override
+    public void watchPipeLineRootPath(final DataChangedEventListener listener) {
+        repository.watch(PipelineNodePath.DATA_PIPELINE_ROOT, listener);
+    }
+    
     @Override
     public boolean isJobConfigurationExisted(final String jobId) {
         return null != repository.getDirectly(PipelineMetaDataNode.getJobConfigurationPath(jobId));
@@ -142,28 +150,23 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
     }
     
     @Override
-    public List<String> getChildrenKeys(final String key) {
-        return repository.getChildrenKeys(key);
-    }
-    
-    @Override
-    public void watch(final String key, final DataChangedEventListener listener) {
-        repository.watch(key, listener);
+    public void persistJobRootInfo(final String jobId, final Class<? extends PipelineJob> jobClass) {
+        repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), jobClass.getName());
     }
     
     @Override
-    public void persist(final String key, final String value) {
-        repository.persist(key, value);
+    public void persistJobConfiguration(final String jobId, final JobConfigurationPOJO jobConfigPOJO) {
+        repository.persist(PipelineMetaDataNode.getJobConfigurationPath(jobId), YamlEngine.marshal(jobConfigPOJO));
     }
     
     @Override
-    public void update(final String key, final String value) {
-        repository.update(key, value);
+    public void updateJobItemErrorMessage(final String jobId, final int shardingItem, final String errorMessage) {
+        repository.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem), errorMessage);
     }
     
     @Override
     public List<Integer> getShardingItems(final String jobId) {
-        List<String> result = getChildrenKeys(PipelineMetaDataNode.getJobOffsetPath(jobId));
+        List<String> result = repository.getChildrenKeys(PipelineMetaDataNode.getJobOffsetPath(jobId));
         return result.stream().map(Integer::parseInt).collect(Collectors.toList());
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
index a20cd2e7a4f..669d7eebbfe 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.core.job.service;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 
@@ -56,7 +55,7 @@ public final class PipelineJobIteErrorMessageManager {
      * @param error error
      */
     public void updateErrorMessage(final Object error) {
-        governanceRepositoryAPI.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem), null == error ? "" : buildErrorMessage(error));
+        governanceRepositoryAPI.updateJobItemErrorMessage(jobId, shardingItem, null == error ? "" : buildErrorMessage(error));
     }
     
     private String buildErrorMessage(final Object error) {
@@ -67,6 +66,6 @@ public final class PipelineJobIteErrorMessageManager {
      * Clean job item error message.
      */
     public void cleanErrorMessage() {
-        governanceRepositoryAPI.persist(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem), "");
+        governanceRepositoryAPI.updateJobItemErrorMessage(jobId, shardingItem, "");
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index e8e1c821481..9dfa1c4f62a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -33,7 +33,6 @@ import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
@@ -80,8 +79,8 @@ public final class PipelineJobManager {
             log.warn("jobId already exists in registry center, ignore, job id is `{}`", jobId);
             return Optional.of(jobId);
         }
-        repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), jobAPI.getJobClass().getName());
-        repositoryAPI.persist(PipelineMetaDataNode.getJobConfigurationPath(jobId), YamlEngine.marshal(jobConfig.convertToJobConfigurationPOJO()));
+        repositoryAPI.persistJobRootInfo(jobId, jobAPI.getJobClass());
+        repositoryAPI.persistJobConfiguration(jobId, jobConfig.convertToJobConfigurationPOJO());
         return Optional.of(jobId);
     }
     
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 17c3daa7a1f..0fdff431d4b 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -49,7 +49,6 @@ import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipeli
 import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
 import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
-import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
 import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
@@ -125,10 +124,10 @@ public final class CDCJobAPI implements InventoryIncrementalJobAPI {
         if (repositoryAPI.isJobConfigurationExisted(jobConfig.getJobId())) {
             log.warn("CDC job already exists in registry center, ignore, job id is `{}`", jobConfig.getJobId());
         } else {
-            repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()), getJobClass().getName());
+            repositoryAPI.persistJobRootInfo(jobConfig.getJobId(), getJobClass());
             JobConfigurationPOJO jobConfigPOJO = jobConfig.convertToJobConfigurationPOJO();
             jobConfigPOJO.setDisabled(true);
-            repositoryAPI.persist(PipelineMetaDataNode.getJobConfigurationPath(jobConfig.getJobId()), YamlEngine.marshal(jobConfigPOJO));
+            repositoryAPI.persistJobConfiguration(jobConfig.getJobId(), jobConfigPOJO);
             if (!param.isFull()) {
                 initIncrementalPosition(jobConfig);
             }
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
index 9c5bca8cf1d..cf5203910f4 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.test.it.data.pipeline.core.job.service;
 
+import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
 import org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
 import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath;
@@ -32,6 +33,8 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.config.Migrati
 import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
 import org.apache.shardingsphere.mode.event.DataChangedEvent;
 import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
 import org.junit.jupiter.api.BeforeAll;
@@ -71,7 +74,7 @@ class GovernanceRepositoryAPIImplTest {
     }
     
     private static void watch() {
-        governanceRepositoryAPI.watch(PipelineNodePath.DATA_PIPELINE_ROOT, event -> {
+        governanceRepositoryAPI.watchPipeLineRootPath(event -> {
             if ((PipelineNodePath.DATA_PIPELINE_ROOT + "/1").equals(event.getKey())) {
                 EVENT_ATOMIC_REFERENCE.set(event);
                 COUNT_DOWN_LATCH.countDown();
@@ -82,7 +85,7 @@ class GovernanceRepositoryAPIImplTest {
     @Test
     void assertIsJobConfigurationExisted() {
         assertFalse(governanceRepositoryAPI.isJobConfigurationExisted("foo_job"));
-        governanceRepositoryAPI.persist("/pipeline/jobs/foo_job/config", "foo");
+        getClusterPersistRepository().persist("/pipeline/jobs/foo_job/config", "foo");
         assertTrue(governanceRepositoryAPI.isJobConfigurationExisted("foo_job"));
     }
     
@@ -114,24 +117,16 @@ class GovernanceRepositoryAPIImplTest {
     
     @Test
     void assertDeleteJob() {
-        governanceRepositoryAPI.persist(PipelineNodePath.DATA_PIPELINE_ROOT + "/1", "");
+        getClusterPersistRepository().persist(PipelineNodePath.DATA_PIPELINE_ROOT + "/1", "");
         governanceRepositoryAPI.deleteJob("1");
         Optional<String> actual = governanceRepositoryAPI.getJobItemProgress("1", 0);
         assertFalse(actual.isPresent());
     }
     
-    @Test
-    void assertGetChildrenKeys() {
-        governanceRepositoryAPI.persist(PipelineNodePath.DATA_PIPELINE_ROOT + "/1", "");
-        List<String> actual = governanceRepositoryAPI.getChildrenKeys(PipelineNodePath.DATA_PIPELINE_ROOT);
-        assertFalse(actual.isEmpty());
-        assertTrue(actual.contains("1"));
-    }
-    
     @Test
     void assertWatch() throws InterruptedException {
         String key = PipelineNodePath.DATA_PIPELINE_ROOT + "/1";
-        governanceRepositoryAPI.persist(key, "");
+        getClusterPersistRepository().persist(key, "");
         boolean awaitResult = COUNT_DOWN_LATCH.await(10, TimeUnit.SECONDS);
         assertTrue(awaitResult);
         DataChangedEvent event = EVENT_ATOMIC_REFERENCE.get();
@@ -167,6 +162,11 @@ class GovernanceRepositoryAPIImplTest {
         assertFalse(governanceRepositoryAPI.getLatestCheckJobId(parentJobId).isPresent(), "Expected no checkJobId to be present after deletion");
     }
     
+    private ClusterPersistRepository getClusterPersistRepository() {
+        ContextManager contextManager = PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getContextManager();
+        return (ClusterPersistRepository) contextManager.getMetaDataContexts().getPersistService().getRepository();
+    }
+    
     private MigrationJobItemContext mockJobItemContext() {
         MigrationJobItemContext result = PipelineContextUtils.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration());
         MigrationTaskConfiguration taskConfig = result.getTaskConfig();
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index 3ba0e7724e8..375062ab197 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.test.it.data.pipeline.scenario.migration.check.consistency;
 
 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
 import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
@@ -32,6 +33,8 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.context.Migrat
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
 import org.junit.jupiter.api.BeforeAll;
@@ -59,7 +62,7 @@ class MigrationDataConsistencyCheckerTest {
         jobConfigurationPOJO.setJobName(jobConfig.getJobId());
         jobConfigurationPOJO.setShardingTotalCount(1);
         GovernanceRepositoryAPI governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
-        governanceRepositoryAPI.persist(String.format("/pipeline/jobs/%s/config", jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
+        getClusterPersistRepository().persist(String.format("/pipeline/jobs/%s/config", jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
         governanceRepositoryAPI.persistJobItemProgress(jobConfig.getJobId(), 0, "");
         Map<String, TableDataConsistencyCheckResult> actual = new MigrationDataConsistencyChecker(jobConfig, new MigrationProcessContext(jobConfig.getJobId(), null),
                 createConsistencyCheckJobItemProgressContext(jobConfig.getJobId())).check("FIXTURE", null);
@@ -68,6 +71,11 @@ class MigrationDataConsistencyCheckerTest {
         assertTrue(actual.get(checkKey).isMatched());
     }
     
+    private ClusterPersistRepository getClusterPersistRepository() {
+        ContextManager contextManager = PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getContextManager();
+        return (ClusterPersistRepository) contextManager.getMetaDataContexts().getPersistService().getRepository();
+    }
+    
     private ConsistencyCheckJobItemProgressContext createConsistencyCheckJobItemProgressContext(final String jobId) {
         return new ConsistencyCheckJobItemProgressContext(jobId, 0, "H2");
     }
