This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f5e90f7c564 Decouple RuleAlteredJobScheduler and RuleAlteredJobContext
for common usage (#19971)
f5e90f7c564 is described below
commit f5e90f7c56488bd5fd592ad57ca28f868bfe4daa
Author: Da Xiang Huang <[email protected]>
AuthorDate: Tue Aug 9 20:47:31 2022 +0800
Decouple RuleAlteredJobScheduler and RuleAlteredJobContext for common usage (#19971)
---
.../pipeline/api/context/PipelineJobContext.java | 12 +++++
.../scenario/rulealtered/RuleAlteredJob.java | 2 +-
.../rulealtered/RuleAlteredJobScheduler.java | 60 +++++++++++++++++-----
3 files changed, 61 insertions(+), 13 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineJobContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineJobContext.java
index b2d92bccf0b..b4b621eeeeb 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineJobContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineJobContext.java
@@ -66,4 +66,16 @@ public interface PipelineJobContext {
* @return job process context
*/
PipelineProcessContext getJobProcessContext();
+
+ /**
+ * Set stopping.
+ * @param stopping stopping
+ */
+ void setStopping(boolean stopping);
+
+ /**
+ * Get stopping.
+ * @return stopping
+ */
+ boolean isStopping();
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
index 236a3f17881..600533f1041 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
@@ -63,7 +63,7 @@ public final class RuleAlteredJob extends AbstractPipelineJob
implements SimpleJ
return;
}
log.info("start RuleAlteredJobScheduler, jobId={}, shardingItem={}",
getJobId(), shardingItem);
- RuleAlteredJobScheduler jobScheduler = new RuleAlteredJobScheduler(jobContext);
+ RuleAlteredJobScheduler jobScheduler = new RuleAlteredJobScheduler(jobContext, jobContext.getInventoryTasks(), jobContext.getIncrementalTasks());
runInBackground(() -> {
prepare(jobContext);
jobScheduler.start();
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
index 7ae9c65e99b..9290bb2b9f3 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
@@ -19,16 +19,23 @@ package
org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.commons.lang3.concurrent.LazyInitializer;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobContext;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import java.util.Collection;
+
/**
* Rule altered job scheduler.
*/
@@ -37,8 +44,27 @@ import
org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@Getter
public final class RuleAlteredJobScheduler implements PipelineTasksRunner {
- private final RuleAlteredJobContext jobContext;
-
+ private final PipelineJobContext jobContext;
+
+ private final Collection<InventoryTask> inventoryTasks;
+
+ private final Collection<IncrementalTask> incrementalTasks;
+
+ private final LazyInitializer<ExecuteEngine> inventoryDumperExecuteEngineLazyInitializer = new LazyInitializer<ExecuteEngine>() {
+
+ @Override
+ protected ExecuteEngine initialize() {
+ return ExecuteEngine.newCachedThreadInstance("Inventory-" +
jobContext.getJobId());
+ }
+ };
+
+ private final LazyInitializer<ExecuteEngine> incrementalDumperExecuteEngineLazyInitializer = new LazyInitializer<ExecuteEngine>() {
+ @Override
+ protected ExecuteEngine initialize() {
+ return ExecuteEngine.newCachedThreadInstance("Incremental-" +
jobContext.getJobId());
+ }
+ };
+
/**
* Stop all task.
*/
@@ -46,12 +72,12 @@ public final class RuleAlteredJobScheduler implements
PipelineTasksRunner {
jobContext.setStopping(true);
log.info("stop, jobId={}, shardingItem={}", jobContext.getJobId(),
jobContext.getShardingItem());
// TODO blocking stop
- for (InventoryTask each : jobContext.getInventoryTasks()) {
+ for (InventoryTask each : getInventoryTasks()) {
log.info("stop inventory task {} - {}", jobContext.getJobId(),
each.getTaskId());
each.stop();
each.close();
}
- for (IncrementalTask each : jobContext.getIncrementalTasks()) {
+ for (IncrementalTask each : getIncrementalTasks()) {
log.info("stop incremental task {} - {}", jobContext.getJobId(),
each.getTaskId());
each.stop();
each.close();
@@ -75,28 +101,33 @@ public final class RuleAlteredJobScheduler implements
PipelineTasksRunner {
}
private synchronized boolean executeInventoryTask() {
- if (PipelineJobProgressDetector.allInventoryTasksFinished(jobContext.getInventoryTasks())) {
+ if (PipelineJobProgressDetector.allInventoryTasksFinished(getInventoryTasks())) {
log.info("All inventory tasks finished.");
return true;
}
log.info("-------------- Start inventory task --------------");
jobContext.setStatus(JobStatus.EXECUTE_INVENTORY_TASK);
ExecuteCallback inventoryTaskCallback = createInventoryTaskCallback();
- for (InventoryTask each : jobContext.getInventoryTasks()) {
+ for (InventoryTask each : getInventoryTasks()) {
if (each.getProgress().getPosition() instanceof FinishedPosition) {
continue;
}
- jobContext.getJobProcessContext().getInventoryDumperExecuteEngine().submit(each, inventoryTaskCallback);
+ getInventoryDumperExecuteEngine().submit(each, inventoryTaskCallback);
}
return false;
}
-
+
+ @SneakyThrows(ConcurrentException.class)
+ private ExecuteEngine getInventoryDumperExecuteEngine() {
+ return inventoryDumperExecuteEngineLazyInitializer.get();
+ }
+
private ExecuteCallback createInventoryTaskCallback() {
return new ExecuteCallback() {
@Override
public void onSuccess() {
- if (PipelineJobProgressDetector.allInventoryTasksFinished(jobContext.getInventoryTasks())) {
+ if (PipelineJobProgressDetector.allInventoryTasksFinished(getInventoryTasks())) {
log.info("onSuccess, all inventory tasks finished.");
executeIncrementalTask();
}
@@ -119,14 +150,19 @@ public final class RuleAlteredJobScheduler implements
PipelineTasksRunner {
log.info("-------------- Start incremental task --------------");
jobContext.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK);
ExecuteCallback incrementalTaskCallback =
createIncrementalTaskCallback();
- for (IncrementalTask each : jobContext.getIncrementalTasks()) {
+ for (IncrementalTask each : getIncrementalTasks()) {
if (each.getProgress().getPosition() instanceof FinishedPosition) {
continue;
}
- jobContext.getJobProcessContext().getIncrementalDumperExecuteEngine().submit(each, incrementalTaskCallback);
+ getIncrementalDumperExecuteEngine().submit(each, incrementalTaskCallback);
}
}
-
+
+ @SneakyThrows(ConcurrentException.class)
+ private ExecuteEngine getIncrementalDumperExecuteEngine() {
+ return incrementalDumperExecuteEngineLazyInitializer.get();
+ }
+
private ExecuteCallback createIncrementalTaskCallback() {
return new ExecuteCallback() {