This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-4.14 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 6560efd0752243b52cbddd18bd4f732d6aa25fd5 Author: Yan Zhao <[email protected]> AuthorDate: Fri Feb 17 15:09:36 2023 +0800 Fix RegionAwareEnsemblePlacementPolicy.newEnsemble sometimes failed problem. (#3725) Descriptions of the changes in this PR: Fixes #3722 See [#3722](https://github.com/apache/bookkeeper/issues/3722#issuecomment-1369859251) (cherry picked from commit 2381d9b65a37b6e58ffc9c7be2aba35fd37874b7) --- .../RackawareEnsemblePlacementPolicyImpl.java | 3 +- .../ZoneawareEnsemblePlacementPolicyImpl.java | 2 +- .../apache/bookkeeper/net/NetworkTopologyImpl.java | 7 +- .../TestRackawareEnsemblePlacementPolicy.java | 124 +++++++++++++++++++++ .../TestRegionAwareEnsemblePlacementPolicy.java | 61 ++++++++++ 5 files changed, 192 insertions(+), 5 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java index 3299f70406..86ed8b9ef3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java @@ -430,13 +430,14 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP curRack = curRack + NetworkTopologyImpl.NODE_SEPARATOR + prevNode.getNetworkLocation(); } } + boolean firstBookieInTheEnsemble = (null == prevNode); try { prevNode = selectRandomFromRack(curRack, excludeNodes, ensemble, ensemble); } catch (BKNotEnoughBookiesException e) { if (!curRack.equals(NodeBase.ROOT)) { curRack = NodeBase.ROOT; prevNode = selectFromNetworkLocation(curRack, excludeNodes, ensemble, ensemble, - !enforceMinNumRacksPerWriteQuorum || prevNode == null); + !enforceMinNumRacksPerWriteQuorum || firstBookieInTheEnsemble); } else { throw e; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicyImpl.java index 61b81ed303..d4bde8f326 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ZoneawareEnsemblePlacementPolicyImpl.java @@ -618,7 +618,7 @@ public class ZoneawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP if (excludeZones.isEmpty()) { return ""; } - StringBuilder excludedZonesString = new StringBuilder("~"); + StringBuilder excludedZonesString = new StringBuilder(NetworkTopologyImpl.INVERSE); boolean firstZone = true; for (String excludeZone : excludeZones) { if (!firstZone) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java index 123f13ca8a..891d71578f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java @@ -45,6 +45,7 @@ public class NetworkTopologyImpl implements NetworkTopology { public static final int DEFAULT_HOST_LEVEL = 2; public static final Logger LOG = LoggerFactory.getLogger(NetworkTopologyImpl.class); public static final String NODE_SEPARATOR = ","; + public static final String INVERSE = "~"; /** * A marker for an InvalidTopology Exception. @@ -707,7 +708,7 @@ public class NetworkTopologyImpl implements NetworkTopology { public Node chooseRandom(String scope) { netlock.readLock().lock(); try { - if (scope.startsWith("~")) { + if (scope.startsWith(INVERSE)) { return chooseRandom(NodeBase.ROOT, scope.substring(1)); } else { return chooseRandom(scope, null); @@ -773,7 +774,7 @@ public class NetworkTopologyImpl implements NetworkTopology { public Set<Node> getLeaves(String scope) { netlock.readLock().lock(); try { - if (scope.startsWith("~")) { + if (scope.startsWith(INVERSE)) { Set<Node> allNodes = doGetLeaves(NodeBase.ROOT); String[] excludeScopes = scope.substring(1).split(NODE_SEPARATOR); Set<Node> excludeNodes = new HashSet<Node>(); @@ -793,7 +794,7 @@ public class NetworkTopologyImpl implements NetworkTopology { @Override public int countNumOfAvailableNodes(String scope, Collection<Node> excludedNodes) { boolean isExcluded = false; - if (scope.startsWith("~")) { + if (scope.startsWith(INVERSE)) { isExcluded = true; scope = scope.substring(1); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java index 9d970c79bc..e23882c015 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java @@ -1462,6 +1462,130 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase { } } + //see: https://github.com/apache/bookkeeper/issues/3722 + @Test + public void testNewEnsembleWithMultipleRacksWithCommonRack() throws Exception { + ClientConfiguration clientConf = new ClientConfiguration(conf); + clientConf.setEnforceMinNumRacksPerWriteQuorum(true); + clientConf.setMinNumRacksPerWriteQuorum(3); + repp.uninitalize(); + repp = new RackawareEnsemblePlacementPolicy(); + repp.initialize(clientConf, Optional.<DNSToSwitchMapping>empty(), timer, + DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); + + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181); + BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181); + BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.6", 3181); + BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.7", 3181); + BookieSocketAddress addr8 = new BookieSocketAddress("127.0.0.8", 3181); + BookieSocketAddress addr9 = new BookieSocketAddress("127.0.0.9", 3181); + BookieSocketAddress addr10 = new BookieSocketAddress("127.0.0.10", 3181); + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr6.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr7.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr8.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr9.getHostName(), "/default-region/r2"); + StaticDNSResolver.addNodeToRack(addr10.getHostName(), "/default-region/r3"); + // Update cluster + Set<BookieId> addrs = new HashSet<BookieId>(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + addrs.add(addr4.toBookieId()); + addrs.add(addr5.toBookieId()); + addrs.add(addr6.toBookieId()); + addrs.add(addr7.toBookieId()); + addrs.add(addr8.toBookieId()); + addrs.add(addr9.toBookieId()); + addrs.add(addr10.toBookieId()); + repp.onClusterChanged(addrs, new HashSet<BookieId>()); + + try { + int ensembleSize = 10; + int writeQuorumSize = 10; + int ackQuorumSize = 2; + + for (int i = 0; i < 50; ++i) { + Set<BookieId> excludeBookies = new HashSet<>(); + EnsemblePlacementPolicy.PlacementResult<List<BookieId>> ensembleResponse = + repp.newEnsemble(ensembleSize, writeQuorumSize, + ackQuorumSize, null, excludeBookies); + } + } catch (Exception e) { + fail("Can not new ensemble selection succeed"); + } + } + + @Test + public void testNewEnsembleWithMultipleRacksWithCommonRackFailed() throws Exception { + ClientConfiguration clientConf = new ClientConfiguration(conf); + clientConf.setEnforceMinNumRacksPerWriteQuorum(true); + clientConf.setMinNumRacksPerWriteQuorum(3); + repp.uninitalize(); + repp = new RackawareEnsemblePlacementPolicy(); + repp.initialize(clientConf, Optional.<DNSToSwitchMapping>empty(), timer, + DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); + + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181); + BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181); + BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.6", 3181); + BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.7", 3181); + BookieSocketAddress addr8 = new BookieSocketAddress("127.0.0.8", 3181); + BookieSocketAddress addr9 = new BookieSocketAddress("127.0.0.9", 3181); + BookieSocketAddress addr10 = new BookieSocketAddress("127.0.0.10", 3181); + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr6.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr7.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr8.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr9.getHostName(), "/default-region/r1"); + StaticDNSResolver.addNodeToRack(addr10.getHostName(), "/default-region/r2"); + // Update cluster + Set<BookieId> addrs = new HashSet<BookieId>(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + addrs.add(addr4.toBookieId()); + addrs.add(addr5.toBookieId()); + addrs.add(addr6.toBookieId()); + addrs.add(addr7.toBookieId()); + addrs.add(addr8.toBookieId()); + addrs.add(addr9.toBookieId()); + addrs.add(addr10.toBookieId()); + repp.onClusterChanged(addrs, new HashSet<BookieId>()); + + try { + int ensembleSize = 10; + int writeQuorumSize = 10; + int ackQuorumSize = 2; + + Set<BookieId> excludeBookies = new HashSet<>(); + EnsemblePlacementPolicy.PlacementResult<List<BookieId>> ensembleResponse = + repp.newEnsemble(ensembleSize, writeQuorumSize, + ackQuorumSize, null, excludeBookies); + fail("Can not new ensemble selection succeed"); + } catch (Exception e) { + assertTrue(e instanceof BKNotEnoughBookiesException); + } + } + @Test public void testNewEnsembleWithPickDifferentRack() throws Exception { BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java index 52b9a8a63b..ba9c4f862f 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java @@ -705,6 +705,67 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase { } } + @Test + public void testNewEnsembleWithMultipleRacksWithCommonRack() throws Exception { + ClientConfiguration clientConf = new ClientConfiguration(conf); + clientConf.setEnforceMinNumRacksPerWriteQuorum(true); + clientConf.setMinNumRacksPerWriteQuorum(3); + repp.uninitalize(); + repp = new RegionAwareEnsemblePlacementPolicy(); + repp.initialize(clientConf, Optional.<DNSToSwitchMapping>empty(), timer, + DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181); + BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181); + BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.6", 3181); + BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.7", 3181); + BookieSocketAddress addr8 = new BookieSocketAddress("127.0.0.8", 3181); + BookieSocketAddress addr9 = new BookieSocketAddress("127.0.0.9", 3181); + BookieSocketAddress addr10 = new BookieSocketAddress("127.0.0.10", 3181); + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region1/r1"); + StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region1/r1"); + StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/region1/r1"); + StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/region1/r1"); + StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/region1/r1"); + StaticDNSResolver.addNodeToRack(addr6.getHostName(), "/region1/r1"); + StaticDNSResolver.addNodeToRack(addr7.getHostName(), "/region1/r2"); + StaticDNSResolver.addNodeToRack(addr8.getHostName(), "/region1/r3"); + StaticDNSResolver.addNodeToRack(addr9.getHostName(), "/region2/r1"); + StaticDNSResolver.addNodeToRack(addr10.getHostName(), "/region3/r1"); + // Update cluster + Set<BookieId> addrs = new HashSet<BookieId>(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + addrs.add(addr4.toBookieId()); + addrs.add(addr5.toBookieId()); + addrs.add(addr6.toBookieId()); + addrs.add(addr7.toBookieId()); + addrs.add(addr8.toBookieId()); + addrs.add(addr9.toBookieId()); + addrs.add(addr10.toBookieId()); + repp.onClusterChanged(addrs, new HashSet<BookieId>()); + + try { + int ensembleSize = 10; + int writeQuorumSize = 10; + int ackQuorumSize = 2; + + for (int i = 0; i < 50; ++i) { + Set<BookieId> excludeBookies = new HashSet<>(); + EnsemblePlacementPolicy.PlacementResult<List<BookieId>> ensembleResponse = + repp.newEnsemble(ensembleSize, writeQuorumSize, + ackQuorumSize, null, excludeBookies); + } + } catch (Exception e) { + fail("RegionAwareEnsemblePlacementPolicy should newEnsemble succeed."); + } + } + @Test public void testNewEnsembleWithThreeRegions() throws Exception { repp.uninitalize();
