This is an automated email from the ASF dual-hosted git repository. ferdei pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 9d50c6dd53 NIFI-12372 [MiNiFi] Encrpyt raw flow sensitive properties 9d50c6dd53 is described below commit 9d50c6dd53c625e8cb44e28fda2af3500499f860 Author: Ferenc Kis <briansolo1...@gmail.com> AuthorDate: Wed Nov 15 14:40:03 2023 +0100 NIFI-12372 [MiNiFi] Encrpyt raw flow sensitive properties Signed-off-by: Ferenc Erdei <erdei.feren...@gmail.com> This closes #8028. --- .../apache/nifi/minifi/bootstrap/RunMiNiFi.java | 5 +- .../service/MiNiFiConfigurationChangeListener.java | 13 +- minifi/minifi-commons/minifi-commons-api/pom.xml | 8 + .../minifi/commons/service/FlowEnrichService.java | 35 +++ .../commons/service/FlowPropertyEncryptor.java | 35 +++ .../minifi/commons/service/FlowSerDeService.java | 43 +++ .../minifi-commons-framework/pom.xml | 10 + ...Service.java => StandardFlowEnrichService.java} | 68 +---- .../service/StandardFlowPropertyEncryptor.java | 162 +++++++++++ .../commons/service/StandardFlowSerDeService.java | 72 +++++ ...est.java => StandardFlowEnrichServiceTest.java} | 74 ++--- .../service/StandardFlowPropertyEncryptorTest.java | 320 +++++++++++++++++++++ .../apache/nifi/minifi/c2/C2NifiClientService.java | 18 +- .../DefaultUpdateConfigurationStrategy.java | 25 +- .../DefaultUpdateConfigurationStrategyTest.java | 29 +- 15 files changed, 794 insertions(+), 123 deletions(-) diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java index 66811ebfc8..cbd23dcfac 100644 --- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java +++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java @@ -46,7 +46,8 @@ import org.apache.nifi.minifi.bootstrap.service.ReloadService; import org.apache.nifi.minifi.bootstrap.util.ProcessUtils; import org.apache.nifi.minifi.bootstrap.util.UnixProcessUtils; import org.apache.nifi.minifi.commons.api.MiNiFiCommandState; -import org.apache.nifi.minifi.commons.service.FlowEnrichService; +import org.apache.nifi.minifi.commons.service.StandardFlowEnrichService; +import org.apache.nifi.minifi.commons.service.StandardFlowSerDeService; import org.apache.nifi.properties.ApplicationProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,7 +118,7 @@ public class RunMiNiFi implements ConfigurationFileHolder { periodicStatusReporterManager = new PeriodicStatusReporterManager(bootstrapProperties, miNiFiStatusProvider, miNiFiCommandSender, miNiFiParameters); MiNiFiConfigurationChangeListener configurationChangeListener = new MiNiFiConfigurationChangeListener(this, DEFAULT_LOGGER, bootstrapFileProvider, - new FlowEnrichService(new ApplicationProperties(bootstrapProperties))); + new StandardFlowEnrichService(new ApplicationProperties(bootstrapProperties)), StandardFlowSerDeService.defaultInstance()); configurationChangeCoordinator = new ConfigurationChangeCoordinator(bootstrapFileProvider, this, singleton(configurationChangeListener)); currentPortProvider = new CurrentPortProvider(miNiFiCommandSender, miNiFiParameters, processUtils); diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java index b5d30ffe09..ea0f04dd84 100644 --- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java +++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java @@ -35,11 +35,13 @@ import java.nio.file.Path; import java.util.Properties; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.io.FilenameUtils; +import org.apache.nifi.controller.flow.VersionedDataflow; import org.apache.nifi.minifi.bootstrap.RunMiNiFi; import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException; import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener; import org.apache.nifi.minifi.commons.api.MiNiFiProperties; import org.apache.nifi.minifi.commons.service.FlowEnrichService; +import org.apache.nifi.minifi.commons.service.FlowSerDeService; import org.slf4j.Logger; public class MiNiFiConfigurationChangeListener implements ConfigurationChangeListener { @@ -50,12 +52,15 @@ public class MiNiFiConfigurationChangeListener implements ConfigurationChangeLis private final Logger logger; private final BootstrapFileProvider bootstrapFileProvider; private final FlowEnrichService flowEnrichService; + private final FlowSerDeService flowSerDeService; - public MiNiFiConfigurationChangeListener(RunMiNiFi runner, Logger logger, BootstrapFileProvider bootstrapFileProvider, FlowEnrichService flowEnrichService) { + public MiNiFiConfigurationChangeListener(RunMiNiFi runner, Logger logger, BootstrapFileProvider bootstrapFileProvider, + FlowEnrichService flowEnrichService, FlowSerDeService flowSerDeService) { this.runner = runner; this.logger = logger; this.bootstrapFileProvider = bootstrapFileProvider; this.flowEnrichService = flowEnrichService; + this.flowSerDeService = flowSerDeService; } @Override @@ -83,8 +88,10 @@ public class MiNiFiConfigurationChangeListener implements ConfigurationChangeLis backup(currentRawFlowConfigFile, backupRawFlowConfigFile); byte[] rawFlow = toByteArray(flowConfigInputStream); - byte[] enrichedFlow = flowEnrichService.enrichFlow(rawFlow); - persist(enrichedFlow, currentFlowConfigFile, true); + VersionedDataflow rawDataFlow = flowSerDeService.deserialize(rawFlow); + VersionedDataflow enrichedFlow = flowEnrichService.enrichFlow(rawDataFlow); + byte[] serializedEnrichedFlow = flowSerDeService.serialize(enrichedFlow); + persist(serializedEnrichedFlow, currentFlowConfigFile, true); restartInstance(); persist(rawFlow, currentRawFlowConfigFile, false); setActiveFlowReference(wrap(rawFlow)); diff --git a/minifi/minifi-commons/minifi-commons-api/pom.xml b/minifi/minifi-commons/minifi-commons-api/pom.xml index 26196aad73..596b8ac6da 100644 --- a/minifi/minifi-commons/minifi-commons-api/pom.xml +++ b/minifi/minifi-commons/minifi-commons-api/pom.xml @@ -27,4 +27,12 @@ <artifactId>minifi-commons-api</artifactId> <packaging>jar</packaging> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-framework-core-api</artifactId> + <version>2.0.0-SNAPSHOT</version> + </dependency> + </dependencies> + </project> \ No newline at end of file diff --git a/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowEnrichService.java b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowEnrichService.java new file mode 100644 index 0000000000..5b3620803f --- /dev/null +++ b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowEnrichService.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.commons.service; + +import org.apache.nifi.controller.flow.VersionedDataflow; + +/** + * Defines interface methods used to implement a FlowEnrichService. + * The purpose of a flow enrich service is to enrich a VersionedDataFlow with various additional components specific to the MiNiFi instance + */ +public interface FlowEnrichService { + + /** + * Responsible for enriching a VersionedDataflow instance + * + * @param versionedDataflow a VersionedDataflow instance + * @return VersionedDataflow the enriched flow instance + */ + VersionedDataflow enrichFlow(VersionedDataflow versionedDataflow); +} diff --git a/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowPropertyEncryptor.java b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowPropertyEncryptor.java new file mode 100644 index 0000000000..661b4bd242 --- /dev/null +++ b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowPropertyEncryptor.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.commons.service; + +import org.apache.nifi.controller.flow.VersionedDataflow; + +/** + * Defines interface methods used to implement a FlowPropertyEncryptor. + * The purpose of a flow property encryptor is to encrypt sensitive properties in the flow using a particular strategy. + */ +public interface FlowPropertyEncryptor { + + /** + * Responsible for encrypting sensitive properties in a VersionedDataflow instance + * + * @param flow a VersionedDataflow instance to encrypt its sensitive properties + * @return VersionedDataflow the flow instance with encrypted sensitive properties + */ + VersionedDataflow encryptSensitiveProperties(VersionedDataflow flow); +} diff --git a/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowSerDeService.java b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowSerDeService.java new file mode 100644 index 0000000000..322503b77b --- /dev/null +++ b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowSerDeService.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.commons.service; + +import org.apache.nifi.controller.flow.VersionedDataflow; + +/** + * Defines interface methods used to implement a FlowSerDeService. + * The purpose of a flow serde service is to provide serialisation / deserialization for VersionedDataflow + */ +public interface FlowSerDeService { + + /** + * Responsible for serialising a VersionedDataflow instance + * + * @param flow a VersionedDataflow instance to be serialised + * @return byte[] VersionedDataflow in serialised format + */ + byte[] serialize(VersionedDataflow flow); + + /** + * Responsible for deserializing a VersionedDataflow instance + * + * @param flow a VersionedDataflow instance in serialised (byte array) format + * @return VersionedDataflow a deserialized instance + */ + VersionedDataflow deserialize(byte[] flow); +} diff --git a/minifi/minifi-commons/minifi-commons-framework/pom.xml b/minifi/minifi-commons/minifi-commons-framework/pom.xml index 3230d63f64..61d2bab185 100644 --- a/minifi/minifi-commons/minifi-commons-framework/pom.xml +++ b/minifi/minifi-commons/minifi-commons-framework/pom.xml @@ -47,6 +47,16 @@ limitations under the License. <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-jaxb-annotations</artifactId> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-property-encryptor</artifactId> + <version>2.0.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-framework-core</artifactId> + <version>2.0.0-SNAPSHOT</version> + </dependency> </dependencies> <build> diff --git a/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/FlowEnrichService.java b/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowEnrichService.java similarity index 80% rename from minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/FlowEnrichService.java rename to minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowEnrichService.java index 0f7b03d1e8..6a73992d15 100644 --- a/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/FlowEnrichService.java +++ b/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowEnrichService.java @@ -19,7 +19,6 @@ package org.apache.nifi.minifi.commons.service; import static java.lang.Boolean.FALSE; import static java.lang.Boolean.parseBoolean; -import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Map.entry; import static java.util.Optional.empty; import static java.util.Optional.ofNullable; @@ -28,21 +27,12 @@ import static java.util.stream.Collectors.toMap; import static org.apache.commons.lang3.StringUtils.EMPTY; import static org.apache.nifi.flow.ScheduledState.ENABLED; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; -import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.controller.flow.VersionedDataflow; -import org.apache.nifi.controller.serialization.FlowSerializationException; import org.apache.nifi.flow.Bundle; import org.apache.nifi.flow.ComponentType; import org.apache.nifi.flow.ControllerServiceAPI; @@ -57,7 +47,7 @@ import org.apache.nifi.properties.ReadableProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class FlowEnrichService { +public class StandardFlowEnrichService implements FlowEnrichService { static final String DEFAULT_SSL_CONTEXT_SERVICE_NAME = "SSL Context Service"; @@ -66,7 +56,7 @@ public class FlowEnrichService { static final String SITE_TO_SITE_PROVENANCE_REPORTING_TASK_NAME = "Site-To-Site-Provenance-Reporting"; static final String SITE_TO_SITE_PROVENANCE_REPORTING_TASK_ID = "generated-s2s-provenance-reporting-task"; - private static final Logger LOG = LoggerFactory.getLogger(FlowEnrichService.class); + private static final Logger LOG = LoggerFactory.getLogger(StandardFlowEnrichService.class); private static final String NIFI_BUNDLE_GROUP = "org.apache.nifi"; private static final String STANDARD_RESTRICTED_SSL_CONTEXT_SERVICE = "org.apache.nifi.ssl.StandardRestrictedSSLContextService"; @@ -81,16 +71,12 @@ public class FlowEnrichService { private final ReadableProperties minifiProperties; - public FlowEnrichService(ReadableProperties minifiProperties) { + public StandardFlowEnrichService(ReadableProperties minifiProperties) { this.minifiProperties = minifiProperties; } - public byte[] enrichFlow(byte[] flowCandidate) { - if (LOG.isDebugEnabled()) { - LOG.debug("Enriching flow with content: \n{}", new String(flowCandidate, UTF_8)); - } - - VersionedDataflow versionedDataflow = parseVersionedDataflow(flowCandidate); + @Override + public VersionedDataflow enrichFlow(VersionedDataflow versionedDataflow) { versionedDataflow.setReportingTasks(ofNullable(versionedDataflow.getReportingTasks()).orElseGet(ArrayList::new)); versionedDataflow.setRegistries(ofNullable(versionedDataflow.getRegistries()).orElseGet(ArrayList::new)); versionedDataflow.setControllerServices(ofNullable(versionedDataflow.getControllerServices()).orElseGet(ArrayList::new)); @@ -119,27 +105,8 @@ public class FlowEnrichService { createProvenanceReportingTask(commonSslControllerService.map(VersionedComponent::getInstanceIdentifier).orElse(EMPTY)) .ifPresent(versionedDataflow.getReportingTasks()::add); - byte[] enrichedFlow = toByteArray(versionedDataflow); - if (LOG.isDebugEnabled()) { - LOG.debug("Enriched flow with content: \n{}", new String(enrichedFlow, UTF_8)); - } - return enrichedFlow; - } - - private VersionedDataflow parseVersionedDataflow(byte[] flow) { - try { - ObjectMapper objectMapper = deserializationObjectMapper(); - return objectMapper.readValue(flow, VersionedDataflow.class); - } catch (final Exception e) { - throw new FlowSerializationException("Could not parse flow as a VersionedDataflow", e); - } - } - private ObjectMapper deserializationObjectMapper() { - ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector(objectMapper.getTypeFactory())); - objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - return objectMapper; + return versionedDataflow; } private Optional<VersionedControllerService> createCommonSslControllerService() { @@ -260,27 +227,4 @@ public class FlowEnrichService { .filter(entry -> StringUtils.isNotBlank(entry.getValue())) .collect(toMap(Map.Entry::getKey, Map.Entry::getValue)); } - - private byte[] toByteArray(VersionedDataflow versionedDataflow) { - try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) { - JsonFactory factory = new JsonFactory(); - JsonGenerator generator = factory.createGenerator(byteArrayOutputStream); - generator.setCodec(serializationObjectMapper()); - generator.writeObject(versionedDataflow); - generator.flush(); - byteArrayOutputStream.flush(); - return byteArrayOutputStream.toByteArray(); - } catch (IOException e) { - throw new RuntimeException("Unable to convert flow to byte array", e); - } - } - - private ObjectMapper serializationObjectMapper() { - ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); - objectMapper.setDefaultPropertyInclusion(JsonInclude.Value.construct(JsonInclude.Include.NON_NULL, JsonInclude.Include.NON_NULL)); - objectMapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector(objectMapper.getTypeFactory())); - objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - return objectMapper; - } } diff --git a/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowPropertyEncryptor.java b/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowPropertyEncryptor.java new file mode 100644 index 0000000000..32f8746d7e --- /dev/null +++ b/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowPropertyEncryptor.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.commons.service; + +import static java.util.Optional.ofNullable; +import static java.util.function.Predicate.not; +import static java.util.stream.Collectors.toMap; +import static java.util.stream.Collectors.toSet; +import static java.util.stream.Stream.concat; +import static org.apache.commons.lang3.StringUtils.EMPTY; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Stream; +import org.apache.nifi.c2.protocol.component.api.DefinedType; +import org.apache.nifi.c2.protocol.component.api.PropertyDescriptor; +import org.apache.nifi.c2.protocol.component.api.RuntimeManifest; +import org.apache.nifi.controller.flow.VersionedDataflow; +import org.apache.nifi.controller.serialization.FlowSerializer; +import org.apache.nifi.encrypt.PropertyEncryptor; +import org.apache.nifi.flow.VersionedConfigurableExtension; +import org.apache.nifi.flow.VersionedParameter; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.flow.VersionedPropertyDescriptor; + +public class StandardFlowPropertyEncryptor implements FlowPropertyEncryptor { + + private static final String ENCRYPTED_FORMAT = "enc{%s}"; + + private final PropertyEncryptor propertyEncryptor; + private final RuntimeManifest runTimeManifest; + + public StandardFlowPropertyEncryptor(PropertyEncryptor propertyEncryptor, RuntimeManifest runTimeManifest) { + this.propertyEncryptor = propertyEncryptor; + this.runTimeManifest = runTimeManifest; + } + + @Override + public VersionedDataflow encryptSensitiveProperties(VersionedDataflow flow) { + encryptParameterContextsProperties(flow); + + Map<String, Set<String>> sensitivePropertiesByComponentType = Optional.of(flowProvidedSensitiveProperties(flow)) + .filter(not(Map::isEmpty)) + .orElseGet(this::runtimeManifestSensitiveProperties); + + encryptFlowComponentsProperties(flow, sensitivePropertiesByComponentType); + + return flow; + } + + private void encryptParameterContextsProperties(VersionedDataflow flow) { + ofNullable(flow.getParameterContexts()) + .orElse(List.of()) + .forEach(parameterContext -> ofNullable(parameterContext.getParameters()).orElse(Set.of()) + .stream() + .filter(VersionedParameter::isSensitive) + .filter(not(parameter -> ofNullable(parameter.getValue()).orElse(EMPTY).startsWith(FlowSerializer.ENC_PREFIX))) + .forEach(parameter -> parameter.setValue(encrypt(parameter.getValue())))); + } + + private Map<String, Set<String>> flowProvidedSensitiveProperties(VersionedDataflow flow) { + return fetchFlowComponents(flow) + .map(extension -> Map.entry( + extension.getType(), + ofNullable(extension.getPropertyDescriptors()).orElse(Map.of()) + .values() + .stream() + .filter(VersionedPropertyDescriptor::isSensitive) + .map(VersionedPropertyDescriptor::getName) + .collect(toSet()) + )) + .filter(not(entry -> entry.getValue().isEmpty())) + .collect(toMap(Entry::getKey, Entry::getValue, this::mergeSets)); + } + + private Map<String, Set<String>> runtimeManifestSensitiveProperties() { + return ofNullable(runTimeManifest.getBundles()).orElse(List.of()) + .stream() + .flatMap(bundle -> Stream.of( + ofNullable(bundle.getComponentManifest().getProcessors()).orElse(List.of()), + ofNullable(bundle.getComponentManifest().getControllerServices()).orElse(List.of()) + )) + .flatMap(List::stream) + .collect(toMap( + DefinedType::getType, + type -> ofNullable(type.getPropertyDescriptors()).orElse(Map.of()) + .values() + .stream() + .filter(PropertyDescriptor::getSensitive) + .map(PropertyDescriptor::getName) + .collect(toSet()), + this::mergeSets + )); + } + + private void encryptFlowComponentsProperties(VersionedDataflow flow, Map<String, Set<String>> sensitivePropertiesByComponentType) { + fetchFlowComponents(flow) + .forEach(extension -> { + Set<String> sensitivePropertyNames = sensitivePropertiesByComponentType.getOrDefault(extension.getType(), Set.of()); + Map<String, String> encryptedProperties = ofNullable(extension.getProperties()).orElse(Map.of()) + .entrySet() + .stream() + .collect(toMap(Entry::getKey, encryptPropertyIfNeeded(sensitivePropertyNames))); + extension.setProperties(encryptedProperties); + }); + } + + private Stream<? extends VersionedConfigurableExtension> fetchFlowComponents(VersionedDataflow flow) { + return concat( + ofNullable(flow.getControllerServices()).orElse(List.of()).stream(), + fetchComponentsRecursively(flow.getRootGroup()) + ); + } + + private Stream<? extends VersionedConfigurableExtension> fetchComponentsRecursively(VersionedProcessGroup processGroup) { + return concat( + Stream.of( + ofNullable(processGroup.getProcessors()).orElse(Set.of()), + ofNullable(processGroup.getControllerServices()).orElse(Set.of())) + .flatMap(Set::stream), + ofNullable(processGroup.getProcessGroups()).orElse(Set.of()).stream() + .flatMap(this::fetchComponentsRecursively) + ); + } + + private Set<String> mergeSets(Set<String> first, Set<String> second) { + first.addAll(second); + return first; + } + + private Function<Entry<String, String>, String> encryptPropertyIfNeeded(Set<String> sensitivePropertyNames) { + return entry -> + sensitivePropertyNames.contains(entry.getKey()) && !entry.getValue().startsWith(FlowSerializer.ENC_PREFIX) + ? encrypt(entry.getValue()) + : entry.getValue(); + } + + private String encrypt(String parameter) { + return String.format(ENCRYPTED_FORMAT, propertyEncryptor.encrypt(parameter)); + } + + +} diff --git a/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowSerDeService.java b/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowSerDeService.java new file mode 100644 index 0000000000..d5db712b58 --- /dev/null +++ b/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowSerDeService.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.commons.service; + +import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import org.apache.nifi.controller.flow.VersionedDataflow; +import org.apache.nifi.controller.serialization.FlowSerializationException; + +public class StandardFlowSerDeService implements FlowSerDeService { + + private final ObjectMapper objectMapper; + + StandardFlowSerDeService(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + public static StandardFlowSerDeService defaultInstance() { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); + objectMapper.setDefaultPropertyInclusion(JsonInclude.Value.construct(JsonInclude.Include.NON_NULL, JsonInclude.Include.NON_NULL)); + objectMapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector(objectMapper.getTypeFactory())); + objectMapper.configure(FAIL_ON_UNKNOWN_PROPERTIES, false); + return new StandardFlowSerDeService(objectMapper); + } + + @Override + public byte[] serialize(VersionedDataflow flow) { + try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) { + JsonFactory factory = new JsonFactory(); + JsonGenerator generator = factory.createGenerator(byteArrayOutputStream); + generator.setCodec(objectMapper); + generator.writeObject(flow); + generator.flush(); + byteArrayOutputStream.flush(); + return byteArrayOutputStream.toByteArray(); + } catch (IOException e) { + throw new FlowSerializationException("Unable to serialize flow", e); + } + } + + @Override + public VersionedDataflow deserialize(byte[] flow) { + try { + return objectMapper.readValue(flow, VersionedDataflow.class); + } catch (Exception e) { + throw new FlowSerializationException("Unable to deserialize flow", e); + } + } +} diff --git a/minifi/minifi-commons/minifi-commons-framework/src/test/java/org/apache/nifi/minifi/commons/service/FlowEnrichServiceTest.java b/minifi/minifi-commons/minifi-commons-framework/src/test/java/org/apache/nifi/minifi/commons/service/StandardFlowEnrichServiceTest.java similarity index 75% rename from minifi/minifi-commons/minifi-commons-framework/src/test/java/org/apache/nifi/minifi/commons/service/FlowEnrichServiceTest.java rename to minifi/minifi-commons/minifi-commons-framework/src/test/java/org/apache/nifi/minifi/commons/service/StandardFlowEnrichServiceTest.java index c43ab70551..84493e8506 100644 --- a/minifi/minifi-commons/minifi-commons-framework/src/test/java/org/apache/nifi/minifi/commons/service/FlowEnrichServiceTest.java +++ b/minifi/minifi-commons/minifi-commons-framework/src/test/java/org/apache/nifi/minifi/commons/service/StandardFlowEnrichServiceTest.java @@ -18,9 +18,10 @@ package org.apache.nifi.minifi.commons.service; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.nifi.minifi.commons.service.FlowEnrichService.COMMON_SSL_CONTEXT_SERVICE_NAME; -import static org.apache.nifi.minifi.commons.service.FlowEnrichService.DEFAULT_SSL_CONTEXT_SERVICE_NAME; -import static org.apache.nifi.minifi.commons.service.FlowEnrichService.SITE_TO_SITE_PROVENANCE_REPORTING_TASK_NAME; +import static java.util.UUID.randomUUID; +import static org.apache.nifi.minifi.commons.service.StandardFlowEnrichService.COMMON_SSL_CONTEXT_SERVICE_NAME; +import static org.apache.nifi.minifi.commons.service.StandardFlowEnrichService.DEFAULT_SSL_CONTEXT_SERVICE_NAME; +import static org.apache.nifi.minifi.commons.service.StandardFlowEnrichService.SITE_TO_SITE_PROVENANCE_REPORTING_TASK_NAME; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -33,6 +34,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; import java.nio.file.Files; import java.nio.file.Path; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -46,76 +48,64 @@ import org.apache.nifi.flow.VersionedProcessor; import org.apache.nifi.flow.VersionedReportingTask; import org.apache.nifi.minifi.commons.api.MiNiFiProperties; import org.apache.nifi.properties.StandardReadableProperties; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.mockito.MockedStatic; -public class FlowEnrichServiceTest { +public class StandardFlowEnrichServiceTest { private static final Path DEFAULT_FLOW_JSON = Path.of("src/test/resources/default_flow.json"); @Test public void testFlowIsLeftIntactIfEnrichingIsNotNecessary() { - // given Map<String, String> properties = Map.of(); - byte[] testFlowBytes = flowToString(loadDefaultFlow()).getBytes(UTF_8); + VersionedDataflow testFlow = loadDefaultFlow(); - // when - FlowEnrichService testFlowEnrichService = new FlowEnrichService(new StandardReadableProperties(properties)); - byte[] enrichedFlowBytes = testFlowEnrichService.enrichFlow(testFlowBytes); + FlowEnrichService testFlowEnrichService = new StandardFlowEnrichService(new StandardReadableProperties(properties)); + VersionedDataflow enrichedFlow = testFlowEnrichService.enrichFlow(testFlow); - // then + byte[] testFlowBytes = flowToString(testFlow).getBytes(UTF_8); + byte[] enrichedFlowBytes = flowToString(enrichedFlow).getBytes(UTF_8); assertArrayEquals(testFlowBytes, enrichedFlowBytes); } @Test public void testMissingRootGroupIdsAreFilledIn() { - // given Map<String, String> properties = Map.of(); VersionedDataflow testFlow = loadDefaultFlow(); testFlow.getRootGroup().setIdentifier(null); testFlow.getRootGroup().setInstanceIdentifier(null); - byte[] testFlowBytes = flowToString(testFlow).getBytes(UTF_8); - UUID expectedIdentifier = UUID.randomUUID(); + UUID expectedIdentifier = randomUUID(); try (MockedStatic<UUID> uuid = mockStatic(UUID.class)) { uuid.when(UUID::randomUUID).thenReturn(expectedIdentifier); - // when - FlowEnrichService testFlowEnrichService = new FlowEnrichService(new StandardReadableProperties(properties)); - byte[] enrichedFlowBytes = testFlowEnrichService.enrichFlow(testFlowBytes); + FlowEnrichService testFlowEnrichService = new StandardFlowEnrichService(new StandardReadableProperties(properties)); + VersionedDataflow enrichedFlow = testFlowEnrichService.enrichFlow(testFlow); - // then - VersionedDataflow versionedDataflow = flowFromString(new String(enrichedFlowBytes, UTF_8)); - Assertions.assertEquals(expectedIdentifier.toString(), versionedDataflow.getRootGroup().getIdentifier()); - Assertions.assertEquals(expectedIdentifier.toString(), versionedDataflow.getRootGroup().getInstanceIdentifier()); + assertEquals(expectedIdentifier.toString(), enrichedFlow.getRootGroup().getIdentifier()); + assertEquals(expectedIdentifier.toString(), enrichedFlow.getRootGroup().getInstanceIdentifier()); } } @Test public void testCommonSslControllerServiceIsAddedWithBundleVersionAndProcessorControllerServiceIsOverridden() { - // given Map<String, String> properties = securityProperties(true); - VersionedDataflow versionedDataflow = loadDefaultFlow(); + VersionedDataflow testFlow = loadDefaultFlow(); Bundle bundle = bundle("org.apache.nifi", "nifi-ssl-context-service-nar", StringUtils.EMPTY); String originalSslControllerServiceId = "original_ssl_controller_service_id"; - versionedDataflow.getRootGroup() + testFlow.getRootGroup() .setProcessors(Set.of( processor(bundle, "processor_1", originalSslControllerServiceId), processor(bundle, "processor_2", originalSslControllerServiceId) )); - byte[] testFlowBytes = flowToString(versionedDataflow).getBytes(UTF_8); - // when - FlowEnrichService testFlowEnrichService = new FlowEnrichService(new StandardReadableProperties(properties)); - byte[] enrichedFlowBytes = testFlowEnrichService.enrichFlow(testFlowBytes); + FlowEnrichService testFlowEnrichService = new StandardFlowEnrichService(new StandardReadableProperties(properties)); + VersionedDataflow enrichedFlow = testFlowEnrichService.enrichFlow(testFlow); - // then - VersionedDataflow enrichedFlow = flowFromString(new String(enrichedFlowBytes, UTF_8)); - Assertions.assertEquals(1, enrichedFlow.getControllerServices().size()); + assertEquals(1, enrichedFlow.getControllerServices().size()); VersionedControllerService sslControllerService = enrichedFlow.getControllerServices().get(0); assertEquals(COMMON_SSL_CONTEXT_SERVICE_NAME, sslControllerService.getName()); - Assertions.assertEquals(StringUtils.EMPTY, sslControllerService.getBundle().getVersion()); + assertEquals(StringUtils.EMPTY, sslControllerService.getBundle().getVersion()); Set<VersionedProcessor> processors = enrichedFlow.getRootGroup().getProcessors(); assertEquals(2, processors.size()); assertTrue( @@ -128,7 +118,6 @@ public class FlowEnrichServiceTest { @Test public void testProvenanceReportingTaskIsAdded() { - // given Map<String, String> properties = Map.of( MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMMENT.getKey(), "comment", MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_STRATEGY.getKey(), "timer_driven", @@ -140,21 +129,18 @@ public class FlowEnrichServiceTest { MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_BATCH_SIZE.getKey(), "1000", MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMMUNICATIONS_TIMEOUT.getKey(), "30 sec" ); - byte[] testFlowBytes = flowToString(loadDefaultFlow()).getBytes(UTF_8); + VersionedDataflow testFlow = loadDefaultFlow(); - // when - FlowEnrichService testFlowEnrichService = new FlowEnrichService(new StandardReadableProperties(properties)); - byte[] enrichedFlowBytes = testFlowEnrichService.enrichFlow(testFlowBytes); + FlowEnrichService testFlowEnrichService = new StandardFlowEnrichService(new StandardReadableProperties(properties)); + VersionedDataflow enrichedFlow = testFlowEnrichService.enrichFlow(testFlow); - // then - VersionedDataflow enrichedFlow = flowFromString(new String(enrichedFlowBytes, UTF_8)); List<VersionedReportingTask> reportingTasks = enrichedFlow.getReportingTasks(); assertEquals(1, reportingTasks.size()); VersionedReportingTask provenanceReportingTask = reportingTasks.get(0); assertEquals(SITE_TO_SITE_PROVENANCE_REPORTING_TASK_NAME, provenanceReportingTask.getName()); - Assertions.assertEquals(properties.get(MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMMENT.getKey()), provenanceReportingTask.getComments()); - Assertions.assertEquals(properties.get(MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_STRATEGY.getKey()), provenanceReportingTask.getSchedulingStrategy()); - Assertions.assertEquals(properties.get(MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_PERIOD.getKey()), provenanceReportingTask.getSchedulingPeriod()); + assertEquals(properties.get(MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMMENT.getKey()), provenanceReportingTask.getComments()); + assertEquals(properties.get(MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_STRATEGY.getKey()), provenanceReportingTask.getSchedulingStrategy()); + assertEquals(properties.get(MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_PERIOD.getKey()), provenanceReportingTask.getSchedulingPeriod()); Map<String, String> provenanceReportingTaskProperties = provenanceReportingTask.getProperties(); assertEquals(properties.get(MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_INPUT_PORT_NAME.getKey()), provenanceReportingTaskProperties.get("Input Port Name")); assertEquals(properties.get(MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_DESTINATION_URL.getKey()), provenanceReportingTaskProperties.get("Destination URL")); @@ -224,10 +210,10 @@ public class FlowEnrichServiceTest { private VersionedProcessor processor(Bundle bundle, String name, String originalSslControllerServiceId) { VersionedProcessor versionedProcessor = new VersionedProcessor(); - versionedProcessor.setIdentifier(UUID.randomUUID().toString()); + versionedProcessor.setIdentifier(randomUUID().toString()); versionedProcessor.setName(name); versionedProcessor.setBundle(bundle); - versionedProcessor.setProperties(Map.of(DEFAULT_SSL_CONTEXT_SERVICE_NAME, originalSslControllerServiceId)); + versionedProcessor.setProperties(new HashMap<>(Map.of(DEFAULT_SSL_CONTEXT_SERVICE_NAME, originalSslControllerServiceId))); return versionedProcessor; } } diff --git a/minifi/minifi-commons/minifi-commons-framework/src/test/java/org/apache/nifi/minifi/commons/service/StandardFlowPropertyEncryptorTest.java b/minifi/minifi-commons/minifi-commons-framework/src/test/java/org/apache/nifi/minifi/commons/service/StandardFlowPropertyEncryptorTest.java new file mode 100644 index 0000000000..639106677f --- /dev/null +++ b/minifi/minifi-commons/minifi-commons-framework/src/test/java/org/apache/nifi/minifi/commons/service/StandardFlowPropertyEncryptorTest.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.commons.service; + +import static java.util.Map.entry; +import static java.util.UUID.randomUUID; +import static java.util.stream.Collectors.toMap; +import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.stream.Stream; +import org.apache.nifi.c2.protocol.component.api.Bundle; +import org.apache.nifi.c2.protocol.component.api.ComponentManifest; +import org.apache.nifi.c2.protocol.component.api.ControllerServiceDefinition; +import org.apache.nifi.c2.protocol.component.api.ProcessorDefinition; +import org.apache.nifi.c2.protocol.component.api.PropertyDescriptor; +import org.apache.nifi.c2.protocol.component.api.RuntimeManifest; +import org.apache.nifi.controller.flow.VersionedDataflow; +import org.apache.nifi.controller.serialization.FlowSerializer; +import org.apache.nifi.encrypt.PropertyEncryptor; +import org.apache.nifi.flow.VersionedConfigurableExtension; +import org.apache.nifi.flow.VersionedControllerService; +import org.apache.nifi.flow.VersionedParameter; +import org.apache.nifi.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.flow.VersionedProcessor; +import org.apache.nifi.flow.VersionedPropertyDescriptor; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class StandardFlowPropertyEncryptorTest { + + private static final String PROCESSOR_TYPE_1 = "processor_type_1"; + private static final String PROCESSOR_TYPE_2 = "processor_type_2"; + private static final String PROCESSOR_TYPE_3 = "processor_type_3"; + private static final String CONTROLLER_SERVICE_TYPE_1 = "controller_service_type_1"; + private static final String CONTROLLER_SERVICE_TYPE_2 = "controller_service_type_2"; + private static final String CONTROLLER_SERVICE_TYPE_3 = "controller_service_type_3"; + + private static final String SENSITIVE_PROPERTY_NAME_PREFIX = "sensitive"; + + private static final String NON_SENSITIVE_1 = "non-sensitive-1"; + private static final String SENSITIVE_1 = SENSITIVE_PROPERTY_NAME_PREFIX + "-1"; + private static final String NON_SENSITIVE_2 = "non-sensitive-2"; + private static final String SENSITIVE_3 = SENSITIVE_PROPERTY_NAME_PREFIX + "-3"; + + private static final Map<String, String> PARAMETERS1 = Map.of( + NON_SENSITIVE_1, NON_SENSITIVE_1, + SENSITIVE_1, SENSITIVE_1 + ); + private static final Map<String, String> PARAMETERS2 = Map.of( + NON_SENSITIVE_2, NON_SENSITIVE_2 + ); + + private static final Map<String, String> PARAMETERS3 = Map.of( + SENSITIVE_3, SENSITIVE_3 + ); + private static final Map<String, VersionedPropertyDescriptor> DESCRIPTORS1 = Map.of( + NON_SENSITIVE_1, versionedPropertyDescriptor(NON_SENSITIVE_1, false), + SENSITIVE_1, versionedPropertyDescriptor(SENSITIVE_1, true) + ); + private static final Map<String, VersionedPropertyDescriptor> DESCRIPTORS2 = Map.of( + NON_SENSITIVE_2, versionedPropertyDescriptor(NON_SENSITIVE_2, false) + ); + private static final Map<String, VersionedPropertyDescriptor> DESCRIPTORS3 = Map.of( + SENSITIVE_3, versionedPropertyDescriptor(SENSITIVE_3, true) + ); + + + @Mock + private PropertyEncryptor mockPropertyEncryptor; + @Mock + private RuntimeManifest mockRunTimeManifest; + + private FlowPropertyEncryptor testEncryptor; + + private static VersionedPropertyDescriptor versionedPropertyDescriptor(String name, boolean isSensitive) { + VersionedPropertyDescriptor versionedPropertyDescriptor = new VersionedPropertyDescriptor(); + versionedPropertyDescriptor.setName(name); + versionedPropertyDescriptor.setSensitive(isSensitive); + return versionedPropertyDescriptor; + } + + @BeforeEach + public void setup() { + when(mockPropertyEncryptor.encrypt(anyString())).thenReturn(randomAlphabetic(5)); + testEncryptor = new StandardFlowPropertyEncryptor(mockPropertyEncryptor, mockRunTimeManifest); + } + + @Test + public void shouldEncryptParameterContextsSensitiveVariables() { + VersionedDataflow testFlow = flowWithParameterContexts(); + + VersionedDataflow encryptedFlow = testEncryptor.encryptSensitiveProperties(testFlow); + + encryptedFlow.getParameterContexts().stream() + .flatMap(context -> context.getParameters().stream()) + .forEach(parameter -> { + if (parameter.isSensitive()) { + assertTrue(parameter.getValue().startsWith(FlowSerializer.ENC_PREFIX)); + } else { + assertFalse(parameter.getValue().startsWith(FlowSerializer.ENC_PREFIX)); + } + }); + } + + @Test + public void shouldEncryptPropertiesUsingDescriptorsFromFlow() { + VersionedDataflow testFlow = flowWithPropertyDescriptors(); + + VersionedDataflow encryptedFlow = testEncryptor.encryptSensitiveProperties(testFlow); + + verify(mockRunTimeManifest, never()).getBundles(); + assertSensitiveFlowComponentPropertiesAreEncoded(encryptedFlow); + } + + @Test + public void shouldEncryptPropertiesUsingDescriptorsFromRuntimeManifest() { + VersionedDataflow testFlow = flowWithoutPropertyDescriptors(); + when(mockRunTimeManifest.getBundles()).thenReturn(runTimeManifestBundles()); + + VersionedDataflow encryptedFlow = testEncryptor.encryptSensitiveProperties(testFlow); + + assertSensitiveFlowComponentPropertiesAreEncoded(encryptedFlow); + } + + private VersionedDataflow flowWithParameterContexts() { + VersionedDataflow versionedDataflow = new VersionedDataflow(); + versionedDataflow.setRootGroup(new VersionedProcessGroup()); + versionedDataflow.setParameterContexts( + List.of( + parameterContext( + parameter(NON_SENSITIVE_1, NON_SENSITIVE_1, false), + parameter(SENSITIVE_1, SENSITIVE_1, true) + ), + parameterContext( + parameter(NON_SENSITIVE_2, NON_SENSITIVE_2, false) + ), + parameterContext( + parameter(SENSITIVE_3, SENSITIVE_3, true) + ) + ) + ); + return versionedDataflow; + } + + private VersionedParameterContext parameterContext(VersionedParameter... parameters) { + VersionedParameterContext versionedParameterContext = new VersionedParameterContext(); + versionedParameterContext.setParameters(Set.of(parameters)); + return versionedParameterContext; + } + + private VersionedParameter parameter(String name, String value, boolean sensitive) { + VersionedParameter versionedParameter = new VersionedParameter(); + versionedParameter.setName(name); + versionedParameter.setValue(value); + versionedParameter.setSensitive(sensitive); + return versionedParameter; + } + + private VersionedDataflow flowWithPropertyDescriptors() { + VersionedDataflow versionedDataflow = new VersionedDataflow(); + VersionedProcessGroup versionedProcessGroup = new VersionedProcessGroup(); + versionedProcessGroup.setProcessors( + Set.of( + versionedProcessor(PROCESSOR_TYPE_1, PARAMETERS1, DESCRIPTORS1), + versionedProcessor(PROCESSOR_TYPE_2, PARAMETERS2, DESCRIPTORS2), + versionedProcessor(PROCESSOR_TYPE_3, PARAMETERS3, DESCRIPTORS3) + )); + versionedProcessGroup.setControllerServices( + Set.of( + versionedControllerService(CONTROLLER_SERVICE_TYPE_1, PARAMETERS1, DESCRIPTORS1), + versionedControllerService(CONTROLLER_SERVICE_TYPE_2, PARAMETERS2, DESCRIPTORS2), + versionedControllerService(CONTROLLER_SERVICE_TYPE_3, PARAMETERS3, DESCRIPTORS3) + ) + ); + versionedDataflow.setRootGroup(versionedProcessGroup); + return versionedDataflow; + } + + private VersionedDataflow flowWithoutPropertyDescriptors() { + VersionedDataflow versionedDataflow = new VersionedDataflow(); + VersionedProcessGroup versionedProcessGroup = new VersionedProcessGroup(); + versionedProcessGroup.setProcessors( + Set.of( + versionedProcessor(PROCESSOR_TYPE_1, PARAMETERS1, Map.of()), + versionedProcessor(PROCESSOR_TYPE_2, PARAMETERS2, Map.of()), + versionedProcessor(PROCESSOR_TYPE_3, PARAMETERS3, Map.of()) + )); + versionedProcessGroup.setControllerServices( + Set.of( + versionedControllerService(CONTROLLER_SERVICE_TYPE_1, PARAMETERS1, Map.of()), + versionedControllerService(CONTROLLER_SERVICE_TYPE_2, PARAMETERS2, Map.of()), + versionedControllerService(CONTROLLER_SERVICE_TYPE_3, PARAMETERS3, Map.of()) + ) + ); + versionedDataflow.setRootGroup(versionedProcessGroup); + return versionedDataflow; + } + + private VersionedProcessor versionedProcessor(String processorType, Map<String, String> properties, Map<String, VersionedPropertyDescriptor> propertyDescriptors) { + VersionedProcessor versionedProcessor = new VersionedProcessor(); + versionedProcessor.setIdentifier(randomUUID().toString()); + versionedProcessor.setType(processorType); + versionedProcessor.setProperties(properties); + versionedProcessor.setPropertyDescriptors(propertyDescriptors); + return versionedProcessor; + } + + private VersionedControllerService versionedControllerService(String controllerServiceType, Map<String, String> properties, + Map<String, VersionedPropertyDescriptor> propertyDescriptors) { + VersionedControllerService versionedControllerService = new VersionedControllerService(); + versionedControllerService.setIdentifier(randomUUID().toString()); + versionedControllerService.setType(controllerServiceType); + versionedControllerService.setProperties(properties); + versionedControllerService.setPropertyDescriptors(propertyDescriptors); + return versionedControllerService; + } + + private List<Bundle> runTimeManifestBundles() { + return List.of( + bundle( + List.of(processorDefinition(PROCESSOR_TYPE_1, DESCRIPTORS1), processorDefinition(PROCESSOR_TYPE_2, DESCRIPTORS2)), + List.of(controllerServiceDefinition(CONTROLLER_SERVICE_TYPE_1, DESCRIPTORS1)) + ), + bundle( + List.of(processorDefinition(PROCESSOR_TYPE_3, DESCRIPTORS3)), + List.of(controllerServiceDefinition(CONTROLLER_SERVICE_TYPE_2, DESCRIPTORS2), controllerServiceDefinition(CONTROLLER_SERVICE_TYPE_3, DESCRIPTORS3)) + ) + ); + } + + private Bundle bundle(List<ProcessorDefinition> processorDefinition, List<ControllerServiceDefinition> controllerServiceDefinition) { + Bundle bundle = new Bundle(); + ComponentManifest componentManifest = new ComponentManifest(); + componentManifest.setProcessors(processorDefinition); + componentManifest.setControllerServices(controllerServiceDefinition); + bundle.setComponentManifest(componentManifest); + return bundle; + } + + private ProcessorDefinition processorDefinition(String processorType, Map<String, VersionedPropertyDescriptor> propertyDescriptors) { + ProcessorDefinition processorDefinition = new ProcessorDefinition(); + processorDefinition.setType(processorType); + processorDefinition.setPropertyDescriptors( + convertVersionedPropertyDescriptorMapToPropertyDescriptorMap(propertyDescriptors) + ); + return processorDefinition; + } + + private ControllerServiceDefinition controllerServiceDefinition(String controllerServiceType, Map<String, VersionedPropertyDescriptor> propertyDescriptors) { + ControllerServiceDefinition controllerServiceDefinition = new ControllerServiceDefinition(); + controllerServiceDefinition.setType(controllerServiceType); + controllerServiceDefinition.setPropertyDescriptors( + convertVersionedPropertyDescriptorMapToPropertyDescriptorMap(propertyDescriptors) + ); + return controllerServiceDefinition; + } + + private LinkedHashMap<String, PropertyDescriptor> convertVersionedPropertyDescriptorMapToPropertyDescriptorMap(Map<String, VersionedPropertyDescriptor> propertyDescriptors) { + return propertyDescriptors.values() + .stream() + .map(propertyDescriptor -> entry(propertyDescriptor.getName(), convertPropertyDescriptor(propertyDescriptor))) + .collect(toMap(Entry::getKey, Entry::getValue, (l, r) -> l, LinkedHashMap::new)); + } + + private PropertyDescriptor convertPropertyDescriptor(VersionedPropertyDescriptor versionedPropertyDescriptor) { + PropertyDescriptor propertyDescriptor = new PropertyDescriptor(); + propertyDescriptor.setName(versionedPropertyDescriptor.getName()); + propertyDescriptor.setSensitive(versionedPropertyDescriptor.isSensitive()); + return propertyDescriptor; + } + + private void assertSensitiveFlowComponentPropertiesAreEncoded(VersionedDataflow encryptedFlow) { + Stream.of( + encryptedFlow.getRootGroup().getProcessors(), + encryptedFlow.getRootGroup().getControllerServices() + ) + .flatMap(Set::stream) + .map(VersionedConfigurableExtension::getProperties) + .flatMap(properties -> properties.entrySet().stream()) + .forEach(propertyEntry -> { + if (propertyEntry.getKey().startsWith(SENSITIVE_PROPERTY_NAME_PREFIX)) { + assertTrue(propertyEntry.getValue().startsWith(FlowSerializer.ENC_PREFIX)); + } else { + assertFalse(propertyEntry.getValue().startsWith(FlowSerializer.ENC_PREFIX)); + } + } + ); + } +} diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java index 0ef45a92c2..0915d87ce6 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java @@ -46,6 +46,8 @@ import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_SECURITY_TR import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_SECURITY_TRUSTSTORE_PASSWORD; import static org.apache.nifi.minifi.commons.api.MiNiFiProperties.C2_SECURITY_TRUSTSTORE_TYPE; import static org.apache.nifi.util.NiFiProperties.FLOW_CONFIGURATION_FILE; +import static org.apache.nifi.util.NiFiProperties.SENSITIVE_PROPS_ALGORITHM; +import static org.apache.nifi.util.NiFiProperties.SENSITIVE_PROPS_KEY; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; @@ -87,6 +89,7 @@ import org.apache.nifi.c2.protocol.api.FlowQueueStatus; import org.apache.nifi.c2.serializer.C2JacksonSerializer; import org.apache.nifi.controller.FlowController; import org.apache.nifi.diagnostics.SystemDiagnostics; +import org.apache.nifi.encrypt.PropertyEncryptorBuilder; import org.apache.nifi.extension.manifest.parser.ExtensionManifestParser; import org.apache.nifi.extension.manifest.parser.jaxb.JAXBExtensionManifestParser; import org.apache.nifi.manifest.RuntimeManifestService; @@ -97,7 +100,10 @@ import org.apache.nifi.minifi.c2.command.TransferDebugCommandHelper; import org.apache.nifi.minifi.c2.command.UpdateAssetCommandHelper; import org.apache.nifi.minifi.c2.command.UpdatePropertiesPropertyProvider; import org.apache.nifi.minifi.commons.api.MiNiFiCommandState; -import org.apache.nifi.minifi.commons.service.FlowEnrichService; +import org.apache.nifi.minifi.commons.service.FlowPropertyEncryptor; +import org.apache.nifi.minifi.commons.service.StandardFlowEnrichService; +import org.apache.nifi.minifi.commons.service.StandardFlowPropertyEncryptor; +import org.apache.nifi.minifi.commons.service.StandardFlowSerDeService; import org.apache.nifi.nar.ExtensionManagerHolder; import org.apache.nifi.services.FlowService; import org.apache.nifi.util.FormatUtils; @@ -214,9 +220,13 @@ public class C2NifiClientService { UpdatePropertiesPropertyProvider updatePropertiesPropertyProvider = new UpdatePropertiesPropertyProvider(bootstrapConfigFileLocation); PropertiesPersister propertiesPersister = new PropertiesPersister(updatePropertiesPropertyProvider, bootstrapConfigFileLocation); - FlowEnrichService flowEnrichService = new FlowEnrichService(niFiProperties); - UpdateConfigurationStrategy updateConfigurationStrategy = - new DefaultUpdateConfigurationStrategy(flowController, flowService, flowEnrichService, niFiProperties.getProperty(FLOW_CONFIGURATION_FILE)); + FlowPropertyEncryptor flowPropertyEncryptor = new StandardFlowPropertyEncryptor( + new PropertyEncryptorBuilder(niFiProperties.getProperty(SENSITIVE_PROPS_KEY)) + .setAlgorithm(niFiProperties.getProperty(SENSITIVE_PROPS_ALGORITHM)).build(), + runtimeManifestService.getManifest()); + UpdateConfigurationStrategy updateConfigurationStrategy = new DefaultUpdateConfigurationStrategy(flowController, flowService, + new StandardFlowEnrichService(niFiProperties), flowPropertyEncryptor, + StandardFlowSerDeService.defaultInstance(), niFiProperties.getProperty(FLOW_CONFIGURATION_FILE)); return new C2OperationHandlerProvider(List.of( new UpdateConfigurationOperationHandler(client, flowIdHolder, updateConfigurationStrategy, emptyOperandPropertiesProvider), diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java index 1db1549d6c..a46122fa90 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java @@ -47,9 +47,12 @@ import org.apache.nifi.controller.ComponentNode; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.flow.FlowManager; +import org.apache.nifi.controller.flow.VersionedDataflow; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.minifi.commons.service.FlowEnrichService; +import org.apache.nifi.minifi.commons.service.FlowPropertyEncryptor; +import org.apache.nifi.minifi.commons.service.FlowSerDeService; import org.apache.nifi.services.FlowService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,15 +69,20 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt private final FlowController flowController; private final FlowService flowService; private final FlowEnrichService flowEnrichService; + private final FlowPropertyEncryptor flowPropertyEncryptor; + private final FlowSerDeService flowSerDeService; private final Path flowConfigurationFile; private final Path backupFlowConfigurationFile; private final Path rawFlowConfigurationFile; private final Path backupRawFlowConfigurationFile; - public DefaultUpdateConfigurationStrategy(FlowController flowController, FlowService flowService, FlowEnrichService flowEnrichService, String flowConfigurationFile) { + public DefaultUpdateConfigurationStrategy(FlowController flowController, FlowService flowService, FlowEnrichService flowEnrichService, + FlowPropertyEncryptor flowPropertyEncryptor, FlowSerDeService flowSerDeService, String flowConfigurationFile) { this.flowController = flowController; this.flowService = flowService; this.flowEnrichService = flowEnrichService; + this.flowPropertyEncryptor = flowPropertyEncryptor; + this.flowSerDeService = flowSerDeService; Path flowConfigurationFilePath = Path.of(flowConfigurationFile).toAbsolutePath(); this.flowConfigurationFile = flowConfigurationFilePath; this.backupFlowConfigurationFile = Path.of(flowConfigurationFilePath + BACKUP_EXTENSION); @@ -90,12 +98,21 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt } try { - byte[] enrichedFlowCandidate = flowEnrichService.enrichFlow(rawFlow); + VersionedDataflow rawDataFlow = flowSerDeService.deserialize(rawFlow); + + VersionedDataflow propertyEncryptedRawDataFlow = flowPropertyEncryptor.encryptSensitiveProperties(rawDataFlow); + byte[] serializedPropertyEncryptedRawDataFlow = flowSerDeService.serialize(propertyEncryptedRawDataFlow); + VersionedDataflow enrichedFlowCandidate = flowEnrichService.enrichFlow(propertyEncryptedRawDataFlow); + byte[] serializedEnrichedFlowCandidate = flowSerDeService.serialize(enrichedFlowCandidate); + backup(flowConfigurationFile, backupFlowConfigurationFile); backup(rawFlowConfigurationFile, backupRawFlowConfigurationFile); - persist(enrichedFlowCandidate, flowConfigurationFile, true); - persist(rawFlow, rawFlowConfigurationFile, false); + + persist(serializedPropertyEncryptedRawDataFlow, rawFlowConfigurationFile, false); + persist(serializedEnrichedFlowCandidate, flowConfigurationFile, true); + reloadFlow(); + return true; } catch (IllegalStateException e) { LOGGER.error("Configuration update failed. Reverting and reloading previous flow", e); diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategyTest.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategyTest.java index 82fee33fa2..87edfead9c 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategyTest.java +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategyTest.java @@ -47,8 +47,11 @@ import org.apache.commons.io.FilenameUtils; import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.flow.FlowManager; +import org.apache.nifi.controller.flow.VersionedDataflow; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.minifi.commons.service.FlowEnrichService; +import org.apache.nifi.minifi.commons.service.FlowPropertyEncryptor; +import org.apache.nifi.minifi.commons.service.FlowSerDeService; import org.apache.nifi.services.FlowService; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -64,8 +67,13 @@ public class DefaultUpdateConfigurationStrategyTest { private static final byte[] ORIGINAL_RAW_FLOW_CONFIG_CONTENT = "original_raw_content".getBytes(UTF_8); private static final byte[] ORIGINAL_ENRICHED_FLOW_CONFIG_CONTENT = "original_enriched_content".getBytes(UTF_8); + private static final byte[] NEW_RAW_FLOW_CONFIG_CONTENT = "new_raw_content".getBytes(UTF_8); + private static final VersionedDataflow NEW_RAW_FLOW_CONFIG = new VersionedDataflow(); + private static final byte[] NEW_ENCRYPTED_FLOW_CONFIG_CONTENT = "original_encrypted_content".getBytes(UTF_8); + private static final VersionedDataflow NEW_ENCRYPTED_FLOW_CONFIG = new VersionedDataflow(); private static final byte[] NEW_ENRICHED_FLOW_CONFIG_CONTENT = "new_enriched_content".getBytes(UTF_8); + private static final VersionedDataflow NEW_ENRICHED_FLOW_CONFIG = new VersionedDataflow(); @TempDir private File tempDir; @@ -77,6 +85,10 @@ public class DefaultUpdateConfigurationStrategyTest { @Mock private FlowEnrichService mockFlowEnrichService; @Mock + private FlowPropertyEncryptor mockFlowPropertyEncryptor; + @Mock + private FlowSerDeService mockFlowSerDeService; + @Mock private FlowManager mockFlowManager; @Mock private ProcessGroup mockProcessGroup; @@ -96,7 +108,8 @@ public class DefaultUpdateConfigurationStrategyTest { rawFlowConfigurationFile = flowConfigurationFile.getParent().resolve(flowConfigurationFileBaseName + RAW_EXTENSION); backupRawFlowConfigurationFile = flowConfigurationFile.getParent().resolve(flowConfigurationFileBaseName + BACKUP_EXTENSION + RAW_EXTENSION); - testUpdateConfigurationStrategy = new DefaultUpdateConfigurationStrategy(mockFlowController, mockFlowService, mockFlowEnrichService, flowConfigurationFile.toString()); + testUpdateConfigurationStrategy = new DefaultUpdateConfigurationStrategy(mockFlowController, mockFlowService, mockFlowEnrichService, + mockFlowPropertyEncryptor, mockFlowSerDeService, flowConfigurationFile.toString()); writeGzipFile(flowConfigurationFile, ORIGINAL_ENRICHED_FLOW_CONFIG_CONTENT); writePlainTextFile(rawFlowConfigurationFile, ORIGINAL_RAW_FLOW_CONFIG_CONTENT); @@ -105,7 +118,11 @@ public class DefaultUpdateConfigurationStrategyTest { @Test public void testFlowIsUpdatedAndBackupsAreClearedUp() throws IOException { // given - when(mockFlowEnrichService.enrichFlow(NEW_RAW_FLOW_CONFIG_CONTENT)).thenReturn(NEW_ENRICHED_FLOW_CONFIG_CONTENT); + when(mockFlowSerDeService.deserialize(NEW_RAW_FLOW_CONFIG_CONTENT)).thenReturn(NEW_RAW_FLOW_CONFIG); + when(mockFlowPropertyEncryptor.encryptSensitiveProperties(NEW_RAW_FLOW_CONFIG)).thenReturn(NEW_ENCRYPTED_FLOW_CONFIG); + when(mockFlowSerDeService.serialize(NEW_ENCRYPTED_FLOW_CONFIG)).thenReturn(NEW_ENCRYPTED_FLOW_CONFIG_CONTENT); + when(mockFlowEnrichService.enrichFlow(NEW_ENCRYPTED_FLOW_CONFIG)).thenReturn(NEW_ENRICHED_FLOW_CONFIG); + when(mockFlowSerDeService.serialize(NEW_ENRICHED_FLOW_CONFIG)).thenReturn(NEW_ENRICHED_FLOW_CONFIG_CONTENT); when(mockFlowController.getFlowManager()).thenReturn(mockFlowManager); when(mockFlowManager.getRootGroup()).thenReturn(mockProcessGroup); @@ -117,7 +134,7 @@ public class DefaultUpdateConfigurationStrategyTest { assertTrue(exists(flowConfigurationFile)); assertTrue(exists(rawFlowConfigurationFile)); assertArrayEquals(NEW_ENRICHED_FLOW_CONFIG_CONTENT, readGzipFile(flowConfigurationFile)); - assertArrayEquals(NEW_RAW_FLOW_CONFIG_CONTENT, readPlainTextFile(rawFlowConfigurationFile)); + assertArrayEquals(NEW_ENCRYPTED_FLOW_CONFIG_CONTENT, readPlainTextFile(rawFlowConfigurationFile)); assertFalse(exists(backupFlowConfigurationFile)); assertFalse(exists(backupRawFlowConfigurationFile)); verify(mockFlowService, times(1)).load(null); @@ -128,7 +145,11 @@ public class DefaultUpdateConfigurationStrategyTest { @Test public void testFlowIsRevertedInCaseOfAnyErrorAndBackupsAreClearedUp() throws IOException { // given - when(mockFlowEnrichService.enrichFlow(NEW_RAW_FLOW_CONFIG_CONTENT)).thenReturn(NEW_ENRICHED_FLOW_CONFIG_CONTENT); + when(mockFlowSerDeService.deserialize(NEW_RAW_FLOW_CONFIG_CONTENT)).thenReturn(NEW_RAW_FLOW_CONFIG); + when(mockFlowPropertyEncryptor.encryptSensitiveProperties(NEW_RAW_FLOW_CONFIG)).thenReturn(NEW_ENCRYPTED_FLOW_CONFIG); + when(mockFlowSerDeService.serialize(NEW_ENCRYPTED_FLOW_CONFIG)).thenReturn(NEW_ENCRYPTED_FLOW_CONFIG_CONTENT); + when(mockFlowEnrichService.enrichFlow(NEW_ENCRYPTED_FLOW_CONFIG)).thenReturn(NEW_ENRICHED_FLOW_CONFIG); + when(mockFlowSerDeService.serialize(NEW_ENRICHED_FLOW_CONFIG)).thenReturn(NEW_ENRICHED_FLOW_CONFIG_CONTENT); when(mockFlowController.getFlowManager()).thenReturn(mockFlowManager); when(mockFlowManager.getRootGroup()).thenReturn(mockProcessGroup); doThrow(new IOException()).when(mockFlowService).load(null);