This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 42e2df4f924 Revise RuleAlteredJobScheduler (#20034)
42e2df4f924 is described below
commit 42e2df4f9246b68b404c29e00dd0bac40c51d97f
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Aug 10 01:13:08 2022 +0800
Revise RuleAlteredJobScheduler (#20034)
---
.../scenario/rulealtered/RuleAlteredJob.java | 3 +-
.../rulealtered/RuleAlteredJobScheduler.java | 64 +++++++---------------
2 files changed, 21 insertions(+), 46 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
index 600533f1041..3d1e54bf990 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
@@ -63,7 +63,8 @@ public final class RuleAlteredJob extends AbstractPipelineJob
implements SimpleJ
return;
}
log.info("start RuleAlteredJobScheduler, jobId={}, shardingItem={}",
getJobId(), shardingItem);
- RuleAlteredJobScheduler jobScheduler = new
RuleAlteredJobScheduler(jobContext, jobContext.getInventoryTasks(),
jobContext.getIncrementalTasks());
+ RuleAlteredJobScheduler jobScheduler = new
RuleAlteredJobScheduler(jobContext, jobContext.getInventoryTasks(),
jobContext.getIncrementalTasks(),
+
jobContext.getJobProcessContext().getInventoryDumperExecuteEngine(),
jobContext.getJobProcessContext().getIncrementalDumperExecuteEngine());
runInBackground(() -> {
prepare(jobContext);
jobScheduler.start();
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
index 9290bb2b9f3..afb74e828ed 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
@@ -19,10 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.concurrent.ConcurrentException;
-import org.apache.commons.lang3.concurrent.LazyInitializer;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
@@ -41,43 +38,30 @@ import java.util.Collection;
*/
@Slf4j
@RequiredArgsConstructor
-@Getter
public final class RuleAlteredJobScheduler implements PipelineTasksRunner {
+ @Getter
private final PipelineJobContext jobContext;
-
+
private final Collection<InventoryTask> inventoryTasks;
-
+
private final Collection<IncrementalTask> incrementalTasks;
-
- private final LazyInitializer<ExecuteEngine>
inventoryDumperExecuteEngineLazyInitializer = new
LazyInitializer<ExecuteEngine>() {
-
- @Override
- protected ExecuteEngine initialize() {
- return ExecuteEngine.newCachedThreadInstance("Inventory-" +
jobContext.getJobId());
- }
- };
-
- private final LazyInitializer<ExecuteEngine>
incrementalDumperExecuteEngineLazyInitializer = new
LazyInitializer<ExecuteEngine>() {
- @Override
- protected ExecuteEngine initialize() {
- return ExecuteEngine.newCachedThreadInstance("Incremental-" +
jobContext.getJobId());
- }
- };
-
- /**
- * Stop all task.
- */
+
+ private final ExecuteEngine inventoryDumperExecuteEngine;
+
+ private final ExecuteEngine incrementalDumperExecuteEngine;
+
+ @Override
public void stop() {
jobContext.setStopping(true);
log.info("stop, jobId={}, shardingItem={}", jobContext.getJobId(),
jobContext.getShardingItem());
// TODO blocking stop
- for (InventoryTask each : getInventoryTasks()) {
+ for (InventoryTask each : inventoryTasks) {
log.info("stop inventory task {} - {}", jobContext.getJobId(),
each.getTaskId());
each.stop();
each.close();
}
- for (IncrementalTask each : getIncrementalTasks()) {
+ for (IncrementalTask each : incrementalTasks) {
log.info("stop incremental task {} - {}", jobContext.getJobId(),
each.getTaskId());
each.stop();
each.close();
@@ -101,33 +85,28 @@ public final class RuleAlteredJobScheduler implements
PipelineTasksRunner {
}
private synchronized boolean executeInventoryTask() {
- if
(PipelineJobProgressDetector.allInventoryTasksFinished(getInventoryTasks())) {
+ if
(PipelineJobProgressDetector.allInventoryTasksFinished(inventoryTasks)) {
log.info("All inventory tasks finished.");
return true;
}
log.info("-------------- Start inventory task --------------");
jobContext.setStatus(JobStatus.EXECUTE_INVENTORY_TASK);
ExecuteCallback inventoryTaskCallback = createInventoryTaskCallback();
- for (InventoryTask each : getInventoryTasks()) {
+ for (InventoryTask each : inventoryTasks) {
if (each.getProgress().getPosition() instanceof FinishedPosition) {
continue;
}
- getInventoryDumperExecuteEngine().submit(each,
inventoryTaskCallback);
+ inventoryDumperExecuteEngine.submit(each, inventoryTaskCallback);
}
return false;
}
-
- @SneakyThrows(ConcurrentException.class)
- private ExecuteEngine getInventoryDumperExecuteEngine() {
- return inventoryDumperExecuteEngineLazyInitializer.get();
- }
-
+
private ExecuteCallback createInventoryTaskCallback() {
return new ExecuteCallback() {
@Override
public void onSuccess() {
- if
(PipelineJobProgressDetector.allInventoryTasksFinished(getInventoryTasks())) {
+ if
(PipelineJobProgressDetector.allInventoryTasksFinished(inventoryTasks)) {
log.info("onSuccess, all inventory tasks finished.");
executeIncrementalTask();
}
@@ -150,19 +129,14 @@ public final class RuleAlteredJobScheduler implements
PipelineTasksRunner {
log.info("-------------- Start incremental task --------------");
jobContext.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK);
ExecuteCallback incrementalTaskCallback =
createIncrementalTaskCallback();
- for (IncrementalTask each : getIncrementalTasks()) {
+ for (IncrementalTask each : incrementalTasks) {
if (each.getProgress().getPosition() instanceof FinishedPosition) {
continue;
}
- getIncrementalDumperExecuteEngine().submit(each,
incrementalTaskCallback);
+ incrementalDumperExecuteEngine.submit(each,
incrementalTaskCallback);
}
}
-
- @SneakyThrows(ConcurrentException.class)
- private ExecuteEngine getIncrementalDumperExecuteEngine() {
- return incrementalDumperExecuteEngineLazyInitializer.get();
- }
-
+
private ExecuteCallback createIncrementalTaskCallback() {
return new ExecuteCallback() {