This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.15
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit b445bfc985df9fdc9cc2fe92aae7006b1a45348a
Author: Shogo Takayama <[email protected]>
AuthorDate: Wed Nov 13 17:12:29 2024 +0900

    Optimize reorderReadSequence to check WriteSet instead of entire Ensemble (#4478)
    
    (cherry picked from commit 0376bdce03257def64b95b787e466dac6a9de588)
---
 .../RackawareEnsemblePlacementPolicyImpl.java      |  5 +--
 .../TestRackawareEnsemblePlacementPolicy.java      | 39 ++++++++++++++++++++++
 2 files changed, 42 insertions(+), 2 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 161fadd3c5..a47b23f5b3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -865,8 +865,9 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
         if (useRegionAware || reorderReadsRandom) {
             isAnyBookieUnavailable = true;
         } else {
-            for (int i = 0; i < ensemble.size(); i++) {
-                BookieId bookieAddr = ensemble.get(i);
+            for (int i = 0; i < writeSet.size(); i++) {
+                int idx = writeSet.get(i);
+                BookieId bookieAddr = ensemble.get(idx);
                 if ((!knownBookies.containsKey(bookieAddr) && !readOnlyBookies.contains(bookieAddr))
                     || slowBookies.getIfPresent(bookieAddr) != null) {
                     // Found at least one bookie not available in the ensemble, or in slowBookies
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index 4865ac09d8..4ce3982cea 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -60,6 +60,7 @@ import org.apache.bookkeeper.proto.BookieAddressResolver;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.TestStatsProvider;
+import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
 import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
 import org.apache.bookkeeper.util.StaticDNSResolver;
 import org.apache.commons.collections4.CollectionUtils;
@@ -2267,6 +2268,44 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
         StaticDNSResolver.reset();
     }
 
+    @Test
+    public void testSlowBookieInEnsembleOnly() throws Exception {
+        repp.uninitalize();
+        updateMyRack("/r1/rack1");
+
+        TestStatsProvider statsProvider = new TestStatsProvider();
+        TestStatsLogger statsLogger = statsProvider.getStatsLogger("");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
+            DISABLE_ALL, statsLogger, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        TestOpStatsLogger readRequestsReorderedCounter = (TestOpStatsLogger) statsLogger
+            .getOpStatsLogger(BookKeeperClientStats.READ_REQUESTS_REORDERED);
+
+        // Update cluster
+        Set<BookieId> addrs = new HashSet<BookieId>();
+        addrs.add(addr1.toBookieId());
+        addrs.add(addr2.toBookieId());
+        addrs.add(addr3.toBookieId());
+        addrs.add(addr4.toBookieId());
+        repp.onClusterChanged(addrs, new HashSet<BookieId>());
+        repp.registerSlowBookie(addr1.toBookieId(), 0L);
+        Map<BookieId, Long> bookiePendingMap = new HashMap<>();
+        bookiePendingMap.put(addr1.toBookieId(), 1L);
+        repp.onClusterChanged(addrs, new HashSet<>());
+
+        DistributionSchedule.WriteSet writeSet = writeSetFromValues(1, 2, 3);
+
+        DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+            ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
+
+        // If the slow bookie is only present in the ensemble, no reordering occurs.
+        assertEquals(writeSet, reorderSet);
+        assertEquals(0, readRequestsReorderedCounter.getSuccessCount());
+    }
+
     @Test
     public void testReplaceNotAvailableBookieWithDefaultRack() throws Exception {
         repp.uninitalize();

Reply via email to