This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 08758e929 [kv] Commit kv snapshot to zk should be done asynchronously and not block coordinator (#1375)
08758e929 is described below

commit 08758e929ca6b100414b1760e9a010d49f86b60e
Author: yuxia Luo <[email protected]>
AuthorDate: Tue Sep 9 11:20:53 2025 +0800

    [kv] Commit kv snapshot to zk should be done asynchronously and not block coordinator (#1375)
---
 .../coordinator/CompletedSnapshotStoreManager.java | 17 ++++---
 .../coordinator/CoordinatorEventProcessor.java     | 56 +++++++++++++++-------
 .../event/NotifyKvSnapshotOffsetEvent.java         | 41 ++++++++++++++++
 .../coordinator/CoordinatorEventProcessorTest.java |  7 ++-
 4 files changed, 96 insertions(+), 25 deletions(-)

diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java
index 418cca954..0ea715930 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java
@@ -26,17 +26,18 @@ import 
org.apache.fluss.server.kv.snapshot.CompletedSnapshotStore;
 import org.apache.fluss.server.kv.snapshot.SharedKvFileRegistry;
 import 
org.apache.fluss.server.kv.snapshot.ZooKeeperCompletedSnapshotHandleStore;
 import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.utils.MapUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.concurrent.NotThreadSafe;
+import javax.annotation.concurrent.ThreadSafe;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.function.Function;
 
@@ -48,13 +49,14 @@ import static 
org.apache.fluss.utils.Preconditions.checkNotNull;
  * {@link CompletedSnapshotStore} not exist for a {@link TableBucket}, it will 
create a new {@link
  * CompletedSnapshotStore} for it.
  */
-@NotThreadSafe
+@ThreadSafe
 public class CompletedSnapshotStoreManager {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(CompletedSnapshotStoreManager.class);
     private final int maxNumberOfSnapshotsToRetain;
     private final ZooKeeperClient zooKeeperClient;
-    private final Map<TableBucket, CompletedSnapshotStore> 
bucketCompletedSnapshotStores;
+    private final ConcurrentHashMap<TableBucket, CompletedSnapshotStore>
+            bucketCompletedSnapshotStores;
     private final Executor ioExecutor;
     private final Function<ZooKeeperClient, CompletedSnapshotHandleStore>
             makeZookeeperCompletedSnapshotHandleStore;
@@ -67,7 +69,7 @@ public class CompletedSnapshotStoreManager {
                 maxNumberOfSnapshotsToRetain > 0, 
"maxNumberOfSnapshotsToRetain must be positive");
         this.maxNumberOfSnapshotsToRetain = maxNumberOfSnapshotsToRetain;
         this.zooKeeperClient = zooKeeperClient;
-        this.bucketCompletedSnapshotStores = new HashMap<>();
+        this.bucketCompletedSnapshotStores = MapUtils.newConcurrentHashMap();
         this.ioExecutor = ioExecutor;
         this.makeZookeeperCompletedSnapshotHandleStore = 
ZooKeeperCompletedSnapshotHandleStore::new;
     }
@@ -83,7 +85,7 @@ public class CompletedSnapshotStoreManager {
                 maxNumberOfSnapshotsToRetain > 0, 
"maxNumberOfSnapshotsToRetain must be positive");
         this.maxNumberOfSnapshotsToRetain = maxNumberOfSnapshotsToRetain;
         this.zooKeeperClient = zooKeeperClient;
-        this.bucketCompletedSnapshotStores = new HashMap<>();
+        this.bucketCompletedSnapshotStores = MapUtils.newConcurrentHashMap();
         this.ioExecutor = ioExecutor;
         this.makeZookeeperCompletedSnapshotHandleStore = 
makeZookeeperCompletedSnapshotHandleStore;
     }
@@ -191,7 +193,8 @@ public class CompletedSnapshotStoreManager {
                 ioExecutor);
     }
 
-    public Map<TableBucket, CompletedSnapshotStore> 
getBucketCompletedSnapshotStores() {
+    @VisibleForTesting
+    Map<TableBucket, CompletedSnapshotStore> 
getBucketCompletedSnapshotStores() {
         return bucketCompletedSnapshotStores;
     }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
index ffc6192b7..6533565a0 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -56,6 +56,7 @@ import 
org.apache.fluss.server.coordinator.event.DropTableEvent;
 import org.apache.fluss.server.coordinator.event.EventProcessor;
 import org.apache.fluss.server.coordinator.event.FencedCoordinatorEvent;
 import org.apache.fluss.server.coordinator.event.NewTabletServerEvent;
+import org.apache.fluss.server.coordinator.event.NotifyKvSnapshotOffsetEvent;
 import 
org.apache.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent;
 import org.apache.fluss.server.coordinator.event.watcher.TableChangeWatcher;
 import 
org.apache.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher;
@@ -117,6 +118,7 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
     private static final Logger LOG = 
LoggerFactory.getLogger(CoordinatorEventProcessor.class);
 
     private final ZooKeeperClient zooKeeperClient;
+    private final ExecutorService ioExecutor;
     private final CoordinatorContext coordinatorContext;
     private final ReplicaStateMachine replicaStateMachine;
     private final TableBucketStateMachine tableBucketStateMachine;
@@ -190,6 +192,7 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
         this.lakeTableTieringManager = lakeTableTieringManager;
         this.coordinatorMetricGroup = coordinatorMetricGroup;
         this.internalListenerName = 
conf.getString(ConfigOptions.INTERNAL_LISTENER_NAME);
+        this.ioExecutor = ioExecutor;
     }
 
     public CoordinatorEventManager getCoordinatorEventManager() {
@@ -455,9 +458,10 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
                                             
adjustIsrReceivedEvent.getLeaderAndIsrMap())));
         } else if (event instanceof CommitKvSnapshotEvent) {
             CommitKvSnapshotEvent commitKvSnapshotEvent = 
(CommitKvSnapshotEvent) event;
-            CompletableFuture<CommitKvSnapshotResponse> callback =
-                    commitKvSnapshotEvent.getRespCallback();
-            completeFromCallable(callback, () -> 
tryProcessCommitKvSnapshot(commitKvSnapshotEvent));
+            tryProcessCommitKvSnapshot(
+                    commitKvSnapshotEvent, 
commitKvSnapshotEvent.getRespCallback());
+        } else if (event instanceof NotifyKvSnapshotOffsetEvent) {
+            processNotifyKvSnapshotOffsetEvent((NotifyKvSnapshotOffsetEvent) 
event);
         } else if (event instanceof CommitRemoteLogManifestEvent) {
             CommitRemoteLogManifestEvent commitRemoteLogManifestEvent =
                     (CommitRemoteLogManifestEvent) event;
@@ -936,21 +940,40 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
         }
     }
 
-    private CommitKvSnapshotResponse 
tryProcessCommitKvSnapshot(CommitKvSnapshotEvent event)
-            throws Exception {
+    private void tryProcessCommitKvSnapshot(
+            CommitKvSnapshotEvent event, 
CompletableFuture<CommitKvSnapshotResponse> callback) {
         // validate
-        validateFencedEvent(event);
+        try {
+            validateFencedEvent(event);
+        } catch (Exception e) {
+            callback.completeExceptionally(e);
+            return;
+        }
+        // commit the kv snapshot asynchronously
+        ioExecutor.execute(
+                () -> {
+                    try {
+                        TableBucket tb = event.getTableBucket();
+                        CompletedSnapshot completedSnapshot =
+                                
event.getAddCompletedSnapshotData().getCompletedSnapshot();
+                        // add completed snapshot
+                        CompletedSnapshotStore completedSnapshotStore =
+                                
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(tb);
+                        // this involves IO operation (ZK), so we do it in 
ioExecutor
+                        completedSnapshotStore.add(completedSnapshot);
+                        coordinatorEventManager.put(
+                                new NotifyKvSnapshotOffsetEvent(
+                                        tb, completedSnapshot.getLogOffset()));
+                        callback.complete(new CommitKvSnapshotResponse());
+                    } catch (Exception e) {
+                        callback.completeExceptionally(e);
+                    }
+                });
+    }
 
+    private void 
processNotifyKvSnapshotOffsetEvent(NotifyKvSnapshotOffsetEvent event) {
         TableBucket tb = event.getTableBucket();
-        CompletedSnapshot completedSnapshot =
-                event.getAddCompletedSnapshotData().getCompletedSnapshot();
-        // add completed snapshot
-        CompletedSnapshotStore completedSnapshotStore =
-                
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(tb);
-        completedSnapshotStore.add(completedSnapshot);
-
-        // send notify snapshot request to all replicas.
-        // TODO: this should be moved after sending 
AddCompletedSnapshotResponse
+        long logOffset = event.getLogOffset();
         coordinatorRequestBatch.newBatch();
         coordinatorContext
                 .getBucketLeaderAndIsr(tb)
@@ -961,10 +984,9 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
                                                 
coordinatorContext.getFollowers(
                                                         tb, 
leaderAndIsr.leader()),
                                                 tb,
-                                                
completedSnapshot.getLogOffset()));
+                                                logOffset));
         coordinatorRequestBatch.sendNotifyKvSnapshotOffsetRequest(
                 coordinatorContext.getCoordinatorEpoch());
-        return new CommitKvSnapshotResponse();
     }
 
     private CommitRemoteLogManifestResponse tryProcessCommitRemoteLogManifest(
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/NotifyKvSnapshotOffsetEvent.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/NotifyKvSnapshotOffsetEvent.java
new file mode 100644
index 000000000..da6a42067
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/NotifyKvSnapshotOffsetEvent.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.event;
+
+import org.apache.fluss.metadata.TableBucket;
+
+/** A coordinator event for notifying tablet servers of a table bucket's kv snapshot log offset. */
+public class NotifyKvSnapshotOffsetEvent implements CoordinatorEvent {
+
+    private final TableBucket tableBucket;
+    private final long logOffset;
+
+    public NotifyKvSnapshotOffsetEvent(TableBucket tableBucket, long 
logOffset) {
+        this.tableBucket = tableBucket;
+        this.logOffset = logOffset;
+    }
+
+    public TableBucket getTableBucket() {
+        return tableBucket;
+    }
+
+    public long getLogOffset() {
+        return logOffset;
+    }
+}
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
index b448c4126..82cd8f1a3 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
@@ -763,7 +763,11 @@ class CoordinatorEventProcessorTest {
                                 completedSnapshot, coordinatorEpoch, 
bucketLeaderEpoch),
                         responseCompletableFuture2));
         responseCompletableFuture2.get();
-        verifyReceiveRequestExceptFor(3, leader, 
NotifyKvSnapshotOffsetRequest.class);
+        retry(
+                Duration.ofMinutes(1),
+                () ->
+                        verifyReceiveRequestExceptFor(
+                                3, leader, 
NotifyKvSnapshotOffsetRequest.class));
     }
 
     @Test
@@ -1082,6 +1086,7 @@ class CoordinatorEventProcessorTest {
                         .hasMessage("No requests pending for inbound 
response.");
             } else {
                 // should contain NotifyKvSnapshotOffsetRequest
+                
assertThat(testTabletServerGateway.pendingRequestSize()).isNotZero();
                 
assertThat(testTabletServerGateway.getRequest(0)).isInstanceOf(requestClass);
             }
         }

Reply via email to