This is an automated email from the ASF dual-hosted git repository.
JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 73b3f5eba42 [fix](streamingjob) Persist cdc_stream TVF offset across
FE checkpoint (#62902)
73b3f5eba42 is described below
commit 73b3f5eba421ea969b2899d253e6678dbd3cf3ea
Author: wudi <[email protected]>
AuthorDate: Fri May 15 09:51:39 2026 +0800
[fix](streamingjob) Persist cdc_stream TVF offset across FE checkpoint
(#62902)
### What problem does this PR solve?
Related PR: #62449
Problem Summary:
PR #62449 fixed streaming job offset state-loss after FE checkpoint
restart for the S3 path, but the cdc_stream TVF path has the same root
cause and worse impact: after a checkpoint restart in the binlog phase,
the job replays from the very beginning of the binlog (because
`currentOffset == null` falls through to a fresh `BinlogSplit` with no
`startingOffset`).
Root cause: `JdbcTvfSourceOffsetProvider.getPersistInfo()` returns
`null`, so `offsetProviderPersist` is never written into the FE image.
After a checkpoint, the pre-checkpoint journal is GC'd, so neither the
journal-replayed `currentOffset` nor any image-persisted state survives, and
recovery falls back to a fresh provider with empty `chw`/`bop`.
Only the non-cloud mode is affected. Cloud mode is fine because
`replayOnCloudMode` pulls a cumulative attachment from MS.
Fix — reuse the parent's existing `@SerializedName` persistence of `chw`
(`chunkHighWatermarkMap`), `bop` (`binlogOffsetPersist`) and `ts`
(`tableSchemas`):
- Drop the `getPersistInfo()` override so the parent's
`GsonUtils.GSON.toJson(this)` writes `chw/bop/ts` into the image.
- Add a `restoreFromPersistInfo()` override to read them back on FE
startup (called from `gsonPostProcess`).
- In the `updateOffset` binlog branch, mirror `startingOffset` into
`binlogOffsetPersist` so it is written to the image and survives the
checkpoint (`currentOffset` has no `@SerializedName`).
- In the `replayIfNeed` `currentOffset == null` branch, rebuild the
`BinlogSplit` from `bop` when it is present; otherwise (snapshot phase)
rebuild the remaining splits from the meta table, applying `chw` via the
existing `null.null` remap.
---
.../offset/jdbc/JdbcTvfSourceOffsetProvider.java | 71 +++--
...b_cdc_stream_postgres_checkpoint_restart_fe.out | 12 +
...dc_stream_postgres_checkpoint_restart_fe.groovy | 298 +++++++++++++++++++++
3 files changed, 363 insertions(+), 18 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java
index 749b592c775..b1d860c9430 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java
@@ -34,6 +34,7 @@ import org.apache.doris.nereids.trees.expressions.Properties;
import org.apache.doris.nereids.trees.plans.Plan;
import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PRequestCdcClientResult;
import org.apache.doris.rpc.BackendServiceProxy;
@@ -77,6 +78,9 @@ import java.util.stream.Collectors;
* chunkHighWatermarkMap is always updated unconditionally to support
recovery</li>
* <li>replayIfNeed: checks currentOffset directly — snapshot triggers
remainingSplits rebuild
* from meta + chunkHighWatermarkMap; binlog needs no action
(currentOffset already set)</li>
+ * <li>image persistence: chw/bop/ts also flow through getPersistInfo
(inherited) so state
+ * survives FE checkpoint after pre-checkpoint journal is GC'd;
restoreFromPersistInfo
+ * reads them back on startup</li>
* </ul>
*/
@Log4j2
@@ -115,10 +119,7 @@ public class JdbcTvfSourceOffsetProvider extends
JdbcSourceOffsetProvider {
if (this.jobId != null) {
return;
}
- // One-time initialization below — safe to skip on FE restart because
the provider
- // is reconstructed fresh (getPersistInfo returns null), so jobId is
null then too.
this.jobId = jobId;
- this.chunkHighWatermarkMap = new HashMap<>();
this.sourceType = resolvedType;
String table = originTvfProps.get(DataSourceConfigKeys.TABLE);
Preconditions.checkArgument(table != null, "table is required for
cdc_stream TVF");
@@ -273,7 +274,8 @@ public class JdbcTvfSourceOffsetProvider extends
JdbcSourceOffsetProvider {
* adds it to finishedSplits. During txn replay remainingSplits is empty
so removeIf returns
* false naturally — chunkHighWatermarkMap is still updated for
replayIfNeed to use later.
*
- * <p>Binlog: currentOffset is set above; no extra state needed for TVF
recovery path.
+ * <p>Binlog: currentOffset is set above. Also mirror startingOffset into
binlogOffsetPersist
+ * so it survives FE checkpoint via image (currentOffset has no
@SerializedName).
*/
@Override
public void updateOffset(Offset offset) {
@@ -297,13 +299,19 @@ public class JdbcTvfSourceOffsetProvider extends
JdbcSourceOffsetProvider {
chunkHighWatermarkMap.computeIfAbsent(buildTableKey(), k ->
new HashMap<>())
.put(ss.getSplitId(), ss.getHighWatermark());
}
+ } else {
+ // Mirror binlog offset into bop so it survives FE checkpoint
+ BinlogSplit bs = (BinlogSplit) currentOffset.getSplits().get(0);
+ if (MapUtils.isNotEmpty(bs.getStartingOffset())) {
+ binlogOffsetPersist = new HashMap<>(bs.getStartingOffset());
+ binlogOffsetPersist.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
+ }
}
- // Binlog: currentOffset is already set; no binlogOffsetPersist needed
for TVF path.
}
/**
- * TVF path recovery: offsetProviderPersist is always null (no EditLog
write).
- * currentOffset is set by replayOnCommitted/replayOnCloudMode ->
updateOffset before this runs.
+ * TVF path recovery. After FE checkpoint the pre-checkpoint journal is
GC'd, so currentOffset
+ * (no @SerializedName) may be null with prior commits — fall back to
bop/chw restored from image.
*
* <ul>
* <li>snapshot: mid-snapshot restart — rebuild remainingSplits from
meta + chunkHighWatermarkMap</li>
@@ -313,14 +321,28 @@ public class JdbcTvfSourceOffsetProvider extends
JdbcSourceOffsetProvider {
@Override
public void replayIfNeed(StreamingInsertJob job) throws JobException {
if (currentOffset == null) {
- // No committed txn yet. If snapshot splits exist in the meta
table (written by
- // initOnCreate), restore remainingSplits so getNextOffset()
returns snapshot splits
- // instead of a BinlogSplit (which would incorrectly skip the
snapshot phase).
+ // Post-checkpoint binlog: rebuild from bop persisted in image
+ if (MapUtils.isNotEmpty(binlogOffsetPersist)) {
+ currentOffset = new JdbcOffset(
+ Collections.singletonList(new
BinlogSplit(binlogOffsetPersist)));
+ log.info("Replaying TVF offset provider for job {}: restored
binlog offset from persist",
+ job.getJobId());
+ return;
+ }
+ // Fresh-create or post-checkpoint mid-snapshot: restore
remainingSplits from meta
+ // so getNextOffset() returns snapshot splits instead of an empty
BinlogSplit.
Map<String, List<SnapshotSplit>> snapshotSplits =
StreamingJobUtils.restoreSplitsToJob(job.getJobId());
if (MapUtils.isNotEmpty(snapshotSplits)) {
- recalculateRemainingSplits(new HashMap<>(), snapshotSplits);
- log.info("Replaying TVF offset provider for job {}: no
committed txn,"
- + " restored {} remaining splits from meta",
job.getJobId(), remainingSplits.size());
+ // chw outer key may be "null.null" during journal replay
(sourceProperties uninitialized); remap
+ Map<String, Map<String, Map<String, String>>> effective =
+ MapUtils.isNotEmpty(chunkHighWatermarkMap)
+ ? remapChunkHighWatermarkMap(snapshotSplits)
+ : new HashMap<>();
+ recalculateRemainingSplits(effective, snapshotSplits);
+ log.info("Replaying TVF offset provider for job {}: no current
offset,"
+ + " restored {} remaining splits from meta (chw
size={})",
+ job.getJobId(), remainingSplits.size(),
+ chunkHighWatermarkMap == null ? 0 :
chunkHighWatermarkMap.size());
} else {
log.info("Replaying TVF offset provider for job {}: no
committed txn,"
+ " no snapshot splits in meta", job.getJobId());
@@ -406,13 +428,26 @@ public class JdbcTvfSourceOffsetProvider extends
JdbcSourceOffsetProvider {
}
/**
- * TVF path does not persist to EditLog; state is recovered via txn replay.
- * This override is defensive — the persistOffsetProviderIfNeed() call path
- * only runs in the non-TVF commitOffset flow and won't reach here.
+ * Restore chw/bop/ts from the image-persisted JSON. Called by
gsonPostProcess on FE startup
+ * before any journal replay; recovers state lost when pre-checkpoint
journal is GC'd.
*/
@Override
- public String getPersistInfo() {
- return null;
+ public void restoreFromPersistInfo(String persistInfo) {
+ if (persistInfo == null) {
+ return;
+ }
+ try {
+ JdbcSourceOffsetProvider tmp =
GsonUtils.GSON.fromJson(persistInfo, JdbcSourceOffsetProvider.class);
+ this.chunkHighWatermarkMap = tmp.getChunkHighWatermarkMap();
+ this.binlogOffsetPersist = tmp.getBinlogOffsetPersist();
+ this.tableSchemas = tmp.getTableSchemas();
+ log.info("Restored TVF offset provider from persist: chw={},
bop={}, ts.len={}",
+ chunkHighWatermarkMap == null ? 0 :
chunkHighWatermarkMap.size(),
+ binlogOffsetPersist == null ? 0 :
binlogOffsetPersist.size(),
+ tableSchemas == null ? 0 : tableSchemas.length());
+ } catch (Exception e) {
+ log.warn("Failed to restore TVF offset provider from persistInfo",
e);
+ }
}
@Override
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_checkpoint_restart_fe.out
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_checkpoint_restart_fe.out
new file mode 100644
index 00000000000..c351ac3cba5
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_checkpoint_restart_fe.out
@@ -0,0 +1,12 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !final_data --
+A1 1
+B1 2
+C1 3
+D1 4
+E1 5
+F1 6
+G1 7
+H1 8
+I1 9
+
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_checkpoint_restart_fe.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_checkpoint_restart_fe.groovy
new file mode 100644
index 00000000000..4e493b3fb8b
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_checkpoint_restart_fe.groovy
@@ -0,0 +1,298 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Test FE checkpoint-restart recovery of a cdc_stream TVF streaming job for
PostgreSQL.
+ *
+ * Counterpart of test_streaming_job_cdc_stream_postgres_restart_fe but with
the pre-checkpoint
+ * journal GC'd before each restart, so recovery cannot rely on EditLog txn
replay and must
+ * fall back to image-persisted bop/chw written via getPersistInfo (inherited
from parent) and
+ * read back by JdbcTvfSourceOffsetProvider.restoreFromPersistInfo.
+ *
+ * edit_log_roll_num=1 finalizes journals on every write so the next
checkpoint cycle picks
+ * them up; sleeping >= 90s after each commit ensures a checkpoint runs
(interval defaults
+ * to 60s) and the pre-checkpoint journal is then eligible for GC.
+ *
+ * Two checkpoint-restart scenarios are covered in sequence:
+ *
+ * Restart 1 — mid-snapshot:
+ * snapshot_split_size=1 splits 5 pre-existing rows (A1-E1) into 5 separate
tasks.
+ * After the first task succeeds and a checkpoint runs, FE is restarted.
This exercises
+ * replayIfNeed() with currentOffset == null but chunkHighWatermarkMap
restored from image:
+ * remainingSplits is rebuilt from the meta table with chw remap so
already-finished splits
+ * are not re-processed.
+ *
+ * Restart 2 — binlog phase:
+ * After the full snapshot completes and F1/G1 are consumed via binlog, a
checkpoint runs
+ * and FE is restarted. This exercises the binlog recovery path where
currentOffset == null
+ * but binlogOffsetPersist is restored from image: a BinlogSplit is rebuilt
from bop so
+ * the job resumes from the last committed binlog position rather than the
initial one.
+ * H1/I1 are then inserted to verify the job continues reading binlog
correctly.
+ */
+suite("test_streaming_job_cdc_stream_postgres_checkpoint_restart_fe",
+
"docker,p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_job_cdc_stream_pg_ckpt_restart_fe"
+ def options = new ClusterOptions()
+ options.setFeNum(1)
+ options.cloudMode = null
+ // Roll the journal on every write so the checkpoint thread has finalized
journals to
+ // include; without this, a small steady-state EditLog stays in the active
segment and
+ // never reaches the checkpoint image, defeating the test.
+ options.feConfigs += [
+ 'edit_log_roll_num=1'
+ ]
+
+ docker(options) {
+ def currentDb = (sql "select database()")[0][0]
+ def dorisTable = "test_streaming_job_cdc_stream_pg_ckpt_restart_fe_tbl"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+ def pgTable = "test_streaming_job_cdc_stream_pg_ckpt_restart_fe_src"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${dorisTable} force"""
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} (
+ `name` varchar(200) NULL,
+ `age` int NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`name`)
+ DISTRIBUTED BY HASH(`name`) BUCKETS AUTO
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1")
+ """
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port")
+ String externalEnvIp =
context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // ── Phase 1: prepare source table with pre-existing snapshot
rows ───────────
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgTable}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgTable} (
+ "name" varchar(200) PRIMARY KEY,
+ "age" int2
+ )"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('A1', 1)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('B1', 2)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('C1', 3)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('D1', 4)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('E1', 5)"""
+ }
+
+ // ── Phase 2: create streaming job (offset=initial, split_size=1
→ 5 tasks) ─
+ sql """
+ CREATE JOB ${jobName}
+ ON STREAMING DO INSERT INTO ${currentDb}.${dorisTable} (name,
age)
+ SELECT name, age FROM cdc_stream(
+ "type" = "postgres",
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "table" = "${pgTable}",
+ "offset" = "initial",
+ "snapshot_split_size" = "1"
+ )
+ """
+
+ // ── Phase 3: wait for the first snapshot task to succeed, then
trigger checkpoint ──
+ try {
+ Awaitility.await().atMost(300, SECONDS).pollInterval(2,
SECONDS).until({
+ def cnt = sql """select SucceedTaskCount from
jobs("type"="insert")
+ where Name='${jobName}' and
ExecuteType='STREAMING'"""
+ log.info("SucceedTaskCount before first ckpt restart: " +
cnt)
+ cnt.size() == 1 && (cnt.get(0).get(0) as int) >= 1
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ log.info("tasks: " + (sql """select * from
tasks("type"="insert") where JobName='${jobName}'"""))
+ throw ex
+ }
+
+ def jobInfoBeforeRestart = sql """
+ select status, currentOffset, loadStatistic
+ from jobs("type"="insert") where Name='${jobName}'
+ """
+ log.info("job info before first ckpt restart: " +
jobInfoBeforeRestart)
+ assert jobInfoBeforeRestart.get(0).get(0) == "RUNNING" :
+ "Job should be RUNNING before first restart, got:
${jobInfoBeforeRestart.get(0).get(0)}"
+ def scannedRowsBeforeFirstRestart =
parseJson(jobInfoBeforeRestart.get(0).get(2) as String).scannedRows as long
+ log.info("scannedRows before first ckpt restart: " +
scannedRowsBeforeFirstRestart)
+
+ // Wait >= 90s so the checkpoint thread (60s interval) runs at
least once after our
+ // last commit, then GC's the pre-checkpoint journal. Subsequent
FE restart can no
+ // longer recover via journal txn replay and must rely on
image-persisted chw.
+ log.info("Waiting 90s for checkpoint to run before first FE
restart...")
+ sleep(90000)
+
+ // ── Phase 4: restart FE (mid-snapshot, post-checkpoint)
──────────────────
+ cluster.restartFrontends()
+ sleep(60000)
+ context.reconnectFe()
+
+ // ── Phase 5: verify job recovers and finishedSplits are not
re-processed ───
+ try {
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def status = sql """select status from
jobs("type"="insert") where Name='${jobName}'"""
+ log.info("job status after first ckpt restart: " + status)
+ status.size() == 1 && (status.get(0).get(0) == "RUNNING"
|| status.get(0).get(0) == "PENDING")
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ throw ex
+ }
+
+ def jobInfoAfterRestart1 = sql """select currentOffset,
loadStatistic from jobs("type"="insert") where Name='${jobName}'"""
+ log.info("job info after first ckpt restart: " +
jobInfoAfterRestart1)
+ def scannedRowsAfterFirstRestart =
parseJson(jobInfoAfterRestart1.get(0).get(1) as String).scannedRows as long
+ assert scannedRowsAfterFirstRestart >=
scannedRowsBeforeFirstRestart :
+ "scannedRows should not reset after first ckpt restart:
before=${scannedRowsBeforeFirstRestart}, after=${scannedRowsAfterFirstRestart}"
+
+ // ── Phase 6: wait for full snapshot to complete and assert exact
row count ─
+ try {
+ Awaitility.await().atMost(300, SECONDS).pollInterval(2,
SECONDS).until({
+ def rows = sql """SELECT count(1) FROM
${currentDb}.${dorisTable}
+ WHERE name IN ('A1', 'B1', 'C1', 'D1',
'E1')"""
+ log.info("snapshot rows after first ckpt restart: " + rows)
+ (rows.get(0).get(0) as int) >= 5
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ log.info("tasks: " + (sql """select * from
tasks("type"="insert") where JobName='${jobName}'"""))
+ throw ex
+ }
+
+ // DUPLICATE KEY table: re-processed snapshot splits would inflate
the count above 5.
+ def snapshotCount = sql """SELECT count(1) FROM
${currentDb}.${dorisTable}
+ WHERE name IN ('A1', 'B1', 'C1', 'D1',
'E1')"""
+ assert (snapshotCount.get(0).get(0) as int) == 5 :
+ "Snapshot rows should be exactly 5 after mid-snapshot ckpt
restart (no re-processing), got: ${snapshotCount.get(0).get(0)}"
+
+ // ── Phase 7: insert F1/G1 and wait for binlog to consume them
───────────────
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('F1', 6)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('G1', 7)"""
+ }
+
+ try {
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def rows = sql """SELECT count(1) FROM
${currentDb}.${dorisTable} WHERE name IN ('F1', 'G1')"""
+ log.info("binlog rows before second ckpt restart: " + rows)
+ (rows.get(0).get(0) as int) == 2
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ log.info("tasks: " + (sql """select * from
tasks("type"="insert") where JobName='${jobName}'"""))
+ throw ex
+ }
+
+ // ── Phase 8: restart FE in binlog phase, post-checkpoint
──────────────────
+ def jobInfoBeforeSecondRestart = sql """
+ select status, currentOffset, loadStatistic
+ from jobs("type"="insert") where Name='${jobName}'
+ """
+ log.info("job info before second ckpt restart (binlog phase): " +
jobInfoBeforeSecondRestart)
+ def offsetBeforeSecondRestart =
jobInfoBeforeSecondRestart.get(0).get(1) as String
+ assert offsetBeforeSecondRestart != null &&
offsetBeforeSecondRestart.contains("binlog-split") :
+ "currentOffset should be in binlog state before second
restart, got: ${offsetBeforeSecondRestart}"
+ def scannedRowsBefore =
parseJson(jobInfoBeforeSecondRestart.get(0).get(2) as String).scannedRows as
long
+ log.info("scannedRows before second ckpt restart: " +
scannedRowsBefore)
+
+ log.info("Waiting 90s for checkpoint to run before second FE
restart...")
+ sleep(90000)
+
+ cluster.restartFrontends()
+ sleep(60000)
+ context.reconnectFe()
+
+ // ── Phase 9: verify job recovers and binlog offset did not
regress ─────────
+ try {
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def status = sql """select status from
jobs("type"="insert") where Name='${jobName}'"""
+ log.info("job status after second ckpt restart: " + status)
+ status.size() == 1 && (status.get(0).get(0) == "RUNNING"
|| status.get(0).get(0) == "PENDING")
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ throw ex
+ }
+
+ def jobInfoAfterRestart2 = sql """select currentOffset,
loadStatistic from jobs("type"="insert") where Name='${jobName}'"""
+ log.info("job info after second ckpt restart: " +
jobInfoAfterRestart2)
+
+ // Binlog offset advances with heartbeats; assert correctness via
row counts.
+ // If binlog offset regressed (bop not restored from image), F1/G1
would be
+ // re-inserted and count > 2.
+ def binlogCountAfterRestart = sql """SELECT count(1) FROM
${currentDb}.${dorisTable}
+ WHERE name IN ('F1', 'G1')"""
+ assert (binlogCountAfterRestart.get(0).get(0) as int) == 2 :
+ "Binlog rows F1/G1 should be exactly 2 after binlog-phase
ckpt restart (no re-processing), got: ${binlogCountAfterRestart.get(0).get(0)}"
+
+ // ── Phase 10: insert H1, I1 and verify binlog still progresses
post-restart ─
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('H1', 8)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('I1', 9)"""
+ }
+
+ try {
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def rows = sql """SELECT count(1) FROM
${currentDb}.${dorisTable} WHERE name IN ('H1', 'I1')"""
+ log.info("binlog rows after second ckpt restart: " + rows)
+ (rows.get(0).get(0) as int) == 2
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ log.info("tasks: " + (sql """select * from
tasks("type"="insert") where JobName='${jobName}'"""))
+ throw ex
+ }
+
+ qt_final_data """ SELECT * FROM ${currentDb}.${dorisTable} ORDER
BY name """
+
+ // ── Phase 11: final assertions
────────────────────────────────────────────
+ def jobInfoFinal = sql """
+ select status, FailedTaskCount, ErrorMsg, currentOffset,
loadStatistic
+ from jobs("type"="insert") where Name='${jobName}'
+ """
+ log.info("job info final: " + jobInfoFinal)
+ assert jobInfoFinal.get(0).get(0) == "RUNNING" : "Job should be
RUNNING at end"
+ assert (jobInfoFinal.get(0).get(1) as int) == 0 : "FailedTaskCount
should be 0"
+
+ def scannedRowsAfter = parseJson(jobInfoFinal.get(0).get(4) as
String).scannedRows as long
+ log.info("scannedRows after second ckpt restart: " +
scannedRowsAfter)
+ assert scannedRowsAfter >= scannedRowsBefore :
+ "scannedRows should not reset after FE ckpt restart:
before=${scannedRowsBefore}, after=${scannedRowsAfter}"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${dorisTable} force"""
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]