This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9bcad7cc010 [fix][test] Fix flaky
BrokerServiceChaosTest.testFetchPartitionedTopicMetadataWithCacheRefresh
(#24161)
9bcad7cc010 is described below
commit 9bcad7cc010348df252b065c59fae42915aca80b
Author: Zixuan Liu <[email protected]>
AuthorDate: Wed Apr 9 12:10:38 2025 +0800
[fix][test] Fix flaky
BrokerServiceChaosTest.testFetchPartitionedTopicMetadataWithCacheRefresh
(#24161)
Signed-off-by: Zixuan Liu <[email protected]>
Co-authored-by: Lari Hotari <[email protected]>
---
.../broker/service/CanReconnectZKClientPulsarServiceBaseTest.java | 8 ++++++++
1 file changed, 8 insertions(+)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
index dc41764ca21..b2db0e84749 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java
@@ -35,12 +35,14 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.tests.TestRetrySupport;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
import org.apache.zookeeper.ClientCnxn;
import org.apache.zookeeper.ZooKeeper;
+import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
@Slf4j
@@ -65,6 +67,7 @@ public abstract class
CanReconnectZKClientPulsarServiceBaseTest extends TestRetr
protected PulsarAdmin admin;
protected PulsarClient client;
protected ZooKeeper localZkOfBroker;
+ protected volatile SessionEvent sessionEvent;
protected Object localMetaDataStoreClientCnx;
protected final AtomicBoolean connectionTerminationThreadKeepRunning = new
AtomicBoolean();
private volatile Thread connectionTerminationThread;
@@ -87,6 +90,10 @@ public abstract class
CanReconnectZKClientPulsarServiceBaseTest extends TestRetr
broker = pulsar.getBrokerService();
ZKMetadataStore zkMetadataStore = (ZKMetadataStore)
pulsar.getLocalMetadataStore();
localZkOfBroker = zkMetadataStore.getZkClient();
+ zkMetadataStore.registerSessionListener(n -> {
+ log.info("Received session event: {}", n);
+ sessionEvent = n;
+ });
ClientCnxn cnxn = WhiteboxImpl.getInternalState(localZkOfBroker,
"cnxn");
Object sendThread = WhiteboxImpl.getInternalState(cnxn, "sendThread");
localMetaDataStoreClientCnx =
WhiteboxImpl.getInternalState(sendThread, "clientCnxnSocket");
@@ -157,6 +164,7 @@ public abstract class
CanReconnectZKClientPulsarServiceBaseTest extends TestRetr
connectionTerminationThread.join();
connectionTerminationThread = null;
}
+ Awaitility.await().until(() ->
SessionEvent.Reconnected.equals(sessionEvent));
}
protected void createDefaultTenantsAndClustersAndNamespace() throws
Exception {