This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit e9fb49652d37b7f5eec5e0924056804112a43b96 Author: Junkai Xue <[email protected]> AuthorDate: Fri Mar 29 16:40:51 2019 -0700 Refactor InstanceAccessor to InstancesAccessor and PerInstanceAccessor RB=1614063 BUG=HELIX-1725 G=helix-reviewers A=hulee Signed-off-by: Hunter Lee <[email protected]> --- .../rest/server/resources/AbstractResource.java | 1 + .../resources/exceptions/HelixHealthException.java | 16 ++ .../server/resources/helix/InstancesAccessor.java | 244 ++++++++++++++++ ...tanceAccessor.java => PerInstanceAccessor.java} | 315 +++------------------ .../helix/rest/server/service/InstanceService.java | 4 + .../rest/server/service/InstanceServiceImpl.java | 21 +- .../helix/rest/server/AbstractTestClass.java | 36 +++ .../helix/rest/server/TestInstancesAccessor.java | 124 ++++++++ ...eAccessor.java => TestPerInstanceAccessor.java} | 164 +---------- 9 files changed, 504 insertions(+), 421 deletions(-) diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java index 3b7d995..5128493 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java @@ -62,6 +62,7 @@ public class AbstractResource { disablePartitions, update, delete, + stoppable, rebalance, reset, resetPartitions, diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/exceptions/HelixHealthException.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/exceptions/HelixHealthException.java new file mode 100644 index 0000000..238a5f0 --- /dev/null +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/exceptions/HelixHealthException.java @@ -0,0 +1,16 @@ +package org.apache.helix.rest.server.resources.exceptions; + +public class HelixHealthException extends RuntimeException { + + public HelixHealthException(String message) { + super(message); + } + + public HelixHealthException(Throwable cause) { + super(cause); + } + + public HelixHealthException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java new file mode 100644 index 0000000..a697e9b --- /dev/null +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java @@ -0,0 +1,244 @@ +package org.apache.helix.rest.server.resources.helix; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Response; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixException; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.rest.server.json.instance.StoppableCheck; +import org.apache.helix.rest.server.resources.exceptions.HelixHealthException; +import org.apache.helix.rest.server.service.ClusterService; +import org.apache.helix.rest.server.service.ClusterServiceImpl; +import org.apache.helix.rest.server.service.InstanceService; +import org.apache.helix.rest.server.service.InstanceServiceImpl; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.node.ArrayNode; +import org.codehaus.jackson.node.JsonNodeFactory; +import org.codehaus.jackson.node.ObjectNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Path("/clusters/{clusterId}/instances") +public class InstancesAccessor extends AbstractHelixResource { + private final static Logger _logger = LoggerFactory.getLogger(InstancesAccessor.class); + + public enum InstancesProperties { + instances, + online, + disabled, + selection_base, + zone_order, + customized_values, + instance_stoppable_parallel, + instance_not_stoppable_with_reasons + } + + public enum InstanceHealthSelectionBase { + instance_based, + zone_based + } + + + @GET + public Response getAllInstances(@PathParam("clusterId") String clusterId) { + HelixDataAccessor accessor = getDataAccssor(clusterId); + List<String> instances = accessor.getChildNames(accessor.keyBuilder().instanceConfigs()); + + if (instances == null) { + return notFound(); + } + + ObjectNode root = JsonNodeFactory.instance.objectNode(); + root.put(Properties.id.name(), JsonNodeFactory.instance.textNode(clusterId)); + + ArrayNode instancesNode = root.putArray(InstancesAccessor.InstancesProperties.instances.name()); + instancesNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(instances)); + ArrayNode onlineNode = root.putArray(InstancesAccessor.InstancesProperties.online.name()); + ArrayNode disabledNode = root.putArray(InstancesAccessor.InstancesProperties.disabled.name()); + + List<String> liveInstances = accessor.getChildNames(accessor.keyBuilder().liveInstances()); + ClusterConfig clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig()); + + for (String instanceName : instances) { + InstanceConfig instanceConfig = + accessor.getProperty(accessor.keyBuilder().instanceConfig(instanceName)); + if (instanceConfig != null) { + if (!instanceConfig.getInstanceEnabled() || (clusterConfig.getDisabledInstances() != null + && clusterConfig.getDisabledInstances().containsKey(instanceName))) { + disabledNode.add(JsonNodeFactory.instance.textNode(instanceName)); + } + + if (liveInstances.contains(instanceName)){ + onlineNode.add(JsonNodeFactory.instance.textNode(instanceName)); + } + } + } + + return JSONRepresentation(root); + } + + @POST + public Response instancesOperations(@PathParam("clusterId") String clusterId, + @QueryParam("command") String command, String content) { + Command cmd; + try { + cmd = Command.valueOf(command); + } catch (Exception e) { + return badRequest("Invalid command : " + command); + } + + HelixAdmin admin = getHelixAdmin(); + try { + JsonNode node = null; + if (content.length() != 0) { + node = OBJECT_MAPPER.readTree(content); + } + if (node == null) { + return badRequest("Invalid input for content : " + content); + } + List<String> enableInstances = OBJECT_MAPPER + .readValue(node.get(InstancesAccessor.InstancesProperties.instances.name()).toString(), + OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class)); + switch (cmd) { + case enable: + admin.enableInstance(clusterId, enableInstances, true); + break; + case disable: + admin.enableInstance(clusterId, enableInstances, false); + break; + case stoppable: + return getParallelStoppableInstances(clusterId, node); + default: + _logger.error("Unsupported command :" + command); + return badRequest("Unsupported command :" + command); + } + } catch (HelixHealthException e) { + _logger + .error(String.format("Current cluster %s has issue with health checks!", clusterId), e); + return serverError(e); + } catch (Exception e) { + _logger.error("Failed in updating instances : " + content, e); + return badRequest(e.getMessage()); + } + return OK(); + } + + private Response getParallelStoppableInstances(String clusterId, JsonNode node) + throws IOException { + try { + // TODO: Process input data from the content + InstancesAccessor.InstanceHealthSelectionBase selectionBase = + InstancesAccessor.InstanceHealthSelectionBase.valueOf( + node.get(InstancesAccessor.InstancesProperties.selection_base.name()) + .getValueAsText()); + List<String> instances = OBJECT_MAPPER + .readValue(node.get(InstancesAccessor.InstancesProperties.instances.name()).toString(), + OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class)); + + List<String> orderOfZone = null; + String customizedInput = null; + if (node.get(InstancesAccessor.InstancesProperties.customized_values.name()) != null) { + customizedInput = node.get(InstancesAccessor.InstancesProperties.customized_values.name()).getTextValue(); + } + + if (node.get(InstancesAccessor.InstancesProperties.zone_order.name()) != null) { + orderOfZone = OBJECT_MAPPER + .readValue(node.get(InstancesAccessor.InstancesProperties.zone_order.name()).toString(), + OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class)); + } + + // Prepare output result + ObjectNode result = JsonNodeFactory.instance.objectNode(); + ArrayNode stoppableInstances = + result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); + ObjectNode failedStoppableInstances = result.putObject( + InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); + InstanceService instanceService = + new InstanceServiceImpl(getDataAccssor(clusterId), getConfigAccessor()); + switch (selectionBase) { + case zone_based: + List<String> zoneBasedInstance = getZoneBasedInstances(clusterId, instances, orderOfZone); + for (String instance : zoneBasedInstance) { + StoppableCheck stoppableCheck = + instanceService.checkSingleInstanceStoppable(clusterId, instance, customizedInput); + if (!stoppableCheck.isStoppable()) { + ArrayNode failedReasonsNode = failedStoppableInstances.putArray(instance); + for (String failedReason : stoppableCheck.getFailedChecks()) { + failedReasonsNode.add(JsonNodeFactory.instance.textNode(failedReason)); + } + } else { + stoppableInstances.add(instance); + } + } + break; + case instance_based: + default: + throw new NotImplementedException("instance_based selection is not supported yet!"); + } + return JSONRepresentation(result); + } catch (HelixException e) { + _logger + .error(String.format("Current cluster %s has issue with health checks!", clusterId), e); + throw new HelixHealthException(e); + } catch (Exception e) { + _logger.error(String.format( + "Failed to get parallel stoppable instances for cluster %s with a HelixException!", + clusterId), e); + throw e; + } + } + + /** + * Get instances belongs to the first zone. If the zone is already empty, Helix will iterate zones + * by order until find the zone contains instances. + * + * The order of zones can directly come from user input. If user did not specify it, Helix will order + * zones with alphabetical order. + * + * @param clusterId + * @param instances + * @param orderedZones + * @return + */ + private List<String> getZoneBasedInstances(String clusterId, List<String> instances, + List<String> orderedZones) { + ClusterService clusterService = + new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor()); + Map<String, Set<String>> zoneMapping = + clusterService.getClusterTopology(clusterId).toZoneMapping(); + if (orderedZones == null) { + orderedZones = new ArrayList<>(zoneMapping.keySet()); + } + Collections.sort(orderedZones); + if (orderedZones.isEmpty()) { + return orderedZones; + } + + Set<String> instanceSet = null; + for (String zone : orderedZones) { + instanceSet = new TreeSet<>(instances); + Set<String> currentZoneInstanceSet = new HashSet<>(zoneMapping.get(zone)); + instanceSet.retainAll(currentZoneInstanceSet); + if (instanceSet.size() > 0) { + return new ArrayList<>(instanceSet); + } + } + + return Collections.EMPTY_LIST; + } +} diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java similarity index 59% rename from helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstanceAccessor.java rename to helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java index d07fb4e..e95f0a2 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstanceAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java @@ -21,13 +21,7 @@ package org.apache.helix.rest.server.resources.helix; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; import javax.ws.rs.GET; @@ -38,14 +32,12 @@ import javax.ws.rs.PathParam; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import org.apache.commons.lang3.NotImplementedException; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; import org.apache.helix.ZNRecord; import org.apache.helix.manager.zk.ZKHelixDataAccessor; -import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.CurrentState; import org.apache.helix.model.Error; import org.apache.helix.model.HealthStat; @@ -54,13 +46,9 @@ import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.Message; import org.apache.helix.model.ParticipantHistory; import org.apache.helix.model.builder.HelixConfigScopeBuilder; -import org.apache.helix.rest.client.CustomRestClient; -import org.apache.helix.rest.client.CustomRestClientFactory; import org.apache.helix.rest.common.HelixDataAccessorWrapper; import org.apache.helix.rest.server.json.instance.InstanceInfo; import org.apache.helix.rest.server.json.instance.StoppableCheck; -import org.apache.helix.rest.server.service.ClusterService; -import org.apache.helix.rest.server.service.ClusterServiceImpl; import org.apache.helix.rest.server.service.InstanceService; import org.apache.helix.rest.server.service.InstanceServiceImpl; import org.codehaus.jackson.JsonNode; @@ -72,14 +60,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@Path("/clusters/{clusterId}/instances") -public class InstanceAccessor extends AbstractHelixResource { - private final static Logger _logger = LoggerFactory.getLogger(InstanceAccessor.class); +@Path("/clusters/{clusterId}/instances/{instanceName}") +public class PerInstanceAccessor extends AbstractHelixResource { + private final static Logger _logger = LoggerFactory.getLogger(PerInstanceAccessor.class); - public enum InstanceProperties { - instances, - online, - disabled, + public enum PerInstanceProperties { config, liveInstance, resource, @@ -91,181 +76,18 @@ public class InstanceAccessor extends AbstractHelixResource { total_message_count, read_message_count, healthreports, - instanceTags, - selection_base, - zone_order, - customized_values, - instance_stoppable_parallel, - instance_not_stoppable_with_reasons - } - - public enum InstanceHealthSelectionBase { - instance_based, - zone_based + instanceTags } @GET - public Response getAllInstances(@PathParam("clusterId") String clusterId) { - HelixDataAccessor accessor = getDataAccssor(clusterId); - ObjectNode root = JsonNodeFactory.instance.objectNode(); - root.put(Properties.id.name(), JsonNodeFactory.instance.textNode(clusterId)); - - ArrayNode instancesNode = root.putArray(InstanceProperties.instances.name()); - ArrayNode onlineNode = root.putArray(InstanceProperties.online.name()); - ArrayNode disabledNode = root.putArray(InstanceProperties.disabled.name()); - - List<String> instances = accessor.getChildNames(accessor.keyBuilder().instanceConfigs()); - - if (instances != null) { - instancesNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(instances)); - } else { - return notFound(); - } - - List<String> liveInstances = accessor.getChildNames(accessor.keyBuilder().liveInstances()); - ClusterConfig clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig()); - - for (String instanceName : instances) { - InstanceConfig instanceConfig = - accessor.getProperty(accessor.keyBuilder().instanceConfig(instanceName)); - if (instanceConfig != null) { - if (!instanceConfig.getInstanceEnabled() || (clusterConfig.getDisabledInstances() != null - && clusterConfig.getDisabledInstances().containsKey(instanceName))) { - disabledNode.add(JsonNodeFactory.instance.textNode(instanceName)); - } - - if (liveInstances.contains(instanceName)){ - onlineNode.add(JsonNodeFactory.instance.textNode(instanceName)); - } - } - } - - return JSONRepresentation(root); - } - - @POST - public Response updateInstances(@PathParam("clusterId") String clusterId, - @QueryParam("command") String command, String content) { - Command cmd; - try { - cmd = Command.valueOf(command); - } catch (Exception e) { - return badRequest("Invalid command : " + command); - } - - HelixAdmin admin = getHelixAdmin(); - try { - JsonNode node = null; - if (content.length() != 0) { - node = OBJECT_MAPPER.readTree(content); - } - if (node == null) { - return badRequest("Invalid input for content : " + content); - } - List<String> enableInstances = OBJECT_MAPPER - .readValue(node.get(InstanceProperties.instances.name()).toString(), - OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class)); - switch (cmd) { - case enable: - admin.enableInstance(clusterId, enableInstances, true); - - break; - case disable: - admin.enableInstance(clusterId, enableInstances, false); - break; - default: - _logger.error("Unsupported command :" + command); - return badRequest("Unsupported command :" + command); - } - } catch (Exception e) { - _logger.error("Failed in updating instances : " + content, e); - return badRequest(e.getMessage()); - } - return OK(); - } - - @POST - @Path("stoppable") - @Consumes(MediaType.APPLICATION_JSON) - public Response getParallelStoppableInstances(@PathParam("clusterId") String clusterId, - String content) { - try { - JsonNode node = null; - if (content.length() != 0) { - node = OBJECT_MAPPER.readTree(content); - } - if (node == null) { - return badRequest("Invalid input for content : " + content); - } - - // TODO: Process input data from the content - InstanceHealthSelectionBase selectionBase = InstanceHealthSelectionBase - .valueOf(node.get(InstanceProperties.selection_base.name()).getValueAsText()); - List<String> instances = OBJECT_MAPPER - .readValue(node.get(InstanceProperties.instances.name()).toString(), - OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class)); - - List<String> orderOfZone = null; - String customizedInput = null; - if (node.get(InstanceProperties.customized_values.name()) != null) { - customizedInput = node.get(InstanceProperties.customized_values.name()).getTextValue(); - } - - if (node.get(InstanceProperties.zone_order.name()) != null) { - orderOfZone = OBJECT_MAPPER - .readValue(node.get(InstanceProperties.zone_order.name()).toString(), - OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class)); - } - - // Prepare output result - ObjectNode result = JsonNodeFactory.instance.objectNode(); - ArrayNode stoppableInstances = - result.putArray(InstanceProperties.instance_stoppable_parallel.name()); - ObjectNode failedStoppableInstances = - result.putObject(InstanceProperties.instance_not_stoppable_with_reasons.name()); - - switch (selectionBase) { - case zone_based: - List<String> zoneBasedInstance = getZoneBasedInstance(clusterId, instances, orderOfZone); - for (String instance : zoneBasedInstance) { - StoppableCheck stoppableCheck = - checkSingleInstanceStoppable(clusterId, instance, customizedInput); - if (!stoppableCheck.isStoppable()) { - ArrayNode failedReasonsNode = failedStoppableInstances.putArray(instance); - for (String failedReason : stoppableCheck.getFailedChecks()) { - failedReasonsNode.add(JsonNodeFactory.instance.textNode(failedReason)); - } - } else { - stoppableInstances.add(instance); - } - } - break; - case instance_based: - default: - throw new NotImplementedException("instance_based selection is not support now!"); - } - return JSONRepresentation(result); - } catch (HelixException e) { - _logger - .error(String.format("Current cluster %s has issue with health checks!", clusterId), e); - return serverError(e); - } catch (Exception e) { - _logger.error( - String.format("Failed to get parallel stoppable instances for cluster %s!", clusterId), - e); - return serverError(e); - } - } - - @GET - @Path("{instanceName}") public Response getInstanceById(@PathParam("clusterId") String clusterId, @PathParam("instanceName") String instanceName) throws IOException { ObjectMapper objectMapper = new ObjectMapper(); HelixDataAccessor dataAccessor = getDataAccssor(clusterId); // TODO reduce GC by dependency injection - InstanceService instanceService = new InstanceServiceImpl( - new HelixDataAccessorWrapper((ZKHelixDataAccessor) dataAccessor), getConfigAccessor()); + InstanceService instanceService = + new InstanceServiceImpl(new HelixDataAccessorWrapper((ZKHelixDataAccessor) dataAccessor), + getConfigAccessor()); InstanceInfo instanceInfo = instanceService.getInstanceInfo(clusterId, instanceName, InstanceService.HealthCheck.STARTED_AND_HEALTH_CHECK_LIST); @@ -273,14 +95,15 @@ public class InstanceAccessor extends AbstractHelixResource { } @POST - @Path("{instanceName}/stoppable") + @Path("stoppable") @Consumes(MediaType.APPLICATION_JSON) - public Response isInstanceStoppable(String jsonContent, - @PathParam("clusterId") String clusterId, @PathParam("instanceName") String instanceName) throws IOException { + public Response isInstanceStoppable(String jsonContent, @PathParam("clusterId") String clusterId, + @PathParam("instanceName") String instanceName) throws IOException { ObjectMapper objectMapper = new ObjectMapper(); StoppableCheck stoppableCheck = null; try { - stoppableCheck = checkSingleInstanceStoppable(clusterId, instanceName, jsonContent); + stoppableCheck = new InstanceServiceImpl(getDataAccssor(clusterId), getConfigAccessor()) + .checkSingleInstanceStoppable(clusterId, instanceName, jsonContent); } catch (HelixException e) { _logger .error(String.format("Current cluster %s has issue with health checks!", clusterId), e); @@ -290,7 +113,6 @@ public class InstanceAccessor extends AbstractHelixResource { } @PUT - @Path("{instanceName}") public Response addInstance(@PathParam("clusterId") String clusterId, @PathParam("instanceName") String instanceName, String content) { HelixAdmin admin = getHelixAdmin(); @@ -313,7 +135,6 @@ public class InstanceAccessor extends AbstractHelixResource { } @POST - @Path("{instanceName}") public Response updateInstance(@PathParam("clusterId") String clusterId, @PathParam("instanceName") String instanceName, @QueryParam("command") String command, String content) { @@ -345,8 +166,8 @@ public class InstanceAccessor extends AbstractHelixResource { return badRequest("Instance names are not match!"); } admin.resetPartition(clusterId, instanceName, - node.get(InstanceProperties.resource.name()).getTextValue(), (List<String>) OBJECT_MAPPER - .readValue(node.get(InstanceProperties.partitions.name()).toString(), + node.get(PerInstanceProperties.resource.name()).getTextValue(), (List<String>) OBJECT_MAPPER + .readValue(node.get(PerInstanceProperties.partitions.name()).toString(), OBJECT_MAPPER.getTypeFactory() .constructCollectionType(List.class, String.class))); break; @@ -355,7 +176,7 @@ public class InstanceAccessor extends AbstractHelixResource { return badRequest("Instance names are not match!"); } for (String tag : (List<String>) OBJECT_MAPPER - .readValue(node.get(InstanceProperties.instanceTags.name()).toString(), + .readValue(node.get(PerInstanceProperties.instanceTags.name()).toString(), OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class))) { admin.addInstanceTag(clusterId, instanceName, tag); } @@ -365,24 +186,24 @@ public class InstanceAccessor extends AbstractHelixResource { return badRequest("Instance names are not match!"); } for (String tag : (List<String>) OBJECT_MAPPER - .readValue(node.get(InstanceProperties.instanceTags.name()).toString(), + .readValue(node.get(PerInstanceProperties.instanceTags.name()).toString(), OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class))) { admin.removeInstanceTag(clusterId, instanceName, tag); } break; case enablePartitions: admin.enablePartition(true, clusterId, instanceName, - node.get(InstanceProperties.resource.name()).getTextValue(), + node.get(PerInstanceProperties.resource.name()).getTextValue(), (List<String>) OBJECT_MAPPER - .readValue(node.get(InstanceProperties.partitions.name()).toString(), + .readValue(node.get(PerInstanceProperties.partitions.name()).toString(), OBJECT_MAPPER.getTypeFactory() .constructCollectionType(List.class, String.class))); break; case disablePartitions: admin.enablePartition(false, clusterId, instanceName, - node.get(InstanceProperties.resource.name()).getTextValue(), + node.get(PerInstanceProperties.resource.name()).getTextValue(), (List<String>) OBJECT_MAPPER - .readValue(node.get(InstanceProperties.partitions.name()).toString(), + .readValue(node.get(PerInstanceProperties.partitions.name()).toString(), OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class))); break; default: @@ -397,7 +218,6 @@ public class InstanceAccessor extends AbstractHelixResource { } @DELETE - @Path("{instanceName}") public Response deleteInstance(@PathParam("clusterId") String clusterId, @PathParam("instanceName") String instanceName) { HelixAdmin admin = getHelixAdmin(); @@ -412,7 +232,7 @@ public class InstanceAccessor extends AbstractHelixResource { } @GET - @Path("{instanceName}/configs") + @Path("configs") public Response getInstanceConfig(@PathParam("clusterId") String clusterId, @PathParam("instanceName") String instanceName) throws IOException { HelixDataAccessor accessor = getDataAccssor(clusterId); @@ -427,7 +247,7 @@ public class InstanceAccessor extends AbstractHelixResource { } @POST - @Path("{instanceName}/configs") + @Path("configs") public Response updateInstanceConfig(@PathParam("clusterId") String clusterId, @PathParam("instanceName") String instanceName, @QueryParam("command") String commandStr, String content) { @@ -476,14 +296,14 @@ public class InstanceAccessor extends AbstractHelixResource { } @GET - @Path("{instanceName}/resources") + @Path("resources") public Response getResourcesOnInstance(@PathParam("clusterId") String clusterId, @PathParam("instanceName") String instanceName) throws IOException { HelixDataAccessor accessor = getDataAccssor(clusterId); ObjectNode root = JsonNodeFactory.instance.objectNode(); root.put(Properties.id.name(), instanceName); - ArrayNode resourcesNode = root.putArray(InstanceProperties.resources.name()); + ArrayNode resourcesNode = root.putArray(PerInstanceProperties.resources.name()); List<String> sessionIds = accessor.getChildNames(accessor.keyBuilder().sessions(instanceName)); if (sessionIds == null || sessionIds.size() == 0) { @@ -502,8 +322,7 @@ public class InstanceAccessor extends AbstractHelixResource { return JSONRepresentation(root); } - @GET - @Path("{instanceName}/resources/{resourceName}") + @GET @Path("resources/{resourceName}") public Response getResourceOnInstance(@PathParam("clusterId") String clusterId, @PathParam("instanceName") String instanceName, @PathParam("resourceName") String resourceName) throws IOException { @@ -515,8 +334,8 @@ public class InstanceAccessor extends AbstractHelixResource { // Only get resource list from current session id String currentSessionId = sessionIds.get(0); - CurrentState resourceCurrentState = accessor - .getProperty(accessor.keyBuilder().currentState(instanceName, currentSessionId, resourceName)); + CurrentState resourceCurrentState = accessor.getProperty( + accessor.keyBuilder().currentState(instanceName, currentSessionId, resourceName)); if (resourceCurrentState != null) { return JSONRepresentation(resourceCurrentState.getRecord()); } @@ -524,8 +343,7 @@ public class InstanceAccessor extends AbstractHelixResource { return notFound(); } - @GET - @Path("{instanceName}/errors") + @GET @Path("errors") public Response getErrorsOnInstance(@PathParam("clusterId") String clusterId, @PathParam("instanceName") String instanceName) throws IOException { HelixDataAccessor accessor = getDataAccssor(clusterId); @@ -534,8 +352,7 @@ public class InstanceAccessor extends AbstractHelixResource { root.put(Properties.id.name(), instanceName); ObjectNode errorsNode = JsonNodeFactory.instance.objectNode(); - List<String> sessionIds = - accessor.getChildNames(accessor.keyBuilder().errors(instanceName)); + List<String> sessionIds = accessor.getChildNames(accessor.keyBuilder().errors(instanceName)); if (sessionIds == null || sessionIds.size() == 0) { return notFound(); @@ -557,13 +374,13 @@ public class InstanceAccessor extends AbstractHelixResource { errorsNode.put(sessionId, resourcesNode); } } - root.put(InstanceProperties.errors.name(), errorsNode); + root.put(PerInstanceProperties.errors.name(), errorsNode); return JSONRepresentation(root); } @GET - @Path("{instanceName}/errors/{sessionId}/{resourceName}/{partitionName}") + @Path("errors/{sessionId}/{resourceName}/{partitionName}") public Response getErrorsOnInstance(@PathParam("clusterId") String clusterId, @PathParam("instanceName") String instanceName, @PathParam("sessionId") String sessionId, @PathParam("resourceName") String resourceName, @@ -579,7 +396,7 @@ public class InstanceAccessor extends AbstractHelixResource { } @GET - @Path("{instanceName}/history") + @Path("history") public Response getHistoryOnInstance(@PathParam("clusterId") String clusterId, @PathParam("instanceName") String instanceName) throws IOException { HelixDataAccessor accessor = getDataAccssor(clusterId); @@ -592,7 +409,7 @@ public class InstanceAccessor extends AbstractHelixResource { } @GET - @Path("{instanceName}/messages") + @Path("messages") public Response getMessagesOnInstance(@PathParam("clusterId") String clusterId, @PathParam("instanceName") String instanceName, @QueryParam("stateModelDef") String stateModelDef) { @@ -600,9 +417,8 @@ public class InstanceAccessor extends AbstractHelixResource { ObjectNode root = JsonNodeFactory.instance.objectNode(); root.put(Properties.id.name(), instanceName); - ArrayNode newMessages = root.putArray(InstanceProperties.new_messages.name()); - ArrayNode readMessages = root.putArray(InstanceProperties.read_messages.name()); - + ArrayNode newMessages = root.putArray(PerInstanceProperties.new_messages.name()); + ArrayNode readMessages = root.putArray(PerInstanceProperties.read_messages.name()); List<String> messageNames = accessor.getChildNames(accessor.keyBuilder().messages(instanceName)); @@ -629,15 +445,15 @@ public class InstanceAccessor extends AbstractHelixResource { } } - root.put(InstanceProperties.total_message_count.name(), + root.put(PerInstanceProperties.total_message_count.name(), newMessages.size() + readMessages.size()); - root.put(InstanceProperties.read_message_count.name(), readMessages.size()); + root.put(PerInstanceProperties.read_message_count.name(), readMessages.size()); return JSONRepresentation(root); } @GET - @Path("{instanceName}/messages/{messageId}") + @Path("messages/{messageId}") public Response getMessageOnInstance(@PathParam("clusterId") String clusterId, @PathParam("instanceName") String instanceName, @PathParam("messageId") String messageId) throws IOException { @@ -651,14 +467,14 @@ public class InstanceAccessor extends AbstractHelixResource { } @GET - @Path("{instanceName}/healthreports") + @Path("healthreports") public Response getHealthReportsOnInstance(@PathParam("clusterId") String clusterId, @PathParam("instanceName") String instanceName) throws IOException { HelixDataAccessor accessor = getDataAccssor(clusterId); ObjectNode root = JsonNodeFactory.instance.objectNode(); root.put(Properties.id.name(), instanceName); - ArrayNode healthReportsNode = root.putArray(InstanceProperties.healthreports.name()); + ArrayNode healthReportsNode = root.putArray(PerInstanceProperties.healthreports.name()); List<String> healthReports = accessor.getChildNames(accessor.keyBuilder().healthReports(instanceName)); @@ -671,9 +487,9 @@ public class InstanceAccessor extends AbstractHelixResource { } @GET - @Path("{instanceName}/healthreports/{reportName}") - public Response getHealthReportsOnInstance(@PathParam("clusterId") String clusterId, - @PathParam("instanceName") String instanceName, + @Path("healthreports/{reportName}") + public Response getHealthReportsOnInstance( + @PathParam("clusterId") String clusterId, @PathParam("instanceName") String instanceName, @PathParam("reportName") String reportName) throws IOException { HelixDataAccessor accessor = getDataAccssor(clusterId); HealthStat healthStat = @@ -688,47 +504,4 @@ public class InstanceAccessor extends AbstractHelixResource { private boolean validInstance(JsonNode node, String instanceName) { return instanceName.equals(node.get(Properties.id.name()).getValueAsText()); } - - private List<String> getZoneBasedInstance(String clusterId, List<String> instances, List<String> orderOfZone) { - ClusterService - clusterService = new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor()); - Map<String, Set<String>> zoneMapping = clusterService.getClusterTopology(clusterId).toZoneMapping(); - if (orderOfZone == null) { - orderOfZone = new ArrayList<>(zoneMapping.keySet()); - } - Collections.sort(orderOfZone); - if (orderOfZone.isEmpty()) { - return orderOfZone; - } - - Set<String> instanceSet = null; - for (String zone : orderOfZone) { - instanceSet = new TreeSet<>(instances); - Set<String> currentZoneInstanceSet = new HashSet<>(zoneMapping.get(zone)); - instanceSet.retainAll(currentZoneInstanceSet); - if (instanceSet.size() > 0) { - return new ArrayList<>(instanceSet); - } - } - - return Collections.EMPTY_LIST; - } - - private StoppableCheck checkSingleInstanceStoppable(String clusterId, String instanceName, - String jsonContent) { - HelixDataAccessor dataAccessor = getDataAccssor(clusterId); - // TODO reduce GC by dependency injection - InstanceService instanceService = new InstanceServiceImpl( - new HelixDataAccessorWrapper((ZKHelixDataAccessor) dataAccessor), getConfigAccessor()); - - Map<String, Boolean> helixStoppableCheck = instanceService.getInstanceHealthStatus(clusterId, - instanceName, InstanceService.HealthCheck.STOPPABLE_CHECK_LIST); - CustomRestClient customClient = CustomRestClientFactory.get(jsonContent); - // TODO add the json content parse logic - Map<String, Boolean> customStoppableCheck = - customClient.getInstanceStoppableCheck(Collections.<String, String> emptyMap()); - StoppableCheck stoppableCheck = - StoppableCheck.mergeStoppableChecks(helixStoppableCheck, customStoppableCheck); - return stoppableCheck; - } } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java index 42bfe67..f32551b 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java @@ -26,6 +26,7 @@ import java.util.Map; import org.apache.helix.rest.server.json.instance.InstanceInfo; import com.google.common.collect.ImmutableList; +import org.apache.helix.rest.server.json.instance.StoppableCheck; public interface InstanceService { enum HealthCheck { @@ -89,4 +90,7 @@ public interface InstanceService { */ InstanceInfo getInstanceInfo(String clusterId, String instanceName, List<HealthCheck> healthChecks); + + StoppableCheck checkSingleInstanceStoppable(String clusterId, String instanceName, + String jsonContent); } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java index c7ef015..22ac30b 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java @@ -20,16 +20,19 @@ package org.apache.helix.rest.server.service; */ import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.TreeMap; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixDataAccessor; -import org.apache.helix.HelixException; import org.apache.helix.model.CurrentState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; +import org.apache.helix.rest.client.CustomRestClient; +import org.apache.helix.rest.client.CustomRestClientFactory; import org.apache.helix.rest.server.json.instance.InstanceInfo; +import org.apache.helix.rest.server.json.instance.StoppableCheck; import org.apache.helix.util.InstanceValidationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,4 +128,20 @@ public class InstanceServiceImpl implements InstanceService { return instanceInfoBuilder.build(); } + + + @Override + public StoppableCheck checkSingleInstanceStoppable(String clusterId, String instanceName, + String jsonContent) { + // TODO reduce GC by dependency injection + Map<String, Boolean> helixStoppableCheck = getInstanceHealthStatus(clusterId, + instanceName, InstanceService.HealthCheck.STOPPABLE_CHECK_LIST); + CustomRestClient customClient = CustomRestClientFactory.get(jsonContent); + // TODO add the json content parse logic + Map<String, Boolean> customStoppableCheck = + customClient.getInstanceStoppableCheck(Collections.<String, String> emptyMap()); + StoppableCheck stoppableCheck = + StoppableCheck.mergeStoppableChecks(helixStoppableCheck, customStoppableCheck); + return stoppableCheck; + } } diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java index 9caed00..51a64e8 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java @@ -29,6 +29,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeSet; import java.util.logging.Level; import javax.ws.rs.client.Entity; @@ -52,6 +53,7 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.manager.zk.client.DedicatedZkClientFactory; import org.apache.helix.manager.zk.client.HelixZkClient; import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.InstanceConfig; import org.apache.helix.rest.common.ContextPropertyKeys; import org.apache.helix.rest.common.HelixRestNamespace; import org.apache.helix.rest.server.auditlog.AuditLog; @@ -105,6 +107,9 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest { protected static final String TEST_NAMESPACE = "test-namespace"; protected static HelixZkClient _gZkClientTestNS; protected static BaseDataAccessor<ZNRecord> _baseAccessorTestNS; + protected static final String STOPPABLE_CLUSTER = "StoppableTestCluster"; + protected static final List<String> STOPPABLE_INSTANCES = + Arrays.asList("instance0", "instance1", "instance2", "instance3", "instance4", "instance5"); protected static Set<String> _clusters; protected static String _superCluster = "superCluster"; @@ -276,6 +281,8 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest { _resourcesMap.put(cluster, resources); startController(cluster); } + preSetupForParallelInstancesStoppableTest(STOPPABLE_CLUSTER, STOPPABLE_INSTANCES); + _clusters.add(STOPPABLE_CLUSTER); } protected Set<String> createInstances(String cluster, int numInstances) throws Exception { @@ -468,4 +475,33 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest { protected TaskDriver getTaskDriver(String clusterName) { return new TaskDriver(_gZkClient, clusterName); } + + private void preSetupForParallelInstancesStoppableTest(String clusterName, List<String> instances) { + _gSetupTool.addCluster(clusterName, true); + ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName); + clusterConfig.setFaultZoneType("helixZoneId"); + clusterConfig.setPersistIntermediateAssignment(true); + _configAccessor.setClusterConfig(clusterName, clusterConfig); + // Create instance configs + List<InstanceConfig> instanceConfigs = new ArrayList<>(); + for (int i = 0; i < instances.size() - 1; i++) { + InstanceConfig instanceConfig = new InstanceConfig(instances.get(i)); + instanceConfig.setDomain("helixZoneId=zone1,host=instance" + i); + instanceConfigs.add(instanceConfig); + } + instanceConfigs.add(new InstanceConfig(instances.get(instances.size() - 1))); + instanceConfigs.get(instanceConfigs.size() - 1).setDomain("helixZoneId=zone2,host=instance5"); + + instanceConfigs.get(1).setInstanceEnabled(false); + instanceConfigs.get(3).setInstanceEnabledForPartition("FakeResource", "FakePartition", false); + + for (InstanceConfig instanceConfig : instanceConfigs) { + _gSetupTool.getClusterManagementTool().addInstance(clusterName, instanceConfig); + } + + // Start participant + startInstances(clusterName, new TreeSet<>(instances), 3); + createResources(clusterName, 1); + startController(clusterName); + } } diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java new file mode 100644 index 0000000..2d9b420 --- /dev/null +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java @@ -0,0 +1,124 @@ +package org.apache.helix.rest.server; + +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.helix.TestHelper; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.rest.server.resources.helix.InstancesAccessor; +import org.apache.helix.rest.server.util.JerseyUriRequestBuilder; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; +import org.codehaus.jackson.JsonNode; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TestInstancesAccessor extends AbstractTestClass { + private final static String CLUSTER_NAME = "TestCluster_0"; + + @Test + public void testEndToEndChecks() { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + // Select instances with zone based + String content = String + .format("{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\"]}", + InstancesAccessor.InstancesProperties.selection_base.name(), + InstancesAccessor.InstanceHealthSelectionBase.zone_based.name(), + InstancesAccessor.InstancesProperties.instances.name(), "instance0", "instance1", + "instance2", "instance3", "instance4", "instance5"); + Response response = + new JerseyUriRequestBuilder("clusters/{}/instances?command=stoppable").format(STOPPABLE_CLUSTER) + .post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE)); + String checkResult = response.readEntity(String.class); + Assert.assertEquals(checkResult, + "{\n \"instance_stoppable_parallel\" : [ \"instance0\", \"instance2\" ],\n" + + " \"instance_not_stoppable_with_reasons\" : {\n \"instance1\" : [ \"Helix:EMPTY_RESOURCE_ASSIGNMENT\", \"Helix:INSTANCE_NOT_ENABLED\", \"Helix:INSTANCE_NOT_STABLE\" ],\n" + + " \"instance3\" : [ \"Helix:HAS_DISABLED_PARTITION\" ],\n" + + " \"instance4\" : [ \"Helix:EMPTY_RESOURCE_ASSIGNMENT\", \"Helix:INSTANCE_NOT_STABLE\", \"Helix:INSTANCE_NOT_ALIVE\" ]\n }\n}\n"); + + // Disable one selected instance0, it should failed to check + String instance = "instance0"; + InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(STOPPABLE_CLUSTER, instance); + instanceConfig.setInstanceEnabled(false); + instanceConfig.setInstanceEnabledForPartition("FakeResource", "FakePartition", false); + _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER, instance, instanceConfig); + + Entity entity = + Entity.entity("", MediaType.APPLICATION_JSON_TYPE); + response = new JerseyUriRequestBuilder("clusters/{}/instances/{}/stoppable") + .format(STOPPABLE_CLUSTER, instance).post(this, entity); + checkResult = response.readEntity(String.class); + Assert.assertEquals(checkResult, + "{\"stoppable\":false,\"failedChecks\":[\"Helix:HAS_DISABLED_PARTITION\",\"Helix:INSTANCE_NOT_ENABLED\",\"Helix:INSTANCE_NOT_STABLE\"]}"); + + // Reenable instance0, it should passed the check + instanceConfig.setInstanceEnabled(true); + instanceConfig.setInstanceEnabledForPartition("FakeResource", "FakePartition", true); + _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER, instance, instanceConfig); + HelixClusterVerifier + verifier = new BestPossibleExternalViewVerifier.Builder(STOPPABLE_CLUSTER).setZkAddr(ZK_ADDR).build(); + Assert.assertTrue(((BestPossibleExternalViewVerifier) verifier).verifyByPolling()); + + entity = + Entity.entity("", MediaType.APPLICATION_JSON_TYPE); + response = new JerseyUriRequestBuilder("clusters/{}/instances/{}/stoppable") + .format(STOPPABLE_CLUSTER, instance).post(this, entity); + checkResult = response.readEntity(String.class); + Assert.assertEquals(checkResult, + "{\"stoppable\":true,\"failedChecks\":[]}"); + } + + @Test(dependsOnMethods = "testEndToEndChecks") + public void testGetAllInstances() throws IOException { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + String body = new JerseyUriRequestBuilder("clusters/{}/instances").isBodyReturnExpected(true) + .format(CLUSTER_NAME).get(this); + + JsonNode node = OBJECT_MAPPER.readTree(body); + String instancesStr = node.get(InstancesAccessor.InstancesProperties.instances.name()).toString(); + Assert.assertNotNull(instancesStr); + + Set<String> instances = OBJECT_MAPPER.readValue(instancesStr, + OBJECT_MAPPER.getTypeFactory().constructCollectionType(Set.class, String.class)); + Assert.assertEquals(instances, _instancesMap.get(CLUSTER_NAME), "Instances from response: " + + instances + " vs instances actually: " + _instancesMap.get(CLUSTER_NAME)); + } + + @Test(enabled = false) + public void testUpdateInstances() throws IOException { + // TODO: Reenable the test after storage node fix the problem + // Batch disable instances + + List<String> instancesToDisable = Arrays.asList( + new String[] { CLUSTER_NAME + "localhost_12918", CLUSTER_NAME + "localhost_12919", + CLUSTER_NAME + "localhost_12920" + }); + Entity entity = Entity.entity(OBJECT_MAPPER.writeValueAsString( + ImmutableMap.of(InstancesAccessor.InstancesProperties.instances.name(), instancesToDisable)), + MediaType.APPLICATION_JSON_TYPE); + post("clusters/" + CLUSTER_NAME + "/instances", ImmutableMap.of("command", "disable"), entity, + Response.Status.OK.getStatusCode()); + ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME); + Assert.assertEquals(clusterConfig.getDisabledInstances().keySet(), + new HashSet<>(instancesToDisable)); + + instancesToDisable = Arrays + .asList(new String[] { CLUSTER_NAME + "localhost_12918", CLUSTER_NAME + "localhost_12920" + }); + entity = Entity.entity(OBJECT_MAPPER.writeValueAsString( + ImmutableMap.of(InstancesAccessor.InstancesProperties.instances.name(), instancesToDisable)), + MediaType.APPLICATION_JSON_TYPE); + post("clusters/" + CLUSTER_NAME + "/instances", ImmutableMap.of("command", "enable"), entity, + Response.Status.OK.getStatusCode()); + clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME); + Assert.assertEquals(clusterConfig.getDisabledInstances().keySet(), + new HashSet<>(Arrays.asList(CLUSTER_NAME + "localhost_12919"))); + } +} diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstanceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java similarity index 66% rename from helix-rest/src/test/java/org/apache/helix/rest/server/TestInstanceAccessor.java rename to helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java index a4d0156..1a9a6da 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstanceAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java @@ -26,9 +26,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.TreeSet; import javax.ws.rs.client.Entity; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @@ -38,14 +36,11 @@ import org.apache.helix.HelixException; import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; import org.apache.helix.manager.zk.ZKHelixDataAccessor; -import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.Message; import org.apache.helix.rest.server.resources.AbstractResource; -import org.apache.helix.rest.server.resources.helix.InstanceAccessor; +import org.apache.helix.rest.server.resources.helix.PerInstanceAccessor; import org.apache.helix.rest.server.util.JerseyUriRequestBuilder; -import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; -import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; import org.apache.helix.util.InstanceValidationUtil; import org.codehaus.jackson.JsonNode; import org.testng.Assert; @@ -54,79 +49,22 @@ import org.testng.annotations.Test; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -public class TestInstanceAccessor extends AbstractTestClass { +public class TestPerInstanceAccessor extends AbstractTestClass { private final static String CLUSTER_NAME = "TestCluster_0"; private final static String INSTANCE_NAME = CLUSTER_NAME + "localhost_12918"; - @Test - public void testEndToEndChecks() { - System.out.println("Start test :" + TestHelper.getTestMethodName()); - String clusterName = TestHelper.getTestMethodName(); - List<String> instances = - Arrays.asList("instance0", "instance1", "instance2", "instance3", "instance4", "instance5"); - preSetupForParallelInstancesStoppableTest(clusterName, instances); - - // Select instances with zone based - String content = String - .format("{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\"]}", - InstanceAccessor.InstanceProperties.selection_base.name(), - InstanceAccessor.InstanceHealthSelectionBase.zone_based.name(), - InstanceAccessor.InstanceProperties.instances.name(), "instance0", "instance1", - "instance2", "instance3", "instance4", "instance5"); - Response response = - new JerseyUriRequestBuilder("clusters/{}/instances/stoppable").format(clusterName) - .post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE)); - String checkResult = response.readEntity(String.class); - Assert.assertEquals(checkResult, - "{\n \"instance_stoppable_parallel\" : [ \"instance0\", \"instance2\" ],\n" - + " \"instance_not_stoppable_with_reasons\" : {\n \"instance1\" : [ \"Helix:INSTANCE_NOT_STABLE\", \"Helix:INSTANCE_NOT_ENABLED\", \"Helix:EMPTY_RESOURCE_ASSIGNMENT\" ],\n" - + " \"instance3\" : [ \"Helix:HAS_DISABLED_PARTITION\" ],\n" - + " \"instance4\" : [ \"Helix:INSTANCE_NOT_STABLE\", \"Helix:INSTANCE_NOT_ALIVE\", \"Helix:EMPTY_RESOURCE_ASSIGNMENT\" ]\n }\n}\n"); - - // Disable one selected instance0, it should failed to check - String instance = "instance0"; - InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(clusterName, instance); - instanceConfig.setInstanceEnabled(false); - instanceConfig.setInstanceEnabledForPartition("FakeResource", "FakePartition", false); - _configAccessor.setInstanceConfig(clusterName, instance, instanceConfig); - - Entity entity = - Entity.entity("", MediaType.APPLICATION_JSON_TYPE); - response = new JerseyUriRequestBuilder("clusters/{}/instances/{}/stoppable") - .format(clusterName, instance).post(this, entity); - checkResult = response.readEntity(String.class); - Assert.assertEquals(checkResult, - "{\"stoppable\":false,\"failedChecks\":[\"Helix:INSTANCE_NOT_STABLE\",\"Helix:HAS_DISABLED_PARTITION\",\"Helix:INSTANCE_NOT_ENABLED\"]}"); - - // Reenable instance0, it should passed the check - instanceConfig.setInstanceEnabled(true); - instanceConfig.setInstanceEnabledForPartition("FakeResource", "FakePartition", true); - _configAccessor.setInstanceConfig(clusterName, instance, instanceConfig); - HelixClusterVerifier verifier = new BestPossibleExternalViewVerifier.Builder(clusterName).setZkAddr(ZK_ADDR).build(); - Assert.assertTrue(((BestPossibleExternalViewVerifier) verifier).verifyByPolling()); - - entity = - Entity.entity("", MediaType.APPLICATION_JSON_TYPE); - response = new JerseyUriRequestBuilder("clusters/{}/instances/{}/stoppable") - .format(clusterName, instance).post(this, entity); - checkResult = response.readEntity(String.class); - Assert.assertEquals(checkResult, - "{\"stoppable\":true,\"failedChecks\":[]}"); - } - - @Test(dependsOnMethods = "testEndToEndChecks") public void testIsInstanceStoppable() throws IOException { System.out.println("Start test :" + TestHelper.getTestMethodName()); Map<String, String> params = ImmutableMap.of("client", "espresso"); Entity entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(params), MediaType.APPLICATION_JSON_TYPE); Response response = new JerseyUriRequestBuilder("clusters/{}/instances/{}/stoppable") - .format("testEndToEndChecks", "instance1").post(this, entity); + .format(STOPPABLE_CLUSTER, "instance1").post(this, entity); String stoppableCheckResult = response.readEntity(String.class); Assert.assertEquals(stoppableCheckResult, - "{\"stoppable\":false,\"failedChecks\":[\"Helix:INSTANCE_NOT_STABLE\"," - + "\"Helix:INSTANCE_NOT_ENABLED\",\"Helix:EMPTY_RESOURCE_ASSIGNMENT\"]}"); + "{\"stoppable\":false,\"failedChecks\":[\"Helix:EMPTY_RESOURCE_ASSIGNMENT\"," + + "\"Helix:INSTANCE_NOT_ENABLED\",\"Helix:INSTANCE_NOT_STABLE\"]}"); } @Test (dependsOnMethods = "testIsInstanceStoppable") @@ -150,7 +88,7 @@ public class TestInstanceAccessor extends AbstractTestClass { .isBodyReturnExpected(true).format(CLUSTER_NAME, INSTANCE_NAME).get(this); JsonNode node = OBJECT_MAPPER.readTree(body); int newMessageCount = - node.get(InstanceAccessor.InstanceProperties.total_message_count.name()).getIntValue(); + node.get(PerInstanceAccessor.PerInstanceProperties.total_message_count.name()).getIntValue(); Assert.assertEquals(newMessageCount, 1); } @@ -177,7 +115,7 @@ public class TestInstanceAccessor extends AbstractTestClass { .isBodyReturnExpected(true).format(CLUSTER_NAME, INSTANCE_NAME).get(this); JsonNode node = OBJECT_MAPPER.readTree(body); int newMessageCount = - node.get(InstanceAccessor.InstanceProperties.total_message_count.name()).getIntValue(); + node.get(PerInstanceAccessor.PerInstanceProperties.total_message_count.name()).getIntValue(); Assert.assertEquals(newMessageCount, 1); @@ -186,34 +124,18 @@ public class TestInstanceAccessor extends AbstractTestClass { .isBodyReturnExpected(true).format(CLUSTER_NAME, INSTANCE_NAME).get(this); node = OBJECT_MAPPER.readTree(body); newMessageCount = - node.get(InstanceAccessor.InstanceProperties.total_message_count.name()).getIntValue(); + node.get(PerInstanceAccessor.PerInstanceProperties.total_message_count.name()).getIntValue(); Assert.assertEquals(newMessageCount, 0); } @Test(dependsOnMethods = "testGetMessagesByStateModelDef") - public void testGetAllInstances() throws IOException { - System.out.println("Start test :" + TestHelper.getTestMethodName()); - String body = new JerseyUriRequestBuilder("clusters/{}/instances").isBodyReturnExpected(true) - .format(CLUSTER_NAME).get(this); - - JsonNode node = OBJECT_MAPPER.readTree(body); - String instancesStr = node.get(InstanceAccessor.InstanceProperties.instances.name()).toString(); - Assert.assertNotNull(instancesStr); - - Set<String> instances = OBJECT_MAPPER.readValue(instancesStr, - OBJECT_MAPPER.getTypeFactory().constructCollectionType(Set.class, String.class)); - Assert.assertEquals(instances, _instancesMap.get(CLUSTER_NAME), "Instances from response: " - + instances + " vs instances actually: " + _instancesMap.get(CLUSTER_NAME)); - } - - @Test public void testGetInstanceById() throws IOException { System.out.println("Start test :" + TestHelper.getTestMethodName()); String body = new JerseyUriRequestBuilder("clusters/{}/instances/{}").isBodyReturnExpected(true) .format(CLUSTER_NAME, INSTANCE_NAME).get(this); JsonNode node = OBJECT_MAPPER.readTree(body); - String instancesCfg = node.get(InstanceAccessor.InstanceProperties.config.name()).toString(); + String instancesCfg = node.get(PerInstanceAccessor.PerInstanceProperties.config.name()).toString(); Assert.assertNotNull(instancesCfg); boolean isHealth = node.get("health").getBooleanValue(); Assert.assertFalse(isHealth); @@ -268,7 +190,7 @@ public class TestInstanceAccessor extends AbstractTestClass { List<String> tagList = ImmutableList.of("tag3", "tag1", "tag2"); entity = Entity.entity( OBJECT_MAPPER.writeValueAsString(ImmutableMap.of(AbstractResource.Properties.id.name(), - INSTANCE_NAME, InstanceAccessor.InstanceProperties.instanceTags.name(), tagList)), + INSTANCE_NAME, PerInstanceAccessor.PerInstanceProperties.instanceTags.name(), tagList)), MediaType.APPLICATION_JSON_TYPE); new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=addInstanceTag") @@ -282,7 +204,7 @@ public class TestInstanceAccessor extends AbstractTestClass { removeList.remove("tag2"); entity = Entity.entity( OBJECT_MAPPER.writeValueAsString(ImmutableMap.of(AbstractResource.Properties.id.name(), - INSTANCE_NAME, InstanceAccessor.InstanceProperties.instanceTags.name(), removeList)), + INSTANCE_NAME, PerInstanceAccessor.PerInstanceProperties.instanceTags.name(), removeList)), MediaType.APPLICATION_JSON_TYPE); new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=removeInstanceTag") @@ -291,35 +213,6 @@ public class TestInstanceAccessor extends AbstractTestClass { Assert.assertEquals(_configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_NAME).getTags(), ImmutableList.of("tag2")); - // TODO: Reenable the test after storage node fix the problem - // Batch disable instances - /* - List<String> instancesToDisable = Arrays.asList( - new String[] { CLUSTER_NAME + "localhost_12918", CLUSTER_NAME + "localhost_12919", - CLUSTER_NAME + "localhost_12920" - }); - entity = Entity.entity(OBJECT_MAPPER.writeValueAsString( - ImmutableMap.of(InstanceAccessor.InstanceProperties.instances.name(), instancesToDisable)), - MediaType.APPLICATION_JSON_TYPE); - post("clusters/" + CLUSTER_NAME + "/instances", ImmutableMap.of("command", "disable"), entity, - Response.Status.OK.getStatusCode()); - ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME); - Assert.assertEquals(clusterConfig.getDisabledInstances().keySet(), - new HashSet<>(instancesToDisable)); - - instancesToDisable = Arrays - .asList(new String[] { CLUSTER_NAME + "localhost_12918", CLUSTER_NAME + "localhost_12920" - }); - entity = Entity.entity(OBJECT_MAPPER.writeValueAsString( - ImmutableMap.of(InstanceAccessor.InstanceProperties.instances.name(), instancesToDisable)), - MediaType.APPLICATION_JSON_TYPE); - post("clusters/" + CLUSTER_NAME + "/instances", ImmutableMap.of("command", "enable"), entity, - Response.Status.OK.getStatusCode()); - clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME); - Assert.assertEquals(clusterConfig.getDisabledInstances().keySet(), - new HashSet<>(Arrays.asList(CLUSTER_NAME + "localhost_12919"))); - */ - // Test enable disable partitions String dbName = "_db_0_"; List<String> partitionsToDisable = Arrays.asList(CLUSTER_NAME + dbName + "0", @@ -327,9 +220,9 @@ public class TestInstanceAccessor extends AbstractTestClass { entity = Entity.entity( OBJECT_MAPPER.writeValueAsString(ImmutableMap.of(AbstractResource.Properties.id.name(), - INSTANCE_NAME, InstanceAccessor.InstanceProperties.resource.name(), + INSTANCE_NAME, PerInstanceAccessor.PerInstanceProperties.resource.name(), CLUSTER_NAME + dbName.substring(0, dbName.length() - 1), - InstanceAccessor.InstanceProperties.partitions.name(), partitionsToDisable)), + PerInstanceAccessor.PerInstanceProperties.partitions.name(), partitionsToDisable)), MediaType.APPLICATION_JSON_TYPE); new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=disablePartitions") @@ -342,9 +235,9 @@ public class TestInstanceAccessor extends AbstractTestClass { new HashSet<>(partitionsToDisable)); entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(ImmutableMap .of(AbstractResource.Properties.id.name(), INSTANCE_NAME, - InstanceAccessor.InstanceProperties.resource.name(), + PerInstanceAccessor.PerInstanceProperties.resource.name(), CLUSTER_NAME + dbName.substring(0, dbName.length() - 1), - InstanceAccessor.InstanceProperties.partitions.name(), + PerInstanceAccessor.PerInstanceProperties.partitions.name(), ImmutableList.of(CLUSTER_NAME + dbName + "1"))), MediaType.APPLICATION_JSON_TYPE); new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=enablePartitions") @@ -494,32 +387,5 @@ public class TestInstanceAccessor extends AbstractTestClass { Collections.EMPTY_MAP, Collections.EMPTY_MAP, Collections.EMPTY_MAP)); } - private void preSetupForParallelInstancesStoppableTest(String clusterName, List<String> instances) { - _gSetupTool.addCluster(clusterName, true); - ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName); - clusterConfig.setFaultZoneType("helixZoneId"); - clusterConfig.setPersistIntermediateAssignment(true); - _configAccessor.setClusterConfig(clusterName, clusterConfig); - // Create instance configs - List<InstanceConfig> instanceConfigs = new ArrayList<>(); - for (int i = 0; i < instances.size() - 1; i++) { - InstanceConfig instanceConfig = new InstanceConfig(instances.get(i)); - instanceConfig.setDomain("helixZoneId=zone1,host=instance" + i); - instanceConfigs.add(instanceConfig); - } - instanceConfigs.add(new InstanceConfig(instances.get(instances.size() - 1))); - instanceConfigs.get(instanceConfigs.size() - 1).setDomain("helixZoneId=zone2,host=instance5"); - - instanceConfigs.get(1).setInstanceEnabled(false); - instanceConfigs.get(3).setInstanceEnabledForPartition("FakeResource", "FakePartition", false); - for (InstanceConfig instanceConfig : instanceConfigs) { - _gSetupTool.getClusterManagementTool().addInstance(clusterName, instanceConfig); - } - - // Start participant - startInstances(clusterName, new TreeSet<>(instances), 3); - createResources(clusterName, 1); - startController(clusterName); - } }
