AMBARI-19256 : Asset support REST API (Belliraj HB via nitirajrathore)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/47a98824 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/47a98824 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/47a98824 Branch: refs/heads/trunk Commit: 47a98824566ad3015b9c9655a53b34e171f43ae9 Parents: e900093 Author: Nitiraj Rathore <nitiraj.rath...@gmail.com> Authored: Fri Dec 23 16:11:12 2016 +0530 Committer: Nitiraj Rathore <nitiraj.rath...@gmail.com> Committed: Fri Dec 23 16:12:35 2016 +0530 ---------------------------------------------------------------------- .../ambari/view/AssetDefinitionRefType.java | 23 + .../org/apache/oozie/ambari/view/Constants.java | 25 + .../apache/oozie/ambari/view/EntityStatus.java | 23 + .../apache/oozie/ambari/view/OozieDelegate.java | 243 ++++ .../ambari/view/OozieProxyImpersonator.java | 1087 ++++++++---------- .../apache/oozie/ambari/view/OozieUtils.java | 226 ++-- .../ambari/view/ServiceFormattedException.java | 53 + .../oozie/ambari/view/WorkflowFilesService.java | 176 +-- .../ambari/view/assets/AssetDefinitionRepo.java | 29 + .../oozie/ambari/view/assets/AssetRepo.java | 37 +- .../oozie/ambari/view/assets/AssetResource.java | 197 ++++ .../oozie/ambari/view/assets/AssetService.java | 102 +- .../ambari/view/assets/model/ActionAsset.java | 112 +- .../assets/model/ActionAssetDefinition.java | 42 + .../view/assets/model/AssetDefintion.java | 69 ++ .../oozie/ambari/view/model/APIResult.java | 63 + .../oozie/ambari/view/model/BaseModel.java | 61 +- .../apache/oozie/ambari/view/model/Indexed.java | 24 + .../apache/oozie/ambari/view/model/Paging.java | 30 + .../apache/oozie/ambari/view/model/When.java | 28 + .../apache/oozie/ambari/view/repo/BaseRepo.java | 113 ++ .../workflowmanager/WorkflowManagerService.java | 94 +- .../WorkflowsManagerResource.java | 8 +- .../view/workflowmanager/WorkflowsRepo.java | 63 +- .../view/workflowmanager/model/Workflow.java | 133 +-- 
.../views/wfmanager/src/main/resources/view.xml | 17 + 26 files changed, 2006 insertions(+), 1072 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/AssetDefinitionRefType.java ---------------------------------------------------------------------- diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/AssetDefinitionRefType.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/AssetDefinitionRefType.java new file mode 100644 index 0000000..8c96504 --- /dev/null +++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/AssetDefinitionRefType.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.oozie.ambari.view; + +public enum AssetDefinitionRefType { + HDFS, + DB +} http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/Constants.java ---------------------------------------------------------------------- diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/Constants.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/Constants.java new file mode 100644 index 0000000..238b002 --- /dev/null +++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/Constants.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.oozie.ambari.view; + +public class Constants { + public static final String STATUS_FAILED = "failed"; + public static final String STATUS_OK = "ok"; + public static final String STATUS_KEY = "status"; + public static final String MESSAGE_KEY = "message"; +} http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/EntityStatus.java ---------------------------------------------------------------------- diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/EntityStatus.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/EntityStatus.java new file mode 100644 index 0000000..6447cf2 --- /dev/null +++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/EntityStatus.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.oozie.ambari.view; + +public enum EntityStatus { + DRAFT, + PUBLISHED +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieDelegate.java ---------------------------------------------------------------------- diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieDelegate.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieDelegate.java new file mode 100644 index 0000000..2779f05 --- /dev/null +++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieDelegate.java @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.oozie.ambari.view; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.ws.rs.HttpMethod; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.Response; + +import org.apache.ambari.view.ViewContext; +import org.apache.commons.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OozieDelegate { + private final static Logger LOGGER = LoggerFactory + .getLogger(OozieDelegate.class); + private static final String OOZIEPARAM_PREFIX = "oozieparam."; + private static final int OOZIEPARAM_PREFIX_LENGTH = OOZIEPARAM_PREFIX + .length(); + private static final String EQUAL_SYMBOL = "="; + private static final String OOZIE_WF_RERUN_FAILNODES_CONF_KEY = "oozie.wf.rerun.failnodes"; + private static final String OOZIE_USE_SYSTEM_LIBPATH_CONF_KEY = "oozie.use.system.libpath"; + private static final String USER_NAME_HEADER = "user.name"; + private static final String USER_OOZIE_SUPER = "oozie"; + private static final String DO_AS_HEADER = "doAs"; + private static final String SERVICE_URI_PROP = "oozie.service.uri"; + private static final String DEFAULT_SERVICE_URI = "http://sandbox.hortonworks.com:11000/oozie"; + + private ViewContext viewContext; + + private OozieUtils oozieUtils = new OozieUtils(); + private final Utils utils = new Utils(); + private final AmbariIOUtil ambariIOUtil; + + public OozieDelegate(ViewContext viewContext) { + super(); + this.viewContext = viewContext; + this.ambariIOUtil = new AmbariIOUtil(viewContext); + } + + public String submitWorkflowJobToOozie(HttpHeaders headers, + String filePath, MultivaluedMap<String, String> queryParams, + JobType jobType) { + String nameNode = "hdfs://" + + viewContext.getCluster().getConfigurationValue("hdfs-site", + "dfs.namenode.rpc-address"); + + if 
(!queryParams.containsKey("config.nameNode")) { + ArrayList<String> nameNodes = new ArrayList<String>(); + LOGGER.info("Namenode===" + nameNode); + nameNodes.add(nameNode); + queryParams.put("config.nameNode", nameNodes); + } + + Map<String, String> workflowConigs = getWorkflowConfigs(filePath, + queryParams, jobType, nameNode); + String configXMl = oozieUtils.generateConfigXml(workflowConigs); + LOGGER.info("Config xml==" + configXMl); + HashMap<String, String> customHeaders = new HashMap<String, String>(); + customHeaders.put("Content-Type", "application/xml;charset=UTF-8"); + Response serviceResponse = consumeService(headers, getServiceUri() + + "/v2/jobs?" + getJobSumbitOozieParams(queryParams), + HttpMethod.POST, configXMl, customHeaders); + + LOGGER.info("Resp from oozie status entity==" + + serviceResponse.getEntity()); + if (serviceResponse.getEntity() instanceof String) { + return (String) serviceResponse.getEntity(); + } else { + return "success"; + } + } + + public Response consumeService(HttpHeaders headers, String path, + MultivaluedMap<String, String> queryParameters, String method, + String body) throws Exception { + return consumeService(headers, this.buildUri(path, queryParameters), + method, body, null); + } + + private Response consumeService(HttpHeaders headers, String urlToRead, + String method, String body, Map<String, String> customHeaders) { + Response response = null; + InputStream stream = readFromOozie(headers, urlToRead, method, body, + customHeaders); + String stringResponse = null; + try { + stringResponse = IOUtils.toString(stream); + } catch (IOException e) { + LOGGER.error("Error while converting stream to string", e); + throw new RuntimeException(e); + } + if (stringResponse.contains(Response.Status.BAD_REQUEST.name())) { + response = Response.status(Response.Status.BAD_REQUEST) + .entity(stringResponse).type(MediaType.TEXT_PLAIN).build(); + } else { + response = Response.status(Response.Status.OK) + .entity(stringResponse) + 
.type(utils.deduceType(stringResponse)).build(); + } + return response; + } + + public InputStream readFromOozie(HttpHeaders headers, String urlToRead, + String method, String body, Map<String, String> customHeaders) { + + Map<String, String> newHeaders = utils.getHeaders(headers); + newHeaders.put(USER_NAME_HEADER, USER_OOZIE_SUPER); + + newHeaders.put(DO_AS_HEADER, viewContext.getUsername()); + newHeaders.put("Accept", MediaType.APPLICATION_JSON); + if (customHeaders != null) { + newHeaders.putAll(customHeaders); + } + LOGGER.info(String.format("Proxy request for url: [%s] %s", method, + urlToRead)); + + return ambariIOUtil.readFromUrl(urlToRead, method, body, newHeaders); + } + + private Map<String, String> getWorkflowConfigs(String filePath, + MultivaluedMap<String, String> queryParams, JobType jobType, + String nameNode) { + HashMap<String, String> workflowConigs = new HashMap<String, String>(); + if (queryParams.containsKey("resourceManager") + && "useDefault".equals(queryParams.getFirst("resourceManager"))) { + String jobTrackerNode = viewContext.getCluster() + .getConfigurationValue("yarn-site", + "yarn.resourcemanager.address"); + LOGGER.info("jobTrackerNode===" + jobTrackerNode); + workflowConigs.put("resourceManager", jobTrackerNode); + workflowConigs.put("jobTracker", jobTrackerNode); + } + if (queryParams != null) { + for (Map.Entry<String, List<String>> entry : queryParams.entrySet()) { + if (entry.getKey().startsWith("config.")) { + if (entry.getValue() != null && entry.getValue().size() > 0) { + workflowConigs.put(entry.getKey().substring(7), entry + .getValue().get(0)); + } + } + } + } + + if (queryParams.containsKey("oozieconfig.useSystemLibPath")) { + String useSystemLibPath = queryParams + .getFirst("oozieconfig.useSystemLibPath"); + workflowConigs.put(OOZIE_USE_SYSTEM_LIBPATH_CONF_KEY, + useSystemLibPath); + } else { + workflowConigs.put(OOZIE_USE_SYSTEM_LIBPATH_CONF_KEY, "true"); + } + if (queryParams.containsKey("oozieconfig.rerunOnFailure")) 
{ + String rerunFailnodes = queryParams + .getFirst("oozieconfig.rerunOnFailure"); + workflowConigs.put(OOZIE_WF_RERUN_FAILNODES_CONF_KEY, + rerunFailnodes); + } else { + workflowConigs.put(OOZIE_WF_RERUN_FAILNODES_CONF_KEY, "true"); + } + workflowConigs.put("user.name", viewContext.getUsername()); + workflowConigs.put(oozieUtils.getJobPathPropertyKey(jobType), nameNode + + filePath); + return workflowConigs; + } + + private String getJobSumbitOozieParams( + MultivaluedMap<String, String> queryParams) { + StringBuilder query = new StringBuilder(); + if (queryParams != null) { + for (Map.Entry<String, List<String>> entry : queryParams.entrySet()) { + if (entry.getKey().startsWith(OOZIEPARAM_PREFIX)) { + if (entry.getValue() != null && entry.getValue().size() > 0) { + for (String val : entry.getValue()) { + query.append( + entry.getKey().substring( + OOZIEPARAM_PREFIX_LENGTH)) + .append(EQUAL_SYMBOL).append(val) + .append("&"); + } + } + } + } + } + return query.toString(); + } + + private String getServiceUri() { + String serviceURI = viewContext.getProperties().get(SERVICE_URI_PROP) != null ? 
viewContext + .getProperties().get(SERVICE_URI_PROP) : DEFAULT_SERVICE_URI; + return serviceURI; + } + + private String buildUri(String absolutePath, + MultivaluedMap<String, String> queryParameters) { + int index = absolutePath.indexOf("proxy/") + 5; + absolutePath = absolutePath.substring(index); + String serviceURI = getServiceUri(); + serviceURI += absolutePath; + MultivaluedMap<String, String> params = addOrReplaceUserName(queryParameters); + return serviceURI + utils.convertParamsToUrl(params); + } + + private MultivaluedMap<String, String> addOrReplaceUserName( + MultivaluedMap<String, String> parameters) { + for (Map.Entry<String, List<String>> entry : parameters.entrySet()) { + if ("user.name".equals(entry.getKey())) { + ArrayList<String> vals = new ArrayList<String>(1); + vals.add(viewContext.getUsername()); + entry.setValue(vals); + } + } + return parameters; + } + + public String getDagUrl(String jobid) { + return getServiceUri() + "/v2/job/" + jobid + "?show=graph"; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieProxyImpersonator.java ---------------------------------------------------------------------- diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieProxyImpersonator.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieProxyImpersonator.java index 08a166d..9dd02d4 100644 --- a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieProxyImpersonator.java +++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieProxyImpersonator.java @@ -17,12 +17,14 @@ */ package org.apache.oozie.ambari.view; +import static org.apache.oozie.ambari.view.Constants.MESSAGE_KEY; +import static org.apache.oozie.ambari.view.Constants.STATUS_KEY; +import static org.apache.oozie.ambari.view.Constants.STATUS_OK; + import java.io.IOException; import java.io.InputStream; import 
java.io.OutputStream; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import javax.inject.Inject; @@ -40,8 +42,8 @@ import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Context; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.StreamingOutput; import javax.ws.rs.core.UriInfo; @@ -50,6 +52,7 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.security.AccessControlException; +import org.apache.oozie.ambari.view.assets.AssetResource; import org.apache.oozie.ambari.view.workflowmanager.WorkflowManagerService; import org.apache.oozie.ambari.view.workflowmanager.WorkflowsManagerResource; import org.slf4j.Logger; @@ -63,639 +66,449 @@ import com.google.inject.Singleton; */ @Singleton public class OozieProxyImpersonator { - private final static Logger LOGGER = LoggerFactory - .getLogger(OozieProxyImpersonator.class); - - private static final String OOZIEPARAM_PREFIX = "oozieparam."; - private static final int OOZIEPARAM_PREFIX_LENGTH = OOZIEPARAM_PREFIX - .length(); - private static final String EQUAL_SYMBOL = "="; - private static final String OOZIE_WF_RERUN_FAILNODES_CONF_KEY = "oozie.wf.rerun.failnodes"; - private static final String OOZIE_USE_SYSTEM_LIBPATH_CONF_KEY = "oozie.use.system.libpath"; - - private ViewContext viewContext; - - private static final String USER_NAME_HEADER = "user.name"; - private static final String USER_OOZIE_SUPER = "oozie"; - private static final String DO_AS_HEADER = "doAs"; - - private static final String SERVICE_URI_PROP = "oozie.service.uri"; - private static final String DEFAULT_SERVICE_URI = "http://sandbox.hortonworks.com:11000/oozie"; - private Utils utils = new Utils(); - private 
AmbariIOUtil ambariIOUtil; - private OozieUtils oozieUtils = new OozieUtils(); - private HDFSFileUtils hdfsFileUtils; - private WorkflowFilesService workflowFilesService; - //private WorkflowManagerService workflowManagerService; - - private static enum ErrorCodes { - OOZIE_SUBMIT_ERROR("error.oozie.submit", "Oozie Submit error"), OOZIE_IO_ERROR( - "error.oozie.io", "Oozie I/O error"), FILE_ACCESS_ACL_ERROR( - "error.file.access.control", - "Access Error to file due to access control"), FILE_ACCESS_UNKNOWN_ERROR( - "error.file.access", "Error accessing file"), WORKFLOW_PATH_EXISTS( - "error.workflow.path.exists", "Worfklow path exists"); - private String errorCode; - private String description; - - ErrorCodes(String errorCode, String description) { - this.errorCode = errorCode; - this.description = description; - } - - public String getErrorCode() { - return errorCode; - } - - public String getDescription() { - return description; - } - } - - @Inject - public OozieProxyImpersonator(ViewContext viewContext) { - this.viewContext = viewContext; - hdfsFileUtils = new HDFSFileUtils(viewContext); - workflowFilesService = new WorkflowFilesService(hdfsFileUtils); - ambariIOUtil=new AmbariIOUtil(viewContext); - //workflowManagerService = new WorkflowManagerService(viewContext); - LOGGER.info(String.format( - "OozieProxyImpersonator initialized for instance: %s", - viewContext.getInstanceName())); - - } - - @Path("/fileServices") - public FileServices fileServices() { - return new FileServices(viewContext); - } - - @Path("/wfprojects") - public WorkflowsManagerResource workflowsManagerResource() { - return new WorkflowsManagerResource(viewContext); + private final static Logger LOGGER = LoggerFactory + .getLogger(OozieProxyImpersonator.class); + + private ViewContext viewContext; + private final Utils utils = new Utils(); + + + private final HDFSFileUtils hdfsFileUtils; + private final WorkflowFilesService workflowFilesService; + private WorkflowManagerService 
workflowManagerService; + private static final boolean PROJ_MANAGER_ENABLED = false; + private final OozieDelegate oozieDelegate; + private final OozieUtils oozieUtils = new OozieUtils(); + private final AssetResource assetResource; + private final AmbariIOUtil ambariIOUtil; + private static enum ErrorCodes { + OOZIE_SUBMIT_ERROR("error.oozie.submit", "Oozie Submit error"), OOZIE_IO_ERROR( + "error.oozie.io", "Oozie I/O error"), FILE_ACCESS_ACL_ERROR( + "error.file.access.control", + "Access Error to file due to access control"), FILE_ACCESS_UNKNOWN_ERROR( + "error.file.access", "Error accessing file"), WORKFLOW_PATH_EXISTS( + "error.workflow.path.exists", "Worfklow path exists"); + private String errorCode; + private String description; + + ErrorCodes(String errorCode, String description) { + this.errorCode = errorCode; + this.description = description; + } + + public String getErrorCode() { + return errorCode; + } + + public String getDescription() { + return description; + } + } + + @Inject + public OozieProxyImpersonator(ViewContext viewContext) { + this.viewContext = viewContext; + hdfsFileUtils = new HDFSFileUtils(viewContext); + workflowFilesService = new WorkflowFilesService(hdfsFileUtils); + this.oozieDelegate = new OozieDelegate(viewContext); + assetResource = new AssetResource(viewContext); + if (PROJ_MANAGER_ENABLED) { + workflowManagerService = new WorkflowManagerService(viewContext); + } + ambariIOUtil=new AmbariIOUtil(viewContext); + + LOGGER.info(String.format( + "OozieProxyImpersonator initialized for instance: %s", + viewContext.getInstanceName())); + + } + + @Path("/fileServices") + public FileServices fileServices() { + return new FileServices(viewContext); + } + + @Path("/wfprojects") + public WorkflowsManagerResource workflowsManagerResource() { + return new WorkflowsManagerResource(viewContext); + } + + @Path("/assets") + public AssetResource assetResource() { + return this.assetResource; + } + + @GET + @Path("/getCurrentUserName") + public 
Response getCurrentUserName() { + return Response.ok(viewContext.getUsername()).build(); + } + + @POST + @Path("/submitJob") + @Consumes({MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML}) + public Response submitJob(String postBody, @Context HttpHeaders headers, + @Context UriInfo ui, @QueryParam("app.path") String appPath, + @QueryParam("projectId") String projectId, + @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite, + @QueryParam("description") String description, + @QueryParam("jobType") String jobType) { + LOGGER.info("submit workflow job called"); + return submitJobInternal(postBody, headers, ui, appPath, overwrite, + JobType.valueOf(jobType), projectId, description); + } + + @POST + @Path("/saveWorkflow") + @Consumes({MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML}) + public Response saveWorkflow(String postBody, @Context HttpHeaders headers, + @Context UriInfo ui, @QueryParam("app.path") String appPath, @QueryParam("description") String description, + @QueryParam("projectId") String projectId, @QueryParam("jobType") String jobType, + @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite) { + LOGGER.info("save workflow called"); + if (StringUtils.isEmpty(appPath)) { + throw new RuntimeException("app path can't be empty."); + } + appPath = appPath.trim(); + if (!overwrite) { + boolean fileExists = hdfsFileUtils.fileExists(appPath); + if (fileExists) { + return getFileExistsResponse(); + } + } + postBody = utils.formatXml(postBody); + try { + String filePath = workflowFilesService.createWorkflowFile(appPath, + postBody, overwrite); + LOGGER.info(String.format( + "submit workflow job done. 
filePath=[%s]", filePath)); + if (PROJ_MANAGER_ENABLED) { + JobType deducedJobType = oozieUtils.deduceJobType(postBody); + String workflowName = oozieUtils.deduceWorkflowNameFromXml(postBody); + workflowManagerService.saveWorkflow(projectId, appPath, + deducedJobType, description, + viewContext.getUsername(), workflowName); + } + + return Response.ok().build(); + } catch (Exception ex) { + LOGGER.error(ex.getMessage(), ex); + return getRespCodeForException(ex); + + } + } + + @POST + @Path("/saveWorkflowDraft") + @Consumes({MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML}) + public Response saveDraft(String postBody, @Context HttpHeaders headers, + @Context UriInfo ui, @QueryParam("app.path") String appPath, + @QueryParam("projectId") String projectId, @QueryParam("description") String description, + @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite, @QueryParam("jobType") String jobTypeStr) + throws IOException { + LOGGER.info("save workflow called"); + if (StringUtils.isEmpty(appPath)) { + throw new RuntimeException("app path can't be empty."); + } + appPath = appPath.trim(); + workflowFilesService.saveDraft(appPath, postBody, overwrite); + if (PROJ_MANAGER_ENABLED) { + JobType jobType = StringUtils.isEmpty(jobTypeStr) ? 
JobType.WORKFLOW : JobType.valueOf(jobTypeStr); + String name = oozieUtils.deduceWorkflowNameFromJson(postBody); + workflowManagerService.saveWorkflow(projectId, appPath, + jobType, description, + viewContext.getUsername(), name); + } + return Response.ok().build(); + } + + @POST + @Path("/publishAsset") + @Consumes({MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML}) + public Response publishAsset(String postBody, @Context HttpHeaders headers, + @Context UriInfo ui, @QueryParam("uploadPath") String uploadPath, + @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite) { + LOGGER.info("publish asset called"); + if (StringUtils.isEmpty(uploadPath)) { + throw new RuntimeException("upload path can't be empty."); + } + uploadPath = uploadPath.trim(); + Map<String, String> validateAsset = assetResource.validateAsset(headers, postBody, + ui.getQueryParameters()); + if (!STATUS_OK.equals(validateAsset.get(STATUS_KEY))) { + return Response.status(Status.BAD_REQUEST).entity( + validateAsset.get(MESSAGE_KEY)).build(); + } + return saveAsset(postBody, uploadPath, overwrite); + } + + private Response saveAsset(String postBody, String uploadPath, + Boolean overwrite) { + uploadPath = workflowFilesService.getAssetFileName(uploadPath); + if (!overwrite) { + boolean fileExists = hdfsFileUtils.fileExists(uploadPath); + if (fileExists) { + return getFileExistsResponse(); + } + } + postBody = utils.formatXml(postBody); + try { + String filePath = workflowFilesService.createAssetFile(uploadPath, + postBody, overwrite); + LOGGER.info(String.format("publish asset job done. 
filePath=[%s]", + filePath)); + return Response.ok().build(); + } catch (Exception ex) { + LOGGER.error(ex.getMessage(), ex); + return getRespCodeForException(ex); + } + } + + + @GET + @Path("/readWorkflowDraft") + public Response readDraft(@QueryParam("workflowXmlPath") String workflowPath) { + if (StringUtils.isEmpty(workflowPath)) { + throw new RuntimeException("workflowXmlPath can't be empty."); + } + try { + final InputStream is = workflowFilesService.readDraft(workflowPath); + StreamingOutput streamer = new StreamingOutput() { + @Override + public void write(OutputStream os) throws IOException, + WebApplicationException { + IOUtils.copy(is, os); + is.close(); + os.close(); + } + }; + return Response.ok(streamer).status(200).build(); + } catch (IOException e) { + return getRespCodeForException(e); + } + } + + @POST + @Path("/discardWorkflowDraft") + public Response discardDraft( + @QueryParam("workflowXmlPath") String workflowPath) + throws IOException { + workflowFilesService.discardDraft(workflowPath); + return Response.ok().build(); + } + + private Response submitJobInternal(String postBody, HttpHeaders headers, + UriInfo ui, String appPath, Boolean overwrite, JobType jobType, + String projectId, String description) { + if (StringUtils.isEmpty(appPath)) { + throw new RuntimeException("app path can't be empty."); + } + appPath = appPath.trim(); + if (!overwrite) { + boolean fileExists = hdfsFileUtils.fileExists(appPath); + if (fileExists) { + return getFileExistsResponse(); + } + } + postBody = utils.formatXml(postBody); + try { + String filePath = hdfsFileUtils.writeToFile(appPath, postBody, + overwrite); + LOGGER.info(String.format( + "submit workflow job done. 
filePath=[%s]", filePath)); + } catch (Exception ex) { + LOGGER.error(ex.getMessage(), ex); + return getRespCodeForException(ex); + + } + if (PROJ_MANAGER_ENABLED) { + String name = oozieUtils.deduceWorkflowNameFromXml(postBody); + workflowManagerService.saveWorkflow(projectId, appPath, jobType, + "todo description", viewContext.getUsername(), name); + } + + String response = oozieDelegate.submitWorkflowJobToOozie(headers, + appPath, ui.getQueryParameters(), jobType); + if (response != null && response.trim().startsWith("{")) { + // dealing with oozie giving error but with 200 response. + return Response.status(Response.Status.OK).entity(response).build(); + } else { + HashMap<String, String> resp = new HashMap<String, String>(); + resp.put("status", ErrorCodes.OOZIE_SUBMIT_ERROR.getErrorCode()); + resp.put("message", response); + return Response.status(Response.Status.BAD_REQUEST).entity(resp) + .build(); + } + + } + + private Response getRespCodeForException(Exception ex) { + if (ex instanceof AccessControlException) { + HashMap<String, String> errorDetails = getErrorDetails( + ErrorCodes.FILE_ACCESS_ACL_ERROR.getErrorCode(), + ErrorCodes.FILE_ACCESS_ACL_ERROR.getDescription(), ex); + return Response.status(Response.Status.BAD_REQUEST) + .entity(errorDetails).build(); + } else if (ex instanceof IOException) { + HashMap<String, String> errorDetails = getErrorDetails( + ErrorCodes.FILE_ACCESS_UNKNOWN_ERROR.getErrorCode(), + ErrorCodes.FILE_ACCESS_UNKNOWN_ERROR.getDescription(), ex); + return Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(errorDetails).build(); + } else { + HashMap<String, String> errorDetails = getErrorDetails( + ErrorCodes.FILE_ACCESS_UNKNOWN_ERROR.getErrorCode(), + ErrorCodes.FILE_ACCESS_UNKNOWN_ERROR.getDescription(), ex); + return Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(errorDetails).build(); + } + + } + + private Response getFileExistsResponse() { + HashMap<String, String> resp = new HashMap<String, 
String>(); + resp.put("status", ErrorCodes.WORKFLOW_PATH_EXISTS.getErrorCode()); + resp.put("message", ErrorCodes.WORKFLOW_PATH_EXISTS.getDescription()); + return Response.status(Response.Status.BAD_REQUEST).entity(resp) + .build(); + } + + @GET + @Path("/readWorkflowDetail") + public Response isDraftAvailable( + @QueryParam("workflowXmlPath") String workflowPath) { + WorkflowFileInfo workflowDetails = workflowFilesService + .getWorkflowDetails(workflowPath); + return Response.ok(workflowDetails).build(); + } + + @GET + @Path("/readWorkflowXml") + public Response readWorkflowXxml( + @QueryParam("workflowXmlPath") String workflowPath) { + if (StringUtils.isEmpty(workflowPath)) { + throw new RuntimeException("workflowXmlPath can't be empty."); + } + try { + final InputStream is = workflowFilesService + .readWorkflowXml(workflowPath); + StreamingOutput streamer = new StreamingOutput() { + @Override + public void write(OutputStream os) throws IOException, + WebApplicationException { + IOUtils.copy(is, os); + is.close(); + os.close(); + } + }; + return Response.ok(streamer).status(200).build(); + } catch (IOException e) { + return getRespCodeForException(e); + } + } + + private HashMap<String, String> getErrorDetails(String status, + String message, Exception ex) { + HashMap<String, String> resp = new HashMap<String, String>(); + resp.put("status", status); + if (message != null) { + resp.put("message", message); + } + if (ex != null) { + resp.put("stackTrace", ExceptionUtils.getFullStackTrace(ex)); + } + return resp; + } + + @GET + @Path("/getDag") + @Produces("image/png") + public Response getDag(@Context HttpHeaders headers, + @Context UriInfo ui, @QueryParam("jobid") String jobid) { + Map<String, String> newHeaders = utils.getHeaders(headers); + final InputStream is = oozieDelegate.readFromOozie(headers, + oozieDelegate.getDagUrl(jobid), HttpMethod.GET, null, + newHeaders); + StreamingOutput streamer = new StreamingOutput() { + @Override + public void 
write(OutputStream os) throws IOException, + WebApplicationException { + IOUtils.copy(is, os); + is.close(); + os.close(); + } + + }; + return Response.ok(streamer).status(200).build(); + } + + @GET + @Path("/{path: .*}") + public Response handleGet(@Context HttpHeaders headers, @Context UriInfo ui) { + try { + return oozieDelegate.consumeService(headers, ui.getAbsolutePath() + .getPath(), ui.getQueryParameters(), HttpMethod.GET, null); + } catch (Exception ex) { + LOGGER.error("Error in GET proxy", ex); + return Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(getErrorDetailsForException("Oozie", ex)).build(); + } + } + + @POST + @Path("/{path: .*}") + public Response handlePost(String xml, @Context HttpHeaders headers, + @Context UriInfo ui) { + try { + + return oozieDelegate.consumeService(headers, ui.getAbsolutePath() + .getPath(), ui.getQueryParameters(), HttpMethod.POST, xml); + } catch (Exception ex) { + LOGGER.error("Error in POST proxy", ex); + return Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(getErrorDetailsForException("Oozie", ex)).build(); + } + } + + @DELETE + @Path("/{path: .*}") + public Response handleDelete(@Context HttpHeaders headers, + @Context UriInfo ui) { + try { + return oozieDelegate.consumeService(headers, ui.getAbsolutePath() + .getPath(), ui.getQueryParameters(), HttpMethod.POST, null); + } catch (Exception ex) { + LOGGER.error("Error in DELETE proxy", ex); + return Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(getErrorDetailsForException("Oozie", ex)).build(); + } + } + + @PUT + @Path("/{path: .*}") + public Response handlePut(String body, @Context HttpHeaders headers, + @Context UriInfo ui) { + try { + return oozieDelegate.consumeService(headers, ui.getAbsolutePath() + .getPath(), ui.getQueryParameters(), HttpMethod.PUT, body); + } catch (Exception ex) { + LOGGER.error("Error in PUT proxy", ex); + return Response.status(Response.Status.INTERNAL_SERVER_ERROR) + 
.entity(getErrorDetailsForException("Oozie", ex)).build(); + } + } + + private Map<String, String> getErrorDetailsForException(String component, + Exception ex) { + String errorCode = component + "exception"; + String errorMessage = component + " Exception"; + if (ex instanceof RuntimeException) { + Throwable cause = ex.getCause(); + if (cause instanceof IOException) { + errorCode = component + "io.exception"; + errorMessage = component + "IO Exception"; + } } - - @GET - @Path("/getCurrentUserName") - public Response getCurrentUserName() { - return Response.ok(viewContext.getUsername()).build(); - } - - @POST - @Path("/submitJob") - @Consumes({ MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML }) - public Response submitJob(String postBody, @Context HttpHeaders headers, - @Context UriInfo ui, @QueryParam("app.path") String appPath, - @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite, - @QueryParam("jobType") String jobType) { - LOGGER.info("submit workflow job called"); - return submitJobInternal(postBody, headers, ui, appPath, overwrite, - JobType.valueOf(jobType)); - } - - @POST - @Path("/submitWorkflow") - @Consumes({ MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML }) - public Response submitWorkflow(String postBody, - @Context HttpHeaders headers, @Context UriInfo ui, - @QueryParam("app.path") String appPath, - @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite) { - LOGGER.info("submit workflow job called"); - return submitJobInternal(postBody, headers, ui, appPath, overwrite, - JobType.WORKFLOW); - } - - @POST - @Path("/saveWorkflow") - @Consumes({ MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML }) - public Response saveWorkflow(String postBody, @Context HttpHeaders headers, - @Context UriInfo ui, @QueryParam("app.path") String appPath, - @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite) { - LOGGER.info("save workflow called"); - if (StringUtils.isEmpty(appPath)) { - throw new RuntimeException("app path can't be 
empty."); - } - appPath = appPath.trim(); - if (!overwrite) { - boolean fileExists = hdfsFileUtils.fileExists(appPath); - if (fileExists) { - return getFileExistsResponse(); - } - } - postBody = utils.formatXml(postBody); - try { - String filePath = workflowFilesService.createWorkflowFile(appPath, - postBody, overwrite); - LOGGER.info(String.format( - "submit workflow job done. filePath=[%s]", filePath)); - /* workflowManagerService.saveWorkflow(appPath, JobType.WORKFLOW, - "todo description", viewContext.getUsername());*/ - return Response.ok().build(); - } catch (Exception ex) { - LOGGER.error(ex.getMessage(), ex); - return getRespCodeForException(ex); - - } - } - - @POST - @Path("/publishAsset") - @Consumes({ MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML }) - public Response publishAsset(String postBody, @Context HttpHeaders headers, - @Context UriInfo ui, @QueryParam("uploadPath") String uploadPath, - @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite) { - LOGGER.info("publish asset called"); - if (StringUtils.isEmpty(uploadPath)) { - throw new RuntimeException("upload path can't be empty."); - } - uploadPath = uploadPath.trim(); - Response dryRunResponse = validateAsset(headers, postBody, ui.getQueryParameters()); - if (dryRunResponse.getStatus() == 200) { - return saveAsset(postBody, uploadPath, overwrite); - } - return dryRunResponse; - } - - private Response validateAsset(HttpHeaders headers, String postBody, MultivaluedMap<String, String> queryParams) { - String workflowXml = oozieUtils.generateWorkflowXml(postBody); - try { - String tempWfPath = "/user/"+viewContext.getUsername()+"/tmpooziewfs/tempwf.xml"; - hdfsFileUtils.writeToFile(tempWfPath, workflowXml, true); - queryParams.put("oozieparam.action",getAsList("dryrun")); - queryParams.put("oozieconfig.rerunOnFailure",getAsList("false")); - queryParams.put("oozieconfig.useSystemLibPath",getAsList("true")); - queryParams.put("resourceManager",getAsList("useDefault")); - String dryRunResp 
= submitWorkflowJobToOozie(headers,tempWfPath,queryParams,JobType.WORKFLOW); - LOGGER.info(String.format("resp from validating asset=[%s]",dryRunResp)); - if (dryRunResp != null && dryRunResp.trim().startsWith("{")) { - return Response.status(Response.Status.OK).entity(dryRunResp).build(); - } else { - HashMap<String, String> resp = new HashMap<String, String>(); - resp.put("status", ErrorCodes.OOZIE_SUBMIT_ERROR.getErrorCode()); - resp.put("message", dryRunResp); - //resp.put("stackTrace", dryRunResp); - return Response.status(Response.Status.BAD_REQUEST).entity(resp).build(); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private List<String> getAsList(String string) { - ArrayList<String> li=new ArrayList<>(1); - li.add(string); - return li; - } - - private Response saveAsset(String postBody, String uploadPath, - Boolean overwrite) { - uploadPath = workflowFilesService.getAssetFileName(uploadPath); - if (!overwrite) { - boolean fileExists = hdfsFileUtils.fileExists(uploadPath); - if (fileExists) { - return getFileExistsResponse(); - } - } - postBody = utils.formatXml(postBody); - try { - String filePath = workflowFilesService.createAssetFile(uploadPath, - postBody, overwrite); - LOGGER.info(String.format( - "publish asset job done. 
filePath=[%s]", filePath)); - return Response.ok().build(); - } catch (Exception ex) { - LOGGER.error(ex.getMessage(), ex); - return getRespCodeForException(ex); - } - } - - @POST - @Path("/saveWorkflowDraft") - @Consumes({ MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML }) - public Response saveDraft(String postBody, @Context HttpHeaders headers, - @Context UriInfo ui, @QueryParam("app.path") String appPath, - @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite) - throws IOException { - LOGGER.info("save workflow called"); - if (StringUtils.isEmpty(appPath)) { - throw new RuntimeException("app path can't be empty."); - } - appPath = appPath.trim(); - workflowFilesService.saveDraft(appPath, postBody, overwrite); - /* workflowManagerService.saveWorkflow(appPath, JobType.WORKFLOW, - "todo description", viewContext.getUsername());*/ - return Response.ok().build(); - } - - @GET - @Path("/readWorkflowDraft") - public Response readDraft(@QueryParam("workflowXmlPath") String workflowPath) { - if (StringUtils.isEmpty(workflowPath)) { - throw new RuntimeException("workflowXmlPath can't be empty."); - } - try { - final InputStream is = workflowFilesService.readDraft(workflowPath); - StreamingOutput streamer = new StreamingOutput() { - @Override - public void write(OutputStream os) throws IOException, - WebApplicationException { - IOUtils.copy(is, os); - is.close(); - os.close(); - } - }; - return Response.ok(streamer).status(200).build(); - } catch (IOException e) { - return getRespCodeForException(e); - } - } - - @POST - @Path("/discardWorkflowDraft") - public Response discardDraft(@QueryParam("workflowXmlPath") String workflowPath) throws IOException{ - workflowFilesService.discardDraft(workflowPath); - return Response.ok().build(); - } - - private Response submitJobInternal(String postBody, HttpHeaders headers, - UriInfo ui, String appPath, Boolean overwrite, JobType jobType) { - if (StringUtils.isEmpty(appPath)) { - throw new RuntimeException("app path 
can't be empty."); - } - appPath = appPath.trim(); - if (!overwrite) { - boolean fileExists = hdfsFileUtils.fileExists(appPath); - if (fileExists) { - return getFileExistsResponse(); - } - } - postBody = utils.formatXml(postBody); - try { - String filePath = hdfsFileUtils.writeToFile(appPath, postBody, - overwrite); - LOGGER.info(String.format( - "submit workflow job done. filePath=[%s]", filePath)); - } catch (Exception ex) { - LOGGER.error(ex.getMessage(), ex); - return getRespCodeForException(ex); - - } - /* workflowManagerService.saveWorkflow(appPath, jobType, - "todo description", viewContext.getUsername());*/ - String response = submitWorkflowJobToOozie(headers, appPath, - ui.getQueryParameters(), jobType); - if (response != null && response.trim().startsWith("{")) { - // dealing with oozie giving error but with 200 response. - return Response.status(Response.Status.OK).entity(response).build(); - } else { - HashMap<String, String> resp = new HashMap<String, String>(); - resp.put("status", ErrorCodes.OOZIE_SUBMIT_ERROR.getErrorCode()); - resp.put("message", response); - return Response.status(Response.Status.BAD_REQUEST).entity(resp) - .build(); - } - - } - - private Response getRespCodeForException(Exception ex) { - if (ex instanceof AccessControlException) { - HashMap<String, String> errorDetails = getErrorDetails( - ErrorCodes.FILE_ACCESS_ACL_ERROR.getErrorCode(), - ErrorCodes.FILE_ACCESS_ACL_ERROR.getDescription(), ex); - return Response.status(Response.Status.BAD_REQUEST) - .entity(errorDetails).build(); - } else if (ex instanceof IOException) { - HashMap<String, String> errorDetails = getErrorDetails( - ErrorCodes.FILE_ACCESS_UNKNOWN_ERROR.getErrorCode(), - ErrorCodes.FILE_ACCESS_UNKNOWN_ERROR.getDescription(), ex); - return Response.status(Response.Status.INTERNAL_SERVER_ERROR) - .entity(errorDetails).build(); - } else { - HashMap<String, String> errorDetails = getErrorDetails( - ErrorCodes.FILE_ACCESS_UNKNOWN_ERROR.getErrorCode(), - 
ErrorCodes.FILE_ACCESS_UNKNOWN_ERROR.getDescription(), ex); - return Response.status(Response.Status.INTERNAL_SERVER_ERROR) - .entity(errorDetails).build(); - } - - } - - private Response getFileExistsResponse() { - HashMap<String, String> resp = new HashMap<String, String>(); - resp.put("status", ErrorCodes.WORKFLOW_PATH_EXISTS.getErrorCode()); - resp.put("message", ErrorCodes.WORKFLOW_PATH_EXISTS.getDescription()); - return Response.status(Response.Status.BAD_REQUEST).entity(resp) - .build(); - } - - @GET - @Path("/readWorkflowDetail") - public Response isDraftAvailable(@QueryParam("workflowXmlPath") String workflowPath){ - WorkflowFileInfo workflowDetails = workflowFilesService.getWorkflowDetails(workflowPath); - return Response.ok(workflowDetails).build(); - } - - @GET - @Path("/readWorkflowXml") - public Response readWorkflowXxml( - @QueryParam("workflowXmlPath") String workflowPath) { - if (StringUtils.isEmpty(workflowPath)) { - throw new RuntimeException("workflowXmlPath can't be empty."); - } - try { - final InputStream is = workflowFilesService.readWorkflowXml(workflowPath); - StreamingOutput streamer = new StreamingOutput() { - @Override - public void write(OutputStream os) throws IOException, - WebApplicationException { - IOUtils.copy(is, os); - is.close(); - os.close(); - } - }; - return Response.ok(streamer).status(200).build(); - } catch (IOException e) { - return getRespCodeForException(e); - } - } - - private HashMap<String, String> getErrorDetails(String status, - String message, Exception ex) { - HashMap<String, String> resp = new HashMap<String, String>(); - resp.put("status", status); - if (message != null) { - resp.put("message", message); - } - if (ex != null) { - resp.put("stackTrace", ExceptionUtils.getFullStackTrace(ex)); - } - return resp; - } - - @GET - @Path("/getDag") - @Produces("image/png") - public Response submitWorkflow(@Context HttpHeaders headers, - @Context UriInfo ui, @QueryParam("jobid") String jobid) { - String imgUrl = 
getServiceUri() + "/v2/job/" + jobid + "?show=graph"; - Map<String, String> newHeaders = utils.getHeaders(headers); - final InputStream is = readFromOozie(headers, imgUrl, HttpMethod.GET, - null, newHeaders); - StreamingOutput streamer = new StreamingOutput() { - - @Override - public void write(OutputStream os) throws IOException, - WebApplicationException { - IOUtils.copy(is, os); - is.close(); - os.close(); - } - - }; - return Response.ok(streamer).status(200).build(); - } - - @GET - @Path("/{path: .*}") - public Response handleGet(@Context HttpHeaders headers, @Context UriInfo ui) { - try { - String serviceURI = buildURI(ui); - return consumeService(headers, serviceURI, HttpMethod.GET, null); - } catch (Exception ex) { - LOGGER.error("Error in GET proxy", ex); - return Response.status(Response.Status.INTERNAL_SERVER_ERROR) - .entity(getErrorDetailsForException("Oozie", ex)).build(); - } - } - - @POST - @Path("/{path: .*}") - public Response handlePost(String xml, @Context HttpHeaders headers, - @Context UriInfo ui) { - try { - String serviceURI = buildURI(ui); - return consumeService(headers, serviceURI, HttpMethod.POST, xml); - } catch (Exception ex) { - LOGGER.error("Error in POST proxy", ex); - return Response.status(Response.Status.INTERNAL_SERVER_ERROR) - .entity(getErrorDetailsForException("Oozie", ex)).build(); - } - } - - @DELETE - @Path("/{path: .*}") - public Response handleDelete(@Context HttpHeaders headers, - @Context UriInfo ui) { - try { - String serviceURI = buildURI(ui); - return consumeService(headers, serviceURI, HttpMethod.POST, null); - } catch (Exception ex) { - LOGGER.error("Error in DELETE proxy", ex); - return Response.status(Response.Status.INTERNAL_SERVER_ERROR) - .entity(getErrorDetailsForException("Oozie", ex)).build(); - } - } - - @PUT - @Path("/{path: .*}") - public Response handlePut(String body, @Context HttpHeaders headers, - @Context UriInfo ui) { - try { - String serviceURI = buildURI(ui); - return consumeService(headers, 
serviceURI, HttpMethod.PUT, body); - } catch (Exception ex) { - LOGGER.error("Error in PUT proxy", ex); - return Response.status(Response.Status.INTERNAL_SERVER_ERROR) - .entity(getErrorDetailsForException("Oozie", ex)).build(); - } - } - - private Map<String, String> getErrorDetailsForException(String component, - Exception ex) { - String errorCode = component + "exception"; - String errorMessage = component + " Exception"; - if (ex instanceof RuntimeException) { - Throwable cause = ex.getCause(); - if (cause instanceof IOException) { - errorCode = component + "io.exception"; - errorMessage = component + "IO Exception"; - } - } - return getErrorDetails(errorCode, errorMessage, ex); - } - - private String submitWorkflowJobToOozie(HttpHeaders headers, - String filePath, MultivaluedMap<String, String> queryParams, - JobType jobType) { - String nameNode = "hdfs://" - + viewContext.getCluster().getConfigurationValue("hdfs-site", - "dfs.namenode.rpc-address"); - - if (!queryParams.containsKey("config.nameNode")) { - ArrayList<String> nameNodes = new ArrayList<String>(); - LOGGER.info("Namenode===" + nameNode); - nameNodes.add(nameNode); - queryParams.put("config.nameNode", nameNodes); - } - - Map<String, String> workflowConigs = getWorkflowConfigs(filePath, - queryParams, jobType, nameNode); - String configXMl = oozieUtils.generateConfigXml(workflowConigs); - LOGGER.info("Config xml==" + configXMl); - HashMap<String, String> customHeaders = new HashMap<String, String>(); - customHeaders.put("Content-Type", "application/xml;charset=UTF-8"); - Response serviceResponse = consumeService(headers, getServiceUri() - + "/v2/jobs?" 
+ getJobSumbitOozieParams(queryParams), - HttpMethod.POST, configXMl, customHeaders); - - LOGGER.info("Resp from oozie status entity==" - + serviceResponse.getEntity()); - if (serviceResponse.getEntity() instanceof String) { - return (String) serviceResponse.getEntity(); - } else { - return "success"; - } - - } - - private Map<String, String> getWorkflowConfigs(String filePath, - MultivaluedMap<String, String> queryParams, JobType jobType, - String nameNode) { - HashMap<String, String> workflowConigs = new HashMap<String, String>(); - if (queryParams.containsKey("resourceManager") - && "useDefault".equals(queryParams.getFirst("resourceManager"))) { - String jobTrackerNode = viewContext.getCluster() - .getConfigurationValue("yarn-site", - "yarn.resourcemanager.address"); - LOGGER.info("jobTrackerNode===" + jobTrackerNode); - workflowConigs.put("resourceManager", jobTrackerNode); - workflowConigs.put("jobTracker", jobTrackerNode); - } - if (queryParams != null) { - for (Map.Entry<String, List<String>> entry : queryParams.entrySet()) { - if (entry.getKey().startsWith("config.")) { - if (entry.getValue() != null && entry.getValue().size() > 0) { - workflowConigs.put(entry.getKey().substring(7), entry - .getValue().get(0)); - } - } - } - } - - if (queryParams.containsKey("oozieconfig.useSystemLibPath")) { - String useSystemLibPath = queryParams - .getFirst("oozieconfig.useSystemLibPath"); - workflowConigs.put(OOZIE_USE_SYSTEM_LIBPATH_CONF_KEY, - useSystemLibPath); - } else { - workflowConigs.put(OOZIE_USE_SYSTEM_LIBPATH_CONF_KEY, "true"); - } - if (queryParams.containsKey("oozieconfig.rerunOnFailure")) { - String rerunFailnodes = queryParams - .getFirst("oozieconfig.rerunOnFailure"); - workflowConigs.put(OOZIE_WF_RERUN_FAILNODES_CONF_KEY, - rerunFailnodes); - } else { - workflowConigs.put(OOZIE_WF_RERUN_FAILNODES_CONF_KEY, "true"); - } - workflowConigs.put("user.name", viewContext.getUsername()); - workflowConigs.put(oozieUtils.getJobPathPropertyKey(jobType), nameNode - 
+ filePath); - return workflowConigs; - } - - private String getJobSumbitOozieParams( - MultivaluedMap<String, String> queryParams) { - StringBuilder query = new StringBuilder(); - if (queryParams != null) { - for (Map.Entry<String, List<String>> entry : queryParams.entrySet()) { - if (entry.getKey().startsWith(OOZIEPARAM_PREFIX)) { - if (entry.getValue() != null && entry.getValue().size() > 0) { - for (String val : entry.getValue()) { - query.append( - entry.getKey().substring( - OOZIEPARAM_PREFIX_LENGTH)) - .append(EQUAL_SYMBOL).append(val) - .append("&"); - } - } - } - } - } - return query.toString(); - } - - private String buildURI(UriInfo ui) { - String uiURI = ui.getAbsolutePath().getPath(); - int index = uiURI.indexOf("proxy/") + 5; - uiURI = uiURI.substring(index); - String serviceURI = getServiceUri(); - serviceURI += uiURI; - MultivaluedMap<String, String> params = addOrReplaceUserName(ui - .getQueryParameters()); - return serviceURI + utils.convertParamsToUrl(params); - } - - private MultivaluedMap<String, String> addOrReplaceUserName( - MultivaluedMap<String, String> parameters) { - for (Map.Entry<String, List<String>> entry : parameters.entrySet()) { - if ("user.name".equals(entry.getKey())) { - ArrayList<String> vals = new ArrayList<String>(1); - vals.add(viewContext.getUsername()); - entry.setValue(vals); - } - } - return parameters; - } - - private String getServiceUri() { - String serviceURI = viewContext.getProperties().get(SERVICE_URI_PROP) != null ? 
viewContext - .getProperties().get(SERVICE_URI_PROP) : DEFAULT_SERVICE_URI; - return serviceURI; - } - - private Response consumeService(HttpHeaders headers, String urlToRead, - String method, String body) throws Exception { - return consumeService(headers, urlToRead, method, body, null); - } - - private Response consumeService(HttpHeaders headers, String urlToRead, - String method, String body, Map<String, String> customHeaders) { - Response response = null; - InputStream stream = readFromOozie(headers, urlToRead, method, body, - customHeaders); - String stringResponse = null; - try { - stringResponse = IOUtils.toString(stream); - } catch (IOException e) { - LOGGER.error("Error while converting stream to string", e); - throw new RuntimeException(e); - } - if (stringResponse.contains(Response.Status.BAD_REQUEST.name())) { - response = Response.status(Response.Status.BAD_REQUEST) - .entity(stringResponse).type(MediaType.TEXT_PLAIN).build(); - } else { - response = Response.status(Response.Status.OK) - .entity(stringResponse) - .type(utils.deduceType(stringResponse)).build(); - } - return response; - } - - private InputStream readFromOozie(HttpHeaders headers, String urlToRead, - String method, String body, Map<String, String> customHeaders) { - - Map<String, String> newHeaders = utils.getHeaders(headers); - newHeaders.put(USER_NAME_HEADER, USER_OOZIE_SUPER); - - newHeaders.put(DO_AS_HEADER, viewContext.getUsername()); - newHeaders.put("Accept", MediaType.APPLICATION_JSON); - if (customHeaders != null) { - newHeaders.putAll(customHeaders); - } - LOGGER.info(String.format("Proxy request for url: [%s] %s", method, - urlToRead)); - - return ambariIOUtil.readFromUrl(urlToRead, method, body, newHeaders); - } + return getErrorDetails(errorCode, errorMessage, ex); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieUtils.java 
---------------------------------------------------------------------- diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieUtils.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieUtils.java index f002102..9791c47 100644 --- a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieUtils.java +++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieUtils.java @@ -32,92 +32,144 @@ import org.w3c.dom.Element; import org.xml.sax.InputSource; import org.xml.sax.SAXException; +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; + public class OozieUtils { - private final static Logger LOGGER = LoggerFactory - .getLogger(OozieUtils.class); - private Utils utils = new Utils(); - - public String generateConfigXml(Map<String, String> map) { - DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); - DocumentBuilder db; - try { - db = dbf.newDocumentBuilder(); - Document doc = db.newDocument(); - Element configElement = doc.createElement("configuration"); - doc.appendChild(configElement); - for (Map.Entry<String, String> entry : map.entrySet()) { - Element propElement = doc.createElement("property"); - configElement.appendChild(propElement); - Element nameElem = doc.createElement("name"); - nameElem.setTextContent(entry.getKey()); - Element valueElem = doc.createElement("value"); - valueElem.setTextContent(entry.getValue()); - propElement.appendChild(nameElem); - propElement.appendChild(valueElem); - } - return utils.generateXml(doc); - } catch (ParserConfigurationException e) { - LOGGER.error("error in generating config xml", e); - throw new RuntimeException(e); - } - } - public String getJobPathPropertyKey(JobType jobType) { - switch (jobType) { - case WORKFLOW: - return "oozie.wf.application.path"; - case COORDINATOR: - return "oozie.coord.application.path"; - case BUNDLE: - return "oozie.bundle.application.path"; - } - throw new 
RuntimeException("Unknown Job Type"); - } - - public String generateWorkflowXml(String actionNodeXml) { - DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); - DocumentBuilder db; - try { - db = dbf.newDocumentBuilder(); - Document doc = db.newDocument(); - - Element workflowElement = doc.createElement("workflow-app"); - workflowElement.setAttribute("name", "testWorkflow"); - workflowElement.setAttribute("xmlns", "uri:oozie:workflow:0.5"); - doc.appendChild(workflowElement); - - Element startElement = doc.createElement("start"); - startElement.setAttribute("to", "testAction"); - workflowElement.appendChild(startElement); - - Element actionElement = doc.createElement("action"); - actionElement.setAttribute("name", "testAction"); - Element actionSettingsElement = db.parse(new InputSource(new StringReader(actionNodeXml))).getDocumentElement(); - actionElement.appendChild(doc.importNode(actionSettingsElement, true)); - workflowElement.appendChild(actionElement); - - Element actionOkTransitionElement = doc.createElement("ok"); - actionOkTransitionElement.setAttribute("to", "end"); - actionElement.appendChild(actionOkTransitionElement); - - Element actionErrorTransitionElement = doc.createElement("error"); - actionErrorTransitionElement.setAttribute("to", "kill"); - actionElement.appendChild(actionErrorTransitionElement); - - Element killElement = doc.createElement("kill"); - killElement.setAttribute("name", "kill"); - Element killMessageElement = doc.createElement("message"); - killMessageElement.setTextContent("Kill node message"); - killElement.appendChild(killMessageElement); - workflowElement.appendChild(killElement); - - Element endElement = doc.createElement("end"); - endElement.setAttribute("name", "end"); - workflowElement.appendChild(endElement); - - return utils.generateXml(doc); - } catch (ParserConfigurationException | SAXException | IOException e) { - LOGGER.error("error in generating workflow xml", e); - throw new RuntimeException(e); - } - 
} + private final static Logger LOGGER = LoggerFactory + .getLogger(OozieUtils.class); + private Utils utils = new Utils(); + + public String generateConfigXml(Map<String, String> map) { + DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); + DocumentBuilder db; + try { + db = dbf.newDocumentBuilder(); + Document doc = db.newDocument(); + Element configElement = doc.createElement("configuration"); + doc.appendChild(configElement); + for (Map.Entry<String, String> entry : map.entrySet()) { + Element propElement = doc.createElement("property"); + configElement.appendChild(propElement); + Element nameElem = doc.createElement("name"); + nameElem.setTextContent(entry.getKey()); + Element valueElem = doc.createElement("value"); + valueElem.setTextContent(entry.getValue()); + propElement.appendChild(nameElem); + propElement.appendChild(valueElem); + } + return utils.generateXml(doc); + } catch (ParserConfigurationException e) { + LOGGER.error("error in generating config xml", e); + throw new RuntimeException(e); + } + } + + public String getJobPathPropertyKey(JobType jobType) { + switch (jobType) { + case WORKFLOW: + return "oozie.wf.application.path"; + case COORDINATOR: + return "oozie.coord.application.path"; + case BUNDLE: + return "oozie.bundle.application.path"; + } + throw new RuntimeException("Unknown Job Type"); + } + + public JobType deduceJobType(String xml) { + try { + DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); + DocumentBuilder db = null; + + db = dbf.newDocumentBuilder(); + InputSource is = new InputSource(); + is.setCharacterStream(new StringReader(xml)); + + Document doc = db.parse(is); + String rootNode = doc.getDocumentElement().getNodeName(); + if ("workflow-app".equals(rootNode)) { + return JobType.WORKFLOW; + } else if ("coordinator-app".equals(rootNode)) { + return JobType.COORDINATOR; + } else if ("bundle-app".equals(rootNode)) { + return JobType.BUNDLE; + } + throw new RuntimeException("invalid xml 
submitted"); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public String deduceWorkflowNameFromJson(String json) { + JsonElement jsonElement = new JsonParser().parse(json); + String name = jsonElement.getAsJsonObject().get("name").getAsString(); + return name; + } + + public String deduceWorkflowNameFromXml(String xml) { + try { + DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); + DocumentBuilder db = dbf.newDocumentBuilder(); + InputSource is = new InputSource(); + is.setCharacterStream(new StringReader(xml)); + Document doc = db.parse(is); + String name = doc.getDocumentElement().getAttributeNode("name").getValue(); + return name; + + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public String generateWorkflowXml(String actionNodeXml) { + DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); + DocumentBuilder db; + try { + db = dbf.newDocumentBuilder(); + Document doc = db.newDocument(); + + Element workflowElement = doc.createElement("workflow-app"); + workflowElement.setAttribute("name", "testWorkflow"); + workflowElement.setAttribute("xmlns", "uri:oozie:workflow:0.5"); + doc.appendChild(workflowElement); + + Element startElement = doc.createElement("start"); + startElement.setAttribute("to", "testAction"); + workflowElement.appendChild(startElement); + + Element actionElement = doc.createElement("action"); + actionElement.setAttribute("name", "testAction"); + Element actionSettingsElement = db.parse( + new InputSource(new StringReader(actionNodeXml))) + .getDocumentElement(); + actionElement.appendChild(doc.importNode(actionSettingsElement, + true)); + workflowElement.appendChild(actionElement); + + Element actionOkTransitionElement = doc.createElement("ok"); + actionOkTransitionElement.setAttribute("to", "end"); + actionElement.appendChild(actionOkTransitionElement); + + Element actionErrorTransitionElement = doc.createElement("error"); + 
actionErrorTransitionElement.setAttribute("to", "kill"); + actionElement.appendChild(actionErrorTransitionElement); + + Element killElement = doc.createElement("kill"); + killElement.setAttribute("name", "kill"); + Element killMessageElement = doc.createElement("message"); + killMessageElement.setTextContent("Kill node message"); + killElement.appendChild(killMessageElement); + workflowElement.appendChild(killElement); + + Element endElement = doc.createElement("end"); + endElement.setAttribute("name", "end"); + workflowElement.appendChild(endElement); + + return utils.generateXml(doc); + } catch (ParserConfigurationException | SAXException | IOException e) { + LOGGER.error("error in generating workflow xml", e); + throw new RuntimeException(e); + } + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/ServiceFormattedException.java ---------------------------------------------------------------------- diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/ServiceFormattedException.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/ServiceFormattedException.java new file mode 100644 index 0000000..3a57d63 --- /dev/null +++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/ServiceFormattedException.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.ambari.view; + +import java.util.HashMap; + +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import org.apache.commons.lang.exception.ExceptionUtils; +import org.json.simple.JSONObject; + +public class ServiceFormattedException extends WebApplicationException { + private final static int STATUS = 500; + + public ServiceFormattedException(Throwable exception) { + super(errorEntity(exception.getMessage(), exception)); + } + + public ServiceFormattedException(String message, Throwable exception) { + super(errorEntity(message, exception)); + } + + protected static Response errorEntity(String message, Throwable e) { + HashMap<String, Object> response = new HashMap<String, Object>(); + response.put("message", message); + String trace = null; + if (e != null) { + trace = ExceptionUtils.getStackTrace(e); + } + response.put("trace", trace); + response.put("status", STATUS); + return Response.status(STATUS).entity(new JSONObject(response)) + .type(MediaType.APPLICATION_JSON).build(); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/WorkflowFilesService.java ---------------------------------------------------------------------- diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/WorkflowFilesService.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/WorkflowFilesService.java index 01bde47..f98df0d 100644 --- 
a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/WorkflowFilesService.java +++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/WorkflowFilesService.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,98 +19,104 @@ package org.apache.oozie.ambari.view; import java.io.IOException; import java.io.InputStream; + import org.apache.hadoop.fs.FileStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class WorkflowFilesService { - private HDFSFileUtils hdfsFileUtils; + private final static Logger LOGGER = LoggerFactory + .getLogger(WorkflowFilesService.class); + private HDFSFileUtils hdfsFileUtils; + + public WorkflowFilesService(HDFSFileUtils hdfsFileUtils) { + super(); + this.hdfsFileUtils = hdfsFileUtils; + } + + public String createWorkflowFile(String appPath, String content, + boolean overwrite) throws IOException { + return hdfsFileUtils.writeToFile(getWorkflowFileName(appPath), content, + overwrite); + } + + public String createAssetFile(String appPath, String content, + boolean overwrite) throws IOException { + return hdfsFileUtils.writeToFile(appPath, content, + overwrite); + } - public WorkflowFilesService(HDFSFileUtils hdfsFileUtils) { - super(); - this.hdfsFileUtils = hdfsFileUtils; - } + public String saveDraft(String appPath, String content, boolean overwrite) + throws IOException { + return hdfsFileUtils.writeToFile(getWorkflowDrafFileName(appPath), + content, overwrite); + } - public String createWorkflowFile(String appPath, 
String content, - boolean overwrite) throws IOException { - return hdfsFileUtils.writeToFile(getWorkflowFileName(appPath), content, - overwrite); - } - - public String createAssetFile(String appPath, String content, - boolean overwrite) throws IOException { - return hdfsFileUtils.writeToFile(appPath, content, - overwrite); - } + public InputStream readDraft(String appPath) throws IOException { + return hdfsFileUtils.read(getWorkflowDrafFileName(appPath)); + } - public String saveDraft(String appPath, String content, boolean overwrite) - throws IOException { - return hdfsFileUtils.writeToFile(getWorkflowDrafFileName(appPath), - content, overwrite); - } + public InputStream readWorkflowXml(String appPath) throws IOException { + return hdfsFileUtils.read(getWorkflowFileName(appPath)); + } - public InputStream readDraft(String appPath) throws IOException { - return hdfsFileUtils.read(getWorkflowDrafFileName(appPath)); - } - public InputStream readWorkflowXml(String appPath) throws IOException { - return hdfsFileUtils.read(getWorkflowFileName(appPath)); - } + private String getWorkflowDrafFileName(String appPath) { + return getWorkflowFileName(appPath).concat(".draft.json"); + } - private String getWorkflowDrafFileName(String appPath) { - return getWorkflowFileName(appPath).concat(".draft.json"); - } + private String getWorkflowFileName(String appPath) { + String workflowFile = null; + if (appPath.endsWith(".xml")) { + workflowFile = appPath; + } else { + workflowFile = appPath + (appPath.endsWith("/") ? "" : "/") + + "workflow.xml"; + } + return workflowFile; + } - private String getWorkflowFileName(String appPath) { - String workflowFile = null; - if (appPath.endsWith(".xml")) { - workflowFile = appPath; - } else { - workflowFile = appPath + (appPath.endsWith("/") ? 
"" : "/") - + "workflow.xml"; - } - return workflowFile; - } - - public String getAssetFileName(String appPath) { - String assetFile = null; - if (appPath.endsWith(".xml")) { - assetFile = appPath; - } else { - assetFile = appPath + (appPath.endsWith("/") ? "" : "/") - + "asset.xml"; - } - return assetFile; - } + public String getAssetFileName(String appPath) { + String assetFile = null; + if (appPath.endsWith(".xml")) { + assetFile = appPath; + } else { + assetFile = appPath + (appPath.endsWith("/") ? "" : "/") + + "asset.xml"; + } + return assetFile; + } - public void discardDraft(String workflowPath) throws IOException { - hdfsFileUtils.deleteFile(getWorkflowDrafFileName(workflowPath)); + public void discardDraft(String workflowPath) throws IOException { + hdfsFileUtils.deleteFile(getWorkflowDrafFileName(workflowPath)); - } + } - public WorkflowFileInfo getWorkflowDetails(String appPath) { - WorkflowFileInfo workflowInfo = new WorkflowFileInfo(); - workflowInfo.setWorkflowPath(getWorkflowFileName(appPath)); - boolean draftExists = hdfsFileUtils - .fileExists(getWorkflowDrafFileName(appPath)); - workflowInfo.setDraftExists(draftExists); - boolean workflowExists = hdfsFileUtils.fileExists(getWorkflowFileName(appPath)); - FileStatus workflowFileStatus = null; - if (workflowExists){ - workflowFileStatus = hdfsFileUtils - .getFileStatus(getWorkflowFileName(appPath)); - workflowInfo.setWorkflowModificationTime(workflowFileStatus - .getModificationTime()); - } - if (draftExists) { - FileStatus draftFileStatus = hdfsFileUtils - .getFileStatus(getWorkflowDrafFileName(appPath)); - workflowInfo.setDraftModificationTime(draftFileStatus - .getModificationTime()); - if (!workflowExists){ - workflowInfo.setIsDraftCurrent(true); - }else{ - workflowInfo.setIsDraftCurrent(draftFileStatus.getModificationTime() - - workflowFileStatus.getModificationTime() > 0); - } - } - return workflowInfo; - } + public WorkflowFileInfo getWorkflowDetails(String appPath) { + WorkflowFileInfo 
workflowInfo = new WorkflowFileInfo(); + workflowInfo.setWorkflowPath(getWorkflowFileName(appPath)); + boolean draftExists = hdfsFileUtils + .fileExists(getWorkflowDrafFileName(appPath)); + workflowInfo.setDraftExists(draftExists); + boolean workflowExists = hdfsFileUtils.fileExists(getWorkflowFileName(appPath)); + FileStatus workflowFileStatus = null; + if (workflowExists) { + workflowFileStatus = hdfsFileUtils + .getFileStatus(getWorkflowFileName(appPath)); + workflowInfo.setWorkflowModificationTime(workflowFileStatus + .getModificationTime()); + } + if (draftExists) { + FileStatus draftFileStatus = hdfsFileUtils + .getFileStatus(getWorkflowDrafFileName(appPath)); + workflowInfo.setDraftModificationTime(draftFileStatus + .getModificationTime()); + if (!workflowExists) { + workflowInfo.setIsDraftCurrent(true); + } else { + workflowInfo.setIsDraftCurrent(draftFileStatus.getModificationTime() + - workflowFileStatus.getModificationTime() > 0); + } + } + return workflowInfo; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetDefinitionRepo.java ---------------------------------------------------------------------- diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetDefinitionRepo.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetDefinitionRepo.java new file mode 100644 index 0000000..cebc7ea --- /dev/null +++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetDefinitionRepo.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.ambari.view.assets; + +import org.apache.ambari.view.DataStore; +import org.apache.oozie.ambari.view.assets.model.ActionAssetDefinition; +import org.apache.oozie.ambari.view.repo.BaseRepo; + +public class AssetDefinitionRepo extends BaseRepo<ActionAssetDefinition> { + public AssetDefinitionRepo(DataStore dataStore) { + super(ActionAssetDefinition.class, dataStore); + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetRepo.java ---------------------------------------------------------------------- diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetRepo.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetRepo.java index 1390ec0..df936a4 100644 --- a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetRepo.java +++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetRepo.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,26 +17,23 @@ */ package org.apache.oozie.ambari.view.assets; -import java.util.Collection; - import org.apache.ambari.view.DataStore; +import org.apache.ambari.view.PersistenceException; import org.apache.oozie.ambari.view.assets.model.ActionAsset; +import org.apache.oozie.ambari.view.repo.BaseRepo; -public class AssetRepo { - private final DataStore dataStore; +import java.util.Collection; - public AssetRepo(DataStore dataStore) { - super(); - this.dataStore = dataStore; - } - public void saveAsset(ActionAsset asset){ - - } - public void deleteAsset(ActionAsset asset){ - - } - public Collection<ActionAsset>listAllAssets(){ - return null; - } +public class AssetRepo extends BaseRepo<ActionAsset> { + public AssetRepo(DataStore dataStore) { + super(ActionAsset.class, dataStore); + } + public Collection<ActionAsset> getMyAsets(String userName) { + try { + return dataStore.findAll(ActionAsset.class, " owner='" + userName + "'"); + } catch (PersistenceException e) { + throw new RuntimeException(e); + } + } }