luoyuxia commented on code in PR #1485:
URL: https://github.com/apache/fluss/pull/1485#discussion_r2265748970
##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java:
##########
@@ -203,32 +203,87 @@ private Committable commitWriteResults(
                    flussCurrentLakeSnapshot == null
                            ? null
                            : flussCurrentLakeSnapshot.getSnapshotId());
-            long committedSnapshotId =
-                    lakeCommitter.commit(committable, toBucketOffsetsProperty(logOffsets));
+            long committedSnapshotId = lakeCommitter.commit(committable, logOffsetsProperty);
             // commit to fluss
-            Map<TableBucket, Long> logEndOffsets = new HashMap<>();
+            FlussTableLakeSnapshot flussTableLakeSnapshot =
+                    new FlussTableLakeSnapshot(tableId, committedSnapshotId);
             for (TableBucketWriteResult<WriteResult> writeResult : committableWriteResults) {
-                logEndOffsets.put(writeResult.tableBucket(), writeResult.logEndOffset());
+                TableBucket tableBucket = writeResult.tableBucket();
+                if (writeResult.tableBucket().getPartitionId() == null) {
+                    flussTableLakeSnapshot.addBucketOffset(tableBucket, writeResult.logEndOffset());
+                } else {
+                    flussTableLakeSnapshot.addPartitionBucketOffset(
+                            tableBucket, writeResult.partitionName(), writeResult.logEndOffset());
+                }
             }
-            flussTableLakeSnapshotCommitter.commit(
-                    new FlussTableLakeSnapshot(tableId, committedSnapshotId, logEndOffsets));
+            flussTableLakeSnapshotCommitter.commit(flussTableLakeSnapshot);
             return committable;
         }
     }

+    /**
+     * Merge the log offsets of latest snapshot with current written bucket offsets to get full log
+     * offsets.
+     */
+    private Map<String, String> toBucketOffsetsProperty(
+            TablePath tablePath,
+            @Nullable LakeSnapshot latestLakeSnapshot,
+            List<TableBucketWriteResult<WriteResult>> currentWriteResults)
+            throws Exception {
+        // first of all, we need to merge latest lake snapshot with current write results
+        Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
+        Map<Long, String> partitionNameById = new HashMap<>();
+        if (latestLakeSnapshot != null) {
+            tableBucketOffsets = new HashMap<>(latestLakeSnapshot.getTableBucketsOffset());
+            partitionNameById = new HashMap<>(latestLakeSnapshot.getPartitionNameById());
+        }
+
+        for (TableBucketWriteResult<WriteResult> tableBucketWriteResult : currentWriteResults) {
+            tableBucketOffsets.put(
+                    tableBucketWriteResult.tableBucket(), tableBucketWriteResult.logEndOffset());
+            if (tableBucketWriteResult.tableBucket().getPartitionId() != null
+                    && tableBucketWriteResult.partitionName() != null) {
+                partitionNameById.put(
+                        tableBucketWriteResult.tableBucket().getPartitionId(),
+                        tableBucketWriteResult.partitionName());
+            }
+        }
+
+        List<String> partitionKeys = new ArrayList<>();
+        if (!partitionNameById.isEmpty()) {
+            partitionKeys = admin.getTableInfo(tablePath).get().getPartitionKeys();
Review Comment:
We could combine these RPC calls, but I feel that would make the code more complex. Also, the call `TableInfo tableInfo = admin.getTableInfo(tablePath).get();` in method `checkFlussNotMissingLakeSnapshot` is a corner case: it only happens when some exception occurs. So I think it's acceptable not to merge them here.
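For context, the core of what `toBucketOffsetsProperty` does before the RPC in question — copy the latest snapshot's bucket offsets, then let the current round's write results override them — can be sketched in isolation. This is a minimal illustration using plain bucket-id keys in place of `TableBucket`; `mergeOffsets` is a hypothetical helper for this comment, not code from the PR:

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetMergeSketch {

    /**
     * Merge the bucket offsets recorded in the latest lake snapshot with the
     * offsets produced by the current round of write results. On conflict the
     * current results win, mirroring the copy-then-put order in the PR's
     * toBucketOffsetsProperty.
     */
    static Map<Integer, Long> mergeOffsets(
            Map<Integer, Long> snapshotOffsets, Map<Integer, Long> currentOffsets) {
        // Start from the snapshot's offsets (or empty when there is no snapshot yet).
        Map<Integer, Long> merged =
                snapshotOffsets == null ? new HashMap<>() : new HashMap<>(snapshotOffsets);
        // Current write results override any stale snapshot entries.
        merged.putAll(currentOffsets);
        return merged;
    }

    public static void main(String[] args) {
        Map<Integer, Long> snapshot = Map.of(0, 100L, 1, 200L);
        Map<Integer, Long> current = Map.of(1, 250L, 2, 50L);
        Map<Integer, Long> merged = mergeOffsets(snapshot, current);
        // Bucket 0 keeps its snapshot offset, bucket 1 takes the newer offset,
        // bucket 2 is newly added by the current round.
        System.out.println(merged);
    }
}
```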
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]