This is an automated email from the ASF dual-hosted git repository.
denes pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 5106197 NIFI-9018 When connection points to a moved port version
change with … (#5286)
5106197 is described below
commit 5106197b3b5cb439d0ba486216ce800ba917c6b5
Author: timeabarna <[email protected]>
AuthorDate: Wed Aug 11 14:26:08 2021 +0200
NIFI-9018 When connection points to a moved port version change with …
(#5286)
* NIFI-9018 When connection points to a moved port version change with NiFi
Registry may throw exception
---
.../apache/nifi/groups/StandardProcessGroup.java | 33 +--
.../nifi/integration/FrameworkIntegrationTest.java | 17 +-
.../nifi/integration/versioned/ImportFlowIT.java | 234 +++++++++++++++++----
3 files changed, 231 insertions(+), 53 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index cee0b10..6feb49e 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -4282,6 +4282,27 @@ public final class StandardProcessGroup implements
ProcessGroup {
rpgsRemoved.remove(proposedRpg.getIdentifier());
}
+ //Remove deletable Input and Output Ports.
+ //addConnection method may link the ports incorrectly, for example:
+ //Current flow: PGA IP1 ProcessGroupA has an Input Port IP1
+ //New flow: PGA P1-C1 ProcessGroupA has a new connection C1,
its source is a new Processor P1
+ // | | and its destination pointing to the
moved Input Port IP1 under a new child ProcessGroup PGB
+ // PGB IP1
+ //As Input Port (IP1) originally belonged to PGA the new connection
would be incorrectly linked to the old Input Port
+ //instead of the one being in PGB, so it needs to be removed first
before updating the connections.
+
+ for (final String removedVersionedId : inputPortsRemoved) {
+ final Port port = inputPortsByVersionedId.get(removedVersionedId);
+ LOG.info("Removing {} from {}", port, group);
+ group.removeInputPort(port);
+ }
+
+ for (final String removedVersionedId : outputPortsRemoved) {
+ final Port port = outputPortsByVersionedId.get(removedVersionedId);
+ LOG.info("Removing {} from {}", port, group);
+ group.removeOutputPort(port);
+ }
+
// Add and update Connections
for (final VersionedConnection proposedConnection :
proposed.getConnections()) {
final Connection connection =
connectionsByVersionedId.get(proposedConnection.getIdentifier());
@@ -4320,18 +4341,6 @@ public final class StandardProcessGroup implements
ProcessGroup {
group.removeFunnel(funnel);
}
- for (final String removedVersionedId : inputPortsRemoved) {
- final Port port = inputPortsByVersionedId.get(removedVersionedId);
- LOG.info("Removing {} from {}", port, group);
- group.removeInputPort(port);
- }
-
- for (final String removedVersionedId : outputPortsRemoved) {
- final Port port = outputPortsByVersionedId.get(removedVersionedId);
- LOG.info("Removing {} from {}", port, group);
- group.removeOutputPort(port);
- }
-
// Now that all input/output ports have been removed, we should be
able to update
// all ports to the final name that was proposed in the new flow
version.
for (final Map.Entry<Port, String> portAndFinalName :
proposedPortFinalNames.entrySet()) {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
index 38b289b..f5505b6 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
@@ -29,6 +29,7 @@ import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.components.state.StateProvider;
import org.apache.nifi.components.validation.ValidationStatus;
+import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.StandardConnection;
import org.apache.nifi.controller.ControllerService;
@@ -388,11 +389,19 @@ public class FrameworkIntegrationTest {
return createProcessorNode(processorType.getName());
}
+ protected final ProcessorNode createProcessorNode(final Class<? extends
Processor> processorType, final ProcessGroup destination) {
+ return createProcessorNode(processorType.getName(), destination);
+ }
+
protected final ProcessorNode createProcessorNode(final String
processorType) {
+ return createProcessorNode(processorType, rootProcessGroup);
+ }
+
+ protected final ProcessorNode createProcessorNode(final String
processorType, final ProcessGroup destination) {
final String uuid = getSimpleTypeName(processorType) + "-" +
UUID.randomUUID().toString();
final BundleCoordinate bundleCoordinate =
SystemBundle.SYSTEM_BUNDLE_COORDINATE;
final ProcessorNode procNode =
flowController.getFlowManager().createProcessor(processorType, uuid,
bundleCoordinate, Collections.emptySet(), true, true);
- rootProcessGroup.addProcessor(procNode);
+ destination.addProcessor(procNode);
return procNode;
}
@@ -465,15 +474,15 @@ public class FrameworkIntegrationTest {
return processorNode;
}
- protected final Connection connect(final ProcessorNode source, final
ProcessorNode destination, final Relationship relationship) {
+ protected final Connection connect(final Connectable source, final
Connectable destination, final Relationship relationship) {
return connect(source, destination,
Collections.singleton(relationship));
}
- protected final Connection connect(final ProcessorNode source, final
ProcessorNode destination, final Collection<Relationship> relationships) {
+ protected final Connection connect(final Connectable source, final
Connectable destination, final Collection<Relationship> relationships) {
return connect(rootProcessGroup, source, destination, relationships);
}
- protected final Connection connect(ProcessGroup processGroup, final
ProcessorNode source, final ProcessorNode destination, final
Collection<Relationship> relationships) {
+ protected final Connection connect(ProcessGroup processGroup, final
Connectable source, final Connectable destination, final
Collection<Relationship> relationships) {
final String id = UUID.randomUUID().toString();
final Connection connection = new
StandardConnection.Builder(processScheduler)
.source(source)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
index 529c266..c4a00b3 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
@@ -19,14 +19,20 @@ package org.apache.nifi.integration.versioned;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.validation.ValidationStatus;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.StandardSnippet;
import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.integration.DirectInjectionExtensionManager;
import org.apache.nifi.integration.FrameworkIntegrationTest;
import org.apache.nifi.integration.cs.LongValidatingControllerService;
import org.apache.nifi.integration.cs.NopServiceReferencingProcessor;
+import org.apache.nifi.integration.processors.GenerateProcessor;
import org.apache.nifi.integration.processors.UsernamePasswordProcessor;
import org.apache.nifi.parameter.Parameter;
import org.apache.nifi.parameter.ParameterContext;
@@ -55,8 +61,10 @@ import
org.apache.nifi.registry.flow.diff.StandardComparableDataFlow;
import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.util.FlowDifferenceFilters;
+import org.jetbrains.annotations.NotNull;
import org.junit.Test;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -384,7 +392,71 @@ public class ImportFlowIT extends FrameworkIntegrationTest
{
assertEquals("#{secret-param}",
nodeInGroupWithNoValue.getProperty(UsernamePasswordProcessor.PASSWORD).getRawValue());
}
+ @Test
+ public void testUpdateFlowWithInputPortMovedFromGroupAToGroupB() {
+ //Testing use case NIFI-9018
+ //Create Process Group A
+ final ProcessGroup groupA = createProcessGroup("group-a-id", "Group
A", getRootGroup());
+
+ //Add Input Port to Process Group A
+ final Port port =
getFlowController().getFlowManager().createLocalInputPort("input-port-id",
"Input Port");
+ groupA.addInputPort(port);
+
+ //Create a snapshot
+ final VersionedFlowSnapshot version1 = createFlowSnapshot(groupA);
+
+ //Create Process Group B under Process Group A
+ final ProcessGroup groupB = createProcessGroup("group-b-id", "Group
B", groupA);
+
+ //Move Input Port from Process Group A to Process Group B
+ moveInputPort(port, groupB);
+
+ //Create Processor under Process Group A
+ final ProcessorNode processor =
createProcessorNode(GenerateProcessor.class, groupA);
+
+ //Create Connection between Processor in Process Group A and Input
Port in Process Group B
+ final Connection connection = connect(groupA, processor, port,
processor.getRelationships());
+
+ //Create another snapshot
+ final VersionedFlowSnapshot version2 = createFlowSnapshot(groupA);
+
+ //Change Process Group A version to Version 1
+ groupA.updateFlow(version1, null, false, true, true);
+
+ //Process Group A should have only one Input Port and no Process
Groups, Processors or Connections
+ assertTrue(groupA.getProcessGroups().isEmpty());
+ assertTrue(groupA.getProcessors().isEmpty());
+ assertTrue(groupA.getConnections().isEmpty());
+ assertEquals(1, groupA.getInputPorts().size());
+ assertEquals(port.getVersionedComponentId(),
groupA.getInputPorts().stream().findFirst().get().getVersionedComponentId());
+
+ //Change Process Group A version to Version 2
+ groupA.updateFlow(version2, null, false, true, true);
+
+ //Process Group A should have a Process Group, a Processor and a
Connection and no Input Ports
+ assertEquals(1, groupA.getProcessGroups().size());
+ assertEquals(groupB.getVersionedComponentId(),
groupA.getProcessGroups().stream().findFirst().get().getVersionedComponentId());
+ assertEquals(1, groupA.getProcessors().size());
+ assertEquals(processor.getVersionedComponentId(),
groupA.getProcessors().stream().findFirst().get().getVersionedComponentId());
+ assertEquals(1, groupA.getConnections().size());
+ assertEquals(connection.getVersionedComponentId(),
groupA.getConnections().stream().findFirst().get().getVersionedComponentId());
+ assertTrue(groupA.getInputPorts().isEmpty());
+ }
+
+ private ProcessGroup createProcessGroup(final String groupId, final String
groupName, final ProcessGroup destination) {
+ final ProcessGroup group =
getFlowController().getFlowManager().createProcessGroup(groupId);
+ group.setName(groupName);
+ destination.addProcessGroup(group);
+ return group;
+ }
+
+ private void moveInputPort(final Port port, final ProcessGroup
destination) {
+ final StandardSnippet snippet = new StandardSnippet();
+ snippet.setParentGroupId(port.getProcessGroupIdentifier());
+ snippet.addInputPorts(Collections.singletonMap(port.getIdentifier(),
null));
+ port.getProcessGroup().move(snippet, destination);
+ }
private Set<FlowDifference> getLocalModifications(final ProcessGroup
processGroup, final VersionedFlowSnapshot versionedFlowSnapshot) {
@@ -408,59 +480,82 @@ public class ImportFlowIT extends
FrameworkIntegrationTest {
return differences;
}
- private VersionedFlowSnapshot createFlowSnapshot(final
List<ControllerServiceNode> controllerServices, final List<ProcessorNode>
processors, final Set<Parameter> parameters) {
- final VersionedFlowSnapshotMetadata snapshotMetadata = new
VersionedFlowSnapshotMetadata();
- snapshotMetadata.setAuthor("unit-test");
- snapshotMetadata.setBucketIdentifier("unit-test-bucket");
- snapshotMetadata.setFlowIdentifier("unit-test-flow");
- snapshotMetadata.setTimestamp(System.currentTimeMillis());
- snapshotMetadata.setVersion(1);
+ private VersionedFlowSnapshot createFlowSnapshot(final ProcessGroup group,
final List<ControllerServiceNode> controllerServices,
+ final List<ProcessorNode>
processors, final Set<Parameter> parameters) {
+ final VersionedFlowSnapshotMetadata snapshotMetadata =
createSnapshotMetadata();
- final Bucket bucket = new Bucket();
- bucket.setCreatedTimestamp(System.currentTimeMillis());
- bucket.setIdentifier("unit-test-bucket");
- bucket.setName("Unit Test Bucket");
+ final Bucket bucket = createBucket();
- final VersionedFlow flow = new VersionedFlow();
- flow.setBucketIdentifier("unit-test-bucket");
- flow.setBucketName("Unit Test Bucket");
- flow.setCreatedTimestamp(System.currentTimeMillis());
- flow.setIdentifier("unit-test-flow");
- flow.setName("Unit Test Flow");
+ final VersionedFlow flow = createVersionedFlow();
- final BundleCoordinate coordinate =
getSystemBundle().getBundleDetails().getCoordinate();
- final Bundle bundle = new Bundle();
- bundle.setArtifact(coordinate.getId());
- bundle.setGroup(coordinate.getGroup());
- bundle.setVersion(coordinate.getVersion());
+ createBundle();
final NiFiRegistryFlowMapper flowMapper = new
NiFiRegistryFlowMapper(getExtensionManager());
+ final List<ProcessorNode> processorNodes;
+ final List<ControllerServiceNode> controllerServiceNodes;
+ final List<Port> inputPorts;
+ final List<Connection> connections;
+ final List<ProcessGroup> processGroups;
+ final Set<VersionedProcessGroup> versionedProcessGroups;
+
+ if (group == null) {
+ processorNodes = processors;
+ controllerServiceNodes = controllerServices;
+ inputPorts = Collections.EMPTY_LIST;
+ connections = Collections.EMPTY_LIST;
+ versionedProcessGroups = Collections.EMPTY_SET;
+ } else {
+ processorNodes = new ArrayList<>(group.getProcessors());
+ controllerServiceNodes = new
ArrayList<>(group.getControllerServices(false));
+ inputPorts = new ArrayList<>(group.getInputPorts());
+ connections = new ArrayList<>(group.getConnections());
+ processGroups = new ArrayList<>(group.getProcessGroups());
+
+ final VersionedProcessGroup versionedGroup =
flowMapper.mapProcessGroup(group,
getFlowController().getControllerServiceProvider(),getFlowController().getFlowRegistryClient(),true);
+ processGroups.forEach(processGroup->
+
versionedGroup.getProcessGroups().stream().filter(versionedProcessGroup ->
versionedProcessGroup.getName().equals(processGroup.getName()))
+ .forEach(filteredProcessGroup ->
processGroup.setVersionedComponentId(filteredProcessGroup.getIdentifier())));
+ versionedProcessGroups = new
HashSet<>(versionedGroup.getProcessGroups());
+ }
+
final Set<VersionedProcessor> versionedProcessors = new HashSet<>();
- for (final ProcessorNode processor : processors) {
+ for (final ProcessorNode processor : processorNodes) {
final VersionedProcessor versionedProcessor =
flowMapper.mapProcessor(processor,
getFlowController().getControllerServiceProvider(), Collections.emptySet(), new
HashMap<>());
versionedProcessors.add(versionedProcessor);
processor.setVersionedComponentId(versionedProcessor.getIdentifier());
}
- final Set<VersionedControllerService> services = new HashSet<>();
- for (final ControllerServiceNode serviceNode : controllerServices) {
- final VersionedControllerService service =
flowMapper.mapControllerService(serviceNode,
getFlowController().getControllerServiceProvider(), Collections.emptySet(), new
HashMap<>());
- services.add(service);
- serviceNode.setVersionedComponentId(service.getIdentifier());
+ final Set<VersionedControllerService> versionedServices = new
HashSet<>();
+ for (final ControllerServiceNode serviceNode : controllerServiceNodes)
{
+ final VersionedControllerService versionedService =
flowMapper.mapControllerService(serviceNode,
getFlowController().getControllerServiceProvider(),
+ Collections.emptySet(), new HashMap<>());
+ versionedServices.add(versionedService);
+
serviceNode.setVersionedComponentId(versionedService.getIdentifier());
}
- final VersionedProcessGroup flowContents = new VersionedProcessGroup();
- flowContents.setIdentifier("unit-test-flow-contents");
- flowContents.setName("Unit Test");
+ final Set<VersionedPort> versionedInputPorts = new HashSet<>();
+ for (final Port inputPort : inputPorts) {
+ final VersionedPort versionedInputPort =
flowMapper.mapPort(inputPort);
+ versionedInputPorts.add(versionedInputPort);
+
inputPort.setVersionedComponentId(versionedInputPort.getIdentifier());
+ }
+
+ final Set<VersionedConnection> versionedConnections = new HashSet<>();
+ for (final Connection connection : connections) {
+ final VersionedConnection versionedConnection =
flowMapper.mapConnection(connection);
+ versionedConnections.add(versionedConnection);
+
connection.setVersionedComponentId(versionedConnection.getIdentifier());
+ }
+
+ final VersionedProcessGroup flowContents = createFlowContents();
flowContents.setProcessors(versionedProcessors);
- flowContents.setControllerServices(services);
+ flowContents.setControllerServices(versionedServices);
+ flowContents.setProcessGroups(versionedProcessGroups);
+ flowContents.setInputPorts(versionedInputPorts);
+ flowContents.setConnections(versionedConnections);
- final VersionedFlowSnapshot versionedFlowSnapshot = new
VersionedFlowSnapshot();
- versionedFlowSnapshot.setSnapshotMetadata(snapshotMetadata);
- versionedFlowSnapshot.setBucket(bucket);
- versionedFlowSnapshot.setFlow(flow);
- versionedFlowSnapshot.setFlowContents(flowContents);
+ final VersionedFlowSnapshot versionedFlowSnapshot =
createVersionedFlowSnapshot(snapshotMetadata, bucket, flow, flowContents);
if (parameters != null) {
final Set<VersionedParameter> versionedParameters = new
HashSet<>();
@@ -483,4 +578,69 @@ public class ImportFlowIT extends FrameworkIntegrationTest
{
return versionedFlowSnapshot;
}
+
+ private VersionedFlowSnapshot createFlowSnapshot(final
List<ControllerServiceNode> controllerServices, final List<ProcessorNode>
processors, final Set<Parameter> parameters) {
+ return createFlowSnapshot(null, controllerServices, processors,
parameters);
+ }
+
+ private VersionedFlowSnapshot createFlowSnapshot(final ProcessGroup group)
{
+ return createFlowSnapshot(group, Collections.EMPTY_LIST,
Collections.EMPTY_LIST, null);
+ }
+
+ @NotNull
+ private VersionedFlowSnapshot
createVersionedFlowSnapshot(VersionedFlowSnapshotMetadata snapshotMetadata,
Bucket bucket, VersionedFlow flow, VersionedProcessGroup flowContents) {
+ final VersionedFlowSnapshot versionedFlowSnapshot = new
VersionedFlowSnapshot();
+ versionedFlowSnapshot.setSnapshotMetadata(snapshotMetadata);
+ versionedFlowSnapshot.setBucket(bucket);
+ versionedFlowSnapshot.setFlow(flow);
+ versionedFlowSnapshot.setFlowContents(flowContents);
+ return versionedFlowSnapshot;
+ }
+
+ @NotNull
+ private VersionedProcessGroup createFlowContents() {
+ final VersionedProcessGroup flowContents = new VersionedProcessGroup();
+ flowContents.setIdentifier("unit-test-flow-contents");
+ flowContents.setName("Unit Test");
+ return flowContents;
+ }
+
+ private void createBundle() {
+ final BundleCoordinate coordinate =
getSystemBundle().getBundleDetails().getCoordinate();
+ final Bundle bundle = new Bundle();
+ bundle.setArtifact(coordinate.getId());
+ bundle.setGroup(coordinate.getGroup());
+ bundle.setVersion(coordinate.getVersion());
+ }
+
+ @NotNull
+ private VersionedFlow createVersionedFlow() {
+ final VersionedFlow flow = new VersionedFlow();
+ flow.setBucketIdentifier("unit-test-bucket");
+ flow.setBucketName("Unit Test Bucket");
+ flow.setCreatedTimestamp(System.currentTimeMillis());
+ flow.setIdentifier("unit-test-flow");
+ flow.setName("Unit Test Flow");
+ return flow;
+ }
+
+ @NotNull
+ private Bucket createBucket() {
+ final Bucket bucket = new Bucket();
+ bucket.setCreatedTimestamp(System.currentTimeMillis());
+ bucket.setIdentifier("unit-test-bucket");
+ bucket.setName("Unit Test Bucket");
+ return bucket;
+ }
+
+ @NotNull
+ private VersionedFlowSnapshotMetadata createSnapshotMetadata() {
+ final VersionedFlowSnapshotMetadata snapshotMetadata = new
VersionedFlowSnapshotMetadata();
+ snapshotMetadata.setAuthor("unit-test");
+ snapshotMetadata.setBucketIdentifier("unit-test-bucket");
+ snapshotMetadata.setFlowIdentifier("unit-test-flow");
+ snapshotMetadata.setTimestamp(System.currentTimeMillis());
+ snapshotMetadata.setVersion(1);
+ return snapshotMetadata;
+ }
}