This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 9b27ba2c9b3 [fix][broker]Fix never recovered metadata store bad
version issue if received a large response from ZK (#24580)
9b27ba2c9b3 is described below
commit 9b27ba2c9b3517a5d22e2437c7e7fbeda7f56482
Author: fengyubiao <[email protected]>
AuthorDate: Thu Sep 11 18:58:27 2025 +0800
[fix][broker]Fix never recovered metadata store bad version issue if
received a large response from ZK (#24580)
(cherry picked from commit 1cb64a921c946bfcebb16890a38b9e748ecb0e62)
---
.../org/apache/pulsar/broker/PulsarService.java | 3 +
.../zookeeper/DefaultMetadataNodeSizeStats.java | 262 +++++++++
.../broker/service/BrokerServiceChaosTest.java | 3 +
.../CanReconnectZKClientPulsarServiceBaseTest.java | 4 +-
.../ZKMetadataStoreBatchIOperationTest.java | 105 ++++
.../DefaultMetadataNodeSizeStatsSplitPathTest.java | 273 +++++++++
.../DefaultMetadataNodeSizeStatsTest.java | 484 ++++++++++++++++
.../metadata/api/DummyMetadataNodeSizeStats.java | 43 ++
.../pulsar/metadata/api/MetadataNodeSizeStats.java | 56 ++
.../pulsar/metadata/api/MetadataStoreConfig.java | 5 +
.../metadata/impl/AbstractMetadataStore.java | 17 +-
.../metadata/impl/LocalMemoryMetadataStore.java | 3 +-
.../pulsar/metadata/impl/RocksdbMetadataStore.java | 3 +-
.../pulsar/metadata/impl/ZKMetadataStore.java | 40 +-
.../batching/AbstractBatchedMetadataStore.java | 42 +-
.../DefaultMetadataStoreBatchStrategy.java | 68 +++
.../impl/batching/MetadataStoreBatchStrategy.java | 33 ++
.../batching/ZKMetadataStoreBatchStrategy.java | 131 +++++
.../impl/MetadataStoreFactoryImplTest.java | 2 +-
.../batching/ZKMetadataStoreBatchStrategyTest.java | 637 +++++++++++++++++++++
.../java/org/apache/zookeeper/MockZooKeeper.java | 8 +
.../org/apache/zookeeper/MockZooKeeperSession.java | 9 +
22 files changed, 2188 insertions(+), 43 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 66e5b495076..dd726ee9a56 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -179,6 +179,7 @@ import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
import org.apache.pulsar.websocket.WebSocketService;
+import org.apache.pulsar.zookeeper.DefaultMetadataNodeSizeStats;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.slf4j.Logger;
@@ -381,6 +382,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
.batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb())
.metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
.synchronizer(synchronizer)
+ .nodeSizeStats(new DefaultMetadataNodeSizeStats())
.build());
}
@@ -1175,6 +1177,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
.batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb())
.synchronizer(synchronizer)
.metadataStoreName(MetadataStoreConfig.METADATA_STORE)
+ .nodeSizeStats(new DefaultMetadataNodeSizeStats())
.build());
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/DefaultMetadataNodeSizeStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/DefaultMetadataNodeSizeStats.java
new file mode 100644
index 00000000000..433d47624f1
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/zookeeper/DefaultMetadataNodeSizeStats.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.zookeeper;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataNodeSizeStats;
+
+/**
+ * {@link MetadataNodeSizeStats} implementation that classifies metadata paths into a fixed set of
+ * {@link PathType}s and remembers, for each type, the largest value size and the largest children
+ * count that has been recorded so far.
+ *
+ * <p>Paths that cannot be classified are ignored when recording and report {@link #UNSET} when queried.
+ * The per-type maxima are updated with an atomic accumulate, so a concurrent larger value is never
+ * overwritten by a smaller one.
+ */
+@Slf4j
+public class DefaultMetadataNodeSizeStats implements MetadataNodeSizeStats {
+
+    /** Reported while nothing has been recorded for a path type, and for unclassified paths. */
+    public static final int UNSET = -1;
+
+    /** Shared result for null or one-character paths; {@link #splitPath(String)} never writes to it. */
+    private static final SplitPathRes MEANINGLESS_SPLIT_PATH_RES = new SplitPathRes();
+    /** Per-thread scratch result that {@link #splitPath(String)} resets and reuses on every call. */
+    private static final FastThreadLocal<SplitPathRes> LOCAL_SPLIT_PATH_RES =
+            new FastThreadLocal<SplitPathRes>() {
+                @Override
+                protected SplitPathRes initialValue() {
+                    return new SplitPathRes();
+                }
+            };
+    /** Largest recorded value size, indexed by {@link PathType#ordinal()}. */
+    private final AtomicReferenceArray<Integer> maxSizeMapping;
+    /** Largest recorded children count, indexed by {@link PathType#ordinal()}. */
+    private final AtomicReferenceArray<Integer> maxChildrenCountMapping;
+
+    /** Creates an instance in which every path type starts at {@link #UNSET}. */
+    public DefaultMetadataNodeSizeStats() {
+        int pathTypeCount = PathType.values().length;
+        maxSizeMapping = new AtomicReferenceArray<>(pathTypeCount);
+        maxChildrenCountMapping = new AtomicReferenceArray<>(pathTypeCount);
+        for (int i = 0; i < pathTypeCount; i++) {
+            maxSizeMapping.set(i, UNSET);
+            maxChildrenCountMapping.set(i, UNSET);
+        }
+    }
+
+    /**
+     * Records the size of a value written to {@code path}.
+     *
+     * @param path the metadata path that was written
+     * @param data the written payload; a {@code null} payload is ignored
+     */
+    @Override
+    public void recordPut(String path, byte[] data) {
+        if (data == null) {
+            return;
+        }
+        updateMax(maxSizeMapping, getPathType(path), data.length);
+    }
+
+    /**
+     * Records the size of a value read from {@code path}.
+     *
+     * @param path the metadata path that was read
+     * @param getResult the read result; a {@code null} result or a result without a value is ignored
+     */
+    @Override
+    public void recordGetRes(String path, GetResult getResult) {
+        if (getResult == null) {
+            return;
+        }
+        byte[] value = getResult.getValue();
+        if (value == null) {
+            return;
+        }
+        updateMax(maxSizeMapping, getPathType(path), value.length);
+    }
+
+    /**
+     * Records the number of children listed under {@code path}.
+     *
+     * @param path the metadata path whose children were listed
+     * @param list the children names; {@code null} or empty is recorded as a count of 0
+     */
+    @Override
+    public void recordGetChildrenRes(String path, List<String> list) {
+        int size = CollectionUtils.isEmpty(list) ? 0 : list.size();
+        updateMax(maxChildrenCountMapping, getPathType(path), size);
+    }
+
+    /**
+     * @param path a metadata path
+     * @return the largest value size recorded for the path type of {@code path}, or {@link #UNSET} if the
+     *         path cannot be classified or nothing has been recorded for its type
+     */
+    @Override
+    public int getMaxSizeOfSameResourceType(String path) {
+        PathType pathType = getPathType(path);
+        if (pathType == PathType.UNKNOWN) {
+            return UNSET;
+        }
+        return maxSizeMapping.get(pathType.ordinal());
+    }
+
+    /**
+     * @param path a metadata path
+     * @return the largest children count recorded for the path type of {@code path}, or {@link #UNSET} if
+     *         the path cannot be classified or nothing has been recorded for its type
+     */
+    @Override
+    public int getMaxChildrenCountOfSameResourceType(String path) {
+        PathType pathType = getPathType(path);
+        if (pathType == PathType.UNKNOWN) {
+            return UNSET;
+        }
+        return maxChildrenCountMapping.get(pathType.ordinal());
+    }
+
+    /**
+     * Raises the slot of {@code pathType} to {@code candidate} if that is larger than the current value.
+     * Nothing is stored for {@link PathType#UNKNOWN}.
+     */
+    private static void updateMax(AtomicReferenceArray<Integer> mapping, PathType pathType, int candidate) {
+        if (pathType == PathType.UNKNOWN) {
+            return;
+        }
+        // A plain get + set would let a concurrent larger value be overwritten by a smaller one.
+        mapping.accumulateAndGet(pathType.ordinal(), candidate, Integer::max);
+    }
+
+    /** Classifies {@code path} by its first segment; paths with fewer than two segments are UNKNOWN. */
+    private PathType getPathType(String path) {
+        SplitPathRes splitPathRes = splitPath(path);
+        if (splitPathRes.partCount < 2) {
+            return PathType.UNKNOWN;
+        }
+        return switch (splitPathRes.parts[0]) {
+            case "admin" -> getAdminPathType(splitPathRes);
+            case "managed-ledgers" -> getMlPathType(splitPathRes);
+            case "loadbalance" -> getLoadBalancePathType(splitPathRes);
+            case "namespace" -> getBundleOwnerPathType(splitPathRes);
+            case "schemas" -> getSchemaPathType(splitPathRes);
+            default -> PathType.UNKNOWN;
+        };
+    }
+
+    /** Classifies a path whose first segment is "admin" by its second segment and segment count. */
+    private PathType getAdminPathType(SplitPathRes splitPathRes) {
+        return switch (splitPathRes.parts[1]) {
+            case "clusters" -> PathType.CLUSTER;
+            case "policies" -> switch (splitPathRes.partCount) {
+                case 3 -> PathType.TENANT;
+                case 4 -> PathType.NAMESPACE_POLICIES;
+                default -> PathType.UNKNOWN;
+            };
+            case "local-policies" -> switch (splitPathRes.partCount) {
+                // Tracked separately from "policies" so that one kind does not inflate the other's maximum.
+                case 4 -> PathType.LOCAL_POLICIES;
+                default -> PathType.UNKNOWN;
+            };
+            case "partitioned-topics" -> switch (splitPathRes.partCount) {
+                case 5 -> PathType.PARTITIONED_NAMESPACE;
+                case 6 -> PathType.PARTITIONED_TOPIC;
+                default -> PathType.UNKNOWN;
+            };
+            default -> PathType.UNKNOWN;
+        };
+    }
+
+    /** Classifies a path whose first segment is "namespace" by its segment count. */
+    private PathType getBundleOwnerPathType(SplitPathRes splitPathRes) {
+        return switch (splitPathRes.partCount) {
+            case 3 -> PathType.BUNDLE_OWNER_NAMESPACE;
+            case 4 -> PathType.BUNDLE_OWNER;
+            default -> PathType.UNKNOWN;
+        };
+    }
+
+    /** Classifies a path whose first segment is "schemas"; only four-segment paths are recognized. */
+    private PathType getSchemaPathType(SplitPathRes splitPathRes) {
+        if (splitPathRes.partCount == 4) {
+            return PathType.TOPIC_SCHEMA;
+        }
+        return PathType.UNKNOWN;
+    }
+
+    /** Classifies a path whose first segment is "loadbalance" by its second segment and segment count. */
+    private PathType getLoadBalancePathType(SplitPathRes splitPathRes) {
+        return switch (splitPathRes.parts[1]) {
+            case "brokers" -> switch (splitPathRes.partCount) {
+                case 2 -> PathType.BROKERS;
+                case 3 -> PathType.BROKER;
+                default -> PathType.UNKNOWN;
+            };
+            case "bundle-data" -> switch (splitPathRes.partCount) {
+                case 4 -> PathType.BUNDLE_NAMESPACE;
+                case 5 -> PathType.BUNDLE_DATA;
+                default -> PathType.UNKNOWN;
+            };
+            case "broker-time-average" -> switch (splitPathRes.partCount) {
+                case 3 -> PathType.BROKER_TIME_AVERAGE;
+                default -> PathType.UNKNOWN;
+            };
+            case "leader" -> PathType.BROKER_LEADER;
+            default -> PathType.UNKNOWN;
+        };
+    }
+
+    /** Classifies a path whose first segment is "managed-ledgers" by its segment count. */
+    private PathType getMlPathType(SplitPathRes splitPathRes) {
+        return switch (splitPathRes.partCount) {
+            case 4 -> PathType.ML_NAMESPACE;
+            case 5 -> PathType.TOPIC;
+            // v2 subscription and v1 topic.
+            case 6 -> PathType.SUBSCRIPTION;
+            // v1 subscription.
+            case 7 -> PathType.SUBSCRIPTION;
+            default -> PathType.UNKNOWN;
+        };
+    }
+
+    /** Resource kinds that statistics are kept for; the ordinal is the index into the mapping arrays. */
+    enum PathType {
+        // admin
+        CLUSTER,
+        TENANT,
+        NAMESPACE_POLICIES,
+        LOCAL_POLICIES,
+        // load-balance
+        BROKERS,
+        BROKER,
+        BROKER_LEADER,
+        BUNDLE_NAMESPACE,
+        BUNDLE_DATA,
+        BROKER_TIME_AVERAGE,
+        BUNDLE_OWNER_NAMESPACE,
+        BUNDLE_OWNER,
+        // topic schema
+        TOPIC_SCHEMA,
+        // partitioned topics.
+        PARTITIONED_TOPIC,
+        PARTITIONED_NAMESPACE,
+        // managed ledger.
+        ML_NAMESPACE,
+        TOPIC,
+        SUBSCRIPTION,
+        UNKNOWN;
+    }
+
+    /** Result of {@link #splitPath(String)}: the first two segments plus the total segment count. */
+    static class SplitPathRes {
+        String[] parts = new String[2];
+        int partCount;
+
+        /** Clears both kept segments and the count so the instance can be reused. */
+        void reset() {
+            parts[0] = null;
+            parts[1] = null;
+            partCount = 0;
+        }
+    }
+
+    /**
+     * Splits the path by the delimiter '/', counts the non-empty segments and keeps only the first two.
+     *
+     * <p>Empty segments (leading, trailing or consecutive delimiters) are not counted. The returned
+     * object is either a shared constant (for {@code null} or one-character paths) or a per-thread
+     * instance that the next call on the same thread resets, so callers must not retain it.
+     *
+     * @param path the path to split, may be {@code null}
+     * @return the split result, never {@code null}
+     */
+    static SplitPathRes splitPath(String path) {
+        if (path == null || path.length() <= 1) {
+            return MEANINGLESS_SPLIT_PATH_RES;
+        }
+        SplitPathRes res = LOCAL_SPLIT_PATH_RES.get();
+        res.reset();
+        String[] parts = res.parts;
+        char delimiter = '/';
+        int length = path.length();
+        int start = 0;
+        int count = 0;
+        for (int i = 0; i < length; i++) {
+            if (path.charAt(i) == delimiter) {
+                // An empty segment (leading, trailing or repeated delimiter): skip it.
+                if (start == i) {
+                    start = i + 1;
+                    continue;
+                }
+                // Only keep the first two parts.
+                if (count < 2) {
+                    parts[count] = path.substring(start, i);
+                }
+                start = i + 1;
+                count++;
+            }
+        }
+        // The last segment when the path does not end with a delimiter.
+        if (start < length) {
+            if (count < 2) {
+                parts[count] = path.substring(start);
+            }
+            count++;
+        }
+        res.partCount = count;
+        return res;
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java
index 5650fe6e72f..96df607d7e4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java
@@ -97,5 +97,8 @@ public class BrokerServiceChaosTest extends
CanReconnectZKClientPulsarServiceBas
PartitionedTopicMetadata partitionedTopicMetadata3 =
pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName2,
true).get();
assertEquals(partitionedTopicMetadata3.partitions, 3);
+
+ // cleanup.
+ stopLocalMetadataStoreConnectionTermination();
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
index 5a8f946fe2c..88585b2c1ae 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
@@ -191,7 +191,7 @@ public abstract class
CanReconnectZKClientPulsarServiceBaseTest extends TestRetr
log.info("--- OneWayReplicatorTestBase::setup completed ---");
}
- private void setConfigDefaults(ServiceConfiguration config, String
clusterName,
+ protected void setConfigDefaults(ServiceConfiguration config, String
clusterName,
LocalBookkeeperEnsemble bookkeeperEnsemble,
ZookeeperServerTest brokerConfigZk) {
config.setClusterName(clusterName);
config.setAdvertisedAddress("localhost");
@@ -217,8 +217,6 @@ public abstract class
CanReconnectZKClientPulsarServiceBaseTest extends TestRetr
markCurrentSetupNumberCleaned();
log.info("--- Shutting down ---");
- stopLocalMetadataStoreConnectionTermination();
-
// Stop brokers.
if (client != null) {
client.close();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZKMetadataStoreBatchIOperationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZKMetadataStoreBatchIOperationTest.java
new file mode 100644
index 00000000000..bab495bb5f0
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZKMetadataStoreBatchIOperationTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.pulsar.zookeeper.ZookeeperServerTest;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Broker test that runs with a small ZooKeeper {@code jute.maxbuffer} and with metadata-store batching
+ * enabled, then issues many children listings next to a subscription creation and expects the
+ * subscription creation to complete.
+ */
+@Slf4j
+@Test(groups = "broker")
+public class ZKMetadataStoreBatchIOperationTest extends CanReconnectZKClientPulsarServiceBaseTest {
+
+    private static final String JUTE_MAX_BUFFER = "jute.maxbuffer";
+
+    /** Value of the system property before this class changed it; {@code null} if it was not set. */
+    private String previousJuteMaxBuffer;
+
+    @Override
+    @BeforeClass(alwaysRun = true, timeOut = 300000)
+    public void setup() throws Exception {
+        // Shrink the ZK packet limit before the services start so that a large response is easy to produce.
+        previousJuteMaxBuffer = System.setProperty(JUTE_MAX_BUFFER, "16384");
+        super.setup();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true, timeOut = 300000)
+    public void cleanup() throws Exception {
+        // Put the property back the way it was found so that later tests in the same JVM are unaffected.
+        if (previousJuteMaxBuffer == null) {
+            System.clearProperty(JUTE_MAX_BUFFER);
+        } else {
+            System.setProperty(JUTE_MAX_BUFFER, previousJuteMaxBuffer);
+        }
+        super.cleanup();
+    }
+
+    /** Applies the parent's defaults and then turns on metadata-store batching. */
+    @Override
+    protected void setConfigDefaults(ServiceConfiguration config, String clusterName,
+                                     LocalBookkeeperEnsemble bookkeeperEnsemble,
+                                     ZookeeperServerTest brokerConfigZk) {
+        super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, brokerConfigZk);
+        config.setMetadataStoreBatchingEnabled(true);
+        config.setMetadataStoreBatchingMaxOperations(1000);
+        config.setMetadataStoreBatchingMaxSizeKb(128);
+        config.setMetadataStoreBatchingMaxDelayMillis(20);
+    }
+
+    @Test(timeOut = 1000 * 60 * 2)
+    public void testReceivedHugeResponse() throws Exception {
+        int maxPacketLen = Integer.parseInt(System.getProperty(JUTE_MAX_BUFFER,
+                ZKClientConfig.CLIENT_MAX_PACKET_LENGTH_DEFAULT + ""));
+        String defaultTp = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(defaultTp);
+
+        // NOTE(review): 834 looks like the expected children-listing size of one namespace holding a
+        // 16-partition topic (see the "Packet len" log below) - confirm before changing either number.
+        int nsCount = (maxPacketLen / 834) + 1;
+        log.info("Try to create {} namespaces", nsCount);
+        String[] nsArray = new String[nsCount];
+        String[] tpArray = new String[nsCount];
+        for (int i = 0; i < nsCount; i++) {
+            String ns = BrokerTestUtil.newUniqueName(defaultTenant + "/ns");
+            nsArray[i] = ns;
+            String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp");
+            tpArray[i] = tp;
+            admin.namespaces().createNamespace(ns);
+            admin.topics().createPartitionedTopic(tp, 16);
+        }
+
+        int len = pulsar.getLocalMetadataStore().getChildren("/managed-ledgers/" + nsArray[0] + "/persistent")
+                .join().stream().mapToInt(String::length).sum();
+        log.info("Packet len of list topics of per namespace: {}", len);
+
+        // Fire the subscription creation and all listings back to back, without waiting on the listings.
+        long start = System.currentTimeMillis();
+        CompletableFuture<Void> createSubscriptionFuture = admin.topics()
+                .createSubscriptionAsync(defaultTp, "s1", MessageId.earliest);
+        for (int i = 0; i < nsCount; i++) {
+            pulsar.getLocalMetadataStore().getChildren("/managed-ledgers/" + nsArray[i] + "/persistent");
+        }
+        log.info("Send multi ZK operations in {} ms. If it is larger than 20, may can not reproduce the issue",
+                (System.currentTimeMillis() - start));
+        client.newConsumer().topic(defaultTp).subscriptionName("s1").subscribe().close();
+        createSubscriptionFuture.get(10, TimeUnit.SECONDS);
+
+        // cleanup.
+        for (int i = 0; i < nsCount; i++) {
+            admin.topics().deletePartitionedTopic(tpArray[i]);
+        }
+        for (int i = 0; i < nsCount; i++) {
+            admin.namespaces().deleteNamespace(nsArray[i]);
+        }
+        admin.topics().delete(defaultTp);
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/DefaultMetadataNodeSizeStatsSplitPathTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/DefaultMetadataNodeSizeStatsSplitPathTest.java
new file mode 100644
index 00000000000..85d98dd7a6a
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/DefaultMetadataNodeSizeStatsSplitPathTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.zookeeper;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for {@link DefaultMetadataNodeSizeStats#splitPath(String)}: the number of non-empty path
+ * segments that is reported and the first two segments that are kept.
+ */
+public class DefaultMetadataNodeSizeStatsSplitPathTest {
+
+    /**
+     * Splits {@code path} and verifies the segment count and the two kept segments.
+     *
+     * @param path the input handed to {@code splitPath}, may be {@code null}
+     * @param expectedPartCount the expected number of segments
+     * @param expectedFirst the expected first segment, or {@code null} if none must be kept
+     * @param expectedSecond the expected second segment, or {@code null} if none must be kept
+     */
+    private static void assertSplit(String path, int expectedPartCount,
+                                    String expectedFirst, String expectedSecond) {
+        DefaultMetadataNodeSizeStats.SplitPathRes result = DefaultMetadataNodeSizeStats.splitPath(path);
+        assertEquals(result.partCount, expectedPartCount);
+        assertPart(result.parts[0], expectedFirst);
+        assertPart(result.parts[1], expectedSecond);
+    }
+
+    /** Verifies one kept segment; a {@code null} expectation means the slot must be empty. */
+    private static void assertPart(String actual, String expected) {
+        if (expected == null) {
+            assertNull(actual);
+        } else {
+            assertEquals(actual, expected);
+        }
+    }
+
+    @Test
+    public void testSplitPathWithNullInput() {
+        // A null path yields the meaningless result: no segments at all.
+        assertSplit(null, 0, null, null);
+    }
+
+    @Test
+    public void testSplitPathWithEmptyString() {
+        assertSplit("", 0, null, null);
+    }
+
+    @Test
+    public void testSplitPathWithSingleCharacter() {
+        // A single non-delimiter character is too short to be a path and is treated as meaningless.
+        assertSplit("a", 0, null, null);
+    }
+
+    @Test
+    public void testSplitPathWithSingleSlash() {
+        assertSplit("/", 0, null, null);
+    }
+
+    @Test
+    public void testSplitPathWithTwoParts() {
+        assertSplit("/admin/clusters", 2, "admin", "clusters");
+    }
+
+    @Test
+    public void testSplitPathWithThreeParts() {
+        // The third segment is counted but not kept.
+        assertSplit("/admin/policies/tenant", 3, "admin", "policies");
+    }
+
+    @Test
+    public void testSplitPathWithFourParts() {
+        assertSplit("/admin/policies/tenant/namespace", 4, "admin", "policies");
+    }
+
+    @Test
+    public void testSplitPathWithFiveParts() {
+        assertSplit("/admin/policies/tenant/namespace/topic", 5, "admin", "policies");
+    }
+
+    @Test
+    public void testSplitPathWithSixParts() {
+        assertSplit("/admin/partitioned-topics/persistent/tenant/namespace/topic",
+                6, "admin", "partitioned-topics");
+    }
+
+    @Test
+    public void testSplitPathWithManagedLedgerPath() {
+        assertSplit("/managed-ledgers/tenant/namespace/persistent/topic", 5, "managed-ledgers", "tenant");
+    }
+
+    @Test
+    public void testSplitPathWithSubscriptionPath() {
+        assertSplit("/managed-ledgers/tenant/namespace/persistent/topic/subscription",
+                6, "managed-ledgers", "tenant");
+    }
+
+    @Test
+    public void testSplitPathWithTrailingSlash() {
+        // A trailing delimiter does not add an empty segment.
+        assertSplit("/admin/clusters/", 2, "admin", "clusters");
+    }
+
+    @Test
+    public void testSplitPathWithMultipleTrailingSlashes() {
+        assertSplit("/admin/clusters///", 2, "admin", "clusters");
+    }
+
+    @Test
+    public void testSplitPathWithConsecutiveSlashes() {
+        // Repeated delimiters in the middle are collapsed.
+        assertSplit("/admin//clusters", 2, "admin", "clusters");
+    }
+
+    @Test
+    public void testSplitPathWithLeadingSlashes() {
+        assertSplit("///admin/clusters", 2, "admin", "clusters");
+    }
+
+    @Test
+    public void testSplitPathWithSinglePart() {
+        assertSplit("/admin", 1, "admin", null);
+    }
+
+    @Test
+    public void testSplitPathWithEmptyParts() {
+        assertSplit("/admin//clusters//tenant", 3, "admin", "clusters");
+    }
+
+    @Test
+    public void testSplitPathWithSpecialCharacters() {
+        assertSplit("/admin-test/clusters_test", 2, "admin-test", "clusters_test");
+    }
+
+    @Test
+    public void testSplitPathWithNumbers() {
+        assertSplit("/admin123/clusters456/tenant789", 3, "admin123", "clusters456");
+    }
+
+    @Test
+    public void testSplitPathWithLongPath() {
+        // Many segments: all are counted, only the first two are kept.
+        String longPath = "/part1/part2/part3/part4/part5/part6/part7/part8/part9/part10";
+        assertSplit(longPath, 10, "part1", "part2");
+    }
+
+    @Test
+    public void testSplitPathWithShortValidPath() {
+        // The shortest path that still has a segment.
+        assertSplit("/a", 1, "a", null);
+    }
+
+    @Test
+    public void testSplitPathWithTwoCharacterPath() {
+        assertSplit("/ab", 1, "ab", null);
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/DefaultMetadataNodeSizeStatsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/DefaultMetadataNodeSizeStatsTest.java
new file mode 100644
index 00000000000..bfbed2324d1
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/zookeeper/DefaultMetadataNodeSizeStatsTest.java
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.zookeeper;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.Stat;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Comprehensive test suite for DefaultMetadataNodeSizeStats class.
+ *
+ * This test covers all the functionality of the DefaultMetadataNodeSizeStats
class including:
+ * - Path type classification for different Pulsar metadata paths
+ * - Size tracking for put and get operations
+ * - Children count tracking for list operations
+ * - Proper handling of edge cases and null values
+ * - Path splitting functionality
+ */
+public class DefaultMetadataNodeSizeStatsTest {
+
+ private DefaultMetadataNodeSizeStats stats;
+
+    /** Builds a new {@link DefaultMetadataNodeSizeStats} before every test method. */
+    @BeforeMethod
+    public void setUp() {
+        this.stats = new DefaultMetadataNodeSizeStats();
+    }
+
+    /**
+     * Checks that the maximum size fed through {@code recordPut} starts at UNSET, follows the
+     * largest payload seen, and is not lowered by a later, smaller payload.
+     */
+    @Test
+    public void testRecordPutAndGetMaxSize() {
+        final String path = "/admin/clusters/test-cluster";
+        final byte[] shortPayload = "small".getBytes(StandardCharsets.UTF_8);
+        final byte[] longPayload = "this is a much larger data payload".getBytes(StandardCharsets.UTF_8);
+
+        // Nothing recorded yet: the UNSET sentinel is reported.
+        assertEquals(stats.getMaxSizeOfSameResourceType(path), DefaultMetadataNodeSizeStats.UNSET);
+
+        // The first put defines the maximum.
+        stats.recordPut(path, shortPayload);
+        assertEquals(stats.getMaxSizeOfSameResourceType(path), shortPayload.length);
+
+        // A bigger payload raises it.
+        stats.recordPut(path, longPayload);
+        assertEquals(stats.getMaxSizeOfSameResourceType(path), longPayload.length);
+
+        // A smaller payload afterwards leaves it untouched.
+        stats.recordPut(path, shortPayload);
+        assertEquals(stats.getMaxSizeOfSameResourceType(path), longPayload.length);
+    }
+
+    /**
+     * Checks that sizes fed through {@code recordGetRes} raise the maximum and that a smaller
+     * result recorded later does not lower it.
+     */
+    @Test
+    public void testRecordGetResAndGetMaxSize() {
+        final String path = "/admin/policies/test-tenant";
+        final byte[] shortPayload = "data1".getBytes(StandardCharsets.UTF_8);
+        final byte[] longPayload = "much longer data payload".getBytes(StandardCharsets.UTF_8);
+
+        // Both results share a single mocked Stat.
+        final Stat stat = Mockito.mock(Stat.class);
+        final GetResult shortResult = new GetResult(shortPayload, stat);
+        final GetResult longResult = new GetResult(longPayload, stat);
+
+        // Nothing recorded yet: the UNSET sentinel is reported.
+        assertEquals(stats.getMaxSizeOfSameResourceType(path), DefaultMetadataNodeSizeStats.UNSET);
+
+        // The first result defines the maximum.
+        stats.recordGetRes(path, shortResult);
+        assertEquals(stats.getMaxSizeOfSameResourceType(path), shortPayload.length);
+
+        // A bigger result raises it.
+        stats.recordGetRes(path, longResult);
+        assertEquals(stats.getMaxSizeOfSameResourceType(path), longPayload.length);
+
+        // A smaller result afterwards leaves it untouched.
+        stats.recordGetRes(path, shortResult);
+        assertEquals(stats.getMaxSizeOfSameResourceType(path), longPayload.length);
+    }
+
+    /**
+     * Checks that {@code recordGetChildrenRes} tracks the number of children (the list size),
+     * keeping the largest count seen so far.
+     */
+    @Test
+    public void testRecordGetChildrenResAndGetMaxChildrenCount() {
+        final String path = "/admin/policies/test-tenant/test-namespace";
+        final List<String> threeChildren = Arrays.asList("a", "b", "c");
+        final List<String> fourChildren =
+                Arrays.asList("longer-name-1", "longer-name-2", "longer-name-3", "longer-name-4");
+
+        // Nothing recorded yet: the UNSET sentinel is reported.
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(path), DefaultMetadataNodeSizeStats.UNSET);
+
+        // The element count is what gets tracked, not the combined length of the names.
+        stats.recordGetChildrenRes(path, threeChildren);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(path), threeChildren.size());
+
+        // A longer list raises the maximum count.
+        stats.recordGetChildrenRes(path, fourChildren);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(path), fourChildren.size());
+
+        // A shorter list afterwards leaves it untouched.
+        stats.recordGetChildrenRes(path, threeChildren);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(path), fourChildren.size());
+    }
+
+    /**
+     * Checks that an unclassified path reports -1 from both getters, before and after record
+     * calls are made for it.
+     */
+    @Test
+    public void testUnknownPathsReturnMinusOne() {
+        final String path = "/some/unknown/path";
+
+        // Before anything is recorded, both getters report -1.
+        assertEquals(stats.getMaxSizeOfSameResourceType(path), -1);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(path), -1);
+
+        // A put on the unknown path must not change the reported size.
+        final byte[] payload = "test-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(path, payload);
+        assertEquals(stats.getMaxSizeOfSameResourceType(path), -1);
+
+        // A children result on the unknown path must not change the reported count.
+        final List<String> childNames = Arrays.asList("child1", "child2");
+        stats.recordGetChildrenRes(path, childNames);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(path), -1);
+    }
+
+    /**
+     * Checks that passing a null {@code GetResult} to {@code recordGetRes} changes neither an
+     * unset maximum nor an already recorded one.
+     */
+    @Test
+    public void testNullGetResult() {
+        final String path = "/admin/clusters/test";
+
+        // A null result on a fresh instance leaves the value at UNSET.
+        stats.recordGetRes(path, null);
+        assertEquals(stats.getMaxSizeOfSameResourceType(path), DefaultMetadataNodeSizeStats.UNSET);
+
+        // Establish a real maximum through a put.
+        final byte[] payload = "test-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(path, payload);
+        assertEquals(stats.getMaxSizeOfSameResourceType(path), payload.length);
+
+        // A null result afterwards must keep that maximum.
+        stats.recordGetRes(path, null);
+        assertEquals(stats.getMaxSizeOfSameResourceType(path), payload.length);
+    }
+
+    /**
+     * Records one payload per {@code /admin/...} path shape and checks that each path reports the
+     * size of its own payload, then that cluster, tenant and namespace paths report distinct maxima.
+     */
+    @Test
+    public void testAdminPathTypeClassification() {
+        final String clusterPath = "/admin/clusters/test-cluster";
+        // policies with 2 parts
+        final String tenantPath = "/admin/policies/test-tenant";
+        // policies with 3 parts
+        final String namespacePath = "/admin/policies/test-tenant/test-namespace";
+
+        // Each row is {path, payload text}; rows are processed strictly in this order.
+        final String[][] cases = {
+            {clusterPath, "cluster-data"},
+            {tenantPath, "tenant-data"},
+            {namespacePath, "namespace-data"},
+            // local policies
+            {"/admin/local-policies/test-tenant/test-namespace", "local-policies-data"},
+            // partitioned namespace (5 parts)
+            {"/admin/partitioned-topics/persistent/test-tenant/test-namespace", "partitioned-namespace-data"},
+            // partitioned topic (6 parts)
+            {"/admin/partitioned-topics/persistent/test-tenant/test-namespace/test-topic",
+                "partitioned-topic-data"},
+        };
+
+        for (String[] row : cases) {
+            final byte[] payload = row[1].getBytes(StandardCharsets.UTF_8);
+            stats.recordPut(row[0], payload);
+            assertEquals(stats.getMaxSizeOfSameResourceType(row[0]), payload.length);
+        }
+
+        // Different path types must keep separate maxima.
+        assertNotEquals(stats.getMaxSizeOfSameResourceType(clusterPath),
+                stats.getMaxSizeOfSameResourceType(tenantPath));
+        assertNotEquals(stats.getMaxSizeOfSameResourceType(tenantPath),
+                stats.getMaxSizeOfSameResourceType(namespacePath));
+    }
+
+    /**
+     * Records payloads for {@code /managed-ledgers/...} paths of 4, 5, 6 and 7 parts and checks the
+     * reported maxima, including that the 6-part and 7-part subscription paths share one maximum.
+     */
+    @Test
+    public void testManagedLedgerPathTypes() {
+        // 4 parts: ML namespace
+        final String mlNamespacePath = "/managed-ledgers/tenant/namespace/persistent";
+        // 5 parts: topic
+        final String topicPath = "/managed-ledgers/tenant/namespace/persistent/topic";
+        // 6 parts: v2 subscription
+        final String v2SubscriptionPath = "/managed-ledgers/tenant/namespace/persistent/topic/subscription";
+        // 7 parts: v1 subscription
+        final String v1SubscriptionPath =
+                "/managed-ledgers/tenant/cluster/namespace/persistent/topic/subscription";
+
+        final byte[] mlNamespacePayload = "ml-namespace-data".getBytes(StandardCharsets.UTF_8);
+        final byte[] topicPayload = "topic-data".getBytes(StandardCharsets.UTF_8);
+        final byte[] v2Payload = "v2-subscription-data".getBytes(StandardCharsets.UTF_8);
+        final byte[] v1Payload = "v1-subscription-data".getBytes(StandardCharsets.UTF_8);
+
+        final String[] paths = {mlNamespacePath, topicPath, v2SubscriptionPath, v1SubscriptionPath};
+        final byte[][] payloads = {mlNamespacePayload, topicPayload, v2Payload, v1Payload};
+
+        // Record and verify one path at a time, in declaration order.
+        for (int i = 0; i < paths.length; i++) {
+            stats.recordPut(paths[i], payloads[i]);
+            assertEquals(stats.getMaxSizeOfSameResourceType(paths[i]), payloads[i].length);
+        }
+
+        // v2 and v1 subscriptions use the same path type (SUBSCRIPTION), so they share the maximum.
+        final int sharedSubscriptionMax = Math.max(v2Payload.length, v1Payload.length);
+        assertEquals(stats.getMaxSizeOfSameResourceType(v2SubscriptionPath), sharedSubscriptionMax);
+        assertEquals(stats.getMaxSizeOfSameResourceType(v1SubscriptionPath), sharedSubscriptionMax);
+
+        // Different path types must keep separate maxima.
+        assertNotEquals(stats.getMaxSizeOfSameResourceType(mlNamespacePath),
+                stats.getMaxSizeOfSameResourceType(topicPath));
+        assertNotEquals(stats.getMaxSizeOfSameResourceType(topicPath),
+                stats.getMaxSizeOfSameResourceType(v2SubscriptionPath));
+    }
+
+    /**
+     * Records one payload per {@code /loadbalance/...} path shape and checks that each path reports
+     * the size of its own payload, then that the brokers and broker paths report distinct maxima.
+     */
+    @Test
+    public void testLoadBalancePathTypes() {
+        // 2 parts
+        final String brokersPath = "/loadbalance/brokers";
+        // 3 parts
+        final String brokerPath = "/loadbalance/brokers/broker1";
+
+        // Each row is {path, payload text}; rows are processed strictly in this order.
+        final String[][] cases = {
+            {brokersPath, "brokers-data"},
+            {brokerPath, "broker-data"},
+            // bundle namespace (4 parts)
+            {"/loadbalance/bundle-data/tenant/namespace", "bundle-namespace-data"},
+            // bundle data (5 parts)
+            {"/loadbalance/bundle-data/tenant/namespace/bundle", "bundle-data"},
+            // broker time average (3 parts)
+            {"/loadbalance/broker-time-average/broker1", "broker-time-avg-data"},
+            // leader
+            {"/loadbalance/leader", "leader-data"},
+        };
+
+        for (String[] row : cases) {
+            final byte[] payload = row[1].getBytes(StandardCharsets.UTF_8);
+            stats.recordPut(row[0], payload);
+            assertEquals(stats.getMaxSizeOfSameResourceType(row[0]), payload.length);
+        }
+
+        // Different path types must keep separate maxima.
+        assertNotEquals(stats.getMaxSizeOfSameResourceType(brokersPath),
+                stats.getMaxSizeOfSameResourceType(brokerPath));
+    }
+
+    /**
+     * Records payloads for the 3-part and 4-part {@code /namespace/...} paths and checks that each
+     * reports its own payload size and that the two maxima differ.
+     */
+    @Test
+    public void testBundleOwnerPathTypes() {
+        // 3 parts: bundle owner namespace
+        final String ownerNamespacePath = "/namespace/tenant/namespace";
+        final byte[] ownerNamespacePayload = "bundle-owner-namespace-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(ownerNamespacePath, ownerNamespacePayload);
+        assertEquals(stats.getMaxSizeOfSameResourceType(ownerNamespacePath), ownerNamespacePayload.length);
+
+        // 4 parts: bundle owner
+        final String ownerPath = "/namespace/tenant/namespace/bundle";
+        final byte[] ownerPayload = "bundle-owner-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(ownerPath, ownerPayload);
+        assertEquals(stats.getMaxSizeOfSameResourceType(ownerPath), ownerPayload.length);
+
+        // Different path types must keep separate maxima.
+        assertNotEquals(stats.getMaxSizeOfSameResourceType(ownerNamespacePath),
+                stats.getMaxSizeOfSameResourceType(ownerPath));
+    }
+
+    /**
+     * Checks that a 4-part {@code /schemas/...} path is tracked while a 3-part one keeps
+     * reporting -1 after a put.
+     */
+    @Test
+    public void testSchemaPathTypes() {
+        // 4 parts: topic schema
+        final String topicSchemaPath = "/schemas/tenant/namespace/topic";
+        final byte[] topicSchemaPayload = "schema-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(topicSchemaPath, topicSchemaPayload);
+        assertEquals(stats.getMaxSizeOfSameResourceType(topicSchemaPath), topicSchemaPayload.length);
+
+        // 3 parts: not a valid schema path, expected to be UNKNOWN
+        final String shortSchemaPath = "/schemas/tenant/namespace";
+        final byte[] shortSchemaPayload = "invalid-schema-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(shortSchemaPath, shortSchemaPayload);
+        assertEquals(stats.getMaxSizeOfSameResourceType(shortSchemaPath), -1);
+
+        // The two paths must therefore report different values.
+        assertNotEquals(stats.getMaxSizeOfSameResourceType(topicSchemaPath),
+                stats.getMaxSizeOfSameResourceType(shortSchemaPath));
+    }
+
+    /**
+     * Checks that {@code "/"}, {@code "/admin"} and an unrecognised {@code /admin/...} path all
+     * keep reporting -1 after record calls.
+     */
+    @Test
+    public void testShortAndUnknownPaths() {
+        // Paths too short to be classified.
+        final String rootPath = "/";
+        final String adminOnlyPath = "/admin";
+
+        final byte[] payload = "test-data".getBytes(StandardCharsets.UTF_8);
+
+        // Puts on these paths are expected to be ignored.
+        stats.recordPut(rootPath, payload);
+        stats.recordPut(adminOnlyPath, payload);
+
+        // Both are UNKNOWN path types, so both report -1.
+        assertEquals(stats.getMaxSizeOfSameResourceType(rootPath), -1);
+        assertEquals(stats.getMaxSizeOfSameResourceType(adminOnlyPath), -1);
+
+        // An /admin path with an unrecognised second part.
+        final String unrecognisedAdminPath = "/admin/unknown/path";
+        final byte[] unrecognisedPayload = "unknown-admin-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(unrecognisedAdminPath, unrecognisedPayload);
+
+        // Also an UNKNOWN path type.
+        assertEquals(stats.getMaxSizeOfSameResourceType(unrecognisedAdminPath), -1);
+
+        // Children results on an unknown path are expected to be ignored as well.
+        final List<String> childNames = Arrays.asList("child1", "child2");
+        stats.recordGetChildrenRes(rootPath, childNames);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(rootPath), -1);
+    }
+
+    /**
+     * Checks that an empty payload and an empty children list are recorded as 0, and that larger
+     * values recorded afterwards replace them.
+     */
+    @Test
+    public void testEmptyAndZeroSizeData() {
+        final String path = "/admin/clusters/test";
+
+        // A zero-length payload is recorded as size 0.
+        final byte[] noBytes = new byte[0];
+        stats.recordPut(path, noBytes);
+        assertEquals(stats.getMaxSizeOfSameResourceType(path), 0);
+
+        // An empty children list is recorded as count 0.
+        final List<String> noChildren = Collections.emptyList();
+        stats.recordGetChildrenRes(path, noChildren);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(path), 0);
+
+        // A non-empty payload raises the maximum size.
+        final byte[] payload = "larger-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(path, payload);
+        assertEquals(stats.getMaxSizeOfSameResourceType(path), payload.length);
+
+        // A non-empty list raises the maximum count.
+        final List<String> threeChildren = Arrays.asList("item1", "item2", "item3");
+        stats.recordGetChildrenRes(path, threeChildren);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(path), threeChildren.size());
+    }
+
+    /**
+     * Checks that put and get recordings on one path feed the same maximum size, and that the
+     * children count on that path is tracked alongside it.
+     */
+    @Test
+    public void testMixedOperationsOnSamePath() {
+        final String path = "/admin/policies/test-tenant";
+
+        // Start with a put.
+        final byte[] putPayload = "put-data".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(path, putPayload);
+        assertEquals(stats.getMaxSizeOfSameResourceType(path), putPayload.length);
+
+        // A larger get result raises the maximum.
+        final byte[] getPayload = "much larger get result data".getBytes(StandardCharsets.UTF_8);
+        final Stat stat = Mockito.mock(Stat.class);
+        final GetResult largerResult = new GetResult(getPayload, stat);
+        stats.recordGetRes(path, largerResult);
+        assertEquals(stats.getMaxSizeOfSameResourceType(path), getPayload.length);
+
+        // A smaller put afterwards leaves it untouched.
+        final byte[] tinyPayload = "small".getBytes(StandardCharsets.UTF_8);
+        stats.recordPut(path, tinyPayload);
+        assertEquals(stats.getMaxSizeOfSameResourceType(path), getPayload.length);
+
+        // Children counts: first a two-element list ...
+        final List<String> twoChildren = Arrays.asList("a", "b");
+        stats.recordGetChildrenRes(path, twoChildren);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(path), twoChildren.size());
+
+        // ... then a five-element list, which raises the count.
+        final List<String> fiveChildren = Arrays.asList("a", "b", "c", "d", "e");
+        stats.recordGetChildrenRes(path, fiveChildren);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(path), fiveChildren.size());
+    }
+
+    /**
+     * Exercises path splitting indirectly: a cluster path written with a trailing slash, without
+     * it, and with doubled slashes must all report the same recorded maximum.
+     */
+    @Test
+    public void testSplitPathFunctionality() {
+        final byte[] payload = "test-data".getBytes(StandardCharsets.UTF_8);
+
+        // Record through the spelling that has a trailing slash.
+        final String trailingSlashPath = "/admin/clusters/test-cluster/";
+        stats.recordPut(trailingSlashPath, payload);
+        assertEquals(stats.getMaxSizeOfSameResourceType(trailingSlashPath), payload.length);
+
+        // The plain spelling must be classified the same way.
+        final String plainPath = "/admin/clusters/test-cluster";
+        assertEquals(stats.getMaxSizeOfSameResourceType(plainPath), payload.length);
+
+        // So must a spelling with consecutive slashes.
+        final String doubledSlashPath = "/admin//clusters//test-cluster";
+        assertEquals(stats.getMaxSizeOfSameResourceType(doubledSlashPath), payload.length);
+    }
+
+    /**
+     * Runs a put and a children recording against a series of paths expected to be UNKNOWN and
+     * checks that both getters still report -1 for each of them.
+     */
+    @Test
+    public void testUnknownPathTypesAreIgnored() {
+        final List<String> unclassifiedPaths = Arrays.asList(
+                "/",
+                "/admin",
+                "/unknown/path",
+                "/admin/unknown/path/type",
+                "/managed-ledgers", // too short
+                "/loadbalance", // too short
+                "/schemas/tenant", // too short (needs 4 parts)
+                "/admin/policies/tenant/namespace/extra", // too long for policies
+                "/admin/partitioned-topics/persistent", // too short for partitioned topics
+                "/admin/partitioned-topics/persistent/tenant/namespace/topic/extra"); // too long
+
+        final byte[] payload = "test-data".getBytes(StandardCharsets.UTF_8);
+        final List<String> childNames = Arrays.asList("child1", "child2", "child3");
+
+        for (String unclassified : unclassifiedPaths) {
+            // Both recordings are expected to be dropped ...
+            stats.recordPut(unclassified, payload);
+            stats.recordGetChildrenRes(unclassified, childNames);
+
+            // ... so both getters must still answer -1.
+            assertEquals(stats.getMaxSizeOfSameResourceType(unclassified), -1,
+                    "Path should return -1: " + unclassified);
+            assertEquals(stats.getMaxChildrenCountOfSameResourceType(unclassified), -1,
+                    "Path should return -1: " + unclassified);
+        }
+    }
+
+    /**
+     * Checks that a null and an empty children list are both recorded as count 0, and that an
+     * empty list recorded after a non-empty one does not lower the maximum.
+     */
+    @Test
+    public void testEmptyAndNullListHandling() {
+        final String path = "/admin/clusters/test";
+
+        // A null list must be accepted and counted as 0.
+        stats.recordGetChildrenRes(path, null);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(path), 0);
+
+        // An empty list is counted as 0 as well.
+        final List<String> noChildren = Collections.emptyList();
+        stats.recordGetChildrenRes(path, noChildren);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(path), 0);
+
+        // A non-empty list raises the maximum.
+        final List<String> threeChildren = Arrays.asList("item1", "item2", "item3");
+        stats.recordGetChildrenRes(path, threeChildren);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(path), threeChildren.size());
+
+        // An empty list afterwards leaves it untouched.
+        stats.recordGetChildrenRes(path, noChildren);
+        assertEquals(stats.getMaxChildrenCountOfSameResourceType(path), threeChildren.size());
+    }
+}
\ No newline at end of file
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/DummyMetadataNodeSizeStats.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/DummyMetadataNodeSizeStats.java
new file mode 100644
index 00000000000..ba60c662588
--- /dev/null
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/DummyMetadataNodeSizeStats.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.api;
+
+import java.util.List;
+
+/**
+ * A {@link MetadataNodeSizeStats} that tracks nothing: every record method has an empty body and
+ * both getters return the constant {@code 1} for any path.
+ */
+public class DummyMetadataNodeSizeStats implements MetadataNodeSizeStats {
+
+    /** The value both getters report, whatever the path. */
+    private static final int FIXED_VALUE = 1;
+
+    @Override
+    public void recordPut(String path, byte[] data) {
+        // Nothing is tracked.
+    }
+
+    @Override
+    public void recordGetRes(String path, GetResult getResult) {
+        // Nothing is tracked.
+    }
+
+    @Override
+    public void recordGetChildrenRes(String path, List<String> list) {
+        // Nothing is tracked.
+    }
+
+    @Override
+    public int getMaxSizeOfSameResourceType(String path) {
+        return FIXED_VALUE;
+    }
+
+    @Override
+    public int getMaxChildrenCountOfSameResourceType(String path) {
+        return FIXED_VALUE;
+    }
+}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataNodeSizeStats.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataNodeSizeStats.java
new file mode 100644
index 00000000000..92d6084b46c
--- /dev/null
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataNodeSizeStats.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.api;
+
+import java.util.List;
+
+/**
+ * The interface to cache the max payload length of metadata nodes. It is helpful for the following cases:
+ * 1. Limiting the response size of a batched query to the metadata store. For example, the ZK client limits
+ * the max length of a response packet to 1MB by default; if the response packet is larger than that limit,
+ * the ZK client throws an error "Packet len {len} is out of range!" and reconnects.
+ * 2. Exposing metrics about the payload length of metadata nodes.
+ */
+public interface MetadataNodeSizeStats {
+
+ /**
+ * Record the payload length of a put operation.
+ *
+ * @param path the path of the metadata node that was written
+ * @param data the payload of the put operation
+ */
+ void recordPut(String path, byte[] data);
+
+ /**
+ * Record the payload length of a get result.
+ *
+ * @param path the path of the metadata node that was read
+ * @param getResult the result of the get operation
+ */
+ void recordGetRes(String path, GetResult getResult);
+
+ /**
+ * Record the result of a list (get children) operation.
+ *
+ * @param path the path whose children were listed
+ * @param list the children returned for {@code path}
+ */
+ void recordGetChildrenRes(String path, List<String> list);
+
+ /**
+ * Get the max size of same resource type.
+ *
+ * @param path a path whose resource type selects the value to return
+ * @return the max size known for that resource type
+ *         (NOTE(review): the value returned when nothing was recorded is implementation-defined - confirm
+ *         against the implementation in use)
+ */
+ int getMaxSizeOfSameResourceType(String path);
+
+ /**
+ * Get the max children count of same resource type.
+ *
+ * @param path a path whose resource type selects the value to return
+ * @return the max children count known for that resource type
+ *         (NOTE(review): the value returned when nothing was recorded is implementation-defined - confirm
+ *         against the implementation in use)
+ */
+ int getMaxChildrenCountOfSameResourceType(String path);
+}
\ No newline at end of file
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
index 5ddfe33c391..b25c123d9d8 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
@@ -92,4 +92,9 @@ public class MetadataStoreConfig {
* separate clusters.
*/
private MetadataEventSynchronizer synchronizer;
+
+ /**
+ * The estimator of the payload length of metadata nodes, which is used to limit the batch size requested.
+ */
+ private MetadataNodeSizeStats nodeSizeStats;
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index ccbb2a11a90..102d3f85b11 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -48,11 +48,13 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.DummyMetadataNodeSizeStats;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataCacheConfig;
import org.apache.pulsar.metadata.api.MetadataEvent;
import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
+import org.apache.pulsar.metadata.api.MetadataNodeSizeStats;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
@@ -84,11 +86,16 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
protected final AtomicBoolean isClosed = new AtomicBoolean(false);
+ protected MetadataNodeSizeStats nodeSizeStats;
+
protected abstract CompletableFuture<List<String>>
getChildrenFromStore(String path);
protected abstract CompletableFuture<Boolean> existsFromStore(String path);
- protected AbstractMetadataStore(String metadataStoreName) {
+ protected AbstractMetadataStore(String metadataStoreName,
+ MetadataNodeSizeStats nodeSizeStats) {
+ this.nodeSizeStats = nodeSizeStats == null ? new
DummyMetadataNodeSizeStats()
+ : nodeSizeStats;
this.executor = new ScheduledThreadPoolExecutor(1,
new DefaultThreadFactory(
StringUtils.isNotBlank(metadataStoreName) ?
metadataStoreName : getClass().getSimpleName()));
@@ -268,6 +275,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
return storeGet(path)
.whenComplete((v, t) -> {
if (t != null) {
metadataStoreStats.recordGetOpsFailed(System.currentTimeMillis() - start);
} else {
metadataStoreStats.recordGetOpsSucceeded(System.currentTimeMillis() - start);
+ // Record the node size only on success: when the future fails, v is null and
+ // dereferencing it here would throw a NullPointerException.
+ v.ifPresent(getResult -> nodeSizeStats.recordGetRes(path, getResult));
@@ -290,7 +298,11 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new
MetadataStoreException.InvalidPathException(path));
}
- return childrenCache.get(path);
+ CompletableFuture<List<String>> listFuture = childrenCache.get(path);
+ listFuture.thenAccept((list) -> {
+ nodeSizeStats.recordGetChildrenRes(path, list);
+ });
+ return listFuture;
}
@Override
@@ -476,6 +488,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
NotificationType type = stat.isFirstVersion() ?
NotificationType.Created
: NotificationType.Modified;
+ // Record every successful put, not only creations: an update can grow an
+ // existing node, and the max-size statistics must see that payload too.
+ nodeSizeStats.recordPut(path, data);
if (type == NotificationType.Created) {
existsCache.synchronous().invalidate(path);
String parent = parent(path);
if (parent != null) {
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index 3909a89cf5e..7661c73640f 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -78,7 +78,8 @@ public class LocalMemoryMetadataStore extends
AbstractMetadataStore implements M
public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig
metadataStoreConfig)
throws MetadataStoreException {
- super(metadataStoreConfig.getMetadataStoreName());
+ super(metadataStoreConfig.getMetadataStoreName(),
+ metadataStoreConfig.getNodeSizeStats());
String name = metadataURL.substring(MEMORY_SCHEME_IDENTIFIER.length());
// Local means a private data set
// update synchronizer and register sync listener
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 06f7b260536..f900418842e 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -209,7 +209,8 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
*/
private RocksdbMetadataStore(String metadataURL, MetadataStoreConfig
metadataStoreConfig)
throws MetadataStoreException {
- super(metadataStoreConfig.getMetadataStoreName());
+ super(metadataStoreConfig.getMetadataStoreName(),
+ metadataStoreConfig.getNodeSizeStats());
this.metadataUrl = metadataURL;
try {
RocksDB.loadLibrary();
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 5ac87461110..acd7f7de8e9 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -31,7 +31,7 @@ import java.util.stream.Collectors;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStore;
@@ -54,6 +54,7 @@ import org.apache.pulsar.metadata.impl.batching.OpDelete;
import org.apache.pulsar.metadata.impl.batching.OpGet;
import org.apache.pulsar.metadata.impl.batching.OpGetChildren;
import org.apache.pulsar.metadata.impl.batching.OpPut;
+import org.apache.pulsar.metadata.impl.batching.ZKMetadataStoreBatchStrategy;
import org.apache.zookeeper.AddWatchMode;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
@@ -109,6 +110,7 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
sessionWatcher = null;
}
zkc.addWatch("/", eventWatcher, AddWatchMode.PERSISTENT_RECURSIVE);
+ initBatchStrategy();
} catch (Throwable t) {
throw new MetadataStoreException(t);
}
@@ -143,6 +145,14 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
this.zkc = zkc;
this.sessionWatcher = new ZKSessionWatcher(zkc,
this::receivedSessionEvent);
zkc.addWatch("/", eventWatcher, AddWatchMode.PERSISTENT_RECURSIVE);
+ initBatchStrategy();
+ }
+
+ /**
+ * Builds the ZooKeeper batch strategy from the current node-size stats, operation limit,
+ * size limit and ZK client, adopts the max size it reports, and installs it on the superclass.
+ */
+ private void initBatchStrategy() {
+ final ZKMetadataStoreBatchStrategy zkStrategy = new ZKMetadataStoreBatchStrategy(
+ nodeSizeStats, maxOperations, maxSize, zkc);
+ // Adopt the size limit reported by the strategy.
+ this.maxSize = zkStrategy.maxSize();
+ super.metadataStoreBatchStrategy = zkStrategy;
}
@Override
@@ -203,18 +213,34 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
// Build the log warning message
// summarize the operations by type
+ final int logThresholdPut = maxSize >> 4;
+ final int logThresholdGet = maxSize >> 4;
String countsByType = ops.stream().collect(
Collectors.groupingBy(MetadataOp::getType, Collectors.summingInt(op -> 1)))
- .entrySet().stream().map(e -> e.getValue() + "
" + e.getKey().name() + " entries")
+ .entrySet().stream().map(e -> e.getValue() + "
" + e.getKey().name())
.collect(Collectors.joining(", "));
- List<Pair> opsForLog = ops.stream()
- .filter(item -> item.size() > 256 * 1024)
- .map(op -> Pair.of(op.getPath(), op.size()))
+ boolean shouldLimitLogLen = ops.size() > 16;
+ List<Triple<String, String, Integer>> opsForLog =
ops.stream()
+ .filter(item -> {
+ if (!shouldLimitLogLen) {
+ return true;
+ }
+ return switch (item.getType()) {
+ case PUT ->
item.asPut().getData().length > logThresholdPut;
+ case GET -> nodeSizeStats
+
.getMaxSizeOfSameResourceType(item.getPath()) > logThresholdGet;
+ case GET_CHILDREN -> nodeSizeStats
+
.getMaxChildrenCountOfSameResourceType(item.getPath()) > 512;
+ default -> false;
+ };
+ })
+ .map(op -> Triple.of(op.getPath(),
op.getType().toString(), op.size()))
.collect(Collectors.toList());
Long totalSize =
ops.stream().collect(Collectors.summingLong(MetadataOp::size));
log.warn("Connection loss while executing batch
operation of {} "
- + "of total data size of {}. "
- + "Retrying individual operations one-by-one.
ops whose size > 256KB: {}",
+ + "of total requested data size of {}. "
+ + "Retrying individual operations one-by-one."
+ + " ops that maybe large: {}",
countsByType, totalSize, opsForLog);
// Retry with the individual operations
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index 5b45530d2e2..0114534853d 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.metadata.impl.batching;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
@@ -28,6 +27,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -50,13 +50,14 @@ public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore
private final boolean enabled;
private final int maxDelayMillis;
- private final int maxOperations;
- private final int maxSize;
+ protected final int maxOperations;
+ protected int maxSize;
private MetadataEventSynchronizer synchronizer;
private final BatchMetadataStoreStats batchMetadataStoreStats;
+ protected MetadataStoreBatchStrategy metadataStoreBatchStrategy;
protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) {
- super(conf.getMetadataStoreName());
+ super(conf.getMetadataStoreName(), conf.getNodeSizeStats());
this.enabled = conf.isBatchingEnabled();
this.maxDelayMillis = conf.getBatchingMaxDelayMillis();
@@ -78,6 +79,7 @@ public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore
updateMetadataEventSynchronizer(conf.getSynchronizer());
this.batchMetadataStoreStats =
new BatchMetadataStoreStats(metadataStoreName, executor);
+ this.metadataStoreBatchStrategy = new
DefaultMetadataStoreBatchStrategy(maxOperations, maxSize);
}
@Override
@@ -96,33 +98,17 @@ public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore
}
private void flush() {
- while (!readOps.isEmpty()) {
- List<MetadataOp> ops = new ArrayList<>();
- readOps.drain(ops::add, maxOperations);
- internalBatchOperation(ops);
+ List<MetadataOp> currentBatch;
+ if (!readOps.isEmpty()) {
+ while (CollectionUtils.isNotEmpty(currentBatch =
metadataStoreBatchStrategy.nextBatch(readOps))) {
+ internalBatchOperation(currentBatch);
+ }
}
-
- while (!writeOps.isEmpty()) {
- int batchSize = 0;
-
- List<MetadataOp> ops = new ArrayList<>();
- for (int i = 0; i < maxOperations; i++) {
- MetadataOp op = writeOps.peek();
- if (op == null) {
- break;
- }
-
- if (i > 0 && (batchSize + op.size()) > maxSize) {
- // We have already reached the max size, so flush the
current batch
- break;
- }
-
- batchSize += op.size();
- ops.add(writeOps.poll());
+ if (!writeOps.isEmpty()) {
+ while (CollectionUtils.isNotEmpty(currentBatch =
metadataStoreBatchStrategy.nextBatch(writeOps))) {
+ internalBatchOperation(currentBatch);
}
- internalBatchOperation(ops);
}
-
flushInProgress.set(false);
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/DefaultMetadataStoreBatchStrategy.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/DefaultMetadataStoreBatchStrategy.java
new file mode 100644
index 00000000000..766c2551775
--- /dev/null
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/DefaultMetadataStoreBatchStrategy.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl.batching;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.jctools.queues.MessagePassingQueue;
+
+/**
+ * The default batching strategy. A batch is bounded by the number of operations and by the accumulated size of the
+ * write requests (PUT and DELETE) only; the size of the responses is not taken into account.
+ */
+public class DefaultMetadataStoreBatchStrategy implements MetadataStoreBatchStrategy {
+
+    private final int maxOperations;
+    private final int maxPutSize;
+
+    public DefaultMetadataStoreBatchStrategy(int maxOperations, int maxPutSize) {
+        this.maxOperations = maxOperations;
+        this.maxPutSize = maxPutSize;
+    }
+
+    /**
+     * Moves operations from the head of {@code opsSrc} into a new batch until the queue is drained, the batch holds
+     * {@code maxOperations} entries, or accepting the next write would push the request size above the limit.
+     *
+     * @param opsSrc the queue to take operations from
+     * @return the operations of the next batch; empty when the queue has nothing to offer
+     */
+    @Override
+    public List<MetadataOp> nextBatch(MessagePassingQueue<MetadataOp> opsSrc) {
+        final List<MetadataOp> batch = new ArrayList<>();
+        int writeBytes = 0;
+        MetadataOp head;
+        while (!opsSrc.isEmpty() && (head = opsSrc.peek()) != null) {
+            // Only writes carry a payload in the request, reads do not count against the size limit.
+            switch (head.getType()) {
+                case PUT, DELETE -> writeBytes += head.size();
+                default -> { }
+            }
+            // The first operation is always accepted, even when it is larger than the limit on its own.
+            if (writeBytes > maxPutSize && !batch.isEmpty()) {
+                break;
+            }
+            batch.add(opsSrc.poll());
+            if (batch.size() == maxOperations) {
+                break;
+            }
+        }
+        return batch;
+    }
+}
\ No newline at end of file
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataStoreBatchStrategy.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataStoreBatchStrategy.java
new file mode 100644
index 00000000000..e37d80c27b3
--- /dev/null
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataStoreBatchStrategy.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl.batching;
+
+import java.util.List;
+import org.jctools.queues.MessagePassingQueue;
+
+/**
+ * Used to split the pending ops into multiple batches, because a single batch may not be able to hold all of
+ * them.
+ */
+public interface MetadataStoreBatchStrategy {
+
+ /**
+ * Remove the next batch of operations to execute from the given queue and return it.
+ */
+ List<MetadataOp> nextBatch(MessagePassingQueue<MetadataOp> ops);
+}
\ No newline at end of file
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/ZKMetadataStoreBatchStrategy.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/ZKMetadataStoreBatchStrategy.java
new file mode 100644
index 00000000000..ff8cc74d4c1
--- /dev/null
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/ZKMetadataStoreBatchStrategy.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl.batching;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pulsar.metadata.api.MetadataNodeSizeStats;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ZKConfig;
+import org.jctools.queues.MessagePassingQueue;
+
+/**
+ * Batching strategy for the ZooKeeper metadata store.
+ *
+ * <p>Besides capping the number of operations and the size of the request, it estimates the size of the response
+ * with the help of {@link MetadataNodeSizeStats} and closes the batch before that estimate exceeds half of the
+ * maximum packet size.
+ */
+public class ZKMetadataStoreBatchStrategy implements MetadataStoreBatchStrategy {
+
+    // The headers of a response command contain the following attributes, which cost 88 bytes.
+    // Base attrs: xid(int), zxid(long), err(int), len(int)
+    // Stat attrs: czxid(long), mzxid(long), ctime(long), mtime(long), version(int), cversion(int), aversion(int),
+    //             ephemeralOwner(long), dataLength(int), numChildren(int), pzxid(long)
+    // The length of the response header may differ between versions; since only half of the max size is used as
+    // the budget, the difference can be ignored.
+    public static final int ZK_RESPONSE_HEADER_LEN = 88;
+
+    // A list of child names is encoded as [4 bytes that indicate the length of the next item][item name].
+    private static final int CHILD_ENTRY_LEN_PREFIX = 4;
+
+    private final int defaultSize;
+
+    private final int maxOperations;
+    private final int maxGetSize;
+    private final int maxPutSize;
+    private final MetadataNodeSizeStats nodeSizeStats;
+
+    /**
+     * @param nodeSizeStats statistics consulted to estimate the response size of read operations
+     * @param maxOperations maximum number of operations in one batch
+     * @param defaultMaxSize size limit used when the client config does not provide a positive value for
+     *                       {@link ZKConfig#JUTE_MAXBUFFER}
+     * @param zkc ZooKeeper client whose client config is read for the size limit
+     */
+    public ZKMetadataStoreBatchStrategy(MetadataNodeSizeStats nodeSizeStats, int maxOperations, int defaultMaxSize,
+                                        ZooKeeper zkc) {
+        int maxSizeConfigured = zkc.getClientConfig().getInt(
+                ZKConfig.JUTE_MAXBUFFER,
+                ZKClientConfig.CLIENT_MAX_PACKET_LENGTH_DEFAULT);
+        maxSizeConfigured = maxSizeConfigured > 0 ? maxSizeConfigured : defaultMaxSize;
+        this.maxOperations = maxOperations;
+        this.maxGetSize = maxSizeConfigured;
+        this.maxPutSize = maxSizeConfigured;
+        this.nodeSizeStats = nodeSizeStats;
+        // If the size of a node can not be calculated by "nodeSizeStats", assume 1/16 of the max size (at least
+        // 1024 bytes). The response budget is half of the max size, so at most 8 such ops go into a batch.
+        this.defaultSize = Math.max(maxPutSize >>> 4, 1024);
+    }
+
+    /**
+     * @return the maximum request size of a batch, in bytes
+     */
+    public int maxSize() {
+        return maxPutSize;
+    }
+
+    /**
+     * Moves operations from the head of {@code opsSrc} into a new batch until the queue is drained, the batch holds
+     * {@code maxOperations} entries, or accepting the next operation would exceed the request size limit or the
+     * response size budget. The first operation is always accepted.
+     *
+     * @param opsSrc the queue to take operations from
+     * @return the operations of the next batch; empty when the queue has nothing to offer
+     */
+    @Override
+    public List<MetadataOp> nextBatch(MessagePassingQueue<MetadataOp> opsSrc) {
+        // Accumulate in long: the estimates are sums and products of int values, and an int total that wrapped
+        // around to a negative number would never trip the limits checked below.
+        long requestSize = 0;
+        long estimatedResponseSize = 0;
+        // Since the response size is estimated, we use half of the max size to be safe.
+        final int responseBudget = this.maxGetSize >>> 1;
+        List<MetadataOp> ops = new ArrayList<>();
+        while (!opsSrc.isEmpty()) {
+            MetadataOp op = opsSrc.peek();
+            if (op == null) {
+                break;
+            }
+            MetadataOp.Type type = op.getType();
+            String path = op.getPath();
+            switch (type) {
+                case GET_CHILDREN: {
+                    estimatedResponseSize += estimateGetChildrenResponseSize(path);
+                    break;
+                }
+                case GET: {
+                    estimatedResponseSize += estimateGetResponseSize(path);
+                    break;
+                }
+                case DELETE:
+                case PUT: {
+                    requestSize += op.size();
+                    // The response of creation contains two attributes: stat and path, so we add them into the
+                    // estimation response size.
+                    estimatedResponseSize += ZK_RESPONSE_HEADER_LEN;
+                    estimatedResponseSize += path.length();
+                    break;
+                }
+                default: {
+                    estimatedResponseSize += ZK_RESPONSE_HEADER_LEN;
+                    estimatedResponseSize += path.length();
+                }
+            }
+            if (!ops.isEmpty() && (estimatedResponseSize > responseBudget || requestSize > maxPutSize)) {
+                // We have already reached the max size, so flush the current batch.
+                break;
+            }
+            ops.add(opsSrc.poll());
+            if (ops.size() == maxOperations) {
+                break;
+            }
+        }
+        return ops;
+    }
+
+    /**
+     * Estimates the response size of a GET: the header plus the size reported by the node size stats for the path,
+     * or {@code defaultSize} when the stats report no positive size.
+     */
+    private long estimateGetResponseSize(String path) {
+        int size = nodeSizeStats.getMaxSizeOfSameResourceType(path);
+        return (long) ZK_RESPONSE_HEADER_LEN + (size > 0 ? size : defaultSize);
+    }
+
+    /**
+     * Estimates the response size of a GET_CHILDREN: the header plus one length-prefixed entry per child. Falls
+     * back to {@code defaultSize} for the whole list when the children count is unknown (negative), and to
+     * {@code defaultSize} per entry when the stats report no positive size.
+     */
+    private long estimateGetChildrenResponseSize(String path) {
+        int childrenCount = nodeSizeStats.getMaxChildrenCountOfSameResourceType(path);
+        if (childrenCount < 0) {
+            return (long) ZK_RESPONSE_HEADER_LEN + defaultSize;
+        }
+        int size = nodeSizeStats.getMaxSizeOfSameResourceType(path);
+        long sizePerChild = (long) (size > 0 ? size : defaultSize) + CHILD_ENTRY_LEN_PREFIX;
+        // Multiply in long so that a large children count can not overflow the estimate.
+        return ZK_RESPONSE_HEADER_LEN + childrenCount * sizePerChild;
+    }
+}
\ No newline at end of file
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
index 34e860aa578..e0e95c85529 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
@@ -94,7 +94,7 @@ public class MetadataStoreFactoryImplTest {
public static class MyMetadataStore extends AbstractMetadataStore {
protected MyMetadataStore() {
- super("custom");
+ super("custom", null);
}
@Override
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/batching/ZKMetadataStoreBatchStrategyTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/batching/ZKMetadataStoreBatchStrategyTest.java
new file mode 100644
index 00000000000..4fe2611a4de
--- /dev/null
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/batching/ZKMetadataStoreBatchStrategyTest.java
@@ -0,0 +1,637 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl.batching;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.metadata.api.MetadataNodeSizeStats;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.jctools.queues.MpscUnboundedArrayQueue;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Comprehensive test suite for ZKMetadataStoreBatchStrategy class.
+ * This test covers:
+ * - Constructor and configuration handling
+ * - Batch size calculations for different operation types
+ * - Size limit enforcement for requests and responses
+ * - Operation count limits
+ * - Edge cases and boundary conditions
+ */
+public class ZKMetadataStoreBatchStrategyTest {
+
+ private ZKMetadataStoreBatchStrategy strategy;
+ private MetadataNodeSizeStats mockNodeSizeStats;
+ private ZooKeeper mockZooKeeper;
+ private ZKClientConfig mockClientConfig;
+ private MpscUnboundedArrayQueue<MetadataOp> operationQueue;
+
+ private static final int DEFAULT_MAX_OPERATIONS = 100;
+ private static final int DEFAULT_MAX_SIZE = 1024 * 1024; // 1MB
+ private static final String TEST_PATH = "/test/path";
+
+ @BeforeMethod
+ public void setUp() {
+ mockNodeSizeStats = Mockito.mock(MetadataNodeSizeStats.class);
+ mockZooKeeper = Mockito.mock(ZooKeeper.class);
+ mockClientConfig = Mockito.mock(ZKClientConfig.class);
+ operationQueue = new MpscUnboundedArrayQueue<>(1000);
+ // Setup default mock behavior
+
Mockito.when(mockZooKeeper.getClientConfig()).thenReturn(mockClientConfig);
+ Mockito.when(mockClientConfig.getInt(Mockito.anyString(),
Mockito.anyInt()))
+ .thenReturn(DEFAULT_MAX_SIZE);
+ // Setup default node size stats
+
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString()))
+ .thenReturn(100);
+
Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(Mockito.anyString()))
+ .thenReturn(10);
+ strategy = new ZKMetadataStoreBatchStrategy(mockNodeSizeStats,
DEFAULT_MAX_OPERATIONS,
+ DEFAULT_MAX_SIZE, mockZooKeeper);
+ }
+
+ @Test
+ public void testConstructorWithDefaultMaxSize() {
+ // Test constructor uses default max size when ZK config returns 0 or
negative
+ Mockito.when(mockClientConfig.getInt(Mockito.anyString(),
Mockito.anyInt()))
+ .thenReturn(0);
+ ZKMetadataStoreBatchStrategy strategyWithDefault = new
ZKMetadataStoreBatchStrategy(
+ mockNodeSizeStats, DEFAULT_MAX_OPERATIONS, DEFAULT_MAX_SIZE,
mockZooKeeper);
+ assertEquals(strategyWithDefault.maxSize(), DEFAULT_MAX_SIZE);
+ }
+
+ @Test
+ public void testConstructorWithConfiguredMaxSize() {
+ int configuredSize = 2 * 1024 * 1024; // 2MB
+ Mockito.when(mockClientConfig.getInt(Mockito.anyString(),
Mockito.anyInt()))
+ .thenReturn(configuredSize);
+ ZKMetadataStoreBatchStrategy strategyWithConfig = new
ZKMetadataStoreBatchStrategy(
+ mockNodeSizeStats, DEFAULT_MAX_OPERATIONS, DEFAULT_MAX_SIZE,
mockZooKeeper);
+ assertEquals(strategyWithConfig.maxSize(), configuredSize);
+ }
+
+ @Test
+ public void testMaxSizeMethod() {
+ assertEquals(strategy.maxSize(), DEFAULT_MAX_SIZE);
+ }
+
+ @Test
+ public void testEmptyQueue() {
+ List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+ assertTrue(batch.isEmpty());
+ }
+
+ @Test
+ public void testSingleGetOperation() {
+ MetadataOp getOp = createMockGetOperation(TEST_PATH);
+ operationQueue.offer(getOp);
+ List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+ assertEquals(batch.size(), 1);
+ assertEquals(batch.get(0), getOp);
+ assertTrue(operationQueue.isEmpty());
+ }
+
+ @Test
+ public void testSingleGetChildrenOperation() {
+ MetadataOp getChildrenOp = createMockGetChildrenOperation(TEST_PATH);
+ operationQueue.offer(getChildrenOp);
+ List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+ assertEquals(batch.size(), 1);
+ assertEquals(batch.get(0), getChildrenOp);
+ assertTrue(operationQueue.isEmpty());
+ }
+
+ @Test
+ public void testSinglePutOperation() {
+ MetadataOp putOp = createMockPutOperation(TEST_PATH, 100);
+ operationQueue.offer(putOp);
+ List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+ assertEquals(batch.size(), 1);
+ assertEquals(batch.get(0), putOp);
+ assertTrue(operationQueue.isEmpty());
+ }
+
+ @Test
+ public void testSingleDeleteOperation() {
+ MetadataOp deleteOp = createMockDeleteOperation(TEST_PATH);
+ operationQueue.offer(deleteOp);
+ List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+ assertEquals(batch.size(), 1);
+ assertEquals(batch.get(0), deleteOp);
+ assertTrue(operationQueue.isEmpty());
+ }
+
+ @Test
+ public void testMaxOperationsLimit() {
+ // Add more operations than the max limit
+ for (int i = 0; i < DEFAULT_MAX_OPERATIONS + 10; i++) {
+ operationQueue.offer(createMockGetOperation("/test/path/" + i));
+ }
+ List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+ assertEquals(batch.size(), DEFAULT_MAX_OPERATIONS);
+ assertEquals(operationQueue.size(), 10); // Remaining operations
+ }
+
+ @Test
+ public void testRequestSizeLimit() {
+ // Create large PUT operations that exceed the request size limit
+ int largeSize = DEFAULT_MAX_SIZE / 2 + 1000; // Larger than half max
size
+ operationQueue.offer(createMockPutOperation("/test/path1", largeSize));
+ operationQueue.offer(createMockPutOperation("/test/path2", largeSize));
+ operationQueue.offer(createMockPutOperation("/test/path3", 100)); //
Small operation
+ List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+ // Should only include the first operation due to size limit
+ assertEquals(batch.size(), 1);
+ assertEquals(operationQueue.size(), 2); // Two operations remaining
+ }
+
+ // Helper methods for creating mock operations; they differ only in the operation type and the reported size.
+ private MetadataOp createMockGetOperation(String path) {
+     return stubbedOperation(MetadataOp.Type.GET, path, 0);
+ }
+
+ private MetadataOp createMockGetChildrenOperation(String path) {
+     return stubbedOperation(MetadataOp.Type.GET_CHILDREN, path, 0);
+ }
+
+ private MetadataOp createMockPutOperation(String path, int size) {
+     return stubbedOperation(MetadataOp.Type.PUT, path, size);
+ }
+
+ private MetadataOp createMockDeleteOperation(String path) {
+     // A mocked delete reports the length of its path as its size.
+     return stubbedOperation(MetadataOp.Type.DELETE, path, path.length());
+ }
+
+ // Builds a mocked operation that answers getType(), getPath(), size() and getFuture().
+ private MetadataOp stubbedOperation(MetadataOp.Type type, String path, int size) {
+     MetadataOp mocked = Mockito.mock(MetadataOp.class);
+     Mockito.when(mocked.getType()).thenReturn(type);
+     Mockito.when(mocked.getPath()).thenReturn(path);
+     Mockito.when(mocked.size()).thenReturn(size);
+     Mockito.when(mocked.getFuture()).thenReturn(new CompletableFuture<>());
+     return mocked;
+ }
+
+ @Test
+ public void testResponseSizeLimitForGetOperations() {
+ // Setup large response size for node stats
+
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString()))
+ .thenReturn(DEFAULT_MAX_SIZE / 5); // Large response size
+ // Add multiple GET operations
+ for (int i = 0; i < 10; i++) {
+ operationQueue.offer(createMockGetOperation("/test/path/" + i));
+ }
+ List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+ // Should limit based on estimated response size (half of max size)
+ assertEquals(batch.size(), 2);
+ }
+
+ @Test
+ public void testResponseSizeLimitForGetChildrenOperations() {
+ // Setup large response size for children operations
+
Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(Mockito.anyString()))
+ .thenReturn(1000); // Many children
+
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString()))
+ .thenReturn(100); // Size per child
+ // Add multiple GET_CHILDREN operations
+ for (int i = 0; i < 10; i++) {
+ operationQueue.offer(createMockGetChildrenOperation("/test/path/"
+ i));
+ }
+ List<MetadataOp> batch = strategy.nextBatch(operationQueue);
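+ // Should limit based on estimated response size.
+ // DEFAULT_MAX_SIZE is 1024 * 1024, so the response budget (half of it) is 1024 * 512 = 524288.
+ // Each GET_CHILDREN response is estimated at 88 (header) + 1000 * (100 + 4) = 104088 bytes.
+ // Five operations (520440) fit into the budget and a sixth (624528) does not, so the result should be 5.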
+ assertEquals(batch.size(), 5);
+ }
+
+ @Test
+ public void testMixedOperationTypes() {
+ operationQueue.offer(createMockGetOperation("/test/get"));
+ operationQueue.offer(createMockPutOperation("/test/put", 100));
+ operationQueue.offer(createMockDeleteOperation("/test/delete"));
+ operationQueue.offer(createMockGetChildrenOperation("/test/children"));
+ List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+ assertEquals(batch.size(), 4);
+ assertTrue(operationQueue.isEmpty());
+ }
+
+ @Test
+ public void testNullOperationInQueue() {
+ // Test the case where peek() returns null (queue becomes empty during
processing)
+ MetadataOp mockOp = createMockGetOperation("/test/path1");
+ @SuppressWarnings("unchecked")
+ MpscUnboundedArrayQueue<MetadataOp> mockQueue =
Mockito.mock(MpscUnboundedArrayQueue.class);
+ Mockito.when(mockQueue.isEmpty()).thenReturn(false, false, true);
+ Mockito.when(mockQueue.peek()).thenReturn(mockOp, (MetadataOp) null);
+ Mockito.when(mockQueue.poll()).thenReturn(mockOp);
+ List<MetadataOp> batch = strategy.nextBatch(mockQueue);
+ assertEquals(batch.size(), 1);
+ }
+
+ @Test
+ public void testZKResponseHeaderCalculation() {
+ // Test that ZK_RESPONSE_HEADER_LEN is correctly used in calculations
+ assertEquals(ZKMetadataStoreBatchStrategy.ZK_RESPONSE_HEADER_LEN, 88);
+ // Add a GET operation and verify header is included in size
calculation
+ operationQueue.offer(createMockGetOperation(TEST_PATH));
+ // The actual size calculation is internal, but we can verify the
operation is processed
+ List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+ assertEquals(batch.size(), 1);
+ }
+
+ @Test
+ public void testDefaultSizeFallbackForGetOperations() {
+ // Test that defaultSize is used when node size stats return negative
values
+ // defaultSize = Math.max(maxPutSize >>> 4, 1024) = Math.max(1MB >>>
4, 1024) = Math.max(65536, 1024) = 65536
+
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString()))
+ .thenReturn(-1); // Negative value should trigger defaultSize
fallback
+ operationQueue.offer(createMockGetOperation("/test/path1"));
+ operationQueue.offer(createMockGetOperation("/test/path2"));
+ List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+ // Should process operations using defaultSize for size estimation
+ assertEquals(batch.size(), 2);
+ assertTrue(operationQueue.isEmpty());
+ }
+
+ @Test
+ public void testDefaultSizeFallbackForGetChildrenOperations() {
+ // Test defaultSize fallback for GET_CHILDREN when childrenCount < 0
+
Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(Mockito.anyString()))
+ .thenReturn(-1); // Negative value should trigger defaultSize
fallback
+ operationQueue.offer(createMockGetChildrenOperation("/test/path1"));
+ operationQueue.offer(createMockGetChildrenOperation("/test/path2"));
+ List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+ // Should process operations using defaultSize for size estimation
+ assertEquals(batch.size(), 2);
+ assertTrue(operationQueue.isEmpty());
+ }
+
+ @Test
+ public void
testDefaultSizeFallbackForGetChildrenWithValidCountButInvalidSize() {
+ // Test defaultSize fallback when childrenCount >= 0 but size <= 0
+
Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(Mockito.anyString()))
+ .thenReturn(5); // Valid children count
+
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString()))
+ .thenReturn(-1); // Invalid size should trigger defaultSize
fallback
+ operationQueue.offer(createMockGetChildrenOperation("/test/path"));
+ List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+ assertEquals(batch.size(), 1);
+ assertTrue(operationQueue.isEmpty());
+ }
+
+ @Test
+ public void testDefaultOperationType() {
+ // The implementation doesn't handle null types, so this test only uses a valid operation.
+ // NOTE: PUT has its own case in the switch statement, so the default branch is not exercised here.
+ MetadataOp putOp = createMockPutOperation(TEST_PATH, 50);
+ operationQueue.offer(putOp);
+ List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+ assertEquals(batch.size(), 1);
+ assertEquals(batch.get(0), putOp);
+ }
+
+ @Test
+ public void testLargePathNames() {
+ // Test with very long path names
+ String longPath = "/very/long/path/name/that/exceeds/normal/length/"
+ + "and/continues/for/a/very/long/time/to/test/path/handling/"
+ + "in/the/batch/strategy/implementation";
+ operationQueue.offer(createMockPutOperation(longPath, 100));
+ operationQueue.offer(createMockDeleteOperation(longPath));
+ List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+ assertEquals(batch.size(), 2);
+ assertTrue(operationQueue.isEmpty());
+ }
+
+ @Test
+ public void testZeroSizeOperations() {
+ // Test operations with zero size
+ operationQueue.offer(createMockPutOperation(TEST_PATH, 0));
+ operationQueue.offer(createMockGetOperation(TEST_PATH));
+ List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+ assertEquals(batch.size(), 2);
+ assertTrue(operationQueue.isEmpty());
+ }
+
+ @Test
+ public void testNodeSizeStatsIntegration() {
+ String path1 = "/test/path1";
+ String path2 = "/test/path2";
+
+ // Setup different sizes for different paths
+
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(path1)).thenReturn(1000);
+
Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(path2)).thenReturn(2000);
+
Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(path1)).thenReturn(50);
+
Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(path2)).thenReturn(100);
+
+ operationQueue.offer(createMockGetOperation(path1));
+ operationQueue.offer(createMockGetChildrenOperation(path2));
+
+ List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+
+ assertEquals(batch.size(), 2);
+
+ // Verify that the node size stats were consulted
+ Mockito.verify(mockNodeSizeStats).getMaxSizeOfSameResourceType(path1);
+
Mockito.verify(mockNodeSizeStats).getMaxChildrenCountOfSameResourceType(path2);
+ }
+
+ @Test
+ public void testBatchSizeCalculationAccuracy() {
+     // Test that batch size calculation prevents oversized batches.
+
+     // Each GET is estimated at DEFAULT_MAX_SIZE / 3 (349525) bytes plus the 88 bytes response header, while the
+     // response budget is half of the max size (524288). A second GET (699226 in total) already exceeds it.
+     Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString()))
+             .thenReturn(DEFAULT_MAX_SIZE / 3);
+
+     operationQueue.offer(createMockGetOperation("/test/path1"));
+     operationQueue.offer(createMockGetOperation("/test/path2")); // This should not fit
+     operationQueue.offer(createMockGetOperation("/test/path3"));
+
+     List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+
+     // Only the first operation fits; the other two stay in the queue for the following batches.
+     assertEquals(batch.size(), 1);
+     assertEquals(operationQueue.size(), 2);
+ }
+
+    @Test
+    public void testMixedValidAndInvalidNodeSizeStats() {
+        // Test mixing operations with valid and invalid node size stats: all four queued operations
+        // are expected to end up in a single batch
+        String validPath = "/valid/path";
+        String invalidPath = "/invalid/path";
+
+        // Setup different responses for different paths (-1 stands for "no usable stats" in this test)
+        Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(validPath)).thenReturn(500);
+        Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(invalidPath)).thenReturn(-1);
+        Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(validPath)).thenReturn(10);
+        Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(invalidPath)).thenReturn(-1);
+
+        operationQueue.offer(createMockGetOperation(validPath));
+        operationQueue.offer(createMockGetOperation(invalidPath)); // Should use defaultSize
+        operationQueue.offer(createMockGetChildrenOperation(validPath));
+        operationQueue.offer(createMockGetChildrenOperation(invalidPath)); // Should use defaultSize
+
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+
+        // Everything fits, so the queue must be fully drained
+        assertEquals(batch.size(), 4);
+        assertTrue(operationQueue.isEmpty());
+    }
+
+    @Test
+    public void testZeroSizeFromNodeStats() {
+        // Test handling of zero size from node stats (should use defaultSize)
+        Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString()))
+                .thenReturn(0); // Zero size should trigger defaultSize fallback
+        Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(Mockito.anyString()))
+                .thenReturn(5); // Valid children count
+
+        operationQueue.offer(createMockGetOperation("/test/get"));
+        operationQueue.offer(createMockGetChildrenOperation("/test/children"));
+
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+
+        // Both operations are expected in one batch, leaving the queue empty
+        assertEquals(batch.size(), 2);
+        assertTrue(operationQueue.isEmpty());
+    }
+
+    @Test
+    public void testDynamicDefaultSizeCalculation() {
+        // Test that defaultSize is calculated dynamically based on maxPutSize
+        // defaultSize = Math.max(maxPutSize >>> 4, 1024)
+
+        // Test with default configuration (1MB)
+        // defaultSize = Math.max(1048576 >>> 4, 1024) = Math.max(65536, 1024) = 65536
+        assertEquals(strategy.maxSize(), DEFAULT_MAX_SIZE); // Verify maxPutSize
+
+        // Test with smaller maxPutSize
+        int smallMaxSize = 8192; // 8KB
+        Mockito.when(mockClientConfig.getInt(Mockito.anyString(), Mockito.anyInt()))
+                .thenReturn(smallMaxSize);
+
+        // The constructor still receives DEFAULT_MAX_SIZE; the assertion below expects the stubbed
+        // client-config value to win over that argument
+        ZKMetadataStoreBatchStrategy smallStrategy = new ZKMetadataStoreBatchStrategy(
+                mockNodeSizeStats, DEFAULT_MAX_OPERATIONS, DEFAULT_MAX_SIZE, mockZooKeeper);
+
+        assertEquals(smallStrategy.maxSize(), smallMaxSize);
+
+        // For small maxPutSize, defaultSize should be Math.max(8192 >>> 4, 1024) = Math.max(512, 1024) = 1024
+        // We can't directly access defaultSize, but we can test its behavior
+        Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString()))
+                .thenReturn(-1); // Force defaultSize usage
+
+        MpscUnboundedArrayQueue<MetadataOp> smallQueue = new MpscUnboundedArrayQueue<>(1000);
+        smallQueue.offer(createMockGetOperation("/test/path"));
+
+        List<MetadataOp> batch = smallStrategy.nextBatch(smallQueue);
+        assertEquals(batch.size(), 1);
+    }
+
+    @Test
+    public void testDynamicDefaultSizeWithLargeMaxPutSize() {
+        // Test with very large maxPutSize
+        int largeMaxSize = 16 * 1024 * 1024; // 16MB
+        Mockito.when(mockClientConfig.getInt(Mockito.anyString(), Mockito.anyInt()))
+                .thenReturn(largeMaxSize);
+
+        // Built after stubbing: maxSize() is expected to report the stubbed value, not DEFAULT_MAX_SIZE
+        ZKMetadataStoreBatchStrategy largeStrategy = new ZKMetadataStoreBatchStrategy(
+                mockNodeSizeStats, DEFAULT_MAX_OPERATIONS, DEFAULT_MAX_SIZE, mockZooKeeper);
+
+        assertEquals(largeStrategy.maxSize(), largeMaxSize);
+
+        // For large maxPutSize, defaultSize should be Math.max(16MB >>> 4, 1024) = Math.max(1MB, 1024) = 1MB
+        // This allows for larger batches when size stats are unavailable
+        Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString()))
+                .thenReturn(-1); // Force defaultSize usage
+
+        MpscUnboundedArrayQueue<MetadataOp> largeQueue = new MpscUnboundedArrayQueue<>(1000);
+
+        // Add multiple operations - with larger defaultSize, more should fit
+        for (int i = 0; i < 20; i++) {
+            largeQueue.offer(createMockGetOperation("/test/path/" + i));
+        }
+
+        // Only sanity bounds are checked: the batch is non-empty and cannot exceed what was queued
+        List<MetadataOp> batch = largeStrategy.nextBatch(largeQueue);
+        assertTrue(batch.size() > 0);
+        assertTrue(batch.size() <= 20);
+    }
+
+    @Test
+    public void testDefaultSizeBehavior() {
+        // Test that the defaultSize field is used correctly for size calculations
+        // We can't directly access the private field, but we can test its behavior
+
+        // When node stats return negative values, defaultSize should be used
+        Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString()))
+                .thenReturn(-1);
+
+        // Add many GET operations to test if defaultSize is being used for calculations
+        for (int i = 0; i < 100; i++) {
+            operationQueue.offer(createMockGetOperation("/test/path/" + i));
+        }
+
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+
+        // Should be able to process some operations (exact number depends on defaultSize calculation);
+        // of the 100 queued operations only 7 or 8 are accepted into the batch
+        assertTrue(batch.size() >= 7);
+        assertTrue(batch.size() <= 8);
+    }
+
+    @Test
+    public void testGetChildrenWithZeroChildrenCount() {
+        // Test GET_CHILDREN with zero children count (valid case)
+        Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(Mockito.anyString()))
+                .thenReturn(0); // Zero children is valid
+        Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString()))
+                .thenReturn(100); // Valid size
+
+        operationQueue.offer(createMockGetChildrenOperation("/empty/directory"));
+
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+
+        // The single operation must still be batched and the queue drained
+        assertEquals(batch.size(), 1);
+        assertTrue(operationQueue.isEmpty());
+    }
+
+    @Test
+    public void testEnhancedSizeCalculationLogic() {
+        // Test the enhanced size calculation logic with various scenarios: GET and GET_CHILDREN operations
+        // are queued for three paths whose stats are valid, partly invalid (-1), or invalid in the other field
+
+        // Scenario 1: Valid stats
+        String path1 = "/valid/stats";
+        Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(path1)).thenReturn(200);
+        Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(path1)).thenReturn(5);
+
+        // Scenario 2: Invalid children count, valid size
+        String path2 = "/invalid/children";
+        Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(path2)).thenReturn(300);
+        Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(path2)).thenReturn(-1);
+
+        // Scenario 3: Valid children count, invalid size
+        String path3 = "/invalid/size";
+        Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(path3)).thenReturn(-1);
+        Mockito.when(mockNodeSizeStats.getMaxChildrenCountOfSameResourceType(path3)).thenReturn(3);
+
+        operationQueue.offer(createMockGetOperation(path1));
+        operationQueue.offer(createMockGetChildrenOperation(path1));
+        operationQueue.offer(createMockGetChildrenOperation(path2)); // Should use defaultSize
+        operationQueue.offer(createMockGetChildrenOperation(path3)); // Should use defaultSize for size
+        operationQueue.offer(createMockGetOperation(path2));
+        operationQueue.offer(createMockGetOperation(path3)); // Should use defaultSize
+
+        List<MetadataOp> batch = strategy.nextBatch(operationQueue);
+
+        // All operations should be processed
+        assertEquals(batch.size(), 6);
+        assertTrue(operationQueue.isEmpty());
+    }
+
+    @Test
+    public void testDefaultSizeCalculationFormula() {
+        // Test the specific formula: defaultSize = Math.max(maxPutSize >>> 4, 1024)
+        // Each strategy is constructed right after its own stub is installed; the maxSize() assertions
+        // further down expect every strategy to keep the value that was stubbed when it was built.
+
+        // Test case 1: maxPutSize = 1MB (default), defaultSize should be Math.max(65536, 1024) = 65536
+        int maxSize1 = 1024 * 1024; // 1MB
+        Mockito.when(mockClientConfig.getInt(Mockito.anyString(), Mockito.anyInt()))
+                .thenReturn(maxSize1);
+
+        ZKMetadataStoreBatchStrategy strategy1 = new ZKMetadataStoreBatchStrategy(
+                mockNodeSizeStats, DEFAULT_MAX_OPERATIONS, DEFAULT_MAX_SIZE, mockZooKeeper);
+
+        // Test case 2: maxPutSize = 8KB, defaultSize should be Math.max(512, 1024) = 1024
+        int maxSize2 = 8 * 1024; // 8KB
+        Mockito.when(mockClientConfig.getInt(Mockito.anyString(), Mockito.anyInt()))
+                .thenReturn(maxSize2);
+
+        ZKMetadataStoreBatchStrategy strategy2 = new ZKMetadataStoreBatchStrategy(
+                mockNodeSizeStats, DEFAULT_MAX_OPERATIONS, DEFAULT_MAX_SIZE, mockZooKeeper);
+
+        // Test case 3: maxPutSize = 32MB, defaultSize should be Math.max(2MB, 1024) = 2MB
+        int maxSize3 = 32 * 1024 * 1024; // 32MB
+        Mockito.when(mockClientConfig.getInt(Mockito.anyString(), Mockito.anyInt()))
+                .thenReturn(maxSize3);
+
+        ZKMetadataStoreBatchStrategy strategy3 = new ZKMetadataStoreBatchStrategy(
+                mockNodeSizeStats, DEFAULT_MAX_OPERATIONS, DEFAULT_MAX_SIZE, mockZooKeeper);
+
+        // Verify maxSize() returns the configured values
+        assertEquals(strategy1.maxSize(), maxSize1);
+        assertEquals(strategy2.maxSize(), maxSize2);
+        assertEquals(strategy3.maxSize(), maxSize3);
+
+        // Test behavior with invalid node stats to verify defaultSize usage
+        Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString()))
+                .thenReturn(-1);
+
+        // All strategies should be able to process operations using their respective defaultSize
+        MpscUnboundedArrayQueue<MetadataOp> queue1 = new MpscUnboundedArrayQueue<>(10);
+        MpscUnboundedArrayQueue<MetadataOp> queue2 = new MpscUnboundedArrayQueue<>(10);
+        MpscUnboundedArrayQueue<MetadataOp> queue3 = new MpscUnboundedArrayQueue<>(10);
+
+        queue1.offer(createMockGetOperation("/test1"));
+        queue2.offer(createMockGetOperation("/test2"));
+        queue3.offer(createMockGetOperation("/test3"));
+
+        List<MetadataOp> batch1 = strategy1.nextBatch(queue1);
+        List<MetadataOp> batch2 = strategy2.nextBatch(queue2);
+        List<MetadataOp> batch3 = strategy3.nextBatch(queue3);
+
+        assertEquals(batch1.size(), 1);
+        assertEquals(batch2.size(), 1);
+        assertEquals(batch3.size(), 1);
+    }
+
+    @Test
+    public void testDefaultSizeMinimumValue() {
+        // Test that defaultSize never goes below 1024 bytes
+        int verySmallMaxSize = 1024; // 1KB
+        Mockito.when(mockClientConfig.getInt(Mockito.anyString(), Mockito.anyInt()))
+                .thenReturn(verySmallMaxSize);
+
+        ZKMetadataStoreBatchStrategy smallStrategy = new ZKMetadataStoreBatchStrategy(
+                mockNodeSizeStats, DEFAULT_MAX_OPERATIONS, DEFAULT_MAX_SIZE, mockZooKeeper);
+
+        // defaultSize should be Math.max(1024 >>> 4, 1024) = Math.max(64, 1024) = 1024
+        assertEquals(smallStrategy.maxSize(), verySmallMaxSize);
+
+        // Test that operations can still be processed even with minimum defaultSize
+        Mockito.when(mockNodeSizeStats.getMaxSizeOfSameResourceType(Mockito.anyString()))
+                .thenReturn(-1);
+
+        MpscUnboundedArrayQueue<MetadataOp> smallQueue = new MpscUnboundedArrayQueue<>(10);
+        smallQueue.offer(createMockGetOperation("/test/small"));
+
+        // A lone operation must be returned even though its estimated size equals the whole limit
+        List<MetadataOp> batch = smallStrategy.nextBatch(smallQueue);
+        assertEquals(batch.size(), 1);
+    }
+}
diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
index 2682f038df2..c8f37e9b3fc 100644
--- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
+++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
@@ -53,6 +53,7 @@ import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.DeleteRequest;
@@ -138,6 +139,7 @@ public class MockZooKeeper extends ZooKeeper {
private int referenceCount;
private List<AutoCloseable> closeables;
private int sessionTimeout;
+ private ZKClientConfig zKClientConfig = new ZKClientConfig();
//see details of Objenesis caching - http://objenesis.org/details.html
//see supported jvms - https://github.com/easymock/objenesis/blob/master/SupportedJVMs.md
@@ -190,6 +192,7 @@ public class MockZooKeeper extends ZooKeeper {
zk.sequentialIdGenerator = new AtomicLong();
zk.closeables = new ArrayList<>();
zk.sessionTimeout = 30_000;
+ zk.zKClientConfig = new ZKClientConfig();
return zk;
}
@@ -236,6 +239,11 @@ public class MockZooKeeper extends ZooKeeper {
return runInExecutorReturningValue(() -> internalCreate(path, data,
createMode));
}
+ @Override
+ public ZKClientConfig getClientConfig() {
+ return zKClientConfig; // this mock's own config; the field is assigned a default-constructed ZKClientConfig
+ }
+
private <T> T runInExecutorReturningValue(Callable<T> task)
throws InterruptedException, KeeperException {
if (isStopped()) {
diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java
index 766f70979aa..0da88c03c2e 100644
--- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java
+++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java
@@ -27,6 +27,7 @@ import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.objenesis.Objenesis;
@@ -50,6 +51,8 @@ public class MockZooKeeperSession extends ZooKeeper {
private int sessionTimeout = -1;
+ private ZKClientConfig zkClientConfig = new ZKClientConfig();
+
// Convenience overload: delegates to the two-argument newInstance, passing true as the second argument
// (presumably the closeMockZooKeeperOnClose flag; the two-argument signature is not shown in this hunk).
public static MockZooKeeperSession newInstance(MockZooKeeper
mockZooKeeper) {
return newInstance(mockZooKeeper, true);
}
@@ -61,6 +64,7 @@ public class MockZooKeeperSession extends ZooKeeper {
mockZooKeeperSession.mockZooKeeper = mockZooKeeper;
mockZooKeeperSession.sessionId = sessionIdGenerator.getAndIncrement();
mockZooKeeperSession.closeMockZooKeeperOnClose =
closeMockZooKeeperOnClose;
+ mockZooKeeperSession.zkClientConfig = new ZKClientConfig();
if (closeMockZooKeeperOnClose) {
mockZooKeeper.increaseRefCount();
}
@@ -74,6 +78,11 @@ public class MockZooKeeperSession extends ZooKeeper {
assert false;
}
+ @Override
+ public ZKClientConfig getClientConfig() {
+ return zkClientConfig; // this session's own config; the field is assigned a default-constructed ZKClientConfig
+ }
+
@Override
public int getSessionTimeout() {
if (sessionTimeout > 0) {