This is an automated email from the ASF dual-hosted git repository.

hzlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b83e8e  Skip cancellation for partition reset pending message (#1637)
0b83e8e is described below

commit 0b83e8eff099db6b87a9fb3e4678cfea190bedcb
Author: Huizhi Lu <[email protected]>
AuthorDate: Fri Feb 5 14:16:40 2021 -0800

    Skip cancellation for partition reset pending message (#1637)
    
    When an admin or participant resets a partition with an ERROR -> OFFLINE
    state transition, the Helix controller determines that it is an unnecessary
    state transition and cancels it. To ensure the partition reset succeeds, the
    Helix controller should skip cancelling the ERROR -> OFFLINE state transition.
    
    This commit adds logic so that the Helix controller does not cancel a pending
    state transition (ST) when that ST is ERROR -> initialState (e.g. OFFLINE).
---
 .../controller/stages/MessageGenerationPhase.java  |  29 ++++--
 .../stages/TestCancellationMessageGeneration.java  | 105 ++++++++++++++++++++-
 2 files changed, 123 insertions(+), 11 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index b869ed8..3112327 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -211,8 +211,8 @@ public abstract class MessageGenerationPhase extends 
AbstractBaseStage {
         }
 
         if (desiredState.equals(NO_DESIRED_STATE) || 
desiredState.equalsIgnoreCase(currentState)) {
-          if (desiredState.equals(NO_DESIRED_STATE) || pendingMessage != null 
&& !currentState
-              .equalsIgnoreCase(pendingMessage.getToState())) {
+          if (shouldCreateSTCancellation(pendingMessage, desiredState,
+              stateModelDef.getInitialState())) {
             message = createStateTransitionCancellationMessage(manager, 
resource,
                 partition.getPartitionName(), instanceName, 
sessionIdMap.get(instanceName),
                 stateModelDef.getId(), pendingMessage.getFromState(), 
pendingMessage.getToState(),
@@ -271,6 +271,23 @@ public abstract class MessageGenerationPhase extends 
AbstractBaseStage {
     } // end of for-each-partition
   }
 
+  private boolean shouldCreateSTCancellation(Message pendingMessage, String 
desiredState,
+      String initialState) {
+    if (pendingMessage == null) {
+      return false;
+    }
+    if (NO_DESIRED_STATE.equals(desiredState)) {
+      return true;
+    }
+
+    // Cancel the ST except below scenarios:
+    // 1. pending message toState is desired state
+    // 2. pending message is an ERROR reset: ERROR -> initState (eg. OFFLINE)
+    return !desiredState.equalsIgnoreCase(pendingMessage.getToState())
+        && 
!(HelixDefinedState.ERROR.name().equals(pendingMessage.getFromState())
+        && initialState.equals(pendingMessage.getToState()));
+  }
+
   private void logAndAddToCleanUp(Map<String, Map<String, Message>> 
messagesToCleanUp,
       Message message, String instanceName, String resourceName, Partition 
partition,
       String currentState, String cleanUpMessageType) {
@@ -437,10 +454,10 @@ public abstract class MessageGenerationPhase extends 
AbstractBaseStage {
       boolean isCancellationEnabled, String currentState) {
 
     if (isCancellationEnabled && cancellationMessage == null) {
-      LogUtil.logInfo(logger, _eventId,
-          "Send cancellation message of the state transition for " + 
resource.getResourceName()
-              + "." + partitionName + " on " + instanceName + ", currentState: 
" + currentState
-              + ", nextState: " + (nextState == null ? "N/A" : nextState));
+      logger.info("Event {} : Send cancellation message of the state 
transition for {}.{} on {}, "
+              + "currentState: {}, nextState: {},  toState: {}",
+          _eventId, resource.getResourceName(), partitionName, instanceName,
+          currentState, nextState == null ? "N/A" : nextState, toState);
 
       String uuid = UUID.randomUUID().toString();
       String managerSessionId = manager.getSessionId();
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
index 6500f04..b231e6d 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
@@ -19,8 +19,8 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
-import java.lang.reflect.Array;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -44,16 +44,15 @@ import org.testng.annotations.Test;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-
-/**
- * This test checks the cancellation message generation when currentState=null 
and desiredState=DROPPED
- */
 public class TestCancellationMessageGeneration extends MessageGenerationPhase {
   private static final String TEST_CLUSTER = "testCluster";
   private static final String TEST_RESOURCE = "resource0";
   private static final String TEST_INSTANCE = "instance0";
   private static final String TEST_PARTITION = "partition0";
 
+  /*
+   * This test checks the cancellation message generation when 
currentState=null and desiredState=DROPPED
+   */
   @Test
   public void TestOFFLINEToDROPPED() throws Exception {
 
@@ -113,4 +112,100 @@ public class TestCancellationMessageGeneration extends 
MessageGenerationPhase {
     MessageOutput output = 
event.getAttribute(AttributeName.MESSAGES_ALL.name());
     Assert.assertEquals(output.getMessages(TEST_RESOURCE, partition).size(), 
1);
   }
+
+  /*
+   * Tests that no cancellation message is created for
+   * pending ST message of error partition reset.
+   */
+  @Test
+  public void testNoCancellationForErrorReset() throws Exception {
+    List<Message> messages = generateMessages("ERROR", "ERROR", "OFFLINE");
+
+    Assert.assertTrue(messages.isEmpty(), "Should not create cancellation 
message");
+  }
+
+  /*
+   * Tests that controller should be able to cancel ST: ONLINE -> OFFLINE
+   */
+  @Test
+  public void testCancelOnlineToOffline() throws Exception {
+    List<Message> messages = generateMessages("ONLINE", "ONLINE", "OFFLINE");
+
+    Assert.assertEquals(messages.size(), 1, "Should create cancellation 
message");
+
+    Message msg = messages.get(0);
+    Assert.assertEquals(msg.getMsgType(), 
Message.MessageType.STATE_TRANSITION_CANCELLATION.name());
+    Assert.assertEquals(msg.getFromState(), "ONLINE");
+    Assert.assertEquals(msg.getToState(), "OFFLINE");
+  }
+
+  private List<Message> generateMessages(String currentState, String 
fromState, String toState)
+      throws Exception {
+    ClusterEvent event = new ClusterEvent(TEST_CLUSTER, 
ClusterEventType.Unknown);
+
+    // Set current state to event
+    CurrentStateOutput currentStateOutput = mock(CurrentStateOutput.class);
+    Partition partition = mock(Partition.class);
+    when(partition.getPartitionName()).thenReturn(TEST_PARTITION);
+    when(currentStateOutput.getCurrentState(TEST_RESOURCE, partition, 
TEST_INSTANCE))
+        .thenReturn(currentState);
+
+    // Pending message for error partition reset
+    Message pendingMessage = mock(Message.class);
+    when(pendingMessage.getFromState()).thenReturn(fromState);
+    when(pendingMessage.getToState()).thenReturn(toState);
+    when(currentStateOutput.getPendingMessage(TEST_RESOURCE, partition, 
TEST_INSTANCE))
+        .thenReturn(pendingMessage);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    // Set helix manager to event
+    event.addAttribute(AttributeName.helixmanager.name(), 
mock(HelixManager.class));
+
+    StateModelDefinition stateModelDefinition = new 
StateModelDefinition.Builder("TestStateModel")
+        .addState("ONLINE", 1).addState("OFFLINE")
+        .addState("DROPPED").addState("ERROR")
+        .initialState("OFFLINE")
+        .addTransition("ERROR", "OFFLINE", 1).addTransition("ONLINE", 
"OFFLINE", 2)
+        .addTransition("OFFLINE", "DROPPED", 3).addTransition("OFFLINE", 
"ONLINE", 4)
+        .build();
+
+    // Set controller data provider to event
+    BaseControllerDataProvider cache = mock(BaseControllerDataProvider.class);
+    
when(cache.getStateModelDef(TaskConstants.STATE_MODEL_NAME)).thenReturn(stateModelDefinition);
+    Map<String, LiveInstance> liveInstances = mock(Map.class);
+    LiveInstance mockLiveInstance = mock(LiveInstance.class);
+    when(mockLiveInstance.getInstanceName()).thenReturn(TEST_INSTANCE);
+    when(mockLiveInstance.getEphemeralOwner()).thenReturn("TEST");
+    
when(liveInstances.values()).thenReturn(Collections.singletonList(mockLiveInstance));
+    when(cache.getLiveInstances()).thenReturn(liveInstances);
+    ClusterConfig clusterConfig = mock(ClusterConfig.class);
+    when(cache.getClusterConfig()).thenReturn(clusterConfig);
+    when(clusterConfig.isStateTransitionCancelEnabled()).thenReturn(true);
+    event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);
+
+    // Set event attribute: resources to rebalance
+    Map<String, Resource> resourceMap = new HashMap<>();
+    Resource resource = mock(Resource.class);
+    when(resource.getResourceName()).thenReturn(TEST_RESOURCE);
+    List<Partition> partitions = Collections.singletonList(partition);
+    when(resource.getPartitions()).thenReturn(partitions);
+    
when(resource.getStateModelDefRef()).thenReturn(TaskConstants.STATE_MODEL_NAME);
+    resourceMap.put(TEST_RESOURCE, resource);
+    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), 
resourceMap);
+
+    // set up resource state map
+    ResourcesStateMap resourcesStateMap = new ResourcesStateMap();
+    PartitionStateMap partitionStateMap = new PartitionStateMap(TEST_RESOURCE);
+    Map<Partition, Map<String, String>> stateMap = 
partitionStateMap.getStateMap();
+    Map<String, String> instanceStateMap = new HashMap<>();
+    instanceStateMap.put(TEST_INSTANCE, currentState);
+    stateMap.put(partition, instanceStateMap);
+    resourcesStateMap.setState(TEST_RESOURCE, partition, instanceStateMap);
+
+    // Process the event
+    processEvent(event, resourcesStateMap);
+    MessageOutput output = 
event.getAttribute(AttributeName.MESSAGES_ALL.name());
+
+    return output.getMessages(TEST_RESOURCE, partition);
+  }
 }

Reply via email to