This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new d863bb4f7 [flink] Add restore test for streaming union read log table (#1663)
d863bb4f7 is described below

commit d863bb4f765fe63c395250d2449b1517533fbd29
Author: CaoZhen <[email protected]>
AuthorDate: Wed Sep 10 21:12:22 2025 +0800

    [flink] Add restore test for streaming union read log table (#1663)
---
 .../fluss/flink/lake/LakeSplitReaderGenerator.java |  2 +-
 .../fluss/flink/lake/split/LakeSnapshotSplit.java  | 14 ++---
 .../flink/lake/state/LakeSnapshotSplitState.java   |  8 +--
 .../paimon/flink/FlinkUnionReadLogTableITCase.java | 65 ++++++++++++++++++++++
 .../lake/paimon/flink/FlinkUnionReadTestBase.java  | 15 ++++-
 .../testutils/FlinkPaimonTieringTestBase.java      | 16 ++++--
 6 files changed, 103 insertions(+), 17 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
index 70734a490..aa3eef17f 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
@@ -68,7 +68,7 @@ public class LakeSplitReaderGenerator {
             LakeSnapshotScanner lakeSnapshotScanner =
                     new LakeSnapshotScanner(lakeSource, lakeSnapshotSplit);
             return new BoundedSplitReader(
-                    lakeSnapshotScanner, lakeSnapshotSplit.getRecordsToSplit());
+                    lakeSnapshotScanner, lakeSnapshotSplit.getRecordsToSkip());
         } else if (split instanceof LakeSnapshotAndFlussLogSplit) {
             LakeSnapshotAndFlussLogSplit lakeSplit = (LakeSnapshotAndFlussLogSplit) split;
             return new BoundedSplitReader(getBatchScanner(lakeSplit), lakeSplit.getRecordsToSkip());
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotSplit.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotSplit.java
index b85d990fe..5754a2f5a 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotSplit.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotSplit.java
@@ -30,7 +30,7 @@ public class LakeSnapshotSplit extends SourceSplitBase {
 
     private final LakeSplit lakeSplit;
 
-    private final long recordsToSplit;
+    private final long recordsToSkip;
 
     private final int splitIndex;
 
@@ -47,19 +47,19 @@ public class LakeSnapshotSplit extends SourceSplitBase {
             @Nullable String partitionName,
             LakeSplit lakeSplit,
             int splitIndex,
-            long recordsToSplit) {
+            long recordsToSkip) {
         super(tableBucket, partitionName);
         this.lakeSplit = lakeSplit;
         this.splitIndex = splitIndex;
-        this.recordsToSplit = recordsToSplit;
+        this.recordsToSkip = recordsToSkip;
     }
 
     public LakeSplit getLakeSplit() {
         return lakeSplit;
     }
 
-    public long getRecordsToSplit() {
-        return recordsToSplit;
+    public long getRecordsToSkip() {
+        return recordsToSkip;
     }
 
     public int getSplitIndex() {
@@ -93,8 +93,8 @@ public class LakeSnapshotSplit extends SourceSplitBase {
         return "LakeSnapshotSplit{"
                 + "lakeSplit="
                 + lakeSplit
-                + ", recordsToSplit="
-                + recordsToSplit
+                + ", recordsToSkip="
+                + recordsToSkip
                 + ", splitIndex="
                 + splitIndex
                 + ", tableBucket="
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotSplitState.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotSplitState.java
index 399601b99..f56f6f834 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotSplitState.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotSplitState.java
@@ -25,16 +25,16 @@ import org.apache.fluss.flink.source.split.SourceSplitState;
 public class LakeSnapshotSplitState extends SourceSplitState {
 
     private final LakeSnapshotSplit split;
-    private long recordsToSplit;
+    private long recordsToSkip;
 
     public LakeSnapshotSplitState(LakeSnapshotSplit split) {
         super(split);
         this.split = split;
-        this.recordsToSplit = split.getRecordsToSplit();
+        this.recordsToSkip = split.getRecordsToSkip();
     }
 
     public void setRecordsToSkip(long recordsToSkip) {
-        this.recordsToSplit = recordsToSkip;
+        this.recordsToSkip = recordsToSkip;
     }
 
     @Override
@@ -44,6 +44,6 @@ public class LakeSnapshotSplitState extends SourceSplitState {
                 split.getPartitionName(),
                 split.getLakeSplit(),
                 split.getSplitIndex(),
-                recordsToSplit);
+                recordsToSkip);
     }
 }
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
index 0b530a722..80c6832b0 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
@@ -24,15 +24,20 @@ import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
 
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.CollectionUtil;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import javax.annotation.Nullable;
 
+import java.io.File;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -42,6 +47,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder;
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
 import static org.apache.fluss.testutils.DataTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -49,6 +55,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** The IT case for Flink union data in lake and fluss for log table. */
 class FlinkUnionReadLogTableITCase extends FlinkUnionReadTestBase {
 
+    @TempDir public static File savepointDir;
+
     @BeforeAll
     protected static void beforeAll() {
         FlinkUnionReadTestBase.beforeAll();
@@ -169,6 +177,63 @@ class FlinkUnionReadLogTableITCase extends FlinkUnionReadTestBase {
                 actual, writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testUnionReadLogTableFailover(boolean isPartitioned) throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName1 =
+                "restore_logTable_" + (isPartitioned ? "partitioned" : "non_partitioned");
+        String resultTableName =
+                "result_table" + (isPartitioned ? "partitioned" : "non_partitioned");
+
+        TablePath table1 = TablePath.of(DEFAULT_DB, tableName1);
+        TablePath resultTable = TablePath.of(DEFAULT_DB, resultTableName);
+        List<Row> writtenRows = new LinkedList<>();
+        long tableId = prepareLogTable(table1, DEFAULT_BUCKET_NUM, isPartitioned, writtenRows);
+        // wait until records has been synced
+        waitUntilBucketSynced(table1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+        StreamTableEnvironment streamTEnv = buildSteamTEnv(null);
+        // now, start to read the log table to write to a fluss result table
+        // may read fluss or not, depends on the log offset of paimon snapshot
+        createFullTypeLogTable(resultTable, DEFAULT_BUCKET_NUM, isPartitioned, false);
+        TableResult insertResult =
+                streamTEnv.executeSql(
+                        "insert into " + resultTableName + " select * from " + tableName1);
+
+        CloseableIterator<Row> actual =
+                streamTEnv.executeSql("select * from " + resultTableName).collect();
+        assertResultsExactOrder(actual, writtenRows, false);
+
+        // now, stop the job with save point
+        String savepointPath =
+                insertResult
+                        .getJobClient()
+                        .get()
+                        .stopWithSavepoint(
+                                false,
+                                savepointDir.getAbsolutePath(),
+                                SavepointFormatType.CANONICAL)
+                        .get();
+
+        // re buildSteamTEnv
+        streamTEnv = buildSteamTEnv(savepointPath);
+        insertResult =
+                streamTEnv.executeSql(
+                        "insert into " + resultTableName + " select * from " + tableName1);
+
+        // write some log data again
+        List<Row> rows = writeRows(table1, 3, isPartitioned);
+
+        assertResultsExactOrder(actual, rows, true);
+
+        // cancel jobs
+        insertResult.getJobClient().get().cancel().get();
+        jobClient.cancel().get();
+    }
+
     private long prepareLogTable(
             TablePath tablePath, int bucketNum, boolean isPartitioned, List<Row> flinkRows)
             throws Exception {
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
index 277593720..0caa3fb36 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
@@ -21,12 +21,15 @@ import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import javax.annotation.Nullable;
+
 import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
 
 /** Base class for Flink union read test. */
@@ -60,7 +63,12 @@ public class FlinkUnionReadTestBase extends FlinkPaimonTieringTestBase {
         return FLUSS_CLUSTER_EXTENSION;
     }
 
-    private void buildStreamTEnv() {
+    protected StreamTableEnvironment buildSteamTEnv(@Nullable String savepointPath) {
+        Configuration conf = new Configuration();
+        if (savepointPath != null) {
+            conf.setString("execution.savepoint.path", savepointPath);
+            execEnv.configure(conf);
+        }
         String bootstrapServers = String.join(",", clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
         // create table environment
         streamTEnv = StreamTableEnvironment.create(execEnv, EnvironmentSettings.inStreamingMode());
@@ -71,6 +79,11 @@ public class FlinkUnionReadTestBase extends FlinkPaimonTieringTestBase {
                         CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers));
         streamTEnv.executeSql("use catalog " + CATALOG_NAME);
         streamTEnv.executeSql("use " + DEFAULT_DB);
+        return streamTEnv;
+    }
+
+    private void buildStreamTEnv() {
+        buildSteamTEnv(null);
     }
 
     public void buildBatchTEnv() {
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index 5c2ab1f1a..91cb4e0f9 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -282,6 +282,12 @@ public abstract class FlinkPaimonTieringTestBase {
 
     protected long createFullTypeLogTable(TablePath tablePath, int bucketNum, boolean isPartitioned)
             throws Exception {
+        return createFullTypeLogTable(tablePath, bucketNum, isPartitioned, true);
+    }
+
+    protected long createFullTypeLogTable(
+            TablePath tablePath, int bucketNum, boolean isPartitioned, boolean lakeEnabled)
+            throws Exception {
         Schema.Builder schemaBuilder =
                 Schema.newBuilder()
                         .column("f_boolean", DataTypes.BOOLEAN())
@@ -301,10 +307,12 @@ public abstract class FlinkPaimonTieringTestBase {
                         .column("f_binary", DataTypes.BINARY(4));
 
         TableDescriptor.Builder tableBuilder =
-                TableDescriptor.builder()
-                        .distributedBy(bucketNum, "f_int")
-                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
-                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));
+                TableDescriptor.builder().distributedBy(bucketNum, "f_int");
+        if (lakeEnabled) {
+            tableBuilder
+                    .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
+                    .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));
+        }
 
         if (isPartitioned) {
             schemaBuilder.column("p", DataTypes.STRING());

Reply via email to