exceptionfactory commented on a change in pull request #4846:
URL: https://github.com/apache/nifi/pull/4846#discussion_r583786018



##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
##########
@@ -4173,6 +4187,205 @@ private void sanitizeRegistryInfo(final 
VersionedProcessGroup versionedProcessGr
         }
     }
 
+    /**
+     * Uploads the specified versioned flow definition and adds it to a new 
process group.
+     *
+     * @param httpServletRequest request
+     * @param in The flow definition stream
+     * @return A processGroupEntity
+     * @throws IOException if there is an error during deserialization of the 
InputStream
+     */
+    @POST
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/process-groups/upload")
+    @ApiOperation(
+            value = "Uploads a versioned flow definition and creates a process 
group",
+            response = ProcessGroupEntity.class,
+            authorizations = {
+                    @Authorization(value = "Write - /process-groups/{uuid}")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to 
complete the request because it was invalid. The request should not be retried 
without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not 
authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource 
could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid 
but NiFi was not in the appropriate state to process it. Retrying the same 
request later may be successful.")
+            }
+    )
+    public Response uploadProcessGroup(
+            @Context final HttpServletRequest httpServletRequest,
+            @ApiParam(
+                    value = "The process group id.",
+                    required = true
+            )
+            @PathParam("id") final String groupId,
+            @ApiParam(
+                    value = "The process group name.",
+                    required = true
+            )
+            @FormDataParam("groupName") final String groupName,
+            @ApiParam(
+                    value = "The process group X position.",
+                    required = true
+            )
+            @FormDataParam("position-x") final Double positionX,
+            @ApiParam(
+                    value = "The process group Y position.",
+                    required = true
+            )
+            @FormDataParam("position-y") final Double positionY,
+            @ApiParam(
+                    value = "The client id.",
+                    required = true
+            )
+            @FormDataParam("clientId") final String clientId,
+            @ApiParam(
+                    value = "Acknowledges that this node is disconnected to 
allow for mutable requests to proceed.",
+                    required = false
+            )
+            @FormDataParam(DISCONNECTED_NODE_ACKNOWLEDGED) 
@DefaultValue("false") final Boolean disconnectedNodeAcknowledged,
+            @FormDataParam("file") final InputStream in) throws IOException {
+
+        // ensure the group name is specified
+        if (StringUtils.isBlank(groupName)) {
+            throw new IllegalArgumentException("The process group name is 
required.");
+        }
+
+        if (StringUtils.isBlank(groupId)) {
+            throw new IllegalArgumentException("The parent process group id is 
required");
+        }
+
+        if (positionX == null) {
+            throw new IllegalArgumentException("The x coordinate of the 
proposed position must be specified.");
+        }
+
+        if (positionY == null) {
+            throw new IllegalArgumentException("The y coordinate of the 
proposed position must be specified.");
+        }
+
+        if (StringUtils.isBlank(clientId)) {
+            throw new IllegalArgumentException("The client id must be 
specified");
+        }
+
+        // get the contents of the InputStream as a String
+        String stringContent;
+        if (in != null) {
+            try {
+                stringContent = IOUtils.toString(in, StandardCharsets.UTF_8);
+            } catch (IOException e) {
+                throw new IOException("Unable to read the InputStream", e);
+            }
+        } else {
+            logger.warn("The InputStream is null");
+            throw new NullPointerException("The InputStream is null");
+        }
+
+        // deserialize content to a VersionedFlowSnapshot
+        VersionedFlowSnapshot deserializedSnapshot;
+
+        if (stringContent.length() > 0) {
+            try {
+                deserializedSnapshot = MAPPER.readValue(stringContent, 
VersionedFlowSnapshot.class);
+            } catch (JsonParseException jpe) {
+                logger.warn("Parsing uploaded JSON failed", jpe);
+                return 
Response.status(Response.Status.OK).entity(INVALID_JSON_RESPONSE).type("application/json").build();
+            } catch (IOException e) {
+                logger.warn("Deserialization of uploaded JSON failed", e);
+                throw new IOException("Deserialization of uploaded JSON 
failed", e);
+            }
+        } else {
+            logger.warn("The uploaded file was empty");
+            throw new IOException("The uploaded file was empty.");
+        }
+
+        // create a new ProcessGroupEntity
+        final ProcessGroupEntity newProcessGroupEntity = 
createProcessGroupEntity(groupId, groupName, positionX, positionY, 
deserializedSnapshot);
+
+        // replicate the request or call serviceFacade.updateProcessGroup
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.POST, newProcessGroupEntity);
+        } else if (isDisconnectedFromCluster()) {
+            
verifyDisconnectedNodeModification(newProcessGroupEntity.isDisconnectedNodeAcknowledged());
+        }
+
+        return withWriteLock(
+                serviceFacade,
+                newProcessGroupEntity,
+                lookup -> {
+                    final NiFiUser user = NiFiUserUtils.getNiFiUser();
+                    final Authorizable processGroup = 
lookup.getProcessGroup(groupId).getAuthorizable();
+                    processGroup.authorize(authorizer, RequestAction.WRITE, 
user);
+
+                    // if request specifies a Parameter Context, need to 
authorize that user has READ policy for the Parameter Context.
+                    final ParameterContextReferenceEntity 
referencedParamContext = 
newProcessGroupEntity.getComponent().getParameterContext();
+                    if (referencedParamContext != null && 
referencedParamContext.getId() != null) {
+                        
lookup.getParameterContext(referencedParamContext.getId()).authorize(authorizer,
 RequestAction.READ, user);
+                    }
+
+                    // if any of the components is a Restricted Component, 
then we must authorize the user
+                    // for write access to the RestrictedComponents resource
+                    final VersionedFlowSnapshot versionedFlowSnapshot = 
newProcessGroupEntity.getVersionedFlowSnapshot();
+                    if (versionedFlowSnapshot != null) {
+                        final Set<ConfigurableComponent> restrictedComponents 
= 
FlowRegistryUtils.getRestrictedComponents(versionedFlowSnapshot.getFlowContents(),
 serviceFacade);
+                        restrictedComponents.forEach(restrictedComponent -> {
+                            final ComponentAuthorizable 
restrictedComponentAuthorizable = 
lookup.getConfigurableComponent(restrictedComponent);
+                            authorizeRestrictions(authorizer, 
restrictedComponentAuthorizable);
+                        });
+
+                        final Map<String, VersionedParameterContext> 
parameterContexts = versionedFlowSnapshot.getParameterContexts();
+                        if (parameterContexts != null) {
+                            parameterContexts.values().forEach(context -> 
AuthorizeParameterReference.authorizeParameterContextAddition(context, 
serviceFacade, authorizer, lookup, user));
+                        }
+                    }
+                },
+                () -> {
+                    final VersionedFlowSnapshot versionedFlowSnapshot = 
newProcessGroupEntity.getVersionedFlowSnapshot();
+                    if (versionedFlowSnapshot != null) {
+                        
serviceFacade.verifyComponentTypes(versionedFlowSnapshot.getFlowContents());
+                    }
+                },
+                processGroupEntity -> {
+                    final ProcessGroupDTO processGroup = 
processGroupEntity.getComponent();
+
+                    // set the processor id as appropriate
+                    processGroup.setId(generateUuid());
+
+                    // get the versioned flow
+                    final VersionedFlowSnapshot flowSnapshot = 
processGroupEntity.getVersionedFlowSnapshot();
+
+                    // create the process group contents
+                    final Revision revision = new Revision((long) 0, clientId, 
processGroup.getId());
+
+                    ProcessGroupEntity entity = 
serviceFacade.createProcessGroup(revision, groupId, processGroup);
+
+                    if (flowSnapshot != null) {
+                        final RevisionDTO revisionDto = entity.getRevision();
+                        final String newGroupId = 
entity.getComponent().getId();
+                        final Revision newGroupRevision = new 
Revision(revisionDto.getVersion(), revisionDto.getClientId(), newGroupId);
+
+                        // We don't want the Process Group's position to be 
updated because we want to keep the position where the user
+                        // placed the Process Group. We do not want to use the 
name of the Process Group that is in the Flow Contents.
+                        // To accomplish this, we call 
updateProcessGroupContents() passing 'false' for the updateSettings flag, set
+                        // the Process Group name, and null out the position.
+                        flowSnapshot.getFlowContents().setPosition(null);
+                        flowSnapshot.getFlowContents().setName(groupName);
+
+                        entity = 
serviceFacade.updateProcessGroupContents(newGroupRevision, newGroupId, null, 
flowSnapshot,
+                                getIdGenerationSeed().orElse(null), false, 
false, true);
+                    }
+
+                    populateRemainingProcessGroupEntityContent(entity);
+
+                    // generate a 201 created response
+                    String uri = entity.getUri();
+                    return generateCreatedResponse(URI.create(uri), 
entity).build();
+                }
+        );

Review comment:
       This block of code looks very similar to the implementation in the 
`createProcessGroup` method.  Did you consider refactoring both methods to 
minimize duplication?

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
##########
@@ -4173,6 +4179,227 @@ private void sanitizeRegistryInfo(final 
VersionedProcessGroup versionedProcessGr
         }
     }
 
+    /**
+     * Uploads the specified versioned flow file and adds it to a new process 
group.
+     *
+     * @param httpServletRequest request
+     * @param in The flow file stream
+     * @return A processGroupEntity
+     * @throws IOException if there is an error during deserialization of the 
InputStream
+     */
+    @POST
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/process-groups/upload")
+    @ApiOperation(
+            value = "Uploads a versioned flow file and creates a process 
group",
+            response = ProcessGroupEntity.class,
+            authorizations = {
+                    @Authorization(value = "Write - /process-groups/{uuid}")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to 
complete the request because it was invalid. The request should not be retried 
without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not 
authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource 
could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid 
but NiFi was not in the appropriate state to process it. Retrying the same 
request later may be successful.")
+            }
+    )
+    public Response uploadProcessGroup(
+            @Context final HttpServletRequest httpServletRequest,
+            @ApiParam(
+                    value = "The process group id.",
+                    required = true
+            )
+            @PathParam("id") final String groupId,
+            @ApiParam(
+                    value = "The process group name.",
+                    required = true
+            )
+            @FormDataParam("groupName") final String groupName,
+            @ApiParam(
+                    value = "The process group X position.",
+                    required = true
+            )
+            @FormDataParam("position-x") final Double positionX,
+            @ApiParam(
+                    value = "The process group Y position.",
+                    required = true
+            )
+            @FormDataParam("position-y") final Double positionY,
+            @ApiParam(
+                    value = "The client id.",
+                    required = true
+            )
+            @FormDataParam("clientId") final String clientId,
+            @ApiParam(
+                    value = "Acknowledges that this node is disconnected to 
allow for mutable requests to proceed.",
+                    required = false
+            )
+            @FormDataParam(DISCONNECTED_NODE_ACKNOWLEDGED) 
@DefaultValue("false") final Boolean disconnectedNodeAcknowledged,
+            @FormDataParam("file") final InputStream in) throws IOException {
+
+        // ensure the group name is specified
+        if (StringUtils.isBlank(groupName)) {
+            throw new IllegalArgumentException("The process group name is 
required.");
+        }
+
+        if (StringUtils.isBlank(groupId)) {
+            throw new IllegalArgumentException("The parent process group id 
must be the same as specified in the URI.");
+        }
+
+        if (positionX == null) {
+            throw new IllegalArgumentException("The x coordinate of the 
proposed position must be specified.");
+        }
+
+        if (positionY == null) {
+            throw new IllegalArgumentException("The y coordinate of the 
proposed position must be specified.");
+        }
+
+        if (StringUtils.isBlank(clientId)) {
+            throw new IllegalArgumentException("The client id must be 
specified");
+        }
+
+        // create a new process group
+        final ProcessGroupEntity newProcessGroupEntity = new 
ProcessGroupEntity();
+
+        // get the contents of the InputStream as a String
+        String stringContent = null;
+        if (in != null) {

Review comment:
       Thanks for adding the `NullPointerException`.  Regarding the second 
point of the comment, it should be possible to remove the call to 
`IOUtils.toString()` and pass the `InputStream` directly to 
`ObjectMapper.readValue()`.

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
##########
@@ -4173,6 +4187,205 @@ private void sanitizeRegistryInfo(final 
VersionedProcessGroup versionedProcessGr
         }
     }
 
+    /**
+     * Uploads the specified versioned flow definition and adds it to a new 
process group.
+     *
+     * @param httpServletRequest request
+     * @param in The flow definition stream
+     * @return A processGroupEntity
+     * @throws IOException if there is an error during deserialization of the 
InputStream
+     */
+    @POST
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/process-groups/upload")
+    @ApiOperation(
+            value = "Uploads a versioned flow definition and creates a process 
group",
+            response = ProcessGroupEntity.class,
+            authorizations = {
+                    @Authorization(value = "Write - /process-groups/{uuid}")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to 
complete the request because it was invalid. The request should not be retried 
without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be 
authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not 
authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource 
could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid 
but NiFi was not in the appropriate state to process it. Retrying the same 
request later may be successful.")
+            }
+    )
+    public Response uploadProcessGroup(
+            @Context final HttpServletRequest httpServletRequest,
+            @ApiParam(
+                    value = "The process group id.",
+                    required = true
+            )
+            @PathParam("id") final String groupId,
+            @ApiParam(
+                    value = "The process group name.",
+                    required = true
+            )
+            @FormDataParam("groupName") final String groupName,
+            @ApiParam(
+                    value = "The process group X position.",
+                    required = true
+            )
+            @FormDataParam("position-x") final Double positionX,
+            @ApiParam(
+                    value = "The process group Y position.",
+                    required = true
+            )
+            @FormDataParam("position-y") final Double positionY,
+            @ApiParam(
+                    value = "The client id.",
+                    required = true
+            )
+            @FormDataParam("clientId") final String clientId,
+            @ApiParam(
+                    value = "Acknowledges that this node is disconnected to 
allow for mutable requests to proceed.",
+                    required = false
+            )
+            @FormDataParam(DISCONNECTED_NODE_ACKNOWLEDGED) 
@DefaultValue("false") final Boolean disconnectedNodeAcknowledged,
+            @FormDataParam("file") final InputStream in) throws IOException {
+
+        // ensure the group name is specified
+        if (StringUtils.isBlank(groupName)) {
+            throw new IllegalArgumentException("The process group name is 
required.");
+        }
+
+        if (StringUtils.isBlank(groupId)) {
+            throw new IllegalArgumentException("The parent process group id is 
required");
+        }
+
+        if (positionX == null) {
+            throw new IllegalArgumentException("The x coordinate of the 
proposed position must be specified.");
+        }
+
+        if (positionY == null) {
+            throw new IllegalArgumentException("The y coordinate of the 
proposed position must be specified.");
+        }
+
+        if (StringUtils.isBlank(clientId)) {
+            throw new IllegalArgumentException("The client id must be 
specified");
+        }
+
+        // get the contents of the InputStream as a String
+        String stringContent;
+        if (in != null) {
+            try {
+                stringContent = IOUtils.toString(in, StandardCharsets.UTF_8);
+            } catch (IOException e) {
+                throw new IOException("Unable to read the InputStream", e);
+            }
+        } else {
+            logger.warn("The InputStream is null");
+            throw new NullPointerException("The InputStream is null");
+        }
+
+        // deserialize content to a VersionedFlowSnapshot
+        VersionedFlowSnapshot deserializedSnapshot;
+
+        if (stringContent.length() > 0) {
+            try {
+                deserializedSnapshot = MAPPER.readValue(stringContent, 
VersionedFlowSnapshot.class);
+            } catch (JsonParseException jpe) {
+                logger.warn("Parsing uploaded JSON failed", jpe);
+                return 
Response.status(Response.Status.OK).entity(INVALID_JSON_RESPONSE).type("application/json").build();

Review comment:
       The call to `type()` indicates that the string provided to `entity()` 
should be formatted as JSON, which corresponds to the 
`@Produces(MediaType.APPLICATION_JSON)` annotation on this method.
   
   On closer review, it looks like the status should be 
`Response.Status.BAD_REQUEST` to return an HTTP 400 instead of 
`Response.Status.OK`; otherwise, wouldn't the JavaScript handling code call 
its success callback in this case?
   
   Stepping back for a moment, is there a reason for returning a response at 
this point instead of throwing an exception and having the framework return an 
appropriately formatted response?
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to