This is an automated email from the ASF dual-hosted git repository.

coderzc pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5ab5452484cf262b35ff8561b256bb0fa1aba014
Author: coderzc <[email protected]>
AuthorDate: Wed May 13 12:18:22 2026 +0800

    Revert "[fix][test] Reduce flakiness in testLoadBalancerServiceUnitTableViewSyncer (#25638)"
    
    This reverts commit 97613e0f0ac8eb9168c7e114d17cdd957816eabc.
---
 .../extensions/ExtensibleLoadManagerImplTest.java  | 337 ++++++++++++++-------
 1 file changed, 226 insertions(+), 111 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 97921488002..94c60356b4a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -1298,148 +1298,263 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase
 
     @Test(priority = 200)
     public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception {
-        // Make pulsar1 the leader so primaryLoadManager is the syncer-running 
broker.
-        makePrimaryAsLeader();
 
         Pair<TopicName, NamespaceBundle> topicAndBundle =
                 
getBundleIsNotOwnByChangeEventTopic("testLoadBalancerServiceUnitTableViewSyncer");
+        TopicName topicName = topicAndBundle.getLeft();
         NamespaceBundle bundle = topicAndBundle.getRight();
-        String topic = topicAndBundle.getLeft().toString();
+        String topic = topicName.toString();
+
+        String lookupResultBefore1 = 
pulsar1.getAdminClient().lookups().lookupTopic(topic);
+        String lookupResultBefore2 = 
pulsar2.getAdminClient().lookups().lookupTopic(topic);
+        assertEquals(lookupResultBefore1, lookupResultBefore2);
 
         LookupOptions options = LookupOptions.builder()
                 .authoritative(false)
                 .requestHttps(false)
                 .readOnly(false)
                 .loadTopicsInBundle(false).build();
+        Optional<URL> webServiceUrlBefore1 =
+                pulsar1.getNamespaceService().getWebServiceUrl(bundle, 
options);
+        assertTrue(webServiceUrlBefore1.isPresent());
 
-        String ownershipBefore = 
pulsar1.getAdminClient().lookups().lookupTopic(topic);
-        assertEquals(pulsar2.getAdminClient().lookups().lookupTopic(topic), 
ownershipBefore);
-        Optional<URL> webUrlBefore = 
pulsar1.getNamespaceService().getWebServiceUrl(bundle, options);
-        assertTrue(webUrlBefore.isPresent());
-
-        // === Phase 1: enable the syncer and verify it activates on the 
leader only ===
-        // The syncer is started inside ExtensibleLoadManagerImpl.monitor() 
when the
-        // dynamic config is enabled and the broker is the channel owner. 
Calling
-        // monitor() directly avoids forcing a leader transition (which 
serializes
-        // playLeader() behind playFollower() on the single-threaded load 
manager
-        // executor and was the root cause of repeated 30s+ flakes here).
-        String syncerType =
-                
serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName())
-                        ? "SystemTopicToMetadataStoreSyncer" : 
"MetadataStoreToSystemTopicSyncer";
+        Optional<URL> webServiceUrlBefore2 =
+                pulsar2.getNamespaceService().getWebServiceUrl(bundle, 
options);
+        assertTrue(webServiceUrlBefore2.isPresent());
+        assertEquals(webServiceUrlBefore2.get().toString(), 
webServiceUrlBefore1.get().toString());
+
+
+        String syncerTyp = 
serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName())
+                ? "SystemTopicToMetadataStoreSyncer" : 
"MetadataStoreToSystemTopicSyncer";
         pulsar.getAdminClient().brokers()
-                
.updateDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer", 
syncerType);
+                
.updateDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer", 
syncerTyp);
+        // Wait for the dynamic config to propagate to both brokers before triggering
+        // leader transitions. Without this, the leader callback may see the old config
+        // and skip activating the syncer.
         Awaitility.await().untilAsserted(() ->
                 
assertTrue(pulsar1.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()));
-        primaryLoadManager.monitor();
+        Awaitility.await().untilAsserted(() ->
+                
assertTrue(pulsar2.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()));
+        makeSecondaryAsLeader();
+        makePrimaryAsLeader();
         Awaitility.await().atMost(30, TimeUnit.SECONDS)
                 .untilAsserted(() -> 
assertTrue(primaryLoadManager.getServiceUnitStateTableViewSyncer()
                         .isActive()));
-        
assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer().isActive());
-
-        // === Phase 2: add a 3rd broker using the OTHER table view impl ===
-        // pulsar1/pulsar2 use serviceUnitStateTableViewClassName; pulsar3 
deliberately
-        // uses the other one so the test exercises cross-impl lookups 
regardless of
-        // which parametrization we're running.
-        String otherClassName =
-                
serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName())
-                        ? 
ServiceUnitStateMetadataStoreTableViewImpl.class.getName()
-                        : ServiceUnitStateTableViewImpl.class.getName();
-
-        ServiceConfiguration crossImplConf = getDefaultConf();
-        crossImplConf.setAllowAutoTopicCreation(true);
-        crossImplConf.setForceDeleteNamespaceAllowed(true);
-        
crossImplConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getCanonicalName());
-        
crossImplConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
-        
crossImplConf.setLoadManagerServiceUnitStateTableViewClassName(otherClassName);
-
-        try (var crossImplCtx = 
createAdditionalPulsarTestContext(crossImplConf)) {
-            var pulsar3 = crossImplCtx.getPulsarService();
-
-            // All three brokers (across both impls) must agree on topic 
ownership.
-            
assertEquals(pulsar2.getAdminClient().lookups().lookupTopic(topic), 
ownershipBefore);
-            
assertEquals(pulsar3.getAdminClient().lookups().lookupTopic(topic), 
ownershipBefore);
-            Optional<URL> webUrlPulsar3 = 
pulsar3.getNamespaceService().getWebServiceUrl(bundle, options);
-            assertTrue(webUrlPulsar3.isPresent());
-            assertEquals(webUrlPulsar3.get().toString(), 
webUrlBefore.get().toString());
-
-            // SLA monitor and heartbeat lookups must agree across impls in 
every direction.
-            List<PulsarService> brokers = List.of(pulsar1, pulsar2, pulsar3);
-            for (PulsarService viewer : brokers) {
-                for (PulsarService owner : brokers) {
-                    assertLookupHeartbeatOwner(viewer, owner.getBrokerId(), 
owner.getBrokerServiceUrl());
-                    assertLookupSLANamespaceOwner(viewer, owner.getBrokerId(), 
owner.getBrokerServiceUrl());
+        Awaitility.await().atMost(30, TimeUnit.SECONDS)
+                .untilAsserted(() -> 
assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer()
+                        .isActive()));
+        ServiceConfiguration defaultConf = getDefaultConf();
+        defaultConf.setAllowAutoTopicCreation(true);
+        defaultConf.setForceDeleteNamespaceAllowed(true);
+        
defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getCanonicalName());
+        defaultConf.setLoadBalancerSheddingEnabled(false);
+        
defaultConf.setLoadManagerServiceUnitStateTableViewClassName(ServiceUnitStateTableViewImpl.class.getName());
+        try (var additionalPulsarTestContext = 
createAdditionalPulsarTestContext(defaultConf)) {
+            // start pulsar3 with ServiceUnitStateTableViewImpl
+            @Cleanup
+            var pulsar3 = additionalPulsarTestContext.getPulsarService();
+
+            String lookupResult1 = 
pulsar3.getAdminClient().lookups().lookupTopic(topic);
+            String lookupResult2 = 
pulsar1.getAdminClient().lookups().lookupTopic(topic);
+            String lookupResult3 = 
pulsar2.getAdminClient().lookups().lookupTopic(topic);
+            assertEquals(lookupResult1, lookupResult2);
+            assertEquals(lookupResult1, lookupResult3);
+            assertEquals(lookupResult1, lookupResultBefore1);
+
+            Optional<URL> webServiceUrl1 =
+                    pulsar1.getNamespaceService().getWebServiceUrl(bundle, 
options);
+            assertTrue(webServiceUrl1.isPresent());
+
+            Optional<URL> webServiceUrl2 =
+                    pulsar2.getNamespaceService().getWebServiceUrl(bundle, 
options);
+            assertTrue(webServiceUrl2.isPresent());
+            assertEquals(webServiceUrl2.get().toString(), 
webServiceUrl1.get().toString());
+
+            Optional<URL> webServiceUrl3 =
+                    pulsar3.getNamespaceService().getWebServiceUrl(bundle, 
options);
+            assertTrue(webServiceUrl3.isPresent());
+            assertEquals(webServiceUrl3.get().toString(), 
webServiceUrl1.get().toString());
+
+            assertEquals(webServiceUrl3.get().toString(), 
webServiceUrlBefore1.get().toString());
+
+            List<PulsarService> pulsarServices = List.of(pulsar1, pulsar2, 
pulsar3);
+            for (PulsarService pulsarService : pulsarServices) {
+                // Test lookup heartbeat namespace's topic
+                for (PulsarService pulsar : pulsarServices) {
+                    assertLookupHeartbeatOwner(pulsarService,
+                            pulsar.getBrokerId(), 
pulsar.getBrokerServiceUrl());
+                }
+                // Test lookup SLA namespace's topic
+                for (PulsarService pulsar : pulsarServices) {
+                    assertLookupSLANamespaceOwner(pulsarService,
+                            pulsar.getBrokerId(), 
pulsar.getBrokerServiceUrl());
                 }
             }
 
-            // === Phase 3: simulate the cross-impl broker going offline ===
-            // Its SLA namespace must reassign to a remaining broker, and the 
ownership
-            // change must propagate through the syncer to brokers using the 
other impl.
-            var wrapper3 = (ExtensibleLoadManagerWrapper) 
pulsar3.getLoadManager().get();
-            var loadManager3 = (ExtensibleLoadManagerImpl)
-                    FieldUtils.readField(wrapper3, "loadManager", true);
-            ServiceUnitStateChannel channel3 = (ServiceUnitStateChannel)
-                    FieldUtils.readField(loadManager3, 
"serviceUnitStateChannel", true);
-            channel3.cleanOwnerships();
-            // Set state to Closed BEFORE deleting the ZK node to prevent the 
notification
-            // handler's session-expiry recovery from auto-re-registering 
broker3. In
-            // production the PulsarService shuts down after unregister(), so 
the handler
-            // never fires; in tests the service stays running and creates a 
race.
-            var registry3 = (BrokerRegistryImpl) 
loadManager3.getBrokerRegistry();
-            registry3.state.set(BrokerRegistryImpl.State.Closed);
-            pulsar3.getLocalMetadataStore()
-                    .delete("/loadbalance/brokers/" + pulsar3.getBrokerId(), 
Optional.empty()).get();
-
-            String slaMonitorTopic = 
getSLAMonitorNamespace(pulsar3.getBrokerId(), pulsar.getConfiguration())
-                    .getPersistentTopicName("test");
-            String pulsar3BrokerUrl = pulsar3.getBrokerServiceUrl();
-            Awaitility.await().atMost(30, 
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
-                String reassigned = 
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
-                assertNotNull(reassigned);
-                assertNotEquals(reassigned, pulsar3BrokerUrl);
-            });
+            // Start broker4 with ServiceUnitStateMetadataStoreTableViewImpl
+            ServiceConfiguration conf = getDefaultConf();
+            conf.setAllowAutoTopicCreation(true);
+            conf.setForceDeleteNamespaceAllowed(true);
+            
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getCanonicalName());
+            
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
+            conf.setLoadManagerServiceUnitStateTableViewClassName(
+                    
ServiceUnitStateMetadataStoreTableViewImpl.class.getName());
+            try (var additionPulsarTestContext = 
createAdditionalPulsarTestContext(conf)) {
+                @Cleanup
+                var pulsar4 = additionPulsarTestContext.getPulsarService();
 
-            // Send a message while the topic is owned by the reassigned 
broker; this must
-            // remain durable when ownership migrates back below.
-            @Cleanup
-            Producer<String> producer = 
pulsar.getClient().newProducer(Schema.STRING)
-                    .topic(slaMonitorTopic).create();
-            producer.send("offline");
-
-            // === Phase 4: re-register the broker and verify ownership 
returns ===
-            registry3.state.set(BrokerRegistryImpl.State.Started);
-            registry3.registerAsync().get();
-            Awaitility.await().atMost(30, 
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() ->
-                    
assertEquals(pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic),
-                            pulsar3.getBrokerServiceUrl()));
-
-            // Same producer reconnects to the new owner; a fresh producer 
also works.
-            producer.send("after-reconnect");
-            @Cleanup
-            Producer<String> producer2 = 
pulsar.getClient().newProducer(Schema.STRING)
-                    .topic(slaMonitorTopic).create();
-            producer2.send("from-new-producer");
+                Set<String> availableCandidates = Sets.newHashSet(
+                        pulsar1.getBrokerServiceUrl(),
+                        pulsar2.getBrokerServiceUrl(),
+                        pulsar3.getBrokerServiceUrl(),
+                        pulsar4.getBrokerServiceUrl());
+                String lookupResult4 = 
pulsar4.getAdminClient().lookups().lookupTopic(topic);
+                assertTrue(availableCandidates.contains(lookupResult4));
 
-            @Cleanup
-            Consumer<String> consumer = 
pulsar.getClient().newConsumer(Schema.STRING)
-                    .topic(slaMonitorTopic)
-                    
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
-                    .subscriptionName("test")
-                    .subscribe();
-            assertEquals(consumer.receive().getValue(), "offline");
-            assertEquals(consumer.receive().getValue(), "after-reconnect");
-            assertEquals(consumer.receive().getValue(), "from-new-producer");
+                String lookupResult5 = 
pulsar1.getAdminClient().lookups().lookupTopic(topic);
+                String lookupResult6 = 
pulsar2.getAdminClient().lookups().lookupTopic(topic);
+                String lookupResult7 = 
pulsar3.getAdminClient().lookups().lookupTopic(topic);
+                assertEquals(lookupResult4, lookupResult5);
+                assertEquals(lookupResult4, lookupResult6);
+                assertEquals(lookupResult4, lookupResult7);
+                assertEquals(lookupResult4, lookupResultBefore1);
+
+
+                Pair<TopicName, NamespaceBundle> topicAndBundle2 =
+                        
getBundleIsNotOwnByChangeEventTopic("testLoadBalancerServiceUnitTableViewSyncer2");
+                String topic2 = topicAndBundle2.getLeft().toString();
+
+                String lookupResult8 = 
pulsar1.getAdminClient().lookups().lookupTopic(topic2);
+                String lookupResult9 = 
pulsar2.getAdminClient().lookups().lookupTopic(topic2);
+                String lookupResult10 = 
pulsar3.getAdminClient().lookups().lookupTopic(topic2);
+                String lookupResult11 = 
pulsar4.getAdminClient().lookups().lookupTopic(topic2);
+                assertEquals(lookupResult9, lookupResult8);
+                assertEquals(lookupResult10, lookupResult8);
+                assertEquals(lookupResult11, lookupResult8);
+
+                Set<String> availableWebUrlCandidates = Sets.newHashSet(
+                        pulsar1.getWebServiceAddress(),
+                        pulsar2.getWebServiceAddress(),
+                        pulsar3.getWebServiceAddress(),
+                        pulsar4.getWebServiceAddress());
+
+                webServiceUrl1 =
+                        pulsar1.getNamespaceService().getWebServiceUrl(bundle, 
options);
+                assertTrue(webServiceUrl1.isPresent());
+                
assertTrue(availableWebUrlCandidates.contains(webServiceUrl1.get().toString()));
+
+                webServiceUrl2 =
+                        pulsar2.getNamespaceService().getWebServiceUrl(bundle, 
options);
+                assertTrue(webServiceUrl2.isPresent());
+                assertEquals(webServiceUrl2.get().toString(), 
webServiceUrl1.get().toString());
+
+                webServiceUrl3 =
+                        pulsar3.getNamespaceService().getWebServiceUrl(bundle, 
options);
+                assertTrue(webServiceUrl3.isPresent());
+                
assertTrue(availableWebUrlCandidates.contains(webServiceUrl3.get().toString()));
+
+                var webServiceUrl4 =
+                        pulsar4.getNamespaceService().getWebServiceUrl(bundle, 
options);
+                assertTrue(webServiceUrl4.isPresent());
+                assertEquals(webServiceUrl4.get().toString(), 
webServiceUrl1.get().toString());
+                assertEquals(webServiceUrl4.get().toString(), 
webServiceUrlBefore1.get().toString());
+
+                pulsarServices = List.of(pulsar1, pulsar2, pulsar3, pulsar4);
+                for (PulsarService pulsarService : pulsarServices) {
+                    // Test lookup heartbeat namespace's topic
+                    for (PulsarService pulsar : pulsarServices) {
+                        assertLookupHeartbeatOwner(pulsarService,
+                                pulsar.getBrokerId(), 
pulsar.getBrokerServiceUrl());
+                    }
+                    // Test lookup SLA namespace's topic
+                    for (PulsarService pulsar : pulsarServices) {
+                        assertLookupSLANamespaceOwner(pulsarService,
+                                pulsar.getBrokerId(), 
pulsar.getBrokerServiceUrl());
+                    }
+                }
+                // Simulate broker going offline: clean ownerships first (as disableBroker() does),
+                // then unregister from the broker registry.
+                var wrapper = (ExtensibleLoadManagerWrapper) 
pulsar4.getLoadManager().get();
+                var loadManager4 = spy((ExtensibleLoadManagerImpl)
+                        FieldUtils.readField(wrapper, "loadManager", true));
+                ServiceUnitStateChannel channel4 = (ServiceUnitStateChannel)
+                        FieldUtils.readField(loadManager4, 
"serviceUnitStateChannel", true);
+                channel4.cleanOwnerships();
+                // Simulate broker going offline by removing it from the broker registry.
+                // Set state to Closed BEFORE deleting the ZK node to prevent the notification
+                // handler's session-expiry recovery from auto-re-registering broker4.
+                // In production, the PulsarService shuts down after unregister(), so the handler
+                // never fires. In tests, the service stays running, creating a race.
+                var registry4 = (BrokerRegistryImpl) 
loadManager4.getBrokerRegistry();
+                registry4.state.set(BrokerRegistryImpl.State.Closed);
+                String brokerZkPath = "/loadbalance/brokers/" + 
pulsar4.getBrokerId();
+                pulsar4.getLocalMetadataStore().delete(brokerZkPath, 
Optional.empty()).get();
+
+                NamespaceName slaMonitorNamespace =
+                        getSLAMonitorNamespace(pulsar4.getBrokerId(), 
pulsar.getConfiguration());
+                String slaMonitorTopic = 
slaMonitorNamespace.getPersistentTopicName("test");
+                // Wait for ownership to be reassigned after broker 
unregistration
+                String pulsar4BrokerUrl = pulsar4.getBrokerServiceUrl();
+                Awaitility.await().atMost(30, 
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+                    String lookupResult = 
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
+                    assertNotNull(lookupResult);
+                    assertNotEquals(lookupResult, pulsar4BrokerUrl);
+                });
+                String result = 
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
+                log.info("{} Namespace is re-owned by {}", slaMonitorTopic, 
result);
+
+                Producer<String> producer = 
pulsar.getClient().newProducer(Schema.STRING)
+                        .topic(slaMonitorTopic).create();
+                producer.send("t1");
+
+                // Test re-register broker and check the lookup result
+                // Reset state to Started to allow re-registration.
+                registry4.state.set(BrokerRegistryImpl.State.Started);
+                registry4.registerAsync().get();
+
+                // Wait for ownership to be reassigned back to pulsar4 after 
re-registration
+                Awaitility.await().atMost(30, 
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+                    String reRegResult = 
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
+                    assertNotNull(reRegResult);
+                    assertEquals(reRegResult, pulsar4.getBrokerServiceUrl());
+                });
+                result = 
pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
+                log.info("{} Namespace is re-owned by {}", slaMonitorTopic, 
result);
+
+                producer.send("t2");
+                Producer<String> producer1 = 
pulsar.getClient().newProducer(Schema.STRING)
+                        .topic(slaMonitorTopic).create();
+                producer1.send("t3");
+
+                producer.close();
+                producer1.close();
+                @Cleanup
+                Consumer<String> consumer = 
pulsar.getClient().newConsumer(Schema.STRING)
+                        .topic(slaMonitorTopic)
+                        
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                        .subscriptionName("test")
+                        .subscribe();
+                // receive message t1 t2 t3
+                assertEquals(consumer.receive().getValue(), "t1");
+                assertEquals(consumer.receive().getValue(), "t2");
+                assertEquals(consumer.receive().getValue(), "t3");
+            }
         }
 
-        // === Phase 5: disable the syncer and verify it deactivates ===
         pulsar.getAdminClient().brokers()
                 
.deleteDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer");
+        // Wait for config deletion to propagate before leader transition
         Awaitility.await().untilAsserted(() ->
                 
assertFalse(pulsar1.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()));
-        primaryLoadManager.monitor();
+        Awaitility.await().untilAsserted(() ->
+                
assertFalse(pulsar2.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()));
+        makeSecondaryAsLeader();
         Awaitility.await().atMost(30, TimeUnit.SECONDS)
                 .untilAsserted(() -> 
assertFalse(primaryLoadManager.getServiceUnitStateTableViewSyncer()
                         .isActive()));
-        
assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer().isActive());
+        Awaitility.await().atMost(30, TimeUnit.SECONDS)
+                .untilAsserted(() -> 
assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer()
+                        .isActive()));
     }
 
     private void assertLookupHeartbeatOwner(PulsarService pulsar,

Reply via email to