Github user mcgilman commented on a diff in the pull request: https://github.com/apache/nifi/pull/2990#discussion_r217387691 --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java --- @@ -434,6 +436,197 @@ public Response updateRemoteProcessGroupOutputPort( ); } + /** + * Updates the specified remote process group input port run status. + * + * @param httpServletRequest request + * @param id The id of the remote process group to update. + * @param portId The id of the input port to update. + * @param requestRemotePortRunStatusEntity The remoteProcessGroupPortRunStatusEntity + * @return A remoteProcessGroupPortEntity + */ + @PUT + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @Path("{id}/input-ports/{port-id}/run-status") + @ApiOperation( + value = "Updates run status of a remote port", + notes = NON_GUARANTEED_ENDPOINT, + response = RemoteProcessGroupPortEntity.class, + authorizations = { + @Authorization(value = "Write - /remote-process-groups/{uuid} or /operation/remote-process-groups/{uuid}") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response updateRemoteProcessGroupInputPortRunStatus( + @Context final HttpServletRequest httpServletRequest, + @ApiParam( + value = "The remote process group id.", + required = true + ) + @PathParam("id") final String id, + @ApiParam( + value = "The remote process group port id.", + required = true + ) + @PathParam("port-id") final String portId, + @ApiParam( + value = "The remote process group port.", + required = true + ) final RemotePortRunStatusEntity requestRemotePortRunStatusEntity) { + + if (requestRemotePortRunStatusEntity == null) { + throw new IllegalArgumentException("Remote process group port run status must be specified."); + } + + if (requestRemotePortRunStatusEntity.getRevision() == null) { + throw new IllegalArgumentException("Revision must be specified."); + } + + requestRemotePortRunStatusEntity.validateState(); + + if (isReplicateRequest()) { + return replicate(HttpMethod.PUT, requestRemotePortRunStatusEntity); + } else if (isDisconnectedFromCluster()) { + verifyDisconnectedNodeModification(requestRemotePortRunStatusEntity.isDisconnectedNodeAcknowledged()); + } + + final Revision requestRevision = getRevision(requestRemotePortRunStatusEntity.getRevision(), id); + return withWriteLock( + serviceFacade, + requestRemotePortRunStatusEntity, + requestRevision, + lookup -> { + final Authorizable remoteProcessGroup = lookup.getRemoteProcessGroup(id); + OperationAuthorizable.isAuthorized(remoteProcessGroup, authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + }, + () -> serviceFacade.verifyUpdateRemoteProcessGroupInputPort(id, createPortDTOWithDesiredRunStatus(portId, id, requestRemotePortRunStatusEntity)), + (revision, remotePortRunStatusEntity) -> { + // update the specified remote process group + final RemoteProcessGroupPortEntity controllerResponse = serviceFacade.updateRemoteProcessGroupInputPort(revision, id, + createPortDTOWithDesiredRunStatus(portId, id, remotePortRunStatusEntity)); + + // get the updated revision + final RevisionDTO updatedRevision = controllerResponse.getRevision(); + + // build the response entity + final RemoteProcessGroupPortEntity entity = new RemoteProcessGroupPortEntity(); + entity.setRevision(updatedRevision); + entity.setRemoteProcessGroupPort(controllerResponse.getRemoteProcessGroupPort()); + + return generateOkResponse(entity).build(); + } + ); + } + + private RemoteProcessGroupPortDTO createPortDTOWithDesiredRunStatus(final String portId, final String groupId, final RemotePortRunStatusEntity entity) { + final RemoteProcessGroupPortDTO dto = new RemoteProcessGroupPortDTO(); + dto.setId(portId); + dto.setGroupId(groupId); + dto.setTransmitting(shouldTransmit(entity)); + return dto; + } + + /** + * Updates the specified remote process group output port run status. + * + * @param httpServletRequest request + * @param id The id of the remote process group to update. + * @param portId The id of the output port to update. + * @param requestRemotePortRunStatusEntity The remoteProcessGroupPortEntity + * @return A remoteProcessGroupPortEntity + */ + @PUT + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @Path("{id}/output-ports/{port-id}/run-status") + @ApiOperation( + value = "Updates run status of a remote port", + notes = NON_GUARANTEED_ENDPOINT, + response = RemoteProcessGroupPortEntity.class, + authorizations = { + @Authorization(value = "Write - /remote-process-groups/{uuid} or /operation/remote-process-groups/{uuid}") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response updateRemoteProcessGroupOutputPortRunStatus( + @Context HttpServletRequest httpServletRequest, + @ApiParam( + value = "The remote process group id.", + required = true + ) + @PathParam("id") String id, + @ApiParam( + value = "The remote process group port id.", + required = true + ) + @PathParam("port-id") String portId, + @ApiParam( + value = "The remote process group port.", + required = true + ) RemotePortRunStatusEntity requestRemotePortRunStatusEntity) { + + if (requestRemotePortRunStatusEntity == null) { + throw new IllegalArgumentException("Remote process group port run status must be specified."); + } + + if (requestRemotePortRunStatusEntity.getRevision() == null) { + throw new IllegalArgumentException("Revision must be specified."); + } + + requestRemotePortRunStatusEntity.validateState(); + + if (isReplicateRequest()) { + return replicate(HttpMethod.PUT, requestRemotePortRunStatusEntity); + } else if (isDisconnectedFromCluster()) { + verifyDisconnectedNodeModification(requestRemotePortRunStatusEntity.isDisconnectedNodeAcknowledged()); + } + + // handle expects request (usually from the cluster manager) + final Revision requestRevision = getRevision(requestRemotePortRunStatusEntity.getRevision(), id); + return withWriteLock( + serviceFacade, + requestRemotePortRunStatusEntity, + requestRevision, + lookup -> { + final Authorizable remoteProcessGroup = lookup.getRemoteProcessGroup(id); + OperationAuthorizable.isAuthorized(remoteProcessGroup, authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); --- End diff -- This should be calling `authorize` and not checking `isAuthorized`.
---