This is an automated email from the ASF dual-hosted git repository. coderzc pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5ab5452484cf262b35ff8561b256bb0fa1aba014 Author: coderzc <[email protected]> AuthorDate: Wed May 13 12:18:22 2026 +0800 Revert "[fix][test] Reduce flakiness in testLoadBalancerServiceUnitTableViewSyncer (#25638)" This reverts commit 97613e0f0ac8eb9168c7e114d17cdd957816eabc. --- .../extensions/ExtensibleLoadManagerImplTest.java | 337 ++++++++++++++------- 1 file changed, 226 insertions(+), 111 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 97921488002..94c60356b4a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -1298,148 +1298,263 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase @Test(priority = 200) public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception { - // Make pulsar1 the leader so primaryLoadManager is the syncer-running broker. - makePrimaryAsLeader(); Pair<TopicName, NamespaceBundle> topicAndBundle = getBundleIsNotOwnByChangeEventTopic("testLoadBalancerServiceUnitTableViewSyncer"); + TopicName topicName = topicAndBundle.getLeft(); NamespaceBundle bundle = topicAndBundle.getRight(); - String topic = topicAndBundle.getLeft().toString(); + String topic = topicName.toString(); + + String lookupResultBefore1 = pulsar1.getAdminClient().lookups().lookupTopic(topic); + String lookupResultBefore2 = pulsar2.getAdminClient().lookups().lookupTopic(topic); + assertEquals(lookupResultBefore1, lookupResultBefore2); LookupOptions options = LookupOptions.builder() .authoritative(false) .requestHttps(false) .readOnly(false) .loadTopicsInBundle(false).build(); + Optional<URL> webServiceUrlBefore1 = + pulsar1.getNamespaceService().getWebServiceUrl(bundle, options); + assertTrue(webServiceUrlBefore1.isPresent()); - String ownershipBefore = pulsar1.getAdminClient().lookups().lookupTopic(topic); - assertEquals(pulsar2.getAdminClient().lookups().lookupTopic(topic), ownershipBefore); - Optional<URL> webUrlBefore = pulsar1.getNamespaceService().getWebServiceUrl(bundle, options); - assertTrue(webUrlBefore.isPresent()); - - // === Phase 1: enable the syncer and verify it activates on the leader only === - // The syncer is started inside ExtensibleLoadManagerImpl.monitor() when the - // dynamic config is enabled and the broker is the channel owner. Calling - // monitor() directly avoids forcing a leader transition (which serializes - // playLeader() behind playFollower() on the single-threaded load manager - // executor and was the root cause of repeated 30s+ flakes here). - String syncerType = - serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName()) - ? "SystemTopicToMetadataStoreSyncer" : "MetadataStoreToSystemTopicSyncer"; + Optional<URL> webServiceUrlBefore2 = + pulsar2.getNamespaceService().getWebServiceUrl(bundle, options); + assertTrue(webServiceUrlBefore2.isPresent()); + assertEquals(webServiceUrlBefore2.get().toString(), webServiceUrlBefore1.get().toString()); + + + String syncerTyp = serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName()) + ? "SystemTopicToMetadataStoreSyncer" : "MetadataStoreToSystemTopicSyncer"; pulsar.getAdminClient().brokers() - .updateDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer", syncerType); + .updateDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer", syncerTyp); + // Wait for the dynamic config to propagate to both brokers before triggering + // leader transitions. Without this, the leader callback may see the old config + // and skip activating the syncer. Awaitility.await().untilAsserted(() -> assertTrue(pulsar1.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled())); - primaryLoadManager.monitor(); + Awaitility.await().untilAsserted(() -> + assertTrue(pulsar2.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled())); + makeSecondaryAsLeader(); + makePrimaryAsLeader(); Awaitility.await().atMost(30, TimeUnit.SECONDS) .untilAsserted(() -> assertTrue(primaryLoadManager.getServiceUnitStateTableViewSyncer() .isActive())); - assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer().isActive()); - - // === Phase 2: add a 3rd broker using the OTHER table view impl === - // pulsar1/pulsar2 use serviceUnitStateTableViewClassName; pulsar3 deliberately - // uses the other one so the test exercises cross-impl lookups regardless of - // which parametrization we're running. - String otherClassName = - serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName()) - ? ServiceUnitStateMetadataStoreTableViewImpl.class.getName() - : ServiceUnitStateTableViewImpl.class.getName(); - - ServiceConfiguration crossImplConf = getDefaultConf(); - crossImplConf.setAllowAutoTopicCreation(true); - crossImplConf.setForceDeleteNamespaceAllowed(true); - crossImplConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getCanonicalName()); - crossImplConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName()); - crossImplConf.setLoadManagerServiceUnitStateTableViewClassName(otherClassName); - - try (var crossImplCtx = createAdditionalPulsarTestContext(crossImplConf)) { - var pulsar3 = crossImplCtx.getPulsarService(); - - // All three brokers (across both impls) must agree on topic ownership. - assertEquals(pulsar2.getAdminClient().lookups().lookupTopic(topic), ownershipBefore); - assertEquals(pulsar3.getAdminClient().lookups().lookupTopic(topic), ownershipBefore); - Optional<URL> webUrlPulsar3 = pulsar3.getNamespaceService().getWebServiceUrl(bundle, options); - assertTrue(webUrlPulsar3.isPresent()); - assertEquals(webUrlPulsar3.get().toString(), webUrlBefore.get().toString()); - - // SLA monitor and heartbeat lookups must agree across impls in every direction. - List<PulsarService> brokers = List.of(pulsar1, pulsar2, pulsar3); - for (PulsarService viewer : brokers) { - for (PulsarService owner : brokers) { - assertLookupHeartbeatOwner(viewer, owner.getBrokerId(), owner.getBrokerServiceUrl()); - assertLookupSLANamespaceOwner(viewer, owner.getBrokerId(), owner.getBrokerServiceUrl()); + Awaitility.await().atMost(30, TimeUnit.SECONDS) + .untilAsserted(() -> assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer() + .isActive())); + ServiceConfiguration defaultConf = getDefaultConf(); + defaultConf.setAllowAutoTopicCreation(true); + defaultConf.setForceDeleteNamespaceAllowed(true); + defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getCanonicalName()); + defaultConf.setLoadBalancerSheddingEnabled(false); + defaultConf.setLoadManagerServiceUnitStateTableViewClassName(ServiceUnitStateTableViewImpl.class.getName()); + try (var additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf)) { + // start pulsar3 with ServiceUnitStateTableViewImpl + @Cleanup + var pulsar3 = additionalPulsarTestContext.getPulsarService(); + + String lookupResult1 = pulsar3.getAdminClient().lookups().lookupTopic(topic); + String lookupResult2 = pulsar1.getAdminClient().lookups().lookupTopic(topic); + String lookupResult3 = pulsar2.getAdminClient().lookups().lookupTopic(topic); + assertEquals(lookupResult1, lookupResult2); + assertEquals(lookupResult1, lookupResult3); + assertEquals(lookupResult1, lookupResultBefore1); + + Optional<URL> webServiceUrl1 = + pulsar1.getNamespaceService().getWebServiceUrl(bundle, options); + assertTrue(webServiceUrl1.isPresent()); + + Optional<URL> webServiceUrl2 = + pulsar2.getNamespaceService().getWebServiceUrl(bundle, options); + assertTrue(webServiceUrl2.isPresent()); + assertEquals(webServiceUrl2.get().toString(), webServiceUrl1.get().toString()); + + Optional<URL> webServiceUrl3 = + pulsar3.getNamespaceService().getWebServiceUrl(bundle, options); + assertTrue(webServiceUrl3.isPresent()); + assertEquals(webServiceUrl3.get().toString(), webServiceUrl1.get().toString()); + + assertEquals(webServiceUrl3.get().toString(), webServiceUrlBefore1.get().toString()); + + List<PulsarService> pulsarServices = List.of(pulsar1, pulsar2, pulsar3); + for (PulsarService pulsarService : pulsarServices) { + // Test lookup heartbeat namespace's topic + for (PulsarService pulsar : pulsarServices) { + assertLookupHeartbeatOwner(pulsarService, + pulsar.getBrokerId(), pulsar.getBrokerServiceUrl()); + } + // Test lookup SLA namespace's topic + for (PulsarService pulsar : pulsarServices) { + assertLookupSLANamespaceOwner(pulsarService, + pulsar.getBrokerId(), pulsar.getBrokerServiceUrl()); } } - // === Phase 3: simulate the cross-impl broker going offline === - // Its SLA namespace must reassign to a remaining broker, and the ownership - // change must propagate through the syncer to brokers using the other impl. - var wrapper3 = (ExtensibleLoadManagerWrapper) pulsar3.getLoadManager().get(); - var loadManager3 = (ExtensibleLoadManagerImpl) - FieldUtils.readField(wrapper3, "loadManager", true); - ServiceUnitStateChannel channel3 = (ServiceUnitStateChannel) - FieldUtils.readField(loadManager3, "serviceUnitStateChannel", true); - channel3.cleanOwnerships(); - // Set state to Closed BEFORE deleting the ZK node to prevent the notification - // handler's session-expiry recovery from auto-re-registering broker3. In - // production the PulsarService shuts down after unregister(), so the handler - // never fires; in tests the service stays running and creates a race. - var registry3 = (BrokerRegistryImpl) loadManager3.getBrokerRegistry(); - registry3.state.set(BrokerRegistryImpl.State.Closed); - pulsar3.getLocalMetadataStore() - .delete("/loadbalance/brokers/" + pulsar3.getBrokerId(), Optional.empty()).get(); - - String slaMonitorTopic = getSLAMonitorNamespace(pulsar3.getBrokerId(), pulsar.getConfiguration()) - .getPersistentTopicName("test"); - String pulsar3BrokerUrl = pulsar3.getBrokerServiceUrl(); - Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { - String reassigned = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); - assertNotNull(reassigned); - assertNotEquals(reassigned, pulsar3BrokerUrl); - }); + // Start broker4 with ServiceUnitStateMetadataStoreTableViewImpl + ServiceConfiguration conf = getDefaultConf(); + conf.setAllowAutoTopicCreation(true); + conf.setForceDeleteNamespaceAllowed(true); + conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getCanonicalName()); + conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName()); + conf.setLoadManagerServiceUnitStateTableViewClassName( + ServiceUnitStateMetadataStoreTableViewImpl.class.getName()); + try (var additionPulsarTestContext = createAdditionalPulsarTestContext(conf)) { + @Cleanup + var pulsar4 = additionPulsarTestContext.getPulsarService(); - // Send a message while the topic is owned by the reassigned broker; this must - // remain durable when ownership migrates back below. - @Cleanup - Producer<String> producer = pulsar.getClient().newProducer(Schema.STRING) - .topic(slaMonitorTopic).create(); - producer.send("offline"); - - // === Phase 4: re-register the broker and verify ownership returns === - registry3.state.set(BrokerRegistryImpl.State.Started); - registry3.registerAsync().get(); - Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> - assertEquals(pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic), - pulsar3.getBrokerServiceUrl())); - - // Same producer reconnects to the new owner; a fresh producer also works. - producer.send("after-reconnect"); - @Cleanup - Producer<String> producer2 = pulsar.getClient().newProducer(Schema.STRING) - .topic(slaMonitorTopic).create(); - producer2.send("from-new-producer"); + Set<String> availableCandidates = Sets.newHashSet( + pulsar1.getBrokerServiceUrl(), + pulsar2.getBrokerServiceUrl(), + pulsar3.getBrokerServiceUrl(), + pulsar4.getBrokerServiceUrl()); + String lookupResult4 = pulsar4.getAdminClient().lookups().lookupTopic(topic); + assertTrue(availableCandidates.contains(lookupResult4)); - @Cleanup - Consumer<String> consumer = pulsar.getClient().newConsumer(Schema.STRING) - .topic(slaMonitorTopic) - .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) - .subscriptionName("test") - .subscribe(); - assertEquals(consumer.receive().getValue(), "offline"); - assertEquals(consumer.receive().getValue(), "after-reconnect"); - assertEquals(consumer.receive().getValue(), "from-new-producer"); + String lookupResult5 = pulsar1.getAdminClient().lookups().lookupTopic(topic); + String lookupResult6 = pulsar2.getAdminClient().lookups().lookupTopic(topic); + String lookupResult7 = pulsar3.getAdminClient().lookups().lookupTopic(topic); + assertEquals(lookupResult4, lookupResult5); + assertEquals(lookupResult4, lookupResult6); + assertEquals(lookupResult4, lookupResult7); + assertEquals(lookupResult4, lookupResultBefore1); + + + Pair<TopicName, NamespaceBundle> topicAndBundle2 = + getBundleIsNotOwnByChangeEventTopic("testLoadBalancerServiceUnitTableViewSyncer2"); + String topic2 = topicAndBundle2.getLeft().toString(); + + String lookupResult8 = pulsar1.getAdminClient().lookups().lookupTopic(topic2); + String lookupResult9 = pulsar2.getAdminClient().lookups().lookupTopic(topic2); + String lookupResult10 = pulsar3.getAdminClient().lookups().lookupTopic(topic2); + String lookupResult11 = pulsar4.getAdminClient().lookups().lookupTopic(topic2); + assertEquals(lookupResult9, lookupResult8); + assertEquals(lookupResult10, lookupResult8); + assertEquals(lookupResult11, lookupResult8); + + Set<String> availableWebUrlCandidates = Sets.newHashSet( + pulsar1.getWebServiceAddress(), + pulsar2.getWebServiceAddress(), + pulsar3.getWebServiceAddress(), + pulsar4.getWebServiceAddress()); + + webServiceUrl1 = + pulsar1.getNamespaceService().getWebServiceUrl(bundle, options); + assertTrue(webServiceUrl1.isPresent()); + assertTrue(availableWebUrlCandidates.contains(webServiceUrl1.get().toString())); + + webServiceUrl2 = + pulsar2.getNamespaceService().getWebServiceUrl(bundle, options); + assertTrue(webServiceUrl2.isPresent()); + assertEquals(webServiceUrl2.get().toString(), webServiceUrl1.get().toString()); + + webServiceUrl3 = + pulsar3.getNamespaceService().getWebServiceUrl(bundle, options); + assertTrue(webServiceUrl3.isPresent()); + assertTrue(availableWebUrlCandidates.contains(webServiceUrl3.get().toString())); + + var webServiceUrl4 = + pulsar4.getNamespaceService().getWebServiceUrl(bundle, options); + assertTrue(webServiceUrl4.isPresent()); + assertEquals(webServiceUrl4.get().toString(), webServiceUrl1.get().toString()); + assertEquals(webServiceUrl4.get().toString(), webServiceUrlBefore1.get().toString()); + + pulsarServices = List.of(pulsar1, pulsar2, pulsar3, pulsar4); + for (PulsarService pulsarService : pulsarServices) { + // Test lookup heartbeat namespace's topic + for (PulsarService pulsar : pulsarServices) { + assertLookupHeartbeatOwner(pulsarService, + pulsar.getBrokerId(), pulsar.getBrokerServiceUrl()); + } + // Test lookup SLA namespace's topic + for (PulsarService pulsar : pulsarServices) { + assertLookupSLANamespaceOwner(pulsarService, + pulsar.getBrokerId(), pulsar.getBrokerServiceUrl()); + } + } + // Simulate broker going offline: clean ownerships first (as disableBroker() does), + // then unregister from the broker registry. + var wrapper = (ExtensibleLoadManagerWrapper) pulsar4.getLoadManager().get(); + var loadManager4 = spy((ExtensibleLoadManagerImpl) + FieldUtils.readField(wrapper, "loadManager", true)); + ServiceUnitStateChannel channel4 = (ServiceUnitStateChannel) + FieldUtils.readField(loadManager4, "serviceUnitStateChannel", true); + channel4.cleanOwnerships(); + // Simulate broker going offline by removing it from the broker registry. + // Set state to Closed BEFORE deleting the ZK node to prevent the notification + // handler's session-expiry recovery from auto-re-registering broker4. + // In production, the PulsarService shuts down after unregister(), so the handler + // never fires. In tests, the service stays running, creating a race. + var registry4 = (BrokerRegistryImpl) loadManager4.getBrokerRegistry(); + registry4.state.set(BrokerRegistryImpl.State.Closed); + String brokerZkPath = "/loadbalance/brokers/" + pulsar4.getBrokerId(); + pulsar4.getLocalMetadataStore().delete(brokerZkPath, Optional.empty()).get(); + + NamespaceName slaMonitorNamespace = + getSLAMonitorNamespace(pulsar4.getBrokerId(), pulsar.getConfiguration()); + String slaMonitorTopic = slaMonitorNamespace.getPersistentTopicName("test"); + // Wait for ownership to be reassigned after broker unregistration + String pulsar4BrokerUrl = pulsar4.getBrokerServiceUrl(); + Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { + String lookupResult = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); + assertNotNull(lookupResult); + assertNotEquals(lookupResult, pulsar4BrokerUrl); + }); + String result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); + log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result); + + Producer<String> producer = pulsar.getClient().newProducer(Schema.STRING) + .topic(slaMonitorTopic).create(); + producer.send("t1"); + + // Test re-register broker and check the lookup result + // Reset state to Started to allow re-registration. + registry4.state.set(BrokerRegistryImpl.State.Started); + registry4.registerAsync().get(); + + // Wait for ownership to be reassigned back to pulsar4 after re-registration + Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { + String reRegResult = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); + assertNotNull(reRegResult); + assertEquals(reRegResult, pulsar4.getBrokerServiceUrl()); + }); + result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); + log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result); + + producer.send("t2"); + Producer<String> producer1 = pulsar.getClient().newProducer(Schema.STRING) + .topic(slaMonitorTopic).create(); + producer1.send("t3"); + + producer.close(); + producer1.close(); + @Cleanup + Consumer<String> consumer = pulsar.getClient().newConsumer(Schema.STRING) + .topic(slaMonitorTopic) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscriptionName("test") + .subscribe(); + // receive message t1 t2 t3 + assertEquals(consumer.receive().getValue(), "t1"); + assertEquals(consumer.receive().getValue(), "t2"); + assertEquals(consumer.receive().getValue(), "t3"); + } } - // === Phase 5: disable the syncer and verify it deactivates === pulsar.getAdminClient().brokers() .deleteDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer"); + // Wait for config deletion to propagate before leader transition Awaitility.await().untilAsserted(() -> assertFalse(pulsar1.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled())); - primaryLoadManager.monitor(); + Awaitility.await().untilAsserted(() -> + assertFalse(pulsar2.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled())); + makeSecondaryAsLeader(); Awaitility.await().atMost(30, TimeUnit.SECONDS) .untilAsserted(() -> assertFalse(primaryLoadManager.getServiceUnitStateTableViewSyncer() .isActive())); - assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer().isActive()); + Awaitility.await().atMost(30, TimeUnit.SECONDS) + .untilAsserted(() -> assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer() + .isActive())); } private void assertLookupHeartbeatOwner(PulsarService pulsar,
