http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java
index f6dd081..0b8af7c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java
@@ -16,32 +16,16 @@
  */
 package org.apache.nifi.web.api;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URI;
-import java.util.HashSet;
-import java.util.Set;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import javax.ws.rs.core.StreamingOutput;
-
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.RequestAction;
+import org.apache.nifi.authorization.resource.Authorizable;
 import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
 import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
@@ -60,12 +44,29 @@ import org.apache.nifi.web.api.entity.FlowFileEntity;
 import org.apache.nifi.web.api.entity.ListingRequestEntity;
 import org.apache.nifi.web.api.request.ClientIdParameter;
 
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.StreamingOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * RESTful endpoint for managing a flowfile queue.
@@ -75,12 +76,12 @@ import com.wordnik.swagger.annotations.Authorization;
     value = "/flowfile-queues",
     description = "Endpoint for managing a FlowFile Queue."
 )
-// TODO: Need revisions of the Connections for these endpoints!
 public class FlowFileQueueResource extends ApplicationResource {
 
     private NiFiServiceFacade serviceFacade;
     private WebClusterManager clusterManager;
     private NiFiProperties properties;
+    private Authorizer authorizer;
 
     /**
      * Populate the URIs for the specified flowfile listing.
@@ -147,17 +148,17 @@ public class FlowFileQueueResource extends 
ApplicationResource {
                 value = "The connection id.",
                 required = true
             )
-            @PathParam("connection-id") String connectionId,
+            @PathParam("connection-id") final String connectionId,
             @ApiParam(
                 value = "The flowfile uuid.",
                 required = true
             )
-            @PathParam("flowfile-uuid") String flowFileUuid,
+            @PathParam("flowfile-uuid") final String flowFileUuid,
             @ApiParam(
                 value = "The id of the node where the content exists if 
clustered.",
                 required = false
             )
-            @QueryParam("clusterNodeId") String clusterNodeId) {
+            @QueryParam("clusterNodeId") final String clusterNodeId) {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
@@ -179,6 +180,12 @@ public class FlowFileQueueResource extends 
ApplicationResource {
             }
         }
 
+        // authorize access
+        serviceFacade.authorizeAccess(lookup -> {
+            final Authorizable connection = lookup.getConnection(connectionId);
+            connection.authorize(authorizer, RequestAction.WRITE);
+        });
+
         // get the flowfile
         final FlowFileDTO flowfileDto = 
serviceFacade.getFlowFile(connectionId, flowFileUuid);
         populateRemainingFlowFileContent(connectionId, flowfileDto);
@@ -224,22 +231,22 @@ public class FlowFileQueueResource extends 
ApplicationResource {
                 value = "If the client id is not specified, new one will be 
generated. This value (whether specified or generated) is included in the 
response.",
                 required = false
             )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
+            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) final 
ClientIdParameter clientId,
             @ApiParam(
                 value = "The connection id.",
                 required = true
             )
-            @PathParam("connection-id") String connectionId,
+            @PathParam("connection-id") final String connectionId,
             @ApiParam(
                 value = "The flowfile uuid.",
                 required = true
             )
-            @PathParam("flowfile-uuid") String flowFileUuid,
+            @PathParam("flowfile-uuid") final String flowFileUuid,
             @ApiParam(
                 value = "The id of the node where the content exists if 
clustered.",
                 required = false
             )
-            @QueryParam("clusterNodeId") String clusterNodeId) {
+            @QueryParam("clusterNodeId") final String clusterNodeId) {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
@@ -261,6 +268,12 @@ public class FlowFileQueueResource extends 
ApplicationResource {
             }
         }
 
+        // authorize access
+        serviceFacade.authorizeAccess(lookup -> {
+            final Authorizable connection = lookup.getConnection(connectionId);
+            connection.authorize(authorizer, RequestAction.WRITE);
+        });
+
         // get the uri of the request
         final String uri = generateResourceUri("flowfile-queues", 
connectionId, "flowfiles", flowFileUuid, "content");
 
@@ -320,18 +333,24 @@ public class FlowFileQueueResource extends 
ApplicationResource {
         }
     )
     public Response createFlowFileListing(
-            @Context HttpServletRequest httpServletRequest,
+            @Context final HttpServletRequest httpServletRequest,
             @ApiParam(
                 value = "The connection id.",
                 required = true
             )
-            @PathParam("connection-id") String id) {
+            @PathParam("connection-id") final String id) {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
             return clusterManager.applyRequest(HttpMethod.POST, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
+        // authorize access
+        serviceFacade.authorizeAccess(lookup -> {
+            final Authorizable connection = lookup.getConnection(id);
+            connection.authorize(authorizer, RequestAction.WRITE);
+        });
+
         // handle expects request (usually from the cluster manager)
         final String expects = 
httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
         if (expects != null) {
@@ -388,18 +407,24 @@ public class FlowFileQueueResource extends 
ApplicationResource {
                 value = "The connection id.",
                 required = true
             )
-            @PathParam("connection-id") String connectionId,
+            @PathParam("connection-id") final String connectionId,
             @ApiParam(
                 value = "The listing request id.",
                 required = true
             )
-            @PathParam("listing-request-id") String listingRequestId) {
+            @PathParam("listing-request-id") final String listingRequestId) {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
             return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
+        // authorize access
+        serviceFacade.authorizeAccess(lookup -> {
+            final Authorizable connection = lookup.getConnection(connectionId);
+            connection.authorize(authorizer, RequestAction.WRITE);
+        });
+
         // get the listing request
         final ListingRequestDTO listingRequest = 
serviceFacade.getFlowFileListingRequest(connectionId, listingRequestId);
         populateRemainingFlowFileListingContent(connectionId, listingRequest);
@@ -441,17 +466,17 @@ public class FlowFileQueueResource extends 
ApplicationResource {
         }
     )
     public Response deleteListingRequest(
-            @Context HttpServletRequest httpServletRequest,
+            @Context final HttpServletRequest httpServletRequest,
             @ApiParam(
                 value = "The connection id.",
                 required = true
             )
-            @PathParam("connection-id") String connectionId,
+            @PathParam("connection-id") final String connectionId,
             @ApiParam(
                 value = "The listing request id.",
                 required = true
             )
-            @PathParam("listing-request-id") String listingRequestId) {
+            @PathParam("listing-request-id") final String listingRequestId) {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
@@ -464,6 +489,12 @@ public class FlowFileQueueResource extends 
ApplicationResource {
             return generateContinueResponse().build();
         }
 
+        // authorize access
+        serviceFacade.authorizeAccess(lookup -> {
+            final Authorizable connection = lookup.getConnection(connectionId);
+            connection.authorize(authorizer, RequestAction.WRITE);
+        });
+
         // delete the listing request
         final ListingRequestDTO listingRequest = 
serviceFacade.deleteFlowFileListingRequest(connectionId, listingRequestId);
 
@@ -510,18 +541,24 @@ public class FlowFileQueueResource extends 
ApplicationResource {
         }
     )
     public Response createDropRequest(
-        @Context HttpServletRequest httpServletRequest,
+        @Context final HttpServletRequest httpServletRequest,
         @ApiParam(
             value = "The connection id.",
             required = true
         )
-        @PathParam("connection-id") String id) {
+        @PathParam("connection-id") final String id) {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
             return clusterManager.applyRequest(HttpMethod.POST, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
+        // authorize access
+        serviceFacade.authorizeAccess(lookup -> {
+            final Authorizable connection = lookup.getConnection(id);
+            connection.authorize(authorizer, RequestAction.WRITE);
+        });
+
         // handle expects request (usually from the cluster manager)
         final String expects = 
httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
         if (expects != null) {
@@ -577,18 +614,24 @@ public class FlowFileQueueResource extends 
ApplicationResource {
                     value = "The connection id.",
                     required = true
             )
-            @PathParam("connection-id") String connectionId,
+            @PathParam("connection-id") final String connectionId,
             @ApiParam(
                     value = "The drop request id.",
                     required = true
             )
-            @PathParam("drop-request-id") String dropRequestId) {
+            @PathParam("drop-request-id") final String dropRequestId) {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
             return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
+        // authorize access
+        serviceFacade.authorizeAccess(lookup -> {
+            final Authorizable connection = lookup.getConnection(connectionId);
+            connection.authorize(authorizer, RequestAction.WRITE);
+        });
+
         // get the drop request
         final DropRequestDTO dropRequest = 
serviceFacade.getFlowFileDropRequest(connectionId, dropRequestId);
         dropRequest.setUri(generateResourceUri("flowfile-queues", 
connectionId, "drop-requests", dropRequestId));
@@ -630,23 +673,29 @@ public class FlowFileQueueResource extends 
ApplicationResource {
             }
     )
     public Response removeDropRequest(
-            @Context HttpServletRequest httpServletRequest,
+            @Context final HttpServletRequest httpServletRequest,
             @ApiParam(
                     value = "The connection id.",
                     required = true
             )
-            @PathParam("connection-id") String connectionId,
+            @PathParam("connection-id") final String connectionId,
             @ApiParam(
                     value = "The drop request id.",
                     required = true
             )
-            @PathParam("drop-request-id") String dropRequestId) {
+            @PathParam("drop-request-id") final String dropRequestId) {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
             return clusterManager.applyRequest(HttpMethod.DELETE, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
+        // authorize access
+        serviceFacade.authorizeAccess(lookup -> {
+            final Authorizable connection = lookup.getConnection(connectionId);
+            connection.authorize(authorizer, RequestAction.WRITE);
+        });
+
         // handle expects request (usually from the cluster manager)
         final String expects = 
httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
         if (expects != null) {
@@ -676,4 +725,8 @@ public class FlowFileQueueResource extends 
ApplicationResource {
     public void setProperties(NiFiProperties properties) {
         this.properties = properties;
     }
+
+    public void setAuthorizer(Authorizer authorizer) {
+        this.authorizer = authorizer;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
index 7e882a8..e02dac5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
@@ -16,22 +16,13 @@
  */
 package org.apache.nifi.web.api;
 
-import java.util.HashSet;
-import java.util.Set;
-
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
+import com.sun.jersey.api.core.ResourceContext;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.AccessDeniedException;
 import org.apache.nifi.authorization.AuthorizationRequest;
@@ -39,6 +30,7 @@ import org.apache.nifi.authorization.AuthorizationResult;
 import org.apache.nifi.authorization.AuthorizationResult.Result;
 import org.apache.nifi.authorization.Authorizer;
 import org.apache.nifi.authorization.RequestAction;
+import org.apache.nifi.authorization.resource.Authorizable;
 import org.apache.nifi.authorization.resource.ResourceFactory;
 import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.authorization.user.NiFiUserUtils;
@@ -47,9 +39,11 @@ import 
org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
 import org.apache.nifi.cluster.node.Node;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.web.ConfigurationSnapshot;
 import org.apache.nifi.web.NiFiServiceFacade;
+import org.apache.nifi.web.Revision;
 import org.apache.nifi.web.api.dto.AboutDTO;
 import org.apache.nifi.web.api.dto.BannerDTO;
 import org.apache.nifi.web.api.dto.BulletinBoardDTO;
@@ -88,6 +82,7 @@ import org.apache.nifi.web.api.entity.ProcessorStatusEntity;
 import org.apache.nifi.web.api.entity.ProcessorTypesEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusEntity;
 import org.apache.nifi.web.api.entity.ReportingTaskTypesEntity;
+import org.apache.nifi.web.api.entity.ScheduleComponentsEntity;
 import org.apache.nifi.web.api.entity.SearchResultsEntity;
 import org.apache.nifi.web.api.entity.StatusHistoryEntity;
 import org.apache.nifi.web.api.request.BulletinBoardPatternParameter;
@@ -95,13 +90,26 @@ import org.apache.nifi.web.api.request.ClientIdParameter;
 import org.apache.nifi.web.api.request.IntegerParameter;
 import org.apache.nifi.web.api.request.LongParameter;
 
-import com.sun.jersey.api.core.ResourceContext;
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * RESTful endpoint for managing a Flow.
@@ -254,20 +262,11 @@ public class FlowResource extends ApplicationResource {
             return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
-        // get this process group contents
-        final ConfigurationSnapshot<ProcessGroupFlowDTO> controllerResponse = 
serviceFacade.getProcessGroupFlow(groupId, recursive);
-        final ProcessGroupFlowDTO flow = controllerResponse.getConfiguration();
-
-        // create the revision
-        final RevisionDTO revision = new RevisionDTO();
-        revision.setClientId(clientId.getClientId());
-        revision.setVersion(controllerResponse.getVersion());
-
-        // create the response entity
-        final ProcessGroupFlowEntity processGroupEntity = new 
ProcessGroupFlowEntity();
-        
processGroupEntity.setProcessGroupFlow(populateRemainingFlowContent(flow));
+        // get this process group flow
+        final ProcessGroupFlowEntity entity = 
serviceFacade.getProcessGroupFlow(groupId, recursive);
+        populateRemainingFlowContent(entity.getProcessGroupFlow());
 
-        return clusterContext(generateOkResponse(processGroupEntity)).build();
+        return clusterContext(generateOkResponse(entity)).build();
     }
 
     /**
@@ -321,6 +320,142 @@ public class FlowResource extends ApplicationResource {
         return clusterContext(generateOkResponse(entity)).build();
     }
 
+    /**
+     * Updates the specified process group.
+     *
+     * @param httpServletRequest request
+     * @param id The id of the process group.
+     * @param scheduleComponentsEntity A scheduleComponentsEntity.
+     * @return A processGroupEntity.
+     */
+    @PUT
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("process-groups/{id}")
+    // TODO - @PreAuthorize("hasRole('ROLE_DFM')")
+    @ApiOperation(
+        value = "Updates a process group",
+        response = ScheduleComponentsEntity.class,
+        authorizations = {
+            @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
+        }
+    )
+    @ApiResponses(
+        value = {
+            @ApiResponse(code = 400, message = "NiFi was unable to complete 
the request because it was invalid. The request should not be retried without 
modification."),
+            @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
+            @ApiResponse(code = 403, message = "Client is not authorized to 
make this request."),
+            @ApiResponse(code = 404, message = "The specified resource could 
not be found."),
+            @ApiResponse(code = 409, message = "The request was valid but NiFi 
was not in the appropriate state to process it. Retrying the same request later 
may be successful.")
+        }
+    )
+    public Response scheduleComponents(
+        @Context HttpServletRequest httpServletRequest,
+        @ApiParam(
+            value = "The process group id.",
+            required = true
+        )
+        @PathParam("id") String id,
+        ScheduleComponentsEntity scheduleComponentsEntity) {
+
+        authorizeFlow();
+
+        // ensure the same id is being used
+        if (!id.equals(scheduleComponentsEntity.getId())) {
+            throw new IllegalArgumentException(String.format("The process 
group id (%s) in the request body does "
+                + "not equal the process group id of the requested resource 
(%s).", scheduleComponentsEntity.getId(), id));
+        }
+
+        final ScheduledState state;
+        if (scheduleComponentsEntity.getState() == null) {
+            throw new IllegalArgumentException("The scheduled state must be 
specified.");
+        } else {
+            try {
+                state = 
ScheduledState.valueOf(scheduleComponentsEntity.getState());
+            } catch (final IllegalArgumentException iae) {
+                throw new IllegalArgumentException(String.format("The 
scheduled must be one of [%s].", 
StringUtils.join(EnumSet.of(ScheduledState.RUNNING, ScheduledState.STOPPED), ", 
")));
+            }
+        }
+
+        // ensure its a supported scheduled state
+        if (ScheduledState.DISABLED.equals(state) || 
ScheduledState.STARTING.equals(state) || ScheduledState.STOPPING.equals(state)) 
{
+            throw new IllegalArgumentException(String.format("The scheduled 
must be one of [%s].", StringUtils.join(EnumSet.of(ScheduledState.RUNNING, 
ScheduledState.STOPPED), ", ")));
+        }
+
+        // if the components are not specified, gather all components and 
their current revision
+        if (scheduleComponentsEntity.getComponents() == null) {
+            // TODO - this will break while clustered until nodes are able to 
process/replicate requests
+            // get the current revisions for the components being updated
+            final Set<Revision> revisions = 
serviceFacade.getRevisionsFromGroup(id, group -> {
+                final Set<String> componentIds = new HashSet<>();
+
+                // ensure authorized for each processor we will attempt to 
schedule
+                group.findAllProcessors().stream()
+                    .filter(ScheduledState.RUNNING.equals(state) ? 
ProcessGroup.SCHEDULABLE_PROCESSORS : ProcessGroup.UNSCHEDULABLE_PROCESSORS)
+                    .filter(processor -> processor.isAuthorized(authorizer, 
RequestAction.WRITE))
+                    .forEach(processor -> {
+                        componentIds.add(processor.getIdentifier());
+                    });
+
+                // ensure authorized for each input port we will attempt to 
schedule
+                group.findAllInputPorts().stream()
+                    .filter(ScheduledState.RUNNING.equals(state) ? 
ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS)
+                    .filter(inputPort -> inputPort.isAuthorized(authorizer, 
RequestAction.WRITE))
+                    .forEach(inputPort -> {
+                        componentIds.add(inputPort.getIdentifier());
+                    });
+
+                // ensure authorized for each output port we will attempt to 
schedule
+                group.findAllOutputPorts().stream()
+                    .filter(ScheduledState.RUNNING.equals(state) ? 
ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS)
+                    .filter(outputPort -> outputPort.isAuthorized(authorizer, 
RequestAction.WRITE))
+                    .forEach(outputPort -> {
+                        componentIds.add(outputPort.getIdentifier());
+                    });
+
+                return componentIds;
+            });
+
+            // build the component mapping
+            final Map<String, RevisionDTO> componentsToSchedule = new 
HashMap<>();
+            revisions.forEach(revision -> {
+                final RevisionDTO dto = new RevisionDTO();
+                dto.setClientId(revision.getClientId());
+                dto.setVersion(revision.getVersion());
+                componentsToSchedule.put(revision.getComponentId(), dto);
+            });
+
+            // set the components and their current revision
+            scheduleComponentsEntity.setComponents(componentsToSchedule);
+        }
+
+        if (properties.isClusterManager()) {
+            return clusterManager.applyRequest(HttpMethod.PUT, 
getAbsolutePath(), scheduleComponentsEntity, getHeaders()).getResponse();
+        }
+
+        final Map<String, RevisionDTO> componentsToSchedule = 
scheduleComponentsEntity.getComponents();
+        final Map<String, Revision> componentRevisions = 
componentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
 e -> getRevision(e.getValue(), e.getKey())));
+        final Set<Revision> revisions = new 
HashSet<>(componentRevisions.values());
+
+        return withWriteLock(
+            serviceFacade,
+            revisions,
+            lookup -> {
+                // ensure access to every component being scheduled
+                componentsToSchedule.keySet().forEach(componentId -> {
+                    final Authorizable connectable = 
lookup.getConnectable(componentId);
+                    connectable.authorize(authorizer, RequestAction.WRITE);
+                });
+            },
+            () -> serviceFacade.verifyScheduleComponents(id, state, 
componentRevisions.keySet()),
+            () -> {
+                // update the process group
+                final ScheduleComponentsEntity entity = 
serviceFacade.scheduleComponents(id, state, componentRevisions);
+                return clusterContext(generateOkResponse(entity)).build();
+            }
+        );
+    }
+
     @GET
     @Consumes(MediaType.WILDCARD)
     @Produces(MediaType.TEXT_PLAIN)

http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java
index 56ffa80..e2a51cb 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java
@@ -16,10 +16,27 @@
  */
 package org.apache.nifi.web.api;
 
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.RequestAction;
+import org.apache.nifi.authorization.resource.Authorizable;
+import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.NiFiServiceFacade;
+import org.apache.nifi.web.Revision;
+import org.apache.nifi.web.UpdateResult;
+import org.apache.nifi.web.api.dto.FunnelDTO;
+import org.apache.nifi.web.api.entity.FunnelEntity;
+import org.apache.nifi.web.api.request.ClientIdParameter;
+import org.apache.nifi.web.api.request.LongParameter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
@@ -35,26 +52,8 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.cluster.manager.impl.WebClusterManager;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.web.NiFiServiceFacade;
-import org.apache.nifi.web.Revision;
-import org.apache.nifi.web.UpdateResult;
-import org.apache.nifi.web.api.dto.FunnelDTO;
-import org.apache.nifi.web.api.entity.FunnelEntity;
-import org.apache.nifi.web.api.request.ClientIdParameter;
-import org.apache.nifi.web.api.request.LongParameter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
+import java.net.URI;
+import java.util.Set;
 
 /**
  * RESTful endpoint for managing a Funnel.
@@ -71,6 +70,7 @@ public class FunnelResource extends ApplicationResource {
     private NiFiServiceFacade serviceFacade;
     private WebClusterManager clusterManager;
     private NiFiProperties properties;
+    private Authorizer authorizer;
 
     /**
      * Populates the uri for the specified funnels.
@@ -154,13 +154,19 @@ public class FunnelResource extends ApplicationResource {
                     value = "The funnel id.",
                     required = true
             )
-            @PathParam("id") String id) {
+            @PathParam("id") final String id) {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
             return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
+        // authorize access
+        serviceFacade.authorizeAccess(lookup -> {
+            final Authorizable funnel = lookup.getFunnel(id);
+            funnel.authorize(authorizer, RequestAction.READ);
+        });
+
         // get the funnel
         final FunnelEntity entity = serviceFacade.getFunnel(id);
         populateRemainingFunnelEntityContent(entity);
@@ -198,16 +204,16 @@ public class FunnelResource extends ApplicationResource {
             }
     )
     public Response updateFunnel(
-            @Context HttpServletRequest httpServletRequest,
+            @Context final HttpServletRequest httpServletRequest,
             @ApiParam(
                     value = "The funnel id.",
                     required = true
             )
-            @PathParam("id") String id,
+            @PathParam("id") final String id,
             @ApiParam(
                     value = "The funnel configuration details.",
                     required = true
-            ) FunnelEntity funnelEntity) {
+            ) final FunnelEntity funnelEntity) {
 
         if (funnelEntity == null || funnelEntity.getComponent() == null) {
             throw new IllegalArgumentException("Funnel details must be 
specified.");
@@ -226,39 +232,34 @@ public class FunnelResource extends ApplicationResource {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
-            // change content type to JSON for serializing entity
-            final Map<String, String> headersToOverride = new HashMap<>();
-            headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
-
-            // replicate the request
-            return clusterManager.applyRequest(HttpMethod.PUT, 
getAbsolutePath(), funnelEntity, getHeaders(headersToOverride)).getResponse();
+            return clusterManager.applyRequest(HttpMethod.PUT, 
getAbsolutePath(), funnelEntity, getHeaders()).getResponse();
         }
 
         // Extract the revision
         final Revision revision = getRevision(funnelEntity, id);
-
-        // handle expects request (usually from the cluster manager)
-        final boolean validationPhase = isValidationPhase(httpServletRequest);
-        if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
-            serviceFacade.claimRevision(revision);
-        }
-        if (validationPhase) {
-            serviceFacade.claimRevision(revision);
-            return generateContinueResponse().build();
-        }
-
-        // update the funnel
-        final UpdateResult<FunnelEntity> updateResult = 
serviceFacade.updateFunnel(revision, requestFunnelDTO);
-
-        // get the results
-        final FunnelEntity entity = updateResult.getResult();
-        populateRemainingFunnelEntityContent(entity);
-
-        if (updateResult.isNew()) {
-            return 
clusterContext(generateCreatedResponse(URI.create(entity.getComponent().getUri()),
 entity)).build();
-        } else {
-            return clusterContext(generateOkResponse(entity)).build();
-        }
+        return withWriteLock(
+            serviceFacade,
+            revision,
+            lookup -> {
+                final Authorizable funnel = lookup.getFunnel(id);
+                funnel.authorize(authorizer, RequestAction.WRITE);
+            },
+            null,
+            () -> {
+                // update the funnel
+                final UpdateResult<FunnelEntity> updateResult = 
serviceFacade.updateFunnel(revision, requestFunnelDTO);
+
+                // get the results
+                final FunnelEntity entity = updateResult.getResult();
+                populateRemainingFunnelEntityContent(entity);
+
+                if (updateResult.isNew()) {
+                    return 
clusterContext(generateCreatedResponse(URI.create(entity.getComponent().getUri()),
 entity)).build();
+                } else {
+                    return clusterContext(generateOkResponse(entity)).build();
+                }
+            }
+        );
     }
 
     /**
@@ -295,22 +296,22 @@ public class FunnelResource extends ApplicationResource {
             }
     )
     public Response removeFunnel(
-            @Context HttpServletRequest httpServletRequest,
+            @Context final HttpServletRequest httpServletRequest,
             @ApiParam(
                     value = "The revision is used to verify the client is 
working with the latest version of the flow.",
                     required = false
             )
-            @QueryParam(VERSION) LongParameter version,
+            @QueryParam(VERSION) final LongParameter version,
             @ApiParam(
                     value = "If the client id is not specified, new one will 
be generated. This value (whether specified or generated) is included in the 
response.",
                     required = false
             )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
+            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) final 
ClientIdParameter clientId,
             @ApiParam(
                     value = "The funnel id.",
                     required = true
             )
-            @PathParam("id") String id) {
+            @PathParam("id") final String id) {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
@@ -319,18 +320,20 @@ public class FunnelResource extends ApplicationResource {
 
         // handle expects request (usually from the cluster manager)
         final Revision revision = new Revision(version == null ? null : 
version.getLong(), clientId.getClientId(), id);
-        final boolean validationPhase = isValidationPhase(httpServletRequest);
-        if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
-            serviceFacade.claimRevision(revision);
-        }
-        if (validationPhase) {
-            serviceFacade.verifyDeleteFunnel(id);
-            return generateContinueResponse().build();
-        }
-
-        // delete the specified funnel
-        final FunnelEntity entity = serviceFacade.deleteFunnel(revision, id);
-        return clusterContext(generateOkResponse(entity)).build();
+        return withWriteLock(
+            serviceFacade,
+            revision,
+            lookup -> {
+                final Authorizable funnel = lookup.getFunnel(id);
+                funnel.authorize(authorizer, RequestAction.READ);
+            },
+            () -> serviceFacade.verifyDeleteFunnel(id),
+            () -> {
+                // delete the specified funnel
+                final FunnelEntity entity = 
serviceFacade.deleteFunnel(revision, id);
+                return clusterContext(generateOkResponse(entity)).build();
+            }
+        );
     }
 
     // setters
@@ -345,4 +348,8 @@ public class FunnelResource extends ApplicationResource {
     public void setProperties(NiFiProperties properties) {
         this.properties = properties;
     }
+
+    public void setAuthorizer(Authorizer authorizer) {
+        this.authorizer = authorizer;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java
index 27110ea..f842956 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java
@@ -16,10 +16,25 @@
  */
 package org.apache.nifi.web.api;
 
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.RequestAction;
+import org.apache.nifi.authorization.resource.Authorizable;
+import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.NiFiServiceFacade;
+import org.apache.nifi.web.Revision;
+import org.apache.nifi.web.UpdateResult;
+import org.apache.nifi.web.api.dto.PortDTO;
+import org.apache.nifi.web.api.entity.PortEntity;
+import org.apache.nifi.web.api.request.ClientIdParameter;
+import org.apache.nifi.web.api.request.LongParameter;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
@@ -35,24 +50,8 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.cluster.manager.impl.WebClusterManager;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.web.NiFiServiceFacade;
-import org.apache.nifi.web.Revision;
-import org.apache.nifi.web.UpdateResult;
-import org.apache.nifi.web.api.dto.PortDTO;
-import org.apache.nifi.web.api.entity.PortEntity;
-import org.apache.nifi.web.api.request.ClientIdParameter;
-import org.apache.nifi.web.api.request.LongParameter;
-
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
+import java.net.URI;
+import java.util.Set;
 
 /**
  * RESTful endpoint for managing an Input Port.
@@ -67,6 +66,7 @@ public class InputPortResource extends ApplicationResource {
     private NiFiServiceFacade serviceFacade;
     private WebClusterManager clusterManager;
     private NiFiProperties properties;
+    private Authorizer authorizer;
 
     /**
      * Populates the uri for the specified input ports.
@@ -150,13 +150,19 @@ public class InputPortResource extends 
ApplicationResource {
                     value = "The input port id.",
                     required = true
             )
-            @PathParam("id") String id) {
+            @PathParam("id") final String id) {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
             return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
+        // authorize access
+        serviceFacade.authorizeAccess(lookup -> {
+            final Authorizable inputPort = lookup.getInputPort(id);
+            inputPort.authorize(authorizer, RequestAction.READ);
+        });
+
         // get the port
         final PortEntity entity = serviceFacade.getInputPort(id);
         populateRemainingInputPortEntityContent(entity);
@@ -199,11 +205,11 @@ public class InputPortResource extends 
ApplicationResource {
                     value = "The input port id.",
                     required = true
             )
-            @PathParam("id") String id,
+            @PathParam("id") final String id,
             @ApiParam(
                     value = "The input port configuration details.",
                     required = true
-            ) PortEntity portEntity) {
+            ) final PortEntity portEntity) {
 
         if (portEntity == null || portEntity.getComponent() == null) {
             throw new IllegalArgumentException("Input port details must be 
specified.");
@@ -222,37 +228,34 @@ public class InputPortResource extends 
ApplicationResource {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
-            // change content type to JSON for serializing entity
-            final Map<String, String> headersToOverride = new HashMap<>();
-            headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
-
-            // replicate the request
-            return clusterManager.applyRequest(HttpMethod.PUT, 
getAbsolutePath(), portEntity, getHeaders(headersToOverride)).getResponse();
+            return clusterManager.applyRequest(HttpMethod.PUT, 
getAbsolutePath(), portEntity, getHeaders()).getResponse();
         }
 
         // handle expects request (usually from the cluster manager)
         final Revision revision = getRevision(portEntity, id);
-        final boolean validationPhase = isValidationPhase(httpServletRequest);
-        if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
-            serviceFacade.claimRevision(revision);
-        }
-        if (validationPhase) {
-            serviceFacade.verifyUpdateInputPort(requestPortDTO);
-            return generateContinueResponse().build();
-        }
+        return withWriteLock(
+            serviceFacade,
+            revision,
+            lookup -> {
+                final Authorizable inputPort = lookup.getInputPort(id);
+                inputPort.authorize(authorizer, RequestAction.WRITE);
+            },
+            () -> serviceFacade.verifyUpdateInputPort(requestPortDTO),
+            () -> {
+                // update the input port
+                final UpdateResult<PortEntity> updateResult = 
serviceFacade.updateInputPort(revision, requestPortDTO);
 
-        // update the input port
-        final UpdateResult<PortEntity> updateResult = 
serviceFacade.updateInputPort(revision, requestPortDTO);
-
-        // build the response entity
-        final PortEntity entity = updateResult.getResult();
-        populateRemainingInputPortEntityContent(entity);
+                // build the response entity
+                final PortEntity entity = updateResult.getResult();
+                populateRemainingInputPortEntityContent(entity);
 
-        if (updateResult.isNew()) {
-            return 
clusterContext(generateCreatedResponse(URI.create(entity.getComponent().getUri()),
 entity)).build();
-        } else {
-            return clusterContext(generateOkResponse(entity)).build();
-        }
+                if (updateResult.isNew()) {
+                    return 
clusterContext(generateCreatedResponse(URI.create(entity.getComponent().getUri()),
 entity)).build();
+                } else {
+                    return clusterContext(generateOkResponse(entity)).build();
+                }
+            }
+        );
     }
 
     /**
@@ -291,17 +294,17 @@ public class InputPortResource extends 
ApplicationResource {
                     value = "The revision is used to verify the client is 
working with the latest version of the flow.",
                     required = false
             )
-            @QueryParam(VERSION) LongParameter version,
+            @QueryParam(VERSION) final LongParameter version,
             @ApiParam(
                     value = "If the client id is not specified, new one will 
be generated. This value (whether specified or generated) is included in the 
response.",
                     required = false
             )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
+            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) final 
ClientIdParameter clientId,
             @ApiParam(
                     value = "The input port id.",
                     required = true
             )
-            @PathParam("id") String id) {
+            @PathParam("id") final String id) {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
@@ -310,18 +313,20 @@ public class InputPortResource extends 
ApplicationResource {
 
         // handle expects request (usually from the cluster manager)
         final Revision revision = new Revision(version == null ? null : 
version.getLong(), clientId.getClientId(), id);
-        final boolean validationPhase = isValidationPhase(httpServletRequest);
-        if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
-            serviceFacade.claimRevision(revision);
-        }
-        if (validationPhase) {
-            serviceFacade.verifyDeleteInputPort(id);
-            return generateContinueResponse().build();
-        }
-
-        // delete the specified input port
-        final PortEntity entity = serviceFacade.deleteInputPort(revision, id);
-        return clusterContext(generateOkResponse(entity)).build();
+        return withWriteLock(
+            serviceFacade,
+            revision,
+            lookup -> {
+                final Authorizable inputPort = lookup.getInputPort(id);
+                inputPort.authorize(authorizer, RequestAction.WRITE);
+            },
+            () -> serviceFacade.verifyDeleteInputPort(id),
+            () -> {
+                // delete the specified input port
+                final PortEntity entity = 
serviceFacade.deleteInputPort(revision, id);
+                return clusterContext(generateOkResponse(entity)).build();
+            }
+        );
     }
 
     // setters
@@ -336,4 +341,8 @@ public class InputPortResource extends ApplicationResource {
     public void setProperties(NiFiProperties properties) {
         this.properties = properties;
     }
+
+    public void setAuthorizer(Authorizer authorizer) {
+        this.authorizer = authorizer;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java
index 946d33d..d2cc3fa 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java
@@ -16,10 +16,27 @@
  */
 package org.apache.nifi.web.api;
 
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.RequestAction;
+import org.apache.nifi.authorization.resource.Authorizable;
+import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.NiFiServiceFacade;
+import org.apache.nifi.web.Revision;
+import org.apache.nifi.web.UpdateResult;
+import org.apache.nifi.web.api.dto.LabelDTO;
+import org.apache.nifi.web.api.entity.LabelEntity;
+import org.apache.nifi.web.api.request.ClientIdParameter;
+import org.apache.nifi.web.api.request.LongParameter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
@@ -35,26 +52,8 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.cluster.manager.impl.WebClusterManager;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.web.NiFiServiceFacade;
-import org.apache.nifi.web.Revision;
-import org.apache.nifi.web.UpdateResult;
-import org.apache.nifi.web.api.dto.LabelDTO;
-import org.apache.nifi.web.api.entity.LabelEntity;
-import org.apache.nifi.web.api.request.ClientIdParameter;
-import org.apache.nifi.web.api.request.LongParameter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
+import java.net.URI;
+import java.util.Set;
 
 /**
  * RESTful endpoint for managing a Label.
@@ -71,6 +70,7 @@ public class LabelResource extends ApplicationResource {
     private NiFiServiceFacade serviceFacade;
     private WebClusterManager clusterManager;
     private NiFiProperties properties;
+    private Authorizer authorizer;
 
     /**
      * Populates the uri for the specified labels.
@@ -154,13 +154,19 @@ public class LabelResource extends ApplicationResource {
                     value = "The label id.",
                     required = true
             )
-            @PathParam("id") String id) {
+            @PathParam("id") final String id) {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
             return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
+        // authorize access
+        serviceFacade.authorizeAccess(lookup -> {
+            final Authorizable label = lookup.getLabel(id);
+            label.authorize(authorizer, RequestAction.READ);
+        });
+
         // get the label
         final LabelEntity entity = serviceFacade.getLabel(id);
         populateRemainingLabelEntityContent(entity);
@@ -198,16 +204,16 @@ public class LabelResource extends ApplicationResource {
             }
     )
     public Response updateLabel(
-            @Context HttpServletRequest httpServletRequest,
+            @Context final HttpServletRequest httpServletRequest,
             @ApiParam(
                     value = "The label id.",
                     required = true
             )
-            @PathParam("id") String id,
+            @PathParam("id") final String id,
             @ApiParam(
                     value = "The label configuraiton details.",
                     required = true
-            ) LabelEntity labelEntity) {
+            ) final LabelEntity labelEntity) {
 
         if (labelEntity == null || labelEntity.getComponent() == null) {
             throw new IllegalArgumentException("Label details must be 
specified.");
@@ -226,34 +232,32 @@ public class LabelResource extends ApplicationResource {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
-            // change content type to JSON for serializing entity
-            final Map<String, String> headersToOverride = new HashMap<>();
-            headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
-
-            // replicate the request
-            return clusterManager.applyRequest(HttpMethod.PUT, 
getAbsolutePath(), labelEntity, getHeaders(headersToOverride)).getResponse();
+            return clusterManager.applyRequest(HttpMethod.PUT, 
getAbsolutePath(), labelEntity, getHeaders()).getResponse();
         }
 
         // handle expects request (usually from the cluster manager)
         final Revision revision = getRevision(labelEntity, id);
-        final boolean validationPhase = isValidationPhase(httpServletRequest);
-        if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
-            serviceFacade.claimRevision(revision);
-        }
-        if (validationPhase) {
-            return generateContinueResponse().build();
-        }
+        return withWriteLock(
+            serviceFacade,
+            revision,
+            lookup -> {
+                final Authorizable label = lookup.getLabel(id);
+                label.authorize(authorizer, RequestAction.WRITE);
+            },
+            null,
+            () -> {
+                // update the label
+                final UpdateResult<LabelEntity> result = 
serviceFacade.updateLabel(revision, requestLabelDTO);
+                final LabelEntity entity = result.getResult();
+                populateRemainingLabelEntityContent(entity);
 
-        // update the label
-        final UpdateResult<LabelEntity> result = 
serviceFacade.updateLabel(revision, requestLabelDTO);
-        final LabelEntity entity = result.getResult();
-        populateRemainingLabelEntityContent(entity);
-
-        if (result.isNew()) {
-            return 
clusterContext(generateCreatedResponse(URI.create(entity.getComponent().getUri()),
 entity)).build();
-        } else {
-            return clusterContext(generateOkResponse(entity)).build();
-        }
+                if (result.isNew()) {
+                    return 
clusterContext(generateCreatedResponse(URI.create(entity.getComponent().getUri()),
 entity)).build();
+                } else {
+                    return clusterContext(generateOkResponse(entity)).build();
+                }
+            }
+        );
     }
 
     /**
@@ -287,22 +291,22 @@ public class LabelResource extends ApplicationResource {
             }
     )
     public Response removeLabel(
-            @Context HttpServletRequest httpServletRequest,
+            @Context final HttpServletRequest httpServletRequest,
             @ApiParam(
                     value = "The revision is used to verify the client is 
working with the latest version of the flow.",
                     required = false
             )
-            @QueryParam(VERSION) LongParameter version,
+            @QueryParam(VERSION) final LongParameter version,
             @ApiParam(
                     value = "If the client id is not specified, new one will 
be generated. This value (whether specified or generated) is included in the 
response.",
                     required = false
             )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
+            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) final 
ClientIdParameter clientId,
             @ApiParam(
                     value = "The label id.",
                     required = true
             )
-            @PathParam("id") String id) {
+            @PathParam("id") final String id) {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
@@ -311,17 +315,20 @@ public class LabelResource extends ApplicationResource {
 
         // handle expects request (usually from the cluster manager)
         final Revision revision = new Revision(version == null ? null : 
version.getLong(), clientId.getClientId(), id);
-        final boolean validationPhase = isValidationPhase(httpServletRequest);
-        if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
-            serviceFacade.claimRevision(revision);
-        }
-        if (validationPhase) {
-            return generateContinueResponse().build();
-        }
-
-        // delete the specified label
-        final LabelEntity entity = serviceFacade.deleteLabel(revision, id);
-        return clusterContext(generateOkResponse(entity)).build();
+        return withWriteLock(
+            serviceFacade,
+            revision,
+            lookup -> {
+                final Authorizable label = lookup.getLabel(id);
+                label.authorize(authorizer, RequestAction.WRITE);
+            },
+            null,
+            () -> {
+                // delete the specified label
+                final LabelEntity entity = serviceFacade.deleteLabel(revision, 
id);
+                return clusterContext(generateOkResponse(entity)).build();
+            }
+        );
     }
 
     // setters
@@ -336,4 +343,8 @@ public class LabelResource extends ApplicationResource {
     public void setProperties(NiFiProperties properties) {
         this.properties = properties;
     }
+
+    public void setAuthorizer(Authorizer authorizer) {
+        this.authorizer = authorizer;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java
index eb18b79..892323b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java
@@ -16,10 +16,27 @@
  */
 package org.apache.nifi.web.api;
 
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.RequestAction;
+import org.apache.nifi.authorization.resource.Authorizable;
+import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.NiFiServiceFacade;
+import org.apache.nifi.web.Revision;
+import org.apache.nifi.web.UpdateResult;
+import org.apache.nifi.web.api.dto.PortDTO;
+import org.apache.nifi.web.api.entity.PortEntity;
+import org.apache.nifi.web.api.request.ClientIdParameter;
+import org.apache.nifi.web.api.request.LongParameter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
@@ -35,26 +52,8 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.cluster.manager.impl.WebClusterManager;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.web.NiFiServiceFacade;
-import org.apache.nifi.web.Revision;
-import org.apache.nifi.web.UpdateResult;
-import org.apache.nifi.web.api.dto.PortDTO;
-import org.apache.nifi.web.api.entity.PortEntity;
-import org.apache.nifi.web.api.request.ClientIdParameter;
-import org.apache.nifi.web.api.request.LongParameter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
+import java.net.URI;
+import java.util.Set;
 
 /**
  * RESTful endpoint for managing an Output Port.
@@ -71,6 +70,7 @@ public class OutputPortResource extends ApplicationResource {
     private NiFiServiceFacade serviceFacade;
     private WebClusterManager clusterManager;
     private NiFiProperties properties;
+    private Authorizer authorizer;
 
     /**
      * Populates the uri for the specified output ports.
@@ -154,13 +154,19 @@ public class OutputPortResource extends 
ApplicationResource {
                     value = "The output port id.",
                     required = true
             )
-            @PathParam("id") String id) {
+            @PathParam("id") final String id) {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
             return clusterManager.applyRequest(HttpMethod.GET, 
getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
+        // authorize access
+        serviceFacade.authorizeAccess(lookup -> {
+            final Authorizable outputPort = lookup.getOutputPort(id);
+            outputPort.authorize(authorizer, RequestAction.READ);
+        });
+
         // get the port
         final PortEntity entity = serviceFacade.getOutputPort(id);
         populateRemainingOutputPortEntityContent(entity);
@@ -198,16 +204,16 @@ public class OutputPortResource extends 
ApplicationResource {
             }
     )
     public Response updateOutputPort(
-            @Context HttpServletRequest httpServletRequest,
+            @Context final HttpServletRequest httpServletRequest,
             @ApiParam(
                     value = "The output port id.",
                     required = true
             )
-            @PathParam("id") String id,
+            @PathParam("id") final String id,
             @ApiParam(
                     value = "The output port configuration details.",
                     required = true
-            ) PortEntity portEntity) {
+            ) final PortEntity portEntity) {
 
         if (portEntity == null || portEntity.getComponent() == null) {
             throw new IllegalArgumentException("Output port details must be 
specified.");
@@ -226,37 +232,34 @@ public class OutputPortResource extends 
ApplicationResource {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
-            // change content type to JSON for serializing entity
-            final Map<String, String> headersToOverride = new HashMap<>();
-            headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
-
-            // replicate the request
-            return clusterManager.applyRequest(HttpMethod.PUT, 
getAbsolutePath(), portEntity, getHeaders(headersToOverride)).getResponse();
+            return clusterManager.applyRequest(HttpMethod.PUT, 
getAbsolutePath(), portEntity, getHeaders()).getResponse();
         }
 
         // handle expects request (usually from the cluster manager)
         final Revision revision = getRevision(portEntity, id);
-        final boolean validationPhase = isValidationPhase(httpServletRequest);
-        if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
-            serviceFacade.claimRevision(revision);
-        }
-        if (validationPhase) {
-            serviceFacade.verifyUpdateOutputPort(requestPortDTO);
-            return generateContinueResponse().build();
-        }
-
-        // update the output port
-        final UpdateResult<PortEntity> updateResult = 
serviceFacade.updateOutputPort(revision, requestPortDTO);
-
-        // get the results
-        final PortEntity entity = updateResult.getResult();
-        populateRemainingOutputPortEntityContent(entity);
-
-        if (updateResult.isNew()) {
-            return 
clusterContext(generateCreatedResponse(URI.create(entity.getComponent().getUri()),
 entity)).build();
-        } else {
-            return clusterContext(generateOkResponse(entity)).build();
-        }
+        return withWriteLock(
+            serviceFacade,
+            revision,
+            lookup -> {
+                final Authorizable outputPort = lookup.getOutputPort(id);
+                outputPort.authorize(authorizer, RequestAction.WRITE);
+            },
+            () -> serviceFacade.verifyUpdateOutputPort(requestPortDTO),
+            () -> {
+                // update the output port
+                final UpdateResult<PortEntity> updateResult = 
serviceFacade.updateOutputPort(revision, requestPortDTO);
+
+                // get the results
+                final PortEntity entity = updateResult.getResult();
+                populateRemainingOutputPortEntityContent(entity);
+
+                if (updateResult.isNew()) {
+                    return 
clusterContext(generateCreatedResponse(URI.create(entity.getComponent().getUri()),
 entity)).build();
+                } else {
+                    return clusterContext(generateOkResponse(entity)).build();
+                }
+            }
+        );
     }
 
     /**
@@ -290,22 +293,22 @@ public class OutputPortResource extends 
ApplicationResource {
             }
     )
     public Response removeOutputPort(
-            @Context HttpServletRequest httpServletRequest,
+            @Context final HttpServletRequest httpServletRequest,
             @ApiParam(
                     value = "The revision is used to verify the client is 
working with the latest version of the flow.",
                     required = false
             )
-            @QueryParam(VERSION) LongParameter version,
+            @QueryParam(VERSION) final LongParameter version,
             @ApiParam(
                     value = "If the client id is not specified, new one will 
be generated. This value (whether specified or generated) is included in the 
response.",
                     required = false
             )
-            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) 
ClientIdParameter clientId,
+            @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) final 
ClientIdParameter clientId,
             @ApiParam(
                     value = "The output port id.",
                     required = true
             )
-            @PathParam("id") String id) {
+            @PathParam("id") final String id) {
 
         // replicate if cluster manager
         if (properties.isClusterManager()) {
@@ -314,18 +317,20 @@ public class OutputPortResource extends 
ApplicationResource {
 
         // handle expects request (usually from the cluster manager)
         final Revision revision = new Revision(version == null ? null : 
version.getLong(), clientId.getClientId(), id);
-        final boolean validationPhase = isValidationPhase(httpServletRequest);
-        if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
-            serviceFacade.claimRevision(revision);
-        }
-        if (validationPhase) {
-            serviceFacade.verifyDeleteOutputPort(id);
-            return generateContinueResponse().build();
-        }
-
-        // delete the specified output port
-        final PortEntity entity = serviceFacade.deleteOutputPort(revision, id);
-        return clusterContext(generateOkResponse(entity)).build();
+        return withWriteLock(
+            serviceFacade,
+            revision,
+            lookup -> {
+                final Authorizable outputPort = lookup.getOutputPort(id);
+                outputPort.authorize(authorizer, RequestAction.WRITE);
+            },
+            () -> serviceFacade.verifyDeleteOutputPort(id),
+            () -> {
+                // delete the specified output port
+                final PortEntity entity = 
serviceFacade.deleteOutputPort(revision, id);
+                return clusterContext(generateOkResponse(entity)).build();
+            }
+        );
     }
 
     // setters
@@ -340,4 +345,8 @@ public class OutputPortResource extends ApplicationResource 
{
     public void setProperties(NiFiProperties properties) {
         this.properties = properties;
     }
+
+    public void setAuthorizer(Authorizer authorizer) {
+        this.authorizer = authorizer;
+    }
 }

Reply via email to