This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 9df009494 [Bug] [Seatunnel-Engine] Fix NPE when scheduling sub plan
fails. (#3909)
9df009494 is described below
commit 9df009494bba65a954106906a56f26250ea06e3d
Author: Guangdong Liu <[email protected]>
AuthorDate: Tue Jan 17 17:34:31 2023 +0800
[Bug] [Seatunnel-Engine] Fix NPE when scheduling sub plan fails. (#3909)
---
.../engine/common/utils/PassiveCompletableFuture.java | 16 +++++++++-------
.../seatunnel/engine/server/dag/physical/SubPlan.java | 2 ++
.../engine/server/scheduler/PipelineBaseScheduler.java | 18 ++++++++++++++----
3 files changed, 25 insertions(+), 11 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java
index 0a6c519d7..ce5e6d182 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java
@@ -27,13 +27,15 @@ public class PassiveCompletableFuture<T> extends
CompletableFuture<T> {
}
public PassiveCompletableFuture(CompletableFuture<T> chainedFuture) {
- chainedFuture.whenComplete((r, t) -> {
- if (t != null) {
- internalCompleteExceptionally(t);
- } else {
- internalComplete(r);
- }
- });
+ if (chainedFuture != null) {
+ chainedFuture.whenComplete((r, t) -> {
+ if (t != null) {
+ internalCompleteExceptionally(t);
+ } else {
+ internalComplete(r);
+ }
+ });
+ }
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 7bfa4eaba..b88224458 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.engine.server.master.JobMaster;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.map.IMap;
+import lombok.Data;
import lombok.NonNull;
import java.util.List;
@@ -39,6 +40,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+@Data
public class SubPlan {
private static final ILogger LOGGER = Logger.getLogger(SubPlan.class);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
index 7fdfe323f..791feb405 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -17,7 +17,9 @@
package org.apache.seatunnel.engine.server.scheduler;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.common.exception.JobException;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
@@ -63,7 +65,7 @@ public class PipelineBaseScheduler implements JobScheduler {
List<CompletableFuture<Void>> collect =
physicalPlan.getPipelineList()
.stream()
- .map(pipeline -> schedulerPipeline(pipeline))
+ .map(this::schedulerPipeline)
.filter(Objects::nonNull).collect(Collectors.toList());
try {
CompletableFuture<Void> voidCompletableFuture =
CompletableFuture.allOf(
@@ -99,6 +101,7 @@ public class PipelineBaseScheduler implements JobScheduler {
deployPipeline(pipeline, slotProfiles);
}, jobMaster.getExecutorService());
} catch (Exception e) {
+ log.error(String.format("scheduler %s error and cancel pipeline.
The error is %s", pipeline.getPipelineFullName(),
ExceptionUtils.getMessage(e)));
pipeline.cancelPipeline();
return null;
}
@@ -135,11 +138,18 @@ public class PipelineBaseScheduler implements
JobScheduler {
oldProfile = ownedSlotProfiles.get(task.getTaskGroupLocation());
}
if (oldProfile == null ||
!resourceManager.slotActiveCheck(oldProfile)) {
- SlotProfile newProfile = applyResourceForTask(task).join();
- log.info(String.format("use new profile: %s to replace not active
profile: %s for task %s", newProfile, oldProfile, task));
+ SlotProfile newProfile;
+ CompletableFuture<SlotProfile> slotProfileCompletableFuture =
applyResourceForTask(task);
+ if (slotProfileCompletableFuture != null) {
+ newProfile = slotProfileCompletableFuture.join();
+ } else {
+ throw new SeaTunnelEngineException(String.format("The task
[%s] state is [%s] and the resource can not be retrieved",
task.getTaskFullName(), task.getExecutionState()));
+ }
+
+ log.info(String.format("use new profile: %s to replace not active
profile: %s for task %s", newProfile, oldProfile, task.getTaskFullName()));
return newProfile;
}
- log.info(String.format("use active old profile: %s for task %s",
oldProfile, task));
+ log.info(String.format("use active old profile: %s for task %s",
oldProfile, task.getTaskFullName()));
task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
return oldProfile;
}