This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit d93f05c6c750346c2ccfe80e89aaa0fee150a982 Author: Yi Wang <[email protected]> AuthorDate: Thu May 2 17:45:08 2019 -0700 Integrate customRestClient health check with instance service main logic RB=1645567 G=helix-reviewers A=jxue Signed-off-by: Hunter Lee <[email protected]> --- .../main/java/org/apache/helix/ConfigAccessor.java | 10 +- .../java/org/apache/helix/model/RESTConfig.java | 45 ++-- .../apache/helix/util/InstanceValidationUtil.java | 72 +++--- .../helix/util/TestInstanceValidationUtil.java | 28 +- .../apache/helix/rest/client/CustomRestClient.java | 3 +- .../helix/rest/client/CustomRestClientFactory.java | 3 +- .../helix/rest/client/CustomRestClientImpl.java | 2 +- .../rest/server/json/cluster/PartitionHealth.java | 40 ++- .../rest/server/json/instance/StoppableCheck.java | 60 +++-- .../server/resources/helix/InstancesAccessor.java | 6 +- .../resources/helix/PerInstanceAccessor.java | 58 ++--- .../helix/rest/server/service/InstanceService.java | 27 +- .../rest/server/service/InstanceServiceImpl.java | 287 ++++++++++++--------- .../server/json/instance/TestStoppableCheck.java | 26 +- .../rest/server/service/TestInstanceService.java | 188 ++++++++++---- 15 files changed, 468 insertions(+), 387 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java index 1d4c5e8..15ea4a0 100644 --- a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java @@ -27,12 +27,12 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; +import org.apache.helix.manager.zk.ZKUtil; import org.apache.helix.manager.zk.client.HelixZkClient; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.ConfigScope; import org.apache.helix.model.HelixConfigScope; import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; -import org.apache.helix.manager.zk.ZKUtil; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.RESTConfig; import org.apache.helix.model.ResourceConfig; @@ -563,11 +563,11 @@ public class ConfigAccessor { } /** - * Get ClusterConfig of the given cluster. + * Get RestConfig of the given cluster. * - * @param clusterName + * @param clusterName The cluster * - * @return + * @return The instance of {@link RESTConfig} */ public RESTConfig getRESTConfig(String clusterName) { HelixConfigScope scope = @@ -575,7 +575,7 @@ public class ConfigAccessor { ZNRecord record = getConfigZnRecord(scope); if (record == null) { - LOG.warn("No config found at " + scope.getZkPath()); + LOG.warn("No rest config found at " + scope.getZkPath()); return null; } diff --git a/helix-core/src/main/java/org/apache/helix/model/RESTConfig.java b/helix-core/src/main/java/org/apache/helix/model/RESTConfig.java index a47226c..6683f72 100644 --- a/helix-core/src/main/java/org/apache/helix/model/RESTConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/RESTConfig.java @@ -3,18 +3,20 @@ package org.apache.helix.model; import org.apache.helix.HelixProperty; import org.apache.helix.ZNRecord; -public class RESTConfig extends HelixProperty { - public enum RESTConfigProperty { - CUSTOMIZED_HEALTH_URL // User customized URL for getting participant health status or partition - // health status. - } +/** + * The configuration entry for persisting the client side rest endpoint + * The rest endpoint is used for helix to fetch the health status or other important status of the participant at runtime + */ +public class RESTConfig extends HelixProperty { /** - * Instantiate REST config for a specific cluster - * @param cluster the cluster identifier + * Corresponds to "simpleFields" concept in ZnNode */ - public RESTConfig(String cluster) { - super(cluster); + public enum SimpleFields { + /** + * Customized URL for getting participant(instance)'s health status or partition's health status. + */ + CUSTOMIZED_HEALTH_URL } /** @@ -26,26 +28,15 @@ public class RESTConfig extends HelixProperty { super(record); } - /** - * Set up the user defined URL for check per participant health / per partition health by combine - * URL and final endpoint. It must ended without "/" - * - * eg: http://*:12345/customized/path/check - * - * @param customizedHealthURL - */ - public void setCustomizedHealthURL(String customizedHealthURL) { - _record.setSimpleField(RESTConfigProperty.CUSTOMIZED_HEALTH_URL.name(), customizedHealthURL); + public RESTConfig(String id) { + super(id); } - /** - * Get user defined URL to construct per participant health / partition health - * Return null if it does not exist. - * - * @return - */ - public String getCustomizedHealthURL() { - return _record.getSimpleField(RESTConfigProperty.CUSTOMIZED_HEALTH_URL.name()); + public void set(SimpleFields property, String value) { + _record.setSimpleField(property.name(), value); } + public String get(SimpleFields property) { + return _record.getSimpleField(property.name()); + } } diff --git a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java index 6ac9605..7d23117 100644 --- a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java +++ b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java @@ -19,29 +19,21 @@ package org.apache.helix.util; * under the License. */ -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.helix.ConfigAccessor; +import java.util.*; +import java.util.stream.Collectors; + import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixDefinedState; import org.apache.helix.HelixException; import org.apache.helix.PropertyKey; -import org.apache.helix.model.ClusterConfig; -import org.apache.helix.model.CurrentState; -import org.apache.helix.model.ExternalView; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.InstanceConfig; -import org.apache.helix.model.LiveInstance; -import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.model.*; import org.apache.helix.task.TaskConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + /** * Utility class for validating Helix properties * Warning: each method validates one single property of instance individually and independently. @@ -53,27 +45,19 @@ public class InstanceValidationUtil { public static Set<String> UNHEALTHY_STATES = ImmutableSet.of(HelixDefinedState.DROPPED.name(), HelixDefinedState.ERROR.name()); - public enum HealthStatusType { - instanceHealthStatus, - partitionHealthStatus - } - private InstanceValidationUtil() { } /** * Method to check if the instance is enabled by configuration * @param dataAccessor - * @param configAccessor - * @param clusterId * @param instanceName * @return */ - public static boolean isEnabled(HelixDataAccessor dataAccessor, ConfigAccessor configAccessor, - String clusterId, String instanceName) { + public static boolean isEnabled(HelixDataAccessor dataAccessor, String instanceName) { PropertyKey.Builder propertyKeyBuilder = dataAccessor.keyBuilder(); InstanceConfig instanceConfig = dataAccessor.getProperty(propertyKeyBuilder.instanceConfig(instanceName)); - ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterId); + ClusterConfig clusterConfig = dataAccessor.getProperty(propertyKeyBuilder.clusterConfig()); // TODO deprecate instance level config checks once migrated the enable status to cluster config only if (instanceConfig == null || clusterConfig == null) { throw new HelixException("InstanceConfig or ClusterConfig is NULL"); @@ -89,12 +73,10 @@ public class InstanceValidationUtil { /** * Method to check if the instance is up and running by configuration * @param dataAccessor - * @param clusterId * @param instanceName * @return */ - public static boolean isAlive(HelixDataAccessor dataAccessor, String clusterId, - String instanceName) { + public static boolean isAlive(HelixDataAccessor dataAccessor, String instanceName) { LiveInstance liveInstance = dataAccessor.getProperty(dataAccessor.keyBuilder().liveInstance(instanceName)); return liveInstance != null; } @@ -193,34 +175,42 @@ public class InstanceValidationUtil { } /** - * Perform sibling node partition health check - * @param partitionHealthMap - * @return + * Get the problematic partitions on the to-be-stop instance + * Requirement: + * If the instance gets stopped and the partitions on the instance are OFFLINE, + * the cluster still have enough "healthy" replicas on other sibling instances + * + * - sibling instances mean those who share the same partition (replicas) of the to-be-stop instance + * + * @param globalPartitionHealthStatus (instance => (partition name, health status)) + * @param instanceToBeStop The instance to be stopped + * @param dataAccessor The data accessor + * @return A list of problematic partitions if the instance is stopped */ public static List<String> perPartitionHealthCheck(List<ExternalView> externalViews, - Map<String, Map<String, Boolean>> partitionHealthMap, String instanceName, - HelixDataAccessor accessor) { + Map<String, Map<String, Boolean>> globalPartitionHealthStatus, String instanceToBeStop, + HelixDataAccessor dataAccessor) { List<String> unhealthyPartitions = new ArrayList<>(); for (ExternalView externalView : externalViews) { - StateModelDefinition stateModelDefinition = accessor - .getProperty(accessor.keyBuilder().stateModelDef(externalView.getStateModelDefRef())); + StateModelDefinition stateModelDefinition = dataAccessor + .getProperty(dataAccessor.keyBuilder().stateModelDef(externalView.getStateModelDefRef())); for (String partition : externalView.getPartitionSet()) { Map<String, String> stateMap = externalView.getStateMap(partition); // Only check if instance holds top state - if (stateMap.containsKey(instanceName) && stateMap.get(instanceName) - .equals(stateModelDefinition.getTopState())) { + if (stateMap.containsKey(instanceToBeStop) + && stateMap.get(instanceToBeStop).equals(stateModelDefinition.getTopState())) { for (String siblingInstance : stateMap.keySet()) { // Skip this self check - if (siblingInstance.equals(instanceName)) { + if (siblingInstance.equals(instanceToBeStop)) { continue; } // We are checking sibling partition healthy status. So if partition health does not // exist or it is not healthy. We should mark this partition is unhealthy. - if (!partitionHealthMap.containsKey(siblingInstance) || !partitionHealthMap - .get(siblingInstance).containsKey(partition) - || !partitionHealthMap.get(siblingInstance).get(partition)) { + if (!globalPartitionHealthStatus.containsKey(siblingInstance) + || !globalPartitionHealthStatus.get(siblingInstance).containsKey(partition) + || !globalPartitionHealthStatus.get(siblingInstance).get(partition)) { unhealthyPartitions.add(partition); break; } diff --git a/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java b/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java index 5f72ee8..cbbbfd6 100644 --- a/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java +++ b/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java @@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableSet; public class TestInstanceValidationUtil { private static final String TEST_CLUSTER = "testCluster"; private static final String TEST_INSTANCE = "instance0"; + private static final PropertyKey.Builder BUILDER = new PropertyKey.Builder(TEST_CLUSTER); @DataProvider Object[][] isEnabledTestSuite() { @@ -52,15 +53,15 @@ public class TestInstanceValidationUtil { InstanceConfig instanceConfig = new InstanceConfig(TEST_INSTANCE); instanceConfig.setInstanceEnabled(instanceConfigEnabled); doReturn(instanceConfig).when(mock.dataAccessor) - .getProperty(argThat(new PropertyKeyArgument(PropertyType.CONFIGS))); + .getProperty(BUILDER.instanceConfig(TEST_INSTANCE)); ClusterConfig clusterConfig = new ClusterConfig(TEST_CLUSTER); if (!clusterConfigEnabled) { clusterConfig.setDisabledInstances(ImmutableMap.of(TEST_INSTANCE, "12345")); } - when(mock.configAccessor.getClusterConfig(TEST_CLUSTER)).thenReturn(clusterConfig); + doReturn(clusterConfig).when(mock.dataAccessor) + .getProperty(BUILDER.clusterConfig()); - boolean isEnabled = InstanceValidationUtil.isEnabled(mock.dataAccessor, mock.configAccessor, - TEST_CLUSTER, TEST_INSTANCE); + boolean isEnabled = InstanceValidationUtil.isEnabled(mock.dataAccessor, TEST_INSTANCE); Assert.assertEquals(isEnabled, expected); } @@ -70,11 +71,8 @@ public class TestInstanceValidationUtil { Mock mock = new Mock(); doReturn(null).when(mock.dataAccessor) .getProperty(argThat(new PropertyKeyArgument(PropertyType.CONFIGS))); - when(mock.configAccessor.getClusterConfig(TEST_CLUSTER)) - .thenReturn(new ClusterConfig(TEST_CLUSTER)); - InstanceValidationUtil.isEnabled(mock.dataAccessor, mock.configAccessor, TEST_CLUSTER, - TEST_INSTANCE); + InstanceValidationUtil.isEnabled(mock.dataAccessor, TEST_INSTANCE); } @Test(expectedExceptions = HelixException.class) @@ -82,10 +80,10 @@ public class TestInstanceValidationUtil { Mock mock = new Mock(); doReturn(new InstanceConfig(TEST_INSTANCE)).when(mock.dataAccessor) .getProperty(argThat(new PropertyKeyArgument(PropertyType.CONFIGS))); - when(mock.configAccessor.getClusterConfig(TEST_CLUSTER)).thenReturn(null); + doReturn(null).when(mock.dataAccessor) + .getProperty(BUILDER.clusterConfig()); - InstanceValidationUtil.isEnabled(mock.dataAccessor, mock.configAccessor, TEST_CLUSTER, - TEST_INSTANCE); + InstanceValidationUtil.isEnabled(mock.dataAccessor, TEST_INSTANCE); } @Test @@ -94,8 +92,7 @@ public class TestInstanceValidationUtil { doReturn(new LiveInstance(TEST_INSTANCE)).when(mock.dataAccessor) .getProperty(argThat(new PropertyKeyArgument(PropertyType.LIVEINSTANCES))); - Assert - .assertTrue(InstanceValidationUtil.isAlive(mock.dataAccessor, TEST_CLUSTER, TEST_INSTANCE)); + Assert.assertTrue(InstanceValidationUtil.isAlive(mock.dataAccessor, TEST_INSTANCE)); } @Test @@ -263,7 +260,8 @@ public class TestInstanceValidationUtil { IdealState idealState = mock(IdealState.class); when(idealState.isEnabled()).thenReturn(true); when(idealState.getPartitionSet()).thenReturn(ImmutableSet.of("db0")); - when(idealState.getInstanceStateMap("db0")).thenReturn(ImmutableMap.of(TEST_INSTANCE, "Master")); + when(idealState.getInstanceStateMap("db0")) + .thenReturn(ImmutableMap.of(TEST_INSTANCE, "Master")); when(idealState.isValid()).thenReturn(true); when(idealState.getStateModelDefRef()).thenReturn("MasterSlave"); doReturn(idealState).when(mock.dataAccessor) @@ -416,7 +414,7 @@ public class TestInstanceValidationUtil { Mock() { this.dataAccessor = mock(HelixDataAccessor.class); this.configAccessor = mock(ConfigAccessor.class); - when(dataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder(TEST_CLUSTER)); + when(dataAccessor.keyBuilder()).thenReturn(BUILDER); } } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClient.java b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClient.java index afa4ef3..8b7c5ba 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClient.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClient.java @@ -39,7 +39,8 @@ public interface CustomRestClient { throws IOException; /** - * Get stoppable check result on partition + * Get stoppable check result on a list of partitions on the instance + * * @param baseUrl the base url of the participant * @param partitions a list of partitions maintained by the participant * @param customPayloads generic payloads required from client side and helix only works as proxy diff --git a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientFactory.java b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientFactory.java index 362818c..7a1c58f 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientFactory.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientFactory.java @@ -33,12 +33,13 @@ public class CustomRestClientFactory { private CustomRestClientFactory() { } - public static CustomRestClient get(String jsonContent) { + public static CustomRestClient get() { if (INSTANCE == null) { synchronized (CustomRestClientFactory.class) { if (INSTANCE == null) { try { INSTANCE = new CustomRestClientImpl(); + return INSTANCE; } catch (Exception e) { LOG.error("Exception when initializing CustomRestClient", e); } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java index 0db5a9b..1a6af22 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java @@ -145,4 +145,4 @@ class CustomRestClientImpl implements CustomRestClient { throw e; } } -} +} \ No newline at end of file diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/json/cluster/PartitionHealth.java b/helix-rest/src/main/java/org/apache/helix/rest/server/json/cluster/PartitionHealth.java index 1fc2564..a09e383 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/json/cluster/PartitionHealth.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/json/cluster/PartitionHealth.java @@ -20,52 +20,44 @@ package org.apache.helix.rest.server.json.cluster; */ import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; + public class PartitionHealth { // Partition health map stores the global metadata about the partition health in the format of // instanceName -> partitionName -> isHealthy - private Map<String, Map<String, Boolean>> _paritionHealthMap; - private Map<String, List<String>> _instancesThatNeedDirectCallWithPartitions; + private Map<String, Map<String, Boolean>> _instanceToPartitionHealthMap; + private Map<String, List<String>> _instanceToPartitionsMap; public PartitionHealth() { - _paritionHealthMap = new HashMap<>(); - _instancesThatNeedDirectCallWithPartitions = new HashMap<>(); + _instanceToPartitionHealthMap = new HashMap<>(); + _instanceToPartitionsMap = new HashMap<>(); } public void addInstanceThatNeedDirectCallWithPartition(String instanceName, String partitionName) { - _instancesThatNeedDirectCallWithPartitions + _instanceToPartitionsMap .computeIfAbsent(instanceName, partitions -> new ArrayList<>()).add(partitionName); } - public void setPartitionHealthForInstance(String instanceName, - Map<String, Boolean> partitionHealth) { - _paritionHealthMap.put(instanceName, partitionHealth); - } - public void addSinglePartitionHealthForInstance(String instanceName, String partitionName, Boolean isHealthy) { - _paritionHealthMap.computeIfAbsent(instanceName, partitionMap -> new HashMap<>()) + _instanceToPartitionHealthMap.computeIfAbsent(instanceName, partitionMap -> new HashMap<>()) .put(partitionName, isHealthy); } - public List<String> getInstanceThatNeedDirectCallWithPartitions(String instanceName) { - return _instancesThatNeedDirectCallWithPartitions.getOrDefault(instanceName, - Collections.EMPTY_LIST); + public Map<String, List<String>> getExpiredRecords() { + return _instanceToPartitionsMap; } - public Map<String, Map<String, Boolean>> getParitionHealthMap() { - return _paritionHealthMap; + public void updatePartitionHealth(String instance, String partition, boolean isHealthy) { + _instanceToPartitionHealthMap.get(instance).put(partition, isHealthy); } - public void removePartitionHealthForInstance(String instanceName) { - _paritionHealthMap.remove(instanceName); + public Map<String, Map<String, Boolean>> getGlobalPartitionHealth() { + return _instanceToPartitionHealthMap; } @Override @@ -74,8 +66,8 @@ public class PartitionHealth { return false; } - return _paritionHealthMap.equals(((PartitionHealth) o)._paritionHealthMap) - && _instancesThatNeedDirectCallWithPartitions - .equals(((PartitionHealth) o)._instancesThatNeedDirectCallWithPartitions); + return _instanceToPartitionHealthMap.equals(((PartitionHealth) o)._instanceToPartitionHealthMap) + && _instanceToPartitionsMap + .equals(((PartitionHealth) o)._instanceToPartitionsMap); } } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java b/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java index b78a9c6..44458b3 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java @@ -19,54 +19,52 @@ package org.apache.helix.rest.server.json.instance; * under the License. */ -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; public class StoppableCheck { - private static final String HELIX_CHECK_PREFIX = "Helix:"; - private static final String CUSTOM_CHECK_PREFIX = "Custom:"; + // Category to differentiate which step the check fails + public enum Category { + HELIX_OWN_CHECK("Helix:"), + CUSTOM_INSTANCE_CHECK("CustomInstance:"), + CUSTOM_PARTITION_CHECK("CustomPartition:"); + + String prefix; + + Category(String prefix) { + this.prefix = prefix; + } + } @JsonProperty("stoppable") private boolean isStoppable; + // The list of failed checks should be sorted to make test consistent pass @JsonProperty("failedChecks") private List<String> failedChecks; - public StoppableCheck(boolean isStoppable, List<String> failedChecks) { + public StoppableCheck(boolean isStoppable, List<String> failedChecks, Category category) { this.isStoppable = isStoppable; - // sort the failed checks in order so that tests can always pass - Collections.sort(failedChecks); - this.failedChecks = failedChecks; + this.failedChecks = failedChecks.stream() + .sorted() + .map(checkName -> appendPrefix(checkName, category)) + .collect(Collectors.toList()); } - public static StoppableCheck mergeStoppableChecks(Map<String, Boolean> helixChecks, Map<String, Boolean> customChecks) { - Map<String, Boolean> mergedResult = ImmutableMap.<String, Boolean>builder() - .putAll(appendPrefix(helixChecks, HELIX_CHECK_PREFIX)) - .putAll(appendPrefix(customChecks, CUSTOM_CHECK_PREFIX)) - .build(); - - List<String> failedChecks = new ArrayList<>(); - for (Map.Entry<String, Boolean> entry : mergedResult.entrySet()) { - if (!entry.getValue()) { - failedChecks.add(entry.getKey()); - } - } - - return new StoppableCheck(failedChecks.isEmpty(), failedChecks); + public StoppableCheck(Map<String, Boolean> checks, Category category) { + this.failedChecks = Maps.filterValues(checks, Boolean.FALSE::equals).keySet() + .stream() + .sorted() + .map(checkName -> appendPrefix(checkName, category)) + .collect(Collectors.toList()); + this.isStoppable = this.failedChecks.isEmpty(); } - private static Map<String, Boolean> appendPrefix(Map<String, Boolean> checks, String prefix) { - Map<String, Boolean> result = new HashMap<>(); - for (Map.Entry<String, Boolean> entry : checks.entrySet()) { - result.put(prefix + entry.getKey(), entry.getValue()); - } - - return result; + private String appendPrefix(String checkName, Category category) { + return category.prefix + checkName; } public boolean isStoppable() { diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java index 3ff4e13..99449be 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java @@ -20,8 +20,10 @@ import org.apache.commons.lang3.NotImplementedException; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.InstanceConfig; +import org.apache.helix.rest.common.HelixDataAccessorWrapper; import org.apache.helix.rest.server.json.instance.StoppableCheck; import org.apache.helix.rest.server.resources.exceptions.HelixHealthException; import org.apache.helix.rest.server.service.ClusterService; @@ -171,13 +173,13 @@ public class InstancesAccessor extends AbstractHelixResource { ObjectNode failedStoppableInstances = result.putObject( InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); InstanceService instanceService = - new InstanceServiceImpl(getDataAccssor(clusterId), getConfigAccessor()); + new InstanceServiceImpl(new HelixDataAccessorWrapper((ZKHelixDataAccessor) getDataAccssor(clusterId)), getConfigAccessor()); switch (selectionBase) { case zone_based: List<String> zoneBasedInstance = getZoneBasedInstances(clusterId, instances, orderOfZone); for (String instance : zoneBasedInstance) { StoppableCheck stoppableCheckResult = - instanceService.checkSingleInstanceStoppable(clusterId, instance, customizedInput); + instanceService.getInstanceStoppableCheck(clusterId, instance, customizedInput); if (!stoppableCheckResult.isStoppable()) { ArrayNode failedReasonsNode = failedStoppableInstances.putArray(instance); for (String failedReason : stoppableCheckResult.getFailedChecks()) { diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java index e95f0a2..6b615ee 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java @@ -19,32 +19,17 @@ package org.apache.helix.rest.server.resources.helix; * under the License. */ -import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.List; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.QueryParam; + +import javax.ws.rs.*; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import org.apache.helix.ConfigAccessor; -import org.apache.helix.HelixAdmin; -import org.apache.helix.HelixDataAccessor; -import org.apache.helix.HelixException; -import org.apache.helix.ZNRecord; + +import org.apache.helix.*; import org.apache.helix.manager.zk.ZKHelixDataAccessor; -import org.apache.helix.model.CurrentState; +import org.apache.helix.model.*; import org.apache.helix.model.Error; -import org.apache.helix.model.HealthStat; -import org.apache.helix.model.HelixConfigScope; -import org.apache.helix.model.InstanceConfig; -import org.apache.helix.model.Message; -import org.apache.helix.model.ParticipantHistory; import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.helix.rest.common.HelixDataAccessorWrapper; import org.apache.helix.rest.server.json.instance.InstanceInfo; @@ -59,10 +44,11 @@ import org.eclipse.jetty.util.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.ObjectMapper; @Path("/clusters/{clusterId}/instances/{instanceName}") public class PerInstanceAccessor extends AbstractHelixResource { - private final static Logger _logger = LoggerFactory.getLogger(PerInstanceAccessor.class); + private final static Logger LOG = LoggerFactory.getLogger(PerInstanceAccessor.class); public enum PerInstanceProperties { config, @@ -86,8 +72,7 @@ public class PerInstanceAccessor extends AbstractHelixResource { HelixDataAccessor dataAccessor = getDataAccssor(clusterId); // TODO reduce GC by dependency injection InstanceService instanceService = - new InstanceServiceImpl(new HelixDataAccessorWrapper((ZKHelixDataAccessor) dataAccessor), - getConfigAccessor()); + new InstanceServiceImpl(new HelixDataAccessorWrapper((ZKHelixDataAccessor) dataAccessor), getConfigAccessor()); InstanceInfo instanceInfo = instanceService.getInstanceInfo(clusterId, instanceName, InstanceService.HealthCheck.STARTED_AND_HEALTH_CHECK_LIST); @@ -100,13 +85,16 @@ public class PerInstanceAccessor extends AbstractHelixResource { public Response isInstanceStoppable(String jsonContent, @PathParam("clusterId") String clusterId, @PathParam("instanceName") String instanceName) throws IOException { ObjectMapper objectMapper = new ObjectMapper(); + HelixDataAccessor dataAccessor = getDataAccssor(clusterId); + InstanceService instanceService = + new InstanceServiceImpl(new HelixDataAccessorWrapper((ZKHelixDataAccessor) dataAccessor), getConfigAccessor()); StoppableCheck stoppableCheck = null; try { - stoppableCheck = new InstanceServiceImpl(getDataAccssor(clusterId), getConfigAccessor()) - .checkSingleInstanceStoppable(clusterId, instanceName, jsonContent); + stoppableCheck = + instanceService.getInstanceStoppableCheck(clusterId, instanceName, jsonContent); } catch (HelixException e) { - _logger - .error(String.format("Current cluster %s has issue with health checks!", clusterId), e); + LOG.error(String.format("Current cluster %s has issue with health checks!", clusterId), + e); return serverError(e); } return OK(objectMapper.writeValueAsString(stoppableCheck)); @@ -120,14 +108,14 @@ public class PerInstanceAccessor extends AbstractHelixResource { try { record = toZNRecord(content); } catch (IOException e) { - _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e); + LOG.error("Failed to deserialize user's input " + content + ", Exception: " + e); return badRequest("Input is not a vaild ZNRecord!"); } try { admin.addInstance(clusterId, new InstanceConfig(record)); } catch (Exception ex) { - _logger.error("Error in adding an instance: " + instanceName, ex); + LOG.error("Error in adding an instance: " + instanceName, ex); return serverError(ex); } @@ -207,11 +195,11 @@ public class PerInstanceAccessor extends AbstractHelixResource { OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class))); break; default: - _logger.error("Unsupported command :" + command); + LOG.error("Unsupported command :" + command); return badRequest("Unsupported command :" + command); } } catch (Exception e) { - _logger.error("Failed in updating instance : " + instanceName, e); + LOG.error("Failed in updating instance : " + instanceName, e); return badRequest(e.getMessage()); } return OK(); @@ -266,7 +254,7 @@ public class PerInstanceAccessor extends AbstractHelixResource { try { record = toZNRecord(content); } catch (IOException e) { - _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e); + LOG.error("Failed to deserialize user's input " + content + ", Exception: " + e); return badRequest("Input is not a vaild ZNRecord!"); } InstanceConfig instanceConfig = new InstanceConfig(record); @@ -288,7 +276,7 @@ public class PerInstanceAccessor extends AbstractHelixResource { } catch (HelixException ex) { return notFound(ex.getMessage()); } catch (Exception ex) { - _logger.error(String.format("Error in update instance config for instance: %s", instanceName), + LOG.error(String.format("Error in update instance config for instance: %s", instanceName), ex); return serverError(ex); } @@ -423,14 +411,14 @@ public class PerInstanceAccessor extends AbstractHelixResource { List<String> messageNames = accessor.getChildNames(accessor.keyBuilder().messages(instanceName)); if (messageNames == null || messageNames.size() == 0) { - _logger.warn("Unable to get any messages on instance: " + instanceName); + LOG.warn("Unable to get any messages on instance: " + instanceName); return notFound(); } for (String messageName : messageNames) { Message message = accessor.getProperty(accessor.keyBuilder().message(instanceName, messageName)); if (message == null) { - _logger.warn("Message is deleted given message name: ", messageName); + LOG.warn("Message is deleted given message name: ", messageName); continue; } // if stateModelDef is valid, keep messages with StateModelDef equals to the parameter diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java index 3e642cb..51bc354 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java @@ -22,12 +22,11 @@ package org.apache.helix.rest.server.service; import java.io.IOException; import java.util.Arrays; import java.util.List; -import java.util.Map; import org.apache.helix.rest.server.json.instance.InstanceInfo; +import org.apache.helix.rest.server.json.instance.StoppableCheck; import com.google.common.collect.ImmutableList; -import org.apache.helix.rest.server.json.instance.StoppableCheck; public interface InstanceService { enum HealthCheck { @@ -79,24 +78,20 @@ public interface InstanceService { } /** - * Get the current instance stoppable checks based on Helix own business logic - * @param clusterId - * @param instanceName - * @return a map where key is stoppable check name and boolean value represents whether the check - * succeeds - */ - Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName, - List<HealthCheck> healthChecks); - - /** * Get the overall status of the instance - * @param clusterId - * @param instanceName - * @return + * @param clusterId The cluster id + * @param instanceName The instance name + * @return An instance of {@link InstanceInfo} easily convertible to JSON */ InstanceInfo getInstanceInfo(String clusterId, String instanceName, List<HealthCheck> healthChecks); - StoppableCheck checkSingleInstanceStoppable(String clusterId, String instanceName, + /** + * Get the current instance stoppable checks + * @param clusterId The cluster id + * @param instanceName The instance name + * @return An instance of {@link StoppableCheck} easily convertible to JSON + */ + StoppableCheck getInstanceStoppableCheck(String clusterId, String instanceName, String jsonContent) throws IOException; } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java index a01912a..310e2b9 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java @@ -25,13 +25,16 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; +import org.apache.helix.PropertyKey; import org.apache.helix.ZNRecord; import org.apache.helix.model.CurrentState; +import org.apache.helix.model.ExternalView; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.RESTConfig; @@ -44,69 +47,34 @@ import org.apache.helix.util.InstanceValidationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; + + public class InstanceServiceImpl implements InstanceService { private static final Logger LOG = LoggerFactory.getLogger(InstanceServiceImpl.class); private static final String PARTITION_HEALTH_KEY = "PARTITION_HEALTH"; private static final String IS_HEALTHY_KEY = "IS_HEALTHY"; private static final String EXPIRY_KEY = "EXPIRE"; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private final HelixDataAccessor _dataAccessor; private final ConfigAccessor _configAccessor; + private final CustomRestClient _customRestClient; public InstanceServiceImpl(HelixDataAccessor dataAccessor, ConfigAccessor configAccessor) { _dataAccessor = dataAccessor; _configAccessor = configAccessor; + _customRestClient = CustomRestClientFactory.get(); } - @Override - public Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName, - List<HealthCheck> healthChecks) { - Map<String, Boolean> healthStatus = new HashMap<>(); - for (HealthCheck healthCheck : healthChecks) { - switch (healthCheck) { - case INVALID_CONFIG: - healthStatus.put(HealthCheck.INVALID_CONFIG.name(), - InstanceValidationUtil.hasValidConfig(_dataAccessor, clusterId, instanceName)); - if (!healthStatus.get(HealthCheck.INVALID_CONFIG.name())) { - LOG.error("The instance {} doesn't have valid configuration", instanceName); - return healthStatus; - } - case INSTANCE_NOT_ENABLED: - healthStatus.put(HealthCheck.INSTANCE_NOT_ENABLED.name(), InstanceValidationUtil - .isEnabled(_dataAccessor, _configAccessor, clusterId, instanceName)); - break; - case INSTANCE_NOT_ALIVE: - healthStatus.put(HealthCheck.INSTANCE_NOT_ALIVE.name(), - InstanceValidationUtil.isAlive(_dataAccessor, clusterId, instanceName)); - break; - case INSTANCE_NOT_STABLE: - boolean isStable = InstanceValidationUtil.isInstanceStable(_dataAccessor, instanceName); - healthStatus.put(HealthCheck.INSTANCE_NOT_STABLE.name(), isStable); - break; - case HAS_ERROR_PARTITION: - healthStatus.put(HealthCheck.HAS_ERROR_PARTITION.name(), - !InstanceValidationUtil.hasErrorPartitions(_dataAccessor, clusterId, instanceName)); - break; - case HAS_DISABLED_PARTITION: - healthStatus.put(HealthCheck.HAS_DISABLED_PARTITION.name(), - !InstanceValidationUtil.hasDisabledPartitions(_dataAccessor, clusterId, instanceName)); - break; - case EMPTY_RESOURCE_ASSIGNMENT: - healthStatus.put(HealthCheck.EMPTY_RESOURCE_ASSIGNMENT.name(), - InstanceValidationUtil.hasResourceAssigned(_dataAccessor, clusterId, instanceName)); - break; - case MIN_ACTIVE_REPLICA_CHECK_FAILED: - healthStatus.put(HealthCheck.MIN_ACTIVE_REPLICA_CHECK_FAILED.name(), - InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor, instanceName)); - break; - default: - LOG.error("Unsupported health check: {}", healthCheck); - break; - } - } - - return healthStatus; + @VisibleForTesting + InstanceServiceImpl(HelixDataAccessor dataAccessor, ConfigAccessor configAccessor, CustomRestClient customRestClient) { + _dataAccessor = dataAccessor; + _configAccessor = configAccessor; + _customRestClient = customRestClient; } @Override @@ -139,35 +107,116 @@ public class InstanceServiceImpl implements InstanceService { instanceInfoBuilder.partitions(partitions); } try { - Map<String, Boolean> healthStatus = getInstanceHealthStatus(clusterId, instanceName, healthChecks); + Map<String, Boolean> healthStatus = + getInstanceHealthStatus(clusterId, instanceName, healthChecks); instanceInfoBuilder.healthStatus(healthStatus); } catch (HelixException ex) { - LOG.error("Exception while getting health status: {}, reporting health status as unHealth", ex); + LOG.error( + "Exception while getting health status. Cluster: {}, Instance: {}, reporting health status as unHealth", + clusterId, instanceName, ex); instanceInfoBuilder.healthStatus(false); } return instanceInfoBuilder.build(); } - + /** + * {@inheritDoc} + * Step 1: Perform instance level Helix own health checks + * Step 2: Perform instance level client side health checks + * Step 3: Perform partition level (all partitions on the instance) client side health checks + * Note: if the check fails at one step, all the following steps won't be executed because the instance cannot be stopped + */ @Override - public StoppableCheck checkSingleInstanceStoppable(String clusterId, String instanceName, + public StoppableCheck getInstanceStoppableCheck(String clusterId, String instanceName, String jsonContent) throws IOException { - // TODO reduce GC by dependency injection - Map<String, Boolean> helixStoppableCheck = getInstanceHealthStatus(clusterId, - instanceName, InstanceService.HealthCheck.STOPPABLE_CHECK_LIST); - CustomRestClient customClient = CustomRestClientFactory.get(jsonContent); + LOG.info("Perform instance level helix own health checks for {}/{}", clusterId, instanceName); + Map<String, Boolean> helixStoppableCheck = getInstanceHealthStatus(clusterId, instanceName, + InstanceService.HealthCheck.STOPPABLE_CHECK_LIST); + + StoppableCheck result = + new StoppableCheck(helixStoppableCheck, StoppableCheck.Category.HELIX_OWN_CHECK); + if (!result.isStoppable()) { + return result; + } + LOG.info("{} passed helix side health checks", instanceName); + return performCustomInstanceChecks(clusterId, instanceName, getCustomPayLoads(jsonContent)); + } + + @VisibleForTesting + protected StoppableCheck performCustomInstanceChecks(String clusterId, String instanceName, + Map<String, String> customPayLoads) throws IOException { + StoppableCheck defaultSucceed = new StoppableCheck(true, Collections.emptyList(), + StoppableCheck.Category.CUSTOM_INSTANCE_CHECK); + LOG.info("Perform instance level client side health checks for {}/{}", clusterId, instanceName); + Optional<String> maybeBaseUrl = getBaseUrl(instanceName, clusterId); + if (!maybeBaseUrl.isPresent()) { + LOG.warn("Unable to get custom client health endpoint: " + instanceName); + return defaultSucceed; + } + try { + String baseUrl = maybeBaseUrl.get(); + StoppableCheck result = + new StoppableCheck(_customRestClient.getInstanceStoppableCheck(baseUrl, customPayLoads), + StoppableCheck.Category.CUSTOM_INSTANCE_CHECK); + if (!result.isStoppable()) { + return result; + } + LOG.info("{} passed client side instance level health checks", instanceName); + return performPartitionLevelChecks(clusterId, instanceName, baseUrl, customPayLoads); + } catch (IOException e) { + LOG.error("Failed to perform custom client side instance level health checks for {}/{}", + clusterId, instanceName, e); + throw e; + } + } + + @VisibleForTesting + protected StoppableCheck performPartitionLevelChecks(String clusterId, String instanceName, + String baseUrl, Map<String, String> customPayLoads) throws IOException { + LOG.info("Perform partition level health checks for {}/{}", clusterId, instanceName); + // pull the health status from ZK + PartitionHealth clusterPartitionsHealth = generatePartitionHealthMapFromZK(); + Map<String, List<String>> expiredPartitionsOnInstances = + clusterPartitionsHealth.getExpiredRecords(); + // update the health status for those expired partitions on instances try { - Map<String, Boolean> customStoppableCheck = - customClient.getInstanceStoppableCheck("", Collections.emptyMap()); - return StoppableCheck.mergeStoppableChecks(helixStoppableCheck, customStoppableCheck); + for (Map.Entry<String, List<String>> entry : expiredPartitionsOnInstances.entrySet()) { + Map<String, Boolean> partitionHealthStatus = + _customRestClient.getPartitionStoppableCheck(baseUrl, entry.getValue(), customPayLoads); + partitionHealthStatus.entrySet().forEach(kv -> clusterPartitionsHealth + .updatePartitionHealth(instanceName, kv.getKey(), kv.getValue())); + } } catch (IOException e) { - LOG.error("Failed to perform customized health check for {}/{}", clusterId, instanceName, e); + LOG.error("Failed to perform client side partition level health checks for {}/{}", clusterId, + instanceName, e); throw e; } + // sibling checks on partitions health for entire cluster + PropertyKey.Builder propertyKeyBuilder = _dataAccessor.keyBuilder(); + List<ExternalView> externalViews = + _dataAccessor.getChildNames(propertyKeyBuilder.externalViews()).stream() + .map(externalView -> (ExternalView) _dataAccessor + .getProperty(propertyKeyBuilder.externalView(externalView))) + .collect(Collectors.toList()); + List<String> unHealthyPartitions = InstanceValidationUtil.perPartitionHealthCheck(externalViews, + clusterPartitionsHealth.getGlobalPartitionHealth(), instanceName, _dataAccessor); + return new StoppableCheck(unHealthyPartitions.isEmpty(), unHealthyPartitions, + StoppableCheck.Category.CUSTOM_PARTITION_CHECK); } - public PartitionHealth generatePartitionHealthMapFromZK() { + private Map<String, String> getCustomPayLoads(String jsonContent) throws IOException { + Map<String, String> result = new HashMap<>(); + JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonContent); + // parsing the inputs as string key value pairs + jsonNode.fields().forEachRemaining(kv -> + result.put(kv.getKey(), kv.getValue().asText()) + ); + return result; + } + + @VisibleForTesting + protected PartitionHealth generatePartitionHealthMapFromZK() { PartitionHealth partitionHealth = new PartitionHealth(); // Only checks the instances are online with valid reports @@ -180,9 +229,10 @@ public class InstanceServiceImpl implements InstanceService { for (String partitionName : customizedHealth.getMapFields().keySet()) { try { Map<String, String> healthMap = customizedHealth.getMapField(partitionName); - if (healthMap == null || Long.parseLong(healthMap.get(EXPIRY_KEY)) < System - .currentTimeMillis()) { - // Clean all the existing checks. If we do not clean it, when we do the customized check, + if (healthMap == null + || Long.parseLong(healthMap.get(EXPIRY_KEY)) < System.currentTimeMillis()) { + // Clean all the existing checks. If we do not clean it, when we do the customized + // check, // Helix may think these partitions are only partitions holding on the instance. // But it could potentially have some partitions are unhealthy for expired ones. // It could problem for shutting down instances. @@ -204,61 +254,68 @@ public class InstanceServiceImpl implements InstanceService { return partitionHealth; } - /** - * Get general customized URL from RESTConfig - * - * @param configAccessor - * @param clustername - * - * @return null if RESTConfig is null - */ - protected String getGeneralCustomizedURL(ConfigAccessor configAccessor, String clustername) { - RESTConfig restConfig = configAccessor.getRESTConfig(clustername); - // If user customized URL is not ready, return true as the check - if (restConfig == null) { - return null; - } - return restConfig.getCustomizedHealthURL(); - } - - /** - * Use get user provided general URL to construct the stoppable status or partition status URL - * - * @param generalURL - * @param instanceName - * @param statusType - * @return null if URL is malformed - */ - protected String getCustomizedURLWithEndPoint(String generalURL, String instanceName, - InstanceValidationUtil.HealthStatusType statusType) { - if (generalURL == null) { - LOG.warn("Failed to generate customized URL for instance {}", instanceName); - return null; + @VisibleForTesting + protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName, + List<HealthCheck> healthChecks) { + Map<String, Boolean> healthStatus = new HashMap<>(); + for (HealthCheck healthCheck : healthChecks) { + switch (healthCheck) { + case INVALID_CONFIG: + healthStatus.put(HealthCheck.INVALID_CONFIG.name(), + InstanceValidationUtil.hasValidConfig(_dataAccessor, clusterId, instanceName)); + if (!healthStatus.get(HealthCheck.INVALID_CONFIG.name())) { + LOG.error("The instance {} doesn't have valid configuration", instanceName); + return healthStatus; + } + case INSTANCE_NOT_ENABLED: + healthStatus.put(HealthCheck.INSTANCE_NOT_ENABLED.name(), InstanceValidationUtil + .isEnabled(_dataAccessor, instanceName)); + break; + case INSTANCE_NOT_ALIVE: + healthStatus.put(HealthCheck.INSTANCE_NOT_ALIVE.name(), + InstanceValidationUtil.isAlive(_dataAccessor, instanceName)); + break; + case INSTANCE_NOT_STABLE: + boolean isStable = InstanceValidationUtil.isInstanceStable(_dataAccessor, instanceName); + healthStatus.put(HealthCheck.INSTANCE_NOT_STABLE.name(), isStable); + break; + case HAS_ERROR_PARTITION: + healthStatus.put(HealthCheck.HAS_ERROR_PARTITION.name(), + !InstanceValidationUtil.hasErrorPartitions(_dataAccessor, clusterId, instanceName)); + break; + case HAS_DISABLED_PARTITION: + healthStatus.put(HealthCheck.HAS_DISABLED_PARTITION.name(), + !InstanceValidationUtil.hasDisabledPartitions(_dataAccessor, clusterId, instanceName)); + break; + case EMPTY_RESOURCE_ASSIGNMENT: + healthStatus.put(HealthCheck.EMPTY_RESOURCE_ASSIGNMENT.name(), + InstanceValidationUtil.hasResourceAssigned(_dataAccessor, clusterId, instanceName)); + break; + case MIN_ACTIVE_REPLICA_CHECK_FAILED: + healthStatus.put(HealthCheck.MIN_ACTIVE_REPLICA_CHECK_FAILED.name(), + InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor, instanceName)); + break; + default: + LOG.error("Unsupported health check: {}", healthCheck); + break; + } } - try { - // If user customized URL is not ready, return true as the check - String hostName = instanceName.split("_")[0]; - return String.format("%s/%s", generalURL.replace("*", hostName), statusType.name()); - } catch (Exception e) { - LOG.info("Failed to prepare customized check for generalURL {} instance {}", generalURL, - instanceName, e); - return null; - } + return healthStatus; } - /** - * Perform customized single instance health check map filtering - * - * Input map is user customized health out put. It will be HEALTH_ENTRY_KEY -> true/false - * @param statusMap - * @return - */ - private Map<String, Boolean> perInstanceHealthCheck(Map<String, Boolean> statusMap) { - if (statusMap != null && !statusMap.isEmpty()) { - statusMap = statusMap.entrySet().stream().filter(entry -> !entry.getValue()) - .collect(Collectors.toMap(map -> map.getKey(), map -> map.getValue())); + private Optional<String> getBaseUrl(String instance, String clusterId) { + RESTConfig restConfig = _configAccessor.getRESTConfig(clusterId); + if (restConfig == null) { + LOG.error("The cluster {} hasn't enabled client side health checks yet", clusterId); + return Optional.empty(); } - return statusMap; + String baseUrl = restConfig.get(RESTConfig.SimpleFields.CUSTOMIZED_HEALTH_URL); + // pre-assumption of the url, must be format of "http://*/path", the wildcard is replaceable by the instance vip + assert baseUrl.contains("*"); + // pre-assumption of the instance name, must be format of <instanceVip>_<port> + assert instance.contains("_"); + String instanceVip = instance.substring(0, instance.indexOf('_')); + return Optional.of(baseUrl.replace("*", instanceVip)); } } diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/json/instance/TestStoppableCheck.java b/helix-rest/src/test/java/org/apache/helix/rest/server/json/instance/TestStoppableCheck.java index bcfc0e3..0249b8f 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/json/instance/TestStoppableCheck.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/json/instance/TestStoppableCheck.java @@ -19,42 +19,30 @@ package org.apache.helix.rest.server.json.instance; * under the License. */ -import java.util.ArrayList; -import java.util.Map; - import org.testng.Assert; import org.testng.annotations.Test; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; public class TestStoppableCheck { @Test public void whenSerializingStoppableCheck() throws JsonProcessingException { - StoppableCheck stoppableCheck = new StoppableCheck(false, new ArrayList<String>() { - { - add("failedCheck"); - } - }); + StoppableCheck stoppableCheck = new StoppableCheck(false, ImmutableList.of("check"), StoppableCheck.Category.HELIX_OWN_CHECK); ObjectMapper mapper = new ObjectMapper(); String result = mapper.writeValueAsString(stoppableCheck); - Assert.assertEquals(result, "{\"stoppable\":false,\"failedChecks\":[\"failedCheck\"]}"); + Assert.assertEquals(result, "{\"stoppable\":false,\"failedChecks\":[\"Helix:check\"]}"); } @Test - public void testMergeStoppableChecks() throws JsonProcessingException { - Map<String, Boolean> helixCheck = ImmutableMap.of("check0", false, "check1", false); - Map<String, Boolean> customCheck = ImmutableMap.of("check1", true, "check2", true); - - StoppableCheck stoppableCheck = StoppableCheck.mergeStoppableChecks(helixCheck, customCheck); - ObjectMapper mapper = new ObjectMapper(); - String result = mapper.writeValueAsString(stoppableCheck); - - Assert.assertEquals(result, - "{\"stoppable\":false,\"failedChecks\":[\"Helix:check0\",\"Helix:check1\"]}"); + public void testConstructorSortingOrder() { + StoppableCheck stoppableCheck = new StoppableCheck(ImmutableMap.of("a", true, "c", false, "b", false), StoppableCheck.Category.HELIX_OWN_CHECK); + Assert.assertFalse(stoppableCheck.isStoppable()); + Assert.assertEquals(stoppableCheck.getFailedChecks(), ImmutableList.of("Helix:b", "Helix:c")); } } diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestInstanceService.java b/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestInstanceService.java index d304ea8..bc2e05c 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestInstanceService.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestInstanceService.java @@ -19,82 +19,162 @@ package org.apache.helix.rest.server.service; * under the License. */ -import com.google.common.collect.ImmutableMap; +import static org.mockito.Mockito.*; + +import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; + import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixDataAccessor; import org.apache.helix.PropertyKey; import org.apache.helix.ZNRecord; import org.apache.helix.model.HealthStat; +import org.apache.helix.model.RESTConfig; +import org.apache.helix.rest.client.CustomRestClient; import org.apache.helix.rest.server.json.cluster.PartitionHealth; +import org.apache.helix.rest.server.json.instance.StoppableCheck; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import org.testng.Assert; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import static org.mockito.Mockito.*; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; public class TestInstanceService { private static final String TEST_CLUSTER = "TestCluster"; + private static final String TEST_INSTANCE = "instance0.linkedin.com_1235"; + + @Mock + private HelixDataAccessor _dataAccessor; + @Mock + private ConfigAccessor _configAccessor; + @Mock + private CustomRestClient _customRestClient; + + @BeforeMethod + public void beforeMethod() { + MockitoAnnotations.initMocks(this); + RESTConfig restConfig = new RESTConfig("restConfig"); + restConfig.set(RESTConfig.SimpleFields.CUSTOMIZED_HEALTH_URL, "http://*:123/path"); + when(_configAccessor.getRESTConfig(TEST_CLUSTER)).thenReturn(restConfig); + } + + @Test + public void testGetInstanceStoppableCheck_fail_at_helix_own_checks() throws IOException { + StoppableCheck expected = new StoppableCheck(false, ImmutableList.of("FailedCheck"), StoppableCheck.Category.HELIX_OWN_CHECK); + InstanceService service = new InstanceServiceImpl(_dataAccessor, _configAccessor, _customRestClient) { + @Override + public StoppableCheck getInstanceStoppableCheck(String clusterId, String instanceName, String jsonContent) + throws IOException { + return expected; + } + }; + + String jsonContent = ""; + StoppableCheck actual = service.getInstanceStoppableCheck(TEST_CLUSTER, TEST_INSTANCE, jsonContent); + + Assert.assertEquals(actual, expected); + verifyZeroInteractions(_customRestClient); + verifyZeroInteractions(_configAccessor); + } + + @Test + public void testGetInstanceStoppableCheck_fail_at_custom_instance_checks() throws IOException { + InstanceService service = new InstanceServiceImpl(_dataAccessor, _configAccessor, _customRestClient) { + @Override + protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName, + List<HealthCheck> healthChecks) { + return Collections.emptyMap(); + } + }; + when(_customRestClient.getInstanceStoppableCheck(anyString(), anyMap())).thenReturn(ImmutableMap.of("check1", false)); + + String jsonContent = "{\n" + " \"param1\": \"value1\",\n" + "\"param2\": \"value2\"\n" + "}"; + StoppableCheck actual = service.getInstanceStoppableCheck(TEST_CLUSTER, TEST_INSTANCE, jsonContent); + + Assert.assertFalse(actual.isStoppable()); + verify(_customRestClient, times(1)).getInstanceStoppableCheck(any(), any()); + verify(_customRestClient, times(0)).getPartitionStoppableCheck(any(), any(), any()); + } + + @Test + public void testGetInstanceStoppableCheck_fail_at_custom_partition_checks() throws IOException { + InstanceService service = new InstanceServiceImpl(_dataAccessor, _configAccessor, _customRestClient) { + @Override + protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName, + List<HealthCheck> healthChecks) { + return Collections.emptyMap(); + } + + @Override + protected StoppableCheck performPartitionLevelChecks(String clusterId, String instanceName, String baseUrl, + Map<String, String> customPayLoads) throws IOException { + return new StoppableCheck(false, Collections.emptyList(), StoppableCheck.Category.CUSTOM_PARTITION_CHECK); + } + }; + + // partition is health on the test instance but unhealthy on the sibling instance + when(_customRestClient.getInstanceStoppableCheck(anyString(), anyMap())).thenReturn(Collections.emptyMap()); + String jsonContent = "{\n" + " \"param1\": \"value1\",\n" + "\"param2\": \"value2\"\n" + "}"; + StoppableCheck actual = service.getInstanceStoppableCheck(TEST_CLUSTER, TEST_INSTANCE, jsonContent); + + Assert.assertFalse(actual.isStoppable()); + verify(_customRestClient, times(1)).getInstanceStoppableCheck(any(), any()); + } @Test public void testGeneratePartitionHealthMapFromZK() { - //Prepare for testing data. - Mock mock = new Mock(); - when(mock.dataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder(TEST_CLUSTER)); - when(mock.dataAccessor.getChildNames(new PropertyKey.Builder(TEST_CLUSTER).liveInstances())) - .thenReturn(mock.liveInstances); - when(mock.dataAccessor.getProperty( + List<ZNRecord> healthData = generateHealthData(); + + InstanceServiceImpl service = new InstanceServiceImpl(_dataAccessor, _configAccessor, _customRestClient); + when(_dataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder(TEST_CLUSTER)); + when(_dataAccessor.getChildNames(new PropertyKey.Builder(TEST_CLUSTER).liveInstances())) + .thenReturn(Arrays.asList("host0", "host1")); + when(_dataAccessor.getProperty( new PropertyKey.Builder(TEST_CLUSTER).healthReport("host0", "PARTITION_HEALTH"))) - .thenReturn(new HealthStat(mock.healthData.get(0))); - when(mock.dataAccessor.getProperty( + .thenReturn(new HealthStat(healthData.get(0))); + when(_dataAccessor.getProperty( new PropertyKey.Builder(TEST_CLUSTER).healthReport("host1", "PARTITION_HEALTH"))) - .thenReturn(new HealthStat(mock.healthData.get(1))); - PartitionHealth computeResult = new InstanceServiceImpl(mock.dataAccessor, mock.configAccessor) - .generatePartitionHealthMapFromZK(); + .thenReturn(new HealthStat(healthData.get(1))); + PartitionHealth computeResult = service.generatePartitionHealthMapFromZK(); PartitionHealth expectedResult = generateExpectedResult(); - Assert.assertTrue(computeResult.equals(expectedResult)); + Assert.assertEquals(computeResult, expectedResult); } - - private final class Mock { - private HelixDataAccessor dataAccessor = mock(HelixDataAccessor.class); - private ConfigAccessor configAccessor = mock(ConfigAccessor.class); - private List<String> liveInstances = Arrays.asList("host0", "host1"); - private List<ZNRecord> healthData = generateHealthData(); - - Mock() { - } - - private List<ZNRecord> generateHealthData() { - // Set EXPIRY time 100000 that guarantees the test has enough time - // Host 0 contains unhealthy partition but it does not matter. - ZNRecord record1 = new ZNRecord("PARTITION_HEALTH"); - record1.setMapField("TESTDB0_0", ImmutableMap - .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000))); - record1.setMapField("TESTDB0_1", ImmutableMap - .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000))); - record1.setMapField("TESTDB0_2", ImmutableMap - .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000))); - record1.setMapField("TESTDB1_0", ImmutableMap - .of("IS_HEALTHY", "false", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000))); - record1.setMapField("TESTDB2_0", ImmutableMap - .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000))); - - // Host 1 has expired data, which requires immediate API querying. - ZNRecord record2 = new ZNRecord("PARTITION_HEALTH"); - record2.setMapField("TESTDB0_0", ImmutableMap - .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000))); - record2.setMapField("TESTDB0_1", ImmutableMap - .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000))); - record2.setMapField("TESTDB0_2", ImmutableMap - .of("IS_HEALTHY", "true", "EXPIRE", "123456")); - record2.setMapField("TESTDB1_0", ImmutableMap - .of("IS_HEALTHY", "false", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000))); - record2.setMapField("TESTDB2_0", ImmutableMap - .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000))); - - return Arrays.asList(record1, record2); - } + private List<ZNRecord> generateHealthData() { + // Set EXPIRY time 100000 that guarantees the test has enough time + // Host 0 contains unhealthy partition but it does not matter. + ZNRecord record1 = new ZNRecord("PARTITION_HEALTH"); + record1.setMapField("TESTDB0_0", ImmutableMap + .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000))); + record1.setMapField("TESTDB0_1", ImmutableMap + .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000))); + record1.setMapField("TESTDB0_2", ImmutableMap + .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000))); + record1.setMapField("TESTDB1_0", ImmutableMap + .of("IS_HEALTHY", "false", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000))); + record1.setMapField("TESTDB2_0", ImmutableMap + .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000))); + + // Host 1 has expired data, which requires immediate API querying. + ZNRecord record2 = new ZNRecord("PARTITION_HEALTH"); + record2.setMapField("TESTDB0_0", ImmutableMap + .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000))); + record2.setMapField("TESTDB0_1", ImmutableMap + .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000))); + record2.setMapField("TESTDB0_2", ImmutableMap + .of("IS_HEALTHY", "true", "EXPIRE", "123456")); + record2.setMapField("TESTDB1_0", ImmutableMap + .of("IS_HEALTHY", "false", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000))); + record2.setMapField("TESTDB2_0", ImmutableMap + .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000))); + + return Arrays.asList(record1, record2); } private PartitionHealth generateExpectedResult() {
