This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c4f1ef13950 Move updateJobItemStatus method from PipelineJobAPI to
PipelineJobManager (#29076)
c4f1ef13950 is described below
commit c4f1ef13950c0155054139dd381cedd3f204782b
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 18 18:28:16 2023 +0800
Move updateJobItemStatus method from PipelineJobAPI to PipelineJobManager
(#29076)
---
.../common/job/progress/PipelineJobItemProgress.java | 7 +++++++
.../data/pipeline/core/job/service/PipelineJobAPI.java | 10 ----------
.../pipeline/core/job/service/PipelineJobManager.java | 18 ++++++++++++++++++
.../impl/AbstractInventoryIncrementalJobAPIImpl.java | 12 ------------
.../task/runner/InventoryIncrementalTasksRunner.java | 2 +-
.../data/pipeline/cdc/core/job/CDCJob.java | 2 +-
.../api/impl/ConsistencyCheckJobAPI.java | 14 --------------
.../migration/prepare/MigrationJobPreparer.java | 2 +-
.../migration/api/impl/MigrationJobAPITest.java | 4 ++--
9 files changed, 30 insertions(+), 41 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/PipelineJobItemProgress.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/PipelineJobItemProgress.java
index 4153e740733..3c9770a369f 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/PipelineJobItemProgress.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/PipelineJobItemProgress.java
@@ -30,4 +30,11 @@ public interface PipelineJobItemProgress {
* @return job status
*/
JobStatus getStatus();
+
+ /**
+ * Set status.
+ *
+ * @param jobStatus job status
+ */
+ void setStatus(JobStatus jobStatus);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
index 168b18d88ad..87ae49520dc 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.core.job.service;
-import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper;
@@ -76,15 +75,6 @@ public interface PipelineJobAPI extends TypedSPI {
return Optional.empty();
}
- /**
- * Update job item status.
- *
- * @param jobId job id
- * @param shardingItem sharding item
- * @param status status
- */
- void updateJobItemStatus(String jobId, int shardingItem, JobStatus status);
-
/**
* Get pipeline job class.
*
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index e7fd3ad85e6..3eab31d08af 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -227,6 +227,24 @@ public final class PipelineJobManager {
.persistJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
}
+ /**
+ * Update job item status.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ * @param status status
+ */
+ public void updateJobItemStatus(final String jobId, final int
shardingItem, final JobStatus status) {
+ Optional<PipelineJobItemProgress> jobItemProgress =
getJobItemProgress(jobId, shardingItem);
+ if (!jobItemProgress.isPresent()) {
+ log.warn("updateJobItemStatus, jobProgress is null, jobId={},
shardingItem={}", jobId, shardingItem);
+ return;
+ }
+ jobItemProgress.get().setStatus(status);
+
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId,
shardingItem,
+
YamlEngine.marshal(jobAPI.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get())));
+ }
+
/**
* Update job item progress.
*
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 6b0cf4171e6..f0c85629db7 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -130,18 +130,6 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl implements Inventor
return jobOffsetInfoSwapper.swapToObject(new YamlJobOffsetInfo());
}
- @Override
- public void updateJobItemStatus(final String jobId, final int
shardingItem, final JobStatus status) {
- PipelineJobManager jobManager = new PipelineJobManager(this);
- Optional<InventoryIncrementalJobItemProgress> jobItemProgress =
jobManager.getJobItemProgress(jobId, shardingItem);
- if (!jobItemProgress.isPresent()) {
- return;
- }
- jobItemProgress.get().setStatus(status);
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId,
shardingItem,
-
YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get())));
- }
-
@Override
public Collection<DataConsistencyCheckAlgorithmInfo>
listDataConsistencyCheckAlgorithms() {
Collection<DataConsistencyCheckAlgorithmInfo> result = new
LinkedList<>();
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
index ad9e3ccb81b..4af5b5f3b27 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
@@ -106,7 +106,7 @@ public class InventoryIncrementalTasksRunner implements
PipelineTasksRunner {
private void updateLocalAndRemoteJobItemStatus(final JobStatus jobStatus) {
jobItemContext.setStatus(jobStatus);
- jobAPI.updateJobItemStatus(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), jobStatus);
+ jobManager.updateJobItemStatus(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), jobStatus);
}
private synchronized void executeIncrementalTask() {
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 657b7b48ccd..14cdb71513a 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -151,7 +151,7 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
private void updateLocalAndRemoteJobItemStatus(final
PipelineJobItemContext jobItemContext, final JobStatus jobStatus) {
jobItemContext.setStatus(jobStatus);
- jobAPI.updateJobItemStatus(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), jobStatus);
+ jobManager.updateJobItemStatus(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), jobStatus);
}
private void executeIncrementalTasks(final List<CDCJobItemContext>
jobItemContexts) {
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 4ba4d05079d..a60fcb44d5e 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -46,7 +46,6 @@ import
org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import java.sql.Timestamp;
import java.time.Duration;
@@ -117,19 +116,6 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
return true;
}
- @Override
- public void updateJobItemStatus(final String jobId, final int
shardingItem, final JobStatus status) {
- PipelineJobManager jobManager = new PipelineJobManager(this);
- Optional<ConsistencyCheckJobItemProgress> jobItemProgress =
jobManager.getJobItemProgress(jobId, shardingItem);
- if (!jobItemProgress.isPresent()) {
- log.warn("updateJobItemStatus, jobProgress is null, jobId={},
shardingItem={}", jobId, shardingItem);
- return;
- }
- jobItemProgress.get().setStatus(status);
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId,
shardingItem,
-
YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get())));
- }
-
/**
* Start by parent job id.
*
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index d473001b873..50ce83843f1 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -136,7 +136,7 @@ public final class MigrationJobPreparer {
JobOffsetInfo offsetInfo = jobAPI.getJobOffsetInfo(jobId);
if (!offsetInfo.isTargetSchemaTableCreated()) {
jobItemContext.setStatus(JobStatus.PREPARING);
- jobAPI.updateJobItemStatus(jobId,
jobItemContext.getShardingItem(), JobStatus.PREPARING);
+ jobManager.updateJobItemStatus(jobId,
jobItemContext.getShardingItem(), JobStatus.PREPARING);
prepareAndCheckTarget(jobItemContext);
jobAPI.persistJobOffsetInfo(jobId, new
JobOffsetInfo(true));
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index a1a1c905dbd..6201a6cb507 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -213,7 +213,7 @@ class MigrationJobAPITest {
assertTrue(jobId.isPresent());
MigrationJobItemContext jobItemContext =
PipelineContextUtils.mockMigrationJobItemContext(jobConfig);
jobManager.persistJobItemProgress(jobItemContext);
- jobAPI.updateJobItemStatus(jobId.get(),
jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
+ jobManager.updateJobItemStatus(jobId.get(),
jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
Map<Integer, InventoryIncrementalJobItemProgress> progress =
jobAPI.getJobProgress(jobConfig);
for (Entry<Integer, InventoryIncrementalJobItemProgress> entry :
progress.entrySet()) {
assertThat(entry.getValue().getStatus(),
is(JobStatus.EXECUTE_INVENTORY_TASK));
@@ -246,7 +246,7 @@ class MigrationJobAPITest {
final MigrationJobConfiguration jobConfig =
JobConfigurationBuilder.createJobConfiguration();
MigrationJobItemContext jobItemContext =
PipelineContextUtils.mockMigrationJobItemContext(jobConfig);
jobManager.persistJobItemProgress(jobItemContext);
- jobAPI.updateJobItemStatus(jobConfig.getJobId(), 0,
JobStatus.FINISHED);
+ jobManager.updateJobItemStatus(jobConfig.getJobId(), 0,
JobStatus.FINISHED);
Optional<InventoryIncrementalJobItemProgress> actual =
jobManager.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
assertTrue(actual.isPresent());
assertThat(actual.get().getStatus(), is(JobStatus.FINISHED));