This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 98480e57ca5 NIFI-15661 Fixed Flow Comparison showing changes for
nested child components (#10955)
98480e57ca5 is described below
commit 98480e57ca55f21ba2e0b9f49e059588ae44c3f8
Author: Pierre Villard <[email protected]>
AuthorDate: Tue Mar 10 21:23:27 2026 +0100
NIFI-15661 Fixed Flow Comparison showing changes for nested child
components (#10955)
- Flow Comparison should not report child components for nested versioned
process groups with version-only changes
Signed-off-by: David Handermann <[email protected]>
---
.../registry/flow/diff/StandardFlowComparator.java | 18 +-
.../flow/diff/TestStandardFlowComparator.java | 284 ++++++++++++++++-----
2 files changed, 219 insertions(+), 83 deletions(-)
diff --git
a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java
b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java
index a298e66a14c..875d595b456 100644
---
a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java
+++
b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java
@@ -576,7 +576,7 @@ public class StandardFlowComparator implements
FlowComparator {
final boolean shouldCompareVersioned =
flowCoordinateDifferences.stream()
.anyMatch(diff -> !diff.getFieldName().isPresent() ||
!diff.getFieldName().get().equals(FLOW_VERSION)) ||
flowComparatorVersionedStrategy == FlowComparatorVersionedStrategy.DEEP;
- final boolean compareGroupContents = !bothGroupsVersioned ||
shouldCompareVersioned || hasProcessGroupContents(groupA) ||
hasProcessGroupContents(groupB);
+ final boolean compareGroupContents = !bothGroupsVersioned ||
shouldCompareVersioned;
if (compareGroupContents) {
extractPGComponentsDifferences(groupA, groupB, differences);
@@ -630,22 +630,6 @@ public class StandardFlowComparator implements
FlowComparator {
this::compare));
}
- private boolean hasProcessGroupContents(final VersionedProcessGroup group)
{
- if (group == null) {
- return false;
- }
-
- return !group.getConnections().isEmpty()
- || !group.getProcessors().isEmpty()
- || !group.getControllerServices().isEmpty()
- || !group.getFunnels().isEmpty()
- || !group.getInputPorts().isEmpty()
- || !group.getLabels().isEmpty()
- || !group.getOutputPorts().isEmpty()
- || !group.getProcessGroups().isEmpty()
- || !group.getRemoteProcessGroups().isEmpty();
- }
-
private void compareFlowCoordinates(final VersionedProcessGroup groupA,
final VersionedProcessGroup groupB, final Set<FlowDifference> differences) {
final VersionedFlowCoordinates coordinatesA =
groupA.getVersionedFlowCoordinates();
final VersionedFlowCoordinates coordinatesB =
groupB.getVersionedFlowCoordinates();
diff --git
a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/test/java/org/apache/nifi/registry/flow/diff/TestStandardFlowComparator.java
b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/test/java/org/apache/nifi/registry/flow/diff/TestStandardFlowComparator.java
index 1db6c87ef3d..b068b43e780 100644
---
a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/test/java/org/apache/nifi/registry/flow/diff/TestStandardFlowComparator.java
+++
b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/test/java/org/apache/nifi/registry/flow/diff/TestStandardFlowComparator.java
@@ -422,72 +422,6 @@ public class TestStandardFlowComparator {
"Differences found: " + differences);
}
- /**
- * Test that when a separately versioned nested PG has DIFFERENT version
coordinates,
- * scheduled state changes inside it ARE detected.
- */
- @Test
- public void
testChangesDetectedForSeparatelyVersionedNestedGroupWhenVersionsDiffer() {
- final String rootPgIdentifier = "rootPG";
- final String nestedPgIdentifier = "nestedPG";
- final String procIdentifier = "processorZ";
- final VersionedProcessGroup registryRoot = new VersionedProcessGroup();
- registryRoot.setIdentifier(rootPgIdentifier);
-
- final VersionedProcessGroup localRoot = new VersionedProcessGroup();
- localRoot.setIdentifier(rootPgIdentifier);
-
- final VersionedProcessGroup registryNested = new
VersionedProcessGroup();
- registryNested.setIdentifier(nestedPgIdentifier);
- final VersionedFlowCoordinates registryCoords =
createVersionedFlowCoordinates();
- registryCoords.setVersion("1");
- registryNested.setVersionedFlowCoordinates(registryCoords);
- registryRoot.getProcessGroups().add(registryNested);
-
- final VersionedProcessGroup localNested = new VersionedProcessGroup();
- localNested.setIdentifier(nestedPgIdentifier);
- final VersionedFlowCoordinates localCoords =
createVersionedFlowCoordinates();
- localCoords.setVersion("2"); // Different version
- localNested.setVersionedFlowCoordinates(localCoords);
- localRoot.getProcessGroups().add(localNested);
-
- final VersionedProcessor registryProcessor = new VersionedProcessor();
- registryProcessor.setIdentifier(procIdentifier);
- registryProcessor.setScheduledState(ScheduledState.ENABLED);
- registryProcessor.setProperties(Collections.emptyMap());
- registryProcessor.setPropertyDescriptors(Collections.emptyMap());
- registryNested.getProcessors().add(registryProcessor);
-
- final VersionedProcessor localProcessor = new VersionedProcessor();
- localProcessor.setIdentifier(procIdentifier);
- localProcessor.setScheduledState(ScheduledState.DISABLED);
- localProcessor.setProperties(Collections.emptyMap());
- localProcessor.setPropertyDescriptors(Collections.emptyMap());
- localNested.getProcessors().add(localProcessor);
-
- final ComparableDataFlow registryFlow = new
StandardComparableDataFlow("registry", registryRoot);
- final ComparableDataFlow localFlow = new
StandardComparableDataFlow("local", localRoot);
-
- final StandardFlowComparator testComparator = new
StandardFlowComparator(
- registryFlow,
- localFlow,
- Collections.emptySet(),
- new StaticDifferenceDescriptor(),
- Function.identity(),
- VersionedComponent::getIdentifier,
- FlowComparatorVersionedStrategy.SHALLOW);
-
- final Set<FlowDifference> differences =
testComparator.compare().getDifferences();
-
- final boolean scheduledStateDiffFound = differences.stream()
- .anyMatch(diff -> diff.getDifferenceType() ==
DifferenceType.SCHEDULED_STATE_CHANGED
- && diff.getComponentB() != null
- &&
procIdentifier.equals(diff.getComponentB().getIdentifier()));
-
- assertTrue(scheduledStateDiffFound,
- "When nested versioned PG has different version coordinates,
scheduled state change should be detected");
- }
-
/**
* Test for NIFI-15366: Simulates the actual bug scenario where:
* - Child PG B was committed separately to registry
@@ -653,6 +587,224 @@ public class TestStandardFlowComparator {
return asset;
}
+ /**
+ * Ensures that when a non-versioned nested PG (PG2) with a processor is
added inside a
+ * versioned parent PG (PG1), "Show Local Changes" on PG1 reports both the
addition of
+ * PG2 and the processor inside PG2.
+ */
+ @Test
+ public void testAddNonVersionedNestedPGWithProcessorShowsBothAdditions() {
+ final String rootPgId = "rootPG";
+ final String nestedPgId = "nestedPG";
+ final String nestedProcessorId = "nestedProcessor";
+
+ // Registry snapshot: PG1 has no nested PGs
+ final VersionedProcessGroup registryRoot = new VersionedProcessGroup();
+ registryRoot.setIdentifier(rootPgId);
+
+ // Local: PG1 now contains PG2 (NOT versioned) with a processor
+ final VersionedProcessGroup localRoot = new VersionedProcessGroup();
+ localRoot.setIdentifier(rootPgId);
+
+ final VersionedProcessGroup localNested = new VersionedProcessGroup();
+ localNested.setIdentifier(nestedPgId);
+ localRoot.getProcessGroups().add(localNested);
+
+ final VersionedProcessor nestedProcessor = new VersionedProcessor();
+ nestedProcessor.setIdentifier(nestedProcessorId);
+ nestedProcessor.setScheduledState(ScheduledState.ENABLED);
+ nestedProcessor.setProperties(Collections.emptyMap());
+ nestedProcessor.setPropertyDescriptors(Collections.emptyMap());
+ localNested.getProcessors().add(nestedProcessor);
+
+ final ComparableDataFlow registryFlow = new
StandardComparableDataFlow("Versioned Flow", registryRoot);
+ final ComparableDataFlow localFlow = new
StandardComparableDataFlow("Local Flow", localRoot);
+
+ // DEEP strategy: both PG2 addition AND processor addition should be
reported
+ final StandardFlowComparator deepComparator = new
StandardFlowComparator(
+ registryFlow, localFlow, Collections.emptySet(), new
StaticDifferenceDescriptor(),
+ Function.identity(), VersionedComponent::getIdentifier,
FlowComparatorVersionedStrategy.DEEP);
+
+ final Set<FlowDifference> deepDifferences =
deepComparator.compare().getDifferences();
+
+ assertTrue(deepDifferences.stream().anyMatch(diff ->
diff.getDifferenceType() == DifferenceType.COMPONENT_ADDED
+ && diff.getComponentB().getComponentType() ==
ComponentType.PROCESS_GROUP
+ &&
nestedPgId.equals(diff.getComponentB().getIdentifier())),
+ "DEEP: Should report PG2 as added. Differences: " +
deepDifferences);
+ assertTrue(deepDifferences.stream().anyMatch(diff ->
diff.getDifferenceType() == DifferenceType.COMPONENT_ADDED
+ && diff.getComponentB().getComponentType() ==
ComponentType.PROCESSOR
+ &&
nestedProcessorId.equals(diff.getComponentB().getIdentifier())),
+ "DEEP: Should report the processor inside PG2 as added.
Differences: " + deepDifferences);
+
+ // SHALLOW strategy: only PG2 addition is reported (processor inside
is not expanded)
+ final StandardFlowComparator shallowComparator = new
StandardFlowComparator(
+ registryFlow, localFlow, Collections.emptySet(), new
StaticDifferenceDescriptor(),
+ Function.identity(), VersionedComponent::getIdentifier,
FlowComparatorVersionedStrategy.SHALLOW);
+
+ final Set<FlowDifference> shallowDifferences =
shallowComparator.compare().getDifferences();
+
+ assertTrue(shallowDifferences.stream().anyMatch(diff ->
diff.getDifferenceType() == DifferenceType.COMPONENT_ADDED
+ && diff.getComponentB().getComponentType() ==
ComponentType.PROCESS_GROUP
+ &&
nestedPgId.equals(diff.getComponentB().getIdentifier())),
+ "SHALLOW: Should report PG2 as added. Differences: " +
shallowDifferences);
+ assertEquals(1, shallowDifferences.size(),
+ "SHALLOW: Should only report the PG addition itself (processor
inside not expanded). Differences: " + shallowDifferences);
+ }
+
+ /**
+ * Ensures that when a processor is added to an already-existing
non-versioned nested PG,
+ * "Show Local Changes" on the parent reports the processor addition.
+ */
+ @Test
+ public void
testAddProcessorToExistingNonVersionedNestedPGShowsProcessorAddition() {
+ final String rootPgId = "rootPG";
+ final String nestedPgId = "nestedPG";
+ final String addedProcessorId = "addedProcessor";
+
+ // Registry snapshot: PG1 contains PG2 (no flow coordinates = not
versioned), PG2 is empty
+ final VersionedProcessGroup registryRoot = new VersionedProcessGroup();
+ registryRoot.setIdentifier(rootPgId);
+
+ final VersionedProcessGroup registryNested = new
VersionedProcessGroup();
+ registryNested.setIdentifier(nestedPgId);
+ registryRoot.getProcessGroups().add(registryNested);
+
+ // Local: PG2 now has a processor added to it
+ final VersionedProcessGroup localRoot = new VersionedProcessGroup();
+ localRoot.setIdentifier(rootPgId);
+
+ final VersionedProcessGroup localNested = new VersionedProcessGroup();
+ localNested.setIdentifier(nestedPgId);
+ localRoot.getProcessGroups().add(localNested);
+
+ final VersionedProcessor addedProcessor = new VersionedProcessor();
+ addedProcessor.setIdentifier(addedProcessorId);
+ addedProcessor.setScheduledState(ScheduledState.ENABLED);
+ addedProcessor.setProperties(Collections.emptyMap());
+ addedProcessor.setPropertyDescriptors(Collections.emptyMap());
+ localNested.getProcessors().add(addedProcessor);
+
+ final ComparableDataFlow registryFlow = new
StandardComparableDataFlow("Versioned Flow", registryRoot);
+ final ComparableDataFlow localFlow = new
StandardComparableDataFlow("Local Flow", localRoot);
+
+ final StandardFlowComparator testComparator = new
StandardFlowComparator(
+ registryFlow, localFlow, Collections.emptySet(), new
StaticDifferenceDescriptor(),
+ Function.identity(), VersionedComponent::getIdentifier,
FlowComparatorVersionedStrategy.SHALLOW);
+
+ final Set<FlowDifference> differences =
testComparator.compare().getDifferences();
+
+ assertEquals(1, differences.size(), "Should report exactly the added
processor. Differences: " + differences);
+ final FlowDifference diff = differences.iterator().next();
+ assertEquals(DifferenceType.COMPONENT_ADDED, diff.getDifferenceType());
+ assertEquals(addedProcessorId, diff.getComponentB().getIdentifier());
+ assertEquals(ComponentType.PROCESSOR,
diff.getComponentB().getComponentType());
+ }
+
+ /**
+ * When a nested separately version-controlled PG has its version changed
+ * (committed separately to the registry), the parent's "Show Local
Changes" should only
+ * report the version coordinate change for the child PG. It should NOT
report individual
+ * component changes (processors, connections, etc.) from inside the child
PG.
+ *
+ * In the real-world scenario, the parent's registry snapshot contains the
child PG's contents
+ * with the child's REGISTRY identifiers, while locally the child PG's
contents use LOCAL NiFi
+ * instance identifiers. This causes identifier mismatches that the
comparator incorrectly
+ * reports as components being added/removed.
+ *
+ * Scenario:
+ * - Parent PG contains a processor and a nested child PG with 3 processors
+ * - Both are committed to the registry; child is at version 1
+ * - User changes something in the child PG and commits child to version 2
+ * - Parent's "Show Local Changes" should show: child PG version changed
(1 -> 2)
+ * - Parent's "Show Local Changes" should NOT show: individual processors
from child PG
+ */
+ @Test
+ public void
testNestedVersionedPGVersionChangeOnlyReportsVersionCoordinateDifference() {
+ final String parentPgId = "parentPG";
+ final String childPgId = "childPG";
+ final String parentProcessorId = "parentProcessor";
+
+ // --- Registry snapshot of parent (child at version 1) ---
+ final VersionedProcessGroup registryParent = new
VersionedProcessGroup();
+ registryParent.setIdentifier(parentPgId);
+
+ final VersionedProcessor registryParentProc = new VersionedProcessor();
+ registryParentProc.setIdentifier(parentProcessorId);
+ registryParentProc.setScheduledState(ScheduledState.ENABLED);
+ registryParentProc.setProperties(Collections.emptyMap());
+ registryParentProc.setPropertyDescriptors(Collections.emptyMap());
+ registryParent.getProcessors().add(registryParentProc);
+
+ final VersionedProcessGroup registryChild = new
VersionedProcessGroup();
+ registryChild.setIdentifier(childPgId);
+ final VersionedFlowCoordinates registryChildCoords =
createVersionedFlowCoordinates();
+ registryChildCoords.setVersion("1");
+ registryChild.setVersionedFlowCoordinates(registryChildCoords);
+ registryParent.getProcessGroups().add(registryChild);
+
+ // Child processors in registry snapshot have REGISTRY identifiers
+ for (int i = 1; i <= 3; i++) {
+ final VersionedProcessor proc = new VersionedProcessor();
+ proc.setIdentifier("registry-child-proc-" + i);
+ proc.setScheduledState(ScheduledState.ENABLED);
+ proc.setProperties(Collections.emptyMap());
+ proc.setPropertyDescriptors(Collections.emptyMap());
+ registryChild.getProcessors().add(proc);
+ }
+
+ // --- Local state of parent (child at version 2, committed
separately) ---
+ final VersionedProcessGroup localParent = new VersionedProcessGroup();
+ localParent.setIdentifier(parentPgId);
+
+ final VersionedProcessor localParentProc = new VersionedProcessor();
+ localParentProc.setIdentifier(parentProcessorId);
+ localParentProc.setScheduledState(ScheduledState.ENABLED);
+ localParentProc.setProperties(Collections.emptyMap());
+ localParentProc.setPropertyDescriptors(Collections.emptyMap());
+ localParent.getProcessors().add(localParentProc);
+
+ final VersionedProcessGroup localChild = new VersionedProcessGroup();
+ localChild.setIdentifier(childPgId);
+ final VersionedFlowCoordinates localChildCoords =
createVersionedFlowCoordinates();
+ localChildCoords.setVersion("2");
+ localChild.setVersionedFlowCoordinates(localChildCoords);
+ localParent.getProcessGroups().add(localChild);
+
+ // Child processors locally have LOCAL NiFi instance identifiers
(different from registry)
+ for (int i = 1; i <= 3; i++) {
+ final VersionedProcessor proc = new VersionedProcessor();
+ proc.setIdentifier("local-child-proc-" + i);
+ proc.setScheduledState(ScheduledState.ENABLED);
+ proc.setProperties(Collections.emptyMap());
+ proc.setPropertyDescriptors(Collections.emptyMap());
+ localChild.getProcessors().add(proc);
+ }
+
+ final ComparableDataFlow registryFlow = new
StandardComparableDataFlow("Versioned Flow", registryParent);
+ final ComparableDataFlow localFlow = new
StandardComparableDataFlow("Local Flow", localParent);
+
+ final StandardFlowComparator testComparator = new
StandardFlowComparator(
+ registryFlow, localFlow, Collections.emptySet(), new
StaticDifferenceDescriptor(),
+ Function.identity(), VersionedComponent::getIdentifier,
FlowComparatorVersionedStrategy.SHALLOW);
+
+ final Set<FlowDifference> differences =
testComparator.compare().getDifferences();
+
+ final long versionCoordinateChanges = differences.stream()
+ .filter(diff -> diff.getDifferenceType() ==
DifferenceType.VERSIONED_FLOW_COORDINATES_CHANGED)
+ .count();
+ assertEquals(1, versionCoordinateChanges, "Should report exactly one
version coordinate change for the child PG");
+
+ final boolean hasChildProcessorDifferences = differences.stream()
+ .anyMatch(diff -> (diff.getComponentA() instanceof
VersionedProcessor &&
!parentProcessorId.equals(diff.getComponentA().getIdentifier()))
+ || (diff.getComponentB() instanceof VersionedProcessor
&& !parentProcessorId.equals(diff.getComponentB().getIdentifier())));
+ assertFalse(hasChildProcessorDifferences,
+ "Parent's local changes should NOT include individual
processor differences from inside the nested versioned child PG. " +
+ "Differences found: " + differences);
+
+ assertEquals(1, differences.size(),
+ "Should only have 1 difference (child PG version coordinate
change). Differences found: " + differences);
+ }
+
private VersionedFlowCoordinates createVersionedFlowCoordinates() {
final VersionedFlowCoordinates coordinates = new
VersionedFlowCoordinates();
coordinates.setRegistryId("registry");