This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-4.16
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 857959eb8c5cb3c1ceca2da5a36db26419a8a5f3
Author: YingQun Zhong <[email protected]>
AuthorDate: Sun Feb 16 07:17:49 2025 +0800

    remove in address2Region while bookie left to get correct rack info (#4504)
    
    (cherry picked from commit 07de650ea03612fb9b81647c2a55fd538d0cda3a)
---
 .../client/RegionAwareEnsemblePlacementPolicy.java |  17 +++
 .../TestRegionAwareEnsemblePlacementPolicy.java    | 121 ++++++++++++++++++++-
 2 files changed, 136 insertions(+), 2 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index eb8784a621..bc882589b3 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -117,6 +117,23 @@ public class RegionAwareEnsemblePlacementPolicy extends 
RackawareEnsemblePlaceme
 
     @Override
     public void handleBookiesThatLeft(Set<BookieId> leftBookies) {
+        // case 1: (In some situations, e.g., broker and bookie restart 
concurrently.)
+        //1. Bookie X joins the cluster for the first time, encounters a region 
exception, and `address2Region` records X's
+        // region as default-region.
+        //2. Bookie X left cluster and is removed from knownBookies, but 
address2Region retains the information of
+        // bookie X.
+        //3. update Bookie X's rack info, and calling `onBookieRackChange` 
will only update address2Region for
+        // addresses present in knownBookies; therefore, bookie X's region 
info is not updated.
+        //4. Bookie X joins the cluster again; since address2Region contains the 
previous default-region information,
+        // getRegion will directly use the cached data, resulting in an incorrect 
region.
+
+        // The bookie region is initialized to "default-region" in 
address2Region.
+        // We should ensure that when a bookie leaves the cluster,
+        // we also clean up the corresponding region information for that 
bookie in address2Region,
+        // so that the correct region can be recorded for the bookie during 
onBookieRackChange and
+        // handleBookiesThatJoined,
+        // which avoids traffic skew in ensemble selection.
+        leftBookies.forEach(address2Region::remove);
         super.handleBookiesThatLeft(leftBookies);
 
         for (TopologyAwareEnsemblePlacementPolicy policy: 
perRegionPlacement.values()) {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
index 1f04ffa718..ca6c074b33 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
@@ -1066,7 +1066,7 @@ public class TestRegionAwareEnsemblePlacementPolicy 
extends TestCase {
         testEnsembleWithThreeRegionsReplaceInternal(1, true, false);
     }
 
-    public void testEnsembleWithThreeRegionsReplaceInternal(int minDurability, 
boolean disableDurability,
+    private void testEnsembleWithThreeRegionsReplaceInternal(int 
minDurability, boolean disableDurability,
             boolean disableOneRegion) throws Exception {
         repp.uninitalize();
         repp = new RegionAwareEnsemblePlacementPolicy();
@@ -1208,7 +1208,7 @@ public class TestRegionAwareEnsemblePlacementPolicy 
extends TestCase {
         testEnsembleDurabilityDisabledInternal(2, true);
     }
 
-    public void testEnsembleDurabilityDisabledInternal(int minDurability, 
boolean disableDurability) throws Exception {
+    private void testEnsembleDurabilityDisabledInternal(int minDurability, 
boolean disableDurability) throws Exception {
         repp.uninitalize();
         repp = new RegionAwareEnsemblePlacementPolicy();
         conf.setProperty(REPP_REGIONS_TO_WRITE, "region1;region2;region3");
@@ -2027,4 +2027,121 @@ public class TestRegionAwareEnsemblePlacementPolicy 
extends TestCase {
         assertEquals("region2", repp.address2Region.get(addr3.toBookieId()));
         assertEquals("region3", repp.address2Region.get(addr4.toBookieId()));
     }
+
+    @Test
+    public void testBookieLeftThenJoinWithDNSResolveFailed() throws Exception {
+
+        BookieSocketAddress addr1 = new BookieSocketAddress("127.0.1.1", 3181);
+        BookieSocketAddress addr2 = new BookieSocketAddress("127.0.1.2", 3181);
+        BookieSocketAddress addr3 = new BookieSocketAddress("127.0.1.3", 3181);
+        BookieSocketAddress addr4 = new BookieSocketAddress("127.0.1.4", 3181);
+
+        // init dns mapping
+        // scenario: the dns resolver fails for addr1, so it uses the default region and rack.
+        // addr1 rack info. /region-1/default-rack -> 
/default-region/default-rack.
+
+        // 1. mock addr1 dns resolver failed and use default region and rack.
+        StaticDNSResolver.addNodeToRack(addr1.getHostName(), 
"/default-region/default-rack");
+        StaticDNSResolver.addNodeToRack(addr2.getHostName(), 
"/region-1/default-rack");
+        StaticDNSResolver.addNodeToRack(addr3.getHostName(), 
"/region-2/default-rack");
+        StaticDNSResolver.addNodeToRack(addr4.getHostName(), 
"/region-3/default-rack");
+
+        // init cluster
+        Set<BookieId> addrs = Sets.newHashSet(addr1.toBookieId(),
+                addr2.toBookieId(), addr3.toBookieId(), addr4.toBookieId());
+        repp.onClusterChanged(addrs, new HashSet<>());
+
+        assertEquals(4, repp.knownBookies.size());
+        assertEquals("/default-region/default-rack", 
repp.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+        assertEquals("/region-1/default-rack", 
repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+        assertEquals("/region-2/default-rack", 
repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+        assertEquals("/region-3/default-rack", 
repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+        assertEquals(4, repp.perRegionPlacement.size());
+        TopologyAwareEnsemblePlacementPolicy unknownRegionPlacement = 
repp.perRegionPlacement.get("UnknownRegion");
+        assertEquals(1, unknownRegionPlacement.knownBookies.keySet().size());
+        assertEquals("/default-region/default-rack",
+                
unknownRegionPlacement.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+
+        TopologyAwareEnsemblePlacementPolicy region1Placement = 
repp.perRegionPlacement.get("region-1");
+        assertEquals(1, region1Placement.knownBookies.keySet().size());
+        assertEquals("/region-1/default-rack",
+                
region1Placement.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+
+        TopologyAwareEnsemblePlacementPolicy region2Placement = 
repp.perRegionPlacement.get("region-2");
+        assertEquals(1, region2Placement.knownBookies.keySet().size());
+        assertEquals("/region-2/default-rack",
+                
region2Placement.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+
+        TopologyAwareEnsemblePlacementPolicy region3Placement = 
repp.perRegionPlacement.get("region-3");
+        assertEquals(1, region3Placement.knownBookies.keySet().size());
+        assertEquals("/region-3/default-rack",
+                
region3Placement.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+        assertEquals("UnknownRegion", 
repp.address2Region.get(addr1.toBookieId()));
+        assertEquals("region-1", repp.address2Region.get(addr2.toBookieId()));
+        assertEquals("region-2", repp.address2Region.get(addr3.toBookieId()));
+        assertEquals("region-3", repp.address2Region.get(addr4.toBookieId()));
+
+        // 2. addr1 bookie shutdown and decommission
+        addrs.remove(addr1.toBookieId());
+        repp.onClusterChanged(addrs, new HashSet<>());
+
+        assertEquals(3, repp.knownBookies.size());
+        assertNull(repp.knownBookies.get(addr1.toBookieId()));
+        assertEquals("/region-1/default-rack", 
repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+        assertEquals("/region-2/default-rack", 
repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+        assertEquals("/region-3/default-rack", 
repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+        // UnknownRegion,region-1,region-2,region-3
+        assertEquals(4, repp.perRegionPlacement.size());
+        // after addr1 bookie left, it should be removed from the local 
address2Region
+        assertNull(repp.address2Region.get(addr1.toBookieId()));
+        assertEquals("region-1", repp.address2Region.get(addr2.toBookieId()));
+        assertEquals("region-2", repp.address2Region.get(addr3.toBookieId()));
+        assertEquals("region-3", repp.address2Region.get(addr4.toBookieId()));
+
+
+        // 3. addr1 bookie start and join
+        addrs.add(addr1.toBookieId());
+        repp.onClusterChanged(addrs, new HashSet<>());
+
+        assertEquals(4, repp.knownBookies.size());
+        assertEquals("/default-region/default-rack", 
repp.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+        assertEquals("/region-1/default-rack", 
repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
+        assertEquals("/region-2/default-rack", 
repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
+        assertEquals("/region-3/default-rack", 
repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation());
+
+        // UnknownRegion,region-1,region-2,region-3
+        assertEquals(4, repp.perRegionPlacement.size());
+        assertEquals("UnknownRegion", 
repp.address2Region.get(addr1.toBookieId()));
+        // addr1 bookie belongs to unknown region
+        unknownRegionPlacement = repp.perRegionPlacement.get("UnknownRegion");
+        assertEquals(1, unknownRegionPlacement.knownBookies.keySet().size());
+        assertEquals("/default-region/default-rack",
+                
unknownRegionPlacement.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+
+        // 4. Update the correct rack.
+        // change addr1 rack info. /default-region/default-rack -> 
/region-1/default-rack.
+        List<BookieSocketAddress> bookieAddressList = new ArrayList<>();
+        List<String> rackList = new ArrayList<>();
+        bookieAddressList.add(addr1);
+        rackList.add("/region-1/default-rack");
+        // onBookieRackChange
+        StaticDNSResolver.changeRack(bookieAddressList, rackList);
+
+        assertEquals(4, repp.perRegionPlacement.size());
+        // addr1 bookie, oldRegion=default-region, newRegion=region-1
+        assertEquals("region-1", repp.address2Region.get(addr1.toBookieId()));
+
+        unknownRegionPlacement = repp.perRegionPlacement.get("UnknownRegion");
+        assertEquals(0, unknownRegionPlacement.knownBookies.keySet().size());
+        
assertNotNull(unknownRegionPlacement.historyBookies.get(addr1.toBookieId()));
+
+
+        region1Placement = repp.perRegionPlacement.get("region-1");
+        assertEquals(2, region1Placement.knownBookies.keySet().size());
+        assertEquals("/region-1/default-rack",
+                
region1Placement.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
+    }
 }

Reply via email to