This is an automated email from the ASF dual-hosted git repository.
somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7bd61f05a7 SegmentRelocator should skip rebalancing tables that
haven't completed rebalance since the last relocator run (#15681)
7bd61f05a7 is described below
commit 7bd61f05a710391d899484752b46117266736168
Author: Sonam Mandal <[email protected]>
AuthorDate: Wed Apr 30 13:34:41 2025 -0700
SegmentRelocator should skip rebalancing tables that haven't completed
rebalance since the last relocator run (#15681)
* SegmentRelocator should skip rebalancing tables that haven't completed
rebalance since the last relocator run
---
.../helix/core/relocation/SegmentRelocator.java | 26 ++++++++++++++++++++--
1 file changed, 24 insertions(+), 2 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
index 9bc8015d1c..0b4ad7d7ba 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
+import javax.annotation.Nullable;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.Criteria;
@@ -80,6 +81,7 @@ public class SegmentRelocator extends
ControllerPeriodicTask<Void> {
private final Set<String> _waitingTables;
private final BlockingQueue<String> _waitingQueue;
+ @Nullable private final Set<String> _tablesUndergoingRebalance;
public SegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager,
LeadControllerManager leadControllerManager, ControllerConf config,
ControllerMetrics controllerMetrics,
@@ -120,17 +122,24 @@ public class SegmentRelocator extends
ControllerPeriodicTask<Void> {
LOGGER.warn("Got interrupted while rebalancing tables sequentially",
e);
}
});
+ _tablesUndergoingRebalance = null;
} else {
_waitingTables = null;
_waitingQueue = null;
+ _tablesUndergoingRebalance = ConcurrentHashMap.newKeySet();
}
}
@Override
protected void processTable(String tableNameWithType) {
if (_waitingTables == null) {
- LOGGER.debug("Rebalance table: {} immediately", tableNameWithType);
- _executorService.submit(() -> rebalanceTable(tableNameWithType));
+ assert _tablesUndergoingRebalance != null;
+ if (!_tablesUndergoingRebalance.contains(tableNameWithType)) {
+ LOGGER.debug("Rebalance table: {} immediately", tableNameWithType);
+ _executorService.submit(() -> rebalanceTable(tableNameWithType));
+ } else {
+ LOGGER.info("The previous rebalance has not yet completed, skip
rebalancing table {}", tableNameWithType);
+ }
return;
}
putTableToWait(tableNameWithType);
@@ -195,6 +204,14 @@ public class SegmentRelocator extends
ControllerPeriodicTask<Void> {
rebalanceConfig.setIncludeConsuming(_includeConsuming);
rebalanceConfig.setMinimizeDataMovement(_minimizeDataMovement);
+ if (_tablesUndergoingRebalance != null) {
+ LOGGER.debug("Start rebalancing table: {}, adding to
tablesUndergoingRebalance", tableNameWithType);
+ if (!_tablesUndergoingRebalance.add(tableNameWithType)) {
+ LOGGER.warn("Skip rebalancing table: {}, table already exists in
tablesUndergoingRebalance, a rebalance "
+ + "must have already been started", tableNameWithType);
+ return;
+ }
+ }
try {
// Relocating segments to new tiers needs two sequential actions: table
rebalance and local tier migration.
// Table rebalance moves segments to the new ideal servers, which can
change for a segment when its target
@@ -219,6 +236,11 @@ public class SegmentRelocator extends
ControllerPeriodicTask<Void> {
}
} catch (Throwable t) {
LOGGER.error("Caught exception/error while rebalancing table: {}",
tableNameWithType, t);
+ } finally {
+ if (_tablesUndergoingRebalance != null) {
+ LOGGER.debug("Done rebalancing table: {}, removing from
tablesUndergoingRebalance", tableNameWithType);
+ _tablesUndergoingRebalance.remove(tableNameWithType);
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org