This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 333dc99cab [Improve][Zeta] Add restore when commit failed (#6101)
333dc99cab is described below
commit 333dc99cab0821c1022e8b5b302728ff16458a7c
Author: Jia Fan <[email protected]>
AuthorDate: Thu Jan 11 17:37:55 2024 +0800
[Improve][Zeta] Add restore when commit failed (#6101)
* [Improve][Zeta] Add restore when commit failed
* update
* update
---
.../server/checkpoint/CheckpointCoordinator.java | 7 +-
.../server/checkpoint/CheckpointManager.java | 4 +-
.../engine/server/master/JobMasterTest.java | 96 +++++++++++++---------
3 files changed, 65 insertions(+), 42 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index b9bd7b6498..6a88f169fc 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -45,6 +45,7 @@ import
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import lombok.Getter;
@@ -260,7 +261,8 @@ public class CheckpointCoordinator {
});
}
- private void handleCoordinatorError(String message, Throwable e,
CheckpointCloseReason reason) {
+ @VisibleForTesting
+ public void handleCoordinatorError(String message, Throwable e,
CheckpointCloseReason reason) {
LOG.error(message, e);
handleCoordinatorError(reason, e);
}
@@ -277,8 +279,7 @@ public class CheckpointCoordinator {
checkpointCoordinatorFuture.complete(
new CheckpointCoordinatorState(
CheckpointCoordinatorStatus.FAILED,
errorByPhysicalVertex.get()));
- checkpointManager.handleCheckpointError(
- pipelineId,
reason.equals(CheckpointCloseReason.CHECKPOINT_NOTIFY_COMPLETE_FAILED));
+ checkpointManager.handleCheckpointError(pipelineId, false);
}
private void restoreTaskState(TaskLocation taskLocation) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 941af4b791..d915cb5a79 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -44,6 +44,7 @@ import
org.apache.seatunnel.engine.server.task.operation.TaskOperation;
import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
+import com.google.common.annotations.VisibleForTesting;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
@@ -188,7 +189,8 @@ public class CheckpointManager {
getCheckpointCoordinator(taskLocation).reportCheckpointErrorFromTask(errorMsg);
}
- private CheckpointCoordinator getCheckpointCoordinator(int pipelineId) {
+ @VisibleForTesting
+ public CheckpointCoordinator getCheckpointCoordinator(int pipelineId) {
CheckpointCoordinator coordinator = coordinatorMap.get(pipelineId);
if (coordinator == null) {
throw new RuntimeException(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
index 5c5c9a132d..dd886d923e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.apache.seatunnel.engine.server.TestUtils;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointCloseReason;
import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
@@ -51,8 +52,6 @@ import static org.awaitility.Awaitility.await;
@DisabledOnOs(OS.WINDOWS)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class JobMasterTest extends AbstractSeaTunnelServerTest {
- private static Long JOB_ID;
-
/**
* IMap key is jobId and value is a Tuple2 Tuple2 key is JobMaster init
timestamp and value is
* the jobImmutableInformation which is sent by client when submit job
@@ -103,40 +102,12 @@ public class JobMasterTest extends
AbstractSeaTunnelServerTest {
@BeforeAll
public void before() {
super.before();
- JOB_ID =
instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME).newId();
}
@Test
public void testHandleCheckpointTimeout() throws Exception {
- LogicalDag testLogicalDag =
- TestUtils.createTestLogicalPlan(
- "stream_fakesource_to_file.conf",
"test_clear_coordinator_service", JOB_ID);
-
- JobImmutableInformation jobImmutableInformation =
- new JobImmutableInformation(
- JOB_ID,
- "Test",
-
nodeEngine.getSerializationService().toData(testLogicalDag),
- testLogicalDag.getJobConfig(),
- Collections.emptyList(),
- Collections.emptyList());
-
- Data data =
nodeEngine.getSerializationService().toData(jobImmutableInformation);
-
- PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
- server.getCoordinatorService().submitJob(JOB_ID, data);
- voidPassiveCompletableFuture.join();
-
- JobMaster jobMaster =
server.getCoordinatorService().getJobMaster(JOB_ID);
-
- // waiting for job status turn to running
- await().atMost(120000, TimeUnit.MILLISECONDS)
- .untilAsserted(
- () -> Assertions.assertEquals(JobStatus.RUNNING,
jobMaster.getJobStatus()));
-
- // Because handleCheckpointTimeout is an async method, so we need
sleep 5s to waiting job
- // status become running again
- Thread.sleep(5000);
+ long jobId =
instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME).newId();
+ JobMaster jobMaster = newJobInstanceWithRunningState(jobId);
jobMaster.neverNeedRestore();
// call checkpoint timeout
@@ -164,10 +135,10 @@ public class JobMasterTest extends
AbstractSeaTunnelServerTest {
.get()
.getStatus()))));
- testIMapRemovedAfterJobComplete(jobMaster);
+ testIMapRemovedAfterJobComplete(jobId, jobMaster);
}
- private void testIMapRemovedAfterJobComplete(JobMaster jobMaster) {
+ private void testIMapRemovedAfterJobComplete(long jobId, JobMaster
jobMaster) {
runningJobInfoIMap =
nodeEngine.getHazelcastInstance().getMap("runningJobInfo");
runningJobStateIMap =
nodeEngine.getHazelcastInstance().getMap("runningJobState");
runningJobStateTimestampsIMap =
nodeEngine.getHazelcastInstance().getMap("stateTimestamps");
@@ -176,10 +147,10 @@ public class JobMasterTest extends
AbstractSeaTunnelServerTest {
await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
-
Assertions.assertNull(runningJobInfoIMap.get(JOB_ID));
-
Assertions.assertNull(runningJobStateIMap.get(JOB_ID));
-
Assertions.assertNull(runningJobStateTimestampsIMap.get(JOB_ID));
-
Assertions.assertNull(ownedSlotProfilesIMap.get(JOB_ID));
+
Assertions.assertNull(runningJobInfoIMap.get(jobId));
+
Assertions.assertNull(runningJobStateIMap.get(jobId));
+
Assertions.assertNull(runningJobStateTimestampsIMap.get(jobId));
+
Assertions.assertNull(ownedSlotProfilesIMap.get(jobId));
jobMaster
.getPhysicalPlan()
@@ -231,4 +202,53 @@ public class JobMasterTest extends
AbstractSeaTunnelServerTest {
});
});
}
+
+ /**
+ * Checks that an error handled by the pipeline-1 checkpoint coordinator with reason
+ * AGGREGATE_COMMIT_ERROR leaves the job master reporting {@code isNeedRestore() == true}.
+ *
+ * <p>NOTE(review): the job submitted here is never cancelled; with the PER_CLASS test
+ * lifecycle it may keep running alongside other tests — consider cleaning it up.
+ */
+ @Test
+ public void testCommitFailedWillRestore() throws Exception {
+ long jobId =
instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME).newId();
+ JobMaster jobMaster = newJobInstanceWithRunningState(jobId);
+
+ // simulate a commit-phase failure reported to the checkpoint coordinator of pipeline 1
+ // (assumes the test config yields a pipeline with id 1 — TODO confirm)
+ jobMaster
+ .getCheckpointManager()
+ .getCheckpointCoordinator(1)
+ .handleCoordinatorError(
+ "commit failed",
+ new RuntimeException(),
+ CheckpointCloseReason.AGGREGATE_COMMIT_ERROR);
+ // NOTE(review): asserted immediately; if error handling completes asynchronously this
+ // check could be racy — confirm, or wrap it in await().untilAsserted(...)
+ Assertions.assertTrue(jobMaster.isNeedRestore());
+ }
+
+ /**
+ * Submits a job built from {@code stream_fakesource_to_file.conf} under the given id, waits
+ * (up to 120s) until its status is RUNNING, then pauses a further 5s before returning it.
+ *
+ * @param jobId id used both for building the logical plan and for submitting the job
+ * @return the {@link JobMaster} of the submitted job
+ * @throws InterruptedException if interrupted during the final fixed pause
+ */
+ private JobMaster newJobInstanceWithRunningState(long jobId) throws
InterruptedException {
+ LogicalDag testLogicalDag =
+ TestUtils.createTestLogicalPlan(
+ "stream_fakesource_to_file.conf",
"test_clear_coordinator_service", jobId);
+
+ JobImmutableInformation jobImmutableInformation =
+ new JobImmutableInformation(
+ jobId,
+ "Test",
+
nodeEngine.getSerializationService().toData(testLogicalDag),
+ testLogicalDag.getJobConfig(),
+ Collections.emptyList(),
+ Collections.emptyList());
+
+ Data data =
nodeEngine.getSerializationService().toData(jobImmutableInformation);
+
+ PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
+ server.getCoordinatorService().submitJob(jobId, data);
+ voidPassiveCompletableFuture.join();
+
+ JobMaster jobMaster =
server.getCoordinatorService().getJobMaster(jobId);
+
+ // wait until the job status becomes RUNNING
+ await().atMost(120000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> Assertions.assertEquals(JobStatus.RUNNING,
jobMaster.getJobStatus()));
+
+ // Fixed pause carried over from testHandleCheckpointTimeout, where it was justified by
+ // handleCheckpointTimeout being asynchronous. NOTE(review): presumably gives the job time
+ // to settle after reaching RUNNING — confirm, and prefer await() on a concrete condition.
+ Thread.sleep(5000);
+ return jobMaster;
+ }
}