This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 88d1bfdfa8 [Fix][Zeta] Fix start with savepoint with no checkpoint file error (#6215)
88d1bfdfa8 is described below

commit 88d1bfdfa8df8f81a0106a31e78d0791d0415b5d
Author: Jia Fan <[email protected]>
AuthorDate: Tue Jan 16 12:01:21 2024 +0800

    [Fix][Zeta] Fix start with savepoint with no checkpoint file error (#6215)
---
 .../server/checkpoint/CheckpointManager.java       | 14 +++--
 .../engine/server/checkpoint/SavePointTest.java    | 72 ++++++++++++----------
 2 files changed, 49 insertions(+), 37 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index d915cb5a79..7f2b25b955 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -121,12 +121,14 @@ public class CheckpointManager {
                                                                     
String.valueOf(jobId),
                                                                     
String.valueOf(
                                                                             
plan.getPipelineId()));
-                                            long checkpointId = 
pipelineState.getCheckpointId();
-                                            idCounter.setCount(checkpointId + 
1);
-                                            log.info(
-                                                    "pipeline({}) start with 
savePoint on checkPointId({})",
-                                                    plan.getPipelineId(),
-                                                    checkpointId);
+                                            if (pipelineState != null) {
+                                                long checkpointId = 
pipelineState.getCheckpointId();
+                                                
idCounter.setCount(checkpointId + 1);
+                                                log.info(
+                                                        "pipeline({}) start 
with savePoint on checkPointId({})",
+                                                        plan.getPipelineId(),
+                                                        checkpointId);
+                                            }
                                         }
                                         return new CheckpointCoordinator(
                                                 this,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
index 23b22b183c..4f2d3271ce 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
@@ -41,9 +41,10 @@ import java.util.concurrent.TimeUnit;
 import static org.awaitility.Awaitility.await;
 
 @DisabledOnOs(OS.WINDOWS)
-public class SavePointTest extends AbstractSeaTunnelServerTest {
+public class SavePointTest extends AbstractSeaTunnelServerTest<SavePointTest> {
     public static String OUT_PATH = "/tmp/hive/warehouse/test3";
-    public static String CONF_PATH = 
"stream_fakesource_to_file_savepoint.conf";
+    public static String STREAM_CONF_PATH = 
"stream_fakesource_to_file_savepoint.conf";
+    public static String BATCH_CONF_PATH = "batch_fakesource_to_file.conf";
     public static long JOB_ID = 823342L;
 
     @Test
@@ -57,12 +58,24 @@ public class SavePointTest extends 
AbstractSeaTunnelServerTest {
                 Assertions.assertThrows(
                         CompletionException.class,
                         () -> 
server.getCoordinatorService().savePoint(1L).join());
-        Assertions.assertTrue(exception.getCause() instanceof 
SavePointFailedException);
+        Assertions.assertInstanceOf(SavePointFailedException.class, 
exception.getCause());
         Assertions.assertEquals(
                 "The job with id '1' not running, save point failed",
                 exception.getCause().getMessage());
     }
 
+    @Test
+    public void testRestoreWithNoSavepointFile() {
+        long jobId = System.currentTimeMillis();
+        startJob(jobId, BATCH_CONF_PATH, true);
+        await().atMost(120000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        
server.getCoordinatorService().getJobStatus(jobId),
+                                        JobStatus.FINISHED));
+    }
+
     @Test
     @Disabled()
     public void testSavePointOnServerRestart() throws InterruptedException {
@@ -74,18 +87,18 @@ public class SavePointTest extends 
AbstractSeaTunnelServerTest {
         FileUtils.createNewDir(OUT_PATH);
 
         // 1 Start a streaming mode job
-        startJob(JOB_ID, CONF_PATH, false);
+        startJob(JOB_ID, STREAM_CONF_PATH, false);
 
         // 2 Wait for the job to running and start outputting data
         await().atMost(120000, TimeUnit.MILLISECONDS)
                 .untilAsserted(
-                        () -> {
-                            Assertions.assertTrue(
-                                    server.getCoordinatorService()
-                                                    .getJobStatus(JOB_ID)
-                                                    .equals(JobStatus.RUNNING)
-                                            && 
FileUtils.getFileLineNumberFromDir(OUT_PATH) > 10);
-                        });
+                        () ->
+                                Assertions.assertTrue(
+                                        server.getCoordinatorService()
+                                                        .getJobStatus(JOB_ID)
+                                                        
.equals(JobStatus.RUNNING)
+                                                && 
FileUtils.getFileLineNumberFromDir(OUT_PATH)
+                                                        > 10));
 
         // 3 start savePoint
         server.getCoordinatorService().savePoint(JOB_ID);
@@ -99,11 +112,10 @@ public class SavePointTest extends 
AbstractSeaTunnelServerTest {
         // 4 Wait for savePoint to complete
         await().atMost(120000, TimeUnit.MILLISECONDS)
                 .untilAsserted(
-                        () -> {
-                            Assertions.assertEquals(
-                                    
server.getCoordinatorService().getJobStatus(JOB_ID),
-                                    JobStatus.SAVEPOINT_DONE);
-                        });
+                        () ->
+                                Assertions.assertEquals(
+                                        
server.getCoordinatorService().getJobStatus(JOB_ID),
+                                        JobStatus.SAVEPOINT_DONE));
 
         Thread.sleep(1000);
 
@@ -115,15 +127,14 @@ public class SavePointTest extends 
AbstractSeaTunnelServerTest {
         Thread.sleep(1000);
 
         // 5 Resume from savePoint
-        startJob(JOB_ID, CONF_PATH, true);
+        startJob(JOB_ID, STREAM_CONF_PATH, true);
 
         await().atMost(120000, TimeUnit.MILLISECONDS)
                 .untilAsserted(
-                        () -> {
-                            Assertions.assertEquals(
-                                    
server.getCoordinatorService().getJobStatus(JOB_ID),
-                                    JobStatus.RUNNING);
-                        });
+                        () ->
+                                Assertions.assertEquals(
+                                        
server.getCoordinatorService().getJobStatus(JOB_ID),
+                                        JobStatus.RUNNING));
 
         // 6 Run long enough to ensure that the data write is complete
         Thread.sleep(30000);
@@ -132,11 +143,10 @@ public class SavePointTest extends 
AbstractSeaTunnelServerTest {
 
         await().atMost(120000, TimeUnit.MILLISECONDS)
                 .untilAsserted(
-                        () -> {
-                            Assertions.assertEquals(
-                                    
server.getCoordinatorService().getJobStatus(JOB_ID),
-                                    JobStatus.CANCELED);
-                        });
+                        () ->
+                                Assertions.assertEquals(
+                                        
server.getCoordinatorService().getJobStatus(JOB_ID),
+                                        JobStatus.CANCELED));
 
         // 7 Check the final data count
         Assertions.assertEquals(100, 
FileUtils.getFileLineNumberFromDir(OUT_PATH));
@@ -144,12 +154,12 @@ public class SavePointTest extends 
AbstractSeaTunnelServerTest {
         Thread.sleep(1000);
     }
 
-    private void startJob(Long jobid, String path, boolean 
isStartWithSavePoint) {
-        LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path, 
jobid.toString(), jobid);
+    private void startJob(Long jobId, String path, boolean 
isStartWithSavePoint) {
+        LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path, 
jobId.toString(), jobId);
 
         JobImmutableInformation jobImmutableInformation =
                 new JobImmutableInformation(
-                        jobid,
+                        jobId,
                         "Test",
                         isStartWithSavePoint,
                         
nodeEngine.getSerializationService().toData(testLogicalDag),
@@ -160,7 +170,7 @@ public class SavePointTest extends 
AbstractSeaTunnelServerTest {
         Data data = 
nodeEngine.getSerializationService().toData(jobImmutableInformation);
 
         PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
-                server.getCoordinatorService().submitJob(jobid, data);
+                server.getCoordinatorService().submitJob(jobId, data);
         voidPassiveCompletableFuture.join();
     }
 }

Reply via email to