This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.19
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit f1193a11ff6c53cb25d59b100571a88936d338ee
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Wed Nov 30 18:08:28 2022 -0500

    NIFI-10918: When fetching a flow from a Flow Registry, if it references any
'internal versioned flows', then instead of requiring that we have a client
configured for the appropriate URL, attempt to fetch the flow from each client.
We will start with the clients that report that they can handle the URL, but
will try the others as well. As soon as we successfully fetch the flow, we stop.
    
    NIFI-10918: Fixed checkstyle violations
    
    This closes #6736
    Signed-off-by: Bence Simon <bsi...@apache.org>
---
 .../flow/StandardFlowRegistryClientNode.java       | 44 +++++++++++++++++-----
 .../apache/nifi/util/FlowDifferenceFilters.java    | 27 +++++++++++++
 2 files changed, 61 insertions(+), 10 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java
index c2a5462ebc..98a0925880 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java
@@ -46,11 +46,15 @@ import org.apache.nifi.parameter.ParameterLookup;
 import org.apache.nifi.registry.ComponentVariableRegistry;
 import org.apache.nifi.util.CharacterFilterUtils;
 import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -58,6 +62,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 public final class StandardFlowRegistryClientNode extends 
AbstractComponentNode implements FlowRegistryClientNode {
+    private static final Logger logger = 
LoggerFactory.getLogger(StandardFlowRegistryClientNode.class);
 
     private final FlowManager flowManager;
     private final Authorizable parent;
@@ -299,11 +304,7 @@ public final class StandardFlowRegistryClientNode extends 
AbstractComponentNode
         final VersionedFlowCoordinates coordinates = 
group.getVersionedFlowCoordinates();
 
         if (coordinates != null) {
-            final String storageLocation = coordinates.getStorageLocation() == 
null ? coordinates.getRegistryUrl() : coordinates.getStorageLocation();
-            final String bucketId = coordinates.getBucketId();
-            final String flowId = coordinates.getFlowId();
-            final int version = coordinates.getVersion();
-            final RegisteredFlowSnapshot snapshot =  
getRegistryForInternalFlow(storageLocation).getFlowContents(context, bucketId, 
flowId, version, true);
+            final RegisteredFlowSnapshot snapshot = fetchFlowContents(context, 
coordinates, true);
             final VersionedProcessGroup contents = snapshot.getFlowContents();
 
             group.setVersionedFlowCoordinates(coordinates);
@@ -332,14 +333,37 @@ public final class StandardFlowRegistryClientNode extends 
AbstractComponentNode
         }
     }
 
-    private FlowRegistryClientNode getRegistryForInternalFlow(final String 
storageLocation) throws FlowRegistryException, IOException {
-        for (FlowRegistryClientNode registryClientNode : 
flowManager.getAllFlowRegistryClients()) {
-            if 
(registryClientNode.isStorageLocationApplicable(storageLocation)) {
-                return registryClientNode;
+    private RegisteredFlowSnapshot fetchFlowContents(final 
FlowRegistryClientUserContext context, final VersionedFlowCoordinates 
coordinates,
+                                                     final boolean 
fetchRemoteFlows) throws FlowRegistryException {
+
+        final String storageLocation = coordinates.getStorageLocation() == 
null ? coordinates.getRegistryUrl() : coordinates.getStorageLocation();
+        final String bucketId = coordinates.getBucketId();
+        final String flowId = coordinates.getFlowId();
+        final int version = coordinates.getVersion();
+
+        final List<FlowRegistryClientNode> clientNodes = 
getRegistryClientsForInternalFlow(storageLocation);
+        for (final FlowRegistryClientNode clientNode : clientNodes) {
+            try {
+                logger.debug("Attempting to fetch flow for Bucket [{}] Flow 
[{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
+                final RegisteredFlowSnapshot snapshot = 
clientNode.getFlowContents(context, bucketId, flowId, version, 
fetchRemoteFlows);
+                coordinates.setRegistryId(clientNode.getIdentifier());
+
+                logger.debug("Successfully fetched flow for Bucket [{}] Flow 
[{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
+                return snapshot;
+            } catch (final Exception e) {
+                logger.debug("Failed to fetch flow", e);
             }
         }
 
-        throw new FlowRegistryException(String.format("No applicable registry 
found for storage location %s", storageLocation));
+        throw new FlowRegistryException(String.format("Could not find any 
Registry Client that was able to fetch flow with Bucket [%s] Flow [%s] Version 
[%s] with Storage Location [%s]",
+            bucketId, flowId, version, storageLocation));
+    }
+
+    private List<FlowRegistryClientNode> 
getRegistryClientsForInternalFlow(final String storageLocation) {
+        // Sort clients based on whether or not they believe they are 
applicable for the given storage location
+        final List<FlowRegistryClientNode> matchingClients = new 
ArrayList<>(flowManager.getAllFlowRegistryClients());
+        matchingClients.sort(Comparator.comparing(client -> 
client.isStorageLocationApplicable(storageLocation) ? -1 : 1));
+        return matchingClients;
     }
 
     private RegisteredFlowSnapshot createRegisteredFlowSnapshot(
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
index 512bc3696e..b052af8110 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
@@ -72,9 +72,36 @@ public class FlowDifferenceFilters {
             || isNewRetryConfigWithDefaultValue(difference, flowManager)
             || isNewZIndexLabelConfigWithDefaultValue(difference, flowManager)
             || isNewZIndexConnectionConfigWithDefaultValue(difference, 
flowManager)
+            || isRegistryUrlChange(difference)
             || isParameterContextChange(difference);
     }
 
+    // The Registry URL may change if, for instance, a registry is moved to a 
new host, or is made secure, the port changes, etc.
+    // Since this can be handled by the client anyway, there's no need to flag 
this as a 'local modification'
+    private static boolean isRegistryUrlChange(final FlowDifference 
difference) {
+        if (difference.getDifferenceType() != 
DifferenceType.VERSIONED_FLOW_COORDINATES_CHANGED) {
+            return false;
+        }
+        if (!(difference.getValueA() instanceof VersionedFlowCoordinates)) {
+            return false;
+        }
+        if (!(difference.getValueB() instanceof VersionedFlowCoordinates)) {
+            return false;
+        }
+
+        final VersionedFlowCoordinates coordinatesA = 
(VersionedFlowCoordinates) difference.getValueA();
+        final VersionedFlowCoordinates coordinatesB = 
(VersionedFlowCoordinates) difference.getValueB();
+
+        if (Objects.equals(coordinatesA.getBucketId(), 
coordinatesB.getBucketId())
+            && Objects.equals(coordinatesA.getFlowId(), 
coordinatesB.getFlowId())
+            && Objects.equals(coordinatesA.getVersion(), 
coordinatesB.getVersion())) {
+
+            return true;
+        }
+
+        return false;
+    }
+
     /**
      * Predicate that returns true if the difference is NOT a name change on a 
public port (i.e. VersionedPort that allows remote access).
      */

Reply via email to