This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit b0f61507915f5052f53854ddbefcacac86f9c8d4
Author: Junkai Xue <[email protected]>
AuthorDate: Fri Feb 1 13:23:35 2019 -0800

    Add ERROR mapping for displaying in BestPossible and Intermediate assignment.
    
    Adding this ERROR replica mapping should not break any rules of
assignment. It serves purely to show the host targeted by the ERROR partition.
---
 .../rebalancer/DelayedAutoRebalancer.java          |  21 ++-
 .../java/org/apache/helix/common/ZkTestBase.java   |   9 ++
 .../helix/integration/TestErrorReplicaPersist.java | 151 +++++++++++++++++++++
 .../TestDelayedAutoRebalancer.OnlineOffline.json   |   2 +-
 4 files changed, 178 insertions(+), 5 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index 7dd9128..62ad37f 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -529,24 +529,37 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer<ResourceController
     // we should drop all partitions from previous assigned instances.
     if 
(!currentMapWithPreferenceList.values().contains(HelixDefinedState.ERROR.name())
         && bestPossibleStateMap.size() > numReplicas && 
readyToDrop(currentStateMap,
-        bestPossibleStateMap, numReplicas, combinedPreferenceList)) {
+        bestPossibleStateMap, preferenceList, combinedPreferenceList)) {
       for (int i = 0; i < combinedPreferenceList.size() - numReplicas; i++) {
         String instanceToDrop = 
combinedPreferenceList.get(combinedPreferenceList.size() - i - 1);
         bestPossibleStateMap.put(instanceToDrop, 
HelixDefinedState.DROPPED.name());
       }
     }
 
+    // Adding ERROR replica mapping to best possible
+    // ERROR assignment should be mutual excluded from DROPPED assignment 
because
+    // once there is an ERROR replica in the mapping, 
bestPossibleStateMap.size() > numReplicas prevents
+    // code entering the DROPPING stage.
+    for (String instance : combinedPreferenceList) {
+      if (currentStateMap.containsKey(instance) && 
currentStateMap.get(instance)
+          .equals(HelixDefinedState.ERROR.name())) {
+        bestPossibleStateMap.put(instance, HelixDefinedState.ERROR.name());
+      }
+    }
+
     return bestPossibleStateMap;
   }
 
   private boolean readyToDrop(Map<String, String> currentStateMap,
-      Map<String, String> bestPossibleMap, int numReplicas, List<String> 
combinedPreferenceList) {
+      Map<String, String> bestPossibleMap, List<String> preferenceList,
+      List<String> combinedPreferenceList) {
     if (currentStateMap.size() != bestPossibleMap.size()) {
       return false;
     }
+    Set<String> tmpPreferenceSet = new HashSet<>(preferenceList);
+    tmpPreferenceSet.retainAll(combinedPreferenceList);
 
-    for (int i = 0; i < numReplicas; i++) {
-      String instance = combinedPreferenceList.get(i);
+    for (String instance : tmpPreferenceSet) {
       if (!currentStateMap.containsKey(instance) || 
!currentStateMap.get(instance)
           .equals(bestPossibleMap.get(instance))) {
         return false;
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java 
b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index 700d204..721cd33 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -244,6 +244,15 @@ public class ZkTestBase {
     configAccessor.setInstanceConfig(clusterName, instanceName, 
instanceConfig);
   }
 
+  protected void enableDelayRebalanceInCluster(HelixZkClient zkClient, String 
clusterName,
+      boolean enabled, long delay) {
+    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+    clusterConfig.setDelayRebalaceEnabled(enabled);
+    clusterConfig.setRebalanceDelayTime(delay);
+    configAccessor.setClusterConfig(clusterName, clusterConfig);
+  }
+
   protected void enableP2PInCluster(String clusterName, ConfigAccessor 
configAccessor,
       boolean enable) {
     // enable p2p message in cluster.
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestErrorReplicaPersist.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestErrorReplicaPersist.java
new file mode 100644
index 0000000..126175a
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestErrorReplicaPersist.java
@@ -0,0 +1,151 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+import org.apache.helix.HelixRollbackException;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
+import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.rebalancer.TestAutoRebalance;
+import org.apache.helix.mock.participant.MockDelayMSStateModel;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.helix.tools.ClusterStateVerifier;
+import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestErrorReplicaPersist extends ZkStandAloneCMTestBase {
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    System.out.println("START " + CLASS_NAME + " at " + new 
Date(System.currentTimeMillis()));
+
+    int numNode = NODE_NR + 1;
+    _participants = new MockParticipantManager[numNode];
+    // setup storage cluster
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+    createResourceWithDelayedRebalance(CLUSTER_NAME, TEST_DB, 
MasterSlaveSMD.name, _PARTITIONS,
+        _replica, _replica - 1, 1800000, 
CrushEdRebalanceStrategy.class.getName());
+    for (int i = 0; i < numNode; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, _replica);
+
+    // start dummy participants
+    for (int i = 0; i < numNode; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      participant.syncStart();
+      _participants[i] = participant;
+    }
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+    enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true, 1800000);
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, 
controllerName);
+    _controller.syncStart();
+
+    boolean result = ClusterStateVerifier.verifyByZkCallback(
+        new TestAutoRebalance.ExternalViewBalancedVerifier(_gZkClient, 
CLUSTER_NAME, TEST_DB));
+
+    Assert.assertTrue(result);
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    for (MockParticipantManager participant : _participants) {
+      participant.syncStop();
+    }
+    super.afterClass();
+  }
+
+  @Test
+  public void testErrorReplicaPersist() throws InterruptedException {
+    for (int i = 0; i < (NODE_NR + 1) / 2; i++) {
+      _participants[i].syncStop();
+      Thread.sleep(2000);
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      StateMachineEngine stateMachineEngine = 
participant.getStateMachineEngine();
+      stateMachineEngine
+          .registerStateModelFactory(MasterSlaveSMD.name, new 
MockFailedMSStateModelFactory());
+      participant.syncStart();
+      _participants[i] = participant;
+    }
+    HelixClusterVerifier verifier =
+        new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build();
+    Assert.assertTrue(((BestPossibleExternalViewVerifier) 
verifier).verifyByPolling());
+    for (int i = 0; i < (NODE_NR + 1) / 2; i++) {
+      _gSetupTool.getClusterManagementTool()
+          .enableInstance(CLUSTER_NAME, _participants[i].getInstanceName(), 
false);
+    }
+
+    Assert.assertTrue(((BestPossibleExternalViewVerifier) 
verifier).verifyByPolling());
+  }
+
+
+  class MockFailedMSStateModelFactory
+      extends StateModelFactory<MockFailedMSStateModel> {
+
+    @Override
+    public MockFailedMSStateModel createNewStateModel(String resourceName,
+        String partitionKey) {
+      MockFailedMSStateModel model = new MockFailedMSStateModel();
+      return model;
+    }
+  }
+
+  @StateModelInfo(initialState = "OFFLINE", states = { "MASTER", "SLAVE", 
"ERROR"
+  })
+  public static class MockFailedMSStateModel extends StateModel {
+    private static Logger LOG = 
LoggerFactory.getLogger(MockFailedMSStateModel.class);
+
+    public MockFailedMSStateModel() {
+    }
+
+    @Transition(to = "SLAVE", from = "OFFLINE") public void 
onBecomeSlaveFromOffline(
+        Message message, NotificationContext context) throws 
IllegalAccessException {
+      throw new IllegalAccessException("Failed!");
+    }
+
+    @Transition(to = "MASTER", from = "SLAVE") public void 
onBecomeMasterFromSlave(Message message,
+        NotificationContext context) throws InterruptedException, 
HelixRollbackException {
+      LOG.error("Become MASTER from SLAVE");
+    }
+
+    @Transition(to = "SLAVE", from = "MASTER") public void 
onBecomeSlaveFromMaster(Message message,
+        NotificationContext context) {
+      LOG.info("Become Slave from Master");
+    }
+
+    @Transition(to = "OFFLINE", from = "SLAVE") public void 
onBecomeOfflineFromSlave(
+        Message message, NotificationContext context) {
+      LOG.info("Become OFFLINE from SLAVE");
+    }
+
+    @Transition(to = "DROPPED", from = "OFFLINE") public void 
onBecomeDroppedFromOffline(
+        Message message, NotificationContext context) {
+      LOG.info("Become DROPPED FROM OFFLINE");
+    }
+  }
+
+}
diff --git 
a/helix-core/src/test/resources/TestDelayedAutoRebalancer.OnlineOffline.json 
b/helix-core/src/test/resources/TestDelayedAutoRebalancer.OnlineOffline.json
index 9ae6b24..55f5b3c 100644
--- a/helix-core/src/test/resources/TestDelayedAutoRebalancer.OnlineOffline.json
+++ b/helix-core/src/test/resources/TestDelayedAutoRebalancer.OnlineOffline.json
@@ -53,7 +53,7 @@
       "bestPossibleStates": {
         "localhost_2": "ONLINE",
         "localhost_3": "ONLINE",
-        "localhost_0": "DROPPED",
+        "localhost_0": "ERROR",
         "localhost_1": "ONLINE"
       }
     },

Reply via email to