This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch support/nifi-1.16 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 5113640b27423f0619a20f69a76f3ba1d085e498 Author: exceptionfactory <exceptionfact...@apache.org> AuthorDate: Thu Jun 9 15:54:30 2022 -0500 NIFI-10096 Corrected nested inherited Parameter Context loading Signed-off-by: Joe Gresock <jgres...@gmail.com> This closes #6114. --- .../serialization/VersionedFlowSynchronizer.java | 73 +++++++++++++++------- 1 file changed, 50 insertions(+), 23 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java index 757ada77a2..17a91b0793 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java @@ -111,6 +111,7 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; @@ -151,7 +152,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer { // determine if the controller already had flow sync'd to it final boolean flowAlreadySynchronized = controller.isFlowSynchronized(); - logger.info("Synching FlowController with proposed flow: Controller Already Synchronized = {}", flowAlreadySynchronized); + logger.info("Synchronizing FlowController with proposed flow: Controller Already Synchronized = {}", flowAlreadySynchronized); // If bundle update strategy is configured to allow for compatible bundles, update any components to use compatible bundles if // the exact bundle does not exist. @@ -529,50 +530,71 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer { } private void inheritParameterContexts(final FlowController controller, final VersionedDataflow dataflow) { - final ParameterContextManager parameterContextManager = controller.getFlowManager().getParameterContextManager(); - - // Add any parameter context that doesn't yet exist. We have to add all contexts before updating them because - // one context may reference another. We need that reference to exist before we try to create the reference. - final Map<String, ParameterContext> parameterContextsByName = parameterContextManager.getParameterContextNameMapping(); - controller.getFlowManager().withParameterContextResolution(() -> { - for (final VersionedParameterContext versionedParameterContext : dataflow.getParameterContexts()) { - inheritParameterContext(versionedParameterContext, controller.getFlowManager(), parameterContextsByName); + final List<VersionedParameterContext> parameterContexts = dataflow.getParameterContexts(); + + // Build mapping of name to context for resolution of inherited contexts + final Map<String, VersionedParameterContext> namedParameterContexts = parameterContexts.stream() + .collect( + Collectors.toMap(VersionedParameterContext::getName, Function.identity()) + ); + for (final VersionedParameterContext versionedParameterContext : parameterContexts) { + inheritParameterContext(versionedParameterContext, controller.getFlowManager(), namedParameterContexts); } }); } - private void inheritParameterContext(final VersionedParameterContext versionedParameterContext, final FlowManager flowManager, final Map<String, ParameterContext> parameterContextsByName) { - final ParameterContext existingContext = parameterContextsByName.get(versionedParameterContext.getName()); + private void inheritParameterContext( + final VersionedParameterContext versionedParameterContext, + final FlowManager flowManager, + final Map<String, VersionedParameterContext> namedParameterContexts + ) { + final ParameterContextManager contextManager = flowManager.getParameterContextManager(); + final ParameterContext existingContext = contextManager.getParameterContextNameMapping().get(versionedParameterContext.getName()); if (existingContext == null) { - addParameterContext(versionedParameterContext, flowManager); + addParameterContext(versionedParameterContext, flowManager, namedParameterContexts); } else { - updateParameterContext(versionedParameterContext, existingContext, flowManager); + updateParameterContext(versionedParameterContext, existingContext, flowManager, namedParameterContexts); } } - private void addParameterContext(final VersionedParameterContext versionedParameterContext, final FlowManager flowManager) { + private void addParameterContext( + final VersionedParameterContext versionedParameterContext, + final FlowManager flowManager, + final Map<String, VersionedParameterContext> namedParameterContexts + ) { final Map<String, Parameter> parameters = createParameterMap(versionedParameterContext); final ParameterContextManager contextManager = flowManager.getParameterContextManager(); - final List<String> referenceIds = findReferencedParameterContextIds(versionedParameterContext, contextManager); + final List<String> referenceIds = findReferencedParameterContextIds(versionedParameterContext, contextManager, namedParameterContexts); flowManager.createParameterContext(versionedParameterContext.getInstanceIdentifier(), versionedParameterContext.getName(), parameters, referenceIds); logger.info("Added Parameter Context {}", versionedParameterContext.getName()); } - private List<String> findReferencedParameterContextIds(final VersionedParameterContext versionedParameterContext, final ParameterContextManager contextManager) { + private List<String> findReferencedParameterContextIds( + final VersionedParameterContext versionedParameterContext, + final ParameterContextManager contextManager, + final Map<String, VersionedParameterContext> namedParameterContexts + ) { final List<String> referenceIds = new ArrayList<>(); final Map<String, ParameterContext> parameterContextsByName = contextManager.getParameterContextNameMapping(); if (versionedParameterContext.getInheritedParameterContexts() != null) { for (final String inheritedContextName : versionedParameterContext.getInheritedParameterContexts()) { - final ParameterContext existingContext = parameterContextsByName.get(inheritedContextName); - if (existingContext == null) { - logger.warn("Parameter Context {} inherits from Parameter Context {} but cannot find a Parameter Context with name {}", - versionedParameterContext.getName(), inheritedContextName, inheritedContextName); + // Lookup inherited Parameter Context Name in Versioned Data Flow + final VersionedParameterContext inheritedParameterContext = namedParameterContexts.get(inheritedContextName); + if (inheritedParameterContext == null) { + // Lookup inherited Parameter Context Name in Parameter Context Manager + final ParameterContext existingContext = parameterContextsByName.get(inheritedContextName); + if (existingContext == null) { + logger.warn("Parameter Context {} inherits from Parameter Context {} but cannot find a Parameter Context with name {}", + versionedParameterContext.getName(), inheritedContextName, inheritedContextName); + } else { + referenceIds.add(existingContext.getIdentifier()); + } } else { - referenceIds.add(existingContext.getIdentifier()); + referenceIds.add(inheritedParameterContext.getInstanceIdentifier()); } } } @@ -607,7 +629,12 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer { return parameters; } - private void updateParameterContext(final VersionedParameterContext versionedParameterContext, final ParameterContext parameterContext, final FlowManager flowManager) { + private void updateParameterContext( + final VersionedParameterContext versionedParameterContext, + final ParameterContext parameterContext, + final FlowManager flowManager, + final Map<String, VersionedParameterContext> namedParameterContexts + ) { final Map<String, Parameter> parameters = createParameterMap(versionedParameterContext); final Map<String, String> currentValues = new HashMap<>(); @@ -648,7 +675,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer { } final ParameterContextManager contextManager = flowManager.getParameterContextManager(); - final List<String> inheritedContextIds = findReferencedParameterContextIds(versionedParameterContext, contextManager); + final List<String> inheritedContextIds = findReferencedParameterContextIds(versionedParameterContext, contextManager, namedParameterContexts); final List<ParameterContext> referencedContexts = inheritedContextIds.stream() .map(contextManager::getParameterContext) .collect(Collectors.toList());