This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch revert-32736-dev
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git

commit 0435cac549c8fd112a187f77cba9e424b11bdbc5
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Aug 30 19:44:50 2024 +0800

    Revert "Refactor TransmissionJobItemProgress (#32736)"
    
    This reverts commit e98467e43f52a80593d0a167523843298d88bd22.
---
 .../core/job/AbstractSeparablePipelineJob.java         |  2 +-
 .../core/job/progress/TransmissionJobItemProgress.java | 18 +++++++++---------
 .../YamlTransmissionJobItemProgressSwapper.java        | 12 +++++++-----
 .../job/progress/PipelineJobProgressDetectorTest.java  | 16 ++++++++--------
 .../data/pipeline/cdc/api/CDCJobAPI.java               |  7 +++++--
 5 files changed, 30 insertions(+), 25 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 648665e70b7..196034b3549 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -124,7 +124,7 @@ public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfigur
     
     protected abstract PipelineTasksRunner buildTasksRunner(I jobItemContext);
     
-    private void prepare(final I jobItemContext) {
+    protected final void prepare(final I jobItemContext) {
         try {
             doPrepare(jobItemContext);
             // CHECKSTYLE:OFF
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java
index 88876f0763a..56c18fa144c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgress.java
@@ -18,13 +18,13 @@
 package org.apache.shardingsphere.data.pipeline.core.job.progress;
 
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import lombok.NoArgsConstructor;
 import lombok.Setter;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
 import org.apache.shardingsphere.data.pipeline.core.task.progress.InventoryTaskProgress;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 
 import java.util.Collection;
@@ -34,22 +34,22 @@ import java.util.Map;
 /**
  * Transmission job item progress.
  */
-@RequiredArgsConstructor
+@NoArgsConstructor
 @Getter
 @Setter
 public final class TransmissionJobItemProgress implements PipelineJobItemProgress {
     
-    private final DatabaseType sourceDatabaseType;
+    private DatabaseType sourceDatabaseType;
     
-    private final String dataSourceName;
+    private String dataSourceName;
     
-    private final JobItemInventoryTasksProgress inventory;
+    private JobItemInventoryTasksProgress inventory;
     
-    private final JobItemIncrementalTasksProgress incremental;
+    private JobItemIncrementalTasksProgress incremental;
     
-    private final long inventoryRecordsCount;
+    private long inventoryRecordsCount;
     
-    private final long processedRecordsCount;
+    private long processedRecordsCount;
     
     private boolean active;
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java
index 4082d0dc487..43e63745c3a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlTransmissionJobItemProgressSwapper.java
@@ -47,12 +47,14 @@ public final class YamlTransmissionJobItemProgressSwapper implements YamlPipelin
     
     @Override
     public TransmissionJobItemProgress swapToObject(final YamlTransmissionJobItemProgress yamlProgress) {
-        TransmissionJobItemProgress result = new TransmissionJobItemProgress(
-                TypedSPILoader.getService(DatabaseType.class, yamlProgress.getSourceDatabaseType()),
-                yamlProgress.getDataSourceName(), inventoryTasksProgressSwapper.swapToObject(yamlProgress.getInventory()),
-                incrementalTasksProgressSwapper.swapToObject(yamlProgress.getSourceDatabaseType(), yamlProgress.getIncremental()),
-                yamlProgress.getInventoryRecordsCount(), yamlProgress.getProcessedRecordsCount());
+        TransmissionJobItemProgress result = new TransmissionJobItemProgress();
         result.setStatus(JobStatus.valueOf(yamlProgress.getStatus()));
+        result.setSourceDatabaseType(TypedSPILoader.getService(DatabaseType.class, yamlProgress.getSourceDatabaseType()));
+        result.setDataSourceName(yamlProgress.getDataSourceName());
+        result.setInventory(inventoryTasksProgressSwapper.swapToObject(yamlProgress.getInventory()));
+        result.setIncremental(incrementalTasksProgressSwapper.swapToObject(yamlProgress.getSourceDatabaseType(), yamlProgress.getIncremental()));
+        result.setProcessedRecordsCount(yamlProgress.getProcessedRecordsCount());
+        result.setInventoryRecordsCount(yamlProgress.getInventoryRecordsCount());
         return result;
     }
     
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java
index 850cf0d1991..9a3bfdd5971 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetectorTest.java
@@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 class PipelineJobProgressDetectorTest {
     
@@ -68,30 +67,31 @@ class PipelineJobProgressDetectorTest {
     
     @Test
     void assertIsInventoryFinishedWhenJobCountDoesNotMatchJobItemProgresses() {
-        assertFalse(PipelineJobProgressDetector.isInventoryFinished(2, Collections.singleton(mock(TransmissionJobItemProgress.class))));
+        TransmissionJobItemProgress transmissionJobItemProgress = new TransmissionJobItemProgress();
+        assertFalse(PipelineJobProgressDetector.isInventoryFinished(2, Collections.singleton(transmissionJobItemProgress)));
     }
     
     @Test
     void assertIsInventoryFinishedWhenInventoryTaskProgressHasEmptyMap() {
         JobItemInventoryTasksProgress jobItemInventoryTasksProgress = new JobItemInventoryTasksProgress(Collections.emptyMap());
-        TransmissionJobItemProgress transmissionJobItemProgress = mock(TransmissionJobItemProgress.class);
-        when(transmissionJobItemProgress.getInventory()).thenReturn(jobItemInventoryTasksProgress);
+        TransmissionJobItemProgress transmissionJobItemProgress = new TransmissionJobItemProgress();
+        transmissionJobItemProgress.setInventory(jobItemInventoryTasksProgress);
         assertFalse(PipelineJobProgressDetector.isInventoryFinished(1, Collections.singleton(transmissionJobItemProgress)));
     }
     
     @Test
     void assertIsInventoryFinishedWhenNotAllInventoryTasksCompleted() {
         JobItemInventoryTasksProgress inventoryTasksProgress = new JobItemInventoryTasksProgress(Collections.singletonMap("TEST", new InventoryTaskProgress(new IngestPlaceholderPosition())));
-        TransmissionJobItemProgress transmissionJobItemProgress = mock(TransmissionJobItemProgress.class);
-        when(transmissionJobItemProgress.getInventory()).thenReturn(inventoryTasksProgress);
+        TransmissionJobItemProgress transmissionJobItemProgress = new TransmissionJobItemProgress();
+        transmissionJobItemProgress.setInventory(inventoryTasksProgress);
         assertFalse(PipelineJobProgressDetector.isInventoryFinished(1, Collections.singleton(transmissionJobItemProgress)));
     }
     
     @Test
     void assertIsInventoryFinished() {
         JobItemInventoryTasksProgress inventoryTasksProgress = new JobItemInventoryTasksProgress(Collections.singletonMap("TEST", new InventoryTaskProgress(new IngestFinishedPosition())));
-        TransmissionJobItemProgress transmissionJobItemProgress = mock(TransmissionJobItemProgress.class);
-        when(transmissionJobItemProgress.getInventory()).thenReturn(inventoryTasksProgress);
+        TransmissionJobItemProgress transmissionJobItemProgress = new TransmissionJobItemProgress();
+        transmissionJobItemProgress.setInventory(inventoryTasksProgress);
         assertTrue(PipelineJobProgressDetector.isInventoryFinished(1, Collections.singleton(transmissionJobItemProgress)));
     }
 }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index b8d5447ce3f..2e9d6187cf8 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -207,10 +207,13 @@ public final class CDCJobAPI implements TransmissionJobAPI {
     
     private TransmissionJobItemProgress getTransmissionJobItemProgress(final CDCJobConfiguration jobConfig, final PipelineDataSourceManager dataSourceManager,
                                                                        final IncrementalDumperContext incrementalDumperContext) throws SQLException {
+        TransmissionJobItemProgress result = new TransmissionJobItemProgress();
+        result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
+        result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName());
         IncrementalTaskPositionManager positionManager = new IncrementalTaskPositionManager(incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType());
         IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(positionManager.getPosition(null, incrementalDumperContext, dataSourceManager));
-        return new TransmissionJobItemProgress(jobConfig.getSourceDatabaseType(), incrementalDumperContext.getCommonContext().getDataSourceName(), null,
-                new JobItemIncrementalTasksProgress(incrementalTaskProgress), 0L, 0L);
+        result.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress));
+        return result;
+        return result;
     }
     
     /**

Reply via email to