exceptionfactory commented on code in PR #7191:
URL: https://github.com/apache/nifi/pull/7191#discussion_r1307954380
##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java:
##########
@@ -3039,6 +3098,86 @@ public Response getTemplates() {
return generateOkResponse(entity).build();
}
+ // -------------
+ // flow-analysis
+ // -------------
+
+ /**
+ * Returns flow analysis results produced by the analysis of a given
process group.
+ *
+ * @return a flowAnalysisResultEntity containing flow analysis results
produced by the analysis of the given process group
+ */
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("flow-analysis/result/{processGroupId}")
+ @ApiOperation(
+ value = "Returns flow analysis results produced by the analysis of a
given process group",
+ response = FlowAnalysisResultEntity.class,
+ authorizations = {
+ @Authorization(value = "Read - /controller")
+ })
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the
request because it was invalid. The request should not be retried without
modification."),
+ @ApiResponse(code = 401, message = "Client could not be
authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make
this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not
be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was
not in the appropriate state to process it. Retrying the same request later may
be successful.")
+ })
+ public Response getFlowAnalysisResults(
+ @Context final HttpServletRequest httpServletRequest,
+ @ApiParam(
+ value = "The id of the process group representing (a part of) the
flow to be analyzed.",
+ required = true
+ )
+ @PathParam("processGroupId")
+ final String processGroupId
+ ) {
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.GET);
+ }
+
+ authorizeFlow();
+
+ FlowAnalysisResultEntity entity =
serviceFacade.getFlowAnalysisResult(processGroupId);
+
+ return generateOkResponse(entity).build();
+ }
+
+ /**
+ * Returns all flow analysis results currently in effect.
+ *
+ * @return a flowAnalysisRuleEntity containing all flow analysis results
currently in-effect
+ */
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("flow-analysis/result")
Review Comment:
It seems like this path should be pluralized to `/results`:
```suggestion
@Path("flow-analysis/results")
```
##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/flowanalysis/StandardFlowAnalyzer.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowanalysis;
+
+import org.apache.nifi.controller.FlowAnalysisRuleNode;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.flowanalysis.FlowAnalysisRuleProvider;
+import org.apache.nifi.controller.flowanalysis.FlowAnalysisUtil;
+import org.apache.nifi.controller.flowanalysis.FlowAnalyzer;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
+import org.apache.nifi.validation.RuleViolation;
+import org.apache.nifi.validation.RuleViolationsManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * {@link FlowAnalyzer} that uses {@link
org.apache.nifi.flowanalysis.FlowAnalysisRule FlowAnalysisRules}.
+ */
+public class StandardFlowAnalyzer implements FlowAnalyzer {
+ private Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private final RuleViolationsManager ruleViolationsManager;
+
+ private final FlowAnalysisRuleProvider flowAnalysisRuleProvider;
+ private final ExtensionManager extensionManager;
+
+ private ControllerServiceProvider controllerServiceProvider;
+
+ public StandardFlowAnalyzer(
+ final RuleViolationsManager ruleViolationsManager,
+ final FlowAnalysisRuleProvider flowAnalysisRuleProvider,
+ final ExtensionManager extensionManager
+ ) {
+ this.ruleViolationsManager = ruleViolationsManager;
+ this.flowAnalysisRuleProvider = flowAnalysisRuleProvider;
+ this.extensionManager = extensionManager;
+ }
+
+ public void initialize(final ControllerServiceProvider
controllerServiceProvider) {
+ this.controllerServiceProvider = controllerServiceProvider;
+ }
+
+ @Override
+ public void analyzeProcessor(ProcessorNode processorNode) {
+ logger.debug("Running analysis on {}", processorNode);
+
+ final NiFiRegistryFlowMapper mapper = createMapper();
+
+ VersionedProcessor versionedProcessor = mapper.mapProcessor(
+ processorNode,
+ controllerServiceProvider,
+ Collections.emptySet(),
+ new HashMap<>()
+ );
+
+ analyzeComponent(versionedProcessor);
+ }
+
+ @Override
+ public void analyzeControllerService(ControllerServiceNode
controllerServiceNode) {
+ logger.debug("Running analysis on {}", controllerServiceNode);
+
+ final NiFiRegistryFlowMapper mapper = createMapper();
+
+ VersionedControllerService versionedControllerService =
mapper.mapControllerService(
+ controllerServiceNode,
+ controllerServiceProvider,
+ Collections.emptySet(),
+ new HashMap<>()
+ );
+
+ analyzeComponent(versionedControllerService);
+ }
+
+ private void analyzeComponent(VersionedComponent component) {
+ Instant start = Instant.now();
Review Comment:
It would be more efficient to use `System.currentTimeMillis()` for these
calculations as it avoids the object creation of `Instant`. Although fairly
insignificant on its own, it could add up to more work when analyzing a flow
with tens of thousands of components.
##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/flowanalysis/StandardFlowAnalyzer.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowanalysis;
+
+import org.apache.nifi.controller.FlowAnalysisRuleNode;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.flowanalysis.FlowAnalysisRuleProvider;
+import org.apache.nifi.controller.flowanalysis.FlowAnalysisUtil;
+import org.apache.nifi.controller.flowanalysis.FlowAnalyzer;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
+import org.apache.nifi.validation.RuleViolation;
+import org.apache.nifi.validation.RuleViolationsManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * {@link FlowAnalyzer} that uses {@link
org.apache.nifi.flowanalysis.FlowAnalysisRule FlowAnalysisRules}.
+ */
+public class StandardFlowAnalyzer implements FlowAnalyzer {
+ private Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private final RuleViolationsManager ruleViolationsManager;
+
+ private final FlowAnalysisRuleProvider flowAnalysisRuleProvider;
+ private final ExtensionManager extensionManager;
+
+ private ControllerServiceProvider controllerServiceProvider;
+
+ public StandardFlowAnalyzer(
+ final RuleViolationsManager ruleViolationsManager,
+ final FlowAnalysisRuleProvider flowAnalysisRuleProvider,
+ final ExtensionManager extensionManager
+ ) {
+ this.ruleViolationsManager = ruleViolationsManager;
+ this.flowAnalysisRuleProvider = flowAnalysisRuleProvider;
+ this.extensionManager = extensionManager;
+ }
+
+ public void initialize(final ControllerServiceProvider
controllerServiceProvider) {
+ this.controllerServiceProvider = controllerServiceProvider;
+ }
+
+ @Override
+ public void analyzeProcessor(ProcessorNode processorNode) {
+ logger.debug("Running analysis on {}", processorNode);
+
+ final NiFiRegistryFlowMapper mapper = createMapper();
+
+ VersionedProcessor versionedProcessor = mapper.mapProcessor(
+ processorNode,
+ controllerServiceProvider,
+ Collections.emptySet(),
+ new HashMap<>()
+ );
+
+ analyzeComponent(versionedProcessor);
+ }
+
+ @Override
+ public void analyzeControllerService(ControllerServiceNode
controllerServiceNode) {
+ logger.debug("Running analysis on {}", controllerServiceNode);
+
+ final NiFiRegistryFlowMapper mapper = createMapper();
+
+ VersionedControllerService versionedControllerService =
mapper.mapControllerService(
+ controllerServiceNode,
+ controllerServiceProvider,
+ Collections.emptySet(),
+ new HashMap<>()
+ );
+
+ analyzeComponent(versionedControllerService);
+ }
+
+ private void analyzeComponent(VersionedComponent component) {
+ Instant start = Instant.now();
+
+ String componentId = component.getIdentifier();
+ Set<FlowAnalysisRuleNode> flowAnalysisRules =
flowAnalysisRuleProvider.getAllFlowAnalysisRules();
+
+ Set<RuleViolation> violations = flowAnalysisRules.stream()
+ .filter(FlowAnalysisRuleNode::isEnabled)
+ .flatMap(flowAnalysisRuleNode -> {
+ String ruleId = flowAnalysisRuleNode.getIdentifier();
+
+ try {
+ Collection<ComponentAnalysisResult> analysisResults =
flowAnalysisRuleNode
+ .getFlowAnalysisRule()
+ .analyzeComponent(component,
flowAnalysisRuleNode.getFlowAnalysisRuleContext());
+
+ return analysisResults.stream()
+ .map(analysisResult -> new RuleViolation(
+
flowAnalysisRuleNode.getEnforcementPolicy(),
+ componentId,
+ componentId,
+ getDisplayName(component),
+ component.getGroupIdentifier(),
+ ruleId,
+ analysisResult.getIssueId(),
+ analysisResult.getMessage(),
+ analysisResult.getExplanation()
+ ));
+ } catch (Exception e) {
+ logger.error("FlowAnalysis error while running '{}'
against '{}'", flowAnalysisRuleNode.getName(), component, e);
+ return Stream.empty();
+ }
+ })
+ .collect(Collectors.toSet());
+
+ ruleViolationsManager.upsertComponentViolations(componentId,
violations);
+
+ Instant end = Instant.now();
+
+ long durationMs = Duration.between(start, end).toMillis();
+
+ logger.debug("Flow Analysis took {} ms", durationMs);
+ }
+
+ @Override
+ public void analyzeProcessGroup(VersionedProcessGroup processGroup) {
+ logger.debug("Running analysis on process group {}.",
processGroup.getIdentifier());
+
+ Instant start = Instant.now();
+
+ Set<FlowAnalysisRuleNode> flowAnalysisRules =
flowAnalysisRuleProvider.getAllFlowAnalysisRules();
+
+ Collection<RuleViolation> groupViolations = new HashSet<>();
+ Map<VersionedComponent, Collection<RuleViolation>>
componentToRuleViolations = new HashMap<>();
+
+ analyzeProcessGroup(processGroup, flowAnalysisRules, groupViolations,
componentToRuleViolations);
+
+ ruleViolationsManager.upsertGroupViolations(processGroup,
groupViolations, componentToRuleViolations);
+
+ Instant end = Instant.now();
+
+ long durationMs = Duration.between(start, end).toMillis();
+
+ logger.debug("Flow Analysis took {} ms", durationMs);
Review Comment:
This log should indicate the process group being analyzed.
##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/flowanalysis/StandardFlowAnalyzer.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowanalysis;
+
+import org.apache.nifi.controller.FlowAnalysisRuleNode;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.flowanalysis.FlowAnalysisRuleProvider;
+import org.apache.nifi.controller.flowanalysis.FlowAnalysisUtil;
+import org.apache.nifi.controller.flowanalysis.FlowAnalyzer;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
+import org.apache.nifi.validation.RuleViolation;
+import org.apache.nifi.validation.RuleViolationsManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * {@link FlowAnalyzer} that uses {@link
org.apache.nifi.flowanalysis.FlowAnalysisRule FlowAnalysisRules}.
+ */
+public class StandardFlowAnalyzer implements FlowAnalyzer {
+ private Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private final RuleViolationsManager ruleViolationsManager;
+
+ private final FlowAnalysisRuleProvider flowAnalysisRuleProvider;
+ private final ExtensionManager extensionManager;
+
+ private ControllerServiceProvider controllerServiceProvider;
+
+ public StandardFlowAnalyzer(
+ final RuleViolationsManager ruleViolationsManager,
+ final FlowAnalysisRuleProvider flowAnalysisRuleProvider,
+ final ExtensionManager extensionManager
+ ) {
+ this.ruleViolationsManager = ruleViolationsManager;
+ this.flowAnalysisRuleProvider = flowAnalysisRuleProvider;
+ this.extensionManager = extensionManager;
+ }
+
+ public void initialize(final ControllerServiceProvider
controllerServiceProvider) {
+ this.controllerServiceProvider = controllerServiceProvider;
+ }
+
+ @Override
+ public void analyzeProcessor(ProcessorNode processorNode) {
+ logger.debug("Running analysis on {}", processorNode);
+
+ final NiFiRegistryFlowMapper mapper = createMapper();
+
+ VersionedProcessor versionedProcessor = mapper.mapProcessor(
+ processorNode,
+ controllerServiceProvider,
+ Collections.emptySet(),
+ new HashMap<>()
+ );
+
+ analyzeComponent(versionedProcessor);
+ }
+
+ @Override
+ public void analyzeControllerService(ControllerServiceNode
controllerServiceNode) {
+ logger.debug("Running analysis on {}", controllerServiceNode);
+
+ final NiFiRegistryFlowMapper mapper = createMapper();
+
+ VersionedControllerService versionedControllerService =
mapper.mapControllerService(
+ controllerServiceNode,
+ controllerServiceProvider,
+ Collections.emptySet(),
+ new HashMap<>()
+ );
+
+ analyzeComponent(versionedControllerService);
+ }
+
+ private void analyzeComponent(VersionedComponent component) {
+ Instant start = Instant.now();
+
+ String componentId = component.getIdentifier();
+ Set<FlowAnalysisRuleNode> flowAnalysisRules =
flowAnalysisRuleProvider.getAllFlowAnalysisRules();
+
+ Set<RuleViolation> violations = flowAnalysisRules.stream()
+ .filter(FlowAnalysisRuleNode::isEnabled)
+ .flatMap(flowAnalysisRuleNode -> {
+ String ruleId = flowAnalysisRuleNode.getIdentifier();
+
+ try {
+ Collection<ComponentAnalysisResult> analysisResults =
flowAnalysisRuleNode
+ .getFlowAnalysisRule()
+ .analyzeComponent(component,
flowAnalysisRuleNode.getFlowAnalysisRuleContext());
+
+ return analysisResults.stream()
+ .map(analysisResult -> new RuleViolation(
+
flowAnalysisRuleNode.getEnforcementPolicy(),
+ componentId,
+ componentId,
+ getDisplayName(component),
+ component.getGroupIdentifier(),
+ ruleId,
+ analysisResult.getIssueId(),
+ analysisResult.getMessage(),
+ analysisResult.getExplanation()
+ ));
+ } catch (Exception e) {
+ logger.error("FlowAnalysis error while running '{}'
against '{}'", flowAnalysisRuleNode.getName(), component, e);
+ return Stream.empty();
+ }
+ })
+ .collect(Collectors.toSet());
+
+ ruleViolationsManager.upsertComponentViolations(componentId,
violations);
+
+ Instant end = Instant.now();
+
+ long durationMs = Duration.between(start, end).toMillis();
+
+ logger.debug("Flow Analysis took {} ms", durationMs);
Review Comment:
This log should indicate the component being analyzed.
##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java:
##########
@@ -3039,6 +3098,86 @@ public Response getTemplates() {
return generateOkResponse(entity).build();
}
+ // -------------
+ // flow-analysis
+ // -------------
+
+ /**
+ * Returns flow analysis results produced by the analysis of a given
process group.
+ *
+ * @return a flowAnalysisResultEntity containing flow analysis results
produced by the analysis of the given process group
+ */
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("flow-analysis/result/{processGroupId}")
Review Comment:
```suggestion
@Path("flow-analysis/results/{processGroupId}")
```
##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowAnalysisRuleDTO.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.dto;
+
+import io.swagger.annotations.ApiModelProperty;
+
+import javax.xml.bind.annotation.XmlType;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+@XmlType(name = "flowAnalysisRule")
+public class FlowAnalysisRuleDTO extends ComponentDTO {
+ public static final String VALID = "VALID";
+ public static final String INVALID = "INVALID";
+ public static final String VALIDATING = "VALIDATING";
+
+ private String name;
+ private String type;
+ private BundleDTO bundle;
+ private String state;
+ private String comments;
+ private Boolean persistsState;
+ private Boolean restricted;
+ private Boolean deprecated;
+ private Boolean isExtensionMissing;
+ private Boolean multipleVersionsAvailable;
+ private Boolean supportsSensitiveDynamicProperties;
+
+ private String enforcementPolicy;
+
+ private Map<String, String> properties;
+ private Map<String, PropertyDescriptorDTO> descriptors;
+ private Set<String> sensitiveDynamicPropertyNames;
+
+ private String customUiUrl;
+ private String annotationData;
Review Comment:
These two properties (`customUiUrl` and `annotationData`) can be removed.
##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowAnalysisRuleDAO.java:
##########
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.dao.impl;
+
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigurableComponent;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.ReloadComponent;
+import org.apache.nifi.controller.FlowAnalysisRuleNode;
+import org.apache.nifi.controller.exception.ComponentLifeCycleException;
+import org.apache.nifi.controller.exception.ValidationException;
+import
org.apache.nifi.controller.flowanalysis.FlowAnalysisRuleInstantiationException;
+import org.apache.nifi.controller.flowanalysis.FlowAnalysisRuleProvider;
+import org.apache.nifi.controller.service.StandardConfigurationContext;
+import org.apache.nifi.flowanalysis.FlowAnalysisRuleState;
+import org.apache.nifi.flowanalysis.EnforcementPolicy;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.logging.LogRepository;
+import org.apache.nifi.logging.StandardLoggingContext;
+import org.apache.nifi.logging.repository.NopLogRepository;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.parameter.ParameterLookup;
+import org.apache.nifi.processor.SimpleProcessLogger;
+import org.apache.nifi.util.BundleUtils;
+import org.apache.nifi.web.NiFiCoreException;
+import org.apache.nifi.web.ResourceNotFoundException;
+import org.apache.nifi.web.api.dto.BundleDTO;
+import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
+import org.apache.nifi.web.api.dto.FlowAnalysisRuleDTO;
+import org.apache.nifi.web.dao.ComponentStateDAO;
+import org.apache.nifi.web.dao.FlowAnalysisRuleDAO;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class StandardFlowAnalysisRuleDAO extends ComponentDAO implements
FlowAnalysisRuleDAO {
+
+ private FlowAnalysisRuleProvider flowAnalysisRuleProvider;
+ private ComponentStateDAO componentStateDAO;
+ private ReloadComponent reloadComponent;
+ private FlowController flowController;
+
+ private FlowAnalysisRuleNode locateFlowAnalysisRule(final String
flowAnalysisRuleId) {
+ // get the flow analysis rule
+ final FlowAnalysisRuleNode flowAnalysisRule =
flowAnalysisRuleProvider.getFlowAnalysisRuleNode(flowAnalysisRuleId);
+
+ // ensure the flow analysis rule exists
+ if (flowAnalysisRule == null) {
+ throw new ResourceNotFoundException(String.format("Unable to
locate flow analysis rule with id '%s'.", flowAnalysisRuleId));
+ }
+
+ return flowAnalysisRule;
+ }
+
+ @Override
+ public void verifyCreate(final FlowAnalysisRuleDTO flowAnalysisRuleDTO) {
+ verifyCreate(flowAnalysisRuleProvider.getExtensionManager(),
flowAnalysisRuleDTO.getType(), flowAnalysisRuleDTO.getBundle());
+ }
+
+ @Override
+ public FlowAnalysisRuleNode createFlowAnalysisRule(final
FlowAnalysisRuleDTO flowAnalysisRuleDTO) {
+ // ensure the type is specified
+ if (flowAnalysisRuleDTO.getType() == null) {
+ throw new IllegalArgumentException("The flow analysis rule type
must be specified.");
+ }
+
+ try {
+ // create the flow analysis rule
+ final ExtensionManager extensionManager =
flowAnalysisRuleProvider.getExtensionManager();
+ final BundleCoordinate bundleCoordinate =
BundleUtils.getBundle(extensionManager, flowAnalysisRuleDTO.getType(),
flowAnalysisRuleDTO.getBundle());
+ final FlowAnalysisRuleNode flowAnalysisRule =
flowAnalysisRuleProvider.createFlowAnalysisRule(
+ flowAnalysisRuleDTO.getType(),
flowAnalysisRuleDTO.getId(), bundleCoordinate, true);
+
+ // ensure we can perform the update
+ verifyUpdate(flowAnalysisRule, flowAnalysisRuleDTO);
+
+ // perform the update
+ configureFlowAnalysisRule(flowAnalysisRule, flowAnalysisRuleDTO);
+
+ return flowAnalysisRule;
+ } catch (FlowAnalysisRuleInstantiationException rtie) {
+ throw new NiFiCoreException(rtie.getMessage(), rtie);
+ }
+ }
+
+ @Override
+ public FlowAnalysisRuleNode getFlowAnalysisRule(final String
flowAnalysisRuleId) {
+ return locateFlowAnalysisRule(flowAnalysisRuleId);
+ }
+
+ @Override
+ public boolean hasFlowAnalysisRule(final String flowAnalysisRuleId) {
+ return
flowAnalysisRuleProvider.getFlowAnalysisRuleNode(flowAnalysisRuleId) != null;
+ }
+
+ @Override
+ public Set<FlowAnalysisRuleNode> getFlowAnalysisRules() {
+ return flowAnalysisRuleProvider.getAllFlowAnalysisRules();
+ }
+
+ @Override
+ public FlowAnalysisRuleNode updateFlowAnalysisRule(final
FlowAnalysisRuleDTO flowAnalysisRuleDTO) {
+ // get the flow analysis rule
+ final FlowAnalysisRuleNode flowAnalysisRule =
locateFlowAnalysisRule(flowAnalysisRuleDTO.getId());
+
+ // ensure we can perform the update
+ verifyUpdate(flowAnalysisRule, flowAnalysisRuleDTO);
+
+ // perform the update
+ configureFlowAnalysisRule(flowAnalysisRule, flowAnalysisRuleDTO);
+
+ // attempt to change the underlying processor if an updated bundle is
specified
+ // updating the bundle must happen after configuring so that any
additional classpath resources are set first
+ updateBundle(flowAnalysisRule, flowAnalysisRuleDTO);
+
+ // configure state
+ // see if an update is necessary
+ if (isNotNull(flowAnalysisRuleDTO.getState())) {
+ final FlowAnalysisRuleState purposedState =
FlowAnalysisRuleState.valueOf(flowAnalysisRuleDTO.getState());
+
+ // only attempt an action if it is changing
+ if (!purposedState.equals(flowAnalysisRule.getState())) {
+ try {
+ // perform the appropriate action
+ switch (purposedState) {
+ case ENABLED:
+
flowAnalysisRuleProvider.enableFlowAnalysisRule(flowAnalysisRule);
+ break;
+ case DISABLED:
+
flowAnalysisRuleProvider.disableFlowAnalysisRule(flowAnalysisRule);
+ break;
+ }
+ } catch (IllegalStateException | ComponentLifeCycleException
ise) {
+ throw new NiFiCoreException(ise.getMessage(), ise);
+ } catch (NullPointerException npe) {
+ throw new NiFiCoreException("Unable to update flow
analysis rule state.", npe);
+ } catch (Exception e) {
+ throw new NiFiCoreException("Unable to update flow
analysis rule state: " + e, e);
+ }
+ }
+ }
+
+ return flowAnalysisRule;
+ }
+
+ private void updateBundle(FlowAnalysisRuleNode flowAnalysisRule,
FlowAnalysisRuleDTO flowAnalysisRuleDTO) {
+ final BundleDTO bundleDTO = flowAnalysisRuleDTO.getBundle();
+ if (bundleDTO != null) {
+ final ExtensionManager extensionManager =
flowAnalysisRuleProvider.getExtensionManager();
+ final BundleCoordinate incomingCoordinate =
BundleUtils.getBundle(extensionManager,
flowAnalysisRule.getCanonicalClassName(), bundleDTO);
+ final BundleCoordinate existingCoordinate =
flowAnalysisRule.getBundleCoordinate();
+ if
(!existingCoordinate.getCoordinate().equals(incomingCoordinate.getCoordinate()))
{
+ try {
+ // we need to use the property descriptors from the temp
component here in case we are changing from a ghost component to a real
component
+ final ConfigurableComponent tempComponent =
extensionManager.getTempComponent(flowAnalysisRule.getCanonicalClassName(),
incomingCoordinate);
+ final Set<URL> additionalUrls =
flowAnalysisRule.getAdditionalClasspathResources(tempComponent.getPropertyDescriptors());
+ reloadComponent.reload(flowAnalysisRule,
flowAnalysisRule.getCanonicalClassName(), incomingCoordinate, additionalUrls);
+ } catch (FlowAnalysisRuleInstantiationException e) {
+ throw new NiFiCoreException(String.format("Unable to
update flow analysis rule %s from %s to %s due to: %s",
+ flowAnalysisRuleDTO.getId(),
flowAnalysisRule.getBundleCoordinate().getCoordinate(),
incomingCoordinate.getCoordinate(), e.getMessage()), e);
+ }
+ }
+ }
+ }
+
+ private List<String> validateProposedConfiguration(final
FlowAnalysisRuleNode flowAnalysisRule, final FlowAnalysisRuleDTO
flowAnalysisRuleDTO) {
+ final List<String> validationErrors = new ArrayList<>();
+
+ return validationErrors;
+ }
+
+ @Override
+ public void verifyDelete(final String flowAnalysisRuleId) {
+ final FlowAnalysisRuleNode flowAnalysisRule =
locateFlowAnalysisRule(flowAnalysisRuleId);
+ flowAnalysisRule.verifyCanDelete();
+ }
+
+ @Override
+ public void verifyUpdate(final FlowAnalysisRuleDTO flowAnalysisRuleDTO) {
+ final FlowAnalysisRuleNode flowAnalysisRule =
locateFlowAnalysisRule(flowAnalysisRuleDTO.getId());
+ verifyUpdate(flowAnalysisRule, flowAnalysisRuleDTO);
+ }
+
+ private void verifyUpdate(final FlowAnalysisRuleNode flowAnalysisRule,
final FlowAnalysisRuleDTO flowAnalysisRuleDTO) {
+ // ensure the state, if specified, is valid
+ if (isNotNull(flowAnalysisRuleDTO.getState())) {
+ try {
+ final FlowAnalysisRuleState purposedState =
FlowAnalysisRuleState.valueOf(flowAnalysisRuleDTO.getState());
+
+ // only attempt an action if it is changing
+ if (!purposedState.equals(flowAnalysisRule.getState())) {
+ // perform the appropriate action
+ switch (purposedState) {
+ case ENABLED:
+ flowAnalysisRule.verifyCanEnable();
+ break;
+ case DISABLED:
+ flowAnalysisRule.verifyCanDisable();
+ break;
+ }
+ }
+ } catch (IllegalArgumentException iae) {
+ throw new IllegalArgumentException(String.format(
+ "The specified flow analysis rule state (%s) is not
valid. Valid options are 'RUNNING', 'STOPPED', and 'DISABLED'.",
Review Comment:
```suggestion
"The specified flow analysis rule state (%s) is not
valid. Valid options are 'ENABLED' or 'DISABLED'.",
```
##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java:
##########
@@ -3039,6 +3098,86 @@ public Response getTemplates() {
return generateOkResponse(entity).build();
}
+ // -------------
+ // flow-analysis
+ // -------------
+
+ /**
+ * Returns flow analysis results produced by the analysis of a given
process group.
+ *
+ * @return a flowAnalysisResultEntity containing flow analysis results
produced by the analysis of the given process group
+ */
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("flow-analysis/result/{processGroupId}")
+ @ApiOperation(
+ value = "Returns flow analysis results produced by the analysis of a
given process group",
+ response = FlowAnalysisResultEntity.class,
+ authorizations = {
+ @Authorization(value = "Read - /controller")
+ })
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the
request because it was invalid. The request should not be retried without
modification."),
+ @ApiResponse(code = 401, message = "Client could not be
authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make
this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not
be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was
not in the appropriate state to process it. Retrying the same request later may
be successful.")
+ })
+ public Response getFlowAnalysisResults(
+ @Context final HttpServletRequest httpServletRequest,
+ @ApiParam(
+ value = "The id of the process group representing (a part of) the
flow to be analyzed.",
+ required = true
+ )
+ @PathParam("processGroupId")
+ final String processGroupId
+ ) {
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.GET);
+ }
+
+ authorizeFlow();
+
+ FlowAnalysisResultEntity entity =
serviceFacade.getFlowAnalysisResult(processGroupId);
+
+ return generateOkResponse(entity).build();
+ }
+
+ /**
+ * Returns all flow analysis results currently in effect.
+ *
+ * @return a flowAnalysisRuleEntity containing all flow analysis results
currently in-effect
+ */
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("flow-analysis/result")
+ @ApiOperation(
+ value = "Returns all flow analysis results currently in effect",
+ response = FlowAnalysisResultEntity.class,
+ authorizations = {
+ @Authorization(value = "Read - /controller")
Review Comment:
```suggestion
@Authorization(value = "Read - /flow")
```
##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java:
##########
@@ -3039,6 +3098,86 @@ public Response getTemplates() {
return generateOkResponse(entity).build();
}
+ // -------------
+ // flow-analysis
+ // -------------
+
+ /**
+ * Returns flow analysis results produced by the analysis of a given
process group.
+ *
+ * @return a flowAnalysisResultEntity containing flow analysis results
produced by the analysis of the given process group
+ */
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("flow-analysis/result/{processGroupId}")
+ @ApiOperation(
+ value = "Returns flow analysis results produced by the analysis of a
given process group",
+ response = FlowAnalysisResultEntity.class,
+ authorizations = {
+ @Authorization(value = "Read - /controller")
Review Comment:
```suggestion
@Authorization(value = "Read - /flow")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]