This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new 7dce3b335d0 HBASE-29933: update_all_config hangs indefinitely when
balancing event is in progress (#7932)
7dce3b335d0 is described below
commit 7dce3b335d07f5317671362dd4f10de776015e93
Author: Dev Hingu <[email protected]>
AuthorDate: Tue Mar 24 15:39:01 2026 +0530
HBASE-29933: update_all_config hangs indefinitely when balancing event is
in progress (#7932)
Signed-off-by: Wellington Chevreuil <[email protected]>
Reviewed-by: Vaibhav Joshi <[email protected]>
---
.../org/apache/hadoop/hbase/master/HMaster.java | 122 +++++++++--------
.../apache/hadoop/hbase/master/LoadBalancer.java | 8 ++
.../master/balancer/CacheAwareLoadBalancer.java | 70 +++++++++-
.../balancer/TestCacheAwareLoadBalancer.java | 151 +++++++++++++++++++++
4 files changed, 288 insertions(+), 63 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 4a89ad7bc12..4e7bd84b22f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -2036,80 +2036,86 @@ public class HMaster extends HRegionServer implements
MasterServices {
}
synchronized (this.balancer) {
- // Only allow one balance run at at time.
- if (this.assignmentManager.getRegionTransitScheduledCount() > 0) {
- List<RegionStateNode> regionsInTransition =
assignmentManager.getRegionsInTransition();
- // if hbase:meta region is in transition, result of assignment cannot
be recorded
- // ignore the force flag in that case
- boolean metaInTransition =
assignmentManager.isMetaRegionInTransition();
- List<RegionStateNode> toPrint = regionsInTransition;
- int max = 5;
- boolean truncated = false;
- if (regionsInTransition.size() > max) {
- toPrint = regionsInTransition.subList(0, max);
- truncated = true;
- }
+ try {
+ this.balancer.onBalancingStart(); // paired with onBalancingComplete() in the finally block below
+
+ // Only allow one balance run at a time.
+ if (this.assignmentManager.getRegionTransitScheduledCount() > 0) {
+ List<RegionStateNode> regionsInTransition =
assignmentManager.getRegionsInTransition();
+ // if hbase:meta region is in transition, result of assignment
cannot be recorded
+ // ignore the force flag in that case
+ boolean metaInTransition =
assignmentManager.isMetaRegionInTransition();
+ List<RegionStateNode> toPrint = regionsInTransition;
+ int max = 5;
+ boolean truncated = false;
+ if (regionsInTransition.size() > max) {
+ toPrint = regionsInTransition.subList(0, max);
+ truncated = true;
+ }
- if (!request.isIgnoreRegionsInTransition() || metaInTransition) {
- LOG.info("Not running balancer (ignoreRIT=false" + ", metaRIT=" +
metaInTransition
- + ") because " + assignmentManager.getRegionTransitScheduledCount()
- + " region(s) are scheduled to transit " + toPrint
- + (truncated ? "(truncated list)" : ""));
+ if (!request.isIgnoreRegionsInTransition() || metaInTransition) {
+ LOG.info("Not running balancer (ignoreRIT=false" + ", metaRIT=" +
metaInTransition
+ + ") because " +
assignmentManager.getRegionTransitScheduledCount()
+ + " region(s) are scheduled to transit " + toPrint
+ + (truncated ? "(truncated list)" : ""));
+ return responseBuilder.build();
+ }
+ }
+ if (this.serverManager.areDeadServersInProgress()) {
+ LOG.info("Not running balancer because processing dead
regionserver(s): "
+ + this.serverManager.getDeadServers());
return responseBuilder.build();
}
- }
- if (this.serverManager.areDeadServersInProgress()) {
- LOG.info("Not running balancer because processing dead
regionserver(s): "
- + this.serverManager.getDeadServers());
- return responseBuilder.build();
- }
- if (this.cpHost != null) {
- try {
- if (this.cpHost.preBalance(request)) {
- LOG.debug("Coprocessor bypassing balancer request");
+ if (this.cpHost != null) {
+ try {
+ if (this.cpHost.preBalance(request)) {
+ LOG.debug("Coprocessor bypassing balancer request");
+ return responseBuilder.build();
+ }
+ } catch (IOException ioe) {
+ LOG.error("Error invoking master coprocessor preBalance()", ioe);
return responseBuilder.build();
}
- } catch (IOException ioe) {
- LOG.error("Error invoking master coprocessor preBalance()", ioe);
- return responseBuilder.build();
}
- }
- Map<TableName, Map<ServerName, List<RegionInfo>>> assignments =
-
this.assignmentManager.getRegionStates().getAssignmentsForBalancer(tableStateManager,
- this.serverManager.getOnlineServersList());
- for (Map<ServerName, List<RegionInfo>> serverMap : assignments.values())
{
-
serverMap.keySet().removeAll(this.serverManager.getDrainingServersList());
- }
+ Map<TableName, Map<ServerName, List<RegionInfo>>> assignments =
+
this.assignmentManager.getRegionStates().getAssignmentsForBalancer(tableStateManager,
+ this.serverManager.getOnlineServersList());
+ for (Map<ServerName, List<RegionInfo>> serverMap :
assignments.values()) {
+
serverMap.keySet().removeAll(this.serverManager.getDrainingServersList());
+ }
- // Give the balancer the current cluster state.
-
this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());
+ // Give the balancer the current cluster state.
+
this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());
- List<RegionPlan> plans = this.balancer.balanceCluster(assignments);
+ List<RegionPlan> plans = this.balancer.balanceCluster(assignments);
- responseBuilder.setBalancerRan(true).setMovesCalculated(plans == null ?
0 : plans.size());
+ responseBuilder.setBalancerRan(true).setMovesCalculated(plans == null
? 0 : plans.size());
- if (skipRegionManagementAction("balancer")) {
- // make one last check that the cluster isn't shutting down before
proceeding.
- return responseBuilder.build();
- }
+ if (skipRegionManagementAction("balancer")) {
+ // make one last check that the cluster isn't shutting down before
proceeding.
+ return responseBuilder.build();
+ }
- // For dry run we don't actually want to execute the moves, but we do
want
- // to execute the coprocessor below
- List<RegionPlan> sucRPs =
- request.isDryRun() ? Collections.emptyList() :
executeRegionPlansWithThrottling(plans);
+ // For dry run we don't actually want to execute the moves, but we do
want
+ // to execute the coprocessor below
+ List<RegionPlan> sucRPs =
+ request.isDryRun() ? Collections.emptyList() :
executeRegionPlansWithThrottling(plans);
- if (this.cpHost != null) {
- try {
- this.cpHost.postBalance(request, sucRPs);
- } catch (IOException ioe) {
- // balancing already succeeded so don't change the result
- LOG.error("Error invoking master coprocessor postBalance()", ioe);
+ if (this.cpHost != null) {
+ try {
+ this.cpHost.postBalance(request, sucRPs);
+ } catch (IOException ioe) {
+ // balancing already succeeded so don't change the result
+ LOG.error("Error invoking master coprocessor postBalance()", ioe);
+ }
}
- }
- responseBuilder.setMovesExecuted(sucRPs.size());
+ responseBuilder.setMovesExecuted(sucRPs.size());
+ } finally {
+ this.balancer.onBalancingComplete(); // runs on every exit path, including the early returns above
+ }
}
// If LoadBalancer did not generate any plans, it means the cluster is
already balanced.
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
index 908e04e2051..0ffe1996226 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
@@ -188,6 +188,14 @@ public interface LoadBalancer extends Stoppable,
ConfigurationObserver {
// noop
}
+ default void onBalancingStart() {
+ // Called by HMaster before a balance run begins; no-op by default.
+ }
+
+ default void onBalancingComplete() {
+ // Called by HMaster from a finally block after a balance run; no-op by default.
+ }
+
/**
* @return true if Master carries regions
* @deprecated since 2.4.0, will be removed in 3.0.0.
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
index d48dc518175..d73109b1c08 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
@@ -37,6 +37,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.RegionMetrics;
@@ -64,13 +66,36 @@ public class CacheAwareLoadBalancer extends
StochasticLoadBalancer {
private Long sleepTime;
private Configuration configuration;
+ /**
+ * Tracks whether a balance run is in progress; set by onBalancingStart(), cleared by onBalancingComplete().
+ */
+ private final AtomicBoolean isBalancing = new AtomicBoolean(false);
+
+ /**
+ * Holds the most recent configuration update that arrived while a balance run was in
progress (later updates overwrite earlier ones); applied once the run completes.
+ */
+ private AtomicReference<Configuration> pendingConfiguration = new
AtomicReference<>(); // NOTE(review): never reassigned; could be final like isBalancing
+
public enum GeneratorFunctionType {
LOAD,
CACHE_RATIO
}
@Override
- public synchronized void loadConf(Configuration configuration) {
+ public void loadConf(Configuration configuration) {
+ // If balance is running, store configuration in pendingConfiguration and
return immediately.
+ // Defer the config update. NOTE(review): this get()/set() pair is not atomic w.r.t. onBalancingComplete(); a config parked just after it runs stays pending until the next balance completes (and may then overwrite a newer, directly applied config).
+ if (isBalancing.get()) {
+ LOG.debug(
+ "Balance is in progress, defer applying configuration change until
balance completed.");
+ pendingConfiguration.set(configuration);
+ } else {
+ // Apply configuration change immediately.
+ updateConfiguration(configuration);
+ }
+ }
+
+ public void updateConfiguration(Configuration configuration) {
this.configuration = configuration;
this.costFunctions = new ArrayList<>();
super.loadConf(configuration);
@@ -79,6 +104,38 @@ public class CacheAwareLoadBalancer extends
StochasticLoadBalancer {
sleepTime = configuration.getLong(MOVE_THROTTLING,
MOVE_THROTTLING_DEFAULT.toMillis());
}
+ /**
+ * Sets {@link #isBalancing} to {@code true} before a balance run starts, so loadConf() defers updates.
+ */
+ @Override
+ public void onBalancingStart() {
+ LOG.debug("Setting isBalancing to true as balance is starting");
+ isBalancing.set(true);
+ }
+
+ /**
+ * Sets {@link #isBalancing} to {@code false} after a balance run completes
and applies any
+ * pending configuration that arrived during balancing. NOTE(review): isBalancing is a flag, not a counter, so if balance runs ever overlap the first completion clears it early.
+ */
+ @Override
+ public void onBalancingComplete() {
+ LOG.debug("Setting isBalancing to false as balance is completed");
+ isBalancing.set(false);
+ applyPendingConfiguration();
+ }
+
+ /**
+ * If a pending configuration was stored during a balance run, apply it and
clear the pending
+ * reference. getAndSet(null) clears it atomically, so each pending config is applied at most once.
+ */
+ public void applyPendingConfiguration() {
+ Configuration toApply = pendingConfiguration.getAndSet(null);
+ if (toApply != null) {
+ LOG.info("Applying pending configuration after balance completed.");
+ updateConfiguration(toApply);
+ }
+ }
+
@Override
protected Map<Class<? extends CandidateGenerator>, CandidateGenerator>
createCandidateGenerators() {
@@ -193,10 +250,13 @@ public class CacheAwareLoadBalancer extends
StochasticLoadBalancer {
+ "Throttling move for {}ms.",
plan.getRegionInfo().getEncodedName(), plan.getDestination(),
sleepTime);
}
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+ synchronized (this) {
+ try {
+ // Release the monitor while waiting to avoid blocking other threads. NOTE(review): wait() drops all reentrant holds, so a caller's synchronized (balancer) section (e.g. HMaster's balance block, presumably via executeRegionPlansWithThrottling) is released too and another balance run could enter - confirm.
+ wait(sleepTime); // NOTE(review): may return early on notify() or a spurious wakeup; the remaining delay is not re-checked
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e); // NOTE(review): interrupt status is not restored before rethrowing
+ }
}
}
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java
index 335a719a1f9..485ae754443 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java
@@ -33,6 +33,10 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
@@ -599,4 +603,151 @@ public class TestCacheAwareLoadBalancer extends
BalancerTestBase {
fail("NPE should not be thrown");
}
}
+
+ /**
+ * This test verifies that when loadConf/onConfigurationChange is called on a
+ * CacheAwareLoadBalancer while a balance run is in progress, the
configuration update: 1. Does
+ * not block (returns quickly without waiting for balancing to finish) 2.
Does not affect the
+ * ongoing balance run (the configuration used during balancing remains the
old one) 3. Is applied
+ * correctly after the balance run completes
+ */
+ @Test(timeout = 60000)
+ public void testConfigUpdateDuringBalance() throws Exception {
+ Float expectedOldRatioThreshold = 0.8f;
+ Float expectedNewRatioThreshold = 0.95f;
+ long throttleTimeMs = 10000;
+
+ // Actual old ratio threshold used during balance
+ float[] actualOldRatioThresholdDuringBalance = new float[1];
+
+ Configuration conf = HBaseConfiguration.create();
+ conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY,
"prefetch_file_list");
+ conf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, throttleTimeMs);
+ conf.setFloat(CacheAwareLoadBalancer.CACHE_RATIO_THRESHOLD,
expectedOldRatioThreshold);
+
+ CacheAwareLoadBalancer balancer = new CacheAwareLoadBalancer();
+ MasterServices services = mock(MasterServices.class);
+ when(services.getConfiguration()).thenReturn(conf);
+ balancer.setMasterServices(services);
+ balancer.loadConf(conf);
+ balancer.initialize();
+
+ Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
+ ServerName server0 = servers.get(0);
+ ServerName server1 = servers.get(1);
+ ServerName server2 = servers.get(2);
+
+ // Setup cluster: all 3 regions on server0 (unbalanced)
+ List<RegionInfo> regionsOnServer0 = randomRegions(3);
+ List<RegionInfo> regionsOnServer1 = randomRegions(0);
+ List<RegionInfo> regionsOnServer2 = randomRegions(0);
+
+ clusterState.put(server0, regionsOnServer0);
+ clusterState.put(server1, regionsOnServer1);
+ clusterState.put(server2, regionsOnServer2);
+
+ // Mock metrics: NO cache info for any region = all will be throttled
+ Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
+ serverMetricsMap.put(server0,
mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
+ 0.0f, new ArrayList<>(), 0, 10));
+ serverMetricsMap.put(server1,
mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
+ 0.0f, new ArrayList<>(), 0, 10));
+ serverMetricsMap.put(server2,
mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
+ 0.0f, new ArrayList<>(), 0, 10));
+
+ ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
+ when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
+ balancer.updateClusterMetrics(clusterMetrics);
+
+ final Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable =
+ (Map) mockClusterServersWithTables(clusterState);
+
+ // Verify initial configuration
+ assertEquals(expectedOldRatioThreshold, balancer.ratioThreshold, 0.001f);
+
+ CountDownLatch balanceStarted = new CountDownLatch(1);
+ CountDownLatch updateConfigInitiated = new CountDownLatch(1);
+
+ long[] configUpdateDuration = new long[1];
+ long[] balanceDuration = new long[1];
+
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+
+ try {
+ // Thread 1: Simulate a flow similar to HMaster.balance(), which holds
synchronized(balancer) for
+ // the duration of the balance run
+ Future<Long> balanceFuture = executor.submit(() -> {
+ try {
+ long start = EnvironmentEdgeManager.currentTime();
+ synchronized (balancer) {
+ try {
+ // Simulate the beginning of HMaster.balance(): mark the balancing window
open
+ balancer.onBalancingStart();
+ balanceStarted.countDown();
+ List<RegionPlan> plans = balancer.balanceCluster(loadOfAllTable);
+
+ LOG.info("Balance generated {} plans, executing with throttling",
+ plans != null ? plans.size() : 0);
+
+ if (plans != null) {
+ for (int i = 0; i < plans.size(); i++) {
+ RegionPlan plan = plans.get(i);
+ balancer.throttle(plan);
+ }
+ }
+ // Wait until config update is initiated while balance is still
in progress
+ updateConfigInitiated.await();
+
+ // Old config should still be visible during current balance run
+ actualOldRatioThresholdDuringBalance[0] =
balancer.ratioThreshold;
+ } finally {
+ balancer.onBalancingComplete();
+ }
+ }
+ return EnvironmentEdgeManager.currentTime() - start;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ // Thread 2: Simulate update_all_config / onConfigurationChange
+ Future<Long> configUpdateFuture = executor.submit(() -> {
+ try {
+ // Wait for balance to start
+ balanceStarted.await();
+ long startTime = EnvironmentEdgeManager.currentTime();
+
+ // Call onConfigurationChange - should NOT hang
+ Configuration newConf = HBaseConfiguration.create();
+ newConf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY,
"prefetch_file_list");
+ newConf.setLong(CacheAwareLoadBalancer.MOVE_THROTTLING, 10000);
+ newConf.setFloat(CacheAwareLoadBalancer.CACHE_RATIO_THRESHOLD,
expectedNewRatioThreshold);
+ balancer.onConfigurationChange(newConf);
+ updateConfigInitiated.countDown();
+
+ return EnvironmentEdgeManager.currentTime() - startTime;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ // Wait for both threads to complete
+ configUpdateDuration[0] = configUpdateFuture.get();
+ balanceDuration[0] = balanceFuture.get();
+ System.out.println("Balance duration (ms): " + balanceDuration[0]); // NOTE(review): prefer LOG.info, as used above, over System.out
+ System.out.println("Config update duration (ms): " +
configUpdateDuration[0]);
+
+ // Verify that the ratio threshold used during balance is still the old one
+ assertEquals(expectedOldRatioThreshold,
actualOldRatioThresholdDuringBalance[0], 0.001f);
+
+ // Verify that the new config was applied after the balance completed
+ assertEquals(expectedNewRatioThreshold, balancer.ratioThreshold, 0.001f);
+
+ // Verify that config update didn't hang/timeout waiting for balance (NOTE(review): wall-clock comparison; it only has a wide margin when the balance thread actually throttles moves)
+ assertTrue(configUpdateDuration[0] < balanceDuration[0]);
+
+ } finally {
+ executor.shutdownNow();
+ }
+ }
}