This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 42e2df4f924 Revise RuleAlteredJobScheduler (#20034)
42e2df4f924 is described below

commit 42e2df4f9246b68b404c29e00dd0bac40c51d97f
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Aug 10 01:13:08 2022 +0800

    Revise RuleAlteredJobScheduler (#20034)
---
 .../scenario/rulealtered/RuleAlteredJob.java       |  3 +-
 .../rulealtered/RuleAlteredJobScheduler.java       | 64 +++++++---------------
 2 files changed, 21 insertions(+), 46 deletions(-)

diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
index 600533f1041..3d1e54bf990 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
@@ -63,7 +63,8 @@ public final class RuleAlteredJob extends AbstractPipelineJob 
implements SimpleJ
             return;
         }
         log.info("start RuleAlteredJobScheduler, jobId={}, shardingItem={}", 
getJobId(), shardingItem);
-        RuleAlteredJobScheduler jobScheduler = new 
RuleAlteredJobScheduler(jobContext, jobContext.getInventoryTasks(), 
jobContext.getIncrementalTasks());
+        RuleAlteredJobScheduler jobScheduler = new 
RuleAlteredJobScheduler(jobContext, jobContext.getInventoryTasks(), 
jobContext.getIncrementalTasks(),
+                
jobContext.getJobProcessContext().getInventoryDumperExecuteEngine(), 
jobContext.getJobProcessContext().getIncrementalDumperExecuteEngine());
         runInBackground(() -> {
             prepare(jobContext);
             jobScheduler.start();
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
index 9290bb2b9f3..afb74e828ed 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
@@ -19,10 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.concurrent.ConcurrentException;
-import org.apache.commons.lang3.concurrent.LazyInitializer;
 import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
@@ -41,43 +38,30 @@ import java.util.Collection;
  */
 @Slf4j
 @RequiredArgsConstructor
-@Getter
 public final class RuleAlteredJobScheduler implements PipelineTasksRunner {
     
+    @Getter
     private final PipelineJobContext jobContext;
-
+    
     private final Collection<InventoryTask> inventoryTasks;
-
+    
     private final Collection<IncrementalTask> incrementalTasks;
-
-    private final LazyInitializer<ExecuteEngine> 
inventoryDumperExecuteEngineLazyInitializer = new 
LazyInitializer<ExecuteEngine>() {
-
-        @Override
-        protected ExecuteEngine initialize() {
-            return ExecuteEngine.newCachedThreadInstance("Inventory-" + 
jobContext.getJobId());
-        }
-    };
-
-    private final LazyInitializer<ExecuteEngine> 
incrementalDumperExecuteEngineLazyInitializer = new 
LazyInitializer<ExecuteEngine>() {
-        @Override
-        protected ExecuteEngine initialize() {
-            return ExecuteEngine.newCachedThreadInstance("Incremental-" + 
jobContext.getJobId());
-        }
-    };
-
-    /**
-     * Stop all task.
-     */
+    
+    private final ExecuteEngine inventoryDumperExecuteEngine;
+    
+    private final ExecuteEngine incrementalDumperExecuteEngine;
+    
+    @Override
     public void stop() {
         jobContext.setStopping(true);
         log.info("stop, jobId={}, shardingItem={}", jobContext.getJobId(), 
jobContext.getShardingItem());
         // TODO blocking stop
-        for (InventoryTask each : getInventoryTasks()) {
+        for (InventoryTask each : inventoryTasks) {
             log.info("stop inventory task {} - {}", jobContext.getJobId(), 
each.getTaskId());
             each.stop();
             each.close();
         }
-        for (IncrementalTask each : getIncrementalTasks()) {
+        for (IncrementalTask each : incrementalTasks) {
             log.info("stop incremental task {} - {}", jobContext.getJobId(), 
each.getTaskId());
             each.stop();
             each.close();
@@ -101,33 +85,28 @@ public final class RuleAlteredJobScheduler implements 
PipelineTasksRunner {
     }
     
     private synchronized boolean executeInventoryTask() {
-        if 
(PipelineJobProgressDetector.allInventoryTasksFinished(getInventoryTasks())) {
+        if 
(PipelineJobProgressDetector.allInventoryTasksFinished(inventoryTasks)) {
             log.info("All inventory tasks finished.");
             return true;
         }
         log.info("-------------- Start inventory task --------------");
         jobContext.setStatus(JobStatus.EXECUTE_INVENTORY_TASK);
         ExecuteCallback inventoryTaskCallback = createInventoryTaskCallback();
-        for (InventoryTask each : getInventoryTasks()) {
+        for (InventoryTask each : inventoryTasks) {
             if (each.getProgress().getPosition() instanceof FinishedPosition) {
                 continue;
             }
-            getInventoryDumperExecuteEngine().submit(each, 
inventoryTaskCallback);
+            inventoryDumperExecuteEngine.submit(each, inventoryTaskCallback);
         }
         return false;
     }
-
-    @SneakyThrows(ConcurrentException.class)
-    private ExecuteEngine getInventoryDumperExecuteEngine() {
-        return inventoryDumperExecuteEngineLazyInitializer.get();
-    }
-
+    
     private ExecuteCallback createInventoryTaskCallback() {
         return new ExecuteCallback() {
             
             @Override
             public void onSuccess() {
-                if 
(PipelineJobProgressDetector.allInventoryTasksFinished(getInventoryTasks())) {
+                if 
(PipelineJobProgressDetector.allInventoryTasksFinished(inventoryTasks)) {
                     log.info("onSuccess, all inventory tasks finished.");
                     executeIncrementalTask();
                 }
@@ -150,19 +129,14 @@ public final class RuleAlteredJobScheduler implements 
PipelineTasksRunner {
         log.info("-------------- Start incremental task --------------");
         jobContext.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK);
         ExecuteCallback incrementalTaskCallback = 
createIncrementalTaskCallback();
-        for (IncrementalTask each : getIncrementalTasks()) {
+        for (IncrementalTask each : incrementalTasks) {
             if (each.getProgress().getPosition() instanceof FinishedPosition) {
                 continue;
             }
-            getIncrementalDumperExecuteEngine().submit(each, 
incrementalTaskCallback);
+            incrementalDumperExecuteEngine.submit(each, 
incrementalTaskCallback);
         }
     }
-
-    @SneakyThrows(ConcurrentException.class)
-    private ExecuteEngine getIncrementalDumperExecuteEngine() {
-        return incrementalDumperExecuteEngineLazyInitializer.get();
-    }
-
+    
     private ExecuteCallback createIncrementalTaskCallback() {
         return new ExecuteCallback() {
             

Reply via email to