This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7b89d1b4a83 [Bug fix] Fault-Domain-Aware Instance Assignment failing rebalance with minimize data movement (#17799)
7b89d1b4a83 is described below
commit 7b89d1b4a83b10f96fd940fca549c97acc16e7a4
Author: Jhow <[email protected]>
AuthorDate: Wed Mar 4 08:44:32 2026 -0800
[Bug fix] Fault-Domain-Aware Instance Assignment failing rebalance with minimize data movement (#17799)
---
.../instance/FDAwareInstancePartitionSelector.java | 8 ++-
.../instance/InstanceAssignmentTest.java | 61 ++++++++++++++++++++++
2 files changed, 68 insertions(+), 1 deletion(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
index 89d64272e3d..10db8702f39 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/FDAwareInstancePartitionSelector.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
+import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
@@ -190,7 +191,7 @@ public class FDAwareInstancePartitionSelector extends
InstancePartitionSelector
// preprocess the problem of numReplicaGroups >= numFaultDomains to a
problem
replicaGroupBasedAssignmentState.normalize(faultDomainToCandidateInstancesMap);
- // fill the remaining vacant seats
+ // fill the remaining vacant seats if any
replicaGroupBasedAssignmentState.fill(faultDomainToCandidateInstancesMap);
// adjust the instance assignment to achieve the invariant state
@@ -389,6 +390,11 @@ public class FDAwareInstancePartitionSelector extends
InstancePartitionSelector
* Fill the vacant instances
*/
public void fill(Map<Integer, LinkedHashSet<String>>
faultDomainToCandidateInstancesMap) {
+ // skip filling if there is no candidate instance, which can happen when minimize data movement is enabled and
+ // no new instances are added to any pool
+ if (faultDomainToCandidateInstancesMap.values().stream().allMatch(Set::isEmpty)) {
+ return;
+ }
// convert set to que and start to assign
CandidateQueue candidateQueue = new
CandidateQueue(faultDomainToCandidateInstancesMap);
if (_numReplicaGroups != 0) { // uplift instance per replica group first
if not a fresh new assignment
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index 74badfb7fcd..62a1af26bd6 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -3330,4 +3330,65 @@ public class InstanceAssignmentTest {
SERVER_INSTANCE_ID_PREFIX + "11" + SERVER_INSTANCE_POOL_PREFIX + 1,
SERVER_INSTANCE_ID_PREFIX + "17" + SERVER_INSTANCE_POOL_PREFIX +
2));
}
+
+ @Test
+ public void testPoolBasedFDAwareSteadyStateMinimizeDataMovement() {
+ // Test that a rebalance with minimizeDataMovement=true and no instance changes does not throw
+ // NoSuchElementException. This is a regression test for the case where all candidate instances are empty
+ // after preprocessing (no new instances added to any pool).
+
+ // 21 instances in 5 pools, with [5,4,4,4,4] instances in each pool
+ int numInstances = 21;
+ int numPools = 5;
+ int numReplicaGroups = 3;
+ int numInstancesPerReplicaGroup = numInstances / numReplicaGroups;
+ List<InstanceConfig> instanceConfigs = new ArrayList<>(numInstances);
+ for (int i = 0; i < numInstances; i++) {
+ int pool = i % numPools;
+ InstanceConfig instanceConfig =
+ new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i +
SERVER_INSTANCE_POOL_PREFIX + pool);
+ instanceConfig.addTag(OFFLINE_TAG);
+ instanceConfig.getRecord()
+ .setMapField(InstanceUtils.POOL_KEY,
Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool)));
+ instanceConfigs.add(instanceConfig);
+ }
+
+ // Initial assignment (no minimize data movement, no existing partitions)
+ InstanceTagPoolConfig tagPoolConfig = new
InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
+ InstanceReplicaGroupPartitionConfig replicaPartitionConfig =
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups,
numInstancesPerReplicaGroup, 0, 0, false,
+ null);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+ new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig,
+
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
false)))
+ .build();
+ InstanceAssignmentDriver driver = new
InstanceAssignmentDriver(tableConfig);
+ InstancePartitions initialPartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs, null);
+ assertEquals(initialPartitions.getNumReplicaGroups(), numReplicaGroups);
+ assertEquals(initialPartitions.getNumPartitions(), 1);
+
+ // Now re-run with the same instances and minimizeDataMovement=true, passing existing partitions.
+ // Before the fix in #17799, this would throw NoSuchElementException because CandidateQueue was created with an
+ // empty map (all existing instances were removed from candidates during preprocessing, leaving empty sets).
+ replicaPartitionConfig =
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups,
numInstancesPerReplicaGroup, 0, 0, true,
+ null);
+ tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+ new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig,
+
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString(),
true)))
+ .build();
+ driver = new InstanceAssignmentDriver(tableConfig);
+ InstancePartitions steadyStatePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs, initialPartitions);
+
+ // Assignment should be unchanged
+ assertEquals(steadyStatePartitions.getNumReplicaGroups(),
numReplicaGroups);
+ assertEquals(steadyStatePartitions.getNumPartitions(), 1);
+ for (int rg = 0; rg < numReplicaGroups; rg++) {
+ assertEquals(steadyStatePartitions.getInstances(0, rg),
initialPartitions.getInstances(0, rg));
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]