This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c225ab144 [hotfix][engine][checkpoint] fix state restoring error (#3114)
c225ab144 is described below
commit c225ab1447bf4d1d6f8ec4863d81c73e2b4f0610
Author: Zongwen Li <[email protected]>
AuthorDate: Mon Oct 17 14:34:02 2022 +0800
[hotfix][engine][checkpoint] fix state restoring error (#3114)
---
.../server/checkpoint/CheckpointCoordinator.java | 19 +++++++--
.../engine/server/checkpoint/CheckpointPlan.java | 16 ++++++++
.../server/dag/physical/PhysicalPlanGenerator.java | 45 +++++++++++++++-------
.../engine/server/task/SeaTunnelTask.java | 16 ++++++--
.../operation/source/RestoredSplitOperation.java | 6 ++-
5 files changed, 80 insertions(+), 22 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 2a1ef88d7..df0e06122 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.checkpoint;
import static
org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneakyThrow;
import static
org.apache.seatunnel.engine.core.checkpoint.CheckpointType.COMPLETED_POINT_TYPE;
+import static
org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan.COORDINATOR_INDEX;
import static
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState.READY_START;
import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
@@ -182,10 +183,20 @@ public class CheckpointCoordinator {
List<ActionSubtaskState> states = new ArrayList<>();
if (latestCompletedCheckpoint != null) {
final Integer currentParallelism =
pipelineTasks.get(taskLocation.getTaskVertexId());
- final ActionState actionState =
latestCompletedCheckpoint.getTaskStates().get(taskLocation.getTaskVertexId());
- for (int i = taskLocation.getTaskIndex(); i <
actionState.getParallelism(); i += currentParallelism) {
- states.add(actionState.getSubtaskStates()[i]);
- }
+ plan.getSubtaskActions().get(taskLocation)
+ .forEach(tuple -> {
+ ActionState actionState =
latestCompletedCheckpoint.getTaskStates().get(tuple.f0());
+ if (actionState == null) {
+ return;
+ }
+ if (COORDINATOR_INDEX.equals(tuple.f1())) {
+ states.add(actionState.getCoordinatorState());
+ return;
+ }
+ for (int i = tuple.f1(); i < actionState.getParallelism();
i += currentParallelism) {
+ states.add(actionState.getSubtaskStates()[i]);
+ }
+ });
}
checkpointManager.sendOperationToMemberNode(new
NotifyTaskRestoreOperation(taskLocation, states));
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
index 0dd1c0dc5..2859c96c3 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.checkpoint;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import com.hazelcast.jet.datamodel.Tuple2;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -37,6 +38,8 @@ import java.util.Set;
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public class CheckpointPlan {
+ public static final Integer COORDINATOR_INDEX = -1;
+
private final int pipelineId;
/**
@@ -56,11 +59,19 @@ public class CheckpointPlan {
*/
private final Map<Long, Integer> pipelineActions;
+ /**
+ * <br> key: the subtask locations;
+ * <br> value: all actions in this subtask; f0: action id, f1: action index;
+ */
+ private final Map<TaskLocation, Set<Tuple2<Long, Integer>>> subtaskActions;
+
public static final class Builder {
private final Set<TaskLocation> pipelineSubtasks = new HashSet<>();
private final Set<TaskLocation> startingSubtasks = new HashSet<>();
private final Map<Long, Integer> pipelineActions = new HashMap<>();
+ private final Map<TaskLocation, Set<Tuple2<Long, Integer>>>
subtaskActions = new HashMap<>();
+
private Builder() {
}
@@ -78,5 +89,10 @@ public class CheckpointPlan {
this.pipelineActions.putAll(pipelineActions);
return this;
}
+
+ public Builder subtaskActions(Map<TaskLocation, Set<Tuple2<Long,
Integer>>> subtaskActions) {
+ this.subtaskActions.putAll(subtaskActions);
+ return this;
+ }
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 90bf6336f..1e96c4ba5 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -111,6 +111,12 @@ public class PhysicalPlanGenerator {
*/
private final Set<TaskLocation> startingTasks;
+ /**
+ * <br> key: the subtask locations;
+ * <br> value: all actions in this subtask; f0: action id, f1: action index;
+ */
+ private final Map<TaskLocation, Set<Tuple2<Long, Integer>>> subtaskActions;
+
private final IMap<Object, Object> runningJobStateIMap;
private final IMap<Object, Object> runningJobStateTimestampsIMap;
@@ -132,6 +138,7 @@ public class PhysicalPlanGenerator {
// the checkpoint of a pipeline
this.pipelineTasks = new HashSet<>();
this.startingTasks = new HashSet<>();
+ this.subtaskActions = new HashMap<>();
this.runningJobStateIMap = runningJobStateIMap;
this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
}
@@ -147,6 +154,7 @@ public class PhysicalPlanGenerator {
Stream<SubPlan> subPlanStream = pipelines.stream().map(pipeline -> {
this.pipelineTasks.clear();
this.startingTasks.clear();
+ this.subtaskActions.clear();
final int pipelineId = pipeline.getId();
final List<ExecutionEdge> edges = pipeline.getEdges();
@@ -172,6 +180,7 @@ public class PhysicalPlanGenerator {
.pipelineSubtasks(pipelineTasks)
.startingSubtasks(startingTasks)
.pipelineActions(pipeline.getActions())
+ .subtaskActions(subtaskActions)
.build());
return new SubPlan(pipelineId,
totalPipelineNum,
@@ -207,10 +216,10 @@ public class PhysicalPlanGenerator {
.collect(Collectors.toList());
return collect.stream().map(s -> (SinkAction<?, ?, ?, ?>)
s.getRightVertex().getAction())
- .map(s -> {
+ .map(sinkAction -> {
Optional<? extends SinkAggregatedCommitter<?, ?>>
sinkAggregatedCommitter;
try {
- sinkAggregatedCommitter =
s.getSink().createAggregatedCommitter();
+ sinkAggregatedCommitter =
sinkAction.getSink().createAggregatedCommitter();
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -222,22 +231,23 @@ public class PhysicalPlanGenerator {
new
TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex,
taskGroupID);
TaskLocation taskLocation = new
TaskLocation(taskGroupLocation, taskTypeId, 0);
SinkAggregatedCommitterTask<?, ?> t =
- new
SinkAggregatedCommitterTask(jobImmutableInformation.getJobId(), taskLocation, s,
+ new
SinkAggregatedCommitterTask(jobImmutableInformation.getJobId(), taskLocation,
sinkAction,
sinkAggregatedCommitter.get());
- committerTaskIDMap.put(s, taskLocation);
+ committerTaskIDMap.put(sinkAction, taskLocation);
// checkpoint
pipelineTasks.add(taskLocation);
+ subtaskActions.put(taskLocation,
Collections.singleton(Tuple2.tuple2(sinkAction.getId(), -1)));
return new PhysicalVertex(atomicInteger.incrementAndGet(),
executorService,
collect.size(),
- new TaskGroupDefaultImpl(taskGroupLocation,
s.getName() + "-AggregatedCommitterTask",
+ new TaskGroupDefaultImpl(taskGroupLocation,
sinkAction.getName() + "-AggregatedCommitterTask",
Lists.newArrayList(t)),
flakeIdGenerator,
pipelineIndex,
totalPipelineNum,
- s.getJarUrls(),
+ sinkAction.getJarUrls(),
jobImmutableInformation,
initializationTimestamp,
nodeEngine,
@@ -268,7 +278,7 @@ public class PhysicalPlanGenerator {
SeaTunnelTask seaTunnelTask =
new
TransformSeaTunnelTask(jobImmutableInformation.getJobId(), taskLocation, i,
flow);
// checkpoint
- pipelineTasks.add(taskLocation);
+ fillCheckpointPlan(seaTunnelTask);
t.add(new PhysicalVertex(
i,
executorService,
@@ -295,24 +305,25 @@ public class PhysicalPlanGenerator {
int totalPipelineNum) {
AtomicInteger atomicInteger = new AtomicInteger(-1);
- return sources.stream().map(s -> {
+ return sources.stream().map(sourceAction -> {
long taskGroupID = idGenerator.getNextId();
long taskTypeId = idGenerator.getNextId();
TaskGroupLocation taskGroupLocation =
new TaskGroupLocation(jobImmutableInformation.getJobId(),
pipelineIndex, taskGroupID);
TaskLocation taskLocation = new TaskLocation(taskGroupLocation,
taskTypeId, 0);
SourceSplitEnumeratorTask<?> t =
- new
SourceSplitEnumeratorTask<>(jobImmutableInformation.getJobId(), taskLocation,
s);
+ new
SourceSplitEnumeratorTask<>(jobImmutableInformation.getJobId(), taskLocation,
sourceAction);
// checkpoint
pipelineTasks.add(taskLocation);
startingTasks.add(taskLocation);
- enumeratorTaskIDMap.put(s, taskLocation);
+ subtaskActions.put(taskLocation,
Collections.singleton(Tuple2.tuple2(sourceAction.getId(), -1)));
+ enumeratorTaskIDMap.put(sourceAction, taskLocation);
return new PhysicalVertex(
atomicInteger.incrementAndGet(),
executorService,
sources.size(),
- new TaskGroupDefaultImpl(taskGroupLocation, s.getName() +
"-SplitEnumerator",
+ new TaskGroupDefaultImpl(taskGroupLocation,
sourceAction.getName() + "-SplitEnumerator",
Lists.newArrayList(t)),
flakeIdGenerator,
pipelineIndex,
@@ -352,8 +363,6 @@ public class PhysicalPlanGenerator {
flowTaskIDPrefixMap.computeIfAbsent(f.getFlowID(), id ->
idGenerator.getNextId());
final TaskLocation taskLocation =
new TaskLocation(taskGroupLocation,
taskIDPrefix, finalParallelismIndex);
- // checkpoint
- pipelineTasks.add(taskLocation);
if (f instanceof PhysicalExecutionFlow) {
return new
SourceSeaTunnelTask<>(jobImmutableInformation.getJobId(),
taskLocation,
@@ -363,7 +372,7 @@ public class PhysicalPlanGenerator {
taskLocation,
finalParallelismIndex, f);
}
- }).collect(Collectors.toList());
+
}).peek(this::fillCheckpointPlan).collect(Collectors.toList());
Set<URL> jars =
taskList.stream().flatMap(task ->
task.getJarsUrl().stream()).collect(Collectors.toSet());
@@ -408,6 +417,14 @@ public class PhysicalPlanGenerator {
}).collect(Collectors.toList());
}
+ private void fillCheckpointPlan(SeaTunnelTask task) {
+ pipelineTasks.add(task.getTaskLocation());
+ subtaskActions.put(task.getTaskLocation(),
+ task.getActionIds().stream()
+ .map(id -> Tuple2.tuple2(id,
task.getTaskLocation().getTaskIndex()))
+ .collect(Collectors.toSet()));
+ }
+
/**
* set config for flow, some flow should have config support for execute on task.
*
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 5379e1d60..437ecd74b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -31,6 +31,7 @@ import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.engine.common.utils.ConsumerWithException;
import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.PartitionTransformAction;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
@@ -75,6 +76,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
import java.util.stream.Collectors;
public abstract class SeaTunnelTask extends AbstractTask {
@@ -231,21 +233,29 @@ public abstract class SeaTunnelTask extends AbstractTask {
@Override
public Set<URL> getJarsUrl() {
+ return getFlowInfo((action, set) -> set.addAll(action.getJarUrls()));
+ }
+
+ public Set<Long> getActionIds() {
+ return getFlowInfo((action, set) -> set.add(action.getId()));
+ }
+
+ private <T> Set<T> getFlowInfo(BiConsumer<Action, Set<T>> function) {
List<Flow> now = new ArrayList<>();
now.add(executionFlow);
- Set<URL> urls = new HashSet<>();
+ Set<T> result = new HashSet<>();
while (!now.isEmpty()) {
final List<Flow> next = new ArrayList<>();
now.forEach(n -> {
if (n instanceof PhysicalExecutionFlow) {
- urls.addAll(((PhysicalExecutionFlow)
n).getAction().getJarUrls());
+ function.accept(((PhysicalExecutionFlow) n).getAction(),
result);
}
next.addAll(n.getNext());
});
now.clear();
now.addAll(next);
}
- return urls;
+ return result;
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
index 30df27151..4da0c2896 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
@@ -34,6 +34,7 @@ import com.hazelcast.nio.ObjectDataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
public class RestoredSplitOperation extends TaskOperation {
@@ -78,7 +79,10 @@ public class RestoredSplitOperation extends TaskOperation {
SeaTunnelServer server = getService();
TaskExecutionService taskExecutionService =
server.getTaskExecutionService();
ClassLoader classLoader =
taskExecutionService.getExecutionContext(taskLocation.getTaskGroupLocation()).getClassLoader();
- List<SourceSplit> deserialize =
Arrays.asList(SerializationUtils.deserialize(splits, classLoader));
+
+ List<SourceSplit> deserialize = Arrays.stream((Object[])
SerializationUtils.deserialize(splits, classLoader))
+ .map(o -> (SourceSplit) o)
+ .collect(Collectors.toList());
RetryUtils.retryWithException(() -> {
SourceSplitEnumeratorTask<SourceSplit> task =
taskExecutionService.getTask(taskLocation);
task.addSplitsBack(deserialize, subtaskIndex);