xx789633 commented on code in PR #1485:
URL: https://github.com/apache/fluss/pull/1485#discussion_r2259416300


##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java:
##########
@@ -203,32 +203,87 @@ private Committable commitWriteResults(
                     flussCurrentLakeSnapshot == null
                             ? null
                             : flussCurrentLakeSnapshot.getSnapshotId());
-            long committedSnapshotId =
-                    lakeCommitter.commit(committable, 
toBucketOffsetsProperty(logOffsets));
+            long committedSnapshotId = lakeCommitter.commit(committable, 
logOffsetsProperty);
             // commit to fluss
-            Map<TableBucket, Long> logEndOffsets = new HashMap<>();
+            FlussTableLakeSnapshot flussTableLakeSnapshot =
+                    new FlussTableLakeSnapshot(tableId, committedSnapshotId);
             for (TableBucketWriteResult<WriteResult> writeResult : 
committableWriteResults) {
-                logEndOffsets.put(writeResult.tableBucket(), 
writeResult.logEndOffset());
+                TableBucket tableBucket = writeResult.tableBucket();
+                if (writeResult.tableBucket().getPartitionId() == null) {
+                    flussTableLakeSnapshot.addBucketOffset(tableBucket, 
writeResult.logEndOffset());
+                } else {
+                    flussTableLakeSnapshot.addPartitionBucketOffset(
+                            tableBucket, writeResult.partitionName(), 
writeResult.logEndOffset());
+                }
             }
-            flussTableLakeSnapshotCommitter.commit(
-                    new FlussTableLakeSnapshot(tableId, committedSnapshotId, 
logEndOffsets));
+            flussTableLakeSnapshotCommitter.commit(flussTableLakeSnapshot);
             return committable;
         }
     }
 
+    /**
+     * Merge the log offsets of latest snapshot with current written bucket 
offsets to get full log
+     * offsets.
+     */
+    private Map<String, String> toBucketOffsetsProperty(
+            TablePath tablePath,
+            @Nullable LakeSnapshot latestLakeSnapshot,
+            List<TableBucketWriteResult<WriteResult>> currentWriteResults)
+            throws Exception {
+        // first of all, we need to merge latest lake snapshot with current 
write results
+        Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
+        Map<Long, String> partitionNameById = new HashMap<>();
+        if (latestLakeSnapshot != null) {
+            tableBucketOffsets = new 
HashMap<>(latestLakeSnapshot.getTableBucketsOffset());
+            partitionNameById = new 
HashMap<>(latestLakeSnapshot.getPartitionNameById());
+        }
+
+        for (TableBucketWriteResult<WriteResult> tableBucketWriteResult : 
currentWriteResults) {
+            tableBucketOffsets.put(
+                    tableBucketWriteResult.tableBucket(), 
tableBucketWriteResult.logEndOffset());
+            if (tableBucketWriteResult.tableBucket().getPartitionId() != null
+                    && tableBucketWriteResult.partitionName() != null) {
+                partitionNameById.put(
+                        tableBucketWriteResult.tableBucket().getPartitionId(),
+                        tableBucketWriteResult.partitionName());
+            }
+        }
+
+        List<String> partitionKeys = new ArrayList<>();
+        if (!partitionNameById.isEmpty()) {
+            partitionKeys = 
admin.getTableInfo(tablePath).get().getPartitionKeys();

Review Comment:
   In `checkFlussNotMissingLakeSnapshot`, we also call 
`admin.getTableInfo(tablePath)`. Can we combine these two RPC calls into one to reduce cost?



##########
fluss-common/src/main/java/com/alibaba/fluss/metadata/ResolvedPartitionSpec.java:
##########
@@ -125,6 +125,25 @@ public String getPartitionQualifiedName() {
         return sb.toString();
     }
 
+    public static ResolvedPartitionSpec fromPartitionQualifiedName(String 
qualifiedPartitionName) {

Review Comment:
   Can this function handle a partition name string that contains the special 
character "/"?



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java:
##########
@@ -203,32 +203,87 @@ private Committable commitWriteResults(
                     flussCurrentLakeSnapshot == null
                             ? null
                             : flussCurrentLakeSnapshot.getSnapshotId());
-            long committedSnapshotId =
-                    lakeCommitter.commit(committable, 
toBucketOffsetsProperty(logOffsets));
+            long committedSnapshotId = lakeCommitter.commit(committable, 
logOffsetsProperty);
             // commit to fluss
-            Map<TableBucket, Long> logEndOffsets = new HashMap<>();
+            FlussTableLakeSnapshot flussTableLakeSnapshot =
+                    new FlussTableLakeSnapshot(tableId, committedSnapshotId);
             for (TableBucketWriteResult<WriteResult> writeResult : 
committableWriteResults) {
-                logEndOffsets.put(writeResult.tableBucket(), 
writeResult.logEndOffset());
+                TableBucket tableBucket = writeResult.tableBucket();
+                if (writeResult.tableBucket().getPartitionId() == null) {
+                    flussTableLakeSnapshot.addBucketOffset(tableBucket, 
writeResult.logEndOffset());
+                } else {
+                    flussTableLakeSnapshot.addPartitionBucketOffset(
+                            tableBucket, writeResult.partitionName(), 
writeResult.logEndOffset());
+                }
             }
-            flussTableLakeSnapshotCommitter.commit(
-                    new FlussTableLakeSnapshot(tableId, committedSnapshotId, 
logEndOffsets));
+            flussTableLakeSnapshotCommitter.commit(flussTableLakeSnapshot);
             return committable;
         }
     }
 
+    /**
+     * Merge the log offsets of latest snapshot with current written bucket 
offsets to get full log
+     * offsets.
+     */
+    private Map<String, String> toBucketOffsetsProperty(
+            TablePath tablePath,
+            @Nullable LakeSnapshot latestLakeSnapshot,
+            List<TableBucketWriteResult<WriteResult>> currentWriteResults)
+            throws Exception {
+        // first of all, we need to merge latest lake snapshot with current 
write results
+        Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
+        Map<Long, String> partitionNameById = new HashMap<>();
+        if (latestLakeSnapshot != null) {
+            tableBucketOffsets = new 
HashMap<>(latestLakeSnapshot.getTableBucketsOffset());
+            partitionNameById = new 
HashMap<>(latestLakeSnapshot.getPartitionNameById());
+        }
+
+        for (TableBucketWriteResult<WriteResult> tableBucketWriteResult : 
currentWriteResults) {
+            tableBucketOffsets.put(
+                    tableBucketWriteResult.tableBucket(), 
tableBucketWriteResult.logEndOffset());
+            if (tableBucketWriteResult.tableBucket().getPartitionId() != null
+                    && tableBucketWriteResult.partitionName() != null) {
+                partitionNameById.put(
+                        tableBucketWriteResult.tableBucket().getPartitionId(),
+                        tableBucketWriteResult.partitionName());
+            }
+        }
+
+        List<String> partitionKeys = new ArrayList<>();
+        if (!partitionNameById.isEmpty()) {
+            partitionKeys = 
admin.getTableInfo(tablePath).get().getPartitionKeys();
+        }
+
+        // then, serialize the bucket offsets, partition name by id
+        return toBucketOffsetsProperty(tableBucketOffsets, partitionNameById, 
partitionKeys);
+    }
+
     public static Map<String, String> toBucketOffsetsProperty(
-            Map<TableBucket, Long> tableBucketOffsets) throws IOException {
+            Map<TableBucket, Long> tableBucketOffsets,
+            Map<Long, String> partitionNameById,
+            List<String> partitionKeys)
+            throws IOException {
         StringWriter sw = new StringWriter();
         try (JsonGenerator gen = JACKSON_FACTORY.createGenerator(sw)) {
             gen.writeStartArray();
             for (Map.Entry<TableBucket, Long> entry : 
tableBucketOffsets.entrySet()) {
+                Long partitionId = entry.getKey().getPartitionId();
+                String partitionQualifiedName = null;
+                if (partitionId != null) {
+                    // the partitionName is 2025$12$03, we need to convert to
+                    // qualified name year=2025/month=12/day=03
+                    partitionQualifiedName =
+                            ResolvedPartitionSpec.fromPartitionName(
+                                            partitionKeys, 
partitionNameById.get(partitionId))
+                                    .getPartitionQualifiedName();
+                }
+                partitionNameById.get(entry.getKey().getPartitionId());

Review Comment:
   This statement looks useless: the result of `partitionNameById.get(...)` is discarded. Can it be removed?



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java:
##########
@@ -114,15 +114,19 @@ private CommitLakeTableSnapshotRequest 
toCommitLakeTableSnapshotRequest(
 
         pbLakeTableSnapshotInfo.setTableId(flussTableLakeSnapshot.tableId());
         
pbLakeTableSnapshotInfo.setSnapshotId(flussTableLakeSnapshot.lakeSnapshotId());
-        for (Map.Entry<TableBucket, Long> bucketEndOffsetEntry :
+        for (Map.Entry<Tuple2<TableBucket, String>, Long> bucketEndOffsetEntry 
:
                 flussTableLakeSnapshot.logEndOffsets().entrySet()) {
             PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =
                     pbLakeTableSnapshotInfo.addBucketsReq();
-            TableBucket tableBucket = bucketEndOffsetEntry.getKey();
+            TableBucket tableBucket = bucketEndOffsetEntry.getKey().f0;
+            String partitionName = bucketEndOffsetEntry.getKey().f1;
             long endOffset = bucketEndOffsetEntry.getValue();
             if (tableBucket.getPartitionId() != null) {
                 
pbLakeTableOffsetForBucket.setPartitionId(tableBucket.getPartitionId());
             }
+            if (partitionName != null) {

Review Comment:
   Is it possible that partitionId is null while partitionName is not null?



##########
fluss-client/src/main/java/com/alibaba/fluss/client/metadata/LakeSnapshot.java:
##########
@@ -36,9 +36,17 @@ public class LakeSnapshot {
     // the specific log offset of the snapshot
     private final Map<TableBucket, Long> tableBucketsOffset;
 
-    public LakeSnapshot(long snapshotId, Map<TableBucket, Long> 
tableBucketsOffset) {
+    // the partition name by partition id of this lake snapshot if
+    // is a partitioned table, empty if not a partitioned table
+    private final Map<Long, String> partitionNameById;

Review Comment:
   Can we wrap the partition name in the key of `tableBucketsOffset`, since the partition id 
is currently duplicated in both `tableBucketsOffset` and `partitionNameById`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to