HELIX-661: implement GET namespace(s)
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/40710b27 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/40710b27 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/40710b27 Branch: refs/heads/master Commit: 40710b2713ea0e4f1d4a936396c98ef01f8e2b68 Parents: 4ff98fb Author: hrzhang <[email protected]> Authored: Wed Dec 13 18:57:40 2017 -0800 Committer: Junkai Xue <[email protected]> Committed: Wed Jan 24 18:32:39 2018 -0800 ---------------------------------------------------------------------- .../helix/rest/common/ContextPropertyKeys.java | 3 +- .../helix/rest/common/HelixRestNamespace.java | 17 +- .../helix/rest/common/HelixRestUtils.java | 15 +- .../apache/helix/rest/common/ServletType.java | 56 ++ .../helix/rest/server/HelixRestServer.java | 78 +-- .../rest/server/resources/AbstractResource.java | 40 -- .../rest/server/resources/ClusterAccessor.java | 400 -------------- .../rest/server/resources/InstanceAccessor.java | 545 ------------------- .../rest/server/resources/JobAccessor.java | 200 ------- .../rest/server/resources/ResourceAccessor.java | 278 ---------- .../server/resources/UIResourceAccessor.java | 62 --- .../rest/server/resources/WorkflowAccessor.java | 325 ----------- .../resources/helix/AbstractHelixResource.java | 79 +++ .../server/resources/helix/ClusterAccessor.java | 400 ++++++++++++++ .../resources/helix/InstanceAccessor.java | 545 +++++++++++++++++++ .../server/resources/helix/JobAccessor.java | 200 +++++++ .../resources/helix/MetadataAccessor.java | 45 ++ .../resources/helix/ResourceAccessor.java | 278 ++++++++++ .../resources/helix/WorkflowAccessor.java | 325 +++++++++++ .../resources/metadata/NamespacesAccessor.java | 47 ++ .../helix/rest/server/TestClusterAccessor.java | 2 +- .../helix/rest/server/TestHelixRestServer.java | 5 +- .../helix/rest/server/TestInstanceAccessor.java | 4 +- .../helix/rest/server/TestJobAccessor.java | 4 +- .../rest/server/TestNamespacedAPIAccess.java | 55 ++ .../helix/rest/server/TestResourceAccessor.java | 2 +- .../helix/rest/server/TestWorkflowAccessor.java | 2 +- 27 files changed, 2099 insertions(+), 1913 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/common/ContextPropertyKeys.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/ContextPropertyKeys.java b/helix-rest/src/main/java/org/apache/helix/rest/common/ContextPropertyKeys.java index ce59abc..ffe1283 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/common/ContextPropertyKeys.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/common/ContextPropertyKeys.java @@ -21,5 +21,6 @@ package org.apache.helix.rest.common; public enum ContextPropertyKeys { SERVER_CONTEXT, - NAMESPACE + METADATA, + ALL_NAMESPACES } http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java b/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java index 5d1c8f3..a2fb52c 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java @@ -19,10 +19,15 @@ package org.apache.helix.rest.common; * under the License. */ +import java.util.HashMap; +import java.util.Map; + + public class HelixRestNamespace { public enum HelixMetadataStoreType { - ZOOKEEPER + ZOOKEEPER, + NO_METADATA_STORE } public enum HelixRestNamespaceProperty { @@ -78,7 +83,8 @@ public class HelixRestNamespace { if (_name == null || _name.length() == 0) { throw new IllegalArgumentException("Name of namespace not provided"); } - if (_metadataStoreAddress == null || _metadataStoreAddress.isEmpty()) { + if (_metadataStoreType != HelixMetadataStoreType.NO_METADATA_STORE && (_metadataStoreAddress == null + || _metadataStoreAddress.isEmpty())) { throw new IllegalArgumentException( String.format("Metadata store address \"%s\" is not valid for namespace %s", _metadataStoreAddress, _name)); } @@ -96,4 +102,11 @@ public class HelixRestNamespace { return _metadataStoreAddress; } + public Map<String, String> getRestInfo() { + // In REST APIs we currently don't expose metadata store information + Map<String, String> ret = new HashMap<>(); + ret.put(HelixRestNamespaceProperty.NAME.name(), _name); + ret.put(HelixRestNamespaceProperty.IS_DEFAULT.name(), String.valueOf(_isDefault)); + return ret; + } } http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestUtils.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestUtils.java b/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestUtils.java index 6c4a3df..39491ab 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestUtils.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestUtils.java @@ -21,17 +21,6 @@ package org.apache.helix.rest.common; public class HelixRestUtils { /** - * Generate servlet path spec for a given namespace. - * @param namespace Name of the namespace - * @param isDefaultServlet mark this as true to get path spec for the special servlet for default namespace - * @return servlet path spec - */ - public static String makeServletPathSpec(String namespace, boolean isDefaultServlet) { - return isDefaultServlet ? HelixRestNamespace.DEFAULT_NAMESPACE_PATH_SPEC - : String.format("/namespaces/%s/*", namespace); - } - - /** * Extract namespace information from servlet path. There are 3 cases: * 1. /namespaces/namespaceName -> return namespaceName * 2. /namespaces -> return "" @@ -40,7 +29,7 @@ public class HelixRestUtils { * @return Namespace name retrieved from servlet spec. */ public static String getNamespaceFromServletPath(String servletPath) { - if (isDefaultNamespaceServlet(servletPath)) { + if (isDefaultServlet(servletPath)) { return HelixRestNamespace.DEFAULT_NAMESPACE_NAME; } @@ -52,7 +41,7 @@ public class HelixRestUtils { } } - private static boolean isDefaultNamespaceServlet(String servletPath) { + public static boolean isDefaultServlet(String servletPath) { // Special servlet for default namespace has path spec "/*", so servletPath is empty return servletPath == null || servletPath.isEmpty(); } http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/common/ServletType.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/ServletType.java b/helix-rest/src/main/java/org/apache/helix/rest/common/ServletType.java new file mode 100644 index 0000000..bbff2d6 --- /dev/null +++ b/helix-rest/src/main/java/org/apache/helix/rest/common/ServletType.java @@ -0,0 +1,56 @@ +package org.apache.helix.rest.common; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.rest.server.resources.helix.AbstractHelixResource; +import org.apache.helix.rest.server.resources.metadata.NamespacesAccessor; + +public enum ServletType { + /** + * Servlet serving default API endpoints (/admin/v2/clusters/...) + */ + DEFAULT_SERVLET(HelixRestNamespace.DEFAULT_NAMESPACE_PATH_SPEC, + new String[] { AbstractHelixResource.class.getPackage().getName(), + NamespacesAccessor.class.getPackage().getName() + }), + + /** + * Servlet serving namespaced API endpoints (/admin/v2/namespaces/{namespaceName}) + */ + COMMON_SERVLET("/namespaces/%s/*", + new String[] { AbstractHelixResource.class.getPackage().getName(), + }); + + private final String _servletPathSpecTemplate; + private final String[] _servletPackageArray; + + ServletType(String servletPathSpecTemplate, String[] servletPackageArray) { + _servletPathSpecTemplate = servletPathSpecTemplate; + _servletPackageArray = servletPackageArray; + } + + public String getServletPathSpecTemplate() { + return _servletPathSpecTemplate; + } + + public String[] getServletPackageArray() { + return _servletPackageArray; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java index 3737308..e0c1c4e 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java @@ -27,11 +27,10 @@ import java.util.Map; import org.apache.helix.HelixException; import org.apache.helix.rest.common.ContextPropertyKeys; import org.apache.helix.rest.common.HelixRestNamespace; -import org.apache.helix.rest.common.HelixRestUtils; +import org.apache.helix.rest.common.ServletType; import org.apache.helix.rest.server.auditlog.AuditLogger; import org.apache.helix.rest.server.filters.AuditLogFilter; import org.apache.helix.rest.server.filters.CORSFilter; -import org.apache.helix.rest.server.resources.AbstractResource; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConnectionFactory; @@ -53,18 +52,13 @@ public class HelixRestServer { private int _port; private String _urlPrefix; private Server _server; + private List<HelixRestNamespace> _helixNamespaces; private ServletContextHandler _servletContextHandler; private List<AuditLogger> _auditLoggers; // Key is name of namespace, value of the resource config of that namespace private Map<String, ResourceConfig> _resourceConfigMap; - // In additional to regular servlets serving namespaced API endpoints, We have a default servlet - // serving un-namespaced API (/admin/v2/clusters/...) for default namespace as well. We use this - // literal as a key in _resourceConfigMap to keep records for default servlet. - // TODO: try to find a way to serve 2 sets of endpoints of default namespace in 1 servlet - private static final String DEFAULT_SERVLET_KEY = "DefaultServlet"; - public HelixRestServer(String zkAddr, int port, String urlPrefix) { this(zkAddr, port, urlPrefix, Collections.<AuditLogger>emptyList()); } @@ -77,7 +71,8 @@ public class HelixRestServer { init(namespaces, port, urlPrefix, auditLoggers); } - public HelixRestServer(List<HelixRestNamespace> namespaces, int port, String urlPrefix, List<AuditLogger> auditLoggers) { + public HelixRestServer(List<HelixRestNamespace> namespaces, int port, String urlPrefix, + List<AuditLogger> auditLoggers) { init(namespaces, port, urlPrefix, auditLoggers); } @@ -93,32 +88,16 @@ public class HelixRestServer { _auditLoggers = auditLoggers; _resourceConfigMap = new HashMap<>(); _servletContextHandler = new ServletContextHandler(_server, _urlPrefix); + _helixNamespaces = namespaces; // Initialize all namespaces try { - for (HelixRestNamespace namespace : namespaces) { + for (HelixRestNamespace namespace : _helixNamespaces) { LOG.info("Initializing namespace " + namespace.getName()); - if (_resourceConfigMap.containsKey(namespace.getName())) { - throw new IllegalArgumentException(String.format("Duplicated namespace name \"%s\"", namespace.getName())); - } - - // Create resource and context for namespaced servlet - _resourceConfigMap.put(namespace.getName(), - makeResourceConfig(namespace, AbstractResource.class.getPackage().getName())); - LOG.info("Initializing servlet for namespace " + namespace.getName()); - initServlet(_resourceConfigMap.get(namespace.getName()), - HelixRestUtils.makeServletPathSpec(namespace.getName(), false)); - - // Create special resource and context for default namespace servlet + prepareServlet(namespace, ServletType.COMMON_SERVLET); if (namespace.isDefault()) { - if (_resourceConfigMap.containsKey(DEFAULT_SERVLET_KEY)) { - throw new IllegalArgumentException("More than 1 default namespaces are provided"); - } - LOG.info("Creating special servlet for default namespace"); - _resourceConfigMap.put(DEFAULT_SERVLET_KEY, - makeResourceConfig(namespace, AbstractResource.class.getPackage().getName())); - initServlet(_resourceConfigMap.get(DEFAULT_SERVLET_KEY), - HelixRestUtils.makeServletPathSpec(namespace.getName(), true)); + LOG.info("Creating default servlet for default namespace"); + prepareServlet(namespace, ServletType.DEFAULT_SERVLET); } } } catch (Exception e) { @@ -135,12 +114,39 @@ public class HelixRestServer { })); } - private ResourceConfig makeResourceConfig(HelixRestNamespace ns, String... packages) { + private void prepareServlet(HelixRestNamespace namespace, ServletType type) { + String resourceConfigMapKey = getResourceConfigMapKey(type, namespace); + if (_resourceConfigMap.containsKey(resourceConfigMapKey)) { + throw new IllegalArgumentException( + String.format("Duplicated namespace name \"%s\"", namespace.getName())); + } + + // Prepare resource config + ResourceConfig config = getResourceConfig(namespace, type); + _resourceConfigMap.put(resourceConfigMapKey, config); + + // Initialize servlet + initServlet(config, String.format(type.getServletPathSpecTemplate(), namespace.getName())); + } + + private String getResourceConfigMapKey(ServletType type, HelixRestNamespace namespace) { + return String.format("%s_%s", type.name(), namespace.getName()); + } + + private ResourceConfig getResourceConfig(HelixRestNamespace namespace, ServletType type) { ResourceConfig cfg = new ResourceConfig(); - cfg.packages(packages) - .property(ContextPropertyKeys.SERVER_CONTEXT.name(), new ServerContext(ns.getMetadataStoreAddress())) - .register(new CORSFilter()) - .register(new AuditLogFilter(_auditLoggers)); + cfg.packages(type.getServletPackageArray()); + + cfg.property(ContextPropertyKeys.SERVER_CONTEXT.name(), + new ServerContext(namespace.getMetadataStoreAddress())); + if (type == ServletType.DEFAULT_SERVLET) { + cfg.property(ContextPropertyKeys.ALL_NAMESPACES.name(), _helixNamespaces); + } else { + cfg.property(ContextPropertyKeys.METADATA.name(), namespace); + } + + cfg.register(new CORSFilter()); + cfg.register(new AuditLogFilter(_auditLoggers)); return cfg; } @@ -186,7 +192,7 @@ public class HelixRestServer { for (Map.Entry<String, ResourceConfig> e : _resourceConfigMap.entrySet()) { ServerContext ctx = (ServerContext) e.getValue().getProperty(ContextPropertyKeys.SERVER_CONTEXT.name()); if (ctx == null) { - LOG.warn("Server context for servlet " + e.getKey() + " is null."); + LOG.info("Server context for servlet " + e.getKey() + " is null."); } else { LOG.info("Closing context for servlet " + e.getKey()); ctx.close(); http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java index a89ae5d..e3c565d 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java @@ -83,42 +83,6 @@ public class AbstractResource { protected HttpServletRequest _servletRequest; protected AuditLog.Builder _auditLogBuilder; - public ZkClient getZkClient() { - ServerContext serverContext = (ServerContext) _application.getProperties() - .get(ContextPropertyKeys.SERVER_CONTEXT.name()); - return serverContext.getZkClient(); - } - - public HelixAdmin getHelixAdmin() { - ServerContext serverContext = (ServerContext) _application.getProperties() - .get(ContextPropertyKeys.SERVER_CONTEXT.name()); - return serverContext.getHelixAdmin(); - } - - public ClusterSetup getClusterSetup() { - ServerContext serverContext = (ServerContext) _application.getProperties() - .get(ContextPropertyKeys.SERVER_CONTEXT.name()); - return serverContext.getClusterSetup(); - } - - public TaskDriver getTaskDriver(String clusterName) { - ServerContext serverContext = (ServerContext) _application.getProperties() - .get(ContextPropertyKeys.SERVER_CONTEXT.name()); - return serverContext.getTaskDriver(clusterName); - } - - public ConfigAccessor getConfigAccessor() { - ServerContext serverContext = (ServerContext) _application.getProperties() - .get(ContextPropertyKeys.SERVER_CONTEXT.name()); - return serverContext.getConfigAccessor(); - } - - public HelixDataAccessor getDataAccssor(String clusterName) { - ServerContext serverContext = (ServerContext) _application.getProperties() - .get(ContextPropertyKeys.SERVER_CONTEXT.name()); - return serverContext.getDataAccssor(clusterName); - } - protected void addExceptionToAuditLog(Exception ex) { if (_auditLogBuilder == null) { _auditLogBuilder = @@ -196,10 +160,6 @@ public class AbstractResource { return sw.toString(); } - protected static ZNRecord toZNRecord(String data) throws IOException { - return OBJECT_MAPPER.reader(ZNRecord.class).readValue(data); - } - protected Command getCommand(String commandStr) throws HelixException { if (commandStr == null) { throw new HelixException("Unknown command " + commandStr); http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/ClusterAccessor.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/ClusterAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/ClusterAccessor.java deleted file mode 100644 index 1c998b7..0000000 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/ClusterAccessor.java +++ /dev/null @@ -1,400 +0,0 @@ -package org.apache.helix.rest.server.resources; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import javax.ws.rs.DELETE; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.Response; -import org.apache.helix.ConfigAccessor; -import org.apache.helix.HelixAdmin; -import org.apache.helix.HelixDataAccessor; -import org.apache.helix.HelixException; -import org.apache.helix.PropertyKey; -import org.apache.helix.ZNRecord; -import org.apache.helix.manager.zk.ZKUtil; -import org.apache.helix.manager.zk.ZkClient; -import org.apache.helix.model.ClusterConfig; -import org.apache.helix.model.HelixConfigScope; -import org.apache.helix.model.LeaderHistory; -import org.apache.helix.model.LiveInstance; -import org.apache.helix.model.Message; -import org.apache.helix.model.StateModelDefinition; -import org.apache.helix.model.builder.HelixConfigScopeBuilder; -import org.apache.helix.tools.ClusterSetup; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Path("/clusters") -public class ClusterAccessor extends AbstractResource { - private static Logger _logger = LoggerFactory.getLogger(ClusterAccessor.class.getName()); - - public enum ClusterProperties { - controller, - instances, - liveInstances, - resources, - paused, - maintenance, - messages, - stateModelDefinitions, - clusters - } - - @GET - public Response getClusters() { - HelixAdmin helixAdmin = getHelixAdmin(); - List<String> clusters = helixAdmin.getClusters(); - - Map<String, List<String>> dataMap = new HashMap<>(); - dataMap.put(ClusterProperties.clusters.name(), clusters); - - return JSONRepresentation(dataMap); - } - - @GET - @Path("{clusterId}") - public Response getClusterInfo(@PathParam("clusterId") String clusterId) { - if (!isClusterExist(clusterId)) { - return notFound(); - } - - HelixDataAccessor dataAccessor = getDataAccssor(clusterId); - PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder(); - - Map<String, Object> clusterInfo = new HashMap<>(); - clusterInfo.put(Properties.id.name(), clusterId); - - LiveInstance controller = dataAccessor.getProperty(keyBuilder.controllerLeader()); - if (controller != null) { - clusterInfo.put(ClusterProperties.controller.name(), controller.getInstanceName()); - } else { - clusterInfo.put(ClusterProperties.controller.name(), "No Lead Controller!"); - } - - boolean paused = (dataAccessor.getProperty(keyBuilder.pause()) == null ? false : true); - clusterInfo.put(ClusterProperties.paused.name(), paused); - boolean maintenance = - (dataAccessor.getProperty(keyBuilder.maintenance()) == null ? false : true); - clusterInfo.put(ClusterProperties.maintenance.name(), maintenance); - - List<String> idealStates = dataAccessor.getChildNames(keyBuilder.idealStates()); - clusterInfo.put(ClusterProperties.resources.name(), idealStates); - List<String> instances = dataAccessor.getChildNames(keyBuilder.instanceConfigs()); - clusterInfo.put(ClusterProperties.instances.name(), instances); - List<String> liveInstances = dataAccessor.getChildNames(keyBuilder.liveInstances()); - clusterInfo.put(ClusterProperties.liveInstances.name(), liveInstances); - - return JSONRepresentation(clusterInfo); - } - - - @PUT - @Path("{clusterId}") - public Response createCluster(@PathParam("clusterId") String clusterId, - @DefaultValue("false") @QueryParam("recreate") String recreate) { - boolean recreateIfExists = Boolean.valueOf(recreate); - ClusterSetup clusterSetup = getClusterSetup(); - - try { - clusterSetup.addCluster(clusterId, recreateIfExists); - } catch (Exception ex) { - _logger.error("Failed to create cluster " + clusterId + ", exception: " + ex); - return serverError(ex); - } - - return created(); - } - - @DELETE - @Path("{clusterId}") - public Response deleteCluster(@PathParam("clusterId") String clusterId) { - ClusterSetup clusterSetup = getClusterSetup(); - - try { - clusterSetup.deleteCluster(clusterId); - } catch (HelixException ex) { - _logger.info( - "Failed to delete cluster " + clusterId + ", cluster is still in use. Exception: " + ex); - return badRequest(ex.getMessage()); - } catch (Exception ex) { - _logger.error("Failed to delete cluster " + clusterId + ", exception: " + ex); - return serverError(ex); - } - - return OK(); - } - - @POST - @Path("{clusterId}") - public Response updateCluster(@PathParam("clusterId") String clusterId, - @QueryParam("command") String commandStr, @QueryParam("superCluster") String superCluster, - String content) { - Command command; - try { - command = getCommand(commandStr); - } catch (HelixException ex) { - return badRequest(ex.getMessage()); - } - - ClusterSetup clusterSetup = getClusterSetup(); - HelixAdmin helixAdmin = getHelixAdmin(); - - switch (command) { - case activate: - if (superCluster == null) { - return badRequest("Super Cluster name is missing!"); - } - try { - clusterSetup.activateCluster(clusterId, superCluster, true); - } catch (Exception ex) { - _logger.error("Failed to add cluster " + clusterId + " to super cluster " + superCluster); - return serverError(ex); - } - break; - - case expand: - try { - clusterSetup.expandCluster(clusterId); - } catch (Exception ex) { - _logger.error("Failed to expand cluster " + clusterId); - return serverError(ex); - } - break; - - case enable: - try { - helixAdmin.enableCluster(clusterId, true); - } catch (Exception ex) { - _logger.error("Failed to enable cluster " + clusterId); - return serverError(ex); - } - break; - - case disable: - try { - helixAdmin.enableCluster(clusterId, false); - } catch (Exception ex) { - _logger.error("Failed to disable cluster " + clusterId); - return serverError(ex); - } - break; - case enableMaintenanceMode: - try { - helixAdmin.enableMaintenanceMode(clusterId, true, content); - } catch (Exception ex) { - _logger.error("Failed to enable maintenance mode " + clusterId); - return serverError(ex); - } - break; - case disableMaintenanceMode: - try { - helixAdmin.enableMaintenanceMode(clusterId, false); - } catch (Exception ex) { - _logger.error("Failed to disable maintenance mode " + clusterId); - return serverError(ex); - } - break; - default: - return badRequest("Unsupported command " + command); - } - - return OK(); - } - - - @GET - @Path("{clusterId}/configs") - public Response getClusterConfig(@PathParam("clusterId") String clusterId) { - ConfigAccessor accessor = getConfigAccessor(); - ClusterConfig config = null; - try { - config = accessor.getClusterConfig(clusterId); - } catch (HelixException ex) { - // cluster not found. - _logger.info("Failed to get cluster config for cluster " + clusterId - + ", cluster not found, Exception: " + ex); - } catch (Exception ex) { - _logger.error("Failed to get cluster config for cluster " + clusterId + " Exception: " + ex); - return serverError(ex); - } - if (config == null) { - return notFound(); - } - return JSONRepresentation(config.getRecord()); - } - - @POST - @Path("{clusterId}/configs") - public Response updateClusterConfig( - @PathParam("clusterId") String clusterId, @QueryParam("command") String commandStr, - String content) { - Command command; - try { - command = getCommand(commandStr); - } catch (HelixException ex) { - return badRequest(ex.getMessage()); - } - - ZNRecord record; - try { - record = toZNRecord(content); - } catch (IOException e) { - _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e); - return badRequest("Input is not a valid ZNRecord!"); - } - - if (!record.getId().equals(clusterId)) { - return badRequest("ID does not match the cluster name in input!"); - } - - ClusterConfig config = new ClusterConfig(record); - ConfigAccessor configAccessor = getConfigAccessor(); - try { - switch (command) { - case update: - configAccessor.updateClusterConfig(clusterId, config); - break; - case delete: { - HelixConfigScope clusterScope = - new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER) - .forCluster(clusterId).build(); - configAccessor.remove(clusterScope, config.getRecord()); - } - break; - - default: - return badRequest("Unsupported command " + commandStr); - } - } catch (HelixException ex) { - return notFound(ex.getMessage()); - } catch (Exception ex) { - _logger.error( - "Failed to " + command + " cluster config, cluster " + clusterId + " new config: " - + content + ", Exception: " + ex); - return serverError(ex); - } - return OK(); - } - - @GET - @Path("{clusterId}/controller") - public Response getClusterController(@PathParam("clusterId") String clusterId) { - HelixDataAccessor dataAccessor = getDataAccssor(clusterId); - Map<String, Object> controllerInfo = new HashMap<>(); - controllerInfo.put(Properties.id.name(), clusterId); - - LiveInstance leader = dataAccessor.getProperty(dataAccessor.keyBuilder().controllerLeader()); - if (leader != null) { - controllerInfo.put(ClusterProperties.controller.name(), leader.getInstanceName()); - controllerInfo.putAll(leader.getRecord().getSimpleFields()); - } else { - controllerInfo.put(ClusterProperties.controller.name(), "No Lead Controller!"); - } - - return JSONRepresentation(controllerInfo); - } - - @GET - @Path("{clusterId}/controller/history") - public Response getClusterControllerHistory(@PathParam("clusterId") String clusterId) { - HelixDataAccessor dataAccessor = getDataAccssor(clusterId); - Map<String, Object> controllerHistory = new HashMap<>(); - controllerHistory.put(Properties.id.name(), clusterId); - - LeaderHistory history = - dataAccessor.getProperty(dataAccessor.keyBuilder().controllerLeaderHistory()); - if (history != null) { - controllerHistory.put(Properties.history.name(), history.getHistoryList()); - } else { - controllerHistory.put(Properties.history.name(), Collections.emptyList()); - } - - return JSONRepresentation(controllerHistory); - } - - @GET - @Path("{clusterId}/controller/messages") - public Response getClusterControllerMessages(@PathParam("clusterId") String clusterId) { - HelixDataAccessor dataAccessor = getDataAccssor(clusterId); - - Map<String, Object> controllerMessages = new HashMap<>(); - controllerMessages.put(Properties.id.name(), clusterId); - - List<String> messages = - dataAccessor.getChildNames(dataAccessor.keyBuilder().controllerMessages()); - controllerMessages.put(ClusterProperties.messages.name(), messages); - controllerMessages.put(Properties.count.name(), messages.size()); - - return JSONRepresentation(controllerMessages); - } - - @GET - @Path("{clusterId}/controller/messages/{messageId}") - public Response getClusterControllerMessages(@PathParam("clusterId") String clusterId, @PathParam("messageId") String messageId) { - HelixDataAccessor dataAccessor = getDataAccssor(clusterId); - Message message = dataAccessor.getProperty( - dataAccessor.keyBuilder().controllerMessage(messageId)); - return JSONRepresentation(message.getRecord()); - } - - @GET - @Path("{clusterId}/statemodeldefs") - public Response getClusterStateModelDefinitions(@PathParam("clusterId") String clusterId) { - HelixDataAccessor dataAccessor = getDataAccssor(clusterId); - List<String> stateModelDefs = - dataAccessor.getChildNames(dataAccessor.keyBuilder().stateModelDefs()); - - Map<String, Object> clusterStateModelDefs = new HashMap<>(); - clusterStateModelDefs.put(Properties.id.name(), clusterId); - clusterStateModelDefs.put(ClusterProperties.stateModelDefinitions.name(), stateModelDefs); - - return JSONRepresentation(clusterStateModelDefs); - } - - @GET - @Path("{clusterId}/statemodeldefs/{statemodel}") - public Response getClusterStateModelDefinition(@PathParam("clusterId") String clusterId, - @PathParam("statemodel") String statemodel) { - HelixDataAccessor dataAccessor = getDataAccssor(clusterId); - StateModelDefinition stateModelDef = - dataAccessor.getProperty(dataAccessor.keyBuilder().stateModelDef(statemodel)); - - return JSONRepresentation(stateModelDef.getRecord()); - } - - private boolean isClusterExist(String cluster) { - ZkClient zkClient = getZkClient(); - if (ZKUtil.isClusterSetup(cluster, zkClient)) { - return true; - } - return false; - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/InstanceAccessor.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/InstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/InstanceAccessor.java deleted file mode 100644 index eeecba9..0000000 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/InstanceAccessor.java +++ /dev/null @@ -1,545 +0,0 @@ -package org.apache.helix.rest.server.resources; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import javax.ws.rs.DELETE; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.Response; - -import org.apache.helix.HelixAdmin; -import org.apache.helix.HelixDataAccessor; -import org.apache.helix.HelixException; -import org.apache.helix.ZNRecord; -import org.apache.helix.model.ClusterConfig; -import org.apache.helix.model.CurrentState; -import org.apache.helix.model.Error; -import org.apache.helix.model.HealthStat; -import org.apache.helix.model.InstanceConfig; -import org.apache.helix.model.LiveInstance; -import org.apache.helix.model.Message; -import org.apache.helix.model.ParticipantHistory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.codehaus.jackson.JsonNode; -import org.codehaus.jackson.node.ArrayNode; -import org.codehaus.jackson.node.JsonNodeFactory; -import org.codehaus.jackson.node.ObjectNode; - -@Path("/clusters/{clusterId}/instances") -public class InstanceAccessor extends AbstractResource { - private final static Logger _logger = LoggerFactory.getLogger(InstanceAccessor.class); - - public enum InstanceProperties { - instances, - online, - disabled, - config, - liveInstance, - resource, - resources, - partitions, - errors, - new_messages, - read_messages, - total_message_count, - read_message_count, - healthreports, - instanceTags - } - - @GET - public Response getInstances(@PathParam("clusterId") String clusterId) { - HelixDataAccessor accessor = getDataAccssor(clusterId); - ObjectNode root = JsonNodeFactory.instance.objectNode(); - root.put(Properties.id.name(), JsonNodeFactory.instance.textNode(clusterId)); - - ArrayNode instancesNode = root.putArray(InstanceProperties.instances.name()); - ArrayNode onlineNode = root.putArray(InstanceProperties.online.name()); - ArrayNode disabledNode = root.putArray(InstanceProperties.disabled.name()); - - List<String> instances = accessor.getChildNames(accessor.keyBuilder().instanceConfigs()); - - if (instances != null) { - instancesNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(instances)); - } else { - return notFound(); - } - - List<String> liveInstances = accessor.getChildNames(accessor.keyBuilder().liveInstances()); - ClusterConfig clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig()); - - for (String instanceName : instances) { - InstanceConfig instanceConfig = - accessor.getProperty(accessor.keyBuilder().instanceConfig(instanceName)); - if (instanceConfig != null) { - if (!instanceConfig.getInstanceEnabled() || (clusterConfig.getDisabledInstances() != null - && clusterConfig.getDisabledInstances().containsKey(instanceName))) { - disabledNode.add(JsonNodeFactory.instance.textNode(instanceName)); - } - - if (liveInstances.contains(instanceName)){ - onlineNode.add(JsonNodeFactory.instance.textNode(instanceName)); - } - } - } - - return JSONRepresentation(root); - } - - @POST - public Response updateInstances(@PathParam("clusterId") String clusterId, - @QueryParam("command") String command, String content) { - Command cmd; - try { - cmd = Command.valueOf(command); - } catch (Exception e) { - return badRequest("Invalid command : " + command); - } - - HelixAdmin admin = getHelixAdmin(); - try { - JsonNode node = null; - if (content.length() != 0) { - node = OBJECT_MAPPER.readTree(content); - } - if (node == null) { - return badRequest("Invalid input for content : " + content); - } - List<String> enableInstances = OBJECT_MAPPER - .readValue(node.get(InstanceProperties.instances.name()).toString(), - OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class)); - switch (cmd) { - case enable: - admin.enableInstance(clusterId, enableInstances, true); - - break; - case disable: - admin.enableInstance(clusterId, enableInstances, false); - break; - default: - _logger.error("Unsupported command :" + command); - return badRequest("Unsupported command :" + command); - } - } catch (Exception e) { - _logger.error("Failed in updating instances : " + content, e); - return badRequest(e.getMessage()); - } - return OK(); - } - - @GET - @Path("{instanceName}") - public Response getInstance(@PathParam("clusterId") String clusterId, - @PathParam("instanceName") String instanceName) throws IOException { - HelixDataAccessor accessor = getDataAccssor(clusterId); - Map<String, Object> instanceMap = new HashMap<>(); - instanceMap.put(Properties.id.name(), JsonNodeFactory.instance.textNode(instanceName)); - instanceMap.put(InstanceProperties.liveInstance.name(), null); - - InstanceConfig instanceConfig = - accessor.getProperty(accessor.keyBuilder().instanceConfig(instanceName)); - LiveInstance liveInstance = - accessor.getProperty(accessor.keyBuilder().liveInstance(instanceName)); - - if (instanceConfig != null) { - instanceMap.put(InstanceProperties.config.name(), instanceConfig.getRecord()); - } else { - return notFound(); - } - - if (liveInstance != null) { - instanceMap.put(InstanceProperties.liveInstance.name(), liveInstance.getRecord()); - } - - return JSONRepresentation(instanceMap); - } - - @PUT - @Path("{instanceName}") - public Response addInstance(@PathParam("clusterId") String clusterId, - @PathParam("instanceName") String instanceName, String content) { - HelixAdmin admin = getHelixAdmin(); - ZNRecord record; - try { - record = toZNRecord(content); - } catch (IOException e) { - _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e); - return badRequest("Input is not a vaild ZNRecord!"); - } - - try { - admin.addInstance(clusterId, new InstanceConfig(record)); - } catch (Exception ex) { - _logger.error("Error in adding an instance: " + instanceName, ex); - return serverError(ex); - } - - return OK(); - } - - @POST - @Path("{instanceName}") - public Response updateInstance(@PathParam("clusterId") String clusterId, - @PathParam("instanceName") String instanceName, @QueryParam("command") String command, - String content) { - Command cmd; - try { - cmd = Command.valueOf(command); - } catch (Exception e) { - return badRequest("Invalid command : " + command); - } - - HelixAdmin admin = getHelixAdmin(); - try { - JsonNode node = null; - if (content.length() != 0) { - node = OBJECT_MAPPER.readTree(content); - } - - switch (cmd) { - case enable: - admin.enableInstance(clusterId, instanceName, true); - break; - case disable: - admin.enableInstance(clusterId, instanceName, false); - break; - case reset: - if (!validInstance(node, instanceName)) { - return badRequest("Instance names are not match!"); - } - admin.resetPartition(clusterId, instanceName, - node.get(InstanceProperties.resource.name()).toString(), (List<String>) OBJECT_MAPPER - .readValue(node.get(InstanceProperties.partitions.name()).toString(), - OBJECT_MAPPER.getTypeFactory() - .constructCollectionType(List.class, String.class))); - break; - case addInstanceTag: - if (!validInstance(node, instanceName)) { - return badRequest("Instance names are not match!"); - } - for (String tag : (List<String>) OBJECT_MAPPER - .readValue(node.get(InstanceProperties.instanceTags.name()).toString(), - OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class))) { - admin.addInstanceTag(clusterId, instanceName, tag); - } - break; - case removeInstanceTag: - if (!validInstance(node, instanceName)) { - return badRequest("Instance names are not match!"); - } - for (String tag : (List<String>) OBJECT_MAPPER - .readValue(node.get(InstanceProperties.instanceTags.name()).toString(), - OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class))) { - admin.removeInstanceTag(clusterId, instanceName, tag); - } - break; - case enablePartitions: - admin.enablePartition(true, clusterId, instanceName, - node.get(InstanceProperties.resource.name()).getTextValue(), - (List<String>) OBJECT_MAPPER - .readValue(node.get(InstanceProperties.partitions.name()).toString(), - OBJECT_MAPPER.getTypeFactory() - .constructCollectionType(List.class, String.class))); - break; - case disablePartitions: - admin.enablePartition(false, clusterId, instanceName, - node.get(InstanceProperties.resource.name()).getTextValue(), - (List<String>) OBJECT_MAPPER - .readValue(node.get(InstanceProperties.partitions.name()).toString(), - OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class))); - break; - default: - _logger.error("Unsupported command :" + command); - return badRequest("Unsupported command :" + command); - } - } catch (Exception e) { - _logger.error("Failed in updating instance : " + instanceName, e); - return badRequest(e.getMessage()); - } - return OK(); - } - - @DELETE - @Path("{instanceName}") - public Response deleteInstance(@PathParam("clusterId") String clusterId, - @PathParam("instanceName") String instanceName) { - HelixAdmin admin = getHelixAdmin(); - try { - InstanceConfig instanceConfig = admin.getInstanceConfig(clusterId, instanceName); - admin.dropInstance(clusterId, instanceConfig); - } catch (HelixException e) { - return badRequest(e.getMessage()); - } - - return OK(); - } - - @GET - @Path("{instanceName}/configs") - public Response getInstanceConfig(@PathParam("clusterId") String clusterId, - @PathParam("instanceName") String instanceName) throws IOException { - HelixDataAccessor accessor = getDataAccssor(clusterId); - InstanceConfig instanceConfig = - accessor.getProperty(accessor.keyBuilder().instanceConfig(instanceName)); - - if (instanceConfig != null) { - return JSONRepresentation(instanceConfig.getRecord()); - } - - return notFound(); - } - - @PUT - @Path("{instanceName}/configs") - public Response updateInstanceConfig(@PathParam("clusterId") String clusterId, - @PathParam("instanceName") String instanceName, String content) throws IOException { - HelixAdmin admin = getHelixAdmin(); - ZNRecord record; - try { - record = toZNRecord(content); - } catch (IOException e) { - _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e); - return badRequest("Input is not a vaild ZNRecord!"); - } - - try { - admin.setInstanceConfig(clusterId, instanceName, new InstanceConfig(record)); - } catch (Exception ex) { - _logger.error("Error in update instance config: " + instanceName, ex); - return serverError(ex); - } - - return OK(); - } - - @GET - @Path("{instanceName}/resources") - public Response getResourcesOnInstance(@PathParam("clusterId") String clusterId, - @PathParam("instanceName") String instanceName) throws IOException { - HelixDataAccessor accessor = getDataAccssor(clusterId); - - ObjectNode root = JsonNodeFactory.instance.objectNode(); - root.put(Properties.id.name(), instanceName); - ArrayNode resourcesNode = root.putArray(InstanceProperties.resources.name()); - - List<String> sessionIds = accessor.getChildNames(accessor.keyBuilder().sessions(instanceName)); - if (sessionIds == null || sessionIds.size() == 0) { - return null; - } - - // Only get resource list from current session id - String currentSessionId = sessionIds.get(0); - - List<String> resources = - accessor.getChildNames(accessor.keyBuilder().currentStates(instanceName, currentSessionId)); - if (resources != null && resources.size() > 0) { - resourcesNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(resources)); - } - - return JSONRepresentation(root); - } - - @GET - @Path("{instanceName}/resources/{resourceName}") - public Response getResourceOnInstance(@PathParam("clusterId") String clusterId, - @PathParam("instanceName") String instanceName, - @PathParam("resourceName") String resourceName) throws IOException { - HelixDataAccessor accessor = getDataAccssor(clusterId); - List<String> sessionIds = accessor.getChildNames(accessor.keyBuilder().sessions(instanceName)); - if (sessionIds == null || sessionIds.size() == 0) { - return notFound(); - } - - // Only get resource list from current session id - String currentSessionId = sessionIds.get(0); - CurrentState resourceCurrentState = accessor - .getProperty(accessor.keyBuilder().currentState(instanceName, currentSessionId, resourceName)); - if (resourceCurrentState != null) { - return JSONRepresentation(resourceCurrentState.getRecord()); - } - - return notFound(); - } - - @GET - @Path("{instanceName}/errors") - public Response getErrorsOnInstance(@PathParam("clusterId") String clusterId, - @PathParam("instanceName") String instanceName) throws IOException { - HelixDataAccessor accessor = getDataAccssor(clusterId); - - ObjectNode root = JsonNodeFactory.instance.objectNode(); - root.put(Properties.id.name(), instanceName); - ObjectNode errorsNode = JsonNodeFactory.instance.objectNode(); - - List<String> sessionIds = - accessor.getChildNames(accessor.keyBuilder().errors(instanceName)); - - if (sessionIds == null || sessionIds.size() == 0) { - return notFound(); - } - - for (String sessionId : sessionIds) { - List<String> resources = - accessor.getChildNames(accessor.keyBuilder().errors(instanceName, sessionId)); - if (resources != null) { - ObjectNode resourcesNode = JsonNodeFactory.instance.objectNode(); - for (String resourceName : resources) { - List<String> partitions = accessor - .getChildNames(accessor.keyBuilder().errors(instanceName, sessionId, resourceName)); - if (partitions != null) { - ArrayNode partitionsNode = resourcesNode.putArray(resourceName); - partitionsNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(partitions)); - } - } - errorsNode.put(sessionId, resourcesNode); - } - } - root.put(InstanceProperties.errors.name(), errorsNode); - - return JSONRepresentation(root); - } - - @GET - @Path("{instanceName}/errors/{sessionId}/{resourceName}/{partitionName}") - public Response getErrorsOnInstance(@PathParam("clusterId") String clusterId, - @PathParam("instanceName") String instanceName, @PathParam("sessionId") String sessionId, - @PathParam("resourceName") String resourceName, - @PathParam("partitionName") String partitionName) throws IOException { - HelixDataAccessor accessor = getDataAccssor(clusterId); - Error error = accessor.getProperty(accessor.keyBuilder() - .stateTransitionError(instanceName, sessionId, resourceName, partitionName)); - if (error != null) { - return JSONRepresentation(error.getRecord()); - } - - return notFound(); - } - - @GET - @Path("{instanceName}/history") - public Response getHistoryOnInstance(@PathParam("clusterId") String clusterId, - @PathParam("instanceName") String instanceName) throws IOException { - HelixDataAccessor accessor = getDataAccssor(clusterId); - ParticipantHistory history = - accessor.getProperty(accessor.keyBuilder().participantHistory(instanceName)); - if (history != null) { - return JSONRepresentation(history.getRecord()); - } - return notFound(); - } - - @GET - @Path("{instanceName}/messages") - public Response getMessagesOnInstance(@PathParam("clusterId") String clusterId, - @PathParam("instanceName") String instanceName) throws IOException { - HelixDataAccessor accessor = getDataAccssor(clusterId); - - ObjectNode root = JsonNodeFactory.instance.objectNode(); - root.put(Properties.id.name(), instanceName); - ArrayNode newMessages = root.putArray(InstanceProperties.new_messages.name()); - ArrayNode readMessages = root.putArray(InstanceProperties.read_messages.name()); - - - List<String> messages = - accessor.getChildNames(accessor.keyBuilder().messages(instanceName)); - if (messages == null || messages.size() == 0) { - return notFound(); - } - - for (String messageName : messages) { - Message message = accessor.getProperty(accessor.keyBuilder().message(instanceName, messageName)); - if (message.getMsgState() == Message.MessageState.NEW) { - newMessages.add(messageName); - } - - if (message.getMsgState() == Message.MessageState.READ) { - readMessages.add(messageName); - } - } - - root.put(InstanceProperties.total_message_count.name(), - newMessages.size() + readMessages.size()); - root.put(InstanceProperties.read_message_count.name(), readMessages.size()); - - return JSONRepresentation(root); - } - - @GET - @Path("{instanceName}/messages/{messageId}") - public Response getMessageOnInstance(@PathParam("clusterId") String clusterId, - @PathParam("instanceName") String instanceName, - @PathParam("messageId") String messageId) throws IOException { - HelixDataAccessor accessor = getDataAccssor(clusterId); - Message message = accessor.getProperty(accessor.keyBuilder().message(instanceName, messageId)); - if (message != null) { - return JSONRepresentation(message.getRecord()); - } - - return notFound(); - } - - @GET - @Path("{instanceName}/healthreports") - public Response getHealthReportsOnInstance(@PathParam("clusterId") String clusterId, - @PathParam("instanceName") String instanceName) throws IOException { - HelixDataAccessor accessor = getDataAccssor(clusterId); - - ObjectNode root = JsonNodeFactory.instance.objectNode(); - root.put(Properties.id.name(), instanceName); - ArrayNode healthReportsNode = root.putArray(InstanceProperties.healthreports.name()); - - List<String> healthReports = - accessor.getChildNames(accessor.keyBuilder().healthReports(instanceName)); - - if (healthReports != null && healthReports.size() > 0) { - healthReportsNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(healthReports)); - } - - return JSONRepresentation(root); - } - - @GET - @Path("{instanceName}/healthreports/{reportName}") - public Response getHealthReportsOnInstance(@PathParam("clusterId") String clusterId, - @PathParam("instanceName") String instanceName, - @PathParam("reportName") String reportName) throws IOException { - HelixDataAccessor accessor = getDataAccssor(clusterId); - HealthStat healthStat = - accessor.getProperty(accessor.keyBuilder().healthReport(instanceName, reportName)); - if (healthStat != null) { - return JSONRepresentation(healthStat); - } - - return notFound(); - } - - private boolean validInstance(JsonNode node, String instanceName) { - return instanceName.equals(node.get(Properties.id.name()).getValueAsText()); - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/JobAccessor.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/JobAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/JobAccessor.java deleted file mode 100644 index db6e6ad..0000000 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/JobAccessor.java +++ /dev/null @@ -1,200 +0,0 @@ -package org.apache.helix.rest.server.resources; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import javax.ws.rs.DELETE; -import javax.ws.rs.GET; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.core.Response; - -import org.apache.helix.HelixException; -import org.apache.helix.ZNRecord; -import org.apache.helix.task.JobConfig; -import org.apache.helix.task.JobContext; -import org.apache.helix.task.TaskConfig; -import org.apache.helix.task.TaskDriver; -import org.apache.helix.task.WorkflowConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.codehaus.jackson.node.ArrayNode; -import org.codehaus.jackson.node.JsonNodeFactory; -import org.codehaus.jackson.node.ObjectNode; - -@Path("/clusters/{clusterId}/workflows/{workflowName}/jobs") -public class JobAccessor extends AbstractResource { - private static Logger _logger = LoggerFactory.getLogger(JobAccessor.class.getName()); - - public enum JobProperties { - Jobs, - JobConfig, - JobContext, - TASK_COMMAND - } - - @GET - public Response getJobs(@PathParam("clusterId") String clusterId, - @PathParam("workflowName") String workflowName) { - TaskDriver driver = getTaskDriver(clusterId); - WorkflowConfig workflowConfig = driver.getWorkflowConfig(workflowName); - ObjectNode root = JsonNodeFactory.instance.objectNode(); - - if (workflowConfig == null) { - return badRequest(String.format("Workflow %s is not found!", workflowName)); - } - - Set<String> jobs = workflowConfig.getJobDag().getAllNodes(); - root.put(Properties.id.name(), JobProperties.Jobs.name()); - ArrayNode jobsNode = root.putArray(JobProperties.Jobs.name()); - - if (jobs != null) { - jobsNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(jobs)); - } - return JSONRepresentation(root); - } - - @GET - @Path("{jobName}") - public Response getJob(@PathParam("clusterId") String clusterId, - @PathParam("workflowName") String workflowName, @PathParam("jobName") String jobName) { - TaskDriver driver = getTaskDriver(clusterId); - Map<String, ZNRecord> jobMap = new HashMap<>(); - - - JobConfig jobConfig = driver.getJobConfig(jobName); - if (jobConfig != null) { - jobMap.put(JobProperties.JobConfig.name(), jobConfig.getRecord()); - } else { - return badRequest(String.format("Job config for %s does not exists", jobName)); - } - - JobContext jobContext = - driver.getJobContext(jobName); - jobMap.put(JobProperties.JobContext.name(), null); - - if (jobContext != null) { - jobMap.put(JobProperties.JobContext.name(), jobContext.getRecord()); - } - - return JSONRepresentation(jobMap); - } - - @PUT - @Path("{jobName}") - public Response addJob(@PathParam("clusterId") String clusterId, - @PathParam("workflowName") String workflowName, @PathParam("jobName") String jobName, - String content) { - ZNRecord record; - TaskDriver driver = getTaskDriver(clusterId); - - try { - record = toZNRecord(content); - JobConfig.Builder jobConfig = JobAccessor.getJobConfig(record); - driver.enqueueJob(workflowName, jobName, jobConfig); - } catch (HelixException e) { - return badRequest( - String.format("Failed to enqueue job %s for reason : %s", jobName, e.getMessage())); - } catch (IOException e) { - return badRequest(String.format("Invalid input for Job Config of Job : %s", jobName)); - } - - return OK(); - } - - @DELETE - @Path("{jobName}") - public Response deleteJob(@PathParam("clusterId") String clusterId, - @PathParam("workflowName") String workflowName, @PathParam("jobName") String jobName) { - TaskDriver driver = getTaskDriver(clusterId); - - try { - driver.deleteJob(workflowName, jobName); - } catch (Exception e) { - return badRequest(e.getMessage()); - } - - return OK(); - } - - @GET - @Path("{jobName}/configs") - public Response getJobConfig(@PathParam("clusterId") String clusterId, - @PathParam("workflowName") String workflowName, @PathParam("jobName") String jobName) { - TaskDriver driver = getTaskDriver(clusterId); - - JobConfig jobConfig = driver.getJobConfig(jobName); - if (jobConfig != null) { - return JSONRepresentation(jobConfig.getRecord()); - } - return badRequest("Job config for " + jobName + " does not exists"); - } - - @GET - @Path("{jobName}/context") - public Response getJobContext(@PathParam("clusterId") String clusterId, - @PathParam("workflowName") String workflowName, @PathParam("jobName") String jobName) { - TaskDriver driver = getTaskDriver(clusterId); - - JobContext jobContext = - driver.getJobContext(jobName); - if (jobContext != null) { - return JSONRepresentation(jobContext.getRecord()); - } - return badRequest("Job context for " + jobName + " does not exists"); - } - - protected static JobConfig.Builder getJobConfig(Map<String, String> cfgMap) { - return new JobConfig.Builder().fromMap(cfgMap); - } - - protected static JobConfig.Builder getJobConfig(ZNRecord record) { - JobConfig.Builder jobConfig = new JobConfig.Builder().fromMap(record.getSimpleFields()); - jobConfig.addTaskConfigMap(getTaskConfigMap(record.getMapFields())); - - return jobConfig; - } - - private static Map<String, TaskConfig> getTaskConfigMap( - Map<String, Map<String, String>> taskConfigs) { - Map<String, TaskConfig> taskConfigsMap = new HashMap<>(); - if (taskConfigs == null || taskConfigs.isEmpty()) { - return Collections.emptyMap(); - } - - for (Map<String, String> taskConfigMap : taskConfigs.values()) { - if (!taskConfigMap.containsKey(JobProperties.TASK_COMMAND.name())) { - continue; - } - - TaskConfig taskConfig = - new TaskConfig(taskConfigMap.get(JobProperties.TASK_COMMAND.name()), taskConfigMap); - taskConfigsMap.put(taskConfig.getId(), taskConfig); - } - - return taskConfigsMap; - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/ResourceAccessor.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/ResourceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/ResourceAccessor.java deleted file mode 100644 index 7ea571f..0000000 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/ResourceAccessor.java +++ /dev/null @@ -1,278 +0,0 @@ -package org.apache.helix.rest.server.resources; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.Response; -import org.apache.helix.ConfigAccessor; -import org.apache.helix.HelixAdmin; -import org.apache.helix.HelixException; -import org.apache.helix.PropertyPathBuilder; -import org.apache.helix.ZNRecord; -import org.apache.helix.manager.zk.ZkClient; -import org.apache.helix.model.ExternalView; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.ResourceConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.codehaus.jackson.node.ArrayNode; -import org.codehaus.jackson.node.JsonNodeFactory; -import org.codehaus.jackson.node.ObjectNode; - -@Path("/clusters/{clusterId}/resources") -public class ResourceAccessor extends AbstractResource { - private final static Logger _logger = LoggerFactory.getLogger(ResourceAccessor.class); - public enum ResourceProperties { - idealState, - idealStates, - externalView, - externalViews, - resourceConfig, - } - - @GET - public Response getResources(@PathParam("clusterId") String clusterId) { - ObjectNode root = JsonNodeFactory.instance.objectNode(); - root.put(Properties.id.name(), JsonNodeFactory.instance.textNode(clusterId)); - - ZkClient zkClient = getZkClient(); - - ArrayNode idealStatesNode = root.putArray(ResourceProperties.idealStates.name()); - ArrayNode externalViewsNode = root.putArray(ResourceProperties.externalViews.name()); - - List<String> idealStates = zkClient.getChildren(PropertyPathBuilder.idealState(clusterId)); - List<String> externalViews = zkClient.getChildren(PropertyPathBuilder.externalView(clusterId)); - - if (idealStates != null) { - idealStatesNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(idealStates)); - } else { - return notFound(); - } - - if (externalViews != null) { - externalViewsNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(externalViews)); - } - - return JSONRepresentation(root); - } - - @GET - @Path("{resourceName}") - public Response getResource(@PathParam("clusterId") String clusterId, - @PathParam("resourceName") String resourceName) throws IOException { - ConfigAccessor accessor = getConfigAccessor(); - HelixAdmin admin = getHelixAdmin(); - - ResourceConfig resourceConfig = accessor.getResourceConfig(clusterId, resourceName); - IdealState idealState = admin.getResourceIdealState(clusterId, resourceName); - ExternalView externalView = admin.getResourceExternalView(clusterId, resourceName); - - Map<String, ZNRecord> resourceMap = new HashMap<>(); - if (idealState != null) { - resourceMap.put(ResourceProperties.idealState.name(), idealState.getRecord()); - } else { - return notFound(); - } - - resourceMap.put(ResourceProperties.resourceConfig.name(), null); - resourceMap.put(ResourceProperties.externalView.name(), null); - - if (resourceConfig != null) { - resourceMap.put(ResourceProperties.resourceConfig.name(), resourceConfig.getRecord()); - } - - if (externalView != null) { - resourceMap.put(ResourceProperties.externalView.name(), externalView.getRecord()); - } - - return JSONRepresentation(resourceMap); - } - - @PUT - @Path("{resourceName}") - public Response addResource(@PathParam("clusterId") String clusterId, - @PathParam("resourceName") String resourceName, - @DefaultValue("-1") @QueryParam("numPartitions") int numPartitions, - @DefaultValue("") @QueryParam("stateModelRef") String stateModelRef, - @DefaultValue("SEMI_AUTO") @QueryParam("rebalancerMode") String rebalancerMode, - @DefaultValue("DEFAULT") @QueryParam("rebalanceStrategy") String rebalanceStrategy, - @DefaultValue("0") @QueryParam("bucketSize") int bucketSize, - @DefaultValue("-1") @QueryParam("maxPartitionsPerInstance") int maxPartitionsPerInstance, - String content) { - - HelixAdmin admin = getHelixAdmin(); - - try { - if (content.length() != 0) { - ZNRecord record; - try { - record = toZNRecord(content); - } catch (IOException e) { - _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e); - return badRequest("Input is not a vaild ZNRecord!"); - } - - if (record.getSimpleFields() != null) { - admin.addResource(clusterId, resourceName, new IdealState(record)); - } - } else { - admin.addResource(clusterId, resourceName, numPartitions, stateModelRef, rebalancerMode, - rebalanceStrategy, bucketSize, maxPartitionsPerInstance); - } - } catch (Exception e) { - _logger.error("Error in adding a resource: " + resourceName, e); - return serverError(e); - } - - return OK(); - } - - @POST - @Path("{resourceName}") - public Response updateResource(@PathParam("clusterId") String clusterId, - @PathParam("resourceName") String resourceName, @QueryParam("command") String command, - @DefaultValue("-1") @QueryParam("replicas") int replicas, - @DefaultValue("") @QueryParam("keyPrefix") String keyPrefix, - @DefaultValue("") @QueryParam("group") String group){ - Command cmd; - try { - cmd = Command.valueOf(command); - } catch (Exception e) { - return badRequest("Invalid command : " + command); - } - - HelixAdmin admin = getHelixAdmin(); - try { - switch (cmd) { - case enable: - admin.enableResource(clusterId, resourceName, true); - break; - case disable: - admin.enableResource(clusterId, resourceName, false); - break; - case rebalance: - if (replicas == -1) { - return badRequest("Number of replicas is needed for rebalancing!"); - } - keyPrefix = keyPrefix.length() == 0 ? resourceName : keyPrefix; - admin.rebalance(clusterId, resourceName, replicas, keyPrefix, group); - break; - default: - _logger.error("Unsupported command :" + command); - return badRequest("Unsupported command :" + command); - } - } catch (Exception e) { - _logger.error("Failed in updating resource : " + resourceName, e); - return badRequest(e.getMessage()); - } - return OK(); - } - - @DELETE - @Path("{resourceName}") - public Response deleteResource(@PathParam("clusterId") String clusterId, - @PathParam("resourceName") String resourceName) { - HelixAdmin admin = getHelixAdmin(); - try { - admin.dropResource(clusterId, resourceName); - } catch (Exception e) { - _logger.error("Error in deleting a resource: " + resourceName, e); - return serverError(); - } - return OK(); - } - - @GET - @Path("{resourceName}/configs") - public Response getResourceConfig(@PathParam("clusterId") String clusterId, - @PathParam("resourceName") String resourceName) { - ConfigAccessor accessor = getConfigAccessor(); - ResourceConfig resourceConfig = accessor.getResourceConfig(clusterId, resourceName); - if (resourceConfig != null) { - return JSONRepresentation(resourceConfig.getRecord()); - } - - return notFound(); - } - - @POST - @Path("{resourceName}/configs") - public Response updateResourceConfig(@PathParam("clusterId") String clusterId, - @PathParam("resourceName") String resourceName, String content) { - ZNRecord record; - try { - record = toZNRecord(content); - } catch (IOException e) { - _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e); - return badRequest("Input is not a vaild ZNRecord!"); - } - ResourceConfig resourceConfig = new ResourceConfig(record); - ConfigAccessor configAccessor = getConfigAccessor(); - try { - configAccessor.updateResourceConfig(clusterId, resourceName, resourceConfig); - } catch (HelixException ex) { - return notFound(ex.getMessage()); - } catch (Exception ex) { - _logger.error( - "Failed to update cluster config, cluster " + clusterId + " new config: " + content - + ", Exception: " + ex); - return serverError(ex); - } - return OK(); - } - - @GET - @Path("{resourceName}/idealState") - public Response getResourceIdealState(@PathParam("clusterId") String clusterId, - @PathParam("resourceName") String resourceName) { - HelixAdmin admin = getHelixAdmin(); - IdealState idealState = admin.getResourceIdealState(clusterId, resourceName); - if (idealState != null) { - return JSONRepresentation(idealState.getRecord()); - } - - return notFound(); - } - - @GET - @Path("{resourceName}/externalView") - public Response getResourceExternalView(@PathParam("clusterId") String clusterId, - @PathParam("resourceName") String resourceName) { - HelixAdmin admin = getHelixAdmin(); - ExternalView externalView = admin.getResourceExternalView(clusterId, resourceName); - if (externalView != null) { - return JSONRepresentation(externalView.getRecord()); - } - - return notFound(); - } - -} http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/UIResourceAccessor.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/UIResourceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/UIResourceAccessor.java deleted file mode 100644 index 7ef22a4..0000000 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/UIResourceAccessor.java +++ /dev/null @@ -1,62 +0,0 @@ -package org.apache.helix.rest.server.resources; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import javax.ws.rs.GET; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.PathSegment; -import javax.ws.rs.core.Response; -import java.io.InputStream; -import java.util.List; - -@Path("/ui") -public class UIResourceAccessor extends AbstractResource { - private static final String INDEX_PAGE = "index.html"; - private static final String UI_RESOURCE_FOLDER = "ui"; - - @GET - public Response getIndex() { - return getStaticFile(INDEX_PAGE); - } - - @GET - @Path("{fileName}") - public Response getStaticFile(@PathParam("fileName") String fileName) { - InputStream is = getClass().getClassLoader().getResourceAsStream(UI_RESOURCE_FOLDER + "/" + fileName); - - if (is == null) { - // forward any other requests to index except index is not found - return fileName.equalsIgnoreCase(INDEX_PAGE) ? notFound() : getIndex(); - } - - return Response.ok(is, MediaType.TEXT_HTML).build(); - } - - @GET - @Path("{any: .*}") - public Response getStaticFile(@PathParam("any") List<PathSegment> segments) { - // get the last segment - String fileName = segments.get(segments.size() - 1).getPath(); - - return getStaticFile(fileName); - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/WorkflowAccessor.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/WorkflowAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/WorkflowAccessor.java deleted file mode 100644 index 398a4d2..0000000 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/WorkflowAccessor.java +++ /dev/null @@ -1,325 +0,0 @@ -package org.apache.helix.rest.server.resources; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import javax.ws.rs.DELETE; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.Response; - -import org.apache.helix.HelixException; -import org.apache.helix.ZNRecord; -import org.apache.helix.task.JobConfig; -import org.apache.helix.task.JobDag; -import org.apache.helix.task.JobQueue; -import org.apache.helix.task.TaskDriver; -import org.apache.helix.task.Workflow; -import org.apache.helix.task.WorkflowConfig; -import org.apache.helix.task.WorkflowContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.codehaus.jackson.JsonNode; -import org.codehaus.jackson.map.type.TypeFactory; -import org.codehaus.jackson.node.ArrayNode; -import org.codehaus.jackson.node.JsonNodeFactory; -import org.codehaus.jackson.node.ObjectNode; -import org.codehaus.jackson.node.TextNode; - -@Path("/clusters/{clusterId}/workflows") -public class WorkflowAccessor extends AbstractResource { - private static Logger _logger = LoggerFactory.getLogger(WorkflowAccessor.class.getName()); - - public enum WorkflowProperties { - Workflows, - WorkflowConfig, - WorkflowContext, - Jobs, - ParentJobs - } - - public enum TaskCommand { - stop, - resume, - clean - } - - @GET - public Response getWorkflows(@PathParam("clusterId") String clusterId) { - TaskDriver taskDriver = getTaskDriver(clusterId); - Map<String, WorkflowConfig> workflowConfigMap = taskDriver.getWorkflows(); - Map<String, List<String>> dataMap = new HashMap<>(); - dataMap.put(WorkflowProperties.Workflows.name(), new ArrayList<>(workflowConfigMap.keySet())); - - return JSONRepresentation(dataMap); - } - - @GET - @Path("{workflowId}") - public Response getWorkflow(@PathParam("clusterId") String clusterId, - @PathParam("workflowId") String workflowId) { - TaskDriver taskDriver = getTaskDriver(clusterId); - WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(workflowId); - WorkflowContext workflowContext = taskDriver.getWorkflowContext(workflowId); - - ObjectNode root = JsonNodeFactory.instance.objectNode(); - TextNode id = JsonNodeFactory.instance.textNode(workflowId); - root.put(Properties.id.name(), id); - - ObjectNode workflowConfigNode = JsonNodeFactory.instance.objectNode(); - ObjectNode workflowContextNode = JsonNodeFactory.instance.objectNode(); - - if (workflowConfig != null) { - getWorkflowConfigNode(workflowConfigNode, workflowConfig.getRecord()); - } - - if (workflowContext != null) { - getWorkflowContextNode(workflowContextNode, workflowContext.getRecord()); - } - - root.put(WorkflowProperties.WorkflowConfig.name(), workflowConfigNode); - root.put(WorkflowProperties.WorkflowContext.name(), workflowContextNode); - - JobDag jobDag = workflowConfig.getJobDag(); - ArrayNode jobs = OBJECT_MAPPER.valueToTree(jobDag.getAllNodes()); - ObjectNode parentJobs = OBJECT_MAPPER.valueToTree(jobDag.getParentsToChildren()); - root.put(WorkflowProperties.Jobs.name(), jobs); - root.put(WorkflowProperties.ParentJobs.name(), parentJobs); - - return JSONRepresentation(root); - } - - @PUT - @Path("{workflowId}") - public Response createWorkflow(@PathParam("clusterId") String clusterId, - @PathParam("workflowId") String workflowId, String content) { - TaskDriver driver = getTaskDriver(clusterId); - Map<String, String> cfgMap; - try { - JsonNode root = OBJECT_MAPPER.readTree(content); - cfgMap = OBJECT_MAPPER - .readValue(root.get(WorkflowProperties.WorkflowConfig.name()).toString(), - TypeFactory.defaultInstance() - .constructMapType(HashMap.class, String.class, String.class)); - - WorkflowConfig workflowConfig = WorkflowConfig.Builder.fromMap(cfgMap).build(); - - // Since JobQueue can keep adding jobs, Helix create JobQueue will ignore the jobs - if (workflowConfig.isJobQueue()) { - driver.start(new JobQueue.Builder(workflowId).setWorkflowConfig(workflowConfig).build()); - return OK(); - } - - Workflow.Builder workflow = new Workflow.Builder(workflowId); - - if (root.get(WorkflowProperties.Jobs.name()) != null) { - Map<String, JobConfig.Builder> jobConfigs = - getJobConfigs((ArrayNode) root.get(WorkflowProperties.Jobs.name())); - for (Map.Entry<String, JobConfig.Builder> job : jobConfigs.entrySet()) { - workflow.addJob(job.getKey(), job.getValue()); - } - } - - if (root.get(WorkflowProperties.ParentJobs.name()) != null) { - Map<String, List<String>> parentJobs = OBJECT_MAPPER - .readValue(root.get(WorkflowProperties.ParentJobs.name()).toString(), - TypeFactory.defaultInstance() - .constructMapType(HashMap.class, String.class, List.class)); - for (Map.Entry<String, List<String>> entry : parentJobs.entrySet()) { - String parentJob = entry.getKey(); - for (String childJob : entry.getValue()) { - workflow.addParentChildDependency(parentJob, childJob); - } - } - } - - driver.start(workflow.build()); - } catch (IOException e) { - return badRequest(String - .format("Invalid input of Workflow %s for reason : %s", workflowId, e.getMessage())); - } catch (HelixException e) { - return badRequest(String - .format("Failed to create workflow %s for reason : %s", workflowId, e.getMessage())); - } - return OK(); - } - - @DELETE - @Path("{workflowId}") - public Response deleteWorkflow(@PathParam("clusterId") String clusterId, - @PathParam("workflowId") String workflowId) { - TaskDriver driver = getTaskDriver(clusterId); - try { - driver.delete(workflowId); - } catch (HelixException e) { - return badRequest(String - .format("Failed to delete workflow %s for reason : %s", workflowId, e.getMessage())); - } - return OK(); - } - - @POST - @Path("{workflowId}") - public Response updateWorkflow(@PathParam("clusterId") String clusterId, - @PathParam("workflowId") String workflowId, @QueryParam("command") String command) { - TaskDriver driver = getTaskDriver(clusterId); - - try { - TaskCommand cmd = TaskCommand.valueOf(command); - switch (cmd) { - case stop: - driver.stop(workflowId); - break; - case resume: - driver.resume(workflowId); - break; - case clean: - driver.cleanupQueue(workflowId); - break; - default: - return badRequest(String.format("Invalid command : %s", command)); - } - } catch (HelixException e) { - return badRequest( - String.format("Failed to execute operation %s for reason : %s", command, e.getMessage())); - } catch (Exception e) { - return serverError(e); - } - - return OK(); - } - - @GET - @Path("{workflowId}/configs") - public Response getWorkflowConfig(@PathParam("clusterId") String clusterId, - @PathParam("workflowId") String workflowId) { - TaskDriver taskDriver = getTaskDriver(clusterId); - WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(workflowId); - ObjectNode workflowConfigNode = JsonNodeFactory.instance.objectNode(); - if (workflowConfig != null) { - getWorkflowConfigNode(workflowConfigNode, workflowConfig.getRecord()); - } - - return JSONRepresentation(workflowConfigNode); - } - - @POST - @Path("{workflowId}/configs") - public Response updateWorkflowConfig(@PathParam("clusterId") String clusterId, - @PathParam("workflowId") String workflowId, String content) { - ZNRecord record; - TaskDriver driver = getTaskDriver(clusterId); - - try { - record = toZNRecord(content); - - WorkflowConfig workflowConfig = driver.getWorkflowConfig(workflowId); - if (workflowConfig == null) { - return badRequest( - String.format("WorkflowConfig for workflow %s does not exists!", workflowId)); - } - - workflowConfig.getRecord().update(record); - driver.updateWorkflow(workflowId, workflowConfig); - } catch (HelixException e) { - return badRequest( - String.format("Failed to update WorkflowConfig for workflow %s", workflowId)); - } catch (Exception e) { - return badRequest(String.format("Invalid WorkflowConfig for workflow %s", workflowId)); - } - - return OK(); - } - - @GET - @Path("{workflowId}/context") - public Response getWorkflowContext(@PathParam("clusterId") String clusterId, - @PathParam("workflowId") String workflowId) { - TaskDriver taskDriver = getTaskDriver(clusterId); - WorkflowContext workflowContext = taskDriver.getWorkflowContext(workflowId); - ObjectNode workflowContextNode = JsonNodeFactory.instance.objectNode(); - if (workflowContext != null) { - getWorkflowContextNode(workflowContextNode, workflowContext.getRecord()); - } - - return JSONRepresentation(workflowContextNode); - } - - private void getWorkflowConfigNode(ObjectNode workflowConfigNode, ZNRecord record) { - for (Map.Entry<String, String> entry : record.getSimpleFields().entrySet()) { - if (!entry.getKey().equals(WorkflowConfig.WorkflowConfigProperty.Dag)) { - workflowConfigNode.put(entry.getKey(), JsonNodeFactory.instance.textNode(entry.getValue())); - } - } - } - - private void getWorkflowContextNode(ObjectNode workflowContextNode, ZNRecord record) { - if (record.getMapFields() != null) { - for (String fieldName : record.getMapFields().keySet()) { - JsonNode node = OBJECT_MAPPER.valueToTree(record.getMapField(fieldName)); - workflowContextNode.put(fieldName, node); - } - } - - if (record.getSimpleFields() != null) { - for (Map.Entry<String, String> entry : record.getSimpleFields().entrySet()) { - workflowContextNode - .put(entry.getKey(), JsonNodeFactory.instance.textNode(entry.getValue())); - } - } - } - - private Map<String, JobConfig.Builder> getJobConfigs(ArrayNode root) - throws HelixException, IOException { - Map<String, JobConfig.Builder> jobConfigsMap = new HashMap<>(); - for (Iterator<JsonNode> it = root.getElements(); it.hasNext(); ) { - JsonNode job = it.next(); - ZNRecord record = null; - - try { - record = toZNRecord(job.toString()); - } catch (IOException e) { - // Ignore the parse since it could be just simple fields - } - - if (record == null || record.getSimpleFields().isEmpty()) { - Map<String, String> cfgMap = OBJECT_MAPPER.readValue(job.toString(), - TypeFactory.defaultInstance() - .constructMapType(HashMap.class, String.class, String.class)); - jobConfigsMap - .put(job.get(Properties.id.name()).getTextValue(), JobAccessor.getJobConfig(cfgMap)); - } else { - jobConfigsMap - .put(job.get(Properties.id.name()).getTextValue(), JobAccessor.getJobConfig(record)); - } - } - - return jobConfigsMap; - } -}
