This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch support/nifi-1.19 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit f1193a11ff6c53cb25d59b100571a88936d338ee Author: Mark Payne <marka...@hotmail.com> AuthorDate: Wed Nov 30 18:08:28 2022 -0500 NIFI-10918: When fetching a flow from a Flow Registry, if it references any 'internal versioned flows' instead of requiring that we have a client configured for the appropriate URL, attempt to fetch the flow from each client. We will start with the clients that do report that they can handle the URL but will try others as well. As soon as we successfully fetch the flow, we stop. NIFI-10918: Fixed checkstyle violations This closes #6736 Signed-off-by: Bence Simon <bsi...@apache.org> --- .../flow/StandardFlowRegistryClientNode.java | 44 +++++++++++++++++----- .../apache/nifi/util/FlowDifferenceFilters.java | 27 +++++++++++++ 2 files changed, 61 insertions(+), 10 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java index c2a5462ebc..98a0925880 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java @@ -46,11 +46,15 @@ import org.apache.nifi.parameter.ParameterLookup; import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.util.CharacterFilterUtils; import org.apache.nifi.util.file.classloader.ClassLoaderUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URL; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; @@ -58,6 +62,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; public final class StandardFlowRegistryClientNode extends AbstractComponentNode implements FlowRegistryClientNode { + private static final Logger logger = LoggerFactory.getLogger(StandardFlowRegistryClientNode.class); private final FlowManager flowManager; private final Authorizable parent; @@ -299,11 +304,7 @@ public final class StandardFlowRegistryClientNode extends AbstractComponentNode final VersionedFlowCoordinates coordinates = group.getVersionedFlowCoordinates(); if (coordinates != null) { - final String storageLocation = coordinates.getStorageLocation() == null ? coordinates.getRegistryUrl() : coordinates.getStorageLocation(); - final String bucketId = coordinates.getBucketId(); - final String flowId = coordinates.getFlowId(); - final int version = coordinates.getVersion(); - final RegisteredFlowSnapshot snapshot = getRegistryForInternalFlow(storageLocation).getFlowContents(context, bucketId, flowId, version, true); + final RegisteredFlowSnapshot snapshot = fetchFlowContents(context, coordinates, true); final VersionedProcessGroup contents = snapshot.getFlowContents(); group.setVersionedFlowCoordinates(coordinates); @@ -332,14 +333,37 @@ public final class StandardFlowRegistryClientNode extends AbstractComponentNode } } - private FlowRegistryClientNode getRegistryForInternalFlow(final String storageLocation) throws FlowRegistryException, IOException { - for (FlowRegistryClientNode registryClientNode : flowManager.getAllFlowRegistryClients()) { - if (registryClientNode.isStorageLocationApplicable(storageLocation)) { - return registryClientNode; + private RegisteredFlowSnapshot fetchFlowContents(final FlowRegistryClientUserContext context, final VersionedFlowCoordinates coordinates, + final boolean fetchRemoteFlows) throws FlowRegistryException { + + final String storageLocation = coordinates.getStorageLocation() == null ? coordinates.getRegistryUrl() : coordinates.getStorageLocation(); + final String bucketId = coordinates.getBucketId(); + final String flowId = coordinates.getFlowId(); + final int version = coordinates.getVersion(); + + final List<FlowRegistryClientNode> clientNodes = getRegistryClientsForInternalFlow(storageLocation); + for (final FlowRegistryClientNode clientNode : clientNodes) { + try { + logger.debug("Attempting to fetch flow for Bucket [{}] Flow [{}] Version [{}] using {}", bucketId, flowId, version, clientNode); + final RegisteredFlowSnapshot snapshot = clientNode.getFlowContents(context, bucketId, flowId, version, fetchRemoteFlows); + coordinates.setRegistryId(clientNode.getIdentifier()); + + logger.debug("Successfully fetched flow for Bucket [{}] Flow [{}] Version [{}] using {}", bucketId, flowId, version, clientNode); + return snapshot; + } catch (final Exception e) { + logger.debug("Failed to fetch flow", e); } } - throw new FlowRegistryException(String.format("No applicable registry found for storage location %s", storageLocation)); + throw new FlowRegistryException(String.format("Could not find any Registry Client that was able to fetch flow with Bucket [%s] Flow [%s] Version [%s] with Storage Location [%s]", + bucketId, flowId, version, storageLocation)); + } + + private List<FlowRegistryClientNode> getRegistryClientsForInternalFlow(final String storageLocation) { + // Sort clients based on whether or not they believe they are applicable for the given storage location + final List<FlowRegistryClientNode> matchingClients = new ArrayList<>(flowManager.getAllFlowRegistryClients()); + matchingClients.sort(Comparator.comparing(client -> client.isStorageLocationApplicable(storageLocation) ? -1 : 1)); + return matchingClients; } private RegisteredFlowSnapshot createRegisteredFlowSnapshot( diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java index 512bc3696e..b052af8110 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java @@ -72,9 +72,36 @@ public class FlowDifferenceFilters { || isNewRetryConfigWithDefaultValue(difference, flowManager) || isNewZIndexLabelConfigWithDefaultValue(difference, flowManager) || isNewZIndexConnectionConfigWithDefaultValue(difference, flowManager) + || isRegistryUrlChange(difference) || isParameterContextChange(difference); } + // The Registry URL may change if, for instance, a registry is moved to a new host, or is made secure, the port changes, etc. + // Since this can be handled by the client anyway, there's no need to flag this as a 'local modification' + private static boolean isRegistryUrlChange(final FlowDifference difference) { + if (difference.getDifferenceType() != DifferenceType.VERSIONED_FLOW_COORDINATES_CHANGED) { + return false; + } + if (!(difference.getValueA() instanceof VersionedFlowCoordinates)) { + return false; + } + if (!(difference.getValueB() instanceof VersionedFlowCoordinates)) { + return false; + } + + final VersionedFlowCoordinates coordinatesA = (VersionedFlowCoordinates) difference.getValueA(); + final VersionedFlowCoordinates coordinatesB = (VersionedFlowCoordinates) difference.getValueB(); + + if (Objects.equals(coordinatesA.getBucketId(), coordinatesB.getBucketId()) + && Objects.equals(coordinatesA.getFlowId(), coordinatesB.getFlowId()) + && Objects.equals(coordinatesA.getVersion(), coordinatesB.getVersion())) { + + return true; + } + + return false; + } + /** * Predicate that returns true if the difference is NOT a name change on a public port (i.e. VersionedPort that allows remote access). */