This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new abb5981676 Rackaware placement policy support local node awareness by 
hostname (#4057)
abb5981676 is described below

commit abb5981676a870c2d3b17b62d5b96f3a3f4a4d55
Author: Hang Chen <[email protected]>
AuthorDate: Wed Sep 20 15:14:25 2023 +0800

    Rackaware placement policy support local node awareness by hostname (#4057)
    
    ### Motivation
    Rack-aware placement policies enable preference for bookies that reside in 
the same rack as the bookie client.
    - Initialize the local node by resolving its rack information
    
https://github.com/apache/bookkeeper/blob/5b5c05331757e7356579076970e61f119f5d34ae/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java#L207-L215
    - When generating new ensembles for a ledger, the selection algorithm 
sets the localNode's rack as `curRack` and selects one bookie from `curRack` first
    
https://github.com/apache/bookkeeper/blob/5b5c05331757e7356579076970e61f119f5d34ae/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java#L420-L426
    
    However, when resolving the local node's rack information, we use the IP 
address to resolve the rack name, which does not work well with Kubernetes (k8s) deployments.
    
https://github.com/apache/bookkeeper/blob/5b5c05331757e7356579076970e61f119f5d34ae/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java#L209
    
    In k8s deployments, we usually use the hostname instead of the IP address 
as the bookieId and the Pulsar broker name, because a pod's IP changes when 
it migrates to another node.
    
    ### Modification
    To avoid a breaking change to the current behavior, I introduced a 
flag `useHostnameResolveLocalNodePlacementPolicy` in the BookKeeper client 
configuration that controls whether the bookie client's local node rack 
information is resolved by hostname. The flag is `false` by default, which 
preserves the current behavior.
    
    Because this PR doesn't introduce any breaking changes, I think we can 
cherry-pick it back to the patch release branches (branch-4.14, branch-4.15 and 
branch-4.16).
---
 .../client/RackawareEnsemblePlacementPolicy.java   | 30 ++++++--
 .../RackawareEnsemblePlacementPolicyImpl.java      | 43 ++++++++++-
 .../client/RegionAwareEnsemblePlacementPolicy.java |  9 ++-
 .../bookkeeper/conf/ClientConfiguration.java       | 19 +++++
 .../TestRackawareEnsemblePlacementPolicy.java      | 84 ++++++++++++++++++++++
 5 files changed, 176 insertions(+), 9 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index 72858f188f..1fb17ca3ef 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -58,20 +58,40 @@ public class RackawareEnsemblePlacementPolicy extends 
RackawareEnsemblePlacement
                                                           boolean 
enforceMinNumRacksPerWriteQuorum,
                                                           boolean 
ignoreLocalNodeInPlacementPolicy,
             StatsLogger statsLogger, BookieAddressResolver 
bookieAddressResolver) {
+        return initialize(dnsResolver, timer, reorderReadsRandom, 
stabilizePeriodSeconds,
+            reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, 
minNumRacksPerWriteQuorum,
+            enforceMinNumRacksPerWriteQuorum, 
ignoreLocalNodeInPlacementPolicy, false,
+            statsLogger, bookieAddressResolver);
+    }
+
+    @Override
+    protected RackawareEnsemblePlacementPolicy initialize(DNSToSwitchMapping 
dnsResolver,
+                                                          HashedWheelTimer 
timer,
+                                                          boolean 
reorderReadsRandom,
+                                                          int 
stabilizePeriodSeconds,
+                                                          int 
reorderThresholdPendingRequests,
+                                                          boolean isWeighted,
+                                                          int 
maxWeightMultiple,
+                                                          int 
minNumRacksPerWriteQuorum,
+                                                          boolean 
enforceMinNumRacksPerWriteQuorum,
+                                                          boolean 
ignoreLocalNodeInPlacementPolicy,
+                                                          boolean 
useHostnameResolveLocalNodePlacementPolicy,
+            StatsLogger statsLogger, BookieAddressResolver 
bookieAddressResolver) {
         if (stabilizePeriodSeconds > 0) {
             super.initialize(dnsResolver, timer, reorderReadsRandom, 0, 
reorderThresholdPendingRequests, isWeighted,
                     maxWeightMultiple, minNumRacksPerWriteQuorum, 
enforceMinNumRacksPerWriteQuorum,
-                    ignoreLocalNodeInPlacementPolicy, statsLogger, 
bookieAddressResolver);
+                    ignoreLocalNodeInPlacementPolicy, 
useHostnameResolveLocalNodePlacementPolicy,
+                    statsLogger, bookieAddressResolver);
             slave = new 
RackawareEnsemblePlacementPolicyImpl(enforceDurability);
             slave.initialize(dnsResolver, timer, reorderReadsRandom, 
stabilizePeriodSeconds,
                     reorderThresholdPendingRequests, isWeighted, 
maxWeightMultiple, minNumRacksPerWriteQuorum,
-                    enforceMinNumRacksPerWriteQuorum, 
ignoreLocalNodeInPlacementPolicy, statsLogger,
-                    bookieAddressResolver);
+                    enforceMinNumRacksPerWriteQuorum, 
ignoreLocalNodeInPlacementPolicy,
+                    useHostnameResolveLocalNodePlacementPolicy, statsLogger, 
bookieAddressResolver);
         } else {
             super.initialize(dnsResolver, timer, reorderReadsRandom, 
stabilizePeriodSeconds,
                     reorderThresholdPendingRequests, isWeighted, 
maxWeightMultiple, minNumRacksPerWriteQuorum,
-                    enforceMinNumRacksPerWriteQuorum, 
ignoreLocalNodeInPlacementPolicy, statsLogger,
-                    bookieAddressResolver);
+                    enforceMinNumRacksPerWriteQuorum, 
ignoreLocalNodeInPlacementPolicy,
+                    useHostnameResolveLocalNodePlacementPolicy, statsLogger, 
bookieAddressResolver);
             slave = null;
         }
         return this;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 6ec9e5b158..7f219854ed 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -90,6 +90,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
     protected int minNumRacksPerWriteQuorum;
     protected boolean enforceMinNumRacksPerWriteQuorum;
     protected boolean ignoreLocalNodeInPlacementPolicy;
+    protected boolean useHostnameResolveLocalNodePlacementPolicy;
 
     public static final String REPP_RANDOM_READ_REORDERING = 
"ensembleRandomReadReordering";
 
@@ -144,6 +145,41 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
         topology = new NetworkTopologyImpl();
     }
 
+    /**
+     * Initialize the policy with {@code useHostnameResolveLocalNodePlacementPolicy} set to {@code false}.
+     *
+     * @param dnsResolver
+     * @param timer
+     * @param reorderReadsRandom
+     * @param stabilizePeriodSeconds
+     * @param reorderThresholdPendingRequests
+     * @param isWeighted
+     * @param maxWeightMultiple
+     * @param minNumRacksPerWriteQuorum
+     * @param enforceMinNumRacksPerWriteQuorum
+     * @param ignoreLocalNodeInPlacementPolicy if {@code true}, no local bookie node is created
+     * @param statsLogger
+     * @param bookieAddressResolver
+     * @return initialized ensemble placement policy
+     */
+    protected RackawareEnsemblePlacementPolicyImpl 
initialize(DNSToSwitchMapping dnsResolver,
+                                                              HashedWheelTimer 
timer,
+                                                              boolean 
reorderReadsRandom,
+                                                              int 
stabilizePeriodSeconds,
+                                                              int 
reorderThresholdPendingRequests,
+                                                              boolean 
isWeighted,
+                                                              int 
maxWeightMultiple,
+                                                              int 
minNumRacksPerWriteQuorum,
+                                                              boolean 
enforceMinNumRacksPerWriteQuorum,
+                                                              boolean 
ignoreLocalNodeInPlacementPolicy,
+                                                              StatsLogger 
statsLogger,
+                                                              
BookieAddressResolver bookieAddressResolver) {
+        return initialize(dnsResolver, timer, reorderReadsRandom, 
stabilizePeriodSeconds,
+            reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, 
minNumRacksPerWriteQuorum,
+            enforceMinNumRacksPerWriteQuorum, ignoreLocalNodeInPlacementPolicy,
+            false, statsLogger, bookieAddressResolver);
+    }
+
     /**
      * Initialize the policy.
      *
@@ -160,6 +196,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
                                                               int 
minNumRacksPerWriteQuorum,
                                                               boolean 
enforceMinNumRacksPerWriteQuorum,
                                                               boolean 
ignoreLocalNodeInPlacementPolicy,
+                                                              boolean 
useHostnameResolveLocalNodePlacementPolicy,
                                                               StatsLogger 
statsLogger,
                                                               
BookieAddressResolver bookieAddressResolver) {
         checkNotNull(statsLogger, "statsLogger should not be null, use 
NullStatsLogger instead.");
@@ -195,6 +232,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
         this.minNumRacksPerWriteQuorum = minNumRacksPerWriteQuorum;
         this.enforceMinNumRacksPerWriteQuorum = 
enforceMinNumRacksPerWriteQuorum;
         this.ignoreLocalNodeInPlacementPolicy = 
ignoreLocalNodeInPlacementPolicy;
+        this.useHostnameResolveLocalNodePlacementPolicy = 
useHostnameResolveLocalNodePlacementPolicy;
 
         // create the network topology
         if (stabilizePeriodSeconds > 0) {
@@ -206,7 +244,9 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
         BookieNode bn = null;
         if (!ignoreLocalNodeInPlacementPolicy) {
             try {
-                bn = 
createDummyLocalBookieNode(InetAddress.getLocalHost().getHostAddress());
+                String hostname = useHostnameResolveLocalNodePlacementPolicy
+                    ? InetAddress.getLocalHost().getCanonicalHostName() : 
InetAddress.getLocalHost().getHostAddress();
+                bn = createDummyLocalBookieNode(hostname);
             } catch (IOException e) {
                 LOG.error("Failed to get local host address : ", e);
             }
@@ -303,6 +343,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
                 conf.getMinNumRacksPerWriteQuorum(),
                 conf.getEnforceMinNumRacksPerWriteQuorum(),
                 conf.getIgnoreLocalNodeInPlacementPolicy(),
+                conf.getUseHostnameResolveLocalNodePlacementPolicy(),
                 statsLogger,
                 bookieAddressResolver);
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index 43969b8fde..5fcfd0f94f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -140,7 +140,8 @@ public class RegionAwareEnsemblePlacementPolicy extends 
RackawareEnsemblePlaceme
                         .initialize(dnsResolver, timer, 
this.reorderReadsRandom, this.stabilizePeriodSeconds,
                                 this.reorderThresholdPendingRequests, 
this.isWeighted, this.maxWeightMultiple,
                                 this.minNumRacksPerWriteQuorum, 
this.enforceMinNumRacksPerWriteQuorum,
-                                this.ignoreLocalNodeInPlacementPolicy, 
statsLogger, bookieAddressResolver)
+                                this.ignoreLocalNodeInPlacementPolicy,
+                                
this.useHostnameResolveLocalNodePlacementPolicy, statsLogger, 
bookieAddressResolver)
                         
.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK));
             }
 
@@ -201,7 +202,8 @@ public class RegionAwareEnsemblePlacementPolicy extends 
RackawareEnsemblePlaceme
                                                 this.stabilizePeriodSeconds, 
this.reorderThresholdPendingRequests,
                                                 this.isWeighted, 
this.maxWeightMultiple,
                                                 
this.minNumRacksPerWriteQuorum, this.enforceMinNumRacksPerWriteQuorum,
-                                                
this.ignoreLocalNodeInPlacementPolicy, statsLogger,
+                                                
this.ignoreLocalNodeInPlacementPolicy,
+                                                
this.useHostnameResolveLocalNodePlacementPolicy, statsLogger,
                                                 bookieAddressResolver)
                                         
.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
                                 perRegionPlacement.put(newRegion, 
newRegionPlacement);
@@ -242,7 +244,8 @@ public class RegionAwareEnsemblePlacementPolicy extends 
RackawareEnsemblePlaceme
                         .initialize(dnsResolver, timer, 
this.reorderReadsRandom, this.stabilizePeriodSeconds,
                                 this.reorderThresholdPendingRequests, 
this.isWeighted, this.maxWeightMultiple,
                                 this.minNumRacksPerWriteQuorum, 
this.enforceMinNumRacksPerWriteQuorum,
-                                this.ignoreLocalNodeInPlacementPolicy, 
statsLogger, bookieAddressResolver)
+                                this.ignoreLocalNodeInPlacementPolicy,
+                                
this.useHostnameResolveLocalNodePlacementPolicy, statsLogger, 
bookieAddressResolver)
                         
.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK));
             }
             minRegionsForDurability = 
conf.getInt(REPP_MINIMUM_REGIONS_FOR_DURABILITY,
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 8aaa8bbff4..297a2f62f4 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -161,6 +161,9 @@ public class ClientConfiguration extends 
AbstractConfiguration<ClientConfigurati
     protected static final String ENSEMBLE_PLACEMENT_POLICY_ORDER_SLOW_BOOKIES 
=
         "ensemblePlacementPolicyOrderSlowBookies";
     protected static final String BOOKIE_ADDRESS_RESOLVER_ENABLED = 
"bookieAddressResolverEnabled";
+    // Whether to resolve the local node's rack by hostname instead of IP address
+    public static final String 
USE_HOSTNAME_RESOLVE_LOCAL_NODE_PLACEMENT_POLICY =
+        "useHostnameResolveLocalNodePlacementPolicy";
 
     // Stats
     protected static final String ENABLE_TASK_EXECUTION_STATS = 
"enableTaskExecutionStats";
@@ -1314,6 +1317,27 @@ public class ClientConfiguration extends 
AbstractConfiguration<ClientConfigurati
         return this;
     }
 
+    /**
+     * Set whether the client resolves its local node's rack by canonical hostname instead of IP address.
+     *
+     * @param useHostnameResolveLocalNodePlacementPolicy {@code true} to resolve the local node by hostname
+     * @return client configuration
+     */
+    public ClientConfiguration setUseHostnameResolveLocalNodePlacementPolicy(
+            boolean useHostnameResolveLocalNodePlacementPolicy) {
+        setProperty(USE_HOSTNAME_RESOLVE_LOCAL_NODE_PLACEMENT_POLICY, useHostnameResolveLocalNodePlacementPolicy);
+        return this;
+    }
+
+    /**
+     * Get whether the client resolves its local node's rack by canonical hostname instead of IP address.
+     *
+     * @return {@code true} if hostname resolution is enabled; defaults to {@code false}
+     */
+    public boolean getUseHostnameResolveLocalNodePlacementPolicy() {
+        return getBoolean(USE_HOSTNAME_RESOLVE_LOCAL_NODE_PLACEMENT_POLICY, false);
+    }
+
     /**
      * Whether to enable recording task execution stats.
      *
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index 95a7d5b40d..ed37159ee1 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -1689,6 +1689,88 @@ public class TestRackawareEnsemblePlacementPolicy 
extends TestCase {
         }
     }
 
+    @Test
+    public void testNewEnsemblePickLocalRackBookiesByHostname() throws 
Exception {
+        testNewEnsemblePickLocalRackBookiesInternal(true);
+    }
+
+    @Test
+    public void testNewEnsemblePickLocalRackBookiesByIP() throws Exception {
+        testNewEnsemblePickLocalRackBookiesInternal(false);
+    }
+
+    private void testNewEnsemblePickLocalRackBookiesInternal(boolean 
useHostnameResolveLocalNodePlacementPolicy)
+        throws Exception {
+        BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
+        BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
+        BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
+        BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
+        BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181);
+        BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.6", 3181);
+        BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.7", 3181);
+
+        // update dns mapping
+        StaticDNSResolver.addNodeToRack(addr1.getHostName(), 
"/default-region/r1");
+        StaticDNSResolver.addNodeToRack(addr2.getHostName(), 
"/default-region/r2");
+        StaticDNSResolver.addNodeToRack(addr3.getHostName(), 
"/default-region/r2");
+        StaticDNSResolver.addNodeToRack(addr4.getHostName(), 
"/default-region/r2");
+        StaticDNSResolver.addNodeToRack(addr5.getHostName(), 
"/default-region/r3");
+        StaticDNSResolver.addNodeToRack(addr6.getHostName(), 
"/default-region/r4");
+        StaticDNSResolver.addNodeToRack(addr7.getHostName(), 
"/default-region/r5");
+
+        String hostname = useHostnameResolveLocalNodePlacementPolicy
+            ? InetAddress.getLocalHost().getCanonicalHostName() : 
InetAddress.getLocalHost().getHostAddress();
+        StaticDNSResolver.addNodeToRack(hostname, "/default-region/r1");
+        conf.setUseHostnameResolveLocalNodePlacementPolicy(useHostnameResolveLocalNodePlacementPolicy);
+
+        repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
+            DISABLE_ALL, NullStatsLogger.INSTANCE, 
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+        // Update cluster
+        Set<BookieId> addrs = new HashSet<BookieId>();
+        addrs.add(addr1.toBookieId());
+        addrs.add(addr2.toBookieId());
+        addrs.add(addr3.toBookieId());
+        addrs.add(addr4.toBookieId());
+        addrs.add(addr5.toBookieId());
+        addrs.add(addr6.toBookieId());
+        addrs.add(addr7.toBookieId());
+        repp.onClusterChanged(addrs, new HashSet<BookieId>());
+
+        int ensembleSize = 3;
+        int writeQuorumSize = 3;
+        int ackQuorumSize = 2;
+
+        Set<BookieId> excludeBookies = new HashSet<>();
+
+        for (int i = 0; i < 50000; ++i) {
+            EnsemblePlacementPolicy.PlacementResult<List<BookieId>> 
ensembleResponse =
+                repp.newEnsemble(ensembleSize, writeQuorumSize,
+                    ackQuorumSize, null, excludeBookies);
+            List<BookieId> ensemble = ensembleResponse.getResult();
+            if (!ensemble.contains(addr1.toBookieId())) {
+                fail("Failed to select bookie located on the same rack with 
bookie client");
+            }
+            if (ensemble.contains(addr2.toBookieId()) && 
ensemble.contains(addr3.toBookieId())) {
+                fail("addr2 and addr3 is same rack.");
+            }
+        }
+
+        // addr5 (rack r3) shutdown.
+        addrs.remove(addr5.toBookieId());
+        repp.onClusterChanged(addrs, new HashSet<BookieId>());
+        for (int i = 0; i < 50000; ++i) {
+            EnsemblePlacementPolicy.PlacementResult<List<BookieId>> 
ensembleResponse =
+                repp.newEnsemble(ensembleSize, writeQuorumSize,
+                    ackQuorumSize, null, excludeBookies);
+            List<BookieId> ensemble = ensembleResponse.getResult();
+            if (!ensemble.contains(addr1.toBookieId())) {
+                fail("Failed to select bookie located on the same rack with 
bookie client");
+            }
+        }
+
+    }
+
     @Test
     public void testMinNumRacksPerWriteQuorumOfRacks() throws Exception {
         int numOfRacksToCreate = 6;

Reply via email to