This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 164542f2b [lake/iceberg] Iceberg support union read for primary key 
table in streaming mode (#1708)
164542f2b is described below

commit 164542f2bcdd05dc2fa279f3e0de3a64017c7ee5
Author: Junbo Wang <[email protected]>
AuthorDate: Wed Sep 17 10:19:34 2025 +0800

    [lake/iceberg] Iceberg support union read for primary key table in 
streaming mode (#1708)
---
 .../apache/fluss/flink/utils/FlinkTestBase.java    |   2 +-
 fluss-lake/fluss-lake-iceberg/pom.xml              |   1 +
 .../flink/FlinkUnionReadLogTableITCase.java        | 132 +++++++
 .../flink/FlinkUnionReadPrimaryKeyTableITCase.java | 417 +++++++++++++++++++++
 .../lake/iceberg/flink/FlinkUnionReadTestBase.java |  24 ++
 .../testutils/FlinkIcebergTieringTestBase.java     |  35 ++
 .../paimon/flink/FlinkUnionReadLogTableITCase.java |   6 +-
 .../flink/FlinkUnionReadPrimaryKeyTableITCase.java |   6 +-
 .../lake/paimon/flink/FlinkUnionReadTestBase.java  |   4 +-
 .../testutils/FlinkPaimonTieringTestBase.java      |   2 +-
 10 files changed, 619 insertions(+), 10 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
index d4ac97bf0..2f0b47a4e 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
@@ -189,7 +189,7 @@ public class FlinkTestBase extends AbstractTestBase {
 
     /**
      * Wait until the default number of partitions is created. Return the map 
from partition id to
-     * partition name. .
+     * partition name.
      */
     public static Map<Long, String> waitUntilPartitions(
             ZooKeeperClient zooKeeperClient, TablePath tablePath) {
diff --git a/fluss-lake/fluss-lake-iceberg/pom.xml 
b/fluss-lake/fluss-lake-iceberg/pom.xml
index b5fe686b0..46fcbab1f 100644
--- a/fluss-lake/fluss-lake-iceberg/pom.xml
+++ b/fluss-lake/fluss-lake-iceberg/pom.xml
@@ -81,6 +81,7 @@
             <artifactId>fluss-flink-common</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
+            <type>test-jar</type>
         </dependency>
         <dependency>
             <groupId>org.apache.fluss</groupId>
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
index 5b07b68f4..06dd90eec 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
@@ -30,28 +30,40 @@ import org.apache.fluss.row.TimestampNtz;
 import org.apache.fluss.types.DataTypes;
 
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.CollectionUtil;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import javax.annotation.Nullable;
 
+import java.io.File;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder;
+import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
 import static org.apache.fluss.testutils.DataTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test union read log table with full type. */
 public class FlinkUnionReadLogTableITCase extends FlinkUnionReadTestBase {
+    @TempDir public static File savepointDir;
+
     @BeforeAll
     protected static void beforeAll() {
         FlinkUnionReadTestBase.beforeAll();
@@ -129,6 +141,114 @@ public class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
         }
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testReadLogTableInStreamMode(boolean isPartitioned) throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName = "stream_logTable_" + (isPartitioned ? "partitioned" 
: "non_partitioned");
+
+        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+        List<Row> writtenRows = new LinkedList<>();
+        long tableId = prepareLogTable(t1, DEFAULT_BUCKET_NUM, isPartitioned, 
writtenRows);
+        // wait until records have been synced
+        waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+        // now, start to read the log table, which will read iceberg
+        // may read fluss or not, depends on the log offset of iceberg snapshot
+        CloseableIterator<Row> actual =
+                streamTEnv.executeSql("select * from " + tableName).collect();
+        assertResultsIgnoreOrder(
+                actual, 
writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
+
+        // cancel the tiering job
+        jobClient.cancel().get();
+
+        // write some log data again
+        writtenRows.addAll(writeRows(t1, 3, isPartitioned));
+
+        // query the log table again and check the data
+        // it should read both iceberg snapshot and fluss log
+        actual =
+                streamTEnv
+                        .executeSql(
+                                "select * from "
+                                        + tableName
+                                        + " /*+ 
OPTIONS('scan.partition.discovery.interval'='100ms') */")
+                        .collect();
+        if (isPartitioned) {
+            // we write to a new partition to verify partition discovery
+            writtenRows.addAll(writeFullTypeRows(t1, 10, "3027"));
+        }
+        assertResultsIgnoreOrder(
+                actual, 
writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testUnionReadLogTableFailover(boolean isPartitioned) throws Exception 
{
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName1 =
+                "restore_logTable_" + (isPartitioned ? "partitioned" : 
"non_partitioned");
+        String resultTableName =
+                "result_table" + (isPartitioned ? "partitioned" : 
"non_partitioned");
+
+        TablePath table1 = TablePath.of(DEFAULT_DB, tableName1);
+        TablePath resultTable = TablePath.of(DEFAULT_DB, resultTableName);
+        List<Row> writtenRows = new LinkedList<>();
+        long tableId = prepareLogTable(table1, DEFAULT_BUCKET_NUM, 
isPartitioned, writtenRows);
+        // wait until records have been synced
+        waitUntilBucketSynced(table1, tableId, DEFAULT_BUCKET_NUM, 
isPartitioned);
+
+        StreamTableEnvironment streamTEnv = buildStreamTEnv(null);
+        // now, start to read the log table to write to a fluss result table
+        // may read fluss or not, depends on the log offset of iceberg snapshot
+        createFullTypeLogTable(resultTable, DEFAULT_BUCKET_NUM, isPartitioned, 
false);
+        TableResult insertResult =
+                streamTEnv.executeSql(
+                        "insert into " + resultTableName + " select * from " + 
tableName1);
+
+        CloseableIterator<Row> actual =
+                streamTEnv.executeSql("select * from " + 
resultTableName).collect();
+        if (isPartitioned) {
+            assertRowResultsIgnoreOrder(actual, writtenRows, false);
+        } else {
+            assertResultsExactOrder(actual, writtenRows, false);
+        }
+
+        // now, stop the job with save point
+        String savepointPath =
+                insertResult
+                        .getJobClient()
+                        .get()
+                        .stopWithSavepoint(
+                                false,
+                                savepointDir.getAbsolutePath(),
+                                SavepointFormatType.CANONICAL)
+                        .get();
+
+        // re-build the StreamTEnv, restoring from the savepoint
+        streamTEnv = buildStreamTEnv(savepointPath);
+        insertResult =
+                streamTEnv.executeSql(
+                        "insert into " + resultTableName + " select * from " + 
tableName1);
+
+        // write some log data again
+        List<Row> rows = writeRows(table1, 3, isPartitioned);
+        if (isPartitioned) {
+            assertRowResultsIgnoreOrder(actual, rows, true);
+        } else {
+            assertResultsExactOrder(actual, rows, true);
+        }
+
+        // cancel jobs
+        insertResult.getJobClient().get().cancel().get();
+        jobClient.cancel().get();
+    }
+
     private long prepareLogTable(
             TablePath tablePath, int bucketNum, boolean isPartitioned, 
List<Row> flinkRows)
             throws Exception {
@@ -152,6 +272,12 @@ public class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
 
     protected long createFullTypeLogTable(TablePath tablePath, int bucketNum, 
boolean isPartitioned)
             throws Exception {
+        return createFullTypeLogTable(tablePath, bucketNum, isPartitioned, 
true);
+    }
+
+    protected long createFullTypeLogTable(
+            TablePath tablePath, int bucketNum, boolean isPartitioned, boolean 
lakeEnabled)
+            throws Exception {
         Schema.Builder schemaBuilder =
                 Schema.newBuilder()
                         .column("f_boolean", DataTypes.BOOLEAN())
@@ -176,6 +302,12 @@ public class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
                         .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true")
                         .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, 
Duration.ofMillis(500));
 
+        if (lakeEnabled) {
+            tableBuilder
+                    .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true")
+                    .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, 
Duration.ofMillis(500));
+        }
+
         if (isPartitioned) {
             schemaBuilder.column("p", DataTypes.STRING());
             tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, 
true);
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
new file mode 100644
index 000000000..603947f5b
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.flink;
+
+import org.apache.fluss.config.AutoPartitionTimeUnit;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.server.replica.Replica;
+import org.apache.fluss.types.DataTypes;
+
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder;
+import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+
+/** Test case for union read primary key table. */
+public class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName =
+                "stream_pk_table_full" + (isPartitioned ? "_partitioned" : 
"_non_partitioned");
+        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+        Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+        // create table & write initial data
+        long tableId =
+                preparePKTableFullType(t1, DEFAULT_BUCKET_NUM, isPartitioned, 
bucketLogEndOffset);
+
+        // wait until records have been synced
+        waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+        // check the status of replica after synced
+        assertReplicaStatus(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned, 
bucketLogEndOffset);
+
+        // will read iceberg snapshot, should only +I since no change log
+        List<Row> expectedRows = new ArrayList<>();
+        if (isPartitioned) {
+            for (String partition : waitUntilPartitions(t1).values()) {
+                expectedRows.add(
+                        Row.of(
+                                false,
+                                (byte) 1,
+                                (short) 2,
+                                3,
+                                4L,
+                                5.1f,
+                                6.0d,
+                                "string",
+                                Decimal.fromUnscaledLong(9, 5, 2),
+                                Decimal.fromBigDecimal(new 
java.math.BigDecimal(10), 20, 0),
+                                TimestampLtz.fromEpochMillis(1698235273182L),
+                                TimestampLtz.fromEpochMillis(1698235273182L, 
5000),
+                                TimestampNtz.fromMillis(1698235273183L),
+                                TimestampNtz.fromMillis(1698235273183L, 6000),
+                                new byte[] {1, 2, 3, 4},
+                                partition));
+                expectedRows.add(
+                        Row.of(
+                                true,
+                                (byte) 10,
+                                (short) 20,
+                                30,
+                                40L,
+                                50.1f,
+                                60.0d,
+                                "another_string",
+                                Decimal.fromUnscaledLong(90, 5, 2),
+                                Decimal.fromBigDecimal(new 
java.math.BigDecimal(100), 20, 0),
+                                TimestampLtz.fromEpochMillis(1698235273200L),
+                                TimestampLtz.fromEpochMillis(1698235273200L, 
5000),
+                                TimestampNtz.fromMillis(1698235273201L),
+                                TimestampNtz.fromMillis(1698235273201L, 6000),
+                                new byte[] {1, 2, 3, 4},
+                                partition));
+            }
+        } else {
+            expectedRows =
+                    Arrays.asList(
+                            Row.of(
+                                    false,
+                                    (byte) 1,
+                                    (short) 2,
+                                    3,
+                                    4L,
+                                    5.1f,
+                                    6.0d,
+                                    "string",
+                                    Decimal.fromUnscaledLong(9, 5, 2),
+                                    Decimal.fromBigDecimal(new 
java.math.BigDecimal(10), 20, 0),
+                                    
TimestampLtz.fromEpochMillis(1698235273182L),
+                                    
TimestampLtz.fromEpochMillis(1698235273182L, 5000),
+                                    TimestampNtz.fromMillis(1698235273183L),
+                                    TimestampNtz.fromMillis(1698235273183L, 
6000),
+                                    new byte[] {1, 2, 3, 4},
+                                    null),
+                            Row.of(
+                                    true,
+                                    (byte) 10,
+                                    (short) 20,
+                                    30,
+                                    40L,
+                                    50.1f,
+                                    60.0d,
+                                    "another_string",
+                                    Decimal.fromUnscaledLong(90, 5, 2),
+                                    Decimal.fromBigDecimal(new 
java.math.BigDecimal(100), 20, 0),
+                                    
TimestampLtz.fromEpochMillis(1698235273200L),
+                                    
TimestampLtz.fromEpochMillis(1698235273200L, 5000),
+                                    TimestampNtz.fromMillis(1698235273201L),
+                                    TimestampNtz.fromMillis(1698235273201L, 
6000),
+                                    new byte[] {1, 2, 3, 4},
+                                    null));
+        }
+
+        String query = "select * from " + tableName;
+        CloseableIterator<Row> actual = streamTEnv.executeSql(query).collect();
+        assertRowResultsIgnoreOrder(actual, expectedRows, false);
+
+        // stop lake tiering service
+        jobClient.cancel().get();
+
+        // write a row again
+        if (isPartitioned) {
+            Map<Long, String> partitionNameById = waitUntilPartitions(t1);
+            for (String partition : partitionNameById.values()) {
+                writeFullTypeRow(t1, partition);
+            }
+        } else {
+            writeFullTypeRow(t1, null);
+        }
+
+        // should generate -U & +U
+        List<Row> expectedRows2 = new ArrayList<>();
+        if (isPartitioned) {
+            for (String partition : waitUntilPartitions(t1).values()) {
+                expectedRows2.add(
+                        Row.ofKind(
+                                RowKind.UPDATE_BEFORE,
+                                true,
+                                (byte) 10,
+                                (short) 20,
+                                30,
+                                40L,
+                                50.1f,
+                                60.0d,
+                                "another_string",
+                                Decimal.fromUnscaledLong(90, 5, 2),
+                                Decimal.fromBigDecimal(new 
java.math.BigDecimal(100), 20, 0),
+                                TimestampLtz.fromEpochMillis(1698235273200L),
+                                TimestampLtz.fromEpochMillis(1698235273200L, 
5000),
+                                TimestampNtz.fromMillis(1698235273201L),
+                                TimestampNtz.fromMillis(1698235273201L, 6000),
+                                new byte[] {1, 2, 3, 4},
+                                partition));
+                expectedRows2.add(
+                        Row.ofKind(
+                                RowKind.UPDATE_AFTER,
+                                true,
+                                (byte) 100,
+                                (short) 200,
+                                30,
+                                400L,
+                                500.1f,
+                                600.0d,
+                                "another_string_2",
+                                Decimal.fromUnscaledLong(900, 5, 2),
+                                Decimal.fromBigDecimal(new 
java.math.BigDecimal(1000), 20, 0),
+                                TimestampLtz.fromEpochMillis(1698235273400L),
+                                TimestampLtz.fromEpochMillis(1698235273400L, 
7000),
+                                TimestampNtz.fromMillis(1698235273501L),
+                                TimestampNtz.fromMillis(1698235273501L, 8000),
+                                new byte[] {5, 6, 7, 8},
+                                partition));
+            }
+        } else {
+            expectedRows2.add(
+                    Row.ofKind(
+                            RowKind.UPDATE_BEFORE,
+                            true,
+                            (byte) 10,
+                            (short) 20,
+                            30,
+                            40L,
+                            50.1f,
+                            60.0d,
+                            "another_string",
+                            Decimal.fromUnscaledLong(90, 5, 2),
+                            Decimal.fromBigDecimal(new 
java.math.BigDecimal(100), 20, 0),
+                            TimestampLtz.fromEpochMillis(1698235273200L),
+                            TimestampLtz.fromEpochMillis(1698235273200L, 5000),
+                            TimestampNtz.fromMillis(1698235273201L),
+                            TimestampNtz.fromMillis(1698235273201L, 6000),
+                            new byte[] {1, 2, 3, 4},
+                            null));
+            expectedRows2.add(
+                    Row.ofKind(
+                            RowKind.UPDATE_AFTER,
+                            true,
+                            (byte) 100,
+                            (short) 200,
+                            30,
+                            400L,
+                            500.1f,
+                            600.0d,
+                            "another_string_2",
+                            Decimal.fromUnscaledLong(900, 5, 2),
+                            Decimal.fromBigDecimal(new 
java.math.BigDecimal(1000), 20, 0),
+                            TimestampLtz.fromEpochMillis(1698235273400L),
+                            TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                            TimestampNtz.fromMillis(1698235273501L),
+                            TimestampNtz.fromMillis(1698235273501L, 8000),
+                            new byte[] {5, 6, 7, 8},
+                            null));
+        }
+
+        if (isPartitioned) {
+            assertRowResultsIgnoreOrder(actual, expectedRows2, true);
+        } else {
+            assertResultsExactOrder(actual, expectedRows2, true);
+        }
+
+        // query again
+        actual = streamTEnv.executeSql(query).collect();
+        List<Row> totalExpectedRows = new ArrayList<>(expectedRows);
+        totalExpectedRows.addAll(expectedRows2);
+
+        if (isPartitioned) {
+            assertRowResultsIgnoreOrder(actual, totalExpectedRows, true);
+        } else {
+            assertResultsExactOrder(actual, totalExpectedRows, true);
+        }
+    }
+
+    private void writeFullTypeRow(TablePath tablePath, String partition) 
throws Exception {
+        List<InternalRow> rows =
+                Collections.singletonList(
+                        row(
+                                true,
+                                (byte) 100,
+                                (short) 200,
+                                30,
+                                400L,
+                                500.1f,
+                                600.0d,
+                                "another_string_2",
+                                Decimal.fromUnscaledLong(900, 5, 2),
+                                Decimal.fromBigDecimal(new 
java.math.BigDecimal(1000), 20, 0),
+                                TimestampLtz.fromEpochMillis(1698235273400L),
+                                TimestampLtz.fromEpochMillis(1698235273400L, 
7000),
+                                TimestampNtz.fromMillis(1698235273501L),
+                                TimestampNtz.fromMillis(1698235273501L, 8000),
+                                new byte[] {5, 6, 7, 8},
+                                partition));
+        writeRows(tablePath, rows, false);
+    }
+
+    private long preparePKTableFullType(
+            TablePath tablePath,
+            int bucketNum,
+            boolean isPartitioned,
+            Map<TableBucket, Long> bucketLogEndOffset)
+            throws Exception {
+        long tableId = createPkTableFullType(tablePath, bucketNum, 
isPartitioned);
+        if (isPartitioned) {
+            Map<Long, String> partitionNameById = 
waitUntilPartitions(tablePath);
+            for (String partition : partitionNameById.values()) {
+                for (int i = 0; i < 2; i++) {
+                    List<InternalRow> rows = generateKvRowsFullType(partition);
+                    // write records
+                    writeRows(tablePath, rows, false);
+                }
+            }
+            for (Long partitionId : partitionNameById.keySet()) {
+                bucketLogEndOffset.putAll(getBucketLogEndOffset(tableId, 
bucketNum, partitionId));
+            }
+        } else {
+            for (int i = 0; i < 2; i++) {
+                List<InternalRow> rows = generateKvRowsFullType(null);
+                // write records
+                writeRows(tablePath, rows, false);
+            }
+            bucketLogEndOffset.putAll(getBucketLogEndOffset(tableId, 
bucketNum, null));
+        }
+        return tableId;
+    }
+
+    protected long createPkTableFullType(TablePath tablePath, int bucketNum, 
boolean isPartitioned)
+            throws Exception {
+        Schema.Builder schemaBuilder =
+                Schema.newBuilder()
+                        .column("c1", DataTypes.BOOLEAN())
+                        .column("c2", DataTypes.TINYINT())
+                        .column("c3", DataTypes.SMALLINT())
+                        .column("c4", DataTypes.INT())
+                        .column("c5", DataTypes.BIGINT())
+                        .column("c6", DataTypes.FLOAT())
+                        .column("c7", DataTypes.DOUBLE())
+                        .column("c8", DataTypes.STRING())
+                        .column("c9", DataTypes.DECIMAL(5, 2))
+                        .column("c10", DataTypes.DECIMAL(20, 0))
+                        .column("c11", DataTypes.TIMESTAMP_LTZ(3))
+                        .column("c12", DataTypes.TIMESTAMP_LTZ(6))
+                        .column("c13", DataTypes.TIMESTAMP(3))
+                        .column("c14", DataTypes.TIMESTAMP(6))
+                        .column("c15", DataTypes.BINARY(4))
+                        .column("c16", DataTypes.STRING());
+
+        TableDescriptor.Builder tableBuilder =
+                TableDescriptor.builder()
+                        .distributedBy(bucketNum)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true")
+                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, 
Duration.ofMillis(500));
+
+        if (isPartitioned) {
+            tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, 
true);
+            tableBuilder.partitionedBy("c16");
+            schemaBuilder.primaryKey("c4", "c16");
+            tableBuilder.property(
+                    ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, 
AutoPartitionTimeUnit.YEAR);
+        } else {
+            schemaBuilder.primaryKey("c4");
+        }
+        tableBuilder.schema(schemaBuilder.build());
+        return createTable(tablePath, tableBuilder.build());
+    }
+
+    private List<InternalRow> generateKvRowsFullType(@Nullable String 
partition) {
+        return Arrays.asList(
+                row(
+                        false,
+                        (byte) 1,
+                        (short) 2,
+                        3,
+                        4L,
+                        5.1f,
+                        6.0d,
+                        "string",
+                        Decimal.fromUnscaledLong(9, 5, 2),
+                        Decimal.fromBigDecimal(new java.math.BigDecimal(10), 
20, 0),
+                        TimestampLtz.fromEpochMillis(1698235273182L),
+                        TimestampLtz.fromEpochMillis(1698235273182L, 5000),
+                        TimestampNtz.fromMillis(1698235273183L),
+                        TimestampNtz.fromMillis(1698235273183L, 6000),
+                        new byte[] {1, 2, 3, 4},
+                        partition),
+                row(
+                        true,
+                        (byte) 10,
+                        (short) 20,
+                        30,
+                        40L,
+                        50.1f,
+                        60.0d,
+                        "another_string",
+                        Decimal.fromUnscaledLong(90, 5, 2),
+                        Decimal.fromBigDecimal(new java.math.BigDecimal(100), 
20, 0),
+                        TimestampLtz.fromEpochMillis(1698235273200L),
+                        TimestampLtz.fromEpochMillis(1698235273200L, 5000),
+                        TimestampNtz.fromMillis(1698235273201L),
+                        TimestampNtz.fromMillis(1698235273201L, 6000),
+                        new byte[] {1, 2, 3, 4},
+                        partition));
+    }
+
+    private Map<TableBucket, Long> getBucketLogEndOffset(
+            long tableId, int bucketNum, Long partitionId) {
+        Map<TableBucket, Long> bucketLogEndOffsets = new HashMap<>();
+        for (int i = 0; i < bucketNum; i++) {
+            TableBucket tableBucket = new TableBucket(tableId, partitionId, i);
+            Replica replica = getLeaderReplica(tableBucket);
+            bucketLogEndOffsets.put(tableBucket, 
replica.getLocalLogEndOffset());
+        }
+        return bucketLogEndOffsets;
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadTestBase.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadTestBase.java
index 8438c2499..adb9a8686 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadTestBase.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadTestBase.java
@@ -21,11 +21,14 @@ package org.apache.fluss.lake.iceberg.flink;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 
+import javax.annotation.Nullable;
+
 import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
 
 /** Base class for iceberg union read test. */
@@ -35,6 +38,7 @@ class FlinkUnionReadTestBase extends 
FlinkIcebergTieringTestBase {
     protected static final String CATALOG_NAME = "test_iceberg_lake";
     protected static final int DEFAULT_BUCKET_NUM = 1;
     StreamTableEnvironment batchTEnv;
+    StreamTableEnvironment streamTEnv;
 
     @BeforeAll
     protected static void beforeAll() {
@@ -54,5 +58,25 @@ class FlinkUnionReadTestBase extends 
FlinkIcebergTieringTestBase {
                         CATALOG_NAME, BOOTSTRAP_SERVERS.key(), 
bootstrapServers));
         batchTEnv.executeSql("use catalog " + CATALOG_NAME);
         batchTEnv.executeSql("use " + DEFAULT_DB);
+        buildStreamTEnv(null);
+    }
+
+    protected StreamTableEnvironment buildStreamTEnv(@Nullable String 
savepointPath) {
+        Configuration conf = new Configuration();
+        if (savepointPath != null) {
+            conf.setString("execution.savepoint.path", savepointPath);
+            execEnv.configure(conf);
+        }
+        String bootstrapServers = String.join(",", 
clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
+        // create table environment
+        streamTEnv = StreamTableEnvironment.create(execEnv, 
EnvironmentSettings.inStreamingMode());
+        // create catalog using sql
+        streamTEnv.executeSql(
+                String.format(
+                        "create catalog %s with ('type' = 'fluss', '%s' = 
'%s')",
+                        CATALOG_NAME, BOOTSTRAP_SERVERS.key(), 
bootstrapServers));
+        streamTEnv.executeSql("use catalog " + CATALOG_NAME);
+        streamTEnv.executeSql("use " + DEFAULT_DB);
+        return streamTEnv;
     }
 }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
index 81bfb0bd0..e9bd27bd5 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
@@ -219,6 +219,29 @@ public class FlinkIcebergTieringTestBase {
         return admin.getTableInfo(tablePath).get().getTableId();
     }
 
+    protected void assertReplicaStatus(
+            TablePath tablePath,
+            long tableId,
+            int bucketCount,
+            boolean isPartitioned,
+            Map<TableBucket, Long> expectedLogEndOffset) {
+        if (isPartitioned) {
+            Map<Long, String> partitionById =
+                    
waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), tablePath);
+            for (Long partitionId : partitionById.keySet()) {
+                for (int i = 0; i < bucketCount; i++) {
+                    TableBucket tableBucket = new TableBucket(tableId, 
partitionId, i);
+                    assertReplicaStatus(tableBucket, 
expectedLogEndOffset.get(tableBucket));
+                }
+            }
+        } else {
+            for (int i = 0; i < bucketCount; i++) {
+                TableBucket tableBucket = new TableBucket(tableId, i);
+                assertReplicaStatus(tableBucket, 
expectedLogEndOffset.get(tableBucket));
+            }
+        }
+    }
+
     protected void assertReplicaStatus(TableBucket tb, long 
expectedLogEndOffset) {
         retry(
                 Duration.ofMinutes(1),
@@ -231,6 +254,18 @@ public class FlinkIcebergTieringTestBase {
                 });
     }
 
+    /**
+     * Wait until the default number of partitions is created. Return the map 
from partition id to
+     * partition name.
+     */
+    public static Map<Long, String> waitUntilPartitions(
+            ZooKeeperClient zooKeeperClient, TablePath tablePath) {
+        return waitUntilPartitions(
+                zooKeeperClient,
+                tablePath,
+                
ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.defaultValue());
+    }
+
     public static Map<Long, String> waitUntilPartitions(TablePath tablePath) {
         return waitUntilPartitions(
                 FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(),
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
index 3cd682ab2..cdb88dcbf 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
@@ -196,7 +196,7 @@ class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
         // wait until records has been synced
         waitUntilBucketSynced(table1, tableId, DEFAULT_BUCKET_NUM, 
isPartitioned);
 
-        StreamTableEnvironment streamTEnv = buildSteamTEnv(null);
+        StreamTableEnvironment streamTEnv = buildStreamTEnv(null);
         // now, start to read the log table to write to a fluss result table
         // may read fluss or not, depends on the log offset of paimon snapshot
         createFullTypeLogTable(resultTable, DEFAULT_BUCKET_NUM, isPartitioned, 
false);
@@ -223,8 +223,8 @@ class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
                                 SavepointFormatType.CANONICAL)
                         .get();
 
-        // re buildSteamTEnv
-        streamTEnv = buildSteamTEnv(savepointPath);
+        // re buildStreamTEnv
+        streamTEnv = buildStreamTEnv(savepointPath);
         insertResult =
                 streamTEnv.executeSql(
                         "insert into " + resultTableName + " select * from " + 
tableName1);
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
index a5418d163..a7dce3836 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -633,7 +633,7 @@ class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase {
         // create result table
         createSimplePkTable(resultTable, DEFAULT_BUCKET_NUM, isPartitioned, 
false);
         // union read lake data
-        StreamTableEnvironment streamTEnv = buildSteamTEnv(null);
+        StreamTableEnvironment streamTEnv = buildStreamTEnv(null);
         TableResult insertResult =
                 streamTEnv.executeSql(
                         "insert into " + resultTableName + " select * from " + 
tableName1);
@@ -670,8 +670,8 @@ class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase {
                                 SavepointFormatType.CANONICAL)
                         .get();
 
-        // re buildSteamTEnv
-        streamTEnv = buildSteamTEnv(savepointPath);
+        // re buildStreamTEnv
+        streamTEnv = buildStreamTEnv(savepointPath);
         insertResult =
                 streamTEnv.executeSql(
                         "insert into " + resultTableName + " select * from " + 
tableName1);
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
index 0caa3fb36..94ba4fd1c 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
@@ -63,7 +63,7 @@ public class FlinkUnionReadTestBase extends 
FlinkPaimonTieringTestBase {
         return FLUSS_CLUSTER_EXTENSION;
     }
 
-    protected StreamTableEnvironment buildSteamTEnv(@Nullable String 
savepointPath) {
+    protected StreamTableEnvironment buildStreamTEnv(@Nullable String 
savepointPath) {
         Configuration conf = new Configuration();
         if (savepointPath != null) {
             conf.setString("execution.savepoint.path", savepointPath);
@@ -83,7 +83,7 @@ public class FlinkUnionReadTestBase extends 
FlinkPaimonTieringTestBase {
     }
 
     private void buildStreamTEnv() {
-        buildSteamTEnv(null);
+        buildStreamTEnv(null);
     }
 
     public void buildBatchTEnv() {
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index 91cb4e0f9..7405841b5 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -206,7 +206,7 @@ public abstract class FlinkPaimonTieringTestBase {
 
     /**
      * Wait until the default number of partitions is created. Return the map 
from partition id to
-     * partition name. .
+     * partition name.
      */
     public static Map<Long, String> waitUntilPartitions(
             ZooKeeperClient zooKeeperClient, TablePath tablePath) {


Reply via email to