This is an automated email from the ASF dual-hosted git repository.

bejancsaba pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new bc589e273d NIFI-13020 Fix legacy flow C2 update issue
bc589e273d is described below

commit bc589e273d4e29dad8837a64895f60fb4a56173b
Author: Ferenc Kis <briansolo1...@gmail.com>
AuthorDate: Tue Apr 9 12:14:54 2024 +0200

    NIFI-13020 Fix legacy flow C2 update issue
    
    Signed-off-by: Csaba Bejan <bejan.cs...@gmail.com>
    
    This closes #8624
---
 .../commons/service/StandardFlowEnrichService.java | 112 ++++++++++++++++++++-
 1 file changed, 108 insertions(+), 4 deletions(-)

diff --git a/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowEnrichService.java b/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowEnrichService.java
index 6a73992d15..95cc77abec 100644
--- a/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowEnrichService.java
+++ b/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowEnrichService.java
@@ -20,26 +20,37 @@ package org.apache.nifi.minifi.commons.service;
 import static java.lang.Boolean.FALSE;
 import static java.lang.Boolean.parseBoolean;
 import static java.util.Map.entry;
+import static java.util.Objects.isNull;
 import static java.util.Optional.empty;
 import static java.util.Optional.ofNullable;
 import static java.util.UUID.randomUUID;
 import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
 import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.nifi.flow.ScheduledState.ENABLED;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.controller.flow.VersionedDataflow;
 import org.apache.nifi.flow.Bundle;
 import org.apache.nifi.flow.ComponentType;
+import org.apache.nifi.flow.ConnectableComponent;
 import org.apache.nifi.flow.ControllerServiceAPI;
+import org.apache.nifi.flow.Position;
 import org.apache.nifi.flow.ScheduledState;
 import org.apache.nifi.flow.VersionedComponent;
 import org.apache.nifi.flow.VersionedControllerService;
 import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flow.VersionedRemoteProcessGroup;
 import org.apache.nifi.flow.VersionedReportingTask;
 import org.apache.nifi.logging.LogLevel;
 import org.apache.nifi.minifi.commons.api.MiNiFiProperties;
@@ -68,6 +79,10 @@ public class StandardFlowEnrichService implements FlowEnrichService {
     private static final String SITE_TO_SITE_REPORTING_NAR_ARTIFACT = "nifi-site-to-site-reporting-nar";
     private static final String PROVENANCE_REPORTING_TASK_PROTOCOL = "HTTP";
     private static final String PROVENANCE_REPORTING_TASK_BEGINNING_OF_STREAM = "beginning-of-stream";
+    private static final String DEFAULT_BULLETIN_LEVEL = "WARN";
+    private static final String DEFAULT_EXECUTION_NODE = "ALL";
+    private static final Position DEFAULT_POSITION = new Position(0.0, 0.0);
+    private static final Predicate<? super VersionedComponent> IS_LEGACY_COMPONENT = versionedComponent -> isBlank(versionedComponent.getInstanceIdentifier());
 
     private final ReadableProperties minifiProperties;
 
@@ -86,11 +101,11 @@ public class StandardFlowEnrichService implements FlowEnrichService {
         maxConcurrentThreads.ifPresent(versionedDataflow::setMaxTimerDrivenThreadCount);
 
         VersionedProcessGroup rootGroup = versionedDataflow.getRootGroup();
-        if (rootGroup.getIdentifier() == null) {
+        if (isBlank(rootGroup.getIdentifier())) {
             rootGroup.setIdentifier(randomUUID().toString());
         }
-        if (rootGroup.getInstanceIdentifier() == null) {
-            rootGroup.setInstanceIdentifier(randomUUID().toString());
+        if (isNull(rootGroup.getPosition())) {
+            rootGroup.setPosition(DEFAULT_POSITION);
         }
 
         rootGroup.getControllerServices().forEach(controllerService -> controllerService.setScheduledState(ENABLED));
@@ -106,6 +121,13 @@ public class StandardFlowEnrichService implements FlowEnrichService {
         createProvenanceReportingTask(commonSslControllerService.map(VersionedComponent::getInstanceIdentifier).orElse(EMPTY))
             .ifPresent(versionedDataflow.getReportingTasks()::add);
 
+        if (IS_LEGACY_COMPONENT.test(rootGroup)) {
+            LOG.info("Legacy flow detected. Initializing missing but mandatory properties on components");
+            initializeComponentsMissingProperties(rootGroup);
+            Map<String, String> idToInstanceIdMap = createIdToInstanceIdMap(rootGroup);
+            setConnectableComponentsInstanceId(rootGroup, idToInstanceIdMap);
+        }
+
         return versionedDataflow;
     }
 
@@ -225,6 +247,88 @@ public class StandardFlowEnrichService implements FlowEnrichService {
                 entry("SSL Context Service", sslControllerServiceIdentifier))
             .stream()
             .filter(entry -> StringUtils.isNotBlank(entry.getValue()))
-            .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
+            .collect(toMap(Entry::getKey, Entry::getValue));
+    }
+
+    private void initializeComponentsMissingProperties(VersionedProcessGroup versionedProcessGroup) {
+        versionedProcessGroup.setInstanceIdentifier(randomUUID().toString());
+
+        Stream.of(
+                ofNullable(versionedProcessGroup.getControllerServices()).orElse(Set.of()),
+                ofNullable(versionedProcessGroup.getConnections()).orElse(Set.of()),
+                ofNullable(versionedProcessGroup.getProcessors()).orElse(Set.of()),
+                ofNullable(versionedProcessGroup.getInputPorts()).orElse(Set.of()),
+                ofNullable(versionedProcessGroup.getOutputPorts()).orElse(Set.of()),
+                ofNullable(versionedProcessGroup.getFunnels()).orElse(Set.of()),
+                ofNullable(versionedProcessGroup.getRemoteProcessGroups()).orElse(Set.of()),
+                ofNullable(versionedProcessGroup.getRemoteProcessGroups()).orElse(Set.of())
+                    .stream()
+                    .map(VersionedRemoteProcessGroup::getInputPorts)
+                    .flatMap(Set::stream)
+                    .collect(toSet()),
+                ofNullable(versionedProcessGroup.getRemoteProcessGroups()).orElse(Set.of())
+                    .stream()
+                    .map(VersionedRemoteProcessGroup::getOutputPorts)
+                    .flatMap(Set::stream)
+                    .collect(toSet()))
+            .flatMap(Set::stream)
+            .filter(IS_LEGACY_COMPONENT)
+            .forEach(versionedComponent -> {
+                versionedComponent.setInstanceIdentifier(randomUUID().toString());
+                if (versionedComponent instanceof VersionedProcessor processor) {
+                    if (isBlank(processor.getBulletinLevel())) {
+                        processor.setBulletinLevel(DEFAULT_BULLETIN_LEVEL);
+                    }
+                    if (isBlank(processor.getExecutionNode())) {
+                        processor.setExecutionNode(DEFAULT_EXECUTION_NODE);
+                    }
+                }
+            });
+
+        versionedProcessGroup.getProcessGroups().forEach(this::initializeComponentsMissingProperties);
+    }
+
+    private Map<String, String> createIdToInstanceIdMap(VersionedProcessGroup versionedProcessGroup) {
+        Map<String, String> thisProcessGroupIdToInstanceIdMaps = Stream.of(
+                ofNullable(versionedProcessGroup.getProcessors()).orElse(Set.of()),
+                ofNullable(versionedProcessGroup.getInputPorts()).orElse(Set.of()),
+                ofNullable(versionedProcessGroup.getOutputPorts()).orElse(Set.of()),
+                ofNullable(versionedProcessGroup.getFunnels()).orElse(Set.of()),
+                ofNullable(versionedProcessGroup.getRemoteProcessGroups()).orElse(Set.of())
+                    .stream()
+                    .map(VersionedRemoteProcessGroup::getInputPorts)
+                    .flatMap(Set::stream)
+                    .collect(toSet()),
+                ofNullable(versionedProcessGroup.getRemoteProcessGroups()).orElse(Set.of())
+                    .stream()
+                    .map(VersionedRemoteProcessGroup::getOutputPorts)
+                    .flatMap(Set::stream)
+                    .collect(toSet())
+            )
+            .flatMap(Set::stream)
+            .collect(toMap(VersionedComponent::getIdentifier, VersionedComponent::getInstanceIdentifier));
+
+        Stream<Map<String, String>> childProcessGroupsIdToInstanceIdMaps = ofNullable(versionedProcessGroup.getProcessGroups()).orElse(Set.of())
+            .stream()
+            .map(this::createIdToInstanceIdMap);
+
+        return Stream.concat(
+                Stream.of(thisProcessGroupIdToInstanceIdMaps),
+                childProcessGroupsIdToInstanceIdMaps)
+            .map(Map::entrySet)
+            .flatMap(Set::stream)
+            .collect(toMap(Entry::getKey, Entry::getValue));
+    }
+
+    private void setConnectableComponentsInstanceId(VersionedProcessGroup versionedProcessGroup, Map<String, String> idToInstanceIdMap) {
+        ofNullable(versionedProcessGroup.getConnections()).orElse(Set.of())
+            .forEach(connection -> {
+                ConnectableComponent source = connection.getSource();
+                source.setInstanceIdentifier(idToInstanceIdMap.get(source.getId()));
+                ConnectableComponent destination = connection.getDestination();
+                destination.setInstanceIdentifier(idToInstanceIdMap.get(destination.getId()));
+            });
+        ofNullable(versionedProcessGroup.getProcessGroups()).orElse(Set.of())
+            .forEach(childProcessGroup -> setConnectableComponentsInstanceId(childProcessGroup, idToInstanceIdMap));
     }
 }

Reply via email to