This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit 5b972f1971d3ca959906ac31353787fb1f9bff32 Author: Yi Wang <[email protected]> AuthorDate: Fri Mar 29 17:28:07 2019 -0700 Check sibling nodes to guarantee MIN_ACTIVE_REPLICAS satisfied RB=1614128 G=helix-reviewers A=jxue,hulee Signed-off-by: Hunter Lee <[email protected]> --- .../java/org/apache/helix/model/ExternalView.java | 21 +++- .../apache/helix/util/InstanceValidationUtil.java | 105 +++++++++++++++--- .../helix/util/TestInstanceValidationUtil.java | 121 +++++++++++++++++++++ .../helix/rest/server/service/InstanceService.java | 7 +- .../rest/server/service/InstanceServiceImpl.java | 4 + .../helix/rest/server/AbstractTestClass.java | 7 +- 6 files changed, 244 insertions(+), 21 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/model/ExternalView.java b/helix-core/src/main/java/org/apache/helix/model/ExternalView.java index 7b201b0..aba8d85 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ExternalView.java +++ b/helix-core/src/main/java/org/apache/helix/model/ExternalView.java @@ -38,7 +38,9 @@ public class ExternalView extends HelixProperty { public enum ExternalViewProperty { INSTANCE_GROUP_TAG, RESOURCE_GROUP_NAME, - GROUP_ROUTING_ENABLED + GROUP_ROUTING_ENABLED, + MIN_ACTIVE_REPLICAS, + STATE_MODEL_DEF_REF } /** @@ -131,6 +133,23 @@ public class ExternalView extends HelixProperty { return _record.getSimpleField(ExternalViewProperty.INSTANCE_GROUP_TAG.toString()); } + /** + * Get the minimum number of active replicas required for this resource.
+ * + * @return the minimum number of active replicas, or -1 if the field is not set + */ + public int getMinActiveReplicas() { + return _record.getIntField(ExternalViewProperty.MIN_ACTIVE_REPLICAS.toString(), -1); + } + + /** + * Get the state model associated with this resource + * @return an identifier of the state model + */ + public String getStateModelDefRef() { + return _record.getSimpleField(ExternalViewProperty.STATE_MODEL_DEF_REF.toString()); + } + @Override public boolean isValid() { return true; diff --git a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java index 385920f..dba8f94 100644 --- a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java +++ b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java @@ -19,9 +19,11 @@ package org.apache.helix.util; * under the License. */ -import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; + import org.apache.helix.AccessOption; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixDataAccessor; @@ -35,10 +37,14 @@ import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.RESTConfig; +import org.apache.helix.model.StateModelDefinition; import org.apache.helix.task.TaskConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + /** * Utility class for validating Helix properties * Warning: each method validates one single property of instance individually and independently.
@@ -48,6 +54,9 @@ import org.slf4j.LoggerFactory; public class InstanceValidationUtil { private static final Logger _logger = LoggerFactory.getLogger(InstanceValidationUtil.class); + public static Set<String> UNHEALTHY_STATES = + ImmutableSet.of(HelixDefinedState.DROPPED.name(), HelixDefinedState.ERROR.name()); + public enum HealthStatusType { instanceHealthStatus, partitionHealthStatus @@ -115,12 +124,10 @@ public class InstanceValidationUtil { if (liveInstance != null) { String sessionId = liveInstance.getSessionId(); - PropertyKey currentStatesKey = - propertyKeyBuilder.currentStates(instanceName, sessionId); + PropertyKey currentStatesKey = propertyKeyBuilder.currentStates(instanceName, sessionId); List<String> resourceNames = dataAccessor.getChildNames(currentStatesKey); for (String resourceName : resourceNames) { - PropertyKey key = - propertyKeyBuilder.currentState(instanceName, sessionId, resourceName); + PropertyKey key = propertyKeyBuilder.currentState(instanceName, sessionId, resourceName); CurrentState currentState = dataAccessor.getProperty(key); if (currentState != null && currentState.getPartitionStateMap().size() > 0) { return true; @@ -179,12 +186,10 @@ public class InstanceValidationUtil { if (liveInstance != null) { String sessionId = liveInstance.getSessionId(); - PropertyKey currentStatesKey = - propertyKeyBuilder.currentStates(instanceName, sessionId); + PropertyKey currentStatesKey = propertyKeyBuilder.currentStates(instanceName, sessionId); List<String> resourceNames = dataAccessor.getChildNames(currentStatesKey); for (String resourceName : resourceNames) { - PropertyKey key = - propertyKeyBuilder.currentState(instanceName, sessionId, resourceName); + PropertyKey key = propertyKeyBuilder.currentState(instanceName, sessionId, resourceName); CurrentState currentState = dataAccessor.getProperty(key); if (currentState != null @@ -200,10 +205,9 @@ public class InstanceValidationUtil { /** * Check the overall health status for instance including: 
- * 1. Per instance health status with application customized key-value entries - * 2. Sibling partitions (replicas for same partition holding on different node - * health status for the entire cluster. - * + * 1. Per instance health status with application customized key-value entries + * 2. Sibling partitions (replicas for same partition holding on different node + * health status for the entire cluster. * @param configAccessor * @param clustername * @param hostName @@ -221,8 +225,8 @@ public class InstanceValidationUtil { return isHealthy; } // TODO : 1. Call REST with customized URL - // 2. Parse mapping result with string -> boolean value and return out for per instance - // 3. Check sibling nodes for partition health + // 2. Parse mapping result with string -> boolean value and return out for per instance + // 3. Check sibling nodes for partition health isHealthy = perInstanceHealthCheck(instanceHealthMap) || perPartitionHealthCheck(partitionHealthMap); @@ -254,9 +258,7 @@ public class InstanceValidationUtil { /** * Check instance is already in the stable state. Here stable means all the ideal state mapping * matches external view (view of current state). - * - * It requires persist assignment on! - * + * It requires PERSIST_INTERMEDIATE_ASSIGNMENT turned on! 
* @param dataAccessor * @param instanceName * @return @@ -295,4 +297,71 @@ public class InstanceValidationUtil { } return true; } + + /** + * Check if sibling nodes of the instance meet min active replicas constraint + * Two instances are sibling of each other if they host the same partition + * + * WARNING: The check uses ExternalView to reduce network traffic but suffer from accuracy + * due to external view propagation latency + * + * TODO: Use in memory cache and query instance's currentStates + * + * @param dataAccessor + * @param instanceName + * @return + */ + public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAccessor, String instanceName) { + PropertyKey.Builder propertyKeyBuilder = dataAccessor.keyBuilder(); + List<String> idealStates = dataAccessor.getChildNames(propertyKeyBuilder.idealStates()); + List<String> externalViews = dataAccessor.getChildNames(propertyKeyBuilder.externalViews()); + if (idealStates.size() != externalViews.size()) { + throw new HelixException( + "The following resources in IdealStates are not found in ExternalViews: " + + Sets.difference(new HashSet<>(idealStates), new HashSet<>(externalViews))); + } + + for (String externalViewName : externalViews) { + ExternalView externalView = + dataAccessor.getProperty(propertyKeyBuilder.externalView(externalViewName)); + if (externalView == null) { + _logger.error("ExternalView for {} doesn't exist", externalViewName); + continue; + } + // Get the minActiveReplicas constraint for the resource + int minActiveReplicas = externalView.getMinActiveReplicas(); + if (minActiveReplicas == -1) { + throw new HelixException( + "ExternalView " + externalViewName + " is missing minActiveReplica field"); + } + String stateModeDef = externalView.getStateModelDefRef(); + StateModelDefinition stateModelDefinition = + dataAccessor.getProperty(propertyKeyBuilder.stateModelDef(stateModeDef)); + Set<String> unhealthyStates = new HashSet<>(UNHEALTHY_STATES); + if (stateModelDefinition != 
null) { + unhealthyStates.add(stateModelDefinition.getInitialState()); + } + for (String partition : externalView.getPartitionSet()) { + Map<String, String> stateByInstanceMap = externalView.getStateMap(partition); + // found the resource hosted on the instance + if (stateByInstanceMap.containsKey(instanceName)) { + int numHealthySiblings = 0; + for (Map.Entry<String, String> entry : stateByInstanceMap.entrySet()) { + if (!entry.getKey().equals(instanceName) + && !unhealthyStates.contains(entry.getValue())) { + numHealthySiblings++; + } + } + if (numHealthySiblings < minActiveReplicas) { + _logger.info( + "Partition {} doesn't have enough active replicas in sibling nodes. NumHealthySiblings: {}, minActiveReplicas: {}", + partition, numHealthySiblings, minActiveReplicas); + return false; + } + } + } + } + + return true; + } } diff --git a/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java b/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java new file mode 100644 index 0000000..f26d2bb --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java @@ -0,0 +1,121 @@ +package org.apache.helix.util; + +import static org.mockito.Mockito.*; + +import java.util.Collections; + +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixException; +import org.apache.helix.PropertyKey; +import org.apache.helix.PropertyType; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.StateModelDefinition; +import org.mockito.ArgumentMatcher; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + + +public class TestInstanceValidationUtil { + private static final String TEST_CLUSTER = "testCluster"; + private static final String TEST_INSTANCE = "instance0"; + + @Test + public void 
TestSiblingNodesActiveReplicaCheck_success() { + String resource = "resource"; + Mock mock = new Mock(); + doReturn(ImmutableList.of(resource)).when(mock.dataAccessor).getChildNames(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW))); + doReturn(ImmutableList.of(resource)).when(mock.dataAccessor).getChildNames(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES))); + ExternalView externalView = mock(ExternalView.class); + when(externalView.getMinActiveReplicas()).thenReturn(2); + when(externalView.getStateModelDefRef()).thenReturn("MasterSlave"); + when(externalView.getPartitionSet()).thenReturn(ImmutableSet.of("db0")); + when(externalView.getStateMap("db0")).thenReturn(ImmutableMap.of( + TEST_INSTANCE, "Master", + "instance1", "Slave", + "instance2", "Slave", + "instance3", "Slave")); + doReturn(externalView).when(mock.dataAccessor).getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW))); + StateModelDefinition stateModelDefinition = mock(StateModelDefinition.class); + when(stateModelDefinition.getInitialState()).thenReturn("OFFLINE"); + doReturn(stateModelDefinition).when(mock.dataAccessor).getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS))); + + boolean result = InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE); + + Assert.assertTrue(result); + } + + @Test + public void TestSiblingNodesActiveReplicaCheck_fail() { + String resource = "resource"; + Mock mock = new Mock(); + doReturn(ImmutableList.of(resource)).when(mock.dataAccessor).getChildNames(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES))); + doReturn(ImmutableList.of(resource)).when(mock.dataAccessor).getChildNames(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW))); + ExternalView externalView = mock(ExternalView.class); + when(externalView.getMinActiveReplicas()).thenReturn(3); + when(externalView.getStateModelDefRef()).thenReturn("MasterSlave"); + 
when(externalView.getPartitionSet()).thenReturn(ImmutableSet.of("db0")); + when(externalView.getStateMap("db0")).thenReturn(ImmutableMap.of( + TEST_INSTANCE, "Master", + "instance1", "Slave", + "instance2", "Slave")); + doReturn(externalView).when(mock.dataAccessor).getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW))); + StateModelDefinition stateModelDefinition = mock(StateModelDefinition.class); + when(stateModelDefinition.getInitialState()).thenReturn("OFFLINE"); + doReturn(stateModelDefinition).when(mock.dataAccessor).getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS))); + + boolean result = InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE); + + Assert.assertFalse(result); + } + + @Test (expectedExceptions = HelixException.class) + public void TestSiblingNodesActiveReplicaCheck_exception_whenIdealStatesMisMatch() { + String resource = "resource"; + Mock mock = new Mock(); + doReturn(ImmutableList.of(resource)).when(mock.dataAccessor).getChildNames(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES))); + doReturn(Collections.emptyList()).when(mock.dataAccessor).getChildNames(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW))); + ExternalView externalView = mock(ExternalView.class); + when(externalView.getMinActiveReplicas()).thenReturn(-1); + doReturn(externalView).when(mock.dataAccessor).getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW))); + + InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE); + } + + @Test (expectedExceptions = HelixException.class) + public void TestSiblingNodesActiveReplicaCheck_exception_whenMissingMinActiveReplicas() { + String resource = "resource"; + Mock mock = new Mock(); + doReturn(ImmutableList.of(resource)).when(mock.dataAccessor).getChildNames(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES))); + 
doReturn(Collections.emptyList()).when(mock.dataAccessor).getChildNames(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW))); + + InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE); + } + + private class Mock { + HelixDataAccessor dataAccessor; + + Mock() { + this.dataAccessor = mock(HelixDataAccessor.class); + when(dataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder(TEST_CLUSTER)); + } + } + + public static class PropertyKeyArgument extends ArgumentMatcher<PropertyKey> { + private PropertyType propertyType; + + public PropertyKeyArgument(PropertyType propertyType) { + this.propertyType = propertyType; + } + + @Override + public boolean matches(Object o) { + PropertyKey propertyKey = (PropertyKey) o; + + return this.propertyType == propertyKey.getType(); + } + } +} diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java index f32551b..471a4ec 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java @@ -58,7 +58,12 @@ public interface InstanceService { /** * Check if instance has error partitions */ - HAS_ERROR_PARTITION; + HAS_ERROR_PARTITION, + /** + * Check if all resources hosted on the instance can still meet the min active replica + * constraint if this instance is shutdown + */ + MIN_ACTIVE_REPLICA_CHECK_FAILED; /** * Pre-defined list of checks to test if an instance can be stopped at runtime diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java index a928b98..ff12678 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java +++ 
b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java @@ -87,6 +87,10 @@ public class InstanceServiceImpl implements InstanceService { healthStatus.put(HealthCheck.EMPTY_RESOURCE_ASSIGNMENT.name(), InstanceValidationUtil.hasResourceAssigned(_dataAccessor, clusterId, instanceName)); break; + case MIN_ACTIVE_REPLICA_CHECK_FAILED: + healthStatus.put(HealthCheck.MIN_ACTIVE_REPLICA_CHECK_FAILED.name(), + InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor, instanceName)); + break; default: _logger.error("Unsupported health check: {}", healthCheck); break; diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java index ab30419..e5c42a0 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java @@ -54,6 +54,7 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.manager.zk.client.DedicatedZkClientFactory; import org.apache.helix.manager.zk.client.HelixZkClient; import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.rest.common.ContextPropertyKeys; @@ -98,7 +99,8 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest { protected static final String WORKFLOW_PREFIX = "Workflow_"; protected static final String JOB_PREFIX = "Job_"; protected static int NUM_PARTITIONS = 10; - protected static int NUM_REPLICA = 3; + protected static int NUM_REPLICA = 2; + protected static int MIN_ACTIVE_REPLICA = 3; protected static ZkServer _zkServer; protected static HelixZkClient _gZkClient; protected static ClusterSetup _gSetupTool; @@ -316,6 +318,9 @@ public class AbstractTestClass extends 
JerseyTestNg.ContainerPerClassTest { for (int i = 0; i < numResources; i++) { String resource = cluster + "_db_" + i; _gSetupTool.addResourceToCluster(cluster, resource, NUM_PARTITIONS, "MasterSlave"); + IdealState idealState = _gSetupTool.getClusterManagementTool().getResourceIdealState(cluster, resource); + idealState.setMinActiveReplicas(MIN_ACTIVE_REPLICA); + _gSetupTool.getClusterManagementTool().setResourceIdealState(cluster, resource, idealState); _gSetupTool.rebalanceStorageCluster(cluster, resource, NUM_REPLICA); resources.add(resource); }
