This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.15 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit b445bfc985df9fdc9cc2fe92aae7006b1a45348a Author: Shogo Takayama <[email protected]> AuthorDate: Wed Nov 13 17:12:29 2024 +0900 Optimize reorderReadSequence to check WriteSet instead of entire Ensemble (#4478) (cherry picked from commit 0376bdce03257def64b95b787e466dac6a9de588) --- .../RackawareEnsemblePlacementPolicyImpl.java | 5 +-- .../TestRackawareEnsemblePlacementPolicy.java | 39 ++++++++++++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java index 161fadd3c5..a47b23f5b3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java @@ -865,8 +865,9 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP if (useRegionAware || reorderReadsRandom) { isAnyBookieUnavailable = true; } else { - for (int i = 0; i < ensemble.size(); i++) { - BookieId bookieAddr = ensemble.get(i); + for (int i = 0; i < writeSet.size(); i++) { + int idx = writeSet.get(i); + BookieId bookieAddr = ensemble.get(idx); if ((!knownBookies.containsKey(bookieAddr) && !readOnlyBookies.contains(bookieAddr)) || slowBookies.getIfPresent(bookieAddr) != null) { // Found at least one bookie not available in the ensemble, or in slowBookies diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java index 4865ac09d8..4ce3982cea 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java @@ -60,6 +60,7 @@ import org.apache.bookkeeper.proto.BookieAddressResolver; import org.apache.bookkeeper.stats.Gauge; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.test.TestStatsProvider; +import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger; import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger; import org.apache.bookkeeper.util.StaticDNSResolver; import org.apache.commons.collections4.CollectionUtils; @@ -2267,6 +2268,44 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase { StaticDNSResolver.reset(); } + @Test + public void testSlowBookieInEnsembleOnly() throws Exception { + repp.uninitalize(); + updateMyRack("/r1/rack1"); + + TestStatsProvider statsProvider = new TestStatsProvider(); + TestStatsLogger statsLogger = statsProvider.getStatsLogger(""); + + repp = new RackawareEnsemblePlacementPolicy(); + repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, + DISABLE_ALL, statsLogger, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); + + TestOpStatsLogger readRequestsReorderedCounter = (TestOpStatsLogger) statsLogger + .getOpStatsLogger(BookKeeperClientStats.READ_REQUESTS_REORDERED); + + // Update cluster + Set<BookieId> addrs = new HashSet<BookieId>(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + addrs.add(addr4.toBookieId()); + repp.onClusterChanged(addrs, new HashSet<BookieId>()); + repp.registerSlowBookie(addr1.toBookieId(), 0L); + Map<BookieId, Long> bookiePendingMap = new HashMap<>(); + bookiePendingMap.put(addr1.toBookieId(), 1L); + repp.onClusterChanged(addrs, new HashSet<>()); + + DistributionSchedule.WriteSet writeSet = writeSetFromValues(1, 2, 3); + + DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence( + ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet); + + // If the slow bookie is only present in the ensemble, no reordering occurs. + assertEquals(writeSet, reorderSet); + assertEquals(0, readRequestsReorderedCounter.getSuccessCount()); + } + @Test public void testReplaceNotAvailableBookieWithDefaultRack() throws Exception { repp.uninitalize();
