This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new d2dc98f88 [lake] Store partition name in lake snapshot property (#1485)
d2dc98f88 is described below
commit d2dc98f8881c7d6212775df5682c59e1c1c5f7c9
Author: yuxia Luo <[email protected]>
AuthorDate: Tue Aug 12 20:13:40 2025 +0800
[lake] Store partition name in lake snapshot property (#1485)
---
.../fluss/client/metadata/LakeSnapshot.java | 19 +++-
.../fluss/client/utils/ClientRpcMessageUtils.java | 6 +-
.../alibaba/fluss/lake/committer/BucketOffset.java | 12 +--
.../lake/committer/CommittedLakeSnapshot.java | 19 +++-
.../fluss/metadata/ResolvedPartitionSpec.java | 19 ++++
.../fluss/utils/json/BucketOffsetJsonSerde.java | 4 +-
.../fluss/metadata/ResolvedPartitionSpecTest.java | 9 ++
.../utils/json/BucketOffsetJsonSerdeTest.java | 5 +-
.../tiering/committer/FlussTableLakeSnapshot.java | 24 ++---
.../committer/FlussTableLakeSnapshotCommitter.java | 40 ++++----
.../tiering/committer/TieringCommitOperator.java | 109 +++++++++++++--------
.../tiering/source/TableBucketWriteResult.java | 10 ++
.../source/TableBucketWriteResultSerializer.java | 10 +-
.../flink/tiering/source/TieringSplitReader.java | 19 +++-
.../FlussTableLakeSnapshotCommitterTest.java | 43 ++++----
.../committer/TieringCommitOperatorTest.java | 47 +++++++--
.../TableBucketWriteResultSerializerTest.java | 11 ++-
.../lake/paimon/tiering/PaimonLakeCommitter.java | 1 +
.../lake/paimon/tiering/PaimonTieringITCase.java | 4 +-
.../lake/paimon/tiering/PaimonTieringTest.java | 51 +++++++++-
fluss-rpc/src/main/proto/FlussApi.proto | 2 +
.../fluss/server/utils/ServerRpcMessageUtils.java | 19 +++-
.../alibaba/fluss/server/zk/ZooKeeperClient.java | 7 +-
.../fluss/server/zk/data/LakeTableSnapshot.java | 29 ++++--
.../server/zk/data/LakeTableSnapshotJsonSerde.java | 24 ++++-
.../replica/CommitLakeTableSnapshotITCase.java | 7 +-
.../zk/data/LakeTableSnapshotJsonSerdeTest.java | 28 +++++-
27 files changed, 429 insertions(+), 149 deletions(-)
diff --git
a/fluss-client/src/main/java/com/alibaba/fluss/client/metadata/LakeSnapshot.java
b/fluss-client/src/main/java/com/alibaba/fluss/client/metadata/LakeSnapshot.java
index ff148cfb8..36228cb4c 100644
---
a/fluss-client/src/main/java/com/alibaba/fluss/client/metadata/LakeSnapshot.java
+++
b/fluss-client/src/main/java/com/alibaba/fluss/client/metadata/LakeSnapshot.java
@@ -20,6 +20,7 @@ package com.alibaba.fluss.client.metadata;
import com.alibaba.fluss.annotation.PublicEvolving;
import com.alibaba.fluss.metadata.TableBucket;
+import java.util.Collections;
import java.util.Map;
/**
@@ -36,9 +37,17 @@ public class LakeSnapshot {
// the specific log offset of the snapshot
private final Map<TableBucket, Long> tableBucketsOffset;
- public LakeSnapshot(long snapshotId, Map<TableBucket, Long>
tableBucketsOffset) {
+ // the partition name by partition id of this lake snapshot if this
+ // is a partitioned table, empty if not a partitioned table
+ private final Map<Long, String> partitionNameById;
+
+ public LakeSnapshot(
+ long snapshotId,
+ Map<TableBucket, Long> tableBucketsOffset,
+ Map<Long, String> partitionNameById) {
this.snapshotId = snapshotId;
this.tableBucketsOffset = tableBucketsOffset;
+ this.partitionNameById = partitionNameById;
}
public long getSnapshotId() {
@@ -46,7 +55,11 @@ public class LakeSnapshot {
}
public Map<TableBucket, Long> getTableBucketsOffset() {
- return tableBucketsOffset;
+ return Collections.unmodifiableMap(tableBucketsOffset);
+ }
+
+ public Map<Long, String> getPartitionNameById() {
+ return Collections.unmodifiableMap(partitionNameById);
}
@Override
@@ -56,6 +69,8 @@ public class LakeSnapshot {
+ snapshotId
+ ", tableBucketsOffset="
+ tableBucketsOffset
+ + ", partitionNameById="
+ + partitionNameById
+ '}';
}
}
diff --git
a/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java
b/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java
index bfc36f6fd..d8b932a84 100644
---
a/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java
+++
b/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java
@@ -202,6 +202,7 @@ public class ClientRpcMessageUtils {
long snapshotId = response.getSnapshotId();
Map<TableBucket, Long> tableBucketsOffset =
new HashMap<>(response.getBucketSnapshotsCount());
+ Map<Long, String> partitionNameById = new HashMap<>();
for (PbLakeSnapshotForBucket pbLakeSnapshotForBucket :
response.getBucketSnapshotsList()) {
Long partitionId =
pbLakeSnapshotForBucket.hasPartitionId()
@@ -209,9 +210,12 @@ public class ClientRpcMessageUtils {
: null;
TableBucket tableBucket =
new TableBucket(tableId, partitionId,
pbLakeSnapshotForBucket.getBucketId());
+ if (partitionId != null &&
pbLakeSnapshotForBucket.hasPartitionName()) {
+ partitionNameById.put(partitionId,
pbLakeSnapshotForBucket.getPartitionName());
+ }
tableBucketsOffset.put(tableBucket,
pbLakeSnapshotForBucket.getLogOffset());
}
- return new LakeSnapshot(snapshotId, tableBucketsOffset);
+ return new LakeSnapshot(snapshotId, tableBucketsOffset,
partitionNameById);
}
public static List<FsPathAndFileName> toFsPathAndFileName(
diff --git
a/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/BucketOffset.java
b/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/BucketOffset.java
index d90c4805b..ae8fdbdb1 100644
---
a/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/BucketOffset.java
+++
b/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/BucketOffset.java
@@ -31,17 +31,17 @@ public class BucketOffset implements Serializable {
private final long logOffset;
private final int bucket;
private final @Nullable Long partitionId;
- private final @Nullable String partitionName;
+ private final @Nullable String partitionQualifiedName;
public BucketOffset(
long logOffset,
int bucket,
@Nullable Long partitionId,
- @Nullable String partitionName) {
+ @Nullable String partitionQualifiedName) {
this.logOffset = logOffset;
this.bucket = bucket;
this.partitionId = partitionId;
- this.partitionName = partitionName;
+ this.partitionQualifiedName = partitionQualifiedName;
}
public long getLogOffset() {
@@ -58,8 +58,8 @@ public class BucketOffset implements Serializable {
}
@Nullable
- public String getPartitionName() {
- return partitionName;
+ public String getPartitionQualifiedName() {
+ return partitionQualifiedName;
}
@Override
@@ -74,6 +74,6 @@ public class BucketOffset implements Serializable {
return bucket == that.bucket
&& logOffset == that.logOffset
&& Objects.equals(partitionId, that.partitionId)
- && Objects.equals(partitionName, that.partitionName);
+ && Objects.equals(partitionQualifiedName,
that.partitionQualifiedName);
}
}
diff --git
a/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/CommittedLakeSnapshot.java
b/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/CommittedLakeSnapshot.java
index a0d94b140..2e5f7947f 100644
---
a/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/CommittedLakeSnapshot.java
+++
b/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/CommittedLakeSnapshot.java
@@ -34,6 +34,10 @@ public class CommittedLakeSnapshot {
// partition bucket
private final Map<Tuple2<Long, Integer>, Long> logEndOffsets = new
HashMap<>();
+ // partition id -> partition name, will be empty if this is not a partitioned
table
+ // the partition name is a qualified name in the format:
key1=value1/key2=value2
+ private final Map<Long, String> qualifiedPartitionNameById = new
HashMap<>();
+
public CommittedLakeSnapshot(long lakeSnapshotId) {
this.lakeSnapshotId = lakeSnapshotId;
}
@@ -46,14 +50,20 @@ public class CommittedLakeSnapshot {
logEndOffsets.put(Tuple2.of(null, bucketId), offset);
}
- public void addPartitionBucket(Long partitionId, int bucketId, long
offset) {
+ public void addPartitionBucket(
+ Long partitionId, String partitionQualifiedName, int bucketId,
long offset) {
logEndOffsets.put(Tuple2.of(partitionId, bucketId), offset);
+ qualifiedPartitionNameById.put(partitionId, partitionQualifiedName);
}
public Map<Tuple2<Long, Integer>, Long> getLogEndOffsets() {
return logEndOffsets;
}
+ public Map<Long, String> getQualifiedPartitionNameById() {
+ return qualifiedPartitionNameById;
+ }
+
@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
@@ -61,12 +71,13 @@ public class CommittedLakeSnapshot {
}
CommittedLakeSnapshot that = (CommittedLakeSnapshot) o;
return lakeSnapshotId == that.lakeSnapshotId
- && Objects.equals(logEndOffsets, that.logEndOffsets);
+ && Objects.equals(logEndOffsets, that.logEndOffsets)
+ && Objects.equals(qualifiedPartitionNameById,
that.qualifiedPartitionNameById);
}
@Override
public int hashCode() {
- return Objects.hash(lakeSnapshotId, logEndOffsets);
+ return Objects.hash(lakeSnapshotId, logEndOffsets,
qualifiedPartitionNameById);
}
@Override
@@ -76,6 +87,8 @@ public class CommittedLakeSnapshot {
+ lakeSnapshotId
+ ", logEndOffsets="
+ logEndOffsets
+ + ", partitionNameById="
+ + qualifiedPartitionNameById
+ '}';
}
}
diff --git
a/fluss-common/src/main/java/com/alibaba/fluss/metadata/ResolvedPartitionSpec.java
b/fluss-common/src/main/java/com/alibaba/fluss/metadata/ResolvedPartitionSpec.java
index 5bd8b5363..4575206da 100644
---
a/fluss-common/src/main/java/com/alibaba/fluss/metadata/ResolvedPartitionSpec.java
+++
b/fluss-common/src/main/java/com/alibaba/fluss/metadata/ResolvedPartitionSpec.java
@@ -125,6 +125,25 @@ public class ResolvedPartitionSpec {
return sb.toString();
}
+ public static ResolvedPartitionSpec fromPartitionQualifiedName(String
qualifiedPartitionName) {
+ // convert from qualified name to ResolvedPartitionSpec
+ List<String> keys = new ArrayList<>();
+ List<String> values = new ArrayList<>();
+
+ String[] keyValuePairs = qualifiedPartitionName.split("/");
+
+ for (String pair : keyValuePairs) {
+ String[] keyValue = pair.split("=", 2);
+ if (keyValue.length != 2) {
+ throw new IllegalArgumentException(
+ "Invalid partition name format. Expected key=value,
got: " + pair);
+ }
+ keys.add(keyValue[0]);
+ values.add(keyValue[1]);
+ }
+ return new ResolvedPartitionSpec(keys, values);
+ }
+
@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
diff --git
a/fluss-common/src/main/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerde.java
b/fluss-common/src/main/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerde.java
index f5fa71209..819e2f09f 100644
---
a/fluss-common/src/main/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerde.java
+++
b/fluss-common/src/main/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerde.java
@@ -60,8 +60,8 @@ public class BucketOffsetJsonSerde
generator.writeNumberField(BUCKET_ID, bucketOffset.getBucket());
// serialize partition name
- if (bucketOffset.getPartitionName() != null) {
- generator.writeStringField(PARTITION_NAME,
bucketOffset.getPartitionName());
+ if (bucketOffset.getPartitionQualifiedName() != null) {
+ generator.writeStringField(PARTITION_NAME,
bucketOffset.getPartitionQualifiedName());
}
// serialize bucket offset
diff --git
a/fluss-common/src/test/java/com/alibaba/fluss/metadata/ResolvedPartitionSpecTest.java
b/fluss-common/src/test/java/com/alibaba/fluss/metadata/ResolvedPartitionSpecTest.java
index f9e827477..50981e33d 100644
---
a/fluss-common/src/test/java/com/alibaba/fluss/metadata/ResolvedPartitionSpecTest.java
+++
b/fluss-common/src/test/java/com/alibaba/fluss/metadata/ResolvedPartitionSpecTest.java
@@ -38,6 +38,11 @@ public class ResolvedPartitionSpecTest {
new PartitionSpec(Collections.singletonMap("a", "1")));
assertThat(resolvedPartitionSpec.getPartitionName()).isEqualTo("1");
assertThat(resolvedPartitionSpec.getPartitionQualifiedName()).isEqualTo("a=1");
+ assertThat(
+ ResolvedPartitionSpec.fromPartitionQualifiedName(
+
resolvedPartitionSpec.getPartitionQualifiedName()))
+ .isEqualTo(resolvedPartitionSpec);
+
assertThat(resolvedPartitionSpec.getPartitionKeys())
.isEqualTo(Collections.singletonList("a"));
assertThat(resolvedPartitionSpec.getPartitionValues())
@@ -52,5 +57,9 @@ public class ResolvedPartitionSpecTest {
assertThat(resolvedPartitionSpec.getPartitionQualifiedName()).isEqualTo("a=1/b=2");
assertThat(resolvedPartitionSpec.getPartitionKeys()).isEqualTo(Arrays.asList("a",
"b"));
assertThat(resolvedPartitionSpec.getPartitionValues()).isEqualTo(Arrays.asList("1",
"2"));
+ assertThat(
+ ResolvedPartitionSpec.fromPartitionQualifiedName(
+
resolvedPartitionSpec.getPartitionQualifiedName()))
+ .isEqualTo(resolvedPartitionSpec);
}
}
diff --git
a/fluss-common/src/test/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerdeTest.java
b/fluss-common/src/test/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerdeTest.java
index 7f267b160..c1913bae2 100644
---
a/fluss-common/src/test/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerdeTest.java
+++
b/fluss-common/src/test/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerdeTest.java
@@ -29,14 +29,15 @@ public class BucketOffsetJsonSerdeTest extends
JsonSerdeTestBase<BucketOffset> {
@Override
protected BucketOffset[] createObjects() {
return new BucketOffset[] {
- new BucketOffset(10, 1, 1L, "eu-central$2023$12"), new
BucketOffset(20, 2, null, null)
+ new BucketOffset(10, 1, 1L,
"country=eu-central/year=2023/month=12"),
+ new BucketOffset(20, 2, null, null)
};
}
@Override
protected String[] expectedJsons() {
return new String[] {
-
"{\"partition_id\":1,\"bucket_id\":1,\"partition_name\":\"eu-central$2023$12\",\"log_offset\":10}",
+
"{\"partition_id\":1,\"bucket_id\":1,\"partition_name\":\"country=eu-central/year=2023/month=12\",\"log_offset\":10}",
"{\"bucket_id\":2,\"log_offset\":20}"
};
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshot.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshot.java
index 294c0b94f..a9207ca49 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshot.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshot.java
@@ -18,28 +18,26 @@
package com.alibaba.fluss.flink.tiering.committer;
import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.utils.types.Tuple2;
import java.util.HashMap;
import java.util.Map;
/** A lake snapshot for a Fluss table. */
-public class FlussTableLakeSnapshot {
+class FlussTableLakeSnapshot {
private final long tableId;
private final long lakeSnapshotId;
- private final Map<TableBucket, Long> logEndOffsets;
+ // <table_bucket, partition_name> -> log end offsets,
+ // if the bucket is not of a partition, the partition_name is null
+ private final Map<Tuple2<TableBucket, String>, Long> logEndOffsets;
- public FlussTableLakeSnapshot(long tableId, long lakeSnapshotId) {
- this(tableId, lakeSnapshotId, new HashMap<>());
- }
-
- public FlussTableLakeSnapshot(
- long tableId, long lakeSnapshotId, Map<TableBucket, Long>
logEndOffsets) {
+ FlussTableLakeSnapshot(long tableId, long lakeSnapshotId) {
this.tableId = tableId;
this.lakeSnapshotId = lakeSnapshotId;
- this.logEndOffsets = logEndOffsets;
+ this.logEndOffsets = new HashMap<>();
}
public long tableId() {
@@ -50,12 +48,16 @@ public class FlussTableLakeSnapshot {
return lakeSnapshotId;
}
- public Map<TableBucket, Long> logEndOffsets() {
+ public Map<Tuple2<TableBucket, String>, Long> logEndOffsets() {
return logEndOffsets;
}
public void addBucketOffset(TableBucket bucket, long offset) {
- logEndOffsets.put(bucket, offset);
+ logEndOffsets.put(Tuple2.of(bucket, null), offset);
+ }
+
+ public void addPartitionBucketOffset(TableBucket bucket, String
partitionName, long offset) {
+ logEndOffsets.put(Tuple2.of(bucket, partitionName), offset);
}
@Override
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
index 77a08e83e..e1b88e320 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
@@ -21,6 +21,7 @@ import com.alibaba.fluss.client.metadata.MetadataUpdater;
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
+import com.alibaba.fluss.metadata.ResolvedPartitionSpec;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metrics.registry.MetricRegistry;
import com.alibaba.fluss.rpc.GatewayClientProxy;
@@ -33,8 +34,6 @@ import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
import com.alibaba.fluss.utils.ExceptionUtils;
import com.alibaba.fluss.utils.types.Tuple2;
-import javax.annotation.Nullable;
-
import java.io.IOException;
import java.util.Map;
@@ -63,7 +62,7 @@ public class FlussTableLakeSnapshotCommitter implements
AutoCloseable {
metadataUpdater::getCoordinatorServer, rpcClient,
CoordinatorGateway.class);
}
- public void commit(FlussTableLakeSnapshot flussTableLakeSnapshot) throws
IOException {
+ void commit(FlussTableLakeSnapshot flussTableLakeSnapshot) throws
IOException {
try {
CommitLakeTableSnapshotRequest request =
toCommitLakeTableSnapshotRequest(flussTableLakeSnapshot);
@@ -77,10 +76,7 @@ public class FlussTableLakeSnapshotCommitter implements
AutoCloseable {
}
}
- public void commit(
- long tableId,
- @Nullable Map<String, Long> partitionIdByName,
- CommittedLakeSnapshot committedLakeSnapshot)
+ public void commit(long tableId, CommittedLakeSnapshot
committedLakeSnapshot)
throws IOException {
// construct lake snapshot to commit to Fluss
FlussTableLakeSnapshot flussTableLakeSnapshot =
@@ -89,18 +85,22 @@ public class FlussTableLakeSnapshotCommitter implements
AutoCloseable {
committedLakeSnapshot.getLogEndOffsets().entrySet()) {
Tuple2<Long, Integer> partitionBucket = entry.getKey();
TableBucket tableBucket;
- if (partitionBucket.f0 == null) {
+ Long partitionId = partitionBucket.f0;
+ if (partitionId == null) {
tableBucket = new TableBucket(tableId, partitionBucket.f1);
+ flussTableLakeSnapshot.addBucketOffset(tableBucket,
entry.getValue());
} else {
- Long partitionId = partitionBucket.f0;
- if (partitionId != null) {
- tableBucket = new TableBucket(tableId, partitionId,
partitionBucket.f1);
- } else {
- // let's skip the bucket
- continue;
- }
+ tableBucket = new TableBucket(tableId, partitionId,
partitionBucket.f1);
+ // the partition name is a qualified partition name in the format:
+ // key1=value1/key2=value2.
+ // we need to convert it to a partition name in the format:
value1$value2
+ String qualifiedPartitionName =
+
committedLakeSnapshot.getQualifiedPartitionNameById().get(partitionId);
+ ResolvedPartitionSpec resolvedPartitionSpec =
+
ResolvedPartitionSpec.fromPartitionQualifiedName(qualifiedPartitionName);
+ flussTableLakeSnapshot.addPartitionBucketOffset(
+ tableBucket, resolvedPartitionSpec.getPartitionName(),
entry.getValue());
}
- flussTableLakeSnapshot.addBucketOffset(tableBucket,
entry.getValue());
}
commit(flussTableLakeSnapshot);
}
@@ -114,15 +114,19 @@ public class FlussTableLakeSnapshotCommitter implements
AutoCloseable {
pbLakeTableSnapshotInfo.setTableId(flussTableLakeSnapshot.tableId());
pbLakeTableSnapshotInfo.setSnapshotId(flussTableLakeSnapshot.lakeSnapshotId());
- for (Map.Entry<TableBucket, Long> bucketEndOffsetEntry :
+ for (Map.Entry<Tuple2<TableBucket, String>, Long> bucketEndOffsetEntry
:
flussTableLakeSnapshot.logEndOffsets().entrySet()) {
PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =
pbLakeTableSnapshotInfo.addBucketsReq();
- TableBucket tableBucket = bucketEndOffsetEntry.getKey();
+ TableBucket tableBucket = bucketEndOffsetEntry.getKey().f0;
+ String partitionName = bucketEndOffsetEntry.getKey().f1;
long endOffset = bucketEndOffsetEntry.getValue();
if (tableBucket.getPartitionId() != null) {
pbLakeTableOffsetForBucket.setPartitionId(tableBucket.getPartitionId());
}
+ if (partitionName != null) {
+ pbLakeTableOffsetForBucket.setPartitionName(partitionName);
+ }
pbLakeTableOffsetForBucket.setBucketId(tableBucket.getBucket());
pbLakeTableOffsetForBucket.setLogEndOffset(endOffset);
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java
index ffb7ecb5b..64d78d7c2 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -32,7 +32,7 @@ import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
import com.alibaba.fluss.lake.committer.LakeCommitter;
import com.alibaba.fluss.lake.writer.LakeTieringFactory;
import com.alibaba.fluss.lake.writer.LakeWriter;
-import com.alibaba.fluss.metadata.PartitionInfo;
+import com.alibaba.fluss.metadata.ResolvedPartitionSpec;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.metadata.TablePath;
@@ -190,9 +190,9 @@ public class TieringCommitOperator<WriteResult, Committable>
.collect(Collectors.toList());
LakeSnapshot flussCurrentLakeSnapshot =
getLatestLakeSnapshot(tablePath);
- Map<TableBucket, Long> logOffsets =
- mergeTableBucketOffsets(flussCurrentLakeSnapshot,
committableWriteResults);
-
+ Map<String, String> logOffsetsProperty =
+ toBucketOffsetsProperty(
+ tablePath, flussCurrentLakeSnapshot,
committableWriteResults);
// to committable
Committable committable =
lakeCommitter.toCommittable(writeResults);
// before commit to lake, check fluss not missing any lake
snapshot committed by fluss
@@ -203,32 +203,86 @@ public class TieringCommitOperator<WriteResult,
Committable>
flussCurrentLakeSnapshot == null
? null
: flussCurrentLakeSnapshot.getSnapshotId());
- long committedSnapshotId =
- lakeCommitter.commit(committable,
toBucketOffsetsProperty(logOffsets));
+ long committedSnapshotId = lakeCommitter.commit(committable,
logOffsetsProperty);
// commit to fluss
- Map<TableBucket, Long> logEndOffsets = new HashMap<>();
+ FlussTableLakeSnapshot flussTableLakeSnapshot =
+ new FlussTableLakeSnapshot(tableId, committedSnapshotId);
for (TableBucketWriteResult<WriteResult> writeResult :
committableWriteResults) {
- logEndOffsets.put(writeResult.tableBucket(),
writeResult.logEndOffset());
+ TableBucket tableBucket = writeResult.tableBucket();
+ if (writeResult.tableBucket().getPartitionId() == null) {
+ flussTableLakeSnapshot.addBucketOffset(tableBucket,
writeResult.logEndOffset());
+ } else {
+ flussTableLakeSnapshot.addPartitionBucketOffset(
+ tableBucket, writeResult.partitionName(),
writeResult.logEndOffset());
+ }
}
- flussTableLakeSnapshotCommitter.commit(
- new FlussTableLakeSnapshot(tableId, committedSnapshotId,
logEndOffsets));
+ flussTableLakeSnapshotCommitter.commit(flussTableLakeSnapshot);
return committable;
}
}
+ /**
+ * Merge the log offsets of latest snapshot with current written bucket
offsets to get full log
+ * offsets.
+ */
+ private Map<String, String> toBucketOffsetsProperty(
+ TablePath tablePath,
+ @Nullable LakeSnapshot latestLakeSnapshot,
+ List<TableBucketWriteResult<WriteResult>> currentWriteResults)
+ throws Exception {
+ // first of all, we need to merge latest lake snapshot with current
write results
+ Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
+ Map<Long, String> partitionNameById = new HashMap<>();
+ if (latestLakeSnapshot != null) {
+ tableBucketOffsets = new
HashMap<>(latestLakeSnapshot.getTableBucketsOffset());
+ partitionNameById = new
HashMap<>(latestLakeSnapshot.getPartitionNameById());
+ }
+
+ for (TableBucketWriteResult<WriteResult> tableBucketWriteResult :
currentWriteResults) {
+ tableBucketOffsets.put(
+ tableBucketWriteResult.tableBucket(),
tableBucketWriteResult.logEndOffset());
+ if (tableBucketWriteResult.tableBucket().getPartitionId() != null
+ && tableBucketWriteResult.partitionName() != null) {
+ partitionNameById.put(
+ tableBucketWriteResult.tableBucket().getPartitionId(),
+ tableBucketWriteResult.partitionName());
+ }
+ }
+
+ List<String> partitionKeys = new ArrayList<>();
+ if (!partitionNameById.isEmpty()) {
+ partitionKeys =
admin.getTableInfo(tablePath).get().getPartitionKeys();
+ }
+
+ // then, serialize the bucket offsets, partition name by id
+ return toBucketOffsetsProperty(tableBucketOffsets, partitionNameById,
partitionKeys);
+ }
+
public static Map<String, String> toBucketOffsetsProperty(
- Map<TableBucket, Long> tableBucketOffsets) throws IOException {
+ Map<TableBucket, Long> tableBucketOffsets,
+ Map<Long, String> partitionNameById,
+ List<String> partitionKeys)
+ throws IOException {
StringWriter sw = new StringWriter();
try (JsonGenerator gen = JACKSON_FACTORY.createGenerator(sw)) {
gen.writeStartArray();
for (Map.Entry<TableBucket, Long> entry :
tableBucketOffsets.entrySet()) {
+ Long partitionId = entry.getKey().getPartitionId();
+ String partitionQualifiedName = null;
+ if (partitionId != null) {
+ // the partitionName is 2025$12$03, we need to convert to
+ // qualified name year=2025/month=12/day=03
+ partitionQualifiedName =
+ ResolvedPartitionSpec.fromPartitionName(
+ partitionKeys,
partitionNameById.get(partitionId))
+ .getPartitionQualifiedName();
+ }
BucketOffsetJsonSerde.INSTANCE.serialize(
new BucketOffset(
entry.getValue(),
entry.getKey().getBucket(),
entry.getKey().getPartitionId(),
- // todo: fill partition name in #1448
- null),
+ partitionQualifiedName),
gen);
}
gen.writeEndArray();
@@ -240,24 +294,6 @@ public class TieringCommitOperator<WriteResult,
Committable>
};
}
- /**
- * Merge the log offsets of latest snapshot with current written bucket
offsets to get full log
- * offsets.
- */
- private Map<TableBucket, Long> mergeTableBucketOffsets(
- @Nullable LakeSnapshot latestLakeSnapshot,
- List<TableBucketWriteResult<WriteResult>> currentWriteResults) {
- Map<TableBucket, Long> tableBucketOffsets =
- latestLakeSnapshot == null
- ? new HashMap<>()
- : new
HashMap<>(latestLakeSnapshot.getTableBucketsOffset());
- for (TableBucketWriteResult<WriteResult> tableBucketWriteResult :
currentWriteResults) {
- tableBucketOffsets.put(
- tableBucketWriteResult.tableBucket(),
tableBucketWriteResult.logEndOffset());
- }
- return tableBucketOffsets;
- }
-
@Nullable
private LakeSnapshot getLatestLakeSnapshot(TablePath tablePath) throws
Exception {
LakeSnapshot flussCurrentLakeSnapshot;
@@ -293,17 +329,8 @@ public class TieringCommitOperator<WriteResult,
Committable>
if (missingCommittedSnapshot != null) {
// commit this missing snapshot to fluss
TableInfo tableInfo = admin.getTableInfo(tablePath).get();
- Map<String, Long> partitionIdByName = null;
- if (tableInfo.isPartitioned()) {
- partitionIdByName =
- admin.listPartitionInfos(tablePath).get().stream()
- .collect(
- Collectors.toMap(
-
PartitionInfo::getPartitionName,
-
PartitionInfo::getPartitionId));
- }
flussTableLakeSnapshotCommitter.commit(
- tableInfo.getTableId(), partitionIdByName,
missingCommittedSnapshot);
+ tableInfo.getTableId(), missingCommittedSnapshot);
// abort this committable to delete the written files
lakeCommitter.abort(committable);
throw new IllegalStateException(
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TableBucketWriteResult.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TableBucketWriteResult.java
index 2d06898fe..7260fbf8f 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TableBucketWriteResult.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TableBucketWriteResult.java
@@ -39,6 +39,9 @@ public class TableBucketWriteResult<WriteResult> implements
Serializable {
private final TableBucket tableBucket;
+ // null when the bucket is not for a partition
+ @Nullable private final String partitionName;
+
// will be null when no data was written, such as when tiering an empty log
split
@Nullable private final WriteResult writeResult;
@@ -53,11 +56,13 @@ public class TableBucketWriteResult<WriteResult> implements
Serializable {
public TableBucketWriteResult(
TablePath tablePath,
TableBucket tableBucket,
+ @Nullable String partitionName,
@Nullable WriteResult writeResult,
long logEndOffset,
int numberOfWriteResults) {
this.tablePath = tablePath;
this.tableBucket = tableBucket;
+ this.partitionName = partitionName;
this.writeResult = writeResult;
this.logEndOffset = logEndOffset;
this.numberOfWriteResults = numberOfWriteResults;
@@ -71,6 +76,11 @@ public class TableBucketWriteResult<WriteResult> implements
Serializable {
return tableBucket;
}
+ @Nullable
+ public String partitionName() {
+ return partitionName;
+ }
+
@Nullable
public WriteResult writeResult() {
return writeResult;
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java
index 6af8ab53b..486b361af 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java
@@ -65,6 +65,7 @@ public class TableBucketWriteResultSerializer<WriteResult>
if (tableBucket.getPartitionId() != null) {
out.writeBoolean(true);
out.writeLong(tableBucket.getPartitionId());
+ out.writeUTF(tableBucketWriteResult.partitionName());
} else {
out.writeBoolean(false);
}
@@ -107,8 +108,10 @@ public class TableBucketWriteResultSerializer<WriteResult>
// deserialize bucket
long tableId = in.readLong();
Long partitionId = null;
+ String partitionName = null;
if (in.readBoolean()) {
partitionId = in.readLong();
+ partitionName = in.readUTF();
}
int bucketId = in.readInt();
TableBucket tableBucket = new TableBucket(tableId, partitionId,
bucketId);
@@ -129,6 +132,11 @@ public class TableBucketWriteResultSerializer<WriteResult>
// deserialize number of write results
int numberOfWriteResults = in.readInt();
return new TableBucketWriteResult<>(
- tablePath, tableBucket, writeResult, logEndOffset,
numberOfWriteResults);
+ tablePath,
+ tableBucket,
+ partitionName,
+ writeResult,
+ logEndOffset,
+ numberOfWriteResults);
}
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringSplitReader.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringSplitReader.java
index 80cbe664a..e3b0107a4 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringSplitReader.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringSplitReader.java
@@ -279,9 +279,13 @@ public class TieringSplitReader<WriteResult>
// todo: should unsubscribe the log split if unsubscribe
bucket for
// un-partitioned table is supported
}
+ TieringSplit currentTieringSplit =
currentTableSplitsByBucket.remove(bucket);
+ String currentSplitId = currentTieringSplit.splitId();
// put write result of the bucket
- writeResults.put(bucket, completeLakeWriter(bucket,
stoppingOffset));
- String currentSplitId =
currentTableSplitsByBucket.remove(bucket).splitId();
+ writeResults.put(
+ bucket,
+ completeLakeWriter(
+ bucket,
currentTieringSplit.getPartitionName(), stoppingOffset));
// put split of the bucket
finishedSplitIds.put(bucket, currentSplitId);
LOG.info("Split {} has been finished.", currentSplitId);
@@ -312,13 +316,15 @@ public class TieringSplitReader<WriteResult>
}
private TableBucketWriteResult<WriteResult> completeLakeWriter(
- TableBucket bucket, long logEndOffset) throws IOException {
+ TableBucket bucket, @Nullable String partitionName, long
logEndOffset)
+ throws IOException {
LakeWriter<WriteResult> lakeWriter = lakeWriters.remove(bucket);
WriteResult writeResult = lakeWriter.complete();
lakeWriter.close();
return toTableBucketWriteResult(
currentTablePath,
bucket,
+ partitionName,
writeResult,
logEndOffset,
checkNotNull(currentTableNumberOfSplits));
@@ -335,6 +341,7 @@ public class TieringSplitReader<WriteResult>
toTableBucketWriteResult(
logSplit.getTablePath(),
tableBucket,
+ logSplit.getPartitionName(),
null,
logSplit.getStoppingOffset(),
logSplit.getNumberOfSplits()));
@@ -355,7 +362,8 @@ public class TieringSplitReader<WriteResult>
long logEndOffset = currentSnapshotSplit.getLogOffsetOfSnapshot();
String splitId =
currentTableSplitsByBucket.remove(tableBucket).splitId();
TableBucketWriteResult<WriteResult> writeResult =
- completeLakeWriter(tableBucket, logEndOffset);
+ completeLakeWriter(
+ tableBucket, currentSnapshotSplit.getPartitionName(),
logEndOffset);
closeCurrentSnapshotSplit();
mayFinishCurrentTable();
return new TableBucketWriteResultWithSplitIds(
@@ -472,11 +480,12 @@ public class TieringSplitReader<WriteResult>
private TableBucketWriteResult<WriteResult> toTableBucketWriteResult(
TablePath tablePath,
TableBucket tableBucket,
+ @Nullable String partitionName,
@Nullable WriteResult writeResult,
long endLogOffset,
int numberOfSplits) {
return new TableBucketWriteResult<>(
- tablePath, tableBucket, writeResult, endLogOffset,
numberOfSplits);
+ tablePath, tableBucket, partitionName, writeResult,
endLogOffset, numberOfSplits);
}
private class TableBucketWriteResultWithSplitIds
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
index 26b3580dd..c73cd4999 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
@@ -20,7 +20,7 @@ package com.alibaba.fluss.flink.tiering.committer;
import com.alibaba.fluss.client.metadata.LakeSnapshot;
import com.alibaba.fluss.flink.utils.FlinkTestBase;
import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
-import com.alibaba.fluss.metadata.PartitionInfo;
+import com.alibaba.fluss.metadata.ResolvedPartitionSpec;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TablePath;
@@ -34,7 +34,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import static
com.alibaba.fluss.record.TestData.DATA1_PARTITIONED_TABLE_DESCRIPTOR;
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
@@ -71,14 +70,15 @@ class FlussTableLakeSnapshotCommitterTest extends
FlinkTestBase {
? DATA1_PARTITIONED_TABLE_DESCRIPTOR
: DATA1_TABLE_DESCRIPTOR);
- List<Long> partitions;
- Map<String, Long> partitionNameAndIds = Collections.emptyMap();
+ List<String> partitions;
+ Map<String, Long> partitionNameAndIds = new HashMap<>();
+ Map<Long, String> expectedPartitionNameById = new HashMap<>();
if (!isPartitioned) {
FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
partitions = Collections.singletonList(null);
} else {
partitionNameAndIds =
FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath);
- partitions = new ArrayList<>(partitionNameAndIds.values());
+ partitions = new ArrayList<>(partitionNameAndIds.keySet());
}
CommittedLakeSnapshot committedLakeSnapshot = new
CommittedLakeSnapshot(3);
@@ -86,34 +86,37 @@ class FlussTableLakeSnapshotCommitterTest extends
FlinkTestBase {
Map<TableBucket, Long> expectedOffsets = new HashMap<>();
for (int bucket = 0; bucket < 3; bucket++) {
long bucketOffset = bucket * bucket;
- for (Long partition : partitions) {
- if (partition == null) {
+ for (String partitionName : partitions) {
+ if (partitionName == null) {
committedLakeSnapshot.addBucket(bucket, bucketOffset);
expectedOffsets.put(new TableBucket(tableId, bucket),
bucketOffset);
} else {
- committedLakeSnapshot.addPartitionBucket(partition,
bucket, bucketOffset);
- expectedOffsets.put(new TableBucket(tableId, partition,
bucket), bucketOffset);
+ long partitionId = partitionNameAndIds.get(partitionName);
+ committedLakeSnapshot.addPartitionBucket(
+ partitionId,
+ ResolvedPartitionSpec.fromPartitionName(
+ Collections.singletonList("a"),
partitionName)
+ .getPartitionQualifiedName(),
+ bucket,
+ bucketOffset);
+ expectedOffsets.put(
+ new TableBucket(tableId, partitionId, bucket),
bucketOffset);
+ expectedPartitionNameById.put(partitionId, partitionName);
}
}
}
- Map<String, Long> partitionIdByName = null;
- if (isPartitioned) {
- partitionIdByName =
- admin.listPartitionInfos(tablePath).get().stream()
- .collect(
- Collectors.toMap(
- PartitionInfo::getPartitionName,
- PartitionInfo::getPartitionId));
- }
-
// commit offsets
- flussTableLakeSnapshotCommitter.commit(tableId, partitionIdByName,
committedLakeSnapshot);
+ flussTableLakeSnapshotCommitter.commit(tableId, committedLakeSnapshot);
LakeSnapshot lakeSnapshot =
admin.getLatestLakeSnapshot(tablePath).get();
assertThat(lakeSnapshot.getSnapshotId()).isEqualTo(3);
// get and check the offsets
Map<TableBucket, Long> bucketLogOffsets =
lakeSnapshot.getTableBucketsOffset();
assertThat(bucketLogOffsets).isEqualTo(expectedOffsets);
+
+ // check partition name
+ Map<Long, String> partitionNameById =
lakeSnapshot.getPartitionNameById();
+ assertThat(partitionNameById).isEqualTo(expectedPartitionNameById);
}
}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
index e5e47a8ff..53721c4e1 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
@@ -26,6 +26,7 @@ import
com.alibaba.fluss.flink.tiering.event.FinishedTieringEvent;
import com.alibaba.fluss.flink.tiering.source.TableBucketWriteResult;
import com.alibaba.fluss.flink.utils.FlinkTestBase;
import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
+import com.alibaba.fluss.metadata.ResolvedPartitionSpec;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.utils.types.Tuple2;
@@ -180,7 +181,12 @@ class TieringCommitOperatorTest extends FlinkTestBase {
long currentOffset = offset++;
committerOperator.processElement(
createTableBucketWriteResultStreamRecord(
- tablePath, tableBucket, 1, currentOffset,
numberOfWriteResults));
+ tablePath,
+ tableBucket,
+ partitionIdAndNameEntry.getKey(),
+ 1,
+ currentOffset,
+ numberOfWriteResults));
expectedLogEndOffsets.put(tableBucket, currentOffset);
}
if (bucket == 2) {
@@ -257,9 +263,10 @@ class TieringCommitOperatorTest extends FlinkTestBase {
tablePath,
tableId,
2,
- getExpectedLogEndOffsets(tableId, mockCommittedSnapshot,
Collections.emptyMap()),
+ getExpectedLogEndOffsets(tableId, mockCommittedSnapshot),
+ mockCommittedSnapshot.getQualifiedPartitionNameById(),
String.format(
- "The current Fluss's lake snapshot %d is less than
lake actual snapshot %d committed by Fluss for table: {tablePath=%s,
tableId=%d},"
+ "The current Fluss's lake snapshot %s is less than
lake actual snapshot %d committed by Fluss for table: {tablePath=%s,
tableId=%d},"
+ " missing snapshot: %s.",
null,
mockCommittedSnapshot.getLakeSnapshotId(),
@@ -309,7 +316,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
TableBucket tableBucket = new TableBucket(tableId,
partitionId, bucket);
committerOperator.processElement(
createTableBucketWriteResultStreamRecord(
- tablePath, tableBucket, 3, 3,
numberOfWriteResults));
+ tablePath, tableBucket, partitionName, 3, 3,
numberOfWriteResults));
}
}
@@ -317,9 +324,10 @@ class TieringCommitOperatorTest extends FlinkTestBase {
tablePath,
tableId,
3,
- getExpectedLogEndOffsets(tableId, mockCommittedSnapshot,
Collections.emptyMap()),
+ getExpectedLogEndOffsets(tableId, mockCommittedSnapshot),
+ mockCommittedSnapshot.getQualifiedPartitionNameById(),
String.format(
- "The current Fluss's lake snapshot %d is less than
lake actual snapshot %d committed by Fluss for table: {tablePath=%s,
tableId=%d}, missing snapshot: %s.",
+ "The current Fluss's lake snapshot %s is less than
lake actual snapshot %d committed by Fluss for table: {tablePath=%s,
tableId=%d}, missing snapshot: %s.",
null,
mockCommittedSnapshot.getLakeSnapshotId(),
tablePath,
@@ -334,7 +342,13 @@ class TieringCommitOperatorTest extends FlinkTestBase {
if (partition == null) {
mockCommittedSnapshot.addBucket(bucket, bucket + 1);
} else {
- mockCommittedSnapshot.addPartitionBucket(partition,
bucket, bucket + 1);
+ mockCommittedSnapshot.addPartitionBucket(
+ partition,
+ ResolvedPartitionSpec.fromPartitionValue(
+ "partition_key", "partition-" +
partition)
+ .getPartitionQualifiedName(),
+ bucket,
+ bucket + 1);
}
}
}
@@ -342,9 +356,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
}
private Map<TableBucket, Long> getExpectedLogEndOffsets(
- long tableId,
- CommittedLakeSnapshot committedLakeSnapshot,
- Map<String, Long> partitionIdByName) {
+ long tableId, CommittedLakeSnapshot committedLakeSnapshot) {
Map<TableBucket, Long> expectedLogEndOffsets = new HashMap<>();
for (Map.Entry<Tuple2<Long, Integer>, Long> entry :
committedLakeSnapshot.getLogEndOffsets().entrySet()) {
@@ -368,10 +380,23 @@ class TieringCommitOperatorTest extends FlinkTestBase {
@Nullable Integer writeResult,
long logEndOffset,
int numberOfWriteResults) {
+ return createTableBucketWriteResultStreamRecord(
+ tablePath, tableBucket, null, writeResult, logEndOffset,
numberOfWriteResults);
+ }
+
+ private StreamRecord<TableBucketWriteResult<TestingWriteResult>>
+ createTableBucketWriteResultStreamRecord(
+ TablePath tablePath,
+ TableBucket tableBucket,
+ @Nullable String partitionName,
+ @Nullable Integer writeResult,
+ long logEndOffset,
+ int numberOfWriteResults) {
TableBucketWriteResult<TestingWriteResult> tableBucketWriteResult =
new TableBucketWriteResult<>(
tablePath,
tableBucket,
+ partitionName,
writeResult == null ? null : new
TestingWriteResult(writeResult),
logEndOffset,
numberOfWriteResults);
@@ -408,11 +433,13 @@ class TieringCommitOperatorTest extends FlinkTestBase {
long tableId,
long expectedSnapshotId,
Map<TableBucket, Long> expectedLogEndOffsets,
+ Map<Long, String> expectedPartitionIdByName,
String failedReason)
throws Exception {
LakeSnapshot lakeSnapshot =
admin.getLatestLakeSnapshot(tablePath).get();
assertThat(lakeSnapshot.getSnapshotId()).isEqualTo(expectedSnapshotId);
assertThat(lakeSnapshot.getTableBucketsOffset()).isEqualTo(expectedLogEndOffsets);
+
assertThat(lakeSnapshot.getPartitionNameById()).isEqualTo(expectedPartitionIdByName);
// check the tableId has been send to mark failed
List<OperatorEvent> operatorEvents =
mockOperatorEventGateway.getEventsSent();
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java
index d36ad40d3..d7ef1dd0c 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java
@@ -40,9 +40,11 @@ class TableBucketWriteResultSerializerTest {
TestingWriteResult testingWriteResult = new TestingWriteResult(2);
TablePath tablePath = TablePath.of("db1", "tb1");
TableBucket tableBucket =
- isPartitioned ? new TableBucket(1, 2) : new TableBucket(1,
1000L, 2);
+ isPartitioned ? new TableBucket(1, 1000L, 2) : new
TableBucket(1, 2);
+ String partitionName = isPartitioned ? "partition1" : null;
TableBucketWriteResult<TestingWriteResult> tableBucketWriteResult =
- new TableBucketWriteResult<>(tablePath, tableBucket,
testingWriteResult, 10, 20);
+ new TableBucketWriteResult<>(
+ tablePath, tableBucket, partitionName,
testingWriteResult, 10, 20);
// test serialize and deserialize
byte[] serialized =
tableBucketWriteResultSerializer.serialize(tableBucketWriteResult);
@@ -52,6 +54,7 @@ class TableBucketWriteResultSerializerTest {
assertThat(deserialized.tablePath()).isEqualTo(tablePath);
assertThat(deserialized.tableBucket()).isEqualTo(tableBucket);
+ assertThat(deserialized.partitionName()).isEqualTo(partitionName);
TestingWriteResult deserializedWriteResult =
deserialized.writeResult();
assertThat(deserializedWriteResult).isNotNull();
assertThat(deserializedWriteResult.getWriteResult())
@@ -59,13 +62,15 @@ class TableBucketWriteResultSerializerTest {
assertThat(deserialized.numberOfWriteResults()).isEqualTo(20);
// verify when writeResult is null
- tableBucketWriteResult = new TableBucketWriteResult<>(tablePath,
tableBucket, null, 20, 30);
+ tableBucketWriteResult =
+ new TableBucketWriteResult<>(tablePath, tableBucket,
partitionName, null, 20, 30);
serialized =
tableBucketWriteResultSerializer.serialize(tableBucketWriteResult);
deserialized =
tableBucketWriteResultSerializer.deserialize(
tableBucketWriteResultSerializer.getVersion(),
serialized);
assertThat(deserialized.tablePath()).isEqualTo(tablePath);
assertThat(deserialized.tableBucket()).isEqualTo(tableBucket);
+ assertThat(deserialized.partitionName()).isEqualTo(partitionName);
assertThat(deserialized.writeResult()).isNull();
assertThat(deserialized.numberOfWriteResults()).isEqualTo(30);
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
index 8e16e0fc4..08bcae786 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
@@ -154,6 +154,7 @@ public class PaimonLakeCommitter implements
LakeCommitter<PaimonWriteResult, Pai
if (bucketOffset.getPartitionId() != null) {
committedLakeSnapshot.addPartitionBucket(
bucketOffset.getPartitionId(),
+ bucketOffset.getPartitionQualifiedName(),
bucketOffset.getBucket(),
bucketOffset.getLogOffset());
} else {
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java
index 576d47229..0bccd2fb7 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java
@@ -165,8 +165,8 @@ class PaimonTieringITCase extends
FlinkPaimonTieringTestBase {
put(
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
"["
- +
"{\"partition_id\":0,\"bucket_id\":0,\"log_offset\":3},"
- +
"{\"partition_id\":1,\"bucket_id\":0,\"log_offset\":3}"
+ +
"{\"partition_id\":0,\"bucket_id\":0,\"partition_name\":\"date=2025\",\"log_offset\":3},"
+ +
"{\"partition_id\":1,\"bucket_id\":0,\"partition_name\":\"date=2026\",\"log_offset\":3}"
+ "]");
}
};
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
index 16984f7fe..93fd23a2f 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
@@ -62,11 +62,13 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import static
com.alibaba.fluss.flink.tiering.committer.TieringCommitOperator.toBucketOffsetsProperty;
+import static
com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
@@ -180,7 +182,11 @@ class PaimonTieringTest {
committableSerializer.getVersion(), serialized);
long snapshot =
lakeCommitter.commit(
- paimonCommittable,
toBucketOffsetsProperty(tableBucketOffsets));
+ paimonCommittable,
+ toBucketOffsetsProperty(
+ tableBucketOffsets,
+ partitionIdAndName,
+ getPartitionKeys(tablePath)));
assertThat(snapshot).isEqualTo(1);
}
@@ -271,7 +277,12 @@ class PaimonTieringTest {
createLakeCommitter(tablePath)) {
PaimonCommittable committable =
lakeCommitter.toCommittable(paimonWriteResults);
long snapshot =
- lakeCommitter.commit(committable,
toBucketOffsetsProperty(tableBucketOffsets));
+ lakeCommitter.commit(
+ committable,
+ toBucketOffsetsProperty(
+ tableBucketOffsets,
+ partitionIdAndName,
+ getPartitionKeys(tablePath)));
assertThat(snapshot).isEqualTo(1);
}
@@ -296,7 +307,7 @@ class PaimonTieringTest {
// Test data for different three-level partitions using $ separator
Map<Long, String> partitionIdAndName =
- new HashMap<Long, String>() {
+ new LinkedHashMap<Long, String>() {
{
put(1L, "us-east$2024$01");
put(2L, "eu-central$2023$12");
@@ -324,14 +335,27 @@ class PaimonTieringTest {
}
// Commit all data
+ long snapshot;
try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter
=
createLakeCommitter(tablePath)) {
PaimonCommittable committable =
lakeCommitter.toCommittable(paimonWriteResults);
- long snapshot =
- lakeCommitter.commit(committable,
toBucketOffsetsProperty(tableBucketOffsets));
+ snapshot =
+ lakeCommitter.commit(
+ committable,
+ toBucketOffsetsProperty(
+ tableBucketOffsets,
+ partitionIdAndName,
+ getPartitionKeys(tablePath)));
assertThat(snapshot).isEqualTo(1);
}
+ // verify the Fluss bucket offsets stored in the Paimon snapshot property
+ String offsetProperty = getSnapshotLogOffsetProperty(tablePath,
snapshot);
+ assertThat(offsetProperty)
+ .isEqualTo(
+
"[{\"partition_id\":1,\"bucket_id\":0,\"partition_name\":\"region=us-east/year=2024/month=01\",\"log_offset\":2},"
+ +
"{\"partition_id\":2,\"bucket_id\":0,\"partition_name\":\"region=eu-central/year=2023/month=12\",\"log_offset\":2}]");
+
// Verify data for each partition
for (String partition : partitionIdAndName.values()) {
List<LogRecord> expectRecords = recordsByPartition.get(partition);
@@ -770,4 +794,21 @@ class PaimonTieringTest {
paimonCatalog.createDatabase(tablePath.getDatabaseName(), true);
paimonCatalog.createTable(toPaimon(tablePath),
paimonSchemaBuilder.build(), true);
}
+
+ private String getSnapshotLogOffsetProperty(TablePath tablePath, long
snapshotId)
+ throws Exception {
+ Identifier identifier = toPaimon(tablePath);
+ FileStoreTable fileStoreTable = (FileStoreTable)
paimonCatalog.getTable(identifier);
+ return fileStoreTable
+ .snapshotManager()
+ .snapshot(snapshotId)
+ .properties()
+ .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
+ }
+
+ private List<String> getPartitionKeys(TablePath tablePath) throws
Exception {
+ Identifier identifier = toPaimon(tablePath);
+ FileStoreTable fileStoreTable = (FileStoreTable)
paimonCatalog.getTable(identifier);
+ return fileStoreTable.partitionKeys();
+ }
}
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto
b/fluss-rpc/src/main/proto/FlussApi.proto
index 246dbe89e..bf2f9aaa6 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -448,6 +448,7 @@ message PbLakeTableOffsetForBucket {
required int32 bucket_id = 2;
optional int64 log_start_offset = 3;
optional int64 log_end_offset = 4;
+ optional string partition_name = 5;
}
message CommitLakeTableSnapshotResponse {
@@ -773,6 +774,7 @@ message PbLakeSnapshotForBucket {
optional int64 partition_id = 1;
required int32 bucket_id = 2;
optional int64 log_offset = 3;
+ optional string partition_name = 4;
}
message PbRemotePathAndLocalFile {
diff --git
a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java
b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java
index 034163493..a1363e6b2 100644
---
a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java
+++
b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java
@@ -1427,6 +1427,7 @@ public class ServerRpcMessageUtils {
long snapshotId = pdLakeTableSnapshotInfo.getSnapshotId();
Map<TableBucket, Long> bucketLogStartOffset = new HashMap<>();
Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+ Map<Long, String> partitionNameByPartitionId = new HashMap<>();
for (PbLakeTableOffsetForBucket lakeTableOffsetForBucket :
pdLakeTableSnapshotInfo.getBucketsReqsList()) {
@@ -1447,11 +1448,20 @@ public class ServerRpcMessageUtils {
: null;
bucketLogStartOffset.put(tableBucket, logStartOffset);
bucketLogEndOffset.put(tableBucket, logEndOffset);
+
+ if (lakeTableOffsetForBucket.hasPartitionName()) {
+ partitionNameByPartitionId.put(
+ partitionId,
lakeTableOffsetForBucket.getPartitionName());
+ }
}
lakeTableInfoByTableId.put(
tableId,
new LakeTableSnapshot(
- snapshotId, tableId, bucketLogStartOffset,
bucketLogEndOffset));
+ snapshotId,
+ tableId,
+ bucketLogStartOffset,
+ bucketLogEndOffset,
+ partitionNameByPartitionId));
}
return new CommitLakeTableSnapshotData(lakeTableInfoByTableId);
}
@@ -1517,6 +1527,13 @@ public class ServerRpcMessageUtils {
.setLogOffset(logEndLogOffsetEntry.getValue());
if (tableBucket.getPartitionId() != null) {
pbLakeSnapshotForBucket.setPartitionId(tableBucket.getPartitionId());
+ String partitionName =
+ lakeTableSnapshot
+ .getPartitionNameIdByPartitionId()
+ .get(tableBucket.getPartitionId());
+ if (partitionName != null) {
+ pbLakeSnapshotForBucket.setPartitionName(partitionName);
+ }
}
}
diff --git
a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java
b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java
index 2d0500f15..f1899ffec 100644
---
a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java
+++
b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java
@@ -789,12 +789,17 @@ public class ZooKeeperClient implements AutoCloseable {
new HashMap<>(previous.getBucketLogEndOffset());
bucketLogEndOffset.putAll(lakeTableSnapshot.getBucketLogEndOffset());
+ Map<Long, String> partitionNameById =
+ new HashMap<>(previous.getPartitionNameIdByPartitionId());
+
partitionNameById.putAll(lakeTableSnapshot.getPartitionNameIdByPartitionId());
+
lakeTableSnapshot =
new LakeTableSnapshot(
lakeTableSnapshot.getSnapshotId(),
lakeTableSnapshot.getTableId(),
bucketLogStartOffset,
- bucketLogEndOffset);
+ bucketLogEndOffset,
+ partitionNameById);
zkClient.setData().forPath(path,
LakeTableZNode.encode(lakeTableSnapshot));
} else {
zkClient.create()
diff --git
a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/LakeTableSnapshot.java
b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/LakeTableSnapshot.java
index 8bf9bec94..96dfa3eb1 100644
---
a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/LakeTableSnapshot.java
+++
b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/LakeTableSnapshot.java
@@ -37,15 +37,21 @@ public class LakeTableSnapshot {
private final Map<TableBucket, Long> bucketLogStartOffset;
private final Map<TableBucket, Long> bucketLogEndOffset;
+ // mapping from partition id to partition name; will be empty if the
table is not partitioned
+ private final Map<Long, String> partitionNameIdByPartitionId;
+
public LakeTableSnapshot(
long snapshotId,
long tableId,
Map<TableBucket, Long> bucketLogStartOffset,
- Map<TableBucket, Long> bucketLogEndOffset) {
+ Map<TableBucket, Long> bucketLogEndOffset,
+ Map<Long, String> partitionNameIdByPartitionId) {
this.snapshotId = snapshotId;
this.tableId = tableId;
this.bucketLogStartOffset = bucketLogStartOffset;
this.bucketLogEndOffset = bucketLogEndOffset;
+ this.partitionNameIdByPartitionId = partitionNameIdByPartitionId;
}
public long getSnapshotId() {
@@ -80,24 +86,31 @@ public class LakeTableSnapshot {
return bucketLogStartOffset;
}
+ public Map<Long, String> getPartitionNameIdByPartitionId() {
+ return partitionNameIdByPartitionId;
+ }
+
@Override
public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof LakeTableSnapshot)) {
+ if (o == null || getClass() != o.getClass()) {
return false;
}
LakeTableSnapshot that = (LakeTableSnapshot) o;
return snapshotId == that.snapshotId
&& tableId == that.tableId
&& Objects.equals(bucketLogStartOffset,
that.bucketLogStartOffset)
- && Objects.equals(bucketLogEndOffset, that.bucketLogEndOffset);
+ && Objects.equals(bucketLogEndOffset, that.bucketLogEndOffset)
+ && Objects.equals(partitionNameIdByPartitionId,
that.partitionNameIdByPartitionId);
}
@Override
public int hashCode() {
- return Objects.hash(snapshotId, tableId, bucketLogStartOffset,
bucketLogEndOffset);
+ return Objects.hash(
+ snapshotId,
+ tableId,
+ bucketLogStartOffset,
+ bucketLogEndOffset,
+ partitionNameIdByPartitionId);
}
@Override
@@ -111,6 +124,8 @@ public class LakeTableSnapshot {
+ bucketLogStartOffset
+ ", bucketLogEndOffset="
+ bucketLogEndOffset
+ + ", partitionNameIdByPartitionId="
+ + partitionNameIdByPartitionId
+ '}';
}
}
diff --git
a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/LakeTableSnapshotJsonSerde.java
b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/LakeTableSnapshotJsonSerde.java
index a4f539149..6a1df0bea 100644
---
a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/LakeTableSnapshotJsonSerde.java
+++
b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/LakeTableSnapshotJsonSerde.java
@@ -43,6 +43,7 @@ public class LakeTableSnapshotJsonSerde
private static final String BUCKET_ID = "bucket_id";
private static final String LOG_START_OFFSET = "log_start_offset";
private static final String LOG_END_OFFSET = "log_end_offset";
+ private static final String PARTITION_NAME = "partition_name";
private static final int VERSION = 1;
@@ -55,11 +56,19 @@ public class LakeTableSnapshotJsonSerde
generator.writeNumberField(TABLE_ID, lakeTableSnapshot.getTableId());
generator.writeArrayFieldStart(BUCKETS);
- for (TableBucket tableBucket :
lakeTableSnapshot.getBucketLogStartOffset().keySet()) {
+ for (TableBucket tableBucket :
lakeTableSnapshot.getBucketLogEndOffset().keySet()) {
generator.writeStartObject();
if (tableBucket.getPartitionId() != null) {
generator.writeNumberField(PARTITION_ID,
tableBucket.getPartitionId());
+ // write the partition name for this partition id, if known
+ String partitionName =
+ lakeTableSnapshot
+ .getPartitionNameIdByPartitionId()
+ .get(tableBucket.getPartitionId());
+ if (partitionName != null) {
+ generator.writeStringField(PARTITION_NAME, partitionName);
+ }
}
generator.writeNumberField(BUCKET_ID, tableBucket.getBucket());
@@ -91,6 +100,7 @@ public class LakeTableSnapshotJsonSerde
Iterator<JsonNode> buckets = node.get(BUCKETS).elements();
Map<TableBucket, Long> bucketLogStartOffset = new HashMap<>();
Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+ Map<Long, String> partitionNameIdByPartitionId = new HashMap<>();
while (buckets.hasNext()) {
JsonNode bucket = buckets.next();
TableBucket tableBucket;
@@ -109,7 +119,17 @@ public class LakeTableSnapshotJsonSerde
} else {
bucketLogEndOffset.put(tableBucket, null);
}
+
+ if (partitionId != null && bucket.get(PARTITION_NAME) != null) {
+ partitionNameIdByPartitionId.put(
+ tableBucket.getPartitionId(),
bucket.get(PARTITION_NAME).asText());
+ }
}
- return new LakeTableSnapshot(snapshotId, tableId,
bucketLogStartOffset, bucketLogEndOffset);
+ return new LakeTableSnapshot(
+ snapshotId,
+ tableId,
+ bucketLogStartOffset,
+ bucketLogEndOffset,
+ partitionNameIdByPartitionId);
}
}
diff --git
a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/CommitLakeTableSnapshotITCase.java
b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/CommitLakeTableSnapshotITCase.java
index 77bd44bae..3617f56cc 100644
---
a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/CommitLakeTableSnapshotITCase.java
+++
b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/CommitLakeTableSnapshotITCase.java
@@ -38,6 +38,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import java.time.Duration;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -131,7 +132,11 @@ class CommitLakeTableSnapshotITCase {
LakeTableSnapshot expectedDataLakeTieredInfo =
new LakeTableSnapshot(
- snapshotId, tableId, bucketsLogStartOffset,
bucketsLogEndOffset);
+ snapshotId,
+ tableId,
+ bucketsLogStartOffset,
+ bucketsLogEndOffset,
+ Collections.emptyMap());
checkLakeTableDataInZk(tableId, expectedDataLakeTieredInfo);
}
diff --git
a/fluss-server/src/test/java/com/alibaba/fluss/server/zk/data/LakeTableSnapshotJsonSerdeTest.java
b/fluss-server/src/test/java/com/alibaba/fluss/server/zk/data/LakeTableSnapshotJsonSerdeTest.java
index 82779f700..f5c15cfa1 100644
---
a/fluss-server/src/test/java/com/alibaba/fluss/server/zk/data/LakeTableSnapshotJsonSerdeTest.java
+++
b/fluss-server/src/test/java/com/alibaba/fluss/server/zk/data/LakeTableSnapshotJsonSerdeTest.java
@@ -34,7 +34,12 @@ class LakeTableSnapshotJsonSerdeTest extends
JsonSerdeTestBase<LakeTableSnapshot
@Override
protected LakeTableSnapshot[] createObjects() {
LakeTableSnapshot lakeTableSnapshot1 =
- new LakeTableSnapshot(1, 1L, Collections.emptyMap(),
Collections.emptyMap());
+ new LakeTableSnapshot(
+ 1,
+ 1L,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap());
long tableId = 4;
Map<TableBucket, Long> bucketLogStartOffset = new HashMap<>();
@@ -45,10 +50,18 @@ class LakeTableSnapshotJsonSerdeTest extends
JsonSerdeTestBase<LakeTableSnapshot
bucketLogEndOffset.put(new TableBucket(tableId, 2), 4L);
LakeTableSnapshot lakeTableSnapshot2 =
- new LakeTableSnapshot(2, tableId, bucketLogStartOffset,
bucketLogEndOffset);
+ new LakeTableSnapshot(
+ 2,
+ tableId,
+ bucketLogStartOffset,
+ bucketLogEndOffset,
+ Collections.emptyMap());
tableId = 5;
bucketLogStartOffset = new HashMap<>();
+ Map<Long, String> partitionNameIdByPartitionId = new HashMap<>();
+ partitionNameIdByPartitionId.put(1L, "partition1");
+ partitionNameIdByPartitionId.put(2L, "partition2");
bucketLogStartOffset.put(new TableBucket(tableId, 1L, 1), 1L);
bucketLogStartOffset.put(new TableBucket(tableId, 2L, 1), 2L);
@@ -57,7 +70,12 @@ class LakeTableSnapshotJsonSerdeTest extends
JsonSerdeTestBase<LakeTableSnapshot
bucketLogEndOffset.put(new TableBucket(tableId, 2L, 1), 4L);
LakeTableSnapshot lakeTableSnapshot3 =
- new LakeTableSnapshot(3, tableId, bucketLogStartOffset,
bucketLogEndOffset);
+ new LakeTableSnapshot(
+ 3,
+ tableId,
+ bucketLogStartOffset,
+ bucketLogEndOffset,
+ partitionNameIdByPartitionId);
return new LakeTableSnapshot[] {
lakeTableSnapshot1, lakeTableSnapshot2, lakeTableSnapshot3,
@@ -72,8 +90,8 @@ class LakeTableSnapshotJsonSerdeTest extends
JsonSerdeTestBase<LakeTableSnapshot
+
"\"buckets\":[{\"bucket_id\":2,\"log_start_offset\":2,\"log_end_offset\":4},"
+
"{\"bucket_id\":1,\"log_start_offset\":1,\"log_end_offset\":3}]}",
"{\"version\":1,\"snapshot_id\":3,\"table_id\":5,"
- +
"\"buckets\":[{\"partition_id\":1,\"bucket_id\":1,\"log_start_offset\":1,\"log_end_offset\":3},"
- +
"{\"partition_id\":2,\"bucket_id\":1,\"log_start_offset\":2,\"log_end_offset\":4}]}"
+ +
"\"buckets\":[{\"partition_id\":1,\"partition_name\":\"partition1\",\"bucket_id\":1,\"log_start_offset\":1,\"log_end_offset\":3},"
+ +
"{\"partition_id\":2,\"partition_name\":\"partition2\",\"bucket_id\":1,\"log_start_offset\":2,\"log_end_offset\":4}]}"
};
}
}