This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c16cc2d11 [engine][checkpoint] future and thread pool resource recovery (#3072)
c16cc2d11 is described below
commit c16cc2d11fef7c78b4785804db831f88352280b8
Author: Zongwen Li <[email protected]>
AuthorDate: Wed Oct 12 14:56:23 2022 +0800
[engine][checkpoint] future and thread pool resource recovery (#3072)
---
LICENSE | 9 +-
.../engine/core/checkpoint/CheckpointType.java | 2 +-
.../server/checkpoint/CheckpointCoordinator.java | 96 +++++++++++++---------
.../server/checkpoint/CheckpointException.java | 54 ++++++++++++
.../CheckpointFailureReason.java} | 33 ++++----
.../server/checkpoint/CheckpointManager.java | 33 +++++---
.../server/checkpoint/PendingCheckpoint.java | 40 +++++++--
.../server/task/SourceSplitEnumeratorTask.java | 4 +-
.../engine/server/task/record/Barrier.java | 11 ++-
.../engine/server/TaskExecutionServiceTest.java | 25 ++----
10 files changed, 203 insertions(+), 104 deletions(-)
diff --git a/LICENSE b/LICENSE
index b8d8a2446..2927f5026 100644
--- a/LICENSE
+++ b/LICENSE
@@ -209,7 +209,8 @@ The text of each license is the standard Apache 2.0 license.
tools/dependencies/checkLicense.sh files from
https://github.com/apache/skywalking
mvnw files from https://github.com/apache/maven-wrapper Apache 2.0
-seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/RowKind.java
from https://github.com/apache/flink
+seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/RowKind.java
from https://github.com/apache/flink
+seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointListener.java
from https://github.com/apache/flink
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/
from https://github.com/lightbend/config
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/
from https://github.com/apache/flink
seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/
from https://github.com/apache/iceberg
@@ -224,5 +225,7 @@
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engi
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobStatus.java
from https://github.com/apache/flink
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/ExecutionState.java
from https://github.com/apache/flink
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/PassiveCompletableFuture.java
from https://github.com/hazelcast/hazelcast
-
-
+seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointException.java
from https://github.com/apache/flink
+seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointIDCounter.java
from https://github.com/apache/flink
+seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/InternalCheckpointListener.java
from https://github.com/apache/flink
+seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/StandaloneCheckpointIDCounter.java
from https://github.com/apache/flink
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java
index b6a9818a8..fe30dc491 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java
@@ -32,7 +32,7 @@ public enum CheckpointType {
/**
* Automatically triggered by the Task.
*/
- AUTO_SAVEPOINT_TYPE(true, "auto-savepoint");
+ COMPLETED_POINT_TYPE(true, "completed-point");
private final boolean auto;
private final String name;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 6fb96a950..6525e1beb 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -18,7 +18,7 @@
package org.apache.seatunnel.engine.server.checkpoint;
import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneakyThrow;
-import static org.apache.seatunnel.engine.core.checkpoint.CheckpointType.AUTO_SAVEPOINT_TYPE;
+import static org.apache.seatunnel.engine.core.checkpoint.CheckpointType.COMPLETED_POINT_TYPE;
import static org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState.READY_START;
import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
@@ -100,7 +100,7 @@ public class CheckpointCoordinator {
private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;
- private CompletedCheckpoint latestCompletedCheckpoint;
+ private volatile CompletedCheckpoint latestCompletedCheckpoint;
private final CheckpointCoordinatorConfiguration coordinatorConfig;
@@ -113,7 +113,9 @@ public class CheckpointCoordinator {
private final Object lock = new Object();
- private final Object autoSavepointLock = new Object();
+ /** Flag marking the coordinator as shut down (not accepting any messages any more). */
+ private volatile boolean shutdown;
+
public CheckpointCoordinator(CheckpointManager manager,
CheckpointStorage checkpointStorage,
CheckpointStorageConfiguration storageConfig,
@@ -221,6 +223,14 @@ public class CheckpointCoordinator {
}
}
+ private boolean canTriggered() {
+ return !isCompleted() && !isShutdown();
+ }
+
+ public boolean isShutdown() {
+ return shutdown;
+ }
+
public static Map<Long, Integer> getPipelineTasks(Set<TaskLocation>
pipelineSubtasks) {
return pipelineSubtasks.stream()
.collect(Collectors.groupingBy(TaskLocation::getTaskVertexId,
Collectors.toList()))
@@ -228,16 +238,20 @@ public class CheckpointCoordinator {
.collect(Collectors.toMap(Map.Entry::getKey, entry ->
entry.getValue().size()));
}
- public PassiveCompletableFuture<PendingCheckpoint> startSavepoint() {
+ public PassiveCompletableFuture<CompletedCheckpoint> startSavepoint() {
CompletableFuture<PendingCheckpoint> savepoint =
createPendingCheckpoint(Instant.now().toEpochMilli(),
CheckpointType.SAVEPOINT_TYPE);
startTriggerPendingCheckpoint(savepoint);
- return new PassiveCompletableFuture<>(savepoint);
+ return savepoint.join().getCompletableFuture();
}
private void
startTriggerPendingCheckpoint(CompletableFuture<PendingCheckpoint>
pendingCompletableFuture) {
- // Trigger the barrier and wait for all tasks to ACK
pendingCompletableFuture.thenAcceptAsync(pendingCheckpoint -> {
- if (AUTO_SAVEPOINT_TYPE != pendingCheckpoint.getCheckpointType()) {
+ LOG.debug("wait checkpoint completed: " + pendingCheckpoint);
+ PassiveCompletableFuture<CompletedCheckpoint> completableFuture =
pendingCheckpoint.getCompletableFuture();
+ completableFuture.thenAcceptAsync(this::completePendingCheckpoint);
+
+ if (COMPLETED_POINT_TYPE != pendingCheckpoint.getCheckpointType())
{
+ // Trigger the barrier and wait for all tasks to ACK
LOG.debug("trigger checkpoint barrier" + pendingCheckpoint);
CompletableFuture.supplyAsync(() ->
new
CheckpointBarrier(pendingCheckpoint.getCheckpointId(),
@@ -246,18 +260,13 @@ public class CheckpointCoordinator {
.thenApplyAsync(this::triggerCheckpoint)
.thenApplyAsync(invocationFutures ->
CompletableFuture.allOf(invocationFutures).join());
}
- LOG.debug("wait checkpoint completed: " + pendingCheckpoint);
- pendingCheckpoint.getCompletableFuture()
- .thenAcceptAsync(this::completePendingCheckpoint);
- });
- // If any task is not acked within the checkpoint timeout
- pendingCompletableFuture.thenAcceptAsync(pendingCheckpoint -> {
LOG.debug("Start a scheduled task to prevent checkpoint timeouts");
scheduler.schedule(() -> {
+ // If any task is not acked within the checkpoint timeout
if
(pendingCheckpoints.get(pendingCheckpoint.getCheckpointId()) != null &&
!pendingCheckpoint.isFullyAcknowledged()) {
if (tolerableFailureCheckpoints-- <= 0) {
- cleanPendingCheckpoint();
+
cleanPendingCheckpoint(CheckpointFailureReason.CHECKPOINT_EXPIRED);
// TODO: notify job master to restore the pipeline.
}
}
@@ -268,22 +277,24 @@ public class CheckpointCoordinator {
}
CompletableFuture<PendingCheckpoint> createPendingCheckpoint(long
triggerTimestamp, CheckpointType checkpointType) {
- CompletableFuture<Long> idFuture = CompletableFuture.supplyAsync(() ->
{
- try {
- // this must happen outside the coordinator-wide lock,
- // because it communicates with external services
- // (in HA mode) and may block for a while.
- return checkpointIdCounter.getAndIncrement();
- } catch (Throwable e) {
- throw new CompletionException(e);
- }
- });
- return createPendingCheckpoint(triggerTimestamp, idFuture,
checkpointType);
+ synchronized (lock) {
+ CompletableFuture<Long> idFuture =
CompletableFuture.supplyAsync(() -> {
+ try {
+ // this must happen outside the coordinator-wide lock,
+ // because it communicates with external services
+ // (in HA mode) and may block for a while.
+ return checkpointIdCounter.getAndIncrement();
+ } catch (Throwable e) {
+ throw new CompletionException(e);
+ }
+ });
+ return triggerPendingCheckpoint(triggerTimestamp, idFuture,
checkpointType);
+ }
}
- CompletableFuture<PendingCheckpoint> createPendingCheckpoint(long
triggerTimestamp, CompletableFuture<Long> idFuture, CheckpointType
checkpointType) {
+ CompletableFuture<PendingCheckpoint> triggerPendingCheckpoint(long
triggerTimestamp, CompletableFuture<Long> idFuture, CheckpointType
checkpointType) {
+ assert Thread.holdsLock(lock);
latestTriggerTimestamp.set(triggerTimestamp);
- CompletableFuture<PendingCheckpoint> completableFuture = new
CompletableFuture<>();
return idFuture.thenApplyAsync(checkpointId ->
new PendingCheckpoint(this.jobId,
this.plan.getPipelineId(),
@@ -292,8 +303,7 @@ public class CheckpointCoordinator {
checkpointType,
getNotYetAcknowledgedTasks(),
getTaskStatistics(),
- getActionStates(),
- completableFuture)
+ getActionStates())
).thenApplyAsync(pendingCheckpoint -> {
pendingCheckpoints.put(pendingCheckpoint.getCheckpointId(),
pendingCheckpoint);
return pendingCheckpoint;
@@ -335,9 +345,13 @@ public class CheckpointCoordinator {
.toArray(InvocationFuture[]::new);
}
- protected void cleanPendingCheckpoint() {
+ protected void cleanPendingCheckpoint(CheckpointFailureReason
failureReason) {
+ pendingCheckpoints.values().forEach(pendingCheckpoint ->
+ pendingCheckpoint.abortCheckpoint(failureReason, null)
+ );
// TODO: clear related future & scheduler task
pendingCheckpoints.clear();
+ scheduler.shutdown();
}
protected void acknowledgeTask(TaskAcknowledgeOperation ackOperation) {
@@ -345,13 +359,13 @@ public class CheckpointCoordinator {
final PendingCheckpoint pendingCheckpoint =
pendingCheckpoints.get(checkpointId);
TaskLocation location = ackOperation.getTaskLocation();
LOG.debug("task[{}]({}/{}) ack. {}", location.getTaskID(),
location.getPipelineId(), location.getJobId(),
ackOperation.getBarrier().toString());
- if (checkpointId == Barrier.PREPARE_CLOSE_BARRIER_ID) {
- synchronized (autoSavepointLock) {
- if (pendingCheckpoints.get(checkpointId) == null) {
- CompletableFuture<PendingCheckpoint> future =
createPendingCheckpoint(
+ if (ackOperation.getBarrier().getCheckpointType() ==
COMPLETED_POINT_TYPE) {
+ synchronized (lock) {
+ if (pendingCheckpoints.get(Barrier.PREPARE_CLOSE_BARRIER_ID)
== null) {
+ CompletableFuture<PendingCheckpoint> future =
triggerPendingCheckpoint(
Instant.now().toEpochMilli(),
CompletableFuture.completedFuture(Barrier.PREPARE_CLOSE_BARRIER_ID),
- AUTO_SAVEPOINT_TYPE);
+ COMPLETED_POINT_TYPE);
startTriggerPendingCheckpoint(future);
future.join();
}
@@ -370,11 +384,10 @@ public class CheckpointCoordinator {
SubtaskStatus.RUNNING);
}
- public void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
{
- LOG.info("pending checkpoint({}/{}@{}) completed!",
pendingCheckpoint.getCheckpointId(), pendingCheckpoint.getPipelineId(),
pendingCheckpoint.getJobId());
+ public void completePendingCheckpoint(CompletedCheckpoint
completedCheckpoint) {
+ LOG.info("pending checkpoint({}/{}@{}) completed!",
completedCheckpoint.getCheckpointId(), completedCheckpoint.getPipelineId(),
completedCheckpoint.getJobId());
pendingCounter.decrementAndGet();
- final long checkpointId = pendingCheckpoint.getCheckpointId();
- CompletedCheckpoint completedCheckpoint =
pendingCheckpoint.toCompletedCheckpoint();
+ final long checkpointId = completedCheckpoint.getCheckpointId();
pendingCheckpoints.remove(checkpointId);
if (pendingCheckpoints.size() + 1 ==
coordinatorConfig.getMaxConcurrentCheckpoints()) {
// latest checkpoint completed time > checkpoint interval
@@ -403,6 +416,9 @@ public class CheckpointCoordinator {
CompletableFuture.allOf(invocationFutures).join();
// TODO: notifyCheckpointCompleted fail
latestCompletedCheckpoint = completedCheckpoint;
+ if (isCompleted()) {
+
cleanPendingCheckpoint(CheckpointFailureReason.CHECKPOINT_COORDINATOR_COMPLETED);
+ }
}
public InvocationFuture<?>[] notifyCheckpointCompleted(long checkpointId) {
@@ -417,6 +433,6 @@ public class CheckpointCoordinator {
if (latestCompletedCheckpoint == null) {
return false;
}
- return latestCompletedCheckpoint.getCheckpointType() ==
AUTO_SAVEPOINT_TYPE;
+ return latestCompletedCheckpoint.getCheckpointType() ==
COMPLETED_POINT_TYPE;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointException.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointException.java
new file mode 100644
index 000000000..32832f900
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointException.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/** Base class for checkpoint related exceptions. */
+public class CheckpointException extends Exception {
+
+ private static final long serialVersionUID = 3257526119022486948L;
+
+ private final CheckpointFailureReason checkpointFailureReason;
+
+ public CheckpointException(CheckpointFailureReason failureReason) {
+ super(failureReason.message());
+ this.checkpointFailureReason = checkNotNull(failureReason);
+ }
+
+ public CheckpointException(String message, CheckpointFailureReason
failureReason) {
+ super(message + " Failure reason: " + failureReason.message());
+ this.checkpointFailureReason = checkNotNull(failureReason);
+ }
+
+ public CheckpointException(CheckpointFailureReason failureReason,
Throwable cause) {
+ super(failureReason.message(), cause);
+ this.checkpointFailureReason = checkNotNull(failureReason);
+ }
+
+ public CheckpointException(
+ String message, CheckpointFailureReason failureReason, Throwable
cause) {
+ super(message + " Failure reason: " + failureReason.message(), cause);
+ this.checkpointFailureReason = checkNotNull(failureReason);
+ }
+
+ public CheckpointFailureReason getCheckpointFailureReason() {
+ return checkpointFailureReason;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java
similarity index 60%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java
index 322404baa..8d4276f5e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java
@@ -15,27 +15,22 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.task.record;
+package org.apache.seatunnel.engine.server.checkpoint;
-/**
- * barrier flowing in data flow
- */
-public interface Barrier {
- Long PREPARE_CLOSE_BARRIER_ID = 0L;
+public enum CheckpointFailureReason {
+
+ TASK_FAILURE("Task has failed."),
+ CHECKPOINT_EXPIRED("Checkpoint expired before completing."),
+ CHECKPOINT_COORDINATOR_COMPLETED("CheckpointCoordinator completed."),
+ CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown.");
- /**
- * The ID of the barrier.
- */
- long getId();
+ private final String message;
- /**
- * Whether the task needs to perform a status snapshot after the barrier
is aligned.
- * For example, DDL barrier does not require a snapshot.
- */
- boolean snapshot();
+ CheckpointFailureReason(String message) {
+ this.message = message;
+ }
- /**
- * Barrier indicating that the task should prepare to close.
- */
- boolean prepareClose();
+ public String message() {
+ return message;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index cde298c41..b62e1a454 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -38,6 +38,7 @@ import
org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import com.hazelcast.cluster.Address;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
+import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
@@ -50,6 +51,7 @@ import java.util.stream.Collectors;
* Maintain the life cycle of the {@link CheckpointCoordinator} through the
{@link CheckpointPlan} and the status of the job.
* </p>
*/
+@Slf4j
public class CheckpointManager {
private final Long jobId;
@@ -91,7 +93,7 @@ public class CheckpointManager {
* <br> After the savepoint is triggered, it will cause the job to stop
automatically.
*/
@SuppressWarnings("unchecked")
- public PassiveCompletableFuture<PendingCheckpoint>[] triggerSavepoints() {
+ public PassiveCompletableFuture<CompletedCheckpoint>[] triggerSavepoints()
{
return coordinatorMap.values()
.parallelStream()
.map(CheckpointCoordinator::startSavepoint)
@@ -102,12 +104,20 @@ public class CheckpointManager {
* Called by the JobMaster, actually triggered by the user.
* <br> After the savepoint is triggered, it will cause the pipeline to
stop automatically.
*/
- public PassiveCompletableFuture<PendingCheckpoint> triggerSavepoint(int
pipelineId) {
- return coordinatorMap.get(pipelineId).startSavepoint();
+ public PassiveCompletableFuture<CompletedCheckpoint> triggerSavepoint(int
pipelineId) {
+ return getCheckpointCoordinator(pipelineId).startSavepoint();
}
private CheckpointCoordinator getCheckpointCoordinator(TaskLocation
taskLocation) {
- return coordinatorMap.get(taskLocation.getPipelineId());
+ return getCheckpointCoordinator(taskLocation.getPipelineId());
+ }
+
+ private CheckpointCoordinator getCheckpointCoordinator(int pipelineId) {
+ CheckpointCoordinator coordinator = coordinatorMap.get(pipelineId);
+ if (coordinator == null) {
+ throw new RuntimeException(String.format("The checkpoint
coordinator(%s) don't exist", pipelineId));
+ }
+ return coordinator;
}
/**
@@ -131,7 +141,7 @@ public class CheckpointManager {
switch (executionState) {
case FAILED:
case CANCELED:
-
coordinatorMap.get(groupLocation.getPipelineId()).cleanPendingCheckpoint();
+
getCheckpointCoordinator(groupLocation.getPipelineId()).cleanPendingCheckpoint(CheckpointFailureReason.TASK_FAILURE);
return;
default:
}
@@ -142,11 +152,7 @@ public class CheckpointManager {
* <br> Returns whether the pipeline has completed; No need to
deploy/restore the {@link SubPlan} if the pipeline has been completed;
*/
public boolean isCompletedPipeline(int pipelineId) {
- CheckpointCoordinator coordinator = coordinatorMap.get(pipelineId);
- if (coordinator == null) {
- throw new RuntimeException(String.format("The checkpoint
coordinator(%s) don't exist", pipelineId));
- }
- return coordinator.isCompleted();
+ return getCheckpointCoordinator(pipelineId).isCompleted();
}
/**
@@ -154,7 +160,12 @@ public class CheckpointManager {
* <br> used for the ack of the checkpoint, including the state snapshot
of all {@link Action} within the {@link Task}.
*/
public void acknowledgeTask(TaskAcknowledgeOperation ackOperation) {
-
getCheckpointCoordinator(ackOperation.getTaskLocation()).acknowledgeTask(ackOperation);
+ CheckpointCoordinator coordinator =
getCheckpointCoordinator(ackOperation.getTaskLocation());
+ if (coordinator.isCompleted()) {
+ log.info("The checkpoint coordinator({}) is completed",
ackOperation.getTaskLocation().getPipelineId());
+ return;
+ }
+ coordinator.acknowledgeTask(ackOperation);
}
protected InvocationFuture<?> sendOperationToMemberNode(TaskOperation
operation) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
index 8432d0d57..91cb9013a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
@@ -22,7 +22,8 @@ import org.apache.seatunnel.engine.core.checkpoint.Checkpoint;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
-import lombok.AllArgsConstructor;
+import com.beust.jcommander.internal.Nullable;
+import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,7 +33,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-@AllArgsConstructor
public class PendingCheckpoint implements Checkpoint {
private static final Logger LOG =
LoggerFactory.getLogger(PendingCheckpoint.class);
private final long jobId;
@@ -51,7 +51,29 @@ public class PendingCheckpoint implements Checkpoint {
private final Map<Long, ActionState> actionStates;
- private final CompletableFuture<PendingCheckpoint> completableFuture;
+ private final CompletableFuture<CompletedCheckpoint> completableFuture;
+
+ @Getter
+ private CheckpointException failureCause;
+
+ public PendingCheckpoint(long jobId,
+ int pipelineId,
+ long checkpointId,
+ long triggerTimestamp,
+ CheckpointType checkpointType,
+ Set<Long> notYetAcknowledgedTasks,
+ Map<Long, TaskStatistics> taskStatistics,
+ Map<Long, ActionState> actionStates) {
+ this.jobId = jobId;
+ this.pipelineId = pipelineId;
+ this.checkpointId = checkpointId;
+ this.triggerTimestamp = triggerTimestamp;
+ this.checkpointType = checkpointType;
+ this.notYetAcknowledgedTasks = notYetAcknowledgedTasks;
+ this.taskStatistics = taskStatistics;
+ this.actionStates = actionStates;
+ this.completableFuture = new CompletableFuture<>();
+ }
@Override
public long getCheckpointId() {
@@ -86,7 +108,7 @@ public class PendingCheckpoint implements Checkpoint {
return actionStates;
}
- public PassiveCompletableFuture<PendingCheckpoint> getCompletableFuture() {
+ public PassiveCompletableFuture<CompletedCheckpoint>
getCompletableFuture() {
return new PassiveCompletableFuture<>(completableFuture);
}
@@ -114,7 +136,7 @@ public class PendingCheckpoint implements Checkpoint {
if (isFullyAcknowledged()) {
LOG.debug("checkpoint is full ack!");
- completableFuture.complete(this);
+ completableFuture.complete(toCompletedCheckpoint());
}
}
@@ -122,7 +144,7 @@ public class PendingCheckpoint implements Checkpoint {
return notYetAcknowledgedTasks.size() == 0;
}
- public CompletedCheckpoint toCompletedCheckpoint() {
+ private CompletedCheckpoint toCompletedCheckpoint() {
return new CompletedCheckpoint(
jobId,
pipelineId,
@@ -133,4 +155,10 @@ public class PendingCheckpoint implements Checkpoint {
actionStates,
taskStatistics);
}
+
+ public void abortCheckpoint(CheckpointFailureReason failureReason,
+ @Nullable Throwable cause) {
+ this.failureCause = new CheckpointException(failureReason, cause);
+ completableFuture.completeExceptionally(failureCause);
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index ebc253c6a..83fb06b82 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -29,7 +29,6 @@ import static
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTask
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
@@ -51,7 +50,6 @@ import lombok.NonNull;
import java.io.IOException;
import java.io.Serializable;
import java.net.URL;
-import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -233,7 +231,7 @@ public class SourceSplitEnumeratorTask<SplitT extends
SourceSplit> extends Coord
case RUNNING:
// The reader closes automatically after reading
if (prepareCloseStatus) {
- triggerBarrier(new
CheckpointBarrier(Barrier.PREPARE_CLOSE_BARRIER_ID,
Instant.now().toEpochMilli(), CheckpointType.AUTO_SAVEPOINT_TYPE));
+ triggerBarrier(Barrier.completedBarrier());
currState = PREPARE_CLOSE;
}
break;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java
index 322404baa..5985fd2f4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java
@@ -17,11 +17,16 @@
package org.apache.seatunnel.engine.server.task.record;
+import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
+
+import java.time.Instant;
+
/**
* barrier flowing in data flow
*/
public interface Barrier {
- Long PREPARE_CLOSE_BARRIER_ID = 0L;
+ Long PREPARE_CLOSE_BARRIER_ID = Long.MAX_VALUE;
/**
* The ID of the barrier.
@@ -38,4 +43,8 @@ public interface Barrier {
* Barrier indicating that the task should prepare to close.
*/
boolean prepareClose();
+
+ static CheckpointBarrier completedBarrier() {
+ return new CheckpointBarrier(Barrier.PREPARE_CLOSE_BARRIER_ID,
Instant.now().toEpochMilli(), CheckpointType.COMPLETED_POINT_TYPE);
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
index 80d1cb42e..f12bff8cb 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java
@@ -36,6 +36,7 @@ import org.apache.seatunnel.engine.server.execution.TestTask;
import com.google.common.collect.Lists;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -61,26 +62,6 @@ public class TaskExecutionServiceTest extends
AbstractSeaTunnelServerTest {
}
@Test
- public void testAll() throws InterruptedException {
- LOGGER.info("----------start Cancel test----------");
- //testCancel();
-
- LOGGER.info("----------start Finish test----------");
- //testFinish();
-
- LOGGER.info("----------start Delay test----------");
- // This test will error while we have more and more test case.
- //testDelay();
- //testDelay();
-
- LOGGER.info("----------start ThrowException test----------");
- //testThrowException();
-
- LOGGER.info("----------start CriticalCallTime test----------");
- //testCriticalCallTime();
-
- }
-
public void testCancel() {
TaskExecutionService taskExecutionService =
server.getTaskExecutionService();
@@ -99,6 +80,7 @@ public class TaskExecutionServiceTest extends
AbstractSeaTunnelServerTest {
.untilAsserted(() -> assertEquals(CANCELED,
completableFuture.get().getExecutionState()));
}
+ @Test
public void testFinish() {
TaskExecutionService taskExecutionService =
server.getTaskExecutionService();
@@ -122,6 +104,7 @@ public class TaskExecutionServiceTest extends
AbstractSeaTunnelServerTest {
/**
* Test task execution time is the same as the timer timeout
*/
+ @Test
public void testCriticalCallTime() throws InterruptedException {
AtomicBoolean stopMark = new AtomicBoolean(false);
CopyOnWriteArrayList<Long> stopTime = new CopyOnWriteArrayList<>();
@@ -153,6 +136,7 @@ public class TaskExecutionServiceTest extends
AbstractSeaTunnelServerTest {
}
+ @Test
public void testThrowException() throws InterruptedException {
TaskExecutionService taskExecutionService =
server.getTaskExecutionService();
@@ -203,6 +187,7 @@ public class TaskExecutionServiceTest extends
AbstractSeaTunnelServerTest {
.untilAsserted(() -> assertEquals(FINISHED,
taskCts.get().getExecutionState()));
}
+ @RepeatedTest(2)
public void testDelay() throws InterruptedException {
long lowLagSleep = 10;