This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 90b1f6bf3db Revert "Refactor TransmissionJobItemProgress (#32736)" (#32737)
90b1f6bf3db is described below
commit 90b1f6bf3dbf5fd28ab4f27945c3820c60454fdd
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Aug 30 19:46:24 2024 +0800
Revert "Refactor TransmissionJobItemProgress (#32736)" (#32737)
This reverts commit e98467e43f52a80593d0a167523843298d88bd22.
---
.../core/job/AbstractSeparablePipelineJob.java | 2 +-
.../core/job/progress/TransmissionJobItemProgress.java | 18 +++++++++---------
.../YamlTransmissionJobItemProgressSwapper.java | 12 +++++++-----
.../job/progress/PipelineJobProgressDetectorTest.java | 16 ++++++++--------
.../data/pipeline/cdc/api/CDCJobAPI.java | 7 +++++--
5 files changed, 30 insertions(+), 25 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 648665e70b7..196034b3549 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -124,7 +124,7 @@ public abstract class AbstractSeparablePipelineJob<T
extends PipelineJobConfigur
protected abstract PipelineTasksRunner buildTasksRunner(I jobItemContext);
- private void prepare(final I jobItemContext) {
+ protected final void prepare(final I jobItemContext) {
try {
doPrepare(jobItemContext);
// CHECKSTYLE:OFF
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java
index 88876f0763a..56c18fa144c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java
@@ -18,13 +18,13 @@
package org.apache.shardingsphere.data.pipeline.core.job.progress;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import lombok.NoArgsConstructor;
import lombok.Setter;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import
org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
import
org.apache.shardingsphere.data.pipeline.core.task.progress.InventoryTaskProgress;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import java.util.Collection;
@@ -34,22 +34,22 @@ import java.util.Map;
/**
* Transmission job item progress.
*/
-@RequiredArgsConstructor
+@NoArgsConstructor
@Getter
@Setter
public final class TransmissionJobItemProgress implements
PipelineJobItemProgress {
- private final DatabaseType sourceDatabaseType;
+ private DatabaseType sourceDatabaseType;
- private final String dataSourceName;
+ private String dataSourceName;
- private final JobItemInventoryTasksProgress inventory;
+ private JobItemInventoryTasksProgress inventory;
- private final JobItemIncrementalTasksProgress incremental;
+ private JobItemIncrementalTasksProgress incremental;
- private final long inventoryRecordsCount;
+ private long inventoryRecordsCount;
- private final long processedRecordsCount;
+ private long processedRecordsCount;
private boolean active;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java
index 4082d0dc487..43e63745c3a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java
@@ -47,12 +47,14 @@ public final class YamlTransmissionJobItemProgressSwapper
implements YamlPipelin
@Override
public TransmissionJobItemProgress swapToObject(final
YamlTransmissionJobItemProgress yamlProgress) {
- TransmissionJobItemProgress result = new TransmissionJobItemProgress(
- TypedSPILoader.getService(DatabaseType.class,
yamlProgress.getSourceDatabaseType()),
- yamlProgress.getDataSourceName(),
inventoryTasksProgressSwapper.swapToObject(yamlProgress.getInventory()),
-
incrementalTasksProgressSwapper.swapToObject(yamlProgress.getSourceDatabaseType(),
yamlProgress.getIncremental()),
- yamlProgress.getInventoryRecordsCount(),
yamlProgress.getProcessedRecordsCount());
+ TransmissionJobItemProgress result = new TransmissionJobItemProgress();
result.setStatus(JobStatus.valueOf(yamlProgress.getStatus()));
+
result.setSourceDatabaseType(TypedSPILoader.getService(DatabaseType.class,
yamlProgress.getSourceDatabaseType()));
+ result.setDataSourceName(yamlProgress.getDataSourceName());
+
result.setInventory(inventoryTasksProgressSwapper.swapToObject(yamlProgress.getInventory()));
+
result.setIncremental(incrementalTasksProgressSwapper.swapToObject(yamlProgress.getSourceDatabaseType(),
yamlProgress.getIncremental()));
+
result.setProcessedRecordsCount(yamlProgress.getProcessedRecordsCount());
+
result.setInventoryRecordsCount(yamlProgress.getInventoryRecordsCount());
return result;
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java
index 850cf0d1991..9a3bfdd5971 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java
@@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
class PipelineJobProgressDetectorTest {
@@ -68,30 +67,31 @@ class PipelineJobProgressDetectorTest {
@Test
void assertIsInventoryFinishedWhenJobCountDoesNotMatchJobItemProgresses() {
- assertFalse(PipelineJobProgressDetector.isInventoryFinished(2,
Collections.singleton(mock(TransmissionJobItemProgress.class))));
+ TransmissionJobItemProgress transmissionJobItemProgress = new
TransmissionJobItemProgress();
+ assertFalse(PipelineJobProgressDetector.isInventoryFinished(2,
Collections.singleton(transmissionJobItemProgress)));
}
@Test
void assertIsInventoryFinishedWhenInventoryTaskProgressHasEmptyMap() {
JobItemInventoryTasksProgress jobItemInventoryTasksProgress = new
JobItemInventoryTasksProgress(Collections.emptyMap());
- TransmissionJobItemProgress transmissionJobItemProgress =
mock(TransmissionJobItemProgress.class);
-
when(transmissionJobItemProgress.getInventory()).thenReturn(jobItemInventoryTasksProgress);
+ TransmissionJobItemProgress transmissionJobItemProgress = new
TransmissionJobItemProgress();
+
transmissionJobItemProgress.setInventory(jobItemInventoryTasksProgress);
assertFalse(PipelineJobProgressDetector.isInventoryFinished(1,
Collections.singleton(transmissionJobItemProgress)));
}
@Test
void assertIsInventoryFinishedWhenNotAllInventoryTasksCompleted() {
JobItemInventoryTasksProgress inventoryTasksProgress = new
JobItemInventoryTasksProgress(Collections.singletonMap("TEST", new
InventoryTaskProgress(new IngestPlaceholderPosition())));
- TransmissionJobItemProgress transmissionJobItemProgress =
mock(TransmissionJobItemProgress.class);
-
when(transmissionJobItemProgress.getInventory()).thenReturn(inventoryTasksProgress);
+ TransmissionJobItemProgress transmissionJobItemProgress = new
TransmissionJobItemProgress();
+ transmissionJobItemProgress.setInventory(inventoryTasksProgress);
assertFalse(PipelineJobProgressDetector.isInventoryFinished(1,
Collections.singleton(transmissionJobItemProgress)));
}
@Test
void assertIsInventoryFinished() {
JobItemInventoryTasksProgress inventoryTasksProgress = new
JobItemInventoryTasksProgress(Collections.singletonMap("TEST", new
InventoryTaskProgress(new IngestFinishedPosition())));
- TransmissionJobItemProgress transmissionJobItemProgress =
mock(TransmissionJobItemProgress.class);
-
when(transmissionJobItemProgress.getInventory()).thenReturn(inventoryTasksProgress);
+ TransmissionJobItemProgress transmissionJobItemProgress = new
TransmissionJobItemProgress();
+ transmissionJobItemProgress.setInventory(inventoryTasksProgress);
assertTrue(PipelineJobProgressDetector.isInventoryFinished(1,
Collections.singleton(transmissionJobItemProgress)));
}
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index b8d5447ce3f..2e9d6187cf8 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -207,10 +207,13 @@ public final class CDCJobAPI implements
TransmissionJobAPI {
private TransmissionJobItemProgress getTransmissionJobItemProgress(final
CDCJobConfiguration jobConfig, final PipelineDataSourceManager
dataSourceManager,
final
IncrementalDumperContext incrementalDumperContext) throws SQLException {
+ TransmissionJobItemProgress result = new TransmissionJobItemProgress();
+ result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
+
result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName());
IncrementalTaskPositionManager positionManager = new
IncrementalTaskPositionManager(incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType());
IncrementalTaskProgress incrementalTaskProgress = new
IncrementalTaskProgress(positionManager.getPosition(null,
incrementalDumperContext, dataSourceManager));
- return new
TransmissionJobItemProgress(jobConfig.getSourceDatabaseType(),
incrementalDumperContext.getCommonContext().getDataSourceName(), null,
- new JobItemIncrementalTasksProgress(incrementalTaskProgress),
0L, 0L);
+ result.setIncremental(new
JobItemIncrementalTasksProgress(incrementalTaskProgress));
+ return result;
}
/**