This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new f27ace1ccf NIFI-12016: This closes #7662. Allow use of compatible NAR 
bundles when loading flow from cluster connection; when determining what 
bundles are compatible, consider not just any bundle if it's the only one but 
also any bundle whose version matches the framework version so that when NiFi 
is upgraded, it is handled more gracefully.
f27ace1ccf is described below

commit f27ace1ccf36cb0f7a958462499c388652dde1c0
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Wed Aug 30 17:39:31 2023 -0400

    NIFI-12016: This closes #7662. Allow use of compatible NAR bundles when 
loading flow from cluster connection; when determining what bundles are 
compatible, consider not just any bundle if it's the only one but also any 
bundle whose version matches the framework version so that when NiFi is 
upgraded, it is handled more gracefully.
    
    Signed-off-by: Joseph Witt <joew...@apache.org>
---
 .../java/org/apache/nifi/util/BundleUtils.java     |  44 ++++++---
 .../java/org/apache/nifi/util/TestBundleUtils.java | 100 +++++++++++++++++++
 .../nifi/controller/StandardFlowService.java       |   2 +-
 .../system/clustering/FlowSynchronizationIT.java   | 109 ---------------------
 4 files changed, 133 insertions(+), 122 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/BundleUtils.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/BundleUtils.java
index d0c428f263..d7ad003f62 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/BundleUtils.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/BundleUtils.java
@@ -20,22 +20,40 @@ import org.apache.nifi.bundle.Bundle;
 import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.flow.VersionedProcessGroup;
 import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.NarClassLoadersHolder;
 import org.apache.nifi.nar.PythonBundle;
 import org.apache.nifi.web.api.dto.BundleDTO;
 
 import java.util.List;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 /**
  * Utility class for Bundles.
  */
 public final class BundleUtils {
-    private static Optional<BundleCoordinate> findOptionalBundleForType(final 
ExtensionManager extensionManager, final String type, final BundleCoordinate 
desiredCoordinate) {
+    static Optional<BundleCoordinate> findOptionalBundleForType(final 
ExtensionManager extensionManager, final String type, final Bundle 
frameworkBundle) {
         final List<Bundle> bundles = extensionManager.getBundles(type);
         if (bundles.size() == 1) {
             return 
Optional.of(bundles.get(0).getBundleDetails().getCoordinate());
         }
+
+        // All NARs that are packaged with NiFi will have the same bundle 
coordinate as the NiFi framework bundle.
+        // During an upgrade, it's fairly common to have two versions of a 
NAR: the version shipped with NiFi and another version, perhaps to maintain
+        // backward compatibility or because the new version behaves in some 
different way and the user wants the old behavior in some instances, etc.
+        // In this case, the user may have two versions. For example, version 
2.2.0 and 2.4.0 while NiFi is at version 2.4.0.
+        // Now, during upgrade to 2.4.1, there will no longer be a 2.4.0 
available. We want to be smart enough to realize that those extensions using 
version
+        // 2.2.0 stay there but those using 2.4.0 upgrade to 2.4.1.
+        // To do this, we always first match on the exact version but this 
method is called when there's no exact match. So those marked 2.2.0 won't 
arrive here.
+        // But for those extensions that were using 2.4.0, we want to now look 
for version 2.4.1 - i.e., the one with the same version as the framework. If we
+        // find that version, then we want to use it. This helps to smooth out 
the upgrade process even when users have multiple versions of a given NAR.
+        final String frameworkVersion = 
frameworkBundle.getBundleDetails().getCoordinate().getVersion();
+        for (final Bundle bundle : bundles) {
+            final String componentVersion = 
bundle.getBundleDetails().getCoordinate().getVersion();
+            if (frameworkVersion.equals(componentVersion)) {
+                return Optional.of(bundle.getBundleDetails().getCoordinate());
+            }
+        }
+
         return Optional.empty();
     }
 
@@ -71,7 +89,10 @@ public final class BundleUtils {
                 throw new IllegalStateException(String.format("%s from %s is 
not known to this NiFi instance.", type, coordinate));
             }
         } else {
-            final List<BundleCoordinate> bundlesForType = 
extensionManager.getBundles(type).stream().map(b -> 
b.getBundleDetails().getCoordinate()).collect(Collectors.toList());
+            final List<BundleCoordinate> bundlesForType = 
extensionManager.getBundles(type).stream()
+                .map(b -> b.getBundleDetails().getCoordinate())
+                .toList();
+
             if (bundlesForType.contains(coordinate)) {
                 return coordinate;
             } else {
@@ -82,18 +103,17 @@ public final class BundleUtils {
 
 
     private static Optional<BundleCoordinate> 
findOptionalCompatibleBundle(final ExtensionManager extensionManager, final 
String type,
-                                                         final BundleDTO 
bundleDTO, final boolean allowCompatibleBundle) {
+                                                         final BundleDTO 
bundleDTO) {
         final BundleCoordinate coordinate = new 
BundleCoordinate(bundleDTO.getGroup(), bundleDTO.getArtifact(), 
bundleDTO.getVersion());
         final Bundle bundle = extensionManager.getBundle(coordinate);
 
         if (bundle == null) {
-            if (allowCompatibleBundle) {
-                return findOptionalBundleForType(extensionManager, type, 
coordinate);
-            } else {
-                return Optional.empty();
-            }
+            return findOptionalBundleForType(extensionManager, type, 
NarClassLoadersHolder.getInstance().getFrameworkBundle());
         } else {
-            final List<BundleCoordinate> bundlesForType = 
extensionManager.getBundles(type).stream().map(b -> 
b.getBundleDetails().getCoordinate()).collect(Collectors.toList());
+            final List<BundleCoordinate> bundlesForType = 
extensionManager.getBundles(type).stream()
+                .map(b -> b.getBundleDetails().getCoordinate())
+                .toList();
+
             if (bundlesForType.contains(coordinate)) {
                 return Optional.of(coordinate);
             } else {
@@ -181,9 +201,9 @@ public final class BundleUtils {
 
     public static Optional<BundleCoordinate> getOptionalCompatibleBundle(final 
ExtensionManager extensionManager, final String type, final BundleDTO 
bundleDTO) {
         if (bundleDTO == null) {
-            return findOptionalBundleForType(extensionManager, type, null);
+            return findOptionalBundleForType(extensionManager, type, 
NarClassLoadersHolder.getInstance().getFrameworkBundle());
         } else {
-            return findOptionalCompatibleBundle(extensionManager, type, 
bundleDTO, true);
+            return findOptionalCompatibleBundle(extensionManager, type, 
bundleDTO);
         }
     }
 
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/util/TestBundleUtils.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/util/TestBundleUtils.java
new file mode 100644
index 0000000000..0368ac513b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/util/TestBundleUtils.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.bundle.BundleDetails;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.NarClassLoaders;
+import org.apache.nifi.nar.NarClassLoadersHolder;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+public class TestBundleUtils {
+
+    private static final String PROCESSOR_TYPE = "MyProcessor";
+    private static final String FRAMEWORK_VERSION = "5.0.0";
+
+    private static final Bundle frameworkBundle = 
createBundle("framework-bundle", FRAMEWORK_VERSION);
+    private static ExtensionManager extensionManager;
+
+
+    @BeforeAll
+    public static void setup() throws IOException, ClassNotFoundException {
+        extensionManager = Mockito.mock(ExtensionManager.class);
+
+        final NarClassLoaders narClassLoaders = 
NarClassLoadersHolder.getInstance();
+        narClassLoaders.init(null, new File("target/extensions"));
+    }
+
+    @Test
+    public void findOptionalBundleMatchingFramework() throws IOException, 
ClassNotFoundException {
+        final Bundle frameworkVersionBundle = createBundle("my-bundle", 
FRAMEWORK_VERSION);
+        final Bundle otherBundle = createBundle("my-bundle", "1.2.3");
+        final List<Bundle> bundles = Arrays.asList(frameworkVersionBundle, 
otherBundle);
+        when(extensionManager.getBundles(PROCESSOR_TYPE)).thenReturn(bundles);
+
+        final Optional<BundleCoordinate> compatibleCoordinate = 
BundleUtils.findOptionalBundleForType(extensionManager, PROCESSOR_TYPE, 
frameworkBundle);
+        assertTrue(compatibleCoordinate.isPresent());
+        
assertEquals(frameworkVersionBundle.getBundleDetails().getCoordinate(), 
compatibleCoordinate.get());
+    }
+
+    @Test
+    public void findOptionalBundleNotMatchingFramework() throws IOException, 
ClassNotFoundException {
+        final Bundle version3 = createBundle("my-bundle", "3.0.0");
+        final Bundle otherBundle = createBundle("my-bundle", "1.2.3");
+        final List<Bundle> bundles = Arrays.asList(version3, otherBundle);
+        when(extensionManager.getBundles(PROCESSOR_TYPE)).thenReturn(bundles);
+
+        final Optional<BundleCoordinate> compatibleCoordinate = 
BundleUtils.findOptionalBundleForType(extensionManager, PROCESSOR_TYPE, 
frameworkBundle);
+        assertFalse(compatibleCoordinate.isPresent());
+    }
+
+    @Test
+    public void testFindOptionalBundleOnlyOneBundle() throws IOException, 
ClassNotFoundException {
+        final Bundle otherBundle = createBundle("my-bundle", "1.2.3");
+        final List<Bundle> bundles = Collections.singletonList(otherBundle);
+        when(extensionManager.getBundles(PROCESSOR_TYPE)).thenReturn(bundles);
+
+        final Optional<BundleCoordinate> compatibleCoordinate = 
BundleUtils.findOptionalBundleForType(extensionManager, PROCESSOR_TYPE, 
frameworkBundle);
+        assertTrue(compatibleCoordinate.isPresent());
+        assertEquals(otherBundle.getBundleDetails().getCoordinate(), 
compatibleCoordinate.get());
+    }
+
+    private static Bundle createBundle(final String artifactId, final String 
version) {
+        final BundleDetails bundleDetails = new BundleDetails.Builder()
+            .coordinate(new BundleCoordinate("org.apache.nifi", artifactId, 
version))
+            .workingDir(new File("target"))
+            .build();
+
+        return new Bundle(bundleDetails, 
TestBundleUtils.class.getClassLoader());
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 2ce5477474..26f835a08c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -956,7 +956,7 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
             controller.setNodeId(nodeId);
 
             // load new controller state
-            loadFromBytes(dataFlow, true, 
BundleUpdateStrategy.USE_SPECIFIED_OR_FAIL);
+            loadFromBytes(dataFlow, true, 
BundleUpdateStrategy.USE_SPECIFIED_OR_COMPATIBLE_OR_GHOST);
 
             // set node ID on controller before we start heartbeating because 
heartbeat needs node ID
             clusterCoordinator.setLocalNodeIdentifier(nodeId);
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
index f4926bc04f..48c426bfc7 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
@@ -30,7 +30,6 @@ import org.apache.nifi.tests.system.NiFiSystemIT;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessorClient;
 import org.apache.nifi.web.api.dto.ControllerServiceDTO;
-import org.apache.nifi.web.api.dto.NodeDTO;
 import org.apache.nifi.web.api.dto.PortDTO;
 import org.apache.nifi.web.api.dto.ProcessorDTO;
 import org.apache.nifi.web.api.dto.flow.FlowDTO;
@@ -353,84 +352,6 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
         });
     }
 
-    @Test
-    public void testCannotJoinClusterIfMissingNar() throws 
NiFiClientException, IOException, InterruptedException {
-        getClientUtil().createProcessor("GenerateFlowFile");
-
-        // Shut down node 2
-        disconnectNode(2);
-        final NiFiInstance node2 = getNiFiInstance().getNodeInstance(2);
-        node2.stop();
-
-        // Remove node from the cluster. This way we know when it's attempted 
to connect
-        final Integer node2ApiPort = getNodeApiPort(2);
-        removeNode(2);
-        removeExtensionsNar(node2);
-
-        node2.start(false);
-
-        // Wait until node is no longer removed from cluster, which will 
happen when it starts up and requests to connect
-        waitFor(() -> !isNodeRemoved(node2ApiPort));
-
-        // Wait for node to show as disconnected because it doesn't have the 
necessary nar
-        waitForNodeState(2, NodeConnectionState.DISCONNECTED);
-
-        // We need to restore the extensions nar and restart the node so that 
subsequent tests can succeed
-        restoreExtensionsNar(node2);
-        node2.stop();
-        node2.start();
-
-        waitForAllNodesConnected();
-    }
-
-    private void removeNode(final int index) throws NiFiClientException, 
IOException, InterruptedException {
-        final NodeDTO nodeDto = getNodeEntity(index).getNode();
-        final String nodeId = nodeDto.getNodeId();
-        final Integer apiPort = nodeDto.getApiPort();
-        getNifiClient().getControllerClient().deleteNode(nodeId);
-        waitFor(() -> isNodeRemoved(apiPort));
-    }
-
-    private Integer getNodeApiPort(final int index) throws 
NiFiClientException, IOException {
-        final NodeDTO nodeDto = getNodeEntity(index).getNode();
-        final Integer apiPort = nodeDto.getApiPort();
-        return apiPort;
-    }
-
-    @Test
-    public void testCanJoinClusterIfAllNodesMissingNar() throws 
NiFiClientException, IOException, InterruptedException {
-        final ProcessorEntity generate = 
getClientUtil().createProcessor("GenerateFlowFile");
-
-        // Shut down node 2
-        disconnectNode(2);
-        final NiFiInstance node2 = getNiFiInstance().getNodeInstance(2);
-        node2.stop();
-
-        final NiFiInstance node1 = getNiFiInstance().getNodeInstance(1);
-        node1.stop();
-
-        removeExtensionsNar(node1);
-        removeExtensionsNar(node2);
-
-        node1.start(false);
-        node2.start(true);
-
-        waitForAllNodesConnected();
-
-        
assertTrue(getNifiClient().getProcessorClient().getProcessor(generate.getId()).getComponent().getExtensionMissing());
-
-        // In order to ensure that subsequent tests are able to operate 
properly, we need to restore the nar and restart
-        node1.stop();
-        node2.stop();
-
-        restoreExtensionsNar(node1);
-        restoreExtensionsNar(node2);
-
-        node1.start(false);
-        node2.start(true);
-        waitForAllNodesConnected();
-    }
-
 
     @Test
     public void testCannotRemoveComponentsWhileNodeDisconnected() throws 
NiFiClientException, IOException, InterruptedException {
@@ -497,36 +418,6 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
     }
 
 
-    private void removeExtensionsNar(final NiFiInstance nifiInstance) {
-        final File extensionsNar = getExtensionsNar(nifiInstance);
-        final File backupFile = new File(extensionsNar.getParentFile(), 
extensionsNar.getName() + ".backup");
-        assertTrue(extensionsNar.renameTo(backupFile));
-    }
-
-    private void restoreExtensionsNar(final NiFiInstance nifiInstance) {
-        final File backupFile = getExtensionsNar(nifiInstance);
-        final File extensionsNar = new File(backupFile.getParentFile(), 
backupFile.getName().replace(".backup", ""));
-        assertTrue(backupFile.renameTo(extensionsNar));
-    }
-
-    private File getExtensionsNar(final NiFiInstance nifiInstance) {
-        final File libDir = new File(nifiInstance.getInstanceDirectory(), 
"lib");
-        final File[] testExtensionsNar = libDir.listFiles(file -> 
file.getName().startsWith("nifi-system-test-extensions-nar-"));
-        assertEquals(1, testExtensionsNar.length);
-
-        return testExtensionsNar[0];
-    }
-
-
-    private boolean isNodeRemoved(final int apiPort) {
-        try {
-            return 
getNifiClient().getControllerClient().getNodes().getCluster().getNodes().stream()
-                .noneMatch(dto -> dto.getApiPort() == apiPort);
-        } catch (Exception e) {
-            return false;
-        }
-    }
-
     @Test
     public void testRestartWithFlowXmlGzNoJson() throws NiFiClientException, 
IOException {
         restartWithOnlySingleFlowPersistenceFile("flow.json.gz");

Reply via email to