This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
     new d863bb4f7  [flink] Add restore test for streaming union read log table (#1663)
d863bb4f7 is described below
commit d863bb4f765fe63c395250d2449b1517533fbd29
Author: CaoZhen <[email protected]>
AuthorDate: Wed Sep 10 21:12:22 2025 +0800
[flink] Add restore test for streaming union read log table (#1663)
---
.../fluss/flink/lake/LakeSplitReaderGenerator.java | 2 +-
.../fluss/flink/lake/split/LakeSnapshotSplit.java | 14 ++---
.../flink/lake/state/LakeSnapshotSplitState.java | 8 +--
.../paimon/flink/FlinkUnionReadLogTableITCase.java | 65 ++++++++++++++++++++++
.../lake/paimon/flink/FlinkUnionReadTestBase.java | 15 ++++-
.../testutils/FlinkPaimonTieringTestBase.java | 16 ++++--
6 files changed, 103 insertions(+), 17 deletions(-)
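For readers skimming the patch: the production change renames the split's recordsToSplit field and accessor to recordsToSkip, making explicit that the value counts records already emitted before a checkpoint and therefore to be skipped when the split is restored; the test change adds a stop-with-savepoint restore IT case for streaming union reads of a log table. Below is a minimal sketch of the skip-on-restore idea; the class is purely illustrative and not a Fluss type.

    import java.util.Iterator;

    /** Illustrative only: resume a bounded scan by dropping rows emitted before the checkpoint. */
    final class ResumingScanner<T> {
        private final Iterator<T> rows; // rows of the lake snapshot split, read from the beginning
        private long recordsToSkip;     // restored from the checkpointed split

        ResumingScanner(Iterator<T> rows, long recordsToSkip) {
            this.rows = rows;
            this.recordsToSkip = recordsToSkip;
        }

        T next() {
            // Fast-forward past rows that were already delivered downstream before the failure.
            while (recordsToSkip > 0 && rows.hasNext()) {
                rows.next();
                recordsToSkip--;
            }
            return rows.hasNext() ? rows.next() : null;
        }
    }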
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
index 70734a490..aa3eef17f 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
@@ -68,7 +68,7 @@ public class LakeSplitReaderGenerator {
             LakeSnapshotScanner lakeSnapshotScanner =
                     new LakeSnapshotScanner(lakeSource, lakeSnapshotSplit);
             return new BoundedSplitReader(
-                    lakeSnapshotScanner, lakeSnapshotSplit.getRecordsToSplit());
+                    lakeSnapshotScanner, lakeSnapshotSplit.getRecordsToSkip());
         } else if (split instanceof LakeSnapshotAndFlussLogSplit) {
             LakeSnapshotAndFlussLogSplit lakeSplit = (LakeSnapshotAndFlussLogSplit) split;
             return new BoundedSplitReader(
                     getBatchScanner(lakeSplit), lakeSplit.getRecordsToSkip());
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotSplit.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotSplit.java
index b85d990fe..5754a2f5a 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotSplit.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotSplit.java
@@ -30,7 +30,7 @@ public class LakeSnapshotSplit extends SourceSplitBase {
 
     private final LakeSplit lakeSplit;
 
-    private final long recordsToSplit;
+    private final long recordsToSkip;
 
     private final int splitIndex;
@@ -47,19 +47,19 @@ public class LakeSnapshotSplit extends SourceSplitBase {
             @Nullable String partitionName,
             LakeSplit lakeSplit,
             int splitIndex,
-            long recordsToSplit) {
+            long recordsToSkip) {
         super(tableBucket, partitionName);
         this.lakeSplit = lakeSplit;
         this.splitIndex = splitIndex;
-        this.recordsToSplit = recordsToSplit;
+        this.recordsToSkip = recordsToSkip;
     }
 
     public LakeSplit getLakeSplit() {
         return lakeSplit;
     }
 
-    public long getRecordsToSplit() {
-        return recordsToSplit;
+    public long getRecordsToSkip() {
+        return recordsToSkip;
     }
 
     public int getSplitIndex() {
@@ -93,8 +93,8 @@ public class LakeSnapshotSplit extends SourceSplitBase {
         return "LakeSnapshotSplit{"
                 + "lakeSplit="
                 + lakeSplit
-                + ", recordsToSplit="
-                + recordsToSplit
+                + ", recordsToSkip="
+                + recordsToSkip
                 + ", splitIndex="
                 + splitIndex
                 + ", tableBucket="
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotSplitState.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotSplitState.java
index 399601b99..f56f6f834 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotSplitState.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotSplitState.java
@@ -25,16 +25,16 @@ import org.apache.fluss.flink.source.split.SourceSplitState;
 public class LakeSnapshotSplitState extends SourceSplitState {
 
     private final LakeSnapshotSplit split;
-    private long recordsToSplit;
+    private long recordsToSkip;
 
     public LakeSnapshotSplitState(LakeSnapshotSplit split) {
         super(split);
         this.split = split;
-        this.recordsToSplit = split.getRecordsToSplit();
+        this.recordsToSkip = split.getRecordsToSkip();
     }
 
     public void setRecordsToSkip(long recordsToSkip) {
-        this.recordsToSplit = recordsToSkip;
+        this.recordsToSkip = recordsToSkip;
     }
 
     @Override
@@ -44,6 +44,6 @@ public class LakeSnapshotSplitState extends SourceSplitState {
                 split.getPartitionName(),
                 split.getLakeSplit(),
                 split.getSplitIndex(),
-                recordsToSplit);
+                recordsToSkip);
     }
 }
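While the reader runs, the split state keeps a mutable copy of the counter; on checkpoint it is written back into a fresh LakeSnapshotSplit (the constructor call at the end of the hunk above). Below is a hypothetical emitter loop showing how such a counter is advanced as rows are handed to Flink; the collector and state interfaces are placeholders, and only setRecordsToSkip mirrors the real split state.

    import java.util.Iterator;

    /** Hypothetical emitter loop; not Fluss code. */
    final class SkipCountingEmitter {
        interface Collector<T> {
            void collect(T row);
        }

        /** Stand-in for the skip counter of LakeSnapshotSplitState. */
        interface SplitStateLike {
            void setRecordsToSkip(long recordsToSkip);
        }

        static <T> void drain(
                Iterator<T> rows, long alreadyEmitted, Collector<T> out, SplitStateLike state) {
            long emitted = alreadyEmitted; // value restored from the split, if any
            while (rows.hasNext()) {
                out.collect(rows.next());
                state.setRecordsToSkip(++emitted); // snapshotted with the next checkpoint
            }
        }
    }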
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
index 0b530a722..80c6832b0 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
@@ -24,15 +24,20 @@ import org.apache.fluss.row.TimestampLtz;
import org.apache.fluss.row.TimestampNtz;
import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import javax.annotation.Nullable;
+import java.io.File;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
@@ -42,6 +47,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder;
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
import static org.apache.fluss.testutils.DataTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
@@ -49,6 +55,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** The IT case for Flink union data in lake and fluss for log table. */
 class FlinkUnionReadLogTableITCase extends FlinkUnionReadTestBase {
 
+    @TempDir public static File savepointDir;
+
     @BeforeAll
     protected static void beforeAll() {
         FlinkUnionReadTestBase.beforeAll();
@@ -169,6 +177,63 @@ class FlinkUnionReadLogTableITCase extends FlinkUnionReadTestBase {
                 actual,
                 writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
     }
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testUnionReadLogTableFailover(boolean isPartitioned) throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName1 =
+                "restore_logTable_" + (isPartitioned ? "partitioned" : "non_partitioned");
+        String resultTableName =
+                "result_table" + (isPartitioned ? "partitioned" : "non_partitioned");
+
+        TablePath table1 = TablePath.of(DEFAULT_DB, tableName1);
+        TablePath resultTable = TablePath.of(DEFAULT_DB, resultTableName);
+        List<Row> writtenRows = new LinkedList<>();
+        long tableId = prepareLogTable(table1, DEFAULT_BUCKET_NUM, isPartitioned, writtenRows);
+        // wait until records has been synced
+        waitUntilBucketSynced(table1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+        StreamTableEnvironment streamTEnv = buildSteamTEnv(null);
+        // now, start to read the log table to write to a fluss result table
+        // may read fluss or not, depends on the log offset of paimon snapshot
+        createFullTypeLogTable(resultTable, DEFAULT_BUCKET_NUM, isPartitioned, false);
+        TableResult insertResult =
+                streamTEnv.executeSql(
+                        "insert into " + resultTableName + " select * from " + tableName1);
+
+        CloseableIterator<Row> actual =
+                streamTEnv.executeSql("select * from " + resultTableName).collect();
+        assertResultsExactOrder(actual, writtenRows, false);
+
+        // now, stop the job with save point
+        String savepointPath =
+                insertResult
+                        .getJobClient()
+                        .get()
+                        .stopWithSavepoint(
+                                false,
+                                savepointDir.getAbsolutePath(),
+                                SavepointFormatType.CANONICAL)
+                        .get();
+
+        // re buildSteamTEnv
+        streamTEnv = buildSteamTEnv(savepointPath);
+        insertResult =
+                streamTEnv.executeSql(
+                        "insert into " + resultTableName + " select * from " + tableName1);
+
+        // write some log data again
+        List<Row> rows = writeRows(table1, 3, isPartitioned);
+
+        assertResultsExactOrder(actual, rows, true);
+
+        // cancel jobs
+        insertResult.getJobClient().get().cancel().get();
+        jobClient.cancel().get();
+    }
+
     private long prepareLogTable(
             TablePath tablePath, int bucketNum, boolean isPartitioned, List<Row> flinkRows)
             throws Exception {
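The new IT case exercises Flink's standard stop-with-savepoint / restore cycle. Here is a self-contained sketch of that cycle against an arbitrary Table API insert job, assuming a recent Flink version; the SQL statements and savepoint directory are placeholders.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.execution.SavepointFormatType;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    /** Sketch: stop a streaming insert job with a savepoint, then resume from it. */
    final class SavepointRestoreSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // ... register "source" and "sink" tables here ...
            TableResult job = tEnv.executeSql("INSERT INTO sink SELECT * FROM source");

            // Stop the job gracefully and record where the savepoint was written.
            String savepointPath =
                    job.getJobClient()
                            .get()
                            .stopWithSavepoint(
                                    false, "/tmp/savepoints", SavepointFormatType.CANONICAL)
                            .get();

            // Jobs submitted through the environment after this restore from the savepoint.
            Configuration conf = new Configuration();
            conf.setString("execution.savepoint.path", savepointPath);
            env.configure(conf);
            tEnv.executeSql("INSERT INTO sink SELECT * FROM source");
        }
    }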
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
index 277593720..0caa3fb36 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
@@ -21,12 +21,15 @@ import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase;
import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.RegisterExtension;
+import javax.annotation.Nullable;
+
import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
/** Base class for Flink union read test. */
@@ -60,7 +63,12 @@ public class FlinkUnionReadTestBase extends FlinkPaimonTieringTestBase {
         return FLUSS_CLUSTER_EXTENSION;
     }
 
-    private void buildStreamTEnv() {
+    protected StreamTableEnvironment buildSteamTEnv(@Nullable String savepointPath) {
+        Configuration conf = new Configuration();
+        if (savepointPath != null) {
+            conf.setString("execution.savepoint.path", savepointPath);
+            execEnv.configure(conf);
+        }
         String bootstrapServers =
                 String.join(",", clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
         // create table environment
         streamTEnv = StreamTableEnvironment.create(execEnv, EnvironmentSettings.inStreamingMode());
@@ -71,6 +79,11 @@ public class FlinkUnionReadTestBase extends FlinkPaimonTieringTestBase {
                         CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers));
         streamTEnv.executeSql("use catalog " + CATALOG_NAME);
         streamTEnv.executeSql("use " + DEFAULT_DB);
+        return streamTEnv;
+    }
+
+    private void buildStreamTEnv() {
+        buildSteamTEnv(null);
     }
 
     public void buildBatchTEnv() {
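One caveat worth noting about the helper above: execution.savepoint.path is applied to the shared execEnv, so every job submitted through the returned environment afterwards will attempt to restore from that savepoint, not just the first one. A hypothetical call site in a subclass; the table names and savepoint path are placeholders.

    /** Hypothetical usage in a subclass of FlinkUnionReadTestBase. */
    void restartFromSavepointExample(String savepointPath) {
        StreamTableEnvironment fresh = buildSteamTEnv(null); // jobs start from scratch
        // ... submit a job via `fresh`, stop it with a savepoint, obtain `savepointPath` ...
        StreamTableEnvironment resumed = buildSteamTEnv(savepointPath);
        resumed.executeSql("INSERT INTO sink SELECT * FROM source"); // resumes from the savepoint
    }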
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index 5c2ab1f1a..91cb4e0f9 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -282,6 +282,12 @@ public abstract class FlinkPaimonTieringTestBase {
 
     protected long createFullTypeLogTable(TablePath tablePath, int bucketNum, boolean isPartitioned)
             throws Exception {
+        return createFullTypeLogTable(tablePath, bucketNum, isPartitioned, true);
+    }
+
+    protected long createFullTypeLogTable(
+            TablePath tablePath, int bucketNum, boolean isPartitioned, boolean lakeEnabled)
+            throws Exception {
         Schema.Builder schemaBuilder =
                 Schema.newBuilder()
                         .column("f_boolean", DataTypes.BOOLEAN())
@@ -301,10 +307,12 @@ public abstract class FlinkPaimonTieringTestBase {
                         .column("f_binary", DataTypes.BINARY(4));
 
         TableDescriptor.Builder tableBuilder =
-                TableDescriptor.builder()
-                        .distributedBy(bucketNum, "f_int")
-                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
-                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));
+                TableDescriptor.builder().distributedBy(bucketNum, "f_int");
+        if (lakeEnabled) {
+            tableBuilder
+                    .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
+                    .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));
+        }
 
         if (isPartitioned) {
             schemaBuilder.column("p", DataTypes.STRING());