This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new d97632a0c [server] Use batch operations of ZooKeeper to optimize 
updateLeaderAndIsr (#1445)
d97632a0c is described below

commit d97632a0c3a8b142d6b18a6a0192c89bbdc54c78
Author: Liebing <[email protected]>
AuthorDate: Sun Aug 10 21:36:13 2025 +0800

    [server] Use batch operations of ZooKeeper to optimize updateLeaderAndIsr 
(#1445)
    
    Co-authored-by: Liebing <[email protected]>
---
 .../coordinator/CoordinatorEventProcessor.java     | 40 +++++++++++----
 .../alibaba/fluss/server/zk/ZooKeeperClient.java   | 25 ++++++++++
 .../coordinator/CoordinatorEventProcessorTest.java | 58 ++++++++++++++++++++++
 .../fluss/server/zk/ZooKeeperClientTest.java       | 40 +++++++++++++++
 4 files changed, 152 insertions(+), 11 deletions(-)

diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java
 
b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java
index afadb7508..dd65037dd 100644
--- 
a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -91,6 +91,7 @@ import javax.annotation.concurrent.NotThreadSafe;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -831,6 +832,7 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
         // TODO verify leader epoch.
 
         List<AdjustIsrResultForBucket> result = new ArrayList<>();
+        Map<TableBucket, LeaderAndIsr> newLeaderAndIsrList = new HashMap<>();
         for (Map.Entry<TableBucket, LeaderAndIsr> entry : 
leaderAndIsrList.entrySet()) {
             TableBucket tableBucket = entry.getKey();
             LeaderAndIsr tryAdjustLeaderAndIsr = entry.getValue();
@@ -863,21 +865,37 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
                             tryAdjustLeaderAndIsr.isr(),
                             coordinatorContext.getCoordinatorEpoch(),
                             currentLeaderAndIsr.bucketEpoch() + 1);
-            try {
-                zooKeeperClient.updateLeaderAndIsr(tableBucket, 
newLeaderAndIsr);
-            } catch (Exception e) {
-                LOG.error("Error when register leader and isr.", e);
-                result.add(new AdjustIsrResultForBucket(tableBucket, 
ApiError.fromThrowable(e)));
+            newLeaderAndIsrList.put(tableBucket, newLeaderAndIsr);
+        }
+
+        try {
+            zooKeeperClient.batchUpdateLeaderAndIsr(newLeaderAndIsrList);
+            newLeaderAndIsrList.forEach(
+                    (tableBucket, newLeaderAndIsr) ->
+                            result.add(new AdjustIsrResultForBucket(tableBucket, newLeaderAndIsr)));
+            newLeaderAndIsrList.forEach(coordinatorContext::putBucketLeaderAndIsr);
+        } catch (Exception batchException) {
+            LOG.error("Error when batch update leader and isr. Try one by one.", batchException);
+            for (Map.Entry<TableBucket, LeaderAndIsr> entry : newLeaderAndIsrList.entrySet()) {
+                TableBucket tableBucket = entry.getKey();
+                LeaderAndIsr newLeaderAndIsr = entry.getValue();
+                try {
+                    zooKeeperClient.updateLeaderAndIsr(tableBucket, newLeaderAndIsr);
+                    // Cache and report the bucket only after its ZooKeeper update succeeded.
+                    coordinatorContext.putBucketLeaderAndIsr(tableBucket, newLeaderAndIsr);
+                    result.add(new AdjustIsrResultForBucket(tableBucket, newLeaderAndIsr));
+                } catch (Exception e) {
+                    LOG.error("Error when register leader and isr.", e);
+                    result.add(
+                            new AdjustIsrResultForBucket(tableBucket, ApiError.fromThrowable(e)));
+                }
             }
+        }
 
-            // update coordinator leader and isr cache.
-            coordinatorContext.putBucketLeaderAndIsr(tableBucket, newLeaderAndIsr);
+        // The coordinator leader and isr cache is updated above, for persisted buckets only.
 
-            // TODO update metadata for all alive tablet servers.
+        // TODO update metadata for all alive tablet servers.
 
-            // Successful return.
-            result.add(new AdjustIsrResultForBucket(tableBucket, 
newLeaderAndIsr));
-        }
         return result;
     }
 
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java 
b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java
index 078201212..2d0500f15 100644
--- 
a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java
@@ -301,6 +301,31 @@ public class ZooKeeperClient implements AutoCloseable {
         LOG.info("Updated {} for bucket {} in Zookeeper.", leaderAndIsr, 
tableBucket);
     }
 
+    /**
+     * Updates the LeaderAndIsr znodes of the given buckets with ZooKeeper transactions, each
+     * carrying at most {@code MAX_BATCH_SIZE} set-data operations.
+     */
+    public void batchUpdateLeaderAndIsr(Map<TableBucket, LeaderAndIsr> leaderAndIsrList)
+            throws Exception {
+        if (leaderAndIsrList.isEmpty()) {
+            return;
+        }
+        List<CuratorOp> pendingOps = new ArrayList<>(leaderAndIsrList.size());
+        for (Map.Entry<TableBucket, LeaderAndIsr> bucketEntry : leaderAndIsrList.entrySet()) {
+            String znodePath = LeaderAndIsrZNode.path(bucketEntry.getKey());
+            byte[] payload = LeaderAndIsrZNode.encode(bucketEntry.getValue());
+            pendingOps.add(zkClient.transactionOp().setData().forPath(znodePath, payload));
+            if (pendingOps.size() == MAX_BATCH_SIZE) {
+                zkClient.transaction().forOperations(pendingOps);
+                pendingOps.clear();
+            }
+        }
+        // Submit the operations left over after the last full batch.
+        if (!pendingOps.isEmpty()) {
+            zkClient.transaction().forOperations(pendingOps);
+        }
+    }
+
     public void deleteLeaderAndIsr(TableBucket tableBucket) throws Exception {
         String path = LeaderAndIsrZNode.path(tableBucket);
         zkClient.delete().forPath(path);
diff --git 
a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java
 
b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java
index b2ff021e9..637b126e0 100644
--- 
a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java
+++ 
b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java
@@ -31,16 +31,19 @@ import com.alibaba.fluss.metadata.TableBucketReplica;
 import com.alibaba.fluss.metadata.TableDescriptor;
 import com.alibaba.fluss.metadata.TablePartition;
 import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.rpc.messages.AdjustIsrResponse;
 import com.alibaba.fluss.rpc.messages.CommitKvSnapshotResponse;
 import com.alibaba.fluss.rpc.messages.CommitRemoteLogManifestResponse;
 import com.alibaba.fluss.rpc.messages.NotifyKvSnapshotOffsetRequest;
 import com.alibaba.fluss.rpc.messages.NotifyRemoteLogOffsetsRequest;
 import com.alibaba.fluss.server.coordinator.event.AccessContextEvent;
+import com.alibaba.fluss.server.coordinator.event.AdjustIsrReceivedEvent;
 import com.alibaba.fluss.server.coordinator.event.CommitKvSnapshotEvent;
 import com.alibaba.fluss.server.coordinator.event.CommitRemoteLogManifestEvent;
 import com.alibaba.fluss.server.coordinator.event.CoordinatorEventManager;
 import com.alibaba.fluss.server.coordinator.statemachine.BucketState;
 import com.alibaba.fluss.server.coordinator.statemachine.ReplicaState;
+import com.alibaba.fluss.server.entity.AdjustIsrResultForBucket;
 import com.alibaba.fluss.server.entity.CommitKvSnapshotData;
 import com.alibaba.fluss.server.entity.CommitRemoteLogManifestData;
 import com.alibaba.fluss.server.kv.snapshot.CompletedSnapshot;
@@ -78,6 +81,7 @@ import java.nio.file.Path;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -99,6 +103,7 @@ import static 
com.alibaba.fluss.server.coordinator.statemachine.BucketState.Onli
 import static 
com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.OfflineReplica;
 import static 
com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.OnlineReplica;
 import static 
com.alibaba.fluss.server.testutils.KvTestUtils.mockCompletedSnapshot;
+import static 
com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getAdjustIsrResponseData;
 import static 
com.alibaba.fluss.server.utils.TableAssignmentUtils.generateAssignment;
 import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry;
 import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitValue;
@@ -761,6 +766,59 @@ class CoordinatorEventProcessorTest {
         verifyReceiveRequestExceptFor(3, leader, 
NotifyKvSnapshotOffsetRequest.class);
     }
 
+    @Test
+    void testProcessAdjustIsr() throws Exception {
+        // make sure all requests to the gateway succeed
+        initCoordinatorChannel();
+        // create a table with 3 buckets, each replicated on 3 tablet servers
+        TablePath t1 = TablePath.of(defaultDatabase, "create_process_adjust_isr");
+        int nBuckets = 3;
+        int replicationFactor = 3;
+        TableAssignment tableAssignment =
+                generateAssignment(
+                        nBuckets,
+                        replicationFactor,
+                        new TabletServerInfo[] {
+                            new TabletServerInfo(0, "rack0"),
+                            new TabletServerInfo(1, "rack1"),
+                            new TabletServerInfo(2, "rack2")
+                        });
+        long t1Id = metadataManager.createTable(t1, TEST_TABLE, tableAssignment, false);
+        verifyTableCreated(t1Id, tableAssignment, nBuckets, replicationFactor);
+
+        // get the original bucket leaderAndIsr from the coordinator context
+        Map<TableBucket, LeaderAndIsr> bucketLeaderAndIsrMap =
+                new HashMap<>(
+                        waitValue(
+                                () -> fromCtx((ctx) -> Optional.of(ctx.bucketLeaderAndIsr())),
+                                Duration.ofMinutes(1),
+                                "leader not elected"));
+
+        // submit an AdjustIsrReceivedEvent that carries the unchanged leaderAndIsr
+        CompletableFuture<AdjustIsrResponse> response = new CompletableFuture<>();
+        eventProcessor
+                .getCoordinatorEventManager()
+                .put(new AdjustIsrReceivedEvent(bucketLeaderAndIsrMap, response));
+
+        retryVerifyContext(
+                ctx ->
+                        bucketLeaderAndIsrMap.forEach(
+                                (tableBucket, leaderAndIsr) ->
+                                        assertThat(ctx.getBucketLeaderAndIsr(tableBucket))
+                                                .contains(
+                                                        leaderAndIsr.newLeaderAndIsr(
+                                                                leaderAndIsr.leader(),
+                                                                leaderAndIsr.isr()))));
+
+        // verify the response: a successful result for exactly the submitted buckets
+        AdjustIsrResponse adjustIsrResponse = response.get();
+        Map<TableBucket, AdjustIsrResultForBucket> resultForBucketMap =
+                getAdjustIsrResponseData(adjustIsrResponse);
+        assertThat(resultForBucketMap.keySet())
+                .containsExactlyInAnyOrderElementsOf(bucketLeaderAndIsrMap.keySet());
+        assertThat(resultForBucketMap.values()).allMatch(AdjustIsrResultForBucket::succeeded);
+    }
+
     private CoordinatorEventProcessor buildCoordinatorEventProcessor() {
         return new CoordinatorEventProcessor(
                 zookeeperClient,
diff --git 
a/fluss-server/src/test/java/com/alibaba/fluss/server/zk/ZooKeeperClientTest.java
 
b/fluss-server/src/test/java/com/alibaba/fluss/server/zk/ZooKeeperClientTest.java
index 8f119f1af..81cc59b3d 100644
--- 
a/fluss-server/src/test/java/com/alibaba/fluss/server/zk/ZooKeeperClientTest.java
+++ 
b/fluss-server/src/test/java/com/alibaba/fluss/server/zk/ZooKeeperClientTest.java
@@ -58,6 +58,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static 
com.alibaba.fluss.server.utils.TableAssignmentUtils.generateAssignment;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -220,6 +221,45 @@ class ZooKeeperClientTest {
         }
     }
 
+    @Test
+    void testBatchUpdateLeaderAndIsr() throws Exception {
+        int totalCount = 100;
+
+        // register an initial leaderAndIsr for every bucket
+        Map<TableBucket, LeaderAndIsr> leaderAndIsrList = new HashMap<>();
+        for (int bucketId = 0; bucketId < totalCount; bucketId++) {
+            TableBucket tableBucket = new TableBucket(1, bucketId);
+            List<Integer> isr = Arrays.asList(bucketId + 1, bucketId + 2, bucketId + 3);
+            LeaderAndIsr initial = new LeaderAndIsr(bucketId, 10, isr, 100, 1000);
+            leaderAndIsrList.put(tableBucket, initial);
+            zookeeperClient.registerLeaderAndIsr(tableBucket, initial);
+        }
+
+        // derive the updated version of each entry: leader and all epochs are increased by one
+        Map<TableBucket, LeaderAndIsr> updateLeaderAndIsrList =
+                leaderAndIsrList.keySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        bucket -> bucket,
+                                        bucket -> {
+                                            LeaderAndIsr before = leaderAndIsrList.get(bucket);
+                                            return new LeaderAndIsr(
+                                                    before.leader() + 1,
+                                                    before.leaderEpoch() + 1,
+                                                    before.isr(),
+                                                    before.coordinatorEpoch() + 1,
+                                                    before.bucketEpoch() + 1);
+                                        }));
+        // apply the batch update, then verify and clean up every bucket
+        zookeeperClient.batchUpdateLeaderAndIsr(updateLeaderAndIsrList);
+        for (TableBucket tableBucket : updateLeaderAndIsrList.keySet()) {
+            LeaderAndIsr expected = updateLeaderAndIsrList.get(tableBucket);
+            assertThat(zookeeperClient.getLeaderAndIsr(tableBucket)).hasValue(expected);
+            zookeeperClient.deleteLeaderAndIsr(tableBucket);
+            assertThat(zookeeperClient.getLeaderAndIsr(tableBucket)).isEmpty();
+        }
+    }
+
     @Test
     void testTable() throws Exception {
         TablePath tablePath = TablePath.of("db", "tb");

Reply via email to