This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new dbf28033ffb Refactor CDCJobPreparer (#32760)
dbf28033ffb is described below
commit dbf28033ffb6f288ecd6550ea518105c1761d3b1
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Sep 1 17:49:31 2024 +0800
Refactor CDCJobPreparer (#32760)
---
.../apache/shardingsphere/data/pipeline/cdc/CDCJob.java | 7 ++++---
.../data/pipeline/cdc/core/prepare/CDCJobPreparer.java | 15 ++++++++-------
2 files changed, 12 insertions(+), 10 deletions(-)
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 71474b4f6c5..6768dc0c261 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -126,7 +126,7 @@ public final class CDCJob implements PipelineJob {
log.warn("Job item contexts are empty, ignore.");
return;
}
- initTasks(jobItemContexts, governanceFacade);
+ initTasks(jobItemContexts, governanceFacade, jobItemManager);
executeInventoryTasks(jobItemContexts, jobItemManager);
executeIncrementalTasks(jobItemContexts, jobItemManager);
}
@@ -158,9 +158,10 @@ public final class CDCJob implements PipelineJob {
return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap,
mapper, write.getBatchSize(), writeRateLimitAlgorithm, 0, 1);
}
- private void initTasks(final Collection<CDCJobItemContext>
jobItemContexts, final PipelineGovernanceFacade governanceFacade) {
+ private void initTasks(final Collection<CDCJobItemContext> jobItemContexts,
+ final PipelineGovernanceFacade governanceFacade,
final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager) {
try {
- new CDCJobPreparer().initTasks(jobItemContexts);
+ new CDCJobPreparer(jobItemManager).initTasks(jobItemContexts);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index c45c6f4abb1..b4eee41dce5 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.data.pipeline.cdc.core.prepare;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
import org.apache.shardingsphere.data.pipeline.cdc.config.CDCTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
import
org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCChannelProgressPair;
@@ -65,13 +65,14 @@ import java.util.concurrent.atomic.AtomicReference;
/**
* CDC job preparer.
*/
+@RequiredArgsConstructor
@Slf4j
public final class CDCJobPreparer {
- private final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager = new PipelineJobItemManager<>(new
CDCJobType().getYamlJobItemProgressSwapper());
+ private final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager;
/**
- * Do prepare work.
+ * Prepare.
*
* @param jobItemContexts job item contexts
*/
@@ -82,12 +83,12 @@ public final class CDCJobPreparer {
AtomicBoolean incrementalImporterUsed = new AtomicBoolean();
List<CDCChannelProgressPair> incrementalChannelProgressPairs = new
CopyOnWriteArrayList<>();
for (CDCJobItemContext each : jobItemContexts) {
- initTasks0(each, inventoryImporterUsed,
inventoryChannelProgressPairs, incrementalImporterUsed,
incrementalChannelProgressPairs);
+ initTasks(each, inventoryImporterUsed,
inventoryChannelProgressPairs, incrementalImporterUsed,
incrementalChannelProgressPairs);
}
}
- private void initTasks0(final CDCJobItemContext jobItemContext, final
AtomicBoolean inventoryImporterUsed, final List<CDCChannelProgressPair>
inventoryChannelProgressPairs,
- final AtomicBoolean incrementalImporterUsed, final
List<CDCChannelProgressPair> incrementalChannelProgressPairs) {
+ private void initTasks(final CDCJobItemContext jobItemContext, final
AtomicBoolean inventoryImporterUsed, final List<CDCChannelProgressPair>
inventoryChannelProgressPairs,
+ final AtomicBoolean incrementalImporterUsed, final
List<CDCChannelProgressPair> incrementalChannelProgressPairs) {
Optional<TransmissionJobItemProgress> jobItemProgress =
jobItemManager.getProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
if (!jobItemProgress.isPresent()) {
jobItemManager.persistProgress(jobItemContext);
@@ -138,7 +139,7 @@ public final class CDCJobPreparer {
importerUsed.set(true);
}
}
- log.info("initInventoryTasks cost {} ms", System.currentTimeMillis() -
startTimeMillis);
+ log.info("Init inventory tasks cost {} ms", System.currentTimeMillis()
- startTimeMillis);
}
private void initIncrementalTask(final CDCJobItemContext jobItemContext,
final AtomicBoolean importerUsed, final List<CDCChannelProgressPair>
channelProgressPairs) {