This is an automated email from the ASF dual-hosted git repository. coderzc pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 97613e0f0ac8eb9168c7e114d17cdd957816eabc Author: Matteo Merli <[email protected]> AuthorDate: Fri May 1 06:09:07 2026 -0700 [fix][test] Reduce flakiness in testLoadBalancerServiceUnitTableViewSyncer (#25638) --- .../extensions/ExtensibleLoadManagerImplTest.java | 337 +++++++-------------- 1 file changed, 111 insertions(+), 226 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 94c60356b4a..97921488002 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -1298,263 +1298,148 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase @Test(priority = 200) public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception { + // Make pulsar1 the leader so primaryLoadManager is the syncer-running broker. + makePrimaryAsLeader(); Pair<TopicName, NamespaceBundle> topicAndBundle = getBundleIsNotOwnByChangeEventTopic("testLoadBalancerServiceUnitTableViewSyncer"); - TopicName topicName = topicAndBundle.getLeft(); NamespaceBundle bundle = topicAndBundle.getRight(); - String topic = topicName.toString(); - - String lookupResultBefore1 = pulsar1.getAdminClient().lookups().lookupTopic(topic); - String lookupResultBefore2 = pulsar2.getAdminClient().lookups().lookupTopic(topic); - assertEquals(lookupResultBefore1, lookupResultBefore2); + String topic = topicAndBundle.getLeft().toString(); LookupOptions options = LookupOptions.builder() .authoritative(false) .requestHttps(false) .readOnly(false) .loadTopicsInBundle(false).build(); - Optional<URL> webServiceUrlBefore1 = - pulsar1.getNamespaceService().getWebServiceUrl(bundle, options); - assertTrue(webServiceUrlBefore1.isPresent()); - Optional<URL> webServiceUrlBefore2 = - pulsar2.getNamespaceService().getWebServiceUrl(bundle, options); - assertTrue(webServiceUrlBefore2.isPresent()); - assertEquals(webServiceUrlBefore2.get().toString(), webServiceUrlBefore1.get().toString()); - - - String syncerTyp = serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName()) - ? "SystemTopicToMetadataStoreSyncer" : "MetadataStoreToSystemTopicSyncer"; + String ownershipBefore = pulsar1.getAdminClient().lookups().lookupTopic(topic); + assertEquals(pulsar2.getAdminClient().lookups().lookupTopic(topic), ownershipBefore); + Optional<URL> webUrlBefore = pulsar1.getNamespaceService().getWebServiceUrl(bundle, options); + assertTrue(webUrlBefore.isPresent()); + + // === Phase 1: enable the syncer and verify it activates on the leader only === + // The syncer is started inside ExtensibleLoadManagerImpl.monitor() when the + // dynamic config is enabled and the broker is the channel owner. Calling + // monitor() directly avoids forcing a leader transition (which serializes + // playLeader() behind playFollower() on the single-threaded load manager + // executor and was the root cause of repeated 30s+ flakes here). + String syncerType = + serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName()) + ? "SystemTopicToMetadataStoreSyncer" : "MetadataStoreToSystemTopicSyncer"; pulsar.getAdminClient().brokers() - .updateDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer", syncerTyp); - // Wait for the dynamic config to propagate to both brokers before triggering - // leader transitions. Without this, the leader callback may see the old config - // and skip activating the syncer. + .updateDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer", syncerType); Awaitility.await().untilAsserted(() -> assertTrue(pulsar1.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled())); - Awaitility.await().untilAsserted(() -> - assertTrue(pulsar2.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled())); - makeSecondaryAsLeader(); - makePrimaryAsLeader(); + primaryLoadManager.monitor(); Awaitility.await().atMost(30, TimeUnit.SECONDS) .untilAsserted(() -> assertTrue(primaryLoadManager.getServiceUnitStateTableViewSyncer() .isActive())); - Awaitility.await().atMost(30, TimeUnit.SECONDS) - .untilAsserted(() -> assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer() - .isActive())); - ServiceConfiguration defaultConf = getDefaultConf(); - defaultConf.setAllowAutoTopicCreation(true); - defaultConf.setForceDeleteNamespaceAllowed(true); - defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getCanonicalName()); - defaultConf.setLoadBalancerSheddingEnabled(false); - defaultConf.setLoadManagerServiceUnitStateTableViewClassName(ServiceUnitStateTableViewImpl.class.getName()); - try (var additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf)) { - // start pulsar3 with ServiceUnitStateTableViewImpl - @Cleanup - var pulsar3 = additionalPulsarTestContext.getPulsarService(); - - String lookupResult1 = pulsar3.getAdminClient().lookups().lookupTopic(topic); - String lookupResult2 = pulsar1.getAdminClient().lookups().lookupTopic(topic); - String lookupResult3 = pulsar2.getAdminClient().lookups().lookupTopic(topic); - assertEquals(lookupResult1, lookupResult2); - assertEquals(lookupResult1, lookupResult3); - assertEquals(lookupResult1, lookupResultBefore1); - - Optional<URL> webServiceUrl1 = - pulsar1.getNamespaceService().getWebServiceUrl(bundle, options); - assertTrue(webServiceUrl1.isPresent()); - - Optional<URL> webServiceUrl2 = - pulsar2.getNamespaceService().getWebServiceUrl(bundle, options); - assertTrue(webServiceUrl2.isPresent()); - assertEquals(webServiceUrl2.get().toString(), webServiceUrl1.get().toString()); - - Optional<URL> webServiceUrl3 = - pulsar3.getNamespaceService().getWebServiceUrl(bundle, options); - assertTrue(webServiceUrl3.isPresent()); - assertEquals(webServiceUrl3.get().toString(), webServiceUrl1.get().toString()); - - assertEquals(webServiceUrl3.get().toString(), webServiceUrlBefore1.get().toString()); - - List<PulsarService> pulsarServices = List.of(pulsar1, pulsar2, pulsar3); - for (PulsarService pulsarService : pulsarServices) { - // Test lookup heartbeat namespace's topic - for (PulsarService pulsar : pulsarServices) { - assertLookupHeartbeatOwner(pulsarService, - pulsar.getBrokerId(), pulsar.getBrokerServiceUrl()); - } - // Test lookup SLA namespace's topic - for (PulsarService pulsar : pulsarServices) { - assertLookupSLANamespaceOwner(pulsarService, - pulsar.getBrokerId(), pulsar.getBrokerServiceUrl()); + assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer().isActive()); + + // === Phase 2: add a 3rd broker using the OTHER table view impl === + // pulsar1/pulsar2 use serviceUnitStateTableViewClassName; pulsar3 deliberately + // uses the other one so the test exercises cross-impl lookups regardless of + // which parametrization we're running. + String otherClassName = + serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName()) + ? ServiceUnitStateMetadataStoreTableViewImpl.class.getName() + : ServiceUnitStateTableViewImpl.class.getName(); + + ServiceConfiguration crossImplConf = getDefaultConf(); + crossImplConf.setAllowAutoTopicCreation(true); + crossImplConf.setForceDeleteNamespaceAllowed(true); + crossImplConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getCanonicalName()); + crossImplConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName()); + crossImplConf.setLoadManagerServiceUnitStateTableViewClassName(otherClassName); + + try (var crossImplCtx = createAdditionalPulsarTestContext(crossImplConf)) { + var pulsar3 = crossImplCtx.getPulsarService(); + + // All three brokers (across both impls) must agree on topic ownership. + assertEquals(pulsar2.getAdminClient().lookups().lookupTopic(topic), ownershipBefore); + assertEquals(pulsar3.getAdminClient().lookups().lookupTopic(topic), ownershipBefore); + Optional<URL> webUrlPulsar3 = pulsar3.getNamespaceService().getWebServiceUrl(bundle, options); + assertTrue(webUrlPulsar3.isPresent()); + assertEquals(webUrlPulsar3.get().toString(), webUrlBefore.get().toString()); + + // SLA monitor and heartbeat lookups must agree across impls in every direction. + List<PulsarService> brokers = List.of(pulsar1, pulsar2, pulsar3); + for (PulsarService viewer : brokers) { + for (PulsarService owner : brokers) { + assertLookupHeartbeatOwner(viewer, owner.getBrokerId(), owner.getBrokerServiceUrl()); + assertLookupSLANamespaceOwner(viewer, owner.getBrokerId(), owner.getBrokerServiceUrl()); } } - // Start broker4 with ServiceUnitStateMetadataStoreTableViewImpl - ServiceConfiguration conf = getDefaultConf(); - conf.setAllowAutoTopicCreation(true); - conf.setForceDeleteNamespaceAllowed(true); - conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getCanonicalName()); - conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName()); - conf.setLoadManagerServiceUnitStateTableViewClassName( - ServiceUnitStateMetadataStoreTableViewImpl.class.getName()); - try (var additionPulsarTestContext = createAdditionalPulsarTestContext(conf)) { - @Cleanup - var pulsar4 = additionPulsarTestContext.getPulsarService(); - - Set<String> availableCandidates = Sets.newHashSet( - pulsar1.getBrokerServiceUrl(), - pulsar2.getBrokerServiceUrl(), - pulsar3.getBrokerServiceUrl(), - pulsar4.getBrokerServiceUrl()); - String lookupResult4 = pulsar4.getAdminClient().lookups().lookupTopic(topic); - assertTrue(availableCandidates.contains(lookupResult4)); - - String lookupResult5 = pulsar1.getAdminClient().lookups().lookupTopic(topic); - String lookupResult6 = pulsar2.getAdminClient().lookups().lookupTopic(topic); - String lookupResult7 = pulsar3.getAdminClient().lookups().lookupTopic(topic); - assertEquals(lookupResult4, lookupResult5); - assertEquals(lookupResult4, lookupResult6); - assertEquals(lookupResult4, lookupResult7); - assertEquals(lookupResult4, lookupResultBefore1); - - - Pair<TopicName, NamespaceBundle> topicAndBundle2 = - getBundleIsNotOwnByChangeEventTopic("testLoadBalancerServiceUnitTableViewSyncer2"); - String topic2 = topicAndBundle2.getLeft().toString(); - - String lookupResult8 = pulsar1.getAdminClient().lookups().lookupTopic(topic2); - String lookupResult9 = pulsar2.getAdminClient().lookups().lookupTopic(topic2); - String lookupResult10 = pulsar3.getAdminClient().lookups().lookupTopic(topic2); - String lookupResult11 = pulsar4.getAdminClient().lookups().lookupTopic(topic2); - assertEquals(lookupResult9, lookupResult8); - assertEquals(lookupResult10, lookupResult8); - assertEquals(lookupResult11, lookupResult8); - - Set<String> availableWebUrlCandidates = Sets.newHashSet( - pulsar1.getWebServiceAddress(), - pulsar2.getWebServiceAddress(), - pulsar3.getWebServiceAddress(), - pulsar4.getWebServiceAddress()); - - webServiceUrl1 = - pulsar1.getNamespaceService().getWebServiceUrl(bundle, options); - assertTrue(webServiceUrl1.isPresent()); - assertTrue(availableWebUrlCandidates.contains(webServiceUrl1.get().toString())); - - webServiceUrl2 = - pulsar2.getNamespaceService().getWebServiceUrl(bundle, options); - assertTrue(webServiceUrl2.isPresent()); - assertEquals(webServiceUrl2.get().toString(), webServiceUrl1.get().toString()); - - webServiceUrl3 = - pulsar3.getNamespaceService().getWebServiceUrl(bundle, options); - assertTrue(webServiceUrl3.isPresent()); - assertTrue(availableWebUrlCandidates.contains(webServiceUrl3.get().toString())); - - var webServiceUrl4 = - pulsar4.getNamespaceService().getWebServiceUrl(bundle, options); - assertTrue(webServiceUrl4.isPresent()); - assertEquals(webServiceUrl4.get().toString(), webServiceUrl1.get().toString()); - assertEquals(webServiceUrl4.get().toString(), webServiceUrlBefore1.get().toString()); - - pulsarServices = List.of(pulsar1, pulsar2, pulsar3, pulsar4); - for (PulsarService pulsarService : pulsarServices) { - // Test lookup heartbeat namespace's topic - for (PulsarService pulsar : pulsarServices) { - assertLookupHeartbeatOwner(pulsarService, - pulsar.getBrokerId(), pulsar.getBrokerServiceUrl()); - } - // Test lookup SLA namespace's topic - for (PulsarService pulsar : pulsarServices) { - assertLookupSLANamespaceOwner(pulsarService, - pulsar.getBrokerId(), pulsar.getBrokerServiceUrl()); - } - } - // Simulate broker going offline: clean ownerships first (as disableBroker() does), - // then unregister from the broker registry. - var wrapper = (ExtensibleLoadManagerWrapper) pulsar4.getLoadManager().get(); - var loadManager4 = spy((ExtensibleLoadManagerImpl) - FieldUtils.readField(wrapper, "loadManager", true)); - ServiceUnitStateChannel channel4 = (ServiceUnitStateChannel) - FieldUtils.readField(loadManager4, "serviceUnitStateChannel", true); - channel4.cleanOwnerships(); - // Simulate broker going offline by removing it from the broker registry. - // Set state to Closed BEFORE deleting the ZK node to prevent the notification - // handler's session-expiry recovery from auto-re-registering broker4. - // In production, the PulsarService shuts down after unregister(), so the handler - // never fires. In tests, the service stays running, creating a race. - var registry4 = (BrokerRegistryImpl) loadManager4.getBrokerRegistry(); - registry4.state.set(BrokerRegistryImpl.State.Closed); - String brokerZkPath = "/loadbalance/brokers/" + pulsar4.getBrokerId(); - pulsar4.getLocalMetadataStore().delete(brokerZkPath, Optional.empty()).get(); - - NamespaceName slaMonitorNamespace = - getSLAMonitorNamespace(pulsar4.getBrokerId(), pulsar.getConfiguration()); - String slaMonitorTopic = slaMonitorNamespace.getPersistentTopicName("test"); - // Wait for ownership to be reassigned after broker unregistration - String pulsar4BrokerUrl = pulsar4.getBrokerServiceUrl(); - Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { - String lookupResult = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); - assertNotNull(lookupResult); - assertNotEquals(lookupResult, pulsar4BrokerUrl); - }); - String result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); - log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result); - - Producer<String> producer = pulsar.getClient().newProducer(Schema.STRING) - .topic(slaMonitorTopic).create(); - producer.send("t1"); - - // Test re-register broker and check the lookup result - // Reset state to Started to allow re-registration. - registry4.state.set(BrokerRegistryImpl.State.Started); - registry4.registerAsync().get(); - - // Wait for ownership to be reassigned back to pulsar4 after re-registration - Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { - String reRegResult = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); - assertNotNull(reRegResult); - assertEquals(reRegResult, pulsar4.getBrokerServiceUrl()); - }); - result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); - log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result); + // === Phase 3: simulate the cross-impl broker going offline === + // Its SLA namespace must reassign to a remaining broker, and the ownership + // change must propagate through the syncer to brokers using the other impl. + var wrapper3 = (ExtensibleLoadManagerWrapper) pulsar3.getLoadManager().get(); + var loadManager3 = (ExtensibleLoadManagerImpl) + FieldUtils.readField(wrapper3, "loadManager", true); + ServiceUnitStateChannel channel3 = (ServiceUnitStateChannel) + FieldUtils.readField(loadManager3, "serviceUnitStateChannel", true); + channel3.cleanOwnerships(); + // Set state to Closed BEFORE deleting the ZK node to prevent the notification + // handler's session-expiry recovery from auto-re-registering broker3. In + // production the PulsarService shuts down after unregister(), so the handler + // never fires; in tests the service stays running and creates a race. + var registry3 = (BrokerRegistryImpl) loadManager3.getBrokerRegistry(); + registry3.state.set(BrokerRegistryImpl.State.Closed); + pulsar3.getLocalMetadataStore() + .delete("/loadbalance/brokers/" + pulsar3.getBrokerId(), Optional.empty()).get(); + + String slaMonitorTopic = getSLAMonitorNamespace(pulsar3.getBrokerId(), pulsar.getConfiguration()) + .getPersistentTopicName("test"); + String pulsar3BrokerUrl = pulsar3.getBrokerServiceUrl(); + Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { + String reassigned = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); + assertNotNull(reassigned); + assertNotEquals(reassigned, pulsar3BrokerUrl); + }); - producer.send("t2"); - Producer<String> producer1 = pulsar.getClient().newProducer(Schema.STRING) - .topic(slaMonitorTopic).create(); - producer1.send("t3"); + // Send a message while the topic is owned by the reassigned broker; this must + // remain durable when ownership migrates back below. + @Cleanup + Producer<String> producer = pulsar.getClient().newProducer(Schema.STRING) + .topic(slaMonitorTopic).create(); + producer.send("offline"); + + // === Phase 4: re-register the broker and verify ownership returns === + registry3.state.set(BrokerRegistryImpl.State.Started); + registry3.registerAsync().get(); + Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> + assertEquals(pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic), + pulsar3.getBrokerServiceUrl())); + + // Same producer reconnects to the new owner; a fresh producer also works. + producer.send("after-reconnect"); + @Cleanup + Producer<String> producer2 = pulsar.getClient().newProducer(Schema.STRING) + .topic(slaMonitorTopic).create(); + producer2.send("from-new-producer"); - producer.close(); - producer1.close(); - @Cleanup - Consumer<String> consumer = pulsar.getClient().newConsumer(Schema.STRING) - .topic(slaMonitorTopic) - .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) - .subscriptionName("test") - .subscribe(); - // receive message t1 t2 t3 - assertEquals(consumer.receive().getValue(), "t1"); - assertEquals(consumer.receive().getValue(), "t2"); - assertEquals(consumer.receive().getValue(), "t3"); - } + @Cleanup + Consumer<String> consumer = pulsar.getClient().newConsumer(Schema.STRING) + .topic(slaMonitorTopic) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscriptionName("test") + .subscribe(); + assertEquals(consumer.receive().getValue(), "offline"); + assertEquals(consumer.receive().getValue(), "after-reconnect"); + assertEquals(consumer.receive().getValue(), "from-new-producer"); } + // === Phase 5: disable the syncer and verify it deactivates === pulsar.getAdminClient().brokers() .deleteDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer"); - // Wait for config deletion to propagate before leader transition Awaitility.await().untilAsserted(() -> assertFalse(pulsar1.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled())); - Awaitility.await().untilAsserted(() -> - assertFalse(pulsar2.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled())); - makeSecondaryAsLeader(); + primaryLoadManager.monitor(); Awaitility.await().atMost(30, TimeUnit.SECONDS) .untilAsserted(() -> assertFalse(primaryLoadManager.getServiceUnitStateTableViewSyncer() .isActive())); - Awaitility.await().atMost(30, TimeUnit.SECONDS) - .untilAsserted(() -> assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer() - .isActive())); + assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer().isActive()); } private void assertLookupHeartbeatOwner(PulsarService pulsar,
