This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 76a2394022b [fix][broker] Force EnsemblePolicies to resolve network location after rackInfoMap is updated due to changes in /ledgers/available znode (#25067)
76a2394022b is described below

commit 76a2394022b0f02ec0cbf823f72656826174b265
Author: Malla Sandeep <[email protected]>
AuthorDate: Fri Dec 12 11:25:53 2025 +0530

    [fix][broker] Force EnsemblePolicies to resolve network location after rackInfoMap is updated due to changes in /ledgers/available znode (#25067)
---
 .../rackawareness/BookieRackAffinityMapping.java   |  26 +++-
 .../BookieRackAffinityMappingTest.java             | 143 ++++++++++++++++++---
 2 files changed, 146 insertions(+), 23 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
index 2df4dc22c14..d1662100e3e 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
@@ -22,6 +22,7 @@ import static 
org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.METAD
 import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -64,7 +65,7 @@ public class BookieRackAffinityMapping extends 
AbstractDNSToSwitchMapping
     public static final String METADATA_STORE_INSTANCE = 
"METADATA_STORE_INSTANCE";
 
     private MetadataCache<BookiesRackConfiguration> bookieMappingCache = null;
-    private ITopologyAwareEnsemblePlacementPolicy<BookieNode> rackawarePolicy 
= null;
+    private volatile ITopologyAwareEnsemblePlacementPolicy<BookieNode> 
rackawarePolicy = null;
     private List<BookieId> bookieAddressListLastTime = new ArrayList<>();
 
     private BookiesRackConfiguration racksWithHost = new 
BookiesRackConfiguration();
@@ -157,7 +158,7 @@ public class BookieRackAffinityMapping extends 
AbstractDNSToSwitchMapping
                 registrationClient.watchWritableBookies(versioned -> {
                     bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
                             .thenApply(optRes -> 
optRes.orElseGet(BookiesRackConfiguration::new))
-                            .thenAccept(this::updateRacksWithHost)
+                            .thenApply(this::processRackUpdate)
                             .exceptionally(ex -> {
                                 LOG.error("Failed to update rack info. ", ex);
                                 return null;
@@ -169,6 +170,17 @@ public class BookieRackAffinityMapping extends 
AbstractDNSToSwitchMapping
         }
     }
 
+    private Void processRackUpdate(BookiesRackConfiguration racks) {
+        ArrayList<BookieId> bookieIdSet;
+        synchronized (this) {
+            updateRacksWithHost(racks);
+            bookieIdSet = new ArrayList<>(bookieAddressListLastTime);
+        }
+        // Notify ensemble placement policy after rack info is updated to 
ensure consistent state.
+        rackChangeListenerCallback(bookieIdSet);
+        return null;
+    }
+
     private synchronized void updateRacksWithHost(BookiesRackConfiguration 
racks) {
         // In config z-node, the bookies are added in the `ip:port` notation, 
while BK will ask
         // for just the IP/hostname when trying to get the rack for a bookie.
@@ -274,12 +286,16 @@ public class BookieRackAffinityMapping extends 
AbstractDNSToSwitchMapping
                         bookieIdSet.addAll(bookieAddressListLastTime);
                         bookieAddressListLastTime = bookieAddressList;
                     }
-                    if (rackawarePolicy != null) {
-                        rackawarePolicy.onBookieRackChange(new 
ArrayList<>(bookieIdSet));
-                    }
+                    rackChangeListenerCallback(bookieIdSet);
                 });
     }
 
+    private void rackChangeListenerCallback(Collection<BookieId> bookieIdSet) {
+        if (rackawarePolicy != null) {
+            rackawarePolicy.onBookieRackChange(new ArrayList<>(bookieIdSet));
+        }
+    }
+
     @Override
     public void 
registerRackChangeListener(ITopologyAwareEnsemblePlacementPolicy<BookieNode> 
rackawarePolicy) {
         this.rackawarePolicy = rackawarePolicy;
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
index 46466dc6f97..a8f39917869 100644
--- 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
@@ -19,6 +19,9 @@
 package org.apache.pulsar.bookie.rackawareness;
 
 import static 
org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;
+import static 
org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
@@ -37,6 +40,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -56,9 +60,12 @@ import org.apache.bookkeeper.net.NetworkTopology;
 import org.apache.bookkeeper.proto.BookieAddressResolver;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.pulsar.common.policies.data.BookieInfo;
 import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreFactory;
@@ -98,7 +105,7 @@ public class BookieRackAffinityMappingTest {
         String data = "{\"group1\": {\"" + bookie1
                 + "\": {\"rack\": \"/rack0\", \"hostname\": 
\"bookie1.example.com\"}, \"" + bookie2
                 + "\": {\"rack\": \"/rack1\", \"hostname\": 
\"bookie2.example.com\"}}}";
-        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
data.getBytes(), Optional.empty()).join();
+        store.put(BOOKIE_INFO_ROOT_PATH, data.getBytes(), 
Optional.empty()).join();
 
         // Case1: ZKCache is given
         BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
@@ -133,7 +140,7 @@ public class BookieRackAffinityMappingTest {
                 + "\": {\"rack\": \"/\", \"hostname\": 
\"bookie1.example.com\"}, \"" + bookie2
                 + "\": {\"rack\": \"\", \"hostname\": 
\"bookie2.example.com\"}}}";
 
-        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
data.getBytes(), Optional.empty()).join();
+        store.put(BOOKIE_INFO_ROOT_PATH, data.getBytes(), 
Optional.empty()).join();
 
         // Case1: ZKCache is given
         BookieRackAffinityMapping mapping1 = new BookieRackAffinityMapping();
@@ -171,7 +178,7 @@ public class BookieRackAffinityMappingTest {
 
         bookieMapping.put("group1", mainBookieGroup);
 
-        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
jsonMapper.writeValueAsBytes(bookieMapping),
+        store.put(BOOKIE_INFO_ROOT_PATH, 
jsonMapper.writeValueAsBytes(bookieMapping),
                 Optional.empty()).join();
 
         Awaitility.await().untilAsserted(() -> {
@@ -193,7 +200,7 @@ public class BookieRackAffinityMappingTest {
 
         bookieMapping.put("group1", mainBookieGroup);
 
-        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
jsonMapper.writeValueAsBytes(bookieMapping),
+        store.put(BOOKIE_INFO_ROOT_PATH, 
jsonMapper.writeValueAsBytes(bookieMapping),
                 Optional.empty()).join();
 
         BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
@@ -213,7 +220,7 @@ public class BookieRackAffinityMappingTest {
         secondaryBookieGroup.put(bookie3, 
BookieInfo.builder().rack("rack0").build());
 
         bookieMapping.put("group2", secondaryBookieGroup);
-        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
jsonMapper.writeValueAsBytes(bookieMapping),
+        store.put(BOOKIE_INFO_ROOT_PATH, 
jsonMapper.writeValueAsBytes(bookieMapping),
                 Optional.empty()).join();
         Awaitility.await().untilAsserted(() -> {
             List<String> r = mapping.resolve(Lists.newArrayList("127.0.0.1", 
"127.0.0.2", "127.0.0.3"));
@@ -221,7 +228,7 @@ public class BookieRackAffinityMappingTest {
             assertEquals(r.get(1), "/rack1");
             assertEquals(r.get(2), "/rack0");
         });
-        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
"{}".getBytes(),
+        store.put(BOOKIE_INFO_ROOT_PATH, "{}".getBytes(),
                 Optional.empty()).join();
 
         Awaitility.await().untilAsserted(() -> {
@@ -237,7 +244,7 @@ public class BookieRackAffinityMappingTest {
         String data = "{\"group1\": {\"" + bookie1
                 + "\": {\"rack\": \"/rack0\", \"hostname\": 
\"bookie1.example.com\"}, \"" + bookie2
                 + "\": {\"rack\": \"/rack1\", \"hostname\": 
\"bookie2.example.com\"}}}";
-        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
data.getBytes(), Optional.empty()).join();
+        store.put(BOOKIE_INFO_ROOT_PATH, data.getBytes(), 
Optional.empty()).join();
 
         // Case1: ZKCache is given
         BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
@@ -260,11 +267,7 @@ public class BookieRackAffinityMappingTest {
         assertEquals(racks.size(), 0);
 
         @Cleanup("stop")
-        HashedWheelTimer timer = new HashedWheelTimer(
-                new 
ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(),
-                bkClientConf.getTimeoutTimerTickDurationMs(), 
TimeUnit.MILLISECONDS,
-                bkClientConf.getTimeoutTimerNumTicks());
-
+        HashedWheelTimer timer = getTestHashedWheelTimer(bkClientConf);
         RackawareEnsemblePlacementPolicy repp = new 
RackawareEnsemblePlacementPolicy();
         mapping.registerRackChangeListener(repp);
         Class<?> clazz1 = 
Class.forName("org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy");
@@ -342,7 +345,7 @@ public class BookieRackAffinityMappingTest {
         //remove bookie2 rack, the bookie2 rack should be /default-rack
         data = "{\"group1\": {\"" + bookie1
                 + "\": {\"rack\": \"/rack0\", \"hostname\": 
\"bookie1.example.com\"}}}";
-        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
data.getBytes(), Optional.empty()).join();
+        store.put(BOOKIE_INFO_ROOT_PATH, data.getBytes(), 
Optional.empty()).join();
         Awaitility.await().atMost(30, TimeUnit.SECONDS).until(
                 () -> ((BookiesRackConfiguration) 
field.get(mapping)).get("group1").size() == 1);
 
@@ -367,10 +370,7 @@ public class BookieRackAffinityMappingTest {
         mapping.setConf(bkClientConf);
 
         @Cleanup("stop")
-        HashedWheelTimer timer = new HashedWheelTimer(new 
ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(),
-                bkClientConf.getTimeoutTimerTickDurationMs(), 
TimeUnit.MILLISECONDS,
-                bkClientConf.getTimeoutTimerNumTicks());
-
+        HashedWheelTimer timer = getTestHashedWheelTimer(bkClientConf);
         RackawareEnsemblePlacementPolicy repp = new 
RackawareEnsemblePlacementPolicy();
         repp.initialize(bkClientConf, Optional.of(mapping), timer,
                 DISABLE_ALL, NullStatsLogger.INSTANCE, 
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
@@ -391,7 +391,7 @@ public class BookieRackAffinityMappingTest {
                         
BookieRackAffinityMapping.class.getDeclaredMethod("handleUpdates", 
Notification.class);
                 handleUpdates.setAccessible(true);
                 Notification n =
-                        new Notification(NotificationType.Modified, 
BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH);
+                        new Notification(NotificationType.Modified, 
BOOKIE_INFO_ROOT_PATH);
                 long start = System.currentTimeMillis();
                 while (System.currentTimeMillis() - start < 2_000) {
                     handleUpdates.invoke(mapping, n);
@@ -415,4 +415,111 @@ public class BookieRackAffinityMappingTest {
 
         assertTrue(count.await(3, TimeUnit.SECONDS));
     }
+
+    @Test
+    public void testZKEventListenersOrdering() throws Exception {
+        @Cleanup
+        PulsarRegistrationClient pulsarRegistrationClient =
+                new PulsarRegistrationClient(store, "/ledgers");
+        DefaultBookieAddressResolver defaultBookieAddressResolver =
+                new DefaultBookieAddressResolver(pulsarRegistrationClient);
+        // Create and configure the mapping
+        BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
+        ClientConfiguration bkClientConf = new ClientConfiguration();
+        
bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, 
store);
+        mapping.setBookieAddressResolver(defaultBookieAddressResolver);
+        mapping.setConf(bkClientConf);
+
+        // Create RackawareEnsemblePlacementPolicy and initialize it
+        @Cleanup("stop")
+        HashedWheelTimer timer = getTestHashedWheelTimer(bkClientConf);
+        RackawareEnsemblePlacementPolicy repp = new 
RackawareEnsemblePlacementPolicy();
+        repp.initialize(bkClientConf, Optional.of(mapping), timer,
+                DISABLE_ALL, NullStatsLogger.INSTANCE, 
defaultBookieAddressResolver);
+        mapping.registerRackChangeListener(repp);
+
+        // Create a BookieWatcherImpl instance via reflection
+        Class<?> watcherClazz = 
Class.forName("org.apache.bookkeeper.client.BookieWatcherImpl");
+        Constructor<?> constructor = watcherClazz.getDeclaredConstructor(
+                ClientConfiguration.class,
+                EnsemblePlacementPolicy.class,
+                RegistrationClient.class,
+                BookieAddressResolver.class,
+                StatsLogger.class);
+        constructor.setAccessible(true);
+        Object watcher = constructor.newInstance(
+                bkClientConf,
+                repp,
+                pulsarRegistrationClient,
+                defaultBookieAddressResolver,
+                NullStatsLogger.INSTANCE
+        );
+        Method initMethod = 
watcherClazz.getDeclaredMethod("initialBlockingBookieRead");
+        initMethod.setAccessible(true);
+        initMethod.invoke(watcher);
+
+        // Prepare a BookiesRackConfiguration that maps bookie1 -> /rack0
+        BookieInfo bi = BookieInfo.builder().rack("/rack0").build();
+        BookiesRackConfiguration racks = new BookiesRackConfiguration();
+        racks.updateBookie("group1", bookie1.toString(), bi);
+
+        // Create a mock cache for racks /bookies
+        MetadataCache<BookiesRackConfiguration> mockCache = 
mock(MetadataCache.class);
+        Field f = 
BookieRackAffinityMapping.class.getDeclaredField("bookieMappingCache");
+        f.setAccessible(true);
+        f.set(mapping, mockCache);
+        when(mockCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH))
+                
.thenReturn(CompletableFuture.completedFuture(Optional.of(racks)));
+
+        // Inject the bookie address list into BookieRackAffinityMapping
+        Field addressListField = 
BookieRackAffinityMapping.class.getDeclaredField("bookieAddressListLastTime");
+        addressListField.setAccessible(true);
+        addressListField.set(mapping, List.of(bookie1.toBookieId()));
+
+        // Inject the writable bookie into PulsarRegistrationClient
+        Field writableField = 
PulsarRegistrationClient.class.getDeclaredField("writableBookieInfo");
+        writableField.setAccessible(true);
+        @SuppressWarnings("unchecked")
+        Map<BookieId, Versioned<BookieServiceInfo>> writableBookieInfo =
+                (Map<BookieId, Versioned<BookieServiceInfo>>) 
writableField.get(pulsarRegistrationClient);
+        writableBookieInfo.put(
+                bookie1.toBookieId(),
+                new 
Versioned<>(BookieServiceInfoUtils.buildLegacyBookieServiceInfo(bookie1.toString()),
 Version.NEW)
+        );
+
+        // watcher.processWritableBookiesChanged runs FIRST triggering 
RackAware ensemble policy listener → incorrect
+        // ordering
+        Method procMethod =
+                
watcherClazz.getDeclaredMethod("processWritableBookiesChanged", 
java.util.Set.class);
+        procMethod.setAccessible(true);
+        Set<BookieId> ids = new HashSet<>();
+        ids.add(bookie1.toBookieId());
+        procMethod.invoke(watcher, ids);
+
+        // BookieRackAffinityMapping rack mapping update runs SECOND → delayed 
rack info
+        Method processRackUpdateMethod = 
BookieRackAffinityMapping.class.getDeclaredMethod("processRackUpdate",
+                BookiesRackConfiguration.class);
+        processRackUpdateMethod.setAccessible(true);
+        processRackUpdateMethod.invoke(mapping, racks);
+
+        // -------------------
+        // NOW CHECK REPP INTERNAL STATE
+        // -------------------
+        // BookieNode.getNetworkLocation()
+        Class<?> clazz1 = 
Class.forName("org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy");
+        Field field1 = clazz1.getDeclaredField("knownBookies");
+        field1.setAccessible(true);
+        Map<BookieId, BookieNode> knownBookies = (Map<BookieId, BookieNode>) 
field1.get(repp);
+        BookieNode bn = knownBookies.get(bookie1.toBookieId());
+        // Rack info update is delayed but because of new callback the rack 
info on ensemble policy should be updated.
+        assertEquals(bn.getNetworkLocation(), "/rack0",
+                "Network location should match /rack0 on bookie");
+    }
+
+    private static HashedWheelTimer 
getTestHashedWheelTimer(ClientConfiguration bkClientConf) {
+        return new HashedWheelTimer(
+                new 
ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(),
+                bkClientConf.getTimeoutTimerTickDurationMs(), 
TimeUnit.MILLISECONDS,
+                bkClientConf.getTimeoutTimerNumTicks());
+    }
 }

Reply via email to