http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index 097f214..65c8045 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -16,47 +16,28 @@ */ package org.apache.nifi.web.api; -import java.io.InputStream; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.xml.bind.JAXBContext; -import javax.xml.bind.JAXBElement; -import javax.xml.bind.JAXBException; -import javax.xml.bind.Unmarshaller; -import javax.xml.transform.stream.StreamSource; - +import com.sun.jersey.api.core.ResourceContext; +import com.sun.jersey.multipart.FormDataParam; +import com.wordnik.swagger.annotations.Api; +import 
com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.RequestAction; +import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.cluster.manager.impl.WebClusterManager; +import org.apache.nifi.controller.Snippet; import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.AuthorizableLookup; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.Revision; import org.apache.nifi.web.UpdateResult; import org.apache.nifi.web.api.dto.ConnectionDTO; import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; -import org.apache.nifi.web.api.dto.SnippetDTO; import org.apache.nifi.web.api.dto.TemplateDTO; import org.apache.nifi.web.api.dto.flow.FlowDTO; import org.apache.nifi.web.api.entity.ConnectionEntity; @@ -80,7 +61,6 @@ import org.apache.nifi.web.api.entity.ProcessorEntity; import org.apache.nifi.web.api.entity.ProcessorsEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity; -import org.apache.nifi.web.api.entity.SnippetEntity; import org.apache.nifi.web.api.entity.TemplateEntity; import org.apache.nifi.web.api.entity.TemplatesEntity; import org.apache.nifi.web.api.request.ClientIdParameter; @@ -88,14 +68,34 @@ import org.apache.nifi.web.api.request.LongParameter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.sun.jersey.api.core.ResourceContext; -import com.sun.jersey.multipart.FormDataParam; -import com.wordnik.swagger.annotations.Api; -import 
com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBElement; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Unmarshaller; +import javax.xml.transform.stream.StreamSource; +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; /** * RESTful endpoint for managing a Group. @@ -117,6 +117,7 @@ public class ProcessGroupResource extends ApplicationResource { private NiFiServiceFacade serviceFacade; private WebClusterManager clusterManager; private NiFiProperties properties; + private Authorizer authorizer; private ProcessorResource processorResource; private InputPortResource inputPortResource; @@ -199,31 +200,6 @@ public class ProcessGroupResource extends ApplicationResource { } /** - * Populate the uri's for the specified snippet. - * - * @param entity processors - * @return dtos - */ - private SnippetEntity populateRemainingSnippetEntityContent(SnippetEntity entity) { - if (entity.getSnippet() != null) { - populateRemainingSnippetContent(entity.getSnippet()); - } - return entity; - } - - /** - * Populates the uri for the specified snippet. 
- */ - private SnippetDTO populateRemainingSnippetContent(SnippetDTO snippet) { - String snippetGroupId = snippet.getParentGroupId(); - - // populate the snippet href - snippet.setUri(generateResourceUri("process-groups", snippetGroupId, "snippets", snippet.getId())); - - return snippet; - } - - /** * Retrieves the contents of the specified group. * * @param groupId The id of the process group. @@ -257,13 +233,19 @@ public class ProcessGroupResource extends ApplicationResource { value = "The process group id.", required = false ) - @PathParam("id") String groupId) { + @PathParam("id") final String groupId) { // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId); + processGroup.authorize(authorizer, RequestAction.READ); + }); + // get this process group contents final ProcessGroupEntity entity = serviceFacade.getProcessGroup(groupId); populateRemainingProcessGroupEntityContent(entity); @@ -272,7 +254,6 @@ public class ProcessGroupResource extends ApplicationResource { entity.getComponent().setContents(null); } - return clusterContext(generateOkResponse(entity)).build(); } @@ -306,17 +287,16 @@ public class ProcessGroupResource extends ApplicationResource { } ) public Response updateProcessGroup( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The process group id.", required = true ) - @PathParam("id") String id, + @PathParam("id") final String id, @ApiParam( value = "The process group configuration details.", required = true - ) - ProcessGroupEntity processGroupEntity) { + ) final ProcessGroupEntity processGroupEntity) { if (processGroupEntity == null || processGroupEntity.getComponent() == null) { throw new 
IllegalArgumentException("Process group details must be specified."); @@ -339,25 +319,27 @@ public class ProcessGroupResource extends ApplicationResource { // handle expects request (usually from the cluster manager) final Revision revision = getRevision(processGroupEntity, id); - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - serviceFacade.claimRevision(revision); - } - if (validationPhase) { - serviceFacade.verifyUpdateProcessGroup(requestProcessGroupDTO); - return generateContinueResponse().build(); - } - - // update the process group - final UpdateResult<ProcessGroupEntity> updateResult = serviceFacade.updateProcessGroup(revision, requestProcessGroupDTO); - final ProcessGroupEntity entity = updateResult.getResult(); - populateRemainingProcessGroupEntityContent(entity); - - if (updateResult.isNew()) { - return clusterContext(generateCreatedResponse(URI.create(entity.getComponent().getUri()), entity)).build(); - } else { - return clusterContext(generateOkResponse(entity)).build(); - } + return withWriteLock( + serviceFacade, + revision, + lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(id); + processGroup.authorize(authorizer, RequestAction.WRITE); + }, + null, + () -> { + // update the process group + final UpdateResult<ProcessGroupEntity> updateResult = serviceFacade.updateProcessGroup(revision, requestProcessGroupDTO); + final ProcessGroupEntity entity = updateResult.getResult(); + populateRemainingProcessGroupEntityContent(entity); + + if (updateResult.isNew()) { + return clusterContext(generateCreatedResponse(URI.create(entity.getComponent().getUri()), entity)).build(); + } else { + return clusterContext(generateOkResponse(entity)).build(); + } + } + ); } /** @@ -390,23 +372,23 @@ public class ProcessGroupResource extends ApplicationResource { @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to 
process it. Retrying the same request later may be successful.") } ) - public Response removeProcessGroupReference( - @Context HttpServletRequest httpServletRequest, + public Response removeProcessGroup( + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The revision is used to verify the client is working with the latest version of the flow.", required = false ) - @QueryParam(VERSION) LongParameter version, + @QueryParam(VERSION) final LongParameter version, @ApiParam( value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", required = false ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) final ClientIdParameter clientId, @ApiParam( value = "The process group id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { // replicate if cluster manager if (properties.isClusterManager()) { @@ -415,20 +397,22 @@ public class ProcessGroupResource extends ApplicationResource { // handle expects request (usually from the cluster manager) final Revision revision = new Revision(version == null ? 
null : version.getLong(), clientId.getClientId(), id); - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - serviceFacade.claimRevision(revision); - } - if (validationPhase) { - serviceFacade.verifyDeleteProcessGroup(id); - return generateContinueResponse().build(); - } - - // delete the process group - final ProcessGroupEntity entity = serviceFacade.deleteProcessGroup(revision, id); - - // create the response - return clusterContext(generateOkResponse(entity)).build(); + return withWriteLock( + serviceFacade, + revision, + lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(id); + processGroup.authorize(authorizer, RequestAction.WRITE); + }, + () -> serviceFacade.verifyDeleteProcessGroup(id), + () -> { + // delete the process group + final ProcessGroupEntity entity = serviceFacade.deleteProcessGroup(revision, id); + + // create the response + return clusterContext(generateOkResponse(entity)).build(); + } + ); } /** @@ -461,17 +445,16 @@ public class ProcessGroupResource extends ApplicationResource { } ) public Response createProcessGroup( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The process group id.", - required = false + required = true ) - @PathParam("id") String groupId, + @PathParam("id") final String groupId, @ApiParam( value = "The process group configuration details.", required = true - ) - ProcessGroupEntity processGroupEntity) { + ) final ProcessGroupEntity processGroupEntity) { if (processGroupEntity == null || processGroupEntity.getComponent() == null) { throw new IllegalArgumentException("Process group details must be specified."); @@ -492,8 +475,15 @@ public class ProcessGroupResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = 
httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); - if (expects != null) { + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId); + processGroup.authorize(authorizer, RequestAction.WRITE); + }); + } + if (validationPhase) { return generateContinueResponse().build(); } @@ -542,13 +532,19 @@ public class ProcessGroupResource extends ApplicationResource { value = "The process group id.", required = true ) - @PathParam("id") String groupId) { + @PathParam("id") final String groupId) { // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId); + processGroup.authorize(authorizer, RequestAction.READ); + }); + // get the process groups final Set<ProcessGroupEntity> entities = serviceFacade.getProcessGroups(groupId); @@ -601,17 +597,16 @@ public class ProcessGroupResource extends ApplicationResource { } ) public Response createProcessor( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The process group id.", required = true ) - @PathParam("id") String groupId, + @PathParam("id") final String groupId, @ApiParam( value = "The processor configuration details.", required = true - ) - ProcessorEntity processorEntity) { + ) final ProcessorEntity processorEntity) { if (processorEntity == null || processorEntity.getComponent() == null) { throw new IllegalArgumentException("Processor details must be specified."); @@ -636,8 +631,15 @@ public class ProcessGroupResource extends 
ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); - if (expects != null) { + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId); + processGroup.authorize(authorizer, RequestAction.WRITE); + }); + } + if (validationPhase) { return generateContinueResponse().build(); } @@ -687,13 +689,19 @@ public class ProcessGroupResource extends ApplicationResource { value = "The process group id.", required = true ) - @PathParam("id") String groupId) { + @PathParam("id") final String groupId) { // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId); + processGroup.authorize(authorizer, RequestAction.READ); + }); + // get the processors final Set<ProcessorEntity> processors = serviceFacade.getProcessors(groupId); @@ -739,16 +747,16 @@ public class ProcessGroupResource extends ApplicationResource { } ) public Response createInputPort( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The process group id.", required = true ) - @PathParam("id") String groupId, + @PathParam("id") final String groupId, @ApiParam( value = "The input port configuration details.", required = true - ) PortEntity portEntity) { + ) final PortEntity portEntity) { if (portEntity == null || portEntity.getComponent() == null) { throw new IllegalArgumentException("Port details must be specified."); @@ -769,8 
+777,15 @@ public class ProcessGroupResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); - if (expects != null) { + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId); + processGroup.authorize(authorizer, RequestAction.WRITE); + }); + } + if (validationPhase) { return generateContinueResponse().build(); } @@ -818,13 +833,19 @@ public class ProcessGroupResource extends ApplicationResource { value = "The process group id.", required = true ) - @PathParam("id") String groupId) { + @PathParam("id") final String groupId) { // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId); + processGroup.authorize(authorizer, RequestAction.READ); + }); + // get all the input ports final Set<PortEntity> inputPorts = serviceFacade.getInputPorts(groupId); @@ -869,16 +890,16 @@ public class ProcessGroupResource extends ApplicationResource { } ) public Response createOutputPort( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The process group id.", required = true ) - @PathParam("id") String groupId, + @PathParam("id") final String groupId, @ApiParam( value = "The output port configuration.", required = true - ) PortEntity portEntity) { + ) final PortEntity portEntity) { if (portEntity == null || portEntity.getComponent() == null) { throw new 
IllegalArgumentException("Port details must be specified."); @@ -899,8 +920,15 @@ public class ProcessGroupResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); - if (expects != null) { + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId); + processGroup.authorize(authorizer, RequestAction.WRITE); + }); + } + if (validationPhase) { return generateContinueResponse().build(); } @@ -948,13 +976,19 @@ public class ProcessGroupResource extends ApplicationResource { value = "The process group id.", required = true ) - @PathParam("id") String groupId) { + @PathParam("id") final String groupId) { // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId); + processGroup.authorize(authorizer, RequestAction.READ); + }); + // get all the output ports final Set<PortEntity> outputPorts = serviceFacade.getOutputPorts(groupId); @@ -1000,16 +1034,16 @@ public class ProcessGroupResource extends ApplicationResource { } ) public Response createFunnel( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The process group id.", required = true ) - @PathParam("id") String groupId, + @PathParam("id") final String groupId, @ApiParam( value = "The funnel configuration details.", required = true - ) FunnelEntity funnelEntity) { + ) final FunnelEntity funnelEntity) { if (funnelEntity 
== null || funnelEntity.getComponent() == null) { throw new IllegalArgumentException("Funnel details must be specified."); @@ -1030,8 +1064,15 @@ public class ProcessGroupResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); - if (expects != null) { + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId); + processGroup.authorize(authorizer, RequestAction.WRITE); + }); + } + if (validationPhase) { return generateContinueResponse().build(); } @@ -1079,13 +1120,19 @@ public class ProcessGroupResource extends ApplicationResource { value = "The process group id.", required = true ) - @PathParam("id") String groupId) { + @PathParam("id") final String groupId) { // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId); + processGroup.authorize(authorizer, RequestAction.READ); + }); + // get all the funnels final Set<FunnelEntity> funnels = serviceFacade.getFunnels(groupId); @@ -1131,16 +1178,16 @@ public class ProcessGroupResource extends ApplicationResource { } ) public Response createLabel( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The process group id.", required = true ) - @PathParam("id") String groupId, + @PathParam("id") final String groupId, @ApiParam( value = "The label configuration details.", required = true - ) LabelEntity labelEntity) { + ) 
final LabelEntity labelEntity) { if (labelEntity == null || labelEntity.getComponent() == null) { throw new IllegalArgumentException("Label details must be specified."); @@ -1161,8 +1208,15 @@ public class ProcessGroupResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); - if (expects != null) { + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId); + processGroup.authorize(authorizer, RequestAction.WRITE); + }); + } + if (validationPhase) { return generateContinueResponse().build(); } @@ -1210,13 +1264,19 @@ public class ProcessGroupResource extends ApplicationResource { value = "The process group id.", required = true ) - @PathParam("id") String groupId) { + @PathParam("id") final String groupId) { // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId); + processGroup.authorize(authorizer, RequestAction.READ); + }); + // get all the labels final Set<LabelEntity> labels = serviceFacade.getLabels(groupId); @@ -1262,16 +1322,16 @@ public class ProcessGroupResource extends ApplicationResource { } ) public Response createRemoteProcessGroup( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The process group id.", required = true ) - @PathParam("id") String groupId, + @PathParam("id") final String groupId, @ApiParam( value = "The remote process group 
configuration details.", required = true - ) RemoteProcessGroupEntity remoteProcessGroupEntity) { + ) final RemoteProcessGroupEntity remoteProcessGroupEntity) { if (remoteProcessGroupEntity == null || remoteProcessGroupEntity.getComponent() == null) { throw new IllegalArgumentException("Remote process group details must be specified."); @@ -1298,8 +1358,15 @@ public class ProcessGroupResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); - if (expects != null) { + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId); + processGroup.authorize(authorizer, RequestAction.WRITE); + }); + } + if (validationPhase) { return generateContinueResponse().build(); } @@ -1373,18 +1440,24 @@ public class ProcessGroupResource extends ApplicationResource { value = "Whether to include any encapulated ports or just details about the remote process group.", required = false ) - @QueryParam("verbose") @DefaultValue(VERBOSE) Boolean verbose, + @QueryParam("verbose") @DefaultValue(VERBOSE) final Boolean verbose, @ApiParam( value = "The process group id.", required = true ) - @PathParam("id") String groupId) { + @PathParam("id") final String groupId) { // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId); + processGroup.authorize(authorizer, RequestAction.READ); + }); + // get all the remote process groups final Set<RemoteProcessGroupEntity> 
remoteProcessGroups = serviceFacade.getRemoteProcessGroups(groupId); @@ -1439,16 +1512,16 @@ public class ProcessGroupResource extends ApplicationResource { } ) public Response createConnection( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The process group id.", required = true ) - @PathParam("id") String groupId, + @PathParam("id") final String groupId, @ApiParam( value = "The connection configuration details.", required = true - ) ConnectionEntity connectionEntity) { + ) final ConnectionEntity connectionEntity) { if (connectionEntity == null || connectionEntity.getComponent() == null) { throw new IllegalArgumentException("Connection details must be specified."); @@ -1472,8 +1545,15 @@ public class ProcessGroupResource extends ApplicationResource { final ConnectionDTO connection = connectionEntity.getComponent(); // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); - if (expects != null) { + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId); + processGroup.authorize(authorizer, RequestAction.WRITE); + }); + } + if (validationPhase) { serviceFacade.verifyCreateConnection(groupId, connection); return generateContinueResponse().build(); } @@ -1530,6 +1610,12 @@ public class ProcessGroupResource extends ApplicationResource { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId); + processGroup.authorize(authorizer, RequestAction.READ); + }); + // all of 
the relationships for the specified source processor Set<ConnectionEntity> connections = serviceFacade.getConnections(groupId); @@ -1541,320 +1627,6 @@ public class ProcessGroupResource extends ApplicationResource { return clusterContext(generateOkResponse(entity)).build(); } - // -------- - // snippets - // -------- - - /** - * Creates a snippet based off the specified configuration. - * - * @param httpServletRequest request - * @param snippetEntity A snippetEntity - * @return A snippetEntity - */ - @POST - @Consumes(MediaType.APPLICATION_JSON) - @Produces(MediaType.APPLICATION_JSON) - @Path("{id}/snippets") - // TODO - @PreAuthorize("hasRole('ROLE_DFM')") - @ApiOperation( - value = "Creates a snippet", - response = SnippetEntity.class, - authorizations = { - @Authorization(value = "Read Only", type = "ROLE_MONITOR"), - @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"), - @Authorization(value = "Administrator", type = "ROLE_ADMIN") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 404, message = "The specified resource could not be found."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. 
Retrying the same request later may be successful.") - } - ) - public Response createSnippet( - @Context HttpServletRequest httpServletRequest, - @ApiParam( - value = "The process group id.", - required = true - ) - @PathParam("id") String groupId, - @ApiParam( - value = "The snippet configuration details.", - required = true - ) - final SnippetEntity snippetEntity) { - - if (snippetEntity == null || snippetEntity.getSnippet() == null) { - throw new IllegalArgumentException("Snippet details must be specified."); - } - - if (snippetEntity.getSnippet().getId() != null) { - throw new IllegalArgumentException("Snippet ID cannot be specified."); - } - - // ensure the group id has been specified - if (snippetEntity.getSnippet().getParentGroupId() == null) { - throw new IllegalArgumentException("The group id must be specified when creating a snippet."); - } - - if (snippetEntity.getSnippet().getParentGroupId() != null && !groupId.equals(snippetEntity.getSnippet().getParentGroupId())) { - throw new IllegalArgumentException(String.format("If specified, the parent process group id %s must be the same as specified in the URI %s", - snippetEntity.getSnippet().getParentGroupId(), groupId)); - } - snippetEntity.getSnippet().setParentGroupId(groupId); - - if (properties.isClusterManager()) { - return clusterManager.applyRequest(HttpMethod.POST, getAbsolutePath(), snippetEntity, getHeaders()).getResponse(); - } - - // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); - if (expects != null) { - return generateContinueResponse().build(); - } - - // set the processor id as appropriate - snippetEntity.getSnippet().setId(generateUuid()); - - // create the snippet - final SnippetEntity entity = serviceFacade.createSnippet(snippetEntity.getSnippet()); - populateRemainingSnippetEntityContent(entity); - - // build the response - return 
clusterContext(generateCreatedResponse(URI.create(entity.getSnippet().getUri()), entity)).build(); - } - - /** - * Retrieves the specified snippet. - * - * @param id The id of the snippet to retrieve. - * @return A snippetEntity. - */ - @GET - @Consumes(MediaType.WILDCARD) - @Produces(MediaType.APPLICATION_JSON) - @Path("{id}/snippets/{snippet-id}") - // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") - @ApiOperation( - value = "Gets a snippet", - response = SnippetEntity.class, - authorizations = { - @Authorization(value = "Read Only", type = "ROLE_MONITOR"), - @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"), - @Authorization(value = "Administrator", type = "ROLE_ADMIN") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 404, message = "The specified resource could not be found."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. 
Retrying the same request later may be successful.") - } - ) - public Response getSnippet( - @ApiParam( - value = "The process group id.", - required = true - ) - @PathParam("id") String groupId, - @ApiParam( - value = "The snippet id.", - required = true - ) - @PathParam("snippet-id") String id) { - - // replicate if cluster manager - if (properties.isClusterManager()) { - return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); - } - - // get the snippet - final SnippetEntity entity = serviceFacade.getSnippet(id); - populateRemainingSnippetEntityContent(entity); - - return clusterContext(generateOkResponse(entity)).build(); - } - - /** - * Updates the specified snippet. The contents of the snippet (component - * ids) cannot be updated once the snippet is created. - * - * @param httpServletRequest request - * @param id The id of the snippet. - * @param snippetEntity A snippetEntity - * @return A snippetEntity - */ - @PUT - @Consumes(MediaType.APPLICATION_JSON) - @Produces(MediaType.APPLICATION_JSON) - @Path("{id}/snippets/{snippet-id}") - // TODO - @PreAuthorize("hasRole('ROLE_DFM')") - @ApiOperation( - value = "Updates a snippet", - response = SnippetEntity.class, - authorizations = { - @Authorization(value = "Data Flow Manager", type = "ROLE_DFM") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 404, message = "The specified resource could not be found."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. 
Retrying the same request later may be successful.") - } - ) - public Response updateSnippet( - @Context HttpServletRequest httpServletRequest, - @ApiParam( - value = "The process group id.", - required = true - ) - @PathParam("id") String groupId, - @ApiParam( - value = "The snippet id.", - required = true - ) - @PathParam("snippet-id") String id, - @ApiParam( - value = "The snippet configuration details.", - required = true - ) final SnippetEntity snippetEntity) { - - if (snippetEntity == null || snippetEntity.getSnippet() == null) { - throw new IllegalArgumentException("Snippet details must be specified."); - } - - if (snippetEntity.getRevision() == null) { - throw new IllegalArgumentException("Revision must be specified."); - } - - // ensure the ids are the same - final SnippetDTO requestSnippetDTO = snippetEntity.getSnippet(); - if (!id.equals(requestSnippetDTO.getId())) { - throw new IllegalArgumentException(String.format("The snippet id (%s) in the request body does not equal the " - + "snippet id of the requested resource (%s).", requestSnippetDTO.getId(), id)); - } - - // replicate if cluster manager - if (properties.isClusterManager()) { - return clusterManager.applyRequest(HttpMethod.PUT, getAbsolutePath(), snippetEntity, getHeaders()).getResponse(); - } - - // handle expects request (usually from the cluster manager) - final Revision revision = getRevision(snippetEntity, id); - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - serviceFacade.claimRevision(revision); - } - if (validationPhase) { - serviceFacade.verifyUpdateSnippet(requestSnippetDTO); - return generateContinueResponse().build(); - } - - // update the snippet - final UpdateResult<SnippetEntity> updateResult = serviceFacade.updateSnippet(revision, snippetEntity.getSnippet()); - - // get the results - final SnippetEntity entity = updateResult.getResult(); - populateRemainingSnippetEntityContent(entity); 
- - if (updateResult.isNew()) { - return clusterContext(generateCreatedResponse(URI.create(entity.getSnippet().getUri()), entity)).build(); - } else { - return clusterContext(generateOkResponse(entity)).build(); - } - } - - /** - * Removes the specified snippet. - * - * @param httpServletRequest request - * @param version The revision is used to verify the client is working with - * the latest version of the flow. - * @param clientId Optional client id. If the client id is not specified, a - * new one will be generated. This value (whether specified or generated) is - * included in the response. - * @param id The id of the snippet to remove. - * @return A entity containing the client id and an updated revision. - */ - @DELETE - @Consumes(MediaType.WILDCARD) - @Produces(MediaType.APPLICATION_JSON) - @Path("{id}/snippets/{snippet-id}") - // TODO - @PreAuthorize("hasRole('ROLE_DFM')") - @ApiOperation( - value = "Deletes a snippet", - response = SnippetEntity.class, - authorizations = { - @Authorization(value = "Data Flow Manager", type = "ROLE_DFM") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 404, message = "The specified resource could not be found."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. 
Retrying the same request later may be successful.") - } - ) - public Response deleteSnippet( - @Context HttpServletRequest httpServletRequest, - @ApiParam( - value = "The revision is used to verify the client is working with the latest version of the flow.", - required = false - ) - @QueryParam(VERSION) LongParameter version, - @ApiParam( - value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @ApiParam( - value = "The process group id.", - required = true - ) - @PathParam("id") String groupId, - @ApiParam( - value = "The snippet id.", - required = true - ) - @PathParam("snippet-id") String id) { - - // replicate if cluster manager - if (properties.isClusterManager()) { - return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); - } - - final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); - - // handle expects request (usually from the cluster manager) - final boolean validationPhase = isValidationPhase(httpServletRequest); - - // We need to claim the revision for the Processor if either this is the first phase of a two-phase - // request, or if this is not a two-phase request. 
- if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - serviceFacade.claimRevision(revision); - } - if (validationPhase) { - serviceFacade.verifyDeleteSnippet(id); - return generateContinueResponse().build(); - } - - // delete the specified snippet - final SnippetEntity snippetEntity = serviceFacade.deleteSnippet(revision, id); - - return clusterContext(generateOkResponse(snippetEntity)).build(); - } - // ---------------- // snippet instance // ---------------- @@ -1908,14 +1680,24 @@ public class ProcessGroupResource extends ApplicationResource { throw new IllegalArgumentException("The origin position (x, y) must be specified"); } + if (copySnippetEntity.getSnippetId() == null) { + throw new IllegalArgumentException("The snippet id must be specified."); + } + // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.POST, getAbsolutePath(), copySnippetEntity, getHeaders()).getResponse(); } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); - if (expects != null) { + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + // authorize access + serviceFacade.authorizeAccess(lookup -> { + authorizeSnippetUsage(lookup, groupId, copySnippetEntity.getSnippetId()); + }); + } + if (validationPhase) { return generateContinueResponse().build(); } @@ -1997,8 +1779,18 @@ public class ProcessGroupResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); - if (expects != null) { + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + // authorize access + 
serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId); + processGroup.authorize(authorizer, RequestAction.WRITE); + + final Authorizable template = lookup.getTemplate(instantiateTemplateRequestEntity.getTemplateId()); + template.authorize(authorizer, RequestAction.READ); + }); + } + if (validationPhase) { return generateContinueResponse().build(); } @@ -2024,6 +1816,15 @@ public class ProcessGroupResource extends ApplicationResource { // templates // --------- + private void authorizeSnippetUsage(final AuthorizableLookup lookup, final String groupId, final String snippetId) { + // ensure write access to the target process group + lookup.getProcessGroup(groupId).authorize(authorizer, RequestAction.WRITE); + + // ensure read permission to every component in the snippet + final Snippet snippet = lookup.getSnippet(snippetId); + authorizeSnippet(snippet, authorizer, lookup, RequestAction.READ); + } + /** * Retrieves all of the templates in this NiFi. 
* @@ -2056,13 +1857,19 @@ public class ProcessGroupResource extends ApplicationResource { value = "The process group id.", required = true ) - @PathParam("id") String groupId) { + @PathParam("id") final String groupId) { // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId); + processGroup.authorize(authorizer, RequestAction.READ); + }); + // get all the templates final Set<TemplateDTO> templates = templateResource.populateRemainingTemplatesContent(serviceFacade.getTemplates()); @@ -2104,18 +1911,20 @@ public class ProcessGroupResource extends ApplicationResource { } ) public Response createTemplate( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The process group id.", required = true ) - @PathParam("id") String groupId, + @PathParam("id") final String groupId, @ApiParam( value = "The create template request.", required = true - ) CreateTemplateRequestEntity createTemplateRequestEntity) { + ) final CreateTemplateRequestEntity createTemplateRequestEntity) { - // TODO - verify parent group id + if (createTemplateRequestEntity.getSnippetId() == null) { + throw new IllegalArgumentException("The snippet identifier must be specified."); + } // replicate if cluster manager if (properties.isClusterManager()) { @@ -2123,8 +1932,14 @@ public class ProcessGroupResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); - if (expects != null) { + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { 
+ // authorize access + serviceFacade.authorizeAccess(lookup -> { + authorizeSnippetUsage(lookup, groupId, createTemplateRequestEntity.getSnippetId()); + }); + } + if (validationPhase) { return generateContinueResponse().build(); } @@ -2157,14 +1972,14 @@ public class ProcessGroupResource extends ApplicationResource { @Path("{id}/templates/upload") // TODO - @PreAuthorize("hasRole('ROLE_DFM')") public Response uploadTemplate( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The process group id.", required = true ) - @PathParam("id") String groupId, - @FormDataParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @FormDataParam("template") InputStream in) { + @PathParam("id") final String groupId, + @FormDataParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) final ClientIdParameter clientId, + @FormDataParam("template") final InputStream in) { // unmarshal the template final TemplateDTO template; @@ -2227,13 +2042,13 @@ public class ProcessGroupResource extends ApplicationResource { @Path("{id}/templates/import") // TODO - @PreAuthorize("hasRole('ROLE_DFM')") public Response importTemplate( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The process group id.", required = true ) - @PathParam("id") String groupId, - TemplateEntity templateEntity) { + @PathParam("id") final String groupId, + final TemplateEntity templateEntity) { // replicate if cluster manager if (properties.isClusterManager()) { @@ -2241,8 +2056,15 @@ public class ProcessGroupResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); - if (expects != null) { + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || 
!isTwoPhaseRequest(httpServletRequest)) { + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId); + processGroup.authorize(authorizer, RequestAction.WRITE); + }); + } + if (validationPhase) { return generateContinueResponse().build(); } @@ -2306,16 +2128,16 @@ public class ProcessGroupResource extends ApplicationResource { } ) public Response createControllerService( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The process group id.", required = true ) - @PathParam("id") String groupId, + @PathParam("id") final String groupId, @ApiParam( value = "The controller service configuration details.", required = true - ) ControllerServiceEntity controllerServiceEntity) { + ) final ControllerServiceEntity controllerServiceEntity) { if (controllerServiceEntity == null || controllerServiceEntity.getComponent() == null) { throw new IllegalArgumentException("Controller service details must be specified."); @@ -2340,8 +2162,15 @@ public class ProcessGroupResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); - if (expects != null) { + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId); + processGroup.authorize(authorizer, RequestAction.WRITE); + }); + } + if (validationPhase) { return generateContinueResponse().build(); } @@ -2404,4 +2233,8 @@ public class ProcessGroupResource extends ApplicationResource { public void setProperties(NiFiProperties properties) { this.properties = properties; } + + public void setAuthorizer(Authorizer authorizer) { + 
this.authorizer = authorizer; + } }
http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java index 4178774..2370d62 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java @@ -23,6 +23,9 @@ import com.wordnik.swagger.annotations.ApiResponse; import com.wordnik.swagger.annotations.ApiResponses; import com.wordnik.swagger.annotations.Authorization; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.RequestAction; +import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException; import org.apache.nifi.cluster.manager.impl.WebClusterManager; import org.apache.nifi.cluster.node.Node; @@ -83,6 +86,7 @@ public class ProcessorResource extends ApplicationResource { private NiFiServiceFacade serviceFacade; private WebClusterManager clusterManager; private NiFiProperties properties; + private Authorizer authorizer; @Context private ServletContext servletContext; @@ -193,13 +197,19 @@ public class ProcessorResource extends ApplicationResource { value = "The processor id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { // replicate if cluster manager if (properties.isClusterManager()) { return 
clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processor = lookup.getProcessor(id); + processor.authorize(authorizer, RequestAction.READ); + }); + // get the specified processor final ProcessorEntity entity = serviceFacade.getProcessor(id); populateRemainingProcessorEntityContent(entity); @@ -243,17 +253,17 @@ public class ProcessorResource extends ApplicationResource { value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", required = false ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) final ClientIdParameter clientId, @ApiParam( value = "The processor id.", required = true ) - @PathParam("id") String id, + @PathParam("id") final String id, @ApiParam( value = "The property name.", required = true ) - @QueryParam("propertyName") String propertyName) { + @QueryParam("propertyName") final String propertyName) { // ensure the property name is specified if (propertyName == null) { @@ -265,6 +275,12 @@ public class ProcessorResource extends ApplicationResource { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processor = lookup.getProcessor(id); + processor.authorize(authorizer, RequestAction.READ); + }); + // get the property descriptor final PropertyDescriptorDTO descriptor = serviceFacade.getProcessorPropertyDescriptor(id, propertyName); @@ -308,13 +324,19 @@ public class ProcessorResource extends ApplicationResource { value = "The processor id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { // replicate if cluster 
manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processor = lookup.getProcessor(id); + processor.authorize(authorizer, RequestAction.WRITE); + }); + // get the component state final ComponentStateDTO state = serviceFacade.getProcessorState(id); @@ -356,17 +378,16 @@ public class ProcessorResource extends ApplicationResource { } ) public Response clearState( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The revision used to verify the client is working with the latest version of the flow.", required = true - ) - ComponentStateEntity revisionEntity, + ) final ComponentStateEntity revisionEntity, @ApiParam( value = "The processor id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { // ensure the revision was specified if (revisionEntity == null) { @@ -380,6 +401,11 @@ public class ProcessorResource extends ApplicationResource { // handle expects request (usually from the cluster manager) if (isValidationPhase(httpServletRequest)) { + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processor = lookup.getProcessor(id); + processor.authorize(authorizer, RequestAction.WRITE); + }); serviceFacade.verifyCanClearProcessorState(id); return generateContinueResponse().build(); } @@ -424,17 +450,16 @@ public class ProcessorResource extends ApplicationResource { } ) public Response updateProcessor( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The processor id.", required = true ) - @PathParam("id") String id, + @PathParam("id") final String id, @ApiParam( value = "The processor configuration details.", required = true - ) - 
ProcessorEntity processorEntity) { + ) final ProcessorEntity processorEntity) { if (processorEntity == null || processorEntity.getComponent() == null) { throw new IllegalArgumentException("Processor details must be specified."); @@ -468,41 +493,27 @@ public class ProcessorResource extends ApplicationResource { // handle expects request (usually from the cluster manager) final Revision revision = getRevision(processorEntity, id); - final boolean validationPhase = isValidationPhase(httpServletRequest); - final boolean twoPhaseRequest = isTwoPhaseRequest(httpServletRequest); - final String requestId = getHeaders().get("X-RequestTransactionId"); - - logger.debug("For Update Processor, Validation Phase = {}, Two-phase request = {}, Request ID = {}", validationPhase, twoPhaseRequest, requestId); - if (validationPhase || !twoPhaseRequest) { - serviceFacade.claimRevision(revision); - logger.debug("Claimed Revision {}", revision); - } - if (validationPhase) { - serviceFacade.verifyUpdateProcessor(requestProcessorDTO); - logger.debug("Verified Update of Processor"); - return generateContinueResponse().build(); - } - - // update the processor - final UpdateResult<ProcessorEntity> result; - try { - logger.debug("Updating Processor with Revision {}", revision); - result = serviceFacade.updateProcessor(revision, requestProcessorDTO); - logger.debug("Updated Processor with Revision {}", revision); - } catch (final Exception e) { - final boolean tpr = isTwoPhaseRequest(httpServletRequest); - logger.error("Got Exception trying to update processor. 
two-phase request = {}, validation phase = {}, revision = {}", tpr, validationPhase, revision); - logger.error("", e); - throw e; - } - final ProcessorEntity entity = result.getResult(); - populateRemainingProcessorEntityContent(entity); - - if (result.isNew()) { - return clusterContext(generateCreatedResponse(URI.create(entity.getComponent().getUri()), entity)).build(); - } else { - return clusterContext(generateOkResponse(entity)).build(); - } + return withWriteLock( + serviceFacade, + revision, + lookup -> { + final Authorizable processor = lookup.getProcessor(id); + processor.authorize(authorizer, RequestAction.WRITE); + }, + () -> serviceFacade.verifyUpdateProcessor(requestProcessorDTO), + () -> { + // update the processor + final UpdateResult<ProcessorEntity> result = serviceFacade.updateProcessor(revision, requestProcessorDTO); + final ProcessorEntity entity = result.getResult(); + populateRemainingProcessorEntityContent(entity); + + if (result.isNew()) { + return clusterContext(generateCreatedResponse(URI.create(entity.getComponent().getUri()), entity)).build(); + } else { + return clusterContext(generateOkResponse(entity)).build(); + } + } + ); } /** @@ -536,22 +547,22 @@ public class ProcessorResource extends ApplicationResource { } ) public Response deleteProcessor( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The revision is used to verify the client is working with the latest version of the flow.", required = false ) - @QueryParam(VERSION) LongParameter version, + @QueryParam(VERSION) final LongParameter version, @ApiParam( value = "If the client id is not specified, new one will be generated. 
This value (whether specified or generated) is included in the response.", required = false ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) final ClientIdParameter clientId, @ApiParam( value = "The processor id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { // replicate if cluster manager if (properties.isClusterManager()) { @@ -559,25 +570,22 @@ public class ProcessorResource extends ApplicationResource { } final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); - - // handle expects request (usually from the cluster manager) - final boolean validationPhase = isValidationPhase(httpServletRequest); - - // We need to claim the revision for the Processor if either this is the first phase of a two-phase - // request, or if this is not a two-phase request. - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - serviceFacade.claimRevision(revision); - } - if (validationPhase) { - serviceFacade.verifyDeleteProcessor(id); - return generateContinueResponse().build(); - } - - // delete the processor - final ProcessorEntity entity = serviceFacade.deleteProcessor(revision, id); - - // generate the response - return clusterContext(generateOkResponse(entity)).build(); + return withWriteLock( + serviceFacade, + revision, + lookup -> { + final Authorizable processor = lookup.getProcessor(id); + processor.authorize(authorizer, RequestAction.WRITE); + }, + () -> serviceFacade.verifyDeleteProcessor(id), + () -> { + // delete the processor + final ProcessorEntity entity = serviceFacade.deleteProcessor(revision, id); + + // generate the response + return clusterContext(generateOkResponse(entity)).build(); + } + ); } // setters @@ -592,4 +600,8 @@ public class ProcessorResource extends ApplicationResource { public void setProperties(NiFiProperties properties) { this.properties 
= properties; } + + public void setAuthorizer(Authorizer authorizer) { + this.authorizer = authorizer; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceResource.java index a01f6e6..06621b2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceResource.java @@ -16,31 +16,22 @@ */ package org.apache.nifi.web.api; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.URI; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.GET; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.StreamingOutput; - +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import 
com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.authorization.AccessDeniedException; +import org.apache.nifi.authorization.AuthorizationRequest; +import org.apache.nifi.authorization.AuthorizationResult; +import org.apache.nifi.authorization.AuthorizationResult.Result; +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.RequestAction; +import org.apache.nifi.authorization.resource.ResourceFactory; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.impl.WebClusterManager; @@ -65,12 +56,29 @@ import org.apache.nifi.web.api.request.LongParameter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.StreamingOutput; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.util.HashMap; +import java.util.HashSet; +import 
java.util.Map; +import java.util.Set; /** @@ -89,6 +97,7 @@ public class ProvenanceResource extends ApplicationResource { private NiFiProperties properties; private NiFiServiceFacade serviceFacade; private WebClusterManager clusterManager; + private Authorizer authorizer; /** * Populates the uri for the specified provenance. @@ -106,6 +115,24 @@ public class ProvenanceResource extends ApplicationResource { return lineage; } + private void authorizeProvenanceRequest() { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + final AuthorizationRequest request = new AuthorizationRequest.Builder() + .resource(ResourceFactory.getProvenanceResource()) + .identity(user.getIdentity()) + .anonymous(user.isAnonymous()) + .accessAttempt(true) + .action(RequestAction.READ) + .build(); + + final AuthorizationResult result = authorizer.authorize(request); + if (!Result.Approved.equals(result.getResult())) { + final String message = StringUtils.isNotBlank(result.getExplanation()) ? result.getExplanation() : "Access is denied"; + throw new AccessDeniedException(message); + } + } + /** * Gets the provenance search options for this NiFi. 
* @@ -133,6 +160,8 @@ public class ProvenanceResource extends ApplicationResource { ) public Response getSearchOptions() { + authorizeProvenanceRequest(); + // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); @@ -178,11 +207,13 @@ public class ProvenanceResource extends ApplicationResource { } ) public Response submitReplay( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The replay request.", required = true - ) SubmitReplayRequestEntity replayRequestEntity) { + ) final SubmitReplayRequestEntity replayRequestEntity) { + + authorizeProvenanceRequest(); // ensure the event id is specified if (replayRequestEntity == null || replayRequestEntity.getEventId() == null) { @@ -259,12 +290,14 @@ public class ProvenanceResource extends ApplicationResource { value = "The id of the node where the content exists if clustered.", required = false ) - @QueryParam("clusterNodeId") String clusterNodeId, + @QueryParam("clusterNodeId") final String clusterNodeId, @ApiParam( value = "The provenance event id.", required = true ) - @PathParam("id") LongParameter id) { + @PathParam("id") final LongParameter id) { + + authorizeProvenanceRequest(); // ensure proper input if (id == null) { @@ -352,12 +385,14 @@ public class ProvenanceResource extends ApplicationResource { value = "The id of the node where the content exists if clustered.", required = false ) - @QueryParam("clusterNodeId") String clusterNodeId, + @QueryParam("clusterNodeId") final String clusterNodeId, @ApiParam( value = "The provenance event id.", required = true ) - @PathParam("id") LongParameter id) { + @PathParam("id") final LongParameter id) { + + authorizeProvenanceRequest(); // ensure proper input if (id == null) { @@ -445,12 +480,14 @@ public class ProvenanceResource extends ApplicationResource { } ) 
public Response submitProvenanceRequest( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The provenance query details.", required = true ) ProvenanceEntity provenanceEntity) { + authorizeProvenanceRequest(); + // check the request if (provenanceEntity == null) { provenanceEntity = new ProvenanceEntity(); @@ -546,12 +583,14 @@ public class ProvenanceResource extends ApplicationResource { value = "The id of the node where this query exists if clustered.", required = false ) - @QueryParam("clusterNodeId") String clusterNodeId, + @QueryParam("clusterNodeId") final String clusterNodeId, @ApiParam( value = "The id of the provenance query.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { + + authorizeProvenanceRequest(); // replicate if cluster manager if (properties.isClusterManager()) { @@ -617,17 +656,19 @@ public class ProvenanceResource extends ApplicationResource { } ) public Response deleteProvenance( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The id of the node where this query exists if clustered.", required = false ) - @QueryParam("clusterNodeId") String clusterNodeId, + @QueryParam("clusterNodeId") final String clusterNodeId, @ApiParam( value = "The id of the provenance query.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { + + authorizeProvenanceRequest(); // replicate if cluster manager if (properties.isClusterManager()) { @@ -699,12 +740,14 @@ public class ProvenanceResource extends ApplicationResource { value = "The id of the node where this event exists if clustered.", required = false ) - @QueryParam("clusterNodeId") String clusterNodeId, + @QueryParam("clusterNodeId") final String clusterNodeId, @ApiParam( value = "The provenence event id.", required = true ) - @PathParam("id") LongParameter id) { + 
@PathParam("id") final LongParameter id) { + + authorizeProvenanceRequest(); // ensure the id is specified if (id == null) { @@ -780,12 +823,13 @@ public class ProvenanceResource extends ApplicationResource { } ) public Response submitLineageRequest( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The lineage query details.", required = true - ) - final LineageEntity lineageEntity) { + ) final LineageEntity lineageEntity) { + + authorizeProvenanceRequest(); if (lineageEntity == null || lineageEntity.getLineage() == null || lineageEntity.getLineage().getRequest() == null) { throw new IllegalArgumentException("Lineage request must be specified."); @@ -892,12 +936,14 @@ public class ProvenanceResource extends ApplicationResource { value = "The id of the node where this query exists if clustered.", required = false ) - @QueryParam("clusterNodeId") String clusterNodeId, + @QueryParam("clusterNodeId") final String clusterNodeId, @ApiParam( value = "The id of the lineage query.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { + + authorizeProvenanceRequest(); // replicate if cluster manager if (properties.isClusterManager()) { @@ -961,17 +1007,19 @@ public class ProvenanceResource extends ApplicationResource { } ) public Response deleteLineage( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The id of the node where this query exists if clustered.", required = false ) - @QueryParam("clusterNodeId") String clusterNodeId, + @QueryParam("clusterNodeId") final String clusterNodeId, @ApiParam( value = "The id of the lineage query.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { + + authorizeProvenanceRequest(); // replicate if cluster manager if (properties.isClusterManager()) { @@ -1020,4 +1068,8 @@ public class ProvenanceResource extends 
ApplicationResource { public void setServiceFacade(NiFiServiceFacade serviceFacade) { this.serviceFacade = serviceFacade; } + + public void setAuthorizer(Authorizer authorizer) { + this.authorizer = authorizer; + } }