This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 8d18aff2402 Fix job item error message clean: node not created in
repository (#29437)
8d18aff2402 is described below
commit 8d18aff240240ce0c95e93f8fb683a74a91daf2f
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Mon Dec 18 18:54:32 2023 +0800
Fix job item error message clean: node not created in repository (#29437)
---
...PipelineJobItemErrorMessageGovernanceRepository.java | 2 +-
.../repository}/PipelineGovernanceFacadeTest.java | 17 ++++++++++++++++-
2 files changed, 17 insertions(+), 2 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/item/PipelineJobItemErrorMessageGovernanceRepository.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/item/PipelineJobItemErrorMessageGovernanceRepository.java
index cc59b951d48..7410efb617f 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/item/PipelineJobItemErrorMessageGovernanceRepository.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/item/PipelineJobItemErrorMessageGovernanceRepository.java
@@ -50,7 +50,7 @@ public final class
PipelineJobItemErrorMessageGovernanceRepository {
* @param shardingItem sharding item
*/
public void clean(final String jobId, final int shardingItem) {
-
repository.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem), "");
+
repository.persist(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem), "");
}
/**
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/PipelineGovernanceFacadeTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/registrycenter/repository/PipelineGovernanceFacadeTest.java
similarity index 88%
rename from
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/PipelineGovernanceFacadeTest.java
rename to
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/registrycenter/repository/PipelineGovernanceFacadeTest.java
index bc36eeae07d..442a2c82e2b 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/PipelineGovernanceFacadeTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/registrycenter/repository/PipelineGovernanceFacadeTest.java
@@ -15,13 +15,14 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.test.it.data.pipeline.core.job.service;
+package
org.apache.shardingsphere.test.it.data.pipeline.core.registrycenter.repository;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.core.job.progress.JobOffsetInfo;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineNodePath;
import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
+import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.item.PipelineJobItemErrorMessageGovernanceRepository;
import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.item.PipelineJobItemProcessGovernanceRepository;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
@@ -154,6 +155,20 @@ class PipelineGovernanceFacadeTest {
assertThat(actual.get(), is("testValue2"));
}
+ @Test
+ void assertPersistJobItemErrorMessage() {
+ MigrationJobItemContext jobItemContext = mockJobItemContext();
+ PipelineJobItemErrorMessageGovernanceRepository
jobItemErrorMessageRepository =
governanceFacade.getJobItemFacade().getErrorMessage();
+ jobItemErrorMessageRepository.clean(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
+
assertThat(jobItemErrorMessageRepository.load(jobItemContext.getJobId(),
jobItemContext.getShardingItem()), is(""));
+ jobItemErrorMessageRepository.update(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), new Exception("__DEBUG__"));
+ String actual =
jobItemErrorMessageRepository.load(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
+ assertNotNull(actual, "Error message is null");
+ assertTrue(actual.contains("__DEBUG__"), "Error message does not
contain __DEBUG__");
+ jobItemErrorMessageRepository.clean(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
+
assertThat(jobItemErrorMessageRepository.load(jobItemContext.getJobId(),
jobItemContext.getShardingItem()), is(""));
+ }
+
@Test
void assertPersistJobOffset() {
assertFalse(governanceFacade.getJobFacade().getOffset().load("1").isTargetSchemaTableCreated());