This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 5075fec2 [paimon] Store lake synchronized bucket offsets in Paimon
snapshot property (#1405)
5075fec2 is described below
commit 5075fec2d0b5ef64236d4777188bbe0ed5e0519c
Author: xx789 <[email protected]>
AuthorDate: Sun Aug 3 11:01:14 2025 +0800
[paimon] Store lake synchronized bucket offsets in Paimon snapshot property
(#1405)
---
.../alibaba/fluss/lake/committer/BucketOffset.java | 79 +++++++++++++
.../lake/committer/CommittedLakeSnapshot.java | 10 +-
.../fluss/lake/committer/LakeCommitter.java | 5 +-
.../fluss/utils/json/BucketOffsetJsonSerde.java | 72 ++++++++++++
.../utils/json/BucketOffsetJsonSerdeTest.java | 43 +++++++
.../committer/FlussTableLakeSnapshotCommitter.java | 10 +-
.../tiering/committer/TieringCommitOperator.java | 91 +++++++++++++--
.../flink/tiering/TestingLakeTieringFactory.java | 4 +-
.../FlussTableLakeSnapshotCommitterTest.java | 10 +-
.../committer/TieringCommitOperatorTest.java | 14 +--
.../lake/paimon/tiering/PaimonLakeCommitter.java | 125 ++++++++-------------
.../testutils/FlinkPaimonTieringTestBase.java | 15 +++
.../lake/paimon/tiering/PaimonTieringITCase.java | 26 +++++
.../lake/paimon/tiering/PaimonTieringTest.java | 83 +++++++++-----
14 files changed, 446 insertions(+), 141 deletions(-)
diff --git
a/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/BucketOffset.java
b/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/BucketOffset.java
new file mode 100644
index 00000000..d90c4805
--- /dev/null
+++
b/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/BucketOffset.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.committer;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** The bucket offset information to be expected to be stored in Lake's
snapshot property. */
+public class BucketOffset implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ public static final String FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY =
"fluss-offsets";
+
+ private final long logOffset;
+ private final int bucket;
+ private final @Nullable Long partitionId;
+ private final @Nullable String partitionName;
+
+ public BucketOffset(
+ long logOffset,
+ int bucket,
+ @Nullable Long partitionId,
+ @Nullable String partitionName) {
+ this.logOffset = logOffset;
+ this.bucket = bucket;
+ this.partitionId = partitionId;
+ this.partitionName = partitionName;
+ }
+
+ public long getLogOffset() {
+ return logOffset;
+ }
+
+ public int getBucket() {
+ return bucket;
+ }
+
+ @Nullable
+ public Long getPartitionId() {
+ return partitionId;
+ }
+
+ @Nullable
+ public String getPartitionName() {
+ return partitionName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BucketOffset that = (BucketOffset) o;
+ return bucket == that.bucket
+ && logOffset == that.logOffset
+ && Objects.equals(partitionId, that.partitionId)
+ && Objects.equals(partitionName, that.partitionName);
+ }
+}
diff --git
a/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/CommittedLakeSnapshot.java
b/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/CommittedLakeSnapshot.java
index 6eb48bda..a0d94b14 100644
---
a/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/CommittedLakeSnapshot.java
+++
b/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/CommittedLakeSnapshot.java
@@ -30,9 +30,9 @@ import java.util.Objects;
public class CommittedLakeSnapshot {
private final long lakeSnapshotId;
- // <partition_name, bucket> -> log offset, partition_name will be null if
it's not a
+ // <partition_id, bucket> -> log offset, partition_id will be null if it's
not a
// partition bucket
- private final Map<Tuple2<String, Integer>, Long> logEndOffsets = new
HashMap<>();
+ private final Map<Tuple2<Long, Integer>, Long> logEndOffsets = new
HashMap<>();
public CommittedLakeSnapshot(long lakeSnapshotId) {
this.lakeSnapshotId = lakeSnapshotId;
@@ -46,11 +46,11 @@ public class CommittedLakeSnapshot {
logEndOffsets.put(Tuple2.of(null, bucketId), offset);
}
- public void addPartitionBucket(String partitionName, int bucketId, long
offset) {
- logEndOffsets.put(Tuple2.of(partitionName, bucketId), offset);
+ public void addPartitionBucket(Long partitionId, int bucketId, long
offset) {
+ logEndOffsets.put(Tuple2.of(partitionId, bucketId), offset);
}
- public Map<Tuple2<String, Integer>, Long> getLogEndOffsets() {
+ public Map<Tuple2<Long, Integer>, Long> getLogEndOffsets() {
return logEndOffsets;
}
diff --git
a/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/LakeCommitter.java
b/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/LakeCommitter.java
index d49ee689..77b881dc 100644
---
a/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/LakeCommitter.java
+++
b/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/LakeCommitter.java
@@ -23,6 +23,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
/**
* The LakeCommitter interface for committing write results. It extends the
AutoCloseable interface
@@ -48,10 +49,12 @@ public interface LakeCommitter<WriteResult, CommittableT>
extends AutoCloseable
* Commits the given committable object.
*
* @param committable the committable object
+ * @param snapshotProperties the properties that lake supported to store
in snapshot
* @return the committed snapshot ID
* @throws IOException if an I/O error occurs
*/
- long commit(CommittableT committable) throws IOException;
+ long commit(CommittableT committable, Map<String, String>
snapshotProperties)
+ throws IOException;
/**
* Aborts the given committable object.
diff --git
a/fluss-common/src/main/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerde.java
b/fluss-common/src/main/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerde.java
new file mode 100644
index 00000000..f5fa7120
--- /dev/null
+++
b/fluss-common/src/main/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerde.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.utils.json;
+
+import com.alibaba.fluss.lake.committer.BucketOffset;
+import
com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import
com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+import java.io.IOException;
+
+/** Json serializer and deserializer for {@link BucketOffset}. */
+public class BucketOffsetJsonSerde
+ implements JsonSerializer<BucketOffset>,
JsonDeserializer<BucketOffset> {
+
+ public static final BucketOffsetJsonSerde INSTANCE = new
BucketOffsetJsonSerde();
+ private static final String PARTITION_ID = "partition_id";
+ private static final String BUCKET_ID = "bucket_id";
+ private static final String PARTITION_NAME = "partition_name";
+ private static final String LOG_OFFSET = "log_offset";
+
+ @Override
+ public BucketOffset deserialize(JsonNode node) {
+ JsonNode partitionIdNode = node.get(PARTITION_ID);
+ Long partitionId = partitionIdNode == null ? null :
partitionIdNode.asLong();
+ int bucketId = node.get(BUCKET_ID).asInt();
+
+ // deserialize partition name
+ JsonNode partitionNameNode = node.get(PARTITION_NAME);
+ String partitionName = partitionNameNode == null ? null :
partitionNameNode.asText();
+
+ // deserialize log offset
+ long logOffset = node.get(LOG_OFFSET).asLong();
+
+ return new BucketOffset(logOffset, bucketId, partitionId,
partitionName);
+ }
+
+ @Override
+ public void serialize(BucketOffset bucketOffset, JsonGenerator generator)
throws IOException {
+ generator.writeStartObject();
+
+ // write partition id
+ if (bucketOffset.getPartitionId() != null) {
+ generator.writeNumberField(PARTITION_ID,
bucketOffset.getPartitionId());
+ }
+ generator.writeNumberField(BUCKET_ID, bucketOffset.getBucket());
+
+ // serialize partition name
+ if (bucketOffset.getPartitionName() != null) {
+ generator.writeStringField(PARTITION_NAME,
bucketOffset.getPartitionName());
+ }
+
+ // serialize bucket offset
+ generator.writeNumberField(LOG_OFFSET, bucketOffset.getLogOffset());
+
+ generator.writeEndObject();
+ }
+}
diff --git
a/fluss-common/src/test/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerdeTest.java
b/fluss-common/src/test/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerdeTest.java
new file mode 100644
index 00000000..7f267b16
--- /dev/null
+++
b/fluss-common/src/test/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerdeTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.utils.json;
+
+import com.alibaba.fluss.lake.committer.BucketOffset;
+
+/** Test for {@link BucketOffset}. */
+public class BucketOffsetJsonSerdeTest extends JsonSerdeTestBase<BucketOffset>
{
+
+ BucketOffsetJsonSerdeTest() {
+ super(BucketOffsetJsonSerde.INSTANCE);
+ }
+
+ @Override
+ protected BucketOffset[] createObjects() {
+ return new BucketOffset[] {
+ new BucketOffset(10, 1, 1L, "eu-central$2023$12"), new
BucketOffset(20, 2, null, null)
+ };
+ }
+
+ @Override
+ protected String[] expectedJsons() {
+ return new String[] {
+
"{\"partition_id\":1,\"bucket_id\":1,\"partition_name\":\"eu-central$2023$12\",\"log_offset\":10}",
+ "{\"bucket_id\":2,\"log_offset\":20}"
+ };
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
index 5c11027f..77a08e83 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
@@ -85,18 +85,14 @@ public class FlussTableLakeSnapshotCommitter implements
AutoCloseable {
// construct lake snapshot to commit to Fluss
FlussTableLakeSnapshot flussTableLakeSnapshot =
new FlussTableLakeSnapshot(tableId,
committedLakeSnapshot.getLakeSnapshotId());
- for (Map.Entry<Tuple2<String, Integer>, Long> entry :
+ for (Map.Entry<Tuple2<Long, Integer>, Long> entry :
committedLakeSnapshot.getLogEndOffsets().entrySet()) {
- Tuple2<String, Integer> partitionBucket = entry.getKey();
+ Tuple2<Long, Integer> partitionBucket = entry.getKey();
TableBucket tableBucket;
if (partitionBucket.f0 == null) {
tableBucket = new TableBucket(tableId, partitionBucket.f1);
} else {
- String partitionName = partitionBucket.f0;
- // todo: remove this
- // in paimon 1.12, we can store this offsets(including
partitionId) into snapshot
- // properties, then, we won't need to get partitionId from
partition name
- Long partitionId = partitionIdByName.get(partitionName);
+ Long partitionId = partitionBucket.f0;
if (partitionId != null) {
tableBucket = new TableBucket(tableId, partitionId,
partitionBucket.f1);
} else {
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java
index 6f6720a9..97b65efc 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -20,12 +20,14 @@ package com.alibaba.fluss.flink.tiering.committer;
import com.alibaba.fluss.client.Connection;
import com.alibaba.fluss.client.ConnectionFactory;
import com.alibaba.fluss.client.admin.Admin;
+import com.alibaba.fluss.client.metadata.LakeSnapshot;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.exception.LakeTableSnapshotNotExistException;
import com.alibaba.fluss.flink.tiering.event.FailedTieringEvent;
import com.alibaba.fluss.flink.tiering.event.FinishedTieringEvent;
import com.alibaba.fluss.flink.tiering.source.TableBucketWriteResult;
import com.alibaba.fluss.flink.tiering.source.TieringSource;
+import com.alibaba.fluss.lake.committer.BucketOffset;
import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
import com.alibaba.fluss.lake.committer.LakeCommitter;
import com.alibaba.fluss.lake.writer.LakeTieringFactory;
@@ -34,7 +36,10 @@ import com.alibaba.fluss.metadata.PartitionInfo;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.metadata.TablePath;
+import
com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
+import
com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import com.alibaba.fluss.utils.ExceptionUtils;
+import com.alibaba.fluss.utils.json.BucketOffsetJsonSerde;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
@@ -45,6 +50,8 @@ import
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -53,6 +60,7 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import static
com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
import static com.alibaba.fluss.utils.Preconditions.checkState;
/**
@@ -64,8 +72,8 @@ import static
com.alibaba.fluss.utils.Preconditions.checkState;
*
* <p>When it collects all {@link TableBucketWriteResult}s of a round of
tiering for a table, it
* will combine all the {@link WriteResult}s to {@link Committable} via method
{@link
- * LakeCommitter#toCommittable(List)}, and then call method {@link
LakeCommitter#commit(Object)} to
- * commit to lake.
+ * LakeCommitter#toCommittable(List)}, and then call method {@link
LakeCommitter#commit(Object,
+ * Map)} to commit to lake.
*
* <p>Finally, it will also commit the commited lake snapshot to Fluss cluster
to make Fluss aware
* of the tiering progress.
@@ -82,6 +90,7 @@ public class TieringCommitOperator<WriteResult, Committable>
private final FlussTableLakeSnapshotCommitter
flussTableLakeSnapshotCommitter;
private Connection connection;
private Admin admin;
+ private static final JsonFactory JACKSON_FACTORY = new JsonFactory();
// gateway to send event to flink source coordinator
private final OperatorEventGateway operatorEventGateway;
@@ -179,11 +188,23 @@ public class TieringCommitOperator<WriteResult,
Committable>
committableWriteResults.stream()
.map(TableBucketWriteResult::writeResult)
.collect(Collectors.toList());
+
+ LakeSnapshot flussCurrentLakeSnapshot =
getLatestLakeSnapshot(tablePath);
+ Map<TableBucket, Long> logOffsets =
+ mergeTableBucketOffsets(flussCurrentLakeSnapshot,
committableWriteResults);
+
// to committable
Committable committable =
lakeCommitter.toCommittable(writeResults);
// before commit to lake, check fluss not missing any lake
snapshot commited by fluss
- checkFlussNotMissingLakeSnapshot(tablePath, lakeCommitter,
committable);
- long commitedSnapshotId = lakeCommitter.commit(committable);
+ checkFlussNotMissingLakeSnapshot(
+ tablePath,
+ lakeCommitter,
+ committable,
+ flussCurrentLakeSnapshot == null
+ ? null
+ : flussCurrentLakeSnapshot.getSnapshotId());
+ long commitedSnapshotId =
+ lakeCommitter.commit(committable,
toBucketOffsetsProperty(logOffsets));
// commit to fluss
Map<TableBucket, Long> logEndOffsets = new HashMap<>();
for (TableBucketWriteResult<WriteResult> writeResult :
committableWriteResults) {
@@ -195,14 +216,53 @@ public class TieringCommitOperator<WriteResult,
Committable>
}
}
- private void checkFlussNotMissingLakeSnapshot(
- TablePath tablePath,
- LakeCommitter<WriteResult, Committable> lakeCommitter,
- Committable committable)
- throws Exception {
- Long flussCurrentLakeSnapshot;
+ public static Map<String, String> toBucketOffsetsProperty(
+ Map<TableBucket, Long> tableBucketOffsets) throws IOException {
+ StringWriter sw = new StringWriter();
+ try (JsonGenerator gen = JACKSON_FACTORY.createGenerator(sw)) {
+ gen.writeStartArray();
+ for (Map.Entry<TableBucket, Long> entry :
tableBucketOffsets.entrySet()) {
+ BucketOffsetJsonSerde.INSTANCE.serialize(
+ new BucketOffset(
+ entry.getValue(),
+ entry.getKey().getBucket(),
+ entry.getKey().getPartitionId(),
+ // todo: fill partition name in #1448
+ null),
+ gen);
+ }
+ gen.writeEndArray();
+ }
+ return new HashMap<String, String>() {
+ {
+ put(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, sw.toString());
+ }
+ };
+ }
+
+ /**
+ * Merge the log offsets of latest snapshot with current written bucket
offsets to get full log
+ * offsets.
+ */
+ private Map<TableBucket, Long> mergeTableBucketOffsets(
+ @Nullable LakeSnapshot latestLakeSnapshot,
+ List<TableBucketWriteResult<WriteResult>> currentWriteResults) {
+ Map<TableBucket, Long> tableBucketOffsets =
+ latestLakeSnapshot == null
+ ? new HashMap<>()
+ : new
HashMap<>(latestLakeSnapshot.getTableBucketsOffset());
+ for (TableBucketWriteResult<WriteResult> tableBucketWriteResult :
currentWriteResults) {
+ tableBucketOffsets.put(
+ tableBucketWriteResult.tableBucket(),
tableBucketWriteResult.logEndOffset());
+ }
+ return tableBucketOffsets;
+ }
+
+ @Nullable
+ private LakeSnapshot getLatestLakeSnapshot(TablePath tablePath) throws
Exception {
+ LakeSnapshot flussCurrentLakeSnapshot;
try {
- flussCurrentLakeSnapshot =
admin.getLatestLakeSnapshot(tablePath).get().getSnapshotId();
+ flussCurrentLakeSnapshot =
admin.getLatestLakeSnapshot(tablePath).get();
} catch (Exception e) {
Throwable throwable = e.getCause();
if (throwable instanceof LakeTableSnapshotNotExistException) {
@@ -212,6 +272,15 @@ public class TieringCommitOperator<WriteResult,
Committable>
throw e;
}
}
+ return flussCurrentLakeSnapshot;
+ }
+
+ private void checkFlussNotMissingLakeSnapshot(
+ TablePath tablePath,
+ LakeCommitter<WriteResult, Committable> lakeCommitter,
+ Committable committable,
+ Long flussCurrentLakeSnapshot)
+ throws Exception {
// get Fluss missing lake snapshot in Lake
CommittedLakeSnapshot missingCommittedSnapshot =
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/TestingLakeTieringFactory.java
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/TestingLakeTieringFactory.java
index 9e5097bc..f81d018e 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/TestingLakeTieringFactory.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/TestingLakeTieringFactory.java
@@ -33,6 +33,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
/** An implementation of {@link LakeTieringFactory} for testing purpose. */
public class TestingLakeTieringFactory
@@ -118,7 +119,8 @@ public class TestingLakeTieringFactory
}
@Override
- public long commit(TestingCommittable committable) throws IOException {
+ public long commit(TestingCommittable committable, Map<String, String>
snapshotProperties)
+ throws IOException {
return ++currentSnapshot;
}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
index 0d9d1b0e..26b3580d 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
@@ -71,14 +71,14 @@ class FlussTableLakeSnapshotCommitterTest extends
FlinkTestBase {
? DATA1_PARTITIONED_TABLE_DESCRIPTOR
: DATA1_TABLE_DESCRIPTOR);
- List<String> partitions;
+ List<Long> partitions;
Map<String, Long> partitionNameAndIds = Collections.emptyMap();
if (!isPartitioned) {
FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
partitions = Collections.singletonList(null);
} else {
partitionNameAndIds =
FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath);
- partitions = new ArrayList<>(partitionNameAndIds.keySet());
+ partitions = new ArrayList<>(partitionNameAndIds.values());
}
CommittedLakeSnapshot committedLakeSnapshot = new
CommittedLakeSnapshot(3);
@@ -86,15 +86,13 @@ class FlussTableLakeSnapshotCommitterTest extends
FlinkTestBase {
Map<TableBucket, Long> expectedOffsets = new HashMap<>();
for (int bucket = 0; bucket < 3; bucket++) {
long bucketOffset = bucket * bucket;
- for (String partition : partitions) {
+ for (Long partition : partitions) {
if (partition == null) {
committedLakeSnapshot.addBucket(bucket, bucketOffset);
expectedOffsets.put(new TableBucket(tableId, bucket),
bucketOffset);
} else {
committedLakeSnapshot.addPartitionBucket(partition,
bucket, bucketOffset);
- expectedOffsets.put(
- new TableBucket(tableId,
partitionNameAndIds.get(partition), bucket),
- bucketOffset);
+ expectedOffsets.put(new TableBucket(tableId, partition,
bucket), bucketOffset);
}
}
}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
index 647ecfc2..86c43c1f 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
@@ -327,10 +327,9 @@ class TieringCommitOperatorTest extends FlinkTestBase {
mockCommitedSnapshot));
}
- private CommittedLakeSnapshot mockCommittedLakeSnapshot(
- List<String> partitions, int snapshotId) {
+ private CommittedLakeSnapshot mockCommittedLakeSnapshot(List<Long>
partitions, int snapshotId) {
CommittedLakeSnapshot mockCommittedSnapshot = new
CommittedLakeSnapshot(snapshotId);
- for (String partition : partitions) {
+ for (Long partition : partitions) {
for (int bucket = 0; bucket < DEFAULT_BUCKET_NUM; bucket++) {
if (partition == null) {
mockCommittedSnapshot.addBucket(bucket, bucket + 1);
@@ -347,18 +346,15 @@ class TieringCommitOperatorTest extends FlinkTestBase {
CommittedLakeSnapshot committedLakeSnapshot,
Map<String, Long> partitionIdByName) {
Map<TableBucket, Long> expectedLogEndOffsets = new HashMap<>();
- for (Map.Entry<Tuple2<String, Integer>, Long> entry :
+ for (Map.Entry<Tuple2<Long, Integer>, Long> entry :
committedLakeSnapshot.getLogEndOffsets().entrySet()) {
- Tuple2<String, Integer> partitionBucket = entry.getKey();
+ Tuple2<Long, Integer> partitionBucket = entry.getKey();
if (partitionBucket.f0 == null) {
expectedLogEndOffsets.put(
new TableBucket(tableId, partitionBucket.f1),
entry.getValue());
} else {
expectedLogEndOffsets.put(
- new TableBucket(
- tableId,
- partitionIdByName.get(partitionBucket.f0),
- partitionBucket.f1),
+ new TableBucket(tableId, partitionBucket.f0,
partitionBucket.f1),
entry.getValue());
}
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
index c5927100..9d7eed0b 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
@@ -17,37 +17,35 @@
package com.alibaba.fluss.lake.paimon.tiering;
+import com.alibaba.fluss.lake.committer.BucketOffset;
import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
import com.alibaba.fluss.lake.committer.LakeCommitter;
import com.alibaba.fluss.metadata.TablePath;
+import
com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import
com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import com.alibaba.fluss.utils.json.BucketOffsetJsonSerde;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitCallback;
-import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.utils.SnapshotManager;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import static
com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
import static
com.alibaba.fluss.lake.paimon.tiering.PaimonLakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;
import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
-import static
com.alibaba.fluss.metadata.ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR;
-import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
-import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
import static org.apache.paimon.table.sink.BatchWriteBuilder.COMMIT_IDENTIFIER;
@@ -59,6 +57,7 @@ public class PaimonLakeCommitter implements
LakeCommitter<PaimonWriteResult, Pai
private FileStoreCommit fileStoreCommit;
private final TablePath tablePath;
private static final ThreadLocal<Long> currentCommitSnapshotId = new
ThreadLocal<>();
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public PaimonLakeCommitter(PaimonCatalogProvider paimonCatalogProvider,
TablePath tablePath)
throws IOException {
@@ -78,8 +77,11 @@ public class PaimonLakeCommitter implements
LakeCommitter<PaimonWriteResult, Pai
}
@Override
- public long commit(PaimonCommittable committable) throws IOException {
+ public long commit(PaimonCommittable committable, Map<String, String>
snapshotProperties)
+ throws IOException {
ManifestCommittable manifestCommittable =
committable.manifestCommittable();
+ snapshotProperties.forEach(manifestCommittable::addProperty);
+
try {
fileStoreCommit =
fileStoreTable
@@ -110,9 +112,9 @@ public class PaimonLakeCommitter implements
LakeCommitter<PaimonWriteResult, Pai
@Override
public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long
latestLakeSnapshotIdOfFluss)
throws IOException {
- Long latestLakeSnapshotIdOfLake =
-
getCommittedLatestSnapshotIdOfLake(FLUSS_LAKE_TIERING_COMMIT_USER);
- if (latestLakeSnapshotIdOfLake == null) {
+ Snapshot latestLakeSnapshotOfLake =
+
getCommittedLatestSnapshotOfLake(FLUSS_LAKE_TIERING_COMMIT_USER);
+ if (latestLakeSnapshotOfLake == null) {
return null;
}
@@ -120,40 +122,51 @@ public class PaimonLakeCommitter implements
LakeCommitter<PaimonWriteResult, Pai
// but the latest snapshot is not greater than
latestLakeSnapshotIdOfFluss, no any missing
// snapshot, return directly
if (latestLakeSnapshotIdOfFluss != null
- && latestLakeSnapshotIdOfLake <= latestLakeSnapshotIdOfFluss) {
+ && latestLakeSnapshotOfLake.id() <=
latestLakeSnapshotIdOfFluss) {
return null;
}
- // todo: the temporary way to scan the delta to get the log end offset,
- // we should read from snapshot's properties in Paimon 1.2
CommittedLakeSnapshot committedLakeSnapshot =
- new CommittedLakeSnapshot(latestLakeSnapshotIdOfLake);
- ScanMode scanMode =
- fileStoreTable.primaryKeys().isEmpty() ? ScanMode.DELTA :
ScanMode.CHANGELOG;
-
- Iterator<ManifestEntry> manifestEntryIterator =
- fileStoreTable
- .store()
- .newScan()
- .withSnapshot(latestLakeSnapshotIdOfLake)
- .withKind(scanMode)
- .readFileIterator();
+ new CommittedLakeSnapshot(latestLakeSnapshotOfLake.id());
- int bucketIdColumnIndex = getColumnIndex(BUCKET_COLUMN_NAME);
- int logOffsetColumnIndex = getColumnIndex(OFFSET_COLUMN_NAME);
- while (manifestEntryIterator.hasNext()) {
- updateCommittedLakeSnapshot(
- committedLakeSnapshot,
- manifestEntryIterator.next(),
- bucketIdColumnIndex,
- logOffsetColumnIndex);
+ if (latestLakeSnapshotOfLake.properties() == null) {
+ throw new IOException("Failed to load committed lake snapshot
properties from Paimon.");
}
+ // if resume from an old tiering service v0.7 without paimon
supporting snapshot properties,
+ // we can't get the properties. But once come into here, it must be
that
+ // tiering service commit snapshot to lake, but fail to commit to
fluss, we have to notify
+ // users to run old tiering service again to commit the snapshot to
fluss again, and then
+ // it can resume tiering with new tiering service
+ Map<String, String> lakeSnapshotProperties =
latestLakeSnapshotOfLake.properties();
+ if (lakeSnapshotProperties == null) {
+ throw new IllegalArgumentException(
+ "Cannot resume tiering from an old version(v0.7) of
tiering service. "
+ + "The snapshot was committed to the lake storage
but failed to commit to Fluss. "
+ + "To resolve this:\n"
+ + "1. Run the old tiering service(v0.7) again to
complete the Fluss commit\n"
+ + "2. Then you can resume tiering with the newer
version of tiering service");
+ } else {
+ String flussOffsetProperties =
+
lakeSnapshotProperties.get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
+ for (JsonNode node :
OBJECT_MAPPER.readTree(flussOffsetProperties)) {
+ BucketOffset bucketOffset =
BucketOffsetJsonSerde.INSTANCE.deserialize(node);
+ if (bucketOffset.getPartitionId() != null) {
+ committedLakeSnapshot.addPartitionBucket(
+ bucketOffset.getPartitionId(),
+ bucketOffset.getBucket(),
+ bucketOffset.getLogOffset());
+ } else {
+ committedLakeSnapshot.addBucket(
+ bucketOffset.getBucket(),
bucketOffset.getLogOffset());
+ }
+ }
+ }
return committedLakeSnapshot;
}
@Nullable
- private Long getCommittedLatestSnapshotIdOfLake(String commitUser) throws
IOException {
+ private Snapshot getCommittedLatestSnapshotOfLake(String commitUser)
throws IOException {
// get the latest snapshot committed by Fluss, or the latest committed snapshot id
SnapshotManager snapshotManager = fileStoreTable.snapshotManager();
Long userCommittedSnapshotIdOrLatestCommitId =
@@ -172,7 +185,7 @@ public class PaimonLakeCommitter implements
LakeCommitter<PaimonWriteResult, Pai
// the snapshot is still not committed by Fluss, return directly
return null;
}
- return snapshot.id();
+ return snapshot;
}
@Override
@@ -226,44 +239,4 @@ public class PaimonLakeCommitter implements
LakeCommitter<PaimonWriteResult, Pai
// do-nothing
}
}
-
- private void updateCommittedLakeSnapshot(
- CommittedLakeSnapshot committedLakeSnapshot,
- ManifestEntry manifestEntry,
- int bucketIdColumnIndex,
- int logOffsetColumnIndex) {
- // always get bucket, log_offset from statistic
- DataFileMeta dataFileMeta = manifestEntry.file();
- BinaryRow maxStatisticRow = dataFileMeta.valueStats().maxValues();
-
- int bucketId = maxStatisticRow.getInt(bucketIdColumnIndex);
- long offset = maxStatisticRow.getLong(logOffsetColumnIndex);
-
- String partition = null;
- BinaryRow partitionRow = manifestEntry.partition();
- if (partitionRow.getFieldCount() > 0) {
- List<String> partitionFields = new
ArrayList<>(partitionRow.getFieldCount());
- for (int i = 0; i < partitionRow.getFieldCount(); i++) {
- partitionFields.add(partitionRow.getString(i).toString());
- }
- partition = String.join(PARTITION_SPEC_SEPARATOR, partitionFields);
- }
-
- if (partition == null) {
- committedLakeSnapshot.addBucket(bucketId, offset);
- } else {
- committedLakeSnapshot.addPartitionBucket(partition, bucketId,
offset);
- }
- }
-
- private int getColumnIndex(String columnName) {
- int columnIndex =
fileStoreTable.schema().fieldNames().indexOf(columnName);
- if (columnIndex < 0) {
- throw new IllegalArgumentException(
- String.format(
- "Column '%s' is not found in paimon table %s, the
columns of the table are %s",
- columnIndex, tablePath,
fileStoreTable.schema().fieldNames()));
- }
- return columnIndex;
- }
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index b4788d87..d9c0bebd 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -43,6 +43,7 @@ import com.alibaba.fluss.types.DataTypes;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
@@ -460,4 +461,18 @@ public class FlinkPaimonTieringTestBase {
table.newRead().createReader(table.newReadBuilder().newScan().plan());
return reader.toCloseableIterator();
}
+
+ protected void checkSnapshotPropertyInPaimon(
+ TablePath tablePath, Map<String, String> expectedProperties)
throws Exception {
+ FileStoreTable table =
+ (FileStoreTable)
+ getPaimonCatalog()
+ .getTable(
+ Identifier.create(
+ tablePath.getDatabaseName(),
+ tablePath.getTableName()));
+ Snapshot snapshot = table.snapshotManager().latestSnapshot();
+ assertThat(snapshot).isNotNull();
+ assertThat(snapshot.properties()).isEqualTo(expectedProperties);
+ }
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java
index 2ee3083f..576d4722 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java
@@ -42,10 +42,12 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import static
com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
import static com.alibaba.fluss.testutils.DataTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
@@ -84,6 +86,16 @@ class PaimonTieringITCase extends FlinkPaimonTieringTestBase
{
assertReplicaStatus(t1Bucket, 3);
// check data in paimon
checkDataInPaimonPrimayKeyTable(t1, rows);
+ // check snapshot property in paimon
+ Map<String, String> properties =
+ new HashMap<String, String>() {
+ {
+ put(
+ FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+ "[{\"bucket_id\":0,\"log_offset\":3}]");
+ }
+ };
+ checkSnapshotPropertyInPaimon(t1, properties);
// then, create another log table
TablePath t2 = TablePath.of(DEFAULT_DB, "logTable");
@@ -146,6 +158,20 @@ class PaimonTieringITCase extends
FlinkPaimonTieringTestBase {
writtenRowsByPartition.get(partitionName),
0);
}
+
+ properties =
+ new HashMap<String, String>() {
+ {
+ put(
+ FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+ "["
+ +
"{\"partition_id\":0,\"bucket_id\":0,\"log_offset\":3},"
+ +
"{\"partition_id\":1,\"bucket_id\":0,\"log_offset\":3}"
+ + "]");
+ }
+ };
+ checkSnapshotPropertyInPaimon(partitionedTablePath, properties);
+
jobClient.cancel().get();
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
index ae02112f..16984f7f 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
@@ -66,6 +66,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
+import static
com.alibaba.fluss.flink.tiering.committer.TieringCommitOperator.toBucketOffsetsProperty;
import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
@@ -129,13 +130,23 @@ class PaimonTieringTest {
}
Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket = new
HashMap<>();
- List<String> partitions =
- isPartitioned ? Arrays.asList("p1", "p2", "p3") :
Collections.singletonList(null);
+ Map<Long, String> partitionIdAndName =
+ isPartitioned
+ ? new HashMap<Long, String>() {
+ {
+ put(1L, "p1");
+ put(2L, "p2");
+ put(3L, "p3");
+ }
+ }
+ : Collections.singletonMap(null, null);
+ Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
// first, write data
for (int bucket = 0; bucket < bucketNum; bucket++) {
- for (String partition : partitions) {
+ for (Map.Entry<Long, String> entry :
partitionIdAndName.entrySet()) {
+ String partition = entry.getValue();
try (LakeWriter<PaimonWriteResult> lakeWriter =
- createLakeWriter(tablePath, bucket, partition)) {
+ createLakeWriter(tablePath, bucket, partition,
entry.getKey())) {
Tuple2<String, Integer> partitionBucket =
Tuple2.of(partition, bucket);
Tuple2<List<LogRecord>, List<LogRecord>>
writeAndExpectRecords =
isPrimaryKeyTable
@@ -144,6 +155,7 @@ class PaimonTieringTest {
List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
List<LogRecord> expectRecords = writeAndExpectRecords.f1;
recordsByBucket.put(partitionBucket, expectRecords);
+ tableBucketOffsets.put(new TableBucket(0, entry.getKey(),
bucket), 10L);
for (LogRecord logRecord : writtenRecords) {
lakeWriter.write(logRecord);
}
@@ -166,13 +178,15 @@ class PaimonTieringTest {
paimonCommittable =
committableSerializer.deserialize(
committableSerializer.getVersion(), serialized);
- long snapshot = lakeCommitter.commit(paimonCommittable);
+ long snapshot =
+ lakeCommitter.commit(
+ paimonCommittable,
toBucketOffsetsProperty(tableBucketOffsets));
assertThat(snapshot).isEqualTo(1);
}
// then, check data
for (int bucket = 0; bucket < 3; bucket++) {
- for (String partition : partitions) {
+ for (String partition : partitionIdAndName.values()) {
Tuple2<String, Integer> partitionBucket = Tuple2.of(partition,
bucket);
List<LogRecord> expectRecords =
recordsByBucket.get(partitionBucket);
CloseableIterator<InternalRow> actualRecords =
@@ -192,11 +206,11 @@ class PaimonTieringTest {
// use snapshot id 0 as the known snapshot id
CommittedLakeSnapshot committedLakeSnapshot =
lakeCommitter.getMissingLakeSnapshot(0L);
assertThat(committedLakeSnapshot).isNotNull();
- Map<Tuple2<String, Integer>, Long> offsets =
committedLakeSnapshot.getLogEndOffsets();
+ Map<Tuple2<Long, Integer>, Long> offsets =
committedLakeSnapshot.getLogEndOffsets();
for (int bucket = 0; bucket < 3; bucket++) {
- for (String partition : partitions) {
- // we only write 10 records, so expected log offset should
be 9
- assertThat(offsets.get(Tuple2.of(partition,
bucket))).isEqualTo(9);
+ for (Long partitionId : partitionIdAndName.keySet()) {
+ // we only write 10 records, so expected log offset should
be 10
+ assertThat(offsets.get(Tuple2.of(partitionId,
bucket))).isEqualTo(10);
}
}
assertThat(committedLakeSnapshot.getLakeSnapshotId()).isOne();
@@ -221,15 +235,24 @@ class PaimonTieringTest {
Map<String, List<LogRecord>> recordsByPartition = new HashMap<>();
List<PaimonWriteResult> paimonWriteResults = new ArrayList<>();
+ Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
// Test data for different partitions using $ separator
- List<String> partitions = Arrays.asList("us-east$2024",
"us-west$2024", "eu-central$2023");
+ Map<Long, String> partitionIdAndName =
+ new HashMap<Long, String>() {
+ {
+ put(1L, "us-east$2024");
+ put(2L, "us-west$2024");
+ put(3L, "eu-central$2023");
+ }
+ };
int bucket = 0;
- for (String partition : partitions) {
+ for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
+ String partition = entry.getValue();
try (LakeWriter<PaimonWriteResult> lakeWriter =
- createLakeWriter(tablePath, bucket, partition)) {
+ createLakeWriter(tablePath, bucket, partition,
entry.getKey())) {
List<LogRecord> logRecords =
genLogTableRecordsForMultiPartition(partition, bucket,
3);
recordsByPartition.put(partition, logRecords);
@@ -237,7 +260,7 @@ class PaimonTieringTest {
for (LogRecord logRecord : logRecords) {
lakeWriter.write(logRecord);
}
-
+ tableBucketOffsets.put(new TableBucket(0, entry.getKey(),
bucket), 3L);
PaimonWriteResult result = lakeWriter.complete();
paimonWriteResults.add(result);
}
@@ -247,12 +270,13 @@ class PaimonTieringTest {
try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter
=
createLakeCommitter(tablePath)) {
PaimonCommittable committable =
lakeCommitter.toCommittable(paimonWriteResults);
- long snapshot = lakeCommitter.commit(committable);
+ long snapshot =
+ lakeCommitter.commit(committable,
toBucketOffsetsProperty(tableBucketOffsets));
assertThat(snapshot).isEqualTo(1);
}
// Verify data for each partition
- for (String partition : partitions) {
+ for (String partition : partitionIdAndName.values()) {
List<LogRecord> expectRecords = recordsByPartition.get(partition);
CloseableIterator<InternalRow> actualRecords =
getPaimonRowsMultiPartition(tablePath, partition);
@@ -268,16 +292,22 @@ class PaimonTieringTest {
Map<String, List<LogRecord>> recordsByPartition = new HashMap<>();
List<PaimonWriteResult> paimonWriteResults = new ArrayList<>();
+ Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
// Test data for different three-level partitions using $ separator
- List<String> partitions =
- Arrays.asList("us-east$2024$01", "us-east$2024$02",
"eu-central$2023$12");
-
+ Map<Long, String> partitionIdAndName =
+ new HashMap<Long, String>() {
+ {
+ put(1L, "us-east$2024$01");
+ put(2L, "eu-central$2023$12");
+ }
+ };
int bucket = 0;
- for (String partition : partitions) {
+ for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
+ String partition = entry.getValue();
try (LakeWriter<PaimonWriteResult> lakeWriter =
- createLakeWriter(tablePath, bucket, partition)) {
+ createLakeWriter(tablePath, bucket, partition,
entry.getKey())) {
List<LogRecord> logRecords =
genLogTableRecordsForMultiPartition(
partition, bucket, 2); // Use same method
@@ -286,6 +316,7 @@ class PaimonTieringTest {
for (LogRecord logRecord : logRecords) {
lakeWriter.write(logRecord);
}
+ tableBucketOffsets.put(new TableBucket(0, entry.getKey(),
bucket), 2L);
PaimonWriteResult result = lakeWriter.complete();
paimonWriteResults.add(result);
@@ -296,12 +327,13 @@ class PaimonTieringTest {
try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter
=
createLakeCommitter(tablePath)) {
PaimonCommittable committable =
lakeCommitter.toCommittable(paimonWriteResults);
- long snapshot = lakeCommitter.commit(committable);
+ long snapshot =
+ lakeCommitter.commit(committable,
toBucketOffsetsProperty(tableBucketOffsets));
assertThat(snapshot).isEqualTo(1);
}
// Verify data for each partition
- for (String partition : partitions) {
+ for (String partition : partitionIdAndName.values()) {
List<LogRecord> expectRecords = recordsByPartition.get(partition);
CloseableIterator<InternalRow> actualRecords =
getPaimonRowsThreePartition(tablePath, partition);
@@ -639,7 +671,8 @@ class PaimonTieringTest {
}
private LakeWriter<PaimonWriteResult> createLakeWriter(
- TablePath tablePath, int bucket, @Nullable String partition)
throws IOException {
+ TablePath tablePath, int bucket, @Nullable String partition,
@Nullable Long partitionId)
+ throws IOException {
return paimonLakeTieringFactory.createLakeWriter(
new WriterInitContext() {
@Override
@@ -650,7 +683,7 @@ class PaimonTieringTest {
@Override
public TableBucket tableBucket() {
// don't care about tableId; partitionId is passed through for bucket offsets
- return new TableBucket(0, 0L, bucket);
+ return new TableBucket(0, partitionId, bucket);
}
@Nullable