This is an automated email from the ASF dual-hosted git repository. markap14 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new fd2de5a151 NIFI-12198 Add API and CLI commands to import reporting task snapshots (#7875) fd2de5a151 is described below commit fd2de5a1515458ccc73df3e9a85fed9d6c4c3ce1 Author: Bryan Bende <bbe...@apache.org> AuthorDate: Fri Oct 20 10:45:48 2023 -0400 NIFI-12198 Add API and CLI commands to import reporting task snapshots (#7875) * NIFI-12198 Add API and CLI commands to import reporting task snapshots --- .../VersionedReportingTaskImportRequestEntity.java | 47 +++++++ ...VersionedReportingTaskImportResponseEntity.java | 47 +++++++ .../java/org/apache/nifi/util/BundleUtils.java | 35 +++-- .../reporting/ReportingTaskProvider.java | 9 +- .../org/apache/nifi/controller/FlowController.java | 9 +- .../serialization/FlowSynchronizationUtils.java | 111 ++++++++++++++++ .../StandardVersionedReportingTaskImporter.java | 145 +++++++++++++++++++++ .../serialization/VersionedFlowSynchronizer.java | 84 ++---------- .../VersionedReportingTaskImportResult.java | 44 +++++++ .../VersionedReportingTaskImporter.java | 34 +++++ .../nifi/registry/flow/FlowRegistryUtils.java | 27 +++- .../org/apache/nifi/web/NiFiServiceFacade.java | 25 ++++ .../apache/nifi/web/StandardNiFiServiceFacade.java | 72 ++++++++++ .../apache/nifi/web/api/ControllerResource.java | 120 +++++++++++++---- .../nifi/web/controller/ControllerFacade.java | 6 + .../web/dao/impl/StandardReportingTaskDAO.java | 24 ++-- .../nifi/web/StandardNiFiServiceFacadeTest.java | 43 ++++++ .../cli/impl/client/nifi/ControllerClient.java | 5 + .../client/nifi/impl/JerseyControllerClient.java | 19 +++ .../cli/impl/command/nifi/NiFiCommandGroup.java | 2 + .../command/nifi/flow/ImportReportingTasks.java | 69 ++++++++++ .../result/nifi/ImportReportingTasksResult.java | 90 +++++++++++++ 22 files changed, 928 insertions(+), 139 deletions(-) diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedReportingTaskImportRequestEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedReportingTaskImportRequestEntity.java new file mode 100644 index 0000000000..0b8e1dc605 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedReportingTaskImportRequestEntity.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.web.api.entity; + +import io.swagger.annotations.ApiModelProperty; +import org.apache.nifi.flow.VersionedReportingTaskSnapshot; + +import javax.xml.bind.annotation.XmlRootElement; + +@XmlRootElement(name = "versionedReportingTaskImportRequestEntity") +public class VersionedReportingTaskImportRequestEntity extends Entity { + + private VersionedReportingTaskSnapshot reportingTaskSnapshot; + private Boolean disconnectedNodeAcknowledged; + + @ApiModelProperty("The snapshot to import") + public VersionedReportingTaskSnapshot getReportingTaskSnapshot() { + return reportingTaskSnapshot; + } + + public void setReportingTaskSnapshot(VersionedReportingTaskSnapshot reportingTaskSnapshot) { + this.reportingTaskSnapshot = reportingTaskSnapshot; + } + + @ApiModelProperty("The disconnected node acknowledged flag") + public Boolean getDisconnectedNodeAcknowledged() { + return disconnectedNodeAcknowledged; + } + + public void setDisconnectedNodeAcknowledged(Boolean disconnectedNodeAcknowledged) { + this.disconnectedNodeAcknowledged = disconnectedNodeAcknowledged; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedReportingTaskImportResponseEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedReportingTaskImportResponseEntity.java new file mode 100644 index 0000000000..bf672a60c4 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedReportingTaskImportResponseEntity.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.entity; + +import io.swagger.annotations.ApiModelProperty; + +import javax.xml.bind.annotation.XmlRootElement; +import java.util.Set; + +@XmlRootElement(name = "versionedReportingTaskImportResponseEntity") +public class VersionedReportingTaskImportResponseEntity extends Entity { + + private Set<ReportingTaskEntity> reportingTasks; + private Set<ControllerServiceEntity> controllerServices; + + @ApiModelProperty("The reporting tasks created by the import") + public Set<ReportingTaskEntity> getReportingTasks() { + return reportingTasks; + } + + public void setReportingTasks(Set<ReportingTaskEntity> reportingTasks) { + this.reportingTasks = reportingTasks; + } + + @ApiModelProperty("The controller services created by the import") + public Set<ControllerServiceEntity> getControllerServices() { + return controllerServices; + } + + public void setControllerServices(Set<ControllerServiceEntity> controllerServices) { + this.controllerServices = controllerServices; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/BundleUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/BundleUtils.java index d7ad003f62..80708b2a64 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/BundleUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/BundleUtils.java @@ -18,7 +18,9 @@ package org.apache.nifi.util; import org.apache.nifi.bundle.Bundle; import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.flow.VersionedConfigurableExtension; import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.flow.VersionedReportingTaskSnapshot; import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarClassLoadersHolder; import org.apache.nifi.nar.PythonBundle; @@ -216,23 +218,11 @@ public final class BundleUtils { */ public static void discoverCompatibleBundles(final ExtensionManager extensionManager, final VersionedProcessGroup versionedGroup) { if (versionedGroup.getProcessors() != null) { - versionedGroup.getProcessors().forEach(processor -> { - final BundleDTO dto = createBundleDto(processor.getBundle()); - final BundleCoordinate coordinate = BundleUtils.getOptionalCompatibleBundle(extensionManager, processor.getType(), dto).orElse( - new BundleCoordinate(dto.getGroup(), dto.getArtifact(), dto.getVersion())); - processor.setBundle(createBundle(coordinate)); - }); + versionedGroup.getProcessors().forEach(processor -> discoverCompatibleBundle(extensionManager, processor)); } if (versionedGroup.getControllerServices() != null) { - versionedGroup.getControllerServices().forEach(controllerService -> { - final BundleDTO dto = createBundleDto(controllerService.getBundle()); - - final BundleCoordinate coordinate = BundleUtils.getOptionalCompatibleBundle(extensionManager, controllerService.getType(), createBundleDto(controllerService.getBundle())).orElse( - new BundleCoordinate(dto.getGroup(), dto.getArtifact(), dto.getVersion())); - - controllerService.setBundle(createBundle(coordinate)); - }); + 
versionedGroup.getControllerServices().forEach(controllerService -> discoverCompatibleBundle(extensionManager, controllerService)); } if (versionedGroup.getProcessGroups() != null) { @@ -240,6 +230,23 @@ public final class BundleUtils { } } + public static void discoverCompatibleBundles(final ExtensionManager extensionManager, final VersionedReportingTaskSnapshot reportingTaskSnapshot) { + if (reportingTaskSnapshot.getReportingTasks() != null) { + reportingTaskSnapshot.getReportingTasks().forEach(reportingTask -> discoverCompatibleBundle(extensionManager, reportingTask)); + } + + if (reportingTaskSnapshot.getControllerServices() != null) { + reportingTaskSnapshot.getControllerServices().forEach(controllerService -> discoverCompatibleBundle(extensionManager, controllerService)); + } + } + + public static void discoverCompatibleBundle(final ExtensionManager extensionManager, final VersionedConfigurableExtension extension) { + final BundleDTO dto = createBundleDto(extension.getBundle()); + final BundleCoordinate coordinate = BundleUtils.getOptionalCompatibleBundle(extensionManager, extension.getType(), dto).orElse( + new BundleCoordinate(dto.getGroup(), dto.getArtifact(), dto.getVersion())); + extension.setBundle(createBundle(coordinate)); + } + public static BundleCoordinate discoverCompatibleBundle(final ExtensionManager extensionManager, final String type, final org.apache.nifi.flow.Bundle bundle) { return getCompatibleBundle(extensionManager, type, createBundleDto(bundle)); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java index 4e310b7535..ea035e053c 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java @@ -16,12 +16,12 @@ */ package org.apache.nifi.controller.reporting; -import java.util.Set; - import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.nar.ExtensionManager; +import java.util.Set; + /** * A ReportingTaskProvider is responsible for providing management of, and * access to, Reporting Tasks @@ -41,11 +41,8 @@ public interface ReportingTaskProvider { * being restored after a restart of the software * * @return the ReportingTaskNode that is used to manage the reporting task - * - * @throws ReportingTaskInstantiationException if unable to create the - * Reporting Task */ - ReportingTaskNode createReportingTask(String type, String id, BundleCoordinate bundleCoordinate, boolean firstTimeAdded) throws ReportingTaskInstantiationException; + ReportingTaskNode createReportingTask(String type, String id, BundleCoordinate bundleCoordinate, boolean firstTimeAdded); /** * @param identifier of node diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 7bb4eab8d5..a7ef827fa7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -18,8 +18,6 @@ package org.apache.nifi.controller; import org.apache.commons.lang3.StringUtils; 
import org.apache.nifi.admin.service.AuditService; -import org.apache.nifi.flowanalysis.StandardFlowAnalyzer; -import org.apache.nifi.flowanalysis.TriggerFlowAnalysisTask; import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored; import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.authorization.Authorizer; @@ -81,7 +79,6 @@ import org.apache.nifi.controller.queue.clustered.server.ConnectionLoadBalanceSe import org.apache.nifi.controller.queue.clustered.server.LoadBalanceAuthorizer; import org.apache.nifi.controller.queue.clustered.server.LoadBalanceProtocol; import org.apache.nifi.controller.queue.clustered.server.StandardLoadBalanceProtocol; -import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; import org.apache.nifi.controller.reporting.ReportingTaskProvider; import org.apache.nifi.controller.repository.ContentRepository; import org.apache.nifi.controller.repository.CounterRepository; @@ -148,6 +145,8 @@ import org.apache.nifi.flow.Bundle; import org.apache.nifi.flow.VersionedConnection; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.flowanalysis.FlowAnalysisRule; +import org.apache.nifi.flowanalysis.StandardFlowAnalyzer; +import org.apache.nifi.flowanalysis.TriggerFlowAnalysisTask; import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.groups.BundleUpdateStrategy; @@ -180,9 +179,9 @@ import org.apache.nifi.python.DisabledPythonBridge; import org.apache.nifi.python.PythonBridge; import org.apache.nifi.python.PythonBridgeInitializationContext; import org.apache.nifi.python.PythonProcessConfig; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup; import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper; import org.apache.nifi.registry.flow.mapping.VersionedComponentStateLookup; -import 
org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup; import org.apache.nifi.remote.HttpRemoteSiteListener; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.RemoteResourceManager; @@ -2296,7 +2295,7 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr } @Override - public ReportingTaskNode createReportingTask(final String type, final String id, final BundleCoordinate bundleCoordinate, final boolean firstTimeAdded) throws ReportingTaskInstantiationException { + public ReportingTaskNode createReportingTask(final String type, final String id, final BundleCoordinate bundleCoordinate, final boolean firstTimeAdded) { return flowManager.createReportingTask(type, id, bundleCoordinate, firstTimeAdded); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizationUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizationUtils.java new file mode 100644 index 0000000000..a090788818 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizationUtils.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.serialization; + +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ComponentNode; +import org.apache.nifi.encrypt.EncryptionException; +import org.apache.nifi.encrypt.PropertyEncryptor; +import org.apache.nifi.flow.Bundle; +import org.apache.nifi.flow.VersionedConfigurableExtension; +import org.apache.nifi.flow.VersionedPropertyDescriptor; +import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.util.BundleUtils; +import org.apache.nifi.web.api.dto.BundleDTO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public class FlowSynchronizationUtils { + + private static final Logger logger = LoggerFactory.getLogger(FlowSynchronizationUtils.class); + + private FlowSynchronizationUtils() { + } + + static BundleCoordinate createBundleCoordinate(final ExtensionManager extensionManager, final Bundle bundle, final String componentType) { + BundleCoordinate coordinate; + try { + final BundleDTO bundleDto = new BundleDTO(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion()); + coordinate = BundleUtils.getCompatibleBundle(extensionManager, componentType, bundleDto); + } catch (final IllegalStateException e) { + coordinate = new BundleCoordinate(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion()); + } + + return coordinate; + } + + static Set<String> 
getSensitiveDynamicPropertyNames(final ComponentNode componentNode, final VersionedConfigurableExtension extension) { + final Set<String> versionedSensitivePropertyNames = new LinkedHashSet<>(); + + // Get Sensitive Property Names based on encrypted values including both supported and dynamic properties + extension.getProperties() + .entrySet() + .stream() + .filter(entry -> isValueSensitive(entry.getValue())) + .map(Map.Entry::getKey) + .forEach(versionedSensitivePropertyNames::add); + + // Get Sensitive Property Names based on supported and dynamic property descriptors + extension.getPropertyDescriptors() + .values() + .stream() + .filter(VersionedPropertyDescriptor::isSensitive) + .map(VersionedPropertyDescriptor::getName) + .forEach(versionedSensitivePropertyNames::add); + + // Filter combined Sensitive Property Names based on Component Property Descriptor status + return versionedSensitivePropertyNames.stream() + .map(componentNode::getPropertyDescriptor) + .filter(PropertyDescriptor::isDynamic) + .map(PropertyDescriptor::getName) + .collect(Collectors.toSet()); + } + + static boolean isValueSensitive(final String value) { + return value != null && value.startsWith(FlowSerializer.ENC_PREFIX) && value.endsWith(FlowSerializer.ENC_SUFFIX); + } + + static Map<String, String> decryptProperties(final Map<String, String> encrypted, final PropertyEncryptor encryptor) { + final Map<String, String> decrypted = new HashMap<>(encrypted.size()); + encrypted.forEach((key, value) -> decrypted.put(key, decrypt(value, encryptor))); + return decrypted; + } + + static String decrypt(final String value, final PropertyEncryptor encryptor) { + if (isValueSensitive(value)) { + try { + return encryptor.decrypt(value.substring(FlowSerializer.ENC_PREFIX.length(), value.length() - FlowSerializer.ENC_SUFFIX.length())); + } catch (EncryptionException e) { + final String moreDescriptiveMessage = "There was a problem decrypting a sensitive flow configuration value. 
" + + "Check that the nifi.sensitive.props.key value in nifi.properties matches the value used to encrypt the flow.json.gz file"; + logger.error(moreDescriptiveMessage, e); + throw new EncryptionException(moreDescriptiveMessage, e); + } + } else { + return value; + } + } + + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardVersionedReportingTaskImporter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardVersionedReportingTaskImporter.java new file mode 100644 index 0000000000..d48d77d856 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardVersionedReportingTaskImporter.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.controller.serialization; + +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.ReportingTaskNode; +import org.apache.nifi.controller.flow.FlowManager; +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.encrypt.PropertyEncryptor; +import org.apache.nifi.flow.VersionedControllerService; +import org.apache.nifi.flow.VersionedReportingTask; +import org.apache.nifi.flow.VersionedReportingTaskSnapshot; +import org.apache.nifi.logging.LogLevel; +import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.scheduling.SchedulingStrategy; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.nifi.controller.serialization.FlowSynchronizationUtils.createBundleCoordinate; +import static org.apache.nifi.controller.serialization.FlowSynchronizationUtils.decryptProperties; +import static org.apache.nifi.controller.serialization.FlowSynchronizationUtils.getSensitiveDynamicPropertyNames; + +public class StandardVersionedReportingTaskImporter implements VersionedReportingTaskImporter { + + final FlowController flowController; + + public StandardVersionedReportingTaskImporter(final FlowController flowController) { + this.flowController = flowController; + } + + @Override + public VersionedReportingTaskImportResult importSnapshot(final VersionedReportingTaskSnapshot reportingTaskSnapshot) { + final List<VersionedControllerService> controllerServices = Optional.ofNullable(reportingTaskSnapshot.getControllerServices()).orElse(Collections.emptyList()); + final Set<ControllerServiceNode> controllerServiceNodes = importControllerServices(controllerServices); + + final List<VersionedReportingTask> reportingTasks = 
Optional.ofNullable(reportingTaskSnapshot.getReportingTasks()).orElse(Collections.emptyList()); + final Set<ReportingTaskNode> reportingTaskNodes = importReportingTasks(reportingTasks); + + return new VersionedReportingTaskImportResult(reportingTaskNodes, controllerServiceNodes); + } + + private Set<ReportingTaskNode> importReportingTasks(final List<VersionedReportingTask> reportingTasks) { + final Set<ReportingTaskNode> taskNodes = new HashSet<>(); + for (final VersionedReportingTask reportingTask : reportingTasks) { + final ReportingTaskNode taskNode = addReportingTask(reportingTask); + taskNodes.add(taskNode); + } + return taskNodes; + } + + private ReportingTaskNode addReportingTask(final VersionedReportingTask reportingTask) { + final ExtensionManager extensionManager = flowController.getExtensionManager(); + final BundleCoordinate coordinate = createBundleCoordinate(extensionManager, reportingTask.getBundle(), reportingTask.getType()); + + final ReportingTaskNode taskNode = flowController.createReportingTask(reportingTask.getType(), reportingTask.getInstanceIdentifier(), coordinate, false); + taskNode.setName(reportingTask.getName()); + taskNode.setComments(reportingTask.getComments()); + taskNode.setSchedulingPeriod(reportingTask.getSchedulingPeriod()); + taskNode.setSchedulingStrategy(SchedulingStrategy.valueOf(reportingTask.getSchedulingStrategy())); + taskNode.setAnnotationData(reportingTask.getAnnotationData()); + + final Set<String> sensitiveDynamicPropertyNames = getSensitiveDynamicPropertyNames(taskNode, reportingTask); + final Map<String, String> decryptedProperties = decryptProperties(reportingTask.getProperties(), flowController.getEncryptor()); + taskNode.setProperties(decryptedProperties, false, sensitiveDynamicPropertyNames); + return taskNode; + } + + private Set<ControllerServiceNode> importControllerServices(final List<VersionedControllerService> controllerServices) { + final Set<ControllerServiceNode> controllerServicesAdded = new 
HashSet<>(); + for (final VersionedControllerService versionedControllerService : controllerServices) { + final ControllerServiceNode serviceNode = flowController.getFlowManager().getRootControllerService(versionedControllerService.getInstanceIdentifier()); + if (serviceNode == null) { + final ControllerServiceNode added = addRootControllerService(versionedControllerService); + controllerServicesAdded.add(added); + } + } + + for (final VersionedControllerService versionedControllerService : controllerServices) { + final ControllerServiceNode serviceNode = flowController.getFlowManager().getRootControllerService(versionedControllerService.getInstanceIdentifier()); + if (controllerServicesAdded.contains(serviceNode)) { + updateRootControllerService(serviceNode, versionedControllerService, flowController.getEncryptor()); + } + } + + return controllerServicesAdded; + } + + private ControllerServiceNode addRootControllerService(final VersionedControllerService controllerService) { + final FlowManager flowManager = flowController.getFlowManager(); + final ExtensionManager extensionManager = flowController.getExtensionManager(); + + final BundleCoordinate bundleCoordinate = createBundleCoordinate(extensionManager, controllerService.getBundle(), controllerService.getType()); + final ControllerServiceNode serviceNode = flowManager.createControllerService(controllerService.getType(), controllerService.getInstanceIdentifier(), + bundleCoordinate, Collections.emptySet(), true, true, null); + serviceNode.setVersionedComponentId(controllerService.getIdentifier()); + flowManager.addRootControllerService(serviceNode); + + return serviceNode; + } + + private void updateRootControllerService(final ControllerServiceNode serviceNode, final VersionedControllerService controllerService, + final PropertyEncryptor encryptor) { + serviceNode.pauseValidationTrigger(); + try { + serviceNode.setName(controllerService.getName()); + 
serviceNode.setAnnotationData(controllerService.getAnnotationData()); + serviceNode.setComments(controllerService.getComments()); + + if (controllerService.getBulletinLevel() != null) { + serviceNode.setBulletinLevel(LogLevel.valueOf(controllerService.getBulletinLevel())); + } else { + // this situation exists for backward compatibility with nifi 1.16 and earlier where controller services do not have bulletinLevels set in flow.xml/flow.json + // and bulletinLevels are at the WARN level by default + serviceNode.setBulletinLevel(LogLevel.WARN); + } + + final Set<String> sensitiveDynamicPropertyNames = getSensitiveDynamicPropertyNames(serviceNode, controllerService); + final Map<String, String> decryptedProperties = decryptProperties(controllerService.getProperties(), encryptor); + serviceNode.setProperties(decryptedProperties, false, sensitiveDynamicPropertyNames); + } finally { + serviceNode.resumeValidationTrigger(); + } + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java index 5db7190bf1..18341d5317 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java @@ -24,7 +24,6 @@ import org.apache.nifi.authorization.ManagedAuthorizer; import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.cluster.protocol.DataFlow; import org.apache.nifi.cluster.protocol.StandardDataFlow; -import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.connectable.Connectable; import 
org.apache.nifi.connectable.Position; import org.apache.nifi.controller.AbstractComponentNode; @@ -51,13 +50,11 @@ import org.apache.nifi.controller.inheritance.MissingComponentsCheck; import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.StandardConfigurationContext; -import org.apache.nifi.encrypt.EncryptionException; import org.apache.nifi.encrypt.PropertyEncryptor; import org.apache.nifi.flow.Bundle; import org.apache.nifi.flow.ExecutionEngine; import org.apache.nifi.flow.ScheduledState; import org.apache.nifi.flow.VersionedComponent; -import org.apache.nifi.flow.VersionedConfigurableExtension; import org.apache.nifi.flow.VersionedControllerService; import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flow.VersionedFlowAnalysisRule; @@ -67,7 +64,6 @@ import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.flow.VersionedParameterProvider; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.flow.VersionedProcessor; -import org.apache.nifi.flow.VersionedPropertyDescriptor; import org.apache.nifi.flow.VersionedReportingTask; import org.apache.nifi.groups.AbstractComponentScheduler; import org.apache.nifi.groups.BundleUpdateStrategy; @@ -117,7 +113,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -128,6 +123,11 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; +import static org.apache.nifi.controller.serialization.FlowSynchronizationUtils.createBundleCoordinate; +import static org.apache.nifi.controller.serialization.FlowSynchronizationUtils.decrypt; +import static 
org.apache.nifi.controller.serialization.FlowSynchronizationUtils.decryptProperties; +import static org.apache.nifi.controller.serialization.FlowSynchronizationUtils.getSensitiveDynamicPropertyNames; + public class VersionedFlowSynchronizer implements FlowSynchronizer { private static final Logger logger = LoggerFactory.getLogger(VersionedFlowSynchronizer.class); /** @@ -575,7 +575,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer { return; } - final BundleCoordinate coordinate = createBundleCoordinate(versionedFlowRegistryClient.getBundle(), versionedFlowRegistryClient.getType()); + final BundleCoordinate coordinate = createBundleCoordinate(extensionManager, versionedFlowRegistryClient.getBundle(), versionedFlowRegistryClient.getType()); final FlowRegistryClientNode flowRegistryClient = flowController.getFlowManager().createFlowRegistryClient( versionedFlowRegistryClient.getType(), versionedFlowRegistryClient.getIdentifier(), coordinate, Collections.emptySet() , false, true, null); @@ -638,7 +638,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer { } private void addReportingTask(final FlowController controller, final VersionedReportingTask reportingTask) throws ReportingTaskInstantiationException { - final BundleCoordinate coordinate = createBundleCoordinate(reportingTask.getBundle(), reportingTask.getType()); + final BundleCoordinate coordinate = createBundleCoordinate(extensionManager, reportingTask.getBundle(), reportingTask.getType()); final ReportingTaskNode taskNode = controller.createReportingTask(reportingTask.getType(), reportingTask.getInstanceIdentifier(), coordinate, false); updateReportingTask(taskNode, reportingTask, controller); @@ -706,7 +706,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer { } private void addFlowAnalysisRule(final FlowController controller, final VersionedFlowAnalysisRule flowAnalysisRule) throws FlowAnalysisRuleInstantiationException { - final BundleCoordinate 
coordinate = createBundleCoordinate(flowAnalysisRule.getBundle(), flowAnalysisRule.getType()); + final BundleCoordinate coordinate = createBundleCoordinate(extensionManager, flowAnalysisRule.getBundle(), flowAnalysisRule.getType()); final FlowAnalysisRuleNode ruleNode = controller.createFlowAnalysisRule(flowAnalysisRule.getType(), flowAnalysisRule.getInstanceIdentifier(), coordinate, false); updateFlowAnalysisRule(ruleNode, flowAnalysisRule, controller); @@ -752,7 +752,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer { } private void addParameterProvider(final FlowController controller, final VersionedParameterProvider parameterProvider, final PropertyEncryptor encryptor) { - final BundleCoordinate coordinate = createBundleCoordinate(parameterProvider.getBundle(), parameterProvider.getType()); + final BundleCoordinate coordinate = createBundleCoordinate(extensionManager, parameterProvider.getBundle(), parameterProvider.getType()); final ParameterProviderNode parameterProviderNode = controller.getFlowManager() .createParameterProvider(parameterProvider.getType(), parameterProvider.getInstanceIdentifier(), coordinate, false); @@ -987,7 +987,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer { } private ControllerServiceNode addRootControllerService(final FlowController controller, final VersionedControllerService versionedControllerService) { - final BundleCoordinate bundleCoordinate = createBundleCoordinate(versionedControllerService.getBundle(), versionedControllerService.getType()); + final BundleCoordinate bundleCoordinate = createBundleCoordinate(extensionManager, versionedControllerService.getBundle(), versionedControllerService.getType()); final ControllerServiceNode serviceNode = controller.getFlowManager().createControllerService(versionedControllerService.getType(), versionedControllerService.getInstanceIdentifier(), bundleCoordinate,Collections.emptySet(), true, true, null); @@ -1019,70 +1019,6 @@ public class 
VersionedFlowSynchronizer implements FlowSynchronizer { } } - private Set<String> getSensitiveDynamicPropertyNames(final ComponentNode componentNode, final VersionedConfigurableExtension extension) { - final Set<String> versionedSensitivePropertyNames = new LinkedHashSet<>(); - - // Get Sensitive Property Names based on encrypted values including both supported and dynamic properties - extension.getProperties() - .entrySet() - .stream() - .filter(entry -> isValueSensitive(entry.getValue())) - .map(Map.Entry::getKey) - .forEach(versionedSensitivePropertyNames::add); - - // Get Sensitive Property Names based on supported and dynamic property descriptors - extension.getPropertyDescriptors() - .values() - .stream() - .filter(VersionedPropertyDescriptor::isSensitive) - .map(VersionedPropertyDescriptor::getName) - .forEach(versionedSensitivePropertyNames::add); - - // Filter combined Sensitive Property Names based on Component Property Descriptor status - return versionedSensitivePropertyNames.stream() - .map(componentNode::getPropertyDescriptor) - .filter(PropertyDescriptor::isDynamic) - .map(PropertyDescriptor::getName) - .collect(Collectors.toSet()); - } - - private Map<String, String> decryptProperties(final Map<String, String> encrypted, final PropertyEncryptor encryptor) { - final Map<String, String> decrypted = new HashMap<>(encrypted.size()); - encrypted.forEach((key, value) -> decrypted.put(key, decrypt(value, encryptor))); - return decrypted; - } - - private String decrypt(final String value, final PropertyEncryptor encryptor) { - if (isValueSensitive(value)) { - try { - return encryptor.decrypt(value.substring(FlowSerializer.ENC_PREFIX.length(), value.length() - FlowSerializer.ENC_SUFFIX.length())); - } catch (EncryptionException e) { - final String moreDescriptiveMessage = "There was a problem decrypting a sensitive flow configuration value. 
" + - "Check that the nifi.sensitive.props.key value in nifi.properties matches the value used to encrypt the flow.json.gz file"; - logger.error(moreDescriptiveMessage, e); - throw new EncryptionException(moreDescriptiveMessage, e); - } - } else { - return value; - } - } - - private boolean isValueSensitive(final String value) { - return value != null && value.startsWith(FlowSerializer.ENC_PREFIX) && value.endsWith(FlowSerializer.ENC_SUFFIX); - } - - private BundleCoordinate createBundleCoordinate(final Bundle bundle, final String componentType) { - BundleCoordinate coordinate; - try { - final BundleDTO bundleDto = new BundleDTO(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion()); - coordinate = BundleUtils.getCompatibleBundle(extensionManager, componentType, bundleDto); - } catch (final IllegalStateException e) { - coordinate = new BundleCoordinate(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion()); - } - - return coordinate; - } - private void inheritAuthorizations(final DataFlow existingFlow, final DataFlow proposedFlow, final FlowController controller) { final Authorizer authorizer = controller.getAuthorizer(); if (!(authorizer instanceof ManagedAuthorizer)) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedReportingTaskImportResult.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedReportingTaskImportResult.java new file mode 100644 index 0000000000..7c3fa40247 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedReportingTaskImportResult.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.serialization; + +import org.apache.nifi.controller.ReportingTaskNode; +import org.apache.nifi.controller.service.ControllerServiceNode; + +import java.util.Collections; +import java.util.Set; + +public class VersionedReportingTaskImportResult { + + private final Set<ReportingTaskNode> reportingTaskNodes; + private final Set<ControllerServiceNode> controllerServiceNodes; + + public VersionedReportingTaskImportResult(final Set<ReportingTaskNode> reportingTaskNodes, + final Set<ControllerServiceNode> controllerServiceNodes) { + this.reportingTaskNodes = Collections.unmodifiableSet(reportingTaskNodes == null ? Collections.emptySet() : reportingTaskNodes); + this.controllerServiceNodes = Collections.unmodifiableSet(controllerServiceNodes == null ? 
Collections.emptySet() : controllerServiceNodes); + } + + public Set<ReportingTaskNode> getReportingTaskNodes() { + return reportingTaskNodes; + } + + public Set<ControllerServiceNode> getControllerServiceNodes() { + return controllerServiceNodes; + } +} + diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedReportingTaskImporter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedReportingTaskImporter.java new file mode 100644 index 0000000000..0587d732f0 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedReportingTaskImporter.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.serialization; + +import org.apache.nifi.flow.VersionedReportingTaskSnapshot; + +/** + * Encapsulates logic for importing reporting tasks and controller services. + */ +public interface VersionedReportingTaskImporter { + + /** + * Imports the given snapshot. 
A new instance of each reporting task and controller service will be created. + * + * @param reportingTaskSnapshot the snapshot + * @return the result of the import containing any created reporting tasks and controller services + */ + VersionedReportingTaskImportResult importSnapshot(VersionedReportingTaskSnapshot reportingTaskSnapshot); + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryUtils.java index 0021f5755e..c64cb84c9c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryUtils.java @@ -22,20 +22,33 @@ import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.flow.Bundle; import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.flow.VersionedReportingTaskSnapshot; import org.apache.nifi.util.Tuple; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.api.dto.BundleDTO; +import java.util.Collections; import java.util.HashSet; +import java.util.Optional; import java.util.Set; public class FlowRegistryUtils { public static Set<ConfigurableComponent> getRestrictedComponents(final VersionedProcessGroup group, final NiFiServiceFacade serviceFacade) { - final Set<ConfigurableComponent> restrictedComponents = new HashSet<>(); - final Set<Tuple<String, BundleCoordinate>> componentTypes = new HashSet<>(); populateComponentTypes(group, componentTypes); + return getRestrictedComponents(serviceFacade, componentTypes); + } + + public static Set<ConfigurableComponent> 
getRestrictedComponents(final VersionedReportingTaskSnapshot reportingTaskSnapshot, final NiFiServiceFacade serviceFacade) { + final Set<Tuple<String, BundleCoordinate>> componentTypes = new HashSet<>(); + populateComponentTypes(reportingTaskSnapshot, componentTypes); + return getRestrictedComponents(serviceFacade, componentTypes); + + } + + private static Set<ConfigurableComponent> getRestrictedComponents(NiFiServiceFacade serviceFacade, Set<Tuple<String, BundleCoordinate>> componentTypes) { + final Set<ConfigurableComponent> restrictedComponents = new HashSet<>(); for (final Tuple<String, BundleCoordinate> tuple : componentTypes) { final ConfigurableComponent component = serviceFacade.getTempComponent(tuple.getKey(), tuple.getValue()); @@ -66,6 +79,16 @@ public class FlowRegistryUtils { } } + private static void populateComponentTypes(final VersionedReportingTaskSnapshot reportingTaskSnapshot, final Set<Tuple<String, BundleCoordinate>> componentTypes) { + Optional.ofNullable(reportingTaskSnapshot.getReportingTasks()).orElse(Collections.emptyList()).stream() + .map(versionedReportingTask -> new Tuple<>(versionedReportingTask.getType(), createBundleCoordinate(versionedReportingTask.getBundle()))) + .forEach(componentTypes::add); + + Optional.ofNullable(reportingTaskSnapshot.getControllerServices()).orElse(Collections.emptyList()).stream() + .map(versionedSvc -> new Tuple<>(versionedSvc.getType(), createBundleCoordinate(versionedSvc.getBundle()))) + .forEach(componentTypes::add); + } + public static BundleCoordinate createBundleCoordinate(final Bundle bundle) { return new BundleCoordinate(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 4904fdef4d..5dfa1a15af 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -137,6 +137,7 @@ import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity; import org.apache.nifi.web.api.entity.VersionControlInformationEntity; import org.apache.nifi.web.api.entity.VersionedFlowEntity; import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity; +import org.apache.nifi.web.api.entity.VersionedReportingTaskImportResponseEntity; import org.apache.nifi.web.api.request.FlowMetricsRegistry; import java.util.Collection; @@ -145,6 +146,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Function; +import java.util.function.Supplier; /** * Defines the NiFiServiceFacade interface. @@ -333,6 +335,22 @@ public interface NiFiServiceFacade { */ VersionedReportingTaskSnapshot getVersionedReportingTaskSnapshot(); + /** + * Generates unique identifiers for all components in the snapshot so that multiple imports of the same snapshot + * create multiple components. + * + * @param reportingTaskSnapshot the snapshot + */ + void generateIdentifiersForImport(VersionedReportingTaskSnapshot reportingTaskSnapshot, Supplier<String> idGenerator); + + /** + * Imports the reporting tasks and controller services in the given snapshot. + * + * @param reportingTaskSnapshot the snapshot to import + * @return the response entity + */ + VersionedReportingTaskImportResponseEntity importReportingTasks(VersionedReportingTaskSnapshot reportingTaskSnapshot); + /** * Gets the controller level bulletins. 
* @@ -2596,6 +2614,13 @@ public interface NiFiServiceFacade { */ void discoverCompatibleBundles(VersionedProcessGroup versionedGroup); + /** + * Discovers the compatible bundle details for the components in the specified snapshot and updates the snapshot to reflect the appropriate bundles. + * + * @param reportingTaskSnapshot the snapshot + */ + void discoverCompatibleBundles(VersionedReportingTaskSnapshot reportingTaskSnapshot); + /** * For any Controller Service that is found in the given Versioned Process Group, if that Controller Service is not itself included in the Versioned Process Groups, * attempts to find an existing Controller Service that matches the definition. If any is found, the component within the Versioned Process Group is updated to point diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 774265236f..6ac784df7f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -91,6 +91,8 @@ import org.apache.nifi.controller.leader.election.LeaderElectionManager; import org.apache.nifi.controller.repository.FlowFileEvent; import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.controller.repository.claim.ContentDirection; +import org.apache.nifi.controller.serialization.VersionedReportingTaskImportResult; +import org.apache.nifi.controller.serialization.VersionedReportingTaskImporter; import org.apache.nifi.controller.serialization.VersionedReportingTaskSnapshotMapper; import 
org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; @@ -109,6 +111,7 @@ import org.apache.nifi.flow.ExecutionEngine; import org.apache.nifi.flow.ExternalControllerServiceReference; import org.apache.nifi.flow.ParameterProviderReference; import org.apache.nifi.flow.VersionedComponent; +import org.apache.nifi.flow.VersionedConfigurableExtension; import org.apache.nifi.flow.VersionedConnection; import org.apache.nifi.flow.VersionedControllerService; import org.apache.nifi.flow.VersionedExternalFlow; @@ -116,6 +119,8 @@ import org.apache.nifi.flow.VersionedExternalFlowMetadata; import org.apache.nifi.flow.VersionedFlowCoordinates; import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.flow.VersionedPropertyDescriptor; +import org.apache.nifi.flow.VersionedReportingTask; import org.apache.nifi.flow.VersionedReportingTaskSnapshot; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroupCounts; @@ -332,6 +337,7 @@ import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity; import org.apache.nifi.web.api.entity.VersionControlInformationEntity; import org.apache.nifi.web.api.entity.VersionedFlowEntity; import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity; +import org.apache.nifi.web.api.entity.VersionedReportingTaskImportResponseEntity; import org.apache.nifi.web.api.request.FlowMetricsRegistry; import org.apache.nifi.web.controller.ControllerFacade; import org.apache.nifi.web.dao.AccessPolicyDAO; @@ -3983,6 +3989,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { BundleUtils.discoverCompatibleBundles(controllerFacade.getExtensionManager(), versionedGroup); } + @Override + public void discoverCompatibleBundles(final VersionedReportingTaskSnapshot reportingTaskSnapshot) { + 
BundleUtils.discoverCompatibleBundles(controllerFacade.getExtensionManager(), reportingTaskSnapshot); + } + @Override public void resolveParameterProviders(final RegisteredFlowSnapshot versionedFlowSnapshot, final NiFiUser user) { final Map<String, ParameterProviderReference> parameterProviderReferences = versionedFlowSnapshot.getParameterProviders(); @@ -4200,6 +4211,67 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { }); } + @Override + public void generateIdentifiersForImport(final VersionedReportingTaskSnapshot reportingTaskSnapshot, final Supplier<String> idGenerator) { + final List<VersionedReportingTask> reportingTasks = Optional.ofNullable(reportingTaskSnapshot.getReportingTasks()).orElse(Collections.emptyList()); + final List<VersionedControllerService> controllerServices = Optional.ofNullable(reportingTaskSnapshot.getControllerServices()).orElse(Collections.emptyList()); + + // First generate all new ids and instance ids for each component, maintaining a mapping from old id to new instance id, the instance id + // will be used to create the component, so we want to update any CS references to use the instance id + final Map<String, String> oldIdToNewInstanceIdMap = new HashMap<>(); + controllerServices.forEach(controllerService -> generateIdentifiersForImport(controllerService, idGenerator, oldIdToNewInstanceIdMap)); + reportingTasks.forEach(reportingTask -> generateIdentifiersForImport(reportingTask, idGenerator, oldIdToNewInstanceIdMap)); + + // Now go back through all components and update any property values that referenced old CS ids + controllerServices.forEach(controllerService -> updateControllerServiceReferences(controllerService, oldIdToNewInstanceIdMap)); + reportingTasks.forEach(reportingTask -> updateControllerServiceReferences(reportingTask, oldIdToNewInstanceIdMap)); + } + + private void generateIdentifiersForImport(final VersionedConfigurableExtension extension, final Supplier<String> idGenerator, + final 
Map<String, String> oldIdToNewIdMap) { + final String identifier = idGenerator.get(); + final String instanceIdentifier = idGenerator.get(); + oldIdToNewIdMap.put(extension.getIdentifier(), instanceIdentifier); + extension.setIdentifier(identifier); + extension.setInstanceIdentifier(instanceIdentifier); + } + + private void updateControllerServiceReferences(final VersionedConfigurableExtension extension, final Map<String, String> oldIdToNewIdMap) { + final Map<String, String> propertyValues = Optional.ofNullable(extension.getProperties()).orElse(Collections.emptyMap()); + final Map<String, VersionedPropertyDescriptor> propertyDescriptors = Optional.ofNullable(extension.getPropertyDescriptors()).orElse(Collections.emptyMap()); + + propertyDescriptors.forEach((propName, propDescriptor) -> { + if (propDescriptor.getIdentifiesControllerService()) { + final String oldServiceId = propertyValues.get(propName); + if (oldServiceId != null) { + final String newServiceId = oldIdToNewIdMap.get(oldServiceId); + propertyValues.put(propName, newServiceId); + } + } + }); + } + + @Override + public VersionedReportingTaskImportResponseEntity importReportingTasks(final VersionedReportingTaskSnapshot reportingTaskSnapshot) { + final VersionedReportingTaskImporter reportingTaskImporter = controllerFacade.createReportingTaskImporter(); + final VersionedReportingTaskImportResult importResult = reportingTaskImporter.importSnapshot(reportingTaskSnapshot); + + controllerFacade.save(); + + final Set<ReportingTaskEntity> reportingTaskEntities = importResult.getReportingTaskNodes().stream() + .map(this::createReportingTaskEntity) + .collect(Collectors.toSet()); + + final Set<ControllerServiceEntity> controllerServiceEntities = importResult.getControllerServiceNodes().stream() + .map(serviceNode -> createControllerServiceEntity(serviceNode, false)) + .collect(Collectors.toSet()); + + final VersionedReportingTaskImportResponseEntity importResponseEntity = new 
VersionedReportingTaskImportResponseEntity(); + importResponseEntity.setReportingTasks(reportingTaskEntities); + importResponseEntity.setControllerServices(controllerServiceEntities); + return importResponseEntity; + } + @Override public ControllerBulletinsEntity getControllerBulletins() { final NiFiUser user = NiFiUserUtils.getNiFiUser(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java index bf888660b1..73846c8e9c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java @@ -24,30 +24,6 @@ import io.swagger.annotations.ApiResponses; import io.swagger.annotations.Authorization; import io.swagger.annotations.SwaggerDefinition; import io.swagger.annotations.Tag; -import java.net.URI; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.authorization.AuthorizeControllerServiceReference; 
import org.apache.nifi.authorization.Authorizer; @@ -56,7 +32,10 @@ import org.apache.nifi.authorization.RequestAction; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; +import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.controller.FlowController; +import org.apache.nifi.flow.VersionedReportingTaskSnapshot; +import org.apache.nifi.registry.flow.FlowRegistryUtils; import org.apache.nifi.web.IllegalClusterResourceRequestException; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.ResourceNotFoundException; @@ -101,12 +80,39 @@ import org.apache.nifi.web.api.entity.ParameterProviderEntity; import org.apache.nifi.web.api.entity.PropertyDescriptorEntity; import org.apache.nifi.web.api.entity.ReportingTaskEntity; import org.apache.nifi.web.api.entity.VerifyConfigRequestEntity; +import org.apache.nifi.web.api.entity.VersionedReportingTaskImportRequestEntity; +import org.apache.nifi.web.api.entity.VersionedReportingTaskImportResponseEntity; import org.apache.nifi.web.api.request.ClientIdParameter; import org.apache.nifi.web.api.request.DateTimeParameter; import org.apache.nifi.web.api.request.LongParameter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.net.URI; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Optional; +import java.util.Set; 
+import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + /** * RESTful endpoint for managing a Flow Controller. */ @@ -449,6 +455,72 @@ public class ControllerResource extends ApplicationResource { ); } + /** + * Imports a reporting task snapshot. + */ + @POST + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @Path("reporting-tasks/import") + @ApiOperation( + value = "Imports a reporting task snapshot", + response = VersionedReportingTaskImportResponseEntity.class, + authorizations = { + @Authorization(value = "Write - /controller") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. 
Retrying the same request later may be successful.") + } + ) + public Response importReportingTaskSnapshot( + @Context final HttpServletRequest httpServletRequest, + @ApiParam( + value = "The import request containing the reporting task snapshot to import.", + required = true + ) final VersionedReportingTaskImportRequestEntity importRequestEntity) { + + if (importRequestEntity == null || importRequestEntity.getReportingTaskSnapshot() == null) { + throw new IllegalArgumentException("Reporting task snapshot is required"); + } + + final VersionedReportingTaskSnapshot requestSnapshot = importRequestEntity.getReportingTaskSnapshot(); + serviceFacade.discoverCompatibleBundles(requestSnapshot); + serviceFacade.generateIdentifiersForImport(requestSnapshot, () -> generateUuid()); + + if (isReplicateRequest()) { + return replicate(HttpMethod.POST, importRequestEntity); + } else if (isDisconnectedFromCluster()) { + verifyDisconnectedNodeModification(importRequestEntity.getDisconnectedNodeAcknowledged()); + } + + return withWriteLock( + serviceFacade, + importRequestEntity, + lookup -> { + authorizeController(RequestAction.WRITE); + + final Set<ConfigurableComponent> restrictedComponents = FlowRegistryUtils.getRestrictedComponents(requestSnapshot, serviceFacade); + restrictedComponents.forEach(restrictedComponent -> { + final ComponentAuthorizable restrictedComponentAuthorizable = lookup.getConfigurableComponent(restrictedComponent); + authorizeRestrictions(authorizer, restrictedComponentAuthorizable); + }); + }, + () -> { + // Nothing to verify + }, + (importRequest) -> { + final VersionedReportingTaskSnapshot snapshot = importRequest.getReportingTaskSnapshot(); + final VersionedReportingTaskImportResponseEntity responseEntity = serviceFacade.importReportingTasks(snapshot); + return generateOkResponse(responseEntity).build(); + } + ); + } + // ------------------- // flow-analysis-rules // ------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index 1b1dc3e1b8..754b38cb04 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -51,6 +51,8 @@ import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.repository.ContentNotFoundException; import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.controller.repository.claim.ContentDirection; +import org.apache.nifi.controller.serialization.StandardVersionedReportingTaskImporter; +import org.apache.nifi.controller.serialization.VersionedReportingTaskImporter; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.ControllerServiceResolver; @@ -1760,6 +1762,10 @@ public class ControllerFacade implements Authorizable { return flowController.getEventAccess().getFlowFileRepositoryStorageUsage(); } + public VersionedReportingTaskImporter createReportingTaskImporter() { + return new StandardVersionedReportingTaskImporter(flowController); + } + /* * setters */ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java index f3ff093cb6..1de941b52e 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java @@ -93,23 +93,19 @@ public class StandardReportingTaskDAO extends ComponentDAO implements ReportingT throw new IllegalArgumentException("The reporting task type must be specified."); } - try { - // create the reporting task - final ExtensionManager extensionManager = reportingTaskProvider.getExtensionManager(); - final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(extensionManager, reportingTaskDTO.getType(), reportingTaskDTO.getBundle()); - final ReportingTaskNode reportingTask = reportingTaskProvider.createReportingTask( - reportingTaskDTO.getType(), reportingTaskDTO.getId(), bundleCoordinate, true); + // create the reporting task + final ExtensionManager extensionManager = reportingTaskProvider.getExtensionManager(); + final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(extensionManager, reportingTaskDTO.getType(), reportingTaskDTO.getBundle()); + final ReportingTaskNode reportingTask = reportingTaskProvider.createReportingTask( + reportingTaskDTO.getType(), reportingTaskDTO.getId(), bundleCoordinate, true); - // ensure we can perform the update - verifyUpdate(reportingTask, reportingTaskDTO); + // ensure we can perform the update + verifyUpdate(reportingTask, reportingTaskDTO); - // perform the update - configureReportingTask(reportingTask, reportingTaskDTO); + // perform the update + configureReportingTask(reportingTask, reportingTaskDTO); - return reportingTask; - } catch (ReportingTaskInstantiationException rtie) { - throw new NiFiCoreException(rtie.getMessage(), rtie); - } + return reportingTask; } @Override diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java index 07ea707b08..01dd931258 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java @@ -44,6 +44,9 @@ import org.apache.nifi.flow.ExternalControllerServiceReference; import org.apache.nifi.flow.ParameterProviderReference; import org.apache.nifi.flow.VersionedControllerService; import org.apache.nifi.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedPropertyDescriptor; +import org.apache.nifi.flow.VersionedReportingTask; +import org.apache.nifi.flow.VersionedReportingTaskSnapshot; import org.apache.nifi.flowanalysis.EnforcementPolicy; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; @@ -107,6 +110,7 @@ import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -942,4 +946,43 @@ public class StandardNiFiServiceFacadeTest { return processGroup; } + + @Test + public void testGenerateIdsForImportingReportingTaskSnapshot() { + final String originalServiceId = "s1"; + final VersionedControllerService service = new VersionedControllerService(); + service.setIdentifier(originalServiceId); + + final VersionedPropertyDescriptor 
serviceDescriptor = new VersionedPropertyDescriptor(); + serviceDescriptor.setName("My Service"); + serviceDescriptor.setIdentifiesControllerService(true); + + final Map<String, VersionedPropertyDescriptor> reportingTaskDescriptors = new HashMap<>(); + reportingTaskDescriptors.put(serviceDescriptor.getName(), serviceDescriptor); + + final Map<String, String> reportingTaskPropertyValues = new HashMap<>(); + reportingTaskPropertyValues.put(serviceDescriptor.getName(), service.getIdentifier()); + + final String originalReportingTaskId = "r1"; + final VersionedReportingTask reportingTask = new VersionedReportingTask(); + reportingTask.setIdentifier(originalReportingTaskId); + reportingTask.setPropertyDescriptors(reportingTaskDescriptors); + reportingTask.setProperties(reportingTaskPropertyValues); + + final VersionedReportingTaskSnapshot reportingTaskSnapshot = new VersionedReportingTaskSnapshot(); + reportingTaskSnapshot.setReportingTasks(Collections.singletonList(reportingTask)); + reportingTaskSnapshot.setControllerServices(Collections.singletonList(service)); + + serviceFacade.generateIdentifiersForImport(reportingTaskSnapshot, () -> UUID.randomUUID().toString()); + + assertNotNull(service.getIdentifier()); + assertNotNull(service.getInstanceIdentifier()); + assertNotEquals(originalServiceId, service.getIdentifier()); + + assertNotNull(reportingTask.getIdentifier()); + assertNotNull(reportingTask.getInstanceIdentifier()); + assertNotEquals(originalReportingTaskId, reportingTask.getIdentifier()); + + assertEquals(service.getInstanceIdentifier(), reportingTask.getProperties().get(serviceDescriptor.getName())); + } } \ No newline at end of file diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ControllerClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ControllerClient.java index f2880c317f..34b3cdce29 100644 --- 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ControllerClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ControllerClient.java @@ -24,6 +24,8 @@ import org.apache.nifi.web.api.entity.FlowRegistryClientsEntity; import org.apache.nifi.web.api.entity.NodeEntity; import org.apache.nifi.web.api.entity.ParameterProviderEntity; import org.apache.nifi.web.api.entity.ReportingTaskEntity; +import org.apache.nifi.web.api.entity.VersionedReportingTaskImportRequestEntity; +import org.apache.nifi.web.api.entity.VersionedReportingTaskImportResponseEntity; import java.io.IOException; @@ -56,6 +58,9 @@ public interface ControllerClient { ReportingTaskEntity createReportingTask(ReportingTaskEntity reportingTask) throws NiFiClientException, IOException; + VersionedReportingTaskImportResponseEntity importReportingTasks(VersionedReportingTaskImportRequestEntity importRequestEntity) + throws NiFiClientException, IOException; + ParameterProviderEntity createParamProvider(ParameterProviderEntity paramProvider) throws NiFiClientException, IOException; ControllerConfigurationEntity getControllerConfiguration() throws NiFiClientException, IOException; diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyControllerClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyControllerClient.java index b48381cbfd..fe15f9a2a5 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyControllerClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyControllerClient.java @@ -28,6 +28,8 @@ import org.apache.nifi.web.api.entity.FlowRegistryClientsEntity; import org.apache.nifi.web.api.entity.NodeEntity; import org.apache.nifi.web.api.entity.ParameterProviderEntity; import 
org.apache.nifi.web.api.entity.ReportingTaskEntity; +import org.apache.nifi.web.api.entity.VersionedReportingTaskImportRequestEntity; +import org.apache.nifi.web.api.entity.VersionedReportingTaskImportResponseEntity; import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; @@ -227,6 +229,23 @@ public class JerseyControllerClient extends AbstractJerseyClient implements Cont }); } + @Override + public VersionedReportingTaskImportResponseEntity importReportingTasks(VersionedReportingTaskImportRequestEntity importRequestEntity) + throws NiFiClientException, IOException { + if (importRequestEntity == null) { + throw new IllegalArgumentException("Import request entity cannot be null"); + } + + return executeAction("Error creating reporting task", () -> { + final WebTarget target = controllerTarget.path("reporting-tasks/import"); + + return getRequestBuilder(target).post( + Entity.entity(importRequestEntity, MediaType.APPLICATION_JSON), + VersionedReportingTaskImportResponseEntity.class + ); + }); + } + @Override public ParameterProviderEntity createParamProvider(final ParameterProviderEntity paramProvider) throws NiFiClientException, IOException { if (paramProvider == null) { diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java index 29feb8073d..c3df04d797 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java @@ -35,6 +35,7 @@ import org.apache.nifi.toolkit.cli.impl.command.nifi.flow.GetControllerConfigura import org.apache.nifi.toolkit.cli.impl.command.nifi.flow.GetReportingTask; import org.apache.nifi.toolkit.cli.impl.command.nifi.flow.GetReportingTasks; import 
org.apache.nifi.toolkit.cli.impl.command.nifi.flow.GetRootId; +import org.apache.nifi.toolkit.cli.impl.command.nifi.flow.ImportReportingTasks; import org.apache.nifi.toolkit.cli.impl.command.nifi.flow.StartReportingTasks; import org.apache.nifi.toolkit.cli.impl.command.nifi.flow.StopReportingTasks; import org.apache.nifi.toolkit.cli.impl.command.nifi.flow.UpdateControllerConfiguration; @@ -154,6 +155,7 @@ public class NiFiCommandGroup extends AbstractCommandGroup { commands.add(new StopReportingTasks()); commands.add(new ExportReportingTasks()); commands.add(new ExportReportingTask()); + commands.add(new ImportReportingTasks()); commands.add(new ListUsers()); commands.add(new CreateUser()); commands.add(new ListUserGroups()); diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/flow/ImportReportingTasks.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/flow/ImportReportingTasks.java new file mode 100644 index 0000000000..a83536a46f --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/flow/ImportReportingTasks.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.cli.impl.command.nifi.flow; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.cli.MissingOptionException; +import org.apache.nifi.flow.VersionedReportingTaskSnapshot; +import org.apache.nifi.toolkit.cli.api.CommandException; +import org.apache.nifi.toolkit.cli.api.Context; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.toolkit.cli.impl.command.CommandOption; +import org.apache.nifi.toolkit.cli.impl.command.nifi.AbstractNiFiCommand; +import org.apache.nifi.toolkit.cli.impl.result.nifi.ImportReportingTasksResult; +import org.apache.nifi.toolkit.cli.impl.util.JacksonUtils; +import org.apache.nifi.web.api.entity.VersionedReportingTaskImportRequestEntity; +import org.apache.nifi.web.api.entity.VersionedReportingTaskImportResponseEntity; + +import java.io.IOException; +import java.util.Properties; + +public class ImportReportingTasks extends AbstractNiFiCommand<ImportReportingTasksResult> { + + public ImportReportingTasks() { + super("import-reporting-tasks", ImportReportingTasksResult.class); + } + + @Override + public void doInitialize(final Context context) { + addOption(CommandOption.INPUT_SOURCE.createOption()); + } + + @Override + public String getDescription() { + return "Imports the contents of a reporting task snapshot produced from export-reporting-tasks or export-reporting-task."; + } + + @Override + public ImportReportingTasksResult doExecute(final NiFiClient client, final Properties properties) throws NiFiClientException, IOException, MissingOptionException, CommandException { + final String inputFile = getRequiredArg(properties, CommandOption.INPUT_SOURCE); + final String contents = getInputSourceContent(inputFile); + + final ObjectMapper objectMapper = 
JacksonUtils.getObjectMapper(); + final VersionedReportingTaskSnapshot reportingTaskSnapshot = objectMapper.readValue(contents, VersionedReportingTaskSnapshot.class); + if (reportingTaskSnapshot == null) { + throw new IOException("Unable to deserialize reporting task snapshot from " + inputFile); + } + + final VersionedReportingTaskImportRequestEntity importRequestEntity = new VersionedReportingTaskImportRequestEntity(); + importRequestEntity.setReportingTaskSnapshot(reportingTaskSnapshot); + + final VersionedReportingTaskImportResponseEntity importResponseEntity = client.getControllerClient().importReportingTasks(importRequestEntity); + return new ImportReportingTasksResult(getResultType(properties), importResponseEntity); + } +} diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/nifi/ImportReportingTasksResult.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/nifi/ImportReportingTasksResult.java new file mode 100644 index 0000000000..183d1d19f5 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/nifi/ImportReportingTasksResult.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.cli.impl.result.nifi; + +import org.apache.nifi.toolkit.cli.api.ResultType; +import org.apache.nifi.toolkit.cli.impl.result.AbstractWritableResult; +import org.apache.nifi.toolkit.cli.impl.result.writer.DynamicTableWriter; +import org.apache.nifi.toolkit.cli.impl.result.writer.Table; +import org.apache.nifi.toolkit.cli.impl.result.writer.TableWriter; +import org.apache.nifi.web.api.dto.ControllerServiceDTO; +import org.apache.nifi.web.api.dto.ReportingTaskDTO; +import org.apache.nifi.web.api.entity.ControllerServiceEntity; +import org.apache.nifi.web.api.entity.ReportingTaskEntity; +import org.apache.nifi.web.api.entity.VersionedReportingTaskImportResponseEntity; + +import java.io.IOException; +import java.io.PrintStream; +import java.util.Comparator; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +public class ImportReportingTasksResult extends AbstractWritableResult<VersionedReportingTaskImportResponseEntity> { + + private static final String REPORTING_TASK_TYPE = "REPORTING_TASK"; + private static final String CONTROLLER_SERVICE_TYPE = "CONTROLLER_SERVICE"; + + private final VersionedReportingTaskImportResponseEntity responseEntity; + + public ImportReportingTasksResult(final ResultType resultType, final VersionedReportingTaskImportResponseEntity responseEntity) { + super(resultType); + this.responseEntity = responseEntity; + } + + @Override + public VersionedReportingTaskImportResponseEntity getResult() { + return responseEntity; + } + + @Override + protected void writeSimpleResult(final PrintStream output) throws IOException { + final Set<ReportingTaskEntity> tasksEntities = responseEntity.getReportingTasks(); + final Set<ControllerServiceEntity> serviceEntities = responseEntity.getControllerServices(); + if (tasksEntities == null || serviceEntities == null) { + return; + } + + 
final List<ReportingTaskDTO> taskDTOS = tasksEntities.stream() + .map(ReportingTaskEntity::getComponent) + .sorted(Comparator.comparing(ReportingTaskDTO::getName)) + .collect(Collectors.toList()); + + final List<ControllerServiceDTO> serviceDTOS = serviceEntities.stream() + .map(ControllerServiceEntity::getComponent) + .sorted(Comparator.comparing(ControllerServiceDTO::getName)) + .collect(Collectors.toList()); + + final Table table = new Table.Builder() + .column("#", 3, 3, false) + .column("Name", 5, 40, true) + .column("ID", 36, 36, false) + .column("Type", 15, 20, true) + .build(); + + int componentCount = 0; + for (final ReportingTaskDTO taskDTO : taskDTOS) { + table.addRow(String.valueOf(++componentCount), taskDTO.getName(), taskDTO.getId(), REPORTING_TASK_TYPE); + } + for (final ControllerServiceDTO serviceDTO : serviceDTOS) { + table.addRow(String.valueOf(++componentCount), serviceDTO.getName(), serviceDTO.getId(), CONTROLLER_SERVICE_TYPE); + } + + final TableWriter tableWriter = new DynamicTableWriter(); + tableWriter.write(table, output); + } +}