This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 88d1bfdfa8 [Fix][Zeta] Fix start with savepoint with no checkpoint file error (#6215)
88d1bfdfa8 is described below
commit 88d1bfdfa8df8f81a0106a31e78d0791d0415b5d
Author: Jia Fan <[email protected]>
AuthorDate: Tue Jan 16 12:01:21 2024 +0800
[Fix][Zeta] Fix start with savepoint with no checkpoint file error (#6215)
---
.../server/checkpoint/CheckpointManager.java | 14 +++--
.../engine/server/checkpoint/SavePointTest.java | 72 ++++++++++++----------
2 files changed, 49 insertions(+), 37 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index d915cb5a79..7f2b25b955 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -121,12 +121,14 @@ public class CheckpointManager {
String.valueOf(jobId),
String.valueOf(
plan.getPipelineId()));
- long checkpointId =
pipelineState.getCheckpointId();
- idCounter.setCount(checkpointId +
1);
- log.info(
- "pipeline({}) start with
savePoint on checkPointId({})",
- plan.getPipelineId(),
- checkpointId);
+ if (pipelineState != null) {
+ long checkpointId =
pipelineState.getCheckpointId();
+
idCounter.setCount(checkpointId + 1);
+ log.info(
+ "pipeline({}) start
with savePoint on checkPointId({})",
+ plan.getPipelineId(),
+ checkpointId);
+ }
}
return new CheckpointCoordinator(
this,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
index 23b22b183c..4f2d3271ce 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
@@ -41,9 +41,10 @@ import java.util.concurrent.TimeUnit;
import static org.awaitility.Awaitility.await;
@DisabledOnOs(OS.WINDOWS)
-public class SavePointTest extends AbstractSeaTunnelServerTest {
+public class SavePointTest extends AbstractSeaTunnelServerTest<SavePointTest> {
public static String OUT_PATH = "/tmp/hive/warehouse/test3";
- public static String CONF_PATH =
"stream_fakesource_to_file_savepoint.conf";
+ public static String STREAM_CONF_PATH =
"stream_fakesource_to_file_savepoint.conf";
+ public static String BATCH_CONF_PATH = "batch_fakesource_to_file.conf";
public static long JOB_ID = 823342L;
@Test
@@ -57,12 +58,24 @@ public class SavePointTest extends AbstractSeaTunnelServerTest {
Assertions.assertThrows(
CompletionException.class,
() ->
server.getCoordinatorService().savePoint(1L).join());
- Assertions.assertTrue(exception.getCause() instanceof
SavePointFailedException);
+ Assertions.assertInstanceOf(SavePointFailedException.class,
exception.getCause());
Assertions.assertEquals(
"The job with id '1' not running, save point failed",
exception.getCause().getMessage());
}
+ @Test
+ public void testRestoreWithNoSavepointFile() {
+ long jobId = System.currentTimeMillis();
+ startJob(jobId, BATCH_CONF_PATH, true);
+ await().atMost(120000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+
server.getCoordinatorService().getJobStatus(jobId),
+ JobStatus.FINISHED));
+ }
+
@Test
@Disabled()
public void testSavePointOnServerRestart() throws InterruptedException {
@@ -74,18 +87,18 @@ public class SavePointTest extends AbstractSeaTunnelServerTest {
FileUtils.createNewDir(OUT_PATH);
// 1 Start a streaming mode job
- startJob(JOB_ID, CONF_PATH, false);
+ startJob(JOB_ID, STREAM_CONF_PATH, false);
// 2 Wait for the job to running and start outputting data
await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
- () -> {
- Assertions.assertTrue(
- server.getCoordinatorService()
- .getJobStatus(JOB_ID)
- .equals(JobStatus.RUNNING)
- &&
FileUtils.getFileLineNumberFromDir(OUT_PATH) > 10);
- });
+ () ->
+ Assertions.assertTrue(
+ server.getCoordinatorService()
+ .getJobStatus(JOB_ID)
+
.equals(JobStatus.RUNNING)
+ &&
FileUtils.getFileLineNumberFromDir(OUT_PATH)
+ > 10));
// 3 start savePoint
server.getCoordinatorService().savePoint(JOB_ID);
@@ -99,11 +112,10 @@ public class SavePointTest extends AbstractSeaTunnelServerTest {
// 4 Wait for savePoint to complete
await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
- () -> {
- Assertions.assertEquals(
-
server.getCoordinatorService().getJobStatus(JOB_ID),
- JobStatus.SAVEPOINT_DONE);
- });
+ () ->
+ Assertions.assertEquals(
+
server.getCoordinatorService().getJobStatus(JOB_ID),
+ JobStatus.SAVEPOINT_DONE));
Thread.sleep(1000);
@@ -115,15 +127,14 @@ public class SavePointTest extends AbstractSeaTunnelServerTest {
Thread.sleep(1000);
// 5 Resume from savePoint
- startJob(JOB_ID, CONF_PATH, true);
+ startJob(JOB_ID, STREAM_CONF_PATH, true);
await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
- () -> {
- Assertions.assertEquals(
-
server.getCoordinatorService().getJobStatus(JOB_ID),
- JobStatus.RUNNING);
- });
+ () ->
+ Assertions.assertEquals(
+
server.getCoordinatorService().getJobStatus(JOB_ID),
+ JobStatus.RUNNING));
// 6 Run long enough to ensure that the data write is complete
Thread.sleep(30000);
@@ -132,11 +143,10 @@ public class SavePointTest extends AbstractSeaTunnelServerTest {
await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
- () -> {
- Assertions.assertEquals(
-
server.getCoordinatorService().getJobStatus(JOB_ID),
- JobStatus.CANCELED);
- });
+ () ->
+ Assertions.assertEquals(
+
server.getCoordinatorService().getJobStatus(JOB_ID),
+ JobStatus.CANCELED));
// 7 Check the final data count
Assertions.assertEquals(100,
FileUtils.getFileLineNumberFromDir(OUT_PATH));
@@ -144,12 +154,12 @@ public class SavePointTest extends AbstractSeaTunnelServerTest {
Thread.sleep(1000);
}
- private void startJob(Long jobid, String path, boolean
isStartWithSavePoint) {
- LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path,
jobid.toString(), jobid);
+ private void startJob(Long jobId, String path, boolean
isStartWithSavePoint) {
+ LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path,
jobId.toString(), jobId);
JobImmutableInformation jobImmutableInformation =
new JobImmutableInformation(
- jobid,
+ jobId,
"Test",
isStartWithSavePoint,
nodeEngine.getSerializationService().toData(testLogicalDag),
@@ -160,7 +170,7 @@ public class SavePointTest extends AbstractSeaTunnelServerTest {
Data data =
nodeEngine.getSerializationService().toData(jobImmutableInformation);
PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
- server.getCoordinatorService().submitJob(jobid, data);
+ server.getCoordinatorService().submitJob(jobId, data);
voidPassiveCompletableFuture.join();
}
}