This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 333dc99cab [Improve][Zeta] Add restore when commit failed (#6101)
333dc99cab is described below

commit 333dc99cab0821c1022e8b5b302728ff16458a7c
Author: Jia Fan <[email protected]>
AuthorDate: Thu Jan 11 17:37:55 2024 +0800

    [Improve][Zeta] Add restore when commit failed (#6101)
    
    * [Improve][Zeta] Add restore when commit failed
    
    * update
    
    * update
---
 .../server/checkpoint/CheckpointCoordinator.java   |  7 +-
 .../server/checkpoint/CheckpointManager.java       |  4 +-
 .../engine/server/master/JobMasterTest.java        | 96 +++++++++++++---------
 3 files changed, 65 insertions(+), 42 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index b9bd7b6498..6a88f169fc 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -45,6 +45,7 @@ import 
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.hazelcast.map.IMap;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 import lombok.Getter;
@@ -260,7 +261,8 @@ public class CheckpointCoordinator {
                         });
     }
 
-    private void handleCoordinatorError(String message, Throwable e, 
CheckpointCloseReason reason) {
+    @VisibleForTesting
+    public void handleCoordinatorError(String message, Throwable e, 
CheckpointCloseReason reason) {
         LOG.error(message, e);
         handleCoordinatorError(reason, e);
     }
@@ -277,8 +279,7 @@ public class CheckpointCoordinator {
         checkpointCoordinatorFuture.complete(
                 new CheckpointCoordinatorState(
                         CheckpointCoordinatorStatus.FAILED, 
errorByPhysicalVertex.get()));
-        checkpointManager.handleCheckpointError(
-                pipelineId, 
reason.equals(CheckpointCloseReason.CHECKPOINT_NOTIFY_COMPLETE_FAILED));
+        checkpointManager.handleCheckpointError(pipelineId, false);
     }
 
     private void restoreTaskState(TaskLocation taskLocation) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 941af4b791..d915cb5a79 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -44,6 +44,7 @@ import 
org.apache.seatunnel.engine.server.task.operation.TaskOperation;
 import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
 import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.hazelcast.map.IMap;
 import com.hazelcast.spi.impl.NodeEngine;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
@@ -188,7 +189,8 @@ public class CheckpointManager {
         
getCheckpointCoordinator(taskLocation).reportCheckpointErrorFromTask(errorMsg);
     }
 
-    private CheckpointCoordinator getCheckpointCoordinator(int pipelineId) {
+    @VisibleForTesting
+    public CheckpointCoordinator getCheckpointCoordinator(int pipelineId) {
         CheckpointCoordinator coordinator = coordinatorMap.get(pipelineId);
         if (coordinator == null) {
             throw new RuntimeException(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
index 5c5c9a132d..dd886d923e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
 import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
 import org.apache.seatunnel.engine.server.TestUtils;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointCloseReason;
 import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
@@ -51,8 +52,6 @@ import static org.awaitility.Awaitility.await;
 @DisabledOnOs(OS.WINDOWS)
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class JobMasterTest extends AbstractSeaTunnelServerTest {
-    private static Long JOB_ID;
-
     /**
      * IMap key is jobId and value is a Tuple2. The Tuple2 key is the JobMaster init timestamp and
      * its value is the jobImmutableInformation which is sent by the client when submitting the job
@@ -103,40 +102,12 @@ public class JobMasterTest extends 
AbstractSeaTunnelServerTest {
     @BeforeAll
     public void before() {
         super.before();
-        JOB_ID = 
instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME).newId();
     }
 
     @Test
     public void testHandleCheckpointTimeout() throws Exception {
-        LogicalDag testLogicalDag =
-                TestUtils.createTestLogicalPlan(
-                        "stream_fakesource_to_file.conf", 
"test_clear_coordinator_service", JOB_ID);
-
-        JobImmutableInformation jobImmutableInformation =
-                new JobImmutableInformation(
-                        JOB_ID,
-                        "Test",
-                        
nodeEngine.getSerializationService().toData(testLogicalDag),
-                        testLogicalDag.getJobConfig(),
-                        Collections.emptyList(),
-                        Collections.emptyList());
-
-        Data data = 
nodeEngine.getSerializationService().toData(jobImmutableInformation);
-
-        PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
-                server.getCoordinatorService().submitJob(JOB_ID, data);
-        voidPassiveCompletableFuture.join();
-
-        JobMaster jobMaster = 
server.getCoordinatorService().getJobMaster(JOB_ID);
-
-        // waiting for job status turn to running
-        await().atMost(120000, TimeUnit.MILLISECONDS)
-                .untilAsserted(
-                        () -> Assertions.assertEquals(JobStatus.RUNNING, 
jobMaster.getJobStatus()));
-
-        // Because handleCheckpointTimeout is an async method, so we need 
sleep 5s to waiting job
-        // status become running again
-        Thread.sleep(5000);
+        long jobId = 
instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME).newId();
+        JobMaster jobMaster = newJobInstanceWithRunningState(jobId);
 
         jobMaster.neverNeedRestore();
         // call checkpoint timeout
@@ -164,10 +135,10 @@ public class JobMasterTest extends 
AbstractSeaTunnelServerTest {
                                                                         .get()
                                                                         
.getStatus()))));
 
-        testIMapRemovedAfterJobComplete(jobMaster);
+        testIMapRemovedAfterJobComplete(jobId, jobMaster);
     }
 
-    private void testIMapRemovedAfterJobComplete(JobMaster jobMaster) {
+    private void testIMapRemovedAfterJobComplete(long jobId, JobMaster 
jobMaster) {
         runningJobInfoIMap = 
nodeEngine.getHazelcastInstance().getMap("runningJobInfo");
         runningJobStateIMap = 
nodeEngine.getHazelcastInstance().getMap("runningJobState");
         runningJobStateTimestampsIMap = 
nodeEngine.getHazelcastInstance().getMap("stateTimestamps");
@@ -176,10 +147,10 @@ public class JobMasterTest extends 
AbstractSeaTunnelServerTest {
         await().atMost(60000, TimeUnit.MILLISECONDS)
                 .untilAsserted(
                         () -> {
-                            
Assertions.assertNull(runningJobInfoIMap.get(JOB_ID));
-                            
Assertions.assertNull(runningJobStateIMap.get(JOB_ID));
-                            
Assertions.assertNull(runningJobStateTimestampsIMap.get(JOB_ID));
-                            
Assertions.assertNull(ownedSlotProfilesIMap.get(JOB_ID));
+                            
Assertions.assertNull(runningJobInfoIMap.get(jobId));
+                            
Assertions.assertNull(runningJobStateIMap.get(jobId));
+                            
Assertions.assertNull(runningJobStateTimestampsIMap.get(jobId));
+                            
Assertions.assertNull(ownedSlotProfilesIMap.get(jobId));
 
                             jobMaster
                                     .getPhysicalPlan()
@@ -231,4 +202,53 @@ public class JobMasterTest extends 
AbstractSeaTunnelServerTest {
                                             });
                         });
     }
+
+    @Test
+    public void testCommitFailedWillRestore() throws Exception {
+        long jobId = 
instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME).newId();
+        JobMaster jobMaster = newJobInstanceWithRunningState(jobId);
+
+        // simulate a commit failure being reported to the checkpoint coordinator
+        jobMaster
+                .getCheckpointManager()
+                .getCheckpointCoordinator(1)
+                .handleCoordinatorError(
+                        "commit failed",
+                        new RuntimeException(),
+                        CheckpointCloseReason.AGGREGATE_COMMIT_ERROR);
+        Assertions.assertTrue(jobMaster.isNeedRestore());
+    }
+
+    private JobMaster newJobInstanceWithRunningState(long jobId) throws 
InterruptedException {
+        LogicalDag testLogicalDag =
+                TestUtils.createTestLogicalPlan(
+                        "stream_fakesource_to_file.conf", 
"test_clear_coordinator_service", jobId);
+
+        JobImmutableInformation jobImmutableInformation =
+                new JobImmutableInformation(
+                        jobId,
+                        "Test",
+                        
nodeEngine.getSerializationService().toData(testLogicalDag),
+                        testLogicalDag.getJobConfig(),
+                        Collections.emptyList(),
+                        Collections.emptyList());
+
+        Data data = 
nodeEngine.getSerializationService().toData(jobImmutableInformation);
+
+        PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
+                server.getCoordinatorService().submitJob(jobId, data);
+        voidPassiveCompletableFuture.join();
+
+        JobMaster jobMaster = 
server.getCoordinatorService().getJobMaster(jobId);
+
+        // waiting for job status turn to running
+        await().atMost(120000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> Assertions.assertEquals(JobStatus.RUNNING, 
jobMaster.getJobStatus()));
+
+        // Because handleCheckpointTimeout is an async method, we need to sleep 5s to wait for the
+        // job status to become running again
+        Thread.sleep(5000);
+        return jobMaster;
+    }
 }

Reply via email to