http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java index 96183ea..cbd56b3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java @@ -16,10 +16,28 @@ */ package org.apache.nifi.web.api; -import java.net.URI; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.RequestAction; +import org.apache.nifi.authorization.resource.Authorizable; +import org.apache.nifi.cluster.manager.impl.WebClusterManager; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.NiFiServiceFacade; +import org.apache.nifi.web.Revision; +import org.apache.nifi.web.UpdateResult; +import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; +import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; +import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity; +import org.apache.nifi.web.api.request.ClientIdParameter; +import org.apache.nifi.web.api.request.LongParameter; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; @@ -35,27 +53,8 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; - -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.cluster.manager.impl.WebClusterManager; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.NiFiServiceFacade; -import org.apache.nifi.web.Revision; -import org.apache.nifi.web.UpdateResult; -import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; -import org.apache.nifi.web.api.dto.RevisionDTO; -import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; -import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity; -import org.apache.nifi.web.api.request.ClientIdParameter; -import org.apache.nifi.web.api.request.LongParameter; - -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; +import java.net.URI; +import java.util.Set; /** * RESTful endpoint for managing a Remote group. @@ -72,6 +71,7 @@ public class RemoteProcessGroupResource extends ApplicationResource { private NiFiServiceFacade serviceFacade; private WebClusterManager clusterManager; private NiFiProperties properties; + private Authorizer authorizer; /** * Populates the remaining content for each remote process group. The uri must be generated and the remote process groups name must be retrieved. @@ -160,18 +160,24 @@ public class RemoteProcessGroupResource extends ApplicationResource { value = "Whether to include any encapulated ports or just details about the remote process group.", required = false ) - @QueryParam("verbose") @DefaultValue(VERBOSE_DEFAULT_VALUE) Boolean verbose, + @QueryParam("verbose") @DefaultValue(VERBOSE_DEFAULT_VALUE) final Boolean verbose, @ApiParam( value = "The remote process group id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable remoteProcessGroup = lookup.getRemoteProcessGroup(id); + remoteProcessGroup.authorize(authorizer, RequestAction.READ); + }); + // get the remote process group final RemoteProcessGroupEntity entity = serviceFacade.getRemoteProcessGroup(id); populateRemainingRemoteProcessGroupEntityContent(entity); @@ -217,22 +223,22 @@ public class RemoteProcessGroupResource extends ApplicationResource { } ) public Response removeRemoteProcessGroup( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The revision is used to verify the client is working with the latest version of the flow.", required = false ) - @QueryParam(VERSION) LongParameter version, + @QueryParam(VERSION) final LongParameter version, @ApiParam( value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", required = false ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) final ClientIdParameter clientId, @ApiParam( value = "The remote process group id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { // replicate if cluster manager if (properties.isClusterManager()) { @@ -241,17 +247,19 @@ public class RemoteProcessGroupResource extends ApplicationResource { // handle expects request (usually from the cluster manager) final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - serviceFacade.claimRevision(revision); - } - if (validationPhase) { - serviceFacade.verifyDeleteRemoteProcessGroup(id); - return generateContinueResponse().build(); - } - - final RemoteProcessGroupEntity entity = serviceFacade.deleteRemoteProcessGroup(revision, id); - return clusterContext(generateOkResponse(entity)).build(); + return withWriteLock( + serviceFacade, + revision, + lookup -> { + final Authorizable remoteProcessGroup = lookup.getRemoteProcessGroup(id); + remoteProcessGroup.authorize(authorizer, RequestAction.WRITE); + }, + () -> serviceFacade.verifyDeleteRemoteProcessGroup(id), + () -> { + final RemoteProcessGroupEntity entity = serviceFacade.deleteRemoteProcessGroup(revision, id); + return clusterContext(generateOkResponse(entity)).build(); + } + ); } /** @@ -286,10 +294,10 @@ public class RemoteProcessGroupResource extends ApplicationResource { } ) public Response updateRemoteProcessGroupInputPort( - @Context HttpServletRequest httpServletRequest, - @PathParam("id") String id, - @PathParam("port-id") String portId, - RemoteProcessGroupPortEntity remoteProcessGroupPortEntity) { + @Context final HttpServletRequest httpServletRequest, + @PathParam("id") final String id, + @PathParam("port-id") final String portId, + final RemoteProcessGroupPortEntity remoteProcessGroupPortEntity) { if (remoteProcessGroupPortEntity == null || remoteProcessGroupPortEntity.getRemoteProcessGroupPort() == null) { throw new IllegalArgumentException("Remote process group port details must be specified."); @@ -308,38 +316,34 @@ public class RemoteProcessGroupResource extends ApplicationResource { // replicate if cluster manager if (properties.isClusterManager()) { - // change content type to JSON for serializing entity - final Map<String, String> headersToOverride = new HashMap<>(); - headersToOverride.put("content-type", MediaType.APPLICATION_JSON); - - // replicate the request - return clusterManager.applyRequest(HttpMethod.PUT, getAbsolutePath(), remoteProcessGroupPortEntity, getHeaders(headersToOverride)).getResponse(); + return clusterManager.applyRequest(HttpMethod.PUT, getAbsolutePath(), remoteProcessGroupPortEntity, getHeaders()).getResponse(); } // handle expects request (usually from the cluster manager) final Revision revision = getRevision(remoteProcessGroupPortEntity, id); - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - serviceFacade.claimRevision(revision); - } - if (validationPhase) { - // verify the update at this time - serviceFacade.verifyUpdateRemoteProcessGroupInputPort(id, requestRemoteProcessGroupPort); - return generateContinueResponse().build(); - } - - // update the specified remote process group - final RemoteProcessGroupPortEntity controllerResponse = serviceFacade.updateRemoteProcessGroupInputPort(revision, id, requestRemoteProcessGroupPort); - - // get the updated revision - final RevisionDTO updatedRevision = controllerResponse.getRevision(); - - // build the response entity - final RemoteProcessGroupPortEntity entity = new RemoteProcessGroupPortEntity(); - entity.setRevision(updatedRevision); - entity.setRemoteProcessGroupPort(controllerResponse.getRemoteProcessGroupPort()); - - return clusterContext(generateOkResponse(entity)).build(); + return withWriteLock( + serviceFacade, + revision, + lookup -> { + final Authorizable remoteProcessGroupInputPort = lookup.getRemoteProcessGroupInputPort(id, portId); + remoteProcessGroupInputPort.authorize(authorizer, RequestAction.WRITE); + }, + () -> serviceFacade.verifyUpdateRemoteProcessGroupInputPort(id, requestRemoteProcessGroupPort), + () -> { + // update the specified remote process group + final RemoteProcessGroupPortEntity controllerResponse = serviceFacade.updateRemoteProcessGroupInputPort(revision, id, requestRemoteProcessGroupPort); + + // get the updated revision + final RevisionDTO updatedRevision = controllerResponse.getRevision(); + + // build the response entity + final RemoteProcessGroupPortEntity entity = new RemoteProcessGroupPortEntity(); + entity.setRevision(updatedRevision); + entity.setRemoteProcessGroupPort(controllerResponse.getRemoteProcessGroupPort()); + + return clusterContext(generateOkResponse(entity)).build(); + } + ); } /** @@ -396,38 +400,34 @@ public class RemoteProcessGroupResource extends ApplicationResource { // replicate if cluster manager if (properties.isClusterManager()) { - // change content type to JSON for serializing entity - final Map<String, String> headersToOverride = new HashMap<>(); - headersToOverride.put("content-type", MediaType.APPLICATION_JSON); - - // replicate the request - return clusterManager.applyRequest(HttpMethod.PUT, getAbsolutePath(), remoteProcessGroupPortEntity, getHeaders(headersToOverride)).getResponse(); + return clusterManager.applyRequest(HttpMethod.PUT, getAbsolutePath(), remoteProcessGroupPortEntity, getHeaders()).getResponse(); } // handle expects request (usually from the cluster manager) final Revision revision = getRevision(remoteProcessGroupPortEntity, portId); - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - serviceFacade.claimRevision(revision); - } - if (validationPhase) { - // verify the update at this time - serviceFacade.verifyUpdateRemoteProcessGroupOutputPort(id, requestRemoteProcessGroupPort); - return generateContinueResponse().build(); - } - - // update the specified remote process group - final RemoteProcessGroupPortEntity controllerResponse = serviceFacade.updateRemoteProcessGroupOutputPort(revision, id, requestRemoteProcessGroupPort); - - // get the updated revision - final RevisionDTO updatedRevision = controllerResponse.getRevision(); - - // build the response entity - RemoteProcessGroupPortEntity entity = new RemoteProcessGroupPortEntity(); - entity.setRevision(updatedRevision); - entity.setRemoteProcessGroupPort(controllerResponse.getRemoteProcessGroupPort()); - - return clusterContext(generateOkResponse(entity)).build(); + return withWriteLock( + serviceFacade, + revision, + lookup -> { + final Authorizable remoteProcessGroupOutputPort = lookup.getRemoteProcessGroupOutputPort(id, portId); + remoteProcessGroupOutputPort.authorize(authorizer, RequestAction.WRITE); + }, + () -> serviceFacade.verifyUpdateRemoteProcessGroupOutputPort(id, requestRemoteProcessGroupPort), + () -> { + // update the specified remote process group + final RemoteProcessGroupPortEntity controllerResponse = serviceFacade.updateRemoteProcessGroupOutputPort(revision, id, requestRemoteProcessGroupPort); + + // get the updated revision + final RevisionDTO updatedRevision = controllerResponse.getRevision(); + + // build the response entity + RemoteProcessGroupPortEntity entity = new RemoteProcessGroupPortEntity(); + entity.setRevision(updatedRevision); + entity.setRemoteProcessGroupPort(controllerResponse.getRemoteProcessGroupPort()); + + return clusterContext(generateOkResponse(entity)).build(); + } + ); } /** @@ -481,69 +481,65 @@ public class RemoteProcessGroupResource extends ApplicationResource { // replicate if cluster manager if (properties.isClusterManager()) { - // change content type to JSON for serializing entity - final Map<String, String> headersToOverride = new HashMap<>(); - headersToOverride.put("content-type", MediaType.APPLICATION_JSON); - - // replicate the request - return clusterManager.applyRequest(HttpMethod.PUT, getAbsolutePath(), remoteProcessGroupEntity, getHeaders(headersToOverride)).getResponse(); + return clusterManager.applyRequest(HttpMethod.PUT, getAbsolutePath(), remoteProcessGroupEntity, getHeaders()).getResponse(); } // handle expects request (usually from the cluster manager) final Revision revision = getRevision(remoteProcessGroupEntity, id); - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - serviceFacade.claimRevision(revision); - } - if (validationPhase) { - // verify the update at this time - serviceFacade.verifyUpdateRemoteProcessGroup(requestRemoteProcessGroup); - return generateContinueResponse().build(); - } - - // if the target uri is set we have to verify it here - we don't support updating the target uri on - // an existing remote process group, however if the remote process group is being created with an id - // as is the case in clustered mode we need to verify the remote process group. treat this request as - // though its a new remote process group. - if (requestRemoteProcessGroup.getTargetUri() != null) { - // parse the uri - final URI uri; - try { - uri = URI.create(requestRemoteProcessGroup.getTargetUri()); - } catch (final IllegalArgumentException e) { - throw new IllegalArgumentException("The specified remote process group URL is malformed: " + requestRemoteProcessGroup.getTargetUri()); - } - - // validate each part of the uri - if (uri.getScheme() == null || uri.getHost() == null) { - throw new IllegalArgumentException("The specified remote process group URL is malformed: " + requestRemoteProcessGroup.getTargetUri()); - } - - if (!(uri.getScheme().equalsIgnoreCase("http") || uri.getScheme().equalsIgnoreCase("https"))) { - throw new IllegalArgumentException("The specified remote process group URL is invalid because it is not http or https: " + requestRemoteProcessGroup.getTargetUri()); - } - - // normalize the uri to the other controller - String controllerUri = uri.toString(); - if (controllerUri.endsWith("/")) { - controllerUri = StringUtils.substringBeforeLast(controllerUri, "/"); + return withWriteLock( + serviceFacade, + revision, + lookup -> { + final Authorizable remoteProcessGroup = lookup.getRemoteProcessGroup(id); + remoteProcessGroup.authorize(authorizer, RequestAction.WRITE); + }, + () -> serviceFacade.verifyUpdateRemoteProcessGroup(requestRemoteProcessGroup), + () -> { + // if the target uri is set we have to verify it here - we don't support updating the target uri on + // an existing remote process group, however if the remote process group is being created with an id + // as is the case in clustered mode we need to verify the remote process group. treat this request as + // though its a new remote process group. + if (requestRemoteProcessGroup.getTargetUri() != null) { + // parse the uri + final URI uri; + try { + uri = URI.create(requestRemoteProcessGroup.getTargetUri()); + } catch (final IllegalArgumentException e) { + throw new IllegalArgumentException("The specified remote process group URL is malformed: " + requestRemoteProcessGroup.getTargetUri()); + } + + // validate each part of the uri + if (uri.getScheme() == null || uri.getHost() == null) { + throw new IllegalArgumentException("The specified remote process group URL is malformed: " + requestRemoteProcessGroup.getTargetUri()); + } + + if (!(uri.getScheme().equalsIgnoreCase("http") || uri.getScheme().equalsIgnoreCase("https"))) { + throw new IllegalArgumentException("The specified remote process group URL is invalid because it is not http or https: " + requestRemoteProcessGroup.getTargetUri()); + } + + // normalize the uri to the other controller + String controllerUri = uri.toString(); + if (controllerUri.endsWith("/")) { + controllerUri = StringUtils.substringBeforeLast(controllerUri, "/"); + } + + // update with the normalized uri + requestRemoteProcessGroup.setTargetUri(controllerUri); + } + + // update the specified remote process group + final UpdateResult<RemoteProcessGroupEntity> updateResult = serviceFacade.updateRemoteProcessGroup(revision, requestRemoteProcessGroup); + + final RemoteProcessGroupEntity entity = updateResult.getResult(); + populateRemainingRemoteProcessGroupEntityContent(entity); + + if (updateResult.isNew()) { + return clusterContext(generateCreatedResponse(URI.create(entity.getComponent().getUri()), entity)).build(); + } else { + return clusterContext(generateOkResponse(entity)).build(); + } } - - // update with the normalized uri - requestRemoteProcessGroup.setTargetUri(controllerUri); - } - - // update the specified remote process group - final UpdateResult<RemoteProcessGroupEntity> updateResult = serviceFacade.updateRemoteProcessGroup(revision, requestRemoteProcessGroup); - - final RemoteProcessGroupEntity entity = updateResult.getResult(); - populateRemainingRemoteProcessGroupEntityContent(entity); - - if (updateResult.isNew()) { - return clusterContext(generateCreatedResponse(URI.create(entity.getComponent().getUri()), entity)).build(); - } else { - return clusterContext(generateOkResponse(entity)).build(); - } + ); } // setters @@ -559,4 +555,7 @@ public class RemoteProcessGroupResource extends ApplicationResource { this.properties = properties; } + public void setAuthorizer(Authorizer authorizer) { + this.authorizer = authorizer; + } }
http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java index bd17643..9a2dc90 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java @@ -23,6 +23,9 @@ import com.wordnik.swagger.annotations.ApiResponse; import com.wordnik.swagger.annotations.ApiResponses; import com.wordnik.swagger.annotations.Authorization; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.RequestAction; +import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.cluster.manager.impl.WebClusterManager; import org.apache.nifi.ui.extension.UiExtension; import org.apache.nifi.ui.extension.UiExtensionMapping; @@ -73,6 +76,7 @@ public class ReportingTaskResource extends ApplicationResource { private NiFiServiceFacade serviceFacade; private WebClusterManager clusterManager; private NiFiProperties properties; + private Authorizer authorizer; @Context private ServletContext servletContext; @@ -142,9 +146,6 @@ public class ReportingTaskResource extends ApplicationResource { /** * Retrieves the specified reporting task. * - * @param clientId Optional client id. If the client id is not specified, a - * new one will be generated. This value (whether specified or generated) is - * included in the response. * @param id The id of the reporting task to retrieve * @return A reportingTaskEntity. */ @@ -173,21 +174,22 @@ public class ReportingTaskResource extends ApplicationResource { ) public Response getReportingTask( @ApiParam( - value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @ApiParam( value = "The reporting task id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable reportingTask = lookup.getRemoteProcessGroup(id); + reportingTask.authorize(authorizer, RequestAction.READ); + }); + // get the reporting task final ReportingTaskEntity reportingTask = serviceFacade.getReportingTask(id); populateRemainingReportingTaskEntityContent(reportingTask); @@ -198,9 +200,6 @@ public class ReportingTaskResource extends ApplicationResource { /** * Returns the descriptor for the specified property. * - * @param clientId Optional client id. If the client id is not specified, a - * new one will be generated. This value (whether specified or generated) is - * included in the response. * @param id The id of the reporting task. * @param propertyName The property * @return a propertyDescriptorEntity @@ -230,20 +229,15 @@ public class ReportingTaskResource extends ApplicationResource { ) public Response getPropertyDescriptor( @ApiParam( - value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @ApiParam( value = "The reporting task id.", required = true ) - @PathParam("id") String id, + @PathParam("id") final String id, @ApiParam( value = "The property name.", required = true ) - @QueryParam("propertyName") String propertyName) { + @QueryParam("propertyName") final String propertyName) { // ensure the property name is specified if (propertyName == null) { @@ -255,6 +249,12 @@ public class ReportingTaskResource extends ApplicationResource { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable reportingTask = lookup.getRemoteProcessGroup(id); + reportingTask.authorize(authorizer, RequestAction.READ); + }); + // get the property descriptor final PropertyDescriptorDTO descriptor = serviceFacade.getReportingTaskPropertyDescriptor(id, propertyName); @@ -298,13 +298,19 @@ public class ReportingTaskResource extends ApplicationResource { value = "The reporting task id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable reportingTask = lookup.getRemoteProcessGroup(id); + reportingTask.authorize(authorizer, RequestAction.WRITE); + }); + // get the component state final ComponentStateDTO state = serviceFacade.getReportingTaskState(id); @@ -345,16 +351,16 @@ public class ReportingTaskResource extends ApplicationResource { } ) public Response clearState( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The revision used to verify the client is working with the latest version of the flow.", required = true - ) ComponentStateEntity revisionEntity, + ) final ComponentStateEntity revisionEntity, @ApiParam( value = "The reporting task id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { // replicate if cluster manager if (properties.isClusterManager()) { @@ -363,6 +369,11 @@ public class ReportingTaskResource extends ApplicationResource { // handle expects request (usually from the cluster manager) if (isValidationPhase(httpServletRequest)) { + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable reportingTask = lookup.getRemoteProcessGroup(id); + reportingTask.authorize(authorizer, RequestAction.WRITE); + }); serviceFacade.verifyCanClearReportingTaskState(id); return generateContinueResponse().build(); } @@ -407,16 +418,16 @@ public class ReportingTaskResource extends ApplicationResource { } ) public Response updateReportingTask( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The reporting task id.", required = true ) - @PathParam("id") String id, + @PathParam("id") final String id, @ApiParam( value = "The reporting task configuration details.", required = true - ) ReportingTaskEntity reportingTaskEntity) { + ) final ReportingTaskEntity reportingTaskEntity) { if (reportingTaskEntity == null || reportingTaskEntity.getComponent() == null) { throw new IllegalArgumentException("Reporting task details must be specified."); @@ -440,27 +451,29 @@ public class ReportingTaskResource extends ApplicationResource { // handle expects request (usually from the cluster manager) final Revision revision = getRevision(reportingTaskEntity, id); - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - serviceFacade.claimRevision(revision); - } - if (validationPhase) { - serviceFacade.verifyUpdateReportingTask(requestReportingTaskDTO); - return generateContinueResponse().build(); - } - - // update the reporting task - final UpdateResult<ReportingTaskEntity> controllerResponse = serviceFacade.updateReportingTask(revision, requestReportingTaskDTO); - - // get the results - final ReportingTaskEntity entity = controllerResponse.getResult(); - populateRemainingReportingTaskEntityContent(entity); - - if (controllerResponse.isNew()) { - return clusterContext(generateCreatedResponse(URI.create(entity.getComponent().getUri()), entity)).build(); - } else { - return clusterContext(generateOkResponse(entity)).build(); - } + return withWriteLock( + serviceFacade, + revision, + lookup -> { + final Authorizable reportingTask = lookup.getRemoteProcessGroup(id); + reportingTask.authorize(authorizer, RequestAction.WRITE); + }, + () -> serviceFacade.verifyUpdateReportingTask(requestReportingTaskDTO), + () -> { + // update the reporting task + final UpdateResult<ReportingTaskEntity> controllerResponse = serviceFacade.updateReportingTask(revision, requestReportingTaskDTO); + + // get the results + final ReportingTaskEntity entity = controllerResponse.getResult(); + populateRemainingReportingTaskEntityContent(entity); + + if (controllerResponse.isNew()) { + return clusterContext(generateCreatedResponse(URI.create(entity.getComponent().getUri()), entity)).build(); + } else { + return clusterContext(generateOkResponse(entity)).build(); + } + } + ); } /** @@ -521,18 +534,20 @@ public class ReportingTaskResource extends ApplicationResource { // handle expects request (usually from the cluster manager) final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - serviceFacade.claimRevision(revision); - } - if (validationPhase) { - serviceFacade.verifyDeleteReportingTask(id); - return generateContinueResponse().build(); - } - - // delete the specified reporting task - final ReportingTaskEntity entity = serviceFacade.deleteReportingTask(revision, id); - return clusterContext(generateOkResponse(entity)).build(); + return withWriteLock( + serviceFacade, + revision, + lookup -> { + final Authorizable reportingTask = lookup.getRemoteProcessGroup(id); + reportingTask.authorize(authorizer, RequestAction.WRITE); + }, + () -> serviceFacade.verifyDeleteReportingTask(id), + () -> { + // delete the specified reporting task + final ReportingTaskEntity entity = serviceFacade.deleteReportingTask(revision, id); + return clusterContext(generateOkResponse(entity)).build(); + } + ); } // setters @@ -547,4 +562,8 @@ public class ReportingTaskResource extends ApplicationResource { public void setProperties(NiFiProperties properties) { this.properties = properties; } + + public void setAuthorizer(Authorizer authorizer) { + this.authorizer = authorizer; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ResourceResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ResourceResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ResourceResource.java index b30d084..5260c5f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ResourceResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ResourceResource.java @@ -21,6 +21,16 @@ import com.wordnik.swagger.annotations.ApiOperation; import com.wordnik.swagger.annotations.ApiResponse; import com.wordnik.swagger.annotations.ApiResponses; import com.wordnik.swagger.annotations.Authorization; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.authorization.AccessDeniedException; +import org.apache.nifi.authorization.AuthorizationRequest; +import org.apache.nifi.authorization.AuthorizationResult; +import org.apache.nifi.authorization.AuthorizationResult.Result; +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.RequestAction; +import org.apache.nifi.authorization.resource.ResourceFactory; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.cluster.manager.impl.WebClusterManager; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.NiFiServiceFacade; @@ -49,6 +59,25 @@ public class ResourceResource extends ApplicationResource { private NiFiServiceFacade serviceFacade; private WebClusterManager clusterManager; private NiFiProperties properties; + private Authorizer authorizer; + + private void authorizeResource() { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + final AuthorizationRequest request = new AuthorizationRequest.Builder() + .resource(ResourceFactory.getResourceResource()) + .identity(user.getIdentity()) + .anonymous(user.isAnonymous()) + .accessAttempt(true) + .action(RequestAction.READ) + .build(); + + final AuthorizationResult result = authorizer.authorize(request); + if (!Result.Approved.equals(result.getResult())) { + final String message = StringUtils.isNotBlank(result.getExplanation()) ? result.getExplanation() : "Access is denied"; + throw new AccessDeniedException(message); + } + } /** * Gets the available resources that support access/authorization policies. @@ -75,6 +104,8 @@ public class ResourceResource extends ApplicationResource { ) public Response getResources() { + authorizeResource(); + // replicate if the cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); @@ -103,4 +134,8 @@ public class ResourceResource extends ApplicationResource { public void setProperties(NiFiProperties properties) { this.properties = properties; } + + public void setAuthorizer(Authorizer authorizer) { + this.authorizer = authorizer; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SnippetResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SnippetResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SnippetResource.java new file mode 100644 index 0000000..fbbebe4 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SnippetResource.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api; + +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.RequestAction; +import org.apache.nifi.cluster.manager.impl.WebClusterManager; +import org.apache.nifi.controller.Snippet; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.NiFiServiceFacade; +import org.apache.nifi.web.Revision; +import org.apache.nifi.web.api.dto.SnippetDTO; +import org.apache.nifi.web.api.entity.SnippetEntity; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.net.URI; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * RESTful endpoint for querying dataflow snippets. + */ +@Path("/snippets") +@Api( + value = "/snippets", + description = "Endpoint for accessing dataflow snippets." +) +public class SnippetResource extends ApplicationResource { + + private NiFiServiceFacade serviceFacade; + private WebClusterManager clusterManager; + private NiFiProperties properties; + private Authorizer authorizer; + + /** + * Populate the uri's for the specified snippet. + * + * @param entity processors + * @return dtos + */ + private SnippetEntity populateRemainingSnippetEntityContent(SnippetEntity entity) { + if (entity.getSnippet() != null) { + populateRemainingSnippetContent(entity.getSnippet()); + } + return entity; + } + + /** + * Populates the uri for the specified snippet. + */ + private SnippetDTO populateRemainingSnippetContent(SnippetDTO snippet) { + String snippetGroupId = snippet.getParentGroupId(); + + // populate the snippet href + snippet.setUri(generateResourceUri("process-groups", snippetGroupId, "snippets", snippet.getId())); + + return snippet; + } + + // -------- + // snippets + // -------- + + /** + * Creates a snippet based off the specified configuration. + * + * @param httpServletRequest request + * @param snippetEntity A snippetEntity + * @return A snippetEntity + */ + @POST + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + // TODO - @PreAuthorize("hasRole('ROLE_DFM')") + @ApiOperation( + value = "Creates a snippet", + response = SnippetEntity.class, + authorizations = { + @Authorization(value = "Read Only", type = "ROLE_MONITOR"), + @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"), + @Authorization(value = "Administrator", type = "ROLE_ADMIN") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response createSnippet( + @Context HttpServletRequest httpServletRequest, + @ApiParam( + value = "The snippet configuration details.", + required = true + ) + final SnippetEntity snippetEntity) { + + if (snippetEntity == null || snippetEntity.getSnippet() == null) { + throw new IllegalArgumentException("Snippet details must be specified."); + } + + if (snippetEntity.getSnippet().getId() != null) { + throw new IllegalArgumentException("Snippet ID cannot be specified."); + } + + if (properties.isClusterManager()) { + return clusterManager.applyRequest(HttpMethod.POST, getAbsolutePath(), snippetEntity, getHeaders()).getResponse(); + } + + // handle expects request (usually from the cluster manager) + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final SnippetDTO snippet = snippetEntity.getSnippet(); + authorizeSnippet(snippet, authorizer, lookup, RequestAction.READ); + }); + } + if (validationPhase) { + return generateContinueResponse().build(); + } + + // set the processor id as appropriate + snippetEntity.getSnippet().setId(generateUuid()); + + // create the snippet + final SnippetEntity entity = serviceFacade.createSnippet(snippetEntity.getSnippet()); + populateRemainingSnippetEntityContent(entity); + + // build the response + return clusterContext(generateCreatedResponse(URI.create(entity.getSnippet().getUri()), entity)).build(); + } + + /** + * Updates the specified snippet. The contents of the snippet (component + * ids) cannot be updated once the snippet is created. + * + * @param httpServletRequest request + * @param snippetId The id of the snippet. + * @param snippetEntity A snippetEntity + * @return A snippetEntity + */ + @PUT + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @Path("{id}") + // TODO - @PreAuthorize("hasRole('ROLE_DFM')") + @ApiOperation( + value = "Updates a snippet", + response = SnippetEntity.class, + authorizations = { + @Authorization(value = "Data Flow Manager", type = "ROLE_DFM") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response updateSnippet( + @Context HttpServletRequest httpServletRequest, + @ApiParam( + value = "The snippet id.", + required = true + ) + @PathParam("id") String snippetId, + @ApiParam( + value = "The snippet configuration details.", + required = true + ) final SnippetEntity snippetEntity) { + + if (snippetEntity == null || snippetEntity.getSnippet() == null) { + throw new IllegalArgumentException("Snippet details must be specified."); + } + + // ensure the ids are the same + final SnippetDTO requestSnippetDTO = snippetEntity.getSnippet(); + if (!snippetId.equals(requestSnippetDTO.getId())) { + throw new IllegalArgumentException(String.format("The snippet id (%s) in the request body does not equal the " + + "snippet id of the requested resource (%s).", requestSnippetDTO.getId(), snippetId)); + } + + // replicate if cluster manager + if (properties.isClusterManager()) { + return clusterManager.applyRequest(HttpMethod.PUT, getAbsolutePath(), snippetEntity, getHeaders()).getResponse(); + } + + // get the revision from this snippet + final Set<Revision> revisions = serviceFacade.getRevisionsFromSnippet(snippetId); + return withWriteLock( + serviceFacade, + revisions, + lookup -> { + // ensure write access to the target process group + if (requestSnippetDTO.getParentGroupId() != null) { + lookup.getProcessGroup(requestSnippetDTO.getParentGroupId()).authorize(authorizer, RequestAction.WRITE); + } + + // ensure read permission to every component in the snippet + final Snippet snippet = lookup.getSnippet(snippetId); + authorizeSnippet(snippet, authorizer, lookup, RequestAction.WRITE); + }, + () -> serviceFacade.verifyUpdateSnippet(requestSnippetDTO, revisions.stream().map(rev -> rev.getComponentId()).collect(Collectors.toSet())), + () -> { + // update the snippet + final SnippetEntity entity = serviceFacade.updateSnippet(revisions, snippetEntity.getSnippet()); + populateRemainingSnippetEntityContent(entity); + return clusterContext(generateOkResponse(entity)).build(); + } + ); + } + + /** + * Removes the specified snippet. + * + * @param httpServletRequest request + * @param snippetId The id of the snippet to remove. + * @return A entity containing the client id and an updated revision. + */ + @DELETE + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("{id}") + // TODO - @PreAuthorize("hasRole('ROLE_DFM')") + @ApiOperation( + value = "Deletes the components in a snippet and drops the snippet", + response = SnippetEntity.class, + authorizations = { + @Authorization(value = "Data Flow Manager", type = "ROLE_DFM") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response deleteSnippet( + @Context final HttpServletRequest httpServletRequest, + @ApiParam( + value = "The snippet id.", + required = true + ) + @PathParam("id") final String snippetId) { + + // replicate if cluster manager + if (properties.isClusterManager()) { + return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + } + + // get the revision from this snippet + final Set<Revision> revisions = serviceFacade.getRevisionsFromSnippet(snippetId); + return withWriteLock( + serviceFacade, + revisions, + lookup -> { + // ensure read permission to every component in the snippet + final Snippet snippet = lookup.getSnippet(snippetId); + authorizeSnippet(snippet, authorizer, lookup, RequestAction.WRITE); + }, + () -> serviceFacade.verifyDeleteSnippet(snippetId, revisions.stream().map(rev -> rev.getComponentId()).collect(Collectors.toSet())), + () -> { + // delete the specified snippet + final SnippetEntity snippetEntity = serviceFacade.deleteSnippet(revisions, snippetId); + return clusterContext(generateOkResponse(snippetEntity)).build(); + } + ); + } + + /* setters */ + public void setServiceFacade(NiFiServiceFacade serviceFacade) { + this.serviceFacade = serviceFacade; + } + + public void setClusterManager(WebClusterManager clusterManager) { + this.clusterManager = clusterManager; + } + + public void setProperties(NiFiProperties properties) { + this.properties = properties; + } + + public void setAuthorizer(Authorizer authorizer) { + this.authorizer = authorizer; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java index b0383b0..c53c6c0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java @@ -22,7 +22,16 @@ import com.wordnik.swagger.annotations.ApiParam; import com.wordnik.swagger.annotations.ApiResponse; import com.wordnik.swagger.annotations.ApiResponses; import com.wordnik.swagger.annotations.Authorization; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.authorization.AccessDeniedException; +import org.apache.nifi.authorization.AuthorizationRequest; +import org.apache.nifi.authorization.AuthorizationResult; +import org.apache.nifi.authorization.AuthorizationResult.Result; import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.RequestAction; +import org.apache.nifi.authorization.resource.ResourceFactory; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.impl.WebClusterManager; @@ -60,6 +69,24 @@ public class SystemDiagnosticsResource extends ApplicationResource { private NiFiProperties properties; private Authorizer authorizer; + private void authorizeSystem() { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + final AuthorizationRequest request = new AuthorizationRequest.Builder() + .resource(ResourceFactory.getSystemResource()) + .identity(user.getIdentity()) + .anonymous(user.isAnonymous()) + .accessAttempt(true) + .action(RequestAction.READ) + .build(); + + final AuthorizationResult result = authorizer.authorize(request); + if (!Result.Approved.equals(result.getResult())) { + final String message = StringUtils.isNotBlank(result.getExplanation()) ? result.getExplanation() : "Access is denied"; + throw new AccessDeniedException(message); + } + } + /** * Gets the system diagnostics for this NiFi instance. * @@ -88,12 +115,14 @@ public class SystemDiagnosticsResource extends ApplicationResource { value = "Whether or not to include the breakdown per node. Optional, defaults to false", required = false ) - @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise, + @QueryParam("nodewise") @DefaultValue(NODEWISE) final Boolean nodewise, @ApiParam( value = "The id of the node where to get the status.", required = false ) - @QueryParam("clusterNodeId") String clusterNodeId) { + @QueryParam("clusterNodeId") final String clusterNodeId) { + + authorizeSystem(); // ensure a valid request if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) { http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/TemplateResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/TemplateResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/TemplateResource.java index 7893980..c866381 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/TemplateResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/TemplateResource.java @@ -23,25 +23,25 @@ import com.wordnik.swagger.annotations.ApiResponse; import com.wordnik.swagger.annotations.ApiResponses; import com.wordnik.swagger.annotations.Authorization; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.RequestAction; +import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.cluster.manager.impl.WebClusterManager; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.api.dto.TemplateDTO; import org.apache.nifi.web.api.entity.TemplateEntity; -import org.apache.nifi.web.api.request.ClientIdParameter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.HttpMethod; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @@ -62,6 +62,7 @@ public class TemplateResource extends ApplicationResource { private NiFiServiceFacade serviceFacade; private WebClusterManager clusterManager; private NiFiProperties properties; + private Authorizer authorizer; /** * Populates the uri for the specified templates. @@ -119,13 +120,19 @@ public class TemplateResource extends ApplicationResource { value = "The template id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable template = lookup.getTemplate(id); + template.authorize(authorizer, RequestAction.READ); + }); + // get the template final TemplateDTO template = serviceFacade.exportTemplate(id); @@ -148,9 +155,6 @@ public class TemplateResource extends ApplicationResource { * Removes the specified template. * * @param httpServletRequest request - * @param clientId Optional client id. If the client id is not specified, a - * new one will be generated. This value (whether specified or generated) is - * included in the response. * @param id The id of the template to remove. * @return A templateEntity. */ @@ -176,17 +180,12 @@ public class TemplateResource extends ApplicationResource { } ) public Response removeTemplate( - @Context HttpServletRequest httpServletRequest, - @ApiParam( - value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The template id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { // replicate if cluster manager if (properties.isClusterManager()) { @@ -194,9 +193,13 @@ public class TemplateResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - // TODO: NEED VERSION FOR REVISION! final boolean validationPhase = isValidationPhase(httpServletRequest); if (validationPhase) { + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable template = lookup.getTemplate(id); + template.authorize(authorizer, RequestAction.WRITE); + }); return generateContinueResponse().build(); } @@ -221,4 +224,8 @@ public class TemplateResource extends ApplicationResource { public void setProperties(NiFiProperties properties) { this.properties = properties; } + + public void setAuthorizer(Authorizer authorizer) { + this.authorizer = authorizer; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 97c0c13..8c8b121 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -133,6 +133,7 @@ import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; import org.apache.nifi.web.api.dto.status.ProcessorStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO; +import org.apache.nifi.web.api.entity.FlowBreadcrumbEntity; import org.apache.nifi.web.revision.RevisionManager; import javax.ws.rs.WebApplicationException; @@ -681,7 +682,6 @@ public final class DtoFactory { final SnippetDTO dto = new SnippetDTO(); dto.setId(snippet.getId()); dto.setParentGroupId(snippet.getParentGroupId()); - dto.setLinked(snippet.isLinked()); // populate the snippet contents ids dto.setConnections(mapRevisionToDto(snippet.getConnections())); @@ -1398,27 +1398,51 @@ public final class DtoFactory { } /** - * Creates a FlowBreadcrumbDTO from the specified parent ProcessGroup. + * Creates a FlowBreadcrumbEntity from the specified parent ProcessGroup. * - * @param parentGroup group + * @param group group * @return dto */ - private FlowBreadcrumbDTO createBreadcrumbDto(final ProcessGroup parentGroup) { - if (parentGroup == null) { + private FlowBreadcrumbEntity createBreadcrumbEntity(final ProcessGroup group) { + if (group == null) { return null; } - final FlowBreadcrumbDTO dto = new FlowBreadcrumbDTO(); - dto.setId(parentGroup.getIdentifier()); - dto.setName(parentGroup.getName()); + final FlowBreadcrumbDTO dto = createBreadcrumbDto(group); + final AccessPolicyDTO accessPolicy = createAccessPolicyDto(group); + final FlowBreadcrumbEntity entity = entityFactory.createFlowBreadcrumbEntity(dto, accessPolicy); + + if (group.getParent() != null) { + entity.setParentBreadcrumb(createBreadcrumbEntity(group.getParent())); + } + + return entity; + } - if (parentGroup.getParent() != null) { - dto.setParentBreadcrumb(createBreadcrumbDto(parentGroup.getParent())); + /** + * Creates a FlowBreadcrumbDTO from the specified parent ProcessGroup. + * + * @param group group + * @return dto + */ + private FlowBreadcrumbDTO createBreadcrumbDto(final ProcessGroup group) { + if (group == null) { + return null; } + final FlowBreadcrumbDTO dto = new FlowBreadcrumbDTO(); + dto.setId(group.getIdentifier()); + dto.setName(group.getName()); + return dto; } + /** + * Creates the AccessPolicyDTO based on the specified Authorizable. + * + * @param authorizable authorizable + * @return dto + */ public AccessPolicyDTO createAccessPolicyDto(final Authorizable authorizable) { final AccessPolicyDTO dto = new AccessPolicyDTO(); dto.setCanRead(authorizable.isAuthorized(authorizer, RequestAction.READ)); @@ -1440,7 +1464,7 @@ public final class DtoFactory { final ProcessGroupFlowDTO dto = new ProcessGroupFlowDTO(); dto.setId(group.getIdentifier()); dto.setLastRefreshed(new Date()); - dto.setBreadcrumb(createBreadcrumbDto(group)); + dto.setBreadcrumb(createBreadcrumbEntity(group)); dto.setFlow(createFlowDto(group, groupStatus, revisionManager)); final ProcessGroup parent = group.getParent(); @@ -1468,11 +1492,6 @@ public final class DtoFactory { flow.getConnections().add(entityFactory.createConnectionEntity(connection, null, accessPolicy, status)); } - for (final ControllerServiceDTO controllerService : snippet.getControllerServices()) { - final RevisionDTO revision = createRevisionDTO(revisionManager.getRevision(controllerService.getId())); - flow.getControllerServices().add(entityFactory.createControllerServiceEntity(controllerService, revision, null)); - } - for (final FunnelDTO funnel : snippet.getFunnels()) { final RevisionDTO revision = createRevisionDTO(revisionManager.getRevision(funnel.getId())); final AccessPolicyDTO accessPolicy = createAccessPolicyDto(group.getFunnel(funnel.getId())); @@ -1627,13 +1646,6 @@ public final class DtoFactory { dto.getOutputPorts().add(entityFactory.createPortEntity(createPortDto(outputPort), revision, accessPolicy, status)); } - // TODO - controller services once they are accessible from the group - for (final ControllerServiceNode controllerService : group.getControllerServices(false)) { - final RevisionDTO revision = createRevisionDTO(revisionManager.getRevision(controllerService.getIdentifier())); - final AccessPolicyDTO accessPolicy = createAccessPolicyDto(controllerService); - dto.getControllerServices().add(entityFactory.createControllerServiceEntity(createControllerServiceDto(controllerService), revision, accessPolicy)); - } - return dto; } @@ -2522,7 +2534,6 @@ public final class DtoFactory { copy.setOutputPortCount(original.getOutputPortCount()); copy.setParentGroupId(original.getParentGroupId()); - copy.setRunning(original.isRunning()); copy.setRunningCount(original.getRunningCount()); copy.setStoppedCount(original.getStoppedCount()); copy.setDisabledCount(original.getDisabledCount()); http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java index 2566820..70155a6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.web.api.dto; +import org.apache.nifi.web.api.dto.flow.FlowBreadcrumbDTO; +import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO; import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO; import org.apache.nifi.web.api.dto.status.PortStatusDTO; import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; @@ -24,10 +26,12 @@ import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.entity.ConnectionEntity; import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity; +import org.apache.nifi.web.api.entity.FlowBreadcrumbEntity; import org.apache.nifi.web.api.entity.FunnelEntity; import org.apache.nifi.web.api.entity.LabelEntity; import org.apache.nifi.web.api.entity.PortEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; +import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity; @@ -36,6 +40,13 @@ import org.apache.nifi.web.api.entity.SnippetEntity; public final class EntityFactory { + public ProcessGroupFlowEntity createProcessGroupFlowEntity(final ProcessGroupFlowDTO dto, final AccessPolicyDTO accessPolicy) { + final ProcessGroupFlowEntity entity = new ProcessGroupFlowEntity(); + entity.setProcessGroupFlow(dto); + entity.setAccessPolicy(accessPolicy); + return entity; + } + public ProcessorEntity createProcessorEntity(final ProcessorDTO dto, final RevisionDTO revision, final AccessPolicyDTO accessPolicy, final ProcessorStatusDTO status) { final ProcessorEntity entity = new ProcessorEntity(); entity.setRevision(revision); @@ -178,17 +189,9 @@ public final class EntityFactory { return entity; } - public SnippetEntity createSnippetEntity(final SnippetDTO dto, final RevisionDTO revision, final AccessPolicyDTO accessPolicy) { + public SnippetEntity createSnippetEntity(final SnippetDTO dto) { final SnippetEntity entity = new SnippetEntity(); - entity.setRevision(revision); - if (dto != null) { - entity.setAccessPolicy(accessPolicy); - entity.setId(dto.getId()); - if (accessPolicy != null && accessPolicy.getCanRead()) { - entity.setSnippet(dto); - } - } - + entity.setSnippet(dto); return entity; } @@ -235,4 +238,16 @@ public final class EntityFactory { return entity; } + + public FlowBreadcrumbEntity createFlowBreadcrumbEntity(final FlowBreadcrumbDTO dto, final AccessPolicyDTO accessPolicy) { + final FlowBreadcrumbEntity entity = new FlowBreadcrumbEntity(); + if (dto != null) { + entity.setAccessPolicy(accessPolicy); + entity.setId(dto.getId()); + if (accessPolicy != null && accessPolicy.getCanRead()) { + entity.setBreadcrumb(dto); + } + } + return entity; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java index 29ca220..155b36e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.web.dao; +import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.web.api.dto.ProcessGroupDTO; @@ -58,9 +59,18 @@ public interface ProcessGroupDAO { /** * Verifies the specified process group can be modified. * - * @param processGroupDTO dto + * @param groupId id + * @param state scheduled state + */ + void verifyScheduleComponents(String groupId, ScheduledState state, Set<String> componentIds); + + /** + * Schedules the components in the specified process group. + * + * @param groupId id + * @param state scheduled state */ - void verifyUpdate(ProcessGroupDTO processGroupDTO); + void scheduleComponents(String groupId, ScheduledState state, Set<String> componentIds); /** * Updates the specified process group. http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/SnippetDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/SnippetDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/SnippetDAO.java index 7e21440..806747b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/SnippetDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/SnippetDAO.java @@ -51,6 +51,13 @@ public interface SnippetDAO { boolean hasSnippet(String snippetId); /** + * Drops the specified snippet. + * + * @param snippetId snippet id + */ + void dropSnippet(String snippetId); + + /** * Gets the specified snippet. * * @param snippetId The snippet id @@ -59,31 +66,31 @@ public interface SnippetDAO { Snippet getSnippet(String snippetId); /** - * Verifies the specified snippet can be updated. + * Verifies the components of the specified snippet can be updated. * * @param snippetDTO snippet */ - void verifyUpdate(SnippetDTO snippetDTO); + void verifyUpdateSnippetComponent(SnippetDTO snippetDTO); /** - * Updates the specified snippet. + * Updates the components in the specified snippet. * * @param snippetDTO snippet * @return The snippet */ - Snippet updateSnippet(SnippetDTO snippetDTO); + Snippet updateSnippetComponents(SnippetDTO snippetDTO); /** - * Verifies the specified snippet can be removed. + * Verifies the components of the specified snippet can be removed. * * @param snippetId snippet id */ - void verifyDelete(String snippetId); + void verifyDeleteSnippetComponents(String snippetId); /** - * Deletes the specified snippet. + * Deletes the components in the specified snippet. * * @param snippetId The snippet id */ - void deleteSnippet(String snippetId); + void deleteSnippetComponents(String snippetId); } http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java index 5b4570b..3494f93 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java @@ -16,12 +16,19 @@ */ package org.apache.nifi.web.dao.impl; +import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.connectable.ConnectableType; +import org.apache.nifi.connectable.Port; import org.apache.nifi.connectable.Position; import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.ProcessorNode; +import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.web.ResourceNotFoundException; import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.dao.ProcessGroupDAO; +import java.util.HashSet; import java.util.Set; public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGroupDAO { @@ -68,15 +75,51 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou } @Override - public void verifyUpdate(ProcessGroupDTO processGroupDTO) { - final ProcessGroup group = locateProcessGroup(flowController, processGroupDTO.getId()); + public void verifyScheduleComponents(final String groupId, final ScheduledState state,final Set<String> componentIds) { + final ProcessGroup group = locateProcessGroup(flowController, groupId); + + final Set<Connectable> connectables = new HashSet<>(componentIds.size()); + for (final String componentId : componentIds) { + final Connectable connectable = group.findConnectable(componentId); + if (connectable == null) { + throw new ResourceNotFoundException("Unable to find component with id " + componentId); + } + + connectables.add(connectable); + } - // determine if any action is required - if (isNotNull(processGroupDTO.isRunning())) { - if (processGroupDTO.isRunning()) { - group.verifyCanStart(); + // verify as appropriate + connectables.forEach(connectable -> { + if (ScheduledState.RUNNING.equals(state)) { + group.verifyCanStart(connectable); } else { - group.verifyCanStop(); + group.verifyCanStop(connectable); + } + }); + } + + @Override + public void scheduleComponents(final String groupId, final ScheduledState state, final Set<String> componentIds) { + final ProcessGroup group = locateProcessGroup(flowController, groupId); + + for (final String componentId : componentIds) { + final Connectable connectable = group.findConnectable(componentId); + if (ScheduledState.RUNNING.equals(state)) { + if (ConnectableType.PROCESSOR.equals(connectable.getConnectableType())) { + group.startProcessor((ProcessorNode) connectable); + } else if (ConnectableType.INPUT_PORT.equals(connectable.getConnectableType())) { + group.startInputPort((Port) connectable); + } else if (ConnectableType.OUTPUT_PORT.equals(connectable.getConnectableType())) { + group.startOutputPort((Port) connectable); + } + } else { + if (ConnectableType.PROCESSOR.equals(connectable.getConnectableType())) { + group.stopProcessor((ProcessorNode) connectable); + } else if (ConnectableType.INPUT_PORT.equals(connectable.getConnectableType())) { + group.stopInputPort((Port) connectable); + } else if (ConnectableType.OUTPUT_PORT.equals(connectable.getConnectableType())) { + group.stopOutputPort((Port) connectable); + } } } } @@ -98,15 +141,6 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou group.setComments(comments); } - // determine if any action is required - if (isNotNull(processGroupDTO.isRunning())) { - if (processGroupDTO.isRunning()) { - group.startProcessing(); - } else { - group.stopProcessing(); - } - } - return group; }