This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 905f7096a2 NIFI-14700 Added Option to Ignore Changes to Parameter
Values in Git Registry Clients (#10056)
905f7096a2 is described below
commit 905f7096a2f665ce2f0e82c602d9c042fbb52052
Author: Pierre Villard <[email protected]>
AuthorDate: Mon Jul 7 22:03:30 2025 +0200
NIFI-14700 Added Option to Ignore Changes to Parameter Values in Git
Registry Clients (#10056)
Signed-off-by: David Handermann <[email protected]>
---
.../flow/git/AbstractGitFlowRegistryClient.java | 23 ++-
.../nifi/github/GitHubFlowRegistryClientTest.java | 176 +++++++++++++++++++++
2 files changed, 198 insertions(+), 1 deletion(-)
diff --git
a/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClient.java
b/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClient.java
index bfec50dcfd..5e539bc147 100644
---
a/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClient.java
+++
b/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClient.java
@@ -26,6 +26,8 @@ import org.apache.nifi.flow.Position;
import org.apache.nifi.flow.VersionedComponent;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedFlowCoordinates;
+import org.apache.nifi.flow.VersionedParameter;
+import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.registry.flow.AbstractFlowRegistryClient;
@@ -58,9 +60,11 @@ import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -362,6 +366,22 @@ public abstract class AbstractGitFlowRegistryClient
extends AbstractFlowRegistry
flowSnapshot.getParameterContexts().forEach((name,
parameterContext) ->
parameterContext.getParameters().forEach(parameter ->
parameter.setValue(null))
);
+ } else if
(ParameterContextValuesStrategy.IGNORE_CHANGES.equals(parameterContextValuesStrategy))
{
+ existingSnapshot.getParameterContexts().forEach((name,
parameterContext) -> {
+ final VersionedParameterContext targetContext =
flowSnapshot.getParameterContexts().get(name);
+ if (targetContext != null) {
+ final Map<String, VersionedParameter> targetParamMap =
targetContext.getParameters()
+ .stream()
+
.collect(Collectors.toMap(VersionedParameter::getName, Function.identity()));
+
+ parameterContext.getParameters().forEach(parameter -> {
+ final VersionedParameter targetParam =
targetParamMap.get(parameter.getName());
+ if (targetParam != null) {
+ targetParam.setValue(parameter.getValue());
+ }
+ });
+ }
+ });
}
// replace the id of the top level group and all of its references
with a constant value prior to serializing to avoid
@@ -676,7 +696,8 @@ public abstract class AbstractGitFlowRegistryClient extends
AbstractFlowRegistry
enum ParameterContextValuesStrategy implements DescribedValue {
RETAIN("Retain", "Retain Values in Parameter Contexts without
modifications"),
- REMOVE("Remove", "Remove Values from Parameter Context");
+ REMOVE("Remove", "Remove Values from Parameter Context"),
+ IGNORE_CHANGES("Ignore Changes", "Ignore any change on existing
parameters");
private final String displayName;
private final String description;
diff --git
a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/test/java/org/apache/nifi/github/GitHubFlowRegistryClientTest.java
b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/test/java/org/apache/nifi/github/GitHubFlowRegistryClientTest.java
index e7f9e98b4b..0d50d59acd 100644
---
a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/test/java/org/apache/nifi/github/GitHubFlowRegistryClientTest.java
+++
b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/test/java/org/apache/nifi/github/GitHubFlowRegistryClientTest.java
@@ -18,6 +18,8 @@
package org.apache.nifi.github;
import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.flow.VersionedParameter;
+import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.registry.flow.FlowRegistryBucket;
@@ -37,7 +39,11 @@ import org.mockito.ArgumentCaptor;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -164,6 +170,155 @@ public class GitHubFlowRegistryClientTest {
assertEquals(incomingMetadata.getBucketIdentifier(),
resultBucket.getName());
}
+ @Test // IGNORE_CHANGES: stored values win for parameters present in both snapshots; parameters new to a context keep incoming values
+ public void testRegisterFlowSnapshotWithIgnoreUpdatesParameterStrategy()
throws IOException, FlowRegistryException {
+ setupClientConfigurationContextWithDefaults();
+ // Make the mocked PARAMETER_CONTEXT_VALUES property resolve to the IGNORE_CHANGES enum constant.
+ final PropertyValue parametersPropertyValue =
createMockPropertyValueWithEnum("IGNORE_CHANGES");
+
when(clientConfigurationContext.getProperty(GitHubFlowRegistryClient.PARAMETER_CONTEXT_VALUES)).thenReturn(parametersPropertyValue);
+
+ final RegisteredFlow incomingFlow = createIncomingRegisteredFlow();
+
+ // Create existing snapshot with PC1 (Parameter1=A) and PC2
(Parameter2=B)
+ final RegisteredFlowSnapshot existingSnapshot =
createExistingSnapshotWithParameters();
+
+ // Create incoming snapshot with PC1 (Parameter1=X, Parameter2=Y,
Parameter3=Z)
+ final RegisteredFlowSnapshot incomingSnapshot =
createIncomingSnapshotWithNewParameters(incomingFlow);
+
+ final String snapshotFilePath =
"%s/%s.json".formatted(incomingFlow.getBucketIdentifier(),
incomingFlow.getIdentifier());
+ // The stream bytes below are a placeholder: deserialize() is stubbed for any InputStream.
+ // Mock the repository client to return the existing snapshot
+ when(repositoryClient.getContentFromBranch(snapshotFilePath,
DEFAULT_REPO_BRANCH))
+ .thenReturn(new ByteArrayInputStream("existing
content".getBytes()));
+ when(flowSnapshotSerializer.deserialize(any(InputStream.class)))
+ .thenReturn(existingSnapshot);
+ // Stub serialization and content creation with fixed return values.
+ final String serializedSnapshotContent = "serialized content";
+
when(flowSnapshotSerializer.serialize(any(RegisteredFlowSnapshot.class)))
+ .thenReturn(serializedSnapshotContent);
+
+ final String commitSha = "commitSha";
+
when(repositoryClient.createContent(any(GitCreateContentRequest.class)))
+ .thenReturn(commitSha);
+
+ // Execute the method under test
+ final RegisteredFlowSnapshot resultSnapshot =
flowRegistryClient.registerFlowSnapshot(
+ clientConfigurationContext, incomingSnapshot,
RegisterAction.COMMIT);
+
+ // Verify the result
+ assertNotNull(resultSnapshot);
+
+ // Verify Parameter Context PC1
+ VersionedParameterContext pc1 =
resultSnapshot.getParameterContexts().get("PC1");
+ assertNotNull(pc1);
+
+ Map<String, VersionedParameter> pc1Params = pc1.getParameters()
+ .stream()
+ .collect(Collectors.toMap(VersionedParameter::getName,
Function.identity()));
+
+ // Parameter1 should keep existing value A (not updated to X)
+ assertEquals("A", pc1Params.get("Parameter1").getValue());
+ // Parameter2 should have new value Y (new to PC1; the existing Parameter2=B belongs to PC2)
+ assertEquals("Y", pc1Params.get("Parameter2").getValue());
+ // Parameter3 should have new value Z (new parameter)
+ assertEquals("Z", pc1Params.get("Parameter3").getValue());
+
+ // Verify Parameter Context PC2 (present only in the existing snapshot) is absent from the result
+ VersionedParameterContext pc2 =
resultSnapshot.getParameterContexts().get("PC2");
+ assertNull(pc2);
+ }
+ /**
+  * Builds the snapshot the test uses as the content already stored in the repository:
+  * context PC1 holding Parameter1=A and context PC2 holding Parameter2=B, with flow and
+  * metadata on DEFAULT_REPO_BRANCH and an empty VersionedProcessGroup as flow contents.
+  *
+  * @return the pre-existing snapshot fixture
+  */
+ private RegisteredFlowSnapshot createExistingSnapshotWithParameters() {
+ final RegisteredFlowSnapshot existingSnapshot = new
RegisteredFlowSnapshot();
+
+ // Create existing flow
+ final RegisteredFlow existingFlow = new RegisteredFlow();
+ existingFlow.setIdentifier("flow-id");
+ existingFlow.setName("Test Flow");
+ existingFlow.setBucketIdentifier("bucket-id");
+ existingFlow.setBucketName("Test Bucket");
+ existingFlow.setBranch(DEFAULT_REPO_BRANCH);
+ existingSnapshot.setFlow(existingFlow);
+
+ // Create existing metadata
+ final RegisteredFlowSnapshotMetadata existingMetadata = new
RegisteredFlowSnapshotMetadata();
+ existingMetadata.setBranch(DEFAULT_REPO_BRANCH);
+ existingMetadata.setBucketIdentifier("bucket-id");
+ existingMetadata.setFlowIdentifier("flow-id");
+ existingSnapshot.setSnapshotMetadata(existingMetadata);
+
+ // Create Parameter Context PC1 with Parameter1=A
+ final VersionedParameterContext pc1 = new VersionedParameterContext();
+ pc1.setName("PC1");
+
+ final VersionedParameter param1 = new VersionedParameter();
+ param1.setName("Parameter1");
+ param1.setValue("A");
+ pc1.setParameters(Set.of(param1));
+
+ // Create Parameter Context PC2 with Parameter2=B
+ final VersionedParameterContext pc2 = new VersionedParameterContext();
+ pc2.setName("PC2");
+
+ final VersionedParameter param2 = new VersionedParameter();
+ param2.setName("Parameter2");
+ param2.setValue("B");
+ pc2.setParameters(Set.of(param2));
+
+ // Set parameter contexts on existing snapshot, keyed by context name
+ final Map<String, VersionedParameterContext> existingParameterContexts
= new HashMap<>();
+ existingParameterContexts.put("PC1", pc1);
+ existingParameterContexts.put("PC2", pc2);
+ existingSnapshot.setParameterContexts(existingParameterContexts);
+
+ // Set flow contents
+ existingSnapshot.setFlowContents(new VersionedProcessGroup());
+
+ return existingSnapshot;
+ }
+ /**
+  * Builds the snapshot submitted for registration: a single context PC1 holding
+  * Parameter1=X, Parameter2=Y and Parameter3=Z, an empty VersionedProcessGroup as flow
+  * contents, and metadata stamped with the current time.
+  *
+  * @param incomingFlow flow whose branch, bucket identifier and identifier populate the metadata
+  * @return the incoming snapshot fixture
+  */
+ private RegisteredFlowSnapshot
createIncomingSnapshotWithNewParameters(RegisteredFlow incomingFlow) {
+ final RegisteredFlowSnapshot incomingSnapshot = new
RegisteredFlowSnapshot();
+ incomingSnapshot.setFlow(incomingFlow);
+
+ final long timestamp = System.currentTimeMillis();
+ final RegisteredFlowSnapshotMetadata incomingMetadata = new
RegisteredFlowSnapshotMetadata();
+ incomingMetadata.setBranch(incomingFlow.getBranch());
+
incomingMetadata.setBucketIdentifier(incomingFlow.getBucketIdentifier());
+ incomingMetadata.setFlowIdentifier(incomingFlow.getIdentifier());
+ incomingMetadata.setComments("Unit test with parameter updates");
+ incomingMetadata.setTimestamp(timestamp);
+ incomingSnapshot.setSnapshotMetadata(incomingMetadata);
+
+ // Create Parameter Context PC1 with Parameter1=X, Parameter2=Y,
Parameter3=Z
+ final VersionedParameterContext pc1 = new VersionedParameterContext();
+ pc1.setName("PC1");
+
+ final VersionedParameter param1 = new VersionedParameter();
+ param1.setName("Parameter1");
+ param1.setValue("X"); // This should be ignored and keep value A
+
+ final VersionedParameter param2 = new VersionedParameter();
+ param2.setName("Parameter2");
+ param2.setValue("Y"); // This is new in PC1 and should be kept
+
+ final VersionedParameter param3 = new VersionedParameter();
+ param3.setName("Parameter3");
+ param3.setValue("Z"); // This is new and should be kept
+
+ pc1.setParameters(Set.of(param1, param2, param3));
+
+ // Set parameter contexts on incoming snapshot (only PC1 in this case)
+ final Map<String, VersionedParameterContext> incomingParameterContexts
= new HashMap<>();
+ incomingParameterContexts.put("PC1", pc1);
+ incomingSnapshot.setParameterContexts(incomingParameterContexts);
+
+ // Set flow contents
+ incomingSnapshot.setFlowContents(new VersionedProcessGroup());
+
+ return incomingSnapshot;
+ }
+
private RegisteredFlow createIncomingRegisteredFlow() {
final RegisteredFlow incomingFlow = new RegisteredFlow();
incomingFlow.setIdentifier("Flow1");
@@ -198,6 +353,27 @@ public class GitHubFlowRegistryClientTest {
return propertyValue;
}
+ private PropertyValue createMockPropertyValueWithEnum(final String value) { // mock PropertyValue that resolves to an enum constant
+ final PropertyValue propertyValue = mock(PropertyValue.class);
+ when(propertyValue.getValue()).thenReturn(value);
+ /*
+  * getValue() returns the raw string, while asAllowableValue(Class) returns the constant of
+  * the requested enum class whose toString() equals that string, or null when the class is
+  * not an enum or no constant matches.
+  * NOTE(review): matching uses toString() rather than name(); this presumably relies on the
+  * enum not overriding toString() - confirm.
+  */
+ // Mock asAllowableValue to find the enum constant that matches the
string value
+ when(propertyValue.asAllowableValue(any())).thenAnswer(invocation -> {
+ Class<?> clazz = invocation.getArgument(0);
+ if (clazz.isEnum()) {
+ Object[] enumConstants = clazz.getEnumConstants();
+ for (Object enumConstant : enumConstants) {
+ if (value.equals(enumConstant.toString())) {
+ return enumConstant;
+ }
+ }
+ }
+ return null;
+ });
+
+ return propertyValue;
+ }
+
private static class TestableGitHubRepositoryClient extends
GitHubFlowRegistryClient {
private final GitHubRepositoryClient repositoryClient;
private final FlowSnapshotSerializer flowSnapshotSerializer;