This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 164542f2b [lake/iceberg] Iceberg support union read for primary key
table in streaming mode (#1708)
164542f2b is described below
commit 164542f2bcdd05dc2fa279f3e0de3a64017c7ee5
Author: Junbo Wang <[email protected]>
AuthorDate: Wed Sep 17 10:19:34 2025 +0800
[lake/iceberg] Iceberg support union read for primary key table in
streaming mode (#1708)
---
.../apache/fluss/flink/utils/FlinkTestBase.java | 2 +-
fluss-lake/fluss-lake-iceberg/pom.xml | 1 +
.../flink/FlinkUnionReadLogTableITCase.java | 132 +++++++
.../flink/FlinkUnionReadPrimaryKeyTableITCase.java | 417 +++++++++++++++++++++
.../lake/iceberg/flink/FlinkUnionReadTestBase.java | 24 ++
.../testutils/FlinkIcebergTieringTestBase.java | 35 ++
.../paimon/flink/FlinkUnionReadLogTableITCase.java | 6 +-
.../flink/FlinkUnionReadPrimaryKeyTableITCase.java | 6 +-
.../lake/paimon/flink/FlinkUnionReadTestBase.java | 4 +-
.../testutils/FlinkPaimonTieringTestBase.java | 2 +-
10 files changed, 619 insertions(+), 10 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
index d4ac97bf0..2f0b47a4e 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
@@ -189,7 +189,7 @@ public class FlinkTestBase extends AbstractTestBase {
/**
* Wait until the default number of partitions is created. Return the map
from partition id to
- * partition name. .
+ * partition name.
*/
public static Map<Long, String> waitUntilPartitions(
ZooKeeperClient zooKeeperClient, TablePath tablePath) {
diff --git a/fluss-lake/fluss-lake-iceberg/pom.xml
b/fluss-lake/fluss-lake-iceberg/pom.xml
index b5fe686b0..46fcbab1f 100644
--- a/fluss-lake/fluss-lake-iceberg/pom.xml
+++ b/fluss-lake/fluss-lake-iceberg/pom.xml
@@ -81,6 +81,7 @@
<artifactId>fluss-flink-common</artifactId>
<version>${project.version}</version>
<scope>test</scope>
+ <type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.fluss</groupId>
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
index 5b07b68f4..06dd90eec 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
@@ -30,28 +30,40 @@ import org.apache.fluss.row.TimestampNtz;
import org.apache.fluss.types.DataTypes;
import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import javax.annotation.Nullable;
+import java.io.File;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder;
+import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
import static org.apache.fluss.testutils.DataTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
/** Test union read log table with full type. */
public class FlinkUnionReadLogTableITCase extends FlinkUnionReadTestBase {
+ @TempDir public static File savepointDir;
+
@BeforeAll
protected static void beforeAll() {
FlinkUnionReadTestBase.beforeAll();
@@ -129,6 +141,114 @@ public class FlinkUnionReadLogTableITCase extends
FlinkUnionReadTestBase {
}
}
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testReadLogTableInStreamMode(boolean isPartitioned) throws Exception {
+ // first of all, start tiering
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ String tableName = "stream_logTable_" + (isPartitioned ? "partitioned"
: "non_partitioned");
+
+ TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+ List<Row> writtenRows = new LinkedList<>();
+ long tableId = prepareLogTable(t1, DEFAULT_BUCKET_NUM, isPartitioned,
writtenRows);
+    // wait until records have been synced
+ waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+ // now, start to read the log table, which will read iceberg
+ // may read fluss or not, depends on the log offset of iceberg snapshot
+ CloseableIterator<Row> actual =
+ streamTEnv.executeSql("select * from " + tableName).collect();
+ assertResultsIgnoreOrder(
+ actual,
writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
+
+ // cancel the tiering job
+ jobClient.cancel().get();
+
+ // write some log data again
+ writtenRows.addAll(writeRows(t1, 3, isPartitioned));
+
+ // query the log table again and check the data
+ // it should read both iceberg snapshot and fluss log
+ actual =
+ streamTEnv
+ .executeSql(
+ "select * from "
+ + tableName
+ + " /*+
OPTIONS('scan.partition.discovery.interval'='100ms') */")
+ .collect();
+ if (isPartitioned) {
+ // we write to a new partition to verify partition discovery
+ writtenRows.addAll(writeFullTypeRows(t1, 10, "3027"));
+ }
+ assertResultsIgnoreOrder(
+ actual,
writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testUnionReadLogTableFailover(boolean isPartitioned) throws Exception
{
+ // first of all, start tiering
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ String tableName1 =
+ "restore_logTable_" + (isPartitioned ? "partitioned" :
"non_partitioned");
+ String resultTableName =
+ "result_table" + (isPartitioned ? "partitioned" :
"non_partitioned");
+
+ TablePath table1 = TablePath.of(DEFAULT_DB, tableName1);
+ TablePath resultTable = TablePath.of(DEFAULT_DB, resultTableName);
+ List<Row> writtenRows = new LinkedList<>();
+ long tableId = prepareLogTable(table1, DEFAULT_BUCKET_NUM,
isPartitioned, writtenRows);
+        // wait until records have been synced
+ waitUntilBucketSynced(table1, tableId, DEFAULT_BUCKET_NUM,
isPartitioned);
+
+ StreamTableEnvironment streamTEnv = buildStreamTEnv(null);
+ // now, start to read the log table to write to a fluss result table
+ // may read fluss or not, depends on the log offset of iceberg snapshot
+ createFullTypeLogTable(resultTable, DEFAULT_BUCKET_NUM, isPartitioned,
false);
+ TableResult insertResult =
+ streamTEnv.executeSql(
+ "insert into " + resultTableName + " select * from " +
tableName1);
+
+ CloseableIterator<Row> actual =
+ streamTEnv.executeSql("select * from " +
resultTableName).collect();
+ if (isPartitioned) {
+ assertRowResultsIgnoreOrder(actual, writtenRows, false);
+ } else {
+ assertResultsExactOrder(actual, writtenRows, false);
+ }
+
+ // now, stop the job with save point
+ String savepointPath =
+ insertResult
+ .getJobClient()
+ .get()
+ .stopWithSavepoint(
+ false,
+ savepointDir.getAbsolutePath(),
+ SavepointFormatType.CANONICAL)
+ .get();
+
+ // re buildStreamTEnv
+ streamTEnv = buildStreamTEnv(savepointPath);
+ insertResult =
+ streamTEnv.executeSql(
+ "insert into " + resultTableName + " select * from " +
tableName1);
+
+ // write some log data again
+ List<Row> rows = writeRows(table1, 3, isPartitioned);
+ if (isPartitioned) {
+ assertRowResultsIgnoreOrder(actual, rows, true);
+ } else {
+ assertResultsExactOrder(actual, rows, true);
+ }
+
+ // cancel jobs
+ insertResult.getJobClient().get().cancel().get();
+ jobClient.cancel().get();
+ }
+
private long prepareLogTable(
TablePath tablePath, int bucketNum, boolean isPartitioned,
List<Row> flinkRows)
throws Exception {
@@ -152,6 +272,12 @@ public class FlinkUnionReadLogTableITCase extends
FlinkUnionReadTestBase {
protected long createFullTypeLogTable(TablePath tablePath, int bucketNum,
boolean isPartitioned)
throws Exception {
+ return createFullTypeLogTable(tablePath, bucketNum, isPartitioned,
true);
+ }
+
+ protected long createFullTypeLogTable(
+ TablePath tablePath, int bucketNum, boolean isPartitioned, boolean
lakeEnabled)
+ throws Exception {
Schema.Builder schemaBuilder =
Schema.newBuilder()
.column("f_boolean", DataTypes.BOOLEAN())
@@ -176,6 +302,12 @@ public class FlinkUnionReadLogTableITCase extends
FlinkUnionReadTestBase {
.property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
"true")
.property(ConfigOptions.TABLE_DATALAKE_FRESHNESS,
Duration.ofMillis(500));
+ if (lakeEnabled) {
+ tableBuilder
+ .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
"true")
+ .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS,
Duration.ofMillis(500));
+ }
+
if (isPartitioned) {
schemaBuilder.column("p", DataTypes.STRING());
tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED,
true);
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
new file mode 100644
index 000000000..603947f5b
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.flink;
+
+import org.apache.fluss.config.AutoPartitionTimeUnit;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.server.replica.Replica;
+import org.apache.fluss.types.DataTypes;
+
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder;
+import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+
+/** Test case for union read primary key table. */
+public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
+ // first of all, start tiering
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ String tableName =
+ "stream_pk_table_full" + (isPartitioned ? "_partitioned" :
"_non_partitioned");
+ TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+ Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+ // create table & write initial data
+ long tableId =
+ preparePKTableFullType(t1, DEFAULT_BUCKET_NUM, isPartitioned,
bucketLogEndOffset);
+
+        // wait until records have been synced
+ waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+ // check the status of replica after synced
+ assertReplicaStatus(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned,
bucketLogEndOffset);
+
+ // will read iceberg snapshot, should only +I since no change log
+ List<Row> expectedRows = new ArrayList<>();
+ if (isPartitioned) {
+ for (String partition : waitUntilPartitions(t1).values()) {
+ expectedRows.add(
+ Row.of(
+ false,
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ 5.1f,
+ 6.0d,
+ "string",
+ Decimal.fromUnscaledLong(9, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(10), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273182L),
+ TimestampLtz.fromEpochMillis(1698235273182L,
5000),
+ TimestampNtz.fromMillis(1698235273183L),
+ TimestampNtz.fromMillis(1698235273183L, 6000),
+ new byte[] {1, 2, 3, 4},
+ partition));
+ expectedRows.add(
+ Row.of(
+ true,
+ (byte) 10,
+ (short) 20,
+ 30,
+ 40L,
+ 50.1f,
+ 60.0d,
+ "another_string",
+ Decimal.fromUnscaledLong(90, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(100), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273200L),
+ TimestampLtz.fromEpochMillis(1698235273200L,
5000),
+ TimestampNtz.fromMillis(1698235273201L),
+ TimestampNtz.fromMillis(1698235273201L, 6000),
+ new byte[] {1, 2, 3, 4},
+ partition));
+ }
+ } else {
+ expectedRows =
+ Arrays.asList(
+ Row.of(
+ false,
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ 5.1f,
+ 6.0d,
+ "string",
+ Decimal.fromUnscaledLong(9, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(10), 20, 0),
+
TimestampLtz.fromEpochMillis(1698235273182L),
+
TimestampLtz.fromEpochMillis(1698235273182L, 5000),
+ TimestampNtz.fromMillis(1698235273183L),
+ TimestampNtz.fromMillis(1698235273183L,
6000),
+ new byte[] {1, 2, 3, 4},
+ null),
+ Row.of(
+ true,
+ (byte) 10,
+ (short) 20,
+ 30,
+ 40L,
+ 50.1f,
+ 60.0d,
+ "another_string",
+ Decimal.fromUnscaledLong(90, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(100), 20, 0),
+
TimestampLtz.fromEpochMillis(1698235273200L),
+
TimestampLtz.fromEpochMillis(1698235273200L, 5000),
+ TimestampNtz.fromMillis(1698235273201L),
+ TimestampNtz.fromMillis(1698235273201L,
6000),
+ new byte[] {1, 2, 3, 4},
+ null));
+ }
+
+ String query = "select * from " + tableName;
+ CloseableIterator<Row> actual = streamTEnv.executeSql(query).collect();
+ assertRowResultsIgnoreOrder(actual, expectedRows, false);
+
+ // stop lake tiering service
+ jobClient.cancel().get();
+
+ // write a row again
+ if (isPartitioned) {
+ Map<Long, String> partitionNameById = waitUntilPartitions(t1);
+ for (String partition : partitionNameById.values()) {
+ writeFullTypeRow(t1, partition);
+ }
+ } else {
+ writeFullTypeRow(t1, null);
+ }
+
+ // should generate -U & +U
+ List<Row> expectedRows2 = new ArrayList<>();
+ if (isPartitioned) {
+ for (String partition : waitUntilPartitions(t1).values()) {
+ expectedRows2.add(
+ Row.ofKind(
+ RowKind.UPDATE_BEFORE,
+ true,
+ (byte) 10,
+ (short) 20,
+ 30,
+ 40L,
+ 50.1f,
+ 60.0d,
+ "another_string",
+ Decimal.fromUnscaledLong(90, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(100), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273200L),
+ TimestampLtz.fromEpochMillis(1698235273200L,
5000),
+ TimestampNtz.fromMillis(1698235273201L),
+ TimestampNtz.fromMillis(1698235273201L, 6000),
+ new byte[] {1, 2, 3, 4},
+ partition));
+ expectedRows2.add(
+ Row.ofKind(
+ RowKind.UPDATE_AFTER,
+ true,
+ (byte) 100,
+ (short) 200,
+ 30,
+ 400L,
+ 500.1f,
+ 600.0d,
+ "another_string_2",
+ Decimal.fromUnscaledLong(900, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(1000), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273400L),
+ TimestampLtz.fromEpochMillis(1698235273400L,
7000),
+ TimestampNtz.fromMillis(1698235273501L),
+ TimestampNtz.fromMillis(1698235273501L, 8000),
+ new byte[] {5, 6, 7, 8},
+ partition));
+ }
+ } else {
+ expectedRows2.add(
+ Row.ofKind(
+ RowKind.UPDATE_BEFORE,
+ true,
+ (byte) 10,
+ (short) 20,
+ 30,
+ 40L,
+ 50.1f,
+ 60.0d,
+ "another_string",
+ Decimal.fromUnscaledLong(90, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(100), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273200L),
+ TimestampLtz.fromEpochMillis(1698235273200L, 5000),
+ TimestampNtz.fromMillis(1698235273201L),
+ TimestampNtz.fromMillis(1698235273201L, 6000),
+ new byte[] {1, 2, 3, 4},
+ null));
+ expectedRows2.add(
+ Row.ofKind(
+ RowKind.UPDATE_AFTER,
+ true,
+ (byte) 100,
+ (short) 200,
+ 30,
+ 400L,
+ 500.1f,
+ 600.0d,
+ "another_string_2",
+ Decimal.fromUnscaledLong(900, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(1000), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273400L),
+ TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+ TimestampNtz.fromMillis(1698235273501L),
+ TimestampNtz.fromMillis(1698235273501L, 8000),
+ new byte[] {5, 6, 7, 8},
+ null));
+ }
+
+ if (isPartitioned) {
+ assertRowResultsIgnoreOrder(actual, expectedRows2, true);
+ } else {
+ assertResultsExactOrder(actual, expectedRows2, true);
+ }
+
+ // query again
+ actual = streamTEnv.executeSql(query).collect();
+ List<Row> totalExpectedRows = new ArrayList<>(expectedRows);
+ totalExpectedRows.addAll(expectedRows2);
+
+ if (isPartitioned) {
+ assertRowResultsIgnoreOrder(actual, totalExpectedRows, true);
+ } else {
+ assertResultsExactOrder(actual, totalExpectedRows, true);
+ }
+ }
+
+ private void writeFullTypeRow(TablePath tablePath, String partition)
throws Exception {
+ List<InternalRow> rows =
+ Collections.singletonList(
+ row(
+ true,
+ (byte) 100,
+ (short) 200,
+ 30,
+ 400L,
+ 500.1f,
+ 600.0d,
+ "another_string_2",
+ Decimal.fromUnscaledLong(900, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(1000), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273400L),
+ TimestampLtz.fromEpochMillis(1698235273400L,
7000),
+ TimestampNtz.fromMillis(1698235273501L),
+ TimestampNtz.fromMillis(1698235273501L, 8000),
+ new byte[] {5, 6, 7, 8},
+ partition));
+ writeRows(tablePath, rows, false);
+ }
+
+ private long preparePKTableFullType(
+ TablePath tablePath,
+ int bucketNum,
+ boolean isPartitioned,
+ Map<TableBucket, Long> bucketLogEndOffset)
+ throws Exception {
+ long tableId = createPkTableFullType(tablePath, bucketNum,
isPartitioned);
+ if (isPartitioned) {
+ Map<Long, String> partitionNameById =
waitUntilPartitions(tablePath);
+ for (String partition : partitionNameById.values()) {
+ for (int i = 0; i < 2; i++) {
+ List<InternalRow> rows = generateKvRowsFullType(partition);
+ // write records
+ writeRows(tablePath, rows, false);
+ }
+ }
+ for (Long partitionId : partitionNameById.keySet()) {
+ bucketLogEndOffset.putAll(getBucketLogEndOffset(tableId,
bucketNum, partitionId));
+ }
+ } else {
+ for (int i = 0; i < 2; i++) {
+ List<InternalRow> rows = generateKvRowsFullType(null);
+ // write records
+ writeRows(tablePath, rows, false);
+ }
+ bucketLogEndOffset.putAll(getBucketLogEndOffset(tableId,
bucketNum, null));
+ }
+ return tableId;
+ }
+
+ protected long createPkTableFullType(TablePath tablePath, int bucketNum,
boolean isPartitioned)
+ throws Exception {
+ Schema.Builder schemaBuilder =
+ Schema.newBuilder()
+ .column("c1", DataTypes.BOOLEAN())
+ .column("c2", DataTypes.TINYINT())
+ .column("c3", DataTypes.SMALLINT())
+ .column("c4", DataTypes.INT())
+ .column("c5", DataTypes.BIGINT())
+ .column("c6", DataTypes.FLOAT())
+ .column("c7", DataTypes.DOUBLE())
+ .column("c8", DataTypes.STRING())
+ .column("c9", DataTypes.DECIMAL(5, 2))
+ .column("c10", DataTypes.DECIMAL(20, 0))
+ .column("c11", DataTypes.TIMESTAMP_LTZ(3))
+ .column("c12", DataTypes.TIMESTAMP_LTZ(6))
+ .column("c13", DataTypes.TIMESTAMP(3))
+ .column("c14", DataTypes.TIMESTAMP(6))
+ .column("c15", DataTypes.BINARY(4))
+ .column("c16", DataTypes.STRING());
+
+ TableDescriptor.Builder tableBuilder =
+ TableDescriptor.builder()
+ .distributedBy(bucketNum)
+ .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
"true")
+ .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS,
Duration.ofMillis(500));
+
+ if (isPartitioned) {
+ tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED,
true);
+ tableBuilder.partitionedBy("c16");
+ schemaBuilder.primaryKey("c4", "c16");
+ tableBuilder.property(
+ ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT,
AutoPartitionTimeUnit.YEAR);
+ } else {
+ schemaBuilder.primaryKey("c4");
+ }
+ tableBuilder.schema(schemaBuilder.build());
+ return createTable(tablePath, tableBuilder.build());
+ }
+
+ private List<InternalRow> generateKvRowsFullType(@Nullable String
partition) {
+ return Arrays.asList(
+ row(
+ false,
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ 5.1f,
+ 6.0d,
+ "string",
+ Decimal.fromUnscaledLong(9, 5, 2),
+ Decimal.fromBigDecimal(new java.math.BigDecimal(10),
20, 0),
+ TimestampLtz.fromEpochMillis(1698235273182L),
+ TimestampLtz.fromEpochMillis(1698235273182L, 5000),
+ TimestampNtz.fromMillis(1698235273183L),
+ TimestampNtz.fromMillis(1698235273183L, 6000),
+ new byte[] {1, 2, 3, 4},
+ partition),
+ row(
+ true,
+ (byte) 10,
+ (short) 20,
+ 30,
+ 40L,
+ 50.1f,
+ 60.0d,
+ "another_string",
+ Decimal.fromUnscaledLong(90, 5, 2),
+ Decimal.fromBigDecimal(new java.math.BigDecimal(100),
20, 0),
+ TimestampLtz.fromEpochMillis(1698235273200L),
+ TimestampLtz.fromEpochMillis(1698235273200L, 5000),
+ TimestampNtz.fromMillis(1698235273201L),
+ TimestampNtz.fromMillis(1698235273201L, 6000),
+ new byte[] {1, 2, 3, 4},
+ partition));
+ }
+
+ private Map<TableBucket, Long> getBucketLogEndOffset(
+ long tableId, int bucketNum, Long partitionId) {
+ Map<TableBucket, Long> bucketLogEndOffsets = new HashMap<>();
+ for (int i = 0; i < bucketNum; i++) {
+ TableBucket tableBucket = new TableBucket(tableId, partitionId, i);
+ Replica replica = getLeaderReplica(tableBucket);
+ bucketLogEndOffsets.put(tableBucket,
replica.getLocalLogEndOffset());
+ }
+ return bucketLogEndOffsets;
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadTestBase.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadTestBase.java
index 8438c2499..adb9a8686 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadTestBase.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadTestBase.java
@@ -21,11 +21,14 @@ package org.apache.fluss.lake.iceberg.flink;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
+import javax.annotation.Nullable;
+
import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
/** Base class for iceberg union read test. */
@@ -35,6 +38,7 @@ class FlinkUnionReadTestBase extends
FlinkIcebergTieringTestBase {
protected static final String CATALOG_NAME = "test_iceberg_lake";
protected static final int DEFAULT_BUCKET_NUM = 1;
StreamTableEnvironment batchTEnv;
+ StreamTableEnvironment streamTEnv;
@BeforeAll
protected static void beforeAll() {
@@ -54,5 +58,25 @@ class FlinkUnionReadTestBase extends
FlinkIcebergTieringTestBase {
CATALOG_NAME, BOOTSTRAP_SERVERS.key(),
bootstrapServers));
batchTEnv.executeSql("use catalog " + CATALOG_NAME);
batchTEnv.executeSql("use " + DEFAULT_DB);
+ buildStreamTEnv(null);
+ }
+
+ protected StreamTableEnvironment buildStreamTEnv(@Nullable String
savepointPath) {
+ Configuration conf = new Configuration();
+ if (savepointPath != null) {
+ conf.setString("execution.savepoint.path", savepointPath);
+ execEnv.configure(conf);
+ }
+ String bootstrapServers = String.join(",",
clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
+ // create table environment
+ streamTEnv = StreamTableEnvironment.create(execEnv,
EnvironmentSettings.inStreamingMode());
+        // create catalog using SQL
+ streamTEnv.executeSql(
+ String.format(
+ "create catalog %s with ('type' = 'fluss', '%s' =
'%s')",
+ CATALOG_NAME, BOOTSTRAP_SERVERS.key(),
bootstrapServers));
+ streamTEnv.executeSql("use catalog " + CATALOG_NAME);
+ streamTEnv.executeSql("use " + DEFAULT_DB);
+ return streamTEnv;
}
}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
index 81bfb0bd0..e9bd27bd5 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
@@ -219,6 +219,29 @@ public class FlinkIcebergTieringTestBase {
return admin.getTableInfo(tablePath).get().getTableId();
}
+ protected void assertReplicaStatus(
+ TablePath tablePath,
+ long tableId,
+ int bucketCount,
+ boolean isPartitioned,
+ Map<TableBucket, Long> expectedLogEndOffset) {
+ if (isPartitioned) {
+ Map<Long, String> partitionById =
+
waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), tablePath);
+ for (Long partitionId : partitionById.keySet()) {
+ for (int i = 0; i < bucketCount; i++) {
+ TableBucket tableBucket = new TableBucket(tableId,
partitionId, i);
+ assertReplicaStatus(tableBucket,
expectedLogEndOffset.get(tableBucket));
+ }
+ }
+ } else {
+ for (int i = 0; i < bucketCount; i++) {
+ TableBucket tableBucket = new TableBucket(tableId, i);
+ assertReplicaStatus(tableBucket,
expectedLogEndOffset.get(tableBucket));
+ }
+ }
+ }
+
protected void assertReplicaStatus(TableBucket tb, long
expectedLogEndOffset) {
retry(
Duration.ofMinutes(1),
@@ -231,6 +254,18 @@ public class FlinkIcebergTieringTestBase {
});
}
+ /**
+ * Wait until the default number of partitions is created. Return the map
from partition id to
+ * partition name.
+ */
+ public static Map<Long, String> waitUntilPartitions(
+ ZooKeeperClient zooKeeperClient, TablePath tablePath) {
+ return waitUntilPartitions(
+ zooKeeperClient,
+ tablePath,
+
ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.defaultValue());
+ }
+
public static Map<Long, String> waitUntilPartitions(TablePath tablePath) {
return waitUntilPartitions(
FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(),
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
index 3cd682ab2..cdb88dcbf 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
@@ -196,7 +196,7 @@ class FlinkUnionReadLogTableITCase extends
FlinkUnionReadTestBase {
// wait until records has been synced
waitUntilBucketSynced(table1, tableId, DEFAULT_BUCKET_NUM,
isPartitioned);
- StreamTableEnvironment streamTEnv = buildSteamTEnv(null);
+ StreamTableEnvironment streamTEnv = buildStreamTEnv(null);
// now, start to read the log table to write to a fluss result table
// may read fluss or not, depends on the log offset of paimon snapshot
createFullTypeLogTable(resultTable, DEFAULT_BUCKET_NUM, isPartitioned,
false);
@@ -223,8 +223,8 @@ class FlinkUnionReadLogTableITCase extends
FlinkUnionReadTestBase {
SavepointFormatType.CANONICAL)
.get();
- // re buildSteamTEnv
- streamTEnv = buildSteamTEnv(savepointPath);
+ // re buildStreamTEnv
+ streamTEnv = buildStreamTEnv(savepointPath);
insertResult =
streamTEnv.executeSql(
"insert into " + resultTableName + " select * from " +
tableName1);
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
index a5418d163..a7dce3836 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -633,7 +633,7 @@ class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase {
// create result table
createSimplePkTable(resultTable, DEFAULT_BUCKET_NUM, isPartitioned,
false);
// union read lake data
- StreamTableEnvironment streamTEnv = buildSteamTEnv(null);
+ StreamTableEnvironment streamTEnv = buildStreamTEnv(null);
TableResult insertResult =
streamTEnv.executeSql(
"insert into " + resultTableName + " select * from " +
tableName1);
@@ -670,8 +670,8 @@ class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase {
SavepointFormatType.CANONICAL)
.get();
- // re buildSteamTEnv
- streamTEnv = buildSteamTEnv(savepointPath);
+ // re buildStreamTEnv
+ streamTEnv = buildStreamTEnv(savepointPath);
insertResult =
streamTEnv.executeSql(
"insert into " + resultTableName + " select * from " +
tableName1);
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
index 0caa3fb36..94ba4fd1c 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
@@ -63,7 +63,7 @@ public class FlinkUnionReadTestBase extends
FlinkPaimonTieringTestBase {
return FLUSS_CLUSTER_EXTENSION;
}
- protected StreamTableEnvironment buildSteamTEnv(@Nullable String
savepointPath) {
+ protected StreamTableEnvironment buildStreamTEnv(@Nullable String
savepointPath) {
Configuration conf = new Configuration();
if (savepointPath != null) {
conf.setString("execution.savepoint.path", savepointPath);
@@ -83,7 +83,7 @@ public class FlinkUnionReadTestBase extends
FlinkPaimonTieringTestBase {
}
private void buildStreamTEnv() {
- buildSteamTEnv(null);
+ buildStreamTEnv(null);
}
public void buildBatchTEnv() {
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index 91cb4e0f9..7405841b5 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -206,7 +206,7 @@ public abstract class FlinkPaimonTieringTestBase {
/**
* Wait until the default number of partitions is created. Return the map
from partition id to
- * partition name. .
+ * partition name.
*/
public static Map<Long, String> waitUntilPartitions(
ZooKeeperClient zooKeeperClient, TablePath tablePath) {