This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e98467e43f5 Refactor TransmissionJobItemProgress (#32736)
e98467e43f5 is described below
commit e98467e43f52a80593d0a167523843298d88bd22
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Aug 30 19:39:17 2024 +0800
Refactor TransmissionJobItemProgress (#32736)
* Refactor PipelineJobRunnerManager
* Refactor TransmissionJobItemProgress
---
.../core/job/AbstractSeparablePipelineJob.java | 2 +-
.../core/job/progress/TransmissionJobItemProgress.java | 18 +++++++++---------
.../YamlTransmissionJobItemProgressSwapper.java | 12 +++++-------
.../job/progress/PipelineJobProgressDetectorTest.java | 16 ++++++++--------
.../data/pipeline/cdc/api/CDCJobAPI.java | 7 ++-----
5 files changed, 25 insertions(+), 30 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 196034b3549..648665e70b7 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -124,7 +124,7 @@ public abstract class AbstractSeparablePipelineJob<T
extends PipelineJobConfigur
protected abstract PipelineTasksRunner buildTasksRunner(I jobItemContext);
- protected final void prepare(final I jobItemContext) {
+ private void prepare(final I jobItemContext) {
try {
doPrepare(jobItemContext);
// CHECKSTYLE:OFF
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java
index 56c18fa144c..88876f0763a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java
@@ -18,13 +18,13 @@
package org.apache.shardingsphere.data.pipeline.core.job.progress;
import lombok.Getter;
-import lombok.NoArgsConstructor;
+import lombok.RequiredArgsConstructor;
import lombok.Setter;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import
org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
import
org.apache.shardingsphere.data.pipeline.core.task.progress.InventoryTaskProgress;
-import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import java.util.Collection;
@@ -34,22 +34,22 @@ import java.util.Map;
/**
* Transmission job item progress.
*/
-@NoArgsConstructor
+@RequiredArgsConstructor
@Getter
@Setter
public final class TransmissionJobItemProgress implements
PipelineJobItemProgress {
- private DatabaseType sourceDatabaseType;
+ private final DatabaseType sourceDatabaseType;
- private String dataSourceName;
+ private final String dataSourceName;
- private JobItemInventoryTasksProgress inventory;
+ private final JobItemInventoryTasksProgress inventory;
- private JobItemIncrementalTasksProgress incremental;
+ private final JobItemIncrementalTasksProgress incremental;
- private long inventoryRecordsCount;
+ private final long inventoryRecordsCount;
- private long processedRecordsCount;
+ private final long processedRecordsCount;
private boolean active;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java
index 43e63745c3a..4082d0dc487 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java
@@ -47,14 +47,12 @@ public final class YamlTransmissionJobItemProgressSwapper
implements YamlPipelin
@Override
public TransmissionJobItemProgress swapToObject(final
YamlTransmissionJobItemProgress yamlProgress) {
- TransmissionJobItemProgress result = new TransmissionJobItemProgress();
+ TransmissionJobItemProgress result = new TransmissionJobItemProgress(
+ TypedSPILoader.getService(DatabaseType.class,
yamlProgress.getSourceDatabaseType()),
+ yamlProgress.getDataSourceName(),
inventoryTasksProgressSwapper.swapToObject(yamlProgress.getInventory()),
+
incrementalTasksProgressSwapper.swapToObject(yamlProgress.getSourceDatabaseType(),
yamlProgress.getIncremental()),
+ yamlProgress.getInventoryRecordsCount(),
yamlProgress.getProcessedRecordsCount());
result.setStatus(JobStatus.valueOf(yamlProgress.getStatus()));
-
result.setSourceDatabaseType(TypedSPILoader.getService(DatabaseType.class,
yamlProgress.getSourceDatabaseType()));
- result.setDataSourceName(yamlProgress.getDataSourceName());
-
result.setInventory(inventoryTasksProgressSwapper.swapToObject(yamlProgress.getInventory()));
-
result.setIncremental(incrementalTasksProgressSwapper.swapToObject(yamlProgress.getSourceDatabaseType(),
yamlProgress.getIncremental()));
-
result.setProcessedRecordsCount(yamlProgress.getProcessedRecordsCount());
-
result.setInventoryRecordsCount(yamlProgress.getInventoryRecordsCount());
return result;
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java
index 9a3bfdd5971..850cf0d1991 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
class PipelineJobProgressDetectorTest {
@@ -67,31 +68,30 @@ class PipelineJobProgressDetectorTest {
@Test
void assertIsInventoryFinishedWhenJobCountDoesNotMatchJobItemProgresses() {
- TransmissionJobItemProgress transmissionJobItemProgress = new
TransmissionJobItemProgress();
- assertFalse(PipelineJobProgressDetector.isInventoryFinished(2,
Collections.singleton(transmissionJobItemProgress)));
+ assertFalse(PipelineJobProgressDetector.isInventoryFinished(2,
Collections.singleton(mock(TransmissionJobItemProgress.class))));
}
@Test
void assertIsInventoryFinishedWhenInventoryTaskProgressHasEmptyMap() {
JobItemInventoryTasksProgress jobItemInventoryTasksProgress = new
JobItemInventoryTasksProgress(Collections.emptyMap());
- TransmissionJobItemProgress transmissionJobItemProgress = new
TransmissionJobItemProgress();
-
transmissionJobItemProgress.setInventory(jobItemInventoryTasksProgress);
+ TransmissionJobItemProgress transmissionJobItemProgress =
mock(TransmissionJobItemProgress.class);
+
when(transmissionJobItemProgress.getInventory()).thenReturn(jobItemInventoryTasksProgress);
assertFalse(PipelineJobProgressDetector.isInventoryFinished(1,
Collections.singleton(transmissionJobItemProgress)));
}
@Test
void assertIsInventoryFinishedWhenNotAllInventoryTasksCompleted() {
JobItemInventoryTasksProgress inventoryTasksProgress = new
JobItemInventoryTasksProgress(Collections.singletonMap("TEST", new
InventoryTaskProgress(new IngestPlaceholderPosition())));
- TransmissionJobItemProgress transmissionJobItemProgress = new
TransmissionJobItemProgress();
- transmissionJobItemProgress.setInventory(inventoryTasksProgress);
+ TransmissionJobItemProgress transmissionJobItemProgress =
mock(TransmissionJobItemProgress.class);
+
when(transmissionJobItemProgress.getInventory()).thenReturn(inventoryTasksProgress);
assertFalse(PipelineJobProgressDetector.isInventoryFinished(1,
Collections.singleton(transmissionJobItemProgress)));
}
@Test
void assertIsInventoryFinished() {
JobItemInventoryTasksProgress inventoryTasksProgress = new
JobItemInventoryTasksProgress(Collections.singletonMap("TEST", new
InventoryTaskProgress(new IngestFinishedPosition())));
- TransmissionJobItemProgress transmissionJobItemProgress = new
TransmissionJobItemProgress();
- transmissionJobItemProgress.setInventory(inventoryTasksProgress);
+ TransmissionJobItemProgress transmissionJobItemProgress =
mock(TransmissionJobItemProgress.class);
+
when(transmissionJobItemProgress.getInventory()).thenReturn(inventoryTasksProgress);
assertTrue(PipelineJobProgressDetector.isInventoryFinished(1,
Collections.singleton(transmissionJobItemProgress)));
}
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 2e9d6187cf8..b8d5447ce3f 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -207,13 +207,10 @@ public final class CDCJobAPI implements
TransmissionJobAPI {
private TransmissionJobItemProgress getTransmissionJobItemProgress(final
CDCJobConfiguration jobConfig, final PipelineDataSourceManager
dataSourceManager,
final
IncrementalDumperContext incrementalDumperContext) throws SQLException {
- TransmissionJobItemProgress result = new TransmissionJobItemProgress();
- result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
-
result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName());
IncrementalTaskPositionManager positionManager = new
IncrementalTaskPositionManager(incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType());
IncrementalTaskProgress incrementalTaskProgress = new
IncrementalTaskProgress(positionManager.getPosition(null,
incrementalDumperContext, dataSourceManager));
- result.setIncremental(new
JobItemIncrementalTasksProgress(incrementalTaskProgress));
- return result;
+ return new
TransmissionJobItemProgress(jobConfig.getSourceDatabaseType(),
incrementalDumperContext.getCommonContext().getDataSourceName(), null,
+ new JobItemIncrementalTasksProgress(incrementalTaskProgress),
0L, 0L);
}
/**