This is an automated email from the ASF dual-hosted git repository.
totalo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 652865fd059 Refactor AbstractJobConfigurationChangedProcessor (#29352)
652865fd059 is described below
commit 652865fd0593de57d0af4e6639e4fde4b3eebcc0
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Dec 11 03:20:20 2023 +0800
Refactor AbstractJobConfigurationChangedProcessor (#29352)
---
.../core/job/AbstractInseparablePipelineJob.java | 22 +++++++++++-----------
.../AbstractJobConfigurationChangedProcessor.java | 4 ++--
.../shardingsphere/data/pipeline/cdc/CDCJob.java | 11 ++---------
...tencyCheckJobConfigurationChangedProcessor.java | 2 +-
.../MigrationJobConfigurationChangedProcessor.java | 2 +-
5 files changed, 17 insertions(+), 24 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index 09af3e3d7a5..64c5f3b12bf 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -22,6 +22,8 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
@@ -121,15 +123,7 @@ public abstract class AbstractInseparablePipelineJob<T
extends PipelineJobItemCo
if (futures.isEmpty()) {
return;
}
- executeInventoryTasks(futures, jobItemContexts);
- }
-
- protected abstract void
executeInventoryTasks(Collection<CompletableFuture<?>> futures, Collection<T>
jobItemContexts);
-
- private void updateJobItemStatus(final T jobItemContext, final
PipelineJobType jobType, final JobStatus jobStatus) {
- jobItemContext.setStatus(jobStatus);
- PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager =
new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
- jobItemManager.updateStatus(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), jobStatus);
+ ExecuteEngine.trigger(futures, buildExecuteCallback("inventory",
jobItemContexts.iterator().next()));
}
private void executeIncrementalTasks(final PipelineJobType jobType, final
Collection<T> jobItemContexts) {
@@ -147,8 +141,14 @@ public abstract class AbstractInseparablePipelineJob<T
extends PipelineJobItemCo
futures.addAll(task.start());
}
}
- executeIncrementalTasks(futures, jobItemContexts);
+ ExecuteEngine.trigger(futures, buildExecuteCallback("incremental",
jobItemContexts.iterator().next()));
+ }
+
+ private void updateJobItemStatus(final T jobItemContext, final
PipelineJobType jobType, final JobStatus jobStatus) {
+ jobItemContext.setStatus(jobStatus);
+ PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager =
new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
+ jobItemManager.updateStatus(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), jobStatus);
}
- protected abstract void
executeIncrementalTasks(Collection<CompletableFuture<?>> futures, Collection<T>
jobItemContexts);
+ protected abstract ExecuteCallback buildExecuteCallback(String identifier,
T jobItemContext);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
index 38c7b9cfad1..a59a9af680d 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
@@ -79,15 +79,15 @@ public abstract class
AbstractJobConfigurationChangedProcessor implements JobCon
protected abstract void onDeleted(JobConfiguration jobConfig);
protected void executeJob(final JobConfiguration jobConfig) {
+ PipelineJob job = buildJob();
String jobId = jobConfig.getJobName();
- PipelineJob job = buildPipelineJob(jobId);
PipelineJobRegistry.add(jobId, job);
OneOffJobBootstrap oneOffJobBootstrap = new
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)),
job, jobConfig);
job.getJobRunnerManager().setJobBootstrap(oneOffJobBootstrap);
oneOffJobBootstrap.execute();
}
- protected abstract PipelineJob buildPipelineJob(String jobId);
+ protected abstract PipelineJob buildJob();
protected abstract PipelineJobType getJobType();
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index eb649068faf..a9aa32f48ef 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -36,7 +36,6 @@ import
org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
import
org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
-import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
@@ -64,7 +63,6 @@ import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/**
@@ -145,13 +143,8 @@ public final class CDCJob extends
AbstractInseparablePipelineJob<CDCJobItemConte
}
@Override
- protected void executeInventoryTasks(final
Collection<CompletableFuture<?>> futures, final Collection<CDCJobItemContext>
jobItemContexts) {
- ExecuteEngine.trigger(futures, new CDCExecuteCallback("inventory",
jobItemContexts.iterator().next()));
- }
-
- @Override
- protected void executeIncrementalTasks(final
Collection<CompletableFuture<?>> futures, final Collection<CDCJobItemContext>
jobItemContexts) {
- ExecuteEngine.trigger(futures, new CDCExecuteCallback("incremental",
jobItemContexts.iterator().next()));
+ protected ExecuteCallback buildExecuteCallback(final String identifier,
final CDCJobItemContext jobItemContext) {
+ return new CDCExecuteCallback(identifier, jobItemContext);
}
@RequiredArgsConstructor
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
index e57d0066a76..55976be8446 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
@@ -36,7 +36,7 @@ public final class
ConsistencyCheckJobConfigurationChangedProcessor extends Abst
}
@Override
- protected PipelineJob buildPipelineJob(final String jobId) {
+ protected PipelineJob buildJob() {
return new ConsistencyCheckJob();
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
index f9e57077ac5..18c3adc8666 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
@@ -39,7 +39,7 @@ public final class MigrationJobConfigurationChangedProcessor
extends AbstractJob
}
@Override
- protected PipelineJob buildPipelineJob(final String jobId) {
+ protected PipelineJob buildJob() {
return new MigrationJob();
}