This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 8b9d2937c [server] Use async zk operations to improve performance of
CoordinatorServer initCoordinatorContext() (#1381)
8b9d2937c is described below
commit 8b9d2937c35f47dce0d034785ee684d49ba04bc3
Author: xiaozhou <[email protected]>
AuthorDate: Mon Sep 15 10:43:47 2025 +0800
[server] Use async zk operations to improve performance of
CoordinatorServer initCoordinatorContext() (#1381)
---
.../org/apache/fluss/config/ConfigOptions.java | 6 +
.../fluss/server/authorizer/DefaultAuthorizer.java | 2 +-
.../coordinator/CoordinatorEventProcessor.java | 119 ++++++---
.../fluss/server/coordinator/MetadataManager.java | 25 ++
.../org/apache/fluss/server/zk/ZkAsyncRequest.java | 48 ++++
.../apache/fluss/server/zk/ZkAsyncResponse.java | 103 ++++++++
.../apache/fluss/server/zk/ZooKeeperClient.java | 289 ++++++++++++++++++++-
.../org/apache/fluss/server/zk/ZooKeeperUtils.java | 2 +-
.../fluss/server/zk/ZooKeeperClientTest.java | 118 ++++++---
website/docs/maintenance/configuration.md | 1 +
10 files changed, 632 insertions(+), 81 deletions(-)
diff --git
a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index 6e738ce89..e6261c692 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -463,6 +463,12 @@ public class ConfigOptions {
// ------------------------------------------------------------------------
// ZooKeeper Client Settings
// ------------------------------------------------------------------------
+ public static final ConfigOption<Integer> ZOOKEEPER_MAX_INFLIGHT_REQUESTS =
+ key("zookeeper.client.max-inflight-requests")
+ .intType()
+ .defaultValue(100)
+ .withDescription(
+ "The maximum number of unacknowledged requests the
client will send to ZooKeeper before blocking.");
public static final ConfigOption<Duration> ZOOKEEPER_SESSION_TIMEOUT =
key("zookeeper.client.session-timeout")
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/authorizer/DefaultAuthorizer.java
b/fluss-server/src/main/java/org/apache/fluss/server/authorizer/DefaultAuthorizer.java
index c7528edc5..69b007d7a 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/authorizer/DefaultAuthorizer.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/authorizer/DefaultAuthorizer.java
@@ -388,7 +388,7 @@ public class DefaultAuthorizer extends AbstractAuthorizer
implements FatalErrorH
} else {
LOG.trace("Deleting path for {} because it had no ACLs
remaining", resource);
- zooKeeperClient.contitionalDeleteResourceAcl(
+ zooKeeperClient.conditionalDeleteResourceAcl(
resource, currentVersionedAcls.zkVersion);
}
writeComplete = true;
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
index 128cd07a0..ba48a6240 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -272,8 +272,12 @@ public class CoordinatorEventProcessor implements
EventProcessor {
int[] currentServers = zooKeeperClient.getSortedTabletServerList();
List<ServerInfo> tabletServerInfos = new ArrayList<>();
List<ServerNode> internalServerNodes = new ArrayList<>();
+
+ long start4loadTabletServer = System.currentTimeMillis();
+ Map<Integer, TabletServerRegistration> tabletServerRegistrations =
+ zooKeeperClient.getTabletServers(currentServers);
for (int server : currentServers) {
- TabletServerRegistration registration =
zooKeeperClient.getTabletServer(server).get();
+ TabletServerRegistration registration =
tabletServerRegistrations.get(server);
ServerInfo serverInfo =
new ServerInfo(
server,
@@ -299,48 +303,74 @@ public class CoordinatorEventProcessor implements
EventProcessor {
}
coordinatorContext.setLiveTabletServers(tabletServerInfos);
+ LOG.info(
+ "Load tablet servers success in {}ms when initializing
coordinator context.",
+ System.currentTimeMillis() - start4loadTabletServer);
+
// init tablet server channels
coordinatorChannelManager.startup(internalServerNodes);
// load all tables
+ long start4loadTables = System.currentTimeMillis();
List<TableInfo> autoPartitionTables = new ArrayList<>();
List<Tuple2<TableInfo, Long>> lakeTables = new ArrayList<>();
+ Set<TablePath> tablePathSet = new HashSet<>();
for (String database : metadataManager.listDatabases()) {
for (String tableName : metadataManager.listTables(database)) {
- TablePath tablePath = TablePath.of(database, tableName);
- TableInfo tableInfo = metadataManager.getTable(tablePath);
- coordinatorContext.putTablePath(tableInfo.getTableId(),
tablePath);
- coordinatorContext.putTableInfo(tableInfo);
- if (tableInfo.getTableConfig().isDataLakeEnabled()) {
- // always set to current time,
- // todo: should get from the last lake snapshot
- lakeTables.add(Tuple2.of(tableInfo,
System.currentTimeMillis()));
- }
- if (tableInfo.isPartitioned()) {
- Map<String, Long> partitions =
- zooKeeperClient.getPartitionNameAndIds(tablePath);
+ tablePathSet.add(TablePath.of(database, tableName));
+ }
+ }
+ Map<TablePath, TableInfo> tablePath2TableInfoMap =
metadataManager.getTables(tablePathSet);
+ List<TablePath> partitionedTablePathList =
+ tablePath2TableInfoMap.entrySet().stream()
+ .filter(entry -> entry.getValue().isPartitioned())
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+ Map<TablePath, Map<String, Long>> tablePathMap =
+
zooKeeperClient.getPartitionNameAndIdsForTables(partitionedTablePathList);
+ for (TablePath tablePath : tablePathSet) {
+ TableInfo tableInfo = tablePath2TableInfoMap.get(tablePath);
+ coordinatorContext.putTablePath(tableInfo.getTableId(), tablePath);
+ coordinatorContext.putTableInfo(tableInfo);
+ if (tableInfo.getTableConfig().isDataLakeEnabled()) {
+ // always set to current time,
+ // todo: should get from the last lake snapshot
+ lakeTables.add(Tuple2.of(tableInfo,
System.currentTimeMillis()));
+ }
+ if (tableInfo.isPartitioned()) {
+ Map<String, Long> partitions = tablePathMap.get(tablePath);
+ if (partitions != null) {
for (Map.Entry<String, Long> partition :
partitions.entrySet()) {
// put partition info to coordinator context
coordinatorContext.putPartition(
partition.getValue(),
PhysicalTablePath.of(tableInfo.getTablePath(),
partition.getKey()));
}
- // if the table is auto partition, put the partitions info
- if (tableInfo
- .getTableConfig()
- .getAutoPartitionStrategy()
- .isAutoPartitionEnabled()) {
- autoPartitionTables.add(tableInfo);
- }
+ }
+ // if the table is auto partition, put the partitions info
+ if (tableInfo
+ .getTableConfig()
+ .getAutoPartitionStrategy()
+ .isAutoPartitionEnabled()) {
+ autoPartitionTables.add(tableInfo);
}
}
}
+ LOG.info(
+ "Load tables success in {}ms when initializing coordinator
context.",
+ System.currentTimeMillis() - start4loadTables);
+
autoPartitionManager.initAutoPartitionTables(autoPartitionTables);
lakeTableTieringManager.initWithLakeTables(lakeTables);
// load all assignment
+ long start4loadAssignment = System.currentTimeMillis();
loadTableAssignment();
loadPartitionAssignment();
+ LOG.info(
+ "Load table and partition assignment success in {}ms when
initializing coordinator context.",
+ System.currentTimeMillis() - start4loadAssignment);
+
long end = System.currentTimeMillis();
LOG.info("Current total {} tables in the cluster.",
coordinatorContext.allTables().size());
LOG.info(
@@ -355,17 +385,19 @@ public class CoordinatorEventProcessor implements
EventProcessor {
private void loadTableAssignment() throws Exception {
List<String> assignmentTables =
zooKeeperClient.getChildren(TableIdsZNode.path());
Set<Long> deletedTables = new HashSet<>();
- for (String tableIdStr : assignmentTables) {
- long tableId = Long.parseLong(tableIdStr);
+ List<Long> tableIds =
+
assignmentTables.stream().map(Long::parseLong).collect(Collectors.toList());
+ Map<Long, TableAssignment> tableId2tableAssignmentMap =
+ zooKeeperClient.getTablesAssignments(tableIds);
+ for (Long tableId : tableIds) {
// if table id not in current coordinator context,
// we'll consider it as deleted
if (!coordinatorContext.containsTableId(tableId)) {
deletedTables.add(tableId);
}
- Optional<TableAssignment> optAssignment =
zooKeeperClient.getTableAssignment(tableId);
- if (optAssignment.isPresent()) {
- TableAssignment tableAssignment = optAssignment.get();
- loadAssignment(tableId, tableAssignment, null);
+ TableAssignment assignment =
tableId2tableAssignmentMap.get(tableId);
+ if (assignment != null) {
+ loadAssignment(tableId, assignment, null);
} else {
LOG.warn(
"Can't get the assignment for table {} with id {}.",
@@ -378,24 +410,25 @@ public class CoordinatorEventProcessor implements
EventProcessor {
private void loadPartitionAssignment() throws Exception {
// load all assignment
- List<String> partitionAssignmentNodes =
- zooKeeperClient.getChildren(PartitionIdsZNode.path());
+ List<Long> partitionAssignmentNodes =
+ zooKeeperClient.getChildren(PartitionIdsZNode.path()).stream()
+ .map(Long::parseLong)
+ .collect(Collectors.toList());
Set<TablePartition> deletedPartitions = new HashSet<>();
- for (String partitionIdStr : partitionAssignmentNodes) {
- long partitionId = Long.parseLong(partitionIdStr);
- Optional<PartitionAssignment> optAssignment =
- zooKeeperClient.getPartitionAssignment(partitionId);
- if (!optAssignment.isPresent()) {
+ Map<Long, PartitionAssignment> partitionId2partitionAssignmentMap =
+
zooKeeperClient.getPartitionsAssignments(partitionAssignmentNodes);
+ for (Long partitionId : partitionAssignmentNodes) {
+ PartitionAssignment assignment =
partitionId2partitionAssignmentMap.get(partitionId);
+ if (assignment == null) {
LOG.warn("Can't get the assignment for table partition {}.",
partitionId);
continue;
}
- PartitionAssignment partitionAssignment = optAssignment.get();
- long tableId = partitionAssignment.getTableId();
+ long tableId = assignment.getTableId();
// partition id doesn't exist in coordinator context, consider it
as deleted
if (!coordinatorContext.containsPartitionId(partitionId)) {
deletedPartitions.add(new TablePartition(tableId,
partitionId));
}
- loadAssignment(tableId, optAssignment.get(), partitionId);
+ loadAssignment(tableId, assignment, partitionId);
}
coordinatorContext.queuePartitionDeletion(deletedPartitions);
}
@@ -403,19 +436,25 @@ public class CoordinatorEventProcessor implements
EventProcessor {
private void loadAssignment(
long tableId, TableAssignment tableAssignment, @Nullable Long
partitionId)
throws Exception {
+ Set<TableBucket> tableBucketSet = new HashSet<>();
for (Map.Entry<Integer, BucketAssignment> entry :
tableAssignment.getBucketAssignments().entrySet()) {
int bucketId = entry.getKey();
BucketAssignment bucketAssignment = entry.getValue();
// put the assignment information to context
TableBucket tableBucket = new TableBucket(tableId, partitionId,
bucketId);
+ tableBucketSet.add(tableBucket);
coordinatorContext.updateBucketReplicaAssignment(
tableBucket, bucketAssignment.getReplicas());
- Optional<LeaderAndIsr> optLeaderAndIsr =
zooKeeperClient.getLeaderAndIsr(tableBucket);
+ }
+ Map<TableBucket, LeaderAndIsr> leaderAndIsrMap =
+ zooKeeperClient.getLeaderAndIsrs(tableBucketSet);
+ for (TableBucket tableBucket : tableBucketSet) {
+ LeaderAndIsr leaderAndIsr = leaderAndIsrMap.get(tableBucket);
// update bucket LeaderAndIsr info
- optLeaderAndIsr.ifPresent(
- leaderAndIsr ->
-
coordinatorContext.putBucketLeaderAndIsr(tableBucket, leaderAndIsr));
+ if (leaderAndIsr != null) {
+ coordinatorContext.putBucketLeaderAndIsr(tableBucket,
leaderAndIsr);
+ }
}
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index 6017fe3de..a216a7eb1 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -55,6 +55,8 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -316,6 +318,29 @@ public class MetadataManager {
return tableReg.toTableInfo(tablePath, schemaInfo,
defaultTableLakeOptions);
}
+ public Map<TablePath, TableInfo> getTables(Collection<TablePath>
tablePaths)
+ throws TableNotExistException {
+ Map<TablePath, TableInfo> result = new HashMap<>();
+ try {
+ Map<TablePath, TableRegistration> tablePath2TableRegistrations =
+ zookeeperClient.getTables(tablePaths);
+ for (TablePath tablePath : tablePaths) {
+ if (!tablePath2TableRegistrations.containsKey(tablePath)) {
+ throw new TableNotExistException("Table '" + tablePath +
"' does not exist.");
+ }
+ TableRegistration tableReg =
tablePath2TableRegistrations.get(tablePath);
+ SchemaInfo schemaInfo = getLatestSchema(tablePath);
+ result.put(
+ tablePath,
+ tableReg.toTableInfo(tablePath, schemaInfo,
defaultTableLakeOptions));
+ }
+ } catch (Exception e) {
+ throw new FlussRuntimeException(
+ String.format("Failed to get tables '%s'.", tablePaths),
e);
+ }
+ return result;
+ }
+
public TableRegistration getTableRegistration(TablePath tablePath) {
Optional<TableRegistration> optionalTable;
try {
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkAsyncRequest.java
b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkAsyncRequest.java
new file mode 100644
index 000000000..9163bd1b6
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkAsyncRequest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk;
+
+/** The base class for ZooKeeper async operation request. */
+public abstract class ZkAsyncRequest {
+
+ private final String path;
+
+ protected ZkAsyncRequest(String path) {
+ this.path = path;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ //
-------------------------------------------------------------------------------------------
+
+ /** The request for ZooKeeper getData async operation. */
+ public static class ZkGetDataRequest extends ZkAsyncRequest {
+ protected ZkGetDataRequest(String path) {
+ super(path);
+ }
+ }
+
+ /** The request for ZooKeeper getChildren async operation. */
+ public static class ZkGetChildrenRequest extends ZkAsyncRequest {
+ protected ZkGetChildrenRequest(String path) {
+ super(path);
+ }
+ }
+}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkAsyncResponse.java
b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkAsyncResponse.java
new file mode 100644
index 000000000..900939f44
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkAsyncResponse.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk;
+
+import
org.apache.fluss.shaded.curator5.org.apache.curator.framework.api.CuratorEvent;
+import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
+import
org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.KeeperException.Code;
+
+import java.util.List;
+import java.util.Optional;
+
+/** Base class for ZooKeeper async operation responses. */
+public abstract class ZkAsyncResponse {
+ private final String path;
+ private final Code resultCode;
+
+ protected ZkAsyncResponse(String path, Code resultCode) {
+ this.path = path;
+ this.resultCode = resultCode;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public Code getResultCode() {
+ return resultCode;
+ }
+
+ /** Returns an empty Optional if the result code is OK, and the corresponding KeeperException otherwise. */
+ public Optional<KeeperException> resultException() {
+ if (resultCode == Code.OK) {
+ return Optional.empty();
+ } else {
+ return Optional.of(KeeperException.create(resultCode, path));
+ }
+ }
+
+ /** Throw KeeperException if the result code is not OK. */
+ public void maybeThrow() throws KeeperException {
+ if (resultCode != Code.OK) {
+ throw KeeperException.create(resultCode, path);
+ }
+ }
+
+ //
-------------------------------------------------------------------------------------------
+
+ /** The response for ZooKeeper getData async operation. */
+ public static class ZkGetDataResponse extends ZkAsyncResponse {
+
+ private final byte[] data;
+
+ public ZkGetDataResponse(String path, Code resultCode, byte[] data) {
+ super(path, resultCode);
+ this.data = data;
+ }
+
+ public byte[] getData() {
+ return data;
+ }
+
+ public static ZkGetDataResponse create(CuratorEvent event) {
+ return new ZkGetDataResponse(
+ event.getPath(), Code.get(event.getResultCode()),
event.getData());
+ }
+ }
+
+ /** The response for ZooKeeper getChildren async operation. */
+ public static class ZkGetChildrenResponse extends ZkAsyncResponse {
+
+ private final List<String> children;
+
+ public ZkGetChildrenResponse(
+ String path, KeeperException.Code resultCode, List<String>
children) {
+ super(path, resultCode);
+ this.children = children;
+ }
+
+ public List<String> getChildren() {
+ return children;
+ }
+
+ public static ZkGetChildrenResponse create(CuratorEvent event) {
+ return new ZkGetChildrenResponse(
+ event.getPath(), Code.get(event.getResultCode()),
event.getChildren());
+ }
+ }
+}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
index abdc5e1fd..d555cd1df 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
@@ -18,6 +18,8 @@
package org.apache.fluss.server.zk;
import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
import org.apache.fluss.metadata.ResolvedPartitionSpec;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.SchemaInfo;
@@ -29,6 +31,10 @@ import org.apache.fluss.security.acl.Resource;
import org.apache.fluss.security.acl.ResourceType;
import org.apache.fluss.server.authorizer.DefaultAuthorizer.VersionedAcls;
import org.apache.fluss.server.entity.RegisterTableBucketLeadAndIsrInfo;
+import org.apache.fluss.server.zk.ZkAsyncRequest.ZkGetChildrenRequest;
+import org.apache.fluss.server.zk.ZkAsyncRequest.ZkGetDataRequest;
+import org.apache.fluss.server.zk.ZkAsyncResponse.ZkGetChildrenResponse;
+import org.apache.fluss.server.zk.ZkAsyncResponse.ZkGetDataResponse;
import org.apache.fluss.server.zk.data.BucketSnapshot;
import org.apache.fluss.server.zk.data.CoordinatorAddress;
import org.apache.fluss.server.zk.data.DatabaseRegistration;
@@ -66,6 +72,8 @@ import org.apache.fluss.server.zk.data.ZkData.TableZNode;
import org.apache.fluss.server.zk.data.ZkData.TablesZNode;
import org.apache.fluss.server.zk.data.ZkData.WriterIdZNode;
import
org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFramework;
+import
org.apache.fluss.shaded.curator5.org.apache.curator.framework.api.BackgroundCallback;
+import
org.apache.fluss.shaded.curator5.org.apache.curator.framework.api.CuratorEvent;
import
org.apache.fluss.shaded.curator5.org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.CreateMode;
import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
@@ -78,6 +86,8 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -86,6 +96,11 @@ import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import static
org.apache.fluss.metadata.ResolvedPartitionSpec.fromPartitionName;
@@ -106,13 +121,21 @@ public class ZooKeeperClient implements AutoCloseable {
private final ZkSequenceIDCounter partitionIdCounter;
private final ZkSequenceIDCounter writerIdCounter;
- public ZooKeeperClient(CuratorFrameworkWithUnhandledErrorListener
curatorFrameworkWrapper) {
+ private final Semaphore inFlightRequests;
+
+ public ZooKeeperClient(
+ CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper,
+ Configuration configuration) {
this.curatorFrameworkWrapper = curatorFrameworkWrapper;
this.zkClient = curatorFrameworkWrapper.asCuratorFramework();
this.tableIdCounter = new ZkSequenceIDCounter(zkClient,
TableSequenceIdZNode.path());
this.partitionIdCounter =
new ZkSequenceIDCounter(zkClient,
PartitionSequenceIdZNode.path());
this.writerIdCounter = new ZkSequenceIDCounter(zkClient,
WriterIdZNode.path());
+
+ int maxInFlightRequests =
+
configuration.getInt(ConfigOptions.ZOOKEEPER_MAX_INFLIGHT_REQUESTS);
+ this.inFlightRequests = new Semaphore(maxInFlightRequests);
}
public Optional<byte[]> getOrEmpty(String path) throws Exception {
@@ -123,6 +146,57 @@ public class ZooKeeperClient implements AutoCloseable {
}
}
+ /**
+ * Send a pipelined sequence of requests and wait for all of their
responses.
+ *
+ * <p>The watch flag on each outgoing request will be set if we've already
registered a handler
+ * for the path associated with the request.
+ *
+ * @param requests a sequence of requests to send and wait on.
+ * @return the responses for the requests. If all requests have the same
type, the responses
+ * will have the respective response type.
+ */
+ private <Resp extends ZkAsyncResponse, Req extends ZkAsyncRequest>
+ List<Resp> handleRequestAsync(
+ List<Req> requests, Function<CuratorEvent, Resp>
respCreator) throws Exception {
+ if (requests == null || requests.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ ArrayBlockingQueue<Resp> responseQueue = new
ArrayBlockingQueue<>(requests.size());
+ CountDownLatch countDownLatch = new CountDownLatch(requests.size());
+
+ BackgroundCallback callback =
+ (client, event) -> {
+ Resp response = respCreator.apply(event);
+ responseQueue.add(response);
+ inFlightRequests.release();
+ countDownLatch.countDown();
+ };
+
+ for (Req request : requests) {
+ try {
+ inFlightRequests.acquire();
+ if (request instanceof ZkGetDataRequest) {
+
zkClient.getData().inBackground(callback).forPath(request.getPath());
+
+ } else if (request instanceof ZkGetChildrenRequest) {
+
zkClient.getChildren().inBackground(callback).forPath(request.getPath());
+
+ } else {
+ throw new IllegalArgumentException(
+ "Unsupported request type: " + request.getClass());
+ }
+ } catch (Exception e) {
+ inFlightRequests.release();
+ throw e;
+ }
+ }
+
+ countDownLatch.await();
+ return new ArrayList<>(responseQueue);
+ }
+
//
--------------------------------------------------------------------------------------------
// Coordinator server
//
--------------------------------------------------------------------------------------------
@@ -169,6 +243,32 @@ public class ZooKeeperClient implements AutoCloseable {
return bytes.map(ServerIdZNode::decode);
}
+ /** Get the tablet servers registered in ZK. */
+ public Map<Integer, TabletServerRegistration> getTabletServers(int[]
tabletServerIds)
+ throws Exception {
+ Map<String, Integer> path2IdMap =
+ Arrays.stream(tabletServerIds)
+ .boxed()
+ .collect(Collectors.toMap(ServerIdZNode::path, id ->
id));
+
+ List<ZkGetDataResponse> responses = getDataAsync(path2IdMap.keySet());
+ // tablet server id -> TabletServerRegistration
+ Map<Integer, TabletServerRegistration> result = new HashMap<>();
+ for (ZkGetDataResponse response : responses) {
+ if (response.getResultCode() == KeeperException.Code.OK) {
+ result.put(
+ path2IdMap.get(response.getPath()),
+ ServerIdZNode.decode(response.getData()));
+ } else {
+ LOG.warn(
+ "Failed to get data for path {}: {}",
+ response.getPath(),
+ response.getResultCode());
+ }
+ }
+ return result;
+ }
+
/** Gets the list of sorted server Ids. */
public int[] getSortedTabletServerList() throws Exception {
List<String> tabletServers = getChildren(ServerIdsZNode.path());
@@ -201,12 +301,62 @@ public class ZooKeeperClient implements AutoCloseable {
data.length == 0 ? null : TableIdZNode.decode(data));
}
+ /** Get the tables assignments in ZK. */
+ public Map<Long, TableAssignment> getTablesAssignments(List<Long>
tableIds) throws Exception {
+ Map<String, Long> path2TableIdMap =
+ tableIds.stream().collect(Collectors.toMap(TableIdZNode::path,
id -> id));
+
+ List<ZkGetDataResponse> responses =
getDataAsync(path2TableIdMap.keySet());
+ // tableId -> TableAssignment
+ Map<Long, TableAssignment> result = new HashMap<>();
+ for (ZkGetDataResponse response : responses) {
+ if (response.getResultCode() == KeeperException.Code.OK
+ && response.getData().length > 0) {
+ result.put(
+ path2TableIdMap.get(response.getPath()),
+ TableIdZNode.decode(response.getData()));
+ } else {
+ LOG.warn(
+ "Failed to get data for path {}: {}, data length = {}",
+ response.getPath(),
+ response.getResultCode(),
+ response.getData() == null ? 0 :
response.getData().length);
+ }
+ }
+
+ return result;
+ }
+
/** Get the partition assignment in ZK. */
public Optional<PartitionAssignment> getPartitionAssignment(long
partitionId) throws Exception {
Optional<byte[]> bytes =
getOrEmpty(PartitionIdZNode.path(partitionId));
return bytes.map(PartitionIdZNode::decode);
}
+ /** Get the partitions assignments in ZK. */
+ public Map<Long, PartitionAssignment> getPartitionsAssignments(List<Long>
partitionIds)
+ throws Exception {
+ Map<String, Long> path2PartitionIdMap =
+
partitionIds.stream().collect(Collectors.toMap(PartitionIdZNode::path, id ->
id));
+
+ List<ZkGetDataResponse> responses =
getDataAsync(path2PartitionIdMap.keySet());
+ // partitionId -> PartitionAssignment
+ Map<Long, PartitionAssignment> result = new HashMap<>();
+ for (ZkGetDataResponse response : responses) {
+ if (response.getResultCode() == KeeperException.Code.OK) {
+ result.put(
+ path2PartitionIdMap.get(response.getPath()),
+ PartitionIdZNode.decode(response.getData()));
+ } else {
+ LOG.warn(
+ "Failed to get data for path {}: {}",
+ response.getPath(),
+ response.getResultCode());
+ }
+ }
+ return result;
+ }
+
public void updateTableAssignment(long tableId, TableAssignment
tableAssignment)
throws Exception {
String path = TableIdZNode.path(tableId);
@@ -294,6 +444,31 @@ public class ZooKeeperClient implements AutoCloseable {
return bytes.map(LeaderAndIsrZNode::decode);
}
+ /** Get the buckets LeaderAndIsr in ZK. */
+ public Map<TableBucket, LeaderAndIsr>
getLeaderAndIsrs(Collection<TableBucket> tableBuckets)
+ throws Exception {
+ Map<String, TableBucket> path2TableBucketMap =
+ tableBuckets.stream()
+ .collect(Collectors.toMap(LeaderAndIsrZNode::path,
bucket -> bucket));
+
+ List<ZkGetDataResponse> responses =
getDataAsync(path2TableBucketMap.keySet());
+ // TableBucket -> LeaderAndIsr
+ Map<TableBucket, LeaderAndIsr> result = new HashMap<>();
+ for (ZkGetDataResponse response : responses) {
+ if (response.getResultCode() == KeeperException.Code.OK) {
+ result.put(
+ path2TableBucketMap.get(response.getPath()),
+ LeaderAndIsrZNode.decode(response.getData()));
+ } else {
+ LOG.warn(
+ "Failed to get data for path {}: {}",
+ response.getPath(),
+ response.getResultCode());
+ }
+ }
+ return result;
+ }
+
public void updateLeaderAndIsr(TableBucket tableBucket, LeaderAndIsr
leaderAndIsr)
throws Exception {
String path = LeaderAndIsrZNode.path(tableBucket);
@@ -422,6 +597,30 @@ public class ZooKeeperClient implements AutoCloseable {
return bytes.map(TableZNode::decode);
}
+ /** Get the tables in ZK. */
+ public Map<TablePath, TableRegistration> getTables(Collection<TablePath>
tablePaths)
+ throws Exception {
+ Map<String, TablePath> path2TablePathMap =
+ tablePaths.stream().collect(Collectors.toMap(TableZNode::path,
path -> path));
+
+ List<ZkGetDataResponse> responses =
getDataAsync(path2TablePathMap.keySet());
+ // TablePath -> TableRegistration
+ Map<TablePath, TableRegistration> result = new HashMap<>();
+ for (ZkGetDataResponse response : responses) {
+ if (response.getResultCode() == KeeperException.Code.OK) {
+ result.put(
+ path2TablePathMap.get(response.getPath()),
+ TableZNode.decode(response.getData()));
+ } else {
+ LOG.warn(
+ "Failed to get data for path {}: {}",
+ response.getPath(),
+ response.getResultCode());
+ }
+ }
+ return result;
+ }
+
/** Update the table in ZK. */
public void updateTable(TablePath tablePath, TableRegistration
tableRegistration)
throws Exception {
@@ -455,6 +654,27 @@ public class ZooKeeperClient implements AutoCloseable {
return new HashSet<>(getChildren(path));
}
+ /** Get the partitions of tables in ZK. */
+ public Map<TablePath, List<String>> getPartitionsForTables(List<TablePath>
tablePaths)
+ throws Exception {
+ Map<String, TablePath> path2TablePathMap =
+
tablePaths.stream().collect(Collectors.toMap(PartitionsZNode::path, path ->
path));
+
+ List<ZkGetChildrenResponse> responses =
getChildrenAsync(path2TablePathMap.keySet());
+ Map<TablePath, List<String>> result = new HashMap<>();
+ for (ZkGetChildrenResponse response : responses) {
+ if (response.getResultCode() == KeeperException.Code.OK) {
+ result.put(path2TablePathMap.get(response.getPath()),
response.getChildren());
+ } else {
+ LOG.warn(
+ "Failed to get children for path {}: {}",
+ response.getPath(),
+ response.getResultCode());
+ }
+ }
+ return result;
+ }
+
/** Get the partition and the id for the partitions of a table in ZK. */
public Map<String, Long> getPartitionNameAndIds(TablePath tablePath)
throws Exception {
Map<String, Long> partitions = new HashMap<>();
@@ -466,6 +686,45 @@ public class ZooKeeperClient implements AutoCloseable {
return partitions;
}
+ /** Get the partition and the id for the partitions of tables in ZK, as table path -> (partition name -> partition id). */
+ public Map<TablePath, Map<String, Long>> getPartitionNameAndIdsForTables(
+ List<TablePath> tablePaths) throws Exception {
+ Map<TablePath, Map<String, Long>> result = new HashMap<>();
+ // Step 1: list the partition names of every requested table via getPartitionsForTables.
+ Map<TablePath, List<String>> tablePath2Partitions =
getPartitionsForTables(tablePaths);
+
+ // each TablePath has a list of partitions; index every partition's ZK path back to its table and partition name
+ Map<String, TablePath> zkPath2TablePath = new HashMap<>();
+ Map<String, String> zkPath2PartitionName = new HashMap<>();
+ for (Map.Entry<TablePath, List<String>> entry :
tablePath2Partitions.entrySet()) {
+ TablePath tablePath = entry.getKey();
+ List<String> partitions = entry.getValue();
+ for (String partitionName : partitions) {
+ zkPath2TablePath.put(PartitionZNode.path(tablePath,
partitionName), tablePath);
+ zkPath2PartitionName.put(
+ PartitionZNode.path(tablePath, partitionName),
partitionName);
+ }
+ }
+ // Step 2: read all partition znodes through getDataAsync and decode the partition id from each OK response.
+ List<ZkGetDataResponse> responses =
getDataAsync(zkPath2TablePath.keySet());
+ for (ZkGetDataResponse response : responses) {
+ if (response.getResultCode() == KeeperException.Code.OK) {
+ String zkPath = response.getPath();
+ TablePath tablePath = zkPath2TablePath.get(zkPath);
+ String partitionName = zkPath2PartitionName.get(zkPath);
+ long partitionId =
PartitionZNode.decode(response.getData()).getPartitionId();
+ result.computeIfAbsent(tablePath, k -> new HashMap<>())
+ .put(partitionName, partitionId);
+ } else { // any non-OK result code is only logged; that partition is skipped
+ LOG.warn(
+ "Failed to get data for path {}: {}",
+ response.getPath(),
+ response.getResultCode());
+ }
+ }
+ return result; // NOTE: a table gets an entry only once one of its partitions is read with OK, so tables with no readable partitions are absent
+ }
+
/** Get the partition and the id for the partitions of a table in ZK by
partition spec. */
public Map<String, Long> getPartitionNameAndIds(
TablePath tablePath,
@@ -887,7 +1146,7 @@ public class ZooKeeperClient implements AutoCloseable {
* @param resource the resource whose ACL should be deleted
* @throws Exception if there is an error accessing ZooKeeper
*/
- public void contitionalDeleteResourceAcl(Resource resource, int zkVersion)
throws Exception {
+ public void conditionalDeleteResourceAcl(Resource resource, int zkVersion)
throws Exception {
String path = ResourceAclNode.path(resource);
zkClient.delete().withVersion(zkVersion).forPath(path);
}
@@ -920,6 +1179,32 @@ public class ZooKeeperClient implements AutoCloseable {
}
}
+ /**
+ * Gets the child nodes at given zk node paths asynchronously: wraps each path in a {@code ZkGetChildrenRequest} and delegates to {@code handleRequestAsync}.
+ *
+ * @param paths the paths to list children
+ * @return list of async responses for each path, built with {@code ZkGetChildrenResponse::create}; callers check each response's result code
+ * @throws Exception if there is an error during the operation
+ */
+ public List<ZkGetChildrenResponse> getChildrenAsync(Collection<String>
paths) throws Exception {
+ List<ZkGetChildrenRequest> requests =
+
paths.stream().map(ZkGetChildrenRequest::new).collect(Collectors.toList());
+ return handleRequestAsync(requests, ZkGetChildrenResponse::create);
+ }
+
+ /**
+ * Gets the data of given zk node paths asynchronously: wraps each path in a {@code ZkGetDataRequest} and delegates to {@code handleRequestAsync}.
+ *
+ * @param paths the paths to fetch data
+ * @return list of async responses for each path, built with {@code ZkGetDataResponse::create}; callers check each response's result code
+ * @throws Exception if there is an error during the operation
+ */
+ public List<ZkGetDataResponse> getDataAsync(Collection<String> paths)
throws Exception {
+ List<ZkGetDataRequest> requests =
+
paths.stream().map(ZkGetDataRequest::new).collect(Collectors.toList());
+ return handleRequestAsync(requests, ZkGetDataResponse::create);
+ }
+
/** Gets the data and stat of a given zk node path. */
public Optional<Stat> getStat(String path) throws Exception {
try {
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperUtils.java
b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperUtils.java
index 485b0ea14..c74ff03ad 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperUtils.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperUtils.java
@@ -120,7 +120,7 @@ public class ZooKeeperUtils {
}
}
return new ZooKeeperClient(
- startZookeeperClient(curatorFrameworkBuilder,
fatalErrorHandler));
+ startZookeeperClient(curatorFrameworkBuilder,
fatalErrorHandler), configuration);
}
/**
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java
index 28d33f70d..9c15121e3 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java
@@ -131,52 +131,74 @@ class ZooKeeperClientTest {
// get tablet server1
assertThat(zookeeperClient.getTabletServer(1)).contains(registration1);
assertThat(zookeeperClient.getTabletServer(2)).contains(registration2);
+ // fetch all tablet servers
+ assertThat(zookeeperClient.getTabletServers(new int[] {1, 2}))
+ .containsValues(registration1, registration2);
}
@Test
void testTabletAssignments() throws Exception {
- long tableId = 1;
+ long tableId1 = 1;
+ long tableId2 = 2;
// try to get tablet assignment, should return empty
- assertThat(zookeeperClient.getTableAssignment(tableId)).isEmpty();
+ assertThat(zookeeperClient.getTableAssignment(tableId1)).isEmpty();
+ assertThat(zookeeperClient.getTableAssignment(tableId2)).isEmpty();
- TableAssignment tableAssignment =
+ TableAssignment tableAssignment1 =
TableAssignment.builder()
.add(0, BucketAssignment.of(1, 4, 5))
.add(1, BucketAssignment.of(2, 3))
.build();
- zookeeperClient.registerTableAssignment(tableId, tableAssignment);
-
assertThat(zookeeperClient.getTableAssignment(tableId)).contains(tableAssignment);
+ TableAssignment tableAssignment2 =
+ TableAssignment.builder()
+ .add(0, BucketAssignment.of(1, 2))
+ .add(1, BucketAssignment.of(3, 4, 5))
+ .build();
+ zookeeperClient.registerTableAssignment(tableId1, tableAssignment1);
+ zookeeperClient.registerTableAssignment(tableId2, tableAssignment2);
+
assertThat(zookeeperClient.getTableAssignment(tableId1)).contains(tableAssignment1);
+
assertThat(zookeeperClient.getTableAssignment(tableId2)).contains(tableAssignment2);
+
assertThat(zookeeperClient.getTablesAssignments(Arrays.asList(tableId1,
tableId2)))
+ .containsValues(tableAssignment1, tableAssignment2);
// test update
- TableAssignment tableAssignment2 =
+ TableAssignment tableAssignment3 =
TableAssignment.builder().add(3, BucketAssignment.of(1,
5)).build();
- zookeeperClient.updateTableAssignment(tableId, tableAssignment2);
-
assertThat(zookeeperClient.getTableAssignment(tableId)).contains(tableAssignment2);
+ zookeeperClient.updateTableAssignment(tableId1, tableAssignment3);
+
assertThat(zookeeperClient.getTableAssignment(tableId1)).contains(tableAssignment3);
// test delete
- zookeeperClient.deleteTableAssignment(tableId);
- assertThat(zookeeperClient.getTableAssignment(tableId)).isEmpty();
+ zookeeperClient.deleteTableAssignment(tableId1);
+ assertThat(zookeeperClient.getTableAssignment(tableId1)).isEmpty();
}
@Test
void testLeaderAndIsr() throws Exception {
// try to get bucket leadership, should return empty
- TableBucket tableBucket = new TableBucket(1, 1);
- assertThat(zookeeperClient.getLeaderAndIsr(tableBucket)).isEmpty();
+ TableBucket tableBucket1 = new TableBucket(1, 1);
+ TableBucket tableBucket2 = new TableBucket(1, 2);
+ assertThat(zookeeperClient.getLeaderAndIsr(tableBucket1)).isEmpty();
+ assertThat(zookeeperClient.getLeaderAndIsr(tableBucket2)).isEmpty();
// try to register bucket leaderAndIsr
- LeaderAndIsr leaderAndIsr = new LeaderAndIsr(1, 10, Arrays.asList(1,
2, 3), 100, 1000);
- zookeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr);
-
assertThat(zookeeperClient.getLeaderAndIsr(tableBucket)).hasValue(leaderAndIsr);
+ LeaderAndIsr leaderAndIsr1 = new LeaderAndIsr(1, 10, Arrays.asList(1,
2, 3), 100, 1000);
+ LeaderAndIsr leaderAndIsr2 = new LeaderAndIsr(2, 10, Arrays.asList(4,
5, 6), 100, 1000);
+
+ zookeeperClient.registerLeaderAndIsr(tableBucket1, leaderAndIsr1);
+ zookeeperClient.registerLeaderAndIsr(tableBucket2, leaderAndIsr2);
+
assertThat(zookeeperClient.getLeaderAndIsr(tableBucket1)).hasValue(leaderAndIsr1);
+
assertThat(zookeeperClient.getLeaderAndIsr(tableBucket2)).hasValue(leaderAndIsr2);
+
assertThat(zookeeperClient.getLeaderAndIsrs(Arrays.asList(tableBucket1,
tableBucket2)))
+ .containsValues(leaderAndIsr1, leaderAndIsr2);
// test update
- leaderAndIsr = new LeaderAndIsr(2, 20, Collections.emptyList(), 200,
2000);
- zookeeperClient.updateLeaderAndIsr(tableBucket, leaderAndIsr);
-
assertThat(zookeeperClient.getLeaderAndIsr(tableBucket)).hasValue(leaderAndIsr);
+ leaderAndIsr1 = new LeaderAndIsr(2, 20, Collections.emptyList(), 200,
2000);
+ zookeeperClient.updateLeaderAndIsr(tableBucket1, leaderAndIsr1);
+
assertThat(zookeeperClient.getLeaderAndIsr(tableBucket1)).hasValue(leaderAndIsr1);
// test delete
- zookeeperClient.deleteLeaderAndIsr(tableBucket);
- assertThat(zookeeperClient.getLeaderAndIsr(tableBucket)).isEmpty();
+ zookeeperClient.deleteLeaderAndIsr(tableBucket1);
+ assertThat(zookeeperClient.getLeaderAndIsr(tableBucket1)).isEmpty();
}
@ParameterizedTest
@@ -280,15 +302,18 @@ class ZooKeeperClientTest {
@Test
void testTable() throws Exception {
- TablePath tablePath = TablePath.of("db", "tb");
- assertThat(zookeeperClient.getTable(tablePath)).isEmpty();
+ TablePath tablePath1 = TablePath.of("db", "tb1");
+ TablePath tablePath2 = TablePath.of("db", "tb2");
+
+ assertThat(zookeeperClient.getTable(tablePath1)).isEmpty();
+ assertThat(zookeeperClient.getTable(tablePath2)).isEmpty();
// register table.
Map<String, String> options = new HashMap<>();
options.put("option-1", "100");
options.put("option-2", "200");
long currentMillis = System.currentTimeMillis();
- TableRegistration tableReg =
+ TableRegistration tableReg1 =
new TableRegistration(
11,
"first table",
@@ -298,32 +323,49 @@ class ZooKeeperClientTest {
Collections.singletonMap("custom-1", "100"),
currentMillis,
currentMillis);
+ TableRegistration tableReg2 =
+ new TableRegistration(
+ 12,
+ "second table",
+ Arrays.asList("a", "b"),
+ new TableDescriptor.TableDistribution(16,
Collections.singletonList("a")),
+ options,
+ Collections.singletonMap("custom-2", "200"),
+ currentMillis,
+ currentMillis);
+ zookeeperClient.registerTable(tablePath1, tableReg1);
+ zookeeperClient.registerTable(tablePath2, tableReg2);
- zookeeperClient.registerTable(tablePath, tableReg);
- Optional<TableRegistration> optionalTable =
zookeeperClient.getTable(tablePath);
- assertThat(optionalTable.isPresent()).isTrue();
- assertThat(optionalTable.get()).isEqualTo(tableReg);
+ Optional<TableRegistration> optionalTable1 =
zookeeperClient.getTable(tablePath1);
+ Optional<TableRegistration> optionalTable2 =
zookeeperClient.getTable(tablePath2);
+
+ assertThat(optionalTable1.isPresent()).isTrue();
+ assertThat(optionalTable1.get()).isEqualTo(tableReg1);
+ assertThat(optionalTable2.isPresent()).isTrue();
+ assertThat(optionalTable2.get()).isEqualTo(tableReg2);
+ assertThat(zookeeperClient.getTables(Arrays.asList(tablePath1,
tablePath2)))
+ .containsValues(tableReg1, tableReg2);
// update table.
currentMillis = System.currentTimeMillis();
- tableReg =
+ tableReg1 =
new TableRegistration(
- 22,
- "second table",
+ 13,
+ "third table",
Arrays.asList("a", "b"),
new TableDescriptor.TableDistribution(16,
Collections.singletonList("a")),
options,
- Collections.singletonMap("custom-2", "200"),
+ Collections.singletonMap("custom-3", "300"),
currentMillis,
currentMillis);
- zookeeperClient.updateTable(tablePath, tableReg);
- optionalTable = zookeeperClient.getTable(tablePath);
- assertThat(optionalTable.isPresent()).isTrue();
- assertThat(optionalTable.get()).isEqualTo(tableReg);
+ zookeeperClient.updateTable(tablePath1, tableReg1);
+ optionalTable1 = zookeeperClient.getTable(tablePath1);
+ assertThat(optionalTable1.isPresent()).isTrue();
+ assertThat(optionalTable1.get()).isEqualTo(tableReg1);
// delete table.
- zookeeperClient.deleteTable(tablePath);
- assertThat(zookeeperClient.getTable(tablePath)).isEmpty();
+ zookeeperClient.deleteTable(tablePath1);
+ assertThat(zookeeperClient.getTable(tablePath1)).isEmpty();
}
@Test
@@ -496,6 +538,8 @@ class ZooKeeperClientTest {
assertThat(partition.getPartitionId()).isEqualTo(1L);
partition = zookeeperClient.getPartition(tablePath, "p2").get();
assertThat(partition.getPartitionId()).isEqualTo(2L);
+
assertThat(zookeeperClient.getPartitionsForTables(Arrays.asList(tablePath)))
+ .containsValues(new ArrayList<>(partitions));
// test delete partition
zookeeperClient.deletePartition(tablePath, "p1");
diff --git a/website/docs/maintenance/configuration.md
b/website/docs/maintenance/configuration.md
index c2b7cb959..82c34e013 100644
--- a/website/docs/maintenance/configuration.md
+++ b/website/docs/maintenance/configuration.md
@@ -82,6 +82,7 @@ during the Fluss cluster working.
| zookeeper.client.tolerate-suspended-connections | Boolean | false |
Defines whether a suspended ZooKeeper connection will be treated as an error
that causes the leader information to be invalidated or not. In case you set
this option to %s, Fluss will wait until a ZooKeeper connection is marked as
lost before it revokes the leadership of components. This has the effect that
Fluss is more resilient against temporary connection instabilities at the cost
of running more likely into tim [...]
| zookeeper.client.ensemble-tracker | Boolean | true |
Defines whether Curator should enable ensemble tracker. This can be useful in
certain scenarios in which CuratorFramework is accessing to ZK clusters via
load balancer or Virtual IPs. Default Curator EnsembleTracking logic watches
`CuratorEventType.GET_CONFIG` events and changes ZooKeeper connection string.
It is not desired behaviour when ZooKeeper is running under the Virtual IPs.
Under certain configurations Ense [...]
| zookeeper.client.config-path | String | (None) | The
file path from which the ZooKeeper client reads its configuration. This allows
each ZooKeeper client instance to load its own configuration file, instead of
relying on shared JVM-level environment settings. This enables fine-grained
control over ZooKeeper client behavior.
[...]
+| zookeeper.client.max-inflight-requests | Integer | 100 | The
maximum number of unacknowledged requests the client will send to ZooKeeper
before blocking.
[...]
## Netty