Hisoka-X commented on code in PR #2567:
URL:
https://github.com/apache/incubator-seatunnel/pull/2567#discussion_r962695101
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java:
##########
@@ -154,65 +150,71 @@ public PassiveCompletableFuture<TaskExecutionState>
initStateFuture() {
private void deployOnLocal(@NonNull SlotProfile slotProfile) {
deployInternal(taskGroupImmutableInformation -> {
SeaTunnelServer server =
nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
- return new
PassiveCompletableFuture<>(server.getSlotService().getSlotContext(slotProfile)
-
.getTaskExecutionService().deployTask(taskGroupImmutableInformation));
+ server.getSlotService().getSlotContext(slotProfile)
+
.getTaskExecutionService().deployTask(taskGroupImmutableInformation);
+ return null;
});
}
private void deployOnRemote(@NonNull SlotProfile slotProfile) {
- deployInternal(taskGroupImmutableInformation -> new
PassiveCompletableFuture<>(
-
nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
- new DeployTaskOperation(slotProfile,
-
nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
- slotProfile.getWorker())
- .invoke()));
+ deployInternal(taskGroupImmutableInformation -> {
+ try {
+
nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
+ new DeployTaskOperation(slotProfile,
+
nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
+ slotProfile.getWorker())
+ .invoke().get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
}
@SuppressWarnings("checkstyle:MagicNumber")
// This method must not throw an exception
public void deploy(@NonNull SlotProfile slotProfile) {
- currentExecutionAddress = slotProfile.getWorker();
- if (slotProfile.getWorker().equals(nodeEngine.getThisAddress())) {
- deployOnLocal(slotProfile);
- } else {
- deployOnRemote(slotProfile);
+ try {
+ currentExecutionAddress = slotProfile.getWorker();
+ if (slotProfile.getWorker().equals(nodeEngine.getThisAddress())) {
+ deployOnLocal(slotProfile);
+ } else {
+ deployOnRemote(slotProfile);
+ }
+ } catch (Throwable th) {
+ failedByException(th);
}
}
- private void deployInternal(Function<TaskGroupImmutableInformation,
PassiveCompletableFuture<TaskExecutionState>> deployMethod) {
+ private void deployInternal(
+ Function<TaskGroupImmutableInformation,
PassiveCompletableFuture<TaskExecutionState>> deployMethod) {
Review Comment:
Change `Function` to `Consumer`, because the deploy method doesn't return
anything now.
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:
##########
@@ -189,4 +190,28 @@ public JobStatus getJobStatus() {
public PhysicalPlan getPhysicalPlan() {
return physicalPlan;
}
+
+ public void updateTaskExecutionState(TaskExecutionState
taskExecutionState) {
+ this.physicalPlan.getPipelineList().forEach(pipeline -> {
+ if (pipeline.getPipelineIndex() !=
taskExecutionState.getTaskGroupLocation().getPipelineId()) {
+ return;
+ }
+
+ pipeline.getCoordinatorVertexList().forEach(task -> {
+ if
(task.getTaskGroupLocation().equals(taskExecutionState.getTaskGroupLocation()))
{
+ return;
Review Comment:
Why `equals` rather than `not equals`? As written, the matching task returns early and only the non-matching tasks are updated.
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:
##########
@@ -189,4 +190,28 @@ public JobStatus getJobStatus() {
public PhysicalPlan getPhysicalPlan() {
return physicalPlan;
}
+
+ public void updateTaskExecutionState(TaskExecutionState
taskExecutionState) {
+ this.physicalPlan.getPipelineList().forEach(pipeline -> {
+ if (pipeline.getPipelineIndex() !=
taskExecutionState.getTaskGroupLocation().getPipelineId()) {
+ return;
+ }
+
+ pipeline.getCoordinatorVertexList().forEach(task -> {
+ if
(task.getTaskGroupLocation().equals(taskExecutionState.getTaskGroupLocation()))
{
+ return;
+ }
+
+ task.updateTaskExecutionState(taskExecutionState);
+ });
+
+ pipeline.getPhysicalVertexList().forEach(task -> {
+ if
(task.getTaskGroupLocation().equals(taskExecutionState.getTaskGroupLocation()))
{
+ return;
Review Comment:
Same as above: why `equals` rather than `not equals`?
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:
##########
@@ -189,4 +190,28 @@ public JobStatus getJobStatus() {
public PhysicalPlan getPhysicalPlan() {
return physicalPlan;
}
+
+ public void updateTaskExecutionState(TaskExecutionState
taskExecutionState) {
+ this.physicalPlan.getPipelineList().forEach(pipeline -> {
+ if (pipeline.getPipelineIndex() !=
taskExecutionState.getTaskGroupLocation().getPipelineId()) {
Review Comment:
It may be better to unify the naming of `pipelineIndex` and `pipelineId`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]