This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 4c2ddc390 [flink] Add restore test for streaming union read pk table
(#1674)
4c2ddc390 is described below
commit 4c2ddc39087a2c9189f83ede1f3f6871710b6769
Author: CaoZhen <[email protected]>
AuthorDate: Thu Sep 11 19:53:00 2025 +0800
[flink] Add restore test for streaming union read pk table (#1674)
---------
Co-authored-by: luoyuxia <[email protected]>
---
.../fluss/flink/lake/LakeRecordRecordEmitter.java | 2 -
.../fluss/flink/lake/LakeSplitReaderGenerator.java | 5 +-
.../fluss/flink/lake/LakeSplitSerializer.java | 5 +-
.../lake/split/LakeSnapshotAndFlussLogSplit.java | 41 ++++-
.../state/LakeSnapshotAndFlussLogSplitState.java | 8 +-
.../tiering/committer/TieringCommitOperator.java | 34 ++--
.../fluss/flink/lake/LakeSplitSerializerTest.java | 22 ++-
.../flink/FlinkUnionReadPrimaryKeyTableITCase.java | 199 ++++++++++++++++++++-
8 files changed, 271 insertions(+), 45 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeRecordRecordEmitter.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeRecordRecordEmitter.java
index 031444501..ddd492fec 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeRecordRecordEmitter.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeRecordRecordEmitter.java
@@ -57,8 +57,6 @@ public class LakeRecordRecordEmitter<OUT> {
}
ScanRecord scanRecord = recordAndPos.record();
- // todo: may need a state to mark snapshot phase is finished
- // just like what we do for HybridSnapshotLogSplitState
if (scanRecord.logOffset() >= 0) {
// record is with a valid offset, means it's in incremental
phase,
// update the log offset
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
index aa3eef17f..39b4e08d4 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
@@ -55,7 +55,10 @@ public class LakeSplitReaderGenerator {
if (split instanceof LakeSnapshotSplit) {
boundedSplits.add(split);
} else if (split instanceof LakeSnapshotAndFlussLogSplit) {
- boundedSplits.add(split);
+            // the lake split is not finished yet, so add it to the bounded splits
+ if (!((LakeSnapshotAndFlussLogSplit) split).isLakeSplitFinished())
{
+ boundedSplits.add(split);
+ }
} else {
throw new UnsupportedOperationException(
String.format("The split type of %s is not supported.",
split.getClass()));
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
index af2eeb2ad..4e1cec920 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
@@ -78,6 +78,7 @@ public class LakeSplitSerializer {
.orElse(LogSplit.NO_STOPPING_OFFSET));
out.writeLong(lakeSnapshotAndFlussLogSplit.getRecordsToSkip());
out.writeInt(lakeSnapshotAndFlussLogSplit.getCurrentLakeSplitIndex());
+
out.writeBoolean(lakeSnapshotAndFlussLogSplit.isLakeSplitFinished());
} else {
throw new UnsupportedOperationException(
"Unsupported split type: " + split.getClass().getName());
@@ -115,6 +116,7 @@ public class LakeSplitSerializer {
long stoppingOffset = input.readLong();
long recordsToSkip = input.readLong();
int splitIndex = input.readInt();
+ boolean isLakeSplitFinished = input.readBoolean();
return new LakeSnapshotAndFlussLogSplit(
tableBucket,
partition,
@@ -122,7 +124,8 @@ public class LakeSplitSerializer {
startingOffset,
stoppingOffset,
recordsToSkip,
- splitIndex);
+ splitIndex,
+ isLakeSplitFinished);
} else {
throw new UnsupportedOperationException("Unsupported split kind: "
+ splitKind);
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java
index 74d549f22..93a957614 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java
@@ -40,8 +40,11 @@ public class LakeSnapshotAndFlussLogSplit extends
SourceSplitBase {
* lake split via this lake split index.
*/
private int currentLakeSplitIndex;
- /** The records to skip when reading a split. */
- private long recordOffset;
+ /** The records to skip when reading a lake split. */
+ private long recordToSkip;
+
+ /** Whether the lake split has been finished. */
+ private boolean isLakeSplitFinished;
private long startingOffset;
private final long stoppingOffset;
@@ -52,7 +55,16 @@ public class LakeSnapshotAndFlussLogSplit extends
SourceSplitBase {
@Nullable List<LakeSplit> snapshotSplits,
long startingOffset,
long stoppingOffset) {
- this(tableBucket, partitionName, snapshotSplits, startingOffset,
stoppingOffset, 0, 0);
+ this(
+ tableBucket,
+ partitionName,
+ snapshotSplits,
+ startingOffset,
+ stoppingOffset,
+ 0,
+ 0,
+                // if snapshotSplits is null there are no lake splits, which
also means the lake split phase is finished
+ snapshotSplits == null);
}
public LakeSnapshotAndFlussLogSplit(
@@ -62,17 +74,19 @@ public class LakeSnapshotAndFlussLogSplit extends
SourceSplitBase {
long startingOffset,
long stoppingOffset,
long recordsToSkip,
- int currentLakeSplitIndex) {
+ int currentLakeSplitIndex,
+ boolean isLakeSplitFinished) {
super(tableBucket, partitionName);
this.lakeSnapshotSplits = snapshotSplits;
this.startingOffset = startingOffset;
this.stoppingOffset = stoppingOffset;
- this.recordOffset = recordsToSkip;
+ this.recordToSkip = recordsToSkip;
this.currentLakeSplitIndex = currentLakeSplitIndex;
+ this.isLakeSplitFinished = isLakeSplitFinished;
}
public LakeSnapshotAndFlussLogSplit updateWithRecordsToSkip(long
recordsToSkip) {
- this.recordOffset = recordsToSkip;
+ this.recordToSkip = recordsToSkip;
return this;
}
@@ -86,14 +100,23 @@ public class LakeSnapshotAndFlussLogSplit extends
SourceSplitBase {
return this;
}
+ public LakeSnapshotAndFlussLogSplit updateWithLakeSplitFinished(boolean
isLakeSplitFinished) {
+ this.isLakeSplitFinished = isLakeSplitFinished;
+ return this;
+ }
+
public long getRecordsToSkip() {
- return recordOffset;
+ return recordToSkip;
}
public long getStartingOffset() {
return startingOffset;
}
+ public boolean isLakeSplitFinished() {
+ return isLakeSplitFinished;
+ }
+
public Optional<Long> getStoppingOffset() {
return stoppingOffset >= 0 ? Optional.of(stoppingOffset) :
Optional.empty();
}
@@ -130,8 +153,8 @@ public class LakeSnapshotAndFlussLogSplit extends
SourceSplitBase {
return "LakeSnapshotAndFlussLogSplit{"
+ "lakeSnapshotSplits="
+ lakeSnapshotSplits
- + ", recordOffset="
- + recordOffset
+ + ", recordToSkip="
+ + recordToSkip
+ ", currentLakeSplitIndex="
+ currentLakeSplitIndex
+ ", startingOffset="
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotAndFlussLogSplitState.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotAndFlussLogSplitState.java
index d46b6575f..4d36ad0ea 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotAndFlussLogSplitState.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotAndFlussLogSplitState.java
@@ -30,12 +30,15 @@ public class LakeSnapshotAndFlussLogSplitState extends
SourceSplitState {
private int currentLakeSplitIndex;
private long nextLogOffset;
+ private boolean isLakeSplitFinished;
+
public LakeSnapshotAndFlussLogSplitState(LakeSnapshotAndFlussLogSplit
split) {
super(split);
this.recordsToSkip = split.getRecordsToSkip();
this.split = split;
this.currentLakeSplitIndex = split.getCurrentLakeSplitIndex();
this.nextLogOffset = split.getStartingOffset();
+ this.isLakeSplitFinished = split.isLakeSplitFinished();
}
public void setRecordsToSkip(long recordsToSkip) {
@@ -47,6 +50,8 @@ public class LakeSnapshotAndFlussLogSplitState extends
SourceSplitState {
}
public void setNextLogOffset(long nextOffset) {
+        // setting the next log offset implies the lake splits are finished
+ isLakeSplitFinished = true;
this.nextLogOffset = nextOffset;
}
@@ -54,6 +59,7 @@ public class LakeSnapshotAndFlussLogSplitState extends
SourceSplitState {
public SourceSplitBase toSourceSplit() {
return split.updateWithCurrentLakeSplitIndex(currentLakeSplitIndex)
.updateWithRecordsToSkip(recordsToSkip)
- .updateWithStartingOffset(nextLogOffset);
+ .updateWithStartingOffset(nextLogOffset)
+ .updateWithLakeSplitFinished(isLakeSplitFinished);
}
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
index d508fb49b..282d12d80 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -44,11 +44,13 @@ import org.apache.fluss.utils.json.BucketOffsetJsonSerde;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
-import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
import javax.annotation.Nullable;
@@ -109,34 +111,38 @@ public class TieringCommitOperator<WriteResult,
Committable>
this.flussTableLakeSnapshotCommitter = new
FlussTableLakeSnapshotCommitter(flussConf);
this.collectedTableBucketWriteResults = new HashMap<>();
this.flussConfig = flussConf;
+ this.operatorEventGateway =
+ parameters
+ .getOperatorEventDispatcher()
+
.getOperatorEventGateway(TieringSource.TIERING_SOURCE_OPERATOR_UID);
this.setup(
parameters.getContainingTask(),
parameters.getStreamConfig(),
parameters.getOutput());
- operatorEventGateway =
- parameters
- .getOperatorEventDispatcher()
-
.getOperatorEventGateway(TieringSource.TIERING_SOURCE_OPERATOR_UID);
- }
-
- @Override
- public void open() {
- flussTableLakeSnapshotCommitter.open();
- connection = ConnectionFactory.createConnection(flussConfig);
- admin = connection.getAdmin();
}
@Override
- public void initializeState(StateInitializationContext context) throws
Exception {
- super.initializeState(context);
+ public void setup(
+ StreamTask<?, ?> containingTask,
+ StreamConfig config,
+ Output<StreamRecord<CommittableMessage<Committable>>> output) {
+ super.setup(containingTask, config, output);
int attemptNumber = getRuntimeContext().getAttemptNumber();
if (attemptNumber > 0) {
+ LOG.info("Send TieringRestoreEvent");
// attempt number is greater than zero, the job must failover
operatorEventGateway.sendEventToCoordinator(
new SourceEventWrapper(new TieringRestoreEvent()));
}
}
+ @Override
+ public void open() {
+ flussTableLakeSnapshotCommitter.open();
+ connection = ConnectionFactory.createConnection(flussConfig);
+ admin = connection.getAdmin();
+ }
+
@Override
public void
processElement(StreamRecord<TableBucketWriteResult<WriteResult>> streamRecord)
throws Exception {
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
index bd6081417..45cfb2e3f 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
@@ -90,7 +90,10 @@ class LakeSplitSerializerTest {
"2025-08-18",
Collections.singletonList(LAKE_SPLIT),
EARLIEST_OFFSET,
- STOPPING_OFFSET);
+ STOPPING_OFFSET,
+ 2,
+ 1,
+ true);
DataOutputSerializer output = new
DataOutputSerializer(STOPPING_OFFSET);
serializer.serialize(output, originalSplit);
@@ -105,11 +108,14 @@ class LakeSplitSerializerTest {
assertThat(deserializedSplit instanceof
LakeSnapshotAndFlussLogSplit).isTrue();
LakeSnapshotAndFlussLogSplit result = (LakeSnapshotAndFlussLogSplit)
deserializedSplit;
- assertThat(tableBucket).isEqualTo(result.getTableBucket());
- assertThat("2025-08-18").isEqualTo(result.getPartitionName());
-
assertThat(Collections.singletonList(LAKE_SPLIT)).isEqualTo(result.getLakeSplits());
- assertThat(EARLIEST_OFFSET).isEqualTo(result.getStartingOffset());
- assertThat((long)
STOPPING_OFFSET).isEqualTo(result.getStoppingOffset().get());
+ assertThat(result.getTableBucket()).isEqualTo(tableBucket);
+ assertThat(result.getPartitionName()).isEqualTo("2025-08-18");
+
assertThat(result.getLakeSplits()).isEqualTo(Collections.singletonList(LAKE_SPLIT));
+ assertThat(result.getStartingOffset()).isEqualTo(EARLIEST_OFFSET);
+
assertThat(result.getStoppingOffset().get()).isEqualTo(STOPPING_OFFSET);
+ assertThat(result.getCurrentLakeSplitIndex()).isEqualTo(1);
+ assertThat(result.getRecordsToSkip()).isEqualTo(2);
+ assertThat(result.isLakeSplitFinished()).isEqualTo(true);
}
@Test
@@ -149,8 +155,8 @@ class LakeSplitSerializerTest {
private static class TestLakeSplit implements LakeSplit {
- private int bucket;
- private List<String> partition;
+ private final int bucket;
+ private final List<String> partition;
public TestLakeSplit(int bucket, List<String> partition) {
this.bucket = bucket;
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
index 4bf3e97cd..a5418d163 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -31,17 +31,21 @@ import org.apache.fluss.server.replica.Replica;
import org.apache.fluss.types.DataTypes;
import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import javax.annotation.Nullable;
+import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -50,6 +54,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
import java.util.stream.Collectors;
import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder;
@@ -60,6 +65,8 @@ import static org.assertj.core.api.Assertions.assertThat;
/** The IT case for Flink union data in lake and fluss for primary key table.
*/
class FlinkUnionReadPrimaryKeyTableITCase extends FlinkUnionReadTestBase {
+ @TempDir public static File savepointDir;
+
@BeforeAll
protected static void beforeAll() {
FlinkUnionReadTestBase.beforeAll();
@@ -592,6 +599,122 @@ class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase {
}
}
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testUnionReadPrimaryKeyTableFailover(boolean isPartitioned) throws
Exception {
+ // first of all, start tiering
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ String tableName1 =
+ "restore_pk_table_" + (isPartitioned ? "partitioned" :
"non_partitioned");
+ String resultTableName =
+ "result_pk_table_" + (isPartitioned ? "partitioned" :
"non_partitioned");
+
+ TablePath table1 = TablePath.of(DEFAULT_DB, tableName1);
+ TablePath resultTable = TablePath.of(DEFAULT_DB, resultTableName);
+
+ // create table and write data
+ Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+ Function<String, List<InternalRow>> rowGenerator =
+ (partition) ->
+ Arrays.asList(
+ row(3, "string", partition), row(30,
"another_string", partition));
+ long tableId =
+ prepareSimplePKTable(
+ table1,
+ DEFAULT_BUCKET_NUM,
+ isPartitioned,
+ rowGenerator,
+ bucketLogEndOffset);
+
+ // check the status of replica after synced
+ assertReplicaStatus(table1, tableId, DEFAULT_BUCKET_NUM,
isPartitioned, bucketLogEndOffset);
+
+ // create result table
+ createSimplePkTable(resultTable, DEFAULT_BUCKET_NUM, isPartitioned,
false);
+ // union read lake data
+ StreamTableEnvironment streamTEnv = buildSteamTEnv(null);
+ TableResult insertResult =
+ streamTEnv.executeSql(
+ "insert into " + resultTableName + " select * from " +
tableName1);
+
+ // will read paimon snapshot, should only +I since no change log
+ List<Row> expectedRows = new ArrayList<>();
+ if (isPartitioned) {
+ for (String partition : waitUntilPartitions(table1).values()) {
+ expectedRows.add(Row.of(3, "string", partition));
+ expectedRows.add(Row.of(30, "another_string", partition));
+ }
+ } else {
+ expectedRows =
+ Arrays.asList(Row.of(3, "string", null), Row.of(30,
"another_string", null));
+ }
+
+ CloseableIterator<Row> actual =
+ streamTEnv.executeSql("select * from " +
resultTableName).collect();
+
+ if (isPartitioned) {
+ assertRowResultsIgnoreOrder(actual, expectedRows, false);
+ } else {
+ assertResultsExactOrder(actual, expectedRows, false);
+ }
+
+ // now, stop the job with save point
+ String savepointPath =
+ insertResult
+ .getJobClient()
+ .get()
+ .stopWithSavepoint(
+ false,
+ savepointDir.getAbsolutePath(),
+ SavepointFormatType.CANONICAL)
+ .get();
+
+ // re buildSteamTEnv
+ streamTEnv = buildSteamTEnv(savepointPath);
+ insertResult =
+ streamTEnv.executeSql(
+ "insert into " + resultTableName + " select * from " +
tableName1);
+
+ // write some log data again
+ // write a row again
+ if (isPartitioned) {
+ Map<Long, String> partitionNameById = waitUntilPartitions(table1);
+ for (String partition : partitionNameById.values()) {
+ writeRows(
+ table1,
+ Collections.singletonList(row(30, "another_string_2",
partition)),
+ false);
+ }
+ } else {
+ writeRows(table1, Collections.singletonList(row(30,
"another_string_2", null)), false);
+ }
+
+ // should generate -U & +U
+ List<Row> expectedRows2 = new ArrayList<>();
+ if (isPartitioned) {
+ for (String partition : waitUntilPartitions(table1).values()) {
+ expectedRows2.add(
+ Row.ofKind(RowKind.UPDATE_BEFORE, 30,
"another_string", partition));
+ expectedRows2.add(
+ Row.ofKind(RowKind.UPDATE_AFTER, 30,
"another_string_2", partition));
+ }
+ } else {
+ expectedRows2.add(Row.ofKind(RowKind.UPDATE_BEFORE, 30,
"another_string", null));
+ expectedRows2.add(Row.ofKind(RowKind.UPDATE_AFTER, 30,
"another_string_2", null));
+ }
+
+ if (isPartitioned) {
+ assertRowResultsIgnoreOrder(actual, expectedRows2, true);
+ } else {
+ assertResultsExactOrder(actual, expectedRows2, true);
+ }
+
+ // cancel jobs
+ insertResult.getJobClient().get().cancel().get();
+ jobClient.cancel().get();
+ }
+
private List<Row> sortedRows(List<Row> rows) {
rows.sort(Comparator.comparing(Row::toString));
return rows;
@@ -634,6 +757,37 @@ class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase {
return tableId;
}
+ private long prepareSimplePKTable(
+ TablePath tablePath,
+ int bucketNum,
+ boolean isPartitioned,
+ Function<String, List<InternalRow>> rowGenerator,
+ Map<TableBucket, Long> bucketLogEndOffset)
+ throws Exception {
+ long tableId = createSimplePkTable(tablePath, bucketNum,
isPartitioned, true);
+ if (isPartitioned) {
+ Map<Long, String> partitionNameById =
waitUntilPartitions(tablePath);
+ for (String partition : partitionNameById.values()) {
+ for (int i = 0; i < 2; i++) {
+ List<InternalRow> rows = rowGenerator.apply(partition);
+ // write records
+ writeRows(tablePath, rows, false);
+ }
+ }
+ for (Long partitionId : partitionNameById.keySet()) {
+ bucketLogEndOffset.putAll(getBucketLogEndOffset(tableId,
bucketNum, partitionId));
+ }
+ } else {
+ for (int i = 0; i < 2; i++) {
+ List<InternalRow> rows = rowGenerator.apply(null);
+ // write records
+ writeRows(tablePath, rows, false);
+ }
+ bucketLogEndOffset.putAll(getBucketLogEndOffset(tableId,
bucketNum, null));
+ }
+ return tableId;
+ }
+
private Map<TableBucket, Long> getBucketLogEndOffset(
long tableId, int bucketNum, Long partitionId) {
Map<TableBucket, Long> bucketLogEndOffsets = new HashMap<>();
@@ -705,20 +859,47 @@ class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase {
.column("c15", DataTypes.BINARY(4))
.column("c16", DataTypes.STRING());
- TableDescriptor.Builder tableBuilder =
- TableDescriptor.builder()
- .distributedBy(bucketNum)
- .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
"true")
- .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS,
Duration.ofMillis(500));
+ return createPkTable(tablePath, bucketNum, isPartitioned, true,
schemaBuilder, "c4", "c16");
+ }
+
+ protected long createSimplePkTable(
+ TablePath tablePath, int bucketNum, boolean isPartitioned, boolean
lakeEnabled)
+ throws Exception {
+ Schema.Builder schemaBuilder =
+ Schema.newBuilder()
+ .column("c1", DataTypes.INT())
+ .column("c2", DataTypes.STRING())
+ .column("c3", DataTypes.STRING());
+
+ return createPkTable(
+ tablePath, bucketNum, isPartitioned, lakeEnabled,
schemaBuilder, "c1", "c3");
+ }
+
+ protected long createPkTable(
+ TablePath tablePath,
+ int bucketNum,
+ boolean isPartitioned,
+ boolean lakeEnabled,
+ Schema.Builder schemaBuilder,
+ String primaryKey,
+ String partitionKeys)
+ throws Exception {
+
+ TableDescriptor.Builder tableBuilder =
TableDescriptor.builder().distributedBy(bucketNum);
+ if (lakeEnabled) {
+ tableBuilder
+ .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
"true")
+ .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS,
Duration.ofMillis(500));
+ }
if (isPartitioned) {
tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED,
true);
- tableBuilder.partitionedBy("c16");
- schemaBuilder.primaryKey("c4", "c16");
+ tableBuilder.partitionedBy(partitionKeys);
+ schemaBuilder.primaryKey(primaryKey, partitionKeys);
tableBuilder.property(
ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT,
AutoPartitionTimeUnit.YEAR);
} else {
- schemaBuilder.primaryKey("c4");
+ schemaBuilder.primaryKey(primaryKey);
}
tableBuilder.schema(schemaBuilder.build());
return createTable(tablePath, tableBuilder.build());
@@ -747,7 +928,7 @@ class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase {
writeRows(tablePath, rows, false);
}
- private List<InternalRow> generateKvRowsFullType(@Nullable String
partition) {
+ private static List<InternalRow> generateKvRowsFullType(@Nullable String
partition) {
return Arrays.asList(
row(
false,