http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java new file mode 100644 index 0000000..1eac9c2 --- /dev/null +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java @@ -0,0 +1,79 @@ +package org.apache.helix.rest.server.resources.helix; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/

import java.io.IOException;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.rest.common.ContextPropertyKeys;
import org.apache.helix.rest.server.ServerContext;
import org.apache.helix.rest.server.resources.AbstractResource;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.tools.ClusterSetup;


/**
 * Base class for REST resources that need Helix-specific handles (ZkClient,
 * HelixAdmin, ClusterSetup, TaskDriver, ConfigAccessor, HelixDataAccessor).
 * Every accessor simply delegates to the {@link ServerContext} registered in
 * the application's properties.
 */
public class AbstractHelixResource extends AbstractResource {

  /**
   * @return the ZkClient exposed by the server context
   */
  public ZkClient getZkClient() {
    return getServerContext().getZkClient();
  }

  /**
   * @return the HelixAdmin exposed by the server context
   */
  public HelixAdmin getHelixAdmin() {
    return getServerContext().getHelixAdmin();
  }

  /**
   * @return the ClusterSetup exposed by the server context
   */
  public ClusterSetup getClusterSetup() {
    return getServerContext().getClusterSetup();
  }

  /**
   * @param clusterName name of the cluster the driver is requested for
   * @return the TaskDriver the server context provides for that cluster
   */
  public TaskDriver getTaskDriver(String clusterName) {
    return getServerContext().getTaskDriver(clusterName);
  }

  /**
   * @return the ConfigAccessor exposed by the server context
   */
  public ConfigAccessor getConfigAccessor() {
    return getServerContext().getConfigAccessor();
  }

  /**
   * @param clusterName name of the cluster the accessor is requested for
   * @return the HelixDataAccessor the server context provides for that cluster
   */
  public HelixDataAccessor getDataAccssor(String clusterName) {
    return getServerContext().getDataAccssor(clusterName);
  }

  /**
   * Deserializes a string into a ZNRecord using the shared OBJECT_MAPPER.
   *
   * @param data serialized record to parse
   * @return the record read from {@code data}
   * @throws IOException if the mapper fails to read the input
   */
  protected static ZNRecord toZNRecord(String data) throws IOException {
    return OBJECT_MAPPER.reader(ZNRecord.class).readValue(data);
  }

  /**
   * Looks up the ServerContext stored in the application's properties under
   * the key {@code ContextPropertyKeys.SERVER_CONTEXT.name()}.
   */
  private ServerContext getServerContext() {
    Object context =
        _application.getProperties().get(ContextPropertyKeys.SERVER_CONTEXT.name());
    return (ServerContext) context;
  }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java new file mode 100644 index 0000000..f6f95b0 --- /dev/null +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java @@ -0,0 +1,400 @@ +package org.apache.helix.rest.server.resources.helix; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZKUtil;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.LeaderHistory;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.tools.ClusterSetup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * REST resource rooted at {@code /clusters}. It exposes cluster listing,
 * creation, deletion and update commands, plus read access to the cluster
 * config, the controller leader, its history and messages, and the state
 * model definitions of a cluster.
 */
@Path("/clusters")
public class ClusterAccessor extends AbstractHelixResource {
  private static final Logger _logger = LoggerFactory.getLogger(ClusterAccessor.class);

  /** Keys used in the JSON payloads produced by this resource. */
  public enum ClusterProperties {
    controller,
    instances,
    liveInstances,
    resources,
    paused,
    maintenance,
    messages,
    stateModelDefinitions,
    clusters
  }

  /**
   * Lists all clusters reported by HelixAdmin.
   *
   * @return a JSON object mapping {@code clusters} to the list of cluster names
   */
  @GET
  public Response getClusters() {
    HelixAdmin helixAdmin = getHelixAdmin();
    List<String> clusters = helixAdmin.getClusters();

    Map<String, List<String>> dataMap = new HashMap<>();
    dataMap.put(ClusterProperties.clusters.name(), clusters);

    return JSONRepresentation(dataMap);
  }

  /**
   * Returns a summary of one cluster: controller leader, paused/maintenance
   * flags, and the names of its resources, instances and live instances.
   *
   * @param clusterId name of the cluster
   * @return the summary as JSON, or {@code notFound()} when the cluster is not set up
   */
  @GET
  @Path("{clusterId}")
  public Response getClusterInfo(@PathParam("clusterId") String clusterId) {
    if (!isClusterExist(clusterId)) {
      return notFound();
    }

    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
    PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();

    Map<String, Object> clusterInfo = new HashMap<>();
    clusterInfo.put(Properties.id.name(), clusterId);

    LiveInstance controller = dataAccessor.getProperty(keyBuilder.controllerLeader());
    if (controller != null) {
      clusterInfo.put(ClusterProperties.controller.name(), controller.getInstanceName());
    } else {
      clusterInfo.put(ClusterProperties.controller.name(), "No Lead Controller!");
    }

    // The mere presence of the pause / maintenance property marks the flag as set.
    boolean paused = dataAccessor.getProperty(keyBuilder.pause()) != null;
    clusterInfo.put(ClusterProperties.paused.name(), paused);
    boolean maintenance = dataAccessor.getProperty(keyBuilder.maintenance()) != null;
    clusterInfo.put(ClusterProperties.maintenance.name(), maintenance);

    List<String> idealStates = dataAccessor.getChildNames(keyBuilder.idealStates());
    clusterInfo.put(ClusterProperties.resources.name(), idealStates);
    List<String> instances = dataAccessor.getChildNames(keyBuilder.instanceConfigs());
    clusterInfo.put(ClusterProperties.instances.name(), instances);
    List<String> liveInstances = dataAccessor.getChildNames(keyBuilder.liveInstances());
    clusterInfo.put(ClusterProperties.liveInstances.name(), liveInstances);

    return JSONRepresentation(clusterInfo);
  }

  /**
   * Creates a cluster through ClusterSetup.
   *
   * @param clusterId name of the cluster to create
   * @param recreate  query parameter (default "false") parsed as a boolean and
   *                  forwarded to {@code ClusterSetup.addCluster}
   * @return {@code created()} on success, {@code serverError(ex)} on any exception
   */
  @PUT
  @Path("{clusterId}")
  public Response createCluster(@PathParam("clusterId") String clusterId,
      @DefaultValue("false") @QueryParam("recreate") String recreate) {
    boolean recreateIfExists = Boolean.parseBoolean(recreate);
    ClusterSetup clusterSetup = getClusterSetup();

    try {
      clusterSetup.addCluster(clusterId, recreateIfExists);
    } catch (Exception ex) {
      // Pass the throwable itself so the stack trace is preserved in the log.
      _logger.error("Failed to create cluster " + clusterId, ex);
      return serverError(ex);
    }

    return created();
  }

  /**
   * Deletes a cluster through ClusterSetup.
   *
   * @param clusterId name of the cluster to delete
   * @return {@code OK()} on success, {@code badRequest} when ClusterSetup
   *         throws a HelixException, {@code serverError(ex)} on any other exception
   */
  @DELETE
  @Path("{clusterId}")
  public Response deleteCluster(@PathParam("clusterId") String clusterId) {
    ClusterSetup clusterSetup = getClusterSetup();

    try {
      clusterSetup.deleteCluster(clusterId);
    } catch (HelixException ex) {
      _logger.info("Failed to delete cluster " + clusterId + ", cluster is still in use.", ex);
      return badRequest(ex.getMessage());
    } catch (Exception ex) {
      _logger.error("Failed to delete cluster " + clusterId, ex);
      return serverError(ex);
    }

    return OK();
  }

  /**
   * Executes a command against a cluster. Handled commands: activate, expand,
   * enable, disable, enableMaintenanceMode and disableMaintenanceMode.
   *
   * @param clusterId    name of the cluster
   * @param commandStr   command name, resolved via {@code getCommand}
   * @param superCluster super cluster name; required by the activate command
   * @param content      request body; forwarded as the third argument of
   *                     {@code enableMaintenanceMode} when enabling maintenance mode
   * @return {@code OK()} on success, {@code badRequest} for an unknown or
   *         unsupported command or a missing super cluster,
   *         {@code serverError(ex)} when the underlying call throws
   */
  @POST
  @Path("{clusterId}")
  public Response updateCluster(@PathParam("clusterId") String clusterId,
      @QueryParam("command") String commandStr, @QueryParam("superCluster") String superCluster,
      String content) {
    Command command;
    try {
      command = getCommand(commandStr);
    } catch (HelixException ex) {
      return badRequest(ex.getMessage());
    }

    ClusterSetup clusterSetup = getClusterSetup();
    HelixAdmin helixAdmin = getHelixAdmin();

    switch (command) {
    case activate:
      if (superCluster == null) {
        return badRequest("Super Cluster name is missing!");
      }
      try {
        clusterSetup.activateCluster(clusterId, superCluster, true);
      } catch (Exception ex) {
        _logger.error(
            "Failed to add cluster " + clusterId + " to super cluster " + superCluster, ex);
        return serverError(ex);
      }
      break;

    case expand:
      try {
        clusterSetup.expandCluster(clusterId);
      } catch (Exception ex) {
        _logger.error("Failed to expand cluster " + clusterId, ex);
        return serverError(ex);
      }
      break;

    case enable:
      try {
        helixAdmin.enableCluster(clusterId, true);
      } catch (Exception ex) {
        _logger.error("Failed to enable cluster " + clusterId, ex);
        return serverError(ex);
      }
      break;

    case disable:
      try {
        helixAdmin.enableCluster(clusterId, false);
      } catch (Exception ex) {
        _logger.error("Failed to disable cluster " + clusterId, ex);
        return serverError(ex);
      }
      break;

    case enableMaintenanceMode:
      try {
        helixAdmin.enableMaintenanceMode(clusterId, true, content);
      } catch (Exception ex) {
        _logger.error("Failed to enable maintenance mode " + clusterId, ex);
        return serverError(ex);
      }
      break;

    case disableMaintenanceMode:
      try {
        helixAdmin.enableMaintenanceMode(clusterId, false);
      } catch (Exception ex) {
        _logger.error("Failed to disable maintenance mode " + clusterId, ex);
        return serverError(ex);
      }
      break;

    default:
      return badRequest("Unsupported command " + command);
    }

    return OK();
  }

  /**
   * Reads the cluster config.
   *
   * @param clusterId name of the cluster
   * @return the config's ZNRecord as JSON; {@code notFound()} when the accessor
   *         throws a HelixException or yields no config; {@code serverError(ex)}
   *         on any other exception
   */
  @GET
  @Path("{clusterId}/configs")
  public Response getClusterConfig(@PathParam("clusterId") String clusterId) {
    ConfigAccessor accessor = getConfigAccessor();
    ClusterConfig config = null;
    try {
      config = accessor.getClusterConfig(clusterId);
    } catch (HelixException ex) {
      // Treated as "cluster not found": config stays null and is reported below.
      _logger.info(
          "Failed to get cluster config for cluster " + clusterId + ", cluster not found", ex);
    } catch (Exception ex) {
      _logger.error("Failed to get cluster config for cluster " + clusterId, ex);
      return serverError(ex);
    }
    if (config == null) {
      return notFound();
    }
    return JSONRepresentation(config.getRecord());
  }

  /**
   * Updates or deletes entries of the cluster config.
   *
   * @param clusterId  name of the cluster
   * @param commandStr command name, resolved via {@code getCommand}; update and
   *                   delete are handled
   * @param content    request body, a serialized ZNRecord whose id must equal
   *                   {@code clusterId}
   * @return {@code OK()} on success; {@code badRequest} for an unknown command,
   *         undeserializable input, a mismatching id or an unsupported command;
   *         {@code notFound} when the accessor throws a HelixException;
   *         {@code serverError(ex)} on any other exception
   */
  @POST
  @Path("{clusterId}/configs")
  public Response updateClusterConfig(
      @PathParam("clusterId") String clusterId, @QueryParam("command") String commandStr,
      String content) {
    Command command;
    try {
      command = getCommand(commandStr);
    } catch (HelixException ex) {
      return badRequest(ex.getMessage());
    }

    ZNRecord record;
    try {
      record = toZNRecord(content);
    } catch (IOException e) {
      _logger.error("Failed to deserialize user's input " + content, e);
      return badRequest("Input is not a valid ZNRecord!");
    }

    // NOTE(review): the mapper presumably yields null for a JSON "null" body;
    // reject it here instead of failing with a NullPointerException below.
    if (record == null) {
      return badRequest("Input is not a valid ZNRecord!");
    }

    // Compare from the path parameter's side so a record without an id is
    // rejected rather than dereferenced.
    if (!clusterId.equals(record.getId())) {
      return badRequest("ID does not match the cluster name in input!");
    }

    ClusterConfig config = new ClusterConfig(record);
    ConfigAccessor configAccessor = getConfigAccessor();
    try {
      switch (command) {
      case update:
        configAccessor.updateClusterConfig(clusterId, config);
        break;
      case delete: {
        HelixConfigScope clusterScope =
            new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
                .forCluster(clusterId).build();
        configAccessor.remove(clusterScope, config.getRecord());
      }
      break;

      default:
        return badRequest("Unsupported command " + commandStr);
      }
    } catch (HelixException ex) {
      return notFound(ex.getMessage());
    } catch (Exception ex) {
      _logger.error(
          "Failed to " + command + " cluster config, cluster " + clusterId + " new config: "
              + content, ex);
      return serverError(ex);
    }
    return OK();
  }

  /**
   * Returns the controller leader of a cluster. When a leader exists, its
   * instance name and all simple fields of its record are included.
   *
   * @param clusterId name of the cluster
   * @return the controller information as JSON
   */
  @GET
  @Path("{clusterId}/controller")
  public Response getClusterController(@PathParam("clusterId") String clusterId) {
    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
    Map<String, Object> controllerInfo = new HashMap<>();
    controllerInfo.put(Properties.id.name(), clusterId);

    LiveInstance leader = dataAccessor.getProperty(dataAccessor.keyBuilder().controllerLeader());
    if (leader != null) {
      controllerInfo.put(ClusterProperties.controller.name(), leader.getInstanceName());
      controllerInfo.putAll(leader.getRecord().getSimpleFields());
    } else {
      controllerInfo.put(ClusterProperties.controller.name(), "No Lead Controller!");
    }

    return JSONRepresentation(controllerInfo);
  }

  /**
   * Returns the controller leader history of a cluster; an empty list is
   * reported when no history property exists.
   *
   * @param clusterId name of the cluster
   * @return the history as JSON
   */
  @GET
  @Path("{clusterId}/controller/history")
  public Response getClusterControllerHistory(@PathParam("clusterId") String clusterId) {
    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
    Map<String, Object> controllerHistory = new HashMap<>();
    controllerHistory.put(Properties.id.name(), clusterId);

    LeaderHistory history =
        dataAccessor.getProperty(dataAccessor.keyBuilder().controllerLeaderHistory());
    if (history != null) {
      controllerHistory.put(Properties.history.name(), history.getHistoryList());
    } else {
      controllerHistory.put(Properties.history.name(), Collections.emptyList());
    }

    return JSONRepresentation(controllerHistory);
  }

  /**
   * Lists the names of the controller messages of a cluster together with
   * their count.
   *
   * @param clusterId name of the cluster
   * @return the message names and count as JSON
   */
  @GET
  @Path("{clusterId}/controller/messages")
  public Response getClusterControllerMessages(@PathParam("clusterId") String clusterId) {
    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);

    Map<String, Object> controllerMessages = new HashMap<>();
    controllerMessages.put(Properties.id.name(), clusterId);

    List<String> messages =
        dataAccessor.getChildNames(dataAccessor.keyBuilder().controllerMessages());
    // NOTE(review): other callers of getChildNames guard against a null result,
    // so a null list is reported as empty here instead of failing on size().
    if (messages == null) {
      messages = Collections.emptyList();
    }
    controllerMessages.put(ClusterProperties.messages.name(), messages);
    controllerMessages.put(Properties.count.name(), messages.size());

    return JSONRepresentation(controllerMessages);
  }

  /**
   * Returns a single controller message.
   *
   * @param clusterId name of the cluster
   * @param messageId id of the controller message
   * @return the message's ZNRecord as JSON, or {@code notFound()} when no such
   *         message exists
   */
  @GET
  @Path("{clusterId}/controller/messages/{messageId}")
  public Response getClusterControllerMessages(@PathParam("clusterId") String clusterId,
      @PathParam("messageId") String messageId) {
    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
    Message message =
        dataAccessor.getProperty(dataAccessor.keyBuilder().controllerMessage(messageId));
    if (message == null) {
      return notFound();
    }
    return JSONRepresentation(message.getRecord());
  }

  /**
   * Lists the names of the state model definitions of a cluster.
   *
   * @param clusterId name of the cluster
   * @return the definition names as JSON
   */
  @GET
  @Path("{clusterId}/statemodeldefs")
  public Response getClusterStateModelDefinitions(@PathParam("clusterId") String clusterId) {
    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
    List<String> stateModelDefs =
        dataAccessor.getChildNames(dataAccessor.keyBuilder().stateModelDefs());

    Map<String, Object> clusterStateModelDefs = new HashMap<>();
    clusterStateModelDefs.put(Properties.id.name(), clusterId);
    clusterStateModelDefs.put(ClusterProperties.stateModelDefinitions.name(), stateModelDefs);

    return JSONRepresentation(clusterStateModelDefs);
  }

  /**
   * Returns a single state model definition.
   *
   * @param clusterId  name of the cluster
   * @param statemodel name of the state model definition
   * @return the definition's ZNRecord as JSON, or {@code notFound()} when no
   *         such definition exists
   */
  @GET
  @Path("{clusterId}/statemodeldefs/{statemodel}")
  public Response getClusterStateModelDefinition(@PathParam("clusterId") String clusterId,
      @PathParam("statemodel") String statemodel) {
    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
    StateModelDefinition stateModelDef =
        dataAccessor.getProperty(dataAccessor.keyBuilder().stateModelDef(statemodel));
    if (stateModelDef == null) {
      return notFound();
    }

    return JSONRepresentation(stateModelDef.getRecord());
  }

  /** Checks via ZKUtil whether the given cluster is set up. */
  private boolean isClusterExist(String cluster) {
    return ZKUtil.isClusterSetup(cluster, getZkClient());
  }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstanceAccessor.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstanceAccessor.java new file mode 100644 index 0000000..2748dea --- /dev/null +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstanceAccessor.java @@ -0,0 +1,545 @@ +package org.apache.helix.rest.server.resources.helix; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Response; + +import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixException; +import org.apache.helix.ZNRecord; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.CurrentState; +import org.apache.helix.model.Error; +import org.apache.helix.model.HealthStat; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.Message; +import org.apache.helix.model.ParticipantHistory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.node.ArrayNode; +import org.codehaus.jackson.node.JsonNodeFactory; +import org.codehaus.jackson.node.ObjectNode; + +@Path("/clusters/{clusterId}/instances") +public class InstanceAccessor extends AbstractHelixResource { + private final static Logger _logger = LoggerFactory.getLogger(InstanceAccessor.class); + + public enum InstanceProperties { + instances, + online, + disabled, + config, + liveInstance, + resource, + resources, + partitions, + errors, + new_messages, + read_messages, + total_message_count, + read_message_count, + healthreports, + instanceTags + } + + @GET + public Response getInstances(@PathParam("clusterId") String clusterId) { + HelixDataAccessor accessor = getDataAccssor(clusterId); + ObjectNode root = JsonNodeFactory.instance.objectNode(); + root.put(Properties.id.name(), JsonNodeFactory.instance.textNode(clusterId)); + + ArrayNode instancesNode = root.putArray(InstanceProperties.instances.name()); + ArrayNode onlineNode = root.putArray(InstanceProperties.online.name()); + ArrayNode 
disabledNode = root.putArray(InstanceProperties.disabled.name()); + + List<String> instances = accessor.getChildNames(accessor.keyBuilder().instanceConfigs()); + + if (instances != null) { + instancesNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(instances)); + } else { + return notFound(); + } + + List<String> liveInstances = accessor.getChildNames(accessor.keyBuilder().liveInstances()); + ClusterConfig clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig()); + + for (String instanceName : instances) { + InstanceConfig instanceConfig = + accessor.getProperty(accessor.keyBuilder().instanceConfig(instanceName)); + if (instanceConfig != null) { + if (!instanceConfig.getInstanceEnabled() || (clusterConfig.getDisabledInstances() != null + && clusterConfig.getDisabledInstances().containsKey(instanceName))) { + disabledNode.add(JsonNodeFactory.instance.textNode(instanceName)); + } + + if (liveInstances.contains(instanceName)){ + onlineNode.add(JsonNodeFactory.instance.textNode(instanceName)); + } + } + } + + return JSONRepresentation(root); + } + + @POST + public Response updateInstances(@PathParam("clusterId") String clusterId, + @QueryParam("command") String command, String content) { + Command cmd; + try { + cmd = Command.valueOf(command); + } catch (Exception e) { + return badRequest("Invalid command : " + command); + } + + HelixAdmin admin = getHelixAdmin(); + try { + JsonNode node = null; + if (content.length() != 0) { + node = OBJECT_MAPPER.readTree(content); + } + if (node == null) { + return badRequest("Invalid input for content : " + content); + } + List<String> enableInstances = OBJECT_MAPPER + .readValue(node.get(InstanceProperties.instances.name()).toString(), + OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class)); + switch (cmd) { + case enable: + admin.enableInstance(clusterId, enableInstances, true); + + break; + case disable: + admin.enableInstance(clusterId, enableInstances, false); + break; + default: 
+ _logger.error("Unsupported command :" + command); + return badRequest("Unsupported command :" + command); + } + } catch (Exception e) { + _logger.error("Failed in updating instances : " + content, e); + return badRequest(e.getMessage()); + } + return OK(); + } + + @GET + @Path("{instanceName}") + public Response getInstance(@PathParam("clusterId") String clusterId, + @PathParam("instanceName") String instanceName) throws IOException { + HelixDataAccessor accessor = getDataAccssor(clusterId); + Map<String, Object> instanceMap = new HashMap<>(); + instanceMap.put(Properties.id.name(), JsonNodeFactory.instance.textNode(instanceName)); + instanceMap.put(InstanceProperties.liveInstance.name(), null); + + InstanceConfig instanceConfig = + accessor.getProperty(accessor.keyBuilder().instanceConfig(instanceName)); + LiveInstance liveInstance = + accessor.getProperty(accessor.keyBuilder().liveInstance(instanceName)); + + if (instanceConfig != null) { + instanceMap.put(InstanceProperties.config.name(), instanceConfig.getRecord()); + } else { + return notFound(); + } + + if (liveInstance != null) { + instanceMap.put(InstanceProperties.liveInstance.name(), liveInstance.getRecord()); + } + + return JSONRepresentation(instanceMap); + } + + @PUT + @Path("{instanceName}") + public Response addInstance(@PathParam("clusterId") String clusterId, + @PathParam("instanceName") String instanceName, String content) { + HelixAdmin admin = getHelixAdmin(); + ZNRecord record; + try { + record = toZNRecord(content); + } catch (IOException e) { + _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e); + return badRequest("Input is not a vaild ZNRecord!"); + } + + try { + admin.addInstance(clusterId, new InstanceConfig(record)); + } catch (Exception ex) { + _logger.error("Error in adding an instance: " + instanceName, ex); + return serverError(ex); + } + + return OK(); + } + + @POST + @Path("{instanceName}") + public Response updateInstance(@PathParam("clusterId") 
String clusterId, + @PathParam("instanceName") String instanceName, @QueryParam("command") String command, + String content) { + Command cmd; + try { + cmd = Command.valueOf(command); + } catch (Exception e) { + return badRequest("Invalid command : " + command); + } + + HelixAdmin admin = getHelixAdmin(); + try { + JsonNode node = null; + if (content.length() != 0) { + node = OBJECT_MAPPER.readTree(content); + } + + switch (cmd) { + case enable: + admin.enableInstance(clusterId, instanceName, true); + break; + case disable: + admin.enableInstance(clusterId, instanceName, false); + break; + case reset: + if (!validInstance(node, instanceName)) { + return badRequest("Instance names are not match!"); + } + admin.resetPartition(clusterId, instanceName, + node.get(InstanceProperties.resource.name()).toString(), (List<String>) OBJECT_MAPPER + .readValue(node.get(InstanceProperties.partitions.name()).toString(), + OBJECT_MAPPER.getTypeFactory() + .constructCollectionType(List.class, String.class))); + break; + case addInstanceTag: + if (!validInstance(node, instanceName)) { + return badRequest("Instance names are not match!"); + } + for (String tag : (List<String>) OBJECT_MAPPER + .readValue(node.get(InstanceProperties.instanceTags.name()).toString(), + OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class))) { + admin.addInstanceTag(clusterId, instanceName, tag); + } + break; + case removeInstanceTag: + if (!validInstance(node, instanceName)) { + return badRequest("Instance names are not match!"); + } + for (String tag : (List<String>) OBJECT_MAPPER + .readValue(node.get(InstanceProperties.instanceTags.name()).toString(), + OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class))) { + admin.removeInstanceTag(clusterId, instanceName, tag); + } + break; + case enablePartitions: + admin.enablePartition(true, clusterId, instanceName, + node.get(InstanceProperties.resource.name()).getTextValue(), + (List<String>) 
OBJECT_MAPPER + .readValue(node.get(InstanceProperties.partitions.name()).toString(), + OBJECT_MAPPER.getTypeFactory() + .constructCollectionType(List.class, String.class))); + break; + case disablePartitions: + admin.enablePartition(false, clusterId, instanceName, + node.get(InstanceProperties.resource.name()).getTextValue(), + (List<String>) OBJECT_MAPPER + .readValue(node.get(InstanceProperties.partitions.name()).toString(), + OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class))); + break; + default: + _logger.error("Unsupported command :" + command); + return badRequest("Unsupported command :" + command); + } + } catch (Exception e) { + _logger.error("Failed in updating instance : " + instanceName, e); + return badRequest(e.getMessage()); + } + return OK(); + } + + @DELETE + @Path("{instanceName}") + public Response deleteInstance(@PathParam("clusterId") String clusterId, + @PathParam("instanceName") String instanceName) { + HelixAdmin admin = getHelixAdmin(); + try { + InstanceConfig instanceConfig = admin.getInstanceConfig(clusterId, instanceName); + admin.dropInstance(clusterId, instanceConfig); + } catch (HelixException e) { + return badRequest(e.getMessage()); + } + + return OK(); + } + + @GET + @Path("{instanceName}/configs") + public Response getInstanceConfig(@PathParam("clusterId") String clusterId, + @PathParam("instanceName") String instanceName) throws IOException { + HelixDataAccessor accessor = getDataAccssor(clusterId); + InstanceConfig instanceConfig = + accessor.getProperty(accessor.keyBuilder().instanceConfig(instanceName)); + + if (instanceConfig != null) { + return JSONRepresentation(instanceConfig.getRecord()); + } + + return notFound(); + } + + @PUT + @Path("{instanceName}/configs") + public Response updateInstanceConfig(@PathParam("clusterId") String clusterId, + @PathParam("instanceName") String instanceName, String content) throws IOException { + HelixAdmin admin = getHelixAdmin(); + ZNRecord record; + try { 
+ record = toZNRecord(content); + } catch (IOException e) { + _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e); + return badRequest("Input is not a vaild ZNRecord!"); + } + + try { + admin.setInstanceConfig(clusterId, instanceName, new InstanceConfig(record)); + } catch (Exception ex) { + _logger.error("Error in update instance config: " + instanceName, ex); + return serverError(ex); + } + + return OK(); + } + + @GET + @Path("{instanceName}/resources") + public Response getResourcesOnInstance(@PathParam("clusterId") String clusterId, + @PathParam("instanceName") String instanceName) throws IOException { + HelixDataAccessor accessor = getDataAccssor(clusterId); + + ObjectNode root = JsonNodeFactory.instance.objectNode(); + root.put(Properties.id.name(), instanceName); + ArrayNode resourcesNode = root.putArray(InstanceProperties.resources.name()); + + List<String> sessionIds = accessor.getChildNames(accessor.keyBuilder().sessions(instanceName)); + if (sessionIds == null || sessionIds.size() == 0) { + return null; + } + + // Only get resource list from current session id + String currentSessionId = sessionIds.get(0); + + List<String> resources = + accessor.getChildNames(accessor.keyBuilder().currentStates(instanceName, currentSessionId)); + if (resources != null && resources.size() > 0) { + resourcesNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(resources)); + } + + return JSONRepresentation(root); + } + + @GET + @Path("{instanceName}/resources/{resourceName}") + public Response getResourceOnInstance(@PathParam("clusterId") String clusterId, + @PathParam("instanceName") String instanceName, + @PathParam("resourceName") String resourceName) throws IOException { + HelixDataAccessor accessor = getDataAccssor(clusterId); + List<String> sessionIds = accessor.getChildNames(accessor.keyBuilder().sessions(instanceName)); + if (sessionIds == null || sessionIds.size() == 0) { + return notFound(); + } + + // Only get resource list from 
current session id + String currentSessionId = sessionIds.get(0); + CurrentState resourceCurrentState = accessor + .getProperty(accessor.keyBuilder().currentState(instanceName, currentSessionId, resourceName)); + if (resourceCurrentState != null) { + return JSONRepresentation(resourceCurrentState.getRecord()); + } + + return notFound(); + } + + @GET + @Path("{instanceName}/errors") + public Response getErrorsOnInstance(@PathParam("clusterId") String clusterId, + @PathParam("instanceName") String instanceName) throws IOException { + HelixDataAccessor accessor = getDataAccssor(clusterId); + + ObjectNode root = JsonNodeFactory.instance.objectNode(); + root.put(Properties.id.name(), instanceName); + ObjectNode errorsNode = JsonNodeFactory.instance.objectNode(); + + List<String> sessionIds = + accessor.getChildNames(accessor.keyBuilder().errors(instanceName)); + + if (sessionIds == null || sessionIds.size() == 0) { + return notFound(); + } + + for (String sessionId : sessionIds) { + List<String> resources = + accessor.getChildNames(accessor.keyBuilder().errors(instanceName, sessionId)); + if (resources != null) { + ObjectNode resourcesNode = JsonNodeFactory.instance.objectNode(); + for (String resourceName : resources) { + List<String> partitions = accessor + .getChildNames(accessor.keyBuilder().errors(instanceName, sessionId, resourceName)); + if (partitions != null) { + ArrayNode partitionsNode = resourcesNode.putArray(resourceName); + partitionsNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(partitions)); + } + } + errorsNode.put(sessionId, resourcesNode); + } + } + root.put(InstanceProperties.errors.name(), errorsNode); + + return JSONRepresentation(root); + } + + @GET + @Path("{instanceName}/errors/{sessionId}/{resourceName}/{partitionName}") + public Response getErrorsOnInstance(@PathParam("clusterId") String clusterId, + @PathParam("instanceName") String instanceName, @PathParam("sessionId") String sessionId, + @PathParam("resourceName") String resourceName, + 
@PathParam("partitionName") String partitionName) throws IOException { + HelixDataAccessor accessor = getDataAccssor(clusterId); + Error error = accessor.getProperty(accessor.keyBuilder() + .stateTransitionError(instanceName, sessionId, resourceName, partitionName)); + if (error != null) { + return JSONRepresentation(error.getRecord()); + } + + return notFound(); + } + + @GET + @Path("{instanceName}/history") + public Response getHistoryOnInstance(@PathParam("clusterId") String clusterId, + @PathParam("instanceName") String instanceName) throws IOException { + HelixDataAccessor accessor = getDataAccssor(clusterId); + ParticipantHistory history = + accessor.getProperty(accessor.keyBuilder().participantHistory(instanceName)); + if (history != null) { + return JSONRepresentation(history.getRecord()); + } + return notFound(); + } + + @GET + @Path("{instanceName}/messages") + public Response getMessagesOnInstance(@PathParam("clusterId") String clusterId, + @PathParam("instanceName") String instanceName) throws IOException { + HelixDataAccessor accessor = getDataAccssor(clusterId); + + ObjectNode root = JsonNodeFactory.instance.objectNode(); + root.put(Properties.id.name(), instanceName); + ArrayNode newMessages = root.putArray(InstanceProperties.new_messages.name()); + ArrayNode readMessages = root.putArray(InstanceProperties.read_messages.name()); + + + List<String> messages = + accessor.getChildNames(accessor.keyBuilder().messages(instanceName)); + if (messages == null || messages.size() == 0) { + return notFound(); + } + + for (String messageName : messages) { + Message message = accessor.getProperty(accessor.keyBuilder().message(instanceName, messageName)); + if (message.getMsgState() == Message.MessageState.NEW) { + newMessages.add(messageName); + } + + if (message.getMsgState() == Message.MessageState.READ) { + readMessages.add(messageName); + } + } + + root.put(InstanceProperties.total_message_count.name(), + newMessages.size() + readMessages.size()); + 
root.put(InstanceProperties.read_message_count.name(), readMessages.size()); + + return JSONRepresentation(root); + } + + @GET + @Path("{instanceName}/messages/{messageId}") + public Response getMessageOnInstance(@PathParam("clusterId") String clusterId, + @PathParam("instanceName") String instanceName, + @PathParam("messageId") String messageId) throws IOException { + HelixDataAccessor accessor = getDataAccssor(clusterId); + Message message = accessor.getProperty(accessor.keyBuilder().message(instanceName, messageId)); + if (message != null) { + return JSONRepresentation(message.getRecord()); + } + + return notFound(); + } + + @GET + @Path("{instanceName}/healthreports") + public Response getHealthReportsOnInstance(@PathParam("clusterId") String clusterId, + @PathParam("instanceName") String instanceName) throws IOException { + HelixDataAccessor accessor = getDataAccssor(clusterId); + + ObjectNode root = JsonNodeFactory.instance.objectNode(); + root.put(Properties.id.name(), instanceName); + ArrayNode healthReportsNode = root.putArray(InstanceProperties.healthreports.name()); + + List<String> healthReports = + accessor.getChildNames(accessor.keyBuilder().healthReports(instanceName)); + + if (healthReports != null && healthReports.size() > 0) { + healthReportsNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(healthReports)); + } + + return JSONRepresentation(root); + } + + @GET + @Path("{instanceName}/healthreports/{reportName}") + public Response getHealthReportsOnInstance(@PathParam("clusterId") String clusterId, + @PathParam("instanceName") String instanceName, + @PathParam("reportName") String reportName) throws IOException { + HelixDataAccessor accessor = getDataAccssor(clusterId); + HealthStat healthStat = + accessor.getProperty(accessor.keyBuilder().healthReport(instanceName, reportName)); + if (healthStat != null) { + return JSONRepresentation(healthStat); + } + + return notFound(); + } + + private boolean validInstance(JsonNode node, String instanceName) { 
+ return instanceName.equals(node.get(Properties.id.name()).getValueAsText()); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java new file mode 100644 index 0000000..2d27f51 --- /dev/null +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java @@ -0,0 +1,200 @@ +package org.apache.helix.rest.server.resources.helix; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.core.Response; + +import org.apache.helix.HelixException; +import org.apache.helix.ZNRecord; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.JobContext; +import org.apache.helix.task.TaskConfig; +import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.WorkflowConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.codehaus.jackson.node.ArrayNode; +import org.codehaus.jackson.node.JsonNodeFactory; +import org.codehaus.jackson.node.ObjectNode; + +@Path("/clusters/{clusterId}/workflows/{workflowName}/jobs") +public class JobAccessor extends AbstractHelixResource { + private static Logger _logger = LoggerFactory.getLogger(JobAccessor.class.getName()); + + public enum JobProperties { + Jobs, + JobConfig, + JobContext, + TASK_COMMAND + } + + @GET + public Response getJobs(@PathParam("clusterId") String clusterId, + @PathParam("workflowName") String workflowName) { + TaskDriver driver = getTaskDriver(clusterId); + WorkflowConfig workflowConfig = driver.getWorkflowConfig(workflowName); + ObjectNode root = JsonNodeFactory.instance.objectNode(); + + if (workflowConfig == null) { + return badRequest(String.format("Workflow %s is not found!", workflowName)); + } + + Set<String> jobs = workflowConfig.getJobDag().getAllNodes(); + root.put(Properties.id.name(), JobProperties.Jobs.name()); + ArrayNode jobsNode = root.putArray(JobProperties.Jobs.name()); + + if (jobs != null) { + jobsNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(jobs)); + } + return JSONRepresentation(root); + } + + @GET + @Path("{jobName}") + public Response getJob(@PathParam("clusterId") String clusterId, + @PathParam("workflowName") String workflowName, 
@PathParam("jobName") String jobName) { + TaskDriver driver = getTaskDriver(clusterId); + Map<String, ZNRecord> jobMap = new HashMap<>(); + + + JobConfig jobConfig = driver.getJobConfig(jobName); + if (jobConfig != null) { + jobMap.put(JobProperties.JobConfig.name(), jobConfig.getRecord()); + } else { + return badRequest(String.format("Job config for %s does not exists", jobName)); + } + + JobContext jobContext = + driver.getJobContext(jobName); + jobMap.put(JobProperties.JobContext.name(), null); + + if (jobContext != null) { + jobMap.put(JobProperties.JobContext.name(), jobContext.getRecord()); + } + + return JSONRepresentation(jobMap); + } + + @PUT + @Path("{jobName}") + public Response addJob(@PathParam("clusterId") String clusterId, + @PathParam("workflowName") String workflowName, @PathParam("jobName") String jobName, + String content) { + ZNRecord record; + TaskDriver driver = getTaskDriver(clusterId); + + try { + record = toZNRecord(content); + JobConfig.Builder jobConfig = JobAccessor.getJobConfig(record); + driver.enqueueJob(workflowName, jobName, jobConfig); + } catch (HelixException e) { + return badRequest( + String.format("Failed to enqueue job %s for reason : %s", jobName, e.getMessage())); + } catch (IOException e) { + return badRequest(String.format("Invalid input for Job Config of Job : %s", jobName)); + } + + return OK(); + } + + @DELETE + @Path("{jobName}") + public Response deleteJob(@PathParam("clusterId") String clusterId, + @PathParam("workflowName") String workflowName, @PathParam("jobName") String jobName) { + TaskDriver driver = getTaskDriver(clusterId); + + try { + driver.deleteJob(workflowName, jobName); + } catch (Exception e) { + return badRequest(e.getMessage()); + } + + return OK(); + } + + @GET + @Path("{jobName}/configs") + public Response getJobConfig(@PathParam("clusterId") String clusterId, + @PathParam("workflowName") String workflowName, @PathParam("jobName") String jobName) { + TaskDriver driver = getTaskDriver(clusterId); + 
+ JobConfig jobConfig = driver.getJobConfig(jobName); + if (jobConfig != null) { + return JSONRepresentation(jobConfig.getRecord()); + } + return badRequest("Job config for " + jobName + " does not exists"); + } + + @GET + @Path("{jobName}/context") + public Response getJobContext(@PathParam("clusterId") String clusterId, + @PathParam("workflowName") String workflowName, @PathParam("jobName") String jobName) { + TaskDriver driver = getTaskDriver(clusterId); + + JobContext jobContext = + driver.getJobContext(jobName); + if (jobContext != null) { + return JSONRepresentation(jobContext.getRecord()); + } + return badRequest("Job context for " + jobName + " does not exists"); + } + + protected static JobConfig.Builder getJobConfig(Map<String, String> cfgMap) { + return new JobConfig.Builder().fromMap(cfgMap); + } + + protected static JobConfig.Builder getJobConfig(ZNRecord record) { + JobConfig.Builder jobConfig = new JobConfig.Builder().fromMap(record.getSimpleFields()); + jobConfig.addTaskConfigMap(getTaskConfigMap(record.getMapFields())); + + return jobConfig; + } + + private static Map<String, TaskConfig> getTaskConfigMap( + Map<String, Map<String, String>> taskConfigs) { + Map<String, TaskConfig> taskConfigsMap = new HashMap<>(); + if (taskConfigs == null || taskConfigs.isEmpty()) { + return Collections.emptyMap(); + } + + for (Map<String, String> taskConfigMap : taskConfigs.values()) { + if (!taskConfigMap.containsKey(JobProperties.TASK_COMMAND.name())) { + continue; + } + + TaskConfig taskConfig = + new TaskConfig(taskConfigMap.get(JobProperties.TASK_COMMAND.name()), taskConfigMap); + taskConfigsMap.put(taskConfig.getId(), taskConfig); + } + + return taskConfigsMap; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/MetadataAccessor.java ---------------------------------------------------------------------- diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/MetadataAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/MetadataAccessor.java new file mode 100644 index 0000000..5757760 --- /dev/null +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/MetadataAccessor.java @@ -0,0 +1,45 @@ +package org.apache.helix.rest.server.resources.helix; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.core.Response; +import org.apache.helix.rest.common.ContextPropertyKeys; +import org.apache.helix.rest.common.HelixRestNamespace; +import org.apache.helix.rest.common.HelixRestUtils; +import org.apache.helix.rest.server.resources.AbstractResource; + +@Path("") +public class MetadataAccessor extends AbstractResource { + @GET + public Response getMetadata() { + if (HelixRestUtils.isDefaultServlet(_servletRequest.getServletPath())) { + // To keep API endpoints to behave the same, if user call /admin/v2/ , + // we will return NotFound + return notFound(); + } + // This will be the root of all namespaced servlets, and returns + // servlet namespace information + HelixRestNamespace namespace = (HelixRestNamespace) _application.getProperties().get( + ContextPropertyKeys.METADATA.name()); + return JSONRepresentation(namespace.getRestInfo()); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java new file mode 100644 index 0000000..04d3536 --- /dev/null +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java @@ -0,0 +1,278 @@ +package org.apache.helix.rest.server.resources.helix; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Response; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixException; +import org.apache.helix.PropertyPathBuilder; +import org.apache.helix.ZNRecord; +import org.apache.helix.manager.zk.ZkClient; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.ResourceConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.codehaus.jackson.node.ArrayNode; +import org.codehaus.jackson.node.JsonNodeFactory; +import org.codehaus.jackson.node.ObjectNode; + +@Path("/clusters/{clusterId}/resources") +public class ResourceAccessor extends AbstractHelixResource { + private final static Logger _logger = LoggerFactory.getLogger(ResourceAccessor.class); + public enum ResourceProperties { + idealState, + idealStates, + externalView, + externalViews, + resourceConfig, + } + + @GET + public Response getResources(@PathParam("clusterId") String clusterId) { + ObjectNode root = JsonNodeFactory.instance.objectNode(); + root.put(Properties.id.name(), JsonNodeFactory.instance.textNode(clusterId)); + + ZkClient zkClient = getZkClient(); + + ArrayNode idealStatesNode = 
root.putArray(ResourceProperties.idealStates.name()); + ArrayNode externalViewsNode = root.putArray(ResourceProperties.externalViews.name()); + + List<String> idealStates = zkClient.getChildren(PropertyPathBuilder.idealState(clusterId)); + List<String> externalViews = zkClient.getChildren(PropertyPathBuilder.externalView(clusterId)); + + if (idealStates != null) { + idealStatesNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(idealStates)); + } else { + return notFound(); + } + + if (externalViews != null) { + externalViewsNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(externalViews)); + } + + return JSONRepresentation(root); + } + + @GET + @Path("{resourceName}") + public Response getResource(@PathParam("clusterId") String clusterId, + @PathParam("resourceName") String resourceName) throws IOException { + ConfigAccessor accessor = getConfigAccessor(); + HelixAdmin admin = getHelixAdmin(); + + ResourceConfig resourceConfig = accessor.getResourceConfig(clusterId, resourceName); + IdealState idealState = admin.getResourceIdealState(clusterId, resourceName); + ExternalView externalView = admin.getResourceExternalView(clusterId, resourceName); + + Map<String, ZNRecord> resourceMap = new HashMap<>(); + if (idealState != null) { + resourceMap.put(ResourceProperties.idealState.name(), idealState.getRecord()); + } else { + return notFound(); + } + + resourceMap.put(ResourceProperties.resourceConfig.name(), null); + resourceMap.put(ResourceProperties.externalView.name(), null); + + if (resourceConfig != null) { + resourceMap.put(ResourceProperties.resourceConfig.name(), resourceConfig.getRecord()); + } + + if (externalView != null) { + resourceMap.put(ResourceProperties.externalView.name(), externalView.getRecord()); + } + + return JSONRepresentation(resourceMap); + } + + @PUT + @Path("{resourceName}") + public Response addResource(@PathParam("clusterId") String clusterId, + @PathParam("resourceName") String resourceName, + @DefaultValue("-1") 
@QueryParam("numPartitions") int numPartitions, + @DefaultValue("") @QueryParam("stateModelRef") String stateModelRef, + @DefaultValue("SEMI_AUTO") @QueryParam("rebalancerMode") String rebalancerMode, + @DefaultValue("DEFAULT") @QueryParam("rebalanceStrategy") String rebalanceStrategy, + @DefaultValue("0") @QueryParam("bucketSize") int bucketSize, + @DefaultValue("-1") @QueryParam("maxPartitionsPerInstance") int maxPartitionsPerInstance, + String content) { + + HelixAdmin admin = getHelixAdmin(); + + try { + if (content.length() != 0) { + ZNRecord record; + try { + record = toZNRecord(content); + } catch (IOException e) { + _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e); + return badRequest("Input is not a vaild ZNRecord!"); + } + + if (record.getSimpleFields() != null) { + admin.addResource(clusterId, resourceName, new IdealState(record)); + } + } else { + admin.addResource(clusterId, resourceName, numPartitions, stateModelRef, rebalancerMode, + rebalanceStrategy, bucketSize, maxPartitionsPerInstance); + } + } catch (Exception e) { + _logger.error("Error in adding a resource: " + resourceName, e); + return serverError(e); + } + + return OK(); + } + + @POST + @Path("{resourceName}") + public Response updateResource(@PathParam("clusterId") String clusterId, + @PathParam("resourceName") String resourceName, @QueryParam("command") String command, + @DefaultValue("-1") @QueryParam("replicas") int replicas, + @DefaultValue("") @QueryParam("keyPrefix") String keyPrefix, + @DefaultValue("") @QueryParam("group") String group){ + Command cmd; + try { + cmd = Command.valueOf(command); + } catch (Exception e) { + return badRequest("Invalid command : " + command); + } + + HelixAdmin admin = getHelixAdmin(); + try { + switch (cmd) { + case enable: + admin.enableResource(clusterId, resourceName, true); + break; + case disable: + admin.enableResource(clusterId, resourceName, false); + break; + case rebalance: + if (replicas == -1) { + return 
badRequest("Number of replicas is needed for rebalancing!"); + } + keyPrefix = keyPrefix.length() == 0 ? resourceName : keyPrefix; + admin.rebalance(clusterId, resourceName, replicas, keyPrefix, group); + break; + default: + _logger.error("Unsupported command :" + command); + return badRequest("Unsupported command :" + command); + } + } catch (Exception e) { + _logger.error("Failed in updating resource : " + resourceName, e); + return badRequest(e.getMessage()); + } + return OK(); + } + + @DELETE + @Path("{resourceName}") + public Response deleteResource(@PathParam("clusterId") String clusterId, + @PathParam("resourceName") String resourceName) { + HelixAdmin admin = getHelixAdmin(); + try { + admin.dropResource(clusterId, resourceName); + } catch (Exception e) { + _logger.error("Error in deleting a resource: " + resourceName, e); + return serverError(); + } + return OK(); + } + + @GET + @Path("{resourceName}/configs") + public Response getResourceConfig(@PathParam("clusterId") String clusterId, + @PathParam("resourceName") String resourceName) { + ConfigAccessor accessor = getConfigAccessor(); + ResourceConfig resourceConfig = accessor.getResourceConfig(clusterId, resourceName); + if (resourceConfig != null) { + return JSONRepresentation(resourceConfig.getRecord()); + } + + return notFound(); + } + + @POST + @Path("{resourceName}/configs") + public Response updateResourceConfig(@PathParam("clusterId") String clusterId, + @PathParam("resourceName") String resourceName, String content) { + ZNRecord record; + try { + record = toZNRecord(content); + } catch (IOException e) { + _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e); + return badRequest("Input is not a vaild ZNRecord!"); + } + ResourceConfig resourceConfig = new ResourceConfig(record); + ConfigAccessor configAccessor = getConfigAccessor(); + try { + configAccessor.updateResourceConfig(clusterId, resourceName, resourceConfig); + } catch (HelixException ex) { + return 
notFound(ex.getMessage()); + } catch (Exception ex) { + _logger.error( + "Failed to update cluster config, cluster " + clusterId + " new config: " + content + + ", Exception: " + ex); + return serverError(ex); + } + return OK(); + } + + @GET + @Path("{resourceName}/idealState") + public Response getResourceIdealState(@PathParam("clusterId") String clusterId, + @PathParam("resourceName") String resourceName) { + HelixAdmin admin = getHelixAdmin(); + IdealState idealState = admin.getResourceIdealState(clusterId, resourceName); + if (idealState != null) { + return JSONRepresentation(idealState.getRecord()); + } + + return notFound(); + } + + @GET + @Path("{resourceName}/externalView") + public Response getResourceExternalView(@PathParam("clusterId") String clusterId, + @PathParam("resourceName") String resourceName) { + HelixAdmin admin = getHelixAdmin(); + ExternalView externalView = admin.getResourceExternalView(clusterId, resourceName); + if (externalView != null) { + return JSONRepresentation(externalView.getRecord()); + } + + return notFound(); + } + +} http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java new file mode 100644 index 0000000..ebd04fd --- /dev/null +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java @@ -0,0 +1,325 @@ +package org.apache.helix.rest.server.resources.helix; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Response; + +import org.apache.helix.HelixException; +import org.apache.helix.ZNRecord; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.JobDag; +import org.apache.helix.task.JobQueue; +import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.Workflow; +import org.apache.helix.task.WorkflowConfig; +import org.apache.helix.task.WorkflowContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.map.type.TypeFactory; +import org.codehaus.jackson.node.ArrayNode; +import org.codehaus.jackson.node.JsonNodeFactory; +import org.codehaus.jackson.node.ObjectNode; +import org.codehaus.jackson.node.TextNode; + +@Path("/clusters/{clusterId}/workflows") +public class WorkflowAccessor extends AbstractHelixResource { + private static Logger _logger = LoggerFactory.getLogger(WorkflowAccessor.class.getName()); + + public enum WorkflowProperties { + Workflows, + WorkflowConfig, + WorkflowContext, + Jobs, + 
ParentJobs + } + + public enum TaskCommand { + stop, + resume, + clean + } + + @GET + public Response getWorkflows(@PathParam("clusterId") String clusterId) { + TaskDriver taskDriver = getTaskDriver(clusterId); + Map<String, WorkflowConfig> workflowConfigMap = taskDriver.getWorkflows(); + Map<String, List<String>> dataMap = new HashMap<>(); + dataMap.put(WorkflowProperties.Workflows.name(), new ArrayList<>(workflowConfigMap.keySet())); + + return JSONRepresentation(dataMap); + } + + /** + * Returns the config, context, job list and parent-to-child job dependencies of a workflow. + * Sections whose backing data is missing are returned empty. + */ + @GET + @Path("{workflowId}") + public Response getWorkflow(@PathParam("clusterId") String clusterId, + @PathParam("workflowId") String workflowId) { + TaskDriver taskDriver = getTaskDriver(clusterId); + WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(workflowId); + WorkflowContext workflowContext = taskDriver.getWorkflowContext(workflowId); + + ObjectNode root = JsonNodeFactory.instance.objectNode(); + TextNode id = JsonNodeFactory.instance.textNode(workflowId); + root.put(Properties.id.name(), id); + + ObjectNode workflowConfigNode = JsonNodeFactory.instance.objectNode(); + ObjectNode workflowContextNode = JsonNodeFactory.instance.objectNode(); + // Jobs and ParentJobs stay empty when the workflow has no config to read the DAG from. + ArrayNode jobs = JsonNodeFactory.instance.arrayNode(); + ObjectNode parentJobs = JsonNodeFactory.instance.objectNode(); + + if (workflowConfig != null) { + getWorkflowConfigNode(workflowConfigNode, workflowConfig.getRecord()); + // The DAG is read only inside this null check; reading it unconditionally threw a + // NullPointerException for a workflow without config. + JobDag jobDag = workflowConfig.getJobDag(); + jobs = OBJECT_MAPPER.valueToTree(jobDag.getAllNodes()); + parentJobs = OBJECT_MAPPER.valueToTree(jobDag.getParentsToChildren()); + } + + if (workflowContext != null) { + getWorkflowContextNode(workflowContextNode, workflowContext.getRecord()); + } + + root.put(WorkflowProperties.WorkflowConfig.name(), workflowConfigNode); + root.put(WorkflowProperties.WorkflowContext.name(), workflowContextNode); + root.put(WorkflowProperties.Jobs.name(), jobs); + root.put(WorkflowProperties.ParentJobs.name(), parentJobs); + + return JSONRepresentation(root); + } + + @PUT + @Path("{workflowId}") + public Response createWorkflow(@PathParam("clusterId") String clusterId, +
@PathParam("workflowId") String workflowId, String content) { + TaskDriver driver = getTaskDriver(clusterId); + Map<String, String> cfgMap; + try { + JsonNode root = OBJECT_MAPPER.readTree(content); + cfgMap = OBJECT_MAPPER + .readValue(root.get(WorkflowProperties.WorkflowConfig.name()).toString(), + TypeFactory.defaultInstance() + .constructMapType(HashMap.class, String.class, String.class)); + + WorkflowConfig workflowConfig = WorkflowConfig.Builder.fromMap(cfgMap).build(); + + // Since JobQueue can keep adding jobs, Helix create JobQueue will ignore the jobs + if (workflowConfig.isJobQueue()) { + driver.start(new JobQueue.Builder(workflowId).setWorkflowConfig(workflowConfig).build()); + return OK(); + } + + Workflow.Builder workflow = new Workflow.Builder(workflowId); + + if (root.get(WorkflowProperties.Jobs.name()) != null) { + Map<String, JobConfig.Builder> jobConfigs = + getJobConfigs((ArrayNode) root.get(WorkflowProperties.Jobs.name())); + for (Map.Entry<String, JobConfig.Builder> job : jobConfigs.entrySet()) { + workflow.addJob(job.getKey(), job.getValue()); + } + } + + if (root.get(WorkflowProperties.ParentJobs.name()) != null) { + Map<String, List<String>> parentJobs = OBJECT_MAPPER + .readValue(root.get(WorkflowProperties.ParentJobs.name()).toString(), + TypeFactory.defaultInstance() + .constructMapType(HashMap.class, String.class, List.class)); + for (Map.Entry<String, List<String>> entry : parentJobs.entrySet()) { + String parentJob = entry.getKey(); + for (String childJob : entry.getValue()) { + workflow.addParentChildDependency(parentJob, childJob); + } + } + } + + driver.start(workflow.build()); + } catch (IOException e) { + return badRequest(String + .format("Invalid input of Workflow %s for reason : %s", workflowId, e.getMessage())); + } catch (HelixException e) { + return badRequest(String + .format("Failed to create workflow %s for reason : %s", workflowId, e.getMessage())); + } + return OK(); + } + + @DELETE + @Path("{workflowId}") + public 
Response deleteWorkflow(@PathParam("clusterId") String clusterId, + @PathParam("workflowId") String workflowId) { + TaskDriver driver = getTaskDriver(clusterId); + try { + driver.delete(workflowId); + } catch (HelixException e) { + return badRequest(String + .format("Failed to delete workflow %s for reason : %s", workflowId, e.getMessage())); + } + return OK(); + } + + @POST + @Path("{workflowId}") + public Response updateWorkflow(@PathParam("clusterId") String clusterId, + @PathParam("workflowId") String workflowId, @QueryParam("command") String command) { + TaskDriver driver = getTaskDriver(clusterId); + + try { + TaskCommand cmd = TaskCommand.valueOf(command); + switch (cmd) { + case stop: + driver.stop(workflowId); + break; + case resume: + driver.resume(workflowId); + break; + case clean: + driver.cleanupQueue(workflowId); + break; + default: + return badRequest(String.format("Invalid command : %s", command)); + } + } catch (HelixException e) { + return badRequest( + String.format("Failed to execute operation %s for reason : %s", command, e.getMessage())); + } catch (Exception e) { + return serverError(e); + } + + return OK(); + } + + @GET + @Path("{workflowId}/configs") + public Response getWorkflowConfig(@PathParam("clusterId") String clusterId, + @PathParam("workflowId") String workflowId) { + TaskDriver taskDriver = getTaskDriver(clusterId); + WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(workflowId); + ObjectNode workflowConfigNode = JsonNodeFactory.instance.objectNode(); + if (workflowConfig != null) { + getWorkflowConfigNode(workflowConfigNode, workflowConfig.getRecord()); + } + + return JSONRepresentation(workflowConfigNode); + } + + @POST + @Path("{workflowId}/configs") + public Response updateWorkflowConfig(@PathParam("clusterId") String clusterId, + @PathParam("workflowId") String workflowId, String content) { + ZNRecord record; + TaskDriver driver = getTaskDriver(clusterId); + + try { + record = toZNRecord(content); + + 
WorkflowConfig workflowConfig = driver.getWorkflowConfig(workflowId); + if (workflowConfig == null) { + return badRequest( + String.format("WorkflowConfig for workflow %s does not exists!", workflowId)); + } + + workflowConfig.getRecord().update(record); + driver.updateWorkflow(workflowId, workflowConfig); + } catch (HelixException e) { + return badRequest( + String.format("Failed to update WorkflowConfig for workflow %s", workflowId)); + } catch (Exception e) { + return badRequest(String.format("Invalid WorkflowConfig for workflow %s", workflowId)); + } + + return OK(); + } + + @GET + @Path("{workflowId}/context") + public Response getWorkflowContext(@PathParam("clusterId") String clusterId, + @PathParam("workflowId") String workflowId) { + TaskDriver taskDriver = getTaskDriver(clusterId); + WorkflowContext workflowContext = taskDriver.getWorkflowContext(workflowId); + ObjectNode workflowContextNode = JsonNodeFactory.instance.objectNode(); + if (workflowContext != null) { + getWorkflowContextNode(workflowContextNode, workflowContext.getRecord()); + } + + return JSONRepresentation(workflowContextNode); + } + + private void getWorkflowConfigNode(ObjectNode workflowConfigNode, ZNRecord record) { + for (Map.Entry<String, String> entry : record.getSimpleFields().entrySet()) { + if (!entry.getKey().equals(WorkflowConfig.WorkflowConfigProperty.Dag)) { + workflowConfigNode.put(entry.getKey(), JsonNodeFactory.instance.textNode(entry.getValue())); + } + } + } + + private void getWorkflowContextNode(ObjectNode workflowContextNode, ZNRecord record) { + if (record.getMapFields() != null) { + for (String fieldName : record.getMapFields().keySet()) { + JsonNode node = OBJECT_MAPPER.valueToTree(record.getMapField(fieldName)); + workflowContextNode.put(fieldName, node); + } + } + + if (record.getSimpleFields() != null) { + for (Map.Entry<String, String> entry : record.getSimpleFields().entrySet()) { + workflowContextNode + .put(entry.getKey(), 
JsonNodeFactory.instance.textNode(entry.getValue())); + } + } + } + + private Map<String, JobConfig.Builder> getJobConfigs(ArrayNode root) + throws HelixException, IOException { + Map<String, JobConfig.Builder> jobConfigsMap = new HashMap<>(); + for (Iterator<JsonNode> it = root.getElements(); it.hasNext(); ) { + JsonNode job = it.next(); + ZNRecord record = null; + + try { + record = toZNRecord(job.toString()); + } catch (IOException e) { + // Ignore the parse since it could be just simple fields + } + + if (record == null || record.getSimpleFields().isEmpty()) { + Map<String, String> cfgMap = OBJECT_MAPPER.readValue(job.toString(), + TypeFactory.defaultInstance() + .constructMapType(HashMap.class, String.class, String.class)); + jobConfigsMap + .put(job.get(Properties.id.name()).getTextValue(), JobAccessor.getJobConfig(cfgMap)); + } else { + jobConfigsMap + .put(job.get(Properties.id.name()).getTextValue(), JobAccessor.getJobConfig(record)); + } + } + + return jobConfigsMap; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/metadata/NamespacesAccessor.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/metadata/NamespacesAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/metadata/NamespacesAccessor.java new file mode 100644 index 0000000..dd5c5f9 --- /dev/null +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/metadata/NamespacesAccessor.java @@ -0,0 +1,47 @@ +package org.apache.helix.rest.server.resources.metadata; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.core.Response; +import org.apache.helix.rest.common.ContextPropertyKeys; +import org.apache.helix.rest.common.HelixRestNamespace; +import org.apache.helix.rest.server.resources.AbstractResource; + + +@Path("/namespaces") +public class NamespacesAccessor extends AbstractResource { + @GET + public Response getHelixRestNamespaces() { + @SuppressWarnings("unchecked") + List<HelixRestNamespace> allNamespaces = + (List<HelixRestNamespace>) _application.getProperties() + .get(ContextPropertyKeys.ALL_NAMESPACES.name()); + List<Map<String, String>> ret = new ArrayList<>(); + for (HelixRestNamespace namespace : allNamespaces) { + ret.add(namespace.getRestInfo()); + } + return JSONRepresentation(ret); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java index 94c5f63..763d3e2 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java +++ 
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java @@ -42,7 +42,7 @@ import org.apache.helix.model.MaintenanceSignal; import org.apache.helix.rest.common.HelixRestNamespace; import org.apache.helix.rest.server.auditlog.AuditLog; import org.apache.helix.rest.server.resources.AbstractResource.Command; -import org.apache.helix.rest.server.resources.ClusterAccessor; +import org.apache.helix.rest.server.resources.helix.ClusterAccessor; import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.map.ObjectMapper; import org.testng.Assert; http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/test/java/org/apache/helix/rest/server/TestHelixRestServer.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestHelixRestServer.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestHelixRestServer.java index e213bd3..00ffdba 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestHelixRestServer.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestHelixRestServer.java @@ -57,8 +57,7 @@ public class TestHelixRestServer extends AbstractTestClass { try { List<HelixRestNamespace> invalidManifest3 = new ArrayList<>(); invalidManifest3.add( - new HelixRestNamespace("DuplicatedName", HelixRestNamespace.HelixMetadataStoreType.ZOOKEEPER, ZK_ADDR, - true)); + new HelixRestNamespace("DuplicatedName", HelixRestNamespace.HelixMetadataStoreType.ZOOKEEPER, ZK_ADDR, true)); invalidManifest3.add( new HelixRestNamespace("DuplicatedName", HelixRestNamespace.HelixMetadataStoreType.ZOOKEEPER, ZK_ADDR, false)); @@ -77,7 +76,7 @@ public class TestHelixRestServer extends AbstractTestClass { new HelixRestNamespace("test4-2", HelixRestNamespace.HelixMetadataStoreType.ZOOKEEPER, ZK_ADDR, true)); HelixRestServer svr = new HelixRestServer(invalidManifest4, 10250, "/", Collections.<AuditLogger>emptyList()); 
Assert.assertFalse(true, "InvalidManifest4 test failed"); - } catch (IllegalArgumentException e) { + } catch (IllegalStateException e) { // OK } } http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstanceAccessor.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstanceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstanceAccessor.java index 947ba49..763f95b 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstanceAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstanceAccessor.java @@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableMap; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; import java.util.List; import java.util.Set; import javax.ws.rs.client.Entity; @@ -33,10 +32,9 @@ import javax.ws.rs.core.Response; import org.apache.helix.HelixException; import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; -import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.InstanceConfig; import org.apache.helix.rest.server.resources.AbstractResource; -import org.apache.helix.rest.server.resources.InstanceAccessor; +import org.apache.helix.rest.server.resources.helix.InstanceAccessor; import org.codehaus.jackson.JsonNode; import org.testng.Assert; import org.testng.annotations.Test; http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java index eca6836..682039d 100644 --- 
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java @@ -27,8 +27,8 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import org.apache.helix.TestHelper; -import org.apache.helix.rest.server.resources.JobAccessor; -import org.apache.helix.rest.server.resources.WorkflowAccessor; +import org.apache.helix.rest.server.resources.helix.JobAccessor; +import org.apache.helix.rest.server.resources.helix.WorkflowAccessor; import org.apache.helix.task.JobConfig; import org.apache.helix.task.JobQueue; import org.apache.helix.task.TaskDriver; http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/test/java/org/apache/helix/rest/server/TestNamespacedAPIAccess.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestNamespacedAPIAccess.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestNamespacedAPIAccess.java index e6f036d..c77f939 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestNamespacedAPIAccess.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestNamespacedAPIAccess.java @@ -21,15 +21,22 @@ package org.apache.helix.rest.server; import com.google.common.collect.ImmutableMap; import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import javax.ws.rs.client.Entity; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import org.apache.helix.PropertyKey; import org.apache.helix.rest.common.HelixRestNamespace; +import org.codehaus.jackson.map.ObjectMapper; import org.testng.Assert; import org.testng.annotations.Test; public class TestNamespacedAPIAccess extends AbstractTestClass { + ObjectMapper _mapper = new ObjectMapper(); + @Test public void testDefaultNamespaceCompatibility() { String testClusterName1 = 
"testClusterForDefaultNamespaceCompatibility1"; @@ -81,4 +88,52 @@ public class TestNamespacedAPIAccess extends AbstractTestClass { get(String.format("/clusters/%s", testClusterName), Response.Status.OK.getStatusCode(), false); } + @Test + public void testNamespaceServer() throws IOException { + // Default endpoints should not have any namespace information returned + get("/", Response.Status.NOT_FOUND.getStatusCode(), false); + + // Get invalid namespace should return not found + get("/namespaces/invalid-namespace", Response.Status.NOT_FOUND.getStatusCode(), false); + + // list namespace should return a list of all namespaces + String body = get("/namespaces", Response.Status.OK.getStatusCode(), true); + List<Map<String, String>> namespaceMaps = _mapper + .readValue(body, _mapper.getTypeFactory().constructCollectionType(List.class, Map.class)); + Assert.assertEquals(namespaceMaps.size(), 2); + + Set<String> expectedNamespaceNames = new HashSet<>(); + expectedNamespaceNames.add(HelixRestNamespace.DEFAULT_NAMESPACE_NAME); + expectedNamespaceNames.add(TEST_NAMESPACE); + + for (Map<String, String> namespaceMap : namespaceMaps) { + String name = namespaceMap.get(HelixRestNamespace.HelixRestNamespaceProperty.NAME.name()); + boolean isDefault = Boolean.parseBoolean( + namespaceMap.get(HelixRestNamespace.HelixRestNamespaceProperty.IS_DEFAULT.name())); + switch (name) { + case HelixRestNamespace.DEFAULT_NAMESPACE_NAME: + Assert.assertTrue(isDefault); + break; + case TEST_NAMESPACE: + Assert.assertFalse(isDefault); + break; + default: + Assert.assertFalse(true, "Namespace " + name + " is not expected"); + break; + } + expectedNamespaceNames.remove(name); + } + Assert.assertTrue(expectedNamespaceNames.isEmpty()); + + // Accessing root of namespaced API endpoint shall return information of that namespace + body = get(String.format("/namespaces/%s", HelixRestNamespace.DEFAULT_NAMESPACE_NAME), + Response.Status.OK.getStatusCode(), true); + Map<String, String> namespace = 
_mapper.readValue(body, + _mapper.getTypeFactory().constructMapType(Map.class, String.class, String.class)); + Assert.assertEquals(namespace.get(HelixRestNamespace.HelixRestNamespaceProperty.NAME.name()), + HelixRestNamespace.DEFAULT_NAMESPACE_NAME); + Assert.assertTrue(Boolean.parseBoolean( + namespace.get(HelixRestNamespace.HelixRestNamespaceProperty.IS_DEFAULT.name()))); + } + } http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java index 8e116e9..dea4e06 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java @@ -30,7 +30,7 @@ import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.model.ResourceConfig; import org.apache.helix.model.builder.FullAutoModeISBuilder; -import org.apache.helix.rest.server.resources.ResourceAccessor; +import org.apache.helix.rest.server.resources.helix.ResourceAccessor; import org.codehaus.jackson.JsonNode; import org.testng.Assert; import org.testng.annotations.Test; http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java index c41101f..ad8894a 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java +++ 
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java @@ -8,7 +8,7 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import org.apache.helix.TestHelper; -import org.apache.helix.rest.server.resources.WorkflowAccessor; +import org.apache.helix.rest.server.resources.helix.WorkflowAccessor; import org.apache.helix.task.JobQueue; import org.apache.helix.task.TargetState; import org.apache.helix.task.TaskDriver;
