This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new 885a37e4f [Feature][ST-Engine] Notice Task State By
TaskExecutionService (#2567)
885a37e4f is described below
commit 885a37e4f312896c1c20211d82a84299464f6212
Author: Eric <[email protected]>
AuthorDate: Mon Sep 5 18:19:21 2022 +0800
[Feature][ST-Engine] Notice Task State By TaskExecutionService (#2567)
* add updateTaskExecutionState method to SeaTunnelServer
* complete notice task state by TaskExecutionService
* delete unused variable
* improve code
* fix review problem
---
.../engine/e2e/engine/JobExecutionIT.java | 21 ++-
.../engine/client/SeaTunnelClientTest.java | 11 +-
.../exception/JobNoEnoughResourceException.java | 4 +-
.../seatunnel/engine/server/SeaTunnelServer.java | 17 ++-
.../engine/server/dag/physical/PhysicalPlan.java | 2 +-
.../server/dag/physical/PhysicalPlanGenerator.java | 35 +++--
.../engine/server/dag/physical/PhysicalVertex.java | 169 +++++++++------------
.../engine/server/dag/physical/SubPlan.java | 22 ++-
.../seatunnel/engine/server/master/JobMaster.java | 31 +++-
.../operation/NotifyTaskStatusOperation.java | 4 +-
.../server/scheduler/PipelineBaseScheduler.java | 2 +-
.../server/task/operation/DeployTaskOperation.java | 15 +-
.../operation/GetTaskGroupAddressOperation.java | 2 +-
.../task/operation/sink/SinkRegisterOperation.java | 2 +-
14 files changed, 184 insertions(+), 153 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
index f67042748..8fb21b51b 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
@@ -84,14 +84,14 @@ public class JobExecutionIT {
try {
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
- CompletableFuture<Object> objectCompletableFuture =
CompletableFuture.supplyAsync(() -> {
- JobStatus jobStatus = clientJobProxy.waitForJobComplete();
- Assert.assertEquals(JobStatus.FINISHED, jobStatus);
- return null;
+ CompletableFuture<JobStatus> objectCompletableFuture =
CompletableFuture.supplyAsync(() -> {
+ return clientJobProxy.waitForJobComplete();
});
await().atMost(20000, TimeUnit.MILLISECONDS)
- .untilAsserted(() ->
Assert.assertTrue(objectCompletableFuture.isDone()));
+ .untilAsserted(() -> Assert.assertTrue(
+ objectCompletableFuture.isDone() &&
JobStatus.FINISHED.equals(objectCompletableFuture.get())));
+
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -115,16 +115,15 @@ public class JobExecutionIT {
JobStatus jobStatus1 = clientJobProxy.getJobStatus();
Assert.assertFalse(jobStatus1.isEndState());
ClientJobProxy finalClientJobProxy = clientJobProxy;
- CompletableFuture<Object> objectCompletableFuture =
CompletableFuture.supplyAsync(() -> {
- JobStatus jobStatus = finalClientJobProxy.waitForJobComplete();
- Assert.assertEquals(JobStatus.CANCELED, jobStatus);
- return null;
+ CompletableFuture<JobStatus> objectCompletableFuture =
CompletableFuture.supplyAsync(() -> {
+ return finalClientJobProxy.waitForJobComplete();
});
Thread.sleep(1000);
clientJobProxy.cancelJob();
- await().atMost(10000, TimeUnit.MILLISECONDS)
- .untilAsserted(() ->
Assert.assertTrue(objectCompletableFuture.isDone()));
+ await().atMost(20000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assert.assertTrue(
+ objectCompletableFuture.isDone() &&
JobStatus.CANCELED.equals(objectCompletableFuture.get())));
} catch (Exception e) {
throw new RuntimeException(e);
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 540ff2074..f3a8d4445 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -83,14 +83,13 @@ public class SeaTunnelClientTest {
try {
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
- CompletableFuture<Object> objectCompletableFuture =
CompletableFuture.supplyAsync(() -> {
- JobStatus jobStatus = clientJobProxy.waitForJobComplete();
- Assert.assertEquals(JobStatus.FINISHED, jobStatus);
- return null;
+ CompletableFuture<JobStatus> objectCompletableFuture =
CompletableFuture.supplyAsync(() -> {
+ return clientJobProxy.waitForJobComplete();
});
- await().atMost(10000, TimeUnit.MILLISECONDS)
- .untilAsserted(() ->
Assert.assertTrue(objectCompletableFuture.isDone()));
+ await().atMost(20000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assert.assertTrue(
+ objectCompletableFuture.isDone() &&
JobStatus.FINISHED.equals(objectCompletableFuture.get())));
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNoEnoughResourceException.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNoEnoughResourceException.java
index 9a0b89565..81653542b 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNoEnoughResourceException.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNoEnoughResourceException.java
@@ -18,8 +18,8 @@
package org.apache.seatunnel.engine.common.exception;
public class JobNoEnoughResourceException extends SeaTunnelEngineException {
- public JobNoEnoughResourceException(String jobName, long jobId, int
pipelineIndex, int totalPipelineNum) {
- super(String.format("Job %s (%s), Pipeline [(%s/%s)] have no enough
resource.", jobName, jobId, pipelineIndex + 1, totalPipelineNum));
+ public JobNoEnoughResourceException(String jobName, long jobId, int
pipelineId, int totalPipelineNum) {
+ super(String.format("Job %s (%s), Pipeline [(%s/%s)] have no enough
resource.", jobName, jobId, pipelineId + 1, totalPipelineNum));
}
public JobNoEnoughResourceException(String message) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 7c912f304..56c006206 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -19,8 +19,11 @@ package org.apache.seatunnel.engine.server;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.exception.JobException;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.master.JobMaster;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import
org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory;
@@ -225,7 +228,7 @@ public class SeaTunnelServer implements ManagedService,
MembershipAwareService,
return new
PassiveCompletableFuture<>(CompletableFuture.supplyAsync(() -> {
runningJobMaster.cancelJob();
return null;
- }));
+ }, executorService));
}
}
@@ -237,4 +240,16 @@ public class SeaTunnelServer implements ManagedService,
MembershipAwareService,
}
return runningJobMaster.getJobStatus();
}
+
+ /**
+ * When TaskGroup ends, it is called by {@link TaskExecutionService} to
notify JobMaster the TaskGroup's state.
+ */
+ public void updateTaskExecutionState(TaskExecutionState
taskExecutionState) {
+ TaskGroupLocation taskGroupLocation =
taskExecutionState.getTaskGroupLocation();
+ JobMaster runningJobMaster =
runningJobMasterMap.get(taskGroupLocation.getJobId());
+ if (runningJobMaster == null) {
+ throw new JobException(String.format("Job %s not running",
taskGroupLocation.getJobId()));
+ }
+ runningJobMaster.updateTaskExecutionState(taskExecutionState);
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index 6fd0fe132..e55b0da65 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -138,7 +138,7 @@ public class PhysicalPlan {
CompletableFuture<Void> future =
CompletableFuture.supplyAsync(() -> {
pipeline.cancelPipeline();
return null;
- });
+ }, executorService);
return future;
}
return null;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index df2b11f53..50fa26b7f 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -169,7 +169,8 @@ public class PhysicalPlanGenerator {
initializationTimestamp,
physicalVertexList,
coordinatorVertexList,
- jobImmutableInformation);
+ jobImmutableInformation,
+ executorService);
});
PhysicalPlan physicalPlan = new
PhysicalPlan(subPlanStream.collect(Collectors.toList()),
@@ -204,7 +205,8 @@ public class PhysicalPlanGenerator {
if (sinkAggregatedCommitter.isPresent()) {
long taskGroupID = idGenerator.getNextId();
long taskTypeId = idGenerator.getNextId();
- TaskGroupLocation taskGroupLocation = new
TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex,
taskGroupID);
+ TaskGroupLocation taskGroupLocation =
+ new
TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex,
taskGroupID);
TaskLocation taskLocation = new
TaskLocation(taskGroupLocation, taskTypeId, 0);
SinkAggregatedCommitterTask<?> t =
new
SinkAggregatedCommitterTask(jobImmutableInformation.getJobId(), taskLocation, s,
@@ -214,8 +216,7 @@ public class PhysicalPlanGenerator {
// checkpoint
pipelineTasks.add(taskLocation);
- return new PhysicalVertex(idGenerator.getNextId(),
- atomicInteger.incrementAndGet(),
+ return new PhysicalVertex(atomicInteger.incrementAndGet(),
executorService,
collect.size(),
new TaskGroupDefaultImpl(taskGroupLocation,
s.getName() + "-AggregatedCommitterTask",
@@ -245,13 +246,15 @@ public class PhysicalPlanGenerator {
long taskGroupIDPrefix = idGenerator.getNextId();
for (int i = 0; i < flow.getAction().getParallelism(); i++) {
long taskGroupID = mixIDPrefixAndIndex(taskGroupIDPrefix,
i);
- TaskGroupLocation taskGroupLocation = new
TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex,
taskGroupID);
+ TaskGroupLocation taskGroupLocation =
+ new
TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex,
taskGroupID);
TaskLocation taskLocation = new
TaskLocation(taskGroupLocation, taskIDPrefix, i);
setFlowConfig(flow, i);
- SeaTunnelTask seaTunnelTask = new
TransformSeaTunnelTask(jobImmutableInformation.getJobId(), taskLocation, i,
flow);
+ SeaTunnelTask seaTunnelTask =
+ new
TransformSeaTunnelTask(jobImmutableInformation.getJobId(), taskLocation, i,
flow);
// checkpoint
pipelineTasks.add(taskLocation);
- t.add(new PhysicalVertex(idGenerator.getNextId(),
+ t.add(new PhysicalVertex(
i,
executorService,
flow.getAction().getParallelism(),
@@ -278,15 +281,17 @@ public class PhysicalPlanGenerator {
return sources.stream().map(s -> {
long taskGroupID = idGenerator.getNextId();
long taskTypeId = idGenerator.getNextId();
- TaskGroupLocation taskGroupLocation = new
TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex,
taskGroupID);
+ TaskGroupLocation taskGroupLocation =
+ new TaskGroupLocation(jobImmutableInformation.getJobId(),
pipelineIndex, taskGroupID);
TaskLocation taskLocation = new TaskLocation(taskGroupLocation,
taskTypeId, 0);
- SourceSplitEnumeratorTask<?> t = new
SourceSplitEnumeratorTask<>(jobImmutableInformation.getJobId(), taskLocation,
s);
+ SourceSplitEnumeratorTask<?> t =
+ new
SourceSplitEnumeratorTask<>(jobImmutableInformation.getJobId(), taskLocation,
s);
// checkpoint
pipelineTasks.add(taskLocation);
startingTasks.add(taskLocation);
enumeratorTaskIDMap.put(s, taskLocation);
- return new PhysicalVertex(idGenerator.getNextId(),
+ return new PhysicalVertex(
atomicInteger.incrementAndGet(),
executorService,
sources.size(),
@@ -319,13 +324,15 @@ public class PhysicalPlanGenerator {
for (int i = 0; i < flow.getAction().getParallelism(); i++) {
int finalParallelismIndex = i;
long taskGroupID = mixIDPrefixAndIndex(taskGroupIDPrefix,
i);
- TaskGroupLocation taskGroupLocation = new
TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex,
taskGroupID);
+ TaskGroupLocation taskGroupLocation =
+ new
TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex,
taskGroupID);
List<SeaTunnelTask> taskList =
flows.stream().map(f -> {
setFlowConfig(f, finalParallelismIndex);
long taskIDPrefix =
flowTaskIDPrefixMap.computeIfAbsent(f.getFlowID(), id ->
idGenerator.getNextId());
- final TaskLocation taskLocation = new
TaskLocation(taskGroupLocation, taskIDPrefix, finalParallelismIndex);
+ final TaskLocation taskLocation =
+ new TaskLocation(taskGroupLocation,
taskIDPrefix, finalParallelismIndex);
// checkpoint
pipelineTasks.add(taskLocation);
if (f instanceof PhysicalExecutionFlow) {
@@ -343,7 +350,7 @@ public class PhysicalPlanGenerator {
if
(taskList.stream().anyMatch(TransformSeaTunnelTask.class::isInstance)) {
// contains IntermediateExecutionFlow in task group
- t.add(new PhysicalVertex(idGenerator.getNextId(),
+ t.add(new PhysicalVertex(
i,
executorService,
flow.getAction().getParallelism(),
@@ -358,7 +365,7 @@ public class PhysicalPlanGenerator {
initializationTimestamp,
nodeEngine));
} else {
- t.add(new PhysicalVertex(idGenerator.getNextId(),
+ t.add(new PhysicalVertex(
i,
executorService,
flow.getAction().getParallelism(),
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 1e8dc0ecf..399c4850d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -45,7 +45,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
+import java.util.function.Consumer;
/**
* PhysicalVertex is responsible for the scheduling and execution of a single
task parallel
@@ -56,8 +56,6 @@ public class PhysicalVertex {
private static final ILogger LOGGER =
Logger.getLogger(PhysicalVertex.class);
- private final long physicalVertexId;
-
private final TaskGroupLocation taskGroupLocation;
/**
@@ -75,7 +73,7 @@ public class PhysicalVertex {
private final FlakeIdGenerator flakeIdGenerator;
- private final int pipelineIndex;
+ private final int pipelineId;
private final int totalPipelineNum;
@@ -105,25 +103,23 @@ public class PhysicalVertex {
private Address currentExecutionAddress;
- public PhysicalVertex(long physicalVertexId,
- int subTaskGroupIndex,
+ public PhysicalVertex(int subTaskGroupIndex,
@NonNull ExecutorService executorService,
int parallelism,
@NonNull TaskGroupDefaultImpl taskGroup,
@NonNull FlakeIdGenerator flakeIdGenerator,
- int pipelineIndex,
+ int pipelineId,
int totalPipelineNum,
Set<URL> pluginJarsUrls,
@NonNull JobImmutableInformation
jobImmutableInformation,
long initializationTimestamp,
@NonNull NodeEngine nodeEngine) {
- this.physicalVertexId = physicalVertexId;
this.subTaskGroupIndex = subTaskGroupIndex;
this.executorService = executorService;
this.parallelism = parallelism;
this.taskGroup = taskGroup;
this.flakeIdGenerator = flakeIdGenerator;
- this.pipelineIndex = pipelineIndex;
+ this.pipelineId = pipelineId;
this.totalPipelineNum = totalPipelineNum;
this.pluginJarsUrls = pluginJarsUrls;
this.jobImmutableInformation = jobImmutableInformation;
@@ -138,7 +134,7 @@ public class PhysicalVertex {
"Job %s (%s), Pipeline: [(%d/%d)], task: [%s (%d/%d)]",
jobImmutableInformation.getJobConfig().getName(),
jobImmutableInformation.getJobId(),
- pipelineIndex,
+ pipelineId,
totalPipelineNum,
taskGroup.getTaskGroupName(),
subTaskGroupIndex + 1,
@@ -154,65 +150,67 @@ public class PhysicalVertex {
private void deployOnLocal(@NonNull SlotProfile slotProfile) {
deployInternal(taskGroupImmutableInformation -> {
SeaTunnelServer server =
nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
- return new
PassiveCompletableFuture<>(server.getSlotService().getSlotContext(slotProfile)
-
.getTaskExecutionService().deployTask(taskGroupImmutableInformation));
+ server.getSlotService().getSlotContext(slotProfile)
+
.getTaskExecutionService().deployTask(taskGroupImmutableInformation);
});
}
private void deployOnRemote(@NonNull SlotProfile slotProfile) {
- deployInternal(taskGroupImmutableInformation -> new
PassiveCompletableFuture<>(
-
nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
- new DeployTaskOperation(slotProfile,
-
nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
- slotProfile.getWorker())
- .invoke()));
+ deployInternal(taskGroupImmutableInformation -> {
+ try {
+
nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
+ new DeployTaskOperation(slotProfile,
+
nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
+ slotProfile.getWorker())
+ .invoke().get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
}
@SuppressWarnings("checkstyle:MagicNumber")
// This method must not throw an exception
public void deploy(@NonNull SlotProfile slotProfile) {
- currentExecutionAddress = slotProfile.getWorker();
- if (slotProfile.getWorker().equals(nodeEngine.getThisAddress())) {
- deployOnLocal(slotProfile);
- } else {
- deployOnRemote(slotProfile);
+ try {
+ currentExecutionAddress = slotProfile.getWorker();
+ if (slotProfile.getWorker().equals(nodeEngine.getThisAddress())) {
+ deployOnLocal(slotProfile);
+ } else {
+ deployOnRemote(slotProfile);
+ }
+ } catch (Throwable th) {
+ failedByException(th);
}
}
- private void deployInternal(Function<TaskGroupImmutableInformation,
PassiveCompletableFuture<TaskExecutionState>> deployMethod) {
+ private void deployInternal(Consumer<TaskGroupImmutableInformation>
taskGroupConsumer) {
TaskGroupImmutableInformation taskGroupImmutableInformation =
getTaskGroupImmutableInformation();
- PassiveCompletableFuture<TaskExecutionState> completeFuture;
- try {
- if (ExecutionState.DEPLOYING.equals(executionState.get())) {
- completeFuture =
deployMethod.apply(taskGroupImmutableInformation);
- // may be canceling
- if (!updateTaskState(ExecutionState.DEPLOYING,
ExecutionState.RUNNING)) {
- // If we found the task state turned to CANCELING after
deployed to TaskExecutionService. We need
- // notice the TaskExecutionService to cancel this task.
- noticeTaskExecutionServiceCancel();
- if
(ExecutionState.CANCELING.equals(this.getExecutionState().get())) {
- turnToEndState(ExecutionState.CANCELED);
- taskFuture.complete(
- new TaskExecutionState(this.taskGroupLocation,
ExecutionState.CANCELED, null));
- } else {
- turnToEndState(ExecutionState.FAILED);
- taskFuture.complete(new
TaskExecutionState(this.taskGroupLocation, ExecutionState.FAILED,
- new JobException(String.format("%s turn to a
unexpected state: %s, make it Failed",
- this.getTaskFullName(),
executionState.get()))));
- }
+ if (ExecutionState.DEPLOYING.equals(executionState.get())) {
+ taskGroupConsumer.accept(taskGroupImmutableInformation);
+ // may be canceling
+ if (!updateTaskState(ExecutionState.DEPLOYING,
ExecutionState.RUNNING)) {
+ // If we found the task state turned to CANCELING after
deployed to TaskExecutionService. We need
+ // notice the TaskExecutionService to cancel this task.
+ noticeTaskExecutionServiceCancel();
+ if
(ExecutionState.CANCELING.equals(this.getExecutionState().get())) {
+ turnToEndState(ExecutionState.CANCELED);
+ taskFuture.complete(
+ new TaskExecutionState(this.taskGroupLocation,
ExecutionState.CANCELED, null));
+ } else {
+ turnToEndState(ExecutionState.FAILED);
+ taskFuture.complete(new
TaskExecutionState(this.taskGroupLocation, ExecutionState.FAILED,
+ new JobException(String.format("%s turn to a
unexpected state: %s, make it Failed",
+ this.getTaskFullName(), executionState.get()))));
}
- monitorTask(completeFuture);
- } else if
(ExecutionState.CANCELING.equals(this.getExecutionState().get())) {
- turnToEndState(ExecutionState.CANCELED);
- taskFuture.complete(new
TaskExecutionState(this.taskGroupLocation, executionState.get(), null));
- } else {
- turnToEndState(ExecutionState.FAILED);
- taskFuture.complete(new
TaskExecutionState(this.taskGroupLocation, executionState.get(),
- new JobException(String.format("%s turn to a unexpected
state", getTaskFullName()))));
}
-
- } catch (Throwable th) {
- failedByException(th);
+ } else if
(ExecutionState.CANCELING.equals(this.getExecutionState().get())) {
+ turnToEndState(ExecutionState.CANCELED);
+ taskFuture.complete(new TaskExecutionState(this.taskGroupLocation,
executionState.get(), null));
+ } else {
+ turnToEndState(ExecutionState.FAILED);
+ taskFuture.complete(new TaskExecutionState(this.taskGroupLocation,
executionState.get(),
+ new JobException(String.format("%s turn to a unexpected
state", getTaskFullName()))));
}
}
@@ -227,48 +225,8 @@ public class PhysicalVertex {
private TaskGroupImmutableInformation getTaskGroupImmutableInformation() {
return new TaskGroupImmutableInformation(flakeIdGenerator.newId(),
- nodeEngine.getSerializationService().toData(this.taskGroup),
- this.pluginJarsUrls);
- }
-
- /**
- * @param completeFuture This future only can completion by the task run in
- * {@link
com.hazelcast.spi.impl.executionservice.ExecutionService }
- */
- private void monitorTask(PassiveCompletableFuture<TaskExecutionState>
completeFuture) {
- completeFuture.whenComplete((v, t) -> {
- try {
- if (t != null) {
- LOGGER.severe("An unexpected error occurred while the task
was running", t);
- taskFuture.complete(
- new TaskExecutionState(this.taskGroupLocation,
ExecutionState.FAILED,
- t));
- } else {
- turnToEndState(v.getExecutionState());
- if (v.getThrowable() != null) {
- LOGGER.severe(String.format("%s end with state %s and
Exception: %s",
- this.taskFullName,
- v.getExecutionState(),
- ExceptionUtils.getMessage(v.getThrowable())));
- } else {
- LOGGER.info(String.format("%s end with state %s",
- this.taskFullName,
- v.getExecutionState()));
- }
- taskFuture.complete(v);
- }
- } catch (Throwable th) {
- LOGGER.severe(
- String.format("%s end with Exception: %s",
this.taskFullName, ExceptionUtils.getMessage(th)));
- turnToEndState(ExecutionState.FAILED);
- v = new TaskExecutionState(this.taskGroupLocation,
ExecutionState.FAILED, th);
- taskFuture.complete(v);
- }
- });
- }
-
- public long getPhysicalVertexId() {
- return physicalVertexId;
+ nodeEngine.getSerializationService().toData(this.taskGroup),
+ this.pluginJarsUrls);
}
private void turnToEndState(@NonNull ExecutionState endState) {
@@ -378,4 +336,23 @@ public class PhysicalVertex {
public String getTaskFullName() {
return taskFullName;
}
+
+ public void updateTaskExecutionState(TaskExecutionState
taskExecutionState) {
+ turnToEndState(taskExecutionState.getExecutionState());
+ if (taskExecutionState.getThrowable() != null) {
+ LOGGER.severe(String.format("%s end with state %s and Exception:
%s",
+ this.taskFullName,
+ taskExecutionState.getExecutionState(),
+ ExceptionUtils.getMessage(taskExecutionState.getThrowable())));
+ } else {
+ LOGGER.info(String.format("%s end with state %s",
+ this.taskFullName,
+ taskExecutionState.getExecutionState()));
+ }
+ taskFuture.complete(taskExecutionState);
+ }
+
+ public TaskGroupLocation getTaskGroupLocation() {
+ return taskGroupLocation;
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index ea11a93d0..7df5074b7 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -30,6 +30,7 @@ import lombok.NonNull;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
@@ -42,7 +43,7 @@ public class SubPlan {
private final List<PhysicalVertex> coordinatorVertexList;
- private final int pipelineIndex;
+ private final int pipelineId;
private final int totalPipelineNum;
@@ -72,13 +73,16 @@ public class SubPlan {
*/
private final CompletableFuture<PipelineState> pipelineFuture;
- public SubPlan(int pipelineIndex,
+ private final ExecutorService executorService;
+
+ public SubPlan(int pipelineId,
int totalPipelineNum,
long initializationTimestamp,
@NonNull List<PhysicalVertex> physicalVertexList,
@NonNull List<PhysicalVertex> coordinatorVertexList,
- @NonNull JobImmutableInformation jobImmutableInformation) {
- this.pipelineIndex = pipelineIndex;
+ @NonNull JobImmutableInformation jobImmutableInformation,
+ @NonNull ExecutorService executorService) {
+ this.pipelineId = pipelineId;
this.pipelineFuture = new CompletableFuture<>();
this.totalPipelineNum = totalPipelineNum;
this.physicalVertexList = physicalVertexList;
@@ -92,8 +96,10 @@ public class SubPlan {
"Job %s (%s), Pipeline: [(%d/%d)]",
jobImmutableInformation.getJobConfig().getName(),
jobImmutableInformation.getJobId(),
- pipelineIndex,
+ pipelineId,
totalPipelineNum);
+
+ this.executorService = executorService;
}
public PassiveCompletableFuture<PipelineState> initStateFuture() {
@@ -247,7 +253,7 @@ public class SubPlan {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(()
-> {
task.cancel();
return null;
- });
+ }, executorService);
return future;
}
return null;
@@ -258,8 +264,8 @@ public class SubPlan {
cancelPipeline();
}
- public int getPipelineIndex() {
- return pipelineIndex;
+ public int getPipelineId() {
+ return pipelineId;
}
public List<PhysicalVertex> getPhysicalVertexList() {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 98d865f55..4b21001a6 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -31,6 +31,7 @@ import
org.apache.seatunnel.engine.server.checkpoint.CheckpointStorageConfigurat
import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.scheduler.PipelineBaseScheduler;
@@ -151,12 +152,12 @@ public class JobMaster implements Runnable {
}
public Address queryTaskGroupAddress(long taskGroupId) {
- for (Integer pipelineIndex : ownedSlotProfiles.keySet()) {
- Optional<PhysicalVertex> currentVertex =
ownedSlotProfiles.get(pipelineIndex).keySet().stream()
+ for (Integer pipelineId : ownedSlotProfiles.keySet()) {
+ Optional<PhysicalVertex> currentVertex =
ownedSlotProfiles.get(pipelineId).keySet().stream()
.filter(physicalVertex ->
physicalVertex.getTaskGroup().getTaskGroupLocation().getTaskGroupId() ==
taskGroupId)
.findFirst();
if (currentVertex.isPresent()) {
- return
ownedSlotProfiles.get(pipelineIndex).get(currentVertex.get()).getWorker();
+ return
ownedSlotProfiles.get(pipelineId).get(currentVertex.get()).getWorker();
}
}
throw new IllegalArgumentException("can't find task group address from
task group id: " + taskGroupId);
@@ -189,4 +190,28 @@ public class JobMaster implements Runnable {
public PhysicalPlan getPhysicalPlan() {
return physicalPlan;
}
+
+ public void updateTaskExecutionState(TaskExecutionState
taskExecutionState) {
+ this.physicalPlan.getPipelineList().forEach(pipeline -> {
+ if (pipeline.getPipelineId() !=
taskExecutionState.getTaskGroupLocation().getPipelineId()) {
+ return;
+ }
+
+ pipeline.getCoordinatorVertexList().forEach(task -> {
+ if
(!task.getTaskGroupLocation().equals(taskExecutionState.getTaskGroupLocation()))
{
+ return;
+ }
+
+ task.updateTaskExecutionState(taskExecutionState);
+ });
+
+ pipeline.getPhysicalVertexList().forEach(task -> {
+ if
(!task.getTaskGroupLocation().equals(taskExecutionState.getTaskGroupLocation()))
{
+ return;
+ }
+
+ task.updateTaskExecutionState(taskExecutionState);
+ });
+ });
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/NotifyTaskStatusOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/NotifyTaskStatusOperation.java
index 377316c36..7ae0dc1d2 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/NotifyTaskStatusOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/NotifyTaskStatusOperation.java
@@ -20,7 +20,6 @@ package org.apache.seatunnel.engine.server.operation;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
-import org.apache.seatunnel.engine.server.master.JobMaster;
import com.hazelcast.spi.impl.operationservice.Operation;
@@ -38,8 +37,7 @@ public class NotifyTaskStatusOperation extends Operation {
@Override
public void run() throws Exception {
SeaTunnelServer service = getService();
- JobMaster jobMaster =
service.getJobMaster(taskGroupLocation.getJobId());
- //TODO Notify jobMaster of TaskGroup State
+ service.updateTaskExecutionState(taskExecutionState);
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
index 22a6a0f16..1727e06ba 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -72,7 +72,7 @@ public class PipelineBaseScheduler implements JobScheduler {
Map<PhysicalVertex, SlotProfile> slotProfiles;
try {
slotProfiles = applyResourceForPipeline(pipeline);
- ownedSlotProfiles.put(pipeline.getPipelineIndex(),
slotProfiles);
+ ownedSlotProfiles.put(pipeline.getPipelineId(),
slotProfiles);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
index a67a43456..336ef59a9 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
@@ -17,9 +17,7 @@
package org.apache.seatunnel.engine.server.task.operation;
-import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
-import org.apache.seatunnel.engine.server.operation.AsyncOperation;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
@@ -27,11 +25,13 @@ import com.hazelcast.internal.nio.IOUtil;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.operationservice.Operation;
import lombok.NonNull;
import java.io.IOException;
-public class DeployTaskOperation extends AsyncOperation {
+public class DeployTaskOperation extends Operation implements
IdentifiedDataSerializable {
private Data taskImmutableInformation;
private SlotProfile slotProfile;
@@ -44,12 +44,17 @@ public class DeployTaskOperation extends AsyncOperation {
}
@Override
- protected PassiveCompletableFuture<?> doRun() throws Exception {
+ public void run() throws Exception {
SeaTunnelServer server = getService();
- return server.getSlotService().getSlotContext(slotProfile)
+ server.getSlotService().getSlotContext(slotProfile)
.getTaskExecutionService().deployTask(taskImmutableInformation);
}
+ @Override
+ public int getFactoryId() { // serialization factory id, taken from TaskDataSerializerHook (the same hook that supplies getClassId's DEPLOY_TASK_OPERATOR)
+ return TaskDataSerializerHook.FACTORY_ID;
+ }
+
@Override
public int getClassId() {
return TaskDataSerializerHook.DEPLOY_TASK_OPERATOR;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
index df6544140..1c4a6605e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
@@ -50,7 +50,7 @@ public class GetTaskGroupAddressOperation extends Operation
implements Identifie
response = RetryUtils.retryWithException(() ->
server.getJobMaster(taskLocation.getJobId())
.queryTaskGroupAddress(taskLocation.getTaskGroupLocation().getTaskGroupId()),
new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
- exception -> exception instanceof NullPointerException,
Constant.OPERATION_RETRY_SLEEP));
+ exception -> exception instanceof IllegalArgumentException,
Constant.OPERATION_RETRY_SLEEP));
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
index ccc2d0bce..c11ccc858 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
@@ -59,7 +59,7 @@ public class SinkRegisterOperation extends Operation
implements IdentifiedDataSe
task =
server.getTaskExecutionService().getTask(committerTaskID);
break;
} catch (NullPointerException e) {
- LOGGER.warning("can't get committer task , waiting task
started");
+ LOGGER.warning("can't get committer task , waiting task
started, retry " + i);
Thread.sleep(RETRY_INTERVAL);
}
}