This is an automated email from the ASF dual-hosted git repository.

coderzc pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 97613e0f0ac8eb9168c7e114d17cdd957816eabc
Author: Matteo Merli <[email protected]>
AuthorDate: Fri May 1 06:09:07 2026 -0700

    [fix][test] Reduce flakiness in testLoadBalancerServiceUnitTableViewSyncer (#25638)
---
 .../extensions/ExtensibleLoadManagerImplTest.java  | 337 +++++++--------------
 1 file changed, 111 insertions(+), 226 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 94c60356b4a..97921488002 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -1298,263 +1298,148 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase
 
     @Test(priority = 200)
     public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception {
+        // Make pulsar1 the leader so primaryLoadManager is the syncer-running 
broker.
+        makePrimaryAsLeader();
 
         Pair<TopicName, NamespaceBundle> topicAndBundle =
                 
getBundleIsNotOwnByChangeEventTopic("testLoadBalancerServiceUnitTableViewSyncer");
-        TopicName topicName = topicAndBundle.getLeft();
         NamespaceBundle bundle = topicAndBundle.getRight();
-        String topic = topicName.toString();
-
-        String lookupResultBefore1 = 
pulsar1.getAdminClient().lookups().lookupTopic(topic);
-        String lookupResultBefore2 = 
pulsar2.getAdminClient().lookups().lookupTopic(topic);
-        assertEquals(lookupResultBefore1, lookupResultBefore2);
+        String topic = topicAndBundle.getLeft().toString();
 
         LookupOptions options = LookupOptions.builder()
                 .authoritative(false)
                 .requestHttps(false)
                 .readOnly(false)
                 .loadTopicsInBundle(false).build();
-        Optional<URL> webServiceUrlBefore1 =
-                pulsar1.getNamespaceService().getWebServiceUrl(bundle, 
options);
-        assertTrue(webServiceUrlBefore1.isPresent());
 
-        Optional<URL> webServiceUrlBefore2 =
-                pulsar2.getNamespaceService().getWebServiceUrl(bundle, 
options);
-        assertTrue(webServiceUrlBefore2.isPresent());
-        assertEquals(webServiceUrlBefore2.get().toString(), 
webServiceUrlBefore1.get().toString());
-
-
-        String syncerTyp = 
serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName())
-                ? "SystemTopicToMetadataStoreSyncer" : 
"MetadataStoreToSystemTopicSyncer";
+        String ownershipBefore = 
pulsar1.getAdminClient().lookups().lookupTopic(topic);
+        assertEquals(pulsar2.getAdminClient().lookups().lookupTopic(topic), 
ownershipBefore);
+        Optional<URL> webUrlBefore = 
pulsar1.getNamespaceService().getWebServiceUrl(bundle, options);
+        assertTrue(webUrlBefore.isPresent());
+
+        // === Phase 1: enable the syncer and verify it activates on the 
leader only ===
+        // The syncer is started inside ExtensibleLoadManagerImpl.monitor() 
when the
+        // dynamic config is enabled and the broker is the channel owner. 
Calling
+        // monitor() directly avoids forcing a leader transition (which 
serializes
+        // playLeader() behind playFollower() on the single-threaded load 
manager
+        // executor and was the root cause of repeated 30s+ flakes here).
+        String syncerType =
+                
serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName())
+                        ? "SystemTopicToMetadataStoreSyncer" : 
"MetadataStoreToSystemTopicSyncer";
         pulsar.getAdminClient().brokers()
-                
.updateDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer", 
syncerTyp);
-        // Wait for the dynamic config to propagate to both brokers before 
triggering
-        // leader transitions. Without this, the leader callback may see the 
old config
-        // and skip activating the syncer.
+                
.updateDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer", 
syncerType);
         Awaitility.await().untilAsserted(() ->
                 
assertTrue(pulsar1.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()));
-        Awaitility.await().untilAsserted(() ->
-                
assertTrue(pulsar2.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()));
-        makeSecondaryAsLeader();
-        makePrimaryAsLeader();
+        primaryLoadManager.monitor();
         Awaitility.await().atMost(30, TimeUnit.SECONDS)
                 .untilAsserted(() -> 
assertTrue(primaryLoadManager.getServiceUnitStateTableViewSyncer()
                         .isActive()));
-        Awaitility.await().atMost(30, TimeUnit.SECONDS)
-                .untilAsserted(() -> 
assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer()
-                        .isActive()));
-        ServiceConfiguration defaultConf = getDefaultConf();
-        defaultConf.setAllowAutoTopicCreation(true);
-        defaultConf.setForceDeleteNamespaceAllowed(true);
-        
defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getCanonicalName());
-        defaultConf.setLoadBalancerSheddingEnabled(false);
-        
defaultConf.setLoadManagerServiceUnitStateTableViewClassName(ServiceUnitStateTableViewImpl.class.getName());
-        try (var additionalPulsarTestContext = 
createAdditionalPulsarTestContext(defaultConf)) {
-            // start pulsar3 with ServiceUnitStateTableViewImpl
-            @Cleanup
-            var pulsar3 = additionalPulsarTestContext.getPulsarService();
-
-            String lookupResult1 = 
pulsar3.getAdminClient().lookups().lookupTopic(topic);
-            String lookupResult2 = 
pulsar1.getAdminClient().lookups().lookupTopic(topic);
-            String lookupResult3 = 
pulsar2.getAdminClient().lookups().lookupTopic(topic);
-            assertEquals(lookupResult1, lookupResult2);
-            assertEquals(lookupResult1, lookupResult3);
-            assertEquals(lookupResult1, lookupResultBefore1);
-
-            Optional<URL> webServiceUrl1 =
-                    pulsar1.getNamespaceService().getWebServiceUrl(bundle, 
options);
-            assertTrue(webServiceUrl1.isPresent());
-
-            Optional<URL> webServiceUrl2 =
-                    pulsar2.getNamespaceService().getWebServiceUrl(bundle, 
options);
-            assertTrue(webServiceUrl2.isPresent());
-            assertEquals(webServiceUrl2.get().toString(), 
webServiceUrl1.get().toString());
-
-            Optional<URL> webServiceUrl3 =
-                    pulsar3.getNamespaceService().getWebServiceUrl(bundle, 
options);
-            assertTrue(webServiceUrl3.isPresent());
-            assertEquals(webServiceUrl3.get().toString(), 
webServiceUrl1.get().toString());
-
-            assertEquals(webServiceUrl3.get().toString(), 
webServiceUrlBefore1.get().toString());
-
-            List<PulsarService> pulsarServices = List.of(pulsar1, pulsar2, 
pulsar3);
-            for (PulsarService pulsarService : pulsarServices) {
-                // Test lookup heartbeat namespace's topic
-                for (PulsarService pulsar : pulsarServices) {
-                    assertLookupHeartbeatOwner(pulsarService,
-                            pulsar.getBrokerId(), 
pulsar.getBrokerServiceUrl());
-                }
-                // Test lookup SLA namespace's topic
-                for (PulsarService pulsar : pulsarServices) {
-                    assertLookupSLANamespaceOwner(pulsarService,
-                            pulsar.getBrokerId(), 
pulsar.getBrokerServiceUrl());
+        
assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer().isActive());
+
+        // === Phase 2: add a 3rd broker using the OTHER table view impl ===
+        // pulsar1/pulsar2 use serviceUnitStateTableViewClassName; pulsar3 
deliberately
+        // uses the other one so the test exercises cross-impl lookups 
regardless of
+        // which parametrization we're running.
+        String otherClassName =
+                
serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName())
+                        ? 
ServiceUnitStateMetadataStoreTableViewImpl.class.getName()
+                        : ServiceUnitStateTableViewImpl.class.getName();
+
+        ServiceConfiguration crossImplConf = getDefaultConf();
+        crossImplConf.setAllowAutoTopicCreation(true);
+        crossImplConf.setForceDeleteNamespaceAllowed(true);
+        
crossImplConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getCanonicalName());
+        
crossImplConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
+        
crossImplConf.setLoadManagerServiceUnitStateTableViewClassName(otherClassName);
+
+        try (var crossImplCtx = 
createAdditionalPulsarTestContext(crossImplConf)) {
+            var pulsar3 = crossImplCtx.getPulsarService();
+
+            // All three brokers (across both impls) must agree on topic 
ownership.
+            
assertEquals(pulsar2.getAdminClient().lookups().lookupTopic(topic), 
ownershipBefore);
+            
assertEquals(pulsar3.getAdminClient().lookups().lookupTopic(topic), 
ownershipBefore);
+            Optional<URL> webUrlPulsar3 = 
pulsar3.getNamespaceService().getWebServiceUrl(bundle, options);
+            assertTrue(webUrlPulsar3.isPresent());
+            assertEquals(webUrlPulsar3.get().toString(), 
webUrlBefore.get().toString());
+
+            // SLA monitor and heartbeat lookups must agree across impls in 
every direction.
+            List<PulsarService> brokers = List.of(pulsar1, pulsar2, pulsar3);
+            for (PulsarService viewer : brokers) {
+                for (PulsarService owner : brokers) {
+                    assertLookupHeartbeatOwner(viewer, owner.getBrokerId(), 
owner.getBrokerServiceUrl());
+                    assertLookupSLANamespaceOwner(viewer, owner.getBrokerId(), 
owner.getBrokerServiceUrl());
                 }
             }
 
-            // Start broker4 with ServiceUnitStateMetadataStoreTableViewImpl
-            ServiceConfiguration conf = getDefaultConf();
-            conf.setAllowAutoTopicCreation(true);
-            conf.setForceDeleteNamespaceAllowed(true);
-            
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getCanonicalName());
-            
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
-            conf.setLoadManagerServiceUnitStateTableViewClassName(
-                    
ServiceUnitStateMetadataStoreTableViewImpl.class.getName());
-            try (var additionPulsarTestContext = 
createAdditionalPulsarTestContext(conf)) {
-                @Cleanup
-                var pulsar4 = additionPulsarTestContext.getPulsarService();
-
-                Set<String> availableCandidates = Sets.newHashSet(
-                        pulsar1.getBrokerServiceUrl(),
-                        pulsar2.getBrokerServiceUrl(),
-                        pulsar3.getBrokerServiceUrl(),
-                        pulsar4.getBrokerServiceUrl());
-                String lookupResult4 = 
pulsar4.getAdminClient().lookups().lookupTopic(topic);
-                assertTrue(availableCandidates.contains(lookupResult4));
-
-                String lookupResult5 = 
pulsar1.getAdminClient().lookups().lookupTopic(topic);
-                String lookupResult6 = 
pulsar2.getAdminClient().lookups().lookupTopic(topic);
-                String lookupResult7 = 
pulsar3.getAdminClient().lookups().lookupTopic(topic);
-                assertEquals(lookupResult4, lookupResult5);
-                assertEquals(lookupResult4, lookupResult6);
-                assertEquals(lookupResult4, lookupResult7);
-                assertEquals(lookupResult4, lookupResultBefore1);
-
-
-                Pair<TopicName, NamespaceBundle> topicAndBundle2 =
-                        
getBundleIsNotOwnByChangeEventTopic("testLoadBalancerServiceUnitTableViewSyncer2");
-                String topic2 = topicAndBundle2.getLeft().toString();
-
-                String lookupResult8 = 
pulsar1.getAdminClient().lookups().lookupTopic(topic2);
-                String lookupResult9 = 
pulsar2.getAdminClient().lookups().lookupTopic(topic2);
-                String lookupResult10 = 
pulsar3.getAdminClient().lookups().lookupTopic(topic2);
-                String lookupResult11 = 
pulsar4.getAdminClient().lookups().lookupTopic(topic2);
-                assertEquals(lookupResult9, lookupResult8);
-                assertEquals(lookupResult10, lookupResult8);
-                assertEquals(lookupResult11, lookupResult8);
-
-                Set<String> availableWebUrlCandidates = Sets.newHashSet(
-                        pulsar1.getWebServiceAddress(),
-                        pulsar2.getWebServiceAddress(),
-                        pulsar3.getWebServiceAddress(),
-                        pulsar4.getWebServiceAddress());
-
-                webServiceUrl1 =
-                        pulsar1.getNamespaceService().getWebServiceUrl(bundle, 
options);
-                assertTrue(webServiceUrl1.isPresent());
-                
assertTrue(availableWebUrlCandidates.contains(webServiceUrl1.get().toString()));
-
-                webServiceUrl2 =
-                        pulsar2.getNamespaceService().getWebServiceUrl(bundle, 
options);
-                assertTrue(webServiceUrl2.isPresent());
-                assertEquals(webServiceUrl2.get().toString(), 
webServiceUrl1.get().toString());
-
-                webServiceUrl3 =
-                        pulsar3.getNamespaceService().getWebServiceUrl(bundle, 
options);
-                assertTrue(webServiceUrl3.isPresent());
-                
assertTrue(availableWebUrlCandidates.contains(webServiceUrl3.get().toString()));
-
-                var webServiceUrl4 =
-                        pulsar4.getNamespaceService().getWebServiceUrl(bundle, 
options);
-                assertTrue(webServiceUrl4.isPresent());
-                assertEquals(webServiceUrl4.get().toString(), 
webServiceUrl1.get().toString());
-                assertEquals(webServiceUrl4.get().toString(), 
webServiceUrlBefore1.get().toString());
-
-                pulsarServices = List.of(pulsar1, pulsar2, pulsar3, pulsar4);
-                for (PulsarService pulsarService : pulsarServices) {
-                    // Test lookup heartbeat namespace's topic
-                    for (PulsarService pulsar : pulsarServices) {
-                        assertLookupHeartbeatOwner(pulsarService,
-                                pulsar.getBrokerId(), 
pulsar.getBrokerServiceUrl());
-                    }
-                    // Test lookup SLA namespace's topic
-                    for (PulsarService pulsar : pulsarServices) {
-                        assertLookupSLANamespaceOwner(pulsarService,
-                                pulsar.getBrokerId(), 
pulsar.getBrokerServiceUrl());
-                    }
-                }
-                // Simulate broker going offline: clean ownerships first (as 
disableBroker() does),
-                // then unregister from the broker registry.
-                var wrapper = (ExtensibleLoadManagerWrapper) 
pulsar4.getLoadManager().get();
-                var loadManager4 = spy((ExtensibleLoadManagerImpl)
-                        FieldUtils.readField(wrapper, "loadManager", true));
-                ServiceUnitStateChannel channel4 = (ServiceUnitStateChannel)
-                        FieldUtils.readField(loadManager4, 
"serviceUnitStateChannel", true);
-                channel4.cleanOwnerships();
-                // Simulate broker going offline by removing it from the 
broker registry.
-                // Set state to Closed BEFORE deleting the ZK node to prevent 
the notification
-                // handler's session-expiry recovery from auto-re-registering 
broker4.
-                // In production, the PulsarService shuts down after 
unregister(), so the handler
-                // never fires. In tests, the service stays running, creating 
a race.
-                var registry4 = (BrokerRegistryImpl) 
loadManager4.getBrokerRegistry();
-                registry4.state.set(BrokerRegistryImpl.State.Closed);
-                String brokerZkPath = "/loadbalance/brokers/" + 
pulsar4.getBrokerId();
-                pulsar4.getLocalMetadataStore().delete(brokerZkPath, 
Optional.empty()).get();
-
-                NamespaceName slaMonitorNamespace =
-                        getSLAMonitorNamespace(pulsar4.getBrokerId(), 
pulsar.getConfiguration());
-                String slaMonitorTopic = 
slaMonitorNamespace.getPersistentTopicName("test");
-                // Wait for ownership to be reassigned after broker 
unregistration
-                String pulsar4BrokerUrl = pulsar4.getBrokerServiceUrl();
-                Awaitility.await().atMost(30, 
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
-                    String lookupResult = 
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
-                    assertNotNull(lookupResult);
-                    assertNotEquals(lookupResult, pulsar4BrokerUrl);
-                });
-                String result = 
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
-                log.info("{} Namespace is re-owned by {}", slaMonitorTopic, 
result);
-
-                Producer<String> producer = 
pulsar.getClient().newProducer(Schema.STRING)
-                        .topic(slaMonitorTopic).create();
-                producer.send("t1");
-
-                // Test re-register broker and check the lookup result
-                // Reset state to Started to allow re-registration.
-                registry4.state.set(BrokerRegistryImpl.State.Started);
-                registry4.registerAsync().get();
-
-                // Wait for ownership to be reassigned back to pulsar4 after 
re-registration
-                Awaitility.await().atMost(30, 
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
-                    String reRegResult = 
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
-                    assertNotNull(reRegResult);
-                    assertEquals(reRegResult, pulsar4.getBrokerServiceUrl());
-                });
-                result = 
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
-                log.info("{} Namespace is re-owned by {}", slaMonitorTopic, 
result);
+            // === Phase 3: simulate the cross-impl broker going offline ===
+            // Its SLA namespace must reassign to a remaining broker, and the 
ownership
+            // change must propagate through the syncer to brokers using the 
other impl.
+            var wrapper3 = (ExtensibleLoadManagerWrapper) 
pulsar3.getLoadManager().get();
+            var loadManager3 = (ExtensibleLoadManagerImpl)
+                    FieldUtils.readField(wrapper3, "loadManager", true);
+            ServiceUnitStateChannel channel3 = (ServiceUnitStateChannel)
+                    FieldUtils.readField(loadManager3, 
"serviceUnitStateChannel", true);
+            channel3.cleanOwnerships();
+            // Set state to Closed BEFORE deleting the ZK node to prevent the 
notification
+            // handler's session-expiry recovery from auto-re-registering 
broker3. In
+            // production the PulsarService shuts down after unregister(), so 
the handler
+            // never fires; in tests the service stays running and creates a 
race.
+            var registry3 = (BrokerRegistryImpl) 
loadManager3.getBrokerRegistry();
+            registry3.state.set(BrokerRegistryImpl.State.Closed);
+            pulsar3.getLocalMetadataStore()
+                    .delete("/loadbalance/brokers/" + pulsar3.getBrokerId(), 
Optional.empty()).get();
+
+            String slaMonitorTopic = 
getSLAMonitorNamespace(pulsar3.getBrokerId(), pulsar.getConfiguration())
+                    .getPersistentTopicName("test");
+            String pulsar3BrokerUrl = pulsar3.getBrokerServiceUrl();
+            Awaitility.await().atMost(30, 
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+                String reassigned = 
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
+                assertNotNull(reassigned);
+                assertNotEquals(reassigned, pulsar3BrokerUrl);
+            });
 
-                producer.send("t2");
-                Producer<String> producer1 = 
pulsar.getClient().newProducer(Schema.STRING)
-                        .topic(slaMonitorTopic).create();
-                producer1.send("t3");
+            // Send a message while the topic is owned by the reassigned 
broker; this must
+            // remain durable when ownership migrates back below.
+            @Cleanup
+            Producer<String> producer = 
pulsar.getClient().newProducer(Schema.STRING)
+                    .topic(slaMonitorTopic).create();
+            producer.send("offline");
+
+            // === Phase 4: re-register the broker and verify ownership 
returns ===
+            registry3.state.set(BrokerRegistryImpl.State.Started);
+            registry3.registerAsync().get();
+            Awaitility.await().atMost(30, 
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() ->
+                    
assertEquals(pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic),
+                            pulsar3.getBrokerServiceUrl()));
+
+            // Same producer reconnects to the new owner; a fresh producer 
also works.
+            producer.send("after-reconnect");
+            @Cleanup
+            Producer<String> producer2 = 
pulsar.getClient().newProducer(Schema.STRING)
+                    .topic(slaMonitorTopic).create();
+            producer2.send("from-new-producer");
 
-                producer.close();
-                producer1.close();
-                @Cleanup
-                Consumer<String> consumer = 
pulsar.getClient().newConsumer(Schema.STRING)
-                        .topic(slaMonitorTopic)
-                        
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
-                        .subscriptionName("test")
-                        .subscribe();
-                // receive message t1 t2 t3
-                assertEquals(consumer.receive().getValue(), "t1");
-                assertEquals(consumer.receive().getValue(), "t2");
-                assertEquals(consumer.receive().getValue(), "t3");
-            }
+            @Cleanup
+            Consumer<String> consumer = 
pulsar.getClient().newConsumer(Schema.STRING)
+                    .topic(slaMonitorTopic)
+                    
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                    .subscriptionName("test")
+                    .subscribe();
+            assertEquals(consumer.receive().getValue(), "offline");
+            assertEquals(consumer.receive().getValue(), "after-reconnect");
+            assertEquals(consumer.receive().getValue(), "from-new-producer");
         }
 
+        // === Phase 5: disable the syncer and verify it deactivates ===
         pulsar.getAdminClient().brokers()
                 
.deleteDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer");
-        // Wait for config deletion to propagate before leader transition
         Awaitility.await().untilAsserted(() ->
                 
assertFalse(pulsar1.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()));
-        Awaitility.await().untilAsserted(() ->
-                
assertFalse(pulsar2.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()));
-        makeSecondaryAsLeader();
+        primaryLoadManager.monitor();
         Awaitility.await().atMost(30, TimeUnit.SECONDS)
                 .untilAsserted(() -> 
assertFalse(primaryLoadManager.getServiceUnitStateTableViewSyncer()
                         .isActive()));
-        Awaitility.await().atMost(30, TimeUnit.SECONDS)
-                .untilAsserted(() -> 
assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer()
-                        .isActive()));
+        
assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer().isActive());
     }
 
     private void assertLookupHeartbeatOwner(PulsarService pulsar,

Reply via email to