This is an automated email from the ASF dual-hosted git repository. zhangliang pushed a commit to branch revert-32736-dev in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
commit 0435cac549c8fd112a187f77cba9e424b11bdbc5 Author: Liang Zhang <[email protected]> AuthorDate: Fri Aug 30 19:44:50 2024 +0800 Revert "Refactor TransmissionJobItemProgress (#32736)" This reverts commit e98467e43f52a80593d0a167523843298d88bd22. --- .../core/job/AbstractSeparablePipelineJob.java | 2 +- .../core/job/progress/TransmissionJobItemProgress.java | 18 +++++++++--------- .../YamlTransmissionJobItemProgressSwapper.java | 12 +++++++----- .../job/progress/PipelineJobProgressDetectorTest.java | 16 ++++++++-------- .../data/pipeline/cdc/api/CDCJobAPI.java | 7 +++++-- 5 files changed, 30 insertions(+), 25 deletions(-) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java index 648665e70b7..196034b3549 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java @@ -124,7 +124,7 @@ public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfigur protected abstract PipelineTasksRunner buildTasksRunner(I jobItemContext); - private void prepare(final I jobItemContext) { + protected final void prepare(final I jobItemContext) { try { doPrepare(jobItemContext); // CHECKSTYLE:OFF diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java index 88876f0763a..56c18fa144c 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java @@ -18,13 +18,13 @@ package org.apache.shardingsphere.data.pipeline.core.job.progress; import lombok.Getter; -import lombok.RequiredArgsConstructor; +import lombok.NoArgsConstructor; import lombok.Setter; import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext; import org.apache.shardingsphere.data.pipeline.core.job.JobStatus; -import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask; import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress; import org.apache.shardingsphere.data.pipeline.core.task.progress.InventoryTaskProgress; +import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import java.util.Collection; @@ -34,22 +34,22 @@ import java.util.Map; /** * Transmission job item progress. */ -@RequiredArgsConstructor +@NoArgsConstructor @Getter @Setter public final class TransmissionJobItemProgress implements PipelineJobItemProgress { - private final DatabaseType sourceDatabaseType; + private DatabaseType sourceDatabaseType; - private final String dataSourceName; + private String dataSourceName; - private final JobItemInventoryTasksProgress inventory; + private JobItemInventoryTasksProgress inventory; - private final JobItemIncrementalTasksProgress incremental; + private JobItemIncrementalTasksProgress incremental; - private final long inventoryRecordsCount; + private long inventoryRecordsCount; - private final long processedRecordsCount; + private long processedRecordsCount; private boolean active; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java index 4082d0dc487..43e63745c3a 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java @@ -47,12 +47,14 @@ public final class YamlTransmissionJobItemProgressSwapper implements YamlPipelin @Override public TransmissionJobItemProgress swapToObject(final YamlTransmissionJobItemProgress yamlProgress) { - TransmissionJobItemProgress result = new TransmissionJobItemProgress( - TypedSPILoader.getService(DatabaseType.class, yamlProgress.getSourceDatabaseType()), - yamlProgress.getDataSourceName(), inventoryTasksProgressSwapper.swapToObject(yamlProgress.getInventory()), - incrementalTasksProgressSwapper.swapToObject(yamlProgress.getSourceDatabaseType(), yamlProgress.getIncremental()), - yamlProgress.getInventoryRecordsCount(), yamlProgress.getProcessedRecordsCount()); + TransmissionJobItemProgress result = new TransmissionJobItemProgress(); result.setStatus(JobStatus.valueOf(yamlProgress.getStatus())); + result.setSourceDatabaseType(TypedSPILoader.getService(DatabaseType.class, yamlProgress.getSourceDatabaseType())); + result.setDataSourceName(yamlProgress.getDataSourceName()); + result.setInventory(inventoryTasksProgressSwapper.swapToObject(yamlProgress.getInventory())); + result.setIncremental(incrementalTasksProgressSwapper.swapToObject(yamlProgress.getSourceDatabaseType(), yamlProgress.getIncremental())); + result.setProcessedRecordsCount(yamlProgress.getProcessedRecordsCount()); + result.setInventoryRecordsCount(yamlProgress.getInventoryRecordsCount()); return result; } diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java index 850cf0d1991..9a3bfdd5971 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java @@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicReference; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; class PipelineJobProgressDetectorTest { @@ -68,30 +67,31 @@ class PipelineJobProgressDetectorTest { @Test void assertIsInventoryFinishedWhenJobCountDoesNotMatchJobItemProgresses() { - assertFalse(PipelineJobProgressDetector.isInventoryFinished(2, Collections.singleton(mock(TransmissionJobItemProgress.class)))); + TransmissionJobItemProgress transmissionJobItemProgress = new TransmissionJobItemProgress(); + assertFalse(PipelineJobProgressDetector.isInventoryFinished(2, Collections.singleton(transmissionJobItemProgress))); } @Test void assertIsInventoryFinishedWhenInventoryTaskProgressHasEmptyMap() { JobItemInventoryTasksProgress jobItemInventoryTasksProgress = new JobItemInventoryTasksProgress(Collections.emptyMap()); - TransmissionJobItemProgress transmissionJobItemProgress = mock(TransmissionJobItemProgress.class); - when(transmissionJobItemProgress.getInventory()).thenReturn(jobItemInventoryTasksProgress); + TransmissionJobItemProgress transmissionJobItemProgress = new TransmissionJobItemProgress(); + transmissionJobItemProgress.setInventory(jobItemInventoryTasksProgress); assertFalse(PipelineJobProgressDetector.isInventoryFinished(1, Collections.singleton(transmissionJobItemProgress))); } @Test void assertIsInventoryFinishedWhenNotAllInventoryTasksCompleted() { JobItemInventoryTasksProgress inventoryTasksProgress = new JobItemInventoryTasksProgress(Collections.singletonMap("TEST", new InventoryTaskProgress(new IngestPlaceholderPosition()))); - TransmissionJobItemProgress transmissionJobItemProgress = mock(TransmissionJobItemProgress.class); - when(transmissionJobItemProgress.getInventory()).thenReturn(inventoryTasksProgress); + TransmissionJobItemProgress transmissionJobItemProgress = new TransmissionJobItemProgress(); + transmissionJobItemProgress.setInventory(inventoryTasksProgress); assertFalse(PipelineJobProgressDetector.isInventoryFinished(1, Collections.singleton(transmissionJobItemProgress))); } @Test void assertIsInventoryFinished() { JobItemInventoryTasksProgress inventoryTasksProgress = new JobItemInventoryTasksProgress(Collections.singletonMap("TEST", new InventoryTaskProgress(new IngestFinishedPosition()))); - TransmissionJobItemProgress transmissionJobItemProgress = mock(TransmissionJobItemProgress.class); - when(transmissionJobItemProgress.getInventory()).thenReturn(inventoryTasksProgress); + TransmissionJobItemProgress transmissionJobItemProgress = new TransmissionJobItemProgress(); + transmissionJobItemProgress.setInventory(inventoryTasksProgress); assertTrue(PipelineJobProgressDetector.isInventoryFinished(1, Collections.singleton(transmissionJobItemProgress))); } } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java index b8d5447ce3f..2e9d6187cf8 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java @@ -207,10 +207,13 @@ public final class CDCJobAPI implements TransmissionJobAPI { private TransmissionJobItemProgress getTransmissionJobItemProgress(final CDCJobConfiguration jobConfig, final PipelineDataSourceManager dataSourceManager, final IncrementalDumperContext incrementalDumperContext) throws SQLException { + TransmissionJobItemProgress result = new TransmissionJobItemProgress(); + result.setSourceDatabaseType(jobConfig.getSourceDatabaseType()); + result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName()); IncrementalTaskPositionManager positionManager = new IncrementalTaskPositionManager(incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType()); IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(positionManager.getPosition(null, incrementalDumperContext, dataSourceManager)); - return new TransmissionJobItemProgress(jobConfig.getSourceDatabaseType(), incrementalDumperContext.getCommonContext().getDataSourceName(), null, - new JobItemIncrementalTasksProgress(incrementalTaskProgress), 0L, 0L); + result.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress)); + return result; } /**
