http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java index f6dd081..0b8af7c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java @@ -16,32 +16,16 @@ */ package org.apache.nifi.web.api; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.URI; -import java.util.HashSet; -import java.util.Set; - -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.Response.Status; -import javax.ws.rs.core.StreamingOutput; - +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.RequestAction; +import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.impl.WebClusterManager; @@ -60,12 +44,29 @@ import org.apache.nifi.web.api.entity.FlowFileEntity; import org.apache.nifi.web.api.entity.ListingRequestEntity; import org.apache.nifi.web.api.request.ClientIdParameter; -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import javax.ws.rs.core.StreamingOutput; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.util.HashSet; +import java.util.Set; /** * RESTful endpoint for managing a flowfile queue. @@ -75,12 +76,12 @@ import com.wordnik.swagger.annotations.Authorization; value = "/flowfile-queues", description = "Endpoint for managing a FlowFile Queue." ) -// TODO: Need revisions of the Connections for these endpoints! public class FlowFileQueueResource extends ApplicationResource { private NiFiServiceFacade serviceFacade; private WebClusterManager clusterManager; private NiFiProperties properties; + private Authorizer authorizer; /** * Populate the URIs for the specified flowfile listing. @@ -147,17 +148,17 @@ public class FlowFileQueueResource extends ApplicationResource { value = "The connection id.", required = true ) - @PathParam("connection-id") String connectionId, + @PathParam("connection-id") final String connectionId, @ApiParam( value = "The flowfile uuid.", required = true ) - @PathParam("flowfile-uuid") String flowFileUuid, + @PathParam("flowfile-uuid") final String flowFileUuid, @ApiParam( value = "The id of the node where the content exists if clustered.", required = false ) - @QueryParam("clusterNodeId") String clusterNodeId) { + @QueryParam("clusterNodeId") final String clusterNodeId) { // replicate if cluster manager if (properties.isClusterManager()) { @@ -179,6 +180,12 @@ public class FlowFileQueueResource extends ApplicationResource { } } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable connection = lookup.getConnection(connectionId); + connection.authorize(authorizer, RequestAction.WRITE); + }); + // get the flowfile final FlowFileDTO flowfileDto = serviceFacade.getFlowFile(connectionId, flowFileUuid); populateRemainingFlowFileContent(connectionId, flowfileDto); @@ -224,22 +231,22 @@ public class FlowFileQueueResource extends ApplicationResource { value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", required = false ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) final ClientIdParameter clientId, @ApiParam( value = "The connection id.", required = true ) - @PathParam("connection-id") String connectionId, + @PathParam("connection-id") final String connectionId, @ApiParam( value = "The flowfile uuid.", required = true ) - @PathParam("flowfile-uuid") String flowFileUuid, + @PathParam("flowfile-uuid") final String flowFileUuid, @ApiParam( value = "The id of the node where the content exists if clustered.", required = false ) - @QueryParam("clusterNodeId") String clusterNodeId) { + @QueryParam("clusterNodeId") final String clusterNodeId) { // replicate if cluster manager if (properties.isClusterManager()) { @@ -261,6 +268,12 @@ public class FlowFileQueueResource extends ApplicationResource { } } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable connection = lookup.getConnection(connectionId); + connection.authorize(authorizer, RequestAction.WRITE); + }); + // get the uri of the request final String uri = generateResourceUri("flowfile-queues", connectionId, "flowfiles", flowFileUuid, "content"); @@ -320,18 +333,24 @@ public class FlowFileQueueResource extends ApplicationResource { } ) public Response createFlowFileListing( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The connection id.", required = true ) - @PathParam("connection-id") String id) { + @PathParam("connection-id") final String id) { // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.POST, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable connection = lookup.getConnection(id); + connection.authorize(authorizer, RequestAction.WRITE); + }); + // handle expects request (usually from the cluster manager) final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); if (expects != null) { @@ -388,18 +407,24 @@ public class FlowFileQueueResource extends ApplicationResource { value = "The connection id.", required = true ) - @PathParam("connection-id") String connectionId, + @PathParam("connection-id") final String connectionId, @ApiParam( value = "The listing request id.", required = true ) - @PathParam("listing-request-id") String listingRequestId) { + @PathParam("listing-request-id") final String listingRequestId) { // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable connection = lookup.getConnection(connectionId); + connection.authorize(authorizer, RequestAction.WRITE); + }); + // get the listing request final ListingRequestDTO listingRequest = serviceFacade.getFlowFileListingRequest(connectionId, listingRequestId); populateRemainingFlowFileListingContent(connectionId, listingRequest); @@ -441,17 +466,17 @@ public class FlowFileQueueResource extends ApplicationResource { } ) public Response deleteListingRequest( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The connection id.", required = true ) - @PathParam("connection-id") String connectionId, + @PathParam("connection-id") final String connectionId, @ApiParam( value = "The listing request id.", required = true ) - @PathParam("listing-request-id") String listingRequestId) { + @PathParam("listing-request-id") final String listingRequestId) { // replicate if cluster manager if (properties.isClusterManager()) { @@ -464,6 +489,12 @@ public class FlowFileQueueResource extends ApplicationResource { return generateContinueResponse().build(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable connection = lookup.getConnection(connectionId); + connection.authorize(authorizer, RequestAction.WRITE); + }); + // delete the listing request final ListingRequestDTO listingRequest = serviceFacade.deleteFlowFileListingRequest(connectionId, listingRequestId); @@ -510,18 +541,24 @@ public class FlowFileQueueResource extends ApplicationResource { } ) public Response createDropRequest( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The connection id.", required = true ) - @PathParam("connection-id") String id) { + @PathParam("connection-id") final String id) { // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.POST, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable connection = lookup.getConnection(id); + connection.authorize(authorizer, RequestAction.WRITE); + }); + // handle expects request (usually from the cluster manager) final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); if (expects != null) { @@ -577,18 +614,24 @@ public class FlowFileQueueResource extends ApplicationResource { value = "The connection id.", required = true ) - @PathParam("connection-id") String connectionId, + @PathParam("connection-id") final String connectionId, @ApiParam( value = "The drop request id.", required = true ) - @PathParam("drop-request-id") String dropRequestId) { + @PathParam("drop-request-id") final String dropRequestId) { // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable connection = lookup.getConnection(connectionId); + connection.authorize(authorizer, RequestAction.WRITE); + }); + // get the drop request final DropRequestDTO dropRequest = serviceFacade.getFlowFileDropRequest(connectionId, dropRequestId); dropRequest.setUri(generateResourceUri("flowfile-queues", connectionId, "drop-requests", dropRequestId)); @@ -630,23 +673,29 @@ public class FlowFileQueueResource extends ApplicationResource { } ) public Response removeDropRequest( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The connection id.", required = true ) - @PathParam("connection-id") String connectionId, + @PathParam("connection-id") final String connectionId, @ApiParam( value = "The drop request id.", required = true ) - @PathParam("drop-request-id") String dropRequestId) { + @PathParam("drop-request-id") final String dropRequestId) { // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable connection = lookup.getConnection(connectionId); + connection.authorize(authorizer, RequestAction.WRITE); + }); + // handle expects request (usually from the cluster manager) final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); if (expects != null) { @@ -676,4 +725,8 @@ public class FlowFileQueueResource extends ApplicationResource { public void setProperties(NiFiProperties properties) { this.properties = properties; } + + public void setAuthorizer(Authorizer authorizer) { + this.authorizer = authorizer; + } }
http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java index 7e882a8..e02dac5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java @@ -16,22 +16,13 @@ */ package org.apache.nifi.web.api; -import java.util.HashSet; -import java.util.Set; - -import javax.ws.rs.Consumes; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; - +import com.sun.jersey.api.core.ResourceContext; +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.authorization.AccessDeniedException; import org.apache.nifi.authorization.AuthorizationRequest; @@ -39,6 +30,7 @@ import org.apache.nifi.authorization.AuthorizationResult; import org.apache.nifi.authorization.AuthorizationResult.Result; import org.apache.nifi.authorization.Authorizer; import org.apache.nifi.authorization.RequestAction; +import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.ResourceFactory; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; @@ -47,9 +39,11 @@ import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.impl.WebClusterManager; import org.apache.nifi.cluster.node.Node; import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.controller.ScheduledState; +import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.ConfigurationSnapshot; import org.apache.nifi.web.NiFiServiceFacade; +import org.apache.nifi.web.Revision; import org.apache.nifi.web.api.dto.AboutDTO; import org.apache.nifi.web.api.dto.BannerDTO; import org.apache.nifi.web.api.dto.BulletinBoardDTO; @@ -88,6 +82,7 @@ import org.apache.nifi.web.api.entity.ProcessorStatusEntity; import org.apache.nifi.web.api.entity.ProcessorTypesEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusEntity; import org.apache.nifi.web.api.entity.ReportingTaskTypesEntity; +import org.apache.nifi.web.api.entity.ScheduleComponentsEntity; import org.apache.nifi.web.api.entity.SearchResultsEntity; import org.apache.nifi.web.api.entity.StatusHistoryEntity; import org.apache.nifi.web.api.request.BulletinBoardPatternParameter; @@ -95,13 +90,26 @@ import org.apache.nifi.web.api.request.ClientIdParameter; import org.apache.nifi.web.api.request.IntegerParameter; import org.apache.nifi.web.api.request.LongParameter; -import com.sun.jersey.api.core.ResourceContext; -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; /** * RESTful endpoint for managing a Flow. @@ -254,20 +262,11 @@ public class FlowResource extends ApplicationResource { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } - // get this process group contents - final ConfigurationSnapshot<ProcessGroupFlowDTO> controllerResponse = serviceFacade.getProcessGroupFlow(groupId, recursive); - final ProcessGroupFlowDTO flow = controllerResponse.getConfiguration(); - - // create the revision - final RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - revision.setVersion(controllerResponse.getVersion()); - - // create the response entity - final ProcessGroupFlowEntity processGroupEntity = new ProcessGroupFlowEntity(); - processGroupEntity.setProcessGroupFlow(populateRemainingFlowContent(flow)); + // get this process group flow + final ProcessGroupFlowEntity entity = serviceFacade.getProcessGroupFlow(groupId, recursive); + populateRemainingFlowContent(entity.getProcessGroupFlow()); - return clusterContext(generateOkResponse(processGroupEntity)).build(); + return clusterContext(generateOkResponse(entity)).build(); } /** @@ -321,6 +320,142 @@ public class FlowResource extends ApplicationResource { return clusterContext(generateOkResponse(entity)).build(); } + /** + * Updates the specified process group. + * + * @param httpServletRequest request + * @param id The id of the process group. + * @param scheduleComponentsEntity A scheduleComponentsEntity. + * @return A processGroupEntity. + */ + @PUT + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @Path("process-groups/{id}") + // TODO - @PreAuthorize("hasRole('ROLE_DFM')") + @ApiOperation( + value = "Updates a process group", + response = ScheduleComponentsEntity.class, + authorizations = { + @Authorization(value = "Data Flow Manager", type = "ROLE_DFM") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response scheduleComponents( + @Context HttpServletRequest httpServletRequest, + @ApiParam( + value = "The process group id.", + required = true + ) + @PathParam("id") String id, + ScheduleComponentsEntity scheduleComponentsEntity) { + + authorizeFlow(); + + // ensure the same id is being used + if (!id.equals(scheduleComponentsEntity.getId())) { + throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does " + + "not equal the process group id of the requested resource (%s).", scheduleComponentsEntity.getId(), id)); + } + + final ScheduledState state; + if (scheduleComponentsEntity.getState() == null) { + throw new IllegalArgumentException("The scheduled state must be specified."); + } else { + try { + state = ScheduledState.valueOf(scheduleComponentsEntity.getState()); + } catch (final IllegalArgumentException iae) { + throw new IllegalArgumentException(String.format("The scheduled must be one of [%s].", StringUtils.join(EnumSet.of(ScheduledState.RUNNING, ScheduledState.STOPPED), ", "))); + } + } + + // ensure its a supported scheduled state + if (ScheduledState.DISABLED.equals(state) || ScheduledState.STARTING.equals(state) || ScheduledState.STOPPING.equals(state)) { + throw new IllegalArgumentException(String.format("The scheduled must be one of [%s].", StringUtils.join(EnumSet.of(ScheduledState.RUNNING, ScheduledState.STOPPED), ", "))); + } + + // if the components are not specified, gather all components and their current revision + if (scheduleComponentsEntity.getComponents() == null) { + // TODO - this will break while clustered until nodes are able to process/replicate requests + // get the current revisions for the components being updated + final Set<Revision> revisions = serviceFacade.getRevisionsFromGroup(id, group -> { + final Set<String> componentIds = new HashSet<>(); + + // ensure authorized for each processor we will attempt to schedule + group.findAllProcessors().stream() + .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PROCESSORS : ProcessGroup.UNSCHEDULABLE_PROCESSORS) + .filter(processor -> processor.isAuthorized(authorizer, RequestAction.WRITE)) + .forEach(processor -> { + componentIds.add(processor.getIdentifier()); + }); + + // ensure authorized for each input port we will attempt to schedule + group.findAllInputPorts().stream() + .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS) + .filter(inputPort -> inputPort.isAuthorized(authorizer, RequestAction.WRITE)) + .forEach(inputPort -> { + componentIds.add(inputPort.getIdentifier()); + }); + + // ensure authorized for each output port we will attempt to schedule + group.findAllOutputPorts().stream() + .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS) + .filter(outputPort -> outputPort.isAuthorized(authorizer, RequestAction.WRITE)) + .forEach(outputPort -> { + componentIds.add(outputPort.getIdentifier()); + }); + + return componentIds; + }); + + // build the component mapping + final Map<String, RevisionDTO> componentsToSchedule = new HashMap<>(); + revisions.forEach(revision -> { + final RevisionDTO dto = new RevisionDTO(); + dto.setClientId(revision.getClientId()); + dto.setVersion(revision.getVersion()); + componentsToSchedule.put(revision.getComponentId(), dto); + }); + + // set the components and their current revision + scheduleComponentsEntity.setComponents(componentsToSchedule); + } + + if (properties.isClusterManager()) { + return clusterManager.applyRequest(HttpMethod.PUT, getAbsolutePath(), scheduleComponentsEntity, getHeaders()).getResponse(); + } + + final Map<String, RevisionDTO> componentsToSchedule = scheduleComponentsEntity.getComponents(); + final Map<String, Revision> componentRevisions = componentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> getRevision(e.getValue(), e.getKey()))); + final Set<Revision> revisions = new HashSet<>(componentRevisions.values()); + + return withWriteLock( + serviceFacade, + revisions, + lookup -> { + // ensure access to every component being scheduled + componentsToSchedule.keySet().forEach(componentId -> { + final Authorizable connectable = lookup.getConnectable(componentId); + connectable.authorize(authorizer, RequestAction.WRITE); + }); + }, + () -> serviceFacade.verifyScheduleComponents(id, state, componentRevisions.keySet()), + () -> { + // update the process group + final ScheduleComponentsEntity entity = serviceFacade.scheduleComponents(id, state, componentRevisions); + return clusterContext(generateOkResponse(entity)).build(); + } + ); + } + @GET @Consumes(MediaType.WILDCARD) @Produces(MediaType.TEXT_PLAIN) http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java index 56ffa80..e2a51cb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java @@ -16,10 +16,27 @@ */ package org.apache.nifi.web.api; -import java.net.URI; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.RequestAction; +import org.apache.nifi.authorization.resource.Authorizable; +import org.apache.nifi.cluster.manager.impl.WebClusterManager; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.NiFiServiceFacade; +import org.apache.nifi.web.Revision; +import org.apache.nifi.web.UpdateResult; +import org.apache.nifi.web.api.dto.FunnelDTO; +import org.apache.nifi.web.api.entity.FunnelEntity; +import org.apache.nifi.web.api.request.ClientIdParameter; +import org.apache.nifi.web.api.request.LongParameter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; @@ -35,26 +52,8 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; - -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.cluster.manager.impl.WebClusterManager; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.NiFiServiceFacade; -import org.apache.nifi.web.Revision; -import org.apache.nifi.web.UpdateResult; -import org.apache.nifi.web.api.dto.FunnelDTO; -import org.apache.nifi.web.api.entity.FunnelEntity; -import org.apache.nifi.web.api.request.ClientIdParameter; -import org.apache.nifi.web.api.request.LongParameter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; +import java.net.URI; +import java.util.Set; /** * RESTful endpoint for managing a Funnel. @@ -71,6 +70,7 @@ public class FunnelResource extends ApplicationResource { private NiFiServiceFacade serviceFacade; private WebClusterManager clusterManager; private NiFiProperties properties; + private Authorizer authorizer; /** * Populates the uri for the specified funnels. @@ -154,13 +154,19 @@ public class FunnelResource extends ApplicationResource { value = "The funnel id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable funnel = lookup.getFunnel(id); + funnel.authorize(authorizer, RequestAction.READ); + }); + // get the funnel final FunnelEntity entity = serviceFacade.getFunnel(id); populateRemainingFunnelEntityContent(entity); @@ -198,16 +204,16 @@ public class FunnelResource extends ApplicationResource { } ) public Response updateFunnel( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The funnel id.", required = true ) - @PathParam("id") String id, + @PathParam("id") final String id, @ApiParam( value = "The funnel configuration details.", required = true - ) FunnelEntity funnelEntity) { + ) final FunnelEntity funnelEntity) { if (funnelEntity == null || funnelEntity.getComponent() == null) { throw new IllegalArgumentException("Funnel details must be specified."); @@ -226,39 +232,34 @@ public class FunnelResource extends ApplicationResource { // replicate if cluster manager if (properties.isClusterManager()) { - // change content type to JSON for serializing entity - final Map<String, String> headersToOverride = new HashMap<>(); - headersToOverride.put("content-type", MediaType.APPLICATION_JSON); - - // replicate the request - return clusterManager.applyRequest(HttpMethod.PUT, getAbsolutePath(), funnelEntity, getHeaders(headersToOverride)).getResponse(); + return clusterManager.applyRequest(HttpMethod.PUT, getAbsolutePath(), funnelEntity, getHeaders()).getResponse(); } // Extract the revision final Revision revision = getRevision(funnelEntity, id); - - // handle expects request (usually from the cluster manager) - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - serviceFacade.claimRevision(revision); - } - if (validationPhase) { - serviceFacade.claimRevision(revision); - return generateContinueResponse().build(); - } - - // update the funnel - final UpdateResult<FunnelEntity> updateResult = serviceFacade.updateFunnel(revision, requestFunnelDTO); - - // get the results - final FunnelEntity entity = updateResult.getResult(); - populateRemainingFunnelEntityContent(entity); - - if (updateResult.isNew()) { - return clusterContext(generateCreatedResponse(URI.create(entity.getComponent().getUri()), entity)).build(); - } else { - return clusterContext(generateOkResponse(entity)).build(); - } + return withWriteLock( + serviceFacade, + revision, + lookup -> { + final Authorizable funnel = lookup.getFunnel(id); + funnel.authorize(authorizer, RequestAction.WRITE); + }, + null, + () -> { + // update the funnel + final UpdateResult<FunnelEntity> updateResult = serviceFacade.updateFunnel(revision, requestFunnelDTO); + + // get the results + final FunnelEntity entity = updateResult.getResult(); + populateRemainingFunnelEntityContent(entity); + + if (updateResult.isNew()) { + return clusterContext(generateCreatedResponse(URI.create(entity.getComponent().getUri()), entity)).build(); + } else { + return clusterContext(generateOkResponse(entity)).build(); + } + } + ); } /** @@ -295,22 +296,22 @@ public class FunnelResource extends ApplicationResource { } ) public Response removeFunnel( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The revision is used to verify the client is working with the latest version of the flow.", required = false ) - @QueryParam(VERSION) LongParameter version, + @QueryParam(VERSION) final LongParameter version, @ApiParam( value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", required = false ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) final ClientIdParameter clientId, @ApiParam( value = "The funnel id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { // replicate if cluster manager if (properties.isClusterManager()) { @@ -319,18 +320,20 @@ public class FunnelResource extends ApplicationResource { // handle expects request (usually from the cluster manager) final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - serviceFacade.claimRevision(revision); - } - if (validationPhase) { - serviceFacade.verifyDeleteFunnel(id); - return generateContinueResponse().build(); - } - - // delete the specified funnel - final FunnelEntity entity = serviceFacade.deleteFunnel(revision, id); - return clusterContext(generateOkResponse(entity)).build(); + return withWriteLock( + serviceFacade, + revision, + lookup -> { + final Authorizable funnel = lookup.getFunnel(id); + funnel.authorize(authorizer, RequestAction.READ); + }, + () -> serviceFacade.verifyDeleteFunnel(id), + () -> { + // delete the specified funnel + final FunnelEntity entity = serviceFacade.deleteFunnel(revision, id); + return clusterContext(generateOkResponse(entity)).build(); + } + ); } // setters @@ -345,4 +348,8 @@ public class FunnelResource extends ApplicationResource { public void setProperties(NiFiProperties properties) { this.properties = properties; } + + public void setAuthorizer(Authorizer authorizer) { + this.authorizer = authorizer; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java index 27110ea..f842956 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java @@ -16,10 +16,25 @@ */ package org.apache.nifi.web.api; -import java.net.URI; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.RequestAction; +import org.apache.nifi.authorization.resource.Authorizable; +import org.apache.nifi.cluster.manager.impl.WebClusterManager; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.NiFiServiceFacade; +import org.apache.nifi.web.Revision; +import org.apache.nifi.web.UpdateResult; +import org.apache.nifi.web.api.dto.PortDTO; +import org.apache.nifi.web.api.entity.PortEntity; +import org.apache.nifi.web.api.request.ClientIdParameter; +import org.apache.nifi.web.api.request.LongParameter; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; @@ -35,24 +50,8 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; - -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.cluster.manager.impl.WebClusterManager; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.NiFiServiceFacade; -import org.apache.nifi.web.Revision; -import org.apache.nifi.web.UpdateResult; -import org.apache.nifi.web.api.dto.PortDTO; -import org.apache.nifi.web.api.entity.PortEntity; -import org.apache.nifi.web.api.request.ClientIdParameter; -import org.apache.nifi.web.api.request.LongParameter; - -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; +import java.net.URI; +import java.util.Set; /** * RESTful endpoint for managing an Input Port. @@ -67,6 +66,7 @@ public class InputPortResource extends ApplicationResource { private NiFiServiceFacade serviceFacade; private WebClusterManager clusterManager; private NiFiProperties properties; + private Authorizer authorizer; /** * Populates the uri for the specified input ports. @@ -150,13 +150,19 @@ public class InputPortResource extends ApplicationResource { value = "The input port id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable inputPort = lookup.getInputPort(id); + inputPort.authorize(authorizer, RequestAction.READ); + }); + // get the port final PortEntity entity = serviceFacade.getInputPort(id); populateRemainingInputPortEntityContent(entity); @@ -199,11 +205,11 @@ public class InputPortResource extends ApplicationResource { value = "The input port id.", required = true ) - @PathParam("id") String id, + @PathParam("id") final String id, @ApiParam( value = "The input port configuration details.", required = true - ) PortEntity portEntity) { + ) final PortEntity portEntity) { if (portEntity == null || portEntity.getComponent() == null) { throw new IllegalArgumentException("Input port details must be specified."); @@ -222,37 +228,34 @@ public class InputPortResource extends ApplicationResource { // replicate if cluster manager if (properties.isClusterManager()) { - // change content type to JSON for serializing entity - final Map<String, String> headersToOverride = new HashMap<>(); - headersToOverride.put("content-type", MediaType.APPLICATION_JSON); - - // replicate the request - return clusterManager.applyRequest(HttpMethod.PUT, getAbsolutePath(), portEntity, getHeaders(headersToOverride)).getResponse(); + return clusterManager.applyRequest(HttpMethod.PUT, getAbsolutePath(), portEntity, getHeaders()).getResponse(); } // handle expects request (usually from the cluster manager) final Revision revision = getRevision(portEntity, id); - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - serviceFacade.claimRevision(revision); - } - if (validationPhase) { - serviceFacade.verifyUpdateInputPort(requestPortDTO); - return generateContinueResponse().build(); - } + return withWriteLock( + serviceFacade, + revision, + lookup -> { + final Authorizable inputPort = lookup.getInputPort(id); + inputPort.authorize(authorizer, RequestAction.WRITE); + }, + () -> serviceFacade.verifyUpdateInputPort(requestPortDTO), + () -> { + // update the input port + final UpdateResult<PortEntity> updateResult = serviceFacade.updateInputPort(revision, requestPortDTO); - // update the input port - final UpdateResult<PortEntity> updateResult = serviceFacade.updateInputPort(revision, requestPortDTO); - - // build the response entity - final PortEntity entity = updateResult.getResult(); - populateRemainingInputPortEntityContent(entity); + // build the response entity + final PortEntity entity = updateResult.getResult(); + populateRemainingInputPortEntityContent(entity); - if (updateResult.isNew()) { - return clusterContext(generateCreatedResponse(URI.create(entity.getComponent().getUri()), entity)).build(); - } else { - return clusterContext(generateOkResponse(entity)).build(); - } + if (updateResult.isNew()) { + return clusterContext(generateCreatedResponse(URI.create(entity.getComponent().getUri()), entity)).build(); + } else { + return clusterContext(generateOkResponse(entity)).build(); + } + } + ); } /** @@ -291,17 +294,17 @@ public class InputPortResource extends ApplicationResource { value = "The revision is used to verify the client is working with the latest version of the flow.", required = false ) - @QueryParam(VERSION) LongParameter version, + @QueryParam(VERSION) final LongParameter version, @ApiParam( value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", required = false ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) final ClientIdParameter clientId, @ApiParam( value = "The input port id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { // replicate if cluster manager if (properties.isClusterManager()) { @@ -310,18 +313,20 @@ public class InputPortResource extends ApplicationResource { // handle expects request (usually from the cluster manager) final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - serviceFacade.claimRevision(revision); - } - if (validationPhase) { - serviceFacade.verifyDeleteInputPort(id); - return generateContinueResponse().build(); - } - - // delete the specified input port - final PortEntity entity = serviceFacade.deleteInputPort(revision, id); - return clusterContext(generateOkResponse(entity)).build(); + return withWriteLock( + serviceFacade, + revision, + lookup -> { + final Authorizable inputPort = lookup.getInputPort(id); + inputPort.authorize(authorizer, RequestAction.WRITE); + }, + () -> serviceFacade.verifyDeleteInputPort(id), + () -> { + // delete the specified input port + final PortEntity entity = serviceFacade.deleteInputPort(revision, id); + return clusterContext(generateOkResponse(entity)).build(); + } + ); } // setters @@ -336,4 +341,8 @@ public class InputPortResource extends ApplicationResource { public void setProperties(NiFiProperties properties) { this.properties = properties; } + + public void setAuthorizer(Authorizer authorizer) { + this.authorizer = authorizer; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java index 946d33d..d2cc3fa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java @@ -16,10 +16,27 @@ */ package org.apache.nifi.web.api; -import java.net.URI; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.RequestAction; +import org.apache.nifi.authorization.resource.Authorizable; +import org.apache.nifi.cluster.manager.impl.WebClusterManager; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.NiFiServiceFacade; +import org.apache.nifi.web.Revision; +import org.apache.nifi.web.UpdateResult; +import org.apache.nifi.web.api.dto.LabelDTO; +import org.apache.nifi.web.api.entity.LabelEntity; +import org.apache.nifi.web.api.request.ClientIdParameter; +import org.apache.nifi.web.api.request.LongParameter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; @@ -35,26 +52,8 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; - -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.cluster.manager.impl.WebClusterManager; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.NiFiServiceFacade; -import org.apache.nifi.web.Revision; -import org.apache.nifi.web.UpdateResult; -import org.apache.nifi.web.api.dto.LabelDTO; -import org.apache.nifi.web.api.entity.LabelEntity; -import org.apache.nifi.web.api.request.ClientIdParameter; -import org.apache.nifi.web.api.request.LongParameter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; +import java.net.URI; +import java.util.Set; /** * RESTful endpoint for managing a Label. @@ -71,6 +70,7 @@ public class LabelResource extends ApplicationResource { private NiFiServiceFacade serviceFacade; private WebClusterManager clusterManager; private NiFiProperties properties; + private Authorizer authorizer; /** * Populates the uri for the specified labels. @@ -154,13 +154,19 @@ public class LabelResource extends ApplicationResource { value = "The label id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable label = lookup.getLabel(id); + label.authorize(authorizer, RequestAction.READ); + }); + // get the label final LabelEntity entity = serviceFacade.getLabel(id); populateRemainingLabelEntityContent(entity); @@ -198,16 +204,16 @@ public class LabelResource extends ApplicationResource { } ) public Response updateLabel( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The label id.", required = true ) - @PathParam("id") String id, + @PathParam("id") final String id, @ApiParam( value = "The label configuraiton details.", required = true - ) LabelEntity labelEntity) { + ) final LabelEntity labelEntity) { if (labelEntity == null || labelEntity.getComponent() == null) { throw new IllegalArgumentException("Label details must be specified."); @@ -226,34 +232,32 @@ public class LabelResource extends ApplicationResource { // replicate if cluster manager if (properties.isClusterManager()) { - // change content type to JSON for serializing entity - final Map<String, String> headersToOverride = new HashMap<>(); - headersToOverride.put("content-type", MediaType.APPLICATION_JSON); - - // replicate the request - return clusterManager.applyRequest(HttpMethod.PUT, getAbsolutePath(), labelEntity, getHeaders(headersToOverride)).getResponse(); + return clusterManager.applyRequest(HttpMethod.PUT, getAbsolutePath(), labelEntity, getHeaders()).getResponse(); } // handle expects request (usually from the cluster manager) final Revision revision = getRevision(labelEntity, id); - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - serviceFacade.claimRevision(revision); - } - if (validationPhase) { - return generateContinueResponse().build(); - } + return withWriteLock( + serviceFacade, + revision, + lookup -> { + final Authorizable label = lookup.getLabel(id); + label.authorize(authorizer, RequestAction.WRITE); + }, + null, + () -> { + // update the label + final UpdateResult<LabelEntity> result = serviceFacade.updateLabel(revision, requestLabelDTO); + final LabelEntity entity = result.getResult(); + populateRemainingLabelEntityContent(entity); - // update the label - final UpdateResult<LabelEntity> result = serviceFacade.updateLabel(revision, requestLabelDTO); - final LabelEntity entity = result.getResult(); - populateRemainingLabelEntityContent(entity); - - if (result.isNew()) { - return clusterContext(generateCreatedResponse(URI.create(entity.getComponent().getUri()), entity)).build(); - } else { - return clusterContext(generateOkResponse(entity)).build(); - } + if (result.isNew()) { + return clusterContext(generateCreatedResponse(URI.create(entity.getComponent().getUri()), entity)).build(); + } else { + return clusterContext(generateOkResponse(entity)).build(); + } + } + ); } /** @@ -287,22 +291,22 @@ public class LabelResource extends ApplicationResource { } ) public Response removeLabel( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The revision is used to verify the client is working with the latest version of the flow.", required = false ) - @QueryParam(VERSION) LongParameter version, + @QueryParam(VERSION) final LongParameter version, @ApiParam( value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", required = false ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) final ClientIdParameter clientId, @ApiParam( value = "The label id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { // replicate if cluster manager if (properties.isClusterManager()) { @@ -311,17 +315,20 @@ public class LabelResource extends ApplicationResource { // handle expects request (usually from the cluster manager) final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - serviceFacade.claimRevision(revision); - } - if (validationPhase) { - return generateContinueResponse().build(); - } - - // delete the specified label - final LabelEntity entity = serviceFacade.deleteLabel(revision, id); - return clusterContext(generateOkResponse(entity)).build(); + return withWriteLock( + serviceFacade, + revision, + lookup -> { + final Authorizable label = lookup.getLabel(id); + label.authorize(authorizer, RequestAction.WRITE); + }, + null, + () -> { + // delete the specified label + final LabelEntity entity = serviceFacade.deleteLabel(revision, id); + return clusterContext(generateOkResponse(entity)).build(); + } + ); } // setters @@ -336,4 +343,8 @@ public class LabelResource extends ApplicationResource { public void setProperties(NiFiProperties properties) { this.properties = properties; } + + public void setAuthorizer(Authorizer authorizer) { + this.authorizer = authorizer; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java index eb18b79..892323b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java @@ -16,10 +16,27 @@ */ package org.apache.nifi.web.api; -import java.net.URI; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.RequestAction; +import org.apache.nifi.authorization.resource.Authorizable; +import org.apache.nifi.cluster.manager.impl.WebClusterManager; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.NiFiServiceFacade; +import org.apache.nifi.web.Revision; +import org.apache.nifi.web.UpdateResult; +import org.apache.nifi.web.api.dto.PortDTO; +import org.apache.nifi.web.api.entity.PortEntity; +import org.apache.nifi.web.api.request.ClientIdParameter; +import org.apache.nifi.web.api.request.LongParameter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; @@ -35,26 +52,8 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; - -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.cluster.manager.impl.WebClusterManager; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.NiFiServiceFacade; -import org.apache.nifi.web.Revision; -import org.apache.nifi.web.UpdateResult; -import org.apache.nifi.web.api.dto.PortDTO; -import org.apache.nifi.web.api.entity.PortEntity; -import org.apache.nifi.web.api.request.ClientIdParameter; -import org.apache.nifi.web.api.request.LongParameter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; +import java.net.URI; +import java.util.Set; /** * RESTful endpoint for managing an Output Port. @@ -71,6 +70,7 @@ public class OutputPortResource extends ApplicationResource { private NiFiServiceFacade serviceFacade; private WebClusterManager clusterManager; private NiFiProperties properties; + private Authorizer authorizer; /** * Populates the uri for the specified output ports. @@ -154,13 +154,19 @@ public class OutputPortResource extends ApplicationResource { value = "The output port id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable outputPort = lookup.getOutputPort(id); + outputPort.authorize(authorizer, RequestAction.READ); + }); + // get the port final PortEntity entity = serviceFacade.getOutputPort(id); populateRemainingOutputPortEntityContent(entity); @@ -198,16 +204,16 @@ public class OutputPortResource extends ApplicationResource { } ) public Response updateOutputPort( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The output port id.", required = true ) - @PathParam("id") String id, + @PathParam("id") final String id, @ApiParam( value = "The output port configuration details.", required = true - ) PortEntity portEntity) { + ) final PortEntity portEntity) { if (portEntity == null || portEntity.getComponent() == null) { throw new IllegalArgumentException("Output port details must be specified."); @@ -226,37 +232,34 @@ public class OutputPortResource extends ApplicationResource { // replicate if cluster manager if (properties.isClusterManager()) { - // change content type to JSON for serializing entity - final Map<String, String> headersToOverride = new HashMap<>(); - headersToOverride.put("content-type", MediaType.APPLICATION_JSON); - - // replicate the request - return clusterManager.applyRequest(HttpMethod.PUT, getAbsolutePath(), portEntity, getHeaders(headersToOverride)).getResponse(); + return clusterManager.applyRequest(HttpMethod.PUT, getAbsolutePath(), portEntity, getHeaders()).getResponse(); } // handle expects request (usually from the cluster manager) final Revision revision = getRevision(portEntity, id); - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - serviceFacade.claimRevision(revision); - } - if (validationPhase) { - serviceFacade.verifyUpdateOutputPort(requestPortDTO); - return generateContinueResponse().build(); - } - - // update the output port - final UpdateResult<PortEntity> updateResult = serviceFacade.updateOutputPort(revision, requestPortDTO); - - // get the results - final PortEntity entity = updateResult.getResult(); - populateRemainingOutputPortEntityContent(entity); - - if (updateResult.isNew()) { - return clusterContext(generateCreatedResponse(URI.create(entity.getComponent().getUri()), entity)).build(); - } else { - return clusterContext(generateOkResponse(entity)).build(); - } + return withWriteLock( + serviceFacade, + revision, + lookup -> { + final Authorizable outputPort = lookup.getOutputPort(id); + outputPort.authorize(authorizer, RequestAction.WRITE); + }, + () -> serviceFacade.verifyUpdateOutputPort(requestPortDTO), + () -> { + // update the output port + final UpdateResult<PortEntity> updateResult = serviceFacade.updateOutputPort(revision, requestPortDTO); + + // get the results + final PortEntity entity = updateResult.getResult(); + populateRemainingOutputPortEntityContent(entity); + + if (updateResult.isNew()) { + return clusterContext(generateCreatedResponse(URI.create(entity.getComponent().getUri()), entity)).build(); + } else { + return clusterContext(generateOkResponse(entity)).build(); + } + } + ); } /** @@ -290,22 +293,22 @@ public class OutputPortResource extends ApplicationResource { } ) public Response removeOutputPort( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, @ApiParam( value = "The revision is used to verify the client is working with the latest version of the flow.", required = false ) - @QueryParam(VERSION) LongParameter version, + @QueryParam(VERSION) final LongParameter version, @ApiParam( value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", required = false ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) final ClientIdParameter clientId, @ApiParam( value = "The output port id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { // replicate if cluster manager if (properties.isClusterManager()) { @@ -314,18 +317,20 @@ public class OutputPortResource extends ApplicationResource { // handle expects request (usually from the cluster manager) final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - serviceFacade.claimRevision(revision); - } - if (validationPhase) { - serviceFacade.verifyDeleteOutputPort(id); - return generateContinueResponse().build(); - } - - // delete the specified output port - final PortEntity entity = serviceFacade.deleteOutputPort(revision, id); - return clusterContext(generateOkResponse(entity)).build(); + return withWriteLock( + serviceFacade, + revision, + lookup -> { + final Authorizable outputPort = lookup.getOutputPort(id); + outputPort.authorize(authorizer, RequestAction.WRITE); + }, + () -> serviceFacade.verifyDeleteOutputPort(id), + () -> { + // delete the specified output port + final PortEntity entity = serviceFacade.deleteOutputPort(revision, id); + return clusterContext(generateOkResponse(entity)).build(); + } + ); } // setters @@ -340,4 +345,8 @@ public class OutputPortResource extends ApplicationResource { public void setProperties(NiFiProperties properties) { this.properties = properties; } + + public void setAuthorizer(Authorizer authorizer) { + this.authorizer = authorizer; + } }