This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0046e7cd839 Refactor AbstractSeparablePipelineJob (#32752)
0046e7cd839 is described below
commit 0046e7cd8399094b0bb74530c3fc07c72b95ac76
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Aug 31 21:45:32 2024 +0800
Refactor AbstractSeparablePipelineJob (#32752)
* Refactor AbstractSeparablePipelineJob
* Refactor AbstractSeparablePipelineJob
---
.../core/job/AbstractSeparablePipelineJob.java | 22 ++++++++++++++--------
1 file changed, 14 insertions(+), 8 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index ae16f08672a..63fac5ef828 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.job;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
@@ -34,6 +35,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfi
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
+import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
 import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
@@ -67,13 +69,15 @@ public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfigur
         }
         PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
         PipelineJobConfigurationManager jobConfigManager = new PipelineJobConfigurationManager(jobType);
+        PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
         T jobConfig = jobConfigManager.getJobConfiguration(jobId);
-        TransmissionProcessContext jobProcessContext = jobType.isTransmissionJob() ? createTransmissionProcessContext(jobId) : null;
+        TransmissionProcessContext jobProcessContext = createTransmissionProcessContext(jobId, jobType, contextKey);
         PipelineJobItemManager<P> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
         P jobItemProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem).orElse(null);
+        PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey);
         boolean started = false;
         try {
-            started = execute(buildJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext));
+            started = execute(buildJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext), governanceFacade);
             if (started) {
                 PipelineJobProgressPersistService.persistNow(jobId, shardingItem);
             }
@@ -82,7 +86,7 @@ public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfigur
             // CHECKSTYLE:ON
             if (!jobRunnerManager.isStopping()) {
                 log.error("Job {}-{} execution failed.", jobId, shardingItem, ex);
-                PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, shardingItem, ex);
+                governanceFacade.getJobItemFacade().getErrorMessage().update(jobId, shardingItem, ex);
                 throw ex;
             }
         } finally {
@@ -92,23 +96,25 @@ public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfigur
         }
     }
-    private boolean execute(final I jobItemContext) {
+    private boolean execute(final I jobItemContext, final PipelineGovernanceFacade governanceFacade) {
         int shardingItem = jobItemContext.getShardingItem();
         PipelineTasksRunner tasksRunner = buildTasksRunner(jobItemContext);
         if (!jobRunnerManager.addTasksRunner(shardingItem, tasksRunner)) {
             return false;
         }
         String jobId = jobItemContext.getJobId();
-        PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId, shardingItem);
+        governanceFacade.getJobItemFacade().getErrorMessage().clean(jobId, shardingItem);
         prepare(jobItemContext);
         log.info("Start tasks runner, jobId={}, shardingItem={}.", jobId, shardingItem);
         tasksRunner.start();
         return true;
     }
-    private TransmissionProcessContext createTransmissionProcessContext(final String jobId) {
-        PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue(
-                new PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobId), PipelineJobIdUtils.parseJobType(jobId).getType()));
+    private TransmissionProcessContext createTransmissionProcessContext(final String jobId, final PipelineJobType jobType, final PipelineContextKey contextKey) {
+        if (!jobType.isTransmissionJob()) {
+            return null;
+        }
+        PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue(new PipelineProcessConfigurationPersistService().load(contextKey, jobType.getType()));
         return new TransmissionProcessContext(jobId, processConfig);
     }