This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch abnormalResolver
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 5b4d480410adf539d4d43d8014676f501dcfc49d
Author: Jiajun Wang <[email protected]>
AuthorDate: Thu Jun 4 11:50:23 2020 -0700

    Add ExcessiveTopStateResolver to gracefully fix the double-masters 
situation. (#1037)
    
    Although the rebalancer will fix the additional master eventually, the 
default operations are arbitrary and it may cause an older master to survive. 
This may cause serious application logic issues since many applications require 
the master to have the latest data.
    With this state resolver, the rebalancer will change the default behavior 
to reset all the master replicas so as to ensure the remaining one is the 
youngest one. Then the double-masters situation is gracefully resolved.
---
 .../constraint/AbnormalStateResolver.java          |   4 +-
 .../controller/rebalancer/AbstractRebalancer.java  |   2 +-
 .../constraint/ExcessiveTopStateResolver.java      | 123 +++++++++++++++++++++
 .../constraint/MockAbnormalStateResolver.java      |   2 +-
 .../rebalancer/TestAbnormalStatesResolver.java     | 117 ++++++++++++++++++++
 5 files changed, 244 insertions(+), 4 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
 
b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
index 7e9946c..1a75e0b 100644
--- 
a/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
+++ 
b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
@@ -35,7 +35,7 @@ public interface AbnormalStateResolver {
    * This is a dummy class that does not really functional.
    */
   AbnormalStateResolver DUMMY_STATE_RESOLVER = new AbnormalStateResolver() {
-    public boolean isCurrentStatesValid(final CurrentStateOutput 
currentStateOutput,
+    public boolean checkCurrentStates(final CurrentStateOutput 
currentStateOutput,
         final String resourceName, final Partition partition,
         final StateModelDefinition stateModelDef) {
       // By default, all current states are valid.
@@ -56,7 +56,7 @@ public interface AbnormalStateResolver {
    * @param stateModelDef
    * @return true if the current states of the specified partition is valid.
    */
-  boolean isCurrentStatesValid(final CurrentStateOutput currentStateOutput,
+  boolean checkCurrentStates(final CurrentStateOutput currentStateOutput,
       final String resourceName, final Partition partition,
       final StateModelDefinition stateModelDef);
 
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
index 6fdd0b6..a1ca6ec 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
@@ -245,7 +245,7 @@ public abstract class AbstractRebalancer<T extends 
BaseControllerDataProvider> i
     }
 
     // (3) If the current states are not valid, fix the invalid part first.
-    if (!resolver.isCurrentStatesValid(currentStateOutput, resourceName, 
partition, stateModelDef)) {
+    if (!resolver.checkCurrentStates(currentStateOutput, resourceName, 
partition, stateModelDef)) {
       Map<String, String> recoveryAssignment = resolver
           .computeRecoveryAssignment(currentStateOutput, resourceName, 
partition, stateModelDef,
               preferenceList);
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/ExcessiveTopStateResolver.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/ExcessiveTopStateResolver.java
new file mode 100644
index 0000000..3c1e100
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/ExcessiveTopStateResolver.java
@@ -0,0 +1,123 @@
+package org.apache.helix.controller.rebalancer.constraint;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.StateModelDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The abnormal state resolver that gracefully fixes the abnormality of 
excessive top states for
+ * single-topstate state model. For example, two replicas of a MasterSlave 
partition are assigned
+ * with the Master state at the same time. This could be caused by a network 
partition or
+ * other unexpected issues.
+ *
+ * The resolver checks for the abnormality and computes a recovery assignment 
which triggers the
+ * rebalancer to eventually reset all the top state replicas once. After 
the resets, only one
+ * replica will be assigned the top state.
+ *
+ * Note that without using this resolver, the regular Helix rebalance pipeline 
also removes the
+ * excessive top state replicas. However, the default logic does not force 
resetting ALL the top
+ * state replicas. Since the multiple top states situation may break 
application data, the default
+ * resolution won't be enough to fix the potential problem.
+ */
+public class ExcessiveTopStateResolver implements AbnormalStateResolver {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExcessiveTopStateResolver.class);
+
+  /**
+   * The current states are not valid if there is more than one top state 
replica for a single top
+   * state state model.
+   */
+  @Override
+  public boolean checkCurrentStates(final CurrentStateOutput 
currentStateOutput,
+      final String resourceName, final Partition partition, 
StateModelDefinition stateModelDef) {
+    if (!stateModelDef.isSingleTopStateModel()) {
+      return true;
+    }
+    // TODO: Cache top state count in the ResourceControllerDataProvider and 
avoid repeated counting
+    // TODO: here. It would be premature to do it now. But with more use cases, 
we can improve the
+    // TODO: ResourceControllerDataProvider to calculate by default.
+    if (currentStateOutput.getCurrentStateMap(resourceName, 
partition).values().stream()
+        .filter(state -> state.equals(stateModelDef.getTopState())).count() > 
1) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public Map<String, String> computeRecoveryAssignment(final 
CurrentStateOutput currentStateOutput,
+      final String resourceName, final Partition partition, 
StateModelDefinition stateModelDef,
+      List<String> preferenceList) {
+    Map<String, String> currentStateMap =
+        currentStateOutput.getCurrentStateMap(resourceName, partition);
+    if (checkCurrentStates(currentStateOutput, resourceName, partition, 
stateModelDef)) {
+      // This method should not be triggered when the mapping is valid.
+      // Log the warning for debug purposes.
+      LOG.warn("The input current state map {} is valid, return the original 
current state.",
+          currentStateMap);
+      return currentStateMap;
+    }
+
+    Map<String, String> recoverMap = new HashMap<>(currentStateMap);
+    String recoveryState = stateModelDef
+        .getNextStateForTransition(stateModelDef.getTopState(), 
stateModelDef.getInitialState());
+
+    // 1. We have to reset the expected top state replica host if it is 
hosting the top state
+    // replica. Otherwise, the old master replica with the possible stale data 
will never be reset
+    // there.
+    if (preferenceList != null && !preferenceList.isEmpty()) {
+      String expectedTopStateHost = preferenceList.get(0);
+      if 
(recoverMap.get(expectedTopStateHost).equals(stateModelDef.getTopState())) {
+        recoverMap.put(expectedTopStateHost, recoveryState);
+      }
+    }
+
+    // 2. To minimize the impact of the resolution, we want to reserve one top 
state replica even
+    // during the recovery process.
+    boolean hasReservedTopState = false;
+    for (String instance : recoverMap.keySet()) {
+      if (recoverMap.get(instance).equals(stateModelDef.getTopState())) {
+        if (hasReservedTopState) {
+          recoverMap.put(instance, recoveryState);
+        } else {
+          hasReservedTopState = true;
+        }
+      }
+    }
+    // Here's what we expect to happen next:
+    // 1. The ideal partition assignment is changed to the proposed recovery 
state. Then the current
+    // rebalance pipeline proceeds. State transition messages will be sent 
accordingly.
+    // 2. When the next rebalance pipeline starts, the new current state may 
still contain
+    // abnormality if the participants have not finished state transition yet. 
Then the resolver
+    // continues to fix the states with the same logic.
+    // 3. Otherwise, if the new current state contains only one top state 
replica, then we will hand
+    // it over to the regular rebalancer logic. The rebalancer will trigger 
the state transition to
+    // bring the top state back in the expected allocation.
+    // And the masters with potential stale data will be all reset by then.
+    return recoverMap;
+  }
+}
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java
index 4718921..00a2f68 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java
@@ -33,7 +33,7 @@ import org.apache.helix.model.StateModelDefinition;
  */
 public class MockAbnormalStateResolver implements AbnormalStateResolver {
   @Override
-  public boolean isCurrentStatesValid(final CurrentStateOutput 
currentStateOutput,
+  public boolean checkCurrentStates(final CurrentStateOutput 
currentStateOutput,
       final String resourceName, final Partition partition,
       final StateModelDefinition stateModelDef) {
     // By default, all current states are valid.
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
index dde2644..e10645f 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
@@ -19,16 +19,33 @@ package org.apache.helix.integration.rebalancer;
  * under the License.
  */
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.helix.ConfigAccessor;
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
 import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import 
org.apache.helix.controller.rebalancer.constraint.ExcessiveTopStateResolver;
 import 
org.apache.helix.controller.rebalancer.constraint.MockAbnormalStateResolver;
 import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.messaging.AsyncCallback;
 import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.model.Message;
+import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -64,4 +81,104 @@ public class TestAbnormalStatesResolver extends 
ZkStandAloneCMTestBase {
     clusterConfig.setAbnormalStateResolverMap(Collections.emptyMap());
     configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
   }
+
+  @Test(dependsOnMethods = "testConfigureResolver")
+  public void testExcessiveTopStateResolver() {
+    BestPossibleExternalViewVerifier verifier =
+        new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build();
+    Assert.assertTrue(verifier.verify());
+
+    // 1. Find a partition with a MASTER replica and a SLAVE replica
+    HelixAdmin admin = new 
ZKHelixAdmin.Builder().setZkAddress(ZK_ADDR).build();
+    ExternalView ev = admin.getResourceExternalView(CLUSTER_NAME, TEST_DB);
+    String targetPartition = ev.getPartitionSet().iterator().next();
+    Map<String, String> partitionAssignment = ev.getStateMap(targetPartition);
+    String slaveHost =
+        partitionAssignment.entrySet().stream().filter(entry -> 
entry.getValue().equals("SLAVE"))
+            .findAny().get().getKey();
+    long previousMasterUpdateTime = getTopStateUpdateTime(ev, targetPartition, 
"MASTER");
+
+    // Build SLAVE to MASTER message
+    String msgId = new UUID(123, 456).toString();
+    Message msg =
+        createMessage(Message.MessageType.STATE_TRANSITION, msgId, "SLAVE", 
"MASTER", TEST_DB,
+            slaveHost);
+    msg.setStateModelDef(MasterSlaveSMD.name);
+
+    Criteria cr = new Criteria();
+    cr.setInstanceName(slaveHost);
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(true);
+    cr.setPartition(targetPartition);
+    cr.setResource(TEST_DB);
+    cr.setClusterName(CLUSTER_NAME);
+
+    AsyncCallback callback = new AsyncCallback() {
+      @Override
+      public void onTimeOut() {
+        Assert.fail("The test state transition timeout.");
+      }
+
+      @Override
+      public void onReplyMessage(Message message) {
+        Assert.assertEquals(message.getMsgState(), Message.MessageState.READ);
+      }
+    };
+
+    // 2. Send the SLAVE to MASTER message to the SLAVE host to make abnormal 
partition states.
+
+    // 2.A. Without resolver, the fixing is not completely done by the default 
rebalancer logic.
+    _controller.getMessagingService()
+        .sendAndWait(cr, msg, callback, (int) TestHelper.WAIT_DURATION);
+    // Wait until the partition status is fixed, verify if the result is as 
expected
+    verifier =
+        new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build();
+    Assert.assertTrue(verifier.verify());
+    ev = admin.getResourceExternalView(CLUSTER_NAME, TEST_DB);
+    Assert.assertEquals(
+        ev.getStateMap(targetPartition).values().stream().filter(state -> 
state.equals("MASTER"))
+            .count(), 1);
+    // Since the resolver is not used in the auto default fix process, there 
is no update on the
+    // original master. So if there is any data issue, it was not fixed.
+    long currentMasterUpdateTime = getTopStateUpdateTime(ev, targetPartition, 
"MASTER");
+    Assert.assertFalse(currentMasterUpdateTime > previousMasterUpdateTime);
+
+    // 2.B. with resolver configured, the fixing is complete.
+    ConfigAccessor configAccessor = new 
ConfigAccessor.Builder().setZkAddress(ZK_ADDR).build();
+    ClusterConfig clusterConfig = 
configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setAbnormalStateResolverMap(
+        ImmutableMap.of(MasterSlaveSMD.name, 
ExcessiveTopStateResolver.class.getName()));
+    configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+    _controller.getMessagingService()
+        .sendAndWait(cr, msg, callback, (int) TestHelper.WAIT_DURATION);
+    // Wait until the partition status is fixed, verify if the result is as 
expected
+    Assert.assertTrue(verifier.verify());
+    ev = admin.getResourceExternalView(CLUSTER_NAME, TEST_DB);
+    Assert.assertEquals(
+        ev.getStateMap(targetPartition).values().stream().filter(state -> 
state.equals("MASTER"))
+            .count(), 1);
+    // Now that the resolver is used in the auto fix process, the original master 
has also been refreshed.
+    // The potential data issue has been fixed in this process.
+    currentMasterUpdateTime = getTopStateUpdateTime(ev, targetPartition, 
"MASTER");
+    Assert.assertTrue(currentMasterUpdateTime > previousMasterUpdateTime);
+
+    // Reset the resolver map
+    clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setAbnormalStateResolverMap(Collections.emptyMap());
+    configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+  }
+
+  private long getTopStateUpdateTime(ExternalView ev, String partition, String 
state) {
+    String topStateHost = ev.getStateMap(partition).entrySet().stream()
+        .filter(entry -> 
entry.getValue().equals(state)).findFirst().get().getKey();
+    MockParticipantManager participant = Arrays.stream(_participants)
+        .filter(instance -> 
instance.getInstanceName().equals(topStateHost)).findFirst().get();
+
+    HelixDataAccessor accessor = _controller.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    CurrentState currentState = accessor.getProperty(keyBuilder
+        .currentState(participant.getInstanceName(), 
participant.getSessionId(),
+            ev.getResourceName()));
+    return currentState.getEndTime(partition);
+  }
 }

Reply via email to