This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new dbf28033ffb Refactor CDCJobPreparer (#32760)
dbf28033ffb is described below

commit dbf28033ffb6f288ecd6550ea518105c1761d3b1
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Sep 1 17:49:31 2024 +0800

    Refactor CDCJobPreparer (#32760)
---
 .../apache/shardingsphere/data/pipeline/cdc/CDCJob.java   |  7 ++++---
 .../data/pipeline/cdc/core/prepare/CDCJobPreparer.java    | 15 ++++++++-------
 2 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 71474b4f6c5..6768dc0c261 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -126,7 +126,7 @@ public final class CDCJob implements PipelineJob {
             log.warn("Job item contexts are empty, ignore.");
             return;
         }
-        initTasks(jobItemContexts, governanceFacade);
+        initTasks(jobItemContexts, governanceFacade, jobItemManager);
         executeInventoryTasks(jobItemContexts, jobItemManager);
         executeIncrementalTasks(jobItemContexts, jobItemManager);
     }
@@ -158,9 +158,10 @@ public final class CDCJob implements PipelineJob {
         return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, mapper, write.getBatchSize(), writeRateLimitAlgorithm, 0, 1);
     }
     
-    private void initTasks(final Collection<CDCJobItemContext> jobItemContexts, final PipelineGovernanceFacade governanceFacade) {
+    private void initTasks(final Collection<CDCJobItemContext> jobItemContexts,
+                           final PipelineGovernanceFacade governanceFacade, final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager) {
         try {
-            new CDCJobPreparer().initTasks(jobItemContexts);
+            new CDCJobPreparer(jobItemManager).initTasks(jobItemContexts);
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
             // CHECKSTYLE:ON
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index c45c6f4abb1..b4eee41dce5 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -17,8 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc.core.prepare;
 
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
 import org.apache.shardingsphere.data.pipeline.cdc.config.CDCTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
 import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCChannelProgressPair;
@@ -65,13 +65,14 @@ import java.util.concurrent.atomic.AtomicReference;
 /**
  * CDC job preparer.
  */
+@RequiredArgsConstructor
 @Slf4j
 public final class CDCJobPreparer {
     
-    private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(new CDCJobType().getYamlJobItemProgressSwapper());
+    private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager;
     
     /**
-     * Do prepare work.
+     * Prepare.
      *
      * @param jobItemContexts job item contexts
      */
@@ -82,12 +83,12 @@ public final class CDCJobPreparer {
         AtomicBoolean incrementalImporterUsed = new AtomicBoolean();
         List<CDCChannelProgressPair> incrementalChannelProgressPairs = new CopyOnWriteArrayList<>();
         for (CDCJobItemContext each : jobItemContexts) {
-            initTasks0(each, inventoryImporterUsed, inventoryChannelProgressPairs, incrementalImporterUsed, incrementalChannelProgressPairs);
+            initTasks(each, inventoryImporterUsed, inventoryChannelProgressPairs, incrementalImporterUsed, incrementalChannelProgressPairs);
         }
     }
     
-    private void initTasks0(final CDCJobItemContext jobItemContext, final AtomicBoolean inventoryImporterUsed, final List<CDCChannelProgressPair> inventoryChannelProgressPairs,
-                            final AtomicBoolean incrementalImporterUsed, final List<CDCChannelProgressPair> incrementalChannelProgressPairs) {
+    private void initTasks(final CDCJobItemContext jobItemContext, final AtomicBoolean inventoryImporterUsed, final List<CDCChannelProgressPair> inventoryChannelProgressPairs,
+                           final AtomicBoolean incrementalImporterUsed, final List<CDCChannelProgressPair> incrementalChannelProgressPairs) {
         Optional<TransmissionJobItemProgress> jobItemProgress = jobItemManager.getProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
         if (!jobItemProgress.isPresent()) {
             jobItemManager.persistProgress(jobItemContext);
@@ -138,7 +139,7 @@ public final class CDCJobPreparer {
                 importerUsed.set(true);
             }
         }
-        log.info("initInventoryTasks cost {} ms", System.currentTimeMillis() - startTimeMillis);
+        log.info("Init inventory tasks cost {} ms", System.currentTimeMillis() - startTimeMillis);
     }
     
     private void initIncrementalTask(final CDCJobItemContext jobItemContext, final AtomicBoolean importerUsed, final List<CDCChannelProgressPair> channelProgressPairs) {

Reply via email to