This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 2d81b4f9df9 Refactor AbstractInseparablePipelineJob (#32754)
2d81b4f9df9 is described below
commit 2d81b4f9df907ca34187fce8827d981a164f6fbb
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Aug 31 21:56:31 2024 +0800
Refactor AbstractInseparablePipelineJob (#32754)
---
.../core/job/AbstractInseparablePipelineJob.java | 16 ++++++++++------
1 file changed, 10 insertions(+), 6 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index 9839cb8dc0a..3d3047bcdea 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.job;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
@@ -68,11 +69,12 @@ public abstract class AbstractInseparablePipelineJob<T extends PipelineJobConfig
String jobId = shardingContext.getJobName();
log.info("Execute job {}", jobId);
PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
+ PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
T jobConfig = (T) jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
- TransmissionProcessContext jobProcessContext = jobType.isTransmissionJob() ? createTransmissionProcessContext(jobId) : null;
- Collection<I> jobItemContexts = new LinkedList<>();
PipelineJobItemManager<P> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
- PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId));
+ TransmissionProcessContext jobProcessContext = createTransmissionProcessContext(jobId, jobType, contextKey);
+ PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey);
+ Collection<I> jobItemContexts = new LinkedList<>();
for (int shardingItem = 0; shardingItem < jobConfig.getJobShardingCount(); shardingItem++) {
if (jobRunnerManager.isStopping()) {
log.info("Job is stopping, ignore.");
@@ -96,9 +98,11 @@ public abstract class AbstractInseparablePipelineJob<T extends PipelineJobConfig
executeIncrementalTasks(jobItemContexts, jobItemManager);
}
- private TransmissionProcessContext createTransmissionProcessContext(final String jobId) {
- PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue(
- new PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobId), PipelineJobIdUtils.parseJobType(jobId).getType()));
+ private TransmissionProcessContext createTransmissionProcessContext(final String jobId, final PipelineJobType jobType, final PipelineContextKey contextKey) {
+ if (!jobType.isTransmissionJob()) {
+ return null;
+ }
+ PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue(new PipelineProcessConfigurationPersistService().load(contextKey, jobType.getType()));
return new TransmissionProcessContext(jobId, processConfig);
}