This is an automated email from the ASF dual-hosted git repository.

JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 73b3f5eba42 [fix](streamingjob) Persist cdc_stream TVF offset across 
FE checkpoint (#62902)
73b3f5eba42 is described below

commit 73b3f5eba421ea969b2899d253e6678dbd3cf3ea
Author: wudi <[email protected]>
AuthorDate: Fri May 15 09:51:39 2026 +0800

    [fix](streamingjob) Persist cdc_stream TVF offset across FE checkpoint 
(#62902)
    
    ### What problem does this PR solve?
    
    Related PR: #62449
    
    Problem Summary:
    
    PR #62449 fixed streaming job offset state-loss after FE checkpoint
    restart for the S3 path, but the cdc_stream TVF path has the same root
    cause and worse impact: after a checkpoint restart in the binlog phase,
    the job replays from the very beginning of the binlog (because
    `currentOffset == null` falls through to a fresh `BinlogSplit` with no
    `startingOffset`).
    
    Root cause: `JdbcTvfSourceOffsetProvider.getPersistInfo()` returns
    `null`, so `offsetProviderPersist` is never written into the FE image.
    After a checkpoint, the pre-checkpoint journal is GC'd, so neither the
    journal-replayed `currentOffset` nor any image-persisted state survives,
    and recovery falls back to a fresh provider with empty `chw`/`bop`.
    
    Only the non-cloud mode is affected. Cloud mode is fine because
    `replayOnCloudMode` pulls a cumulative attachment from MS.
    
    Fix — reuse the parent's existing `chw`/`bop`/`ts` `@SerializedName`
    persistence:
    - Drop the `getPersistInfo()` override so the parent's
    `GsonUtils.GSON.toJson(this)` writes `chw/bop/ts` into the image.
    - Add a `restoreFromPersistInfo()` override to read them back on FE
    startup (called from `gsonPostProcess`).
    - In `updateOffset` binlog branch, mirror `startingOffset` into
    `binlogOffsetPersist` so it survives the image (`currentOffset` has no
    `@SerializedName`).
    - In the `currentOffset == null` branch of `replayIfNeed`, rebuild the
    `BinlogSplit` from `bop` when it is present; otherwise apply `chw` (using
    the existing `null.null` remap) when restoring the snapshot phase.
---
 .../offset/jdbc/JdbcTvfSourceOffsetProvider.java   |  71 +++--
 ...b_cdc_stream_postgres_checkpoint_restart_fe.out |  12 +
 ...dc_stream_postgres_checkpoint_restart_fe.groovy | 298 +++++++++++++++++++++
 3 files changed, 363 insertions(+), 18 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java
index 749b592c775..b1d860c9430 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java
@@ -34,6 +34,7 @@ import org.apache.doris.nereids.trees.expressions.Properties;
 import org.apache.doris.nereids.trees.plans.Plan;
 import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.InternalService.PRequestCdcClientResult;
 import org.apache.doris.rpc.BackendServiceProxy;
@@ -77,6 +78,9 @@ import java.util.stream.Collectors;
  *       chunkHighWatermarkMap is always updated unconditionally to support 
recovery</li>
  *   <li>replayIfNeed: checks currentOffset directly — snapshot triggers 
remainingSplits rebuild
  *       from meta + chunkHighWatermarkMap; binlog needs no action 
(currentOffset already set)</li>
+ *   <li>image persistence: chw/bop/ts also flow through getPersistInfo 
(inherited) so state
+ *       survives FE checkpoint after pre-checkpoint journal is GC'd; 
restoreFromPersistInfo
+ *       reads them back on startup</li>
  * </ul>
  */
 @Log4j2
@@ -115,10 +119,7 @@ public class JdbcTvfSourceOffsetProvider extends 
JdbcSourceOffsetProvider {
         if (this.jobId != null) {
             return;
         }
-        // One-time initialization below — safe to skip on FE restart because 
the provider
-        // is reconstructed fresh (getPersistInfo returns null), so jobId is 
null then too.
         this.jobId = jobId;
-        this.chunkHighWatermarkMap = new HashMap<>();
         this.sourceType = resolvedType;
         String table = originTvfProps.get(DataSourceConfigKeys.TABLE);
         Preconditions.checkArgument(table != null, "table is required for 
cdc_stream TVF");
@@ -273,7 +274,8 @@ public class JdbcTvfSourceOffsetProvider extends 
JdbcSourceOffsetProvider {
      * adds it to finishedSplits. During txn replay remainingSplits is empty 
so removeIf returns
      * false naturally — chunkHighWatermarkMap is still updated for 
replayIfNeed to use later.
      *
-     * <p>Binlog: currentOffset is set above; no extra state needed for TVF 
recovery path.
+     * <p>Binlog: currentOffset is set above. Also mirror startingOffset into 
binlogOffsetPersist
+     * so it survives FE checkpoint via image (currentOffset has no 
@SerializedName).
      */
     @Override
     public void updateOffset(Offset offset) {
@@ -297,13 +299,19 @@ public class JdbcTvfSourceOffsetProvider extends 
JdbcSourceOffsetProvider {
                 chunkHighWatermarkMap.computeIfAbsent(buildTableKey(), k -> 
new HashMap<>())
                         .put(ss.getSplitId(), ss.getHighWatermark());
             }
+        } else {
+            // Mirror binlog offset into bop so it survives FE checkpoint
+            BinlogSplit bs = (BinlogSplit) currentOffset.getSplits().get(0);
+            if (MapUtils.isNotEmpty(bs.getStartingOffset())) {
+                binlogOffsetPersist = new HashMap<>(bs.getStartingOffset());
+                binlogOffsetPersist.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
+            }
         }
-        // Binlog: currentOffset is already set; no binlogOffsetPersist needed 
for TVF path.
     }
 
     /**
-     * TVF path recovery: offsetProviderPersist is always null (no EditLog 
write).
-     * currentOffset is set by replayOnCommitted/replayOnCloudMode -> 
updateOffset before this runs.
+     * TVF path recovery. After FE checkpoint the pre-checkpoint journal is 
GC'd, so currentOffset
+     * (no @SerializedName) may be null with prior commits — fall back to 
bop/chw restored from image.
      *
      * <ul>
      *   <li>snapshot: mid-snapshot restart — rebuild remainingSplits from 
meta + chunkHighWatermarkMap</li>
@@ -313,14 +321,28 @@ public class JdbcTvfSourceOffsetProvider extends 
JdbcSourceOffsetProvider {
     @Override
     public void replayIfNeed(StreamingInsertJob job) throws JobException {
         if (currentOffset == null) {
-            // No committed txn yet. If snapshot splits exist in the meta 
table (written by
-            // initOnCreate), restore remainingSplits so getNextOffset() 
returns snapshot splits
-            // instead of a BinlogSplit (which would incorrectly skip the 
snapshot phase).
+            // Post-checkpoint binlog: rebuild from bop persisted in image
+            if (MapUtils.isNotEmpty(binlogOffsetPersist)) {
+                currentOffset = new JdbcOffset(
+                        Collections.singletonList(new 
BinlogSplit(binlogOffsetPersist)));
+                log.info("Replaying TVF offset provider for job {}: restored 
binlog offset from persist",
+                        job.getJobId());
+                return;
+            }
+            // Fresh-create or post-checkpoint mid-snapshot: restore 
remainingSplits from meta
+            // so getNextOffset() returns snapshot splits instead of an empty 
BinlogSplit.
             Map<String, List<SnapshotSplit>> snapshotSplits = 
StreamingJobUtils.restoreSplitsToJob(job.getJobId());
             if (MapUtils.isNotEmpty(snapshotSplits)) {
-                recalculateRemainingSplits(new HashMap<>(), snapshotSplits);
-                log.info("Replaying TVF offset provider for job {}: no 
committed txn,"
-                        + " restored {} remaining splits from meta", 
job.getJobId(), remainingSplits.size());
+                // chw outer key may be "null.null" during journal replay 
(sourceProperties uninitialized); remap
+                Map<String, Map<String, Map<String, String>>> effective =
+                        MapUtils.isNotEmpty(chunkHighWatermarkMap)
+                                ? remapChunkHighWatermarkMap(snapshotSplits)
+                                : new HashMap<>();
+                recalculateRemainingSplits(effective, snapshotSplits);
+                log.info("Replaying TVF offset provider for job {}: no current 
offset,"
+                        + " restored {} remaining splits from meta (chw 
size={})",
+                        job.getJobId(), remainingSplits.size(),
+                        chunkHighWatermarkMap == null ? 0 : 
chunkHighWatermarkMap.size());
             } else {
                 log.info("Replaying TVF offset provider for job {}: no 
committed txn,"
                         + " no snapshot splits in meta", job.getJobId());
@@ -406,13 +428,26 @@ public class JdbcTvfSourceOffsetProvider extends 
JdbcSourceOffsetProvider {
     }
 
     /**
-     * TVF path does not persist to EditLog; state is recovered via txn replay.
-     * This override is defensive — the persistOffsetProviderIfNeed() call path
-     * only runs in the non-TVF commitOffset flow and won't reach here.
+     * Restore chw/bop/ts from the image-persisted JSON. Called by 
gsonPostProcess on FE startup
+     * before any journal replay; recovers state lost when pre-checkpoint 
journal is GC'd.
      */
     @Override
-    public String getPersistInfo() {
-        return null;
+    public void restoreFromPersistInfo(String persistInfo) {
+        if (persistInfo == null) {
+            return;
+        }
+        try {
+            JdbcSourceOffsetProvider tmp = 
GsonUtils.GSON.fromJson(persistInfo, JdbcSourceOffsetProvider.class);
+            this.chunkHighWatermarkMap = tmp.getChunkHighWatermarkMap();
+            this.binlogOffsetPersist = tmp.getBinlogOffsetPersist();
+            this.tableSchemas = tmp.getTableSchemas();
+            log.info("Restored TVF offset provider from persist: chw={}, 
bop={}, ts.len={}",
+                    chunkHighWatermarkMap == null ? 0 : 
chunkHighWatermarkMap.size(),
+                    binlogOffsetPersist == null ? 0 : 
binlogOffsetPersist.size(),
+                    tableSchemas == null ? 0 : tableSchemas.length());
+        } catch (Exception e) {
+            log.warn("Failed to restore TVF offset provider from persistInfo", 
e);
+        }
     }
 
     @Override
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_checkpoint_restart_fe.out
 
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_checkpoint_restart_fe.out
new file mode 100644
index 00000000000..c351ac3cba5
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_checkpoint_restart_fe.out
@@ -0,0 +1,12 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !final_data --
+A1     1
+B1     2
+C1     3
+D1     4
+E1     5
+F1     6
+G1     7
+H1     8
+I1     9
+
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_checkpoint_restart_fe.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_checkpoint_restart_fe.groovy
new file mode 100644
index 00000000000..4e493b3fb8b
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_checkpoint_restart_fe.groovy
@@ -0,0 +1,298 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Test FE checkpoint-restart recovery of a cdc_stream TVF streaming job for 
PostgreSQL.
+ *
+ * Counterpart of test_streaming_job_cdc_stream_postgres_restart_fe but with 
the pre-checkpoint
+ * journal GC'd before each restart, so recovery cannot rely on EditLog txn 
replay and must
+ * fall back to image-persisted bop/chw written via getPersistInfo (inherited 
from parent) and
+ * read back by JdbcTvfSourceOffsetProvider.restoreFromPersistInfo.
+ *
+ * edit_log_roll_num=1 finalizes journals on every write so the next 
checkpoint cycle picks
+ * them up; sleeping >= 90s after each commit ensures a checkpoint runs 
(interval defaults
+ * to 60s) and the pre-checkpoint journal is then eligible for GC.
+ *
+ * Two checkpoint-restart scenarios are covered in sequence:
+ *
+ * Restart 1 — mid-snapshot:
+ *   snapshot_split_size=1 splits 5 pre-existing rows (A1-E1) into 5 separate 
tasks.
+ *   After the first task succeeds and a checkpoint runs, FE is restarted. 
This exercises
+ *   replayIfNeed() with currentOffset == null but chunkHighWatermarkMap 
restored from image:
+ *   remainingSplits is rebuilt from the meta table with chw remap so 
already-finished splits
+ *   are not re-processed.
+ *
+ * Restart 2 — binlog phase:
+ *   After the full snapshot completes and F1/G1 are consumed via binlog, a 
checkpoint runs
+ *   and FE is restarted. This exercises the binlog recovery path where 
currentOffset == null
+ *   but binlogOffsetPersist is restored from image: a BinlogSplit is rebuilt 
from bop so
+ *   the job resumes from the last committed binlog position rather than the 
initial one.
+ *   H1/I1 are then inserted to verify the job continues reading binlog 
correctly.
+ */
+suite("test_streaming_job_cdc_stream_postgres_checkpoint_restart_fe",
+        
"docker,p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName = "test_streaming_job_cdc_stream_pg_ckpt_restart_fe"
+    def options = new ClusterOptions()
+    options.setFeNum(1)
+    options.cloudMode = null
+    // Roll the journal on every write so the checkpoint thread has finalized 
journals to
+    // include; without this, a small steady-state EditLog stays in the active 
segment and
+    // never reaches the checkpoint image, defeating the test.
+    options.feConfigs += [
+        'edit_log_roll_num=1'
+    ]
+
+    docker(options) {
+        def currentDb = (sql "select database()")[0][0]
+        def dorisTable = "test_streaming_job_cdc_stream_pg_ckpt_restart_fe_tbl"
+        def pgDB = "postgres"
+        def pgSchema = "cdc_test"
+        def pgUser = "postgres"
+        def pgPassword = "123456"
+        def pgTable = "test_streaming_job_cdc_stream_pg_ckpt_restart_fe_src"
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+        sql """drop table if exists ${currentDb}.${dorisTable} force"""
+
+        sql """
+            CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} (
+                `name` varchar(200) NULL,
+                `age`  int NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`name`)
+            DISTRIBUTED BY HASH(`name`) BUCKETS AUTO
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1")
+        """
+
+        String enabled = context.config.otherConfigs.get("enableJdbcTest")
+        if (enabled != null && enabled.equalsIgnoreCase("true")) {
+            String pg_port = context.config.otherConfigs.get("pg_14_port")
+            String externalEnvIp = 
context.config.otherConfigs.get("externalEnvIp")
+            String s3_endpoint = getS3Endpoint()
+            String bucket = getS3BucketName()
+            String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+            // ── Phase 1: prepare source table with pre-existing snapshot 
rows ───────────
+            connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+                sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgTable}"""
+                sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgTable} (
+                          "name" varchar(200) PRIMARY KEY,
+                          "age"  int2
+                      )"""
+                sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('A1', 1)"""
+                sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('B1', 2)"""
+                sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('C1', 3)"""
+                sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('D1', 4)"""
+                sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('E1', 5)"""
+            }
+
+            // ── Phase 2: create streaming job (offset=initial, split_size=1 
→ 5 tasks) ─
+            sql """
+                CREATE JOB ${jobName}
+                ON STREAMING DO INSERT INTO ${currentDb}.${dorisTable} (name, 
age)
+                SELECT name, age FROM cdc_stream(
+                    "type"                = "postgres",
+                    "jdbc_url"            = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url"          = "${driver_url}",
+                    "driver_class"        = "org.postgresql.Driver",
+                    "user"                = "${pgUser}",
+                    "password"            = "${pgPassword}",
+                    "database"            = "${pgDB}",
+                    "schema"              = "${pgSchema}",
+                    "table"               = "${pgTable}",
+                    "offset"              = "initial",
+                    "snapshot_split_size" = "1"
+                )
+            """
+
+            // ── Phase 3: wait for the first snapshot task to succeed, then 
trigger checkpoint ──
+            try {
+                Awaitility.await().atMost(300, SECONDS).pollInterval(2, 
SECONDS).until({
+                    def cnt = sql """select SucceedTaskCount from 
jobs("type"="insert")
+                                     where Name='${jobName}' and 
ExecuteType='STREAMING'"""
+                    log.info("SucceedTaskCount before first ckpt restart: " + 
cnt)
+                    cnt.size() == 1 && (cnt.get(0).get(0) as int) >= 1
+                })
+            } catch (Exception ex) {
+                log.info("job: " + (sql """select * from jobs("type"="insert") 
where Name='${jobName}'"""))
+                log.info("tasks: " + (sql """select * from 
tasks("type"="insert") where JobName='${jobName}'"""))
+                throw ex
+            }
+
+            def jobInfoBeforeRestart = sql """
+                select status, currentOffset, loadStatistic
+                from jobs("type"="insert") where Name='${jobName}'
+            """
+            log.info("job info before first ckpt restart: " + 
jobInfoBeforeRestart)
+            assert jobInfoBeforeRestart.get(0).get(0) == "RUNNING" :
+                    "Job should be RUNNING before first restart, got: 
${jobInfoBeforeRestart.get(0).get(0)}"
+            def scannedRowsBeforeFirstRestart = 
parseJson(jobInfoBeforeRestart.get(0).get(2) as String).scannedRows as long
+            log.info("scannedRows before first ckpt restart: " + 
scannedRowsBeforeFirstRestart)
+
+            // Wait >= 90s so the checkpoint thread (60s interval) runs at 
least once after our
+            // last commit, then GC's the pre-checkpoint journal. Subsequent 
FE restart can no
+            // longer recover via journal txn replay and must rely on 
image-persisted chw.
+            log.info("Waiting 90s for checkpoint to run before first FE 
restart...")
+            sleep(90000)
+
+            // ── Phase 4: restart FE (mid-snapshot, post-checkpoint) 
──────────────────
+            cluster.restartFrontends()
+            sleep(60000)
+            context.reconnectFe()
+
+            // ── Phase 5: verify job recovers and finishedSplits are not 
re-processed ───
+            try {
+                Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                    def status = sql """select status from 
jobs("type"="insert") where Name='${jobName}'"""
+                    log.info("job status after first ckpt restart: " + status)
+                    status.size() == 1 && (status.get(0).get(0) == "RUNNING" 
|| status.get(0).get(0) == "PENDING")
+                })
+            } catch (Exception ex) {
+                log.info("job: " + (sql """select * from jobs("type"="insert") 
where Name='${jobName}'"""))
+                throw ex
+            }
+
+            def jobInfoAfterRestart1 = sql """select currentOffset, 
loadStatistic from jobs("type"="insert") where Name='${jobName}'"""
+            log.info("job info after first ckpt restart: " + 
jobInfoAfterRestart1)
+            def scannedRowsAfterFirstRestart = 
parseJson(jobInfoAfterRestart1.get(0).get(1) as String).scannedRows as long
+            assert scannedRowsAfterFirstRestart >= 
scannedRowsBeforeFirstRestart :
+                    "scannedRows should not reset after first ckpt restart: 
before=${scannedRowsBeforeFirstRestart}, after=${scannedRowsAfterFirstRestart}"
+
+            // ── Phase 6: wait for full snapshot to complete and assert exact 
row count ─
+            try {
+                Awaitility.await().atMost(300, SECONDS).pollInterval(2, 
SECONDS).until({
+                    def rows = sql """SELECT count(1) FROM 
${currentDb}.${dorisTable}
+                                      WHERE name IN ('A1', 'B1', 'C1', 'D1', 
'E1')"""
+                    log.info("snapshot rows after first ckpt restart: " + rows)
+                    (rows.get(0).get(0) as int) >= 5
+                })
+            } catch (Exception ex) {
+                log.info("job: " + (sql """select * from jobs("type"="insert") 
where Name='${jobName}'"""))
+                log.info("tasks: " + (sql """select * from 
tasks("type"="insert") where JobName='${jobName}'"""))
+                throw ex
+            }
+
+            // DUPLICATE KEY table: re-processed snapshot splits would inflate 
the count above 5.
+            def snapshotCount = sql """SELECT count(1) FROM 
${currentDb}.${dorisTable}
+                                       WHERE name IN ('A1', 'B1', 'C1', 'D1', 
'E1')"""
+            assert (snapshotCount.get(0).get(0) as int) == 5 :
+                    "Snapshot rows should be exactly 5 after mid-snapshot ckpt 
restart (no re-processing), got: ${snapshotCount.get(0).get(0)}"
+
+            // ── Phase 7: insert F1/G1 and wait for binlog to consume them 
───────────────
+            connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+                sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('F1', 6)"""
+                sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('G1', 7)"""
+            }
+
+            try {
+                Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                    def rows = sql """SELECT count(1) FROM 
${currentDb}.${dorisTable} WHERE name IN ('F1', 'G1')"""
+                    log.info("binlog rows before second ckpt restart: " + rows)
+                    (rows.get(0).get(0) as int) == 2
+                })
+            } catch (Exception ex) {
+                log.info("job: " + (sql """select * from jobs("type"="insert") 
where Name='${jobName}'"""))
+                log.info("tasks: " + (sql """select * from 
tasks("type"="insert") where JobName='${jobName}'"""))
+                throw ex
+            }
+
+            // ── Phase 8: restart FE in binlog phase, post-checkpoint 
──────────────────
+            def jobInfoBeforeSecondRestart = sql """
+                select status, currentOffset, loadStatistic
+                from jobs("type"="insert") where Name='${jobName}'
+            """
+            log.info("job info before second ckpt restart (binlog phase): " + 
jobInfoBeforeSecondRestart)
+            def offsetBeforeSecondRestart = 
jobInfoBeforeSecondRestart.get(0).get(1) as String
+            assert offsetBeforeSecondRestart != null && 
offsetBeforeSecondRestart.contains("binlog-split") :
+                    "currentOffset should be in binlog state before second 
restart, got: ${offsetBeforeSecondRestart}"
+            def scannedRowsBefore = 
parseJson(jobInfoBeforeSecondRestart.get(0).get(2) as String).scannedRows as 
long
+            log.info("scannedRows before second ckpt restart: " + 
scannedRowsBefore)
+
+            log.info("Waiting 90s for checkpoint to run before second FE 
restart...")
+            sleep(90000)
+
+            cluster.restartFrontends()
+            sleep(60000)
+            context.reconnectFe()
+
+            // ── Phase 9: verify job recovers and binlog offset did not 
regress ─────────
+            try {
+                Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                    def status = sql """select status from 
jobs("type"="insert") where Name='${jobName}'"""
+                    log.info("job status after second ckpt restart: " + status)
+                    status.size() == 1 && (status.get(0).get(0) == "RUNNING" 
|| status.get(0).get(0) == "PENDING")
+                })
+            } catch (Exception ex) {
+                log.info("job: " + (sql """select * from jobs("type"="insert") 
where Name='${jobName}'"""))
+                throw ex
+            }
+
+            def jobInfoAfterRestart2 = sql """select currentOffset, 
loadStatistic from jobs("type"="insert") where Name='${jobName}'"""
+            log.info("job info after second ckpt restart: " + 
jobInfoAfterRestart2)
+
+            // Binlog offset advances with heartbeats; assert correctness via 
row counts.
+            // If binlog offset regressed (bop not restored from image), F1/G1 
would be
+            // re-inserted and count > 2.
+            def binlogCountAfterRestart = sql """SELECT count(1) FROM 
${currentDb}.${dorisTable}
+                                                 WHERE name IN ('F1', 'G1')"""
+            assert (binlogCountAfterRestart.get(0).get(0) as int) == 2 :
+                    "Binlog rows F1/G1 should be exactly 2 after binlog-phase 
ckpt restart (no re-processing), got: ${binlogCountAfterRestart.get(0).get(0)}"
+
+            // ── Phase 10: insert H1, I1 and verify binlog still progresses 
post-restart ─
+            connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+                sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('H1', 8)"""
+                sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('I1', 9)"""
+            }
+
+            try {
+                Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                    def rows = sql """SELECT count(1) FROM 
${currentDb}.${dorisTable} WHERE name IN ('H1', 'I1')"""
+                    log.info("binlog rows after second ckpt restart: " + rows)
+                    (rows.get(0).get(0) as int) == 2
+                })
+            } catch (Exception ex) {
+                log.info("job: " + (sql """select * from jobs("type"="insert") 
where Name='${jobName}'"""))
+                log.info("tasks: " + (sql """select * from 
tasks("type"="insert") where JobName='${jobName}'"""))
+                throw ex
+            }
+
+            qt_final_data """ SELECT * FROM ${currentDb}.${dorisTable} ORDER 
BY name """
+
+            // ── Phase 11: final assertions 
────────────────────────────────────────────
+            def jobInfoFinal = sql """
+                select status, FailedTaskCount, ErrorMsg, currentOffset, 
loadStatistic
+                from jobs("type"="insert") where Name='${jobName}'
+            """
+            log.info("job info final: " + jobInfoFinal)
+            assert jobInfoFinal.get(0).get(0) == "RUNNING" : "Job should be 
RUNNING at end"
+            assert (jobInfoFinal.get(0).get(1) as int) == 0 : "FailedTaskCount 
should be 0"
+
+            def scannedRowsAfter = parseJson(jobInfoFinal.get(0).get(4) as 
String).scannedRows as long
+            log.info("scannedRows after second ckpt restart: " + 
scannedRowsAfter)
+            assert scannedRowsAfter >= scannedRowsBefore :
+                    "scannedRows should not reset after FE ckpt restart: 
before=${scannedRowsBefore}, after=${scannedRowsAfter}"
+
+            sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+            sql """drop table if exists ${currentDb}.${dorisTable} force"""
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to