This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 0376bdce03 Optimize reorderReadSequence to check WriteSet instead of entire Ensemble (#4478)
0376bdce03 is described below
commit 0376bdce03257def64b95b787e466dac6a9de588
Author: Shogo Takayama <[email protected]>
AuthorDate: Wed Nov 13 17:12:29 2024 +0900
Optimize reorderReadSequence to check WriteSet instead of entire Ensemble (#4478)
---
.../RackawareEnsemblePlacementPolicyImpl.java | 5 +--
.../TestRackawareEnsemblePlacementPolicy.java | 39 ++++++++++++++++++++++
2 files changed, 42 insertions(+), 2 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index da7fe22adb..be90ceab94 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -920,8 +920,9 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
if (useRegionAware || reorderReadsRandom) {
isAnyBookieUnavailable = true;
} else {
- for (int i = 0; i < ensemble.size(); i++) {
- BookieId bookieAddr = ensemble.get(i);
+ for (int i = 0; i < writeSet.size(); i++) {
+ int idx = writeSet.get(i);
+ BookieId bookieAddr = ensemble.get(idx);
if ((!knownBookies.containsKey(bookieAddr) &&
!readOnlyBookies.contains(bookieAddr))
|| slowBookies.getIfPresent(bookieAddr) != null) {
// Found at least one bookie not available in the ensemble, or in slowBookies
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index d1e2cd5323..cd76cdd1c6 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -64,6 +64,7 @@ import org.apache.bookkeeper.proto.BookieAddressResolver;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.TestStatsProvider;
+import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
import org.apache.bookkeeper.util.StaticDNSResolver;
import org.apache.commons.collections4.CollectionUtils;
@@ -2359,6 +2360,44 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
StaticDNSResolver.reset();
}
+ @Test
+ public void testSlowBookieInEnsembleOnly() throws Exception {
+ repp.uninitalize();
+ updateMyRack("/r1/rack1");
+
+ TestStatsProvider statsProvider = new TestStatsProvider();
+ TestStatsLogger statsLogger = statsProvider.getStatsLogger("");
+
+ repp = new RackawareEnsemblePlacementPolicy();
+ repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
+ DISABLE_ALL, statsLogger,
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+ repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+ TestOpStatsLogger readRequestsReorderedCounter = (TestOpStatsLogger)
statsLogger
+ .getOpStatsLogger(BookKeeperClientStats.READ_REQUESTS_REORDERED);
+
+ // Update cluster
+ Set<BookieId> addrs = new HashSet<BookieId>();
+ addrs.add(addr1.toBookieId());
+ addrs.add(addr2.toBookieId());
+ addrs.add(addr3.toBookieId());
+ addrs.add(addr4.toBookieId());
+ repp.onClusterChanged(addrs, new HashSet<BookieId>());
+ repp.registerSlowBookie(addr1.toBookieId(), 0L);
+ Map<BookieId, Long> bookiePendingMap = new HashMap<>();
+ bookiePendingMap.put(addr1.toBookieId(), 1L);
+ repp.onClusterChanged(addrs, new HashSet<>());
+
+ DistributionSchedule.WriteSet writeSet = writeSetFromValues(1, 2, 3);
+
+ DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
+ ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap),
writeSet);
+
+ // If the slow bookie is only present in the ensemble, no reordering occurs.
+ assertEquals(writeSet, reorderSet);
+ assertEquals(0, readRequestsReorderedCounter.getSuccessCount());
+ }
+
@Test
public void testReplaceNotAvailableBookieWithDefaultRack() throws
Exception {
repp.uninitalize();