This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new d97632a0c [server] Use batch operations of ZooKeeper to optimize updateLeaderAndIsr (#1445)
d97632a0c is described below
commit d97632a0c3a8b142d6b18a6a0192c89bbdc54c78
Author: Liebing <[email protected]>
AuthorDate: Sun Aug 10 21:36:13 2025 +0800
[server] Use batch operations of ZooKeeper to optimize updateLeaderAndIsr (#1445)
Co-authored-by: Liebing <[email protected]>
---
.../coordinator/CoordinatorEventProcessor.java | 40 +++++++++++----
.../alibaba/fluss/server/zk/ZooKeeperClient.java | 25 ++++++++++
.../coordinator/CoordinatorEventProcessorTest.java | 58 ++++++++++++++++++++++
.../fluss/server/zk/ZooKeeperClientTest.java | 40 +++++++++++++++
4 files changed, 152 insertions(+), 11 deletions(-)
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java
index afadb7508..dd65037dd 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -91,6 +91,7 @@ import javax.annotation.concurrent.NotThreadSafe;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -831,6 +832,7 @@ public class CoordinatorEventProcessor implements EventProcessor {
 
         // TODO verify leader epoch.
         List<AdjustIsrResultForBucket> result = new ArrayList<>();
+        Map<TableBucket, LeaderAndIsr> newLeaderAndIsrList = new HashMap<>();
         for (Map.Entry<TableBucket, LeaderAndIsr> entry : leaderAndIsrList.entrySet()) {
             TableBucket tableBucket = entry.getKey();
             LeaderAndIsr tryAdjustLeaderAndIsr = entry.getValue();
@@ -863,21 +865,43 @@ public class CoordinatorEventProcessor implements EventProcessor {
                             tryAdjustLeaderAndIsr.isr(),
                             coordinatorContext.getCoordinatorEpoch(),
                             currentLeaderAndIsr.bucketEpoch() + 1);
-            try {
-                zooKeeperClient.updateLeaderAndIsr(tableBucket, newLeaderAndIsr);
-            } catch (Exception e) {
-                LOG.error("Error when register leader and isr.", e);
-                result.add(new AdjustIsrResultForBucket(tableBucket, ApiError.fromThrowable(e)));
+            newLeaderAndIsrList.put(tableBucket, newLeaderAndIsr);
+        }
+
+        // Buckets whose new leader and isr has been successfully written to ZooKeeper.
+        Map<TableBucket, LeaderAndIsr> persistedLeaderAndIsrList = new HashMap<>();
+        try {
+            zooKeeperClient.batchUpdateLeaderAndIsr(newLeaderAndIsrList);
+            persistedLeaderAndIsrList.putAll(newLeaderAndIsrList);
+        } catch (Exception batchException) {
+            LOG.error("Error when batch update leader and isr. Try one by one.", batchException);
+
+            for (Map.Entry<TableBucket, LeaderAndIsr> entry : newLeaderAndIsrList.entrySet()) {
+                TableBucket tableBucket = entry.getKey();
+                LeaderAndIsr newLeaderAndIsr = entry.getValue();
+                try {
+                    zooKeeperClient.updateLeaderAndIsr(tableBucket, newLeaderAndIsr);
+                    persistedLeaderAndIsrList.put(tableBucket, newLeaderAndIsr);
+                } catch (Exception e) {
+                    LOG.error("Error when register leader and isr.", e);
+                    // Report only the failure: an unpersisted bucket must neither be answered
+                    // as successful nor be put into the coordinator cache below.
+                    result.add(
+                            new AdjustIsrResultForBucket(tableBucket, ApiError.fromThrowable(e)));
+                }
             }
+        }
 
-            // update coordinator leader and isr cache.
-            coordinatorContext.putBucketLeaderAndIsr(tableBucket, newLeaderAndIsr);
+        // Update the coordinator leader and isr cache, and answer success, only for the
+        // buckets that were actually persisted in ZooKeeper.
+        persistedLeaderAndIsrList.forEach(
+                (tableBucket, newLeaderAndIsr) -> {
+                    coordinatorContext.putBucketLeaderAndIsr(tableBucket, newLeaderAndIsr);
+                    result.add(new AdjustIsrResultForBucket(tableBucket, newLeaderAndIsr));
+                });
 
-            // TODO update metadata for all alive tablet servers.
+        // TODO update metadata for all alive tablet servers.
 
-            // Successful return.
-            result.add(new AdjustIsrResultForBucket(tableBucket, newLeaderAndIsr));
-        }
         return result;
     }
 
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java
index 078201212..2d0500f15 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java
@@ -301,6 +301,43 @@ public class ZooKeeperClient implements AutoCloseable {
         LOG.info("Updated {} for bucket {} in Zookeeper.", leaderAndIsr, tableBucket);
     }
 
+    /**
+     * Updates the leader and isr of the given buckets in ZooKeeper using multi-op transactions.
+     *
+     * <p>The updates are split into transactions of at most {@code MAX_BATCH_SIZE} operations.
+     * Each transaction is applied atomically, but the call as a whole is not: if a later
+     * transaction fails, the transactions committed before it stay applied. The znodes are
+     * overwritten without a version check, so re-applying the same updates after a failure is
+     * idempotent.
+     *
+     * @param leaderAndIsrList the new leader and isr to write, keyed by table bucket
+     * @throws Exception if any of the transactions fails
+     */
+    public void batchUpdateLeaderAndIsr(Map<TableBucket, LeaderAndIsr> leaderAndIsrList)
+            throws Exception {
+        if (leaderAndIsrList.isEmpty()) {
+            return;
+        }
+
+        List<CuratorOp> ops = new ArrayList<>(Math.min(leaderAndIsrList.size(), MAX_BATCH_SIZE));
+        for (Map.Entry<TableBucket, LeaderAndIsr> entry : leaderAndIsrList.entrySet()) {
+            TableBucket tableBucket = entry.getKey();
+            LeaderAndIsr leaderAndIsr = entry.getValue();
+
+            String path = LeaderAndIsrZNode.path(tableBucket);
+            byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr);
+            CuratorOp updateOp = zkClient.transactionOp().setData().forPath(path, data);
+            ops.add(updateOp);
+            if (ops.size() == MAX_BATCH_SIZE) {
+                zkClient.transaction().forOperations(ops);
+                ops.clear();
+            }
+        }
+        if (!ops.isEmpty()) {
+            zkClient.transaction().forOperations(ops);
+        }
+    }
+
     public void deleteLeaderAndIsr(TableBucket tableBucket) throws Exception {
         String path = LeaderAndIsrZNode.path(tableBucket);
         zkClient.delete().forPath(path);
diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java
index b2ff021e9..637b126e0 100644
--- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java
+++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java
@@ -31,16 +31,19 @@ import com.alibaba.fluss.metadata.TableBucketReplica;
 import com.alibaba.fluss.metadata.TableDescriptor;
 import com.alibaba.fluss.metadata.TablePartition;
 import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.rpc.messages.AdjustIsrResponse;
 import com.alibaba.fluss.rpc.messages.CommitKvSnapshotResponse;
 import com.alibaba.fluss.rpc.messages.CommitRemoteLogManifestResponse;
 import com.alibaba.fluss.rpc.messages.NotifyKvSnapshotOffsetRequest;
 import com.alibaba.fluss.rpc.messages.NotifyRemoteLogOffsetsRequest;
 import com.alibaba.fluss.server.coordinator.event.AccessContextEvent;
+import com.alibaba.fluss.server.coordinator.event.AdjustIsrReceivedEvent;
 import com.alibaba.fluss.server.coordinator.event.CommitKvSnapshotEvent;
 import com.alibaba.fluss.server.coordinator.event.CommitRemoteLogManifestEvent;
 import com.alibaba.fluss.server.coordinator.event.CoordinatorEventManager;
 import com.alibaba.fluss.server.coordinator.statemachine.BucketState;
 import com.alibaba.fluss.server.coordinator.statemachine.ReplicaState;
+import com.alibaba.fluss.server.entity.AdjustIsrResultForBucket;
 import com.alibaba.fluss.server.entity.CommitKvSnapshotData;
 import com.alibaba.fluss.server.entity.CommitRemoteLogManifestData;
 import com.alibaba.fluss.server.kv.snapshot.CompletedSnapshot;
@@ -78,6 +81,7 @@ import java.nio.file.Path;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -99,6 +103,7 @@ import static com.alibaba.fluss.server.coordinator.statemachine.BucketState.Onli
 import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.OfflineReplica;
 import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.OnlineReplica;
 import static com.alibaba.fluss.server.testutils.KvTestUtils.mockCompletedSnapshot;
+import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getAdjustIsrResponseData;
 import static com.alibaba.fluss.server.utils.TableAssignmentUtils.generateAssignment;
 import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry;
 import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitValue;
@@ -761,6 +766,59 @@ class CoordinatorEventProcessorTest {
         verifyReceiveRequestExceptFor(3, leader, NotifyKvSnapshotOffsetRequest.class);
     }
 
+    @Test
+    void testProcessAdjustIsr() throws Exception {
+        // make sure all request to gateway should be successful
+        initCoordinatorChannel();
+        // create a table
+        TablePath t1 = TablePath.of(defaultDatabase, "create_process_adjust_isr");
+        int nBuckets = 3;
+        int replicationFactor = 3;
+        TableAssignment tableAssignment =
+                generateAssignment(
+                        nBuckets,
+                        replicationFactor,
+                        new TabletServerInfo[] {
+                            new TabletServerInfo(0, "rack0"),
+                            new TabletServerInfo(1, "rack1"),
+                            new TabletServerInfo(2, "rack2")
+                        });
+        long t1Id = metadataManager.createTable(t1, TEST_TABLE, tableAssignment, false);
+        verifyTableCreated(t1Id, tableAssignment, nBuckets, replicationFactor);
+
+        // get the origin bucket leaderAndIsr
+        Map<TableBucket, LeaderAndIsr> bucketLeaderAndIsrMap =
+                new HashMap<>(
+                        waitValue(
+                                () -> fromCtx((ctx) -> Optional.of(ctx.bucketLeaderAndIsr())),
+                                Duration.ofMinutes(1),
+                                "leader not elected"));
+
+        // verify AdjustIsrReceivedEvent
+        CompletableFuture<AdjustIsrResponse> response = new CompletableFuture<>();
+        eventProcessor
+                .getCoordinatorEventManager()
+                .put(new AdjustIsrReceivedEvent(bucketLeaderAndIsrMap, response));
+
+        retryVerifyContext(
+                ctx ->
+                        bucketLeaderAndIsrMap.forEach(
+                                (tableBucket, leaderAndIsr) ->
+                                        assertThat(ctx.getBucketLeaderAndIsr(tableBucket))
+                                                .contains(
+                                                        leaderAndIsr.newLeaderAndIsr(
+                                                                leaderAndIsr.leader(),
+                                                                leaderAndIsr.isr()))));
+
+        // verify the response: every requested bucket is answered, and successfully
+        AdjustIsrResponse adjustIsrResponse = response.get();
+        Map<TableBucket, AdjustIsrResultForBucket> resultForBucketMap =
+                getAdjustIsrResponseData(adjustIsrResponse);
+        assertThat(resultForBucketMap.keySet())
+                .containsExactlyInAnyOrderElementsOf(bucketLeaderAndIsrMap.keySet());
+        assertThat(resultForBucketMap.values()).allMatch(AdjustIsrResultForBucket::succeeded);
+    }
+
     private CoordinatorEventProcessor buildCoordinatorEventProcessor() {
         return new CoordinatorEventProcessor(
                 zookeeperClient,
diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/zk/ZooKeeperClientTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/zk/ZooKeeperClientTest.java
index 8f119f1af..81cc59b3d 100644
--- a/fluss-server/src/test/java/com/alibaba/fluss/server/zk/ZooKeeperClientTest.java
+++ b/fluss-server/src/test/java/com/alibaba/fluss/server/zk/ZooKeeperClientTest.java
@@ -58,6 +58,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static com.alibaba.fluss.server.utils.TableAssignmentUtils.generateAssignment;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -220,6 +221,51 @@ class ZooKeeperClientTest {
         }
     }
 
+    @Test
+    void testBatchUpdateLeaderAndIsr() throws Exception {
+        final int bucketCount = 100;
+
+        // register an initial leader and isr for each bucket of table 1
+        Map<TableBucket, LeaderAndIsr> registered = new HashMap<>();
+        for (int bucket = 0; bucket < bucketCount; bucket++) {
+            TableBucket tableBucket = new TableBucket(1, bucket);
+            LeaderAndIsr initial =
+                    new LeaderAndIsr(
+                            bucket,
+                            10,
+                            Arrays.asList(bucket + 1, bucket + 2, bucket + 3),
+                            100,
+                            1000);
+            registered.put(tableBucket, initial);
+            zookeeperClient.registerLeaderAndIsr(tableBucket, initial);
+        }
+
+        // bump the leader and every epoch (isr unchanged), then write all of them in one call
+        Map<TableBucket, LeaderAndIsr> updated =
+                registered.entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey,
+                                        e -> {
+                                            LeaderAndIsr before = e.getValue();
+                                            return new LeaderAndIsr(
+                                                    before.leader() + 1,
+                                                    before.leaderEpoch() + 1,
+                                                    before.isr(),
+                                                    before.coordinatorEpoch() + 1,
+                                                    before.bucketEpoch() + 1);
+                                        }));
+        zookeeperClient.batchUpdateLeaderAndIsr(updated);
+
+        // each znode now holds its updated value; delete it and check that it is gone
+        for (TableBucket tableBucket : updated.keySet()) {
+            LeaderAndIsr expected = updated.get(tableBucket);
+            assertThat(zookeeperClient.getLeaderAndIsr(tableBucket)).hasValue(expected);
+            zookeeperClient.deleteLeaderAndIsr(tableBucket);
+            assertThat(zookeeperClient.getLeaderAndIsr(tableBucket)).isEmpty();
+        }
+    }
+
     @Test
     void testTable() throws Exception {
         TablePath tablePath = TablePath.of("db", "tb");