This is an automated email from the ASF dual-hosted git repository.

denes pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 5106197  NIFI-9018 When connection points to a moved port version 
change with … (#5286)
5106197 is described below

commit 5106197b3b5cb439d0ba486216ce800ba917c6b5
Author: timeabarna <[email protected]>
AuthorDate: Wed Aug 11 14:26:08 2021 +0200

    NIFI-9018 When connection points to a moved port version change with … 
(#5286)
    
    * NIFI-9018 When connection points to a moved port version change with NiFi 
Registry may throw exception
---
 .../apache/nifi/groups/StandardProcessGroup.java   |  33 +--
 .../nifi/integration/FrameworkIntegrationTest.java |  17 +-
 .../nifi/integration/versioned/ImportFlowIT.java   | 234 +++++++++++++++++----
 3 files changed, 231 insertions(+), 53 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index cee0b10..6feb49e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -4282,6 +4282,27 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             rpgsRemoved.remove(proposedRpg.getIdentifier());
         }
 
+        //Remove deletable Input and Output Ports.
+        //addConnection method may link the ports incorrectly, for example:
+        //Current flow: PGA IP1         ProcessGroupA has an Input Port IP1
+        //New flow: PGA P1-C1           ProcessGroupA has a new connection C1, 
its source is a new Processor P1
+        //            |     |           and its destination pointing to the 
moved Input Port IP1 under a new child ProcessGroup PGB
+        //          PGB    IP1
+        //As Input Port (IP1) originally belonged to PGA the new connection 
would be incorrectly linked to the old Input Port
+        //instead of the one being in PGB, so it needs to be removed first 
before updating the connections.
+
+        for (final String removedVersionedId : inputPortsRemoved) {
+            final Port port = inputPortsByVersionedId.get(removedVersionedId);
+            LOG.info("Removing {} from {}", port, group);
+            group.removeInputPort(port);
+        }
+
+        for (final String removedVersionedId : outputPortsRemoved) {
+            final Port port = outputPortsByVersionedId.get(removedVersionedId);
+            LOG.info("Removing {} from {}", port, group);
+            group.removeOutputPort(port);
+        }
+
         // Add and update Connections
         for (final VersionedConnection proposedConnection : 
proposed.getConnections()) {
             final Connection connection = 
connectionsByVersionedId.get(proposedConnection.getIdentifier());
@@ -4320,18 +4341,6 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             group.removeFunnel(funnel);
         }
 
-        for (final String removedVersionedId : inputPortsRemoved) {
-            final Port port = inputPortsByVersionedId.get(removedVersionedId);
-            LOG.info("Removing {} from {}", port, group);
-            group.removeInputPort(port);
-        }
-
-        for (final String removedVersionedId : outputPortsRemoved) {
-            final Port port = outputPortsByVersionedId.get(removedVersionedId);
-            LOG.info("Removing {} from {}", port, group);
-            group.removeOutputPort(port);
-        }
-
         // Now that all input/output ports have been removed, we should be 
able to update
         // all ports to the final name that was proposed in the new flow 
version.
         for (final Map.Entry<Port, String> portAndFinalName : 
proposedPortFinalNames.entrySet()) {
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
index 38b289b..f5505b6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
@@ -29,6 +29,7 @@ import org.apache.nifi.cluster.protocol.NodeProtocolSender;
 import org.apache.nifi.cluster.protocol.StandardDataFlow;
 import org.apache.nifi.components.state.StateProvider;
 import org.apache.nifi.components.validation.ValidationStatus;
+import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.connectable.StandardConnection;
 import org.apache.nifi.controller.ControllerService;
@@ -388,11 +389,19 @@ public class FrameworkIntegrationTest {
         return createProcessorNode(processorType.getName());
     }
 
+    protected final ProcessorNode createProcessorNode(final Class<? extends 
Processor> processorType, final ProcessGroup destination) {
+        return createProcessorNode(processorType.getName(), destination);
+    }
+
     protected final ProcessorNode createProcessorNode(final String 
processorType) {
+        return createProcessorNode(processorType, rootProcessGroup);
+    }
+
+    protected final ProcessorNode createProcessorNode(final String 
processorType, final ProcessGroup destination) {
         final String uuid = getSimpleTypeName(processorType) + "-" + 
UUID.randomUUID().toString();
         final BundleCoordinate bundleCoordinate = 
SystemBundle.SYSTEM_BUNDLE_COORDINATE;
         final ProcessorNode procNode = 
flowController.getFlowManager().createProcessor(processorType, uuid, 
bundleCoordinate, Collections.emptySet(), true, true);
-        rootProcessGroup.addProcessor(procNode);
+        destination.addProcessor(procNode);
 
         return procNode;
     }
@@ -465,15 +474,15 @@ public class FrameworkIntegrationTest {
         return processorNode;
     }
 
-    protected final Connection connect(final ProcessorNode source, final 
ProcessorNode destination, final Relationship relationship) {
+    protected final Connection connect(final Connectable source, final 
Connectable destination, final Relationship relationship) {
         return connect(source, destination, 
Collections.singleton(relationship));
     }
 
-    protected final Connection connect(final ProcessorNode source, final 
ProcessorNode destination, final Collection<Relationship> relationships) {
+    protected final Connection connect(final Connectable source, final 
Connectable destination, final Collection<Relationship> relationships) {
         return connect(rootProcessGroup, source, destination, relationships);
     }
 
-    protected final Connection connect(ProcessGroup processGroup, final 
ProcessorNode source, final ProcessorNode destination, final 
Collection<Relationship> relationships) {
+    protected final Connection connect(ProcessGroup processGroup, final 
Connectable source, final Connectable destination, final 
Collection<Relationship> relationships) {
         final String id = UUID.randomUUID().toString();
         final Connection connection = new 
StandardConnection.Builder(processScheduler)
                 .source(source)
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
index 529c266..c4a00b3 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
@@ -19,14 +19,20 @@ package org.apache.nifi.integration.versioned;
 import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.validation.ValidationStatus;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Port;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.StandardSnippet;
 import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedPort;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.integration.DirectInjectionExtensionManager;
 import org.apache.nifi.integration.FrameworkIntegrationTest;
 import org.apache.nifi.integration.cs.LongValidatingControllerService;
 import org.apache.nifi.integration.cs.NopServiceReferencingProcessor;
+import org.apache.nifi.integration.processors.GenerateProcessor;
 import org.apache.nifi.integration.processors.UsernamePasswordProcessor;
 import org.apache.nifi.parameter.Parameter;
 import org.apache.nifi.parameter.ParameterContext;
@@ -55,8 +61,10 @@ import 
org.apache.nifi.registry.flow.diff.StandardComparableDataFlow;
 import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
 import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
 import org.apache.nifi.util.FlowDifferenceFilters;
+import org.jetbrains.annotations.NotNull;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -384,7 +392,71 @@ public class ImportFlowIT extends FrameworkIntegrationTest 
{
         assertEquals("#{secret-param}", 
nodeInGroupWithNoValue.getProperty(UsernamePasswordProcessor.PASSWORD).getRawValue());
     }
 
+    @Test
+    public void testUpdateFlowWithInputPortMovedFromGroupAToGroupB() {
+        //Testing use case NIFI-9018
+        //Create Process Group A
+        final ProcessGroup groupA = createProcessGroup("group-a-id", "Group 
A", getRootGroup());
+
+        //Add Input Port to Process Group A
+        final Port port = 
getFlowController().getFlowManager().createLocalInputPort("input-port-id", 
"Input Port");
+        groupA.addInputPort(port);
+
+        //Create a snapshot
+        final VersionedFlowSnapshot version1 = createFlowSnapshot(groupA);
+
+        //Create Process Group B under Process Group A
+        final ProcessGroup groupB = createProcessGroup("group-b-id", "Group 
B", groupA);
+
+        //Move Input Port from Process Group A to Process Group B
+        moveInputPort(port, groupB);
+
+        //Create Processor under Process Group A
+        final ProcessorNode processor = 
createProcessorNode(GenerateProcessor.class, groupA);
+
+        //Create Connection between Processor in Process Group A and Input 
Port in Process Group B
+        final Connection connection = connect(groupA, processor, port, 
processor.getRelationships());
+
+        //Create another snapshot
+        final VersionedFlowSnapshot version2 = createFlowSnapshot(groupA);
+
+        //Change Process Group A version to Version 1
+        groupA.updateFlow(version1, null, false, true, true);
+
+        //Process Group A should have only one Input Port and no Process 
Groups, Processors or Connections
+        assertTrue(groupA.getProcessGroups().isEmpty());
+        assertTrue(groupA.getProcessors().isEmpty());
+        assertTrue(groupA.getConnections().isEmpty());
+        assertEquals(1, groupA.getInputPorts().size());
+        assertEquals(port.getVersionedComponentId(), 
groupA.getInputPorts().stream().findFirst().get().getVersionedComponentId());
+
+        //Change Process Group A version to Version 2
+        groupA.updateFlow(version2, null, false, true, true);
+
+        //Process Group A should have a Process Group, a Processor and a 
Connection and no Input Ports
+        assertEquals(1, groupA.getProcessGroups().size());
+        assertEquals(groupB.getVersionedComponentId(), 
groupA.getProcessGroups().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(1, groupA.getProcessors().size());
+        assertEquals(processor.getVersionedComponentId(), 
groupA.getProcessors().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(1, groupA.getConnections().size());
+        assertEquals(connection.getVersionedComponentId(), 
groupA.getConnections().stream().findFirst().get().getVersionedComponentId());
+        assertTrue(groupA.getInputPorts().isEmpty());
+    }
+
+    private ProcessGroup createProcessGroup(final String groupId, final String 
groupName, final ProcessGroup destination) {
+        final ProcessGroup group = 
getFlowController().getFlowManager().createProcessGroup(groupId);
+        group.setName(groupName);
+        destination.addProcessGroup(group);
+        return group;
+    }
+
+    private void moveInputPort(final Port port, final ProcessGroup 
destination) {
+        final StandardSnippet snippet = new StandardSnippet();
+        snippet.setParentGroupId(port.getProcessGroupIdentifier());
+        snippet.addInputPorts(Collections.singletonMap(port.getIdentifier(), 
null));
 
+        port.getProcessGroup().move(snippet, destination);
+    }
 
 
     private Set<FlowDifference> getLocalModifications(final ProcessGroup 
processGroup, final VersionedFlowSnapshot versionedFlowSnapshot) {
@@ -408,59 +480,82 @@ public class ImportFlowIT extends 
FrameworkIntegrationTest {
         return differences;
     }
 
-    private VersionedFlowSnapshot createFlowSnapshot(final 
List<ControllerServiceNode> controllerServices, final List<ProcessorNode> 
processors, final Set<Parameter> parameters) {
-        final VersionedFlowSnapshotMetadata snapshotMetadata = new 
VersionedFlowSnapshotMetadata();
-        snapshotMetadata.setAuthor("unit-test");
-        snapshotMetadata.setBucketIdentifier("unit-test-bucket");
-        snapshotMetadata.setFlowIdentifier("unit-test-flow");
-        snapshotMetadata.setTimestamp(System.currentTimeMillis());
-        snapshotMetadata.setVersion(1);
+    private VersionedFlowSnapshot createFlowSnapshot(final ProcessGroup group, 
final List<ControllerServiceNode> controllerServices,
+                                                     final List<ProcessorNode> 
processors, final Set<Parameter> parameters) {
+        final VersionedFlowSnapshotMetadata snapshotMetadata = 
createSnapshotMetadata();
 
-        final Bucket bucket = new Bucket();
-        bucket.setCreatedTimestamp(System.currentTimeMillis());
-        bucket.setIdentifier("unit-test-bucket");
-        bucket.setName("Unit Test Bucket");
+        final Bucket bucket = createBucket();
 
-        final VersionedFlow flow = new VersionedFlow();
-        flow.setBucketIdentifier("unit-test-bucket");
-        flow.setBucketName("Unit Test Bucket");
-        flow.setCreatedTimestamp(System.currentTimeMillis());
-        flow.setIdentifier("unit-test-flow");
-        flow.setName("Unit Test Flow");
+        final VersionedFlow flow = createVersionedFlow();
 
-        final BundleCoordinate coordinate = 
getSystemBundle().getBundleDetails().getCoordinate();
-        final Bundle bundle = new Bundle();
-        bundle.setArtifact(coordinate.getId());
-        bundle.setGroup(coordinate.getGroup());
-        bundle.setVersion(coordinate.getVersion());
+        createBundle();
 
         final NiFiRegistryFlowMapper flowMapper = new 
NiFiRegistryFlowMapper(getExtensionManager());
 
+        final List<ProcessorNode> processorNodes;
+        final List<ControllerServiceNode> controllerServiceNodes;
+        final List<Port> inputPorts;
+        final List<Connection> connections;
+        final List<ProcessGroup> processGroups;
+        final Set<VersionedProcessGroup> versionedProcessGroups;
+
+        if (group == null) {
+            processorNodes = processors;
+            controllerServiceNodes = controllerServices;
+            inputPorts = Collections.EMPTY_LIST;
+            connections = Collections.EMPTY_LIST;
+            versionedProcessGroups = Collections.EMPTY_SET;
+        } else {
+            processorNodes = new ArrayList<>(group.getProcessors());
+            controllerServiceNodes = new 
ArrayList<>(group.getControllerServices(false));
+            inputPorts = new ArrayList<>(group.getInputPorts());
+            connections = new ArrayList<>(group.getConnections());
+            processGroups = new ArrayList<>(group.getProcessGroups());
+
+            final VersionedProcessGroup versionedGroup = 
flowMapper.mapProcessGroup(group, 
getFlowController().getControllerServiceProvider(),getFlowController().getFlowRegistryClient(),true);
+            processGroups.forEach(processGroup->
+                
versionedGroup.getProcessGroups().stream().filter(versionedProcessGroup -> 
versionedProcessGroup.getName().equals(processGroup.getName()))
+                        .forEach(filteredProcessGroup -> 
processGroup.setVersionedComponentId(filteredProcessGroup.getIdentifier())));
+            versionedProcessGroups = new 
HashSet<>(versionedGroup.getProcessGroups());
+        }
+
         final Set<VersionedProcessor> versionedProcessors = new HashSet<>();
-        for (final ProcessorNode processor : processors) {
+        for (final ProcessorNode processor : processorNodes) {
             final VersionedProcessor versionedProcessor = 
flowMapper.mapProcessor(processor, 
getFlowController().getControllerServiceProvider(), Collections.emptySet(), new 
HashMap<>());
             versionedProcessors.add(versionedProcessor);
             
processor.setVersionedComponentId(versionedProcessor.getIdentifier());
         }
 
-        final Set<VersionedControllerService> services = new HashSet<>();
-        for (final ControllerServiceNode serviceNode : controllerServices) {
-            final VersionedControllerService service = 
flowMapper.mapControllerService(serviceNode, 
getFlowController().getControllerServiceProvider(), Collections.emptySet(), new 
HashMap<>());
-            services.add(service);
-            serviceNode.setVersionedComponentId(service.getIdentifier());
+        final Set<VersionedControllerService> versionedServices = new 
HashSet<>();
+        for (final ControllerServiceNode serviceNode : controllerServiceNodes) 
{
+            final VersionedControllerService versionedService = 
flowMapper.mapControllerService(serviceNode, 
getFlowController().getControllerServiceProvider(),
+                    Collections.emptySet(), new HashMap<>());
+            versionedServices.add(versionedService);
+            
serviceNode.setVersionedComponentId(versionedService.getIdentifier());
         }
 
-        final VersionedProcessGroup flowContents = new VersionedProcessGroup();
-        flowContents.setIdentifier("unit-test-flow-contents");
-        flowContents.setName("Unit Test");
+        final Set<VersionedPort> versionedInputPorts = new HashSet<>();
+        for (final Port inputPort : inputPorts) {
+            final VersionedPort versionedInputPort = 
flowMapper.mapPort(inputPort);
+            versionedInputPorts.add(versionedInputPort);
+            
inputPort.setVersionedComponentId(versionedInputPort.getIdentifier());
+        }
+
+        final Set<VersionedConnection> versionedConnections = new HashSet<>();
+        for (final Connection connection : connections) {
+            final VersionedConnection versionedConnection = 
flowMapper.mapConnection(connection);
+            versionedConnections.add(versionedConnection);
+            
connection.setVersionedComponentId(versionedConnection.getIdentifier());
+        }
+
+        final VersionedProcessGroup flowContents = createFlowContents();
         flowContents.setProcessors(versionedProcessors);
-        flowContents.setControllerServices(services);
+        flowContents.setControllerServices(versionedServices);
+        flowContents.setProcessGroups(versionedProcessGroups);
+        flowContents.setInputPorts(versionedInputPorts);
+        flowContents.setConnections(versionedConnections);
 
-        final VersionedFlowSnapshot versionedFlowSnapshot = new 
VersionedFlowSnapshot();
-        versionedFlowSnapshot.setSnapshotMetadata(snapshotMetadata);
-        versionedFlowSnapshot.setBucket(bucket);
-        versionedFlowSnapshot.setFlow(flow);
-        versionedFlowSnapshot.setFlowContents(flowContents);
+        final VersionedFlowSnapshot versionedFlowSnapshot = 
createVersionedFlowSnapshot(snapshotMetadata, bucket, flow, flowContents);
 
         if (parameters != null) {
             final Set<VersionedParameter> versionedParameters = new 
HashSet<>();
@@ -483,4 +578,69 @@ public class ImportFlowIT extends FrameworkIntegrationTest 
{
 
         return versionedFlowSnapshot;
     }
+
+    private VersionedFlowSnapshot createFlowSnapshot(final 
List<ControllerServiceNode> controllerServices, final List<ProcessorNode> 
processors, final Set<Parameter> parameters) {
+        return createFlowSnapshot(null, controllerServices, processors, 
parameters);
+    }
+
+    private VersionedFlowSnapshot createFlowSnapshot(final ProcessGroup group) 
{
+        return createFlowSnapshot(group, Collections.EMPTY_LIST, 
Collections.EMPTY_LIST, null);
+    }
+
+    @NotNull
+    private VersionedFlowSnapshot 
createVersionedFlowSnapshot(VersionedFlowSnapshotMetadata snapshotMetadata, 
Bucket bucket, VersionedFlow flow, VersionedProcessGroup flowContents) {
+        final VersionedFlowSnapshot versionedFlowSnapshot = new 
VersionedFlowSnapshot();
+        versionedFlowSnapshot.setSnapshotMetadata(snapshotMetadata);
+        versionedFlowSnapshot.setBucket(bucket);
+        versionedFlowSnapshot.setFlow(flow);
+        versionedFlowSnapshot.setFlowContents(flowContents);
+        return versionedFlowSnapshot;
+    }
+
+    @NotNull
+    private VersionedProcessGroup createFlowContents() {
+        final VersionedProcessGroup flowContents = new VersionedProcessGroup();
+        flowContents.setIdentifier("unit-test-flow-contents");
+        flowContents.setName("Unit Test");
+        return flowContents;
+    }
+
+    private void createBundle() {
+        final BundleCoordinate coordinate = 
getSystemBundle().getBundleDetails().getCoordinate();
+        final Bundle bundle = new Bundle();
+        bundle.setArtifact(coordinate.getId());
+        bundle.setGroup(coordinate.getGroup());
+        bundle.setVersion(coordinate.getVersion());
+    }
+
+    @NotNull
+    private VersionedFlow createVersionedFlow() {
+        final VersionedFlow flow = new VersionedFlow();
+        flow.setBucketIdentifier("unit-test-bucket");
+        flow.setBucketName("Unit Test Bucket");
+        flow.setCreatedTimestamp(System.currentTimeMillis());
+        flow.setIdentifier("unit-test-flow");
+        flow.setName("Unit Test Flow");
+        return flow;
+    }
+
+    @NotNull
+    private Bucket createBucket() {
+        final Bucket bucket = new Bucket();
+        bucket.setCreatedTimestamp(System.currentTimeMillis());
+        bucket.setIdentifier("unit-test-bucket");
+        bucket.setName("Unit Test Bucket");
+        return bucket;
+    }
+
+    @NotNull
+    private VersionedFlowSnapshotMetadata createSnapshotMetadata() {
+        final VersionedFlowSnapshotMetadata snapshotMetadata = new 
VersionedFlowSnapshotMetadata();
+        snapshotMetadata.setAuthor("unit-test");
+        snapshotMetadata.setBucketIdentifier("unit-test-bucket");
+        snapshotMetadata.setFlowIdentifier("unit-test-flow");
+        snapshotMetadata.setTimestamp(System.currentTimeMillis());
+        snapshotMetadata.setVersion(1);
+        return snapshotMetadata;
+    }
 }

Reply via email to