This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 6197c71 Fix CustomRebalancer's assignment computation (#477)
6197c71 is described below
commit 6197c717cc4dfeca2ba16f29750620d0e21f90d7
Author: Hunter Lee <[email protected]>
AuthorDate: Mon Sep 16 14:40:20 2019 -0700
Fix CustomRebalancer's assignment computation (#477)
It was observed that CustomRebalancer would sometimes leave out an instance
entirely if the instance was disabled or the partition on the instance was still
bootstrapping (current state is null). This would prevent the cluster from
converging. This diff fixes the problem by 1) still including an assignment from
IdealState even when the current state is null (possibly due to a pending state
transition) and 2) putting disabled partitions in InitialState.
Changelist:
1. Fix the issue
2. Add a test: TestCustomRebalancer
---
.../controller/rebalancer/CustomRebalancer.java | 17 +++--
.../rebalancer/TestCustomRebalancer.java | 81 ++++++++++++++++++++++
2 files changed, 92 insertions(+), 6 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index 273786b..3cf90c1 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -130,14 +130,19 @@ public class CustomRebalancer extends
AbstractRebalancer<ResourceControllerDataP
Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
for (String instance : idealStateMap.keySet()) {
- boolean notInErrorState =
- currentStateMap == null || currentStateMap.get(instance) == null
- ||
!currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString());
-
+ boolean notInErrorState = currentStateMap != null
+ &&
!HelixDefinedState.ERROR.toString().equals(currentStateMap.get(instance));
boolean enabled = !disabledInstancesForPartition.contains(instance) &&
isResourceEnabled;
- if (liveInstancesMap.containsKey(instance) && notInErrorState &&
enabled) {
- instanceStateMap.put(instance, idealStateMap.get(instance));
+ // Note: if instance is not live, the mapping for that instance will not
show up in
+ // BestPossibleMapping (and ExternalView)
+ if (liveInstancesMap.containsKey(instance) && notInErrorState) {
+ if (enabled) {
+ instanceStateMap.put(instance, idealStateMap.get(instance));
+ } else {
+ // if disabled, put it in initial state
+ instanceStateMap.put(instance, stateModelDef.getInitialState());
+ }
}
}
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomRebalancer.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomRebalancer.java
new file mode 100644
index 0000000..57f8e80
--- /dev/null
+++
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomRebalancer.java
@@ -0,0 +1,81 @@
+package org.apache.helix.integration.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.CustomRebalancer;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.OnlineOfflineSMD;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+
+public class TestCustomRebalancer {
+
+ /**
+ * Regression test for an edge case in which an instance becomes disabled while one of its
+ * partitions is still bootstrapping by way of pending messages (no current state reported yet).
+ * Previously such an instance was never added to instanceStateMap, so the newly bootstrapped
+ * partition received no further state transitions and the cluster could not converge.
+ * The test maps one partition to a live but disabled instance in the IdealState, supplies an
+ * empty CurrentStateOutput, and asserts that the computed best possible mapping still contains
+ * the instance, placed in the state model's initial state.
+ */
+ @Test
+ public void testDisabledBootstrappingPartitions() {
+ String resourceName = "Test";
+ String partitionName = "Test0";
+ String instanceName = "localhost";
+ String stateModelName = "OnlineOffline";
+ StateModelDefinition stateModelDef = new OnlineOfflineSMD();
+
+ IdealState idealState = new IdealState(resourceName);
+ idealState.setStateModelDefRef(stateModelName);
+ idealState.setPartitionState(partitionName, instanceName, "ONLINE");
+
+ Resource resource = new Resource(resourceName);
+ resource.addPartition(partitionName);
+
+ CustomRebalancer customRebalancer = new CustomRebalancer();
+ ResourceControllerDataProvider cache =
mock(ResourceControllerDataProvider.class);
+ when(cache.getStateModelDef(stateModelName)).thenReturn(stateModelDef);
+ when(cache.getDisabledInstancesForPartition(resource.getResourceName(),
partitionName))
+ .thenReturn(ImmutableSet.of(instanceName));
+ when(cache.getLiveInstances())
+ .thenReturn(ImmutableMap.of(instanceName, new
LiveInstance(instanceName)));
+
+ CurrentStateOutput currOutput = new CurrentStateOutput();
+ ResourceAssignment resourceAssignment =
+ customRebalancer.computeBestPossiblePartitionState(cache, idealState,
resource, currOutput);
+
+ Assert.assertEquals(
+ resourceAssignment.getReplicaMap(new
Partition(partitionName)).get(instanceName),
+ stateModelDef.getInitialState());
+ }
+}