This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 4c2ddc390 [flink] Add restore test for streaming union read pk table 
(#1674)
4c2ddc390 is described below

commit 4c2ddc39087a2c9189f83ede1f3f6871710b6769
Author: CaoZhen <[email protected]>
AuthorDate: Thu Sep 11 19:53:00 2025 +0800

    [flink] Add restore test for streaming union read pk table (#1674)
    
    ---------
    
    Co-authored-by: luoyuxia <[email protected]>
---
 .../fluss/flink/lake/LakeRecordRecordEmitter.java  |   2 -
 .../fluss/flink/lake/LakeSplitReaderGenerator.java |   5 +-
 .../fluss/flink/lake/LakeSplitSerializer.java      |   5 +-
 .../lake/split/LakeSnapshotAndFlussLogSplit.java   |  41 ++++-
 .../state/LakeSnapshotAndFlussLogSplitState.java   |   8 +-
 .../tiering/committer/TieringCommitOperator.java   |  34 ++--
 .../fluss/flink/lake/LakeSplitSerializerTest.java  |  22 ++-
 .../flink/FlinkUnionReadPrimaryKeyTableITCase.java | 199 ++++++++++++++++++++-
 8 files changed, 271 insertions(+), 45 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeRecordRecordEmitter.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeRecordRecordEmitter.java
index 031444501..ddd492fec 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeRecordRecordEmitter.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeRecordRecordEmitter.java
@@ -57,8 +57,6 @@ public class LakeRecordRecordEmitter<OUT> {
             }
 
             ScanRecord scanRecord = recordAndPos.record();
-            // todo: may need a state to mark snapshot phase is finished
-            // just like what we do for HybridSnapshotLogSplitState
             if (scanRecord.logOffset() >= 0) {
                 // record is with a valid offset, means it's in incremental 
phase,
                 // update the log offset
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
index aa3eef17f..39b4e08d4 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
@@ -55,7 +55,10 @@ public class LakeSplitReaderGenerator {
         if (split instanceof LakeSnapshotSplit) {
             boundedSplits.add(split);
         } else if (split instanceof LakeSnapshotAndFlussLogSplit) {
-            boundedSplits.add(split);
+            // the lake split is not yet finished, so add it to the bounded splits
+            if (!((LakeSnapshotAndFlussLogSplit) split).isLakeSplitFinished()) 
{
+                boundedSplits.add(split);
+            }
         } else {
             throw new UnsupportedOperationException(
                     String.format("The split type of %s is not supported.", 
split.getClass()));
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
index af2eeb2ad..4e1cec920 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
@@ -78,6 +78,7 @@ public class LakeSplitSerializer {
                             .orElse(LogSplit.NO_STOPPING_OFFSET));
             out.writeLong(lakeSnapshotAndFlussLogSplit.getRecordsToSkip());
             
out.writeInt(lakeSnapshotAndFlussLogSplit.getCurrentLakeSplitIndex());
+            
out.writeBoolean(lakeSnapshotAndFlussLogSplit.isLakeSplitFinished());
         } else {
             throw new UnsupportedOperationException(
                     "Unsupported split type: " + split.getClass().getName());
@@ -115,6 +116,7 @@ public class LakeSplitSerializer {
             long stoppingOffset = input.readLong();
             long recordsToSkip = input.readLong();
             int splitIndex = input.readInt();
+            boolean isLakeSplitFinished = input.readBoolean();
             return new LakeSnapshotAndFlussLogSplit(
                     tableBucket,
                     partition,
@@ -122,7 +124,8 @@ public class LakeSplitSerializer {
                     startingOffset,
                     stoppingOffset,
                     recordsToSkip,
-                    splitIndex);
+                    splitIndex,
+                    isLakeSplitFinished);
         } else {
             throw new UnsupportedOperationException("Unsupported split kind: " 
+ splitKind);
         }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java
index 74d549f22..93a957614 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java
@@ -40,8 +40,11 @@ public class LakeSnapshotAndFlussLogSplit extends 
SourceSplitBase {
      * lake split via this lake split index.
      */
     private int currentLakeSplitIndex;
-    /** The records to skip when reading a split. */
-    private long recordOffset;
+    /** The records to skip when reading a lake split. */
+    private long recordToSkip;
+
+    /** Whether the lake split has been finished. */
+    private boolean isLakeSplitFinished;
 
     private long startingOffset;
     private final long stoppingOffset;
@@ -52,7 +55,16 @@ public class LakeSnapshotAndFlussLogSplit extends 
SourceSplitBase {
             @Nullable List<LakeSplit> snapshotSplits,
             long startingOffset,
             long stoppingOffset) {
-        this(tableBucket, partitionName, snapshotSplits, startingOffset, 
stoppingOffset, 0, 0);
+        this(
+                tableBucket,
+                partitionName,
+                snapshotSplits,
+                startingOffset,
+                stoppingOffset,
+                0,
+                0,
+                // a null snapshotSplits means there are no lake splits, which 
also means the lake split is already finished
+                snapshotSplits == null);
     }
 
     public LakeSnapshotAndFlussLogSplit(
@@ -62,17 +74,19 @@ public class LakeSnapshotAndFlussLogSplit extends 
SourceSplitBase {
             long startingOffset,
             long stoppingOffset,
             long recordsToSkip,
-            int currentLakeSplitIndex) {
+            int currentLakeSplitIndex,
+            boolean isLakeSplitFinished) {
         super(tableBucket, partitionName);
         this.lakeSnapshotSplits = snapshotSplits;
         this.startingOffset = startingOffset;
         this.stoppingOffset = stoppingOffset;
-        this.recordOffset = recordsToSkip;
+        this.recordToSkip = recordsToSkip;
         this.currentLakeSplitIndex = currentLakeSplitIndex;
+        this.isLakeSplitFinished = isLakeSplitFinished;
     }
 
     public LakeSnapshotAndFlussLogSplit updateWithRecordsToSkip(long 
recordsToSkip) {
-        this.recordOffset = recordsToSkip;
+        this.recordToSkip = recordsToSkip;
         return this;
     }
 
@@ -86,14 +100,23 @@ public class LakeSnapshotAndFlussLogSplit extends 
SourceSplitBase {
         return this;
     }
 
+    public LakeSnapshotAndFlussLogSplit updateWithLakeSplitFinished(boolean 
isLakeSplitFinished) {
+        this.isLakeSplitFinished = isLakeSplitFinished;
+        return this;
+    }
+
     public long getRecordsToSkip() {
-        return recordOffset;
+        return recordToSkip;
     }
 
     public long getStartingOffset() {
         return startingOffset;
     }
 
+    public boolean isLakeSplitFinished() {
+        return isLakeSplitFinished;
+    }
+
     public Optional<Long> getStoppingOffset() {
         return stoppingOffset >= 0 ? Optional.of(stoppingOffset) : 
Optional.empty();
     }
@@ -130,8 +153,8 @@ public class LakeSnapshotAndFlussLogSplit extends 
SourceSplitBase {
         return "LakeSnapshotAndFlussLogSplit{"
                 + "lakeSnapshotSplits="
                 + lakeSnapshotSplits
-                + ", recordOffset="
-                + recordOffset
+                + ", recordToSkip="
+                + recordToSkip
                 + ", currentLakeSplitIndex="
                 + currentLakeSplitIndex
                 + ", startingOffset="
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotAndFlussLogSplitState.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotAndFlussLogSplitState.java
index d46b6575f..4d36ad0ea 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotAndFlussLogSplitState.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotAndFlussLogSplitState.java
@@ -30,12 +30,15 @@ public class LakeSnapshotAndFlussLogSplitState extends 
SourceSplitState {
     private int currentLakeSplitIndex;
     private long nextLogOffset;
 
+    private boolean isLakeSplitFinished;
+
     public LakeSnapshotAndFlussLogSplitState(LakeSnapshotAndFlussLogSplit 
split) {
         super(split);
         this.recordsToSkip = split.getRecordsToSkip();
         this.split = split;
         this.currentLakeSplitIndex = split.getCurrentLakeSplitIndex();
         this.nextLogOffset = split.getStartingOffset();
+        this.isLakeSplitFinished = split.isLakeSplitFinished();
     }
 
     public void setRecordsToSkip(long recordsToSkip) {
@@ -47,6 +50,8 @@ public class LakeSnapshotAndFlussLogSplitState extends 
SourceSplitState {
     }
 
     public void setNextLogOffset(long nextOffset) {
+        // setting the next log offset implies the lake splits are finished
+        isLakeSplitFinished = true;
         this.nextLogOffset = nextOffset;
     }
 
@@ -54,6 +59,7 @@ public class LakeSnapshotAndFlussLogSplitState extends 
SourceSplitState {
     public SourceSplitBase toSourceSplit() {
         return split.updateWithCurrentLakeSplitIndex(currentLakeSplitIndex)
                 .updateWithRecordsToSkip(recordsToSkip)
-                .updateWithStartingOffset(nextLogOffset);
+                .updateWithStartingOffset(nextLogOffset)
+                .updateWithLakeSplitFinished(isLakeSplitFinished);
     }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
index d508fb49b..282d12d80 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -44,11 +44,13 @@ import org.apache.fluss.utils.json.BucketOffsetJsonSerde;
 
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
-import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 import javax.annotation.Nullable;
 
@@ -109,34 +111,38 @@ public class TieringCommitOperator<WriteResult, 
Committable>
         this.flussTableLakeSnapshotCommitter = new 
FlussTableLakeSnapshotCommitter(flussConf);
         this.collectedTableBucketWriteResults = new HashMap<>();
         this.flussConfig = flussConf;
+        this.operatorEventGateway =
+                parameters
+                        .getOperatorEventDispatcher()
+                        
.getOperatorEventGateway(TieringSource.TIERING_SOURCE_OPERATOR_UID);
         this.setup(
                 parameters.getContainingTask(),
                 parameters.getStreamConfig(),
                 parameters.getOutput());
-        operatorEventGateway =
-                parameters
-                        .getOperatorEventDispatcher()
-                        
.getOperatorEventGateway(TieringSource.TIERING_SOURCE_OPERATOR_UID);
-    }
-
-    @Override
-    public void open() {
-        flussTableLakeSnapshotCommitter.open();
-        connection = ConnectionFactory.createConnection(flussConfig);
-        admin = connection.getAdmin();
     }
 
     @Override
-    public void initializeState(StateInitializationContext context) throws 
Exception {
-        super.initializeState(context);
+    public void setup(
+            StreamTask<?, ?> containingTask,
+            StreamConfig config,
+            Output<StreamRecord<CommittableMessage<Committable>>> output) {
+        super.setup(containingTask, config, output);
         int attemptNumber = getRuntimeContext().getAttemptNumber();
         if (attemptNumber > 0) {
+            LOG.info("Send TieringRestoreEvent");
             // attempt number is greater than zero, the job must failover
             operatorEventGateway.sendEventToCoordinator(
                     new SourceEventWrapper(new TieringRestoreEvent()));
         }
     }
 
+    @Override
+    public void open() {
+        flussTableLakeSnapshotCommitter.open();
+        connection = ConnectionFactory.createConnection(flussConfig);
+        admin = connection.getAdmin();
+    }
+
     @Override
     public void 
processElement(StreamRecord<TableBucketWriteResult<WriteResult>> streamRecord)
             throws Exception {
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
index bd6081417..45cfb2e3f 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
@@ -90,7 +90,10 @@ class LakeSplitSerializerTest {
                         "2025-08-18",
                         Collections.singletonList(LAKE_SPLIT),
                         EARLIEST_OFFSET,
-                        STOPPING_OFFSET);
+                        STOPPING_OFFSET,
+                        2,
+                        1,
+                        true);
 
         DataOutputSerializer output = new 
DataOutputSerializer(STOPPING_OFFSET);
         serializer.serialize(output, originalSplit);
@@ -105,11 +108,14 @@ class LakeSplitSerializerTest {
         assertThat(deserializedSplit instanceof 
LakeSnapshotAndFlussLogSplit).isTrue();
         LakeSnapshotAndFlussLogSplit result = (LakeSnapshotAndFlussLogSplit) 
deserializedSplit;
 
-        assertThat(tableBucket).isEqualTo(result.getTableBucket());
-        assertThat("2025-08-18").isEqualTo(result.getPartitionName());
-        
assertThat(Collections.singletonList(LAKE_SPLIT)).isEqualTo(result.getLakeSplits());
-        assertThat(EARLIEST_OFFSET).isEqualTo(result.getStartingOffset());
-        assertThat((long) 
STOPPING_OFFSET).isEqualTo(result.getStoppingOffset().get());
+        assertThat(result.getTableBucket()).isEqualTo(tableBucket);
+        assertThat(result.getPartitionName()).isEqualTo("2025-08-18");
+        
assertThat(result.getLakeSplits()).isEqualTo(Collections.singletonList(LAKE_SPLIT));
+        assertThat(result.getStartingOffset()).isEqualTo(EARLIEST_OFFSET);
+        
assertThat(result.getStoppingOffset().get()).isEqualTo(STOPPING_OFFSET);
+        assertThat(result.getCurrentLakeSplitIndex()).isEqualTo(1);
+        assertThat(result.getRecordsToSkip()).isEqualTo(2);
+        assertThat(result.isLakeSplitFinished()).isEqualTo(true);
     }
 
     @Test
@@ -149,8 +155,8 @@ class LakeSplitSerializerTest {
 
     private static class TestLakeSplit implements LakeSplit {
 
-        private int bucket;
-        private List<String> partition;
+        private final int bucket;
+        private final List<String> partition;
 
         public TestLakeSplit(int bucket, List<String> partition) {
             this.bucket = bucket;
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
index 4bf3e97cd..a5418d163 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -31,17 +31,21 @@ import org.apache.fluss.server.replica.Replica;
 import org.apache.fluss.types.DataTypes;
 
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.CollectionUtil;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import javax.annotation.Nullable;
 
+import java.io.File;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -50,6 +54,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder;
@@ -60,6 +65,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** The IT case for Flink union data in lake and fluss for primary key table. 
*/
 class FlinkUnionReadPrimaryKeyTableITCase extends FlinkUnionReadTestBase {
 
+    @TempDir public static File savepointDir;
+
     @BeforeAll
     protected static void beforeAll() {
         FlinkUnionReadTestBase.beforeAll();
@@ -592,6 +599,122 @@ class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase {
         }
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testUnionReadPrimaryKeyTableFailover(boolean isPartitioned) throws 
Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName1 =
+                "restore_pk_table_" + (isPartitioned ? "partitioned" : 
"non_partitioned");
+        String resultTableName =
+                "result_pk_table_" + (isPartitioned ? "partitioned" : 
"non_partitioned");
+
+        TablePath table1 = TablePath.of(DEFAULT_DB, tableName1);
+        TablePath resultTable = TablePath.of(DEFAULT_DB, resultTableName);
+
+        // create table and write data
+        Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+        Function<String, List<InternalRow>> rowGenerator =
+                (partition) ->
+                        Arrays.asList(
+                                row(3, "string", partition), row(30, 
"another_string", partition));
+        long tableId =
+                prepareSimplePKTable(
+                        table1,
+                        DEFAULT_BUCKET_NUM,
+                        isPartitioned,
+                        rowGenerator,
+                        bucketLogEndOffset);
+
+        // check the status of replica after synced
+        assertReplicaStatus(table1, tableId, DEFAULT_BUCKET_NUM, 
isPartitioned, bucketLogEndOffset);
+
+        // create result table
+        createSimplePkTable(resultTable, DEFAULT_BUCKET_NUM, isPartitioned, 
false);
+        // union read lake data
+        StreamTableEnvironment streamTEnv = buildSteamTEnv(null);
+        TableResult insertResult =
+                streamTEnv.executeSql(
+                        "insert into " + resultTableName + " select * from " + 
tableName1);
+
+        // will read paimon snapshot, should only +I since no change log
+        List<Row> expectedRows = new ArrayList<>();
+        if (isPartitioned) {
+            for (String partition : waitUntilPartitions(table1).values()) {
+                expectedRows.add(Row.of(3, "string", partition));
+                expectedRows.add(Row.of(30, "another_string", partition));
+            }
+        } else {
+            expectedRows =
+                    Arrays.asList(Row.of(3, "string", null), Row.of(30, 
"another_string", null));
+        }
+
+        CloseableIterator<Row> actual =
+                streamTEnv.executeSql("select * from " + 
resultTableName).collect();
+
+        if (isPartitioned) {
+            assertRowResultsIgnoreOrder(actual, expectedRows, false);
+        } else {
+            assertResultsExactOrder(actual, expectedRows, false);
+        }
+
+        // now, stop the job with save point
+        String savepointPath =
+                insertResult
+                        .getJobClient()
+                        .get()
+                        .stopWithSavepoint(
+                                false,
+                                savepointDir.getAbsolutePath(),
+                                SavepointFormatType.CANONICAL)
+                        .get();
+
+        // re buildSteamTEnv
+        streamTEnv = buildSteamTEnv(savepointPath);
+        insertResult =
+                streamTEnv.executeSql(
+                        "insert into " + resultTableName + " select * from " + 
tableName1);
+
+        // write some log data again
+        // write a row again
+        if (isPartitioned) {
+            Map<Long, String> partitionNameById = waitUntilPartitions(table1);
+            for (String partition : partitionNameById.values()) {
+                writeRows(
+                        table1,
+                        Collections.singletonList(row(30, "another_string_2", 
partition)),
+                        false);
+            }
+        } else {
+            writeRows(table1, Collections.singletonList(row(30, 
"another_string_2", null)), false);
+        }
+
+        // should generate -U & +U
+        List<Row> expectedRows2 = new ArrayList<>();
+        if (isPartitioned) {
+            for (String partition : waitUntilPartitions(table1).values()) {
+                expectedRows2.add(
+                        Row.ofKind(RowKind.UPDATE_BEFORE, 30, 
"another_string", partition));
+                expectedRows2.add(
+                        Row.ofKind(RowKind.UPDATE_AFTER, 30, 
"another_string_2", partition));
+            }
+        } else {
+            expectedRows2.add(Row.ofKind(RowKind.UPDATE_BEFORE, 30, 
"another_string", null));
+            expectedRows2.add(Row.ofKind(RowKind.UPDATE_AFTER, 30, 
"another_string_2", null));
+        }
+
+        if (isPartitioned) {
+            assertRowResultsIgnoreOrder(actual, expectedRows2, true);
+        } else {
+            assertResultsExactOrder(actual, expectedRows2, true);
+        }
+
+        // cancel jobs
+        insertResult.getJobClient().get().cancel().get();
+        jobClient.cancel().get();
+    }
+
     private List<Row> sortedRows(List<Row> rows) {
         rows.sort(Comparator.comparing(Row::toString));
         return rows;
@@ -634,6 +757,37 @@ class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase {
         return tableId;
     }
 
+    private long prepareSimplePKTable(
+            TablePath tablePath,
+            int bucketNum,
+            boolean isPartitioned,
+            Function<String, List<InternalRow>> rowGenerator,
+            Map<TableBucket, Long> bucketLogEndOffset)
+            throws Exception {
+        long tableId = createSimplePkTable(tablePath, bucketNum, 
isPartitioned, true);
+        if (isPartitioned) {
+            Map<Long, String> partitionNameById = 
waitUntilPartitions(tablePath);
+            for (String partition : partitionNameById.values()) {
+                for (int i = 0; i < 2; i++) {
+                    List<InternalRow> rows = rowGenerator.apply(partition);
+                    // write records
+                    writeRows(tablePath, rows, false);
+                }
+            }
+            for (Long partitionId : partitionNameById.keySet()) {
+                bucketLogEndOffset.putAll(getBucketLogEndOffset(tableId, 
bucketNum, partitionId));
+            }
+        } else {
+            for (int i = 0; i < 2; i++) {
+                List<InternalRow> rows = rowGenerator.apply(null);
+                // write records
+                writeRows(tablePath, rows, false);
+            }
+            bucketLogEndOffset.putAll(getBucketLogEndOffset(tableId, 
bucketNum, null));
+        }
+        return tableId;
+    }
+
     private Map<TableBucket, Long> getBucketLogEndOffset(
             long tableId, int bucketNum, Long partitionId) {
         Map<TableBucket, Long> bucketLogEndOffsets = new HashMap<>();
@@ -705,20 +859,47 @@ class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase {
                         .column("c15", DataTypes.BINARY(4))
                         .column("c16", DataTypes.STRING());
 
-        TableDescriptor.Builder tableBuilder =
-                TableDescriptor.builder()
-                        .distributedBy(bucketNum)
-                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true")
-                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, 
Duration.ofMillis(500));
+        return createPkTable(tablePath, bucketNum, isPartitioned, true, 
schemaBuilder, "c4", "c16");
+    }
+
+    protected long createSimplePkTable(
+            TablePath tablePath, int bucketNum, boolean isPartitioned, boolean 
lakeEnabled)
+            throws Exception {
+        Schema.Builder schemaBuilder =
+                Schema.newBuilder()
+                        .column("c1", DataTypes.INT())
+                        .column("c2", DataTypes.STRING())
+                        .column("c3", DataTypes.STRING());
+
+        return createPkTable(
+                tablePath, bucketNum, isPartitioned, lakeEnabled, 
schemaBuilder, "c1", "c3");
+    }
+
+    protected long createPkTable(
+            TablePath tablePath,
+            int bucketNum,
+            boolean isPartitioned,
+            boolean lakeEnabled,
+            Schema.Builder schemaBuilder,
+            String primaryKey,
+            String partitionKeys)
+            throws Exception {
+
+        TableDescriptor.Builder tableBuilder = 
TableDescriptor.builder().distributedBy(bucketNum);
+        if (lakeEnabled) {
+            tableBuilder
+                    .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true")
+                    .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, 
Duration.ofMillis(500));
+        }
 
         if (isPartitioned) {
             tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, 
true);
-            tableBuilder.partitionedBy("c16");
-            schemaBuilder.primaryKey("c4", "c16");
+            tableBuilder.partitionedBy(partitionKeys);
+            schemaBuilder.primaryKey(primaryKey, partitionKeys);
             tableBuilder.property(
                     ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, 
AutoPartitionTimeUnit.YEAR);
         } else {
-            schemaBuilder.primaryKey("c4");
+            schemaBuilder.primaryKey(primaryKey);
         }
         tableBuilder.schema(schemaBuilder.build());
         return createTable(tablePath, tableBuilder.build());
@@ -747,7 +928,7 @@ class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase {
         writeRows(tablePath, rows, false);
     }
 
-    private List<InternalRow> generateKvRowsFullType(@Nullable String 
partition) {
+    private static List<InternalRow> generateKvRowsFullType(@Nullable String 
partition) {
         return Arrays.asList(
                 row(
                         false,

Reply via email to