swuferhong commented on code in PR #2179:
URL: https://github.com/apache/fluss/pull/2179#discussion_r2778670479


##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java:
##########
@@ -2035,6 +2073,53 @@ private ControlledShutdownResponse tryProcessControlledShutdown(
         return response;
     }
 
+    private AcquireKvSnapshotLeaseResponse tryProcessAcquireKvSnapshotLease(
+            AcquireKvSnapshotLeaseEvent event) throws Exception {
+        AcquireKvSnapshotLeaseResponse response = new AcquireKvSnapshotLeaseResponse();
+        Map<TableBucket, Long> unavailableSnapshots =
+                kvSnapshotLeaseManager.acquireLease(
+                        event.getLeaseId(),
+                        event.getLeaseDuration(),
+                        event.getTableIdToLeasedBucket());
+
+        Map<Long, List<PbKvSnapshotLeaseForBucket>> pbFailedTables = new HashMap<>();
+        for (Map.Entry<TableBucket, Long> entry : unavailableSnapshots.entrySet()) {
+            TableBucket tb = entry.getKey();
+            Long snapshotId = entry.getValue();
+            PbKvSnapshotLeaseForBucket pbBucket =
+                    new PbKvSnapshotLeaseForBucket().setBucketId(tb.getBucket());
+            if (tb.getPartitionId() != null) {
+                pbBucket.setPartitionId(tb.getPartitionId());
+            }
+            pbBucket.setSnapshotId(snapshotId);
+            pbFailedTables.computeIfAbsent(tb.getTableId(), k -> new ArrayList<>()).add(pbBucket);
+        }
+
+        for (Map.Entry<Long, List<PbKvSnapshotLeaseForBucket>> entry : pbFailedTables.entrySet()) {
+            response.addTablesLeaseRe()
+                    .setTableId(entry.getKey())
+                    .addAllBucketsReqs(entry.getValue());
+        }
+        return response;
+    }
+
+    private ReleaseKvSnapshotLeaseResponse tryProcessReleaseKvSnapshotLease(
+            ReleaseKvSnapshotLeaseEvent event) throws Exception {
+        ReleaseKvSnapshotLeaseResponse response = new ReleaseKvSnapshotLeaseResponse();
+        Map<Long, List<TableBucket>> tableIdToReleasedBucket = event.getTableIdToReleasedBucket();
+        if (tableIdToReleasedBucket.isEmpty()) {
+            // release all
+            boolean exist = kvSnapshotLeaseManager.releaseAll(event.getLeaseId());
+            if (!exist) {
+                throw new KvSnapshotLeaseNotExistException(

Review Comment:
   I changed this so that it no longer throws an exception.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to