This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0046e7cd839 Refactor AbstractSeparablePipelineJob (#32752)
0046e7cd839 is described below

commit 0046e7cd8399094b0bb74530c3fc07c72b95ac76
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Aug 31 21:45:32 2024 +0800

    Refactor AbstractSeparablePipelineJob (#32752)
    
    * Refactor AbstractSeparablePipelineJob
    
    * Refactor AbstractSeparablePipelineJob
---
 .../core/job/AbstractSeparablePipelineJob.java     | 22 ++++++++++++++--------
 1 file changed, 14 insertions(+), 8 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index ae16f08672a..63fac5ef828 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.job;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
@@ -34,6 +35,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfi
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
+import 
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 
@@ -67,13 +69,15 @@ public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfigur
         }
         PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
         PipelineJobConfigurationManager jobConfigManager = new 
PipelineJobConfigurationManager(jobType);
+        PipelineContextKey contextKey = 
PipelineJobIdUtils.parseContextKey(jobId);
         T jobConfig = jobConfigManager.getJobConfiguration(jobId);
-        TransmissionProcessContext jobProcessContext = 
jobType.isTransmissionJob() ? createTransmissionProcessContext(jobId) : null;
+        TransmissionProcessContext jobProcessContext = 
createTransmissionProcessContext(jobId, jobType, contextKey);
         PipelineJobItemManager<P> jobItemManager = new 
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
         P jobItemProgress = 
jobItemManager.getProgress(shardingContext.getJobName(), 
shardingItem).orElse(null);
+        PipelineGovernanceFacade governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey);
         boolean started = false;
         try {
-            started = execute(buildJobItemContext(jobConfig, shardingItem, 
jobItemProgress, jobProcessContext));
+            started = execute(buildJobItemContext(jobConfig, shardingItem, 
jobItemProgress, jobProcessContext), governanceFacade);
             if (started) {
                 PipelineJobProgressPersistService.persistNow(jobId, 
shardingItem);
             }
@@ -82,7 +86,7 @@ public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfigur
             // CHECKSTYLE:ON
             if (!jobRunnerManager.isStopping()) {
                 log.error("Job {}-{} execution failed.", jobId, shardingItem, 
ex);
-                
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId,
 shardingItem, ex);
+                
governanceFacade.getJobItemFacade().getErrorMessage().update(jobId, 
shardingItem, ex);
                 throw ex;
             }
         } finally {
@@ -92,23 +96,25 @@ public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfigur
         }
     }
     
-    private boolean execute(final I jobItemContext) {
+    private boolean execute(final I jobItemContext, final 
PipelineGovernanceFacade governanceFacade) {
         int shardingItem = jobItemContext.getShardingItem();
         PipelineTasksRunner tasksRunner = buildTasksRunner(jobItemContext);
         if (!jobRunnerManager.addTasksRunner(shardingItem, tasksRunner)) {
             return false;
         }
         String jobId = jobItemContext.getJobId();
-        
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId,
 shardingItem);
+        governanceFacade.getJobItemFacade().getErrorMessage().clean(jobId, 
shardingItem);
         prepare(jobItemContext);
         log.info("Start tasks runner, jobId={}, shardingItem={}.", jobId, 
shardingItem);
         tasksRunner.start();
         return true;
     }
     
-    private TransmissionProcessContext createTransmissionProcessContext(final 
String jobId) {
-        PipelineProcessConfiguration processConfig = 
PipelineProcessConfigurationUtils.fillInDefaultValue(
-                new 
PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobId),
 PipelineJobIdUtils.parseJobType(jobId).getType()));
+    private TransmissionProcessContext createTransmissionProcessContext(final 
String jobId, final PipelineJobType jobType, final PipelineContextKey 
contextKey) {
+        if (!jobType.isTransmissionJob()) {
+            return null;
+        }
+        PipelineProcessConfiguration processConfig = 
PipelineProcessConfigurationUtils.fillInDefaultValue(new 
PipelineProcessConfigurationPersistService().load(contextKey, 
jobType.getType()));
         return new TransmissionProcessContext(jobId, processConfig);
     }
     

Reply via email to